1#[allow(unused_imports)]
5use zeph_common;
6use zeph_db::ActiveDialect;
7use zeph_db::fts::sanitize_fts_query;
8#[allow(unused_imports)]
9use zeph_db::{begin_write, sql};
10use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
11
12use super::SqliteStore;
13use crate::error::MemoryError;
14use crate::types::{ConversationId, MessageId};
15
16fn parse_role(s: &str) -> Role {
17 match s {
18 "assistant" => Role::Assistant,
19 "system" => Role::System,
20 _ => Role::User,
21 }
22}
23
24#[must_use]
25pub fn role_str(role: Role) -> &'static str {
26 match role {
27 Role::System => "system",
28 Role::User => "user",
29 Role::Assistant => "assistant",
30 }
31}
32
/// Translates a legacy externally-tagged part key (e.g. `"ToolUse"`) into the
/// `kind` tag value used by the current format (e.g. `"tool_use"`).
///
/// Returns `None` for any key that is not one of the known legacy names
/// (matching is exact and case-sensitive).
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    const LEGACY_KINDS: &[(&str, &str)] = &[
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];

    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|&(_, kind)| kind)
}
52
53fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
61 let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
62 let mut result = Vec::with_capacity(array.len());
63 for element in array {
64 let obj = element.as_object()?;
65 if obj.contains_key("kind") {
66 return None;
67 }
68 if obj.len() != 1 {
69 return None;
70 }
71 let (key, inner) = obj.iter().next()?;
72 let kind = legacy_key_to_kind(key)?;
73 let mut new_obj = match inner {
74 serde_json::Value::Object(m) => m.clone(),
75 other => {
77 let mut m = serde_json::Map::new();
78 m.insert("data".to_string(), other.clone());
79 m
80 }
81 };
82 new_obj.insert(
83 "kind".to_string(),
84 serde_json::Value::String(kind.to_string()),
85 );
86 let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
87 result.push(part);
88 }
89 Some(result)
90}
91
92fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
97 if parts_json == "[]" {
98 return vec![];
99 }
100 match serde_json::from_str(parts_json) {
101 Ok(p) => p,
102 Err(e) => {
103 if let Some(parts) = try_parse_legacy_parts(parts_json) {
104 let truncated = parts_json.chars().take(120).collect::<String>();
105 tracing::warn!(
106 role = %role_str,
107 parts_json = %truncated,
108 "loaded legacy-format message parts via compat path"
109 );
110 return parts;
111 }
112 let truncated = parts_json.chars().take(120).collect::<String>();
113 tracing::warn!(
114 role = %role_str,
115 parts_json = %truncated,
116 error = %e,
117 "failed to deserialize message parts, falling back to empty"
118 );
119 vec![]
120 }
121 }
122}
123
124impl SqliteStore {
125 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
131 let row: (ConversationId,) = zeph_db::query_as(sql!(
132 "INSERT INTO conversations DEFAULT VALUES RETURNING id"
133 ))
134 .fetch_one(&self.pool)
135 .await?;
136 Ok(row.0)
137 }
138
139 pub async fn save_message(
145 &self,
146 conversation_id: ConversationId,
147 role: &str,
148 content: &str,
149 ) -> Result<MessageId, MemoryError> {
150 self.save_message_with_parts(conversation_id, role, content, "[]")
151 .await
152 }
153
154 pub async fn save_message_with_parts(
160 &self,
161 conversation_id: ConversationId,
162 role: &str,
163 content: &str,
164 parts_json: &str,
165 ) -> Result<MessageId, MemoryError> {
166 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
167 .await
168 }
169
170 pub async fn save_message_with_metadata(
176 &self,
177 conversation_id: ConversationId,
178 role: &str,
179 content: &str,
180 parts_json: &str,
181 agent_visible: bool,
182 user_visible: bool,
183 ) -> Result<MessageId, MemoryError> {
184 let importance_score = crate::semantic::importance::compute_importance(content, role);
185 let row: (MessageId,) = zeph_db::query_as(
186 sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
187 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
188 )
189 .bind(conversation_id)
190 .bind(role)
191 .bind(content)
192 .bind(parts_json)
193 .bind(i64::from(agent_visible))
194 .bind(i64::from(user_visible))
195 .bind(importance_score)
196 .fetch_one(&self.pool)
197 .await?;
198 Ok(row.0)
199 }
200
201 pub async fn load_history(
207 &self,
208 conversation_id: ConversationId,
209 limit: u32,
210 ) -> Result<Vec<Message>, MemoryError> {
211 let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
212 "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
213 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
214 WHERE conversation_id = ? AND deleted_at IS NULL \
215 ORDER BY id DESC \
216 LIMIT ?\
217 ) ORDER BY id ASC"
218 ))
219 .bind(conversation_id)
220 .bind(limit)
221 .fetch_all(&self.pool)
222 .await?;
223
224 let messages = rows
225 .into_iter()
226 .map(
227 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
228 let parts = parse_parts_json(&role_str, &parts_json);
229 Message {
230 role: parse_role(&role_str),
231 content,
232 parts,
233 metadata: MessageMetadata {
234 agent_visible: agent_visible != 0,
235 user_visible: user_visible != 0,
236 compacted_at: None,
237 deferred_summary: None,
238 focus_pinned: false,
239 focus_marker_id: None,
240 db_id: Some(row_id),
241 },
242 }
243 },
244 )
245 .collect();
246 Ok(messages)
247 }
248
249 pub async fn load_history_filtered(
257 &self,
258 conversation_id: ConversationId,
259 limit: u32,
260 agent_visible: Option<bool>,
261 user_visible: Option<bool>,
262 ) -> Result<Vec<Message>, MemoryError> {
263 let av = agent_visible.map(i64::from);
264 let uv = user_visible.map(i64::from);
265
266 let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
267 sql!("WITH recent AS (\
268 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
269 WHERE conversation_id = ? \
270 AND deleted_at IS NULL \
271 AND (? IS NULL OR agent_visible = ?) \
272 AND (? IS NULL OR user_visible = ?) \
273 ORDER BY id DESC \
274 LIMIT ?\
275 ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
276 )
277 .bind(conversation_id)
278 .bind(av)
279 .bind(av)
280 .bind(uv)
281 .bind(uv)
282 .bind(limit)
283 .fetch_all(&self.pool)
284 .await?;
285
286 let messages = rows
287 .into_iter()
288 .map(
289 |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
290 let parts = parse_parts_json(&role_str, &parts_json);
291 Message {
292 role: parse_role(&role_str),
293 content,
294 parts,
295 metadata: MessageMetadata {
296 agent_visible: agent_visible != 0,
297 user_visible: user_visible != 0,
298 compacted_at: None,
299 deferred_summary: None,
300 focus_pinned: false,
301 focus_marker_id: None,
302 db_id: Some(row_id),
303 },
304 }
305 },
306 )
307 .collect();
308 Ok(messages)
309 }
310
311 pub async fn replace_conversation(
323 &self,
324 conversation_id: ConversationId,
325 compacted_range: std::ops::RangeInclusive<MessageId>,
326 summary_role: &str,
327 summary_content: &str,
328 ) -> Result<MessageId, MemoryError> {
329 let now = {
330 let secs = std::time::SystemTime::now()
331 .duration_since(std::time::UNIX_EPOCH)
332 .unwrap_or_default()
333 .as_secs();
334 format!("{secs}")
335 };
336 let start_id = compacted_range.start().0;
337 let end_id = compacted_range.end().0;
338
339 let mut tx = self.pool.begin().await?;
340
341 zeph_db::query(sql!(
342 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
343 WHERE conversation_id = ? AND id >= ? AND id <= ?"
344 ))
345 .bind(&now)
346 .bind(conversation_id)
347 .bind(start_id)
348 .bind(end_id)
349 .execute(&mut *tx)
350 .await?;
351
352 let row: (MessageId,) = zeph_db::query_as(sql!(
354 "INSERT INTO messages \
355 (conversation_id, role, content, parts, agent_visible, user_visible) \
356 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
357 ))
358 .bind(conversation_id)
359 .bind(summary_role)
360 .bind(summary_content)
361 .fetch_one(&mut *tx)
362 .await?;
363
364 tx.commit().await?;
365
366 Ok(row.0)
367 }
368
369 pub async fn apply_tool_pair_summaries(
379 &self,
380 conversation_id: ConversationId,
381 hide_ids: &[i64],
382 summaries: &[String],
383 ) -> Result<(), MemoryError> {
384 if hide_ids.is_empty() && summaries.is_empty() {
385 return Ok(());
386 }
387
388 let now = std::time::SystemTime::now()
389 .duration_since(std::time::UNIX_EPOCH)
390 .unwrap_or_default()
391 .as_secs()
392 .to_string();
393
394 let mut tx = self.pool.begin().await?;
395
396 for &id in hide_ids {
397 zeph_db::query(sql!(
398 "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
399 ))
400 .bind(&now)
401 .bind(id)
402 .execute(&mut *tx)
403 .await?;
404 }
405
406 for summary in summaries {
407 let content = format!("[tool summary] {summary}");
408 let parts = serde_json::to_string(&[MessagePart::Summary {
409 text: summary.clone(),
410 }])
411 .unwrap_or_else(|_| "[]".to_string());
412 zeph_db::query(sql!(
413 "INSERT INTO messages \
414 (conversation_id, role, content, parts, agent_visible, user_visible) \
415 VALUES (?, 'assistant', ?, ?, 1, 0)"
416 ))
417 .bind(conversation_id)
418 .bind(&content)
419 .bind(&parts)
420 .execute(&mut *tx)
421 .await?;
422 }
423
424 tx.commit().await?;
425 Ok(())
426 }
427
428 pub async fn oldest_message_ids(
434 &self,
435 conversation_id: ConversationId,
436 n: u32,
437 ) -> Result<Vec<MessageId>, MemoryError> {
438 let rows: Vec<(MessageId,)> = zeph_db::query_as(
439 sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
440 )
441 .bind(conversation_id)
442 .bind(n)
443 .fetch_all(&self.pool)
444 .await?;
445 Ok(rows.into_iter().map(|r| r.0).collect())
446 }
447
448 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
454 let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
455 "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
456 ))
457 .fetch_optional(&self.pool)
458 .await?;
459 Ok(row.map(|r| r.0))
460 }
461
462 pub async fn message_by_id(
468 &self,
469 message_id: MessageId,
470 ) -> Result<Option<Message>, MemoryError> {
471 let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
472 sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
473 )
474 .bind(message_id)
475 .fetch_optional(&self.pool)
476 .await?;
477
478 Ok(row.map(
479 |(role_str, content, parts_json, agent_visible, user_visible)| {
480 let parts = parse_parts_json(&role_str, &parts_json);
481 Message {
482 role: parse_role(&role_str),
483 content,
484 parts,
485 metadata: MessageMetadata {
486 agent_visible: agent_visible != 0,
487 user_visible: user_visible != 0,
488 compacted_at: None,
489 deferred_summary: None,
490 focus_pinned: false,
491 focus_marker_id: None,
492 db_id: None,
493 },
494 }
495 },
496 ))
497 }
498
499 pub async fn messages_by_ids(
505 &self,
506 ids: &[MessageId],
507 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
508 if ids.is_empty() {
509 return Ok(Vec::new());
510 }
511
512 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
513
514 let query = format!(
515 "SELECT id, role, content, parts FROM messages \
516 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
517 );
518 let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
519 for &id in ids {
520 q = q.bind(id);
521 }
522
523 let rows = q.fetch_all(&self.pool).await?;
524
525 Ok(rows
526 .into_iter()
527 .map(|(id, role_str, content, parts_json)| {
528 let parts = parse_parts_json(&role_str, &parts_json);
529 (
530 id,
531 Message {
532 role: parse_role(&role_str),
533 content,
534 parts,
535 metadata: MessageMetadata::default(),
536 },
537 )
538 })
539 .collect())
540 }
541
542 pub async fn unembedded_message_ids(
548 &self,
549 limit: Option<usize>,
550 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
551 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
552
553 let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
554 "SELECT m.id, m.conversation_id, m.role, m.content \
555 FROM messages m \
556 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
557 WHERE em.id IS NULL AND m.deleted_at IS NULL \
558 ORDER BY m.id ASC \
559 LIMIT ?"
560 ))
561 .bind(effective_limit)
562 .fetch_all(&self.pool)
563 .await?;
564
565 Ok(rows)
566 }
567
568 pub async fn count_messages(
574 &self,
575 conversation_id: ConversationId,
576 ) -> Result<i64, MemoryError> {
577 let row: (i64,) = zeph_db::query_as(sql!(
578 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
579 ))
580 .bind(conversation_id)
581 .fetch_one(&self.pool)
582 .await?;
583 Ok(row.0)
584 }
585
586 pub async fn count_messages_after(
592 &self,
593 conversation_id: ConversationId,
594 after_id: MessageId,
595 ) -> Result<i64, MemoryError> {
596 let row: (i64,) =
597 zeph_db::query_as(
598 sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
599 )
600 .bind(conversation_id)
601 .bind(after_id)
602 .fetch_one(&self.pool)
603 .await?;
604 Ok(row.0)
605 }
606
607 pub async fn keyword_search(
616 &self,
617 query: &str,
618 limit: usize,
619 conversation_id: Option<ConversationId>,
620 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
621 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
622 let safe_query = sanitize_fts_query(query);
623 if safe_query.is_empty() {
624 return Ok(Vec::new());
625 }
626
627 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
628 zeph_db::query_as(
629 sql!("SELECT m.id, -rank AS score \
630 FROM messages_fts f \
631 JOIN messages m ON m.id = f.rowid \
632 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
633 ORDER BY rank \
634 LIMIT ?"),
635 )
636 .bind(&safe_query)
637 .bind(cid)
638 .bind(effective_limit)
639 .fetch_all(&self.pool)
640 .await?
641 } else {
642 zeph_db::query_as(sql!(
643 "SELECT m.id, -rank AS score \
644 FROM messages_fts f \
645 JOIN messages m ON m.id = f.rowid \
646 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
647 ORDER BY rank \
648 LIMIT ?"
649 ))
650 .bind(&safe_query)
651 .bind(effective_limit)
652 .fetch_all(&self.pool)
653 .await?
654 };
655
656 Ok(rows)
657 }
658
659 pub async fn keyword_search_with_time_range(
672 &self,
673 query: &str,
674 limit: usize,
675 conversation_id: Option<ConversationId>,
676 after: Option<&str>,
677 before: Option<&str>,
678 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
679 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
680 let safe_query = sanitize_fts_query(query);
681 if safe_query.is_empty() {
682 return Ok(Vec::new());
683 }
684
685 let after_clause = if after.is_some() {
687 " AND m.created_at > ?"
688 } else {
689 ""
690 };
691 let before_clause = if before.is_some() {
692 " AND m.created_at < ?"
693 } else {
694 ""
695 };
696 let conv_clause = if conversation_id.is_some() {
697 " AND m.conversation_id = ?"
698 } else {
699 ""
700 };
701
702 let sql = format!(
703 "SELECT m.id, -rank AS score \
704 FROM messages_fts f \
705 JOIN messages m ON m.id = f.rowid \
706 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
707 {after_clause}{before_clause}{conv_clause} \
708 ORDER BY rank \
709 LIMIT ?"
710 );
711
712 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
713 if let Some(a) = after {
714 q = q.bind(a);
715 }
716 if let Some(b) = before {
717 q = q.bind(b);
718 }
719 if let Some(cid) = conversation_id {
720 q = q.bind(cid);
721 }
722 q = q.bind(effective_limit);
723
724 Ok(q.fetch_all(&self.pool).await?)
725 }
726
727 pub async fn message_timestamps(
735 &self,
736 ids: &[MessageId],
737 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
738 if ids.is_empty() {
739 return Ok(std::collections::HashMap::new());
740 }
741
742 let placeholders: String =
743 zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
744 let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
745 let query = format!(
746 "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
747 );
748 let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
749 for &id in ids {
750 q = q.bind(id);
751 }
752
753 let rows = q.fetch_all(&self.pool).await?;
754 Ok(rows.into_iter().collect())
755 }
756
757 pub async fn load_messages_range(
763 &self,
764 conversation_id: ConversationId,
765 after_message_id: MessageId,
766 limit: usize,
767 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
768 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
769
770 let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
771 "SELECT id, role, content FROM messages \
772 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
773 ORDER BY id ASC LIMIT ?"
774 ))
775 .bind(conversation_id)
776 .bind(after_message_id)
777 .bind(effective_limit)
778 .fetch_all(&self.pool)
779 .await?;
780
781 Ok(rows)
782 }
783
784 pub async fn get_eviction_candidates(
792 &self,
793 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
794 let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
795 "SELECT id, created_at, last_accessed, access_count \
796 FROM messages WHERE deleted_at IS NULL"
797 ))
798 .fetch_all(&self.pool)
799 .await?;
800
801 Ok(rows
802 .into_iter()
803 .map(
804 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
805 id,
806 created_at,
807 last_accessed,
808 access_count: access_count.try_into().unwrap_or(0),
809 },
810 )
811 .collect())
812 }
813
814 pub async fn soft_delete_messages(
822 &self,
823 ids: &[MessageId],
824 ) -> Result<(), crate::error::MemoryError> {
825 if ids.is_empty() {
826 return Ok(());
827 }
828 for &id in ids {
830 zeph_db::query(
831 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
832 )
833 .bind(id)
834 .execute(&self.pool)
835 .await?;
836 }
837 Ok(())
838 }
839
840 pub async fn get_soft_deleted_message_ids(
846 &self,
847 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
848 let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
849 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
850 ))
851 .fetch_all(&self.pool)
852 .await?;
853 Ok(rows.into_iter().map(|(id,)| id).collect())
854 }
855
856 pub async fn mark_qdrant_cleaned(
862 &self,
863 ids: &[MessageId],
864 ) -> Result<(), crate::error::MemoryError> {
865 for &id in ids {
866 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
867 .bind(id)
868 .execute(&self.pool)
869 .await?;
870 }
871 Ok(())
872 }
873
874 pub async fn fetch_importance_scores(
882 &self,
883 ids: &[MessageId],
884 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
885 if ids.is_empty() {
886 return Ok(std::collections::HashMap::new());
887 }
888 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
889 let query = format!(
890 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
891 );
892 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
893 for &id in ids {
894 q = q.bind(id);
895 }
896 let rows = q.fetch_all(&self.pool).await?;
897 Ok(rows.into_iter().collect())
898 }
899
900 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
908 if ids.is_empty() {
909 return Ok(());
910 }
911 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
912 let query = format!(
913 "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
914 WHERE id IN ({placeholders})"
915 );
916 let mut q = zeph_db::query(&query);
917 for &id in ids {
918 q = q.bind(id);
919 }
920 q.execute(&self.pool).await?;
921 Ok(())
922 }
923
924 pub async fn find_promotion_candidates(
933 &self,
934 min_sessions: u32,
935 batch_size: usize,
936 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
937 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
938 let min = i64::from(min_sessions);
939 let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
940 "SELECT id, conversation_id, content, session_count, importance_score \
941 FROM messages \
942 WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
943 ORDER BY session_count DESC, importance_score DESC \
944 LIMIT ?"
945 ))
946 .bind(min)
947 .bind(limit)
948 .fetch_all(&self.pool)
949 .await?;
950
951 Ok(rows
952 .into_iter()
953 .map(
954 |(id, conversation_id, content, session_count, importance_score)| {
955 PromotionCandidate {
956 id,
957 conversation_id,
958 content,
959 session_count: session_count.try_into().unwrap_or(0),
960 importance_score,
961 }
962 },
963 )
964 .collect())
965 }
966
967 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
975 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
976 "SELECT tier, COUNT(*) FROM messages \
977 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
978 GROUP BY tier"
979 ))
980 .fetch_all(&self.pool)
981 .await?;
982
983 let mut episodic = 0i64;
984 let mut semantic = 0i64;
985 for (tier, count) in rows {
986 match tier.as_str() {
987 "episodic" => episodic = count,
988 "semantic" => semantic = count,
989 _ => {}
990 }
991 }
992 Ok((episodic, semantic))
993 }
994
995 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1001 let row: (i64,) = zeph_db::query_as(sql!(
1002 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1003 ))
1004 .fetch_one(&self.pool)
1005 .await?;
1006 Ok(row.0)
1007 }
1008
1009 pub async fn promote_to_semantic(
1022 &self,
1023 conversation_id: ConversationId,
1024 merged_content: &str,
1025 original_ids: &[MessageId],
1026 ) -> Result<MessageId, MemoryError> {
1027 if original_ids.is_empty() {
1028 return Err(MemoryError::Other(
1029 "promote_to_semantic: original_ids must not be empty".into(),
1030 ));
1031 }
1032
1033 let mut tx = begin_write(&self.pool).await?;
1036
1037 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1039 let promote_insert_raw = format!(
1040 "INSERT INTO messages \
1041 (conversation_id, role, content, parts, agent_visible, user_visible, \
1042 tier, promotion_timestamp) \
1043 VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1044 RETURNING id"
1045 );
1046 let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1047 let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1048 .bind(conversation_id)
1049 .bind(merged_content)
1050 .fetch_one(&mut *tx)
1051 .await?;
1052
1053 let new_id = row.0;
1054
1055 for &id in original_ids {
1057 zeph_db::query(sql!(
1058 "UPDATE messages \
1059 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1060 WHERE id = ? AND deleted_at IS NULL"
1061 ))
1062 .bind(id)
1063 .execute(&mut *tx)
1064 .await?;
1065 }
1066
1067 tx.commit().await?;
1068 Ok(new_id)
1069 }
1070
1071 pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1080 if ids.is_empty() {
1081 return Ok(0);
1082 }
1083 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1084 let manual_promote_raw = format!(
1085 "UPDATE messages \
1086 SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1087 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1088 );
1089 let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1090 let mut count = 0usize;
1091 for &id in ids {
1092 let result = zeph_db::query(&manual_promote_sql)
1093 .bind(id)
1094 .execute(&self.pool)
1095 .await?;
1096 count += usize::try_from(result.rows_affected()).unwrap_or(0);
1097 }
1098 Ok(count)
1099 }
1100
1101 pub async fn increment_session_counts_for_conversation(
1110 &self,
1111 conversation_id: ConversationId,
1112 ) -> Result<(), MemoryError> {
1113 zeph_db::query(sql!(
1114 "UPDATE messages SET session_count = session_count + 1 \
1115 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1116 ))
1117 .bind(conversation_id)
1118 .execute(&self.pool)
1119 .await?;
1120 Ok(())
1121 }
1122
1123 pub async fn fetch_tiers(
1131 &self,
1132 ids: &[MessageId],
1133 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1134 if ids.is_empty() {
1135 return Ok(std::collections::HashMap::new());
1136 }
1137 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1138 let query = format!(
1139 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1140 );
1141 let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1142 for &id in ids {
1143 q = q.bind(id);
1144 }
1145 let rows = q.fetch_all(&self.pool).await?;
1146 Ok(rows.into_iter().collect())
1147 }
1148
1149 pub async fn conversations_with_unconsolidated_messages(
1157 &self,
1158 ) -> Result<Vec<ConversationId>, MemoryError> {
1159 let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1160 "SELECT DISTINCT conversation_id FROM messages \
1161 WHERE consolidated = 0 AND deleted_at IS NULL"
1162 ))
1163 .fetch_all(&self.pool)
1164 .await?;
1165 Ok(rows.into_iter().map(|(id,)| id).collect())
1166 }
1167
1168 pub async fn find_unconsolidated_messages(
1177 &self,
1178 conversation_id: ConversationId,
1179 limit: usize,
1180 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1181 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1182 let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1183 "SELECT id, content FROM messages \
1184 WHERE conversation_id = ? \
1185 AND consolidated = 0 \
1186 AND deleted_at IS NULL \
1187 ORDER BY id ASC \
1188 LIMIT ?"
1189 ))
1190 .bind(conversation_id)
1191 .bind(limit)
1192 .fetch_all(&self.pool)
1193 .await?;
1194 Ok(rows)
1195 }
1196
1197 pub async fn find_consolidated_for_source(
1206 &self,
1207 source_id: MessageId,
1208 ) -> Result<Option<MessageId>, MemoryError> {
1209 let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1210 "SELECT consolidated_id FROM memory_consolidation_sources \
1211 WHERE source_id = ? \
1212 LIMIT 1"
1213 ))
1214 .bind(source_id)
1215 .fetch_optional(&self.pool)
1216 .await?;
1217 Ok(row.map(|(id,)| id))
1218 }
1219
1220 pub async fn apply_consolidation_merge(
1234 &self,
1235 conversation_id: ConversationId,
1236 role: &str,
1237 merged_content: &str,
1238 source_ids: &[MessageId],
1239 confidence: f32,
1240 confidence_threshold: f32,
1241 ) -> Result<bool, MemoryError> {
1242 if confidence < confidence_threshold {
1243 return Ok(false);
1244 }
1245 if source_ids.is_empty() {
1246 return Ok(false);
1247 }
1248
1249 let mut tx = self.pool.begin().await?;
1250
1251 let importance = crate::semantic::importance::compute_importance(merged_content, role);
1252 let row: (MessageId,) = zeph_db::query_as(sql!(
1253 "INSERT INTO messages \
1254 (conversation_id, role, content, parts, agent_visible, user_visible, \
1255 importance_score, consolidated, consolidation_confidence) \
1256 VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1257 RETURNING id"
1258 ))
1259 .bind(conversation_id)
1260 .bind(role)
1261 .bind(merged_content)
1262 .bind(importance)
1263 .bind(confidence)
1264 .fetch_one(&mut *tx)
1265 .await?;
1266 let consolidated_id = row.0;
1267
1268 let consol_sql = format!(
1269 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1270 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1271 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1272 );
1273 for &source_id in source_ids {
1274 zeph_db::query(&consol_sql)
1275 .bind(consolidated_id)
1276 .bind(source_id)
1277 .execute(&mut *tx)
1278 .await?;
1279
1280 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1282 .bind(source_id)
1283 .execute(&mut *tx)
1284 .await?;
1285 }
1286
1287 tx.commit().await?;
1288 Ok(true)
1289 }
1290
1291 pub async fn apply_consolidation_update(
1304 &self,
1305 target_id: MessageId,
1306 new_content: &str,
1307 additional_source_ids: &[MessageId],
1308 confidence: f32,
1309 confidence_threshold: f32,
1310 ) -> Result<bool, MemoryError> {
1311 if confidence < confidence_threshold {
1312 return Ok(false);
1313 }
1314
1315 let mut tx = self.pool.begin().await?;
1316
1317 zeph_db::query(sql!(
1318 "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1319 ))
1320 .bind(new_content)
1321 .bind(confidence)
1322 .bind(target_id)
1323 .execute(&mut *tx)
1324 .await?;
1325
1326 let consol_sql = format!(
1327 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1328 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1329 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1330 );
1331 for &source_id in additional_source_ids {
1332 zeph_db::query(&consol_sql)
1333 .bind(target_id)
1334 .bind(source_id)
1335 .execute(&mut *tx)
1336 .await?;
1337
1338 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1339 .bind(source_id)
1340 .execute(&mut *tx)
1341 .await?;
1342 }
1343
1344 tx.commit().await?;
1345 Ok(true)
1346 }
1347
1348 pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1358 zeph_db::query(sql!(
1359 "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1360 ))
1361 .bind(score)
1362 .bind(id)
1363 .execute(&self.pool)
1364 .await?;
1365 Ok(())
1366 }
1367
1368 pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1376 let row: Option<(f64,)> = zeph_db::query_as(sql!(
1377 "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1378 ))
1379 .bind(id)
1380 .fetch_optional(&self.pool)
1381 .await?;
1382 Ok(row.map(|(s,)| s))
1383 }
1384
1385 pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1393 self.increment_access_counts(ids).await
1394 }
1395
1396 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1404 for &id in ids {
1405 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1406 .bind(id)
1407 .execute(&self.pool)
1408 .await?;
1409 }
1410 Ok(())
1411 }
1412
1413 pub async fn run_forgetting_sweep_tx(
1429 &self,
1430 config: &zeph_common::config::memory::ForgettingConfig,
1431 ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1432 let mut tx = self.pool.begin().await?;
1433
1434 let decay = f64::from(config.decay_rate);
1435 let floor = f64::from(config.forgetting_floor);
1436 let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1437 let replay_hours = i64::from(config.replay_window_hours);
1438 let replay_min_access = i64::from(config.replay_min_access_count);
1439 let protect_hours = i64::from(config.protect_recent_hours);
1440 let protect_min_access = i64::from(config.protect_min_access_count);
1441
1442 let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1445 "SELECT id FROM messages \
1446 WHERE deleted_at IS NULL AND consolidated = 0 \
1447 ORDER BY importance_score ASC \
1448 LIMIT ?"
1449 ))
1450 .bind(batch)
1451 .fetch_all(&mut *tx)
1452 .await?;
1453
1454 #[allow(clippy::cast_possible_truncation)]
1455 let downscaled = candidate_ids.len() as u32;
1456
1457 if downscaled > 0 {
1458 let placeholders: String = candidate_ids
1459 .iter()
1460 .map(|_| "?")
1461 .collect::<Vec<_>>()
1462 .join(",");
1463 let downscale_sql = format!(
1464 "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1465 WHERE id IN ({placeholders})"
1466 );
1467 let mut q = zeph_db::query(&downscale_sql);
1468 for &(id,) in &candidate_ids {
1469 q = q.bind(id);
1470 }
1471 q.execute(&mut *tx).await?;
1472 }
1473
1474 let replayed = if downscaled > 0 {
1480 let replay_placeholders: String = candidate_ids
1481 .iter()
1482 .map(|_| "?")
1483 .collect::<Vec<_>>()
1484 .join(",");
1485 let replay_sql = format!(
1486 "UPDATE messages \
1487 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1488 WHERE id IN ({replay_placeholders}) \
1489 AND (\
1490 (last_accessed IS NOT NULL \
1491 AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1492 OR access_count >= ?\
1493 )"
1494 );
1495 let mut rq = zeph_db::query(&replay_sql);
1496 for &(id,) in &candidate_ids {
1497 rq = rq.bind(id);
1498 }
1499 let replay_result = rq
1500 .bind(replay_hours)
1501 .bind(replay_min_access)
1502 .execute(&mut *tx)
1503 .await?;
1504 #[allow(clippy::cast_possible_truncation)]
1505 let n = replay_result.rows_affected() as u32;
1506 n
1507 } else {
1508 0
1509 };
1510
1511 let prune_sql = format!(
1513 "UPDATE messages \
1514 SET deleted_at = CURRENT_TIMESTAMP \
1515 WHERE deleted_at IS NULL AND consolidated = 0 \
1516 AND importance_score < {floor} \
1517 AND (\
1518 last_accessed IS NULL \
1519 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1520 ) \
1521 AND access_count < ?"
1522 );
1523 let prune_result = zeph_db::query(&prune_sql)
1524 .bind(protect_hours)
1525 .bind(protect_min_access)
1526 .execute(&mut *tx)
1527 .await?;
1528 #[allow(clippy::cast_possible_truncation)]
1529 let pruned = prune_result.rows_affected() as u32;
1530
1531 tx.commit().await?;
1532
1533 Ok(crate::forgetting::ForgettingResult {
1534 downscaled,
1535 replayed,
1536 pruned,
1537 })
1538 }
1539}
1540
/// An episodic-tier message whose `session_count` met the threshold passed to
/// `SqliteStore::find_promotion_candidates`, which is what produces it.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Row id of the message.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Stored message content.
    pub content: String,
    /// Value of the `session_count` column (`0` if the stored value does not
    /// fit in `u32`).
    pub session_count: u32,
    /// Value of the `importance_score` column.
    pub importance_score: f64,
}
1550
1551#[cfg(test)]
1552mod tests;