1use crate::agent::{AgentConfig, ReActLoop, ReActStep, ToolSpec};
20use crate::error::AgentRuntimeError;
21use crate::memory::{AgentId, EpisodicStore, WorkingMemory};
22use crate::metrics::RuntimeMetrics;
23use serde::{Deserialize, Serialize};
24use std::marker::PhantomData;
25use std::sync::atomic::Ordering;
26use std::sync::Arc;
27use std::time::Instant;
28
29#[cfg(feature = "graph")]
30use crate::graph::GraphStore;
31
32#[cfg(feature = "orchestrator")]
33use crate::orchestrator::BackpressureGuard;
34
/// Type-state marker for an [`AgentRuntimeBuilder`] that has not yet received
/// an `AgentConfig`.
///
/// In this state the builder offers `with_agent_config` but not `build`.
pub struct NeedsConfig;
/// Type-state marker for an [`AgentRuntimeBuilder`] whose `AgentConfig` has
/// been supplied.
///
/// Only builders in this state expose `build`.
pub struct HasConfig;
41
/// Result of a single agent run, as returned by `AgentRuntime::run_agent`.
///
/// The type is serde-serializable; with the `persistence` feature it is the
/// payload stored by the checkpoint helpers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSession {
    /// Identifier of this session. `AgentRuntime` generates a v4 UUID string.
    pub session_id: String,
    /// Agent the session was run for.
    pub agent_id: AgentId,
    /// ReAct steps produced by the loop, in execution order.
    pub steps: Vec<ReActStep>,
    /// Number of episodic memory items injected into the prompt.
    pub memory_hits: usize,
    /// Entity count of the graph store at session start (0 when no graph is
    /// configured). Despite the name, this is not a count of lookups performed.
    pub graph_lookups: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
    /// Non-fatal errors hit while saving per-step checkpoints.
    /// Defaults to empty when the field is missing from serialized data.
    #[serde(default)]
    pub checkpoint_errors: Vec<String>,
}
67
68impl AgentSession {
69 pub fn step_count(&self) -> usize {
80 self.steps.len()
81 }
82
83 pub fn final_answer(&self) -> Option<String> {
88 let last = self.steps.last()?;
89 let upper = last.action.trim().to_ascii_uppercase();
90 if upper.starts_with("FINAL_ANSWER") {
91 let answer = last.action.trim()["FINAL_ANSWER".len()..].trim().to_owned();
92 Some(answer)
93 } else {
94 None
95 }
96 }
97
98 #[cfg(feature = "persistence")]
100 pub async fn save_checkpoint(
101 &self,
102 backend: &dyn crate::persistence::PersistenceBackend,
103 ) -> Result<(), AgentRuntimeError> {
104 let key = format!("session:{}", self.session_id);
105 let bytes = serde_json::to_vec(self)
106 .map_err(|e| AgentRuntimeError::Persistence(format!("serialize: {e}")))?;
107 backend.save(&key, &bytes).await
108 }
109
110 #[cfg(feature = "persistence")]
114 pub async fn load_checkpoint(
115 backend: &dyn crate::persistence::PersistenceBackend,
116 session_id: &str,
117 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
118 let key = format!("session:{session_id}");
119 match backend.load(&key).await? {
120 None => Ok(None),
121 Some(bytes) => {
122 let session = serde_json::from_slice(&bytes)
123 .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
124 Ok(Some(session))
125 }
126 }
127 }
128
129 #[cfg(feature = "persistence")]
136 pub async fn load_step_checkpoint(
137 backend: &dyn crate::persistence::PersistenceBackend,
138 session_id: &str,
139 step: usize,
140 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
141 Self::load_checkpoint_at_step(backend, session_id, step).await
142 }
143
144 #[cfg(feature = "persistence")]
149 pub async fn load_checkpoint_at_step(
150 backend: &dyn crate::persistence::PersistenceBackend,
151 session_id: &str,
152 step: usize,
153 ) -> Result<Option<AgentSession>, AgentRuntimeError> {
154 let key = format!("session:{session_id}:step:{step}");
155 match backend.load(&key).await? {
156 None => Ok(None),
157 Some(bytes) => {
158 let session = serde_json::from_slice(&bytes)
159 .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
160 Ok(Some(session))
161 }
162 }
163 }
164}
165
/// Type-state builder for [`AgentRuntime`].
///
/// The `S` parameter is either [`NeedsConfig`] (the default) or [`HasConfig`].
/// `build` exists only on `AgentRuntimeBuilder<HasConfig>`, so forgetting the
/// agent configuration is a compile-time error.
pub struct AgentRuntimeBuilder<S = NeedsConfig> {
    /// Optional episodic memory store.
    memory: Option<EpisodicStore>,
    /// Optional working-memory store.
    working: Option<WorkingMemory>,
    /// Optional graph store.
    #[cfg(feature = "graph")]
    graph: Option<GraphStore>,
    /// Optional admission guard.
    #[cfg(feature = "orchestrator")]
    backpressure: Option<BackpressureGuard>,
    /// Agent configuration; set by `with_agent_config`.
    agent_config: Option<AgentConfig>,
    /// Tools registered so far.
    tools: Vec<Arc<ToolSpec>>,
    /// Shared metrics handle passed on to the built runtime.
    metrics: Arc<RuntimeMetrics>,
    /// Optional checkpoint backend.
    #[cfg(feature = "persistence")]
    checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
    /// Zero-sized type-state tag.
    _state: PhantomData<S>,
}
196
197impl std::fmt::Debug for AgentRuntimeBuilder<NeedsConfig> {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 let mut s = f.debug_struct("AgentRuntimeBuilder<NeedsConfig>");
200 s.field("memory", &self.memory.is_some())
201 .field("working", &self.working.is_some());
202 #[cfg(feature = "graph")]
203 s.field("graph", &self.graph.is_some());
204 #[cfg(feature = "orchestrator")]
205 s.field("backpressure", &self.backpressure.is_some());
206 s.field("tools", &self.tools.len()).finish()
207 }
208}
209
210impl std::fmt::Debug for AgentRuntimeBuilder<HasConfig> {
211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 let mut s = f.debug_struct("AgentRuntimeBuilder<HasConfig>");
213 s.field("memory", &self.memory.is_some())
214 .field("working", &self.working.is_some());
215 #[cfg(feature = "graph")]
216 s.field("graph", &self.graph.is_some());
217 #[cfg(feature = "orchestrator")]
218 s.field("backpressure", &self.backpressure.is_some());
219 s.field("agent_config", &self.agent_config.is_some())
220 .field("tools", &self.tools.len())
221 .finish()
222 }
223}
224
225impl Default for AgentRuntimeBuilder<NeedsConfig> {
226 fn default() -> Self {
227 Self {
228 memory: None,
229 working: None,
230 #[cfg(feature = "graph")]
231 graph: None,
232 #[cfg(feature = "orchestrator")]
233 backpressure: None,
234 agent_config: None,
235 tools: Vec::new(),
236 metrics: RuntimeMetrics::new(),
237 #[cfg(feature = "persistence")]
238 checkpoint_backend: None,
239 _state: PhantomData,
240 }
241 }
242}
243
244impl<S> AgentRuntimeBuilder<S> {
246 pub fn with_memory(mut self, store: EpisodicStore) -> Self {
248 self.memory = Some(store);
249 self
250 }
251
252 pub fn with_working_memory(mut self, wm: WorkingMemory) -> Self {
254 self.working = Some(wm);
255 self
256 }
257
258 #[cfg(feature = "graph")]
260 pub fn with_graph(mut self, graph: GraphStore) -> Self {
261 self.graph = Some(graph);
262 self
263 }
264
265 #[cfg(feature = "orchestrator")]
267 pub fn with_backpressure(mut self, guard: BackpressureGuard) -> Self {
268 self.backpressure = Some(guard);
269 self
270 }
271
272 pub fn register_tool(mut self, spec: ToolSpec) -> Self {
274 self.tools.push(Arc::new(spec));
275 self
276 }
277
278 pub fn register_tools(mut self, specs: impl IntoIterator<Item = ToolSpec>) -> Self {
284 for spec in specs {
285 self.tools.push(Arc::new(spec));
286 }
287 self
288 }
289
290 pub fn with_metrics(mut self, metrics: Arc<RuntimeMetrics>) -> Self {
292 self.metrics = metrics;
293 self
294 }
295
296 #[cfg(feature = "persistence")]
298 pub fn with_checkpoint_backend(
299 mut self,
300 backend: Arc<dyn crate::persistence::PersistenceBackend>,
301 ) -> Self {
302 self.checkpoint_backend = Some(backend);
303 self
304 }
305}
306
307impl AgentRuntimeBuilder<NeedsConfig> {
309 pub fn new() -> Self {
311 Self::default()
312 }
313
314 pub fn with_agent_config(self, config: AgentConfig) -> AgentRuntimeBuilder<HasConfig> {
323 AgentRuntimeBuilder {
324 memory: self.memory,
325 working: self.working,
326 #[cfg(feature = "graph")]
327 graph: self.graph,
328 #[cfg(feature = "orchestrator")]
329 backpressure: self.backpressure,
330 agent_config: Some(config),
331 tools: self.tools,
332 metrics: self.metrics,
333 #[cfg(feature = "persistence")]
334 checkpoint_backend: self.checkpoint_backend,
335 _state: PhantomData,
336 }
337 }
338}
339
340impl AgentRuntimeBuilder<HasConfig> {
342 pub fn build(self) -> AgentRuntime {
346 #[allow(clippy::unwrap_used)]
349 let agent_config = self.agent_config.unwrap();
350
351 AgentRuntime {
352 memory: self.memory,
353 working: self.working,
354 #[cfg(feature = "graph")]
355 graph: self.graph,
356 #[cfg(feature = "orchestrator")]
357 backpressure: self.backpressure,
358 agent_config,
359 tools: self.tools,
360 metrics: self.metrics,
361 #[cfg(feature = "persistence")]
362 checkpoint_backend: self.checkpoint_backend,
363 }
364 }
365}
366
/// Runtime that wires the optional subsystems (memory, working state, graph,
/// backpressure, checkpoints) around a ReAct loop.
///
/// Construct it with [`AgentRuntime::builder`] or [`AgentRuntime::quick`].
pub struct AgentRuntime {
    /// Episodic store consulted to enrich the prompt, if configured.
    memory: Option<EpisodicStore>,
    /// Working memory whose entries are appended to the prompt, if configured.
    working: Option<WorkingMemory>,
    /// Graph store whose entity count is recorded per session, if configured.
    #[cfg(feature = "graph")]
    graph: Option<GraphStore>,
    /// Admission guard acquired before, and released after, each session.
    #[cfg(feature = "orchestrator")]
    backpressure: Option<BackpressureGuard>,
    /// Configuration cloned into every ReAct loop.
    agent_config: AgentConfig,
    /// Tools re-registered on the ReAct loop of every session.
    tools: Vec<Arc<ToolSpec>>,
    /// Shared runtime counters.
    metrics: Arc<RuntimeMetrics>,
    /// Backend for session and per-step checkpoints, if configured.
    #[cfg(feature = "persistence")]
    checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
}
383
384impl std::fmt::Debug for AgentRuntime {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 let mut s = f.debug_struct("AgentRuntime");
387 s.field("memory", &self.memory.is_some())
388 .field("working", &self.working.is_some());
389 #[cfg(feature = "graph")]
390 s.field("graph", &self.graph.is_some());
391 #[cfg(feature = "orchestrator")]
392 s.field("backpressure", &self.backpressure.is_some());
393 s.field("tools", &self.tools.len());
394 #[cfg(feature = "persistence")]
395 s.field("checkpoint_backend", &self.checkpoint_backend.is_some());
396 s.finish()
397 }
398}
399
400impl AgentRuntime {
401 pub fn builder() -> AgentRuntimeBuilder<NeedsConfig> {
403 AgentRuntimeBuilder::new()
404 }
405
406 pub fn quick(max_iterations: usize, model: impl Into<String>) -> Self {
408 AgentRuntime::builder()
409 .with_agent_config(AgentConfig::new(max_iterations, model))
410 .build()
411 }
412
413 pub fn metrics(&self) -> Arc<RuntimeMetrics> {
415 Arc::clone(&self.metrics)
416 }
417
418 #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id))]
431 pub async fn run_agent<F, Fut>(
432 &self,
433 agent_id: AgentId,
434 prompt: &str,
435 infer: F,
436 ) -> Result<AgentSession, AgentRuntimeError>
437 where
438 F: FnMut(String) -> Fut,
439 Fut: std::future::Future<Output = String>,
440 {
441 #[cfg(feature = "orchestrator")]
444 {
445 let backpressure_result = if let Some(ref guard) = self.backpressure {
446 guard.try_acquire()
447 } else {
448 Ok(())
449 };
450 if let Err(e) = backpressure_result {
451 tracing::warn!(agent_id = %agent_id, error = %e, "backpressure shed: rejecting session");
452 self.metrics
453 .backpressure_shed_count
454 .fetch_add(1, Ordering::Relaxed);
455 return Err(e);
456 }
457 }
458
459 self.metrics.total_sessions.fetch_add(1, Ordering::Relaxed);
460 self.metrics.active_sessions.fetch_add(1, Ordering::Relaxed);
461
462 tracing::info!(agent_id = %agent_id, "agent session starting");
463 let outcome = self.run_agent_inner(agent_id.clone(), prompt, infer).await;
464
465 #[cfg(feature = "orchestrator")]
467 if let Some(ref guard) = self.backpressure {
468 let _ = guard.release();
469 }
470
471 let _ = self.metrics.active_sessions.fetch_update(
474 Ordering::Relaxed,
475 Ordering::Relaxed,
476 |v| Some(v.saturating_sub(1)),
477 );
478
479 match &outcome {
480 Ok(session) => {
481 tracing::info!(
482 agent_id = %agent_id,
483 session_id = %session.session_id,
484 steps = session.step_count(),
485 duration_ms = session.duration_ms,
486 "agent session completed"
487 );
488 self.metrics
489 .total_steps
490 .fetch_add(session.step_count() as u64, Ordering::Relaxed);
491 }
492 Err(e) => {
493 tracing::error!(agent_id = %agent_id, error = %e, "agent session failed");
494 }
495 }
496
497 outcome
498 }
499
500 #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id, session_id = tracing::field::Empty))]
502 async fn run_agent_inner<F, Fut>(
503 &self,
504 agent_id: AgentId,
505 prompt: &str,
506 infer: F,
507 ) -> Result<AgentSession, AgentRuntimeError>
508 where
509 F: FnMut(String) -> Fut,
510 Fut: std::future::Future<Output = String>,
511 {
512 let start = Instant::now();
513 let session_id = uuid::Uuid::new_v4().to_string();
514
515 let mut memory_hits = 0usize;
516 let mut graph_lookups = 0usize;
517
518 let enriched_prompt = if let Some(ref store) = self.memory {
520 let memories = store.recall(&agent_id, self.agent_config.max_memory_recalls)?;
521
522 let memories = if let Some(token_budget) = self.agent_config.max_memory_tokens {
524 let mut used = 0usize;
525 memories
526 .into_iter()
527 .filter(|m| {
528 let tokens = (m.content.len() / 4).max(1);
529 if used + tokens <= token_budget {
530 used += tokens;
531 true
532 } else {
533 false
534 }
535 })
536 .collect::<Vec<_>>()
537 } else {
538 memories
539 };
540
541 memory_hits = memories.len();
542 self.metrics
543 .memory_recall_count
544 .fetch_add(1, Ordering::Relaxed);
545
546 if let Some(budget) = self.agent_config.max_memory_tokens {
547 tracing::debug!(
548 "memory token budget: {budget}, injecting {} items",
549 memory_hits
550 );
551 } else {
552 tracing::debug!("enriched prompt with {} memory items", memory_hits);
553 }
554
555 if memories.is_empty() {
556 prompt.to_owned()
557 } else {
558 let mem_context: Vec<String> = memories
559 .iter()
560 .map(|m| format!("- {}", m.content))
561 .collect();
562 format!(
563 "Relevant memories:\n{}\n\nCurrent prompt: {prompt}",
564 mem_context.join("\n")
565 )
566 }
567 } else {
568 prompt.to_owned()
569 };
570
571 let enriched_prompt = if let Some(ref wm) = self.working {
573 let entries = wm.entries()?;
574 if entries.is_empty() {
575 enriched_prompt
576 } else {
577 let wm_context: Vec<String> =
578 entries.iter().map(|(k, v)| format!(" {k}: {v}")).collect();
579 format!(
580 "{enriched_prompt}\n\nCurrent working state:\n{}",
581 wm_context.join("\n")
582 )
583 }
584 } else {
585 enriched_prompt
586 };
587
588 #[cfg(feature = "graph")]
590 if let Some(ref graph) = self.graph {
591 graph_lookups = graph.entity_count()?;
592 tracing::debug!("graph has {} entities", graph_lookups);
593 }
594
595 let mut react_loop = ReActLoop::new(self.agent_config.clone())
601 .with_metrics(Arc::clone(&self.metrics));
602
603 #[cfg(feature = "persistence")]
605 if let Some(ref backend) = self.checkpoint_backend {
606 react_loop = react_loop
607 .with_step_checkpoint(Arc::clone(backend), session_id.clone());
608 }
609
610 for tool in &self.tools {
611 let tool_arc = Arc::clone(tool);
612 let required_fields = tool_arc.required_fields.clone();
613 #[cfg(feature = "orchestrator")]
614 let circuit_breaker = tool_arc.circuit_breaker.clone();
615
616 let mut spec = ToolSpec::new_async(
617 tool_arc.name.clone(),
618 tool_arc.description.clone(),
619 move |args| {
620 let t = Arc::clone(&tool_arc);
621 Box::pin(async move { t.call(args).await })
622 },
623 )
624 .with_required_fields(required_fields);
625
626 #[cfg(feature = "orchestrator")]
627 if let Some(cb) = circuit_breaker {
628 spec = spec.with_circuit_breaker(cb);
629 }
630
631 react_loop.register_tool(spec);
632 }
633
634 tracing::Span::current().record("session_id", &session_id.as_str());
637
638 let steps = react_loop.run(&enriched_prompt, infer).await?;
639 let duration_ms = start.elapsed().as_millis() as u64;
640
641 #[cfg(feature = "persistence")]
643 let mut ckpt_errors: Vec<String> = Vec::new();
644
645 #[cfg(feature = "persistence")]
647 if let Some(ref backend) = self.checkpoint_backend {
648 tracing::info!(session_id = %session_id, "saving session checkpoint");
649
650 let tmp = AgentSession {
652 session_id: session_id.clone(),
653 agent_id: agent_id.clone(),
654 steps: steps.clone(),
655 memory_hits,
656 graph_lookups,
657 duration_ms,
658 checkpoint_errors: vec![],
659 };
660 tmp.save_checkpoint(backend.as_ref()).await?;
661
662 for i in 1..=steps.len() {
664 let partial = AgentSession {
665 session_id: session_id.clone(),
666 agent_id: agent_id.clone(),
667 steps: steps[..i].to_vec(),
668 memory_hits,
669 graph_lookups,
670 duration_ms,
671 checkpoint_errors: vec![],
672 };
673 let key = format!("session:{session_id}:step:{i}");
674 match serde_json::to_vec(&partial) {
675 Ok(bytes) => {
676 if let Err(e) = backend.save(&key, &bytes).await {
677 let msg = format!("session:{session_id} step:{i} save: {e}");
678 tracing::warn!("{}", msg);
679 ckpt_errors.push(msg);
680 }
681 }
682 Err(e) => {
683 let msg =
684 format!("session:{session_id} step:{i} serialise: {e}");
685 tracing::warn!("{}", msg);
686 ckpt_errors.push(msg);
687 }
688 }
689 }
690 }
691
692 let session = AgentSession {
693 session_id,
694 agent_id,
695 steps,
696 memory_hits,
697 graph_lookups,
698 duration_ms,
699 #[cfg(feature = "persistence")]
700 checkpoint_errors: ckpt_errors,
701 #[cfg(not(feature = "persistence"))]
702 checkpoint_errors: vec![],
703 };
704
705 Ok(session)
706 }
707
708 pub fn memory(&self) -> Option<&EpisodicStore> {
710 self.memory.as_ref()
711 }
712
713 #[cfg(feature = "graph")]
715 pub fn graph(&self) -> Option<&GraphStore> {
716 self.graph.as_ref()
717 }
718
719 pub fn working_memory(&self) -> Option<&WorkingMemory> {
721 self.working.as_ref()
722 }
723
724 pub async fn shutdown(&self) {
732 tracing::info!("AgentRuntime shutting down");
733 tracing::info!(
734 active_sessions = self.metrics.active_sessions(),
735 total_sessions = self.metrics.total_sessions(),
736 total_steps = self.metrics.total_steps(),
737 total_tool_calls = self.metrics.total_tool_calls(),
738 failed_tool_calls = self.metrics.failed_tool_calls(),
739 "final metrics snapshot on shutdown"
740 );
741
742 #[cfg(feature = "persistence")]
743 if let Some(ref backend) = self.checkpoint_backend {
744 let ts = chrono::Utc::now().to_rfc3339();
745 match backend.save("runtime:shutdown", ts.as_bytes()).await {
746 Ok(()) => tracing::debug!("shutdown sentinel saved"),
747 Err(e) => tracing::warn!(error = %e, "failed to save shutdown sentinel"),
748 }
749 }
750
751 tracing::info!("AgentRuntime shutdown complete");
752 }
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    // Graph types are only needed by the feature-gated graph tests below.
    #[cfg(feature = "graph")]
    use crate::graph::{Entity, GraphStore, Relationship};
    use crate::memory::EpisodicStore;

    /// Minimal config: five iterations, model name "test".
    fn simple_config() -> AgentConfig {
        AgentConfig::new(5, "test")
    }

    /// Inference stub that finishes on the first turn.
    async fn final_answer_infer(_ctx: String) -> String {
        "Thought: done\nAction: FINAL_ANSWER 42".into()
    }

    // ── Builder ──────────────────────────────────────────────────────────

    /// `build()` is reachable once a config has been supplied.
    #[tokio::test]
    async fn test_builder_with_config_compiles() {
        let _runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();
    }

    /// A config alone is enough to build a runtime.
    #[tokio::test]
    async fn test_builder_succeeds_with_minimal_config() {
        let _runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();
    }

    /// All optional subsystems can be combined on one builder.
    #[cfg(all(feature = "graph", feature = "orchestrator"))]
    #[tokio::test]
    async fn test_builder_with_all_subsystems() {
        let _runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_memory(EpisodicStore::new())
            .with_graph(GraphStore::new())
            .with_working_memory(WorkingMemory::new(10).unwrap())
            .with_backpressure(BackpressureGuard::new(5).unwrap())
            .build();
    }

    /// A built runtime can run a session end to end.
    #[tokio::test]
    async fn test_builder_produces_runtime_with_config() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();
        let session = runtime
            .run_agent(AgentId::new("agent-x"), "hello", final_answer_infer)
            .await
            .unwrap();
        assert!(session.step_count() >= 1);
        assert!(!session.session_id.is_empty());
    }

    // ── run_agent ────────────────────────────────────────────────────────

    /// An immediate final answer yields exactly one step.
    #[tokio::test]
    async fn test_run_agent_returns_session_with_steps() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        let session = runtime
            .run_agent(AgentId::new("agent-1"), "hello", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(session.step_count(), 1);
    }

    /// The session echoes the agent id it was run for.
    #[tokio::test]
    async fn test_run_agent_session_has_agent_id() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        let session = runtime
            .run_agent(AgentId::new("agent-42"), "hello", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(session.agent_id.0, "agent-42");
    }

    /// `duration_ms` is populated; any value, including 0, is acceptable.
    #[tokio::test]
    async fn test_run_agent_session_duration_is_set() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        let session = runtime
            .run_agent(AgentId::new("a"), "hello", final_answer_infer)
            .await
            .unwrap();

        let _ = session.duration_ms;
    }

    /// The generated session id has the 36-character UUID string form.
    #[tokio::test]
    async fn test_run_agent_session_has_session_id() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        let session = runtime
            .run_agent(AgentId::new("a"), "hello", final_answer_infer)
            .await
            .unwrap();

        assert!(!session.session_id.is_empty());
        assert_eq!(session.session_id.len(), 36);
    }

    /// Without a memory store no memories are injected.
    #[tokio::test]
    async fn test_run_agent_memory_hits_zero_without_memory() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        let session = runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(session.memory_hits, 0);
    }

    /// A stored episode for the agent is recalled and counted.
    #[tokio::test]
    async fn test_run_agent_memory_hits_counts_recalled_items() {
        let store = EpisodicStore::new();
        let agent = AgentId::new("mem-agent");
        store
            .add_episode(agent.clone(), "remembered fact", 0.8)
            .unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_memory(store)
            .build();

        let session = runtime
            .run_agent(agent, "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(session.memory_hits, 1);
    }

    /// `graph_lookups` reflects the graph's entity count.
    #[cfg(feature = "graph")]
    #[tokio::test]
    async fn test_run_agent_graph_lookups_counts_entities() {
        let graph = GraphStore::new();
        graph.add_entity(Entity::new("e1", "Node")).unwrap();
        graph.add_entity(Entity::new("e2", "Node")).unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_graph(graph)
            .build();

        let session = runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(session.graph_lookups, 2);
    }

    /// The backpressure slot is returned once the session completes.
    #[cfg(feature = "orchestrator")]
    #[tokio::test]
    async fn test_run_agent_backpressure_released_after_run() {
        let guard = BackpressureGuard::new(3).unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_backpressure(guard.clone())
            .build();

        runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(guard.depth().unwrap(), 0);
    }

    /// A full guard causes the session to be shed.
    #[cfg(feature = "orchestrator")]
    #[tokio::test]
    async fn test_run_agent_backpressure_sheds_when_full() {
        let guard = BackpressureGuard::new(1).unwrap();
        // Occupy the only slot so the runtime's acquire fails.
        guard.try_acquire().unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_backpressure(guard)
            .build();

        let result = runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await;
        assert!(matches!(
            result,
            Err(AgentRuntimeError::BackpressureShed { .. })
        ));
    }

    /// NOTE(review): despite its name this only exercises the success path —
    /// the stub answers on the first turn, so the iteration limit is never
    /// reached. Behaviour is kept as-is.
    #[tokio::test]
    async fn test_run_agent_max_iterations_error_propagated() {
        let cfg = AgentConfig::new(2, "model");
        let runtime = AgentRuntime::builder().with_agent_config(cfg).build();

        let result = runtime
            .run_agent(AgentId::new("a"), "prompt", |_ctx: String| async {
                "Thought: looping\nAction: FINAL_ANSWER done".to_string()
            })
            .await;
        assert!(result.is_ok());
    }

    /// `step_count` mirrors `steps.len()`.
    #[tokio::test]
    async fn test_agent_session_step_count_matches_steps() {
        let session = AgentSession {
            session_id: "test-session-id".into(),
            agent_id: AgentId::new("a"),
            steps: vec![
                ReActStep {
                    thought: "t".into(),
                    action: "a".into(),
                    observation: "o".into(),
                    step_duration_ms: 0,
                },
                ReActStep {
                    thought: "t2".into(),
                    action: "FINAL_ANSWER".into(),
                    observation: "done".into(),
                    step_duration_ms: 0,
                },
            ],
            memory_hits: 0,
            graph_lookups: 0,
            duration_ms: 10,
            checkpoint_errors: vec![],
        };
        assert_eq!(session.step_count(), 2);
    }

    // ── Accessors ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_runtime_memory_accessor_returns_none_when_not_configured() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();
        assert!(runtime.memory().is_none());
    }

    #[tokio::test]
    async fn test_runtime_memory_accessor_returns_some_when_configured() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_memory(EpisodicStore::new())
            .build();
        assert!(runtime.memory().is_some());
    }

    #[cfg(feature = "graph")]
    #[tokio::test]
    async fn test_runtime_graph_accessor_returns_none_when_not_configured() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();
        assert!(runtime.graph().is_none());
    }

    #[cfg(feature = "graph")]
    #[tokio::test]
    async fn test_runtime_graph_accessor_returns_some_when_configured() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_graph(GraphStore::new())
            .build();
        assert!(runtime.graph().is_some());
    }

    #[tokio::test]
    async fn test_runtime_working_memory_accessor() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_working_memory(WorkingMemory::new(5).unwrap())
            .build();
        assert!(runtime.working_memory().is_some());
    }

    /// A registered tool can be invoked before the final answer.
    #[tokio::test]
    async fn test_runtime_with_tool_registered() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .register_tool(ToolSpec::new("calc", "math", |_| serde_json::json!(99)))
            .build();

        let mut call_count = 0;
        let session = runtime
            .run_agent(AgentId::new("a"), "compute", move |_ctx: String| {
                call_count += 1;
                let count = call_count;
                async move {
                    if count == 1 {
                        "Thought: use calc\nAction: calc {}".into()
                    } else {
                        "Thought: done\nAction: FINAL_ANSWER result".into()
                    }
                }
            })
            .await
            .unwrap();

        assert!(session.step_count() >= 1);
    }

    /// Relationships do not affect `graph_lookups`; only entities are counted.
    #[cfg(feature = "graph")]
    #[tokio::test]
    async fn test_run_agent_with_graph_relationship_lookup() {
        let graph = GraphStore::new();
        graph.add_entity(Entity::new("a", "X")).unwrap();
        graph.add_entity(Entity::new("b", "Y")).unwrap();
        graph
            .add_relationship(Relationship::new("a", "b", "LINKS", 1.0))
            .unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_graph(graph)
            .build();

        let session = runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(session.graph_lookups, 2);
    }

    // ── Metrics ──────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_metrics_active_sessions_decrements_after_run() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(runtime.metrics().active_sessions(), 0);
    }

    #[tokio::test]
    async fn test_metrics_total_sessions_increments() {
        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .build();

        runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();
        runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(runtime.metrics().total_sessions(), 2);
    }

    #[cfg(feature = "orchestrator")]
    #[tokio::test]
    async fn test_metrics_backpressure_shed_increments_on_shed() {
        let guard = BackpressureGuard::new(1).unwrap();
        // Occupy the only slot so the runtime's acquire fails.
        guard.try_acquire().unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_backpressure(guard)
            .build();

        let _ = runtime
            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
            .await;

        assert_eq!(runtime.metrics().backpressure_shed_count(), 1);
    }

    #[tokio::test]
    async fn test_metrics_memory_recall_count_increments() {
        let store = EpisodicStore::new();
        let agent = AgentId::new("a");
        store.add_episode(agent.clone(), "fact", 0.9).unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_memory(store)
            .build();

        runtime
            .run_agent(agent, "prompt", final_answer_infer)
            .await
            .unwrap();

        assert_eq!(runtime.metrics().memory_recall_count(), 1);
    }

    // ── Memory token budget ──────────────────────────────────────────────

    /// Five 100-byte items (~25 estimated tokens each) against a budget of 10
    /// tokens: at most one item may be injected.
    #[tokio::test]
    async fn test_agent_config_max_memory_tokens_limits_injection() {
        let store = EpisodicStore::new();
        let agent = AgentId::new("budget-agent");
        for i in 0..5 {
            let content = format!("{:0>100}", i);
            store.add_episode(agent.clone(), content, 0.9).unwrap();
        }

        let cfg = AgentConfig::new(5, "test").with_max_memory_tokens(10);
        let runtime = AgentRuntime::builder()
            .with_agent_config(cfg)
            .with_memory(store)
            .build();

        let session = runtime
            .run_agent(agent, "prompt", final_answer_infer)
            .await
            .unwrap();

        assert!(
            session.memory_hits <= 1,
            "expected at most 1 memory hit with tight token budget, got {}",
            session.memory_hits
        );
    }

    /// Working-memory entries are appended to the context given to `infer`.
    #[tokio::test]
    async fn test_working_memory_injected_into_prompt() {
        let wm = WorkingMemory::new(10).unwrap();
        wm.set("task", "write tests").unwrap();
        wm.set("status", "in progress").unwrap();

        let runtime = AgentRuntime::builder()
            .with_agent_config(simple_config())
            .with_working_memory(wm)
            .build();

        let mut captured_ctx: Option<String> = None;
        let captured_ref = &mut captured_ctx;

        runtime
            .run_agent(AgentId::new("a"), "do stuff", |ctx: String| {
                *captured_ref = Some(ctx.clone());
                async move { "Thought: done\nAction: FINAL_ANSWER ok".to_string() }
            })
            .await
            .unwrap();

        let ctx = captured_ctx.expect("infer should have been called");
        assert!(
            ctx.contains("Current working state:"),
            "expected working memory injection in context, got: {ctx}"
        );
        assert!(ctx.contains("task: write tests"));
        assert!(ctx.contains("status: in progress"));
    }

    /// With a budget of zero every item is rejected, because each item is
    /// estimated at a minimum of one token.
    #[tokio::test]
    async fn test_token_budget_zero_returns_no_memories() {
        let store = EpisodicStore::new();
        let agent = AgentId::new("budget-agent");
        store.add_episode(agent.clone(), "short", 0.9).unwrap();

        let mut config = AgentConfig::new(5, "test-model");
        config.max_memory_tokens = Some(0);
        config.max_memory_recalls = 10;

        let runtime = AgentRuntime::builder()
            .with_memory(store)
            .with_agent_config(config)
            .build();

        let session = runtime
            .run_agent(
                agent,
                "test",
                |_ctx| async { "Thought: ok\nAction: FINAL_ANSWER done".to_string() },
            )
            .await
            .unwrap();

        assert_eq!(session.steps.len(), 1);
        // The property the test name promises.
        assert_eq!(session.memory_hits, 0);
    }

    /// A 40-byte item (~10 estimated tokens) does not fit a budget of 1.
    #[tokio::test]
    async fn test_token_budget_smaller_than_smallest_item_returns_no_memories() {
        let store = EpisodicStore::new();
        let agent = AgentId::new("budget-agent2");
        store
            .add_episode(agent.clone(), "a".repeat(40), 0.9)
            .unwrap();

        let mut config = AgentConfig::new(5, "test-model");
        config.max_memory_tokens = Some(1);
        config.max_memory_recalls = 10;

        let runtime = AgentRuntime::builder()
            .with_memory(store)
            .with_agent_config(config)
            .build();

        let session = runtime
            .run_agent(
                agent,
                "test",
                |_ctx| async { "Thought: ok\nAction: FINAL_ANSWER done".to_string() },
            )
            .await
            .unwrap();

        assert_eq!(session.memory_hits, 0);
    }

    // ── AgentRuntime::quick ──────────────────────────────────────────────

    #[tokio::test]
    async fn test_agent_runtime_quick_runs_agent() {
        let runtime = AgentRuntime::quick(5, "test-model");
        let agent = AgentId::new("quick-agent");
        let session = runtime
            .run_agent(agent, "hello", |_ctx| async {
                "Thought: done\nAction: FINAL_ANSWER ok".to_string()
            })
            .await
            .unwrap();
        assert_eq!(session.step_count(), 1);
    }

    // ── AgentSession::final_answer ───────────────────────────────────────

    #[test]
    fn test_final_answer_extracts_text() {
        let session = AgentSession {
            session_id: "s".into(),
            agent_id: AgentId::new("a"),
            steps: vec![ReActStep {
                thought: "done".into(),
                action: "FINAL_ANSWER Paris".into(),
                observation: "".into(),
                step_duration_ms: 0,
            }],
            memory_hits: 0,
            graph_lookups: 0,
            duration_ms: 0,
            checkpoint_errors: vec![],
        };
        assert_eq!(session.final_answer(), Some("Paris".to_string()));
    }

    #[test]
    fn test_final_answer_returns_none_without_final_step() {
        let session = AgentSession {
            session_id: "s".into(),
            agent_id: AgentId::new("a"),
            steps: vec![ReActStep {
                thought: "thinking".into(),
                action: "search {}".into(),
                observation: "result".into(),
                step_duration_ms: 0,
            }],
            memory_hits: 0,
            graph_lookups: 0,
            duration_ms: 0,
            checkpoint_errors: vec![],
        };
        assert_eq!(session.final_answer(), None);

        let empty_session = AgentSession {
            session_id: "s2".into(),
            agent_id: AgentId::new("a"),
            steps: vec![],
            memory_hits: 0,
            graph_lookups: 0,
            duration_ms: 0,
            checkpoint_errors: vec![],
        };
        assert_eq!(empty_session.final_answer(), None);
    }
}