// zeph_memory/store/messages/mod.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18    match s {
19        "assistant" => Role::Assistant,
20        "system" => Role::System,
21        _ => Role::User,
22    }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27    match role {
28        Role::System => "system",
29        Role::User => "user",
30        Role::Assistant => "assistant",
31    }
32}
33
/// Map a legacy externally-tagged variant key to the `kind` value used by the current
/// internally-tagged schema.
///
/// Returns `None` for keys that are not legacy variant names.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Legacy CamelCase variant name → current snake_case `kind` tag.
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|(_, kind)| *kind)
}
53
54/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
55///
56/// Old format: `[{"Summary":{"text":"..."}}, ...]`
57/// New format: `[{"kind":"summary","text":"..."}, ...]`
58///
59/// Returns `None` if the input does not look like the old format or if any element fails
60/// to deserialize after conversion.
61fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
62    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
63    let mut result = Vec::with_capacity(array.len());
64    for element in array {
65        let obj = element.as_object()?;
66        if obj.contains_key("kind") {
67            return None;
68        }
69        if obj.len() != 1 {
70            return None;
71        }
72        let (key, inner) = obj.iter().next()?;
73        let kind = legacy_key_to_kind(key)?;
74        let mut new_obj = match inner {
75            serde_json::Value::Object(m) => m.clone(),
76            // Image variant wraps a single object directly
77            other => {
78                let mut m = serde_json::Map::new();
79                m.insert("data".to_string(), other.clone());
80                m
81            }
82        };
83        new_obj.insert(
84            "kind".to_string(),
85            serde_json::Value::String(kind.to_string()),
86        );
87        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
88        result.push(part);
89    }
90    Some(result)
91}
92
93/// Deserialize message parts from a stored JSON string.
94///
95/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
96/// a truncated excerpt of the malformed JSON for diagnostics.
97fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98    if parts_json == "[]" {
99        return vec![];
100    }
101    match serde_json::from_str(parts_json) {
102        Ok(p) => p,
103        Err(e) => {
104            if let Some(parts) = try_parse_legacy_parts(parts_json) {
105                let truncated = parts_json.chars().take(120).collect::<String>();
106                tracing::warn!(
107                    role = %role_str,
108                    parts_json = %truncated,
109                    "loaded legacy-format message parts via compat path"
110                );
111                return parts;
112            }
113            let truncated = parts_json.chars().take(120).collect::<String>();
114            tracing::warn!(
115                role = %role_str,
116                parts_json = %truncated,
117                error = %e,
118                "failed to deserialize message parts, falling back to empty"
119            );
120            vec![]
121        }
122    }
123}
124
125impl SqliteStore {
126    /// Create a new conversation and return its ID.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the insert fails.
131    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132        let row: (ConversationId,) = zeph_db::query_as(sql!(
133            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134        ))
135        .fetch_one(&self.pool)
136        .await?;
137        Ok(row.0)
138    }
139
    /// Save a plain-text message to the given conversation and return the message ID.
    ///
    /// Thin wrapper over [`Self::save_message_with_parts`] that stores an empty
    /// parts array (`"[]"`); the delegated path marks the message both
    /// agent-visible and user-visible.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
154
    /// Save a message with structured parts JSON.
    ///
    /// Delegates to [`Self::save_message_with_metadata`] with both
    /// `agent_visible` and `user_visible` set to `true`, so the stored content
    /// is subject to that method's 100 KiB truncation.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
            .await
    }
170
    /// Save a message with an optional category tag.
    ///
    /// The `category` column is NULL when `None` — existing rows are unaffected.
    /// Parts are stored as the empty array `'[]'` and the message is created
    /// fully visible (`agent_visible = 1`, `user_visible = 1`). An importance
    /// score is computed from the content and role at write time.
    ///
    /// NOTE(review): unlike [`Self::save_message_with_metadata`], this path does
    /// not apply the 100 KiB content truncation — confirm oversized content is
    /// not expected here.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message_with_category(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        category: Option<&str>,
    ) -> Result<MessageId, MemoryError> {
        let importance_score = crate::semantic::importance::compute_importance(content, role);
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
                 (conversation_id, role, content, parts, agent_visible, user_visible, \
                  importance_score, category) \
                 VALUES (?, ?, ?, '[]', 1, 1, ?, ?) RETURNING id"
        ))
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(importance_score)
        .bind(category)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
