1use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, placeholder_list, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, MessageVisibility, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18 match s {
19 "assistant" => Role::Assistant,
20 "system" => Role::System,
21 _ => Role::User,
22 }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27 match role {
28 Role::System => "system",
29 Role::User => "user",
30 Role::Assistant => "assistant",
31 }
32}
33
/// Translate a legacy externally-tagged part key (e.g. `"ToolOutput"`) into
/// the snake_case `kind` tag used by the current `MessagePart` encoding.
///
/// Returns `None` for keys that never existed in the legacy format.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|(_, kind)| *kind)
}
53
/// Best-effort parse of the legacy externally-tagged `parts` encoding,
/// e.g. `[{"Text": {...}}]`, converting each element to the current
/// internally-tagged form (`{"kind": "text", ...}`) before deserializing.
///
/// Returns `None` on the first element that is not legacy-shaped (not an
/// object, already has a `kind` key, more than one key, unknown key) or that
/// fails to deserialize — the caller then treats the JSON as unparseable.
fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
    let mut result = Vec::with_capacity(array.len());
    for element in array {
        let obj = element.as_object()?;
        // An element that already carries "kind" is the *new* format; this
        // is not legacy data, so bail out entirely.
        if obj.contains_key("kind") {
            return None;
        }
        // Legacy elements are single-key maps: {VariantName: payload}.
        if obj.len() != 1 {
            return None;
        }
        let (key, inner) = obj.iter().next()?;
        let kind = legacy_key_to_kind(key)?;
        let mut new_obj = match inner {
            serde_json::Value::Object(m) => m.clone(),
            // Non-object payloads (string/number/...) are wrapped under a
            // "data" field so the tagged form stays an object.
            other => {
                let mut m = serde_json::Map::new();
                m.insert("data".to_string(), other.clone());
                m
            }
        };
        new_obj.insert(
            "kind".to_string(),
            serde_json::Value::String(kind.to_string()),
        );
        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
        result.push(part);
    }
    Some(result)
}
92
93fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98 if parts_json == "[]" {
99 return vec![];
100 }
101 match serde_json::from_str(parts_json) {
102 Ok(p) => p,
103 Err(e) => {
104 if let Some(parts) = try_parse_legacy_parts(parts_json) {
105 let truncated = parts_json.chars().take(120).collect::<String>();
106 tracing::warn!(
107 role = %role_str,
108 parts_json = %truncated,
109 "loaded legacy-format message parts via compat path"
110 );
111 return parts;
112 }
113 let truncated = parts_json.chars().take(120).collect::<String>();
114 tracing::warn!(
115 role = %role_str,
116 parts_json = %truncated,
117 error = %e,
118 "failed to deserialize message parts, falling back to empty"
119 );
120 vec![]
121 }
122 }
123}
124
125impl SqliteStore {
126 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132 let row: (ConversationId,) = zeph_db::query_as(sql!(
133 "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134 ))
135 .fetch_one(&self.pool)
136 .await?;
137 Ok(row.0)
138 }
139
    /// Persist a plain-text message with no structured parts (`parts = "[]"`)
    /// and default `both` visibility; returns the new row id.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
154
    /// Persist a message with caller-supplied parts JSON and default `both`
    /// visibility; returns the new row id.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(
            conversation_id,
            role,
            content,
            parts_json,
            MessageVisibility::Both,
        )
        .await
    }
