1pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32
33const _: () = {
35 fn _assert_types_used() {
36 let _ = std::mem::size_of::<CandleDevicePreference>();
37 let _ = std::mem::size_of::<ThinkerBackend>();
38 let _ = std::mem::size_of::<ThinkerOutput>();
39 }
40};
41use tokio::time::Instant;
42use uuid::Uuid;
43
/// Lifecycle state of a persona.
///
/// Serialized in `snake_case`. The cognition loop only schedules personas
/// whose status is [`PersonaStatus::Active`]; every other state is skipped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// Scheduled for a thought on every loop tick (subject to budget checks).
    Active,
    /// Not scheduled by the loop.
    Idle,
    /// Not scheduled by the loop.
    Reaped,
    /// Not scheduled by the loop.
    Error,
}
53
/// Per-persona limits applied by the cognition runtime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    /// NOTE(review): presumably the deepest lineage depth a spawned persona may
    /// reach — enforcement is not visible in this part of the file.
    pub max_spawn_depth: u32,
    /// NOTE(review): presumably the maximum number of direct children per
    /// persona — enforcement is not visible in this part of the file.
    pub max_branching_factor: u32,
    /// Token allowance per 60-second budget window. The loop pauses the persona
    /// once `tokens_this_window` reaches this value.
    pub token_budget_per_minute: u32,
    /// Compute-time allowance (milliseconds) per 60-second budget window. The
    /// loop pauses the persona once `compute_ms_this_window` reaches this value.
    pub compute_ms_per_minute: u32,
    /// NOTE(review): presumably the idle time-to-live in seconds before a
    /// persona is reaped — confirm against the reaping logic.
    pub idle_ttl_secs: u64,
    /// NOTE(review): semantics not visible here — confirm against callers.
    pub share_memory: bool,
    /// List handed to `executor::execute_tool_requests` during the Test phase.
    /// When empty, the loop does not attempt tool execution for the persona.
    /// Defaults to empty when absent from serialized input.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
}
66
67impl Default for PersonaPolicy {
68 fn default() -> Self {
69 Self {
70 max_spawn_depth: 4,
71 max_branching_factor: 4,
72 token_budget_per_minute: 20_000,
73 compute_ms_per_minute: 10_000,
74 idle_ttl_secs: 3_600,
75 share_memory: false,
76 allowed_tools: Vec::new(),
77 }
78 }
79}
80
/// Identifying attributes of a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Persona identifier; the runtime's persona map is looked up by this value.
    pub id: String,
    /// Display name; copied into thought event payloads.
    pub name: String,
    /// Role label; copied into thought event payloads and snapshot metadata.
    pub role: String,
    /// Charter text; handed to thought generation via the work item.
    pub charter: String,
    /// Optional swarm the persona belongs to; copied onto events it produces.
    pub swarm_id: Option<String>,
    /// NOTE(review): presumably the id of the persona that spawned this one —
    /// confirm against the spawn logic.
    pub parent_id: Option<String>,
    /// NOTE(review): presumably the distance from the lineage root — confirm.
    pub depth: u32,
    pub created_at: DateTime<Utc>,
    /// Defaults to empty when absent from serialized input.
    #[serde(default)]
    pub tags: Vec<String>,
}
95
/// Mutable runtime record kept for each persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    pub identity: PersonaIdentity,
    pub policy: PersonaPolicy,
    pub status: PersonaStatus,
    /// Incremented (saturating) each time the loop schedules this persona.
    pub thought_count: u64,
    /// Tick timestamp of the last time the loop scheduled this persona.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Set to the tick timestamp whenever the loop schedules this persona.
    pub updated_at: DateTime<Utc>,
    /// Tokens charged in the current budget window (from each thought's
    /// `total_tokens`); reset to 0 once the window is at least 60 seconds old.
    pub tokens_this_window: u32,
    /// Milliseconds charged in the current budget window (from each thought's
    /// latency); reset to 0 once the window is at least 60 seconds old.
    pub compute_ms_this_window: u32,
    /// Start of the current budget window.
    pub window_started_at: DateTime<Utc>,
    /// Refreshed by the loop after a non-fallback thought, a newly stored
    /// belief, a Test-phase check result, or a tool result.
    pub last_progress_at: DateTime<Utc>,
    /// Set when the persona is skipped for exceeding a budget; cleared the next
    /// time it is scheduled. Defaults to `false` when absent from serialized
    /// input.
    #[serde(default)]
    pub budget_paused: bool,
}
117
/// Risk level attached to a [`Proposal`].
///
/// Serialized in `snake_case`. Derives `Hash` because it is used as the key of
/// `SwarmGovernance::required_approvers_by_role`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    /// Risk assigned to proposals created by the cognition loop.
    Low,
    Medium,
    High,
    Critical,
}
127
/// Processing state of a [`Proposal`]. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    /// Initial status of proposals created by the cognition loop.
    Created,
    Verified,
    Rejected,
    Executed,
}
137
/// A single vote recorded in `Proposal::votes`. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalVote {
    Approve,
    Reject,
    // NOTE(review): presumably restricted to `SwarmGovernance::veto_roles` —
    // confirm against the voting logic.
    Veto,
    Abstain,
}
147
/// A proposal raised by a persona and subject to swarm voting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    /// Proposal identifier (the loop uses a UUID v4 string).
    pub id: String,
    /// Id of the persona whose thought produced the proposal.
    pub persona_id: String,
    pub title: String,
    /// For loop-created proposals: the thought text trimmed via
    /// `trim_for_storage(.., 900)`.
    pub rationale: String,
    /// For loop-created proposals: `["internal.thought_stream"]`.
    pub evidence_refs: Vec<String>,
    pub risk: ProposalRisk,
    pub status: ProposalStatus,
    pub created_at: DateTime<Utc>,
    /// Votes recorded so far; starts empty. Defaults to empty when absent from
    /// serialized input.
    /// NOTE(review): keys are presumably persona ids — confirm.
    #[serde(default)]
    pub votes: HashMap<String, ProposalVote>,
    /// For loop-created proposals: creation time plus
    /// `SwarmGovernance::vote_timeout_secs`.
    pub vote_deadline: Option<DateTime<Utc>>,
    /// Starts `false`; defaults to `false` when absent from serialized input.
    #[serde(default)]
    pub votes_requested: bool,
    /// For loop-created proposals: `ceil(work items in the tick * quorum_fraction)`.
    /// Defaults to 0 when absent from serialized input.
    #[serde(default)]
    pub quorum_needed: usize,
}
171
/// Kind of a [`ThoughtEvent`]. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Emitted for a thought produced in the Observe phase.
    ThoughtGenerated,
    /// Emitted for a thought produced in the Reflect phase.
    HypothesisRaised,
    /// Emitted for a thought produced in the Test phase.
    CheckRequested,
    /// Emitted right after a Test-phase thought, carrying a result excerpt.
    CheckResult,
    /// Emitted when the loop creates a [`Proposal`].
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    /// Emitted for a thought produced in the Compress phase.
    SnapshotCompressed,
    /// Emitted when a newly extracted belief is stored.
    BeliefExtracted,
    BeliefContested,
    BeliefRevalidated,
    /// Emitted the first time a persona is skipped for exceeding a budget.
    BudgetPaused,
    IdleReaped,
    AttentionCreated,
    VoteCast,
    WorkspaceUpdated,
}
196
197impl ThoughtEventType {
198 fn as_str(&self) -> &'static str {
199 match self {
200 Self::ThoughtGenerated => "thought_generated",
201 Self::HypothesisRaised => "hypothesis_raised",
202 Self::CheckRequested => "check_requested",
203 Self::CheckResult => "check_result",
204 Self::ProposalCreated => "proposal_created",
205 Self::ProposalVerified => "proposal_verified",
206 Self::ProposalRejected => "proposal_rejected",
207 Self::ActionExecuted => "action_executed",
208 Self::PersonaSpawned => "persona_spawned",
209 Self::PersonaReaped => "persona_reaped",
210 Self::SnapshotCompressed => "snapshot_compressed",
211 Self::BeliefExtracted => "belief_extracted",
212 Self::BeliefContested => "belief_contested",
213 Self::BeliefRevalidated => "belief_revalidated",
214 Self::BudgetPaused => "budget_paused",
215 Self::IdleReaped => "idle_reaped",
216 Self::AttentionCreated => "attention_created",
217 Self::VoteCast => "vote_cast",
218 Self::WorkspaceUpdated => "workspace_updated",
219 }
220 }
221}
222
/// One entry of the cognition event stream; also the item type broadcast to
/// subscribers obtained from `CognitionRuntime::subscribe_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Event identifier (the loop uses a UUID v4 string).
    pub id: String,
    pub event_type: ThoughtEventType,
    /// Persona the event relates to, if any.
    pub persona_id: Option<String>,
    /// Swarm of the originating persona, if any.
    pub swarm_id: Option<String>,
    pub timestamp: DateTime<Utc>,
    /// Free-form JSON body; its keys depend on `event_type`.
    pub payload: serde_json::Value,
}
233
/// Compressed memory record produced from a Compress-phase thought.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    /// Snapshot identifier (the loop uses a UUID v4 string).
    pub id: String,
    pub generated_at: DateTime<Utc>,
    pub swarm_id: Option<String>,
    /// Persona ids covered; loop-created snapshots list the single originating
    /// persona.
    pub persona_scope: Vec<String>,
    /// For loop-created snapshots: the thought text trimmed via
    /// `trim_for_storage(.., 1_500)`.
    pub summary: String,
    /// For loop-created snapshots: number of recent context events fed to the
    /// thought.
    pub hot_event_count: usize,
    /// For loop-created snapshots: result of `estimate_fact_count` on the
    /// thought text.
    pub warm_fact_count: usize,
    /// For loop-created snapshots: always 1.
    pub cold_snapshot_count: usize,
    /// For loop-created snapshots: the keys `phase`, `role`, `source`, `model`
    /// and `completion_tokens`.
    pub metadata: HashMap<String, serde_json::Value>,
}
247
/// Request body for creating a persona; also the type of the optional seed
/// persona in [`StartCognitionRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    /// NOTE(review): presumably an id is generated when this is `None` —
    /// confirm against `create_persona`.
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub parent_id: Option<String>,
    /// NOTE(review): presumably the runtime's default policy applies when this
    /// is `None` — confirm against `create_persona`.
    pub policy: Option<PersonaPolicy>,
    /// Defaults to empty when absent from serialized input.
    #[serde(default)]
    pub tags: Vec<String>,
}
261
/// Request body for spawning a persona.
///
/// Has the same fields as [`CreatePersonaRequest`] minus `parent_id` and `tags`.
/// NOTE(review): presumably the parent is supplied separately by the caller —
/// confirm against the spawn handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}
272
/// Request body for reaping a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    /// NOTE(review): presumably whether descendants are reaped as well; the
    /// behavior when `None` is not visible here — confirm.
    pub cascade: Option<bool>,
    /// Optional free-text reason.
    pub reason: Option<String>,
}
279
/// Optional parameters accepted by `CognitionRuntime::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// New loop interval in milliseconds; `start` raises values below 100 to 100.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the runtime has no personas yet. Ignored when
    /// personas already exist; when absent, `default_seed_persona()` is used.
    pub seed_persona: Option<CreatePersonaRequest>,
}
286
/// Optional parameters for stopping the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Optional free-text reason.
    pub reason: Option<String>,
}
292
/// What caused an [`AttentionItem`] to be queued. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttentionSource {
    /// A newly extracted belief contradicted an existing belief whose
    /// confidence was still at least 0.5 after the failed revalidation.
    ContestedBelief,
    FailedCheck,
    /// An active belief passed its `review_after` time and was decayed.
    StaleBelief,
    ProposalTimeout,
    FailedExecution,
}
305
/// An entry in the runtime's attention queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttentionItem {
    /// Item identifier (the loop uses a UUID v4 string).
    pub id: String,
    /// Human-readable topic, e.g. `"Revalidate belief: <claim>"` or
    /// `"Stale belief: <claim>"` for loop-created items.
    pub topic: String,
    /// For loop-created items: the key of the belief concerned.
    pub topic_tags: Vec<String>,
    /// The loop assigns 0.7 to contested-belief items and 0.4 to stale-belief
    /// items.
    pub priority: f32,
    pub source_type: AttentionSource,
    /// For loop-created items: the id of the belief concerned.
    pub source_id: String,
    /// `None` when the loop creates the item.
    pub assigned_persona: Option<String>,
    pub created_at: DateTime<Utc>,
    /// `None` when the loop creates the item.
    pub resolved_at: Option<DateTime<Utc>>,
}
319
/// Voting rules applied to proposals.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmGovernance {
    /// Fraction used to compute `Proposal::quorum_needed` at proposal creation.
    pub quorum_fraction: f32,
    /// NOTE(review): presumably the roles that must approve a proposal of each
    /// risk level — confirm against the voting logic.
    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
    /// NOTE(review): presumably the roles allowed to cast a veto — confirm.
    pub veto_roles: Vec<String>,
    /// Seconds added to the creation time to form `Proposal::vote_deadline`.
    pub vote_timeout_secs: u64,
}
328
329impl Default for SwarmGovernance {
330 fn default() -> Self {
331 Self {
332 quorum_fraction: 0.5,
333 required_approvers_by_role: HashMap::new(),
334 veto_roles: Vec::new(),
335 vote_timeout_secs: 300,
336 }
337 }
338}
339
/// Shared summary of the swarm's current state, restored from persisted state
/// when available.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalWorkspace {
    // NOTE(review): the code that fills these lists lies beyond the visible
    // part of the file; confirm whether entries are ids or display strings.
    pub top_beliefs: Vec<String>,
    pub top_uncertainties: Vec<String>,
    pub top_attention: Vec<String>,
    pub active_objectives: Vec<String>,
    pub updated_at: DateTime<Utc>,
}
349
350impl Default for GlobalWorkspace {
351 fn default() -> Self {
352 Self {
353 top_beliefs: Vec::new(),
354 top_uncertainties: Vec::new(),
355 top_attention: Vec::new(),
356 active_objectives: Vec::new(),
357 updated_at: Utc::now(),
358 }
359 }
360}
361
/// Status report of the cognition runtime, as returned by
/// `CognitionRuntime::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    pub enabled: bool,
    pub running: bool,
    pub loop_interval_ms: u64,
    pub started_at: Option<DateTime<Utc>>,
    pub last_tick_at: Option<DateTime<Utc>>,
    pub persona_count: usize,
    pub active_persona_count: usize,
    pub events_buffered: usize,
    pub snapshots_buffered: usize,
}
375
/// Result of a reap operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids of the personas that were reaped.
    pub reaped_ids: Vec<String>,
    /// NOTE(review): presumably equal to `reaped_ids.len()` — confirm.
    pub count: usize,
}
382
/// One persona in a [`LineageGraph`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    pub persona_id: String,
    pub parent_id: Option<String>,
    /// NOTE(review): presumably ids of direct children — confirm against the
    /// graph builder.
    pub children: Vec<String>,
    pub depth: u32,
    pub status: PersonaStatus,
}
392
/// Parent/child view over personas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    pub nodes: Vec<LineageNode>,
    /// NOTE(review): presumably ids of personas without a parent — confirm.
    pub roots: Vec<String>,
    pub total_edges: usize,
}
400
/// Step of the four-phase thought cycle a persona is in for a given tick.
///
/// The phase is derived from the persona's thought counter by
/// `ThoughtPhase::from_thought_count`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    /// Counter remainder 1 (mod 4).
    Observe,
    /// Counter remainder 2 (mod 4).
    Reflect,
    /// Counter remainder 3 (mod 4).
    Test,
    /// Counter remainder 0 (mod 4).
    Compress,
}
408
409impl ThoughtPhase {
410 fn from_thought_count(thought_count: u64) -> Self {
411 match thought_count % 4 {
412 1 => Self::Observe,
413 2 => Self::Reflect,
414 3 => Self::Test,
415 _ => Self::Compress,
416 }
417 }
418
419 fn as_str(&self) -> &'static str {
420 match self {
421 Self::Observe => "observe",
422 Self::Reflect => "reflect",
423 Self::Test => "test",
424 Self::Compress => "compress",
425 }
426 }
427
428 fn event_type(&self) -> ThoughtEventType {
429 match self {
430 Self::Observe => ThoughtEventType::ThoughtGenerated,
431 Self::Reflect => ThoughtEventType::HypothesisRaised,
432 Self::Test => ThoughtEventType::CheckRequested,
433 Self::Compress => ThoughtEventType::SnapshotCompressed,
434 }
435 }
436}
437
/// Copy of the persona fields needed to produce one thought.
///
/// The loop builds one item per scheduled persona while holding the personas
/// write lock, then processes the items after releasing it.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    persona_id: String,
    persona_name: String,
    role: String,
    charter: String,
    swarm_id: Option<String>,
    /// The persona's thought counter after it was incremented for this tick.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
