zeph-memory 0.21.2

Semantic memory with SQLite and Qdrant for Zeph agent
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! `ScrapMem` optical forgetting — progressive content-fidelity decay (issue #3713).
//!
//! Transitions old messages through resolution levels by compressing their content via LLM:
//!
//! 1. **Full** — original message content, unchanged.
//! 2. **Compressed** — LLM-generated summary preserving key facts (stored in `compressed_content`).
//! 3. **`SummaryOnly`** — one-line distilled fact (replaces original content, most compact).
//!
//! The sweep is orthogonal to `SleepGate` (which decays importance scores):
//! - `SleepGate` prunes by importance score below a floor.
//! - Optical forgetting compresses by age, measured as how many messages (across all
//!   conversations) have been inserted since the message was created.
//!
//! Both can run concurrently; optical forgetting skips messages below the `SleepGate` prune
//! threshold to avoid compressing content that will be pruned shortly anyway.
//!
//! # Invariants
//!
//! - Messages below the `SleepGate` `forgetting_floor` are skipped.
//! - The `episodic_events` table (EM-Graph) references messages by FK; events survive
//!   optical forgetting because messages are never deleted — only their content is replaced.
//! - `focus_pinned` is a runtime-only `MessageMetadata` field and is not stored in the
//!   `messages` table, so it cannot be filtered at the SQL level. The agent loop is
//!   responsible for not triggering optical forgetting on pinned sessions.

use std::sync::Arc;
use std::time::Duration;

use tokio_util::sync::CancellationToken;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{LlmProvider as _, Message, MessageMetadata, Role};

pub use zeph_config::memory::OpticalForgettingConfig;

use crate::error::MemoryError;
use crate::store::SqliteStore;

// ── Content fidelity tier ──────────────────────────────────────────────────────

/// How much of a message's original text survives optical forgetting.
///
/// Not to be confused with [`crate::compression::CompressionLevel`]: that enum describes the
/// *kind* of memory (episodic vs. declarative abstraction), whereas `ContentFidelity` describes
/// how faithfully the stored text matches what was originally written. The two axes are
/// independent — a message may be `CompressionLevel::Episodic` and `ContentFidelity::Compressed`
/// at the same time.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ContentFidelity {
    /// The message text exactly as it was saved.
    Full,
    /// An LLM-written summary that keeps the key facts.
    Compressed,
    /// A single distilled sentence. No further transitions happen from here.
    SummaryOnly,
}

impl ContentFidelity {
    /// Every variant, ordered from highest to lowest fidelity.
    const ALL: [Self; 3] = [Self::Full, Self::Compressed, Self::SummaryOnly];

    /// Returns the exact text persisted in the `content_fidelity` column for this level.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            ContentFidelity::SummaryOnly => "SummaryOnly",
            ContentFidelity::Compressed => "Compressed",
            ContentFidelity::Full => "Full",
        }
    }
}

impl std::fmt::Display for ContentFidelity {
    /// Writes the same label as [`ContentFidelity::as_str`]; width/fill flags are ignored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}

impl std::str::FromStr for ContentFidelity {
    type Err = String;

    /// Parses a `content_fidelity` column value; matching is exact and case-sensitive.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .ok_or_else(|| format!("unknown content_fidelity: {s}"))
    }
}

// ── Result ────────────────────────────────────────────────────────────────────

/// Outcome of a single optical forgetting sweep.
///
/// Only messages returned by the candidate queries are counted. Messages excluded by those
/// queries — not old enough yet, soft-deleted, or with `importance_score` below the
/// `SleepGate` floor — appear in none of these counters.
#[derive(Debug, Default)]
pub struct OpticalForgettingResult {
    /// Messages transitioned `Full` → `Compressed` this sweep.
    pub compressed: u32,
    /// Messages transitioned `Compressed` → `SummaryOnly` this sweep.
    pub summarized: u32,
    /// Candidates left at their current fidelity because their LLM step did not succeed
    /// (e.g. provider error or timeout); they remain eligible and are retried on a later sweep.
    ///
    /// Pinned messages are not detected by the sweep at all, so they are never counted here.
    pub skipped: u32,
}

// ── Background loop ───────────────────────────────────────────────────────────

