mod thinker;

#[cfg(feature = "functiongemma")]
pub mod tool_router;

pub use thinker::{
    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
};

use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::{Mutex, RwLock, broadcast};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use uuid::Uuid;

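/// Lifecycle status of a persona managed by the cognition runtime.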
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    Active,
    Idle,
    Reaped,
    Error,
}

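/// Spawning and resource limits applied to a persona; children inherit the
/// parent policy unless an explicit policy is supplied.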
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    pub max_spawn_depth: u32,
    pub max_branching_factor: u32,
    pub token_credits_per_minute: u32,
    pub cpu_credits_per_minute: u32,
    pub idle_ttl_secs: u64,
    pub share_memory: bool,
}

impl Default for PersonaPolicy {
    fn default() -> Self {
        Self {
            max_spawn_depth: 4,
            max_branching_factor: 4,
            token_credits_per_minute: 20_000,
            cpu_credits_per_minute: 10_000,
            idle_ttl_secs: 3_600,
            share_memory: false,
        }
    }
}

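/// Immutable identity and lineage metadata assigned to a persona at creation.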
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    pub id: String,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub parent_id: Option<String>,
    pub depth: u32,
    pub created_at: DateTime<Utc>,
}

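/// Mutable runtime state tracked for each persona, bundling its identity and policy.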
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    pub identity: PersonaIdentity,
    pub policy: PersonaPolicy,
    pub status: PersonaStatus,
    pub thought_count: u64,
    pub last_tick_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
}

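/// Risk classification attached to a proposal.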
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    Low,
    Medium,
    High,
    Critical,
}

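/// Verification and execution state of a proposal.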
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    Created,
    Verified,
    Rejected,
    Executed,
}

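/// An actionable suggestion raised by a persona during its Reflect phase.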
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    pub id: String,
    pub persona_id: String,
    pub title: String,
    pub rationale: String,
    pub evidence_refs: Vec<String>,
    pub risk: ProposalRisk,
    pub status: ProposalStatus,
    pub created_at: DateTime<Utc>,
}

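/// Kinds of events published on the cognition event stream.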
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    ThoughtGenerated,
    HypothesisRaised,
    CheckRequested,
    CheckResult,
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    SnapshotCompressed,
}

impl ThoughtEventType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::ThoughtGenerated => "thought_generated",
            Self::HypothesisRaised => "hypothesis_raised",
            Self::CheckRequested => "check_requested",
            Self::CheckResult => "check_result",
            Self::ProposalCreated => "proposal_created",
            Self::ProposalVerified => "proposal_verified",
            Self::ProposalRejected => "proposal_rejected",
            Self::ActionExecuted => "action_executed",
            Self::PersonaSpawned => "persona_spawned",
            Self::PersonaReaped => "persona_reaped",
            Self::SnapshotCompressed => "snapshot_compressed",
        }
    }
}

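/// A single event on the cognition event stream, with a JSON payload.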
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    pub id: String,
    pub event_type: ThoughtEventType,
    pub persona_id: Option<String>,
    pub swarm_id: Option<String>,
    pub timestamp: DateTime<Utc>,
    pub payload: serde_json::Value,
}

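/// Compressed memory summary produced when a persona runs its Compress phase.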
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    pub id: String,
    pub generated_at: DateTime<Utc>,
    pub swarm_id: Option<String>,
    pub persona_scope: Vec<String>,
    pub summary: String,
    pub hot_event_count: usize,
    pub warm_fact_count: usize,
    pub cold_snapshot_count: usize,
    pub metadata: HashMap<String, serde_json::Value>,
}

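/// Request to create a persona, optionally as a child of an existing persona.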
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub parent_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    pub cascade: Option<bool>,
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    pub loop_interval_ms: Option<u64>,
    pub seed_persona: Option<CreatePersonaRequest>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    pub enabled: bool,
    pub running: bool,
    pub loop_interval_ms: u64,
    pub started_at: Option<DateTime<Utc>>,
    pub last_tick_at: Option<DateTime<Utc>>,
    pub persona_count: usize,
    pub active_persona_count: usize,
    pub events_buffered: usize,
    pub snapshots_buffered: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    pub reaped_ids: Vec<String>,
    pub count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    pub persona_id: String,
    pub parent_id: Option<String>,
    pub children: Vec<String>,
    pub depth: u32,
    pub status: PersonaStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    pub nodes: Vec<LineageNode>,
    pub roots: Vec<String>,
    pub total_edges: usize,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}

impl ThoughtPhase {
    fn from_thought_count(thought_count: u64) -> Self {
        match thought_count % 4 {
            1 => Self::Observe,
            2 => Self::Reflect,
            3 => Self::Test,
            _ => Self::Compress,
        }
    }

    fn as_str(&self) -> &'static str {
        match self {
            Self::Observe => "observe",
            Self::Reflect => "reflect",
            Self::Test => "test",
            Self::Compress => "compress",
        }
    }

    fn event_type(&self) -> ThoughtEventType {
        match self {
            Self::Observe => ThoughtEventType::ThoughtGenerated,
            Self::Reflect => ThoughtEventType::HypothesisRaised,
            Self::Test => ThoughtEventType::CheckRequested,
            Self::Compress => ThoughtEventType::SnapshotCompressed,
        }
    }
}

#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    persona_id: String,
    persona_name: String,
    role: String,
    charter: String,
    swarm_id: Option<String>,
    thought_count: u64,
    phase: ThoughtPhase,
}

#[derive(Debug, Clone)]
struct ThoughtResult {
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    total_tokens: Option<u32>,
    latency_ms: u128,
    error: Option<String>,
}

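/// Construction options for [`CognitionRuntime`].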
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    pub enabled: bool,
    pub loop_interval_ms: u64,
    pub max_events: usize,
    pub max_snapshots: usize,
    pub default_policy: PersonaPolicy,
}

impl Default for CognitionRuntimeOptions {
    fn default() -> Self {
        Self {
            enabled: false,
            loop_interval_ms: 2_000,
            max_events: 2_000,
            max_snapshots: 128,
            default_policy: PersonaPolicy::default(),
        }
    }
}

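/// In-memory runtime that drives "perpetual cognition": a background loop in
/// which each active persona cycles through observe, reflect, test, and
/// compress phases, emitting [`ThoughtEvent`]s, [`Proposal`]s, and
/// [`MemorySnapshot`]s along the way.
///
/// A minimal usage sketch (module path and surrounding async/error context
/// assumed; adjust to your crate layout):
///
/// ```ignore
/// let runtime = CognitionRuntime::new_with_options(CognitionRuntimeOptions {
///     enabled: true,
///     ..Default::default()
/// });
/// runtime.start(None).await?;           // seeds a default persona and spawns the loop
/// let status = runtime.status().await;  // inspect persona/event counters
/// runtime.stop(Some("done".into())).await?;
/// ```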
#[derive(Debug)]
pub struct CognitionRuntime {
    enabled: bool,
    max_events: usize,
    max_snapshots: usize,
    default_policy: PersonaPolicy,
    running: Arc<AtomicBool>,
    loop_interval_ms: Arc<RwLock<u64>>,
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    event_tx: broadcast::Sender<ThoughtEvent>,
    thinker: Option<Arc<ThinkerClient>>,
}

impl CognitionRuntime {
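    /// Builds a runtime from `CODETETHER_COGNITION_*` environment variables,
    /// including the optional thinker backend configuration.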
    pub fn new_from_env() -> Self {
        let mut options = CognitionRuntimeOptions::default();
        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);

        options.default_policy = PersonaPolicy {
            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
            token_credits_per_minute: env_u32(
                "CODETETHER_COGNITION_TOKEN_CREDITS_PER_MINUTE",
                20_000,
            ),
            cpu_credits_per_minute: env_u32("CODETETHER_COGNITION_CPU_CREDITS_PER_MINUTE", 10_000),
            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
        };

        let thinker_config = ThinkerConfig {
            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
            backend: thinker::ThinkerBackend::from_env(
                &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
                    .unwrap_or_else(|_| "openai_compat".to_string()),
            ),
            endpoint: normalize_thinker_endpoint(
                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
            ),
            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
                .ok()
                .and_then(|v| v.parse::<f32>().ok()),
            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
            timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
            candle_tokenizer_path: std::env::var(
                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
            )
            .ok(),
            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
            candle_device: thinker::CandleDevicePreference::from_env(
                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
                    .unwrap_or_else(|_| "auto".to_string()),
            ),
            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
            candle_repeat_penalty: env_f32(
                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
                1.1,
            ),
            candle_repeat_last_n: env_usize(
                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
                64,
            ),
            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
        };

        Self::new_with_options_and_thinker(options, Some(thinker_config))
    }

    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
        Self::new_with_options_and_thinker(options, None)
    }

    fn new_with_options_and_thinker(
        options: CognitionRuntimeOptions,
        thinker_config: Option<ThinkerConfig>,
    ) -> Self {
        let (event_tx, _) = broadcast::channel(options.max_events.max(16));
        let thinker = thinker_config.and_then(|cfg| {
            if !cfg.enabled {
                return None;
            }
            match ThinkerClient::new(cfg) {
                Ok(client) => {
                    tracing::info!(
                        backend = ?client.config().backend,
                        endpoint = %client.config().endpoint,
                        model = %client.config().model,
                        "Cognition thinker initialized"
                    );
                    Some(Arc::new(client))
                }
                Err(error) => {
                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
                    None
                }
            }
        });

        Self {
            enabled: options.enabled,
            max_events: options.max_events.max(32),
            max_snapshots: options.max_snapshots.max(8),
            default_policy: options.default_policy,
            running: Arc::new(AtomicBool::new(false)),
            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
            started_at: Arc::new(RwLock::new(None)),
            last_tick_at: Arc::new(RwLock::new(None)),
            personas: Arc::new(RwLock::new(HashMap::new())),
            proposals: Arc::new(RwLock::new(HashMap::new())),
            events: Arc::new(RwLock::new(VecDeque::new())),
            snapshots: Arc::new(RwLock::new(VecDeque::new())),
            loop_handle: Arc::new(Mutex::new(None)),
            event_tx,
            thinker,
        }
    }

    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
        self.event_tx.subscribe()
    }

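    /// Starts the background cognition loop, seeding a default persona when none
    /// exist. Each tick advances every active persona one phase
    /// (observe, reflect, test, compress) and publishes the resulting events,
    /// proposals, and snapshots. Returns the current status if already running.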
    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
        if !self.enabled {
            return Err(anyhow!(
                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
            ));
        }

        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
        if let Some(request) = req {
            if let Some(interval) = request.loop_interval_ms {
                let mut lock = self.loop_interval_ms.write().await;
                *lock = interval.max(100);
            }
            requested_seed_persona = request.seed_persona;
        }

        let has_personas = !self.personas.read().await.is_empty();
        if !has_personas {
            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
            let _ = self.create_persona(seed).await?;
        }

        if self.running.load(Ordering::SeqCst) {
            return Ok(self.status().await);
        }

        self.running.store(true, Ordering::SeqCst);
        {
            let mut started = self.started_at.write().await;
            *started = Some(Utc::now());
        }

        let running = Arc::clone(&self.running);
        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
        let last_tick_at = Arc::clone(&self.last_tick_at);
        let personas = Arc::clone(&self.personas);
        let proposals = Arc::clone(&self.proposals);
        let events = Arc::clone(&self.events);
        let snapshots = Arc::clone(&self.snapshots);
        let max_events = self.max_events;
        let max_snapshots = self.max_snapshots;
        let event_tx = self.event_tx.clone();
        let thinker = self.thinker.clone();

        let handle = tokio::spawn(async move {
            let mut next_tick = Instant::now();
            while running.load(Ordering::SeqCst) {
                let now_instant = Instant::now();
                if now_instant < next_tick {
                    tokio::time::sleep_until(next_tick).await;
                }
                if !running.load(Ordering::SeqCst) {
                    break;
                }

                let now = Utc::now();
                {
                    let mut lock = last_tick_at.write().await;
                    *lock = Some(now);
                }

                let mut new_events = Vec::new();
                let mut new_snapshots = Vec::new();
                let mut new_proposals = Vec::new();

                let work_items: Vec<ThoughtWorkItem> = {
                    let mut map = personas.write().await;
                    let mut items = Vec::new();
                    for persona in map.values_mut() {
                        if persona.status != PersonaStatus::Active {
                            continue;
                        }

                        persona.thought_count = persona.thought_count.saturating_add(1);
                        persona.last_tick_at = Some(now);
                        persona.updated_at = now;
                        items.push(ThoughtWorkItem {
                            persona_id: persona.identity.id.clone(),
                            persona_name: persona.identity.name.clone(),
                            role: persona.identity.role.clone(),
                            charter: persona.identity.charter.clone(),
                            swarm_id: persona.identity.swarm_id.clone(),
                            thought_count: persona.thought_count,
                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
                        });
                    }
                    items
                };

                for work in work_items {
                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
                    let thought = generate_phase_thought(thinker.as_deref(), &work, &context).await;

                    let event_timestamp = Utc::now();
                    new_events.push(ThoughtEvent {
                        id: Uuid::new_v4().to_string(),
                        event_type: work.phase.event_type(),
                        persona_id: Some(work.persona_id.clone()),
                        swarm_id: work.swarm_id.clone(),
                        timestamp: event_timestamp,
                        payload: json!({
                            "phase": work.phase.as_str(),
                            "thought_count": work.thought_count,
                            "persona": {
                                "id": work.persona_id.clone(),
                                "name": work.persona_name.clone(),
                                "role": work.role.clone(),
                            },
                            "context_event_count": context.len(),
                            "thinking": thought.thinking.clone(),
                            "source": thought.source,
                            "model": thought.model.clone(),
                            "finish_reason": thought.finish_reason.clone(),
                            "usage": {
                                "prompt_tokens": thought.prompt_tokens,
                                "completion_tokens": thought.completion_tokens,
                                "total_tokens": thought.total_tokens,
                            },
                            "latency_ms": thought.latency_ms,
                            "error": thought.error.clone(),
                        }),
                    });

                    if work.phase == ThoughtPhase::Test {
                        new_events.push(ThoughtEvent {
                            id: Uuid::new_v4().to_string(),
                            event_type: ThoughtEventType::CheckResult,
                            persona_id: Some(work.persona_id.clone()),
                            swarm_id: work.swarm_id.clone(),
                            timestamp: Utc::now(),
                            payload: json!({
                                "phase": work.phase.as_str(),
                                "thought_count": work.thought_count,
                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
                                "source": thought.source,
                                "model": thought.model,
                            }),
                        });
                    }

                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
                        let proposal = Proposal {
                            id: Uuid::new_v4().to_string(),
                            persona_id: work.persona_id.clone(),
                            title: proposal_title_from_thought(
                                &thought.thinking,
                                work.thought_count,
                            ),
                            rationale: trim_for_storage(&thought.thinking, 900),
                            evidence_refs: vec!["internal.thought_stream".to_string()],
                            risk: ProposalRisk::Low,
                            status: ProposalStatus::Created,
                            created_at: Utc::now(),
                        };

                        new_events.push(ThoughtEvent {
                            id: Uuid::new_v4().to_string(),
                            event_type: ThoughtEventType::ProposalCreated,
                            persona_id: Some(work.persona_id.clone()),
                            swarm_id: work.swarm_id.clone(),
                            timestamp: Utc::now(),
                            payload: json!({
                                "proposal_id": proposal.id,
                                "title": proposal.title,
                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
                                "source": thought.source,
                                "model": thought.model,
                            }),
                        });

                        new_proposals.push(proposal);
                    }

                    if work.phase == ThoughtPhase::Compress {
                        new_snapshots.push(MemorySnapshot {
                            id: Uuid::new_v4().to_string(),
                            generated_at: Utc::now(),
                            swarm_id: work.swarm_id.clone(),
                            persona_scope: vec![work.persona_id.clone()],
                            summary: trim_for_storage(&thought.thinking, 1_500),
                            hot_event_count: context.len(),
                            warm_fact_count: estimate_fact_count(&thought.thinking),
                            cold_snapshot_count: 1,
                            metadata: HashMap::from([
                                (
                                    "phase".to_string(),
                                    serde_json::Value::String(work.phase.as_str().to_string()),
                                ),
                                (
                                    "role".to_string(),
                                    serde_json::Value::String(work.role.clone()),
                                ),
                                (
                                    "source".to_string(),
                                    serde_json::Value::String(thought.source.to_string()),
                                ),
                                (
                                    "model".to_string(),
                                    serde_json::Value::String(
                                        thought
                                            .model
                                            .clone()
                                            .unwrap_or_else(|| "fallback".to_string()),
                                    ),
                                ),
                                (
                                    "completion_tokens".to_string(),
                                    serde_json::Value::Number(serde_json::Number::from(
                                        thought.completion_tokens.unwrap_or(0) as u64,
                                    )),
                                ),
                            ]),
                        });
                    }
                }

                if !new_proposals.is_empty() {
                    let mut proposal_store = proposals.write().await;
                    for proposal in new_proposals {
                        proposal_store.insert(proposal.id.clone(), proposal);
                    }
                }

                for event in new_events {
                    push_event_internal(&events, max_events, &event_tx, event).await;
                }
                for snapshot in new_snapshots {
                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
                }

                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
                next_tick += interval;
                let tick_completed = Instant::now();
                if tick_completed > next_tick {
                    next_tick = tick_completed;
                }
            }
        });

        {
            let mut lock = self.loop_handle.lock().await;
            *lock = Some(handle);
        }

        Ok(self.status().await)
    }

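    /// Stops the cognition loop and aborts the background task. When a reason is
    /// given, a final `CheckResult` event recording the shutdown is emitted.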
    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
        self.running.store(false, Ordering::SeqCst);

        if let Some(handle) = self.loop_handle.lock().await.take() {
            handle.abort();
            let _ = handle.await;
        }

        if let Some(reason_value) = reason {
            let event = ThoughtEvent {
                id: Uuid::new_v4().to_string(),
                event_type: ThoughtEventType::CheckResult,
                persona_id: None,
                swarm_id: None,
                timestamp: Utc::now(),
                payload: json!({ "stopped": true, "reason": reason_value }),
            };
            self.push_event(event).await;
        }

        Ok(self.status().await)
    }

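    /// Creates a persona, enforcing branching-factor and spawn-depth limits when a
    /// parent is specified, and emits a `PersonaSpawned` event on success.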
    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
        let now = Utc::now();
        let mut personas = self.personas.write().await;

        let mut parent_swarm_id = None;
        let mut computed_depth = 0_u32;
        let mut inherited_policy = None;

        if let Some(parent_id) = req.parent_id.clone() {
            let parent = personas
                .get(&parent_id)
                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;

            if parent.status == PersonaStatus::Reaped {
                return Err(anyhow!("Parent persona {} is reaped", parent_id));
            }

            parent_swarm_id = parent.identity.swarm_id.clone();
            computed_depth = parent.identity.depth.saturating_add(1);
            inherited_policy = Some(parent.policy.clone());
            let branch_limit = parent.policy.max_branching_factor;

            let child_count = personas
                .values()
                .filter(|p| {
                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
                        && p.status != PersonaStatus::Reaped
                })
                .count();

            if child_count as u32 >= branch_limit {
                return Err(anyhow!(
                    "Parent {} reached branching limit {}",
                    parent_id,
                    branch_limit
                ));
            }
        }

        let policy = req
            .policy
            .clone()
            .or(inherited_policy.clone())
            .unwrap_or_else(|| self.default_policy.clone());

        let effective_depth_limit = inherited_policy
            .as_ref()
            .map(|p| p.max_spawn_depth)
            .unwrap_or(policy.max_spawn_depth);

        if computed_depth > effective_depth_limit {
            return Err(anyhow!(
                "Spawn depth {} exceeds limit {}",
                computed_depth,
                effective_depth_limit
            ));
        }

        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        if personas.contains_key(&persona_id) {
            return Err(anyhow!("Persona id already exists: {}", persona_id));
        }

        let identity = PersonaIdentity {
            id: persona_id.clone(),
            name: req.name,
            role: req.role,
            charter: req.charter,
            swarm_id: req.swarm_id.or(parent_swarm_id),
            parent_id: req.parent_id,
            depth: computed_depth,
            created_at: now,
        };

        let persona = PersonaRuntimeState {
            identity,
            policy,
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
        };

        personas.insert(persona_id, persona.clone());
        drop(personas);

        self.push_event(ThoughtEvent {
            id: Uuid::new_v4().to_string(),
            event_type: ThoughtEventType::PersonaSpawned,
            persona_id: Some(persona.identity.id.clone()),
            swarm_id: persona.identity.swarm_id.clone(),
            timestamp: now,
            payload: json!({
                "name": persona.identity.name,
                "role": persona.identity.role,
                "depth": persona.identity.depth,
            }),
        })
        .await;

        Ok(persona)
    }

    pub async fn spawn_child(
        &self,
        parent_id: &str,
        req: SpawnPersonaRequest,
    ) -> Result<PersonaRuntimeState> {
        let request = CreatePersonaRequest {
            persona_id: req.persona_id,
            name: req.name,
            role: req.role,
            charter: req.charter,
            swarm_id: req.swarm_id,
            parent_id: Some(parent_id.to_string()),
            policy: req.policy,
        };
        self.create_persona(request).await
    }

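    /// Marks a persona (and, with `cascade`, all of its descendants) as reaped and
    /// emits a `PersonaReaped` event for each affected persona.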
    pub async fn reap_persona(
        &self,
        persona_id: &str,
        req: ReapPersonaRequest,
    ) -> Result<ReapPersonaResponse> {
        let cascade = req.cascade.unwrap_or(false);
        let now = Utc::now();

        let mut personas = self.personas.write().await;
        if !personas.contains_key(persona_id) {
            return Err(anyhow!("Persona not found: {}", persona_id));
        }

        let mut reaped_ids = vec![persona_id.to_string()];
        if cascade {
            let mut idx = 0usize;
            while idx < reaped_ids.len() {
                let current = reaped_ids[idx].clone();
                let children: Vec<String> = personas
                    .values()
                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
                    .map(|p| p.identity.id.clone())
                    .collect();
                for child in children {
                    if !reaped_ids.iter().any(|existing| existing == &child) {
                        reaped_ids.push(child);
                    }
                }
                idx += 1;
            }
        }

        for id in &reaped_ids {
            if let Some(persona) = personas.get_mut(id) {
                persona.status = PersonaStatus::Reaped;
                persona.updated_at = now;
            }
        }
        drop(personas);

        for id in &reaped_ids {
            self.push_event(ThoughtEvent {
                id: Uuid::new_v4().to_string(),
                event_type: ThoughtEventType::PersonaReaped,
                persona_id: Some(id.clone()),
                swarm_id: None,
                timestamp: now,
                payload: json!({
                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
                    "cascade": cascade,
                }),
            })
            .await;
        }

        Ok(ReapPersonaResponse {
            count: reaped_ids.len(),
            reaped_ids,
        })
    }

    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
        self.snapshots.read().await.back().cloned()
    }

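    /// Builds a parent/child lineage graph over all known personas.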
    pub async fn lineage_graph(&self) -> LineageGraph {
        let personas = self.personas.read().await;
        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
        let mut roots = Vec::new();
        let mut total_edges = 0usize;

        for persona in personas.values() {
            if let Some(parent_id) = persona.identity.parent_id.clone() {
                children_by_parent
                    .entry(parent_id)
                    .or_default()
                    .push(persona.identity.id.clone());
                total_edges = total_edges.saturating_add(1);
            } else {
                roots.push(persona.identity.id.clone());
            }
        }

        let mut nodes: Vec<LineageNode> = personas
            .values()
            .map(|persona| {
                let mut children = children_by_parent
                    .get(&persona.identity.id)
                    .cloned()
                    .unwrap_or_default();
                children.sort();

                LineageNode {
                    persona_id: persona.identity.id.clone(),
                    parent_id: persona.identity.parent_id.clone(),
                    children,
                    depth: persona.identity.depth,
                    status: persona.status,
                }
            })
            .collect();

        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
        roots.sort();

        LineageGraph {
            nodes,
            roots,
            total_edges,
        }
    }

    pub async fn status(&self) -> CognitionStatus {
        let personas = self.personas.read().await;
        let events = self.events.read().await;
        let snapshots = self.snapshots.read().await;
        let started_at = *self.started_at.read().await;
        let last_tick_at = *self.last_tick_at.read().await;
        let loop_interval_ms = *self.loop_interval_ms.read().await;

        let active_persona_count = personas
            .values()
            .filter(|p| p.status == PersonaStatus::Active)
            .count();

        CognitionStatus {
            enabled: self.enabled,
            running: self.running.load(Ordering::SeqCst),
            loop_interval_ms,
            started_at,
            last_tick_at,
            persona_count: personas.len(),
            active_persona_count,
            events_buffered: events.len(),
            snapshots_buffered: snapshots.len(),
        }
    }

    async fn push_event(&self, event: ThoughtEvent) {
        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
    }
}

async fn push_event_internal(
    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
    max_events: usize,
    event_tx: &broadcast::Sender<ThoughtEvent>,
    event: ThoughtEvent,
) {
    {
        let mut lock = events.write().await;
        lock.push_back(event.clone());
        while lock.len() > max_events {
            lock.pop_front();
        }
    }
    let _ = event_tx.send(event);
}

async fn push_snapshot_internal(
    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
    max_snapshots: usize,
    snapshot: MemorySnapshot,
) {
    let mut lock = snapshots.write().await;
    lock.push_back(snapshot);
    while lock.len() > max_snapshots {
        lock.pop_front();
    }
}

async fn recent_persona_context(
    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
    persona_id: &str,
    limit: usize,
) -> Vec<ThoughtEvent> {
    let lock = events.read().await;
    let mut selected: Vec<ThoughtEvent> = lock
        .iter()
        .rev()
        .filter(|event| {
            event.persona_id.as_deref() == Some(persona_id)
                || (event.persona_id.is_none()
                    && matches!(
                        event.event_type,
                        ThoughtEventType::CheckResult
                            | ThoughtEventType::ProposalCreated
                            | ThoughtEventType::SnapshotCompressed
                    ))
        })
        .take(limit)
        .cloned()
        .collect();
    selected.reverse();
    selected
}

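/// Produces one phase-specific thought, preferring the configured thinker model
/// and falling back to deterministic template text when the model is missing,
/// errors, or returns empty output.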
async fn generate_phase_thought(
    thinker: Option<&ThinkerClient>,
    work: &ThoughtWorkItem,
    context: &[ThoughtEvent],
) -> ThoughtResult {
    let started_at = Instant::now();
    if let Some(client) = thinker {
        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
        match client.think(&system_prompt, &user_prompt).await {
            Ok(output) => {
                let thinking = normalize_thought_output(work, context, &output.text);
                if !thinking.is_empty() {
                    return ThoughtResult {
                        source: "model",
                        model: Some(output.model),
                        finish_reason: output.finish_reason,
                        thinking,
                        prompt_tokens: output.prompt_tokens,
                        completion_tokens: output.completion_tokens,
                        total_tokens: output.total_tokens,
                        latency_ms: started_at.elapsed().as_millis(),
                        error: None,
                    };
                }
            }
            Err(error) => {
                return ThoughtResult {
                    source: "fallback",
                    model: None,
                    finish_reason: None,
                    thinking: fallback_phase_text(work, context),
                    prompt_tokens: None,
                    completion_tokens: None,
                    total_tokens: None,
                    latency_ms: started_at.elapsed().as_millis(),
                    error: Some(error.to_string()),
                };
            }
        }
    }

    ThoughtResult {
        source: "fallback",
        model: None,
        finish_reason: None,
        thinking: fallback_phase_text(work, context),
        prompt_tokens: None,
        completion_tokens: None,
        total_tokens: None,
        latency_ms: started_at.elapsed().as_millis(),
        error: None,
    }
}

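/// Builds the (system, user) prompt pair for a persona's current phase,
/// embedding recent context events and a strict line-label output format.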
fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
Respond with concise plain text only. Do not include markdown, XML, or code fences. \
Write as an operational process update, not meta narration. \
Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
Output concrete findings, checks, risks, and next actions."
        .to_string();

    let context_lines = if context.is_empty() {
        "none".to_string()
    } else {
        context
            .iter()
            .map(format_context_event)
            .collect::<Vec<_>>()
            .join("\n")
    };

    let phase_instruction = match work.phase {
        ThoughtPhase::Observe => {
            "Process format (exact line labels): \
Phase: Observe | Goal: detect current customer/business risk | \
Signals: <1-3 concrete signals> | \
Uncertainty: <one unknown that blocks confidence> | \
Next_Action: <one immediate operational action>."
        }
        ThoughtPhase::Reflect => {
            "Process format (exact line labels): \
Phase: Reflect | Hypothesis: <single testable hypothesis> | \
Rationale: <why this is likely> | \
Business_Risk: <customer/revenue/SLA impact> | \
Validation_Next_Action: <one action to confirm or falsify>."
        }
        ThoughtPhase::Test => {
            "Process format (exact line labels): \
Phase: Test | Check: <single concrete check> | \
Procedure: <short executable procedure> | \
Expected_Result: <pass/fail expectation> | \
Evidence_Quality: <low|medium|high with reason> | \
Escalation_Trigger: <when to escalate immediately>."
        }
        ThoughtPhase::Compress => {
            "Process format (exact line labels): \
Phase: Compress | State_Summary: <current state in one line> | \
Retained_Facts: <3 short facts separated by '; '> | \
Open_Risks: <up to 2 unresolved risks separated by '; '> | \
Next_Process_Step: <next operational step>."
        }
    };

    let user_prompt = format!(
        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
        phase = work.phase.as_str(),
        persona_id = work.persona_id,
        persona_name = work.persona_name,
        role = work.role,
        charter = work.charter,
        thought_count = work.thought_count,
        context = context_lines,
        instruction = phase_instruction
    );

    (system_prompt, user_prompt)
}

fn format_context_event(event: &ThoughtEvent) -> String {
    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
    format!(
        "{} {} {}",
        event.event_type.as_str(),
        event.timestamp.to_rfc3339(),
        trim_for_storage(&payload, 220)
    )
}

fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
    let charter = trim_for_storage(&work.charter, 180);
    let context_summary = fallback_context_summary(context);
    let thought = match work.phase {
        ThoughtPhase::Observe => format!(
            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
            work.role, charter, context_summary
        ),
        ThoughtPhase::Reflect => format!(
            "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
        ),
        ThoughtPhase::Test => format!(
            "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
        ),
        ThoughtPhase::Compress => format!(
            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
            work.role, charter, context_summary
        ),
    };
    trim_for_storage(&thought, 1_200)
}

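/// Cleans raw model output: prefers a well-formed "Phase:" line and falls back
/// to template text when the output is empty or reads like meta narration.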
fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
    let trimmed = trim_for_storage(raw, 2_000);
    if trimmed.trim().is_empty() {
        return fallback_phase_text(work, context);
    }

    if let Some(idx) = find_process_label_start(&trimmed) {
        let candidate = trimmed[idx..].trim();
        if let Some(first_line) = candidate
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())
        {
            if first_line.starts_with("Phase:") && !first_line.contains('<') {
                return first_line.to_string();
            }
        }
        if !candidate.is_empty() && !candidate.contains('<') && !candidate.contains('\n') {
            return candidate.to_string();
        }
    }

    let lower = trimmed.to_ascii_lowercase();
    let looks_meta = lower.starts_with("we need")
        || lower.starts_with("i need")
        || lower.contains("we need to")
        || lower.contains("i need to")
        || lower.contains("must output")
        || lower.contains("let's ")
        || lower.contains("we have to");

    if looks_meta {
        return fallback_phase_text(work, context);
    }

    trimmed
}

fn find_process_label_start(text: &str) -> Option<usize> {
    [
        "Phase: Observe",
        "Phase: Reflect",
        "Phase: Test",
        "Phase: Compress",
        "Phase:",
    ]
    .iter()
    .filter_map(|label| text.find(label))
    .min()
}

fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
    if context.is_empty() {
        return "No prior events recorded yet.".to_string();
    }

    let mut latest_error: Option<String> = None;
    let mut latest_proposal: Option<String> = None;
    let mut latest_check: Option<String> = None;

    for event in context.iter().rev() {
        if latest_error.is_none()
            && let Some(error) = event
                .payload
                .get("error")
                .and_then(serde_json::Value::as_str)
            && !error.trim().is_empty()
        {
            latest_error = Some(trim_for_storage(error, 140));
        }

        if latest_proposal.is_none()
            && event.event_type == ThoughtEventType::ProposalCreated
            && let Some(title) = event
                .payload
                .get("title")
                .and_then(serde_json::Value::as_str)
            && !title.trim().is_empty()
        {
            latest_proposal = Some(trim_for_storage(title, 120));
        }

        if latest_check.is_none()
            && event.event_type == ThoughtEventType::CheckResult
            && let Some(result) = event
                .payload
                .get("result_excerpt")
                .and_then(serde_json::Value::as_str)
            && !result.trim().is_empty()
        {
            latest_check = Some(trim_for_storage(result, 140));
        }

        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
            break;
        }
    }

    let mut lines = vec![format!(
        "{} recent cognition events are available.",
        context.len()
    )];
    if let Some(error) = latest_error {
        lines.push(format!("Latest error signal: {}.", error));
    }
    if let Some(proposal) = latest_proposal {
        lines.push(format!("Recent proposal: {}.", proposal));
    }
    if let Some(check) = latest_check {
        lines.push(format!("Recent check: {}.", check));
    }
    lines.join(" ")
}

fn trim_for_storage(input: &str, max_chars: usize) -> String {
    if input.chars().count() <= max_chars {
        return input.trim().to_string();
    }
    let mut trimmed = String::with_capacity(max_chars + 8);
    for ch in input.chars().take(max_chars) {
        trimmed.push(ch);
    }
    trimmed.push_str("...");
    trimmed.trim().to_string()
}

fn estimate_fact_count(text: &str) -> usize {
    let sentence_count =
        text.matches('.').count() + text.matches('!').count() + text.matches('?').count();
    sentence_count.clamp(1, 12)
}

fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
    let first_line = thought
        .lines()
        .find(|line| !line.trim().is_empty())
        .unwrap_or("proposal");
    let compact = first_line
        .replace(['\t', '\r', '\n'], " ")
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");
    let trimmed = trim_for_storage(&compact, 72);
    if trimmed.is_empty() {
        format!("proposal-{}", thought_count)
    } else {
        trimmed
    }
}

fn default_seed_persona() -> CreatePersonaRequest {
    CreatePersonaRequest {
        persona_id: Some("root-thinker".to_string()),
        name: "root-thinker".to_string(),
        role: "orchestrator".to_string(),
        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
            .to_string(),
        swarm_id: Some("swarm-core".to_string()),
        parent_id: None,
        policy: None,
    }
}

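/// Normalizes a base URL into a full OpenAI-compatible `/chat/completions` endpoint.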
fn normalize_thinker_endpoint(base_url: &str) -> String {
    let trimmed = base_url.trim().trim_end_matches('/');
    if trimmed.ends_with("/chat/completions") {
        return trimmed.to_string();
    }
    if trimmed.is_empty() {
        return "http://127.0.0.1:11434/v1/chat/completions".to_string();
    }
    format!("{}/chat/completions", trimmed)
}

fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        .and_then(|v| match v.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            _ => None,
        })
        .unwrap_or(default)
}

fn env_f32(name: &str, default: f32) -> f32 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<f32>().ok())
        .unwrap_or(default)
}

fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(default)
}

fn env_u32(name: &str, default: u32) -> u32 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<u32>().ok())
        .unwrap_or(default)
}

fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(default)
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: PersonaPolicy {
                max_spawn_depth: 2,
                max_branching_factor: 2,
                token_credits_per_minute: 1_000,
                cpu_credits_per_minute: 1_000,
                idle_ttl_secs: 300,
                share_memory: false,
            },
        })
    }

    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(CreatePersonaRequest {
                persona_id: Some("root".to_string()),
                name: "root".to_string(),
                role: "orchestrator".to_string(),
                charter: "coordinate".to_string(),
                swarm_id: Some("swarm-a".to_string()),
                parent_id: None,
                policy: None,
            })
            .await
            .expect("root should be created");

        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("child-1".to_string()),
                    name: "child-1".to_string(),
                    role: "analyst".to_string(),
                    charter: "analyze".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("child should spawn");

        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(CreatePersonaRequest {
                persona_id: Some("root".to_string()),
                name: "root".to_string(),
                role: "orchestrator".to_string(),
                charter: "coordinate".to_string(),
                swarm_id: Some("swarm-a".to_string()),
                parent_id: None,
                policy: None,
            })
            .await
            .expect("root should be created");

        runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c1".to_string()),
                    name: "c1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("first child should spawn");

        runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c2".to_string()),
                    name: "c2".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("second child should spawn");

        let third_child = runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c3".to_string()),
                    name: "c3".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await;
        assert!(third_child.is_err());

        runtime
            .spawn_child(
                "c1",
                SpawnPersonaRequest {
                    persona_id: Some("c1-1".to_string()),
                    name: "c1-1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("depth 2 should be allowed");

        let depth_violation = runtime
            .spawn_child(
                "c1-1",
                SpawnPersonaRequest {
                    persona_id: Some("c1-1-1".to_string()),
                    name: "c1-1-1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await;
        assert!(depth_violation.is_err());
    }

    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        runtime
            .start(Some(StartCognitionRequest {
                loop_interval_ms: Some(10),
                seed_persona: Some(CreatePersonaRequest {
                    persona_id: Some("seed".to_string()),
                    name: "seed".to_string(),
                    role: "watcher".to_string(),
                    charter: "observe".to_string(),
                    swarm_id: Some("swarm-seed".to_string()),
                    parent_id: None,
                    policy: None,
                }),
            }))
            .await
            .expect("runtime should start");

        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }
}
1639}