1use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, MessageVisibility, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18 match s {
19 "assistant" => Role::Assistant,
20 "system" => Role::System,
21 _ => Role::User,
22 }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27 match role {
28 Role::System => "system",
29 Role::User => "user",
30 Role::Assistant => "assistant",
31 }
32}
33
/// Translate a legacy externally-tagged variant name (e.g. `"ToolOutput"`)
/// into the snake_case `kind` tag used by the current parts format.
///
/// Returns `None` for unrecognized keys.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    const MAPPING: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    MAPPING.iter().find(|(k, _)| *k == key).map(|(_, v)| *v)
}
53
54fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
62 let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
63 let mut result = Vec::with_capacity(array.len());
64 for element in array {
65 let obj = element.as_object()?;
66 if obj.contains_key("kind") {
67 return None;
68 }
69 if obj.len() != 1 {
70 return None;
71 }
72 let (key, inner) = obj.iter().next()?;
73 let kind = legacy_key_to_kind(key)?;
74 let mut new_obj = match inner {
75 serde_json::Value::Object(m) => m.clone(),
76 other => {
78 let mut m = serde_json::Map::new();
79 m.insert("data".to_string(), other.clone());
80 m
81 }
82 };
83 new_obj.insert(
84 "kind".to_string(),
85 serde_json::Value::String(kind.to_string()),
86 );
87 let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
88 result.push(part);
89 }
90 Some(result)
91}
92
93fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98 if parts_json == "[]" {
99 return vec![];
100 }
101 match serde_json::from_str(parts_json) {
102 Ok(p) => p,
103 Err(e) => {
104 if let Some(parts) = try_parse_legacy_parts(parts_json) {
105 let truncated = parts_json.chars().take(120).collect::<String>();
106 tracing::warn!(
107 role = %role_str,
108 parts_json = %truncated,
109 "loaded legacy-format message parts via compat path"
110 );
111 return parts;
112 }
113 let truncated = parts_json.chars().take(120).collect::<String>();
114 tracing::warn!(
115 role = %role_str,
116 parts_json = %truncated,
117 error = %e,
118 "failed to deserialize message parts, falling back to empty"
119 );
120 vec![]
121 }
122 }
123}
124
125impl SqliteStore {
    /// Insert a new empty conversation row and return its id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
        let row: (ConversationId,) = zeph_db::query_as(sql!(
            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
139
    /// Save a message with empty parts and default (`Both`) visibility.
    ///
    /// Convenience wrapper around [`Self::save_message_with_parts`].
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
154
    /// Save a message with explicit serialized parts and default (`Both`)
    /// visibility.
    ///
    /// Convenience wrapper around [`Self::save_message_with_metadata`].
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(
            conversation_id,
            role,
            content,
            parts_json,
            MessageVisibility::Both,
        )
        .await
    }
176
    /// Save a message with an optional semantic category, empty parts, and
    /// `both` visibility; importance is computed from the content.
    ///
    /// NOTE(review): unlike [`Self::save_message_with_metadata`], this path
    /// applies no 100KB content truncation — confirm whether callers
    /// guarantee bounded content.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_category(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        category: Option<&str>,
    ) -> Result<MessageId, MemoryError> {
        let importance_score = crate::semantic::importance::compute_importance(content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, visibility, \
              importance_score, category) \
             VALUES (?, ?, ?, '[]', 'both', ?, ?) RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(importance_score)
        .bind(category)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
207
    /// Save a message with explicit parts and visibility, truncating content
    /// larger than 100KB at a char boundary and computing an importance score.
    ///
    /// Oversized content is replaced by a truncated prefix plus a
    /// `... [truncated, N bytes total]` suffix (logged at debug level).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_metadata(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        visibility: MessageVisibility,
    ) -> Result<MessageId, MemoryError> {
        const MAX_BYTES: usize = 100 * 1024;

        // Cow avoids an allocation in the common (not-truncated) case.
        let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
            // floor_char_boundary keeps the cut on a valid UTF-8 boundary.
            let boundary = content.floor_char_boundary(MAX_BYTES);
            tracing::debug!(
                original_bytes = content.len(),
                "save_message: content exceeds 100KB, truncating"
            );
            std::borrow::Cow::Owned(format!(
                "{}... [truncated, {} bytes total]",
                &content[..boundary],
                content.len()
            ))
        } else {
            std::borrow::Cow::Borrowed(content)
        };

        // Importance is computed on the (possibly truncated) stored content.
        let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
        let row: (MessageId,) = zeph_db::query_as(
            sql!("INSERT INTO messages (conversation_id, role, content, parts, visibility, importance_score) \
                 VALUES (?, ?, ?, ?, ?, ?) RETURNING id"),
        )
        .bind(conversation_id)
        .bind(role)
        .bind(content_cow.as_ref())
        .bind(parts_json)
        .bind(visibility.as_db_str())
        .bind(importance_score)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
255
    /// Load the most recent `limit` non-deleted messages of a conversation,
    /// returned in ascending id (chronological) order.
    ///
    /// The inner `ORDER BY id DESC LIMIT ?` selects the newest rows; the
    /// outer `ORDER BY id ASC` restores chronological order.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_history(
        &self,
        conversation_id: ConversationId,
        limit: u32,
    ) -> Result<Vec<Message>, MemoryError> {
        let rows: Vec<(String, String, String, String, i64)> = zeph_db::query_as(sql!(
            "SELECT role, content, parts, visibility, id FROM (\
             SELECT role, content, parts, visibility, id FROM messages \
             WHERE conversation_id = ? AND deleted_at IS NULL \
             ORDER BY id DESC \
             LIMIT ?\
             ) ORDER BY id ASC"
        ))
        .bind(conversation_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let messages = rows
            .into_iter()
            .map(|(role_str, content, parts_json, visibility_str, row_id)| {
                let parts = parse_parts_json(&role_str, &parts_json);
                Message {
                    role: parse_role(&role_str),
                    content,
                    parts,
                    metadata: MessageMetadata {
                        visibility: MessageVisibility::from_db_str(&visibility_str),
                        compacted_at: None,
                        deferred_summary: None,
                        focus_pinned: false,
                        focus_marker_id: None,
                        db_id: Some(row_id),
                    },
                }
            })
            .collect();
        Ok(messages)
    }
300
    /// Load recent messages with optional visibility filtering, in
    /// chronological order.
    ///
    /// `agent_visible == Some(true)` excludes `user_only` rows;
    /// `user_visible == Some(true)` excludes `agent_only` rows. `None` or
    /// `Some(false)` apply no filter for that side — the flags are passed as
    /// SQL booleans so the query shape stays constant.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_history_filtered(
        &self,
        conversation_id: ConversationId,
        limit: u32,
        agent_visible: Option<bool>,
        user_visible: Option<bool>,
    ) -> Result<Vec<Message>, MemoryError> {
        let exclude_user_only = agent_visible == Some(true);
        let exclude_agent_only = user_visible == Some(true);

        let rows: Vec<(String, String, String, String, i64)> = zeph_db::query_as(sql!(
            "WITH recent AS (\
             SELECT role, content, parts, visibility, id FROM messages \
             WHERE conversation_id = ? \
               AND deleted_at IS NULL \
               AND (NOT ? OR visibility != 'user_only') \
               AND (NOT ? OR visibility != 'agent_only') \
             ORDER BY id DESC \
             LIMIT ?\
             ) SELECT role, content, parts, visibility, id FROM recent ORDER BY id ASC"
        ))
        .bind(conversation_id)
        .bind(exclude_user_only)
        .bind(exclude_agent_only)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let messages = rows
            .into_iter()
            .map(|(role_str, content, parts_json, visibility_str, row_id)| {
                let parts = parse_parts_json(&role_str, &parts_json);
                Message {
                    role: parse_role(&role_str),
                    content,
                    parts,
                    metadata: MessageMetadata {
                        visibility: MessageVisibility::from_db_str(&visibility_str),
                        compacted_at: None,
                        deferred_summary: None,
                        focus_pinned: false,
                        focus_marker_id: None,
                        db_id: Some(row_id),
                    },
                }
            })
            .collect();
        Ok(messages)
    }
