Skip to main content

zeph_memory/store/
compression_guidelines.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed store for ACON compression guidelines and failure pairs.
5
6use std::borrow::Cow;
7use std::sync::LazyLock;
8#[allow(unused_imports)]
9use zeph_db::sql;
10
11use regex::Regex;
12
13use crate::error::MemoryError;
14use crate::store::SqliteStore;
15use crate::types::ConversationId;
16
17static SECRET_RE: LazyLock<Regex> = LazyLock::new(|| {
18    Regex::new(
19        r#"(?:sk-|sk_live_|sk_test_|AKIA|ghp_|gho_|-----BEGIN|xoxb-|xoxp-|AIza|ya29\.|glpat-|hf_|npm_|dckr_pat_)[^\s"'`,;\{\}\[\]]*"#,
20    )
21    .expect("secret regex")
22});
23
24static PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
25    Regex::new(r#"(?:/home/|/Users/|/root/|/tmp/|/var/)[^\s"'`,;\{\}\[\]]*"#).expect("path regex")
26});
27
28/// Matches `Authorization: Bearer <token>` headers; captures the token value for redaction.
29static BEARER_RE: LazyLock<Regex> =
30    LazyLock::new(|| Regex::new(r"(?i)(Authorization:\s*Bearer\s+)\S+").expect("bearer regex"));
31
32/// Matches standalone JWT tokens (three Base64url-encoded parts separated by dots).
33/// The signature segment uses `*` to handle `alg=none` JWTs with an empty signature.
34static JWT_RE: LazyLock<Regex> = LazyLock::new(|| {
35    Regex::new(r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]*").expect("jwt regex")
36});
37
38/// Redact secrets and filesystem paths from text before persistent storage.
39///
40/// Returns `Cow::Borrowed` when no sensitive content is found (zero-alloc fast path).
41pub(crate) fn redact_sensitive(text: &str) -> Cow<'_, str> {
42    // Each replace_all may return Cow::Borrowed (no match) or Cow::Owned (replaced).
43    // We materialise intermediate Owned values into String so that subsequent steps
44    // do not hold a borrow of a local.
45    let s0: Cow<'_, str> = SECRET_RE.replace_all(text, "[REDACTED]");
46    let s1: Cow<'_, str> = match PATH_RE.replace_all(s0.as_ref(), "[PATH]") {
47        Cow::Borrowed(_) => s0,
48        Cow::Owned(o) => Cow::Owned(o),
49    };
50    // Replace only the token value in Bearer headers, keeping the header name intact.
51    let s2: Cow<'_, str> = match BEARER_RE.replace_all(s1.as_ref(), "${1}[REDACTED]") {
52        Cow::Borrowed(_) => s1,
53        Cow::Owned(o) => Cow::Owned(o),
54    };
55    match JWT_RE.replace_all(s2.as_ref(), "[REDACTED_JWT]") {
56        Cow::Borrowed(_) => s2,
57        Cow::Owned(o) => Cow::Owned(o),
58    }
59}
60
61/// A recorded compression failure pair: the compressed context and the response
62/// that indicated context was lost.
63#[derive(Debug, Clone)]
64pub struct CompressionFailurePair {
65    pub id: i64,
66    pub conversation_id: ConversationId,
67    pub compressed_context: String,
68    pub failure_reason: String,
69    pub category: String,
70    pub created_at: String,
71}
72
/// Maximum size, in bytes, stored per `compressed_context` or `failure_reason` field.
///
/// Despite the `_CHARS` suffix, the limit is applied to the UTF-8 byte length.
const MAX_FIELD_CHARS: usize = 4096;

