Skip to main content

zeph_memory/semantic/
cross_session.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_llm::provider::LlmProvider as _;
5
6use crate::error::MemoryError;
7use crate::types::ConversationId;
8use crate::vector_store::{FieldCondition, FieldValue, VectorFilter};
9
10use super::{SESSION_SUMMARIES_COLLECTION, SemanticMemory};
11
/// A single hit produced by `SemanticMemory::search_session_summaries`.
#[derive(Debug, Clone)]
pub struct SessionSummaryResult {
    /// Summary text, read from the `summary_text` payload field of the matched point.
    pub summary_text: String,
    /// Score reported by the vector store for the matched point.
    pub score: f32,
    /// Conversation the summary belongs to, read from the `conversation_id` payload field.
    pub conversation_id: ConversationId,
}
18
19impl SemanticMemory {
20    /// Check whether a session summary already exists for the given conversation.
21    ///
22    /// Returns `true` if at least one session summary is stored in `SQLite` for this conversation.
23    /// Used as the primary guard in the shutdown summary path to handle cases where hard
24    /// compaction fired but its Qdrant write failed (the `SQLite` record is the authoritative source).
25    ///
26    /// # Errors
27    ///
28    /// Returns an error if the database query fails.
29    pub async fn has_session_summary(
30        &self,
31        conversation_id: ConversationId,
32    ) -> Result<bool, MemoryError> {
33        let summaries = self.sqlite.load_summaries(conversation_id).await?;
34        Ok(!summaries.is_empty())
35    }
36
37    /// Store a shutdown session summary: persists to `SQLite`, embeds into the
38    /// `zeph_session_summaries` Qdrant collection (so cross-session search can find it),
39    /// and stores key facts into the key-facts collection.
40    ///
41    /// Unlike the hard-compaction path, `first_message_id` and `last_message_id` are `None`
42    /// because the shutdown hook does not track exact message boundaries.
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if the `SQLite` insert fails. Qdrant errors are logged as warnings
47    /// and do not propagate — the `SQLite` record is the authoritative summary store.
48    pub async fn store_shutdown_summary(
49        &self,
50        conversation_id: ConversationId,
51        summary_text: &str,
52        key_facts: &[String],
53    ) -> Result<(), MemoryError> {
54        let token_estimate =
55            i64::try_from(self.token_counter.count_tokens(summary_text)).unwrap_or(0);
56        // Persist to SQLite first — this is the authoritative record and the source of truth
57        // for has_session_summary(). NULL message range = session-level summary.
58        let summary_id = self
59            .sqlite
60            .save_summary(conversation_id, summary_text, None, None, token_estimate)
61            .await?;
62
63        // Embed into SESSION_SUMMARIES_COLLECTION so search_session_summaries() can find it.
64        if let Err(e) = self
65            .store_session_summary(conversation_id, summary_text)
66            .await
67        {
68            tracing::warn!("shutdown summary: failed to embed into session summaries: {e:#}");
69        }
70
71        if !key_facts.is_empty() {
72            self.store_key_facts(conversation_id, summary_id, key_facts)
73                .await;
74        }
75
76        tracing::debug!(
77            conversation_id = conversation_id.0,
78            summary_id,
79            "stored shutdown session summary"
80        );
81        Ok(())
82    }
83
84    /// Store a session summary into the dedicated `zeph_session_summaries` Qdrant collection.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if embedding or Qdrant storage fails.
89    pub async fn store_session_summary(
90        &self,
91        conversation_id: ConversationId,
92        summary_text: &str,
93    ) -> Result<(), MemoryError> {
94        let Some(qdrant) = &self.qdrant else {
95            return Ok(());
96        };
97        if !self.effective_embed_provider().supports_embeddings() {
98            return Ok(());
99        }
100
101        let vector = match tokio::time::timeout(
102            std::time::Duration::from_secs(5),
103            self.effective_embed_provider().embed(summary_text),
104        )
105        .await
106        {
107            Ok(Ok(v)) => v,
108            Ok(Err(e)) => return Err(e.into()),
109            Err(_) => {
110                tracing::warn!("store_session_summary: embed timed out — skipping store");
111                return Ok(());
112            }
113        };
114        let vector_size = u64::try_from(vector.len()).unwrap_or(896);
115        qdrant
116            .ensure_named_collection(SESSION_SUMMARIES_COLLECTION, vector_size)
117            .await?;
118
119        let point_id = {
120            const NS: uuid::Uuid = uuid::Uuid::NAMESPACE_OID;
121            uuid::Uuid::new_v5(&NS, conversation_id.0.to_string().as_bytes()).to_string()
122        };
123        let payload = serde_json::json!({
124            "conversation_id": conversation_id.0,
125            "summary_text": summary_text,
126        });
127
128        qdrant
129            .upsert_to_collection(SESSION_SUMMARIES_COLLECTION, &point_id, payload, vector)
130            .await?;
131
132        tracing::debug!(
133            conversation_id = conversation_id.0,
134            "stored session summary"
135        );
136        Ok(())
137    }
138
139    /// Search session summaries from other conversations.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if embedding or Qdrant search fails.
144    #[cfg_attr(
145        feature = "profiling",
146        tracing::instrument(name = "memory.cross_session", skip_all, fields(result_count = tracing::field::Empty))
147    )]
148    pub async fn search_session_summaries(
149        &self,
150        query: &str,
151        limit: usize,
152        exclude_conversation_id: Option<ConversationId>,
153    ) -> Result<Vec<SessionSummaryResult>, MemoryError> {
154        let Some(qdrant) = &self.qdrant else {
155            tracing::debug!("session-summaries: skipped, no vector store");
156            return Ok(Vec::new());
157        };
158        if !self.effective_embed_provider().supports_embeddings() {
159            tracing::debug!("session-summaries: skipped, no embedding support");
160            return Ok(Vec::new());
161        }
162
163        let vector = match tokio::time::timeout(
164            std::time::Duration::from_secs(5),
165            self.effective_embed_provider().embed(query),
166        )
167        .await
168        {
169            Ok(Ok(v)) => v,
170            Ok(Err(e)) => return Err(e.into()),
171            Err(_) => {
172                tracing::warn!(
173                    "search_session_summaries: embed timed out, returning empty results"
174                );
175                return Ok(Vec::new());
176            }
177        };
178        let vector_size = u64::try_from(vector.len()).unwrap_or(896);
179        qdrant
180            .ensure_named_collection(SESSION_SUMMARIES_COLLECTION, vector_size)
181            .await?;
182
183        let filter = exclude_conversation_id.map(|cid| VectorFilter {
184            must: vec![],
185            must_not: vec![FieldCondition {
186                field: "conversation_id".into(),
187                value: FieldValue::Integer(cid.0),
188            }],
189        });
190
191        let points = qdrant
192            .search_collection(SESSION_SUMMARIES_COLLECTION, &vector, limit, filter)
193            .await?;
194
195        tracing::debug!(
196            results = points.len(),
197            limit,
198            exclude_conversation_id = exclude_conversation_id.map(|c| c.0),
199            "session-summaries: search complete"
200        );
201
202        let results = points
203            .into_iter()
204            .filter_map(|point| {
205                let summary_text = point.payload.get("summary_text")?.as_str()?.to_owned();
206                let conversation_id =
207                    ConversationId(point.payload.get("conversation_id")?.as_i64()?);
208                Some(SessionSummaryResult {
209                    summary_text,
210                    score: point.score,
211                    conversation_id,
212                })
213            })
214            .collect();
215
216        Ok(results)
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use zeph_llm::any::AnyProvider;
223    use zeph_llm::mock::MockProvider;
224
225    use crate::types::MessageId;
226
227    use super::*;
228
229    async fn make_memory() -> SemanticMemory {
230        SemanticMemory::new(
231            ":memory:",
232            "http://127.0.0.1:1",
233            None,
234            AnyProvider::Mock(MockProvider::default()),
235            "test-model",
236        )
237        .await
238        .unwrap()
239    }
240
241    /// Insert a real message into the conversation and return its `MessageId`.
242    /// Required because the `summaries` table has FK constraints on `messages.id`.
243    async fn insert_message(memory: &SemanticMemory, cid: ConversationId) -> MessageId {
244        memory
245            .sqlite()
246            .save_message(cid, "user", "test message")
247            .await
248            .unwrap()
249    }
250
251    #[tokio::test]
252    async fn has_session_summary_returns_false_when_no_summaries() {
253        let memory = make_memory().await;
254        let cid = memory.sqlite().create_conversation().await.unwrap();
255
256        let result = memory.has_session_summary(cid).await.unwrap();
257        assert!(!result, "new conversation must have no summaries");
258    }
259
260    #[tokio::test]
261    async fn has_session_summary_returns_true_after_summary_stored_via_sqlite() {
262        let memory = make_memory().await;
263        let cid = memory.sqlite().create_conversation().await.unwrap();
264        let msg_id = insert_message(&memory, cid).await;
265
266        // Use sqlite directly to insert a valid summary with real FK references.
267        memory
268            .sqlite()
269            .save_summary(
270                cid,
271                "session about Rust and async",
272                Some(msg_id),
273                Some(msg_id),
274                10,
275            )
276            .await
277            .unwrap();
278
279        let result = memory.has_session_summary(cid).await.unwrap();
280        assert!(result, "must return true after a summary is persisted");
281    }
282
283    #[tokio::test]
284    async fn has_session_summary_is_isolated_per_conversation() {
285        let memory = make_memory().await;
286        let cid_a = memory.sqlite().create_conversation().await.unwrap();
287        let cid_b = memory.sqlite().create_conversation().await.unwrap();
288        let msg_id = insert_message(&memory, cid_a).await;
289
290        memory
291            .sqlite()
292            .save_summary(
293                cid_a,
294                "summary for conversation A",
295                Some(msg_id),
296                Some(msg_id),
297                5,
298            )
299            .await
300            .unwrap();
301
302        assert!(
303            memory.has_session_summary(cid_a).await.unwrap(),
304            "cid_a must have a summary"
305        );
306        assert!(
307            !memory.has_session_summary(cid_b).await.unwrap(),
308            "cid_b must not be affected by cid_a summary"
309        );
310    }
311
312    #[test]
313    fn store_session_summary_point_id_is_deterministic() {
314        // Same conversation_id must always produce the same UUID v5 point ID,
315        // ensuring that repeated compaction calls upsert rather than insert a new point.
316        const NS: uuid::Uuid = uuid::Uuid::NAMESPACE_OID;
317        let cid = ConversationId(42);
318        let id1 = uuid::Uuid::new_v5(&NS, cid.0.to_string().as_bytes()).to_string();
319        let id2 = uuid::Uuid::new_v5(&NS, cid.0.to_string().as_bytes()).to_string();
320        assert_eq!(
321            id1, id2,
322            "point_id must be deterministic for the same conversation_id"
323        );
324
325        let cid2 = ConversationId(43);
326        let id3 = uuid::Uuid::new_v5(&NS, cid2.0.to_string().as_bytes()).to_string();
327        assert_ne!(
328            id1, id3,
329            "different conversation_ids must produce different point_ids"
330        );
331    }
332
333    #[test]
334    fn store_session_summary_point_id_boundary_ids() {
335        // conversation_id = 0 and negative values are valid i64 variants — confirm they produce
336        // valid, distinct, and stable UUIDs.
337        const NS: uuid::Uuid = uuid::Uuid::NAMESPACE_OID;
338
339        let id_zero_a = uuid::Uuid::new_v5(&NS, ConversationId(0).0.to_string().as_bytes());
340        let id_zero_b = uuid::Uuid::new_v5(&NS, ConversationId(0).0.to_string().as_bytes());
341        assert_eq!(id_zero_a, id_zero_b, "zero conversation_id must be stable");
342
343        let id_neg = uuid::Uuid::new_v5(&NS, ConversationId(-1).0.to_string().as_bytes());
344        assert_ne!(
345            id_zero_a, id_neg,
346            "zero and -1 conversation_ids must produce different point_ids"
347        );
348
349        // Confirm the UUID version is 5 (deterministic SHA-1 name-based).
350        assert_eq!(
351            id_zero_a.get_version_num(),
352            5,
353            "generated UUID must be version 5"
354        );
355    }
356
357    #[tokio::test]
358    async fn store_shutdown_summary_succeeds_with_null_message_ids() {
359        let memory = make_memory().await;
360        let cid = memory.sqlite().create_conversation().await.unwrap();
361
362        let result = memory
363            .store_shutdown_summary(cid, "summary text", &[])
364            .await;
365
366        assert!(
367            result.is_ok(),
368            "shutdown summary must succeed without messages"
369        );
370        assert!(
371            memory.has_session_summary(cid).await.unwrap(),
372            "SQLite must record the shutdown summary"
373        );
374    }
375}