// zeph_memory/store/messages/mod.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, placeholder_list, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, MessageVisibility, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18    match s {
19        "assistant" => Role::Assistant,
20        "system" => Role::System,
21        _ => Role::User,
22    }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27    match role {
28        Role::System => "system",
29        Role::User => "user",
30        Role::Assistant => "assistant",
31    }
32}
33
/// Map a legacy externally-tagged variant key to the `kind` value used by the current
/// internally-tagged schema.
///
/// Returns `None` for unrecognized keys, which makes the legacy-compat parse bail out.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Table of (legacy variant name, current `kind` tag) pairs; linear scan is
    // fine for a dozen entries on a cold compat path.
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|(_, kind)| *kind)
}
53
/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
///
/// Old format: `[{"Summary":{"text":"..."}}, ...]`
/// New format: `[{"kind":"summary","text":"..."}, ...]`
///
/// Returns `None` if the input does not look like the old format or if any element fails
/// to deserialize after conversion.
fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
    let mut result = Vec::with_capacity(array.len());
    for element in array {
        // Legacy elements are single-key objects: {"VariantName": payload}.
        let obj = element.as_object()?;
        if obj.contains_key("kind") {
            // A "kind" key means this is already the new internally-tagged
            // format — not legacy input, so the compat path does not apply.
            return None;
        }
        if obj.len() != 1 {
            return None;
        }
        let (key, inner) = obj.iter().next()?;
        let kind = legacy_key_to_kind(key)?;
        let mut new_obj = match inner {
            serde_json::Value::Object(m) => m.clone(),
            // Image variant wraps a single object directly
            // NOTE(review): non-object payloads are nested under a "data" key —
            // assumed to match the Image variant's field name in `MessagePart`;
            // confirm against the current serde definition.
            other => {
                let mut m = serde_json::Map::new();
                m.insert("data".to_string(), other.clone());
                m
            }
        };
        // Inject the internal tag so the current serde representation accepts it.
        new_obj.insert(
            "kind".to_string(),
            serde_json::Value::String(kind.to_string()),
        );
        // Any element failing to round-trip aborts the whole compat parse.
        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
        result.push(part);
    }
    Some(result)
}
92
93/// Deserialize message parts from a stored JSON string.
94///
95/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
96/// a truncated excerpt of the malformed JSON for diagnostics.
97fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98    if parts_json == "[]" {
99        return vec![];
100    }
101    match serde_json::from_str(parts_json) {
102        Ok(p) => p,
103        Err(e) => {
104            if let Some(parts) = try_parse_legacy_parts(parts_json) {
105                let truncated = parts_json.chars().take(120).collect::<String>();
106                tracing::warn!(
107                    role = %role_str,
108                    parts_json = %truncated,
109                    "loaded legacy-format message parts via compat path"
110                );
111                return parts;
112            }
113            let truncated = parts_json.chars().take(120).collect::<String>();
114            tracing::warn!(
115                role = %role_str,
116                parts_json = %truncated,
117                error = %e,
118                "failed to deserialize message parts, falling back to empty"
119            );
120            vec![]
121        }
122    }
123}
124
125impl SqliteStore {
126    /// Create a new conversation and return its ID.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the insert fails.
131    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132        let row: (ConversationId,) = zeph_db::query_as(sql!(
133            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134        ))
135        .fetch_one(&self.pool)
136        .await?;
137        Ok(row.0)
138    }
139
140    /// Save a message to the given conversation and return the message ID.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the insert fails.
145    pub async fn save_message(
146        &self,
147        conversation_id: ConversationId,
148        role: &str,
149        content: &str,
150    ) -> Result<MessageId, MemoryError> {
151        self.save_message_with_parts(conversation_id, role, content, "[]")
152            .await
153    }
154
155    /// Save a message with structured parts JSON.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the insert fails.
160    pub async fn save_message_with_parts(
161        &self,
162        conversation_id: ConversationId,
163        role: &str,
164        content: &str,
165        parts_json: &str,
166    ) -> Result<MessageId, MemoryError> {
167        self.save_message_with_metadata(
168            conversation_id,
169            role,
170            content,
171            parts_json,
172            MessageVisibility::Both,
173        )
174        .await
175    }
176
177    /// Save a message with an optional category tag.
178    ///
179    /// The `category` column is NULL when `None` — existing rows are unaffected.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if the insert fails.
184    pub async fn save_message_with_category(
185        &self,
186        conversation_id: ConversationId,
187        role: &str,
188        content: &str,
189        category: Option<&str>,
190    ) -> Result<MessageId, MemoryError> {
191        let importance_score = crate::semantic::importance::compute_importance(content, role);
192        let row: (MessageId,) = zeph_db::query_as(sql!(
193            "INSERT INTO messages \
194                 (conversation_id, role, content, parts, visibility, \
195                  importance_score, category) \
196                 VALUES (?, ?, ?, '[]', 'both', ?, ?) RETURNING id"
197        ))
198        .bind(conversation_id)
199        .bind(role)
200        .bind(content)
201        .bind(importance_score)
202        .bind(category)
203        .fetch_one(&self.pool)
204        .await?;
205        Ok(row.0)
206    }
207
208    /// Save a message with visibility metadata.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the insert fails.
213    pub async fn save_message_with_metadata(
214        &self,
215        conversation_id: ConversationId,
216        role: &str,
217        content: &str,
218        parts_json: &str,
219        visibility: MessageVisibility,
220    ) -> Result<MessageId, MemoryError> {
221        const MAX_BYTES: usize = 100 * 1024;
222
223        // Truncate plain-text content only. `parts_json` is skipped because a
224        // mid-byte cut produces invalid JSON that breaks downstream deserialization.
225        let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
226            let boundary = content.floor_char_boundary(MAX_BYTES);
227            tracing::debug!(
228                original_bytes = content.len(),
229                "save_message: content exceeds 100KB, truncating"
230            );
231            std::borrow::Cow::Owned(format!(
232                "{}... [truncated, {} bytes total]",
233                &content[..boundary],
234                content.len()
235            ))
236        } else {
237            std::borrow::Cow::Borrowed(content)
238        };
239
240        let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
241        let row: (MessageId,) = zeph_db::query_as(
242            sql!("INSERT INTO messages (conversation_id, role, content, parts, visibility, importance_score) \
243             VALUES (?, ?, ?, ?, ?, ?) RETURNING id"),
244        )
245        .bind(conversation_id)
246        .bind(role)
247        .bind(content_cow.as_ref())
248        .bind(parts_json)
249        .bind(visibility.as_db_str())
250        .bind(importance_score)
251        .fetch_one(&self.pool)
252        .await?;
253        Ok(row.0)
254    }
255
256    /// Load the most recent messages for a conversation, up to `limit`.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the query fails.
261    pub async fn load_history(
262        &self,
263        conversation_id: ConversationId,
264        limit: u32,
265    ) -> Result<Vec<Message>, MemoryError> {
266        let rows: Vec<(String, String, String, String, i64)> = zeph_db::query_as(sql!(
267            "SELECT role, content, parts, visibility, id FROM (\
268                SELECT role, content, parts, visibility, id FROM messages \
269                WHERE conversation_id = ? AND deleted_at IS NULL \
270                ORDER BY id DESC \
271                LIMIT ?\
272             ) ORDER BY id ASC"
273        ))
274        .bind(conversation_id)
275        .bind(limit)
276        .fetch_all(&self.pool)
277        .await?;
278
279        let messages = rows
280            .into_iter()
281            .map(|(role_str, content, parts_json, visibility_str, row_id)| {
282                let parts = parse_parts_json(&role_str, &parts_json);
283                Message {
284                    role: parse_role(&role_str),
285                    content,
286                    parts,
287                    metadata: MessageMetadata {
288                        visibility: MessageVisibility::from_db_str(&visibility_str),
289                        compacted_at: None,
290                        deferred_summary: None,
291                        focus_pinned: false,
292                        focus_marker_id: None,
293                        db_id: Some(row_id),
294                    },
295                }
296            })
297            .collect();
298        Ok(messages)
299    }
300
301    /// Load messages filtered by visibility flags.
302    ///
303    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if the query fails.
308    pub async fn load_history_filtered(
309        &self,
310        conversation_id: ConversationId,
311        limit: u32,
312        agent_visible: Option<bool>,
313        user_visible: Option<bool>,
314    ) -> Result<Vec<Message>, MemoryError> {
315        // Map boolean filters to SQL predicates on the visibility column.
316        // agent_visible=true  → exclude 'user_only'  rows
317        // user_visible=true   → exclude 'agent_only' rows
318        // The two filters are independent; when both are Some(true) the
319        // combined effect is to keep only 'both' rows.
320        let exclude_user_only = agent_visible == Some(true);
321        let exclude_agent_only = user_visible == Some(true);
322
323        let rows: Vec<(String, String, String, String, i64)> = zeph_db::query_as(sql!(
324            "WITH recent AS (\
325                SELECT role, content, parts, visibility, id FROM messages \
326                WHERE conversation_id = ? \
327                  AND deleted_at IS NULL \
328                  AND (NOT ? OR visibility != 'user_only') \
329                  AND (NOT ? OR visibility != 'agent_only') \
330                ORDER BY id DESC \
331                LIMIT ?\
332             ) SELECT role, content, parts, visibility, id FROM recent ORDER BY id ASC"
333        ))
334        .bind(conversation_id)
335        .bind(exclude_user_only)
336        .bind(exclude_agent_only)
337        .bind(limit)
338        .fetch_all(&self.pool)
339        .await?;
340
341        let messages = rows
342            .into_iter()
343            .map(|(role_str, content, parts_json, visibility_str, row_id)| {
344                let parts = parse_parts_json(&role_str, &parts_json);
345                Message {
346                    role: parse_role(&role_str),
347                    content,
348                    parts,
349                    metadata: MessageMetadata {
350                        visibility: MessageVisibility::from_db_str(&visibility_str),
351                        compacted_at: None,
352                        deferred_summary: None,
353                        focus_pinned: false,
354                        focus_marker_id: None,
355                        db_id: Some(row_id),
356                    },
357                }
358            })
359            .collect();
360        Ok(messages)
361    }
362
363    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
364    ///
365    /// Within a single transaction:
366    /// 1. Updates `visibility=user_only, compacted_at=now` for messages in `compacted_range`.
367    /// 2. Inserts `summary_content` with `visibility=agent_only`.
368    ///
369    /// Returns the `MessageId` of the inserted summary.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the transaction fails.
374    pub async fn replace_conversation(
375        &self,
376        conversation_id: ConversationId,
377        compacted_range: std::ops::RangeInclusive<MessageId>,
378        summary_role: &str,
379        summary_content: &str,
380    ) -> Result<MessageId, MemoryError> {
381        let now = {
382            let secs = std::time::SystemTime::now()
383                .duration_since(std::time::UNIX_EPOCH)
384                .unwrap_or_default()
385                .as_secs();
386            format!("{secs}")
387        };
388        let start_id = compacted_range.start().0;
389        let end_id = compacted_range.end().0;
390
391        let mut tx = self.pool.begin().await?;
392
393        zeph_db::query(sql!(
394            "UPDATE messages SET visibility = 'user_only', compacted_at = ? \
395             WHERE conversation_id = ? AND id >= ? AND id <= ?"
396        ))
397        .bind(&now)
398        .bind(conversation_id)
399        .bind(start_id)
400        .bind(end_id)
401        .execute(&mut *tx)
402        .await?;
403
404        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
405        let row: (MessageId,) = zeph_db::query_as(sql!(
406            "INSERT INTO messages \
407             (conversation_id, role, content, parts, visibility) \
408             VALUES (?, ?, ?, '[]', 'agent_only') RETURNING id"
409        ))
410        .bind(conversation_id)
411        .bind(summary_role)
412        .bind(summary_content)
413        .fetch_one(&mut *tx)
414        .await?;
415
416        tx.commit().await?;
417
418        Ok(row.0)
419    }
420
421    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
422    ///
423    /// Within a single transaction:
424    /// 1. Sets `visibility=user_only, compacted_at=<now>` for each ID in `hide_ids`.
425    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if the transaction fails.
430    pub async fn apply_tool_pair_summaries(
431        &self,
432        conversation_id: ConversationId,
433        hide_ids: &[i64],
434        summaries: &[String],
435    ) -> Result<(), MemoryError> {
436        if hide_ids.is_empty() && summaries.is_empty() {
437            return Ok(());
438        }
439
440        let now = std::time::SystemTime::now()
441            .duration_since(std::time::UNIX_EPOCH)
442            .unwrap_or_default()
443            .as_secs()
444            .to_string();
445
446        let mut tx = self.pool.begin().await?;
447
448        for &id in hide_ids {
449            zeph_db::query(sql!(
450                "UPDATE messages SET visibility = 'user_only', compacted_at = ? WHERE id = ?"
451            ))
452            .bind(&now)
453            .bind(id)
454            .execute(&mut *tx)
455            .await?;
456        }
457
458        for summary in summaries {
459            let content = format!("[tool summary] {summary}");
460            let parts = serde_json::to_string(&[MessagePart::Summary {
461                text: summary.clone(),
462            }])
463            .unwrap_or_else(|_| "[]".to_string());
464            zeph_db::query(sql!(
465                "INSERT INTO messages \
466                 (conversation_id, role, content, parts, visibility) \
467                 VALUES (?, 'assistant', ?, ?, 'agent_only')"
468            ))
469            .bind(conversation_id)
470            .bind(&content)
471            .bind(&parts)
472            .execute(&mut *tx)
473            .await?;
474        }
475
476        tx.commit().await?;
477        Ok(())
478    }
479
480    /// Return the IDs of the N oldest messages in a conversation (ascending order).
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the query fails.
485    pub async fn oldest_message_ids(
486        &self,
487        conversation_id: ConversationId,
488        n: u32,
489    ) -> Result<Vec<MessageId>, MemoryError> {
490        let rows: Vec<(MessageId,)> = zeph_db::query_as(
491            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
492        )
493        .bind(conversation_id)
494        .bind(n)
495        .fetch_all(&self.pool)
496        .await?;
497        Ok(rows.into_iter().map(|r| r.0).collect())
498    }
499
500    /// Return the ID of the most recent conversation, if any.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if the query fails.
505    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
506        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
507            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
508        ))
509        .fetch_optional(&self.pool)
510        .await?;
511        Ok(row.map(|r| r.0))
512    }
513
514    /// Fetch a single message by its ID.
515    ///
516    /// # Errors
517    ///
518    /// Returns an error if the query fails.
519    pub async fn message_by_id(
520        &self,
521        message_id: MessageId,
522    ) -> Result<Option<Message>, MemoryError> {
523        let row: Option<(String, String, String, String)> = zeph_db::query_as(
524            sql!("SELECT role, content, parts, visibility FROM messages WHERE id = ? AND deleted_at IS NULL"),
525        )
526        .bind(message_id)
527        .fetch_optional(&self.pool)
528        .await?;
529
530        Ok(row.map(|(role_str, content, parts_json, visibility_str)| {
531            let parts = parse_parts_json(&role_str, &parts_json);
532            Message {
533                role: parse_role(&role_str),
534                content,
535                parts,
536                metadata: MessageMetadata {
537                    visibility: MessageVisibility::from_db_str(&visibility_str),
538                    compacted_at: None,
539                    deferred_summary: None,
540                    focus_pinned: false,
541                    focus_marker_id: None,
542                    db_id: None,
543                },
544            }
545        }))
546    }
547
548    /// Fetch messages by a list of IDs in a single query.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the query fails.
553    pub async fn messages_by_ids(
554        &self,
555        ids: &[MessageId],
556    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
557        if ids.is_empty() {
558            return Ok(Vec::new());
559        }
560
561        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
562
563        let query = format!(
564            "SELECT id, role, content, parts FROM messages \
565             WHERE id IN ({placeholders}) AND visibility != 'user_only' AND deleted_at IS NULL"
566        );
567        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
568        for &id in ids {
569            q = q.bind(id);
570        }
571
572        let rows = q.fetch_all(&self.pool).await?;
573
574        Ok(rows
575            .into_iter()
576            .map(|(id, role_str, content, parts_json)| {
577                let parts = parse_parts_json(&role_str, &parts_json);
578                (
579                    id,
580                    Message {
581                        role: parse_role(&role_str),
582                        content,
583                        parts,
584                        metadata: MessageMetadata::default(),
585                    },
586                )
587            })
588            .collect())
589    }
590
591    /// Return message IDs and content for messages without embeddings.
592    ///
593    /// # Errors
594    ///
595    /// Returns an error if the query fails.
596    pub async fn unembedded_message_ids(
597        &self,
598        limit: Option<usize>,
599    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
600        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
601
602        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
603            "SELECT m.id, m.conversation_id, m.role, m.content \
604             FROM messages m \
605             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
606             WHERE em.id IS NULL AND m.deleted_at IS NULL \
607             ORDER BY m.id ASC \
608             LIMIT ?"
609        ))
610        .bind(effective_limit)
611        .fetch_all(&self.pool)
612        .await?;
613
614        Ok(rows)
615    }
616
617    /// Count messages that have no embedding yet.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if the query fails.
622    pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
623        let row: (i64,) = zeph_db::query_as(sql!(
624            "SELECT COUNT(*) FROM messages m \
625             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
626             WHERE em.id IS NULL AND m.deleted_at IS NULL"
627        ))
628        .fetch_one(&self.pool)
629        .await?;
630        Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
631    }
632
633    /// Stream message IDs and content for messages without embeddings, one row at a time.
634    ///
635    /// Executes the same query as [`Self::unembedded_message_ids`] but returns a streaming
636    /// cursor instead of loading all rows into a `Vec`. The `SQLite` read transaction is held
637    /// for the duration of iteration; callers must not write to `embeddings_metadata` while
638    /// the stream is live (use a separate async task for writes).
639    ///
640    /// # Errors
641    ///
642    /// Yields a [`MemoryError`] if the query or row decoding fails.
643    pub fn stream_unembedded_messages(
644        &self,
645        limit: i64,
646    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
647    {
648        zeph_db::query_as(sql!(
649            "SELECT m.id, m.conversation_id, m.role, m.content \
650             FROM messages m \
651             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
652             WHERE em.id IS NULL AND m.deleted_at IS NULL \
653             ORDER BY m.id ASC \
654             LIMIT ?"
655        ))
656        .bind(limit)
657        .fetch(&self.pool)
658        .map_err(MemoryError::from)
659    }
660
661    /// Count the number of messages in a conversation.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if the query fails.
666    pub async fn count_messages(
667        &self,
668        conversation_id: ConversationId,
669    ) -> Result<i64, MemoryError> {
670        let row: (i64,) = zeph_db::query_as(sql!(
671            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
672        ))
673        .bind(conversation_id)
674        .fetch_one(&self.pool)
675        .await?;
676        Ok(row.0)
677    }
678
679    /// Count messages in a conversation with id greater than `after_id`.
680    ///
681    /// # Errors
682    ///
683    /// Returns an error if the query fails.
684    pub async fn count_messages_after(
685        &self,
686        conversation_id: ConversationId,
687        after_id: MessageId,
688    ) -> Result<i64, MemoryError> {
689        let row: (i64,) =
690            zeph_db::query_as(
691                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
692            )
693            .bind(conversation_id)
694            .bind(after_id)
695            .fetch_one(&self.pool)
696            .await?;
697        Ok(row.0)
698    }
699
700    /// Full-text keyword search over messages using FTS5.
701    ///
702    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
703    /// negated to positive for consistency with vector scores).
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the query fails.
708    pub async fn keyword_search(
709        &self,
710        query: &str,
711        limit: usize,
712        conversation_id: Option<ConversationId>,
713    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
714        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
715        let safe_query = sanitize_fts_query(query);
716        if safe_query.is_empty() {
717            return Ok(Vec::new());
718        }
719
720        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
721            zeph_db::query_as(
722                sql!("SELECT m.id, -rank AS score \
723                 FROM messages_fts f \
724                 JOIN messages m ON m.id = f.rowid \
725                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
726                 ORDER BY rank \
727                 LIMIT ?"),
728            )
729            .bind(&safe_query)
730            .bind(cid)
731            .bind(effective_limit)
732            .fetch_all(&self.pool)
733            .await?
734        } else {
735            zeph_db::query_as(sql!(
736                "SELECT m.id, -rank AS score \
737                 FROM messages_fts f \
738                 JOIN messages m ON m.id = f.rowid \
739                 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
740                 ORDER BY rank \
741                 LIMIT ?"
742            ))
743            .bind(&safe_query)
744            .bind(effective_limit)
745            .fetch_all(&self.pool)
746            .await?
747        };
748
749        Ok(rows)
750    }
751
    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
    ///
    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
    ///
    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
    /// `None` means "no bound" on that side.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn keyword_search_with_time_range(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
        after: Option<&str>,
        before: Option<&str>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        // An empty sanitized query matches nothing — short-circuit.
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        // Build time-range clauses dynamically. Both bounds are optional.
        // IMPORTANT: the clause concatenation order below (after, before, conv)
        // must stay in lockstep with the bind order further down.
        let after_clause = if after.is_some() {
            " AND m.created_at > ?"
        } else {
            ""
        };
        let before_clause = if before.is_some() {
            " AND m.created_at < ?"
        } else {
            ""
        };
        let conv_clause = if conversation_id.is_some() {
            " AND m.conversation_id = ?"
        } else {
            ""
        };

        // NOTE(review): raw `?` markers are used here without
        // `zeph_db::rewrite_placeholders` (which `message_timestamps` applies
        // to its dynamic SQL) — confirm this is intended for all dialects.
        let sql = format!(
            "SELECT m.id, -rank AS score \
             FROM messages_fts f \
             JOIN messages m ON m.id = f.rowid \
             WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL\
             {after_clause}{before_clause}{conv_clause} \
             ORDER BY rank \
             LIMIT ?"
        );

        // Bind order mirrors clause order: match query, after, before, conv, limit.
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
        if let Some(a) = after {
            q = q.bind(a);
        }
        if let Some(b) = before {
            q = q.bind(b);
        }
        if let Some(cid) = conversation_id {
            q = q.bind(cid);
        }
        q = q.bind(effective_limit);

        Ok(q.fetch_all(&self.pool).await?)
    }