362
    /// Compact a range of messages: mark the range `user_only` with a
    /// `compacted_at` timestamp and insert an `agent_only` summary message,
    /// all within one transaction. Returns the new summary message id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn replace_conversation(
        &self,
        conversation_id: ConversationId,
        compacted_range: std::ops::RangeInclusive<MessageId>,
        summary_role: &str,
        summary_content: &str,
    ) -> Result<MessageId, MemoryError> {
        // Unix-epoch seconds as a string; clock-before-epoch degrades to "0".
        let now = {
            let secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            format!("{secs}")
        };
        let start_id = compacted_range.start().0;
        let end_id = compacted_range.end().0;

        let mut tx = self.pool.begin().await?;

        // Hide the compacted originals from the agent but keep them for the user.
        zeph_db::query(sql!(
            "UPDATE messages SET visibility = 'user_only', compacted_at = ? \
             WHERE conversation_id = ? AND id >= ? AND id <= ?"
        ))
        .bind(&now)
        .bind(conversation_id)
        .bind(start_id)
        .bind(end_id)
        .execute(&mut *tx)
        .await?;

        // The summary replaces the range in the agent's view only.
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, visibility) \
             VALUES (?, ?, ?, '[]', 'agent_only') RETURNING id"
        ))
        .bind(conversation_id)
        .bind(summary_role)
        .bind(summary_content)
        .fetch_one(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(row.0)
    }
