Skip to main content

zeph_memory/store/messages/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4#[allow(unused_imports)]
5use zeph_common;
6use zeph_db::ActiveDialect;
7use zeph_db::fts::sanitize_fts_query;
8#[allow(unused_imports)]
9use zeph_db::{begin_write, sql};
10use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
11
12use super::SqliteStore;
13use crate::error::MemoryError;
14use crate::types::{ConversationId, MessageId};
15
16fn parse_role(s: &str) -> Role {
17    match s {
18        "assistant" => Role::Assistant,
19        "system" => Role::System,
20        _ => Role::User,
21    }
22}
23
24#[must_use]
25pub fn role_str(role: Role) -> &'static str {
26    match role {
27        Role::System => "system",
28        Role::User => "user",
29        Role::Assistant => "assistant",
30    }
31}
32
/// Map a legacy externally-tagged variant key to the `kind` value used by the current
/// internally-tagged schema.
///
/// Returns `None` for keys that are not legacy variant names.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // Pairs of (legacy variant key, current `kind` tag).
    const LEGACY_KINDS: [(&str, &str); 12] = [
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|(_, kind)| *kind)
}
52
53/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
54///
55/// Old format: `[{"Summary":{"text":"..."}}, ...]`
56/// New format: `[{"kind":"summary","text":"..."}, ...]`
57///
58/// Returns `None` if the input does not look like the old format or if any element fails
59/// to deserialize after conversion.
60fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
61    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
62    let mut result = Vec::with_capacity(array.len());
63    for element in array {
64        let obj = element.as_object()?;
65        if obj.contains_key("kind") {
66            return None;
67        }
68        if obj.len() != 1 {
69            return None;
70        }
71        let (key, inner) = obj.iter().next()?;
72        let kind = legacy_key_to_kind(key)?;
73        let mut new_obj = match inner {
74            serde_json::Value::Object(m) => m.clone(),
75            // Image variant wraps a single object directly
76            other => {
77                let mut m = serde_json::Map::new();
78                m.insert("data".to_string(), other.clone());
79                m
80            }
81        };
82        new_obj.insert(
83            "kind".to_string(),
84            serde_json::Value::String(kind.to_string()),
85        );
86        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
87        result.push(part);
88    }
89    Some(result)
90}
91
92/// Deserialize message parts from a stored JSON string.
93///
94/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
95/// a truncated excerpt of the malformed JSON for diagnostics.
96fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
97    if parts_json == "[]" {
98        return vec![];
99    }
100    match serde_json::from_str(parts_json) {
101        Ok(p) => p,
102        Err(e) => {
103            if let Some(parts) = try_parse_legacy_parts(parts_json) {
104                let truncated = parts_json.chars().take(120).collect::<String>();
105                tracing::warn!(
106                    role = %role_str,
107                    parts_json = %truncated,
108                    "loaded legacy-format message parts via compat path"
109                );
110                return parts;
111            }
112            let truncated = parts_json.chars().take(120).collect::<String>();
113            tracing::warn!(
114                role = %role_str,
115                parts_json = %truncated,
116                error = %e,
117                "failed to deserialize message parts, falling back to empty"
118            );
119            vec![]
120        }
121    }
122}
123
124impl SqliteStore {
125    /// Create a new conversation and return its ID.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the insert fails.
130    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
131        let row: (ConversationId,) = zeph_db::query_as(sql!(
132            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
133        ))
134        .fetch_one(&self.pool)
135        .await?;
136        Ok(row.0)
137    }
138
    /// Save a message to the given conversation and return the message ID.
    ///
    /// Convenience wrapper over `save_message_with_parts` that stores an
    /// empty parts array (`"[]"`).
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_parts(conversation_id, role, content, "[]")
            .await
    }
153
    /// Save a message with structured parts JSON.
    ///
    /// Delegates to `save_message_with_metadata` with both visibility flags
    /// set to `true` (visible to agent and user).
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn save_message_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
            .await
    }
