1use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18 match s {
19 "assistant" => Role::Assistant,
20 "system" => Role::System,
21 _ => Role::User,
22 }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27 match role {
28 Role::System => "system",
29 Role::User => "user",
30 Role::Assistant => "assistant",
31 }
32}
33
/// Translates a legacy externally-tagged part key (e.g. `"ToolOutput"`)
/// into the current snake_case `kind` discriminator.
///
/// Returns `None` for keys that were never part of the legacy schema.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Small fixed table; a linear scan over 12 entries is cheaper than a map.
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|(_, kind)| *kind)
}
53
/// Attempts to parse `parts_json` as the legacy externally-tagged format
/// (`[{"ToolOutput": {...}}, ...]`), converting each element to the current
/// internally-tagged form (`{"kind": "tool_output", ...}`).
///
/// Returns `None` as soon as any element does not look legacy (already has a
/// `kind` key, has more than one key, unknown key, or fails to deserialize),
/// letting the caller fall through to its error path.
fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
    let mut result = Vec::with_capacity(array.len());
    for element in array {
        let obj = element.as_object()?;
        // A `kind` key means the element is already in the current format;
        // this is not legacy data, so bail out entirely.
        if obj.contains_key("kind") {
            return None;
        }
        // Legacy elements are single-key wrappers: {"VariantName": payload}.
        if obj.len() != 1 {
            return None;
        }
        let (key, inner) = obj.iter().next()?;
        let kind = legacy_key_to_kind(key)?;
        let mut new_obj = match inner {
            serde_json::Value::Object(m) => m.clone(),
            other => {
                // Non-object payloads (e.g. a bare string) are wrapped under
                // a "data" key so they can carry the `kind` tag alongside.
                let mut m = serde_json::Map::new();
                m.insert("data".to_string(), other.clone());
                m
            }
        };
        new_obj.insert(
            "kind".to_string(),
            serde_json::Value::String(kind.to_string()),
        );
        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
        result.push(part);
    }
    Some(result)
}
92
93fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98 if parts_json == "[]" {
99 return vec![];
100 }
101 match serde_json::from_str(parts_json) {
102 Ok(p) => p,
103 Err(e) => {
104 if let Some(parts) = try_parse_legacy_parts(parts_json) {
105 let truncated = parts_json.chars().take(120).collect::<String>();
106 tracing::warn!(
107 role = %role_str,
108 parts_json = %truncated,
109 "loaded legacy-format message parts via compat path"
110 );
111 return parts;
112 }
113 let truncated = parts_json.chars().take(120).collect::<String>();
114 tracing::warn!(
115 role = %role_str,
116 parts_json = %truncated,
117 error = %e,
118 "failed to deserialize message parts, falling back to empty"
119 );
120 vec![]
121 }
122 }
123}
124
125impl SqliteStore {
    /// Inserts a new, empty conversation row and returns its generated id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
        let row: (ConversationId,) = zeph_db::query_as(sql!(
            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
139
    /// Saves a plain-text message with no structured parts (`parts = "[]"`).
    ///
    /// Convenience wrapper over [`Self::save_message_with_parts`].
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
154
    /// Saves a message with pre-serialized `parts_json`, visible to both the
    /// agent and the user (both visibility flags set to `true`).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
            .await
    }
170
    /// Inserts a message row with explicit visibility flags, computing its
    /// importance score from the content and role at write time.
    ///
    /// `parts_json` is stored verbatim; callers are responsible for providing
    /// valid JSON (use `"[]"` for no parts).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_metadata(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        agent_visible: bool,
        user_visible: bool,
    ) -> Result<MessageId, MemoryError> {
        let importance_score = crate::semantic::importance::compute_importance(content, role);
        let row: (MessageId,) = zeph_db::query_as(
            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
                  VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
        )
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(parts_json)
        // Booleans stored as 0/1 integers for SQLite compatibility.
        .bind(i64::from(agent_visible))
        .bind(i64::from(user_visible))
        .bind(importance_score)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
201
    /// Loads the most recent `limit` non-deleted messages of a conversation,
    /// returned in chronological (ascending id) order.
    ///
    /// The inner subquery selects the newest rows (`ORDER BY id DESC LIMIT ?`);
    /// the outer query flips them back to ascending order.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_history(
        &self,
        conversation_id: ConversationId,
        limit: u32,
    ) -> Result<Vec<Message>, MemoryError> {
        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
             SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
             WHERE conversation_id = ? AND deleted_at IS NULL \
             ORDER BY id DESC \
             LIMIT ?\
            ) ORDER BY id ASC"
        ))
        .bind(conversation_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let messages = rows
            .into_iter()
            .map(
                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
                    let parts = parse_parts_json(&role_str, &parts_json);
                    Message {
                        role: parse_role(&role_str),
                        content,
                        parts,
                        metadata: MessageMetadata {
                            agent_visible: agent_visible != 0,
                            user_visible: user_visible != 0,
                            compacted_at: None,
                            deferred_summary: None,
                            focus_pinned: false,
                            focus_marker_id: None,
                            // Row id carried so callers can reference the DB row.
                            db_id: Some(row_id),
                        },
                    }
                },
            )
            .collect();
        Ok(messages)
    }