201
202    /// Save a message with visibility metadata.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the insert fails.
207    pub async fn save_message_with_metadata(
208        &self,
209        conversation_id: ConversationId,
210        role: &str,
211        content: &str,
212        parts_json: &str,
213        agent_visible: bool,
214        user_visible: bool,
215    ) -> Result<MessageId, MemoryError> {
216        const MAX_BYTES: usize = 100 * 1024;
217
218        // Truncate plain-text content only. `parts_json` is skipped because a
219        // mid-byte cut produces invalid JSON that breaks downstream deserialization.
220        let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
221            let boundary = content.floor_char_boundary(MAX_BYTES);
222            tracing::debug!(
223                original_bytes = content.len(),
224                "save_message: content exceeds 100KB, truncating"
225            );
226            std::borrow::Cow::Owned(format!(
227                "{}... [truncated, {} bytes total]",
228                &content[..boundary],
229                content.len()
230            ))
231        } else {
232            std::borrow::Cow::Borrowed(content)
233        };
234
235        let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
236        let row: (MessageId,) = zeph_db::query_as(
237            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
238             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
239        )
240        .bind(conversation_id)
241        .bind(role)
242        .bind(content_cow.as_ref())
243        .bind(parts_json)
244        .bind(i64::from(agent_visible))
245        .bind(i64::from(user_visible))
246        .bind(importance_score)
247        .fetch_one(&self.pool)
248        .await?;
249        Ok(row.0)
250    }
251
252    /// Load the most recent messages for a conversation, up to `limit`.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the query fails.
257    pub async fn load_history(
258        &self,
259        conversation_id: ConversationId,
260        limit: u32,
261    ) -> Result<Vec<Message>, MemoryError> {
262        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
263            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
264                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
265                WHERE conversation_id = ? AND deleted_at IS NULL \
266                ORDER BY id DESC \
267                LIMIT ?\
268             ) ORDER BY id ASC"
269        ))
270        .bind(conversation_id)
271        .bind(limit)
272        .fetch_all(&self.pool)
273        .await?;
274
275        let messages = rows
276            .into_iter()
277            .map(
278                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
279                    let parts = parse_parts_json(&role_str, &parts_json);
280                    Message {
281                        role: parse_role(&role_str),
282                        content,
283                        parts,
284                        metadata: MessageMetadata {
285                            agent_visible: agent_visible != 0,
286                            user_visible: user_visible != 0,
287                            compacted_at: None,
288                            deferred_summary: None,
289                            focus_pinned: false,
290                            focus_marker_id: None,
291                            db_id: Some(row_id),
292                        },
293                    }
294                },
295            )
296            .collect();
297        Ok(messages)
298    }
299
300    /// Load messages filtered by visibility flags.
301    ///
302    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the query fails.
307    pub async fn load_history_filtered(
308        &self,
309        conversation_id: ConversationId,
310        limit: u32,
311        agent_visible: Option<bool>,
312        user_visible: Option<bool>,
313    ) -> Result<Vec<Message>, MemoryError> {
314        let av = agent_visible.map(i64::from);
315        let uv = user_visible.map(i64::from);
316
317        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
318            sql!("WITH recent AS (\
319                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
320                WHERE conversation_id = ? \
321                  AND deleted_at IS NULL \
322                  AND (? IS NULL OR agent_visible = ?) \
323                  AND (? IS NULL OR user_visible = ?) \
324                ORDER BY id DESC \
325                LIMIT ?\
326             ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
327        )
328        .bind(conversation_id)
329        .bind(av)
330        .bind(av)
331        .bind(uv)
332        .bind(uv)
333        .bind(limit)
334        .fetch_all(&self.pool)
335        .await?;
336
337        let messages = rows
338            .into_iter()
339            .map(
340                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
341                    let parts = parse_parts_json(&role_str, &parts_json);
342                    Message {
343                        role: parse_role(&role_str),
344                        content,
345                        parts,
346                        metadata: MessageMetadata {
347                            agent_visible: agent_visible != 0,
348                            user_visible: user_visible != 0,
349                            compacted_at: None,
350                            deferred_summary: None,
351                            focus_pinned: false,
352                            focus_marker_id: None,
353                            db_id: Some(row_id),
354                        },
355                    }
356                },
357            )
358            .collect();
359        Ok(messages)
360    }
361
362    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
363    ///
364    /// Within a single transaction:
365    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
366    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
367    ///
368    /// Returns the `MessageId` of the inserted summary.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the transaction fails.
373    pub async fn replace_conversation(
374        &self,
375        conversation_id: ConversationId,
376        compacted_range: std::ops::RangeInclusive<MessageId>,
377        summary_role: &str,
378        summary_content: &str,
379    ) -> Result<MessageId, MemoryError> {
380        let now = {
381            let secs = std::time::SystemTime::now()
382                .duration_since(std::time::UNIX_EPOCH)
383                .unwrap_or_default()
384                .as_secs();
385            format!("{secs}")
386        };
387        let start_id = compacted_range.start().0;
388        let end_id = compacted_range.end().0;
389
390        let mut tx = self.pool.begin().await?;
391
392        zeph_db::query(sql!(
393            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
394             WHERE conversation_id = ? AND id >= ? AND id <= ?"
395        ))
396        .bind(&now)
397        .bind(conversation_id)
398        .bind(start_id)
399        .bind(end_id)
400        .execute(&mut *tx)
401        .await?;
402
403        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
404        let row: (MessageId,) = zeph_db::query_as(sql!(
405            "INSERT INTO messages \
406             (conversation_id, role, content, parts, agent_visible, user_visible) \
407             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
408        ))
409        .bind(conversation_id)
410        .bind(summary_role)
411        .bind(summary_content)
412        .fetch_one(&mut *tx)
413        .await?;
414
415        tx.commit().await?;
416
417        Ok(row.0)
418    }
419
420    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
421    ///
422    /// Within a single transaction:
423    /// 1. Sets `agent_visible=0, compacted_at=<now>` for each ID in `hide_ids`.
424    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the transaction fails.
429    pub async fn apply_tool_pair_summaries(
430        &self,
431        conversation_id: ConversationId,
432        hide_ids: &[i64],
433        summaries: &[String],
434    ) -> Result<(), MemoryError> {
435        if hide_ids.is_empty() && summaries.is_empty() {
436            return Ok(());
437        }
438
439        let now = std::time::SystemTime::now()
440            .duration_since(std::time::UNIX_EPOCH)
441            .unwrap_or_default()
442            .as_secs()
443            .to_string();
444
445        let mut tx = self.pool.begin().await?;
446
447        for &id in hide_ids {
448            zeph_db::query(sql!(
449                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
450            ))
451            .bind(&now)
452            .bind(id)
453            .execute(&mut *tx)
454            .await?;
455        }
456
457        for summary in summaries {
458            let content = format!("[tool summary] {summary}");
459            let parts = serde_json::to_string(&[MessagePart::Summary {
460                text: summary.clone(),
461            }])
462            .unwrap_or_else(|_| "[]".to_string());
463            zeph_db::query(sql!(
464                "INSERT INTO messages \
465                 (conversation_id, role, content, parts, agent_visible, user_visible) \
466                 VALUES (?, 'assistant', ?, ?, 1, 0)"
467            ))
468            .bind(conversation_id)
469            .bind(&content)
470            .bind(&parts)
471            .execute(&mut *tx)
472            .await?;
473        }
474
475        tx.commit().await?;
476        Ok(())
477    }
478
479    /// Return the IDs of the N oldest messages in a conversation (ascending order).
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if the query fails.
484    pub async fn oldest_message_ids(
485        &self,
486        conversation_id: ConversationId,
487        n: u32,
488    ) -> Result<Vec<MessageId>, MemoryError> {
489        let rows: Vec<(MessageId,)> = zeph_db::query_as(
490            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
491        )
492        .bind(conversation_id)
493        .bind(n)
494        .fetch_all(&self.pool)
495        .await?;
496        Ok(rows.into_iter().map(|r| r.0).collect())
497    }
498
499    /// Return the ID of the most recent conversation, if any.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the query fails.
504    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
505        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
506            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
507        ))
508        .fetch_optional(&self.pool)
509        .await?;
510        Ok(row.map(|r| r.0))
511    }
512
513    /// Fetch a single message by its ID.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the query fails.
518    pub async fn message_by_id(
519        &self,
520        message_id: MessageId,
521    ) -> Result<Option<Message>, MemoryError> {
522        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
523            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
524        )
525        .bind(message_id)
526        .fetch_optional(&self.pool)
527        .await?;
528
529        Ok(row.map(
530            |(role_str, content, parts_json, agent_visible, user_visible)| {
531                let parts = parse_parts_json(&role_str, &parts_json);
532                Message {
533                    role: parse_role(&role_str),
534                    content,
535                    parts,
536                    metadata: MessageMetadata {
537                        agent_visible: agent_visible != 0,
538                        user_visible: user_visible != 0,
539                        compacted_at: None,
540                        deferred_summary: None,
541                        focus_pinned: false,
542                        focus_marker_id: None,
543                        db_id: None,
544                    },
545                }
546            },
547        ))
548    }
549
550    /// Fetch messages by a list of IDs in a single query.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if the query fails.
555    pub async fn messages_by_ids(
556        &self,
557        ids: &[MessageId],
558    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
559        if ids.is_empty() {
560            return Ok(Vec::new());
561        }
562
563        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
564
565        let query = format!(
566            "SELECT id, role, content, parts FROM messages \
567             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
568        );
569        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
570        for &id in ids {
571            q = q.bind(id);
572        }
573
574        let rows = q.fetch_all(&self.pool).await?;
575
576        Ok(rows
577            .into_iter()
578            .map(|(id, role_str, content, parts_json)| {
579                let parts = parse_parts_json(&role_str, &parts_json);
580                (
581                    id,
582                    Message {
583                        role: parse_role(&role_str),
584                        content,
585                        parts,
586                        metadata: MessageMetadata::default(),
587                    },
588                )
589            })
590            .collect())
591    }
592
593    /// Return message IDs and content for messages without embeddings.
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if the query fails.
598    pub async fn unembedded_message_ids(
599        &self,
600        limit: Option<usize>,
601    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
602        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
603
604        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
605            "SELECT m.id, m.conversation_id, m.role, m.content \
606             FROM messages m \
607             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
608             WHERE em.id IS NULL AND m.deleted_at IS NULL \
609             ORDER BY m.id ASC \
610             LIMIT ?"
611        ))
612        .bind(effective_limit)
613        .fetch_all(&self.pool)
614        .await?;
615
616        Ok(rows)
617    }
618
619    /// Count messages that have no embedding yet.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the query fails.
624    pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
625        let row: (i64,) = zeph_db::query_as(sql!(
626            "SELECT COUNT(*) FROM messages m \
627             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
628             WHERE em.id IS NULL AND m.deleted_at IS NULL"
629        ))
630        .fetch_one(&self.pool)
631        .await?;
632        Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
633    }
634
    /// Stream message IDs and content for messages without embeddings, one row at a time.
    ///
    /// Executes the same query as [`Self::unembedded_message_ids`] but returns a streaming
    /// cursor instead of loading all rows into a `Vec`. The `SQLite` read transaction is held
    /// for the duration of iteration; callers must not write to `embeddings_metadata` while
    /// the stream is live (use a separate async task for writes).
    ///
    /// # Errors
    ///
    /// Yields a [`MemoryError`] if the query or row decoding fails.
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        // `limit` is bound as-is. NOTE(review): in SQLite a negative LIMIT
        // disables the cap — callers are expected to pass a non-negative value;
        // confirm for other dialects.
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch(&self.pool)
        // Map sqlx errors into the crate's error type as rows are yielded.
        .map_err(MemoryError::from)
    }
