1pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13pub mod tool_router;
14
15pub use thinker::{
16 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
17};
18
19use crate::tool::ToolRegistry;
20use anyhow::{Result, anyhow};
21use beliefs::{Belief, BeliefStatus};
22use chrono::{DateTime, Duration as ChronoDuration, Utc};
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::{HashMap, VecDeque};
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::time::Duration;
29use tokio::sync::{Mutex, RwLock, broadcast};
30use tokio::task::JoinHandle;
31
32const _: () = {
34 fn _assert_types_used() {
35 let _ = std::mem::size_of::<CandleDevicePreference>();
36 let _ = std::mem::size_of::<ThinkerBackend>();
37 let _ = std::mem::size_of::<ThinkerOutput>();
38 }
39};
40use tokio::time::Instant;
41use uuid::Uuid;
42
/// Lifecycle state of a persona; serialized as a snake_case string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// Eligible for scheduling: the cognition loop skips personas in any other state.
    Active,
    /// Not scheduled. NOTE(review): the transitions into this state are not visible here.
    Idle,
    /// Presumably a persona that has been reaped — TODO confirm against the reap logic.
    Reaped,
    /// Presumably a persona that hit an unrecoverable failure — TODO confirm.
    Error,
}
52
/// Per-persona limits and permissions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    /// Spawn-depth limit. NOTE(review): presumably compared against
    /// `PersonaIdentity::depth` when spawning; enforcement is not visible here.
    pub max_spawn_depth: u32,
    /// Branching limit. NOTE(review): presumably the maximum number of direct children.
    pub max_branching_factor: u32,
    /// Token budget per window. The cognition loop resets the window once it is
    /// at least 60 seconds old and skips the persona while
    /// `tokens_this_window` has reached this value.
    pub token_budget_per_minute: u32,
    /// Compute budget (milliseconds of thinker latency) per window, enforced the
    /// same way as the token budget.
    pub compute_ms_per_minute: u32,
    /// Idle time-to-live in seconds. NOTE(review): presumably drives idle reaping — confirm.
    pub idle_ttl_secs: u64,
    /// Memory-sharing flag. NOTE(review): its consumer is not visible here.
    pub share_memory: bool,
    /// Tools passed to the executor during the Test phase; when empty, no tool
    /// requests are executed. Defaults to empty when absent from serialized input.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
}
65
66impl Default for PersonaPolicy {
67 fn default() -> Self {
68 Self {
69 max_spawn_depth: 4,
70 max_branching_factor: 4,
71 token_budget_per_minute: 20_000,
72 compute_ms_per_minute: 10_000,
73 idle_ttl_secs: 3_600,
74 share_memory: false,
75 allowed_tools: Vec::new(),
76 }
77 }
78}
79
/// Descriptive identity of a persona, stored inside its runtime state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Identifier used to reference the persona in work items and events.
    pub id: String,
    /// Persona name; copied into thought work items and event payloads.
    pub name: String,
    /// Role label; copied into thought work items and event payloads.
    pub role: String,
    /// Charter text; copied into thought work items.
    pub charter: String,
    /// Optional swarm identifier; copied onto events emitted for this persona.
    pub swarm_id: Option<String>,
    /// Optional parent identifier. NOTE(review): presumably the spawning persona's id.
    pub parent_id: Option<String>,
    /// Lineage depth. NOTE(review): presumably 0 for root personas — confirm.
    pub depth: u32,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Free-form tags; defaults to empty when absent from serialized input.
    #[serde(default)]
    pub tags: Vec<String>,
}
94
/// Mutable runtime record for one persona: identity, policy, and scheduling/budget counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    /// Who the persona is.
    pub identity: PersonaIdentity,
    /// Limits applied to the persona.
    pub policy: PersonaPolicy,
    /// Lifecycle state; only `Active` personas are scheduled by the loop.
    pub status: PersonaStatus,
    /// Number of times the loop has scheduled this persona (saturating increment).
    pub thought_count: u64,
    /// Timestamp of the most recent tick in which the persona was scheduled.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Last modification time; refreshed whenever the persona is scheduled.
    pub updated_at: DateTime<Utc>,
    /// Tokens consumed in the current budget window.
    pub tokens_this_window: u32,
    /// Thinker latency (ms) accumulated in the current budget window.
    pub compute_ms_this_window: u32,
    /// Start of the current budget window; the loop resets the window and both
    /// counters once it is at least 60 seconds old.
    pub window_started_at: DateTime<Utc>,
    /// Refreshed by the loop when the persona produces a non-fallback thought,
    /// a new belief, a check result, or a tool result.
    pub last_progress_at: DateTime<Utc>,
    /// Set while the persona is skipped for exceeding a budget and cleared the
    /// next time it is scheduled. Defaults to `false` when absent from serialized input.
    #[serde(default)]
    pub budget_paused: bool,
}
116
/// Risk classification of a proposal; serialized as a snake_case string.
///
/// Derives `Hash` because it is used as a key in
/// `SwarmGovernance::required_approvers_by_role`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    /// Risk level assigned to proposals created by the cognition loop.
    Low,
    Medium,
    High,
    Critical,
}
126
/// Lifecycle state of a proposal; serialized as a snake_case string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    /// Initial state of every proposal created by the cognition loop.
    Created,
    /// NOTE(review): presumably set once the proposal passes verification/voting — confirm.
    Verified,
    /// NOTE(review): presumably set when the proposal is voted down or vetoed — confirm.
    Rejected,
    /// NOTE(review): presumably set after the proposed action has run — confirm.
    Executed,
}
136
/// A single vote on a proposal; serialized as a snake_case string.
///
/// NOTE(review): how votes are tallied (including `Veto` and `Abstain`) is not
/// visible in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalVote {
    Approve,
    Reject,
    Veto,
    Abstain,
}
146
/// A proposal raised by a persona, together with its voting state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    /// Proposal identifier (a UUID string when created by the cognition loop).
    pub id: String,
    /// Id of the persona that raised the proposal.
    pub persona_id: String,
    /// Short title; the loop derives it from the thought text.
    pub title: String,
    /// Rationale text; the loop stores a trimmed copy of the thought text.
    pub rationale: String,
    /// References to supporting evidence.
    pub evidence_refs: Vec<String>,
    /// Risk classification.
    pub risk: ProposalRisk,
    /// Current lifecycle state.
    pub status: ProposalStatus,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Votes cast so far. NOTE(review): keys are presumably voter persona ids — confirm.
    /// Defaults to empty when absent from serialized input.
    #[serde(default)]
    pub votes: HashMap<String, ProposalVote>,
    /// Voting deadline; the loop sets it to creation time plus the governance
    /// `vote_timeout_secs`.
    pub vote_deadline: Option<DateTime<Utc>>,
    /// Whether votes have been requested yet; `false` on creation and when
    /// absent from serialized input.
    #[serde(default)]
    pub votes_requested: bool,
    /// Number of votes needed for quorum; the loop computes it as
    /// `ceil(scheduled personas * quorum_fraction)`. Defaults to 0 when absent
    /// from serialized input.
    #[serde(default)]
    pub quorum_needed: usize,
}
170
/// Kind of event recorded in the thought stream; serialized as a snake_case string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Emitted for a thought produced in the Observe phase.
    ThoughtGenerated,
    /// Emitted for a thought produced in the Reflect phase.
    HypothesisRaised,
    /// Emitted for a thought produced in the Test phase.
    CheckRequested,
    /// Emitted right after a Test-phase thought, carrying an excerpt of it.
    CheckResult,
    /// Emitted when the loop creates a proposal.
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    /// Emitted for a thought produced in the Compress phase.
    SnapshotCompressed,
    /// Emitted when a new belief is extracted from a Reflect-phase thought.
    BeliefExtracted,
    BeliefContested,
    BeliefRevalidated,
    /// Emitted once when a persona first exceeds its token or compute budget.
    BudgetPaused,
    IdleReaped,
    AttentionCreated,
    VoteCast,
    WorkspaceUpdated,
}
195
196impl ThoughtEventType {
197 fn as_str(&self) -> &'static str {
198 match self {
199 Self::ThoughtGenerated => "thought_generated",
200 Self::HypothesisRaised => "hypothesis_raised",
201 Self::CheckRequested => "check_requested",
202 Self::CheckResult => "check_result",
203 Self::ProposalCreated => "proposal_created",
204 Self::ProposalVerified => "proposal_verified",
205 Self::ProposalRejected => "proposal_rejected",
206 Self::ActionExecuted => "action_executed",
207 Self::PersonaSpawned => "persona_spawned",
208 Self::PersonaReaped => "persona_reaped",
209 Self::SnapshotCompressed => "snapshot_compressed",
210 Self::BeliefExtracted => "belief_extracted",
211 Self::BeliefContested => "belief_contested",
212 Self::BeliefRevalidated => "belief_revalidated",
213 Self::BudgetPaused => "budget_paused",
214 Self::IdleReaped => "idle_reaped",
215 Self::AttentionCreated => "attention_created",
216 Self::VoteCast => "vote_cast",
217 Self::WorkspaceUpdated => "workspace_updated",
218 }
219 }
220}
221
/// One entry in the thought stream; also the item type of the runtime's broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Event identifier (a UUID string for events created by the cognition loop).
    pub id: String,
    /// What kind of event this is.
    pub event_type: ThoughtEventType,
    /// Persona the event relates to, if any.
    pub persona_id: Option<String>,
    /// Swarm the event relates to, if any.
    pub swarm_id: Option<String>,
    /// When the event was created (UTC).
    pub timestamp: DateTime<Utc>,
    /// Event-specific JSON payload; its shape depends on `event_type`.
    pub payload: serde_json::Value,
}
232
/// A compressed memory snapshot; the cognition loop produces one per Compress-phase thought.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    /// Snapshot identifier (a UUID string when created by the loop).
    pub id: String,
    /// When the snapshot was generated (UTC).
    pub generated_at: DateTime<Utc>,
    /// Swarm of the persona that produced the snapshot, if any.
    pub swarm_id: Option<String>,
    /// Ids of the personas covered; the loop stores only the producing persona.
    pub persona_scope: Vec<String>,
    /// Summary text; the loop stores a trimmed copy of the thought text.
    pub summary: String,
    /// Number of recent context events that were available to the thought.
    pub hot_event_count: usize,
    /// Estimated number of facts in the thought text (via `estimate_fact_count`).
    pub warm_fact_count: usize,
    /// Snapshot count; the loop always records 1.
    pub cold_snapshot_count: usize,
    /// Extra attributes; the loop records `phase`, `role`, `source`, `model`
    /// and `completion_tokens`.
    pub metadata: HashMap<String, serde_json::Value>,
}
246
/// Request to create a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    /// Optional explicit id. NOTE(review): presumably generated when `None` — confirm.
    pub persona_id: Option<String>,
    /// Persona name.
    pub name: String,
    /// Persona role label.
    pub role: String,
    /// Persona charter text.
    pub charter: String,
    /// Optional swarm to attach the persona to.
    pub swarm_id: Option<String>,
    /// Optional parent persona id.
    pub parent_id: Option<String>,
    /// Optional policy. NOTE(review): presumably the runtime's default policy
    /// applies when `None` — confirm.
    pub policy: Option<PersonaPolicy>,
    /// Free-form tags; defaults to empty when absent from serialized input.
    #[serde(default)]
    pub tags: Vec<String>,
}
260
/// Request to spawn a persona.
///
/// Carries the same fields as `CreatePersonaRequest` minus `parent_id` and
/// `tags`. NOTE(review): the parent is presumably supplied by the caller
/// separately — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    /// Optional explicit id for the new persona.
    pub persona_id: Option<String>,
    /// Persona name.
    pub name: String,
    /// Persona role label.
    pub role: String,
    /// Persona charter text.
    pub charter: String,
    /// Optional swarm to attach the persona to.
    pub swarm_id: Option<String>,
    /// Optional policy for the new persona.
    pub policy: Option<PersonaPolicy>,
}
271
/// Request to reap a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    /// Cascade flag. NOTE(review): presumably also reaps descendants when `true`;
    /// the behaviour for `None` is not visible here.
    pub cascade: Option<bool>,
    /// Optional human-readable reason.
    pub reason: Option<String>,
}
278
/// Optional parameters for `CognitionRuntime::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// New loop interval in milliseconds; `start` raises values below 100 to 100.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the runtime has no personas yet; `start` falls
    /// back to a default seed persona when this is `None`.
    pub seed_persona: Option<CreatePersonaRequest>,
}
285
/// Optional parameters for stopping the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Optional human-readable reason for stopping.
    pub reason: Option<String>,
}
291
/// What caused an attention item to be created; serialized as a snake_case string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttentionSource {
    /// A newly extracted belief contradicted an existing one.
    ContestedBelief,
    FailedCheck,
    /// An active belief passed its `review_after` time.
    StaleBelief,
    ProposalTimeout,
    FailedExecution,
}
304
/// An entry in the runtime's attention queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttentionItem {
    /// Item identifier (a UUID string when created by the cognition loop).
    pub id: String,
    /// Human-readable topic, e.g. `"Stale belief: <claim>"`.
    pub topic: String,
    /// Tags for the topic; the loop stores the related belief key.
    pub topic_tags: Vec<String>,
    /// Priority; the loop uses 0.7 for contested beliefs and 0.4 for stale ones.
    /// NOTE(review): presumably higher means more urgent — confirm.
    pub priority: f32,
    /// What triggered the item.
    pub source_type: AttentionSource,
    /// Id of the triggering object (the belief id for belief-sourced items).
    pub source_id: String,
    /// Persona assigned to the item, if any; `None` on creation.
    pub assigned_persona: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Resolution timestamp; `None` on creation.
    pub resolved_at: Option<DateTime<Utc>>,
}
318
/// Voting rules applied to proposals.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmGovernance {
    /// Fraction used to derive `Proposal::quorum_needed`: the loop multiplies
    /// it by the number of personas scheduled in the tick and rounds up.
    pub quorum_fraction: f32,
    /// Roles that must approve, keyed by proposal risk.
    /// NOTE(review): enforcement is not visible in this part of the file.
    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
    /// Roles with veto rights. NOTE(review): enforcement is not visible here.
    pub veto_roles: Vec<String>,
    /// Seconds after creation at which a proposal's vote deadline is set.
    pub vote_timeout_secs: u64,
}
327
328impl Default for SwarmGovernance {
329 fn default() -> Self {
330 Self {
331 quorum_fraction: 0.5,
332 required_approvers_by_role: HashMap::new(),
333 veto_roles: Vec::new(),
334 vote_timeout_secs: 300,
335 }
336 }
337}
338
/// Shared summary of the most relevant cognition state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalWorkspace {
    /// Ids of the top-ranked active beliefs.
    pub top_beliefs: Vec<String>,
    /// Formatted `"[belief_key] claim"` lines for stale or contested beliefs.
    pub top_uncertainties: Vec<String>,
    /// Top attention entries. NOTE(review): the exact content is not visible here.
    pub top_attention: Vec<String>,
    /// Active objectives. NOTE(review): the producer is not visible here.
    pub active_objectives: Vec<String>,
    /// When the workspace was last updated (UTC).
    pub updated_at: DateTime<Utc>,
}
348
349impl Default for GlobalWorkspace {
350 fn default() -> Self {
351 Self {
352 top_beliefs: Vec::new(),
353 top_uncertainties: Vec::new(),
354 top_attention: Vec::new(),
355 active_objectives: Vec::new(),
356 updated_at: Utc::now(),
357 }
358 }
359}
360
/// Point-in-time status report of the cognition runtime, as returned by `start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    /// Whether the runtime is enabled at all.
    pub enabled: bool,
    /// Whether the background loop is currently running.
    pub running: bool,
    /// Current loop interval in milliseconds.
    pub loop_interval_ms: u64,
    /// When the loop was last started, if ever.
    pub started_at: Option<DateTime<Utc>>,
    /// When the loop last ticked, if ever.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Total number of personas known to the runtime.
    pub persona_count: usize,
    /// Number of personas counted as active.
    pub active_persona_count: usize,
    /// Number of events currently buffered.
    pub events_buffered: usize,
    /// Number of snapshots currently buffered.
    pub snapshots_buffered: usize,
}
374
/// Result of a reap operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids of the personas that were reaped.
    pub reaped_ids: Vec<String>,
    /// Count of reaped personas. NOTE(review): presumably `reaped_ids.len()` — confirm.
    pub count: usize,
}
381
/// One persona in the lineage graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Id of the persona this node represents.
    pub persona_id: String,
    /// Id of the parent persona, if any.
    pub parent_id: Option<String>,
    /// Ids of the child personas.
    pub children: Vec<String>,
    /// Lineage depth of the persona.
    pub depth: u32,
    /// Lifecycle state of the persona.
    pub status: PersonaStatus,
}
391
/// Parent/child relationships between personas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    /// All nodes in the graph.
    pub nodes: Vec<LineageNode>,
    /// Ids of root personas. NOTE(review): presumably those without a parent — confirm.
    pub roots: Vec<String>,
    /// Total number of parent-to-child edges.
    pub total_edges: usize,
}
399
/// Phase of a persona's four-step thought cycle, selected from its thought
/// count modulo 4.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    /// Thought count ≡ 1 (mod 4).
    Observe,
    /// Thought count ≡ 2 (mod 4).
    Reflect,
    /// Thought count ≡ 3 (mod 4).
    Test,
    /// Thought count ≡ 0 (mod 4).
    Compress,
}
407
408impl ThoughtPhase {
409 fn from_thought_count(thought_count: u64) -> Self {
410 match thought_count % 4 {
411 1 => Self::Observe,
412 2 => Self::Reflect,
413 3 => Self::Test,
414 _ => Self::Compress,
415 }
416 }
417
418 fn as_str(&self) -> &'static str {
419 match self {
420 Self::Observe => "observe",
421 Self::Reflect => "reflect",
422 Self::Test => "test",
423 Self::Compress => "compress",
424 }
425 }
426
427 fn event_type(&self) -> ThoughtEventType {
428 match self {
429 Self::Observe => ThoughtEventType::ThoughtGenerated,
430 Self::Reflect => ThoughtEventType::HypothesisRaised,
431 Self::Test => ThoughtEventType::CheckRequested,
432 Self::Compress => ThoughtEventType::SnapshotCompressed,
433 }
434 }
435}
436
/// Snapshot of the persona data needed to generate one thought.
///
/// The cognition loop builds these while holding the personas write lock and
/// runs the thought generation afterwards from the copies.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    /// Id of the persona to think for.
    persona_id: String,
    /// Persona name (copied from its identity).
    persona_name: String,
    /// Persona role (copied from its identity).
    role: String,
    /// Persona charter (copied from its identity).
    charter: String,
    /// Persona swarm, if any.
    swarm_id: Option<String>,
    /// Thought count after the increment performed for this tick.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
