1mod thinker;
9
10#[cfg(feature = "functiongemma")]
11pub mod tool_router;
12
13pub use thinker::{
14 CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
15};
16
17use anyhow::{Result, anyhow};
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::Duration;
25use tokio::sync::{Mutex, RwLock, broadcast};
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use uuid::Uuid;
29
/// Lifecycle state of a persona within the cognition runtime.
///
/// Only `Active` personas are processed by the background tick loop;
/// `Reaped` personas are also excluded from branching-factor counts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// Ticking: generates thoughts each loop iteration.
    Active,
    /// Not ticking (only `Active` personas are processed).
    Idle,
    /// Permanently retired via `reap_persona`; never ticks again.
    Reaped,
    /// Faulted; not processed by the tick loop.
    Error,
}
39
/// Resource and spawning limits attached to a persona. A child persona
/// inherits its parent's policy unless the spawn request supplies one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    /// Maximum lineage depth at which a descendant may be created
    /// (enforced in `create_persona`).
    pub max_spawn_depth: u32,
    /// Maximum number of live (non-reaped) children per parent
    /// (enforced in `create_persona`).
    pub max_branching_factor: u32,
    /// Per-minute token budget. NOTE(review): not enforced anywhere in
    /// this module — presumably consumed by a scheduler elsewhere; confirm.
    pub token_credits_per_minute: u32,
    /// Per-minute CPU budget. NOTE(review): not enforced in this module.
    pub cpu_credits_per_minute: u32,
    /// Idle time-to-live in seconds. NOTE(review): no reaper in this
    /// module reads it — confirm where idle reaping happens.
    pub idle_ttl_secs: u64,
    /// Whether the persona shares memory with its swarm. NOTE(review):
    /// not read in this module.
    pub share_memory: bool,
}
50
impl Default for PersonaPolicy {
    /// Defaults mirror the env-var fallbacks used by
    /// `CognitionRuntime::new_from_env`.
    fn default() -> Self {
        Self {
            max_spawn_depth: 4,
            max_branching_factor: 4,
            token_credits_per_minute: 20_000,
            cpu_credits_per_minute: 10_000,
            idle_ttl_secs: 3_600,
            share_memory: false,
        }
    }
}
63
/// Immutable identity of a persona: who it is, where it sits in the
/// lineage tree, and when it was created.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Unique id (caller-supplied or a generated UUID v4).
    pub id: String,
    /// Human-readable display name.
    pub name: String,
    /// Role label included in thought-event payloads and prompts.
    pub role: String,
    /// Mission statement fed into the thinker prompt.
    pub charter: String,
    /// Owning swarm, inherited from the parent when not given explicitly.
    pub swarm_id: Option<String>,
    /// Parent persona id; `None` for lineage roots.
    pub parent_id: Option<String>,
    /// Depth in the lineage tree (0 for roots, parent depth + 1 otherwise).
    pub depth: u32,
    pub created_at: DateTime<Utc>,
}
76
/// Mutable runtime record for a persona: identity + policy plus the
/// counters the tick loop updates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    pub identity: PersonaIdentity,
    pub policy: PersonaPolicy,
    pub status: PersonaStatus,
    /// Monotonic count of thoughts produced; drives `ThoughtPhase` rotation.
    pub thought_count: u64,
    /// Wall-clock time of the last tick that processed this persona.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Last mutation time (tick, reap, etc.).
    pub updated_at: DateTime<Utc>,
}
87
/// Risk grade attached to a proposal (auto-generated proposals in this
/// module always use `Low`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    Low,
    Medium,
    High,
    Critical,
}
97
/// Review pipeline state of a proposal. New proposals start as `Created`;
/// the transitions to the other states happen outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    Created,
    Verified,
    Rejected,
    Executed,
}
107
/// An action proposal raised by a persona during a Reflect phase and
/// stored in the runtime's proposal map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    pub id: String,
    /// Persona that raised the proposal.
    pub persona_id: String,
    /// Short title derived from the originating thought.
    pub title: String,
    /// Supporting reasoning (truncated excerpt of the thought text).
    pub rationale: String,
    /// References to supporting evidence, e.g. "internal.thought_stream".
    pub evidence_refs: Vec<String>,
    pub risk: ProposalRisk,
    pub status: ProposalStatus,
    pub created_at: DateTime<Utc>,
}
120
/// Kinds of events published on the runtime's broadcast stream.
/// Serialized as snake_case; `as_str` below must stay in sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Observe-phase thought.
    ThoughtGenerated,
    /// Reflect-phase thought.
    HypothesisRaised,
    /// Test-phase thought (a check was requested).
    CheckRequested,
    /// Outcome of a check; also used for runtime-level notices (e.g. stop).
    CheckResult,
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    /// Compress-phase thought producing a memory snapshot.
    SnapshotCompressed,
}
137
impl ThoughtEventType {
    /// Stable snake_case wire name for the variant.
    ///
    /// Must match the `#[serde(rename_all = "snake_case")]` serialization
    /// of the enum — keep the two in sync when adding variants.
    fn as_str(&self) -> &'static str {
        match self {
            Self::ThoughtGenerated => "thought_generated",
            Self::HypothesisRaised => "hypothesis_raised",
            Self::CheckRequested => "check_requested",
            Self::CheckResult => "check_result",
            Self::ProposalCreated => "proposal_created",
            Self::ProposalVerified => "proposal_verified",
            Self::ProposalRejected => "proposal_rejected",
            Self::ActionExecuted => "action_executed",
            Self::PersonaSpawned => "persona_spawned",
            Self::PersonaReaped => "persona_reaped",
            Self::SnapshotCompressed => "snapshot_compressed",
        }
    }
}
155
/// A single entry in the runtime's bounded event stream, broadcast to
/// subscribers and kept in an in-memory ring buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    pub id: String,
    pub event_type: ThoughtEventType,
    /// Originating persona; `None` for runtime-level events (e.g. stop notice).
    pub persona_id: Option<String>,
    pub swarm_id: Option<String>,
    pub timestamp: DateTime<Utc>,
    /// Free-form JSON payload; shape depends on `event_type`.
    pub payload: serde_json::Value,
}
166
/// Compressed memory produced during a persona's Compress phase and kept
/// in a bounded ring buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    pub id: String,
    pub generated_at: DateTime<Utc>,
    pub swarm_id: Option<String>,
    /// Persona ids covered by this snapshot (currently a single persona).
    pub persona_scope: Vec<String>,
    /// Truncated thought text summarizing the current state.
    pub summary: String,
    /// Number of recent events that were in context when compressing.
    pub hot_event_count: usize,
    /// Estimated count of retained facts in the summary.
    pub warm_fact_count: usize,
    pub cold_snapshot_count: usize,
    /// Extra annotations (phase, role, source model, token usage, ...).
    pub metadata: HashMap<String, serde_json::Value>,
}
180
/// Request to create a persona (root or child, via `parent_id`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    /// Explicit id; a UUID v4 is generated when absent.
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    /// Swarm override; inherited from the parent when absent.
    pub swarm_id: Option<String>,
    /// Parent persona; subject to depth and branching-factor limits.
    pub parent_id: Option<String>,
    /// Policy override; falls back to the parent's policy, then the
    /// runtime default.
    pub policy: Option<PersonaPolicy>,
}
192
/// Request to spawn a child persona; like `CreatePersonaRequest` but the
/// parent id is supplied separately (see `CognitionRuntime::spawn_child`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}
203
/// Request to reap a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    /// When `Some(true)`, also reap all transitive descendants.
    pub cascade: Option<bool>,
    /// Reason recorded in the emitted `PersonaReaped` events
    /// (defaults to "manual_reap").
    pub reason: Option<String>,
}
210
/// Options for starting the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// Override of the tick interval; clamped to a 100 ms minimum.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the registry is empty; a built-in default
    /// seed is used otherwise.
    pub seed_persona: Option<CreatePersonaRequest>,
}
217
/// Options for stopping the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Reason recorded in a final `CheckResult` event when present.
    pub reason: Option<String>,
}
223
/// Point-in-time status report returned by `CognitionRuntime::status`,
/// `start`, and `stop`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    /// Whether the runtime is allowed to run at all.
    pub enabled: bool,
    /// Whether the background loop is currently ticking.
    pub running: bool,
    pub loop_interval_ms: u64,
    pub started_at: Option<DateTime<Utc>>,
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Total personas, including reaped ones.
    pub persona_count: usize,
    /// Personas with `PersonaStatus::Active`.
    pub active_persona_count: usize,
    pub events_buffered: usize,
    pub snapshots_buffered: usize,
}
237
/// Result of a reap operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids reaped, starting with the requested persona, then descendants
    /// in breadth-first order when cascading.
    pub reaped_ids: Vec<String>,
    /// Convenience count (`reaped_ids.len()`).
    pub count: usize,
}
244
/// One persona in the lineage graph, with sorted child ids.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    pub persona_id: String,
    /// `None` for roots.
    pub parent_id: Option<String>,
    /// Child ids, sorted lexicographically.
    pub children: Vec<String>,
    pub depth: u32,
    pub status: PersonaStatus,
}
254
/// Full persona lineage: nodes and roots sorted by id for deterministic
/// output; includes reaped personas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    pub nodes: Vec<LineageNode>,
    /// Ids of personas with no parent.
    pub roots: Vec<String>,
    /// Number of parent→child edges.
    pub total_edges: usize,
}
262
/// Four-step cognition cycle a persona rotates through, driven by its
/// running thought count (Observe → Reflect → Test → Compress).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}
270
271impl ThoughtPhase {
272 fn from_thought_count(thought_count: u64) -> Self {
273 match thought_count % 4 {
274 1 => Self::Observe,
275 2 => Self::Reflect,
276 3 => Self::Test,
277 _ => Self::Compress,
278 }
279 }
280
281 fn as_str(&self) -> &'static str {
282 match self {
283 Self::Observe => "observe",
284 Self::Reflect => "reflect",
285 Self::Test => "test",
286 Self::Compress => "compress",
287 }
288 }
289
290 fn event_type(&self) -> ThoughtEventType {
291 match self {
292 Self::Observe => ThoughtEventType::ThoughtGenerated,
293 Self::Reflect => ThoughtEventType::HypothesisRaised,
294 Self::Test => ThoughtEventType::CheckRequested,
295 Self::Compress => ThoughtEventType::SnapshotCompressed,
296 }
297 }
298}
299
/// Per-persona work captured under the registry write lock at the start
/// of a tick, so thought generation can run without holding the lock.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    persona_id: String,
    persona_name: String,
    role: String,
    charter: String,
    swarm_id: Option<String>,
    /// Thought counter value after this tick's increment.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
