// zeph_memory/store/messages/mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18    match s {
19        "assistant" => Role::Assistant,
20        "system" => Role::System,
21        _ => Role::User,
22    }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27    match role {
28        Role::System => "system",
29        Role::User => "user",
30        Role::Assistant => "assistant",
31    }
32}
33
/// Map a legacy externally-tagged variant key to the `kind` value used by the current
/// internally-tagged schema.
///
/// Returns `None` for keys that do not correspond to a known variant.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Table of (legacy PascalCase key, current snake_case kind) pairs.
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|&(_, kind)| kind)
}
53
54/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
55///
56/// Old format: `[{"Summary":{"text":"..."}}, ...]`
57/// New format: `[{"kind":"summary","text":"..."}, ...]`
58///
59/// Returns `None` if the input does not look like the old format or if any element fails
60/// to deserialize after conversion.
61fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
62    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
63    let mut result = Vec::with_capacity(array.len());
64    for element in array {
65        let obj = element.as_object()?;
66        if obj.contains_key("kind") {
67            return None;
68        }
69        if obj.len() != 1 {
70            return None;
71        }
72        let (key, inner) = obj.iter().next()?;
73        let kind = legacy_key_to_kind(key)?;
74        let mut new_obj = match inner {
75            serde_json::Value::Object(m) => m.clone(),
76            // Image variant wraps a single object directly
77            other => {
78                let mut m = serde_json::Map::new();
79                m.insert("data".to_string(), other.clone());
80                m
81            }
82        };
83        new_obj.insert(
84            "kind".to_string(),
85            serde_json::Value::String(kind.to_string()),
86        );
87        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
88        result.push(part);
89    }
90    Some(result)
91}
92
93/// Deserialize message parts from a stored JSON string.
94///
95/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
96/// a truncated excerpt of the malformed JSON for diagnostics.
97fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98    if parts_json == "[]" {
99        return vec![];
100    }
101    match serde_json::from_str(parts_json) {
102        Ok(p) => p,
103        Err(e) => {
104            if let Some(parts) = try_parse_legacy_parts(parts_json) {
105                let truncated = parts_json.chars().take(120).collect::<String>();
106                tracing::warn!(
107                    role = %role_str,
108                    parts_json = %truncated,
109                    "loaded legacy-format message parts via compat path"
110                );
111                return parts;
112            }
113            let truncated = parts_json.chars().take(120).collect::<String>();
114            tracing::warn!(
115                role = %role_str,
116                parts_json = %truncated,
117                error = %e,
118                "failed to deserialize message parts, falling back to empty"
119            );
120            vec![]
121        }
122    }
123}
124
125impl SqliteStore {
126    /// Create a new conversation and return its ID.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the insert fails.
131    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132        let row: (ConversationId,) = zeph_db::query_as(sql!(
133            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134        ))
135        .fetch_one(&self.pool)
136        .await?;
137        Ok(row.0)
138    }
139
    /// Save a message to the given conversation and return the message ID.
    ///
    /// Convenience wrapper: delegates to [`Self::save_message_with_parts`]
    /// with an empty parts array (`"[]"`).
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
154
    /// Save a message with structured parts JSON.
    ///
    /// Delegates to [`Self::save_message_with_metadata`] with both visibility
    /// flags set to `true` (visible to agent and user).
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
            .await
    }
