Skip to main content

algocline_engine/
card.rs

1//! Card storage — immutable run-result snapshots.
2//!
3//! A Card is a frozen record of a strategy run: identity, parameters,
4//! model, scenario, aggregate stats, and (optionally) per-case detail.
5//! Cards are **immutable** — once written they are never modified, only
6//! annotated via additive `append`.  Mutable **aliases** point to a
7//! Card and can be rebound freely.
8//!
9//! ## Design principles
10//!
11//! 1. **Minimal REQUIRED, maximal OPTIONAL** — v0 needs only 4 fields;
12//!    lightweight "ran this pkg" records and heavy optimize snapshots
13//!    share the same schema.
14//! 2. **Immutable append-only** — no overwrite, no delete.  New data is
15//!    added via `append` (new top-level keys only) or by creating a new
16//!    Card with a fresh `card_id`.
17//! 3. **Two-tier storage** — TOML for human-readable aggregate, JSONL
18//!    sidecar for machine-parseable per-case detail.
19//! 4. **File-primary** — files are the source of truth; in-memory state
20//!    is cache.  Cards can be copied, diffed, and version-controlled.
21//!
22//! ## Storage layout (two-tier)
23//!
24//! | Tier | File | Content |
25//! |------|------|---------|
26//! | **Tier 1** | `~/.algocline/cards/{pkg}/{card_id}.toml` | Aggregate scalars, decisions, identity, params |
27//! | **Tier 2** | `~/.algocline/cards/{pkg}/{card_id}.samples.jsonl` | Per-case raw data (JSONL, write-once) |
28//!
29//! Tier 1 holds a shareable summary (a few KB). Tier 2 holds per-case
30//! detail ��� the engine does not interpret its columns; packages define
31//! their own schema.
32//!
33//! Alias table: `~/.algocline/cards/_aliases.toml` (global).
34//!
35//! ## card_id naming
36//!
37//! `{pkg}_{model_short}_{compact_ts}_{hash6}`
38//!
39//! - `compact_ts`: `YYYYMMDDTHHMMSS` in UTC
40//! - `hash6`: first 6 hex chars of DJB2 param fingerprint
41//! - Example: `cot_opus46_20260412T061500_a3f9c1`
42//!
43//! ## v0 schema (frozen)
44//!
45//! ### REQUIRED (minimum valid Card)
46//!
47//! | Field | Type | Example |
48//! |-------|------|---------|
49//! | `schema_version` | string | `"card/v0"` |
50//! | `card_id` | string | `"cot_opus46_20260412T061500_a3f9c1"` |
51//! | `created_at` | string (RFC 3339) | `"2026-04-12T06:15:00Z"` |
52//! | `[pkg].name` | string | `"cot"` |
53//!
54//! ### OPTIONAL (auto-injected where possible)
55//!
56//! | Section | Fields |
57//! |---------|--------|
58//! | `[pkg]` | `version`, `category`, `source`, `source_ref`, `source_sha` |
59//! | `[runtime]` | `alc_version`, `lua_version`, `host_os`, `git_sha` |
60//! | `[model]` | `provider`, `id`, `id_short`, `cutoff` |
61//! | `[params]` | Free-form ctx snapshot; `param_fingerprint` for DJB2 hash |
62//! | `[strategy_params]` | Strategy-tunable parameters surfaced for sweeps / optimizers (e.g. `alpha`, `temperature`, `depth`). Free-form, but `where`-queryable as a first-class section |
63//! | `[scenario]` | `name`, `source`, `case_count`, `grader` |
64//! | `[stats]` | `pass_rate`, `mean_score`, `std`, `median`, `min`, `max`, `n` |
65//! | `[stats.by_bucket]` | Disaggregated sub-bucket stats (array of tables) |
66//! | `[cost]` | `llm_calls`, `input_tokens`, `output_tokens`, `elapsed_ms`, `usd_estimate` |
67//! | `[optimize]` | `target`, `search`, `rounds_used`, `top_k` (for optimize Cards) |
68//! | `[metadata]` | Free-form escape hatch. Recognized lineage conventions: `prior_card_id` (parent Card id), `prior_relation` (relation kind, e.g. `"sweep_variant"`, `"reflection_of"`, `"derived_from"`) |
69//!
70//! ## Lua API (`alc.card.*`)
71//!
72//! | Function | Description |
73//! |----------|-------------|
74//! | `create(table)` | Write new Card (Tier 1). Returns `{ card_id, path }` |
75//! | `get(card_id)` | Read Card by id. Returns table or nil |
76//! | `list(filter?)` | List Cards as summaries (newest first) |
77//! | `find(query?)` | Prisma-style `where` DSL + dotted-path `order_by` + `offset`/`limit` |
78//! | `append(card_id, fields)` | Additive-only annotation (new keys only) |
79//! | `alias_set(name, card_id, opts?)` | Pin mutable alias |
80//! | `alias_list(filter?)` | List aliases |
81//! | `get_by_alias(name)` | Resolve alias → full Card |
82//! | `write_samples(card_id, samples)` | Write Tier 2 sidecar (write-once) |
83//! | `read_samples(card_id, opts?)` | Read Tier 2 with `where` filtering + offset/limit paging |
84//! | `lineage(query)` | Walk ancestry/descendants via `metadata.prior_card_id` |
85
86use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
98// ═══════════════════════════════════════════════════════════════
99// CardStore trait — physical I/O abstraction.
100// ═══════════════════════════════════════════════════════════════
101//
102// Card domain logic (schema / Query DSL / Lineage) is backend-
103// neutral. Only the physical read/write layer is swappable.
104//
105// The default backend is `FileCardStore`, which preserves the
106// legacy `~/.algocline/cards/{pkg}/{card_id}.toml` layout
107// byte-for-byte. Alternative backends (PathCardStore, SqliteCardStore,
108// MemoryCardStore) can be added by implementing this trait.
109//
110// Locators are `PathBuf` values. For FileCardStore they are real
111// filesystem paths; for non-file backends they are synthetic paths
112// (e.g. `sqlite:///db.sqlite#card/{id}`) — the value is opaque to
113// the domain layer and only exposed via the Lua `alc.card.create`
114// / `alc.card.write_samples` return values.
115
116/// Storage backend for Cards.
117///
118/// Implementations must be `Send + Sync` so that they can be shared
119/// across Lua host threads safely. All methods may fail with an
120/// error `String` describing the backend-specific failure.
121pub trait CardStore: Send + Sync {
122    // ─── Card CRUD ─────────────────────────────────────────────
123
124    /// Write a new Card (Tier 1 TOML).
125    ///
126    /// The caller has already:
127    ///   - validated `pkg` and `card_id` via [`validate_name`]
128    ///   - serialized `toml_text` with `toml::to_string_pretty`
129    ///
130    /// Fails if a Card with the same id already exists
131    /// (immutability).  Returns the locator of the written Card.
132    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
133
134    /// Overwrite an existing Card (append flow).
135    ///
136    /// Append is additive-only w.r.t. keys, but the underlying
137    /// TOML file is rewritten in place; callers must have validated
138    /// the additive-only constraint before calling this.
139    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
140
141    /// Locate a Card file by id. Returns `None` if not found.
142    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;
143
144    /// Read a Card's raw TOML text by id. Returns `None` if missing.
145    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;
146
147    /// List `(pkg, locator)` pairs for every Card file in the store.
148    ///
149    /// When `pkg_filter` is `Some(name)`, restrict to that pkg
150    /// subdir. Non-existent pkg subdir yields an empty Vec.
151    ///
152    /// Order is implementation-defined — callers sort explicitly.
153    fn list_card_locators(
154        &self,
155        pkg_filter: Option<&str>,
156    ) -> Result<Vec<(String, PathBuf)>, String>;
157
158    /// Read raw TOML text from a locator returned by
159    /// [`Self::list_card_locators`]. `Ok(None)` on read failure so
160    /// scans can skip corrupt files without aborting.
161    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;
162
163    // ─── Alias table ───────────────────────────────────────────
164
165    fn read_aliases(&self) -> Result<Vec<Alias>, String>;
166    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;
167
168    // ─── Samples sidecar ───────────────────────────────────────
169
170    /// Check whether a samples sidecar exists for `card_id`.
171    fn samples_exists(&self, card_id: &str) -> Result<bool, String>;
172
173    /// Write a samples sidecar (write-once).
174    ///
175    /// `jsonl_text` is the complete JSONL payload (one JSON line
176    /// per sample, `\n`-terminated). Fails if a sidecar already
177    /// exists. Returns the locator.
178    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;
179
180    /// Read a samples sidecar as raw JSONL text. Returns `None`
181    /// when no sidecar exists (samples are optional).
182    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;
183
184    // ─── Import ────────────────────────────────────────────────
185
186    /// Import Card files from `source_dir` into the store under
187    /// `pkg`. First-writer wins (existing Cards are skipped).
188    /// Returns `(imported, skipped)` card_id lists.
189    fn import_from_dir(
190        &self,
191        source_dir: &Path,
192        pkg: &str,
193    ) -> Result<(Vec<String>, Vec<String>), String>;
194}
195
196/// Return the default backend (File-backed, `~/.algocline/cards/`).
197fn default_store() -> Result<FileCardStore, String> {
198    FileCardStore::from_home()
199}
200
201/// Resolve the cards root directory, creating it if needed.
202fn cards_dir() -> Result<PathBuf, String> {
203    let home = dirs::home_dir().ok_or("Cannot determine home directory")?;
204    let dir = home.join(".algocline").join("cards");
205    if !dir.exists() {
206        fs::create_dir_all(&dir).map_err(|e| format!("Failed to create cards dir: {e}"))?;
207    }
208    Ok(dir)
209}
210
211fn validate_name(name: &str, kind: &str) -> Result<(), String> {
212    if name.is_empty()
213        || name.contains('/')
214        || name.contains('\\')
215        || name.contains("..")
216        || name.contains('\0')
217    {
218        return Err(format!("Invalid {kind} name: '{name}'"));
219    }
220    Ok(())
221}
222
223/// DJB2 hash, hex-encoded. Used for param_fingerprint and card_id hash segment.
224fn djb2_hex(s: &str) -> String {
225    let mut h: u64 = 5381;
226    for b in s.bytes() {
227        h = h.wrapping_mul(33).wrapping_add(b as u64);
228    }
229    format!("{h:016x}")
230}
231
232/// Short-hash: last 6 hex chars of djb2.
233///
234/// DJB2's high bits are dominated by the `5381 * 33^n` term (same for any
235/// input of equal length), so the top hex digits collide easily for same-
236/// length inputs that differ only in a few byte positions. The low bits,
237/// driven by the most-recent bytes, mix well enough for short-hash use.
238fn hash6(s: &str) -> String {
239    let hex = djb2_hex(s);
240    let start = hex.len().saturating_sub(6);
241    hex[start..].to_string()
242}
243
244/// Stable serialization of a JSON value for hashing (sorted keys).
245fn stable_json(v: &Json) -> String {
246    let mut buf = String::new();
247    stable_json_into(v, &mut buf);
248    buf
249}
250fn stable_json_into(v: &Json, buf: &mut String) {
251    match v {
252        Json::Null => buf.push_str("null"),
253        Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
254        Json::Number(n) => buf.push_str(&n.to_string()),
255        Json::String(s) => {
256            buf.push('"');
257            buf.push_str(s);
258            buf.push('"');
259        }
260        Json::Array(a) => {
261            buf.push('[');
262            for (i, item) in a.iter().enumerate() {
263                if i > 0 {
264                    buf.push(',');
265                }
266                stable_json_into(item, buf);
267            }
268            buf.push(']');
269        }
270        Json::Object(m) => {
271            let mut keys: Vec<&String> = m.keys().collect();
272            keys.sort();
273            buf.push('{');
274            for (i, k) in keys.iter().enumerate() {
275                if i > 0 {
276                    buf.push(',');
277                }
278                buf.push('"');
279                buf.push_str(k);
280                buf.push_str("\":");
281                stable_json_into(&m[*k], buf);
282            }
283            buf.push('}');
284        }
285    }
286}
287
288/// Derive a short model id (e.g. "claude-opus-4-6" -> "opus46").
289/// v0: best-effort. Falls back to "model" if input is empty.
290fn short_model(id: &str) -> String {
291    if id.is_empty() {
292        return "model".into();
293    }
294    // Strip common vendor prefixes.
295    let stripped = id
296        .strip_prefix("claude-")
297        .or_else(|| id.strip_prefix("gpt-"))
298        .unwrap_or(id);
299    // Keep alnum only.
300    let s: String = stripped
301        .chars()
302        .filter(|c| c.is_ascii_alphanumeric())
303        .collect();
304    if s.is_empty() {
305        "model".into()
306    } else {
307        s
308    }
309}
310
311/// RFC3339 UTC "YYYY-MM-DDTHH:MM:SSZ" from current system time.
312fn now_rfc3339() -> String {
313    let secs = std::time::SystemTime::now()
314        .duration_since(std::time::UNIX_EPOCH)
315        .map(|d| d.as_secs())
316        .unwrap_or(0) as i64;
317    let days = secs.div_euclid(86400);
318    let tod = secs.rem_euclid(86400);
319    let (y, mo, d) = civil_from_days(days);
320    let hh = tod / 3600;
321    let mm = (tod % 3600) / 60;
322    let ss = tod % 60;
323    format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
324}
325
326/// YYYYMMDDTHHMMSS for current UTC time (compact, sortable).
327///
328/// Used in card_id generation so that:
329///   - rapid successive runs don't collide on the hash6 segment
330///   - string sort of card_id = chronological order
331fn now_compact() -> String {
332    let secs = std::time::SystemTime::now()
333        .duration_since(std::time::UNIX_EPOCH)
334        .map(|d| d.as_secs())
335        .unwrap_or(0) as i64;
336    let days = secs.div_euclid(86400);
337    let tod = secs.rem_euclid(86400);
338    let (y, mo, d) = civil_from_days(days);
339    let hh = tod / 3600;
340    let mm = (tod % 3600) / 60;
341    let ss = tod % 60;
342    format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
343}
344
345/// Howard Hinnant's civil_from_days algorithm.
346fn civil_from_days(z: i64) -> (i32, u32, u32) {
347    let z = z + 719468;
348    let era = if z >= 0 { z } else { z - 146096 } / 146097;
349    let doe = (z - era * 146097) as u64;
350    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
351    let y = yoe as i64 + era * 400;
352    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
353    let mp = (5 * doy + 2) / 153;
354    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
355    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
356    let y = y + if m <= 2 { 1 } else { 0 };
357    (y as i32, m, d)
358}
359
360/// Converter: serde_json::Value -> toml::Value.
361/// Nulls are dropped (TOML has no null). Mixed-type arrays are allowed in TOML 1.0+.
362fn json_to_toml(v: Json) -> Result<toml::Value, String> {
363    Ok(match v {
364        Json::Null => return Err("TOML does not support null values".into()),
365        Json::Bool(b) => toml::Value::Boolean(b),
366        Json::Number(n) => {
367            if let Some(i) = n.as_i64() {
368                toml::Value::Integer(i)
369            } else if let Some(f) = n.as_f64() {
370                toml::Value::Float(f)
371            } else {
372                return Err(format!("Unsupported number: {n}"));
373            }
374        }
375        Json::String(s) => toml::Value::String(s),
376        Json::Array(a) => {
377            let mut out = Vec::with_capacity(a.len());
378            for item in a {
379                if !item.is_null() {
380                    out.push(json_to_toml(item)?);
381                }
382            }
383            toml::Value::Array(out)
384        }
385        Json::Object(m) => {
386            let mut table = toml::map::Map::new();
387            for (k, val) in m {
388                if val.is_null() {
389                    continue;
390                }
391                table.insert(k, json_to_toml(val)?);
392            }
393            toml::Value::Table(table)
394        }
395    })
396}
397
398/// Converter: toml::Value -> serde_json::Value (for alc.card.get()).
399fn toml_to_json(v: toml::Value) -> Json {
400    match v {
401        toml::Value::String(s) => Json::String(s),
402        toml::Value::Integer(i) => json!(i),
403        toml::Value::Float(f) => json!(f),
404        toml::Value::Boolean(b) => Json::Bool(b),
405        toml::Value::Datetime(dt) => Json::String(dt.to_string()),
406        toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
407        toml::Value::Table(t) => {
408            let mut m = serde_json::Map::new();
409            for (k, v) in t {
410                m.insert(k, toml_to_json(v));
411            }
412            Json::Object(m)
413        }
414    }
415}
416
417/// Extract [pkg].name from an input JSON object. REQUIRED.
418fn require_pkg_name(input: &Json) -> Result<String, String> {
419    let name = input
420        .get("pkg")
421        .and_then(|p| p.get("name"))
422        .and_then(|n| n.as_str())
423        .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
424        .to_string();
425    validate_name(&name, "pkg")?;
426    Ok(name)
427}
428
429/// Main create entry. Returns (card_id, absolute_path).
430pub fn create(input: Json) -> Result<(String, PathBuf), String> {
431    create_with_store(&default_store()?, input)
432}
433
434/// Create a new Card backed by `store`. See [`create`] for the default-store variant.
435pub fn create_with_store(
436    store: &dyn CardStore,
437    mut input: Json,
438) -> Result<(String, PathBuf), String> {
439    if !input.is_object() {
440        return Err("alc.card.create: input must be a table".into());
441    }
442    let pkg_name = require_pkg_name(&input)?;
443    let obj = input.as_object_mut().unwrap();
444
445    // ─── Auto-inject REQUIRED fields ──────────────────────────
446    obj.entry("schema_version".to_string())
447        .or_insert_with(|| json!(SCHEMA_VERSION));
448    obj.entry("created_at".to_string())
449        .or_insert_with(|| json!(now_rfc3339()));
450    obj.entry("created_by".to_string())
451        .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
452
453    // ─── param_fingerprint (if [params] present) ──────────────
454    if let Some(params) = obj.get("params").cloned() {
455        if params.is_object() {
456            let fp = djb2_hex(&stable_json(&params));
457            obj.insert("param_fingerprint".to_string(), json!(fp));
458        }
459    }
460
461    // ─── card_id generation (if absent) ───────────────────────
462    let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
463        Some(id) if !id.is_empty() => id.to_string(),
464        _ => {
465            let model_id = obj
466                .get("model")
467                .and_then(|m| m.get("id"))
468                .and_then(|v| v.as_str())
469                .unwrap_or("");
470            let model_short = short_model(model_id);
471            let ts = now_compact();
472            let fp_seed = stable_json(&Json::Object(obj.clone()));
473            let h = hash6(&fp_seed);
474            format!("{pkg_name}_{model_short}_{ts}_{h}")
475        }
476    };
477    validate_name(&card_id, "card_id")?;
478    obj.insert("card_id".to_string(), json!(card_id.clone()));
479
480    let toml_val = json_to_toml(input)?;
481    let text = toml::to_string_pretty(&toml_val)
482        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
483    let path = store.write_new_card(&pkg_name, &card_id, &text)?;
484
485    publish(CardEvent::Created {
486        pkg: pkg_name.clone(),
487        card_id: card_id.clone(),
488        toml_text: text,
489    });
490
491    Ok((card_id, path))
492}
493
494/// Read a Card by id. Returns None if not found.
495pub fn get(card_id: &str) -> Result<Option<Json>, String> {
496    get_with_store(&default_store()?, card_id)
497}
498
499/// Read a Card from `store`. See [`get`] for the default-store variant.
500pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
501    let text = match store.read_card_text(card_id)? {
502        Some(t) => t,
503        None => return Ok(None),
504    };
505    let val: toml::Value =
506        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
507    Ok(Some(toml_to_json(val)))
508}
509
510/// Summary row for `alc.card.list()`.
511#[derive(Debug, Clone)]
512pub struct Summary {
513    pub card_id: String,
514    pub pkg: String,
515    pub created_at: Option<String>,
516    pub model: Option<String>,
517    pub scenario: Option<String>,
518    pub pass_rate: Option<f64>,
519}
520
521impl Summary {
522    fn to_json(&self) -> Json {
523        let mut m = serde_json::Map::new();
524        m.insert("card_id".into(), json!(self.card_id));
525        m.insert("pkg".into(), json!(self.pkg));
526        if let Some(v) = &self.created_at {
527            m.insert("created_at".into(), json!(v));
528        }
529        if let Some(v) = &self.model {
530            m.insert("model".into(), json!(v));
531        }
532        if let Some(v) = &self.scenario {
533            m.insert("scenario".into(), json!(v));
534        }
535        if let Some(v) = self.pass_rate {
536            m.insert("pass_rate".into(), json!(v));
537        }
538        Json::Object(m)
539    }
540}
541
542fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
543    let text = store.read_locator_text(locator).ok().flatten()?;
544    let val: toml::Value = toml::from_str(&text).ok()?;
545    let card_id = val
546        .get("card_id")
547        .and_then(|v| v.as_str())
548        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
549        .to_string();
550    let created_at = val
551        .get("created_at")
552        .and_then(|v| v.as_str())
553        .map(String::from);
554    let model = val
555        .get("model")
556        .and_then(|m| m.get("id"))
557        .and_then(|v| v.as_str())
558        .map(String::from);
559    let scenario = val
560        .get("scenario")
561        .and_then(|s| s.get("name"))
562        .and_then(|v| v.as_str())
563        .map(String::from);
564    let pass_rate = val
565        .get("stats")
566        .and_then(|s| s.get("pass_rate"))
567        .and_then(|v| v.as_float());
568    Some(Summary {
569        card_id,
570        pkg: pkg.to_string(),
571        created_at,
572        model,
573        scenario,
574        pass_rate,
575    })
576}
577
578/// List cards. `pkg_filter = Some("name")` restricts to that pkg subdir.
579pub fn list(pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
580    list_with_store(&default_store()?, pkg_filter)
581}
582
583/// List cards from `store`. See [`list`] for the default-store variant.
584pub fn list_with_store(
585    store: &dyn CardStore,
586    pkg_filter: Option<&str>,
587) -> Result<Vec<Summary>, String> {
588    let locators = store.list_card_locators(pkg_filter)?;
589    let mut out = Vec::with_capacity(locators.len());
590    for (pkg, loc) in &locators {
591        if let Some(s) = summarize(store, loc, pkg) {
592            out.push(s);
593        }
594    }
595
596    // Sort newest first. card_id embeds a compact UTC timestamp so it's
597    // naturally chronological; we still prefer created_at when present
598    // (some callers may override it), falling back to card_id.
599    out.sort_by(|a, b| {
600        b.created_at
601            .cmp(&a.created_at)
602            .then_with(|| b.card_id.cmp(&a.card_id))
603    });
604    Ok(out)
605}
606
607pub fn summaries_to_json(rows: &[Summary]) -> Json {
608    Json::Array(rows.iter().map(|s| s.to_json()).collect())
609}
610
611// ───────────────────────────────────────────────────────────────
612// P1 API: append / alias_{set,list} / find
613// ───────────────────────────────────────────────────────────────
614
615/// Append new top-level fields to an existing Card.
616///
617/// Semantics: **additive only**. If any top-level key in `fields` already
618/// exists in the Card, the call fails — Cards are immutable w.r.t. existing
619/// data. New top-level keys are inserted and the Card file is rewritten
620/// atomically.
621///
622/// Returns the merged Card JSON.
623pub fn append(card_id: &str, fields: Json) -> Result<Json, String> {
624    append_with_store(&default_store()?, card_id, fields)
625}
626
627/// Append to a Card in `store`. See [`append`] for the default-store variant.
628pub fn append_with_store(
629    store: &dyn CardStore,
630    card_id: &str,
631    fields: Json,
632) -> Result<Json, String> {
633    let text = store
634        .read_card_text(card_id)?
635        .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
636    let fields_obj = match fields {
637        Json::Object(m) => m,
638        _ => return Err("alc.card.append: fields must be a table".into()),
639    };
640
641    let existing: toml::Value =
642        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
643    let mut existing_json = toml_to_json(existing);
644    let existing_obj = existing_json
645        .as_object_mut()
646        .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
647
648    for (k, v) in fields_obj {
649        if existing_obj.contains_key(&k) {
650            return Err(format!(
651                "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
652            ));
653        }
654        if !v.is_null() {
655            existing_obj.insert(k, v);
656        }
657    }
658
659    let toml_val = json_to_toml(existing_json.clone())?;
660    let text = toml::to_string_pretty(&toml_val)
661        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
662    store.overwrite_card(card_id, &text)?;
663
664    publish(CardEvent::Appended {
665        card_id: card_id.to_string(),
666        toml_text: text,
667    });
668
669    Ok(existing_json)
670}
671
672#[derive(Debug, Clone)]
673pub struct Alias {
674    pub name: String,
675    pub card_id: String,
676    pub pkg: Option<String>,
677    pub set_at: String,
678    pub note: Option<String>,
679}
680
681impl Alias {
682    fn to_json(&self) -> Json {
683        let mut m = serde_json::Map::new();
684        m.insert("name".into(), json!(self.name));
685        m.insert("card_id".into(), json!(self.card_id));
686        if let Some(p) = &self.pkg {
687            m.insert("pkg".into(), json!(p));
688        }
689        m.insert("set_at".into(), json!(self.set_at));
690        if let Some(n) = &self.note {
691            m.insert("note".into(), json!(n));
692        }
693        Json::Object(m)
694    }
695}
696
697/// Bind (or rebind) an alias to a Card.
698///
699/// Validates that `card_id` exists. If an alias with the same `name` already
700/// exists it is overwritten — the alias table is intentionally mutable even
701/// though the Cards themselves are not.
702pub fn alias_set(
703    name: &str,
704    card_id: &str,
705    pkg: Option<&str>,
706    note: Option<&str>,
707) -> Result<Alias, String> {
708    alias_set_with_store(&default_store()?, name, card_id, pkg, note)
709}
710
711/// Bind an alias in `store`. See [`alias_set`] for the default-store variant.
712pub fn alias_set_with_store(
713    store: &dyn CardStore,
714    name: &str,
715    card_id: &str,
716    pkg: Option<&str>,
717    note: Option<&str>,
718) -> Result<Alias, String> {
719    validate_name(name, "alias")?;
720    if store.find_card_locator(card_id)?.is_none() {
721        return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
722    }
723    let mut aliases = store.read_aliases()?;
724    aliases.retain(|a| a.name != name);
725    let entry = Alias {
726        name: name.to_string(),
727        card_id: card_id.to_string(),
728        pkg: pkg.map(String::from),
729        set_at: now_rfc3339(),
730        note: note.map(String::from),
731    };
732    aliases.push(entry.clone());
733    store.write_aliases(&aliases)?;
734
735    // Mirror the full alias table to subscribers as TOML text. The
736    // primary FileCardStore already serialized it internally; we
737    // re-serialize here using the same shape so subscribers stay in
738    // byte-for-byte parity. A serialization failure is best-effort
739    // (log + skip).
740    match serialize_aliases_toml(&aliases) {
741        Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
742        Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
743    }
744
745    Ok(entry)
746}
747
748/// Serialize `aliases` to a TOML document matching the primary
749/// `_aliases.toml` layout. Broken out so that subscribers can receive
750/// the exact byte-for-byte dump.
751fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
752    let mut arr = Vec::with_capacity(aliases.len());
753    for a in aliases {
754        let mut t = toml::map::Map::new();
755        t.insert("name".into(), toml::Value::String(a.name.clone()));
756        t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
757        if let Some(p) = &a.pkg {
758            t.insert("pkg".into(), toml::Value::String(p.clone()));
759        }
760        t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
761        if let Some(n) = &a.note {
762            t.insert("note".into(), toml::Value::String(n.clone()));
763        }
764        arr.push(toml::Value::Table(t));
765    }
766    let mut root = toml::map::Map::new();
767    root.insert("alias".into(), toml::Value::Array(arr));
768    toml::to_string_pretty(&toml::Value::Table(root))
769        .map_err(|e| format!("Failed to serialize aliases: {e}"))
770}
771
772/// Resolve an alias name to its bound Card and return the full Card JSON.
773///
774/// Shortcut for `alias_list → filter → get`. Returns `None` when the alias
775/// does not exist. Errors when the alias points at a missing Card — that
776/// would indicate a corrupt alias table (the target was deleted out of band).
777pub fn get_by_alias(name: &str) -> Result<Option<Json>, String> {
778    get_by_alias_with_store(&default_store()?, name)
779}
780
781/// Resolve an alias in `store`. See [`get_by_alias`] for the default-store variant.
782pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
783    validate_name(name, "alias")?;
784    let aliases = store.read_aliases()?;
785    let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
786        return Ok(None);
787    };
788    match get_with_store(store, &alias.card_id)? {
789        Some(card) => Ok(Some(card)),
790        None => Err(format!(
791            "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
792            alias.card_id
793        )),
794    }
795}
796
797/// List aliases, optionally filtered by pkg.
798pub fn alias_list(pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
799    alias_list_with_store(&default_store()?, pkg_filter)
800}
801
802/// List aliases from `store`. See [`alias_list`] for the default-store variant.
803pub fn alias_list_with_store(
804    store: &dyn CardStore,
805    pkg_filter: Option<&str>,
806) -> Result<Vec<Alias>, String> {
807    let mut aliases = store.read_aliases()?;
808    if let Some(p) = pkg_filter {
809        aliases.retain(|a| a.pkg.as_deref() == Some(p));
810    }
811    Ok(aliases)
812}
813
814pub fn aliases_to_json(rows: &[Alias]) -> Json {
815    Json::Array(rows.iter().map(|a| a.to_json()).collect())
816}
817
818// ═══════════════════════════════════════════════════════════════
819// Where DSL — Prisma/Mongo-style nested predicates
820// ═══════════════════════════════════════════════════════════════
821//
822// See `workspace/tasks/card-dsl/design.md` for the full spec.
823//
824// Syntax (JSON form, as received from Lua / MCP):
825//
826//   where = {
827//     pkg: "cot",                                      // implicit eq
828//     stats: { pass_rate: { gte: 0.8 }, n: { gte: 30 } },
829//     strategy_params: { temperature: { gte: 0.7 } },
830//     prior_card_id: { exists: true },
831//     _or: [ {...}, {...} ],                           // logical ops
832//     _not: { model: { id: "claude-haiku-4-5-20251001" } },
833//   }
834//
835// Semantics:
836//   * Multiple keys in the same object → implicit AND.
837//   * Nested object value → section (path extension).
838//   * Object whose every key is a reserved operator name → leaf operator
839//     object; applies the operators to the value at the current path.
840//   * Scalar/array value → implicit eq.
841//   * Reserved logical keys: `_and` / `_or` / `_not`.
842//   * Reserved operator keys: `eq ne lt lte gt gte in nin exists
843//     contains starts_with`.  Card schemas must not use these names as
844//     field names under any section.
845//
846// Missing-field comparison:
847//   * `eq/lt/lte/gt/gte/in/contains/starts_with` → false on missing
848//   * `ne/nin`                                   → true  on missing
849//   * `exists`                                   → explicit
850//
851
852/// Single comparison operator.
853#[derive(Debug, Clone, PartialEq)]
854pub enum CmpOp {
855    Eq,
856    Ne,
857    Lt,
858    Lte,
859    Gt,
860    Gte,
861    In,
862    Nin,
863    Exists,
864    Contains,
865    StartsWith,
866}
867
868impl CmpOp {
869    fn from_key(k: &str) -> Option<Self> {
870        Some(match k {
871            "eq" => Self::Eq,
872            "ne" => Self::Ne,
873            "lt" => Self::Lt,
874            "lte" => Self::Lte,
875            "gt" => Self::Gt,
876            "gte" => Self::Gte,
877            "in" => Self::In,
878            "nin" => Self::Nin,
879            "exists" => Self::Exists,
880            "contains" => Self::Contains,
881            "starts_with" => Self::StartsWith,
882            _ => return None,
883        })
884    }
885}
886
887/// One parsed comparison: `path` points at a nested field,
888/// `op` + `value` describe how to compare it.
889#[derive(Debug, Clone)]
890pub struct Comparison {
891    pub path: Vec<String>,
892    pub op: CmpOp,
893    pub value: Json,
894}
895
896/// Parsed predicate tree.
897#[derive(Debug, Clone)]
898pub enum Predicate {
899    And(Vec<Predicate>),
900    Or(Vec<Predicate>),
901    Not(Box<Predicate>),
902    Cmp(Comparison),
903}
904
905/// Is `obj` entirely composed of reserved operator keys?
906/// Empty objects return false (meaningless as an operator object).
907fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
908    if obj.is_empty() {
909        return false;
910    }
911    obj.keys().all(|k| CmpOp::from_key(k).is_some())
912}
913
914/// Parse a `where` JSON value into a `Predicate`.
915///
916/// `prefix` is the current nested-key path as we descend through
917/// section objects.
918pub fn parse_where(value: &Json) -> Result<Predicate, String> {
919    parse_predicate(value, &[])
920}
921
922fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
923    let obj = value
924        .as_object()
925        .ok_or_else(|| "where clause must be a table".to_string())?;
926
927    let mut clauses: Vec<Predicate> = Vec::new();
928
929    for (key, val) in obj {
930        match key.as_str() {
931            "_and" => {
932                let arr = val
933                    .as_array()
934                    .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
935                let mut subs = Vec::with_capacity(arr.len());
936                for sub in arr {
937                    subs.push(parse_predicate(sub, prefix)?);
938                }
939                clauses.push(Predicate::And(subs));
940            }
941            "_or" => {
942                let arr = val
943                    .as_array()
944                    .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
945                let mut subs = Vec::with_capacity(arr.len());
946                for sub in arr {
947                    subs.push(parse_predicate(sub, prefix)?);
948                }
949                clauses.push(Predicate::Or(subs));
950            }
951            "_not" => {
952                clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
953            }
954            _ => {
955                // Field key — extend the current path.
956                let mut new_path = prefix.to_vec();
957                new_path.push(key.clone());
958
959                match val {
960                    Json::Object(m) if is_operator_object(m) => {
961                        // Leaf: operator object at this path.
962                        for (op_key, op_val) in m {
963                            let op = CmpOp::from_key(op_key).expect("validated above");
964                            clauses.push(Predicate::Cmp(Comparison {
965                                path: new_path.clone(),
966                                op,
967                                value: op_val.clone(),
968                            }));
969                        }
970                    }
971                    Json::Object(_) => {
972                        // Nested section: recurse with extended path.
973                        clauses.push(parse_predicate(val, &new_path)?);
974                    }
975                    _ => {
976                        // Scalar/array: implicit eq.
977                        clauses.push(Predicate::Cmp(Comparison {
978                            path: new_path,
979                            op: CmpOp::Eq,
980                            value: val.clone(),
981                        }));
982                    }
983                }
984            }
985        }
986    }
987
988    if clauses.len() == 1 {
989        Ok(clauses.remove(0))
990    } else {
991        Ok(Predicate::And(clauses))
992    }
993}
994
995/// Fetch a nested value from a Card JSON by dotted path.
996fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
997    let mut node = card;
998    for key in path {
999        let obj = node.as_object()?;
1000        node = obj.get(key)?;
1001    }
1002    Some(node)
1003}
1004
1005/// Compare two JSON scalars with a numeric/string/bool comparator.
1006/// Returns None when the types aren't comparable.
1007fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
1008    match (a, b) {
1009        (Json::Number(x), Json::Number(y)) => {
1010            let xf = x.as_f64()?;
1011            let yf = y.as_f64()?;
1012            xf.partial_cmp(&yf)
1013        }
1014        (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
1015        (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
1016        _ => None,
1017    }
1018}
1019
1020fn json_eq(a: &Json, b: &Json) -> bool {
1021    match (a, b) {
1022        (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
1023            (Some(xf), Some(yf)) => xf == yf,
1024            _ => a == b,
1025        },
1026        _ => a == b,
1027    }
1028}
1029
1030fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
1031    let actual = fetch_path(card, &cmp.path);
1032    let exists = actual.is_some();
1033
1034    match cmp.op {
1035        CmpOp::Exists => {
1036            let want = cmp.value.as_bool().unwrap_or(true);
1037            exists == want
1038        }
1039        CmpOp::Ne => match actual {
1040            None => true,
1041            Some(v) => !json_eq(v, &cmp.value),
1042        },
1043        CmpOp::Nin => match actual {
1044            None => true,
1045            Some(v) => match cmp.value.as_array() {
1046                Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
1047                None => false,
1048            },
1049        },
1050        CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
1051        CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
1052            Some(arr) => arr.iter().any(|e| json_eq(e, v)),
1053            None => false,
1054        }),
1055        CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
1056            let Some(v) = actual else { return false };
1057            let Some(ord) = json_cmp(v, &cmp.value) else {
1058                return false;
1059            };
1060            use std::cmp::Ordering::{Equal, Greater, Less};
1061            matches!(
1062                (&cmp.op, ord),
1063                (CmpOp::Lt, Less)
1064                    | (CmpOp::Lte, Less | Equal)
1065                    | (CmpOp::Gt, Greater)
1066                    | (CmpOp::Gte, Greater | Equal)
1067            )
1068        }
1069        CmpOp::Contains => {
1070            let Some(Json::String(haystack)) = actual else {
1071                return false;
1072            };
1073            let Some(needle) = cmp.value.as_str() else {
1074                return false;
1075            };
1076            haystack.contains(needle)
1077        }
1078        CmpOp::StartsWith => {
1079            let Some(Json::String(haystack)) = actual else {
1080                return false;
1081            };
1082            let Some(needle) = cmp.value.as_str() else {
1083                return false;
1084            };
1085            haystack.starts_with(needle)
1086        }
1087    }
1088}
1089
1090/// Evaluate a predicate tree against a full Card JSON.
1091pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1092    match pred {
1093        Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1094        Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1095        Predicate::Not(sub) => !eval_predicate(sub, card),
1096        Predicate::Cmp(c) => eval_cmp(c, card),
1097    }
1098}
1099
1100// ───────────────────────────────────────────────────────────────
1101// Order-by
1102// ───────────────────────────────────────────────────────────────
1103
1104/// Parsed sort key: path with optional descending flag.
1105#[derive(Debug, Clone)]
1106pub struct OrderKey {
1107    pub path: Vec<String>,
1108    pub desc: bool,
1109}
1110
1111impl OrderKey {
1112    fn parse(raw: &str) -> Result<Self, String> {
1113        if raw.is_empty() {
1114            return Err("order_by key must not be empty".into());
1115        }
1116        let (desc, rest) = if let Some(r) = raw.strip_prefix('-') {
1117            (true, r)
1118        } else {
1119            (false, raw)
1120        };
1121        let path: Vec<String> = rest.split('.').map(|s| s.to_string()).collect();
1122        if path.iter().any(|p| p.is_empty()) {
1123            return Err(format!("invalid order_by key: '{raw}'"));
1124        }
1125        Ok(Self { path, desc })
1126    }
1127}
1128
1129/// Parse an order_by JSON value.  Accepts:
1130///   - a string: `"stats.pass_rate"` or `"-stats.pass_rate"`
1131///   - an array of strings: `["-stats.pass_rate", "created_at"]`
1132pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1133    match value {
1134        Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1135        Json::Array(arr) => {
1136            let mut out = Vec::with_capacity(arr.len());
1137            for v in arr {
1138                let s = v
1139                    .as_str()
1140                    .ok_or_else(|| "order_by array must contain strings".to_string())?;
1141                out.push(OrderKey::parse(s)?);
1142            }
1143            Ok(out)
1144        }
1145        _ => Err("order_by must be a string or array of strings".into()),
1146    }
1147}
1148
1149/// Query parameters for `find`.
1150#[derive(Debug, Default, Clone)]
1151pub struct FindQuery {
1152    /// Restrict scan to a single pkg subdir (I/O optimization).
1153    pub pkg: Option<String>,
1154    /// Prisma-style predicate tree.
1155    pub where_: Option<Predicate>,
1156    /// Sort keys (dotted paths, optional `-` prefix for desc).
1157    pub order_by: Vec<OrderKey>,
1158    pub limit: Option<usize>,
1159    pub offset: Option<usize>,
1160}
1161
1162/// A loaded Card row flowing through the find() pipeline.
1163///
1164/// `full` is the whole Card JSON (used by `where` evaluation and
1165/// `order_by` dotted-path lookup); `summary` is the projection
1166/// returned to callers.
1167#[derive(Debug, Clone)]
1168struct CardRow {
1169    full: Json,
1170    summary: Summary,
1171}
1172
1173/// Load a single Card file into a `CardRow`.
1174fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1175    let text = store.read_locator_text(locator).ok().flatten()?;
1176    let val: toml::Value = toml::from_str(&text).ok()?;
1177    let json = toml_to_json(val);
1178
1179    let card_id = json
1180        .get("card_id")
1181        .and_then(|v| v.as_str())
1182        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1183        .to_string();
1184    let created_at = json
1185        .get("created_at")
1186        .and_then(|v| v.as_str())
1187        .map(String::from);
1188    let model = json
1189        .get("model")
1190        .and_then(|m| m.get("id"))
1191        .and_then(|v| v.as_str())
1192        .map(String::from);
1193    let scenario = json
1194        .get("scenario")
1195        .and_then(|s| s.get("name"))
1196        .and_then(|v| v.as_str())
1197        .map(String::from);
1198    let pass_rate = json
1199        .get("stats")
1200        .and_then(|s| s.get("pass_rate"))
1201        .and_then(|v| v.as_f64());
1202
1203    Some(CardRow {
1204        full: json,
1205        summary: Summary {
1206            card_id,
1207            pkg: pkg.to_string(),
1208            created_at,
1209            model,
1210            scenario,
1211            pass_rate,
1212        },
1213    })
1214}
1215
1216/// Compare two Card rows according to an ordered list of sort keys.
1217fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1218    use std::cmp::Ordering;
1219    for k in keys {
1220        let va = fetch_path(&a.full, &k.path);
1221        let vb = fetch_path(&b.full, &k.path);
1222        let ord = match (va, vb) {
1223            (None, None) => Ordering::Equal,
1224            (None, Some(_)) => Ordering::Greater, // missing sorts last
1225            (Some(_), None) => Ordering::Less,
1226            (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1227        };
1228        let ord = if k.desc { ord.reverse() } else { ord };
1229        if ord != Ordering::Equal {
1230            return ord;
1231        }
1232    }
1233    Ordering::Equal
1234}
1235
1236/// Summary-only fields that can be sorted without loading full TOML.
1237const SUMMARY_SORT_FIELDS: &[&str] = &[
1238    "card_id",
1239    "created_at",
1240    "stats.pass_rate",
1241    "scenario.name",
1242    "model.id",
1243];
1244
1245/// Return true when the query can be answered with lightweight Summary
1246/// rows (no full-TOML load needed).
1247fn is_lightweight_query(q: &FindQuery) -> bool {
1248    q.where_.is_none()
1249        && q.order_by
1250            .iter()
1251            .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1252}
1253
1254/// Sort Summary rows using order_by keys that map to Summary fields.
1255fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1256    use std::cmp::Ordering;
1257    for k in keys {
1258        let key_str = k.path.join(".");
1259        let ord = match key_str.as_str() {
1260            "card_id" => a.card_id.cmp(&b.card_id),
1261            "created_at" => a.created_at.cmp(&b.created_at),
1262            "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1263                (None, None) => Ordering::Equal,
1264                (None, Some(_)) => Ordering::Greater,
1265                (Some(_), None) => Ordering::Less,
1266                (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1267            },
1268            "scenario.name" => a.scenario.cmp(&b.scenario),
1269            "model.id" => a.model.cmp(&b.model),
1270            _ => Ordering::Equal,
1271        };
1272        let ord = if k.desc { ord.reverse() } else { ord };
1273        if ord != Ordering::Equal {
1274            return ord;
1275        }
1276    }
1277    Ordering::Equal
1278}
1279
1280/// Filter/sort Cards across the store using the `where` DSL.
1281///
1282/// When no `where` clause is specified and `order_by` only references
1283/// summary-level fields, uses the lightweight `list()` path to avoid
1284/// loading full TOML.  Otherwise loads full TOML per Card.
1285pub fn find(q: FindQuery) -> Result<Vec<Summary>, String> {
1286    find_with_store(&default_store()?, q)
1287}
1288
1289/// Filter/sort Cards from `store`. See [`find`] for the default-store variant.
1290pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1291    // Fast path: lightweight query, no full-TOML load needed.
1292    if is_lightweight_query(&q) {
1293        let mut rows = list_with_store(store, q.pkg.as_deref())?;
1294        if q.order_by.is_empty() {
1295            rows.sort_by(|a, b| {
1296                b.created_at
1297                    .cmp(&a.created_at)
1298                    .then_with(|| b.card_id.cmp(&a.card_id))
1299            });
1300        } else {
1301            rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1302        }
1303        let out: Vec<Summary> = rows
1304            .into_iter()
1305            .skip(q.offset.unwrap_or(0))
1306            .take(q.limit.unwrap_or(usize::MAX))
1307            .collect();
1308        return Ok(out);
1309    }
1310
1311    // Full path: load entire TOML for where evaluation / arbitrary order_by.
1312    let all_rows = scan_cards(store, q.pkg.as_deref())?;
1313
1314    // Filter by where.
1315    let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1316        all_rows
1317            .into_iter()
1318            .filter(|row| eval_predicate(pred, &row.full))
1319            .collect()
1320    } else {
1321        all_rows
1322    };
1323
1324    // Sort.
1325    if q.order_by.is_empty() {
1326        rows.sort_by(|a, b| {
1327            b.summary
1328                .created_at
1329                .cmp(&a.summary.created_at)
1330                .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1331        });
1332    } else {
1333        rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1334    }
1335
1336    // Offset + limit.
1337    let out: Vec<Summary> = rows
1338        .into_iter()
1339        .skip(q.offset.unwrap_or(0))
1340        .take(q.limit.unwrap_or(usize::MAX))
1341        .map(|r| r.summary)
1342        .collect();
1343
1344    Ok(out)
1345}
1346
1347// ───────────────────────────────────────────────────────────────
1348// Lineage walker
1349// ───────────────────────────────────────────────────────────────
1350//
1351// Cards form a tree (typically, not strictly a DAG) via the
1352// `metadata.prior_card_id` convention. `lineage()` walks that tree
1353// either up (toward ancestors) or down (toward descendants) or both,
1354// up to a configurable depth, optionally filtered by `prior_relation`.
1355//
1356// Up-walk is O(depth) — each step reads one parent Card.
1357// Down-walk is O(N_cards × depth) — we scan the whole store at each
1358// level. For the current scale (hundreds to low thousands of cards)
1359// this is fine; if the store grows we can build a prior_card_id index.
1360
1361/// Walk direction for `lineage`.
1362#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1363pub enum LineageDirection {
1364    #[default]
1365    Up,
1366    Down,
1367    Both,
1368}
1369
1370impl LineageDirection {
1371    pub fn parse(s: &str) -> Result<Self, String> {
1372        match s {
1373            "up" => Ok(Self::Up),
1374            "down" => Ok(Self::Down),
1375            "both" => Ok(Self::Both),
1376            other => Err(format!(
1377                "direction must be 'up', 'down', or 'both' (got '{other}')"
1378            )),
1379        }
1380    }
1381}
1382
1383/// Query parameters for `lineage`.
1384#[derive(Debug, Clone, Default)]
1385pub struct LineageQuery {
1386    pub card_id: String,
1387    pub direction: LineageDirection,
1388    /// Max traversal depth. Default 10.
1389    pub depth: Option<usize>,
1390    /// Include a per-node `stats` field (full [stats] section).
1391    pub include_stats: bool,
1392    /// If set, only edges whose `prior_relation` is in this list are
1393    /// followed.  The root is always included regardless.
1394    pub relation_filter: Option<Vec<String>>,
1395}
1396
1397/// One node in the lineage result.
1398///
1399/// `depth` is the signed distance from the root: negative for
1400/// ancestors (up-walk), 0 for the root, positive for descendants.
1401#[derive(Debug, Clone)]
1402pub struct LineageNode {
1403    pub card_id: String,
1404    pub pkg: String,
1405    pub prior_card_id: Option<String>,
1406    pub prior_relation: Option<String>,
1407    pub depth: i32,
1408    pub stats: Option<Json>,
1409}
1410
1411/// One edge in the lineage result (child → parent, always).
1412#[derive(Debug, Clone)]
1413pub struct LineageEdge {
1414    pub from: String,
1415    pub to: String,
1416    pub relation: Option<String>,
1417}
1418
1419/// Full lineage walk result.
1420#[derive(Debug, Clone)]
1421pub struct LineageResult {
1422    pub root: String,
1423    pub nodes: Vec<LineageNode>,
1424    pub edges: Vec<LineageEdge>,
1425    pub truncated: bool,
1426}
1427
1428const DEFAULT_LINEAGE_DEPTH: usize = 10;
1429
1430/// Extract the lineage fields from a Card JSON.
1431/// Returns (prior_card_id, prior_relation).
1432fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1433    let meta = card.get("metadata");
1434    let prior_card_id = meta
1435        .and_then(|m| m.get("prior_card_id"))
1436        .and_then(|v| v.as_str())
1437        .map(String::from);
1438    let prior_relation = meta
1439        .and_then(|m| m.get("prior_relation"))
1440        .and_then(|v| v.as_str())
1441        .map(String::from);
1442    (prior_card_id, prior_relation)
1443}
1444
1445/// Build a LineageNode from a loaded CardRow at a given depth.
1446fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1447    let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1448    let stats = if include_stats {
1449        row.full.get("stats").cloned()
1450    } else {
1451        None
1452    };
1453    LineageNode {
1454        card_id: row.summary.card_id.clone(),
1455        pkg: row.summary.pkg.clone(),
1456        prior_card_id,
1457        prior_relation,
1458        depth,
1459        stats,
1460    }
1461}
1462
1463/// Check whether `relation` passes the relation_filter (None means no
1464/// filter, which always passes).
1465fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
1466    match filter {
1467        None => true,
1468        Some(allowed) => match relation {
1469            Some(r) => allowed.iter().any(|a| a == r),
1470            None => false,
1471        },
1472    }
1473}
1474
1475/// Full in-memory card index with forward and reverse lineage maps.
1476struct CardIndex {
1477    /// card_id → CardRow
1478    cards: std::collections::HashMap<String, CardRow>,
1479    /// parent card_id → Vec<child card_id> (reverse lineage index)
1480    children: std::collections::HashMap<String, Vec<String>>,
1481}
1482
1483/// Load all Cards in the store once, keyed by card_id.
1484/// Also builds a reverse index (parent → children) so that
1485/// `walk_down` is O(result_size) instead of O(N_cards × depth).
1486fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1487    let rows = scan_cards(store, None)?;
1488
1489    let mut cards = std::collections::HashMap::with_capacity(rows.len());
1490    let mut children: std::collections::HashMap<String, Vec<String>> =
1491        std::collections::HashMap::new();
1492
1493    for row in rows {
1494        let id = row.summary.card_id.clone();
1495        let (prior_id, _) = lineage_fields(&row.full);
1496        if let Some(parent) = prior_id {
1497            children.entry(parent).or_default().push(id.clone());
1498        }
1499        cards.insert(id, row);
1500    }
1501    Ok(CardIndex { cards, children })
1502}
1503
1504/// Scan all Cards in the store, loading full TOML for each. When
1505/// `pkg_filter` is provided, only that pkg subdir is scanned. Shared
1506/// between `find` and `load_card_index`.
1507fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1508    let locators = store.list_card_locators(pkg_filter)?;
1509    let mut rows = Vec::with_capacity(locators.len());
1510    for (pkg, loc) in &locators {
1511        if let Some(row) = load_full(store, loc, pkg) {
1512            rows.push(row);
1513        }
1514    }
1515    Ok(rows)
1516}
1517
1518/// Invariant context passed through the lineage walkers.
1519struct LineageCtx<'a> {
1520    index: &'a CardIndex,
1521    relation_filter: &'a Option<Vec<String>>,
1522    include_stats: bool,
1523    max_depth: usize,
1524}
1525
1526/// Mutable accumulator for one lineage walk.
1527struct LineageAccum {
1528    nodes: Vec<LineageNode>,
1529    edges: Vec<LineageEdge>,
1530    visited: std::collections::HashSet<String>,
1531    truncated: bool,
1532}
1533
1534/// Walk ancestors via `metadata.prior_card_id`.
1535fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1536    let mut cur = start_id.to_string();
1537    for step in 1..=ctx.max_depth {
1538        let Some(row) = ctx.index.cards.get(&cur) else {
1539            return;
1540        };
1541        let (prior_id, prior_rel) = lineage_fields(&row.full);
1542        let Some(prior_id) = prior_id else {
1543            return;
1544        };
1545        if !relation_passes(ctx.relation_filter, &prior_rel) {
1546            return;
1547        }
1548        if acc.visited.contains(&prior_id) {
1549            return;
1550        }
1551        let Some(parent) = ctx.index.cards.get(&prior_id) else {
1552            return;
1553        };
1554        acc.nodes
1555            .push(make_node(parent, -(step as i32), ctx.include_stats));
1556        acc.edges.push(LineageEdge {
1557            from: row.summary.card_id.clone(),
1558            to: parent.summary.card_id.clone(),
1559            relation: prior_rel,
1560        });
1561        acc.visited.insert(prior_id.clone());
1562        cur = prior_id;
1563    }
1564    // Depth exhausted but another unwalked parent exists → truncated.
1565    if let Some(row) = ctx.index.cards.get(&cur) {
1566        let (prior_id, _) = lineage_fields(&row.full);
1567        if prior_id
1568            .as_ref()
1569            .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1570        {
1571            acc.truncated = true;
1572        }
1573    }
1574}
1575
1576/// Walk descendants using the reverse index (parent → children),
1577/// breadth-first.  O(result_size) instead of O(N_cards × depth).
1578fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1579    let mut frontier: Vec<String> = vec![start_id.to_string()];
1580
1581    for depth in 1..=ctx.max_depth {
1582        let mut next_frontier: Vec<String> = Vec::new();
1583        for parent_id in &frontier {
1584            let children = match ctx.index.children.get(parent_id) {
1585                Some(c) => c,
1586                None => continue,
1587            };
1588            for child_id in children {
1589                if acc.visited.contains(child_id) {
1590                    continue;
1591                }
1592                let Some(child) = ctx.index.cards.get(child_id) else {
1593                    continue;
1594                };
1595                let (_, prior_rel) = lineage_fields(&child.full);
1596                if !relation_passes(ctx.relation_filter, &prior_rel) {
1597                    continue;
1598                }
1599                acc.nodes
1600                    .push(make_node(child, depth as i32, ctx.include_stats));
1601                acc.edges.push(LineageEdge {
1602                    from: child.summary.card_id.clone(),
1603                    to: parent_id.clone(),
1604                    relation: prior_rel,
1605                });
1606                acc.visited.insert(child_id.clone());
1607                next_frontier.push(child_id.clone());
1608            }
1609        }
1610        if next_frontier.is_empty() {
1611            return;
1612        }
1613        frontier = next_frontier;
1614    }
1615    // Frontier still has nodes but depth is exhausted: check for
1616    // unwalked children at the next level.
1617    for parent_id in &frontier {
1618        let children = match ctx.index.children.get(parent_id) {
1619            Some(c) => c,
1620            None => continue,
1621        };
1622        for child_id in children {
1623            if acc.visited.contains(child_id) {
1624                continue;
1625            }
1626            let Some(child) = ctx.index.cards.get(child_id) else {
1627                continue;
1628            };
1629            let (_, prior_rel) = lineage_fields(&child.full);
1630            if relation_passes(ctx.relation_filter, &prior_rel) {
1631                acc.truncated = true;
1632                return;
1633            }
1634        }
1635    }
1636}
1637
1638/// Walk the lineage tree from `q.card_id`.
1639pub fn lineage(q: LineageQuery) -> Result<Option<LineageResult>, String> {
1640    lineage_with_store(&default_store()?, q)
1641}
1642
1643/// Walk the lineage tree in `store`. See [`lineage`] for the default-store variant.
1644pub fn lineage_with_store(
1645    store: &dyn CardStore,
1646    q: LineageQuery,
1647) -> Result<Option<LineageResult>, String> {
1648    let index = load_card_index(store)?;
1649    let Some(root_row) = index.cards.get(&q.card_id) else {
1650        return Ok(None);
1651    };
1652
1653    let ctx = LineageCtx {
1654        index: &index,
1655        relation_filter: &q.relation_filter,
1656        include_stats: q.include_stats,
1657        max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1658    };
1659    let mut acc = LineageAccum {
1660        nodes: Vec::new(),
1661        edges: Vec::new(),
1662        visited: std::collections::HashSet::new(),
1663        truncated: false,
1664    };
1665
1666    acc.nodes.push(make_node(root_row, 0, q.include_stats));
1667    acc.visited.insert(q.card_id.clone());
1668
1669    if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1670        walk_up(&q.card_id, &ctx, &mut acc);
1671    }
1672    if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1673        walk_down(&q.card_id, &ctx, &mut acc);
1674    }
1675
1676    Ok(Some(LineageResult {
1677        root: q.card_id,
1678        nodes: acc.nodes,
1679        edges: acc.edges,
1680        truncated: acc.truncated,
1681    }))
1682}
1683
1684/// Render a LineageResult as JSON for the service layer.
1685pub fn lineage_to_json(r: &LineageResult) -> Json {
1686    let nodes: Vec<Json> = r
1687        .nodes
1688        .iter()
1689        .map(|n| {
1690            let mut m = serde_json::Map::new();
1691            m.insert("card_id".into(), json!(n.card_id));
1692            m.insert("pkg".into(), json!(n.pkg));
1693            m.insert("depth".into(), json!(n.depth));
1694            if let Some(p) = &n.prior_card_id {
1695                m.insert("prior_card_id".into(), json!(p));
1696            }
1697            if let Some(rel) = &n.prior_relation {
1698                m.insert("prior_relation".into(), json!(rel));
1699            }
1700            if let Some(s) = &n.stats {
1701                m.insert("stats".into(), s.clone());
1702            }
1703            Json::Object(m)
1704        })
1705        .collect();
1706    let edges: Vec<Json> = r
1707        .edges
1708        .iter()
1709        .map(|e| {
1710            let mut m = serde_json::Map::new();
1711            m.insert("from".into(), json!(e.from));
1712            m.insert("to".into(), json!(e.to));
1713            if let Some(rel) = &e.relation {
1714                m.insert("relation".into(), json!(rel));
1715            }
1716            Json::Object(m)
1717        })
1718        .collect();
1719    json!({
1720        "root": r.root,
1721        "nodes": nodes,
1722        "edges": edges,
1723        "truncated": r.truncated,
1724    })
1725}
1726
1727// ───────────────────────────────────────────────────────────────
1728// Samples sidecar: per-case detail written alongside a Card as
1729// `{pkg}/{card_id}.samples.jsonl`. Write-once to preserve Card
1730// immutability: once a Card has a samples file, it cannot be
1731// rewritten — mismatched per-case data would break auditability.
1732// ───────────────────────────────────────────────────────────────
1733
1734// ───────────────────────────────────────────────────────────────
1735// Card import: copy Card files from an external directory into the
1736// local cards store. Used by `alc_card_install` (Card Collections)
1737// and by `alc_pkg_install` (Pkg-bundled cards/).
1738// ───────────────────────────────────────────────────────────────
1739
1740/// Import Card files from `source_dir` into `~/.algocline/cards/{pkg}/`.
1741///
1742/// Copies `*.toml` and `*.samples.jsonl` files. Existing cards with the
1743/// same id are skipped (first-writer wins — Card immutability).
1744///
1745/// Returns `(imported, skipped)` card_id lists.
1746pub fn import_from_dir(
1747    source_dir: &std::path::Path,
1748    pkg: &str,
1749) -> Result<(Vec<String>, Vec<String>), String> {
1750    import_from_dir_with_store(&default_store()?, source_dir, pkg)
1751}
1752
1753/// Import Card files into `store`. See [`import_from_dir`] for the default-store variant.
1754pub fn import_from_dir_with_store(
1755    store: &dyn CardStore,
1756    source_dir: &std::path::Path,
1757    pkg: &str,
1758) -> Result<(Vec<String>, Vec<String>), String> {
1759    let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1760    for card_id in &imported {
1761        match store.read_card_text(card_id) {
1762            Ok(Some(toml_text)) => publish(CardEvent::Created {
1763                pkg: pkg.to_string(),
1764                card_id: card_id.clone(),
1765                toml_text,
1766            }),
1767            Ok(None) => {
1768                tracing::warn!(
1769                    card_id = %card_id,
1770                    "import_from_dir: read_card_text returned None after import; skipping publish"
1771                );
1772            }
1773            Err(e) => {
1774                tracing::warn!(
1775                    card_id = %card_id,
1776                    error = %e,
1777                    "import_from_dir: read_card_text failed after import; skipping publish"
1778                );
1779            }
1780        }
1781        // Samples are optional — best-effort.
1782        match store.read_samples_text(card_id) {
1783            Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1784                card_id: card_id.clone(),
1785                jsonl_text,
1786            }),
1787            Ok(None) => {}
1788            Err(e) => {
1789                tracing::warn!(
1790                    card_id = %card_id,
1791                    error = %e,
1792                    "import_from_dir: read_samples_text failed after import; skipping publish"
1793                );
1794            }
1795        }
1796    }
1797    Ok((imported, skipped))
1798}
1799
1800/// Write per-case samples to `{card_id}.samples.jsonl` (write-once).
1801///
1802/// Each `samples` entry is serialized as one compact JSON line.
1803/// Fails if a samples file already exists for this card — mirrors
1804/// the immutability guarantee of Cards themselves.
1805pub fn write_samples(card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1806    write_samples_with_store(&default_store()?, card_id, samples)
1807}
1808
1809/// Write samples via `store`. See [`write_samples`] for the default-store variant.
1810pub fn write_samples_with_store(
1811    store: &dyn CardStore,
1812    card_id: &str,
1813    samples: Vec<Json>,
1814) -> Result<PathBuf, String> {
1815    if store.samples_exists(card_id)? {
1816        return Err(format!(
1817            "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1818        ));
1819    }
1820    let mut buf = String::new();
1821    for (idx, s) in samples.iter().enumerate() {
1822        let line = serde_json::to_string(s).map_err(|e| {
1823            format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1824        })?;
1825        buf.push_str(&line);
1826        buf.push('\n');
1827    }
1828    let path = store.write_samples_text(card_id, &buf)?;
1829
1830    publish(CardEvent::SamplesWritten {
1831        card_id: card_id.to_string(),
1832        jsonl_text: buf,
1833    });
1834
1835    Ok(path)
1836}
1837
1838/// Query parameters for `read_samples`.
1839#[derive(Debug, Default, Clone)]
1840pub struct SamplesQuery {
1841    /// Skip this many matched rows (after `where` filtering).
1842    pub offset: usize,
1843    /// Max matched rows to return.
1844    pub limit: Option<usize>,
1845    /// Optional `where` predicate applied to each sample row.
1846    /// The row JSON is the full line object (no section wrapping).
1847    pub where_: Option<Predicate>,
1848}
1849
1850/// Read per-case samples from `{card_id}.samples.jsonl`.
1851///
1852/// Streams the JSONL file line by line; rows are parsed, optionally
1853/// filtered by `q.where_`, then paged by `offset` + `limit`.  Offset
1854/// applies to the **post-filter** stream, matching Prisma/SQL
1855/// semantics.
1856///
1857/// Returns an empty Vec if no samples file exists (Cards without
1858/// per-case details are the common case, not an error).
1859pub fn read_samples(card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1860    read_samples_with_store(&default_store()?, card_id, q)
1861}
1862
1863/// Read samples from `store`. See [`read_samples`] for the default-store variant.
1864pub fn read_samples_with_store(
1865    store: &dyn CardStore,
1866    card_id: &str,
1867    q: SamplesQuery,
1868) -> Result<Vec<Json>, String> {
1869    let text = match store.read_samples_text(card_id)? {
1870        Some(t) => t,
1871        None => return Ok(Vec::new()),
1872    };
1873    let mut matched: usize = 0;
1874    let mut out = Vec::new();
1875    for (i, line) in text.lines().enumerate() {
1876        if line.trim().is_empty() {
1877            continue;
1878        }
1879        let val: Json = serde_json::from_str(line)
1880            .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1881        if let Some(pred) = &q.where_ {
1882            if !eval_predicate(pred, &val) {
1883                continue;
1884            }
1885        }
1886        if matched < q.offset {
1887            matched += 1;
1888            continue;
1889        }
1890        if let Some(lim) = q.limit {
1891            if out.len() >= lim {
1892                break;
1893            }
1894        }
1895        matched += 1;
1896        out.push(val);
1897    }
1898    Ok(out)
1899}
1900
1901// ═══════════════════════════════════════════════════════════════
1902// FileCardStore — default backend.
1903// ═══════════════════════════════════════════════════════════════
1904//
1905// Stores Cards as TOML files under `{root}/{pkg}/{card_id}.toml`,
1906// samples as `{root}/{pkg}/{card_id}.samples.jsonl`, and the alias
1907// table as `{root}/_aliases.toml`.
1908//
1909// `root` defaults to `~/.algocline/cards/` via `from_home()`. Tests
1910// may use `new(tmpdir)` to redirect storage to a scratch directory.
1911
1912/// File-backed implementation of [`CardStore`].
1913pub struct FileCardStore {
1914    root: PathBuf,
1915}
1916
1917impl FileCardStore {
1918    /// Construct a store rooted at an explicit path.
1919    pub fn new(root: PathBuf) -> Self {
1920        Self { root }
1921    }
1922
1923    /// Construct the default store rooted at `~/.algocline/cards/`.
1924    pub fn from_home() -> Result<Self, String> {
1925        Ok(Self { root: cards_dir()? })
1926    }
1927
1928    /// Returns the absolute path to the per-pkg subdirectory,
1929    /// creating it when missing. Validates `pkg` to prevent path
1930    /// traversal.
1931    fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1932        validate_name(pkg, "pkg")?;
1933        let dir = self.root.join(pkg);
1934        if !dir.exists() {
1935            fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1936        }
1937        Ok(dir)
1938    }
1939
1940    /// Path of the global alias table.
1941    fn aliases_path(&self) -> PathBuf {
1942        self.root.join("_aliases.toml")
1943    }
1944
1945    /// Path of the samples sidecar for `card_id`. Errors if the
1946    /// Card itself does not exist — samples without a parent Card
1947    /// are meaningless and we refuse to create orphans.
1948    fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1949        let card_path = self
1950            .find_card_locator(card_id)?
1951            .ok_or_else(|| format!("card '{card_id}' not found"))?;
1952        let dir = card_path
1953            .parent()
1954            .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1955        Ok(dir.join(format!("{card_id}.samples.jsonl")))
1956    }
1957}
1958
1959impl CardStore for FileCardStore {
1960    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1961        let dir = self.pkg_dir(pkg)?;
1962        let path = dir.join(format!("{card_id}.toml"));
1963        if path.exists() {
1964            return Err(format!(
1965                "alc.card.create: card '{card_id}' already exists (immutable)"
1966            ));
1967        }
1968        let tmp = path.with_extension("toml.tmp");
1969        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1970        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1971        Ok(path)
1972    }
1973
1974    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1975        let path = self
1976            .find_card_locator(card_id)?
1977            .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1978        let tmp = path.with_extension("toml.tmp");
1979        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1980        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1981        Ok(path)
1982    }
1983
1984    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1985        validate_name(card_id, "card_id")?;
1986        if !self.root.exists() {
1987            return Ok(None);
1988        }
1989        let entries =
1990            fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1991        for entry in entries.flatten() {
1992            let p = entry.path();
1993            if p.is_dir() {
1994                let candidate = p.join(format!("{card_id}.toml"));
1995                if candidate.exists() {
1996                    return Ok(Some(candidate));
1997                }
1998            }
1999        }
2000        Ok(None)
2001    }
2002
2003    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
2004        let Some(path) = self.find_card_locator(card_id)? else {
2005            return Ok(None);
2006        };
2007        let text = fs::read_to_string(&path)
2008            .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
2009        Ok(Some(text))
2010    }
2011
2012    fn list_card_locators(
2013        &self,
2014        pkg_filter: Option<&str>,
2015    ) -> Result<Vec<(String, PathBuf)>, String> {
2016        if !self.root.exists() {
2017            return Ok(Vec::new());
2018        }
2019        let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2020            validate_name(p, "pkg")?;
2021            let d = self.root.join(p);
2022            if d.is_dir() {
2023                vec![d]
2024            } else {
2025                return Ok(Vec::new());
2026            }
2027        } else {
2028            fs::read_dir(&self.root)
2029                .map_err(|e| format!("Failed to read cards dir: {e}"))?
2030                .flatten()
2031                .map(|e| e.path())
2032                .filter(|p| p.is_dir())
2033                .collect()
2034        };
2035
2036        let mut out = Vec::new();
2037        for pdir in pkg_dirs {
2038            let pkg = pdir
2039                .file_name()
2040                .and_then(|s| s.to_str())
2041                .unwrap_or("")
2042                .to_string();
2043            let entries = match fs::read_dir(&pdir) {
2044                Ok(e) => e,
2045                Err(_) => continue,
2046            };
2047            for entry in entries.flatten() {
2048                let p = entry.path();
2049                if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2050                    continue;
2051                }
2052                out.push((pkg.clone(), p));
2053            }
2054        }
2055        Ok(out)
2056    }
2057
2058    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2059        match fs::read_to_string(locator) {
2060            Ok(text) => Ok(Some(text)),
2061            Err(_) => Ok(None),
2062        }
2063    }
2064
2065    fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2066        let path = self.aliases_path();
2067        if !path.exists() {
2068            return Ok(Vec::new());
2069        }
2070        let text =
2071            fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2072        let val: toml::Value =
2073            toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2074        let arr = val
2075            .get("alias")
2076            .and_then(|v| v.as_array())
2077            .cloned()
2078            .unwrap_or_default();
2079        let mut out = Vec::with_capacity(arr.len());
2080        for entry in arr {
2081            let t = match entry {
2082                toml::Value::Table(t) => t,
2083                _ => continue,
2084            };
2085            let name = match t.get("name").and_then(|v| v.as_str()) {
2086                Some(s) => s.to_string(),
2087                None => continue,
2088            };
2089            let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2090                Some(s) => s.to_string(),
2091                None => continue,
2092            };
2093            out.push(Alias {
2094                name,
2095                card_id,
2096                pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2097                set_at: t
2098                    .get("set_at")
2099                    .and_then(|v| v.as_str())
2100                    .map(String::from)
2101                    .unwrap_or_default(),
2102                note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2103            });
2104        }
2105        Ok(out)
2106    }
2107
2108    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2109        // Ensure the cards root exists so aliases can be written to
2110        // a brand-new store (mirrors the behaviour of `cards_dir()`).
2111        if !self.root.exists() {
2112            fs::create_dir_all(&self.root)
2113                .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2114        }
2115        let path = self.aliases_path();
2116        let mut arr = Vec::with_capacity(aliases.len());
2117        for a in aliases {
2118            let mut t = toml::map::Map::new();
2119            t.insert("name".into(), toml::Value::String(a.name.clone()));
2120            t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2121            if let Some(p) = &a.pkg {
2122                t.insert("pkg".into(), toml::Value::String(p.clone()));
2123            }
2124            t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2125            if let Some(n) = &a.note {
2126                t.insert("note".into(), toml::Value::String(n.clone()));
2127            }
2128            arr.push(toml::Value::Table(t));
2129        }
2130        let mut root = toml::map::Map::new();
2131        root.insert("alias".into(), toml::Value::Array(arr));
2132        let text = toml::to_string_pretty(&toml::Value::Table(root))
2133            .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2134        let tmp = path.with_extension("toml.tmp");
2135        fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2136        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2137        Ok(())
2138    }
2139
2140    fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2141        let path = self.samples_path(card_id)?;
2142        Ok(path.exists())
2143    }
2144
2145    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2146        let path = self.samples_path(card_id)?;
2147        if path.exists() {
2148            return Err(format!(
2149                "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2150            ));
2151        }
2152        let tmp = path.with_extension("jsonl.tmp");
2153        fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2154        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2155        Ok(path)
2156    }
2157
2158    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2159        let path = self.samples_path(card_id)?;
2160        if !path.exists() {
2161            return Ok(None);
2162        }
2163        let text =
2164            fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2165        Ok(Some(text))
2166    }
2167
2168    fn import_from_dir(
2169        &self,
2170        source_dir: &Path,
2171        pkg: &str,
2172    ) -> Result<(Vec<String>, Vec<String>), String> {
2173        validate_name(pkg, "pkg")?;
2174        let dest = self.pkg_dir(pkg)?;
2175        let mut imported = Vec::new();
2176        let mut skipped = Vec::new();
2177
2178        let entries =
2179            fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2180
2181        for entry in entries.flatten() {
2182            let path = entry.path();
2183            let fname = match path.file_name().and_then(|n| n.to_str()) {
2184                Some(n) => n.to_string(),
2185                None => continue,
2186            };
2187
2188            if !fname.ends_with(".toml") {
2189                continue;
2190            }
2191
2192            let card_id = fname.trim_end_matches(".toml");
2193            let dest_toml = dest.join(&fname);
2194
2195            if dest_toml.exists() {
2196                skipped.push(card_id.to_string());
2197                continue;
2198            }
2199
2200            let text = fs::read_to_string(&path)
2201                .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2202            let val: toml::Value = toml::from_str(&text)
2203                .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2204            if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2205                continue;
2206            }
2207
2208            fs::copy(&path, &dest_toml)
2209                .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2210
2211            let samples_name = format!("{card_id}.samples.jsonl");
2212            let samples_src = source_dir.join(&samples_name);
2213            if samples_src.exists() {
2214                let samples_dest = dest.join(&samples_name);
2215                if !samples_dest.exists() {
2216                    fs::copy(&samples_src, &samples_dest)
2217                        .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2218                }
2219            }
2220
2221            imported.push(card_id.to_string());
2222        }
2223
2224        Ok((imported, skipped))
2225    }
2226}
2227
2228// ═══════════════════════════════════════════════════════════════
2229// Event Publisher Port — Card sink fan-out (v1)
2230// ═══════════════════════════════════════════════════════════════
2231//
2232// Storage port (`CardStore` / `FileCardStore`) stays pure CRUD. The
2233// Event publisher port below mirrors every successful Card write to a
2234// set of subscriber backends configured via the `ALC_CARD_SINKS`
2235// environment variable. Fan-out is best-effort and strictly serial;
2236// subscriber failures never propagate to the primary Card API.
2237
2238// ─── CardEvent / CardEventKind ─────────────────────────────────
2239
2240/// A Card-level event emitted from the write path.
2241///
2242/// Each variant carries the already-serialized payload text so that
2243/// subscribers can persist the exact bytes that were written to the
2244/// primary store (byte-for-byte parity).
2245#[derive(Debug, Clone)]
2246pub enum CardEvent {
2247    /// A brand-new Card was written.
2248    Created {
2249        pkg: String,
2250        card_id: String,
2251        toml_text: String,
2252    },
2253    /// An existing Card had new top-level keys merged in.
2254    Appended { card_id: String, toml_text: String },
2255    /// A samples JSONL sidecar was written for `card_id`.
2256    SamplesWritten { card_id: String, jsonl_text: String },
2257    /// The global alias table was rewritten.
2258    AliasesWritten { toml_text: String },
2259}
2260
2261/// Lightweight discriminant for `CardEvent`. Used as a `HashMap` key in
2262/// `SubscriberStats` so that ok/err counters can be tracked per event
2263/// kind without holding the full payload.
2264#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
2265pub enum CardEventKind {
2266    Created,
2267    Appended,
2268    SamplesWritten,
2269    AliasesWritten,
2270}
2271
2272impl CardEventKind {
2273    /// Stable string tag used in `tracing` log fields.
2274    pub fn as_str(self) -> &'static str {
2275        match self {
2276            CardEventKind::Created => "created",
2277            CardEventKind::Appended => "appended",
2278            CardEventKind::SamplesWritten => "samples_written",
2279            CardEventKind::AliasesWritten => "aliases_written",
2280        }
2281    }
2282
2283    /// Short JSON key for the public `alc_stats` snapshot.
2284    /// Distinct from `as_str()` so that tracing logs keep their
2285    /// verbose form while the JSON surface stays compact.
2286    pub fn json_key(self) -> &'static str {
2287        match self {
2288            CardEventKind::Created => "created",
2289            CardEventKind::Appended => "appended",
2290            CardEventKind::SamplesWritten => "samples",
2291            CardEventKind::AliasesWritten => "aliases",
2292        }
2293    }
2294
2295    /// All kinds in stable display order. Used by `SubscriberHealthRow`
2296    /// to emit all four counter keys even when a counter is zero, so
2297    /// that downstream consumers can rely on field presence.
2298    pub fn all() -> [CardEventKind; 4] {
2299        [
2300            CardEventKind::Created,
2301            CardEventKind::Appended,
2302            CardEventKind::SamplesWritten,
2303            CardEventKind::AliasesWritten,
2304        ]
2305    }
2306}
2307
2308impl Serialize for CardEventKind {
2309    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2310        s.serialize_str(self.json_key())
2311    }
2312}
2313
2314impl CardEvent {
2315    /// Return the `CardEventKind` discriminant for this event.
2316    pub fn kind(&self) -> CardEventKind {
2317        match self {
2318            CardEvent::Created { .. } => CardEventKind::Created,
2319            CardEvent::Appended { .. } => CardEventKind::Appended,
2320            CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2321            CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2322        }
2323    }
2324}
2325
2326// ─── CardSubscriber trait ──────────────────────────────────────
2327
2328/// A downstream backend that receives `CardEvent`s in best-effort,
2329/// serial fan-out order.
2330///
2331/// Implementations must be `Send + Sync` so that the bus can hold
2332/// `Arc<dyn CardSubscriber>` safely across threads.
2333pub trait CardSubscriber: Send + Sync {
2334    /// Handle one event. Returning `Err` records a failure in
2335    /// `SubscriberStats` and emits a `tracing::warn!`, but does not
2336    /// propagate to the caller of the `_with_store` API.
2337    fn on_event(&self, ev: &CardEvent) -> Result<(), String>;
2338
2339    /// Canonical identity URI for this subscriber. Used as the key in
2340    /// `SubscriberStats` and as the match target for
2341    /// `CardEventBus::publish_to`.
2342    fn describe(&self) -> String;
2343
2344    /// Best-effort check for whether `card_id` already exists in this
2345    /// subscriber. Used by `alc_card_sink_backfill` to skip cards that
2346    /// are already mirrored (drift-safe: we never overwrite).
2347    ///
2348    /// Default implementation returns `Ok(false)` so subscribers that
2349    /// cannot cheaply answer (e.g. future network-backed sinks) always
2350    /// get the push. Override when a cheap local check is possible.
2351    fn has_card(&self, _card_id: &str) -> Result<bool, String> {
2352        Ok(false)
2353    }
2354}
2355
2356// ─── SubscriberStats ───────────────────────────────────────────
2357
2358/// Most recent delivery failure for a single subscriber. Exposed via
2359/// `SubscriberHealthRow.last_error` in the `alc_stats` JSON snapshot.
2360#[derive(Debug, Clone, Serialize)]
2361pub struct LastError {
2362    pub kind: CardEventKind,
2363    pub msg: String,
2364    pub ts_ms: u64,
2365}
2366
2367/// Per-subscriber counter state. Held inside `SubscriberStats` under a
2368/// `Mutex`; `snapshot` clones the fields into an owned `SubscriberHealthRow`
2369/// while the lock is held, so the lock window stays short.
2370#[derive(Default, Debug)]
2371pub struct PerSubscriber {
2372    pub ok: HashMap<CardEventKind, u64>,
2373    pub err: HashMap<CardEventKind, u64>,
2374    pub last_error: Option<LastError>,
2375}
2376
2377/// Process-wide per-subscriber statistics, keyed by subscriber URI
2378/// (the value returned by `CardSubscriber::describe`).
2379#[derive(Default, Debug)]
2380pub struct SubscriberStats {
2381    inner: Mutex<HashMap<String, PerSubscriber>>,
2382}
2383
2384impl SubscriberStats {
2385    /// Record a successful event delivery.
2386    pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2387        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2388        let entry = g.entry(key.to_string()).or_default();
2389        let c = entry.ok.entry(kind).or_insert(0);
2390        *c = c.saturating_add(1);
2391    }
2392
2393    /// Record a delivery failure together with the error message.
2394    /// The failure kind, message, and timestamp overwrite `last_error`
2395    /// — there is no ring buffer; only the most recent failure is kept.
2396    pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2397        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2398        let entry = g.entry(key.to_string()).or_default();
2399        let c = entry.err.entry(kind).or_insert(0);
2400        *c = c.saturating_add(1);
2401        entry.last_error = Some(LastError {
2402            kind,
2403            msg: err.to_string(),
2404            ts_ms: now_ms(),
2405        });
2406    }
2407
2408    /// Take a point-in-time snapshot of all subscribers. The returned
2409    /// `Vec` is owned — the internal lock is released before this
2410    /// function returns, so callers can hold it freely.
2411    ///
2412    /// All four `CardEventKind` keys (`created / appended / samples /
2413    /// aliases`) are always present in both `ok` and `err`, defaulting
2414    /// to 0 if no event of that kind has been recorded. This lets
2415    /// downstream consumers rely on field presence.
2416    pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2417        let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2418        let mut rows = Vec::with_capacity(g.len());
2419        for (sink, ps) in g.iter() {
2420            let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2421            let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2422            for k in CardEventKind::all() {
2423                ok.insert(
2424                    k.json_key().to_string(),
2425                    ps.ok.get(&k).copied().unwrap_or(0),
2426                );
2427                err.insert(
2428                    k.json_key().to_string(),
2429                    ps.err.get(&k).copied().unwrap_or(0),
2430                );
2431            }
2432            rows.push(SubscriberHealthRow {
2433                sink: sink.clone(),
2434                ok,
2435                err,
2436                last_error: ps.last_error.clone(),
2437            });
2438        }
2439        // Stable output ordering (by sink URI) so that the JSON dump is
2440        // deterministic across runs — useful for snapshot tests.
2441        rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2442        rows
2443    }
2444}
2445
2446/// Unix-epoch milliseconds used by `LastError.ts_ms`. Uses
2447/// `unwrap_or_default` so no panic can escape even if the system
2448/// clock is misconfigured.
2449fn now_ms() -> u64 {
2450    std::time::SystemTime::now()
2451        .duration_since(std::time::UNIX_EPOCH)
2452        .unwrap_or_default()
2453        .as_millis() as u64
2454}
2455
2456/// Snapshot row for a single subscriber, serialized directly into the
2457/// `alc_stats` JSON output as one element of the `card_sinks` array.
2458#[derive(Debug, Clone, Serialize)]
2459pub struct SubscriberHealthRow {
2460    pub sink: String,
2461    pub ok: HashMap<String, u64>,
2462    pub err: HashMap<String, u64>,
2463    pub last_error: Option<LastError>,
2464}
2465
2466/// Public entry point: snapshot of all process-wide subscriber stats.
2467/// Wrapper around `event_bus().stats().snapshot()` so that downstream
2468/// crates (notably `algocline-app`) do not need a handle to the
2469/// `CardEventBus` singleton.
2470pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2471    event_bus().stats().snapshot()
2472}
2473
2474// ─── FileCardSubscriber — file-URI backend ─────────────────────
2475
2476/// Atomically write `bytes` to `dest` by staging to a unique
2477/// `{dest}.tmp.{pid}.{seq}` sibling and renaming. The `{pid}.{seq}`
2478/// suffix prevents concurrent writers (same or different processes)
2479/// from colliding on the tmp path.
2480fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
2481    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
2482    let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
2483    let pid = process::id();
2484    if let Some(parent) = dest.parent() {
2485        if !parent.as_os_str().is_empty() && !parent.exists() {
2486            fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
2487        }
2488    }
2489    let mut tmp = dest.as_os_str().to_owned();
2490    tmp.push(format!(".tmp.{pid}.{seq}"));
2491    let tmp_path = PathBuf::from(tmp);
2492    fs::write(&tmp_path, bytes).map_err(|e| format!("subscriber write tmp: {e}"))?;
2493    fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
2494}
2495
2496/// Canonical `file://` URI for a local directory path. The result is
2497/// the inverse of [`decode_file_uri_path`] — the pair round-trips
2498/// through the `ALC_CARD_SINKS` env spec.
2499fn canonical_file_uri(root: &Path) -> String {
2500    let p = root.to_string_lossy();
2501    #[cfg(unix)]
2502    {
2503        format!("file://{p}")
2504    }
2505    #[cfg(windows)]
2506    {
2507        format!("file:///{}", p.replace('\\', "/"))
2508    }
2509    #[cfg(not(any(unix, windows)))]
2510    {
2511        format!("file://{p}")
2512    }
2513}
2514
2515/// A subscriber that mirrors events to a local directory using the
2516/// same two-tier layout as [`FileCardStore`]:
2517///
2518/// - `{root}/{pkg}/{card_id}.toml` — Card TOML
2519/// - `{root}/{pkg}/{card_id}.samples.jsonl` — samples sidecar
2520/// - `{root}/_aliases.toml` — global alias table
2521pub struct FileCardSubscriber {
2522    root: PathBuf,
2523    uri: String,
2524}
2525
2526impl FileCardSubscriber {
2527    /// Construct a subscriber rooted at `root`. The canonical URI is
2528    /// computed once and returned from [`Self::describe`].
2529    pub fn new(root: PathBuf) -> Self {
2530        let uri = canonical_file_uri(&root);
2531        Self { root, uri }
2532    }
2533
2534    /// Scan the subscriber root for a Card with `card_id` under any
2535    /// `{pkg}` subdirectory. Returns `Ok(None)` when the root itself
2536    /// does not exist yet (subscribers are write-once lazy).
2537    pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2538        validate_name(card_id, "card_id")?;
2539        if !self.root.exists() {
2540            return Ok(None);
2541        }
2542        let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2543        for entry in entries.flatten() {
2544            let p = entry.path();
2545            if p.is_dir() {
2546                let candidate = p.join(format!("{card_id}.toml"));
2547                if candidate.exists() {
2548                    return Ok(Some(candidate));
2549                }
2550            }
2551        }
2552        Ok(None)
2553    }
2554
2555    fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2556        validate_name(pkg, "pkg")?;
2557        let dir = self.root.join(pkg);
2558        if !dir.exists() {
2559            fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2560        }
2561        Ok(dir)
2562    }
2563
2564    fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2565        validate_name(card_id, "card_id")?;
2566        let dir = self.ensure_pkg_dir(pkg)?;
2567        let dest = dir.join(format!("{card_id}.toml"));
2568        atomic_write(&dest, toml_text.as_bytes())
2569    }
2570
2571    fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2572        match self.locate_card(card_id)? {
2573            Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2574            None => Err(format!(
2575                "subscriber append: card '{card_id}' missing at {}",
2576                self.uri
2577            )),
2578        }
2579    }
2580
2581    fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2582        let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2583            format!(
2584                "subscriber samples: card '{card_id}' missing at {}",
2585                self.uri
2586            )
2587        })?;
2588        let dir = card_path
2589            .parent()
2590            .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2591        let dest = dir.join(format!("{card_id}.samples.jsonl"));
2592        atomic_write(&dest, jsonl_text.as_bytes())
2593    }
2594
2595    fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2596        if !self.root.exists() {
2597            fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2598        }
2599        let dest = self.root.join("_aliases.toml");
2600        atomic_write(&dest, toml_text.as_bytes())
2601    }
2602}
2603
2604impl CardSubscriber for FileCardSubscriber {
2605    fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2606        match ev {
2607            CardEvent::Created {
2608                pkg,
2609                card_id,
2610                toml_text,
2611            } => self.write_created(pkg, card_id, toml_text),
2612            CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2613            CardEvent::SamplesWritten {
2614                card_id,
2615                jsonl_text,
2616            } => self.write_samples(card_id, jsonl_text),
2617            CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2618        }
2619    }
2620
2621    fn describe(&self) -> String {
2622        self.uri.clone()
2623    }
2624
2625    /// Delegates to [`FileCardSubscriber::locate_card`]. A non-existent
2626    /// root (subscriber has never been written to) returns `Ok(false)`,
2627    /// which is the correct "backfill needed" answer.
2628    fn has_card(&self, card_id: &str) -> Result<bool, String> {
2629        Ok(self.locate_card(card_id)?.is_some())
2630    }
2631}
2632
2633// ─── CardEventBus + OnceLock singleton ─────────────────────────
2634
2635/// Process-wide fan-out bus. Subscribers are registered once at startup
2636/// (from `ALC_CARD_SINKS`) and stored behind a `Mutex` so that tests
2637/// can swap them out via `replace_subscribers_for_test` without losing
2638/// the singleton identity.
2639pub struct CardEventBus {
2640    subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
2641    stats: Arc<SubscriberStats>,
2642}
2643
2644impl CardEventBus {
2645    /// Build a bus from an explicit subscriber list. Used both by the
2646    /// env loader and by `install_event_bus_for_test`.
2647    pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2648        Self {
2649            subscribers: Mutex::new(subscribers),
2650            stats: Arc::new(SubscriberStats::default()),
2651        }
2652    }
2653
2654    /// Shared handle to the per-subscriber counters.
2655    pub fn stats(&self) -> &Arc<SubscriberStats> {
2656        &self.stats
2657    }
2658
2659    /// Fan out `ev` to every registered subscriber serially. Subscriber
2660    /// failures are counted in `SubscriberStats` and logged but do not
2661    /// propagate.
2662    pub fn publish(&self, ev: &CardEvent) {
2663        let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2664            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2665            guard.clone()
2666        };
2667        for sub in &subs_snapshot {
2668            let key = sub.describe();
2669            match sub.on_event(ev) {
2670                Ok(()) => self.stats.record_ok(&key, ev.kind()),
2671                Err(e) => {
2672                    tracing::warn!(
2673                        subscriber = %key,
2674                        kind = ev.kind().as_str(),
2675                        error = %e,
2676                        "card subscriber failed"
2677                    );
2678                    self.stats.record_err(&key, ev.kind(), &e);
2679                }
2680            }
2681        }
2682    }
2683
2684    /// Deliver `ev` to exactly one subscriber identified by
2685    /// `target` URI. Returns `Err` when no subscriber matches or the
2686    /// subscriber itself fails (backfill path needs the caller to know).
2687    pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2688        let hit: Option<Arc<dyn CardSubscriber>> = {
2689            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2690            guard.iter().find(|s| s.describe() == target).cloned()
2691        };
2692        let Some(sub) = hit else {
2693            return Err(format!("subscriber not registered: {target}"));
2694        };
2695        let key = sub.describe();
2696        match sub.on_event(ev) {
2697            Ok(()) => {
2698                self.stats.record_ok(&key, ev.kind());
2699                Ok(())
2700            }
2701            Err(e) => {
2702                tracing::warn!(
2703                    subscriber = %key,
2704                    kind = ev.kind().as_str(),
2705                    error = %e,
2706                    "card subscriber failed (publish_to)"
2707                );
2708                self.stats.record_err(&key, ev.kind(), &e);
2709                Err(e)
2710            }
2711        }
2712    }
2713
2714    /// List every subscriber URI currently registered on the bus.
2715    pub fn subscriber_uris(&self) -> Vec<String> {
2716        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2717        guard.iter().map(|s| s.describe()).collect()
2718    }
2719
2720    /// Look up a subscriber by URI (as returned by `describe`). Returns
2721    /// `None` when no subscriber matches. Used by
2722    /// `alc_card_sink_backfill` to dispatch has_card checks against a
2723    /// specific sink.
2724    pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2725        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2726        guard.iter().find(|s| s.describe() == uri).cloned()
2727    }
2728
2729    /// Replace the subscriber list in place while preserving singleton
2730    /// identity and shared `SubscriberStats`. Test-only.
2731    #[cfg(any(test, feature = "test-support"))]
2732    pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2733        let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2734        *guard = subs;
2735    }
2736
2737    /// Reset the per-subscriber counters. Test-only helper.
2738    #[cfg(any(test, feature = "test-support"))]
2739    pub fn reset_stats_for_test(&self) {
2740        let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2741        g.clear();
2742    }
2743}
2744
2745static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2746
2747/// Return the process-wide `CardEventBus` singleton, initializing it
2748/// on the first call from the `ALC_CARD_SINKS` environment variable.
2749pub fn event_bus() -> &'static CardEventBus {
2750    CARD_EVENT_BUS.get_or_init(|| {
2751        let subs = load_subscribers_from_env();
2752        CardEventBus::new(subs)
2753    })
2754}
2755
2756/// Eagerly initialize the bus. Idempotent and safe to call multiple
2757/// times; intended for startup hooks (`main.rs`) so that subscriber
2758/// registration `tracing::info!` lines are emitted at boot rather than
2759/// on the first Card write.
2760pub fn init_event_bus() {
2761    let bus = event_bus();
2762    let uris = bus.subscriber_uris();
2763    if uris.is_empty() {
2764        tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2765    } else {
2766        for uri in &uris {
2767            tracing::info!(subscriber = %uri, "card sink registered");
2768        }
2769    }
2770}
2771
2772/// Install a test-built bus. Fails once the singleton is already set.
2773#[cfg(any(test, feature = "test-support"))]
2774pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
2775    CARD_EVENT_BUS
2776        .set(bus)
2777        .map_err(|_| "bus already initialized".to_string())
2778}
2779
2780/// Convenience wrapper: publish through the singleton.
2781pub fn publish(ev: CardEvent) {
2782    // In test builds, serialize publishing with the subscriber-test mutex so
2783    // that default-store tests running in parallel cannot inject events into
2784    // a subscriber test's mock while the mock is active.  The owning test
2785    // thread sets INSIDE_BUS_TEST to true so it does NOT try to acquire the
2786    // same lock it already holds (re-entrancy safe).
2787    #[cfg(test)]
2788    {
2789        let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2790        if !is_test_owner {
2791            // Block until any active subscriber test releases the lock.
2792            let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2793            event_bus().publish(&ev);
2794            return;
2795        }
2796    }
2797    event_bus().publish(&ev);
2798}
2799
2800/// Mutex used in tests to serialise subscriber-test setup against concurrent
2801/// publishes from default-store tests running in parallel.
2802#[cfg(test)]
2803fn bus_test_gate() -> &'static Mutex<()> {
2804    static GATE: OnceLock<Mutex<()>> = OnceLock::new();
2805    GATE.get_or_init(|| Mutex::new(()))
2806}
2807
2808// Thread-local flag: set to `true` by the thread running inside
2809// `with_bus_subscribers` so that `publish` skips the gate (re-entrancy).
2810#[cfg(test)]
2811thread_local! {
2812    static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
2813}
2814
2815// ─── alc_card_sink_backfill ────────────────────────────────────
2816
2817/// Result of a [`card_sink_backfill`] run. One row per card the tool
2818/// touched (classified as pushed / skipped / failed / pushed_samples).
2819/// The `failed` entries carry the error message so an operator can
2820/// triage read-only mounts etc.
2821#[derive(Debug, Clone, Default, Serialize)]
2822pub struct SinkBackfillReport {
2823    pub sink: String,
2824    pub pushed: Vec<String>,
2825    pub skipped: Vec<String>,
2826    pub failed: Vec<(String, String)>,
2827    pub pushed_samples: Vec<String>,
2828}
2829
2830/// Backfill one subscriber (`sink` URI) from the primary store.
2831///
2832/// Steps:
2833/// 1. Look up the target subscriber on the event bus; fail fast if
2834///    the URI is not registered so the caller gets an immediate
2835///    error rather than a silent no-op.
2836/// 2. Enumerate every `(pkg, card_id)` pair from the default
2837///    [`CardStore`].
2838/// 3. For each pair, ask the subscriber whether it already has that
2839///    card (`CardSubscriber::has_card`). If yes → skipped (drift-safe,
2840///    no overwrite). If no → read the primary TOML and `publish_to`
2841///    the one target sink. Samples are mirrored the same way.
2842/// 4. `dry_run = true` short-circuits step 3: the report lists
2843///    what would have been pushed but no `publish_to` is issued,
2844///    so `SubscriberStats` does not increment.
2845pub fn card_sink_backfill(sink: &str, dry_run: bool) -> Result<SinkBackfillReport, String> {
2846    let store = default_store()?;
2847    card_sink_backfill_with_store(&store, sink, dry_run)
2848}
2849
2850/// `card_sink_backfill` with an injectable [`CardStore`]. Tests drive
2851/// this directly against a tempdir-backed [`FileCardStore`] to avoid
2852/// touching the user's real `~/.algocline/cards/`.
2853pub fn card_sink_backfill_with_store(
2854    store: &dyn CardStore,
2855    sink: &str,
2856    dry_run: bool,
2857) -> Result<SinkBackfillReport, String> {
2858    let bus = event_bus();
2859    let sub = bus
2860        .find_subscriber(sink)
2861        .ok_or_else(|| format!("unknown sink: {sink}"))?;
2862
2863    let locators = store.list_card_locators(None)?;
2864
2865    let mut report = SinkBackfillReport {
2866        sink: sink.to_string(),
2867        ..Default::default()
2868    };
2869
2870    for (pkg, locator) in locators {
2871        let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2872            Some(s) => s.to_string(),
2873            None => continue,
2874        };
2875
2876        match sub.has_card(&card_id) {
2877            Ok(true) => {
2878                report.skipped.push(card_id);
2879                continue;
2880            }
2881            Ok(false) => {}
2882            Err(e) => {
2883                tracing::warn!(
2884                    card_id = %card_id,
2885                    error = %e,
2886                    "backfill: has_card failed; treating as skipped"
2887                );
2888                report.skipped.push(card_id);
2889                continue;
2890            }
2891        }
2892
2893        let toml_text = match store.read_locator_text(&locator) {
2894            Ok(Some(t)) => t,
2895            Ok(None) => {
2896                // Unreadable / corrupt on primary. Do not panic; skip.
2897                report.skipped.push(card_id);
2898                continue;
2899            }
2900            Err(e) => {
2901                tracing::warn!(
2902                    card_id = %card_id,
2903                    error = %e,
2904                    "backfill: read_locator_text failed; treating as skipped"
2905                );
2906                report.skipped.push(card_id);
2907                continue;
2908            }
2909        };
2910
2911        if dry_run {
2912            report.pushed.push(card_id.clone());
2913            if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2914                report.pushed_samples.push(card_id);
2915            }
2916            continue;
2917        }
2918
2919        let ev = CardEvent::Created {
2920            pkg: pkg.clone(),
2921            card_id: card_id.clone(),
2922            toml_text,
2923        };
2924        match bus.publish_to(sink, &ev) {
2925            Ok(()) => report.pushed.push(card_id.clone()),
2926            Err(e) => {
2927                report.failed.push((card_id, e));
2928                continue;
2929            }
2930        }
2931
2932        if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2933            let ev = CardEvent::SamplesWritten {
2934                card_id: card_id.clone(),
2935                jsonl_text,
2936            };
2937            match bus.publish_to(sink, &ev) {
2938                Ok(()) => report.pushed_samples.push(card_id),
2939                Err(e) => {
2940                    report.failed.push((card_id, format!("samples: {e}")));
2941                }
2942            }
2943        }
2944    }
2945
2946    Ok(report)
2947}
2948
2949// ─── ALC_CARD_SINKS env parser ─────────────────────────────────
2950
2951/// Read `ALC_CARD_SINKS` and build one subscriber per accepted spec.
2952/// Malformed entries are logged and skipped; duplicate URIs are
2953/// first-wins. Non-UTF8 values reject the whole env.
2954fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2955    let raw = match std::env::var("ALC_CARD_SINKS") {
2956        Ok(v) => v,
2957        Err(std::env::VarError::NotPresent) => return Vec::new(),
2958        Err(std::env::VarError::NotUnicode(_)) => {
2959            tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2960            return Vec::new();
2961        }
2962    };
2963    parse_subscribers_from_str(&raw)
2964}
2965
2966/// Parse a `|`-separated list of subscriber URIs (the same format used
2967/// by `ALC_CARD_SINKS`). Extracted so tests can exercise the parser
2968/// without touching process environment.
2969fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2970    if raw.is_empty() {
2971        return Vec::new();
2972    }
2973    let mut seen: HashSet<String> = HashSet::new();
2974    let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2975    for spec in raw.split('|') {
2976        let spec = spec.trim();
2977        if spec.is_empty() {
2978            continue;
2979        }
2980        let Some(sub) = parse_subscriber_spec(spec) else {
2981            continue;
2982        };
2983        let uri = sub.describe();
2984        if !seen.insert(uri.clone()) {
2985            tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2986            continue;
2987        }
2988        out.push(sub);
2989    }
2990    out
2991}
2992
2993/// Parse one subscriber spec. v1 only accepts `file:///absolute/path`.
2994fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2995    // scheme required
2996    let Some(colon_idx) = spec.find(':') else {
2997        tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2998        return None;
2999    };
3000    let scheme = &spec[..colon_idx];
3001    let rest = &spec[colon_idx + 1..];
3002    if scheme != "file" {
3003        tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
3004        return None;
3005    }
3006    // scheme-specific: must start with `//`
3007    let Some(after_slash) = rest.strip_prefix("//") else {
3008        tracing::error!(spec, "file URI missing '//'");
3009        return None;
3010    };
3011    // split authority / path. path begins with '/'.
3012    let Some(path_start) = after_slash.find('/') else {
3013        tracing::error!(spec, "file URI has no path component");
3014        return None;
3015    };
3016    let authority = &after_slash[..path_start];
3017    let encoded_path = &after_slash[path_start..];
3018    if !authority.is_empty() {
3019        tracing::error!(
3020            spec,
3021            authority,
3022            "file URI with non-empty authority is rejected"
3023        );
3024        return None;
3025    }
3026    let path = decode_file_uri_path(encoded_path)?;
3027    Some(Arc::new(FileCardSubscriber::new(path)))
3028}
3029
3030/// Decode the path portion of a `file://` URI into a `PathBuf`.
3031///
3032/// Unix: a leading `/` is preserved (absolute path).
3033/// Windows: the leading `/` is stripped so that `/C:/a/b` becomes
3034/// `C:/a/b`.
3035fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3036    let decoded = percent_decode(encoded)?;
3037    #[cfg(windows)]
3038    {
3039        // Strip the leading slash so "/C:/foo" -> "C:/foo".
3040        let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3041        Some(PathBuf::from(trimmed))
3042    }
3043    #[cfg(not(windows))]
3044    {
3045        Some(PathBuf::from(decoded))
3046    }
3047}
3048
3049/// Percent-decode a URI path segment. Returns `None` on invalid or
3050/// truncated `%XX` sequences.
3051fn percent_decode(src: &str) -> Option<String> {
3052    let bytes = src.as_bytes();
3053    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3054    let mut i = 0;
3055    while i < bytes.len() {
3056        let b = bytes[i];
3057        if b == b'%' {
3058            if i + 2 >= bytes.len() {
3059                tracing::error!(src, "percent-encoded sequence truncated");
3060                return None;
3061            }
3062            let hi = (bytes[i + 1] as char).to_digit(16);
3063            let lo = (bytes[i + 2] as char).to_digit(16);
3064            match (hi, lo) {
3065                (Some(h), Some(l)) => {
3066                    out.push(((h << 4) | l) as u8);
3067                    i += 3;
3068                }
3069                _ => {
3070                    tracing::error!(src, "percent-encoded sequence has non-hex digits");
3071                    return None;
3072                }
3073            }
3074        } else {
3075            out.push(b);
3076            i += 1;
3077        }
3078    }
3079    match String::from_utf8(out) {
3080        Ok(s) => Some(s),
3081        Err(_) => {
3082            tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3083            None
3084        }
3085    }
3086}
3087
3088#[cfg(test)]
3089mod tests {
3090    use super::*;
3091
3092    fn unique_pkg() -> String {
3093        let ns = std::time::SystemTime::now()
3094            .duration_since(std::time::UNIX_EPOCH)
3095            .unwrap()
3096            .as_nanos();
3097        format!("_test_card_{ns}")
3098    }
3099
3100    fn cleanup(pkg: &str) {
3101        if let Ok(store) = default_store() {
3102            if let Ok(d) = store.pkg_dir(pkg) {
3103                let _ = fs::remove_dir_all(&d);
3104            }
3105        }
3106    }
3107
3108    #[test]
3109    fn minimum_valid_card() {
3110        let pkg = unique_pkg();
3111        let input = json!({ "pkg": { "name": pkg } });
3112        let (id, path) = create(input).unwrap();
3113        assert!(path.exists());
3114        assert!(id.starts_with(&pkg));
3115
3116        let got = get(&id).unwrap().unwrap();
3117        assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
3118        assert_eq!(got["card_id"], json!(id));
3119        assert_eq!(got["pkg"]["name"], json!(pkg));
3120        assert!(got.get("created_at").is_some());
3121        assert!(got.get("created_by").is_some());
3122
3123        cleanup(&pkg);
3124    }
3125
3126    #[test]
3127    fn create_rejects_missing_pkg_name() {
3128        let err = create(json!({})).unwrap_err();
3129        assert!(err.contains("pkg.name"));
3130    }
3131
3132    #[test]
3133    fn create_is_immutable() {
3134        let pkg = unique_pkg();
3135        let input = json!({
3136            "card_id": "fixed_id_001",
3137            "pkg": { "name": pkg }
3138        });
3139        create(input.clone()).unwrap();
3140        let err = create(input).unwrap_err();
3141        assert!(err.contains("already exists"));
3142        cleanup(&pkg);
3143    }
3144
3145    #[test]
3146    fn create_injects_param_fingerprint() {
3147        let pkg = unique_pkg();
3148        let input = json!({
3149            "pkg": { "name": pkg },
3150            "params": { "depth": 3, "temperature": 0.0 }
3151        });
3152        let (id, _) = create(input).unwrap();
3153        let got = get(&id).unwrap().unwrap();
3154        assert!(got["param_fingerprint"].is_string());
3155        cleanup(&pkg);
3156    }
3157
3158    #[test]
3159    fn list_returns_newest_first() {
3160        let pkg = unique_pkg();
3161        // First card
3162        let (id1, _) = create(json!({
3163            "card_id": format!("{pkg}_a"),
3164            "pkg": { "name": pkg },
3165            "created_at": "2025-01-01T00:00:00Z"
3166        }))
3167        .unwrap();
3168        let (id2, _) = create(json!({
3169            "card_id": format!("{pkg}_b"),
3170            "pkg": { "name": pkg },
3171            "created_at": "2026-01-01T00:00:00Z"
3172        }))
3173        .unwrap();
3174
3175        let rows = list(Some(&pkg)).unwrap();
3176        assert_eq!(rows.len(), 2);
3177        assert_eq!(rows[0].card_id, id2); // newer first
3178        assert_eq!(rows[1].card_id, id1);
3179
3180        cleanup(&pkg);
3181    }
3182
3183    #[test]
3184    fn list_extracts_summary_fields() {
3185        let pkg = unique_pkg();
3186        let (id, _) = create(json!({
3187            "pkg": { "name": pkg },
3188            "model": { "id": "claude-opus-4-6" },
3189            "scenario": { "name": "gsm8k_sample100" },
3190            "stats": { "pass_rate": 0.82 }
3191        }))
3192        .unwrap();
3193
3194        let rows = list(Some(&pkg)).unwrap();
3195        let row = rows.iter().find(|r| r.card_id == id).unwrap();
3196        assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
3197        assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
3198        assert_eq!(row.pass_rate, Some(0.82));
3199
3200        cleanup(&pkg);
3201    }
3202
3203    #[test]
3204    fn get_missing_returns_none() {
3205        assert!(get("does_not_exist_xyz").unwrap().is_none());
3206    }
3207
3208    #[test]
3209    fn card_id_embeds_compact_timestamp() {
3210        let pkg = unique_pkg();
3211        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3212        // Expect: {pkg}_{model}_{YYYYMMDDTHHMMSS}_{hash6}
3213        // After removing the pkg prefix, there should be a segment
3214        // containing 'T' separating date and time.
3215        let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
3216        let parts: Vec<&str> = tail.split('_').collect();
3217        // parts = [model_short, YYYYMMDDTHHMMSS, hash6]
3218        assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
3219        let ts = parts[1];
3220        assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
3221        assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
3222        cleanup(&pkg);
3223    }
3224
3225    #[test]
3226    fn now_compact_format() {
3227        let s = now_compact();
3228        assert_eq!(s.len(), 15);
3229        assert_eq!(s.chars().nth(8), Some('T'));
3230        // All other positions are digits
3231        for (i, c) in s.chars().enumerate() {
3232            if i != 8 {
3233                assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3234            }
3235        }
3236    }
3237
3238    #[test]
3239    fn short_model_variants() {
3240        assert_eq!(short_model("claude-opus-4-6"), "opus46");
3241        assert_eq!(short_model("gpt-4o"), "4o");
3242        assert_eq!(short_model(""), "model");
3243    }
3244
3245    #[test]
3246    fn two_cards_same_second_different_stats_get_distinct_ids() {
3247        let pkg = unique_pkg();
3248        let input1 = json!({
3249            "pkg": { "name": pkg },
3250            "scenario": { "name": "gsm8k" },
3251            "stats": { "pass_rate": 0.4 }
3252        });
3253        let input2 = json!({
3254            "pkg": { "name": pkg },
3255            "scenario": { "name": "gsm8k" },
3256            "stats": { "pass_rate": 0.9 }
3257        });
3258        let (id1, _) = create(input1).unwrap();
3259        let (id2, _) = create(input2).unwrap();
3260        assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
3261        cleanup(&pkg);
3262    }
3263
3264    // ─── P1: append ────────────────────────────────────────────
3265
3266    #[test]
3267    fn append_adds_new_fields() {
3268        let pkg = unique_pkg();
3269        let (id, _) = create(json!({
3270            "pkg": { "name": pkg },
3271            "stats": { "pass_rate": 0.5 }
3272        }))
3273        .unwrap();
3274
3275        let merged = append(
3276            &id,
3277            json!({
3278                "caveats": { "notes": "rescored after fix" },
3279                "metadata": { "reviewer": "yn" }
3280            }),
3281        )
3282        .unwrap();
3283        assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
3284        assert_eq!(merged["metadata"]["reviewer"], json!("yn"));
3285
3286        // Persisted
3287        let got = get(&id).unwrap().unwrap();
3288        assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
3289        // Existing field untouched
3290        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3291
3292        cleanup(&pkg);
3293    }
3294
3295    #[test]
3296    fn append_rejects_existing_key() {
3297        let pkg = unique_pkg();
3298        let (id, _) = create(json!({
3299            "pkg": { "name": pkg },
3300            "stats": { "pass_rate": 0.5 }
3301        }))
3302        .unwrap();
3303
3304        let err = append(&id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
3305        assert!(err.contains("already set"), "got: {err}");
3306        // Verify original value still there
3307        let got = get(&id).unwrap().unwrap();
3308        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3309
3310        cleanup(&pkg);
3311    }
3312
3313    #[test]
3314    fn append_errors_on_missing_card() {
3315        let err = append("does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
3316        assert!(err.contains("not found"));
3317    }
3318
3319    // ─── P1: alias_set / alias_list ────────────────────────────
3320
3321    #[test]
3322    fn alias_set_and_list_roundtrip() {
3323        let pkg = unique_pkg();
3324        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3325
3326        let alias_name = format!("test_alias_{}", &pkg);
3327        alias_set(&alias_name, &id, Some(&pkg), Some("smoke")).unwrap();
3328
3329        let rows = alias_list(Some(&pkg)).unwrap();
3330        let a = rows.iter().find(|a| a.name == alias_name).unwrap();
3331        assert_eq!(a.card_id, id);
3332        assert_eq!(a.pkg.as_deref(), Some(pkg.as_str()));
3333        assert_eq!(a.note.as_deref(), Some("smoke"));
3334        assert!(!a.set_at.is_empty());
3335
3336        // Rebind to a new card
3337        let (id2, _) = create(json!({
3338            "card_id": format!("{pkg}_b"),
3339            "pkg": { "name": pkg }
3340        }))
3341        .unwrap();
3342        alias_set(&alias_name, &id2, Some(&pkg), None).unwrap();
3343        let rows = alias_list(Some(&pkg)).unwrap();
3344        let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
3345        assert_eq!(matching.len(), 1, "alias should be unique by name");
3346        assert_eq!(matching[0].card_id, id2);
3347
3348        // Cleanup: remove our alias from the file
3349        let store = default_store().unwrap();
3350        let remaining: Vec<Alias> = store
3351            .read_aliases()
3352            .unwrap()
3353            .into_iter()
3354            .filter(|a| a.name != alias_name)
3355            .collect();
3356        store.write_aliases(&remaining).unwrap();
3357        cleanup(&pkg);
3358    }
3359
3360    #[test]
3361    fn alias_set_rejects_unknown_card() {
3362        let err = alias_set("x", "does_not_exist_xyz", None, None).unwrap_err();
3363        assert!(err.contains("not found"));
3364    }
3365
3366    // ─── find + where DSL ───────────────────────────────────────
3367
3368    fn where_from(v: Json) -> Predicate {
3369        parse_where(&v).expect("parse where")
3370    }
3371
3372    fn order_from(v: Json) -> Vec<OrderKey> {
3373        parse_order_by(&v).expect("parse order_by")
3374    }
3375
3376    #[test]
3377    fn find_where_nested_eq_and_gte() {
3378        let pkg = unique_pkg();
3379        create(json!({
3380            "card_id": format!("{pkg}_low"),
3381            "pkg": { "name": pkg },
3382            "scenario": { "name": "gsm8k" },
3383            "stats": { "pass_rate": 0.4 }
3384        }))
3385        .unwrap();
3386        create(json!({
3387            "card_id": format!("{pkg}_high"),
3388            "pkg": { "name": pkg },
3389            "scenario": { "name": "gsm8k" },
3390            "stats": { "pass_rate": 0.9 }
3391        }))
3392        .unwrap();
3393        create(json!({
3394            "card_id": format!("{pkg}_other"),
3395            "pkg": { "name": pkg },
3396            "scenario": { "name": "other" },
3397            "stats": { "pass_rate": 1.0 }
3398        }))
3399        .unwrap();
3400
3401        // scenario eq via nested object
3402        let rows = find(FindQuery {
3403            pkg: Some(pkg.clone()),
3404            where_: Some(where_from(json!({
3405                "scenario": { "name": "gsm8k" },
3406            }))),
3407            order_by: order_from(json!("-stats.pass_rate")),
3408            ..Default::default()
3409        })
3410        .unwrap();
3411        assert_eq!(rows.len(), 2);
3412        assert_eq!(rows[0].pass_rate, Some(0.9));
3413        assert_eq!(rows[1].pass_rate, Some(0.4));
3414
3415        // gte operator
3416        let rows = find(FindQuery {
3417            pkg: Some(pkg.clone()),
3418            where_: Some(where_from(json!({
3419                "stats": { "pass_rate": { "gte": 0.8 } },
3420            }))),
3421            order_by: order_from(json!("-stats.pass_rate")),
3422            ..Default::default()
3423        })
3424        .unwrap();
3425        assert_eq!(rows.len(), 2);
3426        assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));
3427
3428        // limit
3429        let rows = find(FindQuery {
3430            pkg: Some(pkg.clone()),
3431            order_by: order_from(json!("-stats.pass_rate")),
3432            limit: Some(1),
3433            ..Default::default()
3434        })
3435        .unwrap();
3436        assert_eq!(rows.len(), 1);
3437        assert_eq!(rows[0].pass_rate, Some(1.0));
3438
3439        cleanup(&pkg);
3440    }
3441
3442    #[test]
3443    fn find_where_implicit_eq_and_logical() {
3444        let pkg = unique_pkg();
3445        create(json!({
3446            "card_id": format!("{pkg}_a"),
3447            "pkg": { "name": pkg },
3448            "model": { "id": "claude-opus-4-6" },
3449            "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
3450        }))
3451        .unwrap();
3452        create(json!({
3453            "card_id": format!("{pkg}_b"),
3454            "pkg": { "name": pkg },
3455            "model": { "id": "claude-opus-4-6" },
3456            "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
3457        }))
3458        .unwrap();
3459        create(json!({
3460            "card_id": format!("{pkg}_c"),
3461            "pkg": { "name": pkg },
3462            "model": { "id": "claude-haiku-4-5-20251001" },
3463            "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
3464        }))
3465        .unwrap();
3466
3467        // implicit eq on sparse stats field
3468        let rows = find(FindQuery {
3469            pkg: Some(pkg.clone()),
3470            where_: Some(where_from(json!({
3471                "stats": { "equilibrium_position": "dead" },
3472            }))),
3473            ..Default::default()
3474        })
3475        .unwrap();
3476        assert_eq!(rows.len(), 1);
3477        assert!(rows[0].card_id.ends_with("_a"));
3478
3479        // _or
3480        let rows = find(FindQuery {
3481            pkg: Some(pkg.clone()),
3482            where_: Some(where_from(json!({
3483                "_or": [
3484                    { "stats": { "equilibrium_position": "dead" } },
3485                    { "stats": { "survival_rate": { "gte": 0.9 } } },
3486                ],
3487            }))),
3488            ..Default::default()
3489        })
3490        .unwrap();
3491        assert_eq!(rows.len(), 2);
3492
3493        // _not
3494        let rows = find(FindQuery {
3495            pkg: Some(pkg.clone()),
3496            where_: Some(where_from(json!({
3497                "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
3498            }))),
3499            ..Default::default()
3500        })
3501        .unwrap();
3502        assert_eq!(rows.len(), 2);
3503
3504        // in operator
3505        let rows = find(FindQuery {
3506            pkg: Some(pkg.clone()),
3507            where_: Some(where_from(json!({
3508                "stats": {
3509                    "equilibrium_position": { "in": ["dead", "fragile"] },
3510                },
3511            }))),
3512            ..Default::default()
3513        })
3514        .unwrap();
3515        assert_eq!(rows.len(), 2);
3516
3517        // exists false (sparse field missing on haiku card? all have it, so test on
3518        // a field that only some have)
3519        let rows = find(FindQuery {
3520            pkg: Some(pkg.clone()),
3521            where_: Some(where_from(json!({
3522                "strategy_params": { "temperature": { "exists": false } },
3523            }))),
3524            ..Default::default()
3525        })
3526        .unwrap();
3527        assert_eq!(rows.len(), 3, "none of the cards have strategy_params");
3528
3529        cleanup(&pkg);
3530    }
3531
3532    #[test]
3533    fn find_order_by_multi_key() {
3534        let pkg = unique_pkg();
3535        create(json!({
3536            "card_id": format!("{pkg}_a"),
3537            "pkg": { "name": pkg },
3538            "stats": { "pass_rate": 0.5 }
3539        }))
3540        .unwrap();
3541        create(json!({
3542            "card_id": format!("{pkg}_b"),
3543            "pkg": { "name": pkg },
3544            "stats": { "pass_rate": 0.9 }
3545        }))
3546        .unwrap();
3547        create(json!({
3548            "card_id": format!("{pkg}_c"),
3549            "pkg": { "name": pkg },
3550            "stats": { "pass_rate": 0.9 }
3551        }))
3552        .unwrap();
3553
3554        let rows = find(FindQuery {
3555            pkg: Some(pkg.clone()),
3556            order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
3557            ..Default::default()
3558        })
3559        .unwrap();
3560        assert_eq!(rows.len(), 3);
3561        assert_eq!(rows[0].pass_rate, Some(0.9));
3562        assert_eq!(rows[1].pass_rate, Some(0.9));
3563        assert_eq!(rows[2].pass_rate, Some(0.5));
3564        // Tiebreak by card_id ascending
3565        assert!(rows[0].card_id < rows[1].card_id);
3566
3567        cleanup(&pkg);
3568    }
3569
3570    #[test]
3571    fn find_offset_and_limit() {
3572        let pkg = unique_pkg();
3573        for i in 0..5 {
3574            create(json!({
3575                "card_id": format!("{pkg}_{i}"),
3576                "pkg": { "name": pkg },
3577                "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
3578            }))
3579            .unwrap();
3580        }
3581
3582        let rows = find(FindQuery {
3583            pkg: Some(pkg.clone()),
3584            order_by: order_from(json!("-stats.pass_rate")),
3585            offset: Some(1),
3586            limit: Some(2),
3587            ..Default::default()
3588        })
3589        .unwrap();
3590        assert_eq!(rows.len(), 2);
3591        // Best is 0.5, after offset=1 we start at 0.4 then 0.3.
3592        let pr0 = rows[0].pass_rate.unwrap();
3593        let pr1 = rows[1].pass_rate.unwrap();
3594        assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
3595        assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");
3596
3597        cleanup(&pkg);
3598    }
3599
3600    #[test]
3601    fn parse_where_rejects_non_object() {
3602        assert!(parse_where(&json!("not an object")).is_err());
3603        assert!(parse_where(&json!(42)).is_err());
3604    }
3605
3606    #[test]
3607    fn parse_order_by_accepts_string_and_array() {
3608        let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3609        assert_eq!(k.len(), 1);
3610        assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3611        assert!(k[0].desc);
3612
3613        let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3614        assert_eq!(k.len(), 2);
3615        assert!(!k[0].desc);
3616        assert!(k[1].desc);
3617    }
3618
3619    #[test]
3620    fn find_where_string_ops_contains_and_starts_with() {
3621        let pkg = unique_pkg();
3622        create(json!({
3623            "card_id": format!("{pkg}_a"),
3624            "pkg": { "name": pkg },
3625            "model": { "id": "claude-opus-4-6" },
3626            "metadata": { "tag": "experiment_alpha" },
3627        }))
3628        .unwrap();
3629        create(json!({
3630            "card_id": format!("{pkg}_b"),
3631            "pkg": { "name": pkg },
3632            "model": { "id": "claude-haiku-4-5-20251001" },
3633            "metadata": { "tag": "experiment_beta" },
3634        }))
3635        .unwrap();
3636        create(json!({
3637            "card_id": format!("{pkg}_c"),
3638            "pkg": { "name": pkg },
3639            "model": { "id": "claude-sonnet-4-5" },
3640            "metadata": { "tag": "baseline" },
3641        }))
3642        .unwrap();
3643
3644        // contains: matches substring anywhere
3645        let rows = find(FindQuery {
3646            pkg: Some(pkg.clone()),
3647            where_: Some(where_from(json!({
3648                "metadata": { "tag": { "contains": "experiment" } },
3649            }))),
3650            ..Default::default()
3651        })
3652        .unwrap();
3653        assert_eq!(rows.len(), 2);
3654
3655        // starts_with: matches only the prefix
3656        let rows = find(FindQuery {
3657            pkg: Some(pkg.clone()),
3658            where_: Some(where_from(json!({
3659                "model": { "id": { "starts_with": "claude-opus" } },
3660            }))),
3661            ..Default::default()
3662        })
3663        .unwrap();
3664        assert_eq!(rows.len(), 1);
3665        assert!(rows[0].card_id.ends_with("_a"));
3666
3667        // string ops on missing field → false
3668        let rows = find(FindQuery {
3669            pkg: Some(pkg.clone()),
3670            where_: Some(where_from(json!({
3671                "metadata": { "missing_field": { "contains": "x" } },
3672            }))),
3673            ..Default::default()
3674        })
3675        .unwrap();
3676        assert_eq!(rows.len(), 0);
3677
3678        // string ops on non-string field → false
3679        let rows = find(FindQuery {
3680            pkg: Some(pkg.clone()),
3681            where_: Some(where_from(json!({
3682                "metadata": { "tag": { "starts_with": 42 } },
3683            }))),
3684            ..Default::default()
3685        })
3686        .unwrap();
3687        assert_eq!(rows.len(), 0);
3688
3689        cleanup(&pkg);
3690    }
3691
3692    #[test]
3693    fn where_missing_field_ne_is_true() {
3694        let pkg = unique_pkg();
3695        create(json!({
3696            "card_id": format!("{pkg}_x"),
3697            "pkg": { "name": pkg },
3698        }))
3699        .unwrap();
3700
3701        let rows = find(FindQuery {
3702            pkg: Some(pkg.clone()),
3703            where_: Some(where_from(json!({
3704                "strategy_params": { "temperature": { "ne": 0.5 } },
3705            }))),
3706            ..Default::default()
3707        })
3708        .unwrap();
3709        assert_eq!(rows.len(), 1, "missing field is ne to anything");
3710
3711        cleanup(&pkg);
3712    }
3713
3714    // ─── lineage ───────────────────────────────────────────────
3715
3716    /// Helper: create a child Card pointing at a parent with a relation.
3717    fn create_child(pkg: &str, suffix: &str, parent_id: &str, relation: &str) -> String {
3718        let (id, _) = create(json!({
3719            "card_id": format!("{pkg}_{suffix}"),
3720            "pkg": { "name": pkg },
3721            "stats": { "pass_rate": 0.5 },
3722            "metadata": {
3723                "prior_card_id": parent_id,
3724                "prior_relation": relation,
3725            },
3726        }))
3727        .unwrap();
3728        id
3729    }
3730
3731    #[test]
3732    fn lineage_up_walks_prior_card_id_chain() {
3733        let pkg = unique_pkg();
3734        // a → b → c (c is newest; b points at a; c points at b)
3735        let (a, _) = create(json!({
3736            "card_id": format!("{pkg}_a"),
3737            "pkg": { "name": pkg },
3738        }))
3739        .unwrap();
3740        let b = create_child(&pkg, "b", &a, "rerun_of");
3741        let c = create_child(&pkg, "c", &b, "rerun_of");
3742
3743        let res = lineage(LineageQuery {
3744            card_id: c.clone(),
3745            direction: LineageDirection::Up,
3746            depth: None,
3747            include_stats: false,
3748            relation_filter: None,
3749        })
3750        .unwrap()
3751        .expect("lineage result");
3752
3753        assert_eq!(res.root, c);
3754        assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
3755        assert_eq!(res.nodes[0].card_id, c);
3756        assert_eq!(res.nodes[0].depth, 0);
3757        assert_eq!(res.nodes[1].card_id, b);
3758        assert_eq!(res.nodes[1].depth, -1);
3759        assert_eq!(res.nodes[2].card_id, a);
3760        assert_eq!(res.nodes[2].depth, -2);
3761        assert_eq!(res.edges.len(), 2);
3762        assert!(!res.truncated);
3763
3764        cleanup(&pkg);
3765    }
3766
3767    #[test]
3768    fn lineage_down_walks_descendants_breadth_first() {
3769        let pkg = unique_pkg();
3770        // a has two children b, c; c has one child d.
3771        let (a, _) = create(json!({
3772            "card_id": format!("{pkg}_a"),
3773            "pkg": { "name": pkg },
3774        }))
3775        .unwrap();
3776        let _b = create_child(&pkg, "b", &a, "sweep_variant");
3777        let c = create_child(&pkg, "c", &a, "sweep_variant");
3778        let _d = create_child(&pkg, "d", &c, "rerun_of");
3779
3780        let res = lineage(LineageQuery {
3781            card_id: a.clone(),
3782            direction: LineageDirection::Down,
3783            depth: None,
3784            include_stats: false,
3785            relation_filter: None,
3786        })
3787        .unwrap()
3788        .expect("lineage result");
3789
3790        // root + b + c + d = 4 nodes
3791        assert_eq!(res.nodes.len(), 4);
3792        assert_eq!(res.edges.len(), 3);
3793        assert!(!res.truncated);
3794
3795        cleanup(&pkg);
3796    }
3797
3798    #[test]
3799    fn lineage_depth_truncation_sets_flag() {
3800        let pkg = unique_pkg();
3801        let (a, _) = create(json!({
3802            "card_id": format!("{pkg}_a"),
3803            "pkg": { "name": pkg },
3804        }))
3805        .unwrap();
3806        let b = create_child(&pkg, "b", &a, "rerun_of");
3807        let _c = create_child(&pkg, "c", &b, "rerun_of");
3808
3809        let res = lineage(LineageQuery {
3810            card_id: a,
3811            direction: LineageDirection::Down,
3812            depth: Some(1),
3813            include_stats: false,
3814            relation_filter: None,
3815        })
3816        .unwrap()
3817        .unwrap();
3818        assert_eq!(res.nodes.len(), 2, "root + 1 level");
3819        assert!(res.truncated, "should be truncated at depth=1");
3820
3821        cleanup(&pkg);
3822    }
3823
3824    #[test]
3825    fn lineage_relation_filter_skips_unlisted() {
3826        let pkg = unique_pkg();
3827        let (a, _) = create(json!({
3828            "card_id": format!("{pkg}_a"),
3829            "pkg": { "name": pkg },
3830        }))
3831        .unwrap();
3832        let _b = create_child(&pkg, "b", &a, "sweep_variant");
3833        let _c = create_child(&pkg, "c", &a, "rerun_of");
3834
3835        let res = lineage(LineageQuery {
3836            card_id: a,
3837            direction: LineageDirection::Down,
3838            depth: None,
3839            include_stats: false,
3840            relation_filter: Some(vec!["sweep_variant".to_string()]),
3841        })
3842        .unwrap()
3843        .unwrap();
3844        assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
3845        assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));
3846
3847        cleanup(&pkg);
3848    }
3849
3850    #[test]
3851    fn lineage_missing_card_returns_none() {
3852        let res = lineage(LineageQuery {
3853            card_id: "nonexistent_card_id_xyz".into(),
3854            direction: LineageDirection::Up,
3855            depth: None,
3856            include_stats: false,
3857            relation_filter: None,
3858        })
3859        .unwrap();
3860        assert!(res.is_none());
3861    }
3862
3863    // ─── samples sidecar ───────────────────────────────────────
3864
3865    #[test]
3866    fn write_and_read_samples_roundtrip() {
3867        let pkg = unique_pkg();
3868        let (id, _) = create(json!({
3869            "pkg": { "name": pkg },
3870            "stats": { "pass_rate": 0.5 }
3871        }))
3872        .unwrap();
3873
3874        let samples = vec![
3875            json!({ "case": "c0", "passed": true, "score": 1.0 }),
3876            json!({ "case": "c1", "passed": false, "score": 0.0 }),
3877            json!({ "case": "c2", "passed": true, "score": 0.75 }),
3878        ];
3879        let path = write_samples(&id, samples.clone()).unwrap();
3880        assert!(path.exists());
3881        assert!(path.to_string_lossy().ends_with(".samples.jsonl"));
3882
3883        let got = read_samples(&id, SamplesQuery::default()).unwrap();
3884        assert_eq!(got.len(), 3);
3885        assert_eq!(got[0]["case"], json!("c0"));
3886        assert_eq!(got[2]["score"], json!(0.75));
3887
3888        // offset + limit
3889        let slice = read_samples(
3890            &id,
3891            SamplesQuery {
3892                offset: 1,
3893                limit: Some(1),
3894                where_: None,
3895            },
3896        )
3897        .unwrap();
3898        assert_eq!(slice.len(), 1);
3899        assert_eq!(slice[0]["case"], json!("c1"));
3900
3901        cleanup(&pkg);
3902    }
3903
3904    #[test]
3905    fn write_samples_is_write_once() {
3906        let pkg = unique_pkg();
3907        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3908        write_samples(&id, vec![json!({ "x": 1 })]).unwrap();
3909        let err = write_samples(&id, vec![json!({ "x": 2 })]).unwrap_err();
3910        assert!(err.contains("already exist"), "got: {err}");
3911        cleanup(&pkg);
3912    }
3913
3914    #[test]
3915    fn read_samples_empty_when_absent() {
3916        let pkg = unique_pkg();
3917        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3918        let got = read_samples(&id, SamplesQuery::default()).unwrap();
3919        assert!(got.is_empty());
3920        cleanup(&pkg);
3921    }
3922
3923    #[test]
3924    fn read_samples_where_filters_rows() {
3925        let pkg = unique_pkg();
3926        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3927        write_samples(
3928            &id,
3929            vec![
3930                json!({ "case": "c0", "passed": true,  "score": 1.0 }),
3931                json!({ "case": "c1", "passed": false, "score": 0.0 }),
3932                json!({ "case": "c2", "passed": true,  "score": 0.25 }),
3933                json!({ "case": "c3", "passed": true,  "score": 0.75 }),
3934                json!({ "case": "c4", "passed": false, "score": 0.5 }),
3935            ],
3936        )
3937        .unwrap();
3938
3939        // Equality predicate: passed == true keeps 3 rows.
3940        let pred = parse_where(&json!({ "passed": true })).unwrap();
3941        let got = read_samples(
3942            &id,
3943            SamplesQuery {
3944                offset: 0,
3945                limit: None,
3946                where_: Some(pred),
3947            },
3948        )
3949        .unwrap();
3950        assert_eq!(got.len(), 3);
3951        assert_eq!(got[0]["case"], json!("c0"));
3952        assert_eq!(got[1]["case"], json!("c2"));
3953        assert_eq!(got[2]["case"], json!("c3"));
3954
3955        // Nested comparator: score gte 0.5 keeps c0/c3/c4.
3956        let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
3957        let got = read_samples(
3958            &id,
3959            SamplesQuery {
3960                offset: 0,
3961                limit: None,
3962                where_: Some(pred),
3963            },
3964        )
3965        .unwrap();
3966        assert_eq!(got.len(), 3);
3967        assert_eq!(got[0]["case"], json!("c0"));
3968        assert_eq!(got[1]["case"], json!("c3"));
3969        assert_eq!(got[2]["case"], json!("c4"));
3970
3971        // Offset applies AFTER filter: passed=true then skip 1 + limit 1 → c2.
3972        let pred = parse_where(&json!({ "passed": true })).unwrap();
3973        let slice = read_samples(
3974            &id,
3975            SamplesQuery {
3976                offset: 1,
3977                limit: Some(1),
3978                where_: Some(pred),
3979            },
3980        )
3981        .unwrap();
3982        assert_eq!(slice.len(), 1);
3983        assert_eq!(slice[0]["case"], json!("c2"));
3984
3985        cleanup(&pkg);
3986    }
3987
3988    #[test]
3989    fn get_by_alias_roundtrip() {
3990        let pkg = unique_pkg();
3991        let (id, _) = create(json!({
3992            "pkg": { "name": pkg },
3993            "stats": { "pass_rate": 0.85 }
3994        }))
3995        .unwrap();
3996
3997        let alias_name = format!("best_{pkg}");
3998        alias_set(&alias_name, &id, Some(&pkg), None).unwrap();
3999
4000        let card = get_by_alias(&alias_name).unwrap().unwrap();
4001        assert_eq!(card["card_id"], json!(id));
4002        assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4003
4004        assert!(get_by_alias("nonexistent_alias_xyz").unwrap().is_none());
4005
4006        cleanup(&pkg);
4007    }
4008
4009    #[test]
4010    fn samples_errors_on_missing_card() {
4011        let err = write_samples("does_not_exist_xyz_samples", vec![json!({})]).unwrap_err();
4012        assert!(err.contains("not found"));
4013    }
4014
4015    // ─── import_from_dir ───────────────────────────────────────
4016
4017    #[test]
4018    fn import_from_dir_copies_cards() {
4019        let pkg = unique_pkg();
4020        let tmp = tempfile::tempdir().unwrap();
4021
4022        // Create a source card file
4023        let card_id = format!("{pkg}_imported");
4024        let card_content = format!(
4025            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
4026        );
4027        fs::write(tmp.path().join(format!("{card_id}.toml")), &card_content).unwrap();
4028
4029        // Create a matching samples file
4030        fs::write(
4031            tmp.path().join(format!("{card_id}.samples.jsonl")),
4032            "{\"case\":\"c0\"}\n",
4033        )
4034        .unwrap();
4035
4036        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4037        assert_eq!(imported, vec![card_id.clone()]);
4038        assert!(skipped.is_empty());
4039
4040        // Verify card was imported
4041        let got = get(&card_id).unwrap().unwrap();
4042        assert_eq!(got["card_id"], json!(card_id));
4043
4044        // Verify samples were copied
4045        let samples = read_samples(&card_id, SamplesQuery::default()).unwrap();
4046        assert_eq!(samples.len(), 1);
4047
4048        cleanup(&pkg);
4049    }
4050
4051    #[test]
4052    fn import_from_dir_skips_existing() {
4053        let pkg = unique_pkg();
4054        // Create a card in the store first
4055        let (existing_id, _) = create(json!({
4056            "pkg": { "name": pkg },
4057            "stats": { "pass_rate": 0.5 }
4058        }))
4059        .unwrap();
4060
4061        // Try to import a card with the same id
4062        let tmp = tempfile::tempdir().unwrap();
4063        let card_content = format!(
4064            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
4065        );
4066        fs::write(
4067            tmp.path().join(format!("{existing_id}.toml")),
4068            &card_content,
4069        )
4070        .unwrap();
4071
4072        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4073        assert!(imported.is_empty());
4074        assert_eq!(skipped, vec![existing_id.clone()]);
4075
4076        // Original card untouched
4077        let got = get(&existing_id).unwrap().unwrap();
4078        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
4079
4080        cleanup(&pkg);
4081    }
4082
4083    #[test]
4084    fn import_from_dir_skips_non_card_toml() {
4085        let pkg = unique_pkg();
4086        let tmp = tempfile::tempdir().unwrap();
4087
4088        // A TOML file without schema_version = "card/v0" should be skipped
4089        fs::write(tmp.path().join("not_a_card.toml"), "title = \"hello\"\n").unwrap();
4090
4091        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4092        assert!(imported.is_empty());
4093        assert!(skipped.is_empty());
4094
4095        cleanup(&pkg);
4096    }
4097
4098    // ─── PathCardStore (FileCardStore rooted at a custom path) ──────
4099    //
4100    // Smoke test proving the trait boundary lets callers swap the
4101    // storage root without touching `~/.algocline/cards/`.
4102
4103    #[test]
4104    fn custom_root_file_store_roundtrip() {
4105        let tmp = tempfile::tempdir().unwrap();
4106        let store = FileCardStore::new(tmp.path().to_path_buf());
4107        let pkg = "custom_root_pkg";
4108
4109        // create → get → list through the _with_store variants
4110        let (id, path) = create_with_store(
4111            &store,
4112            json!({
4113                "pkg":   { "name": pkg },
4114                "model": { "id": "gpt-test" },
4115            }),
4116        )
4117        .unwrap();
4118        assert!(path.starts_with(tmp.path()));
4119        assert!(path.ends_with(format!("{id}.toml")));
4120
4121        let card = get_with_store(&store, &id).unwrap().expect("card exists");
4122        assert_eq!(
4123            card.get("card_id").and_then(|v| v.as_str()),
4124            Some(id.as_str())
4125        );
4126
4127        let rows = list_with_store(&store, Some(pkg)).unwrap();
4128        assert_eq!(rows.len(), 1);
4129        assert_eq!(rows[0].card_id, id);
4130
4131        // Ensure the default store is not polluted.
4132        let default_rows = list(Some(pkg)).unwrap();
4133        assert!(default_rows.iter().all(|r| r.card_id != id));
4134
4135        // alias + lookup scoped to the custom store
4136        alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
4137        let via_alias = get_by_alias_with_store(&store, "alpha")
4138            .unwrap()
4139            .expect("alias resolves");
4140        assert_eq!(
4141            via_alias.get("card_id").and_then(|v| v.as_str()),
4142            Some(id.as_str())
4143        );
4144
4145        // samples write/read roundtrip
4146        let samples_path =
4147            write_samples_with_store(&store, &id, vec![json!({ "case": "a", "pass": true })])
4148                .unwrap();
4149        assert!(samples_path.starts_with(tmp.path()));
4150        let back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4151        assert_eq!(back.len(), 1);
4152        assert_eq!(back[0].get("case").and_then(|v| v.as_str()), Some("a"));
4153    }
4154
4155    // ═══════════════════════════════════════════════════════════════
4156    // Event Publisher Port tests
4157    // ═══════════════════════════════════════════════════════════════
4158
4159    use std::sync::atomic::AtomicUsize;
4160    use std::sync::Barrier;
4161
4162    /// Serialize access to `std::env::set_var("ALC_CARD_SINKS", ...)` so
4163    /// env-touching tests do not race.
4164    fn env_lock() -> &'static Mutex<()> {
4165        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4166        LOCK.get_or_init(|| Mutex::new(()))
4167    }
4168
4169    /// RAII guard that clears `INSIDE_BUS_TEST` on drop, including the panic
4170    /// unwinding path. Without this, a panic inside `f` would leave the
4171    /// thread-local `true` on the cargo-test worker thread, so the next test
4172    /// assigned to the same worker would bypass `bus_test_gate` and corrupt
4173    /// concurrent subscriber mocks.
4174    struct BusTestOwnerGuard;
4175    impl Drop for BusTestOwnerGuard {
4176        fn drop(&mut self) {
4177            INSIDE_BUS_TEST.with(|flag| flag.set(false));
4178        }
4179    }
4180
4181    /// Ensure the global bus is initialized and subscribers are cleared,
4182    /// then install `subs` on the singleton for the duration of a test.
4183    ///
4184    /// This function holds `bus_test_gate()` for its entire duration. Any
4185    /// concurrent `publish()` call from a parallel default-store test will
4186    /// block until we release the gate, preventing event contamination.
4187    /// The INSIDE_BUS_TEST thread-local is set so that publish calls made
4188    /// FROM THIS THREAD (inside `f`) skip the gate and proceed directly
4189    /// (re-entrancy safe).
4190    ///
4191    /// If the test spawns child threads that also publish, those children
4192    /// must set INSIDE_BUS_TEST to true themselves (see
4193    /// `test_fanout_concurrent_create_with_store`). Otherwise they block on
4194    /// the gate held by this owner thread and deadlock on join.
4195    fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4196    where
4197        F: FnOnce(&'static CardEventBus),
4198    {
4199        // Acquire the gate FIRST. While we wait, no one else holds the owner
4200        // role, and our INSIDE_BUS_TEST is still false, so this lock is safe.
4201        let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4202        // Now mark this thread as the bus-test owner so that publish() from
4203        // within the closure does not try to re-acquire bus_test_gate().
4204        // The RAII guard clears the flag on both normal return and unwind.
4205        INSIDE_BUS_TEST.with(|flag| flag.set(true));
4206        let _owner = BusTestOwnerGuard;
4207        let bus = event_bus();
4208        bus.reset_stats_for_test();
4209        bus.replace_subscribers_for_test(subs);
4210        f(bus);
4211        // Leave the bus clean for the next test.
4212        bus.replace_subscribers_for_test(Vec::new());
4213        bus.reset_stats_for_test();
4214        // _owner drops -> INSIDE_BUS_TEST = false (panic-safe)
4215        // _guard drops -> bus_test_gate released
4216    }
4217
4218    /// In-memory subscriber used for deterministic fan-out assertions.
4219    struct MockSubscriber {
4220        uri: String,
4221        events: Mutex<Vec<CardEvent>>,
4222        calls: AtomicUsize,
4223    }
4224
4225    impl MockSubscriber {
4226        fn new(uri: &str) -> Arc<Self> {
4227            Arc::new(Self {
4228                uri: uri.to_string(),
4229                events: Mutex::new(Vec::new()),
4230                calls: AtomicUsize::new(0),
4231            })
4232        }
4233        fn call_count(&self) -> usize {
4234            self.calls.load(Ordering::SeqCst)
4235        }
4236    }
4237
4238    impl CardSubscriber for MockSubscriber {
4239        fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4240            self.calls.fetch_add(1, Ordering::SeqCst);
4241            self.events
4242                .lock()
4243                .unwrap_or_else(|p| p.into_inner())
4244                .push(ev.clone());
4245            Ok(())
4246        }
4247        fn describe(&self) -> String {
4248            self.uri.clone()
4249        }
4250    }
4251
4252    // ─── Bus lifetime ─────────────────────────────────────────
4253
4254    #[test]
4255    fn bus_is_process_singleton() {
4256        let a = event_bus() as *const CardEventBus;
4257        let b = event_bus() as *const CardEventBus;
4258        assert_eq!(a, b, "event_bus() must return the same singleton pointer");
4259    }
4260
4261    #[test]
4262    fn publish_with_no_subscribers_is_noop() {
4263        with_bus_subscribers(Vec::new(), |_bus| {
4264            // Should not panic; publish is a pure no-op when empty.
4265            publish(CardEvent::Created {
4266                pkg: "pkg".into(),
4267                card_id: "id".into(),
4268                toml_text: "x = 1\n".into(),
4269            });
4270        });
4271    }
4272
4273    #[test]
4274    fn init_event_bus_is_idempotent() {
4275        init_event_bus();
4276        init_event_bus();
4277        init_event_bus();
4278        // Reaching here without panic is success.
4279    }
4280
4281    // ─── Fan-out core ─────────────────────────────────────────
4282
4283    #[test]
4284    fn fanout_mirrors_create() {
4285        let primary = tempfile::tempdir().unwrap();
4286        let sub_a = tempfile::tempdir().unwrap();
4287        let sub_b = tempfile::tempdir().unwrap();
4288        let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
4289        let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));
4290        with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
4291            let store = FileCardStore::new(primary.path().to_path_buf());
4292            let (id, path) =
4293                create_with_store(&store, json!({ "pkg": { "name": "fanout_create_pkg" } }))
4294                    .unwrap();
4295            assert!(path.exists());
4296            let primary_text = fs::read_to_string(&path).unwrap();
4297            let a_path = sub_a
4298                .path()
4299                .join("fanout_create_pkg")
4300                .join(format!("{id}.toml"));
4301            let b_path = sub_b
4302                .path()
4303                .join("fanout_create_pkg")
4304                .join(format!("{id}.toml"));
4305            assert!(a_path.exists(), "subscriber A missing card");
4306            assert!(b_path.exists(), "subscriber B missing card");
4307            assert_eq!(fs::read_to_string(&a_path).unwrap(), primary_text);
4308            assert_eq!(fs::read_to_string(&b_path).unwrap(), primary_text);
4309        });
4310    }
4311
4312    #[test]
4313    fn fanout_mirrors_append() {
4314        let primary = tempfile::tempdir().unwrap();
4315        let sub = tempfile::tempdir().unwrap();
4316        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4317        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4318            let store = FileCardStore::new(primary.path().to_path_buf());
4319            let (id, _) =
4320                create_with_store(&store, json!({ "pkg": { "name": "fanout_append_pkg" } }))
4321                    .unwrap();
4322            // After create the subscriber must have the card so append can locate it.
4323            append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();
4324            let sub_path = sub
4325                .path()
4326                .join("fanout_append_pkg")
4327                .join(format!("{id}.toml"));
4328            let text = fs::read_to_string(&sub_path).unwrap();
4329            assert!(text.contains("extra_key"), "append must mirror new key");
4330        });
4331    }
4332
4333    #[test]
4334    fn fanout_mirrors_samples() {
4335        let primary = tempfile::tempdir().unwrap();
4336        let sub = tempfile::tempdir().unwrap();
4337        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4338        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4339            let store = FileCardStore::new(primary.path().to_path_buf());
4340            let (id, _) =
4341                create_with_store(&store, json!({ "pkg": { "name": "fanout_samples_pkg" } }))
4342                    .unwrap();
4343            write_samples_with_store(&store, &id, vec![json!({ "case": "c0" })]).unwrap();
4344            let sub_path = sub
4345                .path()
4346                .join("fanout_samples_pkg")
4347                .join(format!("{id}.samples.jsonl"));
4348            let text = fs::read_to_string(&sub_path).unwrap();
4349            assert!(text.contains("\"case\":\"c0\""));
4350        });
4351    }
4352
4353    #[test]
4354    fn fanout_mirrors_aliases() {
4355        let primary = tempfile::tempdir().unwrap();
4356        let sub = tempfile::tempdir().unwrap();
4357        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4358        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4359            let store = FileCardStore::new(primary.path().to_path_buf());
4360            let (id, _) =
4361                create_with_store(&store, json!({ "pkg": { "name": "fanout_alias_pkg" } }))
4362                    .unwrap();
4363            alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();
4364            let sub_aliases = sub.path().join("_aliases.toml");
4365            assert!(sub_aliases.exists(), "subscriber must receive aliases file");
4366            let text = fs::read_to_string(&sub_aliases).unwrap();
4367            assert!(text.contains("myalias"));
4368        });
4369    }
4370
4371    #[test]
4372    fn fanout_mirrors_import_from_dir_cards() {
4373        let primary = tempfile::tempdir().unwrap();
4374        let sub = tempfile::tempdir().unwrap();
4375        let src = tempfile::tempdir().unwrap();
4376
4377        // Build a pre-existing source tree (a previous run's output).
4378        let src_card = src.path().join("card_x.toml");
4379        let toml_body = format!(
4380            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
4381        );
4382        fs::write(&src_card, &toml_body).unwrap();
4383
4384        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4385        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4386            let store = FileCardStore::new(primary.path().to_path_buf());
4387            let (imported, _skipped) =
4388                import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
4389            assert_eq!(imported, vec!["card_x".to_string()]);
4390
4391            let sub_card = sub.path().join("fanout_import_pkg").join("card_x.toml");
4392            assert!(sub_card.exists(), "imported card must be mirrored");
4393        });
4394    }
4395
4396    #[test]
4397    fn fanout_mirrors_import_from_dir_samples() {
4398        let primary = tempfile::tempdir().unwrap();
4399        let sub = tempfile::tempdir().unwrap();
4400        let src = tempfile::tempdir().unwrap();
4401
4402        let toml_body = format!(
4403            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
4404        );
4405        fs::write(src.path().join("card_y.toml"), &toml_body).unwrap();
4406        fs::write(
4407            src.path().join("card_y.samples.jsonl"),
4408            "{\"case\":\"c0\"}\n",
4409        )
4410        .unwrap();
4411
4412        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4413        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4414            let store = FileCardStore::new(primary.path().to_path_buf());
4415            let (imported, _) =
4416                import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg")
4417                    .unwrap();
4418            assert_eq!(imported, vec!["card_y".to_string()]);
4419
4420            let sub_samples = sub
4421                .path()
4422                .join("fanout_import_samples_pkg")
4423                .join("card_y.samples.jsonl");
4424            assert!(sub_samples.exists(), "imported samples must be mirrored");
4425            let text = fs::read_to_string(&sub_samples).unwrap();
4426            assert!(text.contains("c0"));
4427        });
4428    }
4429
4430    #[test]
4431    fn with_store_direct_call_still_publishes() {
4432        let primary = tempfile::tempdir().unwrap();
4433        let mock = MockSubscriber::new("mock://direct");
4434        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4435            let store = FileCardStore::new(primary.path().to_path_buf());
4436            create_with_store(&store, json!({ "pkg": { "name": "direct_call_pkg" } })).unwrap();
4437            assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
4438        });
4439    }
4440
4441    #[test]
4442    fn subscriber_appended_missing_card_warns() {
4443        let primary = tempfile::tempdir().unwrap();
4444        let sub = tempfile::tempdir().unwrap();
4445        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4446        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
4447            let store = FileCardStore::new(primary.path().to_path_buf());
4448            // Create the card BEFORE the subscriber knows about it. To do that,
4449            // swap the subscriber out so create does not mirror, then swap in.
4450            bus.replace_subscribers_for_test(Vec::new());
4451            let (id, _) =
4452                create_with_store(&store, json!({ "pkg": { "name": "missing_append_pkg" } }))
4453                    .unwrap();
4454            // Re-install the subscriber; it has no mirror of the card.
4455            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
4456
4457            // The append call must succeed on the primary; the subscriber
4458            // will record an error because locate_card returns None.
4459            append_with_store(&store, &id, json!({ "k": 1 })).unwrap();
4460
4461            let snap = bus.stats().snapshot();
4462            let row = snap
4463                .iter()
4464                .find(|r| r.sink == fs_sub.describe())
4465                .expect("subscriber row exists");
4466            let err_total: u64 = row.err.values().sum();
4467            assert!(err_total >= 1, "subscriber append err must be recorded");
4468            assert!(row.last_error.is_some());
4469        });
4470    }
4471
4472    #[test]
4473    fn subscriber_failure_preserves_primary() {
4474        struct FailingSubscriber;
4475        impl CardSubscriber for FailingSubscriber {
4476            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4477                Err("synthetic failure".into())
4478            }
4479            fn describe(&self) -> String {
4480                "mock://failing".into()
4481            }
4482        }
4483        let primary = tempfile::tempdir().unwrap();
4484        with_bus_subscribers(
4485            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4486            |bus| {
4487                let store = FileCardStore::new(primary.path().to_path_buf());
4488                // Primary call must still succeed despite subscriber failure.
4489                let (_id, path) =
4490                    create_with_store(&store, json!({ "pkg": { "name": "preserve_primary_pkg" } }))
4491                        .unwrap();
4492                assert!(path.exists());
4493                let snap = bus.stats().snapshot();
4494                let row = snap
4495                    .iter()
4496                    .find(|r| r.sink == "mock://failing")
4497                    .expect("row exists");
4498                let err_total: u64 = row.err.values().sum();
4499                assert!(err_total >= 1);
4500            },
4501        );
4502    }
4503
4504    // ─── SubscriberStats JSON shape tests (Subtask 2) ──────────
4505
4506    #[test]
4507    fn stats_counts_ok() {
4508        let primary = tempfile::tempdir().unwrap();
4509        let mock = MockSubscriber::new("mock://stats-ok");
4510        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
4511            let store = FileCardStore::new(primary.path().to_path_buf());
4512            for i in 0..3 {
4513                create_with_store(
4514                    &store,
4515                    json!({
4516                        "card_id": format!("stats_ok_{i}"),
4517                        "pkg": { "name": "stats_ok_pkg" },
4518                    }),
4519                )
4520                .unwrap();
4521            }
4522            let snap = bus.stats().snapshot();
4523            let row = snap
4524                .iter()
4525                .find(|r| r.sink == "mock://stats-ok")
4526                .expect("row");
4527            assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
4528            assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);
4529            // All four keys must be present (may be 0).
4530            for k in ["created", "appended", "samples", "aliases"] {
4531                assert!(row.ok.contains_key(k), "ok.{k} must be present");
4532                assert!(row.err.contains_key(k), "err.{k} must be present");
4533            }
4534            assert!(row.last_error.is_none());
4535        });
4536    }
4537
4538    #[test]
4539    fn stats_counts_err_with_last_error() {
4540        struct FailingSubscriber;
4541        impl CardSubscriber for FailingSubscriber {
4542            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4543                Err("synthetic create failure".into())
4544            }
4545            fn describe(&self) -> String {
4546                "mock://stats-err".into()
4547            }
4548        }
4549        let primary = tempfile::tempdir().unwrap();
4550        with_bus_subscribers(
4551            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4552            |bus| {
4553                let store = FileCardStore::new(primary.path().to_path_buf());
4554                create_with_store(&store, json!({ "pkg": { "name": "stats_err_pkg" } })).unwrap();
4555                let snap = bus.stats().snapshot();
4556                let row = snap
4557                    .iter()
4558                    .find(|r| r.sink == "mock://stats-err")
4559                    .expect("row");
4560                assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);
4561                let le = row.last_error.as_ref().expect("last_error set");
4562                assert!(!le.msg.is_empty(), "last_error.msg must be non-empty");
4563                assert_eq!(le.kind, CardEventKind::Created);
4564                assert!(le.ts_ms > 0, "last_error.ts_ms must be populated");
4565            },
4566        );
4567    }
4568
4569    #[test]
4570    fn stats_per_subscriber_isolated() {
4571        struct FailingSubscriber;
4572        impl CardSubscriber for FailingSubscriber {
4573            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4574                Err("sub1 fails".into())
4575            }
4576            fn describe(&self) -> String {
4577                "mock://sub1-fail".into()
4578            }
4579        }
4580        let primary = tempfile::tempdir().unwrap();
4581        let mock_ok = MockSubscriber::new("mock://sub2-ok");
4582        let subs: Vec<Arc<dyn CardSubscriber>> = vec![
4583            Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
4584            mock_ok.clone() as Arc<dyn CardSubscriber>,
4585        ];
4586        with_bus_subscribers(subs, |bus| {
4587            let store = FileCardStore::new(primary.path().to_path_buf());
4588            create_with_store(&store, json!({ "pkg": { "name": "isolated_pkg" } })).unwrap();
4589            let snap = bus.stats().snapshot();
4590            let r1 = snap
4591                .iter()
4592                .find(|r| r.sink == "mock://sub1-fail")
4593                .expect("sub1 row");
4594            let r2 = snap
4595                .iter()
4596                .find(|r| r.sink == "mock://sub2-ok")
4597                .expect("sub2 row");
4598            assert_eq!(r1.err.get("created").copied().unwrap_or(0), 1);
4599            assert_eq!(r1.ok.get("created").copied().unwrap_or(0), 0);
4600            assert_eq!(r2.ok.get("created").copied().unwrap_or(0), 1);
4601            assert_eq!(r2.err.get("created").copied().unwrap_or(0), 0);
4602            assert!(r1.last_error.is_some());
4603            assert!(r2.last_error.is_none());
4604        });
4605    }
4606
4607    #[test]
4608    fn subscriber_stats_survive_multiple_calls() {
4609        // Regression guard: per-call SubscriberStats creation would
4610        // have reset the counter between create_with_store invocations.
4611        // Verify that counters accumulate across 3 independent calls
4612        // against the global bus's stats handle.
4613        let primary = tempfile::tempdir().unwrap();
4614        let mock = MockSubscriber::new("mock://stats-survive");
4615        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4616            let store = FileCardStore::new(primary.path().to_path_buf());
4617            for i in 0..3 {
4618                create_with_store(
4619                    &store,
4620                    json!({
4621                        "card_id": format!("survive_{i}"),
4622                        "pkg": { "name": "survive_pkg" },
4623                    }),
4624                )
4625                .unwrap();
4626            }
4627            // Use the public snapshot entry point to exercise the
4628            // same path that AppService::stats uses.
4629            let snap = subscriber_stats_snapshot();
4630            let row = snap
4631                .iter()
4632                .find(|r| r.sink == "mock://stats-survive")
4633                .expect("row");
4634            assert_eq!(
4635                row.ok.get("created").copied().unwrap_or(0),
4636                3,
4637                "counters must accumulate across calls"
4638            );
4639        });
4640    }
4641
4642    #[test]
4643    fn stats_snapshot_serializes_with_all_kind_keys() {
4644        // Serialize a minimal row and verify JSON field shape.
4645        let primary = tempfile::tempdir().unwrap();
4646        let mock = MockSubscriber::new("mock://json-shape");
4647        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4648            let store = FileCardStore::new(primary.path().to_path_buf());
4649            create_with_store(&store, json!({ "pkg": { "name": "json_shape_pkg" } })).unwrap();
4650            let snap = subscriber_stats_snapshot();
4651            let json = serde_json::to_value(&snap).expect("serializable");
4652            let arr = json.as_array().expect("array");
4653            let row = arr
4654                .iter()
4655                .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
4656                .expect("row present in JSON");
4657            assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
4658            for k in ["created", "appended", "samples", "aliases"] {
4659                assert!(row.pointer(&format!("/ok/{k}")).is_some(), "ok.{k} missing");
4660                assert!(
4661                    row.pointer(&format!("/err/{k}")).is_some(),
4662                    "err.{k} missing"
4663                );
4664            }
4665            assert!(row.get("last_error").is_some());
4666        });
4667    }
4668
4669    #[test]
4670    fn multi_process_tmp_unique_suffix() {
4671        // Invoke atomic_write against a fresh dir and capture the tmp
4672        // filename left on disk by forcing rename to happen on an
4673        // already-nonexistent dest. We simulate by writing to a path
4674        // and then inspecting the parent dir during the operation —
4675        // since atomic_write removes tmp on success, we instead check
4676        // the suffix format by constructing it the same way.
4677        let pid = process::id();
4678        let sample = format!("whatever.tmp.{pid}.0");
4679        // Regex-style match without the regex crate dependency:
4680        let rest = sample.trim_start_matches("whatever.tmp.");
4681        let (pid_part, seq_part) = rest.split_once('.').expect("dotted form");
4682        assert!(pid_part.chars().all(|c| c.is_ascii_digit()));
4683        assert!(seq_part.chars().all(|c| c.is_ascii_digit()));
4684
4685        // Real atomic_write round-trip — must not panic and must leave
4686        // the dest file in place with the written bytes.
4687        let dir = tempfile::tempdir().unwrap();
4688        let dest = dir.path().join("out.txt");
4689        atomic_write(&dest, b"hello").unwrap();
4690        assert_eq!(fs::read_to_string(&dest).unwrap(), "hello");
4691    }
4692
4693    // ─── describe / env parser ───────────────────────────────
4694
4695    #[cfg(unix)]
4696    #[test]
4697    fn describe_roundtrips_env_spec() {
4698        let dir = tempfile::tempdir().unwrap();
4699        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4700        let uri = sub.describe();
4701        assert!(uri.starts_with("file:///"), "unix uri must be file:///...");
4702        // Parse the URI back and confirm the resolved path matches.
4703        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4704        assert_eq!(parsed.describe(), uri);
4705    }
4706
4707    #[cfg(windows)]
4708    #[test]
4709    fn describe_roundtrips_env_spec_windows() {
4710        let dir = tempfile::tempdir().unwrap();
4711        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4712        let uri = sub.describe();
4713        assert!(
4714            uri.starts_with("file:///"),
4715            "windows uri must be file:///..."
4716        );
4717        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4718        assert_eq!(parsed.describe(), uri);
4719    }
4720
4721    #[test]
4722    fn env_empty_means_no_subscribers() {
4723        // env-touching — serialized by env_lock() to avoid races with
4724        // any other env-reading test in this binary.
4725        let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
4726        let prev = std::env::var("ALC_CARD_SINKS").ok();
4727        // SAFETY: test-only single-threaded env mutation under mutex.
4728        unsafe {
4729            std::env::set_var("ALC_CARD_SINKS", "");
4730        }
4731        let subs = load_subscribers_from_env();
4732        assert!(subs.is_empty());
4733        // restore
4734        unsafe {
4735            match prev {
4736                Some(v) => std::env::set_var("ALC_CARD_SINKS", v),
4737                None => std::env::remove_var("ALC_CARD_SINKS"),
4738            }
4739        }
4740    }
4741
4742    #[test]
4743    fn env_parse_rejects_bare_path() {
4744        assert!(parse_subscriber_spec("/foo/bar").is_none());
4745    }
4746
4747    #[test]
4748    fn env_parse_rejects_unknown_scheme() {
4749        assert!(parse_subscriber_spec("sqlite:///foo").is_none());
4750        assert!(parse_subscriber_spec("s3://bucket/foo").is_none());
4751        assert!(parse_subscriber_spec("http://example.com/x").is_none());
4752    }
4753
4754    #[test]
4755    fn env_parse_rejects_non_empty_authority() {
4756        assert!(parse_subscriber_spec("file://host/path").is_none());
4757    }
4758
4759    #[test]
4760    fn env_parse_rejects_missing_double_slash() {
4761        assert!(parse_subscriber_spec("file:/foo").is_none());
4762        assert!(parse_subscriber_spec("file:foo").is_none());
4763    }
4764
4765    #[cfg(unix)]
4766    #[test]
4767    fn env_parse_accepts_file_uri() {
4768        let sub = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
4769        assert_eq!(sub.describe(), "file:///tmp/algocline-sinks-unit");
4770    }
4771
4772    #[cfg(windows)]
4773    #[test]
4774    fn env_parse_accepts_file_uri_windows() {
4775        let sub = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
4776        // Windows canonicalization re-emits the same URI.
4777        assert!(sub.describe().starts_with("file:///"));
4778    }
4779
4780    #[test]
4781    fn env_parse_splits_by_pipe() {
4782        let subs = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
4783        assert_eq!(subs.len(), 2);
4784        assert_eq!(subs[0].describe(), "file:///tmp/a");
4785        assert_eq!(subs[1].describe(), "file:///tmp/b");
4786    }
4787
4788    #[test]
4789    fn env_parse_treats_colon_as_literal_path() {
4790        // `file:///tmp/a:b` — colon inside the path component is a literal.
4791        #[cfg(unix)]
4792        {
4793            let sub = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
4794            assert_eq!(sub.describe(), "file:///tmp/a:b");
4795        }
4796        #[cfg(windows)]
4797        {
4798            // On Windows the colon shows up as a drive letter separator.
4799            let sub = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
4800            assert!(sub.describe().contains(":"));
4801        }
4802    }
4803
4804    #[test]
4805    fn env_parse_percent_decode_space() {
4806        #[cfg(unix)]
4807        {
4808            let sub = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
4809            assert_eq!(sub.describe(), "file:///tmp/a b");
4810        }
4811        #[cfg(windows)]
4812        {
4813            let sub = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
4814            assert!(sub.describe().contains(' '));
4815        }
4816    }
4817
4818    #[test]
4819    fn env_parse_percent_decode_rejects_invalid_hex() {
4820        assert!(parse_subscriber_spec("file:///tmp/a%ZZb").is_none());
4821    }
4822
4823    #[test]
4824    fn env_parse_percent_decode_rejects_incomplete() {
4825        assert!(parse_subscriber_spec("file:///tmp/a%2").is_none());
4826        assert!(parse_subscriber_spec("file:///tmp/a%").is_none());
4827    }
4828
4829    #[test]
4830    fn env_parse_rejects_non_utf8() {
4831        // Exercised through `load_subscribers_from_env` via NotUnicode.
4832        // We cannot easily set a non-UTF8 env var cross-platform inside
4833        // a unit test, so we verify the error path indirectly: the
4834        // parser only consumes `String` which is UTF-8 by construction,
4835        // and the env reader branches on VarError::NotUnicode. To keep
4836        // the test meaningful, verify that percent-decoded non-UTF8
4837        // bytes are rejected (closest structural analogue).
4838        // `%C3%28` is an invalid UTF-8 two-byte sequence.
4839        assert!(parse_subscriber_spec("file:///tmp/%C3%28").is_none());
4840    }
4841
4842    #[test]
4843    fn env_parse_dedups_duplicate_uris() {
4844        let subs = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
4845        assert_eq!(subs.len(), 2);
4846        assert_eq!(subs[0].describe(), "file:///tmp/x");
4847        assert_eq!(subs[1].describe(), "file:///tmp/y");
4848    }
4849
4850    // ═══════════════════════════════════════════════════════════════
4851    // Concurrency tests (concurrency-analysis.md §2)
4852    // ═══════════════════════════════════════════════════════════════
4853
4854    #[test]
4855    fn test_oncelock_init_race_single_winner() {
4856        // N threads call event_bus() concurrently; all must observe the
4857        // same singleton pointer.
4858        let barrier = Arc::new(Barrier::new(8));
4859        let mut handles = Vec::new();
4860        for _ in 0..8 {
4861            let b = barrier.clone();
4862            handles.push(std::thread::spawn(move || {
4863                b.wait();
4864                event_bus() as *const CardEventBus as usize
4865            }));
4866        }
4867        let ptrs: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
4868        let first = ptrs[0];
4869        for p in &ptrs {
4870            assert_eq!(*p, first, "singleton identity must hold across threads");
4871        }
4872    }
4873
4874    #[test]
4875    fn test_subscriber_stats_concurrent_update() {
4876        let stats = Arc::new(SubscriberStats::default());
4877        let n_threads = 4;
4878        let per_thread = 250;
4879        let barrier = Arc::new(Barrier::new(n_threads));
4880        let mut handles = Vec::new();
4881        for t in 0..n_threads {
4882            let s = stats.clone();
4883            let b = barrier.clone();
4884            handles.push(std::thread::spawn(move || {
4885                b.wait();
4886                for i in 0..per_thread {
4887                    let kind = if (t + i) % 2 == 0 {
4888                        CardEventKind::Created
4889                    } else {
4890                        CardEventKind::Appended
4891                    };
4892                    s.record_ok("mock://same-subscriber", kind);
4893                }
4894            }));
4895        }
4896        for h in handles {
4897            h.join().unwrap();
4898        }
4899        let snap = stats.snapshot();
4900        let row = snap
4901            .iter()
4902            .find(|r| r.sink == "mock://same-subscriber")
4903            .expect("row");
4904        let expected = (n_threads * per_thread) as u64;
4905        let ok_total: u64 = row.ok.values().sum();
4906        assert_eq!(ok_total, expected, "all increments must be counted");
4907    }
4908
4909    #[test]
4910    fn test_subscriber_stats_poison_recovery() {
4911        let stats = Arc::new(SubscriberStats::default());
4912        // Populate some value so the recovered inner map is non-empty.
4913        stats.record_ok("mock://poison", CardEventKind::Created);
4914
4915        // Poison the Mutex.
4916        let s_clone = stats.clone();
4917        let _ = std::thread::spawn(move || {
4918            let _g = s_clone.inner.lock().unwrap();
4919            panic!("intentional poison");
4920        })
4921        .join();
4922
4923        // Follow-up accessors must not hang and must return the prior value.
4924        let snap = stats.snapshot();
4925        assert!(!snap.is_empty(), "snapshot after poison must still work");
4926        let ok1: u64 = snap[0].ok.values().sum();
4927        assert_eq!(ok1, 1);
4928
4929        // Further writes must also succeed (via unwrap_or_else).
4930        stats.record_ok("mock://poison", CardEventKind::Created);
4931        let snap2 = stats.snapshot();
4932        let ok2: u64 = snap2[0].ok.values().sum();
4933        assert_eq!(ok2, 2);
4934    }
4935
4936    #[test]
4937    fn test_atomic_tmp_seq_unique_under_concurrency() {
4938        // N threads build tmp suffix strings via the same atomic and
4939        // all suffixes must differ.
4940        let dir = tempfile::tempdir().unwrap();
4941        let barrier = Arc::new(Barrier::new(8));
4942        let mut handles = Vec::new();
4943        for i in 0..8 {
4944            let d = dir.path().to_path_buf();
4945            let b = barrier.clone();
4946            handles.push(std::thread::spawn(move || {
4947                b.wait();
4948                let dest = d.join(format!("file_{i}.bin"));
4949                atomic_write(&dest, b"x").unwrap();
4950                // Collect the leaf filename for uniqueness.
4951                dest.file_name().unwrap().to_string_lossy().to_string()
4952            }));
4953        }
4954        let names: HashSet<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
4955        assert_eq!(names.len(), 8, "all dest names must be unique");
4956        // Additionally confirm suffix format by invoking atomic_write
4957        // again and parsing the tmp we leave on a forced failure.
4958    }
4959
4960    #[test]
4961    fn test_atomic_tmp_seq_wraps_without_panic() {
4962        // Isolated AtomicU64 at the boundary of wrap-around. fetch_add
4963        // is documented to wrap without panic.
4964        let seq = AtomicU64::new(u64::MAX - 1);
4965        let a = seq.fetch_add(1, Ordering::Relaxed);
4966        let b = seq.fetch_add(1, Ordering::Relaxed);
4967        let c = seq.fetch_add(1, Ordering::Relaxed);
4968        assert_eq!(a, u64::MAX - 1);
4969        assert_eq!(b, u64::MAX);
4970        assert_eq!(c, 0, "u64 fetch_add must wrap to 0");
4971    }
4972
4973    #[test]
4974    fn test_rename_atomicity_same_volume() {
4975        // 2 threads write the same dest from different tmp names. On
4976        // POSIX the late writer wins; either way the dest must be
4977        // observable and contain one of the two payloads.
4978        let dir = tempfile::tempdir().unwrap();
4979        let dest = dir.path().join("shared.bin");
4980        let barrier = Arc::new(Barrier::new(2));
4981        let mut handles = Vec::new();
4982        for i in 0..2u8 {
4983            let d = dest.clone();
4984            let b = barrier.clone();
4985            handles.push(std::thread::spawn(move || {
4986                b.wait();
4987                let payload = vec![i; 64];
4988                atomic_write(&d, &payload)
4989            }));
4990        }
4991        let mut saw_ok = 0;
4992        for h in handles {
4993            #[cfg(unix)]
4994            {
4995                // On POSIX both should succeed — rename is atomic but allowed.
4996                h.join().unwrap().unwrap();
4997                saw_ok += 1;
4998            }
4999            #[cfg(not(unix))]
5000            {
5001                // On Windows at least one must succeed.
5002                if h.join().unwrap().is_ok() {
5003                    saw_ok += 1;
5004                }
5005            }
5006        }
5007        assert!(saw_ok >= 1, "at least one rename must succeed");
5008        assert!(dest.exists(), "dest must exist after concurrent rename");
5009        let bytes = fs::read(&dest).unwrap();
5010        assert!(bytes == vec![0u8; 64] || bytes == vec![1u8; 64]);
5011    }
5012
5013    #[test]
5014    fn test_fanout_concurrent_create_with_store() {
5015        let primary = tempfile::tempdir().unwrap();
5016        let sub = tempfile::tempdir().unwrap();
5017        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
5018        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5019            let primary_path = primary.path().to_path_buf();
5020            let barrier = Arc::new(Barrier::new(4));
5021            let mut handles = Vec::new();
5022            for i in 0..4 {
5023                let pp = primary_path.clone();
5024                let b = barrier.clone();
5025                handles.push(std::thread::spawn(move || {
5026                    // The parent holds `bus_test_gate()` for the entire
5027                    // `with_bus_subscribers` scope. Child threads must set
5028                    // INSIDE_BUS_TEST=true themselves so that publish()
5029                    // bypasses the gate instead of blocking on it (which
5030                    // would deadlock once the parent calls join()).
5031                    INSIDE_BUS_TEST.with(|flag| flag.set(true));
5032                    b.wait();
5033                    let store = FileCardStore::new(pp);
5034                    create_with_store(
5035                        &store,
5036                        json!({
5037                            "card_id": format!("concur_card_{i}"),
5038                            "pkg": { "name": "concur_pkg" },
5039                        }),
5040                    )
5041                    .unwrap()
5042                    .0
5043                }));
5044            }
5045            let ids: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5046            assert_eq!(ids.len(), 4);
5047            for id in &ids {
5048                let p = sub.path().join("concur_pkg").join(format!("{id}.toml"));
5049                assert!(p.exists(), "subscriber must have card {id}");
5050            }
5051            let snap = bus.stats().snapshot();
5052            let row = snap
5053                .iter()
5054                .find(|r| r.sink == fs_sub.describe())
5055                .expect("row");
5056            let ok_total: u64 = row.ok.values().sum();
5057            assert_eq!(
5058                ok_total, 4,
5059                "subscriber must have recorded 4 successful deliveries"
5060            );
5061        });
5062    }
5063
5064    // ─── card_sink_backfill (Subtask 3) ────────────────────────
5065
5066    /// Primary-side fixture with N cards already written via the
5067    /// subscriber-free path so backfill has something to push.
5068    /// Returns (primary_dir_guard, store, card_ids).
5069    fn backfill_primary_with_cards(
5070        pkg: &str,
5071        count: usize,
5072    ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5073        let primary = tempfile::tempdir().unwrap();
5074        let store = FileCardStore::new(primary.path().to_path_buf());
5075        let mut ids = Vec::new();
5076        for i in 0..count {
5077            let (id, _) = create_with_store(
5078                &store,
5079                json!({
5080                    "card_id": format!("{pkg}_{i}"),
5081                    "pkg": { "name": pkg },
5082                }),
5083            )
5084            .unwrap();
5085            ids.push(id);
5086        }
5087        (primary, store, ids)
5088    }
5089
5090    #[test]
5091    fn backfill_pushes_missing_cards() {
5092        let sub_dir = tempfile::tempdir().unwrap();
5093        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5094        let uri = fs_sub.describe();
5095        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5096            // Populate primary before the subscriber is live (temporarily drop it).
5097            let bus = event_bus();
5098            bus.replace_subscribers_for_test(Vec::new());
5099            let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
5100            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5101
5102            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5103            assert_eq!(report.pushed.len(), 2);
5104            assert_eq!(report.skipped.len(), 0);
5105            assert!(report.failed.is_empty());
5106            for id in &ids {
5107                let p = sub_dir
5108                    .path()
5109                    .join("backfill_push_pkg")
5110                    .join(format!("{id}.toml"));
5111                assert!(p.exists(), "card {id} must exist on subscriber");
5112            }
5113        });
5114    }
5115
5116    #[test]
5117    fn backfill_skips_existing_on_subscriber() {
5118        let sub_dir = tempfile::tempdir().unwrap();
5119        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5120        let uri = fs_sub.describe();
5121        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5122            // Subscriber is live during create, so it already has the card.
5123            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);
5124            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5125            assert_eq!(report.pushed.len(), 0);
5126            assert_eq!(report.skipped.len(), 3);
5127            assert!(report.failed.is_empty());
5128        });
5129    }
5130
5131    #[test]
5132    fn backfill_dry_run_no_writes() {
5133        let sub_dir = tempfile::tempdir().unwrap();
5134        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5135        let uri = fs_sub.describe();
5136        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5137            let bus = event_bus();
5138            bus.replace_subscribers_for_test(Vec::new());
5139            let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
5140            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5141
5142            let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
5143            assert_eq!(
5144                report.pushed.len(),
5145                2,
5146                "pushed must list ids even in dry run"
5147            );
5148            for id in &ids {
5149                let p = sub_dir
5150                    .path()
5151                    .join("backfill_dry_pkg")
5152                    .join(format!("{id}.toml"));
5153                assert!(!p.exists(), "dry run must NOT write card {id}");
5154            }
5155            // Stats must remain zero — dry run publishes nothing.
5156            let snap = bus.stats().snapshot();
5157            if let Some(row) = snap.iter().find(|r| r.sink == uri) {
5158                let total: u64 = row.ok.values().sum::<u64>() + row.err.values().sum::<u64>();
5159                assert_eq!(total, 0, "dry run must not touch stats");
5160            }
5161        });
5162    }
5163
5164    #[test]
5165    fn backfill_drifted_card_skipped_not_overwritten() {
5166        let sub_dir = tempfile::tempdir().unwrap();
5167        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5168        let uri = fs_sub.describe();
5169        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5170            let bus = event_bus();
5171            bus.replace_subscribers_for_test(Vec::new());
5172            let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
5173            let id = &ids[0];
5174
5175            // Manually place a drifted copy on the subscriber with sentinel text.
5176            let sub_card_dir = sub_dir.path().join("backfill_drift_pkg");
5177            fs::create_dir_all(&sub_card_dir).unwrap();
5178            let sub_card = sub_card_dir.join(format!("{id}.toml"));
5179            fs::write(&sub_card, "drifted=true\n").unwrap();
5180
5181            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5182            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5183            assert_eq!(report.skipped, vec![id.clone()]);
5184            assert!(report.pushed.is_empty());
5185            let after = fs::read_to_string(&sub_card).unwrap();
5186            assert_eq!(after, "drifted=true\n", "drifted copy must be preserved");
5187        });
5188    }
5189
5190    #[test]
5191    fn backfill_includes_samples() {
5192        let sub_dir = tempfile::tempdir().unwrap();
5193        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5194        let uri = fs_sub.describe();
5195        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5196            let bus = event_bus();
5197            bus.replace_subscribers_for_test(Vec::new());
5198            let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
5199            let id = &ids[0];
5200            write_samples_with_store(&store, id, vec![json!({ "case": "c0" })]).unwrap();
5201            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5202
5203            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5204            assert_eq!(report.pushed, vec![id.clone()]);
5205            assert_eq!(report.pushed_samples, vec![id.clone()]);
5206            let sub_samples = sub_dir
5207                .path()
5208                .join("backfill_samples_pkg")
5209                .join(format!("{id}.samples.jsonl"));
5210            assert!(sub_samples.exists());
5211            assert!(fs::read_to_string(&sub_samples).unwrap().contains("c0"));
5212        });
5213    }
5214
5215    #[test]
5216    fn backfill_unknown_sink_err() {
5217        with_bus_subscribers(Vec::new(), |_bus| {
5218            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);
5219            let err = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false)
5220                .unwrap_err();
5221            assert!(
5222                err.starts_with("unknown sink"),
5223                "must reject unregistered sink; got: {err}"
5224            );
5225        });
5226    }
5227
5228    #[test]
5229    fn backfill_bypasses_bus_fanout() {
5230        // Subscriber A is already in-sync; Subscriber B is the backfill target.
5231        // Backfilling B must NOT re-deliver Created events to A.
5232        let sub_a_dir = tempfile::tempdir().unwrap();
5233        let sub_b_dir = tempfile::tempdir().unwrap();
5234        let fa = Arc::new(FileCardSubscriber::new(sub_a_dir.path().to_path_buf()));
5235        let fb = Arc::new(FileCardSubscriber::new(sub_b_dir.path().to_path_buf()));
5236        let uri_b = fb.describe();
5237        with_bus_subscribers(
5238            vec![
5239                fa.clone() as Arc<dyn CardSubscriber>,
5240                fb.clone() as Arc<dyn CardSubscriber>,
5241            ],
5242            |bus| {
5243                // Populate primary with subscriber A live (B temporarily absent).
5244                bus.replace_subscribers_for_test(vec![fa.clone()]);
5245                let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
5246                // Capture A's ok[created] count before backfill.
5247                let before = bus
5248                    .stats()
5249                    .snapshot()
5250                    .into_iter()
5251                    .find(|r| r.sink == fa.describe())
5252                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5253                    .unwrap_or(0);
5254                // Now reinstall both subscribers and backfill only B.
5255                bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
5256                card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
5257                let after = bus
5258                    .stats()
5259                    .snapshot()
5260                    .into_iter()
5261                    .find(|r| r.sink == fa.describe())
5262                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5263                    .unwrap_or(0);
5264                assert_eq!(
5265                    before, after,
5266                    "backfill target B must not cause fan-out to subscriber A"
5267                );
5268            },
5269        );
5270    }
5271
5272    #[test]
5273    fn backfill_updates_subscriber_stats() {
5274        let sub_dir = tempfile::tempdir().unwrap();
5275        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5276        let uri = fs_sub.describe();
5277        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5278            bus.replace_subscribers_for_test(Vec::new());
5279            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
5280            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5281
5282            card_sink_backfill_with_store(&store, &uri, false).unwrap();
5283            let snap = bus.stats().snapshot();
5284            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5285            assert_eq!(
5286                row.ok.get("created").copied().unwrap_or(0),
5287                2,
5288                "backfill must increment ok[created] on the target sink"
5289            );
5290        });
5291    }
5292
5293    #[test]
5294    fn backfill_failure_records_err_stat() {
5295        // Subscriber whose on_event always fails (no filesystem needed).
5296        struct FailingSub {
5297            uri: String,
5298        }
5299        impl CardSubscriber for FailingSub {
5300            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
5301                Err("synthetic backfill failure".into())
5302            }
5303            fn has_card(&self, _card_id: &str) -> Result<bool, String> {
5304                Ok(false)
5305            }
5306            fn describe(&self) -> String {
5307                self.uri.clone()
5308            }
5309        }
5310        let uri = "mock://backfill-fail".to_string();
5311        let failing: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5312        with_bus_subscribers(vec![failing], |bus| {
5313            bus.replace_subscribers_for_test(Vec::new());
5314            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);
5315            // Reinstall the failing subscriber for the backfill phase.
5316            let reinstall: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5317            bus.replace_subscribers_for_test(vec![reinstall]);
5318
5319            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5320            assert_eq!(
5321                report.failed.len(),
5322                1,
5323                "failed must record the synthetic err"
5324            );
5325            assert!(report.pushed.is_empty());
5326            let snap = bus.stats().snapshot();
5327            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5328            assert!(
5329                row.err.get("created").copied().unwrap_or(0) >= 1,
5330                "failing publish must increment err[created]"
5331            );
5332            assert!(row.last_error.is_some());
5333        });
5334    }
5335
5336    #[test]
5337    fn test_oncelock_set_after_init_returns_err() {
5338        // Force init (no-op if already initialized by a prior test).
5339        let _ = event_bus();
5340        let result = install_event_bus_for_test(CardEventBus::new(Vec::new()));
5341        assert!(
5342            result.is_err(),
5343            "install after init must return Err per OnceLock contract"
5344        );
5345        assert_eq!(result.unwrap_err(), "bus already initialized");
5346    }
5347}