use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{LlmProvider as _, Message, MessageMetadata, Role};
pub use zeph_config::memory::OpticalForgettingConfig;
use crate::error::MemoryError;
use crate::store::SqliteStore;
/// How much of a message's text is still stored after optical forgetting.
///
/// The textual forms returned by [`ContentFidelity::as_str`] are the same literals that the
/// SQL in this file compares against / writes into the `content_fidelity` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ContentFidelity {
    /// No degradation applied yet; rows in this state feed the compression stage.
    Full,
    /// A compressed rendition has been written to `compressed_content`.
    Compressed,
    /// `content` has been replaced by a short summary and `compressed_content` cleared.
    SummaryOnly,
}

impl ContentFidelity {
    /// Every variant, ordered from least to most degraded. Used to drive parsing so that
    /// `FromStr` and `as_str` can never disagree.
    const ALL: [Self; 3] = [Self::Full, Self::Compressed, Self::SummaryOnly];

    /// Returns the canonical, case-sensitive name of this fidelity level.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::SummaryOnly => "SummaryOnly",
            Self::Compressed => "Compressed",
            Self::Full => "Full",
        }
    }
}

impl std::fmt::Display for ContentFidelity {
    /// Writes exactly the text of [`ContentFidelity::as_str`]; width/fill flags are ignored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}

impl std::str::FromStr for ContentFidelity {
    type Err = String;