662
663    /// Count the number of messages in a conversation.
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if the query fails.
668    pub async fn count_messages(
669        &self,
670        conversation_id: ConversationId,
671    ) -> Result<i64, MemoryError> {
672        let row: (i64,) = zeph_db::query_as(sql!(
673            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
674        ))
675        .bind(conversation_id)
676        .fetch_one(&self.pool)
677        .await?;
678        Ok(row.0)
679    }
680
681    /// Count messages in a conversation with id greater than `after_id`.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if the query fails.
686    pub async fn count_messages_after(
687        &self,
688        conversation_id: ConversationId,
689        after_id: MessageId,
690    ) -> Result<i64, MemoryError> {
691        let row: (i64,) =
692            zeph_db::query_as(
693                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
694            )
695            .bind(conversation_id)
696            .bind(after_id)
697            .fetch_one(&self.pool)
698            .await?;
699        Ok(row.0)
700    }
701
702    /// Full-text keyword search over messages using FTS5.
703    ///
704    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
705    /// negated to positive for consistency with vector scores).
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if the query fails.
710    pub async fn keyword_search(
711        &self,
712        query: &str,
713        limit: usize,
714        conversation_id: Option<ConversationId>,
715    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
716        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
717        let safe_query = sanitize_fts_query(query);
718        if safe_query.is_empty() {
719            return Ok(Vec::new());
720        }
721
722        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
723            zeph_db::query_as(
724                sql!("SELECT m.id, -rank AS score \
725                 FROM messages_fts f \
726                 JOIN messages m ON m.id = f.rowid \
727                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
728                 ORDER BY rank \
729                 LIMIT ?"),
730            )
731            .bind(&safe_query)
732            .bind(cid)
733            .bind(effective_limit)
734            .fetch_all(&self.pool)
735            .await?
736        } else {
737            zeph_db::query_as(sql!(
738                "SELECT m.id, -rank AS score \
739                 FROM messages_fts f \
740                 JOIN messages m ON m.id = f.rowid \
741                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
742                 ORDER BY rank \
743                 LIMIT ?"
744            ))
745            .bind(&safe_query)
746            .bind(effective_limit)
747            .fetch_all(&self.pool)
748            .await?
749        };
750
751        Ok(rows)
752    }
753
    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
    ///
    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
    ///
    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
    /// `None` means "no bound" on that side.
    ///
    /// NOTE(review): this dynamically built SQL uses bare `?` placeholders,
    /// unlike `message_timestamps`, which routes through
    /// `zeph_db::rewrite_placeholders` — presumably fine because FTS5 is
    /// SQLite-only; confirm.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn keyword_search_with_time_range(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
        after: Option<&str>,
        before: Option<&str>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        // Build time-range clauses dynamically. Both bounds are optional.
        let after_clause = if after.is_some() {
            " AND m.created_at > ?"
        } else {
            ""
        };
        let before_clause = if before.is_some() {
            " AND m.created_at < ?"
        } else {
            ""
        };
        let conv_clause = if conversation_id.is_some() {
            " AND m.conversation_id = ?"
        } else {
            ""
        };

        let sql = format!(
            "SELECT m.id, -rank AS score \
             FROM messages_fts f \
             JOIN messages m ON m.id = f.rowid \
             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
             {after_clause}{before_clause}{conv_clause} \
             ORDER BY rank \
             LIMIT ?"
        );

        // Positional binds MUST follow clause order exactly:
        // match-query, after, before, conversation, limit.
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
        if let Some(a) = after {
            q = q.bind(a);
        }
        if let Some(b) = before {
            q = q.bind(b);
        }
        if let Some(cid) = conversation_id {
            q = q.bind(cid);
        }
        q = q.bind(effective_limit);

        Ok(q.fetch_all(&self.pool).await?)
    }
