agent_memory/
bus.rs

1//! Coordinates volatile memory, journal persistence, and vector store indexing.
2
3use std::sync::Arc;
4
5use serde_json::Value;
6
7use crate::journal::Journal;
8use crate::record::MemoryRecord;
9use crate::vector_store_api::{VectorMatch, VectorPoint, VectorQuery, VectorStoreClient};
10use crate::volatile::{VolatileConfig, VolatileMemory, VolatileStats};
11use crate::{MemoryError, MemoryResult};
12
13/// Builder for [`MemoryBus`] instances.
14pub struct MemoryBusBuilder {
15    volatile_config: VolatileConfig,
16    journal: Option<Arc<dyn Journal>>,
17    vector_store: Option<Arc<dyn VectorStoreClient>>,
18}
19
20impl MemoryBusBuilder {
21    /// Starts a new builder using the supplied volatile config.
22    #[must_use]
23    pub fn new(volatile_config: VolatileConfig) -> Self {
24        Self {
25            volatile_config,
26            journal: None,
27            vector_store: None,
28        }
29    }
30
31    /// Installs the journal implementation. This is required before calling [`build`](Self::build).
32    #[must_use]
33    pub fn with_journal(mut self, journal: Arc<dyn Journal>) -> Self {
34        self.journal = Some(journal);
35        self
36    }
37
38    /// Installs an optional vector store client.
39    #[must_use]
40    pub fn with_vector_store(mut self, store: Arc<dyn VectorStoreClient>) -> Self {
41        self.vector_store = Some(store);
42        self
43    }
44
45    /// Builds the [`MemoryBus`].
46    ///
47    /// # Errors
48    ///
49    /// Returns [`MemoryError::MissingJournal`] when no journal was provided.
50    pub fn build(self) -> MemoryResult<MemoryBus> {
51        let journal = self.journal.ok_or(MemoryError::MissingJournal)?;
52        Ok(MemoryBus {
53            volatile: Arc::new(VolatileMemory::new(self.volatile_config)),
54            journal,
55            vector_store: self.vector_store,
56        })
57    }
58}
59
60/// Central memory facade used by the runtime.
61#[derive(Clone)]
62pub struct MemoryBus {
63    volatile: Arc<VolatileMemory>,
64    journal: Arc<dyn Journal>,
65    vector_store: Option<Arc<dyn VectorStoreClient>>,
66}
67
68impl MemoryBus {
69    /// Creates a builder for a memory bus.
70    #[must_use]
71    pub fn builder(config: VolatileConfig) -> MemoryBusBuilder {
72        MemoryBusBuilder::new(config)
73    }
74
75    /// Returns the underlying volatile store.
76    #[must_use]
77    pub fn volatile(&self) -> &Arc<VolatileMemory> {
78        &self.volatile
79    }
80
81    /// Returns the configured journal.
82    #[must_use]
83    pub fn journal(&self) -> &Arc<dyn Journal> {
84        &self.journal
85    }
86
87    /// Returns the configured vector store, if present.
88    #[must_use]
89    pub fn vector_store(&self) -> Option<&Arc<dyn VectorStoreClient>> {
90        self.vector_store.as_ref()
91    }
92
93    /// Persists a record across all configured stores.
94    ///
95    /// # Errors
96    ///
97    /// Returns [`MemoryError`] when writing to the journal or vector store
98    /// fails.
99    pub async fn record(&self, record: MemoryRecord) -> MemoryResult<()> {
100        self.volatile.push(record.clone()).await;
101        self.journal.append(&record).await?;
102
103        if let (Some(store), Some(embedding)) = (&self.vector_store, record.embedding().cloned()) {
104            let metadata = if record.metadata().is_empty() {
105                Value::Null
106            } else {
107                Value::Object(record.metadata().clone())
108            };
109
110            let point = VectorPoint::new(record.id(), embedding)
111                .with_metadata(metadata)
112                .with_tags(record.tags().to_vec());
113            store.upsert(point).await?;
114        }
115
116        Ok(())
117    }
118
119    /// Returns recent records from volatile memory.
120    #[must_use]
121    pub async fn recent(&self, limit: usize) -> Vec<MemoryRecord> {
122        self.volatile.recent(limit).await
123    }
124
125    /// Reads the tail of the journal.
126    ///
127    /// # Errors
128    ///
129    /// Returns [`MemoryError`] when reading or decoding entries from the
130    /// journal fails.
131    pub async fn journal_tail(&self, limit: usize) -> MemoryResult<Vec<MemoryRecord>> {
132        self.journal.tail(limit).await
133    }
134
135    /// Queries the configured vector store.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`MemoryError::MissingVectorStore`] when the bus was not
140    /// initialised with a vector store implementation.
141    pub async fn recall(&self, query: VectorQuery) -> MemoryResult<Vec<VectorMatch>> {
142        let store = self
143            .vector_store
144            .as_ref()
145            .ok_or(MemoryError::MissingVectorStore)?;
146        store.query(query).await
147    }
148
149    /// Returns utilisation statistics for the volatile store.
150    #[must_use]
151    pub async fn stats(&self) -> VolatileStats {
152        self.volatile.stats().await
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use bytes::Bytes;
160    use std::num::NonZeroUsize;
161
162    use crate::journal::FileJournal;
163    use crate::record::MemoryChannel;
164    use crate::vector_store_api::LocalVectorStore;
165
166    fn temp_path() -> std::path::PathBuf {
167        let mut path = std::env::temp_dir();
168        path.push(format!("memory-bus-{}.log", uuid::Uuid::new_v4()));
169        path
170    }
171
172    #[tokio::test]
173    async fn records_to_all_components() {
174        let path = temp_path();
175        let journal: Arc<dyn crate::journal::Journal> =
176            Arc::new(FileJournal::open(&path).await.unwrap());
177        let vector_store: Arc<dyn crate::vector_store_api::VectorStoreClient> =
178            Arc::new(LocalVectorStore::new());
179
180        let bus = MemoryBus::builder(VolatileConfig::new(NonZeroUsize::new(8).unwrap()))
181            .with_journal(journal.clone())
182            .with_vector_store(vector_store.clone())
183            .build()
184            .unwrap();
185
186        let record = MemoryRecord::builder(MemoryChannel::Input, Bytes::from_static(b"hello"))
187            .tag("mxp")
188            .unwrap()
189            .build()
190            .unwrap();
191
192        bus.record(record.clone()).await.unwrap();
193
194        let recent = bus.recent(1).await;
195        assert_eq!(recent.len(), 1);
196        assert_eq!(recent[0].payload(), &Bytes::from_static(b"hello"));
197
198        let journal_tail = bus.journal_tail(1).await.unwrap();
199        assert_eq!(journal_tail.len(), 1);
200
201        // Without an embedding the vector store should remain empty.
202        let matches = bus
203            .recall(VectorQuery::new(
204                crate::embeddings::EmbeddingVector::new(vec![1.0]).unwrap(),
205                NonZeroUsize::new(1).unwrap(),
206            ))
207            .await
208            .unwrap();
209        assert!(matches.is_empty());
210
211        if path.exists() {
212            let _ = std::fs::remove_file(path);
213        }
214    }
215
216    #[tokio::test]
217    async fn missing_vector_store_errors() {
218        let path = temp_path();
219        let journal: Arc<dyn crate::journal::Journal> =
220            Arc::new(FileJournal::open(&path).await.unwrap());
221        let bus = MemoryBus::builder(VolatileConfig::default())
222            .with_journal(journal.clone())
223            .build()
224            .unwrap();
225
226        let err = bus
227            .recall(VectorQuery::new(
228                crate::embeddings::EmbeddingVector::new(vec![1.0]).unwrap(),
229                NonZeroUsize::new(1).unwrap(),
230            ))
231            .await
232            .expect_err("missing vector store should error");
233        assert!(matches!(err, MemoryError::MissingVectorStore));
234
235        if path.exists() {
236            let _ = std::fs::remove_file(path);
237        }
238    }
239}