1use crate::agent::{AgentConfig, ReActLoop, ReActStep, ToolSpec};
20use crate::error::AgentRuntimeError;
21use crate::graph::GraphStore;
22use crate::memory::{AgentId, EpisodicStore, WorkingMemory};
23use crate::metrics::RuntimeMetrics;
24use crate::orchestrator::BackpressureGuard;
25use serde::{Deserialize, Serialize};
26use std::marker::PhantomData;
27use std::sync::atomic::Ordering;
28use std::sync::Arc;
29use std::time::Instant;
30
31pub struct NeedsConfig;
35pub struct HasConfig;
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct AgentSession {
43 pub session_id: String,
45 pub agent_id: AgentId,
47 pub steps: Vec<ReActStep>,
49 pub memory_hits: usize,
51 pub graph_lookups: usize,
53 pub duration_ms: u64,
55}
56
57impl AgentSession {
58 pub fn step_count(&self) -> usize {
60 self.steps.len()
61 }
62
63 #[cfg(feature = "persistence")]
65 pub async fn save_checkpoint(
66 &self,
67 backend: &dyn crate::persistence::PersistenceBackend,
68 ) -> Result<(), AgentRuntimeError> {
69 let key = format!("session:{}", self.session_id);
70 let bytes = serde_json::to_vec(self)
71 .map_err(|e| AgentRuntimeError::Persistence(format!("serialize: {e}")))?;
72 backend.save(&key, &bytes).await
73 }
74
75 #[cfg(feature = "persistence")]
79 pub async fn load_checkpoint(
80 backend: &dyn crate::persistence::PersistenceBackend,
81 session_id: &str,
82 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
83 let key = format!("session:{session_id}");
84 match backend.load(&key).await? {
85 None => Ok(None),
86 Some(bytes) => {
87 let session = serde_json::from_slice(&bytes)
88 .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
89 Ok(Some(session))
90 }
91 }
92 }
93
94 #[cfg(feature = "persistence")]
98 pub async fn load_step_checkpoint(
99 backend: &dyn crate::persistence::PersistenceBackend,
100 session_id: &str,
101 step: usize,
102 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
103 let key = format!("session:{session_id}:step:{step}");
104 match backend.load(&key).await? {
105 None => Ok(None),
106 Some(bytes) => {
107 let session = serde_json::from_slice(&bytes)
108 .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
109 Ok(Some(session))
110 }
111 }
112 }
113}
114
115pub struct AgentRuntimeBuilder<S = NeedsConfig> {
131 memory: Option<EpisodicStore>,
132 working: Option<WorkingMemory>,
133 graph: Option<GraphStore>,
134 backpressure: Option<BackpressureGuard>,
135 agent_config: Option<AgentConfig>,
136 tools: Vec<Arc<ToolSpec>>,
137 metrics: Arc<RuntimeMetrics>,
138 #[cfg(feature = "persistence")]
139 checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
140 _state: PhantomData<S>,
141}
142
143impl std::fmt::Debug for AgentRuntimeBuilder<NeedsConfig> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("AgentRuntimeBuilder<NeedsConfig>")
146 .field("memory", &self.memory.is_some())
147 .field("working", &self.working.is_some())
148 .field("graph", &self.graph.is_some())
149 .field("backpressure", &self.backpressure.is_some())
150 .field("tools", &self.tools.len())
151 .finish()
152 }
153}
154
155impl std::fmt::Debug for AgentRuntimeBuilder<HasConfig> {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("AgentRuntimeBuilder<HasConfig>")
158 .field("memory", &self.memory.is_some())
159 .field("working", &self.working.is_some())
160 .field("graph", &self.graph.is_some())
161 .field("backpressure", &self.backpressure.is_some())
162 .field("agent_config", &self.agent_config.is_some())
163 .field("tools", &self.tools.len())
164 .finish()
165 }
166}
167
168impl Default for AgentRuntimeBuilder<NeedsConfig> {
169 fn default() -> Self {
170 Self {
171 memory: None,
172 working: None,
173 graph: None,
174 backpressure: None,
175 agent_config: None,
176 tools: Vec::new(),
177 metrics: RuntimeMetrics::new(),
178 #[cfg(feature = "persistence")]
179 checkpoint_backend: None,
180 _state: PhantomData,
181 }
182 }
183}
184
185impl<S> AgentRuntimeBuilder<S> {
187 pub fn with_memory(mut self, store: EpisodicStore) -> Self {
189 self.memory = Some(store);
190 self
191 }
192
193 pub fn with_working_memory(mut self, wm: WorkingMemory) -> Self {
195 self.working = Some(wm);
196 self
197 }
198
199 pub fn with_graph(mut self, graph: GraphStore) -> Self {
201 self.graph = Some(graph);
202 self
203 }
204
205 pub fn with_backpressure(mut self, guard: BackpressureGuard) -> Self {
207 self.backpressure = Some(guard);
208 self
209 }
210
211 pub fn register_tool(mut self, spec: ToolSpec) -> Self {
213 self.tools.push(Arc::new(spec));
214 self
215 }
216
217 pub fn with_metrics(mut self, metrics: Arc<RuntimeMetrics>) -> Self {
219 self.metrics = metrics;
220 self
221 }
222
223 #[cfg(feature = "persistence")]
225 pub fn with_checkpoint_backend(
226 mut self,
227 backend: Arc<dyn crate::persistence::PersistenceBackend>,
228 ) -> Self {
229 self.checkpoint_backend = Some(backend);
230 self
231 }
232}
233
234impl AgentRuntimeBuilder<NeedsConfig> {
236 pub fn new() -> Self {
238 Self::default()
239 }
240
241 pub fn with_agent_config(self, config: AgentConfig) -> AgentRuntimeBuilder<HasConfig> {
246 AgentRuntimeBuilder {
247 memory: self.memory,
248 working: self.working,
249 graph: self.graph,
250 backpressure: self.backpressure,
251 agent_config: Some(config),
252 tools: self.tools,
253 metrics: self.metrics,
254 #[cfg(feature = "persistence")]
255 checkpoint_backend: self.checkpoint_backend,
256 _state: PhantomData,
257 }
258 }
259}
260
261impl AgentRuntimeBuilder<HasConfig> {
263 pub fn build(self) -> AgentRuntime {
267 #[allow(clippy::unwrap_used)]
270 let agent_config = self.agent_config.unwrap();
271
272 AgentRuntime {
273 memory: self.memory,
274 working: self.working,
275 graph: self.graph,
276 backpressure: self.backpressure,
277 agent_config,
278 tools: self.tools,
279 metrics: self.metrics,
280 #[cfg(feature = "persistence")]
281 checkpoint_backend: self.checkpoint_backend,
282 }
283 }
284}
285
286pub struct AgentRuntime {
290 memory: Option<EpisodicStore>,
291 working: Option<WorkingMemory>,
292 graph: Option<GraphStore>,
293 backpressure: Option<BackpressureGuard>,
294 agent_config: AgentConfig,
295 tools: Vec<Arc<ToolSpec>>,
296 metrics: Arc<RuntimeMetrics>,
297 #[cfg(feature = "persistence")]
298 checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
299}
300
301impl std::fmt::Debug for AgentRuntime {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 let mut s = f.debug_struct("AgentRuntime");
304 s.field("memory", &self.memory.is_some())
305 .field("working", &self.working.is_some())
306 .field("graph", &self.graph.is_some())
307 .field("backpressure", &self.backpressure.is_some())
308 .field("tools", &self.tools.len());
309 #[cfg(feature = "persistence")]
310 s.field("checkpoint_backend", &self.checkpoint_backend.is_some());
311 s.finish()
312 }
313}
314
315impl AgentRuntime {
316 pub fn builder() -> AgentRuntimeBuilder<NeedsConfig> {
318 AgentRuntimeBuilder::new()
319 }
320
321 pub fn metrics(&self) -> Arc<RuntimeMetrics> {
323 Arc::clone(&self.metrics)
324 }
325
326 #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id))]
339 pub async fn run_agent<F, Fut>(
340 &self,
341 agent_id: AgentId,
342 prompt: &str,
343 infer: F,
344 ) -> Result<AgentSession, AgentRuntimeError>
345 where
346 F: FnMut(String) -> Fut,
347 Fut: std::future::Future<Output = String>,
348 {
349 self.metrics.total_sessions.fetch_add(1, Ordering::Relaxed);
350 self.metrics.active_sessions.fetch_add(1, Ordering::Relaxed);
351
352 let backpressure_result = if let Some(ref guard) = self.backpressure {
354 guard.try_acquire()
355 } else {
356 Ok(())
357 };
358
359 if let Err(e) = backpressure_result {
360 tracing::warn!(agent_id = %agent_id, error = %e, "backpressure shed: rejecting session");
361 self.metrics
362 .backpressure_shed_count
363 .fetch_add(1, Ordering::Relaxed);
364 self.metrics.active_sessions.fetch_sub(1, Ordering::Relaxed);
365 return Err(e);
366 }
367
368 tracing::info!(agent_id = %agent_id, "agent session starting");
369 let outcome = self.run_agent_inner(agent_id.clone(), prompt, infer).await;
370
371 if let Some(ref guard) = self.backpressure {
373 let _ = guard.release();
374 }
375
376 self.metrics.active_sessions.fetch_sub(1, Ordering::Relaxed);
377
378 match &outcome {
379 Ok(session) => {
380 tracing::info!(
381 agent_id = %agent_id,
382 session_id = %session.session_id,
383 steps = session.step_count(),
384 duration_ms = session.duration_ms,
385 "agent session completed"
386 );
387 self.metrics
388 .total_steps
389 .fetch_add(session.step_count() as u64, Ordering::Relaxed);
390 }
391 Err(e) => {
392 tracing::error!(agent_id = %agent_id, error = %e, "agent session failed");
393 }
394 }
395
396 outcome
397 }
398
399 #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id, session_id = tracing::field::Empty))]
401 async fn run_agent_inner<F, Fut>(
402 &self,
403 agent_id: AgentId,
404 prompt: &str,
405 infer: F,
406 ) -> Result<AgentSession, AgentRuntimeError>
407 where
408 F: FnMut(String) -> Fut,
409 Fut: std::future::Future<Output = String>,
410 {
411 let start = Instant::now();
412 let session_id = uuid::Uuid::new_v4().to_string();
413
414 let mut memory_hits = 0usize;
415 let mut graph_lookups = 0usize;
416
417 let enriched_prompt = if let Some(ref store) = self.memory {
419 let memories = store.recall(&agent_id, self.agent_config.max_memory_recalls)?;
420
421 let memories = if let Some(token_budget) = self.agent_config.max_memory_tokens {
423 let mut used = 0usize;
424 memories
425 .into_iter()
426 .filter(|m| {
427 let tokens = (m.content.len() / 4).max(1);
428 if used + tokens <= token_budget {
429 used += tokens;
430 true
431 } else {
432 false
433 }
434 })
435 .collect::<Vec<_>>()
436 } else {
437 memories
438 };
439
440 memory_hits = memories.len();
441 self.metrics
442 .memory_recall_count
443 .fetch_add(1, Ordering::Relaxed);
444
445 if let Some(budget) = self.agent_config.max_memory_tokens {
446 tracing::debug!(
447 "memory token budget: {budget}, injecting {} items",
448 memory_hits
449 );
450 } else {
451 tracing::debug!("enriched prompt with {} memory items", memory_hits);
452 }
453
454 if memories.is_empty() {
455 prompt.to_owned()
456 } else {
457 let mem_context: Vec<String> = memories
458 .iter()
459 .map(|m| format!("- {}", m.content))
460 .collect();
461 format!(
462 "Relevant memories:\n{}\n\nCurrent prompt: {prompt}",
463 mem_context.join("\n")
464 )
465 }
466 } else {
467 prompt.to_owned()
468 };
469
470 let enriched_prompt = if let Some(ref wm) = self.working {
472 let entries = wm.entries()?;
473 if entries.is_empty() {
474 enriched_prompt
475 } else {
476 let wm_context: Vec<String> =
477 entries.iter().map(|(k, v)| format!(" {k}: {v}")).collect();
478 format!(
479 "{enriched_prompt}\n\nCurrent working state:\n{}",
480 wm_context.join("\n")
481 )
482 }
483 } else {
484 enriched_prompt
485 };
486
487 if let Some(ref graph) = self.graph {
489 graph_lookups = graph.entity_count()?;
490 tracing::debug!("graph has {} entities", graph_lookups);
491 }
492
493 let mut react_loop = ReActLoop::new(self.agent_config.clone());
499 for tool in &self.tools {
500 let tool_arc = Arc::clone(tool);
501 let required_fields = tool_arc.required_fields.clone();
502 #[cfg(feature = "orchestrator")]
503 let circuit_breaker = tool_arc.circuit_breaker.clone();
504
505 let mut spec = ToolSpec::new_async(
506 tool_arc.name.clone(),
507 tool_arc.description.clone(),
508 move |args| {
509 let t = Arc::clone(&tool_arc);
510 Box::pin(async move { t.call(args).await })
511 },
512 )
513 .with_required_fields(required_fields);
514
515 #[cfg(feature = "orchestrator")]
516 if let Some(cb) = circuit_breaker {
517 spec = spec.with_circuit_breaker(cb);
518 }
519
520 react_loop.register_tool(spec);
521 }
522
523 tracing::Span::current().record("session_id", &session_id.as_str());
526
527 let steps = react_loop.run(&enriched_prompt, infer).await?;
528 let duration_ms = start.elapsed().as_millis() as u64;
529
530 let session = AgentSession {
531 session_id: session_id.clone(),
532 agent_id,
533 steps,
534 memory_hits,
535 graph_lookups,
536 duration_ms,
537 };
538
539 #[cfg(feature = "persistence")]
541 if let Some(ref backend) = self.checkpoint_backend {
542 tracing::info!(session_id = %session_id, "saving session checkpoint");
543 session.save_checkpoint(backend.as_ref()).await?;
544
545 for i in 1..=session.steps.len() {
547 let partial = AgentSession {
548 session_id: session_id.clone(),
549 agent_id: session.agent_id.clone(),
550 steps: session.steps[..i].to_vec(),
551 memory_hits: session.memory_hits,
552 graph_lookups: session.graph_lookups,
553 duration_ms: session.duration_ms,
554 };
555 let key = format!("session:{session_id}:step:{i}");
556 match serde_json::to_vec(&partial) {
557 Ok(bytes) => {
558 if let Err(e) = backend.save(&key, &bytes).await {
559 tracing::warn!(
560 session_id = %session_id,
561 step = i,
562 error = %e,
563 "failed to save step checkpoint"
564 );
565 }
566 }
567 Err(e) => {
568 tracing::warn!(
569 session_id = %session_id,
570 step = i,
571 error = %e,
572 "failed to serialize step checkpoint"
573 );
574 }
575 }
576 }
577 }
578
579 Ok(session)
580 }
581
582 pub fn memory(&self) -> Option<&EpisodicStore> {
584 self.memory.as_ref()
585 }
586
587 pub fn graph(&self) -> Option<&GraphStore> {
589 self.graph.as_ref()
590 }
591
592 pub fn working_memory(&self) -> Option<&WorkingMemory> {
594 self.working.as_ref()
595 }
596}
597
598#[cfg(test)]
601mod tests {
602 use super::*;
603 use crate::graph::{Entity, GraphStore, Relationship};
604 use crate::memory::EpisodicStore;
605
606 fn simple_config() -> AgentConfig {
607 AgentConfig::new(5, "test")
608 }
609
610 async fn final_answer_infer(_ctx: String) -> String {
611 "Thought: done\nAction: FINAL_ANSWER 42".into()
612 }
613
614 #[tokio::test]
625 async fn test_builder_with_config_compiles() {
626 let _runtime = AgentRuntime::builder()
627 .with_agent_config(simple_config())
628 .build();
629 }
631
632 #[tokio::test]
633 async fn test_builder_succeeds_with_minimal_config() {
634 let _runtime = AgentRuntime::builder()
635 .with_agent_config(simple_config())
636 .build();
637 }
638
639 #[tokio::test]
640 async fn test_builder_with_all_subsystems() {
641 let _runtime = AgentRuntime::builder()
642 .with_agent_config(simple_config())
643 .with_memory(EpisodicStore::new())
644 .with_graph(GraphStore::new())
645 .with_working_memory(WorkingMemory::new(10).unwrap())
646 .with_backpressure(BackpressureGuard::new(5).unwrap())
647 .build();
648 }
649
650 #[tokio::test]
651 async fn test_builder_produces_runtime_with_config() {
652 let runtime = AgentRuntime::builder()
655 .with_agent_config(simple_config())
656 .build();
657 let session = runtime
658 .run_agent(AgentId::new("agent-x"), "hello", final_answer_infer)
659 .await
660 .unwrap();
661 assert!(session.step_count() >= 1);
662 assert!(!session.session_id.is_empty());
663 }
664
665 #[tokio::test]
668 async fn test_run_agent_returns_session_with_steps() {
669 let runtime = AgentRuntime::builder()
670 .with_agent_config(simple_config())
671 .build();
672
673 let session = runtime
674 .run_agent(AgentId::new("agent-1"), "hello", final_answer_infer)
675 .await
676 .unwrap();
677
678 assert_eq!(session.step_count(), 1);
679 }
680
681 #[tokio::test]
682 async fn test_run_agent_session_has_agent_id() {
683 let runtime = AgentRuntime::builder()
684 .with_agent_config(simple_config())
685 .build();
686
687 let session = runtime
688 .run_agent(AgentId::new("agent-42"), "hello", final_answer_infer)
689 .await
690 .unwrap();
691
692 assert_eq!(session.agent_id.0, "agent-42");
693 }
694
695 #[tokio::test]
696 async fn test_run_agent_session_duration_is_set() {
697 let runtime = AgentRuntime::builder()
698 .with_agent_config(simple_config())
699 .build();
700
701 let session = runtime
702 .run_agent(AgentId::new("a"), "hello", final_answer_infer)
703 .await
704 .unwrap();
705
706 let _ = session.duration_ms; }
709
710 #[tokio::test]
711 async fn test_run_agent_session_has_session_id() {
712 let runtime = AgentRuntime::builder()
713 .with_agent_config(simple_config())
714 .build();
715
716 let session = runtime
717 .run_agent(AgentId::new("a"), "hello", final_answer_infer)
718 .await
719 .unwrap();
720
721 assert!(!session.session_id.is_empty());
723 assert_eq!(session.session_id.len(), 36); }
725
726 #[tokio::test]
727 async fn test_run_agent_memory_hits_zero_without_memory() {
728 let runtime = AgentRuntime::builder()
729 .with_agent_config(simple_config())
730 .build();
731
732 let session = runtime
733 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
734 .await
735 .unwrap();
736
737 assert_eq!(session.memory_hits, 0);
738 }
739
740 #[tokio::test]
741 async fn test_run_agent_memory_hits_counts_recalled_items() {
742 let store = EpisodicStore::new();
743 let agent = AgentId::new("mem-agent");
744 store
745 .add_episode(agent.clone(), "remembered fact", 0.8)
746 .unwrap();
747
748 let runtime = AgentRuntime::builder()
749 .with_agent_config(simple_config())
750 .with_memory(store)
751 .build();
752
753 let session = runtime
754 .run_agent(agent, "prompt", final_answer_infer)
755 .await
756 .unwrap();
757
758 assert_eq!(session.memory_hits, 1);
759 }
760
761 #[tokio::test]
762 async fn test_run_agent_graph_lookups_counts_entities() {
763 let graph = GraphStore::new();
764 graph.add_entity(Entity::new("e1", "Node")).unwrap();
765 graph.add_entity(Entity::new("e2", "Node")).unwrap();
766
767 let runtime = AgentRuntime::builder()
768 .with_agent_config(simple_config())
769 .with_graph(graph)
770 .build();
771
772 let session = runtime
773 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
774 .await
775 .unwrap();
776
777 assert_eq!(session.graph_lookups, 2);
778 }
779
780 #[tokio::test]
781 async fn test_run_agent_backpressure_released_after_run() {
782 let guard = BackpressureGuard::new(3).unwrap();
783
784 let runtime = AgentRuntime::builder()
785 .with_agent_config(simple_config())
786 .with_backpressure(guard.clone())
787 .build();
788
789 runtime
790 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
791 .await
792 .unwrap();
793
794 assert_eq!(guard.depth().unwrap(), 0);
795 }
796
797 #[tokio::test]
798 async fn test_run_agent_backpressure_sheds_when_full() {
799 let guard = BackpressureGuard::new(1).unwrap();
800 guard.try_acquire().unwrap(); let runtime = AgentRuntime::builder()
803 .with_agent_config(simple_config())
804 .with_backpressure(guard)
805 .build();
806
807 let result = runtime
808 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
809 .await;
810 assert!(matches!(
811 result,
812 Err(AgentRuntimeError::BackpressureShed { .. })
813 ));
814 }
815
816 #[tokio::test]
817 async fn test_run_agent_max_iterations_error_propagated() {
818 let cfg = AgentConfig::new(2, "model");
819 let runtime = AgentRuntime::builder().with_agent_config(cfg).build();
820
821 let result = runtime
823 .run_agent(AgentId::new("a"), "prompt", |_ctx: String| async {
824 "Thought: looping\nAction: FINAL_ANSWER done".to_string()
825 })
826 .await;
827 assert!(result.is_ok()); }
829
830 #[tokio::test]
831 async fn test_agent_session_step_count_matches_steps() {
832 let session = AgentSession {
833 session_id: "test-session-id".into(),
834 agent_id: AgentId::new("a"),
835 steps: vec![
836 ReActStep {
837 thought: "t".into(),
838 action: "a".into(),
839 observation: "o".into(),
840 },
841 ReActStep {
842 thought: "t2".into(),
843 action: "FINAL_ANSWER".into(),
844 observation: "done".into(),
845 },
846 ],
847 memory_hits: 0,
848 graph_lookups: 0,
849 duration_ms: 10,
850 };
851 assert_eq!(session.step_count(), 2);
852 }
853
854 #[tokio::test]
857 async fn test_runtime_memory_accessor_returns_none_when_not_configured() {
858 let runtime = AgentRuntime::builder()
859 .with_agent_config(simple_config())
860 .build();
861 assert!(runtime.memory().is_none());
862 }
863
864 #[tokio::test]
865 async fn test_runtime_memory_accessor_returns_some_when_configured() {
866 let runtime = AgentRuntime::builder()
867 .with_agent_config(simple_config())
868 .with_memory(EpisodicStore::new())
869 .build();
870 assert!(runtime.memory().is_some());
871 }
872
873 #[tokio::test]
874 async fn test_runtime_graph_accessor_returns_none_when_not_configured() {
875 let runtime = AgentRuntime::builder()
876 .with_agent_config(simple_config())
877 .build();
878 assert!(runtime.graph().is_none());
879 }
880
881 #[tokio::test]
882 async fn test_runtime_graph_accessor_returns_some_when_configured() {
883 let runtime = AgentRuntime::builder()
884 .with_agent_config(simple_config())
885 .with_graph(GraphStore::new())
886 .build();
887 assert!(runtime.graph().is_some());
888 }
889
890 #[tokio::test]
891 async fn test_runtime_working_memory_accessor() {
892 let runtime = AgentRuntime::builder()
893 .with_agent_config(simple_config())
894 .with_working_memory(WorkingMemory::new(5).unwrap())
895 .build();
896 assert!(runtime.working_memory().is_some());
897 }
898
899 #[tokio::test]
900 async fn test_runtime_with_tool_registered() {
901 let runtime = AgentRuntime::builder()
902 .with_agent_config(simple_config())
903 .register_tool(ToolSpec::new("calc", "math", |_| serde_json::json!(99)))
904 .build();
905
906 let mut call_count = 0;
907 let session = runtime
908 .run_agent(AgentId::new("a"), "compute", move |_ctx: String| {
909 call_count += 1;
910 let count = call_count;
911 async move {
912 if count == 1 {
913 "Thought: use calc\nAction: calc {}".into()
914 } else {
915 "Thought: done\nAction: FINAL_ANSWER result".into()
916 }
917 }
918 })
919 .await
920 .unwrap();
921
922 assert!(session.step_count() >= 1);
923 }
924
925 #[tokio::test]
926 async fn test_run_agent_with_graph_relationship_lookup() {
927 let graph = GraphStore::new();
928 graph.add_entity(Entity::new("a", "X")).unwrap();
929 graph.add_entity(Entity::new("b", "Y")).unwrap();
930 graph
931 .add_relationship(Relationship::new("a", "b", "LINKS", 1.0))
932 .unwrap();
933
934 let runtime = AgentRuntime::builder()
935 .with_agent_config(simple_config())
936 .with_graph(graph)
937 .build();
938
939 let session = runtime
940 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
941 .await
942 .unwrap();
943
944 assert_eq!(session.graph_lookups, 2); }
946
947 #[tokio::test]
950 async fn test_metrics_active_sessions_decrements_after_run() {
951 let runtime = AgentRuntime::builder()
952 .with_agent_config(simple_config())
953 .build();
954
955 runtime
956 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
957 .await
958 .unwrap();
959
960 assert_eq!(runtime.metrics().active_sessions(), 0);
961 }
962
963 #[tokio::test]
964 async fn test_metrics_total_sessions_increments() {
965 let runtime = AgentRuntime::builder()
966 .with_agent_config(simple_config())
967 .build();
968
969 runtime
970 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
971 .await
972 .unwrap();
973 runtime
974 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
975 .await
976 .unwrap();
977
978 assert_eq!(runtime.metrics().total_sessions(), 2);
979 }
980
981 #[tokio::test]
982 async fn test_metrics_backpressure_shed_increments_on_shed() {
983 let guard = BackpressureGuard::new(1).unwrap();
984 guard.try_acquire().unwrap(); let runtime = AgentRuntime::builder()
987 .with_agent_config(simple_config())
988 .with_backpressure(guard)
989 .build();
990
991 let _ = runtime
992 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
993 .await;
994
995 assert_eq!(runtime.metrics().backpressure_shed_count(), 1);
996 }
997
998 #[tokio::test]
999 async fn test_metrics_memory_recall_count_increments() {
1000 let store = EpisodicStore::new();
1001 let agent = AgentId::new("a");
1002 store.add_episode(agent.clone(), "fact", 0.9).unwrap();
1003
1004 let runtime = AgentRuntime::builder()
1005 .with_agent_config(simple_config())
1006 .with_memory(store)
1007 .build();
1008
1009 runtime
1010 .run_agent(agent, "prompt", final_answer_infer)
1011 .await
1012 .unwrap();
1013
1014 assert_eq!(runtime.metrics().memory_recall_count(), 1);
1015 }
1016
1017 #[tokio::test]
1020 async fn test_agent_config_max_memory_tokens_limits_injection() {
1021 let store = EpisodicStore::new();
1022 let agent = AgentId::new("budget-agent");
1023 for i in 0..5 {
1025 let content = format!("{:0>100}", i); store.add_episode(agent.clone(), content, 0.9).unwrap();
1027 }
1028
1029 let cfg = AgentConfig::new(5, "test").with_max_memory_tokens(10);
1031 let runtime = AgentRuntime::builder()
1032 .with_agent_config(cfg)
1033 .with_memory(store)
1034 .build();
1035
1036 let session = runtime
1037 .run_agent(agent, "prompt", final_answer_infer)
1038 .await
1039 .unwrap();
1040
1041 assert!(
1042 session.memory_hits <= 1,
1043 "expected at most 1 memory hit with tight token budget, got {}",
1044 session.memory_hits
1045 );
1046 }
1047
1048 #[tokio::test]
1051 async fn test_working_memory_injected_into_prompt() {
1052 let wm = WorkingMemory::new(10).unwrap();
1053 wm.set("task", "write tests").unwrap();
1054 wm.set("status", "in progress").unwrap();
1055
1056 let runtime = AgentRuntime::builder()
1057 .with_agent_config(simple_config())
1058 .with_working_memory(wm)
1059 .build();
1060
1061 let mut captured_ctx: Option<String> = None;
1062 let captured_ref = &mut captured_ctx;
1063
1064 runtime
1065 .run_agent(AgentId::new("a"), "do stuff", |ctx: String| {
1066 *captured_ref = Some(ctx.clone());
1067 async move { "Thought: done\nAction: FINAL_ANSWER ok".to_string() }
1068 })
1069 .await
1070 .unwrap();
1071
1072 let ctx = captured_ctx.expect("infer should have been called");
1073 assert!(
1074 ctx.contains("Current working state:"),
1075 "expected working memory injection in context, got: {ctx}"
1076 );
1077 assert!(ctx.contains("task: write tests"));
1078 assert!(ctx.contains("status: in progress"));
1079 }
1080}