Skip to main content

zeph_memory/store/
compression_guidelines.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed store for ACON compression guidelines and failure pairs.
5
6use std::borrow::Cow;
7use std::sync::LazyLock;
8#[allow(unused_imports)]
9use zeph_db::sql;
10
11use regex::Regex;
12
13use crate::error::MemoryError;
14use crate::store::SqliteStore;
15use crate::types::ConversationId;
16
/// Matches well-known secret-token prefixes — OpenAI (`sk-`), Stripe
/// (`sk_live_`/`sk_test_`), AWS (`AKIA`), GitHub (`ghp_`/`gho_`), PEM blocks
/// (`-----BEGIN`), Slack (`xoxb-`/`xoxp-`), Google (`AIza`/`ya29.`), GitLab
/// (`glpat-`), Hugging Face (`hf_`), npm (`npm_`), Docker (`dckr_pat_`) —
/// and consumes the rest of the token up to whitespace or punctuation.
static SECRET_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r#"(?:sk-|sk_live_|sk_test_|AKIA|ghp_|gho_|-----BEGIN|xoxb-|xoxp-|AIza|ya29\.|glpat-|hf_|npm_|dckr_pat_)[^\s"'`,;\{\}\[\]]*"#,
    )
    .expect("secret regex")
});
23
/// Matches absolute filesystem paths rooted at common user/system directories
/// (`/home/`, `/Users/`, `/root/`, `/tmp/`, `/var/`), consuming up to
/// whitespace or punctuation.
static PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r#"(?:/home/|/Users/|/root/|/tmp/|/var/)[^\s"'`,;\{\}\[\]]*"#).expect("path regex")
});
27
/// Matches `Authorization: Bearer <token>` headers; captures the token value for redaction.
/// The `(?i)` flag makes the header name and the `Bearer` keyword case-insensitive;
/// group 1 (the header prefix) is preserved by the replacement template.
static BEARER_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(?i)(Authorization:\s*Bearer\s+)\S+").expect("bearer regex"));
31
/// Matches standalone JWT tokens (three Base64url-encoded parts separated by dots).
/// Anchored on the `eyJ` prefix — the Base64 encoding of `{"`, the start of a
/// JSON header — so arbitrary dotted identifiers are not caught.
/// The signature segment uses `*` to handle `alg=none` JWTs with an empty signature.
static JWT_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]*").expect("jwt regex")
});
37
38/// Redact secrets and filesystem paths from text before persistent storage.
39///
40/// Returns `Cow::Borrowed` when no sensitive content is found (zero-alloc fast path).
41pub(crate) fn redact_sensitive(text: &str) -> Cow<'_, str> {
42    // Each replace_all may return Cow::Borrowed (no match) or Cow::Owned (replaced).
43    // We materialise intermediate Owned values into String so that subsequent steps
44    // do not hold a borrow of a local.
45    let s0: Cow<'_, str> = SECRET_RE.replace_all(text, "[REDACTED]");
46    let s1: Cow<'_, str> = match PATH_RE.replace_all(s0.as_ref(), "[PATH]") {
47        Cow::Borrowed(_) => s0,
48        Cow::Owned(o) => Cow::Owned(o),
49    };
50    // Replace only the token value in Bearer headers, keeping the header name intact.
51    let s2: Cow<'_, str> = match BEARER_RE.replace_all(s1.as_ref(), "${1}[REDACTED]") {
52        Cow::Borrowed(_) => s1,
53        Cow::Owned(o) => Cow::Owned(o),
54    };
55    match JWT_RE.replace_all(s2.as_ref(), "[REDACTED_JWT]") {
56        Cow::Borrowed(_) => s2,
57        Cow::Owned(o) => Cow::Owned(o),
58    }
59}
60
/// A recorded compression failure pair: the compressed context and the response
/// that indicated context was lost.
#[derive(Debug, Clone)]
pub struct CompressionFailurePair {
    /// Row id in `compression_failure_pairs`.
    pub id: i64,
    /// Conversation in which the failure occurred.
    pub conversation_id: ConversationId,
    /// The compressed context that lost information (redacted and truncated
    /// before storage — see `log_compression_failure`).
    pub compressed_context: String,
    /// Why the compression was judged a failure (also redacted/truncated).
    pub failure_reason: String,
    /// Creation timestamp as stored by the database (string form).
    pub created_at: String,
}
71
/// Maximum stored length, in UTF-8 **bytes**, per `compressed_context` or
/// `failure_reason` field. (Despite the name, the limit applies to the byte
/// length, backed off to the nearest `char` boundary — the name is kept for
/// compatibility with existing callers/tests.)
const MAX_FIELD_CHARS: usize = 4096;