420
    /// Hide tool-call message pairs (`hide_ids` become `user_only` with a
    /// `compacted_at` timestamp) and insert one `agent_only` assistant
    /// summary message per entry in `summaries`, in a single transaction.
    ///
    /// No-op when both inputs are empty.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_tool_pair_summaries(
        &self,
        conversation_id: ConversationId,
        hide_ids: &[i64],
        summaries: &[String],
    ) -> Result<(), MemoryError> {
        if hide_ids.is_empty() && summaries.is_empty() {
            return Ok(());
        }

        // Unix-epoch seconds as a string for the compacted_at column.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            .to_string();

        let mut tx = self.pool.begin().await?;

        for &id in hide_ids {
            zeph_db::query(sql!(
                "UPDATE messages SET visibility = 'user_only', compacted_at = ? WHERE id = ?"
            ))
            .bind(&now)
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        for summary in summaries {
            let content = format!("[tool summary] {summary}");
            // Parts carry the raw summary; serialization failure degrades to "[]".
            let parts = serde_json::to_string(&[MessagePart::Summary {
                text: summary.clone(),
            }])
            .unwrap_or_else(|_| "[]".to_string());
            zeph_db::query(sql!(
                "INSERT INTO messages \
                 (conversation_id, role, content, parts, visibility) \
                 VALUES (?, 'assistant', ?, ?, 'agent_only')"
            ))
            .bind(conversation_id)
            .bind(&content)
            .bind(&parts)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
479
    /// Return the ids of the `n` oldest non-deleted messages in a
    /// conversation, ascending.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn oldest_message_ids(
        &self,
        conversation_id: ConversationId,
        n: u32,
    ) -> Result<Vec<MessageId>, MemoryError> {
        let rows: Vec<(MessageId,)> = zeph_db::query_as(
            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
        )
        .bind(conversation_id)
        .bind(n)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|r| r.0).collect())
    }
499
    /// Return the most recently created conversation id, or `None` when the
    /// table is empty.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
        ))
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|r| r.0))
    }
513
    /// Load a single non-deleted message by id, or `None` if absent.
    ///
    /// Note: the returned metadata carries `db_id: None` (unlike
    /// [`Self::load_history`], which populates it).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn message_by_id(
        &self,
        message_id: MessageId,
    ) -> Result<Option<Message>, MemoryError> {
        let row: Option<(String, String, String, String)> = zeph_db::query_as(
            sql!("SELECT role, content, parts, visibility FROM messages WHERE id = ? AND deleted_at IS NULL"),
        )
        .bind(message_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|(role_str, content, parts_json, visibility_str)| {
            let parts = parse_parts_json(&role_str, &parts_json);
            Message {
                role: parse_role(&role_str),
                content,
                parts,
                metadata: MessageMetadata {
                    visibility: MessageVisibility::from_db_str(&visibility_str),
                    compacted_at: None,
                    deferred_summary: None,
                    focus_pinned: false,
                    focus_marker_id: None,
                    db_id: None,
                },
            }
        }))
    }
