rig_core/memory.rs
1//! Conversation memory: Rig-managed persistent conversation history for agents.
2//!
3//! Memory differs from existing agent context features:
4//! - [`crate::agent::AgentBuilder::context`]: static documents always included in prompts.
5//! - [`crate::agent::AgentBuilder::dynamic_context`]: RAG documents fetched from a vector store.
6//! - [`crate::agent::prompt_request::PromptRequest::with_history`]: caller-managed message history.
7//! - **Memory** (this module): Rig-managed history loaded and saved automatically per
8//! conversation id.
9//!
10//! # Example
11//!
12//! ```no_run
13//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
14//! use rig_core::client::{CompletionClient, ProviderClient};
15//! use rig_core::completion::Prompt;
16//! use rig_core::memory::InMemoryConversationMemory;
17//! use rig_core::providers::openai;
18//!
19//! let memory = InMemoryConversationMemory::new();
20//!
21//! let openai = openai::Client::from_env()?;
22//! let agent = openai.agent("gpt-4o").memory(memory).build();
23//!
24//! agent.prompt("My name is Alice.")
25//! .conversation("thread-1")
26//! .await?;
27//!
28//! let answer = agent.prompt("What's my name?")
29//! .conversation("thread-1")
30//! .await?;
31//! # Ok(()) }
32//! ```
33//!
34//! Truncation, summarization, and other history-shaping policies live in the
35//! `rig-memory` companion crate. To shape history inside the in-tree backend,
36//! pass a closure to [`InMemoryConversationMemory::with_filter`].
37
38use std::{
39 collections::HashMap,
40 sync::{Arc, Mutex},
41};
42
43use crate::{
44 completion::Message,
45 wasm_compat::{WasmBoxedFuture, WasmCompatSend, WasmCompatSync},
46};
47
/// Type-erased error returned by a memory backend.
///
/// On non-WASM targets the boxed error must also be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
pub type MemoryBackendError = Box<dyn std::error::Error + Send + Sync>;

/// Type-erased error returned by a memory backend.
///
/// On WASM targets no `Send`/`Sync` bound is imposed on the boxed error.
#[cfg(target_family = "wasm")]
pub type MemoryBackendError = Box<dyn std::error::Error>;
55
/// Errors produced by a [`ConversationMemory`] backend.
///
/// The enum is `#[non_exhaustive]`, so `match`es written outside this crate
/// need a wildcard arm. No field is marked `#[source]`, so
/// [`std::error::Error::source`] returns `None` for every variant. The error
/// wrapped by [`MemoryError::Backend`] is surfaced through the `Display`
/// message instead.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum MemoryError {
    /// The backing store failed to load, append, or clear messages.
    ///
    /// Build it with [`MemoryError::backend`] to box any compatible error.
    #[error("Memory backend error: {0}")]
    Backend(MemoryBackendError),

    /// A history-shaping filter or policy rejected the loaded history.
    ///
    /// Nothing in this module constructs this variant. It is presumably
    /// intended for external policies (e.g. in `rig-memory`).
    #[error("Memory policy error: {0}")]
    Policy(String),

    /// An internal invariant was violated (e.g. a poisoned in-process lock).
    /// Distinct from [`MemoryError::Backend`], which is reserved for failures
    /// of the underlying conversation store.
    ///
    /// [`InMemoryConversationMemory`] returns this variant when its mutex is
    /// poisoned.
    #[error("Memory internal error: {0}")]
    Internal(String),
}
74
75impl MemoryError {
76 /// Wrap an arbitrary error from a backend implementation.
77 pub fn backend<E>(source: E) -> Self
78 where
79 E: Into<MemoryBackendError>,
80 {
81 Self::Backend(source.into())
82 }
83}
84
/// A persistent conversation history backend.
///
/// Implementors store an ordered list of [`Message`]s per `conversation_id`. Rig
/// invokes [`ConversationMemory::load`] before sending a prompt and
/// [`ConversationMemory::append`] after a successful turn.
///
/// Implementations should keep `append` cheap; it runs inline before the agent
/// returns its response.
///
/// Every method returns a boxed future that borrows `self` and
/// `conversation_id` for `'a`. Because the methods carry no type parameters,
/// the trait stays object-safe: `Arc<dyn ConversationMemory>` and
/// `Box<dyn ConversationMemory>` are usable, and this module provides
/// forwarding impls for both pointer types.
pub trait ConversationMemory: WasmCompatSend + WasmCompatSync {
    /// Load the conversation history for `conversation_id`.
    ///
    /// Returns an empty `Vec` if the conversation has no stored messages.
    /// Implementations may return a shaped view rather than the raw log. For
    /// example, [`InMemoryConversationMemory`] applies its optional filter here.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] if the history cannot be read.
    fn load<'a>(
        &'a self,
        conversation_id: &'a str,
    ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>>;

