1use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_graph_extractor::GraphExtractorTask;
10use ainl_memory::{
11 AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
12 RuntimeStateNode, SqliteGraphStore,
13};
14use ainl_persona::axes::default_axis_map;
15use ainl_persona::{
16 EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
17};
18use ainl_semantic_tagger::infer_topic_tags;
19use ainl_semantic_tagger::tag_tool_names;
20use ainl_semantic_tagger::TagNamespace;
21use uuid::Uuid;
22
23use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
24use crate::engine::{
25 AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
26 PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
27 EMIT_TO_EDGE,
28};
29use crate::graph_cell::{GraphCell, SqliteStoreRef};
30use crate::hooks::{NoOpHooks, TurnHooks};
31#[cfg(feature = "async")]
32use crate::hooks::TurnHooksAsync;
33use crate::RuntimeConfig;
34
/// Per-agent runtime that drives turns against a graph memory store.
///
/// Bundles the configuration, the graph-memory handle, the extraction task, turn
/// bookkeeping (turn counter, last extraction turn, delegation depth), the hook sinks
/// and the patch adapter registry used by `run_turn`.
pub struct AinlRuntime {
    /// Runtime settings; `run_turn` reads `agent_id`, `enable_graph_memory`,
    /// `max_delegation_depth`, `max_steps` and `extraction_interval` from it.
    config: RuntimeConfig,
    /// Graph memory handle built from the `SqliteGraphStore` passed to `new`.
    memory: GraphCell,
    /// Extraction task created for this agent; also owns the `evolution_engine`
    /// exposed through the `evolution_engine*` accessors.
    extractor: GraphExtractorTask,
    /// Turn counter; restored from persisted runtime state in `new` and bumped once
    /// per `run_turn` that reaches the episode phase.
    turn_count: u64,
    /// Value of `turn_count` at the most recent scheduled extraction pass.
    last_extraction_at_turn: u64,
    /// Number of `run_turn` calls currently in flight; compared against
    /// `config.max_delegation_depth` on entry.
    current_depth: Arc<AtomicU32>,
    /// Synchronous hook sink notified at each turn phase (defaults to `NoOpHooks`).
    hooks: Box<dyn TurnHooks>,
    /// When `false`, `persist_evolution_snapshot` and
    /// `evolve_persona_from_graph_signals` return an error instead of writing.
    evolution_writes_enabled: bool,
    /// Compiled persona prompt text reused across turns; cleared after an
    /// extraction pass has run so the next turn recompiles it.
    persona_cache: Option<String>,
    /// Test hook: when set, the next scheduled extraction pass is replaced by a
    /// forced failure (the flag is consumed when read).
    #[doc(hidden)]
    test_force_extraction_failure: bool,
    /// Test hook: when set, the next fitness write during patch dispatch reports an
    /// injected failure (the flag is reset when it fires).
    #[doc(hidden)]
    test_force_fitness_write_failure: bool,
    /// Test hook: when set, the next runtime-state write reports an injected
    /// failure (the flag is consumed when read).
    #[doc(hidden)]
    test_force_runtime_state_write_failure: bool,
    /// Patch adapters, looked up by patch label during dispatch.
    adapter_registry: AdapterRegistry,
    /// Optional async hook sink set by `with_hooks_async`; only present with the
    /// `async` feature.
    #[cfg(feature = "async")]
    hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
}
78
79impl AinlRuntime {
80 pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
81 let agent_id = config.agent_id.clone();
82 let memory = GraphCell::new(store);
83 let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
84 if agent_id.is_empty() {
85 (0, None, 0)
86 } else {
87 match memory.read_runtime_state(&agent_id) {
88 Ok(Some(state)) => {
89 tracing::info!(
90 agent_id = %agent_id,
91 turn_count = state.turn_count,
92 "restored runtime state"
93 );
94 let persona_cache = state
95 .persona_snapshot_json
96 .as_ref()
97 .and_then(|json| serde_json::from_str::<String>(json).ok());
98 (
99 state.turn_count,
100 persona_cache,
101 state.last_extraction_at_turn,
102 )
103 }
104 Ok(None) => (0, None, 0),
105 Err(e) => {
106 tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
107 (0, None, 0)
108 }
109 }
110 };
111 Self {
112 extractor: GraphExtractorTask::new(&agent_id),
113 memory,
114 config,
115 turn_count: init_turn_count,
116 last_extraction_at_turn: init_last_extraction_at_turn,
117 current_depth: Arc::new(AtomicU32::new(0)),
118 hooks: Box::new(NoOpHooks),
119 evolution_writes_enabled: true,
120 persona_cache: init_persona_cache,
121 test_force_extraction_failure: false,
122 test_force_fitness_write_failure: false,
123 test_force_runtime_state_write_failure: false,
124 adapter_registry: AdapterRegistry::new(),
125 #[cfg(feature = "async")]
126 hooks_async: None,
127 }
128 }
129
    /// Adds `adapter` to this runtime's patch adapter registry.
    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
        self.adapter_registry.register(adapter);
    }
