Skip to main content

zeph_memory/compression/
promotion.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Background skill-promotion engine (#3305).
5//!
6//! [`PromotionEngine`] scans a window of recent episodic messages for clustering
7//! patterns and promotes qualifying clusters to SKILL.md files on disk.
8//!
9//! # C1 fix — session provenance
10//!
11//! [`PromotionInput`] carries a `conversation_id` field that is absent from
12//! [`crate::facade::MemoryMatch`].  This allows the engine to enforce the
13//! `min_sessions` heuristic without touching the public recall API.
14//!
15//! # Dependency inversion
16//!
17//! To avoid a circular crate dependency (`zeph-memory` ↔ `zeph-skills`), skill
18//! generation is delegated to [`SkillWriter`], a trait that callers in
19//! `zeph-core` implement using `zeph_skills::generator::SkillGenerator`.
20//! This keeps `zeph-memory` free of a direct `zeph-skills` dependency.
21
22use std::collections::HashSet;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use crate::error::MemoryError;
27use crate::types::{ConversationId, MessageId};
28
29// ── PromotionInput ────────────────────────────────────────────────────────────
30
31/// A single episodic message prepared for the promotion scan.
32///
33/// This type carries the `conversation_id` (session provenance) that ordinary
34/// [`crate::facade::MemoryMatch`] results do not expose, making it possible to
35/// enforce the [`PromotionConfig::min_sessions`] heuristic.
36#[derive(Debug, Clone)]
37pub struct PromotionInput {
38    /// Identifies the individual message for deduplication bookkeeping.
39    pub message_id: MessageId,
40    /// The session this message belongs to.
41    pub conversation_id: ConversationId,
42    /// Raw message content.
43    pub content: String,
44    /// Pre-computed embedding vector.
45    ///
46    /// When `None`, the scan will skip this row rather than re-embed inline on
47    /// the hot path — embedding is expensive and the promotion engine runs in the
48    /// background.
49    pub embedding: Option<Vec<f32>>,
50}
51
52// ── PromotionCandidate ────────────────────────────────────────────────────────
53
54/// A cluster of episodic messages that qualifies for promotion to a SKILL.md.
55#[derive(Debug, Clone)]
56pub struct PromotionCandidate {
57    /// A stable identifier derived from the cluster centroid (SHA-256 hex, truncated).
58    pub signature: String,
59    /// IDs of the messages in this cluster.
60    pub member_ids: Vec<MessageId>,
61    /// Distinct sessions that contributed at least one member to this cluster.
62    pub session_ids: Vec<ConversationId>,
63    /// Average embedding vector of cluster members (centroid).
64    pub centroid: Vec<f32>,
65}
66
67// ── PromotionConfig ───────────────────────────────────────────────────────────
68
/// Tuning parameters for [`PromotionEngine`].
///
/// The defaults are deliberately conservative; revisit them once real-world
/// telemetry from production is available.
#[derive(Debug, Clone)]
pub struct PromotionConfig {
    /// Smallest cluster size (member count) eligible for promotion. Default: `3`.
    pub min_occurrences: u32,
    /// Smallest number of distinct sessions a cluster must span. Default: `2`.
    pub min_sessions: u32,
    /// Cosine-similarity cut-off used while clustering: a message is merged into a
    /// cluster when its similarity to that cluster's centroid is ≥ this value.
    /// Default: `0.85`.
    pub cluster_threshold: f32,
}

impl Default for PromotionConfig {
    /// Returns the conservative defaults documented on each field.
    fn default() -> Self {
        PromotionConfig {
            cluster_threshold: 0.85,
            min_sessions: 2,
            min_occurrences: 3,
        }
    }
}
93
94// ── SkillWriter ───────────────────────────────────────────────────────────────
95
/// Sink that turns a pattern description into a SKILL.md on disk.
///
/// The trait is declared in `zeph-memory` so that `zeph-core` can implement it on top
/// of `zeph_skills::generator::SkillGenerator` without `zeph-memory` depending on
/// `zeph-skills`, which would create a circular crate dependency.
///
/// # Contract
///
/// An implementation is expected to:
/// - build a valid SKILL.md from `description`;
/// - run any configured evaluator gate before anything is written;
/// - resolve to `Ok(())` both on success and when the evaluator rejects the skill,
///   because a rejection is not an error;
/// - resolve to `Err` only for hard failures such as an LLM or I/O error.
pub trait SkillWriter: Send + Sync {
    /// Generates a SKILL.md from `description` and persists it.
    ///
    /// `signature` serves as the idempotency key; callers are responsible for
    /// checking that the skill file does not already exist before calling.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error message when generation or I/O fails.
    fn write_skill(
        &self,
        description: String,
        signature: String,
    ) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<(), String>> + Send + '_>>;
}
123
124// ── PromotionEngine ───────────────────────────────────────────────────────────
125
126/// Background engine that scans episodic memory and promotes recurring patterns to skills.
127///
128/// Runs off the hot path, typically queued to a `JoinSet` at turn boundary.
129///
130/// # Examples
131///
132/// ```rust,no_run
133/// use std::path::PathBuf;
134/// use std::sync::Arc;
135/// use zeph_memory::compression::promotion::{PromotionEngine, PromotionConfig};
136///
137/// # struct MockWriter;
138/// # impl zeph_memory::compression::promotion::SkillWriter for MockWriter {
139/// #   fn write_skill(&self, _d: String, _s: String)
140/// #     -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>
141/// #   { Box::pin(async { Ok(()) }) }
142/// # }
143/// let engine = PromotionEngine::new(
144///     Arc::new(MockWriter),
145///     PromotionConfig::default(),
146///     PathBuf::from("/tmp/skills"),
147/// );
148/// ```
149pub struct PromotionEngine {
150    writer: Arc<dyn SkillWriter>,
151    config: PromotionConfig,
152    output_dir: PathBuf,
153}
154
155impl PromotionEngine {
156    /// Create a new promotion engine.
157    ///
158    /// `writer` is injected from `zeph-core` and encapsulates `SkillGenerator` +
159    /// optional `SkillEvaluator`. `output_dir` is where SKILL.md directories are created.
160    #[must_use]
161    pub fn new(writer: Arc<dyn SkillWriter>, config: PromotionConfig, output_dir: PathBuf) -> Self {
162        Self {
163            writer,
164            config,
165            output_dir,
166        }
167    }
168
169    /// Scan a recent-episodic window and return clusters that qualify for promotion.
170    ///
171    /// Clustering is greedy: each message is assigned to the first cluster whose centroid
172    /// has cosine similarity ≥ `config.cluster_threshold`; if no cluster matches, a new
173    /// cluster is created. A cluster qualifies when both `min_occurrences` and
174    /// `min_sessions` are satisfied.
175    ///
176    /// Messages without embeddings (`embedding == None`) are silently skipped.
177    ///
178    /// # Panics
179    ///
180    /// Does not panic in practice — the `unwrap` on `embedding` is guarded by the
181    /// `filter(|p| p.embedding.is_some())` step immediately above.
182    ///
183    /// # Errors
184    ///
185    /// Returns [`MemoryError::Promotion`] if embeddings have inconsistent dimensions.
186    #[tracing::instrument(name = "memory.compression.promote.scan", skip_all,
187                          fields(window_len = window.len()))]
188    pub async fn scan(
189        &self,
190        window: &[PromotionInput],
191    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
192        // Filter to messages that have embeddings.
193        let embeds: Vec<&PromotionInput> =
194            window.iter().filter(|p| p.embedding.is_some()).collect();
195
196        if embeds.is_empty() {
197            return Ok(vec![]);
198        }
199
200        // Safety: filtered to `is_some()` above.
201        let dim = embeds[0].embedding.as_ref().unwrap().len();
202
203        // Greedy centroid clustering.
204        struct Cluster {
205            centroid: Vec<f32>,
206            member_ids: Vec<MessageId>,
207            session_ids: HashSet<ConversationId>,
208        }
209
210        let mut clusters: Vec<Cluster> = Vec::new();
211
212        for input in &embeds {
213            let emb = input.embedding.as_ref().unwrap();
214            if emb.len() != dim {
215                return Err(MemoryError::Promotion(format!(
216                    "embedding dimension mismatch: expected {dim}, got {}",
217                    emb.len()
218                )));
219            }
220
221            // Find the first cluster within the similarity threshold.
222            let mut assigned = false;
223            for cluster in &mut clusters {
224                let sim = cosine_similarity(emb, &cluster.centroid);
225                if sim >= self.config.cluster_threshold {
226                    // Update centroid (running average).
227                    #[allow(clippy::cast_precision_loss)]
228                    let n = cluster.member_ids.len() as f32;
229                    for (c, v) in cluster.centroid.iter_mut().zip(emb.iter()) {
230                        *c = (*c * n + v) / (n + 1.0);
231                    }
232                    cluster.member_ids.push(input.message_id);
233                    cluster.session_ids.insert(input.conversation_id);
234                    assigned = true;
235                    break;
236                }
237            }
238            if !assigned {
239                clusters.push(Cluster {
240                    centroid: emb.clone(),
241                    member_ids: vec![input.message_id],
242                    session_ids: std::iter::once(input.conversation_id).collect(),
243                });
244            }
245        }
246
247        // Filter clusters that meet both thresholds.
248        let candidates = clusters
249            .into_iter()
250            .filter(|c| {
251                u32::try_from(c.member_ids.len()).unwrap_or(u32::MAX) >= self.config.min_occurrences
252                    && u32::try_from(c.session_ids.len()).unwrap_or(u32::MAX)
253                        >= self.config.min_sessions
254            })
255            .map(|c| {
256                let signature = cluster_signature(&c.centroid);
257                PromotionCandidate {
258                    signature,
259                    member_ids: c.member_ids,
260                    session_ids: c.session_ids.into_iter().collect(),
261                    centroid: c.centroid,
262                }
263            })
264            .collect();
265
266        Ok(candidates)
267    }
268
269    /// Generate and persist a SKILL.md for `candidate`. Idempotent by signature.
270    ///
271    /// On evaluator rejection the method returns `Ok(())` — rejection is a normal outcome.
272    ///
273    /// # Errors
274    ///
275    /// Returns [`MemoryError::Promotion`] on generation, evaluator, or disk-write failure.
276    #[tracing::instrument(name = "memory.compression.promote.persist", skip_all,
277                          fields(signature = %candidate.signature))]
278    pub async fn promote(&self, candidate: &PromotionCandidate) -> Result<(), MemoryError> {
279        // Idempotency: skip if already exists.
280        let skill_name = format!("promoted-pattern-{}", &candidate.signature[..12]);
281        let skill_dir = self.output_dir.join(&skill_name);
282        if skill_dir.exists() {
283            tracing::debug!(signature = %candidate.signature, "promotion candidate already exists, skipping");
284            return Ok(());
285        }
286
287        let member_count = candidate.member_ids.len();
288        let session_count = candidate.session_ids.len();
289        let description = format!(
290            "Recurring procedural pattern detected across {member_count} messages in \
291             {session_count} sessions. Generate a concise SKILL.md capturing the common \
292             tool-use pattern or workflow. Signature: {}.",
293            candidate.signature
294        );
295
296        self.writer
297            .write_skill(description, candidate.signature.clone())
298            .await
299            .map_err(MemoryError::Promotion)
300    }
301}
302
303// ── Helper functions ──────────────────────────────────────────────────────────
304
/// Cosine of the angle between `a` and `b`, clamped to `[-1.0, 1.0]`.
///
/// Yields `0.0` when either vector has (near-)zero magnitude, i.e. a norm below
/// `f32::EPSILON`; this includes the empty vector. The slices are expected to have
/// equal length. That is only asserted in debug builds, so a length mismatch goes
/// undetected in release builds.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    debug_assert_eq!(a.len(), b.len());
    let magnitude = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    let dot = a.iter().zip(b).map(|(x, y)| x * y).sum::<f32>();
    let (len_a, len_b) = (magnitude(a), magnitude(b));
    let degenerate = len_a < f32::EPSILON || len_b < f32::EPSILON;
    if degenerate {
        0.0
    } else {
        (dot / (len_a * len_b)).clamp(-1.0, 1.0)
    }
}
317
/// Derive a stable, deterministic signature from a centroid vector.
///
/// The signature is the 64-bit FNV-1a hash of the little-endian bytes of every centroid
/// component, rendered as 16 lowercase hex digits. It is a deduplication key (its prefix
/// names skill directories on disk), not a security hash.
///
/// FNV-1a is implemented inline rather than using `std`'s `DefaultHasher`. That
/// hasher's algorithm is explicitly unspecified and may change between Rust releases,
/// which would silently change every signature, and thus every skill-directory name,
/// after a toolchain upgrade.
///
/// Components are hashed by exact bit pattern (no quantisation), so e.g. `0.0` and
/// `-0.0` produce different signatures.
fn cluster_signature(centroid: &[f32]) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let hash = centroid
        .iter()
        .flat_map(|v| v.to_le_bytes())
        .fold(FNV_OFFSET_BASIS, |h, byte| {
            (h ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    format!("{hash:016x}")
}
331
332// ── Tests ─────────────────────────────────────────────────────────────────────
333
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Test double that records every description it is asked to write.
    struct RecordingWriter {
        written: Mutex<Vec<String>>,
    }

    impl SkillWriter for RecordingWriter {
        fn write_skill(
            &self,
            description: String,
            _signature: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>
        {
            self.written.lock().unwrap().push(description);
            Box::pin(async { Ok(()) })
        }
    }

    /// Builds an input that carries an embedding.
    fn make_input(id: i64, cid: i64, content: &str, emb: Vec<f32>) -> PromotionInput {
        PromotionInput {
            message_id: MessageId(id),
            conversation_id: ConversationId(cid),
            content: content.to_string(),
            embedding: Some(emb),
        }
    }

    /// Builds an input without an embedding (the scan must skip it).
    fn make_unembedded(id: i64, cid: i64) -> PromotionInput {
        PromotionInput {
            message_id: MessageId(id),
            conversation_id: ConversationId(cid),
            content: "no embedding".to_string(),
            embedding: None,
        }
    }

    /// Returns an `n`-dimensional vector along the first axis, normalised to unit length.
    /// `n` must be ≥ 1 and `val` non-zero, otherwise indexing panics or the norm is zero.
    fn unit_vec(n: usize, val: f32) -> Vec<f32> {
        let mut v = vec![0.0_f32; n];
        v[0] = val;
        // Normalise to unit length.
        let norm: f32 = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        for x in &mut v {
            *x /= norm;
        }
        v
    }

    #[test]
    fn cosine_similarity_identical() {
        let v = vec![1.0_f32, 0.0, 0.0];
        assert!((cosine_similarity(&v, &v) - 1.0).abs() < 1e-6);
    }

    #[test]
    fn cosine_similarity_orthogonal() {
        let a = vec![1.0_f32, 0.0];
        let b = vec![0.0_f32, 1.0];
        assert!((cosine_similarity(&a, &b) - 0.0).abs() < 1e-6);
    }

    #[test]
    fn cosine_similarity_zero_vector_is_zero() {
        let zero = vec![0.0_f32; 3];
        let v = vec![1.0_f32, 2.0, 3.0];
        assert!(cosine_similarity(&zero, &v).abs() < 1e-6);
        assert!(cosine_similarity(&v, &zero).abs() < 1e-6);
    }

    #[tokio::test]
    async fn scan_returns_empty_for_no_embeddings() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let engine =
            PromotionEngine::new(writer, PromotionConfig::default(), PathBuf::from("/tmp"));
        let window = vec![PromotionInput {
            message_id: MessageId(1),
            conversation_id: ConversationId(1),
            content: "hello".into(),
            embedding: None,
        }];
        let candidates = engine.scan(&window).await.unwrap();
        assert!(candidates.is_empty());
    }

    #[tokio::test]
    async fn scan_skips_missing_embeddings_but_clusters_the_rest() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let engine =
            PromotionEngine::new(writer, PromotionConfig::default(), PathBuf::from("/tmp"));

        let base = unit_vec(4, 1.0);
        let window = vec![
            make_unembedded(10, 3),
            make_input(1, 1, "a", base.clone()),
            make_unembedded(11, 3),
            make_input(2, 2, "b", base.clone()),
            make_input(3, 2, "c", base.clone()),
        ];
        let candidates = engine.scan(&window).await.unwrap();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].member_ids.len(), 3);
        // Session 3 only contributed un-embedded rows, so it must not be counted.
        assert_eq!(candidates[0].session_ids.len(), 2);
    }

    #[tokio::test]
    async fn scan_qualifies_cluster_meeting_thresholds() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let config = PromotionConfig {
            min_occurrences: 3,
            min_sessions: 2,
            cluster_threshold: 0.90,
        };
        let engine = PromotionEngine::new(writer, config, PathBuf::from("/tmp"));

        // 4 nearly identical vectors from 3 distinct sessions.
        let base = unit_vec(4, 1.0);
        let window = vec![
            make_input(1, 1, "a", base.clone()),
            make_input(2, 1, "b", base.clone()),
            make_input(3, 2, "c", base.clone()),
            make_input(4, 3, "d", base.clone()),
        ];
        let candidates = engine.scan(&window).await.unwrap();
        assert_eq!(candidates.len(), 1, "expected 1 qualifying cluster");
        let c = &candidates[0];
        assert_eq!(c.member_ids.len(), 4);
        assert_eq!(c.session_ids.len(), 3);
    }

    #[tokio::test]
    async fn scan_keeps_dissimilar_messages_in_separate_clusters() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let config = PromotionConfig {
            min_occurrences: 3,
            min_sessions: 2,
            cluster_threshold: 0.90,
        };
        let engine = PromotionEngine::new(writer, config, PathBuf::from("/tmp"));

        // Two orthogonal groups, interleaved, each with 3 members from 3 sessions.
        let x = vec![1.0_f32, 0.0, 0.0, 0.0];
        let y = vec![0.0_f32, 1.0, 0.0, 0.0];
        let window = vec![
            make_input(1, 1, "x1", x.clone()),
            make_input(2, 1, "y1", y.clone()),
            make_input(3, 2, "x2", x.clone()),
            make_input(4, 2, "y2", y.clone()),
            make_input(5, 3, "x3", x.clone()),
            make_input(6, 3, "y3", y.clone()),
        ];
        let candidates = engine.scan(&window).await.unwrap();
        assert_eq!(candidates.len(), 2, "orthogonal groups must not merge");
        for c in &candidates {
            assert_eq!(c.member_ids.len(), 3);
            assert_eq!(c.session_ids.len(), 3);
        }
    }

    #[tokio::test]
    async fn scan_rejects_cluster_below_min_sessions() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let config = PromotionConfig {
            min_occurrences: 3,
            min_sessions: 2,
            cluster_threshold: 0.90,
        };
        let engine = PromotionEngine::new(writer, config, PathBuf::from("/tmp"));

        // 4 messages but all from the same session.
        let base = unit_vec(4, 1.0);
        let window = (1..=4)
            .map(|i| make_input(i, 1, "x", base.clone()))
            .collect::<Vec<_>>();
        let candidates = engine.scan(&window).await.unwrap();
        assert!(
            candidates.is_empty(),
            "should reject cluster with only 1 session"
        );
    }

    #[tokio::test]
    async fn scan_rejects_cluster_below_min_occurrences() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let config = PromotionConfig {
            min_occurrences: 3,
            min_sessions: 2,
            cluster_threshold: 0.90,
        };
        let engine = PromotionEngine::new(writer, config, PathBuf::from("/tmp"));

        // Enough sessions, but only 2 members.
        let base = unit_vec(4, 1.0);
        let window = vec![
            make_input(1, 1, "a", base.clone()),
            make_input(2, 2, "b", base.clone()),
        ];
        let candidates = engine.scan(&window).await.unwrap();
        assert!(
            candidates.is_empty(),
            "should reject cluster with fewer than min_occurrences members"
        );
    }

    #[tokio::test]
    async fn scan_errors_on_dimension_mismatch() {
        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let engine =
            PromotionEngine::new(writer, PromotionConfig::default(), PathBuf::from("/tmp"));

        let window = vec![
            make_input(1, 1, "a", vec![1.0, 0.0, 0.0]),
            make_input(2, 2, "b", vec![0.0, 1.0]), // wrong dimension
        ];
        let result = engine.scan(&window).await;
        assert!(result.is_err(), "expected error on dimension mismatch");
    }

    #[tokio::test]
    async fn promote_calls_writer_once_then_skips_existing_skill_dir() {
        let output_dir =
            std::env::temp_dir().join(format!("zeph-promotion-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&output_dir);

        let writer = Arc::new(RecordingWriter {
            written: Mutex::new(vec![]),
        });
        let dyn_writer: Arc<dyn SkillWriter> = writer.clone();
        let engine =
            PromotionEngine::new(dyn_writer, PromotionConfig::default(), output_dir.clone());

        let candidate = PromotionCandidate {
            signature: "0123456789abcdef".to_string(),
            member_ids: vec![MessageId(1), MessageId(2), MessageId(3)],
            session_ids: vec![ConversationId(1), ConversationId(2)],
            centroid: vec![1.0, 0.0],
        };

        engine.promote(&candidate).await.unwrap();
        {
            // Scoped so the std mutex guard is released before the next `.await`.
            let written = writer.written.lock().unwrap();
            assert_eq!(written.len(), 1, "writer should be called for a new signature");
            assert!(written[0].contains("3 messages in 2 sessions"));
            assert!(written[0].contains("Signature: 0123456789abcdef."));
        }

        // Once the skill directory exists, promotion is a no-op.
        std::fs::create_dir_all(output_dir.join("promoted-pattern-0123456789ab")).unwrap();
        engine.promote(&candidate).await.unwrap();
        assert_eq!(
            writer.written.lock().unwrap().len(),
            1,
            "existing skill dir must be skipped"
        );

        let _ = std::fs::remove_dir_all(&output_dir);
    }
}