1pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32use tokio::time::Instant;
33use uuid::Uuid;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37#[serde(rename_all = "snake_case")]
38pub enum PersonaStatus {
39 Active,
40 Idle,
41 Reaped,
42 Error,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct PersonaPolicy {
48 pub max_spawn_depth: u32,
49 pub max_branching_factor: u32,
50 pub token_budget_per_minute: u32,
51 pub compute_ms_per_minute: u32,
52 pub idle_ttl_secs: u64,
53 pub share_memory: bool,
54 #[serde(default)]
55 pub allowed_tools: Vec<String>,
56}
57
58impl Default for PersonaPolicy {
59 fn default() -> Self {
60 Self {
61 max_spawn_depth: 4,
62 max_branching_factor: 4,
63 token_budget_per_minute: 20_000,
64 compute_ms_per_minute: 10_000,
65 idle_ttl_secs: 3_600,
66 share_memory: false,
67 allowed_tools: Vec::new(),
68 }
69 }
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct PersonaIdentity {
75 pub id: String,
76 pub name: String,
77 pub role: String,
78 pub charter: String,
79 pub swarm_id: Option<String>,
80 pub parent_id: Option<String>,
81 pub depth: u32,
82 pub created_at: DateTime<Utc>,
83 #[serde(default)]
84 pub tags: Vec<String>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct PersonaRuntimeState {
90 pub identity: PersonaIdentity,
91 pub policy: PersonaPolicy,
92 pub status: PersonaStatus,
93 pub thought_count: u64,
94 pub last_tick_at: Option<DateTime<Utc>>,
95 pub updated_at: DateTime<Utc>,
96 pub tokens_this_window: u32,
98 pub compute_ms_this_window: u32,
100 pub window_started_at: DateTime<Utc>,
102 pub last_progress_at: DateTime<Utc>,
104 #[serde(default)]
106 pub budget_paused: bool,
107}
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
111#[serde(rename_all = "snake_case")]
112pub enum ProposalRisk {
113 Low,
114 Medium,
115 High,
116 Critical,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
121#[serde(rename_all = "snake_case")]
122pub enum ProposalStatus {
123 Created,
124 Verified,
125 Rejected,
126 Executed,
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
131#[serde(rename_all = "snake_case")]
132pub enum ProposalVote {
133 Approve,
134 Reject,
135 Veto,
136 Abstain,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct Proposal {
142 pub id: String,
143 pub persona_id: String,
144 pub title: String,
145 pub rationale: String,
146 pub evidence_refs: Vec<String>,
147 pub risk: ProposalRisk,
148 pub status: ProposalStatus,
149 pub created_at: DateTime<Utc>,
150 #[serde(default)]
152 pub votes: HashMap<String, ProposalVote>,
153 pub vote_deadline: Option<DateTime<Utc>>,
155 #[serde(default)]
157 pub votes_requested: bool,
158 #[serde(default)]
160 pub quorum_needed: usize,
161}
162
163#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
165#[serde(rename_all = "snake_case")]
166pub enum ThoughtEventType {
167 ThoughtGenerated,
168 HypothesisRaised,
169 CheckRequested,
170 CheckResult,
171 ProposalCreated,
172 ProposalVerified,
173 ProposalRejected,
174 ActionExecuted,
175 PersonaSpawned,
176 PersonaReaped,
177 SnapshotCompressed,
178 BeliefExtracted,
179 BeliefContested,
180 BeliefRevalidated,
181 BudgetPaused,
182 IdleReaped,
183 AttentionCreated,
184 VoteCast,
185 WorkspaceUpdated,
186}
187
188impl ThoughtEventType {
189 fn as_str(&self) -> &'static str {
190 match self {
191 Self::ThoughtGenerated => "thought_generated",
192 Self::HypothesisRaised => "hypothesis_raised",
193 Self::CheckRequested => "check_requested",
194 Self::CheckResult => "check_result",
195 Self::ProposalCreated => "proposal_created",
196 Self::ProposalVerified => "proposal_verified",
197 Self::ProposalRejected => "proposal_rejected",
198 Self::ActionExecuted => "action_executed",
199 Self::PersonaSpawned => "persona_spawned",
200 Self::PersonaReaped => "persona_reaped",
201 Self::SnapshotCompressed => "snapshot_compressed",
202 Self::BeliefExtracted => "belief_extracted",
203 Self::BeliefContested => "belief_contested",
204 Self::BeliefRevalidated => "belief_revalidated",
205 Self::BudgetPaused => "budget_paused",
206 Self::IdleReaped => "idle_reaped",
207 Self::AttentionCreated => "attention_created",
208 Self::VoteCast => "vote_cast",
209 Self::WorkspaceUpdated => "workspace_updated",
210 }
211 }
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct ThoughtEvent {
217 pub id: String,
218 pub event_type: ThoughtEventType,
219 pub persona_id: Option<String>,
220 pub swarm_id: Option<String>,
221 pub timestamp: DateTime<Utc>,
222 pub payload: serde_json::Value,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct MemorySnapshot {
228 pub id: String,
229 pub generated_at: DateTime<Utc>,
230 pub swarm_id: Option<String>,
231 pub persona_scope: Vec<String>,
232 pub summary: String,
233 pub hot_event_count: usize,
234 pub warm_fact_count: usize,
235 pub cold_snapshot_count: usize,
236 pub metadata: HashMap<String, serde_json::Value>,
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct CreatePersonaRequest {
242 pub persona_id: Option<String>,
243 pub name: String,
244 pub role: String,
245 pub charter: String,
246 pub swarm_id: Option<String>,
247 pub parent_id: Option<String>,
248 pub policy: Option<PersonaPolicy>,
249 #[serde(default)]
250 pub tags: Vec<String>,
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct SpawnPersonaRequest {
256 pub persona_id: Option<String>,
257 pub name: String,
258 pub role: String,
259 pub charter: String,
260 pub swarm_id: Option<String>,
261 pub policy: Option<PersonaPolicy>,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct ReapPersonaRequest {
267 pub cascade: Option<bool>,
268 pub reason: Option<String>,
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct StartCognitionRequest {
274 pub loop_interval_ms: Option<u64>,
275 pub seed_persona: Option<CreatePersonaRequest>,
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct StopCognitionRequest {
281 pub reason: Option<String>,
282}
283
284#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
288#[serde(rename_all = "snake_case")]
289pub enum AttentionSource {
290 ContestedBelief,
291 FailedCheck,
292 StaleBelief,
293 ProposalTimeout,
294 FailedExecution,
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct AttentionItem {
300 pub id: String,
301 pub topic: String,
302 pub topic_tags: Vec<String>,
303 pub priority: f32,
304 pub source_type: AttentionSource,
305 pub source_id: String,
306 pub assigned_persona: Option<String>,
307 pub created_at: DateTime<Utc>,
308 pub resolved_at: Option<DateTime<Utc>>,
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize)]
313pub struct SwarmGovernance {
314 pub quorum_fraction: f32,
315 pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
316 pub veto_roles: Vec<String>,
317 pub vote_timeout_secs: u64,
318}
319
320impl Default for SwarmGovernance {
321 fn default() -> Self {
322 Self {
323 quorum_fraction: 0.5,
324 required_approvers_by_role: HashMap::new(),
325 veto_roles: Vec::new(),
326 vote_timeout_secs: 300,
327 }
328 }
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize)]
333pub struct GlobalWorkspace {
334 pub top_beliefs: Vec<String>,
335 pub top_uncertainties: Vec<String>,
336 pub top_attention: Vec<String>,
337 pub active_objectives: Vec<String>,
338 pub updated_at: DateTime<Utc>,
339}
340
341impl Default for GlobalWorkspace {
342 fn default() -> Self {
343 Self {
344 top_beliefs: Vec::new(),
345 top_uncertainties: Vec::new(),
346 top_attention: Vec::new(),
347 active_objectives: Vec::new(),
348 updated_at: Utc::now(),
349 }
350 }
351}
352
353#[derive(Debug, Clone, Serialize, Deserialize)]
355pub struct CognitionStatus {
356 pub enabled: bool,
357 pub running: bool,
358 pub loop_interval_ms: u64,
359 pub started_at: Option<DateTime<Utc>>,
360 pub last_tick_at: Option<DateTime<Utc>>,
361 pub persona_count: usize,
362 pub active_persona_count: usize,
363 pub events_buffered: usize,
364 pub snapshots_buffered: usize,
365}
366
367#[derive(Debug, Clone, Serialize, Deserialize)]
369pub struct ReapPersonaResponse {
370 pub reaped_ids: Vec<String>,
371 pub count: usize,
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct LineageNode {
377 pub persona_id: String,
378 pub parent_id: Option<String>,
379 pub children: Vec<String>,
380 pub depth: u32,
381 pub status: PersonaStatus,
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct LineageGraph {
387 pub nodes: Vec<LineageNode>,
388 pub roots: Vec<String>,
389 pub total_edges: usize,
390}
391
392#[derive(Debug, Clone, Copy, PartialEq, Eq)]
393enum ThoughtPhase {
394 Observe,
395 Reflect,
396 Test,
397 Compress,
398}
399
400impl ThoughtPhase {
401 fn from_thought_count(thought_count: u64) -> Self {
402 match thought_count % 4 {
403 1 => Self::Observe,
404 2 => Self::Reflect,
405 3 => Self::Test,
406 _ => Self::Compress,
407 }
408 }
409
410 fn as_str(&self) -> &'static str {
411 match self {
412 Self::Observe => "observe",
413 Self::Reflect => "reflect",
414 Self::Test => "test",
415 Self::Compress => "compress",
416 }
417 }
418
419 fn event_type(&self) -> ThoughtEventType {
420 match self {
421 Self::Observe => ThoughtEventType::ThoughtGenerated,
422 Self::Reflect => ThoughtEventType::HypothesisRaised,
423 Self::Test => ThoughtEventType::CheckRequested,
424 Self::Compress => ThoughtEventType::SnapshotCompressed,
425 }
426 }
427}
428
429#[derive(Debug, Clone)]
430struct ThoughtWorkItem {
431 persona_id: String,
432 persona_name: String,
433 role: String,
434 charter: String,
435 swarm_id: Option<String>,
436 thought_count: u64,
437 phase: ThoughtPhase,
438}
439
440#[derive(Debug, Clone)]
441struct ThoughtResult {
442 source: &'static str,
443 model: Option<String>,
444 finish_reason: Option<String>,
445 thinking: String,
446 prompt_tokens: Option<u32>,
447 completion_tokens: Option<u32>,
448 total_tokens: Option<u32>,
449 latency_ms: u128,
450 error: Option<String>,
451}
452
453#[derive(Debug, Clone)]
455pub struct CognitionRuntimeOptions {
456 pub enabled: bool,
457 pub loop_interval_ms: u64,
458 pub max_events: usize,
459 pub max_snapshots: usize,
460 pub default_policy: PersonaPolicy,
461}
462
463impl Default for CognitionRuntimeOptions {
464 fn default() -> Self {
465 Self {
466 enabled: false,
467 loop_interval_ms: 2_000,
468 max_events: 2_000,
469 max_snapshots: 128,
470 default_policy: PersonaPolicy::default(),
471 }
472 }
473}
474
475#[derive(Debug)]
477pub struct CognitionRuntime {
478 enabled: bool,
479 max_events: usize,
480 max_snapshots: usize,
481 default_policy: PersonaPolicy,
482 running: Arc<AtomicBool>,
483 loop_interval_ms: Arc<RwLock<u64>>,
484 started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
485 last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
486 personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
487 proposals: Arc<RwLock<HashMap<String, Proposal>>>,
488 events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
489 snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
490 loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
491 event_tx: broadcast::Sender<ThoughtEvent>,
492 thinker: Option<Arc<ThinkerClient>>,
493 beliefs: Arc<RwLock<HashMap<String, Belief>>>,
494 attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
495 governance: Arc<RwLock<SwarmGovernance>>,
496 workspace: Arc<RwLock<GlobalWorkspace>>,
497 tools: Option<Arc<ToolRegistry>>,
498 receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
499 pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
501}
502
503impl CognitionRuntime {
504 pub fn new_from_env() -> Self {
506 let mut options = CognitionRuntimeOptions::default();
507 options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
508 options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
509 options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
510 options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
511
512 options.default_policy = PersonaPolicy {
513 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
514 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
515 token_budget_per_minute: env_u32(
516 "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
517 20_000,
518 ),
519 compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
520 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
521 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
522 allowed_tools: Vec::new(),
523 };
524
525 let thinker_backend = thinker::ThinkerBackend::from_env(
526 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
527 .unwrap_or_else(|_| "openai_compat".to_string()),
528 );
529 let thinker_timeout_default = match thinker_backend {
530 thinker::ThinkerBackend::OpenAICompat => 30_000,
531 thinker::ThinkerBackend::Candle => 12_000,
532 };
533 let thinker_config = ThinkerConfig {
534 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
535 backend: thinker_backend,
536 endpoint: normalize_thinker_endpoint(
537 &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
538 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
539 ),
540 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
541 .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
542 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
543 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
544 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
545 .ok()
546 .and_then(|v| v.parse::<f32>().ok()),
547 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
548 timeout_ms: env_u64(
549 "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
550 thinker_timeout_default,
551 ),
552 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
553 candle_tokenizer_path: std::env::var(
554 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
555 )
556 .ok(),
557 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
558 candle_device: thinker::CandleDevicePreference::from_env(
559 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
560 .unwrap_or_else(|_| "auto".to_string()),
561 ),
562 candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
563 candle_repeat_penalty: env_f32(
564 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
565 1.1,
566 ),
567 candle_repeat_last_n: env_usize(
568 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
569 64,
570 ),
571 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
572 };
573
574 Self::new_with_options_and_thinker(options, Some(thinker_config))
575 }
576
577 pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
579 Self::new_with_options_and_thinker(options, None)
580 }
581
582 fn new_with_options_and_thinker(
583 options: CognitionRuntimeOptions,
584 thinker_config: Option<ThinkerConfig>,
585 ) -> Self {
586 let (event_tx, _) = broadcast::channel(256);
587 let thinker = thinker_config.and_then(|cfg| {
588 if !cfg.enabled {
589 return None;
590 }
591 match ThinkerClient::new(cfg) {
592 Ok(client) => {
593 tracing::info!(
594 backend = ?client.config().backend,
595 endpoint = %client.config().endpoint,
596 model = %client.config().model,
597 "Cognition thinker initialized"
598 );
599 Some(Arc::new(client))
600 }
601 Err(error) => {
602 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
603 None
604 }
605 }
606 });
607
608 Self {
609 enabled: options.enabled,
610 max_events: options.max_events.max(32),
611 max_snapshots: options.max_snapshots.max(8),
612 default_policy: options.default_policy,
613 running: Arc::new(AtomicBool::new(false)),
614 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
615 started_at: Arc::new(RwLock::new(None)),
616 last_tick_at: Arc::new(RwLock::new(None)),
617 personas: Arc::new(RwLock::new(HashMap::new())),
618 proposals: Arc::new(RwLock::new(HashMap::new())),
619 events: Arc::new(RwLock::new(VecDeque::new())),
620 snapshots: Arc::new(RwLock::new(VecDeque::new())),
621 loop_handle: Arc::new(Mutex::new(None)),
622 event_tx,
623 thinker,
624 beliefs: Arc::new(RwLock::new(HashMap::new())),
625 attention_queue: Arc::new(RwLock::new(Vec::new())),
626 governance: Arc::new(RwLock::new(SwarmGovernance::default())),
627 workspace: Arc::new(RwLock::new(GlobalWorkspace::default())),
628 tools: None,
629 receipts: Arc::new(RwLock::new(Vec::new())),
630 pending_approvals: Arc::new(RwLock::new(HashMap::new())),
631 }
632 }
633
634 pub fn is_enabled(&self) -> bool {
636 self.enabled
637 }
638
639 pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
641 self.event_tx.subscribe()
642 }
643
644 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
646 if !self.enabled {
647 return Err(anyhow!(
648 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
649 ));
650 }
651
652 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
653 if let Some(request) = req {
654 if let Some(interval) = request.loop_interval_ms {
655 let mut lock = self.loop_interval_ms.write().await;
656 *lock = interval.max(100);
657 }
658 requested_seed_persona = request.seed_persona;
659 }
660
661 let has_personas = !self.personas.read().await.is_empty();
662 if !has_personas {
663 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
664 let _ = self.create_persona(seed).await?;
665 }
666
667 if self.running.load(Ordering::SeqCst) {
668 return Ok(self.status().await);
669 }
670
671 self.running.store(true, Ordering::SeqCst);
672 {
673 let mut started = self.started_at.write().await;
674 *started = Some(Utc::now());
675 }
676
677 let running = Arc::clone(&self.running);
678 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
679 let last_tick_at = Arc::clone(&self.last_tick_at);
680 let personas = Arc::clone(&self.personas);
681 let proposals = Arc::clone(&self.proposals);
682 let events = Arc::clone(&self.events);
683 let snapshots = Arc::clone(&self.snapshots);
684 let max_events = self.max_events;
685 let max_snapshots = self.max_snapshots;
686 let event_tx = self.event_tx.clone();
687 let thinker = self.thinker.clone();
688 let beliefs = Arc::clone(&self.beliefs);
689 let attention_queue = Arc::clone(&self.attention_queue);
690 let governance = Arc::clone(&self.governance);
691 let workspace = Arc::clone(&self.workspace);
692 let tools = self.tools.clone();
693 let receipts = Arc::clone(&self.receipts);
694 let pending_approvals = Arc::clone(&self.pending_approvals);
695
696 let handle = tokio::spawn(async move {
697 let mut next_tick = Instant::now();
698 while running.load(Ordering::SeqCst) {
699 let now_instant = Instant::now();
700 if now_instant < next_tick {
701 tokio::time::sleep_until(next_tick).await;
702 }
703 if !running.load(Ordering::SeqCst) {
704 break;
705 }
706
707 let now = Utc::now();
708 {
709 let mut lock = last_tick_at.write().await;
710 *lock = Some(now);
711 }
712
713 let mut new_events = Vec::new();
714 let mut new_snapshots = Vec::new();
715 let mut new_proposals = Vec::new();
716
717 let work_items: Vec<ThoughtWorkItem> = {
719 let mut map = personas.write().await;
720 let mut items = Vec::new();
721 for persona in map.values_mut() {
722 if persona.status != PersonaStatus::Active {
723 continue;
724 }
725
726 let window_elapsed = now
728 .signed_duration_since(persona.window_started_at)
729 .num_seconds();
730 if window_elapsed >= 60 {
731 persona.tokens_this_window = 0;
732 persona.compute_ms_this_window = 0;
733 persona.window_started_at = now;
734 }
735
736 let token_ok =
738 persona.tokens_this_window < persona.policy.token_budget_per_minute;
739 let compute_ok =
740 persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
741 if !token_ok || !compute_ok {
742 if !persona.budget_paused {
743 persona.budget_paused = true;
744 new_events.push(ThoughtEvent {
745 id: Uuid::new_v4().to_string(),
746 event_type: ThoughtEventType::BudgetPaused,
747 persona_id: Some(persona.identity.id.clone()),
748 swarm_id: persona.identity.swarm_id.clone(),
749 timestamp: now,
750 payload: json!({
751 "budget_paused": true,
752 "tokens_used": persona.tokens_this_window,
753 "compute_ms_used": persona.compute_ms_this_window,
754 "token_budget": persona.policy.token_budget_per_minute,
755 "compute_budget": persona.policy.compute_ms_per_minute,
756 }),
757 });
758 }
759 continue; }
761
762 persona.budget_paused = false;
763 persona.thought_count = persona.thought_count.saturating_add(1);
764 persona.last_tick_at = Some(now);
765 persona.updated_at = now;
766 items.push(ThoughtWorkItem {
767 persona_id: persona.identity.id.clone(),
768 persona_name: persona.identity.name.clone(),
769 role: persona.identity.role.clone(),
770 charter: persona.identity.charter.clone(),
771 swarm_id: persona.identity.swarm_id.clone(),
772 thought_count: persona.thought_count,
773 phase: ThoughtPhase::from_thought_count(persona.thought_count),
774 });
775 }
776 items
777 };
778
779 for work in &work_items {
780 let context = recent_persona_context(&events, &work.persona_id, 8).await;
781
782 let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
783
784 let event_timestamp = Utc::now();
785 let is_fallback = thought.source == "fallback";
786 let tokens_used = thought.total_tokens.unwrap_or(0);
787 let compute_used = thought.latency_ms as u32;
788
789 new_events.push(ThoughtEvent {
790 id: Uuid::new_v4().to_string(),
791 event_type: work.phase.event_type(),
792 persona_id: Some(work.persona_id.clone()),
793 swarm_id: work.swarm_id.clone(),
794 timestamp: event_timestamp,
795 payload: json!({
796 "phase": work.phase.as_str(),
797 "thought_count": work.thought_count,
798 "persona": {
799 "id": work.persona_id.clone(),
800 "name": work.persona_name.clone(),
801 "role": work.role.clone(),
802 },
803 "context_event_count": context.len(),
804 "thinking": thought.thinking.clone(),
805 "source": thought.source,
806 "model": thought.model.clone(),
807 "finish_reason": thought.finish_reason.clone(),
808 "usage": {
809 "prompt_tokens": thought.prompt_tokens,
810 "completion_tokens": thought.completion_tokens,
811 "total_tokens": thought.total_tokens,
812 },
813 "latency_ms": thought.latency_ms,
814 "error": thought.error.clone(),
815 }),
816 });
817
818 {
820 let mut map = personas.write().await;
821 if let Some(persona) = map.get_mut(&work.persona_id) {
822 persona.tokens_this_window =
823 persona.tokens_this_window.saturating_add(tokens_used);
824 persona.compute_ms_this_window =
825 persona.compute_ms_this_window.saturating_add(compute_used);
826 if !is_fallback {
827 persona.last_progress_at = Utc::now();
828 }
829 }
830 }
831
832 if work.phase == ThoughtPhase::Reflect && !is_fallback {
834 let extracted = beliefs::extract_beliefs_from_thought(
835 thinker.as_deref(),
836 &work.persona_id,
837 &thought.thinking,
838 )
839 .await;
840
841 if !extracted.is_empty() {
842 let mut belief_store = beliefs.write().await;
843 let mut attn_queue = attention_queue.write().await;
844 for mut new_belief in extracted {
845 let existing_id = belief_store
847 .values()
848 .find(|b| {
849 b.belief_key == new_belief.belief_key
850 && b.status != BeliefStatus::Invalidated
851 })
852 .map(|b| b.id.clone());
853
854 if let Some(eid) = existing_id {
855 if let Some(existing) = belief_store.get_mut(&eid) {
857 if !existing.confirmed_by.contains(&work.persona_id) {
858 existing.confirmed_by.push(work.persona_id.clone());
859 }
860 existing.updated_at = Utc::now();
861 }
862 } else {
863 let contest_targets: Vec<String> =
865 new_belief.contradicts.clone();
866 for target_key in &contest_targets {
867 if let Some(target) = belief_store.values_mut().find(|b| {
868 &b.belief_key == target_key
869 && b.status != BeliefStatus::Invalidated
870 }) {
871 target.contested_by.push(new_belief.id.clone());
872 if !new_belief.contradicts.contains(&target.belief_key)
873 {
874 new_belief
875 .contradicts
876 .push(target.belief_key.clone());
877 }
878 target.confidence = (target.confidence - 0.1).max(0.05);
880 if target.confidence < 0.5 {
881 target.status = BeliefStatus::Stale;
882 } else {
883 attn_queue.push(AttentionItem {
885 id: Uuid::new_v4().to_string(),
886 topic: format!(
887 "Revalidate belief: {}",
888 target.claim
889 ),
890 topic_tags: vec![target.belief_key.clone()],
891 priority: 0.7,
892 source_type: AttentionSource::ContestedBelief,
893 source_id: target.id.clone(),
894 assigned_persona: None,
895 created_at: Utc::now(),
896 resolved_at: None,
897 });
898 }
899 }
900 }
901
902 new_events.push(ThoughtEvent {
903 id: Uuid::new_v4().to_string(),
904 event_type: ThoughtEventType::BeliefExtracted,
905 persona_id: Some(work.persona_id.clone()),
906 swarm_id: work.swarm_id.clone(),
907 timestamp: Utc::now(),
908 payload: json!({
909 "belief_id": new_belief.id,
910 "belief_key": new_belief.belief_key,
911 "claim": trim_for_storage(&new_belief.claim, 280),
912 "confidence": new_belief.confidence,
913 }),
914 });
915
916 {
918 let mut map = personas.write().await;
919 if let Some(p) = map.get_mut(&work.persona_id) {
920 p.last_progress_at = Utc::now();
921 }
922 }
923
924 belief_store.insert(new_belief.id.clone(), new_belief);
925 }
926 }
927 }
928 }
929
930 if work.phase == ThoughtPhase::Test {
931 new_events.push(ThoughtEvent {
932 id: Uuid::new_v4().to_string(),
933 event_type: ThoughtEventType::CheckResult,
934 persona_id: Some(work.persona_id.clone()),
935 swarm_id: work.swarm_id.clone(),
936 timestamp: Utc::now(),
937 payload: json!({
938 "phase": work.phase.as_str(),
939 "thought_count": work.thought_count,
940 "result_excerpt": trim_for_storage(&thought.thinking, 280),
941 "source": thought.source,
942 "model": thought.model,
943 }),
944 });
945
946 {
948 let mut map = personas.write().await;
949 if let Some(p) = map.get_mut(&work.persona_id) {
950 p.last_progress_at = Utc::now();
951 }
952 }
953
954 if !is_fallback {
956 if let Some(ref tool_registry) = tools {
957 let allowed = {
958 let map = personas.read().await;
959 map.get(&work.persona_id)
960 .map(|p| p.policy.allowed_tools.clone())
961 .unwrap_or_default()
962 };
963 if !allowed.is_empty() {
964 let tool_results = executor::execute_tool_requests(
965 thinker.as_deref(),
966 tool_registry,
967 &work.persona_id,
968 &thought.thinking,
969 &allowed,
970 )
971 .await;
972
973 for result_event in tool_results {
974 new_events.push(result_event);
975 let mut map = personas.write().await;
977 if let Some(p) = map.get_mut(&work.persona_id) {
978 p.last_progress_at = Utc::now();
979 }
980 }
981 }
982 }
983 }
984 }
985
986 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
987 let gov = governance.read().await;
988 let proposal = Proposal {
989 id: Uuid::new_v4().to_string(),
990 persona_id: work.persona_id.clone(),
991 title: proposal_title_from_thought(
992 &thought.thinking,
993 work.thought_count,
994 ),
995 rationale: trim_for_storage(&thought.thinking, 900),
996 evidence_refs: vec!["internal.thought_stream".to_string()],
997 risk: ProposalRisk::Low,
998 status: ProposalStatus::Created,
999 created_at: Utc::now(),
1000 votes: HashMap::new(),
1001 vote_deadline: Some(
1002 Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1003 ),
1004 votes_requested: false,
1005 quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1006 as usize,
1007 };
1008
1009 new_events.push(ThoughtEvent {
1010 id: Uuid::new_v4().to_string(),
1011 event_type: ThoughtEventType::ProposalCreated,
1012 persona_id: Some(work.persona_id.clone()),
1013 swarm_id: work.swarm_id.clone(),
1014 timestamp: Utc::now(),
1015 payload: json!({
1016 "proposal_id": proposal.id,
1017 "title": proposal.title,
1018 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1019 "source": thought.source,
1020 "model": thought.model,
1021 }),
1022 });
1023
1024 new_proposals.push(proposal);
1025 }
1026
1027 if work.phase == ThoughtPhase::Compress {
1028 new_snapshots.push(MemorySnapshot {
1029 id: Uuid::new_v4().to_string(),
1030 generated_at: Utc::now(),
1031 swarm_id: work.swarm_id.clone(),
1032 persona_scope: vec![work.persona_id.clone()],
1033 summary: trim_for_storage(&thought.thinking, 1_500),
1034 hot_event_count: context.len(),
1035 warm_fact_count: estimate_fact_count(&thought.thinking),
1036 cold_snapshot_count: 1,
1037 metadata: HashMap::from([
1038 (
1039 "phase".to_string(),
1040 serde_json::Value::String(work.phase.as_str().to_string()),
1041 ),
1042 (
1043 "role".to_string(),
1044 serde_json::Value::String(work.role.clone()),
1045 ),
1046 (
1047 "source".to_string(),
1048 serde_json::Value::String(thought.source.to_string()),
1049 ),
1050 (
1051 "model".to_string(),
1052 serde_json::Value::String(
1053 thought
1054 .model
1055 .clone()
1056 .unwrap_or_else(|| "fallback".to_string()),
1057 ),
1058 ),
1059 (
1060 "completion_tokens".to_string(),
1061 serde_json::Value::Number(serde_json::Number::from(
1062 thought.completion_tokens.unwrap_or(0) as u64,
1063 )),
1064 ),
1065 ]),
1066 });
1067
1068 {
1070 let mut belief_store = beliefs.write().await;
1071 let mut attn_queue = attention_queue.write().await;
1072 let stale_ids: Vec<String> = belief_store
1073 .values()
1074 .filter(|b| {
1075 b.status == BeliefStatus::Active && now > b.review_after
1076 })
1077 .map(|b| b.id.clone())
1078 .collect();
1079 for id in stale_ids {
1080 if let Some(belief) = belief_store.get_mut(&id) {
1081 belief.status = BeliefStatus::Stale;
1082 belief.confidence *= 0.98;
1083 belief.confidence = belief.confidence.max(0.05);
1084 attn_queue.push(AttentionItem {
1085 id: Uuid::new_v4().to_string(),
1086 topic: format!("Stale belief: {}", belief.claim),
1087 topic_tags: vec![belief.belief_key.clone()],
1088 priority: 0.4,
1089 source_type: AttentionSource::StaleBelief,
1090 source_id: belief.id.clone(),
1091 assigned_persona: None,
1092 created_at: now,
1093 resolved_at: None,
1094 });
1095 }
1096 }
1097 }
1098
1099 {
1101 let belief_store = beliefs.read().await;
1102 let attn_queue = attention_queue.read().await;
1103
1104 let mut sorted_beliefs: Vec<&Belief> = belief_store
1105 .values()
1106 .filter(|b| b.status == BeliefStatus::Active)
1107 .collect();
1108 sorted_beliefs.sort_by(|a, b| {
1109 let score_a = a.confidence
1110 * (1.0
1111 / (1.0
1112 + now.signed_duration_since(a.updated_at).num_minutes()
1113 as f32));
1114 let score_b = b.confidence
1115 * (1.0
1116 / (1.0
1117 + now.signed_duration_since(b.updated_at).num_minutes()
1118 as f32));
1119 score_b
1120 .partial_cmp(&score_a)
1121 .unwrap_or(std::cmp::Ordering::Equal)
1122 });
1123
1124 let top_beliefs: Vec<String> = sorted_beliefs
1125 .iter()
1126 .take(10)
1127 .map(|b| b.id.clone())
1128 .collect();
1129 let top_uncertainties: Vec<String> = {
1130 let mut uncertain: Vec<&Belief> = belief_store
1131 .values()
1132 .filter(|b| {
1133 b.status == BeliefStatus::Stale
1134 || !b.contested_by.is_empty()
1135 })
1136 .collect();
1137 uncertain.sort_by(|a, b| {
1140 let a_contested = !a.contested_by.is_empty();
1141 let b_contested = !b.contested_by.is_empty();
1142 b_contested
1143 .cmp(&a_contested)
1144 .then_with(|| {
1145 a.confidence
1146 .partial_cmp(&b.confidence)
1147 .unwrap_or(std::cmp::Ordering::Equal)
1148 })
1149 .then_with(|| a.updated_at.cmp(&b.updated_at))
1150 });
1151 uncertain
1152 .iter()
1153 .take(5)
1154 .map(|b| {
1155 format!(
1156 "[{}] {}",
1157 b.belief_key,
1158 trim_for_storage(&b.claim, 80)
1159 )
1160 })
1161 .collect()
1162 };
1163
1164 let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1165 .iter()
1166 .filter(|a| a.resolved_at.is_none())
1167 .collect();
1168 sorted_attn.sort_by(|a, b| {
1169 b.priority
1170 .partial_cmp(&a.priority)
1171 .unwrap_or(std::cmp::Ordering::Equal)
1172 });
1173 let top_attention: Vec<String> =
1174 sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1175
1176 let mut ws = workspace.write().await;
1177 ws.top_beliefs = top_beliefs;
1178 ws.top_uncertainties = top_uncertainties;
1179 ws.top_attention = top_attention;
1180 ws.updated_at = now;
1181 }
1182
1183 new_events.push(ThoughtEvent {
1184 id: Uuid::new_v4().to_string(),
1185 event_type: ThoughtEventType::WorkspaceUpdated,
1186 persona_id: Some(work.persona_id.clone()),
1187 swarm_id: work.swarm_id.clone(),
1188 timestamp: Utc::now(),
1189 payload: json!({ "updated": true }),
1190 });
1191 }
1192 }
1193
1194 {
1196 let gov = governance.read().await;
1197 let mut proposal_store = proposals.write().await;
1198 let persona_map = personas.read().await;
1199
1200 let proposal_ids: Vec<String> = proposal_store
1201 .values()
1202 .filter(|p| p.status == ProposalStatus::Created)
1203 .map(|p| p.id.clone())
1204 .collect();
1205
1206 let mut attn_queue = attention_queue.write().await;
1207 for pid in proposal_ids {
1208 if let Some(proposal) = proposal_store.get_mut(&pid) {
1209 let quorum_needed = proposal.quorum_needed.max(1);
1210
1211 if let Some(deadline) = proposal.vote_deadline {
1213 if now > deadline {
1214 if proposal.votes.len() < quorum_needed {
1215 attn_queue.push(AttentionItem {
1217 id: Uuid::new_v4().to_string(),
1218 topic: format!(
1219 "Proposal vote timeout: {}",
1220 proposal.title
1221 ),
1222 topic_tags: Vec::new(),
1223 priority: 0.6,
1224 source_type: AttentionSource::ProposalTimeout,
1225 source_id: proposal.id.clone(),
1226 assigned_persona: None,
1227 created_at: now,
1228 resolved_at: None,
1229 });
1230 proposal.status = ProposalStatus::Rejected;
1231 continue;
1232 }
1233
1234 let required_roles = gov
1236 .required_approvers_by_role
1237 .get(&proposal.risk)
1238 .cloned()
1239 .unwrap_or_default();
1240 let all_required_met = required_roles.iter().all(|role| {
1241 proposal.votes.iter().any(|(vid, vote)| {
1242 *vote == ProposalVote::Approve
1243 && persona_map
1244 .get(vid)
1245 .map(|p| &p.identity.role == role)
1246 .unwrap_or(false)
1247 })
1248 });
1249 if !all_required_met {
1250 attn_queue.push(AttentionItem {
1251 id: Uuid::new_v4().to_string(),
1252 topic: format!(
1253 "Missing required approvers: {}",
1254 proposal.title
1255 ),
1256 topic_tags: Vec::new(),
1257 priority: 0.7,
1258 source_type: AttentionSource::ProposalTimeout,
1259 source_id: proposal.id.clone(),
1260 assigned_persona: None,
1261 created_at: now,
1262 resolved_at: None,
1263 });
1264 proposal.status = ProposalStatus::Rejected;
1265 continue;
1266 }
1267 }
1268 }
1269
1270 if proposal.votes.len() >= quorum_needed {
1272 let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1274 if *vote != ProposalVote::Veto {
1275 return false;
1276 }
1277 if let Some(voter) = persona_map.get(voter_id) {
1278 gov.veto_roles.contains(&voter.identity.role)
1279 } else {
1280 false
1281 }
1282 });
1283
1284 if vetoed {
1285 proposal.status = ProposalStatus::Rejected;
1286 continue;
1287 }
1288
1289 let required_roles = gov
1291 .required_approvers_by_role
1292 .get(&proposal.risk)
1293 .cloned()
1294 .unwrap_or_default();
1295 let all_required_met = required_roles.iter().all(|role| {
1296 proposal.votes.iter().any(|(vid, vote)| {
1297 *vote == ProposalVote::Approve
1298 && persona_map
1299 .get(vid)
1300 .map(|p| &p.identity.role == role)
1301 .unwrap_or(false)
1302 })
1303 });
1304
1305 if !all_required_met {
1306 continue; }
1308
1309 let approvals = proposal
1311 .votes
1312 .values()
1313 .filter(|v| **v == ProposalVote::Approve)
1314 .count();
1315 let rejections = proposal
1316 .votes
1317 .values()
1318 .filter(|v| **v == ProposalVote::Reject)
1319 .count();
1320
1321 if approvals > rejections {
1322 proposal.status = ProposalStatus::Verified;
1323 } else {
1324 proposal.status = ProposalStatus::Rejected;
1325 }
1326 }
1327 }
1328 }
1329 }
1330
1331 {
1333 let mut proposal_store = proposals.write().await;
1334 let verified_ids: Vec<String> = proposal_store
1335 .values()
1336 .filter(|p| p.status == ProposalStatus::Verified)
1337 .map(|p| p.id.clone())
1338 .collect();
1339
1340 for pid in verified_ids {
1341 if let Some(proposal) = proposal_store.get_mut(&pid) {
1342 if proposal.risk == ProposalRisk::Critical {
1343 let approved = {
1345 let approvals = pending_approvals.read().await;
1346 approvals.get(&pid).copied().unwrap_or(false)
1347 };
1348 if !approved {
1349 let mut approvals = pending_approvals.write().await;
1351 approvals.entry(pid.clone()).or_insert(false);
1352 continue;
1353 }
1354 }
1355
1356 let receipt = executor::DecisionReceipt {
1358 id: Uuid::new_v4().to_string(),
1359 proposal_id: pid.clone(),
1360 inputs: proposal.evidence_refs.clone(),
1361 governance_decision: format!(
1362 "Approved with {} votes",
1363 proposal.votes.len()
1364 ),
1365 capability_leases: Vec::new(),
1366 tool_invocations: Vec::new(),
1367 outcome: executor::ExecutionOutcome::Success {
1368 summary: format!("Proposal '{}' executed", proposal.title),
1369 },
1370 created_at: Utc::now(),
1371 };
1372
1373 new_events.push(ThoughtEvent {
1374 id: Uuid::new_v4().to_string(),
1375 event_type: ThoughtEventType::ActionExecuted,
1376 persona_id: Some(proposal.persona_id.clone()),
1377 swarm_id: None,
1378 timestamp: Utc::now(),
1379 payload: json!({
1380 "receipt_id": receipt.id,
1381 "proposal_id": pid,
1382 "outcome": "success",
1383 "summary": format!("Proposal '{}' executed", proposal.title),
1384 }),
1385 });
1386
1387 receipts.write().await.push(receipt);
1388 proposal.status = ProposalStatus::Executed;
1389 }
1390 }
1391 }
1392
1393 if !new_proposals.is_empty() {
1394 let mut proposal_store = proposals.write().await;
1395 for proposal in new_proposals {
1396 proposal_store.insert(proposal.id.clone(), proposal);
1397 }
1398 }
1399
1400 for event in new_events {
1401 push_event_internal(&events, max_events, &event_tx, event).await;
1402 }
1403 for snapshot in new_snapshots {
1404 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1405 }
1406
1407 {
1409 let mut map = personas.write().await;
1410 let idle_ids: Vec<String> = map
1411 .values()
1412 .filter(|p| {
1413 p.status == PersonaStatus::Active
1414 && !p.budget_paused
1415 && now.signed_duration_since(p.last_progress_at).num_seconds()
1416 > p.policy.idle_ttl_secs as i64
1417 })
1418 .map(|p| p.identity.id.clone())
1419 .collect();
1420
1421 for id in &idle_ids {
1422 if let Some(persona) = map.get_mut(id) {
1423 persona.status = PersonaStatus::Reaped;
1424 persona.updated_at = now;
1425 }
1426 let children: Vec<String> = map
1428 .values()
1429 .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1430 .map(|p| p.identity.id.clone())
1431 .collect();
1432 for child_id in children {
1433 if let Some(child) = map.get_mut(&child_id) {
1434 child.status = PersonaStatus::Reaped;
1435 child.updated_at = now;
1436 }
1437 }
1438 }
1439 drop(map);
1440
1441 for id in idle_ids {
1442 push_event_internal(
1443 &events,
1444 max_events,
1445 &event_tx,
1446 ThoughtEvent {
1447 id: Uuid::new_v4().to_string(),
1448 event_type: ThoughtEventType::IdleReaped,
1449 persona_id: Some(id),
1450 swarm_id: None,
1451 timestamp: now,
1452 payload: json!({ "reason": "idle_ttl_expired" }),
1453 },
1454 )
1455 .await;
1456 }
1457 }
1458
1459 if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1461 let _ = persistence::save_state(
1462 &personas,
1463 &proposals,
1464 &beliefs,
1465 &attention_queue,
1466 &workspace,
1467 &events,
1468 &snapshots,
1469 )
1470 .await;
1471 }
1472
1473 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1474 next_tick += interval;
1475 let tick_completed = Instant::now();
1476 if tick_completed > next_tick {
1477 next_tick = tick_completed;
1478 }
1479 }
1480 });
1481
1482 {
1483 let mut lock = self.loop_handle.lock().await;
1484 *lock = Some(handle);
1485 }
1486
1487 Ok(self.status().await)
1488 }
1489
1490 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1492 self.running.store(false, Ordering::SeqCst);
1493
1494 if let Some(handle) = self.loop_handle.lock().await.take() {
1495 handle.abort();
1496 let _ = handle.await;
1497 }
1498
1499 if let Some(reason_value) = reason {
1500 let event = ThoughtEvent {
1501 id: Uuid::new_v4().to_string(),
1502 event_type: ThoughtEventType::CheckResult,
1503 persona_id: None,
1504 swarm_id: None,
1505 timestamp: Utc::now(),
1506 payload: json!({ "stopped": true, "reason": reason_value }),
1507 };
1508 self.push_event(event).await;
1509 }
1510
1511 Ok(self.status().await)
1512 }
1513
1514 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1516 let now = Utc::now();
1517 let mut personas = self.personas.write().await;
1518
1519 let mut parent_swarm_id = None;
1520 let mut computed_depth = 0_u32;
1521 let mut inherited_policy = None;
1522
1523 if let Some(parent_id) = req.parent_id.clone() {
1524 let parent = personas
1525 .get(&parent_id)
1526 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1527
1528 if parent.status == PersonaStatus::Reaped {
1529 return Err(anyhow!("Parent persona {} is reaped", parent_id));
1530 }
1531
1532 parent_swarm_id = parent.identity.swarm_id.clone();
1533 computed_depth = parent.identity.depth.saturating_add(1);
1534 inherited_policy = Some(parent.policy.clone());
1535 let branch_limit = parent.policy.max_branching_factor;
1536
1537 let child_count = personas
1538 .values()
1539 .filter(|p| {
1540 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1541 && p.status != PersonaStatus::Reaped
1542 })
1543 .count();
1544
1545 if child_count as u32 >= branch_limit {
1546 return Err(anyhow!(
1547 "Parent {} reached branching limit {}",
1548 parent_id,
1549 branch_limit
1550 ));
1551 }
1552 }
1553
1554 let policy = req
1555 .policy
1556 .clone()
1557 .or(inherited_policy.clone())
1558 .unwrap_or_else(|| self.default_policy.clone());
1559
1560 let effective_depth_limit = inherited_policy
1561 .as_ref()
1562 .map(|p| p.max_spawn_depth)
1563 .unwrap_or(policy.max_spawn_depth);
1564
1565 if computed_depth > effective_depth_limit {
1566 return Err(anyhow!(
1567 "Spawn depth {} exceeds limit {}",
1568 computed_depth,
1569 effective_depth_limit
1570 ));
1571 }
1572
1573 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1574 if personas.contains_key(&persona_id) {
1575 return Err(anyhow!("Persona id already exists: {}", persona_id));
1576 }
1577
1578 let identity = PersonaIdentity {
1579 id: persona_id.clone(),
1580 name: req.name,
1581 role: req.role,
1582 charter: req.charter,
1583 swarm_id: req.swarm_id.or(parent_swarm_id),
1584 parent_id: req.parent_id,
1585 depth: computed_depth,
1586 created_at: now,
1587 tags: req.tags,
1588 };
1589
1590 let persona = PersonaRuntimeState {
1591 identity,
1592 policy,
1593 status: PersonaStatus::Active,
1594 thought_count: 0,
1595 last_tick_at: None,
1596 updated_at: now,
1597 tokens_this_window: 0,
1598 compute_ms_this_window: 0,
1599 window_started_at: now,
1600 last_progress_at: now,
1601 budget_paused: false,
1602 };
1603
1604 personas.insert(persona_id, persona.clone());
1605 drop(personas);
1606
1607 self.push_event(ThoughtEvent {
1608 id: Uuid::new_v4().to_string(),
1609 event_type: ThoughtEventType::PersonaSpawned,
1610 persona_id: Some(persona.identity.id.clone()),
1611 swarm_id: persona.identity.swarm_id.clone(),
1612 timestamp: now,
1613 payload: json!({
1614 "name": persona.identity.name,
1615 "role": persona.identity.role,
1616 "depth": persona.identity.depth,
1617 }),
1618 })
1619 .await;
1620
1621 Ok(persona)
1622 }
1623
1624 pub async fn spawn_child(
1626 &self,
1627 parent_id: &str,
1628 req: SpawnPersonaRequest,
1629 ) -> Result<PersonaRuntimeState> {
1630 let request = CreatePersonaRequest {
1631 persona_id: req.persona_id,
1632 name: req.name,
1633 role: req.role,
1634 charter: req.charter,
1635 swarm_id: req.swarm_id,
1636 parent_id: Some(parent_id.to_string()),
1637 policy: req.policy,
1638 tags: Vec::new(),
1639 };
1640 self.create_persona(request).await
1641 }
1642
1643 pub async fn reap_persona(
1645 &self,
1646 persona_id: &str,
1647 req: ReapPersonaRequest,
1648 ) -> Result<ReapPersonaResponse> {
1649 let cascade = req.cascade.unwrap_or(false);
1650 let now = Utc::now();
1651
1652 let mut personas = self.personas.write().await;
1653 if !personas.contains_key(persona_id) {
1654 return Err(anyhow!("Persona not found: {}", persona_id));
1655 }
1656
1657 let mut reaped_ids = vec![persona_id.to_string()];
1658 if cascade {
1659 let mut idx = 0usize;
1660 while idx < reaped_ids.len() {
1661 let current = reaped_ids[idx].clone();
1662 let children: Vec<String> = personas
1663 .values()
1664 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1665 .map(|p| p.identity.id.clone())
1666 .collect();
1667 for child in children {
1668 if !reaped_ids.iter().any(|existing| existing == &child) {
1669 reaped_ids.push(child);
1670 }
1671 }
1672 idx += 1;
1673 }
1674 }
1675
1676 for id in &reaped_ids {
1677 if let Some(persona) = personas.get_mut(id) {
1678 persona.status = PersonaStatus::Reaped;
1679 persona.updated_at = now;
1680 }
1681 }
1682 drop(personas);
1683
1684 for id in &reaped_ids {
1685 self.push_event(ThoughtEvent {
1686 id: Uuid::new_v4().to_string(),
1687 event_type: ThoughtEventType::PersonaReaped,
1688 persona_id: Some(id.clone()),
1689 swarm_id: None,
1690 timestamp: now,
1691 payload: json!({
1692 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1693 "cascade": cascade,
1694 }),
1695 })
1696 .await;
1697 }
1698
1699 Ok(ReapPersonaResponse {
1700 count: reaped_ids.len(),
1701 reaped_ids,
1702 })
1703 }
1704
1705 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1707 self.snapshots.read().await.back().cloned()
1708 }
1709
1710 pub async fn lineage_graph(&self) -> LineageGraph {
1712 let personas = self.personas.read().await;
1713 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1714 let mut roots = Vec::new();
1715 let mut total_edges = 0usize;
1716
1717 for persona in personas.values() {
1718 if let Some(parent_id) = persona.identity.parent_id.clone() {
1719 children_by_parent
1720 .entry(parent_id)
1721 .or_default()
1722 .push(persona.identity.id.clone());
1723 total_edges = total_edges.saturating_add(1);
1724 } else {
1725 roots.push(persona.identity.id.clone());
1726 }
1727 }
1728
1729 let mut nodes: Vec<LineageNode> = personas
1730 .values()
1731 .map(|persona| {
1732 let mut children = children_by_parent
1733 .get(&persona.identity.id)
1734 .cloned()
1735 .unwrap_or_default();
1736 children.sort();
1737
1738 LineageNode {
1739 persona_id: persona.identity.id.clone(),
1740 parent_id: persona.identity.parent_id.clone(),
1741 children,
1742 depth: persona.identity.depth,
1743 status: persona.status,
1744 }
1745 })
1746 .collect();
1747
1748 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1749 roots.sort();
1750
1751 LineageGraph {
1752 nodes,
1753 roots,
1754 total_edges,
1755 }
1756 }
1757
1758 pub async fn status(&self) -> CognitionStatus {
1760 let personas = self.personas.read().await;
1761 let events = self.events.read().await;
1762 let snapshots = self.snapshots.read().await;
1763 let started_at = *self.started_at.read().await;
1764 let last_tick_at = *self.last_tick_at.read().await;
1765 let loop_interval_ms = *self.loop_interval_ms.read().await;
1766
1767 let active_persona_count = personas
1768 .values()
1769 .filter(|p| p.status == PersonaStatus::Active)
1770 .count();
1771
1772 CognitionStatus {
1773 enabled: self.enabled,
1774 running: self.running.load(Ordering::SeqCst),
1775 loop_interval_ms,
1776 started_at,
1777 last_tick_at,
1778 persona_count: personas.len(),
1779 active_persona_count,
1780 events_buffered: events.len(),
1781 snapshots_buffered: snapshots.len(),
1782 }
1783 }
1784
1785 async fn push_event(&self, event: ThoughtEvent) {
1786 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1787 }
1788
1789 pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1791 self.tools = Some(registry);
1792 }
1793
1794 pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1796 self.beliefs.read().await.clone()
1797 }
1798
1799 pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1801 self.beliefs.read().await.get(id).cloned()
1802 }
1803
1804 pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1806 self.attention_queue.read().await.clone()
1807 }
1808
1809 pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1811 self.proposals.read().await.clone()
1812 }
1813
1814 pub async fn get_workspace(&self) -> GlobalWorkspace {
1816 self.workspace.read().await.clone()
1817 }
1818
1819 pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1821 self.receipts.read().await.clone()
1822 }
1823
1824 pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1826 let proposals = self.proposals.read().await;
1827 let proposal = proposals
1828 .get(proposal_id)
1829 .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1830
1831 if proposal.risk != ProposalRisk::Critical {
1832 return Err(anyhow!("Only Critical proposals require human approval"));
1833 }
1834 if proposal.status != ProposalStatus::Verified {
1835 return Err(anyhow!("Proposal is not in Verified status"));
1836 }
1837 drop(proposals);
1838
1839 let mut approvals = self.pending_approvals.write().await;
1840 approvals.insert(proposal_id.to_string(), true);
1841 Ok(())
1842 }
1843
1844 pub async fn get_governance(&self) -> SwarmGovernance {
1846 self.governance.read().await.clone()
1847 }
1848
1849 pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1851 self.personas.read().await.get(id).cloned()
1852 }
1853}
1854
1855async fn push_event_internal(
1856 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1857 max_events: usize,
1858 event_tx: &broadcast::Sender<ThoughtEvent>,
1859 event: ThoughtEvent,
1860) {
1861 {
1862 let mut lock = events.write().await;
1863 lock.push_back(event.clone());
1864 while lock.len() > max_events {
1865 lock.pop_front();
1866 }
1867 }
1868 let _ = event_tx.send(event);
1869}
1870
1871async fn push_snapshot_internal(
1872 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1873 max_snapshots: usize,
1874 snapshot: MemorySnapshot,
1875) {
1876 let mut lock = snapshots.write().await;
1877 lock.push_back(snapshot);
1878 while lock.len() > max_snapshots {
1879 lock.pop_front();
1880 }
1881}
1882
1883async fn recent_persona_context(
1884 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1885 persona_id: &str,
1886 limit: usize,
1887) -> Vec<ThoughtEvent> {
1888 let lock = events.read().await;
1889 let mut selected: Vec<ThoughtEvent> = lock
1890 .iter()
1891 .rev()
1892 .filter(|event| {
1893 event.persona_id.as_deref() == Some(persona_id)
1894 || (event.persona_id.is_none()
1895 && matches!(
1896 event.event_type,
1897 ThoughtEventType::CheckResult
1898 | ThoughtEventType::ProposalCreated
1899 | ThoughtEventType::SnapshotCompressed
1900 | ThoughtEventType::WorkspaceUpdated
1901 | ThoughtEventType::ActionExecuted
1902 | ThoughtEventType::BudgetPaused
1903 ))
1904 })
1905 .take(limit)
1906 .cloned()
1907 .collect();
1908 selected.reverse();
1909 selected
1910}
1911
1912async fn generate_phase_thought(
1913 thinker: Option<&ThinkerClient>,
1914 work: &ThoughtWorkItem,
1915 context: &[ThoughtEvent],
1916) -> ThoughtResult {
1917 let started_at = Instant::now();
1918 if let Some(client) = thinker {
1919 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1920 match client.think(&system_prompt, &user_prompt).await {
1921 Ok(output) => {
1922 let thinking = normalize_thought_output(work, context, &output.text);
1923 if !thinking.is_empty() {
1924 return ThoughtResult {
1925 source: "model",
1926 model: Some(output.model),
1927 finish_reason: output.finish_reason,
1928 thinking,
1929 prompt_tokens: output.prompt_tokens,
1930 completion_tokens: output.completion_tokens,
1931 total_tokens: output.total_tokens,
1932 latency_ms: started_at.elapsed().as_millis(),
1933 error: None,
1934 };
1935 }
1936 }
1937 Err(error) => {
1938 return ThoughtResult {
1939 source: "fallback",
1940 model: None,
1941 finish_reason: None,
1942 thinking: fallback_phase_text(work, context),
1943 prompt_tokens: None,
1944 completion_tokens: None,
1945 total_tokens: None,
1946 latency_ms: started_at.elapsed().as_millis(),
1947 error: Some(error.to_string()),
1948 };
1949 }
1950 }
1951 }
1952
1953 ThoughtResult {
1954 source: "fallback",
1955 model: None,
1956 finish_reason: None,
1957 thinking: fallback_phase_text(work, context),
1958 prompt_tokens: None,
1959 completion_tokens: None,
1960 total_tokens: None,
1961 latency_ms: started_at.elapsed().as_millis(),
1962 error: None,
1963 }
1964}
1965
1966fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1967 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1968Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1969Write as an operational process update, not meta narration. \
1970Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
1971Output concrete findings, checks, risks, and next actions. \
1972Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
1973 .to_string();
1974
1975 let context_lines = if context.is_empty() {
1976 "none".to_string()
1977 } else {
1978 context
1979 .iter()
1980 .map(format_context_event)
1981 .collect::<Vec<_>>()
1982 .join("\n")
1983 };
1984
1985 let phase_instruction = match work.phase {
1986 ThoughtPhase::Observe => {
1987 "Process format (exact line labels): \
1988Phase: Observe | Goal: detect current customer/business risk | \
1989Signals: 1-3 concrete signals separated by '; ' | \
1990Uncertainty: one unknown that blocks confidence | \
1991Next_Action: one immediate operational action."
1992 }
1993 ThoughtPhase::Reflect => {
1994 "Process format (exact line labels): \
1995Phase: Reflect | Hypothesis: single testable hypothesis | \
1996Rationale: why this is likely | \
1997Business_Risk: customer/revenue/SLA impact | \
1998Validation_Next_Action: one action to confirm or falsify."
1999 }
2000 ThoughtPhase::Test => {
2001 "Process format (exact line labels): \
2002Phase: Test | Check: single concrete check | \
2003Procedure: short executable procedure | \
2004Expected_Result: pass/fail expectation | \
2005Evidence_Quality: low|medium|high with reason | \
2006Escalation_Trigger: when to escalate immediately."
2007 }
2008 ThoughtPhase::Compress => {
2009 "Process format (exact line labels): \
2010Phase: Compress | State_Summary: current state in one line | \
2011Retained_Facts: 3 short facts separated by '; ' | \
2012Open_Risks: up to 2 unresolved risks separated by '; ' | \
2013Next_Process_Step: next operational step."
2014 }
2015 };
2016
2017 let user_prompt = format!(
2018 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2019 phase = work.phase.as_str(),
2020 persona_id = work.persona_id,
2021 persona_name = work.persona_name,
2022 role = work.role,
2023 charter = work.charter,
2024 thought_count = work.thought_count,
2025 context = context_lines,
2026 instruction = phase_instruction
2027 );
2028
2029 (system_prompt, user_prompt)
2030}
2031
2032fn format_context_event(event: &ThoughtEvent) -> String {
2033 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2034 format!(
2035 "{} {} {}",
2036 event.event_type.as_str(),
2037 event.timestamp.to_rfc3339(),
2038 trim_for_storage(&payload, 220)
2039 )
2040}
2041
2042fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2043 let charter = trim_for_storage(&work.charter, 180);
2044 let context_summary = fallback_context_summary(context);
2045 let thought = match work.phase {
2046 ThoughtPhase::Observe => format!(
2047 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2048 work.role, charter, context_summary
2049 ),
2050 ThoughtPhase::Reflect => format!(
2051 "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
2052 ),
2053 ThoughtPhase::Test => format!(
2054 "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
2055 ),
2056 ThoughtPhase::Compress => format!(
2057 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2058 work.role, charter, context_summary
2059 ),
2060 };
2061 trim_for_storage(&thought, 1_200)
2062}
2063
2064fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2065 let trimmed = trim_for_storage(raw, 2_000);
2066 if trimmed.trim().is_empty() {
2067 return fallback_phase_text(work, context);
2068 }
2069
2070 if let Some(idx) = find_process_label_start(&trimmed) {
2072 let candidate = trimmed[idx..].trim();
2073 if !candidate.is_empty()
2075 && !candidate.contains('<')
2076 && !has_template_placeholder_values(candidate)
2077 {
2078 let collapsed: String = candidate
2080 .lines()
2081 .map(str::trim)
2082 .filter(|l| !l.is_empty())
2083 .collect::<Vec<_>>()
2084 .join(" | ");
2085 let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2086 if cleaned.starts_with("Phase:") {
2087 return cleaned.to_string();
2088 }
2089 return collapsed;
2090 }
2091 }
2092
2093 let lower = trimmed.to_ascii_lowercase();
2094 let looks_meta = lower.starts_with("we need")
2095 || lower.starts_with("i need")
2096 || lower.contains("we need to")
2097 || lower.contains("i need to")
2098 || lower.contains("must output")
2099 || lower.contains("let's ")
2100 || lower.contains("we have to");
2101
2102 if looks_meta || has_template_placeholder_values(&trimmed) {
2103 return fallback_phase_text(work, context);
2104 }
2105
2106 trimmed
2107}
2108
2109fn has_template_placeholder_values(text: &str) -> bool {
2110 let lower = text.to_ascii_lowercase();
2111 [
2112 "goal: ...",
2113 "signals: ...",
2114 "uncertainty: ...",
2115 "next_action: ...",
2116 "hypothesis: ...",
2117 "rationale: ...",
2118 "business_risk: ...",
2119 "validation_next_action: ...",
2120 "check: ...",
2121 "procedure: ...",
2122 "expected_result: ...",
2123 "evidence_quality: ...",
2124 "escalation_trigger: ...",
2125 "state_summary: ...",
2126 "retained_facts: ...",
2127 "open_risks: ...",
2128 "next_process_step: ...",
2129 ]
2130 .iter()
2131 .any(|needle| lower.contains(needle))
2132 || lower.contains("<...")
2133 || lower.contains("tbd")
2134 || lower.contains("todo")
2135}
2136
2137fn find_process_label_start(text: &str) -> Option<usize> {
2138 [
2139 "Phase: Observe",
2140 "Phase: Reflect",
2141 "Phase: Test",
2142 "Phase: Compress",
2143 "Phase:",
2144 ]
2145 .iter()
2146 .filter_map(|label| text.find(label))
2147 .min()
2148}
2149
2150fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2151 if context.is_empty() {
2152 return "No prior events recorded yet.".to_string();
2153 }
2154
2155 let mut latest_error: Option<String> = None;
2156 let mut latest_proposal: Option<String> = None;
2157 let mut latest_check: Option<String> = None;
2158
2159 for event in context.iter().rev() {
2160 if latest_error.is_none()
2161 && let Some(error) = event
2162 .payload
2163 .get("error")
2164 .and_then(serde_json::Value::as_str)
2165 && !error.trim().is_empty()
2166 {
2167 latest_error = Some(trim_for_storage(error, 140));
2168 }
2169
2170 if latest_proposal.is_none()
2171 && event.event_type == ThoughtEventType::ProposalCreated
2172 && let Some(title) = event
2173 .payload
2174 .get("title")
2175 .and_then(serde_json::Value::as_str)
2176 && !title.trim().is_empty()
2177 && !has_template_placeholder_values(title)
2178 {
2179 latest_proposal = Some(trim_for_storage(title, 120));
2180 }
2181
2182 if latest_check.is_none()
2183 && event.event_type == ThoughtEventType::CheckResult
2184 && let Some(result) = event
2185 .payload
2186 .get("result_excerpt")
2187 .and_then(serde_json::Value::as_str)
2188 && !result.trim().is_empty()
2189 && !has_template_placeholder_values(result)
2190 {
2191 latest_check = Some(trim_for_storage(result, 140));
2192 }
2193
2194 if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2195 break;
2196 }
2197 }
2198
2199 let mut lines = vec![format!(
2200 "{} recent cognition events are available.",
2201 context.len()
2202 )];
2203 if let Some(error) = latest_error {
2204 lines.push(format!("Latest error signal: {}.", error));
2205 }
2206 if let Some(proposal) = latest_proposal {
2207 lines.push(format!("Recent proposal: {}.", proposal));
2208 }
2209 if let Some(check) = latest_check {
2210 lines.push(format!("Recent check: {}.", check));
2211 }
2212 lines.join(" ")
2213}
2214
2215fn trim_for_storage(input: &str, max_chars: usize) -> String {
2216 if input.chars().count() <= max_chars {
2217 return input.trim().to_string();
2218 }
2219 let mut trimmed = String::with_capacity(max_chars + 8);
2220 for ch in input.chars().take(max_chars) {
2221 trimmed.push(ch);
2222 }
2223 trimmed.push_str("...");
2224 trimmed.trim().to_string()
2225}
2226
2227fn estimate_fact_count(text: &str) -> usize {
2228 let sentence_count =
2229 text.matches('.').count() + text.matches('!').count() + text.matches('?').count();
2230 sentence_count.clamp(1, 12)
2231}
2232
2233fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2234 let first_line = thought
2235 .lines()
2236 .find(|line| !line.trim().is_empty())
2237 .unwrap_or("proposal");
2238 let compact = first_line
2239 .replace(['\t', '\r', '\n'], " ")
2240 .split_whitespace()
2241 .collect::<Vec<_>>()
2242 .join(" ");
2243 let trimmed = trim_for_storage(&compact, 72);
2244 if trimmed.is_empty() {
2245 format!("proposal-{}", thought_count)
2246 } else {
2247 trimmed
2248 }
2249}
2250
2251fn default_seed_persona() -> CreatePersonaRequest {
2252 CreatePersonaRequest {
2253 persona_id: Some("root-thinker".to_string()),
2254 name: "root-thinker".to_string(),
2255 role: "orchestrator".to_string(),
2256 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2257 .to_string(),
2258 swarm_id: Some("swarm-core".to_string()),
2259 parent_id: None,
2260 policy: None,
2261 tags: vec!["orchestration".to_string()],
2262 }
2263}
2264
2265fn normalize_thinker_endpoint(base_url: &str) -> String {
2266 let trimmed = base_url.trim().trim_end_matches('/');
2267 if trimmed.ends_with("/chat/completions") {
2268 return trimmed.to_string();
2269 }
2270 if trimmed.is_empty() {
2271 return "http://127.0.0.1:11434/v1/chat/completions".to_string();
2272 }
2273 format!("{}/chat/completions", trimmed)
2274}
2275
2276fn env_bool(name: &str, default: bool) -> bool {
2277 std::env::var(name)
2278 .ok()
2279 .and_then(|v| match v.to_ascii_lowercase().as_str() {
2280 "1" | "true" | "yes" | "on" => Some(true),
2281 "0" | "false" | "no" | "off" => Some(false),
2282 _ => None,
2283 })
2284 .unwrap_or(default)
2285}
2286
2287fn env_f32(name: &str, default: f32) -> f32 {
2288 std::env::var(name)
2289 .ok()
2290 .and_then(|v| v.parse::<f32>().ok())
2291 .unwrap_or(default)
2292}
2293
2294fn env_u64(name: &str, default: u64) -> u64 {
2295 std::env::var(name)
2296 .ok()
2297 .and_then(|v| v.parse::<u64>().ok())
2298 .unwrap_or(default)
2299}
2300
2301fn env_u32(name: &str, default: u32) -> u32 {
2302 std::env::var(name)
2303 .ok()
2304 .and_then(|v| v.parse::<u32>().ok())
2305 .unwrap_or(default)
2306}
2307
2308fn env_usize(name: &str, default: usize) -> usize {
2309 std::env::var(name)
2310 .ok()
2311 .and_then(|v| v.parse::<usize>().ok())
2312 .unwrap_or(default)
2313}
2314
2315#[cfg(test)]
2316mod tests {
2317 use super::*;
2318
2319 fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2320 ThoughtWorkItem {
2321 persona_id: "p-1".to_string(),
2322 persona_name: "Spotlessbinco Business Thinker".to_string(),
2323 role: "principal reliability engineer".to_string(),
2324 charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2325 .to_string(),
2326 swarm_id: Some("spotlessbinco".to_string()),
2327 thought_count: 4,
2328 phase,
2329 }
2330 }
2331
2332 fn test_runtime() -> CognitionRuntime {
2333 CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2334 enabled: true,
2335 loop_interval_ms: 25,
2336 max_events: 256,
2337 max_snapshots: 32,
2338 default_policy: PersonaPolicy {
2339 max_spawn_depth: 2,
2340 max_branching_factor: 2,
2341 token_budget_per_minute: 1_000,
2342 compute_ms_per_minute: 1_000,
2343 idle_ttl_secs: 300,
2344 share_memory: false,
2345 allowed_tools: Vec::new(),
2346 },
2347 })
2348 }
2349
2350 #[test]
2351 fn normalize_rejects_placeholder_process_line() {
2352 let work = sample_work_item(ThoughtPhase::Compress);
2353 let output = normalize_thought_output(
2354 &work,
2355 &[],
2356 "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2357 );
2358 assert!(
2359 output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2360 );
2361 assert!(!output.contains("State_Summary: ..."));
2362 }
2363
2364 #[test]
2365 fn normalize_accepts_concrete_process_line() {
2366 let work = sample_work_item(ThoughtPhase::Test);
2367 let output = normalize_thought_output(
2368 &work,
2369 &[],
2370 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2371 );
2372 assert_eq!(
2373 output,
2374 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2375 );
2376 }
2377
2378 #[tokio::test]
2379 async fn create_spawn_and_lineage_work() {
2380 let runtime = test_runtime();
2381
2382 let root = runtime
2383 .create_persona(CreatePersonaRequest {
2384 persona_id: Some("root".to_string()),
2385 name: "root".to_string(),
2386 role: "orchestrator".to_string(),
2387 charter: "coordinate".to_string(),
2388 swarm_id: Some("swarm-a".to_string()),
2389 parent_id: None,
2390 policy: None,
2391 tags: Vec::new(),
2392 })
2393 .await
2394 .expect("root should be created");
2395
2396 assert_eq!(root.identity.depth, 0);
2397
2398 let child = runtime
2399 .spawn_child(
2400 "root",
2401 SpawnPersonaRequest {
2402 persona_id: Some("child-1".to_string()),
2403 name: "child-1".to_string(),
2404 role: "analyst".to_string(),
2405 charter: "analyze".to_string(),
2406 swarm_id: None,
2407 policy: None,
2408 },
2409 )
2410 .await
2411 .expect("child should spawn");
2412
2413 assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2414 assert_eq!(child.identity.depth, 1);
2415
2416 let lineage = runtime.lineage_graph().await;
2417 assert_eq!(lineage.total_edges, 1);
2418 assert_eq!(lineage.roots, vec!["root".to_string()]);
2419 }
2420
2421 #[tokio::test]
2422 async fn branching_and_depth_limits_are_enforced() {
2423 let runtime = test_runtime();
2424
2425 runtime
2426 .create_persona(CreatePersonaRequest {
2427 persona_id: Some("root".to_string()),
2428 name: "root".to_string(),
2429 role: "orchestrator".to_string(),
2430 charter: "coordinate".to_string(),
2431 swarm_id: Some("swarm-a".to_string()),
2432 parent_id: None,
2433 policy: None,
2434 tags: Vec::new(),
2435 })
2436 .await
2437 .expect("root should be created");
2438
2439 runtime
2440 .spawn_child(
2441 "root",
2442 SpawnPersonaRequest {
2443 persona_id: Some("c1".to_string()),
2444 name: "c1".to_string(),
2445 role: "worker".to_string(),
2446 charter: "run".to_string(),
2447 swarm_id: None,
2448 policy: None,
2449 },
2450 )
2451 .await
2452 .expect("first child should spawn");
2453
2454 runtime
2455 .spawn_child(
2456 "root",
2457 SpawnPersonaRequest {
2458 persona_id: Some("c2".to_string()),
2459 name: "c2".to_string(),
2460 role: "worker".to_string(),
2461 charter: "run".to_string(),
2462 swarm_id: None,
2463 policy: None,
2464 },
2465 )
2466 .await
2467 .expect("second child should spawn");
2468
2469 let third_child = runtime
2470 .spawn_child(
2471 "root",
2472 SpawnPersonaRequest {
2473 persona_id: Some("c3".to_string()),
2474 name: "c3".to_string(),
2475 role: "worker".to_string(),
2476 charter: "run".to_string(),
2477 swarm_id: None,
2478 policy: None,
2479 },
2480 )
2481 .await;
2482 assert!(third_child.is_err());
2483
2484 runtime
2485 .spawn_child(
2486 "c1",
2487 SpawnPersonaRequest {
2488 persona_id: Some("c1-1".to_string()),
2489 name: "c1-1".to_string(),
2490 role: "worker".to_string(),
2491 charter: "run".to_string(),
2492 swarm_id: None,
2493 policy: None,
2494 },
2495 )
2496 .await
2497 .expect("depth 2 should be allowed");
2498
2499 let depth_violation = runtime
2500 .spawn_child(
2501 "c1-1",
2502 SpawnPersonaRequest {
2503 persona_id: Some("c1-1-1".to_string()),
2504 name: "c1-1-1".to_string(),
2505 role: "worker".to_string(),
2506 charter: "run".to_string(),
2507 swarm_id: None,
2508 policy: None,
2509 },
2510 )
2511 .await;
2512 assert!(depth_violation.is_err());
2513 }
2514
2515 #[tokio::test]
2516 async fn start_stop_updates_runtime_status() {
2517 let runtime = test_runtime();
2518
2519 runtime
2520 .start(Some(StartCognitionRequest {
2521 loop_interval_ms: Some(10),
2522 seed_persona: Some(CreatePersonaRequest {
2523 persona_id: Some("seed".to_string()),
2524 name: "seed".to_string(),
2525 role: "watcher".to_string(),
2526 charter: "observe".to_string(),
2527 swarm_id: Some("swarm-seed".to_string()),
2528 parent_id: None,
2529 policy: None,
2530 tags: Vec::new(),
2531 }),
2532 }))
2533 .await
2534 .expect("runtime should start");
2535
2536 tokio::time::sleep(Duration::from_millis(60)).await;
2537 let running_status = runtime.status().await;
2538 assert!(running_status.running);
2539 assert!(running_status.events_buffered > 0);
2540
2541 runtime
2542 .stop(Some("test".to_string()))
2543 .await
2544 .expect("runtime should stop");
2545 let stopped_status = runtime.status().await;
2546 assert!(!stopped_status.running);
2547 }
2548
2549 #[tokio::test]
2550 async fn zero_budget_persona_is_skipped() {
2551 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2552 enabled: true,
2553 loop_interval_ms: 10,
2554 max_events: 256,
2555 max_snapshots: 32,
2556 default_policy: PersonaPolicy {
2557 max_spawn_depth: 2,
2558 max_branching_factor: 2,
2559 token_budget_per_minute: 0,
2560 compute_ms_per_minute: 0,
2561 idle_ttl_secs: 3_600,
2562 share_memory: false,
2563 allowed_tools: Vec::new(),
2564 },
2565 });
2566
2567 let persona = runtime
2568 .create_persona(CreatePersonaRequest {
2569 persona_id: Some("budget-test".to_string()),
2570 name: "budget-test".to_string(),
2571 role: "tester".to_string(),
2572 charter: "test budgets".to_string(),
2573 swarm_id: None,
2574 parent_id: None,
2575 policy: None,
2576 tags: Vec::new(),
2577 })
2578 .await
2579 .expect("should create persona");
2580
2581 assert_eq!(persona.tokens_this_window, 0);
2582 assert_eq!(persona.compute_ms_this_window, 0);
2583
2584 runtime.start(None).await.expect("should start");
2586 tokio::time::sleep(Duration::from_millis(50)).await;
2587 runtime.stop(None).await.expect("should stop");
2588
2589 let p = runtime.get_persona("budget-test").await.unwrap();
2591 assert_eq!(p.thought_count, 0);
2592 assert!(p.budget_paused);
2593 }
2594
2595 #[tokio::test]
2596 async fn budget_counters_reset_after_window() {
2597 let now = Utc::now();
2598 let mut persona = PersonaRuntimeState {
2599 identity: PersonaIdentity {
2600 id: "p1".to_string(),
2601 name: "test".to_string(),
2602 role: "tester".to_string(),
2603 charter: "test".to_string(),
2604 swarm_id: None,
2605 parent_id: None,
2606 depth: 0,
2607 created_at: now,
2608 tags: Vec::new(),
2609 },
2610 policy: PersonaPolicy::default(),
2611 status: PersonaStatus::Active,
2612 thought_count: 0,
2613 last_tick_at: None,
2614 updated_at: now,
2615 tokens_this_window: 5000,
2616 compute_ms_this_window: 3000,
2617 window_started_at: now - ChronoDuration::seconds(61),
2618 last_progress_at: now,
2619 budget_paused: false,
2620 };
2621
2622 let window_elapsed = now
2624 .signed_duration_since(persona.window_started_at)
2625 .num_seconds();
2626 assert!(window_elapsed >= 60);
2627
2628 persona.tokens_this_window = 0;
2630 persona.compute_ms_this_window = 0;
2631 persona.window_started_at = now;
2632
2633 assert_eq!(persona.tokens_this_window, 0);
2634 assert_eq!(persona.compute_ms_this_window, 0);
2635 }
2636
2637 #[tokio::test]
2638 async fn idle_persona_is_reaped() {
2639 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2640 enabled: true,
2641 loop_interval_ms: 10,
2642 max_events: 256,
2643 max_snapshots: 32,
2644 default_policy: PersonaPolicy {
2645 max_spawn_depth: 2,
2646 max_branching_factor: 2,
2647 token_budget_per_minute: 20_000,
2648 compute_ms_per_minute: 10_000,
2649 idle_ttl_secs: 0, share_memory: false,
2651 allowed_tools: Vec::new(),
2652 },
2653 });
2654
2655 runtime
2656 .create_persona(CreatePersonaRequest {
2657 persona_id: Some("idle-test".to_string()),
2658 name: "idle-test".to_string(),
2659 role: "idler".to_string(),
2660 charter: "idle away".to_string(),
2661 swarm_id: None,
2662 parent_id: None,
2663 policy: None,
2664 tags: Vec::new(),
2665 })
2666 .await
2667 .expect("should create persona");
2668
2669 {
2671 let mut personas = runtime.personas.write().await;
2672 if let Some(p) = personas.get_mut("idle-test") {
2673 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2674 }
2675 }
2676
2677 runtime.start(None).await.expect("should start");
2678 tokio::time::sleep(Duration::from_millis(100)).await;
2679 runtime.stop(None).await.expect("should stop");
2680
2681 let p = runtime.get_persona("idle-test").await.unwrap();
2682 assert_eq!(p.status, PersonaStatus::Reaped);
2683 }
2684
2685 #[tokio::test]
2686 async fn budget_paused_persona_not_reaped_for_idle() {
2687 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2688 enabled: true,
2689 loop_interval_ms: 10,
2690 max_events: 256,
2691 max_snapshots: 32,
2692 default_policy: PersonaPolicy {
2693 max_spawn_depth: 2,
2694 max_branching_factor: 2,
2695 token_budget_per_minute: 0, compute_ms_per_minute: 0,
2697 idle_ttl_secs: 0, share_memory: false,
2699 allowed_tools: Vec::new(),
2700 },
2701 });
2702
2703 runtime
2704 .create_persona(CreatePersonaRequest {
2705 persona_id: Some("paused-test".to_string()),
2706 name: "paused-test".to_string(),
2707 role: "pauser".to_string(),
2708 charter: "pause".to_string(),
2709 swarm_id: None,
2710 parent_id: None,
2711 policy: None,
2712 tags: Vec::new(),
2713 })
2714 .await
2715 .expect("should create persona");
2716
2717 {
2719 let mut personas = runtime.personas.write().await;
2720 if let Some(p) = personas.get_mut("paused-test") {
2721 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2722 }
2723 }
2724
2725 runtime.start(None).await.expect("should start");
2726 tokio::time::sleep(Duration::from_millis(100)).await;
2727 runtime.stop(None).await.expect("should stop");
2728
2729 let p = runtime.get_persona("paused-test").await.unwrap();
2730 assert_eq!(p.status, PersonaStatus::Active);
2732 assert!(p.budget_paused);
2733 }
2734
2735 #[tokio::test]
2736 async fn governance_proposal_resolution() {
2737 let runtime = test_runtime();
2738 let gov = SwarmGovernance {
2739 quorum_fraction: 0.5,
2740 required_approvers_by_role: HashMap::new(),
2741 veto_roles: vec!["auditor".to_string()],
2742 vote_timeout_secs: 300,
2743 };
2744 *runtime.governance.write().await = gov;
2745
2746 runtime
2748 .create_persona(CreatePersonaRequest {
2749 persona_id: Some("voter-1".to_string()),
2750 name: "voter-1".to_string(),
2751 role: "engineer".to_string(),
2752 charter: "vote".to_string(),
2753 swarm_id: None,
2754 parent_id: None,
2755 policy: None,
2756 tags: Vec::new(),
2757 })
2758 .await
2759 .unwrap();
2760 runtime
2761 .create_persona(CreatePersonaRequest {
2762 persona_id: Some("voter-2".to_string()),
2763 name: "voter-2".to_string(),
2764 role: "engineer".to_string(),
2765 charter: "vote".to_string(),
2766 swarm_id: None,
2767 parent_id: None,
2768 policy: None,
2769 tags: Vec::new(),
2770 })
2771 .await
2772 .unwrap();
2773
2774 {
2776 let mut proposals = runtime.proposals.write().await;
2777 let mut votes = HashMap::new();
2778 votes.insert("voter-1".to_string(), ProposalVote::Approve);
2779 proposals.insert(
2780 "prop-1".to_string(),
2781 Proposal {
2782 id: "prop-1".to_string(),
2783 persona_id: "voter-1".to_string(),
2784 title: "test proposal".to_string(),
2785 rationale: "testing governance".to_string(),
2786 evidence_refs: Vec::new(),
2787 risk: ProposalRisk::Low,
2788 status: ProposalStatus::Created,
2789 created_at: Utc::now(),
2790 votes,
2791 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2792 votes_requested: true,
2793 quorum_needed: 1,
2794 },
2795 );
2796 }
2797
2798 runtime.start(None).await.unwrap();
2800 tokio::time::sleep(Duration::from_millis(100)).await;
2801 runtime.stop(None).await.unwrap();
2802
2803 let proposals = runtime.get_proposals().await;
2804 let prop = proposals.get("prop-1").unwrap();
2805 assert!(
2807 prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2808 "Expected Verified or Executed, got {:?}",
2809 prop.status
2810 );
2811 }
2812
2813 #[tokio::test]
2814 async fn veto_rejects_proposal() {
2815 let runtime = test_runtime();
2816 let gov = SwarmGovernance {
2817 quorum_fraction: 0.5,
2818 required_approvers_by_role: HashMap::new(),
2819 veto_roles: vec!["auditor".to_string()],
2820 vote_timeout_secs: 300,
2821 };
2822 *runtime.governance.write().await = gov;
2823
2824 runtime
2825 .create_persona(CreatePersonaRequest {
2826 persona_id: Some("eng".to_string()),
2827 name: "eng".to_string(),
2828 role: "engineer".to_string(),
2829 charter: "build".to_string(),
2830 swarm_id: None,
2831 parent_id: None,
2832 policy: None,
2833 tags: Vec::new(),
2834 })
2835 .await
2836 .unwrap();
2837 runtime
2838 .create_persona(CreatePersonaRequest {
2839 persona_id: Some("aud".to_string()),
2840 name: "aud".to_string(),
2841 role: "auditor".to_string(),
2842 charter: "audit".to_string(),
2843 swarm_id: None,
2844 parent_id: None,
2845 policy: None,
2846 tags: Vec::new(),
2847 })
2848 .await
2849 .unwrap();
2850
2851 {
2852 let mut proposals = runtime.proposals.write().await;
2853 let mut votes = HashMap::new();
2854 votes.insert("eng".to_string(), ProposalVote::Approve);
2855 votes.insert("aud".to_string(), ProposalVote::Veto);
2856 proposals.insert(
2857 "prop-veto".to_string(),
2858 Proposal {
2859 id: "prop-veto".to_string(),
2860 persona_id: "eng".to_string(),
2861 title: "vetoed proposal".to_string(),
2862 rationale: "testing veto".to_string(),
2863 evidence_refs: Vec::new(),
2864 risk: ProposalRisk::Low,
2865 status: ProposalStatus::Created,
2866 created_at: Utc::now(),
2867 votes,
2868 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2869 votes_requested: true,
2870 quorum_needed: 1,
2871 },
2872 );
2873 }
2874
2875 runtime.start(None).await.unwrap();
2876 tokio::time::sleep(Duration::from_millis(100)).await;
2877 runtime.stop(None).await.unwrap();
2878
2879 let proposals = runtime.get_proposals().await;
2880 let prop = proposals.get("prop-veto").unwrap();
2881 assert_eq!(prop.status, ProposalStatus::Rejected);
2882 }
2883
2884 #[test]
2885 fn global_workspace_default() {
2886 let ws = GlobalWorkspace::default();
2887 assert!(ws.top_beliefs.is_empty());
2888 assert!(ws.top_uncertainties.is_empty());
2889 assert!(ws.top_attention.is_empty());
2890 }
2891
2892 #[test]
2893 fn attention_item_creation() {
2894 let item = AttentionItem {
2895 id: "a1".to_string(),
2896 topic: "test topic".to_string(),
2897 topic_tags: vec!["reliability".to_string()],
2898 priority: 0.8,
2899 source_type: AttentionSource::ContestedBelief,
2900 source_id: "b1".to_string(),
2901 assigned_persona: None,
2902 created_at: Utc::now(),
2903 resolved_at: None,
2904 };
2905 assert!(item.resolved_at.is_none());
2906 assert_eq!(item.priority, 0.8);
2907 }
2908}