Skip to main content

zeph_memory/store/messages/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_db::ActiveDialect;
8use zeph_db::fts::sanitize_fts_query;
9#[allow(unused_imports)]
10use zeph_db::{begin_write, sql};
11use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
12
13use super::SqliteStore;
14use crate::error::MemoryError;
15use crate::types::{ConversationId, MessageId};
16
17fn parse_role(s: &str) -> Role {
18    match s {
19        "assistant" => Role::Assistant,
20        "system" => Role::System,
21        _ => Role::User,
22    }
23}
24
25#[must_use]
26pub fn role_str(role: Role) -> &'static str {
27    match role {
28        Role::System => "system",
29        Role::User => "user",
30        Role::Assistant => "assistant",
31    }
32}
33
/// Translate a pre-v0.17.1 externally-tagged variant name into the `kind`
/// discriminant used by the current internally-tagged schema.
///
/// Returns `None` for any name that is not a known legacy variant.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Legacy CamelCase variant name → internally-tagged snake_case `kind`.
    const MAPPING: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    MAPPING
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|(_, kind)| *kind)
}
53
54/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
55///
56/// Old format: `[{"Summary":{"text":"..."}}, ...]`
57/// New format: `[{"kind":"summary","text":"..."}, ...]`
58///
59/// Returns `None` if the input does not look like the old format or if any element fails
60/// to deserialize after conversion.
61fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
62    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
63    let mut result = Vec::with_capacity(array.len());
64    for element in array {
65        let obj = element.as_object()?;
66        if obj.contains_key("kind") {
67            return None;
68        }
69        if obj.len() != 1 {
70            return None;
71        }
72        let (key, inner) = obj.iter().next()?;
73        let kind = legacy_key_to_kind(key)?;
74        let mut new_obj = match inner {
75            serde_json::Value::Object(m) => m.clone(),
76            // Image variant wraps a single object directly
77            other => {
78                let mut m = serde_json::Map::new();
79                m.insert("data".to_string(), other.clone());
80                m
81            }
82        };
83        new_obj.insert(
84            "kind".to_string(),
85            serde_json::Value::String(kind.to_string()),
86        );
87        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
88        result.push(part);
89    }
90    Some(result)
91}
92
93/// Deserialize message parts from a stored JSON string.
94///
95/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
96/// a truncated excerpt of the malformed JSON for diagnostics.
97fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
98    if parts_json == "[]" {
99        return vec![];
100    }
101    match serde_json::from_str(parts_json) {
102        Ok(p) => p,
103        Err(e) => {
104            if let Some(parts) = try_parse_legacy_parts(parts_json) {
105                let truncated = parts_json.chars().take(120).collect::<String>();
106                tracing::warn!(
107                    role = %role_str,
108                    parts_json = %truncated,
109                    "loaded legacy-format message parts via compat path"
110                );
111                return parts;
112            }
113            let truncated = parts_json.chars().take(120).collect::<String>();
114            tracing::warn!(
115                role = %role_str,
116                parts_json = %truncated,
117                error = %e,
118                "failed to deserialize message parts, falling back to empty"
119            );
120            vec![]
121        }
122    }
123}
124
125impl SqliteStore {
126    /// Create a new conversation and return its ID.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the insert fails.
131    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
132        let row: (ConversationId,) = zeph_db::query_as(sql!(
133            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
134        ))
135        .fetch_one(&self.pool)
136        .await?;
137        Ok(row.0)
138    }
139
140    /// Save a message to the given conversation and return the message ID.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the insert fails.
145    pub async fn save_message(
146        &self,
147        conversation_id: ConversationId,
148        role: &str,
149        content: &str,
150    ) -> Result<MessageId, MemoryError> {
151        self.save_message_with_parts(conversation_id, role, content, "[]")
152            .await
153    }
154
155    /// Save a message with structured parts JSON.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the insert fails.
160    pub async fn save_message_with_parts(
161        &self,
162        conversation_id: ConversationId,
163        role: &str,
164        content: &str,
165        parts_json: &str,
166    ) -> Result<MessageId, MemoryError> {
167        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
168            .await
169    }
170
171    /// Save a message with an optional category tag.
172    ///
173    /// The `category` column is NULL when `None` — existing rows are unaffected.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the insert fails.
178    pub async fn save_message_with_category(
179        &self,
180        conversation_id: ConversationId,
181        role: &str,
182        content: &str,
183        category: Option<&str>,
184    ) -> Result<MessageId, MemoryError> {
185        let importance_score = crate::semantic::importance::compute_importance(content, role);
186        let row: (MessageId,) = zeph_db::query_as(sql!(
187            "INSERT INTO messages \
188                 (conversation_id, role, content, parts, agent_visible, user_visible, \
189                  importance_score, category) \
190                 VALUES (?, ?, ?, '[]', 1, 1, ?, ?) RETURNING id"
191        ))
192        .bind(conversation_id)
193        .bind(role)
194        .bind(content)
195        .bind(importance_score)
196        .bind(category)
197        .fetch_one(&self.pool)
198        .await?;
199        Ok(row.0)
200    }
201
202    /// Save a message with visibility metadata.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the insert fails.
207    pub async fn save_message_with_metadata(
208        &self,
209        conversation_id: ConversationId,
210        role: &str,
211        content: &str,
212        parts_json: &str,
213        agent_visible: bool,
214        user_visible: bool,
215    ) -> Result<MessageId, MemoryError> {
216        let importance_score = crate::semantic::importance::compute_importance(content, role);
217        let row: (MessageId,) = zeph_db::query_as(
218            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
219             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
220        )
221        .bind(conversation_id)
222        .bind(role)
223        .bind(content)
224        .bind(parts_json)
225        .bind(i64::from(agent_visible))
226        .bind(i64::from(user_visible))
227        .bind(importance_score)
228        .fetch_one(&self.pool)
229        .await?;
230        Ok(row.0)
231    }
232
233    /// Load the most recent messages for a conversation, up to `limit`.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the query fails.
238    pub async fn load_history(
239        &self,
240        conversation_id: ConversationId,
241        limit: u32,
242    ) -> Result<Vec<Message>, MemoryError> {
243        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
244            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
245                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
246                WHERE conversation_id = ? AND deleted_at IS NULL \
247                ORDER BY id DESC \
248                LIMIT ?\
249             ) ORDER BY id ASC"
250        ))
251        .bind(conversation_id)
252        .bind(limit)
253        .fetch_all(&self.pool)
254        .await?;
255
256        let messages = rows
257            .into_iter()
258            .map(
259                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
260                    let parts = parse_parts_json(&role_str, &parts_json);
261                    Message {
262                        role: parse_role(&role_str),
263                        content,
264                        parts,
265                        metadata: MessageMetadata {
266                            agent_visible: agent_visible != 0,
267                            user_visible: user_visible != 0,
268                            compacted_at: None,
269                            deferred_summary: None,
270                            focus_pinned: false,
271                            focus_marker_id: None,
272                            db_id: Some(row_id),
273                        },
274                    }
275                },
276            )
277            .collect();
278        Ok(messages)
279    }
280
281    /// Load messages filtered by visibility flags.
282    ///
283    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if the query fails.
288    pub async fn load_history_filtered(
289        &self,
290        conversation_id: ConversationId,
291        limit: u32,
292        agent_visible: Option<bool>,
293        user_visible: Option<bool>,
294    ) -> Result<Vec<Message>, MemoryError> {
295        let av = agent_visible.map(i64::from);
296        let uv = user_visible.map(i64::from);
297
298        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
299            sql!("WITH recent AS (\
300                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
301                WHERE conversation_id = ? \
302                  AND deleted_at IS NULL \
303                  AND (? IS NULL OR agent_visible = ?) \
304                  AND (? IS NULL OR user_visible = ?) \
305                ORDER BY id DESC \
306                LIMIT ?\
307             ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
308        )
309        .bind(conversation_id)
310        .bind(av)
311        .bind(av)
312        .bind(uv)
313        .bind(uv)
314        .bind(limit)
315        .fetch_all(&self.pool)
316        .await?;
317
318        let messages = rows
319            .into_iter()
320            .map(
321                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
322                    let parts = parse_parts_json(&role_str, &parts_json);
323                    Message {
324                        role: parse_role(&role_str),
325                        content,
326                        parts,
327                        metadata: MessageMetadata {
328                            agent_visible: agent_visible != 0,
329                            user_visible: user_visible != 0,
330                            compacted_at: None,
331                            deferred_summary: None,
332                            focus_pinned: false,
333                            focus_marker_id: None,
334                            db_id: Some(row_id),
335                        },
336                    }
337                },
338            )
339            .collect();
340        Ok(messages)
341    }
342
343    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
344    ///
345    /// Within a single transaction:
346    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
347    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
348    ///
349    /// Returns the `MessageId` of the inserted summary.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if the transaction fails.
354    pub async fn replace_conversation(
355        &self,
356        conversation_id: ConversationId,
357        compacted_range: std::ops::RangeInclusive<MessageId>,
358        summary_role: &str,
359        summary_content: &str,
360    ) -> Result<MessageId, MemoryError> {
361        let now = {
362            let secs = std::time::SystemTime::now()
363                .duration_since(std::time::UNIX_EPOCH)
364                .unwrap_or_default()
365                .as_secs();
366            format!("{secs}")
367        };
368        let start_id = compacted_range.start().0;
369        let end_id = compacted_range.end().0;
370
371        let mut tx = self.pool.begin().await?;
372
373        zeph_db::query(sql!(
374            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
375             WHERE conversation_id = ? AND id >= ? AND id <= ?"
376        ))
377        .bind(&now)
378        .bind(conversation_id)
379        .bind(start_id)
380        .bind(end_id)
381        .execute(&mut *tx)
382        .await?;
383
384        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
385        let row: (MessageId,) = zeph_db::query_as(sql!(
386            "INSERT INTO messages \
387             (conversation_id, role, content, parts, agent_visible, user_visible) \
388             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
389        ))
390        .bind(conversation_id)
391        .bind(summary_role)
392        .bind(summary_content)
393        .fetch_one(&mut *tx)
394        .await?;
395
396        tx.commit().await?;
397
398        Ok(row.0)
399    }
400
401    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
402    ///
403    /// Within a single transaction:
404    /// 1. Sets `agent_visible=0, compacted_at=<now>` for each ID in `hide_ids`.
405    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the transaction fails.
410    pub async fn apply_tool_pair_summaries(
411        &self,
412        conversation_id: ConversationId,
413        hide_ids: &[i64],
414        summaries: &[String],
415    ) -> Result<(), MemoryError> {
416        if hide_ids.is_empty() && summaries.is_empty() {
417            return Ok(());
418        }
419
420        let now = std::time::SystemTime::now()
421            .duration_since(std::time::UNIX_EPOCH)
422            .unwrap_or_default()
423            .as_secs()
424            .to_string();
425
426        let mut tx = self.pool.begin().await?;
427
428        for &id in hide_ids {
429            zeph_db::query(sql!(
430                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
431            ))
432            .bind(&now)
433            .bind(id)
434            .execute(&mut *tx)
435            .await?;
436        }
437
438        for summary in summaries {
439            let content = format!("[tool summary] {summary}");
440            let parts = serde_json::to_string(&[MessagePart::Summary {
441                text: summary.clone(),
442            }])
443            .unwrap_or_else(|_| "[]".to_string());
444            zeph_db::query(sql!(
445                "INSERT INTO messages \
446                 (conversation_id, role, content, parts, agent_visible, user_visible) \
447                 VALUES (?, 'assistant', ?, ?, 1, 0)"
448            ))
449            .bind(conversation_id)
450            .bind(&content)
451            .bind(&parts)
452            .execute(&mut *tx)
453            .await?;
454        }
455
456        tx.commit().await?;
457        Ok(())
458    }
459
460    /// Return the IDs of the N oldest messages in a conversation (ascending order).
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the query fails.
465    pub async fn oldest_message_ids(
466        &self,
467        conversation_id: ConversationId,
468        n: u32,
469    ) -> Result<Vec<MessageId>, MemoryError> {
470        let rows: Vec<(MessageId,)> = zeph_db::query_as(
471            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
472        )
473        .bind(conversation_id)
474        .bind(n)
475        .fetch_all(&self.pool)
476        .await?;
477        Ok(rows.into_iter().map(|r| r.0).collect())
478    }
479
480    /// Return the ID of the most recent conversation, if any.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the query fails.
485    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
486        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
487            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
488        ))
489        .fetch_optional(&self.pool)
490        .await?;
491        Ok(row.map(|r| r.0))
492    }
493
494    /// Fetch a single message by its ID.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if the query fails.
499    pub async fn message_by_id(
500        &self,
501        message_id: MessageId,
502    ) -> Result<Option<Message>, MemoryError> {
503        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
504            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
505        )
506        .bind(message_id)
507        .fetch_optional(&self.pool)
508        .await?;
509
510        Ok(row.map(
511            |(role_str, content, parts_json, agent_visible, user_visible)| {
512                let parts = parse_parts_json(&role_str, &parts_json);
513                Message {
514                    role: parse_role(&role_str),
515                    content,
516                    parts,
517                    metadata: MessageMetadata {
518                        agent_visible: agent_visible != 0,
519                        user_visible: user_visible != 0,
520                        compacted_at: None,
521                        deferred_summary: None,
522                        focus_pinned: false,
523                        focus_marker_id: None,
524                        db_id: None,
525                    },
526                }
527            },
528        ))
529    }
530
531    /// Fetch messages by a list of IDs in a single query.
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if the query fails.
536    pub async fn messages_by_ids(
537        &self,
538        ids: &[MessageId],
539    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
540        if ids.is_empty() {
541            return Ok(Vec::new());
542        }
543
544        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
545
546        let query = format!(
547            "SELECT id, role, content, parts FROM messages \
548             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
549        );
550        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
551        for &id in ids {
552            q = q.bind(id);
553        }
554
555        let rows = q.fetch_all(&self.pool).await?;
556
557        Ok(rows
558            .into_iter()
559            .map(|(id, role_str, content, parts_json)| {
560                let parts = parse_parts_json(&role_str, &parts_json);
561                (
562                    id,
563                    Message {
564                        role: parse_role(&role_str),
565                        content,
566                        parts,
567                        metadata: MessageMetadata::default(),
568                    },
569                )
570            })
571            .collect())
572    }
573
574    /// Return message IDs and content for messages without embeddings.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if the query fails.
579    pub async fn unembedded_message_ids(
580        &self,
581        limit: Option<usize>,
582    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
583        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
584
585        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
586            "SELECT m.id, m.conversation_id, m.role, m.content \
587             FROM messages m \
588             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
589             WHERE em.id IS NULL AND m.deleted_at IS NULL \
590             ORDER BY m.id ASC \
591             LIMIT ?"
592        ))
593        .bind(effective_limit)
594        .fetch_all(&self.pool)
595        .await?;
596
597        Ok(rows)
598    }
599
    /// Stream message IDs and content for messages without embeddings, one row at a time.
    ///
    /// Executes the same query as [`Self::unembedded_message_ids`] but returns a streaming
    /// cursor instead of loading all rows into a `Vec`. The `SQLite` read transaction is held
    /// for the duration of iteration; callers must not write to `embeddings_metadata` while
    /// the stream is live (use a separate async task for writes).
    ///
    /// # Errors
    ///
    /// Yields a [`MemoryError`] if the query or row decoding fails.
    pub fn stream_unembedded_messages(
        &self,
        limit: i64,
    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
    {
        // LEFT JOIN + `em.id IS NULL` selects messages with no embeddings row;
        // soft-deleted rows are excluded. Ascending ID order gives a stable,
        // oldest-first backfill order.
        zeph_db::query_as(sql!(
            "SELECT m.id, m.conversation_id, m.role, m.content \
             FROM messages m \
             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
             WHERE em.id IS NULL AND m.deleted_at IS NULL \
             ORDER BY m.id ASC \
             LIMIT ?"
        ))
        .bind(limit)
        // `fetch` yields driver errors per item; convert each into MemoryError
        // (via TryStreamExt::map_err, imported at the top of the file).
        .fetch(&self.pool)
        .map_err(MemoryError::from)
    }
627
628    /// Count the number of messages in a conversation.
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if the query fails.
633    pub async fn count_messages(
634        &self,
635        conversation_id: ConversationId,
636    ) -> Result<i64, MemoryError> {
637        let row: (i64,) = zeph_db::query_as(sql!(
638            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
639        ))
640        .bind(conversation_id)
641        .fetch_one(&self.pool)
642        .await?;
643        Ok(row.0)
644    }
645
646    /// Count messages in a conversation with id greater than `after_id`.
647    ///
648    /// # Errors
649    ///
650    /// Returns an error if the query fails.
651    pub async fn count_messages_after(
652        &self,
653        conversation_id: ConversationId,
654        after_id: MessageId,
655    ) -> Result<i64, MemoryError> {
656        let row: (i64,) =
657            zeph_db::query_as(
658                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
659            )
660            .bind(conversation_id)
661            .bind(after_id)
662            .fetch_one(&self.pool)
663            .await?;
664        Ok(row.0)
665    }
666
667    /// Full-text keyword search over messages using FTS5.
668    ///
669    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
670    /// negated to positive for consistency with vector scores).
671    ///
672    /// # Errors
673    ///
674    /// Returns an error if the query fails.
675    pub async fn keyword_search(
676        &self,
677        query: &str,
678        limit: usize,
679        conversation_id: Option<ConversationId>,
680    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
681        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
682        let safe_query = sanitize_fts_query(query);
683        if safe_query.is_empty() {
684            return Ok(Vec::new());
685        }
686
687        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
688            zeph_db::query_as(
689                sql!("SELECT m.id, -rank AS score \
690                 FROM messages_fts f \
691                 JOIN messages m ON m.id = f.rowid \
692                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
693                 ORDER BY rank \
694                 LIMIT ?"),
695            )
696            .bind(&safe_query)
697            .bind(cid)
698            .bind(effective_limit)
699            .fetch_all(&self.pool)
700            .await?
701        } else {
702            zeph_db::query_as(sql!(
703                "SELECT m.id, -rank AS score \
704                 FROM messages_fts f \
705                 JOIN messages m ON m.id = f.rowid \
706                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
707                 ORDER BY rank \
708                 LIMIT ?"
709            ))
710            .bind(&safe_query)
711            .bind(effective_limit)
712            .fetch_all(&self.pool)
713            .await?
714        };
715
716        Ok(rows)
717    }
718
    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
    ///
    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
    ///
    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
    /// `None` means "no bound" on that side.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn keyword_search_with_time_range(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
        after: Option<&str>,
        before: Option<&str>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
        // Sanitizing may strip everything; an empty MATCH string is not a valid query.
        let safe_query = sanitize_fts_query(query);
        if safe_query.is_empty() {
            return Ok(Vec::new());
        }

        // Build time-range clauses dynamically. Both bounds are optional.
        let after_clause = if after.is_some() {
            " AND m.created_at > ?"
        } else {
            ""
        };
        let before_clause = if before.is_some() {
            " AND m.created_at < ?"
        } else {
            ""
        };
        let conv_clause = if conversation_id.is_some() {
            " AND m.conversation_id = ?"
        } else {
            ""
        };

        // NOTE(review): unlike `message_timestamps`, this dynamically built SQL is
        // not run through `zeph_db::rewrite_placeholders` — confirm whether raw `?`
        // placeholders are correct for every supported dialect here.
        let sql = format!(
            "SELECT m.id, -rank AS score \
             FROM messages_fts f \
             JOIN messages m ON m.id = f.rowid \
             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
             {after_clause}{before_clause}{conv_clause} \
             ORDER BY rank \
             LIMIT ?"
        );

        // Bind order must mirror the clause order assembled above:
        // MATCH term, after?, before?, conversation?, then LIMIT.
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
        if let Some(a) = after {
            q = q.bind(a);
        }
        if let Some(b) = before {
            q = q.bind(b);
        }
        if let Some(cid) = conversation_id {
            q = q.bind(cid);
        }
        q = q.bind(effective_limit);

        Ok(q.fetch_all(&self.pool).await?)
    }