169
170    /// Save a message with visibility metadata.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the insert fails.
175    pub async fn save_message_with_metadata(
176        &self,
177        conversation_id: ConversationId,
178        role: &str,
179        content: &str,
180        parts_json: &str,
181        agent_visible: bool,
182        user_visible: bool,
183    ) -> Result<MessageId, MemoryError> {
184        let importance_score = crate::semantic::importance::compute_importance(content, role);
185        let row: (MessageId,) = zeph_db::query_as(
186            sql!("INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
187             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"),
188        )
189        .bind(conversation_id)
190        .bind(role)
191        .bind(content)
192        .bind(parts_json)
193        .bind(i64::from(agent_visible))
194        .bind(i64::from(user_visible))
195        .bind(importance_score)
196        .fetch_one(&self.pool)
197        .await?;
198        Ok(row.0)
199    }
200
201    /// Load the most recent messages for a conversation, up to `limit`.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the query fails.
206    pub async fn load_history(
207        &self,
208        conversation_id: ConversationId,
209        limit: u32,
210    ) -> Result<Vec<Message>, MemoryError> {
211        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(sql!(
212            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
213                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
214                WHERE conversation_id = ? AND deleted_at IS NULL \
215                ORDER BY id DESC \
216                LIMIT ?\
217             ) ORDER BY id ASC"
218        ))
219        .bind(conversation_id)
220        .bind(limit)
221        .fetch_all(&self.pool)
222        .await?;
223
224        let messages = rows
225            .into_iter()
226            .map(
227                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
228                    let parts = parse_parts_json(&role_str, &parts_json);
229                    Message {
230                        role: parse_role(&role_str),
231                        content,
232                        parts,
233                        metadata: MessageMetadata {
234                            agent_visible: agent_visible != 0,
235                            user_visible: user_visible != 0,
236                            compacted_at: None,
237                            deferred_summary: None,
238                            focus_pinned: false,
239                            focus_marker_id: None,
240                            db_id: Some(row_id),
241                        },
242                    }
243                },
244            )
245            .collect();
246        Ok(messages)
247    }
248
249    /// Load messages filtered by visibility flags.
250    ///
251    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if the query fails.
256    pub async fn load_history_filtered(
257        &self,
258        conversation_id: ConversationId,
259        limit: u32,
260        agent_visible: Option<bool>,
261        user_visible: Option<bool>,
262    ) -> Result<Vec<Message>, MemoryError> {
263        let av = agent_visible.map(i64::from);
264        let uv = user_visible.map(i64::from);
265
266        let rows: Vec<(String, String, String, i64, i64, i64)> = zeph_db::query_as(
267            sql!("WITH recent AS (\
268                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
269                WHERE conversation_id = ? \
270                  AND deleted_at IS NULL \
271                  AND (? IS NULL OR agent_visible = ?) \
272                  AND (? IS NULL OR user_visible = ?) \
273                ORDER BY id DESC \
274                LIMIT ?\
275             ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC"),
276        )
277        .bind(conversation_id)
278        .bind(av)
279        .bind(av)
280        .bind(uv)
281        .bind(uv)
282        .bind(limit)
283        .fetch_all(&self.pool)
284        .await?;
285
286        let messages = rows
287            .into_iter()
288            .map(
289                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
290                    let parts = parse_parts_json(&role_str, &parts_json);
291                    Message {
292                        role: parse_role(&role_str),
293                        content,
294                        parts,
295                        metadata: MessageMetadata {
296                            agent_visible: agent_visible != 0,
297                            user_visible: user_visible != 0,
298                            compacted_at: None,
299                            deferred_summary: None,
300                            focus_pinned: false,
301                            focus_marker_id: None,
302                            db_id: Some(row_id),
303                        },
304                    }
305                },
306            )
307            .collect();
308        Ok(messages)
309    }
310
311    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
312    ///
313    /// Within a single transaction:
314    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
315    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
316    ///
317    /// Returns the `MessageId` of the inserted summary.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the transaction fails.
322    pub async fn replace_conversation(
323        &self,
324        conversation_id: ConversationId,
325        compacted_range: std::ops::RangeInclusive<MessageId>,
326        summary_role: &str,
327        summary_content: &str,
328    ) -> Result<MessageId, MemoryError> {
329        let now = {
330            let secs = std::time::SystemTime::now()
331                .duration_since(std::time::UNIX_EPOCH)
332                .unwrap_or_default()
333                .as_secs();
334            format!("{secs}")
335        };
336        let start_id = compacted_range.start().0;
337        let end_id = compacted_range.end().0;
338
339        let mut tx = self.pool.begin().await?;
340
341        zeph_db::query(sql!(
342            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
343             WHERE conversation_id = ? AND id >= ? AND id <= ?"
344        ))
345        .bind(&now)
346        .bind(conversation_id)
347        .bind(start_id)
348        .bind(end_id)
349        .execute(&mut *tx)
350        .await?;
351
352        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
353        let row: (MessageId,) = zeph_db::query_as(sql!(
354            "INSERT INTO messages \
355             (conversation_id, role, content, parts, agent_visible, user_visible) \
356             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id"
357        ))
358        .bind(conversation_id)
359        .bind(summary_role)
360        .bind(summary_content)
361        .fetch_one(&mut *tx)
362        .await?;
363
364        tx.commit().await?;
365
366        Ok(row.0)
367    }
368
369    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
370    ///
371    /// Within a single transaction:
372    /// 1. Sets `agent_visible=0, compacted_at=<now>` for each ID in `hide_ids`.
373    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the transaction fails.
378    pub async fn apply_tool_pair_summaries(
379        &self,
380        conversation_id: ConversationId,
381        hide_ids: &[i64],
382        summaries: &[String],
383    ) -> Result<(), MemoryError> {
384        if hide_ids.is_empty() && summaries.is_empty() {
385            return Ok(());
386        }
387
388        let now = std::time::SystemTime::now()
389            .duration_since(std::time::UNIX_EPOCH)
390            .unwrap_or_default()
391            .as_secs()
392            .to_string();
393
394        let mut tx = self.pool.begin().await?;
395
396        for &id in hide_ids {
397            zeph_db::query(sql!(
398                "UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?"
399            ))
400            .bind(&now)
401            .bind(id)
402            .execute(&mut *tx)
403            .await?;
404        }
405
406        for summary in summaries {
407            let content = format!("[tool summary] {summary}");
408            let parts = serde_json::to_string(&[MessagePart::Summary {
409                text: summary.clone(),
410            }])
411            .unwrap_or_else(|_| "[]".to_string());
412            zeph_db::query(sql!(
413                "INSERT INTO messages \
414                 (conversation_id, role, content, parts, agent_visible, user_visible) \
415                 VALUES (?, 'assistant', ?, ?, 1, 0)"
416            ))
417            .bind(conversation_id)
418            .bind(&content)
419            .bind(&parts)
420            .execute(&mut *tx)
421            .await?;
422        }
423
424        tx.commit().await?;
425        Ok(())
426    }
427
428    /// Return the IDs of the N oldest messages in a conversation (ascending order).
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the query fails.
433    pub async fn oldest_message_ids(
434        &self,
435        conversation_id: ConversationId,
436        n: u32,
437    ) -> Result<Vec<MessageId>, MemoryError> {
438        let rows: Vec<(MessageId,)> = zeph_db::query_as(
439            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
440        )
441        .bind(conversation_id)
442        .bind(n)
443        .fetch_all(&self.pool)
444        .await?;
445        Ok(rows.into_iter().map(|r| r.0).collect())
446    }
447
448    /// Return the ID of the most recent conversation, if any.
449    ///
450    /// # Errors
451    ///
452    /// Returns an error if the query fails.
453    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
454        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
455            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
456        ))
457        .fetch_optional(&self.pool)
458        .await?;
459        Ok(row.map(|r| r.0))
460    }
461
462    /// Fetch a single message by its ID.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the query fails.
467    pub async fn message_by_id(
468        &self,
469        message_id: MessageId,
470    ) -> Result<Option<Message>, MemoryError> {
471        let row: Option<(String, String, String, i64, i64)> = zeph_db::query_as(
472            sql!("SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL"),
473        )
474        .bind(message_id)
475        .fetch_optional(&self.pool)
476        .await?;
477
478        Ok(row.map(
479            |(role_str, content, parts_json, agent_visible, user_visible)| {
480                let parts = parse_parts_json(&role_str, &parts_json);
481                Message {
482                    role: parse_role(&role_str),
483                    content,
484                    parts,
485                    metadata: MessageMetadata {
486                        agent_visible: agent_visible != 0,
487                        user_visible: user_visible != 0,
488                        compacted_at: None,
489                        deferred_summary: None,
490                        focus_pinned: false,
491                        focus_marker_id: None,
492                        db_id: None,
493                    },
494                }
495            },
496        ))
497    }
498
499    /// Fetch messages by a list of IDs in a single query.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the query fails.
504    pub async fn messages_by_ids(
505        &self,
506        ids: &[MessageId],
507    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
508        if ids.is_empty() {
509            return Ok(Vec::new());
510        }
511
512        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
513
514        let query = format!(
515            "SELECT id, role, content, parts FROM messages \
516             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
517        );
518        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
519        for &id in ids {
520            q = q.bind(id);
521        }
522
523        let rows = q.fetch_all(&self.pool).await?;
524
525        Ok(rows
526            .into_iter()
527            .map(|(id, role_str, content, parts_json)| {
528                let parts = parse_parts_json(&role_str, &parts_json);
529                (
530                    id,
531                    Message {
532                        role: parse_role(&role_str),
533                        content,
534                        parts,
535                        metadata: MessageMetadata::default(),
536                    },
537                )
538            })
539            .collect())
540    }
541
542    /// Return message IDs and content for messages without embeddings.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error if the query fails.
547    pub async fn unembedded_message_ids(
548        &self,
549        limit: Option<usize>,
550    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
551        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
552
553        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
554            "SELECT m.id, m.conversation_id, m.role, m.content \
555             FROM messages m \
556             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
557             WHERE em.id IS NULL AND m.deleted_at IS NULL \
558             ORDER BY m.id ASC \
559             LIMIT ?"
560        ))
561        .bind(effective_limit)
562        .fetch_all(&self.pool)
563        .await?;
564
565        Ok(rows)
566    }
567
568    /// Count the number of messages in a conversation.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the query fails.
573    pub async fn count_messages(
574        &self,
575        conversation_id: ConversationId,
576    ) -> Result<i64, MemoryError> {
577        let row: (i64,) = zeph_db::query_as(sql!(
578            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
579        ))
580        .bind(conversation_id)
581        .fetch_one(&self.pool)
582        .await?;
583        Ok(row.0)
584    }
585
586    /// Count messages in a conversation with id greater than `after_id`.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if the query fails.
591    pub async fn count_messages_after(
592        &self,
593        conversation_id: ConversationId,
594        after_id: MessageId,
595    ) -> Result<i64, MemoryError> {
596        let row: (i64,) =
597            zeph_db::query_as(
598                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
599            )
600            .bind(conversation_id)
601            .bind(after_id)
602            .fetch_one(&self.pool)
603            .await?;
604        Ok(row.0)
605    }
606
607    /// Full-text keyword search over messages using FTS5.
608    ///
609    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
610    /// negated to positive for consistency with vector scores).
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if the query fails.
615    pub async fn keyword_search(
616        &self,
617        query: &str,
618        limit: usize,
619        conversation_id: Option<ConversationId>,
620    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
621        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
622        let safe_query = sanitize_fts_query(query);
623        if safe_query.is_empty() {
624            return Ok(Vec::new());
625        }
626
627        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
628            zeph_db::query_as(
629                sql!("SELECT m.id, -rank AS score \
630                 FROM messages_fts f \
631                 JOIN messages m ON m.id = f.rowid \
632                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
633                 ORDER BY rank \
634                 LIMIT ?"),
635            )
636            .bind(&safe_query)
637            .bind(cid)
638            .bind(effective_limit)
639            .fetch_all(&self.pool)
640            .await?
641        } else {
642            zeph_db::query_as(sql!(
643                "SELECT m.id, -rank AS score \
644                 FROM messages_fts f \
645                 JOIN messages m ON m.id = f.rowid \
646                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
647                 ORDER BY rank \
648                 LIMIT ?"
649            ))
650            .bind(&safe_query)
651            .bind(effective_limit)
652            .fetch_all(&self.pool)
653            .await?
654        };
655
656        Ok(rows)
657    }
658
659    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
660    ///
661    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
662    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
663    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
664    ///
665    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
666    /// `None` means "no bound" on that side.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if the query fails.
671    pub async fn keyword_search_with_time_range(
672        &self,
673        query: &str,
674        limit: usize,
675        conversation_id: Option<ConversationId>,
676        after: Option<&str>,
677        before: Option<&str>,
678    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
679        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
680        let safe_query = sanitize_fts_query(query);
681        if safe_query.is_empty() {
682            return Ok(Vec::new());
683        }
684
685        // Build time-range clauses dynamically. Both bounds are optional.
686        let after_clause = if after.is_some() {
687            " AND m.created_at > ?"
688        } else {
689            ""
690        };
691        let before_clause = if before.is_some() {
692            " AND m.created_at < ?"
693        } else {
694            ""
695        };
696        let conv_clause = if conversation_id.is_some() {
697            " AND m.conversation_id = ?"
698        } else {
699            ""
700        };
701
702        let sql = format!(
703            "SELECT m.id, -rank AS score \
704             FROM messages_fts f \
705             JOIN messages m ON m.id = f.rowid \
706             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
707             {after_clause}{before_clause}{conv_clause} \
708             ORDER BY rank \
709             LIMIT ?"
710        );
711
712        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
713        if let Some(a) = after {
714            q = q.bind(a);
715        }
716        if let Some(b) = before {
717            q = q.bind(b);
718        }
719        if let Some(cid) = conversation_id {
720            q = q.bind(cid);
721        }
722        q = q.bind(effective_limit);
723
724        Ok(q.fetch_all(&self.pool).await?)
725    }
726
727    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
728    ///
729    /// Messages without a `created_at` column fall back to 0.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if the query fails.
734    pub async fn message_timestamps(
735        &self,
736        ids: &[MessageId],
737    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
738        if ids.is_empty() {
739            return Ok(std::collections::HashMap::new());
740        }
741
742        let placeholders: String =
743            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
744        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
745        let query = format!(
746            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
747        );
748        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
749        for &id in ids {
750            q = q.bind(id);
751        }
752
753        let rows = q.fetch_all(&self.pool).await?;
754        Ok(rows.into_iter().collect())
755    }
756
757    /// Load a range of messages after a given message ID.
758    ///
759    /// # Errors
760    ///
761    /// Returns an error if the query fails.
762    pub async fn load_messages_range(
763        &self,
764        conversation_id: ConversationId,
765        after_message_id: MessageId,
766        limit: usize,
767    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
768        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
769
770        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
771            "SELECT id, role, content FROM messages \
772             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
773             ORDER BY id ASC LIMIT ?"
774        ))
775        .bind(conversation_id)
776        .bind(after_message_id)
777        .bind(effective_limit)
778        .fetch_all(&self.pool)
779        .await?;
780
781        Ok(rows)
782    }
783
784    // ── Eviction helpers ──────────────────────────────────────────────────────
785
786    /// Return all non-deleted message IDs with their eviction metadata.
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the query fails.
791    pub async fn get_eviction_candidates(
792        &self,
793    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
794        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
795            "SELECT id, created_at, last_accessed, access_count \
796             FROM messages WHERE deleted_at IS NULL"
797        ))
798        .fetch_all(&self.pool)
799        .await?;
800
801        Ok(rows
802            .into_iter()
803            .map(
804                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
805                    id,
806                    created_at,
807                    last_accessed,
808                    access_count: access_count.try_into().unwrap_or(0),
809                },
810            )
811            .collect())
812    }
813
814    /// Soft-delete a set of messages by marking `deleted_at`.
815    ///
816    /// Soft-deleted messages are excluded from all history queries.
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the update fails.
821    pub async fn soft_delete_messages(
822        &self,
823        ids: &[MessageId],
824    ) -> Result<(), crate::error::MemoryError> {
825        if ids.is_empty() {
826            return Ok(());
827        }
828        // SQLite does not support array binding natively. Batch via individual updates.
829        for &id in ids {
830            zeph_db::query(
831                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
832            )
833            .bind(id)
834            .execute(&self.pool)
835            .await?;
836        }
837        Ok(())
838    }
839
840    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
841    ///
842    /// # Errors
843    ///
844    /// Returns an error if the query fails.
845    pub async fn get_soft_deleted_message_ids(
846        &self,
847    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
848        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
849            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
850        ))
851        .fetch_all(&self.pool)
852        .await?;
853        Ok(rows.into_iter().map(|(id,)| id).collect())
854    }
855
856    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
857    ///
858    /// # Errors
859    ///
860    /// Returns an error if the update fails.
861    pub async fn mark_qdrant_cleaned(
862        &self,
863        ids: &[MessageId],
864    ) -> Result<(), crate::error::MemoryError> {
865        for &id in ids {
866            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
867                .bind(id)
868                .execute(&self.pool)
869                .await?;
870        }
871        Ok(())
872    }
873
    /// Fetch `importance_score` values for the given message IDs.
    ///
    /// Only rows that exist and are not soft-deleted appear in the returned map;
    /// IDs with no matching row are simply omitted. (Callers presumably substitute
    /// a 0.5 neutral score for missing entries — confirm at call sites; this method
    /// itself applies no fallback.)
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn fetch_importance_scores(
        &self,
        ids: &[MessageId],
    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
        if ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }
        // SQLite cannot bind arrays natively: expand one '?' per ID.
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
        );
        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
        for &id in ids {
            q = q.bind(id);
        }
        let rows = q.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().collect())
    }
