do_memory_storage_turso/storage/
recommendations.rs1use crate::TursoStorage;
4use do_memory_core::memory::attribution::{
5 RecommendationFeedback, RecommendationSession, RecommendationStats,
6};
7use do_memory_core::{Error, Result};
8use libsql::params;
9use tracing::debug;
10use uuid::Uuid;
11
12impl TursoStorage {
13 pub async fn store_recommendation_session(
14 &self,
15 session: &RecommendationSession,
16 ) -> Result<()> {
17 const SQL: &str = r#"
18 INSERT INTO recommendation_sessions (session_id, episode_id, timestamp, payload)
19 VALUES (?1, ?2, ?3, ?4)
20 ON CONFLICT(session_id) DO UPDATE SET
21 episode_id = excluded.episode_id,
22 timestamp = excluded.timestamp,
23 payload = excluded.payload
24 "#;
25
26 let payload = serde_json::to_string(session)
27 .map_err(|e| {
28 Error::Storage(format!("Failed to serialize recommendation session: {}", e))
29 })?
30 .into_boxed_str();
31 let (conn, _conn_id) = self.get_connection_with_id().await?;
32 let stmt = self
33 .prepared_cache
34 .get_or_prepare(&conn, SQL)
35 .await
36 .map_err(|e| Error::Storage(format!("Failed to prepare session insert: {}", e)))?;
37
38 stmt.execute(params![
39 session.session_id.to_string(),
40 session.episode_id.to_string(),
41 session.timestamp.timestamp(),
42 payload,
43 ])
44 .await
45 .map_err(|e| Error::Storage(format!("Failed to persist recommendation session: {}", e)))?;
46
47 debug!(session_id = %session.session_id, "Stored recommendation session");
48 Ok(())
49 }
50
51 pub async fn get_recommendation_session(
52 &self,
53 session_id: Uuid,
54 ) -> Result<Option<RecommendationSession>> {
55 let (conn, _conn_id) = self.get_connection_with_id().await?;
56 let mut rows = conn
57 .query(
58 "SELECT payload FROM recommendation_sessions WHERE session_id = ?",
59 params![session_id.to_string()],
60 )
61 .await
62 .map_err(|e| {
63 Error::Storage(format!("Failed to query recommendation session: {}", e))
64 })?;
65
66 if let Some(row) = rows
67 .next()
68 .await
69 .map_err(|e| Error::Storage(format!("Failed to read session row: {}", e)))?
70 {
71 let payload: String = row
72 .get(0)
73 .map_err(|e| Error::Storage(format!("Failed to read session payload: {}", e)))?;
74 let session = serde_json::from_str(&payload).map_err(|e| {
75 Error::Storage(format!(
76 "Failed to deserialize recommendation session: {}",
77 e
78 ))
79 })?;
80 Ok(Some(session))
81 } else {
82 Ok(None)
83 }
84 }
85
86 pub async fn get_recommendation_session_for_episode(
87 &self,
88 episode_id: Uuid,
89 ) -> Result<Option<RecommendationSession>> {
90 let (conn, _conn_id) = self.get_connection_with_id().await?;
91 let mut rows = conn
92 .query(
93 "SELECT payload FROM recommendation_sessions WHERE episode_id = ? ORDER BY timestamp DESC LIMIT 1",
94 params![episode_id.to_string()],
95 )
96 .await
97 .map_err(|e| {
98 Error::Storage(format!("Failed to query recommendation session: {}", e))
99 })?;
100
101 if let Some(row) = rows
102 .next()
103 .await
104 .map_err(|e| Error::Storage(format!("Failed to read session row: {}", e)))?
105 {
106 let payload: String = row
107 .get(0)
108 .map_err(|e| Error::Storage(format!("Failed to read session payload: {}", e)))?;
109 let session = serde_json::from_str(&payload).map_err(|e| {
110 Error::Storage(format!(
111 "Failed to deserialize recommendation session: {}",
112 e
113 ))
114 })?;
115 Ok(Some(session))
116 } else {
117 Ok(None)
118 }
119 }
120
121 pub async fn store_recommendation_feedback(
122 &self,
123 feedback: &RecommendationFeedback,
124 ) -> Result<()> {
125 const SQL: &str = r#"
126 INSERT INTO recommendation_feedback (session_id, payload)
127 VALUES (?1, ?2)
128 ON CONFLICT(session_id) DO UPDATE SET payload = excluded.payload
129 "#;
130
131 let payload = serde_json::to_string(feedback)
132 .map_err(|e| {
133 Error::Storage(format!(
134 "Failed to serialize recommendation feedback: {}",
135 e
136 ))
137 })?
138 .into_boxed_str();
139 let (conn, _conn_id) = self.get_connection_with_id().await?;
140 let stmt = self
141 .prepared_cache
142 .get_or_prepare(&conn, SQL)
143 .await
144 .map_err(|e| Error::Storage(format!("Failed to prepare feedback insert: {}", e)))?;
145
146 stmt.execute(params![feedback.session_id.to_string(), payload])
147 .await
148 .map_err(|e| {
149 Error::Storage(format!("Failed to persist recommendation feedback: {}", e))
150 })?;
151
152 debug!(session_id = %feedback.session_id, "Stored recommendation feedback");
153 Ok(())
154 }
155
156 pub async fn get_recommendation_feedback(
157 &self,
158 session_id: Uuid,
159 ) -> Result<Option<RecommendationFeedback>> {
160 let (conn, _conn_id) = self.get_connection_with_id().await?;
161 let mut rows = conn
162 .query(
163 "SELECT payload FROM recommendation_feedback WHERE session_id = ?",
164 params![session_id.to_string()],
165 )
166 .await
167 .map_err(|e| {
168 Error::Storage(format!("Failed to query recommendation feedback: {}", e))
169 })?;
170
171 if let Some(row) = rows
172 .next()
173 .await
174 .map_err(|e| Error::Storage(format!("Failed to read feedback row: {}", e)))?
175 {
176 let payload: String = row
177 .get(0)
178 .map_err(|e| Error::Storage(format!("Failed to read feedback payload: {}", e)))?;
179 let feedback = serde_json::from_str(&payload).map_err(|e| {
180 Error::Storage(format!(
181 "Failed to deserialize recommendation feedback: {}",
182 e
183 ))
184 })?;
185 Ok(Some(feedback))
186 } else {
187 Ok(None)
188 }
189 }
190
191 pub async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
192 let (conn, _conn_id) = self.get_connection_with_id().await?;
193
194 let mut rows = conn
196 .query("SELECT payload FROM recommendation_sessions", params![])
197 .await
198 .map_err(|e| {
199 Error::Storage(format!("Failed to query recommendation sessions: {}", e))
200 })?;
201
202 let mut stats = RecommendationStats::default();
203 let mut total_recommended = 0usize;
204
205 while let Some(row) = rows
206 .next()
207 .await
208 .map_err(|e| Error::Storage(format!("Failed to read session row: {}", e)))?
209 {
210 let payload: String = row
211 .get(0)
212 .map_err(|e| Error::Storage(format!("Failed to read session payload: {}", e)))?;
213 let session: RecommendationSession = serde_json::from_str(&payload).map_err(|e| {
214 Error::Storage(format!(
215 "Failed to deserialize recommendation session: {}",
216 e
217 ))
218 })?;
219
220 stats.total_sessions += 1;
221 total_recommended += session.recommended_pattern_ids.len();
222 }
223
224 let mut feedback_rows = conn
226 .query("SELECT payload FROM recommendation_feedback", params![])
227 .await
228 .map_err(|e| {
229 Error::Storage(format!("Failed to query recommendation feedback: {}", e))
230 })?;
231
232 let mut total_applied = 0usize;
233 let mut successful_applications = 0usize;
234 let mut total_ratings = 0f32;
235 let mut rating_count = 0usize;
236
237 while let Some(row) = feedback_rows
238 .next()
239 .await
240 .map_err(|e| Error::Storage(format!("Failed to read feedback row: {}", e)))?
241 {
242 let payload: String = row
243 .get(0)
244 .map_err(|e| Error::Storage(format!("Failed to read feedback payload: {}", e)))?;
245 let feedback: RecommendationFeedback = serde_json::from_str(&payload).map_err(|e| {
246 Error::Storage(format!(
247 "Failed to deserialize recommendation feedback: {}",
248 e
249 ))
250 })?;
251
252 stats.total_feedback += 1;
253 total_applied += feedback.applied_pattern_ids.len();
254
255 if matches!(
256 feedback.outcome,
257 do_memory_core::types::TaskOutcome::Success { .. }
258 | do_memory_core::types::TaskOutcome::PartialSuccess { .. }
259 ) {
260 successful_applications += feedback.applied_pattern_ids.len();
261 }
262
263 if let Some(rating) = feedback.agent_rating {
264 total_ratings += rating;
265 rating_count += 1;
266 }
267 }
268
269 stats.patterns_applied = total_applied;
270 stats.patterns_ignored = total_recommended.saturating_sub(total_applied);
271 stats.successful_applications = successful_applications;
272
273 stats.adoption_rate = if total_recommended > 0 {
274 total_applied as f32 / total_recommended as f32
275 } else {
276 0.0
277 };
278
279 stats.success_after_adoption_rate = if total_applied > 0 {
280 successful_applications as f32 / total_applied as f32
281 } else {
282 0.0
283 };
284
285 stats.avg_agent_rating = if rating_count > 0 {
286 Some(total_ratings / rating_count as f32)
287 } else {
288 None
289 };
290
291 Ok(stats)
292 }
293}