1use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18 match s {
19 "assistant" => Role::Assistant,
20 "system" => Role::System,
21 _ => Role::User,
22 }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27 match role {
28 Role::System => "system",
29 Role::User => "user",
30 Role::Assistant => "assistant",
31 }
32}
33
/// Translates a legacy, externally-tagged variant key (e.g. `"ToolOutput"`)
/// into the snake_case value used for the `kind` tag (e.g. `"tool_output"`).
///
/// Returns `None` when `key` is not one of the known legacy variant names.
/// The comparison is case-sensitive.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // (legacy variant key, `kind` tag value)
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];

    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|&(_, kind)| kind)
}
53
54fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
62 let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
63 let mut result = Vec::with_capacity(array.len());
64 for element in array {
65 let obj = element.as_object()?;
66 if obj.contains_key("kind") {
67 return None;
68 }
69 if obj.len() != 1 {
70 return None;
71 }
72 let (key, inner) = obj.iter().next()?;
73 let kind = legacy_key_to_kind(key)?;
74 let mut new_obj = match inner {
75 serde_json::Value::Object(m) => m.clone(),
76 other => {
78 let mut m = serde_json::Map::new();
79 m.insert("data".to_string(), other.clone());
80 m
81 }
82 };
83 new_obj.insert(
84 "kind".to_string(),
85 serde_json::Value::String(kind.to_string()),
86 );
87 let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
88 result.push(part);
89 }
90 Some(result)
91}
92
93fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98 if parts_json == "[]" {
99 return vec![];
100 }
101 match serde_json::from_str(parts_json) {
102 Ok(p) => p,
103 Err(e) => {
104 if let Some(parts) = try_parse_legacy_parts(parts_json) {
105 let truncated = parts_json.chars().take(120).collect::<String>();
106 tracing::warn!(
107 role = %role_str,
108 parts_json = %truncated,
109 "loaded legacy-format message parts via compat path"
110 );
111 return parts;
112 }
113 let truncated = parts_json.chars().take(120).collect::<String>();
114 tracing::warn!(
115 role = %role_str,
116 parts_json = %truncated,
117 error = %e,
118 "failed to deserialize message parts, falling back to empty"
119 );
120 vec![]
121 }
122 }
123}
124
125impl SqliteStore {
126 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132 let row: (ConversationId,) = zeph_db::query_as(sql!(
133 "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134 ))
135 .fetch_one(&self.pool)
136 .await?;
137 Ok(row.0)
138 }
139
140 pub async fn save_message(
146 &self,
147 conversation_id: ConversationId,
148 role: &str,
149 content: &str,
150 ) -> Result<MessageId, MemoryError> {
151 self.save_message_with_parts(conversation_id, role, content, "[]")
152 .await
153 }
154
155 pub async fn save_message_with_parts(
161 &self,
162 conversation_id: ConversationId,
163 role: &str,
164 content: &str,
165 parts_json: &str,
166 ) -> Result<MessageId, MemoryError> {
167 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
168 .await
169 }
170
171 pub async fn save_message_with_category(
179 &self,
180 conversation_id: ConversationId,
181 role: &str,
182 content: &str,
183 category: Option<&str>,
184 ) -> Result<MessageId, MemoryError> {
185 let importance_score = crate::semantic::importance::compute_importance(content, role);
186 let row: (MessageId,) = zeph_db::query_as(sql!(
187 "INSERT INTO messages \
188 (conversation_id, role, content, parts, agent_visible, user_visible, \
189 importance_score, category) \
190 VALUES (?, ?, ?, '[]', 1, 1, ?, ?) RETURNING id"
191 ))
192 .bind(conversation_id)
193 .bind(role)
194 .bind(content)
195 .bind(importance_score)
196 .bind(category)
197 .fetch_one(&self.pool)
198 .await?;
199 Ok(row.0)
200 }
201
202 pub async fn save_message_with_metadata(
208 &self,
209 conversation_id: ConversationId,
210 role: &str,
211 content: &str,
212 parts_json: &str,
213 agent_visible: bool,
214 user_visible: bool,
215 ) -> Result<MessageId, MemoryError> {
216 let importance_score = crate::semantic::importance::compute_importance(content, role);
217 let row: (MessageId,) = zeph_db::query_as(
218 sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
219 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
220 )
221 .bind(conversation_id)
222 .bind(role)
223 .bind(content)
224 .bind(parts_json)
225 .bind(i64::from(agent_visible))
226 .bind(i64::from(user_visible))
227 .bind(importance_score)
228 .fetch_one(&self.pool)
229 .await?;
230 Ok(row.0)
231 }
232
233 pub async fn load_history(
239 &self,
240 conversation_id: ConversationId,
241 limit: u32,
242 ) -> Result<Vec<Message>, MemoryError> {
243 let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
244 "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
245 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
246 WHERE conversation_id = ? AND deleted_at IS NULL \
247 ORDER BY id DESC \
248 LIMIT ?\
249 ) ORDER BY id ASC"
250 ))
251 .bind(conversation_id)
252 .bind(limit)
253 .fetch_all(&self.pool)
254 .await?;
255
256 let messages = rows
257 .into_iter()
258 .map(
259 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
260 let parts = parse_parts_json(&role_str, &parts_json);
261 Message {
262 role: parse_role(&role_str),
263 content,
264 parts,
265 metadata: MessageMetadata {
266 agent_visible: agent_visible != 0,
267 user_visible: user_visible != 0,
268 compacted_at: None,
269 deferred_summary: None,
270 focus_pinned: false,
271 focus_marker_id: None,
272 db_id: Some(row_id),
273 },
274 }
275 },
276 )
277 .collect();
278 Ok(messages)
279 }
280
281 pub async fn load_history_filtered(
289 &self,
290 conversation_id: ConversationId,
291 limit: u32,
292 agent_visible: Option<bool>,
293 user_visible: Option<bool>,
294 ) -> Result<Vec<Message>, MemoryError> {
295 let av = agent_visible.map(i64::from);
296 let uv = user_visible.map(i64::from);
297
298 let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
299 sql!("WITH recent AS (\
300 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
301 WHERE conversation_id = ? \
302 AND deleted_at IS NULL \
303 AND (? IS NULL OR agent_visible = ?) \
304 AND (? IS NULL OR user_visible = ?) \
305 ORDER BY id DESC \
306 LIMIT ?\
307 ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
308 )
309 .bind(conversation_id)
310 .bind(av)
311 .bind(av)
312 .bind(uv)
313 .bind(uv)
314 .bind(limit)
315 .fetch_all(&self.pool)
316 .await?;
317
318 let messages = rows
319 .into_iter()
320 .map(
321 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
322 let parts = parse_parts_json(&role_str, &parts_json);
323 Message {
324 role: parse_role(&role_str),
325 content,
326 parts,
327 metadata: MessageMetadata {
328 agent_visible: agent_visible != 0,
329 user_visible: user_visible != 0,
330 compacted_at: None,
331 deferred_summary: None,
332 focus_pinned: false,
333 focus_marker_id: None,
334 db_id: Some(row_id),
335 },
336 }
337 },
338 )
339 .collect();
340 Ok(messages)
341 }
342
343 pub async fn replace_conversation(
355 &self,
356 conversation_id: ConversationId,
357 compacted_range: std::ops::RangeInclusive<MessageId>,
358 summary_role: &str,
359 summary_content: &str,
360 ) -> Result<MessageId, MemoryError> {
361 let now = {
362 let secs = std::time::SystemTime::now()
363 .duration_since(std::time::UNIX_EPOCH)
364 .unwrap_or_default()
365 .as_secs();
366 format!("{secs}")
367 };
368 let start_id = compacted_range.start().0;
369 let end_id = compacted_range.end().0;
370
371 let mut tx = self.pool.begin().await?;
372
373 zeph_db::query(sql!(
374 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
375 WHERE conversation_id = ? AND id >= ? AND id <= ?"
376 ))
377 .bind(&now)
378 .bind(conversation_id)
379 .bind(start_id)
380 .bind(end_id)
381 .execute(&mut *tx)
382 .await?;
383
384 let row: (MessageId,) = zeph_db::query_as(sql!(
386 "INSERT INTO messages \
387 (conversation_id, role, content, parts, agent_visible, user_visible) \
388 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
389 ))
390 .bind(conversation_id)
391 .bind(summary_role)
392 .bind(summary_content)
393 .fetch_one(&mut *tx)
394 .await?;
395
396 tx.commit().await?;
397
398 Ok(row.0)
399 }
400
401 pub async fn apply_tool_pair_summaries(
411 &self,
412 conversation_id: ConversationId,
413 hide_ids: &[i64],
414 summaries: &[String],
415 ) -> Result<(), MemoryError> {
416 if hide_ids.is_empty() && summaries.is_empty() {
417 return Ok(());
418 }
419
420 let now = std::time::SystemTime::now()
421 .duration_since(std::time::UNIX_EPOCH)
422 .unwrap_or_default()
423 .as_secs()
424 .to_string();
425
426 let mut tx = self.pool.begin().await?;
427
428 for &id in hide_ids {
429 zeph_db::query(sql!(
430 "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
431 ))
432 .bind(&now)
433 .bind(id)
434 .execute(&mut *tx)
435 .await?;
436 }
437
438 for summary in summaries {
439 let content = format!("[tool summary] {summary}");
440 let parts = serde_json::to_string(&[MessagePart::Summary {
441 text: summary.clone(),
442 }])
443 .unwrap_or_else(|_| "[]".to_string());
444 zeph_db::query(sql!(
445 "INSERT INTO messages \
446 (conversation_id, role, content, parts, agent_visible, user_visible) \
447 VALUES (?, 'assistant', ?, ?, 1, 0)"
448 ))
449 .bind(conversation_id)
450 .bind(&content)
451 .bind(&parts)
452 .execute(&mut *tx)
453 .await?;
454 }
455
456 tx.commit().await?;
457 Ok(())
458 }
459
460 pub async fn oldest_message_ids(
466 &self,
467 conversation_id: ConversationId,
468 n: u32,
469 ) -> Result<Vec<MessageId>, MemoryError> {
470 let rows: Vec<(MessageId,)> = zeph_db::query_as(
471 sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
472 )
473 .bind(conversation_id)
474 .bind(n)
475 .fetch_all(&self.pool)
476 .await?;
477 Ok(rows.into_iter().map(|r| r.0).collect())
478 }
479
480 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
486 let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
487 "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
488 ))
489 .fetch_optional(&self.pool)
490 .await?;
491 Ok(row.map(|r| r.0))
492 }
493
494 pub async fn message_by_id(
500 &self,
501 message_id: MessageId,
502 ) -> Result<Option<Message>, MemoryError> {
503 let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
504 sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
505 )
506 .bind(message_id)
507 .fetch_optional(&self.pool)
508 .await?;
509
510 Ok(row.map(
511 |(role_str, content, parts_json, agent_visible, user_visible)| {
512 let parts = parse_parts_json(&role_str, &parts_json);
513 Message {
514 role: parse_role(&role_str),
515 content,
516 parts,
517 metadata: MessageMetadata {
518 agent_visible: agent_visible != 0,
519 user_visible: user_visible != 0,
520 compacted_at: None,
521 deferred_summary: None,
522 focus_pinned: false,
523 focus_marker_id: None,
524 db_id: None,
525 },
526 }
527 },
528 ))
529 }
530
531 pub async fn messages_by_ids(
537 &self,
538 ids: &[MessageId],
539 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
540 if ids.is_empty() {
541 return Ok(Vec::new());
542 }
543
544 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
545
546 let query = format!(
547 "SELECT id, role, content, parts FROM messages \
548 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
549 );
550 let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
551 for &id in ids {
552 q = q.bind(id);
553 }
554
555 let rows = q.fetch_all(&self.pool).await?;
556
557 Ok(rows
558 .into_iter()
559 .map(|(id, role_str, content, parts_json)| {
560 let parts = parse_parts_json(&role_str, &parts_json);
561 (
562 id,
563 Message {
564 role: parse_role(&role_str),
565 content,
566 parts,
567 metadata: MessageMetadata::default(),
568 },
569 )
570 })
571 .collect())
572 }
573
574 pub async fn unembedded_message_ids(
580 &self,
581 limit: Option<usize>,
582 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
583 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
584
585 let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
586 "SELECT m.id, m.conversation_id, m.role, m.content \
587 FROM messages m \
588 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
589 WHERE em.id IS NULL AND m.deleted_at IS NULL \
590 ORDER BY m.id ASC \
591 LIMIT ?"
592 ))
593 .bind(effective_limit)
594 .fetch_all(&self.pool)
595 .await?;
596
597 Ok(rows)
598 }
599
600 pub fn stream_unembedded_messages(
611 &self,
612 limit: i64,
613 ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
614 {
615 zeph_db::query_as(sql!(
616 "SELECT m.id, m.conversation_id, m.role, m.content \
617 FROM messages m \
618 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
619 WHERE em.id IS NULL AND m.deleted_at IS NULL \
620 ORDER BY m.id ASC \
621 LIMIT ?"
622 ))
623 .bind(limit)
624 .fetch(&self.pool)
625 .map_err(MemoryError::from)
626 }
627
628 pub async fn count_messages(
634 &self,
635 conversation_id: ConversationId,
636 ) -> Result<i64, MemoryError> {
637 let row: (i64,) = zeph_db::query_as(sql!(
638 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
639 ))
640 .bind(conversation_id)
641 .fetch_one(&self.pool)
642 .await?;
643 Ok(row.0)
644 }
645
646 pub async fn count_messages_after(
652 &self,
653 conversation_id: ConversationId,
654 after_id: MessageId,
655 ) -> Result<i64, MemoryError> {
656 let row: (i64,) =
657 zeph_db::query_as(
658 sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
659 )
660 .bind(conversation_id)
661 .bind(after_id)
662 .fetch_one(&self.pool)
663 .await?;
664 Ok(row.0)
665 }
666
667 pub async fn keyword_search(
676 &self,
677 query: &str,
678 limit: usize,
679 conversation_id: Option<ConversationId>,
680 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
681 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
682 let safe_query = sanitize_fts_query(query);
683 if safe_query.is_empty() {
684 return Ok(Vec::new());
685 }
686
687 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
688 zeph_db::query_as(
689 sql!("SELECT m.id, -rank AS score \
690 FROM messages_fts f \
691 JOIN messages m ON m.id = f.rowid \
692 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
693 ORDER BY rank \
694 LIMIT ?"),
695 )
696 .bind(&safe_query)
697 .bind(cid)
698 .bind(effective_limit)
699 .fetch_all(&self.pool)
700 .await?
701 } else {
702 zeph_db::query_as(sql!(
703 "SELECT m.id, -rank AS score \
704 FROM messages_fts f \
705 JOIN messages m ON m.id = f.rowid \
706 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
707 ORDER BY rank \
708 LIMIT ?"
709 ))
710 .bind(&safe_query)
711 .bind(effective_limit)
712 .fetch_all(&self.pool)
713 .await?
714 };
715
716 Ok(rows)
717 }
718
719 pub async fn keyword_search_with_time_range(
732 &self,
733 query: &str,
734 limit: usize,
735 conversation_id: Option<ConversationId>,
736 after: Option<&str>,
737 before: Option<&str>,
738 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
739 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
740 let safe_query = sanitize_fts_query(query);
741 if safe_query.is_empty() {
742 return Ok(Vec::new());
743 }
744
745 let after_clause = if after.is_some() {
747 " AND m.created_at > ?"
748 } else {
749 ""
750 };
751 let before_clause = if before.is_some() {
752 " AND m.created_at < ?"
753 } else {
754 ""
755 };
756 let conv_clause = if conversation_id.is_some() {
757 " AND m.conversation_id = ?"
758 } else {
759 ""
760 };
761
762 let sql = format!(
763 "SELECT m.id, -rank AS score \
764 FROM messages_fts f \
765 JOIN messages m ON m.id = f.rowid \
766 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
767 {after_clause}{before_clause}{conv_clause} \
768 ORDER BY rank \
769 LIMIT ?"
770 );
771
772 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
773 if let Some(a) = after {
774 q = q.bind(a);
775 }
776 if let Some(b) = before {
777 q = q.bind(b);
778 }
779 if let Some(cid) = conversation_id {
780 q = q.bind(cid);
781 }
782 q = q.bind(effective_limit);
783
784 Ok(q.fetch_all(&self.pool).await?)
785 }
786
787 pub async fn message_timestamps(
795 &self,
796 ids: &[MessageId],
797 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
798 if ids.is_empty() {
799 return Ok(std::collections::HashMap::new());
800 }
801
802 let placeholders: String =
803 zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
804 let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
805 let query = format!(
806 "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
807 );
808 let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
809 for &id in ids {
810 q = q.bind(id);
811 }
812
813 let rows = q.fetch_all(&self.pool).await?;
814 Ok(rows.into_iter().collect())
815 }
816
817 pub async fn load_messages_range(
823 &self,
824 conversation_id: ConversationId,
825 after_message_id: MessageId,
826 limit: usize,
827 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
828 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
829
830 let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
831 "SELECT id, role, content FROM messages \
832 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
833 ORDER BY id ASC LIMIT ?"
834 ))
835 .bind(conversation_id)
836 .bind(after_message_id)
837 .bind(effective_limit)
838 .fetch_all(&self.pool)
839 .await?;
840
841 Ok(rows)
842 }
843
844 pub async fn get_eviction_candidates(
852 &self,
853 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
854 let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
855 "SELECT id, created_at, last_accessed, access_count \
856 FROM messages WHERE deleted_at IS NULL"
857 ))
858 .fetch_all(&self.pool)
859 .await?;
860
861 Ok(rows
862 .into_iter()
863 .map(
864 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
865 id,
866 created_at,
867 last_accessed,
868 access_count: access_count.try_into().unwrap_or(0),
869 },
870 )
871 .collect())
872 }
873
874 pub async fn soft_delete_messages(
882 &self,
883 ids: &[MessageId],
884 ) -> Result<(), crate::error::MemoryError> {
885 if ids.is_empty() {
886 return Ok(());
887 }
888 for &id in ids {
890 zeph_db::query(
891 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
892 )
893 .bind(id)
894 .execute(&self.pool)
895 .await?;
896 }
897 Ok(())
898 }
899
900 pub async fn get_soft_deleted_message_ids(
906 &self,
907 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
908 let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
909 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
910 ))
911 .fetch_all(&self.pool)
912 .await?;
913 Ok(rows.into_iter().map(|(id,)| id).collect())
914 }
915
916 pub async fn mark_qdrant_cleaned(
922 &self,
923 ids: &[MessageId],
924 ) -> Result<(), crate::error::MemoryError> {
925 for &id in ids {
926 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
927 .bind(id)
928 .execute(&self.pool)
929 .await?;
930 }
931 Ok(())
932 }
933
934 pub async fn fetch_importance_scores(
942 &self,
943 ids: &[MessageId],
944 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
945 if ids.is_empty() {
946 return Ok(std::collections::HashMap::new());
947 }
948 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
949 let query = format!(
950 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
951 );
952 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
953 for &id in ids {
954 q = q.bind(id);
955 }
956 let rows = q.fetch_all(&self.pool).await?;
957 Ok(rows.into_iter().collect())
958 }
959
960 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
968 if ids.is_empty() {
969 return Ok(());
970 }
971 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
972 let query = format!(
973 "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
974 WHERE id IN ({placeholders})"
975 );
976 let mut q = zeph_db::query(&query);
977 for &id in ids {
978 q = q.bind(id);
979 }
980 q.execute(&self.pool).await?;
981 Ok(())
982 }
983
984 pub async fn find_promotion_candidates(
993 &self,
994 min_sessions: u32,
995 batch_size: usize,
996 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
997 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
998 let min = i64::from(min_sessions);
999 let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1000 "SELECT id, conversation_id, content, session_count, importance_score \
1001 FROM messages \
1002 WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1003 ORDER BY session_count DESC, importance_score DESC \
1004 LIMIT ?"
1005 ))
1006 .bind(min)
1007 .bind(limit)
1008 .fetch_all(&self.pool)
1009 .await?;
1010
1011 Ok(rows
1012 .into_iter()
1013 .map(
1014 |(id, conversation_id, content, session_count, importance_score)| {
1015 PromotionCandidate {
1016 id,
1017 conversation_id,
1018 content,
1019 session_count: session_count.try_into().unwrap_or(0),
1020 importance_score,
1021 }
1022 },
1023 )
1024 .collect())
1025 }
1026
1027 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1035 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1036 "SELECT tier, COUNT(*) FROM messages \
1037 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1038 GROUP BY tier"
1039 ))
1040 .fetch_all(&self.pool)
1041 .await?;
1042
1043 let mut episodic = 0i64;
1044 let mut semantic = 0i64;
1045 for (tier, count) in rows {
1046 match tier.as_str() {
1047 "episodic" => episodic = count,
1048 "semantic" => semantic = count,
1049 _ => {}
1050 }
1051 }
1052 Ok((episodic, semantic))
1053 }
1054
1055 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1061 let row: (i64,) = zeph_db::query_as(sql!(
1062 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1063 ))
1064 .fetch_one(&self.pool)
1065 .await?;
1066 Ok(row.0)
1067 }
1068
1069 pub async fn promote_to_semantic(
1082 &self,
1083 conversation_id: ConversationId,
1084 merged_content: &str,
1085 original_ids: &[MessageId],
1086 ) -> Result<MessageId, MemoryError> {
1087 if original_ids.is_empty() {
1088 return Err(MemoryError::Other(
1089 "promote_to_semantic: original_ids must not be empty".into(),
1090 ));
1091 }
1092
1093 let mut tx = begin_write(&self.pool).await?;
1096
1097 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1099 let promote_insert_raw = format!(
1100 "INSERT INTO messages \
1101 (conversation_id, role, content, parts, agent_visible, user_visible, \
1102 tier, promotion_timestamp) \
1103 VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1104 RETURNING id"
1105 );
1106 let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1107 let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1108 .bind(conversation_id)
1109 .bind(merged_content)
1110 .fetch_one(&mut *tx)
1111 .await?;
1112
1113 let new_id = row.0;
1114
1115 for &id in original_ids {
1117 zeph_db::query(sql!(
1118 "UPDATE messages \
1119 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1120 WHERE id = ? AND deleted_at IS NULL"
1121 ))
1122 .bind(id)
1123 .execute(&mut *tx)
1124 .await?;
1125 }
1126
1127 tx.commit().await?;
1128 Ok(new_id)
1129 }
1130
1131 pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1140 if ids.is_empty() {
1141 return Ok(0);
1142 }
1143 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1144 let manual_promote_raw = format!(
1145 "UPDATE messages \
1146 SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1147 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1148 );
1149 let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1150 let mut count = 0usize;
1151 for &id in ids {
1152 let result = zeph_db::query(&manual_promote_sql)
1153 .bind(id)
1154 .execute(&self.pool)
1155 .await?;
1156 count += usize::try_from(result.rows_affected()).unwrap_or(0);
1157 }
1158 Ok(count)
1159 }
1160
1161 pub async fn increment_session_counts_for_conversation(
1170 &self,
1171 conversation_id: ConversationId,
1172 ) -> Result<(), MemoryError> {
1173 zeph_db::query(sql!(
1174 "UPDATE messages SET session_count = session_count + 1 \
1175 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1176 ))
1177 .bind(conversation_id)
1178 .execute(&self.pool)
1179 .await?;
1180 Ok(())
1181 }
1182
1183 pub async fn fetch_tiers(
1191 &self,
1192 ids: &[MessageId],
1193 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1194 if ids.is_empty() {
1195 return Ok(std::collections::HashMap::new());
1196 }
1197 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1198 let query = format!(
1199 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1200 );
1201 let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1202 for &id in ids {
1203 q = q.bind(id);
1204 }
1205 let rows = q.fetch_all(&self.pool).await?;
1206 Ok(rows.into_iter().collect())
1207 }
1208
1209 pub async fn conversations_with_unconsolidated_messages(
1217 &self,
1218 ) -> Result<Vec<ConversationId>, MemoryError> {
1219 let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1220 "SELECT DISTINCT conversation_id FROM messages \
1221 WHERE consolidated = 0 AND deleted_at IS NULL"
1222 ))
1223 .fetch_all(&self.pool)
1224 .await?;
1225 Ok(rows.into_iter().map(|(id,)| id).collect())
1226 }
1227
1228 pub async fn find_unconsolidated_messages(
1237 &self,
1238 conversation_id: ConversationId,
1239 limit: usize,
1240 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1241 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1242 let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1243 "SELECT id, content FROM messages \
1244 WHERE conversation_id = ? \
1245 AND consolidated = 0 \
1246 AND deleted_at IS NULL \
1247 ORDER BY id ASC \
1248 LIMIT ?"
1249 ))
1250 .bind(conversation_id)
1251 .bind(limit)
1252 .fetch_all(&self.pool)
1253 .await?;
1254 Ok(rows)
1255 }
1256
1257 pub async fn find_consolidated_for_source(
1266 &self,
1267 source_id: MessageId,
1268 ) -> Result<Option<MessageId>, MemoryError> {
1269 let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1270 "SELECT consolidated_id FROM memory_consolidation_sources \
1271 WHERE source_id = ? \
1272 LIMIT 1"
1273 ))
1274 .bind(source_id)
1275 .fetch_optional(&self.pool)
1276 .await?;
1277 Ok(row.map(|(id,)| id))
1278 }
1279
1280 pub async fn apply_consolidation_merge(
1294 &self,
1295 conversation_id: ConversationId,
1296 role: &str,
1297 merged_content: &str,
1298 source_ids: &[MessageId],
1299 confidence: f32,
1300 confidence_threshold: f32,
1301 ) -> Result<bool, MemoryError> {
1302 if confidence < confidence_threshold {
1303 return Ok(false);
1304 }
1305 if source_ids.is_empty() {
1306 return Ok(false);
1307 }
1308
1309 let mut tx = self.pool.begin().await?;
1310
1311 let importance = crate::semantic::importance::compute_importance(merged_content, role);
1312 let row: (MessageId,) = zeph_db::query_as(sql!(
1313 "INSERT INTO messages \
1314 (conversation_id, role, content, parts, agent_visible, user_visible, \
1315 importance_score, consolidated, consolidation_confidence) \
1316 VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1317 RETURNING id"
1318 ))
1319 .bind(conversation_id)
1320 .bind(role)
1321 .bind(merged_content)
1322 .bind(importance)
1323 .bind(confidence)
1324 .fetch_one(&mut *tx)
1325 .await?;
1326 let consolidated_id = row.0;
1327
1328 let consol_sql = format!(
1329 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1330 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1331 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1332 );
1333 for &source_id in source_ids {
1334 zeph_db::query(&consol_sql)
1335 .bind(consolidated_id)
1336 .bind(source_id)
1337 .execute(&mut *tx)
1338 .await?;
1339
1340 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1342 .bind(source_id)
1343 .execute(&mut *tx)
1344 .await?;
1345 }
1346
1347 tx.commit().await?;
1348 Ok(true)
1349 }
1350
1351 pub async fn apply_consolidation_update(
1364 &self,
1365 target_id: MessageId,
1366 new_content: &str,
1367 additional_source_ids: &[MessageId],
1368 confidence: f32,
1369 confidence_threshold: f32,
1370 ) -> Result<bool, MemoryError> {
1371 if confidence < confidence_threshold {
1372 return Ok(false);
1373 }
1374
1375 let mut tx = self.pool.begin().await?;
1376
1377 zeph_db::query(sql!(
1378 "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1379 ))
1380 .bind(new_content)
1381 .bind(confidence)
1382 .bind(target_id)
1383 .execute(&mut *tx)
1384 .await?;
1385
1386 let consol_sql = format!(
1387 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1388 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1389 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1390 );
1391 for &source_id in additional_source_ids {
1392 zeph_db::query(&consol_sql)
1393 .bind(target_id)
1394 .bind(source_id)
1395 .execute(&mut *tx)
1396 .await?;
1397
1398 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1399 .bind(source_id)
1400 .execute(&mut *tx)
1401 .await?;
1402 }
1403
1404 tx.commit().await?;
1405 Ok(true)
1406 }
1407
1408 pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1418 zeph_db::query(sql!(
1419 "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1420 ))
1421 .bind(score)
1422 .bind(id)
1423 .execute(&self.pool)
1424 .await?;
1425 Ok(())
1426 }
1427
1428 pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1436 let row: Option<(f64,)> = zeph_db::query_as(sql!(
1437 "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1438 ))
1439 .bind(id)
1440 .fetch_optional(&self.pool)
1441 .await?;
1442 Ok(row.map(|(s,)| s))
1443 }
1444
1445 pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1453 self.increment_access_counts(ids).await
1454 }
1455
1456 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1464 for &id in ids {
1465 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1466 .bind(id)
1467 .execute(&self.pool)
1468 .await?;
1469 }
1470 Ok(())
1471 }
1472
1473 pub async fn run_forgetting_sweep_tx(
1489 &self,
1490 config: &zeph_common::config::memory::ForgettingConfig,
1491 ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1492 let mut tx = self.pool.begin().await?;
1493
1494 let decay = f64::from(config.decay_rate);
1495 let floor = f64::from(config.forgetting_floor);
1496 let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1497 let replay_hours = i64::from(config.replay_window_hours);
1498 let replay_min_access = i64::from(config.replay_min_access_count);
1499 let protect_hours = i64::from(config.protect_recent_hours);
1500 let protect_min_access = i64::from(config.protect_min_access_count);
1501
1502 let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1505 "SELECT id FROM messages \
1506 WHERE deleted_at IS NULL AND consolidated = 0 \
1507 ORDER BY importance_score ASC \
1508 LIMIT ?"
1509 ))
1510 .bind(batch)
1511 .fetch_all(&mut *tx)
1512 .await?;
1513
1514 #[allow(clippy::cast_possible_truncation)]
1515 let downscaled = candidate_ids.len() as u32;
1516
1517 if downscaled > 0 {
1518 let placeholders: String = candidate_ids
1519 .iter()
1520 .map(|_| "?")
1521 .collect::<Vec<_>>()
1522 .join(",");
1523 let downscale_sql = format!(
1524 "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1525 WHERE id IN ({placeholders})"
1526 );
1527 let mut q = zeph_db::query(&downscale_sql);
1528 for &(id,) in &candidate_ids {
1529 q = q.bind(id);
1530 }
1531 q.execute(&mut *tx).await?;
1532 }
1533
1534 let replayed = if downscaled > 0 {
1540 let replay_placeholders: String = candidate_ids
1541 .iter()
1542 .map(|_| "?")
1543 .collect::<Vec<_>>()
1544 .join(",");
1545 let replay_sql = format!(
1546 "UPDATE messages \
1547 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1548 WHERE id IN ({replay_placeholders}) \
1549 AND (\
1550 (last_accessed IS NOT NULL \
1551 AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1552 OR access_count >= ?\
1553 )"
1554 );
1555 let mut rq = zeph_db::query(&replay_sql);
1556 for &(id,) in &candidate_ids {
1557 rq = rq.bind(id);
1558 }
1559 let replay_result = rq
1560 .bind(replay_hours)
1561 .bind(replay_min_access)
1562 .execute(&mut *tx)
1563 .await?;
1564 #[allow(clippy::cast_possible_truncation)]
1565 let n = replay_result.rows_affected() as u32;
1566 n
1567 } else {
1568 0
1569 };
1570
1571 let prune_sql = format!(
1573 "UPDATE messages \
1574 SET deleted_at = CURRENT_TIMESTAMP \
1575 WHERE deleted_at IS NULL AND consolidated = 0 \
1576 AND importance_score < {floor} \
1577 AND (\
1578 last_accessed IS NULL \
1579 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1580 ) \
1581 AND access_count < ?"
1582 );
1583 let prune_result = zeph_db::query(&prune_sql)
1584 .bind(protect_hours)
1585 .bind(protect_min_access)
1586 .execute(&mut *tx)
1587 .await?;
1588 #[allow(clippy::cast_possible_truncation)]
1589 let pruned = prune_result.rows_affected() as u32;
1590
1591 tx.commit().await?;
1592
1593 Ok(crate::forgetting::ForgettingResult {
1594 downscaled,
1595 replayed,
1596 pruned,
1597 })
1598 }
1599}
1600
/// A message row selected by `SqliteStore::find_promotion_candidates`.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Id of the message row.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Stored message content.
    pub content: String,
    /// Value of the row's `session_count` column, converted to `u32`.
    pub session_count: u32,
    /// Value of the row's `importance_score` column.
    pub importance_score: f64,
}
1610
1611#[cfg(test)]
1612mod tests;