zeph_memory/compression/promotion.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Background skill-promotion engine (#3305).
5//!
6//! [`PromotionEngine`] scans a window of recent episodic messages for clustering
7//! patterns and promotes qualifying clusters to SKILL.md files on disk.
8//!
9//! # C1 fix — session provenance
10//!
11//! [`PromotionInput`] carries a `conversation_id` field that is absent from
12//! [`crate::facade::MemoryMatch`]. This allows the engine to enforce the
13//! `min_sessions` heuristic without touching the public recall API.
14//!
15//! # Dependency inversion
16//!
17//! To avoid a circular crate dependency (`zeph-memory` ↔ `zeph-skills`), skill
18//! generation is delegated to [`SkillWriter`], a trait that callers in
19//! `zeph-core` implement using `zeph_skills::generator::SkillGenerator`.
20//! This keeps `zeph-memory` free of a direct `zeph-skills` dependency.
21
22use std::collections::HashSet;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use crate::error::MemoryError;
27use crate::types::{ConversationId, MessageId};
28
29// ── PromotionInput ────────────────────────────────────────────────────────────
30
31/// A single episodic message prepared for the promotion scan.
32///
33/// This type carries the `conversation_id` (session provenance) that ordinary
34/// [`crate::facade::MemoryMatch`] results do not expose, making it possible to
35/// enforce the [`PromotionConfig::min_sessions`] heuristic.
36#[derive(Debug, Clone)]
37pub struct PromotionInput {
38 /// Identifies the individual message for deduplication bookkeeping.
39 pub message_id: MessageId,
40 /// The session this message belongs to.
41 pub conversation_id: ConversationId,
42 /// Raw message content.
43 pub content: String,
44 /// Pre-computed embedding vector.
45 ///
46 /// When `None`, the scan will skip this row rather than re-embed inline on
47 /// the hot path — embedding is expensive and the promotion engine runs in the
48 /// background.
49 pub embedding: Option<Vec<f32>>,
50}
51
52// ── PromotionCandidate ────────────────────────────────────────────────────────
53
54/// A cluster of episodic messages that qualifies for promotion to a SKILL.md.
55#[derive(Debug, Clone)]
56pub struct PromotionCandidate {
57 /// A stable identifier derived from the cluster centroid (SHA-256 hex, truncated).
58 pub signature: String,
59 /// IDs of the messages in this cluster.
60 pub member_ids: Vec<MessageId>,
61 /// Distinct sessions that contributed at least one member to this cluster.
62 pub session_ids: Vec<ConversationId>,
63 /// Average embedding vector of cluster members (centroid).
64 pub centroid: Vec<f32>,
65}
66
67// ── PromotionConfig ───────────────────────────────────────────────────────────
68
/// Tunable thresholds consumed by [`PromotionEngine`].
///
/// The defaults are deliberately conservative and are expected to be adjusted
/// from real-world telemetry once the feature is in production.
#[derive(Clone, Debug)]
pub struct PromotionConfig {
    /// A cluster needs at least this many members to qualify. Default: `3`.
    pub min_occurrences: u32,
    /// A cluster needs members from at least this many distinct sessions. Default: `2`.
    pub min_sessions: u32,
    /// Cosine-similarity cut-off for clustering: a message joins a cluster when its
    /// similarity to that cluster's centroid is ≥ this value. Default: `0.85`.
    pub cluster_threshold: f32,
}

impl Default for PromotionConfig {
    /// Builds the conservative baseline: 3 occurrences, 2 sessions, 0.85 similarity.
    fn default() -> Self {
        let (min_occurrences, min_sessions, cluster_threshold) = (3, 2, 0.85);
        Self {
            min_occurrences,
            min_sessions,
            cluster_threshold,
        }
    }
}
93
94// ── SkillWriter ───────────────────────────────────────────────────────────────
95
/// Abstraction over "turn a description into a SKILL.md on disk".
///
/// The implementation lives in `zeph-core`, built on
/// `zeph_skills::generator::SkillGenerator`; the trait is declared in this crate
/// so that no circular crate dependency is introduced.
///
/// # Contract
///
/// An implementation is expected to:
/// - produce a valid SKILL.md from `description`;
/// - run any configured evaluator gate before writing;
/// - return `Ok(())` both on success and when the evaluator rejects the skill
///   (a rejection is a normal outcome, not an error);
/// - return `Err` only for hard failures such as an LLM or I/O error.
pub trait SkillWriter: Send + Sync {
    /// Generate a SKILL.md from `description` and persist it.
    ///
    /// `signature` serves as an idempotency key. Callers should check that the
    /// skill file does not already exist before invoking this method.
    ///
    /// The returned boxed future is `Send` and may borrow from `self`.
    ///
    /// # Errors
    ///
    /// The future resolves to an error string on generation or I/O failure.
    fn write_skill(
        &self,
        description: String,
        signature: String,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>;
}
123
124// ── PromotionEngine ───────────────────────────────────────────────────────────
125
126/// Background engine that scans episodic memory and promotes recurring patterns to skills.
127///
128/// Runs off the hot path, typically queued to a `JoinSet` at turn boundary.
129///
130/// # Examples
131///
132/// ```rust,no_run
133/// use std::path::PathBuf;
134/// use std::sync::Arc;
135/// use zeph_memory::compression::promotion::{PromotionEngine, PromotionConfig};
136///
137/// # struct MockWriter;
138/// # impl zeph_memory::compression::promotion::SkillWriter for MockWriter {
139/// # fn write_skill(&self, _d: String, _s: String)
140/// # -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>
141/// # { Box::pin(async { Ok(()) }) }
142/// # }
143/// let engine = PromotionEngine::new(
144/// Arc::new(MockWriter),
145/// PromotionConfig::default(),
146/// PathBuf::from("/tmp/skills"),
147/// );
148/// ```
149pub struct PromotionEngine {
150 writer: Arc<dyn SkillWriter>,
151 config: PromotionConfig,
152 output_dir: PathBuf,
153}
154
155impl PromotionEngine {
156 /// Create a new promotion engine.
157 ///
158 /// `writer` is injected from `zeph-core` and encapsulates `SkillGenerator` +
159 /// optional `SkillEvaluator`. `output_dir` is where SKILL.md directories are created.
160 #[must_use]
161 pub fn new(writer: Arc<dyn SkillWriter>, config: PromotionConfig, output_dir: PathBuf) -> Self {
162 Self {
163 writer,
164 config,
165 output_dir,
166 }
167 }
168
169 /// Scan a recent-episodic window and return clusters that qualify for promotion.
170 ///
171 /// Clustering is greedy: each message is assigned to the first cluster whose centroid
172 /// has cosine similarity ≥ `config.cluster_threshold`; if no cluster matches, a new
173 /// cluster is created. A cluster qualifies when both `min_occurrences` and
174 /// `min_sessions` are satisfied.
175 ///
176 /// Messages without embeddings (`embedding == None`) are silently skipped.
177 ///
178 /// # Panics
179 ///
180 /// Does not panic in practice — the `unwrap` on `embedding` is guarded by the
181 /// `filter(|p| p.embedding.is_some())` step immediately above.
182 ///
183 /// # Errors
184 ///
185 /// Returns [`MemoryError::Promotion`] if embeddings have inconsistent dimensions.
186 #[tracing::instrument(name = "memory.compression.promote.scan", skip_all,
187 fields(window_len = window.len()))]
188 pub async fn scan(
189 &self,
190 window: &[PromotionInput],
191 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
192 // Filter to messages that have embeddings.
193 let embeds: Vec<&PromotionInput> =
194 window.iter().filter(|p| p.embedding.is_some()).collect();
195
196 if embeds.is_empty() {
197 return Ok(vec![]);
198 }
199
200 // Safety: filtered to `is_some()` above.
201 let dim = embeds[0].embedding.as_ref().unwrap().len();
202
203 // Greedy centroid clustering.
204 struct Cluster {
205 centroid: Vec<f32>,
206 member_ids: Vec<MessageId>,
207 session_ids: HashSet<ConversationId>,
208 }
209
210 let mut clusters: Vec<Cluster> = Vec::new();
211
212 for input in &embeds {
213 let emb = input.embedding.as_ref().unwrap();
214 if emb.len() != dim {
215 return Err(MemoryError::Promotion(format!(
216 "embedding dimension mismatch: expected {dim}, got {}",
217 emb.len()
218 )));
219 }
220
221 // Find the first cluster within the similarity threshold.
222 let mut assigned = false;
223 for cluster in &mut clusters {
224 let sim = cosine_similarity(emb, &cluster.centroid);
225 if sim >= self.config.cluster_threshold {
226 // Update centroid (running average).
227 #[allow(clippy::cast_precision_loss)]
228 let n = cluster.member_ids.len() as f32;
229 for (c, v) in cluster.centroid.iter_mut().zip(emb.iter()) {
230 *c = (*c * n + v) / (n + 1.0);
231 }
232 cluster.member_ids.push(input.message_id);
233 cluster.session_ids.insert(input.conversation_id);
234 assigned = true;
235 break;
236 }
237 }
238 if !assigned {
239 clusters.push(Cluster {
240 centroid: emb.clone(),
241 member_ids: vec![input.message_id],
242 session_ids: std::iter::once(input.conversation_id).collect(),
243 });
244 }
245 }
246
247 // Filter clusters that meet both thresholds.
248 let candidates = clusters
249 .into_iter()
250 .filter(|c| {
251 u32::try_from(c.member_ids.len()).unwrap_or(u32::MAX) >= self.config.min_occurrences
252 && u32::try_from(c.session_ids.len()).unwrap_or(u32::MAX)
253 >= self.config.min_sessions
254 })
255 .map(|c| {
256 let signature = cluster_signature(&c.centroid);
257 PromotionCandidate {
258 signature,
259 member_ids: c.member_ids,
260 session_ids: c.session_ids.into_iter().collect(),
261 centroid: c.centroid,
262 }
263 })
264 .collect();
265
266 Ok(candidates)
267 }
268
269 /// Generate and persist a SKILL.md for `candidate`. Idempotent by signature.
270 ///
271 /// On evaluator rejection the method returns `Ok(())` — rejection is a normal outcome.
272 ///
273 /// # Errors
274 ///
275 /// Returns [`MemoryError::Promotion`] on generation, evaluator, or disk-write failure.
276 #[tracing::instrument(name = "memory.compression.promote.persist", skip_all,
277 fields(signature = %candidate.signature))]
278 pub async fn promote(&self, candidate: &PromotionCandidate) -> Result<(), MemoryError> {
279 // Idempotency: skip if already exists.
280 let skill_name = format!("promoted-pattern-{}", &candidate.signature[..12]);
281 let skill_dir = self.output_dir.join(&skill_name);
282 if skill_dir.exists() {
283 tracing::debug!(signature = %candidate.signature, "promotion candidate already exists, skipping");
284 return Ok(());
285 }
286
287 let member_count = candidate.member_ids.len();
288 let session_count = candidate.session_ids.len();
289 let description = format!(
290 "Recurring procedural pattern detected across {member_count} messages in \
291 {session_count} sessions. Generate a concise SKILL.md capturing the common \
292 tool-use pattern or workflow. Signature: {}.",
293 candidate.signature
294 );
295
296 self.writer
297 .write_skill(description, candidate.signature.clone())
298 .await
299 .map_err(MemoryError::Promotion)
300 }
301}
302
303// ── Helper functions ──────────────────────────────────────────────────────────
304
/// Cosine similarity of two vectors that are expected to have the same length.
///
/// Yields `0.0` when either input is empty or has an L2 norm below
/// `f32::EPSILON`; otherwise the result is clamped to `[-1.0, 1.0]`.
/// The equal-length expectation is only checked in debug builds.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    debug_assert_eq!(a.len(), b.len());

    let l2_norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    let (len_a, len_b) = (l2_norm(a), l2_norm(b));

    // A (near-)zero vector has no direction, so similarity is defined as 0.
    if len_a < f32::EPSILON || len_b < f32::EPSILON {
        return 0.0;
    }

    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    (dot / (len_a * len_b)).clamp(-1.0, 1.0)
}
317
/// Derive a deterministic signature from a centroid vector.
///
/// The signature is the 64-bit FNV-1a hash of the centroid's IEEE-754 bit
/// patterns (each `f32` fed as four little-endian bytes), rendered as 16
/// lowercase hex characters. It is a deduplication key, not a security hash.
///
/// A hand-rolled FNV-1a is used instead of `DefaultHasher` because the signature
/// is compared against directory names that outlive the process. The standard
/// library documents `DefaultHasher`'s algorithm as unspecified across releases,
/// and hashing a `u32` through it writes native-endian bytes.
///
/// The hash is bit-exact: no quantisation is applied, so `0.0` and `-0.0`, or
/// centroids differing in the last ULP, produce different signatures.
///
/// NOTE(review): signatures produced by the earlier `DefaultHasher`-based version
/// differ from these. Any skill directories already on disk will not be
/// recognised as duplicates.
fn cluster_signature(centroid: &[f32]) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let hash = centroid
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    format!("{hash:016x}")
}
331
332// ── Tests ─────────────────────────────────────────────────────────────────────
333
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Test double that records every description handed to `write_skill`.
    struct RecordingWriter {
        written: Mutex<Vec<String>>,
    }

    impl SkillWriter for RecordingWriter {
        fn write_skill(
            &self,
            description: String,
            _signature: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>
        {
            self.written.lock().unwrap().push(description);
            Box::pin(async { Ok(()) })
        }
    }

    /// Engine backed by a fresh `RecordingWriter`, rooted at `/tmp`.
    fn engine_with(config: PromotionConfig) -> PromotionEngine {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        PromotionEngine::new(writer, config, PathBuf::from("/tmp"))
    }

    /// Thresholds shared by the qualification tests.
    fn strict_config() -> PromotionConfig {
        PromotionConfig {
            min_occurrences: 3,
            min_sessions: 2,
            cluster_threshold: 0.90,
        }
    }

    /// Input with an embedding attached.
    fn make_input(id: i64, cid: i64, content: &str, emb: Vec<f32>) -> PromotionInput {
        PromotionInput {
            message_id: MessageId(id),
            conversation_id: ConversationId(cid),
            content: content.to_string(),
            embedding: Some(emb),
        }
    }

    /// `n`-dimensional vector with `val` in slot 0, scaled to unit length.
    fn unit_vec(n: usize, val: f32) -> Vec<f32> {
        let mut v = vec![0.0_f32; n];
        v[0] = val;
        let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        for x in &mut v {
            *x /= norm;
        }
        v
    }

    #[test]
    fn cosine_similarity_identical() {
        let v = vec![1.0_f32, 0.0, 0.0];
        let sim = cosine_similarity(&v, &v);
        assert!((sim - 1.0).abs() < 1e-6);
    }

    #[test]
    fn cosine_similarity_orthogonal() {
        let a = vec![1.0_f32, 0.0];
        let b = vec![0.0_f32, 1.0];
        let sim = cosine_similarity(&a, &b);
        assert!((sim - 0.0).abs() < 1e-6);
    }

    #[tokio::test]
    async fn scan_returns_empty_for_no_embeddings() {
        let engine = engine_with(PromotionConfig::default());
        let window = vec![PromotionInput {
            message_id: MessageId(1),
            conversation_id: ConversationId(1),
            content: "hello".into(),
            embedding: None,
        }];

        let candidates = engine.scan(&window).await.unwrap();

        assert!(candidates.is_empty());
    }

    #[tokio::test]
    async fn scan_qualifies_cluster_meeting_thresholds() {
        let engine = engine_with(strict_config());

        // 4 nearly identical vectors from 3 distinct sessions.
        let base = unit_vec(4, 1.0);
        let window: Vec<PromotionInput> = [(1, 1, "a"), (2, 1, "b"), (3, 2, "c"), (4, 3, "d")]
            .into_iter()
            .map(|(id, cid, content)| make_input(id, cid, content, base.clone()))
            .collect();

        let candidates = engine.scan(&window).await.unwrap();

        assert_eq!(candidates.len(), 1, "expected 1 qualifying cluster");
        let cluster = &candidates[0];
        assert_eq!(cluster.member_ids.len(), 4);
        assert_eq!(cluster.session_ids.len(), 3);
    }

    #[tokio::test]
    async fn scan_rejects_cluster_below_min_sessions() {
        let engine = engine_with(strict_config());

        // 4 messages but all from the same session.
        let base = unit_vec(4, 1.0);
        let mut window = Vec::new();
        for id in 1..=4 {
            window.push(make_input(id, 1, "x", base.clone()));
        }

        let candidates = engine.scan(&window).await.unwrap();

        assert!(
            candidates.is_empty(),
            "should reject cluster with only 1 session"
        );
    }

    #[tokio::test]
    async fn scan_errors_on_dimension_mismatch() {
        let engine = engine_with(PromotionConfig::default());

        let window = vec![
            make_input(1, 1, "a", vec![1.0, 0.0, 0.0]),
            make_input(2, 2, "b", vec![0.0, 1.0]), // wrong dimension
        ];

        let result = engine.scan(&window).await;

        assert!(result.is_err(), "expected error on dimension mismatch");
    }
}
469}