1use zeph_db::ActiveDialect;
5use zeph_db::fts::sanitize_fts_query;
6#[allow(unused_imports)]
7use zeph_db::sql;
8use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
9
10use super::SqliteStore;
11use crate::error::MemoryError;
12use crate::types::{ConversationId, MessageId};
13
14fn parse_role(s: &str) -> Role {
15 match s {
16 "assistant" => Role::Assistant,
17 "system" => Role::System,
18 _ => Role::User,
19 }
20}
21
22#[must_use]
23pub fn role_str(role: Role) -> &'static str {
24 match role {
25 Role::System => "system",
26 Role::User => "user",
27 Role::Assistant => "assistant",
28 }
29}
30
/// Translates a legacy externally-tagged part key (e.g. `"ToolUse"`) into the
/// `kind` tag used by the current internally-tagged format (e.g. `"tool_use"`).
///
/// Returns `None` for any key that is not a known legacy variant name; the
/// comparison is exact and case-sensitive.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    const LEGACY_KINDS: &[(&str, &str)] = &[
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|&(_, kind)| kind)
}
50
51fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
59 let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
60 let mut result = Vec::with_capacity(array.len());
61 for element in array {
62 let obj = element.as_object()?;
63 if obj.contains_key("kind") {
64 return None;
65 }
66 if obj.len() != 1 {
67 return None;
68 }
69 let (key, inner) = obj.iter().next()?;
70 let kind = legacy_key_to_kind(key)?;
71 let mut new_obj = match inner {
72 serde_json::Value::Object(m) => m.clone(),
73 other => {
75 let mut m = serde_json::Map::new();
76 m.insert("data".to_string(), other.clone());
77 m
78 }
79 };
80 new_obj.insert(
81 "kind".to_string(),
82 serde_json::Value::String(kind.to_string()),
83 );
84 let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
85 result.push(part);
86 }
87 Some(result)
88}
89
90fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
95 if parts_json == "[]" {
96 return vec![];
97 }
98 match serde_json::from_str(parts_json) {
99 Ok(p) => p,
100 Err(e) => {
101 if let Some(parts) = try_parse_legacy_parts(parts_json) {
102 let truncated = parts_json.chars().take(120).collect::<String>();
103 tracing::warn!(
104 role = %role_str,
105 parts_json = %truncated,
106 "loaded legacy-format message parts via compat path"
107 );
108 return parts;
109 }
110 let truncated = parts_json.chars().take(120).collect::<String>();
111 tracing::warn!(
112 role = %role_str,
113 parts_json = %truncated,
114 error = %e,
115 "failed to deserialize message parts, falling back to empty"
116 );
117 vec![]
118 }
119 }
120}
121
122impl SqliteStore {
123 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
129 let row: (ConversationId,) = zeph_db::query_as(sql!(
130 "INSERT INTO conversations DEFAULT VALUES RETURNING id"
131 ))
132 .fetch_one(&self.pool)
133 .await?;
134 Ok(row.0)
135 }
136
137 pub async fn save_message(
143 &self,
144 conversation_id: ConversationId,
145 role: &str,
146 content: &str,
147 ) -> Result<MessageId, MemoryError> {
148 self.save_message_with_parts(conversation_id, role, content, "[]")
149 .await
150 }
151
152 pub async fn save_message_with_parts(
158 &self,
159 conversation_id: ConversationId,
160 role: &str,
161 content: &str,
162 parts_json: &str,
163 ) -> Result<MessageId, MemoryError> {
164 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
165 .await
166 }
167
168 pub async fn save_message_with_metadata(
174 &self,
175 conversation_id: ConversationId,
176 role: &str,
177 content: &str,
178 parts_json: &str,
179 agent_visible: bool,
180 user_visible: bool,
181 ) -> Result<MessageId, MemoryError> {
182 let importance_score = crate::semantic::importance::compute_importance(content, role);
183 let row: (MessageId,) = zeph_db::query_as(
184 sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
185 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
186 )
187 .bind(conversation_id)
188 .bind(role)
189 .bind(content)
190 .bind(parts_json)
191 .bind(i64::from(agent_visible))
192 .bind(i64::from(user_visible))
193 .bind(importance_score)
194 .fetch_one(&self.pool)
195 .await?;
196 Ok(row.0)
197 }
198
199 pub async fn load_history(
205 &self,
206 conversation_id: ConversationId,
207 limit: u32,
208 ) -> Result<Vec<Message>, MemoryError> {
209 let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
210 "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
211 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
212 WHERE conversation_id = ? AND deleted_at IS NULL \
213 ORDER BY id DESC \
214 LIMIT ?\
215 ) ORDER BY id ASC"
216 ))
217 .bind(conversation_id)
218 .bind(limit)
219 .fetch_all(&self.pool)
220 .await?;
221
222 let messages = rows
223 .into_iter()
224 .map(
225 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
226 let parts = parse_parts_json(&role_str, &parts_json);
227 Message {
228 role: parse_role(&role_str),
229 content,
230 parts,
231 metadata: MessageMetadata {
232 agent_visible: agent_visible != 0,
233 user_visible: user_visible != 0,
234 compacted_at: None,
235 deferred_summary: None,
236 focus_pinned: false,
237 focus_marker_id: None,
238 db_id: Some(row_id),
239 },
240 }
241 },
242 )
243 .collect();
244 Ok(messages)
245 }
246
247 pub async fn load_history_filtered(
255 &self,
256 conversation_id: ConversationId,
257 limit: u32,
258 agent_visible: Option<bool>,
259 user_visible: Option<bool>,
260 ) -> Result<Vec<Message>, MemoryError> {
261 let av = agent_visible.map(i64::from);
262 let uv = user_visible.map(i64::from);
263
264 let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
265 sql!("WITH recent AS (\
266 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
267 WHERE conversation_id = ? \
268 AND deleted_at IS NULL \
269 AND (? IS NULL OR agent_visible = ?) \
270 AND (? IS NULL OR user_visible = ?) \
271 ORDER BY id DESC \
272 LIMIT ?\
273 ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
274 )
275 .bind(conversation_id)
276 .bind(av)
277 .bind(av)
278 .bind(uv)
279 .bind(uv)
280 .bind(limit)
281 .fetch_all(&self.pool)
282 .await?;
283
284 let messages = rows
285 .into_iter()
286 .map(
287 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
288 let parts = parse_parts_json(&role_str, &parts_json);
289 Message {
290 role: parse_role(&role_str),
291 content,
292 parts,
293 metadata: MessageMetadata {
294 agent_visible: agent_visible != 0,
295 user_visible: user_visible != 0,
296 compacted_at: None,
297 deferred_summary: None,
298 focus_pinned: false,
299 focus_marker_id: None,
300 db_id: Some(row_id),
301 },
302 }
303 },
304 )
305 .collect();
306 Ok(messages)
307 }
308
309 pub async fn replace_conversation(
321 &self,
322 conversation_id: ConversationId,
323 compacted_range: std::ops::RangeInclusive<MessageId>,
324 summary_role: &str,
325 summary_content: &str,
326 ) -> Result<MessageId, MemoryError> {
327 let now = {
328 let secs = std::time::SystemTime::now()
329 .duration_since(std::time::UNIX_EPOCH)
330 .unwrap_or_default()
331 .as_secs();
332 format!("{secs}")
333 };
334 let start_id = compacted_range.start().0;
335 let end_id = compacted_range.end().0;
336
337 let mut tx = self.pool.begin().await?;
338
339 zeph_db::query(sql!(
340 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
341 WHERE conversation_id = ? AND id >= ? AND id <= ?"
342 ))
343 .bind(&now)
344 .bind(conversation_id)
345 .bind(start_id)
346 .bind(end_id)
347 .execute(&mut *tx)
348 .await?;
349
350 let row: (MessageId,) = zeph_db::query_as(sql!(
352 "INSERT INTO messages \
353 (conversation_id, role, content, parts, agent_visible, user_visible) \
354 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
355 ))
356 .bind(conversation_id)
357 .bind(summary_role)
358 .bind(summary_content)
359 .fetch_one(&mut *tx)
360 .await?;
361
362 tx.commit().await?;
363
364 Ok(row.0)
365 }
366
367 pub async fn apply_tool_pair_summaries(
377 &self,
378 conversation_id: ConversationId,
379 hide_ids: &[i64],
380 summaries: &[String],
381 ) -> Result<(), MemoryError> {
382 if hide_ids.is_empty() && summaries.is_empty() {
383 return Ok(());
384 }
385
386 let now = std::time::SystemTime::now()
387 .duration_since(std::time::UNIX_EPOCH)
388 .unwrap_or_default()
389 .as_secs()
390 .to_string();
391
392 let mut tx = self.pool.begin().await?;
393
394 for &id in hide_ids {
395 zeph_db::query(sql!(
396 "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
397 ))
398 .bind(&now)
399 .bind(id)
400 .execute(&mut *tx)
401 .await?;
402 }
403
404 for summary in summaries {
405 let content = format!("[tool summary] {summary}");
406 let parts = serde_json::to_string(&[MessagePart::Summary {
407 text: summary.clone(),
408 }])
409 .unwrap_or_else(|_| "[]".to_string());
410 zeph_db::query(sql!(
411 "INSERT INTO messages \
412 (conversation_id, role, content, parts, agent_visible, user_visible) \
413 VALUES (?, 'assistant', ?, ?, 1, 0)"
414 ))
415 .bind(conversation_id)
416 .bind(&content)
417 .bind(&parts)
418 .execute(&mut *tx)
419 .await?;
420 }
421
422 tx.commit().await?;
423 Ok(())
424 }
425
426 pub async fn oldest_message_ids(
432 &self,
433 conversation_id: ConversationId,
434 n: u32,
435 ) -> Result<Vec<MessageId>, MemoryError> {
436 let rows: Vec<(MessageId,)> = zeph_db::query_as(
437 sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
438 )
439 .bind(conversation_id)
440 .bind(n)
441 .fetch_all(&self.pool)
442 .await?;
443 Ok(rows.into_iter().map(|r| r.0).collect())
444 }
445
446 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
452 let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
453 "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
454 ))
455 .fetch_optional(&self.pool)
456 .await?;
457 Ok(row.map(|r| r.0))
458 }
459
460 pub async fn message_by_id(
466 &self,
467 message_id: MessageId,
468 ) -> Result<Option<Message>, MemoryError> {
469 let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
470 sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
471 )
472 .bind(message_id)
473 .fetch_optional(&self.pool)
474 .await?;
475
476 Ok(row.map(
477 |(role_str, content, parts_json, agent_visible, user_visible)| {
478 let parts = parse_parts_json(&role_str, &parts_json);
479 Message {
480 role: parse_role(&role_str),
481 content,
482 parts,
483 metadata: MessageMetadata {
484 agent_visible: agent_visible != 0,
485 user_visible: user_visible != 0,
486 compacted_at: None,
487 deferred_summary: None,
488 focus_pinned: false,
489 focus_marker_id: None,
490 db_id: None,
491 },
492 }
493 },
494 ))
495 }
496
497 pub async fn messages_by_ids(
503 &self,
504 ids: &[MessageId],
505 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
506 if ids.is_empty() {
507 return Ok(Vec::new());
508 }
509
510 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
511
512 let query = format!(
513 "SELECT id, role, content, parts FROM messages \
514 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
515 );
516 let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
517 for &id in ids {
518 q = q.bind(id);
519 }
520
521 let rows = q.fetch_all(&self.pool).await?;
522
523 Ok(rows
524 .into_iter()
525 .map(|(id, role_str, content, parts_json)| {
526 let parts = parse_parts_json(&role_str, &parts_json);
527 (
528 id,
529 Message {
530 role: parse_role(&role_str),
531 content,
532 parts,
533 metadata: MessageMetadata::default(),
534 },
535 )
536 })
537 .collect())
538 }
539
540 pub async fn unembedded_message_ids(
546 &self,
547 limit: Option<usize>,
548 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
549 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
550
551 let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
552 "SELECT m.id, m.conversation_id, m.role, m.content \
553 FROM messages m \
554 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
555 WHERE em.id IS NULL AND m.deleted_at IS NULL \
556 ORDER BY m.id ASC \
557 LIMIT ?"
558 ))
559 .bind(effective_limit)
560 .fetch_all(&self.pool)
561 .await?;
562
563 Ok(rows)
564 }
565
566 pub async fn count_messages(
572 &self,
573 conversation_id: ConversationId,
574 ) -> Result<i64, MemoryError> {
575 let row: (i64,) = zeph_db::query_as(sql!(
576 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
577 ))
578 .bind(conversation_id)
579 .fetch_one(&self.pool)
580 .await?;
581 Ok(row.0)
582 }
583
584 pub async fn count_messages_after(
590 &self,
591 conversation_id: ConversationId,
592 after_id: MessageId,
593 ) -> Result<i64, MemoryError> {
594 let row: (i64,) =
595 zeph_db::query_as(
596 sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
597 )
598 .bind(conversation_id)
599 .bind(after_id)
600 .fetch_one(&self.pool)
601 .await?;
602 Ok(row.0)
603 }
604
605 pub async fn keyword_search(
614 &self,
615 query: &str,
616 limit: usize,
617 conversation_id: Option<ConversationId>,
618 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
619 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
620 let safe_query = sanitize_fts_query(query);
621 if safe_query.is_empty() {
622 return Ok(Vec::new());
623 }
624
625 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
626 zeph_db::query_as(
627 sql!("SELECT m.id, -rank AS score \
628 FROM messages_fts f \
629 JOIN messages m ON m.id = f.rowid \
630 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
631 ORDER BY rank \
632 LIMIT ?"),
633 )
634 .bind(&safe_query)
635 .bind(cid)
636 .bind(effective_limit)
637 .fetch_all(&self.pool)
638 .await?
639 } else {
640 zeph_db::query_as(sql!(
641 "SELECT m.id, -rank AS score \
642 FROM messages_fts f \
643 JOIN messages m ON m.id = f.rowid \
644 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
645 ORDER BY rank \
646 LIMIT ?"
647 ))
648 .bind(&safe_query)
649 .bind(effective_limit)
650 .fetch_all(&self.pool)
651 .await?
652 };
653
654 Ok(rows)
655 }
656
657 pub async fn keyword_search_with_time_range(
670 &self,
671 query: &str,
672 limit: usize,
673 conversation_id: Option<ConversationId>,
674 after: Option<&str>,
675 before: Option<&str>,
676 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
677 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
678 let safe_query = sanitize_fts_query(query);
679 if safe_query.is_empty() {
680 return Ok(Vec::new());
681 }
682
683 let after_clause = if after.is_some() {
685 " AND m.created_at > ?"
686 } else {
687 ""
688 };
689 let before_clause = if before.is_some() {
690 " AND m.created_at < ?"
691 } else {
692 ""
693 };
694 let conv_clause = if conversation_id.is_some() {
695 " AND m.conversation_id = ?"
696 } else {
697 ""
698 };
699
700 let sql = format!(
701 "SELECT m.id, -rank AS score \
702 FROM messages_fts f \
703 JOIN messages m ON m.id = f.rowid \
704 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
705 {after_clause}{before_clause}{conv_clause} \
706 ORDER BY rank \
707 LIMIT ?"
708 );
709
710 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
711 if let Some(a) = after {
712 q = q.bind(a);
713 }
714 if let Some(b) = before {
715 q = q.bind(b);
716 }
717 if let Some(cid) = conversation_id {
718 q = q.bind(cid);
719 }
720 q = q.bind(effective_limit);
721
722 Ok(q.fetch_all(&self.pool).await?)
723 }
724
725 pub async fn message_timestamps(
733 &self,
734 ids: &[MessageId],
735 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
736 if ids.is_empty() {
737 return Ok(std::collections::HashMap::new());
738 }
739
740 let placeholders: String =
741 zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
742 let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
743 let query = format!(
744 "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
745 );
746 let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
747 for &id in ids {
748 q = q.bind(id);
749 }
750
751 let rows = q.fetch_all(&self.pool).await?;
752 Ok(rows.into_iter().collect())
753 }
754
755 pub async fn load_messages_range(
761 &self,
762 conversation_id: ConversationId,
763 after_message_id: MessageId,
764 limit: usize,
765 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
766 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
767
768 let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
769 "SELECT id, role, content FROM messages \
770 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
771 ORDER BY id ASC LIMIT ?"
772 ))
773 .bind(conversation_id)
774 .bind(after_message_id)
775 .bind(effective_limit)
776 .fetch_all(&self.pool)
777 .await?;
778
779 Ok(rows)
780 }
781
782 pub async fn get_eviction_candidates(
790 &self,
791 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
792 let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
793 "SELECT id, created_at, last_accessed, access_count \
794 FROM messages WHERE deleted_at IS NULL"
795 ))
796 .fetch_all(&self.pool)
797 .await?;
798
799 Ok(rows
800 .into_iter()
801 .map(
802 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
803 id,
804 created_at,
805 last_accessed,
806 access_count: access_count.try_into().unwrap_or(0),
807 },
808 )
809 .collect())
810 }
811
812 pub async fn soft_delete_messages(
820 &self,
821 ids: &[MessageId],
822 ) -> Result<(), crate::error::MemoryError> {
823 if ids.is_empty() {
824 return Ok(());
825 }
826 for &id in ids {
828 zeph_db::query(
829 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
830 )
831 .bind(id)
832 .execute(&self.pool)
833 .await?;
834 }
835 Ok(())
836 }
837
838 pub async fn get_soft_deleted_message_ids(
844 &self,
845 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
846 let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
847 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
848 ))
849 .fetch_all(&self.pool)
850 .await?;
851 Ok(rows.into_iter().map(|(id,)| id).collect())
852 }
853
854 pub async fn mark_qdrant_cleaned(
860 &self,
861 ids: &[MessageId],
862 ) -> Result<(), crate::error::MemoryError> {
863 for &id in ids {
864 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
865 .bind(id)
866 .execute(&self.pool)
867 .await?;
868 }
869 Ok(())
870 }
871
872 pub async fn fetch_importance_scores(
880 &self,
881 ids: &[MessageId],
882 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
883 if ids.is_empty() {
884 return Ok(std::collections::HashMap::new());
885 }
886 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
887 let query = format!(
888 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
889 );
890 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
891 for &id in ids {
892 q = q.bind(id);
893 }
894 let rows = q.fetch_all(&self.pool).await?;
895 Ok(rows.into_iter().collect())
896 }
897
898 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
906 if ids.is_empty() {
907 return Ok(());
908 }
909 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
910 let query = format!(
911 "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
912 WHERE id IN ({placeholders})"
913 );
914 let mut q = zeph_db::query(&query);
915 for &id in ids {
916 q = q.bind(id);
917 }
918 q.execute(&self.pool).await?;
919 Ok(())
920 }
921
922 pub async fn find_promotion_candidates(
931 &self,
932 min_sessions: u32,
933 batch_size: usize,
934 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
935 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
936 let min = i64::from(min_sessions);
937 let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
938 "SELECT id, conversation_id, content, session_count, importance_score \
939 FROM messages \
940 WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
941 ORDER BY session_count DESC, importance_score DESC \
942 LIMIT ?"
943 ))
944 .bind(min)
945 .bind(limit)
946 .fetch_all(&self.pool)
947 .await?;
948
949 Ok(rows
950 .into_iter()
951 .map(
952 |(id, conversation_id, content, session_count, importance_score)| {
953 PromotionCandidate {
954 id,
955 conversation_id,
956 content,
957 session_count: session_count.try_into().unwrap_or(0),
958 importance_score,
959 }
960 },
961 )
962 .collect())
963 }
964
965 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
973 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
974 "SELECT tier, COUNT(*) FROM messages \
975 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
976 GROUP BY tier"
977 ))
978 .fetch_all(&self.pool)
979 .await?;
980
981 let mut episodic = 0i64;
982 let mut semantic = 0i64;
983 for (tier, count) in rows {
984 match tier.as_str() {
985 "episodic" => episodic = count,
986 "semantic" => semantic = count,
987 _ => {}
988 }
989 }
990 Ok((episodic, semantic))
991 }
992
993 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
999 let row: (i64,) = zeph_db::query_as(sql!(
1000 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1001 ))
1002 .fetch_one(&self.pool)
1003 .await?;
1004 Ok(row.0)
1005 }
1006
1007 pub async fn promote_to_semantic(
1020 &self,
1021 conversation_id: ConversationId,
1022 merged_content: &str,
1023 original_ids: &[MessageId],
1024 ) -> Result<MessageId, MemoryError> {
1025 if original_ids.is_empty() {
1026 return Err(MemoryError::Other(
1027 "promote_to_semantic: original_ids must not be empty".into(),
1028 ));
1029 }
1030
1031 let mut tx = self.pool.begin().await?;
1032
1033 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1035 let promote_insert_raw = format!(
1036 "INSERT INTO messages \
1037 (conversation_id, role, content, parts, agent_visible, user_visible, \
1038 tier, promotion_timestamp) \
1039 VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1040 RETURNING id"
1041 );
1042 let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1043 let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1044 .bind(conversation_id)
1045 .bind(merged_content)
1046 .fetch_one(&mut *tx)
1047 .await?;
1048
1049 let new_id = row.0;
1050
1051 for &id in original_ids {
1053 zeph_db::query(sql!(
1054 "UPDATE messages \
1055 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1056 WHERE id = ? AND deleted_at IS NULL"
1057 ))
1058 .bind(id)
1059 .execute(&mut *tx)
1060 .await?;
1061 }
1062
1063 tx.commit().await?;
1064 Ok(new_id)
1065 }
1066
1067 pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1076 if ids.is_empty() {
1077 return Ok(0);
1078 }
1079 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1080 let manual_promote_raw = format!(
1081 "UPDATE messages \
1082 SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1083 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1084 );
1085 let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1086 let mut count = 0usize;
1087 for &id in ids {
1088 let result = zeph_db::query(&manual_promote_sql)
1089 .bind(id)
1090 .execute(&self.pool)
1091 .await?;
1092 count += usize::try_from(result.rows_affected()).unwrap_or(0);
1093 }
1094 Ok(count)
1095 }
1096
1097 pub async fn increment_session_counts_for_conversation(
1106 &self,
1107 conversation_id: ConversationId,
1108 ) -> Result<(), MemoryError> {
1109 zeph_db::query(sql!(
1110 "UPDATE messages SET session_count = session_count + 1 \
1111 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1112 ))
1113 .bind(conversation_id)
1114 .execute(&self.pool)
1115 .await?;
1116 Ok(())
1117 }
1118
1119 pub async fn fetch_tiers(
1127 &self,
1128 ids: &[MessageId],
1129 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1130 if ids.is_empty() {
1131 return Ok(std::collections::HashMap::new());
1132 }
1133 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1134 let query = format!(
1135 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1136 );
1137 let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1138 for &id in ids {
1139 q = q.bind(id);
1140 }
1141 let rows = q.fetch_all(&self.pool).await?;
1142 Ok(rows.into_iter().collect())
1143 }
1144
1145 pub async fn conversations_with_unconsolidated_messages(
1153 &self,
1154 ) -> Result<Vec<ConversationId>, MemoryError> {
1155 let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1156 "SELECT DISTINCT conversation_id FROM messages \
1157 WHERE consolidated = 0 AND deleted_at IS NULL"
1158 ))
1159 .fetch_all(&self.pool)
1160 .await?;
1161 Ok(rows.into_iter().map(|(id,)| id).collect())
1162 }
1163
1164 pub async fn find_unconsolidated_messages(
1173 &self,
1174 conversation_id: ConversationId,
1175 limit: usize,
1176 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1177 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1178 let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1179 "SELECT id, content FROM messages \
1180 WHERE conversation_id = ? \
1181 AND consolidated = 0 \
1182 AND deleted_at IS NULL \
1183 ORDER BY id ASC \
1184 LIMIT ?"
1185 ))
1186 .bind(conversation_id)
1187 .bind(limit)
1188 .fetch_all(&self.pool)
1189 .await?;
1190 Ok(rows)
1191 }
1192
1193 pub async fn find_consolidated_for_source(
1202 &self,
1203 source_id: MessageId,
1204 ) -> Result<Option<MessageId>, MemoryError> {
1205 let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1206 "SELECT consolidated_id FROM memory_consolidation_sources \
1207 WHERE source_id = ? \
1208 LIMIT 1"
1209 ))
1210 .bind(source_id)
1211 .fetch_optional(&self.pool)
1212 .await?;
1213 Ok(row.map(|(id,)| id))
1214 }
1215
1216 pub async fn apply_consolidation_merge(
1230 &self,
1231 conversation_id: ConversationId,
1232 role: &str,
1233 merged_content: &str,
1234 source_ids: &[MessageId],
1235 confidence: f32,
1236 confidence_threshold: f32,
1237 ) -> Result<bool, MemoryError> {
1238 if confidence < confidence_threshold {
1239 return Ok(false);
1240 }
1241 if source_ids.is_empty() {
1242 return Ok(false);
1243 }
1244
1245 let mut tx = self.pool.begin().await?;
1246
1247 let importance = crate::semantic::importance::compute_importance(merged_content, role);
1248 let row: (MessageId,) = zeph_db::query_as(sql!(
1249 "INSERT INTO messages \
1250 (conversation_id, role, content, parts, agent_visible, user_visible, \
1251 importance_score, consolidated, consolidation_confidence) \
1252 VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1253 RETURNING id"
1254 ))
1255 .bind(conversation_id)
1256 .bind(role)
1257 .bind(merged_content)
1258 .bind(importance)
1259 .bind(confidence)
1260 .fetch_one(&mut *tx)
1261 .await?;
1262 let consolidated_id = row.0;
1263
1264 let consol_sql = format!(
1265 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1266 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1267 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1268 );
1269 for &source_id in source_ids {
1270 zeph_db::query(&consol_sql)
1271 .bind(consolidated_id)
1272 .bind(source_id)
1273 .execute(&mut *tx)
1274 .await?;
1275
1276 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1278 .bind(source_id)
1279 .execute(&mut *tx)
1280 .await?;
1281 }
1282
1283 tx.commit().await?;
1284 Ok(true)
1285 }
1286}
1287
/// An episodic-tier message eligible for promotion to the semantic tier, as
/// returned by `SqliteStore::find_promotion_candidates`.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Row id of the candidate message.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Raw message content.
    pub content: String,
    /// Stored `session_count`; a stored value that does not fit in `u32`
    /// is reported as `0`.
    pub session_count: u32,
    /// Stored `importance_score`.
    pub importance_score: f64,
}
1297
1298#[cfg(test)]
1299mod tests;