786
787    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
788    ///
789    /// Messages without a `created_at` column fall back to 0.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the query fails.
794    pub async fn message_timestamps(
795        &self,
796        ids: &[MessageId],
797    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
798        if ids.is_empty() {
799            return Ok(std::collections::HashMap::new());
800        }
801
802        let placeholders: String =
803            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
804        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
805        let query = format!(
806            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
807        );
808        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
809        for &id in ids {
810            q = q.bind(id);
811        }
812
813        let rows = q.fetch_all(&self.pool).await?;
814        Ok(rows.into_iter().collect())
815    }
816
817    /// Load a range of messages after a given message ID.
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the query fails.
822    pub async fn load_messages_range(
823        &self,
824        conversation_id: ConversationId,
825        after_message_id: MessageId,
826        limit: usize,
827    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
828        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
829
830        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
831            "SELECT id, role, content FROM messages \
832             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
833             ORDER BY id ASC LIMIT ?"
834        ))
835        .bind(conversation_id)
836        .bind(after_message_id)
837        .bind(effective_limit)
838        .fetch_all(&self.pool)
839        .await?;
840
841        Ok(rows)
842    }
843
844    // ── Eviction helpers ──────────────────────────────────────────────────────
845
846    /// Return all non-deleted message IDs with their eviction metadata.
847    ///
848    /// # Errors
849    ///
850    /// Returns an error if the query fails.
851    pub async fn get_eviction_candidates(
852        &self,
853    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
854        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
855            "SELECT id, created_at, last_accessed, access_count \
856             FROM messages WHERE deleted_at IS NULL"
857        ))
858        .fetch_all(&self.pool)
859        .await?;
860
861        Ok(rows
862            .into_iter()
863            .map(
864                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
865                    id,
866                    created_at,
867                    last_accessed,
868                    access_count: access_count.try_into().unwrap_or(0),
869                },
870            )
871            .collect())
872    }
873
874    /// Soft-delete a set of messages by marking `deleted_at`.
875    ///
876    /// Soft-deleted messages are excluded from all history queries.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the update fails.
881    pub async fn soft_delete_messages(
882        &self,
883        ids: &[MessageId],
884    ) -> Result<(), crate::error::MemoryError> {
885        if ids.is_empty() {
886            return Ok(());
887        }
888        // SQLite does not support array binding natively. Batch via individual updates.
889        for &id in ids {
890            zeph_db::query(
891                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
892            )
893            .bind(id)
894            .execute(&self.pool)
895            .await?;
896        }
897        Ok(())
898    }
899
900    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
901    ///
902    /// # Errors
903    ///
904    /// Returns an error if the query fails.
905    pub async fn get_soft_deleted_message_ids(
906        &self,
907    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
908        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
909            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
910        ))
911        .fetch_all(&self.pool)
912        .await?;
913        Ok(rows.into_iter().map(|(id,)| id).collect())
914    }
915
916    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
917    ///
918    /// # Errors
919    ///
920    /// Returns an error if the update fails.
921    pub async fn mark_qdrant_cleaned(
922        &self,
923        ids: &[MessageId],
924    ) -> Result<(), crate::error::MemoryError> {
925        for &id in ids {
926            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
927                .bind(id)
928                .execute(&self.pool)
929                .await?;
930        }
931        Ok(())
932    }
933
934    /// Fetch `importance_score` values for the given message IDs.
935    ///
936    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the query fails.
941    pub async fn fetch_importance_scores(
942        &self,
943        ids: &[MessageId],
944    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
945        if ids.is_empty() {
946            return Ok(std::collections::HashMap::new());
947        }
948        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
949        let query = format!(
950            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
951        );
952        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
953        for &id in ids {
954            q = q.bind(id);
955        }
956        let rows = q.fetch_all(&self.pool).await?;
957        Ok(rows.into_iter().collect())
958    }
959
960    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
961    ///
962    /// Skips the update when `ids` is empty.
963    ///
964    /// # Errors
965    ///
966    /// Returns an error if the update fails.
967    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
968        if ids.is_empty() {
969            return Ok(());
970        }
971        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
972        let query = format!(
973            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
974             WHERE id IN ({placeholders})"
975        );
976        let mut q = zeph_db::query(&query);
977        for &id in ids {
978            q = q.bind(id);
979        }
980        q.execute(&self.pool).await?;
981        Ok(())
982    }
983
984    // ── Tier promotion helpers ─────────────────────────────────────────────────
985
986    /// Return episodic messages with `session_count >= min_sessions`, ordered by
987    /// session count descending then importance score descending.
988    ///
989    /// # Errors
990    ///
991    /// Returns an error if the query fails.
992    pub async fn find_promotion_candidates(
993        &self,
994        min_sessions: u32,
995        batch_size: usize,
996    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
997        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
998        let min = i64::from(min_sessions);
999        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1000            "SELECT id, conversation_id, content, session_count, importance_score \
1001             FROM messages \
1002             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1003             ORDER BY session_count DESC, importance_score DESC \
1004             LIMIT ?"
1005        ))
1006        .bind(min)
1007        .bind(limit)
1008        .fetch_all(&self.pool)
1009        .await?;
1010
1011        Ok(rows
1012            .into_iter()
1013            .map(
1014                |(id, conversation_id, content, session_count, importance_score)| {
1015                    PromotionCandidate {
1016                        id,
1017                        conversation_id,
1018                        content,
1019                        session_count: session_count.try_into().unwrap_or(0),
1020                        importance_score,
1021                    }
1022                },
1023            )
1024            .collect())
1025    }
1026
1027    /// Count messages per tier (episodic, semantic) that are not deleted.
1028    ///
1029    /// Returns `(episodic_count, semantic_count)`.
1030    ///
1031    /// # Errors
1032    ///
1033    /// Returns an error if the query fails.
1034    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1035        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1036            "SELECT tier, COUNT(*) FROM messages \
1037             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1038             GROUP BY tier"
1039        ))
1040        .fetch_all(&self.pool)
1041        .await?;
1042
1043        let mut episodic = 0i64;
1044        let mut semantic = 0i64;
1045        for (tier, count) in rows {
1046            match tier.as_str() {
1047                "episodic" => episodic = count,
1048                "semantic" => semantic = count,
1049                _ => {}
1050            }
1051        }
1052        Ok((episodic, semantic))
1053    }
1054
1055    /// Count semantic facts (tier='semantic', not deleted).
1056    ///
1057    /// # Errors
1058    ///
1059    /// Returns an error if the query fails.
1060    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1061        let row: (i64,) = zeph_db::query_as(sql!(
1062            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1063        ))
1064        .fetch_one(&self.pool)
1065        .await?;
1066        Ok(row.0)
1067    }
1068
1069    /// Promote a set of episodic messages to semantic tier in a single transaction.
1070    ///
1071    /// Within one transaction:
1072    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
1073    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
1074    ///    so the eviction sweep picks up their Qdrant vectors.
1075    ///
1076    /// Returns the `MessageId` of the new semantic message.
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns an error if the transaction fails.
1081    pub async fn promote_to_semantic(
1082        &self,
1083        conversation_id: ConversationId,
1084        merged_content: &str,
1085        original_ids: &[MessageId],
1086    ) -> Result<MessageId, MemoryError> {
1087        if original_ids.is_empty() {
1088            return Err(MemoryError::Other(
1089                "promote_to_semantic: original_ids must not be empty".into(),
1090            ));
1091        }
1092
1093        // Acquire the write lock immediately (BEGIN IMMEDIATE) to avoid the DEFERRED read->write
1094        // upgrade race that causes SQLITE_BUSY when a concurrent writer holds the lock.
1095        let mut tx = begin_write(&self.pool).await?;
1096
1097        // Insert the new semantic fact.
1098        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1099        let promote_insert_raw = format!(
1100            "INSERT INTO messages \
1101             (conversation_id, role, content, parts, agent_visible, user_visible, \
1102              tier, promotion_timestamp) \
1103             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1104             RETURNING id"
1105        );
1106        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1107        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1108            .bind(conversation_id)
1109            .bind(merged_content)
1110            .fetch_one(&mut *tx)
1111            .await?;
1112
1113        let new_id = row.0;
1114
1115        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
1116        for &id in original_ids {
1117            zeph_db::query(sql!(
1118                "UPDATE messages \
1119                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1120                 WHERE id = ? AND deleted_at IS NULL"
1121            ))
1122            .bind(id)
1123            .execute(&mut *tx)
1124            .await?;
1125        }
1126
1127        tx.commit().await?;
1128        Ok(new_id)
1129    }
1130
1131    /// Manually promote a set of messages to semantic tier without merging.
1132    ///
1133    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1134    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns an error if the update fails.
1139    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1140        if ids.is_empty() {
1141            return Ok(0);
1142        }
1143        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1144        let manual_promote_raw = format!(
1145            "UPDATE messages \
1146             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1147             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1148        );
1149        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1150        let mut count = 0usize;
1151        for &id in ids {
1152            let result = zeph_db::query(&manual_promote_sql)
1153                .bind(id)
1154                .execute(&self.pool)
1155                .await?;
1156            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1157        }
1158        Ok(count)
1159    }
1160
1161    /// Increment `session_count` for all episodic messages in a conversation.
1162    ///
1163    /// Called when a session restores an existing conversation to mark that messages
1164    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1165    ///
1166    /// # Errors
1167    ///
1168    /// Returns an error if the database update fails.
1169    pub async fn increment_session_counts_for_conversation(
1170        &self,
1171        conversation_id: ConversationId,
1172    ) -> Result<(), MemoryError> {
1173        zeph_db::query(sql!(
1174            "UPDATE messages SET session_count = session_count + 1 \
1175             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1176        ))
1177        .bind(conversation_id)
1178        .execute(&self.pool)
1179        .await?;
1180        Ok(())
1181    }
1182
1183    /// Fetch the tier string for each of the given message IDs.
1184    ///
1185    /// Messages not found or already deleted are omitted from the result.
1186    ///
1187    /// # Errors
1188    ///
1189    /// Returns an error if the query fails.
1190    pub async fn fetch_tiers(
1191        &self,
1192        ids: &[MessageId],
1193    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1194        if ids.is_empty() {
1195            return Ok(std::collections::HashMap::new());
1196        }
1197        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1198        let query = format!(
1199            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1200        );
1201        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1202        for &id in ids {
1203            q = q.bind(id);
1204        }
1205        let rows = q.fetch_all(&self.pool).await?;
1206        Ok(rows.into_iter().collect())
1207    }
1208
1209    /// Return all conversation IDs that have at least one non-consolidated original message.
1210    ///
1211    /// Used by the consolidation sweep to find conversations that need processing.
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if the database query fails.
1216    pub async fn conversations_with_unconsolidated_messages(
1217        &self,
1218    ) -> Result<Vec<ConversationId>, MemoryError> {
1219        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1220            "SELECT DISTINCT conversation_id FROM messages \
1221             WHERE consolidated = 0 AND deleted_at IS NULL"
1222        ))
1223        .fetch_all(&self.pool)
1224        .await?;
1225        Ok(rows.into_iter().map(|(id,)| id).collect())
1226    }
1227
1228    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1229    ///
1230    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1231    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1232    ///
1233    /// # Errors
1234    ///
1235    /// Returns an error if the database query fails.
1236    pub async fn find_unconsolidated_messages(
1237        &self,
1238        conversation_id: ConversationId,
1239        limit: usize,
1240    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1241        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1242        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1243            "SELECT id, content FROM messages \
1244             WHERE conversation_id = ? \
1245               AND consolidated = 0 \
1246               AND deleted_at IS NULL \
1247             ORDER BY id ASC \
1248             LIMIT ?"
1249        ))
1250        .bind(conversation_id)
1251        .bind(limit)
1252        .fetch_all(&self.pool)
1253        .await?;
1254        Ok(rows)
1255    }
1256
1257    /// Look up which consolidated entry (if any) covers the given original message.
1258    ///
1259    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1260    /// in its sources, or `None` if no consolidated entry covers it.
1261    ///
1262    /// # Errors
1263    ///
1264    /// Returns an error if the database query fails.
1265    pub async fn find_consolidated_for_source(
1266        &self,
1267        source_id: MessageId,
1268    ) -> Result<Option<MessageId>, MemoryError> {
1269        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1270            "SELECT consolidated_id FROM memory_consolidation_sources \
1271             WHERE source_id = ? \
1272             LIMIT 1"
1273        ))
1274        .bind(source_id)
1275        .fetch_optional(&self.pool)
1276        .await?;
1277        Ok(row.map(|(id,)| id))
1278    }
1279
1280    /// Insert a consolidated message and record its source linkage in a single transaction.
1281    ///
1282    /// Atomically:
1283    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1284    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1285    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1286    ///
1287    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1288    ///
1289    /// # Errors
1290    ///
1291    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1292    /// on failure so no partial state is written.
1293    pub async fn apply_consolidation_merge(
1294        &self,
1295        conversation_id: ConversationId,
1296        role: &str,
1297        merged_content: &str,
1298        source_ids: &[MessageId],
1299        confidence: f32,
1300        confidence_threshold: f32,
1301    ) -> Result<bool, MemoryError> {
1302        if confidence < confidence_threshold {
1303            return Ok(false);
1304        }
1305        if source_ids.is_empty() {
1306            return Ok(false);
1307        }
1308
1309        let mut tx = self.pool.begin().await?;
1310
1311        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1312        let row: (MessageId,) = zeph_db::query_as(sql!(
1313            "INSERT INTO messages \
1314               (conversation_id, role, content, parts, agent_visible, user_visible, \
1315                importance_score, consolidated, consolidation_confidence) \
1316             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1317             RETURNING id"
1318        ))
1319        .bind(conversation_id)
1320        .bind(role)
1321        .bind(merged_content)
1322        .bind(importance)
1323        .bind(confidence)
1324        .fetch_one(&mut *tx)
1325        .await?;
1326        let consolidated_id = row.0;
1327
1328        let consol_sql = format!(
1329            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1330            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1331            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1332        );
1333        for &source_id in source_ids {
1334            zeph_db::query(&consol_sql)
1335                .bind(consolidated_id)
1336                .bind(source_id)
1337                .execute(&mut *tx)
1338                .await?;
1339
1340            // Mark original as consolidated so future sweeps skip it.
1341            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1342                .bind(source_id)
1343                .execute(&mut *tx)
1344                .await?;
1345        }
1346
1347        tx.commit().await?;
1348        Ok(true)
1349    }
1350
1351    /// Update an existing consolidated message in-place with new content.
1352    ///
1353    /// Atomically:
1354    /// 1. Updates `content` and `consolidation_confidence` on `target_id`.
1355    /// 2. Inserts rows into `memory_consolidation_sources` linking `target_id` → each source.
1356    /// 3. Marks each source message's `consolidated = 1`.
1357    ///
1358    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1359    ///
1360    /// # Errors
1361    ///
1362    /// Returns an error if any database operation fails.
1363    pub async fn apply_consolidation_update(
1364        &self,
1365        target_id: MessageId,
1366        new_content: &str,
1367        additional_source_ids: &[MessageId],
1368        confidence: f32,
1369        confidence_threshold: f32,
1370    ) -> Result<bool, MemoryError> {
1371        if confidence < confidence_threshold {
1372            return Ok(false);
1373        }
1374
1375        let mut tx = self.pool.begin().await?;
1376
1377        zeph_db::query(sql!(
1378            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1379        ))
1380        .bind(new_content)
1381        .bind(confidence)
1382        .bind(target_id)
1383        .execute(&mut *tx)
1384        .await?;
1385
1386        let consol_sql = format!(
1387            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1388            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1389            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1390        );
1391        for &source_id in additional_source_ids {
1392            zeph_db::query(&consol_sql)
1393                .bind(target_id)
1394                .bind(source_id)
1395                .execute(&mut *tx)
1396                .await?;
1397
1398            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1399                .bind(source_id)
1400                .execute(&mut *tx)
1401                .await?;
1402        }
1403
1404        tx.commit().await?;
1405        Ok(true)
1406    }
1407
1408    // ── Forgetting sweep helpers ───────────────────────────────────────────────
1409
1410    /// Set `importance_score` for a single message by ID.
1411    ///
1412    /// Used in tests and by forgetting sweep helpers.
1413    ///
1414    /// # Errors
1415    ///
1416    /// Returns an error if the update fails.
1417    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1418        zeph_db::query(sql!(
1419            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1420        ))
1421        .bind(score)
1422        .bind(id)
1423        .execute(&self.pool)
1424        .await?;
1425        Ok(())
1426    }
1427
1428    /// Get `importance_score` for a single message by ID.
1429    ///
1430    /// Returns `None` if the message does not exist or is deleted.
1431    ///
1432    /// # Errors
1433    ///
1434    /// Returns an error if the query fails.
1435    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1436        let row: Option<(f64,)> = zeph_db::query_as(sql!(
1437            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1438        ))
1439        .bind(id)
1440        .fetch_optional(&self.pool)
1441        .await?;
1442        Ok(row.map(|(s,)| s))
1443    }
1444
1445    /// Increment `access_count` and update `last_accessed` for a batch of messages.
1446    ///
1447    /// Alias used in forgetting tests; forwards to `increment_access_counts`.
1448    ///
1449    /// # Errors
1450    ///
1451    /// Returns an error if the update fails.
1452    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1453        self.increment_access_counts(ids).await
1454    }
1455
1456    /// Mark a set of messages as consolidated (`consolidated = 1`).
1457    ///
1458    /// Used in tests to simulate the state after consolidation.
1459    ///
1460    /// # Errors
1461    ///
1462    /// Returns an error if the update fails.
1463    pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1464        for &id in ids {
1465            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1466                .bind(id)
1467                .execute(&self.pool)
1468                .await?;
1469        }
1470        Ok(())
1471    }
1472
1473    /// Execute the three-phase forgetting sweep (`SleepGate`) inside a single transaction.
1474    ///
1475    /// **Phase 1** — Downscale all active non-consolidated importance scores by `decay_rate`.
1476    /// **Phase 2** — Undo the decay for messages accessed within `replay_window_hours` or
1477    ///   with `access_count >= replay_min_access_count` (undo current sweep's decay only).
1478    /// **Phase 3** — Soft-delete messages below `forgetting_floor` that are not protected
1479    ///   by recent access (`protect_recent_hours`) or high access count
1480    ///   (`protect_min_access_count`). Uses `batch_size` as a row-count cap.
1481    ///
1482    /// All phases commit atomically: concurrent readers see either the pre-sweep or
1483    /// post-sweep state, never an intermediate.
1484    ///
1485    /// # Errors
1486    ///
1487    /// Returns an error if any database operation fails.
1488    pub async fn run_forgetting_sweep_tx(
1489        &self,
1490        config: &zeph_common::config::memory::ForgettingConfig,
1491    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1492        let mut tx = self.pool.begin().await?;
1493
1494        let decay = f64::from(config.decay_rate);
1495        let floor = f64::from(config.forgetting_floor);
1496        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1497        let replay_hours = i64::from(config.replay_window_hours);
1498        let replay_min_access = i64::from(config.replay_min_access_count);
1499        let protect_hours = i64::from(config.protect_recent_hours);
1500        let protect_min_access = i64::from(config.protect_min_access_count);
1501
1502        // Phase 1: downscale all active, non-consolidated messages (limited to batch_size).
1503        // We target a specific set of IDs to respect sweep_batch_size.
1504        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1505            "SELECT id FROM messages \
1506             WHERE deleted_at IS NULL AND consolidated = 0 \
1507             ORDER BY importance_score ASC \
1508             LIMIT ?"
1509        ))
1510        .bind(batch)
1511        .fetch_all(&mut *tx)
1512        .await?;
1513
1514        #[allow(clippy::cast_possible_truncation)]
1515        let downscaled = candidate_ids.len() as u32;
1516
1517        if downscaled > 0 {
1518            let placeholders: String = candidate_ids
1519                .iter()
1520                .map(|_| "?")
1521                .collect::<Vec<_>>()
1522                .join(",");
1523            let downscale_sql = format!(
1524                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1525                 WHERE id IN ({placeholders})"
1526            );
1527            let mut q = zeph_db::query(&downscale_sql);
1528            for &(id,) in &candidate_ids {
1529                q = q.bind(id);
1530            }
1531            q.execute(&mut *tx).await?;
1532        }
1533
1534        // Phase 2: selective replay — undo decay for recently-accessed messages.
1535        // Formula: score / (1 - decay_rate) restores the current sweep's downscaling.
1536        // Cap at 1.0 to avoid exceeding the maximum importance score.
1537        // Scoped to the Phase 1 batch only: messages not decayed this sweep must not
1538        // have their scores inflated by the inverse formula.
1539        let replayed = if downscaled > 0 {
1540            let replay_placeholders: String = candidate_ids
1541                .iter()
1542                .map(|_| "?")
1543                .collect::<Vec<_>>()
1544                .join(",");
1545            let replay_sql = format!(
1546                "UPDATE messages \
1547                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1548                 WHERE id IN ({replay_placeholders}) \
1549                 AND (\
1550                     (last_accessed IS NOT NULL \
1551                      AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1552                     OR access_count >= ?\
1553                 )"
1554            );
1555            let mut rq = zeph_db::query(&replay_sql);
1556            for &(id,) in &candidate_ids {
1557                rq = rq.bind(id);
1558            }
1559            let replay_result = rq
1560                .bind(replay_hours)
1561                .bind(replay_min_access)
1562                .execute(&mut *tx)
1563                .await?;
1564            #[allow(clippy::cast_possible_truncation)]
1565            let n = replay_result.rows_affected() as u32;
1566            n
1567        } else {
1568            0
1569        };
1570
1571        // Phase 3: targeted forgetting — soft-delete low-score unprotected messages.
1572        let prune_sql = format!(
1573            "UPDATE messages \
1574             SET deleted_at = CURRENT_TIMESTAMP \
1575             WHERE deleted_at IS NULL AND consolidated = 0 \
1576             AND importance_score < {floor} \
1577             AND (\
1578                 last_accessed IS NULL \
1579                 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1580             ) \
1581             AND access_count < ?"
1582        );
1583        let prune_result = zeph_db::query(&prune_sql)
1584            .bind(protect_hours)
1585            .bind(protect_min_access)
1586            .execute(&mut *tx)
1587            .await?;
1588        #[allow(clippy::cast_possible_truncation)]
1589        let pruned = prune_result.rows_affected() as u32;
1590
1591        tx.commit().await?;
1592
1593        Ok(crate::forgetting::ForgettingResult {
1594            downscaled,
1595            replayed,
1596            pruned,
1597        })
1598    }
1599}
1600
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// ID of the episodic message being considered for promotion.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Raw message content (used as input when merging into a semantic fact).
    pub content: String,
    /// Number of sessions in which this message was seen (clamped to 0 on underflow).
    pub session_count: u32,
    /// Current importance score of the message.
    pub importance_score: f64,
}
1610
1611#[cfg(test)]
1612mod tests;