Skip to main content

sphereql_embed/
feedback.rs

//! User-supplied feedback signals that refine stored meta-learning records.
//!
//! Automated quality metrics ([`QualityMetric`](crate::quality_metric::QualityMetric))
//! only see the geometry of the built pipeline. They can't tell whether
//! actual users found query results useful. This module defines a minimal
//! feedback primitive — one scalar signal per query — plus an aggregator
//! that summarizes signals per `corpus_id` so a
//! [`MetaTrainingRecord`](crate::meta_model::MetaTrainingRecord)'s
//! `best_score` can be blended with observed user satisfaction.
//!
//! Intended flow (L3 of the meta-learning ladder):
//!
//! 1. Deploy a tuned pipeline to users.
//! 2. On each query result, collect a satisfaction signal (thumbs, rating,
//!    click-through, …). Map it to `[0, 1]` and emit a [`FeedbackEvent`].
//! 3. Persist events to the store at
//!    [`FeedbackAggregator::default_store_path`] (for example with
//!    [`FeedbackEvent::append_to_default_store`]) and load them back into a
//!    [`FeedbackAggregator`] to summarize them per corpus.
//! 4. When selecting a stored record for a new corpus, blend the record's
//!    automated `best_score` with the corpus's feedback summary via
//!    [`MetaTrainingRecord::adjust_score_with_feedback`](crate::meta_model::MetaTrainingRecord::adjust_score_with_feedback).
//!
//! The meta-model is deliberately *not* retrained here — that's a v2
//! concern. This module supplies the primitives; composition is up to
//! the caller.
25
26use std::collections::HashMap;
27use std::fs;
28use std::io;
29use std::path::{Path, PathBuf};
30
31use crate::util::{default_timestamp, sphereql_home_dir};
32
33/// One user-supplied satisfaction signal attached to a specific query.
34///
35/// `score` is a normalized scalar in `[0, 1]`:
36/// - `1.0` = perfect, user got exactly what they wanted.
37/// - `0.5` = neutral / ambiguous.
38/// - `0.0` = wrong, unhelpful, or actively misleading.
39///
40/// Upstream mapping (stars to `[0, 1]`, CTR to `[0, 1]`, etc.) is the
41/// caller's responsibility — the aggregator just computes statistics on
42/// whatever scalar you supply.
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44pub struct FeedbackEvent {
45    /// Must match the `corpus_id` of the [`MetaTrainingRecord`](crate::meta_model::MetaTrainingRecord)
46    /// the pipeline was built from.
47    pub corpus_id: String,
48    /// Caller-supplied query identifier. Free-form; used for deduping
49    /// and auditing. An empty string is allowed.
50    pub query_id: String,
51    /// Satisfaction signal in `[0, 1]`. Clamped at read time by
52    /// [`FeedbackAggregator::summarize`]; store raw values if you want.
53    pub score: f64,
54    /// Free-form timestamp string. Seconds-since-epoch by default from
55    /// [`FeedbackEvent::now`]; swap in your own format as needed.
56    pub timestamp: String,
57}
58
59impl FeedbackEvent {
60    /// Construct with a default timestamp (epoch seconds).
61    pub fn now(corpus_id: impl Into<String>, query_id: impl Into<String>, score: f64) -> Self {
62        Self {
63            corpus_id: corpus_id.into(),
64            query_id: query_id.into(),
65            score,
66            timestamp: default_timestamp(),
67        }
68    }
69
70    /// Append this event to the user's default feedback store
71    /// (`~/.sphereql/feedback_events.json`).
72    ///
73    /// O(1) per call: opens the file in append mode and writes one
74    /// JSON-encoded line. Previous implementation loaded the full
75    /// aggregator, pushed the event, and rewrote the file — O(N)
76    /// per append, which mattered on a production firehose.
77    /// Legacy array-format stores are migrated to JSONL on the first
78    /// append (one-time O(N) cost) and append at O(1) afterward.
79    ///
80    /// Mirrors [`MetaTrainingRecord::append_to_default_store`](crate::meta_model::MetaTrainingRecord::append_to_default_store)
81    /// — both are instance methods on the data they persist.
82    pub fn append_to_default_store(&self) -> io::Result<PathBuf> {
83        let path = FeedbackAggregator::default_store_path()?;
84        self.append_to(&path)?;
85        Ok(path)
86    }
87
88    /// Append this event to an arbitrary JSONL file. Creates the file
89    /// and any missing parent directories on first call.
90    pub fn append_to(&self, path: impl AsRef<Path>) -> io::Result<()> {
91        use std::io::Write;
92
93        let path = path.as_ref();
94        if let Some(parent) = path.parent()
95            && !parent.as_os_str().is_empty()
96        {
97            fs::create_dir_all(parent)?;
98        }
99
100        // One-time migration: if the store is a legacy JSON array
101        // (what FeedbackAggregator::save used to write), rewrite it
102        // as JSONL so subsequent appends stay O(1).
103        if path.exists() {
104            let head = fs::read_to_string(path)?;
105            if head.trim_start().starts_with('[') {
106                let events: Vec<Self> = serde_json::from_str(head.trim_start())
107                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
108                let mut migrated = String::with_capacity(head.len());
109                for e in &events {
110                    serde_json::to_string(e)
111                        .map(|line| {
112                            migrated.push_str(&line);
113                            migrated.push('\n');
114                        })
115                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
116                }
117                fs::write(path, migrated)?;
118            }
119        }
120
121        let mut f = fs::OpenOptions::new()
122            .create(true)
123            .append(true)
124            .open(path)?;
125        let line = serde_json::to_string(self)
126            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
127        writeln!(f, "{line}")
128    }
129}
130
131/// Summary statistics for the feedback observed on a single corpus.
132///
133/// All scalar fields are computed over the subset of events whose
134/// `corpus_id` matches the summarized corpus. `mean_score` is
135/// clamp-averaged to `[0, 1]` so downstream blending stays bounded even
136/// when raw event scores are dirty.
137#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
138pub struct FeedbackSummary {
139    pub corpus_id: String,
140    pub n_events: usize,
141    pub mean_score: f64,
142    pub min_score: f64,
143    pub max_score: f64,
144}
145
146// ── Aggregator ─────────────────────────────────────────────────────────
147
148/// Accumulates [`FeedbackEvent`]s across sessions and summarizes them by
149/// `corpus_id`.
150///
151/// Serializable as a flat JSON array of events — same pattern as
152/// [`MetaTrainingRecord::save_list`](crate::meta_model::MetaTrainingRecord::save_list).
153/// Append is O(1) amortized; [`Self::summarize`] is O(N) per call, which
154/// is fine for the scale feedback naturally reaches (hundreds to
155/// thousands of events per corpus).
156///
157/// `#[serde(transparent)]` keeps the derive-based serializer
158/// (`serde_json::to_string(&agg)`) and the hand-rolled
159/// [`Self::save`] / [`Self::load`] path on the same JSON shape — a flat
160/// array of events. Without it, the derive would emit `{"events": [...]}`
161/// which `load` rejects.
162#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
163#[serde(transparent)]
164pub struct FeedbackAggregator {
165    // Optional ring-buffer semantics. When `max_events` is set to some
166    // `N`, `record` drops the oldest event whenever the log would
167    // exceed `N`. Serialized transparently as a flat array; the cap
168    // itself is a runtime-only knob and is not persisted.
169    #[serde(skip)]
170    max_events: Option<usize>,
171    events: Vec<FeedbackEvent>,
172}
173
174impl FeedbackAggregator {
175    pub fn new() -> Self {
176        Self::default()
177    }
178
179    /// Construct with a bounded event window. Once the log reaches
180    /// `max_events`, every new `record` call drops the oldest event so
181    /// memory stays capped — appropriate for long-running services
182    /// that only need recent feedback for per-corpus summaries.
183    ///
184    /// Without this cap the event log grows indefinitely; on a 1
185    /// event/sec firehose that reaches 100 MB of JSON within a week.
186    pub fn with_window(max_events: usize) -> Self {
187        Self {
188            max_events: Some(max_events),
189            events: Vec::with_capacity(max_events.min(1024)),
190        }
191    }
192
193    /// Attach (or drop) the event-count cap after construction.
194    pub fn set_max_events(&mut self, max_events: Option<usize>) {
195        self.max_events = max_events;
196        if let Some(cap) = max_events
197            && self.events.len() > cap
198        {
199            let excess = self.events.len() - cap;
200            self.events.drain(0..excess);
201        }
202    }
203
204    /// Total number of accumulated events (across all corpus_ids).
205    pub fn len(&self) -> usize {
206        self.events.len()
207    }
208
209    pub fn is_empty(&self) -> bool {
210        self.events.is_empty()
211    }
212
213    /// Append one event. When a [`Self::with_window`] cap is set and
214    /// the log is already at capacity, the oldest event is evicted
215    /// first — a FIFO ring over the underlying `Vec`.
216    pub fn record(&mut self, event: FeedbackEvent) {
217        if let Some(cap) = self.max_events
218            && self.events.len() >= cap
219        {
220            // `remove(0)` is O(n) but `record` on a capped aggregator
221            // is paired with O(n) event summarization anyway, and the
222            // cap is expected to be in the hundreds, not millions.
223            let excess = self.events.len() + 1 - cap;
224            self.events.drain(0..excess);
225        }
226        self.events.push(event);
227    }
228
229    /// Read-only borrow of the raw event log.
230    pub fn events(&self) -> &[FeedbackEvent] {
231        &self.events
232    }
233
234    /// Distinct `corpus_id`s that have any feedback attached.
235    pub fn corpus_ids(&self) -> Vec<String> {
236        let mut ids: Vec<String> = self
237            .events
238            .iter()
239            .map(|e| e.corpus_id.clone())
240            .collect::<std::collections::HashSet<_>>()
241            .into_iter()
242            .collect();
243        ids.sort();
244        ids
245    }
246
247    /// Summarize feedback for a specific corpus. Returns `None` if the
248    /// corpus has no events yet.
249    pub fn summarize(&self, corpus_id: &str) -> Option<FeedbackSummary> {
250        let mut count = 0usize;
251        let mut sum = 0.0f64;
252        let mut min = f64::INFINITY;
253        let mut max = f64::NEG_INFINITY;
254        for e in &self.events {
255            if e.corpus_id != corpus_id {
256                continue;
257            }
258            let s = e.score.clamp(0.0, 1.0);
259            count += 1;
260            sum += s;
261            if s < min {
262                min = s;
263            }
264            if s > max {
265                max = s;
266            }
267        }
268        if count == 0 {
269            return None;
270        }
271        Some(FeedbackSummary {
272            corpus_id: corpus_id.to_string(),
273            n_events: count,
274            mean_score: sum / count as f64,
275            min_score: min,
276            max_score: max,
277        })
278    }
279
280    /// Summarize every corpus that has events. Sorted by `corpus_id`.
281    pub fn summarize_all(&self) -> Vec<FeedbackSummary> {
282        let mut per_corpus: HashMap<String, (usize, f64, f64, f64)> = HashMap::new();
283        for e in &self.events {
284            let s = e.score.clamp(0.0, 1.0);
285            let entry = per_corpus.entry(e.corpus_id.clone()).or_insert((
286                0,
287                0.0,
288                f64::INFINITY,
289                f64::NEG_INFINITY,
290            ));
291            entry.0 += 1;
292            entry.1 += s;
293            if s < entry.2 {
294                entry.2 = s;
295            }
296            if s > entry.3 {
297                entry.3 = s;
298            }
299        }
300        let mut out: Vec<FeedbackSummary> = per_corpus
301            .into_iter()
302            .map(|(corpus_id, (count, sum, min, max))| FeedbackSummary {
303                corpus_id,
304                n_events: count,
305                mean_score: sum / count as f64,
306                min_score: min,
307                max_score: max,
308            })
309            .collect();
310        out.sort_by(|a, b| a.corpus_id.cmp(&b.corpus_id));
311        out
312    }
313
314    /// Save this aggregator (event list) to a JSON file. Creates parent
315    /// directories as needed.
316    pub fn save(&self, path: impl AsRef<Path>) -> io::Result<()> {
317        let path = path.as_ref();
318        if let Some(parent) = path.parent()
319            && !parent.as_os_str().is_empty()
320        {
321            fs::create_dir_all(parent)?;
322        }
323        let json = serde_json::to_string_pretty(&self.events)
324            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
325        fs::write(path, json)
326    }
327
328    /// Load an aggregator from a JSON event-log file. Returns an empty
329    /// aggregator if the file does not exist.
330    ///
331    /// Accepts both a JSON array (legacy, what `save` writes for
332    /// backward compat) and JSON Lines (new format written by
333    /// [`FeedbackEvent::append_to_default_store`] and sibling append
334    /// paths). Detection is first-character based.
335    pub fn load(path: impl AsRef<Path>) -> io::Result<Self> {
336        let path = path.as_ref();
337        if !path.exists() {
338            return Ok(Self::new());
339        }
340        let raw = fs::read_to_string(path)?;
341        let trimmed = raw.trim_start();
342        if trimmed.is_empty() {
343            return Ok(Self::new());
344        }
345        let events: Vec<FeedbackEvent> = if trimmed.starts_with('[') {
346            serde_json::from_str(trimmed)
347                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
348        } else {
349            trimmed
350                .lines()
351                .filter(|l| !l.trim().is_empty())
352                .map(|l| {
353                    serde_json::from_str::<FeedbackEvent>(l)
354                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
355                })
356                .collect::<io::Result<Vec<_>>>()?
357        };
358        Ok(Self {
359            max_events: None,
360            events,
361        })
362    }
363
364    /// Default on-disk feedback log: `~/.sphereql/feedback_events.json`.
365    /// Parallel convention to
366    /// [`MetaTrainingRecord::default_store_path`](crate::meta_model::MetaTrainingRecord::default_store_path).
367    pub fn default_store_path() -> io::Result<PathBuf> {
368        Ok(sphereql_home_dir()?.join("feedback_events.json"))
369    }
370
371    /// Load the default on-disk feedback store. Empty aggregator if the
372    /// store does not exist yet.
373    pub fn load_default_store() -> io::Result<Self> {
374        Self::load(Self::default_store_path()?)
375    }
376}
377
// ── Tests ──────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    fn ev(corpus: &str, query: &str, score: f64) -> FeedbackEvent {
        FeedbackEvent {
            corpus_id: corpus.into(),
            query_id: query.into(),
            score,
            timestamp: "0".into(),
        }
    }

    /// Per-test scratch directory under the system temp dir. It is wiped
    /// on creation and removed on drop, so it is cleaned up even when an
    /// assertion panics. The pid in the name keeps concurrent test runs
    /// from sharing a directory.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(tag: &str) -> Self {
            let dir = std::env::temp_dir()
                .join(format!("sphereql_fb_{tag}_{}", std::process::id()));
            let _ = fs::remove_dir_all(&dir);
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn with_window_evicts_oldest() {
        let mut a = FeedbackAggregator::with_window(3);
        for i in 0..5 {
            a.record(ev("c", &format!("q{i}"), i as f64 / 10.0));
        }
        // Only the last 3 events survive (q2, q3, q4).
        assert_eq!(a.len(), 3);
        let ids: Vec<&str> = a.events().iter().map(|e| e.query_id.as_str()).collect();
        assert_eq!(ids, vec!["q2", "q3", "q4"]);
    }

    #[test]
    fn set_max_events_trims_existing_log() {
        let mut a = FeedbackAggregator::new();
        for i in 0..10 {
            a.record(ev("c", &format!("q{i}"), 0.5));
        }
        a.set_max_events(Some(4));
        assert_eq!(a.len(), 4);
        let ids: Vec<&str> = a.events().iter().map(|e| e.query_id.as_str()).collect();
        assert_eq!(ids, vec!["q6", "q7", "q8", "q9"]);
    }

    #[test]
    fn empty_aggregator_has_no_summary() {
        let a = FeedbackAggregator::new();
        assert!(a.is_empty());
        assert!(a.summarize("anything").is_none());
        assert!(a.summarize_all().is_empty());
        assert!(a.corpus_ids().is_empty());
    }

    #[test]
    fn summarize_single_corpus() {
        let mut a = FeedbackAggregator::new();
        a.record(ev("c1", "q1", 0.8));
        a.record(ev("c1", "q2", 0.6));
        a.record(ev("c1", "q3", 1.0));
        let s = a.summarize("c1").unwrap();
        assert_eq!(s.n_events, 3);
        assert!((s.mean_score - 0.8).abs() < 1e-12);
        assert!((s.min_score - 0.6).abs() < 1e-12);
        assert!((s.max_score - 1.0).abs() < 1e-12);
    }

    #[test]
    fn summarize_clamps_out_of_range_scores() {
        let mut a = FeedbackAggregator::new();
        a.record(ev("c", "q1", -0.5));
        a.record(ev("c", "q2", 1.5));
        let s = a.summarize("c").unwrap();
        // -0.5 → 0, 1.5 → 1 → mean = 0.5
        assert!((s.mean_score - 0.5).abs() < 1e-12);
        assert_eq!(s.min_score, 0.0);
        assert_eq!(s.max_score, 1.0);
    }

    #[test]
    fn summarize_isolates_corpus_ids() {
        let mut a = FeedbackAggregator::new();
        a.record(ev("alpha", "q", 0.2));
        a.record(ev("beta", "q", 0.9));
        assert!((a.summarize("alpha").unwrap().mean_score - 0.2).abs() < 1e-12);
        assert!((a.summarize("beta").unwrap().mean_score - 0.9).abs() < 1e-12);
        assert!(a.summarize("gamma").is_none());
    }

    #[test]
    fn summarize_all_is_sorted_by_corpus_id() {
        let mut a = FeedbackAggregator::new();
        a.record(ev("zebra", "q", 0.5));
        a.record(ev("ant", "q", 0.5));
        a.record(ev("mule", "q", 0.5));
        let sums = a.summarize_all();
        assert_eq!(sums.len(), 3);
        assert_eq!(sums[0].corpus_id, "ant");
        assert_eq!(sums[1].corpus_id, "mule");
        assert_eq!(sums[2].corpus_id, "zebra");
    }

    #[test]
    fn summarize_all_agrees_with_summarize() {
        let mut a = FeedbackAggregator::new();
        a.record(ev("x", "q1", 0.2));
        a.record(ev("y", "q2", 0.4));
        a.record(ev("x", "q3", 0.9));
        let all = a.summarize_all();
        assert_eq!(all.len(), 2);
        for s in all {
            let single = a.summarize(&s.corpus_id).unwrap();
            assert_eq!(single.n_events, s.n_events);
            assert!((single.mean_score - s.mean_score).abs() < 1e-12);
            assert_eq!(single.min_score, s.min_score);
            assert_eq!(single.max_score, s.max_score);
        }
    }

    #[test]
    fn corpus_ids_distinct_and_sorted() {
        let mut a = FeedbackAggregator::new();
        a.record(ev("b", "q", 0.5));
        a.record(ev("a", "q", 0.5));
        a.record(ev("b", "q2", 0.5));
        let ids = a.corpus_ids();
        assert_eq!(ids, vec!["a".to_string(), "b".to_string()]);
    }

    #[test]
    fn now_constructor_produces_parseable_timestamp() {
        let e = FeedbackEvent::now("c", "q", 0.5);
        assert_eq!(e.corpus_id, "c");
        assert!(e.timestamp.parse::<u64>().is_ok());
    }

    #[test]
    fn serde_derive_emits_flat_array() {
        let mut a = FeedbackAggregator::with_window(8);
        a.record(ev("c", "q", 0.5));
        let json = serde_json::to_string(&a).unwrap();
        assert!(json.starts_with('['));
        let back: FeedbackAggregator = serde_json::from_str(&json).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back.events()[0].query_id, "q");
    }

    #[test]
    fn save_and_load_roundtrip() {
        let scratch = ScratchDir::new("test");
        let path = scratch.path().join("nested").join("events.json");

        let mut a = FeedbackAggregator::new();
        a.record(ev("c1", "q", 0.7));
        a.record(ev("c2", "q", 0.3));
        a.save(&path).unwrap();

        let loaded = FeedbackAggregator::load(&path).unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded.events()[0].corpus_id, "c1");
        assert_eq!(loaded.events()[1].corpus_id, "c2");
    }

    #[test]
    fn append_to_creates_jsonl_then_load_roundtrips() {
        let scratch = ScratchDir::new("jsonl");
        let path = scratch.path().join("nested").join("events.json");

        // Append three events one by one — each call is O(1).
        ev("c1", "q1", 0.8).append_to(&path).unwrap();
        ev("c1", "q2", 0.6).append_to(&path).unwrap();
        ev("c2", "q3", 0.4).append_to(&path).unwrap();

        let loaded = FeedbackAggregator::load(&path).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded.events()[0].query_id, "q1");
        assert_eq!(loaded.events()[1].query_id, "q2");
        assert_eq!(loaded.events()[2].query_id, "q3");

        // Raw file really is JSONL (one record per line).
        let raw = fs::read_to_string(&path).unwrap();
        assert_eq!(raw.lines().count(), 3);
        assert!(!raw.trim_start().starts_with('['));
    }

    #[test]
    fn append_to_migrates_legacy_array_file() {
        let scratch = ScratchDir::new("migrate");
        let path = scratch.path().join("events.json");

        // Seed with a legacy array file (what `save` used to write).
        let mut legacy = FeedbackAggregator::new();
        legacy.record(ev("c1", "q1", 0.9));
        legacy.record(ev("c2", "q2", 0.5));
        legacy.save(&path).unwrap();

        // First append migrates the file to JSONL.
        ev("c3", "q3", 0.7).append_to(&path).unwrap();

        let loaded = FeedbackAggregator::load(&path).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded.events()[0].query_id, "q1");
        assert_eq!(loaded.events()[2].query_id, "q3");

        // Post-migration shape is JSONL.
        let raw = fs::read_to_string(&path).unwrap();
        assert!(!raw.trim_start().starts_with('['));
        assert_eq!(raw.lines().count(), 3);
    }

    #[test]
    fn load_jsonl_skips_blank_lines() {
        let scratch = ScratchDir::new("blanklines");
        fs::create_dir_all(scratch.path()).unwrap();
        let path = scratch.path().join("events.json");
        let line = |e: &FeedbackEvent| serde_json::to_string(e).unwrap();
        let body = format!(
            "\n{}\n\n{}\n\n",
            line(&ev("c1", "q1", 0.1)),
            line(&ev("c2", "q2", 0.9))
        );
        fs::write(&path, body).unwrap();

        let loaded = FeedbackAggregator::load(&path).unwrap();
        let ids: Vec<&str> = loaded.events().iter().map(|e| e.query_id.as_str()).collect();
        assert_eq!(ids, vec!["q1", "q2"]);
    }

    #[test]
    fn load_whitespace_only_file_returns_empty() {
        let scratch = ScratchDir::new("blank");
        fs::create_dir_all(scratch.path()).unwrap();
        let path = scratch.path().join("events.json");
        fs::write(&path, " \n\t\n").unwrap();
        assert!(FeedbackAggregator::load(&path).unwrap().is_empty());
    }

    #[test]
    fn load_nonexistent_returns_empty() {
        // The scratch dir itself is never created, so the path is missing.
        let scratch = ScratchDir::new("nonexistent");
        let path = scratch.path().join("missing.json");
        let a = FeedbackAggregator::load(&path).unwrap();
        assert!(a.is_empty());
    }

    #[test]
    fn default_store_path_ends_with_expected_filename() {
        let p = FeedbackAggregator::default_store_path().unwrap();
        assert!(p.ends_with("feedback_events.json"));
        assert!(p.iter().any(|c| c.to_string_lossy() == ".sphereql"));
    }
}