1use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18 match s {
19 "assistant" => Role::Assistant,
20 "system" => Role::System,
21 _ => Role::User,
22 }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27 match role {
28 Role::System => "system",
29 Role::User => "user",
30 Role::Assistant => "assistant",
31 }
32}
33
/// Translates a legacy externally-tagged part key (e.g. `"ToolOutput"`)
/// to its modern snake_case `kind` discriminant, or `None` if unknown.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Table form keeps legacy/modern pairs visually aligned.
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find_map(|&(legacy, kind)| (legacy == key).then_some(kind))
}
53
/// Attempts to parse `parts_json` written in the legacy externally-tagged
/// form (`[{"Text": {...}}, ...]`) into modern [`MessagePart`]s.
///
/// Returns `None` when the payload is not a JSON array of objects, when any
/// element already carries a modern `"kind"` discriminant, when an element is
/// not a single-key object, when its key is not a known legacy variant, or
/// when the rewritten object fails to deserialize.
fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
    let mut result = Vec::with_capacity(array.len());
    for element in array {
        let obj = element.as_object()?;
        // A "kind" key means this is already the modern internally-tagged
        // format; let the caller's primary parse path handle it.
        if obj.contains_key("kind") {
            return None;
        }
        // Legacy elements are exactly one `{ "<Variant>": <inner> }` pair.
        if obj.len() != 1 {
            return None;
        }
        let (key, inner) = obj.iter().next()?;
        let kind = legacy_key_to_kind(key)?;
        let mut new_obj = match inner {
            serde_json::Value::Object(m) => m.clone(),
            // Non-object payloads (e.g. a bare string) are wrapped under a
            // "data" field before the kind tag is added.
            other => {
                let mut m = serde_json::Map::new();
                m.insert("data".to_string(), other.clone());
                m
            }
        };
        new_obj.insert(
            "kind".to_string(),
            serde_json::Value::String(kind.to_string()),
        );
        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
        result.push(part);
    }
    Some(result)
}
92
93fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98 if parts_json == "[]" {
99 return vec![];
100 }
101 match serde_json::from_str(parts_json) {
102 Ok(p) => p,
103 Err(e) => {
104 if let Some(parts) = try_parse_legacy_parts(parts_json) {
105 let truncated = parts_json.chars().take(120).collect::<String>();
106 tracing::warn!(
107 role = %role_str,
108 parts_json = %truncated,
109 "loaded legacy-format message parts via compat path"
110 );
111 return parts;
112 }
113 let truncated = parts_json.chars().take(120).collect::<String>();
114 tracing::warn!(
115 role = %role_str,
116 parts_json = %truncated,
117 error = %e,
118 "failed to deserialize message parts, falling back to empty"
119 );
120 vec![]
121 }
122 }
123}
124
125impl SqliteStore {
126 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132 let row: (ConversationId,) = zeph_db::query_as(sql!(
133 "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134 ))
135 .fetch_one(&self.pool)
136 .await?;
137 Ok(row.0)
138 }
139
    /// Persists a message with an empty parts payload and default visibility.
    ///
    /// Convenience wrapper over [`Self::save_message_with_parts`] with
    /// `parts_json = "[]"`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the underlying insert fails.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
154
    /// Persists a message with an explicit parts JSON payload, visible to
    /// both the agent and the user.
    ///
    /// Delegates to [`Self::save_message_with_metadata`] with both
    /// visibility flags set.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the underlying insert fails.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
            .await
    }
170
    /// Inserts a fully-visible message with empty parts and an optional
    /// `category` label.
    ///
    /// The importance score is recomputed from `content` and `role` at
    /// insert time. Unlike [`Self::save_message_with_metadata`], content is
    /// stored as-is (no size truncation on this path).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_category(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        category: Option<&str>,
    ) -> Result<MessageId, MemoryError> {
        let importance_score = crate::semantic::importance::compute_importance(content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible, \
             importance_score, category) \
             VALUES (?, ?, ?, '[]', 1, 1, ?, ?) RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(importance_score)
        .bind(category)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
201
    /// Inserts a message with explicit parts JSON and visibility flags;
    /// returns the new row id.
    ///
    /// Content larger than 100 KiB is truncated at a char boundary and
    /// suffixed with a truncation marker before insert. The importance score
    /// is computed from the (possibly truncated) content.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn save_message_with_metadata(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        agent_visible: bool,
        user_visible: bool,
    ) -> Result<MessageId, MemoryError> {
        // Hard cap on stored content size (100 KiB).
        const MAX_BYTES: usize = 100 * 1024;

        // Borrow when under the cap; allocate only in the truncation case.
        let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
            // Snap to a char boundary so the slice below cannot split UTF-8.
            let boundary = content.floor_char_boundary(MAX_BYTES);
            tracing::debug!(
                original_bytes = content.len(),
                "save_message: content exceeds 100KB, truncating"
            );
            std::borrow::Cow::Owned(format!(
                "{}... [truncated, {} bytes total]",
                &content[..boundary],
                content.len()
            ))
        } else {
            std::borrow::Cow::Borrowed(content)
        };

        let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
        let row: (MessageId,) = zeph_db::query_as(
            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
                  VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
        )
        .bind(conversation_id)
        .bind(role)
        .bind(content_cow.as_ref())
        .bind(parts_json)
        .bind(i64::from(agent_visible))
        .bind(i64::from(user_visible))
        .bind(importance_score)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
