Skip to main content

kernex_memory/store/
outcomes.rs

//! Reward-based learning: raw outcomes (working memory) and distilled lessons (long-term memory).
//!
//! Every function takes a `project` argument for project-scoped isolation.
//! The empty string `""` denotes the general scope (no project).
5
6use super::Store;
7use kernex_core::error::KernexError;
8
9impl Store {
10    /// Store a raw outcome from a REWARD marker.
11    pub async fn store_outcome(
12        &self,
13        sender_id: &str,
14        domain: &str,
15        score: i32,
16        lesson: &str,
17        source: &str,
18        project: &str,
19    ) -> Result<(), KernexError> {
20        let id = uuid::Uuid::new_v4().to_string();
21        sqlx::query(
22            "INSERT INTO outcomes (id, sender_id, domain, score, lesson, source, project) \
23             VALUES (?, ?, ?, ?, ?, ?, ?)",
24        )
25        .bind(&id)
26        .bind(sender_id)
27        .bind(domain)
28        .bind(score)
29        .bind(lesson)
30        .bind(source)
31        .bind(project)
32        .execute(&self.pool)
33        .await
34        .map_err(|e| KernexError::Store(format!("store outcome: {e}")))?;
35        Ok(())
36    }
37
38    /// Get recent outcomes for a sender.
39    ///
40    /// When `project` is Some, returns only outcomes for that project.
41    /// When `project` is None, returns all outcomes.
42    pub async fn get_recent_outcomes(
43        &self,
44        sender_id: &str,
45        limit: i64,
46        project: Option<&str>,
47    ) -> Result<Vec<(i32, String, String, String)>, KernexError> {
48        let rows: Vec<(i32, String, String, String)> = match project {
49            Some(p) => {
50                sqlx::query_as(
51                    "SELECT score, domain, lesson, timestamp FROM outcomes \
52                     WHERE sender_id = ? AND project = ? ORDER BY timestamp DESC LIMIT ?",
53                )
54                .bind(sender_id)
55                .bind(p)
56                .bind(limit)
57                .fetch_all(&self.pool)
58                .await
59            }
60            None => {
61                sqlx::query_as(
62                    "SELECT score, domain, lesson, timestamp FROM outcomes \
63                     WHERE sender_id = ? ORDER BY timestamp DESC LIMIT ?",
64                )
65                .bind(sender_id)
66                .bind(limit)
67                .fetch_all(&self.pool)
68                .await
69            }
70        }
71        .map_err(|e| KernexError::Store(format!("get recent outcomes: {e}")))?;
72        Ok(rows)
73    }
74
75    /// Get recent outcomes across all users.
76    pub async fn get_all_recent_outcomes(
77        &self,
78        hours: i64,
79        limit: i64,
80        project: Option<&str>,
81    ) -> Result<Vec<(i32, String, String, String)>, KernexError> {
82        let rows: Vec<(i32, String, String, String)> = match project {
83            Some(p) => {
84                sqlx::query_as(
85                    "SELECT score, domain, lesson, timestamp FROM outcomes \
86                     WHERE datetime(timestamp) >= datetime('now', ? || ' hours') \
87                     AND project = ? \
88                     ORDER BY timestamp DESC LIMIT ?",
89                )
90                .bind(-hours)
91                .bind(p)
92                .bind(limit)
93                .fetch_all(&self.pool)
94                .await
95            }
96            None => {
97                sqlx::query_as(
98                    "SELECT score, domain, lesson, timestamp FROM outcomes \
99                     WHERE datetime(timestamp) >= datetime('now', ? || ' hours') \
100                     ORDER BY timestamp DESC LIMIT ?",
101                )
102                .bind(-hours)
103                .bind(limit)
104                .fetch_all(&self.pool)
105                .await
106            }
107        }
108        .map_err(|e| KernexError::Store(format!("get all recent outcomes: {e}")))?;
109        Ok(rows)
110    }
111
112    /// Store a distilled lesson with content-based deduplication.
113    ///
114    /// Multiple lessons can exist per (sender_id, domain, project). If the exact
115    /// same rule text already exists, its `occurrences` counter is bumped instead
116    /// of creating a duplicate. After insertion, a cap of 10 lessons per
117    /// (sender_id, domain, project) is enforced — oldest are pruned.
118    pub async fn store_lesson(
119        &self,
120        sender_id: &str,
121        domain: &str,
122        rule: &str,
123        project: &str,
124    ) -> Result<(), KernexError> {
125        let existing: Option<(String,)> = sqlx::query_as(
126            "SELECT id FROM lessons \
127             WHERE sender_id = ? AND domain = ? AND project = ? AND rule = ?",
128        )
129        .bind(sender_id)
130        .bind(domain)
131        .bind(project)
132        .bind(rule)
133        .fetch_optional(&self.pool)
134        .await
135        .map_err(|e| KernexError::Store(format!("store lesson check: {e}")))?;
136
137        if let Some((id,)) = existing {
138            sqlx::query(
139                "UPDATE lessons SET occurrences = occurrences + 1, \
140                 updated_at = datetime('now') WHERE id = ?",
141            )
142            .bind(&id)
143            .execute(&self.pool)
144            .await
145            .map_err(|e| KernexError::Store(format!("store lesson reinforce: {e}")))?;
146        } else {
147            let id = uuid::Uuid::new_v4().to_string();
148            sqlx::query(
149                "INSERT INTO lessons (id, sender_id, domain, rule, project) \
150                 VALUES (?, ?, ?, ?, ?)",
151            )
152            .bind(&id)
153            .bind(sender_id)
154            .bind(domain)
155            .bind(rule)
156            .bind(project)
157            .execute(&self.pool)
158            .await
159            .map_err(|e| KernexError::Store(format!("store lesson insert: {e}")))?;
160
161            // Cap enforcement: keep at most 10 per (sender, domain, project).
162            sqlx::query(
163                "DELETE FROM lessons WHERE id IN ( \
164                     SELECT id FROM lessons \
165                     WHERE sender_id = ? AND domain = ? AND project = ? \
166                     ORDER BY updated_at DESC, rowid DESC LIMIT -1 OFFSET 10 \
167                 )",
168            )
169            .bind(sender_id)
170            .bind(domain)
171            .bind(project)
172            .execute(&self.pool)
173            .await
174            .map_err(|e| KernexError::Store(format!("store lesson cap: {e}")))?;
175        }
176
177        Ok(())
178    }
179
180    /// Get lessons for a sender.
181    ///
182    /// When `project` is Some, returns project-specific lessons first, then general.
183    /// When `project` is None, returns general lessons only (project = '').
184    pub async fn get_lessons(
185        &self,
186        sender_id: &str,
187        project: Option<&str>,
188    ) -> Result<Vec<(String, String, String)>, KernexError> {
189        let rows: Vec<(String, String, String)> = match project {
190            Some(p) => {
191                sqlx::query_as(
192                    "SELECT domain, rule, project FROM lessons \
193                     WHERE sender_id = ? AND (project = ? OR project = '') \
194                     ORDER BY CASE WHEN project = ? THEN 0 ELSE 1 END, updated_at DESC \
195                     LIMIT 50",
196                )
197                .bind(sender_id)
198                .bind(p)
199                .bind(p)
200                .fetch_all(&self.pool)
201                .await
202            }
203            None => {
204                sqlx::query_as(
205                    "SELECT domain, rule, project FROM lessons \
206                     WHERE sender_id = ? AND project = '' ORDER BY updated_at DESC \
207                     LIMIT 50",
208                )
209                .bind(sender_id)
210                .fetch_all(&self.pool)
211                .await
212            }
213        }
214        .map_err(|e| KernexError::Store(format!("get lessons: {e}")))?;
215        Ok(rows)
216    }
217
218    /// Get all lessons across all users.
219    pub async fn get_all_lessons(
220        &self,
221        project: Option<&str>,
222    ) -> Result<Vec<(String, String, String)>, KernexError> {
223        let rows: Vec<(String, String, String)> =
224            match project {
225                Some(p) => {
226                    sqlx::query_as(
227                        "SELECT domain, rule, project FROM lessons \
228                     WHERE project = ? OR project = '' \
229                     ORDER BY CASE WHEN project = ? THEN 0 ELSE 1 END, updated_at DESC \
230                     LIMIT 50",
231                    )
232                    .bind(p)
233                    .bind(p)
234                    .fetch_all(&self.pool)
235                    .await
236                }
237                None => sqlx::query_as(
238                    "SELECT domain, rule, project FROM lessons ORDER BY updated_at DESC LIMIT 50",
239                )
240                .fetch_all(&self.pool)
241                .await,
242            }
243            .map_err(|e| KernexError::Store(format!("get all lessons: {e}")))?;
244        Ok(rows)
245    }
246}