// codetether_agent/cognition/mod.rs

//! Perpetual cognition runtime for persona swarms.
//!
//! Phase 0 scope:
//! - Contract types for personas, thought events, proposals, and snapshots
//! - In-memory runtime manager for lifecycle + lineage
//! - Feature-flagged perpetual loop (no external execution side effects)
7
8pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32use tokio::time::Instant;
33use uuid::Uuid;
34
/// Persona execution status.
///
/// Serialized in `snake_case` (for example `"active"`). The cognition loop
/// only schedules thought work for personas whose status is
/// [`PersonaStatus::Active`]; every other status is skipped on each tick.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// Eligible for scheduling by the cognition loop.
    Active,
    Idle,
    Reaped,
    Error,
}
44
/// Policy boundaries for a persona.
///
/// The cognition loop compares the two per-minute budgets against the
/// persona's current-window counters on every tick: a persona that has
/// reached either limit is skipped (and flagged `budget_paused`) until its
/// 60-second window resets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    // NOTE(review): the spawn limits are presumably enforced where child
    // personas are created; that code is outside this view — confirm.
    pub max_spawn_depth: u32,
    pub max_branching_factor: u32,
    /// Token ceiling for one 60-second budget window.
    pub token_budget_per_minute: u32,
    /// Compute-time ceiling, in milliseconds, for one 60-second budget window.
    pub compute_ms_per_minute: u32,
    // NOTE(review): presumably the idle time after which a persona may be
    // reaped — confirm against the reaping logic.
    pub idle_ttl_secs: u64,
    pub share_memory: bool,
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
}
57
58impl Default for PersonaPolicy {
59    fn default() -> Self {
60        Self {
61            max_spawn_depth: 4,
62            max_branching_factor: 4,
63            token_budget_per_minute: 20_000,
64            compute_ms_per_minute: 10_000,
65            idle_ttl_secs: 3_600,
66            share_memory: false,
67            allowed_tools: Vec::new(),
68        }
69    }
70}
71
/// Identity contract for a persona.
///
/// `id`, `name`, `role`, `charter` and `swarm_id` are copied into every unit
/// of thought work the cognition loop schedules for the persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Key used to look the persona up in the runtime's persona map.
    pub id: String,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    // NOTE(review): presumably `None` for a root persona — confirm against
    // the creation/spawn code, which is outside this view.
    pub parent_id: Option<String>,
    pub depth: u32,
    pub created_at: DateTime<Utc>,
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub tags: Vec<String>,
}
86
/// Full runtime state for a persona.
///
/// Held in the runtime's persona map and mutated by the cognition loop on
/// every tick (budget window reset, budget check, tick bookkeeping).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    pub identity: PersonaIdentity,
    pub policy: PersonaPolicy,
    pub status: PersonaStatus,
    /// Incremented (saturating) once for each tick in which the persona is
    /// scheduled; also selects the thought phase for that tick.
    pub thought_count: u64,
    /// Timestamp of the most recent tick in which the persona was scheduled.
    pub last_tick_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
    /// Tokens consumed in current 60-second window.
    pub tokens_this_window: u32,
    /// Compute milliseconds consumed in current 60-second window.
    pub compute_ms_this_window: u32,
    /// Start of the current budget window.
    pub window_started_at: DateTime<Utc>,
    /// Last time this persona made meaningful progress (not budget-paused/quorum-waiting).
    /// The loop refreshes it after a thought whose source is not `"fallback"`.
    pub last_progress_at: DateTime<Utc>,
    /// Whether this persona is currently paused due to budget exhaustion.
    /// Deserializes to `false` when the field is absent.
    #[serde(default)]
    pub budget_paused: bool,
}
108
/// Proposal risk level.
///
/// Serialized in `snake_case`. Derives `Hash` so it can be used as the key of
/// `SwarmGovernance::required_approvers_by_role`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    Low,
    Medium,
    High,
    Critical,
}
118
/// Proposal status.
///
/// Serialized in `snake_case` (`"created"`, `"verified"`, `"rejected"`,
/// `"executed"`).
// NOTE(review): the allowed transitions between these states are decided by
// code outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    Created,
    Verified,
    Rejected,
    Executed,
}
128
/// A vote on a proposal.
///
/// Serialized in `snake_case`. Stored per persona in `Proposal::votes`.
// NOTE(review): how `Veto` and `Abstain` affect quorum is decided by the
// governance logic, which is outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalVote {
    Approve,
    Reject,
    Veto,
    Abstain,
}
138
/// Proposal contract (think first, execute through gates).
///
/// The three `#[serde(default)]` fields fall back to an empty map, `false`
/// and `0` respectively when absent from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    pub id: String,
    /// Id of the persona associated with the proposal.
    // NOTE(review): presumably the proposing persona — confirm at the creation site.
    pub persona_id: String,
    pub title: String,
    pub rationale: String,
    pub evidence_refs: Vec<String>,
    pub risk: ProposalRisk,
    pub status: ProposalStatus,
    pub created_at: DateTime<Utc>,
    /// Votes from personas: persona_id -> vote
    #[serde(default)]
    pub votes: HashMap<String, ProposalVote>,
    /// Deadline for voting
    pub vote_deadline: Option<DateTime<Utc>>,
    /// Whether votes have been requested this cycle
    #[serde(default)]
    pub votes_requested: bool,
    /// Quorum required (frozen at creation time to prevent drift).
    #[serde(default)]
    pub quorum_needed: usize,
}
162
/// Thought/event types emitted by the cognition loop.
///
/// Serialized in `snake_case`; `ThoughtEventType::as_str` returns the same
/// `snake_case` label for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Emitted for a thought produced in the `Observe` phase.
    ThoughtGenerated,
    /// Emitted for a thought produced in the `Reflect` phase.
    HypothesisRaised,
    /// Emitted for a thought produced in the `Test` phase.
    CheckRequested,
    CheckResult,
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    /// Emitted for a thought produced in the `Compress` phase.
    SnapshotCompressed,
    BeliefExtracted,
    BeliefContested,
    BeliefRevalidated,
    /// Emitted once when a persona first exceeds its token or compute budget.
    BudgetPaused,
    IdleReaped,
    AttentionCreated,
    VoteCast,
    WorkspaceUpdated,
}
187
188impl ThoughtEventType {
189    fn as_str(&self) -> &'static str {
190        match self {
191            Self::ThoughtGenerated => "thought_generated",
192            Self::HypothesisRaised => "hypothesis_raised",
193            Self::CheckRequested => "check_requested",
194            Self::CheckResult => "check_result",
195            Self::ProposalCreated => "proposal_created",
196            Self::ProposalVerified => "proposal_verified",
197            Self::ProposalRejected => "proposal_rejected",
198            Self::ActionExecuted => "action_executed",
199            Self::PersonaSpawned => "persona_spawned",
200            Self::PersonaReaped => "persona_reaped",
201            Self::SnapshotCompressed => "snapshot_compressed",
202            Self::BeliefExtracted => "belief_extracted",
203            Self::BeliefContested => "belief_contested",
204            Self::BeliefRevalidated => "belief_revalidated",
205            Self::BudgetPaused => "budget_paused",
206            Self::IdleReaped => "idle_reaped",
207            Self::AttentionCreated => "attention_created",
208            Self::VoteCast => "vote_cast",
209            Self::WorkspaceUpdated => "workspace_updated",
210        }
211    }
212}
213
/// Streamable thought event contract.
///
/// Instances are buffered by the runtime and also published on its broadcast
/// channel (see `CognitionRuntime::subscribe_events`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Unique event id; the cognition loop fills it with a UUID v4 string.
    pub id: String,
    pub event_type: ThoughtEventType,
    /// Persona the event relates to, when there is one.
    pub persona_id: Option<String>,
    pub swarm_id: Option<String>,
    pub timestamp: DateTime<Utc>,
    /// Event-specific JSON body; its shape depends on `event_type`.
    pub payload: serde_json::Value,
}
224
/// Distilled memory snapshot contract.
// NOTE(review): the code that builds snapshots is outside this view, so the
// exact meaning of the hot/warm/cold counters and of `metadata` keys should
// be confirmed there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    pub id: String,
    pub generated_at: DateTime<Utc>,
    pub swarm_id: Option<String>,
    /// Ids of the personas this snapshot covers.
    // NOTE(review): inferred from the field name — confirm.
    pub persona_scope: Vec<String>,
    pub summary: String,
    pub hot_event_count: usize,
    pub warm_fact_count: usize,
    pub cold_snapshot_count: usize,
    /// Free-form JSON values keyed by string.
    pub metadata: HashMap<String, serde_json::Value>,
}
238
/// Request payload for creating a persona.
///
/// Also used as the optional seed persona in [`StartCognitionRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    // NOTE(review): presumably an id is generated when this is `None` —
    // confirm in `create_persona`, which is outside this view.
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub parent_id: Option<String>,
    // NOTE(review): presumably falls back to the runtime's default policy
    // when `None` — confirm in `create_persona`.
    pub policy: Option<PersonaPolicy>,
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub tags: Vec<String>,
}
252
/// Request payload for spawning a child persona.
///
/// Has the same fields as [`CreatePersonaRequest`] except `parent_id` and
/// `tags`.
// NOTE(review): the parent is presumably supplied separately by the caller
// (e.g. as a path or method argument) — confirm at the spawn call site.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}
263
/// Request payload for reaping persona(s).
///
/// Both fields are optional.
// NOTE(review): `cascade` presumably controls whether descendants are reaped
// too, and its default when `None` is decided by the reaping code, which is
// outside this view — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    pub cascade: Option<bool>,
    pub reason: Option<String>,
}
270
/// Start-control payload for perpetual cognition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// New loop interval in milliseconds; `start` raises values below 100 to
    /// 100. `None` leaves the current interval unchanged.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the runtime has no personas yet; when `None`,
    /// `start` uses a built-in default seed persona instead. Ignored if any
    /// persona already exists.
    pub seed_persona: Option<CreatePersonaRequest>,
}
277
/// Stop-control payload for perpetual cognition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Optional free-text reason supplied by the caller.
    // NOTE(review): how the reason is recorded is decided by the stop
    // handler, which is outside this view.
    pub reason: Option<String>,
}
283
// ── Attention, Governance, GlobalWorkspace ──────────────────────────────
285
/// Source of an attention item.
///
/// Serialized in `snake_case` (for example `"contested_belief"`). Stored on
/// each [`AttentionItem`] as `source_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttentionSource {
    ContestedBelief,
    FailedCheck,
    StaleBelief,
    ProposalTimeout,
    FailedExecution,
}
296
/// An item requiring persona attention.
///
/// Items are kept in the runtime's attention queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttentionItem {
    pub id: String,
    pub topic: String,
    pub topic_tags: Vec<String>,
    // NOTE(review): scale and ordering direction of `priority` are not
    // visible here — confirm where the queue is sorted.
    pub priority: f32,
    /// Kind of condition that produced this item.
    pub source_type: AttentionSource,
    // NOTE(review): presumably the id of the belief/proposal/check named by
    // `source_type` — confirm at the creation sites.
    pub source_id: String,
    pub assigned_persona: Option<String>,
    pub created_at: DateTime<Utc>,
    // NOTE(review): presumably `None` while the item is still open — confirm.
    pub resolved_at: Option<DateTime<Utc>>,
}
310
/// Governance rules for swarm voting.
///
/// The `Default` implementation uses a quorum fraction of 0.5, no required
/// approvers, no veto roles, and a 300-second vote timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmGovernance {
    // NOTE(review): presumably the fraction of eligible voters needed for
    // quorum — confirm in the voting logic, which is outside this view.
    pub quorum_fraction: f32,
    /// Role names listed per proposal risk level.
    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
    pub veto_roles: Vec<String>,
    pub vote_timeout_secs: u64,
}
319
320impl Default for SwarmGovernance {
321    fn default() -> Self {
322        Self {
323            quorum_fraction: 0.5,
324            required_approvers_by_role: HashMap::new(),
325            veto_roles: Vec::new(),
326            vote_timeout_secs: 300,
327        }
328    }
329}
330
/// The coherent "now" for the entire swarm.
///
/// The `Default` implementation yields four empty lists and sets
/// `updated_at` to the current UTC time.
// NOTE(review): whether the string lists hold ids or display text is decided
// by the code that refreshes the workspace, which is outside this view.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalWorkspace {
    pub top_beliefs: Vec<String>,
    pub top_uncertainties: Vec<String>,
    pub top_attention: Vec<String>,
    pub active_objectives: Vec<String>,
    pub updated_at: DateTime<Utc>,
}
340
341impl Default for GlobalWorkspace {
342    fn default() -> Self {
343        Self {
344            top_beliefs: Vec::new(),
345            top_uncertainties: Vec::new(),
346            top_attention: Vec::new(),
347            active_objectives: Vec::new(),
348            updated_at: Utc::now(),
349        }
350    }
351}
352
/// Start/stop response status.
///
/// Returned by `CognitionRuntime::start`.
// NOTE(review): the fields presumably mirror the runtime's flags, timestamps
// and buffer sizes; they are populated by `status()`, which is outside this
// view — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    pub enabled: bool,
    pub running: bool,
    pub loop_interval_ms: u64,
    pub started_at: Option<DateTime<Utc>>,
    pub last_tick_at: Option<DateTime<Utc>>,
    pub persona_count: usize,
    pub active_persona_count: usize,
    pub events_buffered: usize,
    pub snapshots_buffered: usize,
}
366
/// Reap response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids of the personas that were reaped.
    pub reaped_ids: Vec<String>,
    // NOTE(review): presumably equal to `reaped_ids.len()` — confirm at the
    // construction site.
    pub count: usize,
}
373
/// Lineage node response.
///
/// One persona together with its parent link and the ids of its children.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    pub persona_id: String,
    pub parent_id: Option<String>,
    pub children: Vec<String>,
    pub depth: u32,
    pub status: PersonaStatus,
}
383
/// Lineage graph response.
// NOTE(review): `roots` presumably lists the ids of personas without a
// parent and `total_edges` the number of parent→child links — confirm in the
// lineage builder, which is outside this view.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    pub nodes: Vec<LineageNode>,
    pub roots: Vec<String>,
    pub total_edges: usize,
}
391
/// Phase of a persona's thought cycle.
///
/// The phase for a tick is derived from the persona's thought count modulo 4
/// (1 → `Observe`, 2 → `Reflect`, 3 → `Test`, 0 → `Compress`), so a persona
/// walks through the four phases in declaration order and then repeats.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}
399
400impl ThoughtPhase {
401    fn from_thought_count(thought_count: u64) -> Self {
402        match thought_count % 4 {
403            1 => Self::Observe,
404            2 => Self::Reflect,
405            3 => Self::Test,
406            _ => Self::Compress,
407        }
408    }
409
410    fn as_str(&self) -> &'static str {
411        match self {
412            Self::Observe => "observe",
413            Self::Reflect => "reflect",
414            Self::Test => "test",
415            Self::Compress => "compress",
416        }
417    }
418
419    fn event_type(&self) -> ThoughtEventType {
420        match self {
421            Self::Observe => ThoughtEventType::ThoughtGenerated,
422            Self::Reflect => ThoughtEventType::HypothesisRaised,
423            Self::Test => ThoughtEventType::CheckRequested,
424            Self::Compress => ThoughtEventType::SnapshotCompressed,
425        }
426    }
427}
428
/// One unit of thought work scheduled for a persona in a single loop tick.
///
/// Built while the persona map is write-locked: the identity fields are
/// cloned from the persona so the thought itself can be generated after that
/// lock has been released.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    persona_id: String,
    persona_name: String,
    role: String,
    charter: String,
    swarm_id: Option<String>,
    /// The persona's thought count after it was incremented for this tick.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