251
    /// Loads the most recent `limit` live messages of a conversation in
    /// chronological (ascending id) order.
    ///
    /// The inner query picks the newest rows (`ORDER BY id DESC LIMIT ?`);
    /// the outer query flips them back to ascending order so the caller gets
    /// oldest-first history.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_history(
        &self,
        conversation_id: ConversationId,
        limit: u32,
    ) -> Result<Vec<Message>, MemoryError> {
        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
             SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
             WHERE conversation_id = ? AND deleted_at IS NULL \
             ORDER BY id DESC \
             LIMIT ?\
             ) ORDER BY id ASC"
        ))
        .bind(conversation_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let messages = rows
            .into_iter()
            .map(
                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
                    let parts = parse_parts_json(&role_str, &parts_json);
                    Message {
                        role: parse_role(&role_str),
                        content,
                        parts,
                        metadata: MessageMetadata {
                            // Visibility columns are stored as 0/1 integers.
                            agent_visible: agent_visible != 0,
                            user_visible: user_visible != 0,
                            compacted_at: None,
                            deferred_summary: None,
                            focus_pinned: false,
                            focus_marker_id: None,
                            db_id: Some(row_id),
                        },
                    }
                },
            )
            .collect();
        Ok(messages)
    }
299
    /// Like [`Self::load_history`], with optional visibility filters.
    ///
    /// Each filter uses the `(? IS NULL OR col = ?)` pattern, which requires
    /// the same value bound twice; passing `None` disables that filter. The
    /// CTE takes the newest matching rows, then the outer SELECT restores
    /// ascending order.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn load_history_filtered(
        &self,
        conversation_id: ConversationId,
        limit: u32,
        agent_visible: Option<bool>,
        user_visible: Option<bool>,
    ) -> Result<Vec<Message>, MemoryError> {
        // Map Option<bool> to Option<i64> to match the 0/1 column encoding.
        let av = agent_visible.map(i64::from);
        let uv = user_visible.map(i64::from);

        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
            sql!("WITH recent AS (\
                  SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
                  WHERE conversation_id = ? \
                  AND deleted_at IS NULL \
                  AND (? IS NULL OR agent_visible = ?) \
                  AND (? IS NULL OR user_visible = ?) \
                  ORDER BY id DESC \
                  LIMIT ?\
                  ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
        )
        .bind(conversation_id)
        // Bound twice on purpose: once for the NULL check, once for equality.
        .bind(av)
        .bind(av)
        .bind(uv)
        .bind(uv)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let messages = rows
            .into_iter()
            .map(
                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
                    let parts = parse_parts_json(&role_str, &parts_json);
                    Message {
                        role: parse_role(&role_str),
                        content,
                        parts,
                        metadata: MessageMetadata {
                            agent_visible: agent_visible != 0,
                            user_visible: user_visible != 0,
                            compacted_at: None,
                            deferred_summary: None,
                            focus_pinned: false,
                            focus_marker_id: None,
                            db_id: Some(row_id),
                        },
                    }
                },
            )
            .collect();
        Ok(messages)
    }