/// Start the background optical forgetting loop.
///
/// Returns immediately (after a debug log) when `config.enabled` is false. Otherwise runs
/// [`run_optical_forgetting_sweep`] every `config.sweep_interval_secs` seconds, with the first
/// sweep one interval after the call. Sweep errors are logged and the loop keeps going.
///
/// A `sweep_interval_secs` of 0 is treated as 1 second, because `tokio::time::interval`
/// panics on a zero period. If a sweep overruns the interval, the next tick is delayed instead
/// of firing a burst of catch-up sweeps.
///
/// `cancel` is observed both while waiting for the next tick and while a sweep is in flight,
/// so shutdown is not held up by a long run of sequential LLM calls. An interrupted sweep
/// simply stops; each per-message write is a single `UPDATE` statement, so every message is
/// left either in its previous or its new state.
pub async fn start_optical_forgetting_loop(
    store: Arc<SqliteStore>,
    provider: AnyProvider,
    config: OpticalForgettingConfig,
    forgetting_floor: f32,
    cancel: CancellationToken,
) {
    if !config.enabled {
        tracing::debug!("optical forgetting disabled (optical_forgetting.enabled = false)");
        return;
    }

    let provider = Arc::new(provider);
    // `tokio::time::interval` panics on a zero period; clamp misconfigured values to 1s.
    let period = Duration::from_secs(config.sweep_interval_secs.max(1));
    let mut ticker = tokio::time::interval(period);
    // A sweep can outlast the period (many sequential LLM calls); don't fire catch-up ticks.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    ticker.tick().await; // skip first immediate tick

    loop {
        tokio::select! {
            () = cancel.cancelled() => {
                tracing::debug!("optical forgetting loop shutting down");
                return;
            }
            _ = ticker.tick() => {}
        }

        tracing::debug!("optical_forgetting: starting sweep");
        let start = std::time::Instant::now();

        // Race the sweep against shutdown so in-flight LLM calls don't delay cancellation.
        let outcome = tokio::select! {
            () = cancel.cancelled() => {
                tracing::debug!(
                    elapsed_ms = start.elapsed().as_millis(),
                    "optical_forgetting: sweep interrupted by shutdown"
                );
                return;
            }
            outcome = run_optical_forgetting_sweep(&store, &provider, &config, forgetting_floor) => {
                outcome
            }
        };

        match outcome {
            Ok(r) => {
                tracing::info!(
                    compressed = r.compressed,
                    summarized = r.summarized,
                    skipped = r.skipped,
                    elapsed_ms = start.elapsed().as_millis(),
                    "optical_forgetting: sweep complete"
                );
            }
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    elapsed_ms = start.elapsed().as_millis(),
                    "optical_forgetting: sweep failed, will retry"
                );
            }
        }
    }
}

// ── Sweep implementation ──────────────────────────────────────────────────────

/// Execute one full optical forgetting sweep.
///
/// Runs two phases, each bounded by `config.sweep_batch_size`:
///
/// 1. **Full → Compressed**: `Full` messages at least `compress_after_turns` message IDs
///    behind the newest message are sent to the LLM. The result is stored in
///    `compressed_content`, and the original `content` is left intact.
/// 2. **Compressed → SummaryOnly**: `Compressed` messages at least `summarize_after_turns`
///    message IDs behind the newest message are distilled to one sentence. That sentence
///    replaces `content`, and `compressed_content` is cleared.
///
/// Messages whose `importance_score` is below `forgetting_floor` are excluded by the candidate
/// queries, since `SleepGate` will prune them soon anyway.
///
/// If the LLM step fails (error or timeout) or returns empty text, the message is left
/// untouched and [`OpticalForgettingResult::skipped`] is incremented. The message is retried
/// on a later sweep. Empty replies are rejected because persisting them would erase the
/// message: phase 2 overwrites the original `content`.
///
/// # Errors
///
/// Returns an error if any database operation fails. Updates already applied earlier in the
/// same sweep are not rolled back.
#[tracing::instrument(name = "memory.optical_forgetting", skip_all)]
pub async fn run_optical_forgetting_sweep(
    store: &SqliteStore,
    provider: &Arc<AnyProvider>,
    config: &OpticalForgettingConfig,
    forgetting_floor: f32,
) -> Result<OpticalForgettingResult, MemoryError> {
    let mut result = OpticalForgettingResult::default();

    // Phase 1: Full → Compressed
    let full_candidates = fetch_full_candidates(store, config, forgetting_floor).await?;
    for (msg_id, content) in full_candidates {
        match compress_content(provider, &content).await {
            // An empty summary would later be distilled into garbage (or nothing) and then
            // overwrite the original content in phase 2 — never persist it.
            Ok(compressed) if compressed.is_empty() => {
                tracing::warn!(
                    msg_id,
                    "optical_forgetting: compression returned empty output, skipping"
                );
                result.skipped += 1;
            }
            Ok(compressed) => {
                store_compressed(store, msg_id, &compressed).await?;
                result.compressed += 1;
                tracing::debug!(msg_id, "optical_forgetting: Full → Compressed");
            }
            Err(e) => {
                tracing::warn!(error = %e, msg_id, "optical_forgetting: compression failed, skipping");
                result.skipped += 1;
            }
        }
    }

    // Phase 2: Compressed → SummaryOnly
    let compressed_candidates =
        fetch_compressed_candidates(store, config, forgetting_floor).await?;
    for (msg_id, compressed_content) in compressed_candidates {
        match summarize_content(provider, &compressed_content).await {
            // `store_summary_only` replaces `content`; an empty summary would wipe the message.
            Ok(summary) if summary.is_empty() => {
                tracing::warn!(
                    msg_id,
                    "optical_forgetting: summarization returned empty output, skipping"
                );
                result.skipped += 1;
            }
            Ok(summary) => {
                store_summary_only(store, msg_id, &summary).await?;
                result.summarized += 1;
                tracing::debug!(msg_id, "optical_forgetting: Compressed → SummaryOnly");
            }
            Err(e) => {
                tracing::warn!(error = %e, msg_id, "optical_forgetting: summarization failed, skipping");
                result.skipped += 1;
            }
        }
    }

    Ok(result)
}

