Skip to main content

do_memory_storage_turso/storage/
recommendations.rs

1//! Recommendation attribution storage operations (ADR-044)
2
3use crate::TursoStorage;
4use do_memory_core::memory::attribution::{
5    RecommendationFeedback, RecommendationSession, RecommendationStats,
6};
7use do_memory_core::{Error, Result};
8use libsql::params;
9use tracing::debug;
10use uuid::Uuid;
11
12impl TursoStorage {
13    pub async fn store_recommendation_session(
14        &self,
15        session: &RecommendationSession,
16    ) -> Result<()> {
17        const SQL: &str = r#"
18            INSERT INTO recommendation_sessions (session_id, episode_id, timestamp, payload)
19            VALUES (?1, ?2, ?3, ?4)
20            ON CONFLICT(session_id) DO UPDATE SET
21                episode_id = excluded.episode_id,
22                timestamp = excluded.timestamp,
23                payload = excluded.payload
24        "#;
25
26        let payload = serde_json::to_string(session)
27            .map_err(|e| {
28                Error::Storage(format!("Failed to serialize recommendation session: {}", e))
29            })?
30            .into_boxed_str();
31        let (conn, _conn_id) = self.get_connection_with_id().await?;
32        let stmt = self
33            .prepared_cache
34            .get_or_prepare(&conn, SQL)
35            .await
36            .map_err(|e| Error::Storage(format!("Failed to prepare session insert: {}", e)))?;
37
38        stmt.execute(params![
39            session.session_id.to_string(),
40            session.episode_id.to_string(),
41            session.timestamp.timestamp(),
42            payload,
43        ])
44        .await
45        .map_err(|e| Error::Storage(format!("Failed to persist recommendation session: {}", e)))?;
46
47        debug!(session_id = %session.session_id, "Stored recommendation session");
48        Ok(())
49    }
50
51    pub async fn get_recommendation_session(
52        &self,
53        session_id: Uuid,
54    ) -> Result<Option<RecommendationSession>> {
55        let (conn, _conn_id) = self.get_connection_with_id().await?;
56        let mut rows = conn
57            .query(
58                "SELECT payload FROM recommendation_sessions WHERE session_id = ?",
59                params![session_id.to_string()],
60            )
61            .await
62            .map_err(|e| {
63                Error::Storage(format!("Failed to query recommendation session: {}", e))
64            })?;
65
66        if let Some(row) = rows
67            .next()
68            .await
69            .map_err(|e| Error::Storage(format!("Failed to read session row: {}", e)))?
70        {
71            let payload: String = row
72                .get(0)
73                .map_err(|e| Error::Storage(format!("Failed to read session payload: {}", e)))?;
74            let session = serde_json::from_str(&payload).map_err(|e| {
75                Error::Storage(format!(
76                    "Failed to deserialize recommendation session: {}",
77                    e
78                ))
79            })?;
80            Ok(Some(session))
81        } else {
82            Ok(None)
83        }
84    }
85
86    pub async fn get_recommendation_session_for_episode(
87        &self,
88        episode_id: Uuid,
89    ) -> Result<Option<RecommendationSession>> {
90        let (conn, _conn_id) = self.get_connection_with_id().await?;
91        let mut rows = conn
92            .query(
93                "SELECT payload FROM recommendation_sessions WHERE episode_id = ? ORDER BY timestamp DESC LIMIT 1",
94                params![episode_id.to_string()],
95            )
96            .await
97            .map_err(|e| {
98                Error::Storage(format!("Failed to query recommendation session: {}", e))
99            })?;
100
101        if let Some(row) = rows
102            .next()
103            .await
104            .map_err(|e| Error::Storage(format!("Failed to read session row: {}", e)))?
105        {
106            let payload: String = row
107                .get(0)
108                .map_err(|e| Error::Storage(format!("Failed to read session payload: {}", e)))?;
109            let session = serde_json::from_str(&payload).map_err(|e| {
110                Error::Storage(format!(
111                    "Failed to deserialize recommendation session: {}",
112                    e
113                ))
114            })?;
115            Ok(Some(session))
116        } else {
117            Ok(None)
118        }
119    }
120
121    pub async fn store_recommendation_feedback(
122        &self,
123        feedback: &RecommendationFeedback,
124    ) -> Result<()> {
125        const SQL: &str = r#"
126            INSERT INTO recommendation_feedback (session_id, payload)
127            VALUES (?1, ?2)
128            ON CONFLICT(session_id) DO UPDATE SET payload = excluded.payload
129        "#;
130
131        let payload = serde_json::to_string(feedback)
132            .map_err(|e| {
133                Error::Storage(format!(
134                    "Failed to serialize recommendation feedback: {}",
135                    e
136                ))
137            })?
138            .into_boxed_str();
139        let (conn, _conn_id) = self.get_connection_with_id().await?;
140        let stmt = self
141            .prepared_cache
142            .get_or_prepare(&conn, SQL)
143            .await
144            .map_err(|e| Error::Storage(format!("Failed to prepare feedback insert: {}", e)))?;
145
146        stmt.execute(params![feedback.session_id.to_string(), payload])
147            .await
148            .map_err(|e| {
149                Error::Storage(format!("Failed to persist recommendation feedback: {}", e))
150            })?;
151
152        debug!(session_id = %feedback.session_id, "Stored recommendation feedback");
153        Ok(())
154    }
155
156    pub async fn get_recommendation_feedback(
157        &self,
158        session_id: Uuid,
159    ) -> Result<Option<RecommendationFeedback>> {
160        let (conn, _conn_id) = self.get_connection_with_id().await?;
161        let mut rows = conn
162            .query(
163                "SELECT payload FROM recommendation_feedback WHERE session_id = ?",
164                params![session_id.to_string()],
165            )
166            .await
167            .map_err(|e| {
168                Error::Storage(format!("Failed to query recommendation feedback: {}", e))
169            })?;
170
171        if let Some(row) = rows
172            .next()
173            .await
174            .map_err(|e| Error::Storage(format!("Failed to read feedback row: {}", e)))?
175        {
176            let payload: String = row
177                .get(0)
178                .map_err(|e| Error::Storage(format!("Failed to read feedback payload: {}", e)))?;
179            let feedback = serde_json::from_str(&payload).map_err(|e| {
180                Error::Storage(format!(
181                    "Failed to deserialize recommendation feedback: {}",
182                    e
183                ))
184            })?;
185            Ok(Some(feedback))
186        } else {
187            Ok(None)
188        }
189    }
190
191    pub async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
192        let (conn, _conn_id) = self.get_connection_with_id().await?;
193
194        // Gather sessions
195        let mut rows = conn
196            .query("SELECT payload FROM recommendation_sessions", params![])
197            .await
198            .map_err(|e| {
199                Error::Storage(format!("Failed to query recommendation sessions: {}", e))
200            })?;
201
202        let mut stats = RecommendationStats::default();
203        let mut total_recommended = 0usize;
204
205        while let Some(row) = rows
206            .next()
207            .await
208            .map_err(|e| Error::Storage(format!("Failed to read session row: {}", e)))?
209        {
210            let payload: String = row
211                .get(0)
212                .map_err(|e| Error::Storage(format!("Failed to read session payload: {}", e)))?;
213            let session: RecommendationSession = serde_json::from_str(&payload).map_err(|e| {
214                Error::Storage(format!(
215                    "Failed to deserialize recommendation session: {}",
216                    e
217                ))
218            })?;
219
220            stats.total_sessions += 1;
221            total_recommended += session.recommended_pattern_ids.len();
222        }
223
224        // Gather feedback
225        let mut feedback_rows = conn
226            .query("SELECT payload FROM recommendation_feedback", params![])
227            .await
228            .map_err(|e| {
229                Error::Storage(format!("Failed to query recommendation feedback: {}", e))
230            })?;
231
232        let mut total_applied = 0usize;
233        let mut successful_applications = 0usize;
234        let mut total_ratings = 0f32;
235        let mut rating_count = 0usize;
236
237        while let Some(row) = feedback_rows
238            .next()
239            .await
240            .map_err(|e| Error::Storage(format!("Failed to read feedback row: {}", e)))?
241        {
242            let payload: String = row
243                .get(0)
244                .map_err(|e| Error::Storage(format!("Failed to read feedback payload: {}", e)))?;
245            let feedback: RecommendationFeedback = serde_json::from_str(&payload).map_err(|e| {
246                Error::Storage(format!(
247                    "Failed to deserialize recommendation feedback: {}",
248                    e
249                ))
250            })?;
251
252            stats.total_feedback += 1;
253            total_applied += feedback.applied_pattern_ids.len();
254
255            if matches!(
256                feedback.outcome,
257                do_memory_core::types::TaskOutcome::Success { .. }
258                    | do_memory_core::types::TaskOutcome::PartialSuccess { .. }
259            ) {
260                successful_applications += feedback.applied_pattern_ids.len();
261            }
262
263            if let Some(rating) = feedback.agent_rating {
264                total_ratings += rating;
265                rating_count += 1;
266            }
267        }
268
269        stats.patterns_applied = total_applied;
270        stats.patterns_ignored = total_recommended.saturating_sub(total_applied);
271        stats.successful_applications = successful_applications;
272
273        stats.adoption_rate = if total_recommended > 0 {
274            total_applied as f32 / total_recommended as f32
275        } else {
276            0.0
277        };
278
279        stats.success_after_adoption_rate = if total_applied > 0 {
280            successful_applications as f32 / total_applied as f32
281        } else {
282            0.0
283        };
284
285        stats.avg_agent_rating = if rating_count > 0 {
286            Some(total_ratings / rating_count as f32)
287        } else {
288            None
289        };
290
291        Ok(stats)
292    }
293}