361
    /// Compacts a contiguous id range of a conversation: hides the original
    /// messages from the agent and inserts a summary message in their place.
    ///
    /// The hidden rows get `compacted_at` set to the current Unix time (as a
    /// decimal string). The summary row is agent-visible but hidden from the
    /// user (`user_visible = 0`). Both steps run in one transaction. Returns
    /// the id of the inserted summary message.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn replace_conversation(
        &self,
        conversation_id: ConversationId,
        compacted_range: std::ops::RangeInclusive<MessageId>,
        summary_role: &str,
        summary_content: &str,
    ) -> Result<MessageId, MemoryError> {
        // Unix-epoch seconds as text; clock-before-epoch degrades to "0".
        let now = {
            let secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            format!("{secs}")
        };
        let start_id = compacted_range.start().0;
        let end_id = compacted_range.end().0;

        let mut tx = self.pool.begin().await?;

        zeph_db::query(sql!(
            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
             WHERE conversation_id = ? AND id >= ? AND id <= ?"
        ))
        .bind(&now)
        .bind(conversation_id)
        .bind(start_id)
        .bind(end_id)
        .execute(&mut *tx)
        .await?;

        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible) \
             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
        ))
        .bind(conversation_id)
        .bind(summary_role)
        .bind(summary_content)
        .fetch_one(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(row.0)
    }
419
    /// Applies tool-pair compaction: hides the messages in `hide_ids` from
    /// the agent and inserts one assistant summary row (agent-visible,
    /// user-hidden) per entry in `summaries`, all in a single transaction.
    ///
    /// No-op when both inputs are empty.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_tool_pair_summaries(
        &self,
        conversation_id: ConversationId,
        hide_ids: &[i64],
        summaries: &[String],
    ) -> Result<(), MemoryError> {
        if hide_ids.is_empty() && summaries.is_empty() {
            return Ok(());
        }

        // Unix-epoch seconds as text, stored in `compacted_at`.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            .to_string();

        let mut tx = self.pool.begin().await?;

        for &id in hide_ids {
            zeph_db::query(sql!(
                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
            ))
            .bind(&now)
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        for summary in summaries {
            let content = format!("[tool summary] {summary}");
            // The parts column mirrors the summary as a structured part;
            // serialization failure degrades to an empty parts array.
            let parts = serde_json::to_string(&[MessagePart::Summary {
                text: summary.clone(),
            }])
            .unwrap_or_else(|_| "[]".to_string());
            zeph_db::query(sql!(
                "INSERT INTO messages \
                 (conversation_id, role, content, parts, agent_visible, user_visible) \
                 VALUES (?, 'assistant', ?, ?, 1, 0)"
            ))
            .bind(conversation_id)
            .bind(&content)
            .bind(&parts)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
478
479 pub async fn oldest_message_ids(
485 &self,
486 conversation_id: ConversationId,
487 n: u32,
488 ) -> Result<Vec<MessageId>, MemoryError> {
489 let rows: Vec<(MessageId,)> = zeph_db::query_as(
490 sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
491 )
492 .bind(conversation_id)
493 .bind(n)
494 .fetch_all(&self.pool)
495 .await?;
496 Ok(rows.into_iter().map(|r| r.0).collect())
497 }
498
499 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
505 let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
506 "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
507 ))
508 .fetch_optional(&self.pool)
509 .await?;
510 Ok(row.map(|r| r.0))
511 }
512
    /// Fetches a single live message by id, or `None` if absent or
    /// soft-deleted.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn message_by_id(
        &self,
        message_id: MessageId,
    ) -> Result<Option<Message>, MemoryError> {
        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
        )
        .bind(message_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(
            |(role_str, content, parts_json, agent_visible, user_visible)| {
                let parts = parse_parts_json(&role_str, &parts_json);
                Message {
                    role: parse_role(&role_str),
                    content,
                    parts,
                    metadata: MessageMetadata {
                        agent_visible: agent_visible != 0,
                        user_visible: user_visible != 0,
                        compacted_at: None,
                        deferred_summary: None,
                        focus_pinned: false,
                        focus_marker_id: None,
                        // NOTE(review): unlike `load_history`, `db_id` is left
                        // unset here even though `message_id` is known —
                        // confirm no caller relies on it being populated.
                        db_id: None,
                    },
                }
            },
        ))
    }
549
    /// Fetches agent-visible, live messages for the given ids as
    /// `(id, Message)` pairs; ids that are missing, hidden, or deleted are
    /// silently omitted. Returns an empty vec for empty input.
    ///
    /// Messages come back with default metadata (visibility flags are not
    /// selected on this path). No ORDER BY is applied, so result order is
    /// whatever the database returns — presumably id order; do not rely on
    /// it matching `ids`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn messages_by_ids(
        &self,
        ids: &[MessageId],
    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }

        // Dynamic IN clause: one `?` per id, bound below in order.
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");

        let query = format!(
            "SELECT id, role, content, parts FROM messages \
             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;

        Ok(rows
            .into_iter()
            .map(|(id, role_str, content, parts_json)| {
                let parts = parse_parts_json(&role_str, &parts_json);
                (
                    id,
                    Message {
                        role: parse_role(&role_str),
                        content,
                        parts,
                        metadata: MessageMetadata::default(),
                    },
                )
            })
            .collect())
    }