// ── Database helpers ──────────────────────────────────────────────────────────

/// Load up to `config.sweep_batch_size` `Full` messages that are old enough to compress.
///
/// "Old enough" means the message ID is at least `config.compress_after_turns` below the
/// largest ID in `messages`, counted across all conversations. Soft-deleted rows are excluded,
/// as are rows whose `importance_score` is below `forgetting_floor`; rows with no score are
/// kept. Results are returned oldest first as `(id, content)` pairs.
async fn fetch_full_candidates(
    store: &SqliteStore,
    config: &OpticalForgettingConfig,
    forgetting_floor: f32,
) -> Result<Vec<(i64, String)>, MemoryError> {
    let age_threshold = i64::from(config.compress_after_turns);
    // Saturate rather than fail: a batch size that does not fit in i64 effectively means
    // "no limit".
    let batch_limit = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);

    // On an empty table MAX(id) is NULL. COALESCE turns it into 0, so the cutoff becomes
    // -threshold and no row qualifies. This is the same outcome the NULL comparison would
    // give, just stated explicitly.
    //
    // `focus_pinned` lives only in the runtime `MessageMetadata`, not in a column, so pinned
    // messages cannot be excluded here. NOTE(review): callers are expected to keep pinned
    // content out of optical forgetting — confirm where the agent enforces this.
    let candidates = sqlx::query_as::<_, (i64, String)>(
        "SELECT id, content FROM messages
         WHERE content_fidelity = 'Full'
           AND deleted_at IS NULL
           AND (importance_score IS NULL OR importance_score >= ?)
           AND id <= (SELECT COALESCE(MAX(id), 0) - ? FROM messages)
         ORDER BY id ASC
         LIMIT ?",
    )
    .bind(forgetting_floor)
    .bind(age_threshold)
    .bind(batch_limit)
    .fetch_all(store.pool())
    .await?;

    Ok(candidates)
}

/// Load up to `config.sweep_batch_size` `Compressed` messages that are old enough to be
/// distilled to `SummaryOnly`, returning `(id, compressed_content)` pairs oldest first.
///
/// "Old enough" means the message ID is at least `config.summarize_after_turns` below the
/// largest ID in `messages`, counted across all conversations. Soft-deleted rows are excluded,
/// as are rows whose `importance_score` is below `forgetting_floor`. Rows without any
/// `compressed_content` are also skipped, since there is nothing to summarize.
async fn fetch_compressed_candidates(
    store: &SqliteStore,
    config: &OpticalForgettingConfig,
    forgetting_floor: f32,
) -> Result<Vec<(i64, String)>, MemoryError> {
    // `compressed_content IS NOT NULL` must be filtered in SQL, not after the fetch. `LIMIT`
    // is applied first, so NULL rows dropped in Rust would still consume batch slots. Such
    // rows never change state, so enough of them at the low end of the ID range would fill
    // every batch and stall this phase permanently.
    let rows = sqlx::query_as::<_, (i64, String)>(
        "SELECT id, compressed_content FROM messages
         WHERE content_fidelity = 'Compressed'
           AND compressed_content IS NOT NULL
           AND deleted_at IS NULL
           AND (importance_score IS NULL OR importance_score >= ?)
           AND id <= (SELECT COALESCE(MAX(id), 0) - ? FROM messages)
         ORDER BY id ASC
         LIMIT ?",
    )
    .bind(forgetting_floor)
    .bind(i64::from(config.summarize_after_turns))
    .bind(i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX))
    .fetch_all(store.pool())
    .await?;

    Ok(rows)
}