170
    /// Save a message with visibility metadata.
    ///
    /// Visibility flags are stored as 0/1 integers in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message_with_metadata(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        agent_visible: bool,
        user_visible: bool,
    ) -> Result<MessageId, MemoryError> {
        // Importance is scored once at write time from content + role.
        let importance_score = crate::semantic::importance::compute_importance(content, role);
        let row: (MessageId,) = zeph_db::query_as(
            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
        )
        // Bind order must match the placeholder order in the VALUES clause.
        .bind(conversation_id)
        .bind(role)
        .bind(content)
        .bind(parts_json)
        .bind(i64::from(agent_visible))
        .bind(i64::from(user_visible))
        .bind(importance_score)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
201
202    /// Load the most recent messages for a conversation, up to `limit`.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the query fails.
207    pub async fn load_history(
208        &self,
209        conversation_id: ConversationId,
210        limit: u32,
211    ) -> Result<Vec<Message>, MemoryError> {
212        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
213            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
214                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
215                WHERE conversation_id = ? AND deleted_at IS NULL \
216                ORDER BY id DESC \
217                LIMIT ?\
218             ) ORDER BY id ASC"
219        ))
220        .bind(conversation_id)
221        .bind(limit)
222        .fetch_all(&self.pool)
223        .await?;
224
225        let messages = rows
226            .into_iter()
227            .map(
228                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
229                    let parts = parse_parts_json(&role_str, &parts_json);
230                    Message {
231                        role: parse_role(&role_str),
232                        content,
233                        parts,
234                        metadata: MessageMetadata {
235                            agent_visible: agent_visible != 0,
236                            user_visible: user_visible != 0,
237                            compacted_at: None,
238                            deferred_summary: None,
239                            focus_pinned: false,
240                            focus_marker_id: None,
241                            db_id: Some(row_id),
242                        },
243                    }
244                },
245            )
246            .collect();
247        Ok(messages)
248    }
249
250    /// Load messages filtered by visibility flags.
251    ///
252    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the query fails.
257    pub async fn load_history_filtered(
258        &self,
259        conversation_id: ConversationId,
260        limit: u32,
261        agent_visible: Option<bool>,
262        user_visible: Option<bool>,
263    ) -> Result<Vec<Message>, MemoryError> {
264        let av = agent_visible.map(i64::from);
265        let uv = user_visible.map(i64::from);
266
267        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
268            sql!("WITH recent AS (\
269                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
270                WHERE conversation_id = ? \
271                  AND deleted_at IS NULL \
272                  AND (? IS NULL OR agent_visible = ?) \
273                  AND (? IS NULL OR user_visible = ?) \
274                ORDER BY id DESC \
275                LIMIT ?\
276             ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
277        )
278        .bind(conversation_id)
279        .bind(av)
280        .bind(av)
281        .bind(uv)
282        .bind(uv)
283        .bind(limit)
284        .fetch_all(&self.pool)
285        .await?;
286
287        let messages = rows
288            .into_iter()
289            .map(
290                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
291                    let parts = parse_parts_json(&role_str, &parts_json);
292                    Message {
293                        role: parse_role(&role_str),
294                        content,
295                        parts,
296                        metadata: MessageMetadata {
297                            agent_visible: agent_visible != 0,
298                            user_visible: user_visible != 0,
299                            compacted_at: None,
300                            deferred_summary: None,
301                            focus_pinned: false,
302                            focus_marker_id: None,
303                            db_id: Some(row_id),
304                        },
305                    }
306                },
307            )
308            .collect();
309        Ok(messages)
310    }
311
    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
    ///
    /// Within a single transaction:
    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
    ///
    /// Returns the `MessageId` of the inserted summary.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction fails.
    pub async fn replace_conversation(
        &self,
        conversation_id: ConversationId,
        compacted_range: std::ops::RangeInclusive<MessageId>,
        summary_role: &str,
        summary_content: &str,
    ) -> Result<MessageId, MemoryError> {
        // `compacted_at` is stored as Unix epoch seconds rendered as a string.
        let now = {
            let secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            format!("{secs}")
        };
        let start_id = compacted_range.start().0;
        let end_id = compacted_range.end().0;

        let mut tx = self.pool.begin().await?;

        // Step 1: hide the compacted range from the agent.
        zeph_db::query(sql!(
            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
             WHERE conversation_id = ? AND id >= ? AND id <= ?"
        ))
        .bind(&now)
        .bind(conversation_id)
        .bind(start_id)
        .bind(end_id)
        .execute(&mut *tx)
        .await?;

        // Step 2: insert the agent-only summary.
        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
        let row: (MessageId,) = zeph_db::query_as(sql!(
            "INSERT INTO messages \
             (conversation_id, role, content, parts, agent_visible, user_visible) \
             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
        ))
        .bind(conversation_id)
        .bind(summary_role)
        .bind(summary_content)
        .fetch_one(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(row.0)
    }
369
    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
    ///
    /// Within a single transaction:
    /// 1. Sets `agent_visible=0, compacted_at=<now>` for each ID in `hide_ids`.
    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction fails.
    pub async fn apply_tool_pair_summaries(
        &self,
        conversation_id: ConversationId,
        hide_ids: &[i64],
        summaries: &[String],
    ) -> Result<(), MemoryError> {
        // Nothing to do: avoid opening an empty transaction.
        if hide_ids.is_empty() && summaries.is_empty() {
            return Ok(());
        }

        // `compacted_at` is stored as Unix epoch seconds rendered as a string.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            .to_string();

        let mut tx = self.pool.begin().await?;

        // Step 1: hide each tool message individually (no array binding).
        for &id in hide_ids {
            zeph_db::query(sql!(
                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
            ))
            .bind(&now)
            .bind(id)
            .execute(&mut *tx)
            .await?;
        }

        // Step 2: insert each summary as an agent-only assistant message,
        // with the summary mirrored into the structured `parts` column.
        for summary in summaries {
            let content = format!("[tool summary] {summary}");
            let parts = serde_json::to_string(&[MessagePart::Summary {
                text: summary.clone(),
            }])
            .unwrap_or_else(|_| "[]".to_string());
            zeph_db::query(sql!(
                "INSERT INTO messages \
                 (conversation_id, role, content, parts, agent_visible, user_visible) \
                 VALUES (?, 'assistant', ?, ?, 1, 0)"
            ))
            .bind(conversation_id)
            .bind(&content)
            .bind(&parts)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
428
429    /// Return the IDs of the N oldest messages in a conversation (ascending order).
430    ///
431    /// # Errors
432    ///
433    /// Returns an error if the query fails.
434    pub async fn oldest_message_ids(
435        &self,
436        conversation_id: ConversationId,
437        n: u32,
438    ) -> Result<Vec<MessageId>, MemoryError> {
439        let rows: Vec<(MessageId,)> = zeph_db::query_as(
440            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
441        )
442        .bind(conversation_id)
443        .bind(n)
444        .fetch_all(&self.pool)
445        .await?;
446        Ok(rows.into_iter().map(|r| r.0).collect())
447    }
448
449    /// Return the ID of the most recent conversation, if any.
450    ///
451    /// # Errors
452    ///
453    /// Returns an error if the query fails.
454    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
455        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
456            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
457        ))
458        .fetch_optional(&self.pool)
459        .await?;
460        Ok(row.map(|r| r.0))
461    }
462
463    /// Fetch a single message by its ID.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the query fails.
468    pub async fn message_by_id(
469        &self,
470        message_id: MessageId,
471    ) -> Result<Option<Message>, MemoryError> {
472        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
473            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
474        )
475        .bind(message_id)
476        .fetch_optional(&self.pool)
477        .await?;
478
479        Ok(row.map(
480            |(role_str, content, parts_json, agent_visible, user_visible)| {
481                let parts = parse_parts_json(&role_str, &parts_json);
482                Message {
483                    role: parse_role(&role_str),
484                    content,
485                    parts,
486                    metadata: MessageMetadata {
487                        agent_visible: agent_visible != 0,
488                        user_visible: user_visible != 0,
489                        compacted_at: None,
490                        deferred_summary: None,
491                        focus_pinned: false,
492                        focus_marker_id: None,
493                        db_id: None,
494                    },
495                }
496            },
497        ))
498    }
499
500    /// Fetch messages by a list of IDs in a single query.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if the query fails.
505    pub async fn messages_by_ids(
506        &self,
507        ids: &[MessageId],
508    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
509        if ids.is_empty() {
510            return Ok(Vec::new());
511        }
512
513        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
514
515        let query = format!(
516            "SELECT id, role, content, parts FROM messages \
517             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
518        );
519        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
520        for &id in ids {
521            q = q.bind(id);
522        }
523
524        let rows = q.fetch_all(&self.pool).await?;
525
526        Ok(rows
527            .into_iter()
528            .map(|(id, role_str, content, parts_json)| {
529                let parts = parse_parts_json(&role_str, &parts_json);
530                (
531                    id,
532                    Message {
533                        role: parse_role(&role_str),
534                        content,
535                        parts,
536                        metadata: MessageMetadata::default(),
537                    },
538                )
539            })
540            .collect())
541    }
542
543    /// Return message IDs and content for messages without embeddings.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if the query fails.
548    pub async fn unembedded_message_ids(
549        &self,
550        limit: Option<usize>,
551    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
552        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
553
554        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
555            "SELECT m.id, m.conversation_id, m.role, m.content \
556             FROM messages m \
557             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
558             WHERE em.id IS NULL AND m.deleted_at IS NULL \
559             ORDER BY m.id ASC \
560             LIMIT ?"
561        ))
562        .bind(effective_limit)
563        .fetch_all(&self.pool)
564        .await?;
565
566        Ok(rows)
567    }
568
    /// Stream message IDs and content for messages without embeddings, one row at a time.
    ///
    /// Executes the same query as [`Self::unembedded_message_ids`] but returns a streaming
    /// cursor instead of loading all rows into a `Vec`. The `SQLite` read transaction is held
    /// for the duration of iteration; callers must not write to `embeddings_metadata` while
    /// the stream is live (use a separate async task for writes).
    ///
    /// # Errors
    ///
    /// Yields a [`MemoryError`] if the query or row decoding fails.
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        // `fetch` returns a stream borrowing the pool, hence the `+ '_` bound.
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch(&self.pool)
        // Convert sqlx errors into this crate's error type per item.
        .map_err(MemoryError::from)
    }