819
    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
    ///
    /// Messages without a `created_at` column fall back to 0.
    ///
    /// NOTE(review): IDs that match no row are simply absent from the returned
    /// map — the documented 0 fallback presumably comes from `epoch_expr` or
    /// from callers defaulting missing keys; confirm.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn message_timestamps(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }

        // Dynamic IN-list: one `?` per ID, rewritten for the active dialect
        // (e.g. numbered parameters) by `rewrite_placeholders`.
        let placeholders: String =
            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
        // `epoch_expr` renders a dialect-specific expression converting
        // `created_at` into Unix epoch seconds.
        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
        let query = format!(
            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
849
850    /// Load a range of messages after a given message ID.
851    ///
852    /// # Errors
853    ///
854    /// Returns an error if the query fails.
855    pub async fn load_messages_range(
856        &self,
857        conversation_id: ConversationId,
858        after_message_id: MessageId,
859        limit: usize,
860    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
861        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
862
863        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
864            "SELECT id, role, content FROM messages \
865             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
866             ORDER BY id ASC LIMIT ?"
867        ))
868        .bind(conversation_id)
869        .bind(after_message_id)
870        .bind(effective_limit)
871        .fetch_all(&self.pool)
872        .await?;
873
874        Ok(rows)
875    }
876
877    // ── Eviction helpers ──────────────────────────────────────────────────────
878
879    /// Return all non-deleted message IDs with their eviction metadata.
880    ///
881    /// # Errors
882    ///
883    /// Returns an error if the query fails.
884    pub async fn get_eviction_candidates(
885        &self,
886    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
887        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
888            "SELECT id, created_at, last_accessed, access_count \
889             FROM messages WHERE deleted_at IS NULL"
890        ))
891        .fetch_all(&self.pool)
892        .await?;
893
894        Ok(rows
895            .into_iter()
896            .map(
897                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
898                    id,
899                    created_at,
900                    last_accessed,
901                    access_count: access_count.try_into().unwrap_or(0),
902                },
903            )
904            .collect())
905    }
906
907    /// Soft-delete a set of messages by marking `deleted_at`.
908    ///
909    /// Soft-deleted messages are excluded from all history queries.
910    ///
911    /// # Errors
912    ///
913    /// Returns an error if the update fails.
914    pub async fn soft_delete_messages(
915        &self,
916        ids: &[MessageId],
917    ) -> Result<(), crate::error::MemoryError> {
918        if ids.is_empty() {
919            return Ok(());
920        }
921        // SQLite does not support array binding natively. Batch via individual updates.
922        for &id in ids {
923            zeph_db::query(
924                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
925            )
926            .bind(id)
927            .execute(&self.pool)
928            .await?;
929        }
930        Ok(())
931    }
932
933    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if the query fails.
938    pub async fn get_soft_deleted_message_ids(
939        &self,
940    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
941        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
942            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
943        ))
944        .fetch_all(&self.pool)
945        .await?;
946        Ok(rows.into_iter().map(|(id,)| id).collect())
947    }
948
949    /// Filter `candidate_ids` to retain only IDs that are NOT covered by any
950    /// summary's `[first_message_id, last_message_id]` range (MM-F4, #3341).
951    ///
952    /// Returns the **safe-to-delete** subset: candidate IDs that do not fall inside
953    /// any summary range. This enforces the data-integrity invariant that raw episodes
954    /// referenced by a summary can never be soft-deleted by the eviction sweep.
955    ///
956    /// Batched at `MAX_BATCH = 490` placeholders per chunk to stay under the `SQLite`
957    /// 999-parameter limit. The `idx_summaries_message_range` partial index (migration 077)
958    /// makes the inner `NOT EXISTS` range probe efficient.
959    ///
960    /// # Errors
961    ///
962    /// Returns [`MemoryError`] if the underlying query fails.
963    pub async fn filter_out_preserved_episode_ids(
964        &self,
965        candidate_ids: &[MessageId],
966    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
967        const MAX_BATCH: usize = 490;
968        if candidate_ids.is_empty() {
969            return Ok(Vec::new());
970        }
971        let mut safe_to_delete: Vec<MessageId> = Vec::with_capacity(candidate_ids.len());
972        for chunk in candidate_ids.chunks(MAX_BATCH) {
973            let placeholders = placeholder_list(1, chunk.len());
974            let sql = format!(
975                "SELECT m.id \
976                   FROM messages m \
977                  WHERE m.id IN ({placeholders}) \
978                    AND NOT EXISTS ( \
979                          SELECT 1 \
980                            FROM summaries s \
981                           WHERE s.first_message_id IS NOT NULL \
982                             AND s.last_message_id IS NOT NULL \
983                             AND m.id >= s.first_message_id \
984                             AND m.id <= s.last_message_id \
985                        )"
986            );
987            let mut q = zeph_db::query_as::<_, (MessageId,)>(&sql);
988            for &id in chunk {
989                q = q.bind(id);
990            }
991            let rows: Vec<(MessageId,)> = q.fetch_all(&self.pool).await?;
992            safe_to_delete.extend(rows.into_iter().map(|(id,)| id));
993        }
994        Ok(safe_to_delete)
995    }
996
997    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the update fails.
1002    pub async fn mark_qdrant_cleaned(
1003        &self,
1004        ids: &[MessageId],
1005    ) -> Result<(), crate::error::MemoryError> {
1006        for &id in ids {
1007            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
1008                .bind(id)
1009                .execute(&self.pool)
1010                .await?;
1011        }
1012        Ok(())
1013    }
1014
1015    /// Fetch `importance_score` values for the given message IDs.
1016    ///
1017    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if the query fails.
1022    pub async fn fetch_importance_scores(
1023        &self,
1024        ids: &[MessageId],
1025    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
1026        if ids.is_empty() {
1027            return Ok(std::collections::HashMap::new());
1028        }
1029        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1030        let query = format!(
1031            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1032        );
1033        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
1034        for &id in ids {
1035            q = q.bind(id);
1036        }
1037        let rows = q.fetch_all(&self.pool).await?;
1038        Ok(rows.into_iter().collect())
1039    }
1040
1041    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
1042    ///
1043    /// Skips the update when `ids` is empty.
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns an error if the update fails.
1048    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1049        if ids.is_empty() {
1050            return Ok(());
1051        }
1052        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1053        let query = format!(
1054            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
1055             WHERE id IN ({placeholders})"
1056        );
1057        let mut q = zeph_db::query(&query);
1058        for &id in ids {
1059            q = q.bind(id);
1060        }
1061        q.execute(&self.pool).await?;
1062        Ok(())
1063    }
1064
1065    // ── Tier promotion helpers ─────────────────────────────────────────────────
1066
1067    /// Return episodic messages with `session_count >= min_sessions`, ordered by
1068    /// session count descending then importance score descending.
1069    ///
1070    /// # Errors
1071    ///
1072    /// Returns an error if the query fails.
1073    pub async fn find_promotion_candidates(
1074        &self,
1075        min_sessions: u32,
1076        batch_size: usize,
1077    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
1078        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
1079        let min = i64::from(min_sessions);
1080        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1081            "SELECT id, conversation_id, content, session_count, importance_score \
1082             FROM messages \
1083             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1084             ORDER BY session_count DESC, importance_score DESC \
1085             LIMIT ?"
1086        ))
1087        .bind(min)
1088        .bind(limit)
1089        .fetch_all(&self.pool)
1090        .await?;
1091
1092        Ok(rows
1093            .into_iter()
1094            .map(
1095                |(id, conversation_id, content, session_count, importance_score)| {
1096                    PromotionCandidate {
1097                        id,
1098                        conversation_id,
1099                        content,
1100                        session_count: session_count.try_into().unwrap_or(0),
1101                        importance_score,
1102                    }
1103                },
1104            )
1105            .collect())
1106    }
1107
1108    /// Count messages per tier (episodic, semantic) that are not deleted.
1109    ///
1110    /// Returns `(episodic_count, semantic_count)`.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the query fails.
1115    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1116        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1117            "SELECT tier, COUNT(*) FROM messages \
1118             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1119             GROUP BY tier"
1120        ))
1121        .fetch_all(&self.pool)
1122        .await?;
1123
1124        let mut episodic = 0i64;
1125        let mut semantic = 0i64;
1126        for (tier, count) in rows {
1127            match tier.as_str() {
1128                "episodic" => episodic = count,
1129                "semantic" => semantic = count,
1130                _ => {}
1131            }
1132        }
1133        Ok((episodic, semantic))
1134    }
1135
1136    /// Count semantic facts (tier='semantic', not deleted).
1137    ///
1138    /// # Errors
1139    ///
1140    /// Returns an error if the query fails.
1141    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1142        let row: (i64,) = zeph_db::query_as(sql!(
1143            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1144        ))
1145        .fetch_one(&self.pool)
1146        .await?;
1147        Ok(row.0)
1148    }
1149
    /// Promote a set of episodic messages to semantic tier in a single transaction.
    ///
    /// Within one transaction:
    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
    ///    so the eviction sweep picks up their Qdrant vectors.
    ///
    /// Returns the `MessageId` of the new semantic message.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction fails.
    pub async fn promote_to_semantic(
        &self,
        conversation_id: ConversationId,
        merged_content: &str,
        original_ids: &[MessageId],
    ) -> Result<MessageId, MemoryError> {
        // A promotion with no sources would insert an orphan fact; reject up front.
        if original_ids.is_empty() {
            return Err(MemoryError::Other(
                "promote_to_semantic: original_ids must not be empty".into(),
            ));
        }

        // Acquire the write lock immediately (BEGIN IMMEDIATE) to avoid the DEFERRED read->write
        // upgrade race that causes SQLITE_BUSY when a concurrent writer holds the lock.
        let mut tx = begin_write(&self.pool).await?;

        // Insert the new semantic fact.
        // `EPOCH_NOW` is the dialect's current-Unix-time SQL expression; the row gets
        // empty parts ('[]') and agent-only visibility.
        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let promote_insert_raw = format!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, visibility, \
              tier, promotion_timestamp) \
             VALUES (?, 'assistant', ?, '[]', 'agent_only', 'semantic', {epoch_now}) \
             RETURNING id"
        );
        // Runtime-built SQL bypasses the sql! macro, so `?` placeholders must be
        // rewritten for the active dialect here.
        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
        // RETURNING id hands back the new row's ID without a second query.
        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
            .bind(conversation_id)
            .bind(merged_content)
            .fetch_one(&mut *tx)
            .await?;

        let new_id = row.0;

        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
        for &id in original_ids {
            zeph_db::query(sql!(
                "UPDATE messages \
                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
                 WHERE id = ? AND deleted_at IS NULL"
            ))
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(new_id)
    }