/// Update a message to Compressed state, storing the LLM summary in `compressed_content`.
async fn store_compressed(
    store: &SqliteStore,
    msg_id: i64,
    compressed: &str,
) -> Result<(), MemoryError> {
    sqlx::query(
        "UPDATE messages
         SET content_fidelity = 'Compressed', compressed_content = ?
         WHERE id = ?",
    )
    .bind(compressed)
    .bind(msg_id)
    .execute(store.pool())
    .await?;
    Ok(())
}

/// Update a message to `SummaryOnly` state, replacing content with the one-line summary.
async fn store_summary_only(
    store: &SqliteStore,
    msg_id: i64,
    summary: &str,
) -> Result<(), MemoryError> {
    sqlx::query(
        "UPDATE messages
         SET content_fidelity = 'SummaryOnly', content = ?, compressed_content = NULL
         WHERE id = ?",
    )
    .bind(summary)
    .bind(msg_id)
    .execute(store.pool())
    .await?;
    Ok(())
}

// ── LLM compression helpers ───────────────────────────────────────────────────

/// Ask the LLM to produce a compressed summary of `content`.
#[tracing::instrument(name = "memory.optical_forgetting.compress", skip_all, err)]
async fn compress_content(
    provider: &Arc<AnyProvider>,
    content: &str,
) -> Result<String, MemoryError> {
    let cleaned = zeph_common::sanitize::strip_control_chars_preserve_whitespace(content);
    let snippet = cleaned.chars().take(2000).collect::<String>();
    let messages = vec![
        Message {
            role: Role::System,
            content: "You compress conversation messages into concise summaries that preserve \
                      all key facts, decisions, and action items. Output only the summary text, \
                      no preamble."
                .to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        },
        Message {
            role: Role::User,
            content: format!("Compress this message:\n\n{snippet}"),
            parts: vec![],
            metadata: MessageMetadata::default(),
        },
    ];

    let raw = tokio::time::timeout(Duration::from_secs(15), provider.chat(&messages))
        .await
        .map_err(|_| MemoryError::Timeout("optical_forgetting: compress timed out".into()))?
        .map_err(MemoryError::Llm)?;

    Ok(raw.trim().to_owned())
}

