1use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
/// Reduces free-form text to space-separated alphanumeric tokens.
///
/// Every character for which [`char::is_alphanumeric`] is false acts as a
/// separator; the remaining non-empty tokens are joined with a single space.
/// The result is bound as the `MATCH` argument by the keyword search methods.
///
/// Returns an empty string when the input contains no alphanumeric characters.
///
/// Note: purely alphabetic words such as `AND`, `OR` or `NOT` pass through
/// unchanged, so FTS5 may still interpret them as query operators.
pub(crate) fn sanitize_fts5_query(query: &str) -> String {
    let mut sanitized = String::with_capacity(query.len());
    for token in query.split(|ch: char| !ch.is_alphanumeric()) {
        if token.is_empty() {
            continue;
        }
        // Separate tokens with exactly one space, never leading or trailing.
        if !sanitized.is_empty() {
            sanitized.push(' ');
        }
        sanitized.push_str(token);
    }
    sanitized
}
26
27fn parse_role(s: &str) -> Role {
28 match s {
29 "assistant" => Role::Assistant,
30 "system" => Role::System,
31 _ => Role::User,
32 }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37 match role {
38 Role::System => "system",
39 Role::User => "user",
40 Role::Assistant => "assistant",
41 }
42}
43
44fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
49 if parts_json == "[]" {
50 return vec![];
51 }
52 match serde_json::from_str(parts_json) {
53 Ok(p) => p,
54 Err(e) => {
55 let truncated = parts_json.chars().take(120).collect::<String>();
56 tracing::warn!(
57 role = %role_str,
58 parts_json = %truncated,
59 error = %e,
60 "failed to deserialize message parts, falling back to empty"
61 );
62 vec![]
63 }
64 }
65}
66
67impl SqliteStore {
68 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
74 let row: (ConversationId,) =
75 sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
76 .fetch_one(&self.pool)
77 .await?;
78 Ok(row.0)
79 }
80
81 pub async fn save_message(
87 &self,
88 conversation_id: ConversationId,
89 role: &str,
90 content: &str,
91 ) -> Result<MessageId, MemoryError> {
92 self.save_message_with_parts(conversation_id, role, content, "[]")
93 .await
94 }
95
96 pub async fn save_message_with_parts(
102 &self,
103 conversation_id: ConversationId,
104 role: &str,
105 content: &str,
106 parts_json: &str,
107 ) -> Result<MessageId, MemoryError> {
108 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
109 .await
110 }
111
112 pub async fn save_message_with_metadata(
118 &self,
119 conversation_id: ConversationId,
120 role: &str,
121 content: &str,
122 parts_json: &str,
123 agent_visible: bool,
124 user_visible: bool,
125 ) -> Result<MessageId, MemoryError> {
126 let importance_score = crate::semantic::importance::compute_importance(content, role);
127 let row: (MessageId,) = sqlx::query_as(
128 "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
129 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
130 )
131 .bind(conversation_id)
132 .bind(role)
133 .bind(content)
134 .bind(parts_json)
135 .bind(i64::from(agent_visible))
136 .bind(i64::from(user_visible))
137 .bind(importance_score)
138 .fetch_one(&self.pool)
139 .await?;
140 Ok(row.0)
141 }
142
143 pub async fn load_history(
149 &self,
150 conversation_id: ConversationId,
151 limit: u32,
152 ) -> Result<Vec<Message>, MemoryError> {
153 let rows: Vec<(String, String, String, i64, i64, i64)> = sqlx::query_as(
154 "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
155 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
156 WHERE conversation_id = ? AND deleted_at IS NULL \
157 ORDER BY id DESC \
158 LIMIT ?\
159 ) ORDER BY id ASC",
160 )
161 .bind(conversation_id)
162 .bind(limit)
163 .fetch_all(&self.pool)
164 .await?;
165
166 let messages = rows
167 .into_iter()
168 .map(
169 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
170 let parts = parse_parts_json(&role_str, &parts_json);
171 Message {
172 role: parse_role(&role_str),
173 content,
174 parts,
175 metadata: MessageMetadata {
176 agent_visible: agent_visible != 0,
177 user_visible: user_visible != 0,
178 compacted_at: None,
179 deferred_summary: None,
180 focus_pinned: false,
181 focus_marker_id: None,
182 db_id: Some(row_id),
183 },
184 }
185 },
186 )
187 .collect();
188 Ok(messages)
189 }
190
191 pub async fn load_history_filtered(
199 &self,
200 conversation_id: ConversationId,
201 limit: u32,
202 agent_visible: Option<bool>,
203 user_visible: Option<bool>,
204 ) -> Result<Vec<Message>, MemoryError> {
205 let av = agent_visible.map(i64::from);
206 let uv = user_visible.map(i64::from);
207
208 let rows: Vec<(String, String, String, i64, i64, i64)> = sqlx::query_as(
209 "WITH recent AS (\
210 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
211 WHERE conversation_id = ? \
212 AND deleted_at IS NULL \
213 AND (? IS NULL OR agent_visible = ?) \
214 AND (? IS NULL OR user_visible = ?) \
215 ORDER BY id DESC \
216 LIMIT ?\
217 ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC",
218 )
219 .bind(conversation_id)
220 .bind(av)
221 .bind(av)
222 .bind(uv)
223 .bind(uv)
224 .bind(limit)
225 .fetch_all(&self.pool)
226 .await?;
227
228 let messages = rows
229 .into_iter()
230 .map(
231 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
232 let parts = parse_parts_json(&role_str, &parts_json);
233 Message {
234 role: parse_role(&role_str),
235 content,
236 parts,
237 metadata: MessageMetadata {
238 agent_visible: agent_visible != 0,
239 user_visible: user_visible != 0,
240 compacted_at: None,
241 deferred_summary: None,
242 focus_pinned: false,
243 focus_marker_id: None,
244 db_id: Some(row_id),
245 },
246 }
247 },
248 )
249 .collect();
250 Ok(messages)
251 }
252
253 pub async fn replace_conversation(
265 &self,
266 conversation_id: ConversationId,
267 compacted_range: std::ops::RangeInclusive<MessageId>,
268 summary_role: &str,
269 summary_content: &str,
270 ) -> Result<MessageId, MemoryError> {
271 let now = {
272 let secs = std::time::SystemTime::now()
273 .duration_since(std::time::UNIX_EPOCH)
274 .unwrap_or_default()
275 .as_secs();
276 format!("{secs}")
277 };
278 let start_id = compacted_range.start().0;
279 let end_id = compacted_range.end().0;
280
281 let mut tx = self.pool.begin().await?;
282
283 sqlx::query(
284 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
285 WHERE conversation_id = ? AND id >= ? AND id <= ?",
286 )
287 .bind(&now)
288 .bind(conversation_id)
289 .bind(start_id)
290 .bind(end_id)
291 .execute(&mut *tx)
292 .await?;
293
294 let row: (MessageId,) = sqlx::query_as(
296 "INSERT INTO messages \
297 (conversation_id, role, content, parts, agent_visible, user_visible) \
298 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
299 )
300 .bind(conversation_id)
301 .bind(summary_role)
302 .bind(summary_content)
303 .fetch_one(&mut *tx)
304 .await?;
305
306 tx.commit().await?;
307
308 Ok(row.0)
309 }
310
311 pub async fn apply_tool_pair_summaries(
321 &self,
322 conversation_id: ConversationId,
323 hide_ids: &[i64],
324 summaries: &[String],
325 ) -> Result<(), MemoryError> {
326 if hide_ids.is_empty() && summaries.is_empty() {
327 return Ok(());
328 }
329
330 let now = std::time::SystemTime::now()
331 .duration_since(std::time::UNIX_EPOCH)
332 .unwrap_or_default()
333 .as_secs()
334 .to_string();
335
336 let mut tx = self.pool.begin().await?;
337
338 for &id in hide_ids {
339 sqlx::query("UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?")
340 .bind(&now)
341 .bind(id)
342 .execute(&mut *tx)
343 .await?;
344 }
345
346 for summary in summaries {
347 let content = format!("[tool summary] {summary}");
348 let parts = serde_json::to_string(&[MessagePart::Summary {
349 text: summary.clone(),
350 }])
351 .unwrap_or_else(|_| "[]".to_string());
352 sqlx::query(
353 "INSERT INTO messages \
354 (conversation_id, role, content, parts, agent_visible, user_visible) \
355 VALUES (?, 'assistant', ?, ?, 1, 0)",
356 )
357 .bind(conversation_id)
358 .bind(&content)
359 .bind(&parts)
360 .execute(&mut *tx)
361 .await?;
362 }
363
364 tx.commit().await?;
365 Ok(())
366 }
367
368 pub async fn oldest_message_ids(
374 &self,
375 conversation_id: ConversationId,
376 n: u32,
377 ) -> Result<Vec<MessageId>, MemoryError> {
378 let rows: Vec<(MessageId,)> = sqlx::query_as(
379 "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
380 )
381 .bind(conversation_id)
382 .bind(n)
383 .fetch_all(&self.pool)
384 .await?;
385 Ok(rows.into_iter().map(|r| r.0).collect())
386 }
387
388 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
394 let row: Option<(ConversationId,)> =
395 sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
396 .fetch_optional(&self.pool)
397 .await?;
398 Ok(row.map(|r| r.0))
399 }
400
401 pub async fn message_by_id(
407 &self,
408 message_id: MessageId,
409 ) -> Result<Option<Message>, MemoryError> {
410 let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
411 "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
412 )
413 .bind(message_id)
414 .fetch_optional(&self.pool)
415 .await?;
416
417 Ok(row.map(
418 |(role_str, content, parts_json, agent_visible, user_visible)| {
419 let parts = parse_parts_json(&role_str, &parts_json);
420 Message {
421 role: parse_role(&role_str),
422 content,
423 parts,
424 metadata: MessageMetadata {
425 agent_visible: agent_visible != 0,
426 user_visible: user_visible != 0,
427 compacted_at: None,
428 deferred_summary: None,
429 focus_pinned: false,
430 focus_marker_id: None,
431 db_id: None,
432 },
433 }
434 },
435 ))
436 }
437
438 pub async fn messages_by_ids(
444 &self,
445 ids: &[MessageId],
446 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
447 if ids.is_empty() {
448 return Ok(Vec::new());
449 }
450
451 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
452
453 let query = format!(
454 "SELECT id, role, content, parts FROM messages \
455 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
456 );
457 let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
458 for &id in ids {
459 q = q.bind(id);
460 }
461
462 let rows = q.fetch_all(&self.pool).await?;
463
464 Ok(rows
465 .into_iter()
466 .map(|(id, role_str, content, parts_json)| {
467 let parts = parse_parts_json(&role_str, &parts_json);
468 (
469 id,
470 Message {
471 role: parse_role(&role_str),
472 content,
473 parts,
474 metadata: MessageMetadata::default(),
475 },
476 )
477 })
478 .collect())
479 }
480
481 pub async fn unembedded_message_ids(
487 &self,
488 limit: Option<usize>,
489 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
490 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
491
492 let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
493 "SELECT m.id, m.conversation_id, m.role, m.content \
494 FROM messages m \
495 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
496 WHERE em.id IS NULL AND m.deleted_at IS NULL \
497 ORDER BY m.id ASC \
498 LIMIT ?",
499 )
500 .bind(effective_limit)
501 .fetch_all(&self.pool)
502 .await?;
503
504 Ok(rows)
505 }
506
507 pub async fn count_messages(
513 &self,
514 conversation_id: ConversationId,
515 ) -> Result<i64, MemoryError> {
516 let row: (i64,) = sqlx::query_as(
517 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
518 )
519 .bind(conversation_id)
520 .fetch_one(&self.pool)
521 .await?;
522 Ok(row.0)
523 }
524
525 pub async fn count_messages_after(
531 &self,
532 conversation_id: ConversationId,
533 after_id: MessageId,
534 ) -> Result<i64, MemoryError> {
535 let row: (i64,) =
536 sqlx::query_as(
537 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
538 )
539 .bind(conversation_id)
540 .bind(after_id)
541 .fetch_one(&self.pool)
542 .await?;
543 Ok(row.0)
544 }
545
546 pub async fn keyword_search(
555 &self,
556 query: &str,
557 limit: usize,
558 conversation_id: Option<ConversationId>,
559 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
560 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
561 let safe_query = sanitize_fts5_query(query);
562 if safe_query.is_empty() {
563 return Ok(Vec::new());
564 }
565
566 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
567 sqlx::query_as(
568 "SELECT m.id, -rank AS score \
569 FROM messages_fts f \
570 JOIN messages m ON m.id = f.rowid \
571 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
572 ORDER BY rank \
573 LIMIT ?",
574 )
575 .bind(&safe_query)
576 .bind(cid)
577 .bind(effective_limit)
578 .fetch_all(&self.pool)
579 .await?
580 } else {
581 sqlx::query_as(
582 "SELECT m.id, -rank AS score \
583 FROM messages_fts f \
584 JOIN messages m ON m.id = f.rowid \
585 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
586 ORDER BY rank \
587 LIMIT ?",
588 )
589 .bind(&safe_query)
590 .bind(effective_limit)
591 .fetch_all(&self.pool)
592 .await?
593 };
594
595 Ok(rows)
596 }
597
598 pub async fn keyword_search_with_time_range(
611 &self,
612 query: &str,
613 limit: usize,
614 conversation_id: Option<ConversationId>,
615 after: Option<&str>,
616 before: Option<&str>,
617 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
618 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
619 let safe_query = sanitize_fts5_query(query);
620 if safe_query.is_empty() {
621 return Ok(Vec::new());
622 }
623
624 let after_clause = if after.is_some() {
626 " AND m.created_at > ?"
627 } else {
628 ""
629 };
630 let before_clause = if before.is_some() {
631 " AND m.created_at < ?"
632 } else {
633 ""
634 };
635 let conv_clause = if conversation_id.is_some() {
636 " AND m.conversation_id = ?"
637 } else {
638 ""
639 };
640
641 let sql = format!(
642 "SELECT m.id, -rank AS score \
643 FROM messages_fts f \
644 JOIN messages m ON m.id = f.rowid \
645 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
646 {after_clause}{before_clause}{conv_clause} \
647 ORDER BY rank \
648 LIMIT ?"
649 );
650
651 let mut q = sqlx::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
652 if let Some(a) = after {
653 q = q.bind(a);
654 }
655 if let Some(b) = before {
656 q = q.bind(b);
657 }
658 if let Some(cid) = conversation_id {
659 q = q.bind(cid);
660 }
661 q = q.bind(effective_limit);
662
663 Ok(q.fetch_all(&self.pool).await?)
664 }
665
666 pub async fn message_timestamps(
674 &self,
675 ids: &[MessageId],
676 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
677 if ids.is_empty() {
678 return Ok(std::collections::HashMap::new());
679 }
680
681 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
682 let query = format!(
683 "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
684 FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
685 );
686 let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
687 for &id in ids {
688 q = q.bind(id);
689 }
690
691 let rows = q.fetch_all(&self.pool).await?;
692 Ok(rows.into_iter().collect())
693 }
694
695 pub async fn load_messages_range(
701 &self,
702 conversation_id: ConversationId,
703 after_message_id: MessageId,
704 limit: usize,
705 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
706 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
707
708 let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
709 "SELECT id, role, content FROM messages \
710 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
711 ORDER BY id ASC LIMIT ?",
712 )
713 .bind(conversation_id)
714 .bind(after_message_id)
715 .bind(effective_limit)
716 .fetch_all(&self.pool)
717 .await?;
718
719 Ok(rows)
720 }
721
722 pub async fn get_eviction_candidates(
730 &self,
731 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
732 let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
733 "SELECT id, created_at, last_accessed, access_count \
734 FROM messages WHERE deleted_at IS NULL",
735 )
736 .fetch_all(&self.pool)
737 .await?;
738
739 Ok(rows
740 .into_iter()
741 .map(
742 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
743 id,
744 created_at,
745 last_accessed,
746 access_count: access_count.try_into().unwrap_or(0),
747 },
748 )
749 .collect())
750 }
751
752 pub async fn soft_delete_messages(
760 &self,
761 ids: &[MessageId],
762 ) -> Result<(), crate::error::MemoryError> {
763 if ids.is_empty() {
764 return Ok(());
765 }
766 for &id in ids {
768 sqlx::query(
769 "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
770 )
771 .bind(id)
772 .execute(&self.pool)
773 .await?;
774 }
775 Ok(())
776 }
777
778 pub async fn get_soft_deleted_message_ids(
784 &self,
785 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
786 let rows: Vec<(MessageId,)> = sqlx::query_as(
787 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
788 )
789 .fetch_all(&self.pool)
790 .await?;
791 Ok(rows.into_iter().map(|(id,)| id).collect())
792 }
793
794 pub async fn mark_qdrant_cleaned(
800 &self,
801 ids: &[MessageId],
802 ) -> Result<(), crate::error::MemoryError> {
803 for &id in ids {
804 sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
805 .bind(id)
806 .execute(&self.pool)
807 .await?;
808 }
809 Ok(())
810 }
811
812 pub async fn fetch_importance_scores(
820 &self,
821 ids: &[MessageId],
822 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
823 if ids.is_empty() {
824 return Ok(std::collections::HashMap::new());
825 }
826 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
827 let query = format!(
828 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
829 );
830 let mut q = sqlx::query_as::<_, (MessageId, f64)>(&query);
831 for &id in ids {
832 q = q.bind(id);
833 }
834 let rows = q.fetch_all(&self.pool).await?;
835 Ok(rows.into_iter().collect())
836 }
837
838 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
846 if ids.is_empty() {
847 return Ok(());
848 }
849 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
850 let query = format!(
851 "UPDATE messages SET access_count = access_count + 1, last_accessed = datetime('now') \
852 WHERE id IN ({placeholders})"
853 );
854 let mut q = sqlx::query(&query);
855 for &id in ids {
856 q = q.bind(id);
857 }
858 q.execute(&self.pool).await?;
859 Ok(())
860 }
861
862 pub async fn find_promotion_candidates(
871 &self,
872 min_sessions: u32,
873 batch_size: usize,
874 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
875 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
876 let min = i64::from(min_sessions);
877 let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = sqlx::query_as(
878 "SELECT id, conversation_id, content, session_count, importance_score \
879 FROM messages \
880 WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
881 ORDER BY session_count DESC, importance_score DESC \
882 LIMIT ?",
883 )
884 .bind(min)
885 .bind(limit)
886 .fetch_all(&self.pool)
887 .await?;
888
889 Ok(rows
890 .into_iter()
891 .map(
892 |(id, conversation_id, content, session_count, importance_score)| {
893 PromotionCandidate {
894 id,
895 conversation_id,
896 content,
897 session_count: session_count.try_into().unwrap_or(0),
898 importance_score,
899 }
900 },
901 )
902 .collect())
903 }
904
905 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
913 let rows: Vec<(String, i64)> = sqlx::query_as(
914 "SELECT tier, COUNT(*) FROM messages \
915 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
916 GROUP BY tier",
917 )
918 .fetch_all(&self.pool)
919 .await?;
920
921 let mut episodic = 0i64;
922 let mut semantic = 0i64;
923 for (tier, count) in rows {
924 match tier.as_str() {
925 "episodic" => episodic = count,
926 "semantic" => semantic = count,
927 _ => {}
928 }
929 }
930 Ok((episodic, semantic))
931 }
932
933 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
939 let row: (i64,) = sqlx::query_as(
940 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL",
941 )
942 .fetch_one(&self.pool)
943 .await?;
944 Ok(row.0)
945 }
946
947 pub async fn promote_to_semantic(
960 &self,
961 conversation_id: ConversationId,
962 merged_content: &str,
963 original_ids: &[MessageId],
964 ) -> Result<MessageId, MemoryError> {
965 if original_ids.is_empty() {
966 return Err(MemoryError::Other(
967 "promote_to_semantic: original_ids must not be empty".into(),
968 ));
969 }
970
971 let mut tx = self.pool.begin().await?;
972
973 let row: (MessageId,) = sqlx::query_as(
975 "INSERT INTO messages \
976 (conversation_id, role, content, parts, agent_visible, user_visible, \
977 tier, promotion_timestamp) \
978 VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', unixepoch()) \
979 RETURNING id",
980 )
981 .bind(conversation_id)
982 .bind(merged_content)
983 .fetch_one(&mut *tx)
984 .await?;
985
986 let new_id = row.0;
987
988 for &id in original_ids {
990 sqlx::query(
991 "UPDATE messages \
992 SET deleted_at = datetime('now'), qdrant_cleaned = 0 \
993 WHERE id = ? AND deleted_at IS NULL",
994 )
995 .bind(id)
996 .execute(&mut *tx)
997 .await?;
998 }
999
1000 tx.commit().await?;
1001 Ok(new_id)
1002 }
1003
1004 pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1013 if ids.is_empty() {
1014 return Ok(0);
1015 }
1016 let mut count = 0usize;
1017 for &id in ids {
1018 let result = sqlx::query(
1019 "UPDATE messages \
1020 SET tier = 'semantic', promotion_timestamp = unixepoch() \
1021 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'",
1022 )
1023 .bind(id)
1024 .execute(&self.pool)
1025 .await?;
1026 count += usize::try_from(result.rows_affected()).unwrap_or(0);
1027 }
1028 Ok(count)
1029 }
1030
1031 pub async fn increment_session_counts_for_conversation(
1040 &self,
1041 conversation_id: ConversationId,
1042 ) -> Result<(), MemoryError> {
1043 sqlx::query(
1044 "UPDATE messages SET session_count = session_count + 1 \
1045 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL",
1046 )
1047 .bind(conversation_id)
1048 .execute(&self.pool)
1049 .await?;
1050 Ok(())
1051 }
1052
1053 pub async fn fetch_tiers(
1061 &self,
1062 ids: &[MessageId],
1063 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1064 if ids.is_empty() {
1065 return Ok(std::collections::HashMap::new());
1066 }
1067 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1068 let query = format!(
1069 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1070 );
1071 let mut q = sqlx::query_as::<_, (MessageId, String)>(&query);
1072 for &id in ids {
1073 q = q.bind(id);
1074 }
1075 let rows = q.fetch_all(&self.pool).await?;
1076 Ok(rows.into_iter().collect())
1077 }
1078}
1079
/// A message row returned by `SqliteStore::find_promotion_candidates`.
///
/// Each field mirrors the column of the same name in the `messages` table.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// ID of the message row.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Text content of the message.
    pub content: String,
    /// Value of the `session_count` column; `0` if the stored value did not fit in `u32`.
    pub session_count: u32,
    /// Value of the `importance_score` column.
    pub importance_score: f64,
}
1089
1090#[cfg(test)]
1091mod tests;