Skip to main content

zeph_memory/store/
preferences.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_common::text::truncate_to_bytes_ref;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use super::SqliteStore;
9use crate::error::MemoryError;
10
11#[derive(Debug, Clone)]
12pub struct LearnedPreferenceRow {
13    pub id: i64,
14    pub preference_key: String,
15    pub preference_value: String,
16    pub confidence: f64,
17    pub evidence_count: i64,
18    pub updated_at: String,
19}
20
21type PreferenceTuple = (i64, String, String, f64, i64, String);
22
23fn row_from_tuple(t: PreferenceTuple) -> LearnedPreferenceRow {
24    LearnedPreferenceRow {
25        id: t.0,
26        preference_key: t.1,
27        preference_value: t.2,
28        confidence: t.3,
29        evidence_count: t.4,
30        updated_at: t.5,
31    }
32}
33
34impl SqliteStore {
35    /// Insert or update a learned preference.
36    ///
37    /// When a key already exists, the value and metadata are updated and
38    /// `updated_at` is refreshed. `evidence_count` is set to the provided
39    /// value (caller is responsible for accumulation logic).
40    ///
41    /// Keys longer than 128 bytes or values longer than 256 bytes are silently
42    /// truncated at a UTF-8 character boundary before storage.
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if the query fails.
47    pub async fn upsert_learned_preference(
48        &self,
49        key: &str,
50        value: &str,
51        confidence: f64,
52        evidence_count: i64,
53    ) -> Result<(), MemoryError> {
54        const MAX_KEY_BYTES: usize = 128;
55        const MAX_VALUE_BYTES: usize = 256;
56        let key_trunc = truncate_to_bytes_ref(key, MAX_KEY_BYTES);
57        let value_trunc = truncate_to_bytes_ref(value, MAX_VALUE_BYTES);
58        if key_trunc.len() < key.len() {
59            tracing::warn!(
60                original_len = key.len(),
61                "learned_preferences: key truncated to 128 bytes"
62            );
63        }
64        if value_trunc.len() < value.len() {
65            tracing::warn!(
66                original_len = value.len(),
67                "learned_preferences: value truncated to 256 bytes"
68            );
69        }
70        zeph_db::query(sql!(
71            "INSERT INTO learned_preferences \
72             (preference_key, preference_value, confidence, evidence_count, updated_at) \
73             VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) \
74             ON CONFLICT(preference_key) DO UPDATE SET \
75               preference_value = excluded.preference_value, \
76               confidence = excluded.confidence, \
77               evidence_count = excluded.evidence_count, \
78               updated_at = CURRENT_TIMESTAMP"
79        ))
80        .bind(key_trunc)
81        .bind(value_trunc)
82        .bind(confidence)
83        .bind(evidence_count)
84        .execute(&self.pool)
85        .await?;
86        Ok(())
87    }
88
89    /// Load all learned preferences, ordered by confidence descending.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the query fails.
94    pub async fn load_learned_preferences(&self) -> Result<Vec<LearnedPreferenceRow>, MemoryError> {
95        let rows: Vec<PreferenceTuple> = zeph_db::query_as(sql!(
96            "SELECT id, preference_key, preference_value, confidence, evidence_count, updated_at \
97             FROM learned_preferences \
98             ORDER BY confidence DESC"
99        ))
100        .fetch_all(&self.pool)
101        .await?;
102        Ok(rows.into_iter().map(row_from_tuple).collect())
103    }
104
105    /// Load corrections with `id > after_id`, ordered by id ascending.
106    ///
107    /// Used by the learning engine to process only new corrections since the
108    /// last analysis run (watermark-based incremental scan).
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the query fails.
113    pub async fn load_corrections_after(
114        &self,
115        after_id: i64,
116        limit: u32,
117    ) -> Result<Vec<super::corrections::UserCorrectionRow>, MemoryError> {
118        use super::corrections::UserCorrectionRow;
119
120        type Tuple = (
121            i64,
122            Option<i64>,
123            String,
124            String,
125            Option<String>,
126            String,
127            String,
128        );
129
130        let rows: Vec<Tuple> = zeph_db::query_as(sql!(
131            "SELECT id, session_id, original_output, correction_text, \
132             skill_name, correction_kind, created_at \
133             FROM user_corrections \
134             WHERE id > ? \
135             ORDER BY id ASC LIMIT ?"
136        ))
137        .bind(after_id)
138        .bind(limit)
139        .fetch_all(&self.pool)
140        .await?;
141
142        Ok(rows
143            .into_iter()
144            .map(|t| UserCorrectionRow {
145                id: t.0,
146                session_id: t.1,
147                original_output: t.2,
148                correction_text: t.3,
149                skill_name: t.4,
150                correction_kind: t.5,
151                created_at: t.6,
152            })
153            .collect())
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160
161    async fn store() -> SqliteStore {
162        SqliteStore::new(":memory:").await.unwrap()
163    }
164
165    #[tokio::test]
166    async fn upsert_and_load() {
167        let s = store().await;
168        s.upsert_learned_preference("verbosity", "concise", 0.9, 5)
169            .await
170            .unwrap();
171        let rows = s.load_learned_preferences().await.unwrap();
172        assert_eq!(rows.len(), 1);
173        assert_eq!(rows[0].preference_key, "verbosity");
174        assert_eq!(rows[0].preference_value, "concise");
175        assert!((rows[0].confidence - 0.9).abs() < 1e-9);
176        assert_eq!(rows[0].evidence_count, 5);
177    }
178
179    #[tokio::test]
180    async fn upsert_updates_existing() {
181        let s = store().await;
182        s.upsert_learned_preference("verbosity", "concise", 0.8, 3)
183            .await
184            .unwrap();
185        s.upsert_learned_preference("verbosity", "verbose", 0.95, 8)
186            .await
187            .unwrap();
188        let rows = s.load_learned_preferences().await.unwrap();
189        assert_eq!(rows.len(), 1);
190        assert_eq!(rows[0].preference_value, "verbose");
191        assert!((rows[0].confidence - 0.95).abs() < 1e-9);
192        assert_eq!(rows[0].evidence_count, 8);
193    }
194
195    #[tokio::test]
196    async fn load_ordered_by_confidence() {
197        let s = store().await;
198        s.upsert_learned_preference("format_preference", "bullet points", 0.75, 3)
199            .await
200            .unwrap();
201        s.upsert_learned_preference("verbosity", "concise", 0.9, 5)
202            .await
203            .unwrap();
204        let rows = s.load_learned_preferences().await.unwrap();
205        assert_eq!(rows[0].preference_key, "verbosity");
206        assert_eq!(rows[1].preference_key, "format_preference");
207    }
208
209    #[tokio::test]
210    async fn load_corrections_after_watermark() {
211        let s = store().await;
212        // Insert two corrections
213        s.store_user_correction(None, "output", "be brief", None, "explicit_rejection")
214            .await
215            .unwrap();
216        let id2 = s
217            .store_user_correction(None, "output2", "use bullets", None, "alternative_request")
218            .await
219            .unwrap();
220        // Watermark at id2-1 => only id2 returned
221        let rows = s.load_corrections_after(id2 - 1, 10).await.unwrap();
222        assert_eq!(rows.len(), 1);
223        assert_eq!(rows[0].correction_text, "use bullets");
224    }
225}