310
/// Outcome of generating one thought, either from the thinker model
/// (`source == "model"`) or from the built-in fallback text
/// (`source == "fallback"`).
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// "model" or "fallback".
    source: &'static str,
    /// Model name when the thinker produced the text.
    model: Option<String>,
    finish_reason: Option<String>,
    /// The generated thought text.
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    total_tokens: Option<u32>,
    /// Wall-clock generation time in milliseconds.
    latency_ms: u128,
    /// Thinker error message when generation failed and fallback was used.
    error: Option<String>,
}
323
/// Construction-time options for `CognitionRuntime`.
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// Master switch; `start` fails when false.
    pub enabled: bool,
    /// Tick period in milliseconds (clamped to >= 100 at construction).
    pub loop_interval_ms: u64,
    /// Event ring-buffer capacity (clamped to >= 32 at construction).
    pub max_events: usize,
    /// Snapshot ring-buffer capacity (clamped to >= 8 at construction).
    pub max_snapshots: usize,
    /// Policy for personas created without an explicit or inherited one.
    pub default_policy: PersonaPolicy,
}
333
impl Default for CognitionRuntimeOptions {
    /// Conservative defaults; note cognition is disabled by default and
    /// must be switched on explicitly (or via env in `new_from_env`).
    fn default() -> Self {
        Self {
            enabled: false,
            loop_interval_ms: 2_000,
            max_events: 2_000,
            max_snapshots: 128,
            default_policy: PersonaPolicy::default(),
        }
    }
}
345
/// Shared state of the perpetual-cognition engine: persona registry,
/// proposal store, bounded event/snapshot buffers, and the handle to the
/// background tick task.
///
/// All mutable state lives behind `Arc`ed async primitives so the spawned
/// loop task and the public API methods can share it.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// Master switch; when false `start` returns an error.
    enabled: bool,
    /// Event ring-buffer bound (>= 32).
    max_events: usize,
    /// Snapshot ring-buffer bound (>= 8).
    max_snapshots: usize,
    /// Policy used when a create request has none and no parent policy
    /// is inherited.
    default_policy: PersonaPolicy,
    /// True while the background loop should keep ticking.
    running: Arc<AtomicBool>,
    /// Tick period in milliseconds (>= 100); adjustable via `start`.
    loop_interval_ms: Arc<RwLock<u64>>,
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Persona registry keyed by persona id.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    /// Proposal store keyed by proposal id.
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    /// Bounded ring buffer of recent events (also broadcast on `event_tx`).
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    /// Bounded ring buffer of memory snapshots.
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    /// Join handle of the spawned tick loop, if started.
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast channel mirroring every pushed event to subscribers.
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// Optional LLM client; when `None`, fallback thought text is used.
    thinker: Option<Arc<ThinkerClient>>,
}
365
impl CognitionRuntime {
    /// Builds a runtime configured entirely from `CODETETHER_COGNITION_*`
    /// environment variables (defaults shown inline), including an
    /// optional thinker backend for model-generated thoughts.
    pub fn new_from_env() -> Self {
        let mut options = CognitionRuntimeOptions::default();
        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);

