1use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
10use ainl_graph_extractor::GraphExtractorTask;
11use ainl_memory::{
12 AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
13 RuntimeStateNode, SqliteGraphStore,
14};
15use ainl_persona::axes::default_axis_map;
16use ainl_persona::{
17 EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
18};
19use ainl_semantic_tagger::infer_topic_tags;
20use ainl_semantic_tagger::tag_tool_names;
21use ainl_semantic_tagger::TagNamespace;
22use uuid::Uuid;
23
24use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
25use crate::engine::{
26 AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
27 PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
28 EMIT_TO_EDGE,
29};
30use crate::graph_cell::{GraphCell, SqliteStoreRef};
31#[cfg(feature = "async")]
32use crate::hooks::TurnHooksAsync;
33use crate::hooks::{NoOpHooks, TurnHooks};
34use crate::RuntimeConfig;
35
36pub struct AinlRuntime {
52 config: RuntimeConfig,
53 memory: GraphCell,
54 extractor: GraphExtractorTask,
55 turn_count: u64,
56 last_extraction_at_turn: u64,
57 current_depth: Arc<AtomicU32>,
59 hooks: Box<dyn TurnHooks>,
60 evolution_writes_enabled: bool,
64 persona_cache: Option<String>,
65 #[doc(hidden)]
67 test_force_extraction_failure: bool,
68 #[doc(hidden)]
70 test_force_fitness_write_failure: bool,
71 #[doc(hidden)]
73 test_force_runtime_state_write_failure: bool,
74 adapter_registry: AdapterRegistry,
75 #[cfg(feature = "async")]
77 hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
78}
79
80impl AinlRuntime {
81 pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
82 let agent_id = config.agent_id.clone();
83 let memory = GraphCell::new(store);
84 let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
85 if agent_id.is_empty() {
86 (0, None, 0)
87 } else {
88 match memory.read_runtime_state(&agent_id) {
89 Ok(Some(state)) => {
90 tracing::info!(
91 agent_id = %agent_id,
92 turn_count = state.turn_count,
93 "restored runtime state"
94 );
95 let persona_cache = state
96 .persona_snapshot_json
97 .as_ref()
98 .and_then(|json| serde_json::from_str::<String>(json).ok());
99 (
100 state.turn_count,
101 persona_cache,
102 state.last_extraction_at_turn,
103 )
104 }
105 Ok(None) => (0, None, 0),
106 Err(e) => {
107 tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
108 (0, None, 0)
109 }
110 }
111 };
112 Self {
113 extractor: GraphExtractorTask::new(&agent_id),
114 memory,
115 config,
116 turn_count: init_turn_count,
117 last_extraction_at_turn: init_last_extraction_at_turn,
118 current_depth: Arc::new(AtomicU32::new(0)),
119 hooks: Box::new(NoOpHooks),
120 evolution_writes_enabled: true,
121 persona_cache: init_persona_cache,
122 test_force_extraction_failure: false,
123 test_force_fitness_write_failure: false,
124 test_force_runtime_state_write_failure: false,
125 adapter_registry: AdapterRegistry::new(),
126 #[cfg(feature = "async")]
127 hooks_async: None,
128 }
129 }
130
131 pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
133 self.adapter_registry.register(adapter);
134 }
135
136 pub fn register_default_patch_adapters(&mut self) {
139 self.register_adapter(GraphPatchAdapter::new());
140 }
141
142 pub fn registered_adapters(&self) -> Vec<&str> {
144 self.adapter_registry.registered_names()
145 }
146
147 #[doc(hidden)]
148 #[cfg(any(test, debug_assertions))]
149 pub fn test_turn_count(&self) -> u64 {
150 self.turn_count
151 }
152
153 #[doc(hidden)]
154 #[cfg(any(test, debug_assertions))]
155 pub fn test_persona_cache(&self) -> Option<&str> {
156 self.persona_cache.as_deref()
157 }
158
159 #[doc(hidden)]
160 #[cfg(any(test, debug_assertions))]
161 pub fn test_delegation_depth(&self) -> u32 {
162 self.current_depth.load(Ordering::SeqCst)
163 }
164
165 #[doc(hidden)]
166 #[cfg(any(test, debug_assertions))]
167 pub fn test_set_delegation_depth(&mut self, depth: u32) {
168 self.current_depth.store(depth, Ordering::SeqCst);
169 }
170
171 #[doc(hidden)]
172 #[cfg(any(test, debug_assertions))]
173 pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
174 self.test_force_extraction_failure = fail;
175 }
176
177 #[doc(hidden)]
178 #[cfg(any(test, debug_assertions))]
179 pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
180 self.test_force_fitness_write_failure = fail;
181 }
182
183 #[doc(hidden)]
185 #[cfg(any(test, debug_assertions))]
186 pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
187 &mut self.extractor
188 }
189
190 #[doc(hidden)]
191 #[cfg(any(test, debug_assertions))]
192 pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
193 self.test_force_runtime_state_write_failure = fail;
194 }
195
196 pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
197 self.hooks = Box::new(hooks);
198 self
199 }
200
201 #[cfg(feature = "async")]
203 pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
204 self.hooks_async = Some(hooks);
205 self
206 }
207
208 pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
213 self.evolution_writes_enabled = enabled;
214 self
215 }
216
217 pub fn evolution_writes_enabled(&self) -> bool {
222 self.evolution_writes_enabled
223 }
224
225 fn require_evolution_writes_enabled(&self) -> Result<(), String> {
226 if self.evolution_writes_enabled {
227 Ok(())
228 } else {
229 Err(
230 "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
231 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
232 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
233 ainl_memory.db"
234 .to_string(),
235 )
236 }
237 }
238
239 pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
246 self.memory.sqlite_ref()
247 }
248
249 pub fn evolution_engine(&self) -> &EvolutionEngine {
257 &self.extractor.evolution_engine
258 }
259
260 pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
264 &mut self.extractor.evolution_engine
265 }
266
267 pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
269 self.extractor.evolution_engine.ingest_signals(signals)
270 }
271
272 pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
274 self.extractor
275 .evolution_engine
276 .correction_tick(axis, correction);
277 }
278
279 pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
283 self.require_evolution_writes_enabled()?;
284 let snap = self.extractor.evolution_engine.snapshot();
285 self.memory.with(|m| {
286 self.extractor
287 .evolution_engine
288 .write_persona_node(m.sqlite_store(), &snap)
289 })?;
290 Ok(snap)
291 }
292
293 pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
300 self.require_evolution_writes_enabled()?;
301 self.memory
302 .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
303 }
304
305 pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
307 self.memory
308 .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
309 }
310
311 pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
314 self.compile_memory_context_for(None)
315 }
316
317 pub fn compile_memory_context_for(
322 &self,
323 user_message: Option<&str>,
324 ) -> Result<MemoryContext, String> {
325 if self.config.agent_id.is_empty() {
326 return Err("RuntimeConfig.agent_id must be set".to_string());
327 }
328 self.memory.with(|m| {
329 let store = m.sqlite_store();
330 let q = store.query(&self.config.agent_id);
331 let recent_episodes = q.recent_episodes(5)?;
332 let all_semantic = q.semantic_nodes()?;
333 let relevant_semantic =
334 self.relevant_semantic_nodes(user_message.unwrap_or(""), all_semantic, 10);
335 let active_patches = q.active_patches()?;
336 let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
337 Ok(MemoryContext {
338 recent_episodes,
339 relevant_semantic,
340 active_patches,
341 persona_snapshot,
342 compiled_at: chrono::Utc::now(),
343 })
344 })
345 }
346
347 pub fn route_emit_edges(
349 &self,
350 episode_id: Uuid,
351 turn_output_payload: &serde_json::Value,
352 ) -> Result<(), String> {
353 self.memory.with(|m| {
354 let store = m.sqlite_store();
355 let neighbors = store
356 .query(&self.config.agent_id)
357 .neighbors(episode_id, EMIT_TO_EDGE)?;
358 for n in neighbors {
359 let target = emit_target_name(&n);
360 self.hooks.on_emit(&target, turn_output_payload);
361 }
362 Ok(())
363 })
364 }
365
366 pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
368 let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
369 let cd = Arc::clone(&self.current_depth);
370 let _depth_guard = scopeguard::guard((), move |()| {
371 cd.fetch_sub(1, Ordering::SeqCst);
372 });
373
374 if depth >= self.config.max_delegation_depth {
375 return Err(AinlRuntimeError::DelegationDepthExceeded {
376 depth,
377 max: self.config.max_delegation_depth,
378 });
379 }
380
381 if !self.config.enable_graph_memory {
382 let memory_context = MemoryContext::default();
383 let result = TurnResult {
384 memory_context,
385 status: TurnStatus::GraphMemoryDisabled,
386 ..Default::default()
387 };
388 let outcome = TurnOutcome::Complete(result);
389 self.hooks.on_turn_complete(&outcome);
390 return Ok(outcome);
391 }
392
393 if self.config.agent_id.is_empty() {
394 return Err(AinlRuntimeError::Message(
395 "RuntimeConfig.agent_id must be set for run_turn".into(),
396 ));
397 }
398
399 let span = tracing::info_span!(
400 "ainl_runtime.run_turn",
401 agent_id = %self.config.agent_id,
402 turn = self.turn_count,
403 depth = input.depth,
404 );
405 let _span_enter = span.enter();
406
407 let validation: GraphValidationReport = self
408 .memory
409 .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
410 .map_err(AinlRuntimeError::from)?;
411 if !validation.is_valid {
412 let mut msg = String::from("graph validation failed before turn");
413 for d in &validation.dangling_edge_details {
414 msg.push_str(&format!(
415 "; {} -> {} [{}]",
416 d.source_id, d.target_id, d.edge_type
417 ));
418 }
419 return Err(AinlRuntimeError::Message(msg));
420 }
421
422 self.hooks
423 .on_artifact_loaded(&self.config.agent_id, validation.node_count);
424
425 let mut turn_warnings: Vec<TurnWarning> = Vec::new();
426
427 let t_persona = Instant::now();
428 let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
429 Some(cached.clone())
430 } else {
431 let nodes = self
432 .memory
433 .with(|m| {
434 m.sqlite_store()
435 .query(&self.config.agent_id)
436 .persona_nodes()
437 })
438 .map_err(AinlRuntimeError::from)?;
439 let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
440 self.persona_cache = compiled.clone();
441 compiled
442 };
443 self.hooks
444 .on_persona_compiled(persona_prompt_contribution.as_deref());
445 tracing::debug!(
446 target: "ainl_runtime",
447 duration_ms = t_persona.elapsed().as_millis() as u64,
448 has_contribution = persona_prompt_contribution.is_some(),
449 "persona_phase"
450 );
451
452 let t_memory = Instant::now();
453 let memory_context = self
454 .compile_memory_context_for(Some(&input.user_message))
455 .map_err(AinlRuntimeError::from)?;
456 self.hooks.on_memory_context_ready(&memory_context);
457 tracing::debug!(
458 target: "ainl_runtime",
459 duration_ms = t_memory.elapsed().as_millis() as u64,
460 episode_count = memory_context.recent_episodes.len(),
461 semantic_count = memory_context.relevant_semantic.len(),
462 patch_count = memory_context.active_patches.len(),
463 "memory_context"
464 );
465
466 let t_patches = Instant::now();
467 let patch_dispatch_results = if self.config.enable_graph_memory {
468 self.dispatch_patches_collect(
469 &memory_context.active_patches,
470 &input.frame,
471 &mut turn_warnings,
472 )
473 } else {
474 Vec::new()
475 };
476 for r in &patch_dispatch_results {
477 tracing::debug!(
478 target: "ainl_runtime",
479 label = %r.label,
480 dispatched = r.dispatched,
481 fitness_before = r.fitness_before,
482 fitness_after = r.fitness_after,
483 "patch_dispatch"
484 );
485 }
486 tracing::debug!(
487 target: "ainl_runtime",
488 duration_ms = t_patches.elapsed().as_millis() as u64,
489 "patch_dispatch_phase"
490 );
491
492 let dispatched_count = patch_dispatch_results
493 .iter()
494 .filter(|r| r.dispatched)
495 .count() as u32;
496 if dispatched_count >= self.config.max_steps {
497 let result = TurnResult {
498 patch_dispatch_results,
499 memory_context,
500 persona_prompt_contribution,
501 steps_executed: dispatched_count,
502 status: TurnStatus::StepLimitExceeded {
503 steps_executed: dispatched_count,
504 },
505 ..Default::default()
506 };
507 let outcome = TurnOutcome::Complete(result);
508 self.hooks.on_turn_complete(&outcome);
509 return Ok(outcome);
510 }
511
512 let t_episode = Instant::now();
513 let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
514 let episode_id = match self
515 .memory
516 .with(|m| record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical))
517 {
518 Ok(id) => id,
519 Err(e) => {
520 tracing::warn!(
521 phase = ?TurnPhase::EpisodeWrite,
522 error = %e,
523 "non-fatal turn write failed — continuing"
524 );
525 turn_warnings.push(TurnWarning {
526 phase: TurnPhase::EpisodeWrite,
527 error: e,
528 });
529 Uuid::nil()
530 }
531 };
532 self.hooks.on_episode_recorded(episode_id);
533 tracing::debug!(
534 target: "ainl_runtime",
535 duration_ms = t_episode.elapsed().as_millis() as u64,
536 episode_id = %episode_id,
537 "episode_record"
538 );
539
540 if !episode_id.is_nil() {
541 for &tid in &input.emit_targets {
542 if let Err(e) = self.memory.with(|m| {
543 m.sqlite_store()
544 .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
545 }) {
546 tracing::warn!(
547 phase = ?TurnPhase::EpisodeWrite,
548 error = %e,
549 "non-fatal turn write failed — continuing"
550 );
551 turn_warnings.push(TurnWarning {
552 phase: TurnPhase::EpisodeWrite,
553 error: e,
554 });
555 }
556 }
557 }
558
559 let emit_payload = serde_json::json!({
560 "episode_id": episode_id.to_string(),
561 "user_message": input.user_message,
562 "tools_invoked": tools_canonical,
563 "persona_contribution": persona_prompt_contribution,
564 "turn_count": self.turn_count.wrapping_add(1),
565 });
566 if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
567 tracing::warn!(
568 phase = ?TurnPhase::EpisodeWrite,
569 error = %e,
570 "non-fatal turn write failed — continuing"
571 );
572 turn_warnings.push(TurnWarning {
573 phase: TurnPhase::EpisodeWrite,
574 error: format!("emit_routing: {e}"),
575 });
576 }
577
578 if let Err(e) = self.memory.with(|m| {
579 maybe_persist_trajectory_after_episode(
580 m,
581 &self.config.agent_id,
582 episode_id,
583 &tools_canonical,
584 &patch_dispatch_results,
585 &input,
586 )
587 }) {
588 tracing::warn!(
589 phase = ?TurnPhase::EpisodeWrite,
590 error = %e,
591 "non-fatal trajectory persist failed — continuing"
592 );
593 turn_warnings.push(TurnWarning {
594 phase: TurnPhase::EpisodeWrite,
595 error: format!("trajectory_persist: {e}"),
596 });
597 }
598
599 self.turn_count = self.turn_count.wrapping_add(1);
600
601 let should_extract = self.config.extraction_interval > 0
602 && self.turn_count.saturating_sub(self.last_extraction_at_turn)
603 >= self.config.extraction_interval as u64;
604
605 let t_extract = Instant::now();
606 let (extraction_report, _extraction_failed) = if should_extract {
607 let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
608
609 let res = if force_fail {
610 let e = "test_forced".to_string();
611 tracing::warn!(
612 phase = ?TurnPhase::ExtractionPass,
613 error = %e,
614 "non-fatal turn write failed — continuing"
615 );
616 turn_warnings.push(TurnWarning {
617 phase: TurnPhase::ExtractionPass,
618 error: e,
619 });
620 tracing::debug!(
621 target: "ainl_runtime",
622 duration_ms = t_extract.elapsed().as_millis() as u64,
623 signals_ingested = 0u64,
624 skipped = false,
625 "extraction_pass"
626 );
627 (None, true)
628 } else {
629 let report = self
630 .memory
631 .with(|m| self.extractor.run_pass(m.sqlite_store()));
632 if let Some(ref e) = report.extract_error {
633 tracing::warn!(
634 phase = ?TurnPhase::ExtractionPass,
635 error = %e,
636 "non-fatal turn write failed — continuing"
637 );
638 turn_warnings.push(TurnWarning {
639 phase: TurnPhase::ExtractionPass,
640 error: e.clone(),
641 });
642 }
643 if let Some(ref e) = report.pattern_error {
644 tracing::warn!(
645 phase = ?TurnPhase::PatternPersistence,
646 error = %e,
647 "non-fatal turn write failed — continuing"
648 );
649 turn_warnings.push(TurnWarning {
650 phase: TurnPhase::PatternPersistence,
651 error: e.clone(),
652 });
653 }
654 if let Some(ref e) = report.persona_error {
655 tracing::warn!(
656 phase = ?TurnPhase::PersonaEvolution,
657 error = %e,
658 "non-fatal turn write failed — continuing"
659 );
660 turn_warnings.push(TurnWarning {
661 phase: TurnPhase::PersonaEvolution,
662 error: e.clone(),
663 });
664 }
665 let extraction_failed = report.has_errors();
666 if !extraction_failed {
667 tracing::info!(
668 agent_id = %report.agent_id,
669 signals_extracted = report.signals_extracted,
670 signals_applied = report.signals_applied,
671 semantic_nodes_updated = report.semantic_nodes_updated,
672 "ainl-graph-extractor pass completed (scheduled)"
673 );
674 }
675 self.hooks.on_extraction_complete(&report);
676 self.persona_cache = None;
677 tracing::debug!(
678 target: "ainl_runtime",
679 duration_ms = t_extract.elapsed().as_millis() as u64,
680 signals_ingested = report.signals_extracted as u64,
681 skipped = false,
682 "extraction_pass"
683 );
684 (Some(report), extraction_failed)
685 };
686 self.last_extraction_at_turn = self.turn_count;
687 res
688 } else {
689 tracing::debug!(
690 target: "ainl_runtime",
691 duration_ms = t_extract.elapsed().as_millis() as u64,
692 signals_ingested = 0u64,
693 skipped = true,
694 "extraction_pass"
695 );
696 (None, false)
697 };
698
699 if let Err(e) = self
700 .memory
701 .with(|m| try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id))
702 {
703 tracing::warn!(
704 phase = ?TurnPhase::ExportRefresh,
705 error = %e,
706 "non-fatal turn write failed — continuing"
707 );
708 turn_warnings.push(TurnWarning {
709 phase: TurnPhase::ExportRefresh,
710 error: e,
711 });
712 }
713
714 if !self.config.agent_id.is_empty() {
715 let state = RuntimeStateNode {
716 agent_id: self.config.agent_id.clone(),
717 turn_count: self.turn_count,
718 last_extraction_at_turn: self.last_extraction_at_turn,
719 persona_snapshot_json: self
720 .persona_cache
721 .as_ref()
722 .and_then(|p| serde_json::to_string(p).ok()),
723 updated_at: chrono::Utc::now().timestamp(),
724 };
725 let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
726 Err("injected runtime state write failure".to_string())
727 } else {
728 self.memory.with(|m| m.write_runtime_state(&state))
729 };
730 if let Err(e) = write_res {
731 tracing::warn!(
732 phase = ?TurnPhase::RuntimeStatePersist,
733 error = %e,
734 "failed to persist runtime state — cadence will reset on next restart"
735 );
736 turn_warnings.push(TurnWarning {
737 phase: TurnPhase::RuntimeStatePersist,
738 error: e,
739 });
740 }
741 }
742
743 if let (Some(gate), Some(phase), Some(trust)) = (
745 input.vitals_gate.as_deref(),
746 input.vitals_phase.as_deref(),
747 input.vitals_trust,
748 ) {
749 self.hooks.on_vitals_classified(gate, phase, trust);
750 }
751
752 let result = TurnResult {
753 episode_id,
754 persona_prompt_contribution,
755 memory_context,
756 extraction_report,
757 steps_executed: dispatched_count,
758 patch_dispatch_results,
759 status: TurnStatus::Ok,
760 vitals_gate: input.vitals_gate.clone(),
761 vitals_phase: input.vitals_phase.clone(),
762 vitals_trust: input.vitals_trust,
763 };
764
765 let outcome = if turn_warnings.is_empty() {
766 TurnOutcome::Complete(result)
767 } else {
768 TurnOutcome::PartialSuccess {
769 result,
770 warnings: turn_warnings,
771 }
772 };
773
774 self.hooks.on_turn_complete(&outcome);
775 Ok(outcome)
776 }
777
778 fn relevant_semantic_nodes(
786 &self,
787 user_message: &str,
788 all_semantic: Vec<AinlMemoryNode>,
789 limit: usize,
790 ) -> Vec<AinlMemoryNode> {
791 let user_tags = infer_topic_tags(user_message);
792 let user_topics: HashSet<String> = user_tags
793 .iter()
794 .filter(|t| t.namespace == TagNamespace::Topic)
795 .map(|t| t.value.to_lowercase())
796 .collect();
797
798 if user_message.trim().is_empty() || user_topics.is_empty() {
799 return fallback_high_recurrence_semantic(all_semantic, limit);
800 }
801
802 let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
803 for n in all_semantic {
804 let (score, rec) = match &n.node_type {
805 AinlNodeType::Semantic { semantic } => {
806 let mut s = 0f32;
807 if let Some(cluster) = &semantic.topic_cluster {
808 for slug in cluster
809 .split([',', ';'])
810 .map(|s| s.trim().to_lowercase())
811 .filter(|s| !s.is_empty())
812 {
813 if user_topics.contains(&slug) {
814 s += 1.0;
815 }
816 }
817 }
818 if s == 0.0 {
819 for tag in &semantic.tags {
820 let tl = tag.to_lowercase();
821 if let Some(rest) = tl.strip_prefix("topic:") {
822 let slug = rest.trim().to_lowercase();
823 if user_topics.contains(&slug) {
824 s = 0.5;
825 break;
826 }
827 }
828 }
829 }
830 let confidence = semantic.confidence;
835 for tag in &semantic.tags {
836 let tl = tag.to_lowercase();
837 if tl.starts_with("vitals:") {
838 if tl.ends_with(":pass") {
839 s += 0.2 * confidence;
840 } else if tl == "vitals:elevated" {
841 s -= 0.1;
842 }
843 }
844 }
845 (s, semantic.recurrence_count)
846 }
847 _ => (0.0, 0),
848 };
849 scored.push((score, rec, n));
850 }
851
852 scored.sort_by(|a, b| {
853 b.0.partial_cmp(&a.0)
854 .unwrap_or(std::cmp::Ordering::Equal)
855 .then_with(|| b.1.cmp(&a.1))
856 });
857 scored.into_iter().take(limit).map(|t| t.2).collect()
858 }
859
860 pub fn dispatch_patches(
861 &mut self,
862 patches: &[AinlMemoryNode],
863 frame: &HashMap<String, serde_json::Value>,
864 ) -> Vec<PatchDispatchResult> {
865 let mut w = Vec::new();
866 self.dispatch_patches_collect(patches, frame, &mut w)
867 }
868
869 fn dispatch_patches_collect(
870 &mut self,
871 patches: &[AinlMemoryNode],
872 frame: &HashMap<String, serde_json::Value>,
873 turn_warnings: &mut Vec<TurnWarning>,
874 ) -> Vec<PatchDispatchResult> {
875 let mut out = Vec::new();
876 for node in patches {
877 let res = self.dispatch_one_patch(node, frame);
878 if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
879 tracing::warn!(
880 phase = ?TurnPhase::FitnessWriteBack,
881 error = %e,
882 "non-fatal turn write failed — continuing"
883 );
884 turn_warnings.push(TurnWarning {
885 phase: TurnPhase::FitnessWriteBack,
886 error: format!("{}: {}", res.label, e),
887 });
888 }
889 out.push(res);
890 }
891 out
892 }
893
894 fn dispatch_one_patch(
895 &mut self,
896 node: &AinlMemoryNode,
897 frame: &HashMap<String, serde_json::Value>,
898 ) -> PatchDispatchResult {
899 let label_default = String::new();
900 let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
901 AinlNodeType::Procedural { procedural } => (
902 procedural_label(procedural),
903 procedural.patch_version,
904 procedural.retired,
905 procedural.declared_reads.clone(),
906 procedural.fitness,
907 ),
908 _ => {
909 return PatchDispatchResult {
910 label: label_default,
911 patch_version: 0,
912 fitness_before: 0.0,
913 fitness_after: 0.0,
914 dispatched: false,
915 skip_reason: Some(PatchSkipReason::NotProcedural),
916 adapter_output: None,
917 adapter_name: None,
918 dispatch_duration_ms: 0,
919 };
920 }
921 };
922
923 if pv == 0 {
924 return PatchDispatchResult {
925 label: label_src,
926 patch_version: pv,
927 fitness_before: fitness_opt.unwrap_or(0.5),
928 fitness_after: fitness_opt.unwrap_or(0.5),
929 dispatched: false,
930 skip_reason: Some(PatchSkipReason::ZeroVersion),
931 adapter_output: None,
932 adapter_name: None,
933 dispatch_duration_ms: 0,
934 };
935 }
936 if retired {
937 return PatchDispatchResult {
938 label: label_src.clone(),
939 patch_version: pv,
940 fitness_before: fitness_opt.unwrap_or(0.5),
941 fitness_after: fitness_opt.unwrap_or(0.5),
942 dispatched: false,
943 skip_reason: Some(PatchSkipReason::Retired),
944 adapter_output: None,
945 adapter_name: None,
946 dispatch_duration_ms: 0,
947 };
948 }
949 for key in &reads {
950 if !frame.contains_key(key) {
951 return PatchDispatchResult {
952 label: label_src.clone(),
953 patch_version: pv,
954 fitness_before: fitness_opt.unwrap_or(0.5),
955 fitness_after: fitness_opt.unwrap_or(0.5),
956 dispatched: false,
957 skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
958 adapter_output: None,
959 adapter_name: None,
960 dispatch_duration_ms: 0,
961 };
962 }
963 }
964
965 let patch_label = label_src.clone();
966 let adapter_key = patch_label.as_str();
967 let ctx = PatchDispatchContext {
968 patch_label: adapter_key,
969 node,
970 frame,
971 };
972 let (adapter_output, adapter_name, dispatch_duration_ms) = if let Some(adapter) = self
973 .adapter_registry
974 .get(adapter_key)
975 .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
976 {
977 let aname = adapter.name().to_string();
978 let t_exec = Instant::now();
979 let (out, name) = match adapter.execute_patch(&ctx) {
980 Ok(output) => {
981 tracing::debug!(
982 label = %patch_label,
983 adapter = %aname,
984 "adapter executed patch"
985 );
986 (Some(output), Some(aname))
987 }
988 Err(e) => {
989 tracing::warn!(
990 label = %patch_label,
991 adapter = %aname,
992 error = %e,
993 "adapter execution failed — continuing as metadata dispatch"
994 );
995 (None, Some(aname))
996 }
997 };
998 let ms = t_exec.elapsed().as_millis() as u64;
999 (out, name, ms)
1000 } else {
1001 (None, None, 0u64)
1002 };
1003
1004 let fitness_before = fitness_opt.unwrap_or(0.5);
1005 let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
1006
1007 let updated = match self.memory.with(|m| {
1008 let store = m.sqlite_store();
1009 store.read_node(node.id)
1010 }) {
1011 Ok(Some(mut n)) => {
1012 if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
1013 procedural.fitness = Some(fitness_after);
1014 }
1015 n
1016 }
1017 Ok(None) => {
1018 return PatchDispatchResult {
1019 label: label_src,
1020 patch_version: pv,
1021 fitness_before,
1022 fitness_after: fitness_before,
1023 dispatched: false,
1024 skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
1025 adapter_output,
1026 adapter_name,
1027 dispatch_duration_ms,
1028 };
1029 }
1030 Err(e) => {
1031 return PatchDispatchResult {
1032 label: label_src,
1033 patch_version: pv,
1034 fitness_before,
1035 fitness_after: fitness_before,
1036 dispatched: false,
1037 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1038 adapter_output,
1039 adapter_name,
1040 dispatch_duration_ms,
1041 };
1042 }
1043 };
1044
1045 if self.test_force_fitness_write_failure {
1046 self.test_force_fitness_write_failure = false;
1047 let e = "injected fitness write failure".to_string();
1048 return PatchDispatchResult {
1049 label: label_src.clone(),
1050 patch_version: pv,
1051 fitness_before,
1052 fitness_after: fitness_before,
1053 dispatched: false,
1054 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1055 adapter_output,
1056 adapter_name,
1057 dispatch_duration_ms,
1058 };
1059 }
1060
1061 if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
1062 return PatchDispatchResult {
1063 label: label_src.clone(),
1064 patch_version: pv,
1065 fitness_before,
1066 fitness_after: fitness_before,
1067 dispatched: false,
1068 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1069 adapter_output,
1070 adapter_name,
1071 dispatch_duration_ms,
1072 };
1073 }
1074
1075 self.hooks
1076 .on_patch_dispatched(label_src.as_str(), fitness_after);
1077
1078 PatchDispatchResult {
1079 label: label_src,
1080 patch_version: pv,
1081 fitness_before,
1082 fitness_after,
1083 dispatched: true,
1084 skip_reason: None,
1085 adapter_output,
1086 adapter_name,
1087 dispatch_duration_ms,
1088 }
1089 }
1090}
1091
1092pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1093 match &n.node_type {
1094 AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1095 AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1096 AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1097 AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1098 AinlNodeType::RuntimeState { runtime_state } => {
1099 format!("runtime_state:{}", runtime_state.agent_id)
1100 }
1101 AinlNodeType::Trajectory { trajectory } => {
1102 format!("trajectory:{}", trajectory.episode_id)
1103 }
1104 AinlNodeType::Failure { failure } => {
1105 format!(
1106 "failure:{}:{}",
1107 failure.source,
1108 failure.tool_name.as_deref().unwrap_or("")
1109 )
1110 }
1111 }
1112}
1113
1114pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1115 if !p.label.is_empty() {
1116 p.label.clone()
1117 } else {
1118 p.pattern_name.clone()
1119 }
1120}
1121
1122pub(crate) fn fallback_high_recurrence_semantic(
1123 all: Vec<AinlMemoryNode>,
1124 limit: usize,
1125) -> Vec<AinlMemoryNode> {
1126 let mut v: Vec<_> = all
1127 .into_iter()
1128 .filter(|n| {
1129 matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1130 })
1131 .collect();
1132 v.sort_by(|a, b| {
1133 let ra = match &a.node_type {
1134 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1135 _ => 0,
1136 };
1137 let rb = match &b.node_type {
1138 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1139 _ => 0,
1140 };
1141 rb.cmp(&ra)
1142 });
1143 v.into_iter().take(limit).collect()
1144}
1145
1146pub(crate) fn persona_snapshot_if_evolved(
1147 extractor: &GraphExtractorTask,
1148) -> Option<ainl_persona::PersonaSnapshot> {
1149 let snap = extractor.evolution_engine.snapshot();
1150 let defaults = default_axis_map(0.5);
1151 for axis in PersonaAxis::ALL {
1152 let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1153 let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1154 if (s - d).abs() > INGEST_SCORE_EPSILON {
1155 return Some(snap);
1156 }
1157 }
1158 None
1159}
1160
1161pub(crate) fn compile_persona_from_nodes(
1162 nodes: &[AinlMemoryNode],
1163) -> Result<Option<String>, String> {
1164 if nodes.is_empty() {
1165 return Ok(None);
1166 }
1167 let mut lines = Vec::new();
1168 for n in nodes {
1169 if let AinlNodeType::Persona { persona } = &n.node_type {
1170 lines.push(format_persona_line(persona));
1171 }
1172 }
1173 if lines.is_empty() {
1174 Ok(None)
1175 } else {
1176 Ok(Some(lines.join("\n")))
1177 }
1178}
1179
1180fn format_persona_line(p: &PersonaNode) -> String {
1181 format!(
1182 "- {} (strength {:.2}, layer {:?}, source {:?})",
1183 p.trait_name, p.strength, p.layer, p.source
1184 )
1185}
1186
1187pub(crate) fn try_export_graph_json_armaraos(
1189 store: &SqliteGraphStore,
1190 agent_id: &str,
1191) -> Result<(), String> {
1192 let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1193 let dir = trimmed.trim();
1194 if dir.is_empty() {
1195 return Ok(());
1196 }
1197 let dir_path = PathBuf::from(dir);
1198 std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1199 let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1200 let snap = store.export_graph(agent_id)?;
1201 let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1202 std::fs::write(
1203 &path,
1204 serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1205 )
1206 .map_err(|e| format!("write export: {e}"))?;
1207 Ok(())
1208}
1209
1210pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1213 if tools_invoked.is_empty() {
1214 return vec!["turn".to_string()];
1215 }
1216 let tags = tag_tool_names(tools_invoked);
1217 let mut seen: BTreeSet<String> = BTreeSet::new();
1218 for t in tags {
1219 if t.namespace == TagNamespace::Tool {
1220 seen.insert(t.value);
1221 }
1222 }
1223 if seen.is_empty() {
1224 vec!["turn".to_string()]
1225 } else {
1226 seen.into_iter().collect()
1227 }
1228}
1229
1230const TRAJECTORY_PREVIEW_MAX: usize = 512;
1231
1232fn truncate_trajectory_preview(s: String) -> String {
1233 if s.len() <= TRAJECTORY_PREVIEW_MAX {
1234 return s;
1235 }
1236 let mut t = s;
1237 t.truncate(TRAJECTORY_PREVIEW_MAX);
1238 t.push('…');
1239 t
1240}
1241
1242pub(crate) fn trajectory_steps_for_runtime_turn(
1244 patch_results: &[PatchDispatchResult],
1245 tools_canonical: &[String],
1246) -> Vec<TrajectoryStep> {
1247 let base_ms = chrono::Utc::now().timestamp_millis();
1248 let mut steps = Vec::new();
1249
1250 for (i, r) in patch_results.iter().enumerate() {
1251 let adapter = r
1252 .adapter_name
1253 .clone()
1254 .unwrap_or_else(|| "patch".to_string());
1255 let operation = if r.label.is_empty() {
1256 "procedural_patch".to_string()
1257 } else {
1258 r.label.clone()
1259 };
1260 let (success, error) = if r.dispatched {
1261 if r.adapter_output.is_some() {
1262 (true, None)
1263 } else if r.adapter_name.is_some() {
1264 (false, Some("adapter_execute_patch_failed".to_string()))
1265 } else {
1266 (true, None)
1267 }
1268 } else {
1269 (true, None)
1270 };
1271 let outputs_preview = r
1272 .adapter_output
1273 .as_ref()
1274 .and_then(|v| serde_json::to_string(v).ok())
1275 .map(truncate_trajectory_preview);
1276
1277 steps.push(TrajectoryStep {
1278 step_id: format!("patch_{i}"),
1279 timestamp_ms: base_ms + i as i64,
1280 adapter,
1281 operation,
1282 inputs_preview: None,
1283 outputs_preview,
1284 duration_ms: r.dispatch_duration_ms,
1285 success,
1286 error,
1287 vitals: None,
1288 freshness_at_step: None,
1289 frame_vars: None,
1290 tool_telemetry: None,
1291 });
1292 }
1293
1294 let tool_base = base_ms + patch_results.len() as i64;
1295 for (i, name) in tools_canonical.iter().enumerate() {
1296 steps.push(TrajectoryStep {
1297 step_id: format!("tool_{i}"),
1298 timestamp_ms: tool_base + i as i64,
1299 adapter: "builtin".into(),
1300 operation: name.clone(),
1301 inputs_preview: None,
1302 outputs_preview: None,
1303 duration_ms: 0,
1304 success: true,
1305 error: None,
1306 vitals: None,
1307 freshness_at_step: None,
1308 frame_vars: None,
1309 tool_telemetry: None,
1310 });
1311 }
1312
1313 steps
1314}
1315
1316pub(crate) fn maybe_persist_trajectory_after_episode(
1319 memory: &ainl_memory::GraphMemory,
1320 agent_id: &str,
1321 episode_id: Uuid,
1322 tools_canonical: &[String],
1323 patch_dispatch_results: &[PatchDispatchResult],
1324 input: &TurnInput,
1325) -> Result<(), String> {
1326 if episode_id.is_nil() || !ainl_memory::trajectory_env_enabled() {
1327 return Ok(());
1328 }
1329 let session_id = input
1330 .trace_event
1331 .as_ref()
1332 .and_then(|v| v.get("session_id").and_then(|x| x.as_str()))
1333 .unwrap_or("ainl-runtime");
1334 let project_id = std::env::var("AINL_MEMORY_PROJECT_ID").ok();
1335 let hash = std::env::var("AINL_SOURCE_HASH")
1336 .ok()
1337 .map(|s| s.trim().to_string())
1338 .filter(|s| !s.is_empty());
1339
1340 let steps = trajectory_steps_for_runtime_turn(patch_dispatch_results, tools_canonical);
1341 let duration_ms: u64 = steps.iter().map(|s| s.duration_ms).sum();
1342 let outcome = if steps.iter().any(|s| !s.success) {
1343 TrajectoryOutcome::PartialSuccess
1344 } else {
1345 TrajectoryOutcome::Success
1346 };
1347
1348 ainl_memory::persist_trajectory_for_episode(
1349 memory,
1350 agent_id,
1351 episode_id,
1352 steps,
1353 outcome,
1354 session_id,
1355 project_id.as_deref(),
1356 hash.as_deref(),
1357 duration_ms,
1358 None,
1359 None,
1360 )?;
1361 Ok(())
1362}
1363
1364pub(crate) fn record_turn_episode(
1365 memory: &ainl_memory::GraphMemory,
1366 agent_id: &str,
1367 input: &TurnInput,
1368 tools_invoked_canonical: &[String],
1369) -> Result<Uuid, String> {
1370 let turn_id = Uuid::new_v4();
1371 let timestamp = chrono::Utc::now().timestamp();
1372 let tools = tools_invoked_canonical.to_vec();
1373 let mut node = AinlMemoryNode::new_episode(
1374 turn_id,
1375 timestamp,
1376 tools.clone(),
1377 None,
1378 input.trace_event.clone(),
1379 );
1380 node.agent_id = agent_id.to_string();
1381 if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1382 episodic.user_message = Some(input.user_message.clone());
1383 episodic.tools_invoked = tools;
1384 episodic.vitals_gate = input.vitals_gate.clone();
1386 episodic.vitals_phase = input.vitals_phase.clone();
1387 episodic.vitals_trust = input.vitals_trust;
1388 }
1389 memory.write_node(&node)?;
1390 Ok(node.id)
1391}
1392
1393#[cfg(test)]
1394mod trajectory_step_builder_tests {
1395 use super::trajectory_steps_for_runtime_turn;
1396 use crate::engine::PatchDispatchResult;
1397
1398 fn sample_patch(
1399 dispatched: bool,
1400 adapter_name: Option<String>,
1401 adapter_output: Option<serde_json::Value>,
1402 dispatch_duration_ms: u64,
1403 ) -> PatchDispatchResult {
1404 PatchDispatchResult {
1405 label: "my_patch".into(),
1406 patch_version: 1,
1407 fitness_before: 0.5,
1408 fitness_after: 0.6,
1409 dispatched,
1410 skip_reason: None,
1411 adapter_output,
1412 adapter_name,
1413 dispatch_duration_ms,
1414 }
1415 }
1416
1417 #[test]
1418 fn trajectory_steps_orders_patches_then_tools() {
1419 let patches = vec![sample_patch(
1420 true,
1421 Some("graph".into()),
1422 Some(serde_json::json!({"ok": true})),
1423 12,
1424 )];
1425 let tools = vec!["turn".to_string()];
1426 let steps = trajectory_steps_for_runtime_turn(&patches, &tools);
1427 assert_eq!(steps.len(), 2);
1428 assert!(steps[0].step_id.starts_with("patch_"));
1429 assert!(steps[1].step_id.starts_with("tool_"));
1430 assert_eq!(steps[0].duration_ms, 12);
1431 assert!(steps[0].success);
1432 }
1433
1434 #[test]
1435 fn adapter_failure_marks_step_failed() {
1436 let patches = vec![sample_patch(true, Some("graph".into()), None, 5)];
1437 let steps = trajectory_steps_for_runtime_turn(&patches, &[]);
1438 assert_eq!(steps.len(), 1);
1439 assert!(!steps[0].success);
1440 }
1441}
1442
1443#[cfg(feature = "async")]
1444#[path = "runtime_async.rs"]
1445mod runtime_async_impl;