Skip to main content

codetether_agent/cognition/
mod.rs

1//! Perpetual cognition runtime for persona swarms.
2//!
3//! Phase 0 scope:
4//! - Contract types for personas, thought events, proposals, and snapshots
5//! - In-memory runtime manager for lifecycle + lineage
6//! - Feature-flagged perpetual loop (no external execution side effects)
7
8mod thinker;
9
10#[cfg(feature = "functiongemma")]
11pub mod tool_router;
12
13pub use thinker::{CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput};
14
15use anyhow::{Result, anyhow};
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::time::Duration;
23use tokio::sync::{Mutex, RwLock, broadcast};
24use tokio::task::JoinHandle;
25use tokio::time::Instant;
26use uuid::Uuid;
27
28/// Persona execution status.
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum PersonaStatus {
32    Active,
33    Idle,
34    Reaped,
35    Error,
36}
37
38/// Policy boundaries for a persona.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PersonaPolicy {
41    pub max_spawn_depth: u32,
42    pub max_branching_factor: u32,
43    pub token_credits_per_minute: u32,
44    pub cpu_credits_per_minute: u32,
45    pub idle_ttl_secs: u64,
46    pub share_memory: bool,
47}
48
49impl Default for PersonaPolicy {
50    fn default() -> Self {
51        Self {
52            max_spawn_depth: 4,
53            max_branching_factor: 4,
54            token_credits_per_minute: 20_000,
55            cpu_credits_per_minute: 10_000,
56            idle_ttl_secs: 3_600,
57            share_memory: false,
58        }
59    }
60}
61
62/// Identity contract for a persona.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct PersonaIdentity {
65    pub id: String,
66    pub name: String,
67    pub role: String,
68    pub charter: String,
69    pub swarm_id: Option<String>,
70    pub parent_id: Option<String>,
71    pub depth: u32,
72    pub created_at: DateTime<Utc>,
73}
74
75/// Full runtime state for a persona.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct PersonaRuntimeState {
78    pub identity: PersonaIdentity,
79    pub policy: PersonaPolicy,
80    pub status: PersonaStatus,
81    pub thought_count: u64,
82    pub last_tick_at: Option<DateTime<Utc>>,
83    pub updated_at: DateTime<Utc>,
84}
85
86/// Proposal risk level.
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub enum ProposalRisk {
90    Low,
91    Medium,
92    High,
93    Critical,
94}
95
96/// Proposal status.
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "snake_case")]
99pub enum ProposalStatus {
100    Created,
101    Verified,
102    Rejected,
103    Executed,
104}
105
106/// Proposal contract (think first, execute through gates).
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct Proposal {
109    pub id: String,
110    pub persona_id: String,
111    pub title: String,
112    pub rationale: String,
113    pub evidence_refs: Vec<String>,
114    pub risk: ProposalRisk,
115    pub status: ProposalStatus,
116    pub created_at: DateTime<Utc>,
117}
118
119/// Thought/event types emitted by the cognition loop.
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
121#[serde(rename_all = "snake_case")]
122pub enum ThoughtEventType {
123    ThoughtGenerated,
124    HypothesisRaised,
125    CheckRequested,
126    CheckResult,
127    ProposalCreated,
128    ProposalVerified,
129    ProposalRejected,
130    ActionExecuted,
131    PersonaSpawned,
132    PersonaReaped,
133    SnapshotCompressed,
134}
135
136impl ThoughtEventType {
137    fn as_str(&self) -> &'static str {
138        match self {
139            Self::ThoughtGenerated => "thought_generated",
140            Self::HypothesisRaised => "hypothesis_raised",
141            Self::CheckRequested => "check_requested",
142            Self::CheckResult => "check_result",
143            Self::ProposalCreated => "proposal_created",
144            Self::ProposalVerified => "proposal_verified",
145            Self::ProposalRejected => "proposal_rejected",
146            Self::ActionExecuted => "action_executed",
147            Self::PersonaSpawned => "persona_spawned",
148            Self::PersonaReaped => "persona_reaped",
149            Self::SnapshotCompressed => "snapshot_compressed",
150        }
151    }
152}
153
154/// Streamable thought event contract.
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct ThoughtEvent {
157    pub id: String,
158    pub event_type: ThoughtEventType,
159    pub persona_id: Option<String>,
160    pub swarm_id: Option<String>,
161    pub timestamp: DateTime<Utc>,
162    pub payload: serde_json::Value,
163}
164
165/// Distilled memory snapshot contract.
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct MemorySnapshot {
168    pub id: String,
169    pub generated_at: DateTime<Utc>,
170    pub swarm_id: Option<String>,
171    pub persona_scope: Vec<String>,
172    pub summary: String,
173    pub hot_event_count: usize,
174    pub warm_fact_count: usize,
175    pub cold_snapshot_count: usize,
176    pub metadata: HashMap<String, serde_json::Value>,
177}
178
179/// Request payload for creating a persona.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct CreatePersonaRequest {
182    pub persona_id: Option<String>,
183    pub name: String,
184    pub role: String,
185    pub charter: String,
186    pub swarm_id: Option<String>,
187    pub parent_id: Option<String>,
188    pub policy: Option<PersonaPolicy>,
189}
190
191/// Request payload for spawning a child persona.
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct SpawnPersonaRequest {
194    pub persona_id: Option<String>,
195    pub name: String,
196    pub role: String,
197    pub charter: String,
198    pub swarm_id: Option<String>,
199    pub policy: Option<PersonaPolicy>,
200}
201
202/// Request payload for reaping persona(s).
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct ReapPersonaRequest {
205    pub cascade: Option<bool>,
206    pub reason: Option<String>,
207}
208
209/// Start-control payload for perpetual cognition.
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct StartCognitionRequest {
212    pub loop_interval_ms: Option<u64>,
213    pub seed_persona: Option<CreatePersonaRequest>,
214}
215
216/// Stop-control payload for perpetual cognition.
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct StopCognitionRequest {
219    pub reason: Option<String>,
220}
221
222/// Start/stop response status.
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct CognitionStatus {
225    pub enabled: bool,
226    pub running: bool,
227    pub loop_interval_ms: u64,
228    pub started_at: Option<DateTime<Utc>>,
229    pub last_tick_at: Option<DateTime<Utc>>,
230    pub persona_count: usize,
231    pub active_persona_count: usize,
232    pub events_buffered: usize,
233    pub snapshots_buffered: usize,
234}
235
236/// Reap response.
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ReapPersonaResponse {
239    pub reaped_ids: Vec<String>,
240    pub count: usize,
241}
242
243/// Lineage node response.
244#[derive(Debug, Clone, Serialize, Deserialize)]
245pub struct LineageNode {
246    pub persona_id: String,
247    pub parent_id: Option<String>,
248    pub children: Vec<String>,
249    pub depth: u32,
250    pub status: PersonaStatus,
251}
252
253/// Lineage graph response.
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct LineageGraph {
256    pub nodes: Vec<LineageNode>,
257    pub roots: Vec<String>,
258    pub total_edges: usize,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
262enum ThoughtPhase {
263    Observe,
264    Reflect,
265    Test,
266    Compress,
267}
268
269impl ThoughtPhase {
270    fn from_thought_count(thought_count: u64) -> Self {
271        match thought_count % 4 {
272            1 => Self::Observe,
273            2 => Self::Reflect,
274            3 => Self::Test,
275            _ => Self::Compress,
276        }
277    }
278
279    fn as_str(&self) -> &'static str {
280        match self {
281            Self::Observe => "observe",
282            Self::Reflect => "reflect",
283            Self::Test => "test",
284            Self::Compress => "compress",
285        }
286    }
287
288    fn event_type(&self) -> ThoughtEventType {
289        match self {
290            Self::Observe => ThoughtEventType::ThoughtGenerated,
291            Self::Reflect => ThoughtEventType::HypothesisRaised,
292            Self::Test => ThoughtEventType::CheckRequested,
293            Self::Compress => ThoughtEventType::SnapshotCompressed,
294        }
295    }
296}
297
298#[derive(Debug, Clone)]
299struct ThoughtWorkItem {
300    persona_id: String,
301    persona_name: String,
302    role: String,
303    charter: String,
304    swarm_id: Option<String>,
305    thought_count: u64,
306    phase: ThoughtPhase,
307}
308
/// Outcome of generating one phase thought, copied into event payloads and snapshots.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Static label of what produced the thought (emitted as `source`).
    source: &'static str,
    /// Model name, if a model produced the thought.
    model: Option<String>,
    /// Finish reason reported by the backend, if any.
    finish_reason: Option<String>,
    /// The generated text.
    thinking: String,
    /// Prompt token usage, if reported.
    prompt_tokens: Option<u32>,
    /// Completion token usage, if reported.
    completion_tokens: Option<u32>,
    /// Total token usage, if reported.
    total_tokens: Option<u32>,
    /// Generation latency in milliseconds.
    latency_ms: u128,
    /// Error text when generation failed, if any.
    error: Option<String>,
}
321
322/// Runtime options for cognition manager.
323#[derive(Debug, Clone)]
324pub struct CognitionRuntimeOptions {
325    pub enabled: bool,
326    pub loop_interval_ms: u64,
327    pub max_events: usize,
328    pub max_snapshots: usize,
329    pub default_policy: PersonaPolicy,
330}
331
332impl Default for CognitionRuntimeOptions {
333    fn default() -> Self {
334        Self {
335            enabled: false,
336            loop_interval_ms: 2_000,
337            max_events: 2_000,
338            max_snapshots: 128,
339            default_policy: PersonaPolicy::default(),
340        }
341    }
342}
343
344/// In-memory cognition runtime for perpetual persona swarms.
345#[derive(Debug)]
346pub struct CognitionRuntime {
347    enabled: bool,
348    max_events: usize,
349    max_snapshots: usize,
350    default_policy: PersonaPolicy,
351    running: Arc<AtomicBool>,
352    loop_interval_ms: Arc<RwLock<u64>>,
353    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
354    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
355    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
356    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
357    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
358    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
359    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
360    event_tx: broadcast::Sender<ThoughtEvent>,
361    thinker: Option<Arc<ThinkerClient>>,
362}
363
364impl CognitionRuntime {
365    /// Build runtime from environment feature flags.
366    pub fn new_from_env() -> Self {
367        let mut options = CognitionRuntimeOptions::default();
368        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
369        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
370        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
371        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
372
373        options.default_policy = PersonaPolicy {
374            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
375            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
376            token_credits_per_minute: env_u32(
377                "CODETETHER_COGNITION_TOKEN_CREDITS_PER_MINUTE",
378                20_000,
379            ),
380            cpu_credits_per_minute: env_u32("CODETETHER_COGNITION_CPU_CREDITS_PER_MINUTE", 10_000),
381            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
382            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
383        };
384
385        let thinker_config = ThinkerConfig {
386            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
387            backend: thinker::ThinkerBackend::from_env(
388                &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
389                    .unwrap_or_else(|_| "openai_compat".to_string()),
390            ),
391            endpoint: normalize_thinker_endpoint(&std::env::var(
392                "CODETETHER_COGNITION_THINKER_BASE_URL",
393            )
394            .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string())),
395            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
396                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
397            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
398            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
399            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
400                .ok()
401                .and_then(|v| v.parse::<f32>().ok()),
402            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
403            timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
404            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH")
405                .ok(),
406            candle_tokenizer_path: std::env::var(
407                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
408            )
409            .ok(),
410            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
411            candle_device: thinker::CandleDevicePreference::from_env(
412                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
413                    .unwrap_or_else(|_| "auto".to_string()),
414            ),
415            candle_cuda_ordinal: env_usize(
416                "CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL",
417                0,
418            ),
419            candle_repeat_penalty: env_f32(
420                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
421                1.1,
422            ),
423            candle_repeat_last_n: env_usize(
424                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
425                64,
426            ),
427            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
428        };
429
430        Self::new_with_options_and_thinker(options, Some(thinker_config))
431    }
432
433    /// Build runtime from explicit options.
434    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
435        Self::new_with_options_and_thinker(options, None)
436    }
437
438    fn new_with_options_and_thinker(
439        options: CognitionRuntimeOptions,
440        thinker_config: Option<ThinkerConfig>,
441    ) -> Self {
442        let (event_tx, _) = broadcast::channel(options.max_events.max(16));
443        let thinker = thinker_config.and_then(|cfg| {
444            if !cfg.enabled {
445                return None;
446            }
447            match ThinkerClient::new(cfg) {
448                Ok(client) => {
449                    tracing::info!(
450                        backend = ?client.config().backend,
451                        endpoint = %client.config().endpoint,
452                        model = %client.config().model,
453                        "Cognition thinker initialized"
454                    );
455                    Some(Arc::new(client))
456                }
457                Err(error) => {
458                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
459                    None
460                }
461            }
462        });
463
464        Self {
465            enabled: options.enabled,
466            max_events: options.max_events.max(32),
467            max_snapshots: options.max_snapshots.max(8),
468            default_policy: options.default_policy,
469            running: Arc::new(AtomicBool::new(false)),
470            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
471            started_at: Arc::new(RwLock::new(None)),
472            last_tick_at: Arc::new(RwLock::new(None)),
473            personas: Arc::new(RwLock::new(HashMap::new())),
474            proposals: Arc::new(RwLock::new(HashMap::new())),
475            events: Arc::new(RwLock::new(VecDeque::new())),
476            snapshots: Arc::new(RwLock::new(VecDeque::new())),
477            loop_handle: Arc::new(Mutex::new(None)),
478            event_tx,
479            thinker,
480        }
481    }
482
483    /// Whether cognition is enabled by feature flag.
484    pub fn is_enabled(&self) -> bool {
485        self.enabled
486    }
487
488    /// Subscribe to thought events for streaming.
489    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
490        self.event_tx.subscribe()
491    }
492
493    /// Start the perpetual cognition loop.
494    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
495        if !self.enabled {
496            return Err(anyhow!(
497                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
498            ));
499        }
500
501        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
502        if let Some(request) = req {
503            if let Some(interval) = request.loop_interval_ms {
504                let mut lock = self.loop_interval_ms.write().await;
505                *lock = interval.max(100);
506            }
507            requested_seed_persona = request.seed_persona;
508        }
509
510        let has_personas = !self.personas.read().await.is_empty();
511        if !has_personas {
512            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
513            let _ = self.create_persona(seed).await?;
514        }
515
516        if self.running.load(Ordering::SeqCst) {
517            return Ok(self.status().await);
518        }
519
520        self.running.store(true, Ordering::SeqCst);
521        {
522            let mut started = self.started_at.write().await;
523            *started = Some(Utc::now());
524        }
525
526        let running = Arc::clone(&self.running);
527        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
528        let last_tick_at = Arc::clone(&self.last_tick_at);
529        let personas = Arc::clone(&self.personas);
530        let proposals = Arc::clone(&self.proposals);
531        let events = Arc::clone(&self.events);
532        let snapshots = Arc::clone(&self.snapshots);
533        let max_events = self.max_events;
534        let max_snapshots = self.max_snapshots;
535        let event_tx = self.event_tx.clone();
536        let thinker = self.thinker.clone();
537
538        let handle = tokio::spawn(async move {
539            let mut next_tick = Instant::now();
540            while running.load(Ordering::SeqCst) {
541                let now_instant = Instant::now();
542                if now_instant < next_tick {
543                    tokio::time::sleep_until(next_tick).await;
544                }
545                if !running.load(Ordering::SeqCst) {
546                    break;
547                }
548
549                let now = Utc::now();
550                {
551                    let mut lock = last_tick_at.write().await;
552                    *lock = Some(now);
553                }
554
555                let mut new_events = Vec::new();
556                let mut new_snapshots = Vec::new();
557                let mut new_proposals = Vec::new();
558
559                let work_items: Vec<ThoughtWorkItem> = {
560                    let mut map = personas.write().await;
561                    let mut items = Vec::new();
562                    for persona in map.values_mut() {
563                        if persona.status != PersonaStatus::Active {
564                            continue;
565                        }
566
567                        persona.thought_count = persona.thought_count.saturating_add(1);
568                        persona.last_tick_at = Some(now);
569                        persona.updated_at = now;
570                        items.push(ThoughtWorkItem {
571                            persona_id: persona.identity.id.clone(),
572                            persona_name: persona.identity.name.clone(),
573                            role: persona.identity.role.clone(),
574                            charter: persona.identity.charter.clone(),
575                            swarm_id: persona.identity.swarm_id.clone(),
576                            thought_count: persona.thought_count,
577                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
578                        });
579                    }
580                    items
581                };
582
583                for work in work_items {
584                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
585                    let thought = generate_phase_thought(thinker.as_deref(), &work, &context).await;
586
587                    let event_timestamp = Utc::now();
588                    new_events.push(ThoughtEvent {
589                        id: Uuid::new_v4().to_string(),
590                        event_type: work.phase.event_type(),
591                        persona_id: Some(work.persona_id.clone()),
592                        swarm_id: work.swarm_id.clone(),
593                        timestamp: event_timestamp,
594                        payload: json!({
595                            "phase": work.phase.as_str(),
596                            "thought_count": work.thought_count,
597                            "persona": {
598                                "id": work.persona_id.clone(),
599                                "name": work.persona_name.clone(),
600                                "role": work.role.clone(),
601                            },
602                            "context_event_count": context.len(),
603                            "thinking": thought.thinking.clone(),
604                            "source": thought.source,
605                            "model": thought.model.clone(),
606                            "finish_reason": thought.finish_reason.clone(),
607                            "usage": {
608                                "prompt_tokens": thought.prompt_tokens,
609                                "completion_tokens": thought.completion_tokens,
610                                "total_tokens": thought.total_tokens,
611                            },
612                            "latency_ms": thought.latency_ms,
613                            "error": thought.error.clone(),
614                        }),
615                    });
616
617                    if work.phase == ThoughtPhase::Test {
618                        new_events.push(ThoughtEvent {
619                            id: Uuid::new_v4().to_string(),
620                            event_type: ThoughtEventType::CheckResult,
621                            persona_id: Some(work.persona_id.clone()),
622                            swarm_id: work.swarm_id.clone(),
623                            timestamp: Utc::now(),
624                            payload: json!({
625                                "phase": work.phase.as_str(),
626                                "thought_count": work.thought_count,
627                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
628                                "source": thought.source,
629                                "model": thought.model,
630                            }),
631                        });
632                    }
633
634                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
635                        let proposal = Proposal {
636                            id: Uuid::new_v4().to_string(),
637                            persona_id: work.persona_id.clone(),
638                            title: proposal_title_from_thought(&thought.thinking, work.thought_count),
639                            rationale: trim_for_storage(&thought.thinking, 900),
640                            evidence_refs: vec!["internal.thought_stream".to_string()],
641                            risk: ProposalRisk::Low,
642                            status: ProposalStatus::Created,
643                            created_at: Utc::now(),
644                        };
645
646                        new_events.push(ThoughtEvent {
647                            id: Uuid::new_v4().to_string(),
648                            event_type: ThoughtEventType::ProposalCreated,
649                            persona_id: Some(work.persona_id.clone()),
650                            swarm_id: work.swarm_id.clone(),
651                            timestamp: Utc::now(),
652                            payload: json!({
653                                "proposal_id": proposal.id,
654                                "title": proposal.title,
655                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
656                                "source": thought.source,
657                                "model": thought.model,
658                            }),
659                        });
660
661                        new_proposals.push(proposal);
662                    }
663
664                    if work.phase == ThoughtPhase::Compress {
665                        new_snapshots.push(MemorySnapshot {
666                            id: Uuid::new_v4().to_string(),
667                            generated_at: Utc::now(),
668                            swarm_id: work.swarm_id.clone(),
669                            persona_scope: vec![work.persona_id.clone()],
670                            summary: trim_for_storage(&thought.thinking, 1_500),
671                            hot_event_count: context.len(),
672                            warm_fact_count: estimate_fact_count(&thought.thinking),
673                            cold_snapshot_count: 1,
674                            metadata: HashMap::from([
675                                (
676                                    "phase".to_string(),
677                                    serde_json::Value::String(work.phase.as_str().to_string()),
678                                ),
679                                (
680                                    "role".to_string(),
681                                    serde_json::Value::String(work.role.clone()),
682                                ),
683                                (
684                                    "source".to_string(),
685                                    serde_json::Value::String(thought.source.to_string()),
686                                ),
687                                (
688                                    "model".to_string(),
689                                    serde_json::Value::String(
690                                        thought
691                                            .model
692                                            .clone()
693                                            .unwrap_or_else(|| "fallback".to_string()),
694                                    ),
695                                ),
696                                (
697                                    "completion_tokens".to_string(),
698                                    serde_json::Value::Number(
699                                        serde_json::Number::from(
700                                            thought.completion_tokens.unwrap_or(0) as u64
701                                        ),
702                                    ),
703                                ),
704                            ]),
705                        });
706                    }
707                }
708
709                if !new_proposals.is_empty() {
710                    let mut proposal_store = proposals.write().await;
711                    for proposal in new_proposals {
712                        proposal_store.insert(proposal.id.clone(), proposal);
713                    }
714                }
715
716                for event in new_events {
717                    push_event_internal(&events, max_events, &event_tx, event).await;
718                }
719                for snapshot in new_snapshots {
720                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
721                }
722
723                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
724                next_tick += interval;
725                let tick_completed = Instant::now();
726                if tick_completed > next_tick {
727                    next_tick = tick_completed;
728                }
729            }
730        });
731
732        {
733            let mut lock = self.loop_handle.lock().await;
734            *lock = Some(handle);
735        }
736
737        Ok(self.status().await)
738    }
739
740    /// Stop the perpetual cognition loop.
741    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
742        self.running.store(false, Ordering::SeqCst);
743
744        if let Some(handle) = self.loop_handle.lock().await.take() {
745            handle.abort();
746            let _ = handle.await;
747        }
748
749        if let Some(reason_value) = reason {
750            let event = ThoughtEvent {
751                id: Uuid::new_v4().to_string(),
752                event_type: ThoughtEventType::CheckResult,
753                persona_id: None,
754                swarm_id: None,
755                timestamp: Utc::now(),
756                payload: json!({ "stopped": true, "reason": reason_value }),
757            };
758            self.push_event(event).await;
759        }
760
761        Ok(self.status().await)
762    }
763
764    /// Create a persona record.
765    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
766        let now = Utc::now();
767        let mut personas = self.personas.write().await;
768
769        let mut parent_swarm_id = None;
770        let mut computed_depth = 0_u32;
771        let mut inherited_policy = None;
772
773        if let Some(parent_id) = req.parent_id.clone() {
774            let parent = personas
775                .get(&parent_id)
776                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
777
778            if parent.status == PersonaStatus::Reaped {
779                return Err(anyhow!("Parent persona {} is reaped", parent_id));
780            }
781
782            parent_swarm_id = parent.identity.swarm_id.clone();
783            computed_depth = parent.identity.depth.saturating_add(1);
784            inherited_policy = Some(parent.policy.clone());
785            let branch_limit = parent.policy.max_branching_factor;
786
787            let child_count = personas
788                .values()
789                .filter(|p| {
790                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
791                        && p.status != PersonaStatus::Reaped
792                })
793                .count();
794
795            if child_count as u32 >= branch_limit {
796                return Err(anyhow!(
797                    "Parent {} reached branching limit {}",
798                    parent_id,
799                    branch_limit
800                ));
801            }
802        }
803
804        let policy = req
805            .policy
806            .clone()
807            .or(inherited_policy.clone())
808            .unwrap_or_else(|| self.default_policy.clone());
809
810        let effective_depth_limit = inherited_policy
811            .as_ref()
812            .map(|p| p.max_spawn_depth)
813            .unwrap_or(policy.max_spawn_depth);
814
815        if computed_depth > effective_depth_limit {
816            return Err(anyhow!(
817                "Spawn depth {} exceeds limit {}",
818                computed_depth,
819                effective_depth_limit
820            ));
821        }
822
823        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
824        if personas.contains_key(&persona_id) {
825            return Err(anyhow!("Persona id already exists: {}", persona_id));
826        }
827
828        let identity = PersonaIdentity {
829            id: persona_id.clone(),
830            name: req.name,
831            role: req.role,
832            charter: req.charter,
833            swarm_id: req.swarm_id.or(parent_swarm_id),
834            parent_id: req.parent_id,
835            depth: computed_depth,
836            created_at: now,
837        };
838
839        let persona = PersonaRuntimeState {
840            identity,
841            policy,
842            status: PersonaStatus::Active,
843            thought_count: 0,
844            last_tick_at: None,
845            updated_at: now,
846        };
847
848        personas.insert(persona_id, persona.clone());
849        drop(personas);
850
851        self.push_event(ThoughtEvent {
852            id: Uuid::new_v4().to_string(),
853            event_type: ThoughtEventType::PersonaSpawned,
854            persona_id: Some(persona.identity.id.clone()),
855            swarm_id: persona.identity.swarm_id.clone(),
856            timestamp: now,
857            payload: json!({
858                "name": persona.identity.name,
859                "role": persona.identity.role,
860                "depth": persona.identity.depth,
861            }),
862        })
863        .await;
864
865        Ok(persona)
866    }
867
868    /// Spawn a child persona under an existing parent.
869    pub async fn spawn_child(
870        &self,
871        parent_id: &str,
872        req: SpawnPersonaRequest,
873    ) -> Result<PersonaRuntimeState> {
874        let request = CreatePersonaRequest {
875            persona_id: req.persona_id,
876            name: req.name,
877            role: req.role,
878            charter: req.charter,
879            swarm_id: req.swarm_id,
880            parent_id: Some(parent_id.to_string()),
881            policy: req.policy,
882        };
883        self.create_persona(request).await
884    }
885
886    /// Reap one persona or the full descendant tree.
887    pub async fn reap_persona(
888        &self,
889        persona_id: &str,
890        req: ReapPersonaRequest,
891    ) -> Result<ReapPersonaResponse> {
892        let cascade = req.cascade.unwrap_or(false);
893        let now = Utc::now();
894
895        let mut personas = self.personas.write().await;
896        if !personas.contains_key(persona_id) {
897            return Err(anyhow!("Persona not found: {}", persona_id));
898        }
899
900        let mut reaped_ids = vec![persona_id.to_string()];
901        if cascade {
902            let mut idx = 0usize;
903            while idx < reaped_ids.len() {
904                let current = reaped_ids[idx].clone();
905                let children: Vec<String> = personas
906                    .values()
907                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
908                    .map(|p| p.identity.id.clone())
909                    .collect();
910                for child in children {
911                    if !reaped_ids.iter().any(|existing| existing == &child) {
912                        reaped_ids.push(child);
913                    }
914                }
915                idx += 1;
916            }
917        }
918
919        for id in &reaped_ids {
920            if let Some(persona) = personas.get_mut(id) {
921                persona.status = PersonaStatus::Reaped;
922                persona.updated_at = now;
923            }
924        }
925        drop(personas);
926
927        for id in &reaped_ids {
928            self.push_event(ThoughtEvent {
929                id: Uuid::new_v4().to_string(),
930                event_type: ThoughtEventType::PersonaReaped,
931                persona_id: Some(id.clone()),
932                swarm_id: None,
933                timestamp: now,
934                payload: json!({
935                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
936                    "cascade": cascade,
937                }),
938            })
939            .await;
940        }
941
942        Ok(ReapPersonaResponse {
943            count: reaped_ids.len(),
944            reaped_ids,
945        })
946    }
947
948    /// Get latest memory snapshot, if any.
949    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
950        self.snapshots.read().await.back().cloned()
951    }
952
953    /// Build lineage graph from current persona state.
954    pub async fn lineage_graph(&self) -> LineageGraph {
955        let personas = self.personas.read().await;
956        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
957        let mut roots = Vec::new();
958        let mut total_edges = 0usize;
959
960        for persona in personas.values() {
961            if let Some(parent_id) = persona.identity.parent_id.clone() {
962                children_by_parent
963                    .entry(parent_id)
964                    .or_default()
965                    .push(persona.identity.id.clone());
966                total_edges = total_edges.saturating_add(1);
967            } else {
968                roots.push(persona.identity.id.clone());
969            }
970        }
971
972        let mut nodes: Vec<LineageNode> = personas
973            .values()
974            .map(|persona| {
975                let mut children = children_by_parent
976                    .get(&persona.identity.id)
977                    .cloned()
978                    .unwrap_or_default();
979                children.sort();
980
981                LineageNode {
982                    persona_id: persona.identity.id.clone(),
983                    parent_id: persona.identity.parent_id.clone(),
984                    children,
985                    depth: persona.identity.depth,
986                    status: persona.status,
987                }
988            })
989            .collect();
990
991        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
992        roots.sort();
993
994        LineageGraph {
995            nodes,
996            roots,
997            total_edges,
998        }
999    }
1000
1001    /// Return a summary status.
1002    pub async fn status(&self) -> CognitionStatus {
1003        let personas = self.personas.read().await;
1004        let events = self.events.read().await;
1005        let snapshots = self.snapshots.read().await;
1006        let started_at = *self.started_at.read().await;
1007        let last_tick_at = *self.last_tick_at.read().await;
1008        let loop_interval_ms = *self.loop_interval_ms.read().await;
1009
1010        let active_persona_count = personas
1011            .values()
1012            .filter(|p| p.status == PersonaStatus::Active)
1013            .count();
1014
1015        CognitionStatus {
1016            enabled: self.enabled,
1017            running: self.running.load(Ordering::SeqCst),
1018            loop_interval_ms,
1019            started_at,
1020            last_tick_at,
1021            persona_count: personas.len(),
1022            active_persona_count,
1023            events_buffered: events.len(),
1024            snapshots_buffered: snapshots.len(),
1025        }
1026    }
1027
1028    async fn push_event(&self, event: ThoughtEvent) {
1029        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1030    }
1031}
1032
1033async fn push_event_internal(
1034    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1035    max_events: usize,
1036    event_tx: &broadcast::Sender<ThoughtEvent>,
1037    event: ThoughtEvent,
1038) {
1039    {
1040        let mut lock = events.write().await;
1041        lock.push_back(event.clone());
1042        while lock.len() > max_events {
1043            lock.pop_front();
1044        }
1045    }
1046    let _ = event_tx.send(event);
1047}
1048
1049async fn push_snapshot_internal(
1050    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1051    max_snapshots: usize,
1052    snapshot: MemorySnapshot,
1053) {
1054    let mut lock = snapshots.write().await;
1055    lock.push_back(snapshot);
1056    while lock.len() > max_snapshots {
1057        lock.pop_front();
1058    }
1059}
1060
1061async fn recent_persona_context(
1062    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1063    persona_id: &str,
1064    limit: usize,
1065) -> Vec<ThoughtEvent> {
1066    let lock = events.read().await;
1067    let mut selected: Vec<ThoughtEvent> = lock
1068        .iter()
1069        .rev()
1070        .filter(|event| {
1071            event.persona_id.as_deref() == Some(persona_id)
1072                || (event.persona_id.is_none()
1073                    && matches!(
1074                        event.event_type,
1075                        ThoughtEventType::CheckResult
1076                            | ThoughtEventType::ProposalCreated
1077                            | ThoughtEventType::SnapshotCompressed
1078                    ))
1079        })
1080        .take(limit)
1081        .cloned()
1082        .collect();
1083    selected.reverse();
1084    selected
1085}
1086
1087async fn generate_phase_thought(
1088    thinker: Option<&ThinkerClient>,
1089    work: &ThoughtWorkItem,
1090    context: &[ThoughtEvent],
1091) -> ThoughtResult {
1092    let started_at = Instant::now();
1093    if let Some(client) = thinker {
1094        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1095        match client.think(&system_prompt, &user_prompt).await {
1096            Ok(output) => {
1097                let thinking = trim_for_storage(&output.text, 2_000);
1098                if !thinking.is_empty() {
1099                    return ThoughtResult {
1100                        source: "model",
1101                        model: Some(output.model),
1102                        finish_reason: output.finish_reason,
1103                        thinking,
1104                        prompt_tokens: output.prompt_tokens,
1105                        completion_tokens: output.completion_tokens,
1106                        total_tokens: output.total_tokens,
1107                        latency_ms: started_at.elapsed().as_millis(),
1108                        error: None,
1109                    };
1110                }
1111            }
1112            Err(error) => {
1113                return ThoughtResult {
1114                    source: "fallback",
1115                    model: None,
1116                    finish_reason: None,
1117                    thinking: fallback_phase_text(work, context),
1118                    prompt_tokens: None,
1119                    completion_tokens: None,
1120                    total_tokens: None,
1121                    latency_ms: started_at.elapsed().as_millis(),
1122                    error: Some(error.to_string()),
1123                };
1124            }
1125        }
1126    }
1127
1128    ThoughtResult {
1129        source: "fallback",
1130        model: None,
1131        finish_reason: None,
1132        thinking: fallback_phase_text(work, context),
1133        prompt_tokens: None,
1134        completion_tokens: None,
1135        total_tokens: None,
1136        latency_ms: started_at.elapsed().as_millis(),
1137        error: None,
1138    }
1139}
1140
1141fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1142    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1143Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1144Focus on reasoning quality, uncertainty, and actionable checks."
1145        .to_string();
1146
1147    let context_lines = if context.is_empty() {
1148        "none".to_string()
1149    } else {
1150        context
1151            .iter()
1152            .map(format_context_event)
1153            .collect::<Vec<_>>()
1154            .join("\n")
1155    };
1156
1157    let phase_instruction = match work.phase {
1158        ThoughtPhase::Observe => {
1159            "Observe current state. Identify 1-3 concrete signals and one uncertainty."
1160        }
1161        ThoughtPhase::Reflect => {
1162            "Reflect on observed signals. Produce one hypothesis, rationale, and risk note."
1163        }
1164        ThoughtPhase::Test => {
1165            "Design and evaluate one check. Include expected result and evidence quality."
1166        }
1167        ThoughtPhase::Compress => {
1168            "Compress the latest cognition into a short state summary and retained facts."
1169        }
1170    };
1171
1172    let user_prompt = format!(
1173        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
1174        phase = work.phase.as_str(),
1175        persona_id = work.persona_id,
1176        persona_name = work.persona_name,
1177        role = work.role,
1178        charter = work.charter,
1179        thought_count = work.thought_count,
1180        context = context_lines,
1181        instruction = phase_instruction
1182    );
1183
1184    (system_prompt, user_prompt)
1185}
1186
1187fn format_context_event(event: &ThoughtEvent) -> String {
1188    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
1189    format!(
1190        "{} {} {}",
1191        event.event_type.as_str(),
1192        event.timestamp.to_rfc3339(),
1193        trim_for_storage(&payload, 220)
1194    )
1195}
1196
1197fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
1198    let charter = trim_for_storage(&work.charter, 180);
1199    let context_summary = fallback_context_summary(context);
1200    let thought = match work.phase {
1201        ThoughtPhase::Observe => format!(
1202            "Observation {}: I am monitoring this codebase with a business-first reliability lens. \
1203Role: {}. Charter focus: {}. {} Next focus: identify customer-impacting incidents, degraded flows, \
1204and recent operational regressions before they become outages.",
1205            work.thought_count, work.role, charter, context_summary
1206        ),
1207        ThoughtPhase::Reflect => format!(
1208            "Reflection {}: Based on current signals, the most plausible risk areas are runtime stability, \
1209deployment safety, and dependency/provider availability. {} Business impact if unresolved: service downtime, \
1210SLA breaches, and loss of customer trust. Confidence: low-to-medium until additional checks are gathered.",
1211            work.thought_count, context_summary
1212        ),
1213        ThoughtPhase::Test => format!(
1214            "Check plan {}: run targeted verification on service health, recent errors, and change history. \
1215{} Expected evidence: clear pass/fail indicators for each suspected fault domain. Decision rule: escalate \
1216immediately on repeated failures in customer-facing paths.",
1217            work.thought_count, context_summary
1218        ),
1219        ThoughtPhase::Compress => format!(
1220            "Operational summary {}: system remains under active reliability monitoring with business-priority focus. \
1221{} Immediate next actions: keep triage tight, capture repeatable diagnostics, and convert confirmed findings into \
1222concrete remediation steps for engineering and operations.",
1223            work.thought_count, context_summary
1224        ),
1225    };
1226    trim_for_storage(&thought, 1_200)
1227}
1228
1229fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
1230    if context.is_empty() {
1231        return "No prior events recorded yet.".to_string();
1232    }
1233
1234    let mut latest_error: Option<String> = None;
1235    let mut latest_proposal: Option<String> = None;
1236    let mut latest_check: Option<String> = None;
1237
1238    for event in context.iter().rev() {
1239        if latest_error.is_none()
1240            && let Some(error) = event.payload.get("error").and_then(serde_json::Value::as_str)
1241            && !error.trim().is_empty()
1242        {
1243            latest_error = Some(trim_for_storage(error, 140));
1244        }
1245
1246        if latest_proposal.is_none()
1247            && event.event_type == ThoughtEventType::ProposalCreated
1248            && let Some(title) = event.payload.get("title").and_then(serde_json::Value::as_str)
1249            && !title.trim().is_empty()
1250        {
1251            latest_proposal = Some(trim_for_storage(title, 120));
1252        }
1253
1254        if latest_check.is_none()
1255            && event.event_type == ThoughtEventType::CheckResult
1256            && let Some(result) = event
1257                .payload
1258                .get("result_excerpt")
1259                .and_then(serde_json::Value::as_str)
1260            && !result.trim().is_empty()
1261        {
1262            latest_check = Some(trim_for_storage(result, 140));
1263        }
1264
1265        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
1266            break;
1267        }
1268    }
1269
1270    let mut lines = vec![format!("{} recent cognition events are available.", context.len())];
1271    if let Some(error) = latest_error {
1272        lines.push(format!("Latest error signal: {}.", error));
1273    }
1274    if let Some(proposal) = latest_proposal {
1275        lines.push(format!("Recent proposal: {}.", proposal));
1276    }
1277    if let Some(check) = latest_check {
1278        lines.push(format!("Recent check: {}.", check));
1279    }
1280    lines.join(" ")
1281}
1282
/// Trim surrounding whitespace from `input` and cap it at `max_chars`
/// characters (Unicode scalar values, never splitting a code point).
///
/// The length check runs on the trimmed text, so padding never forces a cut.
/// When text must be cut, whitespace at the cut point is dropped and `...` is
/// appended. The result can therefore be up to `max_chars + 3` characters long.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let text = input.trim();
    // `nth(max_chars)` is the first character that does not fit; `None` means
    // the whole text fits. This is a single pass with no separate `count()`.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut_at, _)) => {
            let mut truncated = text[..cut_at].trim_end().to_string();
            truncated.push_str("...");
            truncated
        }
    }
}
1294
/// Roughly estimate how many sentence-level facts `text` holds, clamped to `1..=12`.
///
/// A run of terminators (`.`, `!`, `?`) counts once, so the `...` that
/// `trim_for_storage` appends, or `?!`, is not several sentences. The run is
/// counted only at a sentence boundary: end of text, whitespace, or a closing
/// quote/bracket. This skips decimals like `1.5` and dotted names.
fn estimate_fact_count(text: &str) -> usize {
    let is_terminator = |c: char| matches!(c, '.' | '!' | '?');
    let mut sentences = 0usize;
    let mut chars = text.chars().peekable();
    while let Some(ch) = chars.next() {
        if !is_terminator(ch) {
            continue;
        }
        // Swallow the rest of the terminator run so it is counted once.
        while chars.next_if(|&c| is_terminator(c)).is_some() {}
        let at_boundary = match chars.peek() {
            None => true,
            Some(&next) => next.is_whitespace() || matches!(next, '"' | '\'' | ')' | ']'),
        };
        if at_boundary {
            sentences += 1;
        }
    }
    sentences.clamp(1, 12)
}
1299
1300fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
1301    let first_line = thought
1302        .lines()
1303        .find(|line| !line.trim().is_empty())
1304        .unwrap_or("proposal");
1305    let compact = first_line
1306        .replace(['\t', '\r', '\n'], " ")
1307        .split_whitespace()
1308        .collect::<Vec<_>>()
1309        .join(" ");
1310    let trimmed = trim_for_storage(&compact, 72);
1311    if trimmed.is_empty() {
1312        format!("proposal-{}", thought_count)
1313    } else {
1314        trimmed
1315    }
1316}
1317
1318fn default_seed_persona() -> CreatePersonaRequest {
1319    CreatePersonaRequest {
1320        persona_id: Some("root-thinker".to_string()),
1321        name: "root-thinker".to_string(),
1322        role: "orchestrator".to_string(),
1323        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
1324            .to_string(),
1325        swarm_id: Some("swarm-core".to_string()),
1326        parent_id: None,
1327        policy: None,
1328    }
1329}
1330
/// Turn a configured thinker base URL into a full `/chat/completions` endpoint.
///
/// Surrounding whitespace and trailing slashes are removed first. Then:
/// - a URL already ending in `/chat/completions` is returned as-is;
/// - an empty URL maps to `http://127.0.0.1:11434/v1/chat/completions`;
/// - anything else gets `/chat/completions` appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    const SUFFIX: &str = "/chat/completions";
    let base = base_url.trim().trim_end_matches('/');
    match base {
        "" => "http://127.0.0.1:11434/v1/chat/completions".to_string(),
        full if full.ends_with(SUFFIX) => full.to_string(),
        prefix => format!("{}/chat/completions", prefix),
    }
}
1341
/// Read a boolean flag from environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, case-insensitively
/// and ignoring surrounding whitespace (e.g. a trailing newline from a
/// `.env` file or a mounted secret). Returns `default` when the variable is
/// unset, not valid Unicode, or unrecognized.
fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        .and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            _ => None,
        })
        .unwrap_or(default)
}
1352
/// Read a finite `f32` from environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, not valid Unicode, unparsable, or non-finite. `str::parse::<f32>`
/// accepts `NaN` and `inf`, which are never meaningful configuration values.
fn env_f32(name: &str, default: f32) -> f32 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<f32>().ok())
        .filter(|value| value.is_finite())
        .unwrap_or(default)
}
1359
/// Read a `u64` from environment variable `name`, ignoring surrounding whitespace.
///
/// Returns `default` when the variable is unset, not valid Unicode, or not a
/// valid `u64` (including negative or overflowing values).
fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(default)
}
1366
/// Read a `u32` from environment variable `name`, ignoring surrounding whitespace.
///
/// Returns `default` when the variable is unset, not valid Unicode, or not a
/// valid `u32` (including negative or overflowing values).
fn env_u32(name: &str, default: u32) -> u32 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .unwrap_or(default)
}
1373
/// Read a `usize` from environment variable `name`, ignoring surrounding whitespace.
///
/// Returns `default` when the variable is unset, not valid Unicode, or not a
/// valid `usize` (including negative or overflowing values).
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .unwrap_or(default)
}
1380
#[cfg(test)]
mod tests {
    use super::*;