899
900    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
901    ///
902    /// Skips the update when `ids` is empty.
903    ///
904    /// # Errors
905    ///
906    /// Returns an error if the update fails.
907    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
908        if ids.is_empty() {
909            return Ok(());
910        }
911        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
912        let query = format!(
913            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
914             WHERE id IN ({placeholders})"
915        );
916        let mut q = zeph_db::query(&query);
917        for &id in ids {
918            q = q.bind(id);
919        }
920        q.execute(&self.pool).await?;
921        Ok(())
922    }
923
924    // ── Tier promotion helpers ─────────────────────────────────────────────────
925
926    /// Return episodic messages with `session_count >= min_sessions`, ordered by
927    /// session count descending then importance score descending.
928    ///
929    /// # Errors
930    ///
931    /// Returns an error if the query fails.
932    pub async fn find_promotion_candidates(
933        &self,
934        min_sessions: u32,
935        batch_size: usize,
936    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
937        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
938        let min = i64::from(min_sessions);
939        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
940            "SELECT id, conversation_id, content, session_count, importance_score \
941             FROM messages \
942             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
943             ORDER BY session_count DESC, importance_score DESC \
944             LIMIT ?"
945        ))
946        .bind(min)
947        .bind(limit)
948        .fetch_all(&self.pool)
949        .await?;
950
951        Ok(rows
952            .into_iter()
953            .map(
954                |(id, conversation_id, content, session_count, importance_score)| {
955                    PromotionCandidate {
956                        id,
957                        conversation_id,
958                        content,
959                        session_count: session_count.try_into().unwrap_or(0),
960                        importance_score,
961                    }
962                },
963            )
964            .collect())
965    }
966
967    /// Count messages per tier (episodic, semantic) that are not deleted.
968    ///
969    /// Returns `(episodic_count, semantic_count)`.
970    ///
971    /// # Errors
972    ///
973    /// Returns an error if the query fails.
974    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
975        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
976            "SELECT tier, COUNT(*) FROM messages \
977             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
978             GROUP BY tier"
979        ))
980        .fetch_all(&self.pool)
981        .await?;
982
983        let mut episodic = 0i64;
984        let mut semantic = 0i64;
985        for (tier, count) in rows {
986            match tier.as_str() {
987                "episodic" => episodic = count,
988                "semantic" => semantic = count,
989                _ => {}
990            }
991        }
992        Ok((episodic, semantic))
993    }
994
995    /// Count semantic facts (tier='semantic', not deleted).
996    ///
997    /// # Errors
998    ///
999    /// Returns an error if the query fails.
1000    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1001        let row: (i64,) = zeph_db::query_as(sql!(
1002            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1003        ))
1004        .fetch_one(&self.pool)
1005        .await?;
1006        Ok(row.0)
1007    }
1008
1009    /// Promote a set of episodic messages to semantic tier in a single transaction.
1010    ///
1011    /// Within one transaction:
1012    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
1013    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
1014    ///    so the eviction sweep picks up their Qdrant vectors.
1015    ///
1016    /// Returns the `MessageId` of the new semantic message.
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns an error if the transaction fails.
1021    pub async fn promote_to_semantic(
1022        &self,
1023        conversation_id: ConversationId,
1024        merged_content: &str,
1025        original_ids: &[MessageId],
1026    ) -> Result<MessageId, MemoryError> {
1027        if original_ids.is_empty() {
1028            return Err(MemoryError::Other(
1029                "promote_to_semantic: original_ids must not be empty".into(),
1030            ));
1031        }
1032
1033        // Acquire the write lock immediately (BEGIN IMMEDIATE) to avoid the DEFERRED read->write
1034        // upgrade race that causes SQLITE_BUSY when a concurrent writer holds the lock.
1035        let mut tx = begin_write(&self.pool).await?;
1036
1037        // Insert the new semantic fact.
1038        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1039        let promote_insert_raw = format!(
1040            "INSERT INTO messages \
1041             (conversation_id, role, content, parts, agent_visible, user_visible, \
1042              tier, promotion_timestamp) \
1043             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', {epoch_now}) \
1044             RETURNING id"
1045        );
1046        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1047        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1048            .bind(conversation_id)
1049            .bind(merged_content)
1050            .fetch_one(&mut *tx)
1051            .await?;
1052
1053        let new_id = row.0;
1054
1055        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
1056        for &id in original_ids {
1057            zeph_db::query(sql!(
1058                "UPDATE messages \
1059                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1060                 WHERE id = ? AND deleted_at IS NULL"
1061            ))
1062            .bind(id)
1063            .execute(&mut *tx)
1064            .await?;
1065        }
1066
1067        tx.commit().await?;
1068        Ok(new_id)
1069    }
1070
1071    /// Manually promote a set of messages to semantic tier without merging.
1072    ///
1073    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1074    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the update fails.
1079    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1080        if ids.is_empty() {
1081            return Ok(0);
1082        }
1083        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1084        let manual_promote_raw = format!(
1085            "UPDATE messages \
1086             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1087             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1088        );
1089        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1090        let mut count = 0usize;
1091        for &id in ids {
1092            let result = zeph_db::query(&manual_promote_sql)
1093                .bind(id)
1094                .execute(&self.pool)
1095                .await?;
1096            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1097        }
1098        Ok(count)
1099    }
1100
1101    /// Increment `session_count` for all episodic messages in a conversation.
1102    ///
1103    /// Called when a session restores an existing conversation to mark that messages
1104    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1105    ///
1106    /// # Errors
1107    ///
1108    /// Returns an error if the database update fails.
1109    pub async fn increment_session_counts_for_conversation(
1110        &self,
1111        conversation_id: ConversationId,
1112    ) -> Result<(), MemoryError> {
1113        zeph_db::query(sql!(
1114            "UPDATE messages SET session_count = session_count + 1 \
1115             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1116        ))
1117        .bind(conversation_id)
1118        .execute(&self.pool)
1119        .await?;
1120        Ok(())
1121    }
1122
1123    /// Fetch the tier string for each of the given message IDs.
1124    ///
1125    /// Messages not found or already deleted are omitted from the result.
1126    ///
1127    /// # Errors
1128    ///
1129    /// Returns an error if the query fails.
1130    pub async fn fetch_tiers(
1131        &self,
1132        ids: &[MessageId],
1133    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1134        if ids.is_empty() {
1135            return Ok(std::collections::HashMap::new());
1136        }
1137        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1138        let query = format!(
1139            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1140        );
1141        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1142        for &id in ids {
1143            q = q.bind(id);
1144        }
1145        let rows = q.fetch_all(&self.pool).await?;
1146        Ok(rows.into_iter().collect())
1147    }
1148
1149    /// Return all conversation IDs that have at least one non-consolidated original message.
1150    ///
1151    /// Used by the consolidation sweep to find conversations that need processing.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if the database query fails.
1156    pub async fn conversations_with_unconsolidated_messages(
1157        &self,
1158    ) -> Result<Vec<ConversationId>, MemoryError> {
1159        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1160            "SELECT DISTINCT conversation_id FROM messages \
1161             WHERE consolidated = 0 AND deleted_at IS NULL"
1162        ))
1163        .fetch_all(&self.pool)
1164        .await?;
1165        Ok(rows.into_iter().map(|(id,)| id).collect())
1166    }
1167
1168    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1169    ///
1170    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1171    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1172    ///
1173    /// # Errors
1174    ///
1175    /// Returns an error if the database query fails.
1176    pub async fn find_unconsolidated_messages(
1177        &self,
1178        conversation_id: ConversationId,
1179        limit: usize,
1180    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1181        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1182        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1183            "SELECT id, content FROM messages \
1184             WHERE conversation_id = ? \
1185               AND consolidated = 0 \
1186               AND deleted_at IS NULL \
1187             ORDER BY id ASC \
1188             LIMIT ?"
1189        ))
1190        .bind(conversation_id)
1191        .bind(limit)
1192        .fetch_all(&self.pool)
1193        .await?;
1194        Ok(rows)
1195    }
1196
1197    /// Look up which consolidated entry (if any) covers the given original message.
1198    ///
1199    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1200    /// in its sources, or `None` if no consolidated entry covers it.
1201    ///
1202    /// # Errors
1203    ///
1204    /// Returns an error if the database query fails.
1205    pub async fn find_consolidated_for_source(
1206        &self,
1207        source_id: MessageId,
1208    ) -> Result<Option<MessageId>, MemoryError> {
1209        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1210            "SELECT consolidated_id FROM memory_consolidation_sources \
1211             WHERE source_id = ? \
1212             LIMIT 1"
1213        ))
1214        .bind(source_id)
1215        .fetch_optional(&self.pool)
1216        .await?;
1217        Ok(row.map(|(id,)| id))
1218    }
1219
1220    /// Insert a consolidated message and record its source linkage in a single transaction.
1221    ///
1222    /// Atomically:
1223    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1224    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1225    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1226    ///
1227    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1228    ///
1229    /// # Errors
1230    ///
1231    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1232    /// on failure so no partial state is written.
1233    pub async fn apply_consolidation_merge(
1234        &self,
1235        conversation_id: ConversationId,
1236        role: &str,
1237        merged_content: &str,
1238        source_ids: &[MessageId],
1239        confidence: f32,
1240        confidence_threshold: f32,
1241    ) -> Result<bool, MemoryError> {
1242        if confidence < confidence_threshold {
1243            return Ok(false);
1244        }
1245        if source_ids.is_empty() {
1246            return Ok(false);
1247        }
1248
1249        let mut tx = self.pool.begin().await?;
1250
1251        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1252        let row: (MessageId,) = zeph_db::query_as(sql!(
1253            "INSERT INTO messages \
1254               (conversation_id, role, content, parts, agent_visible, user_visible, \
1255                importance_score, consolidated, consolidation_confidence) \
1256             VALUES (?, ?, ?, '[]', 1, 1, ?, 1, ?) \
1257             RETURNING id"
1258        ))
1259        .bind(conversation_id)
1260        .bind(role)
1261        .bind(merged_content)
1262        .bind(importance)
1263        .bind(confidence)
1264        .fetch_one(&mut *tx)
1265        .await?;
1266        let consolidated_id = row.0;
1267
1268        let consol_sql = format!(
1269            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1270            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1271            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1272        );
1273        for &source_id in source_ids {
1274            zeph_db::query(&consol_sql)
1275                .bind(consolidated_id)
1276                .bind(source_id)
1277                .execute(&mut *tx)
1278                .await?;
1279
1280            // Mark original as consolidated so future sweeps skip it.
1281            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1282                .bind(source_id)
1283                .execute(&mut *tx)
1284                .await?;
1285        }
1286
1287        tx.commit().await?;
1288        Ok(true)
1289    }
1290
1291    /// Update an existing consolidated message in-place with new content.
1292    ///
1293    /// Atomically:
1294    /// 1. Updates `content` and `consolidation_confidence` on `target_id`.
1295    /// 2. Inserts rows into `memory_consolidation_sources` linking `target_id` → each source.
1296    /// 3. Marks each source message's `consolidated = 1`.
1297    ///
1298    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1299    ///
1300    /// # Errors
1301    ///
1302    /// Returns an error if any database operation fails.
1303    pub async fn apply_consolidation_update(
1304        &self,
1305        target_id: MessageId,
1306        new_content: &str,
1307        additional_source_ids: &[MessageId],
1308        confidence: f32,
1309        confidence_threshold: f32,
1310    ) -> Result<bool, MemoryError> {
1311        if confidence < confidence_threshold {
1312            return Ok(false);
1313        }
1314
1315        let mut tx = self.pool.begin().await?;
1316
1317        zeph_db::query(sql!(
1318            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1319        ))
1320        .bind(new_content)
1321        .bind(confidence)
1322        .bind(target_id)
1323        .execute(&mut *tx)
1324        .await?;
1325
1326        let consol_sql = format!(
1327            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1328            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1329            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1330        );
1331        for &source_id in additional_source_ids {
1332            zeph_db::query(&consol_sql)
1333                .bind(target_id)
1334                .bind(source_id)
1335                .execute(&mut *tx)
1336                .await?;
1337
1338            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1339                .bind(source_id)
1340                .execute(&mut *tx)
1341                .await?;
1342        }
1343
1344        tx.commit().await?;
1345        Ok(true)
1346    }
1347
1348    // ── Forgetting sweep helpers ───────────────────────────────────────────────
1349
1350    /// Set `importance_score` for a single message by ID.
1351    ///
1352    /// Used in tests and by forgetting sweep helpers.
1353    ///
1354    /// # Errors
1355    ///
1356    /// Returns an error if the update fails.
1357    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1358        zeph_db::query(sql!(
1359            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1360        ))
1361        .bind(score)
1362        .bind(id)
1363        .execute(&self.pool)
1364        .await?;
1365        Ok(())
1366    }
1367
1368    /// Get `importance_score` for a single message by ID.
1369    ///
1370    /// Returns `None` if the message does not exist or is deleted.
1371    ///
1372    /// # Errors
1373    ///
1374    /// Returns an error if the query fails.
1375    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1376        let row: Option<(f64,)> = zeph_db::query_as(sql!(
1377            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1378        ))
1379        .bind(id)
1380        .fetch_optional(&self.pool)
1381        .await?;
1382        Ok(row.map(|(s,)| s))
1383    }
1384
1385    /// Increment `access_count` and update `last_accessed` for a batch of messages.
1386    ///
1387    /// Alias used in forgetting tests; forwards to `increment_access_counts`.
1388    ///
1389    /// # Errors
1390    ///
1391    /// Returns an error if the update fails.
1392    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1393        self.increment_access_counts(ids).await
1394    }
1395
1396    /// Mark a set of messages as consolidated (`consolidated = 1`).
1397    ///
1398    /// Used in tests to simulate the state after consolidation.
1399    ///
1400    /// # Errors
1401    ///
1402    /// Returns an error if the update fails.
1403    pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1404        for &id in ids {
1405            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1406                .bind(id)
1407                .execute(&self.pool)
1408                .await?;
1409        }
1410        Ok(())
1411    }
1412
1413    /// Execute the three-phase forgetting sweep (`SleepGate`) inside a single transaction.
1414    ///
1415    /// **Phase 1** — Downscale all active non-consolidated importance scores by `decay_rate`.
1416    /// **Phase 2** — Undo the decay for messages accessed within `replay_window_hours` or
1417    ///   with `access_count >= replay_min_access_count` (undo current sweep's decay only).
1418    /// **Phase 3** — Soft-delete messages below `forgetting_floor` that are not protected
1419    ///   by recent access (`protect_recent_hours`) or high access count
1420    ///   (`protect_min_access_count`). Uses `batch_size` as a row-count cap.
1421    ///
1422    /// All phases commit atomically: concurrent readers see either the pre-sweep or
1423    /// post-sweep state, never an intermediate.
1424    ///
1425    /// # Errors
1426    ///
1427    /// Returns an error if any database operation fails.
1428    pub async fn run_forgetting_sweep_tx(
1429        &self,
1430        config: &zeph_common::config::memory::ForgettingConfig,
1431    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1432        let mut tx = self.pool.begin().await?;
1433
1434        let decay = f64::from(config.decay_rate);
1435        let floor = f64::from(config.forgetting_floor);
1436        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1437        let replay_hours = i64::from(config.replay_window_hours);
1438        let replay_min_access = i64::from(config.replay_min_access_count);
1439        let protect_hours = i64::from(config.protect_recent_hours);
1440        let protect_min_access = i64::from(config.protect_min_access_count);
1441
1442        // Phase 1: downscale all active, non-consolidated messages (limited to batch_size).
1443        // We target a specific set of IDs to respect sweep_batch_size.
1444        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1445            "SELECT id FROM messages \
1446             WHERE deleted_at IS NULL AND consolidated = 0 \
1447             ORDER BY importance_score ASC \
1448             LIMIT ?"
1449        ))
1450        .bind(batch)
1451        .fetch_all(&mut *tx)
1452        .await?;
1453
1454        #[allow(clippy::cast_possible_truncation)]
1455        let downscaled = candidate_ids.len() as u32;
1456
1457        if downscaled > 0 {
1458            let placeholders: String = candidate_ids
1459                .iter()
1460                .map(|_| "?")
1461                .collect::<Vec<_>>()
1462                .join(",");
1463            let downscale_sql = format!(
1464                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1465                 WHERE id IN ({placeholders})"
1466            );
1467            let mut q = zeph_db::query(&downscale_sql);
1468            for &(id,) in &candidate_ids {
1469                q = q.bind(id);
1470            }
1471            q.execute(&mut *tx).await?;
1472        }
1473
1474        // Phase 2: selective replay — undo decay for recently-accessed messages.
1475        // Formula: score / (1 - decay_rate) restores the current sweep's downscaling.
1476        // Cap at 1.0 to avoid exceeding the maximum importance score.
1477        // Scoped to the Phase 1 batch only: messages not decayed this sweep must not
1478        // have their scores inflated by the inverse formula.
1479        let replayed = if downscaled > 0 {
1480            let replay_placeholders: String = candidate_ids
1481                .iter()
1482                .map(|_| "?")
1483                .collect::<Vec<_>>()
1484                .join(",");
1485            let replay_sql = format!(
1486                "UPDATE messages \
1487                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1488                 WHERE id IN ({replay_placeholders}) \
1489                 AND (\
1490                     (last_accessed IS NOT NULL \
1491                      AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1492                     OR access_count >= ?\
1493                 )"
1494            );
1495            let mut rq = zeph_db::query(&replay_sql);
1496            for &(id,) in &candidate_ids {
1497                rq = rq.bind(id);
1498            }
1499            let replay_result = rq
1500                .bind(replay_hours)
1501                .bind(replay_min_access)
1502                .execute(&mut *tx)
1503                .await?;
1504            #[allow(clippy::cast_possible_truncation)]
1505            let n = replay_result.rows_affected() as u32;
1506            n
1507        } else {
1508            0
1509        };
1510
1511        // Phase 3: targeted forgetting — soft-delete low-score unprotected messages.
1512        let prune_sql = format!(
1513            "UPDATE messages \
1514             SET deleted_at = CURRENT_TIMESTAMP \
1515             WHERE deleted_at IS NULL AND consolidated = 0 \
1516             AND importance_score < {floor} \
1517             AND (\
1518                 last_accessed IS NULL \
1519                 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1520             ) \
1521             AND access_count < ?"
1522        );
1523        let prune_result = zeph_db::query(&prune_sql)
1524            .bind(protect_hours)
1525            .bind(protect_min_access)
1526            .execute(&mut *tx)
1527            .await?;
1528        #[allow(clippy::cast_possible_truncation)]
1529        let pruned = prune_result.rows_affected() as u32;
1530
1531        tx.commit().await?;
1532
1533        Ok(crate::forgetting::ForgettingResult {
1534            downscaled,
1535            replayed,
1536            pruned,
1537        })
1538    }
1539}
1540
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// Row ID of the episodic message.
    pub id: MessageId,
    /// Conversation the message belongs to.
    pub conversation_id: ConversationId,
    /// Raw message content (candidate input for a merged semantic fact).
    pub content: String,
    /// `session_count` from the `messages` row; negative DB values clamp to 0.
    pub session_count: u32,
    /// Current `importance_score` from the `messages` row.
    pub importance_score: f64,
}
1550
1551#[cfg(test)]
1552mod tests;