    /// Append `messages` to the conversation identified by `conversation_id`.
    ///
    /// Called after a successful agent turn with the user prompt, the assistant
    /// response, and any tool-call/tool-result pairs that occurred during the turn.
    /// Implementations should keep `messages` in the order given so that later
    /// loads replay the conversation chronologically.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] if the messages cannot be stored.
    fn append<'a>(
        &'a self,
        conversation_id: &'a str,
        messages: Vec<Message>,
    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>>;

    /// Remove all stored messages for `conversation_id`.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] if the backing store cannot be updated.
    fn clear<'a>(
        &'a self,
        conversation_id: &'a str,
    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>>;
}
118
119impl<M> ConversationMemory for Arc<M>
120where
121 M: ConversationMemory + ?Sized,
122{
123 fn load<'a>(
124 &'a self,
125 conversation_id: &'a str,
126 ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>> {
127 (**self).load(conversation_id)
128 }
129
130 fn append<'a>(
131 &'a self,
132 conversation_id: &'a str,
133 messages: Vec<Message>,
134 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
135 (**self).append(conversation_id, messages)
136 }
137
138 fn clear<'a>(
139 &'a self,
140 conversation_id: &'a str,
141 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
142 (**self).clear(conversation_id)
143 }
144}
145
146impl<M> ConversationMemory for Box<M>
147where
148 M: ConversationMemory + ?Sized,
149{
150 fn load<'a>(
151 &'a self,
152 conversation_id: &'a str,
153 ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>> {
154 (**self).load(conversation_id)
155 }
156
157 fn append<'a>(
158 &'a self,
159 conversation_id: &'a str,
160 messages: Vec<Message>,
161 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
162 (**self).append(conversation_id, messages)
163 }
164
165 fn clear<'a>(
166 &'a self,
167 conversation_id: &'a str,
168 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
169 (**self).clear(conversation_id)
170 }
171}
172
173/// A history-shaping closure applied during [`InMemoryConversationMemory::load`].
174///
175/// Implemented automatically for any closure with the right signature; the
176/// trait exists to combine `Fn` with the WASM-compatible `Send`/`Sync` markers
177/// in a single trait object.
178pub trait MessageFilter:
179 Fn(Vec<Message>) -> Vec<Message> + WasmCompatSend + WasmCompatSync
180{
181}
182
183impl<F> MessageFilter for F where
184 F: Fn(Vec<Message>) -> Vec<Message> + WasmCompatSend + WasmCompatSync
185{
186}
187
/// A side-channel for messages that a memory policy or adapter removes from
/// active history during [`ConversationMemory::load`].
///
/// Truncating policies (sliding window, token budget, …) drop older turns
/// once their limit is exceeded. Without a hook, those messages are silently
/// lost. A [`DemotionHook`] receives the demoted messages and can persist
/// them into a long-tail store (semantic memory, episodic recall, archival
/// storage, …), turning truncation into demotion.
///
/// The trait is defined here in `rig-core` so that *any* memory backend
/// (in-memory, vector store, file archive, …) can implement it without
/// taking on a `rig-memory` dependency. The composing adapter that actually
/// wires a [`ConversationMemory`] backend, a policy, and a hook together
/// lives in the `rig-memory` companion crate.
///
/// Hooks should be inexpensive: their future is awaited inline on every
/// `load` that produces demoted messages, so a slow hook delays the agent's
/// next turn. Offload heavy I/O (network writes, disk fsyncs, …) to a
/// background task or a buffered channel inside the implementation.
///
/// See [`NoopDemotionHook`] for a do-nothing implementation. `Arc<H>` also
/// implements this trait for any `H: DemotionHook + ?Sized`.
///
/// # Idempotency contract
///
/// Implementations **must** be idempotent on the
/// `(conversation_id, messages)` pair. Composing adapters such as the
/// `DemotingPolicyMemory` in `rig-memory` track in-process delivery
/// watermarks to avoid replaying the same demotion within a single
/// process lifetime, but those watermarks are not persisted. Across
/// process restarts (or when a new adapter is constructed over an
/// existing backend), the hook will receive previously delivered
/// messages again. Hooks that append to durable storage should
/// deduplicate by content hash, by `(conversation_id, message_id)`,
/// or by an equivalent stable key.
pub trait DemotionHook: WasmCompatSend + WasmCompatSync {
    /// Receive `messages` that were demoted out of the active window for
    /// `conversation_id`.
    ///
    /// `messages` are in original conversation order and are passed by value,
    /// so the hook may move them into its own storage. Errors are propagated
    /// as [`MemoryError::Backend`] by the composing adapter.
    fn on_demote<'a>(
        &'a self,
        conversation_id: &'a str,
        messages: Vec<Message>,
    ) -> WasmBoxedFuture<'a, Result<(), MemoryError>>;
}
232
233/// A [`DemotionHook`] that does nothing. Useful as a default when an adapter
234/// requires a hook value but the caller has no long-tail store wired up yet.
235#[derive(Debug, Default, Clone, Copy)]
236pub struct NoopDemotionHook;
237
238impl DemotionHook for NoopDemotionHook {
239 fn on_demote<'a>(
240 &'a self,
241 _conversation_id: &'a str,
242 _messages: Vec<Message>,
243 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
244 Box::pin(async move { Ok(()) })
245 }
246}
247
248/// Forwarding impl so callers can pass `Arc<H>` wherever a `DemotionHook`
249/// is expected (e.g. when sharing a single hook between multiple memory
250/// adapters).
251impl<H> DemotionHook for Arc<H>
252where
253 H: DemotionHook + ?Sized,
254{
255 fn on_demote<'a>(
256 &'a self,
257 conversation_id: &'a str,
258 messages: Vec<Message>,
259 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
260 (**self).on_demote(conversation_id, messages)
261 }
262}
263
/// Derives a single [`Message`]-shaped artifact from a slice of messages
/// that a memory policy has evicted from the active window.
///
/// A [`DemotionHook`] is a one-way drain: it observes what fell out and
/// returns `()`. A `Compactor` is the inverse. It takes the evicted prefix
/// (and optionally the previous summary) and produces a derived artifact
/// that the composing adapter splices *back into* the active history. The
/// resulting prompt is no longer a verbatim suffix of the conversation; it
/// is `[summary, ...recent_window]`.
///
/// Implementations typically wrap an LLM call (`LlmCompactor<M>`) or a
/// pure template rollup. They run inline on the load path whenever the
/// policy demotes new messages, so a slow compactor delays the agent's
/// next turn. Keep them fast, or offload to a cached/background pipeline.
///
/// # Rolling summaries
///
/// `carry_over` is the artifact produced by the previous compaction for
/// this conversation, if any. Implementations that want a *recursive*
/// summary (the canonical pattern for long-running agents) should
/// summarize `evicted` *together with* `carry_over`, so context lost in
/// earlier compactions is preserved transitively. Stateless implementations
/// can ignore `carry_over` and produce a fresh summary of `evicted` alone.
///
/// # Idempotency contract
///
/// Composing adapters track per-conversation in-process delivery so the
/// same `evicted` slice is not compacted twice within a process lifetime,
/// but those watermarks are not persisted across restarts. Implementations
/// that have side effects (writing summaries to a vector store, billing an
/// LLM call) should deduplicate by conversation id and content hash, the
/// same way [`DemotionHook`] implementations do.
pub trait Compactor: WasmCompatSend + WasmCompatSync {
    /// The summary value produced by [`Compactor::compact`].
    ///
    /// `Into<Message>` is required so the composing adapter can splice the
    /// artifact at the front of the loaded history. `Clone` is required so
    /// the adapter can keep a private copy as `carry_over` for the next
    /// compaction. `'static` means the artifact cannot borrow from the
    /// compactor or the evicted slice.
    type Artifact: Into<Message> + Clone + WasmCompatSend + WasmCompatSync + 'static;