249
    /// Like [`Self::load_history`], but optionally filters on the
    /// `agent_visible` / `user_visible` flags.
    ///
    /// Each optional filter is passed twice: the SQL uses the
    /// `(? IS NULL OR col = ?)` idiom so a `None` filter matches all rows.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_history_filtered(
        &self,
        conversation_id: ConversationId,
        limit: u32,
        agent_visible: Option<bool>,
        user_visible: Option<bool>,
    ) -> Result<Vec<Message>, MemoryError> {
        // Map Option<bool> to Option<i64> to match the 0/1 column encoding.
        let av = agent_visible.map(i64::from);
        let uv = user_visible.map(i64::from);

        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
            sql!("WITH recent AS (\
                  SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
                  WHERE conversation_id = ? \
                  AND deleted_at IS NULL \
                  AND (? IS NULL OR agent_visible = ?) \
                  AND (? IS NULL OR user_visible = ?) \
                  ORDER BY id DESC \
                  LIMIT ?\
                  ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
        )
        .bind(conversation_id)
        .bind(av)
        .bind(av)
        .bind(uv)
        .bind(uv)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let messages = rows
            .into_iter()
            .map(
                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
                    let parts = parse_parts_json(&role_str, &parts_json);
                    Message {
                        role: parse_role(&role_str),
                        content,
                        parts,
                        metadata: MessageMetadata {
                            agent_visible: agent_visible != 0,
                            user_visible: user_visible != 0,
                            compacted_at: None,
                            deferred_summary: None,
                            focus_pinned: false,
                            focus_marker_id: None,
                            db_id: Some(row_id),
                        },
                    }
                },
            )
            .collect();
        Ok(messages)
    }
311
    /// Compacts a message range: hides the original rows from the agent and
    /// inserts a summary message in their place, atomically.
    ///
    /// The hidden rows get `agent_visible = 0` and a `compacted_at` unix
    /// timestamp (stored as a string). The summary row is agent-visible but
    /// not user-visible (`1, 0`).
    ///
    /// Returns the id of the newly inserted summary message.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn replace_conversation(
        &self,
        conversation_id: ConversationId,
        compacted_range: std::ops::RangeInclusive<MessageId>,
        summary_role: &str,
        summary_content: &str,
    ) -> Result<MessageId, MemoryError> {
        // Unix-epoch seconds as a string; clock-before-epoch falls back to "0".
        let now = {
            let secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            format!("{secs}")
        };
        let start_id = compacted_range.start().0;
        let end_id = compacted_range.end().0;

        let mut tx = self.pool.begin().await?;

        zeph_db::query(sql!(
            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
             WHERE conversation_id = ? AND id >= ? AND id <= ?"
        ))
        .bind(&now)
        .bind(conversation_id)
        .bind(start_id)
        .bind(end_id)
        .execute(&mut *tx)
        .await?;

        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible) \
             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
        ))
        .bind(conversation_id)
        .bind(summary_role)
        .bind(summary_content)
        .fetch_one(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(row.0)
    }