439
/// Outcome of generating one thought for a [`ThoughtWorkItem`].
///
/// Most fields are copied verbatim into the emitted event's JSON payload.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Origin label of the thought. The value `"fallback"` is treated
    /// specially by the loop: such thoughts do not count as progress and do
    /// not trigger belief extraction.
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    /// The generated thought text.
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    /// Charged against the persona's token budget (treated as 0 when `None`).
    total_tokens: Option<u32>,
    /// Charged against the persona's compute budget after an `as u32` cast.
    latency_ms: u128,
    error: Option<String>,
}
452
/// Runtime options for cognition manager.
///
/// The runtime constructor clamps three of these values: `loop_interval_ms`
/// to at least 100, `max_events` to at least 32, and `max_snapshots` to at
/// least 8.
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// Feature flag; `start` returns an error while this is `false`.
    pub enabled: bool,
    /// Interval between loop ticks, in milliseconds.
    pub loop_interval_ms: u64,
    // NOTE(review): presumably the capacity of the in-memory event buffer —
    // the trimming code is outside this view.
    pub max_events: usize,
    // NOTE(review): presumably the capacity of the in-memory snapshot buffer
    // — confirm where snapshots are pushed.
    pub max_snapshots: usize,
    pub default_policy: PersonaPolicy,
}
462
463impl Default for CognitionRuntimeOptions {
464    fn default() -> Self {
465        Self {
466            enabled: false,
467            loop_interval_ms: 2_000,
468            max_events: 2_000,
469            max_snapshots: 128,
470            default_policy: PersonaPolicy::default(),
471        }
472    }
473}
474
/// In-memory cognition runtime for perpetual persona swarms.
///
/// Shared state lives behind `Arc` so that `start` can clone handles into the
/// spawned background loop task.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// Feature flag; `start` refuses to run while this is `false`.
    enabled: bool,
    /// Clamped to at least 32 at construction.
    max_events: usize,
    /// Clamped to at least 8 at construction.
    max_snapshots: usize,
    default_policy: PersonaPolicy,
    /// Loop run flag: set by `start`, polled by the loop on every iteration.
    running: Arc<AtomicBool>,
    /// Tick interval in milliseconds; kept at 100 or above.
    loop_interval_ms: Arc<RwLock<u64>>,
    /// Time of the most recent `start` that launched the loop.
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Time of the most recent loop tick.
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Persona state keyed by persona id.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast sender for live event streaming (channel capacity 256).
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// `None` when no thinker config was supplied, the thinker is disabled,
    /// or client initialization failed.
    thinker: Option<Arc<ThinkerClient>>,
    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
    governance: Arc<RwLock<SwarmGovernance>>,
    workspace: Arc<RwLock<GlobalWorkspace>>,
    /// `None` at construction.
    tools: Option<Arc<ToolRegistry>>,
    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
    /// Proposals pending human approval for Critical risk.
    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
}
502
503impl CognitionRuntime {
504    /// Build runtime from environment feature flags.
505    pub fn new_from_env() -> Self {
506        let mut options = CognitionRuntimeOptions::default();
507        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
508        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
509        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
510        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
511
512        options.default_policy = PersonaPolicy {
513            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
514            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
515            token_budget_per_minute: env_u32(
516                "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
517                20_000,
518            ),
519            compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
520            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
521            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
522            allowed_tools: Vec::new(),
523        };
524
525        let thinker_backend = thinker::ThinkerBackend::from_env(
526            &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
527                .unwrap_or_else(|_| "openai_compat".to_string()),
528        );
529        let thinker_timeout_default = match thinker_backend {
530            thinker::ThinkerBackend::OpenAICompat => 30_000,
531            thinker::ThinkerBackend::Candle => 12_000,
532        };
533        let thinker_config = ThinkerConfig {
534            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
535            backend: thinker_backend,
536            endpoint: normalize_thinker_endpoint(
537                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
538                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
539            ),
540            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
541                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
542            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
543            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
544            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
545                .ok()
546                .and_then(|v| v.parse::<f32>().ok()),
547            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
548            timeout_ms: env_u64(
549                "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
550                thinker_timeout_default,
551            ),
552            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
553            candle_tokenizer_path: std::env::var(
554                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
555            )
556            .ok(),
557            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
558            candle_device: thinker::CandleDevicePreference::from_env(
559                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
560                    .unwrap_or_else(|_| "auto".to_string()),
561            ),
562            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
563            candle_repeat_penalty: env_f32(
564                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
565                1.1,
566            ),
567            candle_repeat_last_n: env_usize(
568                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
569                64,
570            ),
571            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
572        };
573
574        Self::new_with_options_and_thinker(options, Some(thinker_config))
575    }
576
    /// Build runtime from explicit options.
    ///
    /// No thinker configuration is supplied on this path, so the resulting
    /// runtime has no thinker client (`thinker` is `None`).
    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
        Self::new_with_options_and_thinker(options, None)
    }
581
582    fn new_with_options_and_thinker(
583        options: CognitionRuntimeOptions,
584        thinker_config: Option<ThinkerConfig>,
585    ) -> Self {
586        let (event_tx, _) = broadcast::channel(256);
587        let thinker = thinker_config.and_then(|cfg| {
588            if !cfg.enabled {
589                return None;
590            }
591            match ThinkerClient::new(cfg) {
592                Ok(client) => {
593                    tracing::info!(
594                        backend = ?client.config().backend,
595                        endpoint = %client.config().endpoint,
596                        model = %client.config().model,
597                        "Cognition thinker initialized"
598                    );
599                    Some(Arc::new(client))
600                }
601                Err(error) => {
602                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
603                    None
604                }
605            }
606        });
607
608        Self {
609            enabled: options.enabled,
610            max_events: options.max_events.max(32),
611            max_snapshots: options.max_snapshots.max(8),
612            default_policy: options.default_policy,
613            running: Arc::new(AtomicBool::new(false)),
614            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
615            started_at: Arc::new(RwLock::new(None)),
616            last_tick_at: Arc::new(RwLock::new(None)),
617            personas: Arc::new(RwLock::new(HashMap::new())),
618            proposals: Arc::new(RwLock::new(HashMap::new())),
619            events: Arc::new(RwLock::new(VecDeque::new())),
620            snapshots: Arc::new(RwLock::new(VecDeque::new())),
621            loop_handle: Arc::new(Mutex::new(None)),
622            event_tx,
623            thinker,
624            beliefs: Arc::new(RwLock::new(HashMap::new())),
625            attention_queue: Arc::new(RwLock::new(Vec::new())),
626            governance: Arc::new(RwLock::new(SwarmGovernance::default())),
627            workspace: Arc::new(RwLock::new(GlobalWorkspace::default())),
628            tools: None,
629            receipts: Arc::new(RwLock::new(Vec::new())),
630            pending_approvals: Arc::new(RwLock::new(HashMap::new())),
631        }
632    }
633
    /// Whether cognition is enabled by feature flag.
    ///
    /// Returns the `enabled` value fixed at construction; `start` returns an
    /// error while this is `false`.
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }
638
    /// Subscribe to thought events for streaming.
    ///
    /// Returns a new receiver on the runtime's broadcast channel, which is
    /// created with a capacity of 256 events.
    // NOTE(review): per tokio's broadcast semantics a receiver should only see
    // events sent after it subscribes, and a slow receiver can lag — confirm
    // that callers handle both.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
        self.event_tx.subscribe()
    }
