Skip to main content

codetether_agent/cognition/
mod.rs

1//! Perpetual cognition runtime for persona swarms.
2//!
3//! Phase 0 scope:
4//! - Contract types for personas, thought events, proposals, and snapshots
5//! - In-memory runtime manager for lifecycle + lineage
6//! - Feature-flagged perpetual loop (no external execution side effects)
7
8mod thinker;
9
10#[cfg(feature = "functiongemma")]
11pub mod tool_router;
12
13pub use thinker::{
14    CandleDevicePreference, ThinkerBackend, ThinkerClient, ThinkerConfig, ThinkerOutput,
15};
16
17use anyhow::{Result, anyhow};
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::Duration;
25use tokio::sync::{Mutex, RwLock, broadcast};
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use uuid::Uuid;
29
/// Persona execution status.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// The persona is ticked by the cognition loop; newly created personas start here.
    Active,
    /// NOTE(review): presumably a paused persona that the loop skips — confirm against callers.
    Idle,
    /// The persona has been retired: it cannot be used as a parent and does not count
    /// toward its parent's branching limit.
    Reaped,
    /// NOTE(review): presumably a persona that hit a failure — confirm where this is assigned.
    Error,
}
39
/// Policy boundaries for a persona.
///
/// A child persona created without an explicit policy inherits its parent's policy;
/// a root persona without one uses the runtime default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    /// Largest lineage depth allowed; creation fails when the computed depth exceeds it.
    pub max_spawn_depth: u32,
    /// Maximum number of non-reaped direct children a persona may have.
    pub max_branching_factor: u32,
    /// NOTE(review): token budget per minute — enforcement is not visible here; confirm.
    pub token_credits_per_minute: u32,
    /// NOTE(review): CPU budget per minute — enforcement is not visible here; confirm.
    pub cpu_credits_per_minute: u32,
    /// NOTE(review): presumably seconds of inactivity before the persona may be reaped — confirm.
    pub idle_ttl_secs: u64,
    /// NOTE(review): presumably whether memory is shared with other personas — confirm.
    pub share_memory: bool,
}
50
51impl Default for PersonaPolicy {
52    fn default() -> Self {
53        Self {
54            max_spawn_depth: 4,
55            max_branching_factor: 4,
56            token_credits_per_minute: 20_000,
57            cpu_credits_per_minute: 10_000,
58            idle_ttl_secs: 3_600,
59            share_memory: false,
60        }
61    }
62}
63
/// Identity contract for a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Unique key of the persona in the runtime registry (a UUID v4 string when the
    /// creator did not supply one).
    pub id: String,
    /// Display name supplied at creation.
    pub name: String,
    /// Role label supplied at creation.
    pub role: String,
    /// Charter text supplied at creation.
    pub charter: String,
    /// Swarm the persona belongs to; falls back to the parent's swarm when not requested.
    pub swarm_id: Option<String>,
    /// Id of the parent persona, or `None` for a root persona.
    pub parent_id: Option<String>,
    /// Lineage depth: `0` for roots, parent depth plus one for children.
    pub depth: u32,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
76
/// Full runtime state for a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    /// Immutable identity assigned at creation.
    pub identity: PersonaIdentity,
    /// Effective policy (explicit, inherited from the parent, or the runtime default).
    pub policy: PersonaPolicy,
    /// Current execution status; only `Active` personas are ticked by the loop.
    pub status: PersonaStatus,
    /// Number of loop ticks processed for this persona (saturating counter, starts at 0).
    pub thought_count: u64,
    /// Timestamp of the most recent loop tick for this persona; `None` before the first tick.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Timestamp of the last change to this record.
    pub updated_at: DateTime<Utc>,
}
87
/// Proposal risk level.
///
/// Serialized with `snake_case` variant names. Proposals generated by the cognition
/// loop are created with [`ProposalRisk::Low`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    Low,
    Medium,
    High,
    Critical,
}
97
/// Proposal status.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    /// Initial status assigned when the cognition loop records a proposal.
    Created,
    /// NOTE(review): presumably set after a verification gate passes — confirm.
    Verified,
    /// NOTE(review): presumably set when a verification gate fails — confirm.
    Rejected,
    /// NOTE(review): presumably set once the proposed action has run — confirm.
    Executed,
}
107
/// Proposal contract (think first, execute through gates).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    /// Unique proposal id; also the key in the runtime's proposal store.
    pub id: String,
    /// Id of the persona whose thought produced the proposal.
    pub persona_id: String,
    /// Short title (the loop derives it from the thought text).
    pub title: String,
    /// Supporting reasoning (the loop stores a trimmed copy of the thought text).
    pub rationale: String,
    /// References to supporting evidence; the loop uses `"internal.thought_stream"`.
    pub evidence_refs: Vec<String>,
    /// Assessed risk level.
    pub risk: ProposalRisk,
    /// Lifecycle status.
    pub status: ProposalStatus,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