448
/// Outcome of generating one thought for a work item.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Origin label; the loop treats the value `"fallback"` as a non-model
    /// thought (no belief extraction, no tool execution, no progress credit).
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    /// The thought text.
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    /// Charged against the persona's token budget (`None` counts as 0).
    total_tokens: Option<u32>,
    /// Charged against the persona's compute budget.
    latency_ms: u128,
    error: Option<String>,
}
461
/// Construction options for [`CognitionRuntime`].
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// When `false`, `CognitionRuntime::start` returns an error.
    pub enabled: bool,
    /// Loop interval in milliseconds; raised to at least 100 at construction.
    pub loop_interval_ms: u64,
    /// Raised to at least 32 at construction.
    pub max_events: usize,
    /// Raised to at least 8 at construction.
    pub max_snapshots: usize,
    pub default_policy: PersonaPolicy,
}
471
472impl Default for CognitionRuntimeOptions {
473 fn default() -> Self {
474 Self {
475 enabled: false,
476 loop_interval_ms: 2_000,
477 max_events: 2_000,
478 max_snapshots: 128,
479 default_policy: PersonaPolicy::default(),
480 }
481 }
482}
483
/// Runtime that owns persona state and drives the background cognition loop.
///
/// Most state sits behind `Arc` so the spawned loop task can share it.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// When `false`, `start` refuses to run.
    enabled: bool,
    /// Taken from the options, raised to at least 32.
    max_events: usize,
    /// Taken from the options, raised to at least 8.
    max_snapshots: usize,
    default_policy: PersonaPolicy,
    /// Set when the loop is started; the loop exits once it reads `false`.
    running: Arc<AtomicBool>,
    /// Loop interval in milliseconds; never stored below 100.
    loop_interval_ms: Arc<RwLock<u64>>,
    /// Set by `start` when it launches the loop.
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Updated at the beginning of every loop tick.
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Persona state keyed by persona id; restored from persisted state when
    /// available.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    /// Restored from persisted state when available.
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    // NOTE(review): presumably holds the handle of the task spawned by `start`
    // — the assignment is outside the visible part of the file.
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast sender (channel capacity 256) behind `subscribe_events`.
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// Optional thinker client; `None` when disabled or when initialization
    /// failed.
    thinker: Option<Arc<ThinkerClient>>,
    /// Beliefs keyed by belief id; restored from persisted state when available.
    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
    /// Restored from persisted state when available.
    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
    governance: Arc<RwLock<SwarmGovernance>>,
    /// Restored from persisted state when available.
    workspace: Arc<RwLock<GlobalWorkspace>>,
    /// `None` at construction; when present, the loop may execute tools during
    /// the Test phase.
    tools: Option<Arc<ToolRegistry>>,
    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
    // NOTE(review): key and value semantics are not visible here — confirm.
    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
}
511
512impl CognitionRuntime {
513 pub fn new_from_env() -> Self {
515 let mut options = CognitionRuntimeOptions::default();
516 options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
517 options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
518 options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
519 options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
520
521 options.default_policy = PersonaPolicy {
522 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
523 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
524 token_budget_per_minute: env_u32(
525 "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
526 20_000,
527 ),
528 compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
529 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
530 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
531 allowed_tools: Vec::new(),
532 };
533
534 let thinker_backend = thinker::ThinkerBackend::from_env(
535 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
536 .unwrap_or_else(|_| "openai_compat".to_string()),
537 );
538 let thinker_timeout_default = match thinker_backend {
539 thinker::ThinkerBackend::OpenAICompat => 30_000,
540 thinker::ThinkerBackend::Candle => 12_000,
541 thinker::ThinkerBackend::Bedrock => 60_000,
542 };
543 let thinker_config = ThinkerConfig {
544 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
545 backend: thinker_backend,
546 endpoint: normalize_thinker_endpoint(
547 &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
548 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
549 ),
550 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
551 .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
552 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
553 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
554 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
555 .ok()
556 .and_then(|v| v.parse::<f32>().ok()),
557 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
558 timeout_ms: env_u64(
559 "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
560 thinker_timeout_default,
561 ),
562 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
563 candle_tokenizer_path: std::env::var(
564 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
565 )
566 .ok(),
567 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
568 candle_device: thinker::CandleDevicePreference::from_env(
569 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
570 .unwrap_or_else(|_| "auto".to_string()),
571 ),
572 candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
573 candle_repeat_penalty: env_f32(
574 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
575 1.1,
576 ),
577 candle_repeat_last_n: env_usize(
578 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
579 64,
580 ),
581 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
582 bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
583 .unwrap_or_else(|_| {
584 std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-west-2".to_string())
585 }),
586 };
587
588 let mut runtime = Self::new_with_options(options);
589 runtime.thinker = Some(thinker_config).and_then(|cfg| {
591 if !cfg.enabled {
592 return None;
593 }
594 match ThinkerClient::new(cfg) {
595 Ok(client) => {
596 tracing::info!(
597 backend = ?client.config().backend,
598 endpoint = %client.config().endpoint,
599 model = %client.config().model,
600 "Cognition thinker initialized"
601 );
602 Some(Arc::new(client))
603 }
604 Err(error) => {
605 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
606 None
607 }
608 }
609 });
610 runtime
611 }
612
    /// Builds a runtime from explicit options, without a thinker client.
    ///
    /// Delegates to `new_with_options_and_thinker` with no thinker configuration.
    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
        Self::new_with_options_and_thinker(options, None)
    }