    /// Produce a summary artifact for `evicted`, optionally combining it
    /// with the previous summary in `carry_over`.
    ///
    /// `evicted` is in original conversation order. Errors are propagated
    /// unchanged by composing adapters, so pick the [`MemoryError`] variant
    /// that best describes the failure: [`MemoryError::Backend`] for I/O
    /// or remote-LLM faults, [`MemoryError::Internal`] for invariant
    /// breaks, and so on. The adapter does not re-wrap the returned
    /// variant.
    fn compact<'a>(
        &'a self,
        conversation_id: &'a str,
        evicted: &'a [Message],
        carry_over: Option<&'a Self::Artifact>,
    ) -> WasmBoxedFuture<'a, Result<Self::Artifact, MemoryError>>;
}
321
322/// Forwarding impl so callers can pass `Arc<C>` wherever a `Compactor` is
323/// expected (e.g. when sharing a single compactor across adapters).
324impl<C> Compactor for Arc<C>
325where
326 C: Compactor + ?Sized,
327{
328 type Artifact = C::Artifact;
329
330 fn compact<'a>(
331 &'a self,
332 conversation_id: &'a str,
333 evicted: &'a [Message],
334 carry_over: Option<&'a Self::Artifact>,
335 ) -> WasmBoxedFuture<'a, Result<Self::Artifact, MemoryError>> {
336 (**self).compact(conversation_id, evicted, carry_over)
337 }
338}
339
/// A simple thread-safe in-memory [`ConversationMemory`] backed by a `HashMap`.
///
/// Messages are stored in process memory only and lost on restart. Useful for
/// tests, examples, and short-lived agents. Pass a closure to
/// [`InMemoryConversationMemory::with_filter`] to apply a history-shaping
/// transformation on every load (truncation, summarization, re-ordering, etc.).
/// Reusable named policies live in the `rig-memory` companion crate.
///
/// Cloning is cheap and clones share state. Both fields live behind an `Arc`,
/// so messages appended through one clone are visible to loads through any
/// other clone, and all clones use the same filter.
#[derive(Clone, Default)]
pub struct InMemoryConversationMemory {
    // Raw (unfiltered) history keyed by conversation id, guarded by a
    // `std::sync::Mutex`. The `ConversationMemory` impl only holds the lock in
    // synchronous sections (never across an `.await`).
    inner: Arc<Mutex<HashMap<String, Vec<Message>>>>,
    // Optional transform applied to a copy of the history on every `load`;
    // `None` returns the stored history unchanged.
    filter: Option<Arc<dyn MessageFilter>>,
}
352
353impl InMemoryConversationMemory {
354 /// Create an empty in-memory store with no filter.
355 pub fn new() -> Self {
356 Self::default()
357 }
358
359 /// Apply `filter` to the loaded message list on every `load`.
360 ///
361 /// The filter runs after raw messages are read from the store and before
362 /// they are returned to the agent. Use it for truncation, summarization, or
363 /// any other shaping. For reusable named policies, depend on `rig-memory`.
364 pub fn with_filter<F>(mut self, filter: F) -> Self
365 where
366 F: MessageFilter + 'static,
367 {
368 self.filter = Some(Arc::new(filter));
369 self
370 }
371
372 fn lock(
373 &self,
374 ) -> Result<std::sync::MutexGuard<'_, HashMap<String, Vec<Message>>>, MemoryError> {
375 self.inner
376 .lock()
377 .map_err(|e| MemoryError::Internal(e.to_string()))
378 }
379}
380
381impl std::fmt::Debug for InMemoryConversationMemory {
382 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
383 f.debug_struct("InMemoryConversationMemory")
384 .field("filter", &self.filter.as_ref().map(|_| "<filter>"))
385 .finish()
386 }
387}
388
389impl ConversationMemory for InMemoryConversationMemory {
390 fn load<'a>(
391 &'a self,
392 conversation_id: &'a str,
393 ) -> WasmBoxedFuture<'a, Result<Vec<Message>, MemoryError>> {
394 Box::pin(async move {
395 let messages = {
396 let guard = self.lock()?;
397 guard.get(conversation_id).cloned().unwrap_or_default()
398 };
399 match &self.filter {
400 Some(filter) => Ok(filter(messages)),
401 None => Ok(messages),
402 }
403 })
404 }
405
406 fn append<'a>(
407 &'a self,
408 conversation_id: &'a str,
409 messages: Vec<Message>,
410 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
411 Box::pin(async move {
412 let mut guard = self.lock()?;
413 guard
414 .entry(conversation_id.to_string())
415 .or_default()
416 .extend(messages);
417 Ok(())
418 })
419 }
420
421 fn clear<'a>(
422 &'a self,
423 conversation_id: &'a str,
424 ) -> WasmBoxedFuture<'a, Result<(), MemoryError>> {
425 Box::pin(async move {
426 let mut guard = self.lock()?;
427 guard.remove(conversation_id);
428 Ok(())
429 })
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use crate::completion::Message;

    fn user(text: &str) -> Message {
        Message::user(text)
    }

    fn assistant(text: &str) -> Message {
        Message::assistant(text)
    }

    /// Minimal compactor whose artifact records how many messages it was
    /// given and whether a previous summary was supplied.
    struct CountingCompactor;

    impl Compactor for CountingCompactor {
        type Artifact = Message;

        fn compact<'a>(
            &'a self,
            _conversation_id: &'a str,
            evicted: &'a [Message],
            carry_over: Option<&'a Message>,
        ) -> WasmBoxedFuture<'a, Result<Message, MemoryError>> {
            let summary = format!(
                "{} evicted, carry_over={}",
                evicted.len(),
                carry_over.is_some()
            );
            Box::pin(async move { Ok(user(&summary)) })
        }
    }

    #[tokio::test]
    async fn round_trip() {
        let mem = InMemoryConversationMemory::new();
        assert!(mem.load("c1").await.unwrap().is_empty());

        mem.append("c1", vec![user("hello"), assistant("hi")])
            .await
            .unwrap();

        let loaded = mem.load("c1").await.unwrap();
        assert_eq!(loaded, vec![user("hello"), assistant("hi")]);
    }

    #[tokio::test]
    async fn append_extends_existing_history_in_order() {
        let mem = InMemoryConversationMemory::new();
        mem.append("c", vec![user("1")]).await.unwrap();
        mem.append("c", vec![assistant("2"), user("3")])
            .await
            .unwrap();
        mem.append("c", Vec::new()).await.unwrap();

        assert_eq!(
            mem.load("c").await.unwrap(),
            vec![user("1"), assistant("2"), user("3")]
        );
    }

    #[tokio::test]
    async fn isolation_between_conversations() {
        let mem = InMemoryConversationMemory::new();
        mem.append("a", vec![user("hi a")]).await.unwrap();
        mem.append("b", vec![user("hi b")]).await.unwrap();

        assert_eq!(mem.load("a").await.unwrap(), vec![user("hi a")]);
        assert_eq!(mem.load("b").await.unwrap(), vec![user("hi b")]);

        mem.clear("a").await.unwrap();
        assert!(mem.load("a").await.unwrap().is_empty());
        assert_eq!(mem.load("b").await.unwrap(), vec![user("hi b")]);
    }

    #[tokio::test]
    async fn clear_removes_history() {
        let mem = InMemoryConversationMemory::new();
        mem.append("c", vec![user("x")]).await.unwrap();
        mem.clear("c").await.unwrap();
        assert!(mem.load("c").await.unwrap().is_empty());

        // Clearing an unknown conversation is a no-op, not an error.
        mem.clear("never-written").await.unwrap();
    }

    #[tokio::test]
    async fn with_filter_transforms_loaded_messages() {
        // Keep only the two most recent messages, in their original order.
        let mem = InMemoryConversationMemory::new().with_filter(|msgs: Vec<Message>| {
            let skip = msgs.len().saturating_sub(2);
            msgs.into_iter().skip(skip).collect()
        });

        mem.append(
            "c",
            vec![user("1"), assistant("2"), user("3"), assistant("4")],
        )
        .await
        .unwrap();

        let loaded = mem.load("c").await.unwrap();
        assert_eq!(loaded, vec![user("3"), assistant("4")]);
    }

    #[tokio::test]
    async fn filter_does_not_rewrite_stored_history() {
        // Replace the loaded history with one message reporting how many raw
        // messages are stored. If the filter's output were persisted, the
        // second load would report "1" instead of "3".
        let mem = InMemoryConversationMemory::new()
            .with_filter(|msgs: Vec<Message>| vec![user(&msgs.len().to_string())]);

        mem.append("c", vec![user("a"), assistant("b"), user("c")])
            .await
            .unwrap();
        assert_eq!(mem.load("c").await.unwrap(), vec![user("3")]);
        assert_eq!(mem.load("c").await.unwrap(), vec![user("3")]);

        mem.append("c", vec![assistant("d")]).await.unwrap();
        assert_eq!(mem.load("c").await.unwrap(), vec![user("4")]);
    }

    #[tokio::test]
    async fn clones_share_storage() {
        let original = InMemoryConversationMemory::new();
        let clone = original.clone();

        original.append("c", vec![user("shared")]).await.unwrap();
        assert_eq!(clone.load("c").await.unwrap(), vec![user("shared")]);

        clone.clear("c").await.unwrap();
        assert!(original.load("c").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn arc_conversation_memory_forwards_to_inner() {
        let inner = Arc::new(InMemoryConversationMemory::new());
        let mem: Arc<dyn ConversationMemory> = inner.clone();

        mem.append("c", vec![user("hello")]).await.unwrap();

        assert_eq!(inner.load("c").await.unwrap().len(), 1);
        mem.clear("c").await.unwrap();
        assert!(inner.load("c").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn boxed_conversation_memory_forwards_to_inner() {
        let mem: Box<dyn ConversationMemory> = Box::new(InMemoryConversationMemory::new());

        mem.append("c", vec![user("hello")]).await.unwrap();

        assert_eq!(mem.load("c").await.unwrap().len(), 1);
        mem.clear("c").await.unwrap();
        assert!(mem.load("c").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn noop_demotion_hook_accepts_messages_through_arc() {
        let hook: Arc<dyn DemotionHook> = Arc::new(NoopDemotionHook);
        hook.on_demote("c", vec![user("old"), assistant("older")])
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn arc_compactor_forwards_to_inner() {
        let compactor = Arc::new(CountingCompactor);
        let evicted = vec![user("1"), assistant("2")];

        let first = compactor.compact("c", &evicted, None).await.unwrap();
        assert_eq!(first, user("2 evicted, carry_over=false"));

        let second = compactor
            .compact("c", &evicted[..1], Some(&first))
            .await
            .unwrap();
        assert_eq!(second, user("1 evicted, carry_over=true"));
    }
}