1use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_common::ContextFidelity;
8use zeph_db::ActiveDialect;
9use zeph_db::fts::sanitize_fts_query;
10#[allow(unused_imports)]
11use zeph_db::{begin_write, placeholder_list, sql};
12use zeph_llm::provider::{Message, MessageMetadata, MessagePart, MessageVisibility, Role};
13
14use super::SqliteStore;
15use crate::error::MemoryError;
16use crate::types::{ConversationId, MessageId};
17
18fn parse_role(s: &str) -> Role {
19 match s {
20 "assistant" => Role::Assistant,
21 "system" => Role::System,
22 _ => Role::User,
23 }
24}
25
26#[must_use]
27pub fn role_str(role: Role) -> &'static str {
28 match role {
29 Role::System => "system",
30 Role::Assistant => "assistant",
31 Role::User | _ => "user",
32 }
33}
34
35fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
38 match key {
39 "Text" => Some("text"),
40 "ToolOutput" => Some("tool_output"),
41 "Recall" => Some("recall"),
42 "CodeContext" => Some("code_context"),
43 "Summary" => Some("summary"),
44 "CrossSession" => Some("cross_session"),
45 "ToolUse" => Some("tool_use"),
46 "ToolResult" => Some("tool_result"),
47 "Image" => Some("image"),
48 "ThinkingBlock" => Some("thinking_block"),
49 "RedactedThinkingBlock" => Some("redacted_thinking_block"),
50 "Compaction" => Some("compaction"),
51 _ => None,
52 }
53}
54
55fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
63 let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
64 let mut result = Vec::with_capacity(array.len());
65 for element in array {
66 let obj = element.as_object()?;
67 if obj.contains_key("kind") {
68 return None;
69 }
70 if obj.len() != 1 {
71 return None;
72 }
73 let (key, inner) = obj.iter().next()?;
74 let kind = legacy_key_to_kind(key)?;
75 let mut new_obj = match inner {
76 serde_json::Value::Object(m) => m.clone(),
77 other => {
79 let mut m = serde_json::Map::new();
80 m.insert("data".to_string(), other.clone());
81 m
82 }
83 };
84 new_obj.insert(
85 "kind".to_string(),
86 serde_json::Value::String(kind.to_string()),
87 );
88 let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
89 result.push(part);
90 }
91 Some(result)
92}
93
94fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
99 if parts_json == "[]" {
100 return vec![];
101 }
102 match serde_json::from_str(parts_json) {
103 Ok(p) => p,
104 Err(e) => {
105 if let Some(parts) = try_parse_legacy_parts(parts_json) {
106 let truncated = parts_json.chars().take(120).collect::<String>();
107 tracing::warn!(
108 role = %role_str,
109 parts_json = %truncated,
110 "loaded legacy-format message parts via compat path"
111 );
112 return parts;
113 }
114 let truncated = parts_json.chars().take(120).collect::<String>();
115 tracing::warn!(
116 role = %role_str,
117 parts_json = %truncated,
118 error = %e,
119 "failed to deserialize message parts, falling back to empty"
120 );
121 vec![]
122 }
123 }
124}
125
126impl SqliteStore {
127 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
133 let row: (ConversationId,) = zeph_db::query_as(sql!(
134 "INSERT INTO conversations DEFAULT VALUES RETURNING id"
135 ))
136 .fetch_one(&self.pool)
137 .await?;
138 Ok(row.0)
139 }
140
141 pub async fn save_message(
147 &self,
148 conversation_id: ConversationId,
149 role: &str,
150 content: &str,
151 ) -> Result<MessageId, MemoryError> {
152 self.save_message_with_parts(conversation_id, role, content, "[]")
153 .await
154 }
155
156 pub async fn save_message_with_parts(
162 &self,
163 conversation_id: ConversationId,
164 role: &str,
165 content: &str,
166 parts_json: &str,
167 ) -> Result<MessageId, MemoryError> {
168 self.save_message_with_metadata(
169 conversation_id,
170 role,
171 content,
172 parts_json,
173 MessageVisibility::Both,
174 )
175 .await
176 }
177
178 pub async fn save_message_with_category(
186 &self,
187 conversation_id: ConversationId,
188 role: &str,
189 content: &str,
190 category: Option<&str>,
191 ) -> Result<MessageId, MemoryError> {
192 let importance_score = crate::semantic::importance::compute_importance(content, role);
193 let row: (MessageId,) = zeph_db::query_as(sql!(
194 "INSERT INTO messages \
195 (conversation_id, role, content, parts, visibility, \
196 importance_score, category) \
197 VALUES (?, ?, ?, '[]', 'both', ?, ?) RETURNING id"
198 ))
199 .bind(conversation_id)
200 .bind(role)
201 .bind(content)
202 .bind(importance_score)
203 .bind(category)
204 .fetch_one(&self.pool)
205 .await?;
206 Ok(row.0)
207 }
208
209 pub async fn save_message_with_metadata(
215 &self,
216 conversation_id: ConversationId,
217 role: &str,
218 content: &str,
219 parts_json: &str,
220 visibility: MessageVisibility,
221 ) -> Result<MessageId, MemoryError> {
222 const MAX_BYTES: usize = 100 * 1024;
223
224 let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
227 let boundary = content.floor_char_boundary(MAX_BYTES);
228 tracing::debug!(
229 original_bytes = content.len(),
230 "save_message: content exceeds 100KB, truncating"
231 );
232 std::borrow::Cow::Owned(format!(
233 "{}... [truncated, {} bytes total]",
234 &content[..boundary],
235 content.len()
236 ))
237 } else {
238 std::borrow::Cow::Borrowed(content)
239 };
240
241 let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
242 let row: (MessageId,) = zeph_db::query_as(
243 sql!("INSERT INTO messages (conversation_id, role, content, parts, visibility, importance_score) \
244 VALUES (?, ?, ?, ?, ?, ?) RETURNING id"),
245 )
246 .bind(conversation_id)
247 .bind(role)
248 .bind(content_cow.as_ref())
249 .bind(parts_json)
250 .bind(visibility.as_db_str())
251 .bind(importance_score)
252 .fetch_one(&self.pool)
253 .await?;
254 Ok(row.0)
255 }
256
257 pub async fn load_history(
267 &self,
268 conversation_id: ConversationId,
269 limit: u32,
270 ) -> Result<Vec<Message>, MemoryError> {
271 let rows: Vec<(String, String, String, String, i64, i32)> = zeph_db::query_as(sql!(
272 "SELECT role, content, parts, visibility, id, fidelity_tag FROM (\
273 SELECT role, content, parts, visibility, id, fidelity_tag FROM messages \
274 WHERE conversation_id = ? AND deleted_at IS NULL \
275 ORDER BY id DESC \
276 LIMIT ?\
277 ) ORDER BY id ASC"
278 ))
279 .bind(conversation_id)
280 .bind(i64::from(limit))
281 .fetch_all(&self.pool)
282 .await?;
283
284 let messages = rows
285 .into_iter()
286 .map(
287 |(role_str, content, parts_json, visibility_str, row_id, fidelity_raw)| {
288 let parts = parse_parts_json(&role_str, &parts_json);
289 Message {
290 role: parse_role(&role_str),
291 content,
292 parts,
293 metadata: MessageMetadata {
294 visibility: MessageVisibility::from_db_str(&visibility_str),
295 compacted_at: None,
296 deferred_summary: None,
297 focus_pinned: false,
298 focus_marker_id: None,
299 db_id: Some(row_id),
300 fidelity_tag: if fidelity_raw == 0 {
301 None
302 } else {
303 u8::try_from(fidelity_raw)
304 .ok()
305 .map(ContextFidelity::from_u8)
306 },
307 embedding: None,
308 },
309 }
310 },
311 )
312 .collect();
313 Ok(messages)
314 }
315
316 pub async fn load_history_filtered(
328 &self,
329 conversation_id: ConversationId,
330 limit: u32,
331 agent_visible: Option<bool>,
332 user_visible: Option<bool>,
333 ) -> Result<Vec<Message>, MemoryError> {
334 let exclude_user_only = agent_visible == Some(true);
340 let exclude_agent_only = user_visible == Some(true);
341
342 let rows: Vec<(String, String, String, String, i64, i32)> = zeph_db::query_as(sql!(
343 "WITH recent AS (\
344 SELECT role, content, parts, visibility, id, fidelity_tag FROM messages \
345 WHERE conversation_id = ? \
346 AND deleted_at IS NULL \
347 AND (NOT ? OR visibility != 'user_only') \
348 AND (NOT ? OR visibility != 'agent_only') \
349 ORDER BY id DESC \
350 LIMIT ?\
351 ) SELECT role, content, parts, visibility, id, fidelity_tag FROM recent ORDER BY id ASC"
352 ))
353 .bind(conversation_id)
354 .bind(exclude_user_only)
355 .bind(exclude_agent_only)
356 .bind(i64::from(limit))
357 .fetch_all(&self.pool)
358 .await?;
359
360 let messages = rows
361 .into_iter()
362 .map(
363 |(role_str, content, parts_json, visibility_str, row_id, fidelity_raw)| {
364 let parts = parse_parts_json(&role_str, &parts_json);
365 Message {
366 role: parse_role(&role_str),
367 content,
368 parts,
369 metadata: MessageMetadata {
370 visibility: MessageVisibility::from_db_str(&visibility_str),
371 compacted_at: None,
372 deferred_summary: None,
373 focus_pinned: false,
374 focus_marker_id: None,
375 db_id: Some(row_id),
376 fidelity_tag: if fidelity_raw == 0 {
377 None
378 } else {
379 u8::try_from(fidelity_raw)
380 .ok()
381 .map(ContextFidelity::from_u8)
382 },
383 embedding: None,
384 },
385 }
386 },
387 )
388 .collect();
389 Ok(messages)
390 }
391
392 pub async fn update_fidelity_tags(
405 &self,
406 updates: &[(MessageId, u8)],
407 ) -> Result<(), MemoryError> {
408 const MAX_FIDELITY_BATCH: usize = 333;
411 if updates.is_empty() {
412 return Ok(());
413 }
414 let mut tx = self.pool.begin().await?;
415 for chunk in updates.chunks(MAX_FIDELITY_BATCH) {
416 let case_arms: String = chunk
417 .iter()
418 .map(|_| "WHEN ? THEN ?")
419 .collect::<Vec<_>>()
420 .join(" ");
421 let in_list: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
422 let sql = format!(
423 "UPDATE messages SET fidelity_tag = CASE id {case_arms} END WHERE id IN ({in_list})"
424 );
425 let mut q = zeph_db::query(&sql);
426 for &(id, tag) in chunk {
427 q = q.bind(id.0).bind(i32::from(tag));
428 }
429 for &(id, _) in chunk {
430 q = q.bind(id.0);
431 }
432 q.execute(&mut *tx).await?;
433 }
434 tx.commit().await?;
435 Ok(())
436 }
437
438 pub async fn replace_conversation(
450 &self,
451 conversation_id: ConversationId,
452 compacted_range: std::ops::RangeInclusive<MessageId>,
453 summary_role: &str,
454 summary_content: &str,
455 ) -> Result<MessageId, MemoryError> {
456 let now = {
457 let secs = std::time::SystemTime::now()
458 .duration_since(std::time::UNIX_EPOCH)
459 .unwrap_or_default()
460 .as_secs();
461 format!("{secs}")
462 };
463 let start_id = compacted_range.start().0;
464 let end_id = compacted_range.end().0;
465
466 let mut tx = self.pool.begin().await?;
467
468 zeph_db::query(sql!(
469 "UPDATE messages SET visibility = 'user_only', compacted_at = ? \
470 WHERE conversation_id = ? AND id >= ? AND id <= ?"
471 ))
472 .bind(&now)
473 .bind(conversation_id)
474 .bind(start_id)
475 .bind(end_id)
476 .execute(&mut *tx)
477 .await?;
478
479 let row: (MessageId,) = zeph_db::query_as(sql!(
481 "INSERT INTO messages \
482 (conversation_id, role, content, parts, visibility) \
483 VALUES (?, ?, ?, '[]', 'agent_only') RETURNING id"
484 ))
485 .bind(conversation_id)
486 .bind(summary_role)
487 .bind(summary_content)
488 .fetch_one(&mut *tx)
489 .await?;
490
491 tx.commit().await?;
492
493 Ok(row.0)
494 }
495
496 pub async fn apply_tool_pair_summaries(
506 &self,
507 conversation_id: ConversationId,
508 hide_ids: &[i64],
509 summaries: &[String],
510 ) -> Result<(), MemoryError> {
511 if hide_ids.is_empty() && summaries.is_empty() {
512 return Ok(());
513 }
514
515 let now = std::time::SystemTime::now()
516 .duration_since(std::time::UNIX_EPOCH)
517 .unwrap_or_default()
518 .as_secs()
519 .to_string();
520
521 let mut tx = self.pool.begin().await?;
522
523 for &id in hide_ids {
524 zeph_db::query(sql!(
525 "UPDATE messages SET visibility = 'user_only', compacted_at = ? WHERE id = ?"
526 ))
527 .bind(&now)
528 .bind(id)
529 .execute(&mut *tx)
530 .await?;
531 }
532
533 for summary in summaries {
534 let content = format!("[tool summary] {summary}");
535 let parts = serde_json::to_string(&[MessagePart::Summary {
536 text: summary.clone(),
537 }])
538 .unwrap_or_else(|_| "[]".to_string());
539 zeph_db::query(sql!(
540 "INSERT INTO messages \
541 (conversation_id, role, content, parts, visibility) \
542 VALUES (?, 'assistant', ?, ?, 'agent_only')"
543 ))
544 .bind(conversation_id)
545 .bind(&content)
546 .bind(&parts)
547 .execute(&mut *tx)
548 .await?;
549 }
550
551 tx.commit().await?;
552 Ok(())
553 }
554
555 pub async fn oldest_message_ids(
561 &self,
562 conversation_id: ConversationId,
563 n: u32,
564 ) -> Result<Vec<MessageId>, MemoryError> {
565 let rows: Vec<(MessageId,)> = zeph_db::query_as(
566 sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
567 )
568 .bind(conversation_id)
569 .bind(i64::from(n))
570 .fetch_all(&self.pool)
571 .await?;
572 Ok(rows.into_iter().map(|r| r.0).collect())
573 }
574
575 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
581 let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
582 "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
583 ))
584 .fetch_optional(&self.pool)
585 .await?;
586 Ok(row.map(|r| r.0))
587 }
588
589 pub async fn message_by_id(
595 &self,
596 message_id: MessageId,
597 ) -> Result<Option<Message>, MemoryError> {
598 let row: Option<(String, String, String, String)> = zeph_db::query_as(
599 sql!("SELECT role, content, parts, visibility FROM messages WHERE id = ? AND deleted_at IS NULL"),
600 )
601 .bind(message_id)
602 .fetch_optional(&self.pool)
603 .await?;
604
605 Ok(row.map(|(role_str, content, parts_json, visibility_str)| {
606 let parts = parse_parts_json(&role_str, &parts_json);
607 Message {
608 role: parse_role(&role_str),
609 content,
610 parts,
611 metadata: MessageMetadata {
612 visibility: MessageVisibility::from_db_str(&visibility_str),
613 compacted_at: None,
614 deferred_summary: None,
615 focus_pinned: false,
616 focus_marker_id: None,
617 db_id: None,
618 fidelity_tag: None,
619 embedding: None,
620 },
621 }
622 }))
623 }
624
625 pub async fn messages_by_ids(
631 &self,
632 ids: &[MessageId],
633 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
634 if ids.is_empty() {
635 return Ok(Vec::new());
636 }
637
638 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
639
640 let query = format!(
641 "SELECT id, role, content, parts FROM messages \
642 WHERE id IN ({placeholders}) AND visibility != 'user_only' AND deleted_at IS NULL"
643 );
644 let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
645 for &id in ids {
646 q = q.bind(id);
647 }
648
649 let rows = q.fetch_all(&self.pool).await?;
650
651 Ok(rows
652 .into_iter()
653 .map(|(id, role_str, content, parts_json)| {
654 let parts = parse_parts_json(&role_str, &parts_json);
655 (
656 id,
657 Message {
658 role: parse_role(&role_str),
659 content,
660 parts,
661 metadata: MessageMetadata {
662 db_id: Some(id.0),
663 ..MessageMetadata::default()
664 },
665 },
666 )
667 })
668 .collect())
669 }
670
671 pub async fn unembedded_message_ids(
677 &self,
678 limit: Option<usize>,
679 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
680 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
681
682 let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
683 "SELECT m.id, m.conversation_id, m.role, m.content \
684 FROM messages m \
685 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
686 WHERE em.id IS NULL AND m.deleted_at IS NULL \
687 ORDER BY m.id ASC \
688 LIMIT ?"
689 ))
690 .bind(effective_limit)
691 .fetch_all(&self.pool)
692 .await?;
693
694 Ok(rows)
695 }
696
697 pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
703 let row: (i64,) = zeph_db::query_as(sql!(
704 "SELECT COUNT(*) FROM messages m \
705 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
706 WHERE em.id IS NULL AND m.deleted_at IS NULL"
707 ))
708 .fetch_one(&self.pool)
709 .await?;
710 Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
711 }
712
713 pub fn stream_unembedded_messages(
724 &self,
725 limit: i64,
726 ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
727 {
728 zeph_db::query_as(sql!(
729 "SELECT m.id, m.conversation_id, m.role, m.content \
730 FROM messages m \
731 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
732 WHERE em.id IS NULL AND m.deleted_at IS NULL \
733 ORDER BY m.id ASC \
734 LIMIT ?"
735 ))
736 .bind(limit)
737 .fetch(&self.pool)
738 .map_err(MemoryError::from)
739 }
740
741 pub async fn count_messages(
747 &self,
748 conversation_id: ConversationId,
749 ) -> Result<i64, MemoryError> {
750 let row: (i64,) = zeph_db::query_as(sql!(
751 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
752 ))
753 .bind(conversation_id)
754 .fetch_one(&self.pool)
755 .await?;
756 Ok(row.0)
757 }
758
759 pub async fn count_messages_after(
765 &self,
766 conversation_id: ConversationId,
767 after_id: MessageId,
768 ) -> Result<i64, MemoryError> {
769 let row: (i64,) =
770 zeph_db::query_as(
771 sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
772 )
773 .bind(conversation_id)
774 .bind(after_id)
775 .fetch_one(&self.pool)
776 .await?;
777 Ok(row.0)
778 }
779
780 pub async fn keyword_search(
789 &self,
790 query: &str,
791 limit: usize,
792 conversation_id: Option<ConversationId>,
793 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
794 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
795 let safe_query = sanitize_fts_query(query);
796 if safe_query.is_empty() {
797 return Ok(Vec::new());
798 }
799
800 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
801 zeph_db::query_as(
802 sql!("SELECT m.id, -rank AS score \
803 FROM messages_fts f \
804 JOIN messages m ON m.id = f.rowid \
805 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
806 ORDER BY rank \
807 LIMIT ?"),
808 )
809 .bind(&safe_query)
810 .bind(cid)
811 .bind(effective_limit)
812 .fetch_all(&self.pool)
813 .await?
814 } else {
815 zeph_db::query_as(sql!(
816 "SELECT m.id, -rank AS score \
817 FROM messages_fts f \
818 JOIN messages m ON m.id = f.rowid \
819 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
820 ORDER BY rank \
821 LIMIT ?"
822 ))
823 .bind(&safe_query)
824 .bind(effective_limit)
825 .fetch_all(&self.pool)
826 .await?
827 };
828
829 Ok(rows)
830 }
831
832 pub async fn keyword_search_with_time_range(
845 &self,
846 query: &str,
847 limit: usize,
848 conversation_id: Option<ConversationId>,
849 after: Option<&str>,
850 before: Option<&str>,
851 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
852 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
853 let safe_query = sanitize_fts_query(query);
854 if safe_query.is_empty() {
855 return Ok(Vec::new());
856 }
857
858 let after_clause = if after.is_some() {
860 " AND m.created_at > ?"
861 } else {
862 ""
863 };
864 let before_clause = if before.is_some() {
865 " AND m.created_at < ?"
866 } else {
867 ""
868 };
869 let conv_clause = if conversation_id.is_some() {
870 " AND m.conversation_id = ?"
871 } else {
872 ""
873 };
874
875 let sql = format!(
876 "SELECT m.id, -rank AS score \
877 FROM messages_fts f \
878 JOIN messages m ON m.id = f.rowid \
879 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL\
880 {after_clause}{before_clause}{conv_clause} \
881 ORDER BY rank \
882 LIMIT ?"
883 );
884
885 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
886 if let Some(a) = after {
887 q = q.bind(a);
888 }
889 if let Some(b) = before {
890 q = q.bind(b);
891 }
892 if let Some(cid) = conversation_id {
893 q = q.bind(cid);
894 }
895 q = q.bind(effective_limit);
896
897 Ok(q.fetch_all(&self.pool).await?)
898 }
899
900 #[tracing::instrument(name = "memory.store.message_timestamps", skip(self, ids), fields(count = ids.len()))]
908 pub async fn message_timestamps(
909 &self,
910 ids: &[MessageId],
911 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
912 if ids.is_empty() {
913 return Ok(std::collections::HashMap::new());
914 }
915
916 let placeholders: String =
917 zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
918 let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
919 let query = format!(
920 "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
921 );
922 let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
923 for &id in ids {
924 q = q.bind(id);
925 }
926
927 let rows = q.fetch_all(&self.pool).await?;
928 Ok(rows.into_iter().collect())
929 }
930
931 pub async fn load_messages_range(
937 &self,
938 conversation_id: ConversationId,
939 after_message_id: MessageId,
940 limit: usize,
941 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
942 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
943
944 let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
945 "SELECT id, role, content FROM messages \
946 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
947 ORDER BY id ASC LIMIT ?"
948 ))
949 .bind(conversation_id)
950 .bind(after_message_id)
951 .bind(effective_limit)
952 .fetch_all(&self.pool)
953 .await?;
954
955 Ok(rows)
956 }
957
958 pub async fn get_eviction_candidates(
966 &self,
967 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
968 let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
969 "SELECT id, created_at, last_accessed, access_count \
970 FROM messages WHERE deleted_at IS NULL"
971 ))
972 .fetch_all(&self.pool)
973 .await?;
974
975 Ok(rows
976 .into_iter()
977 .map(
978 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
979 id,
980 created_at,
981 last_accessed,
982 access_count: access_count.try_into().unwrap_or(0),
983 },
984 )
985 .collect())
986 }
987
988 pub async fn soft_delete_messages(
996 &self,
997 ids: &[MessageId],
998 ) -> Result<(), crate::error::MemoryError> {
999 if ids.is_empty() {
1000 return Ok(());
1001 }
1002 for &id in ids {
1004 zeph_db::query(
1005 sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
1006 )
1007 .bind(id)
1008 .execute(&self.pool)
1009 .await?;
1010 }
1011 Ok(())
1012 }
1013
1014 pub async fn get_soft_deleted_message_ids(
1020 &self,
1021 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
1022 let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1023 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
1024 ))
1025 .fetch_all(&self.pool)
1026 .await?;
1027 Ok(rows.into_iter().map(|(id,)| id).collect())
1028 }
1029
1030 pub async fn filter_out_preserved_episode_ids(
1045 &self,
1046 candidate_ids: &[MessageId],
1047 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
1048 const MAX_BATCH: usize = 490;
1049 if candidate_ids.is_empty() {
1050 return Ok(Vec::new());
1051 }
1052 let mut safe_to_delete: Vec<MessageId> = Vec::with_capacity(candidate_ids.len());
1053 for chunk in candidate_ids.chunks(MAX_BATCH) {
1054 let placeholders = placeholder_list(1, chunk.len());
1055 let sql = format!(
1056 "SELECT m.id \
1057 FROM messages m \
1058 WHERE m.id IN ({placeholders}) \
1059 AND NOT EXISTS ( \
1060 SELECT 1 \
1061 FROM summaries s \
1062 WHERE s.first_message_id IS NOT NULL \
1063 AND s.last_message_id IS NOT NULL \
1064 AND m.id >= s.first_message_id \
1065 AND m.id <= s.last_message_id \
1066 )"
1067 );
1068 let mut q = zeph_db::query_as::<_, (MessageId,)>(&sql);
1069 for &id in chunk {
1070 q = q.bind(id);
1071 }
1072 let rows: Vec<(MessageId,)> = q.fetch_all(&self.pool).await?;
1073 safe_to_delete.extend(rows.into_iter().map(|(id,)| id));
1074 }
1075 Ok(safe_to_delete)
1076 }
1077
1078 pub async fn mark_qdrant_cleaned(
1084 &self,
1085 ids: &[MessageId],
1086 ) -> Result<(), crate::error::MemoryError> {
1087 for &id in ids {
1088 zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
1089 .bind(id)
1090 .execute(&self.pool)
1091 .await?;
1092 }
1093 Ok(())
1094 }
1095
1096 pub async fn fetch_importance_scores(
1104 &self,
1105 ids: &[MessageId],
1106 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
1107 if ids.is_empty() {
1108 return Ok(std::collections::HashMap::new());
1109 }
1110 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1111 let query = format!(
1112 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1113 );
1114 let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
1115 for &id in ids {
1116 q = q.bind(id);
1117 }
1118 let rows = q.fetch_all(&self.pool).await?;
1119 Ok(rows.into_iter().collect())
1120 }
1121
1122 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1130 if ids.is_empty() {
1131 return Ok(());
1132 }
1133 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1134 let query = format!(
1135 "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
1136 WHERE id IN ({placeholders})"
1137 );
1138 let mut q = zeph_db::query(&query);
1139 for &id in ids {
1140 q = q.bind(id);
1141 }
1142 q.execute(&self.pool).await?;
1143 Ok(())
1144 }
1145
1146 #[tracing::instrument(name = "memory.store.message_access_counts", skip(self, ids), fields(count = ids.len()))]
1157 pub async fn message_access_counts(
1158 &self,
1159 ids: &[MessageId],
1160 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
1161 if ids.is_empty() {
1162 return Ok(std::collections::HashMap::new());
1163 }
1164 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1165 let query = format!(
1166 "SELECT id, access_count FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1167 );
1168 let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
1169 for &id in ids {
1170 q = q.bind(id);
1171 }
1172 let rows = q.fetch_all(&self.pool).await?;
1173 Ok(rows.into_iter().collect())
1174 }
1175
1176 pub async fn find_promotion_candidates(
1185 &self,
1186 min_sessions: u32,
1187 batch_size: usize,
1188 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
1189 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
1190 let min = i64::from(min_sessions);
1191 let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1192 "SELECT id, conversation_id, content, session_count, importance_score \
1193 FROM messages \
1194 WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1195 ORDER BY session_count DESC, importance_score DESC \
1196 LIMIT ?"
1197 ))
1198 .bind(min)
1199 .bind(limit)
1200 .fetch_all(&self.pool)
1201 .await?;
1202
1203 Ok(rows
1204 .into_iter()
1205 .map(
1206 |(id, conversation_id, content, session_count, importance_score)| {
1207 PromotionCandidate {
1208 id,
1209 conversation_id,
1210 content,
1211 session_count: session_count.try_into().unwrap_or(0),
1212 importance_score,
1213 }
1214 },
1215 )
1216 .collect())
1217 }
1218
1219 pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1227 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1228 "SELECT tier, COUNT(*) FROM messages \
1229 WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1230 GROUP BY tier"
1231 ))
1232 .fetch_all(&self.pool)
1233 .await?;
1234
1235 let mut episodic = 0i64;
1236 let mut semantic = 0i64;
1237 for (tier, count) in rows {
1238 match tier.as_str() {
1239 "episodic" => episodic = count,
1240 "semantic" => semantic = count,
1241 _ => {}
1242 }
1243 }
1244 Ok((episodic, semantic))
1245 }
1246
1247 pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1253 let row: (i64,) = zeph_db::query_as(sql!(
1254 "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1255 ))
1256 .fetch_one(&self.pool)
1257 .await?;
1258 Ok(row.0)
1259 }
1260
1261 pub async fn promote_to_semantic(
1274 &self,
1275 conversation_id: ConversationId,
1276 merged_content: &str,
1277 original_ids: &[MessageId],
1278 ) -> Result<MessageId, MemoryError> {
1279 if original_ids.is_empty() {
1280 return Err(MemoryError::InvalidInput(
1281 "promote_to_semantic: original_ids must not be empty".into(),
1282 ));
1283 }
1284
1285 let mut tx = begin_write(&self.pool).await?;
1288
1289 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1291 let promote_insert_raw = format!(
1292 "INSERT INTO messages \
1293 (conversation_id, role, content, parts, visibility, \
1294 tier, promotion_timestamp) \
1295 VALUES (?, 'assistant', ?, '[]', 'agent_only', 'semantic', {epoch_now}) \
1296 RETURNING id"
1297 );
1298 let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1299 let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1300 .bind(conversation_id)
1301 .bind(merged_content)
1302 .fetch_one(&mut *tx)
1303 .await?;
1304
1305 let new_id = row.0;
1306
1307 for &id in original_ids {
1309 zeph_db::query(sql!(
1310 "UPDATE messages \
1311 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1312 WHERE id = ? AND deleted_at IS NULL"
1313 ))
1314 .bind(id)
1315 .execute(&mut *tx)
1316 .await?;
1317 }
1318
1319 tx.commit().await?;
1320 Ok(new_id)
1321 }
1322
1323 pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1332 if ids.is_empty() {
1333 return Ok(0);
1334 }
1335 let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1336 let manual_promote_raw = format!(
1337 "UPDATE messages \
1338 SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1339 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1340 );
1341 let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1342 let mut count = 0usize;
1343 for &id in ids {
1344 let result = zeph_db::query(&manual_promote_sql)
1345 .bind(id)
1346 .execute(&self.pool)
1347 .await?;
1348 count += usize::try_from(result.rows_affected()).unwrap_or(0);
1349 }
1350 Ok(count)
1351 }
1352
1353 pub async fn increment_session_counts_for_conversation(
1362 &self,
1363 conversation_id: ConversationId,
1364 ) -> Result<(), MemoryError> {
1365 zeph_db::query(sql!(
1366 "UPDATE messages SET session_count = session_count + 1 \
1367 WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1368 ))
1369 .bind(conversation_id)
1370 .execute(&self.pool)
1371 .await?;
1372 Ok(())
1373 }
1374
1375 #[tracing::instrument(name = "memory.store.fetch_tiers", skip(self, ids), fields(count = ids.len()))]
1383 pub async fn fetch_tiers(
1384 &self,
1385 ids: &[MessageId],
1386 ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1387 if ids.is_empty() {
1388 return Ok(std::collections::HashMap::new());
1389 }
1390 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1391 let query = format!(
1392 "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1393 );
1394 let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1395 for &id in ids {
1396 q = q.bind(id);
1397 }
1398 let rows = q.fetch_all(&self.pool).await?;
1399 Ok(rows.into_iter().collect())
1400 }
1401
1402 pub async fn conversations_with_unconsolidated_messages(
1410 &self,
1411 ) -> Result<Vec<ConversationId>, MemoryError> {
1412 let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1413 "SELECT DISTINCT conversation_id FROM messages \
1414 WHERE consolidated = 0 AND deleted_at IS NULL"
1415 ))
1416 .fetch_all(&self.pool)
1417 .await?;
1418 Ok(rows.into_iter().map(|(id,)| id).collect())
1419 }
1420
1421 pub async fn find_unconsolidated_messages(
1430 &self,
1431 conversation_id: ConversationId,
1432 limit: usize,
1433 ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1434 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1435 let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1436 "SELECT id, content FROM messages \
1437 WHERE conversation_id = ? \
1438 AND consolidated = 0 \
1439 AND deleted_at IS NULL \
1440 ORDER BY id ASC \
1441 LIMIT ?"
1442 ))
1443 .bind(conversation_id)
1444 .bind(limit)
1445 .fetch_all(&self.pool)
1446 .await?;
1447 Ok(rows)
1448 }
1449
1450 pub async fn find_consolidated_for_source(
1459 &self,
1460 source_id: MessageId,
1461 ) -> Result<Option<MessageId>, MemoryError> {
1462 let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1463 "SELECT consolidated_id FROM memory_consolidation_sources \
1464 WHERE source_id = ? \
1465 LIMIT 1"
1466 ))
1467 .bind(source_id)
1468 .fetch_optional(&self.pool)
1469 .await?;
1470 Ok(row.map(|(id,)| id))
1471 }
1472
1473 pub async fn apply_consolidation_merge(
1487 &self,
1488 conversation_id: ConversationId,
1489 role: &str,
1490 merged_content: &str,
1491 source_ids: &[MessageId],
1492 confidence: f32,
1493 confidence_threshold: f32,
1494 ) -> Result<bool, MemoryError> {
1495 if confidence < confidence_threshold {
1496 return Ok(false);
1497 }
1498 if source_ids.is_empty() {
1499 return Ok(false);
1500 }
1501
1502 let mut tx = self.pool.begin().await?;
1503
1504 let importance = crate::semantic::importance::compute_importance(merged_content, role);
1505 let row: (MessageId,) = zeph_db::query_as(sql!(
1506 "INSERT INTO messages \
1507 (conversation_id, role, content, parts, visibility, \
1508 importance_score, consolidated, consolidation_confidence) \
1509 VALUES (?, ?, ?, '[]', 'both', ?, 1, ?) \
1510 RETURNING id"
1511 ))
1512 .bind(conversation_id)
1513 .bind(role)
1514 .bind(merged_content)
1515 .bind(importance)
1516 .bind(confidence)
1517 .fetch_one(&mut *tx)
1518 .await?;
1519 let consolidated_id = row.0;
1520
1521 let consol_sql = format!(
1522 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1523 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1524 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1525 );
1526 for &source_id in source_ids {
1527 zeph_db::query(&consol_sql)
1528 .bind(consolidated_id)
1529 .bind(source_id)
1530 .execute(&mut *tx)
1531 .await?;
1532
1533 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1535 .bind(source_id)
1536 .execute(&mut *tx)
1537 .await?;
1538 }
1539
1540 tx.commit().await?;
1541 Ok(true)
1542 }
1543
1544 pub async fn apply_consolidation_update(
1557 &self,
1558 target_id: MessageId,
1559 new_content: &str,
1560 additional_source_ids: &[MessageId],
1561 confidence: f32,
1562 confidence_threshold: f32,
1563 ) -> Result<bool, MemoryError> {
1564 if confidence < confidence_threshold {
1565 return Ok(false);
1566 }
1567
1568 let mut tx = self.pool.begin().await?;
1569
1570 zeph_db::query(sql!(
1571 "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1572 ))
1573 .bind(new_content)
1574 .bind(confidence)
1575 .bind(target_id)
1576 .execute(&mut *tx)
1577 .await?;
1578
1579 let consol_sql = format!(
1580 "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1581 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1582 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1583 );
1584 for &source_id in additional_source_ids {
1585 zeph_db::query(&consol_sql)
1586 .bind(target_id)
1587 .bind(source_id)
1588 .execute(&mut *tx)
1589 .await?;
1590
1591 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1592 .bind(source_id)
1593 .execute(&mut *tx)
1594 .await?;
1595 }
1596
1597 tx.commit().await?;
1598 Ok(true)
1599 }
1600
1601 pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1611 zeph_db::query(sql!(
1612 "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1613 ))
1614 .bind(score)
1615 .bind(id)
1616 .execute(&self.pool)
1617 .await?;
1618 Ok(())
1619 }
1620
1621 pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1629 let row: Option<(f64,)> = zeph_db::query_as(sql!(
1630 "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1631 ))
1632 .bind(id)
1633 .fetch_optional(&self.pool)
1634 .await?;
1635 Ok(row.map(|(s,)| s))
1636 }
1637
1638 pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1646 self.increment_access_counts(ids).await
1647 }
1648
1649 pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1657 for &id in ids {
1658 zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1659 .bind(id)
1660 .execute(&self.pool)
1661 .await?;
1662 }
1663 Ok(())
1664 }
1665
1666 pub async fn run_forgetting_sweep_tx(
1682 &self,
1683 config: &zeph_common::config::memory::ForgettingConfig,
1684 ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1685 let mut tx = self.pool.begin().await?;
1686
1687 let decay = f64::from(config.decay_rate);
1688 let floor = f64::from(config.forgetting_floor);
1689 let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1690 let replay_hours = i64::from(config.replay_window_hours);
1691 let replay_min_access = i64::from(config.replay_min_access_count);
1692 let protect_hours = i64::from(config.protect_recent_hours);
1693 let protect_min_access = i64::from(config.protect_min_access_count);
1694
1695 let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1698 "SELECT id FROM messages \
1699 WHERE deleted_at IS NULL AND consolidated = 0 \
1700 ORDER BY importance_score ASC \
1701 LIMIT ?"
1702 ))
1703 .bind(batch)
1704 .fetch_all(&mut *tx)
1705 .await?;
1706
1707 #[allow(clippy::cast_possible_truncation)]
1708 let downscaled = candidate_ids.len() as u32;
1709
1710 if downscaled > 0 {
1711 let placeholders: String = candidate_ids
1712 .iter()
1713 .map(|_| "?")
1714 .collect::<Vec<_>>()
1715 .join(",");
1716 let downscale_sql = format!(
1717 "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1718 WHERE id IN ({placeholders})"
1719 );
1720 let mut q = zeph_db::query(&downscale_sql);
1721 for &(id,) in &candidate_ids {
1722 q = q.bind(id);
1723 }
1724 q.execute(&mut *tx).await?;
1725 }
1726
1727 let replayed = if downscaled > 0 {
1733 let replay_placeholders: String = candidate_ids
1734 .iter()
1735 .map(|_| "?")
1736 .collect::<Vec<_>>()
1737 .join(",");
1738 let replay_sql = format!(
1739 "UPDATE messages \
1740 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1741 WHERE id IN ({replay_placeholders}) \
1742 AND (\
1743 (last_accessed IS NOT NULL \
1744 AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1745 OR access_count >= ?\
1746 )"
1747 );
1748 let mut rq = zeph_db::query(&replay_sql);
1749 for &(id,) in &candidate_ids {
1750 rq = rq.bind(id);
1751 }
1752 let replay_result = rq
1753 .bind(replay_hours)
1754 .bind(replay_min_access)
1755 .execute(&mut *tx)
1756 .await?;
1757 #[allow(clippy::cast_possible_truncation)]
1758 let n = replay_result.rows_affected() as u32;
1759 n
1760 } else {
1761 0
1762 };
1763
1764 let prune_sql = format!(
1766 "UPDATE messages \
1767 SET deleted_at = CURRENT_TIMESTAMP \
1768 WHERE deleted_at IS NULL AND consolidated = 0 \
1769 AND importance_score < {floor} \
1770 AND (\
1771 last_accessed IS NULL \
1772 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1773 ) \
1774 AND access_count < ?"
1775 );
1776 let prune_result = zeph_db::query(&prune_sql)
1777 .bind(protect_hours)
1778 .bind(protect_min_access)
1779 .execute(&mut *tx)
1780 .await?;
1781 #[allow(clippy::cast_possible_truncation)]
1782 let pruned = prune_result.rows_affected() as u32;
1783
1784 tx.commit().await?;
1785
1786 Ok(crate::forgetting::ForgettingResult {
1787 downscaled,
1788 replayed,
1789 pruned,
1790 })
1791 }
1792}
1793
1794#[derive(Debug, Clone)]
1796pub struct PromotionCandidate {
1797 pub id: MessageId,
1798 pub conversation_id: ConversationId,
1799 pub content: String,
1800 pub session_count: u32,
1801 pub importance_score: f64,
1802}
1803
1804#[cfg(test)]
1805mod tests;