369
    /// Hides tool-call/result message pairs from the agent and inserts
    /// assistant-role summary rows in their place, all in one transaction.
    ///
    /// `hide_ids` and `summaries` are independent: either may be empty.
    /// Summary rows are agent-visible but not user-visible.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_tool_pair_summaries(
        &self,
        conversation_id: ConversationId,
        hide_ids: &[i64],
        summaries: &[String],
    ) -> Result<(), MemoryError> {
        if hide_ids.is_empty() && summaries.is_empty() {
            return Ok(());
        }

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            .to_string();

        let mut tx = self.pool.begin().await?;

        for &id in hide_ids {
            zeph_db::query(sql!(
                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
            ))
            .bind(&now)
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        for summary in summaries {
            let content = format!("[tool summary] {summary}");
            // Serialize the summary as a structured part too; fall back to an
            // empty parts array if serialization somehow fails.
            let parts = serde_json::to_string(&[MessagePart::Summary {
                text: summary.clone(),
            }])
            .unwrap_or_else(|_| "[]".to_string());
            zeph_db::query(sql!(
                "INSERT INTO messages \
                 (conversation_id, role, content, parts, agent_visible, user_visible) \
                 VALUES (?, 'assistant', ?, ?, 1, 0)"
            ))
            .bind(conversation_id)
            .bind(&content)
            .bind(&parts)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
428
    /// Returns the ids of the `n` oldest non-deleted messages in a
    /// conversation, in ascending id order.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn oldest_message_ids(
        &self,
        conversation_id: ConversationId,
        n: u32,
    ) -> Result<Vec<MessageId>, MemoryError> {
        let rows: Vec<(MessageId,)> = zeph_db::query_as(
            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
        )
        .bind(conversation_id)
        .bind(n)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|r| r.0).collect())
    }
448
    /// Returns the id of the most recently created conversation, or `None`
    /// if no conversations exist.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
        ))
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|r| r.0))
    }
462
    /// Loads a single non-deleted message by id, or `None` if absent.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn message_by_id(
        &self,
        message_id: MessageId,
    ) -> Result<Option<Message>, MemoryError> {
        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
        )
        .bind(message_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(
            |(role_str, content, parts_json, agent_visible, user_visible)| {
                let parts = parse_parts_json(&role_str, &parts_json);
                Message {
                    role: parse_role(&role_str),
                    content,
                    parts,
                    metadata: MessageMetadata {
                        agent_visible: agent_visible != 0,
                        user_visible: user_visible != 0,
                        compacted_at: None,
                        deferred_summary: None,
                        focus_pinned: false,
                        focus_marker_id: None,
                        // NOTE(review): left as None here even though the row id
                        // is the caller-supplied `message_id`; `load_history`
                        // populates db_id. Confirm whether any caller relies on
                        // None before aligning the two.
                        db_id: None,
                    },
                }
            },
        ))
    }
499
    /// Loads multiple agent-visible, non-deleted messages by id.
    ///
    /// Missing, deleted, or agent-hidden ids are silently omitted; the result
    /// order follows the database scan, not the input slice. Metadata is left
    /// at its defaults (visibility flags are not read back here).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn messages_by_ids(
        &self,
        ids: &[MessageId],
    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }

        // Dynamic IN (?,?,...) list; every value is bound, never interpolated.
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");

        let query = format!(
            "SELECT id, role, content, parts FROM messages \
             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;

        Ok(rows
            .into_iter()
            .map(|(id, role_str, content, parts_json)| {
                let parts = parse_parts_json(&role_str, &parts_json);
                (
                    id,
                    Message {
                        role: parse_role(&role_str),
                        content,
                        parts,
                        metadata: MessageMetadata::default(),
                    },
                )
            })
            .collect())
    }