447
/// Outcome of generating one thought.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Origin label; the loop treats the value `"fallback"` as a non-model thought.
    source: &'static str,
    /// Model name, if one produced the thought.
    model: Option<String>,
    /// Finish reason reported for the thought, if any.
    finish_reason: Option<String>,
    /// The thought text.
    thinking: String,
    /// Prompt token count, if reported.
    prompt_tokens: Option<u32>,
    /// Completion token count, if reported.
    completion_tokens: Option<u32>,
    /// Total token count, if reported; charged against the persona's token budget.
    total_tokens: Option<u32>,
    /// Generation latency in milliseconds; charged against the compute budget.
    latency_ms: u128,
    /// Error text, if generation failed.
    error: Option<String>,
}
460
/// Construction-time options for `CognitionRuntime`.
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// Whether the runtime may be started.
    pub enabled: bool,
    /// Loop interval in milliseconds; the constructor raises values below 100 to 100.
    pub loop_interval_ms: u64,
    /// Event buffer limit; the constructor raises values below 32 to 32.
    pub max_events: usize,
    /// Snapshot buffer limit; the constructor raises values below 8 to 8.
    pub max_snapshots: usize,
    /// Policy stored as the runtime's default persona policy.
    pub default_policy: PersonaPolicy,
}
470
471impl Default for CognitionRuntimeOptions {
472 fn default() -> Self {
473 Self {
474 enabled: false,
475 loop_interval_ms: 2_000,
476 max_events: 2_000,
477 max_snapshots: 128,
478 default_policy: PersonaPolicy::default(),
479 }
480 }
481}
482
/// The cognition runtime: configuration plus the shared state used by the
/// background loop.
///
/// State lives behind `Arc` handles so `start` can clone them into the spawned
/// loop task.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// Whether the runtime may be started.
    enabled: bool,
    /// Event buffer limit (at least 32).
    max_events: usize,
    /// Snapshot buffer limit (at least 8).
    max_snapshots: usize,
    /// Default persona policy taken from the construction options.
    default_policy: PersonaPolicy,
    /// Set while the loop should keep running; the loop checks it every iteration.
    running: Arc<AtomicBool>,
    /// Current loop interval in milliseconds (at least 100).
    loop_interval_ms: Arc<RwLock<u64>>,
    /// When the loop was last started.
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// When the loop last ticked.
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Personas keyed by id; restored from persisted state when available.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    /// Proposals keyed by id; restored from persisted state when available.
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    /// Buffered thought events; starts empty.
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    /// Buffered memory snapshots; starts empty.
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    /// Handle of the spawned loop task, if any.
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast sender for thought events (channel created with capacity 256).
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// Thinker client; `None` when disabled or when initialisation failed.
    thinker: Option<Arc<ThinkerClient>>,
    /// Beliefs keyed by id; restored from persisted state when available.
    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
    /// Attention items; restored from persisted state when available.
    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
    /// Voting rules; starts as `SwarmGovernance::default()`.
    governance: Arc<RwLock<SwarmGovernance>>,
    /// Global workspace; restored from persisted state when available.
    workspace: Arc<RwLock<GlobalWorkspace>>,
    /// Tool registry used for Test-phase tool execution; `None` after construction.
    tools: Option<Arc<ToolRegistry>>,
    /// Decision receipts; starts empty.
    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
    /// Pending approvals; starts empty.
    /// NOTE(review): key and flag semantics are not visible in this part of the file.
    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
}
510
511impl CognitionRuntime {
512 pub fn new_from_env() -> Self {
514 let mut options = CognitionRuntimeOptions::default();
515 options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
516 options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
517 options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
518 options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
519
520 options.default_policy = PersonaPolicy {
521 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
522 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
523 token_budget_per_minute: env_u32(
524 "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
525 20_000,
526 ),
527 compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
528 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
529 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
530 allowed_tools: Vec::new(),
531 };
532
533 let thinker_backend = thinker::ThinkerBackend::from_env(
534 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
535 .unwrap_or_else(|_| "openai_compat".to_string()),
536 );
537 let thinker_timeout_default = match thinker_backend {
538 thinker::ThinkerBackend::OpenAICompat => 30_000,
539 thinker::ThinkerBackend::Candle => 12_000,
540 thinker::ThinkerBackend::Bedrock => 60_000,
541 };
542 let thinker_config = ThinkerConfig {
543 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
544 backend: thinker_backend,
545 endpoint: normalize_thinker_endpoint(
546 &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
547 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
548 ),
549 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
550 .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
551 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
552 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
553 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
554 .ok()
555 .and_then(|v| v.parse::<f32>().ok()),
556 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
557 timeout_ms: env_u64(
558 "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
559 thinker_timeout_default,
560 ),
561 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
562 candle_tokenizer_path: std::env::var(
563 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
564 )
565 .ok(),
566 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
567 candle_device: thinker::CandleDevicePreference::from_env(
568 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
569 .unwrap_or_else(|_| "auto".to_string()),
570 ),
571 candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
572 candle_repeat_penalty: env_f32(
573 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
574 1.1,
575 ),
576 candle_repeat_last_n: env_usize(
577 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
578 64,
579 ),
580 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
581 bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
582 .unwrap_or_else(|_| {
583 std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-west-2".to_string())
584 }),
585 };
586
587 let mut runtime = Self::new_with_options(options);
588 runtime.thinker = Some(thinker_config).and_then(|cfg| {
590 if !cfg.enabled {
591 return None;
592 }
593 match ThinkerClient::new(cfg) {
594 Ok(client) => {
595 tracing::info!(
596 backend = ?client.config().backend,
597 endpoint = %client.config().endpoint,
598 model = %client.config().model,
599 "Cognition thinker initialized"
600 );
601 Some(Arc::new(client))
602 }
603 Err(error) => {
604 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
605 None
606 }
607 }
608 });
609 runtime
610 }
611
/// Builds a runtime from explicit options, without a thinker client.
///
/// Delegates to `new_with_options_and_thinker` with no thinker configuration.
pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
    Self::new_with_options_and_thinker(options, None)
}
616
617 fn new_with_options_and_thinker(
618 options: CognitionRuntimeOptions,
619 thinker_config: Option<ThinkerConfig>,
620 ) -> Self {
621 let (event_tx, _) = broadcast::channel(256);
622 let thinker = thinker_config.and_then(|cfg| {
623 if !cfg.enabled {
624 return None;
625 }
626 match ThinkerClient::new(cfg) {
627 Ok(client) => {
628 tracing::info!(
629 backend = ?client.config().backend,
630 endpoint = %client.config().endpoint,
631 model = %client.config().model,
632 "Cognition thinker initialized"
633 );
634 Some(Arc::new(client))
635 }
636 Err(error) => {
637 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
638 None
639 }
640 }
641 });
642
643 let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
646 if let Some(persisted) = persistence::load_state() {
647 tracing::info!(
648 personas = persisted.personas.len(),
649 beliefs = persisted.beliefs.len(),
650 persisted_at = %persisted.persisted_at,
651 "Restoring persisted cognition state"
652 );
653 (
654 persisted.personas,
655 persisted.beliefs,
656 persisted.proposals,
657 persisted.attention_queue,
658 persisted.workspace,
659 )
660 } else {
661 (
662 HashMap::new(),
663 HashMap::new(),
664 HashMap::new(),
665 Vec::new(),
666 GlobalWorkspace::default(),
667 )
668 };
669
670 let runtime = Self {
671 enabled: options.enabled,
672 max_events: options.max_events.max(32),
673 max_snapshots: options.max_snapshots.max(8),
674 default_policy: options.default_policy,
675 running: Arc::new(AtomicBool::new(false)),
676 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
677 started_at: Arc::new(RwLock::new(None)),
678 last_tick_at: Arc::new(RwLock::new(None)),
679 personas: Arc::new(RwLock::new(init_personas)),
680 proposals: Arc::new(RwLock::new(init_proposals)),
681 events: Arc::new(RwLock::new(VecDeque::new())),
682 snapshots: Arc::new(RwLock::new(VecDeque::new())),
683 loop_handle: Arc::new(Mutex::new(None)),
684 event_tx,
685 thinker,
686 beliefs: Arc::new(RwLock::new(init_beliefs)),
687 attention_queue: Arc::new(RwLock::new(init_attention)),
688 governance: Arc::new(RwLock::new(SwarmGovernance::default())),
689 workspace: Arc::new(RwLock::new(init_workspace)),
690 tools: None,
691 receipts: Arc::new(RwLock::new(Vec::new())),
692 pending_approvals: Arc::new(RwLock::new(HashMap::new())),
693 };
694
695 runtime
696 }
697
/// Returns whether the runtime is enabled; `start` returns an error when it is not.
pub fn is_enabled(&self) -> bool {
    self.enabled
}
702
/// Returns a new receiver subscribed to the runtime's thought-event broadcast channel.
pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
    self.event_tx.subscribe()
}
707
708 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
710 if !self.enabled {
711 return Err(anyhow!(
712 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
713 ));
714 }
715
716 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
717 if let Some(request) = req {
718 if let Some(interval) = request.loop_interval_ms {
719 let mut lock = self.loop_interval_ms.write().await;
720 *lock = interval.max(100);
721 }
722 requested_seed_persona = request.seed_persona;
723 }
724
725 let has_personas = !self.personas.read().await.is_empty();
726 if !has_personas {
727 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
728 let _ = self.create_persona(seed).await?;
729 }
730
731 if self.running.load(Ordering::SeqCst) {
732 return Ok(self.status().await);
733 }
734
735 self.running.store(true, Ordering::SeqCst);
736 {
737 let mut started = self.started_at.write().await;
738 *started = Some(Utc::now());
739 }
740
741 let running = Arc::clone(&self.running);
742 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
743 let last_tick_at = Arc::clone(&self.last_tick_at);
744 let personas = Arc::clone(&self.personas);
745 let proposals = Arc::clone(&self.proposals);
746 let events = Arc::clone(&self.events);
747 let snapshots = Arc::clone(&self.snapshots);
748 let max_events = self.max_events;
749 let max_snapshots = self.max_snapshots;
750 let event_tx = self.event_tx.clone();
751 let thinker = self.thinker.clone();
752 let beliefs = Arc::clone(&self.beliefs);
753 let attention_queue = Arc::clone(&self.attention_queue);
754 let governance = Arc::clone(&self.governance);
755 let workspace = Arc::clone(&self.workspace);
756 let tools = self.tools.clone();
757 let receipts = Arc::clone(&self.receipts);
758 let pending_approvals = Arc::clone(&self.pending_approvals);
759
760 let handle = tokio::spawn(async move {
761 let mut next_tick = Instant::now();
762 while running.load(Ordering::SeqCst) {
763 let now_instant = Instant::now();
764 if now_instant < next_tick {
765 tokio::time::sleep_until(next_tick).await;
766 }
767 if !running.load(Ordering::SeqCst) {
768 break;
769 }
770
771 let now = Utc::now();
772 {
773 let mut lock = last_tick_at.write().await;
774 *lock = Some(now);
775 }
776
777 let mut new_events = Vec::new();
778 let mut new_snapshots = Vec::new();
779 let mut new_proposals = Vec::new();
780
781 let work_items: Vec<ThoughtWorkItem> = {
783 let mut map = personas.write().await;
784 let mut items = Vec::new();
785 for persona in map.values_mut() {
786 if persona.status != PersonaStatus::Active {
787 continue;
788 }
789
790 let window_elapsed = now
792 .signed_duration_since(persona.window_started_at)
793 .num_seconds();
794 if window_elapsed >= 60 {
795 persona.tokens_this_window = 0;
796 persona.compute_ms_this_window = 0;
797 persona.window_started_at = now;
798 }
799
800 let token_ok =
802 persona.tokens_this_window < persona.policy.token_budget_per_minute;
803 let compute_ok =
804 persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
805 if !token_ok || !compute_ok {
806 if !persona.budget_paused {
807 persona.budget_paused = true;
808 new_events.push(ThoughtEvent {
809 id: Uuid::new_v4().to_string(),
810 event_type: ThoughtEventType::BudgetPaused,
811 persona_id: Some(persona.identity.id.clone()),
812 swarm_id: persona.identity.swarm_id.clone(),
813 timestamp: now,
814 payload: json!({
815 "budget_paused": true,
816 "tokens_used": persona.tokens_this_window,
817 "compute_ms_used": persona.compute_ms_this_window,
818 "token_budget": persona.policy.token_budget_per_minute,
819 "compute_budget": persona.policy.compute_ms_per_minute,
820 }),
821 });
822 }
823 continue; }
825
826 persona.budget_paused = false;
827 persona.thought_count = persona.thought_count.saturating_add(1);
828 persona.last_tick_at = Some(now);
829 persona.updated_at = now;
830 items.push(ThoughtWorkItem {
831 persona_id: persona.identity.id.clone(),
832 persona_name: persona.identity.name.clone(),
833 role: persona.identity.role.clone(),
834 charter: persona.identity.charter.clone(),
835 swarm_id: persona.identity.swarm_id.clone(),
836 thought_count: persona.thought_count,
837 phase: ThoughtPhase::from_thought_count(persona.thought_count),
838 });
839 }
840 items
841 };
842
843 for work in &work_items {
844 let context = recent_persona_context(&events, &work.persona_id, 8).await;
845
846 let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
847
848 let event_timestamp = Utc::now();
849 let is_fallback = thought.source == "fallback";
850 let tokens_used = thought.total_tokens.unwrap_or(0);
851 let compute_used = thought.latency_ms as u32;
852
853 new_events.push(ThoughtEvent {
854 id: Uuid::new_v4().to_string(),
855 event_type: work.phase.event_type(),
856 persona_id: Some(work.persona_id.clone()),
857 swarm_id: work.swarm_id.clone(),
858 timestamp: event_timestamp,
859 payload: json!({
860 "phase": work.phase.as_str(),
861 "thought_count": work.thought_count,
862 "persona": {
863 "id": work.persona_id.clone(),
864 "name": work.persona_name.clone(),
865 "role": work.role.clone(),
866 },
867 "context_event_count": context.len(),
868 "thinking": thought.thinking.clone(),
869 "source": thought.source,
870 "model": thought.model.clone(),
871 "finish_reason": thought.finish_reason.clone(),
872 "usage": {
873 "prompt_tokens": thought.prompt_tokens,
874 "completion_tokens": thought.completion_tokens,
875 "total_tokens": thought.total_tokens,
876 },
877 "latency_ms": thought.latency_ms,
878 "error": thought.error.clone(),
879 }),
880 });
881
882 {
884 let mut map = personas.write().await;
885 if let Some(persona) = map.get_mut(&work.persona_id) {
886 persona.tokens_this_window =
887 persona.tokens_this_window.saturating_add(tokens_used);
888 persona.compute_ms_this_window =
889 persona.compute_ms_this_window.saturating_add(compute_used);
890 if !is_fallback {
891 persona.last_progress_at = Utc::now();
892 }
893 }
894 }
895
896 if work.phase == ThoughtPhase::Reflect && !is_fallback {
898 let extracted = beliefs::extract_beliefs_from_thought(
899 thinker.as_deref(),
900 &work.persona_id,
901 &thought.thinking,
902 )
903 .await;
904
905 if !extracted.is_empty() {
906 let mut belief_store = beliefs.write().await;
907 let mut attn_queue = attention_queue.write().await;
908 for mut new_belief in extracted {
909 let existing_id = belief_store
911 .values()
912 .find(|b| {
913 b.belief_key == new_belief.belief_key
914 && b.status != BeliefStatus::Invalidated
915 })
916 .map(|b| b.id.clone());
917
918 if let Some(eid) = existing_id {
919 if let Some(existing) = belief_store.get_mut(&eid) {
921 if !existing.confirmed_by.contains(&work.persona_id) {
922 existing.confirmed_by.push(work.persona_id.clone());
923 }
924 existing.revalidation_success();
925 }
926 } else {
927 let contest_targets: Vec<String> =
929 new_belief.contradicts.clone();
930 for target_key in &contest_targets {
931 if let Some(target) = belief_store.values_mut().find(|b| {
932 &b.belief_key == target_key
933 && b.status != BeliefStatus::Invalidated
934 }) {
935 target.contested_by.push(new_belief.id.clone());
936 if !new_belief.contradicts.contains(&target.belief_key)
937 {
938 new_belief
939 .contradicts
940 .push(target.belief_key.clone());
941 }
942 target.revalidation_failure();
944 if target.confidence >= 0.5 {
945 attn_queue.push(AttentionItem {
947 id: Uuid::new_v4().to_string(),
948 topic: format!(
949 "Revalidate belief: {}",
950 target.claim
951 ),
952 topic_tags: vec![target.belief_key.clone()],
953 priority: 0.7,
954 source_type: AttentionSource::ContestedBelief,
955 source_id: target.id.clone(),
956 assigned_persona: None,
957 created_at: Utc::now(),
958 resolved_at: None,
959 });
960 }
961 }
962 }
963
964 new_events.push(ThoughtEvent {
965 id: Uuid::new_v4().to_string(),
966 event_type: ThoughtEventType::BeliefExtracted,
967 persona_id: Some(work.persona_id.clone()),
968 swarm_id: work.swarm_id.clone(),
969 timestamp: Utc::now(),
970 payload: json!({
971 "belief_id": new_belief.id,
972 "belief_key": new_belief.belief_key,
973 "claim": trim_for_storage(&new_belief.claim, 280),
974 "confidence": new_belief.confidence,
975 }),
976 });
977
978 {
980 let mut map = personas.write().await;
981 if let Some(p) = map.get_mut(&work.persona_id) {
982 p.last_progress_at = Utc::now();
983 }
984 }
985
986 new_belief.clamp_confidence();
987 belief_store.insert(new_belief.id.clone(), new_belief);
988 }
989 }
990 }
991 }
992
993 if work.phase == ThoughtPhase::Test {
994 new_events.push(ThoughtEvent {
995 id: Uuid::new_v4().to_string(),
996 event_type: ThoughtEventType::CheckResult,
997 persona_id: Some(work.persona_id.clone()),
998 swarm_id: work.swarm_id.clone(),
999 timestamp: Utc::now(),
1000 payload: json!({
1001 "phase": work.phase.as_str(),
1002 "thought_count": work.thought_count,
1003 "result_excerpt": trim_for_storage(&thought.thinking, 280),
1004 "source": thought.source,
1005 "model": thought.model,
1006 }),
1007 });
1008
1009 {
1011 let mut map = personas.write().await;
1012 if let Some(p) = map.get_mut(&work.persona_id) {
1013 p.last_progress_at = Utc::now();
1014 }
1015 }
1016
1017 if !is_fallback {
1019 if let Some(ref tool_registry) = tools {
1020 let allowed = {
1021 let map = personas.read().await;
1022 map.get(&work.persona_id)
1023 .map(|p| p.policy.allowed_tools.clone())
1024 .unwrap_or_default()
1025 };
1026 if !allowed.is_empty() {
1027 let tool_results = executor::execute_tool_requests(
1028 thinker.as_deref(),
1029 tool_registry,
1030 &work.persona_id,
1031 &thought.thinking,
1032 &allowed,
1033 )
1034 .await;
1035
1036 for result_event in tool_results {
1037 new_events.push(result_event);
1038 let mut map = personas.write().await;
1040 if let Some(p) = map.get_mut(&work.persona_id) {
1041 p.last_progress_at = Utc::now();
1042 }
1043 }
1044 }
1045 }
1046 }
1047 }
1048
1049 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1050 let gov = governance.read().await;
1051 let proposal = Proposal {
1052 id: Uuid::new_v4().to_string(),
1053 persona_id: work.persona_id.clone(),
1054 title: proposal_title_from_thought(
1055 &thought.thinking,
1056 work.thought_count,
1057 ),
1058 rationale: trim_for_storage(&thought.thinking, 900),
1059 evidence_refs: vec!["internal.thought_stream".to_string()],
1060 risk: ProposalRisk::Low,
1061 status: ProposalStatus::Created,
1062 created_at: Utc::now(),
1063 votes: HashMap::new(),
1064 vote_deadline: Some(
1065 Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1066 ),
1067 votes_requested: false,
1068 quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1069 as usize,
1070 };
1071
1072 new_events.push(ThoughtEvent {
1073 id: Uuid::new_v4().to_string(),
1074 event_type: ThoughtEventType::ProposalCreated,
1075 persona_id: Some(work.persona_id.clone()),
1076 swarm_id: work.swarm_id.clone(),
1077 timestamp: Utc::now(),
1078 payload: json!({
1079 "proposal_id": proposal.id,
1080 "title": proposal.title,
1081 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1082 "source": thought.source,
1083 "model": thought.model,
1084 }),
1085 });
1086
1087 new_proposals.push(proposal);
1088 }
1089
1090 if work.phase == ThoughtPhase::Compress {
1091 new_snapshots.push(MemorySnapshot {
1092 id: Uuid::new_v4().to_string(),
1093 generated_at: Utc::now(),
1094 swarm_id: work.swarm_id.clone(),
1095 persona_scope: vec![work.persona_id.clone()],
1096 summary: trim_for_storage(&thought.thinking, 1_500),
1097 hot_event_count: context.len(),
1098 warm_fact_count: estimate_fact_count(&thought.thinking),
1099 cold_snapshot_count: 1,
1100 metadata: HashMap::from([
1101 (
1102 "phase".to_string(),
1103 serde_json::Value::String(work.phase.as_str().to_string()),
1104 ),
1105 (
1106 "role".to_string(),
1107 serde_json::Value::String(work.role.clone()),
1108 ),
1109 (
1110 "source".to_string(),
1111 serde_json::Value::String(thought.source.to_string()),
1112 ),
1113 (
1114 "model".to_string(),
1115 serde_json::Value::String(
1116 thought
1117 .model
1118 .clone()
1119 .unwrap_or_else(|| "fallback".to_string()),
1120 ),
1121 ),
1122 (
1123 "completion_tokens".to_string(),
1124 serde_json::Value::Number(serde_json::Number::from(
1125 thought.completion_tokens.unwrap_or(0) as u64,
1126 )),
1127 ),
1128 ]),
1129 });
1130
1131 {
1133 let mut belief_store = beliefs.write().await;
1134 let mut attn_queue = attention_queue.write().await;
1135 let stale_ids: Vec<String> = belief_store
1136 .values()
1137 .filter(|b| {
1138 b.status == BeliefStatus::Active && now > b.review_after
1139 })
1140 .map(|b| b.id.clone())
1141 .collect();
1142 for id in stale_ids {
1143 if let Some(belief) = belief_store.get_mut(&id) {
1144 belief.decay();
1145 attn_queue.push(AttentionItem {
1146 id: Uuid::new_v4().to_string(),
1147 topic: format!("Stale belief: {}", belief.claim),
1148 topic_tags: vec![belief.belief_key.clone()],
1149 priority: 0.4,
1150 source_type: AttentionSource::StaleBelief,
1151 source_id: belief.id.clone(),
1152 assigned_persona: None,
1153 created_at: now,
1154 resolved_at: None,
1155 });
1156 }
1157 }
1158 }
1159
1160 {
1162 let belief_store = beliefs.read().await;
1163 let attn_queue = attention_queue.read().await;
1164
1165 let mut sorted_beliefs: Vec<&Belief> = belief_store
1166 .values()
1167 .filter(|b| b.status == BeliefStatus::Active)
1168 .collect();
1169 sorted_beliefs.sort_by(|a, b| {
1170 let score_a = a.confidence
1171 * (1.0
1172 / (1.0
1173 + now.signed_duration_since(a.updated_at).num_minutes()
1174 as f32));
1175 let score_b = b.confidence
1176 * (1.0
1177 / (1.0
1178 + now.signed_duration_since(b.updated_at).num_minutes()
1179 as f32));
1180 score_b
1181 .partial_cmp(&score_a)
1182 .unwrap_or(std::cmp::Ordering::Equal)
1183 });
1184
1185 let top_beliefs: Vec<String> = sorted_beliefs
1186 .iter()
1187 .take(10)
1188 .map(|b| b.id.clone())
1189 .collect();
1190 let top_uncertainties: Vec<String> = {
1191 let mut uncertain: Vec<&Belief> = belief_store
1192 .values()
1193 .filter(|b| {
1194 b.status == BeliefStatus::Stale
1195 || !b.contested_by.is_empty()
1196 })
1197 .collect();
1198 uncertain.sort_by(|a, b| {
1201 let a_contested = !a.contested_by.is_empty();
1202 let b_contested = !b.contested_by.is_empty();
1203 b_contested
1204 .cmp(&a_contested)
1205 .then_with(|| {
1206 a.confidence
1207 .partial_cmp(&b.confidence)
1208 .unwrap_or(std::cmp::Ordering::Equal)
1209 })
1210 .then_with(|| a.updated_at.cmp(&b.updated_at))
1211 });
1212 uncertain
1213 .iter()
1214 .take(5)
1215 .map(|b| {
1216 format!(
1217 "[{}] {}",
1218 b.belief_key,
1219 trim_for_storage(&b.claim, 80)
1220 )
1221 })
1222 .collect()
1223 };
1224
1225 let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1226 .iter()
1227 .filter(|a| a.resolved_at.is_none())
1228 .collect();
1229 sorted_attn.sort_by(|a, b| {
1230 b.priority
1231 .partial_cmp(&a.priority)
1232 .unwrap_or(std::cmp::Ordering::Equal)
1233 });
1234 let top_attention: Vec<String> =
1235 sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1236
1237 let mut ws = workspace.write().await;
1238 ws.top_beliefs = top_beliefs;
1239 ws.top_uncertainties = top_uncertainties;
1240 ws.top_attention = top_attention;
1241 ws.updated_at = now;
1242 }
1243
1244 new_events.push(ThoughtEvent {
1245 id: Uuid::new_v4().to_string(),
1246 event_type: ThoughtEventType::WorkspaceUpdated,
1247 persona_id: Some(work.persona_id.clone()),
1248 swarm_id: work.swarm_id.clone(),
1249 timestamp: Utc::now(),
1250 payload: json!({ "updated": true }),
1251 });
1252 }
1253 }
1254
1255 {
1257 let gov = governance.read().await;
1258 let mut proposal_store = proposals.write().await;
1259 let persona_map = personas.read().await;
1260
1261 let proposal_ids: Vec<String> = proposal_store
1262 .values()
1263 .filter(|p| p.status == ProposalStatus::Created)
1264 .map(|p| p.id.clone())
1265 .collect();
1266
1267 let mut attn_queue = attention_queue.write().await;
1268 for pid in proposal_ids {
1269 if let Some(proposal) = proposal_store.get_mut(&pid) {
1270 let quorum_needed = proposal.quorum_needed.max(1);
1271
1272 if let Some(deadline) = proposal.vote_deadline {
1274 if now > deadline {
1275 if proposal.votes.len() < quorum_needed {
1276 attn_queue.push(AttentionItem {
1278 id: Uuid::new_v4().to_string(),
1279 topic: format!(
1280 "Proposal vote timeout: {}",
1281 proposal.title
1282 ),
1283 topic_tags: Vec::new(),
1284 priority: 0.6,
1285 source_type: AttentionSource::ProposalTimeout,
1286 source_id: proposal.id.clone(),
1287 assigned_persona: None,
1288 created_at: now,
1289 resolved_at: None,
1290 });
1291 proposal.status = ProposalStatus::Rejected;
1292 continue;
1293 }
1294
1295 let required_roles = gov
1297 .required_approvers_by_role
1298 .get(&proposal.risk)
1299 .cloned()
1300 .unwrap_or_default();
1301 let all_required_met = required_roles.iter().all(|role| {
1302 proposal.votes.iter().any(|(vid, vote)| {
1303 *vote == ProposalVote::Approve
1304 && persona_map
1305 .get(vid)
1306 .map(|p| &p.identity.role == role)
1307 .unwrap_or(false)
1308 })
1309 });
1310 if !all_required_met {
1311 attn_queue.push(AttentionItem {
1312 id: Uuid::new_v4().to_string(),
1313 topic: format!(
1314 "Missing required approvers: {}",
1315 proposal.title
1316 ),
1317 topic_tags: Vec::new(),
1318 priority: 0.7,
1319 source_type: AttentionSource::ProposalTimeout,
1320 source_id: proposal.id.clone(),
1321 assigned_persona: None,
1322 created_at: now,
1323 resolved_at: None,
1324 });
1325 proposal.status = ProposalStatus::Rejected;
1326 continue;
1327 }
1328 }
1329 }
1330
1331 if proposal.votes.len() >= quorum_needed {
1333 let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1335 if *vote != ProposalVote::Veto {
1336 return false;
1337 }
1338 if let Some(voter) = persona_map.get(voter_id) {
1339 gov.veto_roles.contains(&voter.identity.role)
1340 } else {
1341 false
1342 }
1343 });
1344
1345 if vetoed {
1346 proposal.status = ProposalStatus::Rejected;
1347 continue;
1348 }
1349
1350 let required_roles = gov
1352 .required_approvers_by_role
1353 .get(&proposal.risk)
1354 .cloned()
1355 .unwrap_or_default();
1356 let all_required_met = required_roles.iter().all(|role| {
1357 proposal.votes.iter().any(|(vid, vote)| {
1358 *vote == ProposalVote::Approve
1359 && persona_map
1360 .get(vid)
1361 .map(|p| &p.identity.role == role)
1362 .unwrap_or(false)
1363 })
1364 });
1365
1366 if !all_required_met {
1367 continue; }
1369
1370 let approvals = proposal
1372 .votes
1373 .values()
1374 .filter(|v| **v == ProposalVote::Approve)
1375 .count();
1376 let rejections = proposal
1377 .votes
1378 .values()
1379 .filter(|v| **v == ProposalVote::Reject)
1380 .count();
1381
1382 if approvals > rejections {
1383 proposal.status = ProposalStatus::Verified;
1384 } else {
1385 proposal.status = ProposalStatus::Rejected;
1386 }
1387 }
1388 }
1389 }
1390 }
1391
1392 {
1394 let mut proposal_store = proposals.write().await;
1395 let verified_ids: Vec<String> = proposal_store
1396 .values()
1397 .filter(|p| p.status == ProposalStatus::Verified)
1398 .map(|p| p.id.clone())
1399 .collect();
1400
1401 for pid in verified_ids {
1402 if let Some(proposal) = proposal_store.get_mut(&pid) {
1403 if proposal.risk == ProposalRisk::Critical {
1404 let approved = {
1406 let approvals = pending_approvals.read().await;
1407 approvals.get(&pid).copied().unwrap_or(false)
1408 };
1409 if !approved {
1410 let mut approvals = pending_approvals.write().await;
1412 approvals.entry(pid.clone()).or_insert(false);
1413 continue;
1414 }
1415 }
1416
1417 let receipt = executor::DecisionReceipt {
1419 id: Uuid::new_v4().to_string(),
1420 proposal_id: pid.clone(),
1421 inputs: proposal.evidence_refs.clone(),
1422 governance_decision: format!(
1423 "Approved with {} votes",
1424 proposal.votes.len()
1425 ),
1426 capability_leases: Vec::new(),
1427 tool_invocations: Vec::new(),
1428 outcome: executor::ExecutionOutcome::Success {
1429 summary: format!("Proposal '{}' executed", proposal.title),
1430 },
1431 created_at: Utc::now(),
1432 };
1433
1434 new_events.push(ThoughtEvent {
1435 id: Uuid::new_v4().to_string(),
1436 event_type: ThoughtEventType::ActionExecuted,
1437 persona_id: Some(proposal.persona_id.clone()),
1438 swarm_id: None,
1439 timestamp: Utc::now(),
1440 payload: json!({
1441 "receipt_id": receipt.id,
1442 "proposal_id": pid,
1443 "outcome": "success",
1444 "summary": format!("Proposal '{}' executed", proposal.title),
1445 }),
1446 });
1447
1448 receipts.write().await.push(receipt);
1449 proposal.status = ProposalStatus::Executed;
1450 }
1451 }
1452 }
1453
1454 if !new_proposals.is_empty() {
1455 let mut proposal_store = proposals.write().await;
1456 for proposal in new_proposals {
1457 proposal_store.insert(proposal.id.clone(), proposal);
1458 }
1459 }
1460
1461 for event in new_events {
1462 push_event_internal(&events, max_events, &event_tx, event).await;
1463 }
1464 for snapshot in new_snapshots {
1465 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1466 }
1467
1468 {
1470 let mut map = personas.write().await;
1471 let idle_ids: Vec<String> = map
1472 .values()
1473 .filter(|p| {
1474 p.status == PersonaStatus::Active
1475 && !p.budget_paused
1476 && now.signed_duration_since(p.last_progress_at).num_seconds()
1477 > p.policy.idle_ttl_secs as i64
1478 })
1479 .map(|p| p.identity.id.clone())
1480 .collect();
1481
1482 for id in &idle_ids {
1483 if let Some(persona) = map.get_mut(id) {
1484 persona.status = PersonaStatus::Reaped;
1485 persona.updated_at = now;
1486 }
1487 let children: Vec<String> = map
1489 .values()
1490 .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1491 .map(|p| p.identity.id.clone())
1492 .collect();
1493 for child_id in children {
1494 if let Some(child) = map.get_mut(&child_id) {
1495 child.status = PersonaStatus::Reaped;
1496 child.updated_at = now;
1497 }
1498 }
1499 }
1500 drop(map);
1501
1502 for id in idle_ids {
1503 push_event_internal(
1504 &events,
1505 max_events,
1506 &event_tx,
1507 ThoughtEvent {
1508 id: Uuid::new_v4().to_string(),
1509 event_type: ThoughtEventType::IdleReaped,
1510 persona_id: Some(id),
1511 swarm_id: None,
1512 timestamp: now,
1513 payload: json!({ "reason": "idle_ttl_expired" }),
1514 },
1515 )
1516 .await;
1517 }
1518 }
1519
1520 if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1522 let _ = persistence::save_state(
1523 &personas,
1524 &proposals,
1525 &beliefs,
1526 &attention_queue,
1527 &workspace,
1528 &events,
1529 &snapshots,
1530 )
1531 .await;
1532 }
1533
1534 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1535 next_tick += interval;
1536 let tick_completed = Instant::now();
1537 if tick_completed > next_tick {
1538 next_tick = tick_completed;
1539 }
1540 }
1541 });
1542
1543 {
1544 let mut lock = self.loop_handle.lock().await;
1545 *lock = Some(handle);
1546 }
1547
1548 Ok(self.status().await)
1549 }
1550
1551 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1553 self.running.store(false, Ordering::SeqCst);
1554
1555 if let Some(handle) = self.loop_handle.lock().await.take() {
1556 handle.abort();
1557 let _ = handle.await;
1558 }
1559
1560 if let Some(reason_value) = reason {
1561 let event = ThoughtEvent {
1562 id: Uuid::new_v4().to_string(),
1563 event_type: ThoughtEventType::CheckResult,
1564 persona_id: None,
1565 swarm_id: None,
1566 timestamp: Utc::now(),
1567 payload: json!({ "stopped": true, "reason": reason_value }),
1568 };
1569 self.push_event(event).await;
1570 }
1571
1572 Ok(self.status().await)
1573 }
1574
1575 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1577 let now = Utc::now();
1578 let mut personas = self.personas.write().await;
1579
1580 let mut parent_swarm_id = None;
1581 let mut computed_depth = 0_u32;
1582 let mut inherited_policy = None;
1583
1584 if let Some(parent_id) = req.parent_id.clone() {
1585 let parent = personas
1586 .get(&parent_id)
1587 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1588
1589 if parent.status == PersonaStatus::Reaped {
1590 return Err(anyhow!("Parent persona {} is reaped", parent_id));
1591 }
1592
1593 parent_swarm_id = parent.identity.swarm_id.clone();
1594 computed_depth = parent.identity.depth.saturating_add(1);
1595 inherited_policy = Some(parent.policy.clone());
1596 let branch_limit = parent.policy.max_branching_factor;
1597
1598 let child_count = personas
1599 .values()
1600 .filter(|p| {
1601 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1602 && p.status != PersonaStatus::Reaped
1603 })
1604 .count();
1605
1606 if child_count as u32 >= branch_limit {
1607 return Err(anyhow!(
1608 "Parent {} reached branching limit {}",
1609 parent_id,
1610 branch_limit
1611 ));
1612 }
1613 }
1614
1615 let policy = req
1616 .policy
1617 .clone()
1618 .or(inherited_policy.clone())
1619 .unwrap_or_else(|| self.default_policy.clone());
1620
1621 let effective_depth_limit = inherited_policy
1622 .as_ref()
1623 .map(|p| p.max_spawn_depth)
1624 .unwrap_or(policy.max_spawn_depth);
1625
1626 if computed_depth > effective_depth_limit {
1627 return Err(anyhow!(
1628 "Spawn depth {} exceeds limit {}",
1629 computed_depth,
1630 effective_depth_limit
1631 ));
1632 }
1633
1634 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1635 if personas.contains_key(&persona_id) {
1636 return Err(anyhow!("Persona id already exists: {}", persona_id));
1637 }
1638
1639 let identity = PersonaIdentity {
1640 id: persona_id.clone(),
1641 name: req.name,
1642 role: req.role,
1643 charter: req.charter,
1644 swarm_id: req.swarm_id.or(parent_swarm_id),
1645 parent_id: req.parent_id,
1646 depth: computed_depth,
1647 created_at: now,
1648 tags: req.tags,
1649 };
1650
1651 let persona = PersonaRuntimeState {
1652 identity,
1653 policy,
1654 status: PersonaStatus::Active,
1655 thought_count: 0,
1656 last_tick_at: None,
1657 updated_at: now,
1658 tokens_this_window: 0,
1659 compute_ms_this_window: 0,
1660 window_started_at: now,
1661 last_progress_at: now,
1662 budget_paused: false,
1663 };
1664
1665 personas.insert(persona_id, persona.clone());
1666 drop(personas);
1667
1668 self.push_event(ThoughtEvent {
1669 id: Uuid::new_v4().to_string(),
1670 event_type: ThoughtEventType::PersonaSpawned,
1671 persona_id: Some(persona.identity.id.clone()),
1672 swarm_id: persona.identity.swarm_id.clone(),
1673 timestamp: now,
1674 payload: json!({
1675 "name": persona.identity.name,
1676 "role": persona.identity.role,
1677 "depth": persona.identity.depth,
1678 }),
1679 })
1680 .await;
1681
1682 Ok(persona)
1683 }
1684
1685 pub async fn spawn_child(
1687 &self,
1688 parent_id: &str,
1689 req: SpawnPersonaRequest,
1690 ) -> Result<PersonaRuntimeState> {
1691 let request = CreatePersonaRequest {
1692 persona_id: req.persona_id,
1693 name: req.name,
1694 role: req.role,
1695 charter: req.charter,
1696 swarm_id: req.swarm_id,
1697 parent_id: Some(parent_id.to_string()),
1698 policy: req.policy,
1699 tags: Vec::new(),
1700 };
1701 self.create_persona(request).await
1702 }
1703
1704 pub async fn reap_persona(
1706 &self,
1707 persona_id: &str,
1708 req: ReapPersonaRequest,
1709 ) -> Result<ReapPersonaResponse> {
1710 let cascade = req.cascade.unwrap_or(false);
1711 let now = Utc::now();
1712
1713 let mut personas = self.personas.write().await;
1714 if !personas.contains_key(persona_id) {
1715 return Err(anyhow!("Persona not found: {}", persona_id));
1716 }
1717
1718 let mut reaped_ids = vec![persona_id.to_string()];
1719 if cascade {
1720 let mut idx = 0usize;
1721 while idx < reaped_ids.len() {
1722 let current = reaped_ids[idx].clone();
1723 let children: Vec<String> = personas
1724 .values()
1725 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1726 .map(|p| p.identity.id.clone())
1727 .collect();
1728 for child in children {
1729 if !reaped_ids.iter().any(|existing| existing == &child) {
1730 reaped_ids.push(child);
1731 }
1732 }
1733 idx += 1;
1734 }
1735 }
1736
1737 for id in &reaped_ids {
1738 if let Some(persona) = personas.get_mut(id) {
1739 persona.status = PersonaStatus::Reaped;
1740 persona.updated_at = now;
1741 }
1742 }
1743 drop(personas);
1744
1745 for id in &reaped_ids {
1746 self.push_event(ThoughtEvent {
1747 id: Uuid::new_v4().to_string(),
1748 event_type: ThoughtEventType::PersonaReaped,
1749 persona_id: Some(id.clone()),
1750 swarm_id: None,
1751 timestamp: now,
1752 payload: json!({
1753 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1754 "cascade": cascade,
1755 }),
1756 })
1757 .await;
1758 }
1759
1760 Ok(ReapPersonaResponse {
1761 count: reaped_ids.len(),
1762 reaped_ids,
1763 })
1764 }
1765
1766 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1768 self.snapshots.read().await.back().cloned()
1769 }
1770
1771 pub async fn lineage_graph(&self) -> LineageGraph {
1773 let personas = self.personas.read().await;
1774 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1775 let mut roots = Vec::new();
1776 let mut total_edges = 0usize;
1777
1778 for persona in personas.values() {
1779 if let Some(parent_id) = persona.identity.parent_id.clone() {
1780 children_by_parent
1781 .entry(parent_id)
1782 .or_default()
1783 .push(persona.identity.id.clone());
1784 total_edges = total_edges.saturating_add(1);
1785 } else {
1786 roots.push(persona.identity.id.clone());
1787 }
1788 }
1789
1790 let mut nodes: Vec<LineageNode> = personas
1791 .values()
1792 .map(|persona| {
1793 let mut children = children_by_parent
1794 .get(&persona.identity.id)
1795 .cloned()
1796 .unwrap_or_default();
1797 children.sort();
1798
1799 LineageNode {
1800 persona_id: persona.identity.id.clone(),
1801 parent_id: persona.identity.parent_id.clone(),
1802 children,
1803 depth: persona.identity.depth,
1804 status: persona.status,
1805 }
1806 })
1807 .collect();
1808
1809 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1810 roots.sort();
1811
1812 LineageGraph {
1813 nodes,
1814 roots,
1815 total_edges,
1816 }
1817 }
1818
1819 pub async fn status(&self) -> CognitionStatus {
1821 let personas = self.personas.read().await;
1822 let events = self.events.read().await;
1823 let snapshots = self.snapshots.read().await;
1824 let started_at = *self.started_at.read().await;
1825 let last_tick_at = *self.last_tick_at.read().await;
1826 let loop_interval_ms = *self.loop_interval_ms.read().await;
1827
1828 let active_persona_count = personas
1829 .values()
1830 .filter(|p| p.status == PersonaStatus::Active)
1831 .count();
1832
1833 CognitionStatus {
1834 enabled: self.enabled,
1835 running: self.running.load(Ordering::SeqCst),
1836 loop_interval_ms,
1837 started_at,
1838 last_tick_at,
1839 persona_count: personas.len(),
1840 active_persona_count,
1841 events_buffered: events.len(),
1842 snapshots_buffered: snapshots.len(),
1843 }
1844 }
1845
1846 async fn push_event(&self, event: ThoughtEvent) {
1847 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1848 }
1849
1850 pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1852 self.tools = Some(registry);
1853 }
1854
1855 pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1857 self.beliefs.read().await.clone()
1858 }
1859
1860 pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1862 self.beliefs.read().await.get(id).cloned()
1863 }
1864
1865 pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1867 self.attention_queue.read().await.clone()
1868 }
1869
1870 pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1872 self.proposals.read().await.clone()
1873 }
1874
1875 pub async fn get_workspace(&self) -> GlobalWorkspace {
1877 self.workspace.read().await.clone()
1878 }
1879
1880 pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1882 self.receipts.read().await.clone()
1883 }
1884
1885 pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1887 let proposals = self.proposals.read().await;
1888 let proposal = proposals
1889 .get(proposal_id)
1890 .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1891
1892 if proposal.risk != ProposalRisk::Critical {
1893 return Err(anyhow!("Only Critical proposals require human approval"));
1894 }
1895 if proposal.status != ProposalStatus::Verified {
1896 return Err(anyhow!("Proposal is not in Verified status"));
1897 }
1898 drop(proposals);
1899
1900 let mut approvals = self.pending_approvals.write().await;
1901 approvals.insert(proposal_id.to_string(), true);
1902 Ok(())
1903 }
1904
1905 pub async fn get_governance(&self) -> SwarmGovernance {
1907 self.governance.read().await.clone()
1908 }
1909
1910 pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1912 self.personas.read().await.get(id).cloned()
1913 }
1914}
1915
1916async fn push_event_internal(
1917 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1918 max_events: usize,
1919 event_tx: &broadcast::Sender<ThoughtEvent>,
1920 event: ThoughtEvent,
1921) {
1922 {
1923 let mut lock = events.write().await;
1924 lock.push_back(event.clone());
1925 while lock.len() > max_events {
1926 lock.pop_front();
1927 }
1928 }
1929 let _ = event_tx.send(event);
1930}
1931
1932async fn push_snapshot_internal(
1933 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1934 max_snapshots: usize,
1935 snapshot: MemorySnapshot,
1936) {
1937 let mut lock = snapshots.write().await;
1938 lock.push_back(snapshot);
1939 while lock.len() > max_snapshots {
1940 lock.pop_front();
1941 }
1942}
1943
1944async fn recent_persona_context(
1945 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1946 persona_id: &str,
1947 limit: usize,
1948) -> Vec<ThoughtEvent> {
1949 let lock = events.read().await;
1950 let mut selected: Vec<ThoughtEvent> = lock
1951 .iter()
1952 .rev()
1953 .filter(|event| {
1954 event.persona_id.as_deref() == Some(persona_id)
1955 || (event.persona_id.is_none()
1956 && matches!(
1957 event.event_type,
1958 ThoughtEventType::CheckResult
1959 | ThoughtEventType::ProposalCreated
1960 | ThoughtEventType::SnapshotCompressed
1961 | ThoughtEventType::WorkspaceUpdated
1962 | ThoughtEventType::ActionExecuted
1963 | ThoughtEventType::BudgetPaused
1964 ))
1965 })
1966 .take(limit)
1967 .cloned()
1968 .collect();
1969 selected.reverse();
1970 selected
1971}
1972
1973async fn generate_phase_thought(
1974 thinker: Option<&ThinkerClient>,
1975 work: &ThoughtWorkItem,
1976 context: &[ThoughtEvent],
1977) -> ThoughtResult {
1978 let started_at = Instant::now();
1979 if let Some(client) = thinker {
1980 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1981 match client.think(&system_prompt, &user_prompt).await {
1982 Ok(output) => {
1983 let thinking = normalize_thought_output(work, context, &output.text);
1984 if !thinking.is_empty() {
1985 return ThoughtResult {
1986 source: "model",
1987 model: Some(output.model),
1988 finish_reason: output.finish_reason,
1989 thinking,
1990 prompt_tokens: output.prompt_tokens,
1991 completion_tokens: output.completion_tokens,
1992 total_tokens: output.total_tokens,
1993 latency_ms: started_at.elapsed().as_millis(),
1994 error: None,
1995 };
1996 }
1997 }
1998 Err(error) => {
1999 return ThoughtResult {
2000 source: "fallback",
2001 model: None,
2002 finish_reason: None,
2003 thinking: fallback_phase_text(work, context),
2004 prompt_tokens: None,
2005 completion_tokens: None,
2006 total_tokens: None,
2007 latency_ms: started_at.elapsed().as_millis(),
2008 error: Some(error.to_string()),
2009 };
2010 }
2011 }
2012 }
2013
2014 ThoughtResult {
2015 source: "fallback",
2016 model: None,
2017 finish_reason: None,
2018 thinking: fallback_phase_text(work, context),
2019 prompt_tokens: None,
2020 completion_tokens: None,
2021 total_tokens: None,
2022 latency_ms: started_at.elapsed().as_millis(),
2023 error: None,
2024 }
2025}
2026
2027fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2028 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2029Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2030Write as an operational process update, not meta narration. \
2031Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2032Output concrete findings, checks, risks, and next actions. \
2033Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2034 .to_string();
2035
2036 let context_lines = if context.is_empty() {
2037 "none".to_string()
2038 } else {
2039 context
2040 .iter()
2041 .map(format_context_event)
2042 .collect::<Vec<_>>()
2043 .join("\n")
2044 };
2045
2046 let phase_instruction = match work.phase {
2047 ThoughtPhase::Observe => {
2048 "Process format (exact line labels): \
2049Phase: Observe | Goal: detect current customer/business risk | \
2050Signals: 1-3 concrete signals separated by '; ' | \
2051Uncertainty: one unknown that blocks confidence | \
2052Next_Action: one immediate operational action."
2053 }
2054 ThoughtPhase::Reflect => {
2055 "Process format (exact line labels): \
2056Phase: Reflect | Hypothesis: single testable hypothesis | \
2057Rationale: why this is likely | \
2058Business_Risk: customer/revenue/SLA impact | \
2059Validation_Next_Action: one action to confirm or falsify."
2060 }
2061 ThoughtPhase::Test => {
2062 "Process format (exact line labels): \
2063Phase: Test | Check: single concrete check | \
2064Procedure: short executable procedure | \
2065Expected_Result: pass/fail expectation | \
2066Evidence_Quality: low|medium|high with reason | \
2067Escalation_Trigger: when to escalate immediately."
2068 }
2069 ThoughtPhase::Compress => {
2070 "Process format (exact line labels): \
2071Phase: Compress | State_Summary: current state in one line | \
2072Retained_Facts: 3 short facts separated by '; ' | \
2073Open_Risks: up to 2 unresolved risks separated by '; ' | \
2074Next_Process_Step: next operational step."
2075 }
2076 };
2077
2078 let user_prompt = format!(
2079 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2080 phase = work.phase.as_str(),
2081 persona_id = work.persona_id,
2082 persona_name = work.persona_name,
2083 role = work.role,
2084 charter = work.charter,
2085 thought_count = work.thought_count,
2086 context = context_lines,
2087 instruction = phase_instruction
2088 );
2089
2090 (system_prompt, user_prompt)
2091}
2092
2093fn format_context_event(event: &ThoughtEvent) -> String {
2094 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2095 format!(
2096 "{} {} {}",
2097 event.event_type.as_str(),
2098 event.timestamp.to_rfc3339(),
2099 trim_for_storage(&payload, 220)
2100 )
2101}
2102
2103fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2104 let charter = trim_for_storage(&work.charter, 180);
2105 let context_summary = fallback_context_summary(context);
2106 let thought = match work.phase {
2107 ThoughtPhase::Observe => format!(
2108 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2109 work.role, charter, context_summary
2110 ),
2111 ThoughtPhase::Reflect => format!(
2112 "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2113 ),
2114 ThoughtPhase::Test => format!(
2115 "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2116 ),
2117 ThoughtPhase::Compress => format!(
2118 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2119 work.role, charter, context_summary
2120 ),
2121 };
2122 trim_for_storage(&thought, 1_200)
2123}
2124
2125fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2126 let trimmed = trim_for_storage(raw, 2_000);
2127 if trimmed.trim().is_empty() {
2128 return fallback_phase_text(work, context);
2129 }
2130
2131 if let Some(idx) = find_process_label_start(&trimmed) {
2133 let candidate = trimmed[idx..].trim();
2134 if !candidate.is_empty()
2136 && !candidate.contains('<')
2137 && !has_template_placeholder_values(candidate)
2138 {
2139 let collapsed: String = candidate
2141 .lines()
2142 .map(str::trim)
2143 .filter(|l| !l.is_empty())
2144 .collect::<Vec<_>>()
2145 .join(" | ");
2146 let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2147 if cleaned.starts_with("Phase:") {
2148 return cleaned.to_string();
2149 }
2150 return collapsed;
2151 }
2152 }
2153
2154 let lower = trimmed.to_ascii_lowercase();
2155 let looks_meta = lower.starts_with("we need")
2156 || lower.starts_with("i need")
2157 || lower.contains("we need to")
2158 || lower.contains("i need to")
2159 || lower.contains("must output")
2160 || lower.contains("let's ")
2161 || lower.contains("we have to");
2162
2163 if looks_meta || has_template_placeholder_values(&trimmed) {
2164 return fallback_phase_text(work, context);
2165 }
2166
2167 trimmed
2168}
2169
/// Reports whether `text` still contains unfilled template placeholders.
///
/// Matching is ASCII-case-insensitive. A placeholder is any of:
/// - a known field label followed by `...` (for example `goal: ...`),
/// - the fragment `<...`,
/// - the standalone token `tbd` or `todo`.
///
/// `tbd`/`todo` are matched as whole tokens delimited by non-alphanumeric
/// characters, so ordinary words that merely contain those letters
/// (`autodoc`, `mastodon`) are not treated as placeholders.
fn has_template_placeholder_values(text: &str) -> bool {
    const UNFILLED_FIELDS: [&str; 17] = [
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];

    let lower = text.to_ascii_lowercase();
    if lower.contains("<...") || UNFILLED_FIELDS.iter().any(|needle| lower.contains(needle)) {
        return true;
    }

    // Token-wise comparison avoids false positives on words embedding "todo"/"tbd".
    lower
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .any(|token| matches!(token, "tbd" | "todo"))
}
2197
/// Returns the byte offset of the first process label in `text`, if any.
///
/// The recognized labels are `Phase: Observe`, `Phase: Reflect`,
/// `Phase: Test`, `Phase: Compress` and the bare `Phase:`. Every one of them
/// begins with `Phase:`, so the earliest label is always the first occurrence
/// of that prefix.
fn find_process_label_start(text: &str) -> Option<usize> {
    const LABEL_PREFIX: &str = "Phase:";
    text.find(LABEL_PREFIX)
}
2210
2211fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2212 if context.is_empty() {
2213 return "No prior events recorded yet.".to_string();
2214 }
2215
2216 let mut latest_error: Option<String> = None;
2217 let mut latest_proposal: Option<String> = None;
2218 let mut latest_check: Option<String> = None;
2219
2220 for event in context.iter().rev() {
2221 if latest_error.is_none()
2222 && let Some(error) = event
2223 .payload
2224 .get("error")
2225 .and_then(serde_json::Value::as_str)
2226 && !error.trim().is_empty()
2227 {
2228 latest_error = Some(trim_for_storage(error, 140));
2229 }
2230
2231 if latest_proposal.is_none()
2232 && event.event_type == ThoughtEventType::ProposalCreated
2233 && let Some(title) = event
2234 .payload
2235 .get("title")
2236 .and_then(serde_json::Value::as_str)
2237 && !title.trim().is_empty()
2238 && !has_template_placeholder_values(title)
2239 {
2240 latest_proposal = Some(trim_for_storage(title, 120));
2241 }
2242
2243 if latest_check.is_none()
2244 && event.event_type == ThoughtEventType::CheckResult
2245 && let Some(result) = event
2246 .payload
2247 .get("result_excerpt")
2248 .and_then(serde_json::Value::as_str)
2249 && !result.trim().is_empty()
2250 && !has_template_placeholder_values(result)
2251 {
2252 latest_check = Some(trim_for_storage(result, 140));
2253 }
2254
2255 if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2256 break;
2257 }
2258 }
2259
2260 let mut lines = vec![format!(
2261 "{} recent cognition events are available.",
2262 context.len()
2263 )];
2264 if let Some(error) = latest_error {
2265 lines.push(format!("Latest error signal: {}.", error));
2266 }
2267 if let Some(proposal) = latest_proposal {
2268 lines.push(format!("Recent proposal: {}.", proposal));
2269 }
2270 if let Some(check) = latest_check {
2271 lines.push(format!("Recent check: {}.", check));
2272 }
2273 lines.join(" ")
2274}
2275
/// Trims surrounding whitespace from `input` and caps it at `max_chars`
/// characters, appending `...` when content had to be cut.
///
/// The limit is counted in Unicode scalar values (`char`s), not bytes, and is
/// applied to the *trimmed* text, so padding never consumes the budget and
/// never causes a spurious ellipsis. When truncation happens the returned
/// string holds `max_chars` characters followed by the three-character
/// ellipsis, i.e. it can be up to `max_chars + 3` characters long.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let text = input.trim();

    // `nth(max_chars)` yields the byte offset of the first character that
    // does not fit; `None` means the whole trimmed text is within the limit.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => {
            let mut shortened = String::with_capacity(cut + 3);
            shortened.push_str(&text[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
2287
/// Roughly estimates how many facts `text` contains by counting sentence
/// terminators (`.`, `!`, `?`).
///
/// The result is clamped to the range `1..=12`, so empty or unpunctuated text
/// still counts as one fact and very long text is capped at twelve.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(*ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
2293
2294fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2295 let first_line = thought
2296 .lines()
2297 .find(|line| !line.trim().is_empty())
2298 .unwrap_or("proposal");
2299 let compact = first_line
2300 .replace(['\t', '\r', '\n'], " ")
2301 .split_whitespace()
2302 .collect::<Vec<_>>()
2303 .join(" ");
2304 let trimmed = trim_for_storage(&compact, 72);
2305 if trimmed.is_empty() {
2306 format!("proposal-{}", thought_count)
2307 } else {
2308 trimmed
2309 }
2310}
2311
2312fn default_seed_persona() -> CreatePersonaRequest {
2313 CreatePersonaRequest {
2314 persona_id: Some("root-thinker".to_string()),
2315 name: "root-thinker".to_string(),
2316 role: "orchestrator".to_string(),
2317 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2318 .to_string(),
2319 swarm_id: Some("swarm-core".to_string()),
2320 parent_id: None,
2321 policy: None,
2322 tags: vec!["orchestration".to_string()],
2323 }
2324}
2325
/// Normalizes a thinker base URL into a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are removed first. Then:
///
/// * an empty value yields the local default endpoint,
/// * a value already ending in `/chat/completions` is returned as-is,
/// * anything else gets `/chat/completions` appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    const COMPLETIONS_SUFFIX: &str = "/chat/completions";
    const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:11434/v1/chat/completions";

    match base_url.trim().trim_end_matches('/') {
        "" => DEFAULT_ENDPOINT.to_owned(),
        complete if complete.ends_with(COMPLETIONS_SUFFIX) => complete.to_owned(),
        base => [base, COMPLETIONS_SUFFIX].concat(),
    }
}
2336
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepted spellings (ASCII case-insensitive, surrounding whitespace
/// ignored) are `1`, `true`, `yes`, `on` for `true` and `0`, `false`, `no`,
/// `off` for `false`. If the variable is unset, is not valid Unicode, or
/// holds any other value, `default` is returned.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };

    // Trim so that values such as "true " (e.g. from a hand-edited env file)
    // are honoured instead of silently falling back to the default.
    let value = raw.trim();
    if TRUTHY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        true
    } else if FALSY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        false
    } else {
        default
    }
}
2347
/// Reads the environment variable `name` and parses it as `T`.
///
/// Surrounding whitespace is ignored. `default` is returned when the variable
/// is unset, is not valid Unicode, or does not parse as `T`.
fn parse_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<T>().ok())
        .unwrap_or(default)
}