592
593 pub async fn unembedded_message_ids(
599 &self,
600 limit: Option<usize>,
601 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
602 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
603
604 let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
605 "SELECT m.id, m.conversation_id, m.role, m.content \
606 FROM messages m \
607 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
608 WHERE em.id IS NULL AND m.deleted_at IS NULL \
609 ORDER BY m.id ASC \
610 LIMIT ?"
611 ))
612 .bind(effective_limit)
613 .fetch_all(&self.pool)
614 .await?;
615
616 Ok(rows)
617 }
618
619 pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
625 let row: (i64,) = zeph_db::query_as(sql!(
626 "SELECT COUNT(*) FROM messages m \
627 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
628 WHERE em.id IS NULL AND m.deleted_at IS NULL"
629 ))
630 .fetch_one(&self.pool)
631 .await?;
632 Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
633 }
634
    /// Streams up to `limit` live messages lacking embeddings metadata,
    /// oldest first, as `(id, conversation_id, role, content)` rows.
    ///
    /// Driver errors are converted into [`MemoryError`] via `TryStreamExt`.
    /// The returned stream borrows `self.pool`, hence the `'_` lifetime.
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch(&self.pool)
        .map_err(MemoryError::from)
    }
662
663 pub async fn count_messages(
669 &self,
670 conversation_id: ConversationId,
671 ) -> Result<i64, MemoryError> {
672 let row: (i64,) = zeph_db::query_as(sql!(
673 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
674 ))
675 .bind(conversation_id)
676 .fetch_one(&self.pool)
677 .await?;
678 Ok(row.0)
679 }
680
681 pub async fn count_messages_after(
687 &self,
688 conversation_id: ConversationId,
689 after_id: MessageId,
690 ) -> Result<i64, MemoryError> {
691 let row: (i64,) =
692 zeph_db::query_as(
693 sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
694 )
695 .bind(conversation_id)
696 .bind(after_id)
697 .fetch_one(&self.pool)
698 .await?;
699 Ok(row.0)
700 }
701
    /// Full-text keyword search over agent-visible, live messages.
    ///
    /// The raw query is passed through [`sanitize_fts_query`]; an empty
    /// sanitized query short-circuits to no results. Scores are `-rank`, so
    /// higher is better. When `conversation_id` is given, results are
    /// restricted to that conversation.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn keyword_search(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        // Two static statements instead of one dynamic one: the optional
        // conversation filter changes the bind arity.
        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
            zeph_db::query_as(
                sql!("SELECT m.id, -rank AS score \
                      FROM messages_fts f \
                      JOIN messages m ON m.id = f.rowid \
                      WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
                      ORDER BY rank \
                      LIMIT ?"),
            )
            .bind(&safe_query)
            .bind(cid)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT m.id, -rank AS score \
                 FROM messages_fts f \
                 JOIN messages m ON m.id = f.rowid \
                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
                 ORDER BY rank \
                 LIMIT ?"
            ))
            .bind(&safe_query)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows)
    }
753
    /// Full-text keyword search with optional `created_at` bounds and an
    /// optional conversation filter.
    ///
    /// The WHERE clause is assembled dynamically from the present filters;
    /// bind order below must match the clause order (after, before,
    /// conversation, limit). Values are bound, never interpolated, so the
    /// `format!` is safe. Scores are `-rank` (higher is better).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn keyword_search_with_time_range(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
        after: Option<&str>,
        before: Option<&str>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        let after_clause = if after.is_some() {
            " AND m.created_at > ?"
        } else {
            ""
        };
        let before_clause = if before.is_some() {
            " AND m.created_at < ?"
        } else {
            ""
        };
        let conv_clause = if conversation_id.is_some() {
            " AND m.conversation_id = ?"
        } else {
            ""
        };

        let sql = format!(
            "SELECT m.id, -rank AS score \
             FROM messages_fts f \
             JOIN messages m ON m.id = f.rowid \
             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
             {after_clause}{before_clause}{conv_clause} \
             ORDER BY rank \
             LIMIT ?"
        );

        // Binds in the same order the optional clauses were appended.
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
        if let Some(a) = after {
            q = q.bind(a);
        }
        if let Some(b) = before {
            q = q.bind(b);
        }
        if let Some(cid) = conversation_id {
            q = q.bind(cid);
        }
        q = q.bind(effective_limit);

        Ok(q.fetch_all(&self.pool).await?)
    }