547
548 pub async fn messages_by_ids(
554 &self,
555 ids: &[MessageId],
556 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
557 if ids.is_empty() {
558 return Ok(Vec::new());
559 }
560
561 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
562
563 let query = format!(
564 "SELECT id, role, content, parts FROM messages \
565 WHERE id IN ({placeholders}) AND visibility != 'user_only' AND deleted_at IS NULL"
566 );
567 let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
568 for &id in ids {
569 q = q.bind(id);
570 }
571
572 let rows = q.fetch_all(&self.pool).await?;
573
574 Ok(rows
575 .into_iter()
576 .map(|(id, role_str, content, parts_json)| {
577 let parts = parse_parts_json(&role_str, &parts_json);
578 (
579 id,
580 Message {
581 role: parse_role(&role_str),
582 content,
583 parts,
584 metadata: MessageMetadata::default(),
585 },
586 )
587 })
588 .collect())
589 }
590
    /// Return `(id, conversation_id, role, content)` for non-deleted messages
    /// that have no row in `embeddings_metadata`, oldest first, up to `limit`
    /// (unbounded when `None`).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn unembedded_message_ids(
        &self,
        limit: Option<usize>,
    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
        // None, or a usize too large for i64, means "effectively unlimited".
        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));

        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(effective_limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows)
    }
616
    /// Count non-deleted messages that have no embedding metadata row.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
        let row: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
    }
632
    /// Stream `(id, conversation_id, role, content)` for non-deleted,
    /// unembedded messages, oldest first, up to `limit` rows.
    ///
    /// Streaming variant of [`Self::unembedded_message_ids`]; database errors
    /// surface as `Err` items in the stream (mapped via `TryStreamExt`).
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch(&self.pool)
        .map_err(MemoryError::from)
    }
660
    /// Count non-deleted messages in a conversation.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_messages(
        &self,
        conversation_id: ConversationId,
    ) -> Result<i64, MemoryError> {
        let row: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
        ))
        .bind(conversation_id)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
678
    /// Count non-deleted messages in a conversation with id strictly greater
    /// than `after_id`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_messages_after(
        &self,
        conversation_id: ConversationId,
        after_id: MessageId,
    ) -> Result<i64, MemoryError> {
        let row: (i64,) =
            zeph_db::query_as(
                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
            )
            .bind(conversation_id)
            .bind(after_id)
            .fetch_one(&self.pool)
            .await?;
        Ok(row.0)
    }
699
    /// Full-text search over messages via the FTS index, optionally scoped to
    /// one conversation. Returns `(message_id, score)` pairs, best first;
    /// score is the negated FTS rank so higher is better.
    ///
    /// The query is sanitized via `sanitize_fts_query`; an empty sanitized
    /// query yields an empty result without touching the database.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn keyword_search(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        // Two static sql! variants instead of one dynamic string, so both
        // stay checkable at compile time.
        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
            zeph_db::query_as(
                sql!("SELECT m.id, -rank AS score \
                 FROM messages_fts f \
                 JOIN messages m ON m.id = f.rowid \
                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
                 ORDER BY rank \
                 LIMIT ?"),
            )
            .bind(&safe_query)
            .bind(cid)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT m.id, -rank AS score \
                 FROM messages_fts f \
                 JOIN messages m ON m.id = f.rowid \
                 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
                 ORDER BY rank \
                 LIMIT ?"
            ))
            .bind(&safe_query)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows)
    }
