Skip to main content

llm_agent_runtime/
runtime.rs

1//! # Module: AgentRuntime
2//!
3//! ## Responsibility
4//! Wire memory, graph, orchestrator, and agent loop into a single coordinator
5//! using a builder pattern. Provides `run_agent` which executes a ReAct loop,
6//! optionally enriching context from memory and graph lookups.
7//!
8//! ## Guarantees
9//! - Builder uses a typestate parameter to enforce `agent_config` at compile time:
10//!   `build()` is only callable once `with_agent_config` has been called.
11//! - `run_agent` is async and returns a typed `AgentSession` with step count,
12//!   durations, and hits.
13//! - Non-panicking: all paths return `Result`
14//!
15//! ## NOT Responsible For
16//! - Actual LLM inference (callers supply a mock/stub inference fn)
17//! - Persistence across process restarts (unless `persistence` feature is enabled)
18
19use crate::agent::{AgentConfig, ReActLoop, ReActStep, ToolSpec};
20use crate::error::AgentRuntimeError;
21use crate::memory::{AgentId, EpisodicStore, WorkingMemory};
22use crate::metrics::RuntimeMetrics;
23use serde::{Deserialize, Serialize};
24use std::marker::PhantomData;
25use std::sync::atomic::Ordering;
26use std::sync::Arc;
27use std::time::Instant;
28
29#[cfg(feature = "graph")]
30use crate::graph::GraphStore;
31
32#[cfg(feature = "orchestrator")]
33use crate::orchestrator::BackpressureGuard;
34
35// ── Typestate markers ─────────────────────────────────────────────────────────
36
37/// Builder state: agent config has not been provided yet.
38pub struct NeedsConfig;
39/// Builder state: agent config has been provided; `build()` is available.
40pub struct HasConfig;
41
42// ── AgentSession ──────────────────────────────────────────────────────────────
43
44/// The result of a single agent run.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct AgentSession {
47    /// Stable unique identifier for this session (UUID v4 string).
48    pub session_id: String,
49    /// The agent ID used for this session.
50    pub agent_id: AgentId,
51    /// All ReAct steps executed during the session.
52    pub steps: Vec<ReActStep>,
53    /// Number of episodic memory retrievals made during the session.
54    pub memory_hits: usize,
55    /// Number of graph lookups made during the session.
56    pub graph_lookups: usize,
57    /// Wall-clock duration of the session in milliseconds.
58    pub duration_ms: u64,
59    /// Non-fatal errors encountered while saving per-step checkpoints.
60    ///
61    /// Populated only when a persistence backend is configured.  A non-empty
62    /// list means some step snapshots may be missing from storage, but the
63    /// session itself completed successfully.
64    #[serde(default)]
65    pub checkpoint_errors: Vec<String>,
66}
67
68impl AgentSession {
69    /// Return the number of steps in the session.
70    ///
71    /// Each [`ReActStep`] in `steps` carries a `step_duration_ms` field measuring
72    /// wall-clock time from inference call to observation for that individual step.
73    /// Use this to identify slow steps:
74    /// ```rust,ignore
75    /// for (i, step) in session.steps.iter().enumerate() {
76    ///     println!("step {i}: {}ms", step.step_duration_ms);
77    /// }
78    /// ```
79    pub fn step_count(&self) -> usize {
80        self.steps.len()
81    }
82
83    /// Return the final answer text from the last step, if available.
84    ///
85    /// Extracts the content after `FINAL_ANSWER` in the last step's `action` field.
86    /// Returns `None` if there are no steps or the last action is not a FINAL_ANSWER.
87    pub fn final_answer(&self) -> Option<String> {
88        let last = self.steps.last()?;
89        let upper = last.action.trim().to_ascii_uppercase();
90        if upper.starts_with("FINAL_ANSWER") {
91            let answer = last.action.trim()["FINAL_ANSWER".len()..].trim().to_owned();
92            Some(answer)
93        } else {
94            None
95        }
96    }
97
98    /// Persist this session as a checkpoint under `"session:<session_id>"`.
99    #[cfg(feature = "persistence")]
100    pub async fn save_checkpoint(
101        &self,
102        backend: &dyn crate::persistence::PersistenceBackend,
103    ) -> Result<(), AgentRuntimeError> {
104        let key = format!("session:{}", self.session_id);
105        let bytes = serde_json::to_vec(self)
106            .map_err(|e| AgentRuntimeError::Persistence(format!("serialize: {e}")))?;
107        backend.save(&key, &bytes).await
108    }
109
110    /// Load a previously saved checkpoint by `session_id`.
111    ///
112    /// Returns `None` if no checkpoint exists for the given ID.
113    #[cfg(feature = "persistence")]
114    pub async fn load_checkpoint(
115        backend: &dyn crate::persistence::PersistenceBackend,
116        session_id: &str,
117    ) -> Result<Option<AgentSession>, AgentRuntimeError> {
118        let key = format!("session:{session_id}");
119        match backend.load(&key).await? {
120            None => Ok(None),
121            Some(bytes) => {
122                let session = serde_json::from_slice(&bytes)
123                    .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
124                Ok(Some(session))
125            }
126        }
127    }
128
129    /// Load the session snapshot saved after step `step` completed.
130    ///
131    /// Alias for [`load_checkpoint_at_step`] — provided for ergonomic
132    /// compatibility with call sites that prefer this naming convention.
133    ///
134    /// [`load_checkpoint_at_step`]: AgentSession::load_checkpoint_at_step
135    #[cfg(feature = "persistence")]
136    pub async fn load_step_checkpoint(
137        backend: &dyn crate::persistence::PersistenceBackend,
138        session_id: &str,
139        step: usize,
140    ) -> Result<Option<AgentSession>, AgentRuntimeError> {
141        Self::load_checkpoint_at_step(backend, session_id, step).await
142    }
143
144    /// Load the session snapshot saved after step `step` completed.
145    ///
146    /// Returns `None` if no checkpoint exists for the given session/step pair.
147    /// The step number is 1-based (step 1 = after the first ReAct iteration).
148    #[cfg(feature = "persistence")]
149    pub async fn load_checkpoint_at_step(
150        backend: &dyn crate::persistence::PersistenceBackend,
151        session_id: &str,
152        step: usize,
153    ) -> Result<Option<AgentSession>, AgentRuntimeError> {
154        let key = format!("session:{session_id}:step:{step}");
155        match backend.load(&key).await? {
156            None => Ok(None),
157            Some(bytes) => {
158                let session = serde_json::from_slice(&bytes)
159                    .map_err(|e| AgentRuntimeError::Persistence(format!("deserialize: {e}")))?;
160                Ok(Some(session))
161            }
162        }
163    }
164}
165
166// ── AgentRuntimeBuilder ───────────────────────────────────────────────────────
167
168/// Builder for `AgentRuntime`.
169///
170/// Uses a typestate parameter `S` to enforce that `with_agent_config` is called
171/// before `build()`.  Calling `build()` on a `AgentRuntimeBuilder<NeedsConfig>`
172/// is a **compile-time error**.
173///
174/// Typical usage:
175/// ```ignore
176/// let runtime = AgentRuntime::builder()      // AgentRuntimeBuilder<NeedsConfig>
177///     .with_memory(store)
178///     .with_agent_config(cfg)                // → AgentRuntimeBuilder<HasConfig>
179///     .build();                              // → AgentRuntime (infallible)
180/// ```
181/// Builder for [`AgentRuntime`].
182pub struct AgentRuntimeBuilder<S = NeedsConfig> {
183    memory: Option<EpisodicStore>,
184    working: Option<WorkingMemory>,
185    #[cfg(feature = "graph")]
186    graph: Option<GraphStore>,
187    #[cfg(feature = "orchestrator")]
188    backpressure: Option<BackpressureGuard>,
189    agent_config: Option<AgentConfig>,
190    tools: Vec<Arc<ToolSpec>>,
191    metrics: Arc<RuntimeMetrics>,
192    #[cfg(feature = "persistence")]
193    checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
194    _state: PhantomData<S>,
195}
196
197impl std::fmt::Debug for AgentRuntimeBuilder<NeedsConfig> {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        let mut s = f.debug_struct("AgentRuntimeBuilder<NeedsConfig>");
200        s.field("memory", &self.memory.is_some())
201            .field("working", &self.working.is_some());
202        #[cfg(feature = "graph")]
203        s.field("graph", &self.graph.is_some());
204        #[cfg(feature = "orchestrator")]
205        s.field("backpressure", &self.backpressure.is_some());
206        s.field("tools", &self.tools.len()).finish()
207    }
208}
209
210impl std::fmt::Debug for AgentRuntimeBuilder<HasConfig> {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        let mut s = f.debug_struct("AgentRuntimeBuilder<HasConfig>");
213        s.field("memory", &self.memory.is_some())
214            .field("working", &self.working.is_some());
215        #[cfg(feature = "graph")]
216        s.field("graph", &self.graph.is_some());
217        #[cfg(feature = "orchestrator")]
218        s.field("backpressure", &self.backpressure.is_some());
219        s.field("agent_config", &self.agent_config.is_some())
220            .field("tools", &self.tools.len())
221            .finish()
222    }
223}
224
225impl Default for AgentRuntimeBuilder<NeedsConfig> {
226    fn default() -> Self {
227        Self {
228            memory: None,
229            working: None,
230            #[cfg(feature = "graph")]
231            graph: None,
232            #[cfg(feature = "orchestrator")]
233            backpressure: None,
234            agent_config: None,
235            tools: Vec::new(),
236            metrics: RuntimeMetrics::new(),
237            #[cfg(feature = "persistence")]
238            checkpoint_backend: None,
239            _state: PhantomData,
240        }
241    }
242}
243
244// Methods available on ALL builder states.
245impl<S> AgentRuntimeBuilder<S> {
246    /// Attach an episodic memory store.
247    pub fn with_memory(mut self, store: EpisodicStore) -> Self {
248        self.memory = Some(store);
249        self
250    }
251
252    /// Attach a working memory store.
253    pub fn with_working_memory(mut self, wm: WorkingMemory) -> Self {
254        self.working = Some(wm);
255        self
256    }
257
258    /// Attach a graph store.
259    #[cfg(feature = "graph")]
260    pub fn with_graph(mut self, graph: GraphStore) -> Self {
261        self.graph = Some(graph);
262        self
263    }
264
265    /// Attach a backpressure guard.
266    #[cfg(feature = "orchestrator")]
267    pub fn with_backpressure(mut self, guard: BackpressureGuard) -> Self {
268        self.backpressure = Some(guard);
269        self
270    }
271
272    /// Register a tool available to the agent loop.
273    pub fn register_tool(mut self, spec: ToolSpec) -> Self {
274        self.tools.push(Arc::new(spec));
275        self
276    }
277
278    /// Register multiple tools at once.
279    ///
280    /// Equivalent to calling [`register_tool`] for each spec.
281    ///
282    /// [`register_tool`]: AgentRuntimeBuilder::register_tool
283    pub fn register_tools(mut self, specs: impl IntoIterator<Item = ToolSpec>) -> Self {
284        for spec in specs {
285            self.tools.push(Arc::new(spec));
286        }
287        self
288    }
289
290    /// Attach a shared `RuntimeMetrics` instance.
291    pub fn with_metrics(mut self, metrics: Arc<RuntimeMetrics>) -> Self {
292        self.metrics = metrics;
293        self
294    }
295
296    /// Attach a persistence backend for session checkpointing.
297    #[cfg(feature = "persistence")]
298    pub fn with_checkpoint_backend(
299        mut self,
300        backend: Arc<dyn crate::persistence::PersistenceBackend>,
301    ) -> Self {
302        self.checkpoint_backend = Some(backend);
303        self
304    }
305}
306
307// `with_agent_config` transitions NeedsConfig → HasConfig.
308impl AgentRuntimeBuilder<NeedsConfig> {
309    /// Create a new builder (equivalent to `Default::default()`).
310    pub fn new() -> Self {
311        Self::default()
312    }
313
314    /// Set the agent loop configuration.
315    ///
316    /// After this call the builder transitions to `AgentRuntimeBuilder<HasConfig>`,
317    /// making `build()` available.
318    /// Set the agent loop configuration.
319    ///
320    /// After this call the builder transitions to `AgentRuntimeBuilder<HasConfig>`,
321    /// making `build()` available.
322    pub fn with_agent_config(self, config: AgentConfig) -> AgentRuntimeBuilder<HasConfig> {
323        AgentRuntimeBuilder {
324            memory: self.memory,
325            working: self.working,
326            #[cfg(feature = "graph")]
327            graph: self.graph,
328            #[cfg(feature = "orchestrator")]
329            backpressure: self.backpressure,
330            agent_config: Some(config),
331            tools: self.tools,
332            metrics: self.metrics,
333            #[cfg(feature = "persistence")]
334            checkpoint_backend: self.checkpoint_backend,
335            _state: PhantomData,
336        }
337    }
338}
339
340// `build()` is only available once we have a config.
341impl AgentRuntimeBuilder<HasConfig> {
342    /// Build the `AgentRuntime`.
343    ///
344    /// This is infallible: the typestate guarantees `agent_config` is present.
345    pub fn build(self) -> AgentRuntime {
346        // SAFETY: `agent_config` is always `Some` in `HasConfig` state because
347        // `with_agent_config` is the only way to reach this state.
348        #[allow(clippy::unwrap_used)]
349        let agent_config = self.agent_config.unwrap();
350
351        AgentRuntime {
352            memory: self.memory,
353            working: self.working,
354            #[cfg(feature = "graph")]
355            graph: self.graph,
356            #[cfg(feature = "orchestrator")]
357            backpressure: self.backpressure,
358            agent_config,
359            tools: self.tools,
360            metrics: self.metrics,
361            #[cfg(feature = "persistence")]
362            checkpoint_backend: self.checkpoint_backend,
363        }
364    }
365}
366
367// ── AgentRuntime ──────────────────────────────────────────────────────────────
368
369/// Unified runtime that coordinates memory, graph, orchestration, and agent loop.
370pub struct AgentRuntime {
371    memory: Option<EpisodicStore>,
372    working: Option<WorkingMemory>,
373    #[cfg(feature = "graph")]
374    graph: Option<GraphStore>,
375    #[cfg(feature = "orchestrator")]
376    backpressure: Option<BackpressureGuard>,
377    agent_config: AgentConfig,
378    tools: Vec<Arc<ToolSpec>>,
379    metrics: Arc<RuntimeMetrics>,
380    #[cfg(feature = "persistence")]
381    checkpoint_backend: Option<Arc<dyn crate::persistence::PersistenceBackend>>,
382}
383
384impl std::fmt::Debug for AgentRuntime {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        let mut s = f.debug_struct("AgentRuntime");
387        s.field("memory", &self.memory.is_some())
388            .field("working", &self.working.is_some());
389        #[cfg(feature = "graph")]
390        s.field("graph", &self.graph.is_some());
391        #[cfg(feature = "orchestrator")]
392        s.field("backpressure", &self.backpressure.is_some());
393        s.field("tools", &self.tools.len());
394        #[cfg(feature = "persistence")]
395        s.field("checkpoint_backend", &self.checkpoint_backend.is_some());
396        s.finish()
397    }
398}
399
400impl AgentRuntime {
401    /// Return a new builder in the `NeedsConfig` state.
402    pub fn builder() -> AgentRuntimeBuilder<NeedsConfig> {
403        AgentRuntimeBuilder::new()
404    }
405
406    /// Construct a minimal `AgentRuntime` in one call with sensible defaults.
407    pub fn quick(max_iterations: usize, model: impl Into<String>) -> Self {
408        AgentRuntime::builder()
409            .with_agent_config(AgentConfig::new(max_iterations, model))
410            .build()
411    }
412
413    /// Return a shared reference to the runtime metrics.
414    pub fn metrics(&self) -> Arc<RuntimeMetrics> {
415        Arc::clone(&self.metrics)
416    }
417
418    /// Run the agent loop for the given prompt.
419    ///
420    /// Optionally recalls episodic memories and injects them into the context.
421    /// Optionally enforces backpressure before starting.
422    ///
423    /// # Arguments
424    /// * `agent_id` — identifies the agent for memory retrieval
425    /// * `prompt` — the user's input prompt
426    /// * `infer` — async inference function: `(context: String) -> impl Future<Output = String>`
427    ///
428    /// # Returns
429    /// An `AgentSession` with step count, hits, duration, and a stable session ID.
430    #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id))]
431    pub async fn run_agent<F, Fut>(
432        &self,
433        agent_id: AgentId,
434        prompt: &str,
435        infer: F,
436    ) -> Result<AgentSession, AgentRuntimeError>
437    where
438        F: FnMut(String) -> Fut,
439        Fut: std::future::Future<Output = String>,
440    {
441        // Acquire backpressure slot before counting the session — shed requests
442        // must not inflate total_sessions or active_sessions.
443        #[cfg(feature = "orchestrator")]
444        {
445            let backpressure_result = if let Some(ref guard) = self.backpressure {
446                guard.try_acquire()
447            } else {
448                Ok(())
449            };
450            if let Err(e) = backpressure_result {
451                tracing::warn!(agent_id = %agent_id, error = %e, "backpressure shed: rejecting session");
452                self.metrics
453                    .backpressure_shed_count
454                    .fetch_add(1, Ordering::Relaxed);
455                return Err(e);
456            }
457        }
458
459        self.metrics.total_sessions.fetch_add(1, Ordering::Relaxed);
460        self.metrics.active_sessions.fetch_add(1, Ordering::Relaxed);
461
462        tracing::info!(agent_id = %agent_id, "agent session starting");
463        let outcome = self.run_agent_inner(agent_id.clone(), prompt, infer).await;
464
465        // Always release backpressure — success or error.
466        #[cfg(feature = "orchestrator")]
467        if let Some(ref guard) = self.backpressure {
468            let _ = guard.release();
469        }
470
471        // Saturating decrement — guards against underflow to usize::MAX if
472        // active_sessions is somehow already 0 (e.g. double-decrement bug).
473        let _ = self.metrics.active_sessions.fetch_update(
474            Ordering::Relaxed,
475            Ordering::Relaxed,
476            |v| Some(v.saturating_sub(1)),
477        );
478
479        match &outcome {
480            Ok(session) => {
481                tracing::info!(
482                    agent_id = %agent_id,
483                    session_id = %session.session_id,
484                    steps = session.step_count(),
485                    duration_ms = session.duration_ms,
486                    "agent session completed"
487                );
488                self.metrics
489                    .total_steps
490                    .fetch_add(session.step_count() as u64, Ordering::Relaxed);
491            }
492            Err(e) => {
493                tracing::error!(agent_id = %agent_id, error = %e, "agent session failed");
494            }
495        }
496
497        outcome
498    }
499
500    /// Inner implementation of `run_agent`, called after backpressure is acquired.
501    #[tracing::instrument(skip(self, infer), fields(agent_id = %agent_id, session_id = tracing::field::Empty))]
502    async fn run_agent_inner<F, Fut>(
503        &self,
504        agent_id: AgentId,
505        prompt: &str,
506        infer: F,
507    ) -> Result<AgentSession, AgentRuntimeError>
508    where
509        F: FnMut(String) -> Fut,
510        Fut: std::future::Future<Output = String>,
511    {
512        let start = Instant::now();
513        let session_id = uuid::Uuid::new_v4().to_string();
514
515        let mut memory_hits = 0usize;
516        let mut graph_lookups = 0usize;
517
518        // Build enriched prompt from episodic memory.
519        let enriched_prompt = if let Some(ref store) = self.memory {
520            let memories = store.recall(&agent_id, self.agent_config.max_memory_recalls)?;
521
522            // Apply token budget if configured.
523            let memories = if let Some(token_budget) = self.agent_config.max_memory_tokens {
524                let mut used = 0usize;
525                memories
526                    .into_iter()
527                    .filter(|m| {
528                        let tokens = (m.content.len() / 4).max(1);
529                        if used + tokens <= token_budget {
530                            used += tokens;
531                            true
532                        } else {
533                            false
534                        }
535                    })
536                    .collect::<Vec<_>>()
537            } else {
538                memories
539            };
540
541            memory_hits = memories.len();
542            self.metrics
543                .memory_recall_count
544                .fetch_add(1, Ordering::Relaxed);
545
546            if let Some(budget) = self.agent_config.max_memory_tokens {
547                tracing::debug!(
548                    "memory token budget: {budget}, injecting {} items",
549                    memory_hits
550                );
551            } else {
552                tracing::debug!("enriched prompt with {} memory items", memory_hits);
553            }
554
555            if memories.is_empty() {
556                prompt.to_owned()
557            } else {
558                let mem_context: Vec<String> = memories
559                    .iter()
560                    .map(|m| format!("- {}", m.content))
561                    .collect();
562                format!(
563                    "Relevant memories:\n{}\n\nCurrent prompt: {prompt}",
564                    mem_context.join("\n")
565                )
566            }
567        } else {
568            prompt.to_owned()
569        };
570
571        // Inject working memory into prompt.
572        let enriched_prompt = if let Some(ref wm) = self.working {
573            let entries = wm.entries()?;
574            if entries.is_empty() {
575                enriched_prompt
576            } else {
577                let wm_context: Vec<String> =
578                    entries.iter().map(|(k, v)| format!("  {k}: {v}")).collect();
579                format!(
580                    "{enriched_prompt}\n\nCurrent working state:\n{}",
581                    wm_context.join("\n")
582                )
583            }
584        } else {
585            enriched_prompt
586        };
587
588        // Count graph entities as "lookups" for session metadata.
589        #[cfg(feature = "graph")]
590        if let Some(ref graph) = self.graph {
591            graph_lookups = graph.entity_count()?;
592            tracing::debug!("graph has {} entities", graph_lookups);
593        }
594
595        // Build the ReAct loop and register tools.
596        // Each ToolSpec is stored as an Arc so we can clone the Arc into the
597        // handler closure without moving ownership out of self.tools.
598        // Required fields and the per-tool circuit breaker are preserved so
599        // that validation and fast-fail behaviour work correctly at run time.
600        let mut react_loop = ReActLoop::new(self.agent_config.clone())
601            .with_metrics(Arc::clone(&self.metrics));
602
603        // Item 11 — wire per-step loop checkpointing.
604        #[cfg(feature = "persistence")]
605        if let Some(ref backend) = self.checkpoint_backend {
606            react_loop = react_loop
607                .with_step_checkpoint(Arc::clone(backend), session_id.clone());
608        }
609
610        for tool in &self.tools {
611            let tool_arc = Arc::clone(tool);
612            let required_fields = tool_arc.required_fields.clone();
613            #[cfg(feature = "orchestrator")]
614            let circuit_breaker = tool_arc.circuit_breaker.clone();
615
616            let mut spec = ToolSpec::new_async(
617                tool_arc.name.clone(),
618                tool_arc.description.clone(),
619                move |args| {
620                    let t = Arc::clone(&tool_arc);
621                    Box::pin(async move { t.call(args).await })
622                },
623            )
624            .with_required_fields(required_fields);
625
626            #[cfg(feature = "orchestrator")]
627            if let Some(cb) = circuit_breaker {
628                spec = spec.with_circuit_breaker(cb);
629            }
630
631            react_loop.register_tool(spec);
632        }
633
634        // Record the session_id into the current tracing span so that all
635        // child spans (ReActLoop iterations, tool calls) carry this field.
636        tracing::Span::current().record("session_id", &session_id.as_str());
637
638        let steps = react_loop.run(&enriched_prompt, infer).await?;
639        let duration_ms = start.elapsed().as_millis() as u64;
640
641        // Item 6 — collect per-step checkpoint errors; surfaced in AgentSession.
642        #[cfg(feature = "persistence")]
643        let mut ckpt_errors: Vec<String> = Vec::new();
644
645        // Save final checkpoint if a backend is configured.
646        #[cfg(feature = "persistence")]
647        if let Some(ref backend) = self.checkpoint_backend {
648            tracing::info!(session_id = %session_id, "saving session checkpoint");
649
650            // Build a temporary session without errors to save as the base checkpoint.
651            let tmp = AgentSession {
652                session_id: session_id.clone(),
653                agent_id: agent_id.clone(),
654                steps: steps.clone(),
655                memory_hits,
656                graph_lookups,
657                duration_ms,
658                checkpoint_errors: vec![],
659            };
660            tmp.save_checkpoint(backend.as_ref()).await?;
661
662            // Save incremental per-step consolidated snapshots.
663            for i in 1..=steps.len() {
664                let partial = AgentSession {
665                    session_id: session_id.clone(),
666                    agent_id: agent_id.clone(),
667                    steps: steps[..i].to_vec(),
668                    memory_hits,
669                    graph_lookups,
670                    duration_ms,
671                    checkpoint_errors: vec![],
672                };
673                let key = format!("session:{session_id}:step:{i}");
674                match serde_json::to_vec(&partial) {
675                    Ok(bytes) => {
676                        if let Err(e) = backend.save(&key, &bytes).await {
677                            let msg = format!("session:{session_id} step:{i} save: {e}");
678                            tracing::warn!("{}", msg);
679                            ckpt_errors.push(msg);
680                        }
681                    }
682                    Err(e) => {
683                        let msg =
684                            format!("session:{session_id} step:{i} serialise: {e}");
685                        tracing::warn!("{}", msg);
686                        ckpt_errors.push(msg);
687                    }
688                }
689            }
690        }
691
692        let session = AgentSession {
693            session_id,
694            agent_id,
695            steps,
696            memory_hits,
697            graph_lookups,
698            duration_ms,
699            #[cfg(feature = "persistence")]
700            checkpoint_errors: ckpt_errors,
701            #[cfg(not(feature = "persistence"))]
702            checkpoint_errors: vec![],
703        };
704
705        Ok(session)
706    }
707
708    /// Return a reference to the episodic memory store, if configured.
709    pub fn memory(&self) -> Option<&EpisodicStore> {
710        self.memory.as_ref()
711    }
712
713    /// Return a reference to the graph store, if configured.
714    #[cfg(feature = "graph")]
715    pub fn graph(&self) -> Option<&GraphStore> {
716        self.graph.as_ref()
717    }
718
719    /// Return a reference to the working memory, if configured.
720    pub fn working_memory(&self) -> Option<&WorkingMemory> {
721        self.working.as_ref()
722    }
723
724    /// Gracefully shut down the runtime.
725    ///
726    /// Logs a structured shutdown event with the final metrics snapshot.
727    /// If the `persistence` feature is enabled and a checkpoint backend is
728    /// configured, writes a sentinel key so operators can confirm clean shutdown.
729    ///
730    /// After calling `shutdown`, the runtime should not be used again.
731    pub async fn shutdown(&self) {
732        tracing::info!("AgentRuntime shutting down");
733        tracing::info!(
734            active_sessions = self.metrics.active_sessions(),
735            total_sessions = self.metrics.total_sessions(),
736            total_steps = self.metrics.total_steps(),
737            total_tool_calls = self.metrics.total_tool_calls(),
738            failed_tool_calls = self.metrics.failed_tool_calls(),
739            "final metrics snapshot on shutdown"
740        );
741
742        #[cfg(feature = "persistence")]
743        if let Some(ref backend) = self.checkpoint_backend {
744            let ts = chrono::Utc::now().to_rfc3339();
745            match backend.save("runtime:shutdown", ts.as_bytes()).await {
746                Ok(()) => tracing::debug!("shutdown sentinel saved"),
747                Err(e) => tracing::warn!(error = %e, "failed to save shutdown sentinel"),
748            }
749        }
750
751        tracing::info!("AgentRuntime shutdown complete");
752    }
753}
754
755// ── Tests ─────────────────────────────────────────────────────────────────────
756
757#[cfg(test)]
758mod tests {
759    use super::*;
760    use crate::graph::{Entity, GraphStore, Relationship};
761    use crate::memory::EpisodicStore;
762
763    fn simple_config() -> AgentConfig {
764        AgentConfig::new(5, "test")
765    }
766
767    async fn final_answer_infer(_ctx: String) -> String {
768        "Thought: done\nAction: FINAL_ANSWER 42".into()
769    }
770
771    // ── Builder ───────────────────────────────────────────────────────────────
772
773    // NOTE: test_builder_fails_without_agent_config has been removed.
774    // The typestate pattern makes calling .build() without .with_agent_config()
775    // a *compile-time error* — AgentRuntimeBuilder<NeedsConfig> has no build()
776    // method.  There is nothing to test at runtime.
777
778    /// Verifies that the builder compiles and produces a runtime when config is
779    /// provided.  This is the runtime-observable counterpart to the former
780    /// "fails without config" test.
781    #[tokio::test]
782    async fn test_builder_with_config_compiles() {
783        let _runtime = AgentRuntime::builder()
784            .with_agent_config(simple_config())
785            .build();
786        // If this compiles and runs, the typestate transition worked correctly.
787    }
788
789    #[tokio::test]
790    async fn test_builder_succeeds_with_minimal_config() {
791        let _runtime = AgentRuntime::builder()
792            .with_agent_config(simple_config())
793            .build();
794    }
795
796    #[tokio::test]
797    async fn test_builder_with_all_subsystems() {
798        let _runtime = AgentRuntime::builder()
799            .with_agent_config(simple_config())
800            .with_memory(EpisodicStore::new())
801            .with_graph(GraphStore::new())
802            .with_working_memory(WorkingMemory::new(10).unwrap())
803            .with_backpressure(BackpressureGuard::new(5).unwrap())
804            .build();
805    }
806
807    #[tokio::test]
808    async fn test_builder_produces_runtime_with_config() {
809        // Confirm the built runtime accepts a run_agent call — the most direct
810        // evidence that the builder wired everything correctly.
811        let runtime = AgentRuntime::builder()
812            .with_agent_config(simple_config())
813            .build();
814        let session = runtime
815            .run_agent(AgentId::new("agent-x"), "hello", final_answer_infer)
816            .await
817            .unwrap();
818        assert!(session.step_count() >= 1);
819        assert!(!session.session_id.is_empty());
820    }
821
822    // ── run_agent ─────────────────────────────────────────────────────────────
823
824    #[tokio::test]
825    async fn test_run_agent_returns_session_with_steps() {
826        let runtime = AgentRuntime::builder()
827            .with_agent_config(simple_config())
828            .build();
829
830        let session = runtime
831            .run_agent(AgentId::new("agent-1"), "hello", final_answer_infer)
832            .await
833            .unwrap();
834
835        assert_eq!(session.step_count(), 1);
836    }
837
838    #[tokio::test]
839    async fn test_run_agent_session_has_agent_id() {
840        let runtime = AgentRuntime::builder()
841            .with_agent_config(simple_config())
842            .build();
843
844        let session = runtime
845            .run_agent(AgentId::new("agent-42"), "hello", final_answer_infer)
846            .await
847            .unwrap();
848
849        assert_eq!(session.agent_id.0, "agent-42");
850    }
851
852    #[tokio::test]
853    async fn test_run_agent_session_duration_is_set() {
854        let runtime = AgentRuntime::builder()
855            .with_agent_config(simple_config())
856            .build();
857
858        let session = runtime
859            .run_agent(AgentId::new("a"), "hello", final_answer_infer)
860            .await
861            .unwrap();
862
863        // Duration should be non-negative (0 ms is valid for a fast mock)
864        let _ = session.duration_ms; // just verify it compiles and is set
865    }
866
867    #[tokio::test]
868    async fn test_run_agent_session_has_session_id() {
869        let runtime = AgentRuntime::builder()
870            .with_agent_config(simple_config())
871            .build();
872
873        let session = runtime
874            .run_agent(AgentId::new("a"), "hello", final_answer_infer)
875            .await
876            .unwrap();
877
878        // session_id must be a non-empty UUID string
879        assert!(!session.session_id.is_empty());
880        assert_eq!(session.session_id.len(), 36); // UUID v4 canonical form
881    }
882
883    #[tokio::test]
884    async fn test_run_agent_memory_hits_zero_without_memory() {
885        let runtime = AgentRuntime::builder()
886            .with_agent_config(simple_config())
887            .build();
888
889        let session = runtime
890            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
891            .await
892            .unwrap();
893
894        assert_eq!(session.memory_hits, 0);
895    }
896
897    #[tokio::test]
898    async fn test_run_agent_memory_hits_counts_recalled_items() {
899        let store = EpisodicStore::new();
900        let agent = AgentId::new("mem-agent");
901        store
902            .add_episode(agent.clone(), "remembered fact", 0.8)
903            .unwrap();
904
905        let runtime = AgentRuntime::builder()
906            .with_agent_config(simple_config())
907            .with_memory(store)
908            .build();
909
910        let session = runtime
911            .run_agent(agent, "prompt", final_answer_infer)
912            .await
913            .unwrap();
914
915        assert_eq!(session.memory_hits, 1);
916    }
917
918    #[tokio::test]
919    async fn test_run_agent_graph_lookups_counts_entities() {
920        let graph = GraphStore::new();
921        graph.add_entity(Entity::new("e1", "Node")).unwrap();
922        graph.add_entity(Entity::new("e2", "Node")).unwrap();
923
924        let runtime = AgentRuntime::builder()
925            .with_agent_config(simple_config())
926            .with_graph(graph)
927            .build();
928
929        let session = runtime
930            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
931            .await
932            .unwrap();
933
934        assert_eq!(session.graph_lookups, 2);
935    }
936
937    #[tokio::test]
938    async fn test_run_agent_backpressure_released_after_run() {
939        let guard = BackpressureGuard::new(3).unwrap();
940
941        let runtime = AgentRuntime::builder()
942            .with_agent_config(simple_config())
943            .with_backpressure(guard.clone())
944            .build();
945
946        runtime
947            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
948            .await
949            .unwrap();
950
951        assert_eq!(guard.depth().unwrap(), 0);
952    }
953
954    #[tokio::test]
955    async fn test_run_agent_backpressure_sheds_when_full() {
956        let guard = BackpressureGuard::new(1).unwrap();
957        guard.try_acquire().unwrap(); // pre-fill
958
959        let runtime = AgentRuntime::builder()
960            .with_agent_config(simple_config())
961            .with_backpressure(guard)
962            .build();
963
964        let result = runtime
965            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
966            .await;
967        assert!(matches!(
968            result,
969            Err(AgentRuntimeError::BackpressureShed { .. })
970        ));
971    }
972
973    #[tokio::test]
974    async fn test_run_agent_max_iterations_error_propagated() {
975        let cfg = AgentConfig::new(2, "model");
976        let runtime = AgentRuntime::builder().with_agent_config(cfg).build();
977
978        // Simulate an infer fn that always produces FINAL_ANSWER immediately
979        let result = runtime
980            .run_agent(AgentId::new("a"), "prompt", |_ctx: String| async {
981                "Thought: looping\nAction: FINAL_ANSWER done".to_string()
982            })
983            .await;
984        assert!(result.is_ok()); // final answer on first call, ok
985    }
986
987    #[tokio::test]
988    async fn test_agent_session_step_count_matches_steps() {
989        let session = AgentSession {
990            session_id: "test-session-id".into(),
991            agent_id: AgentId::new("a"),
992            steps: vec![
993                ReActStep {
994                    thought: "t".into(),
995                    action: "a".into(),
996                    observation: "o".into(),
997                    step_duration_ms: 0,
998                },
999                ReActStep {
1000                    thought: "t2".into(),
1001                    action: "FINAL_ANSWER".into(),
1002                    observation: "done".into(),
1003                    step_duration_ms: 0,
1004                },
1005            ],
1006            memory_hits: 0,
1007            graph_lookups: 0,
1008            duration_ms: 10,
1009            checkpoint_errors: vec![],
1010        };
1011        assert_eq!(session.step_count(), 2);
1012    }
1013
1014    // ── Accessor methods ──────────────────────────────────────────────────────
1015
1016    #[tokio::test]
1017    async fn test_runtime_memory_accessor_returns_none_when_not_configured() {
1018        let runtime = AgentRuntime::builder()
1019            .with_agent_config(simple_config())
1020            .build();
1021        assert!(runtime.memory().is_none());
1022    }
1023
1024    #[tokio::test]
1025    async fn test_runtime_memory_accessor_returns_some_when_configured() {
1026        let runtime = AgentRuntime::builder()
1027            .with_agent_config(simple_config())
1028            .with_memory(EpisodicStore::new())
1029            .build();
1030        assert!(runtime.memory().is_some());
1031    }
1032
1033    #[tokio::test]
1034    async fn test_runtime_graph_accessor_returns_none_when_not_configured() {
1035        let runtime = AgentRuntime::builder()
1036            .with_agent_config(simple_config())
1037            .build();
1038        assert!(runtime.graph().is_none());
1039    }
1040
1041    #[tokio::test]
1042    async fn test_runtime_graph_accessor_returns_some_when_configured() {
1043        let runtime = AgentRuntime::builder()
1044            .with_agent_config(simple_config())
1045            .with_graph(GraphStore::new())
1046            .build();
1047        assert!(runtime.graph().is_some());
1048    }
1049
1050    #[tokio::test]
1051    async fn test_runtime_working_memory_accessor() {
1052        let runtime = AgentRuntime::builder()
1053            .with_agent_config(simple_config())
1054            .with_working_memory(WorkingMemory::new(5).unwrap())
1055            .build();
1056        assert!(runtime.working_memory().is_some());
1057    }
1058
1059    #[tokio::test]
1060    async fn test_runtime_with_tool_registered() {
1061        let runtime = AgentRuntime::builder()
1062            .with_agent_config(simple_config())
1063            .register_tool(ToolSpec::new("calc", "math", |_| serde_json::json!(99)))
1064            .build();
1065
1066        let mut call_count = 0;
1067        let session = runtime
1068            .run_agent(AgentId::new("a"), "compute", move |_ctx: String| {
1069                call_count += 1;
1070                let count = call_count;
1071                async move {
1072                    if count == 1 {
1073                        "Thought: use calc\nAction: calc {}".into()
1074                    } else {
1075                        "Thought: done\nAction: FINAL_ANSWER result".into()
1076                    }
1077                }
1078            })
1079            .await
1080            .unwrap();
1081
1082        assert!(session.step_count() >= 1);
1083    }
1084
1085    #[tokio::test]
1086    async fn test_run_agent_with_graph_relationship_lookup() {
1087        let graph = GraphStore::new();
1088        graph.add_entity(Entity::new("a", "X")).unwrap();
1089        graph.add_entity(Entity::new("b", "Y")).unwrap();
1090        graph
1091            .add_relationship(Relationship::new("a", "b", "LINKS", 1.0))
1092            .unwrap();
1093
1094        let runtime = AgentRuntime::builder()
1095            .with_agent_config(simple_config())
1096            .with_graph(graph)
1097            .build();
1098
1099        let session = runtime
1100            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
1101            .await
1102            .unwrap();
1103
1104        assert_eq!(session.graph_lookups, 2); // 2 entities
1105    }
1106
1107    // ── Metrics ───────────────────────────────────────────────────────────────
1108
1109    #[tokio::test]
1110    async fn test_metrics_active_sessions_decrements_after_run() {
1111        let runtime = AgentRuntime::builder()
1112            .with_agent_config(simple_config())
1113            .build();
1114
1115        runtime
1116            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
1117            .await
1118            .unwrap();
1119
1120        assert_eq!(runtime.metrics().active_sessions(), 0);
1121    }
1122
1123    #[tokio::test]
1124    async fn test_metrics_total_sessions_increments() {
1125        let runtime = AgentRuntime::builder()
1126            .with_agent_config(simple_config())
1127            .build();
1128
1129        runtime
1130            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
1131            .await
1132            .unwrap();
1133        runtime
1134            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
1135            .await
1136            .unwrap();
1137
1138        assert_eq!(runtime.metrics().total_sessions(), 2);
1139    }
1140
1141    #[tokio::test]
1142    async fn test_metrics_backpressure_shed_increments_on_shed() {
1143        let guard = BackpressureGuard::new(1).unwrap();
1144        guard.try_acquire().unwrap(); // pre-fill
1145
1146        let runtime = AgentRuntime::builder()
1147            .with_agent_config(simple_config())
1148            .with_backpressure(guard)
1149            .build();
1150
1151        let _ = runtime
1152            .run_agent(AgentId::new("a"), "prompt", final_answer_infer)
1153            .await;
1154
1155        assert_eq!(runtime.metrics().backpressure_shed_count(), 1);
1156    }
1157
1158    #[tokio::test]
1159    async fn test_metrics_memory_recall_count_increments() {
1160        let store = EpisodicStore::new();
1161        let agent = AgentId::new("a");
1162        store.add_episode(agent.clone(), "fact", 0.9).unwrap();
1163
1164        let runtime = AgentRuntime::builder()
1165            .with_agent_config(simple_config())
1166            .with_memory(store)
1167            .build();
1168
1169        runtime
1170            .run_agent(agent, "prompt", final_answer_infer)
1171            .await
1172            .unwrap();
1173
1174        assert_eq!(runtime.metrics().memory_recall_count(), 1);
1175    }
1176
1177    // ── Memory token budgeting ────────────────────────────────────────────────
1178
1179    #[tokio::test]
1180    async fn test_agent_config_max_memory_tokens_limits_injection() {
1181        let store = EpisodicStore::new();
1182        let agent = AgentId::new("budget-agent");
1183        // Each memory has ~100 chars → ~25 tokens each
1184        for i in 0..5 {
1185            let content = format!("{:0>100}", i); // 100-char string
1186            store.add_episode(agent.clone(), content, 0.9).unwrap();
1187        }
1188
1189        // Token budget of 10 allows at most ~1 memory (each is ~25 tokens).
1190        let cfg = AgentConfig::new(5, "test").with_max_memory_tokens(10);
1191        let runtime = AgentRuntime::builder()
1192            .with_agent_config(cfg)
1193            .with_memory(store)
1194            .build();
1195
1196        let session = runtime
1197            .run_agent(agent, "prompt", final_answer_infer)
1198            .await
1199            .unwrap();
1200
1201        assert!(
1202            session.memory_hits <= 1,
1203            "expected at most 1 memory hit with tight token budget, got {}",
1204            session.memory_hits
1205        );
1206    }
1207
1208    // ── Working memory injection ──────────────────────────────────────────────
1209
1210    #[tokio::test]
1211    async fn test_working_memory_injected_into_prompt() {
1212        let wm = WorkingMemory::new(10).unwrap();
1213        wm.set("task", "write tests").unwrap();
1214        wm.set("status", "in progress").unwrap();
1215
1216        let runtime = AgentRuntime::builder()
1217            .with_agent_config(simple_config())
1218            .with_working_memory(wm)
1219            .build();
1220
1221        let mut captured_ctx: Option<String> = None;
1222        let captured_ref = &mut captured_ctx;
1223
1224        runtime
1225            .run_agent(AgentId::new("a"), "do stuff", |ctx: String| {
1226                *captured_ref = Some(ctx.clone());
1227                async move { "Thought: done\nAction: FINAL_ANSWER ok".to_string() }
1228            })
1229            .await
1230            .unwrap();
1231
1232        let ctx = captured_ctx.expect("infer should have been called");
1233        assert!(
1234            ctx.contains("Current working state:"),
1235            "expected working memory injection in context, got: {ctx}"
1236        );
1237        assert!(ctx.contains("task: write tests"));
1238        assert!(ctx.contains("status: in progress"));
1239    }
1240
1241    // ── Task 15: Token budget edge case tests ─────────────────────────────────
1242
1243    #[tokio::test]
1244    async fn test_token_budget_zero_returns_no_memories() {
1245        // A budget of 0 should result in no memories being injected.
1246        let store = EpisodicStore::new();
1247        let agent = AgentId::new("budget-agent");
1248        store.add_episode(agent.clone(), "short", 0.9).unwrap();
1249
1250        let mut config = AgentConfig::new(5, "test-model");
1251        config.max_memory_tokens = Some(0);
1252        config.max_memory_recalls = 10;
1253
1254        let runtime = AgentRuntime::builder()
1255            .with_memory(store)
1256            .with_agent_config(config)
1257            .build();
1258
1259        let steps = runtime
1260            .run_agent(
1261                agent,
1262                "test",
1263                |_ctx| async { "Thought: ok\nAction: FINAL_ANSWER done".to_string() },
1264            )
1265            .await
1266            .unwrap();
1267
1268        // The run should succeed; we just verify it doesn't panic or error.
1269        assert_eq!(steps.steps.len(), 1);
1270    }
1271
1272    #[tokio::test]
1273    async fn test_token_budget_smaller_than_smallest_item_returns_no_memories() {
1274        let store = EpisodicStore::new();
1275        let agent = AgentId::new("budget-agent2");
1276        // Content is 40 chars → ~10 tokens (40/4). Budget = 1 → none fit.
1277        store
1278            .add_episode(agent.clone(), "a".repeat(40), 0.9)
1279            .unwrap();
1280
1281        let mut config = AgentConfig::new(5, "test-model");
1282        config.max_memory_tokens = Some(1);
1283        config.max_memory_recalls = 10;
1284
1285        let runtime = AgentRuntime::builder()
1286            .with_memory(store)
1287            .with_agent_config(config)
1288            .build();
1289
1290        let session = runtime
1291            .run_agent(
1292                agent,
1293                "test",
1294                |_ctx| async { "Thought: ok\nAction: FINAL_ANSWER done".to_string() },
1295            )
1296            .await
1297            .unwrap();
1298
1299        assert_eq!(session.memory_hits, 0);
1300    }
1301
1302    // ── Improvement 8: AgentRuntime::quick() ──────────────────────────────────
1303
1304    #[tokio::test]
1305    async fn test_agent_runtime_quick_runs_agent() {
1306        let runtime = AgentRuntime::quick(5, "test-model");
1307        let agent = AgentId::new("quick-agent");
1308        let session = runtime
1309            .run_agent(agent, "hello", |_ctx| async {
1310                "Thought: done\nAction: FINAL_ANSWER ok".to_string()
1311            })
1312            .await
1313            .unwrap();
1314        assert_eq!(session.step_count(), 1);
1315    }
1316
1317    // ── #1 final_answer() ─────────────────────────────────────────────────────
1318
1319    #[test]
1320    fn test_final_answer_extracts_text() {
1321        let session = AgentSession {
1322            session_id: "s".into(),
1323            agent_id: AgentId::new("a"),
1324            steps: vec![ReActStep {
1325                thought: "done".into(),
1326                action: "FINAL_ANSWER Paris".into(),
1327                observation: "".into(),
1328                step_duration_ms: 0,
1329            }],
1330            memory_hits: 0,
1331            graph_lookups: 0,
1332            duration_ms: 0,
1333            checkpoint_errors: vec![],
1334        };
1335        assert_eq!(session.final_answer(), Some("Paris".to_string()));
1336    }
1337
1338    #[test]
1339    fn test_final_answer_returns_none_without_final_step() {
1340        let session = AgentSession {
1341            session_id: "s".into(),
1342            agent_id: AgentId::new("a"),
1343            steps: vec![ReActStep {
1344                thought: "thinking".into(),
1345                action: "search {}".into(),
1346                observation: "result".into(),
1347                step_duration_ms: 0,
1348            }],
1349            memory_hits: 0,
1350            graph_lookups: 0,
1351            duration_ms: 0,
1352            checkpoint_errors: vec![],
1353        };
1354        assert_eq!(session.final_answer(), None);
1355
1356        let empty_session = AgentSession {
1357            session_id: "s2".into(),
1358            agent_id: AgentId::new("a"),
1359            steps: vec![],
1360            memory_hits: 0,
1361            graph_lookups: 0,
1362            duration_ms: 0,
1363            checkpoint_errors: vec![],
1364        };
1365        assert_eq!(empty_session.final_answer(), None);
1366    }
1367}