Skip to main content

zeph_memory/store/
mem_scenes.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `SQLite` operations for `MemScene` consolidation (#2332).
5
6use crate::scenes::MemScene;
7use crate::types::{MemSceneId, MessageId};
8#[allow(unused_imports)]
9use zeph_db::sql;
10
11use zeph_db::ActiveDialect;
12
13use crate::error::MemoryError;
14
15use super::SqliteStore;
16
17impl SqliteStore {
18    /// Fetch semantic-tier messages not yet assigned to any scene.
19    ///
20    /// Returns `(message_id, content)` pairs ordered by `id` ASC.
21    ///
22    /// # Errors
23    ///
24    /// Returns an error if the `SQLite` query fails.
25    pub async fn find_unscened_semantic_messages(
26        &self,
27        limit: usize,
28    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
29        let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
30        let rows: Vec<(i64, String)> = zeph_db::query_as(
31            r"
32            SELECT m.id, m.content
33            FROM messages m
34            WHERE m.tier = 'semantic'
35              AND m.deleted_at IS NULL
36              AND m.id NOT IN (SELECT message_id FROM mem_scene_members)
37            ORDER BY m.id ASC
38            LIMIT ?
39            ",
40        )
41        .bind(limit_i64)
42        .fetch_all(&self.pool)
43        .await?;
44
45        Ok(rows
46            .into_iter()
47            .map(|(id, content)| (MessageId(id), content))
48            .collect())
49    }
50
51    /// Insert a new `MemScene` and link member messages to it.
52    ///
53    /// Returns the new scene's ID.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the `SQLite` insert fails.
58    pub async fn insert_mem_scene(
59        &self,
60        label: &str,
61        profile: &str,
62        member_ids: &[MessageId],
63    ) -> Result<MemSceneId, MemoryError> {
64        let member_count = i64::try_from(member_ids.len()).unwrap_or(0);
65        let mut tx = self.pool.begin().await?;
66
67        let row: (i64,) = zeph_db::query_as(sql!(
68            "INSERT INTO mem_scenes (label, profile, member_count) VALUES (?, ?, ?) RETURNING id"
69        ))
70        .bind(label)
71        .bind(profile)
72        .bind(member_count)
73        .fetch_one(&mut *tx)
74        .await?;
75        let scene_id = row.0;
76
77        let member_sql = format!(
78            "{} INTO mem_scene_members (scene_id, message_id) VALUES (?, ?){}",
79            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
80            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
81        );
82        for &msg_id in member_ids {
83            zeph_db::query(&member_sql)
84                .bind(scene_id)
85                .bind(msg_id.0)
86                .execute(&mut *tx)
87                .await?;
88        }
89
90        tx.commit().await?;
91        Ok(MemSceneId(scene_id))
92    }
93
94    /// List all `MemScenes` ordered by creation time descending.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the `SQLite` query fails.
99    pub async fn list_mem_scenes(&self) -> Result<Vec<MemScene>, MemoryError> {
100        let rows: Vec<(i64, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
101            "SELECT id, label, profile, member_count, created_at, updated_at \
102             FROM mem_scenes ORDER BY created_at DESC"
103        ))
104        .fetch_all(&self.pool)
105        .await?;
106
107        Ok(rows
108            .into_iter()
109            .map(
110                |(id, label, profile, member_count, created_at, updated_at)| MemScene {
111                    id: MemSceneId(id),
112                    label,
113                    profile,
114                    member_count: u32::try_from(member_count).unwrap_or(0),
115                    created_at,
116                    updated_at,
117                },
118            )
119            .collect())
120    }
121
122    /// Fetch member message IDs for a given scene.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if the `SQLite` query fails.
127    pub async fn scene_member_ids(
128        &self,
129        scene_id: MemSceneId,
130    ) -> Result<Vec<MessageId>, MemoryError> {
131        let rows: Vec<(i64,)> = zeph_db::query_as(sql!(
132            "SELECT message_id FROM mem_scene_members WHERE scene_id = ?"
133        ))
134        .bind(scene_id.0)
135        .fetch_all(&self.pool)
136        .await?;
137
138        Ok(rows.into_iter().map(|(id,)| MessageId(id)).collect())
139    }
140
141    /// Delete all `MemScenes` and their memberships (reset for re-clustering).
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if the `SQLite` delete fails.
146    pub async fn reset_mem_scenes(&self) -> Result<u64, MemoryError> {
147        let result = zeph_db::query(sql!("DELETE FROM mem_scenes"))
148            .execute(&self.pool)
149            .await?;
150        Ok(result.rows_affected())
151    }
152}
153
154#[cfg(test)]
155mod tests {
156    use super::*;
157
158    async fn make_store() -> SqliteStore {
159        // pool_size=1 is required: SQLite :memory: creates an isolated database per
160        // connection, so a multi-connection pool would split inserts across empty DBs,
161        // causing FK violations when querying across connections.
162        SqliteStore::with_pool_size(":memory:", 1).await.unwrap()
163    }
164
165    /// Create N real messages in the DB and return their IDs.
166    async fn seed_messages(store: &SqliteStore, n: usize) -> Vec<MessageId> {
167        let cid = store.create_conversation().await.unwrap();
168        let mut ids = Vec::with_capacity(n);
169        for i in 0..n {
170            let id = store
171                .save_message(cid, "user", &format!("msg {i}"))
172                .await
173                .unwrap();
174            ids.push(id);
175        }
176        ids
177    }
178
179    // Test: insert_mem_scene creates a scene and links member messages.
180    #[tokio::test]
181    async fn insert_and_list_scene() {
182        let store = make_store().await;
183        let ids = seed_messages(&store, 2).await;
184
185        let scene_id = store
186            .insert_mem_scene("Rust Auth", "JWT tokens used for RS256.", &ids)
187            .await
188            .unwrap();
189        assert!(scene_id.0 > 0, "scene id must be positive");
190
191        let scenes = store.list_mem_scenes().await.unwrap();
192        assert_eq!(scenes.len(), 1);
193        assert_eq!(scenes[0].label, "Rust Auth");
194        assert_eq!(scenes[0].member_count, 2);
195    }
196
197    // Test: scene_member_ids returns linked message IDs (scene member expansion on demand).
198    #[tokio::test]
199    async fn scene_member_ids_expansion() {
200        let store = make_store().await;
201        let ids = seed_messages(&store, 3).await;
202
203        let scene_id = store
204            .insert_mem_scene("Topic A", "Profile text.", &ids)
205            .await
206            .unwrap();
207
208        let members = store.scene_member_ids(scene_id).await.unwrap();
209        assert_eq!(members.len(), 3);
210        for id in &ids {
211            assert!(members.contains(id), "member {id} must be in expansion");
212        }
213    }
214
215    // Test: find_unscened_semantic_messages excludes already-assigned messages.
216    #[tokio::test]
217    async fn find_unscened_excludes_assigned_members() {
218        let store = make_store().await;
219        let ids = seed_messages(&store, 3).await;
220
221        // Promote all to semantic tier.
222        for id in &ids {
223            zeph_db::query(sql!("UPDATE messages SET tier = 'semantic' WHERE id = ?"))
224                .bind(id.0)
225                .execute(store.pool())
226                .await
227                .unwrap();
228        }
229
230        // All three should appear as unscened.
231        let unscened = store.find_unscened_semantic_messages(100).await.unwrap();
232        assert_eq!(unscened.len(), 3);
233
234        // Assign first two to a scene.
235        store
236            .insert_mem_scene("Partial Scene", "Some profile", &ids[..2])
237            .await
238            .unwrap();
239
240        // Now only the third should be unscened.
241        let unscened_after = store.find_unscened_semantic_messages(100).await.unwrap();
242        assert_eq!(unscened_after.len(), 1);
243        assert_eq!(unscened_after[0].0, ids[2]);
244    }
245
246    // Test: reset_mem_scenes clears all scenes and allows re-clustering.
247    #[tokio::test]
248    async fn reset_scenes_clears_all() {
249        let store = make_store().await;
250        let ids1 = seed_messages(&store, 1).await;
251        let ids2 = seed_messages(&store, 1).await;
252
253        store
254            .insert_mem_scene("Scene 1", "Profile 1", &ids1)
255            .await
256            .unwrap();
257        store
258            .insert_mem_scene("Scene 2", "Profile 2", &ids2)
259            .await
260            .unwrap();
261
262        let deleted = store.reset_mem_scenes().await.unwrap();
263        assert_eq!(deleted, 2);
264
265        let scenes = store.list_mem_scenes().await.unwrap();
266        assert!(scenes.is_empty());
267    }
268
269    // Test: list_mem_scenes returns newest first (by id DESC as proxy for created_at DESC).
270    // unixepoch() has 1-second resolution — use explicit created_at override via INSERT to
271    // guarantee ordering even when both inserts happen within the same second.
272    #[tokio::test]
273    async fn list_scenes_ordered_newest_first() {
274        let store = make_store().await;
275        let ids1 = seed_messages(&store, 1).await;
276        let ids2 = seed_messages(&store, 1).await;
277
278        // Insert directly with distinct created_at values to avoid single-second collision.
279        zeph_db::query(
280            sql!("INSERT INTO mem_scenes (label, profile, member_count, created_at, updated_at) VALUES (?, ?, ?, ?, ?)"),
281        )
282        .bind("First")
283        .bind("Profile first")
284        .bind(1i64)
285        .bind(1_000_000i64)
286        .bind(1_000_000i64)
287        .execute(store.pool())
288        .await
289        .unwrap();
290        let scene1_id: (i64,) = zeph_db::query_as(sql!("SELECT last_insert_rowid()"))
291            .fetch_one(store.pool())
292            .await
293            .unwrap();
294
295        zeph_db::query(
296            sql!("INSERT INTO mem_scenes (label, profile, member_count, created_at, updated_at) VALUES (?, ?, ?, ?, ?)"),
297        )
298        .bind("Second")
299        .bind("Profile second")
300        .bind(1i64)
301        .bind(2_000_000i64)
302        .bind(2_000_000i64)
303        .execute(store.pool())
304        .await
305        .unwrap();
306        let scene2_id: (i64,) = zeph_db::query_as(sql!("SELECT last_insert_rowid()"))
307            .fetch_one(store.pool())
308            .await
309            .unwrap();
310
311        // Link messages to satisfy FK.
312        zeph_db::query(sql!(
313            "INSERT INTO mem_scene_members (scene_id, message_id) VALUES (?, ?)"
314        ))
315        .bind(scene1_id.0)
316        .bind(ids1[0].0)
317        .execute(store.pool())
318        .await
319        .unwrap();
320        zeph_db::query(sql!(
321            "INSERT INTO mem_scene_members (scene_id, message_id) VALUES (?, ?)"
322        ))
323        .bind(scene2_id.0)
324        .bind(ids2[0].0)
325        .execute(store.pool())
326        .await
327        .unwrap();
328
329        let scenes = store.list_mem_scenes().await.unwrap();
330        // "Second" has created_at=2_000_000 > "First" created_at=1_000_000 → Second comes first.
331        assert_eq!(scenes.len(), 2);
332        assert_eq!(scenes[0].label, "Second");
333        assert_eq!(scenes[1].label, "First");
334    }
335}