1pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16pub use thinker::{
17 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
18};
19
20use crate::tool::ToolRegistry;
21use anyhow::{Result, anyhow};
22use beliefs::{Belief, BeliefStatus};
23use chrono::{DateTime, Duration as ChronoDuration, Utc};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::time::Duration;
30use tokio::sync::{Mutex, RwLock, broadcast};
31use tokio::task::JoinHandle;
32use tokio::time::Instant;
33use uuid::Uuid;
34
/// Status of a persona as recorded in its [`PersonaRuntimeState`].
///
/// Serialized by serde in `snake_case` (for example `"active"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// The cognition loop schedules thoughts only for personas in this state,
    /// and only these personas are counted when computing the proposal quorum.
    Active,
    /// Skipped by the cognition loop.
    /// NOTE(review): transitions into this state are not visible in this section.
    Idle,
    /// Skipped by the cognition loop; presumably set when a persona is reaped — confirm.
    Reaped,
    /// Skipped by the cognition loop; presumably marks a failed persona — confirm.
    Error,
}
44
/// Per-persona limits and permissions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    /// Limit on lineage depth for spawned personas.
    /// NOTE(review): enforcement is not visible in this section.
    pub max_spawn_depth: u32,
    /// Limit on the number of children per persona.
    /// NOTE(review): enforcement is not visible in this section.
    pub max_branching_factor: u32,
    /// Token budget per window. The cognition loop skips a persona once its
    /// `tokens_this_window` reaches this value; the window is reset after 60 seconds.
    pub token_budget_per_minute: u32,
    /// Compute budget (milliseconds of thought latency) per window, enforced the
    /// same way as the token budget.
    pub compute_ms_per_minute: u32,
    /// Idle time-to-live in seconds.
    /// NOTE(review): the idle reaper that consumes this is not visible here.
    pub idle_ttl_secs: u64,
    /// Whether memory is shared; consumer not visible in this section.
    pub share_memory: bool,
    /// Tool names the persona may use. The loop only attempts tool execution
    /// when this list is non-empty. Defaults to empty when absent from input.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
}
57
58impl Default for PersonaPolicy {
59 fn default() -> Self {
60 Self {
61 max_spawn_depth: 4,
62 max_branching_factor: 4,
63 token_budget_per_minute: 20_000,
64 compute_ms_per_minute: 10_000,
65 idle_ttl_secs: 3_600,
66 share_memory: false,
67 allowed_tools: Vec::new(),
68 }
69 }
70}
71
/// Identifying attributes of a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Identifier the cognition loop uses to look the persona up in the persona map.
    pub id: String,
    /// Name, copied into each thought work item and event payload.
    pub name: String,
    /// Role, copied into each thought work item and event payload.
    pub role: String,
    /// Charter text, copied into each thought work item.
    pub charter: String,
    /// Swarm the persona belongs to, if any; attached to events emitted for it.
    pub swarm_id: Option<String>,
    /// Parent persona id, if any (presumably `None` for root personas — confirm).
    pub parent_id: Option<String>,
    /// Depth in the lineage (presumably 0 for roots — confirm against the spawn path).
    pub depth: u32,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Free-form tags; defaults to empty when absent from input.
    #[serde(default)]
    pub tags: Vec<String>,
}
86
/// Mutable runtime record kept for each persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    /// Identifying attributes of the persona.
    pub identity: PersonaIdentity,
    /// Limits applied to the persona by the cognition loop.
    pub policy: PersonaPolicy,
    /// Current status; only `Active` personas are scheduled.
    pub status: PersonaStatus,
    /// Number of thoughts scheduled so far (saturating); selects the thought phase.
    pub thought_count: u64,
    /// Time of the most recent loop tick that scheduled this persona.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Time of the most recent update to this record by the loop.
    pub updated_at: DateTime<Utc>,
    /// Tokens consumed in the current budget window.
    pub tokens_this_window: u32,
    /// Thought latency (ms) accumulated in the current budget window.
    pub compute_ms_this_window: u32,
    /// Start of the current budget window; the loop resets the window once it
    /// is at least 60 seconds old.
    pub window_started_at: DateTime<Utc>,
    /// Last time the loop recorded progress (a non-fallback thought, a newly
    /// stored belief, a check result, or a tool result).
    pub last_progress_at: DateTime<Utc>,
    /// Set while the persona is over budget so the `BudgetPaused` event is
    /// emitted once per pause; defaults to `false` when absent from input.
    #[serde(default)]
    pub budget_paused: bool,
}
108
/// Risk level attached to a [`Proposal`].
///
/// Serialized in `snake_case`. Derives `Hash` because it is used as the key of
/// `SwarmGovernance::required_approvers_by_role`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    /// Risk assigned to proposals created by the cognition loop.
    Low,
    Medium,
    High,
    Critical,
}
118
/// Lifecycle status of a [`Proposal`]. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    /// Initial status; only proposals in this state are evaluated for quorum by the loop.
    Created,
    /// NOTE(review): the transition into this state is not visible in this section.
    Verified,
    /// NOTE(review): the transition into this state is not visible in this section.
    Rejected,
    /// NOTE(review): the transition into this state is not visible in this section.
    Executed,
}
128
/// A single vote cast on a [`Proposal`]. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalVote {
    Approve,
    Reject,
    /// Checked separately by the loop once quorum is reached.
    /// NOTE(review): the rest of the veto check lies outside this section.
    Veto,
    Abstain,
}
138
/// A proposal raised by a persona and subject to swarm voting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    /// Proposal identifier (a UUID string when created by the loop).
    pub id: String,
    /// Id of the persona that raised the proposal.
    pub persona_id: String,
    /// Short title; the loop derives it from the originating thought.
    pub title: String,
    /// Rationale text; the loop stores the thought trimmed via `trim_for_storage(.., 900)`.
    pub rationale: String,
    /// References to supporting evidence.
    pub evidence_refs: Vec<String>,
    /// Risk level.
    pub risk: ProposalRisk,
    /// Lifecycle status.
    pub status: ProposalStatus,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Votes keyed by voter id; defaults to empty when absent from input.
    #[serde(default)]
    pub votes: HashMap<String, ProposalVote>,
    /// Voting deadline; the loop sets it to creation time plus the governance
    /// `vote_timeout_secs`.
    pub vote_deadline: Option<DateTime<Utc>>,
    /// Starts as `false` for loop-created proposals; defaults to `false` when absent.
    /// NOTE(review): the code that sets this flag is not visible in this section.
    #[serde(default)]
    pub votes_requested: bool,
}
159
/// Kind of a [`ThoughtEvent`].
///
/// Serialized in `snake_case`. Variants without a note are not emitted by the
/// part of the cognition loop visible alongside this definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Event type of a thought produced in the observe phase.
    ThoughtGenerated,
    /// Event type of a thought produced in the reflect phase.
    HypothesisRaised,
    /// Event type of a thought produced in the test phase.
    CheckRequested,
    /// Follow-up event recorded after each test-phase thought.
    CheckResult,
    /// A proposal was created from a reflect-phase thought.
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    /// Event type of a thought produced in the compress phase.
    SnapshotCompressed,
    /// A new belief was stored from a reflect-phase thought.
    BeliefExtracted,
    BeliefContested,
    BeliefRevalidated,
    /// A persona was skipped because it reached its token or compute budget.
    BudgetPaused,
    IdleReaped,
    AttentionCreated,
    VoteCast,
    /// The global workspace was recomputed during a compress phase.
    WorkspaceUpdated,
}
184
185impl ThoughtEventType {
186 fn as_str(&self) -> &'static str {
187 match self {
188 Self::ThoughtGenerated => "thought_generated",
189 Self::HypothesisRaised => "hypothesis_raised",
190 Self::CheckRequested => "check_requested",
191 Self::CheckResult => "check_result",
192 Self::ProposalCreated => "proposal_created",
193 Self::ProposalVerified => "proposal_verified",
194 Self::ProposalRejected => "proposal_rejected",
195 Self::ActionExecuted => "action_executed",
196 Self::PersonaSpawned => "persona_spawned",
197 Self::PersonaReaped => "persona_reaped",
198 Self::SnapshotCompressed => "snapshot_compressed",
199 Self::BeliefExtracted => "belief_extracted",
200 Self::BeliefContested => "belief_contested",
201 Self::BeliefRevalidated => "belief_revalidated",
202 Self::BudgetPaused => "budget_paused",
203 Self::IdleReaped => "idle_reaped",
204 Self::AttentionCreated => "attention_created",
205 Self::VoteCast => "vote_cast",
206 Self::WorkspaceUpdated => "workspace_updated",
207 }
208 }
209}
210
/// A single event emitted by the cognition runtime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Event identifier (a UUID string for events created by the loop).
    pub id: String,
    /// What kind of event this is.
    pub event_type: ThoughtEventType,
    /// Persona the event relates to, if any.
    pub persona_id: Option<String>,
    /// Swarm the event relates to, if any.
    pub swarm_id: Option<String>,
    /// When the event was created.
    pub timestamp: DateTime<Utc>,
    /// Event-specific JSON payload; its shape depends on `event_type`.
    pub payload: serde_json::Value,
}
221
/// Compressed memory snapshot produced during a compress-phase thought.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    /// Snapshot identifier (a UUID string when created by the loop).
    pub id: String,
    /// When the snapshot was generated.
    pub generated_at: DateTime<Utc>,
    /// Swarm of the persona that produced the snapshot, if any.
    pub swarm_id: Option<String>,
    /// Persona ids covered; the loop stores the single producing persona.
    pub persona_scope: Vec<String>,
    /// Summary text; the loop stores the thought trimmed via `trim_for_storage(.., 1_500)`.
    pub summary: String,
    /// Number of recent context events available when the thought was generated.
    pub hot_event_count: usize,
    /// Fact count computed by `estimate_fact_count` on the thought text.
    pub warm_fact_count: usize,
    /// The loop always stores `1` here.
    pub cold_snapshot_count: usize,
    /// Extra metadata; the loop writes the keys `phase`, `role`, `source`,
    /// `model` and `completion_tokens`.
    pub metadata: HashMap<String, serde_json::Value>,
}
235
/// Request payload for creating a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    /// Optional explicit persona id (presumably generated when `None` — confirm
    /// against `create_persona`).
    pub persona_id: Option<String>,
    /// Persona name.
    pub name: String,
    /// Persona role.
    pub role: String,
    /// Persona charter text.
    pub charter: String,
    /// Swarm to attach the persona to, if any.
    pub swarm_id: Option<String>,
    /// Parent persona id, if any.
    pub parent_id: Option<String>,
    /// Optional policy override (presumably the runtime default applies when
    /// `None` — confirm).
    pub policy: Option<PersonaPolicy>,
    /// Free-form tags; defaults to empty when absent from input.
    #[serde(default)]
    pub tags: Vec<String>,
}
249
/// Request payload for spawning a persona.
///
/// Has the same fields as [`CreatePersonaRequest`] except `parent_id` and `tags`.
/// NOTE(review): presumably the parent is supplied separately by the caller — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    /// Optional explicit persona id.
    pub persona_id: Option<String>,
    /// Persona name.
    pub name: String,
    /// Persona role.
    pub role: String,
    /// Persona charter text.
    pub charter: String,
    /// Swarm to attach the persona to, if any.
    pub swarm_id: Option<String>,
    /// Optional policy override.
    pub policy: Option<PersonaPolicy>,
}
260
/// Request payload for reaping a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    /// Whether to cascade (presumably to descendants — confirm against the reap path).
    pub cascade: Option<bool>,
    /// Optional free-text reason.
    pub reason: Option<String>,
}
267
/// Optional parameters for `CognitionRuntime::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// New loop interval in milliseconds; `start` raises values below 100 to 100.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the runtime has no personas yet; `start` falls
    /// back to `default_seed_persona` when this is `None`.
    pub seed_persona: Option<CreatePersonaRequest>,
}
274
/// Optional parameters for stopping the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Optional free-text reason for stopping.
    pub reason: Option<String>,
}
280
/// What caused an [`AttentionItem`] to be queued. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttentionSource {
    /// A belief was contested by a newly extracted belief and still has
    /// confidence of at least 0.5.
    ContestedBelief,
    /// NOTE(review): producer not visible in this section.
    FailedCheck,
    /// An active belief passed its `review_after` time and was marked stale.
    StaleBelief,
    /// A proposal passed its vote deadline without reaching quorum.
    ProposalTimeout,
    /// NOTE(review): producer not visible in this section.
    FailedExecution,
}
293
/// An entry in the runtime's attention queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttentionItem {
    /// Item identifier (a UUID string when created by the loop).
    pub id: String,
    /// Human-readable topic line.
    pub topic: String,
    /// Tags for the topic; belief-related items carry the belief key.
    pub topic_tags: Vec<String>,
    /// Priority used to rank unresolved items (higher first). The loop uses
    /// 0.7 for contested beliefs, 0.6 for proposal timeouts and 0.4 for stale beliefs.
    pub priority: f32,
    /// What caused the item to be queued.
    pub source_type: AttentionSource,
    /// Id of the belief or proposal the item refers to.
    pub source_id: String,
    /// Persona assigned to the item; the loop creates items unassigned.
    pub assigned_persona: Option<String>,
    /// When the item was queued.
    pub created_at: DateTime<Utc>,
    /// When the item was resolved; `None` means it is still open.
    pub resolved_at: Option<DateTime<Utc>>,
}
307
/// Voting rules applied to proposals.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmGovernance {
    /// Fraction of active personas that must vote; the loop computes the
    /// quorum as `ceil(active_count * quorum_fraction)`.
    pub quorum_fraction: f32,
    /// Roles required to approve, keyed by proposal risk.
    /// NOTE(review): the consumer is not visible in this section.
    pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
    /// Roles whose veto counts.
    /// NOTE(review): the veto check is cut off at the end of this section — confirm.
    pub veto_roles: Vec<String>,
    /// Seconds added to a proposal's creation time to obtain its vote deadline.
    pub vote_timeout_secs: u64,
}
316
317impl Default for SwarmGovernance {
318 fn default() -> Self {
319 Self {
320 quorum_fraction: 0.5,
321 required_approvers_by_role: HashMap::new(),
322 veto_roles: Vec::new(),
323 vote_timeout_secs: 300,
324 }
325 }
326}
327
/// Shared summary of the most relevant beliefs, uncertainties and attention items.
///
/// The cognition loop recomputes it during each compress-phase thought.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalWorkspace {
    /// Ids of up to 10 active beliefs, ranked by confidence discounted by the
    /// minutes since their last update.
    pub top_beliefs: Vec<String>,
    /// Up to 5 entries formatted as `[belief_key] claim` for beliefs that are
    /// stale or contested.
    pub top_uncertainties: Vec<String>,
    /// Ids of up to 10 unresolved attention items, highest priority first.
    pub top_attention: Vec<String>,
    /// Active objectives.
    /// NOTE(review): not written by the loop code visible in this section.
    pub active_objectives: Vec<String>,
    /// When the workspace was last recomputed.
    pub updated_at: DateTime<Utc>,
}
337
338impl Default for GlobalWorkspace {
339 fn default() -> Self {
340 Self {
341 top_beliefs: Vec::new(),
342 top_uncertainties: Vec::new(),
343 top_attention: Vec::new(),
344 active_objectives: Vec::new(),
345 updated_at: Utc::now(),
346 }
347 }
348}
349
/// Status report for the cognition runtime, as returned by `CognitionRuntime::start`.
///
/// NOTE(review): the code that fills this struct (`status()`) is outside this
/// section; the field notes below follow the field names and the runtime's
/// matching state and should be confirmed there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    /// Whether cognition is enabled at all.
    pub enabled: bool,
    /// Whether the background loop is currently running.
    pub running: bool,
    /// Current loop interval in milliseconds.
    pub loop_interval_ms: u64,
    /// When the loop was started, if it has been.
    pub started_at: Option<DateTime<Utc>>,
    /// Time of the most recent loop tick, if any.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Number of personas known to the runtime.
    pub persona_count: usize,
    /// Number of personas with `Active` status.
    pub active_persona_count: usize,
    /// Number of events currently buffered.
    pub events_buffered: usize,
    /// Number of snapshots currently buffered.
    pub snapshots_buffered: usize,
}
363
/// Response payload for a reap request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids of the personas that were reaped.
    pub reaped_ids: Vec<String>,
    /// Number of reaped personas (presumably `reaped_ids.len()` — confirm at the producer).
    pub count: usize,
}
370
/// One persona in a [`LineageGraph`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Id of the persona this node describes.
    pub persona_id: String,
    /// Id of the parent persona, if any.
    pub parent_id: Option<String>,
    /// Ids of the persona's children.
    pub children: Vec<String>,
    /// Depth of the persona in the lineage.
    pub depth: u32,
    /// Status of the persona.
    pub status: PersonaStatus,
}
380
/// Parent/child graph of personas.
///
/// NOTE(review): the builder is outside this section; field notes follow the
/// field names and should be confirmed there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    /// All nodes in the graph.
    pub nodes: Vec<LineageNode>,
    /// Ids of root personas (presumably those without a parent).
    pub roots: Vec<String>,
    /// Total number of parent-to-child edges.
    pub total_edges: usize,
}
388
/// Phase of a persona's four-step thought cycle.
///
/// The phase is derived from the persona's thought count, so consecutive
/// thoughts go Observe, Reflect, Test, Compress and then repeat.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    /// Thought counts with remainder 1 modulo 4.
    Observe,
    /// Thought counts with remainder 2 modulo 4.
    Reflect,
    /// Thought counts with remainder 3 modulo 4.
    Test,
    /// Thought counts divisible by 4.
    Compress,
}
396
397impl ThoughtPhase {
398 fn from_thought_count(thought_count: u64) -> Self {
399 match thought_count % 4 {
400 1 => Self::Observe,
401 2 => Self::Reflect,
402 3 => Self::Test,
403 _ => Self::Compress,
404 }
405 }
406
407 fn as_str(&self) -> &'static str {
408 match self {
409 Self::Observe => "observe",
410 Self::Reflect => "reflect",
411 Self::Test => "test",
412 Self::Compress => "compress",
413 }
414 }
415
416 fn event_type(&self) -> ThoughtEventType {
417 match self {
418 Self::Observe => ThoughtEventType::ThoughtGenerated,
419 Self::Reflect => ThoughtEventType::HypothesisRaised,
420 Self::Test => ThoughtEventType::CheckRequested,
421 Self::Compress => ThoughtEventType::SnapshotCompressed,
422 }
423 }
424}
425
/// Snapshot of the persona data needed to generate one thought.
///
/// The cognition loop builds these while holding the persona write lock so the
/// thought itself can be generated afterwards without that lock.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    /// Id of the persona the thought is for.
    persona_id: String,
    /// Persona name at scheduling time.
    persona_name: String,
    /// Persona role at scheduling time.
    role: String,
    /// Persona charter at scheduling time.
    charter: String,
    /// Swarm of the persona, if any.
    swarm_id: Option<String>,
    /// The persona's thought count after being incremented for this thought.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