1211
1212    /// Manually promote a set of messages to semantic tier without merging.
1213    ///
1214    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1215    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1216    ///
1217    /// # Errors
1218    ///
1219    /// Returns an error if the update fails.
1220    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1221        if ids.is_empty() {
1222            return Ok(0);
1223        }
1224        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1225        let manual_promote_raw = format!(
1226            "UPDATE messages \
1227             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1228             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1229        );
1230        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1231        let mut count = 0usize;
1232        for &id in ids {
1233            let result = zeph_db::query(&manual_promote_sql)
1234                .bind(id)
1235                .execute(&self.pool)
1236                .await?;
1237            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1238        }
1239        Ok(count)
1240    }
1241
1242    /// Increment `session_count` for all episodic messages in a conversation.
1243    ///
1244    /// Called when a session restores an existing conversation to mark that messages
1245    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1246    ///
1247    /// # Errors
1248    ///
1249    /// Returns an error if the database update fails.
1250    pub async fn increment_session_counts_for_conversation(
1251        &self,
1252        conversation_id: ConversationId,
1253    ) -> Result<(), MemoryError> {
1254        zeph_db::query(sql!(
1255            "UPDATE messages SET session_count = session_count + 1 \
1256             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1257        ))
1258        .bind(conversation_id)
1259        .execute(&self.pool)
1260        .await?;
1261        Ok(())
1262    }
1263
1264    /// Fetch the tier string for each of the given message IDs.
1265    ///
1266    /// Messages not found or already deleted are omitted from the result.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if the query fails.
1271    pub async fn fetch_tiers(
1272        &self,
1273        ids: &[MessageId],
1274    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1275        if ids.is_empty() {
1276            return Ok(std::collections::HashMap::new());
1277        }
1278        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1279        let query = format!(
1280            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1281        );
1282        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1283        for &id in ids {
1284            q = q.bind(id);
1285        }
1286        let rows = q.fetch_all(&self.pool).await?;
1287        Ok(rows.into_iter().collect())
1288    }
1289
1290    /// Return all conversation IDs that have at least one non-consolidated original message.
1291    ///
1292    /// Used by the consolidation sweep to find conversations that need processing.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns an error if the database query fails.
1297    pub async fn conversations_with_unconsolidated_messages(
1298        &self,
1299    ) -> Result<Vec<ConversationId>, MemoryError> {
1300        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1301            "SELECT DISTINCT conversation_id FROM messages \
1302             WHERE consolidated = 0 AND deleted_at IS NULL"
1303        ))
1304        .fetch_all(&self.pool)
1305        .await?;
1306        Ok(rows.into_iter().map(|(id,)| id).collect())
1307    }
1308
1309    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1310    ///
1311    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1312    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1313    ///
1314    /// # Errors
1315    ///
1316    /// Returns an error if the database query fails.
1317    pub async fn find_unconsolidated_messages(
1318        &self,
1319        conversation_id: ConversationId,
1320        limit: usize,
1321    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1322        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1323        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1324            "SELECT id, content FROM messages \
1325             WHERE conversation_id = ? \
1326               AND consolidated = 0 \
1327               AND deleted_at IS NULL \
1328             ORDER BY id ASC \
1329             LIMIT ?"
1330        ))
1331        .bind(conversation_id)
1332        .bind(limit)
1333        .fetch_all(&self.pool)
1334        .await?;
1335        Ok(rows)
1336    }
1337
1338    /// Look up which consolidated entry (if any) covers the given original message.
1339    ///
1340    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1341    /// in its sources, or `None` if no consolidated entry covers it.
1342    ///
1343    /// # Errors
1344    ///
1345    /// Returns an error if the database query fails.
1346    pub async fn find_consolidated_for_source(
1347        &self,
1348        source_id: MessageId,
1349    ) -> Result<Option<MessageId>, MemoryError> {
1350        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1351            "SELECT consolidated_id FROM memory_consolidation_sources \
1352             WHERE source_id = ? \
1353             LIMIT 1"
1354        ))
1355        .bind(source_id)
1356        .fetch_optional(&self.pool)
1357        .await?;
1358        Ok(row.map(|(id,)| id))
1359    }
1360
1361    /// Insert a consolidated message and record its source linkage in a single transaction.
1362    ///
1363    /// Atomically:
1364    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1365    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1366    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1367    ///
1368    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1369    ///
1370    /// # Errors
1371    ///
1372    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1373    /// on failure so no partial state is written.
1374    pub async fn apply_consolidation_merge(
1375        &self,
1376        conversation_id: ConversationId,
1377        role: &str,
1378        merged_content: &str,
1379        source_ids: &[MessageId],
1380        confidence: f32,
1381        confidence_threshold: f32,
1382    ) -> Result<bool, MemoryError> {
1383        if confidence < confidence_threshold {
1384            return Ok(false);
1385        }
1386        if source_ids.is_empty() {
1387            return Ok(false);
1388        }
1389
1390        let mut tx = self.pool.begin().await?;
1391
1392        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1393        let row: (MessageId,) = zeph_db::query_as(sql!(
1394            "INSERT INTO messages \
1395               (conversation_id, role, content, parts, visibility, \
1396                importance_score, consolidated, consolidation_confidence) \
1397             VALUES (?, ?, ?, '[]', 'both', ?, 1, ?) \
1398             RETURNING id"
1399        ))
1400        .bind(conversation_id)
1401        .bind(role)
1402        .bind(merged_content)
1403        .bind(importance)
1404        .bind(confidence)
1405        .fetch_one(&mut *tx)
1406        .await?;
1407        let consolidated_id = row.0;
1408
1409        let consol_sql = format!(
1410            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1411            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1412            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1413        );
1414        for &source_id in source_ids {
1415            zeph_db::query(&consol_sql)
1416                .bind(consolidated_id)
1417                .bind(source_id)
1418                .execute(&mut *tx)
1419                .await?;
1420
1421            // Mark original as consolidated so future sweeps skip it.
1422            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1423                .bind(source_id)
1424                .execute(&mut *tx)
1425                .await?;
1426        }
1427
1428        tx.commit().await?;
1429        Ok(true)
1430    }
1431
1432    /// Update an existing consolidated message in-place with new content.
1433    ///
1434    /// Atomically:
1435    /// 1. Updates `content` and `consolidation_confidence` on `target_id`.
1436    /// 2. Inserts rows into `memory_consolidation_sources` linking `target_id` → each source.
1437    /// 3. Marks each source message's `consolidated = 1`.
1438    ///
1439    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1440    ///
1441    /// # Errors
1442    ///
1443    /// Returns an error if any database operation fails.
1444    pub async fn apply_consolidation_update(
1445        &self,
1446        target_id: MessageId,
1447        new_content: &str,
1448        additional_source_ids: &[MessageId],
1449        confidence: f32,
1450        confidence_threshold: f32,
1451    ) -> Result<bool, MemoryError> {
1452        if confidence < confidence_threshold {
1453            return Ok(false);
1454        }
1455
1456        let mut tx = self.pool.begin().await?;
1457
1458        zeph_db::query(sql!(
1459            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1460        ))
1461        .bind(new_content)
1462        .bind(confidence)
1463        .bind(target_id)
1464        .execute(&mut *tx)
1465        .await?;
1466
1467        let consol_sql = format!(
1468            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1469            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1470            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1471        );
1472        for &source_id in additional_source_ids {
1473            zeph_db::query(&consol_sql)
1474                .bind(target_id)
1475                .bind(source_id)
1476                .execute(&mut *tx)
1477                .await?;
1478
1479            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1480                .bind(source_id)
1481                .execute(&mut *tx)
1482                .await?;
1483        }
1484
1485        tx.commit().await?;
1486        Ok(true)
1487    }
1488
1489    // ── Forgetting sweep helpers ───────────────────────────────────────────────
1490
1491    /// Set `importance_score` for a single message by ID.
1492    ///
1493    /// Used in tests and by forgetting sweep helpers.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if the update fails.
1498    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1499        zeph_db::query(sql!(
1500            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1501        ))
1502        .bind(score)
1503        .bind(id)
1504        .execute(&self.pool)
1505        .await?;
1506        Ok(())
1507    }
1508
1509    /// Get `importance_score` for a single message by ID.
1510    ///
1511    /// Returns `None` if the message does not exist or is deleted.
1512    ///
1513    /// # Errors
1514    ///
1515    /// Returns an error if the query fails.
1516    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1517        let row: Option<(f64,)> = zeph_db::query_as(sql!(
1518            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1519        ))
1520        .bind(id)
1521        .fetch_optional(&self.pool)
1522        .await?;
1523        Ok(row.map(|(s,)| s))
1524    }
1525
1526    /// Increment `access_count` and update `last_accessed` for a batch of messages.
1527    ///
1528    /// Alias used in forgetting tests; forwards to `increment_access_counts`.
1529    ///
1530    /// # Errors
1531    ///
1532    /// Returns an error if the update fails.
1533    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1534        self.increment_access_counts(ids).await
1535    }
1536
1537    /// Mark a set of messages as consolidated (`consolidated = 1`).
1538    ///
1539    /// Used in tests to simulate the state after consolidation.
1540    ///
1541    /// # Errors
1542    ///
1543    /// Returns an error if the update fails.
1544    pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1545        for &id in ids {
1546            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1547                .bind(id)
1548                .execute(&self.pool)
1549                .await?;
1550        }
1551        Ok(())
1552    }
1553
1554    /// Execute the three-phase forgetting sweep (`SleepGate`) inside a single transaction.
1555    ///
1556    /// **Phase 1** — Downscale all active non-consolidated importance scores by `decay_rate`.
1557    /// **Phase 2** — Undo the decay for messages accessed within `replay_window_hours` or
1558    ///   with `access_count >= replay_min_access_count` (undo current sweep's decay only).
1559    /// **Phase 3** — Soft-delete messages below `forgetting_floor` that are not protected
1560    ///   by recent access (`protect_recent_hours`) or high access count
1561    ///   (`protect_min_access_count`). Uses `batch_size` as a row-count cap.
1562    ///
1563    /// All phases commit atomically: concurrent readers see either the pre-sweep or
1564    /// post-sweep state, never an intermediate.
1565    ///
1566    /// # Errors
1567    ///
1568    /// Returns an error if any database operation fails.
1569    pub async fn run_forgetting_sweep_tx(
1570        &self,
1571        config: &zeph_common::config::memory::ForgettingConfig,
1572    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1573        let mut tx = self.pool.begin().await?;
1574
1575        let decay = f64::from(config.decay_rate);
1576        let floor = f64::from(config.forgetting_floor);
1577        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1578        let replay_hours = i64::from(config.replay_window_hours);
1579        let replay_min_access = i64::from(config.replay_min_access_count);
1580        let protect_hours = i64::from(config.protect_recent_hours);
1581        let protect_min_access = i64::from(config.protect_min_access_count);
1582
1583        // Phase 1: downscale all active, non-consolidated messages (limited to batch_size).
1584        // We target a specific set of IDs to respect sweep_batch_size.
1585        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1586            "SELECT id FROM messages \
1587             WHERE deleted_at IS NULL AND consolidated = 0 \
1588             ORDER BY importance_score ASC \
1589             LIMIT ?"
1590        ))
1591        .bind(batch)
1592        .fetch_all(&mut *tx)
1593        .await?;
1594
1595        #[allow(clippy::cast_possible_truncation)]
1596        let downscaled = candidate_ids.len() as u32;
1597
1598        if downscaled > 0 {
1599            let placeholders: String = candidate_ids
1600                .iter()
1601                .map(|_| "?")
1602                .collect::<Vec<_>>()
1603                .join(",");
1604            let downscale_sql = format!(
1605                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1606                 WHERE id IN ({placeholders})"
1607            );
1608            let mut q = zeph_db::query(&downscale_sql);
1609            for &(id,) in &candidate_ids {
1610                q = q.bind(id);
1611            }
1612            q.execute(&mut *tx).await?;
1613        }
1614
1615        // Phase 2: selective replay — undo decay for recently-accessed messages.
1616        // Formula: score / (1 - decay_rate) restores the current sweep's downscaling.
1617        // Cap at 1.0 to avoid exceeding the maximum importance score.
1618        // Scoped to the Phase 1 batch only: messages not decayed this sweep must not
1619        // have their scores inflated by the inverse formula.
1620        let replayed = if downscaled > 0 {
1621            let replay_placeholders: String = candidate_ids
1622                .iter()
1623                .map(|_| "?")
1624                .collect::<Vec<_>>()
1625                .join(",");
1626            let replay_sql = format!(
1627                "UPDATE messages \
1628                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1629                 WHERE id IN ({replay_placeholders}) \
1630                 AND (\
1631                     (last_accessed IS NOT NULL \
1632                      AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1633                     OR access_count >= ?\
1634                 )"
1635            );
1636            let mut rq = zeph_db::query(&replay_sql);
1637            for &(id,) in &candidate_ids {
1638                rq = rq.bind(id);
1639            }
1640            let replay_result = rq
1641                .bind(replay_hours)
1642                .bind(replay_min_access)
1643                .execute(&mut *tx)
1644                .await?;
1645            #[allow(clippy::cast_possible_truncation)]
1646            let n = replay_result.rows_affected() as u32;
1647            n
1648        } else {
1649            0
1650        };
1651
1652        // Phase 3: targeted forgetting — soft-delete low-score unprotected messages.
1653        let prune_sql = format!(
1654            "UPDATE messages \
1655             SET deleted_at = CURRENT_TIMESTAMP \
1656             WHERE deleted_at IS NULL AND consolidated = 0 \
1657             AND importance_score < {floor} \
1658             AND (\
1659                 last_accessed IS NULL \
1660                 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1661             ) \
1662             AND access_count < ?"
1663        );
1664        let prune_result = zeph_db::query(&prune_sql)
1665            .bind(protect_hours)
1666            .bind(protect_min_access)
1667            .execute(&mut *tx)
1668            .await?;
1669        #[allow(clippy::cast_possible_truncation)]
1670        let pruned = prune_result.rows_affected() as u32;
1671
1672        tx.commit().await?;
1673
1674        Ok(crate::forgetting::ForgettingResult {
1675            downscaled,
1676            replayed,
1677            pruned,
1678        })
1679    }
1680}
1681
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Message row ID in the `messages` table.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Raw message content.
    pub content: String,
    /// `session_count` column value (clamps to 0 if the i64 does not fit in u32).
    pub session_count: u32,
    /// `importance_score` column value.
    pub importance_score: f64,
}
1691
1692#[cfg(test)]
1693mod tests;