596
597    /// Count the number of messages in a conversation.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the query fails.
602    pub async fn count_messages(
603        &self,
604        conversation_id: ConversationId,
605    ) -> Result<i64, MemoryError> {
606        let row: (i64,) = zeph_db::query_as(sql!(
607            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
608        ))
609        .bind(conversation_id)
610        .fetch_one(&self.pool)
611        .await?;
612        Ok(row.0)
613    }
614
615    /// Count messages in a conversation with id greater than `after_id`.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the query fails.
620    pub async fn count_messages_after(
621        &self,
622        conversation_id: ConversationId,
623        after_id: MessageId,
624    ) -> Result<i64, MemoryError> {
625        let row: (i64,) =
626            zeph_db::query_as(
627                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
628            )
629            .bind(conversation_id)
630            .bind(after_id)
631            .fetch_one(&self.pool)
632            .await?;
633        Ok(row.0)
634    }
635
    /// Full-text keyword search over messages using FTS5.
    ///
    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
    /// negated to positive for consistency with vector scores).
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn keyword_search(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        // Sanitize user input before handing it to FTS5 MATCH syntax.
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        // Two static branches (with / without conversation filter) because the
        // sql! macro requires compile-time query literals.
        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
            zeph_db::query_as(
                sql!("SELECT m.id, -rank AS score \
                 FROM messages_fts f \
                 JOIN messages m ON m.id = f.rowid \
                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
                 ORDER BY rank \
                 LIMIT ?"),
            )
            .bind(&safe_query)
            .bind(cid)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT m.id, -rank AS score \
                 FROM messages_fts f \
                 JOIN messages m ON m.id = f.rowid \
                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
                 ORDER BY rank \
                 LIMIT ?"
            ))
            .bind(&safe_query)
            .bind(effective_limit)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows)
    }