751
752 pub async fn keyword_search_with_time_range(
765 &self,
766 query: &str,
767 limit: usize,
768 conversation_id: Option<ConversationId>,
769 after: Option<&str>,
770 before: Option<&str>,
771 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
772 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
773 let safe_query = sanitize_fts_query(query);
774 if safe_query.is_empty() {
775 return Ok(Vec::new());
776 }
777
778 let after_clause = if after.is_some() {
780 " AND m.created_at > ?"
781 } else {
782 ""
783 };
784 let before_clause = if before.is_some() {
785 " AND m.created_at < ?"
786 } else {
787 ""
788 };
789 let conv_clause = if conversation_id.is_some() {
790 " AND m.conversation_id = ?"
791 } else {
792 ""
793 };
794
795 let sql = format!(
796 "SELECT m.id, -rank AS score \
797 FROM messages_fts f \
798 JOIN messages m ON m.id = f.rowid \
799 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL\
800 {after_clause}{before_clause}{conv_clause} \
801 ORDER BY rank \
802 LIMIT ?"
803 );
804
805 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
806 if let Some(a) = after {
807 q = q.bind(a);
808 }
809 if let Some(b) = before {
810 q = q.bind(b);
811 }
812 if let Some(cid) = conversation_id {
813 q = q.bind(cid);
814 }
815 q = q.bind(effective_limit);
816
817 Ok(q.fetch_all(&self.pool).await?)
818 }
819
    /// Return a map from message id to `created_at` as epoch seconds, for the
    /// given non-deleted ids. Ids with no matching row are absent.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn message_timestamps(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }

        // Dialect-specific placeholder list and epoch-extraction expression.
        let placeholders: String =
            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
        let query = format!(
            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
849
    /// Return `(id, role, content)` for non-deleted messages in a
    /// conversation with id strictly greater than `after_message_id`,
    /// ascending, up to `limit` rows.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_messages_range(
        &self,
        conversation_id: ConversationId,
        after_message_id: MessageId,
        limit: usize,
    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);

        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
            "SELECT id, role, content FROM messages \
             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
             ORDER BY id ASC LIMIT ?"
        ))
        .bind(conversation_id)
        .bind(after_message_id)
        .bind(effective_limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows)
    }
876
    /// Load eviction bookkeeping (`created_at`, `last_accessed`,
    /// `access_count`) for every non-deleted message.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_eviction_candidates(
        &self,
    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
            "SELECT id, created_at, last_accessed, access_count \
             FROM messages WHERE deleted_at IS NULL"
        ))
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(
                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
                    id,
                    created_at,
                    last_accessed,
                    // Negative counts (shouldn't occur) clamp to 0.
                    access_count: access_count.try_into().unwrap_or(0),
                },
            )
            .collect())
    }
906
907 pub async fn soft_delete_messages(
915 &self,
916 ids: &[MessageId],
917 ) -> Result<(), crate::error::MemoryError> {
918 if ids.is_empty() {
919 return Ok(());
920 }
921 for &id in ids {
923 zeph_db::query(
924 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
925 )
926 .bind(id)
927 .execute(&self.pool)
928 .await?;
929 }
930 Ok(())
931 }
932
    /// Return ids of soft-deleted messages whose vectors have not yet been
    /// removed from Qdrant (`qdrant_cleaned = 0`).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_soft_deleted_message_ids(
        &self,
    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|(id,)| id).collect())
    }
948
949 pub async fn mark_qdrant_cleaned(
955 &self,
956 ids: &[MessageId],
957 ) -> Result<(), crate::error::MemoryError> {
958 for &id in ids {
959 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
960 .bind(id)
961 .execute(&self.pool)
962 .await?;
963 }
964 Ok(())
965 }
966
967 pub async fn fetch_importance_scores(
975 &self,
976 ids: &[MessageId],
977 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
978 if ids.is_empty() {
979 return Ok(std::collections::HashMap::new());
980 }
981 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
982 let query = format!(
983 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
984 );
985 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
986 for &id in ids {
987 q = q.bind(id);
988 }
989 let rows = q.fetch_all(&self.pool).await?;
990 Ok(rows.into_iter().collect())
991 }
992
993 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1001 if ids.is_empty() {
1002 return Ok(());
1003 }
1004 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1005 let query = format!(
1006 "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
1007 WHERE id IN ({placeholders})"
1008 );
1009 let mut q = zeph_db::query(&query);
1010 for &id in ids {
1011 q = q.bind(id);
1012 }
1013 q.execute(&self.pool).await?;
1014 Ok(())
1015 }
1016
    /// Find episodic, non-deleted messages seen in at least `min_sessions`
    /// sessions, ordered by session count then importance (both descending),
    /// up to `batch_size`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_promotion_candidates(
        &self,
        min_sessions: u32,
        batch_size: usize,
    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
        let min = i64::from(min_sessions);
        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, conversation_id, content, session_count, importance_score \
             FROM messages \
             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
             ORDER BY session_count DESC, importance_score DESC \
             LIMIT ?"
        ))
        .bind(min)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(
                |(id, conversation_id, content, session_count, importance_score)| {
                    PromotionCandidate {
                        id,
                        conversation_id,
                        content,
                        // Negative counts (shouldn't occur) clamp to 0.
                        session_count: session_count.try_into().unwrap_or(0),
                        importance_score,
                    }
                },
            )
            .collect())
    }
