do_memory_storage_redb/
recommendations.rs1use crate::{
4 RECOMMENDATION_EPISODE_INDEX_TABLE, RECOMMENDATION_FEEDBACK_TABLE,
5 RECOMMENDATION_SESSIONS_TABLE, RedbStorage, with_db_timeout,
6};
7use do_memory_core::memory::attribution::{
8 RecommendationFeedback, RecommendationSession, RecommendationStats,
9};
10use do_memory_core::{Error, Result};
11use redb::{ReadableDatabase, ReadableTable, TableDefinition};
12use serde::de::DeserializeOwned;
13use std::sync::Arc;
14use uuid::Uuid;
15
16impl RedbStorage {
17 pub async fn store_recommendation_session(
18 &self,
19 session: &RecommendationSession,
20 ) -> Result<()> {
21 let db = Arc::clone(&self.db);
22 let session_bytes = postcard::to_allocvec(session).map_err(|e| {
23 Error::Storage(format!("Failed to serialize recommendation session: {}", e))
24 })?;
25 let session_key = session.session_id.to_string();
26 let episode_key = session.episode_id.to_string();
27
28 with_db_timeout(move || {
29 let write_txn = db
30 .begin_write()
31 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
32
33 {
34 let mut session_table = write_txn
35 .open_table(RECOMMENDATION_SESSIONS_TABLE)
36 .map_err(|e| {
37 Error::Storage(format!(
38 "Failed to open recommendation sessions table: {}",
39 e
40 ))
41 })?;
42 session_table
43 .insert(session_key.as_str(), session_bytes.as_slice())
44 .map_err(|e| {
45 Error::Storage(format!("Failed to insert recommendation session: {}", e))
46 })?;
47 }
48
49 {
50 let mut episode_index = write_txn
51 .open_table(RECOMMENDATION_EPISODE_INDEX_TABLE)
52 .map_err(|e| {
53 Error::Storage(format!(
54 "Failed to open recommendation episode index: {}",
55 e
56 ))
57 })?;
58 episode_index
59 .insert(episode_key.as_str(), session_key.as_str())
60 .map_err(|e| {
61 Error::Storage(format!(
62 "Failed to update recommendation episode index: {}",
63 e
64 ))
65 })?;
66 }
67
68 write_txn.commit().map_err(|e| {
69 Error::Storage(format!(
70 "Failed to commit recommendation session transaction: {}",
71 e
72 ))
73 })?;
74 Ok(())
75 })
76 .await
77 }
78
79 pub async fn get_recommendation_session(
80 &self,
81 session_id: Uuid,
82 ) -> Result<Option<RecommendationSession>> {
83 self.read_postcard_entry(RECOMMENDATION_SESSIONS_TABLE, session_id.to_string())
84 .await
85 }
86
87 pub async fn get_recommendation_session_for_episode(
88 &self,
89 episode_id: Uuid,
90 ) -> Result<Option<RecommendationSession>> {
91 let db = Arc::clone(&self.db);
92 let episode_key = episode_id.to_string();
93
94 let session_id = with_db_timeout(move || {
95 let read_txn = db
96 .begin_read()
97 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
98 let table = read_txn
99 .open_table(RECOMMENDATION_EPISODE_INDEX_TABLE)
100 .map_err(|e| {
101 Error::Storage(format!(
102 "Failed to open recommendation episode index: {}",
103 e
104 ))
105 })?;
106 let entry = table
107 .get(episode_key.as_str())
108 .map_err(|e| Error::Storage(format!("Failed to read episode index: {}", e)))?;
109 let value = entry.map(|v| v.value().to_string());
110 Ok(value)
111 })
112 .await?;
113
114 if let Some(session_id) = session_id {
115 self.read_postcard_entry(RECOMMENDATION_SESSIONS_TABLE, session_id)
116 .await
117 } else {
118 Ok(None)
119 }
120 }
121
122 pub async fn store_recommendation_feedback(
123 &self,
124 feedback: &RecommendationFeedback,
125 ) -> Result<()> {
126 let db = Arc::clone(&self.db);
127 let feedback_bytes = postcard::to_allocvec(feedback).map_err(|e| {
128 Error::Storage(format!(
129 "Failed to serialize recommendation feedback: {}",
130 e
131 ))
132 })?;
133 let session_key = feedback.session_id.to_string();
134
135 with_db_timeout(move || {
136 let write_txn = db
137 .begin_write()
138 .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
139 {
140 let mut table = write_txn
141 .open_table(RECOMMENDATION_FEEDBACK_TABLE)
142 .map_err(|e| {
143 Error::Storage(format!(
144 "Failed to open recommendation feedback table: {}",
145 e
146 ))
147 })?;
148 table
149 .insert(session_key.as_str(), feedback_bytes.as_slice())
150 .map_err(|e| {
151 Error::Storage(format!("Failed to insert recommendation feedback: {}", e))
152 })?;
153 }
154 write_txn.commit().map_err(|e| {
155 Error::Storage(format!(
156 "Failed to commit recommendation feedback transaction: {}",
157 e
158 ))
159 })?;
160 Ok(())
161 })
162 .await
163 }
164
165 pub async fn get_recommendation_feedback(
166 &self,
167 session_id: Uuid,
168 ) -> Result<Option<RecommendationFeedback>> {
169 self.read_postcard_entry(RECOMMENDATION_FEEDBACK_TABLE, session_id.to_string())
170 .await
171 }
172
173 pub async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
174 let sessions = self
175 .collect_postcard_entries::<RecommendationSession>(RECOMMENDATION_SESSIONS_TABLE)
176 .await?;
177 let feedback = self
178 .collect_postcard_entries::<RecommendationFeedback>(RECOMMENDATION_FEEDBACK_TABLE)
179 .await?;
180
181 let mut stats = RecommendationStats {
182 total_sessions: sessions.len(),
183 total_feedback: feedback.len(),
184 ..RecommendationStats::default()
185 };
186
187 let total_recommended: usize = sessions
188 .iter()
189 .map(|session| session.recommended_pattern_ids.len())
190 .sum();
191 let mut total_applied = 0usize;
192 let mut successful_applications = 0usize;
193 let mut total_ratings = 0f32;
194 let mut rating_count = 0usize;
195
196 for fb in &feedback {
197 total_applied += fb.applied_pattern_ids.len();
198 if matches!(
199 fb.outcome,
200 do_memory_core::types::TaskOutcome::Success { .. }
201 | do_memory_core::types::TaskOutcome::PartialSuccess { .. }
202 ) {
203 successful_applications += fb.applied_pattern_ids.len();
204 }
205 if let Some(rating) = fb.agent_rating {
206 total_ratings += rating;
207 rating_count += 1;
208 }
209 }
210
211 stats.patterns_applied = total_applied;
212 stats.patterns_ignored = total_recommended.saturating_sub(total_applied);
213 stats.successful_applications = successful_applications;
214 stats.adoption_rate = if total_recommended > 0 {
215 total_applied as f32 / total_recommended as f32
216 } else {
217 0.0
218 };
219 stats.success_after_adoption_rate = if total_applied > 0 {
220 successful_applications as f32 / total_applied as f32
221 } else {
222 0.0
223 };
224 stats.avg_agent_rating = if rating_count > 0 {
225 Some(total_ratings / rating_count as f32)
226 } else {
227 None
228 };
229
230 Ok(stats)
231 }
232
233 async fn read_postcard_entry<T>(
234 &self,
235 table: TableDefinition<'static, &'static str, &'static [u8]>,
236 key: String,
237 ) -> Result<Option<T>>
238 where
239 T: DeserializeOwned + Send + 'static,
240 {
241 let db = Arc::clone(&self.db);
242
243 with_db_timeout(move || {
244 let read_txn = db
245 .begin_read()
246 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
247 let table = read_txn
248 .open_table(table)
249 .map_err(|e| Error::Storage(format!("Failed to open table: {}", e)))?;
250 let guard = table
251 .get(key.as_str())
252 .map_err(|e| Error::Storage(format!("Failed to read value: {}", e)))?;
253 if let Some(value) = guard {
254 let entry = postcard::from_bytes(value.value())
255 .map_err(|e| Error::Storage(format!("Failed to deserialize entry: {}", e)))?;
256 Ok(Some(entry))
257 } else {
258 Ok(None)
259 }
260 })
261 .await
262 }
263
264 async fn collect_postcard_entries<T>(
265 &self,
266 table: TableDefinition<'static, &'static str, &'static [u8]>,
267 ) -> Result<Vec<T>>
268 where
269 T: DeserializeOwned + Send + 'static,
270 {
271 let db = Arc::clone(&self.db);
272
273 with_db_timeout(move || {
274 let read_txn = db
275 .begin_read()
276 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
277 let table = read_txn
278 .open_table(table)
279 .map_err(|e| Error::Storage(format!("Failed to open table: {}", e)))?;
280
281 let mut entries = Vec::new();
282 let iter = table
283 .iter()
284 .map_err(|e| Error::Storage(format!("Failed to iterate table entries: {}", e)))?;
285 for result in iter {
286 let (_key, value) = result
287 .map_err(|e| Error::Storage(format!("Failed to read table entry: {}", e)))?;
288 let entry = postcard::from_bytes(value.value())
289 .map_err(|e| Error::Storage(format!("Failed to deserialize entry: {}", e)))?;
290 entries.push(entry);
291 }
292
293 Ok(entries)
294 })
295 .await
296 }
297}