687
688    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
689    ///
690    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
691    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
692    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
693    ///
694    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
695    /// `None` means "no bound" on that side.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the query fails.
700    pub async fn keyword_search_with_time_range(
701        &self,
702        query: &str,
703        limit: usize,
704        conversation_id: Option<ConversationId>,
705        after: Option<&str>,
706        before: Option<&str>,
707    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
708        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
709        let safe_query = sanitize_fts_query(query);
710        if safe_query.is_empty() {
711            return Ok(Vec::new());
712        }
713
714        // Build time-range clauses dynamically. Both bounds are optional.
715        let after_clause = if after.is_some() {
716            " AND m.created_at > ?"
717        } else {
718            ""
719        };
720        let before_clause = if before.is_some() {
721            " AND m.created_at < ?"
722        } else {
723            ""
724        };
725        let conv_clause = if conversation_id.is_some() {
726            " AND m.conversation_id = ?"
727        } else {
728            ""
729        };
730
731        let sql = format!(
732            "SELECT m.id, -rank AS score \
733             FROM messages_fts f \
734             JOIN messages m ON m.id = f.rowid \
735             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
736             {after_clause}{before_clause}{conv_clause} \
737             ORDER BY rank \
738             LIMIT ?"
739        );
740
741        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
742        if let Some(a) = after {
743            q = q.bind(a);
744        }
745        if let Some(b) = before {
746            q = q.bind(b);
747        }
748        if let Some(cid) = conversation_id {
749            q = q.bind(cid);
750        }
751        q = q.bind(effective_limit);
752
753        Ok(q.fetch_all(&self.pool).await?)
754    }
755
    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
    ///
    /// Messages without a `created_at` column fall back to 0.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn message_timestamps(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }

        // `rewrite_placeholders` adapts the `?` IN-list to the active dialect.
        let placeholders: String =
            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
        // Dialect-specific expression converting `created_at` to epoch seconds.
        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
        let query = format!(
            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }

        let rows = q.fetch_all(&self.pool).await?;
        // Missing IDs are simply absent from the resulting map.
        Ok(rows.into_iter().collect())
    }
