1use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_graph_extractor::GraphExtractorTask;
10use ainl_memory::{
11 AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
12 RuntimeStateNode, SqliteGraphStore,
13};
14use ainl_persona::axes::default_axis_map;
15use ainl_persona::{
16 EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
17};
18use ainl_semantic_tagger::infer_topic_tags;
19use ainl_semantic_tagger::tag_tool_names;
20use ainl_semantic_tagger::TagNamespace;
21use uuid::Uuid;
22
23use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
24use crate::engine::{
25 AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
26 PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
27 EMIT_TO_EDGE,
28};
29use crate::graph_cell::{GraphCell, SqliteStoreRef};
30use crate::hooks::{NoOpHooks, TurnHooks};
31#[cfg(feature = "async")]
32use crate::hooks::TurnHooksAsync;
33use crate::RuntimeConfig;
34
35pub struct AinlRuntime {
51 config: RuntimeConfig,
52 memory: GraphCell,
53 extractor: GraphExtractorTask,
54 turn_count: u64,
55 last_extraction_at_turn: u64,
56 current_depth: Arc<AtomicU32>,
58 hooks: Box<dyn TurnHooks>,
59 evolution_writes_enabled: bool,
63 persona_cache: Option<String>,
64 #[doc(hidden)]
66 test_force_extraction_failure: bool,
67 #[doc(hidden)]
69 test_force_fitness_write_failure: bool,
70 #[doc(hidden)]
72 test_force_runtime_state_write_failure: bool,
73 adapter_registry: AdapterRegistry,
74 #[cfg(feature = "async")]
76 hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
77}
78
79impl AinlRuntime {
80 pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
81 let agent_id = config.agent_id.clone();
82 let memory = GraphCell::new(store);
83 let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
84 if agent_id.is_empty() {
85 (0, None, 0)
86 } else {
87 match memory.read_runtime_state(&agent_id) {
88 Ok(Some(state)) => {
89 tracing::info!(
90 agent_id = %agent_id,
91 turn_count = state.turn_count,
92 "restored runtime state"
93 );
94 let persona_cache = state
95 .persona_snapshot_json
96 .as_ref()
97 .and_then(|json| serde_json::from_str::<String>(json).ok());
98 (
99 state.turn_count,
100 persona_cache,
101 state.last_extraction_at_turn,
102 )
103 }
104 Ok(None) => (0, None, 0),
105 Err(e) => {
106 tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
107 (0, None, 0)
108 }
109 }
110 };
111 Self {
112 extractor: GraphExtractorTask::new(&agent_id),
113 memory,
114 config,
115 turn_count: init_turn_count,
116 last_extraction_at_turn: init_last_extraction_at_turn,
117 current_depth: Arc::new(AtomicU32::new(0)),
118 hooks: Box::new(NoOpHooks),
119 evolution_writes_enabled: true,
120 persona_cache: init_persona_cache,
121 test_force_extraction_failure: false,
122 test_force_fitness_write_failure: false,
123 test_force_runtime_state_write_failure: false,
124 adapter_registry: AdapterRegistry::new(),
125 #[cfg(feature = "async")]
126 hooks_async: None,
127 }
128 }
129
130 pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
132 self.adapter_registry.register(adapter);
133 }
134
135 pub fn register_default_patch_adapters(&mut self) {
138 self.register_adapter(GraphPatchAdapter::new());
139 }
140
141 pub fn registered_adapters(&self) -> Vec<&str> {
143 self.adapter_registry.registered_names()
144 }
145
146 #[doc(hidden)]
147 pub fn test_turn_count(&self) -> u64 {
148 self.turn_count
149 }
150
151 #[doc(hidden)]
152 pub fn test_persona_cache(&self) -> Option<&str> {
153 self.persona_cache.as_deref()
154 }
155
156 #[doc(hidden)]
157 pub fn test_delegation_depth(&self) -> u32 {
158 self.current_depth.load(Ordering::SeqCst)
159 }
160
161 #[doc(hidden)]
162 pub fn test_set_delegation_depth(&mut self, depth: u32) {
163 self.current_depth.store(depth, Ordering::SeqCst);
164 }
165
166 #[doc(hidden)]
167 pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
168 self.test_force_extraction_failure = fail;
169 }
170
171 #[doc(hidden)]
172 pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
173 self.test_force_fitness_write_failure = fail;
174 }
175
176 #[doc(hidden)]
178 pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
179 &mut self.extractor
180 }
181
182 #[doc(hidden)]
183 pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
184 self.test_force_runtime_state_write_failure = fail;
185 }
186
187 pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
188 self.hooks = Box::new(hooks);
189 self
190 }
191
192 #[cfg(feature = "async")]
194 pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
195 self.hooks_async = Some(hooks);
196 self
197 }
198
199 pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
204 self.evolution_writes_enabled = enabled;
205 self
206 }
207
208 fn require_evolution_writes_enabled(&self) -> Result<(), String> {
209 if self.evolution_writes_enabled {
210 Ok(())
211 } else {
212 Err(
213 "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
214 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
215 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
216 ainl_memory.db"
217 .to_string(),
218 )
219 }
220 }
221
222 pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
229 self.memory.sqlite_ref()
230 }
231
232 pub fn evolution_engine(&self) -> &EvolutionEngine {
240 &self.extractor.evolution_engine
241 }
242
243 pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
247 &mut self.extractor.evolution_engine
248 }
249
250 pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
252 self.extractor.evolution_engine.ingest_signals(signals)
253 }
254
255 pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
257 self.extractor
258 .evolution_engine
259 .correction_tick(axis, correction);
260 }
261
262 pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
266 self.require_evolution_writes_enabled()?;
267 let snap = self.extractor.evolution_engine.snapshot();
268 self.memory.with(|m| {
269 self.extractor
270 .evolution_engine
271 .write_persona_node(m.sqlite_store(), &snap)
272 })?;
273 Ok(snap)
274 }
275
276 pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
283 self.require_evolution_writes_enabled()?;
284 self.memory
285 .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
286 }
287
288 pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
290 self.memory
291 .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
292 }
293
294 pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
297 self.compile_memory_context_for(None)
298 }
299
300 pub fn compile_memory_context_for(
305 &self,
306 user_message: Option<&str>,
307 ) -> Result<MemoryContext, String> {
308 if self.config.agent_id.is_empty() {
309 return Err("RuntimeConfig.agent_id must be set".to_string());
310 }
311 self.memory.with(|m| {
312 let store = m.sqlite_store();
313 let q = store.query(&self.config.agent_id);
314 let recent_episodes = q.recent_episodes(5)?;
315 let all_semantic = q.semantic_nodes()?;
316 let relevant_semantic = self.relevant_semantic_nodes(
317 user_message.unwrap_or(""),
318 all_semantic,
319 10,
320 );
321 let active_patches = q.active_patches()?;
322 let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
323 Ok(MemoryContext {
324 recent_episodes,
325 relevant_semantic,
326 active_patches,
327 persona_snapshot,
328 compiled_at: chrono::Utc::now(),
329 })
330 })
331 }
332
333 pub fn route_emit_edges(
335 &self,
336 episode_id: Uuid,
337 turn_output_payload: &serde_json::Value,
338 ) -> Result<(), String> {
339 self.memory.with(|m| {
340 let store = m.sqlite_store();
341 let neighbors = store
342 .query(&self.config.agent_id)
343 .neighbors(episode_id, EMIT_TO_EDGE)?;
344 for n in neighbors {
345 let target = emit_target_name(&n);
346 self.hooks.on_emit(&target, turn_output_payload);
347 }
348 Ok(())
349 })
350 }
351
352 pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
354 let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
355 let cd = Arc::clone(&self.current_depth);
356 let _depth_guard = scopeguard::guard((), move |()| {
357 cd.fetch_sub(1, Ordering::SeqCst);
358 });
359
360 if depth >= self.config.max_delegation_depth {
361 return Err(AinlRuntimeError::DelegationDepthExceeded {
362 depth,
363 max: self.config.max_delegation_depth,
364 });
365 }
366
367 if !self.config.enable_graph_memory {
368 let memory_context = MemoryContext::default();
369 let result = TurnResult {
370 memory_context,
371 status: TurnStatus::GraphMemoryDisabled,
372 ..Default::default()
373 };
374 let outcome = TurnOutcome::Complete(result);
375 self.hooks.on_turn_complete(&outcome);
376 return Ok(outcome);
377 }
378
379 if self.config.agent_id.is_empty() {
380 return Err(AinlRuntimeError::Message(
381 "RuntimeConfig.agent_id must be set for run_turn".into(),
382 ));
383 }
384
385 let span = tracing::info_span!(
386 "ainl_runtime.run_turn",
387 agent_id = %self.config.agent_id,
388 turn = self.turn_count,
389 depth = input.depth,
390 );
391 let _span_enter = span.enter();
392
393 let validation: GraphValidationReport = self
394 .memory
395 .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
396 .map_err(AinlRuntimeError::from)?;
397 if !validation.is_valid {
398 let mut msg = String::from("graph validation failed before turn");
399 for d in &validation.dangling_edge_details {
400 msg.push_str(&format!(
401 "; {} -> {} [{}]",
402 d.source_id, d.target_id, d.edge_type
403 ));
404 }
405 return Err(AinlRuntimeError::Message(msg));
406 }
407
408 self.hooks
409 .on_artifact_loaded(&self.config.agent_id, validation.node_count);
410
411 let mut turn_warnings: Vec<TurnWarning> = Vec::new();
412
413 let t_persona = Instant::now();
414 let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
415 Some(cached.clone())
416 } else {
417 let nodes = self
418 .memory
419 .with(|m| {
420 m.sqlite_store()
421 .query(&self.config.agent_id)
422 .persona_nodes()
423 })
424 .map_err(AinlRuntimeError::from)?;
425 let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
426 self.persona_cache = compiled.clone();
427 compiled
428 };
429 self.hooks
430 .on_persona_compiled(persona_prompt_contribution.as_deref());
431 tracing::debug!(
432 target: "ainl_runtime",
433 duration_ms = t_persona.elapsed().as_millis() as u64,
434 has_contribution = persona_prompt_contribution.is_some(),
435 "persona_phase"
436 );
437
438 let t_memory = Instant::now();
439 let memory_context = self
440 .compile_memory_context_for(Some(&input.user_message))
441 .map_err(AinlRuntimeError::from)?;
442 self.hooks.on_memory_context_ready(&memory_context);
443 tracing::debug!(
444 target: "ainl_runtime",
445 duration_ms = t_memory.elapsed().as_millis() as u64,
446 episode_count = memory_context.recent_episodes.len(),
447 semantic_count = memory_context.relevant_semantic.len(),
448 patch_count = memory_context.active_patches.len(),
449 "memory_context"
450 );
451
452 let t_patches = Instant::now();
453 let patch_dispatch_results = if self.config.enable_graph_memory {
454 self.dispatch_patches_collect(
455 &memory_context.active_patches,
456 &input.frame,
457 &mut turn_warnings,
458 )
459 } else {
460 Vec::new()
461 };
462 for r in &patch_dispatch_results {
463 tracing::debug!(
464 target: "ainl_runtime",
465 label = %r.label,
466 dispatched = r.dispatched,
467 fitness_before = r.fitness_before,
468 fitness_after = r.fitness_after,
469 "patch_dispatch"
470 );
471 }
472 tracing::debug!(
473 target: "ainl_runtime",
474 duration_ms = t_patches.elapsed().as_millis() as u64,
475 "patch_dispatch_phase"
476 );
477
478 let dispatched_count = patch_dispatch_results
479 .iter()
480 .filter(|r| r.dispatched)
481 .count() as u32;
482 if dispatched_count >= self.config.max_steps {
483 let result = TurnResult {
484 patch_dispatch_results,
485 memory_context,
486 persona_prompt_contribution,
487 steps_executed: dispatched_count,
488 status: TurnStatus::StepLimitExceeded {
489 steps_executed: dispatched_count,
490 },
491 ..Default::default()
492 };
493 let outcome = TurnOutcome::Complete(result);
494 self.hooks.on_turn_complete(&outcome);
495 return Ok(outcome);
496 }
497
498 let t_episode = Instant::now();
499 let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
500 let episode_id = match self.memory.with(|m| {
501 record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical)
502 }) {
503 Ok(id) => id,
504 Err(e) => {
505 tracing::warn!(
506 phase = ?TurnPhase::EpisodeWrite,
507 error = %e,
508 "non-fatal turn write failed — continuing"
509 );
510 turn_warnings.push(TurnWarning {
511 phase: TurnPhase::EpisodeWrite,
512 error: e,
513 });
514 Uuid::nil()
515 }
516 };
517 self.hooks.on_episode_recorded(episode_id);
518 tracing::debug!(
519 target: "ainl_runtime",
520 duration_ms = t_episode.elapsed().as_millis() as u64,
521 episode_id = %episode_id,
522 "episode_record"
523 );
524
525 if !episode_id.is_nil() {
526 for &tid in &input.emit_targets {
527 if let Err(e) = self.memory.with(|m| {
528 m.sqlite_store()
529 .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
530 }) {
531 tracing::warn!(
532 phase = ?TurnPhase::EpisodeWrite,
533 error = %e,
534 "non-fatal turn write failed — continuing"
535 );
536 turn_warnings.push(TurnWarning {
537 phase: TurnPhase::EpisodeWrite,
538 error: e,
539 });
540 }
541 }
542 }
543
544 let emit_payload = serde_json::json!({
545 "episode_id": episode_id.to_string(),
546 "user_message": input.user_message,
547 "tools_invoked": tools_canonical,
548 "persona_contribution": persona_prompt_contribution,
549 "turn_count": self.turn_count.wrapping_add(1),
550 });
551 if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
552 tracing::warn!(
553 phase = ?TurnPhase::EpisodeWrite,
554 error = %e,
555 "non-fatal turn write failed — continuing"
556 );
557 turn_warnings.push(TurnWarning {
558 phase: TurnPhase::EpisodeWrite,
559 error: format!("emit_routing: {e}"),
560 });
561 }
562
563 self.turn_count = self.turn_count.wrapping_add(1);
564
565 let should_extract = self.config.extraction_interval > 0
566 && self.turn_count.saturating_sub(self.last_extraction_at_turn)
567 >= self.config.extraction_interval as u64;
568
569 let t_extract = Instant::now();
570 let (extraction_report, _extraction_failed) = if should_extract {
571 let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
572
573 let res = if force_fail {
574 let e = "test_forced".to_string();
575 tracing::warn!(
576 phase = ?TurnPhase::ExtractionPass,
577 error = %e,
578 "non-fatal turn write failed — continuing"
579 );
580 turn_warnings.push(TurnWarning {
581 phase: TurnPhase::ExtractionPass,
582 error: e,
583 });
584 tracing::debug!(
585 target: "ainl_runtime",
586 duration_ms = t_extract.elapsed().as_millis() as u64,
587 signals_ingested = 0u64,
588 skipped = false,
589 "extraction_pass"
590 );
591 (None, true)
592 } else {
593 let report = self
594 .memory
595 .with(|m| self.extractor.run_pass(m.sqlite_store()));
596 if let Some(ref e) = report.extract_error {
597 tracing::warn!(
598 phase = ?TurnPhase::ExtractionPass,
599 error = %e,
600 "non-fatal turn write failed — continuing"
601 );
602 turn_warnings.push(TurnWarning {
603 phase: TurnPhase::ExtractionPass,
604 error: e.clone(),
605 });
606 }
607 if let Some(ref e) = report.pattern_error {
608 tracing::warn!(
609 phase = ?TurnPhase::PatternPersistence,
610 error = %e,
611 "non-fatal turn write failed — continuing"
612 );
613 turn_warnings.push(TurnWarning {
614 phase: TurnPhase::PatternPersistence,
615 error: e.clone(),
616 });
617 }
618 if let Some(ref e) = report.persona_error {
619 tracing::warn!(
620 phase = ?TurnPhase::PersonaEvolution,
621 error = %e,
622 "non-fatal turn write failed — continuing"
623 );
624 turn_warnings.push(TurnWarning {
625 phase: TurnPhase::PersonaEvolution,
626 error: e.clone(),
627 });
628 }
629 let extraction_failed = report.has_errors();
630 if !extraction_failed {
631 tracing::info!(
632 agent_id = %report.agent_id,
633 signals_extracted = report.signals_extracted,
634 signals_applied = report.signals_applied,
635 semantic_nodes_updated = report.semantic_nodes_updated,
636 "ainl-graph-extractor pass completed (scheduled)"
637 );
638 }
639 self.hooks.on_extraction_complete(&report);
640 self.persona_cache = None;
641 tracing::debug!(
642 target: "ainl_runtime",
643 duration_ms = t_extract.elapsed().as_millis() as u64,
644 signals_ingested = report.signals_extracted as u64,
645 skipped = false,
646 "extraction_pass"
647 );
648 (Some(report), extraction_failed)
649 };
650 self.last_extraction_at_turn = self.turn_count;
651 res
652 } else {
653 tracing::debug!(
654 target: "ainl_runtime",
655 duration_ms = t_extract.elapsed().as_millis() as u64,
656 signals_ingested = 0u64,
657 skipped = true,
658 "extraction_pass"
659 );
660 (None, false)
661 };
662
663 if let Err(e) = self.memory.with(|m| {
664 try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id)
665 }) {
666 tracing::warn!(
667 phase = ?TurnPhase::ExportRefresh,
668 error = %e,
669 "non-fatal turn write failed — continuing"
670 );
671 turn_warnings.push(TurnWarning {
672 phase: TurnPhase::ExportRefresh,
673 error: e,
674 });
675 }
676
677 if !self.config.agent_id.is_empty() {
678 let state = RuntimeStateNode {
679 agent_id: self.config.agent_id.clone(),
680 turn_count: self.turn_count,
681 last_extraction_at_turn: self.last_extraction_at_turn,
682 persona_snapshot_json: self
683 .persona_cache
684 .as_ref()
685 .and_then(|p| serde_json::to_string(p).ok()),
686 updated_at: chrono::Utc::now().timestamp(),
687 };
688 let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
689 Err("injected runtime state write failure".to_string())
690 } else {
691 self.memory.with(|m| m.write_runtime_state(&state))
692 };
693 if let Err(e) = write_res {
694 tracing::warn!(
695 phase = ?TurnPhase::RuntimeStatePersist,
696 error = %e,
697 "failed to persist runtime state — cadence will reset on next restart"
698 );
699 turn_warnings.push(TurnWarning {
700 phase: TurnPhase::RuntimeStatePersist,
701 error: e,
702 });
703 }
704 }
705
706 if let (Some(gate), Some(phase), Some(trust)) = (
708 input.vitals_gate.as_deref(),
709 input.vitals_phase.as_deref(),
710 input.vitals_trust,
711 ) {
712 self.hooks.on_vitals_classified(gate, phase, trust);
713 }
714
715 let result = TurnResult {
716 episode_id,
717 persona_prompt_contribution,
718 memory_context,
719 extraction_report,
720 steps_executed: dispatched_count,
721 patch_dispatch_results,
722 status: TurnStatus::Ok,
723 vitals_gate: input.vitals_gate.clone(),
724 vitals_phase: input.vitals_phase.clone(),
725 vitals_trust: input.vitals_trust,
726 };
727
728 let outcome = if turn_warnings.is_empty() {
729 TurnOutcome::Complete(result)
730 } else {
731 TurnOutcome::PartialSuccess {
732 result,
733 warnings: turn_warnings,
734 }
735 };
736
737 self.hooks.on_turn_complete(&outcome);
738 Ok(outcome)
739 }
740
741 fn relevant_semantic_nodes(
749 &self,
750 user_message: &str,
751 all_semantic: Vec<AinlMemoryNode>,
752 limit: usize,
753 ) -> Vec<AinlMemoryNode> {
754 let user_tags = infer_topic_tags(user_message);
755 let user_topics: HashSet<String> = user_tags
756 .iter()
757 .filter(|t| t.namespace == TagNamespace::Topic)
758 .map(|t| t.value.to_lowercase())
759 .collect();
760
761 if user_message.trim().is_empty() || user_topics.is_empty() {
762 return fallback_high_recurrence_semantic(all_semantic, limit);
763 }
764
765 let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
766 for n in all_semantic {
767 let (score, rec) = match &n.node_type {
768 AinlNodeType::Semantic { semantic } => {
769 let mut s = 0f32;
770 if let Some(cluster) = &semantic.topic_cluster {
771 for slug in cluster
772 .split([',', ';'])
773 .map(|s| s.trim().to_lowercase())
774 .filter(|s| !s.is_empty())
775 {
776 if user_topics.contains(&slug) {
777 s += 1.0;
778 }
779 }
780 }
781 if s == 0.0 {
782 for tag in &semantic.tags {
783 let tl = tag.to_lowercase();
784 if let Some(rest) = tl.strip_prefix("topic:") {
785 let slug = rest.trim().to_lowercase();
786 if user_topics.contains(&slug) {
787 s = 0.5;
788 break;
789 }
790 }
791 }
792 }
793 let confidence = semantic.confidence;
798 for tag in &semantic.tags {
799 let tl = tag.to_lowercase();
800 if tl.starts_with("vitals:") {
801 if tl.ends_with(":pass") {
802 s += 0.2 * confidence;
803 } else if tl == "vitals:elevated" {
804 s -= 0.1;
805 }
806 }
807 }
808 (s, semantic.recurrence_count)
809 }
810 _ => (0.0, 0),
811 };
812 scored.push((score, rec, n));
813 }
814
815 scored.sort_by(|a, b| {
816 b.0.partial_cmp(&a.0)
817 .unwrap_or(std::cmp::Ordering::Equal)
818 .then_with(|| b.1.cmp(&a.1))
819 });
820 scored.into_iter().take(limit).map(|t| t.2).collect()
821 }
822
823 pub fn dispatch_patches(
824 &mut self,
825 patches: &[AinlMemoryNode],
826 frame: &HashMap<String, serde_json::Value>,
827 ) -> Vec<PatchDispatchResult> {
828 let mut w = Vec::new();
829 self.dispatch_patches_collect(patches, frame, &mut w)
830 }
831
832 fn dispatch_patches_collect(
833 &mut self,
834 patches: &[AinlMemoryNode],
835 frame: &HashMap<String, serde_json::Value>,
836 turn_warnings: &mut Vec<TurnWarning>,
837 ) -> Vec<PatchDispatchResult> {
838 let mut out = Vec::new();
839 for node in patches {
840 let res = self.dispatch_one_patch(node, frame);
841 if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
842 tracing::warn!(
843 phase = ?TurnPhase::FitnessWriteBack,
844 error = %e,
845 "non-fatal turn write failed — continuing"
846 );
847 turn_warnings.push(TurnWarning {
848 phase: TurnPhase::FitnessWriteBack,
849 error: format!("{}: {}", res.label, e),
850 });
851 }
852 out.push(res);
853 }
854 out
855 }
856
857 fn dispatch_one_patch(
858 &mut self,
859 node: &AinlMemoryNode,
860 frame: &HashMap<String, serde_json::Value>,
861 ) -> PatchDispatchResult {
862 let label_default = String::new();
863 let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
864 AinlNodeType::Procedural { procedural } => (
865 procedural_label(procedural),
866 procedural.patch_version,
867 procedural.retired,
868 procedural.declared_reads.clone(),
869 procedural.fitness,
870 ),
871 _ => {
872 return PatchDispatchResult {
873 label: label_default,
874 patch_version: 0,
875 fitness_before: 0.0,
876 fitness_after: 0.0,
877 dispatched: false,
878 skip_reason: Some(PatchSkipReason::NotProcedural),
879 adapter_output: None,
880 adapter_name: None,
881 };
882 }
883 };
884
885 if pv == 0 {
886 return PatchDispatchResult {
887 label: label_src,
888 patch_version: pv,
889 fitness_before: fitness_opt.unwrap_or(0.5),
890 fitness_after: fitness_opt.unwrap_or(0.5),
891 dispatched: false,
892 skip_reason: Some(PatchSkipReason::ZeroVersion),
893 adapter_output: None,
894 adapter_name: None,
895 };
896 }
897 if retired {
898 return PatchDispatchResult {
899 label: label_src.clone(),
900 patch_version: pv,
901 fitness_before: fitness_opt.unwrap_or(0.5),
902 fitness_after: fitness_opt.unwrap_or(0.5),
903 dispatched: false,
904 skip_reason: Some(PatchSkipReason::Retired),
905 adapter_output: None,
906 adapter_name: None,
907 };
908 }
909 for key in &reads {
910 if !frame.contains_key(key) {
911 return PatchDispatchResult {
912 label: label_src.clone(),
913 patch_version: pv,
914 fitness_before: fitness_opt.unwrap_or(0.5),
915 fitness_after: fitness_opt.unwrap_or(0.5),
916 dispatched: false,
917 skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
918 adapter_output: None,
919 adapter_name: None,
920 };
921 }
922 }
923
924 let patch_label = label_src.clone();
925 let adapter_key = patch_label.as_str();
926 let ctx = PatchDispatchContext {
927 patch_label: adapter_key,
928 node,
929 frame,
930 };
931 let (adapter_output, adapter_name) = if let Some(adapter) = self
932 .adapter_registry
933 .get(adapter_key)
934 .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
935 {
936 let aname = adapter.name().to_string();
937 match adapter.execute_patch(&ctx) {
938 Ok(output) => {
939 tracing::debug!(
940 label = %patch_label,
941 adapter = %aname,
942 "adapter executed patch"
943 );
944 (Some(output), Some(aname))
945 }
946 Err(e) => {
947 tracing::warn!(
948 label = %patch_label,
949 adapter = %aname,
950 error = %e,
951 "adapter execution failed — continuing as metadata dispatch"
952 );
953 (None, Some(aname))
954 }
955 }
956 } else {
957 (None, None)
958 };
959
960 let fitness_before = fitness_opt.unwrap_or(0.5);
961 let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
962
963 let updated = match self.memory.with(|m| {
964 let store = m.sqlite_store();
965 store.read_node(node.id)
966 }) {
967 Ok(Some(mut n)) => {
968 if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
969 procedural.fitness = Some(fitness_after);
970 }
971 n
972 }
973 Ok(None) => {
974 return PatchDispatchResult {
975 label: label_src,
976 patch_version: pv,
977 fitness_before,
978 fitness_after: fitness_before,
979 dispatched: false,
980 skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
981 adapter_output,
982 adapter_name,
983 };
984 }
985 Err(e) => {
986 return PatchDispatchResult {
987 label: label_src,
988 patch_version: pv,
989 fitness_before,
990 fitness_after: fitness_before,
991 dispatched: false,
992 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
993 adapter_output,
994 adapter_name,
995 };
996 }
997 };
998
999 if self.test_force_fitness_write_failure {
1000 self.test_force_fitness_write_failure = false;
1001 let e = "injected fitness write failure".to_string();
1002 return PatchDispatchResult {
1003 label: label_src.clone(),
1004 patch_version: pv,
1005 fitness_before,
1006 fitness_after: fitness_before,
1007 dispatched: false,
1008 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1009 adapter_output,
1010 adapter_name,
1011 };
1012 }
1013
1014 if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
1015 return PatchDispatchResult {
1016 label: label_src.clone(),
1017 patch_version: pv,
1018 fitness_before,
1019 fitness_after: fitness_before,
1020 dispatched: false,
1021 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1022 adapter_output,
1023 adapter_name,
1024 };
1025 }
1026
1027 self.hooks
1028 .on_patch_dispatched(label_src.as_str(), fitness_after);
1029
1030 PatchDispatchResult {
1031 label: label_src,
1032 patch_version: pv,
1033 fitness_before,
1034 fitness_after,
1035 dispatched: true,
1036 skip_reason: None,
1037 adapter_output,
1038 adapter_name,
1039 }
1040 }
1041}
1042
1043pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1044 match &n.node_type {
1045 AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1046 AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1047 AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1048 AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1049 AinlNodeType::RuntimeState { runtime_state } => {
1050 format!("runtime_state:{}", runtime_state.agent_id)
1051 }
1052 }
1053}
1054
1055pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1056 if !p.label.is_empty() {
1057 p.label.clone()
1058 } else {
1059 p.pattern_name.clone()
1060 }
1061}
1062
1063pub(crate) fn fallback_high_recurrence_semantic(
1064 all: Vec<AinlMemoryNode>,
1065 limit: usize,
1066) -> Vec<AinlMemoryNode> {
1067 let mut v: Vec<_> = all
1068 .into_iter()
1069 .filter(|n| {
1070 matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1071 })
1072 .collect();
1073 v.sort_by(|a, b| {
1074 let ra = match &a.node_type {
1075 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1076 _ => 0,
1077 };
1078 let rb = match &b.node_type {
1079 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1080 _ => 0,
1081 };
1082 rb.cmp(&ra)
1083 });
1084 v.into_iter().take(limit).collect()
1085}
1086
1087pub(crate) fn persona_snapshot_if_evolved(
1088 extractor: &GraphExtractorTask,
1089) -> Option<ainl_persona::PersonaSnapshot> {
1090 let snap = extractor.evolution_engine.snapshot();
1091 let defaults = default_axis_map(0.5);
1092 for axis in PersonaAxis::ALL {
1093 let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1094 let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1095 if (s - d).abs() > INGEST_SCORE_EPSILON {
1096 return Some(snap);
1097 }
1098 }
1099 None
1100}
1101
1102pub(crate) fn compile_persona_from_nodes(
1103 nodes: &[AinlMemoryNode],
1104) -> Result<Option<String>, String> {
1105 if nodes.is_empty() {
1106 return Ok(None);
1107 }
1108 let mut lines = Vec::new();
1109 for n in nodes {
1110 if let AinlNodeType::Persona { persona } = &n.node_type {
1111 lines.push(format_persona_line(persona));
1112 }
1113 }
1114 if lines.is_empty() {
1115 Ok(None)
1116 } else {
1117 Ok(Some(lines.join("\n")))
1118 }
1119}
1120
1121fn format_persona_line(p: &PersonaNode) -> String {
1122 format!(
1123 "- {} (strength {:.2}, layer {:?}, source {:?})",
1124 p.trait_name, p.strength, p.layer, p.source
1125 )
1126}
1127
1128pub(crate) fn try_export_graph_json_armaraos(
1132 store: &SqliteGraphStore,
1133 agent_id: &str,
1134) -> Result<(), String> {
1135 let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1136 let dir = trimmed.trim();
1137 if dir.is_empty() {
1138 return Ok(());
1139 }
1140 let dir_path = PathBuf::from(dir);
1141 std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1142 let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1143 let snap = store.export_graph(agent_id)?;
1144 let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1145 std::fs::write(
1146 &path,
1147 serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1148 )
1149 .map_err(|e| format!("write export: {e}"))?;
1150 Ok(())
1151}
1152
1153pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1154 if tools_invoked.is_empty() {
1155 return vec!["turn".to_string()];
1156 }
1157 let tags = tag_tool_names(tools_invoked);
1158 let mut seen: BTreeSet<String> = BTreeSet::new();
1159 for t in tags {
1160 if t.namespace == TagNamespace::Tool {
1161 seen.insert(t.value);
1162 }
1163 }
1164 if seen.is_empty() {
1165 vec!["turn".to_string()]
1166 } else {
1167 seen.into_iter().collect()
1168 }
1169}
1170
1171pub(crate) fn record_turn_episode(
1172 memory: &ainl_memory::GraphMemory,
1173 agent_id: &str,
1174 input: &TurnInput,
1175 tools_invoked_canonical: &[String],
1176) -> Result<Uuid, String> {
1177 let turn_id = Uuid::new_v4();
1178 let timestamp = chrono::Utc::now().timestamp();
1179 let tools = tools_invoked_canonical.to_vec();
1180 let mut node = AinlMemoryNode::new_episode(
1181 turn_id,
1182 timestamp,
1183 tools.clone(),
1184 None,
1185 input.trace_event.clone(),
1186 );
1187 node.agent_id = agent_id.to_string();
1188 if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1189 episodic.user_message = Some(input.user_message.clone());
1190 episodic.tools_invoked = tools;
1191 episodic.vitals_gate = input.vitals_gate.clone();
1193 episodic.vitals_phase = input.vitals_phase.clone();
1194 episodic.vitals_trust = input.vitals_trust;
1195 }
1196 memory.write_node(&node)?;
1197 Ok(node.id)
1198}
1199
// Compiled only with the `async` feature; the module body lives in `runtime_async.rs`.
#[cfg(feature = "async")]
#[path = "runtime_async.rs"]
mod runtime_async_impl;