1use crate::{EPISODES_TABLE, METADATA_TABLE, RedbStorage, SUMMARIES_TABLE};
4use do_memory_core::episodic::CapacityManager;
5use do_memory_core::semantic::EpisodeSummary;
6use do_memory_core::{Episode, Error, Result};
7use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata};
8use std::sync::Arc;
9use tracing::{debug, info, warn};
10use uuid::Uuid;
11
12impl RedbStorage {
13 pub async fn store_metadata(&self, key: &str, value: &str) -> Result<()> {
15 debug!("Storing metadata: {} = {}", key, value);
16 let db = Arc::clone(&self.db);
17 let key_str = key.to_string();
18 let value_bytes = value.as_bytes().to_vec();
19
20 tokio::task::spawn_blocking(move || {
21 let write_txn = db
22 .begin_write()
23 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
24
25 {
26 let mut table = write_txn
27 .open_table(METADATA_TABLE)
28 .map_err(|e| Error::Storage(format!("Failed to open metadata table: {}", e)))?;
29
30 table
31 .insert(key_str.as_str(), value_bytes.as_slice())
32 .map_err(|e| Error::Storage(format!("Failed to insert metadata: {}", e)))?;
33 }
34
35 write_txn
36 .commit()
37 .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
38
39 Ok::<(), Error>(())
40 })
41 .await
42 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
43
44 Ok(())
45 }
46
47 pub async fn get_metadata(&self, key: &str) -> Result<Option<String>> {
49 debug!("Retrieving metadata: {}", key);
50 let db = Arc::clone(&self.db);
51 let key_str = key.to_string();
52
53 tokio::task::spawn_blocking(move || {
54 let read_txn = db
55 .begin_read()
56 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
57
58 let table = read_txn
59 .open_table(METADATA_TABLE)
60 .map_err(|e| Error::Storage(format!("Failed to open metadata table: {}", e)))?;
61
62 match table
63 .get(key_str.as_str())
64 .map_err(|e| Error::Storage(format!("Failed to get metadata: {}", e)))?
65 {
66 Some(bytes_guard) => {
67 let _bytes = bytes_guard.value();
68 let value = String::from_utf8(_bytes.to_vec())
69 .map_err(|e| Error::Storage(format!("Failed to decode metadata: {}", e)))?;
70 Ok(Some(value))
71 }
72 None => Ok(None),
73 }
74 })
75 .await
76 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
77 }
78
79 pub async fn store_episode_summary(&self, summary: &EpisodeSummary) -> Result<()> {
116 debug!("Storing episode summary: {}", summary.episode_id);
117 let db = Arc::clone(&self.db);
118 let summary_id = summary.episode_id.to_string();
119 let summary_bytes = postcard::to_allocvec(summary)
120 .map_err(|e| Error::Storage(format!("Failed to serialize episode summary: {}", e)))?;
121
122 tokio::task::spawn_blocking(move || {
123 let write_txn = db
124 .begin_write()
125 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
126
127 {
128 let mut table = write_txn.open_table(SUMMARIES_TABLE).map_err(|e| {
129 Error::Storage(format!("Failed to open summaries table: {}", e))
130 })?;
131
132 table
133 .insert(summary_id.as_str(), summary_bytes.as_slice())
134 .map_err(|e| {
135 Error::Storage(format!("Failed to insert episode summary: {}", e))
136 })?;
137 }
138
139 write_txn
140 .commit()
141 .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
142
143 Ok::<(), Error>(())
144 })
145 .await
146 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
147
148 info!(
149 "Successfully stored episode summary: {}",
150 summary.episode_id
151 );
152 Ok(())
153 }
154
155 pub async fn get_episode_summary(&self, episode_id: Uuid) -> Result<Option<EpisodeSummary>> {
181 debug!("Retrieving episode summary: {}", episode_id);
182 let db = Arc::clone(&self.db);
183 let episode_id_str = episode_id.to_string();
184
185 tokio::task::spawn_blocking(move || {
186 let read_txn = db
187 .begin_read()
188 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
189
190 let table = read_txn
191 .open_table(SUMMARIES_TABLE)
192 .map_err(|e| Error::Storage(format!("Failed to open summaries table: {}", e)))?;
193
194 match table
195 .get(episode_id_str.as_str())
196 .map_err(|e| Error::Storage(format!("Failed to get episode summary: {}", e)))?
197 {
198 Some(bytes_guard) => {
199 let summary: EpisodeSummary = postcard::from_bytes(bytes_guard.value())
200 .map_err(|e| {
201 Error::Storage(format!("Failed to deserialize episode summary: {}", e))
202 })?;
203 Ok(Some(summary))
204 }
205 None => Ok(None),
206 }
207 })
208 .await
209 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
210 }
211
212 pub async fn store_episode_with_capacity(
252 &self,
253 episode: &Episode,
254 summary: Option<&EpisodeSummary>,
255 capacity_manager: &CapacityManager,
256 ) -> Result<Option<Vec<Uuid>>> {
257 debug!(
258 "Storing episode with capacity enforcement: {}",
259 episode.episode_id
260 );
261
262 let db = Arc::clone(&self.db);
263 let episode_clone = episode.clone();
264 let summary_clone = summary.cloned();
265 let capacity_manager_clone = capacity_manager.clone();
266
267 let result = tokio::task::spawn_blocking(move || {
268 let write_txn = db
270 .begin_write()
271 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
272
273 let evicted_ids: Vec<Uuid>;
274
275 {
276 let episodes_table = write_txn
278 .open_table(EPISODES_TABLE)
279 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
280
281 let current_count = episodes_table
282 .len()
283 .map_err(|e| Error::Storage(format!("Failed to get episode count: {}", e)))?
284 as usize;
285
286 info!(
287 "Current episode count: {} (checking capacity)",
288 current_count
289 );
290
291 let need_eviction = !capacity_manager_clone.can_store(current_count);
293
294 if need_eviction {
295 info!("Capacity limit reached, selecting episodes for eviction");
296
297 let mut all_episodes = Vec::new();
299 let iter = episodes_table.iter().map_err(|e| {
300 Error::Storage(format!("Failed to iterate episodes: {}", e))
301 })?;
302
303 for result in iter {
304 let (_, bytes_guard) = result.map_err(|e| {
305 Error::Storage(format!("Failed to read episode entry: {}", e))
306 })?;
307
308 let ep: Episode =
309 postcard::from_bytes(bytes_guard.value()).map_err(|e| {
310 Error::Storage(format!("Failed to deserialize episode: {}", e))
311 })?;
312
313 all_episodes.push(ep);
314 }
315
316 evicted_ids = capacity_manager_clone.evict_if_needed(&all_episodes);
318
319 info!("Selected {} episodes for eviction", evicted_ids.len());
320 } else {
321 evicted_ids = Vec::new();
322 }
323
324 drop(episodes_table); }
326
327 if !evicted_ids.is_empty() {
329 let mut episodes_table = write_txn
330 .open_table(EPISODES_TABLE)
331 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
332
333 let mut summaries_table = write_txn.open_table(SUMMARIES_TABLE).map_err(|e| {
334 Error::Storage(format!("Failed to open summaries table: {}", e))
335 })?;
336
337 for evicted_id in &evicted_ids {
338 let evicted_id_str = evicted_id.to_string();
339
340 episodes_table
342 .remove(evicted_id_str.as_str())
343 .map_err(|e| Error::Storage(format!("Failed to delete episode: {}", e)))?;
344
345 let _ = summaries_table.remove(evicted_id_str.as_str());
347 }
348
349 warn!("Evicted {} episodes to make room", evicted_ids.len());
350 }
351
352 {
354 let episode_id = episode_clone.episode_id.to_string();
355 let episode_bytes = postcard::to_allocvec(&episode_clone)
356 .map_err(|e| Error::Storage(format!("Failed to serialize episode: {}", e)))?;
357
358 let mut episodes_table = write_txn
359 .open_table(EPISODES_TABLE)
360 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
361
362 episodes_table
363 .insert(episode_id.as_str(), episode_bytes.as_slice())
364 .map_err(|e| Error::Storage(format!("Failed to insert episode: {}", e)))?;
365 }
366
367 if let Some(summary) = summary_clone {
369 let summary_id = summary.episode_id.to_string();
370 let summary_bytes = postcard::to_allocvec(&summary).map_err(|e| {
371 Error::Storage(format!("Failed to serialize episode summary: {}", e))
372 })?;
373
374 let mut summaries_table = write_txn.open_table(SUMMARIES_TABLE).map_err(|e| {
375 Error::Storage(format!("Failed to open summaries table: {}", e))
376 })?;
377
378 summaries_table
379 .insert(summary_id.as_str(), summary_bytes.as_slice())
380 .map_err(|e| {
381 Error::Storage(format!("Failed to insert episode summary: {}", e))
382 })?;
383 }
384
385 {
387 let episodes_table = write_txn
388 .open_table(EPISODES_TABLE)
389 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
390
391 let new_count = episodes_table
392 .len()
393 .map_err(|e| Error::Storage(format!("Failed to get episode count: {}", e)))?
394 as usize;
395
396 let mut metadata_table = write_txn
397 .open_table(METADATA_TABLE)
398 .map_err(|e| Error::Storage(format!("Failed to open metadata table: {}", e)))?;
399
400 metadata_table
401 .insert("episode_count", new_count.to_string().as_bytes())
402 .map_err(|e| {
403 Error::Storage(format!("Failed to update episode count: {}", e))
404 })?;
405
406 info!("Updated episode count metadata: {} episodes", new_count);
407 }
408
409 write_txn
411 .commit()
412 .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
413
414 info!(
415 "Successfully stored episode {} with capacity enforcement",
416 episode_clone.episode_id
417 );
418
419 Ok::<Option<Vec<Uuid>>, Error>(if evicted_ids.is_empty() {
420 None
421 } else {
422 Some(evicted_ids)
423 })
424 })
425 .await
426 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
427
428 if let Ok(episode_bytes) = postcard::to_allocvec(episode) {
430 self.cache
431 .record_access(episode.episode_id, false, Some(episode_bytes.len()))
432 .await;
433 }
434
435 Ok(result)
436 }
437}