617
618 fn new_with_options_and_thinker(
619 options: CognitionRuntimeOptions,
620 thinker_config: Option<ThinkerConfig>,
621 ) -> Self {
622 let (event_tx, _) = broadcast::channel(256);
623 let thinker = thinker_config.and_then(|cfg| {
624 if !cfg.enabled {
625 return None;
626 }
627 match ThinkerClient::new(cfg) {
628 Ok(client) => {
629 tracing::info!(
630 backend = ?client.config().backend,
631 endpoint = %client.config().endpoint,
632 model = %client.config().model,
633 "Cognition thinker initialized"
634 );
635 Some(Arc::new(client))
636 }
637 Err(error) => {
638 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
639 None
640 }
641 }
642 });
643
644 let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
647 if let Some(persisted) = persistence::load_state() {
648 tracing::info!(
649 personas = persisted.personas.len(),
650 beliefs = persisted.beliefs.len(),
651 persisted_at = %persisted.persisted_at,
652 "Restoring persisted cognition state"
653 );
654 (
655 persisted.personas,
656 persisted.beliefs,
657 persisted.proposals,
658 persisted.attention_queue,
659 persisted.workspace,
660 )
661 } else {
662 (
663 HashMap::new(),
664 HashMap::new(),
665 HashMap::new(),
666 Vec::new(),
667 GlobalWorkspace::default(),
668 )
669 };
670
671 let runtime = Self {
672 enabled: options.enabled,
673 max_events: options.max_events.max(32),
674 max_snapshots: options.max_snapshots.max(8),
675 default_policy: options.default_policy,
676 running: Arc::new(AtomicBool::new(false)),
677 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
678 started_at: Arc::new(RwLock::new(None)),
679 last_tick_at: Arc::new(RwLock::new(None)),
680 personas: Arc::new(RwLock::new(init_personas)),
681 proposals: Arc::new(RwLock::new(init_proposals)),
682 events: Arc::new(RwLock::new(VecDeque::new())),
683 snapshots: Arc::new(RwLock::new(VecDeque::new())),
684 loop_handle: Arc::new(Mutex::new(None)),
685 event_tx,
686 thinker,
687 beliefs: Arc::new(RwLock::new(init_beliefs)),
688 attention_queue: Arc::new(RwLock::new(init_attention)),
689 governance: Arc::new(RwLock::new(SwarmGovernance::default())),
690 workspace: Arc::new(RwLock::new(init_workspace)),
691 tools: None,
692 receipts: Arc::new(RwLock::new(Vec::new())),
693 pending_approvals: Arc::new(RwLock::new(HashMap::new())),
694 };
695
696 runtime
697 }
698
    /// Returns whether cognition is enabled for this runtime, as fixed at
    /// construction.
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }
703
    /// Returns a new receiver subscribed to the runtime's broadcast channel of
    /// [`ThoughtEvent`]s.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
        self.event_tx.subscribe()
    }