542
    /// Returns non-deleted messages that have no embedding metadata yet,
    /// oldest first, as `(id, conversation_id, role, content)` tuples.
    ///
    /// `limit: None` means effectively unlimited (bound as `i64::MAX`).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn unembedded_message_ids(
        &self,
        limit: Option<usize>,
    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));

        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(effective_limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows)
    }
568
    /// Streams non-deleted messages lacking embedding metadata, oldest first,
    /// without buffering the whole result set in memory.
    ///
    /// Same query as [`Self::unembedded_message_ids`], but row errors are
    /// surfaced per-item through the stream (mapped to [`MemoryError`]).
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch(&self.pool)
        .map_err(MemoryError::from)
    }
596
    /// Counts the non-deleted messages in a conversation.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_messages(
        &self,
        conversation_id: ConversationId,
    ) -> Result<i64, MemoryError> {
        let row: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
        ))
        .bind(conversation_id)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
614
    /// Counts non-deleted messages in a conversation with an id strictly
    /// greater than `after_id`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_messages_after(
        &self,
        conversation_id: ConversationId,
        after_id: MessageId,
    ) -> Result<i64, MemoryError> {
        let row: (i64,) =
            zeph_db::query_as(
                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
            )
            .bind(conversation_id)
            .bind(after_id)
            .fetch_one(&self.pool)
            .await?;
        Ok(row.0)
    }
635
    /// Full-text search over agent-visible, non-deleted messages, returning
    /// `(message_id, score)` pairs sorted best-first.
    ///
    /// The raw query is sanitized via `sanitize_fts_query` before the FTS
    /// `MATCH`; an empty sanitized query short-circuits to no results. Score
    /// is `-rank` so that higher is better (FTS rank is lower-is-better).
    /// An optional `conversation_id` restricts the search scope.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn keyword_search(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
            zeph_db::query_as(
                sql!("SELECT m.id, -rank AS score \
                      FROM messages_fts f \
                      JOIN messages m ON m.id = f.rowid \
                      WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
                      ORDER BY rank \
                      LIMIT ?"),
            )
            .bind(&safe_query)
            .bind(cid)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT m.id, -rank AS score \
                 FROM messages_fts f \
                 JOIN messages m ON m.id = f.rowid \
                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
                 ORDER BY rank \
                 LIMIT ?"
            ))
            .bind(&safe_query)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows)
    }
687
    /// Full-text search like [`Self::keyword_search`], with optional
    /// `created_at` time bounds (`after` exclusive-lower, `before`
    /// exclusive-upper) and optional conversation scoping.
    ///
    /// The SQL is assembled from fixed clause fragments; user data is always
    /// bound, never interpolated. Bind order must match clause order:
    /// query, after?, before?, conversation?, limit.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn keyword_search_with_time_range(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
        after: Option<&str>,
        before: Option<&str>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        let after_clause = if after.is_some() {
            " AND m.created_at > ?"
        } else {
            ""
        };
        let before_clause = if before.is_some() {
            " AND m.created_at < ?"
        } else {
            ""
        };
        let conv_clause = if conversation_id.is_some() {
            " AND m.conversation_id = ?"
        } else {
            ""
        };

        let sql = format!(
            "SELECT m.id, -rank AS score \
             FROM messages_fts f \
             JOIN messages m ON m.id = f.rowid \
             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
             {after_clause}{before_clause}{conv_clause} \
             ORDER BY rank \
             LIMIT ?"
        );

        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
        if let Some(a) = after {
            q = q.bind(a);
        }
        if let Some(b) = before {
            q = q.bind(b);
        }
        if let Some(cid) = conversation_id {
            q = q.bind(cid);
        }
        q = q.bind(effective_limit);

        Ok(q.fetch_all(&self.pool).await?)
    }