821
    /// Returns an `id -> created_at` map (epoch seconds) for the given live
    /// messages; absent or deleted ids are simply missing from the map.
    ///
    /// The dynamic `IN` list is passed through `rewrite_placeholders` and
    /// the epoch conversion expression comes from the active dialect.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn message_timestamps(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }

        let placeholders: String =
            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
        let query = format!(
            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
851
852 pub async fn load_messages_range(
858 &self,
859 conversation_id: ConversationId,
860 after_message_id: MessageId,
861 limit: usize,
862 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
863 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
864
865 let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
866 "SELECT id, role, content FROM messages \
867 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
868 ORDER BY id ASC LIMIT ?"
869 ))
870 .bind(conversation_id)
871 .bind(after_message_id)
872 .bind(effective_limit)
873 .fetch_all(&self.pool)
874 .await?;
875
876 Ok(rows)
877 }
878
879 pub async fn get_eviction_candidates(
887 &self,
888 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
889 let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
890 "SELECT id, created_at, last_accessed, access_count \
891 FROM messages WHERE deleted_at IS NULL"
892 ))
893 .fetch_all(&self.pool)
894 .await?;
895
896 Ok(rows
897 .into_iter()
898 .map(
899 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
900 id,
901 created_at,
902 last_accessed,
903 access_count: access_count.try_into().unwrap_or(0),
904 },
905 )
906 .collect())
907 }
908
909 pub async fn soft_delete_messages(
917 &self,
918 ids: &[MessageId],
919 ) -> Result<(), crate::error::MemoryError> {
920 if ids.is_empty() {
921 return Ok(());
922 }
923 for &id in ids {
925 zeph_db::query(
926 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
927 )
928 .bind(id)
929 .execute(&self.pool)
930 .await?;
931 }
932 Ok(())
933 }
934
935 pub async fn get_soft_deleted_message_ids(
941 &self,
942 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
943 let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
944 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
945 ))
946 .fetch_all(&self.pool)
947 .await?;
948 Ok(rows.into_iter().map(|(id,)| id).collect())
949 }
950
951 pub async fn mark_qdrant_cleaned(
957 &self,
958 ids: &[MessageId],
959 ) -> Result<(), crate::error::MemoryError> {
960 for &id in ids {
961 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
962 .bind(id)
963 .execute(&self.pool)
964 .await?;
965 }
966 Ok(())
967 }
968
    /// Returns an `id -> importance_score` map for the given live messages;
    /// absent or deleted ids are simply missing from the map.
    ///
    /// NOTE(review): unlike `message_timestamps`, the placeholder list here
    /// is not passed through `rewrite_placeholders` — confirm this is
    /// intended for non-SQLite dialects.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn fetch_importance_scores(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }
        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
994
    /// Bumps `access_count` and refreshes `last_accessed` for all given ids
    /// in a single batched UPDATE. No-op for empty input.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        if ids.is_empty() {
            return Ok(());
        }
        // Dynamic IN clause: one `?` per id, bound below in order.
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
             WHERE id IN ({placeholders})"
        );
        let mut q = zeph_db::query(&query);
        for &id in ids {
            q = q.bind(id);
        }
        q.execute(&self.pool).await?;
        Ok(())
    }
1018
    /// Finds live episodic messages seen in at least `min_sessions`
    /// sessions, ordered by session count then importance (both descending),
    /// capped at `batch_size`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn find_promotion_candidates(
        &self,
        min_sessions: u32,
        batch_size: usize,
    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
        let min = i64::from(min_sessions);
        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, conversation_id, content, session_count, importance_score \
             FROM messages \
             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
             ORDER BY session_count DESC, importance_score DESC \
             LIMIT ?"
        ))
        .bind(min)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(
                |(id, conversation_id, content, session_count, importance_score)| {
                    PromotionCandidate {
                        id,
                        conversation_id,
                        content,
                        // Negative counts (shouldn't happen) clamp to 0.
                        session_count: session_count.try_into().unwrap_or(0),
                        importance_score,
                    }
                },
            )
            .collect())
    }