/// Reads an `f32` from the environment variable `name`, or `default` when it
/// is missing or unparsable.
fn env_f32(name: &str, default: f32) -> f32 {
    parse_env_or_default(name, default)
}

/// Reads a `u64` from the environment variable `name`, or `default` when it
/// is missing or unparsable (including negative or out-of-range values).
fn env_u64(name: &str, default: u64) -> u64 {
    parse_env_or_default(name, default)
}

/// Reads a `u32` from the environment variable `name`, or `default` when it
/// is missing or unparsable (including negative or out-of-range values).
fn env_u32(name: &str, default: u32) -> u32 {
    parse_env_or_default(name, default)
}

/// Reads a `usize` from the environment variable `name`, or `default` when it
/// is missing or unparsable (including negative or out-of-range values).
fn env_usize(name: &str, default: usize) -> usize {
    parse_env_or_default(name, default)
}
2375
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Work item for a reliability-focused persona in the given `phase`.
    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
        ThoughtWorkItem {
            persona_id: "p-1".to_string(),
            persona_name: "Spotlessbinco Business Thinker".to_string(),
            role: "principal reliability engineer".to_string(),
            charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
                .to_string(),
            swarm_id: Some("spotlessbinco".to_string()),
            thought_count: 4,
            phase,
        }
    }

    /// Policy shared by the tests: spawn depth and branching factor of 2, no
    /// shared memory, no tool allow-list; budgets and idle TTL vary per test.
    fn test_policy(
        token_budget_per_minute: u32,
        compute_ms_per_minute: u32,
        idle_ttl_secs: u64,
    ) -> PersonaPolicy {
        PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_budget_per_minute,
            compute_ms_per_minute,
            idle_ttl_secs,
            share_memory: false,
            allowed_tools: Vec::new(),
        }
    }

    /// Enabled runtime with a 25 ms loop interval and modest budgets.
    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: test_policy(1_000, 1_000, 300),
        })
    }

    /// Enabled runtime with a 10 ms loop interval and the given default policy.
    fn fast_runtime(default_policy: PersonaPolicy) -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 10,
            max_events: 256,
            max_snapshots: 32,
            default_policy,
        })
    }

    /// Root-level persona request whose id and name are both `id`.
    fn persona_request(
        id: &str,
        role: &str,
        charter: &str,
        swarm_id: Option<&str>,
    ) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: swarm_id.map(String::from),
            parent_id: None,
            policy: None,
            tags: Vec::new(),
        }
    }

    /// Child spawn request whose id and name are both `id`, with no swarm or
    /// policy override.
    fn spawn_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    /// Starts the runtime with default start options, lets it run for
    /// `millis` milliseconds, then stops it.
    async fn run_for(runtime: &CognitionRuntime, millis: u64) {
        runtime.start(None).await.expect("should start");
        tokio::time::sleep(Duration::from_millis(millis)).await;
        runtime.stop(None).await.expect("should stop");
    }

    /// Moves the persona's `last_progress_at` `secs` seconds into the past.
    /// Does nothing if the persona does not exist.
    async fn backdate_progress(runtime: &CognitionRuntime, persona_id: &str, secs: i64) {
        let mut personas = runtime.personas.write().await;
        if let Some(p) = personas.get_mut(persona_id) {
            p.last_progress_at = Utc::now() - ChronoDuration::seconds(secs);
        }
    }

    /// Installs a governance config with a 0.5 quorum fraction, no required
    /// approvers, `auditor` as the only veto role and a 300 s vote timeout.
    async fn install_auditor_veto_governance(runtime: &CognitionRuntime) {
        *runtime.governance.write().await = SwarmGovernance {
            quorum_fraction: 0.5,
            required_approvers_by_role: HashMap::new(),
            veto_roles: vec!["auditor".to_string()],
            vote_timeout_secs: 300,
        };
    }

    /// Inserts a low-risk proposal in `Created` state with the given votes
    /// already recorded, voting requested, a quorum of 1 and a deadline 300 s
    /// from now.
    async fn insert_open_proposal(
        runtime: &CognitionRuntime,
        id: &str,
        author: &str,
        title: &str,
        rationale: &str,
        votes: HashMap<String, ProposalVote>,
    ) {
        let mut proposals = runtime.proposals.write().await;
        proposals.insert(
            id.to_string(),
            Proposal {
                id: id.to_string(),
                persona_id: author.to_string(),
                title: title.to_string(),
                rationale: rationale.to_string(),
                evidence_refs: Vec::new(),
                risk: ProposalRisk::Low,
                status: ProposalStatus::Created,
                created_at: Utc::now(),
                votes,
                vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
                votes_requested: true,
                quorum_needed: 1,
            },
        );
    }

    // ---------------------------------------------------------------------
    // Thought-output normalisation
    // ---------------------------------------------------------------------

    /// A process line made only of `...` placeholders must be replaced by a
    /// concrete fallback line.
    #[test]
    fn normalize_rejects_placeholder_process_line() {
        let work = sample_work_item(ThoughtPhase::Compress);
        let output = normalize_thought_output(
            &work,
            &[],
            "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
        );
        assert!(
            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
        );
        assert!(!output.contains("State_Summary: ..."));
    }

    /// A fully concrete process line must pass through unchanged.
    #[test]
    fn normalize_accepts_concrete_process_line() {
        let work = sample_work_item(ThoughtPhase::Test);
        let output = normalize_thought_output(
            &work,
            &[],
            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
        );
        assert_eq!(
            output,
            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
        );
    }

    // ---------------------------------------------------------------------
    // Persona lifecycle and lineage
    // ---------------------------------------------------------------------

    /// A root persona has depth 0; its child records the parent, has depth 1,
    /// and shows up as a single edge in the lineage graph.
    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");
        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", spawn_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");
        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    /// With branching factor 2 and spawn depth 2, a third child of the same
    /// parent and a depth-3 descendant must both be refused.
    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(persona_request(
                "root",
                "orchestrator",
                "coordinate",
                Some("swarm-a"),
            ))
            .await
            .expect("root should be created");

        runtime
            .spawn_child("root", spawn_request("c1", "worker", "run"))
            .await
            .expect("first child should spawn");
        runtime
            .spawn_child("root", spawn_request("c2", "worker", "run"))
            .await
            .expect("second child should spawn");

        let third_child = runtime
            .spawn_child("root", spawn_request("c3", "worker", "run"))
            .await;
        assert!(third_child.is_err());

        runtime
            .spawn_child("c1", spawn_request("c1-1", "worker", "run"))
            .await
            .expect("depth 2 should be allowed");

        let depth_violation = runtime
            .spawn_child("c1-1", spawn_request("c1-1-1", "worker", "run"))
            .await;
        assert!(depth_violation.is_err());
    }

    /// Starting with a seed persona reports `running` and buffers events;
    /// stopping clears the `running` flag.
    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        runtime
            .start(Some(StartCognitionRequest {
                loop_interval_ms: Some(10),
                seed_persona: Some(persona_request(
                    "seed",
                    "watcher",
                    "observe",
                    Some("swarm-seed"),
                )),
            }))
            .await
            .expect("runtime should start");

        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }

    // ---------------------------------------------------------------------
    // Budgets and idle reaping
    // ---------------------------------------------------------------------

    /// A persona whose token and compute budgets are both zero must never
    /// think and must end up flagged as budget-paused.
    #[tokio::test]
    async fn zero_budget_persona_is_skipped() {
        let runtime = fast_runtime(test_policy(0, 0, 3_600));

        let persona = runtime
            .create_persona(persona_request("budget-test", "tester", "test budgets", None))
            .await
            .expect("should create persona");
        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);

        run_for(&runtime, 50).await;

        let p = runtime.get_persona("budget-test").await.unwrap();
        assert_eq!(p.thought_count, 0);
        assert!(p.budget_paused);
    }

    /// Checks the precondition for a budget-window reset (window older than
    /// 60 s) and the state the counters are expected to have after a reset.
    ///
    /// NOTE(review): the reset itself is performed by hand here rather than by
    /// calling runtime code, so this does not exercise the runtime's reset
    /// path. It runs as a plain synchronous test because nothing is awaited.
    #[test]
    fn budget_counters_reset_after_window() {
        let now = Utc::now();
        let mut persona = PersonaRuntimeState {
            identity: PersonaIdentity {
                id: "p1".to_string(),
                name: "test".to_string(),
                role: "tester".to_string(),
                charter: "test".to_string(),
                swarm_id: None,
                parent_id: None,
                depth: 0,
                created_at: now,
                tags: Vec::new(),
            },
            policy: PersonaPolicy::default(),
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
            tokens_this_window: 5000,
            compute_ms_this_window: 3000,
            window_started_at: now - ChronoDuration::seconds(61),
            last_progress_at: now,
            budget_paused: false,
        };

        let window_elapsed = now
            .signed_duration_since(persona.window_started_at)
            .num_seconds();
        assert!(window_elapsed >= 60);

        persona.tokens_this_window = 0;
        persona.compute_ms_this_window = 0;
        persona.window_started_at = now;

        assert_eq!(persona.tokens_this_window, 0);
        assert_eq!(persona.compute_ms_this_window, 0);
    }

    /// With an idle TTL of zero and progress backdated by 10 s, a persona
    /// that still has budget must be reaped.
    #[tokio::test]
    async fn idle_persona_is_reaped() {
        let runtime = fast_runtime(test_policy(20_000, 10_000, 0));

        runtime
            .create_persona(persona_request("idle-test", "idler", "idle away", None))
            .await
            .expect("should create persona");

        backdate_progress(&runtime, "idle-test", 10).await;
        run_for(&runtime, 100).await;

        let p = runtime.get_persona("idle-test").await.unwrap();
        assert_eq!(p.status, PersonaStatus::Reaped);
    }

    /// Same idle setup as above but with zero budgets: the persona must stay
    /// `Active` and budget-paused instead of being reaped.
    #[tokio::test]
    async fn budget_paused_persona_not_reaped_for_idle() {
        let runtime = fast_runtime(test_policy(0, 0, 0));

        runtime
            .create_persona(persona_request("paused-test", "pauser", "pause", None))
            .await
            .expect("should create persona");

        backdate_progress(&runtime, "paused-test", 10).await;
        run_for(&runtime, 100).await;

        let p = runtime.get_persona("paused-test").await.unwrap();
        assert_eq!(p.status, PersonaStatus::Active);
        assert!(p.budget_paused);
    }

    // ---------------------------------------------------------------------
    // Governance
    // ---------------------------------------------------------------------

    /// One approval out of two engineers with a quorum of 1 must move the
    /// proposal to `Verified` or `Executed`.
    #[tokio::test]
    async fn governance_proposal_resolution() {
        let runtime = test_runtime();
        install_auditor_veto_governance(&runtime).await;

        runtime
            .create_persona(persona_request("voter-1", "engineer", "vote", None))
            .await
            .unwrap();
        runtime
            .create_persona(persona_request("voter-2", "engineer", "vote", None))
            .await
            .unwrap();

        let mut votes = HashMap::new();
        votes.insert("voter-1".to_string(), ProposalVote::Approve);
        insert_open_proposal(
            &runtime,
            "prop-1",
            "voter-1",
            "test proposal",
            "testing governance",
            votes,
        )
        .await;

        run_for(&runtime, 100).await;

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-1").unwrap();
        assert!(
            prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
            "Expected Verified or Executed, got {:?}",
            prop.status
        );
    }

    /// A veto cast by a persona holding a veto role must reject the proposal
    /// even though another persona approved it.
    #[tokio::test]
    async fn veto_rejects_proposal() {
        let runtime = test_runtime();
        install_auditor_veto_governance(&runtime).await;

        runtime
            .create_persona(persona_request("eng", "engineer", "build", None))
            .await
            .unwrap();
        runtime
            .create_persona(persona_request("aud", "auditor", "audit", None))
            .await
            .unwrap();

        let mut votes = HashMap::new();
        votes.insert("eng".to_string(), ProposalVote::Approve);
        votes.insert("aud".to_string(), ProposalVote::Veto);
        insert_open_proposal(
            &runtime,
            "prop-veto",
            "eng",
            "vetoed proposal",
            "testing veto",
            votes,
        )
        .await;

        run_for(&runtime, 100).await;

        let proposals = runtime.get_proposals().await;
        let prop = proposals.get("prop-veto").unwrap();
        assert_eq!(prop.status, ProposalStatus::Rejected);
    }

    // ---------------------------------------------------------------------
    // Workspace and attention data types
    // ---------------------------------------------------------------------

    /// The default global workspace starts with all three lists empty.
    #[test]
    fn global_workspace_default() {
        let ws = GlobalWorkspace::default();
        assert!(ws.top_beliefs.is_empty());
        assert!(ws.top_uncertainties.is_empty());
        assert!(ws.top_attention.is_empty());
    }

    /// An attention item can be built field by field and keeps its values.
    #[test]
    fn attention_item_creation() {
        let item = AttentionItem {
            id: "a1".to_string(),
            topic: "test topic".to_string(),
            topic_tags: vec!["reliability".to_string()],
            priority: 0.8,
            source_type: AttentionSource::ContestedBelief,
            source_id: "b1".to_string(),
            assigned_persona: None,
            created_at: Utc::now(),
            resolved_at: None,
        };
        assert!(item.resolved_at.is_none());
        assert_eq!(item.priority, 0.8);
    }
}