/// Truncate `s` to at most `MAX_FIELD_CHARS` bytes without splitting a
/// multi-byte UTF-8 character. Returns the input unchanged when it already fits.
///
/// The early return fixes an accidental O(limit) scan: for short inputs the
/// previous loop decremented from 4096 all the way down to `s.len()` because
/// `is_char_boundary(i)` is `false` for every `i > s.len()`.
fn truncate_field(s: &str) -> &str {
    if s.len() <= MAX_FIELD_CHARS {
        return s;
    }
    // s.len() > MAX_FIELD_CHARS, so the index is in range. Back off at most
    // 3 steps to the previous char boundary (a UTF-8 char is <= 4 bytes);
    // index 0 is always a boundary, so the loop terminates.
    let mut end = MAX_FIELD_CHARS;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
82
impl SqliteStore {
    /// Load the latest active compression guidelines.
    ///
    /// When `conversation_id` is `Some`, returns conversation-specific guidelines
    /// preferred over global (NULL) ones. When `None`, returns only global guidelines.
    ///
    /// Returns `(version, guidelines_text)`. Returns `(0, "")` if no guidelines exist yet.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn load_compression_guidelines(
        &self,
        conversation_id: Option<ConversationId>,
    ) -> Result<(i64, String), MemoryError> {
        let row = zeph_db::query_as::<_, (i64, String)>(sql!(
            // When conversation_id is Some(cid): `conversation_id = cid` matches
            // conversation-specific rows; `conversation_id IS NULL` matches global rows.
            // The CASE ensures conversation-specific rows sort before global ones.
            // When conversation_id is None: `conversation_id = NULL` is always false in SQL,
            // so only `conversation_id IS NULL` rows match — correct global-only behavior.
            "SELECT version, guidelines FROM compression_guidelines \
             WHERE conversation_id = ? OR conversation_id IS NULL \
             ORDER BY CASE WHEN conversation_id IS NOT NULL THEN 0 ELSE 1 END, \
                      version DESC \
             LIMIT 1"
        ))
        .bind(conversation_id.map(|c| c.0))
        .fetch_optional(&self.pool)
        .await?;

        // No matching row means "no guidelines yet": report version 0, empty text.
        Ok(row.unwrap_or((0, String::new())))
    }

    /// Load only the version and creation timestamp of the latest active compression guidelines.
    ///
    /// Same scoping rules as [`load_compression_guidelines`]: conversation-specific rows are
    /// preferred over global ones.  Returns `(0, "")` if no guidelines exist yet.
    ///
    /// Use this in hot paths where the full text is not needed (e.g. metrics sync).
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn load_compression_guidelines_meta(
        &self,
        conversation_id: Option<ConversationId>,
    ) -> Result<(i64, String), MemoryError> {
        // Same query shape as load_compression_guidelines, but selects
        // `created_at` instead of the (potentially large) guidelines text.
        let row = zeph_db::query_as::<_, (i64, String)>(sql!(
            "SELECT version, created_at FROM compression_guidelines \
             WHERE conversation_id = ? OR conversation_id IS NULL \
             ORDER BY CASE WHEN conversation_id IS NOT NULL THEN 0 ELSE 1 END, \
                      version DESC \
             LIMIT 1"
        ))
        .bind(conversation_id.map(|c| c.0)) // lgtm[rust/cleartext-logging]
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.unwrap_or((0, String::new())))
    }

    /// Save a new version of the compression guidelines.
    ///
    /// When `conversation_id` is `Some`, the guidelines are scoped to that conversation.
    /// When `None`, the guidelines are global (apply as fallback for all conversations).
    ///
    /// Inserts a new row; older versions are retained for audit.
    /// Returns the new version number.
    ///
    /// Note: version numbers are globally sequential across all conversation scopes —
    /// they are not per-conversation counters. The UNIQUE(version) constraint from
    /// migration 033 is preserved.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails (including FK violation if
    /// `conversation_id` does not reference a valid conversation row).
    pub async fn save_compression_guidelines(
        &self,
        guidelines: &str,
        token_count: i64,
        conversation_id: Option<ConversationId>,
    ) -> Result<i64, MemoryError> {
        // The INSERT...SELECT computes MAX(version)+1 across all rows (global + per-conversation)
        // and inserts it in a single statement. SQLite's single-writer WAL guarantee makes this
        // atomic — no concurrent writer can observe the same MAX and produce a duplicate version.
        let new_version: i64 = zeph_db::query_scalar(
            sql!("INSERT INTO compression_guidelines (version, guidelines, token_count, conversation_id) \
             SELECT COALESCE(MAX(version), 0) + 1, ?, ?, ? \
             FROM compression_guidelines \
             RETURNING version"),
        )
        .bind(guidelines)
        .bind(token_count)
        .bind(conversation_id.map(|c| c.0))
        .fetch_one(&self.pool)
        .await?;
        Ok(new_version)
    }

    /// Log a compression failure pair.
    ///
    /// Both `compressed_context` and `failure_reason` are redacted
    /// (see [`redact_sensitive`]) and then truncated to 4096 chars.
    /// Returns the inserted row id.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails.
    pub async fn log_compression_failure(
        &self,
        conversation_id: ConversationId,
        compressed_context: &str,
        failure_reason: &str,
    ) -> Result<i64, MemoryError> {
        // Redact first, then truncate, so the stored prefix is the redacted form.
        let ctx = redact_sensitive(compressed_context);
        let ctx = truncate_field(&ctx);
        let reason = redact_sensitive(failure_reason);
        let reason = truncate_field(&reason);
        let id = zeph_db::query_scalar(sql!(
            "INSERT INTO compression_failure_pairs \
             (conversation_id, compressed_context, failure_reason) \
             VALUES (?, ?, ?) RETURNING id"
        ))
        .bind(conversation_id.0)
        .bind(ctx)
        .bind(reason)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }

    /// Get unused failure pairs (oldest first), up to `limit`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_unused_failure_pairs(
        &self,
        limit: usize,
    ) -> Result<Vec<CompressionFailurePair>, MemoryError> {
        // usize may exceed i64 on 64-bit targets; saturate instead of erroring.
        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let rows = zeph_db::query_as::<_, (i64, i64, String, String, String)>(sql!(
            "SELECT id, conversation_id, compressed_context, failure_reason, created_at \
             FROM compression_failure_pairs \
             WHERE used_in_update = 0 \
             ORDER BY created_at ASC \
             LIMIT ?"
        ))
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(
                |(id, cid, ctx, reason, created_at)| CompressionFailurePair {
                    id,
                    conversation_id: ConversationId(cid),
                    compressed_context: ctx,
                    failure_reason: reason,
                    created_at,
                },
            )
            .collect())
    }

    /// Mark failure pairs as consumed by the updater.
    ///
    /// # Errors
    ///
    /// Returns an error if the database update fails.
    pub async fn mark_failure_pairs_used(&self, ids: &[i64]) -> Result<(), MemoryError> {
        if ids.is_empty() {
            return Ok(());
        }
        // The dynamically-built query string contains only locally generated
        // `?` placeholders; every id value is bound, so there is no injection
        // surface despite the format!.
        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!(
            "UPDATE compression_failure_pairs SET used_in_update = 1 WHERE id IN ({placeholders})"
        );
        let mut q = zeph_db::query(&query);
        for id in ids {
            q = q.bind(id);
        }
        q.execute(&self.pool).await?;
        Ok(())
    }

    /// Count unused failure pairs.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn count_unused_failure_pairs(&self) -> Result<i64, MemoryError> {
        let count = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM compression_failure_pairs WHERE used_in_update = 0"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }

    /// Delete old used failure pairs, keeping the most recent `keep_recent` unused pairs.
    ///
    /// Removes all rows where `used_in_update = 1`. Unused rows are managed by the
    /// `max_stored_pairs` enforcement below: if there are more than `keep_recent` unused pairs,
    /// the oldest excess rows are deleted.
    ///
    /// NOTE(review): the two DELETEs run as separate statements, not in one
    /// transaction; rows inserted between them are still handled correctly
    /// (each statement re-evaluates its own predicate).
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn cleanup_old_failure_pairs(&self, keep_recent: usize) -> Result<(), MemoryError> {
        // Delete all used pairs (they've already been processed).
        zeph_db::query(sql!(
            "DELETE FROM compression_failure_pairs WHERE used_in_update = 1"
        ))
        .execute(&self.pool)
        .await?;

        // Keep only the most recent `keep_recent` unused pairs.
        let keep = i64::try_from(keep_recent).unwrap_or(i64::MAX);
        zeph_db::query(sql!(
            "DELETE FROM compression_failure_pairs \
             WHERE used_in_update = 0 \
             AND id NOT IN ( \
                 SELECT id FROM compression_failure_pairs \
                 WHERE used_in_update = 0 \
                 ORDER BY created_at DESC \
                 LIMIT ? \
             )"
        ))
        .bind(keep)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
}
321
#[cfg(test)]
mod tests {
    use super::*;