708
709 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
711 if !self.enabled {
712 return Err(anyhow!(
713 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
714 ));
715 }
716
717 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
718 if let Some(request) = req {
719 if let Some(interval) = request.loop_interval_ms {
720 let mut lock = self.loop_interval_ms.write().await;
721 *lock = interval.max(100);
722 }
723 requested_seed_persona = request.seed_persona;
724 }
725
726 let has_personas = !self.personas.read().await.is_empty();
727 if !has_personas {
728 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
729 let _ = self.create_persona(seed).await?;
730 }
731
732 if self.running.load(Ordering::SeqCst) {
733 return Ok(self.status().await);
734 }
735
736 self.running.store(true, Ordering::SeqCst);
737 {
738 let mut started = self.started_at.write().await;
739 *started = Some(Utc::now());
740 }
741
742 let running = Arc::clone(&self.running);
743 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
744 let last_tick_at = Arc::clone(&self.last_tick_at);
745 let personas = Arc::clone(&self.personas);
746 let proposals = Arc::clone(&self.proposals);
747 let events = Arc::clone(&self.events);
748 let snapshots = Arc::clone(&self.snapshots);
749 let max_events = self.max_events;
750 let max_snapshots = self.max_snapshots;
751 let event_tx = self.event_tx.clone();
752 let thinker = self.thinker.clone();
753 let beliefs = Arc::clone(&self.beliefs);
754 let attention_queue = Arc::clone(&self.attention_queue);
755 let governance = Arc::clone(&self.governance);
756 let workspace = Arc::clone(&self.workspace);
757 let tools = self.tools.clone();
758 let receipts = Arc::clone(&self.receipts);
759 let pending_approvals = Arc::clone(&self.pending_approvals);
760
761 let handle = tokio::spawn(async move {
762 let mut next_tick = Instant::now();
763 while running.load(Ordering::SeqCst) {
764 let now_instant = Instant::now();
765 if now_instant < next_tick {
766 tokio::time::sleep_until(next_tick).await;
767 }
768 if !running.load(Ordering::SeqCst) {
769 break;
770 }
771
772 let now = Utc::now();
773 {
774 let mut lock = last_tick_at.write().await;
775 *lock = Some(now);
776 }
777
778 let mut new_events = Vec::new();
779 let mut new_snapshots = Vec::new();
780 let mut new_proposals = Vec::new();
781
782 let work_items: Vec<ThoughtWorkItem> = {
784 let mut map = personas.write().await;
785 let mut items = Vec::new();
786 for persona in map.values_mut() {
787 if persona.status != PersonaStatus::Active {
788 continue;
789 }
790
791 let window_elapsed = now
793 .signed_duration_since(persona.window_started_at)
794 .num_seconds();
795 if window_elapsed >= 60 {
796 persona.tokens_this_window = 0;
797 persona.compute_ms_this_window = 0;
798 persona.window_started_at = now;
799 }
800
801 let token_ok =
803 persona.tokens_this_window < persona.policy.token_budget_per_minute;
804 let compute_ok =
805 persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
806 if !token_ok || !compute_ok {
807 if !persona.budget_paused {
808 persona.budget_paused = true;
809 new_events.push(ThoughtEvent {
810 id: Uuid::new_v4().to_string(),
811 event_type: ThoughtEventType::BudgetPaused,
812 persona_id: Some(persona.identity.id.clone()),
813 swarm_id: persona.identity.swarm_id.clone(),
814 timestamp: now,
815 payload: json!({
816 "budget_paused": true,
817 "tokens_used": persona.tokens_this_window,
818 "compute_ms_used": persona.compute_ms_this_window,
819 "token_budget": persona.policy.token_budget_per_minute,
820 "compute_budget": persona.policy.compute_ms_per_minute,
821 }),
822 });
823 }
824 continue; }
826
827 persona.budget_paused = false;
828 persona.thought_count = persona.thought_count.saturating_add(1);
829 persona.last_tick_at = Some(now);
830 persona.updated_at = now;
831 items.push(ThoughtWorkItem {
832 persona_id: persona.identity.id.clone(),
833 persona_name: persona.identity.name.clone(),
834 role: persona.identity.role.clone(),
835 charter: persona.identity.charter.clone(),
836 swarm_id: persona.identity.swarm_id.clone(),
837 thought_count: persona.thought_count,
838 phase: ThoughtPhase::from_thought_count(persona.thought_count),
839 });
840 }
841 items
842 };
843
844 for work in &work_items {
845 let context = recent_persona_context(&events, &work.persona_id, 8).await;
846
847 let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
848
849 let event_timestamp = Utc::now();
850 let is_fallback = thought.source == "fallback";
851 let tokens_used = thought.total_tokens.unwrap_or(0);
852 let compute_used = thought.latency_ms as u32;
853
854 new_events.push(ThoughtEvent {
855 id: Uuid::new_v4().to_string(),
856 event_type: work.phase.event_type(),
857 persona_id: Some(work.persona_id.clone()),
858 swarm_id: work.swarm_id.clone(),
859 timestamp: event_timestamp,
860 payload: json!({
861 "phase": work.phase.as_str(),
862 "thought_count": work.thought_count,
863 "persona": {
864 "id": work.persona_id.clone(),
865 "name": work.persona_name.clone(),
866 "role": work.role.clone(),
867 },
868 "context_event_count": context.len(),
869 "thinking": thought.thinking.clone(),
870 "source": thought.source,
871 "model": thought.model.clone(),
872 "finish_reason": thought.finish_reason.clone(),
873 "usage": {
874 "prompt_tokens": thought.prompt_tokens,
875 "completion_tokens": thought.completion_tokens,
876 "total_tokens": thought.total_tokens,
877 },
878 "latency_ms": thought.latency_ms,
879 "error": thought.error.clone(),
880 }),
881 });
882
883 {
885 let mut map = personas.write().await;
886 if let Some(persona) = map.get_mut(&work.persona_id) {
887 persona.tokens_this_window =
888 persona.tokens_this_window.saturating_add(tokens_used);
889 persona.compute_ms_this_window =
890 persona.compute_ms_this_window.saturating_add(compute_used);
891 if !is_fallback {
892 persona.last_progress_at = Utc::now();
893 }
894 }
895 }
896
897 if work.phase == ThoughtPhase::Reflect && !is_fallback {
899 let extracted = beliefs::extract_beliefs_from_thought(
900 thinker.as_deref(),
901 &work.persona_id,
902 &thought.thinking,
903 )
904 .await;
905
906 if !extracted.is_empty() {
907 let mut belief_store = beliefs.write().await;
908 let mut attn_queue = attention_queue.write().await;
909 for mut new_belief in extracted {
910 let existing_id = belief_store
912 .values()
913 .find(|b| {
914 b.belief_key == new_belief.belief_key
915 && b.status != BeliefStatus::Invalidated
916 })
917 .map(|b| b.id.clone());
918
919 if let Some(eid) = existing_id {
920 if let Some(existing) = belief_store.get_mut(&eid) {
922 if !existing.confirmed_by.contains(&work.persona_id) {
923 existing.confirmed_by.push(work.persona_id.clone());
924 }
925 existing.revalidation_success();
926 }
927 } else {
928 let contest_targets: Vec<String> =
930 new_belief.contradicts.clone();
931 for target_key in &contest_targets {
932 if let Some(target) = belief_store.values_mut().find(|b| {
933 &b.belief_key == target_key
934 && b.status != BeliefStatus::Invalidated
935 }) {
936 target.contested_by.push(new_belief.id.clone());
937 if !new_belief.contradicts.contains(&target.belief_key)
938 {
939 new_belief
940 .contradicts
941 .push(target.belief_key.clone());
942 }
943 target.revalidation_failure();
945 if target.confidence >= 0.5 {
946 attn_queue.push(AttentionItem {
948 id: Uuid::new_v4().to_string(),
949 topic: format!(
950 "Revalidate belief: {}",
951 target.claim
952 ),
953 topic_tags: vec![target.belief_key.clone()],
954 priority: 0.7,
955 source_type: AttentionSource::ContestedBelief,
956 source_id: target.id.clone(),
957 assigned_persona: None,
958 created_at: Utc::now(),
959 resolved_at: None,
960 });
961 }
962 }
963 }
964
965 new_events.push(ThoughtEvent {
966 id: Uuid::new_v4().to_string(),
967 event_type: ThoughtEventType::BeliefExtracted,
968 persona_id: Some(work.persona_id.clone()),
969 swarm_id: work.swarm_id.clone(),
970 timestamp: Utc::now(),
971 payload: json!({
972 "belief_id": new_belief.id,
973 "belief_key": new_belief.belief_key,
974 "claim": trim_for_storage(&new_belief.claim, 280),
975 "confidence": new_belief.confidence,
976 }),
977 });
978
979 {
981 let mut map = personas.write().await;
982 if let Some(p) = map.get_mut(&work.persona_id) {
983 p.last_progress_at = Utc::now();
984 }
985 }
986
987 new_belief.clamp_confidence();
988 belief_store.insert(new_belief.id.clone(), new_belief);
989 }
990 }
991 }
992 }
993
994 if work.phase == ThoughtPhase::Test {
995 new_events.push(ThoughtEvent {
996 id: Uuid::new_v4().to_string(),
997 event_type: ThoughtEventType::CheckResult,
998 persona_id: Some(work.persona_id.clone()),
999 swarm_id: work.swarm_id.clone(),
1000 timestamp: Utc::now(),
1001 payload: json!({
1002 "phase": work.phase.as_str(),
1003 "thought_count": work.thought_count,
1004 "result_excerpt": trim_for_storage(&thought.thinking, 280),
1005 "source": thought.source,
1006 "model": thought.model,
1007 }),
1008 });
1009
1010 {
1012 let mut map = personas.write().await;
1013 if let Some(p) = map.get_mut(&work.persona_id) {
1014 p.last_progress_at = Utc::now();
1015 }
1016 }
1017
1018 if !is_fallback {
1020 if let Some(ref tool_registry) = tools {
1021 let allowed = {
1022 let map = personas.read().await;
1023 map.get(&work.persona_id)
1024 .map(|p| p.policy.allowed_tools.clone())
1025 .unwrap_or_default()
1026 };
1027 if !allowed.is_empty() {
1028 let tool_results = executor::execute_tool_requests(
1029 thinker.as_deref(),
1030 tool_registry,
1031 &work.persona_id,
1032 &thought.thinking,
1033 &allowed,
1034 )
1035 .await;
1036
1037 for result_event in tool_results {
1038 new_events.push(result_event);
1039 let mut map = personas.write().await;
1041 if let Some(p) = map.get_mut(&work.persona_id) {
1042 p.last_progress_at = Utc::now();
1043 }
1044 }
1045 }
1046 }
1047 }
1048 }
1049
1050 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1051 let gov = governance.read().await;
1052 let proposal = Proposal {
1053 id: Uuid::new_v4().to_string(),
1054 persona_id: work.persona_id.clone(),
1055 title: proposal_title_from_thought(
1056 &thought.thinking,
1057 work.thought_count,
1058 ),
1059 rationale: trim_for_storage(&thought.thinking, 900),
1060 evidence_refs: vec!["internal.thought_stream".to_string()],
1061 risk: ProposalRisk::Low,
1062 status: ProposalStatus::Created,
1063 created_at: Utc::now(),
1064 votes: HashMap::new(),
1065 vote_deadline: Some(
1066 Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1067 ),
1068 votes_requested: false,
1069 quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1070 as usize,
1071 };
1072
1073 new_events.push(ThoughtEvent {
1074 id: Uuid::new_v4().to_string(),
1075 event_type: ThoughtEventType::ProposalCreated,
1076 persona_id: Some(work.persona_id.clone()),
1077 swarm_id: work.swarm_id.clone(),
1078 timestamp: Utc::now(),
1079 payload: json!({
1080 "proposal_id": proposal.id,
1081 "title": proposal.title,
1082 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1083 "source": thought.source,
1084 "model": thought.model,
1085 }),
1086 });
1087
1088 new_proposals.push(proposal);
1089 }
1090
1091 if work.phase == ThoughtPhase::Compress {
1092 new_snapshots.push(MemorySnapshot {
1093 id: Uuid::new_v4().to_string(),
1094 generated_at: Utc::now(),
1095 swarm_id: work.swarm_id.clone(),
1096 persona_scope: vec![work.persona_id.clone()],
1097 summary: trim_for_storage(&thought.thinking, 1_500),
1098 hot_event_count: context.len(),
1099 warm_fact_count: estimate_fact_count(&thought.thinking),
1100 cold_snapshot_count: 1,
1101 metadata: HashMap::from([
1102 (
1103 "phase".to_string(),
1104 serde_json::Value::String(work.phase.as_str().to_string()),
1105 ),
1106 (
1107 "role".to_string(),
1108 serde_json::Value::String(work.role.clone()),
1109 ),
1110 (
1111 "source".to_string(),
1112 serde_json::Value::String(thought.source.to_string()),
1113 ),
1114 (
1115 "model".to_string(),
1116 serde_json::Value::String(
1117 thought
1118 .model
1119 .clone()
1120 .unwrap_or_else(|| "fallback".to_string()),
1121 ),
1122 ),
1123 (
1124 "completion_tokens".to_string(),
1125 serde_json::Value::Number(serde_json::Number::from(
1126 thought.completion_tokens.unwrap_or(0) as u64,
1127 )),
1128 ),
1129 ]),
1130 });
1131
1132 {
1134 let mut belief_store = beliefs.write().await;
1135 let mut attn_queue = attention_queue.write().await;
1136 let stale_ids: Vec<String> = belief_store
1137 .values()
1138 .filter(|b| {
1139 b.status == BeliefStatus::Active && now > b.review_after
1140 })
1141 .map(|b| b.id.clone())
1142 .collect();
1143 for id in stale_ids {
1144 if let Some(belief) = belief_store.get_mut(&id) {
1145 belief.decay();
1146 attn_queue.push(AttentionItem {
1147 id: Uuid::new_v4().to_string(),
1148 topic: format!("Stale belief: {}", belief.claim),
1149 topic_tags: vec![belief.belief_key.clone()],
1150 priority: 0.4,
1151 source_type: AttentionSource::StaleBelief,
1152 source_id: belief.id.clone(),
1153 assigned_persona: None,
1154 created_at: now,
1155 resolved_at: None,
1156 });
1157 }
1158 }
1159 }
1160
1161 {
1163 let belief_store = beliefs.read().await;
1164 let attn_queue = attention_queue.read().await;
1165
1166 let mut sorted_beliefs: Vec<&Belief> = belief_store
1167 .values()
1168 .filter(|b| b.status == BeliefStatus::Active)
1169 .collect();
1170 sorted_beliefs.sort_by(|a, b| {
1171 let score_a = a.confidence
1172 * (1.0
1173 / (1.0
1174 + now.signed_duration_since(a.updated_at).num_minutes()
1175 as f32));
1176 let score_b = b.confidence
1177 * (1.0
1178 / (1.0
1179 + now.signed_duration_since(b.updated_at).num_minutes()
1180 as f32));
1181 score_b
1182 .partial_cmp(&score_a)
1183 .unwrap_or(std::cmp::Ordering::Equal)
1184 });
1185
1186 let top_beliefs: Vec<String> = sorted_beliefs
1187 .iter()
1188 .take(10)
1189 .map(|b| b.id.clone())
1190 .collect();
1191 let top_uncertainties: Vec<String> = {
1192 let mut uncertain: Vec<&Belief> = belief_store
1193 .values()
1194 .filter(|b| {
1195 b.status == BeliefStatus::Stale
1196 || !b.contested_by.is_empty()
1197 })
1198 .collect();
1199 uncertain.sort_by(|a, b| {
1202 let a_contested = !a.contested_by.is_empty();
1203 let b_contested = !b.contested_by.is_empty();
1204 b_contested
1205 .cmp(&a_contested)
1206 .then_with(|| {
1207 a.confidence
1208 .partial_cmp(&b.confidence)
1209 .unwrap_or(std::cmp::Ordering::Equal)
1210 })
1211 .then_with(|| a.updated_at.cmp(&b.updated_at))
1212 });
1213 uncertain
1214 .iter()
1215 .take(5)
1216 .map(|b| {
1217 format!(
1218 "[{}] {}",
1219 b.belief_key,
1220 trim_for_storage(&b.claim, 80)
1221 )
1222 })
1223 .collect()
1224 };
1225
1226 let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1227 .iter()
1228 .filter(|a| a.resolved_at.is_none())
1229 .collect();
1230 sorted_attn.sort_by(|a, b| {
1231 b.priority
1232 .partial_cmp(&a.priority)
1233 .unwrap_or(std::cmp::Ordering::Equal)
1234 });
1235 let top_attention: Vec<String> =
1236 sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1237
1238 let mut ws = workspace.write().await;
1239 ws.top_beliefs = top_beliefs;
1240 ws.top_uncertainties = top_uncertainties;
1241 ws.top_attention = top_attention;
1242 ws.updated_at = now;
1243 }
1244
1245 new_events.push(ThoughtEvent {
1246 id: Uuid::new_v4().to_string(),
1247 event_type: ThoughtEventType::WorkspaceUpdated,
1248 persona_id: Some(work.persona_id.clone()),
1249 swarm_id: work.swarm_id.clone(),
1250 timestamp: Utc::now(),
1251 payload: json!({ "updated": true }),
1252 });
1253 }
1254 }
1255
1256 {
1258 let gov = governance.read().await;
1259 let mut proposal_store = proposals.write().await;
1260 let persona_map = personas.read().await;
1261
1262 let proposal_ids: Vec<String> = proposal_store
1263 .values()
1264 .filter(|p| p.status == ProposalStatus::Created)
1265 .map(|p| p.id.clone())
1266 .collect();
1267
1268 let mut attn_queue = attention_queue.write().await;
1269 for pid in proposal_ids {
1270 if let Some(proposal) = proposal_store.get_mut(&pid) {
1271 let quorum_needed = proposal.quorum_needed.max(1);
1272
1273 if let Some(deadline) = proposal.vote_deadline {
1275 if now > deadline {
1276 if proposal.votes.len() < quorum_needed {
1277 attn_queue.push(AttentionItem {
1279 id: Uuid::new_v4().to_string(),
1280 topic: format!(
1281 "Proposal vote timeout: {}",
1282 proposal.title
1283 ),
1284 topic_tags: Vec::new(),
1285 priority: 0.6,
1286 source_type: AttentionSource::ProposalTimeout,
1287 source_id: proposal.id.clone(),
1288 assigned_persona: None,
1289 created_at: now,
1290 resolved_at: None,
1291 });
1292 proposal.status = ProposalStatus::Rejected;
1293 continue;
1294 }
1295
1296 let required_roles = gov
1298 .required_approvers_by_role
1299 .get(&proposal.risk)
1300 .cloned()
1301 .unwrap_or_default();
1302 let all_required_met = required_roles.iter().all(|role| {
1303 proposal.votes.iter().any(|(vid, vote)| {
1304 *vote == ProposalVote::Approve
1305 && persona_map
1306 .get(vid)
1307 .map(|p| &p.identity.role == role)
1308 .unwrap_or(false)
1309 })
1310 });
1311 if !all_required_met {
1312 attn_queue.push(AttentionItem {
1313 id: Uuid::new_v4().to_string(),
1314 topic: format!(
1315 "Missing required approvers: {}",
1316 proposal.title
1317 ),
1318 topic_tags: Vec::new(),
1319 priority: 0.7,
1320 source_type: AttentionSource::ProposalTimeout,
1321 source_id: proposal.id.clone(),
1322 assigned_persona: None,
1323 created_at: now,
1324 resolved_at: None,
1325 });
1326 proposal.status = ProposalStatus::Rejected;
1327 continue;
1328 }
1329 }
1330 }
1331
1332 if proposal.votes.len() >= quorum_needed {
1334 let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1336 if *vote != ProposalVote::Veto {
1337 return false;
1338 }
1339 if let Some(voter) = persona_map.get(voter_id) {
1340 gov.veto_roles.contains(&voter.identity.role)
1341 } else {
1342 false
1343 }
1344 });
1345
1346 if vetoed {
1347 proposal.status = ProposalStatus::Rejected;
1348 continue;
1349 }
1350
1351 let required_roles = gov
1353 .required_approvers_by_role
1354 .get(&proposal.risk)
1355 .cloned()
1356 .unwrap_or_default();
1357 let all_required_met = required_roles.iter().all(|role| {
1358 proposal.votes.iter().any(|(vid, vote)| {
1359 *vote == ProposalVote::Approve
1360 && persona_map
1361 .get(vid)
1362 .map(|p| &p.identity.role == role)
1363 .unwrap_or(false)
1364 })
1365 });
1366
1367 if !all_required_met {
1368 continue; }
1370
1371 let approvals = proposal
1373 .votes
1374 .values()
1375 .filter(|v| **v == ProposalVote::Approve)
1376 .count();
1377 let rejections = proposal
1378 .votes
1379 .values()
1380 .filter(|v| **v == ProposalVote::Reject)
1381 .count();
1382
1383 if approvals > rejections {
1384 proposal.status = ProposalStatus::Verified;
1385 } else {
1386 proposal.status = ProposalStatus::Rejected;
1387 }
1388 }
1389 }
1390 }
1391 }
1392
1393 {
1395 let mut proposal_store = proposals.write().await;
1396 let verified_ids: Vec<String> = proposal_store
1397 .values()
1398 .filter(|p| p.status == ProposalStatus::Verified)
1399 .map(|p| p.id.clone())
1400 .collect();
1401
1402 for pid in verified_ids {
1403 if let Some(proposal) = proposal_store.get_mut(&pid) {
1404 if proposal.risk == ProposalRisk::Critical {
1405 let approved = {
1407 let approvals = pending_approvals.read().await;
1408 approvals.get(&pid).copied().unwrap_or(false)
1409 };
1410 if !approved {
1411 let mut approvals = pending_approvals.write().await;
1413 approvals.entry(pid.clone()).or_insert(false);
1414 continue;
1415 }
1416 }
1417
1418 let receipt = executor::DecisionReceipt {
1420 id: Uuid::new_v4().to_string(),
1421 proposal_id: pid.clone(),
1422 inputs: proposal.evidence_refs.clone(),
1423 governance_decision: format!(
1424 "Approved with {} votes",
1425 proposal.votes.len()
1426 ),
1427 capability_leases: Vec::new(),
1428 tool_invocations: Vec::new(),
1429 outcome: executor::ExecutionOutcome::Success {
1430 summary: format!("Proposal '{}' executed", proposal.title),
1431 },
1432 created_at: Utc::now(),
1433 };
1434
1435 new_events.push(ThoughtEvent {
1436 id: Uuid::new_v4().to_string(),
1437 event_type: ThoughtEventType::ActionExecuted,
1438 persona_id: Some(proposal.persona_id.clone()),
1439 swarm_id: None,
1440 timestamp: Utc::now(),
1441 payload: json!({
1442 "receipt_id": receipt.id,
1443 "proposal_id": pid,
1444 "outcome": "success",
1445 "summary": format!("Proposal '{}' executed", proposal.title),
1446 }),
1447 });
1448
1449 receipts.write().await.push(receipt);
1450 proposal.status = ProposalStatus::Executed;
1451 }
1452 }
1453 }
1454
1455 if !new_proposals.is_empty() {
1456 let mut proposal_store = proposals.write().await;
1457 for proposal in new_proposals {
1458 proposal_store.insert(proposal.id.clone(), proposal);
1459 }
1460 }
1461
1462 for event in new_events {
1463 push_event_internal(&events, max_events, &event_tx, event).await;
1464 }
1465 for snapshot in new_snapshots {
1466 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1467 }
1468
1469 {
1471 let mut map = personas.write().await;
1472 let idle_ids: Vec<String> = map
1473 .values()
1474 .filter(|p| {
1475 p.status == PersonaStatus::Active
1476 && !p.budget_paused
1477 && now.signed_duration_since(p.last_progress_at).num_seconds()
1478 > p.policy.idle_ttl_secs as i64
1479 })
1480 .map(|p| p.identity.id.clone())
1481 .collect();
1482
1483 for id in &idle_ids {
1484 if let Some(persona) = map.get_mut(id) {
1485 persona.status = PersonaStatus::Reaped;
1486 persona.updated_at = now;
1487 }
1488 let children: Vec<String> = map
1490 .values()
1491 .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1492 .map(|p| p.identity.id.clone())
1493 .collect();
1494 for child_id in children {
1495 if let Some(child) = map.get_mut(&child_id) {
1496 child.status = PersonaStatus::Reaped;
1497 child.updated_at = now;
1498 }
1499 }
1500 }
1501 drop(map);
1502
1503 for id in idle_ids {
1504 push_event_internal(
1505 &events,
1506 max_events,
1507 &event_tx,
1508 ThoughtEvent {
1509 id: Uuid::new_v4().to_string(),
1510 event_type: ThoughtEventType::IdleReaped,
1511 persona_id: Some(id),
1512 swarm_id: None,
1513 timestamp: now,
1514 payload: json!({ "reason": "idle_ttl_expired" }),
1515 },
1516 )
1517 .await;
1518 }
1519 }
1520
1521 if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1523 let _ = persistence::save_state(
1524 &personas,
1525 &proposals,
1526 &beliefs,
1527 &attention_queue,
1528 &workspace,
1529 &events,
1530 &snapshots,
1531 )
1532 .await;
1533 }
1534
1535 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1536 next_tick += interval;
1537 let tick_completed = Instant::now();
1538 if tick_completed > next_tick {
1539 next_tick = tick_completed;
1540 }
1541 }
1542 });
1543
1544 {
1545 let mut lock = self.loop_handle.lock().await;
1546 *lock = Some(handle);
1547 }
1548
1549 Ok(self.status().await)
1550 }
1551
1552 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1554 self.running.store(false, Ordering::SeqCst);
1555
1556 if let Some(handle) = self.loop_handle.lock().await.take() {
1557 handle.abort();
1558 let _ = handle.await;
1559 }
1560
1561 if let Some(reason_value) = reason {
1562 let event = ThoughtEvent {
1563 id: Uuid::new_v4().to_string(),
1564 event_type: ThoughtEventType::CheckResult,
1565 persona_id: None,
1566 swarm_id: None,
1567 timestamp: Utc::now(),
1568 payload: json!({ "stopped": true, "reason": reason_value }),
1569 };
1570 self.push_event(event).await;
1571 }
1572
1573 Ok(self.status().await)
1574 }
1575
1576 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1578 let now = Utc::now();
1579 let mut personas = self.personas.write().await;
1580
1581 let mut parent_swarm_id = None;
1582 let mut computed_depth = 0_u32;
1583 let mut inherited_policy = None;
1584
1585 if let Some(parent_id) = req.parent_id.clone() {
1586 let parent = personas
1587 .get(&parent_id)
1588 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1589
1590 if parent.status == PersonaStatus::Reaped {
1591 return Err(anyhow!("Parent persona {} is reaped", parent_id));
1592 }
1593
1594 parent_swarm_id = parent.identity.swarm_id.clone();
1595 computed_depth = parent.identity.depth.saturating_add(1);
1596 inherited_policy = Some(parent.policy.clone());
1597 let branch_limit = parent.policy.max_branching_factor;
1598
1599 let child_count = personas
1600 .values()
1601 .filter(|p| {
1602 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1603 && p.status != PersonaStatus::Reaped
1604 })
1605 .count();
1606
1607 if child_count as u32 >= branch_limit {
1608 return Err(anyhow!(
1609 "Parent {} reached branching limit {}",
1610 parent_id,
1611 branch_limit
1612 ));
1613 }
1614 }
1615
1616 let policy = req
1617 .policy
1618 .clone()
1619 .or(inherited_policy.clone())
1620 .unwrap_or_else(|| self.default_policy.clone());
1621
1622 let effective_depth_limit = inherited_policy
1623 .as_ref()
1624 .map(|p| p.max_spawn_depth)
1625 .unwrap_or(policy.max_spawn_depth);
1626
1627 if computed_depth > effective_depth_limit {
1628 return Err(anyhow!(
1629 "Spawn depth {} exceeds limit {}",
1630 computed_depth,
1631 effective_depth_limit
1632 ));
1633 }
1634
1635 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1636 if personas.contains_key(&persona_id) {
1637 return Err(anyhow!("Persona id already exists: {}", persona_id));
1638 }
1639
1640 let identity = PersonaIdentity {
1641 id: persona_id.clone(),
1642 name: req.name,
1643 role: req.role,
1644 charter: req.charter,
1645 swarm_id: req.swarm_id.or(parent_swarm_id),
1646 parent_id: req.parent_id,
1647 depth: computed_depth,
1648 created_at: now,
1649 tags: req.tags,
1650 };
1651
1652 let persona = PersonaRuntimeState {
1653 identity,
1654 policy,
1655 status: PersonaStatus::Active,
1656 thought_count: 0,
1657 last_tick_at: None,
1658 updated_at: now,
1659 tokens_this_window: 0,
1660 compute_ms_this_window: 0,
1661 window_started_at: now,
1662 last_progress_at: now,
1663 budget_paused: false,
1664 };
1665
1666 personas.insert(persona_id, persona.clone());
1667 drop(personas);
1668
1669 self.push_event(ThoughtEvent {
1670 id: Uuid::new_v4().to_string(),
1671 event_type: ThoughtEventType::PersonaSpawned,
1672 persona_id: Some(persona.identity.id.clone()),
1673 swarm_id: persona.identity.swarm_id.clone(),
1674 timestamp: now,
1675 payload: json!({
1676 "name": persona.identity.name,
1677 "role": persona.identity.role,
1678 "depth": persona.identity.depth,
1679 }),
1680 })
1681 .await;
1682
1683 Ok(persona)
1684 }
1685
1686 pub async fn spawn_child(
1688 &self,
1689 parent_id: &str,
1690 req: SpawnPersonaRequest,
1691 ) -> Result<PersonaRuntimeState> {
1692 let request = CreatePersonaRequest {
1693 persona_id: req.persona_id,
1694 name: req.name,
1695 role: req.role,
1696 charter: req.charter,
1697 swarm_id: req.swarm_id,
1698 parent_id: Some(parent_id.to_string()),
1699 policy: req.policy,
1700 tags: Vec::new(),
1701 };
1702 self.create_persona(request).await
1703 }
1704
1705 pub async fn reap_persona(
1707 &self,
1708 persona_id: &str,
1709 req: ReapPersonaRequest,
1710 ) -> Result<ReapPersonaResponse> {
1711 let cascade = req.cascade.unwrap_or(false);
1712 let now = Utc::now();
1713
1714 let mut personas = self.personas.write().await;
1715 if !personas.contains_key(persona_id) {
1716 return Err(anyhow!("Persona not found: {}", persona_id));
1717 }
1718
1719 let mut reaped_ids = vec![persona_id.to_string()];
1720 if cascade {
1721 let mut idx = 0usize;
1722 while idx < reaped_ids.len() {
1723 let current = reaped_ids[idx].clone();
1724 let children: Vec<String> = personas
1725 .values()
1726 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1727 .map(|p| p.identity.id.clone())
1728 .collect();
1729 for child in children {
1730 if !reaped_ids.iter().any(|existing| existing == &child) {
1731 reaped_ids.push(child);
1732 }
1733 }
1734 idx += 1;
1735 }
1736 }
1737
1738 for id in &reaped_ids {
1739 if let Some(persona) = personas.get_mut(id) {
1740 persona.status = PersonaStatus::Reaped;
1741 persona.updated_at = now;
1742 }
1743 }
1744 drop(personas);
1745
1746 for id in &reaped_ids {
1747 self.push_event(ThoughtEvent {
1748 id: Uuid::new_v4().to_string(),
1749 event_type: ThoughtEventType::PersonaReaped,
1750 persona_id: Some(id.clone()),
1751 swarm_id: None,
1752 timestamp: now,
1753 payload: json!({
1754 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1755 "cascade": cascade,
1756 }),
1757 })
1758 .await;
1759 }
1760
1761 Ok(ReapPersonaResponse {
1762 count: reaped_ids.len(),
1763 reaped_ids,
1764 })
1765 }
1766
1767 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1769 self.snapshots.read().await.back().cloned()
1770 }
1771
1772 pub async fn lineage_graph(&self) -> LineageGraph {
1774 let personas = self.personas.read().await;
1775 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1776 let mut roots = Vec::new();
1777 let mut total_edges = 0usize;
1778
1779 for persona in personas.values() {
1780 if let Some(parent_id) = persona.identity.parent_id.clone() {
1781 children_by_parent
1782 .entry(parent_id)
1783 .or_default()
1784 .push(persona.identity.id.clone());
1785 total_edges = total_edges.saturating_add(1);
1786 } else {
1787 roots.push(persona.identity.id.clone());
1788 }
1789 }
1790
1791 let mut nodes: Vec<LineageNode> = personas
1792 .values()
1793 .map(|persona| {
1794 let mut children = children_by_parent
1795 .get(&persona.identity.id)
1796 .cloned()
1797 .unwrap_or_default();
1798 children.sort();
1799
1800 LineageNode {
1801 persona_id: persona.identity.id.clone(),
1802 parent_id: persona.identity.parent_id.clone(),
1803 children,
1804 depth: persona.identity.depth,
1805 status: persona.status,
1806 }
1807 })
1808 .collect();
1809
1810 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1811 roots.sort();
1812
1813 LineageGraph {
1814 nodes,
1815 roots,
1816 total_edges,
1817 }
1818 }
1819
1820 pub async fn status(&self) -> CognitionStatus {
1822 let personas = self.personas.read().await;
1823 let events = self.events.read().await;
1824 let snapshots = self.snapshots.read().await;
1825 let started_at = *self.started_at.read().await;
1826 let last_tick_at = *self.last_tick_at.read().await;
1827 let loop_interval_ms = *self.loop_interval_ms.read().await;
1828
1829 let active_persona_count = personas
1830 .values()
1831 .filter(|p| p.status == PersonaStatus::Active)
1832 .count();
1833
1834 CognitionStatus {
1835 enabled: self.enabled,
1836 running: self.running.load(Ordering::SeqCst),
1837 loop_interval_ms,
1838 started_at,
1839 last_tick_at,
1840 persona_count: personas.len(),
1841 active_persona_count,
1842 events_buffered: events.len(),
1843 snapshots_buffered: snapshots.len(),
1844 }
1845 }
1846
1847 async fn push_event(&self, event: ThoughtEvent) {
1848 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1849 }
1850
1851 pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1853 self.tools = Some(registry);
1854 }
1855
1856 pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1858 self.beliefs.read().await.clone()
1859 }
1860
1861 pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1863 self.beliefs.read().await.get(id).cloned()
1864 }
1865
1866 pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1868 self.attention_queue.read().await.clone()
1869 }
1870
1871 pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1873 self.proposals.read().await.clone()
1874 }
1875
1876 pub async fn get_workspace(&self) -> GlobalWorkspace {
1878 self.workspace.read().await.clone()
1879 }
1880
1881 pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1883 self.receipts.read().await.clone()
1884 }
1885
1886 pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1888 let proposals = self.proposals.read().await;
1889 let proposal = proposals
1890 .get(proposal_id)
1891 .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1892
1893 if proposal.risk != ProposalRisk::Critical {
1894 return Err(anyhow!("Only Critical proposals require human approval"));
1895 }
1896 if proposal.status != ProposalStatus::Verified {
1897 return Err(anyhow!("Proposal is not in Verified status"));
1898 }
1899 drop(proposals);
1900
1901 let mut approvals = self.pending_approvals.write().await;
1902 approvals.insert(proposal_id.to_string(), true);
1903 Ok(())
1904 }
1905
1906 pub async fn get_governance(&self) -> SwarmGovernance {
1908 self.governance.read().await.clone()
1909 }
1910
1911 pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1913 self.personas.read().await.get(id).cloned()
1914 }
1915}
1916
1917async fn push_event_internal(
1918 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1919 max_events: usize,
1920 event_tx: &broadcast::Sender<ThoughtEvent>,
1921 event: ThoughtEvent,
1922) {
1923 {
1924 let mut lock = events.write().await;
1925 lock.push_back(event.clone());
1926 while lock.len() > max_events {
1927 lock.pop_front();
1928 }
1929 }
1930 let _ = event_tx.send(event);
1931}
1932
1933async fn push_snapshot_internal(
1934 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1935 max_snapshots: usize,
1936 snapshot: MemorySnapshot,
1937) {
1938 let mut lock = snapshots.write().await;
1939 lock.push_back(snapshot);
1940 while lock.len() > max_snapshots {
1941 lock.pop_front();
1942 }
1943}
1944
1945async fn recent_persona_context(
1946 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1947 persona_id: &str,
1948 limit: usize,
1949) -> Vec<ThoughtEvent> {
1950 let lock = events.read().await;
1951 let mut selected: Vec<ThoughtEvent> = lock
1952 .iter()
1953 .rev()
1954 .filter(|event| {
1955 event.persona_id.as_deref() == Some(persona_id)
1956 || (event.persona_id.is_none()
1957 && matches!(
1958 event.event_type,
1959 ThoughtEventType::CheckResult
1960 | ThoughtEventType::ProposalCreated
1961 | ThoughtEventType::SnapshotCompressed
1962 | ThoughtEventType::WorkspaceUpdated
1963 | ThoughtEventType::ActionExecuted
1964 | ThoughtEventType::BudgetPaused
1965 ))
1966 })
1967 .take(limit)
1968 .cloned()
1969 .collect();
1970 selected.reverse();
1971 selected
1972}
1973
1974async fn generate_phase_thought(
1975 thinker: Option<&ThinkerClient>,
1976 work: &ThoughtWorkItem,
1977 context: &[ThoughtEvent],
1978) -> ThoughtResult {
1979 let started_at = Instant::now();
1980 if let Some(client) = thinker {
1981 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1982 match client.think(&system_prompt, &user_prompt).await {
1983 Ok(output) => {
1984 let thinking = normalize_thought_output(work, context, &output.text);
1985 if !thinking.is_empty() {
1986 return ThoughtResult {
1987 source: "model",
1988 model: Some(output.model),
1989 finish_reason: output.finish_reason,
1990 thinking,
1991 prompt_tokens: output.prompt_tokens,
1992 completion_tokens: output.completion_tokens,
1993 total_tokens: output.total_tokens,
1994 latency_ms: started_at.elapsed().as_millis(),
1995 error: None,
1996 };
1997 }
1998 }
1999 Err(error) => {
2000 return ThoughtResult {
2001 source: "fallback",
2002 model: None,
2003 finish_reason: None,
2004 thinking: fallback_phase_text(work, context),
2005 prompt_tokens: None,
2006 completion_tokens: None,
2007 total_tokens: None,
2008 latency_ms: started_at.elapsed().as_millis(),
2009 error: Some(error.to_string()),
2010 };
2011 }
2012 }
2013 }
2014
2015 ThoughtResult {
2016 source: "fallback",
2017 model: None,
2018 finish_reason: None,
2019 thinking: fallback_phase_text(work, context),
2020 prompt_tokens: None,
2021 completion_tokens: None,
2022 total_tokens: None,
2023 latency_ms: started_at.elapsed().as_millis(),
2024 error: None,
2025 }
2026}
2027
2028fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2029 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2030Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2031Write as an operational process update, not meta narration. \
2032Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2033Output concrete findings, checks, risks, and next actions. \
2034Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2035 .to_string();
2036
2037 let context_lines = if context.is_empty() {
2038 "none".to_string()
2039 } else {
2040 context
2041 .iter()
2042 .map(format_context_event)
2043 .collect::<Vec<_>>()
2044 .join("\n")
2045 };
2046
2047 let phase_instruction = match work.phase {
2048 ThoughtPhase::Observe => {
2049 "Process format (exact line labels): \
2050Phase: Observe | Goal: detect current customer/business risk | \
2051Signals: 1-3 concrete signals separated by '; ' | \
2052Uncertainty: one unknown that blocks confidence | \
2053Next_Action: one immediate operational action."
2054 }
2055 ThoughtPhase::Reflect => {
2056 "Process format (exact line labels): \
2057Phase: Reflect | Hypothesis: single testable hypothesis | \
2058Rationale: why this is likely | \
2059Business_Risk: customer/revenue/SLA impact | \
2060Validation_Next_Action: one action to confirm or falsify."
2061 }
2062 ThoughtPhase::Test => {
2063 "Process format (exact line labels): \
2064Phase: Test | Check: single concrete check | \
2065Procedure: short executable procedure | \
2066Expected_Result: pass/fail expectation | \
2067Evidence_Quality: low|medium|high with reason | \
2068Escalation_Trigger: when to escalate immediately."
2069 }
2070 ThoughtPhase::Compress => {
2071 "Process format (exact line labels): \
2072Phase: Compress | State_Summary: current state in one line | \
2073Retained_Facts: 3 short facts separated by '; ' | \
2074Open_Risks: up to 2 unresolved risks separated by '; ' | \
2075Next_Process_Step: next operational step."
2076 }
2077 };
2078
2079 let user_prompt = format!(
2080 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2081 phase = work.phase.as_str(),
2082 persona_id = work.persona_id,
2083 persona_name = work.persona_name,
2084 role = work.role,
2085 charter = work.charter,
2086 thought_count = work.thought_count,
2087 context = context_lines,
2088 instruction = phase_instruction
2089 );
2090
2091 (system_prompt, user_prompt)
2092}
2093
2094fn format_context_event(event: &ThoughtEvent) -> String {
2095 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2096 format!(
2097 "{} {} {}",
2098 event.event_type.as_str(),
2099 event.timestamp.to_rfc3339(),
2100 trim_for_storage(&payload, 220)
2101 )
2102}
2103
2104fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2105 let charter = trim_for_storage(&work.charter, 180);
2106 let context_summary = fallback_context_summary(context);
2107 let thought = match work.phase {
2108 ThoughtPhase::Observe => format!(
2109 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2110 work.role, charter, context_summary
2111 ),
2112 ThoughtPhase::Reflect => format!(
2113 "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2114 ),
2115 ThoughtPhase::Test => format!(
2116 "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2117 ),
2118 ThoughtPhase::Compress => format!(
2119 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2120 work.role, charter, context_summary
2121 ),
2122 };
2123 trim_for_storage(&thought, 1_200)
2124}
2125
2126fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2127 let trimmed = trim_for_storage(raw, 2_000);
2128 if trimmed.trim().is_empty() {
2129 return fallback_phase_text(work, context);
2130 }
2131
2132 if let Some(idx) = find_process_label_start(&trimmed) {
2134 let candidate = trimmed[idx..].trim();
2135 if !candidate.is_empty()
2137 && !candidate.contains('<')
2138 && !has_template_placeholder_values(candidate)
2139 {
2140 let collapsed: String = candidate
2142 .lines()
2143 .map(str::trim)
2144 .filter(|l| !l.is_empty())
2145 .collect::<Vec<_>>()
2146 .join(" | ");
2147 let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2148 if cleaned.starts_with("Phase:") {
2149 return cleaned.to_string();
2150 }
2151 return collapsed;
2152 }
2153 }
2154
2155 let lower = trimmed.to_ascii_lowercase();
2156 let looks_meta = lower.starts_with("we need")
2157 || lower.starts_with("i need")
2158 || lower.contains("we need to")
2159 || lower.contains("i need to")
2160 || lower.contains("must output")
2161 || lower.contains("let's ")
2162 || lower.contains("we have to");
2163
2164 if looks_meta || has_template_placeholder_values(&trimmed) {
2165 return fallback_phase_text(work, context);
2166 }
2167
2168 trimmed
2169}
2170
/// Reports whether `text` still contains unfilled template placeholders.
///
/// The check is a case-insensitive (ASCII) substring search for any known
/// field label followed by `": ..."`, or for the generic markers `<...`, `tbd`
/// and `todo`.
fn has_template_placeholder_values(text: &str) -> bool {
    // Field labels whose value was left as a literal "...".
    const UNFILLED_FIELDS: &[&str] = &[
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];
    // Markers that count as placeholders wherever they appear.
    const GENERIC_MARKERS: &[&str] = &["<...", "tbd", "todo"];

    let haystack = text.to_ascii_lowercase();
    for needle in UNFILLED_FIELDS.iter().chain(GENERIC_MARKERS) {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
2198
/// Returns the byte offset of the first `Phase:` label in `text`, or `None`
/// when there is none.
///
/// The specific labels (`Phase: Observe`, `Phase: Reflect`, `Phase: Test`,
/// `Phase: Compress`) all begin with `Phase:`, so the earliest occurrence of
/// the bare prefix is also the earliest occurrence of any of them.
fn find_process_label_start(text: &str) -> Option<usize> {
    const LABEL_PREFIX: &str = "Phase:";
    text.find(LABEL_PREFIX)
}
2211
2212fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2213 if context.is_empty() {
2214 return "No prior events recorded yet.".to_string();
2215 }
2216
2217 let mut latest_error: Option<String> = None;
2218 let mut latest_proposal: Option<String> = None;
2219 let mut latest_check: Option<String> = None;
2220
2221 for event in context.iter().rev() {
2222 if latest_error.is_none()
2223 && let Some(error) = event
2224 .payload
2225 .get("error")
2226 .and_then(serde_json::Value::as_str)
2227 && !error.trim().is_empty()
2228 {
2229 latest_error = Some(trim_for_storage(error, 140));
2230 }
2231
2232 if latest_proposal.is_none()
2233 && event.event_type == ThoughtEventType::ProposalCreated
2234 && let Some(title) = event
2235 .payload
2236 .get("title")
2237 .and_then(serde_json::Value::as_str)
2238 && !title.trim().is_empty()
2239 && !has_template_placeholder_values(title)
2240 {
2241 latest_proposal = Some(trim_for_storage(title, 120));
2242 }
2243
2244 if latest_check.is_none()
2245 && event.event_type == ThoughtEventType::CheckResult
2246 && let Some(result) = event
2247 .payload
2248 .get("result_excerpt")
2249 .and_then(serde_json::Value::as_str)
2250 && !result.trim().is_empty()
2251 && !has_template_placeholder_values(result)
2252 {
2253 latest_check = Some(trim_for_storage(result, 140));
2254 }
2255
2256 if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2257 break;
2258 }
2259 }
2260
2261 let mut lines = vec![format!(
2262 "{} recent cognition events are available.",
2263 context.len()
2264 )];
2265 if let Some(error) = latest_error {
2266 lines.push(format!("Latest error signal: {}.", error));
2267 }
2268 if let Some(proposal) = latest_proposal {
2269 lines.push(format!("Recent proposal: {}.", proposal));
2270 }
2271 if let Some(check) = latest_check {
2272 lines.push(format!("Recent check: {}.", check));
2273 }
2274 lines.join(" ")
2275}
2276
/// Trims `input` and caps it at `max_chars` characters for storage.
///
/// Surrounding whitespace is removed *before* the length is measured, so
/// padding never counts against the budget and never causes a spurious
/// ellipsis on text that actually fits. When the trimmed text is longer than
/// `max_chars` characters, the first `max_chars` characters are kept, any
/// whitespace left dangling at the cut is dropped, and `...` is appended.
///
/// Lengths are counted in `char`s, not bytes, so multi-byte text is never
/// split inside a code point. The returned string is at most
/// `max_chars + 3` characters long.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let trimmed = input.trim();

    // `nth(max_chars)` finds the byte offset of the first character past the
    // budget in a single pass; `None` means the whole text fits.
    match trimmed.char_indices().nth(max_chars) {
        None => trimmed.to_string(),
        Some((cut, _)) => {
            let head = trimmed[..cut].trim_end();
            let mut shortened = String::with_capacity(head.len() + 3);
            shortened.push_str(head);
            shortened.push_str("...");
            shortened
        }
    }
}
2288
/// Roughly estimates how many facts `text` holds by counting sentence
/// terminators (`.`, `!`, `?`).
///
/// Every terminator character counts individually, and the total is clamped
/// to the range `1..=12`, so text without punctuation still counts as one fact.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
2294
2295fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2296 let first_line = thought
2297 .lines()
2298 .find(|line| !line.trim().is_empty())
2299 .unwrap_or("proposal");
2300 let compact = first_line
2301 .replace(['\t', '\r', '\n'], " ")
2302 .split_whitespace()
2303 .collect::<Vec<_>>()
2304 .join(" ");
2305 let trimmed = trim_for_storage(&compact, 72);
2306 if trimmed.is_empty() {
2307 format!("proposal-{}", thought_count)
2308 } else {
2309 trimmed
2310 }
2311}
2312
2313fn default_seed_persona() -> CreatePersonaRequest {
2314 CreatePersonaRequest {
2315 persona_id: Some("root-thinker".to_string()),
2316 name: "root-thinker".to_string(),
2317 role: "orchestrator".to_string(),
2318 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2319 .to_string(),
2320 swarm_id: Some("swarm-core".to_string()),
2321 parent_id: None,
2322 policy: None,
2323 tags: vec!["orchestration".to_string()],
2324 }
2325}
2326
/// Normalizes a thinker base URL into a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are stripped first. Then:
///
/// * an empty result falls back to the local default endpoint,
/// * a URL that already ends in `/chat/completions` is returned as is,
/// * anything else gets `/chat/completions` appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    const COMPLETIONS_SUFFIX: &str = "/chat/completions";
    const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:11434/v1/chat/completions";

    match base_url.trim().trim_end_matches('/') {
        "" => DEFAULT_ENDPOINT.to_owned(),
        complete if complete.ends_with(COMPLETIONS_SUFFIX) => complete.to_owned(),
        base => [base, COMPLETIONS_SUFFIX].concat(),
    }
}
2337
/// Reads the environment variable `name` as a boolean flag.
///
/// Accepted values, compared case-insensitively after trimming surrounding
/// whitespace, are `1`/`true`/`yes`/`on` for `true` and `0`/`false`/`no`/`off`
/// for `false`. Trimming matters because values coming from `.env` files or
/// quoted shell assignments often carry stray spaces or a trailing newline;
/// without it such values would silently fall back to `default`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.trim().to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
2348
/// Reads the environment variable `name` and parses it as `T`.
///
/// Surrounding whitespace is trimmed before parsing so values such as
/// `" 42 "` or `"0.25\n"` are accepted. Returns `default` when the variable
/// is unset, is not valid Unicode, or does not parse as `T`.
fn env_parse_or<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<T>().ok())
        .unwrap_or(default)
}