1059
    /// Count non-deleted messages per memory tier, returned as
    /// `(episodic, semantic)`. A tier with no rows counts as 0.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT tier, COUNT(*) FROM messages \
             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
             GROUP BY tier"
        ))
        .fetch_all(&self.pool)
        .await?;

        let mut episodic = 0i64;
        let mut semantic = 0i64;
        for (tier, count) in rows {
            match tier.as_str() {
                "episodic" => episodic = count,
                "semantic" => semantic = count,
                _ => {}
            }
        }
        Ok((episodic, semantic))
    }
1087
    /// Count non-deleted messages in the `semantic` tier.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
        let row: (i64,) = zeph_db::query_as(sql!(
            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
1101
    /// Promote merged episodic content to the semantic tier: insert a new
    /// `agent_only` semantic message and soft-delete the originals (with
    /// `qdrant_cleaned = 0` so their vectors get cleaned up later), in one
    /// write transaction. Returns the new message id.
    ///
    /// # Errors
    /// Returns [`MemoryError::Other`] when `original_ids` is empty, or
    /// [`MemoryError`] if any statement or the commit fails.
    pub async fn promote_to_semantic(
        &self,
        conversation_id: ConversationId,
        merged_content: &str,
        original_ids: &[MessageId],
    ) -> Result<MessageId, MemoryError> {
        if original_ids.is_empty() {
            return Err(MemoryError::Other(
                "promote_to_semantic: original_ids must not be empty".into(),
            ));
        }

        let mut tx = begin_write(&self.pool).await?;

        // promotion_timestamp uses the dialect's "epoch now" SQL expression;
        // the built query must then be placeholder-rewritten by hand.
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let promote_insert_raw = format!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, visibility, \
              tier, promotion_timestamp) \
             VALUES (?, 'assistant', ?, '[]', 'agent_only', 'semantic', {epoch_now}) \
             RETURNING id"
        );
        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
            .bind(conversation_id)
            .bind(merged_content)
            .fetch_one(&mut *tx)
            .await?;

        let new_id = row.0;

        for &id in original_ids {
            zeph_db::query(sql!(
                "UPDATE messages \
                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
                 WHERE id = ? AND deleted_at IS NULL"
            ))
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(new_id)
    }
1163
    /// Manually promote episodic messages to the semantic tier in place,
    /// setting `promotion_timestamp`. Returns how many rows were actually
    /// promoted (already-semantic or deleted rows don't count).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any update fails.
    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
        if ids.is_empty() {
            return Ok(0);
        }
        // Dialect epoch expression is interpolated, so placeholders must be
        // rewritten by hand for this dynamically-built statement.
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let manual_promote_raw = format!(
            "UPDATE messages \
             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
        );
        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
        let mut count = 0usize;
        for &id in ids {
            let result = zeph_db::query(&manual_promote_sql)
                .bind(id)
                .execute(&self.pool)
                .await?;
            count += usize::try_from(result.rows_affected()).unwrap_or(0);
        }
        Ok(count)
    }
