Skip to main content

do_memory_storage_redb/
episodes.rs

1//! Episode storage operations for redb cache
2
3use crate::{EPISODES_TABLE, RedbStorage};
4use do_memory_core::{Episode, Error, Result};
5use redb::{ReadableDatabase, ReadableTable};
6use std::sync::Arc;
7use tracing::{debug, info};
8use uuid::Uuid;
9
/// Options controlling how episodes are read back from the redb cache.
#[derive(Clone, Debug, Default)]
pub struct RedbQuery {
    /// Maximum number of episodes to return; `None` (the default) applies no cap.
    pub limit: Option<usize>,
}
15
16impl RedbStorage {
17    /// Store an episode in cache
18    pub async fn store_episode(&self, episode: &Episode) -> Result<()> {
19        debug!("Storing episode in cache: {}", episode.episode_id);
20        let db = Arc::clone(&self.db);
21        let episode_id = episode.episode_id.to_string();
22        let episode_bytes = postcard::to_allocvec(episode)
23            .map_err(|e| Error::Storage(format!("Failed to serialize episode: {}", e)))?;
24
25        let byte_size = episode_bytes.len();
26
27        tokio::task::spawn_blocking(move || {
28            let write_txn = db
29                .begin_write()
30                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
31
32            {
33                let mut table = write_txn
34                    .open_table(EPISODES_TABLE)
35                    .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
36
37                table
38                    .insert(episode_id.as_str(), episode_bytes.as_slice())
39                    .map_err(|e| Error::Storage(format!("Failed to insert episode: {}", e)))?;
40            }
41
42            write_txn
43                .commit()
44                .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
45
46            Ok::<(), Error>(())
47        })
48        .await
49        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
50
51        // Record cache miss (new item being added)
52        self.cache
53            .record_access(episode.episode_id, false, Some(byte_size))
54            .await;
55
56        info!("Successfully cached episode: {}", episode.episode_id);
57        Ok(())
58    }
59
60    /// Retrieve an episode from cache
61    pub async fn get_episode(&self, episode_id: Uuid) -> Result<Option<Episode>> {
62        debug!("Retrieving episode from cache: {}", episode_id);
63        let db = Arc::clone(&self.db);
64        let episode_id_str = episode_id.to_string();
65
66        let result = tokio::task::spawn_blocking(move || {
67            let read_txn = db
68                .begin_read()
69                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
70
71            let table = read_txn
72                .open_table(EPISODES_TABLE)
73                .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
74
75            match table
76                .get(episode_id_str.as_str())
77                .map_err(|e| Error::Storage(format!("Failed to get episode: {}", e)))?
78            {
79                Some(bytes_guard) => {
80                    let _bytes = bytes_guard.value();
81                    let episode: Episode =
82                        postcard::from_bytes(bytes_guard.value()).map_err(|e| {
83                            Error::Storage(format!("Failed to deserialize episode: {}", e))
84                        })?;
85                    Ok::<Option<Episode>, Error>(Some(episode))
86                }
87                None => Ok::<Option<Episode>, Error>(None),
88            }
89        })
90        .await
91        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
92
93        // Record cache access (hit if found, miss if not)
94        // Only track hits in the cache - misses should not add entries
95        if let Some(episode) = &result {
96            let episode_bytes = postcard::to_allocvec(episode)
97                .map_err(|e| Error::Storage(format!("Failed to serialize episode: {}", e)))?;
98            self.cache
99                .record_access(episode_id, true, Some(episode_bytes.len()))
100                .await;
101        }
102
103        Ok(result)
104    }
105
106    /// Get all episodes from cache (with optional limit)
107    pub async fn get_all_episodes(&self, query: &RedbQuery) -> Result<Vec<Episode>> {
108        debug!("Retrieving all episodes from cache");
109        let db = Arc::clone(&self.db);
110        let limit = query.limit;
111
112        tokio::task::spawn_blocking(move || {
113            let read_txn = db
114                .begin_read()
115                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
116
117            let table = read_txn
118                .open_table(EPISODES_TABLE)
119                .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
120
121            let mut episodes = Vec::new();
122            let iter = table
123                .iter()
124                .map_err(|e| Error::Storage(format!("Failed to iterate episodes: {}", e)))?;
125
126            for (count, result) in iter.enumerate() {
127                if let Some(max) = limit {
128                    if count >= max {
129                        break;
130                    }
131                }
132
133                let (_, bytes_guard) = result
134                    .map_err(|e| Error::Storage(format!("Failed to read episode entry: {}", e)))?;
135
136                let episode: Episode = postcard::from_bytes(bytes_guard.value())
137                    .map_err(|e| Error::Storage(format!("Failed to deserialize episode: {}", e)))?;
138
139                episodes.push(episode);
140            }
141
142            Ok(episodes)
143        })
144        .await
145        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
146    }
147
148    /// Delete an episode from cache
149    pub async fn delete_episode(&self, episode_id: Uuid) -> Result<()> {
150        debug!("Deleting episode from cache: {}", episode_id);
151        let db = Arc::clone(&self.db);
152        let episode_id_str = episode_id.to_string();
153
154        tokio::task::spawn_blocking(move || {
155            let write_txn = db
156                .begin_write()
157                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
158
159            {
160                let mut table = write_txn
161                    .open_table(EPISODES_TABLE)
162                    .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
163
164                table
165                    .remove(episode_id_str.as_str())
166                    .map_err(|e| Error::Storage(format!("Failed to delete episode: {}", e)))?;
167            }
168
169            write_txn
170                .commit()
171                .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
172
173            Ok::<(), Error>(())
174        })
175        .await
176        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
177
178        // Remove from cache tracking
179        self.cache.remove(episode_id).await;
180
181        info!("Deleted episode from cache: {}", episode_id);
182        Ok(())
183    }
184}