436
/// Outcome of generating one thought.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Where the thought came from; the loop treats the value `"fallback"` as
    /// "no real model output" and skips belief extraction and tool execution for it.
    source: &'static str,
    /// Model that produced the thought, if any.
    model: Option<String>,
    /// Finish reason reported for the generation, if any.
    finish_reason: Option<String>,
    /// The thought text.
    thinking: String,
    /// Prompt token count, if reported.
    prompt_tokens: Option<u32>,
    /// Completion token count, if reported.
    completion_tokens: Option<u32>,
    /// Total token count, if reported; charged against the persona's token
    /// budget (missing values count as 0).
    total_tokens: Option<u32>,
    /// Generation latency in milliseconds; charged against the persona's compute budget.
    latency_ms: u128,
    /// Error message recorded for the generation, if any.
    error: Option<String>,
}
449
/// Construction options for [`CognitionRuntime`].
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// Whether cognition may be started at all.
    pub enabled: bool,
    /// Loop interval in milliseconds; the runtime raises values below 100 to 100.
    pub loop_interval_ms: u64,
    /// Event buffer limit; the runtime raises values below 32 to 32.
    pub max_events: usize,
    /// Snapshot buffer limit; the runtime raises values below 8 to 8.
    pub max_snapshots: usize,
    /// Policy stored as the runtime's default persona policy.
    pub default_policy: PersonaPolicy,
}
459
460impl Default for CognitionRuntimeOptions {
461 fn default() -> Self {
462 Self {
463 enabled: false,
464 loop_interval_ms: 2_000,
465 max_events: 2_000,
466 max_snapshots: 128,
467 default_policy: PersonaPolicy::default(),
468 }
469 }
470}
471
/// Runtime that owns the personas and drives the background cognition loop.
///
/// Shared state is wrapped in `Arc` so that `start` can clone handles into the
/// spawned loop task.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// When `false`, `start` returns an error instead of launching the loop.
    enabled: bool,
    /// Event buffer limit (at least 32 after construction).
    max_events: usize,
    /// Snapshot buffer limit (at least 8 after construction).
    max_snapshots: usize,
    /// Default persona policy taken from the construction options.
    default_policy: PersonaPolicy,
    /// Flag polled by the loop task; the loop exits once it reads `false`.
    running: Arc<AtomicBool>,
    /// Loop interval in milliseconds (at least 100).
    loop_interval_ms: Arc<RwLock<u64>>,
    /// When the loop was last started.
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Time of the most recent loop tick.
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Persona runtime state keyed by persona id.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    /// Proposals keyed by proposal id.
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    /// Buffered events; read back as recent context when generating thoughts.
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    /// Buffered memory snapshots.
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    /// Handle of the spawned loop task (presumably stored by `start` — confirm).
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast sender; `subscribe_events` hands out receivers for it.
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// Thinker client; `None` when no config was given, the thinker is disabled,
    /// or its initialization failed.
    thinker: Option<Arc<ThinkerClient>>,
    /// Beliefs keyed by belief id.
    beliefs: Arc<RwLock<HashMap<String, Belief>>>,
    /// Queue of attention items.
    attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
    /// Voting rules applied to proposals.
    governance: Arc<RwLock<SwarmGovernance>>,
    /// Shared workspace summary, recomputed in compress phases.
    workspace: Arc<RwLock<GlobalWorkspace>>,
    /// Tool registry; the constructor leaves it `None`, and the loop only
    /// executes tools when it is `Some`.
    tools: Option<Arc<ToolRegistry>>,
    /// Decision receipts.
    /// NOTE(review): producers are not visible in this section.
    receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
    /// Pending approvals keyed by a string id.
    /// NOTE(review): the meaning of the `bool` value is not visible in this section.
    pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
}
499
500impl CognitionRuntime {
501 pub fn new_from_env() -> Self {
503 let mut options = CognitionRuntimeOptions::default();
504 options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
505 options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
506 options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
507 options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
508
509 options.default_policy = PersonaPolicy {
510 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
511 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
512 token_budget_per_minute: env_u32(
513 "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
514 20_000,
515 ),
516 compute_ms_per_minute: env_u32("CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE", 10_000),
517 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
518 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
519 allowed_tools: Vec::new(),
520 };
521
522 let thinker_config = ThinkerConfig {
523 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
524 backend: thinker::ThinkerBackend::from_env(
525 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
526 .unwrap_or_else(|_| "openai_compat".to_string()),
527 ),
528 endpoint: normalize_thinker_endpoint(
529 &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
530 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
531 ),
532 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
533 .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
534 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
535 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
536 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
537 .ok()
538 .and_then(|v| v.parse::<f32>().ok()),
539 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
540 timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
541 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
542 candle_tokenizer_path: std::env::var(
543 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
544 )
545 .ok(),
546 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
547 candle_device: thinker::CandleDevicePreference::from_env(
548 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
549 .unwrap_or_else(|_| "auto".to_string()),
550 ),
551 candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
552 candle_repeat_penalty: env_f32(
553 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
554 1.1,
555 ),
556 candle_repeat_last_n: env_usize(
557 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
558 64,
559 ),
560 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
561 };
562
563 Self::new_with_options_and_thinker(options, Some(thinker_config))
564 }
565
566 pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
568 Self::new_with_options_and_thinker(options, None)
569 }
570
571 fn new_with_options_and_thinker(
572 options: CognitionRuntimeOptions,
573 thinker_config: Option<ThinkerConfig>,
574 ) -> Self {
575 let (event_tx, _) = broadcast::channel(options.max_events.max(16));
576 let thinker = thinker_config.and_then(|cfg| {
577 if !cfg.enabled {
578 return None;
579 }
580 match ThinkerClient::new(cfg) {
581 Ok(client) => {
582 tracing::info!(
583 backend = ?client.config().backend,
584 endpoint = %client.config().endpoint,
585 model = %client.config().model,
586 "Cognition thinker initialized"
587 );
588 Some(Arc::new(client))
589 }
590 Err(error) => {
591 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
592 None
593 }
594 }
595 });
596
597 Self {
598 enabled: options.enabled,
599 max_events: options.max_events.max(32),
600 max_snapshots: options.max_snapshots.max(8),
601 default_policy: options.default_policy,
602 running: Arc::new(AtomicBool::new(false)),
603 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
604 started_at: Arc::new(RwLock::new(None)),
605 last_tick_at: Arc::new(RwLock::new(None)),
606 personas: Arc::new(RwLock::new(HashMap::new())),
607 proposals: Arc::new(RwLock::new(HashMap::new())),
608 events: Arc::new(RwLock::new(VecDeque::new())),
609 snapshots: Arc::new(RwLock::new(VecDeque::new())),
610 loop_handle: Arc::new(Mutex::new(None)),
611 event_tx,
612 thinker,
613 beliefs: Arc::new(RwLock::new(HashMap::new())),
614 attention_queue: Arc::new(RwLock::new(Vec::new())),
615 governance: Arc::new(RwLock::new(SwarmGovernance::default())),
616 workspace: Arc::new(RwLock::new(GlobalWorkspace::default())),
617 tools: None,
618 receipts: Arc::new(RwLock::new(Vec::new())),
619 pending_approvals: Arc::new(RwLock::new(HashMap::new())),
620 }
621 }
622
623 pub fn is_enabled(&self) -> bool {
625 self.enabled
626 }
627
628 pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
630 self.event_tx.subscribe()
631 }
632
633 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
635 if !self.enabled {
636 return Err(anyhow!(
637 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
638 ));
639 }
640
641 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
642 if let Some(request) = req {
643 if let Some(interval) = request.loop_interval_ms {
644 let mut lock = self.loop_interval_ms.write().await;
645 *lock = interval.max(100);
646 }
647 requested_seed_persona = request.seed_persona;
648 }
649
650 let has_personas = !self.personas.read().await.is_empty();
651 if !has_personas {
652 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
653 let _ = self.create_persona(seed).await?;
654 }
655
656 if self.running.load(Ordering::SeqCst) {
657 return Ok(self.status().await);
658 }
659
660 self.running.store(true, Ordering::SeqCst);
661 {
662 let mut started = self.started_at.write().await;
663 *started = Some(Utc::now());
664 }
665
666 let running = Arc::clone(&self.running);
667 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
668 let last_tick_at = Arc::clone(&self.last_tick_at);
669 let personas = Arc::clone(&self.personas);
670 let proposals = Arc::clone(&self.proposals);
671 let events = Arc::clone(&self.events);
672 let snapshots = Arc::clone(&self.snapshots);
673 let max_events = self.max_events;
674 let max_snapshots = self.max_snapshots;
675 let event_tx = self.event_tx.clone();
676 let thinker = self.thinker.clone();
677 let beliefs = Arc::clone(&self.beliefs);
678 let attention_queue = Arc::clone(&self.attention_queue);
679 let governance = Arc::clone(&self.governance);
680 let workspace = Arc::clone(&self.workspace);
681 let tools = self.tools.clone();
682 let receipts = Arc::clone(&self.receipts);
683 let pending_approvals = Arc::clone(&self.pending_approvals);
684
685 let handle = tokio::spawn(async move {
686 let mut next_tick = Instant::now();
687 while running.load(Ordering::SeqCst) {
688 let now_instant = Instant::now();
689 if now_instant < next_tick {
690 tokio::time::sleep_until(next_tick).await;
691 }
692 if !running.load(Ordering::SeqCst) {
693 break;
694 }
695
696 let now = Utc::now();
697 {
698 let mut lock = last_tick_at.write().await;
699 *lock = Some(now);
700 }
701
702 let mut new_events = Vec::new();
703 let mut new_snapshots = Vec::new();
704 let mut new_proposals = Vec::new();
705
706 let work_items: Vec<ThoughtWorkItem> = {
708 let mut map = personas.write().await;
709 let mut items = Vec::new();
710 for persona in map.values_mut() {
711 if persona.status != PersonaStatus::Active {
712 continue;
713 }
714
715 let window_elapsed = now
717 .signed_duration_since(persona.window_started_at)
718 .num_seconds();
719 if window_elapsed >= 60 {
720 persona.tokens_this_window = 0;
721 persona.compute_ms_this_window = 0;
722 persona.window_started_at = now;
723 }
724
725 let token_ok =
727 persona.tokens_this_window < persona.policy.token_budget_per_minute;
728 let compute_ok =
729 persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
730 if !token_ok || !compute_ok {
731 if !persona.budget_paused {
732 persona.budget_paused = true;
733 new_events.push(ThoughtEvent {
734 id: Uuid::new_v4().to_string(),
735 event_type: ThoughtEventType::BudgetPaused,
736 persona_id: Some(persona.identity.id.clone()),
737 swarm_id: persona.identity.swarm_id.clone(),
738 timestamp: now,
739 payload: json!({
740 "budget_paused": true,
741 "tokens_used": persona.tokens_this_window,
742 "compute_ms_used": persona.compute_ms_this_window,
743 "token_budget": persona.policy.token_budget_per_minute,
744 "compute_budget": persona.policy.compute_ms_per_minute,
745 }),
746 });
747 }
748 continue; }
750
751 persona.budget_paused = false;
752 persona.thought_count = persona.thought_count.saturating_add(1);
753 persona.last_tick_at = Some(now);
754 persona.updated_at = now;
755 items.push(ThoughtWorkItem {
756 persona_id: persona.identity.id.clone(),
757 persona_name: persona.identity.name.clone(),
758 role: persona.identity.role.clone(),
759 charter: persona.identity.charter.clone(),
760 swarm_id: persona.identity.swarm_id.clone(),
761 thought_count: persona.thought_count,
762 phase: ThoughtPhase::from_thought_count(persona.thought_count),
763 });
764 }
765 items
766 };
767
768 for work in &work_items {
769 let context = recent_persona_context(&events, &work.persona_id, 8).await;
770
771 let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
772
773 let event_timestamp = Utc::now();
774 let is_fallback = thought.source == "fallback";
775
776 new_events.push(ThoughtEvent {
777 id: Uuid::new_v4().to_string(),
778 event_type: work.phase.event_type(),
779 persona_id: Some(work.persona_id.clone()),
780 swarm_id: work.swarm_id.clone(),
781 timestamp: event_timestamp,
782 payload: json!({
783 "phase": work.phase.as_str(),
784 "thought_count": work.thought_count,
785 "persona": {
786 "id": work.persona_id.clone(),
787 "name": work.persona_name.clone(),
788 "role": work.role.clone(),
789 },
790 "context_event_count": context.len(),
791 "thinking": thought.thinking.clone(),
792 "source": thought.source,
793 "model": thought.model.clone(),
794 "finish_reason": thought.finish_reason.clone(),
795 "usage": {
796 "prompt_tokens": thought.prompt_tokens,
797 "completion_tokens": thought.completion_tokens,
798 "total_tokens": thought.total_tokens,
799 },
800 "latency_ms": thought.latency_ms,
801 "error": thought.error.clone(),
802 }),
803 });
804
805 {
807 let mut map = personas.write().await;
808 if let Some(persona) = map.get_mut(&work.persona_id) {
809 let tokens_used = thought.total_tokens.unwrap_or(0);
810 let compute_used = thought.latency_ms as u32;
811 persona.tokens_this_window =
812 persona.tokens_this_window.saturating_add(tokens_used);
813 persona.compute_ms_this_window =
814 persona.compute_ms_this_window.saturating_add(compute_used);
815
816 if !is_fallback {
818 persona.last_progress_at = Utc::now();
819 }
820 }
821 }
822
823 if work.phase == ThoughtPhase::Reflect && !is_fallback {
825 let extracted = beliefs::extract_beliefs_from_thought(
826 thinker.as_deref(),
827 &work.persona_id,
828 &thought.thinking,
829 )
830 .await;
831
832 if !extracted.is_empty() {
833 let mut belief_store = beliefs.write().await;
834 let mut attn_queue = attention_queue.write().await;
835 for mut new_belief in extracted {
836 let existing_id = belief_store
838 .values()
839 .find(|b| {
840 b.belief_key == new_belief.belief_key
841 && b.status != BeliefStatus::Invalidated
842 })
843 .map(|b| b.id.clone());
844
845 if let Some(eid) = existing_id {
846 if let Some(existing) = belief_store.get_mut(&eid) {
848 if !existing.confirmed_by.contains(&work.persona_id) {
849 existing.confirmed_by.push(work.persona_id.clone());
850 }
851 existing.updated_at = Utc::now();
852 }
853 } else {
854 let contest_targets: Vec<String> =
856 new_belief.contradicts.clone();
857 for target_key in &contest_targets {
858 if let Some(target) = belief_store.values_mut().find(|b| {
859 &b.belief_key == target_key
860 && b.status != BeliefStatus::Invalidated
861 }) {
862 target.contested_by.push(new_belief.id.clone());
863 if !new_belief.contradicts.contains(&target.belief_key)
864 {
865 new_belief
866 .contradicts
867 .push(target.belief_key.clone());
868 }
869 target.confidence = (target.confidence - 0.1).max(0.05);
871 if target.confidence < 0.5 {
872 target.status = BeliefStatus::Stale;
873 } else {
874 attn_queue.push(AttentionItem {
876 id: Uuid::new_v4().to_string(),
877 topic: format!(
878 "Revalidate belief: {}",
879 target.claim
880 ),
881 topic_tags: vec![target.belief_key.clone()],
882 priority: 0.7,
883 source_type: AttentionSource::ContestedBelief,
884 source_id: target.id.clone(),
885 assigned_persona: None,
886 created_at: Utc::now(),
887 resolved_at: None,
888 });
889 }
890 }
891 }
892
893 new_events.push(ThoughtEvent {
894 id: Uuid::new_v4().to_string(),
895 event_type: ThoughtEventType::BeliefExtracted,
896 persona_id: Some(work.persona_id.clone()),
897 swarm_id: work.swarm_id.clone(),
898 timestamp: Utc::now(),
899 payload: json!({
900 "belief_id": new_belief.id,
901 "belief_key": new_belief.belief_key,
902 "claim": trim_for_storage(&new_belief.claim, 280),
903 "confidence": new_belief.confidence,
904 }),
905 });
906
907 {
909 let mut map = personas.write().await;
910 if let Some(p) = map.get_mut(&work.persona_id) {
911 p.last_progress_at = Utc::now();
912 }
913 }
914
915 belief_store.insert(new_belief.id.clone(), new_belief);
916 }
917 }
918 }
919 }
920
921 if work.phase == ThoughtPhase::Test {
922 new_events.push(ThoughtEvent {
923 id: Uuid::new_v4().to_string(),
924 event_type: ThoughtEventType::CheckResult,
925 persona_id: Some(work.persona_id.clone()),
926 swarm_id: work.swarm_id.clone(),
927 timestamp: Utc::now(),
928 payload: json!({
929 "phase": work.phase.as_str(),
930 "thought_count": work.thought_count,
931 "result_excerpt": trim_for_storage(&thought.thinking, 280),
932 "source": thought.source,
933 "model": thought.model,
934 }),
935 });
936
937 {
939 let mut map = personas.write().await;
940 if let Some(p) = map.get_mut(&work.persona_id) {
941 p.last_progress_at = Utc::now();
942 }
943 }
944
945 if !is_fallback {
947 if let Some(ref tool_registry) = tools {
948 let allowed = {
949 let map = personas.read().await;
950 map.get(&work.persona_id)
951 .map(|p| p.policy.allowed_tools.clone())
952 .unwrap_or_default()
953 };
954 if !allowed.is_empty() {
955 let tool_results = executor::execute_tool_requests(
956 thinker.as_deref(),
957 tool_registry,
958 &work.persona_id,
959 &thought.thinking,
960 &allowed,
961 )
962 .await;
963
964 for result_event in tool_results {
965 new_events.push(result_event);
966 let mut map = personas.write().await;
968 if let Some(p) = map.get_mut(&work.persona_id) {
969 p.last_progress_at = Utc::now();
970 }
971 }
972 }
973 }
974 }
975 }
976
977 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
978 let gov = governance.read().await;
979 let proposal = Proposal {
980 id: Uuid::new_v4().to_string(),
981 persona_id: work.persona_id.clone(),
982 title: proposal_title_from_thought(
983 &thought.thinking,
984 work.thought_count,
985 ),
986 rationale: trim_for_storage(&thought.thinking, 900),
987 evidence_refs: vec!["internal.thought_stream".to_string()],
988 risk: ProposalRisk::Low,
989 status: ProposalStatus::Created,
990 created_at: Utc::now(),
991 votes: HashMap::new(),
992 vote_deadline: Some(
993 Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
994 ),
995 votes_requested: false,
996 };
997
998 new_events.push(ThoughtEvent {
999 id: Uuid::new_v4().to_string(),
1000 event_type: ThoughtEventType::ProposalCreated,
1001 persona_id: Some(work.persona_id.clone()),
1002 swarm_id: work.swarm_id.clone(),
1003 timestamp: Utc::now(),
1004 payload: json!({
1005 "proposal_id": proposal.id,
1006 "title": proposal.title,
1007 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1008 "source": thought.source,
1009 "model": thought.model,
1010 }),
1011 });
1012
1013 new_proposals.push(proposal);
1014 }
1015
1016 if work.phase == ThoughtPhase::Compress {
1017 new_snapshots.push(MemorySnapshot {
1018 id: Uuid::new_v4().to_string(),
1019 generated_at: Utc::now(),
1020 swarm_id: work.swarm_id.clone(),
1021 persona_scope: vec![work.persona_id.clone()],
1022 summary: trim_for_storage(&thought.thinking, 1_500),
1023 hot_event_count: context.len(),
1024 warm_fact_count: estimate_fact_count(&thought.thinking),
1025 cold_snapshot_count: 1,
1026 metadata: HashMap::from([
1027 (
1028 "phase".to_string(),
1029 serde_json::Value::String(work.phase.as_str().to_string()),
1030 ),
1031 (
1032 "role".to_string(),
1033 serde_json::Value::String(work.role.clone()),
1034 ),
1035 (
1036 "source".to_string(),
1037 serde_json::Value::String(thought.source.to_string()),
1038 ),
1039 (
1040 "model".to_string(),
1041 serde_json::Value::String(
1042 thought
1043 .model
1044 .clone()
1045 .unwrap_or_else(|| "fallback".to_string()),
1046 ),
1047 ),
1048 (
1049 "completion_tokens".to_string(),
1050 serde_json::Value::Number(serde_json::Number::from(
1051 thought.completion_tokens.unwrap_or(0) as u64,
1052 )),
1053 ),
1054 ]),
1055 });
1056
1057 {
1059 let mut belief_store = beliefs.write().await;
1060 let mut attn_queue = attention_queue.write().await;
1061 let stale_ids: Vec<String> = belief_store
1062 .values()
1063 .filter(|b| {
1064 b.status == BeliefStatus::Active && now > b.review_after
1065 })
1066 .map(|b| b.id.clone())
1067 .collect();
1068 for id in stale_ids {
1069 if let Some(belief) = belief_store.get_mut(&id) {
1070 belief.status = BeliefStatus::Stale;
1071 belief.confidence *= 0.98;
1072 belief.confidence = belief.confidence.max(0.05);
1073 attn_queue.push(AttentionItem {
1074 id: Uuid::new_v4().to_string(),
1075 topic: format!("Stale belief: {}", belief.claim),
1076 topic_tags: vec![belief.belief_key.clone()],
1077 priority: 0.4,
1078 source_type: AttentionSource::StaleBelief,
1079 source_id: belief.id.clone(),
1080 assigned_persona: None,
1081 created_at: now,
1082 resolved_at: None,
1083 });
1084 }
1085 }
1086 }
1087
1088 {
1090 let belief_store = beliefs.read().await;
1091 let attn_queue = attention_queue.read().await;
1092
1093 let mut sorted_beliefs: Vec<&Belief> = belief_store
1094 .values()
1095 .filter(|b| b.status == BeliefStatus::Active)
1096 .collect();
1097 sorted_beliefs.sort_by(|a, b| {
1098 let score_a = a.confidence
1099 * (1.0
1100 / (1.0
1101 + now.signed_duration_since(a.updated_at).num_minutes()
1102 as f32));
1103 let score_b = b.confidence
1104 * (1.0
1105 / (1.0
1106 + now.signed_duration_since(b.updated_at).num_minutes()
1107 as f32));
1108 score_b
1109 .partial_cmp(&score_a)
1110 .unwrap_or(std::cmp::Ordering::Equal)
1111 });
1112
1113 let top_beliefs: Vec<String> = sorted_beliefs
1114 .iter()
1115 .take(10)
1116 .map(|b| b.id.clone())
1117 .collect();
1118 let top_uncertainties: Vec<String> = belief_store
1119 .values()
1120 .filter(|b| {
1121 b.status == BeliefStatus::Stale || !b.contested_by.is_empty()
1122 })
1123 .take(5)
1124 .map(|b| {
1125 format!("[{}] {}", b.belief_key, trim_for_storage(&b.claim, 80))
1126 })
1127 .collect();
1128
1129 let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1130 .iter()
1131 .filter(|a| a.resolved_at.is_none())
1132 .collect();
1133 sorted_attn.sort_by(|a, b| {
1134 b.priority
1135 .partial_cmp(&a.priority)
1136 .unwrap_or(std::cmp::Ordering::Equal)
1137 });
1138 let top_attention: Vec<String> =
1139 sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1140
1141 let mut ws = workspace.write().await;
1142 ws.top_beliefs = top_beliefs;
1143 ws.top_uncertainties = top_uncertainties;
1144 ws.top_attention = top_attention;
1145 ws.updated_at = now;
1146 }
1147
1148 new_events.push(ThoughtEvent {
1149 id: Uuid::new_v4().to_string(),
1150 event_type: ThoughtEventType::WorkspaceUpdated,
1151 persona_id: Some(work.persona_id.clone()),
1152 swarm_id: work.swarm_id.clone(),
1153 timestamp: Utc::now(),
1154 payload: json!({ "updated": true }),
1155 });
1156 }
1157 }
1158
1159 {
1161 let gov = governance.read().await;
1162 let mut proposal_store = proposals.write().await;
1163 let persona_map = personas.read().await;
1164 let active_count = persona_map
1165 .values()
1166 .filter(|p| p.status == PersonaStatus::Active)
1167 .count();
1168
1169 let proposal_ids: Vec<String> = proposal_store
1170 .values()
1171 .filter(|p| p.status == ProposalStatus::Created)
1172 .map(|p| p.id.clone())
1173 .collect();
1174
1175 let mut attn_queue = attention_queue.write().await;
1176 for pid in proposal_ids {
1177 if let Some(proposal) = proposal_store.get_mut(&pid) {
1178 if let Some(deadline) = proposal.vote_deadline {
1180 if now > deadline {
1181 let quorum_needed =
1182 (active_count as f32 * gov.quorum_fraction).ceil() as usize;
1183 if proposal.votes.len() < quorum_needed {
1184 attn_queue.push(AttentionItem {
1186 id: Uuid::new_v4().to_string(),
1187 topic: format!(
1188 "Proposal vote timeout: {}",
1189 proposal.title
1190 ),
1191 topic_tags: Vec::new(),
1192 priority: 0.6,
1193 source_type: AttentionSource::ProposalTimeout,
1194 source_id: proposal.id.clone(),
1195 assigned_persona: None,
1196 created_at: now,
1197 resolved_at: None,
1198 });
1199 continue;
1200 }
1201 }
1202 }
1203
1204 let quorum_needed =
1206 (active_count as f32 * gov.quorum_fraction).ceil() as usize;
1207 if proposal.votes.len() >= quorum_needed {
1208 let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1210 if *vote != ProposalVote::Veto {
1211 return false;
1212 }
1213 if let Some(voter) = persona_map.get(voter_id) {
1214 gov.veto_roles.contains(&voter.identity.role)
1215 } else {
1216 false
1217 }
1218 });
1219
1220 if vetoed {
1221 proposal.status = ProposalStatus::Rejected;
1222 continue;
1223 }
1224
1225 let required_roles = gov
1227 .required_approvers_by_role
1228 .get(&proposal.risk)
1229 .cloned()
1230 .unwrap_or_default();
1231 let all_required_met = required_roles.iter().all(|role| {
1232 proposal.votes.iter().any(|(vid, vote)| {
1233 *vote == ProposalVote::Approve
1234 && persona_map
1235 .get(vid)
1236 .map(|p| &p.identity.role == role)
1237 .unwrap_or(false)
1238 })
1239 });
1240
1241 if !all_required_met {
1242 continue; }
1244
1245 let approvals = proposal
1247 .votes
1248 .values()
1249 .filter(|v| **v == ProposalVote::Approve)
1250 .count();
1251 let rejections = proposal
1252 .votes
1253 .values()
1254 .filter(|v| **v == ProposalVote::Reject)
1255 .count();
1256
1257 if approvals > rejections {
1258 proposal.status = ProposalStatus::Verified;
1259 } else {
1260 proposal.status = ProposalStatus::Rejected;
1261 }
1262 }
1263 }
1264 }
1265 }
1266
1267 {
1269 let mut proposal_store = proposals.write().await;
1270 let verified_ids: Vec<String> = proposal_store
1271 .values()
1272 .filter(|p| p.status == ProposalStatus::Verified)
1273 .map(|p| p.id.clone())
1274 .collect();
1275
1276 for pid in verified_ids {
1277 if let Some(proposal) = proposal_store.get_mut(&pid) {
1278 if proposal.risk == ProposalRisk::Critical {
1279 let approved = {
1281 let approvals = pending_approvals.read().await;
1282 approvals.get(&pid).copied().unwrap_or(false)
1283 };
1284 if !approved {
1285 let mut approvals = pending_approvals.write().await;
1287 approvals.entry(pid.clone()).or_insert(false);
1288 continue;
1289 }
1290 }
1291
1292 let receipt = executor::DecisionReceipt {
1294 id: Uuid::new_v4().to_string(),
1295 proposal_id: pid.clone(),
1296 inputs: proposal.evidence_refs.clone(),
1297 governance_decision: format!(
1298 "Approved with {} votes",
1299 proposal.votes.len()
1300 ),
1301 capability_leases: Vec::new(),
1302 tool_invocations: Vec::new(),
1303 outcome: executor::ExecutionOutcome::Success {
1304 summary: format!("Proposal '{}' executed", proposal.title),
1305 },
1306 created_at: Utc::now(),
1307 };
1308
1309 new_events.push(ThoughtEvent {
1310 id: Uuid::new_v4().to_string(),
1311 event_type: ThoughtEventType::ActionExecuted,
1312 persona_id: Some(proposal.persona_id.clone()),
1313 swarm_id: None,
1314 timestamp: Utc::now(),
1315 payload: json!({
1316 "receipt_id": receipt.id,
1317 "proposal_id": pid,
1318 "outcome": "success",
1319 "summary": format!("Proposal '{}' executed", proposal.title),
1320 }),
1321 });
1322
1323 receipts.write().await.push(receipt);
1324 proposal.status = ProposalStatus::Executed;
1325 }
1326 }
1327 }
1328
1329 if !new_proposals.is_empty() {
1330 let mut proposal_store = proposals.write().await;
1331 for proposal in new_proposals {
1332 proposal_store.insert(proposal.id.clone(), proposal);
1333 }
1334 }
1335
1336 for event in new_events {
1337 push_event_internal(&events, max_events, &event_tx, event).await;
1338 }
1339 for snapshot in new_snapshots {
1340 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1341 }
1342
1343 {
1345 let mut map = personas.write().await;
1346 let idle_ids: Vec<String> = map
1347 .values()
1348 .filter(|p| {
1349 p.status == PersonaStatus::Active
1350 && !p.budget_paused
1351 && now.signed_duration_since(p.last_progress_at).num_seconds()
1352 > p.policy.idle_ttl_secs as i64
1353 })
1354 .map(|p| p.identity.id.clone())
1355 .collect();
1356
1357 for id in &idle_ids {
1358 if let Some(persona) = map.get_mut(id) {
1359 persona.status = PersonaStatus::Reaped;
1360 persona.updated_at = now;
1361 }
1362 let children: Vec<String> = map
1364 .values()
1365 .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1366 .map(|p| p.identity.id.clone())
1367 .collect();
1368 for child_id in children {
1369 if let Some(child) = map.get_mut(&child_id) {
1370 child.status = PersonaStatus::Reaped;
1371 child.updated_at = now;
1372 }
1373 }
1374 }
1375 drop(map);
1376
1377 for id in idle_ids {
1378 push_event_internal(
1379 &events,
1380 max_events,
1381 &event_tx,
1382 ThoughtEvent {
1383 id: Uuid::new_v4().to_string(),
1384 event_type: ThoughtEventType::IdleReaped,
1385 persona_id: Some(id),
1386 swarm_id: None,
1387 timestamp: now,
1388 payload: json!({ "reason": "idle_ttl_expired" }),
1389 },
1390 )
1391 .await;
1392 }
1393 }
1394
1395 if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1397 let _ = persistence::save_state(
1398 &personas,
1399 &proposals,
1400 &beliefs,
1401 &attention_queue,
1402 &workspace,
1403 &events,
1404 &snapshots,
1405 )
1406 .await;
1407 }
1408
1409 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1410 next_tick += interval;
1411 let tick_completed = Instant::now();
1412 if tick_completed > next_tick {
1413 next_tick = tick_completed;
1414 }
1415 }
1416 });
1417
1418 {
1419 let mut lock = self.loop_handle.lock().await;
1420 *lock = Some(handle);
1421 }
1422
1423 Ok(self.status().await)
1424 }
1425
1426 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1428 self.running.store(false, Ordering::SeqCst);
1429
1430 if let Some(handle) = self.loop_handle.lock().await.take() {
1431 handle.abort();
1432 let _ = handle.await;
1433 }
1434
1435 if let Some(reason_value) = reason {
1436 let event = ThoughtEvent {
1437 id: Uuid::new_v4().to_string(),
1438 event_type: ThoughtEventType::CheckResult,
1439 persona_id: None,
1440 swarm_id: None,
1441 timestamp: Utc::now(),
1442 payload: json!({ "stopped": true, "reason": reason_value }),
1443 };
1444 self.push_event(event).await;
1445 }
1446
1447 Ok(self.status().await)
1448 }
1449
1450 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1452 let now = Utc::now();
1453 let mut personas = self.personas.write().await;
1454
1455 let mut parent_swarm_id = None;
1456 let mut computed_depth = 0_u32;
1457 let mut inherited_policy = None;
1458
1459 if let Some(parent_id) = req.parent_id.clone() {
1460 let parent = personas
1461 .get(&parent_id)
1462 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1463
1464 if parent.status == PersonaStatus::Reaped {
1465 return Err(anyhow!("Parent persona {} is reaped", parent_id));
1466 }
1467
1468 parent_swarm_id = parent.identity.swarm_id.clone();
1469 computed_depth = parent.identity.depth.saturating_add(1);
1470 inherited_policy = Some(parent.policy.clone());
1471 let branch_limit = parent.policy.max_branching_factor;
1472
1473 let child_count = personas
1474 .values()
1475 .filter(|p| {
1476 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1477 && p.status != PersonaStatus::Reaped
1478 })
1479 .count();
1480
1481 if child_count as u32 >= branch_limit {
1482 return Err(anyhow!(
1483 "Parent {} reached branching limit {}",
1484 parent_id,
1485 branch_limit
1486 ));
1487 }
1488 }
1489
1490 let policy = req
1491 .policy
1492 .clone()
1493 .or(inherited_policy.clone())
1494 .unwrap_or_else(|| self.default_policy.clone());
1495
1496 let effective_depth_limit = inherited_policy
1497 .as_ref()
1498 .map(|p| p.max_spawn_depth)
1499 .unwrap_or(policy.max_spawn_depth);
1500
1501 if computed_depth > effective_depth_limit {
1502 return Err(anyhow!(
1503 "Spawn depth {} exceeds limit {}",
1504 computed_depth,
1505 effective_depth_limit
1506 ));
1507 }
1508
1509 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1510 if personas.contains_key(&persona_id) {
1511 return Err(anyhow!("Persona id already exists: {}", persona_id));
1512 }
1513
1514 let identity = PersonaIdentity {
1515 id: persona_id.clone(),
1516 name: req.name,
1517 role: req.role,
1518 charter: req.charter,
1519 swarm_id: req.swarm_id.or(parent_swarm_id),
1520 parent_id: req.parent_id,
1521 depth: computed_depth,
1522 created_at: now,
1523 tags: req.tags,
1524 };
1525
1526 let persona = PersonaRuntimeState {
1527 identity,
1528 policy,
1529 status: PersonaStatus::Active,
1530 thought_count: 0,
1531 last_tick_at: None,
1532 updated_at: now,
1533 tokens_this_window: 0,
1534 compute_ms_this_window: 0,
1535 window_started_at: now,
1536 last_progress_at: now,
1537 budget_paused: false,
1538 };
1539
1540 personas.insert(persona_id, persona.clone());
1541 drop(personas);
1542
1543 self.push_event(ThoughtEvent {
1544 id: Uuid::new_v4().to_string(),
1545 event_type: ThoughtEventType::PersonaSpawned,
1546 persona_id: Some(persona.identity.id.clone()),
1547 swarm_id: persona.identity.swarm_id.clone(),
1548 timestamp: now,
1549 payload: json!({
1550 "name": persona.identity.name,
1551 "role": persona.identity.role,
1552 "depth": persona.identity.depth,
1553 }),
1554 })
1555 .await;
1556
1557 Ok(persona)
1558 }
1559
1560 pub async fn spawn_child(
1562 &self,
1563 parent_id: &str,
1564 req: SpawnPersonaRequest,
1565 ) -> Result<PersonaRuntimeState> {
1566 let request = CreatePersonaRequest {
1567 persona_id: req.persona_id,
1568 name: req.name,
1569 role: req.role,
1570 charter: req.charter,
1571 swarm_id: req.swarm_id,
1572 parent_id: Some(parent_id.to_string()),
1573 policy: req.policy,
1574 tags: Vec::new(),
1575 };
1576 self.create_persona(request).await
1577 }
1578
1579 pub async fn reap_persona(
1581 &self,
1582 persona_id: &str,
1583 req: ReapPersonaRequest,
1584 ) -> Result<ReapPersonaResponse> {
1585 let cascade = req.cascade.unwrap_or(false);
1586 let now = Utc::now();
1587
1588 let mut personas = self.personas.write().await;
1589 if !personas.contains_key(persona_id) {
1590 return Err(anyhow!("Persona not found: {}", persona_id));
1591 }
1592
1593 let mut reaped_ids = vec![persona_id.to_string()];
1594 if cascade {
1595 let mut idx = 0usize;
1596 while idx < reaped_ids.len() {
1597 let current = reaped_ids[idx].clone();
1598 let children: Vec<String> = personas
1599 .values()
1600 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1601 .map(|p| p.identity.id.clone())
1602 .collect();
1603 for child in children {
1604 if !reaped_ids.iter().any(|existing| existing == &child) {
1605 reaped_ids.push(child);
1606 }
1607 }
1608 idx += 1;
1609 }
1610 }
1611
1612 for id in &reaped_ids {
1613 if let Some(persona) = personas.get_mut(id) {
1614 persona.status = PersonaStatus::Reaped;
1615 persona.updated_at = now;
1616 }
1617 }
1618 drop(personas);
1619
1620 for id in &reaped_ids {
1621 self.push_event(ThoughtEvent {
1622 id: Uuid::new_v4().to_string(),
1623 event_type: ThoughtEventType::PersonaReaped,
1624 persona_id: Some(id.clone()),
1625 swarm_id: None,
1626 timestamp: now,
1627 payload: json!({
1628 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1629 "cascade": cascade,
1630 }),
1631 })
1632 .await;
1633 }
1634
1635 Ok(ReapPersonaResponse {
1636 count: reaped_ids.len(),
1637 reaped_ids,
1638 })
1639 }
1640
1641 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1643 self.snapshots.read().await.back().cloned()
1644 }
1645
1646 pub async fn lineage_graph(&self) -> LineageGraph {
1648 let personas = self.personas.read().await;
1649 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1650 let mut roots = Vec::new();
1651 let mut total_edges = 0usize;
1652
1653 for persona in personas.values() {
1654 if let Some(parent_id) = persona.identity.parent_id.clone() {
1655 children_by_parent
1656 .entry(parent_id)
1657 .or_default()
1658 .push(persona.identity.id.clone());
1659 total_edges = total_edges.saturating_add(1);
1660 } else {
1661 roots.push(persona.identity.id.clone());
1662 }
1663 }
1664
1665 let mut nodes: Vec<LineageNode> = personas
1666 .values()
1667 .map(|persona| {
1668 let mut children = children_by_parent
1669 .get(&persona.identity.id)
1670 .cloned()
1671 .unwrap_or_default();
1672 children.sort();
1673
1674 LineageNode {
1675 persona_id: persona.identity.id.clone(),
1676 parent_id: persona.identity.parent_id.clone(),
1677 children,
1678 depth: persona.identity.depth,
1679 status: persona.status,
1680 }
1681 })
1682 .collect();
1683
1684 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1685 roots.sort();
1686
1687 LineageGraph {
1688 nodes,
1689 roots,
1690 total_edges,
1691 }
1692 }
1693
1694 pub async fn status(&self) -> CognitionStatus {
1696 let personas = self.personas.read().await;
1697 let events = self.events.read().await;
1698 let snapshots = self.snapshots.read().await;
1699 let started_at = *self.started_at.read().await;
1700 let last_tick_at = *self.last_tick_at.read().await;
1701 let loop_interval_ms = *self.loop_interval_ms.read().await;
1702
1703 let active_persona_count = personas
1704 .values()
1705 .filter(|p| p.status == PersonaStatus::Active)
1706 .count();
1707
1708 CognitionStatus {
1709 enabled: self.enabled,
1710 running: self.running.load(Ordering::SeqCst),
1711 loop_interval_ms,
1712 started_at,
1713 last_tick_at,
1714 persona_count: personas.len(),
1715 active_persona_count,
1716 events_buffered: events.len(),
1717 snapshots_buffered: snapshots.len(),
1718 }
1719 }
1720
1721 async fn push_event(&self, event: ThoughtEvent) {
1722 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1723 }
1724
    /// Installs the tool registry on this runtime, replacing any registry
    /// stored previously.
    pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
        self.tools = Some(registry);
    }
