1pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32
33const _: () = {
35 fn _assert_types_used() {
36 let _ = std::mem::size_of::<CandleDevicePreference>();
37 let _ = std::mem::size_of::<ThinkerBackend>();
38 let _ = std::mem::size_of::<ThinkerOutput>();
39 }
40};
41use tokio::time::Instant;
42use uuid::Uuid;
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "snake_case")]
47pub enum PersonaStatus {
48 Active,
49 Idle,
50 Reaped,
51 Error,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct PersonaPolicy {
57 pub max_spawn_depth: u32,
58 pub max_branching_factor: u32,
59 pub token_budget_per_minute: u32,
60 pub compute_ms_per_minute: u32,
61 pub idle_ttl_secs: u64,
62 pub share_memory: bool,
63 #[serde(default)]
64 pub allowed_tools: Vec<String>,
65}
66
67impl Default for PersonaPolicy {
68 fn default() -> Self {
69 Self {
70 max_spawn_depth: 4,
71 max_branching_factor: 4,
72 token_budget_per_minute: 20_000,
73 compute_ms_per_minute: 10_000,
74 idle_ttl_secs: 3_600,
75 share_memory: false,
76 allowed_tools: Vec::new(),
77 }
78 }
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct PersonaIdentity {
84 pub id: String,
85 pub name: String,
86 pub role: String,
87 pub charter: String,
88 pub swarm_id: Option<String>,
89 pub parent_id: Option<String>,
90 pub depth: u32,
91 pub created_at: DateTime<Utc>,
92 #[serde(default)]
93 pub tags: Vec<String>,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct PersonaRuntimeState {
99 pub identity: PersonaIdentity,
100 pub policy: PersonaPolicy,
101 pub status: PersonaStatus,
102 pub thought_count: u64,
103 pub last_tick_at: Option<DateTime<Utc>>,
104 pub updated_at: DateTime<Utc>,
105 pub tokens_this_window: u32,
107 pub compute_ms_this_window: u32,
109 pub window_started_at: DateTime<Utc>,
111 pub last_progress_at: DateTime<Utc>,
113 #[serde(default)]
115 pub budget_paused: bool,
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
120#[serde(rename_all = "snake_case")]
121pub enum ProposalRisk {
122 Low,
123 Medium,
124 High,
125 Critical,
126}
127
128#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
130#[serde(rename_all = "snake_case")]
131pub enum ProposalStatus {
132 Created,
133 Verified,
134 Rejected,
135 Executed,
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub enum ProposalVote {
142 Approve,
143 Reject,
144 Veto,
145 Abstain,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct Proposal {
151 pub id: String,
152 pub persona_id: String,
153 pub title: String,
154 pub rationale: String,
155 pub evidence_refs: Vec<String>,
156 pub risk: ProposalRisk,
157 pub status: ProposalStatus,
158 pub created_at: DateTime<Utc>,
159 #[serde(default)]
161 pub votes: HashMap<String, ProposalVote>,
162 pub vote_deadline: Option<DateTime<Utc>>,
164 #[serde(default)]
166 pub votes_requested: bool,
167 #[serde(default)]
169 pub quorum_needed: usize,
170}
171
172#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
174#[serde(rename_all = "snake_case")]
175pub enum ThoughtEventType {
176 ThoughtGenerated,
177 HypothesisRaised,
178 CheckRequested,
179 CheckResult,
180 ProposalCreated,
181 ProposalVerified,
182 ProposalRejected,
183 ActionExecuted,
184 PersonaSpawned,
185 PersonaReaped,
186 SnapshotCompressed,
187 BeliefExtracted,
188 BeliefContested,
189 BeliefRevalidated,
190 BudgetPaused,
191 IdleReaped,
192 AttentionCreated,
193 VoteCast,
194 WorkspaceUpdated,
195}
196
197impl ThoughtEventType {
198 fn as_str(&self) -> &'static str {
199 match self {
200 Self::ThoughtGenerated => "thought_generated",
201 Self::HypothesisRaised => "hypothesis_raised",
202 Self::CheckRequested => "check_requested",
203 Self::CheckResult => "check_result",
204 Self::ProposalCreated => "proposal_created",
205 Self::ProposalVerified => "proposal_verified",
206 Self::ProposalRejected => "proposal_rejected",
207 Self::ActionExecuted => "action_executed",
208 Self::PersonaSpawned => "persona_spawned",
209 Self::PersonaReaped => "persona_reaped",
210 Self::SnapshotCompressed => "snapshot_compressed",
211 Self::BeliefExtracted => "belief_extracted",
212 Self::BeliefContested => "belief_contested",
213 Self::BeliefRevalidated => "belief_revalidated",
214 Self::BudgetPaused => "budget_paused",
215 Self::IdleReaped => "idle_reaped",
216 Self::AttentionCreated => "attention_created",
217 Self::VoteCast => "vote_cast",
218 Self::WorkspaceUpdated => "workspace_updated",
219 }
220 }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct ThoughtEvent {
226 pub id: String,
227 pub event_type: ThoughtEventType,
228 pub persona_id: Option<String>,
229 pub swarm_id: Option<String>,
230 pub timestamp: DateTime<Utc>,
231 pub payload: serde_json::Value,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct MemorySnapshot {
237 pub id: String,
238 pub generated_at: DateTime<Utc>,
239 pub swarm_id: Option<String>,
240 pub persona_scope: Vec<String>,
241 pub summary: String,
242 pub hot_event_count: usize,
243 pub warm_fact_count: usize,
244 pub cold_snapshot_count: usize,
245 pub metadata: HashMap<String, serde_json::Value>,
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct CreatePersonaRequest {
251 pub persona_id: Option<String>,
252 pub name: String,
253 pub role: String,
254 pub charter: String,
255 pub swarm_id: Option<String>,
256 pub parent_id: Option<String>,
257 pub policy: Option<PersonaPolicy>,
258 #[serde(default)]
259 pub tags: Vec<String>,
260}
261
262#[derive(Debug, Clone, Serialize, Deserialize)]
264pub struct SpawnPersonaRequest {
265 pub persona_id: Option<String>,
266 pub name: String,
267 pub role: String,
268 pub charter: String,
269 pub swarm_id: Option<String>,
270 pub policy: Option<PersonaPolicy>,
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct ReapPersonaRequest {
276 pub cascade: Option<bool>,
277 pub reason: Option<String>,
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct StartCognitionRequest {
283 pub loop_interval_ms: Option<u64>,
284 pub seed_persona: Option<CreatePersonaRequest>,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct StopCognitionRequest {
290 pub reason: Option<String>,
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
297#[serde(rename_all = "snake_case")]
298pub enum AttentionSource {
299 ContestedBelief,
300 FailedCheck,
301 StaleBelief,
302 ProposalTimeout,
303 FailedExecution,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct AttentionItem {
309 pub id: String,
310 pub topic: String,
311 pub topic_tags: Vec<String>,
312 pub priority: f32,
313 pub source_type: AttentionSource,
314 pub source_id: String,
315 pub assigned_persona: Option<String>,
316 pub created_at: DateTime<Utc>,
317 pub resolved_at: Option<DateTime<Utc>>,
318}
319
320#[derive(Debug, Clone, Serialize, Deserialize)]
322pub struct SwarmGovernance {
323 pub quorum_fraction: f32,
324 pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
325 pub veto_roles: Vec<String>,
326 pub vote_timeout_secs: u64,
327}
328
329impl Default for SwarmGovernance {
330 fn default() -> Self {
331 Self {
332 quorum_fraction: 0.5,
333 required_approvers_by_role: HashMap::new(),
334 veto_roles: Vec::new(),
335 vote_timeout_secs: 300,
336 }
337 }
338}
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
342pub struct GlobalWorkspace {
343 pub top_beliefs: Vec<String>,
344 pub top_uncertainties: Vec<String>,
345 pub top_attention: Vec<String>,
346 pub active_objectives: Vec<String>,
347 pub updated_at: DateTime<Utc>,
348}
349
350impl Default for GlobalWorkspace {
351 fn default() -> Self {
352 Self {
353 top_beliefs: Vec::new(),
354 top_uncertainties: Vec::new(),
355 top_attention: Vec::new(),
356 active_objectives: Vec::new(),
357 updated_at: Utc::now(),
358 }
359 }
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
364pub struct CognitionStatus {
365 pub enabled: bool,
366 pub running: bool,
367 pub loop_interval_ms: u64,
368 pub started_at: Option<DateTime<Utc>>,
369 pub last_tick_at: Option<DateTime<Utc>>,
370 pub persona_count: usize,
371 pub active_persona_count: usize,
372 pub events_buffered: usize,
373 pub snapshots_buffered: usize,
374}
375
376#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct ReapPersonaResponse {
379 pub reaped_ids: Vec<String>,
380 pub count: usize,
381}
382
383#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct LineageNode {
386 pub persona_id: String,
387 pub parent_id: Option<String>,
388 pub children: Vec<String>,
389 pub depth: u32,
390 pub status: PersonaStatus,
391}
392
393#[derive(Debug, Clone, Serialize, Deserialize)]
395pub struct LineageGraph {
396 pub nodes: Vec<LineageNode>,
397 pub roots: Vec<String>,
398 pub total_edges: usize,
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq)]
402enum ThoughtPhase {
403 Observe,
404 Reflect,
405 Test,
406 Compress,
407}
408
409impl ThoughtPhase {
410 fn from_thought_count(thought_count: u64) -> Self {
411 match thought_count % 4 {
412 1 => Self::Observe,
413 2 => Self::Reflect,
414 3 => Self::Test,
415 _ => Self::Compress,
416 }
417 }
418
419 fn as_str(&self) -> &'static str {
420 match self {
421 Self::Observe => "observe",
422 Self::Reflect => "reflect",
423 Self::Test => "test",
424 Self::Compress => "compress",
425 }
426 }
427
428 fn event_type(&self) -> ThoughtEventType {
429 match self {
430 Self::Observe => ThoughtEventType::ThoughtGenerated,
431 Self::Reflect => ThoughtEventType::HypothesisRaised,
432 Self::Test => ThoughtEventType::CheckRequested,
433 Self::Compress => ThoughtEventType::SnapshotCompressed,
434 }
435 }
436}
437
438#[derive(Debug, Clone)]
439struct ThoughtWorkItem {
440 persona_id: String,
441 persona_name: String,
442 role: String,
443 charter: String,
444 swarm_id: Option<String>,
445 thought_count: u64,
446 phase: ThoughtPhase,
447}
448
449#[derive(Debug, Clone)]
450struct ThoughtResult {
451 source: &'static str,
452 model: Option<String>,
453 finish_reason: Option<String>,
454 thinking: String,
455 prompt_tokens: Option<u32>,
456 completion_tokens: Option<u32>,
457 total_tokens: Option<u32>,
458 latency_ms: u128,
459 error: Option<String>,
460}
461
462#[derive(Debug, Clone)]
464pub struct CognitionRuntimeOptions {
465 pub enabled: bool,
466 pub loop_interval_ms: u64,
467 pub max_events: usize,
468 pub max_snapshots: usize,
469 pub default_policy: PersonaPolicy,
470}
471
472impl Default for CognitionRuntimeOptions {
473 fn default() -> Self {
474 Self {
475 enabled: false,
476 loop_interval_ms: 2_000,
477 max_events: 2_000,
478 max_snapshots: 128,
479 default_policy: PersonaPolicy::default(),
480 }
481 }
482}
483
484#[derive(Debug)]
486pub struct CognitionRuntime {
487 enabled: bool,
488 max_events: usize,
489 max_snapshots: usize,
490 default_policy: PersonaPolicy,
491 running: Arc<AtomicBool>,
492 loop_interval_ms: Arc<RwLock<u64>>,
493 started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
494 last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
495 personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
496 proposals: Arc<RwLock<HashMap<String, Proposal>>>,
497 events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
498 snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
499 loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
500 event_tx: broadcast::Sender<ThoughtEvent>,
501 thinker: Option<Arc<ThinkerClient>>,
502 beliefs: Arc<RwLock<HashMap<String, Belief>>>,
503 attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
504 governance: Arc<RwLock<SwarmGovernance>>,
505 workspace: Arc<RwLock<GlobalWorkspace>>,
506 tools: Option<Arc<ToolRegistry>>,
507 receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
508 pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
510}
511
512impl CognitionRuntime {
513 pub fn new_from_env() -> Self {
515 let mut options = CognitionRuntimeOptions::default();
516 options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
517 options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
518 options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
519 options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
520
521 options.default_policy = PersonaPolicy {
522 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
523 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
524 token_budget_per_minute: env_u32(
525 "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
526 20_000,
527 ),
528 compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
529 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
530 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
531 allowed_tools: Vec::new(),
532 };
533
534 let thinker_backend = thinker::ThinkerBackend::from_env(
535 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
536 .unwrap_or_else(|_| "openai_compat".to_string()),
537 );
538 let thinker_timeout_default = match thinker_backend {
539 thinker::ThinkerBackend::OpenAICompat => 30_000,
540 thinker::ThinkerBackend::Candle => 12_000,
541 thinker::ThinkerBackend::Bedrock => 60_000,
542 };
543 let thinker_config = ThinkerConfig {
544 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
545 backend: thinker_backend,
546 endpoint: normalize_thinker_endpoint(
547 &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
548 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
549 ),
550 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
551 .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
552 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
553 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
554 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
555 .ok()
556 .and_then(|v| v.parse::<f32>().ok()),
557 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
558 timeout_ms: env_u64(
559 "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
560 thinker_timeout_default,
561 ),
562 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
563 candle_tokenizer_path: std::env::var(
564 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
565 )
566 .ok(),
567 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
568 candle_device: thinker::CandleDevicePreference::from_env(
569 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
570 .unwrap_or_else(|_| "auto".to_string()),
571 ),
572 candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
573 candle_repeat_penalty: env_f32(
574 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
575 1.1,
576 ),
577 candle_repeat_last_n: env_usize(
578 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
579 64,
580 ),
581 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
582 bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
583 .unwrap_or_else(|_| std::env::var("AWS_DEFAULT_REGION")
584 .unwrap_or_else(|_| "us-west-2".to_string())),
585 };
586
587 let mut runtime = Self::new_with_options(options);
588 runtime.thinker = Some(thinker_config).and_then(|cfg| {
590 if !cfg.enabled {
591 return None;
592 }
593 match ThinkerClient::new(cfg) {
594 Ok(client) => {
595 tracing::info!(
596 backend = ?client.config().backend,
597 endpoint = %client.config().endpoint,
598 model = %client.config().model,
599 "Cognition thinker initialized"
600 );
601 Some(Arc::new(client))
602 }
603 Err(error) => {
604 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
605 None
606 }
607 }
608 });
609 runtime
610 }
611
612 pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
614 Self::new_with_options_and_thinker(options, None)
615 }
616
617 fn new_with_options_and_thinker(
618 options: CognitionRuntimeOptions,
619 thinker_config: Option<ThinkerConfig>,
620 ) -> Self {
621 let (event_tx, _) = broadcast::channel(256);
622 let thinker = thinker_config.and_then(|cfg| {
623 if !cfg.enabled {
624 return None;
625 }
626 match ThinkerClient::new(cfg) {
627 Ok(client) => {
628 tracing::info!(
629 backend = ?client.config().backend,
630 endpoint = %client.config().endpoint,
631 model = %client.config().model,
632 "Cognition thinker initialized"
633 );
634 Some(Arc::new(client))
635 }
636 Err(error) => {
637 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
638 None
639 }
640 }
641 });
642
643 let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
646 if let Some(persisted) = persistence::load_state() {
647 tracing::info!(
648 personas = persisted.personas.len(),
649 beliefs = persisted.beliefs.len(),
650 persisted_at = %persisted.persisted_at,
651 "Restoring persisted cognition state"
652 );
653 (
654 persisted.personas,
655 persisted.beliefs,
656 persisted.proposals,
657 persisted.attention_queue,
658 persisted.workspace,
659 )
660 } else {
661 (
662 HashMap::new(),
663 HashMap::new(),
664 HashMap::new(),
665 Vec::new(),
666 GlobalWorkspace::default(),
667 )
668 };
669
670 let runtime = Self {
671 enabled: options.enabled,
672 max_events: options.max_events.max(32),
673 max_snapshots: options.max_snapshots.max(8),
674 default_policy: options.default_policy,
675 running: Arc::new(AtomicBool::new(false)),
676 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
677 started_at: Arc::new(RwLock::new(None)),
678 last_tick_at: Arc::new(RwLock::new(None)),
679 personas: Arc::new(RwLock::new(init_personas)),
680 proposals: Arc::new(RwLock::new(init_proposals)),
681 events: Arc::new(RwLock::new(VecDeque::new())),
682 snapshots: Arc::new(RwLock::new(VecDeque::new())),
683 loop_handle: Arc::new(Mutex::new(None)),
684 event_tx,
685 thinker,
686 beliefs: Arc::new(RwLock::new(init_beliefs)),
687 attention_queue: Arc::new(RwLock::new(init_attention)),
688 governance: Arc::new(RwLock::new(SwarmGovernance::default())),
689 workspace: Arc::new(RwLock::new(init_workspace)),
690 tools: None,
691 receipts: Arc::new(RwLock::new(Vec::new())),
692 pending_approvals: Arc::new(RwLock::new(HashMap::new())),
693 };
694
695 runtime
696 }
697
698 pub fn is_enabled(&self) -> bool {
700 self.enabled
701 }
702
703 pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
705 self.event_tx.subscribe()
706 }
707
708 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
710 if !self.enabled {
711 return Err(anyhow!(
712 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
713 ));
714 }
715
716 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
717 if let Some(request) = req {
718 if let Some(interval) = request.loop_interval_ms {
719 let mut lock = self.loop_interval_ms.write().await;
720 *lock = interval.max(100);
721 }
722 requested_seed_persona = request.seed_persona;
723 }
724
725 let has_personas = !self.personas.read().await.is_empty();
726 if !has_personas {
727 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
728 let _ = self.create_persona(seed).await?;
729 }
730
731 if self.running.load(Ordering::SeqCst) {
732 return Ok(self.status().await);
733 }
734
735 self.running.store(true, Ordering::SeqCst);
736 {
737 let mut started = self.started_at.write().await;
738 *started = Some(Utc::now());
739 }
740
741 let running = Arc::clone(&self.running);
742 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
743 let last_tick_at = Arc::clone(&self.last_tick_at);
744 let personas = Arc::clone(&self.personas);
745 let proposals = Arc::clone(&self.proposals);
746 let events = Arc::clone(&self.events);
747 let snapshots = Arc::clone(&self.snapshots);
748 let max_events = self.max_events;
749 let max_snapshots = self.max_snapshots;
750 let event_tx = self.event_tx.clone();
751 let thinker = self.thinker.clone();
752 let beliefs = Arc::clone(&self.beliefs);
753 let attention_queue = Arc::clone(&self.attention_queue);
754 let governance = Arc::clone(&self.governance);
755 let workspace = Arc::clone(&self.workspace);
756 let tools = self.tools.clone();
757 let receipts = Arc::clone(&self.receipts);
758 let pending_approvals = Arc::clone(&self.pending_approvals);
759
760 let handle = tokio::spawn(async move {
761 let mut next_tick = Instant::now();
762 while running.load(Ordering::SeqCst) {
763 let now_instant = Instant::now();
764 if now_instant < next_tick {
765 tokio::time::sleep_until(next_tick).await;
766 }
767 if !running.load(Ordering::SeqCst) {
768 break;
769 }
770
771 let now = Utc::now();
772 {
773 let mut lock = last_tick_at.write().await;
774 *lock = Some(now);
775 }
776
777 let mut new_events = Vec::new();
778 let mut new_snapshots = Vec::new();
779 let mut new_proposals = Vec::new();
780
781 let work_items: Vec<ThoughtWorkItem> = {
783 let mut map = personas.write().await;
784 let mut items = Vec::new();
785 for persona in map.values_mut() {
786 if persona.status != PersonaStatus::Active {
787 continue;
788 }
789
790 let window_elapsed = now
792 .signed_duration_since(persona.window_started_at)
793 .num_seconds();
794 if window_elapsed >= 60 {
795 persona.tokens_this_window = 0;
796 persona.compute_ms_this_window = 0;
797 persona.window_started_at = now;
798 }
799
800 let token_ok =
802 persona.tokens_this_window < persona.policy.token_budget_per_minute;
803 let compute_ok =
804 persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
805 if !token_ok || !compute_ok {
806 if !persona.budget_paused {
807 persona.budget_paused = true;
808 new_events.push(ThoughtEvent {
809 id: Uuid::new_v4().to_string(),
810 event_type: ThoughtEventType::BudgetPaused,
811 persona_id: Some(persona.identity.id.clone()),
812 swarm_id: persona.identity.swarm_id.clone(),
813 timestamp: now,
814 payload: json!({
815 "budget_paused": true,
816 "tokens_used": persona.tokens_this_window,
817 "compute_ms_used": persona.compute_ms_this_window,
818 "token_budget": persona.policy.token_budget_per_minute,
819 "compute_budget": persona.policy.compute_ms_per_minute,
820 }),
821 });
822 }
823 continue; }
825
826 persona.budget_paused = false;
827 persona.thought_count = persona.thought_count.saturating_add(1);
828 persona.last_tick_at = Some(now);
829 persona.updated_at = now;
830 items.push(ThoughtWorkItem {
831 persona_id: persona.identity.id.clone(),
832 persona_name: persona.identity.name.clone(),
833 role: persona.identity.role.clone(),
834 charter: persona.identity.charter.clone(),
835 swarm_id: persona.identity.swarm_id.clone(),
836 thought_count: persona.thought_count,
837 phase: ThoughtPhase::from_thought_count(persona.thought_count),
838 });
839 }
840 items
841 };
842
843 for work in &work_items {
844 let context = recent_persona_context(&events, &work.persona_id, 8).await;
845
846 let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
847
848 let event_timestamp = Utc::now();
849 let is_fallback = thought.source == "fallback";
850 let tokens_used = thought.total_tokens.unwrap_or(0);
851 let compute_used = thought.latency_ms as u32;
852
853 new_events.push(ThoughtEvent {
854 id: Uuid::new_v4().to_string(),
855 event_type: work.phase.event_type(),
856 persona_id: Some(work.persona_id.clone()),
857 swarm_id: work.swarm_id.clone(),
858 timestamp: event_timestamp,
859 payload: json!({
860 "phase": work.phase.as_str(),
861 "thought_count": work.thought_count,
862 "persona": {
863 "id": work.persona_id.clone(),
864 "name": work.persona_name.clone(),
865 "role": work.role.clone(),
866 },
867 "context_event_count": context.len(),
868 "thinking": thought.thinking.clone(),
869 "source": thought.source,
870 "model": thought.model.clone(),
871 "finish_reason": thought.finish_reason.clone(),
872 "usage": {
873 "prompt_tokens": thought.prompt_tokens,
874 "completion_tokens": thought.completion_tokens,
875 "total_tokens": thought.total_tokens,
876 },
877 "latency_ms": thought.latency_ms,
878 "error": thought.error.clone(),
879 }),
880 });
881
882 {
884 let mut map = personas.write().await;
885 if let Some(persona) = map.get_mut(&work.persona_id) {
886 persona.tokens_this_window =
887 persona.tokens_this_window.saturating_add(tokens_used);
888 persona.compute_ms_this_window =
889 persona.compute_ms_this_window.saturating_add(compute_used);
890 if !is_fallback {
891 persona.last_progress_at = Utc::now();
892 }
893 }
894 }
895
896 if work.phase == ThoughtPhase::Reflect && !is_fallback {
898 let extracted = beliefs::extract_beliefs_from_thought(
899 thinker.as_deref(),
900 &work.persona_id,
901 &thought.thinking,
902 )
903 .await;
904
905 if !extracted.is_empty() {
906 let mut belief_store = beliefs.write().await;
907 let mut attn_queue = attention_queue.write().await;
908 for mut new_belief in extracted {
909 let existing_id = belief_store
911 .values()
912 .find(|b| {
913 b.belief_key == new_belief.belief_key
914 && b.status != BeliefStatus::Invalidated
915 })
916 .map(|b| b.id.clone());
917
918 if let Some(eid) = existing_id {
919 if let Some(existing) = belief_store.get_mut(&eid) {
921 if !existing.confirmed_by.contains(&work.persona_id) {
922 existing.confirmed_by.push(work.persona_id.clone());
923 }
924 existing.revalidation_success();
925 }
926 } else {
927 let contest_targets: Vec<String> =
929 new_belief.contradicts.clone();
930 for target_key in &contest_targets {
931 if let Some(target) = belief_store.values_mut().find(|b| {
932 &b.belief_key == target_key
933 && b.status != BeliefStatus::Invalidated
934 }) {
935 target.contested_by.push(new_belief.id.clone());
936 if !new_belief.contradicts.contains(&target.belief_key)
937 {
938 new_belief
939 .contradicts
940 .push(target.belief_key.clone());
941 }
942 target.revalidation_failure();
944 if target.confidence >= 0.5 {
945 attn_queue.push(AttentionItem {
947 id: Uuid::new_v4().to_string(),
948 topic: format!(
949 "Revalidate belief: {}",
950 target.claim
951 ),
952 topic_tags: vec![target.belief_key.clone()],
953 priority: 0.7,
954 source_type: AttentionSource::ContestedBelief,
955 source_id: target.id.clone(),
956 assigned_persona: None,
957 created_at: Utc::now(),
958 resolved_at: None,
959 });
960 }
961 }
962 }
963
964 new_events.push(ThoughtEvent {
965 id: Uuid::new_v4().to_string(),
966 event_type: ThoughtEventType::BeliefExtracted,
967 persona_id: Some(work.persona_id.clone()),
968 swarm_id: work.swarm_id.clone(),
969 timestamp: Utc::now(),
970 payload: json!({
971 "belief_id": new_belief.id,
972 "belief_key": new_belief.belief_key,
973 "claim": trim_for_storage(&new_belief.claim, 280),
974 "confidence": new_belief.confidence,
975 }),
976 });
977
978 {
980 let mut map = personas.write().await;
981 if let Some(p) = map.get_mut(&work.persona_id) {
982 p.last_progress_at = Utc::now();
983 }
984 }
985
986 new_belief.clamp_confidence();
987 belief_store.insert(new_belief.id.clone(), new_belief);
988 }
989 }
990 }
991 }
992
993 if work.phase == ThoughtPhase::Test {
994 new_events.push(ThoughtEvent {
995 id: Uuid::new_v4().to_string(),
996 event_type: ThoughtEventType::CheckResult,
997 persona_id: Some(work.persona_id.clone()),
998 swarm_id: work.swarm_id.clone(),
999 timestamp: Utc::now(),
1000 payload: json!({
1001 "phase": work.phase.as_str(),
1002 "thought_count": work.thought_count,
1003 "result_excerpt": trim_for_storage(&thought.thinking, 280),
1004 "source": thought.source,
1005 "model": thought.model,
1006 }),
1007 });
1008
1009 {
1011 let mut map = personas.write().await;
1012 if let Some(p) = map.get_mut(&work.persona_id) {
1013 p.last_progress_at = Utc::now();
1014 }
1015 }
1016
1017 if !is_fallback {
1019 if let Some(ref tool_registry) = tools {
1020 let allowed = {
1021 let map = personas.read().await;
1022 map.get(&work.persona_id)
1023 .map(|p| p.policy.allowed_tools.clone())
1024 .unwrap_or_default()
1025 };
1026 if !allowed.is_empty() {
1027 let tool_results = executor::execute_tool_requests(
1028 thinker.as_deref(),
1029 tool_registry,
1030 &work.persona_id,
1031 &thought.thinking,
1032 &allowed,
1033 )
1034 .await;
1035
1036 for result_event in tool_results {
1037 new_events.push(result_event);
1038 let mut map = personas.write().await;
1040 if let Some(p) = map.get_mut(&work.persona_id) {
1041 p.last_progress_at = Utc::now();
1042 }
1043 }
1044 }
1045 }
1046 }
1047 }
1048
1049 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1050 let gov = governance.read().await;
1051 let proposal = Proposal {
1052 id: Uuid::new_v4().to_string(),
1053 persona_id: work.persona_id.clone(),
1054 title: proposal_title_from_thought(
1055 &thought.thinking,
1056 work.thought_count,
1057 ),
1058 rationale: trim_for_storage(&thought.thinking, 900),
1059 evidence_refs: vec!["internal.thought_stream".to_string()],
1060 risk: ProposalRisk::Low,
1061 status: ProposalStatus::Created,
1062 created_at: Utc::now(),
1063 votes: HashMap::new(),
1064 vote_deadline: Some(
1065 Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1066 ),
1067 votes_requested: false,
1068 quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1069 as usize,
1070 };
1071
1072 new_events.push(ThoughtEvent {
1073 id: Uuid::new_v4().to_string(),
1074 event_type: ThoughtEventType::ProposalCreated,
1075 persona_id: Some(work.persona_id.clone()),
1076 swarm_id: work.swarm_id.clone(),
1077 timestamp: Utc::now(),
1078 payload: json!({
1079 "proposal_id": proposal.id,
1080 "title": proposal.title,
1081 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1082 "source": thought.source,
1083 "model": thought.model,
1084 }),
1085 });
1086
1087 new_proposals.push(proposal);
1088 }
1089
1090 if work.phase == ThoughtPhase::Compress {
1091 new_snapshots.push(MemorySnapshot {
1092 id: Uuid::new_v4().to_string(),
1093 generated_at: Utc::now(),
1094 swarm_id: work.swarm_id.clone(),
1095 persona_scope: vec![work.persona_id.clone()],
1096 summary: trim_for_storage(&thought.thinking, 1_500),
1097 hot_event_count: context.len(),
1098 warm_fact_count: estimate_fact_count(&thought.thinking),
1099 cold_snapshot_count: 1,
1100 metadata: HashMap::from([
1101 (
1102 "phase".to_string(),
1103 serde_json::Value::String(work.phase.as_str().to_string()),
1104 ),
1105 (
1106 "role".to_string(),
1107 serde_json::Value::String(work.role.clone()),
1108 ),
1109 (
1110 "source".to_string(),
1111 serde_json::Value::String(thought.source.to_string()),
1112 ),
1113 (
1114 "model".to_string(),
1115 serde_json::Value::String(
1116 thought
1117 .model
1118 .clone()
1119 .unwrap_or_else(|| "fallback".to_string()),
1120 ),
1121 ),
1122 (
1123 "completion_tokens".to_string(),
1124 serde_json::Value::Number(serde_json::Number::from(
1125 thought.completion_tokens.unwrap_or(0) as u64,
1126 )),
1127 ),
1128 ]),
1129 });
1130
1131 {
1133 let mut belief_store = beliefs.write().await;
1134 let mut attn_queue = attention_queue.write().await;
1135 let stale_ids: Vec<String> = belief_store
1136 .values()
1137 .filter(|b| {
1138 b.status == BeliefStatus::Active && now > b.review_after
1139 })
1140 .map(|b| b.id.clone())
1141 .collect();
1142 for id in stale_ids {
1143 if let Some(belief) = belief_store.get_mut(&id) {
1144 belief.decay();
1145 attn_queue.push(AttentionItem {
1146 id: Uuid::new_v4().to_string(),
1147 topic: format!("Stale belief: {}", belief.claim),
1148 topic_tags: vec![belief.belief_key.clone()],
1149 priority: 0.4,
1150 source_type: AttentionSource::StaleBelief,
1151 source_id: belief.id.clone(),
1152 assigned_persona: None,
1153 created_at: now,
1154 resolved_at: None,
1155 });
1156 }
1157 }
1158 }
1159
1160 {
1162 let belief_store = beliefs.read().await;
1163 let attn_queue = attention_queue.read().await;
1164
1165 let mut sorted_beliefs: Vec<&Belief> = belief_store
1166 .values()
1167 .filter(|b| b.status == BeliefStatus::Active)
1168 .collect();
1169 sorted_beliefs.sort_by(|a, b| {
1170 let score_a = a.confidence
1171 * (1.0
1172 / (1.0
1173 + now.signed_duration_since(a.updated_at).num_minutes()
1174 as f32));
1175 let score_b = b.confidence
1176 * (1.0
1177 / (1.0
1178 + now.signed_duration_since(b.updated_at).num_minutes()
1179 as f32));
1180 score_b
1181 .partial_cmp(&score_a)
1182 .unwrap_or(std::cmp::Ordering::Equal)
1183 });
1184
1185 let top_beliefs: Vec<String> = sorted_beliefs
1186 .iter()
1187 .take(10)
1188 .map(|b| b.id.clone())
1189 .collect();
1190 let top_uncertainties: Vec<String> = {
1191 let mut uncertain: Vec<&Belief> = belief_store
1192 .values()
1193 .filter(|b| {
1194 b.status == BeliefStatus::Stale
1195 || !b.contested_by.is_empty()
1196 })
1197 .collect();
1198 uncertain.sort_by(|a, b| {
1201 let a_contested = !a.contested_by.is_empty();
1202 let b_contested = !b.contested_by.is_empty();
1203 b_contested
1204 .cmp(&a_contested)
1205 .then_with(|| {
1206 a.confidence
1207 .partial_cmp(&b.confidence)
1208 .unwrap_or(std::cmp::Ordering::Equal)
1209 })
1210 .then_with(|| a.updated_at.cmp(&b.updated_at))
1211 });
1212 uncertain
1213 .iter()
1214 .take(5)
1215 .map(|b| {
1216 format!(
1217 "[{}] {}",
1218 b.belief_key,
1219 trim_for_storage(&b.claim, 80)
1220 )
1221 })
1222 .collect()
1223 };
1224
1225 let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1226 .iter()
1227 .filter(|a| a.resolved_at.is_none())
1228 .collect();
1229 sorted_attn.sort_by(|a, b| {
1230 b.priority
1231 .partial_cmp(&a.priority)
1232 .unwrap_or(std::cmp::Ordering::Equal)
1233 });
1234 let top_attention: Vec<String> =
1235 sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1236
1237 let mut ws = workspace.write().await;
1238 ws.top_beliefs = top_beliefs;
1239 ws.top_uncertainties = top_uncertainties;
1240 ws.top_attention = top_attention;
1241 ws.updated_at = now;
1242 }
1243
1244 new_events.push(ThoughtEvent {
1245 id: Uuid::new_v4().to_string(),
1246 event_type: ThoughtEventType::WorkspaceUpdated,
1247 persona_id: Some(work.persona_id.clone()),
1248 swarm_id: work.swarm_id.clone(),
1249 timestamp: Utc::now(),
1250 payload: json!({ "updated": true }),
1251 });
1252 }
1253 }
1254
1255 {
1257 let gov = governance.read().await;
1258 let mut proposal_store = proposals.write().await;
1259 let persona_map = personas.read().await;
1260
1261 let proposal_ids: Vec<String> = proposal_store
1262 .values()
1263 .filter(|p| p.status == ProposalStatus::Created)
1264 .map(|p| p.id.clone())
1265 .collect();
1266
1267 let mut attn_queue = attention_queue.write().await;
1268 for pid in proposal_ids {
1269 if let Some(proposal) = proposal_store.get_mut(&pid) {
1270 let quorum_needed = proposal.quorum_needed.max(1);
1271
1272 if let Some(deadline) = proposal.vote_deadline {
1274 if now > deadline {
1275 if proposal.votes.len() < quorum_needed {
1276 attn_queue.push(AttentionItem {
1278 id: Uuid::new_v4().to_string(),
1279 topic: format!(
1280 "Proposal vote timeout: {}",
1281 proposal.title
1282 ),
1283 topic_tags: Vec::new(),
1284 priority: 0.6,
1285 source_type: AttentionSource::ProposalTimeout,
1286 source_id: proposal.id.clone(),
1287 assigned_persona: None,
1288 created_at: now,
1289 resolved_at: None,
1290 });
1291 proposal.status = ProposalStatus::Rejected;
1292 continue;
1293 }
1294
1295 let required_roles = gov
1297 .required_approvers_by_role
1298 .get(&proposal.risk)
1299 .cloned()
1300 .unwrap_or_default();
1301 let all_required_met = required_roles.iter().all(|role| {
1302 proposal.votes.iter().any(|(vid, vote)| {
1303 *vote == ProposalVote::Approve
1304 && persona_map
1305 .get(vid)
1306 .map(|p| &p.identity.role == role)
1307 .unwrap_or(false)
1308 })
1309 });
1310 if !all_required_met {
1311 attn_queue.push(AttentionItem {
1312 id: Uuid::new_v4().to_string(),
1313 topic: format!(
1314 "Missing required approvers: {}",
1315 proposal.title
1316 ),
1317 topic_tags: Vec::new(),
1318 priority: 0.7,
1319 source_type: AttentionSource::ProposalTimeout,
1320 source_id: proposal.id.clone(),
1321 assigned_persona: None,
1322 created_at: now,
1323 resolved_at: None,
1324 });
1325 proposal.status = ProposalStatus::Rejected;
1326 continue;
1327 }
1328 }
1329 }
1330
1331 if proposal.votes.len() >= quorum_needed {
1333 let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1335 if *vote != ProposalVote::Veto {
1336 return false;
1337 }
1338 if let Some(voter) = persona_map.get(voter_id) {
1339 gov.veto_roles.contains(&voter.identity.role)
1340 } else {
1341 false
1342 }
1343 });
1344
1345 if vetoed {
1346 proposal.status = ProposalStatus::Rejected;
1347 continue;
1348 }
1349
1350 let required_roles = gov
1352 .required_approvers_by_role
1353 .get(&proposal.risk)
1354 .cloned()
1355 .unwrap_or_default();
1356 let all_required_met = required_roles.iter().all(|role| {
1357 proposal.votes.iter().any(|(vid, vote)| {
1358 *vote == ProposalVote::Approve
1359 && persona_map
1360 .get(vid)
1361 .map(|p| &p.identity.role == role)
1362 .unwrap_or(false)
1363 })
1364 });
1365
1366 if !all_required_met {
1367 continue; }
1369
1370 let approvals = proposal
1372 .votes
1373 .values()
1374 .filter(|v| **v == ProposalVote::Approve)
1375 .count();
1376 let rejections = proposal
1377 .votes
1378 .values()
1379 .filter(|v| **v == ProposalVote::Reject)
1380 .count();
1381
1382 if approvals > rejections {
1383 proposal.status = ProposalStatus::Verified;
1384 } else {
1385 proposal.status = ProposalStatus::Rejected;
1386 }
1387 }
1388 }
1389 }
1390 }
1391
1392 {
1394 let mut proposal_store = proposals.write().await;
1395 let verified_ids: Vec<String> = proposal_store
1396 .values()
1397 .filter(|p| p.status == ProposalStatus::Verified)
1398 .map(|p| p.id.clone())
1399 .collect();
1400
1401 for pid in verified_ids {
1402 if let Some(proposal) = proposal_store.get_mut(&pid) {
1403 if proposal.risk == ProposalRisk::Critical {
1404 let approved = {
1406 let approvals = pending_approvals.read().await;
1407 approvals.get(&pid).copied().unwrap_or(false)
1408 };
1409 if !approved {
1410 let mut approvals = pending_approvals.write().await;
1412 approvals.entry(pid.clone()).or_insert(false);
1413 continue;
1414 }
1415 }
1416
1417 let receipt = executor::DecisionReceipt {
1419 id: Uuid::new_v4().to_string(),
1420 proposal_id: pid.clone(),
1421 inputs: proposal.evidence_refs.clone(),
1422 governance_decision: format!(
1423 "Approved with {} votes",
1424 proposal.votes.len()
1425 ),
1426 capability_leases: Vec::new(),
1427 tool_invocations: Vec::new(),
1428 outcome: executor::ExecutionOutcome::Success {
1429 summary: format!("Proposal '{}' executed", proposal.title),
1430 },
1431 created_at: Utc::now(),
1432 };
1433
1434 new_events.push(ThoughtEvent {
1435 id: Uuid::new_v4().to_string(),
1436 event_type: ThoughtEventType::ActionExecuted,
1437 persona_id: Some(proposal.persona_id.clone()),
1438 swarm_id: None,
1439 timestamp: Utc::now(),
1440 payload: json!({
1441 "receipt_id": receipt.id,
1442 "proposal_id": pid,
1443 "outcome": "success",
1444 "summary": format!("Proposal '{}' executed", proposal.title),
1445 }),
1446 });
1447
1448 receipts.write().await.push(receipt);
1449 proposal.status = ProposalStatus::Executed;
1450 }
1451 }
1452 }
1453
1454 if !new_proposals.is_empty() {
1455 let mut proposal_store = proposals.write().await;
1456 for proposal in new_proposals {
1457 proposal_store.insert(proposal.id.clone(), proposal);
1458 }
1459 }
1460
1461 for event in new_events {
1462 push_event_internal(&events, max_events, &event_tx, event).await;
1463 }
1464 for snapshot in new_snapshots {
1465 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1466 }
1467
1468 {
1470 let mut map = personas.write().await;
1471 let idle_ids: Vec<String> = map
1472 .values()
1473 .filter(|p| {
1474 p.status == PersonaStatus::Active
1475 && !p.budget_paused
1476 && now.signed_duration_since(p.last_progress_at).num_seconds()
1477 > p.policy.idle_ttl_secs as i64
1478 })
1479 .map(|p| p.identity.id.clone())
1480 .collect();
1481
1482 for id in &idle_ids {
1483 if let Some(persona) = map.get_mut(id) {
1484 persona.status = PersonaStatus::Reaped;
1485 persona.updated_at = now;
1486 }
1487 let children: Vec<String> = map
1489 .values()
1490 .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1491 .map(|p| p.identity.id.clone())
1492 .collect();
1493 for child_id in children {
1494 if let Some(child) = map.get_mut(&child_id) {
1495 child.status = PersonaStatus::Reaped;
1496 child.updated_at = now;
1497 }
1498 }
1499 }
1500 drop(map);
1501
1502 for id in idle_ids {
1503 push_event_internal(
1504 &events,
1505 max_events,
1506 &event_tx,
1507 ThoughtEvent {
1508 id: Uuid::new_v4().to_string(),
1509 event_type: ThoughtEventType::IdleReaped,
1510 persona_id: Some(id),
1511 swarm_id: None,
1512 timestamp: now,
1513 payload: json!({ "reason": "idle_ttl_expired" }),
1514 },
1515 )
1516 .await;
1517 }
1518 }
1519
1520 if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1522 let _ = persistence::save_state(
1523 &personas,
1524 &proposals,
1525 &beliefs,
1526 &attention_queue,
1527 &workspace,
1528 &events,
1529 &snapshots,
1530 )
1531 .await;
1532 }
1533
1534 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1535 next_tick += interval;
1536 let tick_completed = Instant::now();
1537 if tick_completed > next_tick {
1538 next_tick = tick_completed;
1539 }
1540 }
1541 });
1542
1543 {
1544 let mut lock = self.loop_handle.lock().await;
1545 *lock = Some(handle);
1546 }
1547
1548 Ok(self.status().await)
1549 }
1550
1551 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1553 self.running.store(false, Ordering::SeqCst);
1554
1555 if let Some(handle) = self.loop_handle.lock().await.take() {
1556 handle.abort();
1557 let _ = handle.await;
1558 }
1559
1560 if let Some(reason_value) = reason {
1561 let event = ThoughtEvent {
1562 id: Uuid::new_v4().to_string(),
1563 event_type: ThoughtEventType::CheckResult,
1564 persona_id: None,
1565 swarm_id: None,
1566 timestamp: Utc::now(),
1567 payload: json!({ "stopped": true, "reason": reason_value }),
1568 };
1569 self.push_event(event).await;
1570 }
1571
1572 Ok(self.status().await)
1573 }
1574
1575 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1577 let now = Utc::now();
1578 let mut personas = self.personas.write().await;
1579
1580 let mut parent_swarm_id = None;
1581 let mut computed_depth = 0_u32;
1582 let mut inherited_policy = None;
1583
1584 if let Some(parent_id) = req.parent_id.clone() {
1585 let parent = personas
1586 .get(&parent_id)
1587 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1588
1589 if parent.status == PersonaStatus::Reaped {
1590 return Err(anyhow!("Parent persona {} is reaped", parent_id));
1591 }
1592
1593 parent_swarm_id = parent.identity.swarm_id.clone();
1594 computed_depth = parent.identity.depth.saturating_add(1);
1595 inherited_policy = Some(parent.policy.clone());
1596 let branch_limit = parent.policy.max_branching_factor;
1597
1598 let child_count = personas
1599 .values()
1600 .filter(|p| {
1601 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1602 && p.status != PersonaStatus::Reaped
1603 })
1604 .count();
1605
1606 if child_count as u32 >= branch_limit {
1607 return Err(anyhow!(
1608 "Parent {} reached branching limit {}",
1609 parent_id,
1610 branch_limit
1611 ));
1612 }
1613 }
1614
1615 let policy = req
1616 .policy
1617 .clone()
1618 .or(inherited_policy.clone())
1619 .unwrap_or_else(|| self.default_policy.clone());
1620
1621 let effective_depth_limit = inherited_policy
1622 .as_ref()
1623 .map(|p| p.max_spawn_depth)
1624 .unwrap_or(policy.max_spawn_depth);
1625
1626 if computed_depth > effective_depth_limit {
1627 return Err(anyhow!(
1628 "Spawn depth {} exceeds limit {}",
1629 computed_depth,
1630 effective_depth_limit
1631 ));
1632 }
1633
1634 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1635 if personas.contains_key(&persona_id) {
1636 return Err(anyhow!("Persona id already exists: {}", persona_id));
1637 }
1638
1639 let identity = PersonaIdentity {
1640 id: persona_id.clone(),
1641 name: req.name,
1642 role: req.role,
1643 charter: req.charter,
1644 swarm_id: req.swarm_id.or(parent_swarm_id),
1645 parent_id: req.parent_id,
1646 depth: computed_depth,
1647 created_at: now,
1648 tags: req.tags,
1649 };
1650
1651 let persona = PersonaRuntimeState {
1652 identity,
1653 policy,
1654 status: PersonaStatus::Active,
1655 thought_count: 0,
1656 last_tick_at: None,
1657 updated_at: now,
1658 tokens_this_window: 0,
1659 compute_ms_this_window: 0,
1660 window_started_at: now,
1661 last_progress_at: now,
1662 budget_paused: false,
1663 };
1664
1665 personas.insert(persona_id, persona.clone());
1666 drop(personas);
1667
1668 self.push_event(ThoughtEvent {
1669 id: Uuid::new_v4().to_string(),
1670 event_type: ThoughtEventType::PersonaSpawned,
1671 persona_id: Some(persona.identity.id.clone()),
1672 swarm_id: persona.identity.swarm_id.clone(),
1673 timestamp: now,
1674 payload: json!({
1675 "name": persona.identity.name,
1676 "role": persona.identity.role,
1677 "depth": persona.identity.depth,
1678 }),
1679 })
1680 .await;
1681
1682 Ok(persona)
1683 }
1684
1685 pub async fn spawn_child(
1687 &self,
1688 parent_id: &str,
1689 req: SpawnPersonaRequest,
1690 ) -> Result<PersonaRuntimeState> {
1691 let request = CreatePersonaRequest {
1692 persona_id: req.persona_id,
1693 name: req.name,
1694 role: req.role,
1695 charter: req.charter,
1696 swarm_id: req.swarm_id,
1697 parent_id: Some(parent_id.to_string()),
1698 policy: req.policy,
1699 tags: Vec::new(),
1700 };
1701 self.create_persona(request).await
1702 }
1703
1704 pub async fn reap_persona(
1706 &self,
1707 persona_id: &str,
1708 req: ReapPersonaRequest,
1709 ) -> Result<ReapPersonaResponse> {
1710 let cascade = req.cascade.unwrap_or(false);
1711 let now = Utc::now();
1712
1713 let mut personas = self.personas.write().await;
1714 if !personas.contains_key(persona_id) {
1715 return Err(anyhow!("Persona not found: {}", persona_id));
1716 }
1717
1718 let mut reaped_ids = vec![persona_id.to_string()];
1719 if cascade {
1720 let mut idx = 0usize;
1721 while idx < reaped_ids.len() {
1722 let current = reaped_ids[idx].clone();
1723 let children: Vec<String> = personas
1724 .values()
1725 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1726 .map(|p| p.identity.id.clone())
1727 .collect();
1728 for child in children {
1729 if !reaped_ids.iter().any(|existing| existing == &child) {
1730 reaped_ids.push(child);
1731 }
1732 }
1733 idx += 1;
1734 }
1735 }
1736
1737 for id in &reaped_ids {
1738 if let Some(persona) = personas.get_mut(id) {
1739 persona.status = PersonaStatus::Reaped;
1740 persona.updated_at = now;
1741 }
1742 }
1743 drop(personas);
1744
1745 for id in &reaped_ids {
1746 self.push_event(ThoughtEvent {
1747 id: Uuid::new_v4().to_string(),
1748 event_type: ThoughtEventType::PersonaReaped,
1749 persona_id: Some(id.clone()),
1750 swarm_id: None,
1751 timestamp: now,
1752 payload: json!({
1753 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1754 "cascade": cascade,
1755 }),
1756 })
1757 .await;
1758 }
1759
1760 Ok(ReapPersonaResponse {
1761 count: reaped_ids.len(),
1762 reaped_ids,
1763 })
1764 }
1765
1766 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1768 self.snapshots.read().await.back().cloned()
1769 }
1770
1771 pub async fn lineage_graph(&self) -> LineageGraph {
1773 let personas = self.personas.read().await;
1774 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1775 let mut roots = Vec::new();
1776 let mut total_edges = 0usize;
1777
1778 for persona in personas.values() {
1779 if let Some(parent_id) = persona.identity.parent_id.clone() {
1780 children_by_parent
1781 .entry(parent_id)
1782 .or_default()
1783 .push(persona.identity.id.clone());
1784 total_edges = total_edges.saturating_add(1);
1785 } else {
1786 roots.push(persona.identity.id.clone());
1787 }
1788 }
1789
1790 let mut nodes: Vec<LineageNode> = personas
1791 .values()
1792 .map(|persona| {
1793 let mut children = children_by_parent
1794 .get(&persona.identity.id)
1795 .cloned()
1796 .unwrap_or_default();
1797 children.sort();
1798
1799 LineageNode {
1800 persona_id: persona.identity.id.clone(),
1801 parent_id: persona.identity.parent_id.clone(),
1802 children,
1803 depth: persona.identity.depth,
1804 status: persona.status,
1805 }
1806 })
1807 .collect();
1808
1809 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1810 roots.sort();
1811
1812 LineageGraph {
1813 nodes,
1814 roots,
1815 total_edges,
1816 }
1817 }
1818
1819 pub async fn status(&self) -> CognitionStatus {
1821 let personas = self.personas.read().await;
1822 let events = self.events.read().await;
1823 let snapshots = self.snapshots.read().await;
1824 let started_at = *self.started_at.read().await;
1825 let last_tick_at = *self.last_tick_at.read().await;
1826 let loop_interval_ms = *self.loop_interval_ms.read().await;
1827
1828 let active_persona_count = personas
1829 .values()
1830 .filter(|p| p.status == PersonaStatus::Active)
1831 .count();
1832
1833 CognitionStatus {
1834 enabled: self.enabled,
1835 running: self.running.load(Ordering::SeqCst),
1836 loop_interval_ms,
1837 started_at,
1838 last_tick_at,
1839 persona_count: personas.len(),
1840 active_persona_count,
1841 events_buffered: events.len(),
1842 snapshots_buffered: snapshots.len(),
1843 }
1844 }
1845
1846 async fn push_event(&self, event: ThoughtEvent) {
1847 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1848 }
1849
1850 pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1852 self.tools = Some(registry);
1853 }
1854
1855 pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1857 self.beliefs.read().await.clone()
1858 }
1859
1860 pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1862 self.beliefs.read().await.get(id).cloned()
1863 }
1864
1865 pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1867 self.attention_queue.read().await.clone()
1868 }
1869
1870 pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1872 self.proposals.read().await.clone()
1873 }
1874
1875 pub async fn get_workspace(&self) -> GlobalWorkspace {
1877 self.workspace.read().await.clone()
1878 }
1879
1880 pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1882 self.receipts.read().await.clone()
1883 }
1884
1885 pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1887 let proposals = self.proposals.read().await;
1888 let proposal = proposals
1889 .get(proposal_id)
1890 .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1891
1892 if proposal.risk != ProposalRisk::Critical {
1893 return Err(anyhow!("Only Critical proposals require human approval"));
1894 }
1895 if proposal.status != ProposalStatus::Verified {
1896 return Err(anyhow!("Proposal is not in Verified status"));
1897 }
1898 drop(proposals);
1899
1900 let mut approvals = self.pending_approvals.write().await;
1901 approvals.insert(proposal_id.to_string(), true);
1902 Ok(())
1903 }
1904
1905 pub async fn get_governance(&self) -> SwarmGovernance {
1907 self.governance.read().await.clone()
1908 }
1909
1910 pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1912 self.personas.read().await.get(id).cloned()
1913 }
1914}
1915
1916async fn push_event_internal(
1917 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1918 max_events: usize,
1919 event_tx: &broadcast::Sender<ThoughtEvent>,
1920 event: ThoughtEvent,
1921) {
1922 {
1923 let mut lock = events.write().await;
1924 lock.push_back(event.clone());
1925 while lock.len() > max_events {
1926 lock.pop_front();
1927 }
1928 }
1929 let _ = event_tx.send(event);
1930}
1931
1932async fn push_snapshot_internal(
1933 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1934 max_snapshots: usize,
1935 snapshot: MemorySnapshot,
1936) {
1937 let mut lock = snapshots.write().await;
1938 lock.push_back(snapshot);
1939 while lock.len() > max_snapshots {
1940 lock.pop_front();
1941 }
1942}
1943
1944async fn recent_persona_context(
1945 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1946 persona_id: &str,
1947 limit: usize,
1948) -> Vec<ThoughtEvent> {
1949 let lock = events.read().await;
1950 let mut selected: Vec<ThoughtEvent> = lock
1951 .iter()
1952 .rev()
1953 .filter(|event| {
1954 event.persona_id.as_deref() == Some(persona_id)
1955 || (event.persona_id.is_none()
1956 && matches!(
1957 event.event_type,
1958 ThoughtEventType::CheckResult
1959 | ThoughtEventType::ProposalCreated
1960 | ThoughtEventType::SnapshotCompressed
1961 | ThoughtEventType::WorkspaceUpdated
1962 | ThoughtEventType::ActionExecuted
1963 | ThoughtEventType::BudgetPaused
1964 ))
1965 })
1966 .take(limit)
1967 .cloned()
1968 .collect();
1969 selected.reverse();
1970 selected
1971}
1972
1973async fn generate_phase_thought(
1974 thinker: Option<&ThinkerClient>,
1975 work: &ThoughtWorkItem,
1976 context: &[ThoughtEvent],
1977) -> ThoughtResult {
1978 let started_at = Instant::now();
1979 if let Some(client) = thinker {
1980 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1981 match client.think(&system_prompt, &user_prompt).await {
1982 Ok(output) => {
1983 let thinking = normalize_thought_output(work, context, &output.text);
1984 if !thinking.is_empty() {
1985 return ThoughtResult {
1986 source: "model",
1987 model: Some(output.model),
1988 finish_reason: output.finish_reason,
1989 thinking,
1990 prompt_tokens: output.prompt_tokens,
1991 completion_tokens: output.completion_tokens,
1992 total_tokens: output.total_tokens,
1993 latency_ms: started_at.elapsed().as_millis(),
1994 error: None,
1995 };
1996 }
1997 }
1998 Err(error) => {
1999 return ThoughtResult {
2000 source: "fallback",
2001 model: None,
2002 finish_reason: None,
2003 thinking: fallback_phase_text(work, context),
2004 prompt_tokens: None,
2005 completion_tokens: None,
2006 total_tokens: None,
2007 latency_ms: started_at.elapsed().as_millis(),
2008 error: Some(error.to_string()),
2009 };
2010 }
2011 }
2012 }
2013
2014 ThoughtResult {
2015 source: "fallback",
2016 model: None,
2017 finish_reason: None,
2018 thinking: fallback_phase_text(work, context),
2019 prompt_tokens: None,
2020 completion_tokens: None,
2021 total_tokens: None,
2022 latency_ms: started_at.elapsed().as_millis(),
2023 error: None,
2024 }
2025}
2026
2027fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2028 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2029Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2030Write as an operational process update, not meta narration. \
2031Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2032Output concrete findings, checks, risks, and next actions. \
2033Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2034 .to_string();
2035
2036 let context_lines = if context.is_empty() {
2037 "none".to_string()
2038 } else {
2039 context
2040 .iter()
2041 .map(format_context_event)
2042 .collect::<Vec<_>>()
2043 .join("\n")
2044 };
2045
2046 let phase_instruction = match work.phase {
2047 ThoughtPhase::Observe => {
2048 "Process format (exact line labels): \
2049Phase: Observe | Goal: detect current customer/business risk | \
2050Signals: 1-3 concrete signals separated by '; ' | \
2051Uncertainty: one unknown that blocks confidence | \
2052Next_Action: one immediate operational action."
2053 }
2054 ThoughtPhase::Reflect => {
2055 "Process format (exact line labels): \
2056Phase: Reflect | Hypothesis: single testable hypothesis | \
2057Rationale: why this is likely | \
2058Business_Risk: customer/revenue/SLA impact | \
2059Validation_Next_Action: one action to confirm or falsify."
2060 }
2061 ThoughtPhase::Test => {
2062 "Process format (exact line labels): \
2063Phase: Test | Check: single concrete check | \
2064Procedure: short executable procedure | \
2065Expected_Result: pass/fail expectation | \
2066Evidence_Quality: low|medium|high with reason | \
2067Escalation_Trigger: when to escalate immediately."
2068 }
2069 ThoughtPhase::Compress => {
2070 "Process format (exact line labels): \
2071Phase: Compress | State_Summary: current state in one line | \
2072Retained_Facts: 3 short facts separated by '; ' | \
2073Open_Risks: up to 2 unresolved risks separated by '; ' | \
2074Next_Process_Step: next operational step."
2075 }
2076 };
2077
2078 let user_prompt = format!(
2079 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2080 phase = work.phase.as_str(),
2081 persona_id = work.persona_id,
2082 persona_name = work.persona_name,
2083 role = work.role,
2084 charter = work.charter,
2085 thought_count = work.thought_count,
2086 context = context_lines,
2087 instruction = phase_instruction
2088 );
2089
2090 (system_prompt, user_prompt)
2091}
2092
2093fn format_context_event(event: &ThoughtEvent) -> String {
2094 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2095 format!(
2096 "{} {} {}",
2097 event.event_type.as_str(),
2098 event.timestamp.to_rfc3339(),
2099 trim_for_storage(&payload, 220)
2100 )
2101}
2102
2103fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2104 let charter = trim_for_storage(&work.charter, 180);
2105 let context_summary = fallback_context_summary(context);
2106 let thought = match work.phase {
2107 ThoughtPhase::Observe => format!(
2108 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2109 work.role, charter, context_summary
2110 ),
2111 ThoughtPhase::Reflect => format!(
2112 "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2113 ),
2114 ThoughtPhase::Test => format!(
2115 "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2116 ),
2117 ThoughtPhase::Compress => format!(
2118 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2119 work.role, charter, context_summary
2120 ),
2121 };
2122 trim_for_storage(&thought, 1_200)
2123}
2124
2125fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2126 let trimmed = trim_for_storage(raw, 2_000);
2127 if trimmed.trim().is_empty() {
2128 return fallback_phase_text(work, context);
2129 }
2130
2131 if let Some(idx) = find_process_label_start(&trimmed) {
2133 let candidate = trimmed[idx..].trim();
2134 if !candidate.is_empty()
2136 && !candidate.contains('<')
2137 && !has_template_placeholder_values(candidate)
2138 {
2139 let collapsed: String = candidate
2141 .lines()
2142 .map(str::trim)
2143 .filter(|l| !l.is_empty())
2144 .collect::<Vec<_>>()
2145 .join(" | ");
2146 let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2147 if cleaned.starts_with("Phase:") {
2148 return cleaned.to_string();
2149 }
2150 return collapsed;
2151 }
2152 }
2153
2154 let lower = trimmed.to_ascii_lowercase();
2155 let looks_meta = lower.starts_with("we need")
2156 || lower.starts_with("i need")
2157 || lower.contains("we need to")
2158 || lower.contains("i need to")
2159 || lower.contains("must output")
2160 || lower.contains("let's ")
2161 || lower.contains("we have to");
2162
2163 if looks_meta || has_template_placeholder_values(&trimmed) {
2164 return fallback_phase_text(work, context);
2165 }
2166
2167 trimmed
2168}
2169
2170fn has_template_placeholder_values(text: &str) -> bool {
2171 let lower = text.to_ascii_lowercase();
2172 [
2173 "goal: ...",
2174 "signals: ...",
2175 "uncertainty: ...",
2176 "next_action: ...",
2177 "hypothesis: ...",
2178 "rationale: ...",
2179 "business_risk: ...",
2180 "validation_next_action: ...",
2181 "check: ...",
2182 "procedure: ...",
2183 "expected_result: ...",
2184 "evidence_quality: ...",
2185 "escalation_trigger: ...",
2186 "state_summary: ...",
2187 "retained_facts: ...",
2188 "open_risks: ...",
2189 "next_process_step: ...",
2190 ]
2191 .iter()
2192 .any(|needle| lower.contains(needle))
2193 || lower.contains("<...")
2194 || lower.contains("tbd")
2195 || lower.contains("todo")
2196}
2197
2198fn find_process_label_start(text: &str) -> Option<usize> {
2199 [
2200 "Phase: Observe",
2201 "Phase: Reflect",
2202 "Phase: Test",
2203 "Phase: Compress",
2204 "Phase:",
2205 ]
2206 .iter()
2207 .filter_map(|label| text.find(label))
2208 .min()
2209}
2210
2211fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2212 if context.is_empty() {
2213 return "No prior events recorded yet.".to_string();
2214 }
2215
2216 let mut latest_error: Option<String> = None;
2217 let mut latest_proposal: Option<String> = None;
2218 let mut latest_check: Option<String> = None;
2219
2220 for event in context.iter().rev() {
2221 if latest_error.is_none()
2222 && let Some(error) = event
2223 .payload
2224 .get("error")
2225 .and_then(serde_json::Value::as_str)
2226 && !error.trim().is_empty()
2227 {
2228 latest_error = Some(trim_for_storage(error, 140));
2229 }
2230
2231 if latest_proposal.is_none()
2232 && event.event_type == ThoughtEventType::ProposalCreated
2233 && let Some(title) = event
2234 .payload
2235 .get("title")
2236 .and_then(serde_json::Value::as_str)
2237 && !title.trim().is_empty()
2238 && !has_template_placeholder_values(title)
2239 {
2240 latest_proposal = Some(trim_for_storage(title, 120));
2241 }
2242
2243 if latest_check.is_none()
2244 && event.event_type == ThoughtEventType::CheckResult
2245 && let Some(result) = event
2246 .payload
2247 .get("result_excerpt")
2248 .and_then(serde_json::Value::as_str)
2249 && !result.trim().is_empty()
2250 && !has_template_placeholder_values(result)
2251 {
2252 latest_check = Some(trim_for_storage(result, 140));
2253 }
2254
2255 if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2256 break;
2257 }
2258 }
2259
2260 let mut lines = vec![format!(
2261 "{} recent cognition events are available.",
2262 context.len()
2263 )];
2264 if let Some(error) = latest_error {
2265 lines.push(format!("Latest error signal: {}.", error));
2266 }
2267 if let Some(proposal) = latest_proposal {
2268 lines.push(format!("Recent proposal: {}.", proposal));
2269 }
2270 if let Some(check) = latest_check {
2271 lines.push(format!("Recent check: {}.", check));
2272 }
2273 lines.join(" ")
2274}
2275
2276fn trim_for_storage(input: &str, max_chars: usize) -> String {
2277 if input.chars().count() <= max_chars {
2278 return input.trim().to_string();
2279 }
2280 let mut trimmed = String::with_capacity(max_chars + 8);
2281 for ch in input.chars().take(max_chars) {
2282 trimmed.push(ch);
2283 }
2284 trimmed.push_str("...");
2285 trimmed.trim().to_string()
2286}
2287
2288fn estimate_fact_count(text: &str) -> usize {
2289 let sentence_count =
2290 text.matches('.').count() + text.matches('!').count() + text.matches('?').count();
2291 sentence_count.clamp(1, 12)
2292}
2293
2294fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2295 let first_line = thought
2296 .lines()
2297 .find(|line| !line.trim().is_empty())
2298 .unwrap_or("proposal");
2299 let compact = first_line
2300 .replace(['\t', '\r', '\n'], " ")
2301 .split_whitespace()
2302 .collect::<Vec<_>>()
2303 .join(" ");
2304 let trimmed = trim_for_storage(&compact, 72);
2305 if trimmed.is_empty() {
2306 format!("proposal-{}", thought_count)
2307 } else {
2308 trimmed
2309 }
2310}
2311
2312fn default_seed_persona() -> CreatePersonaRequest {
2313 CreatePersonaRequest {
2314 persona_id: Some("root-thinker".to_string()),
2315 name: "root-thinker".to_string(),
2316 role: "orchestrator".to_string(),
2317 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2318 .to_string(),
2319 swarm_id: Some("swarm-core".to_string()),
2320 parent_id: None,
2321 policy: None,
2322 tags: vec!["orchestration".to_string()],
2323 }
2324}
2325
2326fn normalize_thinker_endpoint(base_url: &str) -> String {
2327 let trimmed = base_url.trim().trim_end_matches('/');
2328 if trimmed.ends_with("/chat/completions") {
2329 return trimmed.to_string();
2330 }
2331 if trimmed.is_empty() {
2332 return "http://127.0.0.1:11434/v1/chat/completions".to_string();
2333 }
2334 format!("{}/chat/completions", trimmed)
2335}
2336
2337fn env_bool(name: &str, default: bool) -> bool {
2338 std::env::var(name)
2339 .ok()
2340 .and_then(|v| match v.to_ascii_lowercase().as_str() {
2341 "1" | "true" | "yes" | "on" => Some(true),
2342 "0" | "false" | "no" | "off" => Some(false),
2343 _ => None,
2344 })
2345 .unwrap_or(default)
2346}
2347
2348fn env_f32(name: &str, default: f32) -> f32 {
2349 std::env::var(name)
2350 .ok()
2351 .and_then(|v| v.parse::<f32>().ok())
2352 .unwrap_or(default)
2353}
2354
2355fn env_u64(name: &str, default: u64) -> u64 {
2356 std::env::var(name)
2357 .ok()
2358 .and_then(|v| v.parse::<u64>().ok())
2359 .unwrap_or(default)
2360}
2361
2362fn env_u32(name: &str, default: u32) -> u32 {
2363 std::env::var(name)
2364 .ok()
2365 .and_then(|v| v.parse::<u32>().ok())
2366 .unwrap_or(default)
2367}
2368
2369fn env_usize(name: &str, default: usize) -> usize {
2370 std::env::var(name)
2371 .ok()
2372 .and_then(|v| v.parse::<usize>().ok())
2373 .unwrap_or(default)
2374}
2375
2376#[cfg(test)]
2377mod tests {
2378 use super::*;
2379
2380 fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2381 ThoughtWorkItem {
2382 persona_id: "p-1".to_string(),
2383 persona_name: "Spotlessbinco Business Thinker".to_string(),
2384 role: "principal reliability engineer".to_string(),
2385 charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2386 .to_string(),
2387 swarm_id: Some("spotlessbinco".to_string()),
2388 thought_count: 4,
2389 phase,
2390 }
2391 }
2392
2393 fn test_runtime() -> CognitionRuntime {
2394 CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2395 enabled: true,
2396 loop_interval_ms: 25,
2397 max_events: 256,
2398 max_snapshots: 32,
2399 default_policy: PersonaPolicy {
2400 max_spawn_depth: 2,
2401 max_branching_factor: 2,
2402 token_budget_per_minute: 1_000,
2403 compute_ms_per_minute: 1_000,
2404 idle_ttl_secs: 300,
2405 share_memory: false,
2406 allowed_tools: Vec::new(),
2407 },
2408 })
2409 }
2410
2411 #[test]
2412 fn normalize_rejects_placeholder_process_line() {
2413 let work = sample_work_item(ThoughtPhase::Compress);
2414 let output = normalize_thought_output(
2415 &work,
2416 &[],
2417 "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2418 );
2419 assert!(
2420 output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2421 );
2422 assert!(!output.contains("State_Summary: ..."));
2423 }
2424
2425 #[test]
2426 fn normalize_accepts_concrete_process_line() {
2427 let work = sample_work_item(ThoughtPhase::Test);
2428 let output = normalize_thought_output(
2429 &work,
2430 &[],
2431 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2432 );
2433 assert_eq!(
2434 output,
2435 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2436 );
2437 }
2438
2439 #[tokio::test]
2440 async fn create_spawn_and_lineage_work() {
2441 let runtime = test_runtime();
2442
2443 let root = runtime
2444 .create_persona(CreatePersonaRequest {
2445 persona_id: Some("root".to_string()),
2446 name: "root".to_string(),
2447 role: "orchestrator".to_string(),
2448 charter: "coordinate".to_string(),
2449 swarm_id: Some("swarm-a".to_string()),
2450 parent_id: None,
2451 policy: None,
2452 tags: Vec::new(),
2453 })
2454 .await
2455 .expect("root should be created");
2456
2457 assert_eq!(root.identity.depth, 0);
2458
2459 let child = runtime
2460 .spawn_child(
2461 "root",
2462 SpawnPersonaRequest {
2463 persona_id: Some("child-1".to_string()),
2464 name: "child-1".to_string(),
2465 role: "analyst".to_string(),
2466 charter: "analyze".to_string(),
2467 swarm_id: None,
2468 policy: None,
2469 },
2470 )
2471 .await
2472 .expect("child should spawn");
2473
2474 assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2475 assert_eq!(child.identity.depth, 1);
2476
2477 let lineage = runtime.lineage_graph().await;
2478 assert_eq!(lineage.total_edges, 1);
2479 assert_eq!(lineage.roots, vec!["root".to_string()]);
2480 }
2481
2482 #[tokio::test]
2483 async fn branching_and_depth_limits_are_enforced() {
2484 let runtime = test_runtime();
2485
2486 runtime
2487 .create_persona(CreatePersonaRequest {
2488 persona_id: Some("root".to_string()),
2489 name: "root".to_string(),
2490 role: "orchestrator".to_string(),
2491 charter: "coordinate".to_string(),
2492 swarm_id: Some("swarm-a".to_string()),
2493 parent_id: None,
2494 policy: None,
2495 tags: Vec::new(),
2496 })
2497 .await
2498 .expect("root should be created");
2499
2500 runtime
2501 .spawn_child(
2502 "root",
2503 SpawnPersonaRequest {
2504 persona_id: Some("c1".to_string()),
2505 name: "c1".to_string(),
2506 role: "worker".to_string(),
2507 charter: "run".to_string(),
2508 swarm_id: None,
2509 policy: None,
2510 },
2511 )
2512 .await
2513 .expect("first child should spawn");
2514
2515 runtime
2516 .spawn_child(
2517 "root",
2518 SpawnPersonaRequest {
2519 persona_id: Some("c2".to_string()),
2520 name: "c2".to_string(),
2521 role: "worker".to_string(),
2522 charter: "run".to_string(),
2523 swarm_id: None,
2524 policy: None,
2525 },
2526 )
2527 .await
2528 .expect("second child should spawn");
2529
2530 let third_child = runtime
2531 .spawn_child(
2532 "root",
2533 SpawnPersonaRequest {
2534 persona_id: Some("c3".to_string()),
2535 name: "c3".to_string(),
2536 role: "worker".to_string(),
2537 charter: "run".to_string(),
2538 swarm_id: None,
2539 policy: None,
2540 },
2541 )
2542 .await;
2543 assert!(third_child.is_err());
2544
2545 runtime
2546 .spawn_child(
2547 "c1",
2548 SpawnPersonaRequest {
2549 persona_id: Some("c1-1".to_string()),
2550 name: "c1-1".to_string(),
2551 role: "worker".to_string(),
2552 charter: "run".to_string(),
2553 swarm_id: None,
2554 policy: None,
2555 },
2556 )
2557 .await
2558 .expect("depth 2 should be allowed");
2559
2560 let depth_violation = runtime
2561 .spawn_child(
2562 "c1-1",
2563 SpawnPersonaRequest {
2564 persona_id: Some("c1-1-1".to_string()),
2565 name: "c1-1-1".to_string(),
2566 role: "worker".to_string(),
2567 charter: "run".to_string(),
2568 swarm_id: None,
2569 policy: None,
2570 },
2571 )
2572 .await;
2573 assert!(depth_violation.is_err());
2574 }
2575
2576 #[tokio::test]
2577 async fn start_stop_updates_runtime_status() {
2578 let runtime = test_runtime();
2579
2580 runtime
2581 .start(Some(StartCognitionRequest {
2582 loop_interval_ms: Some(10),
2583 seed_persona: Some(CreatePersonaRequest {
2584 persona_id: Some("seed".to_string()),
2585 name: "seed".to_string(),
2586 role: "watcher".to_string(),
2587 charter: "observe".to_string(),
2588 swarm_id: Some("swarm-seed".to_string()),
2589 parent_id: None,
2590 policy: None,
2591 tags: Vec::new(),
2592 }),
2593 }))
2594 .await
2595 .expect("runtime should start");
2596
2597 tokio::time::sleep(Duration::from_millis(60)).await;
2598 let running_status = runtime.status().await;
2599 assert!(running_status.running);
2600 assert!(running_status.events_buffered > 0);
2601
2602 runtime
2603 .stop(Some("test".to_string()))
2604 .await
2605 .expect("runtime should stop");
2606 let stopped_status = runtime.status().await;
2607 assert!(!stopped_status.running);
2608 }
2609
2610 #[tokio::test]
2611 async fn zero_budget_persona_is_skipped() {
2612 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2613 enabled: true,
2614 loop_interval_ms: 10,
2615 max_events: 256,
2616 max_snapshots: 32,
2617 default_policy: PersonaPolicy {
2618 max_spawn_depth: 2,
2619 max_branching_factor: 2,
2620 token_budget_per_minute: 0,
2621 compute_ms_per_minute: 0,
2622 idle_ttl_secs: 3_600,
2623 share_memory: false,
2624 allowed_tools: Vec::new(),
2625 },
2626 });
2627
2628 let persona = runtime
2629 .create_persona(CreatePersonaRequest {
2630 persona_id: Some("budget-test".to_string()),
2631 name: "budget-test".to_string(),
2632 role: "tester".to_string(),
2633 charter: "test budgets".to_string(),
2634 swarm_id: None,
2635 parent_id: None,
2636 policy: None,
2637 tags: Vec::new(),
2638 })
2639 .await
2640 .expect("should create persona");
2641
2642 assert_eq!(persona.tokens_this_window, 0);
2643 assert_eq!(persona.compute_ms_this_window, 0);
2644
2645 runtime.start(None).await.expect("should start");
2647 tokio::time::sleep(Duration::from_millis(50)).await;
2648 runtime.stop(None).await.expect("should stop");
2649
2650 let p = runtime.get_persona("budget-test").await.unwrap();
2652 assert_eq!(p.thought_count, 0);
2653 assert!(p.budget_paused);
2654 }
2655
2656 #[tokio::test]
2657 async fn budget_counters_reset_after_window() {
2658 let now = Utc::now();
2659 let mut persona = PersonaRuntimeState {
2660 identity: PersonaIdentity {
2661 id: "p1".to_string(),
2662 name: "test".to_string(),
2663 role: "tester".to_string(),
2664 charter: "test".to_string(),
2665 swarm_id: None,
2666 parent_id: None,
2667 depth: 0,
2668 created_at: now,
2669 tags: Vec::new(),
2670 },
2671 policy: PersonaPolicy::default(),
2672 status: PersonaStatus::Active,
2673 thought_count: 0,
2674 last_tick_at: None,
2675 updated_at: now,
2676 tokens_this_window: 5000,
2677 compute_ms_this_window: 3000,
2678 window_started_at: now - ChronoDuration::seconds(61),
2679 last_progress_at: now,
2680 budget_paused: false,
2681 };
2682
2683 let window_elapsed = now
2685 .signed_duration_since(persona.window_started_at)
2686 .num_seconds();
2687 assert!(window_elapsed >= 60);
2688
2689 persona.tokens_this_window = 0;
2691 persona.compute_ms_this_window = 0;
2692 persona.window_started_at = now;
2693
2694 assert_eq!(persona.tokens_this_window, 0);
2695 assert_eq!(persona.compute_ms_this_window, 0);
2696 }
2697
2698 #[tokio::test]
2699 async fn idle_persona_is_reaped() {
2700 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2701 enabled: true,
2702 loop_interval_ms: 10,
2703 max_events: 256,
2704 max_snapshots: 32,
2705 default_policy: PersonaPolicy {
2706 max_spawn_depth: 2,
2707 max_branching_factor: 2,
2708 token_budget_per_minute: 20_000,
2709 compute_ms_per_minute: 10_000,
2710 idle_ttl_secs: 0, share_memory: false,
2712 allowed_tools: Vec::new(),
2713 },
2714 });
2715
2716 runtime
2717 .create_persona(CreatePersonaRequest {
2718 persona_id: Some("idle-test".to_string()),
2719 name: "idle-test".to_string(),
2720 role: "idler".to_string(),
2721 charter: "idle away".to_string(),
2722 swarm_id: None,
2723 parent_id: None,
2724 policy: None,
2725 tags: Vec::new(),
2726 })
2727 .await
2728 .expect("should create persona");
2729
2730 {
2732 let mut personas = runtime.personas.write().await;
2733 if let Some(p) = personas.get_mut("idle-test") {
2734 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2735 }
2736 }
2737
2738 runtime.start(None).await.expect("should start");
2739 tokio::time::sleep(Duration::from_millis(100)).await;
2740 runtime.stop(None).await.expect("should stop");
2741
2742 let p = runtime.get_persona("idle-test").await.unwrap();
2743 assert_eq!(p.status, PersonaStatus::Reaped);
2744 }
2745
2746 #[tokio::test]
2747 async fn budget_paused_persona_not_reaped_for_idle() {
2748 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2749 enabled: true,
2750 loop_interval_ms: 10,
2751 max_events: 256,
2752 max_snapshots: 32,
2753 default_policy: PersonaPolicy {
2754 max_spawn_depth: 2,
2755 max_branching_factor: 2,
2756 token_budget_per_minute: 0, compute_ms_per_minute: 0,
2758 idle_ttl_secs: 0, share_memory: false,
2760 allowed_tools: Vec::new(),
2761 },
2762 });
2763
2764 runtime
2765 .create_persona(CreatePersonaRequest {
2766 persona_id: Some("paused-test".to_string()),
2767 name: "paused-test".to_string(),
2768 role: "pauser".to_string(),
2769 charter: "pause".to_string(),
2770 swarm_id: None,
2771 parent_id: None,
2772 policy: None,
2773 tags: Vec::new(),
2774 })
2775 .await
2776 .expect("should create persona");
2777
2778 {
2780 let mut personas = runtime.personas.write().await;
2781 if let Some(p) = personas.get_mut("paused-test") {
2782 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2783 }
2784 }
2785
2786 runtime.start(None).await.expect("should start");
2787 tokio::time::sleep(Duration::from_millis(100)).await;
2788 runtime.stop(None).await.expect("should stop");
2789
2790 let p = runtime.get_persona("paused-test").await.unwrap();
2791 assert_eq!(p.status, PersonaStatus::Active);
2793 assert!(p.budget_paused);
2794 }
2795
2796 #[tokio::test]
2797 async fn governance_proposal_resolution() {
2798 let runtime = test_runtime();
2799 let gov = SwarmGovernance {
2800 quorum_fraction: 0.5,
2801 required_approvers_by_role: HashMap::new(),
2802 veto_roles: vec!["auditor".to_string()],
2803 vote_timeout_secs: 300,
2804 };
2805 *runtime.governance.write().await = gov;
2806
2807 runtime
2809 .create_persona(CreatePersonaRequest {
2810 persona_id: Some("voter-1".to_string()),
2811 name: "voter-1".to_string(),
2812 role: "engineer".to_string(),
2813 charter: "vote".to_string(),
2814 swarm_id: None,
2815 parent_id: None,
2816 policy: None,
2817 tags: Vec::new(),
2818 })
2819 .await
2820 .unwrap();
2821 runtime
2822 .create_persona(CreatePersonaRequest {
2823 persona_id: Some("voter-2".to_string()),
2824 name: "voter-2".to_string(),
2825 role: "engineer".to_string(),
2826 charter: "vote".to_string(),
2827 swarm_id: None,
2828 parent_id: None,
2829 policy: None,
2830 tags: Vec::new(),
2831 })
2832 .await
2833 .unwrap();
2834
2835 {
2837 let mut proposals = runtime.proposals.write().await;
2838 let mut votes = HashMap::new();
2839 votes.insert("voter-1".to_string(), ProposalVote::Approve);
2840 proposals.insert(
2841 "prop-1".to_string(),
2842 Proposal {
2843 id: "prop-1".to_string(),
2844 persona_id: "voter-1".to_string(),
2845 title: "test proposal".to_string(),
2846 rationale: "testing governance".to_string(),
2847 evidence_refs: Vec::new(),
2848 risk: ProposalRisk::Low,
2849 status: ProposalStatus::Created,
2850 created_at: Utc::now(),
2851 votes,
2852 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2853 votes_requested: true,
2854 quorum_needed: 1,
2855 },
2856 );
2857 }
2858
2859 runtime.start(None).await.unwrap();
2861 tokio::time::sleep(Duration::from_millis(100)).await;
2862 runtime.stop(None).await.unwrap();
2863
2864 let proposals = runtime.get_proposals().await;
2865 let prop = proposals.get("prop-1").unwrap();
2866 assert!(
2868 prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2869 "Expected Verified or Executed, got {:?}",
2870 prop.status
2871 );
2872 }
2873
2874 #[tokio::test]
2875 async fn veto_rejects_proposal() {
2876 let runtime = test_runtime();
2877 let gov = SwarmGovernance {
2878 quorum_fraction: 0.5,
2879 required_approvers_by_role: HashMap::new(),
2880 veto_roles: vec!["auditor".to_string()],
2881 vote_timeout_secs: 300,
2882 };
2883 *runtime.governance.write().await = gov;
2884
2885 runtime
2886 .create_persona(CreatePersonaRequest {
2887 persona_id: Some("eng".to_string()),
2888 name: "eng".to_string(),
2889 role: "engineer".to_string(),
2890 charter: "build".to_string(),
2891 swarm_id: None,
2892 parent_id: None,
2893 policy: None,
2894 tags: Vec::new(),
2895 })
2896 .await
2897 .unwrap();
2898 runtime
2899 .create_persona(CreatePersonaRequest {
2900 persona_id: Some("aud".to_string()),
2901 name: "aud".to_string(),
2902 role: "auditor".to_string(),
2903 charter: "audit".to_string(),
2904 swarm_id: None,
2905 parent_id: None,
2906 policy: None,
2907 tags: Vec::new(),
2908 })
2909 .await
2910 .unwrap();
2911
2912 {
2913 let mut proposals = runtime.proposals.write().await;
2914 let mut votes = HashMap::new();
2915 votes.insert("eng".to_string(), ProposalVote::Approve);
2916 votes.insert("aud".to_string(), ProposalVote::Veto);
2917 proposals.insert(
2918 "prop-veto".to_string(),
2919 Proposal {
2920 id: "prop-veto".to_string(),
2921 persona_id: "eng".to_string(),
2922 title: "vetoed proposal".to_string(),
2923 rationale: "testing veto".to_string(),
2924 evidence_refs: Vec::new(),
2925 risk: ProposalRisk::Low,
2926 status: ProposalStatus::Created,
2927 created_at: Utc::now(),
2928 votes,
2929 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2930 votes_requested: true,
2931 quorum_needed: 1,
2932 },
2933 );
2934 }
2935
2936 runtime.start(None).await.unwrap();
2937 tokio::time::sleep(Duration::from_millis(100)).await;
2938 runtime.stop(None).await.unwrap();
2939
2940 let proposals = runtime.get_proposals().await;
2941 let prop = proposals.get("prop-veto").unwrap();
2942 assert_eq!(prop.status, ProposalStatus::Rejected);
2943 }
2944
2945 #[test]
2946 fn global_workspace_default() {
2947 let ws = GlobalWorkspace::default();
2948 assert!(ws.top_beliefs.is_empty());
2949 assert!(ws.top_uncertainties.is_empty());
2950 assert!(ws.top_attention.is_empty());
2951 }
2952
2953 #[test]
2954 fn attention_item_creation() {
2955 let item = AttentionItem {
2956 id: "a1".to_string(),
2957 topic: "test topic".to_string(),
2958 topic_tags: vec!["reliability".to_string()],
2959 priority: 0.8,
2960 source_type: AttentionSource::ContestedBelief,
2961 source_id: "b1".to_string(),
2962 assigned_persona: None,
2963 created_at: Utc::now(),
2964 resolved_at: None,
2965 };
2966 assert!(item.resolved_at.is_none());
2967 assert_eq!(item.priority, 0.8);
2968 }
2969}