/// Ask the LLM to distill `content` into a single-line summary.
#[tracing::instrument(name = "memory.optical_forgetting.summarize", skip_all, err)]
async fn summarize_content(
    provider: &Arc<AnyProvider>,
    content: &str,
) -> Result<String, MemoryError> {
    let cleaned = zeph_common::sanitize::strip_control_chars_preserve_whitespace(content);
    let snippet = cleaned.chars().take(1000).collect::<String>();
    let messages = vec![
        Message {
            role: Role::System,
            content: "You distill summaries into single sentences that capture the essential \
                      fact or outcome. Output only the one-sentence summary, no preamble."
                .to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        },
        Message {
            role: Role::User,
            content: format!("Distill into one sentence:\n\n{snippet}"),
            parts: vec![],
            metadata: MessageMetadata::default(),
        },
    ];

    let raw = tokio::time::timeout(Duration::from_secs(10), provider.chat(&messages))
        .await
        .map_err(|_| MemoryError::Timeout("optical_forgetting: summarize timed out".into()))?
        .map_err(MemoryError::Llm)?;

    Ok(raw.trim().to_owned())
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use zeph_config::providers::ProviderName;

    #[test]
    fn content_fidelity_round_trip() {
        for fidelity in [
            ContentFidelity::Full,
            ContentFidelity::Compressed,
            ContentFidelity::SummaryOnly,
        ] {
            let s = fidelity.as_str();
            let parsed: ContentFidelity = s.parse().expect("should parse");
            assert_eq!(parsed, fidelity);
            assert_eq!(format!("{fidelity}"), s);
        }
    }

    #[test]
    fn content_fidelity_unknown_string_errors() {
        assert!("unknown".parse::<ContentFidelity>().is_err());
    }

    #[test]
    fn optical_forgetting_config_defaults() {
        let cfg = OpticalForgettingConfig::default();
        assert!(!cfg.enabled);
        assert_eq!(cfg.compress_after_turns, 100);
        assert_eq!(cfg.summarize_after_turns, 500);
        assert_eq!(cfg.sweep_interval_secs, 3600);
        assert_eq!(cfg.sweep_batch_size, 50);
    }

    #[test]
    fn optical_forgetting_result_default() {
        let r = OpticalForgettingResult::default();
        assert_eq!(r.compressed, 0);
        assert_eq!(r.summarized, 0);
        assert_eq!(r.skipped, 0);
    }

    /// Verify that `run_optical_forgetting_sweep` leaves everything alone when
    /// `compress_after_turns` is larger than the message count (nothing is old enough).
    #[tokio::test]
    async fn sweep_skips_when_no_candidates_old_enough() {
        use std::sync::Arc;

        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        use crate::store::SqliteStore;

        let store = Arc::new(
            SqliteStore::new(":memory:")
                .await
                .expect("SqliteStore::new"),
        );
        let provider = Arc::new(AnyProvider::Mock(MockProvider::default()));

        let cid = store.create_conversation().await.expect("conversation");
        store
            .save_message(cid, "user", "hello")
            .await
            .expect("save_message");

        let config = OpticalForgettingConfig {
            enabled: true,
            compress_after_turns: 100, // message is too recent
            summarize_after_turns: 500,
            sweep_interval_secs: 3600,
            sweep_batch_size: 50,
            compress_provider: ProviderName::default(),
        };
        let result = run_optical_forgetting_sweep(&store, &provider, &config, 0.0)
            .await
            .expect("sweep");

        assert_eq!(
            result.compressed, 0,
            "no message should be compressed when not old enough"
        );
        assert_eq!(result.summarized, 0);
    }

    /// Verify that `run_optical_forgetting_sweep` compresses a Full message that is
    /// old enough (`compress_after_turns` = 0) and persists the LLM output.
    #[tokio::test]
    async fn sweep_compresses_eligible_full_message() {
        use std::sync::Arc;

        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        use crate::store::SqliteStore;

        let store = Arc::new(
            SqliteStore::new(":memory:")
                .await
                .expect("SqliteStore::new"),
        );
        let mock = MockProvider::with_responses(vec!["compressed summary".to_owned()]);
        let provider = Arc::new(AnyProvider::Mock(mock));

        let cid = store.create_conversation().await.expect("conversation");
        // With `compress_after_turns = 0` every message satisfies `id <= MAX(id)`, so both are
        // candidates. Candidates are processed oldest first, so the single scripted mock
        // response is consumed by the first message.
        store
            .save_message(cid, "user", "first message")
            .await
            .expect("save_message 1");
        store
            .save_message(cid, "user", "second message")
            .await
            .expect("save_message 2");

        let config = OpticalForgettingConfig {
            enabled: true,
            compress_after_turns: 0, // everything is eligible
            summarize_after_turns: 500,
            sweep_interval_secs: 3600,
            sweep_batch_size: 50,
            compress_provider: ProviderName::default(),
        };
        let result = run_optical_forgetting_sweep(&store, &provider, &config, 0.0)
            .await
            .expect("sweep");

        // At least one message compressed. Only one mock response is scripted; what the mock
        // returns for the second call is not relied on here.
        assert!(
            result.compressed >= 1,
            "at least one message must be compressed"
        );

        // The oldest message must now be Compressed, with the LLM output stored alongside its
        // original content.
        let (fidelity, compressed) = sqlx::query_as::<_, (String, Option<String>)>(
            "SELECT content_fidelity, compressed_content FROM messages ORDER BY id ASC LIMIT 1",
        )
        .fetch_one(store.pool())
        .await
        .expect("load oldest message");
        assert_eq!(fidelity, ContentFidelity::Compressed.as_str());
        assert_eq!(compressed.as_deref(), Some("compressed summary"));
    }

    /// Verify that a sweep over an empty database is a no-op.
    ///
    /// `run_optical_forgetting_sweep` itself never reads `config.enabled`; only
    /// `start_optical_forgetting_loop` does. This test therefore checks the empty-table path
    /// with a disabled config, not an early return.
    #[tokio::test]
    async fn sweep_disabled_returns_empty_result() {
        use std::sync::Arc;

        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        use crate::store::SqliteStore;

        let store = Arc::new(
            SqliteStore::new(":memory:")
                .await
                .expect("SqliteStore::new"),
        );
        let provider = Arc::new(AnyProvider::Mock(MockProvider::default()));
        let config = OpticalForgettingConfig {
            enabled: false,
            ..Default::default()
        };
        // The sweep ignores `enabled`; with an empty DB there are no candidates, so it must
        // report no work and have no side effects.
        let result = run_optical_forgetting_sweep(&store, &provider, &config, 0.0)
            .await
            .expect("sweep with disabled config");
        assert_eq!(result.compressed, 0);
        assert_eq!(result.summarized, 0);
    }
}