1193
    /// Increment `session_count` for every non-deleted episodic message in
    /// a conversation (used to track cross-session recurrence for promotion).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn increment_session_counts_for_conversation(
        &self,
        conversation_id: ConversationId,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE messages SET session_count = session_count + 1 \
             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
        ))
        .bind(conversation_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1215
1216 pub async fn fetch_tiers(
1224 &self,
1225 ids: &[MessageId],
1226 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1227 if ids.is_empty() {
1228 return Ok(std::collections::HashMap::new());
1229 }
1230 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1231 let query = format!(
1232 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1233 );
1234 let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1235 for &id in ids {
1236 q = q.bind(id);
1237 }
1238 let rows = q.fetch_all(&self.pool).await?;
1239 Ok(rows.into_iter().collect())
1240 }
1241
    /// Return the distinct conversation ids that still contain non-deleted,
    /// unconsolidated (`consolidated = 0`) messages.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn conversations_with_unconsolidated_messages(
        &self,
    ) -> Result<Vec<ConversationId>, MemoryError> {
        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
            "SELECT DISTINCT conversation_id FROM messages \
             WHERE consolidated = 0 AND deleted_at IS NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|(id,)| id).collect())
    }
1260
    /// Return `(id, content)` of non-deleted, unconsolidated messages in a
    /// conversation, oldest first, up to `limit`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_unconsolidated_messages(
        &self,
        conversation_id: ConversationId,
        limit: usize,
    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
            "SELECT id, content FROM messages \
             WHERE conversation_id = ? \
               AND consolidated = 0 \
               AND deleted_at IS NULL \
             ORDER BY id ASC \
             LIMIT ?"
        ))
        .bind(conversation_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
1289
    /// Look up the consolidated message that a source message was merged
    /// into, if any, via `memory_consolidation_sources`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_consolidated_for_source(
        &self,
        source_id: MessageId,
    ) -> Result<Option<MessageId>, MemoryError> {
        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
            "SELECT consolidated_id FROM memory_consolidation_sources \
             WHERE source_id = ? \
             LIMIT 1"
        ))
        .bind(source_id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|(id,)| id))
    }
