Skip to main content

codetether_agent/cognition/
mod.rs

1//! Perpetual cognition runtime for persona swarms.
2//!
3//! Phase 0 scope:
4//! - Contract types for personas, thought events, proposals, and snapshots
5//! - In-memory runtime manager for lifecycle + lineage
6//! - Feature-flagged perpetual loop (no external execution side effects)
7
8pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16#[cfg(not(feature = "functiongemma"))]
17#[allow(dead_code)]
18pub mod tool_router {
19    use super::thinker::CandleDevicePreference;
20    use crate::provider::{CompletionResponse, ToolDefinition};
21    use anyhow::Result;
22
23    /// No-op config when FunctionGemma support is not compiled in.
24    #[derive(Debug, Clone)]
25    pub struct ToolRouterConfig {
26        pub enabled: bool,
27        pub model_path: Option<String>,
28        pub tokenizer_path: Option<String>,
29        pub arch: String,
30        pub device: CandleDevicePreference,
31        pub max_tokens: usize,
32        pub temperature: f32,
33    }
34
35    impl Default for ToolRouterConfig {
36        fn default() -> Self {
37            Self {
38                enabled: false,
39                model_path: None,
40                tokenizer_path: None,
41                arch: "gemma3".to_string(),
42                device: CandleDevicePreference::Auto,
43                max_tokens: 128,
44                temperature: 0.1,
45            }
46        }
47    }
48
49    impl ToolRouterConfig {
50        pub fn from_env() -> Self {
51            Self::default()
52        }
53    }
54
55    /// No-op router when the `functiongemma` feature is disabled.
56    #[derive(Debug, Clone, Default)]
57    pub struct ToolCallRouter;
58
59    impl ToolCallRouter {
60        pub fn from_config(config: &ToolRouterConfig) -> Result<Option<Self>> {
61            if config.enabled {
62                tracing::debug!(
63                    "FunctionGemma requested but feature is disabled; rebuild with --features functiongemma"
64                );
65            }
66            Ok(None)
67        }
68
69        pub async fn maybe_reformat(
70            &self,
71            response: CompletionResponse,
72            _tools: &[ToolDefinition],
73            _model_supports_tools: bool,
74        ) -> CompletionResponse {
75            response
76        }
77    }
78}
79
80pub use thinker::{
81    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
82};
83
84use crate::tool::ToolRegistry;
85use anyhow::{Result, anyhow};
86use beliefs::{Belief, BeliefStatus};
87use chrono::{DateTime, Duration as ChronoDuration, Utc};
88use serde::{Deserialize, Serialize};
89use serde_json::json;
90use std::collections::{HashMap, VecDeque};
91use std::sync::Arc;
92use std::sync::atomic::{AtomicBool, Ordering};
93use std::time::Duration;
94use tokio::sync::{Mutex, RwLock, broadcast};
95use tokio::task::JoinHandle;
96
97// Ensure re-exported types are referenced to suppress warnings.
98const _: () = {
99    fn _assert_types_used() {
100        let _ = std::mem::size_of::<CandleDevicePreference>();
101        let _ = std::mem::size_of::<ThinkerBackend>();
102        let _ = std::mem::size_of::<ThinkerOutput>();
103    }
104};
105use tokio::time::Instant;
106use uuid::Uuid;
107
108/// Persona execution status.
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub enum PersonaStatus {
112    Active,
113    Idle,
114    Reaped,
115    Error,
116}
117
118/// Policy boundaries for a persona.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct PersonaPolicy {
121    pub max_spawn_depth: u32,
122    pub max_branching_factor: u32,
123    pub token_budget_per_minute: u32,
124    pub compute_ms_per_minute: u32,
125    pub idle_ttl_secs: u64,
126    pub share_memory: bool,
127    #[serde(default)]
128    pub allowed_tools: Vec<String>,
129}
130
131impl Default for PersonaPolicy {
132    fn default() -> Self {
133        Self {
134            max_spawn_depth: 4,
135            max_branching_factor: 4,
136            token_budget_per_minute: 20_000,
137            compute_ms_per_minute: 10_000,
138            idle_ttl_secs: 3_600,
139            share_memory: false,
140            allowed_tools: Vec::new(),
141        }
142    }
143}
144
145/// Identity contract for a persona.
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct PersonaIdentity {
148    pub id: String,
149    pub name: String,
150    pub role: String,
151    pub charter: String,
152    pub swarm_id: Option<String>,
153    pub parent_id: Option<String>,
154    pub depth: u32,
155    pub created_at: DateTime<Utc>,
156    #[serde(default)]
157    pub tags: Vec<String>,
158}
159
160/// Full runtime state for a persona.
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct PersonaRuntimeState {
163    pub identity: PersonaIdentity,
164    pub policy: PersonaPolicy,
165    pub status: PersonaStatus,
166    pub thought_count: u64,
167    pub last_tick_at: Option<DateTime<Utc>>,
168    pub updated_at: DateTime<Utc>,
169    /// Tokens consumed in current 60-second window.
170    pub tokens_this_window: u32,
171    /// Compute milliseconds consumed in current 60-second window.
172    pub compute_ms_this_window: u32,
173    /// Start of the current budget window.
174    pub window_started_at: DateTime<Utc>,
175    /// Last time this persona made meaningful progress (not budget-paused/quorum-waiting).
176    pub last_progress_at: DateTime<Utc>,
177    /// Whether this persona is currently paused due to budget exhaustion.
178    #[serde(default)]
179    pub budget_paused: bool,
180}
181
182/// Proposal risk level.
183#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
184#[serde(rename_all = "snake_case")]
185pub enum ProposalRisk {
186    Low,
187    Medium,
188    High,
189    Critical,
190}
191
192/// Proposal status.
193#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
194#[serde(rename_all = "snake_case")]
195pub enum ProposalStatus {
196    Created,
197    Verified,
198    Rejected,
199    Executed,
200}
201
202/// A vote on a proposal.
203#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
204#[serde(rename_all = "snake_case")]
205pub enum ProposalVote {
206    Approve,
207    Reject,
208    Veto,
209    Abstain,
210}
211
212/// Proposal contract (think first, execute through gates).
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct Proposal {
215    pub id: String,
216    pub persona_id: String,
217    pub title: String,
218    pub rationale: String,
219    pub evidence_refs: Vec<String>,
220    pub risk: ProposalRisk,
221    pub status: ProposalStatus,
222    pub created_at: DateTime<Utc>,
223    /// Votes from personas: persona_id -> vote
224    #[serde(default)]
225    pub votes: HashMap<String, ProposalVote>,
226    /// Deadline for voting
227    pub vote_deadline: Option<DateTime<Utc>>,
228    /// Whether votes have been requested this cycle
229    #[serde(default)]
230    pub votes_requested: bool,
231    /// Quorum required (frozen at creation time to prevent drift).
232    #[serde(default)]
233    pub quorum_needed: usize,
234}
235
236/// Thought/event types emitted by the cognition loop.
237#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
238#[serde(rename_all = "snake_case")]
239pub enum ThoughtEventType {
240    ThoughtGenerated,
241    HypothesisRaised,
242    CheckRequested,
243    CheckResult,
244    ProposalCreated,
245    ProposalVerified,
246    ProposalRejected,
247    ActionExecuted,
248    PersonaSpawned,
249    PersonaReaped,
250    SnapshotCompressed,
251    BeliefExtracted,
252    BeliefContested,
253    BeliefRevalidated,
254    BudgetPaused,
255    IdleReaped,
256    AttentionCreated,
257    VoteCast,
258    WorkspaceUpdated,
259}
260
261impl ThoughtEventType {
262    fn as_str(&self) -> &'static str {
263        match self {
264            Self::ThoughtGenerated => "thought_generated",
265            Self::HypothesisRaised => "hypothesis_raised",
266            Self::CheckRequested => "check_requested",
267            Self::CheckResult => "check_result",
268            Self::ProposalCreated => "proposal_created",
269            Self::ProposalVerified => "proposal_verified",
270            Self::ProposalRejected => "proposal_rejected",
271            Self::ActionExecuted => "action_executed",
272            Self::PersonaSpawned => "persona_spawned",
273            Self::PersonaReaped => "persona_reaped",
274            Self::SnapshotCompressed => "snapshot_compressed",
275            Self::BeliefExtracted => "belief_extracted",
276            Self::BeliefContested => "belief_contested",
277            Self::BeliefRevalidated => "belief_revalidated",
278            Self::BudgetPaused => "budget_paused",
279            Self::IdleReaped => "idle_reaped",
280            Self::AttentionCreated => "attention_created",
281            Self::VoteCast => "vote_cast",
282            Self::WorkspaceUpdated => "workspace_updated",
283        }
284    }
285}
286
287/// Streamable thought event contract.
288#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct ThoughtEvent {
290    pub id: String,
291    pub event_type: ThoughtEventType,
292    pub persona_id: Option<String>,
293    pub swarm_id: Option<String>,
294    pub timestamp: DateTime<Utc>,
295    pub payload: serde_json::Value,
296}
297
298/// Distilled memory snapshot contract.
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct MemorySnapshot {
301    pub id: String,
302    pub generated_at: DateTime<Utc>,
303    pub swarm_id: Option<String>,
304    pub persona_scope: Vec<String>,
305    pub summary: String,
306    pub hot_event_count: usize,
307    pub warm_fact_count: usize,
308    pub cold_snapshot_count: usize,
309    pub metadata: HashMap<String, serde_json::Value>,
310}
311
312/// Request payload for creating a persona.
313#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct CreatePersonaRequest {
315    pub persona_id: Option<String>,
316    pub name: String,
317    pub role: String,
318    pub charter: String,
319    pub swarm_id: Option<String>,
320    pub parent_id: Option<String>,
321    pub policy: Option<PersonaPolicy>,
322    #[serde(default)]
323    pub tags: Vec<String>,
324}
325
326/// Request payload for spawning a child persona.
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct SpawnPersonaRequest {
329    pub persona_id: Option<String>,
330    pub name: String,
331    pub role: String,
332    pub charter: String,
333    pub swarm_id: Option<String>,
334    pub policy: Option<PersonaPolicy>,
335}
336
337/// Request payload for reaping persona(s).
338#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct ReapPersonaRequest {
340    pub cascade: Option<bool>,
341    pub reason: Option<String>,
342}
343
344/// Start-control payload for perpetual cognition.
345#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct StartCognitionRequest {
347    pub loop_interval_ms: Option<u64>,
348    pub seed_persona: Option<CreatePersonaRequest>,
349}
350
351/// Stop-control payload for perpetual cognition.
352#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct StopCognitionRequest {
354    pub reason: Option<String>,
355}
356
357// ── Attention, Governance, GlobalWorkspace ──────────────────────────────
358
359/// Source of an attention item.
360#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
361#[serde(rename_all = "snake_case")]
362pub enum AttentionSource {
363    ContestedBelief,
364    FailedCheck,
365    StaleBelief,
366    ProposalTimeout,
367    FailedExecution,
368}
369
370/// An item requiring persona attention.
371#[derive(Debug, Clone, Serialize, Deserialize)]
372pub struct AttentionItem {
373    pub id: String,
374    pub topic: String,
375    pub topic_tags: Vec<String>,
376    pub priority: f32,
377    pub source_type: AttentionSource,
378    pub source_id: String,
379    pub assigned_persona: Option<String>,
380    pub created_at: DateTime<Utc>,
381    pub resolved_at: Option<DateTime<Utc>>,
382}
383
384/// Governance rules for swarm voting.
385#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct SwarmGovernance {
387    pub quorum_fraction: f32,
388    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
389    pub veto_roles: Vec<String>,
390    pub vote_timeout_secs: u64,
391}
392
393impl Default for SwarmGovernance {
394    fn default() -> Self {
395        Self {
396            quorum_fraction: 0.5,
397            required_approvers_by_role: HashMap::new(),
398            veto_roles: Vec::new(),
399            vote_timeout_secs: 300,
400        }
401    }
402}
403
404/// The coherent "now" for the entire swarm.
405#[derive(Debug, Clone, Serialize, Deserialize)]
406pub struct GlobalWorkspace {
407    pub top_beliefs: Vec<String>,
408    pub top_uncertainties: Vec<String>,
409    pub top_attention: Vec<String>,
410    pub active_objectives: Vec<String>,
411    pub updated_at: DateTime<Utc>,
412}
413
414impl Default for GlobalWorkspace {
415    fn default() -> Self {
416        Self {
417            top_beliefs: Vec::new(),
418            top_uncertainties: Vec::new(),
419            top_attention: Vec::new(),
420            active_objectives: Vec::new(),
421            updated_at: Utc::now(),
422        }
423    }
424}
425
426/// Start/stop response status.
427#[derive(Debug, Clone, Serialize, Deserialize)]
428pub struct CognitionStatus {
429    pub enabled: bool,
430    pub running: bool,
431    pub loop_interval_ms: u64,
432    pub started_at: Option<DateTime<Utc>>,
433    pub last_tick_at: Option<DateTime<Utc>>,
434    pub persona_count: usize,
435    pub active_persona_count: usize,
436    pub events_buffered: usize,
437    pub snapshots_buffered: usize,
438}
439
440/// Reap response.
441#[derive(Debug, Clone, Serialize, Deserialize)]
442pub struct ReapPersonaResponse {
443    pub reaped_ids: Vec<String>,
444    pub count: usize,
445}
446
447/// Lineage node response.
448#[derive(Debug, Clone, Serialize, Deserialize)]
449pub struct LineageNode {
450    pub persona_id: String,
451    pub parent_id: Option<String>,
452    pub children: Vec<String>,
453    pub depth: u32,
454    pub status: PersonaStatus,
455}
456
457/// Lineage graph response.
458#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct LineageGraph {
460    pub nodes: Vec<LineageNode>,
461    pub roots: Vec<String>,
462    pub total_edges: usize,
463}
464
/// Internal phase of a persona's repeating four-step thought cycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}
472
473impl ThoughtPhase {
474    fn from_thought_count(thought_count: u64) -> Self {
475        match thought_count % 4 {
476            1 => Self::Observe,
477            2 => Self::Reflect,
478            3 => Self::Test,
479            _ => Self::Compress,
480        }
481    }
482
483    fn as_str(&self) -> &'static str {
484        match self {
485            Self::Observe => "observe",
486            Self::Reflect => "reflect",
487            Self::Test => "test",
488            Self::Compress => "compress",
489        }
490    }
491
492    fn event_type(&self) -> ThoughtEventType {
493        match self {
494            Self::Observe => ThoughtEventType::ThoughtGenerated,
495            Self::Reflect => ThoughtEventType::HypothesisRaised,
496            Self::Test => ThoughtEventType::CheckRequested,
497            Self::Compress => ThoughtEventType::SnapshotCompressed,
498        }
499    }
500}
501
502#[derive(Debug, Clone)]
503struct ThoughtWorkItem {
504    persona_id: String,
505    persona_name: String,
506    role: String,
507    charter: String,
508    swarm_id: Option<String>,
509    thought_count: u64,
510    phase: ThoughtPhase,
511}
512
/// Result record for a single generated thought.
///
/// Apart from `source`, `thinking` and `latency_ms`, every field is optional.
#[derive(Debug, Clone)]
struct ThoughtResult {
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    total_tokens: Option<u32>,
    latency_ms: u128,
    error: Option<String>,
}
525
526/// Runtime options for cognition manager.
527#[derive(Debug, Clone)]
528pub struct CognitionRuntimeOptions {
529    pub enabled: bool,
530    pub loop_interval_ms: u64,
531    pub max_events: usize,
532    pub max_snapshots: usize,
533    pub default_policy: PersonaPolicy,
534}
535
536impl Default for CognitionRuntimeOptions {
537    fn default() -> Self {
538        Self {
539            enabled: false,
540            loop_interval_ms: 2_000,
541            max_events: 2_000,
542            max_snapshots: 128,
543            default_policy: PersonaPolicy::default(),
544        }
545    }
546}
547
548/// In-memory cognition runtime for perpetual persona swarms.
549#[derive(Debug)]
550pub struct CognitionRuntime {
551    enabled: bool,
552    max_events: usize,
553    max_snapshots: usize,
554    default_policy: PersonaPolicy,
555    running: Arc<AtomicBool>,
556    loop_interval_ms: Arc<RwLock<u64>>,
557    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
558    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
559    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
560    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
561    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
562    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
563    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
564    event_tx: broadcast::Sender<ThoughtEvent>,
565    thinker: Option<Arc<ThinkerClient>>,
566    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
567    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
568    governance: Arc<RwLock<SwarmGovernance>>,
569    workspace: Arc<RwLock<GlobalWorkspace>>,
570    tools: Option<Arc<ToolRegistry>>,
571    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
572    /// Proposals pending human approval for Critical risk.
573    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
574}
575
576impl CognitionRuntime {
577    /// Build runtime from environment feature flags.
578    pub fn new_from_env() -> Self {
579        let options = CognitionRuntimeOptions {
580            enabled: env_bool("CODETETHER_COGNITION_ENABLED", true),
581            loop_interval_ms: env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000),
582            max_events: env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000),
583            max_snapshots: env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128),
584            default_policy: PersonaPolicy {
585                max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
586                max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
587                token_budget_per_minute: env_u32(
588                    "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
589                    20_000,
590                ),
591                compute_ms_per_minute: env_u32(
592                    "CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE",
593                    10_000,
594                ),
595                idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
596                share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
597                allowed_tools: Vec::new(),
598            },
599        };
600
601        let thinker_backend = thinker::ThinkerBackend::from_env(
602            &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
603                .unwrap_or_else(|_| "openai_compat".to_string()),
604        );
605        let thinker_timeout_default = match thinker_backend {
606            thinker::ThinkerBackend::OpenAICompat => 30_000,
607            thinker::ThinkerBackend::Candle => 12_000,
608            thinker::ThinkerBackend::Bedrock => 60_000,
609        };
610        let thinker_config = ThinkerConfig {
611            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
612            backend: thinker_backend,
613            endpoint: normalize_thinker_endpoint(
614                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
615                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
616            ),
617            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
618                .unwrap_or_else(|_| "qwen3.5-9b".to_string()),
619            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
620            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
621            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
622                .ok()
623                .and_then(|v| v.parse::<f32>().ok()),
624            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
625            timeout_ms: env_u64(
626                "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
627                thinker_timeout_default,
628            ),
629            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
630            candle_tokenizer_path: std::env::var(
631                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
632            )
633            .ok(),
634            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
635            candle_device: thinker::CandleDevicePreference::from_env(
636                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
637                    .unwrap_or_else(|_| "auto".to_string()),
638            ),
639            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
640            candle_repeat_penalty: env_f32(
641                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
642                1.1,
643            ),
644            candle_repeat_last_n: env_usize(
645                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
646                64,
647            ),
648            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
649            bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
650                .unwrap_or_else(|_| {
651                    std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-west-2".to_string())
652                }),
653            bedrock_service_tier: std::env::var(
654                "CODETETHER_COGNITION_THINKER_BEDROCK_SERVICE_TIER",
655            )
656            .ok()
657            .map(|v| v.trim().to_ascii_lowercase())
658            .filter(|v| !v.is_empty()),
659        };
660
661        let mut runtime = Self::new_with_options(options);
662        // Configure the thinker on the existing runtime
663        runtime.thinker = Some(thinker_config).and_then(|cfg| {
664            if !cfg.enabled {
665                return None;
666            }
667            match ThinkerClient::new(cfg) {
668                Ok(client) => {
669                    tracing::info!(
670                        backend = ?client.config().backend,
671                        endpoint = %client.config().endpoint,
672                        model = %client.config().model,
673                        "Cognition thinker initialized"
674                    );
675                    Some(Arc::new(client))
676                }
677                Err(error) => {
678                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
679                    None
680                }
681            }
682        });
683        runtime
684    }
685
686    /// Build runtime from explicit options.
687    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
688        Self::new_with_options_and_thinker(options, None)
689    }
690
691    fn new_with_options_and_thinker(
692        options: CognitionRuntimeOptions,
693        thinker_config: Option<ThinkerConfig>,
694    ) -> Self {
695        let (event_tx, _) = broadcast::channel(256);
696        let thinker = thinker_config.and_then(|cfg| {
697            if !cfg.enabled {
698                return None;
699            }
700            match ThinkerClient::new(cfg) {
701                Ok(client) => {
702                    tracing::info!(
703                        backend = ?client.config().backend,
704                        endpoint = %client.config().endpoint,
705                        model = %client.config().model,
706                        "Cognition thinker initialized"
707                    );
708                    Some(Arc::new(client))
709                }
710                Err(error) => {
711                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
712                    None
713                }
714            }
715        });
716
717        // Load persisted state before construction to avoid blocking_write()
718        // inside a tokio runtime (which panics).
719        #[cfg(test)]
720        let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) = (
721            HashMap::new(),
722            HashMap::new(),
723            HashMap::new(),
724            Vec::new(),
725            GlobalWorkspace::default(),
726        );
727
728        #[cfg(not(test))]
729        let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
730            if let Some(persisted) = persistence::load_state() {
731                tracing::info!(
732                    personas = persisted.personas.len(),
733                    beliefs = persisted.beliefs.len(),
734                    persisted_at = %persisted.persisted_at,
735                    "Restoring persisted cognition state"
736                );
737                (
738                    persisted.personas,
739                    persisted.beliefs,
740                    persisted.proposals,
741                    persisted.attention_queue,
742                    persisted.workspace,
743                )
744            } else {
745                (
746                    HashMap::new(),
747                    HashMap::new(),
748                    HashMap::new(),
749                    Vec::new(),
750                    GlobalWorkspace::default(),
751                )
752            };
753
754        Self {
755            enabled: options.enabled,
756            max_events: options.max_events.max(32),
757            max_snapshots: options.max_snapshots.max(8),
758            default_policy: options.default_policy,
759            running: Arc::new(AtomicBool::new(false)),
760            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
761            started_at: Arc::new(RwLock::new(None)),
762            last_tick_at: Arc::new(RwLock::new(None)),
763            personas: Arc::new(RwLock::new(init_personas)),
764            proposals: Arc::new(RwLock::new(init_proposals)),
765            events: Arc::new(RwLock::new(VecDeque::new())),
766            snapshots: Arc::new(RwLock::new(VecDeque::new())),
767            loop_handle: Arc::new(Mutex::new(None)),
768            event_tx,
769            thinker,
770            beliefs: Arc::new(RwLock::new(init_beliefs)),
771            attention_queue: Arc::new(RwLock::new(init_attention)),
772            governance: Arc::new(RwLock::new(SwarmGovernance::default())),
773            workspace: Arc::new(RwLock::new(init_workspace)),
774            tools: None,
775            receipts: Arc::new(RwLock::new(Vec::new())),
776            pending_approvals: Arc::new(RwLock::new(HashMap::new())),
777        }
778    }
779
780    /// Whether cognition is enabled by feature flag.
781    pub fn is_enabled(&self) -> bool {
782        self.enabled
783    }
784
785    /// Subscribe to thought events for streaming.
786    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
787        self.event_tx.subscribe()
788    }
789
790    /// Start the perpetual cognition loop.
791    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
792        if !self.enabled {
793            return Err(anyhow!(
794                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
795            ));
796        }
797
798        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
799        if let Some(request) = req {
800            if let Some(interval) = request.loop_interval_ms {
801                let mut lock = self.loop_interval_ms.write().await;
802                *lock = interval.max(100);
803            }
804            requested_seed_persona = request.seed_persona;
805        }
806
807        let has_personas = !self.personas.read().await.is_empty();
808        if !has_personas {
809            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
810            let _ = self.create_persona(seed).await?;
811        }
812
813        if self.running.load(Ordering::SeqCst) {
814            return Ok(self.status().await);
815        }
816
817        self.running.store(true, Ordering::SeqCst);
818        {
819            let mut started = self.started_at.write().await;
820            *started = Some(Utc::now());
821        }
822
823        let running = Arc::clone(&self.running);
824        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
825        let last_tick_at = Arc::clone(&self.last_tick_at);
826        let personas = Arc::clone(&self.personas);
827        let proposals = Arc::clone(&self.proposals);
828        let events = Arc::clone(&self.events);
829        let snapshots = Arc::clone(&self.snapshots);
830        let max_events = self.max_events;
831        let max_snapshots = self.max_snapshots;
832        let event_tx = self.event_tx.clone();
833        let thinker = self.thinker.clone();
834        let beliefs = Arc::clone(&self.beliefs);
835        let attention_queue = Arc::clone(&self.attention_queue);
836        let governance = Arc::clone(&self.governance);
837        let workspace = Arc::clone(&self.workspace);
838        let tools = self.tools.clone();
839        let receipts = Arc::clone(&self.receipts);
840        let pending_approvals = Arc::clone(&self.pending_approvals);
841
842        let handle = tokio::spawn(async move {
843            let mut next_tick = Instant::now();
844            while running.load(Ordering::SeqCst) {
845                let now_instant = Instant::now();
846                if now_instant < next_tick {
847                    tokio::time::sleep_until(next_tick).await;
848                }
849                if !running.load(Ordering::SeqCst) {
850                    break;
851                }
852
853                let now = Utc::now();
854                {
855                    let mut lock = last_tick_at.write().await;
856                    *lock = Some(now);
857                }
858
859                let mut new_events = Vec::new();
860                let mut new_snapshots = Vec::new();
861                let mut new_proposals = Vec::new();
862
863                // ── Step 1: Budget window reset + enforcement ──
864                let work_items: Vec<ThoughtWorkItem> = {
865                    let mut map = personas.write().await;
866                    let mut items = Vec::new();
867                    for persona in map.values_mut() {
868                        if persona.status != PersonaStatus::Active {
869                            continue;
870                        }
871
872                        // Reset budget window every 60 seconds
873                        let window_elapsed = now
874                            .signed_duration_since(persona.window_started_at)
875                            .num_seconds();
876                        if window_elapsed >= 60 {
877                            persona.tokens_this_window = 0;
878                            persona.compute_ms_this_window = 0;
879                            persona.window_started_at = now;
880                        }
881
882                        // Check budget
883                        let token_ok =
884                            persona.tokens_this_window < persona.policy.token_budget_per_minute;
885                        let compute_ok =
886                            persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
887                        if !token_ok || !compute_ok {
888                            if !persona.budget_paused {
889                                persona.budget_paused = true;
890                                new_events.push(ThoughtEvent {
891                                    id: Uuid::new_v4().to_string(),
892                                    event_type: ThoughtEventType::BudgetPaused,
893                                    persona_id: Some(persona.identity.id.clone()),
894                                    swarm_id: persona.identity.swarm_id.clone(),
895                                    timestamp: now,
896                                    payload: json!({
897                                        "budget_paused": true,
898                                        "tokens_used": persona.tokens_this_window,
899                                        "compute_ms_used": persona.compute_ms_this_window,
900                                        "token_budget": persona.policy.token_budget_per_minute,
901                                        "compute_budget": persona.policy.compute_ms_per_minute,
902                                    }),
903                                });
904                            }
905                            continue; // skip tick for this persona
906                        }
907
908                        persona.budget_paused = false;
909                        persona.thought_count = persona.thought_count.saturating_add(1);
910                        persona.last_tick_at = Some(now);
911                        persona.updated_at = now;
912                        items.push(ThoughtWorkItem {
913                            persona_id: persona.identity.id.clone(),
914                            persona_name: persona.identity.name.clone(),
915                            role: persona.identity.role.clone(),
916                            charter: persona.identity.charter.clone(),
917                            swarm_id: persona.identity.swarm_id.clone(),
918                            thought_count: persona.thought_count,
919                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
920                        });
921                    }
922                    items
923                };
924
925                for work in &work_items {
926                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
927
928                    let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
929
930                    let event_timestamp = Utc::now();
931                    let is_fallback = thought.source == "fallback";
932                    let tokens_used = thought.total_tokens.unwrap_or(0);
933                    let compute_used = thought.latency_ms as u32;
934
935                    new_events.push(ThoughtEvent {
936                        id: Uuid::new_v4().to_string(),
937                        event_type: work.phase.event_type(),
938                        persona_id: Some(work.persona_id.clone()),
939                        swarm_id: work.swarm_id.clone(),
940                        timestamp: event_timestamp,
941                        payload: json!({
942                            "phase": work.phase.as_str(),
943                            "thought_count": work.thought_count,
944                            "persona": {
945                                "id": work.persona_id.clone(),
946                                "name": work.persona_name.clone(),
947                                "role": work.role.clone(),
948                            },
949                            "context_event_count": context.len(),
950                            "thinking": thought.thinking.clone(),
951                            "source": thought.source,
952                            "model": thought.model.clone(),
953                            "finish_reason": thought.finish_reason.clone(),
954                            "usage": {
955                                "prompt_tokens": thought.prompt_tokens,
956                                "completion_tokens": thought.completion_tokens,
957                                "total_tokens": thought.total_tokens,
958                            },
959                            "latency_ms": thought.latency_ms,
960                            "error": thought.error.clone(),
961                        }),
962                    });
963
964                    // ── After thought: update budget counters ──
965                    {
966                        let mut map = personas.write().await;
967                        if let Some(persona) = map.get_mut(&work.persona_id) {
968                            persona.tokens_this_window =
969                                persona.tokens_this_window.saturating_add(tokens_used);
970                            persona.compute_ms_this_window =
971                                persona.compute_ms_this_window.saturating_add(compute_used);
972                            if !is_fallback {
973                                persona.last_progress_at = Utc::now();
974                            }
975                        }
976                    }
977
978                    // ── Step 3: Belief extraction during Reflect phase ──
979                    if work.phase == ThoughtPhase::Reflect && !is_fallback {
980                        let extracted = beliefs::extract_beliefs_from_thought(
981                            thinker.as_deref(),
982                            &work.persona_id,
983                            &thought.thinking,
984                        )
985                        .await;
986
987                        if !extracted.is_empty() {
988                            let mut belief_store = beliefs.write().await;
989                            let mut attn_queue = attention_queue.write().await;
990                            for mut new_belief in extracted {
991                                // Check for existing belief_key (duplicate detection)
992                                let existing_id = belief_store
993                                    .values()
994                                    .find(|b| {
995                                        b.belief_key == new_belief.belief_key
996                                            && b.status != BeliefStatus::Invalidated
997                                    })
998                                    .map(|b| b.id.clone());
999
1000                                if let Some(eid) = existing_id {
1001                                    // Confirm existing belief
1002                                    if let Some(existing) = belief_store.get_mut(&eid) {
1003                                        if !existing.confirmed_by.contains(&work.persona_id) {
1004                                            existing.confirmed_by.push(work.persona_id.clone());
1005                                        }
1006                                        existing.revalidation_success();
1007                                    }
1008                                } else {
1009                                    // Handle contradiction targets
1010                                    let contest_targets: Vec<String> =
1011                                        new_belief.contradicts.clone();
1012                                    for target_key in &contest_targets {
1013                                        if let Some(target) = belief_store.values_mut().find(|b| {
1014                                            &b.belief_key == target_key
1015                                                && b.status != BeliefStatus::Invalidated
1016                                        }) {
1017                                            target.contested_by.push(new_belief.id.clone());
1018                                            if !new_belief.contradicts.contains(&target.belief_key)
1019                                            {
1020                                                new_belief
1021                                                    .contradicts
1022                                                    .push(target.belief_key.clone());
1023                                            }
1024                                            // Apply contest penalty
1025                                            target.revalidation_failure();
1026                                            if target.confidence >= 0.5 {
1027                                                // Create revalidation attention item
1028                                                attn_queue.push(AttentionItem {
1029                                                    id: Uuid::new_v4().to_string(),
1030                                                    topic: format!(
1031                                                        "Revalidate belief: {}",
1032                                                        target.claim
1033                                                    ),
1034                                                    topic_tags: vec![target.belief_key.clone()],
1035                                                    priority: 0.7,
1036                                                    source_type: AttentionSource::ContestedBelief,
1037                                                    source_id: target.id.clone(),
1038                                                    assigned_persona: None,
1039                                                    created_at: Utc::now(),
1040                                                    resolved_at: None,
1041                                                });
1042                                            }
1043                                        }
1044                                    }
1045
1046                                    new_events.push(ThoughtEvent {
1047                                        id: Uuid::new_v4().to_string(),
1048                                        event_type: ThoughtEventType::BeliefExtracted,
1049                                        persona_id: Some(work.persona_id.clone()),
1050                                        swarm_id: work.swarm_id.clone(),
1051                                        timestamp: Utc::now(),
1052                                        payload: json!({
1053                                            "belief_id": new_belief.id,
1054                                            "belief_key": new_belief.belief_key,
1055                                            "claim": trim_for_storage(&new_belief.claim, 280),
1056                                            "confidence": new_belief.confidence,
1057                                        }),
1058                                    });
1059
1060                                    // Mark progress for belief creation
1061                                    {
1062                                        let mut map = personas.write().await;
1063                                        if let Some(p) = map.get_mut(&work.persona_id) {
1064                                            p.last_progress_at = Utc::now();
1065                                        }
1066                                    }
1067
1068                                    new_belief.clamp_confidence();
1069                                    belief_store.insert(new_belief.id.clone(), new_belief);
1070                                }
1071                            }
1072                        }
1073                    }
1074
1075                    if work.phase == ThoughtPhase::Test {
1076                        new_events.push(ThoughtEvent {
1077                            id: Uuid::new_v4().to_string(),
1078                            event_type: ThoughtEventType::CheckResult,
1079                            persona_id: Some(work.persona_id.clone()),
1080                            swarm_id: work.swarm_id.clone(),
1081                            timestamp: Utc::now(),
1082                            payload: json!({
1083                                "phase": work.phase.as_str(),
1084                                "thought_count": work.thought_count,
1085                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
1086                                "source": thought.source,
1087                                "model": thought.model,
1088                            }),
1089                        });
1090
1091                        // Mark progress for check result
1092                        {
1093                            let mut map = personas.write().await;
1094                            if let Some(p) = map.get_mut(&work.persona_id) {
1095                                p.last_progress_at = Utc::now();
1096                            }
1097                        }
1098
1099                        // ── Step 6: Tool execution via capability leases ──
1100                        if !is_fallback && let Some(ref tool_registry) = tools {
1101                            let allowed = {
1102                                let map = personas.read().await;
1103                                map.get(&work.persona_id)
1104                                    .map(|p| p.policy.allowed_tools.clone())
1105                                    .unwrap_or_default()
1106                            };
1107                            if !allowed.is_empty() {
1108                                let tool_results = executor::execute_tool_requests(
1109                                    thinker.as_deref(),
1110                                    tool_registry,
1111                                    &work.persona_id,
1112                                    &thought.thinking,
1113                                    &allowed,
1114                                )
1115                                .await;
1116
1117                                for result_event in tool_results {
1118                                    new_events.push(result_event);
1119                                    // Mark progress for tool execution
1120                                    let mut map = personas.write().await;
1121                                    if let Some(p) = map.get_mut(&work.persona_id) {
1122                                        p.last_progress_at = Utc::now();
1123                                    }
1124                                }
1125                            }
1126                        }
1127                    }
1128
1129                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1130                        let gov = governance.read().await;
1131                        let proposal = Proposal {
1132                            id: Uuid::new_v4().to_string(),
1133                            persona_id: work.persona_id.clone(),
1134                            title: proposal_title_from_thought(
1135                                &thought.thinking,
1136                                work.thought_count,
1137                            ),
1138                            rationale: trim_for_storage(&thought.thinking, 900),
1139                            evidence_refs: vec!["internal.thought_stream".to_string()],
1140                            risk: ProposalRisk::Low,
1141                            status: ProposalStatus::Created,
1142                            created_at: Utc::now(),
1143                            votes: HashMap::new(),
1144                            vote_deadline: Some(
1145                                Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1146                            ),
1147                            votes_requested: false,
1148                            quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1149                                as usize,
1150                        };
1151
1152                        new_events.push(ThoughtEvent {
1153                            id: Uuid::new_v4().to_string(),
1154                            event_type: ThoughtEventType::ProposalCreated,
1155                            persona_id: Some(work.persona_id.clone()),
1156                            swarm_id: work.swarm_id.clone(),
1157                            timestamp: Utc::now(),
1158                            payload: json!({
1159                                "proposal_id": proposal.id,
1160                                "title": proposal.title,
1161                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1162                                "source": thought.source,
1163                                "model": thought.model,
1164                            }),
1165                        });
1166
1167                        new_proposals.push(proposal);
1168                    }
1169
1170                    if work.phase == ThoughtPhase::Compress {
1171                        new_snapshots.push(MemorySnapshot {
1172                            id: Uuid::new_v4().to_string(),
1173                            generated_at: Utc::now(),
1174                            swarm_id: work.swarm_id.clone(),
1175                            persona_scope: vec![work.persona_id.clone()],
1176                            summary: trim_for_storage(&thought.thinking, 1_500),
1177                            hot_event_count: context.len(),
1178                            warm_fact_count: estimate_fact_count(&thought.thinking),
1179                            cold_snapshot_count: 1,
1180                            metadata: HashMap::from([
1181                                (
1182                                    "phase".to_string(),
1183                                    serde_json::Value::String(work.phase.as_str().to_string()),
1184                                ),
1185                                (
1186                                    "role".to_string(),
1187                                    serde_json::Value::String(work.role.clone()),
1188                                ),
1189                                (
1190                                    "source".to_string(),
1191                                    serde_json::Value::String(thought.source.to_string()),
1192                                ),
1193                                (
1194                                    "model".to_string(),
1195                                    serde_json::Value::String(
1196                                        thought
1197                                            .model
1198                                            .clone()
1199                                            .unwrap_or_else(|| "fallback".to_string()),
1200                                    ),
1201                                ),
1202                                (
1203                                    "completion_tokens".to_string(),
1204                                    serde_json::Value::Number(serde_json::Number::from(
1205                                        thought.completion_tokens.unwrap_or(0) as u64,
1206                                    )),
1207                                ),
1208                            ]),
1209                        });
1210
1211                        // ── Step 3: Belief staleness decay in Compress phase ──
1212                        {
1213                            let mut belief_store = beliefs.write().await;
1214                            let mut attn_queue = attention_queue.write().await;
1215                            let stale_ids: Vec<String> = belief_store
1216                                .values()
1217                                .filter(|b| {
1218                                    b.status == BeliefStatus::Active && now > b.review_after
1219                                })
1220                                .map(|b| b.id.clone())
1221                                .collect();
1222                            for id in stale_ids {
1223                                if let Some(belief) = belief_store.get_mut(&id) {
1224                                    belief.decay();
1225                                    attn_queue.push(AttentionItem {
1226                                        id: Uuid::new_v4().to_string(),
1227                                        topic: format!("Stale belief: {}", belief.claim),
1228                                        topic_tags: vec![belief.belief_key.clone()],
1229                                        priority: 0.4,
1230                                        source_type: AttentionSource::StaleBelief,
1231                                        source_id: belief.id.clone(),
1232                                        assigned_persona: None,
1233                                        created_at: now,
1234                                        resolved_at: None,
1235                                    });
1236                                }
1237                            }
1238                        }
1239
1240                        // ── Step 4d: Update GlobalWorkspace ──
1241                        {
1242                            let belief_store = beliefs.read().await;
1243                            let attn_queue = attention_queue.read().await;
1244
1245                            let mut sorted_beliefs: Vec<&Belief> = belief_store
1246                                .values()
1247                                .filter(|b| b.status == BeliefStatus::Active)
1248                                .collect();
1249                            sorted_beliefs.sort_by(|a, b| {
1250                                let score_a = a.confidence
1251                                    * (1.0
1252                                        / (1.0
1253                                            + now.signed_duration_since(a.updated_at).num_minutes()
1254                                                as f32));
1255                                let score_b = b.confidence
1256                                    * (1.0
1257                                        / (1.0
1258                                            + now.signed_duration_since(b.updated_at).num_minutes()
1259                                                as f32));
1260                                score_b
1261                                    .partial_cmp(&score_a)
1262                                    .unwrap_or(std::cmp::Ordering::Equal)
1263                            });
1264
1265                            let top_beliefs: Vec<String> = sorted_beliefs
1266                                .iter()
1267                                .take(10)
1268                                .map(|b| b.id.clone())
1269                                .collect();
1270                            let top_uncertainties: Vec<String> = {
1271                                let mut uncertain: Vec<&Belief> = belief_store
1272                                    .values()
1273                                    .filter(|b| {
1274                                        b.status == BeliefStatus::Stale
1275                                            || !b.contested_by.is_empty()
1276                                    })
1277                                    .collect();
1278                                // Deterministic order: contested first, then lowest confidence,
1279                                // then oldest updated_at.
1280                                uncertain.sort_by(|a, b| {
1281                                    let a_contested = !a.contested_by.is_empty();
1282                                    let b_contested = !b.contested_by.is_empty();
1283                                    b_contested
1284                                        .cmp(&a_contested)
1285                                        .then_with(|| {
1286                                            a.confidence
1287                                                .partial_cmp(&b.confidence)
1288                                                .unwrap_or(std::cmp::Ordering::Equal)
1289                                        })
1290                                        .then_with(|| a.updated_at.cmp(&b.updated_at))
1291                                });
1292                                uncertain
1293                                    .iter()
1294                                    .take(5)
1295                                    .map(|b| {
1296                                        format!(
1297                                            "[{}] {}",
1298                                            b.belief_key,
1299                                            trim_for_storage(&b.claim, 80)
1300                                        )
1301                                    })
1302                                    .collect()
1303                            };
1304
1305                            let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1306                                .iter()
1307                                .filter(|a| a.resolved_at.is_none())
1308                                .collect();
1309                            sorted_attn.sort_by(|a, b| {
1310                                b.priority
1311                                    .partial_cmp(&a.priority)
1312                                    .unwrap_or(std::cmp::Ordering::Equal)
1313                            });
1314                            let top_attention: Vec<String> =
1315                                sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1316
1317                            let mut ws = workspace.write().await;
1318                            ws.top_beliefs = top_beliefs;
1319                            ws.top_uncertainties = top_uncertainties;
1320                            ws.top_attention = top_attention;
1321                            ws.updated_at = now;
1322                        }
1323
1324                        new_events.push(ThoughtEvent {
1325                            id: Uuid::new_v4().to_string(),
1326                            event_type: ThoughtEventType::WorkspaceUpdated,
1327                            persona_id: Some(work.persona_id.clone()),
1328                            swarm_id: work.swarm_id.clone(),
1329                            timestamp: Utc::now(),
1330                            payload: json!({ "updated": true }),
1331                        });
1332                    }
1333                }
1334
1335                // ── Step 4c: Governance — resolve proposals ──
1336                {
1337                    let gov = governance.read().await;
1338                    let mut proposal_store = proposals.write().await;
1339                    let persona_map = personas.read().await;
1340
1341                    let proposal_ids: Vec<String> = proposal_store
1342                        .values()
1343                        .filter(|p| p.status == ProposalStatus::Created)
1344                        .map(|p| p.id.clone())
1345                        .collect();
1346
1347                    let mut attn_queue = attention_queue.write().await;
1348                    for pid in proposal_ids {
1349                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1350                            let quorum_needed = proposal.quorum_needed.max(1);
1351
1352                            // Check vote deadline
1353                            if let Some(deadline) = proposal.vote_deadline
1354                                && now > deadline
1355                            {
1356                                if proposal.votes.len() < quorum_needed {
1357                                    // Timeout without quorum → attention item
1358                                    attn_queue.push(AttentionItem {
1359                                        id: Uuid::new_v4().to_string(),
1360                                        topic: format!("Proposal vote timeout: {}", proposal.title),
1361                                        topic_tags: Vec::new(),
1362                                        priority: 0.6,
1363                                        source_type: AttentionSource::ProposalTimeout,
1364                                        source_id: proposal.id.clone(),
1365                                        assigned_persona: None,
1366                                        created_at: now,
1367                                        resolved_at: None,
1368                                    });
1369                                    proposal.status = ProposalStatus::Rejected;
1370                                    continue;
1371                                }
1372
1373                                // Deadline passed with quorum but missing required approvers → reject
1374                                let required_roles = gov
1375                                    .required_approvers_by_role
1376                                    .get(&proposal.risk)
1377                                    .cloned()
1378                                    .unwrap_or_default();
1379                                let all_required_met = required_roles.iter().all(|role| {
1380                                    proposal.votes.iter().any(|(vid, vote)| {
1381                                        *vote == ProposalVote::Approve
1382                                            && persona_map
1383                                                .get(vid)
1384                                                .map(|p| &p.identity.role == role)
1385                                                .unwrap_or(false)
1386                                    })
1387                                });
1388                                if !all_required_met {
1389                                    attn_queue.push(AttentionItem {
1390                                        id: Uuid::new_v4().to_string(),
1391                                        topic: format!(
1392                                            "Missing required approvers: {}",
1393                                            proposal.title
1394                                        ),
1395                                        topic_tags: Vec::new(),
1396                                        priority: 0.7,
1397                                        source_type: AttentionSource::ProposalTimeout,
1398                                        source_id: proposal.id.clone(),
1399                                        assigned_persona: None,
1400                                        created_at: now,
1401                                        resolved_at: None,
1402                                    });
1403                                    proposal.status = ProposalStatus::Rejected;
1404                                    continue;
1405                                }
1406                            }
1407
1408                            // Resolve if enough votes
1409                            if proposal.votes.len() >= quorum_needed {
1410                                // Check for veto
1411                                let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1412                                    if *vote != ProposalVote::Veto {
1413                                        return false;
1414                                    }
1415                                    if let Some(voter) = persona_map.get(voter_id) {
1416                                        gov.veto_roles.contains(&voter.identity.role)
1417                                    } else {
1418                                        false
1419                                    }
1420                                });
1421
1422                                if vetoed {
1423                                    proposal.status = ProposalStatus::Rejected;
1424                                    continue;
1425                                }
1426
1427                                // Check required approvers
1428                                let required_roles = gov
1429                                    .required_approvers_by_role
1430                                    .get(&proposal.risk)
1431                                    .cloned()
1432                                    .unwrap_or_default();
1433                                let all_required_met = required_roles.iter().all(|role| {
1434                                    proposal.votes.iter().any(|(vid, vote)| {
1435                                        *vote == ProposalVote::Approve
1436                                            && persona_map
1437                                                .get(vid)
1438                                                .map(|p| &p.identity.role == role)
1439                                                .unwrap_or(false)
1440                                    })
1441                                });
1442
1443                                if !all_required_met {
1444                                    continue; // wait for required approvers
1445                                }
1446
1447                                // Count approvals vs rejections
1448                                let approvals = proposal
1449                                    .votes
1450                                    .values()
1451                                    .filter(|v| **v == ProposalVote::Approve)
1452                                    .count();
1453                                let rejections = proposal
1454                                    .votes
1455                                    .values()
1456                                    .filter(|v| **v == ProposalVote::Reject)
1457                                    .count();
1458
1459                                if approvals > rejections {
1460                                    proposal.status = ProposalStatus::Verified;
1461                                } else {
1462                                    proposal.status = ProposalStatus::Rejected;
1463                                }
1464                            }
1465                        }
1466                    }
1467                }
1468
1469                // ── Step 7: Execute verified proposals ──
1470                {
1471                    let mut proposal_store = proposals.write().await;
1472                    let verified_ids: Vec<String> = proposal_store
1473                        .values()
1474                        .filter(|p| p.status == ProposalStatus::Verified)
1475                        .map(|p| p.id.clone())
1476                        .collect();
1477
1478                    for pid in verified_ids {
1479                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1480                            if proposal.risk == ProposalRisk::Critical {
1481                                // Check for human approval
1482                                let approved = {
1483                                    let approvals = pending_approvals.read().await;
1484                                    approvals.get(&pid).copied().unwrap_or(false)
1485                                };
1486                                if !approved {
1487                                    // Register for human approval
1488                                    let mut approvals = pending_approvals.write().await;
1489                                    approvals.entry(pid.clone()).or_insert(false);
1490                                    continue;
1491                                }
1492                            }
1493
1494                            // Create decision receipt
1495                            let receipt = executor::DecisionReceipt {
1496                                id: Uuid::new_v4().to_string(),
1497                                proposal_id: pid.clone(),
1498                                inputs: proposal.evidence_refs.clone(),
1499                                governance_decision: format!(
1500                                    "Approved with {} votes",
1501                                    proposal.votes.len()
1502                                ),
1503                                capability_leases: Vec::new(),
1504                                tool_invocations: Vec::new(),
1505                                outcome: executor::ExecutionOutcome::Success {
1506                                    summary: format!("Proposal '{}' executed", proposal.title),
1507                                },
1508                                created_at: Utc::now(),
1509                            };
1510
1511                            new_events.push(ThoughtEvent {
1512                                id: Uuid::new_v4().to_string(),
1513                                event_type: ThoughtEventType::ActionExecuted,
1514                                persona_id: Some(proposal.persona_id.clone()),
1515                                swarm_id: None,
1516                                timestamp: Utc::now(),
1517                                payload: json!({
1518                                    "receipt_id": receipt.id,
1519                                    "proposal_id": pid,
1520                                    "outcome": "success",
1521                                    "summary": format!("Proposal '{}' executed", proposal.title),
1522                                }),
1523                            });
1524
1525                            receipts.write().await.push(receipt);
1526                            proposal.status = ProposalStatus::Executed;
1527                        }
1528                    }
1529                }
1530
1531                if !new_proposals.is_empty() {
1532                    let mut proposal_store = proposals.write().await;
1533                    for proposal in new_proposals {
1534                        proposal_store.insert(proposal.id.clone(), proposal);
1535                    }
1536                }
1537
1538                for event in new_events {
1539                    push_event_internal(&events, max_events, &event_tx, event).await;
1540                }
1541                for snapshot in new_snapshots {
1542                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1543                }
1544
1545                // ── Step 2: Idle TTL reaping ──
1546                {
1547                    let mut map = personas.write().await;
1548                    let idle_ids: Vec<String> = map
1549                        .values()
1550                        .filter(|p| {
1551                            p.status == PersonaStatus::Active
1552                                && !p.budget_paused
1553                                && now.signed_duration_since(p.last_progress_at).num_seconds()
1554                                    > p.policy.idle_ttl_secs as i64
1555                        })
1556                        .map(|p| p.identity.id.clone())
1557                        .collect();
1558
1559                    for id in &idle_ids {
1560                        if let Some(persona) = map.get_mut(id) {
1561                            persona.status = PersonaStatus::Reaped;
1562                            persona.updated_at = now;
1563                        }
1564                        // Also cascade-reap children
1565                        let children: Vec<String> = map
1566                            .values()
1567                            .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1568                            .map(|p| p.identity.id.clone())
1569                            .collect();
1570                        for child_id in children {
1571                            if let Some(child) = map.get_mut(&child_id) {
1572                                child.status = PersonaStatus::Reaped;
1573                                child.updated_at = now;
1574                            }
1575                        }
1576                    }
1577                    drop(map);
1578
1579                    for id in idle_ids {
1580                        push_event_internal(
1581                            &events,
1582                            max_events,
1583                            &event_tx,
1584                            ThoughtEvent {
1585                                id: Uuid::new_v4().to_string(),
1586                                event_type: ThoughtEventType::IdleReaped,
1587                                persona_id: Some(id),
1588                                swarm_id: None,
1589                                timestamp: now,
1590                                payload: json!({ "reason": "idle_ttl_expired" }),
1591                            },
1592                        )
1593                        .await;
1594                    }
1595                }
1596
1597                // ── Step 5: Persistence on Compress ──
1598                if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1599                    let _ = persistence::save_state(
1600                        &personas,
1601                        &proposals,
1602                        &beliefs,
1603                        &attention_queue,
1604                        &workspace,
1605                        &events,
1606                        &snapshots,
1607                    )
1608                    .await;
1609                }
1610
1611                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1612                next_tick += interval;
1613                let tick_completed = Instant::now();
1614                if tick_completed > next_tick {
1615                    next_tick = tick_completed;
1616                }
1617            }
1618        });
1619
1620        {
1621            let mut lock = self.loop_handle.lock().await;
1622            *lock = Some(handle);
1623        }
1624
1625        Ok(self.status().await)
1626    }
1627
1628    /// Stop the perpetual cognition loop.
1629    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1630        self.running.store(false, Ordering::SeqCst);
1631
1632        if let Some(handle) = self.loop_handle.lock().await.take() {
1633            handle.abort();
1634            let _ = handle.await;
1635        }
1636
1637        if let Some(reason_value) = reason {
1638            let event = ThoughtEvent {
1639                id: Uuid::new_v4().to_string(),
1640                event_type: ThoughtEventType::CheckResult,
1641                persona_id: None,
1642                swarm_id: None,
1643                timestamp: Utc::now(),
1644                payload: json!({ "stopped": true, "reason": reason_value }),
1645            };
1646            self.push_event(event).await;
1647        }
1648
1649        Ok(self.status().await)
1650    }
1651
1652    /// Create a persona record.
1653    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1654        let now = Utc::now();
1655        let mut personas = self.personas.write().await;
1656
1657        let mut parent_swarm_id = None;
1658        let mut computed_depth = 0_u32;
1659        let mut inherited_policy = None;
1660
1661        if let Some(parent_id) = req.parent_id.clone() {
1662            let parent = personas
1663                .get(&parent_id)
1664                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1665
1666            if parent.status == PersonaStatus::Reaped {
1667                return Err(anyhow!("Parent persona {} is reaped", parent_id));
1668            }
1669
1670            parent_swarm_id = parent.identity.swarm_id.clone();
1671            computed_depth = parent.identity.depth.saturating_add(1);
1672            inherited_policy = Some(parent.policy.clone());
1673            let branch_limit = parent.policy.max_branching_factor;
1674
1675            let child_count = personas
1676                .values()
1677                .filter(|p| {
1678                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1679                        && p.status != PersonaStatus::Reaped
1680                })
1681                .count();
1682
1683            if child_count as u32 >= branch_limit {
1684                return Err(anyhow!(
1685                    "Parent {} reached branching limit {}",
1686                    parent_id,
1687                    branch_limit
1688                ));
1689            }
1690        }
1691
1692        let policy = req
1693            .policy
1694            .clone()
1695            .or(inherited_policy.clone())
1696            .unwrap_or_else(|| self.default_policy.clone());
1697
1698        let effective_depth_limit = inherited_policy
1699            .as_ref()
1700            .map(|p| p.max_spawn_depth)
1701            .unwrap_or(policy.max_spawn_depth);
1702
1703        if computed_depth > effective_depth_limit {
1704            return Err(anyhow!(
1705                "Spawn depth {} exceeds limit {}",
1706                computed_depth,
1707                effective_depth_limit
1708            ));
1709        }
1710
1711        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1712        if personas.contains_key(&persona_id) {
1713            return Err(anyhow!("Persona id already exists: {}", persona_id));
1714        }
1715
1716        let identity = PersonaIdentity {
1717            id: persona_id.clone(),
1718            name: req.name,
1719            role: req.role,
1720            charter: req.charter,
1721            swarm_id: req.swarm_id.or(parent_swarm_id),
1722            parent_id: req.parent_id,
1723            depth: computed_depth,
1724            created_at: now,
1725            tags: req.tags,
1726        };
1727
1728        let persona = PersonaRuntimeState {
1729            identity,
1730            policy,
1731            status: PersonaStatus::Active,
1732            thought_count: 0,
1733            last_tick_at: None,
1734            updated_at: now,
1735            tokens_this_window: 0,
1736            compute_ms_this_window: 0,
1737            window_started_at: now,
1738            last_progress_at: now,
1739            budget_paused: false,
1740        };
1741
1742        personas.insert(persona_id, persona.clone());
1743        drop(personas);
1744
1745        self.push_event(ThoughtEvent {
1746            id: Uuid::new_v4().to_string(),
1747            event_type: ThoughtEventType::PersonaSpawned,
1748            persona_id: Some(persona.identity.id.clone()),
1749            swarm_id: persona.identity.swarm_id.clone(),
1750            timestamp: now,
1751            payload: json!({
1752                "name": persona.identity.name,
1753                "role": persona.identity.role,
1754                "depth": persona.identity.depth,
1755            }),
1756        })
1757        .await;
1758
1759        Ok(persona)
1760    }
1761
1762    /// Spawn a child persona under an existing parent.
1763    pub async fn spawn_child(
1764        &self,
1765        parent_id: &str,
1766        req: SpawnPersonaRequest,
1767    ) -> Result<PersonaRuntimeState> {
1768        let request = CreatePersonaRequest {
1769            persona_id: req.persona_id,
1770            name: req.name,
1771            role: req.role,
1772            charter: req.charter,
1773            swarm_id: req.swarm_id,
1774            parent_id: Some(parent_id.to_string()),
1775            policy: req.policy,
1776            tags: Vec::new(),
1777        };
1778        self.create_persona(request).await
1779    }
1780
1781    /// Reap one persona or the full descendant tree.
1782    pub async fn reap_persona(
1783        &self,
1784        persona_id: &str,
1785        req: ReapPersonaRequest,
1786    ) -> Result<ReapPersonaResponse> {
1787        let cascade = req.cascade.unwrap_or(false);
1788        let now = Utc::now();
1789
1790        let mut personas = self.personas.write().await;
1791        if !personas.contains_key(persona_id) {
1792            return Err(anyhow!("Persona not found: {}", persona_id));
1793        }
1794
1795        let mut reaped_ids = vec![persona_id.to_string()];
1796        if cascade {
1797            let mut idx = 0usize;
1798            while idx < reaped_ids.len() {
1799                let current = reaped_ids[idx].clone();
1800                let children: Vec<String> = personas
1801                    .values()
1802                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1803                    .map(|p| p.identity.id.clone())
1804                    .collect();
1805                for child in children {
1806                    if !reaped_ids.iter().any(|existing| existing == &child) {
1807                        reaped_ids.push(child);
1808                    }
1809                }
1810                idx += 1;
1811            }
1812        }
1813
1814        for id in &reaped_ids {
1815            if let Some(persona) = personas.get_mut(id) {
1816                persona.status = PersonaStatus::Reaped;
1817                persona.updated_at = now;
1818            }
1819        }
1820        drop(personas);
1821
1822        for id in &reaped_ids {
1823            self.push_event(ThoughtEvent {
1824                id: Uuid::new_v4().to_string(),
1825                event_type: ThoughtEventType::PersonaReaped,
1826                persona_id: Some(id.clone()),
1827                swarm_id: None,
1828                timestamp: now,
1829                payload: json!({
1830                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1831                    "cascade": cascade,
1832                }),
1833            })
1834            .await;
1835        }
1836
1837        Ok(ReapPersonaResponse {
1838            count: reaped_ids.len(),
1839            reaped_ids,
1840        })
1841    }
1842
1843    /// Get latest memory snapshot, if any.
1844    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1845        self.snapshots.read().await.back().cloned()
1846    }
1847
1848    /// Build lineage graph from current persona state.
1849    pub async fn lineage_graph(&self) -> LineageGraph {
1850        let personas = self.personas.read().await;
1851        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1852        let mut roots = Vec::new();
1853        let mut total_edges = 0usize;
1854
1855        for persona in personas.values() {
1856            if let Some(parent_id) = persona.identity.parent_id.clone() {
1857                children_by_parent
1858                    .entry(parent_id)
1859                    .or_default()
1860                    .push(persona.identity.id.clone());
1861                total_edges = total_edges.saturating_add(1);
1862            } else {
1863                roots.push(persona.identity.id.clone());
1864            }
1865        }
1866
1867        let mut nodes: Vec<LineageNode> = personas
1868            .values()
1869            .map(|persona| {
1870                let mut children = children_by_parent
1871                    .get(&persona.identity.id)
1872                    .cloned()
1873                    .unwrap_or_default();
1874                children.sort();
1875
1876                LineageNode {
1877                    persona_id: persona.identity.id.clone(),
1878                    parent_id: persona.identity.parent_id.clone(),
1879                    children,
1880                    depth: persona.identity.depth,
1881                    status: persona.status,
1882                }
1883            })
1884            .collect();
1885
1886        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1887        roots.sort();
1888
1889        LineageGraph {
1890            nodes,
1891            roots,
1892            total_edges,
1893        }
1894    }
1895
1896    /// Return a summary status.
1897    pub async fn status(&self) -> CognitionStatus {
1898        let personas = self.personas.read().await;
1899        let events = self.events.read().await;
1900        let snapshots = self.snapshots.read().await;
1901        let started_at = *self.started_at.read().await;
1902        let last_tick_at = *self.last_tick_at.read().await;
1903        let loop_interval_ms = *self.loop_interval_ms.read().await;
1904
1905        let active_persona_count = personas
1906            .values()
1907            .filter(|p| p.status == PersonaStatus::Active)
1908            .count();
1909
1910        CognitionStatus {
1911            enabled: self.enabled,
1912            running: self.running.load(Ordering::SeqCst),
1913            loop_interval_ms,
1914            started_at,
1915            last_tick_at,
1916            persona_count: personas.len(),
1917            active_persona_count,
1918            events_buffered: events.len(),
1919            snapshots_buffered: snapshots.len(),
1920        }
1921    }
1922
1923    async fn push_event(&self, event: ThoughtEvent) {
1924        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1925    }
1926
1927    /// Set the tool registry for capability-based tool execution.
1928    pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1929        self.tools = Some(registry);
1930    }
1931
1932    /// Get current beliefs.
1933    pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1934        self.beliefs.read().await.clone()
1935    }
1936
1937    /// Get a single belief by ID.
1938    pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1939        self.beliefs.read().await.get(id).cloned()
1940    }
1941
1942    /// Get the current attention queue.
1943    pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1944        self.attention_queue.read().await.clone()
1945    }
1946
1947    /// Get all proposals.
1948    pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1949        self.proposals.read().await.clone()
1950    }
1951
1952    /// Get the current global workspace.
1953    pub async fn get_workspace(&self) -> GlobalWorkspace {
1954        self.workspace.read().await.clone()
1955    }
1956
1957    /// Get decision receipts.
1958    pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1959        self.receipts.read().await.clone()
1960    }
1961
1962    /// Approve a Critical-risk proposal for execution.
1963    pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1964        let proposals = self.proposals.read().await;
1965        let proposal = proposals
1966            .get(proposal_id)
1967            .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1968
1969        if proposal.risk != ProposalRisk::Critical {
1970            return Err(anyhow!("Only Critical proposals require human approval"));
1971        }
1972        if proposal.status != ProposalStatus::Verified {
1973            return Err(anyhow!("Proposal is not in Verified status"));
1974        }
1975        drop(proposals);
1976
1977        let mut approvals = self.pending_approvals.write().await;
1978        approvals.insert(proposal_id.to_string(), true);
1979        Ok(())
1980    }
1981
1982    /// Get governance settings.
1983    pub async fn get_governance(&self) -> SwarmGovernance {
1984        self.governance.read().await.clone()
1985    }
1986
1987    /// Get persona state for a specific persona.
1988    pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1989        self.personas.read().await.get(id).cloned()
1990    }
1991}
1992
1993async fn push_event_internal(
1994    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1995    max_events: usize,
1996    event_tx: &broadcast::Sender<ThoughtEvent>,
1997    event: ThoughtEvent,
1998) {
1999    {
2000        let mut lock = events.write().await;
2001        lock.push_back(event.clone());
2002        while lock.len() > max_events {
2003            lock.pop_front();
2004        }
2005    }
2006    let _ = event_tx.send(event);
2007}
2008
2009async fn push_snapshot_internal(
2010    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
2011    max_snapshots: usize,
2012    snapshot: MemorySnapshot,
2013) {
2014    let mut lock = snapshots.write().await;
2015    lock.push_back(snapshot);
2016    while lock.len() > max_snapshots {
2017        lock.pop_front();
2018    }
2019}
2020
2021async fn recent_persona_context(
2022    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
2023    persona_id: &str,
2024    limit: usize,
2025) -> Vec<ThoughtEvent> {
2026    let lock = events.read().await;
2027    let mut selected: Vec<ThoughtEvent> = lock
2028        .iter()
2029        .rev()
2030        .filter(|event| {
2031            event.persona_id.as_deref() == Some(persona_id)
2032                || (event.persona_id.is_none()
2033                    && matches!(
2034                        event.event_type,
2035                        ThoughtEventType::CheckResult
2036                            | ThoughtEventType::ProposalCreated
2037                            | ThoughtEventType::SnapshotCompressed
2038                            | ThoughtEventType::WorkspaceUpdated
2039                            | ThoughtEventType::ActionExecuted
2040                            | ThoughtEventType::BudgetPaused
2041                    ))
2042        })
2043        .take(limit)
2044        .cloned()
2045        .collect();
2046    selected.reverse();
2047    selected
2048}
2049
2050async fn generate_phase_thought(
2051    thinker: Option<&ThinkerClient>,
2052    work: &ThoughtWorkItem,
2053    context: &[ThoughtEvent],
2054) -> ThoughtResult {
2055    let started_at = Instant::now();
2056    if let Some(client) = thinker {
2057        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
2058        match client.think(&system_prompt, &user_prompt).await {
2059            Ok(output) => {
2060                let thinking = normalize_thought_output(work, context, &output.text);
2061                if !thinking.is_empty() {
2062                    return ThoughtResult {
2063                        source: "model",
2064                        model: Some(output.model),
2065                        finish_reason: output.finish_reason,
2066                        thinking,
2067                        prompt_tokens: output.prompt_tokens,
2068                        completion_tokens: output.completion_tokens,
2069                        total_tokens: output.total_tokens,
2070                        latency_ms: started_at.elapsed().as_millis(),
2071                        error: None,
2072                    };
2073                }
2074            }
2075            Err(error) => {
2076                return ThoughtResult {
2077                    source: "fallback",
2078                    model: None,
2079                    finish_reason: None,
2080                    thinking: fallback_phase_text(work, context),
2081                    prompt_tokens: None,
2082                    completion_tokens: None,
2083                    total_tokens: None,
2084                    latency_ms: started_at.elapsed().as_millis(),
2085                    error: Some(error.to_string()),
2086                };
2087            }
2088        }
2089    }
2090
2091    ThoughtResult {
2092        source: "fallback",
2093        model: None,
2094        finish_reason: None,
2095        thinking: fallback_phase_text(work, context),
2096        prompt_tokens: None,
2097        completion_tokens: None,
2098        total_tokens: None,
2099        latency_ms: started_at.elapsed().as_millis(),
2100        error: None,
2101    }
2102}
2103
2104fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2105    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2106Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2107Write as an operational process update, not meta narration. \
2108Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2109Output concrete findings, checks, risks, and next actions. \
2110Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2111        .to_string();
2112
2113    let context_lines = if context.is_empty() {
2114        "none".to_string()
2115    } else {
2116        context
2117            .iter()
2118            .map(format_context_event)
2119            .collect::<Vec<_>>()
2120            .join("\n")
2121    };
2122
2123    let phase_instruction = match work.phase {
2124        ThoughtPhase::Observe => {
2125            "Process format (exact line labels): \
2126Phase: Observe | Goal: detect current customer/business risk | \
2127Signals: 1-3 concrete signals separated by '; ' | \
2128Uncertainty: one unknown that blocks confidence | \
2129Next_Action: one immediate operational action."
2130        }
2131        ThoughtPhase::Reflect => {
2132            "Process format (exact line labels): \
2133Phase: Reflect | Hypothesis: single testable hypothesis | \
2134Rationale: why this is likely | \
2135Business_Risk: customer/revenue/SLA impact | \
2136Validation_Next_Action: one action to confirm or falsify."
2137        }
2138        ThoughtPhase::Test => {
2139            "Process format (exact line labels): \
2140Phase: Test | Check: single concrete check | \
2141Procedure: short executable procedure | \
2142Expected_Result: pass/fail expectation | \
2143Evidence_Quality: low|medium|high with reason | \
2144Escalation_Trigger: when to escalate immediately."
2145        }
2146        ThoughtPhase::Compress => {
2147            "Process format (exact line labels): \
2148Phase: Compress | State_Summary: current state in one line | \
2149Retained_Facts: 3 short facts separated by '; ' | \
2150Open_Risks: up to 2 unresolved risks separated by '; ' | \
2151Next_Process_Step: next operational step."
2152        }
2153    };
2154
2155    let user_prompt = format!(
2156        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2157        phase = work.phase.as_str(),
2158        persona_id = work.persona_id,
2159        persona_name = work.persona_name,
2160        role = work.role,
2161        charter = work.charter,
2162        thought_count = work.thought_count,
2163        context = context_lines,
2164        instruction = phase_instruction
2165    );
2166
2167    (system_prompt, user_prompt)
2168}
2169
2170fn format_context_event(event: &ThoughtEvent) -> String {
2171    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2172    format!(
2173        "{} {} {}",
2174        event.event_type.as_str(),
2175        event.timestamp.to_rfc3339(),
2176        trim_for_storage(&payload, 220)
2177    )
2178}
2179
2180fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2181    let charter = trim_for_storage(&work.charter, 180);
2182    let context_summary = fallback_context_summary(context);
2183    let thought = match work.phase {
2184        ThoughtPhase::Observe => format!(
2185            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2186            work.role, charter, context_summary
2187        ),
2188        ThoughtPhase::Reflect => "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.".to_string(),
2189        ThoughtPhase::Test => "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.".to_string(),
2190        ThoughtPhase::Compress => format!(
2191            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2192            work.role, charter, context_summary
2193        ),
2194    };
2195    trim_for_storage(&thought, 1_200)
2196}
2197
2198fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2199    let trimmed = trim_for_storage(raw, 2_000);
2200    if trimmed.trim().is_empty() {
2201        return fallback_phase_text(work, context);
2202    }
2203
2204    // Prefer process-labeled content if the model emitted a preamble first.
2205    if let Some(idx) = find_process_label_start(&trimmed) {
2206        let candidate = trimmed[idx..].trim();
2207        // Return the full candidate (all structured fields), not just the first line.
2208        if !candidate.is_empty()
2209            && !candidate.contains('<')
2210            && !has_template_placeholder_values(candidate)
2211        {
2212            // If it's a multi-line structured output, collapse to pipe-delimited single line.
2213            let collapsed: String = candidate
2214                .lines()
2215                .map(str::trim)
2216                .filter(|l| !l.is_empty())
2217                .collect::<Vec<_>>()
2218                .join(" | ");
2219            let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2220            if cleaned.starts_with("Phase:") {
2221                return cleaned.to_string();
2222            }
2223            return collapsed;
2224        }
2225    }
2226
2227    let lower = trimmed.to_ascii_lowercase();
2228    let looks_meta = lower.starts_with("we need")
2229        || lower.starts_with("i need")
2230        || lower.contains("we need to")
2231        || lower.contains("i need to")
2232        || lower.contains("must output")
2233        || lower.contains("let's ")
2234        || lower.contains("we have to");
2235
2236    if looks_meta || has_template_placeholder_values(&trimmed) {
2237        return fallback_phase_text(work, context);
2238    }
2239
2240    trimmed
2241}
2242
/// Reports whether `text` still contains unfilled template placeholders.
///
/// Matching is ASCII case-insensitive. Text is considered a placeholder when it
/// contains:
/// - any known process label followed by a literal ellipsis (e.g. `goal: ...`),
/// - the marker `<...`, or
/// - the standalone word `tbd` or `todo`.
///
/// `tbd`/`todo` are matched as whole alphanumeric tokens rather than as raw
/// substrings, so ordinary words that merely embed those letters (such as
/// "autodoc" or "mastodon") do not cause real content to be rejected.
fn has_template_placeholder_values(text: &str) -> bool {
    const UNFILLED_FIELDS: &[&str] = &[
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];

    // True when `lower` contains `tbd` or `todo` as a standalone token; any
    // non-alphanumeric character (including `_` and `:`) acts as a separator.
    fn has_placeholder_word(lower: &str) -> bool {
        lower
            .split(|ch: char| !ch.is_ascii_alphanumeric())
            .any(|token| token == "tbd" || token == "todo")
    }

    let lower = text.to_ascii_lowercase();
    UNFILLED_FIELDS.iter().any(|needle| lower.contains(*needle))
        || lower.contains("<...")
        || has_placeholder_word(&lower)
}
2270
/// Returns the byte offset of the first `Phase:` label in `text`, if any.
///
/// Every phase-specific label (`Phase: Observe`, `Phase: Reflect`,
/// `Phase: Test`, `Phase: Compress`) begins with `Phase:`, so the earliest
/// occurrence of that prefix is also the earliest occurrence of any of them;
/// a single search is sufficient. Matching is case-sensitive.
fn find_process_label_start(text: &str) -> Option<usize> {
    text.find("Phase:")
}
2283
2284fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2285    if context.is_empty() {
2286        return "No prior events recorded yet.".to_string();
2287    }
2288
2289    let mut latest_error: Option<String> = None;
2290    let mut latest_proposal: Option<String> = None;
2291    let mut latest_check: Option<String> = None;
2292
2293    for event in context.iter().rev() {
2294        if latest_error.is_none()
2295            && let Some(error) = event
2296                .payload
2297                .get("error")
2298                .and_then(serde_json::Value::as_str)
2299            && !error.trim().is_empty()
2300        {
2301            latest_error = Some(trim_for_storage(error, 140));
2302        }
2303
2304        if latest_proposal.is_none()
2305            && event.event_type == ThoughtEventType::ProposalCreated
2306            && let Some(title) = event
2307                .payload
2308                .get("title")
2309                .and_then(serde_json::Value::as_str)
2310            && !title.trim().is_empty()
2311            && !has_template_placeholder_values(title)
2312        {
2313            latest_proposal = Some(trim_for_storage(title, 120));
2314        }
2315
2316        if latest_check.is_none()
2317            && event.event_type == ThoughtEventType::CheckResult
2318            && let Some(result) = event
2319                .payload
2320                .get("result_excerpt")
2321                .and_then(serde_json::Value::as_str)
2322            && !result.trim().is_empty()
2323            && !has_template_placeholder_values(result)
2324        {
2325            latest_check = Some(trim_for_storage(result, 140));
2326        }
2327
2328        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2329            break;
2330        }
2331    }
2332
2333    let mut lines = vec![format!(
2334        "{} recent cognition events are available.",
2335        context.len()
2336    )];
2337    if let Some(error) = latest_error {
2338        lines.push(format!("Latest error signal: {}.", error));
2339    }
2340    if let Some(proposal) = latest_proposal {
2341        lines.push(format!("Recent proposal: {}.", proposal));
2342    }
2343    if let Some(check) = latest_check {
2344        lines.push(format!("Recent check: {}.", check));
2345    }
2346    lines.join(" ")
2347}
2348
/// Trims `input` and limits it to `max_chars` characters for storage.
///
/// Surrounding whitespace is removed first, so padding never counts toward the
/// limit and never triggers truncation on its own. If the trimmed text has at
/// most `max_chars` characters it is returned unchanged; otherwise the first
/// `max_chars` characters are kept and `...` is appended (the returned string
/// is therefore up to `max_chars + 3` characters long).
///
/// The limit is counted in `char`s, and the cut always falls on a character
/// boundary, so multi-byte text is never split mid-character.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let content = input.trim();

    // The byte offset of the character just past the limit, if there is one.
    // This scans at most `max_chars + 1` characters instead of the whole input.
    match content.char_indices().nth(max_chars) {
        None => content.to_string(),
        Some((cut, _)) => {
            let mut clipped = String::with_capacity(cut + 3);
            clipped.push_str(&content[..cut]);
            clipped.push_str("...");
            clipped
        }
    }
}
2360
/// Estimates how many facts `text` contains.
///
/// The estimate is the number of `.`, `!` and `?` characters, clamped to the
/// range 1 to 12 inclusive.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
2366
2367fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2368    let first_line = thought
2369        .lines()
2370        .find(|line| !line.trim().is_empty())
2371        .unwrap_or("proposal");
2372    let compact = first_line
2373        .replace(['\t', '\r', '\n'], " ")
2374        .split_whitespace()
2375        .collect::<Vec<_>>()
2376        .join(" ");
2377    let trimmed = trim_for_storage(&compact, 72);
2378    if trimmed.is_empty() {
2379        format!("proposal-{}", thought_count)
2380    } else {
2381        trimmed
2382    }
2383}
2384
2385fn default_seed_persona() -> CreatePersonaRequest {
2386    CreatePersonaRequest {
2387        persona_id: Some("root-thinker".to_string()),
2388        name: "root-thinker".to_string(),
2389        role: "orchestrator".to_string(),
2390        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2391            .to_string(),
2392        swarm_id: Some("swarm-core".to_string()),
2393        parent_id: None,
2394        policy: None,
2395        tags: vec!["orchestration".to_string()],
2396    }
2397}
2398
/// Normalises a base URL into a full `/chat/completions` endpoint.
///
/// Surrounding whitespace and trailing slashes are removed. An empty result
/// falls back to `http://127.0.0.1:11434/v1/chat/completions`; a URL that
/// already ends in `/chat/completions` is returned as-is; anything else gets
/// `/chat/completions` appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    match base_url.trim().trim_end_matches('/') {
        "" => String::from("http://127.0.0.1:11434/v1/chat/completions"),
        complete if complete.ends_with("/chat/completions") => complete.to_string(),
        base => format!("{}/chat/completions", base),
    }
}
2409
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepted values (ASCII case-insensitive, surrounding whitespace ignored):
/// `1`, `true`, `yes`, `on` give `true`; `0`, `false`, `no`, `off` give
/// `false`. If the variable is unset, not valid Unicode, or holds any other
/// value, `default` is returned.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };

    // Values coming from env files often carry stray whitespace or a trailing
    // newline; ignore it instead of silently falling back to the default.
    let value = raw.trim();
    if TRUTHY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        true
    } else if FALSY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        false
    } else {
        default
    }
}
2420
/// Reads environment variable `name` and parses it as `T`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as `T`.
fn env_parsed<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<T>().ok())
        .unwrap_or(default)
}

