do_memory_storage_redb/
episodes.rs1use crate::{EPISODES_TABLE, RedbStorage};
4use do_memory_core::{Episode, Error, Result};
5use redb::{ReadableDatabase, ReadableTable};
6use std::sync::Arc;
7use tracing::{debug, info};
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Default)]
12pub struct RedbQuery {
13 pub limit: Option<usize>,
14}
15
16impl RedbStorage {
17 pub async fn store_episode(&self, episode: &Episode) -> Result<()> {
19 debug!("Storing episode in cache: {}", episode.episode_id);
20 let db = Arc::clone(&self.db);
21 let episode_id = episode.episode_id.to_string();
22 let episode_bytes = postcard::to_allocvec(episode)
23 .map_err(|e| Error::Storage(format!("Failed to serialize episode: {}", e)))?;
24
25 let byte_size = episode_bytes.len();
26
27 tokio::task::spawn_blocking(move || {
28 let write_txn = db
29 .begin_write()
30 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
31
32 {
33 let mut table = write_txn
34 .open_table(EPISODES_TABLE)
35 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
36
37 table
38 .insert(episode_id.as_str(), episode_bytes.as_slice())
39 .map_err(|e| Error::Storage(format!("Failed to insert episode: {}", e)))?;
40 }
41
42 write_txn
43 .commit()
44 .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
45
46 Ok::<(), Error>(())
47 })
48 .await
49 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
50
51 self.cache
53 .record_access(episode.episode_id, false, Some(byte_size))
54 .await;
55
56 info!("Successfully cached episode: {}", episode.episode_id);
57 Ok(())
58 }
59
60 pub async fn get_episode(&self, episode_id: Uuid) -> Result<Option<Episode>> {
62 debug!("Retrieving episode from cache: {}", episode_id);
63 let db = Arc::clone(&self.db);
64 let episode_id_str = episode_id.to_string();
65
66 let result = tokio::task::spawn_blocking(move || {
67 let read_txn = db
68 .begin_read()
69 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
70
71 let table = read_txn
72 .open_table(EPISODES_TABLE)
73 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
74
75 match table
76 .get(episode_id_str.as_str())
77 .map_err(|e| Error::Storage(format!("Failed to get episode: {}", e)))?
78 {
79 Some(bytes_guard) => {
80 let _bytes = bytes_guard.value();
81 let episode: Episode =
82 postcard::from_bytes(bytes_guard.value()).map_err(|e| {
83 Error::Storage(format!("Failed to deserialize episode: {}", e))
84 })?;
85 Ok::<Option<Episode>, Error>(Some(episode))
86 }
87 None => Ok::<Option<Episode>, Error>(None),
88 }
89 })
90 .await
91 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
92
93 if let Some(episode) = &result {
96 let episode_bytes = postcard::to_allocvec(episode)
97 .map_err(|e| Error::Storage(format!("Failed to serialize episode: {}", e)))?;
98 self.cache
99 .record_access(episode_id, true, Some(episode_bytes.len()))
100 .await;
101 }
102
103 Ok(result)
104 }
105
106 pub async fn get_all_episodes(&self, query: &RedbQuery) -> Result<Vec<Episode>> {
108 debug!("Retrieving all episodes from cache");
109 let db = Arc::clone(&self.db);
110 let limit = query.limit;
111
112 tokio::task::spawn_blocking(move || {
113 let read_txn = db
114 .begin_read()
115 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
116
117 let table = read_txn
118 .open_table(EPISODES_TABLE)
119 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
120
121 let mut episodes = Vec::new();
122 let iter = table
123 .iter()
124 .map_err(|e| Error::Storage(format!("Failed to iterate episodes: {}", e)))?;
125
126 for (count, result) in iter.enumerate() {
127 if let Some(max) = limit {
128 if count >= max {
129 break;
130 }
131 }
132
133 let (_, bytes_guard) = result
134 .map_err(|e| Error::Storage(format!("Failed to read episode entry: {}", e)))?;
135
136 let episode: Episode = postcard::from_bytes(bytes_guard.value())
137 .map_err(|e| Error::Storage(format!("Failed to deserialize episode: {}", e)))?;
138
139 episodes.push(episode);
140 }
141
142 Ok(episodes)
143 })
144 .await
145 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
146 }
147
148 pub async fn delete_episode(&self, episode_id: Uuid) -> Result<()> {
150 debug!("Deleting episode from cache: {}", episode_id);
151 let db = Arc::clone(&self.db);
152 let episode_id_str = episode_id.to_string();
153
154 tokio::task::spawn_blocking(move || {
155 let write_txn = db
156 .begin_write()
157 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
158
159 {
160 let mut table = write_txn
161 .open_table(EPISODES_TABLE)
162 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
163
164 table
165 .remove(episode_id_str.as_str())
166 .map_err(|e| Error::Storage(format!("Failed to delete episode: {}", e)))?;
167 }
168
169 write_txn
170 .commit()
171 .map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
172
173 Ok::<(), Error>(())
174 })
175 .await
176 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))??;
177
178 self.cache.remove(episode_id).await;
180
181 info!("Deleted episode from cache: {}", episode_id);
182 Ok(())
183 }
184}