755
    /// Returns a map from message id to its `created_at` as unix-epoch
    /// seconds, for the given (non-deleted) ids. Missing ids are omitted.
    ///
    /// The epoch conversion expression comes from the active SQL dialect so
    /// the same code works across backends.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn message_timestamps(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }

        // Placeholders are rewritten for the active dialect (e.g. ? -> $n).
        let placeholders: String =
            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
        let query = format!(
            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
785
    /// Loads up to `limit` non-deleted messages of a conversation with ids
    /// strictly after `after_message_id`, ascending, as
    /// `(id, role, content)` tuples.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_messages_range(
        &self,
        conversation_id: ConversationId,
        after_message_id: MessageId,
        limit: usize,
    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);

        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
            "SELECT id, role, content FROM messages \
             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
             ORDER BY id ASC LIMIT ?"
        ))
        .bind(conversation_id)
        .bind(after_message_id)
        .bind(effective_limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows)
    }
812
    /// Returns eviction bookkeeping (`created_at`, `last_accessed`,
    /// `access_count`) for every non-deleted message, across all
    /// conversations.
    ///
    /// Negative `access_count` values (which should not occur) clamp to 0 on
    /// the unsigned conversion.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_eviction_candidates(
        &self,
    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
            "SELECT id, created_at, last_accessed, access_count \
             FROM messages WHERE deleted_at IS NULL"
        ))
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(
                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
                    id,
                    created_at,
                    last_accessed,
                    access_count: access_count.try_into().unwrap_or(0),
                },
            )
            .collect())
    }
842
843 pub async fn soft_delete_messages(
851 &self,
852 ids: &[MessageId],
853 ) -> Result<(), crate::error::MemoryError> {
854 if ids.is_empty() {
855 return Ok(());
856 }
857 for &id in ids {
859 zeph_db::query(
860 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
861 )
862 .bind(id)
863 .execute(&self.pool)
864 .await?;
865 }
866 Ok(())
867 }
868
    /// Returns ids of soft-deleted messages whose vectors have not yet been
    /// removed from Qdrant (`qdrant_cleaned = 0`).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_soft_deleted_message_ids(
        &self,
    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|(id,)| id).collect())
    }
884
885 pub async fn mark_qdrant_cleaned(
891 &self,
892 ids: &[MessageId],
893 ) -> Result<(), crate::error::MemoryError> {
894 for &id in ids {
895 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
896 .bind(id)
897 .execute(&self.pool)
898 .await?;
899 }
900 Ok(())
901 }
902
    /// Returns a map from message id to `importance_score` for the given
    /// non-deleted ids. Missing ids are omitted from the map.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn fetch_importance_scores(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }
        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
928
    /// Bumps `access_count` and refreshes `last_accessed` for the given
    /// messages in a single batched update.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        if ids.is_empty() {
            return Ok(());
        }
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
             WHERE id IN ({placeholders})"
        );
        let mut q = zeph_db::query(&query);
        for &id in ids {
            q = q.bind(id);
        }
        q.execute(&self.pool).await?;
        Ok(())
    }
952
    /// Finds episodic-tier, non-deleted messages seen in at least
    /// `min_sessions` sessions, ordered by session count then importance,
    /// capped at `batch_size`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_promotion_candidates(
        &self,
        min_sessions: u32,
        batch_size: usize,
    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
        let min = i64::from(min_sessions);
        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, conversation_id, content, session_count, importance_score \
             FROM messages \
             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
             ORDER BY session_count DESC, importance_score DESC \
             LIMIT ?"
        ))
        .bind(min)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(
                |(id, conversation_id, content, session_count, importance_score)| {
                    PromotionCandidate {
                        id,
                        conversation_id,
                        content,
                        // Negative counts (should not occur) clamp to 0.
                        session_count: session_count.try_into().unwrap_or(0),
                        importance_score,
                    }
                },
            )
            .collect())
    }
