1use crate::scenes::MemScene;
7use crate::types::{MemSceneId, MessageId};
8#[allow(unused_imports)]
9use zeph_db::sql;
10
11use zeph_db::ActiveDialect;
12
13use crate::error::MemoryError;
14
15use super::SqliteStore;
16
17impl SqliteStore {
18 pub async fn find_unscened_semantic_messages(
26 &self,
27 limit: usize,
28 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
29 let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
30 let rows: Vec<(i64, String)> = zeph_db::query_as(
31 r"
32 SELECT m.id, m.content
33 FROM messages m
34 WHERE m.tier = 'semantic'
35 AND m.deleted_at IS NULL
36 AND m.id NOT IN (SELECT message_id FROM mem_scene_members)
37 ORDER BY m.id ASC
38 LIMIT ?
39 ",
40 )
41 .bind(limit_i64)
42 .fetch_all(&self.pool)
43 .await?;
44
45 Ok(rows
46 .into_iter()
47 .map(|(id, content)| (MessageId(id), content))
48 .collect())
49 }
50
51 pub async fn insert_mem_scene(
59 &self,
60 label: &str,
61 profile: &str,
62 member_ids: &[MessageId],
63 ) -> Result<MemSceneId, MemoryError> {
64 let member_count = i64::try_from(member_ids.len()).unwrap_or(0);
65 let mut tx = self.pool.begin().await?;
66
67 let row: (i64,) = zeph_db::query_as(sql!(
68 "INSERT INTO mem_scenes (label, profile, member_count) VALUES (?, ?, ?) RETURNING id"
69 ))
70 .bind(label)
71 .bind(profile)
72 .bind(member_count)
73 .fetch_one(&mut *tx)
74 .await?;
75 let scene_id = row.0;
76
77 let member_sql = format!(
78 "{} INTO mem_scene_members (scene_id, message_id) VALUES (?, ?){}",
79 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
80 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
81 );
82 for &msg_id in member_ids {
83 zeph_db::query(&member_sql)
84 .bind(scene_id)
85 .bind(msg_id.0)
86 .execute(&mut *tx)
87 .await?;
88 }
89
90 tx.commit().await?;
91 Ok(MemSceneId(scene_id))
92 }
93
94 pub async fn list_mem_scenes(&self) -> Result<Vec<MemScene>, MemoryError> {
100 let rows: Vec<(i64, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
101 "SELECT id, label, profile, member_count, created_at, updated_at \
102 FROM mem_scenes ORDER BY created_at DESC"
103 ))
104 .fetch_all(&self.pool)
105 .await?;
106
107 Ok(rows
108 .into_iter()
109 .map(
110 |(id, label, profile, member_count, created_at, updated_at)| MemScene {
111 id: MemSceneId(id),
112 label,
113 profile,
114 member_count: u32::try_from(member_count).unwrap_or(0),
115 created_at,
116 updated_at,
117 },
118 )
119 .collect())
120 }
121
122 pub async fn scene_member_ids(
128 &self,
129 scene_id: MemSceneId,
130 ) -> Result<Vec<MessageId>, MemoryError> {
131 let rows: Vec<(i64,)> = zeph_db::query_as(sql!(
132 "SELECT message_id FROM mem_scene_members WHERE scene_id = ?"
133 ))
134 .bind(scene_id.0)
135 .fetch_all(&self.pool)
136 .await?;
137
138 Ok(rows.into_iter().map(|(id,)| MessageId(id)).collect())
139 }
140
141 pub async fn reset_mem_scenes(&self) -> Result<u64, MemoryError> {
147 let result = zeph_db::query(sql!("DELETE FROM mem_scenes"))
148 .execute(&self.pool)
149 .await?;
150 Ok(result.rows_affected())
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens an in-memory store backed by a pool with a single connection.
    async fn make_store() -> SqliteStore {
        SqliteStore::with_pool_size(":memory:", 1).await.unwrap()
    }

    /// Creates a fresh conversation and saves `n` user messages in it, returning their ids.
    async fn seed_messages(store: &SqliteStore, n: usize) -> Vec<MessageId> {
        let cid = store.create_conversation().await.unwrap();
        let mut ids = Vec::with_capacity(n);
        for i in 0..n {
            let id = store
                .save_message(cid, "user", &format!("msg {i}"))
                .await
                .unwrap();
            ids.push(id);
        }
        ids
    }

    /// Inserts a scene row directly, using `timestamp` for both `created_at` and `updated_at`
    /// and a `member_count` of 1, and returns the new row id.
    ///
    /// The id is taken from `RETURNING id` on the insert itself, so it does not depend on a
    /// follow-up query being served by the same pooled connection.
    async fn insert_scene_at(
        store: &SqliteStore,
        label: &str,
        profile: &str,
        timestamp: i64,
    ) -> i64 {
        let (scene_id,): (i64,) = zeph_db::query_as(sql!(
            "INSERT INTO mem_scenes (label, profile, member_count, created_at, updated_at) \
             VALUES (?, ?, ?, ?, ?) RETURNING id"
        ))
        .bind(label)
        .bind(profile)
        .bind(1i64)
        .bind(timestamp)
        .bind(timestamp)
        .fetch_one(store.pool())
        .await
        .unwrap();
        scene_id
    }

    /// Links `message_id` to `scene_id` with a plain insert into `mem_scene_members`.
    async fn link_member(store: &SqliteStore, scene_id: i64, message_id: MessageId) {
        zeph_db::query(sql!(
            "INSERT INTO mem_scene_members (scene_id, message_id) VALUES (?, ?)"
        ))
        .bind(scene_id)
        .bind(message_id.0)
        .execute(store.pool())
        .await
        .unwrap();
    }

    /// A freshly inserted scene shows up in the listing with its label and member count.
    #[tokio::test]
    async fn insert_and_list_scene() {
        let store = make_store().await;
        let ids = seed_messages(&store, 2).await;

        let scene_id = store
            .insert_mem_scene("Rust Auth", "JWT tokens used for RS256.", &ids)
            .await
            .unwrap();
        assert!(scene_id.0 > 0, "scene id must be positive");

        let scenes = store.list_mem_scenes().await.unwrap();
        assert_eq!(scenes.len(), 1);
        assert_eq!(scenes[0].label, "Rust Auth");
        assert_eq!(scenes[0].member_count, 2);
    }

    /// `scene_member_ids` returns exactly the messages the scene was created with.
    #[tokio::test]
    async fn scene_member_ids_expansion() {
        let store = make_store().await;
        let ids = seed_messages(&store, 3).await;

        let scene_id = store
            .insert_mem_scene("Topic A", "Profile text.", &ids)
            .await
            .unwrap();

        let members = store.scene_member_ids(scene_id).await.unwrap();
        assert_eq!(members.len(), 3);
        for id in &ids {
            assert!(members.contains(id), "member {id} must be in expansion");
        }
    }

    /// Messages assigned to a scene no longer appear among the unscened semantic messages.
    #[tokio::test]
    async fn find_unscened_excludes_assigned_members() {
        let store = make_store().await;
        let ids = seed_messages(&store, 3).await;

        // Move every seeded message into the semantic tier so the query considers it.
        for id in &ids {
            zeph_db::query(sql!("UPDATE messages SET tier = 'semantic' WHERE id = ?"))
                .bind(id.0)
                .execute(store.pool())
                .await
                .unwrap();
        }

        // Before any scene exists, all three messages are unscened.
        let unscened = store.find_unscened_semantic_messages(100).await.unwrap();
        assert_eq!(unscened.len(), 3);

        // Assign only the first two messages to a scene.
        store
            .insert_mem_scene("Partial Scene", "Some profile", &ids[..2])
            .await
            .unwrap();

        // Only the third message remains unscened.
        let unscened_after = store.find_unscened_semantic_messages(100).await.unwrap();
        assert_eq!(unscened_after.len(), 1);
        assert_eq!(unscened_after[0].0, ids[2]);
    }

    /// `reset_mem_scenes` reports the number of deleted scenes and leaves the listing empty.
    #[tokio::test]
    async fn reset_scenes_clears_all() {
        let store = make_store().await;
        let ids1 = seed_messages(&store, 1).await;
        let ids2 = seed_messages(&store, 1).await;

        store
            .insert_mem_scene("Scene 1", "Profile 1", &ids1)
            .await
            .unwrap();
        store
            .insert_mem_scene("Scene 2", "Profile 2", &ids2)
            .await
            .unwrap();

        let deleted = store.reset_mem_scenes().await.unwrap();
        assert_eq!(deleted, 2);

        let scenes = store.list_mem_scenes().await.unwrap();
        assert!(scenes.is_empty());
    }

    /// Scenes are listed by descending `created_at`.
    ///
    /// The rows are inserted directly with explicit, distinct timestamps so the expected
    /// order is fixed by the test data itself.
    #[tokio::test]
    async fn list_scenes_ordered_newest_first() {
        let store = make_store().await;
        let ids1 = seed_messages(&store, 1).await;
        let ids2 = seed_messages(&store, 1).await;

        let scene1_id = insert_scene_at(&store, "First", "Profile first", 1_000_000).await;
        let scene2_id = insert_scene_at(&store, "Second", "Profile second", 2_000_000).await;

        link_member(&store, scene1_id, ids1[0]).await;
        link_member(&store, scene2_id, ids2[0]).await;

        let scenes = store.list_mem_scenes().await.unwrap();
        assert_eq!(scenes.len(), 2);
        // The scene with the larger `created_at` must come first.
        assert_eq!(scenes[0].label, "Second");
        assert_eq!(scenes[1].label, "First");
    }
}