134
    /// Registers a freshly constructed `GraphPatchAdapter`.
    ///
    /// Patch dispatch falls back to the adapter registered under
    /// `GraphPatchAdapter::NAME` when no adapter matches a patch's label.
    pub fn register_default_patch_adapters(&mut self) {
        self.register_adapter(GraphPatchAdapter::new());
    }
140
    /// Returns the adapter names reported by the adapter registry.
    pub fn registered_adapters(&self) -> Vec<&str> {
        self.adapter_registry.registered_names()
    }
145
    /// Test accessor: current value of the turn counter.
    #[doc(hidden)]
    pub fn test_turn_count(&self) -> u64 {
        self.turn_count
    }
150
    /// Test accessor: the cached persona prompt text, if any.
    #[doc(hidden)]
    pub fn test_persona_cache(&self) -> Option<&str> {
        self.persona_cache.as_deref()
    }
155
    /// Test accessor: current value of the delegation-depth counter.
    #[doc(hidden)]
    pub fn test_delegation_depth(&self) -> u32 {
        self.current_depth.load(Ordering::SeqCst)
    }
160
    /// Test hook: overwrites the delegation-depth counter with `depth`.
    #[doc(hidden)]
    pub fn test_set_delegation_depth(&mut self, depth: u32) {
        self.current_depth.store(depth, Ordering::SeqCst);
    }
165
    /// Test hook: arms (or disarms) the forced extraction failure. `run_turn`
    /// consumes the flag the next time an extraction pass is scheduled.
    #[doc(hidden)]
    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
        self.test_force_extraction_failure = fail;
    }
170
    /// Test hook: arms (or disarms) the injected fitness write failure used by
    /// patch dispatch. The flag is reset when it fires.
    #[doc(hidden)]
    pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
        self.test_force_fitness_write_failure = fail;
    }
175
    /// Test accessor: mutable access to the extraction task owned by this runtime.
    #[doc(hidden)]
    pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
        &mut self.extractor
    }
181
    /// Test hook: arms (or disarms) the injected runtime-state write failure.
    /// `run_turn` consumes the flag when it persists runtime state.
    #[doc(hidden)]
    pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
        self.test_force_runtime_state_write_failure = fail;
    }
186
    /// Builder-style setter: replaces the synchronous hook sink with `hooks` and
    /// returns the runtime.
    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
        self.hooks = Box::new(hooks);
        self
    }
191
    /// Builder-style setter: installs the async hook sink and returns the runtime.
    /// Only compiled with the `async` feature.
    #[cfg(feature = "async")]
    pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
        self.hooks_async = Some(hooks);
        self
    }
198
    /// Builder-style setter for the evolution-write switch.
    ///
    /// With `enabled == false`, `persist_evolution_snapshot` and
    /// `evolve_persona_from_graph_signals` return an error instead of writing.
    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
        self.evolution_writes_enabled = enabled;
        self
    }
207
208 fn require_evolution_writes_enabled(&self) -> Result<(), String> {
209 if self.evolution_writes_enabled {
210 Ok(())
211 } else {
212 Err(
213 "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
214 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
215 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
216 ainl_memory.db"
217 .to_string(),
218 )
219 }
220 }
221
    /// Returns the `SqliteStoreRef` handed out by the underlying `GraphCell`.
    pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
        self.memory.sqlite_ref()
    }
231
    /// Shared access to the evolution engine owned by the extraction task.
    pub fn evolution_engine(&self) -> &EvolutionEngine {
        &self.extractor.evolution_engine
    }
242
    /// Mutable access to the evolution engine owned by the extraction task.
    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
        &mut self.extractor.evolution_engine
    }
249
    /// Feeds `signals` to the evolution engine and returns the count reported by
    /// `ingest_signals`.
    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
        self.extractor.evolution_engine.ingest_signals(signals)
    }
254
    /// Forwards a correction for `axis` to the evolution engine's `correction_tick`.
    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
        self.extractor
            .evolution_engine
            .correction_tick(axis, correction);
    }
261
    /// Takes a snapshot of the evolution engine, writes it as a persona node to the
    /// store and returns the snapshot.
    ///
    /// # Errors
    /// Returns an error when evolution writes are disabled on this runtime, or when
    /// the persona node write fails.
    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
        self.require_evolution_writes_enabled()?;
        let snap = self.extractor.evolution_engine.snapshot();
        self.memory.with(|m| {
            self.extractor
                .evolution_engine
                .write_persona_node(m.sqlite_store(), &snap)
        })?;
        Ok(snap)
    }
275
    /// Runs the evolution engine's `evolve` against the store and returns its result.
    ///
    /// # Errors
    /// Returns an error when evolution writes are disabled on this runtime, or
    /// whatever error `evolve` reports.
    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
        self.require_evolution_writes_enabled()?;
        self.memory
            .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
    }