995
    /// Counts non-deleted messages per memory tier, returned as
    /// `(episodic, semantic)`. A tier with no rows counts as 0.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT tier, COUNT(*) FROM messages \
             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
             GROUP BY tier"
        ))
        .fetch_all(&self.pool)
        .await?;

        let mut episodic = 0i64;
        let mut semantic = 0i64;
        for (tier, count) in rows {
            match tier.as_str() {
                "episodic" => episodic = count,
                "semantic" => semantic = count,
                _ => {}
            }
        }
        Ok((episodic, semantic))
    }
1023
    /// Counts non-deleted messages in the semantic tier.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
        let row: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
1037
    /// Atomically inserts a merged semantic-tier message and soft-deletes the
    /// source messages it replaces.
    ///
    /// The new row is assistant-role, agent-visible only, tagged
    /// `tier = 'semantic'` with a dialect-specific epoch timestamp. Source
    /// rows get `deleted_at` stamped and `qdrant_cleaned` reset to 0 so the
    /// Qdrant cleanup pass will remove their vectors.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if `original_ids` is empty, or if any
    /// statement or the commit fails.
    pub async fn promote_to_semantic(
        &self,
        conversation_id: ConversationId,
        merged_content: &str,
        original_ids: &[MessageId],
    ) -> Result<MessageId, MemoryError> {
        if original_ids.is_empty() {
            return Err(MemoryError::Other(
                "promote_to_semantic: original_ids must not be empty".into(),
            ));
        }

        let mut tx = begin_write(&self.pool).await?;

        // EPOCH_NOW is a dialect-level SQL expression, interpolated (not
        // bound) because it is a compile-time constant, not user data.
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let promote_insert_raw = format!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible, \
              tier, promotion_timestamp) \
             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
             RETURNING id"
        );
        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
            .bind(conversation_id)
            .bind(merged_content)
            .fetch_one(&mut *tx)
            .await?;

        let new_id = row.0;

        for &id in original_ids {
            zeph_db::query(sql!(
                "UPDATE messages \
                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
                 WHERE id = ? AND deleted_at IS NULL"
            ))
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(new_id)
    }
1099
    /// Manually promotes the given episodic, non-deleted messages to the
    /// semantic tier, returning how many rows were actually updated.
    ///
    /// Updates run one id at a time so `rows_affected` can be summed into an
    /// accurate count (ids that are deleted or already semantic contribute 0).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any update fails.
    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
        if ids.is_empty() {
            return Ok(0);
        }
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let manual_promote_raw = format!(
            "UPDATE messages \
             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
        );
        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
        let mut count = 0usize;
        for &id in ids {
            let result = zeph_db::query(&manual_promote_sql)
                .bind(id)
                .execute(&self.pool)
                .await?;
            count += usize::try_from(result.rows_affected()).unwrap_or(0);
        }
        Ok(count)
    }
1129
    /// Bumps `session_count` for every episodic, non-deleted message of a
    /// conversation (called once per session touching that conversation).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn increment_session_counts_for_conversation(
        &self,
        conversation_id: ConversationId,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE messages SET session_count = session_count + 1 \
             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
        ))
        .bind(conversation_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1151
    /// Returns a map from message id to its `tier` string for the given
    /// non-deleted ids. Missing ids are omitted.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn fetch_tiers(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
        for &id in ids {
            q = q.bind(id);
        }
        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
1177
    /// Returns the ids of all conversations that still contain
    /// unconsolidated (`consolidated = 0`), non-deleted messages.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn conversations_with_unconsolidated_messages(
        &self,
    ) -> Result<Vec<ConversationId>, MemoryError> {
        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
            "SELECT DISTINCT conversation_id FROM messages \
             WHERE consolidated = 0 AND deleted_at IS NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|(id,)| id).collect())
    }
