Skip to main content

algocline_engine/
card.rs

1//! Card storage — immutable run-result snapshots.
2//!
3//! A Card is a frozen record of a strategy run: identity, parameters,
4//! model, scenario, aggregate stats, and (optionally) per-case detail.
5//! Cards are **immutable** — once written they are never modified, only
6//! annotated via additive `append`.  Mutable **aliases** point to a
7//! Card and can be rebound freely.
8//!
9//! ## Design principles
10//!
11//! 1. **Minimal REQUIRED, maximal OPTIONAL** — v0 needs only 4 fields;
12//!    lightweight "ran this pkg" records and heavy optimize snapshots
13//!    share the same schema.
14//! 2. **Immutable append-only** — no overwrite, no delete.  New data is
15//!    added via `append` (new top-level keys only) or by creating a new
16//!    Card with a fresh `card_id`.
17//! 3. **Two-tier storage** — TOML for human-readable aggregate, JSONL
18//!    sidecar for machine-parseable per-case detail.
19//! 4. **File-primary** — files are the source of truth; in-memory state
20//!    is cache.  Cards can be copied, diffed, and version-controlled.
21//!
22//! ## Storage layout (two-tier)
23//!
24//! | Tier | File | Content |
25//! |------|------|---------|
26//! | **Tier 1** | `~/.algocline/cards/{pkg}/{card_id}.toml` | Aggregate scalars, decisions, identity, params |
27//! | **Tier 2** | `~/.algocline/cards/{pkg}/{card_id}.samples.jsonl` | Per-case raw data (JSONL, write-once) |
28//!
29//! Tier 1 holds a shareable summary (a few KB). Tier 2 holds per-case
30//! detail ��� the engine does not interpret its columns; packages define
31//! their own schema.
32//!
33//! Alias table: `~/.algocline/cards/_aliases.toml` (global).
34//!
35//! ## card_id naming
36//!
37//! `{pkg}_{model_short}_{compact_ts}_{hash6}`
38//!
39//! - `compact_ts`: `YYYYMMDDTHHMMSS` in UTC
40//! - `hash6`: first 6 hex chars of DJB2 param fingerprint
41//! - Example: `cot_opus46_20260412T061500_a3f9c1`
42//!
43//! ## v0 schema (frozen)
44//!
45//! ### REQUIRED (minimum valid Card)
46//!
47//! | Field | Type | Example |
48//! |-------|------|---------|
49//! | `schema_version` | string | `"card/v0"` |
50//! | `card_id` | string | `"cot_opus46_20260412T061500_a3f9c1"` |
51//! | `created_at` | string (RFC 3339) | `"2026-04-12T06:15:00Z"` |
52//! | `[pkg].name` | string | `"cot"` |
53//!
54//! ### OPTIONAL (auto-injected where possible)
55//!
56//! | Section | Fields |
57//! |---------|--------|
58//! | `[pkg]` | `version`, `category`, `source`, `source_ref`, `source_sha` |
59//! | `[runtime]` | `alc_version`, `lua_version`, `host_os`, `git_sha` |
60//! | `[model]` | `provider`, `id`, `id_short`, `cutoff` |
61//! | `[params]` | Free-form ctx snapshot; `param_fingerprint` for DJB2 hash |
62//! | `[strategy_params]` | Strategy-tunable parameters surfaced for sweeps / optimizers (e.g. `alpha`, `temperature`, `depth`). Free-form, but `where`-queryable as a first-class section |
63//! | `[scenario]` | `name`, `source`, `case_count`, `grader` |
64//! | `[stats]` | `pass_rate`, `mean_score`, `std`, `median`, `min`, `max`, `n` |
65//! | `[stats.by_bucket]` | Disaggregated sub-bucket stats (array of tables) |
66//! | `[cost]` | `llm_calls`, `input_tokens`, `output_tokens`, `elapsed_ms`, `usd_estimate` |
67//! | `[optimize]` | `target`, `search`, `rounds_used`, `top_k` (for optimize Cards) |
68//! | `[metadata]` | Free-form escape hatch. Recognized lineage conventions: `prior_card_id` (parent Card id), `prior_relation` (relation kind, e.g. `"sweep_variant"`, `"reflection_of"`, `"derived_from"`) |
69//!
70//! ## Lua API (`alc.card.*`)
71//!
72//! | Function | Description |
73//! |----------|-------------|
74//! | `create(table)` | Write new Card (Tier 1). Returns `{ card_id, path }` |
75//! | `get(card_id)` | Read Card by id. Returns table or nil |
76//! | `list(filter?)` | List Cards as summaries (newest first) |
77//! | `find(query?)` | Prisma-style `where` DSL + dotted-path `order_by` + `offset`/`limit` |
78//! | `append(card_id, fields)` | Additive-only annotation (new keys only) |
79//! | `alias_set(name, card_id, opts?)` | Pin mutable alias |
80//! | `alias_list(filter?)` | List aliases |
81//! | `get_by_alias(name)` | Resolve alias → full Card |
82//! | `write_samples(card_id, samples)` | Write Tier 2 sidecar (write-once) |
83//! | `read_samples(card_id, opts?)` | Read Tier 2 with `where` filtering + offset/limit paging |
84//! | `lineage(query)` | Walk ancestry/descendants via `metadata.prior_card_id` |
85
86use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
98// ═══════════════════════════════════════════════════════════════
99// CardStore trait — physical I/O abstraction.
100// ═══════════════════════════════════════════════════════════════
101//
102// Card domain logic (schema / Query DSL / Lineage) is backend-
103// neutral. Only the physical read/write layer is swappable.
104//
105// The default backend is `FileCardStore`, which preserves the
106// legacy `~/.algocline/cards/{pkg}/{card_id}.toml` layout
107// byte-for-byte. Alternative backends (PathCardStore, SqliteCardStore,
108// MemoryCardStore) can be added by implementing this trait.
109//
110// Locators are `PathBuf` values. For FileCardStore they are real
111// filesystem paths; for non-file backends they are synthetic paths
112// (e.g. `sqlite:///db.sqlite#card/{id}`) — the value is opaque to
113// the domain layer and only exposed via the Lua `alc.card.create`
114// / `alc.card.write_samples` return values.
115
116/// Storage backend for Cards.
117///
118/// Implementations must be `Send + Sync` so that they can be shared
119/// across Lua host threads safely. All methods may fail with an
120/// error `String` describing the backend-specific failure.
121pub trait CardStore: Send + Sync {
122    // ─── Card CRUD ─────────────────────────────────────────────
123
124    /// Write a new Card (Tier 1 TOML).
125    ///
126    /// The caller has already:
127    ///   - validated `pkg` and `card_id` via [`validate_name`]
128    ///   - serialized `toml_text` with `toml::to_string_pretty`
129    ///
130    /// Fails if a Card with the same id already exists
131    /// (immutability).  Returns the locator of the written Card.
132    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
133
134    /// Overwrite an existing Card (append flow).
135    ///
136    /// Append is additive-only w.r.t. keys, but the underlying
137    /// TOML file is rewritten in place; callers must have validated
138    /// the additive-only constraint before calling this.
139    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
140
141    /// Locate a Card file by id. Returns `None` if not found.
142    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;
143
144    /// Read a Card's raw TOML text by id. Returns `None` if missing.
145    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;
146
147    /// List `(pkg, locator)` pairs for every Card file in the store.
148    ///
149    /// When `pkg_filter` is `Some(name)`, restrict to that pkg
150    /// subdir. Non-existent pkg subdir yields an empty Vec.
151    ///
152    /// Order is implementation-defined — callers sort explicitly.
153    fn list_card_locators(
154        &self,
155        pkg_filter: Option<&str>,
156    ) -> Result<Vec<(String, PathBuf)>, String>;
157
158    /// Read raw TOML text from a locator returned by
159    /// [`Self::list_card_locators`]. `Ok(None)` on read failure so
160    /// scans can skip corrupt files without aborting.
161    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;
162
163    // ─── Alias table ───────────────────────────────────────────
164
165    fn read_aliases(&self) -> Result<Vec<Alias>, String>;
166    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;
167
168    // ─── Samples sidecar ───────────────────────────────────────
169
170    /// Check whether a samples sidecar exists for `card_id`.
171    fn samples_exists(&self, card_id: &str) -> Result<bool, String>;
172
173    /// Write a samples sidecar (write-once).
174    ///
175    /// `jsonl_text` is the complete JSONL payload (one JSON line
176    /// per sample, `\n`-terminated). Fails if a sidecar already
177    /// exists. Returns the locator.
178    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;
179
180    /// Read a samples sidecar as raw JSONL text. Returns `None`
181    /// when no sidecar exists (samples are optional).
182    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;
183
184    // ─── Import ────────────────────────────────────────────────
185
186    /// Import Card files from `source_dir` into the store under
187    /// `pkg`. First-writer wins (existing Cards are skipped).
188    /// Returns `(imported, skipped)` card_id lists.
189    fn import_from_dir(
190        &self,
191        source_dir: &Path,
192        pkg: &str,
193    ) -> Result<(Vec<String>, Vec<String>), String>;
194}
195
196fn validate_name(name: &str, kind: &str) -> Result<(), String> {
197    if name.is_empty()
198        || name.contains('/')
199        || name.contains('\\')
200        || name.contains("..")
201        || name.contains('\0')
202    {
203        return Err(format!("Invalid {kind} name: '{name}'"));
204    }
205    Ok(())
206}
207
208/// DJB2 hash, hex-encoded. Used for param_fingerprint and card_id hash segment.
209fn djb2_hex(s: &str) -> String {
210    let mut h: u64 = 5381;
211    for b in s.bytes() {
212        h = h.wrapping_mul(33).wrapping_add(b as u64);
213    }
214    format!("{h:016x}")
215}
216
217/// Short-hash: last 6 hex chars of djb2.
218///
219/// DJB2's high bits are dominated by the `5381 * 33^n` term (same for any
220/// input of equal length), so the top hex digits collide easily for same-
221/// length inputs that differ only in a few byte positions. The low bits,
222/// driven by the most-recent bytes, mix well enough for short-hash use.
223fn hash6(s: &str) -> String {
224    let hex = djb2_hex(s);
225    let start = hex.len().saturating_sub(6);
226    hex[start..].to_string()
227}
228
229/// Stable serialization of a JSON value for hashing (sorted keys).
230fn stable_json(v: &Json) -> String {
231    let mut buf = String::new();
232    stable_json_into(v, &mut buf);
233    buf
234}
235fn stable_json_into(v: &Json, buf: &mut String) {
236    match v {
237        Json::Null => buf.push_str("null"),
238        Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
239        Json::Number(n) => buf.push_str(&n.to_string()),
240        Json::String(s) => {
241            buf.push('"');
242            buf.push_str(s);
243            buf.push('"');
244        }
245        Json::Array(a) => {
246            buf.push('[');
247            for (i, item) in a.iter().enumerate() {
248                if i > 0 {
249                    buf.push(',');
250                }
251                stable_json_into(item, buf);
252            }
253            buf.push(']');
254        }
255        Json::Object(m) => {
256            let mut keys: Vec<&String> = m.keys().collect();
257            keys.sort();
258            buf.push('{');
259            for (i, k) in keys.iter().enumerate() {
260                if i > 0 {
261                    buf.push(',');
262                }
263                buf.push('"');
264                buf.push_str(k);
265                buf.push_str("\":");
266                stable_json_into(&m[*k], buf);
267            }
268            buf.push('}');
269        }
270    }
271}
272
273/// Derive a short model id (e.g. "claude-opus-4-6" -> "opus46").
274/// v0: best-effort. Falls back to "model" if input is empty.
275fn short_model(id: &str) -> String {
276    if id.is_empty() {
277        return "model".into();
278    }
279    // Strip common vendor prefixes.
280    let stripped = id
281        .strip_prefix("claude-")
282        .or_else(|| id.strip_prefix("gpt-"))
283        .unwrap_or(id);
284    // Keep alnum only.
285    let s: String = stripped
286        .chars()
287        .filter(|c| c.is_ascii_alphanumeric())
288        .collect();
289    if s.is_empty() {
290        "model".into()
291    } else {
292        s
293    }
294}
295
296/// RFC3339 UTC "YYYY-MM-DDTHH:MM:SSZ" from current system time.
297fn now_rfc3339() -> String {
298    let secs = std::time::SystemTime::now()
299        .duration_since(std::time::UNIX_EPOCH)
300        .map(|d| d.as_secs())
301        .unwrap_or(0) as i64;
302    let days = secs.div_euclid(86400);
303    let tod = secs.rem_euclid(86400);
304    let (y, mo, d) = civil_from_days(days);
305    let hh = tod / 3600;
306    let mm = (tod % 3600) / 60;
307    let ss = tod % 60;
308    format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
309}
310
311/// YYYYMMDDTHHMMSS for current UTC time (compact, sortable).
312///
313/// Used in card_id generation so that:
314///   - rapid successive runs don't collide on the hash6 segment
315///   - string sort of card_id = chronological order
316fn now_compact() -> String {
317    let secs = std::time::SystemTime::now()
318        .duration_since(std::time::UNIX_EPOCH)
319        .map(|d| d.as_secs())
320        .unwrap_or(0) as i64;
321    let days = secs.div_euclid(86400);
322    let tod = secs.rem_euclid(86400);
323    let (y, mo, d) = civil_from_days(days);
324    let hh = tod / 3600;
325    let mm = (tod % 3600) / 60;
326    let ss = tod % 60;
327    format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
328}
329
330/// Howard Hinnant's civil_from_days algorithm.
331fn civil_from_days(z: i64) -> (i32, u32, u32) {
332    let z = z + 719468;
333    let era = if z >= 0 { z } else { z - 146096 } / 146097;
334    let doe = (z - era * 146097) as u64;
335    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
336    let y = yoe as i64 + era * 400;
337    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
338    let mp = (5 * doy + 2) / 153;
339    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
340    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
341    let y = y + if m <= 2 { 1 } else { 0 };
342    (y as i32, m, d)
343}
344
345/// Converter: serde_json::Value -> toml::Value.
346/// Nulls are dropped (TOML has no null). Mixed-type arrays are allowed in TOML 1.0+.
347fn json_to_toml(v: Json) -> Result<toml::Value, String> {
348    Ok(match v {
349        Json::Null => return Err("TOML does not support null values".into()),
350        Json::Bool(b) => toml::Value::Boolean(b),
351        Json::Number(n) => {
352            if let Some(i) = n.as_i64() {
353                toml::Value::Integer(i)
354            } else if let Some(f) = n.as_f64() {
355                toml::Value::Float(f)
356            } else {
357                return Err(format!("Unsupported number: {n}"));
358            }
359        }
360        Json::String(s) => toml::Value::String(s),
361        Json::Array(a) => {
362            let mut out = Vec::with_capacity(a.len());
363            for item in a {
364                if !item.is_null() {
365                    out.push(json_to_toml(item)?);
366                }
367            }
368            toml::Value::Array(out)
369        }
370        Json::Object(m) => {
371            let mut table = toml::map::Map::new();
372            for (k, val) in m {
373                if val.is_null() {
374                    continue;
375                }
376                table.insert(k, json_to_toml(val)?);
377            }
378            toml::Value::Table(table)
379        }
380    })
381}
382
383/// Converter: toml::Value -> serde_json::Value (for alc.card.get()).
384fn toml_to_json(v: toml::Value) -> Json {
385    match v {
386        toml::Value::String(s) => Json::String(s),
387        toml::Value::Integer(i) => json!(i),
388        toml::Value::Float(f) => json!(f),
389        toml::Value::Boolean(b) => Json::Bool(b),
390        toml::Value::Datetime(dt) => Json::String(dt.to_string()),
391        toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
392        toml::Value::Table(t) => {
393            let mut m = serde_json::Map::new();
394            for (k, v) in t {
395                m.insert(k, toml_to_json(v));
396            }
397            Json::Object(m)
398        }
399    }
400}
401
402/// Extract [pkg].name from an input JSON object. REQUIRED.
403fn require_pkg_name(input: &Json) -> Result<String, String> {
404    let name = input
405        .get("pkg")
406        .and_then(|p| p.get("name"))
407        .and_then(|n| n.as_str())
408        .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
409        .to_string();
410    validate_name(&name, "pkg")?;
411    Ok(name)
412}
413
414/// Create a new Card backed by `store`.
415pub fn create_with_store(
416    store: &dyn CardStore,
417    mut input: Json,
418) -> Result<(String, PathBuf), String> {
419    if !input.is_object() {
420        return Err("alc.card.create: input must be a table".into());
421    }
422    let pkg_name = require_pkg_name(&input)?;
423    let obj = input.as_object_mut().unwrap();
424
425    // ─── Auto-inject REQUIRED fields ──────────────────────────
426    obj.entry("schema_version".to_string())
427        .or_insert_with(|| json!(SCHEMA_VERSION));
428    obj.entry("created_at".to_string())
429        .or_insert_with(|| json!(now_rfc3339()));
430    obj.entry("created_by".to_string())
431        .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
432
433    // ─── param_fingerprint (if [params] present) ──────────────
434    if let Some(params) = obj.get("params").cloned() {
435        if params.is_object() {
436            let fp = djb2_hex(&stable_json(&params));
437            obj.insert("param_fingerprint".to_string(), json!(fp));
438        }
439    }
440
441    // ─── card_id generation (if absent) ───────────────────────
442    let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
443        Some(id) if !id.is_empty() => id.to_string(),
444        _ => {
445            let model_id = obj
446                .get("model")
447                .and_then(|m| m.get("id"))
448                .and_then(|v| v.as_str())
449                .unwrap_or("");
450            let model_short = short_model(model_id);
451            let ts = now_compact();
452            let fp_seed = stable_json(&Json::Object(obj.clone()));
453            let h = hash6(&fp_seed);
454            format!("{pkg_name}_{model_short}_{ts}_{h}")
455        }
456    };
457    validate_name(&card_id, "card_id")?;
458    obj.insert("card_id".to_string(), json!(card_id.clone()));
459
460    let toml_val = json_to_toml(input)?;
461    let text = toml::to_string_pretty(&toml_val)
462        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
463    let path = store.write_new_card(&pkg_name, &card_id, &text)?;
464
465    publish(CardEvent::Created {
466        pkg: pkg_name.clone(),
467        card_id: card_id.clone(),
468        toml_text: text,
469    });
470
471    Ok((card_id, path))
472}
473
474/// Read a Card from `store` by id. Returns None if not found.
475pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
476    let text = match store.read_card_text(card_id)? {
477        Some(t) => t,
478        None => return Ok(None),
479    };
480    let val: toml::Value =
481        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
482    Ok(Some(toml_to_json(val)))
483}
484
485/// Summary row for `alc.card.list()`.
486#[derive(Debug, Clone)]
487pub struct Summary {
488    pub card_id: String,
489    pub pkg: String,
490    pub created_at: Option<String>,
491    pub model: Option<String>,
492    pub scenario: Option<String>,
493    pub pass_rate: Option<f64>,
494}
495
496impl Summary {
497    fn to_json(&self) -> Json {
498        let mut m = serde_json::Map::new();
499        m.insert("card_id".into(), json!(self.card_id));
500        m.insert("pkg".into(), json!(self.pkg));
501        if let Some(v) = &self.created_at {
502            m.insert("created_at".into(), json!(v));
503        }
504        if let Some(v) = &self.model {
505            m.insert("model".into(), json!(v));
506        }
507        if let Some(v) = &self.scenario {
508            m.insert("scenario".into(), json!(v));
509        }
510        if let Some(v) = self.pass_rate {
511            m.insert("pass_rate".into(), json!(v));
512        }
513        Json::Object(m)
514    }
515}
516
517fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
518    let text = store.read_locator_text(locator).ok().flatten()?;
519    let val: toml::Value = toml::from_str(&text).ok()?;
520    let card_id = val
521        .get("card_id")
522        .and_then(|v| v.as_str())
523        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
524        .to_string();
525    let created_at = val
526        .get("created_at")
527        .and_then(|v| v.as_str())
528        .map(String::from);
529    let model = val
530        .get("model")
531        .and_then(|m| m.get("id"))
532        .and_then(|v| v.as_str())
533        .map(String::from);
534    let scenario = val
535        .get("scenario")
536        .and_then(|s| s.get("name"))
537        .and_then(|v| v.as_str())
538        .map(String::from);
539    let pass_rate = val
540        .get("stats")
541        .and_then(|s| s.get("pass_rate"))
542        .and_then(|v| v.as_float());
543    Some(Summary {
544        card_id,
545        pkg: pkg.to_string(),
546        created_at,
547        model,
548        scenario,
549        pass_rate,
550    })
551}
552
553/// List cards from `store`. `pkg_filter = Some("name")` restricts to that pkg subdir.
554pub fn list_with_store(
555    store: &dyn CardStore,
556    pkg_filter: Option<&str>,
557) -> Result<Vec<Summary>, String> {
558    let locators = store.list_card_locators(pkg_filter)?;
559    let mut out = Vec::with_capacity(locators.len());
560    for (pkg, loc) in &locators {
561        if let Some(s) = summarize(store, loc, pkg) {
562            out.push(s);
563        }
564    }
565
566    // Sort newest first. card_id embeds a compact UTC timestamp so it's
567    // naturally chronological; we still prefer created_at when present
568    // (some callers may override it), falling back to card_id.
569    out.sort_by(|a, b| {
570        b.created_at
571            .cmp(&a.created_at)
572            .then_with(|| b.card_id.cmp(&a.card_id))
573    });
574    Ok(out)
575}
576
577pub fn summaries_to_json(rows: &[Summary]) -> Json {
578    Json::Array(rows.iter().map(|s| s.to_json()).collect())
579}
580
581// ───────────────────────────────────────────────────────────────
582// P1 API: append / alias_{set,list} / find
583// ───────────────────────────────────────────────────────────────
584
585/// Append new top-level fields to an existing Card.
586///
587/// Semantics: **additive only**. If any top-level key in `fields` already
588/// exists in the Card, the call fails — Cards are immutable w.r.t. existing
589/// data. New top-level keys are inserted and the Card file is rewritten
590/// atomically.
591///
592/// Returns the merged Card JSON.
593pub fn append_with_store(
594    store: &dyn CardStore,
595    card_id: &str,
596    fields: Json,
597) -> Result<Json, String> {
598    let text = store
599        .read_card_text(card_id)?
600        .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
601    let fields_obj = match fields {
602        Json::Object(m) => m,
603        _ => return Err("alc.card.append: fields must be a table".into()),
604    };
605
606    let existing: toml::Value =
607        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
608    let mut existing_json = toml_to_json(existing);
609    let existing_obj = existing_json
610        .as_object_mut()
611        .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
612
613    for (k, v) in fields_obj {
614        if existing_obj.contains_key(&k) {
615            return Err(format!(
616                "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
617            ));
618        }
619        if !v.is_null() {
620            existing_obj.insert(k, v);
621        }
622    }
623
624    let toml_val = json_to_toml(existing_json.clone())?;
625    let text = toml::to_string_pretty(&toml_val)
626        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
627    store.overwrite_card(card_id, &text)?;
628
629    publish(CardEvent::Appended {
630        card_id: card_id.to_string(),
631        toml_text: text,
632    });
633
634    Ok(existing_json)
635}
636
637#[derive(Debug, Clone)]
638pub struct Alias {
639    pub name: String,
640    pub card_id: String,
641    pub pkg: Option<String>,
642    pub set_at: String,
643    pub note: Option<String>,
644}
645
646impl Alias {
647    fn to_json(&self) -> Json {
648        let mut m = serde_json::Map::new();
649        m.insert("name".into(), json!(self.name));
650        m.insert("card_id".into(), json!(self.card_id));
651        if let Some(p) = &self.pkg {
652            m.insert("pkg".into(), json!(p));
653        }
654        m.insert("set_at".into(), json!(self.set_at));
655        if let Some(n) = &self.note {
656            m.insert("note".into(), json!(n));
657        }
658        Json::Object(m)
659    }
660}
661
662/// Bind (or rebind) an alias to a Card in `store`.
663///
664/// Validates that `card_id` exists. If an alias with the same `name` already
665/// exists it is overwritten — the alias table is intentionally mutable even
666/// though the Cards themselves are not.
667pub fn alias_set_with_store(
668    store: &dyn CardStore,
669    name: &str,
670    card_id: &str,
671    pkg: Option<&str>,
672    note: Option<&str>,
673) -> Result<Alias, String> {
674    validate_name(name, "alias")?;
675    if store.find_card_locator(card_id)?.is_none() {
676        return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
677    }
678    let mut aliases = store.read_aliases()?;
679    aliases.retain(|a| a.name != name);
680    let entry = Alias {
681        name: name.to_string(),
682        card_id: card_id.to_string(),
683        pkg: pkg.map(String::from),
684        set_at: now_rfc3339(),
685        note: note.map(String::from),
686    };
687    aliases.push(entry.clone());
688    store.write_aliases(&aliases)?;
689
690    // Mirror the full alias table to subscribers as TOML text. The
691    // primary FileCardStore already serialized it internally; we
692    // re-serialize here using the same shape so subscribers stay in
693    // byte-for-byte parity. A serialization failure is best-effort
694    // (log + skip).
695    match serialize_aliases_toml(&aliases) {
696        Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
697        Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
698    }
699
700    Ok(entry)
701}
702
703/// Serialize `aliases` to a TOML document matching the primary
704/// `_aliases.toml` layout. Broken out so that subscribers can receive
705/// the exact byte-for-byte dump.
706fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
707    let mut arr = Vec::with_capacity(aliases.len());
708    for a in aliases {
709        let mut t = toml::map::Map::new();
710        t.insert("name".into(), toml::Value::String(a.name.clone()));
711        t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
712        if let Some(p) = &a.pkg {
713            t.insert("pkg".into(), toml::Value::String(p.clone()));
714        }
715        t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
716        if let Some(n) = &a.note {
717            t.insert("note".into(), toml::Value::String(n.clone()));
718        }
719        arr.push(toml::Value::Table(t));
720    }
721    let mut root = toml::map::Map::new();
722    root.insert("alias".into(), toml::Value::Array(arr));
723    toml::to_string_pretty(&toml::Value::Table(root))
724        .map_err(|e| format!("Failed to serialize aliases: {e}"))
725}
726
727/// Resolve an alias name to its bound Card and return the full Card JSON.
728///
729/// Shortcut for `alias_list → filter → get`. Returns `None` when the alias
730/// does not exist. Errors when the alias points at a missing Card — that
731/// would indicate a corrupt alias table (the target was deleted out of band).
732pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
733    validate_name(name, "alias")?;
734    let aliases = store.read_aliases()?;
735    let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
736        return Ok(None);
737    };
738    match get_with_store(store, &alias.card_id)? {
739        Some(card) => Ok(Some(card)),
740        None => Err(format!(
741            "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
742            alias.card_id
743        )),
744    }
745}
746
747/// List aliases from `store`, optionally filtered by pkg.
748pub fn alias_list_with_store(
749    store: &dyn CardStore,
750    pkg_filter: Option<&str>,
751) -> Result<Vec<Alias>, String> {
752    let mut aliases = store.read_aliases()?;
753    if let Some(p) = pkg_filter {
754        aliases.retain(|a| a.pkg.as_deref() == Some(p));
755    }
756    Ok(aliases)
757}
758
759pub fn aliases_to_json(rows: &[Alias]) -> Json {
760    Json::Array(rows.iter().map(|a| a.to_json()).collect())
761}
762
763// ═══════════════════════════════════════════════════════════════
764// Where DSL — Prisma/Mongo-style nested predicates
765// ═══════════════════════════════════════════════════════════════
766//
767// Syntax (JSON form, as received from Lua / MCP):
768//
769//   where = {
770//     pkg: "cot",                                      // implicit eq
771//     stats: { pass_rate: { gte: 0.8 }, n: { gte: 30 } },
772//     strategy_params: { temperature: { gte: 0.7 } },
773//     prior_card_id: { exists: true },
774//     _or: [ {...}, {...} ],                           // logical ops
775//     _not: { model: { id: "claude-haiku-4-5-20251001" } },
776//   }
777//
778// Semantics:
779//   * Multiple keys in the same object → implicit AND.
780//   * Nested object value → section (path extension).
781//   * Object whose every key is a reserved operator name → leaf operator
782//     object; applies the operators to the value at the current path.
783//   * Scalar/array value → implicit eq.
784//   * Reserved logical keys: `_and` / `_or` / `_not`.
785//   * Reserved operator keys: `eq ne lt lte gt gte in nin exists
786//     contains starts_with`.  Card schemas must not use these names as
787//     field names under any section.
788//
789// Missing-field comparison:
790//   * `eq/lt/lte/gt/gte/in/contains/starts_with` → false on missing
791//   * `ne/nin`                                   → true  on missing
792//   * `exists`                                   → explicit
793//
794
795/// Single comparison operator.
796#[derive(Debug, Clone, PartialEq)]
797pub enum CmpOp {
798    Eq,
799    Ne,
800    Lt,
801    Lte,
802    Gt,
803    Gte,
804    In,
805    Nin,
806    Exists,
807    Contains,
808    StartsWith,
809}
810
811impl CmpOp {
812    fn from_key(k: &str) -> Option<Self> {
813        Some(match k {
814            "eq" => Self::Eq,
815            "ne" => Self::Ne,
816            "lt" => Self::Lt,
817            "lte" => Self::Lte,
818            "gt" => Self::Gt,
819            "gte" => Self::Gte,
820            "in" => Self::In,
821            "nin" => Self::Nin,
822            "exists" => Self::Exists,
823            "contains" => Self::Contains,
824            "starts_with" => Self::StartsWith,
825            _ => return None,
826        })
827    }
828}
829
830/// One parsed comparison: `path` points at a nested field,
831/// `op` + `value` describe how to compare it.
832#[derive(Debug, Clone)]
833pub struct Comparison {
834    pub path: Vec<String>,
835    pub op: CmpOp,
836    pub value: Json,
837}
838
839/// Parsed predicate tree.
840#[derive(Debug, Clone)]
841pub enum Predicate {
842    And(Vec<Predicate>),
843    Or(Vec<Predicate>),
844    Not(Box<Predicate>),
845    Cmp(Comparison),
846}
847
848/// Is `obj` entirely composed of reserved operator keys?
849/// Empty objects return false (meaningless as an operator object).
850fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
851    if obj.is_empty() {
852        return false;
853    }
854    obj.keys().all(|k| CmpOp::from_key(k).is_some())
855}
856
857/// Parse a `where` JSON value into a `Predicate`.
858///
859/// `prefix` is the current nested-key path as we descend through
860/// section objects.
861pub fn parse_where(value: &Json) -> Result<Predicate, String> {
862    parse_predicate(value, &[])
863}
864
865fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
866    let obj = value
867        .as_object()
868        .ok_or_else(|| "where clause must be a table".to_string())?;
869
870    let mut clauses: Vec<Predicate> = Vec::new();
871
872    for (key, val) in obj {
873        match key.as_str() {
874            "_and" => {
875                let arr = val
876                    .as_array()
877                    .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
878                let mut subs = Vec::with_capacity(arr.len());
879                for sub in arr {
880                    subs.push(parse_predicate(sub, prefix)?);
881                }
882                clauses.push(Predicate::And(subs));
883            }
884            "_or" => {
885                let arr = val
886                    .as_array()
887                    .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
888                let mut subs = Vec::with_capacity(arr.len());
889                for sub in arr {
890                    subs.push(parse_predicate(sub, prefix)?);
891                }
892                clauses.push(Predicate::Or(subs));
893            }
894            "_not" => {
895                clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
896            }
897            _ => {
898                // Field key — extend the current path.
899                let mut new_path = prefix.to_vec();
900                new_path.push(key.clone());
901
902                match val {
903                    Json::Object(m) if is_operator_object(m) => {
904                        // Leaf: operator object at this path.
905                        for (op_key, op_val) in m {
906                            let op = CmpOp::from_key(op_key).expect("validated above");
907                            clauses.push(Predicate::Cmp(Comparison {
908                                path: new_path.clone(),
909                                op,
910                                value: op_val.clone(),
911                            }));
912                        }
913                    }
914                    Json::Object(_) => {
915                        // Nested section: recurse with extended path.
916                        clauses.push(parse_predicate(val, &new_path)?);
917                    }
918                    _ => {
919                        // Scalar/array: implicit eq.
920                        clauses.push(Predicate::Cmp(Comparison {
921                            path: new_path,
922                            op: CmpOp::Eq,
923                            value: val.clone(),
924                        }));
925                    }
926                }
927            }
928        }
929    }
930
931    if clauses.len() == 1 {
932        Ok(clauses.remove(0))
933    } else {
934        Ok(Predicate::And(clauses))
935    }
936}
937
938/// Fetch a nested value from a Card JSON by dotted path.
939fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
940    let mut node = card;
941    for key in path {
942        let obj = node.as_object()?;
943        node = obj.get(key)?;
944    }
945    Some(node)
946}
947
948/// Compare two JSON scalars with a numeric/string/bool comparator.
949/// Returns None when the types aren't comparable.
950fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
951    match (a, b) {
952        (Json::Number(x), Json::Number(y)) => {
953            let xf = x.as_f64()?;
954            let yf = y.as_f64()?;
955            xf.partial_cmp(&yf)
956        }
957        (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
958        (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
959        _ => None,
960    }
961}
962
963fn json_eq(a: &Json, b: &Json) -> bool {
964    match (a, b) {
965        (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
966            (Some(xf), Some(yf)) => xf == yf,
967            _ => a == b,
968        },
969        _ => a == b,
970    }
971}
972
973fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
974    let actual = fetch_path(card, &cmp.path);
975    let exists = actual.is_some();
976
977    match cmp.op {
978        CmpOp::Exists => {
979            let want = cmp.value.as_bool().unwrap_or(true);
980            exists == want
981        }
982        CmpOp::Ne => match actual {
983            None => true,
984            Some(v) => !json_eq(v, &cmp.value),
985        },
986        CmpOp::Nin => match actual {
987            None => true,
988            Some(v) => match cmp.value.as_array() {
989                Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
990                None => false,
991            },
992        },
993        CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
994        CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
995            Some(arr) => arr.iter().any(|e| json_eq(e, v)),
996            None => false,
997        }),
998        CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
999            let Some(v) = actual else { return false };
1000            let Some(ord) = json_cmp(v, &cmp.value) else {
1001                return false;
1002            };
1003            use std::cmp::Ordering::{Equal, Greater, Less};
1004            matches!(
1005                (&cmp.op, ord),
1006                (CmpOp::Lt, Less)
1007                    | (CmpOp::Lte, Less | Equal)
1008                    | (CmpOp::Gt, Greater)
1009                    | (CmpOp::Gte, Greater | Equal)
1010            )
1011        }
1012        CmpOp::Contains => {
1013            let Some(Json::String(haystack)) = actual else {
1014                return false;
1015            };
1016            let Some(needle) = cmp.value.as_str() else {
1017                return false;
1018            };
1019            haystack.contains(needle)
1020        }
1021        CmpOp::StartsWith => {
1022            let Some(Json::String(haystack)) = actual else {
1023                return false;
1024            };
1025            let Some(needle) = cmp.value.as_str() else {
1026                return false;
1027            };
1028            haystack.starts_with(needle)
1029        }
1030    }
1031}
1032
1033/// Evaluate a predicate tree against a full Card JSON.
1034pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1035    match pred {
1036        Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1037        Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1038        Predicate::Not(sub) => !eval_predicate(sub, card),
1039        Predicate::Cmp(c) => eval_cmp(c, card),
1040    }
1041}
1042
1043// ───────────────────────────────────────────────────────────────
1044// Order-by
1045// ───────────────────────────────────────────────────────────────
1046
1047/// Parsed sort key: path with optional descending flag.
1048#[derive(Debug, Clone)]
1049pub struct OrderKey {
1050    pub path: Vec<String>,
1051    pub desc: bool,
1052}
1053
1054impl OrderKey {
1055    fn parse(raw: &str) -> Result<Self, String> {
1056        if raw.is_empty() {
1057            return Err("order_by key must not be empty".into());
1058        }
1059        let (desc, rest) = if let Some(r) = raw.strip_prefix('-') {
1060            (true, r)
1061        } else {
1062            (false, raw)
1063        };
1064        let path: Vec<String> = rest.split('.').map(|s| s.to_string()).collect();
1065        if path.iter().any(|p| p.is_empty()) {
1066            return Err(format!("invalid order_by key: '{raw}'"));
1067        }
1068        Ok(Self { path, desc })
1069    }
1070}
1071
1072/// Parse an order_by JSON value.  Accepts:
1073///   - a string: `"stats.pass_rate"` or `"-stats.pass_rate"`
1074///   - an array of strings: `["-stats.pass_rate", "created_at"]`
1075pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1076    match value {
1077        Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1078        Json::Array(arr) => {
1079            let mut out = Vec::with_capacity(arr.len());
1080            for v in arr {
1081                let s = v
1082                    .as_str()
1083                    .ok_or_else(|| "order_by array must contain strings".to_string())?;
1084                out.push(OrderKey::parse(s)?);
1085            }
1086            Ok(out)
1087        }
1088        _ => Err("order_by must be a string or array of strings".into()),
1089    }
1090}
1091
1092/// Query parameters for `find`.
1093#[derive(Debug, Default, Clone)]
1094pub struct FindQuery {
1095    /// Restrict scan to a single pkg subdir (I/O optimization).
1096    pub pkg: Option<String>,
1097    /// Prisma-style predicate tree.
1098    pub where_: Option<Predicate>,
1099    /// Sort keys (dotted paths, optional `-` prefix for desc).
1100    pub order_by: Vec<OrderKey>,
1101    pub limit: Option<usize>,
1102    pub offset: Option<usize>,
1103}
1104
1105/// A loaded Card row flowing through the find() pipeline.
1106///
1107/// `full` is the whole Card JSON (used by `where` evaluation and
1108/// `order_by` dotted-path lookup); `summary` is the projection
1109/// returned to callers.
1110#[derive(Debug, Clone)]
1111struct CardRow {
1112    full: Json,
1113    summary: Summary,
1114}
1115
1116/// Load a single Card file into a `CardRow`.
1117fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1118    let text = store.read_locator_text(locator).ok().flatten()?;
1119    let val: toml::Value = toml::from_str(&text).ok()?;
1120    let json = toml_to_json(val);
1121
1122    let card_id = json
1123        .get("card_id")
1124        .and_then(|v| v.as_str())
1125        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1126        .to_string();
1127    let created_at = json
1128        .get("created_at")
1129        .and_then(|v| v.as_str())
1130        .map(String::from);
1131    let model = json
1132        .get("model")
1133        .and_then(|m| m.get("id"))
1134        .and_then(|v| v.as_str())
1135        .map(String::from);
1136    let scenario = json
1137        .get("scenario")
1138        .and_then(|s| s.get("name"))
1139        .and_then(|v| v.as_str())
1140        .map(String::from);
1141    let pass_rate = json
1142        .get("stats")
1143        .and_then(|s| s.get("pass_rate"))
1144        .and_then(|v| v.as_f64());
1145
1146    Some(CardRow {
1147        full: json,
1148        summary: Summary {
1149            card_id,
1150            pkg: pkg.to_string(),
1151            created_at,
1152            model,
1153            scenario,
1154            pass_rate,
1155        },
1156    })
1157}
1158
1159/// Compare two Card rows according to an ordered list of sort keys.
1160fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1161    use std::cmp::Ordering;
1162    for k in keys {
1163        let va = fetch_path(&a.full, &k.path);
1164        let vb = fetch_path(&b.full, &k.path);
1165        let ord = match (va, vb) {
1166            (None, None) => Ordering::Equal,
1167            (None, Some(_)) => Ordering::Greater, // missing sorts last
1168            (Some(_), None) => Ordering::Less,
1169            (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1170        };
1171        let ord = if k.desc { ord.reverse() } else { ord };
1172        if ord != Ordering::Equal {
1173            return ord;
1174        }
1175    }
1176    Ordering::Equal
1177}
1178
1179/// Summary-only fields that can be sorted without loading full TOML.
1180const SUMMARY_SORT_FIELDS: &[&str] = &[
1181    "card_id",
1182    "created_at",
1183    "stats.pass_rate",
1184    "scenario.name",
1185    "model.id",
1186];
1187
1188/// Return true when the query can be answered with lightweight Summary
1189/// rows (no full-TOML load needed).
1190fn is_lightweight_query(q: &FindQuery) -> bool {
1191    q.where_.is_none()
1192        && q.order_by
1193            .iter()
1194            .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1195}
1196
1197/// Sort Summary rows using order_by keys that map to Summary fields.
1198fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1199    use std::cmp::Ordering;
1200    for k in keys {
1201        let key_str = k.path.join(".");
1202        let ord = match key_str.as_str() {
1203            "card_id" => a.card_id.cmp(&b.card_id),
1204            "created_at" => a.created_at.cmp(&b.created_at),
1205            "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1206                (None, None) => Ordering::Equal,
1207                (None, Some(_)) => Ordering::Greater,
1208                (Some(_), None) => Ordering::Less,
1209                (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1210            },
1211            "scenario.name" => a.scenario.cmp(&b.scenario),
1212            "model.id" => a.model.cmp(&b.model),
1213            _ => Ordering::Equal,
1214        };
1215        let ord = if k.desc { ord.reverse() } else { ord };
1216        if ord != Ordering::Equal {
1217            return ord;
1218        }
1219    }
1220    Ordering::Equal
1221}
1222
1223/// Filter/sort Cards across the store using the `where` DSL.
1224///
1225/// When no `where` clause is specified and `order_by` only references
1226/// summary-level fields, uses the lightweight `list_with_store` path to
1227/// avoid loading full TOML.  Otherwise loads full TOML per Card.
1228pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1229    // Fast path: lightweight query, no full-TOML load needed.
1230    if is_lightweight_query(&q) {
1231        let mut rows = list_with_store(store, q.pkg.as_deref())?;
1232        if q.order_by.is_empty() {
1233            rows.sort_by(|a, b| {
1234                b.created_at
1235                    .cmp(&a.created_at)
1236                    .then_with(|| b.card_id.cmp(&a.card_id))
1237            });
1238        } else {
1239            rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1240        }
1241        let out: Vec<Summary> = rows
1242            .into_iter()
1243            .skip(q.offset.unwrap_or(0))
1244            .take(q.limit.unwrap_or(usize::MAX))
1245            .collect();
1246        return Ok(out);
1247    }
1248
1249    // Full path: load entire TOML for where evaluation / arbitrary order_by.
1250    let all_rows = scan_cards(store, q.pkg.as_deref())?;
1251
1252    // Filter by where.
1253    let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1254        all_rows
1255            .into_iter()
1256            .filter(|row| eval_predicate(pred, &row.full))
1257            .collect()
1258    } else {
1259        all_rows
1260    };
1261
1262    // Sort.
1263    if q.order_by.is_empty() {
1264        rows.sort_by(|a, b| {
1265            b.summary
1266                .created_at
1267                .cmp(&a.summary.created_at)
1268                .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1269        });
1270    } else {
1271        rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1272    }
1273
1274    // Offset + limit.
1275    let out: Vec<Summary> = rows
1276        .into_iter()
1277        .skip(q.offset.unwrap_or(0))
1278        .take(q.limit.unwrap_or(usize::MAX))
1279        .map(|r| r.summary)
1280        .collect();
1281
1282    Ok(out)
1283}
1284
1285// ───────────────────────────────────────────────────────────────
1286// Lineage walker
1287// ───────────────────────────────────────────────────────────────
1288//
1289// Cards form a tree (typically, not strictly a DAG) via the
1290// `metadata.prior_card_id` convention. `lineage()` walks that tree
1291// either up (toward ancestors) or down (toward descendants) or both,
1292// up to a configurable depth, optionally filtered by `prior_relation`.
1293//
1294// Up-walk is O(depth) — each step reads one parent Card.
1295// Down-walk is O(N_cards × depth) — we scan the whole store at each
1296// level. For the current scale (hundreds to low thousands of cards)
1297// this is fine; if the store grows we can build a prior_card_id index.
1298
1299/// Walk direction for `lineage`.
1300#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1301pub enum LineageDirection {
1302    #[default]
1303    Up,
1304    Down,
1305    Both,
1306}
1307
1308impl LineageDirection {
1309    pub fn parse(s: &str) -> Result<Self, String> {
1310        match s {
1311            "up" => Ok(Self::Up),
1312            "down" => Ok(Self::Down),
1313            "both" => Ok(Self::Both),
1314            other => Err(format!(
1315                "direction must be 'up', 'down', or 'both' (got '{other}')"
1316            )),
1317        }
1318    }
1319}
1320
1321/// Query parameters for `lineage`.
1322#[derive(Debug, Clone, Default)]
1323pub struct LineageQuery {
1324    pub card_id: String,
1325    pub direction: LineageDirection,
1326    /// Max traversal depth. Default 10.
1327    pub depth: Option<usize>,
1328    /// Include a per-node `stats` field (full [stats] section).
1329    pub include_stats: bool,
1330    /// If set, only edges whose `prior_relation` is in this list are
1331    /// followed.  The root is always included regardless.
1332    pub relation_filter: Option<Vec<String>>,
1333}
1334
1335/// One node in the lineage result.
1336///
1337/// `depth` is the signed distance from the root: negative for
1338/// ancestors (up-walk), 0 for the root, positive for descendants.
1339#[derive(Debug, Clone)]
1340pub struct LineageNode {
1341    pub card_id: String,
1342    pub pkg: String,
1343    pub prior_card_id: Option<String>,
1344    pub prior_relation: Option<String>,
1345    pub depth: i32,
1346    pub stats: Option<Json>,
1347}
1348
1349/// One edge in the lineage result (child → parent, always).
1350#[derive(Debug, Clone)]
1351pub struct LineageEdge {
1352    pub from: String,
1353    pub to: String,
1354    pub relation: Option<String>,
1355}
1356
1357/// Full lineage walk result.
1358#[derive(Debug, Clone)]
1359pub struct LineageResult {
1360    pub root: String,
1361    pub nodes: Vec<LineageNode>,
1362    pub edges: Vec<LineageEdge>,
1363    pub truncated: bool,
1364}
1365
1366const DEFAULT_LINEAGE_DEPTH: usize = 10;
1367
1368/// Extract the lineage fields from a Card JSON.
1369/// Returns (prior_card_id, prior_relation).
1370fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1371    let meta = card.get("metadata");
1372    let prior_card_id = meta
1373        .and_then(|m| m.get("prior_card_id"))
1374        .and_then(|v| v.as_str())
1375        .map(String::from);
1376    let prior_relation = meta
1377        .and_then(|m| m.get("prior_relation"))
1378        .and_then(|v| v.as_str())
1379        .map(String::from);
1380    (prior_card_id, prior_relation)
1381}
1382
1383/// Build a LineageNode from a loaded CardRow at a given depth.
1384fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1385    let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1386    let stats = if include_stats {
1387        row.full.get("stats").cloned()
1388    } else {
1389        None
1390    };
1391    LineageNode {
1392        card_id: row.summary.card_id.clone(),
1393        pkg: row.summary.pkg.clone(),
1394        prior_card_id,
1395        prior_relation,
1396        depth,
1397        stats,
1398    }
1399}
1400
1401/// Check whether `relation` passes the relation_filter (None means no
1402/// filter, which always passes).
1403fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
1404    match filter {
1405        None => true,
1406        Some(allowed) => match relation {
1407            Some(r) => allowed.iter().any(|a| a == r),
1408            None => false,
1409        },
1410    }
1411}
1412
1413/// Full in-memory card index with forward and reverse lineage maps.
1414struct CardIndex {
1415    /// card_id → CardRow
1416    cards: std::collections::HashMap<String, CardRow>,
1417    /// parent card_id → Vec<child card_id> (reverse lineage index)
1418    children: std::collections::HashMap<String, Vec<String>>,
1419}
1420
1421/// Load all Cards in the store once, keyed by card_id.
1422/// Also builds a reverse index (parent → children) so that
1423/// `walk_down` is O(result_size) instead of O(N_cards × depth).
1424fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1425    let rows = scan_cards(store, None)?;
1426
1427    let mut cards = std::collections::HashMap::with_capacity(rows.len());
1428    let mut children: std::collections::HashMap<String, Vec<String>> =
1429        std::collections::HashMap::new();
1430
1431    for row in rows {
1432        let id = row.summary.card_id.clone();
1433        let (prior_id, _) = lineage_fields(&row.full);
1434        if let Some(parent) = prior_id {
1435            children.entry(parent).or_default().push(id.clone());
1436        }
1437        cards.insert(id, row);
1438    }
1439    Ok(CardIndex { cards, children })
1440}
1441
1442/// Scan all Cards in the store, loading full TOML for each. When
1443/// `pkg_filter` is provided, only that pkg subdir is scanned. Shared
1444/// between `find` and `load_card_index`.
1445fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1446    let locators = store.list_card_locators(pkg_filter)?;
1447    let mut rows = Vec::with_capacity(locators.len());
1448    for (pkg, loc) in &locators {
1449        if let Some(row) = load_full(store, loc, pkg) {
1450            rows.push(row);
1451        }
1452    }
1453    Ok(rows)
1454}
1455
1456/// Invariant context passed through the lineage walkers.
1457struct LineageCtx<'a> {
1458    index: &'a CardIndex,
1459    relation_filter: &'a Option<Vec<String>>,
1460    include_stats: bool,
1461    max_depth: usize,
1462}
1463
1464/// Mutable accumulator for one lineage walk.
1465struct LineageAccum {
1466    nodes: Vec<LineageNode>,
1467    edges: Vec<LineageEdge>,
1468    visited: std::collections::HashSet<String>,
1469    truncated: bool,
1470}
1471
1472/// Walk ancestors via `metadata.prior_card_id`.
1473fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1474    let mut cur = start_id.to_string();
1475    for step in 1..=ctx.max_depth {
1476        let Some(row) = ctx.index.cards.get(&cur) else {
1477            return;
1478        };
1479        let (prior_id, prior_rel) = lineage_fields(&row.full);
1480        let Some(prior_id) = prior_id else {
1481            return;
1482        };
1483        if !relation_passes(ctx.relation_filter, &prior_rel) {
1484            return;
1485        }
1486        if acc.visited.contains(&prior_id) {
1487            return;
1488        }
1489        let Some(parent) = ctx.index.cards.get(&prior_id) else {
1490            return;
1491        };
1492        acc.nodes
1493            .push(make_node(parent, -(step as i32), ctx.include_stats));
1494        acc.edges.push(LineageEdge {
1495            from: row.summary.card_id.clone(),
1496            to: parent.summary.card_id.clone(),
1497            relation: prior_rel,
1498        });
1499        acc.visited.insert(prior_id.clone());
1500        cur = prior_id;
1501    }
1502    // Depth exhausted but another unwalked parent exists → truncated.
1503    if let Some(row) = ctx.index.cards.get(&cur) {
1504        let (prior_id, _) = lineage_fields(&row.full);
1505        if prior_id
1506            .as_ref()
1507            .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1508        {
1509            acc.truncated = true;
1510        }
1511    }
1512}
1513
1514/// Walk descendants using the reverse index (parent → children),
1515/// breadth-first.  O(result_size) instead of O(N_cards × depth).
1516fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1517    let mut frontier: Vec<String> = vec![start_id.to_string()];
1518
1519    for depth in 1..=ctx.max_depth {
1520        let mut next_frontier: Vec<String> = Vec::new();
1521        for parent_id in &frontier {
1522            let children = match ctx.index.children.get(parent_id) {
1523                Some(c) => c,
1524                None => continue,
1525            };
1526            for child_id in children {
1527                if acc.visited.contains(child_id) {
1528                    continue;
1529                }
1530                let Some(child) = ctx.index.cards.get(child_id) else {
1531                    continue;
1532                };
1533                let (_, prior_rel) = lineage_fields(&child.full);
1534                if !relation_passes(ctx.relation_filter, &prior_rel) {
1535                    continue;
1536                }
1537                acc.nodes
1538                    .push(make_node(child, depth as i32, ctx.include_stats));
1539                acc.edges.push(LineageEdge {
1540                    from: child.summary.card_id.clone(),
1541                    to: parent_id.clone(),
1542                    relation: prior_rel,
1543                });
1544                acc.visited.insert(child_id.clone());
1545                next_frontier.push(child_id.clone());
1546            }
1547        }
1548        if next_frontier.is_empty() {
1549            return;
1550        }
1551        frontier = next_frontier;
1552    }
1553    // Frontier still has nodes but depth is exhausted: check for
1554    // unwalked children at the next level.
1555    for parent_id in &frontier {
1556        let children = match ctx.index.children.get(parent_id) {
1557            Some(c) => c,
1558            None => continue,
1559        };
1560        for child_id in children {
1561            if acc.visited.contains(child_id) {
1562                continue;
1563            }
1564            let Some(child) = ctx.index.cards.get(child_id) else {
1565                continue;
1566            };
1567            let (_, prior_rel) = lineage_fields(&child.full);
1568            if relation_passes(ctx.relation_filter, &prior_rel) {
1569                acc.truncated = true;
1570                return;
1571            }
1572        }
1573    }
1574}
1575
1576/// Walk the lineage tree from `q.card_id` in `store`.
1577pub fn lineage_with_store(
1578    store: &dyn CardStore,
1579    q: LineageQuery,
1580) -> Result<Option<LineageResult>, String> {
1581    let index = load_card_index(store)?;
1582    let Some(root_row) = index.cards.get(&q.card_id) else {
1583        return Ok(None);
1584    };
1585
1586    let ctx = LineageCtx {
1587        index: &index,
1588        relation_filter: &q.relation_filter,
1589        include_stats: q.include_stats,
1590        max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1591    };
1592    let mut acc = LineageAccum {
1593        nodes: Vec::new(),
1594        edges: Vec::new(),
1595        visited: std::collections::HashSet::new(),
1596        truncated: false,
1597    };
1598
1599    acc.nodes.push(make_node(root_row, 0, q.include_stats));
1600    acc.visited.insert(q.card_id.clone());
1601
1602    if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1603        walk_up(&q.card_id, &ctx, &mut acc);
1604    }
1605    if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1606        walk_down(&q.card_id, &ctx, &mut acc);
1607    }
1608
1609    Ok(Some(LineageResult {
1610        root: q.card_id,
1611        nodes: acc.nodes,
1612        edges: acc.edges,
1613        truncated: acc.truncated,
1614    }))
1615}
1616
1617/// Render a LineageResult as JSON for the service layer.
1618pub fn lineage_to_json(r: &LineageResult) -> Json {
1619    let nodes: Vec<Json> = r
1620        .nodes
1621        .iter()
1622        .map(|n| {
1623            let mut m = serde_json::Map::new();
1624            m.insert("card_id".into(), json!(n.card_id));
1625            m.insert("pkg".into(), json!(n.pkg));
1626            m.insert("depth".into(), json!(n.depth));
1627            if let Some(p) = &n.prior_card_id {
1628                m.insert("prior_card_id".into(), json!(p));
1629            }
1630            if let Some(rel) = &n.prior_relation {
1631                m.insert("prior_relation".into(), json!(rel));
1632            }
1633            if let Some(s) = &n.stats {
1634                m.insert("stats".into(), s.clone());
1635            }
1636            Json::Object(m)
1637        })
1638        .collect();
1639    let edges: Vec<Json> = r
1640        .edges
1641        .iter()
1642        .map(|e| {
1643            let mut m = serde_json::Map::new();
1644            m.insert("from".into(), json!(e.from));
1645            m.insert("to".into(), json!(e.to));
1646            if let Some(rel) = &e.relation {
1647                m.insert("relation".into(), json!(rel));
1648            }
1649            Json::Object(m)
1650        })
1651        .collect();
1652    json!({
1653        "root": r.root,
1654        "nodes": nodes,
1655        "edges": edges,
1656        "truncated": r.truncated,
1657    })
1658}
1659
1660// ───────────────────────────────────────────────────────────────
1661// Samples sidecar: per-case detail written alongside a Card as
1662// `{pkg}/{card_id}.samples.jsonl`. Write-once to preserve Card
1663// immutability: once a Card has a samples file, it cannot be
1664// rewritten — mismatched per-case data would break auditability.
1665// ───────────────────────────────────────────────────────────────
1666
1667// ───────────────────────────────────────────────────────────────
1668// Card import: copy Card files from an external directory into the
1669// local cards store. Used by `alc_card_install` (Card Collections)
1670// and by `alc_pkg_install` (Pkg-bundled cards/).
1671// ───────────────────────────────────────────────────────────────
1672
1673/// Import Card files into `store` from `source_dir` under `pkg`.
1674///
1675/// Copies `*.toml` and `*.samples.jsonl` files. Existing cards with the
1676/// same id are skipped (first-writer wins — Card immutability).
1677///
1678/// Returns `(imported, skipped)` card_id lists.
1679pub fn import_from_dir_with_store(
1680    store: &dyn CardStore,
1681    source_dir: &std::path::Path,
1682    pkg: &str,
1683) -> Result<(Vec<String>, Vec<String>), String> {
1684    let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1685    for card_id in &imported {
1686        match store.read_card_text(card_id) {
1687            Ok(Some(toml_text)) => publish(CardEvent::Created {
1688                pkg: pkg.to_string(),
1689                card_id: card_id.clone(),
1690                toml_text,
1691            }),
1692            Ok(None) => {
1693                tracing::warn!(
1694                    card_id = %card_id,
1695                    "import_from_dir: read_card_text returned None after import; skipping publish"
1696                );
1697            }
1698            Err(e) => {
1699                tracing::warn!(
1700                    card_id = %card_id,
1701                    error = %e,
1702                    "import_from_dir: read_card_text failed after import; skipping publish"
1703                );
1704            }
1705        }
1706        // Samples are optional — best-effort.
1707        match store.read_samples_text(card_id) {
1708            Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1709                card_id: card_id.clone(),
1710                jsonl_text,
1711            }),
1712            Ok(None) => {}
1713            Err(e) => {
1714                tracing::warn!(
1715                    card_id = %card_id,
1716                    error = %e,
1717                    "import_from_dir: read_samples_text failed after import; skipping publish"
1718                );
1719            }
1720        }
1721    }
1722    Ok((imported, skipped))
1723}
1724
1725/// Write per-case samples to `{card_id}.samples.jsonl` (write-once).
1726///
1727/// Each `samples` entry is serialized as one compact JSON line.
1728/// Fails if a samples file already exists for this card — mirrors
1729/// the immutability guarantee of Cards themselves.
1730pub fn write_samples_with_store(
1731    store: &dyn CardStore,
1732    card_id: &str,
1733    samples: Vec<Json>,
1734) -> Result<PathBuf, String> {
1735    if store.samples_exists(card_id)? {
1736        return Err(format!(
1737            "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1738        ));
1739    }
1740    let mut buf = String::new();
1741    for (idx, s) in samples.iter().enumerate() {
1742        let line = serde_json::to_string(s).map_err(|e| {
1743            format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1744        })?;
1745        buf.push_str(&line);
1746        buf.push('\n');
1747    }
1748    let path = store.write_samples_text(card_id, &buf)?;
1749
1750    publish(CardEvent::SamplesWritten {
1751        card_id: card_id.to_string(),
1752        jsonl_text: buf,
1753    });
1754
1755    Ok(path)
1756}
1757
1758/// Query parameters for `read_samples`.
1759#[derive(Debug, Default, Clone)]
1760pub struct SamplesQuery {
1761    /// Skip this many matched rows (after `where` filtering).
1762    pub offset: usize,
1763    /// Max matched rows to return.
1764    pub limit: Option<usize>,
1765    /// Optional `where` predicate applied to each sample row.
1766    /// The row JSON is the full line object (no section wrapping).
1767    pub where_: Option<Predicate>,
1768}
1769
1770/// Read per-case samples from `{card_id}.samples.jsonl`.
1771///
1772/// Streams the JSONL file line by line; rows are parsed, optionally
1773/// filtered by `q.where_`, then paged by `offset` + `limit`.  Offset
1774/// applies to the **post-filter** stream, matching Prisma/SQL
1775/// semantics.
1776///
1777/// Returns an empty Vec if no samples file exists (Cards without
1778/// per-case details are the common case, not an error).
1779pub fn read_samples_with_store(
1780    store: &dyn CardStore,
1781    card_id: &str,
1782    q: SamplesQuery,
1783) -> Result<Vec<Json>, String> {
1784    let text = match store.read_samples_text(card_id)? {
1785        Some(t) => t,
1786        None => return Ok(Vec::new()),
1787    };
1788    let mut matched: usize = 0;
1789    let mut out = Vec::new();
1790    for (i, line) in text.lines().enumerate() {
1791        if line.trim().is_empty() {
1792            continue;
1793        }
1794        let val: Json = serde_json::from_str(line)
1795            .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1796        if let Some(pred) = &q.where_ {
1797            if !eval_predicate(pred, &val) {
1798                continue;
1799            }
1800        }
1801        if matched < q.offset {
1802            matched += 1;
1803            continue;
1804        }
1805        if let Some(lim) = q.limit {
1806            if out.len() >= lim {
1807                break;
1808            }
1809        }
1810        matched += 1;
1811        out.push(val);
1812    }
1813    Ok(out)
1814}
1815
1816// ═══════════════════════════════════════════════════════════════
1817// FileCardStore — default backend.
1818// ═══════════════════════════════════════════════════════════════
1819//
1820// Stores Cards as TOML files under `{root}/{pkg}/{card_id}.toml`,
1821// samples as `{root}/{pkg}/{card_id}.samples.jsonl`, and the alias
1822// table as `{root}/_aliases.toml`.
1823//
1824// `root` is provided at construction time via `new(root)`; callers
1825// (typically the service layer) resolve it from the `AppDir`
1826// abstraction. Tests use a tempdir via `new(tmpdir)`.
1827
1828/// File-backed implementation of [`CardStore`].
1829pub struct FileCardStore {
1830    root: PathBuf,
1831}
1832
1833impl FileCardStore {
1834    /// Construct a store rooted at an explicit path.
1835    pub fn new(root: PathBuf) -> Self {
1836        Self { root }
1837    }
1838
1839    /// Return the root directory this store writes under.
1840    pub fn root(&self) -> &Path {
1841        &self.root
1842    }
1843
1844    // ─── Thin `self` delegations to the `*_with_store` free fns ────
1845    //
1846    // These let callers that hold an `Arc<FileCardStore>` (bridge/data
1847    // register_card closures, service layer) invoke domain logic via
1848    // instance methods without re-importing the `_with_store` free
1849    // functions. Semantics are identical to the free-fn variants.
1850
1851    pub fn create(&self, input: Json) -> Result<(String, PathBuf), String> {
1852        create_with_store(self, input)
1853    }
1854
1855    pub fn get(&self, card_id: &str) -> Result<Option<Json>, String> {
1856        get_with_store(self, card_id)
1857    }
1858
1859    pub fn list(&self, pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
1860        list_with_store(self, pkg_filter)
1861    }
1862
1863    pub fn append(&self, card_id: &str, fields: Json) -> Result<Json, String> {
1864        append_with_store(self, card_id, fields)
1865    }
1866
1867    pub fn alias_set(
1868        &self,
1869        name: &str,
1870        card_id: &str,
1871        pkg: Option<&str>,
1872        note: Option<&str>,
1873    ) -> Result<Alias, String> {
1874        alias_set_with_store(self, name, card_id, pkg, note)
1875    }
1876
1877    pub fn alias_list(&self, pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
1878        alias_list_with_store(self, pkg_filter)
1879    }
1880
1881    pub fn get_by_alias(&self, name: &str) -> Result<Option<Json>, String> {
1882        get_by_alias_with_store(self, name)
1883    }
1884
1885    pub fn find(&self, q: FindQuery) -> Result<Vec<Summary>, String> {
1886        find_with_store(self, q)
1887    }
1888
1889    pub fn write_samples(&self, card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1890        write_samples_with_store(self, card_id, samples)
1891    }
1892
1893    pub fn read_samples(&self, card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1894        read_samples_with_store(self, card_id, q)
1895    }
1896
1897    pub fn lineage(&self, q: LineageQuery) -> Result<Option<LineageResult>, String> {
1898        lineage_with_store(self, q)
1899    }
1900
1901    pub fn card_sink_backfill(
1902        &self,
1903        sink: &str,
1904        dry_run: bool,
1905    ) -> Result<SinkBackfillReport, String> {
1906        card_sink_backfill_with_store(self, sink, dry_run)
1907    }
1908
1909    /// Returns the absolute path to the per-pkg subdirectory,
1910    /// creating it when missing. Validates `pkg` to prevent path
1911    /// traversal.
1912    fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1913        validate_name(pkg, "pkg")?;
1914        let dir = self.root.join(pkg);
1915        if !dir.exists() {
1916            fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1917        }
1918        Ok(dir)
1919    }
1920
1921    /// Path of the global alias table.
1922    fn aliases_path(&self) -> PathBuf {
1923        self.root.join("_aliases.toml")
1924    }
1925
1926    /// Path of the samples sidecar for `card_id`. Errors if the
1927    /// Card itself does not exist — samples without a parent Card
1928    /// are meaningless and we refuse to create orphans.
1929    fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1930        let card_path = self
1931            .find_card_locator(card_id)?
1932            .ok_or_else(|| format!("card '{card_id}' not found"))?;
1933        let dir = card_path
1934            .parent()
1935            .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1936        Ok(dir.join(format!("{card_id}.samples.jsonl")))
1937    }
1938}
1939
1940impl CardStore for FileCardStore {
1941    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1942        let dir = self.pkg_dir(pkg)?;
1943        let path = dir.join(format!("{card_id}.toml"));
1944        if path.exists() {
1945            return Err(format!(
1946                "alc.card.create: card '{card_id}' already exists (immutable)"
1947            ));
1948        }
1949        let tmp = path.with_extension("toml.tmp");
1950        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1951        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1952        Ok(path)
1953    }
1954
1955    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1956        let path = self
1957            .find_card_locator(card_id)?
1958            .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1959        let tmp = path.with_extension("toml.tmp");
1960        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1961        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1962        Ok(path)
1963    }
1964
1965    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1966        validate_name(card_id, "card_id")?;
1967        if !self.root.exists() {
1968            return Ok(None);
1969        }
1970        let entries =
1971            fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1972        for entry in entries.flatten() {
1973            let p = entry.path();
1974            if p.is_dir() {
1975                let candidate = p.join(format!("{card_id}.toml"));
1976                if candidate.exists() {
1977                    return Ok(Some(candidate));
1978                }
1979            }
1980        }
1981        Ok(None)
1982    }
1983
1984    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
1985        let Some(path) = self.find_card_locator(card_id)? else {
1986            return Ok(None);
1987        };
1988        let text = fs::read_to_string(&path)
1989            .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
1990        Ok(Some(text))
1991    }
1992
1993    fn list_card_locators(
1994        &self,
1995        pkg_filter: Option<&str>,
1996    ) -> Result<Vec<(String, PathBuf)>, String> {
1997        if !self.root.exists() {
1998            return Ok(Vec::new());
1999        }
2000        let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2001            validate_name(p, "pkg")?;
2002            let d = self.root.join(p);
2003            if d.is_dir() {
2004                vec![d]
2005            } else {
2006                return Ok(Vec::new());
2007            }
2008        } else {
2009            fs::read_dir(&self.root)
2010                .map_err(|e| format!("Failed to read cards dir: {e}"))?
2011                .flatten()
2012                .map(|e| e.path())
2013                .filter(|p| p.is_dir())
2014                .collect()
2015        };
2016
2017        let mut out = Vec::new();
2018        for pdir in pkg_dirs {
2019            let pkg = pdir
2020                .file_name()
2021                .and_then(|s| s.to_str())
2022                .unwrap_or("")
2023                .to_string();
2024            let entries = match fs::read_dir(&pdir) {
2025                Ok(e) => e,
2026                Err(_) => continue,
2027            };
2028            for entry in entries.flatten() {
2029                let p = entry.path();
2030                if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2031                    continue;
2032                }
2033                out.push((pkg.clone(), p));
2034            }
2035        }
2036        Ok(out)
2037    }
2038
2039    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2040        match fs::read_to_string(locator) {
2041            Ok(text) => Ok(Some(text)),
2042            Err(_) => Ok(None),
2043        }
2044    }
2045
2046    fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2047        let path = self.aliases_path();
2048        if !path.exists() {
2049            return Ok(Vec::new());
2050        }
2051        let text =
2052            fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2053        let val: toml::Value =
2054            toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2055        let arr = val
2056            .get("alias")
2057            .and_then(|v| v.as_array())
2058            .cloned()
2059            .unwrap_or_default();
2060        let mut out = Vec::with_capacity(arr.len());
2061        for entry in arr {
2062            let t = match entry {
2063                toml::Value::Table(t) => t,
2064                _ => continue,
2065            };
2066            let name = match t.get("name").and_then(|v| v.as_str()) {
2067                Some(s) => s.to_string(),
2068                None => continue,
2069            };
2070            let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2071                Some(s) => s.to_string(),
2072                None => continue,
2073            };
2074            out.push(Alias {
2075                name,
2076                card_id,
2077                pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2078                set_at: t
2079                    .get("set_at")
2080                    .and_then(|v| v.as_str())
2081                    .map(String::from)
2082                    .unwrap_or_default(),
2083                note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2084            });
2085        }
2086        Ok(out)
2087    }
2088
2089    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2090        // Ensure the cards root exists so aliases can be written to
2091        // a brand-new store (mirrors the behaviour of `cards_dir()`).
2092        if !self.root.exists() {
2093            fs::create_dir_all(&self.root)
2094                .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2095        }
2096        let path = self.aliases_path();
2097        let mut arr = Vec::with_capacity(aliases.len());
2098        for a in aliases {
2099            let mut t = toml::map::Map::new();
2100            t.insert("name".into(), toml::Value::String(a.name.clone()));
2101            t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2102            if let Some(p) = &a.pkg {
2103                t.insert("pkg".into(), toml::Value::String(p.clone()));
2104            }
2105            t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2106            if let Some(n) = &a.note {
2107                t.insert("note".into(), toml::Value::String(n.clone()));
2108            }
2109            arr.push(toml::Value::Table(t));
2110        }
2111        let mut root = toml::map::Map::new();
2112        root.insert("alias".into(), toml::Value::Array(arr));
2113        let text = toml::to_string_pretty(&toml::Value::Table(root))
2114            .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2115        let tmp = path.with_extension("toml.tmp");
2116        fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2117        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2118        Ok(())
2119    }
2120
2121    fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2122        let path = self.samples_path(card_id)?;
2123        Ok(path.exists())
2124    }
2125
2126    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2127        let path = self.samples_path(card_id)?;
2128        if path.exists() {
2129            return Err(format!(
2130                "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2131            ));
2132        }
2133        let tmp = path.with_extension("jsonl.tmp");
2134        fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2135        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2136        Ok(path)
2137    }
2138
2139    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2140        let path = self.samples_path(card_id)?;
2141        if !path.exists() {
2142            return Ok(None);
2143        }
2144        let text =
2145            fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2146        Ok(Some(text))
2147    }
2148
2149    fn import_from_dir(
2150        &self,
2151        source_dir: &Path,
2152        pkg: &str,
2153    ) -> Result<(Vec<String>, Vec<String>), String> {
2154        validate_name(pkg, "pkg")?;
2155        let dest = self.pkg_dir(pkg)?;
2156        let mut imported = Vec::new();
2157        let mut skipped = Vec::new();
2158
2159        let entries =
2160            fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2161
2162        for entry in entries.flatten() {
2163            let path = entry.path();
2164            let fname = match path.file_name().and_then(|n| n.to_str()) {
2165                Some(n) => n.to_string(),
2166                None => continue,
2167            };
2168
2169            if !fname.ends_with(".toml") {
2170                continue;
2171            }
2172
2173            let card_id = fname.trim_end_matches(".toml");
2174            let dest_toml = dest.join(&fname);
2175
2176            if dest_toml.exists() {
2177                skipped.push(card_id.to_string());
2178                continue;
2179            }
2180
2181            let text = fs::read_to_string(&path)
2182                .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2183            let val: toml::Value = toml::from_str(&text)
2184                .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2185            if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2186                continue;
2187            }
2188
2189            fs::copy(&path, &dest_toml)
2190                .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2191
2192            let samples_name = format!("{card_id}.samples.jsonl");
2193            let samples_src = source_dir.join(&samples_name);
2194            if samples_src.exists() {
2195                let samples_dest = dest.join(&samples_name);
2196                if !samples_dest.exists() {
2197                    fs::copy(&samples_src, &samples_dest)
2198                        .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2199                }
2200            }
2201
2202            imported.push(card_id.to_string());
2203        }
2204
2205        Ok((imported, skipped))
2206    }
2207}
2208
2209// ═══════════════════════════════════════════════════════════════
2210// Event Publisher Port — Card sink fan-out (v1)
2211// ═══════════════════════════════════════════════════════════════
2212//
2213// Storage port (`CardStore` / `FileCardStore`) stays pure CRUD. The
2214// Event publisher port below mirrors every successful Card write to a
2215// set of subscriber backends configured via the `ALC_CARD_SINKS`
2216// environment variable. Fan-out is best-effort and strictly serial;
2217// subscriber failures never propagate to the primary Card API.
2218
2219// ─── CardEvent / CardEventKind ─────────────────────────────────
2220
2221/// A Card-level event emitted from the write path.
2222///
2223/// Each variant carries the already-serialized payload text so that
2224/// subscribers can persist the exact bytes that were written to the
2225/// primary store (byte-for-byte parity).
2226#[derive(Debug, Clone)]
2227pub enum CardEvent {
2228    /// A brand-new Card was written.
2229    Created {
2230        pkg: String,
2231        card_id: String,
2232        toml_text: String,
2233    },
2234    /// An existing Card had new top-level keys merged in.
2235    Appended { card_id: String, toml_text: String },
2236    /// A samples JSONL sidecar was written for `card_id`.
2237    SamplesWritten { card_id: String, jsonl_text: String },
2238    /// The global alias table was rewritten.
2239    AliasesWritten { toml_text: String },
2240}
2241
2242/// Lightweight discriminant for `CardEvent`. Used as a `HashMap` key in
2243/// `SubscriberStats` so that ok/err counters can be tracked per event
2244/// kind without holding the full payload.
2245#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
2246pub enum CardEventKind {
2247    Created,
2248    Appended,
2249    SamplesWritten,
2250    AliasesWritten,
2251}
2252
2253impl CardEventKind {
2254    /// Stable string tag used in `tracing` log fields.
2255    pub fn as_str(self) -> &'static str {
2256        match self {
2257            CardEventKind::Created => "created",
2258            CardEventKind::Appended => "appended",
2259            CardEventKind::SamplesWritten => "samples_written",
2260            CardEventKind::AliasesWritten => "aliases_written",
2261        }
2262    }
2263
2264    /// Short JSON key for the public `alc_stats` snapshot.
2265    /// Distinct from `as_str()` so that tracing logs keep their
2266    /// verbose form while the JSON surface stays compact.
2267    pub fn json_key(self) -> &'static str {
2268        match self {
2269            CardEventKind::Created => "created",
2270            CardEventKind::Appended => "appended",
2271            CardEventKind::SamplesWritten => "samples",
2272            CardEventKind::AliasesWritten => "aliases",
2273        }
2274    }
2275
2276    /// All kinds in stable display order. Used by `SubscriberHealthRow`
2277    /// to emit all four counter keys even when a counter is zero, so
2278    /// that downstream consumers can rely on field presence.
2279    pub fn all() -> [CardEventKind; 4] {
2280        [
2281            CardEventKind::Created,
2282            CardEventKind::Appended,
2283            CardEventKind::SamplesWritten,
2284            CardEventKind::AliasesWritten,
2285        ]
2286    }
2287}
2288
2289impl Serialize for CardEventKind {
2290    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2291        s.serialize_str(self.json_key())
2292    }
2293}
2294
2295impl CardEvent {
2296    /// Return the `CardEventKind` discriminant for this event.
2297    pub fn kind(&self) -> CardEventKind {
2298        match self {
2299            CardEvent::Created { .. } => CardEventKind::Created,
2300            CardEvent::Appended { .. } => CardEventKind::Appended,
2301            CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2302            CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2303        }
2304    }
2305}
2306
2307// ─── CardSubscriber trait ──────────────────────────────────────
2308
2309/// A downstream backend that receives `CardEvent`s in best-effort,
2310/// serial fan-out order.
2311///
2312/// Implementations must be `Send + Sync` so that the bus can hold
2313/// `Arc<dyn CardSubscriber>` safely across threads.
2314pub trait CardSubscriber: Send + Sync {
2315    /// Handle one event. Returning `Err` records a failure in
2316    /// `SubscriberStats` and emits a `tracing::warn!`, but does not
2317    /// propagate to the caller of the `_with_store` API.
2318    fn on_event(&self, ev: &CardEvent) -> Result<(), String>;
2319
2320    /// Canonical identity URI for this subscriber. Used as the key in
2321    /// `SubscriberStats` and as the match target for
2322    /// `CardEventBus::publish_to`.
2323    fn describe(&self) -> String;
2324
2325    /// Best-effort check for whether `card_id` already exists in this
2326    /// subscriber. Used by `alc_card_sink_backfill` to skip cards that
2327    /// are already mirrored (drift-safe: we never overwrite).
2328    ///
2329    /// Default implementation returns `Ok(false)` so subscribers that
2330    /// cannot cheaply answer (e.g. future network-backed sinks) always
2331    /// get the push. Override when a cheap local check is possible.
2332    fn has_card(&self, _card_id: &str) -> Result<bool, String> {
2333        Ok(false)
2334    }
2335}
2336
2337// ─── SubscriberStats ───────────────────────────────────────────
2338
2339/// Most recent delivery failure for a single subscriber. Exposed via
2340/// `SubscriberHealthRow.last_error` in the `alc_stats` JSON snapshot.
2341#[derive(Debug, Clone, Serialize)]
2342pub struct LastError {
2343    pub kind: CardEventKind,
2344    pub msg: String,
2345    pub ts_ms: u64,
2346}
2347
2348/// Per-subscriber counter state. Held inside `SubscriberStats` under a
2349/// `Mutex`; `snapshot` clones the fields into an owned `SubscriberHealthRow`
2350/// while the lock is held, so the lock window stays short.
2351#[derive(Default, Debug)]
2352pub struct PerSubscriber {
2353    pub ok: HashMap<CardEventKind, u64>,
2354    pub err: HashMap<CardEventKind, u64>,
2355    pub last_error: Option<LastError>,
2356}
2357
2358/// Process-wide per-subscriber statistics, keyed by subscriber URI
2359/// (the value returned by `CardSubscriber::describe`).
2360#[derive(Default, Debug)]
2361pub struct SubscriberStats {
2362    inner: Mutex<HashMap<String, PerSubscriber>>,
2363}
2364
2365impl SubscriberStats {
2366    /// Record a successful event delivery.
2367    pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2368        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2369        let entry = g.entry(key.to_string()).or_default();
2370        let c = entry.ok.entry(kind).or_insert(0);
2371        *c = c.saturating_add(1);
2372    }
2373
2374    /// Record a delivery failure together with the error message.
2375    /// The failure kind, message, and timestamp overwrite `last_error`
2376    /// — there is no ring buffer; only the most recent failure is kept.
2377    pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2378        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2379        let entry = g.entry(key.to_string()).or_default();
2380        let c = entry.err.entry(kind).or_insert(0);
2381        *c = c.saturating_add(1);
2382        entry.last_error = Some(LastError {
2383            kind,
2384            msg: err.to_string(),
2385            ts_ms: now_ms(),
2386        });
2387    }
2388
2389    /// Take a point-in-time snapshot of all subscribers. The returned
2390    /// `Vec` is owned — the internal lock is released before this
2391    /// function returns, so callers can hold it freely.
2392    ///
2393    /// All four `CardEventKind` keys (`created / appended / samples /
2394    /// aliases`) are always present in both `ok` and `err`, defaulting
2395    /// to 0 if no event of that kind has been recorded. This lets
2396    /// downstream consumers rely on field presence.
2397    pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2398        let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2399        let mut rows = Vec::with_capacity(g.len());
2400        for (sink, ps) in g.iter() {
2401            let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2402            let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2403            for k in CardEventKind::all() {
2404                ok.insert(
2405                    k.json_key().to_string(),
2406                    ps.ok.get(&k).copied().unwrap_or(0),
2407                );
2408                err.insert(
2409                    k.json_key().to_string(),
2410                    ps.err.get(&k).copied().unwrap_or(0),
2411                );
2412            }
2413            rows.push(SubscriberHealthRow {
2414                sink: sink.clone(),
2415                ok,
2416                err,
2417                last_error: ps.last_error.clone(),
2418            });
2419        }
2420        // Stable output ordering (by sink URI) so that the JSON dump is
2421        // deterministic across runs — useful for snapshot tests.
2422        rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2423        rows
2424    }
2425}
2426
2427/// Unix-epoch milliseconds used by `LastError.ts_ms`. Uses
2428/// `unwrap_or_default` so no panic can escape even if the system
2429/// clock is misconfigured.
2430fn now_ms() -> u64 {
2431    std::time::SystemTime::now()
2432        .duration_since(std::time::UNIX_EPOCH)
2433        .unwrap_or_default()
2434        .as_millis() as u64
2435}
2436
2437/// Snapshot row for a single subscriber, serialized directly into the
2438/// `alc_stats` JSON output as one element of the `card_sinks` array.
2439#[derive(Debug, Clone, Serialize)]
2440pub struct SubscriberHealthRow {
2441    pub sink: String,
2442    pub ok: HashMap<String, u64>,
2443    pub err: HashMap<String, u64>,
2444    pub last_error: Option<LastError>,
2445}
2446
2447/// Public entry point: snapshot of all process-wide subscriber stats.
2448/// Wrapper around `event_bus().stats().snapshot()` so that downstream
2449/// crates (notably `algocline-app`) do not need a handle to the
2450/// `CardEventBus` singleton.
2451pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2452    event_bus().stats().snapshot()
2453}
2454
2455// ─── FileCardSubscriber — file-URI backend ─────────────────────
2456
2457/// Atomically write `bytes` to `dest` by staging to a unique
2458/// `{dest}.tmp.{pid}.{seq}` sibling and renaming. The `{pid}.{seq}`
2459/// suffix prevents concurrent writers (same or different processes)
2460/// from colliding on the tmp path.
2461fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
2462    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
2463    let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
2464    let pid = process::id();
2465    if let Some(parent) = dest.parent() {
2466        if !parent.as_os_str().is_empty() && !parent.exists() {
2467            fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
2468        }
2469    }
2470    let mut tmp = dest.as_os_str().to_owned();
2471    tmp.push(format!(".tmp.{pid}.{seq}"));
2472    let tmp_path = PathBuf::from(tmp);
2473    fs::write(&tmp_path, bytes).map_err(|e| format!("subscriber write tmp: {e}"))?;
2474    fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
2475}
2476
2477/// Canonical `file://` URI for a local directory path. The result is
2478/// the inverse of [`decode_file_uri_path`] — the pair round-trips
2479/// through the `ALC_CARD_SINKS` env spec.
2480fn canonical_file_uri(root: &Path) -> String {
2481    let p = root.to_string_lossy();
2482    #[cfg(unix)]
2483    {
2484        format!("file://{p}")
2485    }
2486    #[cfg(windows)]
2487    {
2488        format!("file:///{}", p.replace('\\', "/"))
2489    }
2490    #[cfg(not(any(unix, windows)))]
2491    {
2492        format!("file://{p}")
2493    }
2494}
2495
2496/// A subscriber that mirrors events to a local directory using the
2497/// same two-tier layout as [`FileCardStore`]:
2498///
2499/// - `{root}/{pkg}/{card_id}.toml` — Card TOML
2500/// - `{root}/{pkg}/{card_id}.samples.jsonl` — samples sidecar
2501/// - `{root}/_aliases.toml` — global alias table
2502pub struct FileCardSubscriber {
2503    root: PathBuf,
2504    uri: String,
2505}
2506
2507impl FileCardSubscriber {
2508    /// Construct a subscriber rooted at `root`. The canonical URI is
2509    /// computed once and returned from [`Self::describe`].
2510    pub fn new(root: PathBuf) -> Self {
2511        let uri = canonical_file_uri(&root);
2512        Self { root, uri }
2513    }
2514
2515    /// Scan the subscriber root for a Card with `card_id` under any
2516    /// `{pkg}` subdirectory. Returns `Ok(None)` when the root itself
2517    /// does not exist yet (subscribers are write-once lazy).
2518    pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2519        validate_name(card_id, "card_id")?;
2520        if !self.root.exists() {
2521            return Ok(None);
2522        }
2523        let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2524        for entry in entries.flatten() {
2525            let p = entry.path();
2526            if p.is_dir() {
2527                let candidate = p.join(format!("{card_id}.toml"));
2528                if candidate.exists() {
2529                    return Ok(Some(candidate));
2530                }
2531            }
2532        }
2533        Ok(None)
2534    }
2535
2536    fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2537        validate_name(pkg, "pkg")?;
2538        let dir = self.root.join(pkg);
2539        if !dir.exists() {
2540            fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2541        }
2542        Ok(dir)
2543    }
2544
2545    fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2546        validate_name(card_id, "card_id")?;
2547        let dir = self.ensure_pkg_dir(pkg)?;
2548        let dest = dir.join(format!("{card_id}.toml"));
2549        atomic_write(&dest, toml_text.as_bytes())
2550    }
2551
2552    fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2553        match self.locate_card(card_id)? {
2554            Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2555            None => Err(format!(
2556                "subscriber append: card '{card_id}' missing at {}",
2557                self.uri
2558            )),
2559        }
2560    }
2561
2562    fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2563        let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2564            format!(
2565                "subscriber samples: card '{card_id}' missing at {}",
2566                self.uri
2567            )
2568        })?;
2569        let dir = card_path
2570            .parent()
2571            .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2572        let dest = dir.join(format!("{card_id}.samples.jsonl"));
2573        atomic_write(&dest, jsonl_text.as_bytes())
2574    }
2575
2576    fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2577        if !self.root.exists() {
2578            fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2579        }
2580        let dest = self.root.join("_aliases.toml");
2581        atomic_write(&dest, toml_text.as_bytes())
2582    }
2583}
2584
2585impl CardSubscriber for FileCardSubscriber {
2586    fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2587        match ev {
2588            CardEvent::Created {
2589                pkg,
2590                card_id,
2591                toml_text,
2592            } => self.write_created(pkg, card_id, toml_text),
2593            CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2594            CardEvent::SamplesWritten {
2595                card_id,
2596                jsonl_text,
2597            } => self.write_samples(card_id, jsonl_text),
2598            CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2599        }
2600    }
2601
2602    fn describe(&self) -> String {
2603        self.uri.clone()
2604    }
2605
2606    /// Delegates to [`FileCardSubscriber::locate_card`]. A non-existent
2607    /// root (subscriber has never been written to) returns `Ok(false)`,
2608    /// which is the correct "backfill needed" answer.
2609    fn has_card(&self, card_id: &str) -> Result<bool, String> {
2610        Ok(self.locate_card(card_id)?.is_some())
2611    }
2612}
2613
2614// ─── CardEventBus + OnceLock singleton ─────────────────────────
2615
2616/// Process-wide fan-out bus. Subscribers are registered once at startup
2617/// (from `ALC_CARD_SINKS`) and stored behind a `Mutex` so that tests
2618/// can swap them out via `replace_subscribers_for_test` without losing
2619/// the singleton identity.
2620pub struct CardEventBus {
2621    subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
2622    stats: Arc<SubscriberStats>,
2623}
2624
2625impl CardEventBus {
2626    /// Build a bus from an explicit subscriber list. Used both by the
2627    /// env loader and by `install_event_bus_for_test`.
2628    pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2629        Self {
2630            subscribers: Mutex::new(subscribers),
2631            stats: Arc::new(SubscriberStats::default()),
2632        }
2633    }
2634
2635    /// Shared handle to the per-subscriber counters.
2636    pub fn stats(&self) -> &Arc<SubscriberStats> {
2637        &self.stats
2638    }
2639
2640    /// Fan out `ev` to every registered subscriber serially. Subscriber
2641    /// failures are counted in `SubscriberStats` and logged but do not
2642    /// propagate.
2643    pub fn publish(&self, ev: &CardEvent) {
2644        let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2645            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2646            guard.clone()
2647        };
2648        for sub in &subs_snapshot {
2649            let key = sub.describe();
2650            match sub.on_event(ev) {
2651                Ok(()) => self.stats.record_ok(&key, ev.kind()),
2652                Err(e) => {
2653                    tracing::warn!(
2654                        subscriber = %key,
2655                        kind = ev.kind().as_str(),
2656                        error = %e,
2657                        "card subscriber failed"
2658                    );
2659                    self.stats.record_err(&key, ev.kind(), &e);
2660                }
2661            }
2662        }
2663    }
2664
2665    /// Deliver `ev` to exactly one subscriber identified by
2666    /// `target` URI. Returns `Err` when no subscriber matches or the
2667    /// subscriber itself fails (backfill path needs the caller to know).
2668    pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2669        let hit: Option<Arc<dyn CardSubscriber>> = {
2670            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2671            guard.iter().find(|s| s.describe() == target).cloned()
2672        };
2673        let Some(sub) = hit else {
2674            return Err(format!("subscriber not registered: {target}"));
2675        };
2676        let key = sub.describe();
2677        match sub.on_event(ev) {
2678            Ok(()) => {
2679                self.stats.record_ok(&key, ev.kind());
2680                Ok(())
2681            }
2682            Err(e) => {
2683                tracing::warn!(
2684                    subscriber = %key,
2685                    kind = ev.kind().as_str(),
2686                    error = %e,
2687                    "card subscriber failed (publish_to)"
2688                );
2689                self.stats.record_err(&key, ev.kind(), &e);
2690                Err(e)
2691            }
2692        }
2693    }
2694
2695    /// List every subscriber URI currently registered on the bus.
2696    pub fn subscriber_uris(&self) -> Vec<String> {
2697        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2698        guard.iter().map(|s| s.describe()).collect()
2699    }
2700
2701    /// Look up a subscriber by URI (as returned by `describe`). Returns
2702    /// `None` when no subscriber matches. Used by
2703    /// `alc_card_sink_backfill` to dispatch has_card checks against a
2704    /// specific sink.
2705    pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2706        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2707        guard.iter().find(|s| s.describe() == uri).cloned()
2708    }
2709
2710    /// Replace the subscriber list in place while preserving singleton
2711    /// identity and shared `SubscriberStats`. Test-only.
2712    #[cfg(any(test, feature = "test-support"))]
2713    pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2714        let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2715        *guard = subs;
2716    }
2717
2718    /// Reset the per-subscriber counters. Test-only helper.
2719    #[cfg(any(test, feature = "test-support"))]
2720    pub fn reset_stats_for_test(&self) {
2721        let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2722        g.clear();
2723    }
2724}
2725
2726static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2727
2728/// Return the process-wide `CardEventBus` singleton, initializing it
2729/// on the first call from the `ALC_CARD_SINKS` environment variable.
2730pub fn event_bus() -> &'static CardEventBus {
2731    CARD_EVENT_BUS.get_or_init(|| {
2732        let subs = load_subscribers_from_env();
2733        CardEventBus::new(subs)
2734    })
2735}
2736
2737/// Eagerly initialize the bus. Idempotent and safe to call multiple
2738/// times; intended for startup hooks (`main.rs`) so that subscriber
2739/// registration `tracing::info!` lines are emitted at boot rather than
2740/// on the first Card write.
2741pub fn init_event_bus() {
2742    let bus = event_bus();
2743    let uris = bus.subscriber_uris();
2744    if uris.is_empty() {
2745        tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2746    } else {
2747        for uri in &uris {
2748            tracing::info!(subscriber = %uri, "card sink registered");
2749        }
2750    }
2751}
2752
2753/// Install a test-built bus. Fails once the singleton is already set.
2754#[cfg(any(test, feature = "test-support"))]
2755pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
2756    CARD_EVENT_BUS
2757        .set(bus)
2758        .map_err(|_| "bus already initialized".to_string())
2759}
2760
2761/// Convenience wrapper: publish through the singleton.
2762pub fn publish(ev: CardEvent) {
2763    // In test builds, serialize publishing with the subscriber-test mutex so
2764    // that default-store tests running in parallel cannot inject events into
2765    // a subscriber test's mock while the mock is active.  The owning test
2766    // thread sets INSIDE_BUS_TEST to true so it does NOT try to acquire the
2767    // same lock it already holds (re-entrancy safe).
2768    #[cfg(test)]
2769    {
2770        let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2771        if !is_test_owner {
2772            // Block until any active subscriber test releases the lock.
2773            let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2774            event_bus().publish(&ev);
2775            return;
2776        }
2777    }
2778    event_bus().publish(&ev);
2779}
2780
2781/// Mutex used in tests to serialise subscriber-test setup against concurrent
2782/// publishes from default-store tests running in parallel.
2783#[cfg(test)]
2784fn bus_test_gate() -> &'static Mutex<()> {
2785    static GATE: OnceLock<Mutex<()>> = OnceLock::new();
2786    GATE.get_or_init(|| Mutex::new(()))
2787}
2788
2789// Thread-local flag: set to `true` by the thread running inside
2790// `with_bus_subscribers` so that `publish` skips the gate (re-entrancy).
2791#[cfg(test)]
2792thread_local! {
2793    static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
2794}
2795
2796// ─── alc_card_sink_backfill ────────────────────────────────────
2797
2798/// Result of a [`card_sink_backfill`] run. One row per card the tool
2799/// touched (classified as pushed / skipped / failed / pushed_samples).
2800/// The `failed` entries carry the error message so an operator can
2801/// triage read-only mounts etc.
2802#[derive(Debug, Clone, Default, Serialize)]
2803pub struct SinkBackfillReport {
2804    pub sink: String,
2805    pub pushed: Vec<String>,
2806    pub skipped: Vec<String>,
2807    pub failed: Vec<(String, String)>,
2808    pub pushed_samples: Vec<String>,
2809}
2810
2811/// Backfill one subscriber (`sink` URI) from the primary store.
2812///
2813/// Steps:
2814/// 1. Look up the target subscriber on the event bus; fail fast if
2815///    the URI is not registered so the caller gets an immediate
2816///    error rather than a silent no-op.
2817/// 2. Enumerate every `(pkg, card_id)` pair from the default
2818///    [`CardStore`].
2819/// 3. For each pair, ask the subscriber whether it already has that
2820///    card (`CardSubscriber::has_card`). If yes → skipped (drift-safe,
2821///    no overwrite). If no → read the primary TOML and `publish_to`
2822///    the one target sink. Samples are mirrored the same way.
2823/// 4. `dry_run = true` short-circuits step 3: the report lists
2824///    what would have been pushed but no `publish_to` is issued,
2825///    so `SubscriberStats` does not increment.
2826///
2827/// `card_sink_backfill` operates with an injectable [`CardStore`]; tests
2828/// drive this directly against a tempdir-backed [`FileCardStore`] so they
2829/// never touch the user's real cards directory.
2830pub fn card_sink_backfill_with_store(
2831    store: &dyn CardStore,
2832    sink: &str,
2833    dry_run: bool,
2834) -> Result<SinkBackfillReport, String> {
2835    let bus = event_bus();
2836    let sub = bus
2837        .find_subscriber(sink)
2838        .ok_or_else(|| format!("unknown sink: {sink}"))?;
2839
2840    let locators = store.list_card_locators(None)?;
2841
2842    let mut report = SinkBackfillReport {
2843        sink: sink.to_string(),
2844        ..Default::default()
2845    };
2846
2847    for (pkg, locator) in locators {
2848        let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2849            Some(s) => s.to_string(),
2850            None => continue,
2851        };
2852
2853        match sub.has_card(&card_id) {
2854            Ok(true) => {
2855                report.skipped.push(card_id);
2856                continue;
2857            }
2858            Ok(false) => {}
2859            Err(e) => {
2860                tracing::warn!(
2861                    card_id = %card_id,
2862                    error = %e,
2863                    "backfill: has_card failed; treating as skipped"
2864                );
2865                report.skipped.push(card_id);
2866                continue;
2867            }
2868        }
2869
2870        let toml_text = match store.read_locator_text(&locator) {
2871            Ok(Some(t)) => t,
2872            Ok(None) => {
2873                // Unreadable / corrupt on primary. Do not panic; skip.
2874                report.skipped.push(card_id);
2875                continue;
2876            }
2877            Err(e) => {
2878                tracing::warn!(
2879                    card_id = %card_id,
2880                    error = %e,
2881                    "backfill: read_locator_text failed; treating as skipped"
2882                );
2883                report.skipped.push(card_id);
2884                continue;
2885            }
2886        };
2887
2888        if dry_run {
2889            report.pushed.push(card_id.clone());
2890            if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2891                report.pushed_samples.push(card_id);
2892            }
2893            continue;
2894        }
2895
2896        let ev = CardEvent::Created {
2897            pkg: pkg.clone(),
2898            card_id: card_id.clone(),
2899            toml_text,
2900        };
2901        match bus.publish_to(sink, &ev) {
2902            Ok(()) => report.pushed.push(card_id.clone()),
2903            Err(e) => {
2904                report.failed.push((card_id, e));
2905                continue;
2906            }
2907        }
2908
2909        if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2910            let ev = CardEvent::SamplesWritten {
2911                card_id: card_id.clone(),
2912                jsonl_text,
2913            };
2914            match bus.publish_to(sink, &ev) {
2915                Ok(()) => report.pushed_samples.push(card_id),
2916                Err(e) => {
2917                    report.failed.push((card_id, format!("samples: {e}")));
2918                }
2919            }
2920        }
2921    }
2922
2923    Ok(report)
2924}
2925
2926// ─── ALC_CARD_SINKS env parser ─────────────────────────────────
2927
2928/// Read `ALC_CARD_SINKS` and build one subscriber per accepted spec.
2929/// Malformed entries are logged and skipped; duplicate URIs are
2930/// first-wins. Non-UTF8 values reject the whole env.
2931fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2932    let raw = match std::env::var("ALC_CARD_SINKS") {
2933        Ok(v) => v,
2934        Err(std::env::VarError::NotPresent) => return Vec::new(),
2935        Err(std::env::VarError::NotUnicode(_)) => {
2936            tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2937            return Vec::new();
2938        }
2939    };
2940    parse_subscribers_from_str(&raw)
2941}
2942
2943/// Parse a `|`-separated list of subscriber URIs (the same format used
2944/// by `ALC_CARD_SINKS`). Extracted so tests can exercise the parser
2945/// without touching process environment.
2946fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2947    if raw.is_empty() {
2948        return Vec::new();
2949    }
2950    let mut seen: HashSet<String> = HashSet::new();
2951    let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2952    for spec in raw.split('|') {
2953        let spec = spec.trim();
2954        if spec.is_empty() {
2955            continue;
2956        }
2957        let Some(sub) = parse_subscriber_spec(spec) else {
2958            continue;
2959        };
2960        let uri = sub.describe();
2961        if !seen.insert(uri.clone()) {
2962            tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2963            continue;
2964        }
2965        out.push(sub);
2966    }
2967    out
2968}
2969
2970/// Parse one subscriber spec. v1 only accepts `file:///absolute/path`.
2971fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2972    // scheme required
2973    let Some(colon_idx) = spec.find(':') else {
2974        tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2975        return None;
2976    };
2977    let scheme = &spec[..colon_idx];
2978    let rest = &spec[colon_idx + 1..];
2979    if scheme != "file" {
2980        tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
2981        return None;
2982    }
2983    // scheme-specific: must start with `//`
2984    let Some(after_slash) = rest.strip_prefix("//") else {
2985        tracing::error!(spec, "file URI missing '//'");
2986        return None;
2987    };
2988    // split authority / path. path begins with '/'.
2989    let Some(path_start) = after_slash.find('/') else {
2990        tracing::error!(spec, "file URI has no path component");
2991        return None;
2992    };
2993    let authority = &after_slash[..path_start];
2994    let encoded_path = &after_slash[path_start..];
2995    if !authority.is_empty() {
2996        tracing::error!(
2997            spec,
2998            authority,
2999            "file URI with non-empty authority is rejected"
3000        );
3001        return None;
3002    }
3003    let path = decode_file_uri_path(encoded_path)?;
3004    Some(Arc::new(FileCardSubscriber::new(path)))
3005}
3006
3007/// Decode the path portion of a `file://` URI into a `PathBuf`.
3008///
3009/// Unix: a leading `/` is preserved (absolute path).
3010/// Windows: the leading `/` is stripped so that `/C:/a/b` becomes
3011/// `C:/a/b`.
3012fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3013    let decoded = percent_decode(encoded)?;
3014    #[cfg(windows)]
3015    {
3016        // Strip the leading slash so "/C:/foo" -> "C:/foo".
3017        let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3018        Some(PathBuf::from(trimmed))
3019    }
3020    #[cfg(not(windows))]
3021    {
3022        Some(PathBuf::from(decoded))
3023    }
3024}
3025
3026/// Percent-decode a URI path segment. Returns `None` on invalid or
3027/// truncated `%XX` sequences.
3028fn percent_decode(src: &str) -> Option<String> {
3029    let bytes = src.as_bytes();
3030    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3031    let mut i = 0;
3032    while i < bytes.len() {
3033        let b = bytes[i];
3034        if b == b'%' {
3035            if i + 2 >= bytes.len() {
3036                tracing::error!(src, "percent-encoded sequence truncated");
3037                return None;
3038            }
3039            let hi = (bytes[i + 1] as char).to_digit(16);
3040            let lo = (bytes[i + 2] as char).to_digit(16);
3041            match (hi, lo) {
3042                (Some(h), Some(l)) => {
3043                    out.push(((h << 4) | l) as u8);
3044                    i += 3;
3045                }
3046                _ => {
3047                    tracing::error!(src, "percent-encoded sequence has non-hex digits");
3048                    return None;
3049                }
3050            }
3051        } else {
3052            out.push(b);
3053            i += 1;
3054        }
3055    }
3056    match String::from_utf8(out) {
3057        Ok(s) => Some(s),
3058        Err(_) => {
3059            tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3060            None
3061        }
3062    }
3063}
3064
3065#[cfg(test)]
3066mod tests {
3067    use super::*;
3068
3069    #[test]
3070    fn minimum_valid_card() {
3071        let tmp = tempfile::tempdir().unwrap();
3072        let store = FileCardStore::new(tmp.path().to_path_buf());
3073        let pkg = "minimum_valid_pkg";
3074        let input = json!({ "pkg": { "name": pkg } });
3075        let (id, path) = create_with_store(&store, input).unwrap();
3076        assert!(path.exists());
3077        assert!(id.starts_with(pkg));
3078
3079        let got = get_with_store(&store, &id).unwrap().unwrap();
3080        assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
3081        assert_eq!(got["card_id"], json!(id));
3082        assert_eq!(got["pkg"]["name"], json!(pkg));
3083        assert!(got.get("created_at").is_some());
3084        assert!(got.get("created_by").is_some());
3085    }
3086
3087    #[test]
3088    fn create_rejects_missing_pkg_name() {
3089        let tmp = tempfile::tempdir().unwrap();
3090        let store = FileCardStore::new(tmp.path().to_path_buf());
3091        let err = create_with_store(&store, json!({})).unwrap_err();
3092        assert!(err.contains("pkg.name"));
3093    }
3094
3095    #[test]
3096    fn create_is_immutable() {
3097        let tmp = tempfile::tempdir().unwrap();
3098        let store = FileCardStore::new(tmp.path().to_path_buf());
3099        let pkg = "immutable_pkg";
3100        let input = json!({
3101            "card_id": "fixed_id_001",
3102            "pkg": { "name": pkg }
3103        });
3104        create_with_store(&store, input.clone()).unwrap();
3105        let err = create_with_store(&store, input).unwrap_err();
3106        assert!(err.contains("already exists"));
3107    }
3108
3109    #[test]
3110    fn create_injects_param_fingerprint() {
3111        let tmp = tempfile::tempdir().unwrap();
3112        let store = FileCardStore::new(tmp.path().to_path_buf());
3113        let pkg = "fingerprint_pkg";
3114        let input = json!({
3115            "pkg": { "name": pkg },
3116            "params": { "depth": 3, "temperature": 0.0 }
3117        });
3118        let (id, _) = create_with_store(&store, input).unwrap();
3119        let got = get_with_store(&store, &id).unwrap().unwrap();
3120        assert!(got["param_fingerprint"].is_string());
3121    }
3122
3123    #[test]
3124    fn list_returns_newest_first() {
3125        let tmp = tempfile::tempdir().unwrap();
3126        let store = FileCardStore::new(tmp.path().to_path_buf());
3127        let pkg = "list_newest_pkg";
3128        let (id1, _) = create_with_store(
3129            &store,
3130            json!({
3131                "card_id": format!("{pkg}_a"),
3132                "pkg": { "name": pkg },
3133                "created_at": "2025-01-01T00:00:00Z"
3134            }),
3135        )
3136        .unwrap();
3137        let (id2, _) = create_with_store(
3138            &store,
3139            json!({
3140                "card_id": format!("{pkg}_b"),
3141                "pkg": { "name": pkg },
3142                "created_at": "2026-01-01T00:00:00Z"
3143            }),
3144        )
3145        .unwrap();
3146
3147        let rows = list_with_store(&store, Some(pkg)).unwrap();
3148        assert_eq!(rows.len(), 2);
3149        assert_eq!(rows[0].card_id, id2); // newer first
3150        assert_eq!(rows[1].card_id, id1);
3151    }
3152
3153    #[test]
3154    fn list_extracts_summary_fields() {
3155        let tmp = tempfile::tempdir().unwrap();
3156        let store = FileCardStore::new(tmp.path().to_path_buf());
3157        let pkg = "list_summary_pkg";
3158        let (id, _) = create_with_store(
3159            &store,
3160            json!({
3161                "pkg": { "name": pkg },
3162                "model": { "id": "claude-opus-4-6" },
3163                "scenario": { "name": "gsm8k_sample100" },
3164                "stats": { "pass_rate": 0.82 }
3165            }),
3166        )
3167        .unwrap();
3168
3169        let rows = list_with_store(&store, Some(pkg)).unwrap();
3170        let row = rows.iter().find(|r| r.card_id == id).unwrap();
3171        assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
3172        assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
3173        assert_eq!(row.pass_rate, Some(0.82));
3174    }
3175
3176    #[test]
3177    fn get_missing_returns_none() {
3178        let tmp = tempfile::tempdir().unwrap();
3179        let store = FileCardStore::new(tmp.path().to_path_buf());
3180        assert!(get_with_store(&store, "does_not_exist_xyz")
3181            .unwrap()
3182            .is_none());
3183    }
3184
3185    #[test]
3186    fn card_id_embeds_compact_timestamp() {
3187        let tmp = tempfile::tempdir().unwrap();
3188        let store = FileCardStore::new(tmp.path().to_path_buf());
3189        let pkg = "ts_embed_pkg";
3190        let (id, _) = create_with_store(&store, json!({ "pkg": { "name": pkg } })).unwrap();
3191        // Expect: {pkg}_{model}_{YYYYMMDDTHHMMSS}_{hash6}
3192        // After removing the pkg prefix, there should be a segment
3193        // containing 'T' separating date and time.
3194        let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
3195        let parts: Vec<&str> = tail.split('_').collect();
3196        // parts = [model_short, YYYYMMDDTHHMMSS, hash6]
3197        assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
3198        let ts = parts[1];
3199        assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
3200        assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
3201    }
3202
3203    #[test]
3204    fn now_compact_format() {
3205        let s = now_compact();
3206        assert_eq!(s.len(), 15);
3207        assert_eq!(s.chars().nth(8), Some('T'));
3208        // All other positions are digits
3209        for (i, c) in s.chars().enumerate() {
3210            if i != 8 {
3211                assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3212            }
3213        }
3214    }
3215
3216    #[test]
3217    fn short_model_variants() {
3218        assert_eq!(short_model("claude-opus-4-6"), "opus46");
3219        assert_eq!(short_model("gpt-4o"), "4o");
3220        assert_eq!(short_model(""), "model");
3221    }
3222
3223    #[test]
3224    fn two_cards_same_second_different_stats_get_distinct_ids() {
3225        let tmp = tempfile::tempdir().unwrap();
3226        let store = FileCardStore::new(tmp.path().to_path_buf());
3227        let pkg = "distinct_ids_pkg";
3228        let input1 = json!({
3229            "pkg": { "name": pkg },
3230            "scenario": { "name": "gsm8k" },
3231            "stats": { "pass_rate": 0.4 }
3232        });
3233        let input2 = json!({
3234            "pkg": { "name": pkg },
3235            "scenario": { "name": "gsm8k" },
3236            "stats": { "pass_rate": 0.9 }
3237        });
3238        let (id1, _) = create_with_store(&store, input1).unwrap();
3239        let (id2, _) = create_with_store(&store, input2).unwrap();
3240        assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
3241    }
3242
3243    // ─── P1: append ────────────────────────────────────────────
3244
3245    #[test]
3246    fn append_adds_new_fields() {
3247        let tmp = tempfile::tempdir().unwrap();
3248        let store = FileCardStore::new(tmp.path().to_path_buf());
3249        let pkg = "append_new_fields_pkg";
3250        let (id, _) = create_with_store(
3251            &store,
3252            json!({
3253                "pkg": { "name": pkg },
3254                "stats": { "pass_rate": 0.5 }
3255            }),
3256        )
3257        .unwrap();
3258
3259        let merged = append_with_store(
3260            &store,
3261            &id,
3262            json!({
3263                "caveats": { "notes": "rescored after fix" },
3264                "metadata": { "reviewer": "yn" }
3265            }),
3266        )
3267        .unwrap();
3268        assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
3269        assert_eq!(merged["metadata"]["reviewer"], json!("yn"));
3270
3271        // Persisted
3272        let got = get_with_store(&store, &id).unwrap().unwrap();
3273        assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
3274        // Existing field untouched
3275        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3276    }
3277
3278    #[test]
3279    fn append_rejects_existing_key() {
3280        let tmp = tempfile::tempdir().unwrap();
3281        let store = FileCardStore::new(tmp.path().to_path_buf());
3282        let pkg = "append_reject_key_pkg";
3283        let (id, _) = create_with_store(
3284            &store,
3285            json!({
3286                "pkg": { "name": pkg },
3287                "stats": { "pass_rate": 0.5 }
3288            }),
3289        )
3290        .unwrap();
3291
3292        let err =
3293            append_with_store(&store, &id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
3294        assert!(err.contains("already set"), "got: {err}");
3295        // Verify original value still there
3296        let got = get_with_store(&store, &id).unwrap().unwrap();
3297        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3298    }
3299
3300    #[test]
3301    fn append_errors_on_missing_card() {
3302        let tmp = tempfile::tempdir().unwrap();
3303        let store = FileCardStore::new(tmp.path().to_path_buf());
3304        let err = append_with_store(&store, "does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
3305        assert!(err.contains("not found"));
3306    }
3307
3308    // ─── P1: alias_set / alias_list ────────────────────────────
3309
3310    #[test]
3311    fn alias_set_and_list_roundtrip() {
3312        let tmp = tempfile::tempdir().unwrap();
3313        let store = FileCardStore::new(tmp.path().to_path_buf());
3314        let pkg = "alias_roundtrip_pkg";
3315        let (id, _) = create_with_store(&store, json!({ "pkg": { "name": pkg } })).unwrap();
3316
3317        let alias_name = "test_alias_roundtrip";
3318        alias_set_with_store(&store, alias_name, &id, Some(pkg), Some("smoke")).unwrap();
3319
3320        let rows = alias_list_with_store(&store, Some(pkg)).unwrap();
3321        let a = rows.iter().find(|a| a.name == alias_name).unwrap();
3322        assert_eq!(a.card_id, id);
3323        assert_eq!(a.pkg.as_deref(), Some(pkg));
3324        assert_eq!(a.note.as_deref(), Some("smoke"));
3325        assert!(!a.set_at.is_empty());
3326
3327        // Rebind to a new card
3328        let (id2, _) = create_with_store(
3329            &store,
3330            json!({
3331                "card_id": format!("{pkg}_b"),
3332                "pkg": { "name": pkg }
3333            }),
3334        )
3335        .unwrap();
3336        alias_set_with_store(&store, alias_name, &id2, Some(pkg), None).unwrap();
3337        let rows = alias_list_with_store(&store, Some(pkg)).unwrap();
3338        let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
3339        assert_eq!(matching.len(), 1, "alias should be unique by name");
3340        assert_eq!(matching[0].card_id, id2);
3341    }
3342
3343    #[test]
3344    fn alias_set_rejects_unknown_card() {
3345        let tmp = tempfile::tempdir().unwrap();
3346        let store = FileCardStore::new(tmp.path().to_path_buf());
3347        let err = alias_set_with_store(&store, "x", "does_not_exist_xyz", None, None).unwrap_err();
3348        assert!(err.contains("not found"));
3349    }
3350
3351    // ─── find + where DSL ───────────────────────────────────────
3352
3353    fn where_from(v: Json) -> Predicate {
3354        parse_where(&v).expect("parse where")
3355    }
3356
3357    fn order_from(v: Json) -> Vec<OrderKey> {
3358        parse_order_by(&v).expect("parse order_by")
3359    }
3360
3361    #[test]
3362    fn find_where_nested_eq_and_gte() {
3363        let tmp = tempfile::tempdir().unwrap();
3364        let store = FileCardStore::new(tmp.path().to_path_buf());
3365        let pkg = "find_nested_eq_pkg";
3366        create_with_store(
3367            &store,
3368            json!({
3369                "card_id": format!("{pkg}_low"),
3370                "pkg": { "name": pkg },
3371                "scenario": { "name": "gsm8k" },
3372                "stats": { "pass_rate": 0.4 }
3373            }),
3374        )
3375        .unwrap();
3376        create_with_store(
3377            &store,
3378            json!({
3379                "card_id": format!("{pkg}_high"),
3380                "pkg": { "name": pkg },
3381                "scenario": { "name": "gsm8k" },
3382                "stats": { "pass_rate": 0.9 }
3383            }),
3384        )
3385        .unwrap();
3386        create_with_store(
3387            &store,
3388            json!({
3389                "card_id": format!("{pkg}_other"),
3390                "pkg": { "name": pkg },
3391                "scenario": { "name": "other" },
3392                "stats": { "pass_rate": 1.0 }
3393            }),
3394        )
3395        .unwrap();
3396
3397        // scenario eq via nested object
3398        let rows = find_with_store(
3399            &store,
3400            FindQuery {
3401                pkg: Some(pkg.to_string()),
3402                where_: Some(where_from(json!({
3403                    "scenario": { "name": "gsm8k" },
3404                }))),
3405                order_by: order_from(json!("-stats.pass_rate")),
3406                ..Default::default()
3407            },
3408        )
3409        .unwrap();
3410        assert_eq!(rows.len(), 2);
3411        assert_eq!(rows[0].pass_rate, Some(0.9));
3412        assert_eq!(rows[1].pass_rate, Some(0.4));
3413
3414        // gte operator
3415        let rows = find_with_store(
3416            &store,
3417            FindQuery {
3418                pkg: Some(pkg.to_string()),
3419                where_: Some(where_from(json!({
3420                    "stats": { "pass_rate": { "gte": 0.8 } },
3421                }))),
3422                order_by: order_from(json!("-stats.pass_rate")),
3423                ..Default::default()
3424            },
3425        )
3426        .unwrap();
3427        assert_eq!(rows.len(), 2);
3428        assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));
3429
3430        // limit
3431        let rows = find_with_store(
3432            &store,
3433            FindQuery {
3434                pkg: Some(pkg.to_string()),
3435                order_by: order_from(json!("-stats.pass_rate")),
3436                limit: Some(1),
3437                ..Default::default()
3438            },
3439        )
3440        .unwrap();
3441        assert_eq!(rows.len(), 1);
3442        assert_eq!(rows[0].pass_rate, Some(1.0));
3443    }
3444
3445    #[test]
3446    fn find_where_implicit_eq_and_logical() {
3447        let tmp = tempfile::tempdir().unwrap();
3448        let store = FileCardStore::new(tmp.path().to_path_buf());
3449        let pkg = "find_implicit_eq_pkg";
3450        create_with_store(
3451            &store,
3452            json!({
3453                "card_id": format!("{pkg}_a"),
3454                "pkg": { "name": pkg },
3455                "model": { "id": "claude-opus-4-6" },
3456                "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
3457            }),
3458        )
3459        .unwrap();
3460        create_with_store(
3461            &store,
3462            json!({
3463                "card_id": format!("{pkg}_b"),
3464                "pkg": { "name": pkg },
3465                "model": { "id": "claude-opus-4-6" },
3466                "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
3467            }),
3468        )
3469        .unwrap();
3470        create_with_store(
3471            &store,
3472            json!({
3473                "card_id": format!("{pkg}_c"),
3474                "pkg": { "name": pkg },
3475                "model": { "id": "claude-haiku-4-5-20251001" },
3476                "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
3477            }),
3478        )
3479        .unwrap();
3480
3481        // implicit eq on sparse stats field
3482        let rows = find_with_store(
3483            &store,
3484            FindQuery {
3485                pkg: Some(pkg.to_string()),
3486                where_: Some(where_from(json!({
3487                    "stats": { "equilibrium_position": "dead" },
3488                }))),
3489                ..Default::default()
3490            },
3491        )
3492        .unwrap();
3493        assert_eq!(rows.len(), 1);
3494        assert!(rows[0].card_id.ends_with("_a"));
3495
3496        // _or
3497        let rows = find_with_store(
3498            &store,
3499            FindQuery {
3500                pkg: Some(pkg.to_string()),
3501                where_: Some(where_from(json!({
3502                    "_or": [
3503                        { "stats": { "equilibrium_position": "dead" } },
3504                        { "stats": { "survival_rate": { "gte": 0.9 } } },
3505                    ],
3506                }))),
3507                ..Default::default()
3508            },
3509        )
3510        .unwrap();
3511        assert_eq!(rows.len(), 2);
3512
3513        // _not
3514        let rows = find_with_store(
3515            &store,
3516            FindQuery {
3517                pkg: Some(pkg.to_string()),
3518                where_: Some(where_from(json!({
3519                    "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
3520                }))),
3521                ..Default::default()
3522            },
3523        )
3524        .unwrap();
3525        assert_eq!(rows.len(), 2);
3526
3527        // in operator
3528        let rows = find_with_store(
3529            &store,
3530            FindQuery {
3531                pkg: Some(pkg.to_string()),
3532                where_: Some(where_from(json!({
3533                    "stats": {
3534                        "equilibrium_position": { "in": ["dead", "fragile"] },
3535                    },
3536                }))),
3537                ..Default::default()
3538            },
3539        )
3540        .unwrap();
3541        assert_eq!(rows.len(), 2);
3542
3543        // exists false (sparse field missing on haiku card? all have it, so test on
3544        // a field that only some have)
3545        let rows = find_with_store(
3546            &store,
3547            FindQuery {
3548                pkg: Some(pkg.to_string()),
3549                where_: Some(where_from(json!({
3550                    "strategy_params": { "temperature": { "exists": false } },
3551                }))),
3552                ..Default::default()
3553            },
3554        )
3555        .unwrap();
3556        assert_eq!(rows.len(), 3, "none of the cards have strategy_params");
3557    }
3558
3559    #[test]
3560    fn find_order_by_multi_key() {
3561        let tmp = tempfile::tempdir().unwrap();
3562        let store = FileCardStore::new(tmp.path().to_path_buf());
3563        let pkg = "find_order_multi_pkg";
3564        create_with_store(
3565            &store,
3566            json!({
3567                "card_id": format!("{pkg}_a"),
3568                "pkg": { "name": pkg },
3569                "stats": { "pass_rate": 0.5 }
3570            }),
3571        )
3572        .unwrap();
3573        create_with_store(
3574            &store,
3575            json!({
3576                "card_id": format!("{pkg}_b"),
3577                "pkg": { "name": pkg },
3578                "stats": { "pass_rate": 0.9 }
3579            }),
3580        )
3581        .unwrap();
3582        create_with_store(
3583            &store,
3584            json!({
3585                "card_id": format!("{pkg}_c"),
3586                "pkg": { "name": pkg },
3587                "stats": { "pass_rate": 0.9 }
3588            }),
3589        )
3590        .unwrap();
3591
3592        let rows = find_with_store(
3593            &store,
3594            FindQuery {
3595                pkg: Some(pkg.to_string()),
3596                order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
3597                ..Default::default()
3598            },
3599        )
3600        .unwrap();
3601        assert_eq!(rows.len(), 3);
3602        assert_eq!(rows[0].pass_rate, Some(0.9));
3603        assert_eq!(rows[1].pass_rate, Some(0.9));
3604        assert_eq!(rows[2].pass_rate, Some(0.5));
3605        // Tiebreak by card_id ascending
3606        assert!(rows[0].card_id < rows[1].card_id);
3607    }
3608
3609    #[test]
3610    fn find_offset_and_limit() {
3611        let tmp = tempfile::tempdir().unwrap();
3612        let store = FileCardStore::new(tmp.path().to_path_buf());
3613        let pkg = "find_offset_limit_pkg";
3614        for i in 0..5 {
3615            create_with_store(
3616                &store,
3617                json!({
3618                    "card_id": format!("{pkg}_{i}"),
3619                    "pkg": { "name": pkg },
3620                    "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
3621                }),
3622            )
3623            .unwrap();
3624        }
3625
3626        let rows = find_with_store(
3627            &store,
3628            FindQuery {
3629                pkg: Some(pkg.to_string()),
3630                order_by: order_from(json!("-stats.pass_rate")),
3631                offset: Some(1),
3632                limit: Some(2),
3633                ..Default::default()
3634            },
3635        )
3636        .unwrap();
3637        assert_eq!(rows.len(), 2);
3638        // Best is 0.5, after offset=1 we start at 0.4 then 0.3.
3639        let pr0 = rows[0].pass_rate.unwrap();
3640        let pr1 = rows[1].pass_rate.unwrap();
3641        assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
3642        assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");
3643    }
3644
3645    #[test]
3646    fn parse_where_rejects_non_object() {
3647        assert!(parse_where(&json!("not an object")).is_err());
3648        assert!(parse_where(&json!(42)).is_err());
3649    }
3650
3651    #[test]
3652    fn parse_order_by_accepts_string_and_array() {
3653        let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3654        assert_eq!(k.len(), 1);
3655        assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3656        assert!(k[0].desc);
3657
3658        let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3659        assert_eq!(k.len(), 2);
3660        assert!(!k[0].desc);
3661        assert!(k[1].desc);
3662    }
3663
3664    #[test]
3665    fn find_where_string_ops_contains_and_starts_with() {
3666        let tmp = tempfile::tempdir().unwrap();
3667        let store = FileCardStore::new(tmp.path().to_path_buf());
3668        let pkg = "find_string_ops_pkg";
3669        create_with_store(
3670            &store,
3671            json!({
3672                "card_id": format!("{pkg}_a"),
3673                "pkg": { "name": pkg },
3674                "model": { "id": "claude-opus-4-6" },
3675                "metadata": { "tag": "experiment_alpha" },
3676            }),
3677        )
3678        .unwrap();
3679        create_with_store(
3680            &store,
3681            json!({
3682                "card_id": format!("{pkg}_b"),
3683                "pkg": { "name": pkg },
3684                "model": { "id": "claude-haiku-4-5-20251001" },
3685                "metadata": { "tag": "experiment_beta" },
3686            }),
3687        )
3688        .unwrap();
3689        create_with_store(
3690            &store,
3691            json!({
3692                "card_id": format!("{pkg}_c"),
3693                "pkg": { "name": pkg },
3694                "model": { "id": "claude-sonnet-4-5" },
3695                "metadata": { "tag": "baseline" },
3696            }),
3697        )
3698        .unwrap();
3699
3700        // contains: matches substring anywhere
3701        let rows = find_with_store(
3702            &store,
3703            FindQuery {
3704                pkg: Some(pkg.to_string()),
3705                where_: Some(where_from(json!({
3706                    "metadata": { "tag": { "contains": "experiment" } },
3707                }))),
3708                ..Default::default()
3709            },
3710        )
3711        .unwrap();
3712        assert_eq!(rows.len(), 2);
3713
3714        // starts_with: matches only the prefix
3715        let rows = find_with_store(
3716            &store,
3717            FindQuery {
3718                pkg: Some(pkg.to_string()),
3719                where_: Some(where_from(json!({
3720                    "model": { "id": { "starts_with": "claude-opus" } },
3721                }))),
3722                ..Default::default()
3723            },
3724        )
3725        .unwrap();
3726        assert_eq!(rows.len(), 1);
3727        assert!(rows[0].card_id.ends_with("_a"));
3728
3729        // string ops on missing field → false
3730        let rows = find_with_store(
3731            &store,
3732            FindQuery {
3733                pkg: Some(pkg.to_string()),
3734                where_: Some(where_from(json!({
3735                    "metadata": { "missing_field": { "contains": "x" } },
3736                }))),
3737                ..Default::default()
3738            },
3739        )
3740        .unwrap();
3741        assert_eq!(rows.len(), 0);
3742
3743        // string ops on non-string field → false
3744        let rows = find_with_store(
3745            &store,
3746            FindQuery {
3747                pkg: Some(pkg.to_string()),
3748                where_: Some(where_from(json!({
3749                    "metadata": { "tag": { "starts_with": 42 } },
3750                }))),
3751                ..Default::default()
3752            },
3753        )
3754        .unwrap();
3755        assert_eq!(rows.len(), 0);
3756    }
3757
3758    #[test]
3759    fn where_missing_field_ne_is_true() {
3760        let tmp = tempfile::tempdir().unwrap();
3761        let store = FileCardStore::new(tmp.path().to_path_buf());
3762        let pkg = "where_missing_ne_pkg";
3763        create_with_store(
3764            &store,
3765            json!({
3766                "card_id": format!("{pkg}_x"),
3767                "pkg": { "name": pkg },
3768            }),
3769        )
3770        .unwrap();
3771
3772        let rows = find_with_store(
3773            &store,
3774            FindQuery {
3775                pkg: Some(pkg.to_string()),
3776                where_: Some(where_from(json!({
3777                    "strategy_params": { "temperature": { "ne": 0.5 } },
3778                }))),
3779                ..Default::default()
3780            },
3781        )
3782        .unwrap();
3783        assert_eq!(rows.len(), 1, "missing field is ne to anything");
3784    }
3785
3786    // ─── lineage ───────────────────────────────────────────────
3787
3788    /// Helper: create a child Card pointing at a parent with a relation.
3789    fn create_child(
3790        store: &FileCardStore,
3791        pkg: &str,
3792        suffix: &str,
3793        parent_id: &str,
3794        relation: &str,
3795    ) -> String {
3796        let (id, _) = create_with_store(
3797            store,
3798            json!({
3799                "card_id": format!("{pkg}_{suffix}"),
3800                "pkg": { "name": pkg },
3801                "stats": { "pass_rate": 0.5 },
3802                "metadata": {
3803                    "prior_card_id": parent_id,
3804                    "prior_relation": relation,
3805                },
3806            }),
3807        )
3808        .unwrap();
3809        id
3810    }
3811
3812    #[test]
3813    fn lineage_up_walks_prior_card_id_chain() {
3814        let tmp = tempfile::tempdir().unwrap();
3815        let store = FileCardStore::new(tmp.path().to_path_buf());
3816        let pkg = "lineage_up_pkg";
3817        // a → b → c (c is newest; b points at a; c points at b)
3818        let (a, _) = create_with_store(
3819            &store,
3820            json!({
3821                "card_id": format!("{pkg}_a"),
3822                "pkg": { "name": pkg },
3823            }),
3824        )
3825        .unwrap();
3826        let b = create_child(&store, pkg, "b", &a, "rerun_of");
3827        let c = create_child(&store, pkg, "c", &b, "rerun_of");
3828
3829        let res = lineage_with_store(
3830            &store,
3831            LineageQuery {
3832                card_id: c.clone(),
3833                direction: LineageDirection::Up,
3834                depth: None,
3835                include_stats: false,
3836                relation_filter: None,
3837            },
3838        )
3839        .unwrap()
3840        .expect("lineage result");
3841
3842        assert_eq!(res.root, c);
3843        assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
3844        assert_eq!(res.nodes[0].card_id, c);
3845        assert_eq!(res.nodes[0].depth, 0);
3846        assert_eq!(res.nodes[1].card_id, b);
3847        assert_eq!(res.nodes[1].depth, -1);
3848        assert_eq!(res.nodes[2].card_id, a);
3849        assert_eq!(res.nodes[2].depth, -2);
3850        assert_eq!(res.edges.len(), 2);
3851        assert!(!res.truncated);
3852    }
3853
3854    #[test]
3855    fn lineage_down_walks_descendants_breadth_first() {
3856        let tmp = tempfile::tempdir().unwrap();
3857        let store = FileCardStore::new(tmp.path().to_path_buf());
3858        let pkg = "lineage_down_pkg";
3859        // a has two children b, c; c has one child d.
3860        let (a, _) = create_with_store(
3861            &store,
3862            json!({
3863                "card_id": format!("{pkg}_a"),
3864                "pkg": { "name": pkg },
3865            }),
3866        )
3867        .unwrap();
3868        let _b = create_child(&store, pkg, "b", &a, "sweep_variant");
3869        let c = create_child(&store, pkg, "c", &a, "sweep_variant");
3870        let _d = create_child(&store, pkg, "d", &c, "rerun_of");
3871
3872        let res = lineage_with_store(
3873            &store,
3874            LineageQuery {
3875                card_id: a.clone(),
3876                direction: LineageDirection::Down,
3877                depth: None,
3878                include_stats: false,
3879                relation_filter: None,
3880            },
3881        )
3882        .unwrap()
3883        .expect("lineage result");
3884
3885        // root + b + c + d = 4 nodes
3886        assert_eq!(res.nodes.len(), 4);
3887        assert_eq!(res.edges.len(), 3);
3888        assert!(!res.truncated);
3889    }
3890
3891    #[test]
3892    fn lineage_depth_truncation_sets_flag() {
3893        let tmp = tempfile::tempdir().unwrap();
3894        let store = FileCardStore::new(tmp.path().to_path_buf());
3895        let pkg = "lineage_depth_pkg";
3896        let (a, _) = create_with_store(
3897            &store,
3898            json!({
3899                "card_id": format!("{pkg}_a"),
3900                "pkg": { "name": pkg },
3901            }),
3902        )
3903        .unwrap();
3904        let b = create_child(&store, pkg, "b", &a, "rerun_of");
3905        let _c = create_child(&store, pkg, "c", &b, "rerun_of");
3906
3907        let res = lineage_with_store(
3908            &store,
3909            LineageQuery {
3910                card_id: a,
3911                direction: LineageDirection::Down,
3912                depth: Some(1),
3913                include_stats: false,
3914                relation_filter: None,
3915            },
3916        )
3917        .unwrap()
3918        .unwrap();
3919        assert_eq!(res.nodes.len(), 2, "root + 1 level");
3920        assert!(res.truncated, "should be truncated at depth=1");
3921    }
3922
3923    #[test]
3924    fn lineage_relation_filter_skips_unlisted() {
3925        let tmp = tempfile::tempdir().unwrap();
3926        let store = FileCardStore::new(tmp.path().to_path_buf());
3927        let pkg = "lineage_filter_pkg";
3928        let (a, _) = create_with_store(
3929            &store,
3930            json!({
3931                "card_id": format!("{pkg}_a"),
3932                "pkg": { "name": pkg },
3933            }),
3934        )
3935        .unwrap();
3936        let _b = create_child(&store, pkg, "b", &a, "sweep_variant");
3937        let _c = create_child(&store, pkg, "c", &a, "rerun_of");
3938
3939        let res = lineage_with_store(
3940            &store,
3941            LineageQuery {
3942                card_id: a,
3943                direction: LineageDirection::Down,
3944                depth: None,
3945                include_stats: false,
3946                relation_filter: Some(vec!["sweep_variant".to_string()]),
3947            },
3948        )
3949        .unwrap()
3950        .unwrap();
3951        assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
3952        assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));
3953    }
3954
3955    #[test]
3956    fn lineage_missing_card_returns_none() {
3957        let tmp = tempfile::tempdir().unwrap();
3958        let store = FileCardStore::new(tmp.path().to_path_buf());
3959        let res = lineage_with_store(
3960            &store,
3961            LineageQuery {
3962                card_id: "nonexistent_card_id_xyz".into(),
3963                direction: LineageDirection::Up,
3964                depth: None,
3965                include_stats: false,
3966                relation_filter: None,
3967            },
3968        )
3969        .unwrap();
3970        assert!(res.is_none());
3971    }
3972
3973    // ─── samples sidecar ───────────────────────────────────────
3974
3975    // Isolated `FileCardStore::new(tempdir)` sidesteps the shared-root race
3976    // in `find_card_locator`; see `read_samples_empty_when_absent` for the
3977    // full root-cause write-up.
3978    #[test]
3979    fn write_and_read_samples_roundtrip() {
3980        let tmp = tempfile::tempdir().unwrap();
3981        let store = FileCardStore::new(tmp.path().to_path_buf());
3982        let (id, _) = create_with_store(
3983            &store,
3984            json!({
3985                "pkg": { "name": "roundtrip_pkg" },
3986                "stats": { "pass_rate": 0.5 }
3987            }),
3988        )
3989        .unwrap();
3990
3991        let samples = vec![
3992            json!({ "case": "c0", "passed": true, "score": 1.0 }),
3993            json!({ "case": "c1", "passed": false, "score": 0.0 }),
3994            json!({ "case": "c2", "passed": true, "score": 0.75 }),
3995        ];
3996        let path = write_samples_with_store(&store, &id, samples.clone()).unwrap();
3997        assert!(path.exists());
3998        assert!(path.to_string_lossy().ends_with(".samples.jsonl"));
3999
4000        let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4001        assert_eq!(got.len(), 3);
4002        assert_eq!(got[0]["case"], json!("c0"));
4003        assert_eq!(got[2]["score"], json!(0.75));
4004
4005        // offset + limit
4006        let slice = read_samples_with_store(
4007            &store,
4008            &id,
4009            SamplesQuery {
4010                offset: 1,
4011                limit: Some(1),
4012                where_: None,
4013            },
4014        )
4015        .unwrap();
4016        assert_eq!(slice.len(), 1);
4017        assert_eq!(slice[0]["case"], json!("c1"));
4018    }
4019
4020    #[test]
4021    fn write_samples_is_write_once() {
4022        let tmp = tempfile::tempdir().unwrap();
4023        let store = FileCardStore::new(tmp.path().to_path_buf());
4024        let (id, _) =
4025            create_with_store(&store, json!({ "pkg": { "name": "write_once_pkg" } })).unwrap();
4026        write_samples_with_store(&store, &id, vec![json!({ "x": 1 })]).unwrap();
4027        let err = write_samples_with_store(&store, &id, vec![json!({ "x": 2 })]).unwrap_err();
4028        assert!(err.contains("already exist"), "got: {err}");
4029    }
4030
4031    // Previously used `create` / `read_samples` (default `~/.algocline/cards/`
4032    // store). Under `cargo test --workspace` parallel runs, `find_card_locator`
4033    // scans the shared root with `fs::read_dir(...).flatten()` which silently
4034    // drops transient I/O errors — on macOS APFS, a concurrent `remove_dir_all`
4035    // from another test's `cleanup(pkg)` could trigger that transient error and
4036    // cause this test's just-created pkg dir entry to be missed, propagating
4037    // `card '...' not found` up through `samples_path` → `read_samples`.
4038    //
4039    // Isolating via `FileCardStore::new(tempdir)` + `_with_store` variants
4040    // sidesteps the shared-root race entirely. Same pattern as
4041    // `custom_root_file_store_roundtrip` / `test_fanout_concurrent_*`.
4042    #[test]
4043    fn read_samples_empty_when_absent() {
4044        let tmp = tempfile::tempdir().unwrap();
4045        let store = FileCardStore::new(tmp.path().to_path_buf());
4046        let (id, _) = create_with_store(
4047            &store,
4048            json!({ "pkg": { "name": "read_samples_empty_pkg" } }),
4049        )
4050        .unwrap();
4051        let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4052        assert!(got.is_empty());
4053    }
4054
4055    #[test]
4056    fn read_samples_where_filters_rows() {
4057        let tmp = tempfile::tempdir().unwrap();
4058        let store = FileCardStore::new(tmp.path().to_path_buf());
4059        let (id, _) =
4060            create_with_store(&store, json!({ "pkg": { "name": "where_filter_pkg" } })).unwrap();
4061        write_samples_with_store(
4062            &store,
4063            &id,
4064            vec![
4065                json!({ "case": "c0", "passed": true,  "score": 1.0 }),
4066                json!({ "case": "c1", "passed": false, "score": 0.0 }),
4067                json!({ "case": "c2", "passed": true,  "score": 0.25 }),
4068                json!({ "case": "c3", "passed": true,  "score": 0.75 }),
4069                json!({ "case": "c4", "passed": false, "score": 0.5 }),
4070            ],
4071        )
4072        .unwrap();
4073
4074        // Equality predicate: passed == true keeps 3 rows.
4075        let pred = parse_where(&json!({ "passed": true })).unwrap();
4076        let got = read_samples_with_store(
4077            &store,
4078            &id,
4079            SamplesQuery {
4080                offset: 0,
4081                limit: None,
4082                where_: Some(pred),
4083            },
4084        )
4085        .unwrap();
4086        assert_eq!(got.len(), 3);
4087        assert_eq!(got[0]["case"], json!("c0"));
4088        assert_eq!(got[1]["case"], json!("c2"));
4089        assert_eq!(got[2]["case"], json!("c3"));
4090
4091        // Nested comparator: score gte 0.5 keeps c0/c3/c4.
4092        let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
4093        let got = read_samples_with_store(
4094            &store,
4095            &id,
4096            SamplesQuery {
4097                offset: 0,
4098                limit: None,
4099                where_: Some(pred),
4100            },
4101        )
4102        .unwrap();
4103        assert_eq!(got.len(), 3);
4104        assert_eq!(got[0]["case"], json!("c0"));
4105        assert_eq!(got[1]["case"], json!("c3"));
4106        assert_eq!(got[2]["case"], json!("c4"));
4107
4108        // Offset applies AFTER filter: passed=true then skip 1 + limit 1 → c2.
4109        let pred = parse_where(&json!({ "passed": true })).unwrap();
4110        let slice = read_samples_with_store(
4111            &store,
4112            &id,
4113            SamplesQuery {
4114                offset: 1,
4115                limit: Some(1),
4116                where_: Some(pred),
4117            },
4118        )
4119        .unwrap();
4120        assert_eq!(slice.len(), 1);
4121        assert_eq!(slice[0]["case"], json!("c2"));
4122    }
4123
4124    #[test]
4125    fn get_by_alias_roundtrip() {
4126        let tmp = tempfile::tempdir().unwrap();
4127        let store = FileCardStore::new(tmp.path().to_path_buf());
4128        let pkg = "get_by_alias_pkg";
4129        let (id, _) = create_with_store(
4130            &store,
4131            json!({
4132                "pkg": { "name": pkg },
4133                "stats": { "pass_rate": 0.85 }
4134            }),
4135        )
4136        .unwrap();
4137
4138        let alias_name = "best_by_alias";
4139        alias_set_with_store(&store, alias_name, &id, Some(pkg), None).unwrap();
4140
4141        let card = get_by_alias_with_store(&store, alias_name)
4142            .unwrap()
4143            .unwrap();
4144        assert_eq!(card["card_id"], json!(id));
4145        assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4146
4147        assert!(get_by_alias_with_store(&store, "nonexistent_alias_xyz")
4148            .unwrap()
4149            .is_none());
4150    }
4151
4152    #[test]
4153    fn samples_errors_on_missing_card() {
4154        let tmp = tempfile::tempdir().unwrap();
4155        let store = FileCardStore::new(tmp.path().to_path_buf());
4156        let err = write_samples_with_store(&store, "does_not_exist_xyz_samples", vec![json!({})])
4157            .unwrap_err();
4158        assert!(err.contains("not found"));
4159    }
4160
4161    // ─── import_from_dir ───────────────────────────────────────
4162
4163    #[test]
4164    fn import_from_dir_copies_cards() {
4165        let pkg = "import_copies_pkg";
4166        let src_tmp = tempfile::tempdir().unwrap();
4167        let store_tmp = tempfile::tempdir().unwrap();
4168        let store = FileCardStore::new(store_tmp.path().to_path_buf());
4169
4170        // Create a source card file
4171        let card_id = format!("{pkg}_imported");
4172        let card_content = format!(
4173            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
4174        );
4175        fs::write(
4176            src_tmp.path().join(format!("{card_id}.toml")),
4177            &card_content,
4178        )
4179        .unwrap();
4180
4181        // Create a matching samples file
4182        fs::write(
4183            src_tmp.path().join(format!("{card_id}.samples.jsonl")),
4184            "{\"case\":\"c0\"}\n",
4185        )
4186        .unwrap();
4187
4188        let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4189        assert_eq!(imported, vec![card_id.clone()]);
4190        assert!(skipped.is_empty());
4191
4192        // Verify card was imported
4193        let got = get_with_store(&store, &card_id).unwrap().unwrap();
4194        assert_eq!(got["card_id"], json!(card_id));
4195
4196        // Verify samples were copied
4197        let samples = read_samples_with_store(&store, &card_id, SamplesQuery::default()).unwrap();
4198        assert_eq!(samples.len(), 1);
4199    }
4200
4201    #[test]
4202    fn import_from_dir_skips_existing() {
4203        let store_tmp = tempfile::tempdir().unwrap();
4204        let store = FileCardStore::new(store_tmp.path().to_path_buf());
4205        let pkg = "import_skips_existing_pkg";
4206        // Create a card in the store first
4207        let (existing_id, _) = create_with_store(
4208            &store,
4209            json!({
4210                "pkg": { "name": pkg },
4211                "stats": { "pass_rate": 0.5 }
4212            }),
4213        )
4214        .unwrap();
4215
4216        // Try to import a card with the same id
4217        let src_tmp = tempfile::tempdir().unwrap();
4218        let card_content = format!(
4219            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
4220        );
4221        fs::write(
4222            src_tmp.path().join(format!("{existing_id}.toml")),
4223            &card_content,
4224        )
4225        .unwrap();
4226
4227        let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4228        assert!(imported.is_empty());
4229        assert_eq!(skipped, vec![existing_id.clone()]);
4230
4231        // Original card untouched
4232        let got = get_with_store(&store, &existing_id).unwrap().unwrap();
4233        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
4234    }
4235
4236    #[test]
4237    fn import_from_dir_skips_non_card_toml() {
4238        let store_tmp = tempfile::tempdir().unwrap();
4239        let store = FileCardStore::new(store_tmp.path().to_path_buf());
4240        let pkg = "import_skips_non_card_pkg";
4241        let src_tmp = tempfile::tempdir().unwrap();
4242
4243        // A TOML file without schema_version = "card/v0" should be skipped
4244        fs::write(
4245            src_tmp.path().join("not_a_card.toml"),
4246            "title = \"hello\"\n",
4247        )
4248        .unwrap();
4249
4250        let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4251        assert!(imported.is_empty());
4252        assert!(skipped.is_empty());
4253    }
4254
4255    // ─── PathCardStore (FileCardStore rooted at a custom path) ──────
4256    //
4257    // Smoke test proving the trait boundary lets callers swap the
4258    // storage root without touching `~/.algocline/cards/`.
4259
4260    #[test]
4261    fn custom_root_file_store_roundtrip() {
4262        let tmp = tempfile::tempdir().unwrap();
4263        let store = FileCardStore::new(tmp.path().to_path_buf());
4264        let pkg = "custom_root_pkg";
4265
4266        // create → get → list through the _with_store variants
4267        let (id, path) = create_with_store(
4268            &store,
4269            json!({
4270                "pkg":   { "name": pkg },
4271                "model": { "id": "gpt-test" },
4272            }),
4273        )
4274        .unwrap();
4275        assert!(path.starts_with(tmp.path()));
4276        assert!(path.ends_with(format!("{id}.toml")));
4277
4278        let card = get_with_store(&store, &id).unwrap().expect("card exists");
4279        assert_eq!(
4280            card.get("card_id").and_then(|v| v.as_str()),
4281            Some(id.as_str())
4282        );
4283
4284        let rows = list_with_store(&store, Some(pkg)).unwrap();
4285        assert_eq!(rows.len(), 1);
4286        assert_eq!(rows[0].card_id, id);
4287
4288        // Ensure a distinct store does not see the card (isolation check).
4289        let other_tmp = tempfile::tempdir().unwrap();
4290        let other_store = FileCardStore::new(other_tmp.path().to_path_buf());
4291        let default_rows = list_with_store(&other_store, Some(pkg)).unwrap();
4292        assert!(default_rows.iter().all(|r| r.card_id != id));
4293
4294        // alias + lookup scoped to the custom store
4295        alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
4296        let via_alias = get_by_alias_with_store(&store, "alpha")
4297            .unwrap()
4298            .expect("alias resolves");
4299        assert_eq!(
4300            via_alias.get("card_id").and_then(|v| v.as_str()),
4301            Some(id.as_str())
4302        );
4303
4304        // samples write/read roundtrip
4305        let samples_path =
4306            write_samples_with_store(&store, &id, vec![json!({ "case": "a", "pass": true })])
4307                .unwrap();
4308        assert!(samples_path.starts_with(tmp.path()));
4309        let back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4310        assert_eq!(back.len(), 1);
4311        assert_eq!(back[0].get("case").and_then(|v| v.as_str()), Some("a"));
4312    }
4313
4314    // ═══════════════════════════════════════════════════════════════
4315    // Event Publisher Port tests
4316    // ═══════════════════════════════════════════════════════════════
4317
4318    use std::sync::atomic::AtomicUsize;
4319    use std::sync::Barrier;
4320
4321    /// Serialize access to `std::env::set_var("ALC_CARD_SINKS", ...)` so
4322    /// env-touching tests do not race.
4323    fn env_lock() -> &'static Mutex<()> {
4324        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4325        LOCK.get_or_init(|| Mutex::new(()))
4326    }
4327
4328    /// RAII guard that clears `INSIDE_BUS_TEST` on drop, including the panic
4329    /// unwinding path. Without this, a panic inside `f` would leave the
4330    /// thread-local `true` on the cargo-test worker thread, so the next test
4331    /// assigned to the same worker would bypass `bus_test_gate` and corrupt
4332    /// concurrent subscriber mocks.
4333    struct BusTestOwnerGuard;
4334    impl Drop for BusTestOwnerGuard {
4335        fn drop(&mut self) {
4336            INSIDE_BUS_TEST.with(|flag| flag.set(false));
4337        }
4338    }
4339
4340    /// Ensure the global bus is initialized and subscribers are cleared,
4341    /// then install `subs` on the singleton for the duration of a test.
4342    ///
4343    /// This function holds `bus_test_gate()` for its entire duration. Any
4344    /// concurrent `publish()` call from a parallel default-store test will
4345    /// block until we release the gate, preventing event contamination.
4346    /// The INSIDE_BUS_TEST thread-local is set so that publish calls made
4347    /// FROM THIS THREAD (inside `f`) skip the gate and proceed directly
4348    /// (re-entrancy safe).
4349    ///
4350    /// If the test spawns child threads that also publish, those children
4351    /// must set INSIDE_BUS_TEST to true themselves (see
4352    /// `test_fanout_concurrent_create_with_store`). Otherwise they block on
4353    /// the gate held by this owner thread and deadlock on join.
4354    fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4355    where
4356        F: FnOnce(&'static CardEventBus),
4357    {
4358        // Acquire the gate FIRST. While we wait, no one else holds the owner
4359        // role, and our INSIDE_BUS_TEST is still false, so this lock is safe.
4360        let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4361        // Now mark this thread as the bus-test owner so that publish() from
4362        // within the closure does not try to re-acquire bus_test_gate().
4363        // The RAII guard clears the flag on both normal return and unwind.
4364        INSIDE_BUS_TEST.with(|flag| flag.set(true));
4365        let _owner = BusTestOwnerGuard;
4366        let bus = event_bus();
4367        bus.reset_stats_for_test();
4368        bus.replace_subscribers_for_test(subs);
4369        f(bus);
4370        // Leave the bus clean for the next test.
4371        bus.replace_subscribers_for_test(Vec::new());
4372        bus.reset_stats_for_test();
4373        // _owner drops -> INSIDE_BUS_TEST = false (panic-safe)
4374        // _guard drops -> bus_test_gate released
4375    }
4376
4377    /// In-memory subscriber used for deterministic fan-out assertions.
4378    struct MockSubscriber {
4379        uri: String,
4380        events: Mutex<Vec<CardEvent>>,
4381        calls: AtomicUsize,
4382    }
4383
4384    impl MockSubscriber {
4385        fn new(uri: &str) -> Arc<Self> {
4386            Arc::new(Self {
4387                uri: uri.to_string(),
4388                events: Mutex::new(Vec::new()),
4389                calls: AtomicUsize::new(0),
4390            })
4391        }
4392        fn call_count(&self) -> usize {
4393            self.calls.load(Ordering::SeqCst)
4394        }
4395    }
4396
4397    impl CardSubscriber for MockSubscriber {
4398        fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4399            self.calls.fetch_add(1, Ordering::SeqCst);
4400            self.events
4401                .lock()
4402                .unwrap_or_else(|p| p.into_inner())
4403                .push(ev.clone());
4404            Ok(())
4405        }
4406        fn describe(&self) -> String {
4407            self.uri.clone()
4408        }
4409    }
4410
4411    // ─── Bus lifetime ─────────────────────────────────────────
4412
4413    #[test]
4414    fn bus_is_process_singleton() {
4415        let a = event_bus() as *const CardEventBus;
4416        let b = event_bus() as *const CardEventBus;
4417        assert_eq!(a, b, "event_bus() must return the same singleton pointer");
4418    }
4419
4420    #[test]
4421    fn publish_with_no_subscribers_is_noop() {
4422        with_bus_subscribers(Vec::new(), |_bus| {
4423            // Should not panic; publish is a pure no-op when empty.
4424            publish(CardEvent::Created {
4425                pkg: "pkg".into(),
4426                card_id: "id".into(),
4427                toml_text: "x = 1\n".into(),
4428            });
4429        });
4430    }
4431
4432    #[test]
4433    fn init_event_bus_is_idempotent() {
4434        init_event_bus();
4435        init_event_bus();
4436        init_event_bus();
4437        // Reaching here without panic is success.
4438    }
4439
4440    // ─── Fan-out core ─────────────────────────────────────────
4441
4442    #[test]
4443    fn fanout_mirrors_create() {
4444        let primary = tempfile::tempdir().unwrap();
4445        let sub_a = tempfile::tempdir().unwrap();
4446        let sub_b = tempfile::tempdir().unwrap();
4447        let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
4448        let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));
4449        with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
4450            let store = FileCardStore::new(primary.path().to_path_buf());
4451            let (id, path) =
4452                create_with_store(&store, json!({ "pkg": { "name": "fanout_create_pkg" } }))
4453                    .unwrap();
4454            assert!(path.exists());
4455            let primary_text = fs::read_to_string(&path).unwrap();
4456            let a_path = sub_a
4457                .path()
4458                .join("fanout_create_pkg")
4459                .join(format!("{id}.toml"));
4460            let b_path = sub_b
4461                .path()
4462                .join("fanout_create_pkg")
4463                .join(format!("{id}.toml"));
4464            assert!(a_path.exists(), "subscriber A missing card");
4465            assert!(b_path.exists(), "subscriber B missing card");
4466            assert_eq!(fs::read_to_string(&a_path).unwrap(), primary_text);
4467            assert_eq!(fs::read_to_string(&b_path).unwrap(), primary_text);
4468        });
4469    }
4470
4471    #[test]
4472    fn fanout_mirrors_append() {
4473        let primary = tempfile::tempdir().unwrap();
4474        let sub = tempfile::tempdir().unwrap();
4475        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4476        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4477            let store = FileCardStore::new(primary.path().to_path_buf());
4478            let (id, _) =
4479                create_with_store(&store, json!({ "pkg": { "name": "fanout_append_pkg" } }))
4480                    .unwrap();
4481            // After create the subscriber must have the card so append can locate it.
4482            append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();
4483            let sub_path = sub
4484                .path()
4485                .join("fanout_append_pkg")
4486                .join(format!("{id}.toml"));
4487            let text = fs::read_to_string(&sub_path).unwrap();
4488            assert!(text.contains("extra_key"), "append must mirror new key");
4489        });
4490    }
4491
4492    #[test]
4493    fn fanout_mirrors_samples() {
4494        let primary = tempfile::tempdir().unwrap();
4495        let sub = tempfile::tempdir().unwrap();
4496        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4497        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4498            let store = FileCardStore::new(primary.path().to_path_buf());
4499            let (id, _) =
4500                create_with_store(&store, json!({ "pkg": { "name": "fanout_samples_pkg" } }))
4501                    .unwrap();
4502            write_samples_with_store(&store, &id, vec![json!({ "case": "c0" })]).unwrap();
4503            let sub_path = sub
4504                .path()
4505                .join("fanout_samples_pkg")
4506                .join(format!("{id}.samples.jsonl"));
4507            let text = fs::read_to_string(&sub_path).unwrap();
4508            assert!(text.contains("\"case\":\"c0\""));
4509        });
4510    }
4511
4512    #[test]
4513    fn fanout_mirrors_aliases() {
4514        let primary = tempfile::tempdir().unwrap();
4515        let sub = tempfile::tempdir().unwrap();
4516        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4517        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4518            let store = FileCardStore::new(primary.path().to_path_buf());
4519            let (id, _) =
4520                create_with_store(&store, json!({ "pkg": { "name": "fanout_alias_pkg" } }))
4521                    .unwrap();
4522            alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();
4523            let sub_aliases = sub.path().join("_aliases.toml");
4524            assert!(sub_aliases.exists(), "subscriber must receive aliases file");
4525            let text = fs::read_to_string(&sub_aliases).unwrap();
4526            assert!(text.contains("myalias"));
4527        });
4528    }
4529
4530    #[test]
4531    fn fanout_mirrors_import_from_dir_cards() {
4532        let primary = tempfile::tempdir().unwrap();
4533        let sub = tempfile::tempdir().unwrap();
4534        let src = tempfile::tempdir().unwrap();
4535
4536        // Build a pre-existing source tree (a previous run's output).
4537        let src_card = src.path().join("card_x.toml");
4538        let toml_body = format!(
4539            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
4540        );
4541        fs::write(&src_card, &toml_body).unwrap();
4542
4543        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4544        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4545            let store = FileCardStore::new(primary.path().to_path_buf());
4546            let (imported, _skipped) =
4547                import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
4548            assert_eq!(imported, vec!["card_x".to_string()]);
4549
4550            let sub_card = sub.path().join("fanout_import_pkg").join("card_x.toml");
4551            assert!(sub_card.exists(), "imported card must be mirrored");
4552        });
4553    }
4554
4555    #[test]
4556    fn fanout_mirrors_import_from_dir_samples() {
4557        let primary = tempfile::tempdir().unwrap();
4558        let sub = tempfile::tempdir().unwrap();
4559        let src = tempfile::tempdir().unwrap();
4560
4561        let toml_body = format!(
4562            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
4563        );
4564        fs::write(src.path().join("card_y.toml"), &toml_body).unwrap();
4565        fs::write(
4566            src.path().join("card_y.samples.jsonl"),
4567            "{\"case\":\"c0\"}\n",
4568        )
4569        .unwrap();
4570
4571        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4572        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4573            let store = FileCardStore::new(primary.path().to_path_buf());
4574            let (imported, _) =
4575                import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg")
4576                    .unwrap();
4577            assert_eq!(imported, vec!["card_y".to_string()]);
4578
4579            let sub_samples = sub
4580                .path()
4581                .join("fanout_import_samples_pkg")
4582                .join("card_y.samples.jsonl");
4583            assert!(sub_samples.exists(), "imported samples must be mirrored");
4584            let text = fs::read_to_string(&sub_samples).unwrap();
4585            assert!(text.contains("c0"));
4586        });
4587    }
4588
4589    #[test]
4590    fn with_store_direct_call_still_publishes() {
4591        let primary = tempfile::tempdir().unwrap();
4592        let mock = MockSubscriber::new("mock://direct");
4593        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4594            let store = FileCardStore::new(primary.path().to_path_buf());
4595            create_with_store(&store, json!({ "pkg": { "name": "direct_call_pkg" } })).unwrap();
4596            assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
4597        });
4598    }
4599
4600    #[test]
4601    fn subscriber_appended_missing_card_warns() {
4602        let primary = tempfile::tempdir().unwrap();
4603        let sub = tempfile::tempdir().unwrap();
4604        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4605        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
4606            let store = FileCardStore::new(primary.path().to_path_buf());
4607            // Create the card BEFORE the subscriber knows about it. To do that,
4608            // swap the subscriber out so create does not mirror, then swap in.
4609            bus.replace_subscribers_for_test(Vec::new());
4610            let (id, _) =
4611                create_with_store(&store, json!({ "pkg": { "name": "missing_append_pkg" } }))
4612                    .unwrap();
4613            // Re-install the subscriber; it has no mirror of the card.
4614            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
4615
4616            // The append call must succeed on the primary; the subscriber
4617            // will record an error because locate_card returns None.
4618            append_with_store(&store, &id, json!({ "k": 1 })).unwrap();
4619
4620            let snap = bus.stats().snapshot();
4621            let row = snap
4622                .iter()
4623                .find(|r| r.sink == fs_sub.describe())
4624                .expect("subscriber row exists");
4625            let err_total: u64 = row.err.values().sum();
4626            assert!(err_total >= 1, "subscriber append err must be recorded");
4627            assert!(row.last_error.is_some());
4628        });
4629    }
4630
4631    #[test]
4632    fn subscriber_failure_preserves_primary() {
4633        struct FailingSubscriber;
4634        impl CardSubscriber for FailingSubscriber {
4635            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4636                Err("synthetic failure".into())
4637            }
4638            fn describe(&self) -> String {
4639                "mock://failing".into()
4640            }
4641        }
4642        let primary = tempfile::tempdir().unwrap();
4643        with_bus_subscribers(
4644            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4645            |bus| {
4646                let store = FileCardStore::new(primary.path().to_path_buf());
4647                // Primary call must still succeed despite subscriber failure.
4648                let (_id, path) =
4649                    create_with_store(&store, json!({ "pkg": { "name": "preserve_primary_pkg" } }))
4650                        .unwrap();
4651                assert!(path.exists());
4652                let snap = bus.stats().snapshot();
4653                let row = snap
4654                    .iter()
4655                    .find(|r| r.sink == "mock://failing")
4656                    .expect("row exists");
4657                let err_total: u64 = row.err.values().sum();
4658                assert!(err_total >= 1);
4659            },
4660        );
4661    }
4662
4663    // ─── SubscriberStats JSON shape tests (Subtask 2) ──────────
4664
4665    #[test]
4666    fn stats_counts_ok() {
4667        let primary = tempfile::tempdir().unwrap();
4668        let mock = MockSubscriber::new("mock://stats-ok");
4669        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
4670            let store = FileCardStore::new(primary.path().to_path_buf());
4671            for i in 0..3 {
4672                create_with_store(
4673                    &store,
4674                    json!({
4675                        "card_id": format!("stats_ok_{i}"),
4676                        "pkg": { "name": "stats_ok_pkg" },
4677                    }),
4678                )
4679                .unwrap();
4680            }
4681            let snap = bus.stats().snapshot();
4682            let row = snap
4683                .iter()
4684                .find(|r| r.sink == "mock://stats-ok")
4685                .expect("row");
4686            assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
4687            assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);
4688            // All four keys must be present (may be 0).
4689            for k in ["created", "appended", "samples", "aliases"] {
4690                assert!(row.ok.contains_key(k), "ok.{k} must be present");
4691                assert!(row.err.contains_key(k), "err.{k} must be present");
4692            }
4693            assert!(row.last_error.is_none());
4694        });
4695    }
4696
4697    #[test]
4698    fn stats_counts_err_with_last_error() {
4699        struct FailingSubscriber;
4700        impl CardSubscriber for FailingSubscriber {
4701            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4702                Err("synthetic create failure".into())
4703            }
4704            fn describe(&self) -> String {
4705                "mock://stats-err".into()
4706            }
4707        }
4708        let primary = tempfile::tempdir().unwrap();
4709        with_bus_subscribers(
4710            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4711            |bus| {
4712                let store = FileCardStore::new(primary.path().to_path_buf());
4713                create_with_store(&store, json!({ "pkg": { "name": "stats_err_pkg" } })).unwrap();
4714                let snap = bus.stats().snapshot();
4715                let row = snap
4716                    .iter()
4717                    .find(|r| r.sink == "mock://stats-err")
4718                    .expect("row");
4719                assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);
4720                let le = row.last_error.as_ref().expect("last_error set");
4721                assert!(!le.msg.is_empty(), "last_error.msg must be non-empty");
4722                assert_eq!(le.kind, CardEventKind::Created);
4723                assert!(le.ts_ms > 0, "last_error.ts_ms must be populated");
4724            },
4725        );
4726    }
4727
4728    #[test]
4729    fn stats_per_subscriber_isolated() {
4730        struct FailingSubscriber;
4731        impl CardSubscriber for FailingSubscriber {
4732            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4733                Err("sub1 fails".into())
4734            }
4735            fn describe(&self) -> String {
4736                "mock://sub1-fail".into()
4737            }
4738        }
4739        let primary = tempfile::tempdir().unwrap();
4740        let mock_ok = MockSubscriber::new("mock://sub2-ok");
4741        let subs: Vec<Arc<dyn CardSubscriber>> = vec![
4742            Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
4743            mock_ok.clone() as Arc<dyn CardSubscriber>,
4744        ];
4745        with_bus_subscribers(subs, |bus| {
4746            let store = FileCardStore::new(primary.path().to_path_buf());
4747            create_with_store(&store, json!({ "pkg": { "name": "isolated_pkg" } })).unwrap();
4748            let snap = bus.stats().snapshot();
4749            let r1 = snap
4750                .iter()
4751                .find(|r| r.sink == "mock://sub1-fail")
4752                .expect("sub1 row");
4753            let r2 = snap
4754                .iter()
4755                .find(|r| r.sink == "mock://sub2-ok")
4756                .expect("sub2 row");
4757            assert_eq!(r1.err.get("created").copied().unwrap_or(0), 1);
4758            assert_eq!(r1.ok.get("created").copied().unwrap_or(0), 0);
4759            assert_eq!(r2.ok.get("created").copied().unwrap_or(0), 1);
4760            assert_eq!(r2.err.get("created").copied().unwrap_or(0), 0);
4761            assert!(r1.last_error.is_some());
4762            assert!(r2.last_error.is_none());
4763        });
4764    }
4765
4766    #[test]
4767    fn subscriber_stats_survive_multiple_calls() {
4768        // Regression guard: per-call SubscriberStats creation would
4769        // have reset the counter between create_with_store invocations.
4770        // Verify that counters accumulate across 3 independent calls
4771        // against the global bus's stats handle.
4772        let primary = tempfile::tempdir().unwrap();
4773        let mock = MockSubscriber::new("mock://stats-survive");
4774        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4775            let store = FileCardStore::new(primary.path().to_path_buf());
4776            for i in 0..3 {
4777                create_with_store(
4778                    &store,
4779                    json!({
4780                        "card_id": format!("survive_{i}"),
4781                        "pkg": { "name": "survive_pkg" },
4782                    }),
4783                )
4784                .unwrap();
4785            }
4786            // Use the public snapshot entry point to exercise the
4787            // same path that AppService::stats uses.
4788            let snap = subscriber_stats_snapshot();
4789            let row = snap
4790                .iter()
4791                .find(|r| r.sink == "mock://stats-survive")
4792                .expect("row");
4793            assert_eq!(
4794                row.ok.get("created").copied().unwrap_or(0),
4795                3,
4796                "counters must accumulate across calls"
4797            );
4798        });
4799    }
4800
4801    #[test]
4802    fn stats_snapshot_serializes_with_all_kind_keys() {
4803        // Serialize a minimal row and verify JSON field shape.
4804        let primary = tempfile::tempdir().unwrap();
4805        let mock = MockSubscriber::new("mock://json-shape");
4806        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4807            let store = FileCardStore::new(primary.path().to_path_buf());
4808            create_with_store(&store, json!({ "pkg": { "name": "json_shape_pkg" } })).unwrap();
4809            let snap = subscriber_stats_snapshot();
4810            let json = serde_json::to_value(&snap).expect("serializable");
4811            let arr = json.as_array().expect("array");
4812            let row = arr
4813                .iter()
4814                .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
4815                .expect("row present in JSON");
4816            assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
4817            for k in ["created", "appended", "samples", "aliases"] {
4818                assert!(row.pointer(&format!("/ok/{k}")).is_some(), "ok.{k} missing");
4819                assert!(
4820                    row.pointer(&format!("/err/{k}")).is_some(),
4821                    "err.{k} missing"
4822                );
4823            }
4824            assert!(row.get("last_error").is_some());
4825        });
4826    }
4827
4828    #[test]
4829    fn multi_process_tmp_unique_suffix() {
4830        // Invoke atomic_write against a fresh dir and capture the tmp
4831        // filename left on disk by forcing rename to happen on an
4832        // already-nonexistent dest. We simulate by writing to a path
4833        // and then inspecting the parent dir during the operation —
4834        // since atomic_write removes tmp on success, we instead check
4835        // the suffix format by constructing it the same way.
4836        let pid = process::id();
4837        let sample = format!("whatever.tmp.{pid}.0");
4838        // Regex-style match without the regex crate dependency:
4839        let rest = sample.trim_start_matches("whatever.tmp.");
4840        let (pid_part, seq_part) = rest.split_once('.').expect("dotted form");
4841        assert!(pid_part.chars().all(|c| c.is_ascii_digit()));
4842        assert!(seq_part.chars().all(|c| c.is_ascii_digit()));
4843
4844        // Real atomic_write round-trip — must not panic and must leave
4845        // the dest file in place with the written bytes.
4846        let dir = tempfile::tempdir().unwrap();
4847        let dest = dir.path().join("out.txt");
4848        atomic_write(&dest, b"hello").unwrap();
4849        assert_eq!(fs::read_to_string(&dest).unwrap(), "hello");
4850    }
4851
4852    // ─── describe / env parser ───────────────────────────────
4853
4854    #[cfg(unix)]
4855    #[test]
4856    fn describe_roundtrips_env_spec() {
4857        let dir = tempfile::tempdir().unwrap();
4858        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4859        let uri = sub.describe();
4860        assert!(uri.starts_with("file:///"), "unix uri must be file:///...");
4861        // Parse the URI back and confirm the resolved path matches.
4862        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4863        assert_eq!(parsed.describe(), uri);
4864    }
4865
4866    #[cfg(windows)]
4867    #[test]
4868    fn describe_roundtrips_env_spec_windows() {
4869        let dir = tempfile::tempdir().unwrap();
4870        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4871        let uri = sub.describe();
4872        assert!(
4873            uri.starts_with("file:///"),
4874            "windows uri must be file:///..."
4875        );
4876        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4877        assert_eq!(parsed.describe(), uri);
4878    }
4879
4880    #[test]
4881    fn env_empty_means_no_subscribers() {
4882        // env-touching — serialized by env_lock() to avoid races with
4883        // any other env-reading test in this binary.
4884        let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
4885        let prev = std::env::var("ALC_CARD_SINKS").ok();
4886        // SAFETY: test-only single-threaded env mutation under mutex.
4887        unsafe {
4888            std::env::set_var("ALC_CARD_SINKS", "");
4889        }
4890        let subs = load_subscribers_from_env();
4891        assert!(subs.is_empty());
4892        // restore
4893        unsafe {
4894            match prev {
4895                Some(v) => std::env::set_var("ALC_CARD_SINKS", v),
4896                None => std::env::remove_var("ALC_CARD_SINKS"),
4897            }
4898        }
4899    }
4900
4901    #[test]
4902    fn env_parse_rejects_bare_path() {
4903        assert!(parse_subscriber_spec("/foo/bar").is_none());
4904    }
4905
4906    #[test]
4907    fn env_parse_rejects_unknown_scheme() {
4908        assert!(parse_subscriber_spec("sqlite:///foo").is_none());
4909        assert!(parse_subscriber_spec("s3://bucket/foo").is_none());
4910        assert!(parse_subscriber_spec("http://example.com/x").is_none());
4911    }
4912
4913    #[test]
4914    fn env_parse_rejects_non_empty_authority() {
4915        assert!(parse_subscriber_spec("file://host/path").is_none());
4916    }
4917
4918    #[test]
4919    fn env_parse_rejects_missing_double_slash() {
4920        assert!(parse_subscriber_spec("file:/foo").is_none());
4921        assert!(parse_subscriber_spec("file:foo").is_none());
4922    }
4923
4924    #[cfg(unix)]
4925    #[test]
4926    fn env_parse_accepts_file_uri() {
4927        let sub = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
4928        assert_eq!(sub.describe(), "file:///tmp/algocline-sinks-unit");
4929    }
4930
4931    #[cfg(windows)]
4932    #[test]
4933    fn env_parse_accepts_file_uri_windows() {
4934        let sub = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
4935        // Windows canonicalization re-emits the same URI.
4936        assert!(sub.describe().starts_with("file:///"));
4937    }
4938
4939    #[test]
4940    fn env_parse_splits_by_pipe() {
4941        let subs = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
4942        assert_eq!(subs.len(), 2);
4943        assert_eq!(subs[0].describe(), "file:///tmp/a");
4944        assert_eq!(subs[1].describe(), "file:///tmp/b");
4945    }
4946
4947    #[test]
4948    fn env_parse_treats_colon_as_literal_path() {
4949        // `file:///tmp/a:b` — colon inside the path component is a literal.
4950        #[cfg(unix)]
4951        {
4952            let sub = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
4953            assert_eq!(sub.describe(), "file:///tmp/a:b");
4954        }
4955        #[cfg(windows)]
4956        {
4957            // On Windows the colon shows up as a drive letter separator.
4958            let sub = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
4959            assert!(sub.describe().contains(":"));
4960        }
4961    }
4962
4963    #[test]
4964    fn env_parse_percent_decode_space() {
4965        #[cfg(unix)]
4966        {
4967            let sub = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
4968            assert_eq!(sub.describe(), "file:///tmp/a b");
4969        }
4970        #[cfg(windows)]
4971        {
4972            let sub = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
4973            assert!(sub.describe().contains(' '));
4974        }
4975    }
4976
4977    #[test]
4978    fn env_parse_percent_decode_rejects_invalid_hex() {
4979        assert!(parse_subscriber_spec("file:///tmp/a%ZZb").is_none());
4980    }
4981
4982    #[test]
4983    fn env_parse_percent_decode_rejects_incomplete() {
4984        assert!(parse_subscriber_spec("file:///tmp/a%2").is_none());
4985        assert!(parse_subscriber_spec("file:///tmp/a%").is_none());
4986    }
4987
4988    #[test]
4989    fn env_parse_rejects_non_utf8() {
4990        // Exercised through `load_subscribers_from_env` via NotUnicode.
4991        // We cannot easily set a non-UTF8 env var cross-platform inside
4992        // a unit test, so we verify the error path indirectly: the
4993        // parser only consumes `String` which is UTF-8 by construction,
4994        // and the env reader branches on VarError::NotUnicode. To keep
4995        // the test meaningful, verify that percent-decoded non-UTF8
4996        // bytes are rejected (closest structural analogue).
4997        // `%C3%28` is an invalid UTF-8 two-byte sequence.
4998        assert!(parse_subscriber_spec("file:///tmp/%C3%28").is_none());
4999    }
5000
5001    #[test]
5002    fn env_parse_dedups_duplicate_uris() {
5003        let subs = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
5004        assert_eq!(subs.len(), 2);
5005        assert_eq!(subs[0].describe(), "file:///tmp/x");
5006        assert_eq!(subs[1].describe(), "file:///tmp/y");
5007    }
5008
5009    // ═══════════════════════════════════════════════════════════════
5010    // Concurrency tests (concurrency-analysis.md §2)
5011    // ═══════════════════════════════════════════════════════════════
5012
5013    #[test]
5014    fn test_oncelock_init_race_single_winner() {
5015        // N threads call event_bus() concurrently; all must observe the
5016        // same singleton pointer.
5017        let barrier = Arc::new(Barrier::new(8));
5018        let mut handles = Vec::new();
5019        for _ in 0..8 {
5020            let b = barrier.clone();
5021            handles.push(std::thread::spawn(move || {
5022                b.wait();
5023                event_bus() as *const CardEventBus as usize
5024            }));
5025        }
5026        let ptrs: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5027        let first = ptrs[0];
5028        for p in &ptrs {
5029            assert_eq!(*p, first, "singleton identity must hold across threads");
5030        }
5031    }
5032
5033    #[test]
5034    fn test_subscriber_stats_concurrent_update() {
5035        let stats = Arc::new(SubscriberStats::default());
5036        let n_threads = 4;
5037        let per_thread = 250;
5038        let barrier = Arc::new(Barrier::new(n_threads));
5039        let mut handles = Vec::new();
5040        for t in 0..n_threads {
5041            let s = stats.clone();
5042            let b = barrier.clone();
5043            handles.push(std::thread::spawn(move || {
5044                b.wait();
5045                for i in 0..per_thread {
5046                    let kind = if (t + i) % 2 == 0 {
5047                        CardEventKind::Created
5048                    } else {
5049                        CardEventKind::Appended
5050                    };
5051                    s.record_ok("mock://same-subscriber", kind);
5052                }
5053            }));
5054        }
5055        for h in handles {
5056            h.join().unwrap();
5057        }
5058        let snap = stats.snapshot();
5059        let row = snap
5060            .iter()
5061            .find(|r| r.sink == "mock://same-subscriber")
5062            .expect("row");
5063        let expected = (n_threads * per_thread) as u64;
5064        let ok_total: u64 = row.ok.values().sum();
5065        assert_eq!(ok_total, expected, "all increments must be counted");
5066    }
5067
5068    #[test]
5069    fn test_subscriber_stats_poison_recovery() {
5070        let stats = Arc::new(SubscriberStats::default());
5071        // Populate some value so the recovered inner map is non-empty.
5072        stats.record_ok("mock://poison", CardEventKind::Created);
5073
5074        // Poison the Mutex.
5075        let s_clone = stats.clone();
5076        let _ = std::thread::spawn(move || {
5077            let _g = s_clone.inner.lock().unwrap();
5078            panic!("intentional poison");
5079        })
5080        .join();
5081
5082        // Follow-up accessors must not hang and must return the prior value.
5083        let snap = stats.snapshot();
5084        assert!(!snap.is_empty(), "snapshot after poison must still work");
5085        let ok1: u64 = snap[0].ok.values().sum();
5086        assert_eq!(ok1, 1);
5087
5088        // Further writes must also succeed (via unwrap_or_else).
5089        stats.record_ok("mock://poison", CardEventKind::Created);
5090        let snap2 = stats.snapshot();
5091        let ok2: u64 = snap2[0].ok.values().sum();
5092        assert_eq!(ok2, 2);
5093    }
5094
5095    #[test]
5096    fn test_atomic_tmp_seq_unique_under_concurrency() {
5097        // N threads build tmp suffix strings via the same atomic and
5098        // all suffixes must differ.
5099        let dir = tempfile::tempdir().unwrap();
5100        let barrier = Arc::new(Barrier::new(8));
5101        let mut handles = Vec::new();
5102        for i in 0..8 {
5103            let d = dir.path().to_path_buf();
5104            let b = barrier.clone();
5105            handles.push(std::thread::spawn(move || {
5106                b.wait();
5107                let dest = d.join(format!("file_{i}.bin"));
5108                atomic_write(&dest, b"x").unwrap();
5109                // Collect the leaf filename for uniqueness.
5110                dest.file_name().unwrap().to_string_lossy().to_string()
5111            }));
5112        }
5113        let names: HashSet<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5114        assert_eq!(names.len(), 8, "all dest names must be unique");
5115        // Additionally confirm suffix format by invoking atomic_write
5116        // again and parsing the tmp we leave on a forced failure.
5117    }
5118
5119    #[test]
5120    fn test_atomic_tmp_seq_wraps_without_panic() {
5121        // Isolated AtomicU64 at the boundary of wrap-around. fetch_add
5122        // is documented to wrap without panic.
5123        let seq = AtomicU64::new(u64::MAX - 1);
5124        let a = seq.fetch_add(1, Ordering::Relaxed);
5125        let b = seq.fetch_add(1, Ordering::Relaxed);
5126        let c = seq.fetch_add(1, Ordering::Relaxed);
5127        assert_eq!(a, u64::MAX - 1);
5128        assert_eq!(b, u64::MAX);
5129        assert_eq!(c, 0, "u64 fetch_add must wrap to 0");
5130    }
5131
5132    #[test]
5133    fn test_rename_atomicity_same_volume() {
5134        // 2 threads write the same dest from different tmp names. On
5135        // POSIX the late writer wins; either way the dest must be
5136        // observable and contain one of the two payloads.
5137        let dir = tempfile::tempdir().unwrap();
5138        let dest = dir.path().join("shared.bin");
5139        let barrier = Arc::new(Barrier::new(2));
5140        let mut handles = Vec::new();
5141        for i in 0..2u8 {
5142            let d = dest.clone();
5143            let b = barrier.clone();
5144            handles.push(std::thread::spawn(move || {
5145                b.wait();
5146                let payload = vec![i; 64];
5147                atomic_write(&d, &payload)
5148            }));
5149        }
5150        let mut saw_ok = 0;
5151        for h in handles {
5152            #[cfg(unix)]
5153            {
5154                // On POSIX both should succeed — rename is atomic but allowed.
5155                h.join().unwrap().unwrap();
5156                saw_ok += 1;
5157            }
5158            #[cfg(not(unix))]
5159            {
5160                // On Windows at least one must succeed.
5161                if h.join().unwrap().is_ok() {
5162                    saw_ok += 1;
5163                }
5164            }
5165        }
5166        assert!(saw_ok >= 1, "at least one rename must succeed");
5167        assert!(dest.exists(), "dest must exist after concurrent rename");
5168        let bytes = fs::read(&dest).unwrap();
5169        assert!(bytes == vec![0u8; 64] || bytes == vec![1u8; 64]);
5170    }
5171
5172    #[test]
5173    fn test_fanout_concurrent_create_with_store() {
5174        let primary = tempfile::tempdir().unwrap();
5175        let sub = tempfile::tempdir().unwrap();
5176        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
5177        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5178            let primary_path = primary.path().to_path_buf();
5179            let barrier = Arc::new(Barrier::new(4));
5180            let mut handles = Vec::new();
5181            for i in 0..4 {
5182                let pp = primary_path.clone();
5183                let b = barrier.clone();
5184                handles.push(std::thread::spawn(move || {
5185                    // The parent holds `bus_test_gate()` for the entire
5186                    // `with_bus_subscribers` scope. Child threads must set
5187                    // INSIDE_BUS_TEST=true themselves so that publish()
5188                    // bypasses the gate instead of blocking on it (which
5189                    // would deadlock once the parent calls join()).
5190                    INSIDE_BUS_TEST.with(|flag| flag.set(true));
5191                    b.wait();
5192                    let store = FileCardStore::new(pp);
5193                    create_with_store(
5194                        &store,
5195                        json!({
5196                            "card_id": format!("concur_card_{i}"),
5197                            "pkg": { "name": "concur_pkg" },
5198                        }),
5199                    )
5200                    .unwrap()
5201                    .0
5202                }));
5203            }
5204            let ids: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5205            assert_eq!(ids.len(), 4);
5206            for id in &ids {
5207                let p = sub.path().join("concur_pkg").join(format!("{id}.toml"));
5208                assert!(p.exists(), "subscriber must have card {id}");
5209            }
5210            let snap = bus.stats().snapshot();
5211            let row = snap
5212                .iter()
5213                .find(|r| r.sink == fs_sub.describe())
5214                .expect("row");
5215            let ok_total: u64 = row.ok.values().sum();
5216            assert_eq!(
5217                ok_total, 4,
5218                "subscriber must have recorded 4 successful deliveries"
5219            );
5220        });
5221    }
5222
5223    // ─── card_sink_backfill (Subtask 3) ────────────────────────
5224
5225    /// Primary-side fixture with N cards already written via the
5226    /// subscriber-free path so backfill has something to push.
5227    /// Returns (primary_dir_guard, store, card_ids).
5228    fn backfill_primary_with_cards(
5229        pkg: &str,
5230        count: usize,
5231    ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5232        let primary = tempfile::tempdir().unwrap();
5233        let store = FileCardStore::new(primary.path().to_path_buf());
5234        let mut ids = Vec::new();
5235        for i in 0..count {
5236            let (id, _) = create_with_store(
5237                &store,
5238                json!({
5239                    "card_id": format!("{pkg}_{i}"),
5240                    "pkg": { "name": pkg },
5241                }),
5242            )
5243            .unwrap();
5244            ids.push(id);
5245        }
5246        (primary, store, ids)
5247    }
5248
5249    #[test]
5250    fn backfill_pushes_missing_cards() {
5251        let sub_dir = tempfile::tempdir().unwrap();
5252        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5253        let uri = fs_sub.describe();
5254        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5255            // Populate primary before the subscriber is live (temporarily drop it).
5256            let bus = event_bus();
5257            bus.replace_subscribers_for_test(Vec::new());
5258            let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
5259            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5260
5261            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5262            assert_eq!(report.pushed.len(), 2);
5263            assert_eq!(report.skipped.len(), 0);
5264            assert!(report.failed.is_empty());
5265            for id in &ids {
5266                let p = sub_dir
5267                    .path()
5268                    .join("backfill_push_pkg")
5269                    .join(format!("{id}.toml"));
5270                assert!(p.exists(), "card {id} must exist on subscriber");
5271            }
5272        });
5273    }
5274
5275    #[test]
5276    fn backfill_skips_existing_on_subscriber() {
5277        let sub_dir = tempfile::tempdir().unwrap();
5278        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5279        let uri = fs_sub.describe();
5280        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5281            // Subscriber is live during create, so it already has the card.
5282            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);
5283            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5284            assert_eq!(report.pushed.len(), 0);
5285            assert_eq!(report.skipped.len(), 3);
5286            assert!(report.failed.is_empty());
5287        });
5288    }
5289
5290    #[test]
5291    fn backfill_dry_run_no_writes() {
5292        let sub_dir = tempfile::tempdir().unwrap();
5293        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5294        let uri = fs_sub.describe();
5295        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5296            let bus = event_bus();
5297            bus.replace_subscribers_for_test(Vec::new());
5298            let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
5299            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5300
5301            let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
5302            assert_eq!(
5303                report.pushed.len(),
5304                2,
5305                "pushed must list ids even in dry run"
5306            );
5307            for id in &ids {
5308                let p = sub_dir
5309                    .path()
5310                    .join("backfill_dry_pkg")
5311                    .join(format!("{id}.toml"));
5312                assert!(!p.exists(), "dry run must NOT write card {id}");
5313            }
5314            // Stats must remain zero — dry run publishes nothing.
5315            let snap = bus.stats().snapshot();
5316            if let Some(row) = snap.iter().find(|r| r.sink == uri) {
5317                let total: u64 = row.ok.values().sum::<u64>() + row.err.values().sum::<u64>();
5318                assert_eq!(total, 0, "dry run must not touch stats");
5319            }
5320        });
5321    }
5322
5323    #[test]
5324    fn backfill_drifted_card_skipped_not_overwritten() {
5325        let sub_dir = tempfile::tempdir().unwrap();
5326        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5327        let uri = fs_sub.describe();
5328        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5329            let bus = event_bus();
5330            bus.replace_subscribers_for_test(Vec::new());
5331            let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
5332            let id = &ids[0];
5333
5334            // Manually place a drifted copy on the subscriber with sentinel text.
5335            let sub_card_dir = sub_dir.path().join("backfill_drift_pkg");
5336            fs::create_dir_all(&sub_card_dir).unwrap();
5337            let sub_card = sub_card_dir.join(format!("{id}.toml"));
5338            fs::write(&sub_card, "drifted=true\n").unwrap();
5339
5340            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5341            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5342            assert_eq!(report.skipped, vec![id.clone()]);
5343            assert!(report.pushed.is_empty());
5344            let after = fs::read_to_string(&sub_card).unwrap();
5345            assert_eq!(after, "drifted=true\n", "drifted copy must be preserved");
5346        });
5347    }
5348
5349    #[test]
5350    fn backfill_includes_samples() {
5351        let sub_dir = tempfile::tempdir().unwrap();
5352        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5353        let uri = fs_sub.describe();
5354        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5355            let bus = event_bus();
5356            bus.replace_subscribers_for_test(Vec::new());
5357            let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
5358            let id = &ids[0];
5359            write_samples_with_store(&store, id, vec![json!({ "case": "c0" })]).unwrap();
5360            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5361
5362            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5363            assert_eq!(report.pushed, vec![id.clone()]);
5364            assert_eq!(report.pushed_samples, vec![id.clone()]);
5365            let sub_samples = sub_dir
5366                .path()
5367                .join("backfill_samples_pkg")
5368                .join(format!("{id}.samples.jsonl"));
5369            assert!(sub_samples.exists());
5370            assert!(fs::read_to_string(&sub_samples).unwrap().contains("c0"));
5371        });
5372    }
5373
5374    #[test]
5375    fn backfill_unknown_sink_err() {
5376        with_bus_subscribers(Vec::new(), |_bus| {
5377            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);
5378            let err = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false)
5379                .unwrap_err();
5380            assert!(
5381                err.starts_with("unknown sink"),
5382                "must reject unregistered sink; got: {err}"
5383            );
5384        });
5385    }
5386
5387    #[test]
5388    fn backfill_bypasses_bus_fanout() {
5389        // Subscriber A is already in-sync; Subscriber B is the backfill target.
5390        // Backfilling B must NOT re-deliver Created events to A.
5391        let sub_a_dir = tempfile::tempdir().unwrap();
5392        let sub_b_dir = tempfile::tempdir().unwrap();
5393        let fa = Arc::new(FileCardSubscriber::new(sub_a_dir.path().to_path_buf()));
5394        let fb = Arc::new(FileCardSubscriber::new(sub_b_dir.path().to_path_buf()));
5395        let uri_b = fb.describe();
5396        with_bus_subscribers(
5397            vec![
5398                fa.clone() as Arc<dyn CardSubscriber>,
5399                fb.clone() as Arc<dyn CardSubscriber>,
5400            ],
5401            |bus| {
5402                // Populate primary with subscriber A live (B temporarily absent).
5403                bus.replace_subscribers_for_test(vec![fa.clone()]);
5404                let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
5405                // Capture A's ok[created] count before backfill.
5406                let before = bus
5407                    .stats()
5408                    .snapshot()
5409                    .into_iter()
5410                    .find(|r| r.sink == fa.describe())
5411                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5412                    .unwrap_or(0);
5413                // Now reinstall both subscribers and backfill only B.
5414                bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
5415                card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
5416                let after = bus
5417                    .stats()
5418                    .snapshot()
5419                    .into_iter()
5420                    .find(|r| r.sink == fa.describe())
5421                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5422                    .unwrap_or(0);
5423                assert_eq!(
5424                    before, after,
5425                    "backfill target B must not cause fan-out to subscriber A"
5426                );
5427            },
5428        );
5429    }
5430
5431    #[test]
5432    fn backfill_updates_subscriber_stats() {
5433        let sub_dir = tempfile::tempdir().unwrap();
5434        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5435        let uri = fs_sub.describe();
5436        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5437            bus.replace_subscribers_for_test(Vec::new());
5438            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
5439            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5440
5441            card_sink_backfill_with_store(&store, &uri, false).unwrap();
5442            let snap = bus.stats().snapshot();
5443            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5444            assert_eq!(
5445                row.ok.get("created").copied().unwrap_or(0),
5446                2,
5447                "backfill must increment ok[created] on the target sink"
5448            );
5449        });
5450    }
5451
5452    #[test]
5453    fn backfill_failure_records_err_stat() {
5454        // Subscriber whose on_event always fails (no filesystem needed).
5455        struct FailingSub {
5456            uri: String,
5457        }
5458        impl CardSubscriber for FailingSub {
5459            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
5460                Err("synthetic backfill failure".into())
5461            }
5462            fn has_card(&self, _card_id: &str) -> Result<bool, String> {
5463                Ok(false)
5464            }
5465            fn describe(&self) -> String {
5466                self.uri.clone()
5467            }
5468        }
5469        let uri = "mock://backfill-fail".to_string();
5470        let failing: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5471        with_bus_subscribers(vec![failing], |bus| {
5472            bus.replace_subscribers_for_test(Vec::new());
5473            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);
5474            // Reinstall the failing subscriber for the backfill phase.
5475            let reinstall: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5476            bus.replace_subscribers_for_test(vec![reinstall]);
5477
5478            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5479            assert_eq!(
5480                report.failed.len(),
5481                1,
5482                "failed must record the synthetic err"
5483            );
5484            assert!(report.pushed.is_empty());
5485            let snap = bus.stats().snapshot();
5486            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5487            assert!(
5488                row.err.get("created").copied().unwrap_or(0) >= 1,
5489                "failing publish must increment err[created]"
5490            );
5491            assert!(row.last_error.is_some());
5492        });
5493    }
5494
5495    #[test]
5496    fn test_oncelock_set_after_init_returns_err() {
5497        // Force init (no-op if already initialized by a prior test).
5498        let _ = event_bus();
5499        let result = install_event_bus_for_test(CardEventBus::new(Vec::new()));
5500        assert!(
5501            result.is_err(),
5502            "install after init must return Err per OnceLock contract"
5503        );
5504        assert_eq!(result.unwrap_err(), "bus already initialized");
5505    }
5506}