Skip to main content

do_memory_storage_redb/
recommendations.rs

//! Recommendation attribution storage for the redb cache.
2
3use crate::{
4    RECOMMENDATION_EPISODE_INDEX_TABLE, RECOMMENDATION_FEEDBACK_TABLE,
5    RECOMMENDATION_SESSIONS_TABLE, RedbStorage, with_db_timeout,
6};
7use do_memory_core::memory::attribution::{
8    RecommendationFeedback, RecommendationSession, RecommendationStats,
9};
10use do_memory_core::{Error, Result};
11use redb::{ReadableDatabase, ReadableTable, TableDefinition};
12use serde::de::DeserializeOwned;
13use std::sync::Arc;
14use uuid::Uuid;
15
16impl RedbStorage {
17    pub async fn store_recommendation_session(
18        &self,
19        session: &RecommendationSession,
20    ) -> Result<()> {
21        let db = Arc::clone(&self.db);
22        let session_bytes = postcard::to_allocvec(session).map_err(|e| {
23            Error::Storage(format!("Failed to serialize recommendation session: {}", e))
24        })?;
25        let session_key = session.session_id.to_string();
26        let episode_key = session.episode_id.to_string();
27
28        with_db_timeout(move || {
29            let write_txn = db
30                .begin_write()
31                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
32
33            {
34                let mut session_table = write_txn
35                    .open_table(RECOMMENDATION_SESSIONS_TABLE)
36                    .map_err(|e| {
37                        Error::Storage(format!(
38                            "Failed to open recommendation sessions table: {}",
39                            e
40                        ))
41                    })?;
42                session_table
43                    .insert(session_key.as_str(), session_bytes.as_slice())
44                    .map_err(|e| {
45                        Error::Storage(format!("Failed to insert recommendation session: {}", e))
46                    })?;
47            }
48
49            {
50                let mut episode_index = write_txn
51                    .open_table(RECOMMENDATION_EPISODE_INDEX_TABLE)
52                    .map_err(|e| {
53                        Error::Storage(format!(
54                            "Failed to open recommendation episode index: {}",
55                            e
56                        ))
57                    })?;
58                episode_index
59                    .insert(episode_key.as_str(), session_key.as_str())
60                    .map_err(|e| {
61                        Error::Storage(format!(
62                            "Failed to update recommendation episode index: {}",
63                            e
64                        ))
65                    })?;
66            }
67
68            write_txn.commit().map_err(|e| {
69                Error::Storage(format!(
70                    "Failed to commit recommendation session transaction: {}",
71                    e
72                ))
73            })?;
74            Ok(())
75        })
76        .await
77    }
78
79    pub async fn get_recommendation_session(
80        &self,
81        session_id: Uuid,
82    ) -> Result<Option<RecommendationSession>> {
83        self.read_postcard_entry(RECOMMENDATION_SESSIONS_TABLE, session_id.to_string())
84            .await
85    }
86
87    pub async fn get_recommendation_session_for_episode(
88        &self,
89        episode_id: Uuid,
90    ) -> Result<Option<RecommendationSession>> {
91        let db = Arc::clone(&self.db);
92        let episode_key = episode_id.to_string();
93
94        let session_id = with_db_timeout(move || {
95            let read_txn = db
96                .begin_read()
97                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
98            let table = read_txn
99                .open_table(RECOMMENDATION_EPISODE_INDEX_TABLE)
100                .map_err(|e| {
101                    Error::Storage(format!(
102                        "Failed to open recommendation episode index: {}",
103                        e
104                    ))
105                })?;
106            let entry = table
107                .get(episode_key.as_str())
108                .map_err(|e| Error::Storage(format!("Failed to read episode index: {}", e)))?;
109            let value = entry.map(|v| v.value().to_string());
110            Ok(value)
111        })
112        .await?;
113
114        if let Some(session_id) = session_id {
115            self.read_postcard_entry(RECOMMENDATION_SESSIONS_TABLE, session_id)
116                .await
117        } else {
118            Ok(None)
119        }
120    }
121
122    pub async fn store_recommendation_feedback(
123        &self,
124        feedback: &RecommendationFeedback,
125    ) -> Result<()> {
126        let db = Arc::clone(&self.db);
127        let feedback_bytes = postcard::to_allocvec(feedback).map_err(|e| {
128            Error::Storage(format!(
129                "Failed to serialize recommendation feedback: {}",
130                e
131            ))
132        })?;
133        let session_key = feedback.session_id.to_string();
134
135        with_db_timeout(move || {
136            let write_txn = db
137                .begin_write()
138                .map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
139            {
140                let mut table = write_txn
141                    .open_table(RECOMMENDATION_FEEDBACK_TABLE)
142                    .map_err(|e| {
143                        Error::Storage(format!(
144                            "Failed to open recommendation feedback table: {}",
145                            e
146                        ))
147                    })?;
148                table
149                    .insert(session_key.as_str(), feedback_bytes.as_slice())
150                    .map_err(|e| {
151                        Error::Storage(format!("Failed to insert recommendation feedback: {}", e))
152                    })?;
153            }
154            write_txn.commit().map_err(|e| {
155                Error::Storage(format!(
156                    "Failed to commit recommendation feedback transaction: {}",
157                    e
158                ))
159            })?;
160            Ok(())
161        })
162        .await
163    }
164
165    pub async fn get_recommendation_feedback(
166        &self,
167        session_id: Uuid,
168    ) -> Result<Option<RecommendationFeedback>> {
169        self.read_postcard_entry(RECOMMENDATION_FEEDBACK_TABLE, session_id.to_string())
170            .await
171    }
172
173    pub async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
174        let sessions = self
175            .collect_postcard_entries::<RecommendationSession>(RECOMMENDATION_SESSIONS_TABLE)
176            .await?;
177        let feedback = self
178            .collect_postcard_entries::<RecommendationFeedback>(RECOMMENDATION_FEEDBACK_TABLE)
179            .await?;
180
181        let mut stats = RecommendationStats {
182            total_sessions: sessions.len(),
183            total_feedback: feedback.len(),
184            ..RecommendationStats::default()
185        };
186
187        let total_recommended: usize = sessions
188            .iter()
189            .map(|session| session.recommended_pattern_ids.len())
190            .sum();
191        let mut total_applied = 0usize;
192        let mut successful_applications = 0usize;
193        let mut total_ratings = 0f32;
194        let mut rating_count = 0usize;
195
196        for fb in &feedback {
197            total_applied += fb.applied_pattern_ids.len();
198            if matches!(
199                fb.outcome,
200                do_memory_core::types::TaskOutcome::Success { .. }
201                    | do_memory_core::types::TaskOutcome::PartialSuccess { .. }
202            ) {
203                successful_applications += fb.applied_pattern_ids.len();
204            }
205            if let Some(rating) = fb.agent_rating {
206                total_ratings += rating;
207                rating_count += 1;
208            }
209        }
210
211        stats.patterns_applied = total_applied;
212        stats.patterns_ignored = total_recommended.saturating_sub(total_applied);
213        stats.successful_applications = successful_applications;
214        stats.adoption_rate = if total_recommended > 0 {
215            total_applied as f32 / total_recommended as f32
216        } else {
217            0.0
218        };
219        stats.success_after_adoption_rate = if total_applied > 0 {
220            successful_applications as f32 / total_applied as f32
221        } else {
222            0.0
223        };
224        stats.avg_agent_rating = if rating_count > 0 {
225            Some(total_ratings / rating_count as f32)
226        } else {
227            None
228        };
229
230        Ok(stats)
231    }
232
233    async fn read_postcard_entry<T>(
234        &self,
235        table: TableDefinition<'static, &'static str, &'static [u8]>,
236        key: String,
237    ) -> Result<Option<T>>
238    where
239        T: DeserializeOwned + Send + 'static,
240    {
241        let db = Arc::clone(&self.db);
242
243        with_db_timeout(move || {
244            let read_txn = db
245                .begin_read()
246                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
247            let table = read_txn
248                .open_table(table)
249                .map_err(|e| Error::Storage(format!("Failed to open table: {}", e)))?;
250            let guard = table
251                .get(key.as_str())
252                .map_err(|e| Error::Storage(format!("Failed to read value: {}", e)))?;
253            if let Some(value) = guard {
254                let entry = postcard::from_bytes(value.value())
255                    .map_err(|e| Error::Storage(format!("Failed to deserialize entry: {}", e)))?;
256                Ok(Some(entry))
257            } else {
258                Ok(None)
259            }
260        })
261        .await
262    }
263
264    async fn collect_postcard_entries<T>(
265        &self,
266        table: TableDefinition<'static, &'static str, &'static [u8]>,
267    ) -> Result<Vec<T>>
268    where
269        T: DeserializeOwned + Send + 'static,
270    {
271        let db = Arc::clone(&self.db);
272
273        with_db_timeout(move || {
274            let read_txn = db
275                .begin_read()
276                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
277            let table = read_txn
278                .open_table(table)
279                .map_err(|e| Error::Storage(format!("Failed to open table: {}", e)))?;
280
281            let mut entries = Vec::new();
282            let iter = table
283                .iter()
284                .map_err(|e| Error::Storage(format!("Failed to iterate table entries: {}", e)))?;
285            for result in iter {
286                let (_key, value) = result
287                    .map_err(|e| Error::Storage(format!("Failed to read table entry: {}", e)))?;
288                let entry = postcard::from_bytes(value.value())
289                    .map_err(|e| Error::Storage(format!("Failed to deserialize entry: {}", e)))?;
290                entries.push(entry);
291            }
292
293            Ok(entries)
294        })
295        .await
296    }
297}