    // pool_size=1 is required: SQLite :memory: creates an isolated database per
    // connection, so multiple connections would each see an empty schema.
    async fn make_store() -> SqliteStore {
        SqliteStore::with_pool_size(":memory:", 1)
            .await
            .expect("in-memory SqliteStore")
    }

    #[tokio::test]
    async fn load_guidelines_meta_returns_defaults_when_empty() {
        let store = make_store().await;
        let (version, created_at) = store.load_compression_guidelines_meta(None).await.unwrap();
        assert_eq!(version, 0);
        assert!(created_at.is_empty());
    }

    #[tokio::test]
    async fn load_guidelines_meta_returns_version_and_created_at() {
        let store = make_store().await;
        store
            .save_compression_guidelines("keep file paths", 4, None)
            .await
            .unwrap();
        let (version, created_at) = store.load_compression_guidelines_meta(None).await.unwrap();
        assert_eq!(version, 1);
        assert!(!created_at.is_empty(), "created_at should be populated");
    }

    #[tokio::test]
    async fn load_guidelines_returns_defaults_when_empty() {
        let store = make_store().await;
        let (version, text) = store.load_compression_guidelines(None).await.unwrap();
        assert_eq!(version, 0);
        assert!(text.is_empty());
    }

    #[tokio::test]
    async fn save_and_load_guidelines() {
        let store = make_store().await;
        let v1 = store
            .save_compression_guidelines("always preserve file paths", 4, None)
            .await
            .unwrap();
        assert_eq!(v1, 1);
        let v2 = store
            .save_compression_guidelines(
                "always preserve file paths\nalways preserve errors",
                8,
                None,
            )
            .await
            .unwrap();
        assert_eq!(v2, 2);
        // Loading should return the latest version.
        let (v, text) = store.load_compression_guidelines(None).await.unwrap();
        assert_eq!(v, 2);
        assert!(text.contains("errors"));
    }

