Skip to main content

codetether_agent/cognition/
mod.rs

//! Perpetual cognition runtime for persona swarms.
//!
//! Phase 0 scope:
//! - Contract types for personas, thought events, proposals, and snapshots
//! - In-memory runtime manager for lifecycle + lineage
//! - Feature-flagged perpetual loop (no external execution side effects)
7
8pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32use tokio::time::Instant;
33use uuid::Uuid;
34
35/// Persona execution status.
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37#[serde(rename_all = "snake_case")]
38pub enum PersonaStatus {
39    Active,
40    Idle,
41    Reaped,
42    Error,
43}
44
45/// Policy boundaries for a persona.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct PersonaPolicy {
48    pub max_spawn_depth: u32,
49    pub max_branching_factor: u32,
50    pub token_budget_per_minute: u32,
51    pub compute_ms_per_minute: u32,
52    pub idle_ttl_secs: u64,
53    pub share_memory: bool,
54    #[serde(default)]
55    pub allowed_tools: Vec<String>,
56}
57
58impl Default for PersonaPolicy {
59    fn default() -> Self {
60        Self {
61            max_spawn_depth: 4,
62            max_branching_factor: 4,
63            token_budget_per_minute: 20_000,
64            compute_ms_per_minute: 10_000,
65            idle_ttl_secs: 3_600,
66            share_memory: false,
67            allowed_tools: Vec::new(),
68        }
69    }
70}
71
72/// Identity contract for a persona.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct PersonaIdentity {
75    pub id: String,
76    pub name: String,
77    pub role: String,
78    pub charter: String,
79    pub swarm_id: Option<String>,
80    pub parent_id: Option<String>,
81    pub depth: u32,
82    pub created_at: DateTime<Utc>,
83    #[serde(default)]
84    pub tags: Vec<String>,
85}
86
87/// Full runtime state for a persona.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct PersonaRuntimeState {
90    pub identity: PersonaIdentity,
91    pub policy: PersonaPolicy,
92    pub status: PersonaStatus,
93    pub thought_count: u64,
94    pub last_tick_at: Option<DateTime<Utc>>,
95    pub updated_at: DateTime<Utc>,
96    /// Tokens consumed in current 60-second window.
97    pub tokens_this_window: u32,
98    /// Compute milliseconds consumed in current 60-second window.
99    pub compute_ms_this_window: u32,
100    /// Start of the current budget window.
101    pub window_started_at: DateTime<Utc>,
102    /// Last time this persona made meaningful progress (not budget-paused/quorum-waiting).
103    pub last_progress_at: DateTime<Utc>,
104    /// Whether this persona is currently paused due to budget exhaustion.
105    #[serde(default)]
106    pub budget_paused: bool,
107}
108
109/// Proposal risk level.
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
111#[serde(rename_all = "snake_case")]
112pub enum ProposalRisk {
113    Low,
114    Medium,
115    High,
116    Critical,
117}
118
119/// Proposal status.
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
121#[serde(rename_all = "snake_case")]
122pub enum ProposalStatus {
123    Created,
124    Verified,
125    Rejected,
126    Executed,
127}
128
129/// A vote on a proposal.
130#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
131#[serde(rename_all = "snake_case")]
132pub enum ProposalVote {
133    Approve,
134    Reject,
135    Veto,
136    Abstain,
137}
138
139/// Proposal contract (think first, execute through gates).
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct Proposal {
142    pub id: String,
143    pub persona_id: String,
144    pub title: String,
145    pub rationale: String,
146    pub evidence_refs: Vec<String>,
147    pub risk: ProposalRisk,
148    pub status: ProposalStatus,
149    pub created_at: DateTime<Utc>,
150    /// Votes from personas: persona_id -> vote
151    #[serde(default)]
152    pub votes: HashMap<String, ProposalVote>,
153    /// Deadline for voting
154    pub vote_deadline: Option<DateTime<Utc>>,
155    /// Whether votes have been requested this cycle
156    #[serde(default)]
157    pub votes_requested: bool,
158}
159
160/// Thought/event types emitted by the cognition loop.
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
162#[serde(rename_all = "snake_case")]
163pub enum ThoughtEventType {
164    ThoughtGenerated,
165    HypothesisRaised,
166    CheckRequested,
167    CheckResult,
168    ProposalCreated,
169    ProposalVerified,
170    ProposalRejected,
171    ActionExecuted,
172    PersonaSpawned,
173    PersonaReaped,
174    SnapshotCompressed,
175    BeliefExtracted,
176    BeliefContested,
177    BeliefRevalidated,
178    BudgetPaused,
179    IdleReaped,
180    AttentionCreated,
181    VoteCast,
182    WorkspaceUpdated,
183}
184
185impl ThoughtEventType {
186    fn as_str(&self) -> &'static str {
187        match self {
188            Self::ThoughtGenerated => "thought_generated",
189            Self::HypothesisRaised => "hypothesis_raised",
190            Self::CheckRequested => "check_requested",
191            Self::CheckResult => "check_result",
192            Self::ProposalCreated => "proposal_created",
193            Self::ProposalVerified => "proposal_verified",
194            Self::ProposalRejected => "proposal_rejected",
195            Self::ActionExecuted => "action_executed",
196            Self::PersonaSpawned => "persona_spawned",
197            Self::PersonaReaped => "persona_reaped",
198            Self::SnapshotCompressed => "snapshot_compressed",
199            Self::BeliefExtracted => "belief_extracted",
200            Self::BeliefContested => "belief_contested",
201            Self::BeliefRevalidated => "belief_revalidated",
202            Self::BudgetPaused => "budget_paused",
203            Self::IdleReaped => "idle_reaped",
204            Self::AttentionCreated => "attention_created",
205            Self::VoteCast => "vote_cast",
206            Self::WorkspaceUpdated => "workspace_updated",
207        }
208    }
209}
210
211/// Streamable thought event contract.
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct ThoughtEvent {
214    pub id: String,
215    pub event_type: ThoughtEventType,
216    pub persona_id: Option<String>,
217    pub swarm_id: Option<String>,
218    pub timestamp: DateTime<Utc>,
219    pub payload: serde_json::Value,
220}
221
222/// Distilled memory snapshot contract.
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct MemorySnapshot {
225    pub id: String,
226    pub generated_at: DateTime<Utc>,
227    pub swarm_id: Option<String>,
228    pub persona_scope: Vec<String>,
229    pub summary: String,
230    pub hot_event_count: usize,
231    pub warm_fact_count: usize,
232    pub cold_snapshot_count: usize,
233    pub metadata: HashMap<String, serde_json::Value>,
234}
235
236/// Request payload for creating a persona.
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct CreatePersonaRequest {
239    pub persona_id: Option<String>,
240    pub name: String,
241    pub role: String,
242    pub charter: String,
243    pub swarm_id: Option<String>,
244    pub parent_id: Option<String>,
245    pub policy: Option<PersonaPolicy>,
246    #[serde(default)]
247    pub tags: Vec<String>,
248}
249
250/// Request payload for spawning a child persona.
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct SpawnPersonaRequest {
253    pub persona_id: Option<String>,
254    pub name: String,
255    pub role: String,
256    pub charter: String,
257    pub swarm_id: Option<String>,
258    pub policy: Option<PersonaPolicy>,
259}
260
261/// Request payload for reaping persona(s).
262#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct ReapPersonaRequest {
264    pub cascade: Option<bool>,
265    pub reason: Option<String>,
266}
267
268/// Start-control payload for perpetual cognition.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct StartCognitionRequest {
271    pub loop_interval_ms: Option<u64>,
272    pub seed_persona: Option<CreatePersonaRequest>,
273}
274
275/// Stop-control payload for perpetual cognition.
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct StopCognitionRequest {
278    pub reason: Option<String>,
279}
280
// ── Attention, Governance, GlobalWorkspace ──────────────────────────────
282
283/// Source of an attention item.
284#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
285#[serde(rename_all = "snake_case")]
286pub enum AttentionSource {
287    ContestedBelief,
288    FailedCheck,
289    StaleBelief,
290    ProposalTimeout,
291    FailedExecution,
292}
293
294/// An item requiring persona attention.
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct AttentionItem {
297    pub id: String,
298    pub topic: String,
299    pub topic_tags: Vec<String>,
300    pub priority: f32,
301    pub source_type: AttentionSource,
302    pub source_id: String,
303    pub assigned_persona: Option<String>,
304    pub created_at: DateTime<Utc>,
305    pub resolved_at: Option<DateTime<Utc>>,
306}
307
308/// Governance rules for swarm voting.
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SwarmGovernance {
311    pub quorum_fraction: f32,
312    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
313    pub veto_roles: Vec<String>,
314    pub vote_timeout_secs: u64,
315}
316
317impl Default for SwarmGovernance {
318    fn default() -> Self {
319        Self {
320            quorum_fraction: 0.5,
321            required_approvers_by_role: HashMap::new(),
322            veto_roles: Vec::new(),
323            vote_timeout_secs: 300,
324        }
325    }
326}
327
328/// The coherent "now" for the entire swarm.
329#[derive(Debug, Clone, Serialize, Deserialize)]
330pub struct GlobalWorkspace {
331    pub top_beliefs: Vec<String>,
332    pub top_uncertainties: Vec<String>,
333    pub top_attention: Vec<String>,
334    pub active_objectives: Vec<String>,
335    pub updated_at: DateTime<Utc>,
336}
337
338impl Default for GlobalWorkspace {
339    fn default() -> Self {
340        Self {
341            top_beliefs: Vec::new(),
342            top_uncertainties: Vec::new(),
343            top_attention: Vec::new(),
344            active_objectives: Vec::new(),
345            updated_at: Utc::now(),
346        }
347    }
348}
349
350/// Start/stop response status.
351#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct CognitionStatus {
353    pub enabled: bool,
354    pub running: bool,
355    pub loop_interval_ms: u64,
356    pub started_at: Option<DateTime<Utc>>,
357    pub last_tick_at: Option<DateTime<Utc>>,
358    pub persona_count: usize,
359    pub active_persona_count: usize,
360    pub events_buffered: usize,
361    pub snapshots_buffered: usize,
362}
363
364/// Reap response.
365#[derive(Debug, Clone, Serialize, Deserialize)]
366pub struct ReapPersonaResponse {
367    pub reaped_ids: Vec<String>,
368    pub count: usize,
369}
370
371/// Lineage node response.
372#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct LineageNode {
374    pub persona_id: String,
375    pub parent_id: Option<String>,
376    pub children: Vec<String>,
377    pub depth: u32,
378    pub status: PersonaStatus,
379}
380
381/// Lineage graph response.
382#[derive(Debug, Clone, Serialize, Deserialize)]
383pub struct LineageGraph {
384    pub nodes: Vec<LineageNode>,
385    pub roots: Vec<String>,
386    pub total_edges: usize,
387}
388
/// Phase of a persona's thought cycle.
///
/// A persona moves through the four phases in declaration order, selected
/// from its thought counter modulo 4.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}
396
397impl ThoughtPhase {
398    fn from_thought_count(thought_count: u64) -> Self {
399        match thought_count % 4 {
400            1 => Self::Observe,
401            2 => Self::Reflect,
402            3 => Self::Test,
403            _ => Self::Compress,
404        }
405    }
406
407    fn as_str(&self) -> &'static str {
408        match self {
409            Self::Observe => "observe",
410            Self::Reflect => "reflect",
411            Self::Test => "test",
412            Self::Compress => "compress",
413        }
414    }
415
416    fn event_type(&self) -> ThoughtEventType {
417        match self {
418            Self::Observe => ThoughtEventType::ThoughtGenerated,
419            Self::Reflect => ThoughtEventType::HypothesisRaised,
420            Self::Test => ThoughtEventType::CheckRequested,
421            Self::Compress => ThoughtEventType::SnapshotCompressed,
422        }
423    }
424}
425
426#[derive(Debug, Clone)]
427struct ThoughtWorkItem {
428    persona_id: String,
429    persona_name: String,
430    role: String,
431    charter: String,
432    swarm_id: Option<String>,
433    thought_count: u64,
434    phase: ThoughtPhase,
435}
436
/// Outcome of generating one thought.
///
/// Every field is copied into the emitted event's payload.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Origin of the thought; the loop treats the value `"fallback"` as not
    /// counting towards persona progress.
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    /// Added to the persona's token counter for the window; `None` counts
    /// as zero.
    total_tokens: Option<u32>,
    /// Added to the persona's compute counter for the window.
    latency_ms: u128,
    error: Option<String>,
}
449
450/// Runtime options for cognition manager.
451#[derive(Debug, Clone)]
452pub struct CognitionRuntimeOptions {
453    pub enabled: bool,
454    pub loop_interval_ms: u64,
455    pub max_events: usize,
456    pub max_snapshots: usize,
457    pub default_policy: PersonaPolicy,
458}
459
460impl Default for CognitionRuntimeOptions {
461    fn default() -> Self {
462        Self {
463            enabled: false,
464            loop_interval_ms: 2_000,
465            max_events: 2_000,
466            max_snapshots: 128,
467            default_policy: PersonaPolicy::default(),
468        }
469    }
470}
471
472/// In-memory cognition runtime for perpetual persona swarms.
473#[derive(Debug)]
474pub struct CognitionRuntime {
475    enabled: bool,
476    max_events: usize,
477    max_snapshots: usize,
478    default_policy: PersonaPolicy,
479    running: Arc<AtomicBool>,
480    loop_interval_ms: Arc<RwLock<u64>>,
481    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
482    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
483    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
484    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
485    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
486    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
487    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
488    event_tx: broadcast::Sender<ThoughtEvent>,
489    thinker: Option<Arc<ThinkerClient>>,
490    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
491    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
492    governance: Arc<RwLock<SwarmGovernance>>,
493    workspace: Arc<RwLock<GlobalWorkspace>>,
494    tools: Option<Arc<ToolRegistry>>,
495    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
496    /// Proposals pending human approval for Critical risk.
497    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
498}
499
500impl CognitionRuntime {
501    /// Build runtime from environment feature flags.
502    pub fn new_from_env() -> Self {
503        let mut options = CognitionRuntimeOptions::default();
504        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
505        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
506        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
507        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
508
509        options.default_policy = PersonaPolicy {
510            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
511            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
512            token_budget_per_minute: env_u32(
513                "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
514                20_000,
515            ),
516            compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
517            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
518            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
519            allowed_tools: Vec::new(),
520        };
521
522        let thinker_config = ThinkerConfig {
523            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
524            backend: thinker::ThinkerBackend::from_env(
525                &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
526                    .unwrap_or_else(|_| "openai_compat".to_string()),
527            ),
528            endpoint: normalize_thinker_endpoint(
529                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
530                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
531            ),
532            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
533                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
534            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
535            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
536            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
537                .ok()
538                .and_then(|v| v.parse::<f32>().ok()),
539            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
540            timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
541            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
542            candle_tokenizer_path: std::env::var(
543                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
544            )
545            .ok(),
546            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
547            candle_device: thinker::CandleDevicePreference::from_env(
548                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
549                    .unwrap_or_else(|_| "auto".to_string()),
550            ),
551            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
552            candle_repeat_penalty: env_f32(
553                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
554                1.1,
555            ),
556            candle_repeat_last_n: env_usize(
557                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
558                64,
559            ),
560            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
561        };
562
563        Self::new_with_options_and_thinker(options, Some(thinker_config))
564    }
565
566    /// Build runtime from explicit options.
567    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
568        Self::new_with_options_and_thinker(options, None)
569    }
570
571    fn new_with_options_and_thinker(
572        options: CognitionRuntimeOptions,
573        thinker_config: Option<ThinkerConfig>,
574    ) -> Self {
575        let (event_tx, _) = broadcast::channel(options.max_events.max(16));
576        let thinker = thinker_config.and_then(|cfg| {
577            if !cfg.enabled {
578                return None;
579            }
580            match ThinkerClient::new(cfg) {
581                Ok(client) => {
582                    tracing::info!(
583                        backend = ?client.config().backend,
584                        endpoint = %client.config().endpoint,
585                        model = %client.config().model,
586                        "Cognition thinker initialized"
587                    );
588                    Some(Arc::new(client))
589                }
590                Err(error) => {
591                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
592                    None
593                }
594            }
595        });
596
597        Self {
598            enabled: options.enabled,
599            max_events: options.max_events.max(32),
600            max_snapshots: options.max_snapshots.max(8),
601            default_policy: options.default_policy,
602            running: Arc::new(AtomicBool::new(false)),
603            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
604            started_at: Arc::new(RwLock::new(None)),
605            last_tick_at: Arc::new(RwLock::new(None)),
606            personas: Arc::new(RwLock::new(HashMap::new())),
607            proposals: Arc::new(RwLock::new(HashMap::new())),
608            events: Arc::new(RwLock::new(VecDeque::new())),
609            snapshots: Arc::new(RwLock::new(VecDeque::new())),
610            loop_handle: Arc::new(Mutex::new(None)),
611            event_tx,
612            thinker,
613            beliefs: Arc::new(RwLock::new(HashMap::new())),
614            attention_queue: Arc::new(RwLock::new(Vec::new())),
615            governance: Arc::new(RwLock::new(SwarmGovernance::default())),
616            workspace: Arc::new(RwLock::new(GlobalWorkspace::default())),
617            tools: None,
618            receipts: Arc::new(RwLock::new(Vec::new())),
619            pending_approvals: Arc::new(RwLock::new(HashMap::new())),
620        }
621    }
622
623    /// Whether cognition is enabled by feature flag.
624    pub fn is_enabled(&self) -> bool {
625        self.enabled
626    }
627
628    /// Subscribe to thought events for streaming.
629    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
630        self.event_tx.subscribe()
631    }
632
633    /// Start the perpetual cognition loop.
634    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
635        if !self.enabled {
636            return Err(anyhow!(
637                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
638            ));
639        }
640
641        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
642        if let Some(request) = req {
643            if let Some(interval) = request.loop_interval_ms {
644                let mut lock = self.loop_interval_ms.write().await;
645                *lock = interval.max(100);
646            }
647            requested_seed_persona = request.seed_persona;
648        }
649
650        let has_personas = !self.personas.read().await.is_empty();
651        if !has_personas {
652            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
653            let _ = self.create_persona(seed).await?;
654        }
655
656        if self.running.load(Ordering::SeqCst) {
657            return Ok(self.status().await);
658        }
659
660        self.running.store(true, Ordering::SeqCst);
661        {
662            let mut started = self.started_at.write().await;
663            *started = Some(Utc::now());
664        }
665
666        let running = Arc::clone(&self.running);
667        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
668        let last_tick_at = Arc::clone(&self.last_tick_at);
669        let personas = Arc::clone(&self.personas);
670        let proposals = Arc::clone(&self.proposals);
671        let events = Arc::clone(&self.events);
672        let snapshots = Arc::clone(&self.snapshots);
673        let max_events = self.max_events;
674        let max_snapshots = self.max_snapshots;
675        let event_tx = self.event_tx.clone();
676        let thinker = self.thinker.clone();
677        let beliefs = Arc::clone(&self.beliefs);
678        let attention_queue = Arc::clone(&self.attention_queue);
679        let governance = Arc::clone(&self.governance);
680        let workspace = Arc::clone(&self.workspace);
681        let tools = self.tools.clone();
682        let receipts = Arc::clone(&self.receipts);
683        let pending_approvals = Arc::clone(&self.pending_approvals);
684
685        let handle = tokio::spawn(async move {
686            let mut next_tick = Instant::now();
687            while running.load(Ordering::SeqCst) {
688                let now_instant = Instant::now();
689                if now_instant < next_tick {
690                    tokio::time::sleep_until(next_tick).await;
691                }
692                if !running.load(Ordering::SeqCst) {
693                    break;
694                }
695
696                let now = Utc::now();
697                {
698                    let mut lock = last_tick_at.write().await;
699                    *lock = Some(now);
700                }
701
702                let mut new_events = Vec::new();
703                let mut new_snapshots = Vec::new();
704                let mut new_proposals = Vec::new();
705
706                // ── Step 1: Budget window reset + enforcement ──
707                let work_items: Vec<ThoughtWorkItem> = {
708                    let mut map = personas.write().await;
709                    let mut items = Vec::new();
710                    for persona in map.values_mut() {
711                        if persona.status != PersonaStatus::Active {
712                            continue;
713                        }
714
715                        // Reset budget window every 60 seconds
716                        let window_elapsed = now
717                            .signed_duration_since(persona.window_started_at)
718                            .num_seconds();
719                        if window_elapsed >= 60 {
720                            persona.tokens_this_window = 0;
721                            persona.compute_ms_this_window = 0;
722                            persona.window_started_at = now;
723                        }
724
725                        // Check budget
726                        let token_ok =
727                            persona.tokens_this_window < persona.policy.token_budget_per_minute;
728                        let compute_ok =
729                            persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
730                        if !token_ok || !compute_ok {
731                            if !persona.budget_paused {
732                                persona.budget_paused = true;
733                                new_events.push(ThoughtEvent {
734                                    id: Uuid::new_v4().to_string(),
735                                    event_type: ThoughtEventType::BudgetPaused,
736                                    persona_id: Some(persona.identity.id.clone()),
737                                    swarm_id: persona.identity.swarm_id.clone(),
738                                    timestamp: now,
739                                    payload: json!({
740                                        "budget_paused": true,
741                                        "tokens_used": persona.tokens_this_window,
742                                        "compute_ms_used": persona.compute_ms_this_window,
743                                        "token_budget": persona.policy.token_budget_per_minute,
744                                        "compute_budget": persona.policy.compute_ms_per_minute,
745                                    }),
746                                });
747                            }
748                            continue; // skip tick for this persona
749                        }
750
751                        persona.budget_paused = false;
752                        persona.thought_count = persona.thought_count.saturating_add(1);
753                        persona.last_tick_at = Some(now);
754                        persona.updated_at = now;
755                        items.push(ThoughtWorkItem {
756                            persona_id: persona.identity.id.clone(),
757                            persona_name: persona.identity.name.clone(),
758                            role: persona.identity.role.clone(),
759                            charter: persona.identity.charter.clone(),
760                            swarm_id: persona.identity.swarm_id.clone(),
761                            thought_count: persona.thought_count,
762                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
763                        });
764                    }
765                    items
766                };
767
768                for work in &work_items {
769                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
770
771                    let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
772
773                    let event_timestamp = Utc::now();
774                    let is_fallback = thought.source == "fallback";
775
776                    new_events.push(ThoughtEvent {
777                        id: Uuid::new_v4().to_string(),
778                        event_type: work.phase.event_type(),
779                        persona_id: Some(work.persona_id.clone()),
780                        swarm_id: work.swarm_id.clone(),
781                        timestamp: event_timestamp,
782                        payload: json!({
783                            "phase": work.phase.as_str(),
784                            "thought_count": work.thought_count,
785                            "persona": {
786                                "id": work.persona_id.clone(),
787                                "name": work.persona_name.clone(),
788                                "role": work.role.clone(),
789                            },
790                            "context_event_count": context.len(),
791                            "thinking": thought.thinking.clone(),
792                            "source": thought.source,
793                            "model": thought.model.clone(),
794                            "finish_reason": thought.finish_reason.clone(),
795                            "usage": {
796                                "prompt_tokens": thought.prompt_tokens,
797                                "completion_tokens": thought.completion_tokens,
798                                "total_tokens": thought.total_tokens,
799                            },
800                            "latency_ms": thought.latency_ms,
801                            "error": thought.error.clone(),
802                        }),
803                    });
804
805                    // ── After thought: update budget counters ──
806                    {
807                        let mut map = personas.write().await;
808                        if let Some(persona) = map.get_mut(&work.persona_id) {
809                            let tokens_used = thought.total_tokens.unwrap_or(0);
810                            let compute_used = thought.latency_ms as u32;
811                            persona.tokens_this_window =
812                                persona.tokens_this_window.saturating_add(tokens_used);
813                            persona.compute_ms_this_window =
814                                persona.compute_ms_this_window.saturating_add(compute_used);
815
816                            // Step 2: Update last_progress_at for non-fallback thought
817                            if !is_fallback {
818                                persona.last_progress_at = Utc::now();
819                            }
820                        }
821                    }
822
823                    // ── Step 3: Belief extraction during Reflect phase ──
824                    if work.phase == ThoughtPhase::Reflect && !is_fallback {
825                        let extracted = beliefs::extract_beliefs_from_thought(
826                            thinker.as_deref(),
827                            &work.persona_id,
828                            &thought.thinking,
829                        )
830                        .await;
831
832                        if !extracted.is_empty() {
833                            let mut belief_store = beliefs.write().await;
834                            let mut attn_queue = attention_queue.write().await;
835                            for mut new_belief in extracted {
836                                // Check for existing belief_key (duplicate detection)
837                                let existing_id = belief_store
838                                    .values()
839                                    .find(|b| {
840                                        b.belief_key == new_belief.belief_key
841                                            && b.status != BeliefStatus::Invalidated
842                                    })
843                                    .map(|b| b.id.clone());
844
845                                if let Some(eid) = existing_id {
846                                    // Confirm existing belief
847                                    if let Some(existing) = belief_store.get_mut(&eid) {
848                                        if !existing.confirmed_by.contains(&work.persona_id) {
849                                            existing.confirmed_by.push(work.persona_id.clone());
850                                        }
851                                        existing.updated_at = Utc::now();
852                                    }
853                                } else {
854                                    // Handle contradiction targets
855                                    let contest_targets: Vec<String> =
856                                        new_belief.contradicts.clone();
857                                    for target_key in &contest_targets {
858                                        if let Some(target) = belief_store.values_mut().find(|b| {
859                                            &b.belief_key == target_key
860                                                && b.status != BeliefStatus::Invalidated
861                                        }) {
862                                            target.contested_by.push(new_belief.id.clone());
863                                            if !new_belief.contradicts.contains(&target.belief_key)
864                                            {
865                                                new_belief
866                                                    .contradicts
867                                                    .push(target.belief_key.clone());
868                                            }
869                                            // Apply contest penalty
870                                            target.confidence = (target.confidence - 0.1).max(0.05);
871                                            if target.confidence < 0.5 {
872                                                target.status = BeliefStatus::Stale;
873                                            } else {
874                                                // Create revalidation attention item
875                                                attn_queue.push(AttentionItem {
876                                                    id: Uuid::new_v4().to_string(),
877                                                    topic: format!(
878                                                        "Revalidate belief: {}",
879                                                        target.claim
880                                                    ),
881                                                    topic_tags: vec![target.belief_key.clone()],
882                                                    priority: 0.7,
883                                                    source_type: AttentionSource::ContestedBelief,
884                                                    source_id: target.id.clone(),
885                                                    assigned_persona: None,
886                                                    created_at: Utc::now(),
887                                                    resolved_at: None,
888                                                });
889                                            }
890                                        }
891                                    }
892
893                                    new_events.push(ThoughtEvent {
894                                        id: Uuid::new_v4().to_string(),
895                                        event_type: ThoughtEventType::BeliefExtracted,
896                                        persona_id: Some(work.persona_id.clone()),
897                                        swarm_id: work.swarm_id.clone(),
898                                        timestamp: Utc::now(),
899                                        payload: json!({
900                                            "belief_id": new_belief.id,
901                                            "belief_key": new_belief.belief_key,
902                                            "claim": trim_for_storage(&new_belief.claim, 280),
903                                            "confidence": new_belief.confidence,
904                                        }),
905                                    });
906
907                                    // Mark progress for belief creation
908                                    {
909                                        let mut map = personas.write().await;
910                                        if let Some(p) = map.get_mut(&work.persona_id) {
911                                            p.last_progress_at = Utc::now();
912                                        }
913                                    }
914
915                                    belief_store.insert(new_belief.id.clone(), new_belief);
916                                }
917                            }
918                        }
919                    }
920
921                    if work.phase == ThoughtPhase::Test {
922                        new_events.push(ThoughtEvent {
923                            id: Uuid::new_v4().to_string(),
924                            event_type: ThoughtEventType::CheckResult,
925                            persona_id: Some(work.persona_id.clone()),
926                            swarm_id: work.swarm_id.clone(),
927                            timestamp: Utc::now(),
928                            payload: json!({
929                                "phase": work.phase.as_str(),
930                                "thought_count": work.thought_count,
931                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
932                                "source": thought.source,
933                                "model": thought.model,
934                            }),
935                        });
936
937                        // Mark progress for check result
938                        {
939                            let mut map = personas.write().await;
940                            if let Some(p) = map.get_mut(&work.persona_id) {
941                                p.last_progress_at = Utc::now();
942                            }
943                        }
944
945                        // ── Step 6: Tool execution via capability leases ──
946                        if !is_fallback {
947                            if let Some(ref tool_registry) = tools {
948                                let allowed = {
949                                    let map = personas.read().await;
950                                    map.get(&work.persona_id)
951                                        .map(|p| p.policy.allowed_tools.clone())
952                                        .unwrap_or_default()
953                                };
954                                if !allowed.is_empty() {
955                                    let tool_results = executor::execute_tool_requests(
956                                        thinker.as_deref(),
957                                        tool_registry,
958                                        &work.persona_id,
959                                        &thought.thinking,
960                                        &allowed,
961                                    )
962                                    .await;
963
964                                    for result_event in tool_results {
965                                        new_events.push(result_event);
966                                        // Mark progress for tool execution
967                                        let mut map = personas.write().await;
968                                        if let Some(p) = map.get_mut(&work.persona_id) {
969                                            p.last_progress_at = Utc::now();
970                                        }
971                                    }
972                                }
973                            }
974                        }
975                    }
976
977                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
978                        let gov = governance.read().await;
979                        let proposal = Proposal {
980                            id: Uuid::new_v4().to_string(),
981                            persona_id: work.persona_id.clone(),
982                            title: proposal_title_from_thought(
983                                &thought.thinking,
984                                work.thought_count,
985                            ),
986                            rationale: trim_for_storage(&thought.thinking, 900),
987                            evidence_refs: vec!["internal.thought_stream".to_string()],
988                            risk: ProposalRisk::Low,
989                            status: ProposalStatus::Created,
990                            created_at: Utc::now(),
991                            votes: HashMap::new(),
992                            vote_deadline: Some(
993                                Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
994                            ),
995                            votes_requested: false,
996                        };
997
998                        new_events.push(ThoughtEvent {
999                            id: Uuid::new_v4().to_string(),
1000                            event_type: ThoughtEventType::ProposalCreated,
1001                            persona_id: Some(work.persona_id.clone()),
1002                            swarm_id: work.swarm_id.clone(),
1003                            timestamp: Utc::now(),
1004                            payload: json!({
1005                                "proposal_id": proposal.id,
1006                                "title": proposal.title,
1007                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1008                                "source": thought.source,
1009                                "model": thought.model,
1010                            }),
1011                        });
1012
1013                        new_proposals.push(proposal);
1014                    }
1015
1016                    if work.phase == ThoughtPhase::Compress {
1017                        new_snapshots.push(MemorySnapshot {
1018                            id: Uuid::new_v4().to_string(),
1019                            generated_at: Utc::now(),
1020                            swarm_id: work.swarm_id.clone(),
1021                            persona_scope: vec![work.persona_id.clone()],
1022                            summary: trim_for_storage(&thought.thinking, 1_500),
1023                            hot_event_count: context.len(),
1024                            warm_fact_count: estimate_fact_count(&thought.thinking),
1025                            cold_snapshot_count: 1,
1026                            metadata: HashMap::from([
1027                                (
1028                                    "phase".to_string(),
1029                                    serde_json::Value::String(work.phase.as_str().to_string()),
1030                                ),
1031                                (
1032                                    "role".to_string(),
1033                                    serde_json::Value::String(work.role.clone()),
1034                                ),
1035                                (
1036                                    "source".to_string(),
1037                                    serde_json::Value::String(thought.source.to_string()),
1038                                ),
1039                                (
1040                                    "model".to_string(),
1041                                    serde_json::Value::String(
1042                                        thought
1043                                            .model
1044                                            .clone()
1045                                            .unwrap_or_else(|| "fallback".to_string()),
1046                                    ),
1047                                ),
1048                                (
1049                                    "completion_tokens".to_string(),
1050                                    serde_json::Value::Number(serde_json::Number::from(
1051                                        thought.completion_tokens.unwrap_or(0) as u64,
1052                                    )),
1053                                ),
1054                            ]),
1055                        });
1056
1057                        // ── Step 3: Belief staleness decay in Compress phase ──
1058                        {
1059                            let mut belief_store = beliefs.write().await;
1060                            let mut attn_queue = attention_queue.write().await;
1061                            let stale_ids: Vec<String> = belief_store
1062                                .values()
1063                                .filter(|b| {
1064                                    b.status == BeliefStatus::Active && now > b.review_after
1065                                })
1066                                .map(|b| b.id.clone())
1067                                .collect();
1068                            for id in stale_ids {
1069                                if let Some(belief) = belief_store.get_mut(&id) {
1070                                    belief.status = BeliefStatus::Stale;
1071                                    belief.confidence *= 0.98;
1072                                    belief.confidence = belief.confidence.max(0.05);
1073                                    attn_queue.push(AttentionItem {
1074                                        id: Uuid::new_v4().to_string(),
1075                                        topic: format!("Stale belief: {}", belief.claim),
1076                                        topic_tags: vec![belief.belief_key.clone()],
1077                                        priority: 0.4,
1078                                        source_type: AttentionSource::StaleBelief,
1079                                        source_id: belief.id.clone(),
1080                                        assigned_persona: None,
1081                                        created_at: now,
1082                                        resolved_at: None,
1083                                    });
1084                                }
1085                            }
1086                        }
1087
1088                        // ── Step 4d: Update GlobalWorkspace ──
1089                        {
1090                            let belief_store = beliefs.read().await;
1091                            let attn_queue = attention_queue.read().await;
1092
1093                            let mut sorted_beliefs: Vec<&Belief> = belief_store
1094                                .values()
1095                                .filter(|b| b.status == BeliefStatus::Active)
1096                                .collect();
1097                            sorted_beliefs.sort_by(|a, b| {
1098                                let score_a = a.confidence
1099                                    * (1.0
1100                                        / (1.0
1101                                            + now.signed_duration_since(a.updated_at).num_minutes()
1102                                                as f32));
1103                                let score_b = b.confidence
1104                                    * (1.0
1105                                        / (1.0
1106                                            + now.signed_duration_since(b.updated_at).num_minutes()
1107                                                as f32));
1108                                score_b
1109                                    .partial_cmp(&score_a)
1110                                    .unwrap_or(std::cmp::Ordering::Equal)
1111                            });
1112
1113                            let top_beliefs: Vec<String> = sorted_beliefs
1114                                .iter()
1115                                .take(10)
1116                                .map(|b| b.id.clone())
1117                                .collect();
1118                            let top_uncertainties: Vec<String> = belief_store
1119                                .values()
1120                                .filter(|b| {
1121                                    b.status == BeliefStatus::Stale || !b.contested_by.is_empty()
1122                                })
1123                                .take(5)
1124                                .map(|b| {
1125                                    format!("[{}] {}", b.belief_key, trim_for_storage(&b.claim, 80))
1126                                })
1127                                .collect();
1128
1129                            let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1130                                .iter()
1131                                .filter(|a| a.resolved_at.is_none())
1132                                .collect();
1133                            sorted_attn.sort_by(|a, b| {
1134                                b.priority
1135                                    .partial_cmp(&a.priority)
1136                                    .unwrap_or(std::cmp::Ordering::Equal)
1137                            });
1138                            let top_attention: Vec<String> =
1139                                sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1140
1141                            let mut ws = workspace.write().await;
1142                            ws.top_beliefs = top_beliefs;
1143                            ws.top_uncertainties = top_uncertainties;
1144                            ws.top_attention = top_attention;
1145                            ws.updated_at = now;
1146                        }
1147
1148                        new_events.push(ThoughtEvent {
1149                            id: Uuid::new_v4().to_string(),
1150                            event_type: ThoughtEventType::WorkspaceUpdated,
1151                            persona_id: Some(work.persona_id.clone()),
1152                            swarm_id: work.swarm_id.clone(),
1153                            timestamp: Utc::now(),
1154                            payload: json!({ "updated": true }),
1155                        });
1156                    }
1157                }
1158
1159                // ── Step 4c: Governance — resolve proposals ──
1160                {
1161                    let gov = governance.read().await;
1162                    let mut proposal_store = proposals.write().await;
1163                    let persona_map = personas.read().await;
1164                    let active_count = persona_map
1165                        .values()
1166                        .filter(|p| p.status == PersonaStatus::Active)
1167                        .count();
1168
1169                    let proposal_ids: Vec<String> = proposal_store
1170                        .values()
1171                        .filter(|p| p.status == ProposalStatus::Created)
1172                        .map(|p| p.id.clone())
1173                        .collect();
1174
1175                    let mut attn_queue = attention_queue.write().await;
1176                    for pid in proposal_ids {
1177                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1178                            // Check vote deadline
1179                            if let Some(deadline) = proposal.vote_deadline {
1180                                if now > deadline {
1181                                    let quorum_needed =
1182                                        (active_count as f32 * gov.quorum_fraction).ceil() as usize;
1183                                    if proposal.votes.len() < quorum_needed {
1184                                        // Timeout without quorum → attention item
1185                                        attn_queue.push(AttentionItem {
1186                                            id: Uuid::new_v4().to_string(),
1187                                            topic: format!(
1188                                                "Proposal vote timeout: {}",
1189                                                proposal.title
1190                                            ),
1191                                            topic_tags: Vec::new(),
1192                                            priority: 0.6,
1193                                            source_type: AttentionSource::ProposalTimeout,
1194                                            source_id: proposal.id.clone(),
1195                                            assigned_persona: None,
1196                                            created_at: now,
1197                                            resolved_at: None,
1198                                        });
1199                                        continue;
1200                                    }
1201                                }
1202                            }
1203
1204                            // Resolve if enough votes
1205                            let quorum_needed =
1206                                (active_count as f32 * gov.quorum_fraction).ceil() as usize;
1207                            if proposal.votes.len() >= quorum_needed {
1208                                // Check for veto
1209                                let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1210                                    if *vote != ProposalVote::Veto {
1211                                        return false;
1212                                    }
1213                                    if let Some(voter) = persona_map.get(voter_id) {
1214                                        gov.veto_roles.contains(&voter.identity.role)
1215                                    } else {
1216                                        false
1217                                    }
1218                                });
1219
1220                                if vetoed {
1221                                    proposal.status = ProposalStatus::Rejected;
1222                                    continue;
1223                                }
1224
1225                                // Check required approvers
1226                                let required_roles = gov
1227                                    .required_approvers_by_role
1228                                    .get(&proposal.risk)
1229                                    .cloned()
1230                                    .unwrap_or_default();
1231                                let all_required_met = required_roles.iter().all(|role| {
1232                                    proposal.votes.iter().any(|(vid, vote)| {
1233                                        *vote == ProposalVote::Approve
1234                                            && persona_map
1235                                                .get(vid)
1236                                                .map(|p| &p.identity.role == role)
1237                                                .unwrap_or(false)
1238                                    })
1239                                });
1240
1241                                if !all_required_met {
1242                                    continue; // wait for required approvers
1243                                }
1244
1245                                // Count approvals vs rejections
1246                                let approvals = proposal
1247                                    .votes
1248                                    .values()
1249                                    .filter(|v| **v == ProposalVote::Approve)
1250                                    .count();
1251                                let rejections = proposal
1252                                    .votes
1253                                    .values()
1254                                    .filter(|v| **v == ProposalVote::Reject)
1255                                    .count();
1256
1257                                if approvals > rejections {
1258                                    proposal.status = ProposalStatus::Verified;
1259                                } else {
1260                                    proposal.status = ProposalStatus::Rejected;
1261                                }
1262                            }
1263                        }
1264                    }
1265                }
1266
1267                // ── Step 7: Execute verified proposals ──
1268                {
1269                    let mut proposal_store = proposals.write().await;
1270                    let verified_ids: Vec<String> = proposal_store
1271                        .values()
1272                        .filter(|p| p.status == ProposalStatus::Verified)
1273                        .map(|p| p.id.clone())
1274                        .collect();
1275
1276                    for pid in verified_ids {
1277                        if let Some(proposal) = proposal_store.get_mut(&pid) {
1278                            if proposal.risk == ProposalRisk::Critical {
1279                                // Check for human approval
1280                                let approved = {
1281                                    let approvals = pending_approvals.read().await;
1282                                    approvals.get(&pid).copied().unwrap_or(false)
1283                                };
1284                                if !approved {
1285                                    // Register for human approval
1286                                    let mut approvals = pending_approvals.write().await;
1287                                    approvals.entry(pid.clone()).or_insert(false);
1288                                    continue;
1289                                }
1290                            }
1291
1292                            // Create decision receipt
1293                            let receipt = executor::DecisionReceipt {
1294                                id: Uuid::new_v4().to_string(),
1295                                proposal_id: pid.clone(),
1296                                inputs: proposal.evidence_refs.clone(),
1297                                governance_decision: format!(
1298                                    "Approved with {} votes",
1299                                    proposal.votes.len()
1300                                ),
1301                                capability_leases: Vec::new(),
1302                                tool_invocations: Vec::new(),
1303                                outcome: executor::ExecutionOutcome::Success {
1304                                    summary: format!("Proposal '{}' executed", proposal.title),
1305                                },
1306                                created_at: Utc::now(),
1307                            };
1308
1309                            new_events.push(ThoughtEvent {
1310                                id: Uuid::new_v4().to_string(),
1311                                event_type: ThoughtEventType::ActionExecuted,
1312                                persona_id: Some(proposal.persona_id.clone()),
1313                                swarm_id: None,
1314                                timestamp: Utc::now(),
1315                                payload: json!({
1316                                    "receipt_id": receipt.id,
1317                                    "proposal_id": pid,
1318                                    "outcome": "success",
1319                                    "summary": format!("Proposal '{}' executed", proposal.title),
1320                                }),
1321                            });
1322
1323                            receipts.write().await.push(receipt);
1324                            proposal.status = ProposalStatus::Executed;
1325                        }
1326                    }
1327                }
1328
1329                if !new_proposals.is_empty() {
1330                    let mut proposal_store = proposals.write().await;
1331                    for proposal in new_proposals {
1332                        proposal_store.insert(proposal.id.clone(), proposal);
1333                    }
1334                }
1335
1336                for event in new_events {
1337                    push_event_internal(&events, max_events, &event_tx, event).await;
1338                }
1339                for snapshot in new_snapshots {
1340                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1341                }
1342
1343                // ── Step 2: Idle TTL reaping ──
1344                {
1345                    let mut map = personas.write().await;
1346                    let idle_ids: Vec<String> = map
1347                        .values()
1348                        .filter(|p| {
1349                            p.status == PersonaStatus::Active
1350                                && !p.budget_paused
1351                                && now.signed_duration_since(p.last_progress_at).num_seconds()
1352                                    > p.policy.idle_ttl_secs as i64
1353                        })
1354                        .map(|p| p.identity.id.clone())
1355                        .collect();
1356
1357                    for id in &idle_ids {
1358                        if let Some(persona) = map.get_mut(id) {
1359                            persona.status = PersonaStatus::Reaped;
1360                            persona.updated_at = now;
1361                        }
1362                        // Also cascade-reap children
1363                        let children: Vec<String> = map
1364                            .values()
1365                            .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1366                            .map(|p| p.identity.id.clone())
1367                            .collect();
1368                        for child_id in children {
1369                            if let Some(child) = map.get_mut(&child_id) {
1370                                child.status = PersonaStatus::Reaped;
1371                                child.updated_at = now;
1372                            }
1373                        }
1374                    }
1375                    drop(map);
1376
1377                    for id in idle_ids {
1378                        push_event_internal(
1379                            &events,
1380                            max_events,
1381                            &event_tx,
1382                            ThoughtEvent {
1383                                id: Uuid::new_v4().to_string(),
1384                                event_type: ThoughtEventType::IdleReaped,
1385                                persona_id: Some(id),
1386                                swarm_id: None,
1387                                timestamp: now,
1388                                payload: json!({ "reason": "idle_ttl_expired" }),
1389                            },
1390                        )
1391                        .await;
1392                    }
1393                }
1394
1395                // ── Step 5: Persistence on Compress ──
1396                if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1397                    let _ = persistence::save_state(
1398                        &personas,
1399                        &proposals,
1400                        &beliefs,
1401                        &attention_queue,
1402                        &workspace,
1403                        &events,
1404                        &snapshots,
1405                    )
1406                    .await;
1407                }
1408
1409                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1410                next_tick += interval;
1411                let tick_completed = Instant::now();
1412                if tick_completed > next_tick {
1413                    next_tick = tick_completed;
1414                }
1415            }
1416        });
1417
1418        {
1419            let mut lock = self.loop_handle.lock().await;
1420            *lock = Some(handle);
1421        }
1422
1423        Ok(self.status().await)
1424    }
1425
1426    /// Stop the perpetual cognition loop.
1427    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1428        self.running.store(false, Ordering::SeqCst);
1429
1430        if let Some(handle) = self.loop_handle.lock().await.take() {
1431            handle.abort();
1432            let _ = handle.await;
1433        }
1434
1435        if let Some(reason_value) = reason {
1436            let event = ThoughtEvent {
1437                id: Uuid::new_v4().to_string(),
1438                event_type: ThoughtEventType::CheckResult,
1439                persona_id: None,
1440                swarm_id: None,
1441                timestamp: Utc::now(),
1442                payload: json!({ "stopped": true, "reason": reason_value }),
1443            };
1444            self.push_event(event).await;
1445        }
1446
1447        Ok(self.status().await)
1448    }
1449
1450    /// Create a persona record.
1451    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1452        let now = Utc::now();
1453        let mut personas = self.personas.write().await;
1454
1455        let mut parent_swarm_id = None;
1456        let mut computed_depth = 0_u32;
1457        let mut inherited_policy = None;
1458
1459        if let Some(parent_id) = req.parent_id.clone() {
1460            let parent = personas
1461                .get(&parent_id)
1462                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1463
1464            if parent.status == PersonaStatus::Reaped {
1465                return Err(anyhow!("Parent persona {} is reaped", parent_id));
1466            }
1467
1468            parent_swarm_id = parent.identity.swarm_id.clone();
1469            computed_depth = parent.identity.depth.saturating_add(1);
1470            inherited_policy = Some(parent.policy.clone());
1471            let branch_limit = parent.policy.max_branching_factor;
1472
1473            let child_count = personas
1474                .values()
1475                .filter(|p| {
1476                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1477                        && p.status != PersonaStatus::Reaped
1478                })
1479                .count();
1480
1481            if child_count as u32 >= branch_limit {
1482                return Err(anyhow!(
1483                    "Parent {} reached branching limit {}",
1484                    parent_id,
1485                    branch_limit
1486                ));
1487            }
1488        }
1489
1490        let policy = req
1491            .policy
1492            .clone()
1493            .or(inherited_policy.clone())
1494            .unwrap_or_else(|| self.default_policy.clone());
1495
1496        let effective_depth_limit = inherited_policy
1497            .as_ref()
1498            .map(|p| p.max_spawn_depth)
1499            .unwrap_or(policy.max_spawn_depth);
1500
1501        if computed_depth > effective_depth_limit {
1502            return Err(anyhow!(
1503                "Spawn depth {} exceeds limit {}",
1504                computed_depth,
1505                effective_depth_limit
1506            ));
1507        }
1508
1509        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1510        if personas.contains_key(&persona_id) {
1511            return Err(anyhow!("Persona id already exists: {}", persona_id));
1512        }
1513
1514        let identity = PersonaIdentity {
1515            id: persona_id.clone(),
1516            name: req.name,
1517            role: req.role,
1518            charter: req.charter,
1519            swarm_id: req.swarm_id.or(parent_swarm_id),
1520            parent_id: req.parent_id,
1521            depth: computed_depth,
1522            created_at: now,
1523            tags: req.tags,
1524        };
1525
1526        let persona = PersonaRuntimeState {
1527            identity,
1528            policy,
1529            status: PersonaStatus::Active,
1530            thought_count: 0,
1531            last_tick_at: None,
1532            updated_at: now,
1533            tokens_this_window: 0,
1534            compute_ms_this_window: 0,
1535            window_started_at: now,
1536            last_progress_at: now,
1537            budget_paused: false,
1538        };
1539
1540        personas.insert(persona_id, persona.clone());
1541        drop(personas);
1542
1543        self.push_event(ThoughtEvent {
1544            id: Uuid::new_v4().to_string(),
1545            event_type: ThoughtEventType::PersonaSpawned,
1546            persona_id: Some(persona.identity.id.clone()),
1547            swarm_id: persona.identity.swarm_id.clone(),
1548            timestamp: now,
1549            payload: json!({
1550                "name": persona.identity.name,
1551                "role": persona.identity.role,
1552                "depth": persona.identity.depth,
1553            }),
1554        })
1555        .await;
1556
1557        Ok(persona)
1558    }
1559
1560    /// Spawn a child persona under an existing parent.
1561    pub async fn spawn_child(
1562        &self,
1563        parent_id: &str,
1564        req: SpawnPersonaRequest,
1565    ) -> Result<PersonaRuntimeState> {
1566        let request = CreatePersonaRequest {
1567            persona_id: req.persona_id,
1568            name: req.name,
1569            role: req.role,
1570            charter: req.charter,
1571            swarm_id: req.swarm_id,
1572            parent_id: Some(parent_id.to_string()),
1573            policy: req.policy,
1574            tags: Vec::new(),
1575        };
1576        self.create_persona(request).await
1577    }
1578
1579    /// Reap one persona or the full descendant tree.
1580    pub async fn reap_persona(
1581        &self,
1582        persona_id: &str,
1583        req: ReapPersonaRequest,
1584    ) -> Result<ReapPersonaResponse> {
1585        let cascade = req.cascade.unwrap_or(false);
1586        let now = Utc::now();
1587
1588        let mut personas = self.personas.write().await;
1589        if !personas.contains_key(persona_id) {
1590            return Err(anyhow!("Persona not found: {}", persona_id));
1591        }
1592
1593        let mut reaped_ids = vec![persona_id.to_string()];
1594        if cascade {
1595            let mut idx = 0usize;
1596            while idx < reaped_ids.len() {
1597                let current = reaped_ids[idx].clone();
1598                let children: Vec<String> = personas
1599                    .values()
1600                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1601                    .map(|p| p.identity.id.clone())
1602                    .collect();
1603                for child in children {
1604                    if !reaped_ids.iter().any(|existing| existing == &child) {
1605                        reaped_ids.push(child);
1606                    }
1607                }
1608                idx += 1;
1609            }
1610        }
1611
1612        for id in &reaped_ids {
1613            if let Some(persona) = personas.get_mut(id) {
1614                persona.status = PersonaStatus::Reaped;
1615                persona.updated_at = now;
1616            }
1617        }
1618        drop(personas);
1619
1620        for id in &reaped_ids {
1621            self.push_event(ThoughtEvent {
1622                id: Uuid::new_v4().to_string(),
1623                event_type: ThoughtEventType::PersonaReaped,
1624                persona_id: Some(id.clone()),
1625                swarm_id: None,
1626                timestamp: now,
1627                payload: json!({
1628                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1629                    "cascade": cascade,
1630                }),
1631            })
1632            .await;
1633        }
1634
1635        Ok(ReapPersonaResponse {
1636            count: reaped_ids.len(),
1637            reaped_ids,
1638        })
1639    }
1640
1641    /// Get latest memory snapshot, if any.
1642    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1643        self.snapshots.read().await.back().cloned()
1644    }
1645
1646    /// Build lineage graph from current persona state.
1647    pub async fn lineage_graph(&self) -> LineageGraph {
1648        let personas = self.personas.read().await;
1649        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1650        let mut roots = Vec::new();
1651        let mut total_edges = 0usize;
1652
1653        for persona in personas.values() {
1654            if let Some(parent_id) = persona.identity.parent_id.clone() {
1655                children_by_parent
1656                    .entry(parent_id)
1657                    .or_default()
1658                    .push(persona.identity.id.clone());
1659                total_edges = total_edges.saturating_add(1);
1660            } else {
1661                roots.push(persona.identity.id.clone());
1662            }
1663        }
1664
1665        let mut nodes: Vec<LineageNode> = personas
1666            .values()
1667            .map(|persona| {
1668                let mut children = children_by_parent
1669                    .get(&persona.identity.id)
1670                    .cloned()
1671                    .unwrap_or_default();
1672                children.sort();
1673
1674                LineageNode {
1675                    persona_id: persona.identity.id.clone(),
1676                    parent_id: persona.identity.parent_id.clone(),
1677                    children,
1678                    depth: persona.identity.depth,
1679                    status: persona.status,
1680                }
1681            })
1682            .collect();
1683
1684        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1685        roots.sort();
1686
1687        LineageGraph {
1688            nodes,
1689            roots,
1690            total_edges,
1691        }
1692    }
1693
1694    /// Return a summary status.
1695    pub async fn status(&self) -> CognitionStatus {
1696        let personas = self.personas.read().await;
1697        let events = self.events.read().await;
1698        let snapshots = self.snapshots.read().await;
1699        let started_at = *self.started_at.read().await;
1700        let last_tick_at = *self.last_tick_at.read().await;
1701        let loop_interval_ms = *self.loop_interval_ms.read().await;
1702
1703        let active_persona_count = personas
1704            .values()
1705            .filter(|p| p.status == PersonaStatus::Active)
1706            .count();
1707
1708        CognitionStatus {
1709            enabled: self.enabled,
1710            running: self.running.load(Ordering::SeqCst),
1711            loop_interval_ms,
1712            started_at,
1713            last_tick_at,
1714            persona_count: personas.len(),
1715            active_persona_count,
1716            events_buffered: events.len(),
1717            snapshots_buffered: snapshots.len(),
1718        }
1719    }
1720
1721    async fn push_event(&self, event: ThoughtEvent) {
1722        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1723    }
1724
1725    /// Set the tool registry for capability-based tool execution.
1726    pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1727        self.tools = Some(registry);
1728    }
1729
1730    /// Get current beliefs.
1731    pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1732        self.beliefs.read().await.clone()
1733    }
1734
1735    /// Get a single belief by ID.
1736    pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1737        self.beliefs.read().await.get(id).cloned()
1738    }
1739
1740    /// Get the current attention queue.
1741    pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1742        self.attention_queue.read().await.clone()
1743    }
1744
1745    /// Get all proposals.
1746    pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1747        self.proposals.read().await.clone()
1748    }
1749
1750    /// Get the current global workspace.
1751    pub async fn get_workspace(&self) -> GlobalWorkspace {
1752        self.workspace.read().await.clone()
1753    }
1754
1755    /// Get decision receipts.
1756    pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1757        self.receipts.read().await.clone()
1758    }
1759
1760    /// Approve a Critical-risk proposal for execution.
1761    pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1762        let proposals = self.proposals.read().await;
1763        let proposal = proposals
1764            .get(proposal_id)
1765            .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1766
1767        if proposal.risk != ProposalRisk::Critical {
1768            return Err(anyhow!("Only Critical proposals require human approval"));
1769        }
1770        if proposal.status != ProposalStatus::Verified {
1771            return Err(anyhow!("Proposal is not in Verified status"));
1772        }
1773        drop(proposals);
1774
1775        let mut approvals = self.pending_approvals.write().await;
1776        approvals.insert(proposal_id.to_string(), true);
1777        Ok(())
1778    }
1779
1780    /// Get governance settings.
1781    pub async fn get_governance(&self) -> SwarmGovernance {
1782        self.governance.read().await.clone()
1783    }
1784
1785    /// Get persona state for a specific persona.
1786    pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1787        self.personas.read().await.get(id).cloned()
1788    }
1789}
1790
1791async fn push_event_internal(
1792    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1793    max_events: usize,
1794    event_tx: &broadcast::Sender<ThoughtEvent>,
1795    event: ThoughtEvent,
1796) {
1797    {
1798        let mut lock = events.write().await;
1799        lock.push_back(event.clone());
1800        while lock.len() > max_events {
1801            lock.pop_front();
1802        }
1803    }
1804    let _ = event_tx.send(event);
1805}
1806
1807async fn push_snapshot_internal(
1808    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1809    max_snapshots: usize,
1810    snapshot: MemorySnapshot,
1811) {
1812    let mut lock = snapshots.write().await;
1813    lock.push_back(snapshot);
1814    while lock.len() > max_snapshots {
1815        lock.pop_front();
1816    }
1817}
1818
1819async fn recent_persona_context(
1820    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1821    persona_id: &str,
1822    limit: usize,
1823) -> Vec<ThoughtEvent> {
1824    let lock = events.read().await;
1825    let mut selected: Vec<ThoughtEvent> = lock
1826        .iter()
1827        .rev()
1828        .filter(|event| {
1829            event.persona_id.as_deref() == Some(persona_id)
1830                || (event.persona_id.is_none()
1831                    && matches!(
1832                        event.event_type,
1833                        ThoughtEventType::CheckResult
1834                            | ThoughtEventType::ProposalCreated
1835                            | ThoughtEventType::SnapshotCompressed
1836                    ))
1837        })
1838        .take(limit)
1839        .cloned()
1840        .collect();
1841    selected.reverse();
1842    selected
1843}
1844
1845async fn generate_phase_thought(
1846    thinker: Option<&ThinkerClient>,
1847    work: &ThoughtWorkItem,
1848    context: &[ThoughtEvent],
1849) -> ThoughtResult {
1850    let started_at = Instant::now();
1851    if let Some(client) = thinker {
1852        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1853        match client.think(&system_prompt, &user_prompt).await {
1854            Ok(output) => {
1855                let thinking = normalize_thought_output(work, context, &output.text);
1856                if !thinking.is_empty() {
1857                    return ThoughtResult {
1858                        source: "model",
1859                        model: Some(output.model),
1860                        finish_reason: output.finish_reason,
1861                        thinking,
1862                        prompt_tokens: output.prompt_tokens,
1863                        completion_tokens: output.completion_tokens,
1864                        total_tokens: output.total_tokens,
1865                        latency_ms: started_at.elapsed().as_millis(),
1866                        error: None,
1867                    };
1868                }
1869            }
1870            Err(error) => {
1871                return ThoughtResult {
1872                    source: "fallback",
1873                    model: None,
1874                    finish_reason: None,
1875                    thinking: fallback_phase_text(work, context),
1876                    prompt_tokens: None,
1877                    completion_tokens: None,
1878                    total_tokens: None,
1879                    latency_ms: started_at.elapsed().as_millis(),
1880                    error: Some(error.to_string()),
1881                };
1882            }
1883        }
1884    }
1885
1886    ThoughtResult {
1887        source: "fallback",
1888        model: None,
1889        finish_reason: None,
1890        thinking: fallback_phase_text(work, context),
1891        prompt_tokens: None,
1892        completion_tokens: None,
1893        total_tokens: None,
1894        latency_ms: started_at.elapsed().as_millis(),
1895        error: None,
1896    }
1897}
1898
1899fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1900    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1901Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1902Write as an operational process update, not meta narration. \
1903Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
1904Output concrete findings, checks, risks, and next actions. \
1905Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
1906        .to_string();
1907
1908    let context_lines = if context.is_empty() {
1909        "none".to_string()
1910    } else {
1911        context
1912            .iter()
1913            .map(format_context_event)
1914            .collect::<Vec<_>>()
1915            .join("\n")
1916    };
1917
1918    let phase_instruction = match work.phase {
1919        ThoughtPhase::Observe => {
1920            "Process format (exact line labels): \
1921Phase: Observe | Goal: detect current customer/business risk | \
1922Signals: 1-3 concrete signals separated by '; ' | \
1923Uncertainty: one unknown that blocks confidence | \
1924Next_Action: one immediate operational action."
1925        }
1926        ThoughtPhase::Reflect => {
1927            "Process format (exact line labels): \
1928Phase: Reflect | Hypothesis: single testable hypothesis | \
1929Rationale: why this is likely | \
1930Business_Risk: customer/revenue/SLA impact | \
1931Validation_Next_Action: one action to confirm or falsify."
1932        }
1933        ThoughtPhase::Test => {
1934            "Process format (exact line labels): \
1935Phase: Test | Check: single concrete check | \
1936Procedure: short executable procedure | \
1937Expected_Result: pass/fail expectation | \
1938Evidence_Quality: low|medium|high with reason | \
1939Escalation_Trigger: when to escalate immediately."
1940        }
1941        ThoughtPhase::Compress => {
1942            "Process format (exact line labels): \
1943Phase: Compress | State_Summary: current state in one line | \
1944Retained_Facts: 3 short facts separated by '; ' | \
1945Open_Risks: up to 2 unresolved risks separated by '; ' | \
1946Next_Process_Step: next operational step."
1947        }
1948    };
1949
1950    let user_prompt = format!(
1951        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
1952        phase = work.phase.as_str(),
1953        persona_id = work.persona_id,
1954        persona_name = work.persona_name,
1955        role = work.role,
1956        charter = work.charter,
1957        thought_count = work.thought_count,
1958        context = context_lines,
1959        instruction = phase_instruction
1960    );
1961
1962    (system_prompt, user_prompt)
1963}
1964
1965fn format_context_event(event: &ThoughtEvent) -> String {
1966    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
1967    format!(
1968        "{} {} {}",
1969        event.event_type.as_str(),
1970        event.timestamp.to_rfc3339(),
1971        trim_for_storage(&payload, 220)
1972    )
1973}
1974
1975fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
1976    let charter = trim_for_storage(&work.charter, 180);
1977    let context_summary = fallback_context_summary(context);
1978    let thought = match work.phase {
1979        ThoughtPhase::Observe => format!(
1980            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
1981            work.role, charter, context_summary
1982        ),
1983        ThoughtPhase::Reflect => format!(
1984            "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
1985        ),
1986        ThoughtPhase::Test => format!(
1987            "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
1988        ),
1989        ThoughtPhase::Compress => format!(
1990            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
1991            work.role, charter, context_summary
1992        ),
1993    };
1994    trim_for_storage(&thought, 1_200)
1995}
1996
1997fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
1998    let trimmed = trim_for_storage(raw, 2_000);
1999    if trimmed.trim().is_empty() {
2000        return fallback_phase_text(work, context);
2001    }
2002
2003    // Prefer process-labeled content if the model emitted a preamble first.
2004    if let Some(idx) = find_process_label_start(&trimmed) {
2005        let candidate = trimmed[idx..].trim();
2006        if let Some(first_line) = candidate
2007            .lines()
2008            .map(str::trim)
2009            .find(|line| !line.is_empty())
2010        {
2011            let normalized_line = first_line.trim_matches('"').trim_matches('\'').trim();
2012            if normalized_line.starts_with("Phase:")
2013                && !normalized_line.contains('<')
2014                && !has_template_placeholder_values(normalized_line)
2015            {
2016                return normalized_line.to_string();
2017            }
2018        }
2019        if !candidate.is_empty()
2020            && !candidate.contains('<')
2021            && !candidate.contains('\n')
2022            && !has_template_placeholder_values(candidate)
2023        {
2024            return candidate.to_string();
2025        }
2026    }
2027
2028    let lower = trimmed.to_ascii_lowercase();
2029    let looks_meta = lower.starts_with("we need")
2030        || lower.starts_with("i need")
2031        || lower.contains("we need to")
2032        || lower.contains("i need to")
2033        || lower.contains("must output")
2034        || lower.contains("let's ")
2035        || lower.contains("we have to");
2036
2037    if looks_meta || has_template_placeholder_values(&trimmed) {
2038        return fallback_phase_text(work, context);
2039    }
2040
2041    trimmed
2042}
2043
/// Report whether `text` still contains unfilled template placeholders.
///
/// Matching is ASCII case-insensitive. A placeholder is any labeled field
/// whose value is `...`, the marker `<...`, or the standalone words `TBD` /
/// `TODO`. The two words are matched as whole tokens so that ordinary words
/// containing those letters (for example "autodoc" or "mastodon") are not
/// mistaken for placeholders.
fn has_template_placeholder_values(text: &str) -> bool {
    // Every labeled field from the phase templates, left as a bare ellipsis.
    const UNFILLED_FIELDS: &[&str] = &[
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];

    let lower = text.to_ascii_lowercase();
    UNFILLED_FIELDS.iter().any(|needle| lower.contains(needle))
        || lower.contains("<...")
        // Tokenize on anything that is not a letter or digit, so "TODO:",
        // "(tbd)" and "todo_item" still match while "autodoc" does not.
        || lower
            .split(|ch: char| !ch.is_alphanumeric())
            .any(|token| token == "tbd" || token == "todo")
}
2071
/// Return the byte offset of the first `Phase:` label in `text`, if any.
///
/// Every phase-specific label (`Phase: Observe`, `Phase: Reflect`,
/// `Phase: Test`, `Phase: Compress`) begins with `Phase:`, so the earliest
/// occurrence of that prefix is also the earliest occurrence of any of them;
/// one search is sufficient. Matching is case-sensitive.
fn find_process_label_start(text: &str) -> Option<usize> {
    text.find("Phase:")
}
2084
2085fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2086    if context.is_empty() {
2087        return "No prior events recorded yet.".to_string();
2088    }
2089
2090    let mut latest_error: Option<String> = None;
2091    let mut latest_proposal: Option<String> = None;
2092    let mut latest_check: Option<String> = None;
2093
2094    for event in context.iter().rev() {
2095        if latest_error.is_none()
2096            && let Some(error) = event
2097                .payload
2098                .get("error")
2099                .and_then(serde_json::Value::as_str)
2100            && !error.trim().is_empty()
2101        {
2102            latest_error = Some(trim_for_storage(error, 140));
2103        }
2104
2105        if latest_proposal.is_none()
2106            && event.event_type == ThoughtEventType::ProposalCreated
2107            && let Some(title) = event
2108                .payload
2109                .get("title")
2110                .and_then(serde_json::Value::as_str)
2111            && !title.trim().is_empty()
2112            && !has_template_placeholder_values(title)
2113        {
2114            latest_proposal = Some(trim_for_storage(title, 120));
2115        }
2116
2117        if latest_check.is_none()
2118            && event.event_type == ThoughtEventType::CheckResult
2119            && let Some(result) = event
2120                .payload
2121                .get("result_excerpt")
2122                .and_then(serde_json::Value::as_str)
2123            && !result.trim().is_empty()
2124            && !has_template_placeholder_values(result)
2125        {
2126            latest_check = Some(trim_for_storage(result, 140));
2127        }
2128
2129        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2130            break;
2131        }
2132    }
2133
2134    let mut lines = vec![format!(
2135        "{} recent cognition events are available.",
2136        context.len()
2137    )];
2138    if let Some(error) = latest_error {
2139        lines.push(format!("Latest error signal: {}.", error));
2140    }
2141    if let Some(proposal) = latest_proposal {
2142        lines.push(format!("Recent proposal: {}.", proposal));
2143    }
2144    if let Some(check) = latest_check {
2145        lines.push(format!("Recent check: {}.", check));
2146    }
2147    lines.join(" ")
2148}
2149
/// Shorten `input` to at most `max_chars` characters for storage.
///
/// Input that already fits is returned with surrounding whitespace removed.
/// Longer input is cut after `max_chars` characters (not bytes), suffixed with
/// `...`, and then trimmed of surrounding whitespace.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    // The byte offset of character number `max_chars` exists only when the
    // input is longer than the limit.
    match input.char_indices().nth(max_chars) {
        None => input.trim().to_string(),
        Some((cut, _)) => {
            let mut clipped = String::with_capacity(cut + 3);
            clipped.push_str(&input[..cut]);
            clipped.push_str("...");
            clipped.trim().to_string()
        }
    }
}
2161
/// Estimate the number of facts in `text` by counting the sentence
/// terminators `.`, `!` and `?`, clamped to the range 1..=12.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
2167
2168fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2169    let first_line = thought
2170        .lines()
2171        .find(|line| !line.trim().is_empty())
2172        .unwrap_or("proposal");
2173    let compact = first_line
2174        .replace(['\t', '\r', '\n'], " ")
2175        .split_whitespace()
2176        .collect::<Vec<_>>()
2177        .join(" ");
2178    let trimmed = trim_for_storage(&compact, 72);
2179    if trimmed.is_empty() {
2180        format!("proposal-{}", thought_count)
2181    } else {
2182        trimmed
2183    }
2184}
2185
2186fn default_seed_persona() -> CreatePersonaRequest {
2187    CreatePersonaRequest {
2188        persona_id: Some("root-thinker".to_string()),
2189        name: "root-thinker".to_string(),
2190        role: "orchestrator".to_string(),
2191        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2192            .to_string(),
2193        swarm_id: Some("swarm-core".to_string()),
2194        parent_id: None,
2195        policy: None,
2196        tags: vec!["orchestration".to_string()],
2197    }
2198}
2199
/// Normalizes a thinker base URL into a chat-completions endpoint URL.
///
/// Surrounding whitespace and trailing slashes are stripped first. Then:
/// - an empty result yields `http://127.0.0.1:11434/v1/chat/completions`;
/// - a URL already ending in `/chat/completions` is returned unchanged;
/// - anything else gets `/chat/completions` appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    let base = base_url.trim().trim_end_matches('/');
    match base {
        "" => "http://127.0.0.1:11434/v1/chat/completions".to_string(),
        url if url.ends_with("/chat/completions") => url.to_string(),
        url => format!("{}/chat/completions", url),
    }
}
2210
/// Reads the environment variable `name` as a boolean flag.
///
/// Accepted values, compared case-insensitively and with surrounding
/// whitespace ignored, are `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`.
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        // Trim first so a padded value such as " true " is not silently ignored.
        .and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            _ => None,
        })
        .unwrap_or(default)
}
2221
/// Reads the environment variable `name` as an `f32`.
///
/// Surrounding whitespace in the value is ignored. Returns `default` when the
/// variable is unset, is not valid Unicode, or does not parse as `f32`.
fn env_f32(name: &str, default: f32) -> f32 {
    std::env::var(name)
        .ok()
        // `str::parse` rejects padded input, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<f32>().ok())
        .unwrap_or(default)
}
2228
/// Reads the environment variable `name` as a `u64`.
///
/// Surrounding whitespace in the value is ignored. Returns `default` when the
/// variable is unset, is not valid Unicode, or does not parse as `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        // `str::parse` rejects padded input, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(default)
}
2235
/// Reads the environment variable `name` as a `u32`.
///
/// Surrounding whitespace in the value is ignored. Returns `default` when the
/// variable is unset, is not valid Unicode, or does not parse as `u32`.
fn env_u32(name: &str, default: u32) -> u32 {
    std::env::var(name)
        .ok()
        // `str::parse` rejects padded input, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .unwrap_or(default)
}
2242
/// Reads the environment variable `name` as a `usize`.
///
/// Surrounding whitespace in the value is ignored. Returns `default` when the
/// variable is unset, is not valid Unicode, or does not parse as `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        // `str::parse` rejects padded input, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .unwrap_or(default)
}
2249
2250#[cfg(test)]
2251mod tests {
2252    use super::*;
2253
2254    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2255        ThoughtWorkItem {
2256            persona_id: "p-1".to_string(),
2257            persona_name: "Spotlessbinco Business Thinker".to_string(),
2258            role: "principal reliability engineer".to_string(),
2259            charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2260                .to_string(),
2261            swarm_id: Some("spotlessbinco".to_string()),
2262            thought_count: 4,
2263            phase,
2264        }
2265    }
2266
2267    fn test_runtime() -> CognitionRuntime {
2268        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2269            enabled: true,
2270            loop_interval_ms: 25,
2271            max_events: 256,
2272            max_snapshots: 32,
2273            default_policy: PersonaPolicy {
2274                max_spawn_depth: 2,
2275                max_branching_factor: 2,
2276                token_budget_per_minute: 1_000,
2277                compute_ms_per_minute: 1_000,
2278                idle_ttl_secs: 300,
2279                share_memory: false,
2280                allowed_tools: Vec::new(),
2281            },
2282        })
2283    }
2284
2285    #[test]
2286    fn normalize_rejects_placeholder_process_line() {
2287        let work = sample_work_item(ThoughtPhase::Compress);
2288        let output = normalize_thought_output(
2289            &work,
2290            &[],
2291            "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2292        );
2293        assert!(
2294            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2295        );
2296        assert!(!output.contains("State_Summary: ..."));
2297    }
2298
2299    #[test]
2300    fn normalize_accepts_concrete_process_line() {
2301        let work = sample_work_item(ThoughtPhase::Test);
2302        let output = normalize_thought_output(
2303            &work,
2304            &[],
2305            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2306        );
2307        assert_eq!(
2308            output,
2309            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2310        );
2311    }
2312
2313    #[tokio::test]
2314    async fn create_spawn_and_lineage_work() {
2315        let runtime = test_runtime();
2316
2317        let root = runtime
2318            .create_persona(CreatePersonaRequest {
2319                persona_id: Some("root".to_string()),
2320                name: "root".to_string(),
2321                role: "orchestrator".to_string(),
2322                charter: "coordinate".to_string(),
2323                swarm_id: Some("swarm-a".to_string()),
2324                parent_id: None,
2325                policy: None,
2326                tags: Vec::new(),
2327            })
2328            .await
2329            .expect("root should be created");
2330
2331        assert_eq!(root.identity.depth, 0);
2332
2333        let child = runtime
2334            .spawn_child(
2335                "root",
2336                SpawnPersonaRequest {
2337                    persona_id: Some("child-1".to_string()),
2338                    name: "child-1".to_string(),
2339                    role: "analyst".to_string(),
2340                    charter: "analyze".to_string(),
2341                    swarm_id: None,
2342                    policy: None,
2343                },
2344            )
2345            .await
2346            .expect("child should spawn");
2347
2348        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2349        assert_eq!(child.identity.depth, 1);
2350
2351        let lineage = runtime.lineage_graph().await;
2352        assert_eq!(lineage.total_edges, 1);
2353        assert_eq!(lineage.roots, vec!["root".to_string()]);
2354    }
2355
2356    #[tokio::test]
2357    async fn branching_and_depth_limits_are_enforced() {
2358        let runtime = test_runtime();
2359
2360        runtime
2361            .create_persona(CreatePersonaRequest {
2362                persona_id: Some("root".to_string()),
2363                name: "root".to_string(),
2364                role: "orchestrator".to_string(),
2365                charter: "coordinate".to_string(),
2366                swarm_id: Some("swarm-a".to_string()),
2367                parent_id: None,
2368                policy: None,
2369                tags: Vec::new(),
2370            })
2371            .await
2372            .expect("root should be created");
2373
2374        runtime
2375            .spawn_child(
2376                "root",
2377                SpawnPersonaRequest {
2378                    persona_id: Some("c1".to_string()),
2379                    name: "c1".to_string(),
2380                    role: "worker".to_string(),
2381                    charter: "run".to_string(),
2382                    swarm_id: None,
2383                    policy: None,
2384                },
2385            )
2386            .await
2387            .expect("first child should spawn");
2388
2389        runtime
2390            .spawn_child(
2391                "root",
2392                SpawnPersonaRequest {
2393                    persona_id: Some("c2".to_string()),
2394                    name: "c2".to_string(),
2395                    role: "worker".to_string(),
2396                    charter: "run".to_string(),
2397                    swarm_id: None,
2398                    policy: None,
2399                },
2400            )
2401            .await
2402            .expect("second child should spawn");
2403
2404        let third_child = runtime
2405            .spawn_child(
2406                "root",
2407                SpawnPersonaRequest {
2408                    persona_id: Some("c3".to_string()),
2409                    name: "c3".to_string(),
2410                    role: "worker".to_string(),
2411                    charter: "run".to_string(),
2412                    swarm_id: None,
2413                    policy: None,
2414                },
2415            )
2416            .await;
2417        assert!(third_child.is_err());
2418
2419        runtime
2420            .spawn_child(
2421                "c1",
2422                SpawnPersonaRequest {
2423                    persona_id: Some("c1-1".to_string()),
2424                    name: "c1-1".to_string(),
2425                    role: "worker".to_string(),
2426                    charter: "run".to_string(),
2427                    swarm_id: None,
2428                    policy: None,
2429                },
2430            )
2431            .await
2432            .expect("depth 2 should be allowed");
2433
2434        let depth_violation = runtime
2435            .spawn_child(
2436                "c1-1",
2437                SpawnPersonaRequest {
2438                    persona_id: Some("c1-1-1".to_string()),
2439                    name: "c1-1-1".to_string(),
2440                    role: "worker".to_string(),
2441                    charter: "run".to_string(),
2442                    swarm_id: None,
2443                    policy: None,
2444                },
2445            )
2446            .await;
2447        assert!(depth_violation.is_err());
2448    }
2449
2450    #[tokio::test]
2451    async fn start_stop_updates_runtime_status() {
2452        let runtime = test_runtime();
2453
2454        runtime
2455            .start(Some(StartCognitionRequest {
2456                loop_interval_ms: Some(10),
2457                seed_persona: Some(CreatePersonaRequest {
2458                    persona_id: Some("seed".to_string()),
2459                    name: "seed".to_string(),
2460                    role: "watcher".to_string(),
2461                    charter: "observe".to_string(),
2462                    swarm_id: Some("swarm-seed".to_string()),
2463                    parent_id: None,
2464                    policy: None,
2465                    tags: Vec::new(),
2466                }),
2467            }))
2468            .await
2469            .expect("runtime should start");
2470
2471        tokio::time::sleep(Duration::from_millis(60)).await;
2472        let running_status = runtime.status().await;
2473        assert!(running_status.running);
2474        assert!(running_status.events_buffered > 0);
2475
2476        runtime
2477            .stop(Some("test".to_string()))
2478            .await
2479            .expect("runtime should stop");
2480        let stopped_status = runtime.status().await;
2481        assert!(!stopped_status.running);
2482    }
2483
2484    #[tokio::test]
2485    async fn zero_budget_persona_is_skipped() {
2486        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2487            enabled: true,
2488            loop_interval_ms: 10,
2489            max_events: 256,
2490            max_snapshots: 32,
2491            default_policy: PersonaPolicy {
2492                max_spawn_depth: 2,
2493                max_branching_factor: 2,
2494                token_budget_per_minute: 0,
2495                compute_ms_per_minute: 0,
2496                idle_ttl_secs: 3_600,
2497                share_memory: false,
2498                allowed_tools: Vec::new(),
2499            },
2500        });
2501
2502        let persona = runtime
2503            .create_persona(CreatePersonaRequest {
2504                persona_id: Some("budget-test".to_string()),
2505                name: "budget-test".to_string(),
2506                role: "tester".to_string(),
2507                charter: "test budgets".to_string(),
2508                swarm_id: None,
2509                parent_id: None,
2510                policy: None,
2511                tags: Vec::new(),
2512            })
2513            .await
2514            .expect("should create persona");
2515
2516        assert_eq!(persona.tokens_this_window, 0);
2517        assert_eq!(persona.compute_ms_this_window, 0);
2518
2519        // Start and run a few ticks
2520        runtime.start(None).await.expect("should start");
2521        tokio::time::sleep(Duration::from_millis(50)).await;
2522        runtime.stop(None).await.expect("should stop");
2523
2524        // Persona should still have 0 thought count (budget blocked)
2525        let p = runtime.get_persona("budget-test").await.unwrap();
2526        assert_eq!(p.thought_count, 0);
2527        assert!(p.budget_paused);
2528    }
2529
2530    #[tokio::test]
2531    async fn budget_counters_reset_after_window() {
2532        let now = Utc::now();
2533        let mut persona = PersonaRuntimeState {
2534            identity: PersonaIdentity {
2535                id: "p1".to_string(),
2536                name: "test".to_string(),
2537                role: "tester".to_string(),
2538                charter: "test".to_string(),
2539                swarm_id: None,
2540                parent_id: None,
2541                depth: 0,
2542                created_at: now,
2543                tags: Vec::new(),
2544            },
2545            policy: PersonaPolicy::default(),
2546            status: PersonaStatus::Active,
2547            thought_count: 0,
2548            last_tick_at: None,
2549            updated_at: now,
2550            tokens_this_window: 5000,
2551            compute_ms_this_window: 3000,
2552            window_started_at: now - ChronoDuration::seconds(61),
2553            last_progress_at: now,
2554            budget_paused: false,
2555        };
2556
2557        // Simulate window reset check
2558        let window_elapsed = now
2559            .signed_duration_since(persona.window_started_at)
2560            .num_seconds();
2561        assert!(window_elapsed >= 60);
2562
2563        // Reset
2564        persona.tokens_this_window = 0;
2565        persona.compute_ms_this_window = 0;
2566        persona.window_started_at = now;
2567
2568        assert_eq!(persona.tokens_this_window, 0);
2569        assert_eq!(persona.compute_ms_this_window, 0);
2570    }
2571
2572    #[tokio::test]
2573    async fn idle_persona_is_reaped() {
2574        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2575            enabled: true,
2576            loop_interval_ms: 10,
2577            max_events: 256,
2578            max_snapshots: 32,
2579            default_policy: PersonaPolicy {
2580                max_spawn_depth: 2,
2581                max_branching_factor: 2,
2582                token_budget_per_minute: 20_000,
2583                compute_ms_per_minute: 10_000,
2584                idle_ttl_secs: 0, // 0 TTL = immediately idle
2585                share_memory: false,
2586                allowed_tools: Vec::new(),
2587            },
2588        });
2589
2590        runtime
2591            .create_persona(CreatePersonaRequest {
2592                persona_id: Some("idle-test".to_string()),
2593                name: "idle-test".to_string(),
2594                role: "idler".to_string(),
2595                charter: "idle away".to_string(),
2596                swarm_id: None,
2597                parent_id: None,
2598                policy: None,
2599                tags: Vec::new(),
2600            })
2601            .await
2602            .expect("should create persona");
2603
2604        // Manually set last_progress_at to the past
2605        {
2606            let mut personas = runtime.personas.write().await;
2607            if let Some(p) = personas.get_mut("idle-test") {
2608                p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2609            }
2610        }
2611
2612        runtime.start(None).await.expect("should start");
2613        tokio::time::sleep(Duration::from_millis(100)).await;
2614        runtime.stop(None).await.expect("should stop");
2615
2616        let p = runtime.get_persona("idle-test").await.unwrap();
2617        assert_eq!(p.status, PersonaStatus::Reaped);
2618    }
2619
2620    #[tokio::test]
2621    async fn budget_paused_persona_not_reaped_for_idle() {
2622        let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2623            enabled: true,
2624            loop_interval_ms: 10,
2625            max_events: 256,
2626            max_snapshots: 32,
2627            default_policy: PersonaPolicy {
2628                max_spawn_depth: 2,
2629                max_branching_factor: 2,
2630                token_budget_per_minute: 0, // zero budget = always paused
2631                compute_ms_per_minute: 0,
2632                idle_ttl_secs: 0, // 0 TTL
2633                share_memory: false,
2634                allowed_tools: Vec::new(),
2635            },
2636        });
2637
2638        runtime
2639            .create_persona(CreatePersonaRequest {
2640                persona_id: Some("paused-test".to_string()),
2641                name: "paused-test".to_string(),
2642                role: "pauser".to_string(),
2643                charter: "pause".to_string(),
2644                swarm_id: None,
2645                parent_id: None,
2646                policy: None,
2647                tags: Vec::new(),
2648            })
2649            .await
2650            .expect("should create persona");
2651
2652        // Set last_progress_at to the past to trigger idle check
2653        {
2654            let mut personas = runtime.personas.write().await;
2655            if let Some(p) = personas.get_mut("paused-test") {
2656                p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2657            }
2658        }
2659
2660        runtime.start(None).await.expect("should start");
2661        tokio::time::sleep(Duration::from_millis(100)).await;
2662        runtime.stop(None).await.expect("should stop");
2663
2664        let p = runtime.get_persona("paused-test").await.unwrap();
2665        // Budget-paused personas are NOT reaped for idle
2666        assert_eq!(p.status, PersonaStatus::Active);
2667        assert!(p.budget_paused);
2668    }
2669
2670    #[tokio::test]
2671    async fn governance_proposal_resolution() {
2672        let runtime = test_runtime();
2673        let gov = SwarmGovernance {
2674            quorum_fraction: 0.5,
2675            required_approvers_by_role: HashMap::new(),
2676            veto_roles: vec!["auditor".to_string()],
2677            vote_timeout_secs: 300,
2678        };
2679        *runtime.governance.write().await = gov;
2680
2681        // Create two personas
2682        runtime
2683            .create_persona(CreatePersonaRequest {
2684                persona_id: Some("voter-1".to_string()),
2685                name: "voter-1".to_string(),
2686                role: "engineer".to_string(),
2687                charter: "vote".to_string(),
2688                swarm_id: None,
2689                parent_id: None,
2690                policy: None,
2691                tags: Vec::new(),
2692            })
2693            .await
2694            .unwrap();
2695        runtime
2696            .create_persona(CreatePersonaRequest {
2697                persona_id: Some("voter-2".to_string()),
2698                name: "voter-2".to_string(),
2699                role: "engineer".to_string(),
2700                charter: "vote".to_string(),
2701                swarm_id: None,
2702                parent_id: None,
2703                policy: None,
2704                tags: Vec::new(),
2705            })
2706            .await
2707            .unwrap();
2708
2709        // Insert a proposal with votes
2710        {
2711            let mut proposals = runtime.proposals.write().await;
2712            let mut votes = HashMap::new();
2713            votes.insert("voter-1".to_string(), ProposalVote::Approve);
2714            proposals.insert(
2715                "prop-1".to_string(),
2716                Proposal {
2717                    id: "prop-1".to_string(),
2718                    persona_id: "voter-1".to_string(),
2719                    title: "test proposal".to_string(),
2720                    rationale: "testing governance".to_string(),
2721                    evidence_refs: Vec::new(),
2722                    risk: ProposalRisk::Low,
2723                    status: ProposalStatus::Created,
2724                    created_at: Utc::now(),
2725                    votes,
2726                    vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2727                    votes_requested: true,
2728                },
2729            );
2730        }
2731
2732        // quorum = 0.5 * 2 = 1, and we have 1 approval → should be verified
2733        runtime.start(None).await.unwrap();
2734        tokio::time::sleep(Duration::from_millis(100)).await;
2735        runtime.stop(None).await.unwrap();
2736
2737        let proposals = runtime.get_proposals().await;
2738        let prop = proposals.get("prop-1").unwrap();
2739        // Should be verified or executed (it gets verified then executed in same tick)
2740        assert!(
2741            prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2742            "Expected Verified or Executed, got {:?}",
2743            prop.status
2744        );
2745    }
2746
2747    #[tokio::test]
2748    async fn veto_rejects_proposal() {
2749        let runtime = test_runtime();
2750        let gov = SwarmGovernance {
2751            quorum_fraction: 0.5,
2752            required_approvers_by_role: HashMap::new(),
2753            veto_roles: vec!["auditor".to_string()],
2754            vote_timeout_secs: 300,
2755        };
2756        *runtime.governance.write().await = gov;
2757
2758        runtime
2759            .create_persona(CreatePersonaRequest {
2760                persona_id: Some("eng".to_string()),
2761                name: "eng".to_string(),
2762                role: "engineer".to_string(),
2763                charter: "build".to_string(),
2764                swarm_id: None,
2765                parent_id: None,
2766                policy: None,
2767                tags: Vec::new(),
2768            })
2769            .await
2770            .unwrap();
2771        runtime
2772            .create_persona(CreatePersonaRequest {
2773                persona_id: Some("aud".to_string()),
2774                name: "aud".to_string(),
2775                role: "auditor".to_string(),
2776                charter: "audit".to_string(),
2777                swarm_id: None,
2778                parent_id: None,
2779                policy: None,
2780                tags: Vec::new(),
2781            })
2782            .await
2783            .unwrap();
2784
2785        {
2786            let mut proposals = runtime.proposals.write().await;
2787            let mut votes = HashMap::new();
2788            votes.insert("eng".to_string(), ProposalVote::Approve);
2789            votes.insert("aud".to_string(), ProposalVote::Veto);
2790            proposals.insert(
2791                "prop-veto".to_string(),
2792                Proposal {
2793                    id: "prop-veto".to_string(),
2794                    persona_id: "eng".to_string(),
2795                    title: "vetoed proposal".to_string(),
2796                    rationale: "testing veto".to_string(),
2797                    evidence_refs: Vec::new(),
2798                    risk: ProposalRisk::Low,
2799                    status: ProposalStatus::Created,
2800                    created_at: Utc::now(),
2801                    votes,
2802                    vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2803                    votes_requested: true,
2804                },
2805            );
2806        }
2807
2808        runtime.start(None).await.unwrap();
2809        tokio::time::sleep(Duration::from_millis(100)).await;
2810        runtime.stop(None).await.unwrap();
2811
2812        let proposals = runtime.get_proposals().await;
2813        let prop = proposals.get("prop-veto").unwrap();
2814        assert_eq!(prop.status, ProposalStatus::Rejected);
2815    }
2816
2817    #[test]
2818    fn global_workspace_default() {
2819        let ws = GlobalWorkspace::default();
2820        assert!(ws.top_beliefs.is_empty());
2821        assert!(ws.top_uncertainties.is_empty());
2822        assert!(ws.top_attention.is_empty());
2823    }
2824
2825    #[test]
2826    fn attention_item_creation() {
2827        let item = AttentionItem {
2828            id: "a1".to_string(),
2829            topic: "test topic".to_string(),
2830            topic_tags: vec!["reliability".to_string()],
2831            priority: 0.8,
2832            source_type: AttentionSource::ContestedBelief,
2833            source_id: "b1".to_string(),
2834            assigned_persona: None,
2835            created_at: Utc::now(),
2836            resolved_at: None,
2837        };
2838        assert!(item.resolved_at.is_none());
2839        assert_eq!(item.priority, 0.8);
2840    }
2841}