1use crate::agent::{AgentConfig, ReActLoop, ReActStep, ToolSpec};
20use crate::error::AgentRuntimeError;
21use crate::graph::GraphStore;
22use crate::memory::{AgentId, EpisodicStore, WorkingMemory};
23use crate::metrics::RuntimeMetrics;
24use crate::orchestrator::BackpressureGuard;
25use serde::{Deserialize, Serialize};
26use std::marker::PhantomData;
27use std::sync::atomic::Ordering;
28use std::sync::Arc;
29use std::time::Instant;
30
31pub struct NeedsConfig;
35pub struct HasConfig;
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct AgentSession {
43 pub session_id: String,
45 pub agent_id: AgentId,
47 pub steps: Vec<ReActStep>,
49 pub memory_hits: usize,
51 pub graph_lookups: usize,
53 pub duration_ms: u64,
55}
56
57impl AgentSession {
58 pub fn step_count(&self) -> usize {
60 self.steps.len()
61 }
62
63 #[cfg(feature = "persistence")]
65 pub async fn save_checkpoint(
66 &self,
67 backend: &dyn crate::persistence::PersistenceBackend,
68 ) -> Result<(), AgentRuntimeError> {
69 let key = format!("session:{}", self.session_id);
70 let bytes = serde_json::to_vec(self)
71 .map_err(|e| AgentRuntimeError::Persistence(format!("serialize: {e}")))?;
72 backend.save(&key, &bytes).await
73 }
74
75 #[cfg(feature = "persistence")]
79 pub async fn load_checkpoint(
80 backend: &dyn crate::persistence::PersistenceBackend,
81 session_id: &str,
82 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
83 let key = format!("session:{session_id}");
84 match backend.load(&key).await? {
85 None => Ok(None),
86 Some(bytes) => {
87 let session = serde_json::from_slice(&bytes)
88 .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
89 Ok(Some(session))
90 }
91 }
92 }
93
94 #[cfg(feature = "persistence")]
98 pub async fn load_step_checkpoint(
99 backend: &dyn crate::persistence::PersistenceBackend,
100 session_id: &str,
101 step: usize,
102 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
103 let key = format!("session:{session_id}:step:{step}");
104 match backend.load(&key).await? {
105 None => Ok(None),
106 Some(bytes) => {
107 let session = serde_json::from_slice(&bytes)
108 .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
109 Ok(Some(session))
110 }
111 }
112 }
113}
114
115pub struct AgentRuntimeBuilder<S = NeedsConfig> {
131 memory: Option<EpisodicStore>,
132 working: Option<WorkingMemory>,
133 graph: Option<GraphStore>,
134 backpressure: Option<BackpressureGuard>,
135 agent_config: Option<AgentConfig>,
136 tools: Vec<Arc<ToolSpec>>,
137 metrics: Arc<RuntimeMetrics>,
138 #[cfg(feature = "persistence")]
139 checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
140 _state: PhantomData<S>,
141}
142
143impl std::fmt::Debug for AgentRuntimeBuilder<NeedsConfig> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("AgentRuntimeBuilder<NeedsConfig>")
146 .field("memory", &self.memory.is_some())
147 .field("working", &self.working.is_some())
148 .field("graph", &self.graph.is_some())
149 .field("backpressure", &self.backpressure.is_some())
150 .field("tools", &self.tools.len())
151 .finish()
152 }
153}
154
155impl std::fmt::Debug for AgentRuntimeBuilder<HasConfig> {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("AgentRuntimeBuilder<HasConfig>")
158 .field("memory", &self.memory.is_some())
159 .field("working", &self.working.is_some())
160 .field("graph", &self.graph.is_some())
161 .field("backpressure", &self.backpressure.is_some())
162 .field("agent_config", &self.agent_config.is_some())
163 .field("tools", &self.tools.len())
164 .finish()
165 }
166}
167
168impl Default for AgentRuntimeBuilder<NeedsConfig> {
169 fn default() -> Self {
170 Self {
171 memory: None,
172 working: None,
173 graph: None,
174 backpressure: None,
175 agent_config: None,
176 tools: Vec::new(),
177 metrics: RuntimeMetrics::new(),
178 #[cfg(feature = "persistence")]
179 checkpoint_backend: None,
180 _state: PhantomData,
181 }
182 }
183}
184
185impl<S> AgentRuntimeBuilder<S> {
187 pub fn with_memory(mut self, store: EpisodicStore) -> Self {
189 self.memory = Some(store);
190 self
191 }
192
193 pub fn with_working_memory(mut self, wm: WorkingMemory) -> Self {
195 self.working = Some(wm);
196 self
197 }
198
199 pub fn with_graph(mut self, graph: GraphStore) -> Self {
201 self.graph = Some(graph);
202 self
203 }
204
205 pub fn with_backpressure(mut self, guard: BackpressureGuard) -> Self {
207 self.backpressure = Some(guard);
208 self
209 }
210
211 pub fn register_tool(mut self, spec: ToolSpec) -> Self {
213 self.tools.push(Arc::new(spec));
214 self
215 }
216
217 pub fn with_metrics(mut self, metrics: Arc<RuntimeMetrics>) -> Self {
219 self.metrics = metrics;
220 self
221 }
222
223 #[cfg(feature = "persistence")]
225 pub fn with_checkpoint_backend(
226 mut self,
227 backend: Arc<dyn crate::persistence::PersistenceBackend>,
228 ) -> Self {
229 self.checkpoint_backend = Some(backend);
230 self
231 }
232}
233
234impl AgentRuntimeBuilder<NeedsConfig> {
236 pub fn new() -> Self {
238 Self::default()
239 }
240
241 pub fn with_agent_config(self, config: AgentConfig) -> AgentRuntimeBuilder<HasConfig> {
246 AgentRuntimeBuilder {
247 memory: self.memory,
248 working: self.working,
249 graph: self.graph,
250 backpressure: self.backpressure,
251 agent_config: Some(config),
252 tools: self.tools,
253 metrics: self.metrics,
254 #[cfg(feature = "persistence")]
255 checkpoint_backend: self.checkpoint_backend,
256 _state: PhantomData,
257 }
258 }
259}
260
261impl AgentRuntimeBuilder<HasConfig> {
263 pub fn build(self) -> AgentRuntime {
267 #[allow(clippy::unwrap_used)]
270 let agent_config = self.agent_config.unwrap();
271
272 AgentRuntime {
273 memory: self.memory,
274 working: self.working,
275 graph: self.graph,
276 backpressure: self.backpressure,
277 agent_config,
278 tools: self.tools,
279 metrics: self.metrics,
280 #[cfg(feature = "persistence")]
281 checkpoint_backend: self.checkpoint_backend,
282 }
283 }
284}
285
286pub struct AgentRuntime {
290 memory: Option<EpisodicStore>,
291 working: Option<WorkingMemory>,
292 graph: Option<GraphStore>,
293 backpressure: Option<BackpressureGuard>,
294 agent_config: AgentConfig,
295 tools: Vec<Arc<ToolSpec>>,
296 metrics: Arc<RuntimeMetrics>,
297 #[cfg(feature = "persistence")]
298 checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
299}
300
301impl std::fmt::Debug for AgentRuntime {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 let mut s = f.debug_struct("AgentRuntime");
304 s.field("memory", &self.memory.is_some())
305 .field("working", &self.working.is_some())
306 .field("graph", &self.graph.is_some())
307 .field("backpressure", &self.backpressure.is_some())
308 .field("tools", &self.tools.len());
309 #[cfg(feature = "persistence")]
310 s.field("checkpoint_backend", &self.checkpoint_backend.is_some());
311 s.finish()
312 }
313}
314
315impl AgentRuntime {
316 pub fn builder() -> AgentRuntimeBuilder<NeedsConfig> {
318 AgentRuntimeBuilder::new()
319 }
320
321 pub fn metrics(&self) -> Arc<RuntimeMetrics> {
323 Arc::clone(&self.metrics)
324 }
325
326 #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id))]
339 pub async fn run_agent<F, Fut>(
340 &self,
341 agent_id: AgentId,
342 prompt: &str,
343 infer: F,
344 ) -> Result<AgentSession, AgentRuntimeError>
345 where
346 F: FnMut(String) -> Fut,
347 Fut: std::future::Future<Output = String>,
348 {
349 self.metrics.total_sessions.fetch_add(1, Ordering::Relaxed);
350 self.metrics.active_sessions.fetch_add(1, Ordering::Relaxed);
351
352 let backpressure_result = if let Some(ref guard) = self.backpressure {
354 guard.try_acquire()
355 } else {
356 Ok(())
357 };
358
359 if let Err(e) = backpressure_result {
360 tracing::warn!(agent_id = %agent_id, error = %e, "backpressure shed: rejecting session");
361 self.metrics
362 .backpressure_shed_count
363 .fetch_add(1, Ordering::Relaxed);
364 self.metrics.active_sessions.fetch_sub(1, Ordering::Relaxed);
365 return Err(e);
366 }
367
368 tracing::info!(agent_id = %agent_id, "agent session starting");
369 let outcome = self.run_agent_inner(agent_id.clone(), prompt, infer).await;
370
371 if let Some(ref guard) = self.backpressure {
373 let _ = guard.release();
374 }
375
376 self.metrics.active_sessions.fetch_sub(1, Ordering::Relaxed);
377
378 match &outcome {
379 Ok(session) => {
380 tracing::info!(
381 agent_id = %agent_id,
382 session_id = %session.session_id,
383 steps = session.step_count(),
384 duration_ms = session.duration_ms,
385 "agent session completed"
386 );
387 self.metrics
388 .total_steps
389 .fetch_add(session.step_count() as u64, Ordering::Relaxed);
390 }
391 Err(e) => {
392 tracing::error!(agent_id = %agent_id, error = %e, "agent session failed");
393 }
394 }
395
396 outcome
397 }
398
399 #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id, session_id = tracing::field::Empty))]
401 async fn run_agent_inner<F, Fut>(
402 &self,
403 agent_id: AgentId,
404 prompt: &str,
405 infer: F,
406 ) -> Result<AgentSession, AgentRuntimeError>
407 where
408 F: FnMut(String) -> Fut,
409 Fut: std::future::Future<Output = String>,
410 {
411 let start = Instant::now();
412 let session_id = uuid::Uuid::new_v4().to_string();
413
414 let mut memory_hits = 0usize;
415 let mut graph_lookups = 0usize;
416
417 let enriched_prompt = if let Some(ref store) = self.memory {
419 let memories = store.recall(&agent_id, self.agent_config.max_memory_recalls)?;
420
421 let memories = if let Some(token_budget) = self.agent_config.max_memory_tokens {
423 let mut used = 0usize;
424 memories
425 .into_iter()
426 .filter(|m| {
427 let tokens = (m.content.len() / 4).max(1);
428 if used + tokens <= token_budget {
429 used += tokens;
430 true
431 } else {
432 false
433 }
434 })
435 .collect::<Vec<_>>()
436 } else {
437 memories
438 };
439
440 memory_hits = memories.len();
441 self.metrics
442 .memory_recall_count
443 .fetch_add(1, Ordering::Relaxed);
444
445 if let Some(budget) = self.agent_config.max_memory_tokens {
446 tracing::debug!(
447 "memory token budget: {budget}, injecting {} items",
448 memory_hits
449 );
450 } else {
451 tracing::debug!("enriched prompt with {} memory items", memory_hits);
452 }
453
454 if memories.is_empty() {
455 prompt.to_owned()
456 } else {
457 let mem_context: Vec<String> = memories
458 .iter()
459 .map(|m| format!("- {}", m.content))
460 .collect();
461 format!(
462 "Relevant memories:\n{}\n\nCurrent prompt: {prompt}",
463 mem_context.join("\n")
464 )
465 }
466 } else {
467 prompt.to_owned()
468 };
469
470 let enriched_prompt = if let Some(ref wm) = self.working {
472 let entries = wm.entries()?;
473 if entries.is_empty() {
474 enriched_prompt
475 } else {
476 let wm_context: Vec<String> = entries
477 .iter()
478 .map(|(k, v)| format!(" {k}: {v}"))
479 .collect();
480 format!(
481 "{enriched_prompt}\n\nCurrent working state:\n{}",
482 wm_context.join("\n")
483 )
484 }
485 } else {
486 enriched_prompt
487 };
488
489 if let Some(ref graph) = self.graph {
491 graph_lookups = graph.entity_count()?;
492 tracing::debug!("graph has {} entities", graph_lookups);
493 }
494
495 let mut react_loop = ReActLoop::new(self.agent_config.clone());
501 for tool in &self.tools {
502 let tool_arc = Arc::clone(tool);
503 let required_fields = tool_arc.required_fields.clone();
504 #[cfg(feature = "orchestrator")]
505 let circuit_breaker = tool_arc.circuit_breaker.clone();
506
507 let mut spec = ToolSpec::new_async(
508 tool_arc.name.clone(),
509 tool_arc.description.clone(),
510 move |args| {
511 let t = Arc::clone(&tool_arc);
512 Box::pin(async move { t.call(args).await })
513 },
514 )
515 .with_required_fields(required_fields);
516
517 #[cfg(feature = "orchestrator")]
518 if let Some(cb) = circuit_breaker {
519 spec = spec.with_circuit_breaker(cb);
520 }
521
522 react_loop.register_tool(spec);
523 }
524
525 tracing::Span::current().record("session_id", &session_id.as_str());
528
529 let steps = react_loop.run(&enriched_prompt, infer).await?;
530 let duration_ms = start.elapsed().as_millis() as u64;
531
532 let session = AgentSession {
533 session_id: session_id.clone(),
534 agent_id,
535 steps,
536 memory_hits,
537 graph_lookups,
538 duration_ms,
539 };
540
541 #[cfg(feature = "persistence")]
543 if let Some(ref backend) = self.checkpoint_backend {
544 tracing::info!(session_id = %session_id, "saving session checkpoint");
545 session.save_checkpoint(backend.as_ref()).await?;
546
547 for i in 1..=session.steps.len() {
549 let partial = AgentSession {
550 session_id: session_id.clone(),
551 agent_id: session.agent_id.clone(),
552 steps: session.steps[..i].to_vec(),
553 memory_hits: session.memory_hits,
554 graph_lookups: session.graph_lookups,
555 duration_ms: session.duration_ms,
556 };
557 let key = format!("session:{session_id}:step:{i}");
558 match serde_json::to_vec(&partial) {
559 Ok(bytes) => {
560 if let Err(e) = backend.save(&key, &bytes).await {
561 tracing::warn!(
562 session_id = %session_id,
563 step = i,
564 error = %e,
565 "failed to save step checkpoint"
566 );
567 }
568 }
569 Err(e) => {
570 tracing::warn!(
571 session_id = %session_id,
572 step = i,
573 error = %e,
574 "failed to serialize step checkpoint"
575 );
576 }
577 }
578 }
579 }
580
581 Ok(session)
582 }
583
584 pub fn memory(&self) -> Option<&EpisodicStore> {
586 self.memory.as_ref()
587 }
588
589 pub fn graph(&self) -> Option<&GraphStore> {
591 self.graph.as_ref()
592 }
593
594 pub fn working_memory(&self) -> Option<&WorkingMemory> {
596 self.working.as_ref()
597 }
598}
599
600#[cfg(test)]
603mod tests {
604 use super::*;
605 use crate::graph::{Entity, GraphStore, Relationship};
606 use crate::memory::EpisodicStore;
607
608 fn simple_config() -> AgentConfig {
609 AgentConfig::new(5, "test")
610 }
611
612 async fn final_answer_infer(_ctx: String) -> String {
613 "Thought: done\nAction: FINAL_ANSWER 42".into()
614 }
615
616 #[tokio::test]
627 async fn test_builder_with_config_compiles() {
628 let _runtime = AgentRuntime::builder()
629 .with_agent_config(simple_config())
630 .build();
631 }
633
634 #[tokio::test]
635 async fn test_builder_succeeds_with_minimal_config() {
636 let _runtime = AgentRuntime::builder()
637 .with_agent_config(simple_config())
638 .build();
639 }
640
641 #[tokio::test]
642 async fn test_builder_with_all_subsystems() {
643 let _runtime = AgentRuntime::builder()
644 .with_agent_config(simple_config())
645 .with_memory(EpisodicStore::new())
646 .with_graph(GraphStore::new())
647 .with_working_memory(WorkingMemory::new(10).unwrap())
648 .with_backpressure(BackpressureGuard::new(5).unwrap())
649 .build();
650 }
651
652 #[tokio::test]
653 async fn test_builder_produces_runtime_with_config() {
654 let runtime = AgentRuntime::builder()
657 .with_agent_config(simple_config())
658 .build();
659 let session = runtime
660 .run_agent(AgentId::new("agent-x"), "hello", final_answer_infer)
661 .await
662 .unwrap();
663 assert!(session.step_count() >= 1);
664 assert!(!session.session_id.is_empty());
665 }
666
667 #[tokio::test]
670 async fn test_run_agent_returns_session_with_steps() {
671 let runtime = AgentRuntime::builder()
672 .with_agent_config(simple_config())
673 .build();
674
675 let session = runtime
676 .run_agent(AgentId::new("agent-1"), "hello", final_answer_infer)
677 .await
678 .unwrap();
679
680 assert_eq!(session.step_count(), 1);
681 }
682
683 #[tokio::test]
684 async fn test_run_agent_session_has_agent_id() {
685 let runtime = AgentRuntime::builder()
686 .with_agent_config(simple_config())
687 .build();
688
689 let session = runtime
690 .run_agent(AgentId::new("agent-42"), "hello", final_answer_infer)
691 .await
692 .unwrap();
693
694 assert_eq!(session.agent_id.0, "agent-42");
695 }
696
697 #[tokio::test]
698 async fn test_run_agent_session_duration_is_set() {
699 let runtime = AgentRuntime::builder()
700 .with_agent_config(simple_config())
701 .build();
702
703 let session = runtime
704 .run_agent(AgentId::new("a"), "hello", final_answer_infer)
705 .await
706 .unwrap();
707
708 let _ = session.duration_ms; }
711
712 #[tokio::test]
713 async fn test_run_agent_session_has_session_id() {
714 let runtime = AgentRuntime::builder()
715 .with_agent_config(simple_config())
716 .build();
717
718 let session = runtime
719 .run_agent(AgentId::new("a"), "hello", final_answer_infer)
720 .await
721 .unwrap();
722
723 assert!(!session.session_id.is_empty());
725 assert_eq!(session.session_id.len(), 36); }
727
728 #[tokio::test]
729 async fn test_run_agent_memory_hits_zero_without_memory() {
730 let runtime = AgentRuntime::builder()
731 .with_agent_config(simple_config())
732 .build();
733
734 let session = runtime
735 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
736 .await
737 .unwrap();
738
739 assert_eq!(session.memory_hits, 0);
740 }
741
742 #[tokio::test]
743 async fn test_run_agent_memory_hits_counts_recalled_items() {
744 let store = EpisodicStore::new();
745 let agent = AgentId::new("mem-agent");
746 store
747 .add_episode(agent.clone(), "remembered fact", 0.8)
748 .unwrap();
749
750 let runtime = AgentRuntime::builder()
751 .with_agent_config(simple_config())
752 .with_memory(store)
753 .build();
754
755 let session = runtime
756 .run_agent(agent, "prompt", final_answer_infer)
757 .await
758 .unwrap();
759
760 assert_eq!(session.memory_hits, 1);
761 }
762
763 #[tokio::test]
764 async fn test_run_agent_graph_lookups_counts_entities() {
765 let graph = GraphStore::new();
766 graph.add_entity(Entity::new("e1", "Node")).unwrap();
767 graph.add_entity(Entity::new("e2", "Node")).unwrap();
768
769 let runtime = AgentRuntime::builder()
770 .with_agent_config(simple_config())
771 .with_graph(graph)
772 .build();
773
774 let session = runtime
775 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
776 .await
777 .unwrap();
778
779 assert_eq!(session.graph_lookups, 2);
780 }
781
782 #[tokio::test]
783 async fn test_run_agent_backpressure_released_after_run() {
784 let guard = BackpressureGuard::new(3).unwrap();
785
786 let runtime = AgentRuntime::builder()
787 .with_agent_config(simple_config())
788 .with_backpressure(guard.clone())
789 .build();
790
791 runtime
792 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
793 .await
794 .unwrap();
795
796 assert_eq!(guard.depth().unwrap(), 0);
797 }
798
799 #[tokio::test]
800 async fn test_run_agent_backpressure_sheds_when_full() {
801 let guard = BackpressureGuard::new(1).unwrap();
802 guard.try_acquire().unwrap(); let runtime = AgentRuntime::builder()
805 .with_agent_config(simple_config())
806 .with_backpressure(guard)
807 .build();
808
809 let result = runtime
810 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
811 .await;
812 assert!(matches!(
813 result,
814 Err(AgentRuntimeError::BackpressureShed { .. })
815 ));
816 }
817
818 #[tokio::test]
819 async fn test_run_agent_max_iterations_error_propagated() {
820 let cfg = AgentConfig::new(2, "model");
821 let runtime = AgentRuntime::builder().with_agent_config(cfg).build();
822
823 let result = runtime
825 .run_agent(AgentId::new("a"), "prompt", |_ctx: String| async {
826 "Thought: looping\nAction: FINAL_ANSWER done".to_string()
827 })
828 .await;
829 assert!(result.is_ok()); }
831
832 #[tokio::test]
833 async fn test_agent_session_step_count_matches_steps() {
834 let session = AgentSession {
835 session_id: "test-session-id".into(),
836 agent_id: AgentId::new("a"),
837 steps: vec![
838 ReActStep {
839 thought: "t".into(),
840 action: "a".into(),
841 observation: "o".into(),
842 },
843 ReActStep {
844 thought: "t2".into(),
845 action: "FINAL_ANSWER".into(),
846 observation: "done".into(),
847 },
848 ],
849 memory_hits: 0,
850 graph_lookups: 0,
851 duration_ms: 10,
852 };
853 assert_eq!(session.step_count(), 2);
854 }
855
856 #[tokio::test]
859 async fn test_runtime_memory_accessor_returns_none_when_not_configured() {
860 let runtime = AgentRuntime::builder()
861 .with_agent_config(simple_config())
862 .build();
863 assert!(runtime.memory().is_none());
864 }
865
866 #[tokio::test]
867 async fn test_runtime_memory_accessor_returns_some_when_configured() {
868 let runtime = AgentRuntime::builder()
869 .with_agent_config(simple_config())
870 .with_memory(EpisodicStore::new())
871 .build();
872 assert!(runtime.memory().is_some());
873 }
874
875 #[tokio::test]
876 async fn test_runtime_graph_accessor_returns_none_when_not_configured() {
877 let runtime = AgentRuntime::builder()
878 .with_agent_config(simple_config())
879 .build();
880 assert!(runtime.graph().is_none());
881 }
882
883 #[tokio::test]
884 async fn test_runtime_graph_accessor_returns_some_when_configured() {
885 let runtime = AgentRuntime::builder()
886 .with_agent_config(simple_config())
887 .with_graph(GraphStore::new())
888 .build();
889 assert!(runtime.graph().is_some());
890 }
891
892 #[tokio::test]
893 async fn test_runtime_working_memory_accessor() {
894 let runtime = AgentRuntime::builder()
895 .with_agent_config(simple_config())
896 .with_working_memory(WorkingMemory::new(5).unwrap())
897 .build();
898 assert!(runtime.working_memory().is_some());
899 }
900
901 #[tokio::test]
902 async fn test_runtime_with_tool_registered() {
903 let runtime = AgentRuntime::builder()
904 .with_agent_config(simple_config())
905 .register_tool(ToolSpec::new("calc", "math", |_| serde_json::json!(99)))
906 .build();
907
908 let mut call_count = 0;
909 let session = runtime
910 .run_agent(AgentId::new("a"), "compute", move |_ctx: String| {
911 call_count += 1;
912 let count = call_count;
913 async move {
914 if count == 1 {
915 "Thought: use calc\nAction: calc {}".into()
916 } else {
917 "Thought: done\nAction: FINAL_ANSWER result".into()
918 }
919 }
920 })
921 .await
922 .unwrap();
923
924 assert!(session.step_count() >= 1);
925 }
926
927 #[tokio::test]
928 async fn test_run_agent_with_graph_relationship_lookup() {
929 let graph = GraphStore::new();
930 graph.add_entity(Entity::new("a", "X")).unwrap();
931 graph.add_entity(Entity::new("b", "Y")).unwrap();
932 graph
933 .add_relationship(Relationship::new("a", "b", "LINKS", 1.0))
934 .unwrap();
935
936 let runtime = AgentRuntime::builder()
937 .with_agent_config(simple_config())
938 .with_graph(graph)
939 .build();
940
941 let session = runtime
942 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
943 .await
944 .unwrap();
945
946 assert_eq!(session.graph_lookups, 2); }
948
949 #[tokio::test]
952 async fn test_metrics_active_sessions_decrements_after_run() {
953 let runtime = AgentRuntime::builder()
954 .with_agent_config(simple_config())
955 .build();
956
957 runtime
958 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
959 .await
960 .unwrap();
961
962 assert_eq!(runtime.metrics().active_sessions(), 0);
963 }
964
965 #[tokio::test]
966 async fn test_metrics_total_sessions_increments() {
967 let runtime = AgentRuntime::builder()
968 .with_agent_config(simple_config())
969 .build();
970
971 runtime
972 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
973 .await
974 .unwrap();
975 runtime
976 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
977 .await
978 .unwrap();
979
980 assert_eq!(runtime.metrics().total_sessions(), 2);
981 }
982
983 #[tokio::test]
984 async fn test_metrics_backpressure_shed_increments_on_shed() {
985 let guard = BackpressureGuard::new(1).unwrap();
986 guard.try_acquire().unwrap(); let runtime = AgentRuntime::builder()
989 .with_agent_config(simple_config())
990 .with_backpressure(guard)
991 .build();
992
993 let _ = runtime
994 .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
995 .await;
996
997 assert_eq!(runtime.metrics().backpressure_shed_count(), 1);
998 }
999
1000 #[tokio::test]
1001 async fn test_metrics_memory_recall_count_increments() {
1002 let store = EpisodicStore::new();
1003 let agent = AgentId::new("a");
1004 store.add_episode(agent.clone(), "fact", 0.9).unwrap();
1005
1006 let runtime = AgentRuntime::builder()
1007 .with_agent_config(simple_config())
1008 .with_memory(store)
1009 .build();
1010
1011 runtime
1012 .run_agent(agent, "prompt", final_answer_infer)
1013 .await
1014 .unwrap();
1015
1016 assert_eq!(runtime.metrics().memory_recall_count(), 1);
1017 }
1018
1019 #[tokio::test]
1022 async fn test_agent_config_max_memory_tokens_limits_injection() {
1023 let store = EpisodicStore::new();
1024 let agent = AgentId::new("budget-agent");
1025 for i in 0..5 {
1027 let content = format!("{:0>100}", i); store.add_episode(agent.clone(), content, 0.9).unwrap();
1029 }
1030
1031 let cfg = AgentConfig::new(5, "test").with_max_memory_tokens(10);
1033 let runtime = AgentRuntime::builder()
1034 .with_agent_config(cfg)
1035 .with_memory(store)
1036 .build();
1037
1038 let session = runtime
1039 .run_agent(agent, "prompt", final_answer_infer)
1040 .await
1041 .unwrap();
1042
1043 assert!(
1044 session.memory_hits <= 1,
1045 "expected at most 1 memory hit with tight token budget, got {}",
1046 session.memory_hits
1047 );
1048 }
1049
1050 #[tokio::test]
1053 async fn test_working_memory_injected_into_prompt() {
1054 let wm = WorkingMemory::new(10).unwrap();
1055 wm.set("task", "write tests").unwrap();
1056 wm.set("status", "in progress").unwrap();
1057
1058 let runtime = AgentRuntime::builder()
1059 .with_agent_config(simple_config())
1060 .with_working_memory(wm)
1061 .build();
1062
1063 let mut captured_ctx: Option<String> = None;
1064 let captured_ref = &mut captured_ctx;
1065
1066 runtime
1067 .run_agent(AgentId::new("a"), "do stuff", |ctx: String| {
1068 *captured_ref = Some(ctx.clone());
1069 async move { "Thought: done\nAction: FINAL_ANSWER ok".to_string() }
1070 })
1071 .await
1072 .unwrap();
1073
1074 let ctx = captured_ctx.expect("infer should have been called");
1075 assert!(
1076 ctx.contains("Current working state:"),
1077 "expected working memory injection in context, got: {ctx}"
1078 );
1079 assert!(ctx.contains("task: write tests"));
1080 assert!(ctx.contains("status: in progress"));
1081 }
1082}