    #[tokio::test]
    async fn load_guidelines_prefers_conversation_specific() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .save_compression_guidelines("global rule", 2, None)
            .await
            .unwrap();
        store
            .save_compression_guidelines("conversation rule", 2, Some(cid))
            .await
            .unwrap();
        let (_, text) = store.load_compression_guidelines(Some(cid)).await.unwrap();
        assert_eq!(text, "conversation rule");
    }

    #[tokio::test]
    async fn load_guidelines_falls_back_to_global() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .save_compression_guidelines("global rule", 2, None)
            .await
            .unwrap();
        // No conversation-specific guidelines; should fall back to global.
        let (_, text) = store.load_compression_guidelines(Some(cid)).await.unwrap();
        assert_eq!(text, "global rule");
    }

    #[tokio::test]
    async fn load_guidelines_none_returns_global_only() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .save_compression_guidelines("conversation rule", 2, Some(cid))
            .await
            .unwrap();
        // None should not return conversation-scoped guidelines.
        let (version, text) = store.load_compression_guidelines(None).await.unwrap();
        assert_eq!(version, 0);
        assert!(text.is_empty());
    }

    #[tokio::test]
    async fn load_guidelines_scope_isolation() {
        let store = make_store().await;
        let cid_a = ConversationId(store.create_conversation().await.unwrap().0);
        let cid_b = ConversationId(store.create_conversation().await.unwrap().0);

        // Global guideline (conversation_id = None) — visible to all conversations.
        store
            .save_compression_guidelines("Use bullet points", 1, None)
            .await
            .unwrap();
        // Conversation-A-specific guideline — must NOT be visible to B.
        store
            .save_compression_guidelines("Be concise", 2, Some(cid_a))
            .await
            .unwrap();

        // Conversation B: gets only the global guideline, not A's.
        let (_, text_b) = store
            .load_compression_guidelines(Some(cid_b))
            .await
            .unwrap();
        assert_eq!(
            text_b, "Use bullet points",
            "conversation B must see global guideline"
        );

        // Conversation A: gets its own guideline (preferred over global).
        let (_, text_a) = store
            .load_compression_guidelines(Some(cid_a))
            .await
            .unwrap();
        assert_eq!(
            text_a, "Be concise",
            "conversation A must prefer its own guideline over global"
        );

        // None scope: gets only the global guideline.
        let (_, text_global) = store.load_compression_guidelines(None).await.unwrap();
        assert_eq!(
            text_global, "Use bullet points",
            "None scope must see only the global guideline"
        );
    }

    #[tokio::test]
    async fn save_with_nonexistent_conversation_id_fails() {
        let store = make_store().await;
        let nonexistent = ConversationId(99999);
        let result = store
            .save_compression_guidelines("rule", 1, Some(nonexistent))
            .await;
        assert!(
            result.is_err(),
            "FK violation expected for nonexistent conversation_id"
        );
    }

    #[tokio::test]
    async fn cascade_delete_removes_conversation_guidelines() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .save_compression_guidelines("rule", 1, Some(cid))
            .await
            .unwrap();
        // Delete the conversation row directly — should cascade-delete the guideline.
        zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
            .bind(cid.0)
            .execute(store.pool())
            .await
            .unwrap();
        let (version, text) = store.load_compression_guidelines(Some(cid)).await.unwrap();
        assert_eq!(version, 0);
        assert!(text.is_empty());
    }

    #[tokio::test]
    async fn log_and_count_failure_pairs() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .log_compression_failure(cid, "compressed ctx", "i don't recall that")
            .await
            .unwrap();
        let count = store.count_unused_failure_pairs().await.unwrap();
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn get_unused_pairs_sorted_oldest_first() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .log_compression_failure(cid, "ctx A", "reason A")
            .await
            .unwrap();
        store
            .log_compression_failure(cid, "ctx B", "reason B")
            .await
            .unwrap();
        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].compressed_context, "ctx A");
    }

    #[tokio::test]
    async fn mark_pairs_used_reduces_count() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        let id = store
            .log_compression_failure(cid, "ctx", "reason")
            .await
            .unwrap();
        store.mark_failure_pairs_used(&[id]).await.unwrap();
        let count = store.count_unused_failure_pairs().await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn cleanup_deletes_used_and_trims_unused() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        // Add 3 pairs and mark 1 used.
        let id1 = store
            .log_compression_failure(cid, "ctx1", "r1")
            .await
            .unwrap();
        store
            .log_compression_failure(cid, "ctx2", "r2")
            .await
            .unwrap();
        store
            .log_compression_failure(cid, "ctx3", "r3")
            .await
            .unwrap();
        store.mark_failure_pairs_used(&[id1]).await.unwrap();
        // Cleanup: keep at most 1 unused.
        store.cleanup_old_failure_pairs(1).await.unwrap();
        let count = store.count_unused_failure_pairs().await.unwrap();
        assert_eq!(count, 1, "only 1 unused pair should remain");
    }

    #[test]
    fn redact_sensitive_api_key_is_redacted() {
        let result = redact_sensitive("token sk-abc123def456 used for auth");
        assert!(result.contains("[REDACTED]"), "API key must be redacted");
        assert!(
            !result.contains("sk-abc123"),
            "original key must not appear"
        );
    }

    #[test]
    fn redact_sensitive_plain_text_borrows() {
        let text = "safe text, no secrets here";
        let result = redact_sensitive(text);
        assert!(
            matches!(result, Cow::Borrowed(_)),
            "plain text must return Cow::Borrowed (zero-alloc)"
        );
    }

    #[test]
    fn redact_sensitive_filesystem_path_is_redacted() {
        let result = redact_sensitive("config loaded from /Users/dev/project/config.toml");
        assert!(
            result.contains("[PATH]"),
            "filesystem path must be redacted"
        );
        assert!(
            !result.contains("/Users/dev/"),
            "original path must not appear"
        );
    }

    #[test]
    fn redact_sensitive_combined_secret_and_path() {
        let result = redact_sensitive("key sk-abc at /home/user/file");
        assert!(result.contains("[REDACTED]"), "secret must be redacted");
        assert!(result.contains("[PATH]"), "path must be redacted");
    }

    #[tokio::test]
    async fn log_compression_failure_redacts_secrets() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .log_compression_failure(cid, "token sk-abc123def456 used for auth", "context lost")
            .await
            .unwrap();
        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
        assert_eq!(pairs.len(), 1);
        assert!(
            pairs[0].compressed_context.contains("[REDACTED]"),
            "stored context must have redacted secret"
        );
        assert!(
            !pairs[0].compressed_context.contains("sk-abc123"),
            "stored context must not contain raw secret"
        );
    }

    #[tokio::test]
    async fn log_compression_failure_redacts_paths() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .log_compression_failure(cid, "/Users/dev/project/config.toml was loaded", "lost")
            .await
            .unwrap();
        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
        assert!(
            pairs[0].compressed_context.contains("[PATH]"),
            "stored context must have redacted path"
        );
        assert!(
            !pairs[0].compressed_context.contains("/Users/dev/"),
            "stored context must not contain raw path"
        );
    }

    #[tokio::test]
    async fn log_compression_failure_reason_also_redacted() {
        let store = make_store().await;
        let cid = ConversationId(store.create_conversation().await.unwrap().0);
        store
            .log_compression_failure(cid, "some context", "secret ghp_abc123xyz was leaked")
            .await
            .unwrap();
        let pairs = store.get_unused_failure_pairs(10).await.unwrap();
        assert!(
            pairs[0].failure_reason.contains("[REDACTED]"),
            "failure_reason must also be redacted"
        );
        assert!(
            !pairs[0].failure_reason.contains("ghp_abc123xyz"),
            "raw secret must not appear in failure_reason"
        );
    }

    // Pure synchronous logic — no store and no `.await`, so this is a plain
    // `#[test]` instead of `#[tokio::test]` (the async variant spun up a
    // Tokio runtime for nothing).
    #[test]
    fn truncate_field_respects_char_boundary() {
        let s = "а".repeat(5000); // Cyrillic 'а', 2 bytes each
        let truncated = truncate_field(&s);
        assert!(truncated.len() <= MAX_FIELD_CHARS);
        assert!(s.is_char_boundary(truncated.len()));
    }

    #[tokio::test]
    async fn unique_constraint_prevents_duplicate_version() {
        let store = make_store().await;
        // Insert version 1 via the public API.
        store
            .save_compression_guidelines("first", 1, None)
            .await
            .unwrap();
        // store.pool() access is intentional: we need direct pool access to bypass
        // the public API and test the UNIQUE constraint at the SQL level.
        let result = zeph_db::query(
            sql!("INSERT INTO compression_guidelines (version, guidelines, token_count) VALUES (1, 'dup', 0)"),
        )
        .execute(store.pool())
        .await;
        assert!(
            result.is_err(),
            "duplicate version insert should violate UNIQUE constraint"
        );
    }

    #[test]
    fn redact_sensitive_bearer_token_is_redacted() {
        let result =
            redact_sensitive("Authorization: Bearer eyJhbGciOiJSUzI1NiJ9.payload.signature");
        assert!(
            result.contains("[REDACTED]"),
            "Bearer token must be redacted: {result}"
        );
        assert!(
            !result.contains("eyJhbGciOiJSUzI1NiJ9"),
            "raw JWT header must not appear: {result}"
        );
        assert!(
            result.contains("Authorization:"),
            "header name must be preserved: {result}"
        );
    }

    #[test]
    fn redact_sensitive_bearer_token_case_insensitive() {
        let result =
            redact_sensitive("authorization: bearer eyJhbGciOiJSUzI1NiJ9.payload.signature");
        assert!(
            result.contains("[REDACTED]"),
            "Bearer header match must be case-insensitive: {result}"
        );
    }

    #[test]
    fn redact_sensitive_standalone_jwt_is_redacted() {
        let jwt = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIn0.SflKxwRJSMeKKF2";
        let input = format!("token value: {jwt} was found in logs");
        let result = redact_sensitive(&input);
        assert!(
            result.contains("[REDACTED_JWT]"),
            "standalone JWT must be replaced with [REDACTED_JWT]: {result}"
        );
        assert!(
            !result.contains("eyJhbGci"),
            "raw JWT must not appear: {result}"
        );
    }

    #[test]
    fn redact_sensitive_mixed_content_all_redacted() {
        let input =
            "key sk-abc123 at /home/user/f with Authorization: Bearer eyJhbG.pay.sig and eyJx.b.c";
        let result = redact_sensitive(input);
        assert!(result.contains("[REDACTED]"), "API key must be redacted");
        assert!(result.contains("[PATH]"), "path must be redacted");
        assert!(!result.contains("sk-abc123"), "raw API key must not appear");
        assert!(!result.contains("eyJhbG"), "raw JWT must not appear");
    }

    #[test]
    fn redact_sensitive_partial_jwt_not_redacted() {
        // A string starting with eyJ but missing the third segment is not a valid JWT.
        let input = "eyJhbGciOiJSUzI1NiJ9.onlytwoparts";
        let result = redact_sensitive(input);
        // Should not be replaced by the JWT regex (only two dot-separated parts).
        assert!(
            !result.contains("[REDACTED_JWT]"),
            "two-part eyJ string must not be treated as JWT: {result}"
        );
        // No substitution occurred — must be zero-alloc Cow::Borrowed.
        assert!(
            matches!(result, Cow::Borrowed(_)),
            "no-match input must return Cow::Borrowed: {result}"
        );
    }

    #[test]
    fn redact_sensitive_alg_none_jwt_empty_signature_redacted() {
        // alg=none JWTs have an empty third segment: <header>.<payload>.
        let input =
            "token: eyJhbGciOiJub25lIn0.eyJzdWIiOiJ1c2VyIn0. was submitted without signature";
        let result = redact_sensitive(input);
        assert!(
            result.contains("[REDACTED_JWT]"),
            "alg=none JWT with empty signature must be redacted: {result}"
        );
        assert!(
            !result.contains("eyJhbGciOiJub25lIn0"),
            "raw alg=none JWT header must not appear: {result}"
        );
    }

    /// Concurrent saves must produce strictly unique versions with no collisions.
    ///
    /// Uses a file-backed database because `SQLite` `:memory:` creates an isolated
    /// database per connection — a multi-connection pool over `:memory:` would give
    /// each writer its own empty schema and cannot test shared-state atomicity.
    #[tokio::test]
    async fn concurrent_saves_produce_unique_versions() {
        use std::collections::HashSet;
        use std::sync::Arc;

        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("test.db");
        let store = Arc::new(
            SqliteStore::with_pool_size(db_path.to_str().expect("utf8 path"), 4)
                .await
                .expect("file-backed SqliteStore"),
        );

        let tasks: Vec<_> = (0..8_i64)
            .map(|i| {
                let s = Arc::clone(&store);
                tokio::spawn(async move {
                    s.save_compression_guidelines(&format!("guideline {i}"), i, None)
                        .await
                        .expect("concurrent save must succeed")
                })
            })
            .collect();

        let mut versions = HashSet::new();
        for task in tasks {
            let v = task.await.expect("task must not panic");
            assert!(versions.insert(v), "version {v} appeared more than once");
        }
        assert_eq!(
            versions.len(),
            8,
            "all 8 saves must produce distinct versions"
        );
    }
}