120
/// Thought/event types emitted by the cognition loop.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Emitted for a persona's "observe" phase.
    ThoughtGenerated,
    /// Emitted for a persona's "reflect" phase.
    HypothesisRaised,
    /// Emitted for a persona's "test" phase.
    CheckRequested,
    /// Follow-up emitted after a "test" phase; also used for the stop notice.
    CheckResult,
    /// Emitted when the loop records a new proposal.
    ProposalCreated,
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    /// Emitted when a persona is created.
    PersonaSpawned,
    PersonaReaped,
    /// Emitted for a persona's "compress" phase.
    SnapshotCompressed,
}
137
138impl ThoughtEventType {
139    fn as_str(&self) -> &'static str {
140        match self {
141            Self::ThoughtGenerated => "thought_generated",
142            Self::HypothesisRaised => "hypothesis_raised",
143            Self::CheckRequested => "check_requested",
144            Self::CheckResult => "check_result",
145            Self::ProposalCreated => "proposal_created",
146            Self::ProposalVerified => "proposal_verified",
147            Self::ProposalRejected => "proposal_rejected",
148            Self::ActionExecuted => "action_executed",
149            Self::PersonaSpawned => "persona_spawned",
150            Self::PersonaReaped => "persona_reaped",
151            Self::SnapshotCompressed => "snapshot_compressed",
152        }
153    }
154}
155
/// Streamable thought event contract.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Unique event id (the runtime generates UUID v4 strings).
    pub id: String,
    /// Kind of event.
    pub event_type: ThoughtEventType,
    /// Persona the event belongs to; `None` for runtime-level events such as the stop notice.
    pub persona_id: Option<String>,
    /// Swarm of the originating persona, when it has one.
    pub swarm_id: Option<String>,
    /// Time the event was produced (UTC).
    pub timestamp: DateTime<Utc>,
    /// Free-form JSON body; its shape depends on `event_type`.
    pub payload: serde_json::Value,
}
166
/// Distilled memory snapshot contract.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    /// Unique snapshot id.
    pub id: String,
    /// Time the snapshot was produced (UTC).
    pub generated_at: DateTime<Utc>,
    /// Swarm of the persona(s) covered, when any.
    pub swarm_id: Option<String>,
    /// Ids of the personas the snapshot covers (the loop records a single persona).
    pub persona_scope: Vec<String>,
    /// Condensed text (the loop stores a trimmed copy of the compress-phase thought).
    pub summary: String,
    /// Number of recent events that were used as context for the snapshot.
    pub hot_event_count: usize,
    /// Estimated number of facts in the summary.
    pub warm_fact_count: usize,
    /// NOTE(review): the loop always records `1` here — confirm intended meaning.
    pub cold_snapshot_count: usize,
    /// Extra attributes; the loop records `phase`, `role`, `source`, `model`, and
    /// `completion_tokens`.
    pub metadata: HashMap<String, serde_json::Value>,
}
180
/// Request payload for creating a persona.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    /// Explicit id to use; a UUID v4 is generated when omitted. Must not already exist.
    pub persona_id: Option<String>,
    /// Display name.
    pub name: String,
    /// Role label.
    pub role: String,
    /// Charter text.
    pub charter: String,
    /// Swarm to join; defaults to the parent's swarm when omitted.
    pub swarm_id: Option<String>,
    /// Parent persona id; the parent must exist and must not be reaped.
    pub parent_id: Option<String>,
    /// Explicit policy; falls back to the parent's policy, then the runtime default.
    pub policy: Option<PersonaPolicy>,
}
192
/// Request payload for spawning a child persona.
///
/// Same fields as [`CreatePersonaRequest`] minus `parent_id`.
/// NOTE(review): presumably the parent is identified separately by the caller — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    /// Optional explicit id for the child.
    pub persona_id: Option<String>,
    /// Display name.
    pub name: String,
    /// Role label.
    pub role: String,
    /// Charter text.
    pub charter: String,
    /// Optional swarm to join.
    pub swarm_id: Option<String>,
    /// Optional explicit policy.
    pub policy: Option<PersonaPolicy>,
}
203
/// Request payload for reaping persona(s).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    /// NOTE(review): presumably whether descendants are reaped as well — confirm against the handler.
    pub cascade: Option<bool>,
    /// Optional free-text reason for the reap.
    pub reason: Option<String>,
}
210
/// Start-control payload for perpetual cognition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// Loop interval override in milliseconds; values below 100 are raised to 100.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the runtime has no personas yet; ignored otherwise.
    pub seed_persona: Option<CreatePersonaRequest>,
}
217
/// Stop-control payload for perpetual cognition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Optional free-text reason for stopping.
    pub reason: Option<String>,
}
223
/// Start/stop response status.
///
/// NOTE(review): the code that fills this struct is outside the visible section; the
/// field notes below follow the field names and the runtime fields of the same name —
/// confirm against `status()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    /// Whether cognition is enabled by feature flag.
    pub enabled: bool,
    /// Whether the loop is currently running.
    pub running: bool,
    /// Loop interval in milliseconds.
    pub loop_interval_ms: u64,
    /// When the loop was last started, if ever.
    pub started_at: Option<DateTime<Utc>>,
    /// When the loop last ticked, if ever.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Number of personas known to the runtime.
    pub persona_count: usize,
    /// Number of personas in the active state.
    pub active_persona_count: usize,
    /// Number of events currently buffered.
    pub events_buffered: usize,
    /// Number of snapshots currently buffered.
    pub snapshots_buffered: usize,
}
237
/// Reap response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids of the personas that were reaped.
    pub reaped_ids: Vec<String>,
    /// NOTE(review): presumably `reaped_ids.len()` — confirm against the handler.
    pub count: usize,
}
244
/// Lineage node response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Id of the persona this node describes.
    pub persona_id: String,
    /// Id of the parent persona, or `None` for a root.
    pub parent_id: Option<String>,
    /// Ids of the persona's direct children.
    pub children: Vec<String>,
    /// Lineage depth of the persona.
    pub depth: u32,
    /// Current status of the persona.
    pub status: PersonaStatus,
}
254
/// Lineage graph response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    /// One node per persona.
    pub nodes: Vec<LineageNode>,
    /// NOTE(review): presumably ids of personas without a parent — confirm against the builder.
    pub roots: Vec<String>,
    /// NOTE(review): presumably the number of parent/child links — confirm against the builder.
    pub total_edges: usize,
}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264enum ThoughtPhase {
265    Observe,
266    Reflect,
267    Test,
268    Compress,
269}
270
271impl ThoughtPhase {
272    fn from_thought_count(thought_count: u64) -> Self {
273        match thought_count % 4 {
274            1 => Self::Observe,
275            2 => Self::Reflect,
276            3 => Self::Test,
277            _ => Self::Compress,
278        }
279    }
280
281    fn as_str(&self) -> &'static str {
282        match self {
283            Self::Observe => "observe",
284            Self::Reflect => "reflect",
285            Self::Test => "test",
286            Self::Compress => "compress",
287        }
288    }
289
290    fn event_type(&self) -> ThoughtEventType {
291        match self {
292            Self::Observe => ThoughtEventType::ThoughtGenerated,
293            Self::Reflect => ThoughtEventType::HypothesisRaised,
294            Self::Test => ThoughtEventType::CheckRequested,
295            Self::Compress => ThoughtEventType::SnapshotCompressed,
296        }
297    }
298}
299
/// Per-tick copy of the persona fields the loop needs to generate one thought.
///
/// Built while the persona registry is write-locked so thought generation can run
/// after the lock has been released.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    /// Id of the persona being ticked.
    persona_id: String,
    /// Display name of the persona.
    persona_name: String,
    /// Role label of the persona.
    role: String,
    /// Charter text of the persona.
    charter: String,
    /// Swarm of the persona, when it has one.
    swarm_id: Option<String>,
    /// Thought counter value after this tick's increment.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
