1use std::collections::{BTreeSet, HashMap, HashSet};
4use std::time::Instant;
5
6use ainl_graph_extractor::GraphExtractorTask;
7use ainl_memory::{
8 AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
9 RuntimeStateNode, SqliteGraphStore,
10};
11use ainl_persona::axes::default_axis_map;
12use ainl_persona::{
13 EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
14};
15use ainl_semantic_tagger::infer_topic_tags;
16use ainl_semantic_tagger::tag_tool_names;
17use ainl_semantic_tagger::TagNamespace;
18use uuid::Uuid;
19
20use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
21use crate::engine::{
22 AinlGraphArtifact, MemoryContext, PatchDispatchContext, PatchDispatchResult, PatchSkipReason,
23 TurnInput, TurnOutcome, TurnOutput, EMIT_TO_EDGE,
24};
25use crate::hooks::{NoOpHooks, TurnHooks};
26use crate::RuntimeConfig;
27
28pub struct AinlRuntime {
44 config: RuntimeConfig,
45 memory: ainl_memory::GraphMemory,
46 extractor: GraphExtractorTask,
47 turn_count: u32,
48 last_extraction_turn: u32,
49 delegation_depth: u32,
50 hooks: Box<dyn TurnHooks>,
51 evolution_writes_enabled: bool,
55 persona_cache: Option<String>,
56 #[doc(hidden)]
58 test_force_extraction_failure: bool,
59 adapter_registry: AdapterRegistry,
60}
61
62impl AinlRuntime {
63 pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
64 let agent_id = config.agent_id.clone();
65 let memory = ainl_memory::GraphMemory::from_sqlite_store(store);
66 let (init_turn_count, init_persona_cache, init_last_extraction_turn) =
67 if agent_id.is_empty() {
68 (0, None, 0)
69 } else {
70 match memory.sqlite_store().load_runtime_state(&agent_id) {
71 Ok(Some(state)) => {
72 tracing::info!(
73 agent_id = %agent_id,
74 turn_count = state.turn_count,
75 "restored runtime state"
76 );
77 (
78 state.turn_count,
79 state.last_persona_prompt,
80 state.last_extraction_turn,
81 )
82 }
83 Ok(None) => (0, None, 0),
84 Err(e) => {
85 tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
86 (0, None, 0)
87 }
88 }
89 };
90 Self {
91 extractor: GraphExtractorTask::new(&agent_id),
92 memory,
93 config,
94 turn_count: init_turn_count,
95 last_extraction_turn: init_last_extraction_turn,
96 delegation_depth: 0,
97 hooks: Box::new(NoOpHooks),
98 evolution_writes_enabled: true,
99 persona_cache: init_persona_cache,
100 test_force_extraction_failure: false,
101 adapter_registry: AdapterRegistry::new(),
102 }
103 }
104
105 pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
107 self.adapter_registry.register(adapter);
108 }
109
110 pub fn register_default_patch_adapters(&mut self) {
113 self.register_adapter(GraphPatchAdapter::new());
114 }
115
116 pub fn registered_adapters(&self) -> Vec<&str> {
118 self.adapter_registry.registered_names()
119 }
120
121 #[doc(hidden)]
122 pub fn test_turn_count(&self) -> u32 {
123 self.turn_count
124 }
125
126 #[doc(hidden)]
127 pub fn test_delegation_depth(&self) -> u32 {
128 self.delegation_depth
129 }
130
131 #[doc(hidden)]
132 pub fn test_set_delegation_depth(&mut self, depth: u32) {
133 self.delegation_depth = depth;
134 }
135
136 #[doc(hidden)]
137 pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
138 self.test_force_extraction_failure = fail;
139 }
140
141 pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
142 self.hooks = Box::new(hooks);
143 self
144 }
145
146 pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
151 self.evolution_writes_enabled = enabled;
152 self
153 }
154
155 fn require_evolution_writes_enabled(&self) -> Result<(), String> {
156 if self.evolution_writes_enabled {
157 Ok(())
158 } else {
159 Err(
160 "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
161 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
162 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
163 ainl_memory.db"
164 .to_string(),
165 )
166 }
167 }
168
169 pub fn sqlite_store(&self) -> &SqliteGraphStore {
171 self.memory.sqlite_store()
172 }
173
174 pub fn evolution_engine(&self) -> &EvolutionEngine {
182 &self.extractor.evolution_engine
183 }
184
185 pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
189 &mut self.extractor.evolution_engine
190 }
191
192 pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
194 self.extractor.evolution_engine.ingest_signals(signals)
195 }
196
197 pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
199 self.extractor
200 .evolution_engine
201 .correction_tick(axis, correction);
202 }
203
204 pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
208 self.require_evolution_writes_enabled()?;
209 let store = self.memory.sqlite_store();
210 let snap = self.extractor.evolution_engine.snapshot();
211 self.extractor
212 .evolution_engine
213 .write_persona_node(store, &snap)?;
214 Ok(snap)
215 }
216
217 pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
224 self.require_evolution_writes_enabled()?;
225 let store = self.memory.sqlite_store();
226 self.extractor.evolution_engine.evolve(store)
227 }
228
229 pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
231 AinlGraphArtifact::load(self.memory.sqlite_store(), &self.config.agent_id)
232 }
233
234 pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
237 self.compile_memory_context_for(None)
238 }
239
240 pub fn compile_memory_context_for(
242 &self,
243 user_message: Option<&str>,
244 ) -> Result<MemoryContext, String> {
245 if self.config.agent_id.is_empty() {
246 return Err("RuntimeConfig.agent_id must be set".to_string());
247 }
248 let store = self.memory.sqlite_store();
249 let q = store.query(&self.config.agent_id);
250 let recent_episodes = q.recent_episodes(5)?;
251 let effective_user = user_message
252 .map(|s| s.to_string())
253 .filter(|s| !s.is_empty())
254 .or_else(|| {
255 recent_episodes.first().and_then(|n| {
256 if let AinlNodeType::Episode { episodic } = &n.node_type {
257 episodic.user_message.clone().filter(|m| !m.is_empty())
258 } else {
259 None
260 }
261 })
262 });
263 let all_semantic = q.semantic_nodes()?;
264 let relevant_semantic = match effective_user.as_deref() {
265 Some(msg) => self.relevant_semantic_nodes(msg, all_semantic, 10),
266 None => fallback_high_recurrence_semantic(all_semantic, 10),
267 };
268 let active_patches = q.active_patches()?;
269 let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
270 Ok(MemoryContext {
271 recent_episodes,
272 relevant_semantic,
273 active_patches,
274 persona_snapshot,
275 compiled_at: chrono::Utc::now(),
276 })
277 }
278
279 pub fn route_emit_edges(
281 &self,
282 episode_id: Uuid,
283 turn_output_payload: &serde_json::Value,
284 ) -> Result<(), String> {
285 let store = self.memory.sqlite_store();
286 let neighbors = store
287 .query(&self.config.agent_id)
288 .neighbors(episode_id, EMIT_TO_EDGE)?;
289 for n in neighbors {
290 let target = emit_target_name(&n);
291 self.hooks.on_emit(&target, turn_output_payload);
292 }
293 Ok(())
294 }
295
296 pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutput, String> {
298 self.delegation_depth += 1;
299 let rt_ptr = self as *mut Self;
300 let _depth_guard = scopeguard::guard((), |()| unsafe {
304 if (*rt_ptr).delegation_depth > 0 {
305 (*rt_ptr).delegation_depth -= 1;
306 }
307 });
308
309 if self.delegation_depth > self.config.max_delegation_depth {
310 let out = TurnOutput {
311 outcome: TurnOutcome::DepthLimitExceeded,
312 ..Default::default()
313 };
314 self.hooks.on_turn_complete(&out);
315 return Ok(out);
316 }
317
318 if !self.config.enable_graph_memory {
319 let memory_context = MemoryContext::default();
320 let out = TurnOutput {
321 memory_context,
322 outcome: TurnOutcome::GraphMemoryDisabled,
323 ..Default::default()
324 };
325 self.hooks.on_turn_complete(&out);
326 return Ok(out);
327 }
328
329 if self.config.agent_id.is_empty() {
330 return Err("RuntimeConfig.agent_id must be set for run_turn".to_string());
331 }
332
333 let span = tracing::info_span!(
334 "ainl_runtime.run_turn",
335 agent_id = %self.config.agent_id,
336 turn = self.turn_count,
337 depth = input.depth,
338 );
339 let _span_enter = span.enter();
340
341 let validation: GraphValidationReport = self
342 .memory
343 .sqlite_store()
344 .validate_graph(&self.config.agent_id)?;
345 if !validation.is_valid {
346 let mut msg = String::from("graph validation failed before turn");
347 for d in &validation.dangling_edge_details {
348 msg.push_str(&format!(
349 "; {} -> {} [{}]",
350 d.source_id, d.target_id, d.edge_type
351 ));
352 }
353 return Err(msg);
354 }
355
356 self.hooks
357 .on_artifact_loaded(&self.config.agent_id, validation.node_count);
358
359 let mut patches_failed: Vec<String> = Vec::new();
360 let mut warnings: Vec<String> = Vec::new();
361
362 let t_persona = Instant::now();
363 let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
364 Some(cached.clone())
365 } else {
366 let nodes = self
367 .memory
368 .sqlite_store()
369 .query(&self.config.agent_id)
370 .persona_nodes()?;
371 let compiled = compile_persona_from_nodes(&nodes)?;
372 self.persona_cache = compiled.clone();
373 compiled
374 };
375 self.hooks
376 .on_persona_compiled(persona_prompt_contribution.as_deref());
377 tracing::debug!(
378 target: "ainl_runtime",
379 duration_ms = t_persona.elapsed().as_millis() as u64,
380 has_contribution = persona_prompt_contribution.is_some(),
381 "persona_phase"
382 );
383
384 let t_memory = Instant::now();
385 let memory_context = self.compile_memory_context_for(Some(&input.user_message))?;
386 self.hooks.on_memory_context_ready(&memory_context);
387 tracing::debug!(
388 target: "ainl_runtime",
389 duration_ms = t_memory.elapsed().as_millis() as u64,
390 episode_count = memory_context.recent_episodes.len(),
391 semantic_count = memory_context.relevant_semantic.len(),
392 patch_count = memory_context.active_patches.len(),
393 "memory_context"
394 );
395
396 let t_patches = Instant::now();
397 let patch_dispatch_results = if self.config.enable_graph_memory {
398 self.dispatch_patches_collect(
399 &memory_context.active_patches,
400 &input.frame,
401 &mut patches_failed,
402 )
403 } else {
404 Vec::new()
405 };
406 for r in &patch_dispatch_results {
407 tracing::debug!(
408 target: "ainl_runtime",
409 label = %r.label,
410 dispatched = r.dispatched,
411 fitness_before = r.fitness_before,
412 fitness_after = r.fitness_after,
413 "patch_dispatch"
414 );
415 }
416 tracing::debug!(
417 target: "ainl_runtime",
418 duration_ms = t_patches.elapsed().as_millis() as u64,
419 "patch_dispatch_phase"
420 );
421
422 let dispatched_count = patch_dispatch_results
423 .iter()
424 .filter(|r| r.dispatched)
425 .count() as u32;
426 if dispatched_count >= self.config.max_steps {
427 let out = TurnOutput {
428 patch_dispatch_results,
429 memory_context,
430 persona_prompt_contribution,
431 steps_executed: dispatched_count,
432 outcome: TurnOutcome::StepLimitExceeded {
433 steps_executed: dispatched_count,
434 },
435 ..Default::default()
436 };
437 self.hooks.on_turn_complete(&out);
438 return Ok(out);
439 }
440
441 let t_episode = Instant::now();
442 let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
443 let episode_id = record_turn_episode(
444 &self.memory,
445 &self.config.agent_id,
446 &input,
447 &tools_canonical,
448 )?;
449 self.hooks.on_episode_recorded(episode_id);
450 tracing::debug!(
451 target: "ainl_runtime",
452 duration_ms = t_episode.elapsed().as_millis() as u64,
453 episode_id = %episode_id,
454 "episode_record"
455 );
456
457 for &tid in &input.emit_targets {
458 self.memory
459 .sqlite_store()
460 .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)?;
461 }
462
463 let emit_payload = serde_json::json!({
464 "episode_id": episode_id.to_string(),
465 "user_message": input.user_message,
466 "tools_invoked": tools_canonical,
467 "persona_contribution": persona_prompt_contribution,
468 "turn_count": self.turn_count.wrapping_add(1),
469 });
470 if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
471 tracing::warn!(error = %e, "emit routing failed — continuing");
472 warnings.push(format!("emit_routing: {e}"));
473 }
474
475 self.turn_count = self.turn_count.wrapping_add(1);
476
477 let should_extract = self.config.extraction_interval > 0
478 && self.turn_count.saturating_sub(self.last_extraction_turn)
479 >= self.config.extraction_interval;
480
481 let t_extract = Instant::now();
482 let (extraction_report, extraction_failed) = if should_extract {
483 let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
484
485 let res = if force_fail {
486 tracing::warn!(error = "test_forced", "extraction pass failed — continuing");
487 tracing::debug!(
488 target: "ainl_runtime",
489 duration_ms = t_extract.elapsed().as_millis() as u64,
490 signals_ingested = 0u64,
491 skipped = false,
492 "extraction_pass"
493 );
494 (None, true)
495 } else {
496 match self.extractor.run_pass(self.memory.sqlite_store()) {
497 Ok(report) => {
498 tracing::info!(
499 agent_id = %report.agent_id,
500 signals_extracted = report.signals_extracted,
501 signals_applied = report.signals_applied,
502 semantic_nodes_updated = report.semantic_nodes_updated,
503 "ainl-graph-extractor pass completed (scheduled)"
504 );
505 self.hooks.on_extraction_complete(&report);
506 self.persona_cache = None;
507 tracing::debug!(
508 target: "ainl_runtime",
509 duration_ms = t_extract.elapsed().as_millis() as u64,
510 signals_ingested = report.signals_extracted as u64,
511 skipped = false,
512 "extraction_pass"
513 );
514 (Some(report), false)
515 }
516 Err(e) => {
517 tracing::warn!(error = %e, "extraction pass failed — continuing");
518 tracing::debug!(
519 target: "ainl_runtime",
520 duration_ms = t_extract.elapsed().as_millis() as u64,
521 signals_ingested = 0u64,
522 skipped = false,
523 "extraction_pass"
524 );
525 (None, true)
526 }
527 }
528 };
529 self.last_extraction_turn = self.turn_count;
530 res
531 } else {
532 tracing::debug!(
533 target: "ainl_runtime",
534 duration_ms = t_extract.elapsed().as_millis() as u64,
535 signals_ingested = 0u64,
536 skipped = true,
537 "extraction_pass"
538 );
539 (None, false)
540 };
541
542 let outcome = if extraction_failed || !patches_failed.is_empty() || !warnings.is_empty() {
543 TurnOutcome::PartialSuccess {
544 episode_recorded: true,
545 extraction_failed,
546 patches_failed,
547 warnings,
548 }
549 } else {
550 TurnOutcome::Success
551 };
552
553 let out = TurnOutput {
554 episode_id,
555 persona_prompt_contribution,
556 memory_context,
557 extraction_report,
558 steps_executed: dispatched_count,
559 outcome,
560 patch_dispatch_results,
561 };
562
563 if !self.config.agent_id.is_empty() {
564 let persist_state = RuntimeStateNode {
565 agent_id: self.config.agent_id.clone(),
566 turn_count: self.turn_count,
567 last_extraction_turn: self.last_extraction_turn,
568 last_persona_prompt: self.persona_cache.clone(),
569 updated_at: chrono::Utc::now().to_rfc3339(),
570 };
571 if let Err(e) = self
572 .memory
573 .sqlite_store()
574 .save_runtime_state(&persist_state)
575 {
576 tracing::warn!(error = %e, "failed to persist runtime state — non-fatal");
577 }
578 }
579
580 self.hooks.on_turn_complete(&out);
581 Ok(out)
582 }
583
584 pub fn relevant_semantic_nodes(
586 &self,
587 user_message: &str,
588 all_semantic: Vec<AinlMemoryNode>,
589 limit: usize,
590 ) -> Vec<AinlMemoryNode> {
591 let user_tags = infer_topic_tags(user_message);
592 let user_topics: HashSet<String> = user_tags
593 .iter()
594 .filter(|t| t.namespace == TagNamespace::Topic)
595 .map(|t| t.value.to_lowercase())
596 .collect();
597
598 if user_topics.is_empty() {
599 return fallback_high_recurrence_semantic(all_semantic, limit);
600 }
601
602 let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
603 for n in all_semantic {
604 let (score, rec) = match &n.node_type {
605 AinlNodeType::Semantic { semantic } => {
606 let mut s = 0f32;
607 if let Some(cluster) = &semantic.topic_cluster {
608 for slug in cluster
609 .split([',', ';'])
610 .map(|s| s.trim().to_lowercase())
611 .filter(|s| !s.is_empty())
612 {
613 if user_topics.contains(&slug) {
614 s += 1.0;
615 }
616 }
617 }
618 if s == 0.0 {
619 for tag in &semantic.tags {
620 let tl = tag.to_lowercase();
621 if let Some(rest) = tl.strip_prefix("topic:") {
622 let slug = rest.trim();
623 if user_topics.contains(slug) {
624 s = 0.5;
625 break;
626 }
627 }
628 }
629 }
630 (s, semantic.recurrence_count)
631 }
632 _ => (0.0, 0),
633 };
634 if score > 0.0 {
635 scored.push((score, rec, n));
636 }
637 }
638
639 scored.sort_by(|a, b| {
640 b.0.partial_cmp(&a.0)
641 .unwrap_or(std::cmp::Ordering::Equal)
642 .then_with(|| b.1.cmp(&a.1))
643 });
644 scored.into_iter().take(limit).map(|t| t.2).collect()
645 }
646
647 pub fn dispatch_patches(
648 &mut self,
649 patches: &[AinlMemoryNode],
650 frame: &HashMap<String, serde_json::Value>,
651 ) -> Vec<PatchDispatchResult> {
652 let mut discarded = Vec::new();
653 self.dispatch_patches_collect(patches, frame, &mut discarded)
654 }
655
656 fn dispatch_patches_collect(
657 &mut self,
658 patches: &[AinlMemoryNode],
659 frame: &HashMap<String, serde_json::Value>,
660 patches_failed: &mut Vec<String>,
661 ) -> Vec<PatchDispatchResult> {
662 let mut out = Vec::new();
663 for node in patches {
664 let res = self.dispatch_one_patch(node, frame);
665 if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
666 tracing::warn!(label = %res.label, error = %e, "patch fitness write failed — continuing");
667 patches_failed.push(res.label.clone());
668 }
669 out.push(res);
670 }
671 out
672 }
673
674 fn dispatch_one_patch(
675 &mut self,
676 node: &AinlMemoryNode,
677 frame: &HashMap<String, serde_json::Value>,
678 ) -> PatchDispatchResult {
679 let label_default = String::new();
680 let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
681 AinlNodeType::Procedural { procedural } => (
682 procedural_label(procedural),
683 procedural.patch_version,
684 procedural.retired,
685 procedural.declared_reads.clone(),
686 procedural.fitness,
687 ),
688 _ => {
689 return PatchDispatchResult {
690 label: label_default,
691 patch_version: 0,
692 fitness_before: 0.0,
693 fitness_after: 0.0,
694 dispatched: false,
695 skip_reason: Some(PatchSkipReason::NotProcedural),
696 adapter_output: None,
697 adapter_name: None,
698 };
699 }
700 };
701
702 if pv == 0 {
703 return PatchDispatchResult {
704 label: label_src,
705 patch_version: pv,
706 fitness_before: fitness_opt.unwrap_or(0.5),
707 fitness_after: fitness_opt.unwrap_or(0.5),
708 dispatched: false,
709 skip_reason: Some(PatchSkipReason::ZeroVersion),
710 adapter_output: None,
711 adapter_name: None,
712 };
713 }
714 if retired {
715 return PatchDispatchResult {
716 label: label_src.clone(),
717 patch_version: pv,
718 fitness_before: fitness_opt.unwrap_or(0.5),
719 fitness_after: fitness_opt.unwrap_or(0.5),
720 dispatched: false,
721 skip_reason: Some(PatchSkipReason::Retired),
722 adapter_output: None,
723 adapter_name: None,
724 };
725 }
726 for key in &reads {
727 if !frame.contains_key(key) {
728 return PatchDispatchResult {
729 label: label_src.clone(),
730 patch_version: pv,
731 fitness_before: fitness_opt.unwrap_or(0.5),
732 fitness_after: fitness_opt.unwrap_or(0.5),
733 dispatched: false,
734 skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
735 adapter_output: None,
736 adapter_name: None,
737 };
738 }
739 }
740
741 let patch_label = label_src.clone();
742 let adapter_key = patch_label.as_str();
743 let ctx = PatchDispatchContext {
744 patch_label: adapter_key,
745 node,
746 frame,
747 };
748 let (adapter_output, adapter_name) = if let Some(adapter) = self
749 .adapter_registry
750 .get(adapter_key)
751 .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
752 {
753 let aname = adapter.name().to_string();
754 match adapter.execute_patch(&ctx) {
755 Ok(output) => {
756 tracing::debug!(
757 label = %patch_label,
758 adapter = %aname,
759 "adapter executed patch"
760 );
761 (Some(output), Some(aname))
762 }
763 Err(e) => {
764 tracing::warn!(
765 label = %patch_label,
766 adapter = %aname,
767 error = %e,
768 "adapter execution failed — continuing as metadata dispatch"
769 );
770 (None, Some(aname))
771 }
772 }
773 } else {
774 (None, None)
775 };
776
777 let fitness_before = fitness_opt.unwrap_or(0.5);
778 let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
779
780 let store = self.memory.sqlite_store();
781 let updated = match store.read_node(node.id) {
782 Ok(Some(mut n)) => {
783 if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
784 procedural.fitness = Some(fitness_after);
785 }
786 n
787 }
788 Ok(None) => {
789 return PatchDispatchResult {
790 label: label_src,
791 patch_version: pv,
792 fitness_before,
793 fitness_after: fitness_before,
794 dispatched: false,
795 skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
796 adapter_output,
797 adapter_name,
798 };
799 }
800 Err(e) => {
801 return PatchDispatchResult {
802 label: label_src,
803 patch_version: pv,
804 fitness_before,
805 fitness_after: fitness_before,
806 dispatched: false,
807 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
808 adapter_output,
809 adapter_name,
810 };
811 }
812 };
813
814 if let Err(e) = self.memory.write_node(&updated) {
815 return PatchDispatchResult {
816 label: label_src.clone(),
817 patch_version: pv,
818 fitness_before,
819 fitness_after: fitness_before,
820 dispatched: false,
821 skip_reason: Some(PatchSkipReason::PersistFailed(e)),
822 adapter_output,
823 adapter_name,
824 };
825 }
826
827 self.hooks
828 .on_patch_dispatched(label_src.as_str(), fitness_after);
829
830 PatchDispatchResult {
831 label: label_src,
832 patch_version: pv,
833 fitness_before,
834 fitness_after,
835 dispatched: true,
836 skip_reason: None,
837 adapter_output,
838 adapter_name,
839 }
840 }
841}
842
843fn emit_target_name(n: &AinlMemoryNode) -> String {
844 match &n.node_type {
845 AinlNodeType::Persona { persona } => persona.trait_name.clone(),
846 AinlNodeType::Procedural { procedural } => procedural_label(procedural),
847 AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
848 AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
849 AinlNodeType::RuntimeState { runtime_state } => {
850 format!("runtime_state:{}", runtime_state.agent_id)
851 }
852 }
853}
854
855fn procedural_label(p: &ProceduralNode) -> String {
856 if !p.label.is_empty() {
857 p.label.clone()
858 } else {
859 p.pattern_name.clone()
860 }
861}
862
863fn fallback_high_recurrence_semantic(
864 all: Vec<AinlMemoryNode>,
865 limit: usize,
866) -> Vec<AinlMemoryNode> {
867 let mut v: Vec<_> = all
868 .into_iter()
869 .filter(|n| {
870 matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
871 })
872 .collect();
873 v.sort_by(|a, b| {
874 let ra = match &a.node_type {
875 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
876 _ => 0,
877 };
878 let rb = match &b.node_type {
879 AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
880 _ => 0,
881 };
882 rb.cmp(&ra)
883 });
884 v.into_iter().take(limit).collect()
885}
886
887fn persona_snapshot_if_evolved(
888 extractor: &GraphExtractorTask,
889) -> Option<ainl_persona::PersonaSnapshot> {
890 let snap = extractor.evolution_engine.snapshot();
891 let defaults = default_axis_map(0.5);
892 for axis in PersonaAxis::ALL {
893 let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
894 let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
895 if (s - d).abs() > INGEST_SCORE_EPSILON {
896 return Some(snap);
897 }
898 }
899 None
900}
901
902fn compile_persona_from_nodes(nodes: &[AinlMemoryNode]) -> Result<Option<String>, String> {
903 if nodes.is_empty() {
904 return Ok(None);
905 }
906 let mut lines = Vec::new();
907 for n in nodes {
908 if let AinlNodeType::Persona { persona } = &n.node_type {
909 lines.push(format_persona_line(persona));
910 }
911 }
912 if lines.is_empty() {
913 Ok(None)
914 } else {
915 Ok(Some(lines.join("\n")))
916 }
917}
918
919fn format_persona_line(p: &PersonaNode) -> String {
920 format!(
921 "- {} (strength {:.2}, layer {:?}, source {:?})",
922 p.trait_name, p.strength, p.layer, p.source
923 )
924}
925
926fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
929 if tools_invoked.is_empty() {
930 return vec!["turn".to_string()];
931 }
932 let tags = tag_tool_names(tools_invoked);
933 let mut seen: BTreeSet<String> = BTreeSet::new();
934 for t in tags {
935 if t.namespace == TagNamespace::Tool {
936 seen.insert(t.value);
937 }
938 }
939 if seen.is_empty() {
940 vec!["turn".to_string()]
941 } else {
942 seen.into_iter().collect()
943 }
944}
945
946fn record_turn_episode(
947 memory: &ainl_memory::GraphMemory,
948 agent_id: &str,
949 input: &TurnInput,
950 tools_invoked_canonical: &[String],
951) -> Result<Uuid, String> {
952 let turn_id = Uuid::new_v4();
953 let timestamp = chrono::Utc::now().timestamp();
954 let tools = tools_invoked_canonical.to_vec();
955 let mut node = AinlMemoryNode::new_episode(
956 turn_id,
957 timestamp,
958 tools.clone(),
959 None,
960 input.trace_event.clone(),
961 );
962 node.agent_id = agent_id.to_string();
963 if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
964 episodic.user_message = Some(input.user_message.clone());
965 episodic.tools_invoked = tools;
966 }
967 memory.write_node(&node)?;
968 Ok(node.id)
969}