785
786    /// Load a range of messages after a given message ID.
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the query fails.
791    pub async fn load_messages_range(
792        &self,
793        conversation_id: ConversationId,
794        after_message_id: MessageId,
795        limit: usize,
796    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
797        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
798
799        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
800            "SELECT id, role, content FROM messages \
801             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
802             ORDER BY id ASC LIMIT ?"
803        ))
804        .bind(conversation_id)
805        .bind(after_message_id)
806        .bind(effective_limit)
807        .fetch_all(&self.pool)
808        .await?;
809
810        Ok(rows)
811    }
812
813    // ── Eviction helpers ──────────────────────────────────────────────────────
814
815    /// Return all non-deleted message IDs with their eviction metadata.
816    ///
817    /// # Errors
818    ///
819    /// Returns an error if the query fails.
820    pub async fn get_eviction_candidates(
821        &self,
822    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
823        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
824            "SELECT id, created_at, last_accessed, access_count \
825             FROM messages WHERE deleted_at IS NULL"
826        ))
827        .fetch_all(&self.pool)
828        .await?;
829
830        Ok(rows
831            .into_iter()
832            .map(
833                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
834                    id,
835                    created_at,
836                    last_accessed,
837                    access_count: access_count.try_into().unwrap_or(0),
838                },
839            )
840            .collect())
841    }
842
843    /// Soft-delete a set of messages by marking `deleted_at`.
844    ///
845    /// Soft-deleted messages are excluded from all history queries.
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if the update fails.
850    pub async fn soft_delete_messages(
851        &self,
852        ids: &[MessageId],
853    ) -> Result<(), crate::error::MemoryError> {
854        if ids.is_empty() {
855            return Ok(());
856        }
857        // SQLite does not support array binding natively. Batch via individual updates.
858        for &id in ids {
859            zeph_db::query(
860                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
861            )
862            .bind(id)
863            .execute(&self.pool)
864            .await?;
865        }
866        Ok(())
867    }
868
869    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if the query fails.
874    pub async fn get_soft_deleted_message_ids(
875        &self,
876    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
877        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
878            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
879        ))
880        .fetch_all(&self.pool)
881        .await?;
882        Ok(rows.into_iter().map(|(id,)| id).collect())
883    }
884
885    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if the update fails.
890    pub async fn mark_qdrant_cleaned(
891        &self,
892        ids: &[MessageId],
893    ) -> Result<(), crate::error::MemoryError> {
894        for &id in ids {
895            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
896                .bind(id)
897                .execute(&self.pool)
898                .await?;
899        }
900        Ok(())
901    }
902
903    /// Fetch `importance_score` values for the given message IDs.
904    ///
905    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
906    ///
907    /// # Errors
908    ///
909    /// Returns an error if the query fails.
910    pub async fn fetch_importance_scores(
911        &self,
912        ids: &[MessageId],
913    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
914        if ids.is_empty() {
915            return Ok(std::collections::HashMap::new());
916        }
917        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
918        let query = format!(
919            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
920        );
921        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
922        for &id in ids {
923            q = q.bind(id);
924        }
925        let rows = q.fetch_all(&self.pool).await?;
926        Ok(rows.into_iter().collect())
927    }
928
929    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
930    ///
931    /// Skips the update when `ids` is empty.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the update fails.
936    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
937        if ids.is_empty() {
938            return Ok(());
939        }
940        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
941        let query = format!(
942            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
943             WHERE id IN ({placeholders})"
944        );
945        let mut q = zeph_db::query(&query);
946        for &id in ids {
947            q = q.bind(id);
948        }
949        q.execute(&self.pool).await?;
950        Ok(())
951    }
952
953    // ── Tier promotion helpers ─────────────────────────────────────────────────
954
955    /// Return episodic messages with `session_count >= min_sessions`, ordered by
956    /// session count descending then importance score descending.
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if the query fails.
961    pub async fn find_promotion_candidates(
962        &self,
963        min_sessions: u32,
964        batch_size: usize,
965    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
966        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
967        let min = i64::from(min_sessions);
968        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
969            "SELECT id, conversation_id, content, session_count, importance_score \
970             FROM messages \
971             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
972             ORDER BY session_count DESC, importance_score DESC \
973             LIMIT ?"
974        ))
975        .bind(min)
976        .bind(limit)
977        .fetch_all(&self.pool)
978        .await?;
979
980        Ok(rows
981            .into_iter()
982            .map(
983                |(id, conversation_id, content, session_count, importance_score)| {
984                    PromotionCandidate {
985                        id,
986                        conversation_id,
987                        content,
988                        session_count: session_count.try_into().unwrap_or(0),
989                        importance_score,
990                    }
991                },
992            )
993            .collect())
994    }
995
996    /// Count messages per tier (episodic, semantic) that are not deleted.
997    ///
998    /// Returns `(episodic_count, semantic_count)`.
999    ///
1000    /// # Errors
1001    ///
1002    /// Returns an error if the query fails.
1003    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1004        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1005            "SELECT tier, COUNT(*) FROM messages \
1006             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1007             GROUP BY tier"
1008        ))
1009        .fetch_all(&self.pool)
1010        .await?;
1011
1012        let mut episodic = 0i64;
1013        let mut semantic = 0i64;
1014        for (tier, count) in rows {
1015            match tier.as_str() {
1016                "episodic" => episodic = count,
1017                "semantic" => semantic = count,
1018                _ => {}
1019            }
1020        }
1021        Ok((episodic, semantic))
1022    }
1023
1024    /// Count semantic facts (tier='semantic', not deleted).
1025    ///
1026    /// # Errors
1027    ///
1028    /// Returns an error if the query fails.
1029    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1030        let row: (i64,) = zeph_db::query_as(sql!(
1031            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1032        ))
1033        .fetch_one(&self.pool)
1034        .await?;
1035        Ok(row.0)
1036    }
1037
1038    /// Promote a set of episodic messages to semantic tier in a single transaction.
1039    ///
1040    /// Within one transaction:
1041    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
1042    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
1043    ///    so the eviction sweep picks up their Qdrant vectors.
1044    ///
1045    /// Returns the `MessageId` of the new semantic message.
1046    ///
1047    /// # Errors
1048    ///
1049    /// Returns an error if the transaction fails.
1050    pub async fn promote_to_semantic(
1051        &self,
1052        conversation_id: ConversationId,
1053        merged_content: &str,
1054        original_ids: &[MessageId],
1055    ) -> Result<MessageId, MemoryError> {
1056        if original_ids.is_empty() {
1057            return Err(MemoryError::Other(
1058                "promote_to_semantic: original_ids must not be empty".into(),
1059            ));
1060        }
1061
1062        // Acquire the write lock immediately (BEGIN IMMEDIATE) to avoid the DEFERRED read->write
1063        // upgrade race that causes SQLITE_BUSY when a concurrent writer holds the lock.
1064        let mut tx = begin_write(&self.pool).await?;
1065
1066        // Insert the new semantic fact.
1067        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1068        let promote_insert_raw = format!(
1069            "INSERT INTO messages \
1070             (conversation_id, role, content, parts, agent_visible, user_visible, \
1071              tier, promotion_timestamp) \
1072             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1073             RETURNING id"
1074        );
1075        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1076        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1077            .bind(conversation_id)
1078            .bind(merged_content)
1079            .fetch_one(&mut *tx)
1080            .await?;
1081
1082        let new_id = row.0;
1083
1084        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
1085        for &id in original_ids {
1086            zeph_db::query(sql!(
1087                "UPDATE messages \
1088                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1089                 WHERE id = ? AND deleted_at IS NULL"
1090            ))
1091            .bind(id)
1092            .execute(&mut *tx)
1093            .await?;
1094        }
1095
1096        tx.commit().await?;
1097        Ok(new_id)
1098    }
1099
1100    /// Manually promote a set of messages to semantic tier without merging.
1101    ///
1102    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1103    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1104    ///
1105    /// # Errors
1106    ///
1107    /// Returns an error if the update fails.
1108    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1109        if ids.is_empty() {
1110            return Ok(0);
1111        }
1112        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1113        let manual_promote_raw = format!(
1114            "UPDATE messages \
1115             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1116             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1117        );
1118        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1119        let mut count = 0usize;
1120        for &id in ids {
1121            let result = zeph_db::query(&manual_promote_sql)
1122                .bind(id)
1123                .execute(&self.pool)
1124                .await?;
1125            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1126        }
1127        Ok(count)
1128    }
1129
1130    /// Increment `session_count` for all episodic messages in a conversation.
1131    ///
1132    /// Called when a session restores an existing conversation to mark that messages
1133    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1134    ///
1135    /// # Errors
1136    ///
1137    /// Returns an error if the database update fails.
1138    pub async fn increment_session_counts_for_conversation(
1139        &self,
1140        conversation_id: ConversationId,
1141    ) -> Result<(), MemoryError> {
1142        zeph_db::query(sql!(
1143            "UPDATE messages SET session_count = session_count + 1 \
1144             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1145        ))
1146        .bind(conversation_id)
1147        .execute(&self.pool)
1148        .await?;
1149        Ok(())
1150    }
1151
1152    /// Fetch the tier string for each of the given message IDs.
1153    ///
1154    /// Messages not found or already deleted are omitted from the result.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the query fails.
1159    pub async fn fetch_tiers(
1160        &self,
1161        ids: &[MessageId],
1162    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1163        if ids.is_empty() {
1164            return Ok(std::collections::HashMap::new());
1165        }
1166        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1167        let query = format!(
1168            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1169        );
1170        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1171        for &id in ids {
1172            q = q.bind(id);
1173        }
1174        let rows = q.fetch_all(&self.pool).await?;
1175        Ok(rows.into_iter().collect())
1176    }
1177
1178    /// Return all conversation IDs that have at least one non-consolidated original message.
1179    ///
1180    /// Used by the consolidation sweep to find conversations that need processing.
1181    ///
1182    /// # Errors
1183    ///
1184    /// Returns an error if the database query fails.
1185    pub async fn conversations_with_unconsolidated_messages(
1186        &self,
1187    ) -> Result<Vec<ConversationId>, MemoryError> {
1188        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1189            "SELECT DISTINCT conversation_id FROM messages \
1190             WHERE consolidated = 0 AND deleted_at IS NULL"
1191        ))
1192        .fetch_all(&self.pool)
1193        .await?;
1194        Ok(rows.into_iter().map(|(id,)| id).collect())
1195    }
1196
1197    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1198    ///
1199    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1200    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1201    ///
1202    /// # Errors
1203    ///
1204    /// Returns an error if the database query fails.
1205    pub async fn find_unconsolidated_messages(
1206        &self,
1207        conversation_id: ConversationId,
1208        limit: usize,
1209    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1210        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1211        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1212            "SELECT id, content FROM messages \
1213             WHERE conversation_id = ? \
1214               AND consolidated = 0 \
1215               AND deleted_at IS NULL \
1216             ORDER BY id ASC \
1217             LIMIT ?"
1218        ))
1219        .bind(conversation_id)
1220        .bind(limit)
1221        .fetch_all(&self.pool)
1222        .await?;
1223        Ok(rows)
1224    }
1225
1226    /// Look up which consolidated entry (if any) covers the given original message.
1227    ///
1228    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1229    /// in its sources, or `None` if no consolidated entry covers it.
1230    ///
1231    /// # Errors
1232    ///
1233    /// Returns an error if the database query fails.
1234    pub async fn find_consolidated_for_source(
1235        &self,
1236        source_id: MessageId,
1237    ) -> Result<Option<MessageId>, MemoryError> {
1238        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1239            "SELECT consolidated_id FROM memory_consolidation_sources \
1240             WHERE source_id = ? \
1241             LIMIT 1"
1242        ))
1243        .bind(source_id)
1244        .fetch_optional(&self.pool)
1245        .await?;
1246        Ok(row.map(|(id,)| id))
1247    }
1248
1249    /// Insert a consolidated message and record its source linkage in a single transaction.
1250    ///
1251    /// Atomically:
1252    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1253    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1254    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1255    ///
1256    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1257    ///
1258    /// # Errors
1259    ///
1260    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1261    /// on failure so no partial state is written.
1262    pub async fn apply_consolidation_merge(
1263        &self,
1264        conversation_id: ConversationId,
1265        role: &str,
1266        merged_content: &str,
1267        source_ids: &[MessageId],
1268        confidence: f32,
1269        confidence_threshold: f32,
1270    ) -> Result<bool, MemoryError> {
1271        if confidence < confidence_threshold {
1272            return Ok(false);
1273        }
1274        if source_ids.is_empty() {
1275            return Ok(false);
1276        }
1277
1278        let mut tx = self.pool.begin().await?;
1279
1280        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1281        let row: (MessageId,) = zeph_db::query_as(sql!(
1282            "INSERT INTO messages \
1283               (conversation_id, role, content, parts, agent_visible, user_visible, \
1284                importance_score, consolidated, consolidation_confidence) \
1285             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1286             RETURNING id"
1287        ))
1288        .bind(conversation_id)
1289        .bind(role)
1290        .bind(merged_content)
1291        .bind(importance)
1292        .bind(confidence)
1293        .fetch_one(&mut *tx)
1294        .await?;
1295        let consolidated_id = row.0;
1296
1297        let consol_sql = format!(
1298            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1299            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1300            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1301        );
1302        for &source_id in source_ids {
1303            zeph_db::query(&consol_sql)
1304                .bind(consolidated_id)
1305                .bind(source_id)
1306                .execute(&mut *tx)
1307                .await?;
1308
1309            // Mark original as consolidated so future sweeps skip it.
1310            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1311                .bind(source_id)
1312                .execute(&mut *tx)
1313                .await?;
1314        }
1315
1316        tx.commit().await?;
1317        Ok(true)
1318    }
1319
1320    /// Update an existing consolidated message in-place with new content.
1321    ///
1322    /// Atomically:
1323    /// 1. Updates `content` and `consolidation_confidence` on `target_id`.
1324    /// 2. Inserts rows into `memory_consolidation_sources` linking `target_id` → each source.
1325    /// 3. Marks each source message's `consolidated = 1`.
1326    ///
1327    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns an error if any database operation fails.
1332    pub async fn apply_consolidation_update(
1333        &self,
1334        target_id: MessageId,
1335        new_content: &str,
1336        additional_source_ids: &[MessageId],
1337        confidence: f32,
1338        confidence_threshold: f32,
1339    ) -> Result<bool, MemoryError> {
1340        if confidence < confidence_threshold {
1341            return Ok(false);
1342        }
1343
1344        let mut tx = self.pool.begin().await?;
1345
1346        zeph_db::query(sql!(
1347            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1348        ))
1349        .bind(new_content)
1350        .bind(confidence)
1351        .bind(target_id)
1352        .execute(&mut *tx)
1353        .await?;
1354
1355        let consol_sql = format!(
1356            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1357            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1358            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1359        );
1360        for &source_id in additional_source_ids {
1361            zeph_db::query(&consol_sql)
1362                .bind(target_id)
1363                .bind(source_id)
1364                .execute(&mut *tx)
1365                .await?;
1366
1367            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1368                .bind(source_id)
1369                .execute(&mut *tx)
1370                .await?;
1371        }
1372
1373        tx.commit().await?;
1374        Ok(true)
1375    }
1376
1377    // ── Forgetting sweep helpers ───────────────────────────────────────────────
1378
1379    /// Set `importance_score` for a single message by ID.
1380    ///
1381    /// Used in tests and by forgetting sweep helpers.
1382    ///
1383    /// # Errors
1384    ///
1385    /// Returns an error if the update fails.
1386    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1387        zeph_db::query(sql!(
1388            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1389        ))
1390        .bind(score)
1391        .bind(id)
1392        .execute(&self.pool)
1393        .await?;
1394        Ok(())
1395    }
1396
1397    /// Get `importance_score` for a single message by ID.
1398    ///
1399    /// Returns `None` if the message does not exist or is deleted.
1400    ///
1401    /// # Errors
1402    ///
1403    /// Returns an error if the query fails.
1404    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1405        let row: Option<(f64,)> = zeph_db::query_as(sql!(
1406            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1407        ))
1408        .bind(id)
1409        .fetch_optional(&self.pool)
1410        .await?;
1411        Ok(row.map(|(s,)| s))
1412    }
1413
    /// Increment `access_count` and update `last_accessed` for a batch of messages.
    ///
    /// Alias used in forgetting tests; forwards directly to
    /// [`Self::increment_access_counts`], which no-ops on an empty slice.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails.
    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
        self.increment_access_counts(ids).await
    }