310
/// Outcome of generating one thought; its fields are copied into event payloads.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// Static label naming where the thought came from (reported as `source`).
    source: &'static str,
    /// Model name, when one produced the thought.
    model: Option<String>,
    /// Finish reason reported for the generation, when available.
    finish_reason: Option<String>,
    /// The thought text itself.
    thinking: String,
    /// Prompt token usage, when reported.
    prompt_tokens: Option<u32>,
    /// Completion token usage, when reported.
    completion_tokens: Option<u32>,
    /// Total token usage, when reported.
    total_tokens: Option<u32>,
    /// Time spent generating the thought, in milliseconds.
    latency_ms: u128,
    /// Error message, when one was recorded for the generation.
    error: Option<String>,
}
323
/// Runtime options for cognition manager.
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// Feature flag; starting the loop fails when this is `false`.
    pub enabled: bool,
    /// Delay between loop ticks in milliseconds (the runtime raises values below 100 to 100).
    pub loop_interval_ms: u64,
    /// Event buffer limit (the runtime raises values below 32 to 32).
    pub max_events: usize,
    /// Snapshot buffer limit (the runtime raises values below 8 to 8).
    pub max_snapshots: usize,
    /// Policy applied to personas that neither specify nor inherit one.
    pub default_policy: PersonaPolicy,
}
333
334impl Default for CognitionRuntimeOptions {
335    fn default() -> Self {
336        Self {
337            enabled: false,
338            loop_interval_ms: 2_000,
339            max_events: 2_000,
340            max_snapshots: 128,
341            default_policy: PersonaPolicy::default(),
342        }
343    }
344}
345
/// In-memory cognition runtime for perpetual persona swarms.
///
/// Shared state is wrapped in `Arc` so the spawned loop task can hold its own handles.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// Feature flag fixed at construction; `start` refuses to run when `false`.
    enabled: bool,
    /// Limit passed along when buffering events.
    max_events: usize,
    /// Limit passed along when buffering snapshots.
    max_snapshots: usize,
    /// Policy for personas that neither specify nor inherit one.
    default_policy: PersonaPolicy,
    /// Flag polled by the loop task; clearing it makes the loop exit.
    running: Arc<AtomicBool>,
    /// Delay between ticks in milliseconds; re-read by the loop after every tick.
    loop_interval_ms: Arc<RwLock<u64>>,
    /// When the loop was last started.
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// When the loop last began a tick.
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Persona registry keyed by persona id.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    /// Proposal store keyed by proposal id.
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    /// Buffered thought events.
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    /// Buffered memory snapshots.
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    /// Join handle of the spawned loop task, if one has been started.
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast sender used to stream events to subscribers.
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// Optional thinker client; `None` when not configured, disabled, or failed to initialize.
    thinker: Option<Arc<ThinkerClient>>,
}
365
366impl CognitionRuntime {
367    /// Build runtime from environment feature flags.
368    pub fn new_from_env() -> Self {
369        let mut options = CognitionRuntimeOptions::default();
370        options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
371        options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
372        options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
373        options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
374
375        options.default_policy = PersonaPolicy {
376            max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
377            max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
378            token_credits_per_minute: env_u32(
379                "CODETETHER_COGNITION_TOKEN_CREDITS_PER_MINUTE",
380                20_000,
381            ),
382            cpu_credits_per_minute: env_u32("CODETETHER_COGNITION_CPU_CREDITS_PER_MINUTE", 10_000),
383            idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
384            share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
385        };
386
387        let thinker_config = ThinkerConfig {
388            enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
389            backend: thinker::ThinkerBackend::from_env(
390                &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
391                    .unwrap_or_else(|_| "openai_compat".to_string()),
392            ),
393            endpoint: normalize_thinker_endpoint(
394                &std::env::var("CODETETHER_COGNITION_THINKER_BASE_URL")
395                    .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string()),
396            ),
397            model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
398                .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
399            api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
400            temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
401            top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
402                .ok()
403                .and_then(|v| v.parse::<f32>().ok()),
404            max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
405            timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
406            candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH").ok(),
407            candle_tokenizer_path: std::env::var(
408                "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
409            )
410            .ok(),
411            candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
412            candle_device: thinker::CandleDevicePreference::from_env(
413                &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
414                    .unwrap_or_else(|_| "auto".to_string()),
415            ),
416            candle_cuda_ordinal: env_usize("CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL", 0),
417            candle_repeat_penalty: env_f32(
418                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
419                1.1,
420            ),
421            candle_repeat_last_n: env_usize(
422                "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
423                64,
424            ),
425            candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
426        };
427
428        Self::new_with_options_and_thinker(options, Some(thinker_config))
429    }
430
431    /// Build runtime from explicit options.
432    pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
433        Self::new_with_options_and_thinker(options, None)
434    }
435
436    fn new_with_options_and_thinker(
437        options: CognitionRuntimeOptions,
438        thinker_config: Option<ThinkerConfig>,
439    ) -> Self {
440        let (event_tx, _) = broadcast::channel(options.max_events.max(16));
441        let thinker = thinker_config.and_then(|cfg| {
442            if !cfg.enabled {
443                return None;
444            }
445            match ThinkerClient::new(cfg) {
446                Ok(client) => {
447                    tracing::info!(
448                        backend = ?client.config().backend,
449                        endpoint = %client.config().endpoint,
450                        model = %client.config().model,
451                        "Cognition thinker initialized"
452                    );
453                    Some(Arc::new(client))
454                }
455                Err(error) => {
456                    tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
457                    None
458                }
459            }
460        });
461
462        Self {
463            enabled: options.enabled,
464            max_events: options.max_events.max(32),
465            max_snapshots: options.max_snapshots.max(8),
466            default_policy: options.default_policy,
467            running: Arc::new(AtomicBool::new(false)),
468            loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
469            started_at: Arc::new(RwLock::new(None)),
470            last_tick_at: Arc::new(RwLock::new(None)),
471            personas: Arc::new(RwLock::new(HashMap::new())),
472            proposals: Arc::new(RwLock::new(HashMap::new())),
473            events: Arc::new(RwLock::new(VecDeque::new())),
474            snapshots: Arc::new(RwLock::new(VecDeque::new())),
475            loop_handle: Arc::new(Mutex::new(None)),
476            event_tx,
477            thinker,
478        }
479    }
480
    /// Whether cognition is enabled by feature flag.
    ///
    /// Returns the `enabled` value fixed at construction; `start` fails when it is `false`.
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }
485
    /// Subscribe to thought events for streaming.
    ///
    /// Returns a new receiver on the runtime's broadcast channel.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
        self.event_tx.subscribe()
    }