/// Reads `name` from the environment as an `f32`, falling back to `default`
/// when it is unset or unparsable.
fn env_f32(name: &str, default: f32) -> f32 {
    env_parse_or(name, default)
}

/// Reads `name` from the environment as a `u64`, falling back to `default`
/// when it is unset or unparsable (including negative values).
fn env_u64(name: &str, default: u64) -> u64 {
    env_parse_or(name, default)
}

/// Reads `name` from the environment as a `u32`, falling back to `default`
/// when it is unset or unparsable (including negative or out-of-range values).
fn env_u32(name: &str, default: u32) -> u32 {
    env_parse_or(name, default)
}

/// Reads `name` from the environment as a `usize`, falling back to `default`
/// when it is unset or unparsable (including negative values).
fn env_usize(name: &str, default: usize) -> usize {
    env_parse_or(name, default)
}
2376
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a work item for `phase` with fixed persona metadata, used by
    /// the thought-normalization tests.
    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
        ThoughtWorkItem {
            persona_id: String::from("p-1"),
            persona_name: String::from("Spotlessbinco Business Thinker"),
            role: String::from("principal reliability engineer"),
            charter: String::from(
                "Continuously think about /home/riley/spotlessbinco as a production business system.",
            ),
            swarm_id: Some(String::from("spotlessbinco")),
            thought_count: 4,
            phase,
        }
    }

    /// Policy shared by all tests: spawn depth and branching factor of 2, no
    /// shared memory, no allowed tools. Budgets and idle TTL vary per test.
    fn policy_with(
        token_budget_per_minute: u32,
        compute_ms_per_minute: u32,
        idle_ttl_secs: u64,
    ) -> PersonaPolicy {
        PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_budget_per_minute,
            compute_ms_per_minute,
            idle_ttl_secs,
            share_memory: false,
            allowed_tools: Vec::new(),
        }
    }

    /// Default runtime for tests: enabled, 25 ms loop interval, modest
    /// budgets and a 300 s idle TTL.
    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: policy_with(1_000, 1_000, 300),
        })
    }

    /// Runtime with a 10 ms loop interval and the given default policy.
    fn fast_runtime(default_policy: PersonaPolicy) -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 10,
            max_events: 256,
            max_snapshots: 32,
            default_policy,
        })
    }

    /// Creation request for a parentless persona whose name equals its id,
    /// with no explicit policy and no tags.
    fn persona_request(
        id: &str,
        role: &str,
        charter: &str,
        swarm_id: Option<&str>,
    ) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: swarm_id.map(str::to_string),
            parent_id: None,
            policy: None,
            tags: Vec::new(),
        }
    }

    /// Spawn request for a child whose name equals its id, with no swarm or
    /// policy override.
    fn spawn_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    /// Governance used by the voting tests: 50% quorum, no per-role approver
    /// requirements, `auditor` as the only veto role, 300 s vote timeout.
    fn auditor_veto_governance() -> SwarmGovernance {
        SwarmGovernance {
            quorum_fraction: 0.5,
            required_approvers_by_role: HashMap::new(),
            veto_roles: vec!["auditor".to_string()],
            vote_timeout_secs: 300,
        }
    }

    /// Low-risk proposal in `Created` status with the given votes already
    /// cast, votes requested, a quorum of 1 and a deadline 300 s from now.
    fn voting_proposal(
        id: &str,
        persona_id: &str,
        title: &str,
        rationale: &str,
        votes: impl IntoIterator<Item = (&'static str, ProposalVote)>,
    ) -> Proposal {
        Proposal {
            id: id.to_string(),
            persona_id: persona_id.to_string(),
            title: title.to_string(),
            rationale: rationale.to_string(),
            evidence_refs: Vec::new(),
            risk: ProposalRisk::Low,
            status: ProposalStatus::Created,
            created_at: Utc::now(),
            votes: votes
                .into_iter()
                .map(|(voter, vote)| (voter.to_string(), vote))
                .collect(),
            vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
            votes_requested: true,
            quorum_needed: 1,
        }
    }

    /// Moves the persona's `last_progress_at` `seconds` into the past, if the
    /// persona exists. The write lock is released when this returns.
    async fn backdate_progress(runtime: &CognitionRuntime, persona_id: &str, seconds: i64) {
        let mut personas = runtime.personas.write().await;
        if let Some(persona) = personas.get_mut(persona_id) {
            persona.last_progress_at = Utc::now() - ChronoDuration::seconds(seconds);
        }
    }

    /// Starts the runtime with default start options, lets it run for
    /// `millis` milliseconds, then stops it.
    async fn run_loop_for(runtime: &CognitionRuntime, millis: u64) {
        runtime.start(None).await.expect("should start");
        tokio::time::sleep(Duration::from_millis(millis)).await;
        runtime.stop(None).await.expect("should stop");
    }

    /// A process line made only of `...` placeholders must be replaced by
    /// generated content.
    #[test]
    fn normalize_rejects_placeholder_process_line() {
        let work = sample_work_item(ThoughtPhase::Compress);
        let placeholder_line = "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...";

        let output = normalize_thought_output(&work, &[], placeholder_line);

        assert!(
            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
        );
        assert!(!output.contains("State_Summary: ..."));
    }

    /// A fully concrete process line must pass through unchanged.
    #[test]
    fn normalize_accepts_concrete_process_line() {
        let work = sample_work_item(ThoughtPhase::Test);
        let concrete_line = "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes";

        let output = normalize_thought_output(&work, &[], concrete_line);

        assert_eq!(output, concrete_line);
    }

    /// Creating a root and spawning one child yields depths 0 and 1 and a
    /// lineage graph with one edge rooted at `root`.
    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");
        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", spawn_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");
        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    /// With a branching factor of 2 and a maximum spawn depth of 2, a third
    /// child of `root` and a depth-3 descendant must both be rejected.
    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();
        let worker = |id: &str| spawn_request(id, "worker", "run");

        runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");

        runtime
            .spawn_child("root", worker("c1"))
            .await
            .expect("first child should spawn");
        runtime
            .spawn_child("root", worker("c2"))
            .await
            .expect("second child should spawn");

        // Third direct child exceeds the branching factor.
        let third_child = runtime.spawn_child("root", worker("c3")).await;
        assert!(third_child.is_err());

        runtime
            .spawn_child("c1", worker("c1-1"))
            .await
            .expect("depth 2 should be allowed");

        // A grandchild of `c1` would sit at depth 3.
        let depth_violation = runtime.spawn_child("c1-1", worker("c1-1-1")).await;
        assert!(depth_violation.is_err());
    }

    /// Starting with a seed persona reports `running` with buffered events;
    /// stopping clears `running`.
    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();
        let request = StartCognitionRequest {
            loop_interval_ms: Some(10),
            seed_persona: Some(persona_request(
                "seed",
                "watcher",
                "observe",
                Some("swarm-seed"),
            )),
        };

        runtime
            .start(Some(request))
            .await
            .expect("runtime should start");

        // Give the loop time to tick a few times before sampling status.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }

    /// A persona whose token and compute budgets are both zero must never
    /// think and must end up flagged as budget-paused.
    #[tokio::test]
    async fn zero_budget_persona_is_skipped() {
        let runtime = fast_runtime(policy_with(0, 0, 3_600));

        let persona = runtime
            .create_persona(persona_request(
                "budget-test",
                "tester",
                "test budgets",
                None,
            ))
            .await
            .expect("should create persona");
        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);

        run_loop_for(&runtime, 50).await;

        let p = runtime.get_persona("budget-test").await.unwrap();
        assert_eq!(p.thought_count, 0);
        assert!(p.budget_paused);
    }

    /// Checks that a budget window started 61 s ago counts as elapsed.
    ///
    /// NOTE(review): the counters are zeroed by the test itself before being
    /// asserted, so the last two assertions only confirm the assignments made
    /// here. They do not exercise any runtime reset logic. Consider driving
    /// the runtime's own window-reset path instead.
    #[tokio::test]
    async fn budget_counters_reset_after_window() {
        let now = Utc::now();
        let identity = PersonaIdentity {
            id: "p1".to_string(),
            name: "test".to_string(),
            role: "tester".to_string(),
            charter: "test".to_string(),
            swarm_id: None,
            parent_id: None,
            depth: 0,
            created_at: now,
            tags: Vec::new(),
        };
        let mut persona = PersonaRuntimeState {
            identity,
            policy: PersonaPolicy::default(),
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
            tokens_this_window: 5000,
            compute_ms_this_window: 3000,
            window_started_at: now - ChronoDuration::seconds(61),
            last_progress_at: now,
            budget_paused: false,
        };

        let window_elapsed = now
            .signed_duration_since(persona.window_started_at)
            .num_seconds();
        assert!(window_elapsed >= 60);

        // Manual reset of the window.
        persona.tokens_this_window = 0;
        persona.compute_ms_this_window = 0;
        persona.window_started_at = now;

        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);
    }

    /// With a zero idle TTL and progress backdated by 10 s, the persona must
    /// be reaped after the loop has run.
    #[tokio::test]
    async fn idle_persona_is_reaped() {
        let runtime = fast_runtime(policy_with(20_000, 10_000, 0));

        runtime
            .create_persona(persona_request("idle-test", "idler", "idle away", None))
            .await
            .expect("should create persona");

        backdate_progress(&runtime, "idle-test", 10).await;
        run_loop_for(&runtime, 100).await;

        let p = runtime.get_persona("idle-test").await.unwrap();
        assert_eq!(p.status, PersonaStatus::Reaped);
    }

    /// A persona paused for budget reasons must stay `Active` even though
    /// its idle TTL is zero and its progress is backdated.
    #[tokio::test]
    async fn budget_paused_persona_not_reaped_for_idle() {
        let runtime = fast_runtime(policy_with(0, 0, 0));

        runtime
            .create_persona(persona_request("paused-test", "pauser", "pause", None))
            .await
            .expect("should create persona");

        backdate_progress(&runtime, "paused-test", 10).await;
        run_loop_for(&runtime, 100).await;

        let p = runtime.get_persona("paused-test").await.unwrap();
        assert_eq!(p.status, PersonaStatus::Active);
        assert!(p.budget_paused);
    }

    /// One approval out of two engineers, with a quorum of 1, must move the
    /// proposal to `Verified` or `Executed`.
    #[tokio::test]
    async fn governance_proposal_resolution() {
        let runtime = test_runtime();
        *runtime.governance.write().await = auditor_veto_governance();

        for voter in ["voter-1", "voter-2"] {
            runtime
                .create_persona(persona_request(voter, "engineer", "vote", None))
                .await
                .unwrap();
        }

        // Seed the proposal directly; the guard is dropped before the loop starts.
        {
            let mut proposals = runtime.proposals.write().await;
            proposals.insert(
                "prop-1".to_string(),
                voting_proposal(
                    "prop-1",
                    "voter-1",
                    "test proposal",
                    "testing governance",
                    [("voter-1", ProposalVote::Approve)],
                ),
            );
        }

        runtime.start(None).await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        runtime.stop(None).await.unwrap();

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-1").unwrap();
        assert!(
            matches!(
                prop.status,
                ProposalStatus::Verified | ProposalStatus::Executed
            ),
            "Expected Verified or Executed, got {:?}",
            prop.status
        );
    }

    /// A veto cast by a persona with the `auditor` role must reject the
    /// proposal despite an approval from an engineer.
    #[tokio::test]
    async fn veto_rejects_proposal() {
        let runtime = test_runtime();
        *runtime.governance.write().await = auditor_veto_governance();

        runtime
            .create_persona(persona_request("eng", "engineer", "build", None))
            .await
            .unwrap();
        runtime
            .create_persona(persona_request("aud", "auditor", "audit", None))
            .await
            .unwrap();

        // Seed the proposal directly; the guard is dropped before the loop starts.
        {
            let mut proposals = runtime.proposals.write().await;
            proposals.insert(
                "prop-veto".to_string(),
                voting_proposal(
                    "prop-veto",
                    "eng",
                    "vetoed proposal",
                    "testing veto",
                    [
                        ("eng", ProposalVote::Approve),
                        ("aud", ProposalVote::Veto),
                    ],
                ),
            );
        }

        runtime.start(None).await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        runtime.stop(None).await.unwrap();

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-veto").unwrap();
        assert_eq!(prop.status, ProposalStatus::Rejected);
    }

    /// A default workspace starts with no beliefs, uncertainties or
    /// attention items.
    #[test]
    fn global_workspace_default() {
        let workspace = GlobalWorkspace::default();
        assert!(workspace.top_beliefs.is_empty());
        assert!(workspace.top_uncertainties.is_empty());
        assert!(workspace.top_attention.is_empty());
    }

    /// An attention item can be built field by field and keeps the values
    /// it was given.
    #[test]
    fn attention_item_creation() {
        let item = AttentionItem {
            id: String::from("a1"),
            topic: String::from("test topic"),
            topic_tags: vec![String::from("reliability")],
            priority: 0.8,
            source_type: AttentionSource::ContestedBelief,
            source_id: String::from("b1"),
            assigned_persona: None,
            created_at: Utc::now(),
            resolved_at: None,
        };
        assert!(item.resolved_at.is_none());
        assert_eq!(item.priority, 0.8);
    }
}