    /// Parses the exact (case-sensitive) names produced by [`ContentFidelity::as_str`].
    ///
    /// # Errors
    ///
    /// Returns `"unknown content_fidelity: <input>"` for any other string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .ok_or_else(|| format!("unknown content_fidelity: {s}"))
    }
}
/// Counters describing the outcome of a single optical-forgetting sweep.
///
/// All counters start at zero (`Default`) and are filled in by
/// `run_optical_forgetting_sweep`.
#[derive(Debug, Default)]
pub struct OpticalForgettingResult {
/// Messages whose compressed rendition was stored (`Full` → `Compressed`).
pub compressed: u32,
/// Messages whose content was replaced by a summary (`Compressed` → `SummaryOnly`).
pub summarized: u32,
/// Messages left unchanged because the compression or summarization call failed.
pub skipped: u32,
}
/// Runs optical-forgetting sweeps periodically until `cancel` fires.
///
/// Returns immediately when `config.enabled` is `false`. Otherwise a sweep is executed every
/// `config.sweep_interval_secs` seconds; the first sweep happens one full interval after
/// startup. Sweep failures are logged and the loop keeps running.
///
/// * `store` – message store the sweeps read from and write to.
/// * `provider` – LLM provider used for compression / summarization.
/// * `config` – sweep settings (interval, batch size, age thresholds).
/// * `forgetting_floor` – importance threshold forwarded to each sweep.
/// * `cancel` – token that stops the loop; it is observed between sweeps, so a sweep that is
///   already running finishes first.
pub async fn start_optical_forgetting_loop(
    store: Arc<SqliteStore>,
    provider: AnyProvider,
    config: OpticalForgettingConfig,
    forgetting_floor: f32,
    cancel: CancellationToken,
) {
    if !config.enabled {
        tracing::debug!("optical forgetting disabled (optical_forgetting.enabled = false)");
        return;
    }

    let provider = Arc::new(provider);

    // `tokio::time::interval` panics on a zero period, so a misconfigured
    // `sweep_interval_secs = 0` is clamped to one second instead of aborting the task.
    if config.sweep_interval_secs == 0 {
        tracing::warn!("optical_forgetting: sweep_interval_secs = 0 is invalid, using 1 second");
    }
    let period = Duration::from_secs(config.sweep_interval_secs.max(1));
    let mut ticker = tokio::time::interval(period);
    // A sweep can outlast the interval (one LLM call per candidate). With the default `Burst`
    // behaviour the missed ticks would fire back-to-back sweeps; `Delay` re-spaces them.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick of an interval completes immediately; consume it so the first sweep
    // happens one period after startup.
    ticker.tick().await;

    loop {
        tokio::select! {
            () = cancel.cancelled() => {
                tracing::debug!("optical forgetting loop shutting down");
                return;
            }
            _ = ticker.tick() => {}
        }

        tracing::debug!("optical_forgetting: starting sweep");
        let start = std::time::Instant::now();

        match run_optical_forgetting_sweep(&store, &provider, &config, forgetting_floor).await {
            Ok(r) => {
                tracing::info!(
                    compressed = r.compressed,
                    summarized = r.summarized,
                    skipped = r.skipped,
                    elapsed_ms = start.elapsed().as_millis(),
                    "optical_forgetting: sweep complete"
                );
            }
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    elapsed_ms = start.elapsed().as_millis(),
                    "optical_forgetting: sweep failed, will retry"
                );
            }
        }
    }
}
/// Performs one optical-forgetting pass over the message store.
///
/// The pass has two stages:
///
/// 1. `Full` candidates are compressed by the LLM and stored as `Compressed`.
/// 2. `Compressed` candidates are distilled to a summary and stored as `SummaryOnly`.
///
/// A message is counted in `skipped` (and left untouched) when the LLM call fails or when
/// the LLM returns empty / whitespace-only output. The latter matters because the summary
/// stage overwrites `content`: persisting an empty reply would destroy the message text.
///
/// This function does not consult `config.enabled`; that check is done by
/// `start_optical_forgetting_loop`.
///
/// # Errors
///
/// Returns the underlying [`MemoryError`] as soon as a candidate query or a store update
/// fails; messages processed before the failure stay updated.
#[tracing::instrument(name = "memory.optical_forgetting", skip_all)]
pub async fn run_optical_forgetting_sweep(
    store: &SqliteStore,
    provider: &Arc<AnyProvider>,
    config: &OpticalForgettingConfig,
    forgetting_floor: f32,
) -> Result<OpticalForgettingResult, MemoryError> {
    let mut result = OpticalForgettingResult::default();

    // Stage 1: Full → Compressed.
    let full_candidates = fetch_full_candidates(store, config, forgetting_floor).await?;
    for (msg_id, content) in full_candidates {
        match compress_content(provider, &content).await {
            // Never persist an empty rendition: it would later be "summarized" into nothing.
            Ok(compressed) if compressed.trim().is_empty() => {
                tracing::warn!(
                    msg_id,
                    "optical_forgetting: compression returned empty output, skipping"
                );
                result.skipped += 1;
            }
            Ok(compressed) => {
                store_compressed(store, msg_id, &compressed).await?;
                result.compressed += 1;
                tracing::debug!(msg_id, "optical_forgetting: Full → Compressed");
            }
            Err(e) => {
                tracing::warn!(error = %e, msg_id, "optical_forgetting: compression failed, skipping");
                result.skipped += 1;
            }
        }
    }

    // Stage 2: Compressed → SummaryOnly.
    let compressed_candidates =
        fetch_compressed_candidates(store, config, forgetting_floor).await?;
    for (msg_id, compressed_content) in compressed_candidates {
        match summarize_content(provider, &compressed_content).await {
            // The summary replaces `content`, so an empty one would erase the message.
            Ok(summary) if summary.trim().is_empty() => {
                tracing::warn!(
                    msg_id,
                    "optical_forgetting: summarization returned empty output, skipping"
                );
                result.skipped += 1;
            }
            Ok(summary) => {
                store_summary_only(store, msg_id, &summary).await?;
                result.summarized += 1;
                tracing::debug!(msg_id, "optical_forgetting: Compressed → SummaryOnly");
            }
            Err(e) => {
                tracing::warn!(error = %e, msg_id, "optical_forgetting: summarization failed, skipping");
                result.skipped += 1;
            }
        }
    }

    Ok(result)
}
/// Loads the next batch of `Full` messages that are old enough to be compressed.
///
/// A row qualifies when it is not soft-deleted, its `importance_score` is either NULL or at
/// least `forgetting_floor`, and its id lies at least `config.compress_after_turns` below
/// the current maximum id (id distance is the age measure used here). Rows come back in
/// ascending id order, at most `config.sweep_batch_size` of them, as `(id, content)` pairs.
///
/// # Errors
///
/// Propagates any database error raised while running the query.
async fn fetch_full_candidates(
    store: &SqliteStore,
    config: &OpticalForgettingConfig,
    forgetting_floor: f32,
) -> Result<Vec<(i64, String)>, MemoryError> {
    const QUERY: &str = "SELECT id, content FROM messages
WHERE content_fidelity = 'Full'
AND deleted_at IS NULL
AND (importance_score IS NULL OR importance_score >= ?)
AND id <= (SELECT COALESCE(MAX(id), 0) - ? FROM messages)
ORDER BY id ASC
LIMIT ?";

    let min_id_distance = i64::from(config.compress_after_turns);
    // A batch size that does not fit in i64 effectively means "no limit".
    let batch_limit = i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX);

    let candidates = sqlx::query_as::<_, (i64, String)>(QUERY)
        .bind(forgetting_floor)
        .bind(min_id_distance)
        .bind(batch_limit)
        .fetch_all(store.pool())
        .await?;

    Ok(candidates)
}
/// Loads the next batch of `Compressed` messages that are old enough to be summarized.
///
/// A row qualifies when it is not soft-deleted, has a non-NULL `compressed_content`, its
/// `importance_score` is either NULL or at least `forgetting_floor`, and its id lies at
/// least `config.summarize_after_turns` below the current maximum id. Rows come back in
/// ascending id order, at most `config.sweep_batch_size` of them, as
/// `(id, compressed_content)` pairs.
///
/// The NULL check is part of the query rather than a post-filter: rows without compressed
/// text can never be summarized, and filtering them after `LIMIT` would let them occupy
/// batch slots on every sweep and starve real candidates.
///
/// # Errors
///
/// Propagates any database error raised while running the query.
async fn fetch_compressed_candidates(
    store: &SqliteStore,
    config: &OpticalForgettingConfig,
    forgetting_floor: f32,
) -> Result<Vec<(i64, String)>, MemoryError> {
    let rows = sqlx::query_as::<_, (i64, String)>(
        "SELECT id, compressed_content FROM messages
         WHERE content_fidelity = 'Compressed'
           AND compressed_content IS NOT NULL
           AND deleted_at IS NULL
           AND (importance_score IS NULL OR importance_score >= ?)
           AND id <= (SELECT COALESCE(MAX(id), 0) - ? FROM messages)
         ORDER BY id ASC
         LIMIT ?",
    )
    .bind(forgetting_floor)
    .bind(i64::from(config.summarize_after_turns))
    // A batch size that does not fit in i64 effectively means "no limit".
    .bind(i64::try_from(config.sweep_batch_size).unwrap_or(i64::MAX))
    .fetch_all(store.pool())
    .await?;

    Ok(rows)
}
/// Records the compressed rendition of message `msg_id` and marks the row `Compressed`.
///
/// Only `content_fidelity` and `compressed_content` are written; the `content` column is
/// not part of the update.
///
/// # Errors
///
/// Propagates any database error raised by the `UPDATE`.
async fn store_compressed(
    store: &SqliteStore,
    msg_id: i64,
    compressed: &str,
) -> Result<(), MemoryError> {
    const STATEMENT: &str = "UPDATE messages
SET content_fidelity = 'Compressed', compressed_content = ?
WHERE id = ?";

    let update = sqlx::query(STATEMENT).bind(compressed).bind(msg_id);
    update.execute(store.pool()).await?;

    Ok(())
}
/// Replaces the stored text of message `msg_id` with `summary` and marks it `SummaryOnly`.
///
/// The update overwrites `content` with the summary and resets `compressed_content` to
/// NULL, so neither the previous content nor the compressed rendition remains in the row.
///
/// # Errors
///
/// Propagates any database error raised by the `UPDATE`.
async fn store_summary_only(
    store: &SqliteStore,
    msg_id: i64,
    summary: &str,
) -> Result<(), MemoryError> {
    const STATEMENT: &str = "UPDATE messages
SET content_fidelity = 'SummaryOnly', content = ?, compressed_content = NULL
WHERE id = ?";

    let update = sqlx::query(STATEMENT).bind(summary).bind(msg_id);
    update.execute(store.pool()).await?;

    Ok(())
}
/// Asks the LLM for a compressed rendition of one message body.
///
/// The input is passed through
/// `zeph_common::sanitize::strip_control_chars_preserve_whitespace`, cut to its first 2000
/// characters, and sent together with a fixed system prompt. The reply is returned with
/// surrounding whitespace trimmed.
///
/// # Errors
///
/// * [`MemoryError::Timeout`] when the provider does not answer within 15 seconds.
/// * [`MemoryError::Llm`] when the provider call itself fails.
#[tracing::instrument(name = "memory.optical_forgetting.compress", skip_all, err)]
async fn compress_content(
    provider: &Arc<AnyProvider>,
    content: &str,
) -> Result<String, MemoryError> {
    const SYSTEM_PROMPT: &str = "You compress conversation messages into concise summaries that \
        preserve all key facts, decisions, and action items. Output only the summary text, \
        no preamble.";
    const MAX_INPUT_CHARS: usize = 2000;
    const LLM_TIMEOUT: Duration = Duration::from_secs(15);

    let sanitized = zeph_common::sanitize::strip_control_chars_preserve_whitespace(content);
    let excerpt: String = sanitized.chars().take(MAX_INPUT_CHARS).collect();

    let system = Message {
        role: Role::System,
        content: SYSTEM_PROMPT.to_owned(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    let user = Message {
        role: Role::User,
        content: format!("Compress this message:\n\n{excerpt}"),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    let messages = vec![system, user];

    // Outer `Err` = the deadline elapsed; inner `Err` = the provider reported a failure.
    let reply = match tokio::time::timeout(LLM_TIMEOUT, provider.chat(&messages)).await {
        Ok(outcome) => outcome.map_err(MemoryError::Llm)?,
        Err(_elapsed) => {
            return Err(MemoryError::Timeout(
                "optical_forgetting: compress timed out".into(),
            ));
        }
    };

    Ok(reply.trim().to_owned())
}
/// Asks the LLM to distill already-compressed text into a single sentence.
///
/// The input is passed through
/// `zeph_common::sanitize::strip_control_chars_preserve_whitespace`, cut to its first 1000
/// characters, and sent together with a fixed system prompt. The reply is returned with
/// surrounding whitespace trimmed.
///
/// # Errors
///
/// * [`MemoryError::Timeout`] when the provider does not answer within 10 seconds.
/// * [`MemoryError::Llm`] when the provider call itself fails.
#[tracing::instrument(name = "memory.optical_forgetting.summarize", skip_all, err)]
async fn summarize_content(
    provider: &Arc<AnyProvider>,
    content: &str,
) -> Result<String, MemoryError> {
    const SYSTEM_PROMPT: &str = "You distill summaries into single sentences that capture the \
        essential fact or outcome. Output only the one-sentence summary, no preamble.";
    const MAX_INPUT_CHARS: usize = 1000;
    const LLM_TIMEOUT: Duration = Duration::from_secs(10);

    let sanitized = zeph_common::sanitize::strip_control_chars_preserve_whitespace(content);
    let excerpt: String = sanitized.chars().take(MAX_INPUT_CHARS).collect();

    let system = Message {
        role: Role::System,
        content: SYSTEM_PROMPT.to_owned(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    let user = Message {
        role: Role::User,
        content: format!("Distill into one sentence:\n\n{excerpt}"),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    let messages = vec![system, user];

    // Outer `Err` = the deadline elapsed; inner `Err` = the provider reported a failure.
    let reply = match tokio::time::timeout(LLM_TIMEOUT, provider.chat(&messages)).await {
        Ok(outcome) => outcome.map_err(MemoryError::Llm)?,
        Err(_elapsed) => {
            return Err(MemoryError::Timeout(
                "optical_forgetting: summarize timed out".into(),
            ));
        }
    };

    Ok(reply.trim().to_owned())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::SqliteStore;
    use std::sync::Arc;
    use zeph_config::providers::ProviderName;
    use zeph_llm::any::AnyProvider;
    use zeph_llm::mock::MockProvider;

    /// Opens a fresh in-memory store for one test.
    async fn in_memory_store() -> Arc<SqliteStore> {
        let store = SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new");
        Arc::new(store)
    }

    /// Wraps a mock LLM in the provider type the sweep expects.
    fn mock_provider(mock: MockProvider) -> Arc<AnyProvider> {
        Arc::new(AnyProvider::Mock(mock))
    }

    /// Enabled configuration with the thresholds the sweep tests start from.
    fn enabled_config() -> OpticalForgettingConfig {
        OpticalForgettingConfig {
            enabled: true,
            compress_after_turns: 100,
            summarize_after_turns: 500,
            sweep_interval_secs: 3600,
            sweep_batch_size: 50,
            compress_provider: ProviderName::default(),
        }
    }

    #[test]
    fn content_fidelity_round_trip() {
        let variants = [
            ContentFidelity::Full,
            ContentFidelity::Compressed,
            ContentFidelity::SummaryOnly,
        ];
        for fidelity in variants {
            let text = fidelity.as_str();
            let reparsed: ContentFidelity = text.parse().expect("should parse");
            assert_eq!(reparsed, fidelity);
            assert_eq!(format!("{fidelity}"), text);
        }
    }

    #[test]
    fn content_fidelity_unknown_string_errors() {
        let parsed = "unknown".parse::<ContentFidelity>();
        assert!(parsed.is_err());
    }

    #[test]
    fn optical_forgetting_config_defaults() {
        let defaults = OpticalForgettingConfig::default();
        assert!(!defaults.enabled);
        assert_eq!(defaults.compress_after_turns, 100);
        assert_eq!(defaults.summarize_after_turns, 500);
        assert_eq!(defaults.sweep_interval_secs, 3600);
        assert_eq!(defaults.sweep_batch_size, 50);
    }

    #[test]
    fn optical_forgetting_result_default() {
        let empty = OpticalForgettingResult::default();
        assert_eq!(empty.compressed, 0);
        assert_eq!(empty.summarized, 0);
        assert_eq!(empty.skipped, 0);
    }

    /// A single message is never 100 ids below the maximum id, so nothing qualifies.
    #[tokio::test]
    async fn sweep_skips_when_no_candidates_old_enough() {
        let store = in_memory_store().await;
        let provider = mock_provider(MockProvider::default());

        let cid = store.create_conversation().await.expect("conversation");
        store
            .save_message(cid, "user", "hello")
            .await
            .expect("save_message");

        let config = enabled_config();
        let result = run_optical_forgetting_sweep(&store, &provider, &config, 0.0)
            .await
            .expect("sweep");

        assert_eq!(
            result.compressed, 0,
            "no message should be compressed when not old enough"
        );
        assert_eq!(result.summarized, 0);
    }

    /// With `compress_after_turns = 0` every stored message is an immediate candidate.
    #[tokio::test]
    async fn sweep_compresses_eligible_full_message() {
        let store = in_memory_store().await;
        let provider = mock_provider(MockProvider::with_responses(vec![
            "compressed summary".to_owned(),
        ]));

        let cid = store.create_conversation().await.expect("conversation");
        store
            .save_message(cid, "user", "first message")
            .await
            .expect("save_message 1");
        store
            .save_message(cid, "user", "second message")
            .await
            .expect("save_message 2");

        let config = OpticalForgettingConfig {
            compress_after_turns: 0,
            ..enabled_config()
        };
        let result = run_optical_forgetting_sweep(&store, &provider, &config, 0.0)
            .await
            .expect("sweep");

        assert!(
            result.compressed >= 1,
            "at least one message must be compressed"
        );
    }

    /// The sweep function itself never reads `enabled`; the counters stay at zero here
    /// because the store contains no messages.
    #[tokio::test]
    async fn sweep_disabled_returns_empty_result() {
        let store = in_memory_store().await;
        let provider = mock_provider(MockProvider::default());

        let config = OpticalForgettingConfig {
            enabled: false,
            ..Default::default()
        };
        let result = run_optical_forgetting_sweep(&store, &provider, &config, 0.0)
            .await
            .expect("sweep with disabled config");

        assert_eq!(result.compressed, 0);
        assert_eq!(result.summarized, 0);
    }
}