490
491    /// Start the perpetual cognition loop.
492    pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
493        if !self.enabled {
494            return Err(anyhow!(
495                "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
496            ));
497        }
498
499        let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
500        if let Some(request) = req {
501            if let Some(interval) = request.loop_interval_ms {
502                let mut lock = self.loop_interval_ms.write().await;
503                *lock = interval.max(100);
504            }
505            requested_seed_persona = request.seed_persona;
506        }
507
508        let has_personas = !self.personas.read().await.is_empty();
509        if !has_personas {
510            let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
511            let _ = self.create_persona(seed).await?;
512        }
513
514        if self.running.load(Ordering::SeqCst) {
515            return Ok(self.status().await);
516        }
517
518        self.running.store(true, Ordering::SeqCst);
519        {
520            let mut started = self.started_at.write().await;
521            *started = Some(Utc::now());
522        }
523
524        let running = Arc::clone(&self.running);
525        let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
526        let last_tick_at = Arc::clone(&self.last_tick_at);
527        let personas = Arc::clone(&self.personas);
528        let proposals = Arc::clone(&self.proposals);
529        let events = Arc::clone(&self.events);
530        let snapshots = Arc::clone(&self.snapshots);
531        let max_events = self.max_events;
532        let max_snapshots = self.max_snapshots;
533        let event_tx = self.event_tx.clone();
534        let thinker = self.thinker.clone();
535
536        let handle = tokio::spawn(async move {
537            let mut next_tick = Instant::now();
538            while running.load(Ordering::SeqCst) {
539                let now_instant = Instant::now();
540                if now_instant < next_tick {
541                    tokio::time::sleep_until(next_tick).await;
542                }
543                if !running.load(Ordering::SeqCst) {
544                    break;
545                }
546
547                let now = Utc::now();
548                {
549                    let mut lock = last_tick_at.write().await;
550                    *lock = Some(now);
551                }
552
553                let mut new_events = Vec::new();
554                let mut new_snapshots = Vec::new();
555                let mut new_proposals = Vec::new();
556
557                let work_items: Vec<ThoughtWorkItem> = {
558                    let mut map = personas.write().await;
559                    let mut items = Vec::new();
560                    for persona in map.values_mut() {
561                        if persona.status != PersonaStatus::Active {
562                            continue;
563                        }
564
565                        persona.thought_count = persona.thought_count.saturating_add(1);
566                        persona.last_tick_at = Some(now);
567                        persona.updated_at = now;
568                        items.push(ThoughtWorkItem {
569                            persona_id: persona.identity.id.clone(),
570                            persona_name: persona.identity.name.clone(),
571                            role: persona.identity.role.clone(),
572                            charter: persona.identity.charter.clone(),
573                            swarm_id: persona.identity.swarm_id.clone(),
574                            thought_count: persona.thought_count,
575                            phase: ThoughtPhase::from_thought_count(persona.thought_count),
576                        });
577                    }
578                    items
579                };
580
581                for work in work_items {
582                    let context = recent_persona_context(&events, &work.persona_id, 8).await;
583                    let thought = generate_phase_thought(thinker.as_deref(), &work, &context).await;
584
585                    let event_timestamp = Utc::now();
586                    new_events.push(ThoughtEvent {
587                        id: Uuid::new_v4().to_string(),
588                        event_type: work.phase.event_type(),
589                        persona_id: Some(work.persona_id.clone()),
590                        swarm_id: work.swarm_id.clone(),
591                        timestamp: event_timestamp,
592                        payload: json!({
593                            "phase": work.phase.as_str(),
594                            "thought_count": work.thought_count,
595                            "persona": {
596                                "id": work.persona_id.clone(),
597                                "name": work.persona_name.clone(),
598                                "role": work.role.clone(),
599                            },
600                            "context_event_count": context.len(),
601                            "thinking": thought.thinking.clone(),
602                            "source": thought.source,
603                            "model": thought.model.clone(),
604                            "finish_reason": thought.finish_reason.clone(),
605                            "usage": {
606                                "prompt_tokens": thought.prompt_tokens,
607                                "completion_tokens": thought.completion_tokens,
608                                "total_tokens": thought.total_tokens,
609                            },
610                            "latency_ms": thought.latency_ms,
611                            "error": thought.error.clone(),
612                        }),
613                    });
614
615                    if work.phase == ThoughtPhase::Test {
616                        new_events.push(ThoughtEvent {
617                            id: Uuid::new_v4().to_string(),
618                            event_type: ThoughtEventType::CheckResult,
619                            persona_id: Some(work.persona_id.clone()),
620                            swarm_id: work.swarm_id.clone(),
621                            timestamp: Utc::now(),
622                            payload: json!({
623                                "phase": work.phase.as_str(),
624                                "thought_count": work.thought_count,
625                                "result_excerpt": trim_for_storage(&thought.thinking, 280),
626                                "source": thought.source,
627                                "model": thought.model,
628                            }),
629                        });
630                    }
631
632                    if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
633                        let proposal = Proposal {
634                            id: Uuid::new_v4().to_string(),
635                            persona_id: work.persona_id.clone(),
636                            title: proposal_title_from_thought(
637                                &thought.thinking,
638                                work.thought_count,
639                            ),
640                            rationale: trim_for_storage(&thought.thinking, 900),
641                            evidence_refs: vec!["internal.thought_stream".to_string()],
642                            risk: ProposalRisk::Low,
643                            status: ProposalStatus::Created,
644                            created_at: Utc::now(),
645                        };
646
647                        new_events.push(ThoughtEvent {
648                            id: Uuid::new_v4().to_string(),
649                            event_type: ThoughtEventType::ProposalCreated,
650                            persona_id: Some(work.persona_id.clone()),
651                            swarm_id: work.swarm_id.clone(),
652                            timestamp: Utc::now(),
653                            payload: json!({
654                                "proposal_id": proposal.id,
655                                "title": proposal.title,
656                                "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
657                                "source": thought.source,
658                                "model": thought.model,
659                            }),
660                        });
661
662                        new_proposals.push(proposal);
663                    }
664
665                    if work.phase == ThoughtPhase::Compress {
666                        new_snapshots.push(MemorySnapshot {
667                            id: Uuid::new_v4().to_string(),
668                            generated_at: Utc::now(),
669                            swarm_id: work.swarm_id.clone(),
670                            persona_scope: vec![work.persona_id.clone()],
671                            summary: trim_for_storage(&thought.thinking, 1_500),
672                            hot_event_count: context.len(),
673                            warm_fact_count: estimate_fact_count(&thought.thinking),
674                            cold_snapshot_count: 1,
675                            metadata: HashMap::from([
676                                (
677                                    "phase".to_string(),
678                                    serde_json::Value::String(work.phase.as_str().to_string()),
679                                ),
680                                (
681                                    "role".to_string(),
682                                    serde_json::Value::String(work.role.clone()),
683                                ),
684                                (
685                                    "source".to_string(),
686                                    serde_json::Value::String(thought.source.to_string()),
687                                ),
688                                (
689                                    "model".to_string(),
690                                    serde_json::Value::String(
691                                        thought
692                                            .model
693                                            .clone()
694                                            .unwrap_or_else(|| "fallback".to_string()),
695                                    ),
696                                ),
697                                (
698                                    "completion_tokens".to_string(),
699                                    serde_json::Value::Number(serde_json::Number::from(
700                                        thought.completion_tokens.unwrap_or(0) as u64,
701                                    )),
702                                ),
703                            ]),
704                        });
705                    }
706                }
707
708                if !new_proposals.is_empty() {
709                    let mut proposal_store = proposals.write().await;
710                    for proposal in new_proposals {
711                        proposal_store.insert(proposal.id.clone(), proposal);
712                    }
713                }
714
715                for event in new_events {
716                    push_event_internal(&events, max_events, &event_tx, event).await;
717                }
718                for snapshot in new_snapshots {
719                    push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
720                }
721
722                let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
723                next_tick += interval;
724                let tick_completed = Instant::now();
725                if tick_completed > next_tick {
726                    next_tick = tick_completed;
727                }
728            }
729        });
730
731        {
732            let mut lock = self.loop_handle.lock().await;
733            *lock = Some(handle);
734        }
735
736        Ok(self.status().await)
737    }
738
739    /// Stop the perpetual cognition loop.
740    pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
741        self.running.store(false, Ordering::SeqCst);
742
743        if let Some(handle) = self.loop_handle.lock().await.take() {
744            handle.abort();
745            let _ = handle.await;
746        }
747
748        if let Some(reason_value) = reason {
749            let event = ThoughtEvent {
750                id: Uuid::new_v4().to_string(),
751                event_type: ThoughtEventType::CheckResult,
752                persona_id: None,
753                swarm_id: None,
754                timestamp: Utc::now(),
755                payload: json!({ "stopped": true, "reason": reason_value }),
756            };
757            self.push_event(event).await;
758        }
759
760        Ok(self.status().await)
761    }
762
763    /// Create a persona record.
764    pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
765        let now = Utc::now();
766        let mut personas = self.personas.write().await;
767
768        let mut parent_swarm_id = None;
769        let mut computed_depth = 0_u32;
770        let mut inherited_policy = None;
771
772        if let Some(parent_id) = req.parent_id.clone() {
773            let parent = personas
774                .get(&parent_id)
775                .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
776
777            if parent.status == PersonaStatus::Reaped {
778                return Err(anyhow!("Parent persona {} is reaped", parent_id));
779            }
780
781            parent_swarm_id = parent.identity.swarm_id.clone();
782            computed_depth = parent.identity.depth.saturating_add(1);
783            inherited_policy = Some(parent.policy.clone());
784            let branch_limit = parent.policy.max_branching_factor;
785
786            let child_count = personas
787                .values()
788                .filter(|p| {
789                    p.identity.parent_id.as_deref() == Some(parent_id.as_str())
790                        && p.status != PersonaStatus::Reaped
791                })
792                .count();
793
794            if child_count as u32 >= branch_limit {
795                return Err(anyhow!(
796                    "Parent {} reached branching limit {}",
797                    parent_id,
798                    branch_limit
799                ));
800            }
801        }
802
803        let policy = req
804            .policy
805            .clone()
806            .or(inherited_policy.clone())
807            .unwrap_or_else(|| self.default_policy.clone());
808
809        let effective_depth_limit = inherited_policy
810            .as_ref()
811            .map(|p| p.max_spawn_depth)
812            .unwrap_or(policy.max_spawn_depth);
813
814        if computed_depth > effective_depth_limit {
815            return Err(anyhow!(
816                "Spawn depth {} exceeds limit {}",
817                computed_depth,
818                effective_depth_limit
819            ));
820        }
821
822        let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
823        if personas.contains_key(&persona_id) {
824            return Err(anyhow!("Persona id already exists: {}", persona_id));
825        }
826
827        let identity = PersonaIdentity {
828            id: persona_id.clone(),
829            name: req.name,
830            role: req.role,
831            charter: req.charter,
832            swarm_id: req.swarm_id.or(parent_swarm_id),
833            parent_id: req.parent_id,
834            depth: computed_depth,
835            created_at: now,
836        };
837
838        let persona = PersonaRuntimeState {
839            identity,
840            policy,
841            status: PersonaStatus::Active,
842            thought_count: 0,
843            last_tick_at: None,
844            updated_at: now,
845        };
846
847        personas.insert(persona_id, persona.clone());
848        drop(personas);
849
850        self.push_event(ThoughtEvent {
851            id: Uuid::new_v4().to_string(),
852            event_type: ThoughtEventType::PersonaSpawned,
853            persona_id: Some(persona.identity.id.clone()),
854            swarm_id: persona.identity.swarm_id.clone(),
855            timestamp: now,
856            payload: json!({
857                "name": persona.identity.name,
858                "role": persona.identity.role,
859                "depth": persona.identity.depth,
860            }),
861        })
862        .await;
863
864        Ok(persona)
865    }
866
867    /// Spawn a child persona under an existing parent.
868    pub async fn spawn_child(
869        &self,
870        parent_id: &str,
871        req: SpawnPersonaRequest,
872    ) -> Result<PersonaRuntimeState> {
873        let request = CreatePersonaRequest {
874            persona_id: req.persona_id,
875            name: req.name,
876            role: req.role,
877            charter: req.charter,
878            swarm_id: req.swarm_id,
879            parent_id: Some(parent_id.to_string()),
880            policy: req.policy,
881        };
882        self.create_persona(request).await
883    }
884
885    /// Reap one persona or the full descendant tree.
886    pub async fn reap_persona(
887        &self,
888        persona_id: &str,
889        req: ReapPersonaRequest,
890    ) -> Result<ReapPersonaResponse> {
891        let cascade = req.cascade.unwrap_or(false);
892        let now = Utc::now();
893
894        let mut personas = self.personas.write().await;
895        if !personas.contains_key(persona_id) {
896            return Err(anyhow!("Persona not found: {}", persona_id));
897        }
898
899        let mut reaped_ids = vec![persona_id.to_string()];
900        if cascade {
901            let mut idx = 0usize;
902            while idx < reaped_ids.len() {
903                let current = reaped_ids[idx].clone();
904                let children: Vec<String> = personas
905                    .values()
906                    .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
907                    .map(|p| p.identity.id.clone())
908                    .collect();
909                for child in children {
910                    if !reaped_ids.iter().any(|existing| existing == &child) {
911                        reaped_ids.push(child);
912                    }
913                }
914                idx += 1;
915            }
916        }
917
918        for id in &reaped_ids {
919            if let Some(persona) = personas.get_mut(id) {
920                persona.status = PersonaStatus::Reaped;
921                persona.updated_at = now;
922            }
923        }
924        drop(personas);
925
926        for id in &reaped_ids {
927            self.push_event(ThoughtEvent {
928                id: Uuid::new_v4().to_string(),
929                event_type: ThoughtEventType::PersonaReaped,
930                persona_id: Some(id.clone()),
931                swarm_id: None,
932                timestamp: now,
933                payload: json!({
934                    "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
935                    "cascade": cascade,
936                }),
937            })
938            .await;
939        }
940
941        Ok(ReapPersonaResponse {
942            count: reaped_ids.len(),
943            reaped_ids,
944        })
945    }
946
947    /// Get latest memory snapshot, if any.
948    pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
949        self.snapshots.read().await.back().cloned()
950    }
951
952    /// Build lineage graph from current persona state.
953    pub async fn lineage_graph(&self) -> LineageGraph {
954        let personas = self.personas.read().await;
955        let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
956        let mut roots = Vec::new();
957        let mut total_edges = 0usize;
958
959        for persona in personas.values() {
960            if let Some(parent_id) = persona.identity.parent_id.clone() {
961                children_by_parent
962                    .entry(parent_id)
963                    .or_default()
964                    .push(persona.identity.id.clone());
965                total_edges = total_edges.saturating_add(1);
966            } else {
967                roots.push(persona.identity.id.clone());
968            }
969        }
970
971        let mut nodes: Vec<LineageNode> = personas
972            .values()
973            .map(|persona| {
974                let mut children = children_by_parent
975                    .get(&persona.identity.id)
976                    .cloned()
977                    .unwrap_or_default();
978                children.sort();
979
980                LineageNode {
981                    persona_id: persona.identity.id.clone(),
982                    parent_id: persona.identity.parent_id.clone(),
983                    children,
984                    depth: persona.identity.depth,
985                    status: persona.status,
986                }
987            })
988            .collect();
989
990        nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
991        roots.sort();
992
993        LineageGraph {
994            nodes,
995            roots,
996            total_edges,
997        }
998    }
999
1000    /// Return a summary status.
1001    pub async fn status(&self) -> CognitionStatus {
1002        let personas = self.personas.read().await;
1003        let events = self.events.read().await;
1004        let snapshots = self.snapshots.read().await;
1005        let started_at = *self.started_at.read().await;
1006        let last_tick_at = *self.last_tick_at.read().await;
1007        let loop_interval_ms = *self.loop_interval_ms.read().await;
1008
1009        let active_persona_count = personas
1010            .values()
1011            .filter(|p| p.status == PersonaStatus::Active)
1012            .count();
1013
1014        CognitionStatus {
1015            enabled: self.enabled,
1016            running: self.running.load(Ordering::SeqCst),
1017            loop_interval_ms,
1018            started_at,
1019            last_tick_at,
1020            persona_count: personas.len(),
1021            active_persona_count,
1022            events_buffered: events.len(),
1023            snapshots_buffered: snapshots.len(),
1024        }
1025    }
1026
1027    async fn push_event(&self, event: ThoughtEvent) {
1028        push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1029    }
1030}
1031
1032async fn push_event_internal(
1033    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1034    max_events: usize,
1035    event_tx: &broadcast::Sender<ThoughtEvent>,
1036    event: ThoughtEvent,
1037) {
1038    {
1039        let mut lock = events.write().await;
1040        lock.push_back(event.clone());
1041        while lock.len() > max_events {
1042            lock.pop_front();
1043        }
1044    }
1045    let _ = event_tx.send(event);
1046}
1047
1048async fn push_snapshot_internal(
1049    snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1050    max_snapshots: usize,
1051    snapshot: MemorySnapshot,
1052) {
1053    let mut lock = snapshots.write().await;
1054    lock.push_back(snapshot);
1055    while lock.len() > max_snapshots {
1056        lock.pop_front();
1057    }
1058}
1059
1060async fn recent_persona_context(
1061    events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1062    persona_id: &str,
1063    limit: usize,
1064) -> Vec<ThoughtEvent> {
1065    let lock = events.read().await;
1066    let mut selected: Vec<ThoughtEvent> = lock
1067        .iter()
1068        .rev()
1069        .filter(|event| {
1070            event.persona_id.as_deref() == Some(persona_id)
1071                || (event.persona_id.is_none()
1072                    && matches!(
1073                        event.event_type,
1074                        ThoughtEventType::CheckResult
1075                            | ThoughtEventType::ProposalCreated
1076                            | ThoughtEventType::SnapshotCompressed
1077                    ))
1078        })
1079        .take(limit)
1080        .cloned()
1081        .collect();
1082    selected.reverse();
1083    selected
1084}
1085
1086async fn generate_phase_thought(
1087    thinker: Option<&ThinkerClient>,
1088    work: &ThoughtWorkItem,
1089    context: &[ThoughtEvent],
1090) -> ThoughtResult {
1091    let started_at = Instant::now();
1092    if let Some(client) = thinker {
1093        let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1094        match client.think(&system_prompt, &user_prompt).await {
1095            Ok(output) => {
1096                let thinking = normalize_thought_output(work, context, &output.text);
1097                if !thinking.is_empty() {
1098                    return ThoughtResult {
1099                        source: "model",
1100                        model: Some(output.model),
1101                        finish_reason: output.finish_reason,
1102                        thinking,
1103                        prompt_tokens: output.prompt_tokens,
1104                        completion_tokens: output.completion_tokens,
1105                        total_tokens: output.total_tokens,
1106                        latency_ms: started_at.elapsed().as_millis(),
1107                        error: None,
1108                    };
1109                }
1110            }
1111            Err(error) => {
1112                return ThoughtResult {
1113                    source: "fallback",
1114                    model: None,
1115                    finish_reason: None,
1116                    thinking: fallback_phase_text(work, context),
1117                    prompt_tokens: None,
1118                    completion_tokens: None,
1119                    total_tokens: None,
1120                    latency_ms: started_at.elapsed().as_millis(),
1121                    error: Some(error.to_string()),
1122                };
1123            }
1124        }
1125    }
1126
1127    ThoughtResult {
1128        source: "fallback",
1129        model: None,
1130        finish_reason: None,
1131        thinking: fallback_phase_text(work, context),
1132        prompt_tokens: None,
1133        completion_tokens: None,
1134        total_tokens: None,
1135        latency_ms: started_at.elapsed().as_millis(),
1136        error: None,
1137    }
1138}
1139
1140fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1141    let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1142Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1143Write as an operational process update, not meta narration. \
1144Do not say phrases like 'I need to', 'we need to', 'I will', or describe your own reasoning process. \
1145Output concrete findings, checks, risks, and next actions."
1146        .to_string();
1147
1148    let context_lines = if context.is_empty() {
1149        "none".to_string()
1150    } else {
1151        context
1152            .iter()
1153            .map(format_context_event)
1154            .collect::<Vec<_>>()
1155            .join("\n")
1156    };
1157
1158    let phase_instruction = match work.phase {
1159        ThoughtPhase::Observe => {
1160            "Process format (exact line labels): \
1161Phase: Observe | Goal: detect current customer/business risk | \
1162Signals: <1-3 concrete signals> | \
1163Uncertainty: <one unknown that blocks confidence> | \
1164Next_Action: <one immediate operational action>."
1165        }
1166        ThoughtPhase::Reflect => {
1167            "Process format (exact line labels): \
1168Phase: Reflect | Hypothesis: <single testable hypothesis> | \
1169Rationale: <why this is likely> | \
1170Business_Risk: <customer/revenue/SLA impact> | \
1171Validation_Next_Action: <one action to confirm or falsify>."
1172        }
1173        ThoughtPhase::Test => {
1174            "Process format (exact line labels): \
1175Phase: Test | Check: <single concrete check> | \
1176Procedure: <short executable procedure> | \
1177Expected_Result: <pass/fail expectation> | \
1178Evidence_Quality: <low|medium|high with reason> | \
1179Escalation_Trigger: <when to escalate immediately>."
1180        }
1181        ThoughtPhase::Compress => {
1182            "Process format (exact line labels): \
1183Phase: Compress | State_Summary: <current state in one line> | \
1184Retained_Facts: <3 short facts separated by '; '> | \
1185Open_Risks: <up to 2 unresolved risks separated by '; '> | \
1186Next_Process_Step: <next operational step>."
1187        }
1188    };
1189
1190    let user_prompt = format!(
1191        "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
1192        phase = work.phase.as_str(),
1193        persona_id = work.persona_id,
1194        persona_name = work.persona_name,
1195        role = work.role,
1196        charter = work.charter,
1197        thought_count = work.thought_count,
1198        context = context_lines,
1199        instruction = phase_instruction
1200    );
1201
1202    (system_prompt, user_prompt)
1203}
1204
1205fn format_context_event(event: &ThoughtEvent) -> String {
1206    let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
1207    format!(
1208        "{} {} {}",
1209        event.event_type.as_str(),
1210        event.timestamp.to_rfc3339(),
1211        trim_for_storage(&payload, 220)
1212    )
1213}
1214
1215fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
1216    let charter = trim_for_storage(&work.charter, 180);
1217    let context_summary = fallback_context_summary(context);
1218    let thought = match work.phase {
1219        ThoughtPhase::Observe => format!(
1220            "Phase: Observe | Goal: detect current customer/business risk | Signals: role={}; charter_focus={}; {} | Uncertainty: live customer-impact telemetry and current incident status are incomplete. | Next_Action: run targeted health/error checks for customer-facing flows and capture failure rate baselines.",
1221            work.role, charter, context_summary
1222        ),
1223        ThoughtPhase::Reflect => format!(
1224            "Phase: Reflect | Hypothesis: current instability risk is most likely in runtime reliability and dependency availability. | Rationale: recent context indicates unresolved operational uncertainty. | Business_Risk: outages can cause SLA breach, revenue loss, and trust erosion. | Validation_Next_Action: confirm via service health trend, dependency error distribution, and rollback readiness.",
1225        ),
1226        ThoughtPhase::Test => format!(
1227            "Phase: Test | Check: verify customer-path service health against recent error spikes and release changes. | Procedure: collect latest health status, error counts, and recent deploy diffs; compare against baseline. | Expected_Result: pass if health is stable and error rate within baseline, fail otherwise. | Evidence_Quality: medium (depends on telemetry completeness). | Escalation_Trigger: escalate immediately on repeated customer-path failures or sustained elevated error rate.",
1228        ),
1229        ThoughtPhase::Compress => format!(
1230            "Phase: Compress | State_Summary: reliability monitoring active with unresolved business-impact uncertainty. | Retained_Facts: role={} ; charter_focus={} ; {} | Open_Risks: potential customer-path instability ; incomplete evidence for confident closure. | Next_Process_Step: convert latest checks into prioritized remediation tasks and verify impact reduction.",
1231            work.role, charter, context_summary
1232        ),
1233    };
1234    trim_for_storage(&thought, 1_200)
1235}
1236
1237fn normalize_thought_output(work: &ThoughtWorkItem, context: &[ThoughtEvent], raw: &str) -> String {
1238    let trimmed = trim_for_storage(raw, 2_000);
1239    if trimmed.trim().is_empty() {
1240        return fallback_phase_text(work, context);
1241    }
1242
1243    // Prefer process-labeled content if the model emitted a preamble first.
1244    if let Some(idx) = find_process_label_start(&trimmed) {
1245        let candidate = trimmed[idx..].trim();
1246        if let Some(first_line) = candidate
1247            .lines()
1248            .map(str::trim)
1249            .find(|line| !line.is_empty())
1250        {
1251            if first_line.starts_with("Phase:") && !first_line.contains('<') {
1252                return first_line.to_string();
1253            }
1254        }
1255        if !candidate.is_empty() && !candidate.contains('<') && !candidate.contains('\n') {
1256            return candidate.to_string();
1257        }
1258    }
1259
1260    let lower = trimmed.to_ascii_lowercase();
1261    let looks_meta = lower.starts_with("we need")
1262        || lower.starts_with("i need")
1263        || lower.contains("we need to")
1264        || lower.contains("i need to")
1265        || lower.contains("must output")
1266        || lower.contains("let's ")
1267        || lower.contains("we have to");
1268
1269    if looks_meta {
1270        return fallback_phase_text(work, context);
1271    }
1272
1273    trimmed
1274}
1275
/// Return the byte offset of the earliest process label (`Phase: Observe`,
/// `Phase: Reflect`, `Phase: Test`, `Phase: Compress`, or a bare `Phase:`)
/// in `text`, or `None` when no label occurs.
fn find_process_label_start(text: &str) -> Option<usize> {
    const LABELS: [&str; 5] = [
        "Phase: Observe",
        "Phase: Reflect",
        "Phase: Test",
        "Phase: Compress",
        "Phase:",
    ];

    let mut earliest: Option<usize> = None;
    for label in LABELS.iter() {
        if let Some(position) = text.find(*label) {
            earliest = Some(match earliest {
                Some(best) => best.min(position),
                None => position,
            });
        }
    }
    earliest
}
1288
1289fn fallback_context_summary(context: &[ThoughtEvent]) -> String {
1290    if context.is_empty() {
1291        return "No prior events recorded yet.".to_string();
1292    }
1293
1294    let mut latest_error: Option<String> = None;
1295    let mut latest_proposal: Option<String> = None;
1296    let mut latest_check: Option<String> = None;
1297
1298    for event in context.iter().rev() {
1299        if latest_error.is_none()
1300            && let Some(error) = event
1301                .payload
1302                .get("error")
1303                .and_then(serde_json::Value::as_str)
1304            && !error.trim().is_empty()
1305        {
1306            latest_error = Some(trim_for_storage(error, 140));
1307        }
1308
1309        if latest_proposal.is_none()
1310            && event.event_type == ThoughtEventType::ProposalCreated
1311            && let Some(title) = event
1312                .payload
1313                .get("title")
1314                .and_then(serde_json::Value::as_str)
1315            && !title.trim().is_empty()
1316        {
1317            latest_proposal = Some(trim_for_storage(title, 120));
1318        }
1319
1320        if latest_check.is_none()
1321            && event.event_type == ThoughtEventType::CheckResult
1322            && let Some(result) = event
1323                .payload
1324                .get("result_excerpt")
1325                .and_then(serde_json::Value::as_str)
1326            && !result.trim().is_empty()
1327        {
1328            latest_check = Some(trim_for_storage(result, 140));
1329        }
1330
1331        if latest_error.is_some() && latest_proposal.is_some() && latest_check.is_some() {
1332            break;
1333        }
1334    }
1335
1336    let mut lines = vec![format!(
1337        "{} recent cognition events are available.",
1338        context.len()
1339    )];
1340    if let Some(error) = latest_error {
1341        lines.push(format!("Latest error signal: {}.", error));
1342    }
1343    if let Some(proposal) = latest_proposal {
1344        lines.push(format!("Recent proposal: {}.", proposal));
1345    }
1346    if let Some(check) = latest_check {
1347        lines.push(format!("Recent check: {}.", check));
1348    }
1349    lines.join(" ")
1350}
1351
/// Trim surrounding whitespace from `input` and cap it at `max_chars`
/// characters (Unicode scalar values, not bytes).
///
/// Text that fits after trimming is returned as-is. Longer text is cut to its
/// first `max_chars` characters and suffixed with `...`, so a truncated
/// result is up to three characters longer than `max_chars`.
///
/// Whitespace is trimmed *before* measuring, so padded input whose content
/// fits the limit is not truncated and does not gain a spurious ellipsis.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let trimmed = input.trim();
    // `nth(max_chars)` yields the first character past the limit, if any;
    // its byte offset is a valid cut point on a char boundary.
    match trimmed.char_indices().nth(max_chars) {
        None => trimmed.to_string(),
        Some((cut, _)) => {
            let mut shortened = String::with_capacity(cut + 3);
            shortened.push_str(&trimmed[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
1363
/// Estimate how many facts `text` contains by counting sentence terminators
/// (`.`, `!`, `?`), clamped to the range 1..=12.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.clamp(1, 12)
}
1369
1370fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
1371    let first_line = thought
1372        .lines()
1373        .find(|line| !line.trim().is_empty())
1374        .unwrap_or("proposal");
1375    let compact = first_line
1376        .replace(['\t', '\r', '\n'], " ")
1377        .split_whitespace()
1378        .collect::<Vec<_>>()
1379        .join(" ");
1380    let trimmed = trim_for_storage(&compact, 72);
1381    if trimmed.is_empty() {
1382        format!("proposal-{}", thought_count)
1383    } else {
1384        trimmed
1385    }
1386}
1387
1388fn default_seed_persona() -> CreatePersonaRequest {
1389    CreatePersonaRequest {
1390        persona_id: Some("root-thinker".to_string()),
1391        name: "root-thinker".to_string(),
1392        role: "orchestrator".to_string(),
1393        charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
1394            .to_string(),
1395        swarm_id: Some("swarm-core".to_string()),
1396        parent_id: None,
1397        policy: None,
1398    }
1399}
1400
/// Turn a configured base URL into a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are removed first. An empty
/// result maps to the local default endpoint; a URL that already ends in
/// `/chat/completions` is kept; anything else gets that path appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    const COMPLETIONS_PATH: &str = "/chat/completions";
    const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:11434/v1/chat/completions";

    let base = base_url.trim().trim_end_matches('/');
    if base.is_empty() {
        DEFAULT_ENDPOINT.to_owned()
    } else if base.ends_with(COMPLETIONS_PATH) {
        base.to_owned()
    } else {
        let mut endpoint = String::with_capacity(base.len() + COMPLETIONS_PATH.len());
        endpoint.push_str(base);
        endpoint.push_str(COMPLETIONS_PATH);
        endpoint
    }
}
1411
/// Read a boolean flag from environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, ignoring ASCII
/// case and surrounding whitespace. Returns `default` when the variable is
/// unset, not valid Unicode, or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    // Trim first so values like "true\n" or " 1 " are not silently ignored.
    match raw.trim().to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
1422
/// Read an `f32` from environment variable `name`, ignoring surrounding
/// whitespace. Returns `default` when the variable is unset, not valid
/// Unicode, or does not parse as `f32`.
fn env_f32(name: &str, default: f32) -> f32 {
    match std::env::var(name) {
        // `parse` rejects padded input, so trim before parsing.
        Ok(raw) => raw.trim().parse::<f32>().unwrap_or(default),
        Err(_) => default,
    }
}
1429
/// Read a `u64` from environment variable `name`, ignoring surrounding
/// whitespace. Returns `default` when the variable is unset, not valid
/// Unicode, or does not parse as `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        // `parse` rejects padded input, so trim before parsing.
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
1436
/// Read a `u32` from environment variable `name`, ignoring surrounding
/// whitespace. Returns `default` when the variable is unset, not valid
/// Unicode, or does not parse as `u32`.
fn env_u32(name: &str, default: u32) -> u32 {
    match std::env::var(name) {
        // `parse` rejects padded input, so trim before parsing.
        Ok(raw) => raw.trim().parse::<u32>().unwrap_or(default),
        Err(_) => default,
    }
}
1443
/// Read a `usize` from environment variable `name`, ignoring surrounding
/// whitespace. Returns `default` when the variable is unset, not valid
/// Unicode, or does not parse as `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        // `parse` rejects padded input, so trim before parsing.
        Ok(raw) => raw.trim().parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
1450
#[cfg(test)]
mod tests {
    use super::*;

    /// Runtime with tight limits (depth 2, branching 2) so the policy checks
    /// can be exercised with only a handful of personas.
    fn test_runtime() -> CognitionRuntime {
        let default_policy = PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_credits_per_minute: 1_000,
            cpu_credits_per_minute: 1_000,
            idle_ttl_secs: 300,
            share_memory: false,
        };
        CognitionRuntime::new_with_options(CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy,
        })
    }

    /// Request for a parentless persona whose name equals its id.
    fn top_level_request(
        id: &str,
        role: &str,
        charter: &str,
        swarm_id: &str,
    ) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: Some(swarm_id.to_string()),
            parent_id: None,
            policy: None,
        }
    }

    /// Spawn request whose name equals its id, with inherited swarm and policy.
    fn child_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(top_level_request(
                "root",
                "orchestrator",
                "coordinate",
                "swarm-a",
            ))
            .await
            .expect("root should be created");

        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", child_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");

        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(top_level_request(
                "root",
                "orchestrator",
                "coordinate",
                "swarm-a",
            ))
            .await
            .expect("root should be created");

        // Two children fill the branching limit of 2.
        runtime
            .spawn_child("root", child_request("c1", "worker", "run"))
            .await
            .expect("first child should spawn");

        runtime
            .spawn_child("root", child_request("c2", "worker", "run"))
            .await
            .expect("second child should spawn");

        let third_child = runtime
            .spawn_child("root", child_request("c3", "worker", "run"))
            .await;
        assert!(third_child.is_err());

        // Depth 2 is the maximum; depth 3 must be rejected.
        runtime
            .spawn_child("c1", child_request("c1-1", "worker", "run"))
            .await
            .expect("depth 2 should be allowed");

        let depth_violation = runtime
            .spawn_child("c1-1", child_request("c1-1-1", "worker", "run"))
            .await;
        assert!(depth_violation.is_err());
    }

    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        let start_request = StartCognitionRequest {
            loop_interval_ms: Some(10),
            seed_persona: Some(top_level_request(
                "seed",
                "watcher",
                "observe",
                "swarm-seed",
            )),
        };
        runtime
            .start(Some(start_request))
            .await
            .expect("runtime should start");

        // Give the loop time to tick a few times.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }
}