176
177 pub async fn save_message_with_category(
185 &self,
186 conversation_id: ConversationId,
187 role: &str,
188 content: &str,
189 category: Option<&str>,
190 ) -> Result<MessageId, MemoryError> {
191 let importance_score = crate::semantic::importance::compute_importance(content, role);
192 let row: (MessageId,) = zeph_db::query_as(sql!(
193 "INSERT INTO messages \
194 (conversation_id, role, content, parts, visibility, \
195 importance_score, category) \
196 VALUES (?, ?, ?, '[]', 'both', ?, ?) RETURNING id"
197 ))
198 .bind(conversation_id)
199 .bind(role)
200 .bind(content)
201 .bind(importance_score)
202 .bind(category)
203 .fetch_one(&self.pool)
204 .await?;
205 Ok(row.0)
206 }
207
208 pub async fn save_message_with_metadata(
214 &self,
215 conversation_id: ConversationId,
216 role: &str,
217 content: &str,
218 parts_json: &str,
219 visibility: MessageVisibility,
220 ) -> Result<MessageId, MemoryError> {
221 const MAX_BYTES: usize = 100 * 1024;
222
223 let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
226 let boundary = content.floor_char_boundary(MAX_BYTES);
227 tracing::debug!(
228 original_bytes = content.len(),
229 "save_message: content exceeds 100KB, truncating"
230 );
231 std::borrow::Cow::Owned(format!(
232 "{}... [truncated, {} bytes total]",
233 &content[..boundary],
234 content.len()
235 ))
236 } else {
237 std::borrow::Cow::Borrowed(content)
238 };
239
240 let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
241 let row: (MessageId,) = zeph_db::query_as(
242 sql!("INSERT INTO messages (conversation_id, role, content, parts, visibility, importance_score) \
243 VALUES (?, ?, ?, ?, ?, ?) RETURNING id"),
244 )
245 .bind(conversation_id)
246 .bind(role)
247 .bind(content_cow.as_ref())
248 .bind(parts_json)
249 .bind(visibility.as_db_str())
250 .bind(importance_score)
251 .fetch_one(&self.pool)
252 .await?;
253 Ok(row.0)
254 }
255
256 pub async fn load_history(
262 &self,
263 conversation_id: ConversationId,
264 limit: u32,
265 ) -> Result<Vec<Message>, MemoryError> {
266 let rows: Vec<(String, String, String, String, i64)> = zeph_db::query_as(sql!(
267 "SELECT role, content, parts, visibility, id FROM (\
268 SELECT role, content, parts, visibility, id FROM messages \
269 WHERE conversation_id = ? AND deleted_at IS NULL \
270 ORDER BY id DESC \
271 LIMIT ?\
272 ) ORDER BY id ASC"
273 ))
274 .bind(conversation_id)
275 .bind(limit)
276 .fetch_all(&self.pool)
277 .await?;
278
279 let messages = rows
280 .into_iter()
281 .map(|(role_str, content, parts_json, visibility_str, row_id)| {
282 let parts = parse_parts_json(&role_str, &parts_json);
283 Message {
284 role: parse_role(&role_str),
285 content,
286 parts,
287 metadata: MessageMetadata {
288 visibility: MessageVisibility::from_db_str(&visibility_str),
289 compacted_at: None,
290 deferred_summary: None,
291 focus_pinned: false,
292 focus_marker_id: None,
293 db_id: Some(row_id),
294 },
295 }
296 })
297 .collect();
298 Ok(messages)
299 }
300
301 pub async fn load_history_filtered(
309 &self,
310 conversation_id: ConversationId,
311 limit: u32,
312 agent_visible: Option<bool>,
313 user_visible: Option<bool>,
314 ) -> Result<Vec<Message>, MemoryError> {
315 let exclude_user_only = agent_visible == Some(true);
321 let exclude_agent_only = user_visible == Some(true);
322
323 let rows: Vec<(String, String, String, String, i64)> = zeph_db::query_as(sql!(
324 "WITH recent AS (\
325 SELECT role, content, parts, visibility, id FROM messages \
326 WHERE conversation_id = ? \
327 AND deleted_at IS NULL \
328 AND (NOT ? OR visibility != 'user_only') \
329 AND (NOT ? OR visibility != 'agent_only') \
330 ORDER BY id DESC \
331 LIMIT ?\
332 ) SELECT role, content, parts, visibility, id FROM recent ORDER BY id ASC"
333 ))
334 .bind(conversation_id)
335 .bind(exclude_user_only)
336 .bind(exclude_agent_only)
337 .bind(limit)
338 .fetch_all(&self.pool)
339 .await?;
340
341 let messages = rows
342 .into_iter()
343 .map(|(role_str, content, parts_json, visibility_str, row_id)| {
344 let parts = parse_parts_json(&role_str, &parts_json);
345 Message {
346 role: parse_role(&role_str),
347 content,
348 parts,
349 metadata: MessageMetadata {
350 visibility: MessageVisibility::from_db_str(&visibility_str),
351 compacted_at: None,
352 deferred_summary: None,
353 focus_pinned: false,
354 focus_marker_id: None,
355 db_id: Some(row_id),
356 },
357 }
358 })
359 .collect();
360 Ok(messages)
361 }
362
363 pub async fn replace_conversation(
375 &self,
376 conversation_id: ConversationId,
377 compacted_range: std::ops::RangeInclusive<MessageId>,
378 summary_role: &str,
379 summary_content: &str,
380 ) -> Result<MessageId, MemoryError> {
381 let now = {
382 let secs = std::time::SystemTime::now()
383 .duration_since(std::time::UNIX_EPOCH)
384 .unwrap_or_default()
385 .as_secs();
386 format!("{secs}")
387 };
388 let start_id = compacted_range.start().0;
389 let end_id = compacted_range.end().0;
390
391 let mut tx = self.pool.begin().await?;
392
393 zeph_db::query(sql!(
394 "UPDATE messages SET visibility = 'user_only', compacted_at = ? \
395 WHERE conversation_id = ? AND id >= ? AND id <= ?"
396 ))
397 .bind(&now)
398 .bind(conversation_id)
399 .bind(start_id)
400 .bind(end_id)
401 .execute(&mut *tx)
402 .await?;
403
404 let row: (MessageId,) = zeph_db::query_as(sql!(
406 "INSERT INTO messages \
407 (conversation_id, role, content, parts, visibility) \
408 VALUES (?, ?, ?, '[]', 'agent_only') RETURNING id"
409 ))
410 .bind(conversation_id)
411 .bind(summary_role)
412 .bind(summary_content)
413 .fetch_one(&mut *tx)
414 .await?;
415
416 tx.commit().await?;
417
418 Ok(row.0)
419 }
420
421 pub async fn apply_tool_pair_summaries(
431 &self,
432 conversation_id: ConversationId,
433 hide_ids: &[i64],
434 summaries: &[String],
435 ) -> Result<(), MemoryError> {
436 if hide_ids.is_empty() && summaries.is_empty() {
437 return Ok(());
438 }
439
440 let now = std::time::SystemTime::now()
441 .duration_since(std::time::UNIX_EPOCH)
442 .unwrap_or_default()
443 .as_secs()
444 .to_string();
445
446 let mut tx = self.pool.begin().await?;
447
448 for &id in hide_ids {
449 zeph_db::query(sql!(
450 "UPDATE messages SET visibility = 'user_only', compacted_at = ? WHERE id = ?"
451 ))
452 .bind(&now)
453 .bind(id)
454 .execute(&mut *tx)
455 .await?;
456 }
457
458 for summary in summaries {
459 let content = format!("[tool summary] {summary}");
460 let parts = serde_json::to_string(&[MessagePart::Summary {
461 text: summary.clone(),
462 }])
463 .unwrap_or_else(|_| "[]".to_string());
464 zeph_db::query(sql!(
465 "INSERT INTO messages \
466 (conversation_id, role, content, parts, visibility) \
467 VALUES (?, 'assistant', ?, ?, 'agent_only')"
468 ))
469 .bind(conversation_id)
470 .bind(&content)
471 .bind(&parts)
472 .execute(&mut *tx)
473 .await?;
474 }
475
476 tx.commit().await?;
477 Ok(())
478 }
479
480 pub async fn oldest_message_ids(
486 &self,
487 conversation_id: ConversationId,
488 n: u32,
489 ) -> Result<Vec<MessageId>, MemoryError> {
490 let rows: Vec<(MessageId,)> = zeph_db::query_as(
491 sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
492 )
493 .bind(conversation_id)
494 .bind(n)
495 .fetch_all(&self.pool)
496 .await?;
497 Ok(rows.into_iter().map(|r| r.0).collect())
498 }
499
500 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
506 let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
507 "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
508 ))
509 .fetch_optional(&self.pool)
510 .await?;
511 Ok(row.map(|r| r.0))
512 }
513
514 pub async fn message_by_id(
520 &self,
521 message_id: MessageId,
522 ) -> Result<Option<Message>, MemoryError> {
523 let row: Option<(String, String, String, String)> = zeph_db::query_as(
524 sql!("SELECT role, content, parts, visibility FROM messages WHERE id = ? AND deleted_at IS NULL"),
525 )
526 .bind(message_id)
527 .fetch_optional(&self.pool)
528 .await?;
529
530 Ok(row.map(|(role_str, content, parts_json, visibility_str)| {
531 let parts = parse_parts_json(&role_str, &parts_json);
532 Message {
533 role: parse_role(&role_str),
534 content,
535 parts,
536 metadata: MessageMetadata {
537 visibility: MessageVisibility::from_db_str(&visibility_str),
538 compacted_at: None,
539 deferred_summary: None,
540 focus_pinned: false,
541 focus_marker_id: None,
542 db_id: None,
543 },
544 }
545 }))
546 }
547
548 pub async fn messages_by_ids(
554 &self,
555 ids: &[MessageId],
556 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
557 if ids.is_empty() {
558 return Ok(Vec::new());
559 }
560
561 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
562
563 let query = format!(
564 "SELECT id, role, content, parts FROM messages \
565 WHERE id IN ({placeholders}) AND visibility != 'user_only' AND deleted_at IS NULL"
566 );
567 let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
568 for &id in ids {
569 q = q.bind(id);
570 }
571
572 let rows = q.fetch_all(&self.pool).await?;
573
574 Ok(rows
575 .into_iter()
576 .map(|(id, role_str, content, parts_json)| {
577 let parts = parse_parts_json(&role_str, &parts_json);
578 (
579 id,
580 Message {
581 role: parse_role(&role_str),
582 content,
583 parts,
584 metadata: MessageMetadata::default(),
585 },
586 )
587 })
588 .collect())
589 }
590
591 pub async fn unembedded_message_ids(
597 &self,
598 limit: Option<usize>,
599 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
600 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
601
602 let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
603 "SELECT m.id, m.conversation_id, m.role, m.content \
604 FROM messages m \
605 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
606 WHERE em.id IS NULL AND m.deleted_at IS NULL \
607 ORDER BY m.id ASC \
608 LIMIT ?"
609 ))
610 .bind(effective_limit)
611 .fetch_all(&self.pool)
612 .await?;
613
614 Ok(rows)
615 }
616
617 pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
623 let row: (i64,) = zeph_db::query_as(sql!(
624 "SELECT COUNT(*) FROM messages m \
625 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
626 WHERE em.id IS NULL AND m.deleted_at IS NULL"
627 ))
628 .fetch_one(&self.pool)
629 .await?;
630 Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
631 }
632
    /// Stream up to `limit` live, unembedded messages oldest-first as
    /// `(id, conversation_id, role, content)` tuples.
    ///
    /// Unlike `unembedded_message_ids`, rows are yielded lazily as the query
    /// cursor advances; nothing executes until the stream is polled, and the
    /// stream borrows `self` for its lifetime (`+ '_`).
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch(&self.pool)
        // Convert the driver's error type at the stream boundary.
        .map_err(MemoryError::from)
    }
