Skip to main content

codetether_agent/cognition/
mod.rs

1//! Perpetual cognition runtime for persona swarms.
2//!
3//! Phase 0 scope:
4//! - Contract types for personas, thought events, proposals, and snapshots
5//! - In-memory runtime manager for lifecycle + lineage
6//! - Feature-flagged perpetual loop (no external execution side effects)
7
8pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13pub mod tool_router;
14
15pub use thinker::{
16    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
17};
18
19use crate::tool::ToolRegistry;
20use anyhow::{Result, anyhow};
21use beliefs::{Belief, BeliefStatus};
22use chrono::{DateTime, Duration as ChronoDuration, Utc};
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::{HashMap, VecDeque};
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::time::Duration;
29use tokio::sync::{Mutex, RwLock, broadcast};
30use tokio::task::JoinHandle;
31
32// Ensure re-exported types are referenced to suppress warnings.
33const _: () = {
34    fn _assert_types_used() {
35        let _ = std::mem::size_of::<CandleDevicePreference>();
36        let _ = std::mem::size_of::<ThinkerBackend>();
37        let _ = std::mem::size_of::<ThinkerOutput>();
38    }
39};
40use tokio::time::Instant;
41use uuid::Uuid;
42
43/// Persona execution status.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum PersonaStatus {
47    Active,
48    Idle,
49    Reaped,
50    Error,
51}
52
53/// Policy boundaries for a persona.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct PersonaPolicy {
56    pub max_spawn_depth: u32,
57    pub max_branching_factor: u32,
58    pub token_budget_per_minute: u32,
59    pub compute_ms_per_minute: u32,
60    pub idle_ttl_secs: u64,
61    pub share_memory: bool,
62    #[serde(default)]
63    pub allowed_tools: Vec<String>,
64}
65
66impl Default for PersonaPolicy {
67    fn default() -> Self {
68        Self {
69            max_spawn_depth: 4,
70            max_branching_factor: 4,
71            token_budget_per_minute: 20_000,
72            compute_ms_per_minute: 10_000,
73            idle_ttl_secs: 3_600,
74            share_memory: false,
75            allowed_tools: Vec::new(),
76        }
77    }
78}
79
80/// Identity contract for a persona.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct PersonaIdentity {
83    pub id: String,
84    pub name: String,
85    pub role: String,
86    pub charter: String,
87    pub swarm_id: Option<String>,
88    pub parent_id: Option<String>,
89    pub depth: u32,
90    pub created_at: DateTime<Utc>,
91    #[serde(default)]
92    pub tags: Vec<String>,
93}
94
95/// Full runtime state for a persona.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct PersonaRuntimeState {
98    pub identity: PersonaIdentity,
99    pub policy: PersonaPolicy,
100    pub status: PersonaStatus,
101    pub thought_count: u64,
102    pub last_tick_at: Option<DateTime<Utc>>,
103    pub updated_at: DateTime<Utc>,
104    /// Tokens consumed in current 60-second window.
105    pub tokens_this_window: u32,
106    /// Compute milliseconds consumed in current 60-second window.
107    pub compute_ms_this_window: u32,
108    /// Start of the current budget window.
109    pub window_started_at: DateTime<Utc>,
110    /// Last time this persona made meaningful progress (not budget-paused/quorum-waiting).
111    pub last_progress_at: DateTime<Utc>,
112    /// Whether this persona is currently paused due to budget exhaustion.
113    #[serde(default)]
114    pub budget_paused: bool,
115}
116
117/// Proposal risk level.
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
119#[serde(rename_all = "snake_case")]
120pub enum ProposalRisk {
121    Low,
122    Medium,
123    High,
124    Critical,
125}
126
127/// Proposal status.
128#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
129#[serde(rename_all = "snake_case")]
130pub enum ProposalStatus {
131    Created,
132    Verified,
133    Rejected,
134    Executed,
135}
136
137/// A vote on a proposal.
138#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140pub enum ProposalVote {
141    Approve,
142    Reject,
143    Veto,
144    Abstain,
145}
146
147/// Proposal contract (think first, execute through gates).
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct Proposal {
150    pub id: String,
151    pub persona_id: String,
152    pub title: String,
153    pub rationale: String,
154    pub evidence_refs: Vec<String>,
155    pub risk: ProposalRisk,
156    pub status: ProposalStatus,
157    pub created_at: DateTime<Utc>,
158    /// Votes from personas: persona_id -> vote
159    #[serde(default)]
160    pub votes: HashMap<String, ProposalVote>,
161    /// Deadline for voting
162    pub vote_deadline: Option<DateTime<Utc>>,
163    /// Whether votes have been requested this cycle
164    #[serde(default)]
165    pub votes_requested: bool,
166    /// Quorum required (frozen at creation time to prevent drift).
167    #[serde(default)]
168    pub quorum_needed: usize,
169}
170
171/// Thought/event types emitted by the cognition loop.
172#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
173#[serde(rename_all = "snake_case")]
174pub enum ThoughtEventType {
175    ThoughtGenerated,
176    HypothesisRaised,
177    CheckRequested,
178    CheckResult,
179    ProposalCreated,
180    ProposalVerified,
181    ProposalRejected,
182    ActionExecuted,
183    PersonaSpawned,
184    PersonaReaped,
185    SnapshotCompressed,
186    BeliefExtracted,
187    BeliefContested,
188    BeliefRevalidated,
189    BudgetPaused,
190    IdleReaped,
191    AttentionCreated,
192    VoteCast,
193    WorkspaceUpdated,
194}
195
196impl ThoughtEventType {
197    fn as_str(&self) -> &'static str {
198        match self {
199            Self::ThoughtGenerated => "thought_generated",
200            Self::HypothesisRaised => "hypothesis_raised",
201            Self::CheckRequested => "check_requested",
202            Self::CheckResult => "check_result",
203            Self::ProposalCreated => "proposal_created",
204            Self::ProposalVerified => "proposal_verified",
205            Self::ProposalRejected => "proposal_rejected",
206            Self::ActionExecuted => "action_executed",
207            Self::PersonaSpawned => "persona_spawned",
208            Self::PersonaReaped => "persona_reaped",
209            Self::SnapshotCompressed => "snapshot_compressed",
210            Self::BeliefExtracted => "belief_extracted",
211            Self::BeliefContested => "belief_contested",
212            Self::BeliefRevalidated => "belief_revalidated",
213            Self::BudgetPaused => "budget_paused",
214            Self::IdleReaped => "idle_reaped",
215            Self::AttentionCreated => "attention_created",
216            Self::VoteCast => "vote_cast",
217            Self::WorkspaceUpdated => "workspace_updated",
218        }
219    }
220}
221
222/// Streamable thought event contract.
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct ThoughtEvent {
225    pub id: String,
226    pub event_type: ThoughtEventType,
227    pub persona_id: Option<String>,
228    pub swarm_id: Option<String>,
229    pub timestamp: DateTime<Utc>,
230    pub payload: serde_json::Value,
231}
232
233/// Distilled memory snapshot contract.
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct MemorySnapshot {
236    pub id: String,
237    pub generated_at: DateTime<Utc>,
238    pub swarm_id: Option<String>,
239    pub persona_scope: Vec<String>,
240    pub summary: String,
241    pub hot_event_count: usize,
242    pub warm_fact_count: usize,
243    pub cold_snapshot_count: usize,
244    pub metadata: HashMap<String, serde_json::Value>,
245}
246
247/// Request payload for creating a persona.
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct CreatePersonaRequest {
250    pub persona_id: Option<String>,
251    pub name: String,
252    pub role: String,
253    pub charter: String,
254    pub swarm_id: Option<String>,
255    pub parent_id: Option<String>,
256    pub policy: Option<PersonaPolicy>,
257    #[serde(default)]
258    pub tags: Vec<String>,
259}
260
261/// Request payload for spawning a child persona.
262#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct SpawnPersonaRequest {
264    pub persona_id: Option<String>,
265    pub name: String,
266    pub role: String,
267    pub charter: String,
268    pub swarm_id: Option<String>,
269    pub policy: Option<PersonaPolicy>,
270}
271
272/// Request payload for reaping persona(s).
273#[derive(Debug, Clone, Serialize, Deserialize)]
274pub struct ReapPersonaRequest {
275    pub cascade: Option<bool>,
276    pub reason: Option<String>,
277}
278
279/// Start-control payload for perpetual cognition.
280#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct StartCognitionRequest {
282    pub loop_interval_ms: Option<u64>,
283    pub seed_persona: Option<CreatePersonaRequest>,
284}
285
286/// Stop-control payload for perpetual cognition.
287#[derive(Debug, Clone, Serialize, Deserialize)]
288pub struct StopCognitionRequest {
289    pub reason: Option<String>,
290}
291
292// ── Attention, Governance, GlobalWorkspace ──────────────────────────────
293
294/// Source of an attention item.
295#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
296#[serde(rename_all = "snake_case")]
297pub enum AttentionSource {
298    ContestedBelief,
299    FailedCheck,
300    StaleBelief,
301    ProposalTimeout,
302    FailedExecution,
303}
304
305/// An item requiring persona attention.
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct AttentionItem {
308    pub id: String,
309    pub topic: String,
310    pub topic_tags: Vec<String>,
311    pub priority: f32,
312    pub source_type: AttentionSource,
313    pub source_id: String,
314    pub assigned_persona: Option<String>,
315    pub created_at: DateTime<Utc>,
316    pub resolved_at: Option<DateTime<Utc>>,
317}
318
319/// Governance rules for swarm voting.
320#[derive(Debug, Clone, Serialize, Deserialize)]
321pub struct SwarmGovernance {
322    pub quorum_fraction: f32,
323    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
324    pub veto_roles: Vec<String>,
325    pub vote_timeout_secs: u64,
326}
327
328impl Default for SwarmGovernance {
329    fn default() -> Self {
330        Self {
331            quorum_fraction: 0.5,
332            required_approvers_by_role: HashMap::new(),
333            veto_roles: Vec::new(),
334            vote_timeout_secs: 300,
335        }
336    }
337}
338
339/// The coherent "now" for the entire swarm.
340#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct GlobalWorkspace {
342    pub top_beliefs: Vec<String>,
343    pub top_uncertainties: Vec<String>,
344    pub top_attention: Vec<String>,
345    pub active_objectives: Vec<String>,
346    pub updated_at: DateTime<Utc>,
347}
348
349impl Default for GlobalWorkspace {
350    fn default() -> Self {
351        Self {
352            top_beliefs: Vec::new(),
353            top_uncertainties: Vec::new(),
354            top_attention: Vec::new(),
355            active_objectives: Vec::new(),
356            updated_at: Utc::now(),
357        }
358    }
359}
360
361/// Start/stop response status.
362#[derive(Debug, Clone, Serialize, Deserialize)]
363pub struct CognitionStatus {
364    pub enabled: bool,
365    pub running: bool,
366    pub loop_interval_ms: u64,
367    pub started_at: Option<DateTime<Utc>>,
368    pub last_tick_at: Option<DateTime<Utc>>,
369    pub persona_count: usize,
370    pub active_persona_count: usize,
371    pub events_buffered: usize,
372    pub snapshots_buffered: usize,
373}
374
375/// Reap response.
376#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct ReapPersonaResponse {
378    pub reaped_ids: Vec<String>,
379    pub count: usize,
380}
381
382/// Lineage node response.
383#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct LineageNode {
385    pub persona_id: String,
386    pub parent_id: Option<String>,
387    pub children: Vec<String>,
388    pub depth: u32,
389    pub status: PersonaStatus,
390}
391
392/// Lineage graph response.
393#[derive(Debug, Clone, Serialize, Deserialize)]
394pub struct LineageGraph {
395    pub nodes: Vec<LineageNode>,
396    pub roots: Vec<String>,
397    pub total_edges: usize,
398}
399
/// Phase of the four-step thought cycle a persona is in for a given tick.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}
407
408impl ThoughtPhase {
409    fn from_thought_count(thought_count: u64) -> Self {
410        match thought_count % 4 {
411            1 => Self::Observe,
412            2 => Self::Reflect,
413            3 => Self::Test,
414            _ => Self::Compress,
415        }
416    }
417
418    fn as_str(&self) -> &'static str {
419        match self {
420            Self::Observe => "observe",
421            Self::Reflect => "reflect",
422            Self::Test => "test",
423            Self::Compress => "compress",
424        }
425    }
426
427    fn event_type(&self) -> ThoughtEventType {
428        match self {
429            Self::Observe => ThoughtEventType::ThoughtGenerated,
430            Self::Reflect => ThoughtEventType::HypothesisRaised,
431            Self::Test => ThoughtEventType::CheckRequested,
432            Self::Compress => ThoughtEventType::SnapshotCompressed,
433        }
434    }
435}
436
437#[derive(Debug, Clone)]
438struct ThoughtWorkItem {
439    persona_id: String,
440    persona_name: String,
441    role: String,
442    charter: String,
443    swarm_id: Option<String>,
444    thought_count: u64,
445    phase: ThoughtPhase,
446}
447
/// Outcome of generating a single thought.
#[derive(Clone, Debug)]
struct ThoughtResult {
    /// Origin label; the loop treats the value `"fallback"` specially.
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    /// Counted as tokens used for the tick; treated as 0 when absent.
    total_tokens: Option<u32>,
    /// Counted as compute used for the tick.
    latency_ms: u128,
    error: Option<String>,
}
460
461/// Runtime options for cognition manager.
462#[derive(Debug, Clone)]
463pub struct CognitionRuntimeOptions {
464    pub enabled: bool,
465    pub loop_interval_ms: u64,
466    pub max_events: usize,
467    pub max_snapshots: usize,
468    pub default_policy: PersonaPolicy,
469}
470
471impl Default for CognitionRuntimeOptions {
472    fn default() -> Self {
473        Self {
474            enabled: false,
475            loop_interval_ms: 2_000,
476            max_events: 2_000,
477            max_snapshots: 128,
478            default_policy: PersonaPolicy::default(),
479        }
480    }
481}
482
483/// In-memory cognition runtime for perpetual persona swarms.
484#[derive(Debug)]
485pub struct CognitionRuntime {
486    enabled: bool,
487    max_events: usize,
488    max_snapshots: usize,
489    default_policy: PersonaPolicy,
490    running: Arc<AtomicBool>,
491    loop_interval_ms: Arc<RwLock<u64>>,
492    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
493    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
494    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
495    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
496    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
497    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
498    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
499    event_tx: broadcast::Sender<ThoughtEvent>,
500    thinker: Option<Arc<ThinkerClient>>,
501    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
502    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
503    governance: Arc<RwLock<SwarmGovernance>>,
504    workspace: Arc<RwLock<GlobalWorkspace>>,
505    tools: Option<Arc<ToolRegistry>>,
506    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
507    /// Proposals pending human approval for Critical risk.
508    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
509}
510
511impl CognitionRuntime {
512    /// Build runtime from environment feature flags.
513    pub fn new_from_env() -> Self {
514        let mut options = CognitionRuntimeOptions::default();
515        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
516        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
517        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
518        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
519
520        options.default_policy = PersonaPolicy {
521            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
522            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
523            token_budget_per_minute: env_u32(
524                "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
525                20_000,
526            ),
527            compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
528            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
529            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
530            allowed_tools: Vec::new(),
531        };
532
533        let thinker_backend = thinker::ThinkerBackend::from_env(
534            &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
535                .unwrap_or_else(|_| "openai_compat".to_string()),
536        );
537        let thinker_timeout_default = match thinker_backend {
538            thinker::ThinkerBackend::OpenAICompat => 30_000,
539            thinker::ThinkerBackend::Candle => 12_000,
540            thinker::ThinkerBackend::Bedrock => 60_000,
541        };
542        let thinker_config = ThinkerConfig {
543            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
544            backend: thinker_backend,
545            endpoint: normalize_thinker_endpoint(
546                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
547                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
548            ),
549            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
550                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
551            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
552            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
553            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
554                .ok()
555                .and_then(|v| v.parse::<f32>().ok()),
556            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
557            timeout_ms: env_u64(
558                "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
559                thinker_timeout_default,
560            ),
561            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
562            candle_tokenizer_path: std::env::var(
563                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
564            )
565            .ok(),
566            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
567            candle_device: thinker::CandleDevicePreference::from_env(
568                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
569                    .unwrap_or_else(|_| "auto".to_string()),
570            ),
571            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
572            candle_repeat_penalty: env_f32(
573                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
574                1.1,
575            ),
576            candle_repeat_last_n: env_usize(
577                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
578                64,
579            ),
580            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
581            bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
582                .unwrap_or_else(|_| {
583                    std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-west-2".to_string())
584                }),
585        };
586
587        let mut runtime = Self::new_with_options(options);
588        // Configure the thinker on the existing runtime
589        runtime.thinker = Some(thinker_config).and_then(|cfg| {
590            if !cfg.enabled {
591                return None;
592            }
593            match ThinkerClient::new(cfg) {
594                Ok(client) => {
595                    tracing::info!(
596                        backend = ?client.config().backend,
597                        endpoint = %client.config().endpoint,
598                        model = %client.config().model,
599                        "Cognition thinker initialized"
600                    );
601                    Some(Arc::new(client))
602                }
603                Err(error) => {
604                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
605                    None
606                }
607            }
608        });
609        runtime
610    }
611
612    /// Build runtime from explicit options.
613    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
614        Self::new_with_options_and_thinker(options, None)
615    }
616
617    fn new_with_options_and_thinker(
618        options: CognitionRuntimeOptions,
619        thinker_config: Option<ThinkerConfig>,
620    ) -> Self {
621        let (event_tx, _) = broadcast::channel(256);
622        let thinker = thinker_config.and_then(|cfg| {
623            if !cfg.enabled {
624                return None;
625            }
626            match ThinkerClient::new(cfg) {
627                Ok(client) => {
628                    tracing::info!(
629                        backend = ?client.config().backend,
630                        endpoint = %client.config().endpoint,
631                        model = %client.config().model,
632                        "Cognition thinker initialized"
633                    );
634                    Some(Arc::new(client))
635                }
636                Err(error) => {
637                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
638                    None
639                }
640            }
641        });
642
643        // Load persisted state before construction to avoid blocking_write()
644        // inside a tokio runtime (which panics).
645        let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
646            if let Some(persisted) = persistence::load_state() {
647                tracing::info!(
648                    personas = persisted.personas.len(),
649                    beliefs = persisted.beliefs.len(),
650                    persisted_at = %persisted.persisted_at,
651                    "Restoring persisted cognition state"
652                );
653                (
654                    persisted.personas,
655                    persisted.beliefs,
656                    persisted.proposals,
657                    persisted.attention_queue,
658                    persisted.workspace,
659                )
660            } else {
661                (
662                    HashMap::new(),
663                    HashMap::new(),
664                    HashMap::new(),
665                    Vec::new(),
666                    GlobalWorkspace::default(),
667                )
668            };
669
670        let runtime = Self {
671            enabled: options.enabled,
672            max_events: options.max_events.max(32),
673            max_snapshots: options.max_snapshots.max(8),
674            default_policy: options.default_policy,
675            running: Arc::new(AtomicBool::new(false)),
676            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
677            started_at: Arc::new(RwLock::new(None)),
678            last_tick_at: Arc::new(RwLock::new(None)),
679            personas: Arc::new(RwLock::new(init_personas)),
680            proposals: Arc::new(RwLock::new(init_proposals)),
681            events: Arc::new(RwLock::new(VecDeque::new())),
682            snapshots: Arc::new(RwLock::new(VecDeque::new())),
683            loop_handle: Arc::new(Mutex::new(None)),
684            event_tx,
685            thinker,
686            beliefs: Arc::new(RwLock::new(init_beliefs)),
687            attention_queue: Arc::new(RwLock::new(init_attention)),
688            governance: Arc::new(RwLock::new(SwarmGovernance::default())),
689            workspace: Arc::new(RwLock::new(init_workspace)),
690            tools: None,
691            receipts: Arc::new(RwLock::new(Vec::new())),
692            pending_approvals: Arc::new(RwLock::new(HashMap::new())),
693        };
694
695        runtime
696    }
697
698    /// Whether cognition is enabled by feature flag.
699    pub fn is_enabled(&self) -> bool {
700        self.enabled
701    }
702
703    /// Subscribe to thought events for streaming.
704    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
705        self.event_tx.subscribe()
706    }
707
708    /// Start the perpetual cognition loop.
709    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
710        if !self.enabled {
711            return Err(anyhow!(
712                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
713            ));
714        }
715
716        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
717        if let Some(request) = req {
718            if let Some(interval) = request.loop_interval_ms {
719                let mut lock = self.loop_interval_ms.write().await;
720                *lock = interval.max(100);
721            }
722            requested_seed_persona = request.seed_persona;
723        }
724
725        let has_personas = !self.personas.read().await.is_empty();
726        if !has_personas {
727            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
728            let _ = self.create_persona(seed).await?;
729        }
730
731        if self.running.load(Ordering::SeqCst) {
732            return Ok(self.status().await);
733        }
734
735        self.running.store(true, Ordering::SeqCst);
736        {
737            let mut started = self.started_at.write().await;
738            *started = Some(Utc::now());
739        }
740
741        let running = Arc::clone(&self.running);
742        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
743        let last_tick_at = Arc::clone(&self.last_tick_at);
744        let personas = Arc::clone(&self.personas);
745        let proposals = Arc::clone(&self.proposals);
746        let events = Arc::clone(&self.events);
747        let snapshots = Arc::clone(&self.snapshots);
748        let max_events = self.max_events;
749        let max_snapshots = self.max_snapshots;
750        let event_tx = self.event_tx.clone();
751        let thinker = self.thinker.clone();
752        let beliefs = Arc::clone(&self.beliefs);
753        let attention_queue = Arc::clone(&self.attention_queue);
754        let governance = Arc::clone(&self.governance);
755        let workspace = Arc::clone(&self.workspace);
756        let tools = self.tools.clone();
757        let receipts = Arc::clone(&self.receipts);
758        let pending_approvals = Arc::clone(&self.pending_approvals);
759
760        let handle = tokio::spawn(async move {
761            let mut next_tick = Instant::now();
762            while running.load(Ordering::SeqCst) {
763                let now_instant = Instant::now();
764                if now_instant < next_tick {
765                    tokio::time::sleep_until(next_tick).await;
766                }
767                if !running.load(Ordering::SeqCst) {
768                    break;
769                }
770
771                let now = Utc::now();
772                {
773                    let mut lock = last_tick_at.write().await;
774                    *lock = Some(now);
775                }
776
777                let mut new_events = Vec::new();
778                let mut new_snapshots = Vec::new();
779                let mut new_proposals = Vec::new();
780
781                // ── Step 1: Budget window reset + enforcement ──
782                let work_items: Vec<ThoughtWorkItem> = {
783                    let mut map = personas.write().await;
784                    let mut items = Vec::new();
785                    for persona in map.values_mut() {
786                        if persona.status != PersonaStatus::Active {
787                            continue;
788                        }
789
790                        // Reset budget window every 60 seconds
791                        let window_elapsed = now
792                            .signed_duration_since(persona.window_started_at)
793                            .num_seconds();
794                        if window_elapsed >= 60 {
795                            persona.tokens_this_window = 0;
796                            persona.compute_ms_this_window = 0;
797                            persona.window_started_at = now;
798                        }
799
800                        // Check budget
801                        let token_ok =
802                            persona.tokens_this_window < persona.policy.token_budget_per_minute;
803                        let compute_ok =
804                            persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
805                        if !token_ok || !compute_ok {
806                            if !persona.budget_paused {
807                                persona.budget_paused = true;
808                                new_events.push(ThoughtEvent {
809                                    id: Uuid::new_v4().to_string(),
810                                    event_type: ThoughtEventType::BudgetPaused,
811                                    persona_id: Some(persona.identity.id.clone()),
812                                    swarm_id: persona.identity.swarm_id.clone(),
813                                    timestamp: now,
814                                    payload: json!({
815                                        "budget_paused": true,
816                                        "tokens_used": persona.tokens_this_window,
817                                        "compute_ms_used": persona.compute_ms_this_window,
818                                        "token_budget": persona.policy.token_budget_per_minute,
819                                        "compute_budget": persona.policy.compute_ms_per_minute,
820                                    }),
821                                });
822                            }
823                            continue; // skip tick for this persona
824                        }
825
826                        persona.budget_paused = false;
827                        persona.thought_count = persona.thought_count.saturating_add(1);
828                        persona.last_tick_at = Some(now);
829                        persona.updated_at = now;
830                        items.push(ThoughtWorkItem {
831                            persona_id: persona.identity.id.clone(),
832                            persona_name: persona.identity.name.clone(),
833                            role: persona.identity.role.clone(),
834                            charter: persona.identity.charter.clone(),
835                            swarm_id: persona.identity.swarm_id.clone(),
836                            thought_count: persona.thought_count,
837                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
838                        });
839                    }
840                    items
841                };
842
843                for work in &work_items {
844                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
845
846                    let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
847
848                    let event_timestamp = Utc::now();
849                    let is_fallback = thought.source == "fallback";
850                    let tokens_used = thought.total_tokens.unwrap_or(0);
851                    let compute_used = thought.latency_ms as u32;
852
853                    new_events.push(ThoughtEvent {
854                        id: Uuid::new_v4().to_string(),
855                        event_type: work.phase.event_type(),
856                        persona_id: Some(work.persona_id.clone()),
857                        swarm_id: work.swarm_id.clone(),
858                        timestamp: event_timestamp,
859                        payload: json!({
860                            "phase": work.phase.as_str(),
861                            "thought_count": work.thought_count,
862                            "persona": {
863                                "id": work.persona_id.clone(),
864                                "name": work.persona_name.clone(),
865                                "role": work.role.clone(),
866                            },
867                            "context_event_count": context.len(),
868                            "thinking": thought.thinking.clone(),
869                            "source": thought.source,
870                            "model": thought.model.clone(),
871                            "finish_reason": thought.finish_reason.clone(),
872                            "usage": {
873                                "prompt_tokens": thought.prompt_tokens,
874                                "completion_tokens": thought.completion_tokens,
875                                "total_tokens": thought.total_tokens,
876                            },
877                            "latency_ms": thought.latency_ms,
878                            "error": thought.error.clone(),
879                        }),
880                    });
881
882                    // ── After thought: update budget counters ──
883                    {
884                        let mut map = personas.write().await;
885                        if let Some(persona) = map.get_mut(&work.persona_id) {
886                            persona.tokens_this_window =
887                                persona.tokens_this_window.saturating_add(tokens_used);
888                            persona.compute_ms_this_window =
889                                persona.compute_ms_this_window.saturating_add(compute_used);
890                            if !is_fallback {
891                                persona.last_progress_at = Utc::now();
892                            }
893                        }
894                    }
895
896                    // ── Step 3: Belief extraction during Reflect phase ──
897                    if work.phase == ThoughtPhase::Reflect && !is_fallback {
898                        let extracted = beliefs::extract_beliefs_from_thought(
899                            thinker.as_deref(),
900                            &work.persona_id,
901                            &thought.thinking,
902                        )
903                        .await;
904
905                        if !extracted.is_empty() {
906                            let mut belief_store = beliefs.write().await;
907                            let mut attn_queue = attention_queue.write().await;
908                            for mut new_belief in extracted {
909                                // Check for existing belief_key (duplicate detection)
910                                let existing_id = belief_store
911                                    .values()
912                                    .find(|b| {
913                                        b.belief_key == new_belief.belief_key
914                                            && b.status != BeliefStatus::Invalidated
915                                    })
916                                    .map(|b| b.id.clone());
917
918                                if let Some(eid) = existing_id {
919                                    // Confirm existing belief
920                                    if let Some(existing) = belief_store.get_mut(&eid) {
921                                        if !existing.confirmed_by.contains(&work.persona_id) {
922                                            existing.confirmed_by.push(work.persona_id.clone());
923                                        }
924                                        existing.revalidation_success();
925                                    }
926                                } else {
927                                    // Handle contradiction targets
928                                    let contest_targets: Vec<String> =
929                                        new_belief.contradicts.clone();
930                                    for target_key in &contest_targets {
931                                        if let Some(target) = belief_store.values_mut().find(|b| {
932                                            &b.belief_key == target_key
933                                                && b.status != BeliefStatus::Invalidated
934                                        }) {
935                                            target.contested_by.push(new_belief.id.clone());
936                                            if !new_belief.contradicts.contains(&target.belief_key)
937                                            {
938                                                new_belief
939                                                    .contradicts
940                                                    .push(target.belief_key.clone());
941                                            }
942                                            // Apply contest penalty
943                                            target.revalidation_failure();
944                                            if target.confidence >= 0.5 {
945                                                // Create revalidation attention item
946                                                attn_queue.push(AttentionItem {
947                                                    id: Uuid::new_v4().to_string(),
948                                                    topic: format!(
949                                                        "Revalidate belief: {}",
950                                                        target.claim
951                                                    ),
952                                                    topic_tags: vec![target.belief_key.clone()],
953                                                    priority: 0.7,
954                                                    source_type: AttentionSource::ContestedBelief,
955                                                    source_id: target.id.clone(),
956                                                    assigned_persona: None,
957                                                    created_at: Utc::now(),
958                                                    resolved_at: None,
959                                                });
960                                            }
961                                        }
962                                    }
963
964                                    new_events.push(ThoughtEvent {
965                                        id: Uuid::new_v4().to_string(),
966                                        event_type: ThoughtEventType::BeliefExtracted,
967                                        persona_id: Some(work.persona_id.clone()),
968                                        swarm_id: work.swarm_id.clone(),
969                                        timestamp: Utc::now(),
970                                        payload: json!({
971                                            "belief_id": new_belief.id,
972                                            "belief_key": new_belief.belief_key,
973                                            "claim": trim_for_storage(&new_belief.claim, 280),
974                                            "confidence": new_belief.confidence,
975                                        }),
976                                    });
977
978                                    // Mark progress for belief creation
979                                    {
980                                        let mut map = personas.write().await;
981                                        if let Some(p) = map.get_mut(&work.persona_id) {
982                                            p.last_progress_at = Utc::now();
983                                        }
984                                    }
985
986                                    new_belief.clamp_confidence();
987                                    belief_store.insert(new_belief.id.clone(), new_belief);
988                                }
989                            }
990                        }
991                    }
992
993                    if work.phase == ThoughtPhase::Test {
994                        new_events.push(ThoughtEvent {
995                            id: Uuid::new_v4().to_string(),
996                            event_type: ThoughtEventType::CheckResult,
997                            persona_id: Some(work.persona_id.clone()),
998                            swarm_id: work.swarm_id.clone(),
999                            timestamp: Utc::now(),
1000                            payload: json!({
1001                                "phase": work.phase.as_str(),
1002                                "thought_count": work.thought_count,
1003                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
1004                                "source": thought.source,
1005                                "model": thought.model,
1006                            }),
1007                        });
1008
1009                        // Mark progress for check result
1010                        {
1011                            let mut map = personas.write().await;
1012                            if let Some(p) = map.get_mut(&work.persona_id) {
1013                                p.last_progress_at = Utc::now();
1014                            }
1015                        }
1016
1017                        // ── Step 6: Tool execution via capability leases ──
1018                        if !is_fallback {
1019                            if let Some(ref tool_registry) = tools {
1020                                let allowed = {
1021                                    let map = personas.read().await;
1022                                    map.get(&work.persona_id)
1023                                        .map(|p| p.policy.allowed_tools.clone())
1024                                        .unwrap_or_default()
1025                                };
1026                                if !allowed.is_empty() {
1027                                    let tool_results = executor::execute_tool_requests(
1028                                        thinker.as_deref(),
1029                                        tool_registry,
1030                                        &work.persona_id,
1031                                        &thought.thinking,
1032                                        &allowed,
1033                                    )
1034                                    .await;
1035
1036                                    for result_event in tool_results {
1037                                        new_events.push(result_event);
1038                                        // Mark progress for tool execution
1039                                        let mut map = personas.write().await;
1040                                        if let Some(p) = map.get_mut(&work.persona_id) {
1041                                            p.last_progress_at = Utc::now();
1042                                        }
1043                                    }
1044                                }
1045                            }
1046                        }
1047                    }
1048
1049                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1050                        let gov = governance.read().await;
1051                        let proposal = Proposal {
1052                            id: Uuid::new_v4().to_string(),
1053                            persona_id: work.persona_id.clone(),
1054                            title: proposal_title_from_thought(
1055                                &thought.thinking,
1056                                work.thought_count,
1057                            ),
1058                            rationale: trim_for_storage(&thought.thinking, 900),
1059                            evidence_refs: vec!["internal.thought_stream".to_string()],
1060                            risk: ProposalRisk::Low,
1061                            status: ProposalStatus::Created,
1062                            created_at: Utc::now(),
1063                            votes: HashMap::new(),
1064                            vote_deadline: Some(
1065                                Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1066                            ),
1067                            votes_requested: false,
1068                            quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1069                                as usize,
1070                        };
1071
1072                        new_events.push(ThoughtEvent {
1073                            id: Uuid::new_v4().to_string(),
1074                            event_type: ThoughtEventType::ProposalCreated,
1075                            persona_id: Some(work.persona_id.clone()),
1076                            swarm_id: work.swarm_id.clone(),
1077                            timestamp: Utc::now(),
1078                            payload: json!({
1079                                "proposal_id": proposal.id,
1080                                "title": proposal.title,
1081                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1082                                "source": thought.source,
1083                                "model": thought.model,
1084                            }),
1085                        });
1086
1087                        new_proposals.push(proposal);
1088                    }
1089
1090                    if work.phase == ThoughtPhase::Compress {
1091                        new_snapshots.push(MemorySnapshot {
1092                            id: Uuid::new_v4().to_string(),
1093                            generated_at: Utc::now(),
1094                            swarm_id: work.swarm_id.clone(),
1095                            persona_scope: vec![work.persona_id.clone()],
1096                            summary: trim_for_storage(&thought.thinking, 1_500),
1097                            hot_event_count: context.len(),
1098                            warm_fact_count: estimate_fact_count(&thought.thinking),
1099                            cold_snapshot_count: 1,
1100                            metadata: HashMap::from([
1101                                (
1102                                    "phase".to_string(),
1103                                    serde_json::Value::String(work.phase.as_str().to_string()),
1104                                ),
1105                                (
1106                                    "role".to_string(),
1107                                    serde_json::Value::String(work.role.clone()),
1108                                ),
1109                                (
1110                                    "source".to_string(),
1111                                    serde_json::Value::String(thought.source.to_string()),
1112                                ),
1113                                (
1114                                    "model".to_string(),
1115                                    serde_json::Value::String(
1116                                        thought
1117                                            .model
1118                                            .clone()
1119                                            .unwrap_or_else(|| "fallback".to_string()),
1120                                    ),
1121                                ),
1122                                (
1123                                    "completion_tokens".to_string(),
1124                                    serde_json::Value::Number(serde_json::Number::from(
1125                                        thought.completion_tokens.unwrap_or(0) as u64,
1126                                    )),
1127                                ),
1128                            ]),
1129                        });
1130
1131                        // ── Step 3: Belief staleness decay in Compress phase ──
1132                        {
1133                            let mut belief_store = beliefs.write().await;
1134                            let mut attn_queue = attention_queue.write().await;
1135                            let stale_ids: Vec<String> = belief_store
1136                                .values()
1137                                .filter(|b| {
1138                                    b.status == BeliefStatus::Active && now > b.review_after
1139                                })
1140                                .map(|b| b.id.clone())
1141                                .collect();
1142                            for id in stale_ids {
1143                                if let Some(belief) = belief_store.get_mut(&id) {
1144                                    belief.decay();
1145                                    attn_queue.push(AttentionItem {
1146                                        id: Uuid::new_v4().to_string(),
1147                                        topic: format!("Stale belief: {}", belief.claim),
1148                                        topic_tags: vec![belief.belief_key.clone()],
1149                                        priority: 0.4,
1150                                        source_type: AttentionSource::StaleBelief,
1151                                        source_id: belief.id.clone(),
1152                                        assigned_persona: None,
1153                                        created_at: now,
1154                                        resolved_at: None,
1155                                    });
1156                                }
1157                            }
1158                        }
1159
1160                        // ── Step 4d: Update GlobalWorkspace ──
1161                        {
1162                            let belief_store = beliefs.read().await;
1163                            let attn_queue = attention_queue.read().await;
1164
1165                            let mut sorted_beliefs: Vec<&Belief> = belief_store
1166                                .values()
1167                                .filter(|b| b.status == BeliefStatus::Active)
1168                                .collect();
1169                            sorted_beliefs.sort_by(|a, b| {
1170                                let score_a = a.confidence
1171                                    * (1.0
1172                                        / (1.0
1173                                            + now.signed_duration_since(a.updated_at).num_minutes()
1174                                                as f32));
1175                                let score_b = b.confidence
1176                                    * (1.0
1177                                        / (1.0
1178                                            + now.signed_duration_since(b.updated_at).num_minutes()
1179                                                as f32));
1180                                score_b
1181                                    .partial_cmp(&score_a)
1182                                    .unwrap_or(std::cmp::Ordering::Equal)
1183                            });
1184
1185                            let top_beliefs: Vec<String> = sorted_beliefs
1186                                .iter()
1187                                .take(10)
1188                                .map(|b| b.id.clone())
1189                                .collect();
1190                            let top_uncertainties: Vec<String> = {
1191                                let mut uncertain: Vec<&Belief> = belief_store
1192                                    .values()
1193                                    .filter(|b| {
1194                                        b.status == BeliefStatus::Stale
1195                                            || !b.contested_by.is_empty()
1196                                    })
1197                                    .collect();
1198                                // Deterministic order: contested first, then lowest confidence,
1199                                // then oldest updated_at.
1200                                uncertain.sort_by(|a, b| {
1201                                    let a_contested = !a.contested_by.is_empty();
1202                                    let b_contested = !b.contested_by.is_empty();
1203                                    b_contested
1204                                        .cmp(&a_contested)
1205                                        .then_with(|| {
1206                                            a.confidence
1207                                                .partial_cmp(&b.confidence)
1208                                                .unwrap_or(std::cmp::Ordering::Equal)
1209                                        })
1210                                        .then_with(|| a.updated_at.cmp(&b.updated_at))
1211                                });
1212                                uncertain
1213                                    .iter()
1214                                    .take(5)
1215                                    .map(|b| {
1216                                        format!(
1217                                            "[{}] {}",
1218                                            b.belief_key,
1219                                            trim_for_storage(&b.claim, 80)
1220                                        )
1221                                    })
1222                                    .collect()
1223                            };
1224
1225                            let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1226                                .iter()
1227                                .filter(|a| a.resolved_at.is_none())
1228                                .collect();
1229                            sorted_attn.sort_by(|a, b| {
1230                                b.priority
1231                                    .partial_cmp(&a.priority)
1232                                    .unwrap_or(std::cmp::Ordering::Equal)
1233                            });
1234                            let top_attention: Vec<String> =
1235                                sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1236
1237                            let mut ws = workspace.write().await;
1238                            ws.top_beliefs = top_beliefs;
1239                            ws.top_uncertainties = top_uncertainties;
1240                            ws.top_attention = top_attention;
1241                            ws.updated_at = now;
1242                        }
1243
1244                        new_events.push(ThoughtEvent {
1245                            id: Uuid::new_v4().to_string(),
1246                            event_type: ThoughtEventType::WorkspaceUpdated,
1247                            persona_id: Some(work.persona_id.clone()),
1248                            swarm_id: work.swarm_id.clone(),
1249                            timestamp: Utc::now(),
1250                            payload: json!({ "updated": true }),
1251                        });
1252                    }
1253                }
1254
1255                // ── Step 4c: Governance — resolve proposals ──
1256                {
1257                    let gov = governance.read().await;
1258                    let mut proposal_store = proposals.write().await;
1259                    let persona_map = personas.read().await;
1260
1261                    let proposal_ids: Vec<String> = proposal_store
1262                        .values()
1263                        .filter(|p| p.status == ProposalStatus::Created)
1264                        .map(|p| p.id.clone())
1265                        .collect();
1266
1267                    let mut attn_queue = attention_queue.write().await;
1268                    for pid in proposal_ids {
1269                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1270                            let quorum_needed = proposal.quorum_needed.max(1);
1271
1272                            // Check vote deadline
1273                            if let Some(deadline) = proposal.vote_deadline {
1274                                if now > deadline {
1275                                    if proposal.votes.len() < quorum_needed {
1276                                        // Timeout without quorum → attention item
1277                                        attn_queue.push(AttentionItem {
1278                                            id: Uuid::new_v4().to_string(),
1279                                            topic: format!(
1280                                                "Proposal vote timeout: {}",
1281                                                proposal.title
1282                                            ),
1283                                            topic_tags: Vec::new(),
1284                                            priority: 0.6,
1285                                            source_type: AttentionSource::ProposalTimeout,
1286                                            source_id: proposal.id.clone(),
1287                                            assigned_persona: None,
1288                                            created_at: now,
1289                                            resolved_at: None,
1290                                        });
1291                                        proposal.status = ProposalStatus::Rejected;
1292                                        continue;
1293                                    }
1294
1295                                    // Deadline passed with quorum but missing required approvers → reject
1296                                    let required_roles = gov
1297                                        .required_approvers_by_role
1298                                        .get(&proposal.risk)
1299                                        .cloned()
1300                                        .unwrap_or_default();
1301                                    let all_required_met = required_roles.iter().all(|role| {
1302                                        proposal.votes.iter().any(|(vid, vote)| {
1303                                            *vote == ProposalVote::Approve
1304                                                && persona_map
1305                                                    .get(vid)
1306                                                    .map(|p| &p.identity.role == role)
1307                                                    .unwrap_or(false)
1308                                        })
1309                                    });
1310                                    if !all_required_met {
1311                                        attn_queue.push(AttentionItem {
1312                                            id: Uuid::new_v4().to_string(),
1313                                            topic: format!(
1314                                                "Missing required approvers: {}",
1315                                                proposal.title
1316                                            ),
1317                                            topic_tags: Vec::new(),
1318                                            priority: 0.7,
1319                                            source_type: AttentionSource::ProposalTimeout,
1320                                            source_id: proposal.id.clone(),
1321                                            assigned_persona: None,
1322                                            created_at: now,
1323                                            resolved_at: None,
1324                                        });
1325                                        proposal.status = ProposalStatus::Rejected;
1326                                        continue;
1327                                    }
1328                                }
1329                            }
1330
1331                            // Resolve if enough votes
1332                            if proposal.votes.len() >= quorum_needed {
1333                                // Check for veto
1334                                let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1335                                    if *vote != ProposalVote::Veto {
1336                                        return false;
1337                                    }
1338                                    if let Some(voter) = persona_map.get(voter_id) {
1339                                        gov.veto_roles.contains(&voter.identity.role)
1340                                    } else {
1341                                        false
1342                                    }
1343                                });
1344
1345                                if vetoed {
1346                                    proposal.status = ProposalStatus::Rejected;
1347                                    continue;
1348                                }
1349
1350                                // Check required approvers
1351                                let required_roles = gov
1352                                    .required_approvers_by_role
1353                                    .get(&proposal.risk)
1354                                    .cloned()
1355                                    .unwrap_or_default();
1356                                let all_required_met = required_roles.iter().all(|role| {
1357                                    proposal.votes.iter().any(|(vid, vote)| {
1358                                        *vote == ProposalVote::Approve
1359                                            && persona_map
1360                                                .get(vid)
1361                                                .map(|p| &p.identity.role == role)
1362                                                .unwrap_or(false)
1363                                    })
1364                                });
1365
1366                                if !all_required_met {
1367                                    continue; // wait for required approvers
1368                                }
1369
1370                                // Count approvals vs rejections
1371                                let approvals = proposal
1372                                    .votes
1373                                    .values()
1374                                    .filter(|v| **v == ProposalVote::Approve)
1375                                    .count();
1376                                let rejections = proposal
1377                                    .votes
1378                                    .values()
1379                                    .filter(|v| **v == ProposalVote::Reject)
1380                                    .count();
1381
1382                                if approvals > rejections {
1383                                    proposal.status = ProposalStatus::Verified;
1384                                } else {
1385                                    proposal.status = ProposalStatus::Rejected;
1386                                }
1387                            }
1388                        }
1389                    }
1390                }
1391
1392                // ── Step 7: Execute verified proposals ──
1393                {
1394                    let mut proposal_store = proposals.write().await;
1395                    let verified_ids: Vec<String> = proposal_store
1396                        .values()
1397                        .filter(|p| p.status == ProposalStatus::Verified)
1398                        .map(|p| p.id.clone())
1399                        .collect();
1400
1401                    for pid in verified_ids {
1402                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1403                            if proposal.risk == ProposalRisk::Critical {
1404                                // Check for human approval
1405                                let approved = {
1406                                    let approvals = pending_approvals.read().await;
1407                                    approvals.get(&pid).copied().unwrap_or(false)
1408                                };
1409                                if !approved {
1410                                    // Register for human approval
1411                                    let mut approvals = pending_approvals.write().await;
1412                                    approvals.entry(pid.clone()).or_insert(false);
1413                                    continue;
1414                                }
1415                            }
1416
1417                            // Create decision receipt
1418                            let receipt = executor::DecisionReceipt {
1419                                id: Uuid::new_v4().to_string(),
1420                                proposal_id: pid.clone(),
1421                                inputs: proposal.evidence_refs.clone(),
1422                                governance_decision: format!(
1423                                    "Approved with {} votes",
1424                                    proposal.votes.len()
1425                                ),
1426                                capability_leases: Vec::new(),
1427                                tool_invocations: Vec::new(),
1428                                outcome: executor::ExecutionOutcome::Success {
1429                                    summary: format!("Proposal '{}' executed", proposal.title),
1430                                },
1431                                created_at: Utc::now(),
1432                            };
1433
1434                            new_events.push(ThoughtEvent {
1435                                id: Uuid::new_v4().to_string(),
1436                                event_type: ThoughtEventType::ActionExecuted,
1437                                persona_id: Some(proposal.persona_id.clone()),
1438                                swarm_id: None,
1439                                timestamp: Utc::now(),
1440                                payload: json!({
1441                                    "receipt_id": receipt.id,
1442                                    "proposal_id": pid,
1443                                    "outcome": "success",
1444                                    "summary": format!("Proposal '{}' executed", proposal.title),
1445                                }),
1446                            });
1447
1448                            receipts.write().await.push(receipt);
1449                            proposal.status = ProposalStatus::Executed;
1450                        }
1451                    }
1452                }
1453
1454                if !new_proposals.is_empty() {
1455                    let mut proposal_store = proposals.write().await;
1456                    for proposal in new_proposals {
1457                        proposal_store.insert(proposal.id.clone(), proposal);
1458                    }
1459                }
1460
1461                for event in new_events {
1462                    push_event_internal(&events, max_events, &event_tx, event).await;
1463                }
1464                for snapshot in new_snapshots {
1465                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1466                }
1467
1468                // ── Step 2: Idle TTL reaping ──
1469                {
1470                    let mut map = personas.write().await;
1471                    let idle_ids: Vec<String> = map
1472                        .values()
1473                        .filter(|p| {
1474                            p.status == PersonaStatus::Active
1475                                && !p.budget_paused
1476                                && now.signed_duration_since(p.last_progress_at).num_seconds()
1477                                    > p.policy.idle_ttl_secs as i64
1478                        })
1479                        .map(|p| p.identity.id.clone())
1480                        .collect();
1481
1482                    for id in &idle_ids {
1483                        if let Some(persona) = map.get_mut(id) {
1484                            persona.status = PersonaStatus::Reaped;
1485                            persona.updated_at = now;
1486                        }
1487                        // Also cascade-reap children
1488                        let children: Vec<String> = map
1489                            .values()
1490                            .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1491                            .map(|p| p.identity.id.clone())
1492                            .collect();
1493                        for child_id in children {
1494                            if let Some(child) = map.get_mut(&child_id) {
1495                                child.status = PersonaStatus::Reaped;
1496                                child.updated_at = now;
1497                            }
1498                        }
1499                    }
1500                    drop(map);
1501
1502                    for id in idle_ids {
1503                        push_event_internal(
1504                            &events,
1505                            max_events,
1506                            &event_tx,
1507                            ThoughtEvent {
1508                                id: Uuid::new_v4().to_string(),
1509                                event_type: ThoughtEventType::IdleReaped,
1510                                persona_id: Some(id),
1511                                swarm_id: None,
1512                                timestamp: now,
1513                                payload: json!({ "reason": "idle_ttl_expired" }),
1514                            },
1515                        )
1516                        .await;
1517                    }
1518                }
1519
1520                // ── Step 5: Persistence on Compress ──
1521                if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1522                    let _ = persistence::save_state(
1523                        &personas,
1524                        &proposals,
1525                        &beliefs,
1526                        &attention_queue,
1527                        &workspace,
1528                        &events,
1529                        &snapshots,
1530                    )
1531                    .await;
1532                }
1533
1534                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1535                next_tick += interval;
1536                let tick_completed = Instant::now();
1537                if tick_completed > next_tick {
1538                    next_tick = tick_completed;
1539                }
1540            }
1541        });
1542
1543        {
1544            let mut lock = self.loop_handle.lock().await;
1545            *lock = Some(handle);
1546        }
1547
1548        Ok(self.status().await)
1549    }
1550
1551    /// Stop the perpetual cognition loop.
1552    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1553        self.running.store(false, Ordering::SeqCst);
1554
1555        if let Some(handle) = self.loop_handle.lock().await.take() {
1556            handle.abort();
1557            let _ = handle.await;
1558        }
1559
1560        if let Some(reason_value) = reason {
1561            let event = ThoughtEvent {
1562                id: Uuid::new_v4().to_string(),
1563                event_type: ThoughtEventType::CheckResult,
1564                persona_id: None,
1565                swarm_id: None,
1566                timestamp: Utc::now(),
1567                payload: json!({ "stopped": true, "reason": reason_value }),
1568            };
1569            self.push_event(event).await;
1570        }
1571
1572        Ok(self.status().await)
1573    }
1574
1575    /// Create a persona record.
1576    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1577        let now = Utc::now();
1578        let mut personas = self.personas.write().await;
1579
1580        let mut parent_swarm_id = None;
1581        let mut computed_depth = 0_u32;
1582        let mut inherited_policy = None;
1583
1584        if let Some(parent_id) = req.parent_id.clone() {
1585            let parent = personas
1586                .get(&parent_id)
1587                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1588
1589            if parent.status == PersonaStatus::Reaped {
1590                return Err(anyhow!("Parent persona {} is reaped", parent_id));
1591            }
1592
1593            parent_swarm_id = parent.identity.swarm_id.clone();
1594            computed_depth = parent.identity.depth.saturating_add(1);
1595            inherited_policy = Some(parent.policy.clone());
1596            let branch_limit = parent.policy.max_branching_factor;
1597
1598            let child_count = personas
1599                .values()
1600                .filter(|p| {
1601                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1602                        && p.status != PersonaStatus::Reaped
1603                })
1604                .count();
1605
1606            if child_count as u32 >= branch_limit {
1607                return Err(anyhow!(
1608                    "Parent {} reached branching limit {}",
1609                    parent_id,
1610                    branch_limit
1611                ));
1612            }
1613        }
1614
1615        let policy = req
1616            .policy
1617            .clone()
1618            .or(inherited_policy.clone())
1619            .unwrap_or_else(|| self.default_policy.clone());
1620
1621        let effective_depth_limit = inherited_policy
1622            .as_ref()
1623            .map(|p| p.max_spawn_depth)
1624            .unwrap_or(policy.max_spawn_depth);
1625
1626        if computed_depth > effective_depth_limit {
1627            return Err(anyhow!(
1628                "Spawn depth {} exceeds limit {}",
1629                computed_depth,
1630                effective_depth_limit
1631            ));
1632        }
1633
1634        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1635        if personas.contains_key(&persona_id) {
1636            return Err(anyhow!("Persona id already exists: {}", persona_id));
1637        }
1638
1639        let identity = PersonaIdentity {
1640            id: persona_id.clone(),
1641            name: req.name,
1642            role: req.role,
1643            charter: req.charter,
1644            swarm_id: req.swarm_id.or(parent_swarm_id),
1645            parent_id: req.parent_id,
1646            depth: computed_depth,
1647            created_at: now,
1648            tags: req.tags,
1649        };
1650
1651        let persona = PersonaRuntimeState {
1652            identity,
1653            policy,
1654            status: PersonaStatus::Active,
1655            thought_count: 0,
1656            last_tick_at: None,
1657            updated_at: now,
1658            tokens_this_window: 0,
1659            compute_ms_this_window: 0,
1660            window_started_at: now,
1661            last_progress_at: now,
1662            budget_paused: false,
1663        };
1664
1665        personas.insert(persona_id, persona.clone());
1666        drop(personas);
1667
1668        self.push_event(ThoughtEvent {
1669            id: Uuid::new_v4().to_string(),
1670            event_type: ThoughtEventType::PersonaSpawned,
1671            persona_id: Some(persona.identity.id.clone()),
1672            swarm_id: persona.identity.swarm_id.clone(),
1673            timestamp: now,
1674            payload: json!({
1675                "name": persona.identity.name,
1676                "role": persona.identity.role,
1677                "depth": persona.identity.depth,
1678            }),
1679        })
1680        .await;
1681
1682        Ok(persona)
1683    }
1684
1685    /// Spawn a child persona under an existing parent.
1686    pub async fn spawn_child(
1687        &self,
1688        parent_id: &str,
1689        req: SpawnPersonaRequest,
1690    ) -> Result<PersonaRuntimeState> {
1691        let request = CreatePersonaRequest {
1692            persona_id: req.persona_id,
1693            name: req.name,
1694            role: req.role,
1695            charter: req.charter,
1696            swarm_id: req.swarm_id,
1697            parent_id: Some(parent_id.to_string()),
1698            policy: req.policy,
1699            tags: Vec::new(),
1700        };
1701        self.create_persona(request).await
1702    }
1703
1704    /// Reap one persona or the full descendant tree.
1705    pub async fn reap_persona(
1706        &self,
1707        persona_id: &str,
1708        req: ReapPersonaRequest,
1709    ) -> Result<ReapPersonaResponse> {
1710        let cascade = req.cascade.unwrap_or(false);
1711        let now = Utc::now();
1712
1713        let mut personas = self.personas.write().await;
1714        if !personas.contains_key(persona_id) {
1715            return Err(anyhow!("Persona not found: {}", persona_id));
1716        }
1717
1718        let mut reaped_ids = vec![persona_id.to_string()];
1719        if cascade {
1720            let mut idx = 0usize;
1721            while idx < reaped_ids.len() {
1722                let current = reaped_ids[idx].clone();
1723                let children: Vec<String> = personas
1724                    .values()
1725                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1726                    .map(|p| p.identity.id.clone())
1727                    .collect();
1728                for child in children {
1729                    if !reaped_ids.iter().any(|existing| existing == &child) {
1730                        reaped_ids.push(child);
1731                    }
1732                }
1733                idx += 1;
1734            }
1735        }
1736
1737        for id in &reaped_ids {
1738            if let Some(persona) = personas.get_mut(id) {
1739                persona.status = PersonaStatus::Reaped;
1740                persona.updated_at = now;
1741            }
1742        }
1743        drop(personas);
1744
1745        for id in &reaped_ids {
1746            self.push_event(ThoughtEvent {
1747                id: Uuid::new_v4().to_string(),
1748                event_type: ThoughtEventType::PersonaReaped,
1749                persona_id: Some(id.clone()),
1750                swarm_id: None,
1751                timestamp: now,
1752                payload: json!({
1753                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1754                    "cascade": cascade,
1755                }),
1756            })
1757            .await;
1758        }
1759
1760        Ok(ReapPersonaResponse {
1761            count: reaped_ids.len(),
1762            reaped_ids,
1763        })
1764    }
1765
1766    /// Get latest memory snapshot, if any.
1767    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1768        self.snapshots.read().await.back().cloned()
1769    }
1770
1771    /// Build lineage graph from current persona state.
1772    pub async fn lineage_graph(&self) -> LineageGraph {
1773        let personas = self.personas.read().await;
1774        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1775        let mut roots = Vec::new();
1776        let mut total_edges = 0usize;
1777
1778        for persona in personas.values() {
1779            if let Some(parent_id) = persona.identity.parent_id.clone() {
1780                children_by_parent
1781                    .entry(parent_id)
1782                    .or_default()
1783                    .push(persona.identity.id.clone());
1784                total_edges = total_edges.saturating_add(1);
1785            } else {
1786                roots.push(persona.identity.id.clone());
1787            }
1788        }
1789
1790        let mut nodes: Vec<LineageNode> = personas
1791            .values()
1792            .map(|persona| {
1793                let mut children = children_by_parent
1794                    .get(&persona.identity.id)
1795                    .cloned()
1796                    .unwrap_or_default();
1797                children.sort();
1798
1799                LineageNode {
1800                    persona_id: persona.identity.id.clone(),
1801                    parent_id: persona.identity.parent_id.clone(),
1802                    children,
1803                    depth: persona.identity.depth,
1804                    status: persona.status,
1805                }
1806            })
1807            .collect();
1808
1809        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1810        roots.sort();
1811
1812        LineageGraph {
1813            nodes,
1814            roots,
1815            total_edges,
1816        }
1817    }
1818
1819    /// Return a summary status.
1820    pub async fn status(&self) -> CognitionStatus {
1821        let personas = self.personas.read().await;
1822        let events = self.events.read().await;
1823        let snapshots = self.snapshots.read().await;
1824        let started_at = *self.started_at.read().await;
1825        let last_tick_at = *self.last_tick_at.read().await;
1826        let loop_interval_ms = *self.loop_interval_ms.read().await;
1827
1828        let active_persona_count = personas
1829            .values()
1830            .filter(|p| p.status == PersonaStatus::Active)
1831            .count();
1832
1833        CognitionStatus {
1834            enabled: self.enabled,
1835            running: self.running.load(Ordering::SeqCst),
1836            loop_interval_ms,
1837            started_at,
1838            last_tick_at,
1839            persona_count: personas.len(),
1840            active_persona_count,
1841            events_buffered: events.len(),
1842            snapshots_buffered: snapshots.len(),
1843        }
1844    }
1845
1846    async fn push_event(&self, event: ThoughtEvent) {
1847        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1848    }
1849
1850    /// Set the tool registry for capability-based tool execution.
1851    pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1852        self.tools = Some(registry);
1853    }
1854
1855    /// Get current beliefs.
1856    pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1857        self.beliefs.read().await.clone()
1858    }
1859
1860    /// Get a single belief by ID.
1861    pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1862        self.beliefs.read().await.get(id).cloned()
1863    }
1864
1865    /// Get the current attention queue.
1866    pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1867        self.attention_queue.read().await.clone()
1868    }
1869
1870    /// Get all proposals.
1871    pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1872        self.proposals.read().await.clone()
1873    }
1874
1875    /// Get the current global workspace.
1876    pub async fn get_workspace(&self) -> GlobalWorkspace {
1877        self.workspace.read().await.clone()
1878    }
1879
1880    /// Get decision receipts.
1881    pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1882        self.receipts.read().await.clone()
1883    }
1884
1885    /// Approve a Critical-risk proposal for execution.
1886    pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1887        let proposals = self.proposals.read().await;
1888        let proposal = proposals
1889            .get(proposal_id)
1890            .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1891
1892        if proposal.risk != ProposalRisk::Critical {
1893            return Err(anyhow!("Only Critical proposals require human approval"));
1894        }
1895        if proposal.status != ProposalStatus::Verified {
1896            return Err(anyhow!("Proposal is not in Verified status"));
1897        }
1898        drop(proposals);
1899
1900        let mut approvals = self.pending_approvals.write().await;
1901        approvals.insert(proposal_id.to_string(), true);
1902        Ok(())
1903    }
1904
1905    /// Get governance settings.
1906    pub async fn get_governance(&self) -> SwarmGovernance {
1907        self.governance.read().await.clone()
1908    }
1909
1910    /// Get persona state for a specific persona.
1911    pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1912        self.personas.read().await.get(id).cloned()
1913    }
1914}
1915
1916async fn push_event_internal(
1917    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1918    max_events: usize,
1919    event_tx: &broadcast::Sender<ThoughtEvent>,
1920    event: ThoughtEvent,
1921) {
1922    {
1923        let mut lock = events.write().await;
1924        lock.push_back(event.clone());
1925        while lock.len() > max_events {
1926            lock.pop_front();
1927        }
1928    }
1929    let _ = event_tx.send(event);
1930}
1931
1932async fn push_snapshot_internal(
1933    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1934    max_snapshots: usize,
1935    snapshot: MemorySnapshot,
1936) {
1937    let mut lock = snapshots.write().await;
1938    lock.push_back(snapshot);
1939    while lock.len() > max_snapshots {
1940        lock.pop_front();
1941    }
1942}
1943
1944async fn recent_persona_context(
1945    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1946    persona_id: &str,
1947    limit: usize,
1948) -> Vec<ThoughtEvent> {
1949    let lock = events.read().await;
1950    let mut selected: Vec<ThoughtEvent> = lock
1951        .iter()
1952        .rev()
1953        .filter(|event| {
1954            event.persona_id.as_deref() == Some(persona_id)
1955                || (event.persona_id.is_none()
1956                    && matches!(
1957                        event.event_type,
1958                        ThoughtEventType::CheckResult
1959                            | ThoughtEventType::ProposalCreated
1960                            | ThoughtEventType::SnapshotCompressed
1961                            | ThoughtEventType::WorkspaceUpdated
1962                            | ThoughtEventType::ActionExecuted
1963                            | ThoughtEventType::BudgetPaused
1964                    ))
1965        })
1966        .take(limit)
1967        .cloned()
1968        .collect();
1969    selected.reverse();
1970    selected
1971}
1972
1973async fn generate_phase_thought(
1974    thinker: Option<&ThinkerClient>,
1975    work: &ThoughtWorkItem,
1976    context: &[ThoughtEvent],
1977) -> ThoughtResult {
1978    let started_at = Instant::now();
1979    if let Some(client) = thinker {
1980        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1981        match client.think(&system_prompt, &user_prompt).await {
1982            Ok(output) => {
1983                let thinking = normalize_thought_output(work, context, &output.text);
1984                if !thinking.is_empty() {
1985                    return ThoughtResult {
1986                        source: "model",
1987                        model: Some(output.model),
1988                        finish_reason: output.finish_reason,
1989                        thinking,
1990                        prompt_tokens: output.prompt_tokens,
1991                        completion_tokens: output.completion_tokens,
1992                        total_tokens: output.total_tokens,
1993                        latency_ms: started_at.elapsed().as_millis(),
1994                        error: None,
1995                    };
1996                }
1997            }
1998            Err(error) => {
1999                return ThoughtResult {
2000                    source: "fallback",
2001                    model: None,
2002                    finish_reason: None,
2003                    thinking: fallback_phase_text(work, context),
2004                    prompt_tokens: None,
2005                    completion_tokens: None,
2006                    total_tokens: None,
2007                    latency_ms: started_at.elapsed().as_millis(),
2008                    error: Some(error.to_string()),
2009                };
2010            }
2011        }
2012    }
2013
2014    ThoughtResult {
2015        source: "fallback",
2016        model: None,
2017        finish_reason: None,
2018        thinking: fallback_phase_text(work, context),
2019        prompt_tokens: None,
2020        completion_tokens: None,
2021        total_tokens: None,
2022        latency_ms: started_at.elapsed().as_millis(),
2023        error: None,
2024    }
2025}
2026
2027fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2028    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2029Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2030Write as an operational process update, not meta narration. \
2031Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2032Output concrete findings, checks, risks, and next actions. \
2033Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2034        .to_string();
2035
2036    let context_lines = if context.is_empty() {
2037        "none".to_string()
2038    } else {
2039        context
2040            .iter()
2041            .map(format_context_event)
2042            .collect::<Vec<_>>()
2043            .join("\n")
2044    };
2045
2046    let phase_instruction = match work.phase {
2047        ThoughtPhase::Observe => {
2048            "Process format (exact line labels): \
2049Phase: Observe | Goal: detect current customer/business risk | \
2050Signals: 1-3 concrete signals separated by '; ' | \
2051Uncertainty: one unknown that blocks confidence | \
2052Next_Action: one immediate operational action."
2053        }
2054        ThoughtPhase::Reflect => {
2055            "Process format (exact line labels): \
2056Phase: Reflect | Hypothesis: single testable hypothesis | \
2057Rationale: why this is likely | \
2058Business_Risk: customer/revenue/SLA impact | \
2059Validation_Next_Action: one action to confirm or falsify."
2060        }
2061        ThoughtPhase::Test => {
2062            "Process format (exact line labels): \
2063Phase: Test | Check: single concrete check | \
2064Procedure: short executable procedure | \
2065Expected_Result: pass/fail expectation | \
2066Evidence_Quality: low|medium|high with reason | \
2067Escalation_Trigger: when to escalate immediately."
2068        }
2069        ThoughtPhase::Compress => {
2070            "Process format (exact line labels): \
2071Phase: Compress | State_Summary: current state in one line | \
2072Retained_Facts: 3 short facts separated by '; ' | \
2073Open_Risks: up to 2 unresolved risks separated by '; ' | \
2074Next_Process_Step: next operational step."
2075        }
2076    };
2077
2078    let user_prompt = format!(
2079        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2080        phase = work.phase.as_str(),
2081        persona_id = work.persona_id,
2082        persona_name = work.persona_name,
2083        role = work.role,
2084        charter = work.charter,
2085        thought_count = work.thought_count,
2086        context = context_lines,
2087        instruction = phase_instruction
2088    );
2089
2090    (system_prompt, user_prompt)
2091}
2092
2093fn format_context_event(event: &ThoughtEvent) -> String {
2094    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2095    format!(
2096        "{} {} {}",
2097        event.event_type.as_str(),
2098        event.timestamp.to_rfc3339(),
2099        trim_for_storage(&payload, 220)
2100    )
2101}
2102
2103fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2104    let charter = trim_for_storage(&work.charter, 180);
2105    let context_summary = fallback_context_summary(context);
2106    let thought = match work.phase {
2107        ThoughtPhase::Observe => format!(
2108            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2109            work.role, charter, context_summary
2110        ),
2111        ThoughtPhase::Reflect => format!(
2112            "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2113        ),
2114        ThoughtPhase::Test => format!(
2115            "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2116        ),
2117        ThoughtPhase::Compress => format!(
2118            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2119            work.role, charter, context_summary
2120        ),
2121    };
2122    trim_for_storage(&thought, 1_200)
2123}
2124
2125fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2126    let trimmed = trim_for_storage(raw, 2_000);
2127    if trimmed.trim().is_empty() {
2128        return fallback_phase_text(work, context);
2129    }
2130
2131    // Prefer process-labeled content if the model emitted a preamble first.
2132    if let Some(idx) = find_process_label_start(&trimmed) {
2133        let candidate = trimmed[idx..].trim();
2134        // Return the full candidate (all structured fields), not just the first line.
2135        if !candidate.is_empty()
2136            && !candidate.contains('<')
2137            && !has_template_placeholder_values(candidate)
2138        {
2139            // If it's a multi-line structured output, collapse to pipe-delimited single line.
2140            let collapsed: String = candidate
2141                .lines()
2142                .map(str::trim)
2143                .filter(|l| !l.is_empty())
2144                .collect::<Vec<_>>()
2145                .join(" | ");
2146            let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2147            if cleaned.starts_with("Phase:") {
2148                return cleaned.to_string();
2149            }
2150            return collapsed;
2151        }
2152    }
2153
2154    let lower = trimmed.to_ascii_lowercase();
2155    let looks_meta = lower.starts_with("we need")
2156        || lower.starts_with("i need")
2157        || lower.contains("we need to")
2158        || lower.contains("i need to")
2159        || lower.contains("must output")
2160        || lower.contains("let's ")
2161        || lower.contains("we have to");
2162
2163    if looks_meta || has_template_placeholder_values(&trimmed) {
2164        return fallback_phase_text(work, context);
2165    }
2166
2167    trimmed
2168}
2169
/// Reports whether `text` still contains unfilled template content.
///
/// Matching is ASCII case-insensitive and purely substring based: any
/// structured field whose value is a literal `...`, an `<...` marker, or the
/// words `tbd` / `todo` anywhere in the text counts as a placeholder.
fn has_template_placeholder_values(text: &str) -> bool {
    const PLACEHOLDER_NEEDLES: [&str; 20] = [
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
        "<...",
        "tbd",
        "todo",
    ];

    let haystack = text.to_ascii_lowercase();
    for needle in PLACEHOLDER_NEEDLES {
        if haystack.contains(needle) {
            return true;
        }
    }
    false
}
2197
/// Returns the byte offset of the earliest process label in `text`.
///
/// The search is case-sensitive; `None` means no `Phase:` label is present.
fn find_process_label_start(text: &str) -> Option<usize> {
    const PROCESS_LABELS: [&str; 5] = [
        "Phase: Observe",
        "Phase: Reflect",
        "Phase: Test",
        "Phase: Compress",
        "Phase:",
    ];

    let mut earliest: Option<usize> = None;
    for label in PROCESS_LABELS {
        if let Some(offset) = text.find(label) {
            earliest = Some(earliest.map_or(offset, |best| best.min(offset)));
        }
    }
    earliest
}
2210
2211fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2212    if context.is_empty() {
2213        return "No prior events recorded yet.".to_string();
2214    }
2215
2216    let mut latest_error: Option<String> = None;
2217    let mut latest_proposal: Option<String> = None;
2218    let mut latest_check: Option<String> = None;
2219
2220    for event in context.iter().rev() {
2221        if latest_error.is_none()
2222            && let Some(error) = event
2223                .payload
2224                .get("error")
2225                .and_then(serde_json::Value::as_str)
2226            && !error.trim().is_empty()
2227        {
2228            latest_error = Some(trim_for_storage(error, 140));
2229        }
2230
2231        if latest_proposal.is_none()
2232            && event.event_type == ThoughtEventType::ProposalCreated
2233            && let Some(title) = event
2234                .payload
2235                .get("title")
2236                .and_then(serde_json::Value::as_str)
2237            && !title.trim().is_empty()
2238            && !has_template_placeholder_values(title)
2239        {
2240            latest_proposal = Some(trim_for_storage(title, 120));
2241        }
2242
2243        if latest_check.is_none()
2244            && event.event_type == ThoughtEventType::CheckResult
2245            && let Some(result) = event
2246                .payload
2247                .get("result_excerpt")
2248                .and_then(serde_json::Value::as_str)
2249            && !result.trim().is_empty()
2250            && !has_template_placeholder_values(result)
2251        {
2252            latest_check = Some(trim_for_storage(result, 140));
2253        }
2254
2255        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2256            break;
2257        }
2258    }
2259
2260    let mut lines = vec![format!(
2261        "{} recent cognition events are available.",
2262        context.len()
2263    )];
2264    if let Some(error) = latest_error {
2265        lines.push(format!("Latest error signal: {}.", error));
2266    }
2267    if let Some(proposal) = latest_proposal {
2268        lines.push(format!("Recent proposal: {}.", proposal));
2269    }
2270    if let Some(check) = latest_check {
2271        lines.push(format!("Recent check: {}.", check));
2272    }
2273    lines.join(" ")
2274}
2275
/// Bounds `input` to at most `max_chars` characters for storage.
///
/// Input that already fits is returned with surrounding whitespace removed.
/// Longer input is cut after `max_chars` characters (never inside a
/// multi-byte character), suffixed with `...`, and then trimmed, so the
/// result may be up to three characters longer than `max_chars`.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    // The byte offset of the character at index `max_chars` is the cut point;
    // if there is no such character, the input already fits.
    match input.char_indices().nth(max_chars) {
        None => input.trim().to_string(),
        Some((cut, _)) => {
            let mut clipped = String::with_capacity(cut + 3);
            clipped.push_str(&input[..cut]);
            clipped.push_str("...");
            clipped.trim().to_string()
        }
    }
}
2287
/// Estimates how many facts `text` carries by counting sentence terminators.
///
/// Every `.`, `!` and `?` counts once; the total is clamped to `1..=12`.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
2293
2294fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2295    let first_line = thought
2296        .lines()
2297        .find(|line| !line.trim().is_empty())
2298        .unwrap_or("proposal");
2299    let compact = first_line
2300        .replace(['\t', '\r', '\n'], " ")
2301        .split_whitespace()
2302        .collect::<Vec<_>>()
2303        .join(" ");
2304    let trimmed = trim_for_storage(&compact, 72);
2305    if trimmed.is_empty() {
2306        format!("proposal-{}", thought_count)
2307    } else {
2308        trimmed
2309    }
2310}
2311
2312fn default_seed_persona() -> CreatePersonaRequest {
2313    CreatePersonaRequest {
2314        persona_id: Some("root-thinker".to_string()),
2315        name: "root-thinker".to_string(),
2316        role: "orchestrator".to_string(),
2317        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2318            .to_string(),
2319        swarm_id: Some("swarm-core".to_string()),
2320        parent_id: None,
2321        policy: None,
2322        tags: vec!["orchestration".to_string()],
2323    }
2324}
2325
/// Resolves a configured base URL to a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are dropped first. A blank
/// value falls back to the local default endpoint, a URL that already ends
/// in `/chat/completions` is kept as is, and anything else gets that path
/// appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    let base = base_url.trim().trim_end_matches('/');
    match base {
        "" => "http://127.0.0.1:11434/v1/chat/completions".to_string(),
        complete if complete.ends_with("/chat/completions") => complete.to_string(),
        prefix => format!("{}/chat/completions", prefix),
    }
}
2336
/// Reads a boolean flag from the environment variable `name`.
///
/// `1`, `true`, `yes`, `on` enable and `0`, `false`, `no`, `off` disable
/// (ASCII case-insensitive). A missing, non-Unicode, or unrecognized value
/// yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
2347
/// Reads an `f32` from the environment variable `name`, falling back to
/// `default` when the variable is missing or does not parse.
fn env_f32(name: &str, default: f32) -> f32 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<f32>().unwrap_or(default),
        Err(_) => default,
    }
}
2354
/// Reads a `u64` from the environment variable `name`, falling back to
/// `default` when the variable is missing or does not parse.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
2361
/// Reads a `u32` from the environment variable `name`, falling back to
/// `default` when the variable is missing or does not parse.
fn env_u32(name: &str, default: u32) -> u32 {
    std::env::var(name).map_or(default, |raw| raw.parse::<u32>().unwrap_or(default))
}
2368
/// Reads a `usize` from the environment variable `name`, falling back to
/// `default` when the variable is missing or does not parse.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name).map_or(default, |raw| raw.parse::<usize>().unwrap_or(default))
}
2375
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Work item for a reliability-focused persona in the given phase.
    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
        ThoughtWorkItem {
            persona_id: "p-1".to_string(),
            persona_name: "Spotlessbinco Business Thinker".to_string(),
            role: "principal reliability engineer".to_string(),
            charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
                .to_string(),
            swarm_id: Some("spotlessbinco".to_string()),
            thought_count: 4,
            phase,
        }
    }

    /// Policy with depth and branching limits of 2; only the budget and idle
    /// TTL vary between tests.
    fn test_policy(
        token_budget_per_minute: u32,
        compute_ms_per_minute: u32,
        idle_ttl_secs: u64,
    ) -> PersonaPolicy {
        PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_budget_per_minute,
            compute_ms_per_minute,
            idle_ttl_secs,
            share_memory: false,
            allowed_tools: Vec::new(),
        }
    }

    /// Enabled runtime with a 25 ms loop and a modest default budget.
    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: test_policy(1_000, 1_000, 300),
        })
    }

    /// Enabled runtime with a 10 ms loop and a caller-chosen default policy.
    fn fast_runtime(default_policy: PersonaPolicy) -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 10,
            max_events: 256,
            max_snapshots: 32,
            default_policy,
        })
    }

    /// Root-level persona request whose id doubles as its name.
    fn persona_request(
        id: &str,
        role: &str,
        charter: &str,
        swarm_id: Option<&str>,
    ) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: swarm_id.map(str::to_string),
            parent_id: None,
            policy: None,
            tags: Vec::new(),
        }
    }

    /// Child spawn request whose id doubles as its name.
    fn spawn_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    /// Spawn request for a generic worker child.
    fn worker(id: &str) -> SpawnPersonaRequest {
        spawn_request(id, "worker", "run")
    }

    /// Starts the loop with defaults, lets it tick for `millis`, then stops it.
    async fn run_briefly(runtime: &CognitionRuntime, millis: u64) {
        runtime.start(None).await.expect("should start");
        tokio::time::sleep(Duration::from_millis(millis)).await;
        runtime.stop(None).await.expect("should stop");
    }

    /// Moves a persona's `last_progress_at` ten seconds into the past.
    async fn backdate_progress(runtime: &CognitionRuntime, persona_id: &str) {
        let mut personas = runtime.personas.write().await;
        if let Some(p) = personas.get_mut(persona_id) {
            p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
        }
    }

    /// Installs governance with a 50% quorum and `auditor` as the veto role.
    async fn install_test_governance(runtime: &CognitionRuntime) {
        *runtime.governance.write().await = SwarmGovernance {
            quorum_fraction: 0.5,
            required_approvers_by_role: HashMap::new(),
            veto_roles: vec!["auditor".to_string()],
            vote_timeout_secs: 300,
        };
    }

    /// Inserts a low-risk proposal that is already in voting with the given
    /// ballots, a quorum of one, and a deadline five minutes out.
    async fn seed_proposal(
        runtime: &CognitionRuntime,
        id: &str,
        author: &str,
        title: &str,
        rationale: &str,
        votes: HashMap<String, ProposalVote>,
    ) {
        let mut proposals = runtime.proposals.write().await;
        proposals.insert(
            id.to_string(),
            Proposal {
                id: id.to_string(),
                persona_id: author.to_string(),
                title: title.to_string(),
                rationale: rationale.to_string(),
                evidence_refs: Vec::new(),
                risk: ProposalRisk::Low,
                status: ProposalStatus::Created,
                created_at: Utc::now(),
                votes,
                vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
                votes_requested: true,
                quorum_needed: 1,
            },
        );
    }

    // ---- thought normalization -----------------------------------------

    #[test]
    fn normalize_rejects_placeholder_process_line() {
        let work = sample_work_item(ThoughtPhase::Compress);
        let output = normalize_thought_output(
            &work,
            &[],
            "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
        );
        assert!(
            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
        );
        assert!(!output.contains("State_Summary: ..."));
    }

    #[test]
    fn normalize_accepts_concrete_process_line() {
        let work = sample_work_item(ThoughtPhase::Test);
        let line = "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes";
        // A concrete, fully populated line must come back unchanged.
        assert_eq!(normalize_thought_output(&work, &[], line), line);
    }

    // ---- persona lifecycle and lineage ---------------------------------

    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");
        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", spawn_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");
        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");

        runtime
            .spawn_child("root", worker("c1"))
            .await
            .expect("first child should spawn");
        runtime
            .spawn_child("root", worker("c2"))
            .await
            .expect("second child should spawn");

        // The branching factor is 2, so a third direct child must be refused.
        let third_child = runtime.spawn_child("root", worker("c3")).await;
        assert!(third_child.is_err());

        runtime
            .spawn_child("c1", worker("c1-1"))
            .await
            .expect("depth 2 should be allowed");

        // The spawn depth is capped at 2, so a depth-3 persona must be refused.
        let depth_violation = runtime.spawn_child("c1-1", worker("c1-1-1")).await;
        assert!(depth_violation.is_err());
    }

    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        runtime
            .start(Some(StartCognitionRequest {
                loop_interval_ms: Some(10),
                seed_persona: Some(persona_request(
                    "seed",
                    "watcher",
                    "observe",
                    Some("swarm-seed"),
                )),
            }))
            .await
            .expect("runtime should start");

        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }

    // ---- budgets and idle reaping --------------------------------------

    #[tokio::test]
    async fn zero_budget_persona_is_skipped() {
        let runtime = fast_runtime(test_policy(0, 0, 3_600));

        let persona = runtime
            .create_persona(persona_request(
                "budget-test",
                "tester",
                "test budgets",
                None,
            ))
            .await
            .expect("should create persona");

        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);

        // Let the loop run for a few ticks.
        run_briefly(&runtime, 50).await;

        // The budget blocks every tick, so no thought may have been recorded.
        let p = runtime.get_persona("budget-test").await.unwrap();
        assert_eq!(p.thought_count, 0);
        assert!(p.budget_paused);
    }

    // Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn budget_counters_reset_after_window() {
        let now = Utc::now();
        let mut persona = PersonaRuntimeState {
            identity: PersonaIdentity {
                id: "p1".to_string(),
                name: "test".to_string(),
                role: "tester".to_string(),
                charter: "test".to_string(),
                swarm_id: None,
                parent_id: None,
                depth: 0,
                created_at: now,
                tags: Vec::new(),
            },
            policy: PersonaPolicy::default(),
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
            tokens_this_window: 5000,
            compute_ms_this_window: 3000,
            window_started_at: now - ChronoDuration::seconds(61),
            last_progress_at: now,
            budget_paused: false,
        };

        // Simulate the window-expiry check.
        let window_elapsed = now
            .signed_duration_since(persona.window_started_at)
            .num_seconds();
        assert!(window_elapsed >= 60);

        // Simulate the reset.
        persona.tokens_this_window = 0;
        persona.compute_ms_this_window = 0;
        persona.window_started_at = now;

        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);
    }

    #[tokio::test]
    async fn idle_persona_is_reaped() {
        // A TTL of 0 makes the persona idle immediately.
        let runtime = fast_runtime(test_policy(20_000, 10_000, 0));

        runtime
            .create_persona(persona_request("idle-test", "idler", "idle away", None))
            .await
            .expect("should create persona");

        backdate_progress(&runtime, "idle-test").await;
        run_briefly(&runtime, 100).await;

        let p = runtime.get_persona("idle-test").await.unwrap();
        assert_eq!(p.status, PersonaStatus::Reaped);
    }

    #[tokio::test]
    async fn budget_paused_persona_not_reaped_for_idle() {
        // Zero budget keeps the persona paused; zero TTL would otherwise make
        // it eligible for idle reaping.
        let runtime = fast_runtime(test_policy(0, 0, 0));

        runtime
            .create_persona(persona_request("paused-test", "pauser", "pause", None))
            .await
            .expect("should create persona");

        backdate_progress(&runtime, "paused-test").await;
        run_briefly(&runtime, 100).await;

        let p = runtime.get_persona("paused-test").await.unwrap();
        // Budget-paused personas are NOT reaped for idleness.
        assert_eq!(p.status, PersonaStatus::Active);
        assert!(p.budget_paused);
    }

    // ---- governance ----------------------------------------------------

    #[tokio::test]
    async fn governance_proposal_resolution() {
        let runtime = test_runtime();
        install_test_governance(&runtime).await;

        for voter in ["voter-1", "voter-2"] {
            runtime
                .create_persona(persona_request(voter, "engineer", "vote", None))
                .await
                .unwrap();
        }

        seed_proposal(
            &runtime,
            "prop-1",
            "voter-1",
            "test proposal",
            "testing governance",
            HashMap::from([("voter-1".to_string(), ProposalVote::Approve)]),
        )
        .await;

        // quorum = 0.5 * 2 = 1, and there is 1 approval -> should be verified.
        run_briefly(&runtime, 100).await;

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-1").unwrap();
        // Verification and execution can happen within the same tick.
        assert!(
            prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
            "Expected Verified or Executed, got {:?}",
            prop.status
        );
    }

    #[tokio::test]
    async fn veto_rejects_proposal() {
        let runtime = test_runtime();
        install_test_governance(&runtime).await;

        runtime
            .create_persona(persona_request("eng", "engineer", "build", None))
            .await
            .unwrap();
        runtime
            .create_persona(persona_request("aud", "auditor", "audit", None))
            .await
            .unwrap();

        seed_proposal(
            &runtime,
            "prop-veto",
            "eng",
            "vetoed proposal",
            "testing veto",
            HashMap::from([
                ("eng".to_string(), ProposalVote::Approve),
                ("aud".to_string(), ProposalVote::Veto),
            ]),
        )
        .await;

        run_briefly(&runtime, 100).await;

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-veto").unwrap();
        assert_eq!(prop.status, ProposalStatus::Rejected);
    }

    // ---- workspace types -----------------------------------------------

    #[test]
    fn global_workspace_default() {
        let ws = GlobalWorkspace::default();
        assert!(ws.top_beliefs.is_empty());
        assert!(ws.top_uncertainties.is_empty());
        assert!(ws.top_attention.is_empty());
    }

    #[test]
    fn attention_item_creation() {
        let item = AttentionItem {
            id: "a1".to_string(),
            topic: "test topic".to_string(),
            topic_tags: vec!["reliability".to_string()],
            priority: 0.8,
            source_type: AttentionSource::ContestedBelief,
            source_id: "b1".to_string(),
            assigned_persona: None,
            created_at: Utc::now(),
            resolved_at: None,
        };
        assert!(item.resolved_at.is_none());
        assert_eq!(item.priority, 0.8);
    }
}