1061
1062 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1070 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1071 "SELECT tier, COUNT(*) FROM messages \
1072 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1073 GROUP BY tier"
1074 ))
1075 .fetch_all(&self.pool)
1076 .await?;
1077
1078 let mut episodic = 0i64;
1079 let mut semantic = 0i64;
1080 for (tier, count) in rows {
1081 match tier.as_str() {
1082 "episodic" => episodic = count,
1083 "semantic" => semantic = count,
1084 _ => {}
1085 }
1086 }
1087 Ok((episodic, semantic))
1088 }
1089
1090 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1096 let row: (i64,) = zeph_db::query_as(sql!(
1097 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1098 ))
1099 .fetch_one(&self.pool)
1100 .await?;
1101 Ok(row.0)
1102 }
1103
    /// Merges episodic messages into one new semantic-tier row.
    ///
    /// In a single write transaction: inserts the merged content as an
    /// assistant message (agent-visible, user-hidden, tier `semantic`, with
    /// the dialect's epoch expression as promotion timestamp), then
    /// soft-deletes the originals and flags them for vector-store cleanup
    /// (`qdrant_cleaned = 0`). Returns the new row's id.
    ///
    /// # Errors
    /// Returns [`MemoryError::Other`] when `original_ids` is empty, or a
    /// database error if any statement or the commit fails.
    pub async fn promote_to_semantic(
        &self,
        conversation_id: ConversationId,
        merged_content: &str,
        original_ids: &[MessageId],
    ) -> Result<MessageId, MemoryError> {
        if original_ids.is_empty() {
            return Err(MemoryError::Other(
                "promote_to_semantic: original_ids must not be empty".into(),
            ));
        }

        let mut tx = begin_write(&self.pool).await?;

        // The promotion timestamp is a SQL expression, so the statement is
        // built with format! and placeholders rewritten for the dialect.
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let promote_insert_raw = format!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible, \
             tier, promotion_timestamp) \
             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
             RETURNING id"
        );
        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
            .bind(conversation_id)
            .bind(merged_content)
            .fetch_one(&mut *tx)
            .await?;

        let new_id = row.0;

        for &id in original_ids {
            zeph_db::query(sql!(
                "UPDATE messages \
                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
                 WHERE id = ? AND deleted_at IS NULL"
            ))
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(new_id)
    }
1165
    /// Manually promotes the given episodic messages to the semantic tier,
    /// stamping the dialect's epoch expression as the promotion timestamp.
    ///
    /// Returns the number of rows actually updated — ids that are deleted or
    /// not in the episodic tier are skipped by the WHERE clause.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any update fails.
    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
        if ids.is_empty() {
            return Ok(0);
        }
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let manual_promote_raw = format!(
            "UPDATE messages \
             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
        );
        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
        let mut count = 0usize;
        for &id in ids {
            let result = zeph_db::query(&manual_promote_sql)
                .bind(id)
                .execute(&self.pool)
                .await?;
            count += usize::try_from(result.rows_affected()).unwrap_or(0);
        }
        Ok(count)
    }
