1use std::sync::Arc;
4
5use serde_json::Value;
6
7use crate::journal::Journal;
8use crate::record::MemoryRecord;
9use crate::vector_store_api::{VectorMatch, VectorPoint, VectorQuery, VectorStoreClient};
10use crate::volatile::{VolatileConfig, VolatileMemory, VolatileStats};
11use crate::{MemoryError, MemoryResult};
12
13pub struct MemoryBusBuilder {
15 volatile_config: VolatileConfig,
16 journal: Option<Arc<dyn Journal>>,
17 vector_store: Option<Arc<dyn VectorStoreClient>>,
18}
19
20impl MemoryBusBuilder {
21 #[must_use]
23 pub fn new(volatile_config: VolatileConfig) -> Self {
24 Self {
25 volatile_config,
26 journal: None,
27 vector_store: None,
28 }
29 }
30
31 #[must_use]
33 pub fn with_journal(mut self, journal: Arc<dyn Journal>) -> Self {
34 self.journal = Some(journal);
35 self
36 }
37
38 #[must_use]
40 pub fn with_vector_store(mut self, store: Arc<dyn VectorStoreClient>) -> Self {
41 self.vector_store = Some(store);
42 self
43 }
44
45 pub fn build(self) -> MemoryResult<MemoryBus> {
51 let journal = self.journal.ok_or(MemoryError::MissingJournal)?;
52 Ok(MemoryBus {
53 volatile: Arc::new(VolatileMemory::new(self.volatile_config)),
54 journal,
55 vector_store: self.vector_store,
56 })
57 }
58}
59
60#[derive(Clone)]
62pub struct MemoryBus {
63 volatile: Arc<VolatileMemory>,
64 journal: Arc<dyn Journal>,
65 vector_store: Option<Arc<dyn VectorStoreClient>>,
66}
67
68impl MemoryBus {
69 #[must_use]
71 pub fn builder(config: VolatileConfig) -> MemoryBusBuilder {
72 MemoryBusBuilder::new(config)
73 }
74
75 #[must_use]
77 pub fn volatile(&self) -> &Arc<VolatileMemory> {
78 &self.volatile
79 }
80
81 #[must_use]
83 pub fn journal(&self) -> &Arc<dyn Journal> {
84 &self.journal
85 }
86
87 #[must_use]
89 pub fn vector_store(&self) -> Option<&Arc<dyn VectorStoreClient>> {
90 self.vector_store.as_ref()
91 }
92
93 pub async fn record(&self, record: MemoryRecord) -> MemoryResult<()> {
100 self.volatile.push(record.clone()).await;
101 self.journal.append(&record).await?;
102
103 if let (Some(store), Some(embedding)) = (&self.vector_store, record.embedding().cloned()) {
104 let metadata = if record.metadata().is_empty() {
105 Value::Null
106 } else {
107 Value::Object(record.metadata().clone())
108 };
109
110 let point = VectorPoint::new(record.id(), embedding)
111 .with_metadata(metadata)
112 .with_tags(record.tags().to_vec());
113 store.upsert(point).await?;
114 }
115
116 Ok(())
117 }
118
119 #[must_use]
121 pub async fn recent(&self, limit: usize) -> Vec<MemoryRecord> {
122 self.volatile.recent(limit).await
123 }
124
125 pub async fn journal_tail(&self, limit: usize) -> MemoryResult<Vec<MemoryRecord>> {
132 self.journal.tail(limit).await
133 }
134
135 pub async fn recall(&self, query: VectorQuery) -> MemoryResult<Vec<VectorMatch>> {
142 let store = self
143 .vector_store
144 .as_ref()
145 .ok_or(MemoryError::MissingVectorStore)?;
146 store.query(query).await
147 }
148
149 #[must_use]
151 pub async fn stats(&self) -> VolatileStats {
152 self.volatile.stats().await
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::num::NonZeroUsize;
    use std::path::PathBuf;

    use crate::journal::FileJournal;
    use crate::record::MemoryChannel;
    use crate::vector_store_api::LocalVectorStore;

    /// Builds a unique journal path inside the system temp directory.
    fn temp_path() -> PathBuf {
        let mut path = std::env::temp_dir();
        path.push(format!("memory-bus-{}.log", uuid::Uuid::new_v4()));
        path
    }

    /// Owns a temporary journal path and removes the file when dropped, so a
    /// failing assertion or `unwrap` does not leave the file behind.
    struct TempJournalFile {
        path: PathBuf,
    }

    impl TempJournalFile {
        fn new() -> Self {
            Self { path: temp_path() }
        }
    }

    impl Drop for TempJournalFile {
        fn drop(&mut self) {
            // Best-effort cleanup: the file may never have been created, so
            // the result is deliberately ignored.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// One-dimensional query asking for a single match.
    fn single_match_query() -> VectorQuery {
        VectorQuery::new(
            crate::embeddings::EmbeddingVector::new(vec![1.0]).unwrap(),
            NonZeroUsize::new(1).unwrap(),
        )
    }

    #[tokio::test]
    async fn records_to_all_components() {
        // Declared first so it is dropped last, after the journal and bus.
        let temp = TempJournalFile::new();
        let journal: Arc<dyn crate::journal::Journal> =
            Arc::new(FileJournal::open(&temp.path).await.unwrap());
        let vector_store: Arc<dyn crate::vector_store_api::VectorStoreClient> =
            Arc::new(LocalVectorStore::new());

        let bus = MemoryBus::builder(VolatileConfig::new(NonZeroUsize::new(8).unwrap()))
            .with_journal(journal)
            .with_vector_store(vector_store)
            .build()
            .unwrap();

        let record = MemoryRecord::builder(MemoryChannel::Input, Bytes::from_static(b"hello"))
            .tag("mxp")
            .unwrap()
            .build()
            .unwrap();

        bus.record(record).await.unwrap();

        let recent = bus.recent(1).await;
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"hello"));

        let journal_tail = bus.journal_tail(1).await.unwrap();
        assert_eq!(journal_tail.len(), 1);

        // The test expects recall to return no matches for this record.
        let matches = bus.recall(single_match_query()).await.unwrap();
        assert!(matches.is_empty());
    }

    #[tokio::test]
    async fn missing_vector_store_errors() {
        // Declared first so it is dropped last, after the journal and bus.
        let temp = TempJournalFile::new();
        let journal: Arc<dyn crate::journal::Journal> =
            Arc::new(FileJournal::open(&temp.path).await.unwrap());
        let bus = MemoryBus::builder(VolatileConfig::default())
            .with_journal(journal)
            .build()
            .unwrap();

        let err = bus
            .recall(single_match_query())
            .await
            .expect_err("missing vector store should error");
        assert!(matches!(err, MemoryError::MissingVectorStore));
    }
}