        options.default_policy = PersonaPolicy {
            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
            token_credits_per_minute: env_u32(
                "CODETETHER_COGNITION_TOKEN_CREDITS_PER_MINUTE",
                20_000,
            ),
            cpu_credits_per_minute: env_u32("CODETETHER_COGNITION_CPU_CREDITS_PER_MINUTE", 10_000),
            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
        };

        // Thinker defaults target a local Ollama-style OpenAI-compatible
        // endpoint; the candle_* fields only matter for the candle backend.
        let thinker_config = ThinkerConfig {
            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
            backend: thinker::ThinkerBackend::from_env(
                &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
                    .unwrap_or_else(|_| "openai_compat".to_string()),
            ),
            endpoint: normalize_thinker_endpoint(
                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
            ),
            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
                .ok()
                .and_then(|v| v.parse::<f32>().ok()),
            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
            timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
            candle_tokenizer_path: std::env::var(
                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
            )
            .ok(),
            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
            candle_device: thinker::CandleDevicePreference::from_env(
                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
                    .unwrap_or_else(|_| "auto".to_string()),
            ),
            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
            candle_repeat_penalty: env_f32(
                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
                1.1,
            ),
            candle_repeat_last_n: env_usize(
                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
                64,
            ),
            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
        };

        Self::new_with_options_and_thinker(options, Some(thinker_config))
    }

    /// Builds a runtime from explicit options with no thinker; thoughts
    /// will use the fallback text generator.
    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
        Self::new_with_options_and_thinker(options, None)
    }

    /// Core constructor: clamps limits to sane minimums and initializes
    /// the thinker client, degrading to fallback thoughts if thinker
    /// initialization fails.
    fn new_with_options_and_thinker(
        options: CognitionRuntimeOptions,
        thinker_config: Option<ThinkerConfig>,
    ) -> Self {
        // Broadcast capacity tracks the event buffer size (min 16) so slow
        // subscribers lag rather than block the tick loop.
        let (event_tx, _) = broadcast::channel(options.max_events.max(16));
        let thinker = thinker_config.and_then(|cfg| {
            if !cfg.enabled {
                return None;
            }
            match ThinkerClient::new(cfg) {
                Ok(client) => {
                    tracing::info!(
                        backend = ?client.config().backend,
                        endpoint = %client.config().endpoint,
                        model = %client.config().model,
                        "Cognition thinker initialized"
                    );
                    Some(Arc::new(client))
                }
                Err(error) => {
                    // Non-fatal: the runtime still works with fallback thoughts.
                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
                    None
                }
            }
        });

        Self {
            enabled: options.enabled,
            max_events: options.max_events.max(32),
            max_snapshots: options.max_snapshots.max(8),
            default_policy: options.default_policy,
            running: Arc::new(AtomicBool::new(false)),
            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
            started_at: Arc::new(RwLock::new(None)),
            last_tick_at: Arc::new(RwLock::new(None)),
            personas: Arc::new(RwLock::new(HashMap::new())),
            proposals: Arc::new(RwLock::new(HashMap::new())),
            events: Arc::new(RwLock::new(VecDeque::new())),
            snapshots: Arc::new(RwLock::new(VecDeque::new())),
            loop_handle: Arc::new(Mutex::new(None)),
            event_tx,
            thinker,
        }
    }

    /// Whether the runtime is allowed to start.
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Returns a fresh receiver for the live event stream. Receivers that
    /// fall behind the channel capacity will observe `Lagged` errors.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
        self.event_tx.subscribe()
    }

    /// Starts the background cognition loop.
    ///
    /// Applies an optional interval override, seeds a persona when the
    /// registry is empty, and is idempotent: calling while already running
    /// just returns the current status. Errors if the runtime is disabled
    /// or the seed persona cannot be created.
    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
        if !self.enabled {
            return Err(anyhow!(
                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
            ));
        }

        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
        if let Some(request) = req {
            if let Some(interval) = request.loop_interval_ms {
                let mut lock = self.loop_interval_ms.write().await;
                *lock = interval.max(100);
            }
            requested_seed_persona = request.seed_persona;
        }

        // Guarantee at least one persona exists before ticking begins.
        let has_personas = !self.personas.read().await.is_empty();
        if !has_personas {
            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
            let _ = self.create_persona(seed).await?;
        }

        if self.running.load(Ordering::SeqCst) {
            return Ok(self.status().await);
        }

        self.running.store(true, Ordering::SeqCst);
        {
            let mut started = self.started_at.write().await;
            *started = Some(Utc::now());
        }

        // Clone the shared handles the loop task needs; the task owns
        // these clones for its whole lifetime.
        let running = Arc::clone(&self.running);
        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
        let last_tick_at = Arc::clone(&self.last_tick_at);
        let personas = Arc::clone(&self.personas);
        let proposals = Arc::clone(&self.proposals);
        let events = Arc::clone(&self.events);
        let snapshots = Arc::clone(&self.snapshots);
        let max_events = self.max_events;
        let max_snapshots = self.max_snapshots;
        let event_tx = self.event_tx.clone();
        let thinker = self.thinker.clone();

        let handle = tokio::spawn(async move {
            // Fixed-rate scheduling: sleep only when ahead of schedule.
            let mut next_tick = Instant::now();
            while running.load(Ordering::SeqCst) {
                let now_instant = Instant::now();
                if now_instant < next_tick {
                    tokio::time::sleep_until(next_tick).await;
                }
                // Re-check after sleeping so stop() takes effect promptly.
                if !running.load(Ordering::SeqCst) {
                    break;
                }

                let now = Utc::now();
                {
                    let mut lock = last_tick_at.write().await;
                    *lock = Some(now);
                }

                let mut new_events = Vec::new();
                let mut new_snapshots = Vec::new();
                let mut new_proposals = Vec::new();

                // Advance persona counters and capture this tick's work
                // under one short write lock; the lock is released before
                // any (potentially slow) thinker calls below.
                let work_items: Vec<ThoughtWorkItem> = {
                    let mut map = personas.write().await;
                    let mut items = Vec::new();
                    for persona in map.values_mut() {
                        if persona.status != PersonaStatus::Active {
                            continue;
                        }

                        persona.thought_count = persona.thought_count.saturating_add(1);
                        persona.last_tick_at = Some(now);
                        persona.updated_at = now;
                        items.push(ThoughtWorkItem {
                            persona_id: persona.identity.id.clone(),
                            persona_name: persona.identity.name.clone(),
                            role: persona.identity.role.clone(),
                            charter: persona.identity.charter.clone(),
                            swarm_id: persona.identity.swarm_id.clone(),
                            thought_count: persona.thought_count,
                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
                        });
                    }
                    items
                };

                for work in work_items {
                    // Up to 8 recent events for this persona feed the prompt.
                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
                    let thought = generate_phase_thought(thinker.as_deref(), &work, &context).await;

                    let event_timestamp = Utc::now();
                    new_events.push(ThoughtEvent {
                        id: Uuid::new_v4().to_string(),
                        event_type: work.phase.event_type(),
                        persona_id: Some(work.persona_id.clone()),
                        swarm_id: work.swarm_id.clone(),
                        timestamp: event_timestamp,
                        payload: json!({
                            "phase": work.phase.as_str(),
                            "thought_count": work.thought_count,
                            "persona": {
                                "id": work.persona_id.clone(),
                                "name": work.persona_name.clone(),
                                "role": work.role.clone(),
                            },
                            "context_event_count": context.len(),
                            "thinking": thought.thinking.clone(),
                            "source": thought.source,
                            "model": thought.model.clone(),
                            "finish_reason": thought.finish_reason.clone(),
                            "usage": {
                                "prompt_tokens": thought.prompt_tokens,
                                "completion_tokens": thought.completion_tokens,
                                "total_tokens": thought.total_tokens,
                            },
                            "latency_ms": thought.latency_ms,
                            "error": thought.error.clone(),
                        }),
                    });

                    // A Test-phase thought also records its (excerpted) result.
                    if work.phase == ThoughtPhase::Test {
                        new_events.push(ThoughtEvent {
                            id: Uuid::new_v4().to_string(),
                            event_type: ThoughtEventType::CheckResult,
                            persona_id: Some(work.persona_id.clone()),
                            swarm_id: work.swarm_id.clone(),
                            timestamp: Utc::now(),
                            payload: json!({
                                "phase": work.phase.as_str(),
                                "thought_count": work.thought_count,
                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
                                "source": thought.source,
                                "model": thought.model,
                            }),
                        });
                    }

                    // Every Reflect tick whose count satisfies count % 8 == 2
                    // (i.e. every other Reflect) also drafts a low-risk proposal.
                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
                        let proposal = Proposal {
                            id: Uuid::new_v4().to_string(),
                            persona_id: work.persona_id.clone(),
                            title: proposal_title_from_thought(
                                &thought.thinking,
                                work.thought_count,
                            ),
                            rationale: trim_for_storage(&thought.thinking, 900),
                            evidence_refs: vec!["internal.thought_stream".to_string()],
                            risk: ProposalRisk::Low,
                            status: ProposalStatus::Created,
                            created_at: Utc::now(),
                        };

                        new_events.push(ThoughtEvent {
                            id: Uuid::new_v4().to_string(),
                            event_type: ThoughtEventType::ProposalCreated,
                            persona_id: Some(work.persona_id.clone()),
                            swarm_id: work.swarm_id.clone(),
                            timestamp: Utc::now(),
                            payload: json!({
                                "proposal_id": proposal.id,
                                "title": proposal.title,
                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
                                "source": thought.source,
                                "model": thought.model,
                            }),
                        });

                        new_proposals.push(proposal);
                    }

                    // A Compress tick turns the thought into a memory snapshot.
                    if work.phase == ThoughtPhase::Compress {
                        new_snapshots.push(MemorySnapshot {
                            id: Uuid::new_v4().to_string(),
                            generated_at: Utc::now(),
                            swarm_id: work.swarm_id.clone(),
                            persona_scope: vec![work.persona_id.clone()],
                            summary: trim_for_storage(&thought.thinking, 1_500),
                            hot_event_count: context.len(),
                            warm_fact_count: estimate_fact_count(&thought.thinking),
                            cold_snapshot_count: 1,
                            metadata: HashMap::from([
                                (
                                    "phase".to_string(),
                                    serde_json::Value::String(work.phase.as_str().to_string()),
                                ),
                                (
                                    "role".to_string(),
                                    serde_json::Value::String(work.role.clone()),
                                ),
                                (
                                    "source".to_string(),
                                    serde_json::Value::String(thought.source.to_string()),
                                ),
                                (
                                    "model".to_string(),
                                    serde_json::Value::String(
                                        thought
                                            .model
                                            .clone()
                                            .unwrap_or_else(|| "fallback".to_string()),
                                    ),
                                ),
                                (
                                    "completion_tokens".to_string(),
                                    serde_json::Value::Number(serde_json::Number::from(
                                        thought.completion_tokens.unwrap_or(0) as u64,
                                    )),
                                ),
                            ]),
                        });
                    }
                }

                // Commit this tick's artifacts: proposals first, then
                // events (buffered + broadcast), then snapshots.
                if !new_proposals.is_empty() {
                    let mut proposal_store = proposals.write().await;
                    for proposal in new_proposals {
                        proposal_store.insert(proposal.id.clone(), proposal);
                    }
                }

                for event in new_events {
                    push_event_internal(&events, max_events, &event_tx, event).await;
                }
                for snapshot in new_snapshots {
                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
                }

                // Re-read the interval each tick so start() overrides apply
                // live; if the tick overran its slot, restart the schedule
                // from now instead of firing burst catch-up ticks.
                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
                next_tick += interval;
                let tick_completed = Instant::now();
                if tick_completed > next_tick {
                    next_tick = tick_completed;
                }
            }
        });

        {
            let mut lock = self.loop_handle.lock().await;
            *lock = Some(handle);
        }

        Ok(self.status().await)
    }

    /// Stops the background loop. The tick task is aborted (a tick in
    /// flight may be cut short); when a reason is given, a final
    /// `CheckResult` event records the shutdown.
    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
        self.running.store(false, Ordering::SeqCst);

        if let Some(handle) = self.loop_handle.lock().await.take() {
            handle.abort();
            // Await the aborted task so the handle is fully settled.
            let _ = handle.await;
        }

        if let Some(reason_value) = reason {
            let event = ThoughtEvent {
                id: Uuid::new_v4().to_string(),
                event_type: ThoughtEventType::CheckResult,
                persona_id: None,
                swarm_id: None,
                timestamp: Utc::now(),
                payload: json!({ "stopped": true, "reason": reason_value }),
            };
            self.push_event(event).await;
        }

        Ok(self.status().await)
    }

    /// Creates a persona (root, or child when `parent_id` is set) and
    /// emits a `PersonaSpawned` event.
    ///
    /// For children, enforces the parent's branching factor and the
    /// inherited depth limit, and inherits the parent's swarm and policy
    /// when the request leaves them unset. Errors on a missing or reaped
    /// parent, exceeded limits, or a duplicate persona id.
    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
        let now = Utc::now();
        let mut personas = self.personas.write().await;

        let mut parent_swarm_id = None;
        let mut computed_depth = 0_u32;
        let mut inherited_policy = None;

        if let Some(parent_id) = req.parent_id.clone() {
            let parent = personas
                .get(&parent_id)
                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;

            if parent.status == PersonaStatus::Reaped {
                return Err(anyhow!("Parent persona {} is reaped", parent_id));
            }

            parent_swarm_id = parent.identity.swarm_id.clone();
            computed_depth = parent.identity.depth.saturating_add(1);
            inherited_policy = Some(parent.policy.clone());
            let branch_limit = parent.policy.max_branching_factor;

            // Only live (non-reaped) children count against the limit.
            let child_count = personas
                .values()
                .filter(|p| {
                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
                        && p.status != PersonaStatus::Reaped
                })
                .count();

            if child_count as u32 >= branch_limit {
                return Err(anyhow!(
                    "Parent {} reached branching limit {}",
                    parent_id,
                    branch_limit
                ));
            }
        }

        // Policy precedence: explicit request > inherited parent policy >
        // runtime default.
        let policy = req
            .policy
            .clone()
            .or(inherited_policy.clone())
            .unwrap_or_else(|| self.default_policy.clone());

        // The depth limit comes from the parent's policy when inheriting,
        // so a child cannot raise its own cap via a custom policy.
        let effective_depth_limit = inherited_policy
            .as_ref()
            .map(|p| p.max_spawn_depth)
            .unwrap_or(policy.max_spawn_depth);

        if computed_depth > effective_depth_limit {
            return Err(anyhow!(
                "Spawn depth {} exceeds limit {}",
                computed_depth,
                effective_depth_limit
            ));
        }

        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        if personas.contains_key(&persona_id) {
            return Err(anyhow!("Persona id already exists: {}", persona_id));
        }

        let identity = PersonaIdentity {
            id: persona_id.clone(),
            name: req.name,
            role: req.role,
            charter: req.charter,
            swarm_id: req.swarm_id.or(parent_swarm_id),
            parent_id: req.parent_id,
            depth: computed_depth,
            created_at: now,
        };

        let persona = PersonaRuntimeState {
            identity,
            policy,
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
        };

        personas.insert(persona_id, persona.clone());
        // Release the registry lock before awaiting the event push.
        drop(personas);

        self.push_event(ThoughtEvent {
            id: Uuid::new_v4().to_string(),
            event_type: ThoughtEventType::PersonaSpawned,
            persona_id: Some(persona.identity.id.clone()),
            swarm_id: persona.identity.swarm_id.clone(),
            timestamp: now,
            payload: json!({
                "name": persona.identity.name,
                "role": persona.identity.role,
                "depth": persona.identity.depth,
            }),
        })
        .await;

        Ok(persona)
    }

    /// Convenience wrapper: creates a child of `parent_id`, applying all
    /// of `create_persona`'s inheritance and limit checks.
    pub async fn spawn_child(
        &self,
        parent_id: &str,
        req: SpawnPersonaRequest,
    ) -> Result<PersonaRuntimeState> {
        let request = CreatePersonaRequest {
            persona_id: req.persona_id,
            name: req.name,
            role: req.role,
            charter: req.charter,
            swarm_id: req.swarm_id,
            parent_id: Some(parent_id.to_string()),
            policy: req.policy,
        };
        self.create_persona(request).await
    }

    /// Marks a persona (and, when `cascade` is set, all transitive
    /// descendants) as `Reaped`, then emits one `PersonaReaped` event per
    /// reaped id. Errors if the persona does not exist.
    pub async fn reap_persona(
        &self,
        persona_id: &str,
        req: ReapPersonaRequest,
    ) -> Result<ReapPersonaResponse> {
        let cascade = req.cascade.unwrap_or(false);
        let now = Utc::now();

        let mut personas = self.personas.write().await;
        if !personas.contains_key(persona_id) {
            return Err(anyhow!("Persona not found: {}", persona_id));
        }

        // Breadth-first expansion over the lineage: `reaped_ids` doubles
        // as the BFS queue, with `idx` as the read cursor.
        let mut reaped_ids = vec![persona_id.to_string()];
        if cascade {
            let mut idx = 0usize;
            while idx < reaped_ids.len() {
                let current = reaped_ids[idx].clone();
                let children: Vec<String> = personas
                    .values()
                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
                    .map(|p| p.identity.id.clone())
                    .collect();
                for child in children {
                    if !reaped_ids.iter().any(|existing| existing == &child) {
                        reaped_ids.push(child);
                    }
                }
                idx += 1;
            }
        }

        for id in &reaped_ids {
            if let Some(persona) = personas.get_mut(id) {
                persona.status = PersonaStatus::Reaped;
                persona.updated_at = now;
            }
        }
        // Release the registry lock before awaiting event pushes.
        drop(personas);

        for id in &reaped_ids {
            self.push_event(ThoughtEvent {
                id: Uuid::new_v4().to_string(),
                event_type: ThoughtEventType::PersonaReaped,
                persona_id: Some(id.clone()),
                swarm_id: None,
                timestamp: now,
                payload: json!({
                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
                    "cascade": cascade,
                }),
            })
            .await;
        }

        Ok(ReapPersonaResponse {
            count: reaped_ids.len(),
            reaped_ids,
        })
    }

    /// Most recently generated memory snapshot, if any.
    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
        self.snapshots.read().await.back().cloned()
    }

    /// Builds the full parent/child lineage graph over all personas
    /// (including reaped ones), with nodes, children, and roots sorted by
    /// id for deterministic output.
    pub async fn lineage_graph(&self) -> LineageGraph {
        let personas = self.personas.read().await;
        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
        let mut roots = Vec::new();
        let mut total_edges = 0usize;

        for persona in personas.values() {
            if let Some(parent_id) = persona.identity.parent_id.clone() {
                children_by_parent
                    .entry(parent_id)
                    .or_default()
                    .push(persona.identity.id.clone());
                total_edges = total_edges.saturating_add(1);
            } else {
                roots.push(persona.identity.id.clone());
            }
        }

        let mut nodes: Vec<LineageNode> = personas
            .values()
            .map(|persona| {
                let mut children = children_by_parent
                    .get(&persona.identity.id)
                    .cloned()
                    .unwrap_or_default();
                children.sort();

                LineageNode {
                    persona_id: persona.identity.id.clone(),
                    parent_id: persona.identity.parent_id.clone(),
                    children,
                    depth: persona.identity.depth,
                    status: persona.status,
                }
            })
            .collect();

        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
        roots.sort();

        LineageGraph {
            nodes,
            roots,
            total_edges,
        }
    }

    /// Snapshot of the runtime's current counters and flags.
    pub async fn status(&self) -> CognitionStatus {
        let personas = self.personas.read().await;
        let events = self.events.read().await;
        let snapshots = self.snapshots.read().await;
        let started_at = *self.started_at.read().await;
        let last_tick_at = *self.last_tick_at.read().await;
        let loop_interval_ms = *self.loop_interval_ms.read().await;

        let active_persona_count = personas
            .values()
            .filter(|p| p.status == PersonaStatus::Active)
            .count();

        CognitionStatus {
            enabled: self.enabled,
            running: self.running.load(Ordering::SeqCst),
            loop_interval_ms,
            started_at,
            last_tick_at,
            persona_count: personas.len(),
            active_persona_count,
            events_buffered: events.len(),
            snapshots_buffered: snapshots.len(),
        }
    }

    /// Buffers the event and broadcasts it to subscribers.
    async fn push_event(&self, event: ThoughtEvent) {
        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
    }
}
1031
1032async fn push_event_internal(
1033 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1034 max_events: usize,
1035 event_tx: &broadcast::Sender<ThoughtEvent>,
1036 event: ThoughtEvent,
1037) {
1038 {
1039 let mut lock = events.write().await;
1040 lock.push_back(event.clone());
1041 while lock.len() > max_events {
1042 lock.pop_front();
1043 }
1044 }
1045 let _ = event_tx.send(event);
1046}
1047
1048async fn push_snapshot_internal(
1049 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1050 max_snapshots: usize,
1051 snapshot: MemorySnapshot,
1052) {
1053 let mut lock = snapshots.write().await;
1054 lock.push_back(snapshot);
1055 while lock.len() > max_snapshots {
1056 lock.pop_front();
1057 }
1058}
1059
1060async fn recent_persona_context(
1061 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1062 persona_id: &str,
1063 limit: usize,
1064) -> Vec<ThoughtEvent> {
1065 let lock = events.read().await;
1066 let mut selected: Vec<ThoughtEvent> = lock
1067 .iter()
1068 .rev()
1069 .filter(|event| {
1070 event.persona_id.as_deref() == Some(persona_id)
1071 || (event.persona_id.is_none()
1072 && matches!(
1073 event.event_type,
1074 ThoughtEventType::CheckResult
1075 | ThoughtEventType::ProposalCreated
1076 | ThoughtEventType::SnapshotCompressed
1077 ))
1078 })
1079 .take(limit)
1080 .cloned()
1081 .collect();
1082 selected.reverse();
1083 selected
1084}
1085
1086async fn generate_phase_thought(
1087 thinker: Option<&ThinkerClient>,
1088 work: &ThoughtWorkItem,
1089 context: &[ThoughtEvent],
1090) -> ThoughtResult {
1091 let started_at = Instant::now();
1092 if let Some(client) = thinker {
1093 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1094 match client.think(&system_prompt, &user_prompt).await {
1095 Ok(output) => {
1096 let thinking = normalize_thought_output(work, context, &output.text);
1097 if !thinking.is_empty() {
1098 return ThoughtResult {
1099 source: "model",
1100 model: Some(output.model),
1101 finish_reason: output.finish_reason,
1102 thinking,
1103 prompt_tokens: output.prompt_tokens,
1104 completion_tokens: output.completion_tokens,
1105 total_tokens: output.total_tokens,
1106 latency_ms: started_at.elapsed().as_millis(),
1107 error: None,
1108 };
1109 }
1110 }
1111 Err(error) => {
1112 return ThoughtResult {
1113 source: "fallback",
1114 model: None,
1115 finish_reason: None,
1116 thinking: fallback_phase_text(work, context),
1117 prompt_tokens: None,
1118 completion_tokens: None,
1119 total_tokens: None,
1120 latency_ms: started_at.elapsed().as_millis(),
1121 error: Some(error.to_string()),
1122 };
1123 }
1124 }
1125 }
1126
1127 ThoughtResult {
1128 source: "fallback",
1129 model: None,
1130 finish_reason: None,
1131 thinking: fallback_phase_text(work, context),
1132 prompt_tokens: None,
1133 completion_tokens: None,
1134 total_tokens: None,
1135 latency_ms: started_at.elapsed().as_millis(),
1136 error: None,
1137 }
1138}
1139
1140fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1141 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1142Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1143Write as an operational process update, not meta narration. \
1144Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
1145Output concrete findings, checks, risks, and next actions. \
1146Fill every labeled field with concrete content. Never output placeholders such as '...', '<...>', 'TBD', or 'TODO'."
1147 .to_string();
1148
1149 let context_lines = if context.is_empty() {
1150 "none".to_string()
1151 } else {
1152 context
1153 .iter()
1154 .map(format_context_event)
1155 .collect::<Vec<_>>()
1156 .join("\n")
1157 };
1158
1159 let phase_instruction = match work.phase {
1160 ThoughtPhase::Observe => {
1161 "Process format (exact line labels): \
1162Phase: Observe | Goal: detect current customer/business risk | \
1163Signals: 1-3 concrete signals separated by '; ' | \
1164Uncertainty: one unknown that blocks confidence | \
1165Next_Action: one immediate operational action."
1166 }
1167 ThoughtPhase::Reflect => {
1168 "Process format (exact line labels): \
1169Phase: Reflect | Hypothesis: single testable hypothesis | \
1170Rationale: why this is likely | \
1171Business_Risk: customer/revenue/SLA impact | \
1172Validation_Next_Action: one action to confirm or falsify."
1173 }
1174 ThoughtPhase::Test => {
1175 "Process format (exact line labels): \
1176Phase: Test | Check: single concrete check | \
1177Procedure: short executable procedure | \
1178Expected_Result: pass/fail expectation | \
1179Evidence_Quality: low|medium|high with reason | \
1180Escalation_Trigger: when to escalate immediately."
1181 }
1182 ThoughtPhase::Compress => {
1183 "Process format (exact line labels): \
1184Phase: Compress | State_Summary: current state in one line | \
1185Retained_Facts: 3 short facts separated by '; ' | \
1186Open_Risks: up to 2 unresolved risks separated by '; ' | \
1187Next_Process_Step: next operational step."
1188 }
1189 };
1190
1191 let user_prompt = format!(
1192 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
1193 phase = work.phase.as_str(),
1194 persona_id = work.persona_id,
1195 persona_name = work.persona_name,
1196 role = work.role,
1197 charter = work.charter,
1198 thought_count = work.thought_count,
1199 context = context_lines,
1200 instruction = phase_instruction
1201 );
1202
1203 (system_prompt, user_prompt)
1204}
1205
1206fn format_context_event(event: &ThoughtEvent) -> String {
1207 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
1208 format!(
1209 "{} {} {}",
1210 event.event_type.as_str(),
1211 event.timestamp.to_rfc3339(),
1212 trim_for_storage(&payload, 220)
1213 )
1214}
1215
1216fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
1217 let charter = trim_for_storage(&work.charter, 180);
1218 let context_summary = fallback_context_summary(context);
1219 let thought = match work.phase {
1220 ThoughtPhase::Observe => format!(
1221 "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
1222 work.role, charter, context_summary
1223 ),
1224 ThoughtPhase::Reflect => format!(
1225 "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
1226 ),
1227 ThoughtPhase::Test => format!(
1228 "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
1229 ),
1230 ThoughtPhase::Compress => format!(
1231 "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
1232 work.role, charter, context_summary
1233 ),
1234 };
1235 trim_for_storage(&thought, 1_200)
1236}
1237
/// Normalizes raw model text into a single stored thought line.
///
/// Acceptance order: (1) the first non-empty line starting at a `Phase:` label,
/// unquoted and placeholder-free; (2) the whole labeled remainder when it is a
/// clean single line; otherwise reject meta narration and placeholder text and
/// fall back to the deterministic template from `fallback_phase_text`.
fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
    // Bound the text we inspect and would store.
    let trimmed = trim_for_storage(raw, 2_000);
    if trimmed.trim().is_empty() {
        return fallback_phase_text(work, context);
    }

    // Prefer output anchored at a recognized "Phase:" label.
    if let Some(idx) = find_process_label_start(&trimmed) {
        let candidate = trimmed[idx..].trim();
        if let Some(first_line) = candidate
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())
        {
            // Strip stray quoting the model may wrap the line in.
            let normalized_line = first_line.trim_matches('"').trim_matches('\'').trim();
            // '<' rejects leftover markup/template brackets.
            if normalized_line.starts_with("Phase:")
                && !normalized_line.contains('<')
                && !has_template_placeholder_values(normalized_line)
            {
                return normalized_line.to_string();
            }
        }
        // Otherwise accept the whole labeled remainder if it is one clean line.
        if !candidate.is_empty()
            && !candidate.contains('<')
            && !candidate.contains('\n')
            && !has_template_placeholder_values(candidate)
        {
            return candidate.to_string();
        }
    }

    // Reject meta narration ("we need to ...") rather than store it as a thought.
    let lower = trimmed.to_ascii_lowercase();
    let looks_meta = lower.starts_with("we need")
        || lower.starts_with("i need")
        || lower.contains("we need to")
        || lower.contains("i need to")
        || lower.contains("must output")
        || lower.contains("let's ")
        || lower.contains("we have to");

    if looks_meta || has_template_placeholder_values(&trimmed) {
        return fallback_phase_text(work, context);
    }

    trimmed
}
1284
/// Returns true when `text` still contains template placeholder values
/// (e.g. "goal: ...", "<...", "tbd", "todo") instead of concrete content.
/// Matching is case-insensitive via an ASCII-lowercased copy.
fn has_template_placeholder_values(text: &str) -> bool {
    // Labeled fields left as literal "..." by the model.
    const LABEL_PLACEHOLDERS: [&str; 17] = [
        "goal: ...",
        "signals: ...",
        "uncertainty: ...",
        "next_action: ...",
        "hypothesis: ...",
        "rationale: ...",
        "business_risk: ...",
        "validation_next_action: ...",
        "check: ...",
        "procedure: ...",
        "expected_result: ...",
        "evidence_quality: ...",
        "escalation_trigger: ...",
        "state_summary: ...",
        "retained_facts: ...",
        "open_risks: ...",
        "next_process_step: ...",
    ];
    // Generic stand-ins that also mark unfinished output.
    const GENERIC_PLACEHOLDERS: [&str; 3] = ["<...", "tbd", "todo"];

    let lower = text.to_ascii_lowercase();
    LABEL_PLACEHOLDERS
        .iter()
        .chain(GENERIC_PLACEHOLDERS.iter())
        .any(|needle| lower.contains(needle))
}
1312
/// Returns the byte index of the first `Phase:` label in `text`, if any.
///
/// The previous implementation searched for four phase-specific labels plus
/// the bare prefix and took the minimum index; since every one of those labels
/// begins with "Phase:" and the bare prefix was itself in the list, that
/// minimum is exactly the first occurrence of "Phase:" — one scan suffices.
fn find_process_label_start(text: &str) -> Option<usize> {
    text.find("Phase:")
}
1325
/// Summarizes recent cognition events for the fallback thought templates:
/// the event count plus the most recent error signal, proposal title, and
/// check result (each optional, each length-bounded).
fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
    if context.is_empty() {
        return "No prior events recorded yet.".to_string();
    }

    // Most recent usable instance of each signal kind.
    let mut latest_error: Option<String> = None;
    let mut latest_proposal: Option<String> = None;
    let mut latest_check: Option<String> = None;

    // Scan newest-first so the first hit per kind is the latest occurrence.
    for event in context.iter().rev() {
        // Any event type may carry an "error" payload field.
        if latest_error.is_none()
            && let Some(error) = event
                .payload
                .get("error")
                .and_then(serde_json::Value::as_str)
            && !error.trim().is_empty()
        {
            latest_error = Some(trim_for_storage(error, 140));
        }

        // Proposal titles are only trusted when concrete (no placeholders).
        if latest_proposal.is_none()
            && event.event_type == ThoughtEventType::ProposalCreated
            && let Some(title) = event
                .payload
                .get("title")
                .and_then(serde_json::Value::as_str)
            && !title.trim().is_empty()
            && !has_template_placeholder_values(title)
        {
            latest_proposal = Some(trim_for_storage(title, 120));
        }

        // Likewise for check-result excerpts.
        if latest_check.is_none()
            && event.event_type == ThoughtEventType::CheckResult
            && let Some(result) = event
                .payload
                .get("result_excerpt")
                .and_then(serde_json::Value::as_str)
            && !result.trim().is_empty()
            && !has_template_placeholder_values(result)
        {
            latest_check = Some(trim_for_storage(result, 140));
        }

        // Stop early once all three signals are filled.
        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
            break;
        }
    }

    let mut lines = vec![format!(
        "{} recent cognition events are available.",
        context.len()
    )];
    if let Some(error) = latest_error {
        lines.push(format!("Latest error signal: {}.", error));
    }
    if let Some(proposal) = latest_proposal {
        lines.push(format!("Recent proposal: {}.", proposal));
    }
    if let Some(check) = latest_check {
        lines.push(format!("Recent check: {}.", check));
    }
    lines.join(" ")
}
1390
/// Trims surrounding whitespace and caps the result at `max_chars` characters
/// (char-counted, not bytes), appending "..." when truncation occurred.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    if input.chars().count() <= max_chars {
        return input.trim().to_string();
    }
    let mut truncated: String = input.chars().take(max_chars).collect();
    truncated.push_str("...");
    truncated.trim().to_string()
}
1402
/// Rough fact count: number of sentence terminators ('.', '!', '?'),
/// clamped to the range 1..=12.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|c| matches!(c, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
1408
1409fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
1410 let first_line = thought
1411 .lines()
1412 .find(|line| !line.trim().is_empty())
1413 .unwrap_or("proposal");
1414 let compact = first_line
1415 .replace(['\t', '\r', '\n'], " ")
1416 .split_whitespace()
1417 .collect::<Vec<_>>()
1418 .join(" ");
1419 let trimmed = trim_for_storage(&compact, 72);
1420 if trimmed.is_empty() {
1421 format!("proposal-{}", thought_count)
1422 } else {
1423 trimmed
1424 }
1425}
1426
1427fn default_seed_persona() -> CreatePersonaRequest {
1428 CreatePersonaRequest {
1429 persona_id: Some("root-thinker".to_string()),
1430 name: "root-thinker".to_string(),
1431 role: "orchestrator".to_string(),
1432 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
1433 .to_string(),
1434 swarm_id: Some("swarm-core".to_string()),
1435 parent_id: None,
1436 policy: None,
1437 }
1438}
1439
/// Normalizes a thinker base URL into a full chat-completions endpoint:
/// strips whitespace and trailing slashes, appends "/chat/completions" when
/// missing, and substitutes the local Ollama default for an empty input.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    const SUFFIX: &str = "/chat/completions";
    const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:11434/v1/chat/completions";

    let cleaned = base_url.trim().trim_end_matches('/');
    if cleaned.is_empty() {
        DEFAULT_ENDPOINT.to_string()
    } else if cleaned.ends_with(SUFFIX) {
        cleaned.to_string()
    } else {
        format!("{}{}", cleaned, SUFFIX)
    }
}
1450
/// Reads a boolean flag from the environment. Accepts 1/true/yes/on and
/// 0/false/no/off (case-insensitive); anything else — including an unset or
/// non-Unicode variable — yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
1461
/// Reads `name` from the environment and parses it as `T`, returning
/// `default` when the variable is unset, non-Unicode, or fails to parse.
/// Shared backing for the typed `env_*` helpers below, which previously
/// repeated this logic four times verbatim.
fn env_parsed<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse::<T>().ok())
        .unwrap_or(default)
}