287
    /// Loads the `AinlGraphArtifact` for the configured agent id from the store.
    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
        self.memory
            .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
    }
293
    /// Compiles a memory context without a user message; equivalent to
    /// `compile_memory_context_for(None)`.
    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
        self.compile_memory_context_for(None)
    }
299
300 pub fn compile_memory_context_for(
305 &self,
306 user_message: Option<&str>,
307 ) -> Result<MemoryContext, String> {
308 if self.config.agent_id.is_empty() {
309 return Err("RuntimeConfig.agent_id must be set".to_string());
310 }
311 self.memory.with(|m| {
312 let store = m.sqlite_store();
313 let q = store.query(&self.config.agent_id);
314 let recent_episodes = q.recent_episodes(5)?;
315 let all_semantic = q.semantic_nodes()?;
316 let relevant_semantic = self.relevant_semantic_nodes(
317 user_message.unwrap_or(""),
318 all_semantic,
319 10,
320 );
321 let active_patches = q.active_patches()?;
322 let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
323 Ok(MemoryContext {
324 recent_episodes,
325 relevant_semantic,
326 active_patches,
327 persona_snapshot,
328 compiled_at: chrono::Utc::now(),
329 })
330 })
331 }
332
333 pub fn route_emit_edges(
335 &self,
336 episode_id: Uuid,
337 turn_output_payload: &serde_json::Value,
338 ) -> Result<(), String> {
339 self.memory.with(|m| {
340 let store = m.sqlite_store();
341 let neighbors = store
342 .query(&self.config.agent_id)
343 .neighbors(episode_id, EMIT_TO_EDGE)?;
344 for n in neighbors {
345 let target = emit_target_name(&n);
346 self.hooks.on_emit(&target, turn_output_payload);
347 }
348 Ok(())
349 })
350 }
351
    /// Runs one agent turn end to end.
    ///
    /// Phases, in order: delegation-depth check, graph validation, persona compilation
    /// (cached), memory-context compilation, patch dispatch, episode recording, emit-edge
    /// routing, an optional scheduled extraction pass, export refresh and runtime-state
    /// persistence. Hooks are notified after each phase.
    ///
    /// # Errors
    /// - `AinlRuntimeError::DelegationDepthExceeded` when the depth counter observed on
    ///   entry is at or above `config.max_delegation_depth`.
    /// - `AinlRuntimeError::Message` when `config.agent_id` is empty (with graph memory
    ///   enabled) or when graph validation reports an invalid graph.
    /// - Errors converted from graph validation, the persona node query, persona
    ///   compilation or memory-context compilation.
    ///
    /// Failures in later phases (episode write, emit edges, extraction, export, runtime
    /// state) are not fatal: they are logged, collected as `TurnWarning`s and the turn
    /// is returned as `TurnOutcome::PartialSuccess`.
    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
        // `fetch_add` returns the value *before* the increment, i.e. the number of
        // turns that were already in flight when this one started.
        let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
        let cd = Arc::clone(&self.current_depth);
        // Undo the increment on every exit path, including early returns.
        let _depth_guard = scopeguard::guard((), move |()| {
            cd.fetch_sub(1, Ordering::SeqCst);
        });

        if depth >= self.config.max_delegation_depth {
            return Err(AinlRuntimeError::DelegationDepthExceeded {
                depth,
                max: self.config.max_delegation_depth,
            });
        }

        // Graph memory disabled: report a default result without touching the store.
        // This check runs before the agent-id check below.
        if !self.config.enable_graph_memory {
            let memory_context = MemoryContext::default();
            let result = TurnResult {
                memory_context,
                status: TurnStatus::GraphMemoryDisabled,
                ..Default::default()
            };
            let outcome = TurnOutcome::Complete(result);
            self.hooks.on_turn_complete(&outcome);
            return Ok(outcome);
        }

        if self.config.agent_id.is_empty() {
            return Err(AinlRuntimeError::Message(
                "RuntimeConfig.agent_id must be set for run_turn".into(),
            ));
        }

        // NOTE(review): the span records `input.depth`, not the local `depth` read
        // from the atomic counter above — confirm this is intended.
        let span = tracing::info_span!(
            "ainl_runtime.run_turn",
            agent_id = %self.config.agent_id,
            turn = self.turn_count,
            depth = input.depth,
        );
        let _span_enter = span.enter();

        // Refuse to run on a graph with dangling edges; each one is listed in the
        // error message.
        let validation: GraphValidationReport = self
            .memory
            .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
            .map_err(AinlRuntimeError::from)?;
        if !validation.is_valid {
            let mut msg = String::from("graph validation failed before turn");
            for d in &validation.dangling_edge_details {
                msg.push_str(&format!(
                    "; {} -> {} [{}]",
                    d.source_id, d.target_id, d.edge_type
                ));
            }
            return Err(AinlRuntimeError::Message(msg));
        }

        self.hooks
            .on_artifact_loaded(&self.config.agent_id, validation.node_count);

        // Non-fatal failures from here on are accumulated instead of aborting the turn.
        let mut turn_warnings: Vec<TurnWarning> = Vec::new();

        // Persona phase: reuse the cached text when present, otherwise compile it from
        // the persona nodes and cache the result (which may be `None`).
        let t_persona = Instant::now();
        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
            Some(cached.clone())
        } else {
            let nodes = self
                .memory
                .with(|m| {
                    m.sqlite_store()
                        .query(&self.config.agent_id)
                        .persona_nodes()
                })
                .map_err(AinlRuntimeError::from)?;
            let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
            self.persona_cache = compiled.clone();
            compiled
        };
        self.hooks
            .on_persona_compiled(persona_prompt_contribution.as_deref());
        tracing::debug!(
            target: "ainl_runtime",
            duration_ms = t_persona.elapsed().as_millis() as u64,
            has_contribution = persona_prompt_contribution.is_some(),
            "persona_phase"
        );

        // Memory-context phase.
        let t_memory = Instant::now();
        let memory_context = self
            .compile_memory_context_for(Some(&input.user_message))
            .map_err(AinlRuntimeError::from)?;
        self.hooks.on_memory_context_ready(&memory_context);
        tracing::debug!(
            target: "ainl_runtime",
            duration_ms = t_memory.elapsed().as_millis() as u64,
            episode_count = memory_context.recent_episodes.len(),
            semantic_count = memory_context.relevant_semantic.len(),
            patch_count = memory_context.active_patches.len(),
            "memory_context"
        );

        // Patch-dispatch phase. The `enable_graph_memory` test is always true here
        // because the disabled case returned earlier in this function.
        let t_patches = Instant::now();
        let patch_dispatch_results = if self.config.enable_graph_memory {
            self.dispatch_patches_collect(
                &memory_context.active_patches,
                &input.frame,
                &mut turn_warnings,
            )
        } else {
            Vec::new()
        };
        for r in &patch_dispatch_results {
            tracing::debug!(
                target: "ainl_runtime",
                label = %r.label,
                dispatched = r.dispatched,
                fitness_before = r.fitness_before,
                fitness_after = r.fitness_after,
                "patch_dispatch"
            );
        }
        tracing::debug!(
            target: "ainl_runtime",
            duration_ms = t_patches.elapsed().as_millis() as u64,
            "patch_dispatch_phase"
        );

        // Only patches that were actually dispatched count as executed steps.
        let dispatched_count = patch_dispatch_results
            .iter()
            .filter(|r| r.dispatched)
            .count() as u32;
        // Step limit reached: stop before recording an episode.
        // NOTE(review): warnings collected during patch dispatch are not included in
        // this early outcome (it is always `Complete`) — confirm that is intended.
        if dispatched_count >= self.config.max_steps {
            let result = TurnResult {
                patch_dispatch_results,
                memory_context,
                persona_prompt_contribution,
                steps_executed: dispatched_count,
                status: TurnStatus::StepLimitExceeded {
                    steps_executed: dispatched_count,
                },
                ..Default::default()
            };
            let outcome = TurnOutcome::Complete(result);
            self.hooks.on_turn_complete(&outcome);
            return Ok(outcome);
        }

        // Episode phase: a failed write degrades to the nil UUID plus a warning; the
        // `on_episode_recorded` hook is still called with that id.
        let t_episode = Instant::now();
        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
        let episode_id = match self.memory.with(|m| {
            record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical)
        }) {
            Ok(id) => id,
            Err(e) => {
                tracing::warn!(
                    phase = ?TurnPhase::EpisodeWrite,
                    error = %e,
                    "non-fatal turn write failed — continuing"
                );
                turn_warnings.push(TurnWarning {
                    phase: TurnPhase::EpisodeWrite,
                    error: e,
                });
                Uuid::nil()
            }
        };
        self.hooks.on_episode_recorded(episode_id);
        tracing::debug!(
            target: "ainl_runtime",
            duration_ms = t_episode.elapsed().as_millis() as u64,
            episode_id = %episode_id,
            "episode_record"
        );

        // Link the new episode to each requested emit target; skipped entirely when
        // the episode write failed.
        if !episode_id.is_nil() {
            for &tid in &input.emit_targets {
                if let Err(e) = self.memory.with(|m| {
                    m.sqlite_store()
                        .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
                }) {
                    tracing::warn!(
                        phase = ?TurnPhase::EpisodeWrite,
                        error = %e,
                        "non-fatal turn write failed — continuing"
                    );
                    turn_warnings.push(TurnWarning {
                        phase: TurnPhase::EpisodeWrite,
                        error: e,
                    });
                }
            }
        }

        // `turn_count` in the payload is the value the counter takes right after
        // emit routing (it is incremented just below).
        let emit_payload = serde_json::json!({
            "episode_id": episode_id.to_string(),
            "user_message": input.user_message,
            "tools_invoked": tools_canonical,
            "persona_contribution": persona_prompt_contribution,
            "turn_count": self.turn_count.wrapping_add(1),
        });
        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
            tracing::warn!(
                phase = ?TurnPhase::EpisodeWrite,
                error = %e,
                "non-fatal turn write failed — continuing"
            );
            turn_warnings.push(TurnWarning {
                phase: TurnPhase::EpisodeWrite,
                error: format!("emit_routing: {e}"),
            });
        }

        self.turn_count = self.turn_count.wrapping_add(1);

        // Extraction runs every `extraction_interval` turns; an interval of 0
        // disables it.
        let should_extract = self.config.extraction_interval > 0
            && self.turn_count.saturating_sub(self.last_extraction_at_turn)
                >= self.config.extraction_interval as u64;

        let t_extract = Instant::now();
        let (extraction_report, _extraction_failed) = if should_extract {
            // Consume the test flag so a forced failure applies to one pass only.
            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);

            let res = if force_fail {
                // Forced failure: no pass is run, no extraction hook is called and
                // the persona cache is left untouched.
                let e = "test_forced".to_string();
                tracing::warn!(
                    phase = ?TurnPhase::ExtractionPass,
                    error = %e,
                    "non-fatal turn write failed — continuing"
                );
                turn_warnings.push(TurnWarning {
                    phase: TurnPhase::ExtractionPass,
                    error: e,
                });
                tracing::debug!(
                    target: "ainl_runtime",
                    duration_ms = t_extract.elapsed().as_millis() as u64,
                    signals_ingested = 0u64,
                    skipped = false,
                    "extraction_pass"
                );
                (None, true)
            } else {
                let report = self
                    .memory
                    .with(|m| self.extractor.run_pass(m.sqlite_store()));
                // Each error slot of the report maps to its own warning phase.
                if let Some(ref e) = report.extract_error {
                    tracing::warn!(
                        phase = ?TurnPhase::ExtractionPass,
                        error = %e,
                        "non-fatal turn write failed — continuing"
                    );
                    turn_warnings.push(TurnWarning {
                        phase: TurnPhase::ExtractionPass,
                        error: e.clone(),
                    });
                }
                if let Some(ref e) = report.pattern_error {
                    tracing::warn!(
                        phase = ?TurnPhase::PatternPersistence,
                        error = %e,
                        "non-fatal turn write failed — continuing"
                    );
                    turn_warnings.push(TurnWarning {
                        phase: TurnPhase::PatternPersistence,
                        error: e.clone(),
                    });
                }
                if let Some(ref e) = report.persona_error {
                    tracing::warn!(
                        phase = ?TurnPhase::PersonaEvolution,
                        error = %e,
                        "non-fatal turn write failed — continuing"
                    );
                    turn_warnings.push(TurnWarning {
                        phase: TurnPhase::PersonaEvolution,
                        error: e.clone(),
                    });
                }
                let extraction_failed = report.has_errors();
                if !extraction_failed {
                    tracing::info!(
                        agent_id = %report.agent_id,
                        signals_extracted = report.signals_extracted,
                        signals_applied = report.signals_applied,
                        semantic_nodes_updated = report.semantic_nodes_updated,
                        "ainl-graph-extractor pass completed (scheduled)"
                    );
                }
                self.hooks.on_extraction_complete(&report);
                // Drop the cached persona text so the next turn recompiles it from
                // the persona nodes (done whether or not the pass reported errors).
                self.persona_cache = None;
                tracing::debug!(
                    target: "ainl_runtime",
                    duration_ms = t_extract.elapsed().as_millis() as u64,
                    signals_ingested = report.signals_extracted as u64,
                    skipped = false,
                    "extraction_pass"
                );
                (Some(report), extraction_failed)
            };
            // The cadence advances even when the pass failed or was forced to fail.
            self.last_extraction_at_turn = self.turn_count;
            res
        } else {
            tracing::debug!(
                target: "ainl_runtime",
                duration_ms = t_extract.elapsed().as_millis() as u64,
                signals_ingested = 0u64,
                skipped = true,
                "extraction_pass"
            );
            (None, false)
        };

        // Export refresh: a no-op unless the export environment variable is set.
        if let Err(e) = self.memory.with(|m| {
            try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id)
        }) {
            tracing::warn!(
                phase = ?TurnPhase::ExportRefresh,
                error = %e,
                "non-fatal turn write failed — continuing"
            );
            turn_warnings.push(TurnWarning {
                phase: TurnPhase::ExportRefresh,
                error: e,
            });
        }

        // Persist counters and persona cache so a restart can resume the cadence.
        // The agent-id test is always true here; an empty id returned earlier.
        if !self.config.agent_id.is_empty() {
            let state = RuntimeStateNode {
                agent_id: self.config.agent_id.clone(),
                turn_count: self.turn_count,
                last_extraction_at_turn: self.last_extraction_at_turn,
                // Stored as a JSON-encoded string; `None` when the cache is empty
                // (e.g. right after an extraction pass cleared it).
                persona_snapshot_json: self
                    .persona_cache
                    .as_ref()
                    .and_then(|p| serde_json::to_string(p).ok()),
                updated_at: chrono::Utc::now().timestamp(),
            };
            // Consume the test flag so the injected failure applies to one write only.
            let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
                Err("injected runtime state write failure".to_string())
            } else {
                self.memory.with(|m| m.write_runtime_state(&state))
            };
            if let Err(e) = write_res {
                tracing::warn!(
                    phase = ?TurnPhase::RuntimeStatePersist,
                    error = %e,
                    "failed to persist runtime state — cadence will reset on next restart"
                );
                turn_warnings.push(TurnWarning {
                    phase: TurnPhase::RuntimeStatePersist,
                    error: e,
                });
            }
        }

        let result = TurnResult {
            episode_id,
            persona_prompt_contribution,
            memory_context,
            extraction_report,
            steps_executed: dispatched_count,
            patch_dispatch_results,
            status: TurnStatus::Ok,
        };

        // Any collected warning downgrades the outcome to a partial success.
        let outcome = if turn_warnings.is_empty() {
            TurnOutcome::Complete(result)
        } else {
            TurnOutcome::PartialSuccess {
                result,
                warnings: turn_warnings,
            }
        };

        self.hooks.on_turn_complete(&outcome);
        Ok(outcome)
    }
