Skip to main content

codetether_agent/cognition/
mod.rs

1//! Perpetual cognition runtime for persona swarms.
2//!
3//! Phase 0 scope:
4//! - Contract types for personas, thought events, proposals, and snapshots
5//! - In-memory runtime manager for lifecycle + lineage
6//! - Feature-flagged perpetual loop (no external execution side effects)
7
8pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32
33// Ensure re-exported types are referenced to suppress warnings.
34const _: () = {
35    fn _assert_types_used() {
36        let _ = std::mem::size_of::<CandleDevicePreference>();
37        let _ = std::mem::size_of::<ThinkerBackend>();
38        let _ = std::mem::size_of::<ThinkerOutput>();
39    }
40};
41use tokio::time::Instant;
42use uuid::Uuid;
43
44/// Persona execution status.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "snake_case")]
47pub enum PersonaStatus {
48    Active,
49    Idle,
50    Reaped,
51    Error,
52}
53
54/// Policy boundaries for a persona.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct PersonaPolicy {
57    pub max_spawn_depth: u32,
58    pub max_branching_factor: u32,
59    pub token_budget_per_minute: u32,
60    pub compute_ms_per_minute: u32,
61    pub idle_ttl_secs: u64,
62    pub share_memory: bool,
63    #[serde(default)]
64    pub allowed_tools: Vec<String>,
65}
66
67impl Default for PersonaPolicy {
68    fn default() -> Self {
69        Self {
70            max_spawn_depth: 4,
71            max_branching_factor: 4,
72            token_budget_per_minute: 20_000,
73            compute_ms_per_minute: 10_000,
74            idle_ttl_secs: 3_600,
75            share_memory: false,
76            allowed_tools: Vec::new(),
77        }
78    }
79}
80
81/// Identity contract for a persona.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct PersonaIdentity {
84    pub id: String,
85    pub name: String,
86    pub role: String,
87    pub charter: String,
88    pub swarm_id: Option<String>,
89    pub parent_id: Option<String>,
90    pub depth: u32,
91    pub created_at: DateTime<Utc>,
92    #[serde(default)]
93    pub tags: Vec<String>,
94}
95
96/// Full runtime state for a persona.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct PersonaRuntimeState {
99    pub identity: PersonaIdentity,
100    pub policy: PersonaPolicy,
101    pub status: PersonaStatus,
102    pub thought_count: u64,
103    pub last_tick_at: Option<DateTime<Utc>>,
104    pub updated_at: DateTime<Utc>,
105    /// Tokens consumed in current 60-second window.
106    pub tokens_this_window: u32,
107    /// Compute milliseconds consumed in current 60-second window.
108    pub compute_ms_this_window: u32,
109    /// Start of the current budget window.
110    pub window_started_at: DateTime<Utc>,
111    /// Last time this persona made meaningful progress (not budget-paused/quorum-waiting).
112    pub last_progress_at: DateTime<Utc>,
113    /// Whether this persona is currently paused due to budget exhaustion.
114    #[serde(default)]
115    pub budget_paused: bool,
116}
117
118/// Proposal risk level.
119#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
120#[serde(rename_all = "snake_case")]
121pub enum ProposalRisk {
122    Low,
123    Medium,
124    High,
125    Critical,
126}
127
128/// Proposal status.
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
130#[serde(rename_all = "snake_case")]
131pub enum ProposalStatus {
132    Created,
133    Verified,
134    Rejected,
135    Executed,
136}
137
138/// A vote on a proposal.
139#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub enum ProposalVote {
142    Approve,
143    Reject,
144    Veto,
145    Abstain,
146}
147
148/// Proposal contract (think first, execute through gates).
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct Proposal {
151    pub id: String,
152    pub persona_id: String,
153    pub title: String,
154    pub rationale: String,
155    pub evidence_refs: Vec<String>,
156    pub risk: ProposalRisk,
157    pub status: ProposalStatus,
158    pub created_at: DateTime<Utc>,
159    /// Votes from personas: persona_id -> vote
160    #[serde(default)]
161    pub votes: HashMap<String, ProposalVote>,
162    /// Deadline for voting
163    pub vote_deadline: Option<DateTime<Utc>>,
164    /// Whether votes have been requested this cycle
165    #[serde(default)]
166    pub votes_requested: bool,
167    /// Quorum required (frozen at creation time to prevent drift).
168    #[serde(default)]
169    pub quorum_needed: usize,
170}
171
172/// Thought/event types emitted by the cognition loop.
173#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
174#[serde(rename_all = "snake_case")]
175pub enum ThoughtEventType {
176    ThoughtGenerated,
177    HypothesisRaised,
178    CheckRequested,
179    CheckResult,
180    ProposalCreated,
181    ProposalVerified,
182    ProposalRejected,
183    ActionExecuted,
184    PersonaSpawned,
185    PersonaReaped,
186    SnapshotCompressed,
187    BeliefExtracted,
188    BeliefContested,
189    BeliefRevalidated,
190    BudgetPaused,
191    IdleReaped,
192    AttentionCreated,
193    VoteCast,
194    WorkspaceUpdated,
195}
196
197impl ThoughtEventType {
198    fn as_str(&self) -> &'static str {
199        match self {
200            Self::ThoughtGenerated => "thought_generated",
201            Self::HypothesisRaised => "hypothesis_raised",
202            Self::CheckRequested => "check_requested",
203            Self::CheckResult => "check_result",
204            Self::ProposalCreated => "proposal_created",
205            Self::ProposalVerified => "proposal_verified",
206            Self::ProposalRejected => "proposal_rejected",
207            Self::ActionExecuted => "action_executed",
208            Self::PersonaSpawned => "persona_spawned",
209            Self::PersonaReaped => "persona_reaped",
210            Self::SnapshotCompressed => "snapshot_compressed",
211            Self::BeliefExtracted => "belief_extracted",
212            Self::BeliefContested => "belief_contested",
213            Self::BeliefRevalidated => "belief_revalidated",
214            Self::BudgetPaused => "budget_paused",
215            Self::IdleReaped => "idle_reaped",
216            Self::AttentionCreated => "attention_created",
217            Self::VoteCast => "vote_cast",
218            Self::WorkspaceUpdated => "workspace_updated",
219        }
220    }
221}
222
223/// Streamable thought event contract.
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct ThoughtEvent {
226    pub id: String,
227    pub event_type: ThoughtEventType,
228    pub persona_id: Option<String>,
229    pub swarm_id: Option<String>,
230    pub timestamp: DateTime<Utc>,
231    pub payload: serde_json::Value,
232}
233
234/// Distilled memory snapshot contract.
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct MemorySnapshot {
237    pub id: String,
238    pub generated_at: DateTime<Utc>,
239    pub swarm_id: Option<String>,
240    pub persona_scope: Vec<String>,
241    pub summary: String,
242    pub hot_event_count: usize,
243    pub warm_fact_count: usize,
244    pub cold_snapshot_count: usize,
245    pub metadata: HashMap<String, serde_json::Value>,
246}
247
248/// Request payload for creating a persona.
249#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct CreatePersonaRequest {
251    pub persona_id: Option<String>,
252    pub name: String,
253    pub role: String,
254    pub charter: String,
255    pub swarm_id: Option<String>,
256    pub parent_id: Option<String>,
257    pub policy: Option<PersonaPolicy>,
258    #[serde(default)]
259    pub tags: Vec<String>,
260}
261
262/// Request payload for spawning a child persona.
263#[derive(Debug, Clone, Serialize, Deserialize)]
264pub struct SpawnPersonaRequest {
265    pub persona_id: Option<String>,
266    pub name: String,
267    pub role: String,
268    pub charter: String,
269    pub swarm_id: Option<String>,
270    pub policy: Option<PersonaPolicy>,
271}
272
273/// Request payload for reaping persona(s).
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct ReapPersonaRequest {
276    pub cascade: Option<bool>,
277    pub reason: Option<String>,
278}
279
280/// Start-control payload for perpetual cognition.
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct StartCognitionRequest {
283    pub loop_interval_ms: Option<u64>,
284    pub seed_persona: Option<CreatePersonaRequest>,
285}
286
287/// Stop-control payload for perpetual cognition.
288#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct StopCognitionRequest {
290    pub reason: Option<String>,
291}
292
293// ── Attention, Governance, GlobalWorkspace ──────────────────────────────
294
295/// Source of an attention item.
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
297#[serde(rename_all = "snake_case")]
298pub enum AttentionSource {
299    ContestedBelief,
300    FailedCheck,
301    StaleBelief,
302    ProposalTimeout,
303    FailedExecution,
304}
305
306/// An item requiring persona attention.
307#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct AttentionItem {
309    pub id: String,
310    pub topic: String,
311    pub topic_tags: Vec<String>,
312    pub priority: f32,
313    pub source_type: AttentionSource,
314    pub source_id: String,
315    pub assigned_persona: Option<String>,
316    pub created_at: DateTime<Utc>,
317    pub resolved_at: Option<DateTime<Utc>>,
318}
319
320/// Governance rules for swarm voting.
321#[derive(Debug, Clone, Serialize, Deserialize)]
322pub struct SwarmGovernance {
323    pub quorum_fraction: f32,
324    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
325    pub veto_roles: Vec<String>,
326    pub vote_timeout_secs: u64,
327}
328
329impl Default for SwarmGovernance {
330    fn default() -> Self {
331        Self {
332            quorum_fraction: 0.5,
333            required_approvers_by_role: HashMap::new(),
334            veto_roles: Vec::new(),
335            vote_timeout_secs: 300,
336        }
337    }
338}
339
340/// The coherent "now" for the entire swarm.
341#[derive(Debug, Clone, Serialize, Deserialize)]
342pub struct GlobalWorkspace {
343    pub top_beliefs: Vec<String>,
344    pub top_uncertainties: Vec<String>,
345    pub top_attention: Vec<String>,
346    pub active_objectives: Vec<String>,
347    pub updated_at: DateTime<Utc>,
348}
349
350impl Default for GlobalWorkspace {
351    fn default() -> Self {
352        Self {
353            top_beliefs: Vec::new(),
354            top_uncertainties: Vec::new(),
355            top_attention: Vec::new(),
356            active_objectives: Vec::new(),
357            updated_at: Utc::now(),
358        }
359    }
360}
361
362/// Start/stop response status.
363#[derive(Debug, Clone, Serialize, Deserialize)]
364pub struct CognitionStatus {
365    pub enabled: bool,
366    pub running: bool,
367    pub loop_interval_ms: u64,
368    pub started_at: Option<DateTime<Utc>>,
369    pub last_tick_at: Option<DateTime<Utc>>,
370    pub persona_count: usize,
371    pub active_persona_count: usize,
372    pub events_buffered: usize,
373    pub snapshots_buffered: usize,
374}
375
376/// Reap response.
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct ReapPersonaResponse {
379    pub reaped_ids: Vec<String>,
380    pub count: usize,
381}
382
383/// Lineage node response.
384#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct LineageNode {
386    pub persona_id: String,
387    pub parent_id: Option<String>,
388    pub children: Vec<String>,
389    pub depth: u32,
390    pub status: PersonaStatus,
391}
392
393/// Lineage graph response.
394#[derive(Debug, Clone, Serialize, Deserialize)]
395pub struct LineageGraph {
396    pub nodes: Vec<LineageNode>,
397    pub roots: Vec<String>,
398    pub total_edges: usize,
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq)]
402enum ThoughtPhase {
403    Observe,
404    Reflect,
405    Test,
406    Compress,
407}
408
409impl ThoughtPhase {
410    fn from_thought_count(thought_count: u64) -> Self {
411        match thought_count % 4 {
412            1 => Self::Observe,
413            2 => Self::Reflect,
414            3 => Self::Test,
415            _ => Self::Compress,
416        }
417    }
418
419    fn as_str(&self) -> &'static str {
420        match self {
421            Self::Observe => "observe",
422            Self::Reflect => "reflect",
423            Self::Test => "test",
424            Self::Compress => "compress",
425        }
426    }
427
428    fn event_type(&self) -> ThoughtEventType {
429        match self {
430            Self::Observe => ThoughtEventType::ThoughtGenerated,
431            Self::Reflect => ThoughtEventType::HypothesisRaised,
432            Self::Test => ThoughtEventType::CheckRequested,
433            Self::Compress => ThoughtEventType::SnapshotCompressed,
434        }
435    }
436}
437
438#[derive(Debug, Clone)]
439struct ThoughtWorkItem {
440    persona_id: String,
441    persona_name: String,
442    role: String,
443    charter: String,
444    swarm_id: Option<String>,
445    thought_count: u64,
446    phase: ThoughtPhase,
447}
448
449#[derive(Debug, Clone)]
450struct ThoughtResult {
451    source: &'static str,
452    model: Option<String>,
453    finish_reason: Option<String>,
454    thinking: String,
455    prompt_tokens: Option<u32>,
456    completion_tokens: Option<u32>,
457    total_tokens: Option<u32>,
458    latency_ms: u128,
459    error: Option<String>,
460}
461
462/// Runtime options for cognition manager.
463#[derive(Debug, Clone)]
464pub struct CognitionRuntimeOptions {
465    pub enabled: bool,
466    pub loop_interval_ms: u64,
467    pub max_events: usize,
468    pub max_snapshots: usize,
469    pub default_policy: PersonaPolicy,
470}
471
472impl Default for CognitionRuntimeOptions {
473    fn default() -> Self {
474        Self {
475            enabled: false,
476            loop_interval_ms: 2_000,
477            max_events: 2_000,
478            max_snapshots: 128,
479            default_policy: PersonaPolicy::default(),
480        }
481    }
482}
483
484/// In-memory cognition runtime for perpetual persona swarms.
485#[derive(Debug)]
486pub struct CognitionRuntime {
487    enabled: bool,
488    max_events: usize,
489    max_snapshots: usize,
490    default_policy: PersonaPolicy,
491    running: Arc<AtomicBool>,
492    loop_interval_ms: Arc<RwLock<u64>>,
493    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
494    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
495    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
496    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
497    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
498    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
499    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
500    event_tx: broadcast::Sender<ThoughtEvent>,
501    thinker: Option<Arc<ThinkerClient>>,
502    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
503    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
504    governance: Arc<RwLock<SwarmGovernance>>,
505    workspace: Arc<RwLock<GlobalWorkspace>>,
506    tools: Option<Arc<ToolRegistry>>,
507    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
508    /// Proposals pending human approval for Critical risk.
509    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
510}
511
512impl CognitionRuntime {
513    /// Build runtime from environment feature flags.
514    pub fn new_from_env() -> Self {
515        let mut options = CognitionRuntimeOptions::default();
516        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
517        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
518        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
519        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
520
521        options.default_policy = PersonaPolicy {
522            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
523            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
524            token_budget_per_minute: env_u32(
525                "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
526                20_000,
527            ),
528            compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
529            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
530            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
531            allowed_tools: Vec::new(),
532        };
533
534        let thinker_backend = thinker::ThinkerBackend::from_env(
535            &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
536                .unwrap_or_else(|_| "openai_compat".to_string()),
537        );
538        let thinker_timeout_default = match thinker_backend {
539            thinker::ThinkerBackend::OpenAICompat => 30_000,
540            thinker::ThinkerBackend::Candle => 12_000,
541            thinker::ThinkerBackend::Bedrock => 60_000,
542        };
543        let thinker_config = ThinkerConfig {
544            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
545            backend: thinker_backend,
546            endpoint: normalize_thinker_endpoint(
547                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
548                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
549            ),
550            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
551                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
552            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
553            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
554            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
555                .ok()
556                .and_then(|v| v.parse::<f32>().ok()),
557            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
558            timeout_ms: env_u64(
559                "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
560                thinker_timeout_default,
561            ),
562            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
563            candle_tokenizer_path: std::env::var(
564                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
565            )
566            .ok(),
567            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
568            candle_device: thinker::CandleDevicePreference::from_env(
569                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
570                    .unwrap_or_else(|_| "auto".to_string()),
571            ),
572            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
573            candle_repeat_penalty: env_f32(
574                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
575                1.1,
576            ),
577            candle_repeat_last_n: env_usize(
578                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
579                64,
580            ),
581            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
582            bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
583                .unwrap_or_else(|_| {
584                    std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-west-2".to_string())
585                }),
586        };
587
588        let mut runtime = Self::new_with_options(options);
589        // Configure the thinker on the existing runtime
590        runtime.thinker = Some(thinker_config).and_then(|cfg| {
591            if !cfg.enabled {
592                return None;
593            }
594            match ThinkerClient::new(cfg) {
595                Ok(client) => {
596                    tracing::info!(
597                        backend = ?client.config().backend,
598                        endpoint = %client.config().endpoint,
599                        model = %client.config().model,
600                        "Cognition thinker initialized"
601                    );
602                    Some(Arc::new(client))
603                }
604                Err(error) => {
605                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
606                    None
607                }
608            }
609        });
610        runtime
611    }
612
613    /// Build runtime from explicit options.
614    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
615        Self::new_with_options_and_thinker(options, None)
616    }
617
618    fn new_with_options_and_thinker(
619        options: CognitionRuntimeOptions,
620        thinker_config: Option<ThinkerConfig>,
621    ) -> Self {
622        let (event_tx, _) = broadcast::channel(256);
623        let thinker = thinker_config.and_then(|cfg| {
624            if !cfg.enabled {
625                return None;
626            }
627            match ThinkerClient::new(cfg) {
628                Ok(client) => {
629                    tracing::info!(
630                        backend = ?client.config().backend,
631                        endpoint = %client.config().endpoint,
632                        model = %client.config().model,
633                        "Cognition thinker initialized"
634                    );
635                    Some(Arc::new(client))
636                }
637                Err(error) => {
638                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
639                    None
640                }
641            }
642        });
643
644        // Load persisted state before construction to avoid blocking_write()
645        // inside a tokio runtime (which panics).
646        let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
647            if let Some(persisted) = persistence::load_state() {
648                tracing::info!(
649                    personas = persisted.personas.len(),
650                    beliefs = persisted.beliefs.len(),
651                    persisted_at = %persisted.persisted_at,
652                    "Restoring persisted cognition state"
653                );
654                (
655                    persisted.personas,
656                    persisted.beliefs,
657                    persisted.proposals,
658                    persisted.attention_queue,
659                    persisted.workspace,
660                )
661            } else {
662                (
663                    HashMap::new(),
664                    HashMap::new(),
665                    HashMap::new(),
666                    Vec::new(),
667                    GlobalWorkspace::default(),
668                )
669            };
670
671        let runtime = Self {
672            enabled: options.enabled,
673            max_events: options.max_events.max(32),
674            max_snapshots: options.max_snapshots.max(8),
675            default_policy: options.default_policy,
676            running: Arc::new(AtomicBool::new(false)),
677            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
678            started_at: Arc::new(RwLock::new(None)),
679            last_tick_at: Arc::new(RwLock::new(None)),
680            personas: Arc::new(RwLock::new(init_personas)),
681            proposals: Arc::new(RwLock::new(init_proposals)),
682            events: Arc::new(RwLock::new(VecDeque::new())),
683            snapshots: Arc::new(RwLock::new(VecDeque::new())),
684            loop_handle: Arc::new(Mutex::new(None)),
685            event_tx,
686            thinker,
687            beliefs: Arc::new(RwLock::new(init_beliefs)),
688            attention_queue: Arc::new(RwLock::new(init_attention)),
689            governance: Arc::new(RwLock::new(SwarmGovernance::default())),
690            workspace: Arc::new(RwLock::new(init_workspace)),
691            tools: None,
692            receipts: Arc::new(RwLock::new(Vec::new())),
693            pending_approvals: Arc::new(RwLock::new(HashMap::new())),
694        };
695
696        runtime
697    }
698
699    /// Whether cognition is enabled by feature flag.
700    pub fn is_enabled(&self) -> bool {
701        self.enabled
702    }
703
704    /// Subscribe to thought events for streaming.
705    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
706        self.event_tx.subscribe()
707    }
708
709    /// Start the perpetual cognition loop.
710    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
711        if !self.enabled {
712            return Err(anyhow!(
713                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
714            ));
715        }
716
717        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
718        if let Some(request) = req {
719            if let Some(interval) = request.loop_interval_ms {
720                let mut lock = self.loop_interval_ms.write().await;
721                *lock = interval.max(100);
722            }
723            requested_seed_persona = request.seed_persona;
724        }
725
726        let has_personas = !self.personas.read().await.is_empty();
727        if !has_personas {
728            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
729            let _ = self.create_persona(seed).await?;
730        }
731
732        if self.running.load(Ordering::SeqCst) {
733            return Ok(self.status().await);
734        }
735
736        self.running.store(true, Ordering::SeqCst);
737        {
738            let mut started = self.started_at.write().await;
739            *started = Some(Utc::now());
740        }
741
742        let running = Arc::clone(&self.running);
743        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
744        let last_tick_at = Arc::clone(&self.last_tick_at);
745        let personas = Arc::clone(&self.personas);
746        let proposals = Arc::clone(&self.proposals);
747        let events = Arc::clone(&self.events);
748        let snapshots = Arc::clone(&self.snapshots);
749        let max_events = self.max_events;
750        let max_snapshots = self.max_snapshots;
751        let event_tx = self.event_tx.clone();
752        let thinker = self.thinker.clone();
753        let beliefs = Arc::clone(&self.beliefs);
754        let attention_queue = Arc::clone(&self.attention_queue);
755        let governance = Arc::clone(&self.governance);
756        let workspace = Arc::clone(&self.workspace);
757        let tools = self.tools.clone();
758        let receipts = Arc::clone(&self.receipts);
759        let pending_approvals = Arc::clone(&self.pending_approvals);
760
761        let handle = tokio::spawn(async move {
762            let mut next_tick = Instant::now();
763            while running.load(Ordering::SeqCst) {
764                let now_instant = Instant::now();
765                if now_instant < next_tick {
766                    tokio::time::sleep_until(next_tick).await;
767                }
768                if !running.load(Ordering::SeqCst) {
769                    break;
770                }
771
772                let now = Utc::now();
773                {
774                    let mut lock = last_tick_at.write().await;
775                    *lock = Some(now);
776                }
777
778                let mut new_events = Vec::new();
779                let mut new_snapshots = Vec::new();
780                let mut new_proposals = Vec::new();
781
782                // ── Step 1: Budget window reset + enforcement ──
783                let work_items: Vec<ThoughtWorkItem> = {
784                    let mut map = personas.write().await;
785                    let mut items = Vec::new();
786                    for persona in map.values_mut() {
787                        if persona.status != PersonaStatus::Active {
788                            continue;
789                        }
790
791                        // Reset budget window every 60 seconds
792                        let window_elapsed = now
793                            .signed_duration_since(persona.window_started_at)
794                            .num_seconds();
795                        if window_elapsed >= 60 {
796                            persona.tokens_this_window = 0;
797                            persona.compute_ms_this_window = 0;
798                            persona.window_started_at = now;
799                        }
800
801                        // Check budget
802                        let token_ok =
803                            persona.tokens_this_window < persona.policy.token_budget_per_minute;
804                        let compute_ok =
805                            persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
806                        if !token_ok || !compute_ok {
807                            if !persona.budget_paused {
808                                persona.budget_paused = true;
809                                new_events.push(ThoughtEvent {
810                                    id: Uuid::new_v4().to_string(),
811                                    event_type: ThoughtEventType::BudgetPaused,
812                                    persona_id: Some(persona.identity.id.clone()),
813                                    swarm_id: persona.identity.swarm_id.clone(),
814                                    timestamp: now,
815                                    payload: json!({
816                                        "budget_paused": true,
817                                        "tokens_used": persona.tokens_this_window,
818                                        "compute_ms_used": persona.compute_ms_this_window,
819                                        "token_budget": persona.policy.token_budget_per_minute,
820                                        "compute_budget": persona.policy.compute_ms_per_minute,
821                                    }),
822                                });
823                            }
824                            continue; // skip tick for this persona
825                        }
826
827                        persona.budget_paused = false;
828                        persona.thought_count = persona.thought_count.saturating_add(1);
829                        persona.last_tick_at = Some(now);
830                        persona.updated_at = now;
831                        items.push(ThoughtWorkItem {
832                            persona_id: persona.identity.id.clone(),
833                            persona_name: persona.identity.name.clone(),
834                            role: persona.identity.role.clone(),
835                            charter: persona.identity.charter.clone(),
836                            swarm_id: persona.identity.swarm_id.clone(),
837                            thought_count: persona.thought_count,
838                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
839                        });
840                    }
841                    items
842                };
843
844                for work in &work_items {
845                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
846
847                    let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
848
849                    let event_timestamp = Utc::now();
850                    let is_fallback = thought.source == "fallback";
851                    let tokens_used = thought.total_tokens.unwrap_or(0);
852                    let compute_used = thought.latency_ms as u32;
853
854                    new_events.push(ThoughtEvent {
855                        id: Uuid::new_v4().to_string(),
856                        event_type: work.phase.event_type(),
857                        persona_id: Some(work.persona_id.clone()),
858                        swarm_id: work.swarm_id.clone(),
859                        timestamp: event_timestamp,
860                        payload: json!({
861                            "phase": work.phase.as_str(),
862                            "thought_count": work.thought_count,
863                            "persona": {
864                                "id": work.persona_id.clone(),
865                                "name": work.persona_name.clone(),
866                                "role": work.role.clone(),
867                            },
868                            "context_event_count": context.len(),
869                            "thinking": thought.thinking.clone(),
870                            "source": thought.source,
871                            "model": thought.model.clone(),
872                            "finish_reason": thought.finish_reason.clone(),
873                            "usage": {
874                                "prompt_tokens": thought.prompt_tokens,
875                                "completion_tokens": thought.completion_tokens,
876                                "total_tokens": thought.total_tokens,
877                            },
878                            "latency_ms": thought.latency_ms,
879                            "error": thought.error.clone(),
880                        }),
881                    });
882
883                    // ── After thought: update budget counters ──
884                    {
885                        let mut map = personas.write().await;
886                        if let Some(persona) = map.get_mut(&work.persona_id) {
887                            persona.tokens_this_window =
888                                persona.tokens_this_window.saturating_add(tokens_used);
889                            persona.compute_ms_this_window =
890                                persona.compute_ms_this_window.saturating_add(compute_used);
891                            if !is_fallback {
892                                persona.last_progress_at = Utc::now();
893                            }
894                        }
895                    }
896
897                    // ── Step 3: Belief extraction during Reflect phase ──
898                    if work.phase == ThoughtPhase::Reflect && !is_fallback {
899                        let extracted = beliefs::extract_beliefs_from_thought(
900                            thinker.as_deref(),
901                            &work.persona_id,
902                            &thought.thinking,
903                        )
904                        .await;
905
906                        if !extracted.is_empty() {
907                            let mut belief_store = beliefs.write().await;
908                            let mut attn_queue = attention_queue.write().await;
909                            for mut new_belief in extracted {
910                                // Check for existing belief_key (duplicate detection)
911                                let existing_id = belief_store
912                                    .values()
913                                    .find(|b| {
914                                        b.belief_key == new_belief.belief_key
915                                            && b.status != BeliefStatus::Invalidated
916                                    })
917                                    .map(|b| b.id.clone());
918
919                                if let Some(eid) = existing_id {
920                                    // Confirm existing belief
921                                    if let Some(existing) = belief_store.get_mut(&eid) {
922                                        if !existing.confirmed_by.contains(&work.persona_id) {
923                                            existing.confirmed_by.push(work.persona_id.clone());
924                                        }
925                                        existing.revalidation_success();
926                                    }
927                                } else {
928                                    // Handle contradiction targets
929                                    let contest_targets: Vec<String> =
930                                        new_belief.contradicts.clone();
931                                    for target_key in &contest_targets {
932                                        if let Some(target) = belief_store.values_mut().find(|b| {
933                                            &b.belief_key == target_key
934                                                && b.status != BeliefStatus::Invalidated
935                                        }) {
936                                            target.contested_by.push(new_belief.id.clone());
937                                            if !new_belief.contradicts.contains(&target.belief_key)
938                                            {
939                                                new_belief
940                                                    .contradicts
941                                                    .push(target.belief_key.clone());
942                                            }
943                                            // Apply contest penalty
944                                            target.revalidation_failure();
945                                            if target.confidence >= 0.5 {
946                                                // Create revalidation attention item
947                                                attn_queue.push(AttentionItem {
948                                                    id: Uuid::new_v4().to_string(),
949                                                    topic: format!(
950                                                        "Revalidate belief: {}",
951                                                        target.claim
952                                                    ),
953                                                    topic_tags: vec![target.belief_key.clone()],
954                                                    priority: 0.7,
955                                                    source_type: AttentionSource::ContestedBelief,
956                                                    source_id: target.id.clone(),
957                                                    assigned_persona: None,
958                                                    created_at: Utc::now(),
959                                                    resolved_at: None,
960                                                });
961                                            }
962                                        }
963                                    }
964
965                                    new_events.push(ThoughtEvent {
966                                        id: Uuid::new_v4().to_string(),
967                                        event_type: ThoughtEventType::BeliefExtracted,
968                                        persona_id: Some(work.persona_id.clone()),
969                                        swarm_id: work.swarm_id.clone(),
970                                        timestamp: Utc::now(),
971                                        payload: json!({
972                                            "belief_id": new_belief.id,
973                                            "belief_key": new_belief.belief_key,
974                                            "claim": trim_for_storage(&new_belief.claim, 280),
975                                            "confidence": new_belief.confidence,
976                                        }),
977                                    });
978
979                                    // Mark progress for belief creation
980                                    {
981                                        let mut map = personas.write().await;
982                                        if let Some(p) = map.get_mut(&work.persona_id) {
983                                            p.last_progress_at = Utc::now();
984                                        }
985                                    }
986
987                                    new_belief.clamp_confidence();
988                                    belief_store.insert(new_belief.id.clone(), new_belief);
989                                }
990                            }
991                        }
992                    }
993
994                    if work.phase == ThoughtPhase::Test {
995                        new_events.push(ThoughtEvent {
996                            id: Uuid::new_v4().to_string(),
997                            event_type: ThoughtEventType::CheckResult,
998                            persona_id: Some(work.persona_id.clone()),
999                            swarm_id: work.swarm_id.clone(),
1000                            timestamp: Utc::now(),
1001                            payload: json!({
1002                                "phase": work.phase.as_str(),
1003                                "thought_count": work.thought_count,
1004                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
1005                                "source": thought.source,
1006                                "model": thought.model,
1007                            }),
1008                        });
1009
1010                        // Mark progress for check result
1011                        {
1012                            let mut map = personas.write().await;
1013                            if let Some(p) = map.get_mut(&work.persona_id) {
1014                                p.last_progress_at = Utc::now();
1015                            }
1016                        }
1017
1018                        // ── Step 6: Tool execution via capability leases ──
1019                        if !is_fallback {
1020                            if let Some(ref tool_registry) = tools {
1021                                let allowed = {
1022                                    let map = personas.read().await;
1023                                    map.get(&work.persona_id)
1024                                        .map(|p| p.policy.allowed_tools.clone())
1025                                        .unwrap_or_default()
1026                                };
1027                                if !allowed.is_empty() {
1028                                    let tool_results = executor::execute_tool_requests(
1029                                        thinker.as_deref(),
1030                                        tool_registry,
1031                                        &work.persona_id,
1032                                        &thought.thinking,
1033                                        &allowed,
1034                                    )
1035                                    .await;
1036
1037                                    for result_event in tool_results {
1038                                        new_events.push(result_event);
1039                                        // Mark progress for tool execution
1040                                        let mut map = personas.write().await;
1041                                        if let Some(p) = map.get_mut(&work.persona_id) {
1042                                            p.last_progress_at = Utc::now();
1043                                        }
1044                                    }
1045                                }
1046                            }
1047                        }
1048                    }
1049
1050                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1051                        let gov = governance.read().await;
1052                        let proposal = Proposal {
1053                            id: Uuid::new_v4().to_string(),
1054                            persona_id: work.persona_id.clone(),
1055                            title: proposal_title_from_thought(
1056                                &thought.thinking,
1057                                work.thought_count,
1058                            ),
1059                            rationale: trim_for_storage(&thought.thinking, 900),
1060                            evidence_refs: vec!["internal.thought_stream".to_string()],
1061                            risk: ProposalRisk::Low,
1062                            status: ProposalStatus::Created,
1063                            created_at: Utc::now(),
1064                            votes: HashMap::new(),
1065                            vote_deadline: Some(
1066                                Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1067                            ),
1068                            votes_requested: false,
1069                            quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1070                                as usize,
1071                        };
1072
1073                        new_events.push(ThoughtEvent {
1074                            id: Uuid::new_v4().to_string(),
1075                            event_type: ThoughtEventType::ProposalCreated,
1076                            persona_id: Some(work.persona_id.clone()),
1077                            swarm_id: work.swarm_id.clone(),
1078                            timestamp: Utc::now(),
1079                            payload: json!({
1080                                "proposal_id": proposal.id,
1081                                "title": proposal.title,
1082                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1083                                "source": thought.source,
1084                                "model": thought.model,
1085                            }),
1086                        });
1087
1088                        new_proposals.push(proposal);
1089                    }
1090
1091                    if work.phase == ThoughtPhase::Compress {
1092                        new_snapshots.push(MemorySnapshot {
1093                            id: Uuid::new_v4().to_string(),
1094                            generated_at: Utc::now(),
1095                            swarm_id: work.swarm_id.clone(),
1096                            persona_scope: vec![work.persona_id.clone()],
1097                            summary: trim_for_storage(&thought.thinking, 1_500),
1098                            hot_event_count: context.len(),
1099                            warm_fact_count: estimate_fact_count(&thought.thinking),
1100                            cold_snapshot_count: 1,
1101                            metadata: HashMap::from([
1102                                (
1103                                    "phase".to_string(),
1104                                    serde_json::Value::String(work.phase.as_str().to_string()),
1105                                ),
1106                                (
1107                                    "role".to_string(),
1108                                    serde_json::Value::String(work.role.clone()),
1109                                ),
1110                                (
1111                                    "source".to_string(),
1112                                    serde_json::Value::String(thought.source.to_string()),
1113                                ),
1114                                (
1115                                    "model".to_string(),
1116                                    serde_json::Value::String(
1117                                        thought
1118                                            .model
1119                                            .clone()
1120                                            .unwrap_or_else(|| "fallback".to_string()),
1121                                    ),
1122                                ),
1123                                (
1124                                    "completion_tokens".to_string(),
1125                                    serde_json::Value::Number(serde_json::Number::from(
1126                                        thought.completion_tokens.unwrap_or(0) as u64,
1127                                    )),
1128                                ),
1129                            ]),
1130                        });
1131
1132                        // ── Step 3: Belief staleness decay in Compress phase ──
1133                        {
1134                            let mut belief_store = beliefs.write().await;
1135                            let mut attn_queue = attention_queue.write().await;
1136                            let stale_ids: Vec<String> = belief_store
1137                                .values()
1138                                .filter(|b| {
1139                                    b.status == BeliefStatus::Active && now > b.review_after
1140                                })
1141                                .map(|b| b.id.clone())
1142                                .collect();
1143                            for id in stale_ids {
1144                                if let Some(belief) = belief_store.get_mut(&id) {
1145                                    belief.decay();
1146                                    attn_queue.push(AttentionItem {
1147                                        id: Uuid::new_v4().to_string(),
1148                                        topic: format!("Stale belief: {}", belief.claim),
1149                                        topic_tags: vec![belief.belief_key.clone()],
1150                                        priority: 0.4,
1151                                        source_type: AttentionSource::StaleBelief,
1152                                        source_id: belief.id.clone(),
1153                                        assigned_persona: None,
1154                                        created_at: now,
1155                                        resolved_at: None,
1156                                    });
1157                                }
1158                            }
1159                        }
1160
1161                        // ── Step 4d: Update GlobalWorkspace ──
1162                        {
1163                            let belief_store = beliefs.read().await;
1164                            let attn_queue = attention_queue.read().await;
1165
1166                            let mut sorted_beliefs: Vec<&Belief> = belief_store
1167                                .values()
1168                                .filter(|b| b.status == BeliefStatus::Active)
1169                                .collect();
1170                            sorted_beliefs.sort_by(|a, b| {
1171                                let score_a = a.confidence
1172                                    * (1.0
1173                                        / (1.0
1174                                            + now.signed_duration_since(a.updated_at).num_minutes()
1175                                                as f32));
1176                                let score_b = b.confidence
1177                                    * (1.0
1178                                        / (1.0
1179                                            + now.signed_duration_since(b.updated_at).num_minutes()
1180                                                as f32));
1181                                score_b
1182                                    .partial_cmp(&score_a)
1183                                    .unwrap_or(std::cmp::Ordering::Equal)
1184                            });
1185
1186                            let top_beliefs: Vec<String> = sorted_beliefs
1187                                .iter()
1188                                .take(10)
1189                                .map(|b| b.id.clone())
1190                                .collect();
1191                            let top_uncertainties: Vec<String> = {
1192                                let mut uncertain: Vec<&Belief> = belief_store
1193                                    .values()
1194                                    .filter(|b| {
1195                                        b.status == BeliefStatus::Stale
1196                                            || !b.contested_by.is_empty()
1197                                    })
1198                                    .collect();
1199                                // Deterministic order: contested first, then lowest confidence,
1200                                // then oldest updated_at.
1201                                uncertain.sort_by(|a, b| {
1202                                    let a_contested = !a.contested_by.is_empty();
1203                                    let b_contested = !b.contested_by.is_empty();
1204                                    b_contested
1205                                        .cmp(&a_contested)
1206                                        .then_with(|| {
1207                                            a.confidence
1208                                                .partial_cmp(&b.confidence)
1209                                                .unwrap_or(std::cmp::Ordering::Equal)
1210                                        })
1211                                        .then_with(|| a.updated_at.cmp(&b.updated_at))
1212                                });
1213                                uncertain
1214                                    .iter()
1215                                    .take(5)
1216                                    .map(|b| {
1217                                        format!(
1218                                            "[{}] {}",
1219                                            b.belief_key,
1220                                            trim_for_storage(&b.claim, 80)
1221                                        )
1222                                    })
1223                                    .collect()
1224                            };
1225
1226                            let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1227                                .iter()
1228                                .filter(|a| a.resolved_at.is_none())
1229                                .collect();
1230                            sorted_attn.sort_by(|a, b| {
1231                                b.priority
1232                                    .partial_cmp(&a.priority)
1233                                    .unwrap_or(std::cmp::Ordering::Equal)
1234                            });
1235                            let top_attention: Vec<String> =
1236                                sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1237
1238                            let mut ws = workspace.write().await;
1239                            ws.top_beliefs = top_beliefs;
1240                            ws.top_uncertainties = top_uncertainties;
1241                            ws.top_attention = top_attention;
1242                            ws.updated_at = now;
1243                        }
1244
1245                        new_events.push(ThoughtEvent {
1246                            id: Uuid::new_v4().to_string(),
1247                            event_type: ThoughtEventType::WorkspaceUpdated,
1248                            persona_id: Some(work.persona_id.clone()),
1249                            swarm_id: work.swarm_id.clone(),
1250                            timestamp: Utc::now(),
1251                            payload: json!({ "updated": true }),
1252                        });
1253                    }
1254                }
1255
1256                // ── Step 4c: Governance — resolve proposals ──
1257                {
1258                    let gov = governance.read().await;
1259                    let mut proposal_store = proposals.write().await;
1260                    let persona_map = personas.read().await;
1261
1262                    let proposal_ids: Vec<String> = proposal_store
1263                        .values()
1264                        .filter(|p| p.status == ProposalStatus::Created)
1265                        .map(|p| p.id.clone())
1266                        .collect();
1267
1268                    let mut attn_queue = attention_queue.write().await;
1269                    for pid in proposal_ids {
1270                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1271                            let quorum_needed = proposal.quorum_needed.max(1);
1272
1273                            // Check vote deadline
1274                            if let Some(deadline) = proposal.vote_deadline {
1275                                if now > deadline {
1276                                    if proposal.votes.len() < quorum_needed {
1277                                        // Timeout without quorum → attention item
1278                                        attn_queue.push(AttentionItem {
1279                                            id: Uuid::new_v4().to_string(),
1280                                            topic: format!(
1281                                                "Proposal vote timeout: {}",
1282                                                proposal.title
1283                                            ),
1284                                            topic_tags: Vec::new(),
1285                                            priority: 0.6,
1286                                            source_type: AttentionSource::ProposalTimeout,
1287                                            source_id: proposal.id.clone(),
1288                                            assigned_persona: None,
1289                                            created_at: now,
1290                                            resolved_at: None,
1291                                        });
1292                                        proposal.status = ProposalStatus::Rejected;
1293                                        continue;
1294                                    }
1295
1296                                    // Deadline passed with quorum but missing required approvers → reject
1297                                    let required_roles = gov
1298                                        .required_approvers_by_role
1299                                        .get(&proposal.risk)
1300                                        .cloned()
1301                                        .unwrap_or_default();
1302                                    let all_required_met = required_roles.iter().all(|role| {
1303                                        proposal.votes.iter().any(|(vid, vote)| {
1304                                            *vote == ProposalVote::Approve
1305                                                && persona_map
1306                                                    .get(vid)
1307                                                    .map(|p| &p.identity.role == role)
1308                                                    .unwrap_or(false)
1309                                        })
1310                                    });
1311                                    if !all_required_met {
1312                                        attn_queue.push(AttentionItem {
1313                                            id: Uuid::new_v4().to_string(),
1314                                            topic: format!(
1315                                                "Missing required approvers: {}",
1316                                                proposal.title
1317                                            ),
1318                                            topic_tags: Vec::new(),
1319                                            priority: 0.7,
1320                                            source_type: AttentionSource::ProposalTimeout,
1321                                            source_id: proposal.id.clone(),
1322                                            assigned_persona: None,
1323                                            created_at: now,
1324                                            resolved_at: None,
1325                                        });
1326                                        proposal.status = ProposalStatus::Rejected;
1327                                        continue;
1328                                    }
1329                                }
1330                            }
1331
1332                            // Resolve if enough votes
1333                            if proposal.votes.len() >= quorum_needed {
1334                                // Check for veto
1335                                let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1336                                    if *vote != ProposalVote::Veto {
1337                                        return false;
1338                                    }
1339                                    if let Some(voter) = persona_map.get(voter_id) {
1340                                        gov.veto_roles.contains(&voter.identity.role)
1341                                    } else {
1342                                        false
1343                                    }
1344                                });
1345
1346                                if vetoed {
1347                                    proposal.status = ProposalStatus::Rejected;
1348                                    continue;
1349                                }
1350
1351                                // Check required approvers
1352                                let required_roles = gov
1353                                    .required_approvers_by_role
1354                                    .get(&proposal.risk)
1355                                    .cloned()
1356                                    .unwrap_or_default();
1357                                let all_required_met = required_roles.iter().all(|role| {
1358                                    proposal.votes.iter().any(|(vid, vote)| {
1359                                        *vote == ProposalVote::Approve
1360                                            && persona_map
1361                                                .get(vid)
1362                                                .map(|p| &p.identity.role == role)
1363                                                .unwrap_or(false)
1364                                    })
1365                                });
1366
1367                                if !all_required_met {
1368                                    continue; // wait for required approvers
1369                                }
1370
1371                                // Count approvals vs rejections
1372                                let approvals = proposal
1373                                    .votes
1374                                    .values()
1375                                    .filter(|v| **v == ProposalVote::Approve)
1376                                    .count();
1377                                let rejections = proposal
1378                                    .votes
1379                                    .values()
1380                                    .filter(|v| **v == ProposalVote::Reject)
1381                                    .count();
1382
1383                                if approvals > rejections {
1384                                    proposal.status = ProposalStatus::Verified;
1385                                } else {
1386                                    proposal.status = ProposalStatus::Rejected;
1387                                }
1388                            }
1389                        }
1390                    }
1391                }
1392
1393                // ── Step 7: Execute verified proposals ──
1394                {
1395                    let mut proposal_store = proposals.write().await;
1396                    let verified_ids: Vec<String> = proposal_store
1397                        .values()
1398                        .filter(|p| p.status == ProposalStatus::Verified)
1399                        .map(|p| p.id.clone())
1400                        .collect();
1401
1402                    for pid in verified_ids {
1403                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1404                            if proposal.risk == ProposalRisk::Critical {
1405                                // Check for human approval
1406                                let approved = {
1407                                    let approvals = pending_approvals.read().await;
1408                                    approvals.get(&pid).copied().unwrap_or(false)
1409                                };
1410                                if !approved {
1411                                    // Register for human approval
1412                                    let mut approvals = pending_approvals.write().await;
1413                                    approvals.entry(pid.clone()).or_insert(false);
1414                                    continue;
1415                                }
1416                            }
1417
1418                            // Create decision receipt
1419                            let receipt = executor::DecisionReceipt {
1420                                id: Uuid::new_v4().to_string(),
1421                                proposal_id: pid.clone(),
1422                                inputs: proposal.evidence_refs.clone(),
1423                                governance_decision: format!(
1424                                    "Approved with {} votes",
1425                                    proposal.votes.len()
1426                                ),
1427                                capability_leases: Vec::new(),
1428                                tool_invocations: Vec::new(),
1429                                outcome: executor::ExecutionOutcome::Success {
1430                                    summary: format!("Proposal '{}' executed", proposal.title),
1431                                },
1432                                created_at: Utc::now(),
1433                            };
1434
1435                            new_events.push(ThoughtEvent {
1436                                id: Uuid::new_v4().to_string(),
1437                                event_type: ThoughtEventType::ActionExecuted,
1438                                persona_id: Some(proposal.persona_id.clone()),
1439                                swarm_id: None,
1440                                timestamp: Utc::now(),
1441                                payload: json!({
1442                                    "receipt_id": receipt.id,
1443                                    "proposal_id": pid,
1444                                    "outcome": "success",
1445                                    "summary": format!("Proposal '{}' executed", proposal.title),
1446                                }),
1447                            });
1448
1449                            receipts.write().await.push(receipt);
1450                            proposal.status = ProposalStatus::Executed;
1451                        }
1452                    }
1453                }
1454
1455                if !new_proposals.is_empty() {
1456                    let mut proposal_store = proposals.write().await;
1457                    for proposal in new_proposals {
1458                        proposal_store.insert(proposal.id.clone(), proposal);
1459                    }
1460                }
1461
1462                for event in new_events {
1463                    push_event_internal(&events, max_events, &event_tx, event).await;
1464                }
1465                for snapshot in new_snapshots {
1466                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1467                }
1468
1469                // ── Step 2: Idle TTL reaping ──
1470                {
1471                    let mut map = personas.write().await;
1472                    let idle_ids: Vec<String> = map
1473                        .values()
1474                        .filter(|p| {
1475                            p.status == PersonaStatus::Active
1476                                && !p.budget_paused
1477                                && now.signed_duration_since(p.last_progress_at).num_seconds()
1478                                    > p.policy.idle_ttl_secs as i64
1479                        })
1480                        .map(|p| p.identity.id.clone())
1481                        .collect();
1482
1483                    for id in &idle_ids {
1484                        if let Some(persona) = map.get_mut(id) {
1485                            persona.status = PersonaStatus::Reaped;
1486                            persona.updated_at = now;
1487                        }
1488                        // Also cascade-reap children
1489                        let children: Vec<String> = map
1490                            .values()
1491                            .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1492                            .map(|p| p.identity.id.clone())
1493                            .collect();
1494                        for child_id in children {
1495                            if let Some(child) = map.get_mut(&child_id) {
1496                                child.status = PersonaStatus::Reaped;
1497                                child.updated_at = now;
1498                            }
1499                        }
1500                    }
1501                    drop(map);
1502
1503                    for id in idle_ids {
1504                        push_event_internal(
1505                            &events,
1506                            max_events,
1507                            &event_tx,
1508                            ThoughtEvent {
1509                                id: Uuid::new_v4().to_string(),
1510                                event_type: ThoughtEventType::IdleReaped,
1511                                persona_id: Some(id),
1512                                swarm_id: None,
1513                                timestamp: now,
1514                                payload: json!({ "reason": "idle_ttl_expired" }),
1515                            },
1516                        )
1517                        .await;
1518                    }
1519                }
1520
1521                // ── Step 5: Persistence on Compress ──
1522                if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1523                    let _ = persistence::save_state(
1524                        &personas,
1525                        &proposals,
1526                        &beliefs,
1527                        &attention_queue,
1528                        &workspace,
1529                        &events,
1530                        &snapshots,
1531                    )
1532                    .await;
1533                }
1534
1535                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1536                next_tick += interval;
1537                let tick_completed = Instant::now();
1538                if tick_completed > next_tick {
1539                    next_tick = tick_completed;
1540                }
1541            }
1542        });
1543
1544        {
1545            let mut lock = self.loop_handle.lock().await;
1546            *lock = Some(handle);
1547        }
1548
1549        Ok(self.status().await)
1550    }
1551
1552    /// Stop the perpetual cognition loop.
1553    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1554        self.running.store(false, Ordering::SeqCst);
1555
1556        if let Some(handle) = self.loop_handle.lock().await.take() {
1557            handle.abort();
1558            let _ = handle.await;
1559        }
1560
1561        if let Some(reason_value) = reason {
1562            let event = ThoughtEvent {
1563                id: Uuid::new_v4().to_string(),
1564                event_type: ThoughtEventType::CheckResult,
1565                persona_id: None,
1566                swarm_id: None,
1567                timestamp: Utc::now(),
1568                payload: json!({ "stopped": true, "reason": reason_value }),
1569            };
1570            self.push_event(event).await;
1571        }
1572
1573        Ok(self.status().await)
1574    }
1575
1576    /// Create a persona record.
1577    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1578        let now = Utc::now();
1579        let mut personas = self.personas.write().await;
1580
1581        let mut parent_swarm_id = None;
1582        let mut computed_depth = 0_u32;
1583        let mut inherited_policy = None;
1584
1585        if let Some(parent_id) = req.parent_id.clone() {
1586            let parent = personas
1587                .get(&parent_id)
1588                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1589
1590            if parent.status == PersonaStatus::Reaped {
1591                return Err(anyhow!("Parent persona {} is reaped", parent_id));
1592            }
1593
1594            parent_swarm_id = parent.identity.swarm_id.clone();
1595            computed_depth = parent.identity.depth.saturating_add(1);
1596            inherited_policy = Some(parent.policy.clone());
1597            let branch_limit = parent.policy.max_branching_factor;
1598
1599            let child_count = personas
1600                .values()
1601                .filter(|p| {
1602                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1603                        && p.status != PersonaStatus::Reaped
1604                })
1605                .count();
1606
1607            if child_count as u32 >= branch_limit {
1608                return Err(anyhow!(
1609                    "Parent {} reached branching limit {}",
1610                    parent_id,
1611                    branch_limit
1612                ));
1613            }
1614        }
1615
1616        let policy = req
1617            .policy
1618            .clone()
1619            .or(inherited_policy.clone())
1620            .unwrap_or_else(|| self.default_policy.clone());
1621
1622        let effective_depth_limit = inherited_policy
1623            .as_ref()
1624            .map(|p| p.max_spawn_depth)
1625            .unwrap_or(policy.max_spawn_depth);
1626
1627        if computed_depth > effective_depth_limit {
1628            return Err(anyhow!(
1629                "Spawn depth {} exceeds limit {}",
1630                computed_depth,
1631                effective_depth_limit
1632            ));
1633        }
1634
1635        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1636        if personas.contains_key(&persona_id) {
1637            return Err(anyhow!("Persona id already exists: {}", persona_id));
1638        }
1639
1640        let identity = PersonaIdentity {
1641            id: persona_id.clone(),
1642            name: req.name,
1643            role: req.role,
1644            charter: req.charter,
1645            swarm_id: req.swarm_id.or(parent_swarm_id),
1646            parent_id: req.parent_id,
1647            depth: computed_depth,
1648            created_at: now,
1649            tags: req.tags,
1650        };
1651
1652        let persona = PersonaRuntimeState {
1653            identity,
1654            policy,
1655            status: PersonaStatus::Active,
1656            thought_count: 0,
1657            last_tick_at: None,
1658            updated_at: now,
1659            tokens_this_window: 0,
1660            compute_ms_this_window: 0,
1661            window_started_at: now,
1662            last_progress_at: now,
1663            budget_paused: false,
1664        };
1665
1666        personas.insert(persona_id, persona.clone());
1667        drop(personas);
1668
1669        self.push_event(ThoughtEvent {
1670            id: Uuid::new_v4().to_string(),
1671            event_type: ThoughtEventType::PersonaSpawned,
1672            persona_id: Some(persona.identity.id.clone()),
1673            swarm_id: persona.identity.swarm_id.clone(),
1674            timestamp: now,
1675            payload: json!({
1676                "name": persona.identity.name,
1677                "role": persona.identity.role,
1678                "depth": persona.identity.depth,
1679            }),
1680        })
1681        .await;
1682
1683        Ok(persona)
1684    }
1685
1686    /// Spawn a child persona under an existing parent.
1687    pub async fn spawn_child(
1688        &self,
1689        parent_id: &str,
1690        req: SpawnPersonaRequest,
1691    ) -> Result<PersonaRuntimeState> {
1692        let request = CreatePersonaRequest {
1693            persona_id: req.persona_id,
1694            name: req.name,
1695            role: req.role,
1696            charter: req.charter,
1697            swarm_id: req.swarm_id,
1698            parent_id: Some(parent_id.to_string()),
1699            policy: req.policy,
1700            tags: Vec::new(),
1701        };
1702        self.create_persona(request).await
1703    }
1704
1705    /// Reap one persona or the full descendant tree.
1706    pub async fn reap_persona(
1707        &self,
1708        persona_id: &str,
1709        req: ReapPersonaRequest,
1710    ) -> Result<ReapPersonaResponse> {
1711        let cascade = req.cascade.unwrap_or(false);
1712        let now = Utc::now();
1713
1714        let mut personas = self.personas.write().await;
1715        if !personas.contains_key(persona_id) {
1716            return Err(anyhow!("Persona not found: {}", persona_id));
1717        }
1718
1719        let mut reaped_ids = vec![persona_id.to_string()];
1720        if cascade {
1721            let mut idx = 0usize;
1722            while idx < reaped_ids.len() {
1723                let current = reaped_ids[idx].clone();
1724                let children: Vec<String> = personas
1725                    .values()
1726                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1727                    .map(|p| p.identity.id.clone())
1728                    .collect();
1729                for child in children {
1730                    if !reaped_ids.iter().any(|existing| existing == &child) {
1731                        reaped_ids.push(child);
1732                    }
1733                }
1734                idx += 1;
1735            }
1736        }
1737
1738        for id in &reaped_ids {
1739            if let Some(persona) = personas.get_mut(id) {
1740                persona.status = PersonaStatus::Reaped;
1741                persona.updated_at = now;
1742            }
1743        }
1744        drop(personas);
1745
1746        for id in &reaped_ids {
1747            self.push_event(ThoughtEvent {
1748                id: Uuid::new_v4().to_string(),
1749                event_type: ThoughtEventType::PersonaReaped,
1750                persona_id: Some(id.clone()),
1751                swarm_id: None,
1752                timestamp: now,
1753                payload: json!({
1754                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1755                    "cascade": cascade,
1756                }),
1757            })
1758            .await;
1759        }
1760
1761        Ok(ReapPersonaResponse {
1762            count: reaped_ids.len(),
1763            reaped_ids,
1764        })
1765    }
1766
1767    /// Get latest memory snapshot, if any.
1768    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1769        self.snapshots.read().await.back().cloned()
1770    }
1771
1772    /// Build lineage graph from current persona state.
1773    pub async fn lineage_graph(&self) -> LineageGraph {
1774        let personas = self.personas.read().await;
1775        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1776        let mut roots = Vec::new();
1777        let mut total_edges = 0usize;
1778
1779        for persona in personas.values() {
1780            if let Some(parent_id) = persona.identity.parent_id.clone() {
1781                children_by_parent
1782                    .entry(parent_id)
1783                    .or_default()
1784                    .push(persona.identity.id.clone());
1785                total_edges = total_edges.saturating_add(1);
1786            } else {
1787                roots.push(persona.identity.id.clone());
1788            }
1789        }
1790
1791        let mut nodes: Vec<LineageNode> = personas
1792            .values()
1793            .map(|persona| {
1794                let mut children = children_by_parent
1795                    .get(&persona.identity.id)
1796                    .cloned()
1797                    .unwrap_or_default();
1798                children.sort();
1799
1800                LineageNode {
1801                    persona_id: persona.identity.id.clone(),
1802                    parent_id: persona.identity.parent_id.clone(),
1803                    children,
1804                    depth: persona.identity.depth,
1805                    status: persona.status,
1806                }
1807            })
1808            .collect();
1809
1810        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1811        roots.sort();
1812
1813        LineageGraph {
1814            nodes,
1815            roots,
1816            total_edges,
1817        }
1818    }
1819
1820    /// Return a summary status.
1821    pub async fn status(&self) -> CognitionStatus {
1822        let personas = self.personas.read().await;
1823        let events = self.events.read().await;
1824        let snapshots = self.snapshots.read().await;
1825        let started_at = *self.started_at.read().await;
1826        let last_tick_at = *self.last_tick_at.read().await;
1827        let loop_interval_ms = *self.loop_interval_ms.read().await;
1828
1829        let active_persona_count = personas
1830            .values()
1831            .filter(|p| p.status == PersonaStatus::Active)
1832            .count();
1833
1834        CognitionStatus {
1835            enabled: self.enabled,
1836            running: self.running.load(Ordering::SeqCst),
1837            loop_interval_ms,
1838            started_at,
1839            last_tick_at,
1840            persona_count: personas.len(),
1841            active_persona_count,
1842            events_buffered: events.len(),
1843            snapshots_buffered: snapshots.len(),
1844        }
1845    }
1846
1847    async fn push_event(&self, event: ThoughtEvent) {
1848        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1849    }
1850
1851    /// Set the tool registry for capability-based tool execution.
1852    pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1853        self.tools = Some(registry);
1854    }
1855
1856    /// Get current beliefs.
1857    pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1858        self.beliefs.read().await.clone()
1859    }
1860
1861    /// Get a single belief by ID.
1862    pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1863        self.beliefs.read().await.get(id).cloned()
1864    }
1865
1866    /// Get the current attention queue.
1867    pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1868        self.attention_queue.read().await.clone()
1869    }
1870
1871    /// Get all proposals.
1872    pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1873        self.proposals.read().await.clone()
1874    }
1875
1876    /// Get the current global workspace.
1877    pub async fn get_workspace(&self) -> GlobalWorkspace {
1878        self.workspace.read().await.clone()
1879    }
1880
1881    /// Get decision receipts.
1882    pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1883        self.receipts.read().await.clone()
1884    }
1885
1886    /// Approve a Critical-risk proposal for execution.
1887    pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1888        let proposals = self.proposals.read().await;
1889        let proposal = proposals
1890            .get(proposal_id)
1891            .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1892
1893        if proposal.risk != ProposalRisk::Critical {
1894            return Err(anyhow!("Only Critical proposals require human approval"));
1895        }
1896        if proposal.status != ProposalStatus::Verified {
1897            return Err(anyhow!("Proposal is not in Verified status"));
1898        }
1899        drop(proposals);
1900
1901        let mut approvals = self.pending_approvals.write().await;
1902        approvals.insert(proposal_id.to_string(), true);
1903        Ok(())
1904    }
1905
1906    /// Get governance settings.
1907    pub async fn get_governance(&self) -> SwarmGovernance {
1908        self.governance.read().await.clone()
1909    }
1910
1911    /// Get persona state for a specific persona.
1912    pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1913        self.personas.read().await.get(id).cloned()
1914    }
1915}
1916
1917async fn push_event_internal(
1918    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1919    max_events: usize,
1920    event_tx: &broadcast::Sender<ThoughtEvent>,
1921    event: ThoughtEvent,
1922) {
1923    {
1924        let mut lock = events.write().await;
1925        lock.push_back(event.clone());
1926        while lock.len() > max_events {
1927            lock.pop_front();
1928        }
1929    }
1930    let _ = event_tx.send(event);
1931}
1932
1933async fn push_snapshot_internal(
1934    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1935    max_snapshots: usize,
1936    snapshot: MemorySnapshot,
1937) {
1938    let mut lock = snapshots.write().await;
1939    lock.push_back(snapshot);
1940    while lock.len() > max_snapshots {
1941        lock.pop_front();
1942    }
1943}
1944
1945async fn recent_persona_context(
1946    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1947    persona_id: &str,
1948    limit: usize,
1949) -> Vec<ThoughtEvent> {
1950    let lock = events.read().await;
1951    let mut selected: Vec<ThoughtEvent> = lock
1952        .iter()
1953        .rev()
1954        .filter(|event| {
1955            event.persona_id.as_deref() == Some(persona_id)
1956                || (event.persona_id.is_none()
1957                    && matches!(
1958                        event.event_type,
1959                        ThoughtEventType::CheckResult
1960                            | ThoughtEventType::ProposalCreated
1961                            | ThoughtEventType::SnapshotCompressed
1962                            | ThoughtEventType::WorkspaceUpdated
1963                            | ThoughtEventType::ActionExecuted
1964                            | ThoughtEventType::BudgetPaused
1965                    ))
1966        })
1967        .take(limit)
1968        .cloned()
1969        .collect();
1970    selected.reverse();
1971    selected
1972}
1973
1974async fn generate_phase_thought(
1975    thinker: Option<&ThinkerClient>,
1976    work: &ThoughtWorkItem,
1977    context: &[ThoughtEvent],
1978) -> ThoughtResult {
1979    let started_at = Instant::now();
1980    if let Some(client) = thinker {
1981        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1982        match client.think(&system_prompt, &user_prompt).await {
1983            Ok(output) => {
1984                let thinking = normalize_thought_output(work, context, &output.text);
1985                if !thinking.is_empty() {
1986                    return ThoughtResult {
1987                        source: "model",
1988                        model: Some(output.model),
1989                        finish_reason: output.finish_reason,
1990                        thinking,
1991                        prompt_tokens: output.prompt_tokens,
1992                        completion_tokens: output.completion_tokens,
1993                        total_tokens: output.total_tokens,
1994                        latency_ms: started_at.elapsed().as_millis(),
1995                        error: None,
1996                    };
1997                }
1998            }
1999            Err(error) => {
2000                return ThoughtResult {
2001                    source: "fallback",
2002                    model: None,
2003                    finish_reason: None,
2004                    thinking: fallback_phase_text(work, context),
2005                    prompt_tokens: None,
2006                    completion_tokens: None,
2007                    total_tokens: None,
2008                    latency_ms: started_at.elapsed().as_millis(),
2009                    error: Some(error.to_string()),
2010                };
2011            }
2012        }
2013    }
2014
2015    ThoughtResult {
2016        source: "fallback",
2017        model: None,
2018        finish_reason: None,
2019        thinking: fallback_phase_text(work, context),
2020        prompt_tokens: None,
2021        completion_tokens: None,
2022        total_tokens: None,
2023        latency_ms: started_at.elapsed().as_millis(),
2024        error: None,
2025    }
2026}
2027
2028fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2029    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2030Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2031Write as an operational process update, not meta narration. \
2032Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2033Output concrete findings, checks, risks, and next actions. \
2034Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2035        .to_string();
2036
2037    let context_lines = if context.is_empty() {
2038        "none".to_string()
2039    } else {
2040        context
2041            .iter()
2042            .map(format_context_event)
2043            .collect::<Vec<_>>()
2044            .join("\n")
2045    };
2046
2047    let phase_instruction = match work.phase {
2048        ThoughtPhase::Observe => {
2049            "Process format (exact line labels): \
2050Phase: Observe | Goal: detect current customer/business risk | \
2051Signals: 1-3 concrete signals separated by '; ' | \
2052Uncertainty: one unknown that blocks confidence | \
2053Next_Action: one immediate operational action."
2054        }
2055        ThoughtPhase::Reflect => {
2056            "Process format (exact line labels): \
2057Phase: Reflect | Hypothesis: single testable hypothesis | \
2058Rationale: why this is likely | \
2059Business_Risk: customer/revenue/SLA impact | \
2060Validation_Next_Action: one action to confirm or falsify."
2061        }
2062        ThoughtPhase::Test => {
2063            "Process format (exact line labels): \
2064Phase: Test | Check: single concrete check | \
2065Procedure: short executable procedure | \
2066Expected_Result: pass/fail expectation | \
2067Evidence_Quality: low|medium|high with reason | \
2068Escalation_Trigger: when to escalate immediately."
2069        }
2070        ThoughtPhase::Compress => {
2071            "Process format (exact line labels): \
2072Phase: Compress | State_Summary: current state in one line | \
2073Retained_Facts: 3 short facts separated by '; ' | \
2074Open_Risks: up to 2 unresolved risks separated by '; ' | \
2075Next_Process_Step: next operational step."
2076        }
2077    };
2078
2079    let user_prompt = format!(
2080        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2081        phase = work.phase.as_str(),
2082        persona_id = work.persona_id,
2083        persona_name = work.persona_name,
2084        role = work.role,
2085        charter = work.charter,
2086        thought_count = work.thought_count,
2087        context = context_lines,
2088        instruction = phase_instruction
2089    );
2090
2091    (system_prompt, user_prompt)
2092}
2093
2094fn format_context_event(event: &ThoughtEvent) -> String {
2095    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2096    format!(
2097        "{} {} {}",
2098        event.event_type.as_str(),
2099        event.timestamp.to_rfc3339(),
2100        trim_for_storage(&payload, 220)
2101    )
2102}
2103
2104fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2105    let charter = trim_for_storage(&work.charter, 180);
2106    let context_summary = fallback_context_summary(context);
2107    let thought = match work.phase {
2108        ThoughtPhase::Observe => format!(
2109            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2110            work.role, charter, context_summary
2111        ),
2112        ThoughtPhase::Reflect => format!(
2113            "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2114        ),
2115        ThoughtPhase::Test => format!(
2116            "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2117        ),
2118        ThoughtPhase::Compress => format!(
2119            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2120            work.role, charter, context_summary
2121        ),
2122    };
2123    trim_for_storage(&thought, 1_200)
2124}
2125
2126fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2127    let trimmed = trim_for_storage(raw, 2_000);
2128    if trimmed.trim().is_empty() {
2129        return fallback_phase_text(work, context);
2130    }
2131
2132    // Prefer process-labeled content if the model emitted a preamble first.
2133    if let Some(idx) = find_process_label_start(&trimmed) {
2134        let candidate = trimmed[idx..].trim();
2135        // Return the full candidate (all structured fields), not just the first line.
2136        if !candidate.is_empty()
2137            && !candidate.contains('<')
2138            && !has_template_placeholder_values(candidate)
2139        {
2140            // If it's a multi-line structured output, collapse to pipe-delimited single line.
2141            let collapsed: String = candidate
2142                .lines()
2143                .map(str::trim)
2144                .filter(|l| !l.is_empty())
2145                .collect::<Vec<_>>()
2146                .join(" | ");
2147            let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2148            if cleaned.starts_with("Phase:") {
2149                return cleaned.to_string();
2150            }
2151            return collapsed;
2152        }
2153    }
2154
2155    let lower = trimmed.to_ascii_lowercase();
2156    let looks_meta = lower.starts_with("we need")
2157        || lower.starts_with("i need")
2158        || lower.contains("we need to")
2159        || lower.contains("i need to")
2160        || lower.contains("must output")
2161        || lower.contains("let's ")
2162        || lower.contains("we have to");
2163
2164    if looks_meta || has_template_placeholder_values(&trimmed) {
2165        return fallback_phase_text(work, context);
2166    }
2167
2168    trimmed
2169}
2170
2171fn has_template_placeholder_values(text: &str) -> bool {
2172    let lower = text.to_ascii_lowercase();
2173    [
2174        "goal: ...",
2175        "signals: ...",
2176        "uncertainty: ...",
2177        "next_action: ...",
2178        "hypothesis: ...",
2179        "rationale: ...",
2180        "business_risk: ...",
2181        "validation_next_action: ...",
2182        "check: ...",
2183        "procedure: ...",
2184        "expected_result: ...",
2185        "evidence_quality: ...",
2186        "escalation_trigger: ...",
2187        "state_summary: ...",
2188        "retained_facts: ...",
2189        "open_risks: ...",
2190        "next_process_step: ...",
2191    ]
2192    .iter()
2193    .any(|needle| lower.contains(needle))
2194        || lower.contains("<...")
2195        || lower.contains("tbd")
2196        || lower.contains("todo")
2197}
2198
2199fn find_process_label_start(text: &str) -> Option<usize> {
2200    [
2201        "Phase: Observe",
2202        "Phase: Reflect",
2203        "Phase: Test",
2204        "Phase: Compress",
2205        "Phase:",
2206    ]
2207    .iter()
2208    .filter_map(|label| text.find(label))
2209    .min()
2210}
2211
2212fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2213    if context.is_empty() {
2214        return "No prior events recorded yet.".to_string();
2215    }
2216
2217    let mut latest_error: Option<String> = None;
2218    let mut latest_proposal: Option<String> = None;
2219    let mut latest_check: Option<String> = None;
2220
2221    for event in context.iter().rev() {
2222        if latest_error.is_none()
2223            && let Some(error) = event
2224                .payload
2225                .get("error")
2226                .and_then(serde_json::Value::as_str)
2227            && !error.trim().is_empty()
2228        {
2229            latest_error = Some(trim_for_storage(error, 140));
2230        }
2231
2232        if latest_proposal.is_none()
2233            && event.event_type == ThoughtEventType::ProposalCreated
2234            && let Some(title) = event
2235                .payload
2236                .get("title")
2237                .and_then(serde_json::Value::as_str)
2238            && !title.trim().is_empty()
2239            && !has_template_placeholder_values(title)
2240        {
2241            latest_proposal = Some(trim_for_storage(title, 120));
2242        }
2243
2244        if latest_check.is_none()
2245            && event.event_type == ThoughtEventType::CheckResult
2246            && let Some(result) = event
2247                .payload
2248                .get("result_excerpt")
2249                .and_then(serde_json::Value::as_str)
2250            && !result.trim().is_empty()
2251            && !has_template_placeholder_values(result)
2252        {
2253            latest_check = Some(trim_for_storage(result, 140));
2254        }
2255
2256        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2257            break;
2258        }
2259    }
2260
2261    let mut lines = vec![format!(
2262        "{} recent cognition events are available.",
2263        context.len()
2264    )];
2265    if let Some(error) = latest_error {
2266        lines.push(format!("Latest error signal: {}.", error));
2267    }
2268    if let Some(proposal) = latest_proposal {
2269        lines.push(format!("Recent proposal: {}.", proposal));
2270    }
2271    if let Some(check) = latest_check {
2272        lines.push(format!("Recent check: {}.", check));
2273    }
2274    lines.join(" ")
2275}
2276
2277fn trim_for_storage(input: &str, max_chars: usize) -> String {
2278    if input.chars().count() <= max_chars {
2279        return input.trim().to_string();
2280    }
2281    let mut trimmed = String::with_capacity(max_chars + 8);
2282    for ch in input.chars().take(max_chars) {
2283        trimmed.push(ch);
2284    }
2285    trimmed.push_str("...");
2286    trimmed.trim().to_string()
2287}
2288
2289fn estimate_fact_count(text: &str) -> usize {
2290    let sentence_count =
2291        text.matches('.').count() + text.matches('!').count() + text.matches('?').count();
2292    sentence_count.clamp(1, 12)
2293}
2294
2295fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2296    let first_line = thought
2297        .lines()
2298        .find(|line| !line.trim().is_empty())
2299        .unwrap_or("proposal");
2300    let compact = first_line
2301        .replace(['\t', '\r', '\n'], " ")
2302        .split_whitespace()
2303        .collect::<Vec<_>>()
2304        .join(" ");
2305    let trimmed = trim_for_storage(&compact, 72);
2306    if trimmed.is_empty() {
2307        format!("proposal-{}", thought_count)
2308    } else {
2309        trimmed
2310    }
2311}
2312
2313fn default_seed_persona() -> CreatePersonaRequest {
2314    CreatePersonaRequest {
2315        persona_id: Some("root-thinker".to_string()),
2316        name: "root-thinker".to_string(),
2317        role: "orchestrator".to_string(),
2318        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2319            .to_string(),
2320        swarm_id: Some("swarm-core".to_string()),
2321        parent_id: None,
2322        policy: None,
2323        tags: vec!["orchestration".to_string()],
2324    }
2325}
2326
2327fn normalize_thinker_endpoint(base_url: &str) -> String {
2328    let trimmed = base_url.trim().trim_end_matches('/');
2329    if trimmed.ends_with("/chat/completions") {
2330        return trimmed.to_string();
2331    }
2332    if trimmed.is_empty() {
2333        return "http://127.0.0.1:11434/v1/chat/completions".to_string();
2334    }
2335    format!("{}/chat/completions", trimmed)
2336}
2337
2338fn env_bool(name: &str, default: bool) -> bool {
2339    std::env::var(name)
2340        .ok()
2341        .and_then(|v| match v.to_ascii_lowercase().as_str() {
2342            "1" | "true" | "yes" | "on" => Some(true),
2343            "0" | "false" | "no" | "off" => Some(false),
2344            _ => None,
2345        })
2346        .unwrap_or(default)
2347}
2348
2349fn env_f32(name: &str, default: f32) -> f32 {
2350    std::env::var(name)
2351        .ok()
2352        .and_then(|v| v.parse::<f32>().ok())
2353        .unwrap_or(default)
2354}
2355
2356fn env_u64(name: &str, default: u64) -> u64 {
2357    std::env::var(name)
2358        .ok()
2359        .and_then(|v| v.parse::<u64>().ok())
2360        .unwrap_or(default)
2361}
2362
2363fn env_u32(name: &str, default: u32) -> u32 {
2364    std::env::var(name)
2365        .ok()
2366        .and_then(|v| v.parse::<u32>().ok())
2367        .unwrap_or(default)
2368}
2369
2370fn env_usize(name: &str, default: usize) -> usize {
2371    std::env::var(name)
2372        .ok()
2373        .and_then(|v| v.parse::<usize>().ok())
2374        .unwrap_or(default)
2375}
2376
2377#[cfg(test)]
2378mod tests {
2379    use super::*;
2380
2381    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2382        ThoughtWorkItem {
2383            persona_id: "p-1".to_string(),
2384            persona_name: "Spotlessbinco Business Thinker".to_string(),
2385            role: "principal reliability engineer".to_string(),
2386            charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2387                .to_string(),
2388            swarm_id: Some("spotlessbinco".to_string()),
2389            thought_count: 4,
2390            phase,
2391        }
2392    }
2393
2394    fn test_runtime() -> CognitionRuntime {
2395        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2396            enabled: true,
2397            loop_interval_ms: 25,
2398            max_events: 256,
2399            max_snapshots: 32,
2400            default_policy: PersonaPolicy {
2401                max_spawn_depth: 2,
2402                max_branching_factor: 2,
2403                token_budget_per_minute: 1_000,
2404                compute_ms_per_minute: 1_000,
2405                idle_ttl_secs: 300,
2406                share_memory: false,
2407                allowed_tools: Vec::new(),
2408            },
2409        })
2410    }
2411
2412    #[test]
2413    fn normalize_rejects_placeholder_process_line() {
2414        let work = sample_work_item(ThoughtPhase::Compress);
2415        let output = normalize_thought_output(
2416            &work,
2417            &[],
2418            "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2419        );
2420        assert!(
2421            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2422        );
2423        assert!(!output.contains("State_Summary: ..."));
2424    }
2425
2426    #[test]
2427    fn normalize_accepts_concrete_process_line() {
2428        let work = sample_work_item(ThoughtPhase::Test);
2429        let output = normalize_thought_output(
2430            &work,
2431            &[],
2432            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2433        );
2434        assert_eq!(
2435            output,
2436            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2437        );
2438    }
2439
2440    #[tokio::test]
2441    async fn create_spawn_and_lineage_work() {
2442        let runtime = test_runtime();
2443
2444        let root = runtime
2445            .create_persona(CreatePersonaRequest {
2446                persona_id: Some("root".to_string()),
2447                name: "root".to_string(),
2448                role: "orchestrator".to_string(),
2449                charter: "coordinate".to_string(),
2450                swarm_id: Some("swarm-a".to_string()),
2451                parent_id: None,
2452                policy: None,
2453                tags: Vec::new(),
2454            })
2455            .await
2456            .expect("root should be created");
2457
2458        assert_eq!(root.identity.depth, 0);
2459
2460        let child = runtime
2461            .spawn_child(
2462                "root",
2463                SpawnPersonaRequest {
2464                    persona_id: Some("child-1".to_string()),
2465                    name: "child-1".to_string(),
2466                    role: "analyst".to_string(),
2467                    charter: "analyze".to_string(),
2468                    swarm_id: None,
2469                    policy: None,
2470                },
2471            )
2472            .await
2473            .expect("child should spawn");
2474
2475        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2476        assert_eq!(child.identity.depth, 1);
2477
2478        let lineage = runtime.lineage_graph().await;
2479        assert_eq!(lineage.total_edges, 1);
2480        assert_eq!(lineage.roots, vec!["root".to_string()]);
2481    }
2482
2483    #[tokio::test]
2484    async fn branching_and_depth_limits_are_enforced() {
2485        let runtime = test_runtime();
2486
2487        runtime
2488            .create_persona(CreatePersonaRequest {
2489                persona_id: Some("root".to_string()),
2490                name: "root".to_string(),
2491                role: "orchestrator".to_string(),
2492                charter: "coordinate".to_string(),
2493                swarm_id: Some("swarm-a".to_string()),
2494                parent_id: None,
2495                policy: None,
2496                tags: Vec::new(),
2497            })
2498            .await
2499            .expect("root should be created");
2500
2501        runtime
2502            .spawn_child(
2503                "root",
2504                SpawnPersonaRequest {
2505                    persona_id: Some("c1".to_string()),
2506                    name: "c1".to_string(),
2507                    role: "worker".to_string(),
2508                    charter: "run".to_string(),
2509                    swarm_id: None,
2510                    policy: None,
2511                },
2512            )
2513            .await
2514            .expect("first child should spawn");
2515
2516        runtime
2517            .spawn_child(
2518                "root",
2519                SpawnPersonaRequest {
2520                    persona_id: Some("c2".to_string()),
2521                    name: "c2".to_string(),
2522                    role: "worker".to_string(),
2523                    charter: "run".to_string(),
2524                    swarm_id: None,
2525                    policy: None,
2526                },
2527            )
2528            .await
2529            .expect("second child should spawn");
2530
2531        let third_child = runtime
2532            .spawn_child(
2533                "root",
2534                SpawnPersonaRequest {
2535                    persona_id: Some("c3".to_string()),
2536                    name: "c3".to_string(),
2537                    role: "worker".to_string(),
2538                    charter: "run".to_string(),
2539                    swarm_id: None,
2540                    policy: None,
2541                },
2542            )
2543            .await;
2544        assert!(third_child.is_err());
2545
2546        runtime
2547            .spawn_child(
2548                "c1",
2549                SpawnPersonaRequest {
2550                    persona_id: Some("c1-1".to_string()),
2551                    name: "c1-1".to_string(),
2552                    role: "worker".to_string(),
2553                    charter: "run".to_string(),
2554                    swarm_id: None,
2555                    policy: None,
2556                },
2557            )
2558            .await
2559            .expect("depth 2 should be allowed");
2560
2561        let depth_violation = runtime
2562            .spawn_child(
2563                "c1-1",
2564                SpawnPersonaRequest {
2565                    persona_id: Some("c1-1-1".to_string()),
2566                    name: "c1-1-1".to_string(),
2567                    role: "worker".to_string(),
2568                    charter: "run".to_string(),
2569                    swarm_id: None,
2570                    policy: None,
2571                },
2572            )
2573            .await;
2574        assert!(depth_violation.is_err());
2575    }
2576
2577    #[tokio::test]
2578    async fn start_stop_updates_runtime_status() {
2579        let runtime = test_runtime();
2580
2581        runtime
2582            .start(Some(StartCognitionRequest {
2583                loop_interval_ms: Some(10),
2584                seed_persona: Some(CreatePersonaRequest {
2585                    persona_id: Some("seed".to_string()),
2586                    name: "seed".to_string(),
2587                    role: "watcher".to_string(),
2588                    charter: "observe".to_string(),
2589                    swarm_id: Some("swarm-seed".to_string()),
2590                    parent_id: None,
2591                    policy: None,
2592                    tags: Vec::new(),
2593                }),
2594            }))
2595            .await
2596            .expect("runtime should start");
2597
2598        tokio::time::sleep(Duration::from_millis(60)).await;
2599        let running_status = runtime.status().await;
2600        assert!(running_status.running);
2601        assert!(running_status.events_buffered > 0);
2602
2603        runtime
2604            .stop(Some("test".to_string()))
2605            .await
2606            .expect("runtime should stop");
2607        let stopped_status = runtime.status().await;
2608        assert!(!stopped_status.running);
2609    }
2610
2611    #[tokio::test]
2612    async fn zero_budget_persona_is_skipped() {
2613        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2614            enabled: true,
2615            loop_interval_ms: 10,
2616            max_events: 256,
2617            max_snapshots: 32,
2618            default_policy: PersonaPolicy {
2619                max_spawn_depth: 2,
2620                max_branching_factor: 2,
2621                token_budget_per_minute: 0,
2622                compute_ms_per_minute: 0,
2623                idle_ttl_secs: 3_600,
2624                share_memory: false,
2625                allowed_tools: Vec::new(),
2626            },
2627        });
2628
2629        let persona = runtime
2630            .create_persona(CreatePersonaRequest {
2631                persona_id: Some("budget-test".to_string()),
2632                name: "budget-test".to_string(),
2633                role: "tester".to_string(),
2634                charter: "test budgets".to_string(),
2635                swarm_id: None,
2636                parent_id: None,
2637                policy: None,
2638                tags: Vec::new(),
2639            })
2640            .await
2641            .expect("should create persona");
2642
2643        assert_eq!(persona.tokens_this_window, 0);
2644        assert_eq!(persona.compute_ms_this_window, 0);
2645
2646        // Start and run a few ticks
2647        runtime.start(None).await.expect("should start");
2648        tokio::time::sleep(Duration::from_millis(50)).await;
2649        runtime.stop(None).await.expect("should stop");
2650
2651        // Persona should still have 0 thought count (budget blocked)
2652        let p = runtime.get_persona("budget-test").await.unwrap();
2653        assert_eq!(p.thought_count, 0);
2654        assert!(p.budget_paused);
2655    }
2656
2657    #[tokio::test]
2658    async fn budget_counters_reset_after_window() {
2659        let now = Utc::now();
2660        let mut persona = PersonaRuntimeState {
2661            identity: PersonaIdentity {
2662                id: "p1".to_string(),
2663                name: "test".to_string(),
2664                role: "tester".to_string(),
2665                charter: "test".to_string(),
2666                swarm_id: None,
2667                parent_id: None,
2668                depth: 0,
2669                created_at: now,
2670                tags: Vec::new(),
2671            },
2672            policy: PersonaPolicy::default(),
2673            status: PersonaStatus::Active,
2674            thought_count: 0,
2675            last_tick_at: None,
2676            updated_at: now,
2677            tokens_this_window: 5000,
2678            compute_ms_this_window: 3000,
2679            window_started_at: now - ChronoDuration::seconds(61),
2680            last_progress_at: now,
2681            budget_paused: false,
2682        };
2683
2684        // Simulate window reset check
2685        let window_elapsed = now
2686            .signed_duration_since(persona.window_started_at)
2687            .num_seconds();
2688        assert!(window_elapsed >= 60);
2689
2690        // Reset
2691        persona.tokens_this_window = 0;
2692        persona.compute_ms_this_window = 0;
2693        persona.window_started_at = now;
2694
2695        assert_eq!(persona.tokens_this_window, 0);
2696        assert_eq!(persona.compute_ms_this_window, 0);
2697    }
2698
2699    #[tokio::test]
2700    async fn idle_persona_is_reaped() {
2701        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2702            enabled: true,
2703            loop_interval_ms: 10,
2704            max_events: 256,
2705            max_snapshots: 32,
2706            default_policy: PersonaPolicy {
2707                max_spawn_depth: 2,
2708                max_branching_factor: 2,
2709                token_budget_per_minute: 20_000,
2710                compute_ms_per_minute: 10_000,
2711                idle_ttl_secs: 0, // 0 TTL = immediately idle
2712                share_memory: false,
2713                allowed_tools: Vec::new(),
2714            },
2715        });
2716
2717        runtime
2718            .create_persona(CreatePersonaRequest {
2719                persona_id: Some("idle-test".to_string()),
2720                name: "idle-test".to_string(),
2721                role: "idler".to_string(),
2722                charter: "idle away".to_string(),
2723                swarm_id: None,
2724                parent_id: None,
2725                policy: None,
2726                tags: Vec::new(),
2727            })
2728            .await
2729            .expect("should create persona");
2730
2731        // Manually set last_progress_at to the past
2732        {
2733            let mut personas = runtime.personas.write().await;
2734            if let Some(p) = personas.get_mut("idle-test") {
2735                p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2736            }
2737        }
2738
2739        runtime.start(None).await.expect("should start");
2740        tokio::time::sleep(Duration::from_millis(100)).await;
2741        runtime.stop(None).await.expect("should stop");
2742
2743        let p = runtime.get_persona("idle-test").await.unwrap();
2744        assert_eq!(p.status, PersonaStatus::Reaped);
2745    }
2746
2747    #[tokio::test]
2748    async fn budget_paused_persona_not_reaped_for_idle() {
2749        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2750            enabled: true,
2751            loop_interval_ms: 10,
2752            max_events: 256,
2753            max_snapshots: 32,
2754            default_policy: PersonaPolicy {
2755                max_spawn_depth: 2,
2756                max_branching_factor: 2,
2757                token_budget_per_minute: 0, // zero budget = always paused
2758                compute_ms_per_minute: 0,
2759                idle_ttl_secs: 0, // 0 TTL
2760                share_memory: false,
2761                allowed_tools: Vec::new(),
2762            },
2763        });
2764
2765        runtime
2766            .create_persona(CreatePersonaRequest {
2767                persona_id: Some("paused-test".to_string()),
2768                name: "paused-test".to_string(),
2769                role: "pauser".to_string(),
2770                charter: "pause".to_string(),
2771                swarm_id: None,
2772                parent_id: None,
2773                policy: None,
2774                tags: Vec::new(),
2775            })
2776            .await
2777            .expect("should create persona");
2778
2779        // Set last_progress_at to the past to trigger idle check
2780        {
2781            let mut personas = runtime.personas.write().await;
2782            if let Some(p) = personas.get_mut("paused-test") {
2783                p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2784            }
2785        }
2786
2787        runtime.start(None).await.expect("should start");
2788        tokio::time::sleep(Duration::from_millis(100)).await;
2789        runtime.stop(None).await.expect("should stop");
2790
2791        let p = runtime.get_persona("paused-test").await.unwrap();
2792        // Budget-paused personas are NOT reaped for idle
2793        assert_eq!(p.status, PersonaStatus::Active);
2794        assert!(p.budget_paused);
2795    }
2796
2797    #[tokio::test]
2798    async fn governance_proposal_resolution() {
2799        let runtime = test_runtime();
2800        let gov = SwarmGovernance {
2801            quorum_fraction: 0.5,
2802            required_approvers_by_role: HashMap::new(),
2803            veto_roles: vec!["auditor".to_string()],
2804            vote_timeout_secs: 300,
2805        };
2806        *runtime.governance.write().await = gov;
2807
2808        // Create two personas
2809        runtime
2810            .create_persona(CreatePersonaRequest {
2811                persona_id: Some("voter-1".to_string()),
2812                name: "voter-1".to_string(),
2813                role: "engineer".to_string(),
2814                charter: "vote".to_string(),
2815                swarm_id: None,
2816                parent_id: None,
2817                policy: None,
2818                tags: Vec::new(),
2819            })
2820            .await
2821            .unwrap();
2822        runtime
2823            .create_persona(CreatePersonaRequest {
2824                persona_id: Some("voter-2".to_string()),
2825                name: "voter-2".to_string(),
2826                role: "engineer".to_string(),
2827                charter: "vote".to_string(),
2828                swarm_id: None,
2829                parent_id: None,
2830                policy: None,
2831                tags: Vec::new(),
2832            })
2833            .await
2834            .unwrap();
2835
2836        // Insert a proposal with votes
2837        {
2838            let mut proposals = runtime.proposals.write().await;
2839            let mut votes = HashMap::new();
2840            votes.insert("voter-1".to_string(), ProposalVote::Approve);
2841            proposals.insert(
2842                "prop-1".to_string(),
2843                Proposal {
2844                    id: "prop-1".to_string(),
2845                    persona_id: "voter-1".to_string(),
2846                    title: "test proposal".to_string(),
2847                    rationale: "testing governance".to_string(),
2848                    evidence_refs: Vec::new(),
2849                    risk: ProposalRisk::Low,
2850                    status: ProposalStatus::Created,
2851                    created_at: Utc::now(),
2852                    votes,
2853                    vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2854                    votes_requested: true,
2855                    quorum_needed: 1,
2856                },
2857            );
2858        }
2859
2860        // quorum = 0.5 * 2 = 1, and we have 1 approval → should be verified
2861        runtime.start(None).await.unwrap();
2862        tokio::time::sleep(Duration::from_millis(100)).await;
2863        runtime.stop(None).await.unwrap();
2864
2865        let proposals = runtime.get_proposals().await;
2866        let prop = proposals.get("prop-1").unwrap();
2867        // Should be verified or executed (it gets verified then executed in same tick)
2868        assert!(
2869            prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2870            "Expected Verified or Executed, got {:?}",
2871            prop.status
2872        );
2873    }
2874
2875    #[tokio::test]
2876    async fn veto_rejects_proposal() {
2877        let runtime = test_runtime();
2878        let gov = SwarmGovernance {
2879            quorum_fraction: 0.5,
2880            required_approvers_by_role: HashMap::new(),
2881            veto_roles: vec!["auditor".to_string()],
2882            vote_timeout_secs: 300,
2883        };
2884        *runtime.governance.write().await = gov;
2885
2886        runtime
2887            .create_persona(CreatePersonaRequest {
2888                persona_id: Some("eng".to_string()),
2889                name: "eng".to_string(),
2890                role: "engineer".to_string(),
2891                charter: "build".to_string(),
2892                swarm_id: None,
2893                parent_id: None,
2894                policy: None,
2895                tags: Vec::new(),
2896            })
2897            .await
2898            .unwrap();
2899        runtime
2900            .create_persona(CreatePersonaRequest {
2901                persona_id: Some("aud".to_string()),
2902                name: "aud".to_string(),
2903                role: "auditor".to_string(),
2904                charter: "audit".to_string(),
2905                swarm_id: None,
2906                parent_id: None,
2907                policy: None,
2908                tags: Vec::new(),
2909            })
2910            .await
2911            .unwrap();
2912
2913        {
2914            let mut proposals = runtime.proposals.write().await;
2915            let mut votes = HashMap::new();
2916            votes.insert("eng".to_string(), ProposalVote::Approve);
2917            votes.insert("aud".to_string(), ProposalVote::Veto);
2918            proposals.insert(
2919                "prop-veto".to_string(),
2920                Proposal {
2921                    id: "prop-veto".to_string(),
2922                    persona_id: "eng".to_string(),
2923                    title: "vetoed proposal".to_string(),
2924                    rationale: "testing veto".to_string(),
2925                    evidence_refs: Vec::new(),
2926                    risk: ProposalRisk::Low,
2927                    status: ProposalStatus::Created,
2928                    created_at: Utc::now(),
2929                    votes,
2930                    vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2931                    votes_requested: true,
2932                    quorum_needed: 1,
2933                },
2934            );
2935        }
2936
2937        runtime.start(None).await.unwrap();
2938        tokio::time::sleep(Duration::from_millis(100)).await;
2939        runtime.stop(None).await.unwrap();
2940
2941        let proposals = runtime.get_proposals().await;
2942        let prop = proposals.get("prop-veto").unwrap();
2943        assert_eq!(prop.status, ProposalStatus::Rejected);
2944    }
2945
2946    #[test]
2947    fn global_workspace_default() {
2948        let ws = GlobalWorkspace::default();
2949        assert!(ws.top_beliefs.is_empty());
2950        assert!(ws.top_uncertainties.is_empty());
2951        assert!(ws.top_attention.is_empty());
2952    }
2953
2954    #[test]
2955    fn attention_item_creation() {
2956        let item = AttentionItem {
2957            id: "a1".to_string(),
2958            topic: "test topic".to_string(),
2959            topic_tags: vec!["reliability".to_string()],
2960            priority: 0.8,
2961            source_type: AttentionSource::ContestedBelief,
2962            source_id: "b1".to_string(),
2963            assigned_persona: None,
2964            created_at: Utc::now(),
2965            resolved_at: None,
2966        };
2967        assert!(item.resolved_at.is_none());
2968        assert_eq!(item.priority, 0.8);
2969    }
2970}