660
661 pub async fn count_messages(
667 &self,
668 conversation_id: ConversationId,
669 ) -> Result<i64, MemoryError> {
670 let row: (i64,) = zeph_db::query_as(sql!(
671 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
672 ))
673 .bind(conversation_id)
674 .fetch_one(&self.pool)
675 .await?;
676 Ok(row.0)
677 }
678
679 pub async fn count_messages_after(
685 &self,
686 conversation_id: ConversationId,
687 after_id: MessageId,
688 ) -> Result<i64, MemoryError> {
689 let row: (i64,) =
690 zeph_db::query_as(
691 sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
692 )
693 .bind(conversation_id)
694 .bind(after_id)
695 .fetch_one(&self.pool)
696 .await?;
697 Ok(row.0)
698 }
699
700 pub async fn keyword_search(
709 &self,
710 query: &str,
711 limit: usize,
712 conversation_id: Option<ConversationId>,
713 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
714 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
715 let safe_query = sanitize_fts_query(query);
716 if safe_query.is_empty() {
717 return Ok(Vec::new());
718 }
719
720 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
721 zeph_db::query_as(
722 sql!("SELECT m.id, -rank AS score \
723 FROM messages_fts f \
724 JOIN messages m ON m.id = f.rowid \
725 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
726 ORDER BY rank \
727 LIMIT ?"),
728 )
729 .bind(&safe_query)
730 .bind(cid)
731 .bind(effective_limit)
732 .fetch_all(&self.pool)
733 .await?
734 } else {
735 zeph_db::query_as(sql!(
736 "SELECT m.id, -rank AS score \
737 FROM messages_fts f \
738 JOIN messages m ON m.id = f.rowid \
739 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
740 ORDER BY rank \
741 LIMIT ?"
742 ))
743 .bind(&safe_query)
744 .bind(effective_limit)
745 .fetch_all(&self.pool)
746 .await?
747 };
748
749 Ok(rows)
750 }
751
752 pub async fn keyword_search_with_time_range(
765 &self,
766 query: &str,
767 limit: usize,
768 conversation_id: Option<ConversationId>,
769 after: Option<&str>,
770 before: Option<&str>,
771 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
772 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
773 let safe_query = sanitize_fts_query(query);
774 if safe_query.is_empty() {
775 return Ok(Vec::new());
776 }
777
778 let after_clause = if after.is_some() {
780 " AND m.created_at > ?"
781 } else {
782 ""
783 };
784 let before_clause = if before.is_some() {
785 " AND m.created_at < ?"
786 } else {
787 ""
788 };
789 let conv_clause = if conversation_id.is_some() {
790 " AND m.conversation_id = ?"
791 } else {
792 ""
793 };
794
795 let sql = format!(
796 "SELECT m.id, -rank AS score \
797 FROM messages_fts f \
798 JOIN messages m ON m.id = f.rowid \
799 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL\
800 {after_clause}{before_clause}{conv_clause} \
801 ORDER BY rank \
802 LIMIT ?"
803 );
804
805 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
806 if let Some(a) = after {
807 q = q.bind(a);
808 }
809 if let Some(b) = before {
810 q = q.bind(b);
811 }
812 if let Some(cid) = conversation_id {
813 q = q.bind(cid);
814 }
815 q = q.bind(effective_limit);
816
817 Ok(q.fetch_all(&self.pool).await?)
818 }
819
820 pub async fn message_timestamps(
828 &self,
829 ids: &[MessageId],
830 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
831 if ids.is_empty() {
832 return Ok(std::collections::HashMap::new());
833 }
834
835 let placeholders: String =
836 zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
837 let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
838 let query = format!(
839 "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
840 );
841 let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
842 for &id in ids {
843 q = q.bind(id);
844 }
845
846 let rows = q.fetch_all(&self.pool).await?;
847 Ok(rows.into_iter().collect())
848 }
849
850 pub async fn load_messages_range(
856 &self,
857 conversation_id: ConversationId,
858 after_message_id: MessageId,
859 limit: usize,
860 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
861 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
862
863 let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
864 "SELECT id, role, content FROM messages \
865 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
866 ORDER BY id ASC LIMIT ?"
867 ))
868 .bind(conversation_id)
869 .bind(after_message_id)
870 .bind(effective_limit)
871 .fetch_all(&self.pool)
872 .await?;
873
874 Ok(rows)
875 }
876
877 pub async fn get_eviction_candidates(
885 &self,
886 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
887 let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
888 "SELECT id, created_at, last_accessed, access_count \
889 FROM messages WHERE deleted_at IS NULL"
890 ))
891 .fetch_all(&self.pool)
892 .await?;
893
894 Ok(rows
895 .into_iter()
896 .map(
897 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
898 id,
899 created_at,
900 last_accessed,
901 access_count: access_count.try_into().unwrap_or(0),
902 },
903 )
904 .collect())
905 }
906
907 pub async fn soft_delete_messages(
915 &self,
916 ids: &[MessageId],
917 ) -> Result<(), crate::error::MemoryError> {
918 if ids.is_empty() {
919 return Ok(());
920 }
921 for &id in ids {
923 zeph_db::query(
924 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
925 )
926 .bind(id)
927 .execute(&self.pool)
928 .await?;
929 }
930 Ok(())
931 }
932
933 pub async fn get_soft_deleted_message_ids(
939 &self,
940 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
941 let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
942 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
943 ))
944 .fetch_all(&self.pool)
945 .await?;
946 Ok(rows.into_iter().map(|(id,)| id).collect())
947 }
948
949 pub async fn filter_out_preserved_episode_ids(
964 &self,
965 candidate_ids: &[MessageId],
966 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
967 const MAX_BATCH: usize = 490;
968 if candidate_ids.is_empty() {
969 return Ok(Vec::new());
970 }
971 let mut safe_to_delete: Vec<MessageId> = Vec::with_capacity(candidate_ids.len());
972 for chunk in candidate_ids.chunks(MAX_BATCH) {
973 let placeholders = placeholder_list(1, chunk.len());
974 let sql = format!(
975 "SELECT m.id \
976 FROM messages m \
977 WHERE m.id IN ({placeholders}) \
978 AND NOT EXISTS ( \
979 SELECT 1 \
980 FROM summaries s \
981 WHERE s.first_message_id IS NOT NULL \
982 AND s.last_message_id IS NOT NULL \
983 AND m.id >= s.first_message_id \
984 AND m.id <= s.last_message_id \
985 )"
986 );
987 let mut q = zeph_db::query_as::<_, (MessageId,)>(&sql);
988 for &id in chunk {
989 q = q.bind(id);
990 }
991 let rows: Vec<(MessageId,)> = q.fetch_all(&self.pool).await?;
992 safe_to_delete.extend(rows.into_iter().map(|(id,)| id));
993 }
994 Ok(safe_to_delete)
995 }
996
997 pub async fn mark_qdrant_cleaned(
1003 &self,
1004 ids: &[MessageId],
1005 ) -> Result<(), crate::error::MemoryError> {
1006 for &id in ids {
1007 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
1008 .bind(id)
1009 .execute(&self.pool)
1010 .await?;
1011 }
1012 Ok(())
1013 }
1014
1015 pub async fn fetch_importance_scores(
1023 &self,
1024 ids: &[MessageId],
1025 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
1026 if ids.is_empty() {
1027 return Ok(std::collections::HashMap::new());
1028 }
1029 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1030 let query = format!(
1031 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1032 );
1033 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
1034 for &id in ids {
1035 q = q.bind(id);
1036 }
1037 let rows = q.fetch_all(&self.pool).await?;
1038 Ok(rows.into_iter().collect())
1039 }
1040
1041 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1049 if ids.is_empty() {
1050 return Ok(());
1051 }
1052 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1053 let query = format!(
1054 "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
1055 WHERE id IN ({placeholders})"
1056 );
1057 let mut q = zeph_db::query(&query);
1058 for &id in ids {
1059 q = q.bind(id);
1060 }
1061 q.execute(&self.pool).await?;
1062 Ok(())
1063 }
1064
1065 pub async fn find_promotion_candidates(
1074 &self,
1075 min_sessions: u32,
1076 batch_size: usize,
1077 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
1078 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
1079 let min = i64::from(min_sessions);
1080 let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1081 "SELECT id, conversation_id, content, session_count, importance_score \
1082 FROM messages \
1083 WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1084 ORDER BY session_count DESC, importance_score DESC \
1085 LIMIT ?"
1086 ))
1087 .bind(min)
1088 .bind(limit)
1089 .fetch_all(&self.pool)
1090 .await?;
1091
1092 Ok(rows
1093 .into_iter()
1094 .map(
1095 |(id, conversation_id, content, session_count, importance_score)| {
1096 PromotionCandidate {
1097 id,
1098 conversation_id,
1099 content,
1100 session_count: session_count.try_into().unwrap_or(0),
1101 importance_score,
1102 }
1103 },
1104 )
1105 .collect())
1106 }
1107
1108 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1116 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1117 "SELECT tier, COUNT(*) FROM messages \
1118 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1119 GROUP BY tier"
1120 ))
1121 .fetch_all(&self.pool)
1122 .await?;
1123
1124 let mut episodic = 0i64;
1125 let mut semantic = 0i64;
1126 for (tier, count) in rows {
1127 match tier.as_str() {
1128 "episodic" => episodic = count,
1129 "semantic" => semantic = count,
1130 _ => {}
1131 }
1132 }
1133 Ok((episodic, semantic))
1134 }
1135
1136 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1142 let row: (i64,) = zeph_db::query_as(sql!(
1143 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1144 ))
1145 .fetch_one(&self.pool)
1146 .await?;
1147 Ok(row.0)
1148 }
1149
1150 pub async fn promote_to_semantic(
1163 &self,
1164 conversation_id: ConversationId,
1165 merged_content: &str,
1166 original_ids: &[MessageId],
1167 ) -> Result<MessageId, MemoryError> {
1168 if original_ids.is_empty() {
1169 return Err(MemoryError::Other(
1170 "promote_to_semantic: original_ids must not be empty".into(),
1171 ));
1172 }
1173
1174 let mut tx = begin_write(&self.pool).await?;
1177
1178 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1180 let promote_insert_raw = format!(
1181 "INSERT INTO messages \
1182 (conversation_id, role, content, parts, visibility, \
1183 tier, promotion_timestamp) \
1184 VALUES (?, 'assistant', ?, '[]', 'agent_only', 'semantic', {epoch_now}) \
1185 RETURNING id"
1186 );
1187 let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1188 let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1189 .bind(conversation_id)
1190 .bind(merged_content)
1191 .fetch_one(&mut *tx)
1192 .await?;
1193
1194 let new_id = row.0;
1195
1196 for &id in original_ids {
1198 zeph_db::query(sql!(
1199 "UPDATE messages \
1200 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1201 WHERE id = ? AND deleted_at IS NULL"
1202 ))
1203 .bind(id)
1204 .execute(&mut *tx)
1205 .await?;
1206 }
1207
1208 tx.commit().await?;
1209 Ok(new_id)
1210 }
1211
1212 pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1221 if ids.is_empty() {
1222 return Ok(0);
1223 }
1224 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1225 let manual_promote_raw = format!(
1226 "UPDATE messages \
1227 SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1228 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1229 );
1230 let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1231 let mut count = 0usize;
1232 for &id in ids {
1233 let result = zeph_db::query(&manual_promote_sql)
1234 .bind(id)
1235 .execute(&self.pool)
1236 .await?;
1237 count += usize::try_from(result.rows_affected()).unwrap_or(0);
1238 }
1239 Ok(count)
1240 }
1241
1242 pub async fn increment_session_counts_for_conversation(
1251 &self,
1252 conversation_id: ConversationId,
1253 ) -> Result<(), MemoryError> {
1254 zeph_db::query(sql!(
1255 "UPDATE messages SET session_count = session_count + 1 \
1256 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1257 ))
1258 .bind(conversation_id)
1259 .execute(&self.pool)
1260 .await?;
1261 Ok(())
1262 }
1263
1264 pub async fn fetch_tiers(
1272 &self,
1273 ids: &[MessageId],
1274 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1275 if ids.is_empty() {
1276 return Ok(std::collections::HashMap::new());
1277 }
1278 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1279 let query = format!(
1280 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1281 );
1282 let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1283 for &id in ids {
1284 q = q.bind(id);
1285 }
1286 let rows = q.fetch_all(&self.pool).await?;
1287 Ok(rows.into_iter().collect())
1288 }
1289
1290 pub async fn conversations_with_unconsolidated_messages(
1298 &self,
1299 ) -> Result<Vec<ConversationId>, MemoryError> {
1300 let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1301 "SELECT DISTINCT conversation_id FROM messages \
1302 WHERE consolidated = 0 AND deleted_at IS NULL"
1303 ))
1304 .fetch_all(&self.pool)
1305 .await?;
1306 Ok(rows.into_iter().map(|(id,)| id).collect())
1307 }
1308
1309 pub async fn find_unconsolidated_messages(
1318 &self,
1319 conversation_id: ConversationId,
1320 limit: usize,
1321 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1322 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1323 let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1324 "SELECT id, content FROM messages \
1325 WHERE conversation_id = ? \
1326 AND consolidated = 0 \
1327 AND deleted_at IS NULL \
1328 ORDER BY id ASC \
1329 LIMIT ?"
1330 ))
1331 .bind(conversation_id)
1332 .bind(limit)
1333 .fetch_all(&self.pool)
1334 .await?;
1335 Ok(rows)
1336 }
1337
1338 pub async fn find_consolidated_for_source(
1347 &self,
1348 source_id: MessageId,
1349 ) -> Result<Option<MessageId>, MemoryError> {
1350 let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1351 "SELECT consolidated_id FROM memory_consolidation_sources \
1352 WHERE source_id = ? \
1353 LIMIT 1"
1354 ))
1355 .bind(source_id)
1356 .fetch_optional(&self.pool)
1357 .await?;
1358 Ok(row.map(|(id,)| id))
1359 }
1360
    /// Merge several source messages into one new consolidated message.
    ///
    /// Returns `Ok(false)` without writing anything when the confidence is
    /// below `confidence_threshold` or `source_ids` is empty. Otherwise, in
    /// one transaction: inserts the merged message (visibility `'both'`,
    /// `consolidated = 1`), records each source link in
    /// `memory_consolidation_sources` (duplicate links ignored via the
    /// dialect's insert-ignore/conflict clause), and marks each source
    /// message consolidated. Returns `Ok(true)` on commit.
    pub async fn apply_consolidation_merge(
        &self,
        conversation_id: ConversationId,
        role: &str,
        merged_content: &str,
        source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }
        if source_ids.is_empty() {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        let importance = crate::semantic::importance::compute_importance(merged_content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, visibility, \
             importance_score, consolidated, consolidation_confidence) \
             VALUES (?, ?, ?, '[]', 'both', ?, 1, ?) \
             RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(merged_content)
        .bind(importance)
        .bind(confidence)
        .fetch_one(&mut *tx)
        .await?;
        let consolidated_id = row.0;

        // Dialect-specific "insert or ignore" so re-linking an existing
        // (consolidated_id, source_id) pair is not an error.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in source_ids {
            zeph_db::query(&consol_sql)
                .bind(consolidated_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1431
    /// Fold additional sources into an existing consolidated message.
    ///
    /// Returns `Ok(false)` without writing anything when the confidence is
    /// below `confidence_threshold`. Otherwise, in one transaction: rewrites
    /// the target's content and confidence (marking it consolidated), links
    /// each additional source in `memory_consolidation_sources` (duplicate
    /// links ignored), and marks each source message consolidated. Returns
    /// `Ok(true)` on commit.
    pub async fn apply_consolidation_update(
        &self,
        target_id: MessageId,
        new_content: &str,
        additional_source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        zeph_db::query(sql!(
            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
        ))
        .bind(new_content)
        .bind(confidence)
        .bind(target_id)
        .execute(&mut *tx)
        .await?;

        // Dialect-specific "insert or ignore" so re-linking an existing
        // (consolidated_id, source_id) pair is not an error.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in additional_source_ids {
            zeph_db::query(&consol_sql)
                .bind(target_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1488
1489 pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1499 zeph_db::query(sql!(
1500 "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1501 ))
1502 .bind(score)
1503 .bind(id)
1504 .execute(&self.pool)
1505 .await?;
1506 Ok(())
1507 }
1508
1509 pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1517 let row: Option<(f64,)> = zeph_db::query_as(sql!(
1518 "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1519 ))
1520 .bind(id)
1521 .fetch_optional(&self.pool)
1522 .await?;
1523 Ok(row.map(|(s,)| s))
1524 }
1525
    /// Increments the access counter for every message in `ids`.
    ///
    /// Thin alias delegating to `increment_access_counts` (defined
    /// elsewhere in this store) — presumably kept so callers using the
    /// `batch_`-prefixed name keep working; verify before removing.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying update as [`MemoryError`].
    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        self.increment_access_counts(ids).await
    }
1536
1537 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1545 for &id in ids {
1546 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1547 .bind(id)
1548 .execute(&self.pool)
1549 .await?;
1550 }
1551 Ok(())
1552 }
1553
1554 pub async fn run_forgetting_sweep_tx(
1570 &self,
1571 config: &zeph_common::config::memory::ForgettingConfig,
1572 ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1573 let mut tx = self.pool.begin().await?;
1574
1575 let decay = f64::from(config.decay_rate);
1576 let floor = f64::from(config.forgetting_floor);
1577 let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1578 let replay_hours = i64::from(config.replay_window_hours);
1579 let replay_min_access = i64::from(config.replay_min_access_count);
1580 let protect_hours = i64::from(config.protect_recent_hours);
1581 let protect_min_access = i64::from(config.protect_min_access_count);
1582
1583 let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1586 "SELECT id FROM messages \
1587 WHERE deleted_at IS NULL AND consolidated = 0 \
1588 ORDER BY importance_score ASC \
1589 LIMIT ?"
1590 ))
1591 .bind(batch)
1592 .fetch_all(&mut *tx)
1593 .await?;
1594
1595 #[allow(clippy::cast_possible_truncation)]
1596 let downscaled = candidate_ids.len() as u32;
1597
1598 if downscaled > 0 {
1599 let placeholders: String = candidate_ids
1600 .iter()
1601 .map(|_| "?")
1602 .collect::<Vec<_>>()
1603 .join(",");
1604 let downscale_sql = format!(
1605 "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1606 WHERE id IN ({placeholders})"
1607 );
1608 let mut q = zeph_db::query(&downscale_sql);
1609 for &(id,) in &candidate_ids {
1610 q = q.bind(id);
1611 }
1612 q.execute(&mut *tx).await?;
1613 }
1614
1615 let replayed = if downscaled > 0 {
1621 let replay_placeholders: String = candidate_ids
1622 .iter()
1623 .map(|_| "?")
1624 .collect::<Vec<_>>()
1625 .join(",");
1626 let replay_sql = format!(
1627 "UPDATE messages \
1628 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1629 WHERE id IN ({replay_placeholders}) \
1630 AND (\
1631 (last_accessed IS NOT NULL \
1632 AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1633 OR access_count >= ?\
1634 )"
1635 );
1636 let mut rq = zeph_db::query(&replay_sql);
1637 for &(id,) in &candidate_ids {
1638 rq = rq.bind(id);
1639 }
1640 let replay_result = rq
1641 .bind(replay_hours)
1642 .bind(replay_min_access)
1643 .execute(&mut *tx)
1644 .await?;
1645 #[allow(clippy::cast_possible_truncation)]
1646 let n = replay_result.rows_affected() as u32;
1647 n
1648 } else {
1649 0
1650 };
1651
1652 let prune_sql = format!(
1654 "UPDATE messages \
1655 SET deleted_at = CURRENT_TIMESTAMP \
1656 WHERE deleted_at IS NULL AND consolidated = 0 \
1657 AND importance_score < {floor} \
1658 AND (\
1659 last_accessed IS NULL \
1660 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1661 ) \
1662 AND access_count < ?"
1663 );
1664 let prune_result = zeph_db::query(&prune_sql)
1665 .bind(protect_hours)
1666 .bind(protect_min_access)
1667 .execute(&mut *tx)
1668 .await?;
1669 #[allow(clippy::cast_possible_truncation)]
1670 let pruned = prune_result.rows_affected() as u32;
1671
1672 tx.commit().await?;
1673
1674 Ok(crate::forgetting::ForgettingResult {
1675 downscaled,
1676 replayed,
1677 pruned,
1678 })
1679 }
1680}
1681
/// A message row selected as a candidate for memory promotion.
/// NOTE(review): the selection query and promotion semantics live outside
/// this chunk — confirm against the code that produces these values.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    // Row id of the candidate message.
    pub id: MessageId,
    // Conversation the message belongs to.
    pub conversation_id: ConversationId,
    // Full message text.
    pub content: String,
    // Presumably the number of sessions the content appears in — verify
    // against the query that fills this field.
    pub session_count: u32,
    // Current importance score of the message.
    pub importance_score: f64,
}
1691
1692#[cfg(test)]
1693mod tests;