1195
1196 pub async fn increment_session_counts_for_conversation(
1205 &self,
1206 conversation_id: ConversationId,
1207 ) -> Result<(), MemoryError> {
1208 zeph_db::query(sql!(
1209 "UPDATE messages SET session_count = session_count + 1 \
1210 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1211 ))
1212 .bind(conversation_id)
1213 .execute(&self.pool)
1214 .await?;
1215 Ok(())
1216 }
1217
    /// Returns an `id -> tier` map for the given live messages; absent or
    /// deleted ids are simply missing from the map. Empty input
    /// short-circuits without touching the database.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn fetch_tiers(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }
        // Dynamic IN clause: one `?` per id, bound below in order.
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
        for &id in ids {
            q = q.bind(id);
        }
        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
1243
1244 pub async fn conversations_with_unconsolidated_messages(
1252 &self,
1253 ) -> Result<Vec<ConversationId>, MemoryError> {
1254 let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1255 "SELECT DISTINCT conversation_id FROM messages \
1256 WHERE consolidated = 0 AND deleted_at IS NULL"
1257 ))
1258 .fetch_all(&self.pool)
1259 .await?;
1260 Ok(rows.into_iter().map(|(id,)| id).collect())
1261 }
1262
1263 pub async fn find_unconsolidated_messages(
1272 &self,
1273 conversation_id: ConversationId,
1274 limit: usize,
1275 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1276 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1277 let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1278 "SELECT id, content FROM messages \
1279 WHERE conversation_id = ? \
1280 AND consolidated = 0 \
1281 AND deleted_at IS NULL \
1282 ORDER BY id ASC \
1283 LIMIT ?"
1284 ))
1285 .bind(conversation_id)
1286 .bind(limit)
1287 .fetch_all(&self.pool)
1288 .await?;
1289 Ok(rows)
1290 }
1291
1292 pub async fn find_consolidated_for_source(
1301 &self,
1302 source_id: MessageId,
1303 ) -> Result<Option<MessageId>, MemoryError> {
1304 let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1305 "SELECT consolidated_id FROM memory_consolidation_sources \
1306 WHERE source_id = ? \
1307 LIMIT 1"
1308 ))
1309 .bind(source_id)
1310 .fetch_optional(&self.pool)
1311 .await?;
1312 Ok(row.map(|(id,)| id))
1313 }
1314
    /// Merges several source messages into a new consolidated message.
    ///
    /// Skipped (returns `Ok(false)`) when `confidence` is below
    /// `confidence_threshold` or `source_ids` is empty. Otherwise, in one
    /// transaction: inserts the merged row (fully visible,
    /// `consolidated = 1`, with recomputed importance), records each source
    /// link in `memory_consolidation_sources` using the dialect's
    /// insert-or-ignore form, and marks each source message consolidated.
    /// Returns `Ok(true)` on success.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_consolidation_merge(
        &self,
        conversation_id: ConversationId,
        role: &str,
        merged_content: &str,
        source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }
        if source_ids.is_empty() {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        let importance = crate::semantic::importance::compute_importance(merged_content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible, \
             importance_score, consolidated, consolidation_confidence) \
             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
             RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(merged_content)
        .bind(importance)
        .bind(confidence)
        .fetch_one(&mut *tx)
        .await?;
        let consolidated_id = row.0;

        // Dialect-portable "insert or ignore": prefix + optional ON CONFLICT
        // suffix come from the active dialect.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in source_ids {
            zeph_db::query(&consol_sql)
                .bind(consolidated_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1385
    /// Updates an existing consolidated message with new content and links
    /// additional source messages to it.
    ///
    /// Returns `Ok(false)` without changes when `confidence` is below
    /// `confidence_threshold`. Otherwise, in one transaction: rewrites the
    /// target's content/confidence (and keeps it flagged consolidated),
    /// records each extra source link via the dialect's insert-or-ignore
    /// form, and marks those sources consolidated. Returns `Ok(true)`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any statement or the commit fails.
    pub async fn apply_consolidation_update(
        &self,
        target_id: MessageId,
        new_content: &str,
        additional_source_ids: &[MessageId],
        confidence: f32,
        confidence_threshold: f32,
    ) -> Result<bool, MemoryError> {
        if confidence < confidence_threshold {
            return Ok(false);
        }

        let mut tx = self.pool.begin().await?;

        zeph_db::query(sql!(
            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
        ))
        .bind(new_content)
        .bind(confidence)
        .bind(target_id)
        .execute(&mut *tx)
        .await?;

        // Dialect-portable "insert or ignore", same shape as
        // `apply_consolidation_merge`.
        let consol_sql = format!(
            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        for &source_id in additional_source_ids {
            zeph_db::query(&consol_sql)
                .bind(target_id)
                .bind(source_id)
                .execute(&mut *tx)
                .await?;

            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
                .bind(source_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(true)
    }
1442
1443 pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1453 zeph_db::query(sql!(
1454 "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1455 ))
1456 .bind(score)
1457 .bind(id)
1458 .execute(&self.pool)
1459 .await?;
1460 Ok(())
1461 }
1462
1463 pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1471 let row: Option<(f64,)> = zeph_db::query_as(sql!(
1472 "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1473 ))
1474 .bind(id)
1475 .fetch_optional(&self.pool)
1476 .await?;
1477 Ok(row.map(|(s,)| s))
1478 }
1479
    /// Increments the access counter for every message id in `ids`.
    ///
    /// Thin delegation to `increment_access_counts` — presumably kept as a
    /// naming alias for existing call sites; verify before removing.
    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        self.increment_access_counts(ids).await
    }
1490
1491 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1499 for &id in ids {
1500 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1501 .bind(id)
1502 .execute(&self.pool)
1503 .await?;
1504 }
1505 Ok(())
1506 }
1507
    /// Runs one forgetting sweep in a single transaction: decay, replay
    /// rescue, then prune.
    ///
    /// Steps (all restricted to non-deleted, non-consolidated messages):
    /// 1. Select up to `sweep_batch_size` candidates with the lowest
    ///    `importance_score`.
    /// 2. Downscale each candidate's score by a factor of `1 - decay_rate`.
    /// 3. "Replay": for candidates accessed within `replay_window_hours` or
    ///    with at least `replay_min_access_count` accesses, divide the score
    ///    back by the same factor (capped at 1.0), effectively undoing step 2.
    /// 4. Prune: soft-delete (set `deleted_at`) any message whose score is
    ///    below `forgetting_floor`, unless protected by recent access
    ///    (`protect_recent_hours`) or access count (`protect_min_access_count`).
    ///
    /// Returns the number of rows downscaled, replayed, and pruned.
    ///
    /// NOTE(review): `decay` and `floor` are interpolated into the SQL text via
    /// `format!` — acceptable here because they are numeric values derived from
    /// config, never user-supplied strings. The `datetime('now', …)` calls are
    /// SQLite syntax; confirm compatibility if another dialect is ever active.
    pub async fn run_forgetting_sweep_tx(
        &self,
        config: &zeph_common::config::memory::ForgettingConfig,
    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
        let mut tx = self.pool.begin().await?;

        // Widen config values once up front for SQL binding/interpolation.
        let decay = f64::from(config.decay_rate);
        let floor = f64::from(config.forgetting_floor);
        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
        let replay_hours = i64::from(config.replay_window_hours);
        let replay_min_access = i64::from(config.replay_min_access_count);
        let protect_hours = i64::from(config.protect_recent_hours);
        let protect_min_access = i64::from(config.protect_min_access_count);

        // Step 1: lowest-importance candidates, bounded by the batch size.
        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
            "SELECT id FROM messages \
             WHERE deleted_at IS NULL AND consolidated = 0 \
             ORDER BY importance_score ASC \
             LIMIT ?"
        ))
        .bind(batch)
        .fetch_all(&mut *tx)
        .await?;

        #[allow(clippy::cast_possible_truncation)]
        let downscaled = candidate_ids.len() as u32;

        // Step 2: multiply every candidate's score by (1 - decay).
        if downscaled > 0 {
            let placeholders: String = candidate_ids
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(",");
            let downscale_sql = format!(
                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
                 WHERE id IN ({placeholders})"
            );
            let mut q = zeph_db::query(&downscale_sql);
            for &(id,) in &candidate_ids {
                q = q.bind(id);
            }
            q.execute(&mut *tx).await?;
        }

        // Step 3: rescue recently/frequently accessed candidates by dividing
        // the score back by (1 - decay), capped at 1.0.
        let replayed = if downscaled > 0 {
            let replay_placeholders: String = candidate_ids
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(",");
            let replay_sql = format!(
                "UPDATE messages \
                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
                 WHERE id IN ({replay_placeholders}) \
                 AND (\
                    (last_accessed IS NOT NULL \
                    AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
                    OR access_count >= ?\
                 )"
            );
            let mut rq = zeph_db::query(&replay_sql);
            for &(id,) in &candidate_ids {
                rq = rq.bind(id);
            }
            let replay_result = rq
                .bind(replay_hours)
                .bind(replay_min_access)
                .execute(&mut *tx)
                .await?;
            #[allow(clippy::cast_possible_truncation)]
            let n = replay_result.rows_affected() as u32;
            n
        } else {
            0
        };

        // Step 4: soft-delete everything below the floor that is not protected
        // by recent access or a high access count. Note this scans all
        // non-consolidated messages, not just this sweep's candidates.
        let prune_sql = format!(
            "UPDATE messages \
             SET deleted_at = CURRENT_TIMESTAMP \
             WHERE deleted_at IS NULL AND consolidated = 0 \
             AND importance_score < {floor} \
             AND (\
                last_accessed IS NULL \
                OR last_accessed < datetime('now', '-' || ? || ' hours')\
             ) \
             AND access_count < ?"
        );
        let prune_result = zeph_db::query(&prune_sql)
            .bind(protect_hours)
            .bind(protect_min_access)
            .execute(&mut *tx)
            .await?;
        #[allow(clippy::cast_possible_truncation)]
        let pruned = prune_result.rows_affected() as u32;

        tx.commit().await?;

        Ok(crate::forgetting::ForgettingResult {
            downscaled,
            replayed,
            pruned,
        })
    }
1634}
1635
/// A message eligible for promotion (e.g. to cross-session memory).
///
/// Plain data carrier; the selection criteria live with the queries that
/// produce these candidates elsewhere in this store.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Id of the underlying message row.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Message text content.
    pub content: String,
    /// Presumably the number of sessions that touched this message —
    /// TODO confirm semantics against the producing query.
    pub session_count: u32,
    /// Current importance score of the message.
    pub importance_score: f64,
}
1645
1646#[cfg(test)]
1647mod tests;