Skip to main content

sphereql_embed/
feedback.rs

//! User-supplied feedback signals that refine stored meta-learning records.
//!
//! Automated quality metrics ([`QualityMetric`](crate::quality_metric::QualityMetric))
//! only see the geometry of the built pipeline. They can't tell whether
//! actual users found query results useful. This module defines a minimal
//! feedback primitive — one scalar signal per query — plus an aggregator
//! that summarizes signals per `corpus_id` so a
//! [`MetaTrainingRecord`](crate::meta_model::MetaTrainingRecord)'s
//! `best_score` can be blended with observed user satisfaction.
//!
//! Intended flow (L3 of the metalearning ladder):
//!
//! 1. Deploy a tuned pipeline to users.
//! 2. On each query result, collect a satisfaction signal (thumbs, rating,
//!    click-through, …). Map it to `[0, 1]` and emit a [`FeedbackEvent`].
//! 3. Aggregate events into a [`FeedbackAggregator`], persisted under
//!    [`FeedbackAggregator::default_store_path`].
//! 4. When selecting a stored record for a new corpus, blend the record's
//!    automated `best_score` with the corpus's feedback summary via
//!    [`MetaTrainingRecord::adjust_score_with_feedback`](crate::meta_model::MetaTrainingRecord::adjust_score_with_feedback).
//!
//! The meta-model is deliberately *not* retrained here — that's a v2
//! concern. This module supplies the primitives; composition is up to
//! the caller.
25
26use std::collections::HashMap;
27use std::fs;
28use std::io;
29use std::path::{Path, PathBuf};
30
31use crate::util::{default_timestamp, migrate_legacy_array_to_jsonl, sphereql_home_dir};
32
33/// One user-supplied satisfaction signal attached to a specific query.
34///
35/// `score` is a normalized scalar in `[0, 1]`:
36/// - `1.0` = perfect, user got exactly what they wanted.
37/// - `0.5` = neutral / ambiguous.
38/// - `0.0` = wrong, unhelpful, or actively misleading.
39///
40/// Upstream mapping (stars to `[0, 1]`, CTR to `[0, 1]`, etc.) is the
41/// caller's responsibility — the aggregator just computes statistics on
42/// whatever scalar you supply.
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44pub struct FeedbackEvent {
45    /// Must match the `corpus_id` of the [`MetaTrainingRecord`](crate::meta_model::MetaTrainingRecord)
46    /// the pipeline was built from.
47    pub corpus_id: String,
48    /// Caller-supplied query identifier. Free-form; used for deduping
49    /// and auditing. An empty string is allowed.
50    pub query_id: String,
51    /// Satisfaction signal in `[0, 1]`. Clamped at read time by
52    /// [`FeedbackAggregator::summarize`]; store raw values if you want.
53    pub score: f64,
54    /// Free-form timestamp string. Seconds-since-epoch by default from
55    /// [`FeedbackEvent::now`]; swap in your own format as needed.
56    pub timestamp: String,
57}
58
59impl FeedbackEvent {
60    /// Construct with a default timestamp (epoch seconds).
61    pub fn now(corpus_id: impl Into<String>, query_id: impl Into<String>, score: f64) -> Self {
62        Self {
63            corpus_id: corpus_id.into(),
64            query_id: query_id.into(),
65            score,
66            timestamp: default_timestamp(),
67        }
68    }
69
70    /// Append this event to the user's default feedback store
71    /// (`~/.sphereql/feedback_events.json`).
72    ///
73    /// O(1) per call: opens the file in append mode and writes one
74    /// JSON-encoded line. Previous implementation loaded the full
75    /// aggregator, pushed the event, and rewrote the file — O(N)
76    /// per append, which mattered on a production firehose.
77    /// Legacy array-format stores are migrated to JSONL on the first
78    /// append (one-time O(N) cost) and append at O(1) afterward.
79    ///
80    /// Mirrors [`MetaTrainingRecord::append_to_default_store`](crate::meta_model::MetaTrainingRecord::append_to_default_store)
81    /// — both are instance methods on the data they persist.
82    pub fn append_to_default_store(&self) -> io::Result<PathBuf> {
83        let path = FeedbackAggregator::default_store_path()?;
84        self.append_to(&path)?;
85        Ok(path)
86    }
87
88    /// Append this event to an arbitrary JSONL file. Creates the file
89    /// and any missing parent directories on first call.
90    pub fn append_to(&self, path: impl AsRef<Path>) -> io::Result<()> {
91        use std::io::Write;
92
93        let path = path.as_ref();
94        if let Some(parent) = path.parent()
95            && !parent.as_os_str().is_empty()
96        {
97            fs::create_dir_all(parent)?;
98        }
99
100        // One-time migration: if the store is a legacy JSON array
101        // (what FeedbackAggregator::save used to write), rewrite it
102        // as JSONL so subsequent appends stay O(1).
103        migrate_legacy_array_to_jsonl(path, |head| {
104            let events: Vec<Self> = serde_json::from_str(head.trim_start())
105                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
106            let mut migrated = String::with_capacity(head.len());
107            for e in &events {
108                serde_json::to_string(e)
109                    .map(|line| {
110                        migrated.push_str(&line);
111                        migrated.push('\n');
112                    })
113                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
114            }
115            Ok(migrated)
116        })?;
117
118        let mut f = fs::OpenOptions::new()
119            .create(true)
120            .append(true)
121            .open(path)?;
122        let line = serde_json::to_string(self)
123            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
124        writeln!(f, "{line}")
125    }
126}
127
128/// Summary statistics for the feedback observed on a single corpus.
129///
130/// All scalar fields are computed over the subset of events whose
131/// `corpus_id` matches the summarized corpus. `mean_score` is
132/// clamp-averaged to `[0, 1]` so downstream blending stays bounded even
133/// when raw event scores are dirty.
134#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
135pub struct FeedbackSummary {
136    pub corpus_id: String,
137    pub n_events: usize,
138    pub mean_score: f64,
139    pub min_score: f64,
140    pub max_score: f64,
141}
142
143// ── Aggregator ─────────────────────────────────────────────────────────
144
145/// Accumulates [`FeedbackEvent`]s across sessions and summarizes them by
146/// `corpus_id`.
147///
148/// Serializable as a flat JSON array of events — same pattern as
149/// [`MetaTrainingRecord::save_list`](crate::meta_model::MetaTrainingRecord::save_list).
150/// Append is O(1) amortized; [`Self::summarize`] is O(N) per call, which
151/// is fine for the scale feedback naturally reaches (hundreds to
152/// thousands of events per corpus).
153///
154/// `#[serde(transparent)]` keeps the derive-based serializer
155/// (`serde_json::to_string(&agg)`) and the hand-rolled
156/// [`Self::save`] / [`Self::load`] path on the same JSON shape — a flat
157/// array of events. Without it, the derive would emit `{"events": [...]}`
158/// which `load` rejects.
159#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
160#[serde(transparent)]
161pub struct FeedbackAggregator {
162    // Optional ring-buffer semantics. When `max_events` is set to some
163    // `N`, `record` drops the oldest event whenever the log would
164    // exceed `N`. Serialized transparently as a flat array; the cap
165    // itself is a runtime-only knob and is not persisted.
166    #[serde(skip)]
167    max_events: Option<usize>,
168    events: Vec<FeedbackEvent>,
169}
170
171impl FeedbackAggregator {
172    pub fn new() -> Self {
173        Self::default()
174    }
175
176    /// Construct with a bounded event window. Once the log reaches
177    /// `max_events`, every new `record` call drops the oldest event so
178    /// memory stays capped — appropriate for long-running services
179    /// that only need recent feedback for per-corpus summaries.
180    ///
181    /// Without this cap the event log grows indefinitely; on a 1
182    /// event/sec firehose that reaches 100 MB of JSON within a week.
183    pub fn with_window(max_events: usize) -> Self {
184        let max_events = max_events.max(1);
185        Self {
186            max_events: Some(max_events),
187            events: Vec::with_capacity(max_events.min(1024)),
188        }
189    }
190
191    /// Attach (or drop) the event-count cap after construction.
192    ///
193    /// `Some(0)` is normalized to `Some(1)` so the ring never produces
194    /// a negative-sized drain in [`Self::record`].
195    pub fn set_max_events(&mut self, max_events: Option<usize>) {
196        let max_events = max_events.map(|n| n.max(1));
197        self.max_events = max_events;
198        if let Some(cap) = max_events
199            && self.events.len() > cap
200        {
201            let excess = self.events.len() - cap;
202            self.events.drain(0..excess);
203        }
204    }
205
206    /// Total number of accumulated events (across all corpus_ids).
207    pub fn len(&self) -> usize {
208        self.events.len()
209    }
210
211    pub fn is_empty(&self) -> bool {
212        self.events.is_empty()
213    }
214
215    /// Append one event. When a [`Self::with_window`] cap is set and
216    /// the log is already at capacity, the oldest event is evicted
217    /// first — a FIFO ring over the underlying `Vec`.
218    pub fn record(&mut self, event: FeedbackEvent) {
219        if let Some(cap) = self.max_events
220            && self.events.len() >= cap
221        {
222            // `remove(0)` is O(n) but `record` on a capped aggregator
223            // is paired with O(n) event summarization anyway, and the
224            // cap is expected to be in the hundreds, not millions.
225            let excess = self.events.len() + 1 - cap;
226            self.events.drain(0..excess);
227        }
228        self.events.push(event);
229    }
230
231    /// Read-only borrow of the raw event log.
232    pub fn events(&self) -> &[FeedbackEvent] {
233        &self.events
234    }
235
236    /// Distinct `corpus_id`s that have any feedback attached.
237    pub fn corpus_ids(&self) -> Vec<String> {
238        let mut ids: Vec<String> = self
239            .events
240            .iter()
241            .map(|e| e.corpus_id.clone())
242            .collect::<std::collections::HashSet<_>>()
243            .into_iter()
244            .collect();
245        ids.sort();
246        ids
247    }
248
249    /// Summarize feedback for a specific corpus. Returns `None` if the
250    /// corpus has no events yet.
251    pub fn summarize(&self, corpus_id: &str) -> Option<FeedbackSummary> {
252        let mut count = 0usize;
253        let mut sum = 0.0f64;
254        let mut min = f64::INFINITY;
255        let mut max = f64::NEG_INFINITY;
256        for e in &self.events {
257            if e.corpus_id != corpus_id {
258                continue;
259            }
260            let s = e.score.clamp(0.0, 1.0);
261            count += 1;
262            sum += s;
263            if s < min {
264                min = s;
265            }
266            if s > max {
267                max = s;
268            }
269        }
270        if count == 0 {
271            return None;
272        }
273        Some(FeedbackSummary {
274            corpus_id: corpus_id.to_string(),
275            n_events: count,
276            mean_score: sum / count as f64,
277            min_score: min,
278            max_score: max,
279        })
280    }
281
282    /// Summarize every corpus that has events. Sorted by `corpus_id`.
283    pub fn summarize_all(&self) -> Vec<FeedbackSummary> {
284        let mut per_corpus: HashMap<String, (usize, f64, f64, f64)> = HashMap::new();
285        for e in &self.events {
286            let s = e.score.clamp(0.0, 1.0);
287            let entry = per_corpus.entry(e.corpus_id.clone()).or_insert((
288                0,
289                0.0,
290                f64::INFINITY,
291                f64::NEG_INFINITY,
292            ));
293            entry.0 += 1;
294            entry.1 += s;
295            if s < entry.2 {
296                entry.2 = s;
297            }
298            if s > entry.3 {
299                entry.3 = s;
300            }
301        }
302        let mut out: Vec<FeedbackSummary> = per_corpus
303            .into_iter()
304            .map(|(corpus_id, (count, sum, min, max))| FeedbackSummary {
305                corpus_id,
306                n_events: count,
307                mean_score: sum / count as f64,
308                min_score: min,
309                max_score: max,
310            })
311            .collect();
312        out.sort_by(|a, b| a.corpus_id.cmp(&b.corpus_id));
313        out
314    }
315
316    /// Save this aggregator (event list) to a JSON file. Creates parent
317    /// directories as needed.
318    ///
319    /// Writes a pretty-printed JSON array — the legacy store format.
320    /// If you save to [`Self::default_store_path`], the next
321    /// [`FeedbackEvent::append_to_default_store`] call will migrate
322    /// the file back to JSONL.
323    pub fn save(&self, path: impl AsRef<Path>) -> io::Result<()> {
324        let path = path.as_ref();
325        if let Some(parent) = path.parent()
326            && !parent.as_os_str().is_empty()
327        {
328            fs::create_dir_all(parent)?;
329        }
330        let json = serde_json::to_string_pretty(&self.events)
331            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
332        fs::write(path, json)
333    }
334
335    /// Load an aggregator from a JSON event-log file. Returns an empty
336    /// aggregator if the file does not exist.
337    ///
338    /// Accepts both a JSON array (legacy, what `save` writes for
339    /// backward compat) and JSON Lines (new format written by
340    /// [`FeedbackEvent::append_to_default_store`] and sibling append
341    /// paths). Detection is first-character based.
342    pub fn load(path: impl AsRef<Path>) -> io::Result<Self> {
343        let path = path.as_ref();
344        if !path.exists() {
345            return Ok(Self::new());
346        }
347        let raw = fs::read_to_string(path)?;
348        let trimmed = raw.trim_start();
349        if trimmed.is_empty() {
350            return Ok(Self::new());
351        }
352        let events: Vec<FeedbackEvent> = if trimmed.starts_with('[') {
353            serde_json::from_str(trimmed)
354                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
355        } else {
356            trimmed
357                .lines()
358                .filter(|l| !l.trim().is_empty())
359                .map(|l| {
360                    serde_json::from_str::<FeedbackEvent>(l)
361                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
362                })
363                .collect::<io::Result<Vec<_>>>()?
364        };
365        Ok(Self {
366            max_events: None,
367            events,
368        })
369    }
370
371    /// Default on-disk feedback log: `~/.sphereql/feedback_events.json`.
372    /// Parallel convention to
373    /// [`MetaTrainingRecord::default_store_path`](crate::meta_model::MetaTrainingRecord::default_store_path).
374    pub fn default_store_path() -> io::Result<PathBuf> {
375        Ok(sphereql_home_dir()?.join("feedback_events.json"))
376    }
377
378    /// Load the default on-disk feedback store. Empty aggregator if the
379    /// store does not exist yet.
380    pub fn load_default_store() -> io::Result<Self> {
381        Self::load(Self::default_store_path()?)
382    }
383}
384
385// ── Tests ──────────────────────────────────────────────────────────────
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Event with a fixed `"0"` timestamp.
    fn ev(corpus: &str, query: &str, score: f64) -> FeedbackEvent {
        FeedbackEvent {
            corpus_id: corpus.to_string(),
            query_id: query.to_string(),
            score,
            timestamp: String::from("0"),
        }
    }

    /// Uncapped aggregator pre-filled with `(corpus, query, score)` rows,
    /// recorded in slice order.
    fn filled(rows: &[(&str, &str, f64)]) -> FeedbackAggregator {
        let mut agg = FeedbackAggregator::new();
        for &(corpus, query, score) in rows {
            agg.record(ev(corpus, query, score));
        }
        agg
    }

    /// Query ids in log order.
    fn query_ids(agg: &FeedbackAggregator) -> Vec<&str> {
        agg.events().iter().map(|e| e.query_id.as_str()).collect()
    }

    /// Per-process scratch directory under the system temp dir, wiped
    /// before use. The caller removes it again when done.
    fn scratch_dir(tag: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("{tag}_{}", std::process::id()));
        let _ = fs::remove_dir_all(&dir);
        dir
    }

    /// Float comparison with the tolerance used throughout these tests.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < 1e-12
    }

    #[test]
    fn with_window_evicts_oldest() {
        let mut agg = FeedbackAggregator::with_window(3);
        (0..5).for_each(|i| agg.record(ev("c", &format!("q{i}"), i as f64 / 10.0)));
        // Only the three most recent events remain: q2, q3, q4.
        assert_eq!(agg.len(), 3);
        assert_eq!(query_ids(&agg), vec!["q2", "q3", "q4"]);
    }

    #[test]
    fn set_max_events_trims_existing_log() {
        let mut agg = FeedbackAggregator::new();
        (0..10).for_each(|i| agg.record(ev("c", &format!("q{i}"), 0.5)));
        agg.set_max_events(Some(4));
        assert_eq!(agg.len(), 4);
        assert_eq!(query_ids(&agg), vec!["q6", "q7", "q8", "q9"]);
    }

    #[test]
    fn empty_aggregator_has_no_summary() {
        let agg = FeedbackAggregator::new();
        assert!(agg.is_empty());
        assert!(agg.summarize("anything").is_none());
        assert!(agg.summarize_all().is_empty());
        assert!(agg.corpus_ids().is_empty());
    }

    #[test]
    fn summarize_single_corpus() {
        let agg = filled(&[("c1", "q1", 0.8), ("c1", "q2", 0.6), ("c1", "q3", 1.0)]);
        let summary = agg.summarize("c1").unwrap();
        assert_eq!(summary.n_events, 3);
        assert!(close(summary.mean_score, 0.8));
        assert!(close(summary.min_score, 0.6));
        assert!(close(summary.max_score, 1.0));
    }

    #[test]
    fn summarize_clamps_out_of_range_scores() {
        let agg = filled(&[("c", "q1", -0.5), ("c", "q2", 1.5)]);
        let summary = agg.summarize("c").unwrap();
        // -0.5 clamps to 0 and 1.5 clamps to 1, so the mean is 0.5.
        assert!(close(summary.mean_score, 0.5));
        assert_eq!(summary.min_score, 0.0);
        assert_eq!(summary.max_score, 1.0);
    }

    #[test]
    fn summarize_isolates_corpus_ids() {
        let agg = filled(&[("alpha", "q", 0.2), ("beta", "q", 0.9)]);
        assert!(close(agg.summarize("alpha").unwrap().mean_score, 0.2));
        assert!(close(agg.summarize("beta").unwrap().mean_score, 0.9));
        assert!(agg.summarize("gamma").is_none());
    }

    #[test]
    fn summarize_all_is_sorted_by_corpus_id() {
        let agg = filled(&[("zebra", "q", 0.5), ("ant", "q", 0.5), ("mule", "q", 0.5)]);
        let summaries = agg.summarize_all();
        assert_eq!(summaries.len(), 3);
        assert_eq!(summaries[0].corpus_id, "ant");
        assert_eq!(summaries[1].corpus_id, "mule");
        assert_eq!(summaries[2].corpus_id, "zebra");
    }

    #[test]
    fn corpus_ids_distinct_and_sorted() {
        let agg = filled(&[("b", "q", 0.5), ("a", "q", 0.5), ("b", "q2", 0.5)]);
        assert_eq!(agg.corpus_ids(), vec!["a".to_string(), "b".to_string()]);
    }

    #[test]
    fn now_constructor_produces_parseable_timestamp() {
        let event = FeedbackEvent::now("c", "q", 0.5);
        assert_eq!(event.corpus_id, "c");
        assert!(event.timestamp.parse::<u64>().is_ok());
    }

    #[test]
    fn save_and_load_roundtrip() {
        let dir = scratch_dir("sphereql_fb_test");
        let path = dir.join("nested").join("events.json");

        filled(&[("c1", "q", 0.7), ("c2", "q", 0.3)])
            .save(&path)
            .unwrap();

        let loaded = FeedbackAggregator::load(&path).unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded.events()[0].corpus_id, "c1");
        assert_eq!(loaded.events()[1].corpus_id, "c2");

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn append_to_creates_jsonl_then_load_roundtrips() {
        let dir = scratch_dir("sphereql_fb_jsonl");
        let path = dir.join("nested").join("events.json");

        // Three independent appends, in order.
        for event in [ev("c1", "q1", 0.8), ev("c1", "q2", 0.6), ev("c2", "q3", 0.4)] {
            event.append_to(&path).unwrap();
        }

        let loaded = FeedbackAggregator::load(&path).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded.events()[0].query_id, "q1");
        assert_eq!(loaded.events()[1].query_id, "q2");
        assert_eq!(loaded.events()[2].query_id, "q3");

        // On disk the store is JSONL: one record per line, no array.
        let raw = fs::read_to_string(&path).unwrap();
        assert_eq!(raw.lines().count(), 3);
        assert!(!raw.trim_start().starts_with('['));

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn append_to_migrates_legacy_array_file() {
        let dir = scratch_dir("sphereql_fb_migrate");
        let path = dir.join("events.json");

        // Seed the store in the legacy array format via `save`.
        filled(&[("c1", "q1", 0.9), ("c2", "q2", 0.5)])
            .save(&path)
            .unwrap();

        // The first append is expected to convert the file to JSONL.
        ev("c3", "q3", 0.7).append_to(&path).unwrap();

        let loaded = FeedbackAggregator::load(&path).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded.events()[0].query_id, "q1");
        assert_eq!(loaded.events()[2].query_id, "q3");

        // After migration the on-disk shape is JSONL.
        let raw = fs::read_to_string(&path).unwrap();
        assert!(!raw.trim_start().starts_with('['));
        assert_eq!(raw.lines().count(), 3);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn load_nonexistent_returns_empty() {
        let missing = std::env::temp_dir().join("sphereql_fb_nonexistent_xyz.json");
        let agg = FeedbackAggregator::load(&missing).unwrap();
        assert!(agg.is_empty());
    }

    #[test]
    fn default_store_path_ends_with_expected_filename() {
        let store = FeedbackAggregator::default_store_path().unwrap();
        assert!(store.ends_with("feedback_events.json"));
        assert!(store
            .iter()
            .any(|component| component.to_string_lossy() == ".sphereql"));
    }
}