Skip to main content

do_memory_storage_redb/
episodes_summaries.rs

1//! Episode summaries and capacity operations for redb cache
2
3use crate::{EPISODES_TABLE, METADATA_TABLE, RedbStorage, SUMMARIES_TABLE};
4use do_memory_core::episodic::CapacityManager;
5use do_memory_core::semantic::EpisodeSummary;
6use do_memory_core::{Episode, Error, Result};
7use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata};
8use std::sync::Arc;
9use tracing::{debug, info, warn};
10use uuid::Uuid;
11
12impl RedbStorage {
13    /// Store metadata value
14    pub async fn store_metadata(&self, key: &str, value: &str) -> Result<()> {
15        debug!("Storing metadata: {} = {}", key, value);
16        let db = Arc::clone(&self.db);
17        let key_str = key.to_string();
18        let value_bytes = value.as_bytes().to_vec();
19
20        tokio::task::spawn_blocking(move || {
21            let write_txn = db
22                .begin_write()
23                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
24
25            {
26                let mut table = write_txn
27                    .open_table(METADATA_TABLE)
28                    .map_err(|e| Error::Storage(format!("Failed to open metadata table: {}", e)))?;
29
30                table
31                    .insert(key_str.as_str(), value_bytes.as_slice())
32                    .map_err(|e| Error::Storage(format!("Failed to insert metadata: {}", e)))?;
33            }
34
35            write_txn
36                .commit()
37                .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
38
39            Ok::<(), Error>(())
40        })
41        .await
42        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
43
44        Ok(())
45    }
46
47    /// Retrieve metadata value
48    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>> {
49        debug!("Retrieving metadata: {}", key);
50        let db = Arc::clone(&self.db);
51        let key_str = key.to_string();
52
53        tokio::task::spawn_blocking(move || {
54            let read_txn = db
55                .begin_read()
56                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
57
58            let table = read_txn
59                .open_table(METADATA_TABLE)
60                .map_err(|e| Error::Storage(format!("Failed to open metadata table: {}", e)))?;
61
62            match table
63                .get(key_str.as_str())
64                .map_err(|e| Error::Storage(format!("Failed to get metadata: {}", e)))?
65            {
66                Some(bytes_guard) => {
67                    let _bytes = bytes_guard.value();
68                    let value = String::from_utf8(_bytes.to_vec())
69                        .map_err(|e| Error::Storage(format!("Failed to decode metadata: {}", e)))?;
70                    Ok(Some(value))
71                }
72                None => Ok(None),
73            }
74        })
75        .await
76        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
77    }
78
79    /// Store an episode summary.
80    ///
81    /// Stores a semantic summary of an episode for efficient retrieval.
82    /// Uses postcard serialization for compact storage.
83    ///
84    /// # Arguments
85    ///
86    /// * `summary` - The episode summary to store
87    ///
88    /// # Returns
89    ///
90    /// `Ok(())` on success, error otherwise.
91    ///
92    /// # Examples
93    ///
94    /// ```no_run
95    /// # use do_memory_storage_redb::RedbStorage;
96    /// # use do_memory_core::semantic::EpisodeSummary;
97    /// # use std::path::Path;
98    /// # use uuid::Uuid;
99    /// # use chrono::Utc;
100    /// # async fn example() -> anyhow::Result<()> {
101    /// # let storage = RedbStorage::new(Path::new("./test.redb")).await?;
102    /// let summary = EpisodeSummary {
103    ///     episode_id: Uuid::new_v4(),
104    ///     summary_text: "Task completed successfully".to_string(),
105    ///     key_concepts: vec!["rust".to_string(), "testing".to_string()],
106    ///     key_steps: vec!["Step 1: Initialize".to_string()],
107    ///     summary_embedding: None,
108    ///     created_at: Utc::now(),
109    /// };
110    ///
111    /// storage.store_episode_summary(&summary).await?;
112    /// # Ok(())
113    /// # }
114    /// ```
115    pub async fn store_episode_summary(&self, summary: &EpisodeSummary) -> Result<()> {
116        debug!("Storing episode summary: {}", summary.episode_id);
117        let db = Arc::clone(&self.db);
118        let summary_id = summary.episode_id.to_string();
119        let summary_bytes = postcard::to_allocvec(summary)
120            .map_err(|e| Error::Storage(format!("Failed to serialize episode summary: {}", e)))?;
121
122        tokio::task::spawn_blocking(move || {
123            let write_txn = db
124                .begin_write()
125                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
126
127            {
128                let mut table = write_txn.open_table(SUMMARIES_TABLE).map_err(|e| {
129                    Error::Storage(format!("Failed to open summaries table: {}", e))
130                })?;
131
132                table
133                    .insert(summary_id.as_str(), summary_bytes.as_slice())
134                    .map_err(|e| {
135                        Error::Storage(format!("Failed to insert episode summary: {}", e))
136                    })?;
137            }
138
139            write_txn
140                .commit()
141                .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
142
143            Ok::<(), Error>(())
144        })
145        .await
146        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
147
148        info!(
149            "Successfully stored episode summary: {}",
150            summary.episode_id
151        );
152        Ok(())
153    }
154
155    /// Retrieve an episode summary.
156    ///
157    /// # Arguments
158    ///
159    /// * `episode_id` - ID of the episode whose summary to retrieve
160    ///
161    /// # Returns
162    ///
163    /// The episode summary if found, `None` otherwise.
164    ///
165    /// # Examples
166    ///
167    /// ```no_run
168    /// # use do_memory_storage_redb::RedbStorage;
169    /// # use std::path::Path;
170    /// # use uuid::Uuid;
171    /// # async fn example() -> anyhow::Result<()> {
172    /// # let storage = RedbStorage::new(Path::new("./test.redb")).await?;
173    /// let episode_id = Uuid::new_v4();
174    /// if let Some(summary) = storage.get_episode_summary(episode_id).await? {
175    ///     println!("Summary: {}", summary.summary_text);
176    /// }
177    /// # Ok(())
178    /// # }
179    /// ```
180    pub async fn get_episode_summary(&self, episode_id: Uuid) -> Result<Option<EpisodeSummary>> {
181        debug!("Retrieving episode summary: {}", episode_id);
182        let db = Arc::clone(&self.db);
183        let episode_id_str = episode_id.to_string();
184
185        tokio::task::spawn_blocking(move || {
186            let read_txn = db
187                .begin_read()
188                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
189
190            let table = read_txn
191                .open_table(SUMMARIES_TABLE)
192                .map_err(|e| Error::Storage(format!("Failed to open summaries table: {}", e)))?;
193
194            match table
195                .get(episode_id_str.as_str())
196                .map_err(|e| Error::Storage(format!("Failed to get episode summary: {}", e)))?
197            {
198                Some(bytes_guard) => {
199                    let summary: EpisodeSummary = postcard::from_bytes(bytes_guard.value())
200                        .map_err(|e| {
201                            Error::Storage(format!("Failed to deserialize episode summary: {}", e))
202                        })?;
203                    Ok(Some(summary))
204                }
205                None => Ok(None),
206            }
207        })
208        .await
209        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
210    }
211
212    /// Store an episode with capacity enforcement.
213    ///
214    /// This method enforces capacity limits by evicting low-relevance episodes
215    /// when storage is full. The eviction and insertion happen atomically in a
216    /// single write transaction to ensure consistency.
217    ///
218    /// # Arguments
219    ///
220    /// * `episode` - The episode to store
221    /// * `summary` - Optional episode summary to store alongside
222    /// * `capacity_manager` - Manager that determines eviction policy
223    ///
224    /// # Returns
225    ///
226    /// `Ok(Some(evicted_ids))` if episodes were evicted, `Ok(None)` if no eviction needed.
227    ///
228    /// # Examples
229    ///
230    /// ```no_run
231    /// # use do_memory_storage_redb::RedbStorage;
232    /// # use do_memory_core::{Episode, TaskContext, TaskType};
233    /// # use do_memory_core::episodic::{CapacityManager, EvictionPolicy};
234    /// # use std::path::Path;
235    /// # async fn example() -> anyhow::Result<()> {
236    /// # let storage = RedbStorage::new(Path::new("./test.redb")).await?;
237    /// let capacity_mgr = CapacityManager::new(100, EvictionPolicy::RelevanceWeighted);
238    /// let episode = Episode::new(
239    ///     "Test task".to_string(),
240    ///     TaskContext::default(),
241    ///     TaskType::Testing,
242    /// );
243    ///
244    /// let evicted = storage.store_episode_with_capacity(&episode, None, &capacity_mgr).await?;
245    /// if let Some(evicted_ids) = evicted {
246    ///     println!("Evicted {} episodes", evicted_ids.len());
247    /// }
248    /// # Ok(())
249    /// # }
250    /// ```
251    pub async fn store_episode_with_capacity(
252        &self,
253        episode: &Episode,
254        summary: Option<&EpisodeSummary>,
255        capacity_manager: &CapacityManager,
256    ) -> Result<Option<Vec<Uuid>>> {
257        debug!(
258            "Storing episode with capacity enforcement: {}",
259            episode.episode_id
260        );
261
262        let db = Arc::clone(&self.db);
263        let episode_clone = episode.clone();
264        let summary_clone = summary.cloned();
265        let capacity_manager_clone = capacity_manager.clone();
266
267        let result = tokio::task::spawn_blocking(move || {
268            // Begin a single write transaction for atomic evict-then-insert
269            let write_txn = db
270                .begin_write()
271                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
272
273            let evicted_ids: Vec<Uuid>;
274
275            {
276                // 1. Get current episode count
277                let episodes_table = write_txn
278                    .open_table(EPISODES_TABLE)
279                    .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
280
281                let current_count = episodes_table
282                    .len()
283                    .map_err(|e| Error::Storage(format!("Failed to get episode count: {}", e)))?
284                    as usize;
285
286                info!(
287                    "Current episode count: {} (checking capacity)",
288                    current_count
289                );
290
291                // 2. Check if we need to evict
292                let need_eviction = !capacity_manager_clone.can_store(current_count);
293
294                if need_eviction {
295                    info!("Capacity limit reached, selecting episodes for eviction");
296
297                    // Load all episodes to determine which to evict
298                    let mut all_episodes = Vec::new();
299                    let iter = episodes_table.iter().map_err(|e| {
300                        Error::Storage(format!("Failed to iterate episodes: {}", e))
301                    })?;
302
303                    for result in iter {
304                        let (_, bytes_guard) = result.map_err(|e| {
305                            Error::Storage(format!("Failed to read episode entry: {}", e))
306                        })?;
307
308                        let ep: Episode =
309                            postcard::from_bytes(bytes_guard.value()).map_err(|e| {
310                                Error::Storage(format!("Failed to deserialize episode: {}", e))
311                            })?;
312
313                        all_episodes.push(ep);
314                    }
315
316                    // Determine which episodes to evict
317                    evicted_ids = capacity_manager_clone.evict_if_needed(&all_episodes);
318
319                    info!("Selected {} episodes for eviction", evicted_ids.len());
320                } else {
321                    evicted_ids = Vec::new();
322                }
323
324                drop(episodes_table); // Release the read-only table handle
325            }
326
327            // 3. Perform eviction if needed (in the same transaction)
328            if !evicted_ids.is_empty() {
329                let mut episodes_table = write_txn
330                    .open_table(EPISODES_TABLE)
331                    .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
332
333                let mut summaries_table = write_txn.open_table(SUMMARIES_TABLE).map_err(|e| {
334                    Error::Storage(format!("Failed to open summaries table: {}", e))
335                })?;
336
337                for evicted_id in &evicted_ids {
338                    let evicted_id_str = evicted_id.to_string();
339
340                    // Delete episode
341                    episodes_table
342                        .remove(evicted_id_str.as_str())
343                        .map_err(|e| Error::Storage(format!("Failed to delete episode: {}", e)))?;
344
345                    // Delete summary (if exists - no error if not found)
346                    let _ = summaries_table.remove(evicted_id_str.as_str());
347                }
348
349                warn!("Evicted {} episodes to make room", evicted_ids.len());
350            }
351
352            // 4. Insert new episode
353            {
354                let episode_id = episode_clone.episode_id.to_string();
355                let episode_bytes = postcard::to_allocvec(&episode_clone)
356                    .map_err(|e| Error::Storage(format!("Failed to serialize episode: {}", e)))?;
357
358                let mut episodes_table = write_txn
359                    .open_table(EPISODES_TABLE)
360                    .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
361
362                episodes_table
363                    .insert(episode_id.as_str(), episode_bytes.as_slice())
364                    .map_err(|e| Error::Storage(format!("Failed to insert episode: {}", e)))?;
365            }
366
367            // 5. Insert summary if provided
368            if let Some(summary) = summary_clone {
369                let summary_id = summary.episode_id.to_string();
370                let summary_bytes = postcard::to_allocvec(&summary).map_err(|e| {
371                    Error::Storage(format!("Failed to serialize episode summary: {}", e))
372                })?;
373
374                let mut summaries_table = write_txn.open_table(SUMMARIES_TABLE).map_err(|e| {
375                    Error::Storage(format!("Failed to open summaries table: {}", e))
376                })?;
377
378                summaries_table
379                    .insert(summary_id.as_str(), summary_bytes.as_slice())
380                    .map_err(|e| {
381                        Error::Storage(format!("Failed to insert episode summary: {}", e))
382                    })?;
383            }
384
385            // 6. Update episode count metadata
386            {
387                let episodes_table = write_txn
388                    .open_table(EPISODES_TABLE)
389                    .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
390
391                let new_count = episodes_table
392                    .len()
393                    .map_err(|e| Error::Storage(format!("Failed to get episode count: {}", e)))?
394                    as usize;
395
396                let mut metadata_table = write_txn
397                    .open_table(METADATA_TABLE)
398                    .map_err(|e| Error::Storage(format!("Failed to open metadata table: {}", e)))?;
399
400                metadata_table
401                    .insert("episode_count", new_count.to_string().as_bytes())
402                    .map_err(|e| {
403                        Error::Storage(format!("Failed to update episode count: {}", e))
404                    })?;
405
406                info!("Updated episode count metadata: {} episodes", new_count);
407            }
408
409            // 7. Commit the transaction
410            write_txn
411                .commit()
412                .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
413
414            info!(
415                "Successfully stored episode {} with capacity enforcement",
416                episode_clone.episode_id
417            );
418
419            Ok::<Option<Vec<Uuid>>, Error>(if evicted_ids.is_empty() {
420                None
421            } else {
422                Some(evicted_ids)
423            })
424        })
425        .await
426        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
427
428        // Record cache access for the new episode
429        if let Ok(episode_bytes) = postcard::to_allocvec(episode) {
430            self.cache
431                .record_access(episode.episode_id, false, Some(episode_bytes.len()))
432                .await;
433        }
434
435        Ok(result)
436    }
437}