mod thinker;

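/// Optional tool-routing support, compiled only when the `functiongemma` feature is enabled.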
#[cfg(feature = "functiongemma")]
pub mod tool_router;

pub use thinker::{CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput};

use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::{Mutex, RwLock, broadcast};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use uuid::Uuid;

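/// Lifecycle state of a persona managed by the cognition runtime.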
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    Active,
    Idle,
    Reaped,
    Error,
}

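/// Spawning and resource limits applied to a persona and inherited by its children.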
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    pub max_spawn_depth: u32,
    pub max_branching_factor: u32,
    pub token_credits_per_minute: u32,
    pub cpu_credits_per_minute: u32,
    pub idle_ttl_secs: u64,
    pub share_memory: bool,
}

impl Default for PersonaPolicy {
    fn default() -> Self {
        Self {
            max_spawn_depth: 4,
            max_branching_factor: 4,
            token_credits_per_minute: 20_000,
            cpu_credits_per_minute: 10_000,
            idle_ttl_secs: 3_600,
            share_memory: false,
        }
    }
}

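/// Immutable identity and lineage metadata for a persona.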
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    pub id: String,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub parent_id: Option<String>,
    pub depth: u32,
    pub created_at: DateTime<Utc>,
}

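/// Mutable runtime view of a persona: identity plus policy, status, and tick counters.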
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    pub identity: PersonaIdentity,
    pub policy: PersonaPolicy,
    pub status: PersonaStatus,
    pub thought_count: u64,
    pub last_tick_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    Low,
    Medium,
    High,
    Critical,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    Created,
    Verified,
    Rejected,
    Executed,
}

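/// A proposal raised by a persona during its reflect phase.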
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    pub id: String,
    pub persona_id: String,
    pub title: String,
    pub rationale: String,
    pub evidence_refs: Vec<String>,
    pub risk: ProposalRisk,
    pub status: ProposalStatus,
    pub created_at: DateTime<Utc>,
}

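/// Kinds of events emitted onto the cognition event stream.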
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    ThoughtGenerated,
    HypothesisRaised,
    CheckRequested,
    CheckResult,
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    PersonaSpawned,
    PersonaReaped,
    SnapshotCompressed,
}

impl ThoughtEventType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::ThoughtGenerated => "thought_generated",
            Self::HypothesisRaised => "hypothesis_raised",
            Self::CheckRequested => "check_requested",
            Self::CheckResult => "check_result",
            Self::ProposalCreated => "proposal_created",
            Self::ProposalVerified => "proposal_verified",
            Self::ProposalRejected => "proposal_rejected",
            Self::ActionExecuted => "action_executed",
            Self::PersonaSpawned => "persona_spawned",
            Self::PersonaReaped => "persona_reaped",
            Self::SnapshotCompressed => "snapshot_compressed",
        }
    }
}

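/// A single event on the cognition event stream, buffered in memory and broadcast to subscribers.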
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    pub id: String,
    pub event_type: ThoughtEventType,
    pub persona_id: Option<String>,
    pub swarm_id: Option<String>,
    pub timestamp: DateTime<Utc>,
    pub payload: serde_json::Value,
}

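/// Compressed memory summary produced during a persona's compress phase.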
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    pub id: String,
    pub generated_at: DateTime<Utc>,
    pub swarm_id: Option<String>,
    pub persona_scope: Vec<String>,
    pub summary: String,
    pub hot_event_count: usize,
    pub warm_fact_count: usize,
    pub cold_snapshot_count: usize,
    pub metadata: HashMap<String, serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub parent_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    pub persona_id: Option<String>,
    pub name: String,
    pub role: String,
    pub charter: String,
    pub swarm_id: Option<String>,
    pub policy: Option<PersonaPolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    pub cascade: Option<bool>,
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    pub loop_interval_ms: Option<u64>,
    pub seed_persona: Option<CreatePersonaRequest>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    pub enabled: bool,
    pub running: bool,
    pub loop_interval_ms: u64,
    pub started_at: Option<DateTime<Utc>>,
    pub last_tick_at: Option<DateTime<Utc>>,
    pub persona_count: usize,
    pub active_persona_count: usize,
    pub events_buffered: usize,
    pub snapshots_buffered: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    pub reaped_ids: Vec<String>,
    pub count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    pub persona_id: String,
    pub parent_id: Option<String>,
    pub children: Vec<String>,
    pub depth: u32,
    pub status: PersonaStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    pub nodes: Vec<LineageNode>,
    pub roots: Vec<String>,
    pub total_edges: usize,
}

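/// The four-step cognition cycle each active persona walks through: observe, reflect, test, compress.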
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    Observe,
    Reflect,
    Test,
    Compress,
}

impl ThoughtPhase {
    fn from_thought_count(thought_count: u64) -> Self {
        match thought_count % 4 {
            1 => Self::Observe,
            2 => Self::Reflect,
            3 => Self::Test,
            _ => Self::Compress,
        }
    }

    fn as_str(&self) -> &'static str {
        match self {
            Self::Observe => "observe",
            Self::Reflect => "reflect",
            Self::Test => "test",
            Self::Compress => "compress",
        }
    }

    fn event_type(&self) -> ThoughtEventType {
        match self {
            Self::Observe => ThoughtEventType::ThoughtGenerated,
            Self::Reflect => ThoughtEventType::HypothesisRaised,
            Self::Test => ThoughtEventType::CheckRequested,
            Self::Compress => ThoughtEventType::SnapshotCompressed,
        }
    }
}

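/// Per-persona work captured for one loop tick, snapshotted outside the persona lock.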
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    persona_id: String,
    persona_name: String,
    role: String,
    charter: String,
    swarm_id: Option<String>,
    thought_count: u64,
    phase: ThoughtPhase,
}

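/// Outcome of a single thought-generation attempt, whether model-backed or fallback text.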
#[derive(Debug, Clone)]
struct ThoughtResult {
    source: &'static str,
    model: Option<String>,
    finish_reason: Option<String>,
    thinking: String,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    total_tokens: Option<u32>,
    latency_ms: u128,
    error: Option<String>,
}

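/// Construction options for [`CognitionRuntime`].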
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    pub enabled: bool,
    pub loop_interval_ms: u64,
    pub max_events: usize,
    pub max_snapshots: usize,
    pub default_policy: PersonaPolicy,
}

impl Default for CognitionRuntimeOptions {
    fn default() -> Self {
        Self {
            enabled: false,
            loop_interval_ms: 2_000,
            max_events: 2_000,
            max_snapshots: 128,
            default_policy: PersonaPolicy::default(),
        }
    }
}

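/// Core runtime that owns persona state, drives the perpetual cognition loop,
/// and broadcasts thought events to subscribers.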
#[derive(Debug)]
pub struct CognitionRuntime {
    enabled: bool,
    max_events: usize,
    max_snapshots: usize,
    default_policy: PersonaPolicy,
    running: Arc<AtomicBool>,
    loop_interval_ms: Arc<RwLock<u64>>,
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    event_tx: broadcast::Sender<ThoughtEvent>,
    thinker: Option<Arc<ThinkerClient>>,
}

impl CognitionRuntime {
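    /// Build a runtime configured from `CODETETHER_COGNITION_*` environment variables,
    /// including an optional thinker backend.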
    pub fn new_from_env() -> Self {
        let mut options = CognitionRuntimeOptions::default();
        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);

        options.default_policy = PersonaPolicy {
            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
            token_credits_per_minute: env_u32(
                "CODETETHER_COGNITION_TOKEN_CREDITS_PER_MINUTE",
                20_000,
            ),
            cpu_credits_per_minute: env_u32("CODETETHER_COGNITION_CPU_CREDITS_PER_MINUTE", 10_000),
            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
        };

        let thinker_config = ThinkerConfig {
            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
            backend: thinker::ThinkerBackend::from_env(
                &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
                    .unwrap_or_else(|_| "openai_compat".to_string()),
            ),
            endpoint: normalize_thinker_endpoint(&std::env::var(
                "CODETETHER_COGNITION_THINKER_BASE_URL",
            )
            .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string())),
            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
                .ok()
                .and_then(|v| v.parse::<f32>().ok()),
            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
            timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH")
                .ok(),
            candle_tokenizer_path: std::env::var(
                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
            )
            .ok(),
            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
            candle_device: thinker::CandleDevicePreference::from_env(
                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
                    .unwrap_or_else(|_| "auto".to_string()),
            ),
            candle_cuda_ordinal: env_usize(
                "CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL",
                0,
            ),
            candle_repeat_penalty: env_f32(
                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
                1.1,
            ),
            candle_repeat_last_n: env_usize(
                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
                64,
            ),
            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
        };

        Self::new_with_options_and_thinker(options, Some(thinker_config))
    }

    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
        Self::new_with_options_and_thinker(options, None)
    }

    fn new_with_options_and_thinker(
        options: CognitionRuntimeOptions,
        thinker_config: Option<ThinkerConfig>,
    ) -> Self {
        let (event_tx, _) = broadcast::channel(options.max_events.max(16));
        let thinker = thinker_config.and_then(|cfg| {
            if !cfg.enabled {
                return None;
            }
            match ThinkerClient::new(cfg) {
                Ok(client) => {
                    tracing::info!(
                        backend = ?client.config().backend,
                        endpoint = %client.config().endpoint,
                        model = %client.config().model,
                        "Cognition thinker initialized"
                    );
                    Some(Arc::new(client))
                }
                Err(error) => {
                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
                    None
                }
            }
        });

        Self {
            enabled: options.enabled,
            max_events: options.max_events.max(32),
            max_snapshots: options.max_snapshots.max(8),
            default_policy: options.default_policy,
            running: Arc::new(AtomicBool::new(false)),
            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
            started_at: Arc::new(RwLock::new(None)),
            last_tick_at: Arc::new(RwLock::new(None)),
            personas: Arc::new(RwLock::new(HashMap::new())),
            proposals: Arc::new(RwLock::new(HashMap::new())),
            events: Arc::new(RwLock::new(VecDeque::new())),
            snapshots: Arc::new(RwLock::new(VecDeque::new())),
            loop_handle: Arc::new(Mutex::new(None)),
            event_tx,
            thinker,
        }
    }

    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
        self.event_tx.subscribe()
    }

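    /// Start the cognition loop, seeding a default persona when none exist yet.
    /// Starting an already-running loop is a no-op that simply returns the current status.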
    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
        if !self.enabled {
            return Err(anyhow!(
                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
            ));
        }

        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
        if let Some(request) = req {
            if let Some(interval) = request.loop_interval_ms {
                let mut lock = self.loop_interval_ms.write().await;
                *lock = interval.max(100);
            }
            requested_seed_persona = request.seed_persona;
        }

        let has_personas = !self.personas.read().await.is_empty();
        if !has_personas {
            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
            let _ = self.create_persona(seed).await?;
        }

        if self.running.load(Ordering::SeqCst) {
            return Ok(self.status().await);
        }

        self.running.store(true, Ordering::SeqCst);
        {
            let mut started = self.started_at.write().await;
            *started = Some(Utc::now());
        }

        let running = Arc::clone(&self.running);
        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
        let last_tick_at = Arc::clone(&self.last_tick_at);
        let personas = Arc::clone(&self.personas);
        let proposals = Arc::clone(&self.proposals);
        let events = Arc::clone(&self.events);
        let snapshots = Arc::clone(&self.snapshots);
        let max_events = self.max_events;
        let max_snapshots = self.max_snapshots;
        let event_tx = self.event_tx.clone();
        let thinker = self.thinker.clone();

        let handle = tokio::spawn(async move {
            let mut next_tick = Instant::now();
            while running.load(Ordering::SeqCst) {
                let now_instant = Instant::now();
                if now_instant < next_tick {
                    tokio::time::sleep_until(next_tick).await;
                }
                if !running.load(Ordering::SeqCst) {
                    break;
                }

                let now = Utc::now();
                {
                    let mut lock = last_tick_at.write().await;
                    *lock = Some(now);
                }

                let mut new_events = Vec::new();
                let mut new_snapshots = Vec::new();
                let mut new_proposals = Vec::new();

                let work_items: Vec<ThoughtWorkItem> = {
                    let mut map = personas.write().await;
                    let mut items = Vec::new();
                    for persona in map.values_mut() {
                        if persona.status != PersonaStatus::Active {
                            continue;
                        }

                        persona.thought_count = persona.thought_count.saturating_add(1);
                        persona.last_tick_at = Some(now);
                        persona.updated_at = now;
                        items.push(ThoughtWorkItem {
                            persona_id: persona.identity.id.clone(),
                            persona_name: persona.identity.name.clone(),
                            role: persona.identity.role.clone(),
                            charter: persona.identity.charter.clone(),
                            swarm_id: persona.identity.swarm_id.clone(),
                            thought_count: persona.thought_count,
                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
                        });
                    }
                    items
                };

                for work in work_items {
                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
                    let thought = generate_phase_thought(thinker.as_deref(), &work, &context).await;

                    let event_timestamp = Utc::now();
                    new_events.push(ThoughtEvent {
                        id: Uuid::new_v4().to_string(),
                        event_type: work.phase.event_type(),
                        persona_id: Some(work.persona_id.clone()),
                        swarm_id: work.swarm_id.clone(),
                        timestamp: event_timestamp,
                        payload: json!({
                            "phase": work.phase.as_str(),
                            "thought_count": work.thought_count,
                            "persona": {
                                "id": work.persona_id.clone(),
                                "name": work.persona_name.clone(),
                                "role": work.role.clone(),
                            },
                            "context_event_count": context.len(),
                            "thinking": thought.thinking.clone(),
                            "source": thought.source,
                            "model": thought.model.clone(),
                            "finish_reason": thought.finish_reason.clone(),
                            "usage": {
                                "prompt_tokens": thought.prompt_tokens,
                                "completion_tokens": thought.completion_tokens,
                                "total_tokens": thought.total_tokens,
                            },
                            "latency_ms": thought.latency_ms,
                            "error": thought.error.clone(),
                        }),
                    });

                    if work.phase == ThoughtPhase::Test {
                        new_events.push(ThoughtEvent {
                            id: Uuid::new_v4().to_string(),
                            event_type: ThoughtEventType::CheckResult,
                            persona_id: Some(work.persona_id.clone()),
                            swarm_id: work.swarm_id.clone(),
                            timestamp: Utc::now(),
                            payload: json!({
                                "phase": work.phase.as_str(),
                                "thought_count": work.thought_count,
                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
                                "source": thought.source,
                                "model": thought.model,
                            }),
                        });
                    }

                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
                        let proposal = Proposal {
                            id: Uuid::new_v4().to_string(),
                            persona_id: work.persona_id.clone(),
                            title: proposal_title_from_thought(&thought.thinking, work.thought_count),
                            rationale: trim_for_storage(&thought.thinking, 900),
                            evidence_refs: vec!["internal.thought_stream".to_string()],
                            risk: ProposalRisk::Low,
                            status: ProposalStatus::Created,
                            created_at: Utc::now(),
                        };

                        new_events.push(ThoughtEvent {
                            id: Uuid::new_v4().to_string(),
                            event_type: ThoughtEventType::ProposalCreated,
                            persona_id: Some(work.persona_id.clone()),
                            swarm_id: work.swarm_id.clone(),
                            timestamp: Utc::now(),
                            payload: json!({
                                "proposal_id": proposal.id,
                                "title": proposal.title,
                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
                                "source": thought.source,
                                "model": thought.model,
                            }),
                        });

                        new_proposals.push(proposal);
                    }

                    if work.phase == ThoughtPhase::Compress {
                        new_snapshots.push(MemorySnapshot {
                            id: Uuid::new_v4().to_string(),
                            generated_at: Utc::now(),
                            swarm_id: work.swarm_id.clone(),
                            persona_scope: vec![work.persona_id.clone()],
                            summary: trim_for_storage(&thought.thinking, 1_500),
                            hot_event_count: context.len(),
                            warm_fact_count: estimate_fact_count(&thought.thinking),
                            cold_snapshot_count: 1,
                            metadata: HashMap::from([
                                (
                                    "phase".to_string(),
                                    serde_json::Value::String(work.phase.as_str().to_string()),
                                ),
                                (
                                    "role".to_string(),
                                    serde_json::Value::String(work.role.clone()),
                                ),
                                (
                                    "source".to_string(),
                                    serde_json::Value::String(thought.source.to_string()),
                                ),
                                (
                                    "model".to_string(),
                                    serde_json::Value::String(
                                        thought
                                            .model
                                            .clone()
                                            .unwrap_or_else(|| "fallback".to_string()),
                                    ),
                                ),
                                (
                                    "completion_tokens".to_string(),
                                    serde_json::Value::Number(
                                        serde_json::Number::from(
                                            thought.completion_tokens.unwrap_or(0) as u64
                                        ),
                                    ),
                                ),
                            ]),
                        });
                    }
                }

                if !new_proposals.is_empty() {
                    let mut proposal_store = proposals.write().await;
                    for proposal in new_proposals {
                        proposal_store.insert(proposal.id.clone(), proposal);
                    }
                }

                for event in new_events {
                    push_event_internal(&events, max_events, &event_tx, event).await;
                }
                for snapshot in new_snapshots {
                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
                }

                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
                next_tick += interval;
                let tick_completed = Instant::now();
                if tick_completed > next_tick {
                    next_tick = tick_completed;
                }
            }
        });

        {
            let mut lock = self.loop_handle.lock().await;
            *lock = Some(handle);
        }

        Ok(self.status().await)
    }

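    /// Stop the cognition loop, aborting the background task and recording an optional reason.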
    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
        self.running.store(false, Ordering::SeqCst);

        if let Some(handle) = self.loop_handle.lock().await.take() {
            handle.abort();
            let _ = handle.await;
        }

        if let Some(reason_value) = reason {
            let event = ThoughtEvent {
                id: Uuid::new_v4().to_string(),
                event_type: ThoughtEventType::CheckResult,
                persona_id: None,
                swarm_id: None,
                timestamp: Utc::now(),
                payload: json!({ "stopped": true, "reason": reason_value }),
            };
            self.push_event(event).await;
        }

        Ok(self.status().await)
    }

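    /// Create a persona, enforcing the parent's branching and spawn-depth limits and
    /// inheriting its policy and swarm when none are supplied.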
    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
        let now = Utc::now();
        let mut personas = self.personas.write().await;

        let mut parent_swarm_id = None;
        let mut computed_depth = 0_u32;
        let mut inherited_policy = None;

        if let Some(parent_id) = req.parent_id.clone() {
            let parent = personas
                .get(&parent_id)
                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;

            if parent.status == PersonaStatus::Reaped {
                return Err(anyhow!("Parent persona {} is reaped", parent_id));
            }

            parent_swarm_id = parent.identity.swarm_id.clone();
            computed_depth = parent.identity.depth.saturating_add(1);
            inherited_policy = Some(parent.policy.clone());
            let branch_limit = parent.policy.max_branching_factor;

            let child_count = personas
                .values()
                .filter(|p| {
                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
                        && p.status != PersonaStatus::Reaped
                })
                .count();

            if child_count as u32 >= branch_limit {
                return Err(anyhow!(
                    "Parent {} reached branching limit {}",
                    parent_id,
                    branch_limit
                ));
            }
        }

        let policy = req
            .policy
            .clone()
            .or(inherited_policy.clone())
            .unwrap_or_else(|| self.default_policy.clone());

        let effective_depth_limit = inherited_policy
            .as_ref()
            .map(|p| p.max_spawn_depth)
            .unwrap_or(policy.max_spawn_depth);

        if computed_depth > effective_depth_limit {
            return Err(anyhow!(
                "Spawn depth {} exceeds limit {}",
                computed_depth,
                effective_depth_limit
            ));
        }

        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        if personas.contains_key(&persona_id) {
            return Err(anyhow!("Persona id already exists: {}", persona_id));
        }

        let identity = PersonaIdentity {
            id: persona_id.clone(),
            name: req.name,
            role: req.role,
            charter: req.charter,
            swarm_id: req.swarm_id.or(parent_swarm_id),
            parent_id: req.parent_id,
            depth: computed_depth,
            created_at: now,
        };

        let persona = PersonaRuntimeState {
            identity,
            policy,
            status: PersonaStatus::Active,
            thought_count: 0,
            last_tick_at: None,
            updated_at: now,
        };

        personas.insert(persona_id, persona.clone());
        drop(personas);

        self.push_event(ThoughtEvent {
            id: Uuid::new_v4().to_string(),
            event_type: ThoughtEventType::PersonaSpawned,
            persona_id: Some(persona.identity.id.clone()),
            swarm_id: persona.identity.swarm_id.clone(),
            timestamp: now,
            payload: json!({
                "name": persona.identity.name,
                "role": persona.identity.role,
                "depth": persona.identity.depth,
            }),
        })
        .await;

        Ok(persona)
    }

    pub async fn spawn_child(
        &self,
        parent_id: &str,
        req: SpawnPersonaRequest,
    ) -> Result<PersonaRuntimeState> {
        let request = CreatePersonaRequest {
            persona_id: req.persona_id,
            name: req.name,
            role: req.role,
            charter: req.charter,
            swarm_id: req.swarm_id,
            parent_id: Some(parent_id.to_string()),
            policy: req.policy,
        };
        self.create_persona(request).await
    }

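    /// Mark a persona as reaped, optionally cascading to all transitive descendants.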
    pub async fn reap_persona(
        &self,
        persona_id: &str,
        req: ReapPersonaRequest,
    ) -> Result<ReapPersonaResponse> {
        let cascade = req.cascade.unwrap_or(false);
        let now = Utc::now();

        let mut personas = self.personas.write().await;
        if !personas.contains_key(persona_id) {
            return Err(anyhow!("Persona not found: {}", persona_id));
        }

        let mut reaped_ids = vec![persona_id.to_string()];
        if cascade {
            let mut idx = 0usize;
            while idx < reaped_ids.len() {
                let current = reaped_ids[idx].clone();
                let children: Vec<String> = personas
                    .values()
                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
                    .map(|p| p.identity.id.clone())
                    .collect();
                for child in children {
                    if !reaped_ids.iter().any(|existing| existing == &child) {
                        reaped_ids.push(child);
                    }
                }
                idx += 1;
            }
        }

        for id in &reaped_ids {
            if let Some(persona) = personas.get_mut(id) {
                persona.status = PersonaStatus::Reaped;
                persona.updated_at = now;
            }
        }
        drop(personas);

        for id in &reaped_ids {
            self.push_event(ThoughtEvent {
                id: Uuid::new_v4().to_string(),
                event_type: ThoughtEventType::PersonaReaped,
                persona_id: Some(id.clone()),
                swarm_id: None,
                timestamp: now,
                payload: json!({
                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
                    "cascade": cascade,
                }),
            })
            .await;
        }

        Ok(ReapPersonaResponse {
            count: reaped_ids.len(),
            reaped_ids,
        })
    }

    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
        self.snapshots.read().await.back().cloned()
    }

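    /// Build a parent/child lineage graph over all known personas, including reaped ones.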
    pub async fn lineage_graph(&self) -> LineageGraph {
        let personas = self.personas.read().await;
        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
        let mut roots = Vec::new();
        let mut total_edges = 0usize;

        for persona in personas.values() {
            if let Some(parent_id) = persona.identity.parent_id.clone() {
                children_by_parent
                    .entry(parent_id)
                    .or_default()
                    .push(persona.identity.id.clone());
                total_edges = total_edges.saturating_add(1);
            } else {
                roots.push(persona.identity.id.clone());
            }
        }

        let mut nodes: Vec<LineageNode> = personas
            .values()
            .map(|persona| {
                let mut children = children_by_parent
                    .get(&persona.identity.id)
                    .cloned()
                    .unwrap_or_default();
                children.sort();

                LineageNode {
                    persona_id: persona.identity.id.clone(),
                    parent_id: persona.identity.parent_id.clone(),
                    children,
                    depth: persona.identity.depth,
                    status: persona.status,
                }
            })
            .collect();

        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
        roots.sort();

        LineageGraph {
            nodes,
            roots,
            total_edges,
        }
    }

    pub async fn status(&self) -> CognitionStatus {
        let personas = self.personas.read().await;
        let events = self.events.read().await;
        let snapshots = self.snapshots.read().await;
        let started_at = *self.started_at.read().await;
        let last_tick_at = *self.last_tick_at.read().await;
        let loop_interval_ms = *self.loop_interval_ms.read().await;

        let active_persona_count = personas
            .values()
            .filter(|p| p.status == PersonaStatus::Active)
            .count();

        CognitionStatus {
            enabled: self.enabled,
            running: self.running.load(Ordering::SeqCst),
            loop_interval_ms,
            started_at,
            last_tick_at,
            persona_count: personas.len(),
            active_persona_count,
            events_buffered: events.len(),
            snapshots_buffered: snapshots.len(),
        }
    }

    async fn push_event(&self, event: ThoughtEvent) {
        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
    }
}

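/// Append an event to the bounded in-memory buffer and broadcast it to subscribers.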
async fn push_event_internal(
    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
    max_events: usize,
    event_tx: &broadcast::Sender<ThoughtEvent>,
    event: ThoughtEvent,
) {
    {
        let mut lock = events.write().await;
        lock.push_back(event.clone());
        while lock.len() > max_events {
            lock.pop_front();
        }
    }
    let _ = event_tx.send(event);
}

async fn push_snapshot_internal(
    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
    max_snapshots: usize,
    snapshot: MemorySnapshot,
) {
    let mut lock = snapshots.write().await;
    lock.push_back(snapshot);
    while lock.len() > max_snapshots {
        lock.pop_front();
    }
}

async fn recent_persona_context(
    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
    persona_id: &str,
    limit: usize,
) -> Vec<ThoughtEvent> {
    let lock = events.read().await;
    let mut selected: Vec<ThoughtEvent> = lock
        .iter()
        .rev()
        .filter(|event| {
            event.persona_id.as_deref() == Some(persona_id)
                || (event.persona_id.is_none()
                    && matches!(
                        event.event_type,
                        ThoughtEventType::CheckResult
                            | ThoughtEventType::ProposalCreated
                            | ThoughtEventType::SnapshotCompressed
                    ))
        })
        .take(limit)
        .cloned()
        .collect();
    selected.reverse();
    selected
}

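/// Generate the thought text for one work item, preferring the configured thinker and
/// falling back to deterministic template text when it is unavailable or errors.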
async fn generate_phase_thought(
    thinker: Option<&ThinkerClient>,
    work: &ThoughtWorkItem,
    context: &[ThoughtEvent],
) -> ThoughtResult {
    let started_at = Instant::now();
    if let Some(client) = thinker {
        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
        match client.think(&system_prompt, &user_prompt).await {
            Ok(output) => {
                let thinking = trim_for_storage(&output.text, 2_000);
                if !thinking.is_empty() {
                    return ThoughtResult {
                        source: "model",
                        model: Some(output.model),
                        finish_reason: output.finish_reason,
                        thinking,
                        prompt_tokens: output.prompt_tokens,
                        completion_tokens: output.completion_tokens,
                        total_tokens: output.total_tokens,
                        latency_ms: started_at.elapsed().as_millis(),
                        error: None,
                    };
                }
            }
            Err(error) => {
                return ThoughtResult {
                    source: "fallback",
                    model: None,
                    finish_reason: None,
                    thinking: fallback_phase_text(work, context),
                    prompt_tokens: None,
                    completion_tokens: None,
                    total_tokens: None,
                    latency_ms: started_at.elapsed().as_millis(),
                    error: Some(error.to_string()),
                };
            }
        }
    }

    ThoughtResult {
        source: "fallback",
        model: None,
        finish_reason: None,
        thinking: fallback_phase_text(work, context),
        prompt_tokens: None,
        completion_tokens: None,
        total_tokens: None,
        latency_ms: started_at.elapsed().as_millis(),
        error: None,
    }
}

fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
Respond with concise plain text only. Do not include markdown, XML, or code fences. \
Focus on reasoning quality, uncertainty, and actionable checks."
        .to_string();

    let context_lines = if context.is_empty() {
        "none".to_string()
    } else {
        context
            .iter()
            .map(format_context_event)
            .collect::<Vec<_>>()
            .join("\n")
    };

    let phase_instruction = match work.phase {
        ThoughtPhase::Observe => {
            "Observe current state. Identify 1-3 concrete signals and one uncertainty."
        }
        ThoughtPhase::Reflect => {
            "Reflect on observed signals. Produce one hypothesis, rationale, and risk note."
        }
        ThoughtPhase::Test => {
            "Design and evaluate one check. Include expected result and evidence quality."
        }
        ThoughtPhase::Compress => {
            "Compress the latest cognition into a short state summary and retained facts."
        }
    };

    let user_prompt = format!(
        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
        phase = work.phase.as_str(),
        persona_id = work.persona_id,
        persona_name = work.persona_name,
        role = work.role,
        charter = work.charter,
        thought_count = work.thought_count,
        context = context_lines,
        instruction = phase_instruction
    );

    (system_prompt, user_prompt)
}

fn format_context_event(event: &ThoughtEvent) -> String {
    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
    format!(
        "{} {} {}",
        event.event_type.as_str(),
        event.timestamp.to_rfc3339(),
        trim_for_storage(&payload, 220)
    )
}

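/// Deterministic fallback thought text used when no thinker is configured or the model call fails.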
fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
    let charter = trim_for_storage(&work.charter, 180);
    let context_summary = fallback_context_summary(context);
    let thought = match work.phase {
        ThoughtPhase::Observe => format!(
            "Observation {}: I am monitoring this codebase with a business-first reliability lens. \
Role: {}. Charter focus: {}. {} Next focus: identify customer-impacting incidents, degraded flows, \
and recent operational regressions before they become outages.",
            work.thought_count, work.role, charter, context_summary
        ),
        ThoughtPhase::Reflect => format!(
            "Reflection {}: Based on current signals, the most plausible risk areas are runtime stability, \
deployment safety, and dependency/provider availability. {} Business impact if unresolved: service downtime, \
SLA breaches, and loss of customer trust. Confidence: low-to-medium until additional checks are gathered.",
            work.thought_count, context_summary
        ),
        ThoughtPhase::Test => format!(
            "Check plan {}: run targeted verification on service health, recent errors, and change history. \
{} Expected evidence: clear pass/fail indicators for each suspected fault domain. Decision rule: escalate \
immediately on repeated failures in customer-facing paths.",
            work.thought_count, context_summary
        ),
        ThoughtPhase::Compress => format!(
            "Operational summary {}: system remains under active reliability monitoring with business-priority focus. \
{} Immediate next actions: keep triage tight, capture repeatable diagnostics, and convert confirmed findings into \
concrete remediation steps for engineering and operations.",
            work.thought_count, context_summary
        ),
    };
    trim_for_storage(&thought, 1_200)
}

fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
    if context.is_empty() {
        return "No prior events recorded yet.".to_string();
    }

    let mut latest_error: Option<String> = None;
    let mut latest_proposal: Option<String> = None;
    let mut latest_check: Option<String> = None;

    for event in context.iter().rev() {
        if latest_error.is_none()
            && let Some(error) = event.payload.get("error").and_then(serde_json::Value::as_str)
            && !error.trim().is_empty()
        {
            latest_error = Some(trim_for_storage(error, 140));
        }

        if latest_proposal.is_none()
            && event.event_type == ThoughtEventType::ProposalCreated
            && let Some(title) = event.payload.get("title").and_then(serde_json::Value::as_str)
            && !title.trim().is_empty()
        {
            latest_proposal = Some(trim_for_storage(title, 120));
        }

        if latest_check.is_none()
            && event.event_type == ThoughtEventType::CheckResult
            && let Some(result) = event
                .payload
                .get("result_excerpt")
                .and_then(serde_json::Value::as_str)
            && !result.trim().is_empty()
        {
            latest_check = Some(trim_for_storage(result, 140));
        }

        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
            break;
        }
    }

    let mut lines = vec![format!("{} recent cognition events are available.", context.len())];
    if let Some(error) = latest_error {
        lines.push(format!("Latest error signal: {}.", error));
    }
    if let Some(proposal) = latest_proposal {
        lines.push(format!("Recent proposal: {}.", proposal));
    }
    if let Some(check) = latest_check {
        lines.push(format!("Recent check: {}.", check));
    }
    lines.join(" ")
}

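/// Trim a string to at most `max_chars` characters, appending an ellipsis when truncated.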
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    if input.chars().count() <= max_chars {
        return input.trim().to_string();
    }
    let mut trimmed = String::with_capacity(max_chars + 8);
    for ch in input.chars().take(max_chars) {
        trimmed.push(ch);
    }
    trimmed.push_str("...");
    trimmed.trim().to_string()
}

fn estimate_fact_count(text: &str) -> usize {
    let sentence_count =
        text.matches('.').count() + text.matches('!').count() + text.matches('?').count();
    sentence_count.clamp(1, 12)
}

fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
    let first_line = thought
        .lines()
        .find(|line| !line.trim().is_empty())
        .unwrap_or("proposal");
    let compact = first_line
        .replace(['\t', '\r', '\n'], " ")
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");
    let trimmed = trim_for_storage(&compact, 72);
    if trimmed.is_empty() {
        format!("proposal-{}", thought_count)
    } else {
        trimmed
    }
}

fn default_seed_persona() -> CreatePersonaRequest {
    CreatePersonaRequest {
        persona_id: Some("root-thinker".to_string()),
        name: "root-thinker".to_string(),
        role: "orchestrator".to_string(),
        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
            .to_string(),
        swarm_id: Some("swarm-core".to_string()),
        parent_id: None,
        policy: None,
    }
}

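/// Normalize an OpenAI-compatible base URL into a full `/chat/completions` endpoint.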
fn normalize_thinker_endpoint(base_url: &str) -> String {
    let trimmed = base_url.trim().trim_end_matches('/');
    if trimmed.ends_with("/chat/completions") {
        return trimmed.to_string();
    }
    if trimmed.is_empty() {
        return "http://127.0.0.1:11434/v1/chat/completions".to_string();
    }
    format!("{}/chat/completions", trimmed)
}

fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        .and_then(|v| match v.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            _ => None,
        })
        .unwrap_or(default)
}

fn env_f32(name: &str, default: f32) -> f32 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<f32>().ok())
        .unwrap_or(default)
}

fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(default)
}

fn env_u32(name: &str, default: u32) -> u32 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<u32>().ok())
        .unwrap_or(default)
}

fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(default)
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_runtime() -> CognitionRuntime {
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: PersonaPolicy {
                max_spawn_depth: 2,
                max_branching_factor: 2,
                token_credits_per_minute: 1_000,
                cpu_credits_per_minute: 1_000,
                idle_ttl_secs: 300,
                share_memory: false,
            },
        })
    }

    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(CreatePersonaRequest {
                persona_id: Some("root".to_string()),
                name: "root".to_string(),
                role: "orchestrator".to_string(),
                charter: "coordinate".to_string(),
                swarm_id: Some("swarm-a".to_string()),
                parent_id: None,
                policy: None,
            })
            .await
            .expect("root should be created");

        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("child-1".to_string()),
                    name: "child-1".to_string(),
                    role: "analyst".to_string(),
                    charter: "analyze".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("child should spawn");

        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(CreatePersonaRequest {
                persona_id: Some("root".to_string()),
                name: "root".to_string(),
                role: "orchestrator".to_string(),
                charter: "coordinate".to_string(),
                swarm_id: Some("swarm-a".to_string()),
                parent_id: None,
                policy: None,
            })
            .await
            .expect("root should be created");

        runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c1".to_string()),
                    name: "c1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("first child should spawn");

        runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c2".to_string()),
                    name: "c2".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("second child should spawn");

        let third_child = runtime
            .spawn_child(
                "root",
                SpawnPersonaRequest {
                    persona_id: Some("c3".to_string()),
                    name: "c3".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await;
        assert!(third_child.is_err());

        runtime
            .spawn_child(
                "c1",
                SpawnPersonaRequest {
                    persona_id: Some("c1-1".to_string()),
                    name: "c1-1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await
            .expect("depth 2 should be allowed");

        let depth_violation = runtime
            .spawn_child(
                "c1-1",
                SpawnPersonaRequest {
                    persona_id: Some("c1-1-1".to_string()),
                    name: "c1-1-1".to_string(),
                    role: "worker".to_string(),
                    charter: "run".to_string(),
                    swarm_id: None,
                    policy: None,
                },
            )
            .await;
        assert!(depth_violation.is_err());
    }

    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        runtime
            .start(Some(StartCognitionRequest {
                loop_interval_ms: Some(10),
                seed_persona: Some(CreatePersonaRequest {
                    persona_id: Some("seed".to_string()),
                    name: "seed".to_string(),
                    role: "watcher".to_string(),
                    charter: "observe".to_string(),
                    swarm_id: Some("swarm-seed".to_string()),
                    parent_id: None,
                    policy: None,
                }),
            }))
            .await
            .expect("runtime should start");

        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }
}