821
822    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
823    ///
824    /// Messages without a `created_at` column fall back to 0.
825    ///
826    /// # Errors
827    ///
828    /// Returns an error if the query fails.
829    pub async fn message_timestamps(
830        &self,
831        ids: &[MessageId],
832    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
833        if ids.is_empty() {
834            return Ok(std::collections::HashMap::new());
835        }
836
837        let placeholders: String =
838            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
839        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
840        let query = format!(
841            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
842        );
843        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
844        for &id in ids {
845            q = q.bind(id);
846        }
847
848        let rows = q.fetch_all(&self.pool).await?;
849        Ok(rows.into_iter().collect())
850    }
851
852    /// Load a range of messages after a given message ID.
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if the query fails.
857    pub async fn load_messages_range(
858        &self,
859        conversation_id: ConversationId,
860        after_message_id: MessageId,
861        limit: usize,
862    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
863        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
864
865        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
866            "SELECT id, role, content FROM messages \
867             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
868             ORDER BY id ASC LIMIT ?"
869        ))
870        .bind(conversation_id)
871        .bind(after_message_id)
872        .bind(effective_limit)
873        .fetch_all(&self.pool)
874        .await?;
875
876        Ok(rows)
877    }
878
879    // ── Eviction helpers ──────────────────────────────────────────────────────
880
881    /// Return all non-deleted message IDs with their eviction metadata.
882    ///
883    /// # Errors
884    ///
885    /// Returns an error if the query fails.
886    pub async fn get_eviction_candidates(
887        &self,
888    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
889        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
890            "SELECT id, created_at, last_accessed, access_count \
891             FROM messages WHERE deleted_at IS NULL"
892        ))
893        .fetch_all(&self.pool)
894        .await?;
895
896        Ok(rows
897            .into_iter()
898            .map(
899                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
900                    id,
901                    created_at,
902                    last_accessed,
903                    access_count: access_count.try_into().unwrap_or(0),
904                },
905            )
906            .collect())
907    }
908
909    /// Soft-delete a set of messages by marking `deleted_at`.
910    ///
911    /// Soft-deleted messages are excluded from all history queries.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the update fails.
916    pub async fn soft_delete_messages(
917        &self,
918        ids: &[MessageId],
919    ) -> Result<(), crate::error::MemoryError> {
920        if ids.is_empty() {
921            return Ok(());
922        }
923        // SQLite does not support array binding natively. Batch via individual updates.
924        for &id in ids {
925            zeph_db::query(
926                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
927            )
928            .bind(id)
929            .execute(&self.pool)
930            .await?;
931        }
932        Ok(())
933    }
934
935    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
936    ///
937    /// # Errors
938    ///
939    /// Returns an error if the query fails.
940    pub async fn get_soft_deleted_message_ids(
941        &self,
942    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
943        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
944            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
945        ))
946        .fetch_all(&self.pool)
947        .await?;
948        Ok(rows.into_iter().map(|(id,)| id).collect())
949    }
950
951    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
952    ///
953    /// # Errors
954    ///
955    /// Returns an error if the update fails.
956    pub async fn mark_qdrant_cleaned(
957        &self,
958        ids: &[MessageId],
959    ) -> Result<(), crate::error::MemoryError> {
960        for &id in ids {
961            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
962                .bind(id)
963                .execute(&self.pool)
964                .await?;
965        }
966        Ok(())
967    }
968
969    /// Fetch `importance_score` values for the given message IDs.
970    ///
971    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the query fails.
976    pub async fn fetch_importance_scores(
977        &self,
978        ids: &[MessageId],
979    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
980        if ids.is_empty() {
981            return Ok(std::collections::HashMap::new());
982        }
983        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
984        let query = format!(
985            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
986        );
987        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
988        for &id in ids {
989            q = q.bind(id);
990        }
991        let rows = q.fetch_all(&self.pool).await?;
992        Ok(rows.into_iter().collect())
993    }
994
995    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
996    ///
997    /// Skips the update when `ids` is empty.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the update fails.
1002    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1003        if ids.is_empty() {
1004            return Ok(());
1005        }
1006        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1007        let query = format!(
1008            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
1009             WHERE id IN ({placeholders})"
1010        );
1011        let mut q = zeph_db::query(&query);
1012        for &id in ids {
1013            q = q.bind(id);
1014        }
1015        q.execute(&self.pool).await?;
1016        Ok(())
1017    }
1018
1019    // ── Tier promotion helpers ─────────────────────────────────────────────────
1020
1021    /// Return episodic messages with `session_count >= min_sessions`, ordered by
1022    /// session count descending then importance score descending.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the query fails.
1027    pub async fn find_promotion_candidates(
1028        &self,
1029        min_sessions: u32,
1030        batch_size: usize,
1031    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
1032        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
1033        let min = i64::from(min_sessions);
1034        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1035            "SELECT id, conversation_id, content, session_count, importance_score \
1036             FROM messages \
1037             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1038             ORDER BY session_count DESC, importance_score DESC \
1039             LIMIT ?"
1040        ))
1041        .bind(min)
1042        .bind(limit)
1043        .fetch_all(&self.pool)
1044        .await?;
1045
1046        Ok(rows
1047            .into_iter()
1048            .map(
1049                |(id, conversation_id, content, session_count, importance_score)| {
1050                    PromotionCandidate {
1051                        id,
1052                        conversation_id,
1053                        content,
1054                        session_count: session_count.try_into().unwrap_or(0),
1055                        importance_score,
1056                    }
1057                },
1058            )
1059            .collect())
1060    }
1061
1062    /// Count messages per tier (episodic, semantic) that are not deleted.
1063    ///
1064    /// Returns `(episodic_count, semantic_count)`.
1065    ///
1066    /// # Errors
1067    ///
1068    /// Returns an error if the query fails.
1069    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1070        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1071            "SELECT tier, COUNT(*) FROM messages \
1072             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1073             GROUP BY tier"
1074        ))
1075        .fetch_all(&self.pool)
1076        .await?;
1077
1078        let mut episodic = 0i64;
1079        let mut semantic = 0i64;
1080        for (tier, count) in rows {
1081            match tier.as_str() {
1082                "episodic" => episodic = count,
1083                "semantic" => semantic = count,
1084                _ => {}
1085            }
1086        }
1087        Ok((episodic, semantic))
1088    }
1089
1090    /// Count semantic facts (tier='semantic', not deleted).
1091    ///
1092    /// # Errors
1093    ///
1094    /// Returns an error if the query fails.
1095    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1096        let row: (i64,) = zeph_db::query_as(sql!(
1097            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1098        ))
1099        .fetch_one(&self.pool)
1100        .await?;
1101        Ok(row.0)
1102    }
1103
1104    /// Promote a set of episodic messages to semantic tier in a single transaction.
1105    ///
1106    /// Within one transaction:
1107    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
1108    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
1109    ///    so the eviction sweep picks up their Qdrant vectors.
1110    ///
1111    /// Returns the `MessageId` of the new semantic message.
1112    ///
1113    /// # Errors
1114    ///
1115    /// Returns an error if the transaction fails.
1116    pub async fn promote_to_semantic(
1117        &self,
1118        conversation_id: ConversationId,
1119        merged_content: &str,
1120        original_ids: &[MessageId],
1121    ) -> Result<MessageId, MemoryError> {
1122        if original_ids.is_empty() {
1123            return Err(MemoryError::Other(
1124                "promote_to_semantic: original_ids must not be empty".into(),
1125            ));
1126        }
1127
1128        // Acquire the write lock immediately (BEGIN IMMEDIATE) to avoid the DEFERRED read->write
1129        // upgrade race that causes SQLITE_BUSY when a concurrent writer holds the lock.
1130        let mut tx = begin_write(&self.pool).await?;
1131
1132        // Insert the new semantic fact.
1133        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1134        let promote_insert_raw = format!(
1135            "INSERT INTO messages \
1136             (conversation_id, role, content, parts, agent_visible, user_visible, \
1137              tier, promotion_timestamp) \
1138             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1139             RETURNING id"
1140        );
1141        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1142        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1143            .bind(conversation_id)
1144            .bind(merged_content)
1145            .fetch_one(&mut *tx)
1146            .await?;
1147
1148        let new_id = row.0;
1149
1150        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
1151        for &id in original_ids {
1152            zeph_db::query(sql!(
1153                "UPDATE messages \
1154                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1155                 WHERE id = ? AND deleted_at IS NULL"
1156            ))
1157            .bind(id)
1158            .execute(&mut *tx)
1159            .await?;
1160        }
1161
1162        tx.commit().await?;
1163        Ok(new_id)
1164    }
1165
1166    /// Manually promote a set of messages to semantic tier without merging.
1167    ///
1168    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1169    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1170    ///
1171    /// # Errors
1172    ///
1173    /// Returns an error if the update fails.
1174    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1175        if ids.is_empty() {
1176            return Ok(0);
1177        }
1178        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1179        let manual_promote_raw = format!(
1180            "UPDATE messages \
1181             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1182             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1183        );
1184        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1185        let mut count = 0usize;
1186        for &id in ids {
1187            let result = zeph_db::query(&manual_promote_sql)
1188                .bind(id)
1189                .execute(&self.pool)
1190                .await?;
1191            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1192        }
1193        Ok(count)
1194    }
1195
1196    /// Increment `session_count` for all episodic messages in a conversation.
1197    ///
1198    /// Called when a session restores an existing conversation to mark that messages
1199    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1200    ///
1201    /// # Errors
1202    ///
1203    /// Returns an error if the database update fails.
1204    pub async fn increment_session_counts_for_conversation(
1205        &self,
1206        conversation_id: ConversationId,
1207    ) -> Result<(), MemoryError> {
1208        zeph_db::query(sql!(
1209            "UPDATE messages SET session_count = session_count + 1 \
1210             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1211        ))
1212        .bind(conversation_id)
1213        .execute(&self.pool)
1214        .await?;
1215        Ok(())
1216    }
1217
1218    /// Fetch the tier string for each of the given message IDs.
1219    ///
1220    /// Messages not found or already deleted are omitted from the result.
1221    ///
1222    /// # Errors
1223    ///
1224    /// Returns an error if the query fails.
1225    pub async fn fetch_tiers(
1226        &self,
1227        ids: &[MessageId],
1228    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1229        if ids.is_empty() {
1230            return Ok(std::collections::HashMap::new());
1231        }
1232        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1233        let query = format!(
1234            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1235        );
1236        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1237        for &id in ids {
1238            q = q.bind(id);
1239        }
1240        let rows = q.fetch_all(&self.pool).await?;
1241        Ok(rows.into_iter().collect())
1242    }
1243
1244    /// Return all conversation IDs that have at least one non-consolidated original message.
1245    ///
1246    /// Used by the consolidation sweep to find conversations that need processing.
1247    ///
1248    /// # Errors
1249    ///
1250    /// Returns an error if the database query fails.
1251    pub async fn conversations_with_unconsolidated_messages(
1252        &self,
1253    ) -> Result<Vec<ConversationId>, MemoryError> {
1254        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1255            "SELECT DISTINCT conversation_id FROM messages \
1256             WHERE consolidated = 0 AND deleted_at IS NULL"
1257        ))
1258        .fetch_all(&self.pool)
1259        .await?;
1260        Ok(rows.into_iter().map(|(id,)| id).collect())
1261    }
1262
1263    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1264    ///
1265    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1266    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if the database query fails.
1271    pub async fn find_unconsolidated_messages(
1272        &self,
1273        conversation_id: ConversationId,
1274        limit: usize,
1275    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1276        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1277        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1278            "SELECT id, content FROM messages \
1279             WHERE conversation_id = ? \
1280               AND consolidated = 0 \
1281               AND deleted_at IS NULL \
1282             ORDER BY id ASC \
1283             LIMIT ?"
1284        ))
1285        .bind(conversation_id)
1286        .bind(limit)
1287        .fetch_all(&self.pool)
1288        .await?;
1289        Ok(rows)
1290    }
1291
1292    /// Look up which consolidated entry (if any) covers the given original message.
1293    ///
1294    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1295    /// in its sources, or `None` if no consolidated entry covers it.
1296    ///
1297    /// # Errors
1298    ///
1299    /// Returns an error if the database query fails.
1300    pub async fn find_consolidated_for_source(
1301        &self,
1302        source_id: MessageId,
1303    ) -> Result<Option<MessageId>, MemoryError> {
1304        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1305            "SELECT consolidated_id FROM memory_consolidation_sources \
1306             WHERE source_id = ? \
1307             LIMIT 1"
1308        ))
1309        .bind(source_id)
1310        .fetch_optional(&self.pool)
1311        .await?;
1312        Ok(row.map(|(id,)| id))
1313    }
1314
1315    /// Insert a consolidated message and record its source linkage in a single transaction.
1316    ///
1317    /// Atomically:
1318    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1319    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1320    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1321    ///
1322    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1323    ///
1324    /// # Errors
1325    ///
1326    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1327    /// on failure so no partial state is written.
1328    pub async fn apply_consolidation_merge(
1329        &self,
1330        conversation_id: ConversationId,
1331        role: &str,
1332        merged_content: &str,
1333        source_ids: &[MessageId],
1334        confidence: f32,
1335        confidence_threshold: f32,
1336    ) -> Result<bool, MemoryError> {
1337        if confidence < confidence_threshold {
1338            return Ok(false);
1339        }
1340        if source_ids.is_empty() {
1341            return Ok(false);
1342        }
1343
1344        let mut tx = self.pool.begin().await?;
1345
1346        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1347        let row: (MessageId,) = zeph_db::query_as(sql!(
1348            "INSERT INTO messages \
1349               (conversation_id, role, content, parts, agent_visible, user_visible, \
1350                importance_score, consolidated, consolidation_confidence) \
1351             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1352             RETURNING id"
1353        ))
1354        .bind(conversation_id)
1355        .bind(role)
1356        .bind(merged_content)
1357        .bind(importance)
1358        .bind(confidence)
1359        .fetch_one(&mut *tx)
1360        .await?;
1361        let consolidated_id = row.0;
1362
1363        let consol_sql = format!(
1364            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1365            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1366            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1367        );
1368        for &source_id in source_ids {
1369            zeph_db::query(&consol_sql)
1370                .bind(consolidated_id)
1371                .bind(source_id)
1372                .execute(&mut *tx)
1373                .await?;
1374
1375            // Mark original as consolidated so future sweeps skip it.
1376            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1377                .bind(source_id)
1378                .execute(&mut *tx)
1379                .await?;
1380        }
1381
1382        tx.commit().await?;
1383        Ok(true)
1384    }
1385
1386    /// Update an existing consolidated message in-place with new content.
1387    ///
1388    /// Atomically:
1389    /// 1. Updates `content` and `consolidation_confidence` on `target_id`.
1390    /// 2. Inserts rows into `memory_consolidation_sources` linking `target_id` → each source.
1391    /// 3. Marks each source message's `consolidated = 1`.
1392    ///
1393    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1394    ///
1395    /// # Errors
1396    ///
1397    /// Returns an error if any database operation fails.
1398    pub async fn apply_consolidation_update(
1399        &self,
1400        target_id: MessageId,
1401        new_content: &str,
1402        additional_source_ids: &[MessageId],
1403        confidence: f32,
1404        confidence_threshold: f32,
1405    ) -> Result<bool, MemoryError> {
1406        if confidence < confidence_threshold {
1407            return Ok(false);
1408        }
1409
1410        let mut tx = self.pool.begin().await?;
1411
1412        zeph_db::query(sql!(
1413            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1414        ))
1415        .bind(new_content)
1416        .bind(confidence)
1417        .bind(target_id)
1418        .execute(&mut *tx)
1419        .await?;
1420
1421        let consol_sql = format!(
1422            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1423            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1424            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1425        );
1426        for &source_id in additional_source_ids {
1427            zeph_db::query(&consol_sql)
1428                .bind(target_id)
1429                .bind(source_id)
1430                .execute(&mut *tx)
1431                .await?;
1432
1433            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1434                .bind(source_id)
1435                .execute(&mut *tx)
1436                .await?;
1437        }
1438
1439        tx.commit().await?;
1440        Ok(true)
1441    }
1442
1443    // ── Forgetting sweep helpers ───────────────────────────────────────────────
1444
1445    /// Set `importance_score` for a single message by ID.
1446    ///
1447    /// Used in tests and by forgetting sweep helpers.
1448    ///
1449    /// # Errors
1450    ///
1451    /// Returns an error if the update fails.
1452    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1453        zeph_db::query(sql!(
1454            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1455        ))
1456        .bind(score)
1457        .bind(id)
1458        .execute(&self.pool)
1459        .await?;
1460        Ok(())
1461    }
1462
1463    /// Get `importance_score` for a single message by ID.
1464    ///
1465    /// Returns `None` if the message does not exist or is deleted.
1466    ///
1467    /// # Errors
1468    ///
1469    /// Returns an error if the query fails.
1470    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1471        let row: Option<(f64,)> = zeph_db::query_as(sql!(
1472            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1473        ))
1474        .bind(id)
1475        .fetch_optional(&self.pool)
1476        .await?;
1477        Ok(row.map(|(s,)| s))
1478    }
1479
1480    /// Increment `access_count` and update `last_accessed` for a batch of messages.
1481    ///
1482    /// Alias used in forgetting tests; forwards to `increment_access_counts`.
1483    ///
1484    /// # Errors
1485    ///
1486    /// Returns an error if the update fails.
1487    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1488        self.increment_access_counts(ids).await
1489    }
1490
1491    /// Mark a set of messages as consolidated (`consolidated = 1`).
1492    ///
1493    /// Used in tests to simulate the state after consolidation.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if the update fails.
1498    pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1499        for &id in ids {
1500            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1501                .bind(id)
1502                .execute(&self.pool)
1503                .await?;
1504        }
1505        Ok(())
1506    }
1507
1508    /// Execute the three-phase forgetting sweep (`SleepGate`) inside a single transaction.
1509    ///
1510    /// **Phase 1** — Downscale all active non-consolidated importance scores by `decay_rate`.
1511    /// **Phase 2** — Undo the decay for messages accessed within `replay_window_hours` or
1512    ///   with `access_count >= replay_min_access_count` (undo current sweep's decay only).
1513    /// **Phase 3** — Soft-delete messages below `forgetting_floor` that are not protected
1514    ///   by recent access (`protect_recent_hours`) or high access count
1515    ///   (`protect_min_access_count`). Uses `batch_size` as a row-count cap.
1516    ///
1517    /// All phases commit atomically: concurrent readers see either the pre-sweep or
1518    /// post-sweep state, never an intermediate.
1519    ///
1520    /// # Errors
1521    ///
1522    /// Returns an error if any database operation fails.
1523    pub async fn run_forgetting_sweep_tx(
1524        &self,
1525        config: &zeph_common::config::memory::ForgettingConfig,
1526    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1527        let mut tx = self.pool.begin().await?;
1528
1529        let decay = f64::from(config.decay_rate);
1530        let floor = f64::from(config.forgetting_floor);
1531        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1532        let replay_hours = i64::from(config.replay_window_hours);
1533        let replay_min_access = i64::from(config.replay_min_access_count);
1534        let protect_hours = i64::from(config.protect_recent_hours);
1535        let protect_min_access = i64::from(config.protect_min_access_count);
1536
1537        // Phase 1: downscale all active, non-consolidated messages (limited to batch_size).
1538        // We target a specific set of IDs to respect sweep_batch_size.
1539        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1540            "SELECT id FROM messages \
1541             WHERE deleted_at IS NULL AND consolidated = 0 \
1542             ORDER BY importance_score ASC \
1543             LIMIT ?"
1544        ))
1545        .bind(batch)
1546        .fetch_all(&mut *tx)
1547        .await?;
1548
1549        #[allow(clippy::cast_possible_truncation)]
1550        let downscaled = candidate_ids.len() as u32;
1551
1552        if downscaled > 0 {
1553            let placeholders: String = candidate_ids
1554                .iter()
1555                .map(|_| "?")
1556                .collect::<Vec<_>>()
1557                .join(",");
1558            let downscale_sql = format!(
1559                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1560                 WHERE id IN ({placeholders})"
1561            );
1562            let mut q = zeph_db::query(&downscale_sql);
1563            for &(id,) in &candidate_ids {
1564                q = q.bind(id);
1565            }
1566            q.execute(&mut *tx).await?;
1567        }
1568
1569        // Phase 2: selective replay — undo decay for recently-accessed messages.
1570        // Formula: score / (1 - decay_rate) restores the current sweep's downscaling.
1571        // Cap at 1.0 to avoid exceeding the maximum importance score.
1572        // Scoped to the Phase 1 batch only: messages not decayed this sweep must not
1573        // have their scores inflated by the inverse formula.
1574        let replayed = if downscaled > 0 {
1575            let replay_placeholders: String = candidate_ids
1576                .iter()
1577                .map(|_| "?")
1578                .collect::<Vec<_>>()
1579                .join(",");
1580            let replay_sql = format!(
1581                "UPDATE messages \
1582                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1583                 WHERE id IN ({replay_placeholders}) \
1584                 AND (\
1585                     (last_accessed IS NOT NULL \
1586                      AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1587                     OR access_count >= ?\
1588                 )"
1589            );
1590            let mut rq = zeph_db::query(&replay_sql);
1591            for &(id,) in &candidate_ids {
1592                rq = rq.bind(id);
1593            }
1594            let replay_result = rq
1595                .bind(replay_hours)
1596                .bind(replay_min_access)
1597                .execute(&mut *tx)
1598                .await?;
1599            #[allow(clippy::cast_possible_truncation)]
1600            let n = replay_result.rows_affected() as u32;
1601            n
1602        } else {
1603            0
1604        };
1605
1606        // Phase 3: targeted forgetting — soft-delete low-score unprotected messages.
1607        let prune_sql = format!(
1608            "UPDATE messages \
1609             SET deleted_at = CURRENT_TIMESTAMP \
1610             WHERE deleted_at IS NULL AND consolidated = 0 \
1611             AND importance_score < {floor} \
1612             AND (\
1613                 last_accessed IS NULL \
1614                 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1615             ) \
1616             AND access_count < ?"
1617        );
1618        let prune_result = zeph_db::query(&prune_sql)
1619            .bind(protect_hours)
1620            .bind(protect_min_access)
1621            .execute(&mut *tx)
1622            .await?;
1623        #[allow(clippy::cast_possible_truncation)]
1624        let pruned = prune_result.rows_affected() as u32;
1625
1626        tx.commit().await?;
1627
1628        Ok(crate::forgetting::ForgettingResult {
1629            downscaled,
1630            replayed,
1631            pruned,
1632        })
1633    }
1634}
1635
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Row ID of the episodic message.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Raw message content as stored in the `content` column.
    pub content: String,
    /// Number of sessions that touched this message (negative DB values clamp to 0).
    pub session_count: u32,
    /// Current `importance_score` of the message.
    pub importance_score: f64,
}
1645
1646#[cfg(test)]
1647mod tests;