1196
    /// Returns up to `limit` unconsolidated, non-deleted messages of a
    /// conversation as `(id, content)`, oldest first.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_unconsolidated_messages(
        &self,
        conversation_id: ConversationId,
        limit: usize,
    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
            "SELECT id, content FROM messages \
             WHERE conversation_id = ? \
             AND consolidated = 0 \
             AND deleted_at IS NULL \
             ORDER BY id ASC \
             LIMIT ?"
        ))
        .bind(conversation_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
1225
    /// Looks up the consolidated message that absorbed `source_id`, if any,
    /// via the `memory_consolidation_sources` link table.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_consolidated_for_source(
        &self,
        source_id: MessageId,
    ) -> Result<Option<MessageId>, MemoryError> {
        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
            "SELECT consolidated_id FROM memory_consolidation_sources \
             WHERE source_id = ? \
             LIMIT 1"
        ))
        .bind(source_id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|(id,)| id))
    }
1248
    /// Merges several source messages into one new consolidated message,
    /// atomically, if `confidence` meets the threshold.
    ///
    /// Returns `Ok(false)` without touching the database when confidence is
    /// too low or `source_ids` is empty; `Ok(true)` on a committed merge.
    /// The new row is fully visible, marked `consolidated = 1`, and linked to
    /// each source via `memory_consolidation_sources` (duplicate links are
    /// ignored through the dialect's INSERT IGNORE / ON CONFLICT form).
    /// Sources themselves are flagged `consolidated = 1` but not deleted.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_consolidation_merge(
        &self,
        conversation_id: ConversationId,
        role: &str,
        merged_content: &str,
        source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }
        if source_ids.is_empty() {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        let importance = crate::semantic::importance::compute_importance(merged_content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible, \
              importance_score, consolidated, consolidation_confidence) \
             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
             RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(merged_content)
        .bind(importance)
        .bind(confidence)
        .fetch_one(&mut *tx)
        .await?;
        let consolidated_id = row.0;

        // Dialect-specific duplicate-tolerant insert: either "INSERT IGNORE"
        // prefix or an "ON CONFLICT DO NOTHING" suffix is non-empty.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in source_ids {
            zeph_db::query(&consol_sql)
                .bind(consolidated_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1319
    /// Updates an existing consolidated message in place with new content and
    /// links additional sources to it, atomically, if `confidence` meets the
    /// threshold.
    ///
    /// Returns `Ok(false)` without touching the database when confidence is
    /// too low; `Ok(true)` on a committed update. Duplicate source links are
    /// ignored via the dialect's INSERT IGNORE / ON CONFLICT form.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_consolidation_update(
        &self,
        target_id: MessageId,
        new_content: &str,
        additional_source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        zeph_db::query(sql!(
            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
        ))
        .bind(new_content)
        .bind(confidence)
        .bind(target_id)
        .execute(&mut *tx)
        .await?;

        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in additional_source_ids {
            zeph_db::query(&consol_sql)
                .bind(target_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1376
    /// Overwrites the importance score of a non-deleted message.
    /// A missing or deleted id is a silent no-op.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
        ))
        .bind(score)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1396
    /// Reads the importance score of a non-deleted message, or `None` if the
    /// id is missing or deleted.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
        let row: Option<(f64,)> = zeph_db::query_as(sql!(
            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
        ))
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|(s,)| s))
    }
1413
    /// Alias for [`Self::increment_access_counts`], kept for callers using
    /// the older name.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the underlying update fails.
    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        self.increment_access_counts(ids).await
    }
