Skip to main content

zeph_memory/store/messages/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_db::ActiveDialect;
5use zeph_db::fts::sanitize_fts_query;
6#[allow(unused_imports)]
7use zeph_db::sql;
8use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
9
10use super::SqliteStore;
11use crate::error::MemoryError;
12use crate::types::{ConversationId, MessageId};
13
14fn parse_role(s: &str) -> Role {
15    match s {
16        "assistant" => Role::Assistant,
17        "system" => Role::System,
18        _ => Role::User,
19    }
20}
21
22#[must_use]
23pub fn role_str(role: Role) -> &'static str {
24    match role {
25        Role::System => "system",
26        Role::User => "user",
27        Role::Assistant => "assistant",
28    }
29}
30
/// Translate a variant name from the legacy externally-tagged encoding into the `kind`
/// tag used by the current internally-tagged encoding.
///
/// Matching is exact and case-sensitive. Returns `None` for any name not in the table.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // (legacy variant name, current `kind` tag)
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];

    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|&(_, kind)| kind)
}
50
51/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
52///
53/// Old format: `[{"Summary":{"text":"..."}}, ...]`
54/// New format: `[{"kind":"summary","text":"..."}, ...]`
55///
56/// Returns `None` if the input does not look like the old format or if any element fails
57/// to deserialize after conversion.
58fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
59    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
60    let mut result = Vec::with_capacity(array.len());
61    for element in array {
62        let obj = element.as_object()?;
63        if obj.contains_key("kind") {
64            return None;
65        }
66        if obj.len() != 1 {
67            return None;
68        }
69        let (key, inner) = obj.iter().next()?;
70        let kind = legacy_key_to_kind(key)?;
71        let mut new_obj = match inner {
72            serde_json::Value::Object(m) => m.clone(),
73            // Image variant wraps a single object directly
74            other => {
75                let mut m = serde_json::Map::new();
76                m.insert("data".to_string(), other.clone());
77                m
78            }
79        };
80        new_obj.insert(
81            "kind".to_string(),
82            serde_json::Value::String(kind.to_string()),
83        );
84        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
85        result.push(part);
86    }
87    Some(result)
88}
89
90/// Deserialize message parts from a stored JSON string.
91///
92/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
93/// a truncated excerpt of the malformed JSON for diagnostics.
94fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
95    if parts_json == "[]" {
96        return vec![];
97    }
98    match serde_json::from_str(parts_json) {
99        Ok(p) => p,
100        Err(e) => {
101            if let Some(parts) = try_parse_legacy_parts(parts_json) {
102                let truncated = parts_json.chars().take(120).collect::<String>();
103                tracing::warn!(
104                    role = %role_str,
105                    parts_json = %truncated,
106                    "loaded legacy-format message parts via compat path"
107                );
108                return parts;
109            }
110            let truncated = parts_json.chars().take(120).collect::<String>();
111            tracing::warn!(
112                role = %role_str,
113                parts_json = %truncated,
114                error = %e,
115                "failed to deserialize message parts, falling back to empty"
116            );
117            vec![]
118        }
119    }
120}
121
122impl SqliteStore {
123    /// Create a new conversation and return its ID.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the insert fails.
128    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
129        let row: (ConversationId,) = zeph_db::query_as(sql!(
130            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
131        ))
132        .fetch_one(&self.pool)
133        .await?;
134        Ok(row.0)
135    }
136
137    /// Save a message to the given conversation and return the message ID.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the insert fails.
142    pub async fn save_message(
143        &self,
144        conversation_id: ConversationId,
145        role: &str,
146        content: &str,
147    ) -> Result<MessageId, MemoryError> {
148        self.save_message_with_parts(conversation_id, role, content, "[]")
149            .await
150    }
151
152    /// Save a message with structured parts JSON.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the insert fails.
157    pub async fn save_message_with_parts(
158        &self,
159        conversation_id: ConversationId,
160        role: &str,
161        content: &str,
162        parts_json: &str,
163    ) -> Result<MessageId, MemoryError> {
164        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
165            .await
166    }
167
168    /// Save a message with visibility metadata.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if the insert fails.
173    pub async fn save_message_with_metadata(
174        &self,
175        conversation_id: ConversationId,
176        role: &str,
177        content: &str,
178        parts_json: &str,
179        agent_visible: bool,
180        user_visible: bool,
181    ) -> Result<MessageId, MemoryError> {
182        let importance_score = crate::semantic::importance::compute_importance(content, role);
183        let row: (MessageId,) = zeph_db::query_as(
184            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
185             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
186        )
187        .bind(conversation_id)
188        .bind(role)
189        .bind(content)
190        .bind(parts_json)
191        .bind(i64::from(agent_visible))
192        .bind(i64::from(user_visible))
193        .bind(importance_score)
194        .fetch_one(&self.pool)
195        .await?;
196        Ok(row.0)
197    }
198
199    /// Load the most recent messages for a conversation, up to `limit`.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the query fails.
204    pub async fn load_history(
205        &self,
206        conversation_id: ConversationId,
207        limit: u32,
208    ) -> Result<Vec<Message>, MemoryError> {
209        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
210            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
211                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
212                WHERE conversation_id = ? AND deleted_at IS NULL \
213                ORDER BY id DESC \
214                LIMIT ?\
215             ) ORDER BY id ASC"
216        ))
217        .bind(conversation_id)
218        .bind(limit)
219        .fetch_all(&self.pool)
220        .await?;
221
222        let messages = rows
223            .into_iter()
224            .map(
225                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
226                    let parts = parse_parts_json(&role_str, &parts_json);
227                    Message {
228                        role: parse_role(&role_str),
229                        content,
230                        parts,
231                        metadata: MessageMetadata {
232                            agent_visible: agent_visible != 0,
233                            user_visible: user_visible != 0,
234                            compacted_at: None,
235                            deferred_summary: None,
236                            focus_pinned: false,
237                            focus_marker_id: None,
238                            db_id: Some(row_id),
239                        },
240                    }
241                },
242            )
243            .collect();
244        Ok(messages)
245    }
246
247    /// Load messages filtered by visibility flags.
248    ///
249    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the query fails.
254    pub async fn load_history_filtered(
255        &self,
256        conversation_id: ConversationId,
257        limit: u32,
258        agent_visible: Option<bool>,
259        user_visible: Option<bool>,
260    ) -> Result<Vec<Message>, MemoryError> {
261        let av = agent_visible.map(i64::from);
262        let uv = user_visible.map(i64::from);
263
264        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
265            sql!("WITH recent AS (\
266                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
267                WHERE conversation_id = ? \
268                  AND deleted_at IS NULL \
269                  AND (? IS NULL OR agent_visible = ?) \
270                  AND (? IS NULL OR user_visible = ?) \
271                ORDER BY id DESC \
272                LIMIT ?\
273             ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
274        )
275        .bind(conversation_id)
276        .bind(av)
277        .bind(av)
278        .bind(uv)
279        .bind(uv)
280        .bind(limit)
281        .fetch_all(&self.pool)
282        .await?;
283
284        let messages = rows
285            .into_iter()
286            .map(
287                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
288                    let parts = parse_parts_json(&role_str, &parts_json);
289                    Message {
290                        role: parse_role(&role_str),
291                        content,
292                        parts,
293                        metadata: MessageMetadata {
294                            agent_visible: agent_visible != 0,
295                            user_visible: user_visible != 0,
296                            compacted_at: None,
297                            deferred_summary: None,
298                            focus_pinned: false,
299                            focus_marker_id: None,
300                            db_id: Some(row_id),
301                        },
302                    }
303                },
304            )
305            .collect();
306        Ok(messages)
307    }
308
309    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
310    ///
311    /// Within a single transaction:
312    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
313    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
314    ///
315    /// Returns the `MessageId` of the inserted summary.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if the transaction fails.
320    pub async fn replace_conversation(
321        &self,
322        conversation_id: ConversationId,
323        compacted_range: std::ops::RangeInclusive<MessageId>,
324        summary_role: &str,
325        summary_content: &str,
326    ) -> Result<MessageId, MemoryError> {
327        let now = {
328            let secs = std::time::SystemTime::now()
329                .duration_since(std::time::UNIX_EPOCH)
330                .unwrap_or_default()
331                .as_secs();
332            format!("{secs}")
333        };
334        let start_id = compacted_range.start().0;
335        let end_id = compacted_range.end().0;
336
337        let mut tx = self.pool.begin().await?;
338
339        zeph_db::query(sql!(
340            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
341             WHERE conversation_id = ? AND id >= ? AND id <= ?"
342        ))
343        .bind(&now)
344        .bind(conversation_id)
345        .bind(start_id)
346        .bind(end_id)
347        .execute(&mut *tx)
348        .await?;
349
350        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
351        let row: (MessageId,) = zeph_db::query_as(sql!(
352            "INSERT INTO messages \
353             (conversation_id, role, content, parts, agent_visible, user_visible) \
354             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
355        ))
356        .bind(conversation_id)
357        .bind(summary_role)
358        .bind(summary_content)
359        .fetch_one(&mut *tx)
360        .await?;
361
362        tx.commit().await?;
363
364        Ok(row.0)
365    }
366
367    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
368    ///
369    /// Within a single transaction:
370    /// 1. Sets `agent_visible=0, compacted_at=<now>` for each ID in `hide_ids`.
371    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if the transaction fails.
376    pub async fn apply_tool_pair_summaries(
377        &self,
378        conversation_id: ConversationId,
379        hide_ids: &[i64],
380        summaries: &[String],
381    ) -> Result<(), MemoryError> {
382        if hide_ids.is_empty() && summaries.is_empty() {
383            return Ok(());
384        }
385
386        let now = std::time::SystemTime::now()
387            .duration_since(std::time::UNIX_EPOCH)
388            .unwrap_or_default()
389            .as_secs()
390            .to_string();
391
392        let mut tx = self.pool.begin().await?;
393
394        for &id in hide_ids {
395            zeph_db::query(sql!(
396                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
397            ))
398            .bind(&now)
399            .bind(id)
400            .execute(&mut *tx)
401            .await?;
402        }
403
404        for summary in summaries {
405            let content = format!("[tool summary] {summary}");
406            let parts = serde_json::to_string(&[MessagePart::Summary {
407                text: summary.clone(),
408            }])
409            .unwrap_or_else(|_| "[]".to_string());
410            zeph_db::query(sql!(
411                "INSERT INTO messages \
412                 (conversation_id, role, content, parts, agent_visible, user_visible) \
413                 VALUES (?, 'assistant', ?, ?, 1, 0)"
414            ))
415            .bind(conversation_id)
416            .bind(&content)
417            .bind(&parts)
418            .execute(&mut *tx)
419            .await?;
420        }
421
422        tx.commit().await?;
423        Ok(())
424    }
425
426    /// Return the IDs of the N oldest messages in a conversation (ascending order).
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if the query fails.
431    pub async fn oldest_message_ids(
432        &self,
433        conversation_id: ConversationId,
434        n: u32,
435    ) -> Result<Vec<MessageId>, MemoryError> {
436        let rows: Vec<(MessageId,)> = zeph_db::query_as(
437            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
438        )
439        .bind(conversation_id)
440        .bind(n)
441        .fetch_all(&self.pool)
442        .await?;
443        Ok(rows.into_iter().map(|r| r.0).collect())
444    }
445
446    /// Return the ID of the most recent conversation, if any.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the query fails.
451    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
452        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
453            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
454        ))
455        .fetch_optional(&self.pool)
456        .await?;
457        Ok(row.map(|r| r.0))
458    }
459
460    /// Fetch a single message by its ID.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the query fails.
465    pub async fn message_by_id(
466        &self,
467        message_id: MessageId,
468    ) -> Result<Option<Message>, MemoryError> {
469        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
470            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
471        )
472        .bind(message_id)
473        .fetch_optional(&self.pool)
474        .await?;
475
476        Ok(row.map(
477            |(role_str, content, parts_json, agent_visible, user_visible)| {
478                let parts = parse_parts_json(&role_str, &parts_json);
479                Message {
480                    role: parse_role(&role_str),
481                    content,
482                    parts,
483                    metadata: MessageMetadata {
484                        agent_visible: agent_visible != 0,
485                        user_visible: user_visible != 0,
486                        compacted_at: None,
487                        deferred_summary: None,
488                        focus_pinned: false,
489                        focus_marker_id: None,
490                        db_id: None,
491                    },
492                }
493            },
494        ))
495    }
496
497    /// Fetch messages by a list of IDs in a single query.
498    ///
499    /// # Errors
500    ///
501    /// Returns an error if the query fails.
502    pub async fn messages_by_ids(
503        &self,
504        ids: &[MessageId],
505    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
506        if ids.is_empty() {
507            return Ok(Vec::new());
508        }
509
510        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
511
512        let query = format!(
513            "SELECT id, role, content, parts FROM messages \
514             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
515        );
516        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
517        for &id in ids {
518            q = q.bind(id);
519        }
520
521        let rows = q.fetch_all(&self.pool).await?;
522
523        Ok(rows
524            .into_iter()
525            .map(|(id, role_str, content, parts_json)| {
526                let parts = parse_parts_json(&role_str, &parts_json);
527                (
528                    id,
529                    Message {
530                        role: parse_role(&role_str),
531                        content,
532                        parts,
533                        metadata: MessageMetadata::default(),
534                    },
535                )
536            })
537            .collect())
538    }
539
540    /// Return message IDs and content for messages without embeddings.
541    ///
542    /// # Errors
543    ///
544    /// Returns an error if the query fails.
545    pub async fn unembedded_message_ids(
546        &self,
547        limit: Option<usize>,
548    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
549        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
550
551        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
552            "SELECT m.id, m.conversation_id, m.role, m.content \
553             FROM messages m \
554             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
555             WHERE em.id IS NULL AND m.deleted_at IS NULL \
556             ORDER BY m.id ASC \
557             LIMIT ?"
558        ))
559        .bind(effective_limit)
560        .fetch_all(&self.pool)
561        .await?;
562
563        Ok(rows)
564    }
565
566    /// Count the number of messages in a conversation.
567    ///
568    /// # Errors
569    ///
570    /// Returns an error if the query fails.
571    pub async fn count_messages(
572        &self,
573        conversation_id: ConversationId,
574    ) -> Result<i64, MemoryError> {
575        let row: (i64,) = zeph_db::query_as(sql!(
576            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
577        ))
578        .bind(conversation_id)
579        .fetch_one(&self.pool)
580        .await?;
581        Ok(row.0)
582    }
583
584    /// Count messages in a conversation with id greater than `after_id`.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the query fails.
589    pub async fn count_messages_after(
590        &self,
591        conversation_id: ConversationId,
592        after_id: MessageId,
593    ) -> Result<i64, MemoryError> {
594        let row: (i64,) =
595            zeph_db::query_as(
596                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
597            )
598            .bind(conversation_id)
599            .bind(after_id)
600            .fetch_one(&self.pool)
601            .await?;
602        Ok(row.0)
603    }
604
605    /// Full-text keyword search over messages using FTS5.
606    ///
607    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
608    /// negated to positive for consistency with vector scores).
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the query fails.
613    pub async fn keyword_search(
614        &self,
615        query: &str,
616        limit: usize,
617        conversation_id: Option<ConversationId>,
618    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
619        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
620        let safe_query = sanitize_fts_query(query);
621        if safe_query.is_empty() {
622            return Ok(Vec::new());
623        }
624
625        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
626            zeph_db::query_as(
627                sql!("SELECT m.id, -rank AS score \
628                 FROM messages_fts f \
629                 JOIN messages m ON m.id = f.rowid \
630                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
631                 ORDER BY rank \
632                 LIMIT ?"),
633            )
634            .bind(&safe_query)
635            .bind(cid)
636            .bind(effective_limit)
637            .fetch_all(&self.pool)
638            .await?
639        } else {
640            zeph_db::query_as(sql!(
641                "SELECT m.id, -rank AS score \
642                 FROM messages_fts f \
643                 JOIN messages m ON m.id = f.rowid \
644                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
645                 ORDER BY rank \
646                 LIMIT ?"
647            ))
648            .bind(&safe_query)
649            .bind(effective_limit)
650            .fetch_all(&self.pool)
651            .await?
652        };
653
654        Ok(rows)
655    }
656
657    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
658    ///
659    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
660    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
661    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
662    ///
663    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
664    /// `None` means "no bound" on that side.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the query fails.
669    pub async fn keyword_search_with_time_range(
670        &self,
671        query: &str,
672        limit: usize,
673        conversation_id: Option<ConversationId>,
674        after: Option<&str>,
675        before: Option<&str>,
676    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
677        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
678        let safe_query = sanitize_fts_query(query);
679        if safe_query.is_empty() {
680            return Ok(Vec::new());
681        }
682
683        // Build time-range clauses dynamically. Both bounds are optional.
684        let after_clause = if after.is_some() {
685            " AND m.created_at > ?"
686        } else {
687            ""
688        };
689        let before_clause = if before.is_some() {
690            " AND m.created_at < ?"
691        } else {
692            ""
693        };
694        let conv_clause = if conversation_id.is_some() {
695            " AND m.conversation_id = ?"
696        } else {
697            ""
698        };
699
700        let sql = format!(
701            "SELECT m.id, -rank AS score \
702             FROM messages_fts f \
703             JOIN messages m ON m.id = f.rowid \
704             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
705             {after_clause}{before_clause}{conv_clause} \
706             ORDER BY rank \
707             LIMIT ?"
708        );
709
710        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
711        if let Some(a) = after {
712            q = q.bind(a);
713        }
714        if let Some(b) = before {
715            q = q.bind(b);
716        }
717        if let Some(cid) = conversation_id {
718            q = q.bind(cid);
719        }
720        q = q.bind(effective_limit);
721
722        Ok(q.fetch_all(&self.pool).await?)
723    }
724
725    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
726    ///
727    /// Messages without a `created_at` column fall back to 0.
728    ///
729    /// # Errors
730    ///
731    /// Returns an error if the query fails.
732    pub async fn message_timestamps(
733        &self,
734        ids: &[MessageId],
735    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
736        if ids.is_empty() {
737            return Ok(std::collections::HashMap::new());
738        }
739
740        let placeholders: String =
741            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
742        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
743        let query = format!(
744            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
745        );
746        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
747        for &id in ids {
748            q = q.bind(id);
749        }
750
751        let rows = q.fetch_all(&self.pool).await?;
752        Ok(rows.into_iter().collect())
753    }
754
755    /// Load a range of messages after a given message ID.
756    ///
757    /// # Errors
758    ///
759    /// Returns an error if the query fails.
760    pub async fn load_messages_range(
761        &self,
762        conversation_id: ConversationId,
763        after_message_id: MessageId,
764        limit: usize,
765    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
766        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
767
768        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
769            "SELECT id, role, content FROM messages \
770             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
771             ORDER BY id ASC LIMIT ?"
772        ))
773        .bind(conversation_id)
774        .bind(after_message_id)
775        .bind(effective_limit)
776        .fetch_all(&self.pool)
777        .await?;
778
779        Ok(rows)
780    }
781
782    // ── Eviction helpers ──────────────────────────────────────────────────────
783
784    /// Return all non-deleted message IDs with their eviction metadata.
785    ///
786    /// # Errors
787    ///
788    /// Returns an error if the query fails.
789    pub async fn get_eviction_candidates(
790        &self,
791    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
792        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
793            "SELECT id, created_at, last_accessed, access_count \
794             FROM messages WHERE deleted_at IS NULL"
795        ))
796        .fetch_all(&self.pool)
797        .await?;
798
799        Ok(rows
800            .into_iter()
801            .map(
802                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
803                    id,
804                    created_at,
805                    last_accessed,
806                    access_count: access_count.try_into().unwrap_or(0),
807                },
808            )
809            .collect())
810    }
811
812    /// Soft-delete a set of messages by marking `deleted_at`.
813    ///
814    /// Soft-deleted messages are excluded from all history queries.
815    ///
816    /// # Errors
817    ///
818    /// Returns an error if the update fails.
819    pub async fn soft_delete_messages(
820        &self,
821        ids: &[MessageId],
822    ) -> Result<(), crate::error::MemoryError> {
823        if ids.is_empty() {
824            return Ok(());
825        }
826        // SQLite does not support array binding natively. Batch via individual updates.
827        for &id in ids {
828            zeph_db::query(
829                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
830            )
831            .bind(id)
832            .execute(&self.pool)
833            .await?;
834        }
835        Ok(())
836    }
837
838    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if the query fails.
843    pub async fn get_soft_deleted_message_ids(
844        &self,
845    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
846        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
847            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
848        ))
849        .fetch_all(&self.pool)
850        .await?;
851        Ok(rows.into_iter().map(|(id,)| id).collect())
852    }
853
854    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
855    ///
856    /// # Errors
857    ///
858    /// Returns an error if the update fails.
859    pub async fn mark_qdrant_cleaned(
860        &self,
861        ids: &[MessageId],
862    ) -> Result<(), crate::error::MemoryError> {
863        for &id in ids {
864            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
865                .bind(id)
866                .execute(&self.pool)
867                .await?;
868        }
869        Ok(())
870    }
871
872    /// Fetch `importance_score` values for the given message IDs.
873    ///
874    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
875    ///
876    /// # Errors
877    ///
878    /// Returns an error if the query fails.
879    pub async fn fetch_importance_scores(
880        &self,
881        ids: &[MessageId],
882    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
883        if ids.is_empty() {
884            return Ok(std::collections::HashMap::new());
885        }
886        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
887        let query = format!(
888            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
889        );
890        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
891        for &id in ids {
892            q = q.bind(id);
893        }
894        let rows = q.fetch_all(&self.pool).await?;
895        Ok(rows.into_iter().collect())
896    }
897
898    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
899    ///
900    /// Skips the update when `ids` is empty.
901    ///
902    /// # Errors
903    ///
904    /// Returns an error if the update fails.
905    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
906        if ids.is_empty() {
907            return Ok(());
908        }
909        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
910        let query = format!(
911            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
912             WHERE id IN ({placeholders})"
913        );
914        let mut q = zeph_db::query(&query);
915        for &id in ids {
916            q = q.bind(id);
917        }
918        q.execute(&self.pool).await?;
919        Ok(())
920    }
921
922    // ── Tier promotion helpers ─────────────────────────────────────────────────
923
924    /// Return episodic messages with `session_count >= min_sessions`, ordered by
925    /// session count descending then importance score descending.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if the query fails.
930    pub async fn find_promotion_candidates(
931        &self,
932        min_sessions: u32,
933        batch_size: usize,
934    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
935        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
936        let min = i64::from(min_sessions);
937        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
938            "SELECT id, conversation_id, content, session_count, importance_score \
939             FROM messages \
940             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
941             ORDER BY session_count DESC, importance_score DESC \
942             LIMIT ?"
943        ))
944        .bind(min)
945        .bind(limit)
946        .fetch_all(&self.pool)
947        .await?;
948
949        Ok(rows
950            .into_iter()
951            .map(
952                |(id, conversation_id, content, session_count, importance_score)| {
953                    PromotionCandidate {
954                        id,
955                        conversation_id,
956                        content,
957                        session_count: session_count.try_into().unwrap_or(0),
958                        importance_score,
959                    }
960                },
961            )
962            .collect())
963    }
964
965    /// Count messages per tier (episodic, semantic) that are not deleted.
966    ///
967    /// Returns `(episodic_count, semantic_count)`.
968    ///
969    /// # Errors
970    ///
971    /// Returns an error if the query fails.
972    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
973        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
974            "SELECT tier, COUNT(*) FROM messages \
975             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
976             GROUP BY tier"
977        ))
978        .fetch_all(&self.pool)
979        .await?;
980
981        let mut episodic = 0i64;
982        let mut semantic = 0i64;
983        for (tier, count) in rows {
984            match tier.as_str() {
985                "episodic" => episodic = count,
986                "semantic" => semantic = count,
987                _ => {}
988            }
989        }
990        Ok((episodic, semantic))
991    }
992
993    /// Count semantic facts (tier='semantic', not deleted).
994    ///
995    /// # Errors
996    ///
997    /// Returns an error if the query fails.
998    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
999        let row: (i64,) = zeph_db::query_as(sql!(
1000            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1001        ))
1002        .fetch_one(&self.pool)
1003        .await?;
1004        Ok(row.0)
1005    }
1006
1007    /// Promote a set of episodic messages to semantic tier in a single transaction.
1008    ///
1009    /// Within one transaction:
1010    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
1011    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
1012    ///    so the eviction sweep picks up their Qdrant vectors.
1013    ///
1014    /// Returns the `MessageId` of the new semantic message.
1015    ///
1016    /// # Errors
1017    ///
1018    /// Returns an error if the transaction fails.
1019    pub async fn promote_to_semantic(
1020        &self,
1021        conversation_id: ConversationId,
1022        merged_content: &str,
1023        original_ids: &[MessageId],
1024    ) -> Result<MessageId, MemoryError> {
1025        if original_ids.is_empty() {
1026            return Err(MemoryError::Other(
1027                "promote_to_semantic: original_ids must not be empty".into(),
1028            ));
1029        }
1030
1031        let mut tx = self.pool.begin().await?;
1032
1033        // Insert the new semantic fact.
1034        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1035        let promote_insert_raw = format!(
1036            "INSERT INTO messages \
1037             (conversation_id, role, content, parts, agent_visible, user_visible, \
1038              tier, promotion_timestamp) \
1039             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1040             RETURNING id"
1041        );
1042        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1043        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1044            .bind(conversation_id)
1045            .bind(merged_content)
1046            .fetch_one(&mut *tx)
1047            .await?;
1048
1049        let new_id = row.0;
1050
1051        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
1052        for &id in original_ids {
1053            zeph_db::query(sql!(
1054                "UPDATE messages \
1055                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1056                 WHERE id = ? AND deleted_at IS NULL"
1057            ))
1058            .bind(id)
1059            .execute(&mut *tx)
1060            .await?;
1061        }
1062
1063        tx.commit().await?;
1064        Ok(new_id)
1065    }
1066
1067    /// Manually promote a set of messages to semantic tier without merging.
1068    ///
1069    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1070    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1071    ///
1072    /// # Errors
1073    ///
1074    /// Returns an error if the update fails.
1075    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1076        if ids.is_empty() {
1077            return Ok(0);
1078        }
1079        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1080        let manual_promote_raw = format!(
1081            "UPDATE messages \
1082             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1083             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1084        );
1085        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1086        let mut count = 0usize;
1087        for &id in ids {
1088            let result = zeph_db::query(&manual_promote_sql)
1089                .bind(id)
1090                .execute(&self.pool)
1091                .await?;
1092            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1093        }
1094        Ok(count)
1095    }
1096
1097    /// Increment `session_count` for all episodic messages in a conversation.
1098    ///
1099    /// Called when a session restores an existing conversation to mark that messages
1100    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1101    ///
1102    /// # Errors
1103    ///
1104    /// Returns an error if the database update fails.
1105    pub async fn increment_session_counts_for_conversation(
1106        &self,
1107        conversation_id: ConversationId,
1108    ) -> Result<(), MemoryError> {
1109        zeph_db::query(sql!(
1110            "UPDATE messages SET session_count = session_count + 1 \
1111             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1112        ))
1113        .bind(conversation_id)
1114        .execute(&self.pool)
1115        .await?;
1116        Ok(())
1117    }
1118
1119    /// Fetch the tier string for each of the given message IDs.
1120    ///
1121    /// Messages not found or already deleted are omitted from the result.
1122    ///
1123    /// # Errors
1124    ///
1125    /// Returns an error if the query fails.
1126    pub async fn fetch_tiers(
1127        &self,
1128        ids: &[MessageId],
1129    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1130        if ids.is_empty() {
1131            return Ok(std::collections::HashMap::new());
1132        }
1133        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1134        let query = format!(
1135            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1136        );
1137        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1138        for &id in ids {
1139            q = q.bind(id);
1140        }
1141        let rows = q.fetch_all(&self.pool).await?;
1142        Ok(rows.into_iter().collect())
1143    }
1144
1145    /// Return all conversation IDs that have at least one non-consolidated original message.
1146    ///
1147    /// Used by the consolidation sweep to find conversations that need processing.
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns an error if the database query fails.
1152    pub async fn conversations_with_unconsolidated_messages(
1153        &self,
1154    ) -> Result<Vec<ConversationId>, MemoryError> {
1155        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1156            "SELECT DISTINCT conversation_id FROM messages \
1157             WHERE consolidated = 0 AND deleted_at IS NULL"
1158        ))
1159        .fetch_all(&self.pool)
1160        .await?;
1161        Ok(rows.into_iter().map(|(id,)| id).collect())
1162    }
1163
1164    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1165    ///
1166    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1167    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if the database query fails.
1172    pub async fn find_unconsolidated_messages(
1173        &self,
1174        conversation_id: ConversationId,
1175        limit: usize,
1176    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1177        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1178        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1179            "SELECT id, content FROM messages \
1180             WHERE conversation_id = ? \
1181               AND consolidated = 0 \
1182               AND deleted_at IS NULL \
1183             ORDER BY id ASC \
1184             LIMIT ?"
1185        ))
1186        .bind(conversation_id)
1187        .bind(limit)
1188        .fetch_all(&self.pool)
1189        .await?;
1190        Ok(rows)
1191    }
1192
1193    /// Look up which consolidated entry (if any) covers the given original message.
1194    ///
1195    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1196    /// in its sources, or `None` if no consolidated entry covers it.
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if the database query fails.
1201    pub async fn find_consolidated_for_source(
1202        &self,
1203        source_id: MessageId,
1204    ) -> Result<Option<MessageId>, MemoryError> {
1205        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1206            "SELECT consolidated_id FROM memory_consolidation_sources \
1207             WHERE source_id = ? \
1208             LIMIT 1"
1209        ))
1210        .bind(source_id)
1211        .fetch_optional(&self.pool)
1212        .await?;
1213        Ok(row.map(|(id,)| id))
1214    }
1215
1216    /// Insert a consolidated message and record its source linkage in a single transaction.
1217    ///
1218    /// Atomically:
1219    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1220    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1221    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1222    ///
1223    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1224    ///
1225    /// # Errors
1226    ///
1227    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1228    /// on failure so no partial state is written.
1229    pub async fn apply_consolidation_merge(
1230        &self,
1231        conversation_id: ConversationId,
1232        role: &str,
1233        merged_content: &str,
1234        source_ids: &[MessageId],
1235        confidence: f32,
1236        confidence_threshold: f32,
1237    ) -> Result<bool, MemoryError> {
1238        if confidence < confidence_threshold {
1239            return Ok(false);
1240        }
1241        if source_ids.is_empty() {
1242            return Ok(false);
1243        }
1244
1245        let mut tx = self.pool.begin().await?;
1246
1247        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1248        let row: (MessageId,) = zeph_db::query_as(sql!(
1249            "INSERT INTO messages \
1250               (conversation_id, role, content, parts, agent_visible, user_visible, \
1251                importance_score, consolidated, consolidation_confidence) \
1252             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1253             RETURNING id"
1254        ))
1255        .bind(conversation_id)
1256        .bind(role)
1257        .bind(merged_content)
1258        .bind(importance)
1259        .bind(confidence)
1260        .fetch_one(&mut *tx)
1261        .await?;
1262        let consolidated_id = row.0;
1263
1264        let consol_sql = format!(
1265            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1266            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1267            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1268        );
1269        for &source_id in source_ids {
1270            zeph_db::query(&consol_sql)
1271                .bind(consolidated_id)
1272                .bind(source_id)
1273                .execute(&mut *tx)
1274                .await?;
1275
1276            // Mark original as consolidated so future sweeps skip it.
1277            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1278                .bind(source_id)
1279                .execute(&mut *tx)
1280                .await?;
1281        }
1282
1283        tx.commit().await?;
1284        Ok(true)
1285    }
1286}
1287
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
///
/// Each field mirrors the same-named column of the `messages` row selected by that query.
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// ID of the candidate message (`id` column).
    pub id: MessageId,
    /// Conversation the message belongs to (`conversation_id` column).
    pub conversation_id: ConversationId,
    /// Message text (`content` column).
    pub content: String,
    /// Value of the `session_count` column; `find_promotion_candidates` stores `0` here
    /// when the database value does not fit in a `u32`.
    pub session_count: u32,
    /// Value of the `importance_score` column.
    pub importance_score: f64,
}
1297
1298#[cfg(test)]
1299mod tests;