1729
1730 pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1732 self.beliefs.read().await.clone()
1733 }
1734
1735 pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1737 self.beliefs.read().await.get(id).cloned()
1738 }
1739
1740 pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1742 self.attention_queue.read().await.clone()
1743 }
1744
1745 pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1747 self.proposals.read().await.clone()
1748 }
1749
1750 pub async fn get_workspace(&self) -> GlobalWorkspace {
1752 self.workspace.read().await.clone()
1753 }
1754
1755 pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1757 self.receipts.read().await.clone()
1758 }
1759
1760 pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1762 let proposals = self.proposals.read().await;
1763 let proposal = proposals
1764 .get(proposal_id)
1765 .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1766
1767 if proposal.risk != ProposalRisk::Critical {
1768 return Err(anyhow!("Only Critical proposals require human approval"));
1769 }
1770 if proposal.status != ProposalStatus::Verified {
1771 return Err(anyhow!("Proposal is not in Verified status"));
1772 }
1773 drop(proposals);
1774
1775 let mut approvals = self.pending_approvals.write().await;
1776 approvals.insert(proposal_id.to_string(), true);
1777 Ok(())
1778 }
1779
1780 pub async fn get_governance(&self) -> SwarmGovernance {
1782 self.governance.read().await.clone()
1783 }
1784
1785 pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1787 self.personas.read().await.get(id).cloned()
1788 }
1789}
1790
1791async fn push_event_internal(
1792 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1793 max_events: usize,
1794 event_tx: &broadcast::Sender<ThoughtEvent>,
1795 event: ThoughtEvent,
1796) {
1797 {
1798 let mut lock = events.write().await;
1799 lock.push_back(event.clone());
1800 while lock.len() > max_events {
1801 lock.pop_front();
1802 }
1803 }
1804 let _ = event_tx.send(event);
1805}
1806
1807async fn push_snapshot_internal(
1808 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1809 max_snapshots: usize,
1810 snapshot: MemorySnapshot,
1811) {
1812 let mut lock = snapshots.write().await;
1813 lock.push_back(snapshot);
1814 while lock.len() > max_snapshots {
1815 lock.pop_front();
1816 }
1817}
1818
1819async fn recent_persona_context(
1820 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1821 persona_id: &str,
1822 limit: usize,
1823) -> Vec<ThoughtEvent> {
1824 let lock = events.read().await;
1825 let mut selected: Vec<ThoughtEvent> = lock
1826 .iter()
1827 .rev()
1828 .filter(|event| {
1829 event.persona_id.as_deref() == Some(persona_id)
1830 || (event.persona_id.is_none()
1831 && matches!(
1832 event.event_type,
1833 ThoughtEventType::CheckResult
1834 | ThoughtEventType::ProposalCreated
1835 | ThoughtEventType::SnapshotCompressed
1836 ))
1837 })
1838 .take(limit)
1839 .cloned()
1840 .collect();
1841 selected.reverse();
1842 selected
1843}
1844
1845async fn generate_phase_thought(
1846 thinker: Option<&ThinkerClient>,
1847 work: &ThoughtWorkItem,
1848 context: &[ThoughtEvent],
1849) -> ThoughtResult {
1850 let started_at = Instant::now();
1851 if let Some(client) = thinker {
1852 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1853 match client.think(&system_prompt, &user_prompt).await {
1854 Ok(output) => {
1855 let thinking = normalize_thought_output(work, context, &output.text);
1856 if !thinking.is_empty() {
1857 return ThoughtResult {
1858 source: "model",
1859 model: Some(output.model),
1860 finish_reason: output.finish_reason,
1861 thinking,
1862 prompt_tokens: output.prompt_tokens,
1863 completion_tokens: output.completion_tokens,
1864 total_tokens: output.total_tokens,
1865 latency_ms: started_at.elapsed().as_millis(),
1866 error: None,
1867 };
1868 }
1869 }
1870 Err(error) => {
1871 return ThoughtResult {
1872 source: "fallback",
1873 model: None,
1874 finish_reason: None,
1875 thinking: fallback_phase_text(work, context),
1876 prompt_tokens: None,
1877 completion_tokens: None,
1878 total_tokens: None,
1879 latency_ms: started_at.elapsed().as_millis(),
1880 error: Some(error.to_string()),
1881 };
1882 }
1883 }
1884 }
1885
1886 ThoughtResult {
1887 source: "fallback",
1888 model: None,
1889 finish_reason: None,
1890 thinking: fallback_phase_text(work, context),
1891 prompt_tokens: None,
1892 completion_tokens: None,
1893 total_tokens: None,
1894 latency_ms: started_at.elapsed().as_millis(),
1895 error: None,
1896 }
1897}
1898
1899fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1900 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1901Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1902Write as an operational process update, not meta narration. \
1903Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
1904Output concrete findings, checks, risks, and next actions. \
1905Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
1906 .to_string();
1907
1908 let context_lines = if context.is_empty() {
1909 "none".to_string()
1910 } else {
1911 context
1912 .iter()
1913 .map(format_context_event)
1914 .collect::<Vec<_>>()
1915 .join("\n")
1916 };
1917
1918 let phase_instruction = match work.phase {
1919 ThoughtPhase::Observe => {
1920 "Process format (exact line labels): \
1921Phase: Observe | Goal: detect current customer/business risk | \
1922Signals: 1-3 concrete signals separated by '; ' | \
1923Uncertainty: one unknown that blocks confidence | \
1924Next_Action: one immediate operational action."
1925 }
1926 ThoughtPhase::Reflect => {
1927 "Process format (exact line labels): \
1928Phase: Reflect | Hypothesis: single testable hypothesis | \
1929Rationale: why this is likely | \
1930Business_Risk: customer/revenue/SLA impact | \
1931Validation_Next_Action: one action to confirm or falsify."
1932 }
1933 ThoughtPhase::Test => {
1934 "Process format (exact line labels): \
1935Phase: Test | Check: single concrete check | \
1936Procedure: short executable procedure | \
1937Expected_Result: pass/fail expectation | \
1938Evidence_Quality: low|medium|high with reason | \
1939Escalation_Trigger: when to escalate immediately."
1940 }
1941 ThoughtPhase::Compress => {
1942 "Process format (exact line labels): \
1943Phase: Compress | State_Summary: current state in one line | \
1944Retained_Facts: 3 short facts separated by '; ' | \
1945Open_Risks: up to 2 unresolved risks separated by '; ' | \
1946Next_Process_Step: next operational step."
1947 }
1948 };
1949
1950 let user_prompt = format!(
1951 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
1952 phase = work.phase.as_str(),
1953 persona_id = work.persona_id,
1954 persona_name = work.persona_name,
1955 role = work.role,
1956 charter = work.charter,
1957 thought_count = work.thought_count,
1958 context = context_lines,
1959 instruction = phase_instruction
1960 );
1961
1962 (system_prompt, user_prompt)
1963}
1964
1965fn format_context_event(event: &ThoughtEvent) -> String {
1966 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
1967 format!(
1968 "{} {} {}",
1969 event.event_type.as_str(),
1970 event.timestamp.to_rfc3339(),
1971 trim_for_storage(&payload, 220)
1972 )
1973}
1974
1975fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
1976 let charter = trim_for_storage(&work.charter, 180);
1977 let context_summary = fallback_context_summary(context);
1978 let thought = match work.phase {
1979 ThoughtPhase::Observe => format!(
1980 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
1981 work.role, charter, context_summary
1982 ),
1983 ThoughtPhase::Reflect => format!(
1984 "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
1985 ),
1986 ThoughtPhase::Test => format!(
1987 "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
1988 ),
1989 ThoughtPhase::Compress => format!(
1990 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
1991 work.role, charter, context_summary
1992 ),
1993 };
1994 trim_for_storage(&thought, 1_200)
1995}
1996
/// Cleans raw model output into the text stored as a thought.
///
/// The raw text is first cut to 2,000 characters. The result is one of:
/// - the first non-empty line from the first `Phase:` label onwards, with
///   surrounding quotes stripped, when that line contains no `<` and no
///   template placeholder values;
/// - the canned fallback text, when the output is blank, looks like meta
///   narration, or contains template placeholder values;
/// - otherwise the cut raw text unchanged.
fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
    let trimmed = trim_for_storage(raw, 2_000);
    if trimmed.trim().is_empty() {
        return fallback_phase_text(work, context);
    }

    // Prefer the labeled process line, skipping any text that precedes it.
    if let Some(idx) = find_process_label_start(&trimmed) {
        let candidate = trimmed[idx..].trim();
        if let Some(first_line) = candidate
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())
        {
            // Strip quote characters wrapped around the line.
            let normalized_line = first_line.trim_matches('"').trim_matches('\'').trim();
            if normalized_line.starts_with("Phase:")
                && !normalized_line.contains('<')
                && !has_template_placeholder_values(normalized_line)
            {
                return normalized_line.to_string();
            }
        }
        // Single-line candidate without markup or placeholders is accepted whole.
        if !candidate.is_empty()
            && !candidate.contains('<')
            && !candidate.contains('\n')
            && !has_template_placeholder_values(candidate)
        {
            return candidate.to_string();
        }
    }

    // Phrases that indicate the model narrated its task instead of answering.
    let lower = trimmed.to_ascii_lowercase();
    let looks_meta = lower.starts_with("we need")
        || lower.starts_with("i need")
        || lower.contains("we need to")
        || lower.contains("i need to")
        || lower.contains("must output")
        || lower.contains("let's ")
        || lower.contains("we have to");

    if looks_meta || has_template_placeholder_values(&trimmed) {
        return fallback_phase_text(work, context);
    }

    trimmed
}
2043
/// Reports whether `text` still contains unfilled template placeholders.
///
/// Matching is ASCII case-insensitive. Three kinds of placeholder are
/// detected:
/// - a known field label followed by `...` (for example `goal: ...`);
/// - the literal `<...` marker;
/// - the standalone words `tbd` or `todo`.
///
/// The words are matched as whole alphanumeric tokens rather than as
/// substrings, so ordinary words that merely contain those letters (such as
/// "autodoc" or "mastodon") are not treated as placeholders.
fn has_template_placeholder_values(text: &str) -> bool {
    const PLACEHOLDER_FIELDS: &[&str] = &[
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];
    const PLACEHOLDER_WORDS: &[&str] = &["tbd", "todo"];

    let lower = text.to_ascii_lowercase();

    let has_placeholder_field = PLACEHOLDER_FIELDS
        .iter()
        .any(|needle| lower.contains(needle));
    // Split on anything that is not an ASCII letter or digit so punctuation
    // and underscores act as word boundaries ("TODO:", "(tbd)", "todo_item").
    let has_placeholder_word = lower
        .split(|c: char| !c.is_ascii_alphanumeric())
        .any(|word| PLACEHOLDER_WORDS.contains(&word));

    has_placeholder_field || lower.contains("<...") || has_placeholder_word
}
2071
/// Returns the byte offset of the first process label in `text`, if any.
///
/// Every recognized label (`Phase: Observe`, `Phase: Reflect`, `Phase: Test`,
/// `Phase: Compress`, and the bare `Phase:`) begins with `Phase:`, so the
/// earliest label is always the first occurrence of that prefix.
fn find_process_label_start(text: &str) -> Option<usize> {
    const LABEL_PREFIX: &str = "Phase:";
    text.find(LABEL_PREFIX)
}
2084
2085fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2086 if context.is_empty() {
2087 return "No prior events recorded yet.".to_string();
2088 }
2089
2090 let mut latest_error: Option<String> = None;
2091 let mut latest_proposal: Option<String> = None;
2092 let mut latest_check: Option<String> = None;
2093
2094 for event in context.iter().rev() {
2095 if latest_error.is_none()
2096 && let Some(error) = event
2097 .payload
2098 .get("error")
2099 .and_then(serde_json::Value::as_str)
2100 && !error.trim().is_empty()
2101 {
2102 latest_error = Some(trim_for_storage(error, 140));
2103 }
2104
2105 if latest_proposal.is_none()
2106 && event.event_type == ThoughtEventType::ProposalCreated
2107 && let Some(title) = event
2108 .payload
2109 .get("title")
2110 .and_then(serde_json::Value::as_str)
2111 && !title.trim().is_empty()
2112 && !has_template_placeholder_values(title)
2113 {
2114 latest_proposal = Some(trim_for_storage(title, 120));
2115 }
2116
2117 if latest_check.is_none()
2118 && event.event_type == ThoughtEventType::CheckResult
2119 && let Some(result) = event
2120 .payload
2121 .get("result_excerpt")
2122 .and_then(serde_json::Value::as_str)
2123 && !result.trim().is_empty()
2124 && !has_template_placeholder_values(result)
2125 {
2126 latest_check = Some(trim_for_storage(result, 140));
2127 }
2128
2129 if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2130 break;
2131 }
2132 }
2133
2134 let mut lines = vec![format!(
2135 "{} recent cognition events are available.",
2136 context.len()
2137 )];
2138 if let Some(error) = latest_error {
2139 lines.push(format!("Latest error signal: {}.", error));
2140 }
2141 if let Some(proposal) = latest_proposal {
2142 lines.push(format!("Recent proposal: {}.", proposal));
2143 }
2144 if let Some(check) = latest_check {
2145 lines.push(format!("Recent check: {}.", check));
2146 }
2147 lines.join(" ")
2148}
2149
/// Cuts `input` to at most `max_chars` characters for storage.
///
/// Input that already fits is returned with surrounding whitespace removed.
/// Longer input keeps its first `max_chars` characters, gets `...` appended,
/// and is then stripped of surrounding whitespace, so a cut result can be up
/// to three characters longer than `max_chars`.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    // The character at index `max_chars` exists only when the input is too
    // long; its byte offset is where the kept prefix ends.
    match input.char_indices().nth(max_chars) {
        None => input.trim().to_string(),
        Some((cut, _)) => {
            let mut clipped = String::with_capacity(cut + 3);
            clipped.push_str(&input[..cut]);
            clipped.push_str("...");
            clipped.trim().to_string()
        }
    }
}
2161
/// Estimates how many facts `text` holds by counting the characters `.`, `!`
/// and `?`, with the result kept within `1..=12`.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
2167
2168fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2169 let first_line = thought
2170 .lines()
2171 .find(|line| !line.trim().is_empty())
2172 .unwrap_or("proposal");
2173 let compact = first_line
2174 .replace(['\t', '\r', '\n'], " ")
2175 .split_whitespace()
2176 .collect::<Vec<_>>()
2177 .join(" ");
2178 let trimmed = trim_for_storage(&compact, 72);
2179 if trimmed.is_empty() {
2180 format!("proposal-{}", thought_count)
2181 } else {
2182 trimmed
2183 }
2184}
2185
2186fn default_seed_persona() -> CreatePersonaRequest {
2187 CreatePersonaRequest {
2188 persona_id: Some("root-thinker".to_string()),
2189 name: "root-thinker".to_string(),
2190 role: "orchestrator".to_string(),
2191 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2192 .to_string(),
2193 swarm_id: Some("swarm-core".to_string()),
2194 parent_id: None,
2195 policy: None,
2196 tags: vec!["orchestration".to_string()],
2197 }
2198}
2199
/// Turns a configured base URL into a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are removed first. An empty
/// result yields the local default endpoint, a URL already ending in
/// `/chat/completions` is kept, and anything else gets that path appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    match base_url.trim().trim_end_matches('/') {
        "" => "http://127.0.0.1:11434/v1/chat/completions".to_string(),
        complete if complete.ends_with("/chat/completions") => complete.to_string(),
        base => format!("{}/chat/completions", base),
    }
}
2210
/// Reads a boolean flag from the environment variable `name`.
///
/// `1`, `true`, `yes` and `on` mean `true`; `0`, `false`, `no` and `off` mean
/// `false` (ASCII case-insensitive). An unset, unreadable or unrecognized
/// value yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => match raw.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        },
        Err(_) => default,
    }
}
2221
/// Reads an `f32` from the environment variable `name`, yielding `default`
/// when the variable is unset, unreadable or does not parse.
fn env_f32(name: &str, default: f32) -> f32 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<f32>().unwrap_or(default),
        Err(_) => default,
    }
}
2228
/// Reads a `u64` from the environment variable `name`, yielding `default`
/// when the variable is unset, unreadable or does not parse.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
2235
/// Reads a `u32` from the environment variable `name`.
///
/// The value is trimmed of surrounding whitespace before parsing, so a stray
/// space or trailing line ending does not silently discard the setting.
/// Returns `default` when the variable is unset, is not valid Unicode, or does
/// not parse as a `u32` (including values that overflow it).
fn env_u32(name: &str, default: u32) -> u32 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .unwrap_or(default)
}
2242
/// Reads a `usize` from the environment variable `name`.
///
/// The value is trimmed of surrounding whitespace before parsing, so a stray
/// space or trailing line ending does not silently discard the setting.
/// Returns `default` when the variable is unset, is not valid Unicode, or does
/// not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .unwrap_or(default)
}
2249
2250#[cfg(test)]
2251mod tests {
2252 use super::*;
2253
2254 fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2255 ThoughtWorkItem {
2256 persona_id: "p-1".to_string(),
2257 persona_name: "Spotlessbinco Business Thinker".to_string(),
2258 role: "principal reliability engineer".to_string(),
2259 charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2260 .to_string(),
2261 swarm_id: Some("spotlessbinco".to_string()),
2262 thought_count: 4,
2263 phase,
2264 }
2265 }
2266
2267 fn test_runtime() -> CognitionRuntime {
2268 CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2269 enabled: true,
2270 loop_interval_ms: 25,
2271 max_events: 256,
2272 max_snapshots: 32,
2273 default_policy: PersonaPolicy {
2274 max_spawn_depth: 2,
2275 max_branching_factor: 2,
2276 token_budget_per_minute: 1_000,
2277 compute_ms_per_minute: 1_000,
2278 idle_ttl_secs: 300,
2279 share_memory: false,
2280 allowed_tools: Vec::new(),
2281 },
2282 })
2283 }
2284
2285 #[test]
2286 fn normalize_rejects_placeholder_process_line() {
2287 let work = sample_work_item(ThoughtPhase::Compress);
2288 let output = normalize_thought_output(
2289 &work,
2290 &[],
2291 "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2292 );
2293 assert!(
2294 output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2295 );
2296 assert!(!output.contains("State_Summary: ..."));
2297 }
2298
2299 #[test]
2300 fn normalize_accepts_concrete_process_line() {
2301 let work = sample_work_item(ThoughtPhase::Test);
2302 let output = normalize_thought_output(
2303 &work,
2304 &[],
2305 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2306 );
2307 assert_eq!(
2308 output,
2309 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2310 );
2311 }
2312
2313 #[tokio::test]
2314 async fn create_spawn_and_lineage_work() {
2315 let runtime = test_runtime();
2316
2317 let root = runtime
2318 .create_persona(CreatePersonaRequest {
2319 persona_id: Some("root".to_string()),
2320 name: "root".to_string(),
2321 role: "orchestrator".to_string(),
2322 charter: "coordinate".to_string(),
2323 swarm_id: Some("swarm-a".to_string()),
2324 parent_id: None,
2325 policy: None,
2326 tags: Vec::new(),
2327 })
2328 .await
2329 .expect("root should be created");
2330
2331 assert_eq!(root.identity.depth, 0);
2332
2333 let child = runtime
2334 .spawn_child(
2335 "root",
2336 SpawnPersonaRequest {
2337 persona_id: Some("child-1".to_string()),
2338 name: "child-1".to_string(),
2339 role: "analyst".to_string(),
2340 charter: "analyze".to_string(),
2341 swarm_id: None,
2342 policy: None,
2343 },
2344 )
2345 .await
2346 .expect("child should spawn");
2347
2348 assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2349 assert_eq!(child.identity.depth, 1);
2350
2351 let lineage = runtime.lineage_graph().await;
2352 assert_eq!(lineage.total_edges, 1);
2353 assert_eq!(lineage.roots, vec!["root".to_string()]);
2354 }
2355
2356 #[tokio::test]
2357 async fn branching_and_depth_limits_are_enforced() {
2358 let runtime = test_runtime();
2359
2360 runtime
2361 .create_persona(CreatePersonaRequest {
2362 persona_id: Some("root".to_string()),
2363 name: "root".to_string(),
2364 role: "orchestrator".to_string(),
2365 charter: "coordinate".to_string(),
2366 swarm_id: Some("swarm-a".to_string()),
2367 parent_id: None,
2368 policy: None,
2369 tags: Vec::new(),
2370 })
2371 .await
2372 .expect("root should be created");
2373
2374 runtime
2375 .spawn_child(
2376 "root",
2377 SpawnPersonaRequest {
2378 persona_id: Some("c1".to_string()),
2379 name: "c1".to_string(),
2380 role: "worker".to_string(),
2381 charter: "run".to_string(),
2382 swarm_id: None,
2383 policy: None,
2384 },
2385 )
2386 .await
2387 .expect("first child should spawn");
2388
2389 runtime
2390 .spawn_child(
2391 "root",
2392 SpawnPersonaRequest {
2393 persona_id: Some("c2".to_string()),
2394 name: "c2".to_string(),
2395 role: "worker".to_string(),
2396 charter: "run".to_string(),
2397 swarm_id: None,
2398 policy: None,
2399 },
2400 )
2401 .await
2402 .expect("second child should spawn");
2403
2404 let third_child = runtime
2405 .spawn_child(
2406 "root",
2407 SpawnPersonaRequest {
2408 persona_id: Some("c3".to_string()),
2409 name: "c3".to_string(),
2410 role: "worker".to_string(),
2411 charter: "run".to_string(),
2412 swarm_id: None,
2413 policy: None,
2414 },
2415 )
2416 .await;
2417 assert!(third_child.is_err());
2418
2419 runtime
2420 .spawn_child(
2421 "c1",
2422 SpawnPersonaRequest {
2423 persona_id: Some("c1-1".to_string()),
2424 name: "c1-1".to_string(),
2425 role: "worker".to_string(),
2426 charter: "run".to_string(),
2427 swarm_id: None,
2428 policy: None,
2429 },
2430 )
2431 .await
2432 .expect("depth 2 should be allowed");
2433
2434 let depth_violation = runtime
2435 .spawn_child(
2436 "c1-1",
2437 SpawnPersonaRequest {
2438 persona_id: Some("c1-1-1".to_string()),
2439 name: "c1-1-1".to_string(),
2440 role: "worker".to_string(),
2441 charter: "run".to_string(),
2442 swarm_id: None,
2443 policy: None,
2444 },
2445 )
2446 .await;
2447 assert!(depth_violation.is_err());
2448 }
2449
2450 #[tokio::test]
2451 async fn start_stop_updates_runtime_status() {
2452 let runtime = test_runtime();
2453
2454 runtime
2455 .start(Some(StartCognitionRequest {
2456 loop_interval_ms: Some(10),
2457 seed_persona: Some(CreatePersonaRequest {
2458 persona_id: Some("seed".to_string()),
2459 name: "seed".to_string(),
2460 role: "watcher".to_string(),
2461 charter: "observe".to_string(),
2462 swarm_id: Some("swarm-seed".to_string()),
2463 parent_id: None,
2464 policy: None,
2465 tags: Vec::new(),
2466 }),
2467 }))
2468 .await
2469 .expect("runtime should start");
2470
2471 tokio::time::sleep(Duration::from_millis(60)).await;
2472 let running_status = runtime.status().await;
2473 assert!(running_status.running);
2474 assert!(running_status.events_buffered > 0);
2475
2476 runtime
2477 .stop(Some("test".to_string()))
2478 .await
2479 .expect("runtime should stop");
2480 let stopped_status = runtime.status().await;
2481 assert!(!stopped_status.running);
2482 }
2483
2484 #[tokio::test]
2485 async fn zero_budget_persona_is_skipped() {
2486 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2487 enabled: true,
2488 loop_interval_ms: 10,
2489 max_events: 256,
2490 max_snapshots: 32,
2491 default_policy: PersonaPolicy {
2492 max_spawn_depth: 2,
2493 max_branching_factor: 2,
2494 token_budget_per_minute: 0,
2495 compute_ms_per_minute: 0,
2496 idle_ttl_secs: 3_600,
2497 share_memory: false,
2498 allowed_tools: Vec::new(),
2499 },
2500 });
2501
2502 let persona = runtime
2503 .create_persona(CreatePersonaRequest {
2504 persona_id: Some("budget-test".to_string()),
2505 name: "budget-test".to_string(),
2506 role: "tester".to_string(),
2507 charter: "test budgets".to_string(),
2508 swarm_id: None,
2509 parent_id: None,
2510 policy: None,
2511 tags: Vec::new(),
2512 })
2513 .await
2514 .expect("should create persona");
2515
2516 assert_eq!(persona.tokens_this_window, 0);
2517 assert_eq!(persona.compute_ms_this_window, 0);
2518
2519 runtime.start(None).await.expect("should start");
2521 tokio::time::sleep(Duration::from_millis(50)).await;
2522 runtime.stop(None).await.expect("should stop");
2523
2524 let p = runtime.get_persona("budget-test").await.unwrap();
2526 assert_eq!(p.thought_count, 0);
2527 assert!(p.budget_paused);
2528 }
2529
2530 #[tokio::test]
2531 async fn budget_counters_reset_after_window() {
2532 let now = Utc::now();
2533 let mut persona = PersonaRuntimeState {
2534 identity: PersonaIdentity {
2535 id: "p1".to_string(),
2536 name: "test".to_string(),
2537 role: "tester".to_string(),
2538 charter: "test".to_string(),
2539 swarm_id: None,
2540 parent_id: None,
2541 depth: 0,
2542 created_at: now,
2543 tags: Vec::new(),
2544 },
2545 policy: PersonaPolicy::default(),
2546 status: PersonaStatus::Active,
2547 thought_count: 0,
2548 last_tick_at: None,
2549 updated_at: now,
2550 tokens_this_window: 5000,
2551 compute_ms_this_window: 3000,
2552 window_started_at: now - ChronoDuration::seconds(61),
2553 last_progress_at: now,
2554 budget_paused: false,
2555 };
2556
2557 let window_elapsed = now
2559 .signed_duration_since(persona.window_started_at)
2560 .num_seconds();
2561 assert!(window_elapsed >= 60);
2562
2563 persona.tokens_this_window = 0;
2565 persona.compute_ms_this_window = 0;
2566 persona.window_started_at = now;
2567
2568 assert_eq!(persona.tokens_this_window, 0);
2569 assert_eq!(persona.compute_ms_this_window, 0);
2570 }
2571
2572 #[tokio::test]
2573 async fn idle_persona_is_reaped() {
2574 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2575 enabled: true,
2576 loop_interval_ms: 10,
2577 max_events: 256,
2578 max_snapshots: 32,
2579 default_policy: PersonaPolicy {
2580 max_spawn_depth: 2,
2581 max_branching_factor: 2,
2582 token_budget_per_minute: 20_000,
2583 compute_ms_per_minute: 10_000,
2584 idle_ttl_secs: 0, share_memory: false,
2586 allowed_tools: Vec::new(),
2587 },
2588 });
2589
2590 runtime
2591 .create_persona(CreatePersonaRequest {
2592 persona_id: Some("idle-test".to_string()),
2593 name: "idle-test".to_string(),
2594 role: "idler".to_string(),
2595 charter: "idle away".to_string(),
2596 swarm_id: None,
2597 parent_id: None,
2598 policy: None,
2599 tags: Vec::new(),
2600 })
2601 .await
2602 .expect("should create persona");
2603
2604 {
2606 let mut personas = runtime.personas.write().await;
2607 if let Some(p) = personas.get_mut("idle-test") {
2608 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2609 }
2610 }
2611
2612 runtime.start(None).await.expect("should start");
2613 tokio::time::sleep(Duration::from_millis(100)).await;
2614 runtime.stop(None).await.expect("should stop");
2615
2616 let p = runtime.get_persona("idle-test").await.unwrap();
2617 assert_eq!(p.status, PersonaStatus::Reaped);
2618 }
2619
2620 #[tokio::test]
2621 async fn budget_paused_persona_not_reaped_for_idle() {
2622 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2623 enabled: true,
2624 loop_interval_ms: 10,
2625 max_events: 256,
2626 max_snapshots: 32,
2627 default_policy: PersonaPolicy {
2628 max_spawn_depth: 2,
2629 max_branching_factor: 2,
2630 token_budget_per_minute: 0, compute_ms_per_minute: 0,
2632 idle_ttl_secs: 0, share_memory: false,
2634 allowed_tools: Vec::new(),
2635 },
2636 });
2637
2638 runtime
2639 .create_persona(CreatePersonaRequest {
2640 persona_id: Some("paused-test".to_string()),
2641 name: "paused-test".to_string(),
2642 role: "pauser".to_string(),
2643 charter: "pause".to_string(),
2644 swarm_id: None,
2645 parent_id: None,
2646 policy: None,
2647 tags: Vec::new(),
2648 })
2649 .await
2650 .expect("should create persona");
2651
2652 {
2654 let mut personas = runtime.personas.write().await;
2655 if let Some(p) = personas.get_mut("paused-test") {
2656 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2657 }
2658 }
2659
2660 runtime.start(None).await.expect("should start");
2661 tokio::time::sleep(Duration::from_millis(100)).await;
2662 runtime.stop(None).await.expect("should stop");
2663
2664 let p = runtime.get_persona("paused-test").await.unwrap();
2665 assert_eq!(p.status, PersonaStatus::Active);
2667 assert!(p.budget_paused);
2668 }
2669
2670 #[tokio::test]
2671 async fn governance_proposal_resolution() {
2672 let runtime = test_runtime();
2673 let gov = SwarmGovernance {
2674 quorum_fraction: 0.5,
2675 required_approvers_by_role: HashMap::new(),
2676 veto_roles: vec!["auditor".to_string()],
2677 vote_timeout_secs: 300,
2678 };
2679 *runtime.governance.write().await = gov;
2680
2681 runtime
2683 .create_persona(CreatePersonaRequest {
2684 persona_id: Some("voter-1".to_string()),
2685 name: "voter-1".to_string(),
2686 role: "engineer".to_string(),
2687 charter: "vote".to_string(),
2688 swarm_id: None,
2689 parent_id: None,
2690 policy: None,
2691 tags: Vec::new(),
2692 })
2693 .await
2694 .unwrap();
2695 runtime
2696 .create_persona(CreatePersonaRequest {
2697 persona_id: Some("voter-2".to_string()),
2698 name: "voter-2".to_string(),
2699 role: "engineer".to_string(),
2700 charter: "vote".to_string(),
2701 swarm_id: None,
2702 parent_id: None,
2703 policy: None,
2704 tags: Vec::new(),
2705 })
2706 .await
2707 .unwrap();
2708
2709 {
2711 let mut proposals = runtime.proposals.write().await;
2712 let mut votes = HashMap::new();
2713 votes.insert("voter-1".to_string(), ProposalVote::Approve);
2714 proposals.insert(
2715 "prop-1".to_string(),
2716 Proposal {
2717 id: "prop-1".to_string(),
2718 persona_id: "voter-1".to_string(),
2719 title: "test proposal".to_string(),
2720 rationale: "testing governance".to_string(),
2721 evidence_refs: Vec::new(),
2722 risk: ProposalRisk::Low,
2723 status: ProposalStatus::Created,
2724 created_at: Utc::now(),
2725 votes,
2726 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2727 votes_requested: true,
2728 },
2729 );
2730 }
2731
2732 runtime.start(None).await.unwrap();
2734 tokio::time::sleep(Duration::from_millis(100)).await;
2735 runtime.stop(None).await.unwrap();
2736
2737 let proposals = runtime.get_proposals().await;
2738 let prop = proposals.get("prop-1").unwrap();
2739 assert!(
2741 prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2742 "Expected Verified or Executed, got {:?}",
2743 prop.status
2744 );
2745 }
2746
2747 #[tokio::test]
2748 async fn veto_rejects_proposal() {
2749 let runtime = test_runtime();
2750 let gov = SwarmGovernance {
2751 quorum_fraction: 0.5,
2752 required_approvers_by_role: HashMap::new(),
2753 veto_roles: vec!["auditor".to_string()],
2754 vote_timeout_secs: 300,
2755 };
2756 *runtime.governance.write().await = gov;
2757
2758 runtime
2759 .create_persona(CreatePersonaRequest {
2760 persona_id: Some("eng".to_string()),
2761 name: "eng".to_string(),
2762 role: "engineer".to_string(),
2763 charter: "build".to_string(),
2764 swarm_id: None,
2765 parent_id: None,
2766 policy: None,
2767 tags: Vec::new(),
2768 })
2769 .await
2770 .unwrap();
2771 runtime
2772 .create_persona(CreatePersonaRequest {
2773 persona_id: Some("aud".to_string()),
2774 name: "aud".to_string(),
2775 role: "auditor".to_string(),
2776 charter: "audit".to_string(),
2777 swarm_id: None,
2778 parent_id: None,
2779 policy: None,
2780 tags: Vec::new(),
2781 })
2782 .await
2783 .unwrap();
2784
2785 {
2786 let mut proposals = runtime.proposals.write().await;
2787 let mut votes = HashMap::new();
2788 votes.insert("eng".to_string(), ProposalVote::Approve);
2789 votes.insert("aud".to_string(), ProposalVote::Veto);
2790 proposals.insert(
2791 "prop-veto".to_string(),
2792 Proposal {
2793 id: "prop-veto".to_string(),
2794 persona_id: "eng".to_string(),
2795 title: "vetoed proposal".to_string(),
2796 rationale: "testing veto".to_string(),
2797 evidence_refs: Vec::new(),
2798 risk: ProposalRisk::Low,
2799 status: ProposalStatus::Created,
2800 created_at: Utc::now(),
2801 votes,
2802 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2803 votes_requested: true,
2804 },
2805 );
2806 }
2807
2808 runtime.start(None).await.unwrap();
2809 tokio::time::sleep(Duration::from_millis(100)).await;
2810 runtime.stop(None).await.unwrap();
2811
2812 let proposals = runtime.get_proposals().await;
2813 let prop = proposals.get("prop-veto").unwrap();
2814 assert_eq!(prop.status, ProposalStatus::Rejected);
2815 }
2816
2817 #[test]
2818 fn global_workspace_default() {
2819 let ws = GlobalWorkspace::default();
2820 assert!(ws.top_beliefs.is_empty());
2821 assert!(ws.top_uncertainties.is_empty());
2822 assert!(ws.top_attention.is_empty());
2823 }
2824
2825 #[test]
2826 fn attention_item_creation() {
2827 let item = AttentionItem {
2828 id: "a1".to_string(),
2829 topic: "test topic".to_string(),
2830 topic_tags: vec!["reliability".to_string()],
2831 priority: 0.8,
2832 source_type: AttentionSource::ContestedBelief,
2833 source_id: "b1".to_string(),
2834 assigned_persona: None,
2835 created_at: Utc::now(),
2836 resolved_at: None,
2837 };
2838 assert!(item.resolved_at.is_none());
2839 assert_eq!(item.priority, 0.8);
2840 }
2841}