    /// Runtime with tight limits (spawn depth 2, branching factor 2) so the
    /// lineage tests can hit both limits with a handful of personas.
    fn test_runtime() -> CognitionRuntime {
        let tight_policy = PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_credits_per_minute: 1_000,
            cpu_credits_per_minute: 1_000,
            idle_ttl_secs: 300,
            share_memory: false,
        };
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy: tight_policy,
        })
    }

    /// Parentless persona request whose id doubles as its name.
    fn root_request(id: &str, role: &str, charter: &str, swarm_id: &str) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: Some(swarm_id.to_string()),
            parent_id: None,
            policy: None,
        }
    }

    /// Child spawn request (id doubles as name) that inherits swarm and policy.
    fn child_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(root_request("root", "orchestrator", "coordinate", "swarm-a"))
            .await
            .expect("root should be created");
        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", child_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");
        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(root_request("root", "orchestrator", "coordinate", "swarm-a"))
            .await
            .expect("root should be created");

        // Branching factor 2: two children fit under the root, a third does not.
        for (id, expectation) in [
            ("c1", "first child should spawn"),
            ("c2", "second child should spawn"),
        ] {
            runtime
                .spawn_child("root", child_request(id, "worker", "run"))
                .await
                .expect(expectation);
        }
        let third_child = runtime
            .spawn_child("root", child_request("c3", "worker", "run"))
            .await;
        assert!(third_child.is_err());

        // Spawn depth 2: a grandchild fits, a great-grandchild does not.
        runtime
            .spawn_child("c1", child_request("c1-1", "worker", "run"))
            .await
            .expect("depth 2 should be allowed");
        let depth_violation = runtime
            .spawn_child("c1-1", child_request("c1-1-1", "worker", "run"))
            .await;
        assert!(depth_violation.is_err());
    }

    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        let start_request = StartCognitionRequest {
            loop_interval_ms: Some(10),
            seed_persona: Some(root_request("seed", "watcher", "observe", "swarm-seed")),
        };
        runtime
            .start(Some(start_request))
            .await
            .expect("runtime should start");

        // Give the 10 ms loop a few ticks to emit events.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }
}