1mod thinker;
9
10use anyhow::{Result, anyhow};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18use tokio::sync::{Mutex, RwLock, broadcast};
19use tokio::task::JoinHandle;
20use tokio::time::Instant;
21use thinker::{ThinkerClient, ThinkerConfig};
22use uuid::Uuid;
23
/// Lifecycle state of a persona. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaStatus {
    /// The only status the cognition loop ticks; assigned by `create_persona`.
    Active,
    /// Skipped by the cognition loop. Not assigned anywhere in the visible part of this file.
    Idle,
    /// Set by `reap_persona`. Reaped personas cannot act as parents and do not count
    /// against a parent's branching limit.
    Reaped,
    /// Skipped by the cognition loop. Not assigned anywhere in the visible part of this file.
    Error,
}
33
/// Per-persona limits. A child created without an explicit policy inherits its parent's
/// policy; a root without one gets the runtime default (see `create_persona`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaPolicy {
    /// Maximum lineage depth; `create_persona` rejects a child whose depth would exceed it.
    pub max_spawn_depth: u32,
    /// Maximum number of non-reaped direct children a persona may have.
    pub max_branching_factor: u32,
    /// Token budget per minute. NOTE(review): not enforced in the visible part of this file.
    pub token_credits_per_minute: u32,
    /// CPU budget per minute. NOTE(review): not enforced in the visible part of this file.
    pub cpu_credits_per_minute: u32,
    /// Idle time-to-live in seconds. NOTE(review): not enforced in the visible part of this file.
    pub idle_ttl_secs: u64,
    /// Memory-sharing flag. NOTE(review): not consulted in the visible part of this file.
    pub share_memory: bool,
}
44
45impl Default for PersonaPolicy {
46 fn default() -> Self {
47 Self {
48 max_spawn_depth: 4,
49 max_branching_factor: 4,
50 token_credits_per_minute: 20_000,
51 cpu_credits_per_minute: 10_000,
52 idle_ttl_secs: 3_600,
53 share_memory: false,
54 }
55 }
56}
57
/// Identity and lineage placement of a persona, filled in by `create_persona`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaIdentity {
    /// Unique persona id: caller-supplied, or a freshly generated UUID v4 string.
    pub id: String,
    /// Persona name, copied from the creation request.
    pub name: String,
    /// Role label; included in the thinker prompts and event payloads.
    pub role: String,
    /// Charter text; included in the thinker prompts.
    pub charter: String,
    /// Swarm id: the request's value, otherwise the parent's.
    pub swarm_id: Option<String>,
    /// Id of the parent persona; `None` for a root.
    pub parent_id: Option<String>,
    /// Lineage depth: 0 for roots, parent depth + 1 for children.
    pub depth: u32,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
}
70
/// Full mutable state of a persona as stored in the runtime's persona map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaRuntimeState {
    /// Identity and lineage information.
    pub identity: PersonaIdentity,
    /// Effective policy for this persona.
    pub policy: PersonaPolicy,
    /// Current lifecycle status.
    pub status: PersonaStatus,
    /// Number of loop ticks processed for this persona (saturating counter).
    pub thought_count: u64,
    /// Time of the last loop tick that processed this persona; `None` until the first tick.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Time of the last state change (creation, tick or reap).
    pub updated_at: DateTime<Utc>,
}
81
/// Risk level attached to a proposal. Serialized in `snake_case`.
/// The cognition loop in this file only creates `Low` proposals.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalRisk {
    Low,
    Medium,
    High,
    Critical,
}
91
/// Lifecycle state of a proposal. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProposalStatus {
    /// Initial state assigned by the cognition loop.
    Created,
    // The remaining states are not assigned anywhere in the visible part of this file.
    Verified,
    Rejected,
    Executed,
}
101
/// A proposal derived from a persona's thought during the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
    /// Unique proposal id (UUID v4 string when created by the loop).
    pub id: String,
    /// Id of the persona whose thought produced the proposal.
    pub persona_id: String,
    /// Short title derived from the thought text.
    pub title: String,
    /// Rationale; the loop stores the thought text trimmed to 900 characters.
    pub rationale: String,
    /// References to supporting evidence.
    pub evidence_refs: Vec<String>,
    /// Assessed risk level.
    pub risk: ProposalRisk,
    /// Current lifecycle status.
    pub status: ProposalStatus,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
}
114
/// Kind of an event in the thought stream. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Emitted for a thought in the observe phase.
    ThoughtGenerated,
    /// Emitted for a thought in the reflect phase.
    HypothesisRaised,
    /// Emitted for a thought in the test phase.
    CheckRequested,
    /// Emitted right after a test-phase thought; `stop` also emits it when given a reason.
    CheckResult,
    /// Emitted when the loop records a new proposal.
    ProposalCreated,
    // The next three are not emitted anywhere in the visible part of this file.
    ProposalVerified,
    ProposalRejected,
    ActionExecuted,
    /// Emitted by `create_persona`.
    PersonaSpawned,
    /// Emitted by `reap_persona`, once per reaped persona.
    PersonaReaped,
    /// Emitted for a thought in the compress phase.
    SnapshotCompressed,
}
131
132impl ThoughtEventType {
133 fn as_str(&self) -> &'static str {
134 match self {
135 Self::ThoughtGenerated => "thought_generated",
136 Self::HypothesisRaised => "hypothesis_raised",
137 Self::CheckRequested => "check_requested",
138 Self::CheckResult => "check_result",
139 Self::ProposalCreated => "proposal_created",
140 Self::ProposalVerified => "proposal_verified",
141 Self::ProposalRejected => "proposal_rejected",
142 Self::ActionExecuted => "action_executed",
143 Self::PersonaSpawned => "persona_spawned",
144 Self::PersonaReaped => "persona_reaped",
145 Self::SnapshotCompressed => "snapshot_compressed",
146 }
147 }
148}
149
/// One entry in the thought stream; buffered in the runtime and broadcast to subscribers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Unique event id (UUID v4 string for events created in this file).
    pub id: String,
    /// Kind of event.
    pub event_type: ThoughtEventType,
    /// Persona the event belongs to; `None` for runtime-level events such as a stop notice.
    pub persona_id: Option<String>,
    /// Swarm the event belongs to, when known.
    pub swarm_id: Option<String>,
    /// When the event was created.
    pub timestamp: DateTime<Utc>,
    /// Free-form JSON payload; its shape depends on `event_type`.
    pub payload: serde_json::Value,
}
160
/// Compressed memory summary produced in the compress phase of the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
    /// Unique snapshot id (UUID v4 string when created by the loop).
    pub id: String,
    /// When the snapshot was generated.
    pub generated_at: DateTime<Utc>,
    /// Swarm of the persona that produced the snapshot.
    pub swarm_id: Option<String>,
    /// Persona ids covered; the loop stores exactly the producing persona.
    pub persona_scope: Vec<String>,
    /// Summary text; the loop stores the thought trimmed to 1,500 characters.
    pub summary: String,
    /// The loop stores the number of context events fed into the thought.
    pub hot_event_count: usize,
    /// The loop stores `estimate_fact_count` of the thought text.
    pub warm_fact_count: usize,
    /// The loop always stores 1.
    pub cold_snapshot_count: usize,
    /// Extra details; the loop stores `phase`, `role`, `source`, `model` and `completion_tokens`.
    pub metadata: HashMap<String, serde_json::Value>,
}
174
/// Input to `CognitionRuntime::create_persona`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreatePersonaRequest {
    /// Explicit id; a UUID v4 is generated when `None`. Must not already exist.
    pub persona_id: Option<String>,
    /// Persona name.
    pub name: String,
    /// Role label.
    pub role: String,
    /// Charter text.
    pub charter: String,
    /// Swarm id; falls back to the parent's swarm when `None`.
    pub swarm_id: Option<String>,
    /// Parent persona id; `None` creates a root persona at depth 0.
    pub parent_id: Option<String>,
    /// Explicit policy; falls back to the parent's policy, then the runtime default.
    pub policy: Option<PersonaPolicy>,
}
186
/// Input to `CognitionRuntime::spawn_child`. Mirrors `CreatePersonaRequest` without
/// `parent_id`, which is passed as a separate argument.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnPersonaRequest {
    /// Explicit id; a UUID v4 is generated when `None`.
    pub persona_id: Option<String>,
    /// Persona name.
    pub name: String,
    /// Role label.
    pub role: String,
    /// Charter text.
    pub charter: String,
    /// Swarm id; falls back to the parent's swarm when `None`.
    pub swarm_id: Option<String>,
    /// Explicit policy; falls back to the parent's policy when `None`.
    pub policy: Option<PersonaPolicy>,
}
197
/// Input to `CognitionRuntime::reap_persona`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaRequest {
    /// When `Some(true)`, all descendants are reaped too. Defaults to `false`.
    pub cascade: Option<bool>,
    /// Reason recorded in the reap events; defaults to `"manual_reap"`.
    pub reason: Option<String>,
}
204
/// Optional input to `CognitionRuntime::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartCognitionRequest {
    /// New loop interval in milliseconds; clamped to at least 100.
    pub loop_interval_ms: Option<u64>,
    /// Persona to create when the runtime has no personas yet.
    pub seed_persona: Option<CreatePersonaRequest>,
}
211
/// Request body for stopping the cognition loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopCognitionRequest {
    /// Optional human-readable reason.
    /// NOTE(review): presumably forwarded to `CognitionRuntime::stop` by the caller — confirm.
    pub reason: Option<String>,
}
217
/// Point-in-time summary of the runtime, produced by `CognitionRuntime::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CognitionStatus {
    /// Whether the runtime is enabled at all.
    pub enabled: bool,
    /// Whether the running flag is currently set.
    pub running: bool,
    /// Current loop interval in milliseconds.
    pub loop_interval_ms: u64,
    /// When the loop was last started; `None` if never started.
    pub started_at: Option<DateTime<Utc>>,
    /// When the loop last ticked; `None` if it has not ticked yet.
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Total number of personas, including reaped ones.
    pub persona_count: usize,
    /// Number of personas with status `Active`.
    pub active_persona_count: usize,
    /// Number of events currently buffered.
    pub events_buffered: usize,
    /// Number of snapshots currently buffered.
    pub snapshots_buffered: usize,
}
231
/// Result of `CognitionRuntime::reap_persona`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReapPersonaResponse {
    /// Ids marked as reaped: the requested persona first, then any cascaded descendants.
    pub reaped_ids: Vec<String>,
    /// Length of `reaped_ids`.
    pub count: usize,
}
238
/// One persona in the lineage graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Id of this persona.
    pub persona_id: String,
    /// Id of the parent persona, if any.
    pub parent_id: Option<String>,
    /// Ids of direct children, sorted ascending.
    pub children: Vec<String>,
    /// Lineage depth (0 for roots).
    pub depth: u32,
    /// Current lifecycle status.
    pub status: PersonaStatus,
}
248
/// Parent/child structure of all personas, produced by `CognitionRuntime::lineage_graph`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageGraph {
    /// One node per persona, sorted by persona id.
    pub nodes: Vec<LineageNode>,
    /// Ids of personas without a parent, sorted ascending.
    pub roots: Vec<String>,
    /// Number of parent-to-child links (personas that have a parent).
    pub total_edges: usize,
}
256
/// Phase of the four-step thought cycle that each active persona moves through, one
/// phase per loop tick.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThoughtPhase {
    /// Gather signals about the current state.
    Observe,
    /// Form a hypothesis from the observations.
    Reflect,
    /// Design and evaluate a check.
    Test,
    /// Summarize the latest cognition into a snapshot.
    Compress,
}
264
265impl ThoughtPhase {
266 fn from_thought_count(thought_count: u64) -> Self {
267 match thought_count % 4 {
268 1 => Self::Observe,
269 2 => Self::Reflect,
270 3 => Self::Test,
271 _ => Self::Compress,
272 }
273 }
274
275 fn as_str(&self) -> &'static str {
276 match self {
277 Self::Observe => "observe",
278 Self::Reflect => "reflect",
279 Self::Test => "test",
280 Self::Compress => "compress",
281 }
282 }
283
284 fn event_type(&self) -> ThoughtEventType {
285 match self {
286 Self::Observe => ThoughtEventType::ThoughtGenerated,
287 Self::Reflect => ThoughtEventType::HypothesisRaised,
288 Self::Test => ThoughtEventType::CheckRequested,
289 Self::Compress => ThoughtEventType::SnapshotCompressed,
290 }
291 }
292}
293
/// Copy of the persona fields needed to produce one thought, taken while the persona map
/// is locked so the lock is not held during the thinker call.
#[derive(Debug, Clone)]
struct ThoughtWorkItem {
    /// Id of the persona being ticked.
    persona_id: String,
    /// Persona name.
    persona_name: String,
    /// Persona role.
    role: String,
    /// Persona charter.
    charter: String,
    /// Swarm the persona belongs to, if any.
    swarm_id: Option<String>,
    /// Thought counter after it was incremented for this tick.
    thought_count: u64,
    /// Phase derived from `thought_count`.
    phase: ThoughtPhase,
}
304
/// Outcome of generating one thought, from the thinker model or the built-in fallback.
#[derive(Debug, Clone)]
struct ThoughtResult {
    /// `"model"` when the thinker produced non-empty text, otherwise `"fallback"`.
    source: &'static str,
    /// Model name reported by the thinker; `None` for fallback thoughts.
    model: Option<String>,
    /// Finish reason reported by the thinker, if any.
    finish_reason: Option<String>,
    /// The thought text.
    thinking: String,
    /// Token usage reported by the thinker, if any.
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    total_tokens: Option<u32>,
    /// Time spent generating the thought, in milliseconds.
    latency_ms: u128,
    /// Thinker error message when the fallback was used because the call failed.
    error: Option<String>,
}
317
/// Construction-time settings for `CognitionRuntime`.
#[derive(Debug, Clone)]
pub struct CognitionRuntimeOptions {
    /// Whether the runtime may be started at all.
    pub enabled: bool,
    /// Initial loop interval in milliseconds; the runtime clamps it to at least 100.
    pub loop_interval_ms: u64,
    /// Event buffer capacity; the runtime clamps it to at least 32.
    pub max_events: usize,
    /// Snapshot buffer capacity; the runtime clamps it to at least 8.
    pub max_snapshots: usize,
    /// Policy given to personas that have neither an explicit nor an inherited policy.
    pub default_policy: PersonaPolicy,
}
327
328impl Default for CognitionRuntimeOptions {
329 fn default() -> Self {
330 Self {
331 enabled: false,
332 loop_interval_ms: 2_000,
333 max_events: 2_000,
334 max_snapshots: 128,
335 default_policy: PersonaPolicy::default(),
336 }
337 }
338}
339
/// Runtime for the perpetual cognition loop: owns personas, proposals, the event and
/// snapshot buffers, and the background task that ticks active personas.
///
/// Mutable state lives behind `Arc` so the spawned loop task can share it.
#[derive(Debug)]
pub struct CognitionRuntime {
    /// Whether `start` is allowed.
    enabled: bool,
    /// Capacity of `events`; the oldest entries are dropped beyond it.
    max_events: usize,
    /// Capacity of `snapshots`; the oldest entries are dropped beyond it.
    max_snapshots: usize,
    /// Policy for personas without an explicit or inherited policy.
    default_policy: PersonaPolicy,
    /// Set while the loop should keep running; cleared by `stop`.
    running: Arc<AtomicBool>,
    /// Loop interval in milliseconds; re-read by the loop at the end of every tick.
    loop_interval_ms: Arc<RwLock<u64>>,
    /// When the loop was last started.
    started_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// When the loop last ticked.
    last_tick_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// All personas by id, including reaped ones.
    personas: Arc<RwLock<HashMap<String, PersonaRuntimeState>>>,
    /// All proposals by id.
    proposals: Arc<RwLock<HashMap<String, Proposal>>>,
    /// Bounded buffer of recent events, oldest first.
    events: Arc<RwLock<VecDeque<ThoughtEvent>>>,
    /// Bounded buffer of recent snapshots, oldest first.
    snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
    /// Join handle of the background loop task, if one was spawned.
    loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Broadcast channel on which every buffered event is also published.
    event_tx: broadcast::Sender<ThoughtEvent>,
    /// Thinker client; `None` means every thought uses the fallback text.
    thinker: Option<Arc<ThinkerClient>>,
}
359
360impl CognitionRuntime {
361 pub fn new_from_env() -> Self {
363 let mut options = CognitionRuntimeOptions::default();
364 options.enabled = env_bool("CODETETHER_COGNITION_ENABLED", true);
365 options.loop_interval_ms = env_u64("CODETETHER_COGNITION_LOOP_INTERVAL_MS", 2_000);
366 options.max_events = env_usize("CODETETHER_COGNITION_MAX_EVENTS", 2_000);
367 options.max_snapshots = env_usize("CODETETHER_COGNITION_MAX_SNAPSHOTS", 128);
368
369 options.default_policy = PersonaPolicy {
370 max_spawn_depth: env_u32("CODETETHER_COGNITION_MAX_SPAWN_DEPTH", 4),
371 max_branching_factor: env_u32("CODETETHER_COGNITION_MAX_BRANCHING_FACTOR", 4),
372 token_credits_per_minute: env_u32(
373 "CODETETHER_COGNITION_TOKEN_CREDITS_PER_MINUTE",
374 20_000,
375 ),
376 cpu_credits_per_minute: env_u32("CODETETHER_COGNITION_CPU_CREDITS_PER_MINUTE", 10_000),
377 idle_ttl_secs: env_u64("CODETETHER_COGNITION_IDLE_TTL_SECS", 3_600),
378 share_memory: env_bool("CODETETHER_COGNITION_SHARE_MEMORY", false),
379 };
380
381 let thinker_config = ThinkerConfig {
382 enabled: env_bool("CODETETHER_COGNITION_THINKER_ENABLED", true),
383 backend: thinker::ThinkerBackend::from_env(
384 &std::env::var("CODETETHER_COGNITION_THINKER_BACKEND")
385 .unwrap_or_else(|_| "openai_compat".to_string()),
386 ),
387 endpoint: normalize_thinker_endpoint(&std::env::var(
388 "CODETETHER_COGNITION_THINKER_BASE_URL",
389 )
390 .unwrap_or_else(|_| "http://127.0.0.1:11434/v1".to_string())),
391 model: std::env::var("CODETETHER_COGNITION_THINKER_MODEL")
392 .unwrap_or_else(|_| "qwen2.5:3b-instruct".to_string()),
393 api_key: std::env::var("CODETETHER_COGNITION_THINKER_API_KEY").ok(),
394 temperature: env_f32("CODETETHER_COGNITION_THINKER_TEMPERATURE", 0.2),
395 top_p: std::env::var("CODETETHER_COGNITION_THINKER_TOP_P")
396 .ok()
397 .and_then(|v| v.parse::<f32>().ok()),
398 max_tokens: env_usize("CODETETHER_COGNITION_THINKER_MAX_TOKENS", 256),
399 timeout_ms: env_u64("CODETETHER_COGNITION_THINKER_TIMEOUT_MS", 12_000),
400 candle_model_path: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_MODEL_PATH")
401 .ok(),
402 candle_tokenizer_path: std::env::var(
403 "CODETETHER_COGNITION_THINKER_CANDLE_TOKENIZER_PATH",
404 )
405 .ok(),
406 candle_arch: std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_ARCH").ok(),
407 candle_device: thinker::CandleDevicePreference::from_env(
408 &std::env::var("CODETETHER_COGNITION_THINKER_CANDLE_DEVICE")
409 .unwrap_or_else(|_| "auto".to_string()),
410 ),
411 candle_cuda_ordinal: env_usize(
412 "CODETETHER_COGNITION_THINKER_CANDLE_CUDA_ORDINAL",
413 0,
414 ),
415 candle_repeat_penalty: env_f32(
416 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_PENALTY",
417 1.1,
418 ),
419 candle_repeat_last_n: env_usize(
420 "CODETETHER_COGNITION_THINKER_CANDLE_REPEAT_LAST_N",
421 64,
422 ),
423 candle_seed: env_u64("CODETETHER_COGNITION_THINKER_CANDLE_SEED", 42),
424 };
425
426 Self::new_with_options_and_thinker(options, Some(thinker_config))
427 }
428
429 pub fn new_with_options(options: CognitionRuntimeOptions) -> Self {
431 Self::new_with_options_and_thinker(options, None)
432 }
433
434 fn new_with_options_and_thinker(
435 options: CognitionRuntimeOptions,
436 thinker_config: Option<ThinkerConfig>,
437 ) -> Self {
438 let (event_tx, _) = broadcast::channel(options.max_events.max(16));
439 let thinker = thinker_config.and_then(|cfg| {
440 if !cfg.enabled {
441 return None;
442 }
443 match ThinkerClient::new(cfg) {
444 Ok(client) => {
445 tracing::info!(
446 backend = ?client.config().backend,
447 endpoint = %client.config().endpoint,
448 model = %client.config().model,
449 "Cognition thinker initialized"
450 );
451 Some(Arc::new(client))
452 }
453 Err(error) => {
454 tracing::warn!(%error, "Failed to initialize cognition thinker; using fallback thoughts");
455 None
456 }
457 }
458 });
459
460 Self {
461 enabled: options.enabled,
462 max_events: options.max_events.max(32),
463 max_snapshots: options.max_snapshots.max(8),
464 default_policy: options.default_policy,
465 running: Arc::new(AtomicBool::new(false)),
466 loop_interval_ms: Arc::new(RwLock::new(options.loop_interval_ms.max(100))),
467 started_at: Arc::new(RwLock::new(None)),
468 last_tick_at: Arc::new(RwLock::new(None)),
469 personas: Arc::new(RwLock::new(HashMap::new())),
470 proposals: Arc::new(RwLock::new(HashMap::new())),
471 events: Arc::new(RwLock::new(VecDeque::new())),
472 snapshots: Arc::new(RwLock::new(VecDeque::new())),
473 loop_handle: Arc::new(Mutex::new(None)),
474 event_tx,
475 thinker,
476 }
477 }
478
479 pub fn is_enabled(&self) -> bool {
481 self.enabled
482 }
483
484 pub fn subscribe_events(&self) -> broadcast::Receiver<ThoughtEvent> {
486 self.event_tx.subscribe()
487 }
488
489 pub async fn start(&self, req: Option<StartCognitionRequest>) -> Result<CognitionStatus> {
491 if !self.enabled {
492 return Err(anyhow!(
493 "Perpetual cognition is disabled. Set CODETETHER_COGNITION_ENABLED=true."
494 ));
495 }
496
497 let mut requested_seed_persona: Option<CreatePersonaRequest> = None;
498 if let Some(request) = req {
499 if let Some(interval) = request.loop_interval_ms {
500 let mut lock = self.loop_interval_ms.write().await;
501 *lock = interval.max(100);
502 }
503 requested_seed_persona = request.seed_persona;
504 }
505
506 let has_personas = !self.personas.read().await.is_empty();
507 if !has_personas {
508 let seed = requested_seed_persona.unwrap_or_else(default_seed_persona);
509 let _ = self.create_persona(seed).await?;
510 }
511
512 if self.running.load(Ordering::SeqCst) {
513 return Ok(self.status().await);
514 }
515
516 self.running.store(true, Ordering::SeqCst);
517 {
518 let mut started = self.started_at.write().await;
519 *started = Some(Utc::now());
520 }
521
522 let running = Arc::clone(&self.running);
523 let loop_interval_ms = Arc::clone(&self.loop_interval_ms);
524 let last_tick_at = Arc::clone(&self.last_tick_at);
525 let personas = Arc::clone(&self.personas);
526 let proposals = Arc::clone(&self.proposals);
527 let events = Arc::clone(&self.events);
528 let snapshots = Arc::clone(&self.snapshots);
529 let max_events = self.max_events;
530 let max_snapshots = self.max_snapshots;
531 let event_tx = self.event_tx.clone();
532 let thinker = self.thinker.clone();
533
534 let handle = tokio::spawn(async move {
535 let mut next_tick = Instant::now();
536 while running.load(Ordering::SeqCst) {
537 let now_instant = Instant::now();
538 if now_instant < next_tick {
539 tokio::time::sleep_until(next_tick).await;
540 }
541 if !running.load(Ordering::SeqCst) {
542 break;
543 }
544
545 let now = Utc::now();
546 {
547 let mut lock = last_tick_at.write().await;
548 *lock = Some(now);
549 }
550
551 let mut new_events = Vec::new();
552 let mut new_snapshots = Vec::new();
553 let mut new_proposals = Vec::new();
554
555 let work_items: Vec<ThoughtWorkItem> = {
556 let mut map = personas.write().await;
557 let mut items = Vec::new();
558 for persona in map.values_mut() {
559 if persona.status != PersonaStatus::Active {
560 continue;
561 }
562
563 persona.thought_count = persona.thought_count.saturating_add(1);
564 persona.last_tick_at = Some(now);
565 persona.updated_at = now;
566 items.push(ThoughtWorkItem {
567 persona_id: persona.identity.id.clone(),
568 persona_name: persona.identity.name.clone(),
569 role: persona.identity.role.clone(),
570 charter: persona.identity.charter.clone(),
571 swarm_id: persona.identity.swarm_id.clone(),
572 thought_count: persona.thought_count,
573 phase: ThoughtPhase::from_thought_count(persona.thought_count),
574 });
575 }
576 items
577 };
578
579 for work in work_items {
580 let context = recent_persona_context(&events, &work.persona_id, 8).await;
581 let thought = generate_phase_thought(thinker.as_deref(), &work, &context).await;
582
583 let event_timestamp = Utc::now();
584 new_events.push(ThoughtEvent {
585 id: Uuid::new_v4().to_string(),
586 event_type: work.phase.event_type(),
587 persona_id: Some(work.persona_id.clone()),
588 swarm_id: work.swarm_id.clone(),
589 timestamp: event_timestamp,
590 payload: json!({
591 "phase": work.phase.as_str(),
592 "thought_count": work.thought_count,
593 "persona": {
594 "id": work.persona_id.clone(),
595 "name": work.persona_name.clone(),
596 "role": work.role.clone(),
597 },
598 "context_event_count": context.len(),
599 "thinking": thought.thinking.clone(),
600 "source": thought.source,
601 "model": thought.model.clone(),
602 "finish_reason": thought.finish_reason.clone(),
603 "usage": {
604 "prompt_tokens": thought.prompt_tokens,
605 "completion_tokens": thought.completion_tokens,
606 "total_tokens": thought.total_tokens,
607 },
608 "latency_ms": thought.latency_ms,
609 "error": thought.error.clone(),
610 }),
611 });
612
613 if work.phase == ThoughtPhase::Test {
614 new_events.push(ThoughtEvent {
615 id: Uuid::new_v4().to_string(),
616 event_type: ThoughtEventType::CheckResult,
617 persona_id: Some(work.persona_id.clone()),
618 swarm_id: work.swarm_id.clone(),
619 timestamp: Utc::now(),
620 payload: json!({
621 "phase": work.phase.as_str(),
622 "thought_count": work.thought_count,
623 "result_excerpt": trim_for_storage(&thought.thinking, 280),
624 "source": thought.source,
625 "model": thought.model,
626 }),
627 });
628 }
629
630 if work.phase == ThoughtPhase::Reflect && work.thought_count % 8 == 2 {
631 let proposal = Proposal {
632 id: Uuid::new_v4().to_string(),
633 persona_id: work.persona_id.clone(),
634 title: proposal_title_from_thought(&thought.thinking, work.thought_count),
635 rationale: trim_for_storage(&thought.thinking, 900),
636 evidence_refs: vec!["internal.thought_stream".to_string()],
637 risk: ProposalRisk::Low,
638 status: ProposalStatus::Created,
639 created_at: Utc::now(),
640 };
641
642 new_events.push(ThoughtEvent {
643 id: Uuid::new_v4().to_string(),
644 event_type: ThoughtEventType::ProposalCreated,
645 persona_id: Some(work.persona_id.clone()),
646 swarm_id: work.swarm_id.clone(),
647 timestamp: Utc::now(),
648 payload: json!({
649 "proposal_id": proposal.id,
650 "title": proposal.title,
651 "rationale_excerpt": trim_for_storage(&proposal.rationale, 220),
652 "source": thought.source,
653 "model": thought.model,
654 }),
655 });
656
657 new_proposals.push(proposal);
658 }
659
660 if work.phase == ThoughtPhase::Compress {
661 new_snapshots.push(MemorySnapshot {
662 id: Uuid::new_v4().to_string(),
663 generated_at: Utc::now(),
664 swarm_id: work.swarm_id.clone(),
665 persona_scope: vec![work.persona_id.clone()],
666 summary: trim_for_storage(&thought.thinking, 1_500),
667 hot_event_count: context.len(),
668 warm_fact_count: estimate_fact_count(&thought.thinking),
669 cold_snapshot_count: 1,
670 metadata: HashMap::from([
671 (
672 "phase".to_string(),
673 serde_json::Value::String(work.phase.as_str().to_string()),
674 ),
675 (
676 "role".to_string(),
677 serde_json::Value::String(work.role.clone()),
678 ),
679 (
680 "source".to_string(),
681 serde_json::Value::String(thought.source.to_string()),
682 ),
683 (
684 "model".to_string(),
685 serde_json::Value::String(
686 thought
687 .model
688 .clone()
689 .unwrap_or_else(|| "fallback".to_string()),
690 ),
691 ),
692 (
693 "completion_tokens".to_string(),
694 serde_json::Value::Number(
695 serde_json::Number::from(
696 thought.completion_tokens.unwrap_or(0) as u64
697 ),
698 ),
699 ),
700 ]),
701 });
702 }
703 }
704
705 if !new_proposals.is_empty() {
706 let mut proposal_store = proposals.write().await;
707 for proposal in new_proposals {
708 proposal_store.insert(proposal.id.clone(), proposal);
709 }
710 }
711
712 for event in new_events {
713 push_event_internal(&events, max_events, &event_tx, event).await;
714 }
715 for snapshot in new_snapshots {
716 push_snapshot_internal(&snapshots, max_snapshots, snapshot).await;
717 }
718
719 let interval = Duration::from_millis((*loop_interval_ms.read().await).max(100));
720 next_tick += interval;
721 let tick_completed = Instant::now();
722 if tick_completed > next_tick {
723 next_tick = tick_completed;
724 }
725 }
726 });
727
728 {
729 let mut lock = self.loop_handle.lock().await;
730 *lock = Some(handle);
731 }
732
733 Ok(self.status().await)
734 }
735
736 pub async fn stop(&self, reason: Option<String>) -> Result<CognitionStatus> {
738 self.running.store(false, Ordering::SeqCst);
739
740 if let Some(handle) = self.loop_handle.lock().await.take() {
741 handle.abort();
742 let _ = handle.await;
743 }
744
745 if let Some(reason_value) = reason {
746 let event = ThoughtEvent {
747 id: Uuid::new_v4().to_string(),
748 event_type: ThoughtEventType::CheckResult,
749 persona_id: None,
750 swarm_id: None,
751 timestamp: Utc::now(),
752 payload: json!({ "stopped": true, "reason": reason_value }),
753 };
754 self.push_event(event).await;
755 }
756
757 Ok(self.status().await)
758 }
759
760 pub async fn create_persona(&self, req: CreatePersonaRequest) -> Result<PersonaRuntimeState> {
762 let now = Utc::now();
763 let mut personas = self.personas.write().await;
764
765 let mut parent_swarm_id = None;
766 let mut computed_depth = 0_u32;
767 let mut inherited_policy = None;
768
769 if let Some(parent_id) = req.parent_id.clone() {
770 let parent = personas
771 .get(&parent_id)
772 .ok_or_else(|| anyhow!("Parent persona not found: {}", parent_id))?;
773
774 if parent.status == PersonaStatus::Reaped {
775 return Err(anyhow!("Parent persona {} is reaped", parent_id));
776 }
777
778 parent_swarm_id = parent.identity.swarm_id.clone();
779 computed_depth = parent.identity.depth.saturating_add(1);
780 inherited_policy = Some(parent.policy.clone());
781 let branch_limit = parent.policy.max_branching_factor;
782
783 let child_count = personas
784 .values()
785 .filter(|p| {
786 p.identity.parent_id.as_deref() == Some(parent_id.as_str())
787 && p.status != PersonaStatus::Reaped
788 })
789 .count();
790
791 if child_count as u32 >= branch_limit {
792 return Err(anyhow!(
793 "Parent {} reached branching limit {}",
794 parent_id,
795 branch_limit
796 ));
797 }
798 }
799
800 let policy = req
801 .policy
802 .clone()
803 .or(inherited_policy.clone())
804 .unwrap_or_else(|| self.default_policy.clone());
805
806 let effective_depth_limit = inherited_policy
807 .as_ref()
808 .map(|p| p.max_spawn_depth)
809 .unwrap_or(policy.max_spawn_depth);
810
811 if computed_depth > effective_depth_limit {
812 return Err(anyhow!(
813 "Spawn depth {} exceeds limit {}",
814 computed_depth,
815 effective_depth_limit
816 ));
817 }
818
819 let persona_id = req.persona_id.unwrap_or_else(|| Uuid::new_v4().to_string());
820 if personas.contains_key(&persona_id) {
821 return Err(anyhow!("Persona id already exists: {}", persona_id));
822 }
823
824 let identity = PersonaIdentity {
825 id: persona_id.clone(),
826 name: req.name,
827 role: req.role,
828 charter: req.charter,
829 swarm_id: req.swarm_id.or(parent_swarm_id),
830 parent_id: req.parent_id,
831 depth: computed_depth,
832 created_at: now,
833 };
834
835 let persona = PersonaRuntimeState {
836 identity,
837 policy,
838 status: PersonaStatus::Active,
839 thought_count: 0,
840 last_tick_at: None,
841 updated_at: now,
842 };
843
844 personas.insert(persona_id, persona.clone());
845 drop(personas);
846
847 self.push_event(ThoughtEvent {
848 id: Uuid::new_v4().to_string(),
849 event_type: ThoughtEventType::PersonaSpawned,
850 persona_id: Some(persona.identity.id.clone()),
851 swarm_id: persona.identity.swarm_id.clone(),
852 timestamp: now,
853 payload: json!({
854 "name": persona.identity.name,
855 "role": persona.identity.role,
856 "depth": persona.identity.depth,
857 }),
858 })
859 .await;
860
861 Ok(persona)
862 }
863
864 pub async fn spawn_child(
866 &self,
867 parent_id: &str,
868 req: SpawnPersonaRequest,
869 ) -> Result<PersonaRuntimeState> {
870 let request = CreatePersonaRequest {
871 persona_id: req.persona_id,
872 name: req.name,
873 role: req.role,
874 charter: req.charter,
875 swarm_id: req.swarm_id,
876 parent_id: Some(parent_id.to_string()),
877 policy: req.policy,
878 };
879 self.create_persona(request).await
880 }
881
882 pub async fn reap_persona(
884 &self,
885 persona_id: &str,
886 req: ReapPersonaRequest,
887 ) -> Result<ReapPersonaResponse> {
888 let cascade = req.cascade.unwrap_or(false);
889 let now = Utc::now();
890
891 let mut personas = self.personas.write().await;
892 if !personas.contains_key(persona_id) {
893 return Err(anyhow!("Persona not found: {}", persona_id));
894 }
895
896 let mut reaped_ids = vec![persona_id.to_string()];
897 if cascade {
898 let mut idx = 0usize;
899 while idx < reaped_ids.len() {
900 let current = reaped_ids[idx].clone();
901 let children: Vec<String> = personas
902 .values()
903 .filter(|p| p.identity.parent_id.as_deref() == Some(current.as_str()))
904 .map(|p| p.identity.id.clone())
905 .collect();
906 for child in children {
907 if !reaped_ids.iter().any(|existing| existing == &child) {
908 reaped_ids.push(child);
909 }
910 }
911 idx += 1;
912 }
913 }
914
915 for id in &reaped_ids {
916 if let Some(persona) = personas.get_mut(id) {
917 persona.status = PersonaStatus::Reaped;
918 persona.updated_at = now;
919 }
920 }
921 drop(personas);
922
923 for id in &reaped_ids {
924 self.push_event(ThoughtEvent {
925 id: Uuid::new_v4().to_string(),
926 event_type: ThoughtEventType::PersonaReaped,
927 persona_id: Some(id.clone()),
928 swarm_id: None,
929 timestamp: now,
930 payload: json!({
931 "reason": req.reason.clone().unwrap_or_else(|| "manual_reap".to_string()),
932 "cascade": cascade,
933 }),
934 })
935 .await;
936 }
937
938 Ok(ReapPersonaResponse {
939 count: reaped_ids.len(),
940 reaped_ids,
941 })
942 }
943
944 pub async fn latest_snapshot(&self) -> Option<MemorySnapshot> {
946 self.snapshots.read().await.back().cloned()
947 }
948
949 pub async fn lineage_graph(&self) -> LineageGraph {
951 let personas = self.personas.read().await;
952 let mut children_by_parent: HashMap<String, Vec<String>> = HashMap::new();
953 let mut roots = Vec::new();
954 let mut total_edges = 0usize;
955
956 for persona in personas.values() {
957 if let Some(parent_id) = persona.identity.parent_id.clone() {
958 children_by_parent
959 .entry(parent_id)
960 .or_default()
961 .push(persona.identity.id.clone());
962 total_edges = total_edges.saturating_add(1);
963 } else {
964 roots.push(persona.identity.id.clone());
965 }
966 }
967
968 let mut nodes: Vec<LineageNode> = personas
969 .values()
970 .map(|persona| {
971 let mut children = children_by_parent
972 .get(&persona.identity.id)
973 .cloned()
974 .unwrap_or_default();
975 children.sort();
976
977 LineageNode {
978 persona_id: persona.identity.id.clone(),
979 parent_id: persona.identity.parent_id.clone(),
980 children,
981 depth: persona.identity.depth,
982 status: persona.status,
983 }
984 })
985 .collect();
986
987 nodes.sort_by(|a, b| a.persona_id.cmp(&b.persona_id));
988 roots.sort();
989
990 LineageGraph {
991 nodes,
992 roots,
993 total_edges,
994 }
995 }
996
997 pub async fn status(&self) -> CognitionStatus {
999 let personas = self.personas.read().await;
1000 let events = self.events.read().await;
1001 let snapshots = self.snapshots.read().await;
1002 let started_at = *self.started_at.read().await;
1003 let last_tick_at = *self.last_tick_at.read().await;
1004 let loop_interval_ms = *self.loop_interval_ms.read().await;
1005
1006 let active_persona_count = personas
1007 .values()
1008 .filter(|p| p.status == PersonaStatus::Active)
1009 .count();
1010
1011 CognitionStatus {
1012 enabled: self.enabled,
1013 running: self.running.load(Ordering::SeqCst),
1014 loop_interval_ms,
1015 started_at,
1016 last_tick_at,
1017 persona_count: personas.len(),
1018 active_persona_count,
1019 events_buffered: events.len(),
1020 snapshots_buffered: snapshots.len(),
1021 }
1022 }
1023
1024 async fn push_event(&self, event: ThoughtEvent) {
1025 push_event_internal(&self.events, self.max_events, &self.event_tx, event).await;
1026 }
1027}
1028
1029async fn push_event_internal(
1030 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1031 max_events: usize,
1032 event_tx: &broadcast::Sender<ThoughtEvent>,
1033 event: ThoughtEvent,
1034) {
1035 {
1036 let mut lock = events.write().await;
1037 lock.push_back(event.clone());
1038 while lock.len() > max_events {
1039 lock.pop_front();
1040 }
1041 }
1042 let _ = event_tx.send(event);
1043}
1044
1045async fn push_snapshot_internal(
1046 snapshots: &Arc<RwLock<VecDeque<MemorySnapshot>>>,
1047 max_snapshots: usize,
1048 snapshot: MemorySnapshot,
1049) {
1050 let mut lock = snapshots.write().await;
1051 lock.push_back(snapshot);
1052 while lock.len() > max_snapshots {
1053 lock.pop_front();
1054 }
1055}
1056
1057async fn recent_persona_context(
1058 events: &Arc<RwLock<VecDeque<ThoughtEvent>>>,
1059 persona_id: &str,
1060 limit: usize,
1061) -> Vec<ThoughtEvent> {
1062 let lock = events.read().await;
1063 let mut selected: Vec<ThoughtEvent> = lock
1064 .iter()
1065 .rev()
1066 .filter(|event| {
1067 event.persona_id.as_deref() == Some(persona_id)
1068 || (event.persona_id.is_none()
1069 && matches!(
1070 event.event_type,
1071 ThoughtEventType::CheckResult
1072 | ThoughtEventType::ProposalCreated
1073 | ThoughtEventType::SnapshotCompressed
1074 ))
1075 })
1076 .take(limit)
1077 .cloned()
1078 .collect();
1079 selected.reverse();
1080 selected
1081}
1082
1083async fn generate_phase_thought(
1084 thinker: Option<&ThinkerClient>,
1085 work: &ThoughtWorkItem,
1086 context: &[ThoughtEvent],
1087) -> ThoughtResult {
1088 let started_at = Instant::now();
1089 if let Some(client) = thinker {
1090 let (system_prompt, user_prompt) = build_phase_prompts(work, context);
1091 match client.think(&system_prompt, &user_prompt).await {
1092 Ok(output) => {
1093 let thinking = trim_for_storage(&output.text, 2_000);
1094 if !thinking.is_empty() {
1095 return ThoughtResult {
1096 source: "model",
1097 model: Some(output.model),
1098 finish_reason: output.finish_reason,
1099 thinking,
1100 prompt_tokens: output.prompt_tokens,
1101 completion_tokens: output.completion_tokens,
1102 total_tokens: output.total_tokens,
1103 latency_ms: started_at.elapsed().as_millis(),
1104 error: None,
1105 };
1106 }
1107 }
1108 Err(error) => {
1109 return ThoughtResult {
1110 source: "fallback",
1111 model: None,
1112 finish_reason: None,
1113 thinking: fallback_phase_text(work, context),
1114 prompt_tokens: None,
1115 completion_tokens: None,
1116 total_tokens: None,
1117 latency_ms: started_at.elapsed().as_millis(),
1118 error: Some(error.to_string()),
1119 };
1120 }
1121 }
1122 }
1123
1124 ThoughtResult {
1125 source: "fallback",
1126 model: None,
1127 finish_reason: None,
1128 thinking: fallback_phase_text(work, context),
1129 prompt_tokens: None,
1130 completion_tokens: None,
1131 total_tokens: None,
1132 latency_ms: started_at.elapsed().as_millis(),
1133 error: None,
1134 }
1135}
1136
1137fn build_phase_prompts(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> (String, String) {
1138 let system_prompt = "You are the internal cognition engine for a persistent autonomous persona. \
1139Respond with concise plain text only. Do not include markdown, XML, or code fences. \
1140Focus on reasoning quality, uncertainty, and actionable checks."
1141 .to_string();
1142
1143 let context_lines = if context.is_empty() {
1144 "none".to_string()
1145 } else {
1146 context
1147 .iter()
1148 .map(format_context_event)
1149 .collect::<Vec<_>>()
1150 .join("\n")
1151 };
1152
1153 let phase_instruction = match work.phase {
1154 ThoughtPhase::Observe => {
1155 "Observe current state. Identify 1-3 concrete signals and one uncertainty."
1156 }
1157 ThoughtPhase::Reflect => {
1158 "Reflect on observed signals. Produce one hypothesis, rationale, and risk note."
1159 }
1160 ThoughtPhase::Test => {
1161 "Design and evaluate one check. Include expected result and evidence quality."
1162 }
1163 ThoughtPhase::Compress => {
1164 "Compress the latest cognition into a short state summary and retained facts."
1165 }
1166 };
1167
1168 let user_prompt = format!(
1169 "phase: {phase}\npersona_id: {persona_id}\npersona_name: {persona_name}\nrole: {role}\ncharter: {charter}\nthought_count: {thought_count}\nrecent_context:\n{context}\n\ninstruction:\n{instruction}",
1170 phase = work.phase.as_str(),
1171 persona_id = work.persona_id,
1172 persona_name = work.persona_name,
1173 role = work.role,
1174 charter = work.charter,
1175 thought_count = work.thought_count,
1176 context = context_lines,
1177 instruction = phase_instruction
1178 );
1179
1180 (system_prompt, user_prompt)
1181}
1182
1183fn format_context_event(event: &ThoughtEvent) -> String {
1184 let payload = serde_json::to_string(&event.payload).unwrap_or_else(|_| "{}".to_string());
1185 format!(
1186 "{} {} {}",
1187 event.event_type.as_str(),
1188 event.timestamp.to_rfc3339(),
1189 trim_for_storage(&payload, 220)
1190 )
1191}
1192
1193fn fallback_phase_text(work: &ThoughtWorkItem, context: &[ThoughtEvent]) -> String {
1194 let phase = work.phase.as_str();
1195 let context_count = context.len();
1196 format!(
1197 "phase={} persona={} role={} context_events={} thought_count={} synthesized_fallback=1",
1198 phase, work.persona_id, work.role, context_count, work.thought_count
1199 )
1200}
1201
/// Trims surrounding whitespace from `input` and caps it at `max_chars` characters.
///
/// Whitespace is removed *before* the length is measured, so padding never causes
/// content that fits the limit to be cut, and whitespace-only input always yields
/// an empty string. When the trimmed text is longer than `max_chars` characters
/// (Unicode scalar values, not bytes), the first `max_chars` characters are kept
/// and `...` is appended, so a truncated result is up to `max_chars + 3`
/// characters long.
fn trim_for_storage(input: &str, max_chars: usize) -> String {
    let content = input.trim();
    // `nth(max_chars)` finds the byte offset of the first character past the limit
    // in a single pass; `None` means the whole text already fits.
    match content.char_indices().nth(max_chars) {
        None => content.to_string(),
        Some((cut, _)) => {
            let mut shortened = String::with_capacity(cut + 3);
            // `cut` comes from `char_indices`, so it is always a char boundary.
            shortened.push_str(&content[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
1213
/// Estimates a fact count from the number of `.`, `!` and `?` characters in `text`.
///
/// The count is clamped to the range `1..=12`, so empty or unpunctuated text
/// yields 1.
fn estimate_fact_count(text: &str) -> usize {
    let terminators = text
        .chars()
        .filter(|&ch| matches!(ch, '.' | '!' | '?'))
        .count();
    terminators.max(1).min(12)
}
1218
1219fn proposal_title_from_thought(thought: &str, thought_count: u64) -> String {
1220 let first_line = thought
1221 .lines()
1222 .find(|line| !line.trim().is_empty())
1223 .unwrap_or("proposal");
1224 let compact = first_line
1225 .replace(['\t', '\r', '\n'], " ")
1226 .split_whitespace()
1227 .collect::<Vec<_>>()
1228 .join(" ");
1229 let trimmed = trim_for_storage(&compact, 72);
1230 if trimmed.is_empty() {
1231 format!("proposal-{}", thought_count)
1232 } else {
1233 trimmed
1234 }
1235}
1236
1237fn default_seed_persona() -> CreatePersonaRequest {
1238 CreatePersonaRequest {
1239 persona_id: Some("root-thinker".to_string()),
1240 name: "root-thinker".to_string(),
1241 role: "orchestrator".to_string(),
1242 charter: "Continuously observe, reflect, test hypotheses, and compress useful insights."
1243 .to_string(),
1244 swarm_id: Some("swarm-core".to_string()),
1245 parent_id: None,
1246 policy: None,
1247 }
1248}
1249
/// Normalizes a thinker base URL into a full chat-completions endpoint.
///
/// Surrounding whitespace and trailing slashes are removed first. An empty result
/// maps to the local default endpoint, a URL already ending in
/// `/chat/completions` is returned as is, and anything else gets that suffix
/// appended.
fn normalize_thinker_endpoint(base_url: &str) -> String {
    const SUFFIX: &str = "/chat/completions";
    const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:11434/v1/chat/completions";

    let base = base_url.trim().trim_end_matches('/');
    if base.is_empty() {
        DEFAULT_ENDPOINT.to_string()
    } else if base.ends_with(SUFFIX) {
        base.to_string()
    } else {
        format!("{}{}", base, SUFFIX)
    }
}
1260
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` as true and `0`/`false`/`no`/`off` as false,
/// ignoring ASCII case and surrounding whitespace. Returns `default` when the
/// variable is unset, is not valid Unicode, or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };
    // Values from shells and env files often carry stray whitespace or a trailing
    // newline; without trimming they would silently fall back to `default`.
    let value = raw.trim();

    if TRUTHY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        true
    } else if FALSY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        false
    } else {
        default
    }
}
1271
/// Reads an `f32` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as an `f32`.
fn env_f32(name: &str, default: f32) -> f32 {
    match std::env::var(name) {
        // Trim first: `str::parse` rejects padded input, which would otherwise
        // silently discard an intended value.
        Ok(raw) => raw.trim().parse::<f32>().unwrap_or(default),
        Err(_) => default,
    }
}
1278
/// Reads a `u64` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as a `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        // Trim first: `str::parse` rejects padded input, which would otherwise
        // silently discard an intended value.
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
1285
/// Reads a `u32` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as a `u32`.
fn env_u32(name: &str, default: u32) -> u32 {
    match std::env::var(name) {
        // Trim first: `str::parse` rejects padded input, which would otherwise
        // silently discard an intended value.
        Ok(raw) => raw.trim().parse::<u32>().unwrap_or(default),
        Err(_) => default,
    }
}
1292
/// Reads a `usize` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        // Trim first: `str::parse` rejects padded input, which would otherwise
        // silently discard an intended value.
        Ok(raw) => raw.trim().parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
1299
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an enabled runtime with a 25 ms loop interval, small event and
    /// snapshot buffers, and a default policy that caps both spawn depth and
    /// branching factor at 2.
    fn test_runtime() -> CognitionRuntime {
        let default_policy = PersonaPolicy {
            max_spawn_depth: 2,
            max_branching_factor: 2,
            token_credits_per_minute: 1_000,
            cpu_credits_per_minute: 1_000,
            idle_ttl_secs: 300,
            share_memory: false,
        };
        let options = CognitionRuntimeOptions {
            enabled: true,
            loop_interval_ms: 25,
            max_events: 256,
            max_snapshots: 32,
            default_policy,
        };
        CognitionRuntime::new_with_options(options)
    }

    /// Request for a parentless persona whose id and name are both `id`.
    fn top_level_request(id: &str, role: &str, charter: &str, swarm: &str) -> CreatePersonaRequest {
        CreatePersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: Some(swarm.to_string()),
            parent_id: None,
            policy: None,
        }
    }

    /// Spawn request whose id and name are both `id`, with no swarm or policy override.
    fn child_request(id: &str, role: &str, charter: &str) -> SpawnPersonaRequest {
        SpawnPersonaRequest {
            persona_id: Some(id.to_string()),
            name: id.to_string(),
            role: role.to_string(),
            charter: charter.to_string(),
            swarm_id: None,
            policy: None,
        }
    }

    /// A root persona and one spawned child get the expected depths, parent link
    /// and lineage graph.
    #[tokio::test]
    async fn create_spawn_and_lineage_work() {
        let runtime = test_runtime();

        let root = runtime
            .create_persona(top_level_request("root", "orchestrator", "coordinate", "swarm-a"))
            .await
            .expect("root should be created");
        assert_eq!(root.identity.depth, 0);

        let child = runtime
            .spawn_child("root", child_request("child-1", "analyst", "analyze"))
            .await
            .expect("child should spawn");
        assert_eq!(child.identity.parent_id.as_deref(), Some("root"));
        assert_eq!(child.identity.depth, 1);

        let lineage = runtime.lineage_graph().await;
        assert_eq!(lineage.total_edges, 1);
        assert_eq!(lineage.roots, vec!["root".to_string()]);
    }

    /// With depth and branching limits of 2, a third child of the root and a
    /// depth-3 descendant are both rejected.
    #[tokio::test]
    async fn branching_and_depth_limits_are_enforced() {
        let runtime = test_runtime();

        runtime
            .create_persona(top_level_request("root", "orchestrator", "coordinate", "swarm-a"))
            .await
            .expect("root should be created");

        // Two children fit within the branching factor.
        runtime
            .spawn_child("root", child_request("c1", "worker", "run"))
            .await
            .expect("first child should spawn");
        runtime
            .spawn_child("root", child_request("c2", "worker", "run"))
            .await
            .expect("second child should spawn");

        // The third child of the same parent exceeds it.
        let third_child = runtime
            .spawn_child("root", child_request("c3", "worker", "run"))
            .await;
        assert!(third_child.is_err());

        // A grandchild sits at depth 2, which is still allowed.
        runtime
            .spawn_child("c1", child_request("c1-1", "worker", "run"))
            .await
            .expect("depth 2 should be allowed");

        // A great-grandchild would sit at depth 3 and must be refused.
        let depth_violation = runtime
            .spawn_child("c1-1", child_request("c1-1-1", "worker", "run"))
            .await;
        assert!(depth_violation.is_err());
    }

    /// Starting the runtime with a seed persona reports `running` and buffered
    /// events; stopping it clears `running`.
    #[tokio::test]
    async fn start_stop_updates_runtime_status() {
        let runtime = test_runtime();

        let start_request = StartCognitionRequest {
            loop_interval_ms: Some(10),
            seed_persona: Some(top_level_request("seed", "watcher", "observe", "swarm-seed")),
        };
        runtime
            .start(Some(start_request))
            .await
            .expect("runtime should start");

        // Give the 10 ms loop time to run before inspecting the status.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let running_status = runtime.status().await;
        assert!(running_status.running);
        assert!(running_status.events_buffered > 0);

        runtime
            .stop(Some("test".to_string()))
            .await
            .expect("runtime should stop");
        let stopped_status = runtime.status().await;
        assert!(!stopped_status.running);
    }
}