zeph_memory/compression/promotion.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Background skill-promotion engine (#3305).
5//!
6//! [`PromotionEngine`] scans a window of recent episodic messages for clustering
7//! patterns and promotes qualifying clusters to SKILL.md files on disk.
8//!
9//! # C1 fix — session provenance
10//!
11//! [`PromotionInput`] carries a `conversation_id` field that is absent from
12//! [`crate::facade::MemoryMatch`]. This allows the engine to enforce the
13//! `min_sessions` heuristic without touching the public recall API.
14//!
15//! # Dependency inversion
16//!
17//! To avoid a circular crate dependency (`zeph-memory` ↔ `zeph-skills`), skill
18//! generation is delegated to [`SkillWriter`], a trait that callers in
19//! `zeph-core` implement using `zeph_skills::generator::SkillGenerator`.
20//! This keeps `zeph-memory` free of a direct `zeph-skills` dependency.
21
22use std::collections::HashSet;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use crate::error::MemoryError;
27use crate::types::{ConversationId, MessageId};
28
29// ── PromotionInput ────────────────────────────────────────────────────────────
30
31/// A single episodic message prepared for the promotion scan.
32///
33/// This type carries the `conversation_id` (session provenance) that ordinary
34/// [`crate::facade::MemoryMatch`] results do not expose, making it possible to
35/// enforce the [`PromotionConfig::min_sessions`] heuristic.
36#[derive(Debug, Clone)]
37pub struct PromotionInput {
38 /// Identifies the individual message for deduplication bookkeeping.
39 pub message_id: MessageId,
40 /// The session this message belongs to.
41 pub conversation_id: ConversationId,
42 /// Raw message content.
43 pub content: String,
44 /// Pre-computed embedding vector.
45 ///
46 /// When `None`, the scan will skip this row rather than re-embed inline on
47 /// the hot path — embedding is expensive and the promotion engine runs in the
48 /// background.
49 pub embedding: Option<Vec<f32>>,
50}
51
52// ── PromotionCandidate ────────────────────────────────────────────────────────
53
54/// A cluster of episodic messages that qualifies for promotion to a SKILL.md.
55#[derive(Debug, Clone)]
56pub struct PromotionCandidate {
57 /// A stable identifier derived from the cluster centroid (SHA-256 hex, truncated).
58 pub signature: String,
59 /// IDs of the messages in this cluster.
60 pub member_ids: Vec<MessageId>,
61 /// Distinct sessions that contributed at least one member to this cluster.
62 pub session_ids: Vec<ConversationId>,
63 /// Average embedding vector of cluster members (centroid).
64 pub centroid: Vec<f32>,
65}
66
67// ── PromotionConfig ───────────────────────────────────────────────────────────
68
/// Tunable thresholds that control what [`PromotionEngine`] promotes.
///
/// The defaults are deliberately conservative and are expected to be revisited
/// once production telemetry is available.
#[derive(Debug, Clone)]
pub struct PromotionConfig {
    /// A cluster needs at least this many member messages. Default: `3`.
    pub min_occurrences: u32,
    /// A cluster needs members from at least this many distinct sessions. Default: `2`.
    pub min_sessions: u32,
    /// Cosine-similarity cut-off used while clustering: a message joins a cluster
    /// when its similarity to that cluster's centroid is ≥ this value. Default: `0.85`.
    pub cluster_threshold: f32,
}

impl Default for PromotionConfig {
    /// Builds the conservative default configuration (`3` / `2` / `0.85`).
    fn default() -> Self {
        let min_occurrences = 3;
        let min_sessions = 2;
        let cluster_threshold = 0.85;
        Self {
            min_occurrences,
            min_sessions,
            cluster_threshold,
        }
    }
}
93
94// ── SkillWriter ───────────────────────────────────────────────────────────────
95
/// Sink that turns a textual description into a SKILL.md on disk.
///
/// The trait lives in this crate so that `zeph-memory` does not have to depend
/// on `zeph-skills`; the real implementation is provided by `zeph-core` on top
/// of `zeph_skills::generator::SkillGenerator`.
///
/// # Contract
///
/// An implementation is expected to:
/// - produce a valid SKILL.md from `description`;
/// - run any configured evaluator gate before anything is written;
/// - report `Ok(())` both when the skill was written and when the evaluator
///   rejected it (a rejection is a normal outcome, not an error);
/// - report `Err` only for hard failures such as an LLM or I/O error.
pub trait SkillWriter: Send + Sync {
    /// Generates a SKILL.md from `description` and persists it.
    ///
    /// `signature` serves as the idempotency key. The caller is responsible for
    /// checking that the skill does not already exist before invoking this method.
    ///
    /// The returned future is boxed and may borrow from `self`.
    ///
    /// # Errors
    ///
    /// The future resolves to an error string when generation or I/O fails.
    fn write_skill(
        &self,
        description: String,
        signature: String,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>;
}
123
124// ── PromotionEngine ───────────────────────────────────────────────────────────
125
126/// Background engine that scans episodic memory and promotes recurring patterns to skills.
127///
128/// Runs off the hot path, typically queued to a `JoinSet` at turn boundary.
129///
130/// # Examples
131///
132/// ```rust,no_run
133/// use std::path::PathBuf;
134/// use std::sync::Arc;
135/// use zeph_memory::compression::promotion::{PromotionEngine, PromotionConfig};
136///
137/// # struct MockWriter;
138/// # impl zeph_memory::compression::promotion::SkillWriter for MockWriter {
139/// # fn write_skill(&self, _d: String, _s: String)
140/// # -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>
141/// # { Box::pin(async { Ok(()) }) }
142/// # }
143/// let engine = PromotionEngine::new(
144/// Arc::new(MockWriter),
145/// PromotionConfig::default(),
146/// PathBuf::from("/tmp/skills"),
147/// );
148/// ```
149pub struct PromotionEngine {
150 writer: Arc<dyn SkillWriter>,
151 config: PromotionConfig,
152 output_dir: PathBuf,
153}
154
155impl PromotionEngine {
156 /// Create a new promotion engine.
157 ///
158 /// `writer` is injected from `zeph-core` and encapsulates `SkillGenerator` +
159 /// optional `SkillEvaluator`. `output_dir` is where SKILL.md directories are created.
160 #[must_use]
161 pub fn new(writer: Arc<dyn SkillWriter>, config: PromotionConfig, output_dir: PathBuf) -> Self {
162 Self {
163 writer,
164 config,
165 output_dir,
166 }
167 }
168
169 /// Scan a recent-episodic window and return clusters that qualify for promotion.
170 ///
171 /// Clustering is greedy: each message is assigned to the first cluster whose centroid
172 /// has cosine similarity ≥ `config.cluster_threshold`; if no cluster matches, a new
173 /// cluster is created. A cluster qualifies when both `min_occurrences` and
174 /// `min_sessions` are satisfied.
175 ///
176 /// Messages without embeddings (`embedding == None`) are silently skipped.
177 ///
178 /// # Panics
179 ///
180 /// Does not panic in practice — the `unwrap` on `embedding` is guarded by the
181 /// `filter(|p| p.embedding.is_some())` step immediately above.
182 ///
183 /// # Errors
184 ///
185 /// Returns [`MemoryError::Promotion`] if embeddings have inconsistent dimensions.
186 #[tracing::instrument(name = "memory.compression.promote.scan", skip_all,
187 fields(window_len = window.len()))]
188 pub async fn scan(
189 &self,
190 window: &[PromotionInput],
191 ) -> Result<Vec<PromotionCandidate>, MemoryError> {
192 // Filter to messages that have embeddings.
193 let embeds: Vec<&PromotionInput> =
194 window.iter().filter(|p| p.embedding.is_some()).collect();
195
196 if embeds.is_empty() {
197 return Ok(vec![]);
198 }
199
200 // Safety: filtered to `is_some()` above.
201 let dim = embeds[0].embedding.as_ref().unwrap().len();
202
203 // Greedy centroid clustering.
204 struct Cluster {
205 centroid: Vec<f32>,
206 member_ids: Vec<MessageId>,
207 session_ids: HashSet<ConversationId>,
208 }
209
210 let mut clusters: Vec<Cluster> = Vec::new();
211
212 for input in &embeds {
213 let emb = input.embedding.as_ref().unwrap();
214 if emb.len() != dim {
215 return Err(MemoryError::Promotion(format!(
216 "embedding dimension mismatch: expected {dim}, got {}",
217 emb.len()
218 )));
219 }
220
221 // Find the first cluster within the similarity threshold.
222 let mut assigned = false;
223 for cluster in &mut clusters {
224 let sim = cosine_similarity(emb, &cluster.centroid);
225 if sim >= self.config.cluster_threshold {
226 // Update centroid (running average).
227 #[allow(clippy::cast_precision_loss)]
228 let n = cluster.member_ids.len() as f32;
229 for (c, v) in cluster.centroid.iter_mut().zip(emb.iter()) {
230 *c = (*c * n + v) / (n + 1.0);
231 }
232 cluster.member_ids.push(input.message_id);
233 cluster.session_ids.insert(input.conversation_id);
234 assigned = true;
235 break;
236 }
237 }
238 if !assigned {
239 clusters.push(Cluster {
240 centroid: emb.clone(),
241 member_ids: vec![input.message_id],
242 session_ids: std::iter::once(input.conversation_id).collect(),
243 });
244 }
245 }
246
247 // Filter clusters that meet both thresholds.
248 let candidates = clusters
249 .into_iter()
250 .filter(|c| {
251 u32::try_from(c.member_ids.len()).unwrap_or(u32::MAX) >= self.config.min_occurrences
252 && u32::try_from(c.session_ids.len()).unwrap_or(u32::MAX)
253 >= self.config.min_sessions
254 })
255 .map(|c| {
256 let signature = cluster_signature(&c.centroid);
257 PromotionCandidate {
258 signature,
259 member_ids: c.member_ids,
260 session_ids: c.session_ids.into_iter().collect(),
261 centroid: c.centroid,
262 }
263 })
264 .collect();
265
266 Ok(candidates)
267 }
268
269 /// Generate and persist a SKILL.md for `candidate`. Idempotent by signature.
270 ///
271 /// On evaluator rejection the method returns `Ok(())` — rejection is a normal outcome.
272 ///
273 /// # Errors
274 ///
275 /// Returns [`MemoryError::Promotion`] on generation, evaluator, or disk-write failure.
276 #[tracing::instrument(name = "memory.compression.promote.persist", skip_all,
277 fields(signature = %candidate.signature))]
278 pub async fn promote(&self, candidate: &PromotionCandidate) -> Result<(), MemoryError> {
279 // Idempotency: skip if already exists.
280 let skill_name = format!("promoted-pattern-{}", &candidate.signature[..12]);
281 let skill_dir = self.output_dir.join(&skill_name);
282 if skill_dir.exists() {
283 tracing::debug!(signature = %candidate.signature, "promotion candidate already exists, skipping");
284 return Ok(());
285 }
286
287 let member_count = candidate.member_ids.len();
288 let session_count = candidate.session_ids.len();
289 let description = format!(
290 "Recurring procedural pattern detected across {member_count} messages in \
291 {session_count} sessions. Generate a concise SKILL.md capturing the common \
292 tool-use pattern or workflow. Signature: {}.",
293 candidate.signature
294 );
295
296 self.writer
297 .write_skill(description, candidate.signature.clone())
298 .await
299 .map_err(MemoryError::Promotion)
300 }
301}
302
303// ── Helper functions ──────────────────────────────────────────────────────────
304
/// Cosine of the angle between two vectors of equal length.
///
/// Yields `0.0` whenever either vector is empty or has a Euclidean norm below
/// `f32::EPSILON`; otherwise the result is clamped to `[-1.0, 1.0]`.
/// Equal lengths are checked only in debug builds.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    debug_assert_eq!(a.len(), b.len());

    let magnitude = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();

    let inner: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let mag_a = magnitude(a);
    let mag_b = magnitude(b);

    // Kept as `<` comparisons on purpose: a NaN norm must not count as degenerate.
    let degenerate = mag_a < f32::EPSILON || mag_b < f32::EPSILON;
    if degenerate {
        0.0
    } else {
        (inner / (mag_a * mag_b)).clamp(-1.0, 1.0)
    }
}
317
/// Derive a stable signature from a centroid vector.
///
/// The signature is the 64-bit FNV-1a hash of the centroid's `f32` bit patterns
/// (each taken in little-endian byte order), rendered as 16 lowercase hexadecimal
/// digits. An empty centroid yields the FNV offset basis, `cbf29ce484222325`.
///
/// The exact bit patterns are hashed, so any difference between two centroids —
/// including `0.0` versus `-0.0` — produces a different signature.
fn cluster_signature(centroid: &[f32]) -> String {
    // This is a deduplication key, not a security hash, so a small hand-rolled
    // FNV-1a avoids a crypto dependency. FNV-1a is fully specified, unlike
    // `DefaultHasher`, whose algorithm may change between Rust releases and is
    // therefore unsuitable for keys that outlive a single build.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut hash = FNV_OFFSET_BASIS;
    for value in centroid {
        // Fixed byte order keeps the signature identical across platforms.
        for byte in value.to_bits().to_le_bytes() {
            hash ^= u64::from(byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
    }
    format!("{hash:016x}")
}
331
332// ── Tests ─────────────────────────────────────────────────────────────────────
333
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Test double that records each description it is asked to persist.
    struct RecordingWriter {
        written: Mutex<Vec<String>>,
    }

    impl SkillWriter for RecordingWriter {
        fn write_skill(
            &self,
            description: String,
            _signature: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>
        {
            // Recording happens eagerly, before the returned future is polled.
            self.written.lock().unwrap().push(description);
            Box::pin(async { Ok(()) })
        }
    }

    /// Engine backed by a fresh `RecordingWriter`, writing under `/tmp`.
    fn engine_with(config: PromotionConfig) -> PromotionEngine {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        PromotionEngine::new(writer, config, PathBuf::from("/tmp"))
    }

    /// Default minimums with a tighter similarity threshold of `0.90`.
    fn strict_config() -> PromotionConfig {
        PromotionConfig {
            min_occurrences: 3,
            min_sessions: 2,
            cluster_threshold: 0.90,
        }
    }

    /// Input row that always carries an embedding.
    fn make_input(id: i64, cid: i64, content: &str, emb: Vec<f32>) -> PromotionInput {
        PromotionInput {
            message_id: MessageId(id),
            conversation_id: ConversationId(cid),
            content: content.to_string(),
            embedding: Some(emb),
        }
    }

    /// `n`-dimensional vector with `val` in the first slot, scaled to unit length.
    fn unit_vec(n: usize, val: f32) -> Vec<f32> {
        let mut v = vec![0.0_f32; n];
        v[0] = val;
        let norm: f32 = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        v.iter_mut().for_each(|x| *x /= norm);
        v
    }

    #[test]
    fn cosine_similarity_identical() {
        let v = vec![1.0_f32, 0.0, 0.0];
        let sim = cosine_similarity(&v, &v);
        assert!((sim - 1.0).abs() < 1e-6);
    }

    #[test]
    fn cosine_similarity_orthogonal() {
        let a = vec![1.0_f32, 0.0];
        let b = vec![0.0_f32, 1.0];
        let sim = cosine_similarity(&a, &b);
        assert!((sim - 0.0).abs() < 1e-6);
    }

    #[tokio::test]
    async fn scan_returns_empty_for_no_embeddings() {
        let engine = engine_with(PromotionConfig::default());
        let window = vec![PromotionInput {
            message_id: MessageId(1),
            conversation_id: ConversationId(1),
            content: "hello".into(),
            embedding: None,
        }];

        let candidates = engine.scan(&window).await.unwrap();
        assert!(candidates.is_empty());
    }

    #[tokio::test]
    async fn scan_qualifies_cluster_meeting_thresholds() {
        let engine = engine_with(strict_config());

        // 4 identical vectors spread over 3 distinct sessions.
        let base = unit_vec(4, 1.0);
        let window: Vec<PromotionInput> = [(1, 1, "a"), (2, 1, "b"), (3, 2, "c"), (4, 3, "d")]
            .iter()
            .map(|&(id, cid, text)| make_input(id, cid, text, base.clone()))
            .collect();

        let candidates = engine.scan(&window).await.unwrap();
        assert_eq!(candidates.len(), 1, "expected 1 qualifying cluster");
        let c = &candidates[0];
        assert_eq!(c.member_ids.len(), 4);
        assert_eq!(c.session_ids.len(), 3);
    }

    #[tokio::test]
    async fn scan_rejects_cluster_below_min_sessions() {
        let engine = engine_with(strict_config());

        // 4 messages, but every one of them comes from session 1.
        let base = unit_vec(4, 1.0);
        let mut window = Vec::new();
        for i in 1..=4 {
            window.push(make_input(i, 1, "x", base.clone()));
        }

        let candidates = engine.scan(&window).await.unwrap();
        assert!(
            candidates.is_empty(),
            "should reject cluster with only 1 session"
        );
    }

    #[tokio::test]
    async fn scan_errors_on_dimension_mismatch() {
        let engine = engine_with(PromotionConfig::default());

        let three_dim = make_input(1, 1, "a", vec![1.0, 0.0, 0.0]);
        let two_dim = make_input(2, 2, "b", vec![0.0, 1.0]); // wrong dimension
        let window = vec![three_dim, two_dim];

        let result = engine.scan(&window).await;
        assert!(result.is_err(), "expected error on dimension mismatch");
    }
}
471}