Skip to main content

rig_core/
memory.rs

//! Conversation memory: Rig-managed persistent conversation history for agents.
//!
//! Memory differs from the existing agent context features:
//! - [`crate::agent::AgentBuilder::context`]: static documents always included in prompts.
//! - [`crate::agent::AgentBuilder::dynamic_context`]: RAG documents fetched from a vector store.
//! - [`crate::agent::prompt_request::PromptRequest::with_history`]: caller-managed message history.
//! - **Memory** (this module): Rig-managed history that is loaded and saved automatically per
//!   conversation id.
//!
//! # Example
//!
//! ```no_run
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! use rig_core::client::{CompletionClient, ProviderClient};
//! use rig_core::completion::Prompt;
//! use rig_core::memory::InMemoryConversationMemory;
//! use rig_core::providers::openai;
//!
//! let memory = InMemoryConversationMemory::new();
//!
//! let openai = openai::Client::from_env()?;
//! let agent = openai.agent("gpt-4o").memory(memory).build();
//!
//! agent.prompt("My name is Alice.")
//!     .conversation("thread-1")
//!     .await?;
//!
//! let answer = agent.prompt("What's my name?")
//!     .conversation("thread-1")
//!     .await?;
//! # Ok(()) }
//! ```
//!
//! Truncation, summarization, and other history-shaping policies live in the
//! `rig-memory` companion crate. To shape history inside the in-tree backend,
//! pass a closure to [`InMemoryConversationMemory::with_filter`].
37
38use std::{
39    collections::HashMap,
40    sync::{Arc, Mutex},
41};
42
43use crate::{
44    completion::Message,
45    wasm_compat::{WasmBoxedFuture, WasmCompatSend, WasmCompatSync},
46};
47
48/// Boxed error source for memory backend failures.
49#[cfg(not(target_family = "wasm"))]
50pub type MemoryBackendError = Box<dyn std::error::Error + Send + Sync + 'static>;
51
52/// Boxed error source for memory backend failures.
53#[cfg(target_family = "wasm")]
54pub type MemoryBackendError = Box<dyn std::error::Error + 'static>;
55
56/// Errors produced by a [`ConversationMemory`] backend.
57#[derive(Debug, thiserror::Error)]
58#[non_exhaustive]
59pub enum MemoryError {
60    /// The backing store failed to load, append, or clear messages.
61    #[error("Memory backend error: {0}")]
62    Backend(MemoryBackendError),
63
64    /// A history-shaping filter or policy rejected the loaded history.
65    #[error("Memory policy error: {0}")]
66    Policy(String),
67
68    /// An internal invariant was violated (e.g. a poisoned in-process lock).
69    /// Distinct from [`MemoryError::Backend`], which is reserved for failures
70    /// of the underlying conversation store.
71    #[error("Memory internal error: {0}")]
72    Internal(String),
73}
74
75impl MemoryError {
76    /// Wrap an arbitrary error from a backend implementation.
77    pub fn backend<E>(source: E) -> Self
78    where
79        E: Into<MemoryBackendError>,
80    {
81        Self::Backend(source.into())
82    }
83}
84
85/// A persistent conversation history backend.
86///
87/// Implementors store an ordered list of [`Message`]s per `conversation_id`. Rig
88/// invokes [`ConversationMemory::load`] before sending a prompt and
89/// [`ConversationMemory::append`] after a successful turn.
90///
91/// Implementations should keep `append` cheap; it runs inline before the agent
92/// returns its response.
93pub trait ConversationMemory: WasmCompatSend + WasmCompatSync {
94    /// Load the full conversation history for `conversation_id`.
95    ///
96    /// Returns an empty `Vec` if the conversation has no stored messages.
97    fn load<'a>(
98        &'a self,
99        conversation_id: &'a str,
100    ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>>;
101
102    /// Append `messages` to the conversation identified by `conversation_id`.
103    ///
104    /// Called after a successful agent turn with the user prompt, the assistant
105    /// response, and any tool-call/tool-result pairs that occurred during the turn.
106    fn append<'a>(
107        &'a self,
108        conversation_id: &'a str,
109        messages: Vec<Message>,
110    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>>;
111
112    /// Remove all stored messages for `conversation_id`.
113    fn clear<'a>(
114        &'a self,
115        conversation_id: &'a str,
116    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>>;
117}
118
119impl<M> ConversationMemory for Arc<M>
120where
121    M: ConversationMemory + ?Sized,
122{
123    fn load<'a>(
124        &'a self,
125        conversation_id: &'a str,
126    ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>> {
127        (**self).load(conversation_id)
128    }
129
130    fn append<'a>(
131        &'a self,
132        conversation_id: &'a str,
133        messages: Vec<Message>,
134    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
135        (**self).append(conversation_id, messages)
136    }
137
138    fn clear<'a>(
139        &'a self,
140        conversation_id: &'a str,
141    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
142        (**self).clear(conversation_id)
143    }
144}
145
146impl<M> ConversationMemory for Box<M>
147where
148    M: ConversationMemory + ?Sized,
149{
150    fn load<'a>(
151        &'a self,
152        conversation_id: &'a str,
153    ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>> {
154        (**self).load(conversation_id)
155    }
156
157    fn append<'a>(
158        &'a self,
159        conversation_id: &'a str,
160        messages: Vec<Message>,
161    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
162        (**self).append(conversation_id, messages)
163    }
164
165    fn clear<'a>(
166        &'a self,
167        conversation_id: &'a str,
168    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
169        (**self).clear(conversation_id)
170    }
171}
172
173/// A history-shaping closure applied during [`InMemoryConversationMemory::load`].
174///
175/// Implemented automatically for any closure with the right signature; the
176/// trait exists to combine `Fn` with the WASM-compatible `Send`/`Sync` markers
177/// in a single trait object.
178pub trait MessageFilter:
179    Fn(Vec<Message>) -> Vec<Message> + WasmCompatSend + WasmCompatSync
180{
181}
182
183impl<F> MessageFilter for F where
184    F: Fn(Vec<Message>) -> Vec<Message> + WasmCompatSend + WasmCompatSync
185{
186}
187
188/// A side-channel for messages that a memory policy or adapter removes from
189/// active history during [`ConversationMemory::load`].
190///
191/// Truncating policies (sliding window, token budget, …) drop older turns
192/// once their limit is exceeded. Without a hook those messages are silently
193/// lost. A [`DemotionHook`] receives the demoted messages and can persist
194/// them into a long-tail store (semantic memory, episodic recall, archival
195/// storage, …), turning truncation into demotion.
196///
197/// The trait is defined here in `rig-core` so that *any* memory backend
198/// (in-memory, vector store, file archive, …) can implement it without
199/// taking on a `rig-memory` dependency. The composing adapter that actually
200/// wires a [`ConversationMemory`] backend, a policy, and a hook together
201/// lives in the `rig-memory` companion crate.
202///
203/// Hooks should be inexpensive: their future is awaited inline on every
204/// `load` that produces demoted messages, so a slow hook delays the agent's
205/// next turn. Offload heavy I/O (network writes, disk fsyncs, …) to a
206/// background task or a buffered channel inside the implementation.
207///
208/// # Idempotency contract
209///
210/// Implementations **must** be idempotent on the
211/// `(conversation_id, messages)` pair. Composing adapters such as the
212/// `DemotingPolicyMemory` in `rig-memory` track in-process delivery
213/// watermarks to avoid replaying the same demotion within a single
214/// process lifetime, but those watermarks are not persisted: across
215/// process restarts (or when a new adapter is constructed over an
216/// existing backend) the hook will receive previously-delivered
217/// messages again. Hooks that append to durable storage should
218/// deduplicate by content hash, by `(conversation_id, message_id)`,
219/// or by an equivalent stable key.
220pub trait DemotionHook: WasmCompatSend + WasmCompatSync {
221    /// Receive `messages` that were demoted out of the active window for
222    /// `conversation_id`.
223    ///
224    /// `messages` are in original conversation order. Errors are propagated
225    /// as [`MemoryError::Backend`] by the composing adapter.
226    fn on_demote<'a>(
227        &'a self,
228        conversation_id: &'a str,
229        messages: Vec<Message>,
230    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>>;
231}
232
233/// A [`DemotionHook`] that does nothing. Useful as a default when an adapter
234/// requires a hook value but the caller has no long-tail store wired up yet.
235#[derive(Debug, Default, Clone, Copy)]
236pub struct NoopDemotionHook;
237
238impl DemotionHook for NoopDemotionHook {
239    fn on_demote<'a>(
240        &'a self,
241        _conversation_id: &'a str,
242        _messages: Vec<Message>,
243    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
244        Box::pin(async move { Ok(()) })
245    }
246}
247
248/// Forwarding impl so callers can pass `Arc<H>` wherever a `DemotionHook`
249/// is expected (e.g. when sharing a single hook between multiple memory
250/// adapters).
251impl<H> DemotionHook for Arc<H>
252where
253    H: DemotionHook + ?Sized,
254{
255    fn on_demote<'a>(
256        &'a self,
257        conversation_id: &'a str,
258        messages: Vec<Message>,
259    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
260        (**self).on_demote(conversation_id, messages)
261    }
262}
263
264/// Derives a single [`Message`]-shaped artifact from a slice of messages
265/// that a memory policy has evicted from the active window.
266///
267/// Where a [`DemotionHook`] is a one-way drain — observe what fell out and
268/// return `()` — a `Compactor` is the inverse: it takes the evicted prefix
269/// (and optionally the previous summary) and produces a derived artifact
270/// that the composing adapter splices *back into* the active history. The
271/// resulting prompt is no longer a verbatim suffix of the conversation; it
272/// is `[summary, ...recent_window]`.
273///
274/// Implementations typically wrap an LLM call (`LlmCompactor<M>`) or a
275/// pure template rollup. They run inline on the load path whenever the
276/// policy demotes new messages, so a slow compactor delays the agent's
277/// next turn — keep them fast or offload to a cached/background pipeline.
278///
279/// # Rolling summaries
280///
281/// `carry_over` is the artifact produced by the previous compaction for
282/// this conversation, if any. Implementations that want a *recursive*
283/// summary (the canonical pattern for long-running agents) should
284/// summarize `evicted` *together with* `carry_over` so context lost in
285/// earlier compactions is preserved transitively. Stateless implementations
286/// can ignore `carry_over` and produce a fresh summary of `evicted` alone.
287///
288/// # Idempotency contract
289///
290/// Composing adapters track per-conversation in-process delivery so the
291/// same `evicted` slice is not compacted twice within a process lifetime,
292/// but those watermarks are not persisted across restarts. Implementations
293/// that have side effects (writing summaries to a vector store, billing an
294/// LLM call) should deduplicate by conversation id and content hash, the
295/// same way [`DemotionHook`] implementations do.
296pub trait Compactor: WasmCompatSend + WasmCompatSync {
297    /// The summary value produced by [`Compactor::compact`].
298    ///
299    /// `Into<Message>` is required so the composing adapter can splice the
300    /// artifact at the front of the loaded history. `Clone` is required so
301    /// the adapter can keep a private copy as `carry_over` for the next
302    /// compaction.
303    type Artifact: Into<Message> + Clone + WasmCompatSend + WasmCompatSync + 'static;
304
305    /// Produce a summary artifact for `evicted`, optionally combining it
306    /// with the previous summary in `carry_over`.
307    ///
308    /// `evicted` is in original conversation order. Errors are propagated
309    /// unchanged by composing adapters; pick the [`MemoryError`] variant
310    /// that best describes the failure ([`MemoryError::Backend`] for I/O
311    /// or remote-LLM faults, [`MemoryError::Internal`] for invariant
312    /// breaks, and so on). The adapter does not re-wrap the returned
313    /// variant.
314    fn compact<'a>(
315        &'a self,
316        conversation_id: &'a str,
317        evicted: &'a [Message],
318        carry_over: Option<&'a Self::Artifact>,
319    ) -> WasmBoxedFuture<'a, Result<Self::Artifact, MemoryError>>;
320}
321
322/// Forwarding impl so callers can pass `Arc<C>` wherever a `Compactor` is
323/// expected (e.g. when sharing a single compactor across adapters).
324impl<C> Compactor for Arc<C>
325where
326    C: Compactor + ?Sized,
327{
328    type Artifact = C::Artifact;
329
330    fn compact<'a>(
331        &'a self,
332        conversation_id: &'a str,
333        evicted: &'a [Message],
334        carry_over: Option<&'a Self::Artifact>,
335    ) -> WasmBoxedFuture<'a, Result<Self::Artifact, MemoryError>> {
336        (**self).compact(conversation_id, evicted, carry_over)
337    }
338}
339
340/// A simple thread-safe in-memory [`ConversationMemory`] backed by a `HashMap`.
341///
342/// Messages are stored in process memory only and lost on restart. Useful for
343/// tests, examples, and short-lived agents. Pass a closure to
344/// [`InMemoryConversationMemory::with_filter`] to apply a history-shaping
345/// transformation on every load (truncation, summarization, re-ordering, etc.).
346/// Reusable named policies live in the `rig-memory` companion crate.
347#[derive(Clone, Default)]
348pub struct InMemoryConversationMemory {
349    inner: Arc<Mutex<HashMap<String, Vec<Message>>>>,
350    filter: Option<Arc<dyn MessageFilter>>,
351}
352
353impl InMemoryConversationMemory {
354    /// Create an empty in-memory store with no filter.
355    pub fn new() -> Self {
356        Self::default()
357    }
358
359    /// Apply `filter` to the loaded message list on every `load`.
360    ///
361    /// The filter runs after raw messages are read from the store and before
362    /// they are returned to the agent. Use it for truncation, summarization, or
363    /// any other shaping. For reusable named policies, depend on `rig-memory`.
364    pub fn with_filter<F>(mut self, filter: F) -> Self
365    where
366        F: MessageFilter + 'static,
367    {
368        self.filter = Some(Arc::new(filter));
369        self
370    }
371
372    fn lock(
373        &self,
374    ) -> Result<std::sync::MutexGuard<'_, HashMap<String, Vec<Message>>>, MemoryError> {
375        self.inner
376            .lock()
377            .map_err(|e| MemoryError::Internal(e.to_string()))
378    }
379}
380
381impl std::fmt::Debug for InMemoryConversationMemory {
382    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
383        f.debug_struct("InMemoryConversationMemory")
384            .field("filter", &self.filter.as_ref().map(|_| "<filter>"))
385            .finish()
386    }
387}
388
389impl ConversationMemory for InMemoryConversationMemory {
390    fn load<'a>(
391        &'a self,
392        conversation_id: &'a str,
393    ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>> {
394        Box::pin(async move {
395            let messages = {
396                let guard = self.lock()?;
397                guard.get(conversation_id).cloned().unwrap_or_default()
398            };
399            match &self.filter {
400                Some(filter) => Ok(filter(messages)),
401                None => Ok(messages),
402            }
403        })
404    }
405
406    fn append<'a>(
407        &'a self,
408        conversation_id: &'a str,
409        messages: Vec<Message>,
410    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
411        Box::pin(async move {
412            let mut guard = self.lock()?;
413            guard
414                .entry(conversation_id.to_string())
415                .or_default()
416                .extend(messages);
417            Ok(())
418        })
419    }
420
421    fn clear<'a>(
422        &'a self,
423        conversation_id: &'a str,
424    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
425        Box::pin(async move {
426            let mut guard = self.lock()?;
427            guard.remove(conversation_id);
428            Ok(())
429        })
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436    use crate::completion::Message;
437
438    fn user(text: &str) -> Message {
439        Message::user(text)
440    }
441
442    fn assistant(text: &str) -> Message {
443        Message::assistant(text)
444    }
445
446    #[tokio::test]
447    async fn round_trip() {
448        let mem = InMemoryConversationMemory::new();
449        assert!(mem.load("c1").await.unwrap().is_empty());
450
451        mem.append("c1", vec![user("hello"), assistant("hi")])
452            .await
453            .unwrap();
454
455        let loaded = mem.load("c1").await.unwrap();
456        assert_eq!(loaded.len(), 2);
457    }
458
459    #[tokio::test]
460    async fn isolation_between_conversations() {
461        let mem = InMemoryConversationMemory::new();
462        mem.append("a", vec![user("hi a")]).await.unwrap();
463        mem.append("b", vec![user("hi b")]).await.unwrap();
464
465        assert_eq!(mem.load("a").await.unwrap().len(), 1);
466        assert_eq!(mem.load("b").await.unwrap().len(), 1);
467    }
468
469    #[tokio::test]
470    async fn clear_removes_history() {
471        let mem = InMemoryConversationMemory::new();
472        mem.append("c", vec![user("x")]).await.unwrap();
473        mem.clear("c").await.unwrap();
474        assert!(mem.load("c").await.unwrap().is_empty());
475    }
476
477    #[tokio::test]
478    async fn with_filter_transforms_loaded_messages() {
479        let mem = InMemoryConversationMemory::new()
480            .with_filter(|msgs: Vec<Message>| msgs.into_iter().rev().take(2).collect());
481
482        mem.append(
483            "c",
484            vec![user("1"), assistant("2"), user("3"), assistant("4")],
485        )
486        .await
487        .unwrap();
488
489        let loaded = mem.load("c").await.unwrap();
490        assert_eq!(loaded.len(), 2, "filter should retain only 2 messages");
491    }
492
493    #[tokio::test]
494    async fn arc_conversation_memory_forwards_to_inner() {
495        let inner = Arc::new(InMemoryConversationMemory::new());
496        let mem: Arc<dyn ConversationMemory> = inner.clone();
497
498        mem.append("c", vec![user("hello")]).await.unwrap();
499
500        assert_eq!(inner.load("c").await.unwrap().len(), 1);
501        mem.clear("c").await.unwrap();
502        assert!(inner.load("c").await.unwrap().is_empty());
503    }
504
505    #[tokio::test]
506    async fn boxed_conversation_memory_forwards_to_inner() {
507        let mem: Box<dyn ConversationMemory> = Box::new(InMemoryConversationMemory::new());
508
509        mem.append("c", vec![user("hello")]).await.unwrap();
510
511        assert_eq!(mem.load("c").await.unwrap().len(), 1);
512        mem.clear("c").await.unwrap();
513        assert!(mem.load("c").await.unwrap().is_empty());
514    }
515}