643
644    /// Start the perpetual cognition loop.
645    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
646        if !self.enabled {
647            return Err(anyhow!(
648                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
649            ));
650        }
651
652        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
653        if let Some(request) = req {
654            if let Some(interval) = request.loop_interval_ms {
655                let mut lock = self.loop_interval_ms.write().await;
656                *lock = interval.max(100);
657            }
658            requested_seed_persona = request.seed_persona;
659        }
660
661        let has_personas = !self.personas.read().await.is_empty();
662        if !has_personas {
663            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
664            let _ = self.create_persona(seed).await?;
665        }
666
667        if self.running.load(Ordering::SeqCst) {
668            return Ok(self.status().await);
669        }
670
671        self.running.store(true, Ordering::SeqCst);
672        {
673            let mut started = self.started_at.write().await;
674            *started = Some(Utc::now());
675        }
676
677        let running = Arc::clone(&self.running);
678        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
679        let last_tick_at = Arc::clone(&self.last_tick_at);
680        let personas = Arc::clone(&self.personas);
681        let proposals = Arc::clone(&self.proposals);
682        let events = Arc::clone(&self.events);
683        let snapshots = Arc::clone(&self.snapshots);
684        let max_events = self.max_events;
685        let max_snapshots = self.max_snapshots;
686        let event_tx = self.event_tx.clone();
687        let thinker = self.thinker.clone();
688        let beliefs = Arc::clone(&self.beliefs);
689        let attention_queue = Arc::clone(&self.attention_queue);
690        let governance = Arc::clone(&self.governance);
691        let workspace = Arc::clone(&self.workspace);
692        let tools = self.tools.clone();
693        let receipts = Arc::clone(&self.receipts);
694        let pending_approvals = Arc::clone(&self.pending_approvals);
695
696        let handle = tokio::spawn(async move {
697            let mut next_tick = Instant::now();
698            while running.load(Ordering::SeqCst) {
699                let now_instant = Instant::now();
700                if now_instant < next_tick {
701                    tokio::time::sleep_until(next_tick).await;
702                }
703                if !running.load(Ordering::SeqCst) {
704                    break;
705                }
706
707                let now = Utc::now();
708                {
709                    let mut lock = last_tick_at.write().await;
710                    *lock = Some(now);
711                }
712
713                let mut new_events = Vec::new();
714                let mut new_snapshots = Vec::new();
715                let mut new_proposals = Vec::new();
716
717                // ── Step 1: Budget window reset + enforcement ──
718                let work_items: Vec<ThoughtWorkItem> = {
719                    let mut map = personas.write().await;
720                    let mut items = Vec::new();
721                    for persona in map.values_mut() {
722                        if persona.status != PersonaStatus::Active {
723                            continue;
724                        }
725
726                        // Reset budget window every 60 seconds
727                        let window_elapsed = now
728                            .signed_duration_since(persona.window_started_at)
729                            .num_seconds();
730                        if window_elapsed >= 60 {
731                            persona.tokens_this_window = 0;
732                            persona.compute_ms_this_window = 0;
733                            persona.window_started_at = now;
734                        }
735
736                        // Check budget
737                        let token_ok =
738                            persona.tokens_this_window < persona.policy.token_budget_per_minute;
739                        let compute_ok =
740                            persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
741                        if !token_ok || !compute_ok {
742                            if !persona.budget_paused {
743                                persona.budget_paused = true;
744                                new_events.push(ThoughtEvent {
745                                    id: Uuid::new_v4().to_string(),
746                                    event_type: ThoughtEventType::BudgetPaused,
747                                    persona_id: Some(persona.identity.id.clone()),
748                                    swarm_id: persona.identity.swarm_id.clone(),
749                                    timestamp: now,
750                                    payload: json!({
751                                        "budget_paused": true,
752                                        "tokens_used": persona.tokens_this_window,
753                                        "compute_ms_used": persona.compute_ms_this_window,
754                                        "token_budget": persona.policy.token_budget_per_minute,
755                                        "compute_budget": persona.policy.compute_ms_per_minute,
756                                    }),
757                                });
758                            }
759                            continue; // skip tick for this persona
760                        }
761
762                        persona.budget_paused = false;
763                        persona.thought_count = persona.thought_count.saturating_add(1);
764                        persona.last_tick_at = Some(now);
765                        persona.updated_at = now;
766                        items.push(ThoughtWorkItem {
767                            persona_id: persona.identity.id.clone(),
768                            persona_name: persona.identity.name.clone(),
769                            role: persona.identity.role.clone(),
770                            charter: persona.identity.charter.clone(),
771                            swarm_id: persona.identity.swarm_id.clone(),
772                            thought_count: persona.thought_count,
773                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
774                        });
775                    }
776                    items
777                };
778
779                for work in &work_items {
780                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
781
782                    let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
783
784                    let event_timestamp = Utc::now();
785                    let is_fallback = thought.source == "fallback";
786                    let tokens_used = thought.total_tokens.unwrap_or(0);
787                    let compute_used = thought.latency_ms as u32;
788
789                    new_events.push(ThoughtEvent {
790                        id: Uuid::new_v4().to_string(),
791                        event_type: work.phase.event_type(),
792                        persona_id: Some(work.persona_id.clone()),
793                        swarm_id: work.swarm_id.clone(),
794                        timestamp: event_timestamp,
795                        payload: json!({
796                            "phase": work.phase.as_str(),
797                            "thought_count": work.thought_count,
798                            "persona": {
799                                "id": work.persona_id.clone(),
800                                "name": work.persona_name.clone(),
801                                "role": work.role.clone(),
802                            },
803                            "context_event_count": context.len(),
804                            "thinking": thought.thinking.clone(),
805                            "source": thought.source,
806                            "model": thought.model.clone(),
807                            "finish_reason": thought.finish_reason.clone(),
808                            "usage": {
809                                "prompt_tokens": thought.prompt_tokens,
810                                "completion_tokens": thought.completion_tokens,
811                                "total_tokens": thought.total_tokens,
812                            },
813                            "latency_ms": thought.latency_ms,
814                            "error": thought.error.clone(),
815                        }),
816                    });
817
818                    // ── After thought: update budget counters ──
819                    {
820                        let mut map = personas.write().await;
821                        if let Some(persona) = map.get_mut(&work.persona_id) {
822                            persona.tokens_this_window =
823                                persona.tokens_this_window.saturating_add(tokens_used);
824                            persona.compute_ms_this_window =
825                                persona.compute_ms_this_window.saturating_add(compute_used);
826                            if !is_fallback {
827                                persona.last_progress_at = Utc::now();
828                            }
829                        }
830                    }
831
832                    // ── Step 3: Belief extraction during Reflect phase ──
833                    if work.phase == ThoughtPhase::Reflect && !is_fallback {
834                        let extracted = beliefs::extract_beliefs_from_thought(
835                            thinker.as_deref(),
836                            &work.persona_id,
837                            &thought.thinking,
838                        )
839                        .await;
840
841                        if !extracted.is_empty() {
842                            let mut belief_store = beliefs.write().await;
843                            let mut attn_queue = attention_queue.write().await;
844                            for mut new_belief in extracted {
845                                // Check for existing belief_key (duplicate detection)
846                                let existing_id = belief_store
847                                    .values()
848                                    .find(|b| {
849                                        b.belief_key == new_belief.belief_key
850                                            && b.status != BeliefStatus::Invalidated
851                                    })
852                                    .map(|b| b.id.clone());
853
854                                if let Some(eid) = existing_id {
855                                    // Confirm existing belief
856                                    if let Some(existing) = belief_store.get_mut(&eid) {
857                                        if !existing.confirmed_by.contains(&work.persona_id) {
858                                            existing.confirmed_by.push(work.persona_id.clone());
859                                        }
860                                        existing.updated_at = Utc::now();
861                                    }
862                                } else {
863                                    // Handle contradiction targets
864                                    let contest_targets: Vec<String> =
865                                        new_belief.contradicts.clone();
866                                    for target_key in &contest_targets {
867                                        if let Some(target) = belief_store.values_mut().find(|b| {
868                                            &b.belief_key == target_key
869                                                && b.status != BeliefStatus::Invalidated
870                                        }) {
871                                            target.contested_by.push(new_belief.id.clone());
872                                            if !new_belief.contradicts.contains(&target.belief_key)
873                                            {
874                                                new_belief
875                                                    .contradicts
876                                                    .push(target.belief_key.clone());
877                                            }
878                                            // Apply contest penalty
879                                            target.confidence = (target.confidence - 0.1).max(0.05);
880                                            if target.confidence < 0.5 {
881                                                target.status = BeliefStatus::Stale;
882                                            } else {
883                                                // Create revalidation attention item
884                                                attn_queue.push(AttentionItem {
885                                                    id: Uuid::new_v4().to_string(),
886                                                    topic: format!(
887                                                        "Revalidate belief: {}",
888                                                        target.claim
889                                                    ),
890                                                    topic_tags: vec![target.belief_key.clone()],
891                                                    priority: 0.7,
892                                                    source_type: AttentionSource::ContestedBelief,
893                                                    source_id: target.id.clone(),
894                                                    assigned_persona: None,
895                                                    created_at: Utc::now(),
896                                                    resolved_at: None,
897                                                });
898                                            }
899                                        }
900                                    }
901
902                                    new_events.push(ThoughtEvent {
903                                        id: Uuid::new_v4().to_string(),
904                                        event_type: ThoughtEventType::BeliefExtracted,
905                                        persona_id: Some(work.persona_id.clone()),
906                                        swarm_id: work.swarm_id.clone(),
907                                        timestamp: Utc::now(),
908                                        payload: json!({
909                                            "belief_id": new_belief.id,
910                                            "belief_key": new_belief.belief_key,
911                                            "claim": trim_for_storage(&new_belief.claim, 280),
912                                            "confidence": new_belief.confidence,
913                                        }),
914                                    });
915
916                                    // Mark progress for belief creation
917                                    {
918                                        let mut map = personas.write().await;
919                                        if let Some(p) = map.get_mut(&work.persona_id) {
920                                            p.last_progress_at = Utc::now();
921                                        }
922                                    }
923
924                                    belief_store.insert(new_belief.id.clone(), new_belief);
925                                }
926                            }
927                        }
928                    }
929
930                    if work.phase == ThoughtPhase::Test {
931                        new_events.push(ThoughtEvent {
932                            id: Uuid::new_v4().to_string(),
933                            event_type: ThoughtEventType::CheckResult,
934                            persona_id: Some(work.persona_id.clone()),
935                            swarm_id: work.swarm_id.clone(),
936                            timestamp: Utc::now(),
937                            payload: json!({
938                                "phase": work.phase.as_str(),
939                                "thought_count": work.thought_count,
940                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
941                                "source": thought.source,
942                                "model": thought.model,
943                            }),
944                        });
945
946                        // Mark progress for check result
947                        {
948                            let mut map = personas.write().await;
949                            if let Some(p) = map.get_mut(&work.persona_id) {
950                                p.last_progress_at = Utc::now();
951                            }
952                        }
953
954                        // ── Step 6: Tool execution via capability leases ──
955                        if !is_fallback {
956                            if let Some(ref tool_registry) = tools {
957                                let allowed = {
958                                    let map = personas.read().await;
959                                    map.get(&work.persona_id)
960                                        .map(|p| p.policy.allowed_tools.clone())
961                                        .unwrap_or_default()
962                                };
963                                if !allowed.is_empty() {
964                                    let tool_results = executor::execute_tool_requests(
965                                        thinker.as_deref(),
966                                        tool_registry,
967                                        &work.persona_id,
968                                        &thought.thinking,
969                                        &allowed,
970                                    )
971                                    .await;
972
973                                    for result_event in tool_results {
974                                        new_events.push(result_event);
975                                        // Mark progress for tool execution
976                                        let mut map = personas.write().await;
977                                        if let Some(p) = map.get_mut(&work.persona_id) {
978                                            p.last_progress_at = Utc::now();
979                                        }
980                                    }
981                                }
982                            }
983                        }
984                    }
985
986                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
987                        let gov = governance.read().await;
988                        let proposal = Proposal {
989                            id: Uuid::new_v4().to_string(),
990                            persona_id: work.persona_id.clone(),
991                            title: proposal_title_from_thought(
992                                &thought.thinking,
993                                work.thought_count,
994                            ),
995                            rationale: trim_for_storage(&thought.thinking, 900),
996                            evidence_refs: vec!["internal.thought_stream".to_string()],
997                            risk: ProposalRisk::Low,
998                            status: ProposalStatus::Created,
999                            created_at: Utc::now(),
1000                            votes: HashMap::new(),
1001                            vote_deadline: Some(
1002                                Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1003                            ),
1004                            votes_requested: false,
1005                            quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1006                                as usize,
1007                        };
1008
1009                        new_events.push(ThoughtEvent {
1010                            id: Uuid::new_v4().to_string(),
1011                            event_type: ThoughtEventType::ProposalCreated,
1012                            persona_id: Some(work.persona_id.clone()),
1013                            swarm_id: work.swarm_id.clone(),
1014                            timestamp: Utc::now(),
1015                            payload: json!({
1016                                "proposal_id": proposal.id,
1017                                "title": proposal.title,
1018                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1019                                "source": thought.source,
1020                                "model": thought.model,
1021                            }),
1022                        });
1023
1024                        new_proposals.push(proposal);
1025                    }
1026
1027                    if work.phase == ThoughtPhase::Compress {
1028                        new_snapshots.push(MemorySnapshot {
1029                            id: Uuid::new_v4().to_string(),
1030                            generated_at: Utc::now(),
1031                            swarm_id: work.swarm_id.clone(),
1032                            persona_scope: vec![work.persona_id.clone()],
1033                            summary: trim_for_storage(&thought.thinking, 1_500),
1034                            hot_event_count: context.len(),
1035                            warm_fact_count: estimate_fact_count(&thought.thinking),
1036                            cold_snapshot_count: 1,
1037                            metadata: HashMap::from([
1038                                (
1039                                    "phase".to_string(),
1040                                    serde_json::Value::String(work.phase.as_str().to_string()),
1041                                ),
1042                                (
1043                                    "role".to_string(),
1044                                    serde_json::Value::String(work.role.clone()),
1045                                ),
1046                                (
1047                                    "source".to_string(),
1048                                    serde_json::Value::String(thought.source.to_string()),
1049                                ),
1050                                (
1051                                    "model".to_string(),
1052                                    serde_json::Value::String(
1053                                        thought
1054                                            .model
1055                                            .clone()
1056                                            .unwrap_or_else(|| "fallback".to_string()),
1057                                    ),
1058                                ),
1059                                (
1060                                    "completion_tokens".to_string(),
1061                                    serde_json::Value::Number(serde_json::Number::from(
1062                                        thought.completion_tokens.unwrap_or(0) as u64,
1063                                    )),
1064                                ),
1065                            ]),
1066                        });
1067
1068                        // ── Step 3: Belief staleness decay in Compress phase ──
1069                        {
1070                            let mut belief_store = beliefs.write().await;
1071                            let mut attn_queue = attention_queue.write().await;
1072                            let stale_ids: Vec<String> = belief_store
1073                                .values()
1074                                .filter(|b| {
1075                                    b.status == BeliefStatus::Active && now > b.review_after
1076                                })
1077                                .map(|b| b.id.clone())
1078                                .collect();
1079                            for id in stale_ids {
1080                                if let Some(belief) = belief_store.get_mut(&id) {
1081                                    belief.status = BeliefStatus::Stale;
1082                                    belief.confidence *= 0.98;
1083                                    belief.confidence = belief.confidence.max(0.05);
1084                                    attn_queue.push(AttentionItem {
1085                                        id: Uuid::new_v4().to_string(),
1086                                        topic: format!("Stale belief: {}", belief.claim),
1087                                        topic_tags: vec![belief.belief_key.clone()],
1088                                        priority: 0.4,
1089                                        source_type: AttentionSource::StaleBelief,
1090                                        source_id: belief.id.clone(),
1091                                        assigned_persona: None,
1092                                        created_at: now,
1093                                        resolved_at: None,
1094                                    });
1095                                }
1096                            }
1097                        }
1098
1099                        // ── Step 4d: Update GlobalWorkspace ──
1100                        {
1101                            let belief_store = beliefs.read().await;
1102                            let attn_queue = attention_queue.read().await;
1103
1104                            let mut sorted_beliefs: Vec<&Belief> = belief_store
1105                                .values()
1106                                .filter(|b| b.status == BeliefStatus::Active)
1107                                .collect();
1108                            sorted_beliefs.sort_by(|a, b| {
1109                                let score_a = a.confidence
1110                                    * (1.0
1111                                        / (1.0
1112                                            + now.signed_duration_since(a.updated_at).num_minutes()
1113                                                as f32));
1114                                let score_b = b.confidence
1115                                    * (1.0
1116                                        / (1.0
1117                                            + now.signed_duration_since(b.updated_at).num_minutes()
1118                                                as f32));
1119                                score_b
1120                                    .partial_cmp(&score_a)
1121                                    .unwrap_or(std::cmp::Ordering::Equal)
1122                            });
1123
1124                            let top_beliefs: Vec<String> = sorted_beliefs
1125                                .iter()
1126                                .take(10)
1127                                .map(|b| b.id.clone())
1128                                .collect();
1129                            let top_uncertainties: Vec<String> = {
1130                                let mut uncertain: Vec<&Belief> = belief_store
1131                                    .values()
1132                                    .filter(|b| {
1133                                        b.status == BeliefStatus::Stale
1134                                            || !b.contested_by.is_empty()
1135                                    })
1136                                    .collect();
1137                                // Deterministic order: contested first, then lowest confidence,
1138                                // then oldest updated_at.
1139                                uncertain.sort_by(|a, b| {
1140                                    let a_contested = !a.contested_by.is_empty();
1141                                    let b_contested = !b.contested_by.is_empty();
1142                                    b_contested
1143                                        .cmp(&a_contested)
1144                                        .then_with(|| {
1145                                            a.confidence
1146                                                .partial_cmp(&b.confidence)
1147                                                .unwrap_or(std::cmp::Ordering::Equal)
1148                                        })
1149                                        .then_with(|| a.updated_at.cmp(&b.updated_at))
1150                                });
1151                                uncertain
1152                                    .iter()
1153                                    .take(5)
1154                                    .map(|b| {
1155                                        format!(
1156                                            "[{}] {}",
1157                                            b.belief_key,
1158                                            trim_for_storage(&b.claim, 80)
1159                                        )
1160                                    })
1161                                    .collect()
1162                            };
1163
1164                            let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1165                                .iter()
1166                                .filter(|a| a.resolved_at.is_none())
1167                                .collect();
1168                            sorted_attn.sort_by(|a, b| {
1169                                b.priority
1170                                    .partial_cmp(&a.priority)
1171                                    .unwrap_or(std::cmp::Ordering::Equal)
1172                            });
1173                            let top_attention: Vec<String> =
1174                                sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1175
1176                            let mut ws = workspace.write().await;
1177                            ws.top_beliefs = top_beliefs;
1178                            ws.top_uncertainties = top_uncertainties;
1179                            ws.top_attention = top_attention;
1180                            ws.updated_at = now;
1181                        }
1182
1183                        new_events.push(ThoughtEvent {
1184                            id: Uuid::new_v4().to_string(),
1185                            event_type: ThoughtEventType::WorkspaceUpdated,
1186                            persona_id: Some(work.persona_id.clone()),
1187                            swarm_id: work.swarm_id.clone(),
1188                            timestamp: Utc::now(),
1189                            payload: json!({ "updated": true }),
1190                        });
1191                    }
1192                }
1193
1194                // ── Step 4c: Governance — resolve proposals ──
1195                {
1196                    let gov = governance.read().await;
1197                    let mut proposal_store = proposals.write().await;
1198                    let persona_map = personas.read().await;
1199
1200                    let proposal_ids: Vec<String> = proposal_store
1201                        .values()
1202                        .filter(|p| p.status == ProposalStatus::Created)
1203                        .map(|p| p.id.clone())
1204                        .collect();
1205
1206                    let mut attn_queue = attention_queue.write().await;
1207                    for pid in proposal_ids {
1208                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1209                            let quorum_needed = proposal.quorum_needed.max(1);
1210
1211                            // Check vote deadline
1212                            if let Some(deadline) = proposal.vote_deadline {
1213                                if now > deadline {
1214                                    if proposal.votes.len() < quorum_needed {
1215                                        // Timeout without quorum → attention item
1216                                        attn_queue.push(AttentionItem {
1217                                            id: Uuid::new_v4().to_string(),
1218                                            topic: format!(
1219                                                "Proposal vote timeout: {}",
1220                                                proposal.title
1221                                            ),
1222                                            topic_tags: Vec::new(),
1223                                            priority: 0.6,
1224                                            source_type: AttentionSource::ProposalTimeout,
1225                                            source_id: proposal.id.clone(),
1226                                            assigned_persona: None,
1227                                            created_at: now,
1228                                            resolved_at: None,
1229                                        });
1230                                        proposal.status = ProposalStatus::Rejected;
1231                                        continue;
1232                                    }
1233
1234                                    // Deadline passed with quorum but missing required approvers → reject
1235                                    let required_roles = gov
1236                                        .required_approvers_by_role
1237                                        .get(&proposal.risk)
1238                                        .cloned()
1239                                        .unwrap_or_default();
1240                                    let all_required_met = required_roles.iter().all(|role| {
1241                                        proposal.votes.iter().any(|(vid, vote)| {
1242                                            *vote == ProposalVote::Approve
1243                                                && persona_map
1244                                                    .get(vid)
1245                                                    .map(|p| &p.identity.role == role)
1246                                                    .unwrap_or(false)
1247                                        })
1248                                    });
1249                                    if !all_required_met {
1250                                        attn_queue.push(AttentionItem {
1251                                            id: Uuid::new_v4().to_string(),
1252                                            topic: format!(
1253                                                "Missing required approvers: {}",
1254                                                proposal.title
1255                                            ),
1256                                            topic_tags: Vec::new(),
1257                                            priority: 0.7,
1258                                            source_type: AttentionSource::ProposalTimeout,
1259                                            source_id: proposal.id.clone(),
1260                                            assigned_persona: None,
1261                                            created_at: now,
1262                                            resolved_at: None,
1263                                        });
1264                                        proposal.status = ProposalStatus::Rejected;
1265                                        continue;
1266                                    }
1267                                }
1268                            }
1269
1270                            // Resolve if enough votes
1271                            if proposal.votes.len() >= quorum_needed {
1272                                // Check for veto
1273                                let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1274                                    if *vote != ProposalVote::Veto {
1275                                        return false;
1276                                    }
1277                                    if let Some(voter) = persona_map.get(voter_id) {
1278                                        gov.veto_roles.contains(&voter.identity.role)
1279                                    } else {
1280                                        false
1281                                    }
1282                                });
1283
1284                                if vetoed {
1285                                    proposal.status = ProposalStatus::Rejected;
1286                                    continue;
1287                                }
1288
1289                                // Check required approvers
1290                                let required_roles = gov
1291                                    .required_approvers_by_role
1292                                    .get(&proposal.risk)
1293                                    .cloned()
1294                                    .unwrap_or_default();
1295                                let all_required_met = required_roles.iter().all(|role| {
1296                                    proposal.votes.iter().any(|(vid, vote)| {
1297                                        *vote == ProposalVote::Approve
1298                                            && persona_map
1299                                                .get(vid)
1300                                                .map(|p| &p.identity.role == role)
1301                                                .unwrap_or(false)
1302                                    })
1303                                });
1304
1305                                if !all_required_met {
1306                                    continue; // wait for required approvers
1307                                }
1308
1309                                // Count approvals vs rejections
1310                                let approvals = proposal
1311                                    .votes
1312                                    .values()
1313                                    .filter(|v| **v == ProposalVote::Approve)
1314                                    .count();
1315                                let rejections = proposal
1316                                    .votes
1317                                    .values()
1318                                    .filter(|v| **v == ProposalVote::Reject)
1319                                    .count();
1320
1321                                if approvals > rejections {
1322                                    proposal.status = ProposalStatus::Verified;
1323                                } else {
1324                                    proposal.status = ProposalStatus::Rejected;
1325                                }
1326                            }
1327                        }
1328                    }
1329                }
1330
1331                // ── Step 7: Execute verified proposals ──
1332                {
1333                    let mut proposal_store = proposals.write().await;
1334                    let verified_ids: Vec<String> = proposal_store
1335                        .values()
1336                        .filter(|p| p.status == ProposalStatus::Verified)
1337                        .map(|p| p.id.clone())
1338                        .collect();
1339
1340                    for pid in verified_ids {
1341                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1342                            if proposal.risk == ProposalRisk::Critical {
1343                                // Check for human approval
1344                                let approved = {
1345                                    let approvals = pending_approvals.read().await;
1346                                    approvals.get(&pid).copied().unwrap_or(false)
1347                                };
1348                                if !approved {
1349                                    // Register for human approval
1350                                    let mut approvals = pending_approvals.write().await;
1351                                    approvals.entry(pid.clone()).or_insert(false);
1352                                    continue;
1353                                }
1354                            }
1355
1356                            // Create decision receipt
1357                            let receipt = executor::DecisionReceipt {
1358                                id: Uuid::new_v4().to_string(),
1359                                proposal_id: pid.clone(),
1360                                inputs: proposal.evidence_refs.clone(),
1361                                governance_decision: format!(
1362                                    "Approved with {} votes",
1363                                    proposal.votes.len()
1364                                ),
1365                                capability_leases: Vec::new(),
1366                                tool_invocations: Vec::new(),
1367                                outcome: executor::ExecutionOutcome::Success {
1368                                    summary: format!("Proposal '{}' executed", proposal.title),
1369                                },
1370                                created_at: Utc::now(),
1371                            };
1372
1373                            new_events.push(ThoughtEvent {
1374                                id: Uuid::new_v4().to_string(),
1375                                event_type: ThoughtEventType::ActionExecuted,
1376                                persona_id: Some(proposal.persona_id.clone()),
1377                                swarm_id: None,
1378                                timestamp: Utc::now(),
1379                                payload: json!({
1380                                    "receipt_id": receipt.id,
1381                                    "proposal_id": pid,
1382                                    "outcome": "success",
1383                                    "summary": format!("Proposal '{}' executed", proposal.title),
1384                                }),
1385                            });
1386
1387                            receipts.write().await.push(receipt);
1388                            proposal.status = ProposalStatus::Executed;
1389                        }
1390                    }
1391                }
1392
1393                if !new_proposals.is_empty() {
1394                    let mut proposal_store = proposals.write().await;
1395                    for proposal in new_proposals {
1396                        proposal_store.insert(proposal.id.clone(), proposal);
1397                    }
1398                }
1399
1400                for event in new_events {
1401                    push_event_internal(&events, max_events, &event_tx, event).await;
1402                }
1403                for snapshot in new_snapshots {
1404                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1405                }
1406
1407                // ── Step 2: Idle TTL reaping ──
1408                {
1409                    let mut map = personas.write().await;
1410                    let idle_ids: Vec<String> = map
1411                        .values()
1412                        .filter(|p| {
1413                            p.status == PersonaStatus::Active
1414                                && !p.budget_paused
1415                                && now.signed_duration_since(p.last_progress_at).num_seconds()
1416                                    > p.policy.idle_ttl_secs as i64
1417                        })
1418                        .map(|p| p.identity.id.clone())
1419                        .collect();
1420
1421                    for id in &idle_ids {
1422                        if let Some(persona) = map.get_mut(id) {
1423                            persona.status = PersonaStatus::Reaped;
1424                            persona.updated_at = now;
1425                        }
1426                        // Also cascade-reap children
1427                        let children: Vec<String> = map
1428                            .values()
1429                            .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1430                            .map(|p| p.identity.id.clone())
1431                            .collect();
1432                        for child_id in children {
1433                            if let Some(child) = map.get_mut(&child_id) {
1434                                child.status = PersonaStatus::Reaped;
1435                                child.updated_at = now;
1436                            }
1437                        }
1438                    }
1439                    drop(map);
1440
1441                    for id in idle_ids {
1442                        push_event_internal(
1443                            &events,
1444                            max_events,
1445                            &event_tx,
1446                            ThoughtEvent {
1447                                id: Uuid::new_v4().to_string(),
1448                                event_type: ThoughtEventType::IdleReaped,
1449                                persona_id: Some(id),
1450                                swarm_id: None,
1451                                timestamp: now,
1452                                payload: json!({ "reason": "idle_ttl_expired" }),
1453                            },
1454                        )
1455                        .await;
1456                    }
1457                }
1458
1459                // ── Step 5: Persistence on Compress ──
1460                if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1461                    let _ = persistence::save_state(
1462                        &personas,
1463                        &proposals,
1464                        &beliefs,
1465                        &attention_queue,
1466                        &workspace,
1467                        &events,
1468                        &snapshots,
1469                    )
1470                    .await;
1471                }
1472
1473                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1474                next_tick += interval;
1475                let tick_completed = Instant::now();
1476                if tick_completed > next_tick {
1477                    next_tick = tick_completed;
1478                }
1479            }
1480        });
1481
1482        {
1483            let mut lock = self.loop_handle.lock().await;
1484            *lock = Some(handle);
1485        }
1486
1487        Ok(self.status().await)
1488    }
1489
1490    /// Stop the perpetual cognition loop.
1491    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1492        self.running.store(false, Ordering::SeqCst);
1493
1494        if let Some(handle) = self.loop_handle.lock().await.take() {
1495            handle.abort();
1496            let _ = handle.await;
1497        }
1498
1499        if let Some(reason_value) = reason {
1500            let event = ThoughtEvent {
1501                id: Uuid::new_v4().to_string(),
1502                event_type: ThoughtEventType::CheckResult,
1503                persona_id: None,
1504                swarm_id: None,
1505                timestamp: Utc::now(),
1506                payload: json!({ "stopped": true, "reason": reason_value }),
1507            };
1508            self.push_event(event).await;
1509        }
1510
1511        Ok(self.status().await)
1512    }
1513
1514    /// Create a persona record.
1515    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1516        let now = Utc::now();
1517        let mut personas = self.personas.write().await;
1518
1519        let mut parent_swarm_id = None;
1520        let mut computed_depth = 0_u32;
1521        let mut inherited_policy = None;
1522
1523        if let Some(parent_id) = req.parent_id.clone() {
1524            let parent = personas
1525                .get(&parent_id)
1526                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1527
1528            if parent.status == PersonaStatus::Reaped {
1529                return Err(anyhow!("Parent persona {} is reaped", parent_id));
1530            }
1531
1532            parent_swarm_id = parent.identity.swarm_id.clone();
1533            computed_depth = parent.identity.depth.saturating_add(1);
1534            inherited_policy = Some(parent.policy.clone());
1535            let branch_limit = parent.policy.max_branching_factor;
1536
1537            let child_count = personas
1538                .values()
1539                .filter(|p| {
1540                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1541                        && p.status != PersonaStatus::Reaped
1542                })
1543                .count();
1544
1545            if child_count as u32 >= branch_limit {
1546                return Err(anyhow!(
1547                    "Parent {} reached branching limit {}",
1548                    parent_id,
1549                    branch_limit
1550                ));
1551            }
1552        }
1553
1554        let policy = req
1555            .policy
1556            .clone()
1557            .or(inherited_policy.clone())
1558            .unwrap_or_else(|| self.default_policy.clone());
1559
1560        let effective_depth_limit = inherited_policy
1561            .as_ref()
1562            .map(|p| p.max_spawn_depth)
1563            .unwrap_or(policy.max_spawn_depth);
1564
1565        if computed_depth > effective_depth_limit {
1566            return Err(anyhow!(
1567                "Spawn depth {} exceeds limit {}",
1568                computed_depth,
1569                effective_depth_limit
1570            ));
1571        }
1572
1573        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1574        if personas.contains_key(&persona_id) {
1575            return Err(anyhow!("Persona id already exists: {}", persona_id));
1576        }
1577
1578        let identity = PersonaIdentity {
1579            id: persona_id.clone(),
1580            name: req.name,
1581            role: req.role,
1582            charter: req.charter,
1583            swarm_id: req.swarm_id.or(parent_swarm_id),
1584            parent_id: req.parent_id,
1585            depth: computed_depth,
1586            created_at: now,
1587            tags: req.tags,
1588        };
1589
1590        let persona = PersonaRuntimeState {
1591            identity,
1592            policy,
1593            status: PersonaStatus::Active,
1594            thought_count: 0,
1595            last_tick_at: None,
1596            updated_at: now,
1597            tokens_this_window: 0,
1598            compute_ms_this_window: 0,
1599            window_started_at: now,
1600            last_progress_at: now,
1601            budget_paused: false,
1602        };
1603
1604        personas.insert(persona_id, persona.clone());
1605        drop(personas);
1606
1607        self.push_event(ThoughtEvent {
1608            id: Uuid::new_v4().to_string(),
1609            event_type: ThoughtEventType::PersonaSpawned,
1610            persona_id: Some(persona.identity.id.clone()),
1611            swarm_id: persona.identity.swarm_id.clone(),
1612            timestamp: now,
1613            payload: json!({
1614                "name": persona.identity.name,
1615                "role": persona.identity.role,
1616                "depth": persona.identity.depth,
1617            }),
1618        })
1619        .await;
1620
1621        Ok(persona)
1622    }
1623
1624    /// Spawn a child persona under an existing parent.
1625    pub async fn spawn_child(
1626        &self,
1627        parent_id: &str,
1628        req: SpawnPersonaRequest,
1629    ) -> Result<PersonaRuntimeState> {
1630        let request = CreatePersonaRequest {
1631            persona_id: req.persona_id,
1632            name: req.name,
1633            role: req.role,
1634            charter: req.charter,
1635            swarm_id: req.swarm_id,
1636            parent_id: Some(parent_id.to_string()),
1637            policy: req.policy,
1638            tags: Vec::new(),
1639        };
1640        self.create_persona(request).await
1641    }
1642
1643    /// Reap one persona or the full descendant tree.
1644    pub async fn reap_persona(
1645        &self,
1646        persona_id: &str,
1647        req: ReapPersonaRequest,
1648    ) -> Result<ReapPersonaResponse> {
1649        let cascade = req.cascade.unwrap_or(false);
1650        let now = Utc::now();
1651
1652        let mut personas = self.personas.write().await;
1653        if !personas.contains_key(persona_id) {
1654            return Err(anyhow!("Persona not found: {}", persona_id));
1655        }
1656
1657        let mut reaped_ids = vec![persona_id.to_string()];
1658        if cascade {
1659            let mut idx = 0usize;
1660            while idx < reaped_ids.len() {
1661                let current = reaped_ids[idx].clone();
1662                let children: Vec<String> = personas
1663                    .values()
1664                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1665                    .map(|p| p.identity.id.clone())
1666                    .collect();
1667                for child in children {
1668                    if !reaped_ids.iter().any(|existing| existing == &child) {
1669                        reaped_ids.push(child);
1670                    }
1671                }
1672                idx += 1;
1673            }
1674        }
1675
1676        for id in &reaped_ids {
1677            if let Some(persona) = personas.get_mut(id) {
1678                persona.status = PersonaStatus::Reaped;
1679                persona.updated_at = now;
1680            }
1681        }
1682        drop(personas);
1683
1684        for id in &reaped_ids {
1685            self.push_event(ThoughtEvent {
1686                id: Uuid::new_v4().to_string(),
1687                event_type: ThoughtEventType::PersonaReaped,
1688                persona_id: Some(id.clone()),
1689                swarm_id: None,
1690                timestamp: now,
1691                payload: json!({
1692                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1693                    "cascade": cascade,
1694                }),
1695            })
1696            .await;
1697        }
1698
1699        Ok(ReapPersonaResponse {
1700            count: reaped_ids.len(),
1701            reaped_ids,
1702        })
1703    }
1704
1705    /// Get latest memory snapshot, if any.
1706    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1707        self.snapshots.read().await.back().cloned()
1708    }
1709
1710    /// Build lineage graph from current persona state.
1711    pub async fn lineage_graph(&self) -> LineageGraph {
1712        let personas = self.personas.read().await;
1713        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1714        let mut roots = Vec::new();
1715        let mut total_edges = 0usize;
1716
1717        for persona in personas.values() {
1718            if let Some(parent_id) = persona.identity.parent_id.clone() {
1719                children_by_parent
1720                    .entry(parent_id)
1721                    .or_default()
1722                    .push(persona.identity.id.clone());
1723                total_edges = total_edges.saturating_add(1);
1724            } else {
1725                roots.push(persona.identity.id.clone());
1726            }
1727        }
1728
1729        let mut nodes: Vec<LineageNode> = personas
1730            .values()
1731            .map(|persona| {
1732                let mut children = children_by_parent
1733                    .get(&persona.identity.id)
1734                    .cloned()
1735                    .unwrap_or_default();
1736                children.sort();
1737
1738                LineageNode {
1739                    persona_id: persona.identity.id.clone(),
1740                    parent_id: persona.identity.parent_id.clone(),
1741                    children,
1742                    depth: persona.identity.depth,
1743                    status: persona.status,
1744                }
1745            })
1746            .collect();
1747
1748        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1749        roots.sort();
1750
1751        LineageGraph {
1752            nodes,
1753            roots,
1754            total_edges,
1755        }
1756    }
1757
1758    /// Return a summary status.
1759    pub async fn status(&self) -> CognitionStatus {
1760        let personas = self.personas.read().await;
1761        let events = self.events.read().await;
1762        let snapshots = self.snapshots.read().await;
1763        let started_at = *self.started_at.read().await;
1764        let last_tick_at = *self.last_tick_at.read().await;
1765        let loop_interval_ms = *self.loop_interval_ms.read().await;
1766
1767        let active_persona_count = personas
1768            .values()
1769            .filter(|p| p.status == PersonaStatus::Active)
1770            .count();
1771
1772        CognitionStatus {
1773            enabled: self.enabled,
1774            running: self.running.load(Ordering::SeqCst),
1775            loop_interval_ms,
1776            started_at,
1777            last_tick_at,
1778            persona_count: personas.len(),
1779            active_persona_count,
1780            events_buffered: events.len(),
1781            snapshots_buffered: snapshots.len(),
1782        }
1783    }
1784
1785    async fn push_event(&self, event: ThoughtEvent) {
1786        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1787    }
1788
1789    /// Set the tool registry for capability-based tool execution.
1790    pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1791        self.tools = Some(registry);
1792    }
1793
1794    /// Get current beliefs.
1795    pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1796        self.beliefs.read().await.clone()
1797    }
1798
1799    /// Get a single belief by ID.
1800    pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1801        self.beliefs.read().await.get(id).cloned()
1802    }
1803
1804    /// Get the current attention queue.
1805    pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1806        self.attention_queue.read().await.clone()
1807    }
1808
1809    /// Get all proposals.
1810    pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1811        self.proposals.read().await.clone()
1812    }
1813
1814    /// Get the current global workspace.
1815    pub async fn get_workspace(&self) -> GlobalWorkspace {
1816        self.workspace.read().await.clone()
1817    }
1818
1819    /// Get decision receipts.
1820    pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1821        self.receipts.read().await.clone()
1822    }
1823
1824    /// Approve a Critical-risk proposal for execution.
1825    pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1826        let proposals = self.proposals.read().await;
1827        let proposal = proposals
1828            .get(proposal_id)
1829            .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1830
1831        if proposal.risk != ProposalRisk::Critical {
1832            return Err(anyhow!("Only Critical proposals require human approval"));
1833        }
1834        if proposal.status != ProposalStatus::Verified {
1835            return Err(anyhow!("Proposal is not in Verified status"));
1836        }
1837        drop(proposals);
1838
1839        let mut approvals = self.pending_approvals.write().await;
1840        approvals.insert(proposal_id.to_string(), true);
1841        Ok(())
1842    }
1843
1844    /// Get governance settings.
1845    pub async fn get_governance(&self) -> SwarmGovernance {
1846        self.governance.read().await.clone()
1847    }
1848
1849    /// Get persona state for a specific persona.
1850    pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1851        self.personas.read().await.get(id).cloned()
1852    }
1853}
1854
1855async fn push_event_internal(
1856    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1857    max_events: usize,
1858    event_tx: &broadcast::Sender<ThoughtEvent>,
1859    event: ThoughtEvent,
1860) {
1861    {
1862        let mut lock = events.write().await;
1863        lock.push_back(event.clone());
1864        while lock.len() > max_events {
1865            lock.pop_front();
1866        }
1867    }
1868    let _ = event_tx.send(event);
1869}
1870
1871async fn push_snapshot_internal(
1872    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1873    max_snapshots: usize,
1874    snapshot: MemorySnapshot,
1875) {
1876    let mut lock = snapshots.write().await;
1877    lock.push_back(snapshot);
1878    while lock.len() > max_snapshots {
1879        lock.pop_front();
1880    }
1881}
1882
1883async fn recent_persona_context(
1884    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1885    persona_id: &str,
1886    limit: usize,
1887) -> Vec<ThoughtEvent> {
1888    let lock = events.read().await;
1889    let mut selected: Vec<ThoughtEvent> = lock
1890        .iter()
1891        .rev()
1892        .filter(|event| {
1893            event.persona_id.as_deref() == Some(persona_id)
1894                || (event.persona_id.is_none()
1895                    && matches!(
1896                        event.event_type,
1897                        ThoughtEventType::CheckResult
1898                            | ThoughtEventType::ProposalCreated
1899                            | ThoughtEventType::SnapshotCompressed
1900                            | ThoughtEventType::WorkspaceUpdated
1901                            | ThoughtEventType::ActionExecuted
1902                            | ThoughtEventType::BudgetPaused
1903                    ))
1904        })
1905        .take(limit)
1906        .cloned()
1907        .collect();
1908    selected.reverse();
1909    selected
1910}
1911
1912async fn generate_phase_thought(
1913    thinker: Option<&ThinkerClient>,
1914    work: &ThoughtWorkItem,
1915    context: &[ThoughtEvent],
1916) -> ThoughtResult {
1917    let started_at = Instant::now();
1918    if let Some(client) = thinker {
1919        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1920        match client.think(&system_prompt, &user_prompt).await {
1921            Ok(output) => {
1922                let thinking = normalize_thought_output(work, context, &output.text);
1923                if !thinking.is_empty() {
1924                    return ThoughtResult {
1925                        source: "model",
1926                        model: Some(output.model),
1927                        finish_reason: output.finish_reason,
1928                        thinking,
1929                        prompt_tokens: output.prompt_tokens,
1930                        completion_tokens: output.completion_tokens,
1931                        total_tokens: output.total_tokens,
1932                        latency_ms: started_at.elapsed().as_millis(),
1933                        error: None,
1934                    };
1935                }
1936            }
1937            Err(error) => {
1938                return ThoughtResult {
1939                    source: "fallback",
1940                    model: None,
1941                    finish_reason: None,
1942                    thinking: fallback_phase_text(work, context),
1943                    prompt_tokens: None,
1944                    completion_tokens: None,
1945                    total_tokens: None,
1946                    latency_ms: started_at.elapsed().as_millis(),
1947                    error: Some(error.to_string()),
1948                };
1949            }
1950        }
1951    }
1952
1953    ThoughtResult {
1954        source: "fallback",
1955        model: None,
1956        finish_reason: None,
1957        thinking: fallback_phase_text(work, context),
1958        prompt_tokens: None,
1959        completion_tokens: None,
1960        total_tokens: None,
1961        latency_ms: started_at.elapsed().as_millis(),
1962        error: None,
1963    }
1964}
1965
1966fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1967    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1968Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1969Write as an operational process update, not meta narration. \
1970Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
1971Output concrete findings, checks, risks, and next actions. \
1972Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
1973        .to_string();
1974
1975    let context_lines = if context.is_empty() {
1976        "none".to_string()
1977    } else {
1978        context
1979            .iter()
1980            .map(format_context_event)
1981            .collect::<Vec<_>>()
1982            .join("\n")
1983    };
1984
1985    let phase_instruction = match work.phase {
1986        ThoughtPhase::Observe => {
1987            "Process format (exact line labels): \
1988Phase: Observe | Goal: detect current customer/business risk | \
1989Signals: 1-3 concrete signals separated by '; ' | \
1990Uncertainty: one unknown that blocks confidence | \
1991Next_Action: one immediate operational action."
1992        }
1993        ThoughtPhase::Reflect => {
1994            "Process format (exact line labels): \
1995Phase: Reflect | Hypothesis: single testable hypothesis | \
1996Rationale: why this is likely | \
1997Business_Risk: customer/revenue/SLA impact | \
1998Validation_Next_Action: one action to confirm or falsify."
1999        }
2000        ThoughtPhase::Test => {
2001            "Process format (exact line labels): \
2002Phase: Test | Check: single concrete check | \
2003Procedure: short executable procedure | \
2004Expected_Result: pass/fail expectation | \
2005Evidence_Quality: low|medium|high with reason | \
2006Escalation_Trigger: when to escalate immediately."
2007        }
2008        ThoughtPhase::Compress => {
2009            "Process format (exact line labels): \
2010Phase: Compress | State_Summary: current state in one line | \
2011Retained_Facts: 3 short facts separated by '; ' | \
2012Open_Risks: up to 2 unresolved risks separated by '; ' | \
2013Next_Process_Step: next operational step."
2014        }
2015    };
2016
2017    let user_prompt = format!(
2018        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2019        phase = work.phase.as_str(),
2020        persona_id = work.persona_id,
2021        persona_name = work.persona_name,
2022        role = work.role,
2023        charter = work.charter,
2024        thought_count = work.thought_count,
2025        context = context_lines,
2026        instruction = phase_instruction
2027    );
2028
2029    (system_prompt, user_prompt)
2030}
2031
2032fn format_context_event(event: &ThoughtEvent) -> String {
2033    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2034    format!(
2035        "{} {} {}",
2036        event.event_type.as_str(),
2037        event.timestamp.to_rfc3339(),
2038        trim_for_storage(&payload, 220)
2039    )
2040}
2041
2042fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2043    let charter = trim_for_storage(&work.charter, 180);
2044    let context_summary = fallback_context_summary(context);
2045    let thought = match work.phase {
2046        ThoughtPhase::Observe => format!(
2047            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2048            work.role, charter, context_summary
2049        ),
2050        ThoughtPhase::Reflect => format!(
2051            "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2052        ),
2053        ThoughtPhase::Test => format!(
2054            "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2055        ),
2056        ThoughtPhase::Compress => format!(
2057            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2058            work.role, charter, context_summary
2059        ),
2060    };
2061    trim_for_storage(&thought, 1_200)
2062}
2063
2064fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2065    let trimmed = trim_for_storage(raw, 2_000);
2066    if trimmed.trim().is_empty() {
2067        return fallback_phase_text(work, context);
2068    }
2069
2070    // Prefer process-labeled content if the model emitted a preamble first.
2071    if let Some(idx) = find_process_label_start(&trimmed) {
2072        let candidate = trimmed[idx..].trim();
2073        // Return the full candidate (all structured fields), not just the first line.
2074        if !candidate.is_empty()
2075            && !candidate.contains('<')
2076            && !has_template_placeholder_values(candidate)
2077        {
2078            // If it's a multi-line structured output, collapse to pipe-delimited single line.
2079            let collapsed: String = candidate
2080                .lines()
2081                .map(str::trim)
2082                .filter(|l| !l.is_empty())
2083                .collect::<Vec<_>>()
2084                .join(" | ");
2085            let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2086            if cleaned.starts_with("Phase:") {
2087                return cleaned.to_string();
2088            }
2089            return collapsed;
2090        }
2091    }
2092
2093    let lower = trimmed.to_ascii_lowercase();
2094    let looks_meta = lower.starts_with("we need")
2095        || lower.starts_with("i need")
2096        || lower.contains("we need to")
2097        || lower.contains("i need to")
2098        || lower.contains("must output")
2099        || lower.contains("let's ")
2100        || lower.contains("we have to");
2101
2102    if looks_meta || has_template_placeholder_values(&trimmed) {
2103        return fallback_phase_text(work, context);
2104    }
2105
2106    trimmed
2107}
2108
/// Report whether `text` still contains unfilled template placeholders.
///
/// Matching is ASCII case-insensitive. A placeholder is any of:
/// - a known field label followed by `...` (e.g. `goal: ...`),
/// - the sequence `<...`,
/// - the standalone word `tbd` or `todo`.
///
/// `tbd`/`todo` are matched as whole words (delimited by non-alphanumeric
/// characters) so that ordinary words which merely contain those letters,
/// such as "Mastodon" or "autodoc", are not misreported as placeholders.
fn has_template_placeholder_values(text: &str) -> bool {
    const FIELD_PLACEHOLDERS: &[&str] = &[
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];

    let lower = text.to_ascii_lowercase();

    FIELD_PLACEHOLDERS
        .iter()
        .any(|needle| lower.contains(needle))
        || lower.contains("<...")
        || lower
            .split(|c: char| !c.is_alphanumeric())
            .any(|word| matches!(word, "tbd" | "todo"))
}
2136
/// Returns the byte offset of the first `Phase:` label in `text`, if any.
///
/// The phase-specific labels (`Phase: Observe`, `Phase: Reflect`, `Phase: Test`,
/// `Phase: Compress`) all begin with the generic `Phase:` prefix, so the earliest
/// occurrence of any of them is always the earliest occurrence of `Phase:`.
/// A single search therefore yields the same offset as scanning for every label
/// and taking the minimum. Matching is case-sensitive.
fn find_process_label_start(text: &str) -> Option<usize> {
    text.find("Phase:")
}
2149
2150fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2151    if context.is_empty() {
2152        return "No prior events recorded yet.".to_string();
2153    }
2154
2155    let mut latest_error: Option<String> = None;
2156    let mut latest_proposal: Option<String> = None;
2157    let mut latest_check: Option<String> = None;
2158
2159    for event in context.iter().rev() {
2160        if latest_error.is_none()
2161            && let Some(error) = event
2162                .payload
2163                .get("error")
2164                .and_then(serde_json::Value::as_str)
2165            && !error.trim().is_empty()
2166        {
2167            latest_error = Some(trim_for_storage(error, 140));
2168        }
2169
2170        if latest_proposal.is_none()
2171            && event.event_type == ThoughtEventType::ProposalCreated
2172            && let Some(title) = event
2173                .payload
2174                .get("title")
2175                .and_then(serde_json::Value::as_str)
2176            && !title.trim().is_empty()
2177            && !has_template_placeholder_values(title)
2178        {
2179            latest_proposal = Some(trim_for_storage(title, 120));
2180        }
2181
2182        if latest_check.is_none()
2183            && event.event_type == ThoughtEventType::CheckResult
2184            && let Some(result) = event
2185                .payload
2186                .get("result_excerpt")
2187                .and_then(serde_json::Value::as_str)
2188            && !result.trim().is_empty()
2189            && !has_template_placeholder_values(result)
2190        {
2191            latest_check = Some(trim_for_storage(result, 140));
2192        }
2193
2194        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2195            break;
2196        }
2197    }
2198
2199    let mut lines = vec![format!(
2200        "{} recent cognition events are available.",
2201        context.len()
2202    )];
2203    if let Some(error) = latest_error {
2204        lines.push(format!("Latest error signal: {}.", error));
2205    }
2206    if let Some(proposal) = latest_proposal {
2207        lines.push(format!("Recent proposal: {}.", proposal));
2208    }
2209    if let Some(check) = latest_check {
2210        lines.push(format!("Recent check: {}.", check));
2211    }
2212    lines.join(" ")
2213}
2214
/// Trims `input` and caps it at `max_chars` characters for storage.
///
/// Surrounding whitespace is removed *before* the length is measured, so padding
/// never counts against the budget or causes a needless truncation. When the
/// trimmed text is longer than `max_chars` characters, the first `max_chars`
/// characters are kept and `"..."` is appended (the ellipsis is not counted
/// toward the limit). Lengths are measured in `char`s, so the cut never lands
/// inside a multi-byte UTF-8 sequence.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let text = input.trim();

    // `nth(max_chars)` is `Some` only when there are more than `max_chars`
    // characters; its byte offset is exactly where the kept prefix ends.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => {
            let mut shortened = String::with_capacity(cut + 3);
            shortened.push_str(&text[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
2226
/// Estimates how many facts `text` holds by counting sentence terminators.
///
/// Every `.`, `!` and `?` character counts once; the total is bounded to the
/// range `1..=12`, so even empty text reports a single fact.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(*ch, '.' | '!' | '?'))
        .count();
    terminators.max(1).min(12)
}
2232
2233fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2234    let first_line = thought
2235        .lines()
2236        .find(|line| !line.trim().is_empty())
2237        .unwrap_or("proposal");
2238    let compact = first_line
2239        .replace(['\t', '\r', '\n'], " ")
2240        .split_whitespace()
2241        .collect::<Vec<_>>()
2242        .join(" ");
2243    let trimmed = trim_for_storage(&compact, 72);
2244    if trimmed.is_empty() {
2245        format!("proposal-{}", thought_count)
2246    } else {
2247        trimmed
2248    }
2249}
2250
2251fn default_seed_persona() -> CreatePersonaRequest {
2252    CreatePersonaRequest {
2253        persona_id: Some("root-thinker".to_string()),
2254        name: "root-thinker".to_string(),
2255        role: "orchestrator".to_string(),
2256        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2257            .to_string(),
2258        swarm_id: Some("swarm-core".to_string()),
2259        parent_id: None,
2260        policy: None,
2261        tags: vec!["orchestration".to_string()],
2262    }
2263}
2264
/// Normalizes a thinker base URL into a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are stripped first. A blank
/// result maps to the local default endpoint, a URL that already ends in
/// `/chat/completions` is returned as-is, and anything else gets that path
/// appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    match base_url.trim().trim_end_matches('/') {
        "" => "http://127.0.0.1:11434/v1/chat/completions".to_string(),
        complete if complete.ends_with("/chat/completions") => complete.to_string(),
        base => format!("{}/chat/completions", base),
    }
}
2275
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, compared
/// ASCII-case-insensitively. Returns `default` when the variable cannot be read
/// or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
2286
/// Reads an `f32` from the environment variable `name`.
///
/// Returns `default` when the variable cannot be read or does not parse as `f32`.
fn env_f32(name: &str, default: f32) -> f32 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<f32>().unwrap_or(default),
        Err(_) => default,
    }
}
2293
/// Reads a `u64` from the environment variable `name`.
///
/// Returns `default` when the variable cannot be read or does not parse as `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name).map_or(default, |raw| raw.parse::<u64>().unwrap_or(default))
}
2300
/// Reads a `u32` from the environment variable `name`.
///
/// Returns `default` when the variable cannot be read or does not parse as `u32`.
fn env_u32(name: &str, default: u32) -> u32 {
    if let Ok(raw) = std::env::var(name) {
        if let Ok(parsed) = raw.parse::<u32>() {
            return parsed;
        }
    }
    default
}
2307
/// Reads a `usize` from the environment variable `name`.
///
/// Returns `default` when the variable cannot be read or does not parse as `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    raw.parse::<usize>().unwrap_or(default)
}
2314
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Shared fixtures -------------------------------------------------

    /// Work item fixture for the normalization tests, parameterized by phase.
    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
        ThoughtWorkItem {
            persona_id: String::from("p-1"),
            persona_name: String::from("Spotlessbinco Business Thinker"),
            role: String::from("principal reliability engineer"),
            charter: String::from(
                "Continuously think about /home/riley/spotlessbinco as a production business system.",
            ),
            swarm_id: Some(String::from("spotlessbinco")),
            thought_count: 4,
            phase,
        }
    }

    /// Policy fixture: depth and branching limits of 2, no shared memory, no
    /// tool allow-list; only the budgets and idle TTL vary between tests.
    fn policy(
        token_budget_per_minute: u32,
        compute_ms_per_minute: u32,
        idle_ttl_secs: u64,
    ) -> PersonaPolicy {
        PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_budget_per_minute,
            compute_ms_per_minute,
            idle_ttl_secs,
            share_memory: false,
            allowed_tools: Vec::new(),
        }
    }

    /// Enabled runtime with a 25 ms loop interval and modest budgets.
    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: policy(1_000, 1_000, 300),
        })
    }

    /// Enabled runtime with a 10 ms loop interval and the given default policy.
    fn fast_runtime(default_policy: PersonaPolicy) -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 10,
            max_events: 256,
            max_snapshots: 32,
            default_policy,
        })
    }

    /// Parentless persona request whose name mirrors its id; no policy override, no tags.
    fn persona_request(
        id: &str,
        role: &str,
        charter: &str,
        swarm_id: Option<&str>,
    ) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: swarm_id.map(str::to_string),
            parent_id: None,
            policy: None,
            tags: Vec::new(),
        }
    }

    /// Child spawn request whose name mirrors its id; no swarm or policy override.
    fn spawn_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    /// Governance fixture: 50% quorum, 300 s vote timeout, `auditor` may veto.
    fn auditor_veto_governance() -> SwarmGovernance {
        SwarmGovernance {
            quorum_fraction: 0.5,
            required_approvers_by_role: HashMap::new(),
            veto_roles: vec!["auditor".to_string()],
            vote_timeout_secs: 300,
        }
    }

    /// Low-risk proposal in `Created` state with voting already requested, a
    /// quorum of 1, and a vote deadline 300 s from now.
    fn pending_proposal(
        id: &str,
        persona_id: &str,
        title: &str,
        rationale: &str,
        votes: HashMap<String, ProposalVote>,
    ) -> Proposal {
        Proposal {
            id: id.to_string(),
            persona_id: persona_id.to_string(),
            title: title.to_string(),
            rationale: rationale.to_string(),
            evidence_refs: Vec::new(),
            risk: ProposalRisk::Low,
            status: ProposalStatus::Created,
            created_at: Utc::now(),
            votes,
            vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
            votes_requested: true,
            quorum_needed: 1,
        }
    }

    /// Moves a persona's `last_progress_at` ten seconds into the past, if it exists.
    async fn backdate_progress(runtime: &CognitionRuntime, persona_id: &str) {
        let mut personas = runtime.personas.write().await;
        if let Some(persona) = personas.get_mut(persona_id) {
            persona.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
        }
    }

    /// Starts the runtime with default options, lets it run for `millis`, then stops it.
    async fn run_briefly(runtime: &CognitionRuntime, millis: u64) {
        runtime.start(None).await.expect("should start");
        tokio::time::sleep(Duration::from_millis(millis)).await;
        runtime.stop(None).await.expect("should stop");
    }

    // ---- Thought normalization -------------------------------------------

    #[test]
    fn normalize_rejects_placeholder_process_line() {
        let work = sample_work_item(ThoughtPhase::Compress);
        let raw = "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...";

        let output = normalize_thought_output(&work, &[], raw);

        assert!(
            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
        );
        assert!(!output.contains("State_Summary: ..."));
    }

    #[test]
    fn normalize_accepts_concrete_process_line() {
        let work = sample_work_item(ThoughtPhase::Test);
        let raw = "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes";

        let output = normalize_thought_output(&work, &[], raw);

        // A concrete, fully populated process line must pass through untouched.
        assert_eq!(
            output,
            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
        );
    }

    // ---- Persona lifecycle and lineage -----------------------------------

    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");
        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", spawn_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");
        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();
        let worker = |id: &str| spawn_request(id, "worker", "run");

        runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");

        runtime
            .spawn_child("root", worker("c1"))
            .await
            .expect("first child should spawn");
        runtime
            .spawn_child("root", worker("c2"))
            .await
            .expect("second child should spawn");

        // The fixture policy allows two children per parent, so a third must fail.
        let third_child = runtime.spawn_child("root", worker("c3")).await;
        assert!(third_child.is_err());

        runtime
            .spawn_child("c1", worker("c1-1"))
            .await
            .expect("depth 2 should be allowed");

        // The fixture policy caps spawn depth at 2, so a depth-3 spawn must fail.
        let depth_violation = runtime.spawn_child("c1-1", worker("c1-1-1")).await;
        assert!(depth_violation.is_err());
    }

    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();
        let request = StartCognitionRequest {
            loop_interval_ms: Some(10),
            seed_persona: Some(persona_request(
                "seed",
                "watcher",
                "observe",
                Some("swarm-seed"),
            )),
        };

        runtime
            .start(Some(request))
            .await
            .expect("runtime should start");

        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }

    // ---- Budgets and idle reaping ----------------------------------------

    #[tokio::test]
    async fn zero_budget_persona_is_skipped() {
        let runtime = fast_runtime(policy(0, 0, 3_600));

        let persona = runtime
            .create_persona(persona_request(
                "budget-test",
                "tester",
                "test budgets",
                None,
            ))
            .await
            .expect("should create persona");

        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);

        // Start and run a few ticks.
        run_briefly(&runtime, 50).await;

        // Persona should still have 0 thought count (budget blocked).
        let p = runtime.get_persona("budget-test").await.unwrap();
        assert_eq!(p.thought_count, 0);
        assert!(p.budget_paused);
    }

    #[tokio::test]
    async fn budget_counters_reset_after_window() {
        let now = Utc::now();
        let identity = PersonaIdentity {
            id: "p1".to_string(),
            name: "test".to_string(),
            role: "tester".to_string(),
            charter: "test".to_string(),
            swarm_id: None,
            parent_id: None,
            depth: 0,
            created_at: now,
            tags: Vec::new(),
        };
        let mut persona = PersonaRuntimeState {
            identity,
            policy: PersonaPolicy::default(),
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
            tokens_this_window: 5000,
            compute_ms_this_window: 3000,
            window_started_at: now - ChronoDuration::seconds(61),
            last_progress_at: now,
            budget_paused: false,
        };

        // Simulate the window reset check.
        let window_elapsed = now
            .signed_duration_since(persona.window_started_at)
            .num_seconds();
        assert!(window_elapsed >= 60);

        // Reset.
        persona.tokens_this_window = 0;
        persona.compute_ms_this_window = 0;
        persona.window_started_at = now;

        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);
    }

    #[tokio::test]
    async fn idle_persona_is_reaped() {
        // 0 TTL = immediately idle.
        let runtime = fast_runtime(policy(20_000, 10_000, 0));

        runtime
            .create_persona(persona_request("idle-test", "idler", "idle away", None))
            .await
            .expect("should create persona");

        // Manually set last_progress_at to the past.
        backdate_progress(&runtime, "idle-test").await;

        run_briefly(&runtime, 100).await;

        let p = runtime.get_persona("idle-test").await.unwrap();
        assert_eq!(p.status, PersonaStatus::Reaped);
    }

    #[tokio::test]
    async fn budget_paused_persona_not_reaped_for_idle() {
        // Zero budget = always paused; 0 TTL would otherwise make it idle at once.
        let runtime = fast_runtime(policy(0, 0, 0));

        runtime
            .create_persona(persona_request("paused-test", "pauser", "pause", None))
            .await
            .expect("should create persona");

        // Set last_progress_at to the past to trigger the idle check.
        backdate_progress(&runtime, "paused-test").await;

        run_briefly(&runtime, 100).await;

        let p = runtime.get_persona("paused-test").await.unwrap();
        // Budget-paused personas are NOT reaped for idle.
        assert_eq!(p.status, PersonaStatus::Active);
        assert!(p.budget_paused);
    }

    // ---- Governance ------------------------------------------------------

    #[tokio::test]
    async fn governance_proposal_resolution() {
        let runtime = test_runtime();
        *runtime.governance.write().await = auditor_veto_governance();

        // Create two personas.
        for voter in ["voter-1", "voter-2"] {
            runtime
                .create_persona(persona_request(voter, "engineer", "vote", None))
                .await
                .unwrap();
        }

        // Insert a proposal that already carries one approval.
        let mut votes = HashMap::new();
        votes.insert("voter-1".to_string(), ProposalVote::Approve);
        runtime.proposals.write().await.insert(
            "prop-1".to_string(),
            pending_proposal(
                "prop-1",
                "voter-1",
                "test proposal",
                "testing governance",
                votes,
            ),
        );

        // quorum = 0.5 * 2 = 1, and we have 1 approval → should be verified.
        runtime.start(None).await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        runtime.stop(None).await.unwrap();

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-1").unwrap();
        // Should be verified or executed (it gets verified then executed in same tick).
        assert!(
            prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
            "Expected Verified or Executed, got {:?}",
            prop.status
        );
    }

    #[tokio::test]
    async fn veto_rejects_proposal() {
        let runtime = test_runtime();
        *runtime.governance.write().await = auditor_veto_governance();

        runtime
            .create_persona(persona_request("eng", "engineer", "build", None))
            .await
            .unwrap();
        runtime
            .create_persona(persona_request("aud", "auditor", "audit", None))
            .await
            .unwrap();

        // One approval from the engineer, one veto from the auditor.
        let mut votes = HashMap::new();
        votes.insert("eng".to_string(), ProposalVote::Approve);
        votes.insert("aud".to_string(), ProposalVote::Veto);
        runtime.proposals.write().await.insert(
            "prop-veto".to_string(),
            pending_proposal("prop-veto", "eng", "vetoed proposal", "testing veto", votes),
        );

        runtime.start(None).await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        runtime.stop(None).await.unwrap();

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-veto").unwrap();
        assert_eq!(prop.status, ProposalStatus::Rejected);
    }

    // ---- Plain data types ------------------------------------------------

    #[test]
    fn global_workspace_default() {
        let ws = GlobalWorkspace::default();
        assert!(ws.top_beliefs.is_empty());
        assert!(ws.top_uncertainties.is_empty());
        assert!(ws.top_attention.is_empty());
    }

    #[test]
    fn attention_item_creation() {
        let item = AttentionItem {
            id: "a1".to_string(),
            topic: "test topic".to_string(),
            topic_tags: vec!["reliability".to_string()],
            priority: 0.8,
            source_type: AttentionSource::ContestedBelief,
            source_id: "b1".to_string(),
            assigned_persona: None,
            created_at: Utc::now(),
            resolved_at: None,
        };
        assert!(item.resolved_at.is_none());
        assert_eq!(item.priority, 0.8);
    }
}