1424
1425 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1433 for &id in ids {
1434 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1435 .bind(id)
1436 .execute(&self.pool)
1437 .await?;
1438 }
1439 Ok(())
1440 }
1441
    /// Runs one decay / replay / prune "forgetting" sweep inside a single
    /// transaction, over non-consolidated, non-deleted messages.
    ///
    /// Phases:
    /// 1. **Downscale** — multiply `importance_score` by `(1 - decay_rate)`
    ///    for up to `sweep_batch_size` lowest-importance candidates.
    /// 2. **Replay** — for candidates accessed within the replay window or
    ///    with enough accesses, divide by the same factor (capped at 1.0),
    ///    effectively undoing the decay.
    /// 3. **Prune** — soft-delete rows whose score fell below the floor,
    ///    unless protected by recent access or a high access count.
    ///
    /// Returns the per-phase row counts.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn run_forgetting_sweep_tx(
        &self,
        config: &zeph_common::config::memory::ForgettingConfig,
    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
        let mut tx = self.pool.begin().await?;

        let decay = f64::from(config.decay_rate);
        let floor = f64::from(config.forgetting_floor);
        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
        let replay_hours = i64::from(config.replay_window_hours);
        let replay_min_access = i64::from(config.replay_min_access_count);
        let protect_hours = i64::from(config.protect_recent_hours);
        let protect_min_access = i64::from(config.protect_min_access_count);

        // Lowest-importance rows first so decay always targets the weakest.
        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
            "SELECT id FROM messages \
             WHERE deleted_at IS NULL AND consolidated = 0 \
             ORDER BY importance_score ASC \
             LIMIT ?"
        ))
        .bind(batch)
        .fetch_all(&mut *tx)
        .await?;

        #[allow(clippy::cast_possible_truncation)]
        let downscaled = candidate_ids.len() as u32;

        if downscaled > 0 {
            let placeholders: String = candidate_ids
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(",");
            // `decay` is interpolated (not bound): it comes from config, not
            // user input.
            let downscale_sql = format!(
                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
                 WHERE id IN ({placeholders})"
            );
            let mut q = zeph_db::query(&downscale_sql);
            for &(id,) in &candidate_ids {
                q = q.bind(id);
            }
            q.execute(&mut *tx).await?;
        }

        let replayed = if downscaled > 0 {
            let replay_placeholders: String = candidate_ids
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(",");
            // Division by (1.0 - decay) exactly reverses the downscale above;
            // MIN(1.0, ...) keeps scores within range.
            let replay_sql = format!(
                "UPDATE messages \
                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
                 WHERE id IN ({replay_placeholders}) \
                 AND (\
                 (last_accessed IS NOT NULL \
                 AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
                 OR access_count >= ?\
                 )"
            );
            let mut rq = zeph_db::query(&replay_sql);
            for &(id,) in &candidate_ids {
                rq = rq.bind(id);
            }
            let replay_result = rq
                .bind(replay_hours)
                .bind(replay_min_access)
                .execute(&mut *tx)
                .await?;
            #[allow(clippy::cast_possible_truncation)]
            let n = replay_result.rows_affected() as u32;
            n
        } else {
            0
        };

        // Prune is independent of the candidate batch: it sweeps every
        // unprotected row below the floor.
        let prune_sql = format!(
            "UPDATE messages \
             SET deleted_at = CURRENT_TIMESTAMP \
             WHERE deleted_at IS NULL AND consolidated = 0 \
             AND importance_score < {floor} \
             AND (\
             last_accessed IS NULL \
             OR last_accessed < datetime('now', '-' || ? || ' hours')\
             ) \
             AND access_count < ?"
        );
        let prune_result = zeph_db::query(&prune_sql)
            .bind(protect_hours)
            .bind(protect_min_access)
            .execute(&mut *tx)
            .await?;
        #[allow(clippy::cast_possible_truncation)]
        let pruned = prune_result.rows_affected() as u32;

        tx.commit().await?;

        Ok(crate::forgetting::ForgettingResult {
            downscaled,
            replayed,
            pruned,
        })
    }
1568}
1569
/// An episodic message eligible for promotion to the semantic tier, as
/// returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    // Message row id.
    pub id: MessageId,
    // Conversation the message belongs to.
    pub conversation_id: ConversationId,
    // Message text considered for promotion.
    pub content: String,
    // Number of sessions in which this message was seen.
    pub session_count: u32,
    // Importance score computed at write time.
    pub importance_score: f64,
}
1579
1580#[cfg(test)]
1581mod tests;