Skip to main content

zeph_memory/store/messages/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::TryStreamExt as _;
5#[allow(unused_imports)]
6use zeph_common;
7use zeph_common::ContextFidelity;
8use zeph_db::ActiveDialect;
9use zeph_db::fts::sanitize_fts_query;
10#[allow(unused_imports)]
11use zeph_db::{begin_write, placeholder_list, sql};
12use zeph_llm::provider::{Message, MessageMetadata, MessagePart, MessageVisibility, Role};
13
14use super::SqliteStore;
15use crate::error::MemoryError;
16use crate::types::{ConversationId, MessageId};
17
18fn parse_role(s: &str) -> Role {
19    match s {
20        "assistant" => Role::Assistant,
21        "system" => Role::System,
22        _ => Role::User,
23    }
24}
25
26#[must_use]
27pub fn role_str(role: Role) -> &'static str {
28    match role {
29        Role::System => "system",
30        Role::Assistant => "assistant",
31        Role::User | _ => "user",
32    }
33}
34
/// Translate a legacy externally-tagged variant name (e.g. `"ToolOutput"`) into the
/// `kind` discriminator (e.g. `"tool_output"`) used by the current internally-tagged schema.
///
/// Returns `None` for any key that is not a known legacy variant name. Matching is exact
/// and case-sensitive.
fn legacy_key_to_kind(key: &str) -> Option<&'static str> {
    // (legacy variant key, current `kind` tag)
    const LEGACY_KINDS: &[(&str, &str)] = &[
        ("Text", "text"),
        ("ToolOutput", "tool_output"),
        ("Recall", "recall"),
        ("CodeContext", "code_context"),
        ("Summary", "summary"),
        ("CrossSession", "cross_session"),
        ("ToolUse", "tool_use"),
        ("ToolResult", "tool_result"),
        ("Image", "image"),
        ("ThinkingBlock", "thinking_block"),
        ("RedactedThinkingBlock", "redacted_thinking_block"),
        ("Compaction", "compaction"),
    ];

    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == key)
        .map(|&(_, kind)| kind)
}
54
55/// Attempt to parse a JSON string written in the pre-v0.17.1 externally-tagged format.
56///
57/// Old format: `[{"Summary":{"text":"..."}}, ...]`
58/// New format: `[{"kind":"summary","text":"..."}, ...]`
59///
60/// Returns `None` if the input does not look like the old format or if any element fails
61/// to deserialize after conversion.
62fn try_parse_legacy_parts(parts_json: &str) -> Option<Vec<MessagePart>> {
63    let array: Vec<serde_json::Value> = serde_json::from_str(parts_json).ok()?;
64    let mut result = Vec::with_capacity(array.len());
65    for element in array {
66        let obj = element.as_object()?;
67        if obj.contains_key("kind") {
68            return None;
69        }
70        if obj.len() != 1 {
71            return None;
72        }
73        let (key, inner) = obj.iter().next()?;
74        let kind = legacy_key_to_kind(key)?;
75        let mut new_obj = match inner {
76            serde_json::Value::Object(m) => m.clone(),
77            // Image variant wraps a single object directly
78            other => {
79                let mut m = serde_json::Map::new();
80                m.insert("data".to_string(), other.clone());
81                m
82            }
83        };
84        new_obj.insert(
85            "kind".to_string(),
86            serde_json::Value::String(kind.to_string()),
87        );
88        let part: MessagePart = serde_json::from_value(serde_json::Value::Object(new_obj)).ok()?;
89        result.push(part);
90    }
91    Some(result)
92}
93
94/// Deserialize message parts from a stored JSON string.
95///
96/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
97/// a truncated excerpt of the malformed JSON for diagnostics.
98fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
99    if parts_json == "[]" {
100        return vec![];
101    }
102    match serde_json::from_str(parts_json) {
103        Ok(p) => p,
104        Err(e) => {
105            if let Some(parts) = try_parse_legacy_parts(parts_json) {
106                let truncated = parts_json.chars().take(120).collect::<String>();
107                tracing::warn!(
108                    role = %role_str,
109                    parts_json = %truncated,
110                    "loaded legacy-format message parts via compat path"
111                );
112                return parts;
113            }
114            let truncated = parts_json.chars().take(120).collect::<String>();
115            tracing::warn!(
116                role = %role_str,
117                parts_json = %truncated,
118                error = %e,
119                "failed to deserialize message parts, falling back to empty"
120            );
121            vec![]
122        }
123    }
124}
125
126impl SqliteStore {
127    /// Create a new conversation and return its ID.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the insert fails.
132    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
133        let row: (ConversationId,) = zeph_db::query_as(sql!(
134            "INSERT INTO conversations DEFAULT VALUES RETURNING id"
135        ))
136        .fetch_one(&self.pool)
137        .await?;
138        Ok(row.0)
139    }
140
141    /// Save a message to the given conversation and return the message ID.
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if the insert fails.
146    pub async fn save_message(
147        &self,
148        conversation_id: ConversationId,
149        role: &str,
150        content: &str,
151    ) -> Result<MessageId, MemoryError> {
152        self.save_message_with_parts(conversation_id, role, content, "[]")
153            .await
154    }
155
156    /// Save a message with structured parts JSON.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the insert fails.
161    pub async fn save_message_with_parts(
162        &self,
163        conversation_id: ConversationId,
164        role: &str,
165        content: &str,
166        parts_json: &str,
167    ) -> Result<MessageId, MemoryError> {
168        self.save_message_with_metadata(
169            conversation_id,
170            role,
171            content,
172            parts_json,
173            MessageVisibility::Both,
174        )
175        .await
176    }
177
178    /// Save a message with an optional category tag.
179    ///
180    /// The `category` column is NULL when `None` — existing rows are unaffected.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the insert fails.
185    pub async fn save_message_with_category(
186        &self,
187        conversation_id: ConversationId,
188        role: &str,
189        content: &str,
190        category: Option<&str>,
191    ) -> Result<MessageId, MemoryError> {
192        let importance_score = crate::semantic::importance::compute_importance(content, role);
193        let row: (MessageId,) = zeph_db::query_as(sql!(
194            "INSERT INTO messages \
195                 (conversation_id, role, content, parts, visibility, \
196                  importance_score, category) \
197                 VALUES (?, ?, ?, '[]', 'both', ?, ?) RETURNING id"
198        ))
199        .bind(conversation_id)
200        .bind(role)
201        .bind(content)
202        .bind(importance_score)
203        .bind(category)
204        .fetch_one(&self.pool)
205        .await?;
206        Ok(row.0)
207    }
208
209    /// Save a message with visibility metadata.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the insert fails.
214    pub async fn save_message_with_metadata(
215        &self,
216        conversation_id: ConversationId,
217        role: &str,
218        content: &str,
219        parts_json: &str,
220        visibility: MessageVisibility,
221    ) -> Result<MessageId, MemoryError> {
222        const MAX_BYTES: usize = 100 * 1024;
223
224        // Truncate plain-text content only. `parts_json` is skipped because a
225        // mid-byte cut produces invalid JSON that breaks downstream deserialization.
226        let content_cow: std::borrow::Cow<'_, str> = if content.len() > MAX_BYTES {
227            let boundary = content.floor_char_boundary(MAX_BYTES);
228            tracing::debug!(
229                original_bytes = content.len(),
230                "save_message: content exceeds 100KB, truncating"
231            );
232            std::borrow::Cow::Owned(format!(
233                "{}... [truncated, {} bytes total]",
234                &content[..boundary],
235                content.len()
236            ))
237        } else {
238            std::borrow::Cow::Borrowed(content)
239        };
240
241        let importance_score = crate::semantic::importance::compute_importance(&content_cow, role);
242        let row: (MessageId,) = zeph_db::query_as(
243            sql!("INSERT INTO messages (conversation_id, role, content, parts, visibility, importance_score) \
244             VALUES (?, ?, ?, ?, ?, ?) RETURNING id"),
245        )
246        .bind(conversation_id)
247        .bind(role)
248        .bind(content_cow.as_ref())
249        .bind(parts_json)
250        .bind(visibility.as_db_str())
251        .bind(importance_score)
252        .fetch_one(&self.pool)
253        .await?;
254        Ok(row.0)
255    }
256
257    /// Load the most recent messages for a conversation, up to `limit`.
258    ///
259    /// Restores persisted `fidelity_tag` into [`MessageMetadata::fidelity_tag`].
260    /// A stored value of `0` maps to `None` (never scored / Full by default) so
261    /// messages written before CAM was enabled do not acquire a spurious floor.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the query fails.
266    pub async fn load_history(
267        &self,
268        conversation_id: ConversationId,
269        limit: u32,
270    ) -> Result<Vec<Message>, MemoryError> {
271        let rows: Vec<(String, String, String, String, i64, i32)> = zeph_db::query_as(sql!(
272            "SELECT role, content, parts, visibility, id, fidelity_tag FROM (\
273                SELECT role, content, parts, visibility, id, fidelity_tag FROM messages \
274                WHERE conversation_id = ? AND deleted_at IS NULL \
275                ORDER BY id DESC \
276                LIMIT ?\
277             ) ORDER BY id ASC"
278        ))
279        .bind(conversation_id)
280        .bind(i64::from(limit))
281        .fetch_all(&self.pool)
282        .await?;
283
284        let messages = rows
285            .into_iter()
286            .map(
287                |(role_str, content, parts_json, visibility_str, row_id, fidelity_raw)| {
288                    let parts = parse_parts_json(&role_str, &parts_json);
289                    Message {
290                        role: parse_role(&role_str),
291                        content,
292                        parts,
293                        metadata: MessageMetadata {
294                            visibility: MessageVisibility::from_db_str(&visibility_str),
295                            compacted_at: None,
296                            deferred_summary: None,
297                            focus_pinned: false,
298                            focus_marker_id: None,
299                            db_id: Some(row_id),
300                            fidelity_tag: if fidelity_raw == 0 {
301                                None
302                            } else {
303                                u8::try_from(fidelity_raw)
304                                    .ok()
305                                    .map(ContextFidelity::from_u8)
306                            },
307                            embedding: None,
308                        },
309                    }
310                },
311            )
312            .collect();
313        Ok(messages)
314    }
315
316    /// Load messages filtered by visibility flags.
317    ///
318    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
319    ///
320    /// Restores persisted `fidelity_tag` into [`MessageMetadata::fidelity_tag`].
321    /// A stored value of `0` maps to `None` (never scored / Full by default) so
322    /// messages written before CAM was enabled do not acquire a spurious floor.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the query fails.
327    pub async fn load_history_filtered(
328        &self,
329        conversation_id: ConversationId,
330        limit: u32,
331        agent_visible: Option<bool>,
332        user_visible: Option<bool>,
333    ) -> Result<Vec<Message>, MemoryError> {
334        // Map boolean filters to SQL predicates on the visibility column.
335        // agent_visible=true  → exclude 'user_only'  rows
336        // user_visible=true   → exclude 'agent_only' rows
337        // The two filters are independent; when both are Some(true) the
338        // combined effect is to keep only 'both' rows.
339        let exclude_user_only = agent_visible == Some(true);
340        let exclude_agent_only = user_visible == Some(true);
341
342        let rows: Vec<(String, String, String, String, i64, i32)> = zeph_db::query_as(sql!(
343            "WITH recent AS (\
344                SELECT role, content, parts, visibility, id, fidelity_tag FROM messages \
345                WHERE conversation_id = ? \
346                  AND deleted_at IS NULL \
347                  AND (NOT ? OR visibility != 'user_only') \
348                  AND (NOT ? OR visibility != 'agent_only') \
349                ORDER BY id DESC \
350                LIMIT ?\
351             ) SELECT role, content, parts, visibility, id, fidelity_tag FROM recent ORDER BY id ASC"
352        ))
353        .bind(conversation_id)
354        .bind(exclude_user_only)
355        .bind(exclude_agent_only)
356        .bind(i64::from(limit))
357        .fetch_all(&self.pool)
358        .await?;
359
360        let messages = rows
361            .into_iter()
362            .map(
363                |(role_str, content, parts_json, visibility_str, row_id, fidelity_raw)| {
364                    let parts = parse_parts_json(&role_str, &parts_json);
365                    Message {
366                        role: parse_role(&role_str),
367                        content,
368                        parts,
369                        metadata: MessageMetadata {
370                            visibility: MessageVisibility::from_db_str(&visibility_str),
371                            compacted_at: None,
372                            deferred_summary: None,
373                            focus_pinned: false,
374                            focus_marker_id: None,
375                            db_id: Some(row_id),
376                            fidelity_tag: if fidelity_raw == 0 {
377                                None
378                            } else {
379                                u8::try_from(fidelity_raw)
380                                    .ok()
381                                    .map(ContextFidelity::from_u8)
382                            },
383                            embedding: None,
384                        },
385                    }
386                },
387            )
388            .collect();
389        Ok(messages)
390    }
391
392    /// Batch-update fidelity tags for messages by their database IDs.
393    ///
394    /// Called after fidelity scoring to persist the assigned fidelity levels so subsequent
395    /// turns see the persisted floor invariant.
396    ///
397    /// Updates are chunked at 333 rows per statement to stay under `SQLITE_MAX_VARIABLE_NUMBER=999`
398    /// (each row uses 3 bind slots). All chunks execute in a single transaction for atomicity.
399    /// The operation is a no-op when `updates` is empty.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the transaction or any UPDATE fails.
404    pub async fn update_fidelity_tags(
405        &self,
406        updates: &[(MessageId, u8)],
407    ) -> Result<(), MemoryError> {
408        // Each row occupies 3 SQLite bind slots (2 for CASE arm + 1 for IN list).
409        // SQLITE_MAX_VARIABLE_NUMBER = 999, so max rows per statement = floor(999/3) = 333.
410        const MAX_FIDELITY_BATCH: usize = 333;
411        if updates.is_empty() {
412            return Ok(());
413        }
414        let mut tx = self.pool.begin().await?;
415        for chunk in updates.chunks(MAX_FIDELITY_BATCH) {
416            let case_arms: String = chunk
417                .iter()
418                .map(|_| "WHEN ? THEN ?")
419                .collect::<Vec<_>>()
420                .join(" ");
421            let in_list: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
422            let sql = format!(
423                "UPDATE messages SET fidelity_tag = CASE id {case_arms} END WHERE id IN ({in_list})"
424            );
425            let mut q = zeph_db::query(&sql);
426            for &(id, tag) in chunk {
427                q = q.bind(id.0).bind(i32::from(tag));
428            }
429            for &(id, _) in chunk {
430                q = q.bind(id.0);
431            }
432            q.execute(&mut *tx).await?;
433        }
434        tx.commit().await?;
435        Ok(())
436    }
437
438    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
439    ///
440    /// Within a single transaction:
441    /// 1. Updates `visibility=user_only, compacted_at=now` for messages in `compacted_range`.
442    /// 2. Inserts `summary_content` with `visibility=agent_only`.
443    ///
444    /// Returns the `MessageId` of the inserted summary.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if the transaction fails.
449    pub async fn replace_conversation(
450        &self,
451        conversation_id: ConversationId,
452        compacted_range: std::ops::RangeInclusive<MessageId>,
453        summary_role: &str,
454        summary_content: &str,
455    ) -> Result<MessageId, MemoryError> {
456        let now = {
457            let secs = std::time::SystemTime::now()
458                .duration_since(std::time::UNIX_EPOCH)
459                .unwrap_or_default()
460                .as_secs();
461            format!("{secs}")
462        };
463        let start_id = compacted_range.start().0;
464        let end_id = compacted_range.end().0;
465
466        let mut tx = self.pool.begin().await?;
467
468        zeph_db::query(sql!(
469            "UPDATE messages SET visibility = 'user_only', compacted_at = ? \
470             WHERE conversation_id = ? AND id >= ? AND id <= ?"
471        ))
472        .bind(&now)
473        .bind(conversation_id)
474        .bind(start_id)
475        .bind(end_id)
476        .execute(&mut *tx)
477        .await?;
478
479        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
480        let row: (MessageId,) = zeph_db::query_as(sql!(
481            "INSERT INTO messages \
482             (conversation_id, role, content, parts, visibility) \
483             VALUES (?, ?, ?, '[]', 'agent_only') RETURNING id"
484        ))
485        .bind(conversation_id)
486        .bind(summary_role)
487        .bind(summary_content)
488        .fetch_one(&mut *tx)
489        .await?;
490
491        tx.commit().await?;
492
493        Ok(row.0)
494    }
495
496    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
497    ///
498    /// Within a single transaction:
499    /// 1. Sets `visibility=user_only, compacted_at=<now>` for each ID in `hide_ids`.
500    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if the transaction fails.
505    pub async fn apply_tool_pair_summaries(
506        &self,
507        conversation_id: ConversationId,
508        hide_ids: &[i64],
509        summaries: &[String],
510    ) -> Result<(), MemoryError> {
511        if hide_ids.is_empty() && summaries.is_empty() {
512            return Ok(());
513        }
514
515        let now = std::time::SystemTime::now()
516            .duration_since(std::time::UNIX_EPOCH)
517            .unwrap_or_default()
518            .as_secs()
519            .to_string();
520
521        let mut tx = self.pool.begin().await?;
522
523        for &id in hide_ids {
524            zeph_db::query(sql!(
525                "UPDATE messages SET visibility = 'user_only', compacted_at = ? WHERE id = ?"
526            ))
527            .bind(&now)
528            .bind(id)
529            .execute(&mut *tx)
530            .await?;
531        }
532
533        for summary in summaries {
534            let content = format!("[tool summary] {summary}");
535            let parts = serde_json::to_string(&[MessagePart::Summary {
536                text: summary.clone(),
537            }])
538            .unwrap_or_else(|_| "[]".to_string());
539            zeph_db::query(sql!(
540                "INSERT INTO messages \
541                 (conversation_id, role, content, parts, visibility) \
542                 VALUES (?, 'assistant', ?, ?, 'agent_only')"
543            ))
544            .bind(conversation_id)
545            .bind(&content)
546            .bind(&parts)
547            .execute(&mut *tx)
548            .await?;
549        }
550
551        tx.commit().await?;
552        Ok(())
553    }
554
555    /// Return the IDs of the N oldest messages in a conversation (ascending order).
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if the query fails.
560    pub async fn oldest_message_ids(
561        &self,
562        conversation_id: ConversationId,
563        n: u32,
564    ) -> Result<Vec<MessageId>, MemoryError> {
565        let rows: Vec<(MessageId,)> = zeph_db::query_as(
566            sql!("SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?"),
567        )
568        .bind(conversation_id)
569        .bind(i64::from(n))
570        .fetch_all(&self.pool)
571        .await?;
572        Ok(rows.into_iter().map(|r| r.0).collect())
573    }
574
575    /// Return the ID of the most recent conversation, if any.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if the query fails.
580    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
581        let row: Option<(ConversationId,)> = zeph_db::query_as(sql!(
582            "SELECT id FROM conversations ORDER BY id DESC LIMIT 1"
583        ))
584        .fetch_optional(&self.pool)
585        .await?;
586        Ok(row.map(|r| r.0))
587    }
588
589    /// Fetch a single message by its ID.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if the query fails.
594    pub async fn message_by_id(
595        &self,
596        message_id: MessageId,
597    ) -> Result<Option<Message>, MemoryError> {
598        let row: Option<(String, String, String, String)> = zeph_db::query_as(
599            sql!("SELECT role, content, parts, visibility FROM messages WHERE id = ? AND deleted_at IS NULL"),
600        )
601        .bind(message_id)
602        .fetch_optional(&self.pool)
603        .await?;
604
605        Ok(row.map(|(role_str, content, parts_json, visibility_str)| {
606            let parts = parse_parts_json(&role_str, &parts_json);
607            Message {
608                role: parse_role(&role_str),
609                content,
610                parts,
611                metadata: MessageMetadata {
612                    visibility: MessageVisibility::from_db_str(&visibility_str),
613                    compacted_at: None,
614                    deferred_summary: None,
615                    focus_pinned: false,
616                    focus_marker_id: None,
617                    db_id: None,
618                    fidelity_tag: None,
619                    embedding: None,
620                },
621            }
622        }))
623    }
624
625    /// Fetch messages by a list of IDs in a single query.
626    ///
627    /// # Errors
628    ///
629    /// Returns an error if the query fails.
630    pub async fn messages_by_ids(
631        &self,
632        ids: &[MessageId],
633    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
634        if ids.is_empty() {
635            return Ok(Vec::new());
636        }
637
638        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
639
640        let query = format!(
641            "SELECT id, role, content, parts FROM messages \
642             WHERE id IN ({placeholders}) AND visibility != 'user_only' AND deleted_at IS NULL"
643        );
644        let mut q = zeph_db::query_as::<_, (MessageId, String, String, String)>(&query);
645        for &id in ids {
646            q = q.bind(id);
647        }
648
649        let rows = q.fetch_all(&self.pool).await?;
650
651        Ok(rows
652            .into_iter()
653            .map(|(id, role_str, content, parts_json)| {
654                let parts = parse_parts_json(&role_str, &parts_json);
655                (
656                    id,
657                    Message {
658                        role: parse_role(&role_str),
659                        content,
660                        parts,
661                        metadata: MessageMetadata {
662                            db_id: Some(id.0),
663                            ..MessageMetadata::default()
664                        },
665                    },
666                )
667            })
668            .collect())
669    }
670
671    /// Return message IDs and content for messages without embeddings.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if the query fails.
676    pub async fn unembedded_message_ids(
677        &self,
678        limit: Option<usize>,
679    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
680        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
681
682        let rows: Vec<(MessageId, ConversationId, String, String)> = zeph_db::query_as(sql!(
683            "SELECT m.id, m.conversation_id, m.role, m.content \
684             FROM messages m \
685             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
686             WHERE em.id IS NULL AND m.deleted_at IS NULL \
687             ORDER BY m.id ASC \
688             LIMIT ?"
689        ))
690        .bind(effective_limit)
691        .fetch_all(&self.pool)
692        .await?;
693
694        Ok(rows)
695    }
696
697    /// Count messages that have no embedding yet.
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if the query fails.
702    pub async fn count_unembedded_messages(&self) -> Result<usize, MemoryError> {
703        let row: (i64,) = zeph_db::query_as(sql!(
704            "SELECT COUNT(*) FROM messages m \
705             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
706             WHERE em.id IS NULL AND m.deleted_at IS NULL"
707        ))
708        .fetch_one(&self.pool)
709        .await?;
710        Ok(usize::try_from(row.0).unwrap_or(usize::MAX))
711    }
712
713    /// Stream message IDs and content for messages without embeddings, one row at a time.
714    ///
715    /// Executes the same query as [`Self::unembedded_message_ids`] but returns a streaming
716    /// cursor instead of loading all rows into a `Vec`. The `SQLite` read transaction is held
717    /// for the duration of iteration; callers must not write to `embeddings_metadata` while
718    /// the stream is live (use a separate async task for writes).
719    ///
720    /// # Errors
721    ///
722    /// Yields a [`MemoryError`] if the query or row decoding fails.
723    pub fn stream_unembedded_messages(
724        &self,
725        limit: i64,
726    ) -> impl futures::Stream<Item = Result<(MessageId, ConversationId, String, String), MemoryError>> + '_
727    {
728        zeph_db::query_as(sql!(
729            "SELECT m.id, m.conversation_id, m.role, m.content \
730             FROM messages m \
731             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
732             WHERE em.id IS NULL AND m.deleted_at IS NULL \
733             ORDER BY m.id ASC \
734             LIMIT ?"
735        ))
736        .bind(limit)
737        .fetch(&self.pool)
738        .map_err(MemoryError::from)
739    }
740
741    /// Count the number of messages in a conversation.
742    ///
743    /// # Errors
744    ///
745    /// Returns an error if the query fails.
746    pub async fn count_messages(
747        &self,
748        conversation_id: ConversationId,
749    ) -> Result<i64, MemoryError> {
750        let row: (i64,) = zeph_db::query_as(sql!(
751            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL"
752        ))
753        .bind(conversation_id)
754        .fetch_one(&self.pool)
755        .await?;
756        Ok(row.0)
757    }
758
759    /// Count messages in a conversation with id greater than `after_id`.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the query fails.
764    pub async fn count_messages_after(
765        &self,
766        conversation_id: ConversationId,
767        after_id: MessageId,
768    ) -> Result<i64, MemoryError> {
769        let row: (i64,) =
770            zeph_db::query_as(
771                sql!("SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL"),
772            )
773            .bind(conversation_id)
774            .bind(after_id)
775            .fetch_one(&self.pool)
776            .await?;
777        Ok(row.0)
778    }
779
780    /// Full-text keyword search over messages using FTS5.
781    ///
782    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
783    /// negated to positive for consistency with vector scores).
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if the query fails.
788    pub async fn keyword_search(
789        &self,
790        query: &str,
791        limit: usize,
792        conversation_id: Option<ConversationId>,
793    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
794        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
795        let safe_query = sanitize_fts_query(query);
796        if safe_query.is_empty() {
797            return Ok(Vec::new());
798        }
799
800        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
801            zeph_db::query_as(
802                sql!("SELECT m.id, -rank AS score \
803                 FROM messages_fts f \
804                 JOIN messages m ON m.id = f.rowid \
805                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
806                 ORDER BY rank \
807                 LIMIT ?"),
808            )
809            .bind(&safe_query)
810            .bind(cid)
811            .bind(effective_limit)
812            .fetch_all(&self.pool)
813            .await?
814        } else {
815            zeph_db::query_as(sql!(
816                "SELECT m.id, -rank AS score \
817                 FROM messages_fts f \
818                 JOIN messages m ON m.id = f.rowid \
819                 WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL \
820                 ORDER BY rank \
821                 LIMIT ?"
822            ))
823            .bind(&safe_query)
824            .bind(effective_limit)
825            .fetch_all(&self.pool)
826            .await?
827        };
828
829        Ok(rows)
830    }
831
832    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
833    ///
834    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
835    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
836    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
837    ///
838    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
839    /// `None` means "no bound" on that side.
840    ///
841    /// # Errors
842    ///
843    /// Returns an error if the query fails.
844    pub async fn keyword_search_with_time_range(
845        &self,
846        query: &str,
847        limit: usize,
848        conversation_id: Option<ConversationId>,
849        after: Option<&str>,
850        before: Option<&str>,
851    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
852        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
853        let safe_query = sanitize_fts_query(query);
854        if safe_query.is_empty() {
855            return Ok(Vec::new());
856        }
857
858        // Build time-range clauses dynamically. Both bounds are optional.
859        let after_clause = if after.is_some() {
860            " AND m.created_at > ?"
861        } else {
862            ""
863        };
864        let before_clause = if before.is_some() {
865            " AND m.created_at < ?"
866        } else {
867            ""
868        };
869        let conv_clause = if conversation_id.is_some() {
870            " AND m.conversation_id = ?"
871        } else {
872            ""
873        };
874
875        let sql = format!(
876            "SELECT m.id, -rank AS score \
877             FROM messages_fts f \
878             JOIN messages m ON m.id = f.rowid \
879             WHERE messages_fts MATCH ? AND m.visibility != 'user_only' AND m.deleted_at IS NULL\
880             {after_clause}{before_clause}{conv_clause} \
881             ORDER BY rank \
882             LIMIT ?"
883        );
884
885        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
886        if let Some(a) = after {
887            q = q.bind(a);
888        }
889        if let Some(b) = before {
890            q = q.bind(b);
891        }
892        if let Some(cid) = conversation_id {
893            q = q.bind(cid);
894        }
895        q = q.bind(effective_limit);
896
897        Ok(q.fetch_all(&self.pool).await?)
898    }
899
900    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
901    ///
902    /// Messages without a `created_at` column fall back to 0.
903    ///
904    /// # Errors
905    ///
906    /// Returns an error if the query fails.
907    #[tracing::instrument(name = "memory.store.message_timestamps", skip(self, ids), fields(count = ids.len()))]
908    pub async fn message_timestamps(
909        &self,
910        ids: &[MessageId],
911    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
912        if ids.is_empty() {
913            return Ok(std::collections::HashMap::new());
914        }
915
916        let placeholders: String =
917            zeph_db::rewrite_placeholders(&ids.iter().map(|_| "?").collect::<Vec<_>>().join(","));
918        let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
919        let query = format!(
920            "SELECT id, {epoch_expr} FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
921        );
922        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
923        for &id in ids {
924            q = q.bind(id);
925        }
926
927        let rows = q.fetch_all(&self.pool).await?;
928        Ok(rows.into_iter().collect())
929    }
930
931    /// Load a range of messages after a given message ID.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the query fails.
936    pub async fn load_messages_range(
937        &self,
938        conversation_id: ConversationId,
939        after_message_id: MessageId,
940        limit: usize,
941    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
942        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
943
944        let rows: Vec<(MessageId, String, String)> = zeph_db::query_as(sql!(
945            "SELECT id, role, content FROM messages \
946             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
947             ORDER BY id ASC LIMIT ?"
948        ))
949        .bind(conversation_id)
950        .bind(after_message_id)
951        .bind(effective_limit)
952        .fetch_all(&self.pool)
953        .await?;
954
955        Ok(rows)
956    }
957
958    // ── Eviction helpers ──────────────────────────────────────────────────────
959
960    /// Return all non-deleted message IDs with their eviction metadata.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the query fails.
965    pub async fn get_eviction_candidates(
966        &self,
967    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
968        let rows: Vec<(MessageId, String, Option<String>, i64)> = zeph_db::query_as(sql!(
969            "SELECT id, created_at, last_accessed, access_count \
970             FROM messages WHERE deleted_at IS NULL"
971        ))
972        .fetch_all(&self.pool)
973        .await?;
974
975        Ok(rows
976            .into_iter()
977            .map(
978                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
979                    id,
980                    created_at,
981                    last_accessed,
982                    access_count: access_count.try_into().unwrap_or(0),
983                },
984            )
985            .collect())
986    }
987
988    /// Soft-delete a set of messages by marking `deleted_at`.
989    ///
990    /// Soft-deleted messages are excluded from all history queries.
991    ///
992    /// # Errors
993    ///
994    /// Returns an error if the update fails.
995    pub async fn soft_delete_messages(
996        &self,
997        ids: &[MessageId],
998    ) -> Result<(), crate::error::MemoryError> {
999        if ids.is_empty() {
1000            return Ok(());
1001        }
1002        // SQLite does not support array binding natively. Batch via individual updates.
1003        for &id in ids {
1004            zeph_db::query(
1005                sql!("UPDATE messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = ? AND deleted_at IS NULL"),
1006            )
1007            .bind(id)
1008            .execute(&self.pool)
1009            .await?;
1010        }
1011        Ok(())
1012    }
1013
1014    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
1015    ///
1016    /// # Errors
1017    ///
1018    /// Returns an error if the query fails.
1019    pub async fn get_soft_deleted_message_ids(
1020        &self,
1021    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
1022        let rows: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1023            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0"
1024        ))
1025        .fetch_all(&self.pool)
1026        .await?;
1027        Ok(rows.into_iter().map(|(id,)| id).collect())
1028    }
1029
1030    /// Filter `candidate_ids` to retain only IDs that are NOT covered by any
1031    /// summary's `[first_message_id, last_message_id]` range (MM-F4, #3341).
1032    ///
1033    /// Returns the **safe-to-delete** subset: candidate IDs that do not fall inside
1034    /// any summary range. This enforces the data-integrity invariant that raw episodes
1035    /// referenced by a summary can never be soft-deleted by the eviction sweep.
1036    ///
1037    /// Batched at `MAX_BATCH = 490` placeholders per chunk to stay under the `SQLite`
1038    /// 999-parameter limit. The `idx_summaries_message_range` partial index (migration 077)
1039    /// makes the inner `NOT EXISTS` range probe efficient.
1040    ///
1041    /// # Errors
1042    ///
1043    /// Returns [`MemoryError`] if the underlying query fails.
1044    pub async fn filter_out_preserved_episode_ids(
1045        &self,
1046        candidate_ids: &[MessageId],
1047    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
1048        const MAX_BATCH: usize = 490;
1049        if candidate_ids.is_empty() {
1050            return Ok(Vec::new());
1051        }
1052        let mut safe_to_delete: Vec<MessageId> = Vec::with_capacity(candidate_ids.len());
1053        for chunk in candidate_ids.chunks(MAX_BATCH) {
1054            let placeholders = placeholder_list(1, chunk.len());
1055            let sql = format!(
1056                "SELECT m.id \
1057                   FROM messages m \
1058                  WHERE m.id IN ({placeholders}) \
1059                    AND NOT EXISTS ( \
1060                          SELECT 1 \
1061                            FROM summaries s \
1062                           WHERE s.first_message_id IS NOT NULL \
1063                             AND s.last_message_id IS NOT NULL \
1064                             AND m.id >= s.first_message_id \
1065                             AND m.id <= s.last_message_id \
1066                        )"
1067            );
1068            let mut q = zeph_db::query_as::<_, (MessageId,)>(&sql);
1069            for &id in chunk {
1070                q = q.bind(id);
1071            }
1072            let rows: Vec<(MessageId,)> = q.fetch_all(&self.pool).await?;
1073            safe_to_delete.extend(rows.into_iter().map(|(id,)| id));
1074        }
1075        Ok(safe_to_delete)
1076    }
1077
1078    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
1079    ///
1080    /// # Errors
1081    ///
1082    /// Returns an error if the update fails.
1083    pub async fn mark_qdrant_cleaned(
1084        &self,
1085        ids: &[MessageId],
1086    ) -> Result<(), crate::error::MemoryError> {
1087        for &id in ids {
1088            zeph_db::query(sql!("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?"))
1089                .bind(id)
1090                .execute(&self.pool)
1091                .await?;
1092        }
1093        Ok(())
1094    }
1095
1096    /// Fetch `importance_score` values for the given message IDs.
1097    ///
1098    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
1099    ///
1100    /// # Errors
1101    ///
1102    /// Returns an error if the query fails.
1103    pub async fn fetch_importance_scores(
1104        &self,
1105        ids: &[MessageId],
1106    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
1107        if ids.is_empty() {
1108            return Ok(std::collections::HashMap::new());
1109        }
1110        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1111        let query = format!(
1112            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1113        );
1114        let mut q = zeph_db::query_as::<_, (MessageId, f64)>(&query);
1115        for &id in ids {
1116            q = q.bind(id);
1117        }
1118        let rows = q.fetch_all(&self.pool).await?;
1119        Ok(rows.into_iter().collect())
1120    }
1121
1122    /// Increment `access_count` and set `last_accessed = CURRENT_TIMESTAMP` for the given IDs.
1123    ///
1124    /// Skips the update when `ids` is empty.
1125    ///
1126    /// # Errors
1127    ///
1128    /// Returns an error if the update fails.
1129    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1130        if ids.is_empty() {
1131            return Ok(());
1132        }
1133        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1134        let query = format!(
1135            "UPDATE messages SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP \
1136             WHERE id IN ({placeholders})"
1137        );
1138        let mut q = zeph_db::query(&query);
1139        for &id in ids {
1140            q = q.bind(id);
1141        }
1142        q.execute(&self.pool).await?;
1143        Ok(())
1144    }
1145
1146    /// Fetch `access_count` for the given message IDs.
1147    ///
1148    /// Messages not found or already deleted are omitted from the result.
1149    ///
1150    /// Used by the MEMTIER cognitive signal to approximate message importance
1151    /// based on access frequency.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if the query fails.
1156    #[tracing::instrument(name = "memory.store.message_access_counts", skip(self, ids), fields(count = ids.len()))]
1157    pub async fn message_access_counts(
1158        &self,
1159        ids: &[MessageId],
1160    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
1161        if ids.is_empty() {
1162            return Ok(std::collections::HashMap::new());
1163        }
1164        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1165        let query = format!(
1166            "SELECT id, access_count FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1167        );
1168        let mut q = zeph_db::query_as::<_, (MessageId, i64)>(&query);
1169        for &id in ids {
1170            q = q.bind(id);
1171        }
1172        let rows = q.fetch_all(&self.pool).await?;
1173        Ok(rows.into_iter().collect())
1174    }
1175
1176    // ── Tier promotion helpers ─────────────────────────────────────────────────
1177
1178    /// Return episodic messages with `session_count >= min_sessions`, ordered by
1179    /// session count descending then importance score descending.
1180    ///
1181    /// # Errors
1182    ///
1183    /// Returns an error if the query fails.
1184    pub async fn find_promotion_candidates(
1185        &self,
1186        min_sessions: u32,
1187        batch_size: usize,
1188    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
1189        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
1190        let min = i64::from(min_sessions);
1191        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = zeph_db::query_as(sql!(
1192            "SELECT id, conversation_id, content, session_count, importance_score \
1193             FROM messages \
1194             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
1195             ORDER BY session_count DESC, importance_score DESC \
1196             LIMIT ?"
1197        ))
1198        .bind(min)
1199        .bind(limit)
1200        .fetch_all(&self.pool)
1201        .await?;
1202
1203        Ok(rows
1204            .into_iter()
1205            .map(
1206                |(id, conversation_id, content, session_count, importance_score)| {
1207                    PromotionCandidate {
1208                        id,
1209                        conversation_id,
1210                        content,
1211                        session_count: session_count.try_into().unwrap_or(0),
1212                        importance_score,
1213                    }
1214                },
1215            )
1216            .collect())
1217    }
1218
1219    /// Count messages per tier (episodic, semantic) that are not deleted.
1220    ///
1221    /// Returns `(episodic_count, semantic_count)`.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns an error if the query fails.
1226    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
1227        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1228            "SELECT tier, COUNT(*) FROM messages \
1229             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
1230             GROUP BY tier"
1231        ))
1232        .fetch_all(&self.pool)
1233        .await?;
1234
1235        let mut episodic = 0i64;
1236        let mut semantic = 0i64;
1237        for (tier, count) in rows {
1238            match tier.as_str() {
1239                "episodic" => episodic = count,
1240                "semantic" => semantic = count,
1241                _ => {}
1242            }
1243        }
1244        Ok((episodic, semantic))
1245    }
1246
1247    /// Count semantic facts (tier='semantic', not deleted).
1248    ///
1249    /// # Errors
1250    ///
1251    /// Returns an error if the query fails.
1252    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
1253        let row: (i64,) = zeph_db::query_as(sql!(
1254            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL"
1255        ))
1256        .fetch_one(&self.pool)
1257        .await?;
1258        Ok(row.0)
1259    }
1260
1261    /// Promote a set of episodic messages to semantic tier in a single transaction.
1262    ///
1263    /// Within one transaction:
1264    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
1265    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
1266    ///    so the eviction sweep picks up their Qdrant vectors.
1267    ///
1268    /// Returns the `MessageId` of the new semantic message.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns an error if the transaction fails.
1273    pub async fn promote_to_semantic(
1274        &self,
1275        conversation_id: ConversationId,
1276        merged_content: &str,
1277        original_ids: &[MessageId],
1278    ) -> Result<MessageId, MemoryError> {
1279        if original_ids.is_empty() {
1280            return Err(MemoryError::InvalidInput(
1281                "promote_to_semantic: original_ids must not be empty".into(),
1282            ));
1283        }
1284
1285        // Acquire the write lock immediately (BEGIN IMMEDIATE) to avoid the DEFERRED read->write
1286        // upgrade race that causes SQLITE_BUSY when a concurrent writer holds the lock.
1287        let mut tx = begin_write(&self.pool).await?;
1288
1289        // Insert the new semantic fact.
1290        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1291        let promote_insert_raw = format!(
1292            "INSERT INTO messages \
1293             (conversation_id, role, content, parts, visibility, \
1294              tier, promotion_timestamp) \
1295             VALUES (?, 'assistant', ?, '[]', 'agent_only', 'semantic', {epoch_now}) \
1296             RETURNING id"
1297        );
1298        let promote_insert_sql = zeph_db::rewrite_placeholders(&promote_insert_raw);
1299        let row: (MessageId,) = zeph_db::query_as(&promote_insert_sql)
1300            .bind(conversation_id)
1301            .bind(merged_content)
1302            .fetch_one(&mut *tx)
1303            .await?;
1304
1305        let new_id = row.0;
1306
1307        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
1308        for &id in original_ids {
1309            zeph_db::query(sql!(
1310                "UPDATE messages \
1311                 SET deleted_at = CURRENT_TIMESTAMP, qdrant_cleaned = 0 \
1312                 WHERE id = ? AND deleted_at IS NULL"
1313            ))
1314            .bind(id)
1315            .execute(&mut *tx)
1316            .await?;
1317        }
1318
1319        tx.commit().await?;
1320        Ok(new_id)
1321    }
1322
1323    /// Manually promote a set of messages to semantic tier without merging.
1324    ///
1325    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1326    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1327    ///
1328    /// # Errors
1329    ///
1330    /// Returns an error if the update fails.
1331    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1332        if ids.is_empty() {
1333            return Ok(0);
1334        }
1335        let epoch_now = <zeph_db::ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1336        let manual_promote_raw = format!(
1337            "UPDATE messages \
1338             SET tier = 'semantic', promotion_timestamp = {epoch_now} \
1339             WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'"
1340        );
1341        let manual_promote_sql = zeph_db::rewrite_placeholders(&manual_promote_raw);
1342        let mut count = 0usize;
1343        for &id in ids {
1344            let result = zeph_db::query(&manual_promote_sql)
1345                .bind(id)
1346                .execute(&self.pool)
1347                .await?;
1348            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1349        }
1350        Ok(count)
1351    }
1352
1353    /// Increment `session_count` for all episodic messages in a conversation.
1354    ///
1355    /// Called when a session restores an existing conversation to mark that messages
1356    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1357    ///
1358    /// # Errors
1359    ///
1360    /// Returns an error if the database update fails.
1361    pub async fn increment_session_counts_for_conversation(
1362        &self,
1363        conversation_id: ConversationId,
1364    ) -> Result<(), MemoryError> {
1365        zeph_db::query(sql!(
1366            "UPDATE messages SET session_count = session_count + 1 \
1367             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL"
1368        ))
1369        .bind(conversation_id)
1370        .execute(&self.pool)
1371        .await?;
1372        Ok(())
1373    }
1374
1375    /// Fetch the tier string for each of the given message IDs.
1376    ///
1377    /// Messages not found or already deleted are omitted from the result.
1378    ///
1379    /// # Errors
1380    ///
1381    /// Returns an error if the query fails.
1382    #[tracing::instrument(name = "memory.store.fetch_tiers", skip(self, ids), fields(count = ids.len()))]
1383    pub async fn fetch_tiers(
1384        &self,
1385        ids: &[MessageId],
1386    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1387        if ids.is_empty() {
1388            return Ok(std::collections::HashMap::new());
1389        }
1390        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1391        let query = format!(
1392            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1393        );
1394        let mut q = zeph_db::query_as::<_, (MessageId, String)>(&query);
1395        for &id in ids {
1396            q = q.bind(id);
1397        }
1398        let rows = q.fetch_all(&self.pool).await?;
1399        Ok(rows.into_iter().collect())
1400    }
1401
1402    /// Return all conversation IDs that have at least one non-consolidated original message.
1403    ///
1404    /// Used by the consolidation sweep to find conversations that need processing.
1405    ///
1406    /// # Errors
1407    ///
1408    /// Returns an error if the database query fails.
1409    pub async fn conversations_with_unconsolidated_messages(
1410        &self,
1411    ) -> Result<Vec<ConversationId>, MemoryError> {
1412        let rows: Vec<(ConversationId,)> = zeph_db::query_as(sql!(
1413            "SELECT DISTINCT conversation_id FROM messages \
1414             WHERE consolidated = 0 AND deleted_at IS NULL"
1415        ))
1416        .fetch_all(&self.pool)
1417        .await?;
1418        Ok(rows.into_iter().map(|(id,)| id).collect())
1419    }
1420
1421    /// Fetch a batch of non-consolidated original messages for the consolidation sweep.
1422    ///
1423    /// Returns `(id, content)` pairs for messages that have not yet been processed by the
1424    /// consolidation loop (`consolidated = 0`) and are not soft-deleted.
1425    ///
1426    /// # Errors
1427    ///
1428    /// Returns an error if the database query fails.
1429    pub async fn find_unconsolidated_messages(
1430        &self,
1431        conversation_id: ConversationId,
1432        limit: usize,
1433    ) -> Result<Vec<(MessageId, String)>, MemoryError> {
1434        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
1435        let rows: Vec<(MessageId, String)> = zeph_db::query_as(sql!(
1436            "SELECT id, content FROM messages \
1437             WHERE conversation_id = ? \
1438               AND consolidated = 0 \
1439               AND deleted_at IS NULL \
1440             ORDER BY id ASC \
1441             LIMIT ?"
1442        ))
1443        .bind(conversation_id)
1444        .bind(limit)
1445        .fetch_all(&self.pool)
1446        .await?;
1447        Ok(rows)
1448    }
1449
1450    /// Look up which consolidated entry (if any) covers the given original message.
1451    ///
1452    /// Returns the `consolidated_id` of the first consolidation product that lists `source_id`
1453    /// in its sources, or `None` if no consolidated entry covers it.
1454    ///
1455    /// # Errors
1456    ///
1457    /// Returns an error if the database query fails.
1458    pub async fn find_consolidated_for_source(
1459        &self,
1460        source_id: MessageId,
1461    ) -> Result<Option<MessageId>, MemoryError> {
1462        let row: Option<(MessageId,)> = zeph_db::query_as(sql!(
1463            "SELECT consolidated_id FROM memory_consolidation_sources \
1464             WHERE source_id = ? \
1465             LIMIT 1"
1466        ))
1467        .bind(source_id)
1468        .fetch_optional(&self.pool)
1469        .await?;
1470        Ok(row.map(|(id,)| id))
1471    }
1472
1473    /// Insert a consolidated message and record its source linkage in a single transaction.
1474    ///
1475    /// Atomically:
1476    /// 1. Inserts the consolidated message with `consolidated = 1` and the given confidence.
1477    /// 2. Inserts rows into `memory_consolidation_sources` for each source ID.
1478    /// 3. Marks each source message's `consolidated = 1` so future sweeps skip them.
1479    ///
1480    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1481    ///
1482    /// # Errors
1483    ///
1484    /// Returns an error if any database operation fails. The transaction is rolled back automatically
1485    /// on failure so no partial state is written.
1486    pub async fn apply_consolidation_merge(
1487        &self,
1488        conversation_id: ConversationId,
1489        role: &str,
1490        merged_content: &str,
1491        source_ids: &[MessageId],
1492        confidence: f32,
1493        confidence_threshold: f32,
1494    ) -> Result<bool, MemoryError> {
1495        if confidence < confidence_threshold {
1496            return Ok(false);
1497        }
1498        if source_ids.is_empty() {
1499            return Ok(false);
1500        }
1501
1502        let mut tx = self.pool.begin().await?;
1503
1504        let importance = crate::semantic::importance::compute_importance(merged_content, role);
1505        let row: (MessageId,) = zeph_db::query_as(sql!(
1506            "INSERT INTO messages \
1507               (conversation_id, role, content, parts, visibility, \
1508                importance_score, consolidated, consolidation_confidence) \
1509             VALUES (?, ?, ?, '[]', 'both', ?, 1, ?) \
1510             RETURNING id"
1511        ))
1512        .bind(conversation_id)
1513        .bind(role)
1514        .bind(merged_content)
1515        .bind(importance)
1516        .bind(confidence)
1517        .fetch_one(&mut *tx)
1518        .await?;
1519        let consolidated_id = row.0;
1520
1521        let consol_sql = format!(
1522            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1523            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1524            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1525        );
1526        for &source_id in source_ids {
1527            zeph_db::query(&consol_sql)
1528                .bind(consolidated_id)
1529                .bind(source_id)
1530                .execute(&mut *tx)
1531                .await?;
1532
1533            // Mark original as consolidated so future sweeps skip it.
1534            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1535                .bind(source_id)
1536                .execute(&mut *tx)
1537                .await?;
1538        }
1539
1540        tx.commit().await?;
1541        Ok(true)
1542    }
1543
1544    /// Update an existing consolidated message in-place with new content.
1545    ///
1546    /// Atomically:
1547    /// 1. Updates `content` and `consolidation_confidence` on `target_id`.
1548    /// 2. Inserts rows into `memory_consolidation_sources` linking `target_id` → each source.
1549    /// 3. Marks each source message's `consolidated = 1`.
1550    ///
1551    /// If `confidence < confidence_threshold` the operation is skipped and `false` is returned.
1552    ///
1553    /// # Errors
1554    ///
1555    /// Returns an error if any database operation fails.
1556    pub async fn apply_consolidation_update(
1557        &self,
1558        target_id: MessageId,
1559        new_content: &str,
1560        additional_source_ids: &[MessageId],
1561        confidence: f32,
1562        confidence_threshold: f32,
1563    ) -> Result<bool, MemoryError> {
1564        if confidence < confidence_threshold {
1565            return Ok(false);
1566        }
1567
1568        let mut tx = self.pool.begin().await?;
1569
1570        zeph_db::query(sql!(
1571            "UPDATE messages SET content = ?, consolidation_confidence = ?, consolidated = 1 WHERE id = ?"
1572        ))
1573        .bind(new_content)
1574        .bind(confidence)
1575        .bind(target_id)
1576        .execute(&mut *tx)
1577        .await?;
1578
1579        let consol_sql = format!(
1580            "{} INTO memory_consolidation_sources (consolidated_id, source_id) VALUES (?, ?){}",
1581            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
1582            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
1583        );
1584        for &source_id in additional_source_ids {
1585            zeph_db::query(&consol_sql)
1586                .bind(target_id)
1587                .bind(source_id)
1588                .execute(&mut *tx)
1589                .await?;
1590
1591            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1592                .bind(source_id)
1593                .execute(&mut *tx)
1594                .await?;
1595        }
1596
1597        tx.commit().await?;
1598        Ok(true)
1599    }
1600
1601    // ── Forgetting sweep helpers ───────────────────────────────────────────────
1602
1603    /// Set `importance_score` for a single message by ID.
1604    ///
1605    /// Used in tests and by forgetting sweep helpers.
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns an error if the update fails.
1610    pub async fn set_importance_score(&self, id: MessageId, score: f64) -> Result<(), MemoryError> {
1611        zeph_db::query(sql!(
1612            "UPDATE messages SET importance_score = ? WHERE id = ? AND deleted_at IS NULL"
1613        ))
1614        .bind(score)
1615        .bind(id)
1616        .execute(&self.pool)
1617        .await?;
1618        Ok(())
1619    }
1620
1621    /// Get `importance_score` for a single message by ID.
1622    ///
1623    /// Returns `None` if the message does not exist or is deleted.
1624    ///
1625    /// # Errors
1626    ///
1627    /// Returns an error if the query fails.
1628    pub async fn get_importance_score(&self, id: MessageId) -> Result<Option<f64>, MemoryError> {
1629        let row: Option<(f64,)> = zeph_db::query_as(sql!(
1630            "SELECT importance_score FROM messages WHERE id = ? AND deleted_at IS NULL"
1631        ))
1632        .bind(id)
1633        .fetch_optional(&self.pool)
1634        .await?;
1635        Ok(row.map(|(s,)| s))
1636    }
1637
1638    /// Increment `access_count` and update `last_accessed` for a batch of messages.
1639    ///
1640    /// Alias used in forgetting tests; forwards to `increment_access_counts`.
1641    ///
1642    /// # Errors
1643    ///
1644    /// Returns an error if the update fails.
1645    pub async fn batch_increment_access_count(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
1646        self.increment_access_counts(ids).await
1647    }
1648
1649    /// Mark a set of messages as consolidated (`consolidated = 1`).
1650    ///
1651    /// Used in tests to simulate the state after consolidation.
1652    ///
1653    /// # Errors
1654    ///
1655    /// Returns an error if the update fails.
1656    pub async fn mark_messages_consolidated(&self, ids: &[i64]) -> Result<(), MemoryError> {
1657        for &id in ids {
1658            zeph_db::query(sql!("UPDATE messages SET consolidated = 1 WHERE id = ?"))
1659                .bind(id)
1660                .execute(&self.pool)
1661                .await?;
1662        }
1663        Ok(())
1664    }
1665
1666    /// Execute the three-phase forgetting sweep (`SleepGate`) inside a single transaction.
1667    ///
1668    /// **Phase 1** — Downscale all active non-consolidated importance scores by `decay_rate`.
1669    /// **Phase 2** — Undo the decay for messages accessed within `replay_window_hours` or
1670    ///   with `access_count >= replay_min_access_count` (undo current sweep's decay only).
1671    /// **Phase 3** — Soft-delete messages below `forgetting_floor` that are not protected
1672    ///   by recent access (`protect_recent_hours`) or high access count
1673    ///   (`protect_min_access_count`). Uses `batch_size` as a row-count cap.
1674    ///
1675    /// All phases commit atomically: concurrent readers see either the pre-sweep or
1676    /// post-sweep state, never an intermediate.
1677    ///
1678    /// # Errors
1679    ///
1680    /// Returns an error if any database operation fails.
1681    pub async fn run_forgetting_sweep_tx(
1682        &self,
1683        config: &zeph_common::config::memory::ForgettingConfig,
1684    ) -> Result<crate::forgetting::ForgettingResult, MemoryError> {
1685        let mut tx = self.pool.begin().await?;
1686
1687        let decay = f64::from(config.decay_rate);
1688        let floor = f64::from(config.forgetting_floor);
1689        let batch = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);
1690        let replay_hours = i64::from(config.replay_window_hours);
1691        let replay_min_access = i64::from(config.replay_min_access_count);
1692        let protect_hours = i64::from(config.protect_recent_hours);
1693        let protect_min_access = i64::from(config.protect_min_access_count);
1694
1695        // Phase 1: downscale all active, non-consolidated messages (limited to batch_size).
1696        // We target a specific set of IDs to respect sweep_batch_size.
1697        let candidate_ids: Vec<(MessageId,)> = zeph_db::query_as(sql!(
1698            "SELECT id FROM messages \
1699             WHERE deleted_at IS NULL AND consolidated = 0 \
1700             ORDER BY importance_score ASC \
1701             LIMIT ?"
1702        ))
1703        .bind(batch)
1704        .fetch_all(&mut *tx)
1705        .await?;
1706
1707        #[allow(clippy::cast_possible_truncation)]
1708        let downscaled = candidate_ids.len() as u32;
1709
1710        if downscaled > 0 {
1711            let placeholders: String = candidate_ids
1712                .iter()
1713                .map(|_| "?")
1714                .collect::<Vec<_>>()
1715                .join(",");
1716            let downscale_sql = format!(
1717                "UPDATE messages SET importance_score = importance_score * (1.0 - {decay}) \
1718                 WHERE id IN ({placeholders})"
1719            );
1720            let mut q = zeph_db::query(&downscale_sql);
1721            for &(id,) in &candidate_ids {
1722                q = q.bind(id);
1723            }
1724            q.execute(&mut *tx).await?;
1725        }
1726
1727        // Phase 2: selective replay — undo decay for recently-accessed messages.
1728        // Formula: score / (1 - decay_rate) restores the current sweep's downscaling.
1729        // Cap at 1.0 to avoid exceeding the maximum importance score.
1730        // Scoped to the Phase 1 batch only: messages not decayed this sweep must not
1731        // have their scores inflated by the inverse formula.
1732        let replayed = if downscaled > 0 {
1733            let replay_placeholders: String = candidate_ids
1734                .iter()
1735                .map(|_| "?")
1736                .collect::<Vec<_>>()
1737                .join(",");
1738            let replay_sql = format!(
1739                "UPDATE messages \
1740                 SET importance_score = MIN(1.0, importance_score / (1.0 - {decay})) \
1741                 WHERE id IN ({replay_placeholders}) \
1742                 AND (\
1743                     (last_accessed IS NOT NULL \
1744                      AND last_accessed >= datetime('now', '-' || ? || ' hours')) \
1745                     OR access_count >= ?\
1746                 )"
1747            );
1748            let mut rq = zeph_db::query(&replay_sql);
1749            for &(id,) in &candidate_ids {
1750                rq = rq.bind(id);
1751            }
1752            let replay_result = rq
1753                .bind(replay_hours)
1754                .bind(replay_min_access)
1755                .execute(&mut *tx)
1756                .await?;
1757            #[allow(clippy::cast_possible_truncation)]
1758            let n = replay_result.rows_affected() as u32;
1759            n
1760        } else {
1761            0
1762        };
1763
1764        // Phase 3: targeted forgetting — soft-delete low-score unprotected messages.
1765        let prune_sql = format!(
1766            "UPDATE messages \
1767             SET deleted_at = CURRENT_TIMESTAMP \
1768             WHERE deleted_at IS NULL AND consolidated = 0 \
1769             AND importance_score < {floor} \
1770             AND (\
1771                 last_accessed IS NULL \
1772                 OR last_accessed < datetime('now', '-' || ? || ' hours')\
1773             ) \
1774             AND access_count < ?"
1775        );
1776        let prune_result = zeph_db::query(&prune_sql)
1777            .bind(protect_hours)
1778            .bind(protect_min_access)
1779            .execute(&mut *tx)
1780            .await?;
1781        #[allow(clippy::cast_possible_truncation)]
1782        let pruned = prune_result.rows_affected() as u32;
1783
1784        tx.commit().await?;
1785
1786        Ok(crate::forgetting::ForgettingResult {
1787            downscaled,
1788            replayed,
1789            pruned,
1790        })
1791    }
1792}
1793
/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
#[derive(Debug, Clone)]
pub struct PromotionCandidate {
    /// ID of the candidate message.
    pub id: MessageId,
    /// Conversation the candidate message belongs to.
    pub conversation_id: ConversationId,
    /// Message content.
    pub content: String,
    /// Session count reported by `find_promotion_candidates`.
    /// NOTE(review): presumably the number of distinct sessions in which this content
    /// appeared; confirm against the query that populates it.
    pub session_count: u32,
    /// Importance score of the message.
    /// NOTE(review): presumably read from the `importance_score` column; confirm.
    pub importance_score: f64,
}
1803
1804#[cfg(test)]
1805mod tests;