/// Return the longest prefix of `s` that is at most [`MAX_FIELD_CHARS`] bytes long and
/// ends on a UTF-8 character boundary.
///
/// Inputs that already fit are returned unchanged.
fn truncate_field(s: &str) -> &str {
    // Fast path: short inputs need no boundary search at all.
    if s.len() <= MAX_FIELD_CHARS {
        return s;
    }
    // `s` is longer than the limit, so every index up to the limit is in range. Index 0
    // is always a char boundary, which guarantees the walk-back terminates.
    let mut end = MAX_FIELD_CHARS;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
83
84impl SqliteStore {
85    /// Load the latest active compression guidelines.
86    ///
87    /// When `conversation_id` is `Some`, returns conversation-specific guidelines
88    /// preferred over global (NULL) ones. When `None`, returns only global guidelines.
89    ///
90    /// Returns `(version, guidelines_text)`. Returns `(0, "")` if no guidelines exist yet.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the database query fails.
95    pub async fn load_compression_guidelines(
96        &self,
97        conversation_id: Option<ConversationId>,
98    ) -> Result<(i64, String), MemoryError> {
99        let row = zeph_db::query_as::<_, (i64, String)>(sql!(
100            // When conversation_id is Some(cid): `conversation_id = cid` matches
101            // conversation-specific rows; `conversation_id IS NULL` matches global rows.
102            // The CASE ensures conversation-specific rows sort before global ones.
103            // When conversation_id is None: `conversation_id = NULL` is always false in SQL,
104            // so only `conversation_id IS NULL` rows match — correct global-only behavior.
105            "SELECT version, guidelines FROM compression_guidelines \
106             WHERE conversation_id = ? OR conversation_id IS NULL \
107             ORDER BY CASE WHEN conversation_id IS NOT NULL THEN 0 ELSE 1 END, \
108                      version DESC \
109             LIMIT 1"
110        ))
111        .bind(conversation_id.map(|c| c.0))
112        .fetch_optional(&self.pool)
113        .await?;
114
115        Ok(row.unwrap_or((0, String::new())))
116    }
117
118    /// Load only the version and creation timestamp of the latest active compression guidelines.
119    ///
120    /// Same scoping rules as [`load_compression_guidelines`]: conversation-specific rows are
121    /// preferred over global ones.  Returns `(0, "")` if no guidelines exist yet.
122    ///
123    /// Use this in hot paths where the full text is not needed (e.g. metrics sync).
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the database query fails.
128    pub async fn load_compression_guidelines_meta(
129        &self,
130        conversation_id: Option<ConversationId>,
131    ) -> Result<(i64, String), MemoryError> {
132        let row = zeph_db::query_as::<_, (i64, String)>(sql!(
133            "SELECT version, created_at FROM compression_guidelines \
134             WHERE conversation_id = ? OR conversation_id IS NULL \
135             ORDER BY CASE WHEN conversation_id IS NOT NULL THEN 0 ELSE 1 END, \
136                      version DESC \
137             LIMIT 1"
138        ))
139        .bind(conversation_id.map(|c| c.0)) // lgtm[rust/cleartext-logging]
140        .fetch_optional(&self.pool)
141        .await?;
142
143        Ok(row.unwrap_or((0, String::new())))
144    }
145
146    /// Save a new version of the compression guidelines.
147    ///
148    /// When `conversation_id` is `Some`, the guidelines are scoped to that conversation.
149    /// When `None`, the guidelines are global (apply as fallback for all conversations).
150    ///
151    /// Inserts a new row; older versions are retained for audit.
152    /// Returns the new version number.
153    ///
154    /// Note: version numbers are globally sequential across all conversation scopes —
155    /// they are not per-conversation counters. The UNIQUE(version) constraint from
156    /// migration 033 is preserved.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the database insert fails (including FK violation if
161    /// `conversation_id` does not reference a valid conversation row).
162    pub async fn save_compression_guidelines(
163        &self,
164        guidelines: &str,
165        token_count: i64,
166        conversation_id: Option<ConversationId>,
167    ) -> Result<i64, MemoryError> {
168        // The INSERT...SELECT computes MAX(version)+1 across all rows (global + per-conversation)
169        // and inserts it in a single statement. SQLite's single-writer WAL guarantee makes this
170        // atomic — no concurrent writer can observe the same MAX and produce a duplicate version.
171        let new_version: i64 = zeph_db::query_scalar(
172            sql!("INSERT INTO compression_guidelines (version, guidelines, token_count, conversation_id) \
173             SELECT COALESCE(MAX(version), 0) + 1, ?, ?, ? \
174             FROM compression_guidelines \
175             RETURNING version"),
176        )
177        .bind(guidelines)
178        .bind(token_count)
179        .bind(conversation_id.map(|c| c.0))
180        .fetch_one(&self.pool)
181        .await?;
182        Ok(new_version)
183    }
184
185    /// Log a compression failure pair.
186    ///
187    /// Both `compressed_context` and `failure_reason` are truncated to 4096 chars.
188    /// `category` should be one of: `tool_output`, `assistant_reasoning`, `user_context`, `unknown`.
189    /// Returns the inserted row id.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if the database insert fails.
194    pub async fn log_compression_failure(
195        &self,
196        conversation_id: ConversationId,
197        compressed_context: &str,
198        failure_reason: &str,
199        category: &str,
200    ) -> Result<i64, MemoryError> {
201        let ctx = redact_sensitive(compressed_context);
202        let ctx = truncate_field(&ctx);
203        let reason = redact_sensitive(failure_reason);
204        let reason = truncate_field(&reason);
205        let id = zeph_db::query_scalar(sql!(
206            "INSERT INTO compression_failure_pairs \
207             (conversation_id, compressed_context, failure_reason, category) \
208             VALUES (?, ?, ?, ?) RETURNING id"
209        ))
210        .bind(conversation_id.0)
211        .bind(ctx)
212        .bind(reason)
213        .bind(category)
214        .fetch_one(&self.pool)
215        .await?;
216        Ok(id)
217    }
218
219    /// Get unused failure pairs (oldest first), up to `limit`.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the database query fails.
224    pub async fn get_unused_failure_pairs(
225        &self,
226        limit: usize,
227    ) -> Result<Vec<CompressionFailurePair>, MemoryError> {
228        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
229        let rows = zeph_db::query_as::<_, (i64, i64, String, String, String, String)>(sql!(
230            "SELECT id, conversation_id, compressed_context, failure_reason, category, created_at \
231             FROM compression_failure_pairs \
232             WHERE used_in_update = 0 \
233             ORDER BY created_at ASC \
234             LIMIT ?"
235        ))
236        .bind(limit)
237        .fetch_all(&self.pool)
238        .await?;
239
240        Ok(rows
241            .into_iter()
242            .map(
243                |(id, cid, ctx, reason, category, created_at)| CompressionFailurePair {
244                    id,
245                    conversation_id: ConversationId(cid),
246                    compressed_context: ctx,
247                    failure_reason: reason,
248                    category,
249                    created_at,
250                },
251            )
252            .collect())
253    }
254
255    /// Get unused failure pairs for a specific category (oldest first), up to `limit`.
256    ///
257    /// Used by the categorized ACON updater to run per-category update cycles.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the database query fails.
262    pub async fn get_unused_failure_pairs_by_category(
263        &self,
264        category: &str,
265        limit: usize,
266    ) -> Result<Vec<CompressionFailurePair>, MemoryError> {
267        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
268        let rows = zeph_db::query_as::<_, (i64, i64, String, String, String, String)>(sql!(
269            "SELECT id, conversation_id, compressed_context, failure_reason, category, created_at \
270             FROM compression_failure_pairs \
271             WHERE used_in_update = 0 AND category = ? \
272             ORDER BY created_at ASC \
273             LIMIT ?"
274        ))
275        .bind(category)
276        .bind(limit)
277        .fetch_all(&self.pool)
278        .await?;
279
280        Ok(rows
281            .into_iter()
282            .map(
283                |(id, cid, ctx, reason, cat, created_at)| CompressionFailurePair {
284                    id,
285                    conversation_id: ConversationId(cid),
286                    compressed_context: ctx,
287                    failure_reason: reason,
288                    category: cat,
289                    created_at,
290                },
291            )
292            .collect())
293    }
294
295    /// Count unused failure pairs for a specific category.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the database query fails.
300    pub async fn count_unused_failure_pairs_by_category(
301        &self,
302        category: &str,
303    ) -> Result<i64, MemoryError> {
304        let count = zeph_db::query_scalar(sql!(
305            "SELECT COUNT(*) FROM compression_failure_pairs \
306             WHERE used_in_update = 0 AND category = ?"
307        ))
308        .bind(category)
309        .fetch_one(&self.pool)
310        .await?;
311        Ok(count)
312    }
313
314    /// Load the latest compression guidelines for a specific category.
315    ///
316    /// When `conversation_id` is `Some`, prefers conversation-specific rows.
317    /// Returns `(0, "")` if no guidelines exist for this category.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the database query fails.
322    pub async fn load_compression_guidelines_by_category(
323        &self,
324        category: &str,
325        conversation_id: Option<ConversationId>,
326    ) -> Result<(i64, String), MemoryError> {
327        let row = zeph_db::query_as::<_, (i64, String)>(sql!(
328            "SELECT version, guidelines FROM compression_guidelines \
329             WHERE category = ? \
330             AND (conversation_id = ? OR conversation_id IS NULL) \
331             ORDER BY CASE WHEN conversation_id IS NOT NULL THEN 0 ELSE 1 END, \
332                      version DESC \
333             LIMIT 1"
334        ))
335        .bind(category)
336        .bind(conversation_id.map(|c| c.0))
337        .fetch_optional(&self.pool)
338        .await?;
339
340        Ok(row.unwrap_or((0, String::new())))
341    }
342
343    /// Save a new version of compression guidelines for a specific category.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the database insert fails.
348    pub async fn save_compression_guidelines_with_category(
349        &self,
350        guidelines: &str,
351        token_count: i64,
352        category: &str,
353        conversation_id: Option<ConversationId>,
354    ) -> Result<i64, MemoryError> {
355        let new_version: i64 = zeph_db::query_scalar(sql!(
356            "INSERT INTO compression_guidelines \
357             (version, category, guidelines, token_count, conversation_id) \
358             SELECT COALESCE(MAX(version), 0) + 1, ?, ?, ?, ? \
359             FROM compression_guidelines \
360             RETURNING version"
361        ))
362        .bind(category)
363        .bind(guidelines)
364        .bind(token_count)
365        .bind(conversation_id.map(|c| c.0))
366        .fetch_one(&self.pool)
367        .await?;
368        Ok(new_version)
369    }
370
371    /// Mark failure pairs as consumed by the updater.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if the database update fails.
376    pub async fn mark_failure_pairs_used(&self, ids: &[i64]) -> Result<(), MemoryError> {
377        if ids.is_empty() {
378            return Ok(());
379        }
380        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
381        let query = format!(
382            "UPDATE compression_failure_pairs SET used_in_update = 1 WHERE id IN ({placeholders})"
383        );
384        let mut q = zeph_db::query(&query);
385        for id in ids {
386            q = q.bind(id);
387        }
388        q.execute(&self.pool).await?;
389        Ok(())
390    }
391
392    /// Count unused failure pairs.
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if the database query fails.
397    pub async fn count_unused_failure_pairs(&self) -> Result<i64, MemoryError> {
398        let count = zeph_db::query_scalar(sql!(
399            "SELECT COUNT(*) FROM compression_failure_pairs WHERE used_in_update = 0"
400        ))
401        .fetch_one(&self.pool)
402        .await?;
403        Ok(count)
404    }
405
406    /// Delete old used failure pairs, keeping the most recent `keep_recent` unused pairs.
407    ///
408    /// Removes all rows where `used_in_update = 1`. Unused rows are managed by the
409    /// `max_stored_pairs` enforcement below: if there are more than `keep_recent` unused pairs,
410    /// the oldest excess rows are deleted.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the database query fails.
415    pub async fn cleanup_old_failure_pairs(&self, keep_recent: usize) -> Result<(), MemoryError> {
416        // Delete all used pairs (they've already been processed).
417        zeph_db::query(sql!(
418            "DELETE FROM compression_failure_pairs WHERE used_in_update = 1"
419        ))
420        .execute(&self.pool)
421        .await?;
422
423        // Keep only the most recent `keep_recent` unused pairs.
424        let keep = i64::try_from(keep_recent).unwrap_or(i64::MAX);
425        zeph_db::query(sql!(
426            "DELETE FROM compression_failure_pairs \
427             WHERE used_in_update = 0 \
428             AND id NOT IN ( \
429                 SELECT id FROM compression_failure_pairs \
430                 WHERE used_in_update = 0 \
431                 ORDER BY created_at DESC \
432                 LIMIT ? \
433             )"
434        ))
435        .bind(keep)
436        .execute(&self.pool)
437        .await?;
438
439        Ok(())
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446
447    // pool_size=1 is required: SQLite :memory: creates an isolated database per
448    // connection, so multiple connections would each see an empty schema.
449    async fn make_store() -> SqliteStore {
450        SqliteStore::with_pool_size(":memory:", 1)
451            .await
452            .expect("in-memory SqliteStore")
453    }
454
455    #[tokio::test]
456    async fn load_guidelines_meta_returns_defaults_when_empty() {
457        let store = make_store().await;
458        let (version, created_at) = store.load_compression_guidelines_meta(None).await.unwrap();
459        assert_eq!(version, 0);
460        assert!(created_at.is_empty());
461    }
462
463    #[tokio::test]
464    async fn load_guidelines_meta_returns_version_and_created_at() {
465        let store = make_store().await;
466        store
467            .save_compression_guidelines("keep file paths", 4, None)
468            .await
469            .unwrap();
470        let (version, created_at) = store.load_compression_guidelines_meta(None).await.unwrap();
471        assert_eq!(version, 1);
472        assert!(!created_at.is_empty(), "created_at should be populated");
473    }
474
475    #[tokio::test]
476    async fn load_guidelines_returns_defaults_when_empty() {
477        let store = make_store().await;
478        let (version, text) = store.load_compression_guidelines(None).await.unwrap();
479        assert_eq!(version, 0);
480        assert!(text.is_empty());
481    }
482
483    #[tokio::test]
484    async fn save_and_load_guidelines() {
485        let store = make_store().await;
486        let v1 = store
487            .save_compression_guidelines("always preserve file paths", 4, None)
488            .await
489            .unwrap();
490        assert_eq!(v1, 1);
491        let v2 = store
492            .save_compression_guidelines(
493                "always preserve file paths\nalways preserve errors",
494                8,
495                None,
496            )
497            .await
498            .unwrap();
499        assert_eq!(v2, 2);
500        // Loading should return the latest version.
501        let (v, text) = store.load_compression_guidelines(None).await.unwrap();
502        assert_eq!(v, 2);
503        assert!(text.contains("errors"));
504    }
505
506    #[tokio::test]
507    async fn load_guidelines_prefers_conversation_specific() {
508        let store = make_store().await;
509        let cid = ConversationId(store.create_conversation().await.unwrap().0);
510        store
511            .save_compression_guidelines("global rule", 2, None)
512            .await
513            .unwrap();
514        store
515            .save_compression_guidelines("conversation rule", 2, Some(cid))
516            .await
517            .unwrap();
518        let (_, text) = store.load_compression_guidelines(Some(cid)).await.unwrap();
519        assert_eq!(text, "conversation rule");
520    }
521
522    #[tokio::test]
523    async fn load_guidelines_falls_back_to_global() {
524        let store = make_store().await;
525        let cid = ConversationId(store.create_conversation().await.unwrap().0);
526        store
527            .save_compression_guidelines("global rule", 2, None)
528            .await
529            .unwrap();
530        // No conversation-specific guidelines; should fall back to global.
531        let (_, text) = store.load_compression_guidelines(Some(cid)).await.unwrap();
532        assert_eq!(text, "global rule");
533    }
534
535    #[tokio::test]
536    async fn load_guidelines_none_returns_global_only() {
537        let store = make_store().await;
538        let cid = ConversationId(store.create_conversation().await.unwrap().0);
539        store
540            .save_compression_guidelines("conversation rule", 2, Some(cid))
541            .await
542            .unwrap();
543        // None should not return conversation-scoped guidelines.
544        let (version, text) = store.load_compression_guidelines(None).await.unwrap();
545        assert_eq!(version, 0);
546        assert!(text.is_empty());
547    }
548
549    #[tokio::test]
550    async fn load_guidelines_scope_isolation() {
551        let store = make_store().await;
552        let cid_a = ConversationId(store.create_conversation().await.unwrap().0);
553        let cid_b = ConversationId(store.create_conversation().await.unwrap().0);
554
555        // Global guideline (conversation_id = None) — visible to all conversations.
556        store
557            .save_compression_guidelines("Use bullet points", 1, None)
558            .await
559            .unwrap();
560        // Conversation-A-specific guideline — must NOT be visible to B.
561        store
562            .save_compression_guidelines("Be concise", 2, Some(cid_a))
563            .await
564            .unwrap();
565
566        // Conversation B: gets only the global guideline, not A's.
567        let (_, text_b) = store
568            .load_compression_guidelines(Some(cid_b))
569            .await
570            .unwrap();
571        assert_eq!(
572            text_b, "Use bullet points",
573            "conversation B must see global guideline"
574        );
575
576        // Conversation A: gets its own guideline (preferred over global).
577        let (_, text_a) = store
578            .load_compression_guidelines(Some(cid_a))
579            .await
580            .unwrap();
581        assert_eq!(
582            text_a, "Be concise",
583            "conversation A must prefer its own guideline over global"
584        );
585
586        // None scope: gets only the global guideline.
587        let (_, text_global) = store.load_compression_guidelines(None).await.unwrap();
588        assert_eq!(
589            text_global, "Use bullet points",
590            "None scope must see only the global guideline"
591        );
592    }
593
594    #[tokio::test]
595    async fn save_with_nonexistent_conversation_id_fails() {
596        let store = make_store().await;
597        let nonexistent = ConversationId(99999);
598        let result = store
599            .save_compression_guidelines("rule", 1, Some(nonexistent))
600            .await;
601        assert!(
602            result.is_err(),
603            "FK violation expected for nonexistent conversation_id"
604        );
605    }
606
607    #[tokio::test]
608    async fn cascade_delete_removes_conversation_guidelines() {
609        let store = make_store().await;
610        let cid = ConversationId(store.create_conversation().await.unwrap().0);
611        store
612            .save_compression_guidelines("rule", 1, Some(cid))
613            .await
614            .unwrap();
615        // Delete the conversation row directly — should cascade-delete the guideline.
616        zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
617            .bind(cid.0)
618            .execute(store.pool())
619            .await
620            .unwrap();
621        let (version, text) = store.load_compression_guidelines(Some(cid)).await.unwrap();
622        assert_eq!(version, 0);
623        assert!(text.is_empty());
624    }
625
626    #[tokio::test]
627    async fn log_and_count_failure_pairs() {
628        let store = make_store().await;
629        let cid = ConversationId(store.create_conversation().await.unwrap().0);
630        store
631            .log_compression_failure(cid, "compressed ctx", "i don't recall that", "unknown")
632            .await
633            .unwrap();
634        let count = store.count_unused_failure_pairs().await.unwrap();
635        assert_eq!(count, 1);
636    }
637
638    #[tokio::test]
639    async fn get_unused_pairs_sorted_oldest_first() {
640        let store = make_store().await;
641        let cid = ConversationId(store.create_conversation().await.unwrap().0);
642        store
643            .log_compression_failure(cid, "ctx A", "reason A", "unknown")
644            .await
645            .unwrap();
646        store
647            .log_compression_failure(cid, "ctx B", "reason B", "unknown")
648            .await
649            .unwrap();
650        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
651        assert_eq!(pairs.len(), 2);
652        assert_eq!(pairs[0].compressed_context, "ctx A");
653    }
654
655    #[tokio::test]
656    async fn mark_pairs_used_reduces_count() {
657        let store = make_store().await;
658        let cid = ConversationId(store.create_conversation().await.unwrap().0);
659        let id = store
660            .log_compression_failure(cid, "ctx", "reason", "unknown")
661            .await
662            .unwrap();
663        store.mark_failure_pairs_used(&[id]).await.unwrap();
664        let count = store.count_unused_failure_pairs().await.unwrap();
665        assert_eq!(count, 0);
666    }
667
668    #[tokio::test]
669    async fn cleanup_deletes_used_and_trims_unused() {
670        let store = make_store().await;
671        let cid = ConversationId(store.create_conversation().await.unwrap().0);
672        // Add 3 pairs and mark 1 used.
673        let id1 = store
674            .log_compression_failure(cid, "ctx1", "r1", "tool_output")
675            .await
676            .unwrap();
677        store
678            .log_compression_failure(cid, "ctx2", "r2", "tool_output")
679            .await
680            .unwrap();
681        store
682            .log_compression_failure(cid, "ctx3", "r3", "unknown")
683            .await
684            .unwrap();
685        store.mark_failure_pairs_used(&[id1]).await.unwrap();
686        // Cleanup: keep at most 1 unused.
687        store.cleanup_old_failure_pairs(1).await.unwrap();
688        let count = store.count_unused_failure_pairs().await.unwrap();
689        assert_eq!(count, 1, "only 1 unused pair should remain");
690    }
691
692    #[test]
693    fn redact_sensitive_api_key_is_redacted() {
694        let result = redact_sensitive("token sk-abc123def456 used for auth");
695        assert!(result.contains("[REDACTED]"), "API key must be redacted");
696        assert!(
697            !result.contains("sk-abc123"),
698            "original key must not appear"
699        );
700    }
701
702    #[test]
703    fn redact_sensitive_plain_text_borrows() {
704        let text = "safe text, no secrets here";
705        let result = redact_sensitive(text);
706        assert!(
707            matches!(result, Cow::Borrowed(_)),
708            "plain text must return Cow::Borrowed (zero-alloc)"
709        );
710    }
711
712    #[test]
713    fn redact_sensitive_filesystem_path_is_redacted() {
714        let result = redact_sensitive("config loaded from /Users/dev/project/config.toml");
715        assert!(
716            result.contains("[PATH]"),
717            "filesystem path must be redacted"
718        );
719        assert!(
720            !result.contains("/Users/dev/"),
721            "original path must not appear"
722        );
723    }
724
725    #[test]
726    fn redact_sensitive_combined_secret_and_path() {
727        let result = redact_sensitive("key sk-abc at /home/user/file");
728        assert!(result.contains("[REDACTED]"), "secret must be redacted");
729        assert!(result.contains("[PATH]"), "path must be redacted");
730    }
731
732    #[tokio::test]
733    async fn log_compression_failure_redacts_secrets() {
734        let store = make_store().await;
735        let cid = ConversationId(store.create_conversation().await.unwrap().0);
736        store
737            .log_compression_failure(
738                cid,
739                "token sk-abc123def456 used for auth",
740                "context lost",
741                "unknown",
742            )
743            .await
744            .unwrap();
745        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
746        assert_eq!(pairs.len(), 1);
747        assert!(
748            pairs[0].compressed_context.contains("[REDACTED]"),
749            "stored context must have redacted secret"
750        );
751        assert!(
752            !pairs[0].compressed_context.contains("sk-abc123"),
753            "stored context must not contain raw secret"
754        );
755    }
756
757    #[tokio::test]
758    async fn log_compression_failure_redacts_paths() {
759        let store = make_store().await;
760        let cid = ConversationId(store.create_conversation().await.unwrap().0);
761        store
762            .log_compression_failure(
763                cid,
764                "/Users/dev/project/config.toml was loaded",
765                "lost",
766                "unknown",
767            )
768            .await
769            .unwrap();
770        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
771        assert!(
772            pairs[0].compressed_context.contains("[PATH]"),
773            "stored context must have redacted path"
774        );
775        assert!(
776            !pairs[0].compressed_context.contains("/Users/dev/"),
777            "stored context must not contain raw path"
778        );
779    }
780
781    #[tokio::test]
782    async fn log_compression_failure_reason_also_redacted() {
783        let store = make_store().await;
784        let cid = ConversationId(store.create_conversation().await.unwrap().0);
785        store
786            .log_compression_failure(
787                cid,
788                "some context",
789                "secret ghp_abc123xyz was leaked",
790                "unknown",
791            )
792            .await
793            .unwrap();
794        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
795        assert!(
796            pairs[0].failure_reason.contains("[REDACTED]"),
797            "failure_reason must also be redacted"
798        );
799        assert!(
800            !pairs[0].failure_reason.contains("ghp_abc123xyz"),
801            "raw secret must not appear in failure_reason"
802        );
803    }
804
805    #[tokio::test]
806    async fn truncate_field_respects_char_boundary() {
807        let s = "а".repeat(5000); // Cyrillic 'а', 2 bytes each
808        let truncated = truncate_field(&s);
809        assert!(truncated.len() <= MAX_FIELD_CHARS);
810        assert!(s.is_char_boundary(truncated.len()));
811    }
812
813    #[tokio::test]
814    async fn unique_constraint_prevents_duplicate_version() {
815        let store = make_store().await;
816        // Insert version 1 via the public API.
817        store
818            .save_compression_guidelines("first", 1, None)
819            .await
820            .unwrap();
821        // store.pool() access is intentional: we need direct pool access to bypass
822        // the public API and test the UNIQUE constraint at the SQL level.
823        let result = zeph_db::query(
824            sql!("INSERT INTO compression_guidelines (version, guidelines, token_count) VALUES (1, 'dup', 0)"),
825        )
826        .execute(store.pool())
827        .await;
828        assert!(
829            result.is_err(),
830            "duplicate version insert should violate UNIQUE constraint"
831        );
832    }
833
834    #[test]
835    fn redact_sensitive_bearer_token_is_redacted() {
836        let result =
837            redact_sensitive("Authorization: Bearer eyJhbGciOiJSUzI1NiJ9.payload.signature");
838        assert!(
839            result.contains("[REDACTED]"),
840            "Bearer token must be redacted: {result}"
841        );
842        assert!(
843            !result.contains("eyJhbGciOiJSUzI1NiJ9"),
844            "raw JWT header must not appear: {result}"
845        );
846        assert!(
847            result.contains("Authorization:"),
848            "header name must be preserved: {result}"
849        );
850    }
851
852    #[test]
853    fn redact_sensitive_bearer_token_case_insensitive() {
854        let result =
855            redact_sensitive("authorization: bearer eyJhbGciOiJSUzI1NiJ9.payload.signature");
856        assert!(
857            result.contains("[REDACTED]"),
858            "Bearer header match must be case-insensitive: {result}"
859        );
860    }
861
862    #[test]
863    fn redact_sensitive_standalone_jwt_is_redacted() {
864        let jwt = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIn0.SflKxwRJSMeKKF2";
865        let input = format!("token value: {jwt} was found in logs");
866        let result = redact_sensitive(&input);
867        assert!(
868            result.contains("[REDACTED_JWT]"),
869            "standalone JWT must be replaced with [REDACTED_JWT]: {result}"
870        );
871        assert!(
872            !result.contains("eyJhbGci"),
873            "raw JWT must not appear: {result}"
874        );
875    }
876
877    #[test]
878    fn redact_sensitive_mixed_content_all_redacted() {
879        let input =
880            "key sk-abc123 at /home/user/f with Authorization: Bearer eyJhbG.pay.sig and eyJx.b.c";
881        let result = redact_sensitive(input);
882        assert!(result.contains("[REDACTED]"), "API key must be redacted");
883        assert!(result.contains("[PATH]"), "path must be redacted");
884        assert!(!result.contains("sk-abc123"), "raw API key must not appear");
885        assert!(!result.contains("eyJhbG"), "raw JWT must not appear");
886    }
887
888    #[test]
889    fn redact_sensitive_partial_jwt_not_redacted() {
890        // A string starting with eyJ but missing the third segment is not a valid JWT.
891        let input = "eyJhbGciOiJSUzI1NiJ9.onlytwoparts";
892        let result = redact_sensitive(input);
893        // Should not be replaced by the JWT regex (only two dot-separated parts).
894        assert!(
895            !result.contains("[REDACTED_JWT]"),
896            "two-part eyJ string must not be treated as JWT: {result}"
897        );
898        // No substitution occurred — must be zero-alloc Cow::Borrowed.
899        assert!(
900            matches!(result, Cow::Borrowed(_)),
901            "no-match input must return Cow::Borrowed: {result}"
902        );
903    }
904
905    #[test]
906    fn redact_sensitive_alg_none_jwt_empty_signature_redacted() {
907        // alg=none JWTs have an empty third segment: <header>.<payload>.
908        let input =
909            "token: eyJhbGciOiJub25lIn0.eyJzdWIiOiJ1c2VyIn0. was submitted without signature";
910        let result = redact_sensitive(input);
911        assert!(
912            result.contains("[REDACTED_JWT]"),
913            "alg=none JWT with empty signature must be redacted: {result}"
914        );
915        assert!(
916            !result.contains("eyJhbGciOiJub25lIn0"),
917            "raw alg=none JWT header must not appear: {result}"
918        );
919    }
920
921    // ── Category-aware store methods (MF-4) ──────────────────────────────────
922
923    #[tokio::test]
924    async fn get_unused_pairs_by_category_filters_correctly() {
925        let store = make_store().await;
926        let cid = ConversationId(store.create_conversation().await.unwrap().0);
927        store
928            .log_compression_failure(cid, "tool ctx", "lost tool output", "tool_output")
929            .await
930            .unwrap();
931        store
932            .log_compression_failure(cid, "user ctx", "lost user context", "user_context")
933            .await
934            .unwrap();
935
936        let tool_pairs = store
937            .get_unused_failure_pairs_by_category("tool_output", 10)
938            .await
939            .unwrap();
940        assert_eq!(tool_pairs.len(), 1);
941        assert_eq!(tool_pairs[0].category, "tool_output");
942        assert_eq!(tool_pairs[0].compressed_context, "tool ctx");
943
944        let user_pairs = store
945            .get_unused_failure_pairs_by_category("user_context", 10)
946            .await
947            .unwrap();
948        assert_eq!(user_pairs.len(), 1);
949        assert_eq!(user_pairs[0].category, "user_context");
950
951        // Unknown category returns nothing.
952        let unknown_pairs = store
953            .get_unused_failure_pairs_by_category("assistant_reasoning", 10)
954            .await
955            .unwrap();
956        assert!(unknown_pairs.is_empty());
957    }
958
959    #[tokio::test]
960    async fn count_unused_pairs_by_category_returns_correct_count() {
961        let store = make_store().await;
962        let cid = ConversationId(store.create_conversation().await.unwrap().0);
963        store
964            .log_compression_failure(cid, "ctx A", "reason", "tool_output")
965            .await
966            .unwrap();
967        store
968            .log_compression_failure(cid, "ctx B", "reason", "tool_output")
969            .await
970            .unwrap();
971        store
972            .log_compression_failure(cid, "ctx C", "reason", "user_context")
973            .await
974            .unwrap();
975
976        let tool_count = store
977            .count_unused_failure_pairs_by_category("tool_output")
978            .await
979            .unwrap();
980        assert_eq!(tool_count, 2);
981
982        let user_count = store
983            .count_unused_failure_pairs_by_category("user_context")
984            .await
985            .unwrap();
986        assert_eq!(user_count, 1);
987
988        let unknown_count = store
989            .count_unused_failure_pairs_by_category("assistant_reasoning")
990            .await
991            .unwrap();
992        assert_eq!(unknown_count, 0);
993    }
994
995    #[tokio::test]
996    async fn save_and_load_guidelines_by_category() {
997        let store = make_store().await;
998        store
999            .save_compression_guidelines_with_category(
1000                "preserve tool names",
1001                3,
1002                "tool_output",
1003                None,
1004            )
1005            .await
1006            .unwrap();
1007        store
1008            .save_compression_guidelines_with_category("keep user intent", 3, "user_context", None)
1009            .await
1010            .unwrap();
1011
1012        let (_, tool_text) = store
1013            .load_compression_guidelines_by_category("tool_output", None)
1014            .await
1015            .unwrap();
1016        assert_eq!(tool_text, "preserve tool names");
1017
1018        let (_, user_text) = store
1019            .load_compression_guidelines_by_category("user_context", None)
1020            .await
1021            .unwrap();
1022        assert_eq!(user_text, "keep user intent");
1023    }
1024
1025    #[tokio::test]
1026    async fn load_guidelines_by_category_returns_defaults_when_empty() {
1027        let store = make_store().await;
1028        // No guidelines saved for this category.
1029        let (version, text) = store
1030            .load_compression_guidelines_by_category("tool_output", None)
1031            .await
1032            .unwrap();
1033        assert_eq!(version, 0, "version must be 0 when no entries exist");
1034        assert!(text.is_empty(), "text must be empty when no entries exist");
1035    }
1036
1037    /// Concurrent saves must produce strictly unique versions with no collisions.
1038    ///
1039    /// Uses a file-backed database because `SQLite` `:memory:` creates an isolated
1040    /// database per connection — a multi-connection pool over `:memory:` would give
1041    /// each writer its own empty schema and cannot test shared-state atomicity.
1042    #[tokio::test]
1043    async fn concurrent_saves_produce_unique_versions() {
1044        use std::collections::HashSet;
1045        use std::sync::Arc;
1046
1047        let dir = tempfile::tempdir().expect("tempdir");
1048        let db_path = dir.path().join("test.db");
1049        let store = Arc::new(
1050            SqliteStore::with_pool_size(db_path.to_str().expect("utf8 path"), 4)
1051                .await
1052                .expect("file-backed SqliteStore"),
1053        );
1054
1055        let tasks: Vec<_> = (0..8_i64)
1056            .map(|i| {
1057                let s = Arc::clone(&store);
1058                tokio::spawn(async move {
1059                    s.save_compression_guidelines(&format!("guideline {i}"), i, None)
1060                        .await
1061                        .expect("concurrent save must succeed")
1062                })
1063            })
1064            .collect();
1065
1066        let mut versions = HashSet::new();
1067        for task in tasks {
1068            let v = task.await.expect("task must not panic");
1069            assert!(versions.insert(v), "version {v} appeared more than once");
1070        }
1071        assert_eq!(
1072            versions.len(),
1073            8,
1074            "all 8 saves must produce distinct versions"
1075        );
1076    }
1077}