1pub mod beliefs;
9pub mod executor;
10pub mod persistence;
11mod thinker;
12
13#[cfg(feature = "functiongemma")]
14pub mod tool_router;
15
16#[cfg(not(feature = "functiongemma"))]
17#[allow(dead_code)]
18pub mod tool_router {
19 use super::thinker::CandleDevicePreference;
20 use crate::provider::{CompletionResponse, ToolDefinition};
21 use anyhow::Result;
22
23 #[derive(Debug, Clone)]
25 pub struct ToolRouterConfig {
26 pub enabled: bool,
27 pub model_path: Option<String>,
28 pub tokenizer_path: Option<String>,
29 pub arch: String,
30 pub device: CandleDevicePreference,
31 pub max_tokens: usize,
32 pub temperature: f32,
33 }
34
35 impl Default for ToolRouterConfig {
36 fn default() -> Self {
37 Self {
38 enabled: false,
39 model_path: None,
40 tokenizer_path: None,
41 arch: "gemma3".to_string(),
42 device: CandleDevicePreference::Auto,
43 max_tokens: 128,
44 temperature: 0.1,
45 }
46 }
47 }
48
49 impl ToolRouterConfig {
50 pub fn from_env() -> Self {
51 Self::default()
52 }
53 }
54
55 #[derive(Debug, Clone, Default)]
57 pub struct ToolCallRouter;
58
59 impl ToolCallRouter {
60 pub fn from_config(config: &ToolRouterConfig) -> Result<Option<Self>> {
61 if config.enabled {
62 tracing::debug!(
63 "FunctionGemma requested but feature is disabled; rebuild with --features functiongemma"
64 );
65 }
66 Ok(None)
67 }
68
69 pub async fn maybe_reformat(
70 &self,
71 response: CompletionResponse,
72 _tools: &[ToolDefinition],
73 _model_supports_tools: bool,
74 ) -> CompletionResponse {
75 response
76 }
77 }
78}
79
80pub use thinker::{
81 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
82};
83
84use crate::tool::ToolRegistry;
85use anyhow::{Result, anyhow};
86use beliefs::{Belief, BeliefStatus};
87use chrono::{DateTime, Duration as ChronoDuration, Utc};
88use serde::{Deserialize, Serialize};
89use serde_json::json;
90use std::collections::{HashMap, VecDeque};
91use std::sync::Arc;
92use std::sync::atomic::{AtomicBool, Ordering};
93use std::time::Duration;
94use tokio::sync::{Mutex, RwLock, broadcast};
95use tokio::task::JoinHandle;
96
97const _: () = {
99 fn _assert_types_used() {
100 let _ = std::mem::size_of::<CandleDevicePreference>();
101 let _ = std::mem::size_of::<ThinkerBackend>();
102 let _ = std::mem::size_of::<ThinkerOutput>();
103 }
104};
105use tokio::time::Instant;
106use uuid::Uuid;
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub enum PersonaStatus {
112 Active,
113 Idle,
114 Reaped,
115 Error,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct PersonaPolicy {
121 pub max_spawn_depth: u32,
122 pub max_branching_factor: u32,
123 pub token_budget_per_minute: u32,
124 pub compute_ms_per_minute: u32,
125 pub idle_ttl_secs: u64,
126 pub share_memory: bool,
127 #[serde(default)]
128 pub allowed_tools: Vec<String>,
129}
130
131impl Default for PersonaPolicy {
132 fn default() -> Self {
133 Self {
134 max_spawn_depth: 4,
135 max_branching_factor: 4,
136 token_budget_per_minute: 20_000,
137 compute_ms_per_minute: 10_000,
138 idle_ttl_secs: 3_600,
139 share_memory: false,
140 allowed_tools: Vec::new(),
141 }
142 }
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct PersonaIdentity {
148 pub id: String,
149 pub name: String,
150 pub role: String,
151 pub charter: String,
152 pub swarm_id: Option<String>,
153 pub parent_id: Option<String>,
154 pub depth: u32,
155 pub created_at: DateTime<Utc>,
156 #[serde(default)]
157 pub tags: Vec<String>,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct PersonaRuntimeState {
163 pub identity: PersonaIdentity,
164 pub policy: PersonaPolicy,
165 pub status: PersonaStatus,
166 pub thought_count: u64,
167 pub last_tick_at: Option<DateTime<Utc>>,
168 pub updated_at: DateTime<Utc>,
169 pub tokens_this_window: u32,
171 pub compute_ms_this_window: u32,
173 pub window_started_at: DateTime<Utc>,
175 pub last_progress_at: DateTime<Utc>,
177 #[serde(default)]
179 pub budget_paused: bool,
180}
181
182#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
184#[serde(rename_all = "snake_case")]
185pub enum ProposalRisk {
186 Low,
187 Medium,
188 High,
189 Critical,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
194#[serde(rename_all = "snake_case")]
195pub enum ProposalStatus {
196 Created,
197 Verified,
198 Rejected,
199 Executed,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
204#[serde(rename_all = "snake_case")]
205pub enum ProposalVote {
206 Approve,
207 Reject,
208 Veto,
209 Abstain,
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct Proposal {
215 pub id: String,
216 pub persona_id: String,
217 pub title: String,
218 pub rationale: String,
219 pub evidence_refs: Vec<String>,
220 pub risk: ProposalRisk,
221 pub status: ProposalStatus,
222 pub created_at: DateTime<Utc>,
223 #[serde(default)]
225 pub votes: HashMap<String, ProposalVote>,
226 pub vote_deadline: Option<DateTime<Utc>>,
228 #[serde(default)]
230 pub votes_requested: bool,
231 #[serde(default)]
233 pub quorum_needed: usize,
234}
235
236#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
238#[serde(rename_all = "snake_case")]
239pub enum ThoughtEventType {
240 ThoughtGenerated,
241 HypothesisRaised,
242 CheckRequested,
243 CheckResult,
244 ProposalCreated,
245 ProposalVerified,
246 ProposalRejected,
247 ActionExecuted,
248 PersonaSpawned,
249 PersonaReaped,
250 SnapshotCompressed,
251 BeliefExtracted,
252 BeliefContested,
253 BeliefRevalidated,
254 BudgetPaused,
255 IdleReaped,
256 AttentionCreated,
257 VoteCast,
258 WorkspaceUpdated,
259}
260
261impl ThoughtEventType {
262 fn as_str(&self) -> &'static str {
263 match self {
264 Self::ThoughtGenerated => "thought_generated",
265 Self::HypothesisRaised => "hypothesis_raised",
266 Self::CheckRequested => "check_requested",
267 Self::CheckResult => "check_result",
268 Self::ProposalCreated => "proposal_created",
269 Self::ProposalVerified => "proposal_verified",
270 Self::ProposalRejected => "proposal_rejected",
271 Self::ActionExecuted => "action_executed",
272 Self::PersonaSpawned => "persona_spawned",
273 Self::PersonaReaped => "persona_reaped",
274 Self::SnapshotCompressed => "snapshot_compressed",
275 Self::BeliefExtracted => "belief_extracted",
276 Self::BeliefContested => "belief_contested",
277 Self::BeliefRevalidated => "belief_revalidated",
278 Self::BudgetPaused => "budget_paused",
279 Self::IdleReaped => "idle_reaped",
280 Self::AttentionCreated => "attention_created",
281 Self::VoteCast => "vote_cast",
282 Self::WorkspaceUpdated => "workspace_updated",
283 }
284 }
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct ThoughtEvent {
290 pub id: String,
291 pub event_type: ThoughtEventType,
292 pub persona_id: Option<String>,
293 pub swarm_id: Option<String>,
294 pub timestamp: DateTime<Utc>,
295 pub payload: serde_json::Value,
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct MemorySnapshot {
301 pub id: String,
302 pub generated_at: DateTime<Utc>,
303 pub swarm_id: Option<String>,
304 pub persona_scope: Vec<String>,
305 pub summary: String,
306 pub hot_event_count: usize,
307 pub warm_fact_count: usize,
308 pub cold_snapshot_count: usize,
309 pub metadata: HashMap<String, serde_json::Value>,
310}
311
312#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct CreatePersonaRequest {
315 pub persona_id: Option<String>,
316 pub name: String,
317 pub role: String,
318 pub charter: String,
319 pub swarm_id: Option<String>,
320 pub parent_id: Option<String>,
321 pub policy: Option<PersonaPolicy>,
322 #[serde(default)]
323 pub tags: Vec<String>,
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct SpawnPersonaRequest {
329 pub persona_id: Option<String>,
330 pub name: String,
331 pub role: String,
332 pub charter: String,
333 pub swarm_id: Option<String>,
334 pub policy: Option<PersonaPolicy>,
335}
336
337#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct ReapPersonaRequest {
340 pub cascade: Option<bool>,
341 pub reason: Option<String>,
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct StartCognitionRequest {
347 pub loop_interval_ms: Option<u64>,
348 pub seed_persona: Option<CreatePersonaRequest>,
349}
350
351#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct StopCognitionRequest {
354 pub reason: Option<String>,
355}
356
357#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
361#[serde(rename_all = "snake_case")]
362pub enum AttentionSource {
363 ContestedBelief,
364 FailedCheck,
365 StaleBelief,
366 ProposalTimeout,
367 FailedExecution,
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize)]
372pub struct AttentionItem {
373 pub id: String,
374 pub topic: String,
375 pub topic_tags: Vec<String>,
376 pub priority: f32,
377 pub source_type: AttentionSource,
378 pub source_id: String,
379 pub assigned_persona: Option<String>,
380 pub created_at: DateTime<Utc>,
381 pub resolved_at: Option<DateTime<Utc>>,
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct SwarmGovernance {
387 pub quorum_fraction: f32,
388 pub required_approvers_by_role: HashMap<ProposalRisk, Vec<String>>,
389 pub veto_roles: Vec<String>,
390 pub vote_timeout_secs: u64,
391}
392
393impl Default for SwarmGovernance {
394 fn default() -> Self {
395 Self {
396 quorum_fraction: 0.5,
397 required_approvers_by_role: HashMap::new(),
398 veto_roles: Vec::new(),
399 vote_timeout_secs: 300,
400 }
401 }
402}
403
404#[derive(Debug, Clone, Serialize, Deserialize)]
406pub struct GlobalWorkspace {
407 pub top_beliefs: Vec<String>,
408 pub top_uncertainties: Vec<String>,
409 pub top_attention: Vec<String>,
410 pub active_objectives: Vec<String>,
411 pub updated_at: DateTime<Utc>,
412}
413
414impl Default for GlobalWorkspace {
415 fn default() -> Self {
416 Self {
417 top_beliefs: Vec::new(),
418 top_uncertainties: Vec::new(),
419 top_attention: Vec::new(),
420 active_objectives: Vec::new(),
421 updated_at: Utc::now(),
422 }
423 }
424}
425
426#[derive(Debug, Clone, Serialize, Deserialize)]
428pub struct CognitionStatus {
429 pub enabled: bool,
430 pub running: bool,
431 pub loop_interval_ms: u64,
432 pub started_at: Option<DateTime<Utc>>,
433 pub last_tick_at: Option<DateTime<Utc>>,
434 pub persona_count: usize,
435 pub active_persona_count: usize,
436 pub events_buffered: usize,
437 pub snapshots_buffered: usize,
438}
439
440#[derive(Debug, Clone, Serialize, Deserialize)]
442pub struct ReapPersonaResponse {
443 pub reaped_ids: Vec<String>,
444 pub count: usize,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
449pub struct LineageNode {
450 pub persona_id: String,
451 pub parent_id: Option<String>,
452 pub children: Vec<String>,
453 pub depth: u32,
454 pub status: PersonaStatus,
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct LineageGraph {
460 pub nodes: Vec<LineageNode>,
461 pub roots: Vec<String>,
462 pub total_edges: usize,
463}
464
465#[derive(Debug, Clone, Copy, PartialEq, Eq)]
466enum ThoughtPhase {
467 Observe,
468 Reflect,
469 Test,
470 Compress,
471}
472
473impl ThoughtPhase {
474 fn from_thought_count(thought_count: u64) -> Self {
475 match thought_count % 4 {
476 1 => Self::Observe,
477 2 => Self::Reflect,
478 3 => Self::Test,
479 _ => Self::Compress,
480 }
481 }
482
483 fn as_str(&self) -> &'static str {
484 match self {
485 Self::Observe => "observe",
486 Self::Reflect => "reflect",
487 Self::Test => "test",
488 Self::Compress => "compress",
489 }
490 }
491
492 fn event_type(&self) -> ThoughtEventType {
493 match self {
494 Self::Observe => ThoughtEventType::ThoughtGenerated,
495 Self::Reflect => ThoughtEventType::HypothesisRaised,
496 Self::Test => ThoughtEventType::CheckRequested,
497 Self::Compress => ThoughtEventType::SnapshotCompressed,
498 }
499 }
500}
501
502#[derive(Debug, Clone)]
503struct ThoughtWorkItem {
504 persona_id: String,
505 persona_name: String,
506 role: String,
507 charter: String,
508 swarm_id: Option<String>,
509 thought_count: u64,
510 phase: ThoughtPhase,
511}
512
513#[derive(Debug, Clone)]
514struct ThoughtResult {
515 source: &'static str,
516 model: Option<String>,
517 finish_reason: Option<String>,
518 thinking: String,
519 prompt_tokens: Option<u32>,
520 completion_tokens: Option<u32>,
521 total_tokens: Option<u32>,
522 latency_ms: u128,
523 error: Option<String>,
524}
525
526#[derive(Debug, Clone)]
528pub struct CognitionRuntimeOptions {
529 pub enabled: bool,
530 pub loop_interval_ms: u64,
531 pub max_events: usize,
532 pub max_snapshots: usize,
533 pub default_policy: PersonaPolicy,
534}
535
536impl Default for CognitionRuntimeOptions {
537 fn default() -> Self {
538 Self {
539 enabled: false,
540 loop_interval_ms: 2_000,
541 max_events: 2_000,
542 max_snapshots: 128,
543 default_policy: PersonaPolicy::default(),
544 }
545 }
546}
547
548#[derive(Debug)]
550pub struct CognitionRuntime {
551 enabled: bool,
552 max_events: usize,
553 max_snapshots: usize,
554 default_policy: PersonaPolicy,
555 running: Arc<AtomicBool>,
556 loop_interval_ms: Arc<RwLock<u64>>,
557 started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
558 last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
559 personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
560 proposals: Arc<RwLock<HashMap<String, Proposal>>>,
561 events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
562 snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
563 loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
564 event_tx: broadcast::Sender<ThoughtEvent>,
565 thinker: Option<Arc<ThinkerClient>>,
566 beliefs: Arc<RwLock<HashMap<String, Belief>>>,
567 attention_queue: Arc<RwLock<Vec<AttentionItem>>>,
568 governance: Arc<RwLock<SwarmGovernance>>,
569 workspace: Arc<RwLock<GlobalWorkspace>>,
570 tools: Option<Arc<ToolRegistry>>,
571 receipts: Arc<RwLock<Vec<executor::DecisionReceipt>>>,
572 pending_approvals: Arc<RwLock<HashMap<String, bool>>>,
574}
575
576impl CognitionRuntime {
577 pub fn new_from_env() -> Self {
579 let options = CognitionRuntimeOptions {
580 enabled: env_bool("CODETETHER_COGNITION_ENABLED", true),
581 loop_interval_ms: env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000),
582 max_events: env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000),
583 max_snapshots: env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128),
584 default_policy: PersonaPolicy {
585 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
586 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
587 token_budget_per_minute: env_u32(
588 "CODETETHER_COGNITION_TOKEN_BUDGET_PER_MINUTE",
589 20_000,
590 ),
591 compute_ms_per_minute: env_u32(
592 "CODETETHER_COGNITION_COMPUTE_MS_PER_MINUTE",
593 10_000,
594 ),
595 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
596 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
597 allowed_tools: Vec::new(),
598 },
599 };
600
601 let thinker_backend = thinker::ThinkerBackend::from_env(
602 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
603 .unwrap_or_else(|_| "openai_compat".to_string()),
604 );
605 let thinker_timeout_default = match thinker_backend {
606 thinker::ThinkerBackend::OpenAICompat => 30_000,
607 thinker::ThinkerBackend::Candle => 12_000,
608 thinker::ThinkerBackend::Bedrock => 60_000,
609 };
610 let thinker_config = ThinkerConfig {
611 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
612 backend: thinker_backend,
613 endpoint: normalize_thinker_endpoint(
614 &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
615 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
616 ),
617 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
618 .unwrap_or_else(|_| "qwen3.5-9b".to_string()),
619 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
620 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
621 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
622 .ok()
623 .and_then(|v| v.parse::<f32>().ok()),
624 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
625 timeout_ms: env_u64(
626 "CODETETHER_COGNITION_THINKER_TIMEOUT_MS",
627 thinker_timeout_default,
628 ),
629 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
630 candle_tokenizer_path: std::env::var(
631 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
632 )
633 .ok(),
634 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
635 candle_device: thinker::CandleDevicePreference::from_env(
636 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
637 .unwrap_or_else(|_| "auto".to_string()),
638 ),
639 candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
640 candle_repeat_penalty: env_f32(
641 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
642 1.1,
643 ),
644 candle_repeat_last_n: env_usize(
645 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
646 64,
647 ),
648 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
649 bedrock_region: std::env::var("CODETETHER_COGNITION_THINKER_BEDROCK_REGION")
650 .unwrap_or_else(|_| {
651 std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-west-2".to_string())
652 }),
653 bedrock_service_tier: std::env::var(
654 "CODETETHER_COGNITION_THINKER_BEDROCK_SERVICE_TIER",
655 )
656 .ok()
657 .map(|v| v.trim().to_ascii_lowercase())
658 .filter(|v| !v.is_empty()),
659 };
660
661 let mut runtime = Self::new_with_options(options);
662 runtime.thinker = Some(thinker_config).and_then(|cfg| {
664 if !cfg.enabled {
665 return None;
666 }
667 match ThinkerClient::new(cfg) {
668 Ok(client) => {
669 tracing::info!(
670 backend = ?client.config().backend,
671 endpoint = %client.config().endpoint,
672 model = %client.config().model,
673 "Cognition thinker initialized"
674 );
675 Some(Arc::new(client))
676 }
677 Err(error) => {
678 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
679 None
680 }
681 }
682 });
683 runtime
684 }
685
686 pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
688 Self::new_with_options_and_thinker(options, None)
689 }
690
691 fn new_with_options_and_thinker(
692 options: CognitionRuntimeOptions,
693 thinker_config: Option<ThinkerConfig>,
694 ) -> Self {
695 let (event_tx, _) = broadcast::channel(256);
696 let thinker = thinker_config.and_then(|cfg| {
697 if !cfg.enabled {
698 return None;
699 }
700 match ThinkerClient::new(cfg) {
701 Ok(client) => {
702 tracing::info!(
703 backend = ?client.config().backend,
704 endpoint = %client.config().endpoint,
705 model = %client.config().model,
706 "Cognition thinker initialized"
707 );
708 Some(Arc::new(client))
709 }
710 Err(error) => {
711 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
712 None
713 }
714 }
715 });
716
717 #[cfg(test)]
720 let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) = (
721 HashMap::new(),
722 HashMap::new(),
723 HashMap::new(),
724 Vec::new(),
725 GlobalWorkspace::default(),
726 );
727
728 #[cfg(not(test))]
729 let (init_personas, init_beliefs, init_proposals, init_attention, init_workspace) =
730 if let Some(persisted) = persistence::load_state() {
731 tracing::info!(
732 personas = persisted.personas.len(),
733 beliefs = persisted.beliefs.len(),
734 persisted_at = %persisted.persisted_at,
735 "Restoring persisted cognition state"
736 );
737 (
738 persisted.personas,
739 persisted.beliefs,
740 persisted.proposals,
741 persisted.attention_queue,
742 persisted.workspace,
743 )
744 } else {
745 (
746 HashMap::new(),
747 HashMap::new(),
748 HashMap::new(),
749 Vec::new(),
750 GlobalWorkspace::default(),
751 )
752 };
753
754 Self {
755 enabled: options.enabled,
756 max_events: options.max_events.max(32),
757 max_snapshots: options.max_snapshots.max(8),
758 default_policy: options.default_policy,
759 running: Arc::new(AtomicBool::new(false)),
760 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
761 started_at: Arc::new(RwLock::new(None)),
762 last_tick_at: Arc::new(RwLock::new(None)),
763 personas: Arc::new(RwLock::new(init_personas)),
764 proposals: Arc::new(RwLock::new(init_proposals)),
765 events: Arc::new(RwLock::new(VecDeque::new())),
766 snapshots: Arc::new(RwLock::new(VecDeque::new())),
767 loop_handle: Arc::new(Mutex::new(None)),
768 event_tx,
769 thinker,
770 beliefs: Arc::new(RwLock::new(init_beliefs)),
771 attention_queue: Arc::new(RwLock::new(init_attention)),
772 governance: Arc::new(RwLock::new(SwarmGovernance::default())),
773 workspace: Arc::new(RwLock::new(init_workspace)),
774 tools: None,
775 receipts: Arc::new(RwLock::new(Vec::new())),
776 pending_approvals: Arc::new(RwLock::new(HashMap::new())),
777 }
778 }
779
780 pub fn is_enabled(&self) -> bool {
782 self.enabled
783 }
784
785 pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
787 self.event_tx.subscribe()
788 }
789
790 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
792 if !self.enabled {
793 return Err(anyhow!(
794 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
795 ));
796 }
797
798 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
799 if let Some(request) = req {
800 if let Some(interval) = request.loop_interval_ms {
801 let mut lock = self.loop_interval_ms.write().await;
802 *lock = interval.max(100);
803 }
804 requested_seed_persona = request.seed_persona;
805 }
806
807 let has_personas = !self.personas.read().await.is_empty();
808 if !has_personas {
809 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
810 let _ = self.create_persona(seed).await?;
811 }
812
813 if self.running.load(Ordering::SeqCst) {
814 return Ok(self.status().await);
815 }
816
817 self.running.store(true, Ordering::SeqCst);
818 {
819 let mut started = self.started_at.write().await;
820 *started = Some(Utc::now());
821 }
822
823 let running = Arc::clone(&self.running);
824 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
825 let last_tick_at = Arc::clone(&self.last_tick_at);
826 let personas = Arc::clone(&self.personas);
827 let proposals = Arc::clone(&self.proposals);
828 let events = Arc::clone(&self.events);
829 let snapshots = Arc::clone(&self.snapshots);
830 let max_events = self.max_events;
831 let max_snapshots = self.max_snapshots;
832 let event_tx = self.event_tx.clone();
833 let thinker = self.thinker.clone();
834 let beliefs = Arc::clone(&self.beliefs);
835 let attention_queue = Arc::clone(&self.attention_queue);
836 let governance = Arc::clone(&self.governance);
837 let workspace = Arc::clone(&self.workspace);
838 let tools = self.tools.clone();
839 let receipts = Arc::clone(&self.receipts);
840 let pending_approvals = Arc::clone(&self.pending_approvals);
841
842 let handle = tokio::spawn(async move {
843 let mut next_tick = Instant::now();
844 while running.load(Ordering::SeqCst) {
845 let now_instant = Instant::now();
846 if now_instant < next_tick {
847 tokio::time::sleep_until(next_tick).await;
848 }
849 if !running.load(Ordering::SeqCst) {
850 break;
851 }
852
853 let now = Utc::now();
854 {
855 let mut lock = last_tick_at.write().await;
856 *lock = Some(now);
857 }
858
859 let mut new_events = Vec::new();
860 let mut new_snapshots = Vec::new();
861 let mut new_proposals = Vec::new();
862
863 let work_items: Vec<ThoughtWorkItem> = {
865 let mut map = personas.write().await;
866 let mut items = Vec::new();
867 for persona in map.values_mut() {
868 if persona.status != PersonaStatus::Active {
869 continue;
870 }
871
872 let window_elapsed = now
874 .signed_duration_since(persona.window_started_at)
875 .num_seconds();
876 if window_elapsed >= 60 {
877 persona.tokens_this_window = 0;
878 persona.compute_ms_this_window = 0;
879 persona.window_started_at = now;
880 }
881
882 let token_ok =
884 persona.tokens_this_window < persona.policy.token_budget_per_minute;
885 let compute_ok =
886 persona.compute_ms_this_window < persona.policy.compute_ms_per_minute;
887 if !token_ok || !compute_ok {
888 if !persona.budget_paused {
889 persona.budget_paused = true;
890 new_events.push(ThoughtEvent {
891 id: Uuid::new_v4().to_string(),
892 event_type: ThoughtEventType::BudgetPaused,
893 persona_id: Some(persona.identity.id.clone()),
894 swarm_id: persona.identity.swarm_id.clone(),
895 timestamp: now,
896 payload: json!({
897 "budget_paused": true,
898 "tokens_used": persona.tokens_this_window,
899 "compute_ms_used": persona.compute_ms_this_window,
900 "token_budget": persona.policy.token_budget_per_minute,
901 "compute_budget": persona.policy.compute_ms_per_minute,
902 }),
903 });
904 }
905 continue; }
907
908 persona.budget_paused = false;
909 persona.thought_count = persona.thought_count.saturating_add(1);
910 persona.last_tick_at = Some(now);
911 persona.updated_at = now;
912 items.push(ThoughtWorkItem {
913 persona_id: persona.identity.id.clone(),
914 persona_name: persona.identity.name.clone(),
915 role: persona.identity.role.clone(),
916 charter: persona.identity.charter.clone(),
917 swarm_id: persona.identity.swarm_id.clone(),
918 thought_count: persona.thought_count,
919 phase: ThoughtPhase::from_thought_count(persona.thought_count),
920 });
921 }
922 items
923 };
924
925 for work in &work_items {
926 let context = recent_persona_context(&events, &work.persona_id, 8).await;
927
928 let thought = generate_phase_thought(thinker.as_deref(), work, &context).await;
929
930 let event_timestamp = Utc::now();
931 let is_fallback = thought.source == "fallback";
932 let tokens_used = thought.total_tokens.unwrap_or(0);
933 let compute_used = thought.latency_ms as u32;
934
935 new_events.push(ThoughtEvent {
936 id: Uuid::new_v4().to_string(),
937 event_type: work.phase.event_type(),
938 persona_id: Some(work.persona_id.clone()),
939 swarm_id: work.swarm_id.clone(),
940 timestamp: event_timestamp,
941 payload: json!({
942 "phase": work.phase.as_str(),
943 "thought_count": work.thought_count,
944 "persona": {
945 "id": work.persona_id.clone(),
946 "name": work.persona_name.clone(),
947 "role": work.role.clone(),
948 },
949 "context_event_count": context.len(),
950 "thinking": thought.thinking.clone(),
951 "source": thought.source,
952 "model": thought.model.clone(),
953 "finish_reason": thought.finish_reason.clone(),
954 "usage": {
955 "prompt_tokens": thought.prompt_tokens,
956 "completion_tokens": thought.completion_tokens,
957 "total_tokens": thought.total_tokens,
958 },
959 "latency_ms": thought.latency_ms,
960 "error": thought.error.clone(),
961 }),
962 });
963
964 {
966 let mut map = personas.write().await;
967 if let Some(persona) = map.get_mut(&work.persona_id) {
968 persona.tokens_this_window =
969 persona.tokens_this_window.saturating_add(tokens_used);
970 persona.compute_ms_this_window =
971 persona.compute_ms_this_window.saturating_add(compute_used);
972 if !is_fallback {
973 persona.last_progress_at = Utc::now();
974 }
975 }
976 }
977
978 if work.phase == ThoughtPhase::Reflect && !is_fallback {
980 let extracted = beliefs::extract_beliefs_from_thought(
981 thinker.as_deref(),
982 &work.persona_id,
983 &thought.thinking,
984 )
985 .await;
986
987 if !extracted.is_empty() {
988 let mut belief_store = beliefs.write().await;
989 let mut attn_queue = attention_queue.write().await;
990 for mut new_belief in extracted {
991 let existing_id = belief_store
993 .values()
994 .find(|b| {
995 b.belief_key == new_belief.belief_key
996 && b.status != BeliefStatus::Invalidated
997 })
998 .map(|b| b.id.clone());
999
1000 if let Some(eid) = existing_id {
1001 if let Some(existing) = belief_store.get_mut(&eid) {
1003 if !existing.confirmed_by.contains(&work.persona_id) {
1004 existing.confirmed_by.push(work.persona_id.clone());
1005 }
1006 existing.revalidation_success();
1007 }
1008 } else {
1009 let contest_targets: Vec<String> =
1011 new_belief.contradicts.clone();
1012 for target_key in &contest_targets {
1013 if let Some(target) = belief_store.values_mut().find(|b| {
1014 &b.belief_key == target_key
1015 && b.status != BeliefStatus::Invalidated
1016 }) {
1017 target.contested_by.push(new_belief.id.clone());
1018 if !new_belief.contradicts.contains(&target.belief_key)
1019 {
1020 new_belief
1021 .contradicts
1022 .push(target.belief_key.clone());
1023 }
1024 target.revalidation_failure();
1026 if target.confidence >= 0.5 {
1027 attn_queue.push(AttentionItem {
1029 id: Uuid::new_v4().to_string(),
1030 topic: format!(
1031 "Revalidate belief: {}",
1032 target.claim
1033 ),
1034 topic_tags: vec![target.belief_key.clone()],
1035 priority: 0.7,
1036 source_type: AttentionSource::ContestedBelief,
1037 source_id: target.id.clone(),
1038 assigned_persona: None,
1039 created_at: Utc::now(),
1040 resolved_at: None,
1041 });
1042 }
1043 }
1044 }
1045
1046 new_events.push(ThoughtEvent {
1047 id: Uuid::new_v4().to_string(),
1048 event_type: ThoughtEventType::BeliefExtracted,
1049 persona_id: Some(work.persona_id.clone()),
1050 swarm_id: work.swarm_id.clone(),
1051 timestamp: Utc::now(),
1052 payload: json!({
1053 "belief_id": new_belief.id,
1054 "belief_key": new_belief.belief_key,
1055 "claim": trim_for_storage(&new_belief.claim, 280),
1056 "confidence": new_belief.confidence,
1057 }),
1058 });
1059
1060 {
1062 let mut map = personas.write().await;
1063 if let Some(p) = map.get_mut(&work.persona_id) {
1064 p.last_progress_at = Utc::now();
1065 }
1066 }
1067
1068 new_belief.clamp_confidence();
1069 belief_store.insert(new_belief.id.clone(), new_belief);
1070 }
1071 }
1072 }
1073 }
1074
1075 if work.phase == ThoughtPhase::Test {
1076 new_events.push(ThoughtEvent {
1077 id: Uuid::new_v4().to_string(),
1078 event_type: ThoughtEventType::CheckResult,
1079 persona_id: Some(work.persona_id.clone()),
1080 swarm_id: work.swarm_id.clone(),
1081 timestamp: Utc::now(),
1082 payload: json!({
1083 "phase": work.phase.as_str(),
1084 "thought_count": work.thought_count,
1085 "result_excerpt": trim_for_storage(&thought.thinking, 280),
1086 "source": thought.source,
1087 "model": thought.model,
1088 }),
1089 });
1090
1091 {
1093 let mut map = personas.write().await;
1094 if let Some(p) = map.get_mut(&work.persona_id) {
1095 p.last_progress_at = Utc::now();
1096 }
1097 }
1098
1099 if !is_fallback && let Some(ref tool_registry) = tools {
1101 let allowed = {
1102 let map = personas.read().await;
1103 map.get(&work.persona_id)
1104 .map(|p| p.policy.allowed_tools.clone())
1105 .unwrap_or_default()
1106 };
1107 if !allowed.is_empty() {
1108 let tool_results = executor::execute_tool_requests(
1109 thinker.as_deref(),
1110 tool_registry,
1111 &work.persona_id,
1112 &thought.thinking,
1113 &allowed,
1114 )
1115 .await;
1116
1117 for result_event in tool_results {
1118 new_events.push(result_event);
1119 let mut map = personas.write().await;
1121 if let Some(p) = map.get_mut(&work.persona_id) {
1122 p.last_progress_at = Utc::now();
1123 }
1124 }
1125 }
1126 }
1127 }
1128
1129 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
1130 let gov = governance.read().await;
1131 let proposal = Proposal {
1132 id: Uuid::new_v4().to_string(),
1133 persona_id: work.persona_id.clone(),
1134 title: proposal_title_from_thought(
1135 &thought.thinking,
1136 work.thought_count,
1137 ),
1138 rationale: trim_for_storage(&thought.thinking, 900),
1139 evidence_refs: vec!["internal.thought_stream".to_string()],
1140 risk: ProposalRisk::Low,
1141 status: ProposalStatus::Created,
1142 created_at: Utc::now(),
1143 votes: HashMap::new(),
1144 vote_deadline: Some(
1145 Utc::now() + ChronoDuration::seconds(gov.vote_timeout_secs as i64),
1146 ),
1147 votes_requested: false,
1148 quorum_needed: (work_items.len() as f32 * gov.quorum_fraction).ceil()
1149 as usize,
1150 };
1151
1152 new_events.push(ThoughtEvent {
1153 id: Uuid::new_v4().to_string(),
1154 event_type: ThoughtEventType::ProposalCreated,
1155 persona_id: Some(work.persona_id.clone()),
1156 swarm_id: work.swarm_id.clone(),
1157 timestamp: Utc::now(),
1158 payload: json!({
1159 "proposal_id": proposal.id,
1160 "title": proposal.title,
1161 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
1162 "source": thought.source,
1163 "model": thought.model,
1164 }),
1165 });
1166
1167 new_proposals.push(proposal);
1168 }
1169
1170 if work.phase == ThoughtPhase::Compress {
1171 new_snapshots.push(MemorySnapshot {
1172 id: Uuid::new_v4().to_string(),
1173 generated_at: Utc::now(),
1174 swarm_id: work.swarm_id.clone(),
1175 persona_scope: vec![work.persona_id.clone()],
1176 summary: trim_for_storage(&thought.thinking, 1_500),
1177 hot_event_count: context.len(),
1178 warm_fact_count: estimate_fact_count(&thought.thinking),
1179 cold_snapshot_count: 1,
1180 metadata: HashMap::from([
1181 (
1182 "phase".to_string(),
1183 serde_json::Value::String(work.phase.as_str().to_string()),
1184 ),
1185 (
1186 "role".to_string(),
1187 serde_json::Value::String(work.role.clone()),
1188 ),
1189 (
1190 "source".to_string(),
1191 serde_json::Value::String(thought.source.to_string()),
1192 ),
1193 (
1194 "model".to_string(),
1195 serde_json::Value::String(
1196 thought
1197 .model
1198 .clone()
1199 .unwrap_or_else(|| "fallback".to_string()),
1200 ),
1201 ),
1202 (
1203 "completion_tokens".to_string(),
1204 serde_json::Value::Number(serde_json::Number::from(
1205 thought.completion_tokens.unwrap_or(0) as u64,
1206 )),
1207 ),
1208 ]),
1209 });
1210
1211 {
1213 let mut belief_store = beliefs.write().await;
1214 let mut attn_queue = attention_queue.write().await;
1215 let stale_ids: Vec<String> = belief_store
1216 .values()
1217 .filter(|b| {
1218 b.status == BeliefStatus::Active && now > b.review_after
1219 })
1220 .map(|b| b.id.clone())
1221 .collect();
1222 for id in stale_ids {
1223 if let Some(belief) = belief_store.get_mut(&id) {
1224 belief.decay();
1225 attn_queue.push(AttentionItem {
1226 id: Uuid::new_v4().to_string(),
1227 topic: format!("Stale belief: {}", belief.claim),
1228 topic_tags: vec![belief.belief_key.clone()],
1229 priority: 0.4,
1230 source_type: AttentionSource::StaleBelief,
1231 source_id: belief.id.clone(),
1232 assigned_persona: None,
1233 created_at: now,
1234 resolved_at: None,
1235 });
1236 }
1237 }
1238 }
1239
1240 {
1242 let belief_store = beliefs.read().await;
1243 let attn_queue = attention_queue.read().await;
1244
1245 let mut sorted_beliefs: Vec<&Belief> = belief_store
1246 .values()
1247 .filter(|b| b.status == BeliefStatus::Active)
1248 .collect();
1249 sorted_beliefs.sort_by(|a, b| {
1250 let score_a = a.confidence
1251 * (1.0
1252 / (1.0
1253 + now.signed_duration_since(a.updated_at).num_minutes()
1254 as f32));
1255 let score_b = b.confidence
1256 * (1.0
1257 / (1.0
1258 + now.signed_duration_since(b.updated_at).num_minutes()
1259 as f32));
1260 score_b
1261 .partial_cmp(&score_a)
1262 .unwrap_or(std::cmp::Ordering::Equal)
1263 });
1264
1265 let top_beliefs: Vec<String> = sorted_beliefs
1266 .iter()
1267 .take(10)
1268 .map(|b| b.id.clone())
1269 .collect();
1270 let top_uncertainties: Vec<String> = {
1271 let mut uncertain: Vec<&Belief> = belief_store
1272 .values()
1273 .filter(|b| {
1274 b.status == BeliefStatus::Stale
1275 || !b.contested_by.is_empty()
1276 })
1277 .collect();
1278 uncertain.sort_by(|a, b| {
1281 let a_contested = !a.contested_by.is_empty();
1282 let b_contested = !b.contested_by.is_empty();
1283 b_contested
1284 .cmp(&a_contested)
1285 .then_with(|| {
1286 a.confidence
1287 .partial_cmp(&b.confidence)
1288 .unwrap_or(std::cmp::Ordering::Equal)
1289 })
1290 .then_with(|| a.updated_at.cmp(&b.updated_at))
1291 });
1292 uncertain
1293 .iter()
1294 .take(5)
1295 .map(|b| {
1296 format!(
1297 "[{}] {}",
1298 b.belief_key,
1299 trim_for_storage(&b.claim, 80)
1300 )
1301 })
1302 .collect()
1303 };
1304
1305 let mut sorted_attn: Vec<&AttentionItem> = attn_queue
1306 .iter()
1307 .filter(|a| a.resolved_at.is_none())
1308 .collect();
1309 sorted_attn.sort_by(|a, b| {
1310 b.priority
1311 .partial_cmp(&a.priority)
1312 .unwrap_or(std::cmp::Ordering::Equal)
1313 });
1314 let top_attention: Vec<String> =
1315 sorted_attn.iter().take(10).map(|a| a.id.clone()).collect();
1316
1317 let mut ws = workspace.write().await;
1318 ws.top_beliefs = top_beliefs;
1319 ws.top_uncertainties = top_uncertainties;
1320 ws.top_attention = top_attention;
1321 ws.updated_at = now;
1322 }
1323
1324 new_events.push(ThoughtEvent {
1325 id: Uuid::new_v4().to_string(),
1326 event_type: ThoughtEventType::WorkspaceUpdated,
1327 persona_id: Some(work.persona_id.clone()),
1328 swarm_id: work.swarm_id.clone(),
1329 timestamp: Utc::now(),
1330 payload: json!({ "updated": true }),
1331 });
1332 }
1333 }
1334
1335 {
1337 let gov = governance.read().await;
1338 let mut proposal_store = proposals.write().await;
1339 let persona_map = personas.read().await;
1340
1341 let proposal_ids: Vec<String> = proposal_store
1342 .values()
1343 .filter(|p| p.status == ProposalStatus::Created)
1344 .map(|p| p.id.clone())
1345 .collect();
1346
1347 let mut attn_queue = attention_queue.write().await;
1348 for pid in proposal_ids {
1349 if let Some(proposal) = proposal_store.get_mut(&pid) {
1350 let quorum_needed = proposal.quorum_needed.max(1);
1351
1352 if let Some(deadline) = proposal.vote_deadline
1354 && now > deadline
1355 {
1356 if proposal.votes.len() < quorum_needed {
1357 attn_queue.push(AttentionItem {
1359 id: Uuid::new_v4().to_string(),
1360 topic: format!("Proposal vote timeout: {}", proposal.title),
1361 topic_tags: Vec::new(),
1362 priority: 0.6,
1363 source_type: AttentionSource::ProposalTimeout,
1364 source_id: proposal.id.clone(),
1365 assigned_persona: None,
1366 created_at: now,
1367 resolved_at: None,
1368 });
1369 proposal.status = ProposalStatus::Rejected;
1370 continue;
1371 }
1372
1373 let required_roles = gov
1375 .required_approvers_by_role
1376 .get(&proposal.risk)
1377 .cloned()
1378 .unwrap_or_default();
1379 let all_required_met = required_roles.iter().all(|role| {
1380 proposal.votes.iter().any(|(vid, vote)| {
1381 *vote == ProposalVote::Approve
1382 && persona_map
1383 .get(vid)
1384 .map(|p| &p.identity.role == role)
1385 .unwrap_or(false)
1386 })
1387 });
1388 if !all_required_met {
1389 attn_queue.push(AttentionItem {
1390 id: Uuid::new_v4().to_string(),
1391 topic: format!(
1392 "Missing required approvers: {}",
1393 proposal.title
1394 ),
1395 topic_tags: Vec::new(),
1396 priority: 0.7,
1397 source_type: AttentionSource::ProposalTimeout,
1398 source_id: proposal.id.clone(),
1399 assigned_persona: None,
1400 created_at: now,
1401 resolved_at: None,
1402 });
1403 proposal.status = ProposalStatus::Rejected;
1404 continue;
1405 }
1406 }
1407
1408 if proposal.votes.len() >= quorum_needed {
1410 let vetoed = proposal.votes.iter().any(|(voter_id, vote)| {
1412 if *vote != ProposalVote::Veto {
1413 return false;
1414 }
1415 if let Some(voter) = persona_map.get(voter_id) {
1416 gov.veto_roles.contains(&voter.identity.role)
1417 } else {
1418 false
1419 }
1420 });
1421
1422 if vetoed {
1423 proposal.status = ProposalStatus::Rejected;
1424 continue;
1425 }
1426
1427 let required_roles = gov
1429 .required_approvers_by_role
1430 .get(&proposal.risk)
1431 .cloned()
1432 .unwrap_or_default();
1433 let all_required_met = required_roles.iter().all(|role| {
1434 proposal.votes.iter().any(|(vid, vote)| {
1435 *vote == ProposalVote::Approve
1436 && persona_map
1437 .get(vid)
1438 .map(|p| &p.identity.role == role)
1439 .unwrap_or(false)
1440 })
1441 });
1442
1443 if !all_required_met {
1444 continue; }
1446
1447 let approvals = proposal
1449 .votes
1450 .values()
1451 .filter(|v| **v == ProposalVote::Approve)
1452 .count();
1453 let rejections = proposal
1454 .votes
1455 .values()
1456 .filter(|v| **v == ProposalVote::Reject)
1457 .count();
1458
1459 if approvals > rejections {
1460 proposal.status = ProposalStatus::Verified;
1461 } else {
1462 proposal.status = ProposalStatus::Rejected;
1463 }
1464 }
1465 }
1466 }
1467 }
1468
1469 {
1471 let mut proposal_store = proposals.write().await;
1472 let verified_ids: Vec<String> = proposal_store
1473 .values()
1474 .filter(|p| p.status == ProposalStatus::Verified)
1475 .map(|p| p.id.clone())
1476 .collect();
1477
1478 for pid in verified_ids {
1479 if let Some(proposal) = proposal_store.get_mut(&pid) {
1480 if proposal.risk == ProposalRisk::Critical {
1481 let approved = {
1483 let approvals = pending_approvals.read().await;
1484 approvals.get(&pid).copied().unwrap_or(false)
1485 };
1486 if !approved {
1487 let mut approvals = pending_approvals.write().await;
1489 approvals.entry(pid.clone()).or_insert(false);
1490 continue;
1491 }
1492 }
1493
1494 let receipt = executor::DecisionReceipt {
1496 id: Uuid::new_v4().to_string(),
1497 proposal_id: pid.clone(),
1498 inputs: proposal.evidence_refs.clone(),
1499 governance_decision: format!(
1500 "Approved with {} votes",
1501 proposal.votes.len()
1502 ),
1503 capability_leases: Vec::new(),
1504 tool_invocations: Vec::new(),
1505 outcome: executor::ExecutionOutcome::Success {
1506 summary: format!("Proposal '{}' executed", proposal.title),
1507 },
1508 created_at: Utc::now(),
1509 };
1510
1511 new_events.push(ThoughtEvent {
1512 id: Uuid::new_v4().to_string(),
1513 event_type: ThoughtEventType::ActionExecuted,
1514 persona_id: Some(proposal.persona_id.clone()),
1515 swarm_id: None,
1516 timestamp: Utc::now(),
1517 payload: json!({
1518 "receipt_id": receipt.id,
1519 "proposal_id": pid,
1520 "outcome": "success",
1521 "summary": format!("Proposal '{}' executed", proposal.title),
1522 }),
1523 });
1524
1525 receipts.write().await.push(receipt);
1526 proposal.status = ProposalStatus::Executed;
1527 }
1528 }
1529 }
1530
1531 if !new_proposals.is_empty() {
1532 let mut proposal_store = proposals.write().await;
1533 for proposal in new_proposals {
1534 proposal_store.insert(proposal.id.clone(), proposal);
1535 }
1536 }
1537
1538 for event in new_events {
1539 push_event_internal(&events, max_events, &event_tx, event).await;
1540 }
1541 for snapshot in new_snapshots {
1542 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
1543 }
1544
1545 {
1547 let mut map = personas.write().await;
1548 let idle_ids: Vec<String> = map
1549 .values()
1550 .filter(|p| {
1551 p.status == PersonaStatus::Active
1552 && !p.budget_paused
1553 && now.signed_duration_since(p.last_progress_at).num_seconds()
1554 > p.policy.idle_ttl_secs as i64
1555 })
1556 .map(|p| p.identity.id.clone())
1557 .collect();
1558
1559 for id in &idle_ids {
1560 if let Some(persona) = map.get_mut(id) {
1561 persona.status = PersonaStatus::Reaped;
1562 persona.updated_at = now;
1563 }
1564 let children: Vec<String> = map
1566 .values()
1567 .filter(|p| p.identity.parent_id.as_deref() == Some(id.as_str()))
1568 .map(|p| p.identity.id.clone())
1569 .collect();
1570 for child_id in children {
1571 if let Some(child) = map.get_mut(&child_id) {
1572 child.status = PersonaStatus::Reaped;
1573 child.updated_at = now;
1574 }
1575 }
1576 }
1577 drop(map);
1578
1579 for id in idle_ids {
1580 push_event_internal(
1581 &events,
1582 max_events,
1583 &event_tx,
1584 ThoughtEvent {
1585 id: Uuid::new_v4().to_string(),
1586 event_type: ThoughtEventType::IdleReaped,
1587 persona_id: Some(id),
1588 swarm_id: None,
1589 timestamp: now,
1590 payload: json!({ "reason": "idle_ttl_expired" }),
1591 },
1592 )
1593 .await;
1594 }
1595 }
1596
1597 if work_items.iter().any(|w| w.phase == ThoughtPhase::Compress) {
1599 let _ = persistence::save_state(
1600 &personas,
1601 &proposals,
1602 &beliefs,
1603 &attention_queue,
1604 &workspace,
1605 &events,
1606 &snapshots,
1607 )
1608 .await;
1609 }
1610
1611 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
1612 next_tick += interval;
1613 let tick_completed = Instant::now();
1614 if tick_completed > next_tick {
1615 next_tick = tick_completed;
1616 }
1617 }
1618 });
1619
1620 {
1621 let mut lock = self.loop_handle.lock().await;
1622 *lock = Some(handle);
1623 }
1624
1625 Ok(self.status().await)
1626 }
1627
1628 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
1630 self.running.store(false, Ordering::SeqCst);
1631
1632 if let Some(handle) = self.loop_handle.lock().await.take() {
1633 handle.abort();
1634 let _ = handle.await;
1635 }
1636
1637 if let Some(reason_value) = reason {
1638 let event = ThoughtEvent {
1639 id: Uuid::new_v4().to_string(),
1640 event_type: ThoughtEventType::CheckResult,
1641 persona_id: None,
1642 swarm_id: None,
1643 timestamp: Utc::now(),
1644 payload: json!({ "stopped": true, "reason": reason_value }),
1645 };
1646 self.push_event(event).await;
1647 }
1648
1649 Ok(self.status().await)
1650 }
1651
1652 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
1654 let now = Utc::now();
1655 let mut personas = self.personas.write().await;
1656
1657 let mut parent_swarm_id = None;
1658 let mut computed_depth = 0_u32;
1659 let mut inherited_policy = None;
1660
1661 if let Some(parent_id) = req.parent_id.clone() {
1662 let parent = personas
1663 .get(&parent_id)
1664 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
1665
1666 if parent.status == PersonaStatus::Reaped {
1667 return Err(anyhow!("Parent persona {} is reaped", parent_id));
1668 }
1669
1670 parent_swarm_id = parent.identity.swarm_id.clone();
1671 computed_depth = parent.identity.depth.saturating_add(1);
1672 inherited_policy = Some(parent.policy.clone());
1673 let branch_limit = parent.policy.max_branching_factor;
1674
1675 let child_count = personas
1676 .values()
1677 .filter(|p| {
1678 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
1679 && p.status != PersonaStatus::Reaped
1680 })
1681 .count();
1682
1683 if child_count as u32 >= branch_limit {
1684 return Err(anyhow!(
1685 "Parent {} reached branching limit {}",
1686 parent_id,
1687 branch_limit
1688 ));
1689 }
1690 }
1691
1692 let policy = req
1693 .policy
1694 .clone()
1695 .or(inherited_policy.clone())
1696 .unwrap_or_else(|| self.default_policy.clone());
1697
1698 let effective_depth_limit = inherited_policy
1699 .as_ref()
1700 .map(|p| p.max_spawn_depth)
1701 .unwrap_or(policy.max_spawn_depth);
1702
1703 if computed_depth > effective_depth_limit {
1704 return Err(anyhow!(
1705 "Spawn depth {} exceeds limit {}",
1706 computed_depth,
1707 effective_depth_limit
1708 ));
1709 }
1710
1711 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
1712 if personas.contains_key(&persona_id) {
1713 return Err(anyhow!("Persona id already exists: {}", persona_id));
1714 }
1715
1716 let identity = PersonaIdentity {
1717 id: persona_id.clone(),
1718 name: req.name,
1719 role: req.role,
1720 charter: req.charter,
1721 swarm_id: req.swarm_id.or(parent_swarm_id),
1722 parent_id: req.parent_id,
1723 depth: computed_depth,
1724 created_at: now,
1725 tags: req.tags,
1726 };
1727
1728 let persona = PersonaRuntimeState {
1729 identity,
1730 policy,
1731 status: PersonaStatus::Active,
1732 thought_count: 0,
1733 last_tick_at: None,
1734 updated_at: now,
1735 tokens_this_window: 0,
1736 compute_ms_this_window: 0,
1737 window_started_at: now,
1738 last_progress_at: now,
1739 budget_paused: false,
1740 };
1741
1742 personas.insert(persona_id, persona.clone());
1743 drop(personas);
1744
1745 self.push_event(ThoughtEvent {
1746 id: Uuid::new_v4().to_string(),
1747 event_type: ThoughtEventType::PersonaSpawned,
1748 persona_id: Some(persona.identity.id.clone()),
1749 swarm_id: persona.identity.swarm_id.clone(),
1750 timestamp: now,
1751 payload: json!({
1752 "name": persona.identity.name,
1753 "role": persona.identity.role,
1754 "depth": persona.identity.depth,
1755 }),
1756 })
1757 .await;
1758
1759 Ok(persona)
1760 }
1761
1762 pub async fn spawn_child(
1764 &self,
1765 parent_id: &str,
1766 req: SpawnPersonaRequest,
1767 ) -> Result<PersonaRuntimeState> {
1768 let request = CreatePersonaRequest {
1769 persona_id: req.persona_id,
1770 name: req.name,
1771 role: req.role,
1772 charter: req.charter,
1773 swarm_id: req.swarm_id,
1774 parent_id: Some(parent_id.to_string()),
1775 policy: req.policy,
1776 tags: Vec::new(),
1777 };
1778 self.create_persona(request).await
1779 }
1780
1781 pub async fn reap_persona(
1783 &self,
1784 persona_id: &str,
1785 req: ReapPersonaRequest,
1786 ) -> Result<ReapPersonaResponse> {
1787 let cascade = req.cascade.unwrap_or(false);
1788 let now = Utc::now();
1789
1790 let mut personas = self.personas.write().await;
1791 if !personas.contains_key(persona_id) {
1792 return Err(anyhow!("Persona not found: {}", persona_id));
1793 }
1794
1795 let mut reaped_ids = vec![persona_id.to_string()];
1796 if cascade {
1797 let mut idx = 0usize;
1798 while idx < reaped_ids.len() {
1799 let current = reaped_ids[idx].clone();
1800 let children: Vec<String> = personas
1801 .values()
1802 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
1803 .map(|p| p.identity.id.clone())
1804 .collect();
1805 for child in children {
1806 if !reaped_ids.iter().any(|existing| existing == &child) {
1807 reaped_ids.push(child);
1808 }
1809 }
1810 idx += 1;
1811 }
1812 }
1813
1814 for id in &reaped_ids {
1815 if let Some(persona) = personas.get_mut(id) {
1816 persona.status = PersonaStatus::Reaped;
1817 persona.updated_at = now;
1818 }
1819 }
1820 drop(personas);
1821
1822 for id in &reaped_ids {
1823 self.push_event(ThoughtEvent {
1824 id: Uuid::new_v4().to_string(),
1825 event_type: ThoughtEventType::PersonaReaped,
1826 persona_id: Some(id.clone()),
1827 swarm_id: None,
1828 timestamp: now,
1829 payload: json!({
1830 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
1831 "cascade": cascade,
1832 }),
1833 })
1834 .await;
1835 }
1836
1837 Ok(ReapPersonaResponse {
1838 count: reaped_ids.len(),
1839 reaped_ids,
1840 })
1841 }
1842
1843 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
1845 self.snapshots.read().await.back().cloned()
1846 }
1847
1848 pub async fn lineage_graph(&self) -> LineageGraph {
1850 let personas = self.personas.read().await;
1851 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
1852 let mut roots = Vec::new();
1853 let mut total_edges = 0usize;
1854
1855 for persona in personas.values() {
1856 if let Some(parent_id) = persona.identity.parent_id.clone() {
1857 children_by_parent
1858 .entry(parent_id)
1859 .or_default()
1860 .push(persona.identity.id.clone());
1861 total_edges = total_edges.saturating_add(1);
1862 } else {
1863 roots.push(persona.identity.id.clone());
1864 }
1865 }
1866
1867 let mut nodes: Vec<LineageNode> = personas
1868 .values()
1869 .map(|persona| {
1870 let mut children = children_by_parent
1871 .get(&persona.identity.id)
1872 .cloned()
1873 .unwrap_or_default();
1874 children.sort();
1875
1876 LineageNode {
1877 persona_id: persona.identity.id.clone(),
1878 parent_id: persona.identity.parent_id.clone(),
1879 children,
1880 depth: persona.identity.depth,
1881 status: persona.status,
1882 }
1883 })
1884 .collect();
1885
1886 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
1887 roots.sort();
1888
1889 LineageGraph {
1890 nodes,
1891 roots,
1892 total_edges,
1893 }
1894 }
1895
1896 pub async fn status(&self) -> CognitionStatus {
1898 let personas = self.personas.read().await;
1899 let events = self.events.read().await;
1900 let snapshots = self.snapshots.read().await;
1901 let started_at = *self.started_at.read().await;
1902 let last_tick_at = *self.last_tick_at.read().await;
1903 let loop_interval_ms = *self.loop_interval_ms.read().await;
1904
1905 let active_persona_count = personas
1906 .values()
1907 .filter(|p| p.status == PersonaStatus::Active)
1908 .count();
1909
1910 CognitionStatus {
1911 enabled: self.enabled,
1912 running: self.running.load(Ordering::SeqCst),
1913 loop_interval_ms,
1914 started_at,
1915 last_tick_at,
1916 persona_count: personas.len(),
1917 active_persona_count,
1918 events_buffered: events.len(),
1919 snapshots_buffered: snapshots.len(),
1920 }
1921 }
1922
1923 async fn push_event(&self, event: ThoughtEvent) {
1924 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1925 }
1926
1927 pub fn set_tools(&mut self, registry: Arc<ToolRegistry>) {
1929 self.tools = Some(registry);
1930 }
1931
1932 pub async fn get_beliefs(&self) -> HashMap<String, Belief> {
1934 self.beliefs.read().await.clone()
1935 }
1936
1937 pub async fn get_belief(&self, id: &str) -> Option<Belief> {
1939 self.beliefs.read().await.get(id).cloned()
1940 }
1941
1942 pub async fn get_attention_queue(&self) -> Vec<AttentionItem> {
1944 self.attention_queue.read().await.clone()
1945 }
1946
1947 pub async fn get_proposals(&self) -> HashMap<String, Proposal> {
1949 self.proposals.read().await.clone()
1950 }
1951
1952 pub async fn get_workspace(&self) -> GlobalWorkspace {
1954 self.workspace.read().await.clone()
1955 }
1956
1957 pub async fn get_receipts(&self) -> Vec<executor::DecisionReceipt> {
1959 self.receipts.read().await.clone()
1960 }
1961
1962 pub async fn approve_proposal(&self, proposal_id: &str) -> Result<()> {
1964 let proposals = self.proposals.read().await;
1965 let proposal = proposals
1966 .get(proposal_id)
1967 .ok_or_else(|| anyhow!("Proposal not found: {}", proposal_id))?;
1968
1969 if proposal.risk != ProposalRisk::Critical {
1970 return Err(anyhow!("Only Critical proposals require human approval"));
1971 }
1972 if proposal.status != ProposalStatus::Verified {
1973 return Err(anyhow!("Proposal is not in Verified status"));
1974 }
1975 drop(proposals);
1976
1977 let mut approvals = self.pending_approvals.write().await;
1978 approvals.insert(proposal_id.to_string(), true);
1979 Ok(())
1980 }
1981
1982 pub async fn get_governance(&self) -> SwarmGovernance {
1984 self.governance.read().await.clone()
1985 }
1986
1987 pub async fn get_persona(&self, id: &str) -> Option<PersonaRuntimeState> {
1989 self.personas.read().await.get(id).cloned()
1990 }
1991}
1992
1993async fn push_event_internal(
1994 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1995 max_events: usize,
1996 event_tx: &broadcast::Sender<ThoughtEvent>,
1997 event: ThoughtEvent,
1998) {
1999 {
2000 let mut lock = events.write().await;
2001 lock.push_back(event.clone());
2002 while lock.len() > max_events {
2003 lock.pop_front();
2004 }
2005 }
2006 let _ = event_tx.send(event);
2007}
2008
2009async fn push_snapshot_internal(
2010 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
2011 max_snapshots: usize,
2012 snapshot: MemorySnapshot,
2013) {
2014 let mut lock = snapshots.write().await;
2015 lock.push_back(snapshot);
2016 while lock.len() > max_snapshots {
2017 lock.pop_front();
2018 }
2019}
2020
2021async fn recent_persona_context(
2022 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
2023 persona_id: &str,
2024 limit: usize,
2025) -> Vec<ThoughtEvent> {
2026 let lock = events.read().await;
2027 let mut selected: Vec<ThoughtEvent> = lock
2028 .iter()
2029 .rev()
2030 .filter(|event| {
2031 event.persona_id.as_deref() == Some(persona_id)
2032 || (event.persona_id.is_none()
2033 && matches!(
2034 event.event_type,
2035 ThoughtEventType::CheckResult
2036 | ThoughtEventType::ProposalCreated
2037 | ThoughtEventType::SnapshotCompressed
2038 | ThoughtEventType::WorkspaceUpdated
2039 | ThoughtEventType::ActionExecuted
2040 | ThoughtEventType::BudgetPaused
2041 ))
2042 })
2043 .take(limit)
2044 .cloned()
2045 .collect();
2046 selected.reverse();
2047 selected
2048}
2049
2050async fn generate_phase_thought(
2051 thinker: Option<&ThinkerClient>,
2052 work: &ThoughtWorkItem,
2053 context: &[ThoughtEvent],
2054) -> ThoughtResult {
2055 let started_at = Instant::now();
2056 if let Some(client) = thinker {
2057 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
2058 match client.think(&system_prompt, &user_prompt).await {
2059 Ok(output) => {
2060 let thinking = normalize_thought_output(work, context, &output.text);
2061 if !thinking.is_empty() {
2062 return ThoughtResult {
2063 source: "model",
2064 model: Some(output.model),
2065 finish_reason: output.finish_reason,
2066 thinking,
2067 prompt_tokens: output.prompt_tokens,
2068 completion_tokens: output.completion_tokens,
2069 total_tokens: output.total_tokens,
2070 latency_ms: started_at.elapsed().as_millis(),
2071 error: None,
2072 };
2073 }
2074 }
2075 Err(error) => {
2076 return ThoughtResult {
2077 source: "fallback",
2078 model: None,
2079 finish_reason: None,
2080 thinking: fallback_phase_text(work, context),
2081 prompt_tokens: None,
2082 completion_tokens: None,
2083 total_tokens: None,
2084 latency_ms: started_at.elapsed().as_millis(),
2085 error: Some(error.to_string()),
2086 };
2087 }
2088 }
2089 }
2090
2091 ThoughtResult {
2092 source: "fallback",
2093 model: None,
2094 finish_reason: None,
2095 thinking: fallback_phase_text(work, context),
2096 prompt_tokens: None,
2097 completion_tokens: None,
2098 total_tokens: None,
2099 latency_ms: started_at.elapsed().as_millis(),
2100 error: None,
2101 }
2102}
2103
2104fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
2105 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
2106Respond with concise plain text only. Do not include markdown, XML, or code fences. \
2107Write as an operational process update, not meta narration. \
2108Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
2109Output concrete findings, checks, risks, and next actions. \
2110Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
2111 .to_string();
2112
2113 let context_lines = if context.is_empty() {
2114 "none".to_string()
2115 } else {
2116 context
2117 .iter()
2118 .map(format_context_event)
2119 .collect::<Vec<_>>()
2120 .join("\n")
2121 };
2122
2123 let phase_instruction = match work.phase {
2124 ThoughtPhase::Observe => {
2125 "Process format (exact line labels): \
2126Phase: Observe | Goal: detect current customer/business risk | \
2127Signals: 1-3 concrete signals separated by '; ' | \
2128Uncertainty: one unknown that blocks confidence | \
2129Next_Action: one immediate operational action."
2130 }
2131 ThoughtPhase::Reflect => {
2132 "Process format (exact line labels): \
2133Phase: Reflect | Hypothesis: single testable hypothesis | \
2134Rationale: why this is likely | \
2135Business_Risk: customer/revenue/SLA impact | \
2136Validation_Next_Action: one action to confirm or falsify."
2137 }
2138 ThoughtPhase::Test => {
2139 "Process format (exact line labels): \
2140Phase: Test | Check: single concrete check | \
2141Procedure: short executable procedure | \
2142Expected_Result: pass/fail expectation | \
2143Evidence_Quality: low|medium|high with reason | \
2144Escalation_Trigger: when to escalate immediately."
2145 }
2146 ThoughtPhase::Compress => {
2147 "Process format (exact line labels): \
2148Phase: Compress | State_Summary: current state in one line | \
2149Retained_Facts: 3 short facts separated by '; ' | \
2150Open_Risks: up to 2 unresolved risks separated by '; ' | \
2151Next_Process_Step: next operational step."
2152 }
2153 };
2154
2155 let user_prompt = format!(
2156 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
2157 phase = work.phase.as_str(),
2158 persona_id = work.persona_id,
2159 persona_name = work.persona_name,
2160 role = work.role,
2161 charter = work.charter,
2162 thought_count = work.thought_count,
2163 context = context_lines,
2164 instruction = phase_instruction
2165 );
2166
2167 (system_prompt, user_prompt)
2168}
2169
2170fn format_context_event(event: &ThoughtEvent) -> String {
2171 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
2172 format!(
2173 "{} {} {}",
2174 event.event_type.as_str(),
2175 event.timestamp.to_rfc3339(),
2176 trim_for_storage(&payload, 220)
2177 )
2178}
2179
2180fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
2181 let charter = trim_for_storage(&work.charter, 180);
2182 let context_summary = fallback_context_summary(context);
2183 let thought = match work.phase {
2184 ThoughtPhase::Observe => format!(
2185 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
2186 work.role, charter, context_summary
2187 ),
2188 ThoughtPhase::Reflect => "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.".to_string(),
2189 ThoughtPhase::Test => "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.".to_string(),
2190 ThoughtPhase::Compress => format!(
2191 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
2192 work.role, charter, context_summary
2193 ),
2194 };
2195 trim_for_storage(&thought, 1_200)
2196}
2197
2198fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
2199 let trimmed = trim_for_storage(raw, 2_000);
2200 if trimmed.trim().is_empty() {
2201 return fallback_phase_text(work, context);
2202 }
2203
2204 if let Some(idx) = find_process_label_start(&trimmed) {
2206 let candidate = trimmed[idx..].trim();
2207 if !candidate.is_empty()
2209 && !candidate.contains('<')
2210 && !has_template_placeholder_values(candidate)
2211 {
2212 let collapsed: String = candidate
2214 .lines()
2215 .map(str::trim)
2216 .filter(|l| !l.is_empty())
2217 .collect::<Vec<_>>()
2218 .join(" | ");
2219 let cleaned = collapsed.trim_matches('"').trim_matches('\'').trim();
2220 if cleaned.starts_with("Phase:") {
2221 return cleaned.to_string();
2222 }
2223 return collapsed;
2224 }
2225 }
2226
2227 let lower = trimmed.to_ascii_lowercase();
2228 let looks_meta = lower.starts_with("we need")
2229 || lower.starts_with("i need")
2230 || lower.contains("we need to")
2231 || lower.contains("i need to")
2232 || lower.contains("must output")
2233 || lower.contains("let's ")
2234 || lower.contains("we have to");
2235
2236 if looks_meta || has_template_placeholder_values(&trimmed) {
2237 return fallback_phase_text(work, context);
2238 }
2239
2240 trimmed
2241}
2242
2243fn has_template_placeholder_values(text: &str) -> bool {
2244 let lower = text.to_ascii_lowercase();
2245 [
2246 "goal: ...",
2247 "signals: ...",
2248 "uncertainty: ...",
2249 "next_action: ...",
2250 "hypothesis: ...",
2251 "rationale: ...",
2252 "business_risk: ...",
2253 "validation_next_action: ...",
2254 "check: ...",
2255 "procedure: ...",
2256 "expected_result: ...",
2257 "evidence_quality: ...",
2258 "escalation_trigger: ...",
2259 "state_summary: ...",
2260 "retained_facts: ...",
2261 "open_risks: ...",
2262 "next_process_step: ...",
2263 ]
2264 .iter()
2265 .any(|needle| lower.contains(needle))
2266 || lower.contains("<...")
2267 || lower.contains("tbd")
2268 || lower.contains("todo")
2269}
2270
2271fn find_process_label_start(text: &str) -> Option<usize> {
2272 [
2273 "Phase: Observe",
2274 "Phase: Reflect",
2275 "Phase: Test",
2276 "Phase: Compress",
2277 "Phase:",
2278 ]
2279 .iter()
2280 .filter_map(|label| text.find(label))
2281 .min()
2282}
2283
2284fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
2285 if context.is_empty() {
2286 return "No prior events recorded yet.".to_string();
2287 }
2288
2289 let mut latest_error: Option<String> = None;
2290 let mut latest_proposal: Option<String> = None;
2291 let mut latest_check: Option<String> = None;
2292
2293 for event in context.iter().rev() {
2294 if latest_error.is_none()
2295 && let Some(error) = event
2296 .payload
2297 .get("error")
2298 .and_then(serde_json::Value::as_str)
2299 && !error.trim().is_empty()
2300 {
2301 latest_error = Some(trim_for_storage(error, 140));
2302 }
2303
2304 if latest_proposal.is_none()
2305 && event.event_type == ThoughtEventType::ProposalCreated
2306 && let Some(title) = event
2307 .payload
2308 .get("title")
2309 .and_then(serde_json::Value::as_str)
2310 && !title.trim().is_empty()
2311 && !has_template_placeholder_values(title)
2312 {
2313 latest_proposal = Some(trim_for_storage(title, 120));
2314 }
2315
2316 if latest_check.is_none()
2317 && event.event_type == ThoughtEventType::CheckResult
2318 && let Some(result) = event
2319 .payload
2320 .get("result_excerpt")
2321 .and_then(serde_json::Value::as_str)
2322 && !result.trim().is_empty()
2323 && !has_template_placeholder_values(result)
2324 {
2325 latest_check = Some(trim_for_storage(result, 140));
2326 }
2327
2328 if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
2329 break;
2330 }
2331 }
2332
2333 let mut lines = vec![format!(
2334 "{} recent cognition events are available.",
2335 context.len()
2336 )];
2337 if let Some(error) = latest_error {
2338 lines.push(format!("Latest error signal: {}.", error));
2339 }
2340 if let Some(proposal) = latest_proposal {
2341 lines.push(format!("Recent proposal: {}.", proposal));
2342 }
2343 if let Some(check) = latest_check {
2344 lines.push(format!("Recent check: {}.", check));
2345 }
2346 lines.join(" ")
2347}
2348
2349fn trim_for_storage(input: &str, max_chars: usize) -> String {
2350 if input.chars().count() <= max_chars {
2351 return input.trim().to_string();
2352 }
2353 let mut trimmed = String::with_capacity(max_chars + 8);
2354 for ch in input.chars().take(max_chars) {
2355 trimmed.push(ch);
2356 }
2357 trimmed.push_str("...");
2358 trimmed.trim().to_string()
2359}
2360
2361fn estimate_fact_count(text: &str) -> usize {
2362 let sentence_count =
2363 text.matches('.').count() + text.matches('!').count() + text.matches('?').count();
2364 sentence_count.clamp(1, 12)
2365}
2366
2367fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
2368 let first_line = thought
2369 .lines()
2370 .find(|line| !line.trim().is_empty())
2371 .unwrap_or("proposal");
2372 let compact = first_line
2373 .replace(['\t', '\r', '\n'], " ")
2374 .split_whitespace()
2375 .collect::<Vec<_>>()
2376 .join(" ");
2377 let trimmed = trim_for_storage(&compact, 72);
2378 if trimmed.is_empty() {
2379 format!("proposal-{}", thought_count)
2380 } else {
2381 trimmed
2382 }
2383}
2384
2385fn default_seed_persona() -> CreatePersonaRequest {
2386 CreatePersonaRequest {
2387 persona_id: Some("root-thinker".to_string()),
2388 name: "root-thinker".to_string(),
2389 role: "orchestrator".to_string(),
2390 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
2391 .to_string(),
2392 swarm_id: Some("swarm-core".to_string()),
2393 parent_id: None,
2394 policy: None,
2395 tags: vec!["orchestration".to_string()],
2396 }
2397}
2398
2399fn normalize_thinker_endpoint(base_url: &str) -> String {
2400 let trimmed = base_url.trim().trim_end_matches('/');
2401 if trimmed.ends_with("/chat/completions") {
2402 return trimmed.to_string();
2403 }
2404 if trimmed.is_empty() {
2405 return "http://127.0.0.1:11434/v1/chat/completions".to_string();
2406 }
2407 format!("{}/chat/completions", trimmed)
2408}
2409
2410fn env_bool(name: &str, default: bool) -> bool {
2411 std::env::var(name)
2412 .ok()
2413 .and_then(|v| match v.to_ascii_lowercase().as_str() {
2414 "1" | "true" | "yes" | "on" => Some(true),
2415 "0" | "false" | "no" | "off" => Some(false),
2416 _ => None,
2417 })
2418 .unwrap_or(default)
2419}
2420
2421fn env_f32(name: &str, default: f32) -> f32 {
2422 std::env::var(name)
2423 .ok()
2424 .and_then(|v| v.parse::<f32>().ok())
2425 .unwrap_or(default)
2426}
2427
2428fn env_u64(name: &str, default: u64) -> u64 {
2429 std::env::var(name)
2430 .ok()
2431 .and_then(|v| v.parse::<u64>().ok())
2432 .unwrap_or(default)
2433}
2434
2435fn env_u32(name: &str, default: u32) -> u32 {
2436 std::env::var(name)
2437 .ok()
2438 .and_then(|v| v.parse::<u32>().ok())
2439 .unwrap_or(default)
2440}
2441
2442fn env_usize(name: &str, default: usize) -> usize {
2443 std::env::var(name)
2444 .ok()
2445 .and_then(|v| v.parse::<usize>().ok())
2446 .unwrap_or(default)
2447}
2448
2449#[cfg(test)]
2450mod tests {
2451 use super::*;
2452
2453 fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
2454 ThoughtWorkItem {
2455 persona_id: "p-1".to_string(),
2456 persona_name: "Spotlessbinco Business Thinker".to_string(),
2457 role: "principal reliability engineer".to_string(),
2458 charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
2459 .to_string(),
2460 swarm_id: Some("spotlessbinco".to_string()),
2461 thought_count: 4,
2462 phase,
2463 }
2464 }
2465
2466 fn test_runtime() -> CognitionRuntime {
2467 CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2468 enabled: true,
2469 loop_interval_ms: 25,
2470 max_events: 256,
2471 max_snapshots: 32,
2472 default_policy: PersonaPolicy {
2473 max_spawn_depth: 2,
2474 max_branching_factor: 2,
2475 token_budget_per_minute: 1_000,
2476 compute_ms_per_minute: 1_000,
2477 idle_ttl_secs: 300,
2478 share_memory: false,
2479 allowed_tools: Vec::new(),
2480 },
2481 })
2482 }
2483
2484 #[test]
2485 fn normalize_rejects_placeholder_process_line() {
2486 let work = sample_work_item(ThoughtPhase::Compress);
2487 let output = normalize_thought_output(
2488 &work,
2489 &[],
2490 "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
2491 );
2492 assert!(
2493 output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
2494 );
2495 assert!(!output.contains("State_Summary: ..."));
2496 }
2497
2498 #[test]
2499 fn normalize_accepts_concrete_process_line() {
2500 let work = sample_work_item(ThoughtPhase::Test);
2501 let output = normalize_thought_output(
2502 &work,
2503 &[],
2504 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
2505 );
2506 assert_eq!(
2507 output,
2508 "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
2509 );
2510 }
2511
2512 #[tokio::test]
2513 async fn create_spawn_and_lineage_work() {
2514 let runtime = test_runtime();
2515
2516 let root = runtime
2517 .create_persona(CreatePersonaRequest {
2518 persona_id: Some("root".to_string()),
2519 name: "root".to_string(),
2520 role: "orchestrator".to_string(),
2521 charter: "coordinate".to_string(),
2522 swarm_id: Some("swarm-a".to_string()),
2523 parent_id: None,
2524 policy: None,
2525 tags: Vec::new(),
2526 })
2527 .await
2528 .expect("root should be created");
2529
2530 assert_eq!(root.identity.depth, 0);
2531
2532 let child = runtime
2533 .spawn_child(
2534 "root",
2535 SpawnPersonaRequest {
2536 persona_id: Some("child-1".to_string()),
2537 name: "child-1".to_string(),
2538 role: "analyst".to_string(),
2539 charter: "analyze".to_string(),
2540 swarm_id: None,
2541 policy: None,
2542 },
2543 )
2544 .await
2545 .expect("child should spawn");
2546
2547 assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
2548 assert_eq!(child.identity.depth, 1);
2549
2550 let lineage = runtime.lineage_graph().await;
2551 assert_eq!(lineage.total_edges, 1);
2552 assert_eq!(lineage.roots, vec!["root".to_string()]);
2553 }
2554
2555 #[tokio::test]
2556 async fn branching_and_depth_limits_are_enforced() {
2557 let runtime = test_runtime();
2558
2559 runtime
2560 .create_persona(CreatePersonaRequest {
2561 persona_id: Some("root".to_string()),
2562 name: "root".to_string(),
2563 role: "orchestrator".to_string(),
2564 charter: "coordinate".to_string(),
2565 swarm_id: Some("swarm-a".to_string()),
2566 parent_id: None,
2567 policy: None,
2568 tags: Vec::new(),
2569 })
2570 .await
2571 .expect("root should be created");
2572
2573 runtime
2574 .spawn_child(
2575 "root",
2576 SpawnPersonaRequest {
2577 persona_id: Some("c1".to_string()),
2578 name: "c1".to_string(),
2579 role: "worker".to_string(),
2580 charter: "run".to_string(),
2581 swarm_id: None,
2582 policy: None,
2583 },
2584 )
2585 .await
2586 .expect("first child should spawn");
2587
2588 runtime
2589 .spawn_child(
2590 "root",
2591 SpawnPersonaRequest {
2592 persona_id: Some("c2".to_string()),
2593 name: "c2".to_string(),
2594 role: "worker".to_string(),
2595 charter: "run".to_string(),
2596 swarm_id: None,
2597 policy: None,
2598 },
2599 )
2600 .await
2601 .expect("second child should spawn");
2602
2603 let third_child = runtime
2604 .spawn_child(
2605 "root",
2606 SpawnPersonaRequest {
2607 persona_id: Some("c3".to_string()),
2608 name: "c3".to_string(),
2609 role: "worker".to_string(),
2610 charter: "run".to_string(),
2611 swarm_id: None,
2612 policy: None,
2613 },
2614 )
2615 .await;
2616 assert!(third_child.is_err());
2617
2618 runtime
2619 .spawn_child(
2620 "c1",
2621 SpawnPersonaRequest {
2622 persona_id: Some("c1-1".to_string()),
2623 name: "c1-1".to_string(),
2624 role: "worker".to_string(),
2625 charter: "run".to_string(),
2626 swarm_id: None,
2627 policy: None,
2628 },
2629 )
2630 .await
2631 .expect("depth 2 should be allowed");
2632
2633 let depth_violation = runtime
2634 .spawn_child(
2635 "c1-1",
2636 SpawnPersonaRequest {
2637 persona_id: Some("c1-1-1".to_string()),
2638 name: "c1-1-1".to_string(),
2639 role: "worker".to_string(),
2640 charter: "run".to_string(),
2641 swarm_id: None,
2642 policy: None,
2643 },
2644 )
2645 .await;
2646 assert!(depth_violation.is_err());
2647 }
2648
2649 #[tokio::test]
2650 async fn start_stop_updates_runtime_status() {
2651 let runtime = test_runtime();
2652
2653 runtime
2654 .start(Some(StartCognitionRequest {
2655 loop_interval_ms: Some(10),
2656 seed_persona: Some(CreatePersonaRequest {
2657 persona_id: Some("seed".to_string()),
2658 name: "seed".to_string(),
2659 role: "watcher".to_string(),
2660 charter: "observe".to_string(),
2661 swarm_id: Some("swarm-seed".to_string()),
2662 parent_id: None,
2663 policy: None,
2664 tags: Vec::new(),
2665 }),
2666 }))
2667 .await
2668 .expect("runtime should start");
2669
2670 tokio::time::sleep(Duration::from_millis(60)).await;
2671 let running_status = runtime.status().await;
2672 assert!(running_status.running);
2673 assert!(running_status.events_buffered > 0);
2674
2675 runtime
2676 .stop(Some("test".to_string()))
2677 .await
2678 .expect("runtime should stop");
2679 let stopped_status = runtime.status().await;
2680 assert!(!stopped_status.running);
2681 }
2682
2683 #[tokio::test]
2684 async fn zero_budget_persona_is_skipped() {
2685 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2686 enabled: true,
2687 loop_interval_ms: 10,
2688 max_events: 256,
2689 max_snapshots: 32,
2690 default_policy: PersonaPolicy {
2691 max_spawn_depth: 2,
2692 max_branching_factor: 2,
2693 token_budget_per_minute: 0,
2694 compute_ms_per_minute: 0,
2695 idle_ttl_secs: 3_600,
2696 share_memory: false,
2697 allowed_tools: Vec::new(),
2698 },
2699 });
2700
2701 let persona = runtime
2702 .create_persona(CreatePersonaRequest {
2703 persona_id: Some("budget-test".to_string()),
2704 name: "budget-test".to_string(),
2705 role: "tester".to_string(),
2706 charter: "test budgets".to_string(),
2707 swarm_id: None,
2708 parent_id: None,
2709 policy: None,
2710 tags: Vec::new(),
2711 })
2712 .await
2713 .expect("should create persona");
2714
2715 assert_eq!(persona.tokens_this_window, 0);
2716 assert_eq!(persona.compute_ms_this_window, 0);
2717
2718 runtime.start(None).await.expect("should start");
2720 tokio::time::sleep(Duration::from_millis(50)).await;
2721 runtime.stop(None).await.expect("should stop");
2722
2723 let p = runtime.get_persona("budget-test").await.unwrap();
2725 assert_eq!(p.thought_count, 0);
2726 assert!(p.budget_paused);
2727 }
2728
2729 #[tokio::test]
2730 async fn budget_counters_reset_after_window() {
2731 let now = Utc::now();
2732 let mut persona = PersonaRuntimeState {
2733 identity: PersonaIdentity {
2734 id: "p1".to_string(),
2735 name: "test".to_string(),
2736 role: "tester".to_string(),
2737 charter: "test".to_string(),
2738 swarm_id: None,
2739 parent_id: None,
2740 depth: 0,
2741 created_at: now,
2742 tags: Vec::new(),
2743 },
2744 policy: PersonaPolicy::default(),
2745 status: PersonaStatus::Active,
2746 thought_count: 0,
2747 last_tick_at: None,
2748 updated_at: now,
2749 tokens_this_window: 5000,
2750 compute_ms_this_window: 3000,
2751 window_started_at: now - ChronoDuration::seconds(61),
2752 last_progress_at: now,
2753 budget_paused: false,
2754 };
2755
2756 let window_elapsed = now
2758 .signed_duration_since(persona.window_started_at)
2759 .num_seconds();
2760 assert!(window_elapsed >= 60);
2761
2762 persona.tokens_this_window = 0;
2764 persona.compute_ms_this_window = 0;
2765 persona.window_started_at = now;
2766
2767 assert_eq!(persona.tokens_this_window, 0);
2768 assert_eq!(persona.compute_ms_this_window, 0);
2769 }
2770
2771 #[tokio::test]
2772 async fn idle_persona_is_reaped() {
2773 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2774 enabled: true,
2775 loop_interval_ms: 10,
2776 max_events: 256,
2777 max_snapshots: 32,
2778 default_policy: PersonaPolicy {
2779 max_spawn_depth: 2,
2780 max_branching_factor: 2,
2781 token_budget_per_minute: 20_000,
2782 compute_ms_per_minute: 10_000,
2783 idle_ttl_secs: 0, share_memory: false,
2785 allowed_tools: Vec::new(),
2786 },
2787 });
2788
2789 runtime
2790 .create_persona(CreatePersonaRequest {
2791 persona_id: Some("idle-test".to_string()),
2792 name: "idle-test".to_string(),
2793 role: "idler".to_string(),
2794 charter: "idle away".to_string(),
2795 swarm_id: None,
2796 parent_id: None,
2797 policy: None,
2798 tags: Vec::new(),
2799 })
2800 .await
2801 .expect("should create persona");
2802
2803 {
2805 let mut personas = runtime.personas.write().await;
2806 if let Some(p) = personas.get_mut("idle-test") {
2807 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2808 }
2809 }
2810
2811 runtime.start(None).await.expect("should start");
2812 tokio::time::sleep(Duration::from_millis(100)).await;
2813 runtime.stop(None).await.expect("should stop");
2814
2815 let p = runtime.get_persona("idle-test").await.unwrap();
2816 assert_eq!(p.status, PersonaStatus::Reaped);
2817 }
2818
2819 #[tokio::test]
2820 async fn budget_paused_persona_not_reaped_for_idle() {
2821 let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
2822 enabled: true,
2823 loop_interval_ms: 10,
2824 max_events: 256,
2825 max_snapshots: 32,
2826 default_policy: PersonaPolicy {
2827 max_spawn_depth: 2,
2828 max_branching_factor: 2,
2829 token_budget_per_minute: 0, compute_ms_per_minute: 0,
2831 idle_ttl_secs: 0, share_memory: false,
2833 allowed_tools: Vec::new(),
2834 },
2835 });
2836
2837 runtime
2838 .create_persona(CreatePersonaRequest {
2839 persona_id: Some("paused-test".to_string()),
2840 name: "paused-test".to_string(),
2841 role: "pauser".to_string(),
2842 charter: "pause".to_string(),
2843 swarm_id: None,
2844 parent_id: None,
2845 policy: None,
2846 tags: Vec::new(),
2847 })
2848 .await
2849 .expect("should create persona");
2850
2851 {
2853 let mut personas = runtime.personas.write().await;
2854 if let Some(p) = personas.get_mut("paused-test") {
2855 p.last_progress_at = Utc::now() - ChronoDuration::seconds(10);
2856 }
2857 }
2858
2859 runtime.start(None).await.expect("should start");
2860 tokio::time::sleep(Duration::from_millis(100)).await;
2861 runtime.stop(None).await.expect("should stop");
2862
2863 let p = runtime.get_persona("paused-test").await.unwrap();
2864 assert_eq!(p.status, PersonaStatus::Active);
2866 assert!(p.budget_paused);
2867 }
2868
2869 #[tokio::test]
2870 async fn governance_proposal_resolution() {
2871 let runtime = test_runtime();
2872 let gov = SwarmGovernance {
2873 quorum_fraction: 0.5,
2874 required_approvers_by_role: HashMap::new(),
2875 veto_roles: vec!["auditor".to_string()],
2876 vote_timeout_secs: 300,
2877 };
2878 *runtime.governance.write().await = gov;
2879
2880 runtime
2882 .create_persona(CreatePersonaRequest {
2883 persona_id: Some("voter-1".to_string()),
2884 name: "voter-1".to_string(),
2885 role: "engineer".to_string(),
2886 charter: "vote".to_string(),
2887 swarm_id: None,
2888 parent_id: None,
2889 policy: None,
2890 tags: Vec::new(),
2891 })
2892 .await
2893 .unwrap();
2894 runtime
2895 .create_persona(CreatePersonaRequest {
2896 persona_id: Some("voter-2".to_string()),
2897 name: "voter-2".to_string(),
2898 role: "engineer".to_string(),
2899 charter: "vote".to_string(),
2900 swarm_id: None,
2901 parent_id: None,
2902 policy: None,
2903 tags: Vec::new(),
2904 })
2905 .await
2906 .unwrap();
2907
2908 {
2910 let mut proposals = runtime.proposals.write().await;
2911 let mut votes = HashMap::new();
2912 votes.insert("voter-1".to_string(), ProposalVote::Approve);
2913 proposals.insert(
2914 "prop-1".to_string(),
2915 Proposal {
2916 id: "prop-1".to_string(),
2917 persona_id: "voter-1".to_string(),
2918 title: "test proposal".to_string(),
2919 rationale: "testing governance".to_string(),
2920 evidence_refs: Vec::new(),
2921 risk: ProposalRisk::Low,
2922 status: ProposalStatus::Created,
2923 created_at: Utc::now(),
2924 votes,
2925 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
2926 votes_requested: true,
2927 quorum_needed: 1,
2928 },
2929 );
2930 }
2931
2932 runtime.start(None).await.unwrap();
2934 tokio::time::sleep(Duration::from_millis(100)).await;
2935 runtime.stop(None).await.unwrap();
2936
2937 let proposals = runtime.get_proposals().await;
2938 let prop = proposals.get("prop-1").unwrap();
2939 assert!(
2941 prop.status == ProposalStatus::Verified || prop.status == ProposalStatus::Executed,
2942 "Expected Verified or Executed, got {:?}",
2943 prop.status
2944 );
2945 }
2946
2947 #[tokio::test]
2948 async fn veto_rejects_proposal() {
2949 let runtime = test_runtime();
2950 let gov = SwarmGovernance {
2951 quorum_fraction: 0.5,
2952 required_approvers_by_role: HashMap::new(),
2953 veto_roles: vec!["auditor".to_string()],
2954 vote_timeout_secs: 300,
2955 };
2956 *runtime.governance.write().await = gov;
2957
2958 runtime
2959 .create_persona(CreatePersonaRequest {
2960 persona_id: Some("eng".to_string()),
2961 name: "eng".to_string(),
2962 role: "engineer".to_string(),
2963 charter: "build".to_string(),
2964 swarm_id: None,
2965 parent_id: None,
2966 policy: None,
2967 tags: Vec::new(),
2968 })
2969 .await
2970 .unwrap();
2971 runtime
2972 .create_persona(CreatePersonaRequest {
2973 persona_id: Some("aud".to_string()),
2974 name: "aud".to_string(),
2975 role: "auditor".to_string(),
2976 charter: "audit".to_string(),
2977 swarm_id: None,
2978 parent_id: None,
2979 policy: None,
2980 tags: Vec::new(),
2981 })
2982 .await
2983 .unwrap();
2984
2985 {
2986 let mut proposals = runtime.proposals.write().await;
2987 let mut votes = HashMap::new();
2988 votes.insert("eng".to_string(), ProposalVote::Approve);
2989 votes.insert("aud".to_string(), ProposalVote::Veto);
2990 proposals.insert(
2991 "prop-veto".to_string(),
2992 Proposal {
2993 id: "prop-veto".to_string(),
2994 persona_id: "eng".to_string(),
2995 title: "vetoed proposal".to_string(),
2996 rationale: "testing veto".to_string(),
2997 evidence_refs: Vec::new(),
2998 risk: ProposalRisk::Low,
2999 status: ProposalStatus::Created,
3000 created_at: Utc::now(),
3001 votes,
3002 vote_deadline: Some(Utc::now() + ChronoDuration::seconds(300)),
3003 votes_requested: true,
3004 quorum_needed: 1,
3005 },
3006 );
3007 }
3008
3009 runtime.start(None).await.unwrap();
3010 tokio::time::sleep(Duration::from_millis(100)).await;
3011 runtime.stop(None).await.unwrap();
3012
3013 let proposals = runtime.get_proposals().await;
3014 let prop = proposals.get("prop-veto").unwrap();
3015 assert_eq!(prop.status, ProposalStatus::Rejected);
3016 }
3017
3018 #[test]
3019 fn global_workspace_default() {
3020 let ws = GlobalWorkspace::default();
3021 assert!(ws.top_beliefs.is_empty());
3022 assert!(ws.top_uncertainties.is_empty());
3023 assert!(ws.top_attention.is_empty());
3024 }
3025
3026 #[test]
3027 fn attention_item_creation() {
3028 let item = AttentionItem {
3029 id: "a1".to_string(),
3030 topic: "test topic".to_string(),
3031 topic_tags: vec!["reliability".to_string()],
3032 priority: 0.8,
3033 source_type: AttentionSource::ContestedBelief,
3034 source_id: "b1".to_string(),
3035 assigned_persona: None,
3036 created_at: Utc::now(),
3037 resolved_at: None,
3038 };
3039 assert!(item.resolved_at.is_none());
3040 assert_eq!(item.priority, 0.8);
3041 }
3042}