1312
    /// Apply an LLM-proposed consolidation merge: insert a new consolidated
    /// message, link the sources in `memory_consolidation_sources`, and mark
    /// each source as consolidated — all in one transaction.
    ///
    /// Returns `Ok(false)` without touching the database when `confidence`
    /// is below `confidence_threshold` or `source_ids` is empty; `Ok(true)`
    /// on success.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_consolidation_merge(
        &self,
        conversation_id: ConversationId,
        role: &str,
        merged_content: &str,
        source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }
        if source_ids.is_empty() {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        let importance = crate::semantic::importance::compute_importance(merged_content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, visibility, \
              importance_score, consolidated, consolidation_confidence) \
             VALUES (?, ?, ?, '[]', 'both', ?, 1, ?) \
             RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(merged_content)
        .bind(importance)
        .bind(confidence)
        .fetch_one(&mut *tx)
        .await?;
        let consolidated_id = row.0;

        // Dialect-specific "insert or ignore" so re-linking is idempotent.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in source_ids {
            zeph_db::query(&consol_sql)
                .bind(consolidated_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1383
    /// Apply an LLM-proposed consolidation update: rewrite an existing
    /// consolidated message's content/confidence and link any additional
    /// source messages, marking them consolidated — all in one transaction.
    ///
    /// Returns `Ok(false)` without touching the database when `confidence`
    /// is below `confidence_threshold`; `Ok(true)` on success.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_consolidation_update(
        &self,
        target_id: MessageId,
        new_content: &str,
        additional_source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        zeph_db::query(sql!(
            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
        ))
        .bind(new_content)
        .bind(confidence)
        .bind(target_id)
        .execute(&mut *tx)
        .await?;

        // Dialect-specific "insert or ignore" so re-linking is idempotent.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in additional_source_ids {
            zeph_db::query(&consol_sql)
                .bind(target_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1440
    /// Overwrite the importance score of a non-deleted message. No-op if the
    /// id does not match a live row.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
        ))
        .bind(score)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1460
1461 pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1469 let row: Option<(f64,)> = zeph_db::query_as(sql!(
1470 "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1471 ))
1472 .bind(id)
1473 .fetch_optional(&self.pool)
1474 .await?;
1475 Ok(row.map(|(s,)| s))
1476 }
1477
    /// Increments the access count for every message in `ids` by delegating
    /// to `increment_access_counts` (defined elsewhere in this impl).
    ///
    /// NOTE(review): looks like a backward-compatibility alias for the
    /// delegated method — confirm against callers before removing.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] if the delegated update fails.
    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        self.increment_access_counts(ids).await
    }
1488
1489 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1497 for &id in ids {
1498 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1499 .bind(id)
1500 .execute(&self.pool)
1501 .await?;
1502 }
1503 Ok(())
1504 }
1505
1506 pub async fn run_forgetting_sweep_tx(
1522 &self,
1523 config: &zeph_common::config::memory::ForgettingConfig,
1524 ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1525 let mut tx = self.pool.begin().await?;
1526
1527 let decay = f64::from(config.decay_rate);
1528 let floor = f64::from(config.forgetting_floor);
1529 let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1530 let replay_hours = i64::from(config.replay_window_hours);
1531 let replay_min_access = i64::from(config.replay_min_access_count);
1532 let protect_hours = i64::from(config.protect_recent_hours);
1533 let protect_min_access = i64::from(config.protect_min_access_count);
1534
1535 let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1538 "SELECT id FROM messages \
1539 WHERE deleted_at IS NULL AND consolidated = 0 \
1540 ORDER BY importance_score ASC \
1541 LIMIT ?"
1542 ))
1543 .bind(batch)
1544 .fetch_all(&mut *tx)
1545 .await?;
1546
1547 #[allow(clippy::cast_possible_truncation)]
1548 let downscaled = candidate_ids.len() as u32;
1549
1550 if downscaled > 0 {
1551 let placeholders: String = candidate_ids
1552 .iter()
1553 .map(|_| "?")
1554 .collect::<Vec<_>>()
1555 .join(",");
1556 let downscale_sql = format!(
1557 "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1558 WHERE id IN ({placeholders})"
1559 );
1560 let mut q = zeph_db::query(&downscale_sql);
1561 for &(id,) in &candidate_ids {
1562 q = q.bind(id);
1563 }
1564 q.execute(&mut *tx).await?;
1565 }
1566
1567 let replayed = if downscaled > 0 {
1573 let replay_placeholders: String = candidate_ids
1574 .iter()
1575 .map(|_| "?")
1576 .collect::<Vec<_>>()
1577 .join(",");
1578 let replay_sql = format!(
1579 "UPDATE messages \
1580 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1581 WHERE id IN ({replay_placeholders}) \
1582 AND (\
1583 (last_accessed IS NOT NULL \
1584 AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1585 OR access_count >= ?\
1586 )"
1587 );
1588 let mut rq = zeph_db::query(&replay_sql);
1589 for &(id,) in &candidate_ids {
1590 rq = rq.bind(id);
1591 }
1592 let replay_result = rq
1593 .bind(replay_hours)
1594 .bind(replay_min_access)
1595 .execute(&mut *tx)
1596 .await?;
1597 #[allow(clippy::cast_possible_truncation)]
1598 let n = replay_result.rows_affected() as u32;
1599 n
1600 } else {
1601 0
1602 };
1603
1604 let prune_sql = format!(
1606 "UPDATE messages \
1607 SET deleted_at = CURRENT_TIMESTAMP \
1608 WHERE deleted_at IS NULL AND consolidated = 0 \
1609 AND importance_score < {floor} \
1610 AND (\
1611 last_accessed IS NULL \
1612 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1613 ) \
1614 AND access_count < ?"
1615 );
1616 let prune_result = zeph_db::query(&prune_sql)
1617 .bind(protect_hours)
1618 .bind(protect_min_access)
1619 .execute(&mut *tx)
1620 .await?;
1621 #[allow(clippy::cast_possible_truncation)]
1622 let pruned = prune_result.rows_affected() as u32;
1623
1624 tx.commit().await?;
1625
1626 Ok(crate::forgetting::ForgettingResult {
1627 downscaled,
1628 replayed,
1629 pruned,
1630 })
1631 }
1632}
1633
/// A message identified as a candidate for memory promotion.
///
/// NOTE(review): field semantics inferred from names only — confirm
/// against the code that produces and consumes these candidates.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    // Message row id.
    pub id: MessageId,
    // Conversation the message belongs to.
    pub conversation_id: ConversationId,
    // Message text content.
    pub content: String,
    // Presumably the number of distinct sessions referencing the message.
    pub session_count: u32,
    // Current importance score of the message row.
    pub importance_score: f64,
}
1643
1644#[cfg(test)]
1645mod tests;