1424
1425    /// Mark a set of messages as consolidated (`consolidated = 1`).
1426    ///
1427    /// Used in tests to simulate the state after consolidation.
1428    ///
1429    /// # Errors
1430    ///
1431    /// Returns an error if the update fails.
1432    pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1433        for &id in ids {
1434            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1435                .bind(id)
1436                .execute(&self.pool)
1437                .await?;
1438        }
1439        Ok(())
1440    }
1441
1442    /// Execute the three-phase forgetting sweep (`SleepGate`) inside a single transaction.
1443    ///
1444    /// **Phase 1** — Downscale all active non-consolidated importance scores by `decay_rate`.
1445    /// **Phase 2** — Undo the decay for messages accessed within `replay_window_hours` or
1446    ///   with `access_count >= replay_min_access_count` (undo current sweep's decay only).
1447    /// **Phase 3** — Soft-delete messages below `forgetting_floor` that are not protected
1448    ///   by recent access (`protect_recent_hours`) or high access count
1449    ///   (`protect_min_access_count`). Uses `batch_size` as a row-count cap.
1450    ///
1451    /// All phases commit atomically: concurrent readers see either the pre-sweep or
1452    /// post-sweep state, never an intermediate.
1453    ///
1454    /// # Errors
1455    ///
1456    /// Returns an error if any database operation fails.
1457    pub async fn run_forgetting_sweep_tx(
1458        &self,
1459        config: &zeph_common::config::memory::ForgettingConfig,
1460    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1461        let mut tx = self.pool.begin().await?;
1462
1463        let decay = f64::from(config.decay_rate);
1464        let floor = f64::from(config.forgetting_floor);
1465        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1466        let replay_hours = i64::from(config.replay_window_hours);
1467        let replay_min_access = i64::from(config.replay_min_access_count);
1468        let protect_hours = i64::from(config.protect_recent_hours);
1469        let protect_min_access = i64::from(config.protect_min_access_count);
1470
1471        // Phase 1: downscale all active, non-consolidated messages (limited to batch_size).
1472        // We target a specific set of IDs to respect sweep_batch_size.
1473        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1474            "SELECT id FROM messages \
1475             WHERE deleted_at IS NULL AND consolidated = 0 \
1476             ORDER BY importance_score ASC \
1477             LIMIT ?"
1478        ))
1479        .bind(batch)
1480        .fetch_all(&mut *tx)
1481        .await?;
1482
1483        #[allow(clippy::cast_possible_truncation)]
1484        let downscaled = candidate_ids.len() as u32;
1485
1486        if downscaled > 0 {
1487            let placeholders: String = candidate_ids
1488                .iter()
1489                .map(|_| "?")
1490                .collect::<Vec<_>>()
1491                .join(",");
1492            let downscale_sql = format!(
1493                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1494                 WHERE id IN ({placeholders})"
1495            );
1496            let mut q = zeph_db::query(&downscale_sql);
1497            for &(id,) in &candidate_ids {
1498                q = q.bind(id);
1499            }
1500            q.execute(&mut *tx).await?;
1501        }
1502
1503        // Phase 2: selective replay — undo decay for recently-accessed messages.
1504        // Formula: score / (1 - decay_rate) restores the current sweep's downscaling.
1505        // Cap at 1.0 to avoid exceeding the maximum importance score.
1506        // Scoped to the Phase 1 batch only: messages not decayed this sweep must not
1507        // have their scores inflated by the inverse formula.
1508        let replayed = if downscaled > 0 {
1509            let replay_placeholders: String = candidate_ids
1510                .iter()
1511                .map(|_| "?")
1512                .collect::<Vec<_>>()
1513                .join(",");
1514            let replay_sql = format!(
1515                "UPDATE messages \
1516                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1517                 WHERE id IN ({replay_placeholders}) \
1518                 AND (\
1519                     (last_accessed IS NOT NULL \
1520                      AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1521                     OR access_count >= ?\
1522                 )"
1523            );
1524            let mut rq = zeph_db::query(&replay_sql);
1525            for &(id,) in &candidate_ids {
1526                rq = rq.bind(id);
1527            }
1528            let replay_result = rq
1529                .bind(replay_hours)
1530                .bind(replay_min_access)
1531                .execute(&mut *tx)
1532                .await?;
1533            #[allow(clippy::cast_possible_truncation)]
1534            let n = replay_result.rows_affected() as u32;
1535            n
1536        } else {
1537            0
1538        };
1539
1540        // Phase 3: targeted forgetting — soft-delete low-score unprotected messages.
1541        let prune_sql = format!(
1542            "UPDATE messages \
1543             SET deleted_at = CURRENT_TIMESTAMP \
1544             WHERE deleted_at IS NULL AND consolidated = 0 \
1545             AND importance_score < {floor} \
1546             AND (\
1547                 last_accessed IS NULL \
1548                 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1549             ) \
1550             AND access_count < ?"
1551        );
1552        let prune_result = zeph_db::query(&prune_sql)
1553            .bind(protect_hours)
1554            .bind(protect_min_access)
1555            .execute(&mut *tx)
1556            .await?;
1557        #[allow(clippy::cast_possible_truncation)]
1558        let pruned = prune_result.rows_affected() as u32;
1559
1560        tx.commit().await?;
1561
1562        Ok(crate::forgetting::ForgettingResult {
1563            downscaled,
1564            replayed,
1565            pruned,
1566        })
1567    }
1568}
1569
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Row ID of the candidate message.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Message text (input to the semantic merge step).
    pub content: String,
    /// `session_count` column value, clamped to 0 if negative.
    pub session_count: u32,
    /// Current `importance_score` column value.
    pub importance_score: f64,
}
1579
1580#[cfg(test)]
1581mod tests;