728
729 fn relevant_semantic_nodes(
732 &self,
733 user_message: &str,
734 all_semantic: Vec<AinlMemoryNode>,
735 limit: usize,
736 ) -> Vec<AinlMemoryNode> {
737 let user_tags = infer_topic_tags(user_message);
738 let user_topics: HashSet<String> = user_tags
739 .iter()
740 .filter(|t| t.namespace == TagNamespace::Topic)
741 .map(|t| t.value.to_lowercase())
742 .collect();
743
744 if user_message.trim().is_empty() || user_topics.is_empty() {
745 return fallback_high_recurrence_semantic(all_semantic, limit);
746 }
747
748 let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
749 for n in all_semantic {
750 let (score, rec) = match &n.node_type {
751 AinlNodeType::Semantic { semantic } => {
752 let mut s = 0f32;
753 if let Some(cluster) = &semantic.topic_cluster {
754 for slug in cluster
755 .split([',', ';'])
756 .map(|s| s.trim().to_lowercase())
757 .filter(|s| !s.is_empty())
758 {
759 if user_topics.contains(&slug) {
760 s += 1.0;
761 }
762 }
763 }
764 if s == 0.0 {
765 for tag in &semantic.tags {
766 let tl = tag.to_lowercase();
767 if let Some(rest) = tl.strip_prefix("topic:") {
768 let slug = rest.trim().to_lowercase();
769 if user_topics.contains(&slug) {
770 s = 0.5;
771 break;
772 }
773 }
774 }
775 }
776 (s, semantic.recurrence_count)
777 }
778 _ => (0.0, 0),
779 };
780 scored.push((score, rec, n));
781 }
782
783 scored.sort_by(|a, b| {
784 b.0.partial_cmp(&a.0)
785 .unwrap_or(std::cmp::Ordering::Equal)
786 .then_with(|| b.1.cmp(&a.1))
787 });
788 scored.into_iter().take(limit).map(|t| t.2).collect()
789 }
790
791 pub fn dispatch_patches(
792 &mut self,
793 patches: &[AinlMemoryNode],
794 frame: &HashMap<String, serde_json::Value>,
795 ) -> Vec<PatchDispatchResult> {
796 let mut w = Vec::new();
797 self.dispatch_patches_collect(patches, frame, &mut w)
798 }
799
800 fn dispatch_patches_collect(
801 &mut self,
802 patches: &[AinlMemoryNode],
803 frame: &HashMap<String, serde_json::Value>,
804 turn_warnings: &mut Vec<TurnWarning>,
805 ) -> Vec<PatchDispatchResult> {
806 let mut out = Vec::new();
807 for node in patches {
808 let res = self.dispatch_one_patch(node, frame);
809 if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
810 tracing::warn!(
811 phase = ?TurnPhase::FitnessWriteBack,
812 error = %e,
813 "non-fatal turn write failed — continuing"
814 );
815 turn_warnings.push(TurnWarning {
816 phase: TurnPhase::FitnessWriteBack,
817 error: format!("{}: {}", res.label, e),
818 });
819 }
820 out.push(res);
821 }
822 out
823 }
824
825 fn dispatch_one_patch(
826 &mut self,
827 node: &AinlMemoryNode,
828 frame: &HashMap<String, serde_json::Value>,
829 ) -> PatchDispatchResult {
830 let label_default = String::new();
831 let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
832 AinlNodeType::Procedural { procedural } => (
833 procedural_label(procedural),
834 procedural.patch_version,
835 procedural.retired,
836 procedural.declared_reads.clone(),
837 procedural.fitness,
838 ),
839 _ => {
840 return PatchDispatchResult {
841 label: label_default,
842 patch_version: 0,
843 fitness_before: 0.0,
844 fitness_after: 0.0,
845 dispatched: false,
846 skip_reason: Some(PatchSkipReason::NotProcedural),
847 adapter_output: None,
848 adapter_name: None,
849 };
850 }
851 };
852
853 if pv == 0 {
854 return PatchDispatchResult {
855 label: label_src,
856 patch_version: pv,
857 fitness_before: fitness_opt.unwrap_or(0.5),
858 fitness_after: fitness_opt.unwrap_or(0.5),
859 dispatched: false,
860 skip_reason: Some(PatchSkipReason::ZeroVersion),
861 adapter_output: None,
862 adapter_name: None,
863 };
864 }
865 if retired {
866 return PatchDispatchResult {
867 label: label_src.clone(),
868 patch_version: pv,
869 fitness_before: fitness_opt.unwrap_or(0.5),
870 fitness_after: fitness_opt.unwrap_or(0.5),
871 dispatched: false,
872 skip_reason: Some(PatchSkipReason::Retired),
873 adapter_output: None,
874 adapter_name: None,
875 };
876 }
877 for key in &reads {
878 if !frame.contains_key(key) {
879 return PatchDispatchResult {
880 label: label_src.clone(),
881 patch_version: pv,
882 fitness_before: fitness_opt.unwrap_or(0.5),
883 fitness_after: fitness_opt.unwrap_or(0.5),
884 dispatched: false,
885 skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
886 adapter_output: None,
887 adapter_name: None,
888 };
889 }
890 }
891
892 let patch_label = label_src.clone();
893 let adapter_key = patch_label.as_str();
894 let ctx = PatchDispatchContext {
895 patch_label: adapter_key,
896 node,
897 frame,
898 };
899 let (adapter_output, adapter_name) = if let Some(adapter) = self
900 .adapter_registry
901 .get(adapter_key)
902 .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
903 {
904 let aname = adapter.name().to_string();
905 match adapter.execute_patch(&ctx) {
906 Ok(output) => {
907 tracing::debug!(
908 label = %patch_label,
909 adapter = %aname,
910 "adapter executed patch"
911 );
912 (Some(output), Some(aname))
913 }
914 Err(e) => {
915 tracing::warn!(
916 label = %patch_label,
917 adapter = %aname,
918 error = %e,
919 "adapter execution failed — continuing as metadata dispatch"
920 );
921 (None, Some(aname))
922 }
923 }
924 } else {
925 (None, None)
926 };
927
928 let fitness_before = fitness_opt.unwrap_or(0.5);
929 let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
930
931 let updated = match self.memory.with(|m| {
932 let store = m.sqlite_store();
933 store.read_node(node.id)
934 }) {
935 Ok(Some(mut n)) => {
936 if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
937 procedural.fitness = Some(fitness_after);
938 }
939 n
940 }
941 Ok(None) => {
942 return PatchDispatchResult {
943 label: label_src,
944 patch_version: pv,
945 fitness_before,
946 fitness_after: fitness_before,
947 dispatched: false,
948 skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
949 adapter_output,
950 adapter_name,
951 };
952 }
953 Err(e) => {
954 return PatchDispatchResult {
955 label: label_src,
956 patch_version: pv,
957 fitness_before,
958 fitness_after: fitness_before,
959 dispatched: false,
960 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
961 adapter_output,
962 adapter_name,
963 };
964 }
965 };
966
967 if self.test_force_fitness_write_failure {
968 self.test_force_fitness_write_failure = false;
969 let e = "injected fitness write failure".to_string();
970 return PatchDispatchResult {
971 label: label_src.clone(),
972 patch_version: pv,
973 fitness_before,
974 fitness_after: fitness_before,
975 dispatched: false,
976 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
977 adapter_output,
978 adapter_name,
979 };
980 }
981
982 if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
983 return PatchDispatchResult {
984 label: label_src.clone(),
985 patch_version: pv,
986 fitness_before,
987 fitness_after: fitness_before,
988 dispatched: false,
989 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
990 adapter_output,
991 adapter_name,
992 };
993 }
994
995 self.hooks
996 .on_patch_dispatched(label_src.as_str(), fitness_after);
997
998 PatchDispatchResult {
999 label: label_src,
1000 patch_version: pv,
1001 fitness_before,
1002 fitness_after,
1003 dispatched: true,
1004 skip_reason: None,
1005 adapter_output,
1006 adapter_name,
1007 }
1008 }
1009}
1010
1011pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1012 match &n.node_type {
1013 AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1014 AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1015 AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1016 AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1017 AinlNodeType::RuntimeState { runtime_state } => {
1018 format!("runtime_state:{}", runtime_state.agent_id)
1019 }
1020 }
1021}
1022
1023pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1024 if !p.label.is_empty() {
1025 p.label.clone()
1026 } else {
1027 p.pattern_name.clone()
1028 }
1029}
1030
1031pub(crate) fn fallback_high_recurrence_semantic(
1032 all: Vec<AinlMemoryNode>,
1033 limit: usize,
1034) -> Vec<AinlMemoryNode> {
1035 let mut v: Vec<_> = all
1036 .into_iter()
1037 .filter(|n| {
1038 matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1039 })
1040 .collect();
1041 v.sort_by(|a, b| {
1042 let ra = match &a.node_type {
1043 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1044 _ => 0,
1045 };
1046 let rb = match &b.node_type {
1047 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1048 _ => 0,
1049 };
1050 rb.cmp(&ra)
1051 });
1052 v.into_iter().take(limit).collect()
1053}
1054
1055pub(crate) fn persona_snapshot_if_evolved(
1056 extractor: &GraphExtractorTask,
1057) -> Option<ainl_persona::PersonaSnapshot> {
1058 let snap = extractor.evolution_engine.snapshot();
1059 let defaults = default_axis_map(0.5);
1060 for axis in PersonaAxis::ALL {
1061 let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1062 let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1063 if (s - d).abs() > INGEST_SCORE_EPSILON {
1064 return Some(snap);
1065 }
1066 }
1067 None
1068}
1069
1070pub(crate) fn compile_persona_from_nodes(
1071 nodes: &[AinlMemoryNode],
1072) -> Result<Option<String>, String> {
1073 if nodes.is_empty() {
1074 return Ok(None);
1075 }
1076 let mut lines = Vec::new();
1077 for n in nodes {
1078 if let AinlNodeType::Persona { persona } = &n.node_type {
1079 lines.push(format_persona_line(persona));
1080 }
1081 }
1082 if lines.is_empty() {
1083 Ok(None)
1084 } else {
1085 Ok(Some(lines.join("\n")))
1086 }
1087}
1088
1089fn format_persona_line(p: &PersonaNode) -> String {
1090 format!(
1091 "- {} (strength {:.2}, layer {:?}, source {:?})",
1092 p.trait_name, p.strength, p.layer, p.source
1093 )
1094}
1095
1096pub(crate) fn try_export_graph_json_armaraos(
1100 store: &SqliteGraphStore,
1101 agent_id: &str,
1102) -> Result<(), String> {
1103 let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1104 let dir = trimmed.trim();
1105 if dir.is_empty() {
1106 return Ok(());
1107 }
1108 let dir_path = PathBuf::from(dir);
1109 std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1110 let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1111 let snap = store.export_graph(agent_id)?;
1112 let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1113 std::fs::write(
1114 &path,
1115 serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1116 )
1117 .map_err(|e| format!("write export: {e}"))?;
1118 Ok(())
1119}
1120
1121pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1122 if tools_invoked.is_empty() {
1123 return vec!["turn".to_string()];
1124 }
1125 let tags = tag_tool_names(tools_invoked);
1126 let mut seen: BTreeSet<String> = BTreeSet::new();
1127 for t in tags {
1128 if t.namespace == TagNamespace::Tool {
1129 seen.insert(t.value);
1130 }
1131 }
1132 if seen.is_empty() {
1133 vec!["turn".to_string()]
1134 } else {
1135 seen.into_iter().collect()
1136 }
1137}
1138
1139pub(crate) fn record_turn_episode(
1140 memory: &ainl_memory::GraphMemory,
1141 agent_id: &str,
1142 input: &TurnInput,
1143 tools_invoked_canonical: &[String],
1144) -> Result<Uuid, String> {
1145 let turn_id = Uuid::new_v4();
1146 let timestamp = chrono::Utc::now().timestamp();
1147 let tools = tools_invoked_canonical.to_vec();
1148 let mut node = AinlMemoryNode::new_episode(
1149 turn_id,
1150 timestamp,
1151 tools.clone(),
1152 None,
1153 input.trace_event.clone(),
1154 );
1155 node.agent_id = agent_id.to_string();
1156 if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1157 episodic.user_message = Some(input.user_message.clone());
1158 episodic.tools_invoked = tools;
1159 }
1160 memory.write_node(&node)?;
1161 Ok(node.id)
1162}
1163
// Async-only part of the runtime; compiled from `runtime_async.rs` when the
// `async` feature is enabled.
#[cfg(feature = "async")]
#[path = "runtime_async.rs"]
mod runtime_async_impl;