1use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_graph_extractor::GraphExtractorTask;
10use ainl_memory::{
11 AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
12 RuntimeStateNode, SqliteGraphStore,
13};
14use ainl_persona::axes::default_axis_map;
15use ainl_persona::{
16 EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
17};
18use ainl_semantic_tagger::infer_topic_tags;
19use ainl_semantic_tagger::tag_tool_names;
20use ainl_semantic_tagger::TagNamespace;
21use uuid::Uuid;
22
23use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
24use crate::engine::{
25 AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
26 PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
27 EMIT_TO_EDGE,
28};
29use crate::graph_cell::{GraphCell, SqliteStoreRef};
30#[cfg(feature = "async")]
31use crate::hooks::TurnHooksAsync;
32use crate::hooks::{NoOpHooks, TurnHooks};
33use crate::RuntimeConfig;
34
35pub struct AinlRuntime {
51 config: RuntimeConfig,
52 memory: GraphCell,
53 extractor: GraphExtractorTask,
54 turn_count: u64,
55 last_extraction_at_turn: u64,
56 current_depth: Arc<AtomicU32>,
58 hooks: Box<dyn TurnHooks>,
59 evolution_writes_enabled: bool,
63 persona_cache: Option<String>,
64 #[doc(hidden)]
66 test_force_extraction_failure: bool,
67 #[doc(hidden)]
69 test_force_fitness_write_failure: bool,
70 #[doc(hidden)]
72 test_force_runtime_state_write_failure: bool,
73 adapter_registry: AdapterRegistry,
74 #[cfg(feature = "async")]
76 hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
77}
78
79impl AinlRuntime {
80 pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
81 let agent_id = config.agent_id.clone();
82 let memory = GraphCell::new(store);
83 let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
84 if agent_id.is_empty() {
85 (0, None, 0)
86 } else {
87 match memory.read_runtime_state(&agent_id) {
88 Ok(Some(state)) => {
89 tracing::info!(
90 agent_id = %agent_id,
91 turn_count = state.turn_count,
92 "restored runtime state"
93 );
94 let persona_cache = state
95 .persona_snapshot_json
96 .as_ref()
97 .and_then(|json| serde_json::from_str::<String>(json).ok());
98 (
99 state.turn_count,
100 persona_cache,
101 state.last_extraction_at_turn,
102 )
103 }
104 Ok(None) => (0, None, 0),
105 Err(e) => {
106 tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
107 (0, None, 0)
108 }
109 }
110 };
111 Self {
112 extractor: GraphExtractorTask::new(&agent_id),
113 memory,
114 config,
115 turn_count: init_turn_count,
116 last_extraction_at_turn: init_last_extraction_at_turn,
117 current_depth: Arc::new(AtomicU32::new(0)),
118 hooks: Box::new(NoOpHooks),
119 evolution_writes_enabled: true,
120 persona_cache: init_persona_cache,
121 test_force_extraction_failure: false,
122 test_force_fitness_write_failure: false,
123 test_force_runtime_state_write_failure: false,
124 adapter_registry: AdapterRegistry::new(),
125 #[cfg(feature = "async")]
126 hooks_async: None,
127 }
128 }
129
130 pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
132 self.adapter_registry.register(adapter);
133 }
134
135 pub fn register_default_patch_adapters(&mut self) {
138 self.register_adapter(GraphPatchAdapter::new());
139 }
140
141 pub fn registered_adapters(&self) -> Vec<&str> {
143 self.adapter_registry.registered_names()
144 }
145
146 #[doc(hidden)]
147 #[cfg(any(test, debug_assertions))]
148 pub fn test_turn_count(&self) -> u64 {
149 self.turn_count
150 }
151
152 #[doc(hidden)]
153 #[cfg(any(test, debug_assertions))]
154 pub fn test_persona_cache(&self) -> Option<&str> {
155 self.persona_cache.as_deref()
156 }
157
158 #[doc(hidden)]
159 #[cfg(any(test, debug_assertions))]
160 pub fn test_delegation_depth(&self) -> u32 {
161 self.current_depth.load(Ordering::SeqCst)
162 }
163
164 #[doc(hidden)]
165 #[cfg(any(test, debug_assertions))]
166 pub fn test_set_delegation_depth(&mut self, depth: u32) {
167 self.current_depth.store(depth, Ordering::SeqCst);
168 }
169
170 #[doc(hidden)]
171 #[cfg(any(test, debug_assertions))]
172 pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
173 self.test_force_extraction_failure = fail;
174 }
175
176 #[doc(hidden)]
177 #[cfg(any(test, debug_assertions))]
178 pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
179 self.test_force_fitness_write_failure = fail;
180 }
181
182 #[doc(hidden)]
184 #[cfg(any(test, debug_assertions))]
185 pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
186 &mut self.extractor
187 }
188
189 #[doc(hidden)]
190 #[cfg(any(test, debug_assertions))]
191 pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
192 self.test_force_runtime_state_write_failure = fail;
193 }
194
195 pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
196 self.hooks = Box::new(hooks);
197 self
198 }
199
200 #[cfg(feature = "async")]
202 pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
203 self.hooks_async = Some(hooks);
204 self
205 }
206
207 pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
212 self.evolution_writes_enabled = enabled;
213 self
214 }
215
216 pub fn evolution_writes_enabled(&self) -> bool {
221 self.evolution_writes_enabled
222 }
223
224 fn require_evolution_writes_enabled(&self) -> Result<(), String> {
225 if self.evolution_writes_enabled {
226 Ok(())
227 } else {
228 Err(
229 "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
230 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
231 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
232 ainl_memory.db"
233 .to_string(),
234 )
235 }
236 }
237
238 pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
245 self.memory.sqlite_ref()
246 }
247
248 pub fn evolution_engine(&self) -> &EvolutionEngine {
256 &self.extractor.evolution_engine
257 }
258
259 pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
263 &mut self.extractor.evolution_engine
264 }
265
266 pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
268 self.extractor.evolution_engine.ingest_signals(signals)
269 }
270
271 pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
273 self.extractor
274 .evolution_engine
275 .correction_tick(axis, correction);
276 }
277
278 pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
282 self.require_evolution_writes_enabled()?;
283 let snap = self.extractor.evolution_engine.snapshot();
284 self.memory.with(|m| {
285 self.extractor
286 .evolution_engine
287 .write_persona_node(m.sqlite_store(), &snap)
288 })?;
289 Ok(snap)
290 }
291
292 pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
299 self.require_evolution_writes_enabled()?;
300 self.memory
301 .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
302 }
303
304 pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
306 self.memory
307 .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
308 }
309
310 pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
313 self.compile_memory_context_for(None)
314 }
315
316 pub fn compile_memory_context_for(
321 &self,
322 user_message: Option<&str>,
323 ) -> Result<MemoryContext, String> {
324 if self.config.agent_id.is_empty() {
325 return Err("RuntimeConfig.agent_id must be set".to_string());
326 }
327 self.memory.with(|m| {
328 let store = m.sqlite_store();
329 let q = store.query(&self.config.agent_id);
330 let recent_episodes = q.recent_episodes(5)?;
331 let all_semantic = q.semantic_nodes()?;
332 let relevant_semantic =
333 self.relevant_semantic_nodes(user_message.unwrap_or(""), all_semantic, 10);
334 let active_patches = q.active_patches()?;
335 let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
336 Ok(MemoryContext {
337 recent_episodes,
338 relevant_semantic,
339 active_patches,
340 persona_snapshot,
341 compiled_at: chrono::Utc::now(),
342 })
343 })
344 }
345
346 pub fn route_emit_edges(
348 &self,
349 episode_id: Uuid,
350 turn_output_payload: &serde_json::Value,
351 ) -> Result<(), String> {
352 self.memory.with(|m| {
353 let store = m.sqlite_store();
354 let neighbors = store
355 .query(&self.config.agent_id)
356 .neighbors(episode_id, EMIT_TO_EDGE)?;
357 for n in neighbors {
358 let target = emit_target_name(&n);
359 self.hooks.on_emit(&target, turn_output_payload);
360 }
361 Ok(())
362 })
363 }
364
365 pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
367 let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
368 let cd = Arc::clone(&self.current_depth);
369 let _depth_guard = scopeguard::guard((), move |()| {
370 cd.fetch_sub(1, Ordering::SeqCst);
371 });
372
373 if depth >= self.config.max_delegation_depth {
374 return Err(AinlRuntimeError::DelegationDepthExceeded {
375 depth,
376 max: self.config.max_delegation_depth,
377 });
378 }
379
380 if !self.config.enable_graph_memory {
381 let memory_context = MemoryContext::default();
382 let result = TurnResult {
383 memory_context,
384 status: TurnStatus::GraphMemoryDisabled,
385 ..Default::default()
386 };
387 let outcome = TurnOutcome::Complete(result);
388 self.hooks.on_turn_complete(&outcome);
389 return Ok(outcome);
390 }
391
392 if self.config.agent_id.is_empty() {
393 return Err(AinlRuntimeError::Message(
394 "RuntimeConfig.agent_id must be set for run_turn".into(),
395 ));
396 }
397
398 let span = tracing::info_span!(
399 "ainl_runtime.run_turn",
400 agent_id = %self.config.agent_id,
401 turn = self.turn_count,
402 depth = input.depth,
403 );
404 let _span_enter = span.enter();
405
406 let validation: GraphValidationReport = self
407 .memory
408 .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
409 .map_err(AinlRuntimeError::from)?;
410 if !validation.is_valid {
411 let mut msg = String::from("graph validation failed before turn");
412 for d in &validation.dangling_edge_details {
413 msg.push_str(&format!(
414 "; {} -> {} [{}]",
415 d.source_id, d.target_id, d.edge_type
416 ));
417 }
418 return Err(AinlRuntimeError::Message(msg));
419 }
420
421 self.hooks
422 .on_artifact_loaded(&self.config.agent_id, validation.node_count);
423
424 let mut turn_warnings: Vec<TurnWarning> = Vec::new();
425
426 let t_persona = Instant::now();
427 let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
428 Some(cached.clone())
429 } else {
430 let nodes = self
431 .memory
432 .with(|m| {
433 m.sqlite_store()
434 .query(&self.config.agent_id)
435 .persona_nodes()
436 })
437 .map_err(AinlRuntimeError::from)?;
438 let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
439 self.persona_cache = compiled.clone();
440 compiled
441 };
442 self.hooks
443 .on_persona_compiled(persona_prompt_contribution.as_deref());
444 tracing::debug!(
445 target: "ainl_runtime",
446 duration_ms = t_persona.elapsed().as_millis() as u64,
447 has_contribution = persona_prompt_contribution.is_some(),
448 "persona_phase"
449 );
450
451 let t_memory = Instant::now();
452 let memory_context = self
453 .compile_memory_context_for(Some(&input.user_message))
454 .map_err(AinlRuntimeError::from)?;
455 self.hooks.on_memory_context_ready(&memory_context);
456 tracing::debug!(
457 target: "ainl_runtime",
458 duration_ms = t_memory.elapsed().as_millis() as u64,
459 episode_count = memory_context.recent_episodes.len(),
460 semantic_count = memory_context.relevant_semantic.len(),
461 patch_count = memory_context.active_patches.len(),
462 "memory_context"
463 );
464
465 let t_patches = Instant::now();
466 let patch_dispatch_results = if self.config.enable_graph_memory {
467 self.dispatch_patches_collect(
468 &memory_context.active_patches,
469 &input.frame,
470 &mut turn_warnings,
471 )
472 } else {
473 Vec::new()
474 };
475 for r in &patch_dispatch_results {
476 tracing::debug!(
477 target: "ainl_runtime",
478 label = %r.label,
479 dispatched = r.dispatched,
480 fitness_before = r.fitness_before,
481 fitness_after = r.fitness_after,
482 "patch_dispatch"
483 );
484 }
485 tracing::debug!(
486 target: "ainl_runtime",
487 duration_ms = t_patches.elapsed().as_millis() as u64,
488 "patch_dispatch_phase"
489 );
490
491 let dispatched_count = patch_dispatch_results
492 .iter()
493 .filter(|r| r.dispatched)
494 .count() as u32;
495 if dispatched_count >= self.config.max_steps {
496 let result = TurnResult {
497 patch_dispatch_results,
498 memory_context,
499 persona_prompt_contribution,
500 steps_executed: dispatched_count,
501 status: TurnStatus::StepLimitExceeded {
502 steps_executed: dispatched_count,
503 },
504 ..Default::default()
505 };
506 let outcome = TurnOutcome::Complete(result);
507 self.hooks.on_turn_complete(&outcome);
508 return Ok(outcome);
509 }
510
511 let t_episode = Instant::now();
512 let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
513 let episode_id = match self
514 .memory
515 .with(|m| record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical))
516 {
517 Ok(id) => id,
518 Err(e) => {
519 tracing::warn!(
520 phase = ?TurnPhase::EpisodeWrite,
521 error = %e,
522 "non-fatal turn write failed — continuing"
523 );
524 turn_warnings.push(TurnWarning {
525 phase: TurnPhase::EpisodeWrite,
526 error: e,
527 });
528 Uuid::nil()
529 }
530 };
531 self.hooks.on_episode_recorded(episode_id);
532 tracing::debug!(
533 target: "ainl_runtime",
534 duration_ms = t_episode.elapsed().as_millis() as u64,
535 episode_id = %episode_id,
536 "episode_record"
537 );
538
539 if !episode_id.is_nil() {
540 for &tid in &input.emit_targets {
541 if let Err(e) = self.memory.with(|m| {
542 m.sqlite_store()
543 .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
544 }) {
545 tracing::warn!(
546 phase = ?TurnPhase::EpisodeWrite,
547 error = %e,
548 "non-fatal turn write failed — continuing"
549 );
550 turn_warnings.push(TurnWarning {
551 phase: TurnPhase::EpisodeWrite,
552 error: e,
553 });
554 }
555 }
556 }
557
558 let emit_payload = serde_json::json!({
559 "episode_id": episode_id.to_string(),
560 "user_message": input.user_message,
561 "tools_invoked": tools_canonical,
562 "persona_contribution": persona_prompt_contribution,
563 "turn_count": self.turn_count.wrapping_add(1),
564 });
565 if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
566 tracing::warn!(
567 phase = ?TurnPhase::EpisodeWrite,
568 error = %e,
569 "non-fatal turn write failed — continuing"
570 );
571 turn_warnings.push(TurnWarning {
572 phase: TurnPhase::EpisodeWrite,
573 error: format!("emit_routing: {e}"),
574 });
575 }
576
577 self.turn_count = self.turn_count.wrapping_add(1);
578
579 let should_extract = self.config.extraction_interval > 0
580 && self.turn_count.saturating_sub(self.last_extraction_at_turn)
581 >= self.config.extraction_interval as u64;
582
583 let t_extract = Instant::now();
584 let (extraction_report, _extraction_failed) = if should_extract {
585 let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
586
587 let res = if force_fail {
588 let e = "test_forced".to_string();
589 tracing::warn!(
590 phase = ?TurnPhase::ExtractionPass,
591 error = %e,
592 "non-fatal turn write failed — continuing"
593 );
594 turn_warnings.push(TurnWarning {
595 phase: TurnPhase::ExtractionPass,
596 error: e,
597 });
598 tracing::debug!(
599 target: "ainl_runtime",
600 duration_ms = t_extract.elapsed().as_millis() as u64,
601 signals_ingested = 0u64,
602 skipped = false,
603 "extraction_pass"
604 );
605 (None, true)
606 } else {
607 let report = self
608 .memory
609 .with(|m| self.extractor.run_pass(m.sqlite_store()));
610 if let Some(ref e) = report.extract_error {
611 tracing::warn!(
612 phase = ?TurnPhase::ExtractionPass,
613 error = %e,
614 "non-fatal turn write failed — continuing"
615 );
616 turn_warnings.push(TurnWarning {
617 phase: TurnPhase::ExtractionPass,
618 error: e.clone(),
619 });
620 }
621 if let Some(ref e) = report.pattern_error {
622 tracing::warn!(
623 phase = ?TurnPhase::PatternPersistence,
624 error = %e,
625 "non-fatal turn write failed — continuing"
626 );
627 turn_warnings.push(TurnWarning {
628 phase: TurnPhase::PatternPersistence,
629 error: e.clone(),
630 });
631 }
632 if let Some(ref e) = report.persona_error {
633 tracing::warn!(
634 phase = ?TurnPhase::PersonaEvolution,
635 error = %e,
636 "non-fatal turn write failed — continuing"
637 );
638 turn_warnings.push(TurnWarning {
639 phase: TurnPhase::PersonaEvolution,
640 error: e.clone(),
641 });
642 }
643 let extraction_failed = report.has_errors();
644 if !extraction_failed {
645 tracing::info!(
646 agent_id = %report.agent_id,
647 signals_extracted = report.signals_extracted,
648 signals_applied = report.signals_applied,
649 semantic_nodes_updated = report.semantic_nodes_updated,
650 "ainl-graph-extractor pass completed (scheduled)"
651 );
652 }
653 self.hooks.on_extraction_complete(&report);
654 self.persona_cache = None;
655 tracing::debug!(
656 target: "ainl_runtime",
657 duration_ms = t_extract.elapsed().as_millis() as u64,
658 signals_ingested = report.signals_extracted as u64,
659 skipped = false,
660 "extraction_pass"
661 );
662 (Some(report), extraction_failed)
663 };
664 self.last_extraction_at_turn = self.turn_count;
665 res
666 } else {
667 tracing::debug!(
668 target: "ainl_runtime",
669 duration_ms = t_extract.elapsed().as_millis() as u64,
670 signals_ingested = 0u64,
671 skipped = true,
672 "extraction_pass"
673 );
674 (None, false)
675 };
676
677 if let Err(e) = self
678 .memory
679 .with(|m| try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id))
680 {
681 tracing::warn!(
682 phase = ?TurnPhase::ExportRefresh,
683 error = %e,
684 "non-fatal turn write failed — continuing"
685 );
686 turn_warnings.push(TurnWarning {
687 phase: TurnPhase::ExportRefresh,
688 error: e,
689 });
690 }
691
692 if !self.config.agent_id.is_empty() {
693 let state = RuntimeStateNode {
694 agent_id: self.config.agent_id.clone(),
695 turn_count: self.turn_count,
696 last_extraction_at_turn: self.last_extraction_at_turn,
697 persona_snapshot_json: self
698 .persona_cache
699 .as_ref()
700 .and_then(|p| serde_json::to_string(p).ok()),
701 updated_at: chrono::Utc::now().timestamp(),
702 };
703 let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
704 Err("injected runtime state write failure".to_string())
705 } else {
706 self.memory.with(|m| m.write_runtime_state(&state))
707 };
708 if let Err(e) = write_res {
709 tracing::warn!(
710 phase = ?TurnPhase::RuntimeStatePersist,
711 error = %e,
712 "failed to persist runtime state — cadence will reset on next restart"
713 );
714 turn_warnings.push(TurnWarning {
715 phase: TurnPhase::RuntimeStatePersist,
716 error: e,
717 });
718 }
719 }
720
721 if let (Some(gate), Some(phase), Some(trust)) = (
723 input.vitals_gate.as_deref(),
724 input.vitals_phase.as_deref(),
725 input.vitals_trust,
726 ) {
727 self.hooks.on_vitals_classified(gate, phase, trust);
728 }
729
730 let result = TurnResult {
731 episode_id,
732 persona_prompt_contribution,
733 memory_context,
734 extraction_report,
735 steps_executed: dispatched_count,
736 patch_dispatch_results,
737 status: TurnStatus::Ok,
738 vitals_gate: input.vitals_gate.clone(),
739 vitals_phase: input.vitals_phase.clone(),
740 vitals_trust: input.vitals_trust,
741 };
742
743 let outcome = if turn_warnings.is_empty() {
744 TurnOutcome::Complete(result)
745 } else {
746 TurnOutcome::PartialSuccess {
747 result,
748 warnings: turn_warnings,
749 }
750 };
751
752 self.hooks.on_turn_complete(&outcome);
753 Ok(outcome)
754 }
755
756 fn relevant_semantic_nodes(
764 &self,
765 user_message: &str,
766 all_semantic: Vec<AinlMemoryNode>,
767 limit: usize,
768 ) -> Vec<AinlMemoryNode> {
769 let user_tags = infer_topic_tags(user_message);
770 let user_topics: HashSet<String> = user_tags
771 .iter()
772 .filter(|t| t.namespace == TagNamespace::Topic)
773 .map(|t| t.value.to_lowercase())
774 .collect();
775
776 if user_message.trim().is_empty() || user_topics.is_empty() {
777 return fallback_high_recurrence_semantic(all_semantic, limit);
778 }
779
780 let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
781 for n in all_semantic {
782 let (score, rec) = match &n.node_type {
783 AinlNodeType::Semantic { semantic } => {
784 let mut s = 0f32;
785 if let Some(cluster) = &semantic.topic_cluster {
786 for slug in cluster
787 .split([',', ';'])
788 .map(|s| s.trim().to_lowercase())
789 .filter(|s| !s.is_empty())
790 {
791 if user_topics.contains(&slug) {
792 s += 1.0;
793 }
794 }
795 }
796 if s == 0.0 {
797 for tag in &semantic.tags {
798 let tl = tag.to_lowercase();
799 if let Some(rest) = tl.strip_prefix("topic:") {
800 let slug = rest.trim().to_lowercase();
801 if user_topics.contains(&slug) {
802 s = 0.5;
803 break;
804 }
805 }
806 }
807 }
808 let confidence = semantic.confidence;
813 for tag in &semantic.tags {
814 let tl = tag.to_lowercase();
815 if tl.starts_with("vitals:") {
816 if tl.ends_with(":pass") {
817 s += 0.2 * confidence;
818 } else if tl == "vitals:elevated" {
819 s -= 0.1;
820 }
821 }
822 }
823 (s, semantic.recurrence_count)
824 }
825 _ => (0.0, 0),
826 };
827 scored.push((score, rec, n));
828 }
829
830 scored.sort_by(|a, b| {
831 b.0.partial_cmp(&a.0)
832 .unwrap_or(std::cmp::Ordering::Equal)
833 .then_with(|| b.1.cmp(&a.1))
834 });
835 scored.into_iter().take(limit).map(|t| t.2).collect()
836 }
837
838 pub fn dispatch_patches(
839 &mut self,
840 patches: &[AinlMemoryNode],
841 frame: &HashMap<String, serde_json::Value>,
842 ) -> Vec<PatchDispatchResult> {
843 let mut w = Vec::new();
844 self.dispatch_patches_collect(patches, frame, &mut w)
845 }
846
847 fn dispatch_patches_collect(
848 &mut self,
849 patches: &[AinlMemoryNode],
850 frame: &HashMap<String, serde_json::Value>,
851 turn_warnings: &mut Vec<TurnWarning>,
852 ) -> Vec<PatchDispatchResult> {
853 let mut out = Vec::new();
854 for node in patches {
855 let res = self.dispatch_one_patch(node, frame);
856 if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
857 tracing::warn!(
858 phase = ?TurnPhase::FitnessWriteBack,
859 error = %e,
860 "non-fatal turn write failed — continuing"
861 );
862 turn_warnings.push(TurnWarning {
863 phase: TurnPhase::FitnessWriteBack,
864 error: format!("{}: {}", res.label, e),
865 });
866 }
867 out.push(res);
868 }
869 out
870 }
871
872 fn dispatch_one_patch(
873 &mut self,
874 node: &AinlMemoryNode,
875 frame: &HashMap<String, serde_json::Value>,
876 ) -> PatchDispatchResult {
877 let label_default = String::new();
878 let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
879 AinlNodeType::Procedural { procedural } => (
880 procedural_label(procedural),
881 procedural.patch_version,
882 procedural.retired,
883 procedural.declared_reads.clone(),
884 procedural.fitness,
885 ),
886 _ => {
887 return PatchDispatchResult {
888 label: label_default,
889 patch_version: 0,
890 fitness_before: 0.0,
891 fitness_after: 0.0,
892 dispatched: false,
893 skip_reason: Some(PatchSkipReason::NotProcedural),
894 adapter_output: None,
895 adapter_name: None,
896 };
897 }
898 };
899
900 if pv == 0 {
901 return PatchDispatchResult {
902 label: label_src,
903 patch_version: pv,
904 fitness_before: fitness_opt.unwrap_or(0.5),
905 fitness_after: fitness_opt.unwrap_or(0.5),
906 dispatched: false,
907 skip_reason: Some(PatchSkipReason::ZeroVersion),
908 adapter_output: None,
909 adapter_name: None,
910 };
911 }
912 if retired {
913 return PatchDispatchResult {
914 label: label_src.clone(),
915 patch_version: pv,
916 fitness_before: fitness_opt.unwrap_or(0.5),
917 fitness_after: fitness_opt.unwrap_or(0.5),
918 dispatched: false,
919 skip_reason: Some(PatchSkipReason::Retired),
920 adapter_output: None,
921 adapter_name: None,
922 };
923 }
924 for key in &reads {
925 if !frame.contains_key(key) {
926 return PatchDispatchResult {
927 label: label_src.clone(),
928 patch_version: pv,
929 fitness_before: fitness_opt.unwrap_or(0.5),
930 fitness_after: fitness_opt.unwrap_or(0.5),
931 dispatched: false,
932 skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
933 adapter_output: None,
934 adapter_name: None,
935 };
936 }
937 }
938
939 let patch_label = label_src.clone();
940 let adapter_key = patch_label.as_str();
941 let ctx = PatchDispatchContext {
942 patch_label: adapter_key,
943 node,
944 frame,
945 };
946 let (adapter_output, adapter_name) = if let Some(adapter) = self
947 .adapter_registry
948 .get(adapter_key)
949 .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
950 {
951 let aname = adapter.name().to_string();
952 match adapter.execute_patch(&ctx) {
953 Ok(output) => {
954 tracing::debug!(
955 label = %patch_label,
956 adapter = %aname,
957 "adapter executed patch"
958 );
959 (Some(output), Some(aname))
960 }
961 Err(e) => {
962 tracing::warn!(
963 label = %patch_label,
964 adapter = %aname,
965 error = %e,
966 "adapter execution failed — continuing as metadata dispatch"
967 );
968 (None, Some(aname))
969 }
970 }
971 } else {
972 (None, None)
973 };
974
975 let fitness_before = fitness_opt.unwrap_or(0.5);
976 let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
977
978 let updated = match self.memory.with(|m| {
979 let store = m.sqlite_store();
980 store.read_node(node.id)
981 }) {
982 Ok(Some(mut n)) => {
983 if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
984 procedural.fitness = Some(fitness_after);
985 }
986 n
987 }
988 Ok(None) => {
989 return PatchDispatchResult {
990 label: label_src,
991 patch_version: pv,
992 fitness_before,
993 fitness_after: fitness_before,
994 dispatched: false,
995 skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
996 adapter_output,
997 adapter_name,
998 };
999 }
1000 Err(e) => {
1001 return PatchDispatchResult {
1002 label: label_src,
1003 patch_version: pv,
1004 fitness_before,
1005 fitness_after: fitness_before,
1006 dispatched: false,
1007 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1008 adapter_output,
1009 adapter_name,
1010 };
1011 }
1012 };
1013
1014 if self.test_force_fitness_write_failure {
1015 self.test_force_fitness_write_failure = false;
1016 let e = "injected fitness write failure".to_string();
1017 return PatchDispatchResult {
1018 label: label_src.clone(),
1019 patch_version: pv,
1020 fitness_before,
1021 fitness_after: fitness_before,
1022 dispatched: false,
1023 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1024 adapter_output,
1025 adapter_name,
1026 };
1027 }
1028
1029 if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
1030 return PatchDispatchResult {
1031 label: label_src.clone(),
1032 patch_version: pv,
1033 fitness_before,
1034 fitness_after: fitness_before,
1035 dispatched: false,
1036 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1037 adapter_output,
1038 adapter_name,
1039 };
1040 }
1041
1042 self.hooks
1043 .on_patch_dispatched(label_src.as_str(), fitness_after);
1044
1045 PatchDispatchResult {
1046 label: label_src,
1047 patch_version: pv,
1048 fitness_before,
1049 fitness_after,
1050 dispatched: true,
1051 skip_reason: None,
1052 adapter_output,
1053 adapter_name,
1054 }
1055 }
1056}
1057
1058pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1059 match &n.node_type {
1060 AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1061 AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1062 AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1063 AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1064 AinlNodeType::RuntimeState { runtime_state } => {
1065 format!("runtime_state:{}", runtime_state.agent_id)
1066 }
1067 }
1068}
1069
1070pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1071 if !p.label.is_empty() {
1072 p.label.clone()
1073 } else {
1074 p.pattern_name.clone()
1075 }
1076}
1077
1078pub(crate) fn fallback_high_recurrence_semantic(
1079 all: Vec<AinlMemoryNode>,
1080 limit: usize,
1081) -> Vec<AinlMemoryNode> {
1082 let mut v: Vec<_> = all
1083 .into_iter()
1084 .filter(|n| {
1085 matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1086 })
1087 .collect();
1088 v.sort_by(|a, b| {
1089 let ra = match &a.node_type {
1090 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1091 _ => 0,
1092 };
1093 let rb = match &b.node_type {
1094 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1095 _ => 0,
1096 };
1097 rb.cmp(&ra)
1098 });
1099 v.into_iter().take(limit).collect()
1100}
1101
1102pub(crate) fn persona_snapshot_if_evolved(
1103 extractor: &GraphExtractorTask,
1104) -> Option<ainl_persona::PersonaSnapshot> {
1105 let snap = extractor.evolution_engine.snapshot();
1106 let defaults = default_axis_map(0.5);
1107 for axis in PersonaAxis::ALL {
1108 let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1109 let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1110 if (s - d).abs() > INGEST_SCORE_EPSILON {
1111 return Some(snap);
1112 }
1113 }
1114 None
1115}
1116
1117pub(crate) fn compile_persona_from_nodes(
1118 nodes: &[AinlMemoryNode],
1119) -> Result<Option<String>, String> {
1120 if nodes.is_empty() {
1121 return Ok(None);
1122 }
1123 let mut lines = Vec::new();
1124 for n in nodes {
1125 if let AinlNodeType::Persona { persona } = &n.node_type {
1126 lines.push(format_persona_line(persona));
1127 }
1128 }
1129 if lines.is_empty() {
1130 Ok(None)
1131 } else {
1132 Ok(Some(lines.join("\n")))
1133 }
1134}
1135
1136fn format_persona_line(p: &PersonaNode) -> String {
1137 format!(
1138 "- {} (strength {:.2}, layer {:?}, source {:?})",
1139 p.trait_name, p.strength, p.layer, p.source
1140 )
1141}
1142
1143pub(crate) fn try_export_graph_json_armaraos(
1147 store: &SqliteGraphStore,
1148 agent_id: &str,
1149) -> Result<(), String> {
1150 let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1151 let dir = trimmed.trim();
1152 if dir.is_empty() {
1153 return Ok(());
1154 }
1155 let dir_path = PathBuf::from(dir);
1156 std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1157 let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1158 let snap = store.export_graph(agent_id)?;
1159 let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1160 std::fs::write(
1161 &path,
1162 serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1163 )
1164 .map_err(|e| format!("write export: {e}"))?;
1165 Ok(())
1166}
1167
1168pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1169 if tools_invoked.is_empty() {
1170 return vec!["turn".to_string()];
1171 }
1172 let tags = tag_tool_names(tools_invoked);
1173 let mut seen: BTreeSet<String> = BTreeSet::new();
1174 for t in tags {
1175 if t.namespace == TagNamespace::Tool {
1176 seen.insert(t.value);
1177 }
1178 }
1179 if seen.is_empty() {
1180 vec!["turn".to_string()]
1181 } else {
1182 seen.into_iter().collect()
1183 }
1184}
1185
1186pub(crate) fn record_turn_episode(
1187 memory: &ainl_memory::GraphMemory,
1188 agent_id: &str,
1189 input: &TurnInput,
1190 tools_invoked_canonical: &[String],
1191) -> Result<Uuid, String> {
1192 let turn_id = Uuid::new_v4();
1193 let timestamp = chrono::Utc::now().timestamp();
1194 let tools = tools_invoked_canonical.to_vec();
1195 let mut node = AinlMemoryNode::new_episode(
1196 turn_id,
1197 timestamp,
1198 tools.clone(),
1199 None,
1200 input.trace_event.clone(),
1201 );
1202 node.agent_id = agent_id.to_string();
1203 if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1204 episodic.user_message = Some(input.user_message.clone());
1205 episodic.tools_invoked = tools;
1206 episodic.vitals_gate = input.vitals_gate.clone();
1208 episodic.vitals_phase = input.vitals_phase.clone();
1209 episodic.vitals_trust = input.vitals_trust;
1210 }
1211 memory.write_node(&node)?;
1212 Ok(node.id)
1213}
1214
1215#[cfg(feature = "async")]
1216#[path = "runtime_async.rs"]
1217mod runtime_async_impl;