/// `f32` environment override with fallback.
fn env_f32(name: &str, default: f32) -> f32 {
    env_parsed(name, default)
}

/// `u64` environment override with fallback.
fn env_u64(name: &str, default: u64) -> u64 {
    env_parsed(name, default)
}

/// `u32` environment override with fallback.
fn env_u32(name: &str, default: u32) -> u32 {
    env_parsed(name, default)
}

/// `usize` environment override with fallback.
fn env_usize(name: &str, default: usize) -> usize {
    env_parsed(name, default)
}
1489
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a representative work item for prompt/normalization tests.
    fn sample_work_item(phase: ThoughtPhase) -> ThoughtWorkItem {
        ThoughtWorkItem {
            persona_id: "p-1".to_string(),
            persona_name: "Spotlessbinco Business Thinker".to_string(),
            role: "principal reliability engineer".to_string(),
            charter: "Continuously think about /home/riley/spotlessbinco as a production business system."
                .to_string(),
            swarm_id: Some("spotlessbinco".to_string()),
            thought_count: 4,
            phase,
        }
    }

    /// Runtime configured with tight limits (depth 2, branching 2) and a
    /// short loop interval so the async tests finish quickly.
    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: PersonaPolicy {
                max_spawn_depth: 2,
                max_branching_factor: 2,
                token_credits_per_minute: 1_000,
                cpu_credits_per_minute: 1_000,
                idle_ttl_secs: 300,
                share_memory: false,
            },
        })
    }

    // Placeholder-only model output ("State_Summary: ...") must be replaced
    // by the deterministic fallback template, not stored verbatim.
    #[test]
    fn normalize_rejects_placeholder_process_line() {
        let work = sample_work_item(ThoughtPhase::Compress);
        let output = normalize_thought_output(
            &work,
            &[],
            "Phase: Compress | State_Summary: ... | Retained_Facts: ... | Open_Risks: ... | Next_Process_Step: ...",
        );
        assert!(
            output.starts_with("Phase: Compress | State_Summary: reliability monitoring active")
        );
        assert!(!output.contains("State_Summary: ..."));
    }

    // A concrete, well-formed process line passes through unchanged.
    #[test]
    fn normalize_accepts_concrete_process_line() {
        let work = sample_work_item(ThoughtPhase::Test);
        let output = normalize_thought_output(
            &work,
            &[],
            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes",
        );
        assert_eq!(
            output,
            "Phase: Test | Check: inspect ingress 5xx over last 15m | Procedure: query error-rate dashboard and compare baseline | Expected_Result: pass if <=0.5% 5xx, fail otherwise | Evidence_Quality: high from direct telemetry | Escalation_Trigger: >2% 5xx for 5 minutes"
        );
    }

    // Creating a root and spawning a child records parentage, depth, and a
    // lineage edge with the root listed as a lineage root.
    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(CreatePersonaRequest {
                persona_id: Some("root".to_string()),
                name: "root".to_string(),
                role: "orchestrator".to_string(),
                charter: "coordinate".to_string(),
                swarm_id: Some("swarm-a".to_string()),
                parent_id: None,
                policy: None,
            })
            .await
            .expect("root should be created");

        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("child-1".to_string()),
                    name: "child-1".to_string(),
                    role: "analyst".to_string(),
                    charter: "analyze".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("child should spawn");

        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    // With max_branching_factor=2 a third sibling is rejected, and with
    // max_spawn_depth=2 a depth-3 grandchild is rejected.
    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(CreatePersonaRequest {
                persona_id: Some("root".to_string()),
                name: "root".to_string(),
                role: "orchestrator".to_string(),
                charter: "coordinate".to_string(),
                swarm_id: Some("swarm-a".to_string()),
                parent_id: None,
                policy: None,
            })
            .await
            .expect("root should be created");

        runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c1".to_string()),
                    name: "c1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("first child should spawn");

        runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c2".to_string()),
                    name: "c2".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("second child should spawn");

        // Third sibling exceeds max_branching_factor (2).
        let third_child = runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c3".to_string()),
                    name: "c3".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await;
        assert!(third_child.is_err());

        runtime
            .spawn_child(
                "c1",
                SpawnPersonaRequest {
                    persona_id: Some("c1-1".to_string()),
                    name: "c1-1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("depth 2 should be allowed");

        // Depth 3 exceeds max_spawn_depth (2).
        let depth_violation = runtime
            .spawn_child(
                "c1-1",
                SpawnPersonaRequest {
                    persona_id: Some("c1-1-1".to_string()),
                    name: "c1-1-1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await;
        assert!(depth_violation.is_err());
    }

    // start() seeds a persona and begins buffering events; after a short
    // wait the runtime reports running with events, and stop() clears it.
    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        runtime
            .start(Some(StartCognitionRequest {
                loop_interval_ms: Some(10),
                seed_persona: Some(CreatePersonaRequest {
                    persona_id: Some("seed".to_string()),
                    name: "seed".to_string(),
                    role: "watcher".to_string(),
                    charter: "observe".to_string(),
                    swarm_id: Some("swarm-seed".to_string()),
                    parent_id: None,
                    policy: None,
                }),
            }))
            .await
            .expect("runtime should start");

        // Give the cognition loop a few ticks to emit events.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }
}