/// Reads an `f32` from environment variable `name`, or `default` if it is
/// unset or unparsable.
///
/// Note that `f32` parsing accepts non-finite spellings such as `NaN` and
/// `inf`; those are returned as parsed.
fn env_f32(name: &str, default: f32) -> f32 {
    env_parsed(name, default)
}

/// Reads a `u64` from environment variable `name`, or `default` if it is
/// unset or unparsable (including negative or out-of-range values).
fn env_u64(name: &str, default: u64) -> u64 {
    env_parsed(name, default)
}

/// Reads a `u32` from environment variable `name`, or `default` if it is
/// unset or unparsable (including negative or out-of-range values).
fn env_u32(name: &str, default: u32) -> u32 {
    env_parsed(name, default)
}

/// Reads a `usize` from environment variable `name`, or `default` if it is
/// unset or unparsable (including negative or out-of-range values).
fn env_usize(name: &str, default: usize) -> usize {
    env_parsed(name, default)
}
2448
2449#[cfg(test)]
2450mod tests {
2451    use super::*;
2452
2453    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2454        ThoughtWorkItem {
2455            persona_id: "p-1".to_string(),
2456            persona_name: "Spotlessbinco Business Thinker".to_string(),
2457            role: "principal reliability engineer".to_string(),
2458            charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2459                .to_string(),
2460            swarm_id: Some("spotlessbinco".to_string()),
2461            thought_count: 4,
2462            phase,
2463        }
2464    }
2465
2466    fn test_runtime() -> CognitionRuntime {
2467        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2468            enabled: true,
2469            loop_interval_ms: 25,
2470            max_events: 256,
2471            max_snapshots: 32,
2472            default_policy: PersonaPolicy {
2473                max_spawn_depth: 2,
2474                max_branching_factor: 2,
2475                token_budget_per_minute: 1_000,
2476                compute_ms_per_minute: 1_000,
2477                idle_ttl_secs: 300,
2478                share_memory: false,
2479                allowed_tools: Vec::new(),
2480            },
2481        })
2482    }
2483
2484    #[test]
2485    fn normalize_rejects_placeholder_process_line() {
2486        let work = sample_work_item(ThoughtPhase::Compress);
2487        let output = normalize_thought_output(
2488            &work,
2489            &[],
2490            "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2491        );
2492        assert!(
2493            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2494        );
2495        assert!(!output.contains("State_Summary: ..."));
2496    }
2497
2498    #[test]
2499    fn normalize_accepts_concrete_process_line() {
2500        let work = sample_work_item(ThoughtPhase::Test);
2501        let output = normalize_thought_output(
2502            &work,
2503            &[],
2504            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2505        );
2506        assert_eq!(
2507            output,
2508            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2509        );
2510    }
2511
2512    #[tokio::test]
2513    async fn create_spawn_and_lineage_work() {
2514        let runtime = test_runtime();
2515
2516        let root = runtime
2517            .create_persona(CreatePersonaRequest {
2518                persona_id: Some("root".to_string()),
2519                name: "root".to_string(),
2520                role: "orchestrator".to_string(),
2521                charter: "coordinate".to_string(),
2522                swarm_id: Some("swarm-a".to_string()),
2523                parent_id: None,
2524                policy: None,
2525                tags: Vec::new(),
2526            })
2527            .await
2528            .expect("root should be created");
2529
2530        assert_eq!(root.identity.depth, 0);
2531
2532        let child = runtime
2533            .spawn_child(
2534                "root",
2535                SpawnPersonaRequest {
2536                    persona_id: Some("child-1".to_string()),
2537                    name: "child-1".to_string(),
2538                    role: "analyst".to_string(),
2539                    charter: "analyze".to_string(),
2540                    swarm_id: None,
2541                    policy: None,
2542                },
2543            )
2544            .await
2545            .expect("child should spawn");
2546
2547        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2548        assert_eq!(child.identity.depth, 1);
2549
2550        let lineage = runtime.lineage_graph().await;
2551        assert_eq!(lineage.total_edges, 1);
2552        assert_eq!(lineage.roots, vec!["root".to_string()]);
2553    }
2554
2555    #[tokio::test]
2556    async fn branching_and_depth_limits_are_enforced() {
2557        let runtime = test_runtime();
2558
2559        runtime
2560            .create_persona(CreatePersonaRequest {
2561                persona_id: Some("root".to_string()),
2562                name: "root".to_string(),
2563                role: "orchestrator".to_string(),
2564                charter: "coordinate".to_string(),
2565                swarm_id: Some("swarm-a".to_string()),
2566                parent_id: None,
2567                policy: None,
2568                tags: Vec::new(),
2569            })
2570            .await
2571            .expect("root should be created");
2572
2573        runtime
2574            .spawn_child(
2575                "root",
2576                SpawnPersonaRequest {
2577                    persona_id: Some("c1".to_string()),
2578                    name: "c1".to_string(),
2579                    role: "worker".to_string(),
2580                    charter: "run".to_string(),
2581                    swarm_id: None,
2582                    policy: None,
2583                },
2584            )
2585            .await
2586            .expect("first child should spawn");
2587
2588        runtime
2589            .spawn_child(
2590                "root",
2591                SpawnPersonaRequest {
2592                    persona_id: Some("c2".to_string()),
2593                    name: "c2".to_string(),
2594                    role: "worker".to_string(),
2595                    charter: "run".to_string(),
2596                    swarm_id: None,
2597                    policy: None,
2598                },
2599            )
2600            .await
2601            .expect("second child should spawn");
2602
2603        let third_child = runtime
2604            .spawn_child(
2605                "root",
2606                SpawnPersonaRequest {
2607                    persona_id: Some("c3".to_string()),
2608                    name: "c3".to_string(),
2609                    role: "worker".to_string(),
2610                    charter: "run".to_string(),
2611                    swarm_id: None,
2612                    policy: None,
2613                },
2614            )
2615            .await;
2616        assert!(third_child.is_err());
2617
2618        runtime
2619            .spawn_child(
2620                "c1",
2621                SpawnPersonaRequest {
2622                    persona_id: Some("c1-1".to_string()),
2623                    name: "c1-1".to_string(),
2624                    role: "worker".to_string(),
2625                    charter: "run".to_string(),
2626                    swarm_id: None,
2627                    policy: None,
2628                },
2629            )
2630            .await
2631            .expect("depth 2 should be allowed");
2632
2633        let depth_violation = runtime
2634            .spawn_child(
2635                "c1-1",
2636                SpawnPersonaRequest {
2637                    persona_id: Some("c1-1-1".to_string()),
2638                    name: "c1-1-1".to_string(),
2639                    role: "worker".to_string(),
2640                    charter: "run".to_string(),
2641                    swarm_id: None,
2642                    policy: None,
2643                },
2644            )
2645            .await;
2646        assert!(depth_violation.is_err());
2647    }
2648
2649    #[tokio::test]
2650    async fn start_stop_updates_runtime_status() {
2651        let runtime = test_runtime();
2652
2653        runtime
2654            .start(Some(StartCognitionRequest {
2655                loop_interval_ms: Some(10),
2656                seed_persona: Some(CreatePersonaRequest {
2657                    persona_id: Some("seed".to_string()),
2658                    name: "seed".to_string(),
2659                    role: "watcher".to_string(),
2660                    charter: "observe".to_string(),
2661                    swarm_id: Some("swarm-seed".to_string()),
2662                    parent_id: None,
2663                    policy: None,
2664                    tags: Vec::new(),
2665                }),
2666            }))
2667            .await
2668            .expect("runtime should start");
2669
2670        tokio::time::sleep(Duration::from_millis(60)).await;
2671        let running_status = runtime.status().await;
2672        assert!(running_status.running);
2673        assert!(running_status.events_buffered > 0);
2674
2675        runtime
2676            .stop(Some("test".to_string()))
2677            .await
2678            .expect("runtime should stop");
2679        let stopped_status = runtime.status().await;
2680        assert!(!stopped_status.running);
2681    }
2682
2683    #[tokio::test]
2684    async fn zero_budget_persona_is_skipped() {
2685        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2686            enabled: true,
2687            loop_interval_ms: 10,
2688            max_events: 256,
2689            max_snapshots: 32,
2690            default_policy: PersonaPolicy {
2691                max_spawn_depth: 2,
2692                max_branching_factor: 2,
2693                token_budget_per_minute: 0,
2694                compute_ms_per_minute: 0,
2695                idle_ttl_secs: 3_600,
2696                share_memory: false,
2697                allowed_tools: Vec::new(),
2698            },
2699        });
2700
2701        let persona = runtime
2702            .create_persona(CreatePersonaRequest {
2703                persona_id: Some("budget-test".to_string()),
2704                name: "budget-test".to_string(),
2705                role: "tester".to_string(),
2706                charter: "test budgets".to_string(),
2707                swarm_id: None,
2708                parent_id: None,
2709                policy: None,
2710                tags: Vec::new(),
2711            })
2712            .await
2713            .expect("should create persona");
2714
2715        assert_eq!(persona.tokens_this_window, 0);
2716        assert_eq!(persona.compute_ms_this_window, 0);
2717
2718        // Start and run a few ticks
2719        runtime.start(None).await.expect("should start");
2720        tokio::time::sleep(Duration::from_millis(50)).await;
2721        runtime.stop(None).await.expect("should stop");
2722
2723        // Persona should still have 0 thought count (budget blocked)
2724        let p = runtime.get_persona("budget-test").await.unwrap();
2725        assert_eq!(p.thought_count, 0);
2726        assert!(p.budget_paused);
2727    }
2728
2729    #[tokio::test]
2730    async fn budget_counters_reset_after_window() {
2731        let now = Utc::now();
2732        let mut persona = PersonaRuntimeState {
2733            identity: PersonaIdentity {
2734                id: "p1".to_string(),
2735                name: "test".to_string(),
2736                role: "tester".to_string(),
2737                charter: "test".to_string(),
2738                swarm_id: None,
2739                parent_id: None,
2740                depth: 0,
2741                created_at: now,
2742                tags: Vec::new(),
2743            },
2744            policy: PersonaPolicy::default(),
2745            status: PersonaStatus::Active,
2746            thought_count: 0,
2747            last_tick_at: None,
2748            updated_at: now,
2749            tokens_this_window: 5000,
2750            compute_ms_this_window: 3000,
2751            window_started_at: now - ChronoDuration::seconds(61),
2752            last_progress_at: now,
2753            budget_paused: false,
2754        };
2755
2756        // Simulate window reset check
2757        let window_elapsed = now
2758            .signed_duration_since(persona.window_started_at)
2759            .num_seconds();
2760        assert!(window_elapsed >= 60);
2761
2762        // Reset
2763        persona.tokens_this_window = 0;
2764        persona.compute_ms_this_window = 0;
2765        persona.window_started_at = now;
2766
2767        assert_eq!(persona.tokens_this_window, 0);
2768        assert_eq!(persona.compute_ms_this_window, 0);
2769    }
2770
2771    #[tokio::test]
2772    async fn idle_persona_is_reaped() {
2773        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2774            enabled: true,
2775            loop_interval_ms: 10,
2776            max_events: 256,
2777            max_snapshots: 32,
2778            default_policy: PersonaPolicy {
2779                max_spawn_depth: 2,
2780                max_branching_factor: 2,
2781                token_budget_per_minute: 20_000,
2782                compute_ms_per_minute: 10_000,
2783                idle_ttl_secs: 0, // 0 TTL = immediately idle
2784                share_memory: false,
2785                allowed_tools: Vec::new(),
2786            },
2787        });
2788
2789        runtime
2790            .create_persona(CreatePersonaRequest {
2791                persona_id: Some("idle-test".to_string()),
2792                name: "idle-test".to_string(),
2793                role: "idler".to_string(),
2794                charter: "idle away".to_string(),
2795                swarm_id: None,
2796                parent_id: None,
2797                policy: None,
2798                tags: Vec::new(),
2799            })
2800            .await
2801            .expect("should create persona");
2802
2803        // Manually set last_progress_at to the past
2804        {
2805            let mut personas = runtime.personas.write().await;
2806            if let Some(p) = personas.get_mut("idle-test") {
2807                p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2808            }
2809        }
2810
2811        runtime.start(None).await.expect("should start");
2812        tokio::time::sleep(Duration::from_millis(100)).await;
2813        runtime.stop(None).await.expect("should stop");
2814
2815        let p = runtime.get_persona("idle-test").await.unwrap();
2816        assert_eq!(p.status, PersonaStatus::Reaped);
2817    }
2818
2819    #[tokio::test]
2820    async fn budget_paused_persona_not_reaped_for_idle() {
2821        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2822            enabled: true,
2823            loop_interval_ms: 10,
2824            max_events: 256,
2825            max_snapshots: 32,
2826            default_policy: PersonaPolicy {
2827                max_spawn_depth: 2,
2828                max_branching_factor: 2,
2829                token_budget_per_minute: 0, // zero budget = always paused
2830                compute_ms_per_minute: 0,
2831                idle_ttl_secs: 0, // 0 TTL
2832                share_memory: false,
2833                allowed_tools: Vec::new(),
2834            },
2835        });
2836
2837        runtime
2838            .create_persona(CreatePersonaRequest {
2839                persona_id: Some("paused-test".to_string()),
2840                name: "paused-test".to_string(),
2841                role: "pauser".to_string(),
2842                charter: "pause".to_string(),
2843                swarm_id: None,
2844                parent_id: None,
2845                policy: None,
2846                tags: Vec::new(),
2847            })
2848            .await
2849            .expect("should create persona");
2850
2851        // Set last_progress_at to the past to trigger idle check
2852        {
2853            let mut personas = runtime.personas.write().await;
2854            if let Some(p) = personas.get_mut("paused-test") {
2855                p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2856            }
2857        }
2858
2859        runtime.start(None).await.expect("should start");
2860        tokio::time::sleep(Duration::from_millis(100)).await;
2861        runtime.stop(None).await.expect("should stop");
2862
2863        let p = runtime.get_persona("paused-test").await.unwrap();
2864        // Budget-paused personas are NOT reaped for idle
2865        assert_eq!(p.status, PersonaStatus::Active);
2866        assert!(p.budget_paused);
2867    }
2868
2869    #[tokio::test]
2870    async fn governance_proposal_resolution() {
2871        let runtime = test_runtime();
2872        let gov = SwarmGovernance {
2873            quorum_fraction: 0.5,
2874            required_approvers_by_role: HashMap::new(),
2875            veto_roles: vec!["auditor".to_string()],
2876            vote_timeout_secs: 300,
2877        };
2878        *runtime.governance.write().await = gov;
2879
2880        // Create two personas
2881        runtime
2882            .create_persona(CreatePersonaRequest {
2883                persona_id: Some("voter-1".to_string()),
2884                name: "voter-1".to_string(),
2885                role: "engineer".to_string(),
2886                charter: "vote".to_string(),
2887                swarm_id: None,
2888                parent_id: None,
2889                policy: None,
2890                tags: Vec::new(),
2891            })
2892            .await
2893            .unwrap();
2894        runtime
2895            .create_persona(CreatePersonaRequest {
2896                persona_id: Some("voter-2".to_string()),
2897                name: "voter-2".to_string(),
2898                role: "engineer".to_string(),
2899                charter: "vote".to_string(),
2900                swarm_id: None,
2901                parent_id: None,
2902                policy: None,
2903                tags: Vec::new(),
2904            })
2905            .await
2906            .unwrap();
2907
2908        // Insert a proposal with votes
2909        {
2910            let mut proposals = runtime.proposals.write().await;
2911            let mut votes = HashMap::new();
2912            votes.insert("voter-1".to_string(), ProposalVote::Approve);
2913            proposals.insert(
2914                "prop-1".to_string(),
2915                Proposal {
2916                    id: "prop-1".to_string(),
2917                    persona_id: "voter-1".to_string(),
2918                    title: "test proposal".to_string(),
2919                    rationale: "testing governance".to_string(),
2920                    evidence_refs: Vec::new(),
2921                    risk: ProposalRisk::Low,
2922                    status: ProposalStatus::Created,
2923                    created_at: Utc::now(),
2924                    votes,
2925                    vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2926                    votes_requested: true,
2927                    quorum_needed: 1,
2928                },
2929            );
2930        }
2931
2932        // quorum = 0.5 * 2 = 1, and we have 1 approval → should be verified
2933        runtime.start(None).await.unwrap();
2934        tokio::time::sleep(Duration::from_millis(100)).await;
2935        runtime.stop(None).await.unwrap();
2936
2937        let proposals = runtime.get_proposals().await;
2938        let prop = proposals.get("prop-1").unwrap();
2939        // Should be verified or executed (it gets verified then executed in same tick)
2940        assert!(
2941            prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2942            "Expected Verified or Executed, got {:?}",
2943            prop.status
2944        );
2945    }
2946
2947    #[tokio::test]
2948    async fn veto_rejects_proposal() {
2949        let runtime = test_runtime();
2950        let gov = SwarmGovernance {
2951            quorum_fraction: 0.5,
2952            required_approvers_by_role: HashMap::new(),
2953            veto_roles: vec!["auditor".to_string()],
2954            vote_timeout_secs: 300,
2955        };
2956        *runtime.governance.write().await = gov;
2957
2958        runtime
2959            .create_persona(CreatePersonaRequest {
2960                persona_id: Some("eng".to_string()),
2961                name: "eng".to_string(),
2962                role: "engineer".to_string(),
2963                charter: "build".to_string(),
2964                swarm_id: None,
2965                parent_id: None,
2966                policy: None,
2967                tags: Vec::new(),
2968            })
2969            .await
2970            .unwrap();
2971        runtime
2972            .create_persona(CreatePersonaRequest {
2973                persona_id: Some("aud".to_string()),
2974                name: "aud".to_string(),
2975                role: "auditor".to_string(),
2976                charter: "audit".to_string(),
2977                swarm_id: None,
2978                parent_id: None,
2979                policy: None,
2980                tags: Vec::new(),
2981            })
2982            .await
2983            .unwrap();
2984
2985        {
2986            let mut proposals = runtime.proposals.write().await;
2987            let mut votes = HashMap::new();
2988            votes.insert("eng".to_string(), ProposalVote::Approve);
2989            votes.insert("aud".to_string(), ProposalVote::Veto);
2990            proposals.insert(
2991                "prop-veto".to_string(),
2992                Proposal {
2993                    id: "prop-veto".to_string(),
2994                    persona_id: "eng".to_string(),
2995                    title: "vetoed proposal".to_string(),
2996                    rationale: "testing veto".to_string(),
2997                    evidence_refs: Vec::new(),
2998                    risk: ProposalRisk::Low,
2999                    status: ProposalStatus::Created,
3000                    created_at: Utc::now(),
3001                    votes,
3002                    vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
3003                    votes_requested: true,
3004                    quorum_needed: 1,
3005                },
3006            );
3007        }
3008
3009        runtime.start(None).await.unwrap();
3010        tokio::time::sleep(Duration::from_millis(100)).await;
3011        runtime.stop(None).await.unwrap();
3012
3013        let proposals = runtime.get_proposals().await;
3014        let prop = proposals.get("prop-veto").unwrap();
3015        assert_eq!(prop.status, ProposalStatus::Rejected);
3016    }
3017
3018    #[test]
3019    fn global_workspace_default() {
3020        let ws = GlobalWorkspace::default();
3021        assert!(ws.top_beliefs.is_empty());
3022        assert!(ws.top_uncertainties.is_empty());
3023        assert!(ws.top_attention.is_empty());
3024    }
3025
3026    #[test]
3027    fn attention_item_creation() {
3028        let item = AttentionItem {
3029            id: "a1".to_string(),
3030            topic: "test topic".to_string(),
3031            topic_tags: vec!["reliability".to_string()],
3032            priority: 0.8,
3033            source_type: AttentionSource::ContestedBelief,
3034            source_id: "b1".to_string(),
3035            assigned_persona: None,
3036            created_at: Utc::now(),
3037            resolved_at: None,
3038        };
3039        assert!(item.resolved_at.is_none());
3040        assert_eq!(item.priority, 0.8);
3041    }
3042}