Skip to main content

algocline_engine/
card.rs

1//! Card storage — immutable run-result snapshots.
2//!
3//! A Card is a frozen record of a strategy run: identity, parameters,
4//! model, scenario, aggregate stats, and (optionally) per-case detail.
5//! Cards are **immutable** — once written they are never modified, only
6//! annotated via additive `append`.  Mutable **aliases** point to a
7//! Card and can be rebound freely.
8//!
9//! ## Design principles
10//!
11//! 1. **Minimal REQUIRED, maximal OPTIONAL** — v0 needs only 4 fields;
12//!    lightweight "ran this pkg" records and heavy optimize snapshots
13//!    share the same schema.
14//! 2. **Immutable append-only** — no overwrite, no delete.  New data is
15//!    added via `append` (new top-level keys only) or by creating a new
16//!    Card with a fresh `card_id`.
17//! 3. **Two-tier storage** — TOML for human-readable aggregate, JSONL
18//!    sidecar for machine-parseable per-case detail.
19//! 4. **File-primary** — files are the source of truth; in-memory state
20//!    is cache.  Cards can be copied, diffed, and version-controlled.
21//!
22//! ## Storage layout (two-tier)
23//!
24//! | Tier | File | Content |
25//! |------|------|---------|
26//! | **Tier 1** | `~/.algocline/cards/{pkg}/{card_id}.toml` | Aggregate scalars, decisions, identity, params |
27//! | **Tier 2** | `~/.algocline/cards/{pkg}/{card_id}.samples.jsonl` | Per-case raw data (JSONL, write-once) |
28//!
29//! Tier 1 holds a shareable summary (a few KB). Tier 2 holds per-case
30//! detail ��� the engine does not interpret its columns; packages define
31//! their own schema.
32//!
33//! Alias table: `~/.algocline/cards/_aliases.toml` (global).
34//!
35//! ## card_id naming
36//!
37//! `{pkg}_{model_short}_{compact_ts}_{hash6}`
38//!
39//! - `compact_ts`: `YYYYMMDDTHHMMSS` in UTC
40//! - `hash6`: first 6 hex chars of DJB2 param fingerprint
41//! - Example: `cot_opus46_20260412T061500_a3f9c1`
42//!
43//! ## v0 schema (frozen)
44//!
45//! ### REQUIRED (minimum valid Card)
46//!
47//! | Field | Type | Example |
48//! |-------|------|---------|
49//! | `schema_version` | string | `"card/v0"` |
50//! | `card_id` | string | `"cot_opus46_20260412T061500_a3f9c1"` |
51//! | `created_at` | string (RFC 3339) | `"2026-04-12T06:15:00Z"` |
52//! | `[pkg].name` | string | `"cot"` |
53//!
54//! ### OPTIONAL (auto-injected where possible)
55//!
56//! | Section | Fields |
57//! |---------|--------|
58//! | `[pkg]` | `version`, `category`, `source`, `source_ref`, `source_sha` |
59//! | `[runtime]` | `alc_version`, `lua_version`, `host_os`, `git_sha` |
60//! | `[model]` | `provider`, `id`, `id_short`, `cutoff` |
61//! | `[params]` | Free-form ctx snapshot; `param_fingerprint` for DJB2 hash |
62//! | `[strategy_params]` | Strategy-tunable parameters surfaced for sweeps / optimizers (e.g. `alpha`, `temperature`, `depth`). Free-form, but `where`-queryable as a first-class section |
63//! | `[scenario]` | `name`, `source`, `case_count`, `grader` |
64//! | `[stats]` | `pass_rate`, `mean_score`, `std`, `median`, `min`, `max`, `n` |
65//! | `[stats.by_bucket]` | Disaggregated sub-bucket stats (array of tables) |
66//! | `[cost]` | `llm_calls`, `input_tokens`, `output_tokens`, `elapsed_ms`, `usd_estimate` |
67//! | `[optimize]` | `target`, `search`, `rounds_used`, `top_k` (for optimize Cards) |
68//! | `[metadata]` | Free-form escape hatch. Recognized lineage conventions: `prior_card_id` (parent Card id), `prior_relation` (relation kind, e.g. `"sweep_variant"`, `"reflection_of"`, `"derived_from"`) |
69//!
70//! ## Lua API (`alc.card.*`)
71//!
72//! | Function | Description |
73//! |----------|-------------|
74//! | `create(table)` | Write new Card (Tier 1). Returns `{ card_id, path }` |
75//! | `get(card_id)` | Read Card by id. Returns table or nil |
76//! | `list(filter?)` | List Cards as summaries (newest first) |
77//! | `find(query?)` | Prisma-style `where` DSL + dotted-path `order_by` + `offset`/`limit` |
78//! | `append(card_id, fields)` | Additive-only annotation (new keys only) |
79//! | `alias_set(name, card_id, opts?)` | Pin mutable alias |
80//! | `alias_list(filter?)` | List aliases |
81//! | `get_by_alias(name)` | Resolve alias → full Card |
82//! | `write_samples(card_id, samples)` | Write Tier 2 sidecar (write-once) |
83//! | `read_samples(card_id, opts?)` | Read Tier 2 with `where` filtering + offset/limit paging |
84//! | `lineage(query)` | Walk ancestry/descendants via `metadata.prior_card_id` |
85
86use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
98// ═══════════════════════════════════════════════════════════════
99// CardStore trait — physical I/O abstraction.
100// ═══════════════════════════════════════════════════════════════
101//
102// Card domain logic (schema / Query DSL / Lineage) is backend-
103// neutral. Only the physical read/write layer is swappable.
104//
105// The default backend is `FileCardStore`, which preserves the
106// legacy `~/.algocline/cards/{pkg}/{card_id}.toml` layout
107// byte-for-byte. Alternative backends (PathCardStore, SqliteCardStore,
108// MemoryCardStore) can be added by implementing this trait.
109//
110// Locators are `PathBuf` values. For FileCardStore they are real
111// filesystem paths; for non-file backends they are synthetic paths
112// (e.g. `sqlite:///db.sqlite#card/{id}`) — the value is opaque to
113// the domain layer and only exposed via the Lua `alc.card.create`
114// / `alc.card.write_samples` return values.
115
116/// Storage backend for Cards.
117///
118/// Implementations must be `Send + Sync` so that they can be shared
119/// across Lua host threads safely. All methods may fail with an
120/// error `String` describing the backend-specific failure.
121pub trait CardStore: Send + Sync {
122    // ─── Card CRUD ─────────────────────────────────────────────
123
124    /// Write a new Card (Tier 1 TOML).
125    ///
126    /// The caller has already:
127    ///   - validated `pkg` and `card_id` via [`validate_name`]
128    ///   - serialized `toml_text` with `toml::to_string_pretty`
129    ///
130    /// Fails if a Card with the same id already exists
131    /// (immutability).  Returns the locator of the written Card.
132    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
133
134    /// Overwrite an existing Card (append flow).
135    ///
136    /// Append is additive-only w.r.t. keys, but the underlying
137    /// TOML file is rewritten in place; callers must have validated
138    /// the additive-only constraint before calling this.
139    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
140
141    /// Locate a Card file by id. Returns `None` if not found.
142    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;
143
144    /// Read a Card's raw TOML text by id. Returns `None` if missing.
145    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;
146
147    /// List `(pkg, locator)` pairs for every Card file in the store.
148    ///
149    /// When `pkg_filter` is `Some(name)`, restrict to that pkg
150    /// subdir. Non-existent pkg subdir yields an empty Vec.
151    ///
152    /// Order is implementation-defined — callers sort explicitly.
153    fn list_card_locators(
154        &self,
155        pkg_filter: Option<&str>,
156    ) -> Result<Vec<(String, PathBuf)>, String>;
157
158    /// Read raw TOML text from a locator returned by
159    /// [`Self::list_card_locators`]. `Ok(None)` on read failure so
160    /// scans can skip corrupt files without aborting.
161    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;
162
163    // ─── Alias table ───────────────────────────────────────────
164
165    fn read_aliases(&self) -> Result<Vec<Alias>, String>;
166    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;
167
168    // ─── Samples sidecar ───────────────────────────────────────
169
170    /// Check whether a samples sidecar exists for `card_id`.
171    fn samples_exists(&self, card_id: &str) -> Result<bool, String>;
172
173    /// Write a samples sidecar (write-once).
174    ///
175    /// `jsonl_text` is the complete JSONL payload (one JSON line
176    /// per sample, `\n`-terminated). Fails if a sidecar already
177    /// exists. Returns the locator.
178    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;
179
180    /// Read a samples sidecar as raw JSONL text. Returns `None`
181    /// when no sidecar exists (samples are optional).
182    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;
183
184    // ─── Import ────────────────────────────────────────────────
185
186    /// Import Card files from `source_dir` into the store under
187    /// `pkg`. First-writer wins (existing Cards are skipped).
188    /// Returns `(imported, skipped)` card_id lists.
189    fn import_from_dir(
190        &self,
191        source_dir: &Path,
192        pkg: &str,
193    ) -> Result<(Vec<String>, Vec<String>), String>;
194}
195
196fn validate_name(name: &str, kind: &str) -> Result<(), String> {
197    if name.is_empty()
198        || name.contains('/')
199        || name.contains('\\')
200        || name.contains("..")
201        || name.contains('\0')
202    {
203        return Err(format!("Invalid {kind} name: '{name}'"));
204    }
205    Ok(())
206}
207
208/// DJB2 hash, hex-encoded. Used for param_fingerprint and card_id hash segment.
209fn djb2_hex(s: &str) -> String {
210    let mut h: u64 = 5381;
211    for b in s.bytes() {
212        h = h.wrapping_mul(33).wrapping_add(b as u64);
213    }
214    format!("{h:016x}")
215}
216
217/// Short-hash: last 6 hex chars of djb2.
218///
219/// DJB2's high bits are dominated by the `5381 * 33^n` term (same for any
220/// input of equal length), so the top hex digits collide easily for same-
221/// length inputs that differ only in a few byte positions. The low bits,
222/// driven by the most-recent bytes, mix well enough for short-hash use.
223fn hash6(s: &str) -> String {
224    let hex = djb2_hex(s);
225    let start = hex.len().saturating_sub(6);
226    hex[start..].to_string()
227}
228
229/// Stable serialization of a JSON value for hashing (sorted keys).
230fn stable_json(v: &Json) -> String {
231    let mut buf = String::new();
232    stable_json_into(v, &mut buf);
233    buf
234}
235fn stable_json_into(v: &Json, buf: &mut String) {
236    match v {
237        Json::Null => buf.push_str("null"),
238        Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
239        Json::Number(n) => buf.push_str(&n.to_string()),
240        Json::String(s) => {
241            buf.push('"');
242            buf.push_str(s);
243            buf.push('"');
244        }
245        Json::Array(a) => {
246            buf.push('[');
247            for (i, item) in a.iter().enumerate() {
248                if i > 0 {
249                    buf.push(',');
250                }
251                stable_json_into(item, buf);
252            }
253            buf.push(']');
254        }
255        Json::Object(m) => {
256            let mut keys: Vec<&String> = m.keys().collect();
257            keys.sort();
258            buf.push('{');
259            for (i, k) in keys.iter().enumerate() {
260                if i > 0 {
261                    buf.push(',');
262                }
263                buf.push('"');
264                buf.push_str(k);
265                buf.push_str("\":");
266                stable_json_into(&m[*k], buf);
267            }
268            buf.push('}');
269        }
270    }
271}
272
273/// Derive a short model id (e.g. "claude-opus-4-6" -> "opus46").
274/// v0: best-effort. Falls back to "model" if input is empty.
275fn short_model(id: &str) -> String {
276    if id.is_empty() {
277        return "model".into();
278    }
279    // Strip common vendor prefixes.
280    let stripped = id
281        .strip_prefix("claude-")
282        .or_else(|| id.strip_prefix("gpt-"))
283        .unwrap_or(id);
284    // Keep alnum only.
285    let s: String = stripped
286        .chars()
287        .filter(|c| c.is_ascii_alphanumeric())
288        .collect();
289    if s.is_empty() {
290        "model".into()
291    } else {
292        s
293    }
294}
295
296/// RFC3339 UTC "YYYY-MM-DDTHH:MM:SSZ" from current system time.
297fn now_rfc3339() -> String {
298    let secs = std::time::SystemTime::now()
299        .duration_since(std::time::UNIX_EPOCH)
300        .map(|d| d.as_secs())
301        .unwrap_or(0) as i64;
302    let days = secs.div_euclid(86400);
303    let tod = secs.rem_euclid(86400);
304    let (y, mo, d) = civil_from_days(days);
305    let hh = tod / 3600;
306    let mm = (tod % 3600) / 60;
307    let ss = tod % 60;
308    format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
309}
310
311/// YYYYMMDDTHHMMSS for current UTC time (compact, sortable).
312///
313/// Used in card_id generation so that:
314///   - rapid successive runs don't collide on the hash6 segment
315///   - string sort of card_id = chronological order
316fn now_compact() -> String {
317    let secs = std::time::SystemTime::now()
318        .duration_since(std::time::UNIX_EPOCH)
319        .map(|d| d.as_secs())
320        .unwrap_or(0) as i64;
321    let days = secs.div_euclid(86400);
322    let tod = secs.rem_euclid(86400);
323    let (y, mo, d) = civil_from_days(days);
324    let hh = tod / 3600;
325    let mm = (tod % 3600) / 60;
326    let ss = tod % 60;
327    format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
328}
329
330/// Howard Hinnant's civil_from_days algorithm.
331fn civil_from_days(z: i64) -> (i32, u32, u32) {
332    let z = z + 719468;
333    let era = if z >= 0 { z } else { z - 146096 } / 146097;
334    let doe = (z - era * 146097) as u64;
335    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
336    let y = yoe as i64 + era * 400;
337    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
338    let mp = (5 * doy + 2) / 153;
339    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
340    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
341    let y = y + if m <= 2 { 1 } else { 0 };
342    (y as i32, m, d)
343}
344
345/// Converter: serde_json::Value -> toml::Value.
346/// Nulls are dropped (TOML has no null). Mixed-type arrays are allowed in TOML 1.0+.
347fn json_to_toml(v: Json) -> Result<toml::Value, String> {
348    Ok(match v {
349        Json::Null => return Err("TOML does not support null values".into()),
350        Json::Bool(b) => toml::Value::Boolean(b),
351        Json::Number(n) => {
352            if let Some(i) = n.as_i64() {
353                toml::Value::Integer(i)
354            } else if let Some(f) = n.as_f64() {
355                toml::Value::Float(f)
356            } else {
357                return Err(format!("Unsupported number: {n}"));
358            }
359        }
360        Json::String(s) => toml::Value::String(s),
361        Json::Array(a) => {
362            let mut out = Vec::with_capacity(a.len());
363            for item in a {
364                if !item.is_null() {
365                    out.push(json_to_toml(item)?);
366                }
367            }
368            toml::Value::Array(out)
369        }
370        Json::Object(m) => {
371            let mut table = toml::map::Map::new();
372            for (k, val) in m {
373                if val.is_null() {
374                    continue;
375                }
376                table.insert(k, json_to_toml(val)?);
377            }
378            toml::Value::Table(table)
379        }
380    })
381}
382
383/// Converter: toml::Value -> serde_json::Value (for alc.card.get()).
384fn toml_to_json(v: toml::Value) -> Json {
385    match v {
386        toml::Value::String(s) => Json::String(s),
387        toml::Value::Integer(i) => json!(i),
388        toml::Value::Float(f) => json!(f),
389        toml::Value::Boolean(b) => Json::Bool(b),
390        toml::Value::Datetime(dt) => Json::String(dt.to_string()),
391        toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
392        toml::Value::Table(t) => {
393            let mut m = serde_json::Map::new();
394            for (k, v) in t {
395                m.insert(k, toml_to_json(v));
396            }
397            Json::Object(m)
398        }
399    }
400}
401
402/// Extract [pkg].name from an input JSON object. REQUIRED.
403fn require_pkg_name(input: &Json) -> Result<String, String> {
404    let name = input
405        .get("pkg")
406        .and_then(|p| p.get("name"))
407        .and_then(|n| n.as_str())
408        .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
409        .to_string();
410    validate_name(&name, "pkg")?;
411    Ok(name)
412}
413
414/// Create a new Card backed by `store`.
415pub fn create_with_store(
416    store: &dyn CardStore,
417    mut input: Json,
418) -> Result<(String, PathBuf), String> {
419    if !input.is_object() {
420        return Err("alc.card.create: input must be a table".into());
421    }
422    let pkg_name = require_pkg_name(&input)?;
423    let obj = input.as_object_mut().unwrap();
424
425    // ─── Auto-inject REQUIRED fields ──────────────────────────
426    obj.entry("schema_version".to_string())
427        .or_insert_with(|| json!(SCHEMA_VERSION));
428    obj.entry("created_at".to_string())
429        .or_insert_with(|| json!(now_rfc3339()));
430    obj.entry("created_by".to_string())
431        .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
432
433    // ─── param_fingerprint (if [params] present) ──────────────
434    if let Some(params) = obj.get("params").cloned() {
435        if params.is_object() {
436            let fp = djb2_hex(&stable_json(&params));
437            obj.insert("param_fingerprint".to_string(), json!(fp));
438        }
439    }
440
441    // ─── card_id generation (if absent) ───────────────────────
442    let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
443        Some(id) if !id.is_empty() => id.to_string(),
444        _ => {
445            let model_id = obj
446                .get("model")
447                .and_then(|m| m.get("id"))
448                .and_then(|v| v.as_str())
449                .unwrap_or("");
450            let model_short = short_model(model_id);
451            let ts = now_compact();
452            let fp_seed = stable_json(&Json::Object(obj.clone()));
453            let h = hash6(&fp_seed);
454            format!("{pkg_name}_{model_short}_{ts}_{h}")
455        }
456    };
457    validate_name(&card_id, "card_id")?;
458    obj.insert("card_id".to_string(), json!(card_id.clone()));
459
460    let toml_val = json_to_toml(input)?;
461    let text = toml::to_string_pretty(&toml_val)
462        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
463    let path = store.write_new_card(&pkg_name, &card_id, &text)?;
464
465    publish(CardEvent::Created {
466        pkg: pkg_name.clone(),
467        card_id: card_id.clone(),
468        toml_text: text,
469    });
470
471    Ok((card_id, path))
472}
473
474/// Read a Card from `store` by id. Returns None if not found.
475pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
476    let text = match store.read_card_text(card_id)? {
477        Some(t) => t,
478        None => return Ok(None),
479    };
480    let val: toml::Value =
481        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
482    Ok(Some(toml_to_json(val)))
483}
484
485/// Summary row for `alc.card.list()`.
486#[derive(Debug, Clone)]
487pub struct Summary {
488    pub card_id: String,
489    pub pkg: String,
490    pub created_at: Option<String>,
491    pub model: Option<String>,
492    pub scenario: Option<String>,
493    pub pass_rate: Option<f64>,
494}
495
496impl Summary {
497    fn to_json(&self) -> Json {
498        let mut m = serde_json::Map::new();
499        m.insert("card_id".into(), json!(self.card_id));
500        m.insert("pkg".into(), json!(self.pkg));
501        if let Some(v) = &self.created_at {
502            m.insert("created_at".into(), json!(v));
503        }
504        if let Some(v) = &self.model {
505            m.insert("model".into(), json!(v));
506        }
507        if let Some(v) = &self.scenario {
508            m.insert("scenario".into(), json!(v));
509        }
510        if let Some(v) = self.pass_rate {
511            m.insert("pass_rate".into(), json!(v));
512        }
513        Json::Object(m)
514    }
515}
516
517fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
518    let text = store.read_locator_text(locator).ok().flatten()?;
519    let val: toml::Value = toml::from_str(&text).ok()?;
520    let card_id = val
521        .get("card_id")
522        .and_then(|v| v.as_str())
523        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
524        .to_string();
525    let created_at = val
526        .get("created_at")
527        .and_then(|v| v.as_str())
528        .map(String::from);
529    let model = val
530        .get("model")
531        .and_then(|m| m.get("id"))
532        .and_then(|v| v.as_str())
533        .map(String::from);
534    let scenario = val
535        .get("scenario")
536        .and_then(|s| s.get("name"))
537        .and_then(|v| v.as_str())
538        .map(String::from);
539    let pass_rate = val
540        .get("stats")
541        .and_then(|s| s.get("pass_rate"))
542        .and_then(|v| v.as_float());
543    Some(Summary {
544        card_id,
545        pkg: pkg.to_string(),
546        created_at,
547        model,
548        scenario,
549        pass_rate,
550    })
551}
552
553/// List cards from `store`. `pkg_filter = Some("name")` restricts to that pkg subdir.
554pub fn list_with_store(
555    store: &dyn CardStore,
556    pkg_filter: Option<&str>,
557) -> Result<Vec<Summary>, String> {
558    let locators = store.list_card_locators(pkg_filter)?;
559    let mut out = Vec::with_capacity(locators.len());
560    for (pkg, loc) in &locators {
561        if let Some(s) = summarize(store, loc, pkg) {
562            out.push(s);
563        }
564    }
565
566    // Sort newest first. card_id embeds a compact UTC timestamp so it's
567    // naturally chronological; we still prefer created_at when present
568    // (some callers may override it), falling back to card_id.
569    out.sort_by(|a, b| {
570        b.created_at
571            .cmp(&a.created_at)
572            .then_with(|| b.card_id.cmp(&a.card_id))
573    });
574    Ok(out)
575}
576
577pub fn summaries_to_json(rows: &[Summary]) -> Json {
578    Json::Array(rows.iter().map(|s| s.to_json()).collect())
579}
580
581// ───────────────────────────────────────────────────────────────
582// P1 API: append / alias_{set,list} / find
583// ───────────────────────────────────────────────────────────────
584
585/// Append new top-level fields to an existing Card.
586///
587/// Semantics: **additive only**. If any top-level key in `fields` already
588/// exists in the Card, the call fails — Cards are immutable w.r.t. existing
589/// data. New top-level keys are inserted and the Card file is rewritten
590/// atomically.
591///
592/// Returns the merged Card JSON.
593pub fn append_with_store(
594    store: &dyn CardStore,
595    card_id: &str,
596    fields: Json,
597) -> Result<Json, String> {
598    let text = store
599        .read_card_text(card_id)?
600        .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
601    let fields_obj = match fields {
602        Json::Object(m) => m,
603        _ => return Err("alc.card.append: fields must be a table".into()),
604    };
605
606    let existing: toml::Value =
607        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
608    let mut existing_json = toml_to_json(existing);
609    let existing_obj = existing_json
610        .as_object_mut()
611        .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
612
613    for (k, v) in fields_obj {
614        if existing_obj.contains_key(&k) {
615            return Err(format!(
616                "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
617            ));
618        }
619        if !v.is_null() {
620            existing_obj.insert(k, v);
621        }
622    }
623
624    let toml_val = json_to_toml(existing_json.clone())?;
625    let text = toml::to_string_pretty(&toml_val)
626        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
627    store.overwrite_card(card_id, &text)?;
628
629    publish(CardEvent::Appended {
630        card_id: card_id.to_string(),
631        toml_text: text,
632    });
633
634    Ok(existing_json)
635}
636
637#[derive(Debug, Clone)]
638pub struct Alias {
639    pub name: String,
640    pub card_id: String,
641    pub pkg: Option<String>,
642    pub set_at: String,
643    pub note: Option<String>,
644}
645
646impl Alias {
647    fn to_json(&self) -> Json {
648        let mut m = serde_json::Map::new();
649        m.insert("name".into(), json!(self.name));
650        m.insert("card_id".into(), json!(self.card_id));
651        if let Some(p) = &self.pkg {
652            m.insert("pkg".into(), json!(p));
653        }
654        m.insert("set_at".into(), json!(self.set_at));
655        if let Some(n) = &self.note {
656            m.insert("note".into(), json!(n));
657        }
658        Json::Object(m)
659    }
660}
661
662/// Bind (or rebind) an alias to a Card in `store`.
663///
664/// Validates that `card_id` exists. If an alias with the same `name` already
665/// exists it is overwritten — the alias table is intentionally mutable even
666/// though the Cards themselves are not.
667pub fn alias_set_with_store(
668    store: &dyn CardStore,
669    name: &str,
670    card_id: &str,
671    pkg: Option<&str>,
672    note: Option<&str>,
673) -> Result<Alias, String> {
674    validate_name(name, "alias")?;
675    if store.find_card_locator(card_id)?.is_none() {
676        return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
677    }
678    let mut aliases = store.read_aliases()?;
679    aliases.retain(|a| a.name != name);
680    let entry = Alias {
681        name: name.to_string(),
682        card_id: card_id.to_string(),
683        pkg: pkg.map(String::from),
684        set_at: now_rfc3339(),
685        note: note.map(String::from),
686    };
687    aliases.push(entry.clone());
688    store.write_aliases(&aliases)?;
689
690    // Mirror the full alias table to subscribers as TOML text. The
691    // primary FileCardStore already serialized it internally; we
692    // re-serialize here using the same shape so subscribers stay in
693    // byte-for-byte parity. A serialization failure is best-effort
694    // (log + skip).
695    match serialize_aliases_toml(&aliases) {
696        Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
697        Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
698    }
699
700    Ok(entry)
701}
702
703/// Serialize `aliases` to a TOML document matching the primary
704/// `_aliases.toml` layout. Broken out so that subscribers can receive
705/// the exact byte-for-byte dump.
706fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
707    let mut arr = Vec::with_capacity(aliases.len());
708    for a in aliases {
709        let mut t = toml::map::Map::new();
710        t.insert("name".into(), toml::Value::String(a.name.clone()));
711        t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
712        if let Some(p) = &a.pkg {
713            t.insert("pkg".into(), toml::Value::String(p.clone()));
714        }
715        t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
716        if let Some(n) = &a.note {
717            t.insert("note".into(), toml::Value::String(n.clone()));
718        }
719        arr.push(toml::Value::Table(t));
720    }
721    let mut root = toml::map::Map::new();
722    root.insert("alias".into(), toml::Value::Array(arr));
723    toml::to_string_pretty(&toml::Value::Table(root))
724        .map_err(|e| format!("Failed to serialize aliases: {e}"))
725}
726
727/// Resolve an alias name to its bound Card and return the full Card JSON.
728///
729/// Shortcut for `alias_list → filter → get`. Returns `None` when the alias
730/// does not exist. Errors when the alias points at a missing Card — that
731/// would indicate a corrupt alias table (the target was deleted out of band).
732pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
733    validate_name(name, "alias")?;
734    let aliases = store.read_aliases()?;
735    let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
736        return Ok(None);
737    };
738    match get_with_store(store, &alias.card_id)? {
739        Some(card) => Ok(Some(card)),
740        None => Err(format!(
741            "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
742            alias.card_id
743        )),
744    }
745}
746
747/// List aliases from `store`, optionally filtered by pkg.
748pub fn alias_list_with_store(
749    store: &dyn CardStore,
750    pkg_filter: Option<&str>,
751) -> Result<Vec<Alias>, String> {
752    let mut aliases = store.read_aliases()?;
753    if let Some(p) = pkg_filter {
754        aliases.retain(|a| a.pkg.as_deref() == Some(p));
755    }
756    Ok(aliases)
757}
758
759pub fn aliases_to_json(rows: &[Alias]) -> Json {
760    Json::Array(rows.iter().map(|a| a.to_json()).collect())
761}
762
763// ═══════════════════════════════════════════════════════════════
764// Where DSL — Prisma/Mongo-style nested predicates
765// ═══════════════════════════════════════════════════════════════
766//
767// Syntax (JSON form, as received from Lua / MCP):
768//
769//   where = {
770//     pkg: "cot",                                      // implicit eq
771//     stats: { pass_rate: { gte: 0.8 }, n: { gte: 30 } },
772//     strategy_params: { temperature: { gte: 0.7 } },
773//     prior_card_id: { exists: true },
774//     _or: [ {...}, {...} ],                           // logical ops
775//     _not: { model: { id: "claude-haiku-4-5-20251001" } },
776//   }
777//
778// Semantics:
779//   * Multiple keys in the same object → implicit AND.
780//   * Nested object value → section (path extension).
781//   * Object whose every key is a reserved operator name → leaf operator
782//     object; applies the operators to the value at the current path.
783//   * Scalar/array value → implicit eq.
784//   * Reserved logical keys: `_and` / `_or` / `_not`.
785//   * Reserved operator keys: `eq ne lt lte gt gte in nin exists
786//     contains starts_with`.  Card schemas must not use these names as
787//     field names under any section.
788//
789// Missing-field comparison:
790//   * `eq/lt/lte/gt/gte/in/contains/starts_with` → false on missing
791//   * `ne/nin`                                   → true  on missing
792//   * `exists`                                   → explicit
793//
794
795/// Single comparison operator.
796#[derive(Debug, Clone, PartialEq)]
797pub enum CmpOp {
798    Eq,
799    Ne,
800    Lt,
801    Lte,
802    Gt,
803    Gte,
804    In,
805    Nin,
806    Exists,
807    Contains,
808    StartsWith,
809}
810
811impl CmpOp {
812    fn from_key(k: &str) -> Option<Self> {
813        Some(match k {
814            "eq" => Self::Eq,
815            "ne" => Self::Ne,
816            "lt" => Self::Lt,
817            "lte" => Self::Lte,
818            "gt" => Self::Gt,
819            "gte" => Self::Gte,
820            "in" => Self::In,
821            "nin" => Self::Nin,
822            "exists" => Self::Exists,
823            "contains" => Self::Contains,
824            "starts_with" => Self::StartsWith,
825            _ => return None,
826        })
827    }
828}
829
830/// One parsed comparison: `path` points at a nested field,
831/// `op` + `value` describe how to compare it.
832#[derive(Debug, Clone)]
833pub struct Comparison {
834    pub path: Vec<String>,
835    pub op: CmpOp,
836    pub value: Json,
837}
838
839/// Parsed predicate tree.
840#[derive(Debug, Clone)]
841pub enum Predicate {
842    And(Vec<Predicate>),
843    Or(Vec<Predicate>),
844    Not(Box<Predicate>),
845    Cmp(Comparison),
846}
847
848/// Is `obj` entirely composed of reserved operator keys?
849/// Empty objects return false (meaningless as an operator object).
850fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
851    if obj.is_empty() {
852        return false;
853    }
854    obj.keys().all(|k| CmpOp::from_key(k).is_some())
855}
856
857/// Parse a `where` JSON value into a `Predicate`.
858///
859/// `prefix` is the current nested-key path as we descend through
860/// section objects.
861pub fn parse_where(value: &Json) -> Result<Predicate, String> {
862    parse_predicate(value, &[])
863}
864
865fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
866    let obj = value
867        .as_object()
868        .ok_or_else(|| "where clause must be a table".to_string())?;
869
870    let mut clauses: Vec<Predicate> = Vec::new();
871
872    for (key, val) in obj {
873        match key.as_str() {
874            "_and" => {
875                let arr = val
876                    .as_array()
877                    .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
878                let mut subs = Vec::with_capacity(arr.len());
879                for sub in arr {
880                    subs.push(parse_predicate(sub, prefix)?);
881                }
882                clauses.push(Predicate::And(subs));
883            }
884            "_or" => {
885                let arr = val
886                    .as_array()
887                    .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
888                let mut subs = Vec::with_capacity(arr.len());
889                for sub in arr {
890                    subs.push(parse_predicate(sub, prefix)?);
891                }
892                clauses.push(Predicate::Or(subs));
893            }
894            "_not" => {
895                clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
896            }
897            _ => {
898                // Field key — extend the current path.
899                let mut new_path = prefix.to_vec();
900                new_path.push(key.clone());
901
902                match val {
903                    Json::Object(m) if is_operator_object(m) => {
904                        // Leaf: operator object at this path.
905                        for (op_key, op_val) in m {
906                            let op = CmpOp::from_key(op_key).expect("validated above");
907                            clauses.push(Predicate::Cmp(Comparison {
908                                path: new_path.clone(),
909                                op,
910                                value: op_val.clone(),
911                            }));
912                        }
913                    }
914                    Json::Object(_) => {
915                        // Nested section: recurse with extended path.
916                        clauses.push(parse_predicate(val, &new_path)?);
917                    }
918                    _ => {
919                        // Scalar/array: implicit eq.
920                        clauses.push(Predicate::Cmp(Comparison {
921                            path: new_path,
922                            op: CmpOp::Eq,
923                            value: val.clone(),
924                        }));
925                    }
926                }
927            }
928        }
929    }
930
931    if clauses.len() == 1 {
932        Ok(clauses.remove(0))
933    } else {
934        Ok(Predicate::And(clauses))
935    }
936}
937
938/// Fetch a nested value from a Card JSON by dotted path.
939fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
940    let mut node = card;
941    for key in path {
942        let obj = node.as_object()?;
943        node = obj.get(key)?;
944    }
945    Some(node)
946}
947
948/// Compare two JSON scalars with a numeric/string/bool comparator.
949/// Returns None when the types aren't comparable.
950fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
951    match (a, b) {
952        (Json::Number(x), Json::Number(y)) => {
953            let xf = x.as_f64()?;
954            let yf = y.as_f64()?;
955            xf.partial_cmp(&yf)
956        }
957        (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
958        (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
959        _ => None,
960    }
961}
962
963fn json_eq(a: &Json, b: &Json) -> bool {
964    match (a, b) {
965        (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
966            (Some(xf), Some(yf)) => xf == yf,
967            _ => a == b,
968        },
969        _ => a == b,
970    }
971}
972
973fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
974    let actual = fetch_path(card, &cmp.path);
975    let exists = actual.is_some();
976
977    match cmp.op {
978        CmpOp::Exists => {
979            let want = cmp.value.as_bool().unwrap_or(true);
980            exists == want
981        }
982        CmpOp::Ne => match actual {
983            None => true,
984            Some(v) => !json_eq(v, &cmp.value),
985        },
986        CmpOp::Nin => match actual {
987            None => true,
988            Some(v) => match cmp.value.as_array() {
989                Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
990                None => false,
991            },
992        },
993        CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
994        CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
995            Some(arr) => arr.iter().any(|e| json_eq(e, v)),
996            None => false,
997        }),
998        CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
999            let Some(v) = actual else { return false };
1000            let Some(ord) = json_cmp(v, &cmp.value) else {
1001                return false;
1002            };
1003            use std::cmp::Ordering::{Equal, Greater, Less};
1004            matches!(
1005                (&cmp.op, ord),
1006                (CmpOp::Lt, Less)
1007                    | (CmpOp::Lte, Less | Equal)
1008                    | (CmpOp::Gt, Greater)
1009                    | (CmpOp::Gte, Greater | Equal)
1010            )
1011        }
1012        CmpOp::Contains => {
1013            let Some(Json::String(haystack)) = actual else {
1014                return false;
1015            };
1016            let Some(needle) = cmp.value.as_str() else {
1017                return false;
1018            };
1019            haystack.contains(needle)
1020        }
1021        CmpOp::StartsWith => {
1022            let Some(Json::String(haystack)) = actual else {
1023                return false;
1024            };
1025            let Some(needle) = cmp.value.as_str() else {
1026                return false;
1027            };
1028            haystack.starts_with(needle)
1029        }
1030    }
1031}
1032
1033/// Evaluate a predicate tree against a full Card JSON.
1034pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1035    match pred {
1036        Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1037        Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1038        Predicate::Not(sub) => !eval_predicate(sub, card),
1039        Predicate::Cmp(c) => eval_cmp(c, card),
1040    }
1041}
1042
1043// ───────────────────────────────────────────────────────────────
1044// Order-by
1045// ───────────────────────────────────────────────────────────────
1046
1047/// Parsed sort key: path with optional descending flag.
1048#[derive(Debug, Clone)]
1049pub struct OrderKey {
1050    pub path: Vec<String>,
1051    pub desc: bool,
1052}
1053
1054impl OrderKey {
1055    fn parse(raw: &str) -> Result<Self, String> {
1056        if raw.is_empty() {
1057            return Err("order_by key must not be empty".into());
1058        }
1059        let (desc, rest) = if let Some(r) = raw.strip_prefix('-') {
1060            (true, r)
1061        } else {
1062            (false, raw)
1063        };
1064        let path: Vec<String> = rest.split('.').map(|s| s.to_string()).collect();
1065        if path.iter().any(|p| p.is_empty()) {
1066            return Err(format!("invalid order_by key: '{raw}'"));
1067        }
1068        Ok(Self { path, desc })
1069    }
1070}
1071
1072/// Parse an order_by JSON value.  Accepts:
1073///   - a string: `"stats.pass_rate"` or `"-stats.pass_rate"`
1074///   - an array of strings: `["-stats.pass_rate", "created_at"]`
1075pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1076    match value {
1077        Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1078        Json::Array(arr) => {
1079            let mut out = Vec::with_capacity(arr.len());
1080            for v in arr {
1081                let s = v
1082                    .as_str()
1083                    .ok_or_else(|| "order_by array must contain strings".to_string())?;
1084                out.push(OrderKey::parse(s)?);
1085            }
1086            Ok(out)
1087        }
1088        _ => Err("order_by must be a string or array of strings".into()),
1089    }
1090}
1091
1092/// Query parameters for `find`.
1093#[derive(Debug, Default, Clone)]
1094pub struct FindQuery {
1095    /// Restrict scan to a single pkg subdir (I/O optimization).
1096    pub pkg: Option<String>,
1097    /// Prisma-style predicate tree.
1098    pub where_: Option<Predicate>,
1099    /// Sort keys (dotted paths, optional `-` prefix for desc).
1100    pub order_by: Vec<OrderKey>,
1101    pub limit: Option<usize>,
1102    pub offset: Option<usize>,
1103}
1104
1105/// A loaded Card row flowing through the find() pipeline.
1106///
1107/// `full` is the whole Card JSON (used by `where` evaluation and
1108/// `order_by` dotted-path lookup); `summary` is the projection
1109/// returned to callers.
1110#[derive(Debug, Clone)]
1111struct CardRow {
1112    full: Json,
1113    summary: Summary,
1114}
1115
1116/// Load a single Card file into a `CardRow`.
1117fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1118    let text = store.read_locator_text(locator).ok().flatten()?;
1119    let val: toml::Value = toml::from_str(&text).ok()?;
1120    let json = toml_to_json(val);
1121
1122    let card_id = json
1123        .get("card_id")
1124        .and_then(|v| v.as_str())
1125        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1126        .to_string();
1127    let created_at = json
1128        .get("created_at")
1129        .and_then(|v| v.as_str())
1130        .map(String::from);
1131    let model = json
1132        .get("model")
1133        .and_then(|m| m.get("id"))
1134        .and_then(|v| v.as_str())
1135        .map(String::from);
1136    let scenario = json
1137        .get("scenario")
1138        .and_then(|s| s.get("name"))
1139        .and_then(|v| v.as_str())
1140        .map(String::from);
1141    let pass_rate = json
1142        .get("stats")
1143        .and_then(|s| s.get("pass_rate"))
1144        .and_then(|v| v.as_f64());
1145
1146    Some(CardRow {
1147        full: json,
1148        summary: Summary {
1149            card_id,
1150            pkg: pkg.to_string(),
1151            created_at,
1152            model,
1153            scenario,
1154            pass_rate,
1155        },
1156    })
1157}
1158
1159/// Compare two Card rows according to an ordered list of sort keys.
1160fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1161    use std::cmp::Ordering;
1162    for k in keys {
1163        let va = fetch_path(&a.full, &k.path);
1164        let vb = fetch_path(&b.full, &k.path);
1165        let ord = match (va, vb) {
1166            (None, None) => Ordering::Equal,
1167            (None, Some(_)) => Ordering::Greater, // missing sorts last
1168            (Some(_), None) => Ordering::Less,
1169            (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1170        };
1171        let ord = if k.desc { ord.reverse() } else { ord };
1172        if ord != Ordering::Equal {
1173            return ord;
1174        }
1175    }
1176    Ordering::Equal
1177}
1178
1179/// Summary-only fields that can be sorted without loading full TOML.
1180const SUMMARY_SORT_FIELDS: &[&str] = &[
1181    "card_id",
1182    "created_at",
1183    "stats.pass_rate",
1184    "scenario.name",
1185    "model.id",
1186];
1187
1188/// Return true when the query can be answered with lightweight Summary
1189/// rows (no full-TOML load needed).
1190fn is_lightweight_query(q: &FindQuery) -> bool {
1191    q.where_.is_none()
1192        && q.order_by
1193            .iter()
1194            .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1195}
1196
1197/// Sort Summary rows using order_by keys that map to Summary fields.
1198fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1199    use std::cmp::Ordering;
1200    for k in keys {
1201        let key_str = k.path.join(".");
1202        let ord = match key_str.as_str() {
1203            "card_id" => a.card_id.cmp(&b.card_id),
1204            "created_at" => a.created_at.cmp(&b.created_at),
1205            "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1206                (None, None) => Ordering::Equal,
1207                (None, Some(_)) => Ordering::Greater,
1208                (Some(_), None) => Ordering::Less,
1209                (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1210            },
1211            "scenario.name" => a.scenario.cmp(&b.scenario),
1212            "model.id" => a.model.cmp(&b.model),
1213            _ => Ordering::Equal,
1214        };
1215        let ord = if k.desc { ord.reverse() } else { ord };
1216        if ord != Ordering::Equal {
1217            return ord;
1218        }
1219    }
1220    Ordering::Equal
1221}
1222
1223/// Filter/sort Cards across the store using the `where` DSL.
1224///
1225/// When no `where` clause is specified and `order_by` only references
1226/// summary-level fields, uses the lightweight `list_with_store` path to
1227/// avoid loading full TOML.  Otherwise loads full TOML per Card.
1228pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1229    // Fast path: lightweight query, no full-TOML load needed.
1230    if is_lightweight_query(&q) {
1231        let mut rows = list_with_store(store, q.pkg.as_deref())?;
1232        if q.order_by.is_empty() {
1233            rows.sort_by(|a, b| {
1234                b.created_at
1235                    .cmp(&a.created_at)
1236                    .then_with(|| b.card_id.cmp(&a.card_id))
1237            });
1238        } else {
1239            rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1240        }
1241        let out: Vec<Summary> = rows
1242            .into_iter()
1243            .skip(q.offset.unwrap_or(0))
1244            .take(q.limit.unwrap_or(usize::MAX))
1245            .collect();
1246        return Ok(out);
1247    }
1248
1249    // Full path: load entire TOML for where evaluation / arbitrary order_by.
1250    let all_rows = scan_cards(store, q.pkg.as_deref())?;
1251
1252    // Filter by where.
1253    let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1254        all_rows
1255            .into_iter()
1256            .filter(|row| eval_predicate(pred, &row.full))
1257            .collect()
1258    } else {
1259        all_rows
1260    };
1261
1262    // Sort.
1263    if q.order_by.is_empty() {
1264        rows.sort_by(|a, b| {
1265            b.summary
1266                .created_at
1267                .cmp(&a.summary.created_at)
1268                .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1269        });
1270    } else {
1271        rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1272    }
1273
1274    // Offset + limit.
1275    let out: Vec<Summary> = rows
1276        .into_iter()
1277        .skip(q.offset.unwrap_or(0))
1278        .take(q.limit.unwrap_or(usize::MAX))
1279        .map(|r| r.summary)
1280        .collect();
1281
1282    Ok(out)
1283}
1284
1285// ───────────────────────────────────────────────────────────────
1286// Lineage walker
1287// ───────────────────────────────────────────────────────────────
1288//
1289// Cards form a tree (typically, not strictly a DAG) via the
1290// `metadata.prior_card_id` convention. `lineage()` walks that tree
1291// either up (toward ancestors) or down (toward descendants) or both,
1292// up to a configurable depth, optionally filtered by `prior_relation`.
1293//
1294// Up-walk is O(depth) — each step reads one parent Card.
1295// Down-walk is O(N_cards × depth) — we scan the whole store at each
1296// level. For the current scale (hundreds to low thousands of cards)
1297// this is fine; if the store grows we can build a prior_card_id index.
1298
1299/// Walk direction for `lineage`.
1300#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1301pub enum LineageDirection {
1302    #[default]
1303    Up,
1304    Down,
1305    Both,
1306}
1307
1308impl LineageDirection {
1309    pub fn parse(s: &str) -> Result<Self, String> {
1310        match s {
1311            "up" => Ok(Self::Up),
1312            "down" => Ok(Self::Down),
1313            "both" => Ok(Self::Both),
1314            other => Err(format!(
1315                "direction must be 'up', 'down', or 'both' (got '{other}')"
1316            )),
1317        }
1318    }
1319}
1320
1321/// Query parameters for `lineage`.
1322#[derive(Debug, Clone, Default)]
1323pub struct LineageQuery {
1324    pub card_id: String,
1325    pub direction: LineageDirection,
1326    /// Max traversal depth. Default 10.
1327    pub depth: Option<usize>,
1328    /// Include a per-node `stats` field (full [stats] section).
1329    pub include_stats: bool,
1330    /// If set, only edges whose `prior_relation` is in this list are
1331    /// followed.  The root is always included regardless.
1332    pub relation_filter: Option<Vec<String>>,
1333}
1334
1335/// One node in the lineage result.
1336///
1337/// `depth` is the signed distance from the root: negative for
1338/// ancestors (up-walk), 0 for the root, positive for descendants.
1339#[derive(Debug, Clone)]
1340pub struct LineageNode {
1341    pub card_id: String,
1342    pub pkg: String,
1343    pub prior_card_id: Option<String>,
1344    pub prior_relation: Option<String>,
1345    pub depth: i32,
1346    pub stats: Option<Json>,
1347}
1348
1349/// One edge in the lineage result (child → parent, always).
1350#[derive(Debug, Clone)]
1351pub struct LineageEdge {
1352    pub from: String,
1353    pub to: String,
1354    pub relation: Option<String>,
1355}
1356
1357/// Full lineage walk result.
1358#[derive(Debug, Clone)]
1359pub struct LineageResult {
1360    pub root: String,
1361    pub nodes: Vec<LineageNode>,
1362    pub edges: Vec<LineageEdge>,
1363    pub truncated: bool,
1364}
1365
1366const DEFAULT_LINEAGE_DEPTH: usize = 10;
1367
1368/// Extract the lineage fields from a Card JSON.
1369/// Returns (prior_card_id, prior_relation).
1370fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1371    let meta = card.get("metadata");
1372    let prior_card_id = meta
1373        .and_then(|m| m.get("prior_card_id"))
1374        .and_then(|v| v.as_str())
1375        .map(String::from);
1376    let prior_relation = meta
1377        .and_then(|m| m.get("prior_relation"))
1378        .and_then(|v| v.as_str())
1379        .map(String::from);
1380    (prior_card_id, prior_relation)
1381}
1382
1383/// Build a LineageNode from a loaded CardRow at a given depth.
1384fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1385    let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1386    let stats = if include_stats {
1387        row.full.get("stats").cloned()
1388    } else {
1389        None
1390    };
1391    LineageNode {
1392        card_id: row.summary.card_id.clone(),
1393        pkg: row.summary.pkg.clone(),
1394        prior_card_id,
1395        prior_relation,
1396        depth,
1397        stats,
1398    }
1399}
1400
1401/// Check whether `relation` passes the relation_filter (None means no
1402/// filter, which always passes).
1403fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
1404    match filter {
1405        None => true,
1406        Some(allowed) => match relation {
1407            Some(r) => allowed.iter().any(|a| a == r),
1408            None => false,
1409        },
1410    }
1411}
1412
1413/// Full in-memory card index with forward and reverse lineage maps.
1414struct CardIndex {
1415    /// card_id → CardRow
1416    cards: std::collections::HashMap<String, CardRow>,
1417    /// parent card_id → Vec<child card_id> (reverse lineage index)
1418    children: std::collections::HashMap<String, Vec<String>>,
1419}
1420
1421/// Load all Cards in the store once, keyed by card_id.
1422/// Also builds a reverse index (parent → children) so that
1423/// `walk_down` is O(result_size) instead of O(N_cards × depth).
1424fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1425    let rows = scan_cards(store, None)?;
1426
1427    let mut cards = std::collections::HashMap::with_capacity(rows.len());
1428    let mut children: std::collections::HashMap<String, Vec<String>> =
1429        std::collections::HashMap::new();
1430
1431    for row in rows {
1432        let id = row.summary.card_id.clone();
1433        let (prior_id, _) = lineage_fields(&row.full);
1434        if let Some(parent) = prior_id {
1435            children.entry(parent).or_default().push(id.clone());
1436        }
1437        cards.insert(id, row);
1438    }
1439    Ok(CardIndex { cards, children })
1440}
1441
1442/// Scan all Cards in the store, loading full TOML for each. When
1443/// `pkg_filter` is provided, only that pkg subdir is scanned. Shared
1444/// between `find` and `load_card_index`.
1445fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1446    let locators = store.list_card_locators(pkg_filter)?;
1447    let mut rows = Vec::with_capacity(locators.len());
1448    for (pkg, loc) in &locators {
1449        if let Some(row) = load_full(store, loc, pkg) {
1450            rows.push(row);
1451        }
1452    }
1453    Ok(rows)
1454}
1455
1456/// Invariant context passed through the lineage walkers.
1457struct LineageCtx<'a> {
1458    index: &'a CardIndex,
1459    relation_filter: &'a Option<Vec<String>>,
1460    include_stats: bool,
1461    max_depth: usize,
1462}
1463
1464/// Mutable accumulator for one lineage walk.
1465struct LineageAccum {
1466    nodes: Vec<LineageNode>,
1467    edges: Vec<LineageEdge>,
1468    visited: std::collections::HashSet<String>,
1469    truncated: bool,
1470}
1471
1472/// Walk ancestors via `metadata.prior_card_id`.
1473fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1474    let mut cur = start_id.to_string();
1475    for step in 1..=ctx.max_depth {
1476        let Some(row) = ctx.index.cards.get(&cur) else {
1477            return;
1478        };
1479        let (prior_id, prior_rel) = lineage_fields(&row.full);
1480        let Some(prior_id) = prior_id else {
1481            return;
1482        };
1483        if !relation_passes(ctx.relation_filter, &prior_rel) {
1484            return;
1485        }
1486        if acc.visited.contains(&prior_id) {
1487            return;
1488        }
1489        let Some(parent) = ctx.index.cards.get(&prior_id) else {
1490            return;
1491        };
1492        acc.nodes
1493            .push(make_node(parent, -(step as i32), ctx.include_stats));
1494        acc.edges.push(LineageEdge {
1495            from: row.summary.card_id.clone(),
1496            to: parent.summary.card_id.clone(),
1497            relation: prior_rel,
1498        });
1499        acc.visited.insert(prior_id.clone());
1500        cur = prior_id;
1501    }
1502    // Depth exhausted but another unwalked parent exists → truncated.
1503    if let Some(row) = ctx.index.cards.get(&cur) {
1504        let (prior_id, _) = lineage_fields(&row.full);
1505        if prior_id
1506            .as_ref()
1507            .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1508        {
1509            acc.truncated = true;
1510        }
1511    }
1512}
1513
1514/// Walk descendants using the reverse index (parent → children),
1515/// breadth-first.  O(result_size) instead of O(N_cards × depth).
1516fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1517    let mut frontier: Vec<String> = vec![start_id.to_string()];
1518
1519    for depth in 1..=ctx.max_depth {
1520        let mut next_frontier: Vec<String> = Vec::new();
1521        for parent_id in &frontier {
1522            let children = match ctx.index.children.get(parent_id) {
1523                Some(c) => c,
1524                None => continue,
1525            };
1526            for child_id in children {
1527                if acc.visited.contains(child_id) {
1528                    continue;
1529                }
1530                let Some(child) = ctx.index.cards.get(child_id) else {
1531                    continue;
1532                };
1533                let (_, prior_rel) = lineage_fields(&child.full);
1534                if !relation_passes(ctx.relation_filter, &prior_rel) {
1535                    continue;
1536                }
1537                acc.nodes
1538                    .push(make_node(child, depth as i32, ctx.include_stats));
1539                acc.edges.push(LineageEdge {
1540                    from: child.summary.card_id.clone(),
1541                    to: parent_id.clone(),
1542                    relation: prior_rel,
1543                });
1544                acc.visited.insert(child_id.clone());
1545                next_frontier.push(child_id.clone());
1546            }
1547        }
1548        if next_frontier.is_empty() {
1549            return;
1550        }
1551        frontier = next_frontier;
1552    }
1553    // Frontier still has nodes but depth is exhausted: check for
1554    // unwalked children at the next level.
1555    for parent_id in &frontier {
1556        let children = match ctx.index.children.get(parent_id) {
1557            Some(c) => c,
1558            None => continue,
1559        };
1560        for child_id in children {
1561            if acc.visited.contains(child_id) {
1562                continue;
1563            }
1564            let Some(child) = ctx.index.cards.get(child_id) else {
1565                continue;
1566            };
1567            let (_, prior_rel) = lineage_fields(&child.full);
1568            if relation_passes(ctx.relation_filter, &prior_rel) {
1569                acc.truncated = true;
1570                return;
1571            }
1572        }
1573    }
1574}
1575
1576/// Walk the lineage tree from `q.card_id` in `store`.
1577pub fn lineage_with_store(
1578    store: &dyn CardStore,
1579    q: LineageQuery,
1580) -> Result<Option<LineageResult>, String> {
1581    let index = load_card_index(store)?;
1582    let Some(root_row) = index.cards.get(&q.card_id) else {
1583        return Ok(None);
1584    };
1585
1586    let ctx = LineageCtx {
1587        index: &index,
1588        relation_filter: &q.relation_filter,
1589        include_stats: q.include_stats,
1590        max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1591    };
1592    let mut acc = LineageAccum {
1593        nodes: Vec::new(),
1594        edges: Vec::new(),
1595        visited: std::collections::HashSet::new(),
1596        truncated: false,
1597    };
1598
1599    acc.nodes.push(make_node(root_row, 0, q.include_stats));
1600    acc.visited.insert(q.card_id.clone());
1601
1602    if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1603        walk_up(&q.card_id, &ctx, &mut acc);
1604    }
1605    if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1606        walk_down(&q.card_id, &ctx, &mut acc);
1607    }
1608
1609    Ok(Some(LineageResult {
1610        root: q.card_id,
1611        nodes: acc.nodes,
1612        edges: acc.edges,
1613        truncated: acc.truncated,
1614    }))
1615}
1616
1617/// Render a LineageResult as JSON for the service layer.
1618pub fn lineage_to_json(r: &LineageResult) -> Json {
1619    let nodes: Vec<Json> = r
1620        .nodes
1621        .iter()
1622        .map(|n| {
1623            let mut m = serde_json::Map::new();
1624            m.insert("card_id".into(), json!(n.card_id));
1625            m.insert("pkg".into(), json!(n.pkg));
1626            m.insert("depth".into(), json!(n.depth));
1627            if let Some(p) = &n.prior_card_id {
1628                m.insert("prior_card_id".into(), json!(p));
1629            }
1630            if let Some(rel) = &n.prior_relation {
1631                m.insert("prior_relation".into(), json!(rel));
1632            }
1633            if let Some(s) = &n.stats {
1634                m.insert("stats".into(), s.clone());
1635            }
1636            Json::Object(m)
1637        })
1638        .collect();
1639    let edges: Vec<Json> = r
1640        .edges
1641        .iter()
1642        .map(|e| {
1643            let mut m = serde_json::Map::new();
1644            m.insert("from".into(), json!(e.from));
1645            m.insert("to".into(), json!(e.to));
1646            if let Some(rel) = &e.relation {
1647                m.insert("relation".into(), json!(rel));
1648            }
1649            Json::Object(m)
1650        })
1651        .collect();
1652    json!({
1653        "root": r.root,
1654        "nodes": nodes,
1655        "edges": edges,
1656        "truncated": r.truncated,
1657    })
1658}
1659
1660// ───────────────────────────────────────────────────────────────
1661// Samples sidecar: per-case detail written alongside a Card as
1662// `{pkg}/{card_id}.samples.jsonl`. Write-once to preserve Card
1663// immutability: once a Card has a samples file, it cannot be
1664// rewritten — mismatched per-case data would break auditability.
1665// ───────────────────────────────────────────────────────────────
1666
1667// ───────────────────────────────────────────────────────────────
1668// Card import: copy Card files from an external directory into the
1669// local cards store. Used by `alc_card_install` (Card Collections)
1670// and by `alc_pkg_install` (Pkg-bundled cards/).
1671// ───────────────────────────────────────────────────────────────
1672
1673/// Import Card files into `store` from `source_dir` under `pkg`.
1674///
1675/// Copies `*.toml` and `*.samples.jsonl` files. Existing cards with the
1676/// same id are skipped (first-writer wins — Card immutability).
1677///
1678/// Returns `(imported, skipped)` card_id lists.
1679pub fn import_from_dir_with_store(
1680    store: &dyn CardStore,
1681    source_dir: &std::path::Path,
1682    pkg: &str,
1683) -> Result<(Vec<String>, Vec<String>), String> {
1684    let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1685    for card_id in &imported {
1686        match store.read_card_text(card_id) {
1687            Ok(Some(toml_text)) => publish(CardEvent::Created {
1688                pkg: pkg.to_string(),
1689                card_id: card_id.clone(),
1690                toml_text,
1691            }),
1692            Ok(None) => {
1693                tracing::warn!(
1694                    card_id = %card_id,
1695                    "import_from_dir: read_card_text returned None after import; skipping publish"
1696                );
1697            }
1698            Err(e) => {
1699                tracing::warn!(
1700                    card_id = %card_id,
1701                    error = %e,
1702                    "import_from_dir: read_card_text failed after import; skipping publish"
1703                );
1704            }
1705        }
1706        // Samples are optional — best-effort.
1707        match store.read_samples_text(card_id) {
1708            Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1709                card_id: card_id.clone(),
1710                jsonl_text,
1711            }),
1712            Ok(None) => {}
1713            Err(e) => {
1714                tracing::warn!(
1715                    card_id = %card_id,
1716                    error = %e,
1717                    "import_from_dir: read_samples_text failed after import; skipping publish"
1718                );
1719            }
1720        }
1721    }
1722    Ok((imported, skipped))
1723}
1724
1725/// Write per-case samples to `{card_id}.samples.jsonl` (write-once).
1726///
1727/// Each `samples` entry is serialized as one compact JSON line.
1728/// Fails if a samples file already exists for this card — mirrors
1729/// the immutability guarantee of Cards themselves.
1730pub fn write_samples_with_store(
1731    store: &dyn CardStore,
1732    card_id: &str,
1733    samples: Vec<Json>,
1734) -> Result<PathBuf, String> {
1735    if store.samples_exists(card_id)? {
1736        return Err(format!(
1737            "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1738        ));
1739    }
1740    let mut buf = String::new();
1741    for (idx, s) in samples.iter().enumerate() {
1742        let line = serde_json::to_string(s).map_err(|e| {
1743            format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1744        })?;
1745        buf.push_str(&line);
1746        buf.push('\n');
1747    }
1748    let path = store.write_samples_text(card_id, &buf)?;
1749
1750    publish(CardEvent::SamplesWritten {
1751        card_id: card_id.to_string(),
1752        jsonl_text: buf,
1753    });
1754
1755    Ok(path)
1756}
1757
1758/// Query parameters for `read_samples`.
1759#[derive(Debug, Default, Clone)]
1760pub struct SamplesQuery {
1761    /// Skip this many matched rows (after `where` filtering).
1762    pub offset: usize,
1763    /// Max matched rows to return.
1764    pub limit: Option<usize>,
1765    /// Optional `where` predicate applied to each sample row.
1766    /// The row JSON is the full line object (no section wrapping).
1767    pub where_: Option<Predicate>,
1768}
1769
1770/// Read per-case samples from `{card_id}.samples.jsonl`.
1771///
1772/// Streams the JSONL file line by line; rows are parsed, optionally
1773/// filtered by `q.where_`, then paged by `offset` + `limit`.  Offset
1774/// applies to the **post-filter** stream, matching Prisma/SQL
1775/// semantics.
1776///
1777/// Returns an empty Vec if no samples file exists (Cards without
1778/// per-case details are the common case, not an error).
1779pub fn read_samples_with_store(
1780    store: &dyn CardStore,
1781    card_id: &str,
1782    q: SamplesQuery,
1783) -> Result<Vec<Json>, String> {
1784    let text = match store.read_samples_text(card_id)? {
1785        Some(t) => t,
1786        None => return Ok(Vec::new()),
1787    };
1788    let mut matched: usize = 0;
1789    let mut out = Vec::new();
1790    for (i, line) in text.lines().enumerate() {
1791        if line.trim().is_empty() {
1792            continue;
1793        }
1794        let val: Json = serde_json::from_str(line)
1795            .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1796        if let Some(pred) = &q.where_ {
1797            if !eval_predicate(pred, &val) {
1798                continue;
1799            }
1800        }
1801        if matched < q.offset {
1802            matched += 1;
1803            continue;
1804        }
1805        if let Some(lim) = q.limit {
1806            if out.len() >= lim {
1807                break;
1808            }
1809        }
1810        matched += 1;
1811        out.push(val);
1812    }
1813    Ok(out)
1814}
1815
1816// ═══════════════════════════════════════════════════════════════
1817// FileCardStore — default backend.
1818// ═══════════════════════════════════════════════════════════════
1819//
1820// Stores Cards as TOML files under `{root}/{pkg}/{card_id}.toml`,
1821// samples as `{root}/{pkg}/{card_id}.samples.jsonl`, and the alias
1822// table as `{root}/_aliases.toml`.
1823//
1824// `root` is provided at construction time via `new(root)`; callers
1825// (typically the service layer) resolve it from the `AppDir`
1826// abstraction. Tests use a tempdir via `new(tmpdir)`.
1827
1828/// File-backed implementation of [`CardStore`].
1829pub struct FileCardStore {
1830    root: PathBuf,
1831}
1832
1833impl FileCardStore {
1834    /// Construct a store rooted at an explicit path.
1835    pub fn new(root: PathBuf) -> Self {
1836        Self { root }
1837    }
1838
1839    /// Return the root directory this store writes under.
1840    pub fn root(&self) -> &Path {
1841        &self.root
1842    }
1843
1844    // ─── Thin `self` delegations to the `*_with_store` free fns ────
1845    //
1846    // These let callers that hold an `Arc<FileCardStore>` (bridge/data
1847    // register_card closures, service layer) invoke domain logic via
1848    // instance methods without re-importing the `_with_store` free
1849    // functions. Semantics are identical to the free-fn variants.
1850
1851    pub fn create(&self, input: Json) -> Result<(String, PathBuf), String> {
1852        create_with_store(self, input)
1853    }
1854
1855    pub fn get(&self, card_id: &str) -> Result<Option<Json>, String> {
1856        get_with_store(self, card_id)
1857    }
1858
1859    pub fn list(&self, pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
1860        list_with_store(self, pkg_filter)
1861    }
1862
1863    pub fn append(&self, card_id: &str, fields: Json) -> Result<Json, String> {
1864        append_with_store(self, card_id, fields)
1865    }
1866
1867    pub fn alias_set(
1868        &self,
1869        name: &str,
1870        card_id: &str,
1871        pkg: Option<&str>,
1872        note: Option<&str>,
1873    ) -> Result<Alias, String> {
1874        alias_set_with_store(self, name, card_id, pkg, note)
1875    }
1876
1877    pub fn alias_list(&self, pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
1878        alias_list_with_store(self, pkg_filter)
1879    }
1880
1881    pub fn get_by_alias(&self, name: &str) -> Result<Option<Json>, String> {
1882        get_by_alias_with_store(self, name)
1883    }
1884
1885    pub fn find(&self, q: FindQuery) -> Result<Vec<Summary>, String> {
1886        find_with_store(self, q)
1887    }
1888
1889    pub fn write_samples(&self, card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1890        write_samples_with_store(self, card_id, samples)
1891    }
1892
1893    pub fn read_samples(&self, card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1894        read_samples_with_store(self, card_id, q)
1895    }
1896
1897    pub fn lineage(&self, q: LineageQuery) -> Result<Option<LineageResult>, String> {
1898        lineage_with_store(self, q)
1899    }
1900
1901    pub fn card_sink_backfill(
1902        &self,
1903        sink: &str,
1904        dry_run: bool,
1905    ) -> Result<SinkBackfillReport, String> {
1906        card_sink_backfill_with_store(self, sink, dry_run)
1907    }
1908
1909    /// Returns the absolute path to the per-pkg subdirectory,
1910    /// creating it when missing. Validates `pkg` to prevent path
1911    /// traversal.
1912    fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1913        validate_name(pkg, "pkg")?;
1914        let dir = self.root.join(pkg);
1915        if !dir.exists() {
1916            fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1917        }
1918        Ok(dir)
1919    }
1920
1921    /// Path of the global alias table.
1922    fn aliases_path(&self) -> PathBuf {
1923        self.root.join("_aliases.toml")
1924    }
1925
1926    /// Path of the samples sidecar for `card_id`. Errors if the
1927    /// Card itself does not exist — samples without a parent Card
1928    /// are meaningless and we refuse to create orphans.
1929    fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1930        let card_path = self
1931            .find_card_locator(card_id)?
1932            .ok_or_else(|| format!("card '{card_id}' not found"))?;
1933        let dir = card_path
1934            .parent()
1935            .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1936        Ok(dir.join(format!("{card_id}.samples.jsonl")))
1937    }
1938}
1939
1940impl CardStore for FileCardStore {
1941    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1942        let dir = self.pkg_dir(pkg)?;
1943        let path = dir.join(format!("{card_id}.toml"));
1944        if path.exists() {
1945            return Err(format!(
1946                "alc.card.create: card '{card_id}' already exists (immutable)"
1947            ));
1948        }
1949        let tmp = path.with_extension("toml.tmp");
1950        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1951        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1952        Ok(path)
1953    }
1954
1955    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1956        let path = self
1957            .find_card_locator(card_id)?
1958            .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1959        let tmp = path.with_extension("toml.tmp");
1960        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1961        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1962        Ok(path)
1963    }
1964
1965    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1966        validate_name(card_id, "card_id")?;
1967        if !self.root.exists() {
1968            return Ok(None);
1969        }
1970        let entries =
1971            fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1972        for entry in entries.flatten() {
1973            let p = entry.path();
1974            if p.is_dir() {
1975                let candidate = p.join(format!("{card_id}.toml"));
1976                if candidate.exists() {
1977                    return Ok(Some(candidate));
1978                }
1979            }
1980        }
1981        Ok(None)
1982    }
1983
1984    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
1985        let Some(path) = self.find_card_locator(card_id)? else {
1986            return Ok(None);
1987        };
1988        let text = fs::read_to_string(&path)
1989            .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
1990        Ok(Some(text))
1991    }
1992
1993    fn list_card_locators(
1994        &self,
1995        pkg_filter: Option<&str>,
1996    ) -> Result<Vec<(String, PathBuf)>, String> {
1997        if !self.root.exists() {
1998            return Ok(Vec::new());
1999        }
2000        let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2001            validate_name(p, "pkg")?;
2002            let d = self.root.join(p);
2003            if d.is_dir() {
2004                vec![d]
2005            } else {
2006                return Ok(Vec::new());
2007            }
2008        } else {
2009            fs::read_dir(&self.root)
2010                .map_err(|e| format!("Failed to read cards dir: {e}"))?
2011                .flatten()
2012                .map(|e| e.path())
2013                .filter(|p| p.is_dir())
2014                .collect()
2015        };
2016
2017        let mut out = Vec::new();
2018        for pdir in pkg_dirs {
2019            let pkg = pdir
2020                .file_name()
2021                .and_then(|s| s.to_str())
2022                .unwrap_or("")
2023                .to_string();
2024            let entries = match fs::read_dir(&pdir) {
2025                Ok(e) => e,
2026                Err(_) => continue,
2027            };
2028            for entry in entries.flatten() {
2029                let p = entry.path();
2030                if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2031                    continue;
2032                }
2033                out.push((pkg.clone(), p));
2034            }
2035        }
2036        Ok(out)
2037    }
2038
2039    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2040        match fs::read_to_string(locator) {
2041            Ok(text) => Ok(Some(text)),
2042            Err(_) => Ok(None),
2043        }
2044    }
2045
2046    fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2047        let path = self.aliases_path();
2048        if !path.exists() {
2049            return Ok(Vec::new());
2050        }
2051        let text =
2052            fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2053        let val: toml::Value =
2054            toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2055        let arr = val
2056            .get("alias")
2057            .and_then(|v| v.as_array())
2058            .cloned()
2059            .unwrap_or_default();
2060        let mut out = Vec::with_capacity(arr.len());
2061        for entry in arr {
2062            let t = match entry {
2063                toml::Value::Table(t) => t,
2064                _ => continue,
2065            };
2066            let name = match t.get("name").and_then(|v| v.as_str()) {
2067                Some(s) => s.to_string(),
2068                None => continue,
2069            };
2070            let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2071                Some(s) => s.to_string(),
2072                None => continue,
2073            };
2074            out.push(Alias {
2075                name,
2076                card_id,
2077                pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2078                set_at: t
2079                    .get("set_at")
2080                    .and_then(|v| v.as_str())
2081                    .map(String::from)
2082                    .unwrap_or_default(),
2083                note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2084            });
2085        }
2086        Ok(out)
2087    }
2088
2089    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2090        // Ensure the cards root exists so aliases can be written to
2091        // a brand-new store (mirrors the behaviour of `cards_dir()`).
2092        if !self.root.exists() {
2093            fs::create_dir_all(&self.root)
2094                .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2095        }
2096        let path = self.aliases_path();
2097        let mut arr = Vec::with_capacity(aliases.len());
2098        for a in aliases {
2099            let mut t = toml::map::Map::new();
2100            t.insert("name".into(), toml::Value::String(a.name.clone()));
2101            t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2102            if let Some(p) = &a.pkg {
2103                t.insert("pkg".into(), toml::Value::String(p.clone()));
2104            }
2105            t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2106            if let Some(n) = &a.note {
2107                t.insert("note".into(), toml::Value::String(n.clone()));
2108            }
2109            arr.push(toml::Value::Table(t));
2110        }
2111        let mut root = toml::map::Map::new();
2112        root.insert("alias".into(), toml::Value::Array(arr));
2113        let text = toml::to_string_pretty(&toml::Value::Table(root))
2114            .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2115        let tmp = path.with_extension("toml.tmp");
2116        fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2117        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2118        Ok(())
2119    }
2120
2121    fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2122        let path = self.samples_path(card_id)?;
2123        Ok(path.exists())
2124    }
2125
2126    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2127        let path = self.samples_path(card_id)?;
2128        if path.exists() {
2129            return Err(format!(
2130                "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2131            ));
2132        }
2133        let tmp = path.with_extension("jsonl.tmp");
2134        fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2135        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2136        Ok(path)
2137    }
2138
2139    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2140        let path = self.samples_path(card_id)?;
2141        if !path.exists() {
2142            return Ok(None);
2143        }
2144        let text =
2145            fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2146        Ok(Some(text))
2147    }
2148
2149    fn import_from_dir(
2150        &self,
2151        source_dir: &Path,
2152        pkg: &str,
2153    ) -> Result<(Vec<String>, Vec<String>), String> {
2154        validate_name(pkg, "pkg")?;
2155        let dest = self.pkg_dir(pkg)?;
2156        let mut imported = Vec::new();
2157        let mut skipped = Vec::new();
2158
2159        let entries =
2160            fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2161
2162        for entry in entries.flatten() {
2163            let path = entry.path();
2164            let fname = match path.file_name().and_then(|n| n.to_str()) {
2165                Some(n) => n.to_string(),
2166                None => continue,
2167            };
2168
2169            if !fname.ends_with(".toml") {
2170                continue;
2171            }
2172
2173            let card_id = fname.trim_end_matches(".toml");
2174            let dest_toml = dest.join(&fname);
2175
2176            if dest_toml.exists() {
2177                skipped.push(card_id.to_string());
2178                continue;
2179            }
2180
2181            let text = fs::read_to_string(&path)
2182                .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2183            let val: toml::Value = toml::from_str(&text)
2184                .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2185            if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2186                continue;
2187            }
2188
2189            fs::copy(&path, &dest_toml)
2190                .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2191
2192            let samples_name = format!("{card_id}.samples.jsonl");
2193            let samples_src = source_dir.join(&samples_name);
2194            if samples_src.exists() {
2195                let samples_dest = dest.join(&samples_name);
2196                if !samples_dest.exists() {
2197                    fs::copy(&samples_src, &samples_dest)
2198                        .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2199                }
2200            }
2201
2202            imported.push(card_id.to_string());
2203        }
2204
2205        Ok((imported, skipped))
2206    }
2207}
2208
2209// ═══════════════════════════════════════════════════════════════
2210// Event Publisher Port — Card sink fan-out (v1)
2211// ═══════════════════════════════════════════════════════════════
2212//
2213// Storage port (`CardStore` / `FileCardStore`) stays pure CRUD. The
2214// Event publisher port below mirrors every successful Card write to a
2215// set of subscriber backends configured via the `ALC_CARD_SINKS`
2216// environment variable. Fan-out is best-effort and strictly serial;
2217// subscriber failures never propagate to the primary Card API.
2218
2219// ─── CardEvent / CardEventKind ─────────────────────────────────
2220
2221/// A Card-level event emitted from the write path.
2222///
2223/// Each variant carries the already-serialized payload text so that
2224/// subscribers can persist the exact bytes that were written to the
2225/// primary store (byte-for-byte parity).
2226#[derive(Debug, Clone)]
2227pub enum CardEvent {
2228    /// A brand-new Card was written.
2229    Created {
2230        pkg: String,
2231        card_id: String,
2232        toml_text: String,
2233    },
2234    /// An existing Card had new top-level keys merged in.
2235    Appended { card_id: String, toml_text: String },
2236    /// A samples JSONL sidecar was written for `card_id`.
2237    SamplesWritten { card_id: String, jsonl_text: String },
2238    /// The global alias table was rewritten.
2239    AliasesWritten { toml_text: String },
2240}
2241
2242/// Lightweight discriminant for `CardEvent`. Used as a `HashMap` key in
2243/// `SubscriberStats` so that ok/err counters can be tracked per event
2244/// kind without holding the full payload.
2245#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
2246pub enum CardEventKind {
2247    Created,
2248    Appended,
2249    SamplesWritten,
2250    AliasesWritten,
2251}
2252
2253impl CardEventKind {
2254    /// Stable string tag used in `tracing` log fields.
2255    pub fn as_str(self) -> &'static str {
2256        match self {
2257            CardEventKind::Created => "created",
2258            CardEventKind::Appended => "appended",
2259            CardEventKind::SamplesWritten => "samples_written",
2260            CardEventKind::AliasesWritten => "aliases_written",
2261        }
2262    }
2263
2264    /// Short JSON key for the public `alc_stats` snapshot.
2265    /// Distinct from `as_str()` so that tracing logs keep their
2266    /// verbose form while the JSON surface stays compact.
2267    pub fn json_key(self) -> &'static str {
2268        match self {
2269            CardEventKind::Created => "created",
2270            CardEventKind::Appended => "appended",
2271            CardEventKind::SamplesWritten => "samples",
2272            CardEventKind::AliasesWritten => "aliases",
2273        }
2274    }
2275
2276    /// All kinds in stable display order. Used by `SubscriberHealthRow`
2277    /// to emit all four counter keys even when a counter is zero, so
2278    /// that downstream consumers can rely on field presence.
2279    pub fn all() -> [CardEventKind; 4] {
2280        [
2281            CardEventKind::Created,
2282            CardEventKind::Appended,
2283            CardEventKind::SamplesWritten,
2284            CardEventKind::AliasesWritten,
2285        ]
2286    }
2287}
2288
2289impl Serialize for CardEventKind {
2290    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2291        s.serialize_str(self.json_key())
2292    }
2293}
2294
2295impl CardEvent {
2296    /// Return the `CardEventKind` discriminant for this event.
2297    pub fn kind(&self) -> CardEventKind {
2298        match self {
2299            CardEvent::Created { .. } => CardEventKind::Created,
2300            CardEvent::Appended { .. } => CardEventKind::Appended,
2301            CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2302            CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2303        }
2304    }
2305}
2306
2307// ─── CardSubscriber trait ──────────────────────────────────────
2308
2309/// A downstream backend that receives `CardEvent`s in best-effort,
2310/// serial fan-out order.
2311///
2312/// Implementations must be `Send + Sync` so that the bus can hold
2313/// `Arc<dyn CardSubscriber>` safely across threads.
2314pub trait CardSubscriber: Send + Sync {
2315    /// Handle one event. Returning `Err` records a failure in
2316    /// `SubscriberStats` and emits a `tracing::warn!`, but does not
2317    /// propagate to the caller of the `_with_store` API.
2318    fn on_event(&self, ev: &CardEvent) -> Result<(), String>;
2319
2320    /// Canonical identity URI for this subscriber. Used as the key in
2321    /// `SubscriberStats` and as the match target for
2322    /// `CardEventBus::publish_to`.
2323    fn describe(&self) -> String;
2324
2325    /// Best-effort check for whether `card_id` already exists in this
2326    /// subscriber. Used by `alc_card_sink_backfill` to skip cards that
2327    /// are already mirrored (drift-safe: we never overwrite).
2328    ///
2329    /// Default implementation returns `Ok(false)` so subscribers that
2330    /// cannot cheaply answer (e.g. future network-backed sinks) always
2331    /// get the push. Override when a cheap local check is possible.
2332    fn has_card(&self, _card_id: &str) -> Result<bool, String> {
2333        Ok(false)
2334    }
2335}
2336
2337// ─── SubscriberStats ───────────────────────────────────────────
2338
2339/// Most recent delivery failure for a single subscriber. Exposed via
2340/// `SubscriberHealthRow.last_error` in the `alc_stats` JSON snapshot.
2341#[derive(Debug, Clone, Serialize)]
2342pub struct LastError {
2343    pub kind: CardEventKind,
2344    pub msg: String,
2345    pub ts_ms: u64,
2346}
2347
2348/// Per-subscriber counter state. Held inside `SubscriberStats` under a
2349/// `Mutex`; `snapshot` clones the fields into an owned `SubscriberHealthRow`
2350/// while the lock is held, so the lock window stays short.
2351#[derive(Default, Debug)]
2352pub struct PerSubscriber {
2353    pub ok: HashMap<CardEventKind, u64>,
2354    pub err: HashMap<CardEventKind, u64>,
2355    pub last_error: Option<LastError>,
2356}
2357
2358/// Process-wide per-subscriber statistics, keyed by subscriber URI
2359/// (the value returned by `CardSubscriber::describe`).
2360#[derive(Default, Debug)]
2361pub struct SubscriberStats {
2362    inner: Mutex<HashMap<String, PerSubscriber>>,
2363}
2364
2365impl SubscriberStats {
2366    /// Record a successful event delivery.
2367    pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2368        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2369        let entry = g.entry(key.to_string()).or_default();
2370        let c = entry.ok.entry(kind).or_insert(0);
2371        *c = c.saturating_add(1);
2372    }
2373
2374    /// Record a delivery failure together with the error message.
2375    /// The failure kind, message, and timestamp overwrite `last_error`
2376    /// — there is no ring buffer; only the most recent failure is kept.
2377    pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2378        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2379        let entry = g.entry(key.to_string()).or_default();
2380        let c = entry.err.entry(kind).or_insert(0);
2381        *c = c.saturating_add(1);
2382        entry.last_error = Some(LastError {
2383            kind,
2384            msg: err.to_string(),
2385            ts_ms: now_ms(),
2386        });
2387    }
2388
2389    /// Take a point-in-time snapshot of all subscribers. The returned
2390    /// `Vec` is owned — the internal lock is released before this
2391    /// function returns, so callers can hold it freely.
2392    ///
2393    /// All four `CardEventKind` keys (`created / appended / samples /
2394    /// aliases`) are always present in both `ok` and `err`, defaulting
2395    /// to 0 if no event of that kind has been recorded. This lets
2396    /// downstream consumers rely on field presence.
2397    pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2398        let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2399        let mut rows = Vec::with_capacity(g.len());
2400        for (sink, ps) in g.iter() {
2401            let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2402            let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2403            for k in CardEventKind::all() {
2404                ok.insert(
2405                    k.json_key().to_string(),
2406                    ps.ok.get(&k).copied().unwrap_or(0),
2407                );
2408                err.insert(
2409                    k.json_key().to_string(),
2410                    ps.err.get(&k).copied().unwrap_or(0),
2411                );
2412            }
2413            rows.push(SubscriberHealthRow {
2414                sink: sink.clone(),
2415                ok,
2416                err,
2417                last_error: ps.last_error.clone(),
2418            });
2419        }
2420        // Stable output ordering (by sink URI) so that the JSON dump is
2421        // deterministic across runs — useful for snapshot tests.
2422        rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2423        rows
2424    }
2425}
2426
2427/// Unix-epoch milliseconds used by `LastError.ts_ms`. Uses
2428/// `unwrap_or_default` so no panic can escape even if the system
2429/// clock is misconfigured.
2430fn now_ms() -> u64 {
2431    std::time::SystemTime::now()
2432        .duration_since(std::time::UNIX_EPOCH)
2433        .unwrap_or_default()
2434        .as_millis() as u64
2435}
2436
2437/// Snapshot row for a single subscriber, serialized directly into the
2438/// `alc_stats` JSON output as one element of the `card_sinks` array.
2439#[derive(Debug, Clone, Serialize)]
2440pub struct SubscriberHealthRow {
2441    pub sink: String,
2442    pub ok: HashMap<String, u64>,
2443    pub err: HashMap<String, u64>,
2444    pub last_error: Option<LastError>,
2445}
2446
2447/// Public entry point: snapshot of all process-wide subscriber stats.
2448/// Wrapper around `event_bus().stats().snapshot()` so that downstream
2449/// crates (notably `algocline-app`) do not need a handle to the
2450/// `CardEventBus` singleton.
2451pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2452    event_bus().stats().snapshot()
2453}
2454
2455// ─── FileCardSubscriber — file-URI backend ─────────────────────
2456
2457/// Atomically write `bytes` to `dest` by staging to a unique
2458/// `{dest}.tmp.{pid}.{seq}` sibling and renaming. The `{pid}.{seq}`
2459/// suffix prevents concurrent writers (same or different processes)
2460/// from colliding on the tmp path.
2461fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
2462    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
2463    let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
2464    let pid = process::id();
2465    if let Some(parent) = dest.parent() {
2466        if !parent.as_os_str().is_empty() && !parent.exists() {
2467            fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
2468        }
2469    }
2470    let mut tmp = dest.as_os_str().to_owned();
2471    tmp.push(format!(".tmp.{pid}.{seq}"));
2472    let tmp_path = PathBuf::from(tmp);
2473    fs::write(&tmp_path, bytes).map_err(|e| format!("subscriber write tmp: {e}"))?;
2474    fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
2475}
2476
2477/// Canonical `file://` URI for a local directory path. The result is
2478/// the inverse of [`decode_file_uri_path`] — the pair round-trips
2479/// through the `ALC_CARD_SINKS` env spec.
2480fn canonical_file_uri(root: &Path) -> String {
2481    let p = root.to_string_lossy();
2482    #[cfg(unix)]
2483    {
2484        format!("file://{p}")
2485    }
2486    #[cfg(windows)]
2487    {
2488        format!("file:///{}", p.replace('\\', "/"))
2489    }
2490    #[cfg(not(any(unix, windows)))]
2491    {
2492        format!("file://{p}")
2493    }
2494}
2495
2496/// A subscriber that mirrors events to a local directory using the
2497/// same two-tier layout as [`FileCardStore`]:
2498///
2499/// - `{root}/{pkg}/{card_id}.toml` — Card TOML
2500/// - `{root}/{pkg}/{card_id}.samples.jsonl` — samples sidecar
2501/// - `{root}/_aliases.toml` — global alias table
2502pub struct FileCardSubscriber {
2503    root: PathBuf,
2504    uri: String,
2505}
2506
2507impl FileCardSubscriber {
2508    /// Construct a subscriber rooted at `root`. The canonical URI is
2509    /// computed once and returned from [`Self::describe`].
2510    pub fn new(root: PathBuf) -> Self {
2511        let uri = canonical_file_uri(&root);
2512        Self { root, uri }
2513    }
2514
2515    /// Scan the subscriber root for a Card with `card_id` under any
2516    /// `{pkg}` subdirectory. Returns `Ok(None)` when the root itself
2517    /// does not exist yet (subscribers are write-once lazy).
2518    pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2519        validate_name(card_id, "card_id")?;
2520        if !self.root.exists() {
2521            return Ok(None);
2522        }
2523        let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2524        for entry in entries.flatten() {
2525            let p = entry.path();
2526            if p.is_dir() {
2527                let candidate = p.join(format!("{card_id}.toml"));
2528                if candidate.exists() {
2529                    return Ok(Some(candidate));
2530                }
2531            }
2532        }
2533        Ok(None)
2534    }
2535
2536    fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2537        validate_name(pkg, "pkg")?;
2538        let dir = self.root.join(pkg);
2539        if !dir.exists() {
2540            fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2541        }
2542        Ok(dir)
2543    }
2544
2545    fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2546        validate_name(card_id, "card_id")?;
2547        let dir = self.ensure_pkg_dir(pkg)?;
2548        let dest = dir.join(format!("{card_id}.toml"));
2549        atomic_write(&dest, toml_text.as_bytes())
2550    }
2551
2552    fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2553        match self.locate_card(card_id)? {
2554            Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2555            None => Err(format!(
2556                "subscriber append: card '{card_id}' missing at {}",
2557                self.uri
2558            )),
2559        }
2560    }
2561
2562    fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2563        let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2564            format!(
2565                "subscriber samples: card '{card_id}' missing at {}",
2566                self.uri
2567            )
2568        })?;
2569        let dir = card_path
2570            .parent()
2571            .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2572        let dest = dir.join(format!("{card_id}.samples.jsonl"));
2573        atomic_write(&dest, jsonl_text.as_bytes())
2574    }
2575
2576    fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2577        if !self.root.exists() {
2578            fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2579        }
2580        let dest = self.root.join("_aliases.toml");
2581        atomic_write(&dest, toml_text.as_bytes())
2582    }
2583}
2584
2585impl CardSubscriber for FileCardSubscriber {
2586    fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2587        match ev {
2588            CardEvent::Created {
2589                pkg,
2590                card_id,
2591                toml_text,
2592            } => self.write_created(pkg, card_id, toml_text),
2593            CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2594            CardEvent::SamplesWritten {
2595                card_id,
2596                jsonl_text,
2597            } => self.write_samples(card_id, jsonl_text),
2598            CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2599        }
2600    }
2601
2602    fn describe(&self) -> String {
2603        self.uri.clone()
2604    }
2605
2606    /// Delegates to [`FileCardSubscriber::locate_card`]. A non-existent
2607    /// root (subscriber has never been written to) returns `Ok(false)`,
2608    /// which is the correct "backfill needed" answer.
2609    fn has_card(&self, card_id: &str) -> Result<bool, String> {
2610        Ok(self.locate_card(card_id)?.is_some())
2611    }
2612}
2613
2614// ─── CardEventBus + OnceLock singleton ─────────────────────────
2615
2616/// Process-wide fan-out bus. Subscribers are registered once at startup
2617/// (from `ALC_CARD_SINKS`) and stored behind a `Mutex` so that tests
2618/// can swap them out via `replace_subscribers_for_test` without losing
2619/// the singleton identity.
2620pub struct CardEventBus {
2621    subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
2622    stats: Arc<SubscriberStats>,
2623}
2624
2625impl CardEventBus {
2626    /// Build a bus from an explicit subscriber list. Used both by the
2627    /// env loader and by `install_event_bus_for_test`.
2628    pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2629        Self {
2630            subscribers: Mutex::new(subscribers),
2631            stats: Arc::new(SubscriberStats::default()),
2632        }
2633    }
2634
2635    /// Shared handle to the per-subscriber counters.
2636    pub fn stats(&self) -> &Arc<SubscriberStats> {
2637        &self.stats
2638    }
2639
2640    /// Fan out `ev` to every registered subscriber serially. Subscriber
2641    /// failures are counted in `SubscriberStats` and logged but do not
2642    /// propagate.
2643    pub fn publish(&self, ev: &CardEvent) {
2644        let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2645            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2646            guard.clone()
2647        };
2648        for sub in &subs_snapshot {
2649            let key = sub.describe();
2650            match sub.on_event(ev) {
2651                Ok(()) => self.stats.record_ok(&key, ev.kind()),
2652                Err(e) => {
2653                    tracing::warn!(
2654                        subscriber = %key,
2655                        kind = ev.kind().as_str(),
2656                        error = %e,
2657                        "card subscriber failed"
2658                    );
2659                    self.stats.record_err(&key, ev.kind(), &e);
2660                }
2661            }
2662        }
2663    }
2664
2665    /// Deliver `ev` to exactly one subscriber identified by
2666    /// `target` URI. Returns `Err` when no subscriber matches or the
2667    /// subscriber itself fails (backfill path needs the caller to know).
2668    pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2669        let hit: Option<Arc<dyn CardSubscriber>> = {
2670            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2671            guard.iter().find(|s| s.describe() == target).cloned()
2672        };
2673        let Some(sub) = hit else {
2674            return Err(format!("subscriber not registered: {target}"));
2675        };
2676        let key = sub.describe();
2677        match sub.on_event(ev) {
2678            Ok(()) => {
2679                self.stats.record_ok(&key, ev.kind());
2680                Ok(())
2681            }
2682            Err(e) => {
2683                tracing::warn!(
2684                    subscriber = %key,
2685                    kind = ev.kind().as_str(),
2686                    error = %e,
2687                    "card subscriber failed (publish_to)"
2688                );
2689                self.stats.record_err(&key, ev.kind(), &e);
2690                Err(e)
2691            }
2692        }
2693    }
2694
2695    /// List every subscriber URI currently registered on the bus.
2696    pub fn subscriber_uris(&self) -> Vec<String> {
2697        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2698        guard.iter().map(|s| s.describe()).collect()
2699    }
2700
2701    /// Look up a subscriber by URI (as returned by `describe`). Returns
2702    /// `None` when no subscriber matches. Used by
2703    /// `alc_card_sink_backfill` to dispatch has_card checks against a
2704    /// specific sink.
2705    pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2706        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2707        guard.iter().find(|s| s.describe() == uri).cloned()
2708    }
2709
2710    /// Replace the subscriber list in place while preserving singleton
2711    /// identity and shared `SubscriberStats`. Test-only.
2712    #[cfg(any(test, feature = "test-support"))]
2713    pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2714        let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2715        *guard = subs;
2716    }
2717
2718    /// Reset the per-subscriber counters. Test-only helper.
2719    #[cfg(any(test, feature = "test-support"))]
2720    pub fn reset_stats_for_test(&self) {
2721        let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2722        g.clear();
2723    }
2724}
2725
2726static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2727
2728/// Return the process-wide `CardEventBus` singleton, initializing it
2729/// on the first call from the `ALC_CARD_SINKS` environment variable.
2730pub fn event_bus() -> &'static CardEventBus {
2731    CARD_EVENT_BUS.get_or_init(|| {
2732        let subs = load_subscribers_from_env();
2733        CardEventBus::new(subs)
2734    })
2735}
2736
2737/// Eagerly initialize the bus. Idempotent and safe to call multiple
2738/// times; intended for startup hooks (`main.rs`) so that subscriber
2739/// registration `tracing::info!` lines are emitted at boot rather than
2740/// on the first Card write.
2741pub fn init_event_bus() {
2742    let bus = event_bus();
2743    let uris = bus.subscriber_uris();
2744    if uris.is_empty() {
2745        tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2746    } else {
2747        for uri in &uris {
2748            tracing::info!(subscriber = %uri, "card sink registered");
2749        }
2750    }
2751}
2752
2753/// Install a test-built bus. Fails once the singleton is already set.
2754#[cfg(any(test, feature = "test-support"))]
2755pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
2756    CARD_EVENT_BUS
2757        .set(bus)
2758        .map_err(|_| "bus already initialized".to_string())
2759}
2760
2761/// Convenience wrapper: publish through the singleton.
2762pub fn publish(ev: CardEvent) {
2763    // In test builds, serialize publishing with the subscriber-test mutex so
2764    // that default-store tests running in parallel cannot inject events into
2765    // a subscriber test's mock while the mock is active.  The owning test
2766    // thread sets INSIDE_BUS_TEST to true so it does NOT try to acquire the
2767    // same lock it already holds (re-entrancy safe).
2768    #[cfg(test)]
2769    {
2770        let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2771        if !is_test_owner {
2772            // Block until any active subscriber test releases the lock.
2773            let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2774            event_bus().publish(&ev);
2775            return;
2776        }
2777    }
2778    event_bus().publish(&ev);
2779}
2780
2781/// Mutex used in tests to serialise subscriber-test setup against concurrent
2782/// publishes from default-store tests running in parallel.
2783#[cfg(test)]
2784fn bus_test_gate() -> &'static Mutex<()> {
2785    static GATE: OnceLock<Mutex<()>> = OnceLock::new();
2786    GATE.get_or_init(|| Mutex::new(()))
2787}
2788
2789// Thread-local flag: set to `true` by the thread running inside
2790// `with_bus_subscribers` so that `publish` skips the gate (re-entrancy).
2791#[cfg(test)]
2792thread_local! {
2793    static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
2794}
2795
2796// ─── alc_card_sink_backfill ────────────────────────────────────
2797
2798/// Result of a [`card_sink_backfill`] run. One row per card the tool
2799/// touched (classified as pushed / skipped / failed / pushed_samples).
2800/// The `failed` entries carry the error message so an operator can
2801/// triage read-only mounts etc.
2802#[derive(Debug, Clone, Default, Serialize)]
2803pub struct SinkBackfillReport {
2804    pub sink: String,
2805    pub pushed: Vec<String>,
2806    pub skipped: Vec<String>,
2807    pub failed: Vec<(String, String)>,
2808    pub pushed_samples: Vec<String>,
2809}
2810
2811/// Backfill one subscriber (`sink` URI) from the primary store.
2812///
2813/// Steps:
2814/// 1. Look up the target subscriber on the event bus; fail fast if
2815///    the URI is not registered so the caller gets an immediate
2816///    error rather than a silent no-op.
2817/// 2. Enumerate every `(pkg, card_id)` pair from the default
2818///    [`CardStore`].
2819/// 3. For each pair, ask the subscriber whether it already has that
2820///    card (`CardSubscriber::has_card`). If yes → skipped (drift-safe,
2821///    no overwrite). If no → read the primary TOML and `publish_to`
2822///    the one target sink. Samples are mirrored the same way.
2823/// 4. `dry_run = true` short-circuits step 3: the report lists
2824///    what would have been pushed but no `publish_to` is issued,
2825///    so `SubscriberStats` does not increment.
2826///
2827/// `card_sink_backfill` operates with an injectable [`CardStore`]; tests
2828/// drive this directly against a tempdir-backed [`FileCardStore`] so they
2829/// never touch the user's real cards directory.
2830pub fn card_sink_backfill_with_store(
2831    store: &dyn CardStore,
2832    sink: &str,
2833    dry_run: bool,
2834) -> Result<SinkBackfillReport, String> {
2835    let bus = event_bus();
2836    let sub = bus
2837        .find_subscriber(sink)
2838        .ok_or_else(|| format!("unknown sink: {sink}"))?;
2839
2840    let locators = store.list_card_locators(None)?;
2841
2842    let mut report = SinkBackfillReport {
2843        sink: sink.to_string(),
2844        ..Default::default()
2845    };
2846
2847    for (pkg, locator) in locators {
2848        let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2849            Some(s) => s.to_string(),
2850            None => continue,
2851        };
2852
2853        match sub.has_card(&card_id) {
2854            Ok(true) => {
2855                report.skipped.push(card_id);
2856                continue;
2857            }
2858            Ok(false) => {}
2859            Err(e) => {
2860                tracing::warn!(
2861                    card_id = %card_id,
2862                    error = %e,
2863                    "backfill: has_card failed; treating as skipped"
2864                );
2865                report.skipped.push(card_id);
2866                continue;
2867            }
2868        }
2869
2870        let toml_text = match store.read_locator_text(&locator) {
2871            Ok(Some(t)) => t,
2872            Ok(None) => {
2873                // Unreadable / corrupt on primary. Do not panic; skip.
2874                report.skipped.push(card_id);
2875                continue;
2876            }
2877            Err(e) => {
2878                tracing::warn!(
2879                    card_id = %card_id,
2880                    error = %e,
2881                    "backfill: read_locator_text failed; treating as skipped"
2882                );
2883                report.skipped.push(card_id);
2884                continue;
2885            }
2886        };
2887
2888        if dry_run {
2889            report.pushed.push(card_id.clone());
2890            if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2891                report.pushed_samples.push(card_id);
2892            }
2893            continue;
2894        }
2895
2896        let ev = CardEvent::Created {
2897            pkg: pkg.clone(),
2898            card_id: card_id.clone(),
2899            toml_text,
2900        };
2901        match bus.publish_to(sink, &ev) {
2902            Ok(()) => report.pushed.push(card_id.clone()),
2903            Err(e) => {
2904                report.failed.push((card_id, e));
2905                continue;
2906            }
2907        }
2908
2909        if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2910            let ev = CardEvent::SamplesWritten {
2911                card_id: card_id.clone(),
2912                jsonl_text,
2913            };
2914            match bus.publish_to(sink, &ev) {
2915                Ok(()) => report.pushed_samples.push(card_id),
2916                Err(e) => {
2917                    report.failed.push((card_id, format!("samples: {e}")));
2918                }
2919            }
2920        }
2921    }
2922
2923    Ok(report)
2924}
2925
2926// ─── ALC_CARD_SINKS env parser ─────────────────────────────────
2927
2928/// Read `ALC_CARD_SINKS` and build one subscriber per accepted spec.
2929/// Malformed entries are logged and skipped; duplicate URIs are
2930/// first-wins. Non-UTF8 values reject the whole env.
2931fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2932    let raw = match std::env::var("ALC_CARD_SINKS") {
2933        Ok(v) => v,
2934        Err(std::env::VarError::NotPresent) => return Vec::new(),
2935        Err(std::env::VarError::NotUnicode(_)) => {
2936            tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2937            return Vec::new();
2938        }
2939    };
2940    parse_subscribers_from_str(&raw)
2941}
2942
2943/// Parse a `|`-separated list of subscriber URIs (the same format used
2944/// by `ALC_CARD_SINKS`). Extracted so tests can exercise the parser
2945/// without touching process environment.
2946fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2947    if raw.is_empty() {
2948        return Vec::new();
2949    }
2950    let mut seen: HashSet<String> = HashSet::new();
2951    let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2952    for spec in raw.split('|') {
2953        let spec = spec.trim();
2954        if spec.is_empty() {
2955            continue;
2956        }
2957        let Some(sub) = parse_subscriber_spec(spec) else {
2958            continue;
2959        };
2960        let uri = sub.describe();
2961        if !seen.insert(uri.clone()) {
2962            tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2963            continue;
2964        }
2965        out.push(sub);
2966    }
2967    out
2968}
2969
2970/// Parse one subscriber spec. v1 only accepts `file:///absolute/path`.
2971fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2972    // scheme required
2973    let Some(colon_idx) = spec.find(':') else {
2974        tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2975        return None;
2976    };
2977    let scheme = &spec[..colon_idx];
2978    let rest = &spec[colon_idx + 1..];
2979    if scheme != "file" {
2980        tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
2981        return None;
2982    }
2983    // scheme-specific: must start with `//`
2984    let Some(after_slash) = rest.strip_prefix("//") else {
2985        tracing::error!(spec, "file URI missing '//'");
2986        return None;
2987    };
2988    // split authority / path. path begins with '/'.
2989    let Some(path_start) = after_slash.find('/') else {
2990        tracing::error!(spec, "file URI has no path component");
2991        return None;
2992    };
2993    let authority = &after_slash[..path_start];
2994    let encoded_path = &after_slash[path_start..];
2995    if !authority.is_empty() {
2996        tracing::error!(
2997            spec,
2998            authority,
2999            "file URI with non-empty authority is rejected"
3000        );
3001        return None;
3002    }
3003    let path = decode_file_uri_path(encoded_path)?;
3004    Some(Arc::new(FileCardSubscriber::new(path)))
3005}
3006
3007/// Decode the path portion of a `file://` URI into a `PathBuf`.
3008///
3009/// Unix: a leading `/` is preserved (absolute path).
3010/// Windows: the leading `/` is stripped so that `/C:/a/b` becomes
3011/// `C:/a/b`.
3012fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3013    let decoded = percent_decode(encoded)?;
3014    #[cfg(windows)]
3015    {
3016        // Strip the leading slash so "/C:/foo" -> "C:/foo".
3017        let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3018        Some(PathBuf::from(trimmed))
3019    }
3020    #[cfg(not(windows))]
3021    {
3022        Some(PathBuf::from(decoded))
3023    }
3024}
3025
3026/// Percent-decode a URI path segment. Returns `None` on invalid or
3027/// truncated `%XX` sequences.
3028fn percent_decode(src: &str) -> Option<String> {
3029    let bytes = src.as_bytes();
3030    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3031    let mut i = 0;
3032    while i < bytes.len() {
3033        let b = bytes[i];
3034        if b == b'%' {
3035            if i + 2 >= bytes.len() {
3036                tracing::error!(src, "percent-encoded sequence truncated");
3037                return None;
3038            }
3039            let hi = (bytes[i + 1] as char).to_digit(16);
3040            let lo = (bytes[i + 2] as char).to_digit(16);
3041            match (hi, lo) {
3042                (Some(h), Some(l)) => {
3043                    out.push(((h << 4) | l) as u8);
3044                    i += 3;
3045                }
3046                _ => {
3047                    tracing::error!(src, "percent-encoded sequence has non-hex digits");
3048                    return None;
3049                }
3050            }
3051        } else {
3052            out.push(b);
3053            i += 1;
3054        }
3055    }
3056    match String::from_utf8(out) {
3057        Ok(s) => Some(s),
3058        Err(_) => {
3059            tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3060            None
3061        }
3062    }
3063}
3064
3065#[cfg(test)]
3066mod tests {
3067    use super::*;
3068
3069    /// Test-only shared [`FileCardStore`] rooted at a process-wide tempdir.
3070    ///
3071    /// The legacy module-level free-fn tests (originally backed by
3072    /// `~/.algocline/cards/`) now target this store so no test reads
3073    /// the real HOME directory. Each test still uses `unique_pkg()`
3074    /// for cross-test isolation under a single root, mirroring the
3075    /// legacy pkg-namespaced layout.
3076    fn shared_store() -> &'static FileCardStore {
3077        static STORE: OnceLock<FileCardStore> = OnceLock::new();
3078        STORE.get_or_init(|| {
3079            let tmp = tempfile::tempdir().expect("test tempdir");
3080            // Leak the guard: the dir survives for the whole test
3081            // binary lifetime which matches the OnceLock lifetime.
3082            let root = tmp.path().to_path_buf();
3083            std::mem::forget(tmp);
3084            FileCardStore::new(root)
3085        })
3086    }
3087
3088    // ─── Back-compat free-fn shims over `shared_store()` ──────────
3089    //
3090    // These wrap the same `*_with_store` free fns that the production
3091    // `Arc<FileCardStore>` path calls, but through a tempdir-rooted
3092    // store so tests never touch HOME. Kept private to the test module.
3093
3094    fn create(input: Json) -> Result<(String, PathBuf), String> {
3095        create_with_store(shared_store(), input)
3096    }
3097
3098    fn get(card_id: &str) -> Result<Option<Json>, String> {
3099        get_with_store(shared_store(), card_id)
3100    }
3101
3102    fn list(pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
3103        list_with_store(shared_store(), pkg_filter)
3104    }
3105
3106    fn append(card_id: &str, fields: Json) -> Result<Json, String> {
3107        append_with_store(shared_store(), card_id, fields)
3108    }
3109
3110    fn alias_set(
3111        name: &str,
3112        card_id: &str,
3113        pkg: Option<&str>,
3114        note: Option<&str>,
3115    ) -> Result<Alias, String> {
3116        alias_set_with_store(shared_store(), name, card_id, pkg, note)
3117    }
3118
3119    fn alias_list(pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
3120        alias_list_with_store(shared_store(), pkg_filter)
3121    }
3122
3123    fn get_by_alias(name: &str) -> Result<Option<Json>, String> {
3124        get_by_alias_with_store(shared_store(), name)
3125    }
3126
3127    fn find(q: FindQuery) -> Result<Vec<Summary>, String> {
3128        find_with_store(shared_store(), q)
3129    }
3130
3131    // Shim names used only by a subset of tests; remaining back-compat
3132    // wrappers for functions exercised directly here. Tests for paths
3133    // not covered by a shim call the `_with_store` variants against
3134    // their own tempdir stores.
3135
3136    fn lineage(q: LineageQuery) -> Result<Option<LineageResult>, String> {
3137        lineage_with_store(shared_store(), q)
3138    }
3139
3140    fn import_from_dir(
3141        source_dir: &std::path::Path,
3142        pkg: &str,
3143    ) -> Result<(Vec<String>, Vec<String>), String> {
3144        import_from_dir_with_store(shared_store(), source_dir, pkg)
3145    }
3146
3147    fn unique_pkg() -> String {
3148        let ns = std::time::SystemTime::now()
3149            .duration_since(std::time::UNIX_EPOCH)
3150            .unwrap()
3151            .as_nanos();
3152        format!("_test_card_{ns}")
3153    }
3154
3155    fn cleanup(pkg: &str) {
3156        if let Ok(d) = shared_store().pkg_dir(pkg) {
3157            let _ = fs::remove_dir_all(&d);
3158        }
3159    }
3160
3161    #[test]
3162    fn minimum_valid_card() {
3163        let pkg = unique_pkg();
3164        let input = json!({ "pkg": { "name": pkg } });
3165        let (id, path) = create(input).unwrap();
3166        assert!(path.exists());
3167        assert!(id.starts_with(&pkg));
3168
3169        let got = get(&id).unwrap().unwrap();
3170        assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
3171        assert_eq!(got["card_id"], json!(id));
3172        assert_eq!(got["pkg"]["name"], json!(pkg));
3173        assert!(got.get("created_at").is_some());
3174        assert!(got.get("created_by").is_some());
3175
3176        cleanup(&pkg);
3177    }
3178
3179    #[test]
3180    fn create_rejects_missing_pkg_name() {
3181        let err = create(json!({})).unwrap_err();
3182        assert!(err.contains("pkg.name"));
3183    }
3184
3185    #[test]
3186    fn create_is_immutable() {
3187        let pkg = unique_pkg();
3188        let input = json!({
3189            "card_id": "fixed_id_001",
3190            "pkg": { "name": pkg }
3191        });
3192        create(input.clone()).unwrap();
3193        let err = create(input).unwrap_err();
3194        assert!(err.contains("already exists"));
3195        cleanup(&pkg);
3196    }
3197
3198    #[test]
3199    fn create_injects_param_fingerprint() {
3200        let pkg = unique_pkg();
3201        let input = json!({
3202            "pkg": { "name": pkg },
3203            "params": { "depth": 3, "temperature": 0.0 }
3204        });
3205        let (id, _) = create(input).unwrap();
3206        let got = get(&id).unwrap().unwrap();
3207        assert!(got["param_fingerprint"].is_string());
3208        cleanup(&pkg);
3209    }
3210
3211    #[test]
3212    fn list_returns_newest_first() {
3213        let pkg = unique_pkg();
3214        // First card
3215        let (id1, _) = create(json!({
3216            "card_id": format!("{pkg}_a"),
3217            "pkg": { "name": pkg },
3218            "created_at": "2025-01-01T00:00:00Z"
3219        }))
3220        .unwrap();
3221        let (id2, _) = create(json!({
3222            "card_id": format!("{pkg}_b"),
3223            "pkg": { "name": pkg },
3224            "created_at": "2026-01-01T00:00:00Z"
3225        }))
3226        .unwrap();
3227
3228        let rows = list(Some(&pkg)).unwrap();
3229        assert_eq!(rows.len(), 2);
3230        assert_eq!(rows[0].card_id, id2); // newer first
3231        assert_eq!(rows[1].card_id, id1);
3232
3233        cleanup(&pkg);
3234    }
3235
3236    #[test]
3237    fn list_extracts_summary_fields() {
3238        let pkg = unique_pkg();
3239        let (id, _) = create(json!({
3240            "pkg": { "name": pkg },
3241            "model": { "id": "claude-opus-4-6" },
3242            "scenario": { "name": "gsm8k_sample100" },
3243            "stats": { "pass_rate": 0.82 }
3244        }))
3245        .unwrap();
3246
3247        let rows = list(Some(&pkg)).unwrap();
3248        let row = rows.iter().find(|r| r.card_id == id).unwrap();
3249        assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
3250        assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
3251        assert_eq!(row.pass_rate, Some(0.82));
3252
3253        cleanup(&pkg);
3254    }
3255
3256    #[test]
3257    fn get_missing_returns_none() {
3258        assert!(get("does_not_exist_xyz").unwrap().is_none());
3259    }
3260
3261    #[test]
3262    fn card_id_embeds_compact_timestamp() {
3263        let pkg = unique_pkg();
3264        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3265        // Expect: {pkg}_{model}_{YYYYMMDDTHHMMSS}_{hash6}
3266        // After removing the pkg prefix, there should be a segment
3267        // containing 'T' separating date and time.
3268        let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
3269        let parts: Vec<&str> = tail.split('_').collect();
3270        // parts = [model_short, YYYYMMDDTHHMMSS, hash6]
3271        assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
3272        let ts = parts[1];
3273        assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
3274        assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
3275        cleanup(&pkg);
3276    }
3277
3278    #[test]
3279    fn now_compact_format() {
3280        let s = now_compact();
3281        assert_eq!(s.len(), 15);
3282        assert_eq!(s.chars().nth(8), Some('T'));
3283        // All other positions are digits
3284        for (i, c) in s.chars().enumerate() {
3285            if i != 8 {
3286                assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3287            }
3288        }
3289    }
3290
3291    #[test]
3292    fn short_model_variants() {
3293        assert_eq!(short_model("claude-opus-4-6"), "opus46");
3294        assert_eq!(short_model("gpt-4o"), "4o");
3295        assert_eq!(short_model(""), "model");
3296    }
3297
3298    #[test]
3299    fn two_cards_same_second_different_stats_get_distinct_ids() {
3300        let pkg = unique_pkg();
3301        let input1 = json!({
3302            "pkg": { "name": pkg },
3303            "scenario": { "name": "gsm8k" },
3304            "stats": { "pass_rate": 0.4 }
3305        });
3306        let input2 = json!({
3307            "pkg": { "name": pkg },
3308            "scenario": { "name": "gsm8k" },
3309            "stats": { "pass_rate": 0.9 }
3310        });
3311        let (id1, _) = create(input1).unwrap();
3312        let (id2, _) = create(input2).unwrap();
3313        assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
3314        cleanup(&pkg);
3315    }
3316
3317    // ─── P1: append ────────────────────────────────────────────
3318
3319    #[test]
3320    fn append_adds_new_fields() {
3321        let pkg = unique_pkg();
3322        let (id, _) = create(json!({
3323            "pkg": { "name": pkg },
3324            "stats": { "pass_rate": 0.5 }
3325        }))
3326        .unwrap();
3327
3328        let merged = append(
3329            &id,
3330            json!({
3331                "caveats": { "notes": "rescored after fix" },
3332                "metadata": { "reviewer": "yn" }
3333            }),
3334        )
3335        .unwrap();
3336        assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
3337        assert_eq!(merged["metadata"]["reviewer"], json!("yn"));
3338
3339        // Persisted
3340        let got = get(&id).unwrap().unwrap();
3341        assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
3342        // Existing field untouched
3343        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3344
3345        cleanup(&pkg);
3346    }
3347
3348    #[test]
3349    fn append_rejects_existing_key() {
3350        let pkg = unique_pkg();
3351        let (id, _) = create(json!({
3352            "pkg": { "name": pkg },
3353            "stats": { "pass_rate": 0.5 }
3354        }))
3355        .unwrap();
3356
3357        let err = append(&id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
3358        assert!(err.contains("already set"), "got: {err}");
3359        // Verify original value still there
3360        let got = get(&id).unwrap().unwrap();
3361        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3362
3363        cleanup(&pkg);
3364    }
3365
3366    #[test]
3367    fn append_errors_on_missing_card() {
3368        let err = append("does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
3369        assert!(err.contains("not found"));
3370    }
3371
3372    // ─── P1: alias_set / alias_list ────────────────────────────
3373
3374    #[test]
3375    fn alias_set_and_list_roundtrip() {
3376        let pkg = unique_pkg();
3377        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3378
3379        let alias_name = format!("test_alias_{}", &pkg);
3380        alias_set(&alias_name, &id, Some(&pkg), Some("smoke")).unwrap();
3381
3382        let rows = alias_list(Some(&pkg)).unwrap();
3383        let a = rows.iter().find(|a| a.name == alias_name).unwrap();
3384        assert_eq!(a.card_id, id);
3385        assert_eq!(a.pkg.as_deref(), Some(pkg.as_str()));
3386        assert_eq!(a.note.as_deref(), Some("smoke"));
3387        assert!(!a.set_at.is_empty());
3388
3389        // Rebind to a new card
3390        let (id2, _) = create(json!({
3391            "card_id": format!("{pkg}_b"),
3392            "pkg": { "name": pkg }
3393        }))
3394        .unwrap();
3395        alias_set(&alias_name, &id2, Some(&pkg), None).unwrap();
3396        let rows = alias_list(Some(&pkg)).unwrap();
3397        let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
3398        assert_eq!(matching.len(), 1, "alias should be unique by name");
3399        assert_eq!(matching[0].card_id, id2);
3400
3401        // Cleanup: remove our alias from the file
3402        let store = shared_store();
3403        let remaining: Vec<Alias> = store
3404            .read_aliases()
3405            .unwrap()
3406            .into_iter()
3407            .filter(|a| a.name != alias_name)
3408            .collect();
3409        store.write_aliases(&remaining).unwrap();
3410        cleanup(&pkg);
3411    }
3412
3413    #[test]
3414    fn alias_set_rejects_unknown_card() {
3415        let err = alias_set("x", "does_not_exist_xyz", None, None).unwrap_err();
3416        assert!(err.contains("not found"));
3417    }
3418
3419    // ─── find + where DSL ───────────────────────────────────────
3420
3421    fn where_from(v: Json) -> Predicate {
3422        parse_where(&v).expect("parse where")
3423    }
3424
3425    fn order_from(v: Json) -> Vec<OrderKey> {
3426        parse_order_by(&v).expect("parse order_by")
3427    }
3428
3429    #[test]
3430    fn find_where_nested_eq_and_gte() {
3431        let pkg = unique_pkg();
3432        create(json!({
3433            "card_id": format!("{pkg}_low"),
3434            "pkg": { "name": pkg },
3435            "scenario": { "name": "gsm8k" },
3436            "stats": { "pass_rate": 0.4 }
3437        }))
3438        .unwrap();
3439        create(json!({
3440            "card_id": format!("{pkg}_high"),
3441            "pkg": { "name": pkg },
3442            "scenario": { "name": "gsm8k" },
3443            "stats": { "pass_rate": 0.9 }
3444        }))
3445        .unwrap();
3446        create(json!({
3447            "card_id": format!("{pkg}_other"),
3448            "pkg": { "name": pkg },
3449            "scenario": { "name": "other" },
3450            "stats": { "pass_rate": 1.0 }
3451        }))
3452        .unwrap();
3453
3454        // scenario eq via nested object
3455        let rows = find(FindQuery {
3456            pkg: Some(pkg.clone()),
3457            where_: Some(where_from(json!({
3458                "scenario": { "name": "gsm8k" },
3459            }))),
3460            order_by: order_from(json!("-stats.pass_rate")),
3461            ..Default::default()
3462        })
3463        .unwrap();
3464        assert_eq!(rows.len(), 2);
3465        assert_eq!(rows[0].pass_rate, Some(0.9));
3466        assert_eq!(rows[1].pass_rate, Some(0.4));
3467
3468        // gte operator
3469        let rows = find(FindQuery {
3470            pkg: Some(pkg.clone()),
3471            where_: Some(where_from(json!({
3472                "stats": { "pass_rate": { "gte": 0.8 } },
3473            }))),
3474            order_by: order_from(json!("-stats.pass_rate")),
3475            ..Default::default()
3476        })
3477        .unwrap();
3478        assert_eq!(rows.len(), 2);
3479        assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));
3480
3481        // limit
3482        let rows = find(FindQuery {
3483            pkg: Some(pkg.clone()),
3484            order_by: order_from(json!("-stats.pass_rate")),
3485            limit: Some(1),
3486            ..Default::default()
3487        })
3488        .unwrap();
3489        assert_eq!(rows.len(), 1);
3490        assert_eq!(rows[0].pass_rate, Some(1.0));
3491
3492        cleanup(&pkg);
3493    }
3494
3495    #[test]
3496    fn find_where_implicit_eq_and_logical() {
3497        let pkg = unique_pkg();
3498        create(json!({
3499            "card_id": format!("{pkg}_a"),
3500            "pkg": { "name": pkg },
3501            "model": { "id": "claude-opus-4-6" },
3502            "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
3503        }))
3504        .unwrap();
3505        create(json!({
3506            "card_id": format!("{pkg}_b"),
3507            "pkg": { "name": pkg },
3508            "model": { "id": "claude-opus-4-6" },
3509            "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
3510        }))
3511        .unwrap();
3512        create(json!({
3513            "card_id": format!("{pkg}_c"),
3514            "pkg": { "name": pkg },
3515            "model": { "id": "claude-haiku-4-5-20251001" },
3516            "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
3517        }))
3518        .unwrap();
3519
3520        // implicit eq on sparse stats field
3521        let rows = find(FindQuery {
3522            pkg: Some(pkg.clone()),
3523            where_: Some(where_from(json!({
3524                "stats": { "equilibrium_position": "dead" },
3525            }))),
3526            ..Default::default()
3527        })
3528        .unwrap();
3529        assert_eq!(rows.len(), 1);
3530        assert!(rows[0].card_id.ends_with("_a"));
3531
3532        // _or
3533        let rows = find(FindQuery {
3534            pkg: Some(pkg.clone()),
3535            where_: Some(where_from(json!({
3536                "_or": [
3537                    { "stats": { "equilibrium_position": "dead" } },
3538                    { "stats": { "survival_rate": { "gte": 0.9 } } },
3539                ],
3540            }))),
3541            ..Default::default()
3542        })
3543        .unwrap();
3544        assert_eq!(rows.len(), 2);
3545
3546        // _not
3547        let rows = find(FindQuery {
3548            pkg: Some(pkg.clone()),
3549            where_: Some(where_from(json!({
3550                "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
3551            }))),
3552            ..Default::default()
3553        })
3554        .unwrap();
3555        assert_eq!(rows.len(), 2);
3556
3557        // in operator
3558        let rows = find(FindQuery {
3559            pkg: Some(pkg.clone()),
3560            where_: Some(where_from(json!({
3561                "stats": {
3562                    "equilibrium_position": { "in": ["dead", "fragile"] },
3563                },
3564            }))),
3565            ..Default::default()
3566        })
3567        .unwrap();
3568        assert_eq!(rows.len(), 2);
3569
3570        // exists false (sparse field missing on haiku card? all have it, so test on
3571        // a field that only some have)
3572        let rows = find(FindQuery {
3573            pkg: Some(pkg.clone()),
3574            where_: Some(where_from(json!({
3575                "strategy_params": { "temperature": { "exists": false } },
3576            }))),
3577            ..Default::default()
3578        })
3579        .unwrap();
3580        assert_eq!(rows.len(), 3, "none of the cards have strategy_params");
3581
3582        cleanup(&pkg);
3583    }
3584
3585    #[test]
3586    fn find_order_by_multi_key() {
3587        let pkg = unique_pkg();
3588        create(json!({
3589            "card_id": format!("{pkg}_a"),
3590            "pkg": { "name": pkg },
3591            "stats": { "pass_rate": 0.5 }
3592        }))
3593        .unwrap();
3594        create(json!({
3595            "card_id": format!("{pkg}_b"),
3596            "pkg": { "name": pkg },
3597            "stats": { "pass_rate": 0.9 }
3598        }))
3599        .unwrap();
3600        create(json!({
3601            "card_id": format!("{pkg}_c"),
3602            "pkg": { "name": pkg },
3603            "stats": { "pass_rate": 0.9 }
3604        }))
3605        .unwrap();
3606
3607        let rows = find(FindQuery {
3608            pkg: Some(pkg.clone()),
3609            order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
3610            ..Default::default()
3611        })
3612        .unwrap();
3613        assert_eq!(rows.len(), 3);
3614        assert_eq!(rows[0].pass_rate, Some(0.9));
3615        assert_eq!(rows[1].pass_rate, Some(0.9));
3616        assert_eq!(rows[2].pass_rate, Some(0.5));
3617        // Tiebreak by card_id ascending
3618        assert!(rows[0].card_id < rows[1].card_id);
3619
3620        cleanup(&pkg);
3621    }
3622
3623    #[test]
3624    fn find_offset_and_limit() {
3625        let pkg = unique_pkg();
3626        for i in 0..5 {
3627            create(json!({
3628                "card_id": format!("{pkg}_{i}"),
3629                "pkg": { "name": pkg },
3630                "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
3631            }))
3632            .unwrap();
3633        }
3634
3635        let rows = find(FindQuery {
3636            pkg: Some(pkg.clone()),
3637            order_by: order_from(json!("-stats.pass_rate")),
3638            offset: Some(1),
3639            limit: Some(2),
3640            ..Default::default()
3641        })
3642        .unwrap();
3643        assert_eq!(rows.len(), 2);
3644        // Best is 0.5, after offset=1 we start at 0.4 then 0.3.
3645        let pr0 = rows[0].pass_rate.unwrap();
3646        let pr1 = rows[1].pass_rate.unwrap();
3647        assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
3648        assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");
3649
3650        cleanup(&pkg);
3651    }
3652
3653    #[test]
3654    fn parse_where_rejects_non_object() {
3655        assert!(parse_where(&json!("not an object")).is_err());
3656        assert!(parse_where(&json!(42)).is_err());
3657    }
3658
3659    #[test]
3660    fn parse_order_by_accepts_string_and_array() {
3661        let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3662        assert_eq!(k.len(), 1);
3663        assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3664        assert!(k[0].desc);
3665
3666        let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3667        assert_eq!(k.len(), 2);
3668        assert!(!k[0].desc);
3669        assert!(k[1].desc);
3670    }
3671
3672    #[test]
3673    fn find_where_string_ops_contains_and_starts_with() {
3674        let pkg = unique_pkg();
3675        create(json!({
3676            "card_id": format!("{pkg}_a"),
3677            "pkg": { "name": pkg },
3678            "model": { "id": "claude-opus-4-6" },
3679            "metadata": { "tag": "experiment_alpha" },
3680        }))
3681        .unwrap();
3682        create(json!({
3683            "card_id": format!("{pkg}_b"),
3684            "pkg": { "name": pkg },
3685            "model": { "id": "claude-haiku-4-5-20251001" },
3686            "metadata": { "tag": "experiment_beta" },
3687        }))
3688        .unwrap();
3689        create(json!({
3690            "card_id": format!("{pkg}_c"),
3691            "pkg": { "name": pkg },
3692            "model": { "id": "claude-sonnet-4-5" },
3693            "metadata": { "tag": "baseline" },
3694        }))
3695        .unwrap();
3696
3697        // contains: matches substring anywhere
3698        let rows = find(FindQuery {
3699            pkg: Some(pkg.clone()),
3700            where_: Some(where_from(json!({
3701                "metadata": { "tag": { "contains": "experiment" } },
3702            }))),
3703            ..Default::default()
3704        })
3705        .unwrap();
3706        assert_eq!(rows.len(), 2);
3707
3708        // starts_with: matches only the prefix
3709        let rows = find(FindQuery {
3710            pkg: Some(pkg.clone()),
3711            where_: Some(where_from(json!({
3712                "model": { "id": { "starts_with": "claude-opus" } },
3713            }))),
3714            ..Default::default()
3715        })
3716        .unwrap();
3717        assert_eq!(rows.len(), 1);
3718        assert!(rows[0].card_id.ends_with("_a"));
3719
3720        // string ops on missing field → false
3721        let rows = find(FindQuery {
3722            pkg: Some(pkg.clone()),
3723            where_: Some(where_from(json!({
3724                "metadata": { "missing_field": { "contains": "x" } },
3725            }))),
3726            ..Default::default()
3727        })
3728        .unwrap();
3729        assert_eq!(rows.len(), 0);
3730
3731        // string ops on non-string field → false
3732        let rows = find(FindQuery {
3733            pkg: Some(pkg.clone()),
3734            where_: Some(where_from(json!({
3735                "metadata": { "tag": { "starts_with": 42 } },
3736            }))),
3737            ..Default::default()
3738        })
3739        .unwrap();
3740        assert_eq!(rows.len(), 0);
3741
3742        cleanup(&pkg);
3743    }
3744
3745    #[test]
3746    fn where_missing_field_ne_is_true() {
3747        let pkg = unique_pkg();
3748        create(json!({
3749            "card_id": format!("{pkg}_x"),
3750            "pkg": { "name": pkg },
3751        }))
3752        .unwrap();
3753
3754        let rows = find(FindQuery {
3755            pkg: Some(pkg.clone()),
3756            where_: Some(where_from(json!({
3757                "strategy_params": { "temperature": { "ne": 0.5 } },
3758            }))),
3759            ..Default::default()
3760        })
3761        .unwrap();
3762        assert_eq!(rows.len(), 1, "missing field is ne to anything");
3763
3764        cleanup(&pkg);
3765    }
3766
3767    // ─── lineage ───────────────────────────────────────────────
3768
3769    /// Helper: create a child Card pointing at a parent with a relation.
3770    fn create_child(pkg: &str, suffix: &str, parent_id: &str, relation: &str) -> String {
3771        let (id, _) = create(json!({
3772            "card_id": format!("{pkg}_{suffix}"),
3773            "pkg": { "name": pkg },
3774            "stats": { "pass_rate": 0.5 },
3775            "metadata": {
3776                "prior_card_id": parent_id,
3777                "prior_relation": relation,
3778            },
3779        }))
3780        .unwrap();
3781        id
3782    }
3783
3784    #[test]
3785    fn lineage_up_walks_prior_card_id_chain() {
3786        let pkg = unique_pkg();
3787        // a → b → c (c is newest; b points at a; c points at b)
3788        let (a, _) = create(json!({
3789            "card_id": format!("{pkg}_a"),
3790            "pkg": { "name": pkg },
3791        }))
3792        .unwrap();
3793        let b = create_child(&pkg, "b", &a, "rerun_of");
3794        let c = create_child(&pkg, "c", &b, "rerun_of");
3795
3796        let res = lineage(LineageQuery {
3797            card_id: c.clone(),
3798            direction: LineageDirection::Up,
3799            depth: None,
3800            include_stats: false,
3801            relation_filter: None,
3802        })
3803        .unwrap()
3804        .expect("lineage result");
3805
3806        assert_eq!(res.root, c);
3807        assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
3808        assert_eq!(res.nodes[0].card_id, c);
3809        assert_eq!(res.nodes[0].depth, 0);
3810        assert_eq!(res.nodes[1].card_id, b);
3811        assert_eq!(res.nodes[1].depth, -1);
3812        assert_eq!(res.nodes[2].card_id, a);
3813        assert_eq!(res.nodes[2].depth, -2);
3814        assert_eq!(res.edges.len(), 2);
3815        assert!(!res.truncated);
3816
3817        cleanup(&pkg);
3818    }
3819
3820    #[test]
3821    fn lineage_down_walks_descendants_breadth_first() {
3822        let pkg = unique_pkg();
3823        // a has two children b, c; c has one child d.
3824        let (a, _) = create(json!({
3825            "card_id": format!("{pkg}_a"),
3826            "pkg": { "name": pkg },
3827        }))
3828        .unwrap();
3829        let _b = create_child(&pkg, "b", &a, "sweep_variant");
3830        let c = create_child(&pkg, "c", &a, "sweep_variant");
3831        let _d = create_child(&pkg, "d", &c, "rerun_of");
3832
3833        let res = lineage(LineageQuery {
3834            card_id: a.clone(),
3835            direction: LineageDirection::Down,
3836            depth: None,
3837            include_stats: false,
3838            relation_filter: None,
3839        })
3840        .unwrap()
3841        .expect("lineage result");
3842
3843        // root + b + c + d = 4 nodes
3844        assert_eq!(res.nodes.len(), 4);
3845        assert_eq!(res.edges.len(), 3);
3846        assert!(!res.truncated);
3847
3848        cleanup(&pkg);
3849    }
3850
3851    #[test]
3852    fn lineage_depth_truncation_sets_flag() {
3853        let pkg = unique_pkg();
3854        let (a, _) = create(json!({
3855            "card_id": format!("{pkg}_a"),
3856            "pkg": { "name": pkg },
3857        }))
3858        .unwrap();
3859        let b = create_child(&pkg, "b", &a, "rerun_of");
3860        let _c = create_child(&pkg, "c", &b, "rerun_of");
3861
3862        let res = lineage(LineageQuery {
3863            card_id: a,
3864            direction: LineageDirection::Down,
3865            depth: Some(1),
3866            include_stats: false,
3867            relation_filter: None,
3868        })
3869        .unwrap()
3870        .unwrap();
3871        assert_eq!(res.nodes.len(), 2, "root + 1 level");
3872        assert!(res.truncated, "should be truncated at depth=1");
3873
3874        cleanup(&pkg);
3875    }
3876
3877    #[test]
3878    fn lineage_relation_filter_skips_unlisted() {
3879        let pkg = unique_pkg();
3880        let (a, _) = create(json!({
3881            "card_id": format!("{pkg}_a"),
3882            "pkg": { "name": pkg },
3883        }))
3884        .unwrap();
3885        let _b = create_child(&pkg, "b", &a, "sweep_variant");
3886        let _c = create_child(&pkg, "c", &a, "rerun_of");
3887
3888        let res = lineage(LineageQuery {
3889            card_id: a,
3890            direction: LineageDirection::Down,
3891            depth: None,
3892            include_stats: false,
3893            relation_filter: Some(vec!["sweep_variant".to_string()]),
3894        })
3895        .unwrap()
3896        .unwrap();
3897        assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
3898        assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));
3899
3900        cleanup(&pkg);
3901    }
3902
3903    #[test]
3904    fn lineage_missing_card_returns_none() {
3905        let res = lineage(LineageQuery {
3906            card_id: "nonexistent_card_id_xyz".into(),
3907            direction: LineageDirection::Up,
3908            depth: None,
3909            include_stats: false,
3910            relation_filter: None,
3911        })
3912        .unwrap();
3913        assert!(res.is_none());
3914    }
3915
3916    // ─── samples sidecar ───────────────────────────────────────
3917
3918    // Isolated `FileCardStore::new(tempdir)` sidesteps the shared-root race
3919    // in `find_card_locator`; see `read_samples_empty_when_absent` for the
3920    // full root-cause write-up.
3921    #[test]
3922    fn write_and_read_samples_roundtrip() {
3923        let tmp = tempfile::tempdir().unwrap();
3924        let store = FileCardStore::new(tmp.path().to_path_buf());
3925        let (id, _) = create_with_store(
3926            &store,
3927            json!({
3928                "pkg": { "name": "roundtrip_pkg" },
3929                "stats": { "pass_rate": 0.5 }
3930            }),
3931        )
3932        .unwrap();
3933
3934        let samples = vec![
3935            json!({ "case": "c0", "passed": true, "score": 1.0 }),
3936            json!({ "case": "c1", "passed": false, "score": 0.0 }),
3937            json!({ "case": "c2", "passed": true, "score": 0.75 }),
3938        ];
3939        let path = write_samples_with_store(&store, &id, samples.clone()).unwrap();
3940        assert!(path.exists());
3941        assert!(path.to_string_lossy().ends_with(".samples.jsonl"));
3942
3943        let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
3944        assert_eq!(got.len(), 3);
3945        assert_eq!(got[0]["case"], json!("c0"));
3946        assert_eq!(got[2]["score"], json!(0.75));
3947
3948        // offset + limit
3949        let slice = read_samples_with_store(
3950            &store,
3951            &id,
3952            SamplesQuery {
3953                offset: 1,
3954                limit: Some(1),
3955                where_: None,
3956            },
3957        )
3958        .unwrap();
3959        assert_eq!(slice.len(), 1);
3960        assert_eq!(slice[0]["case"], json!("c1"));
3961    }
3962
3963    #[test]
3964    fn write_samples_is_write_once() {
3965        let tmp = tempfile::tempdir().unwrap();
3966        let store = FileCardStore::new(tmp.path().to_path_buf());
3967        let (id, _) =
3968            create_with_store(&store, json!({ "pkg": { "name": "write_once_pkg" } })).unwrap();
3969        write_samples_with_store(&store, &id, vec![json!({ "x": 1 })]).unwrap();
3970        let err = write_samples_with_store(&store, &id, vec![json!({ "x": 2 })]).unwrap_err();
3971        assert!(err.contains("already exist"), "got: {err}");
3972    }
3973
3974    // Previously used `create` / `read_samples` (default `~/.algocline/cards/`
3975    // store). Under `cargo test --workspace` parallel runs, `find_card_locator`
3976    // scans the shared root with `fs::read_dir(...).flatten()` which silently
3977    // drops transient I/O errors — on macOS APFS, a concurrent `remove_dir_all`
3978    // from another test's `cleanup(pkg)` could trigger that transient error and
3979    // cause this test's just-created pkg dir entry to be missed, propagating
3980    // `card '...' not found` up through `samples_path` → `read_samples`.
3981    //
3982    // Isolating via `FileCardStore::new(tempdir)` + `_with_store` variants
3983    // sidesteps the shared-root race entirely. Same pattern as
3984    // `custom_root_file_store_roundtrip` / `test_fanout_concurrent_*`.
3985    #[test]
3986    fn read_samples_empty_when_absent() {
3987        let tmp = tempfile::tempdir().unwrap();
3988        let store = FileCardStore::new(tmp.path().to_path_buf());
3989        let (id, _) = create_with_store(
3990            &store,
3991            json!({ "pkg": { "name": "read_samples_empty_pkg" } }),
3992        )
3993        .unwrap();
3994        let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
3995        assert!(got.is_empty());
3996    }
3997
3998    #[test]
3999    fn read_samples_where_filters_rows() {
4000        let tmp = tempfile::tempdir().unwrap();
4001        let store = FileCardStore::new(tmp.path().to_path_buf());
4002        let (id, _) =
4003            create_with_store(&store, json!({ "pkg": { "name": "where_filter_pkg" } })).unwrap();
4004        write_samples_with_store(
4005            &store,
4006            &id,
4007            vec![
4008                json!({ "case": "c0", "passed": true,  "score": 1.0 }),
4009                json!({ "case": "c1", "passed": false, "score": 0.0 }),
4010                json!({ "case": "c2", "passed": true,  "score": 0.25 }),
4011                json!({ "case": "c3", "passed": true,  "score": 0.75 }),
4012                json!({ "case": "c4", "passed": false, "score": 0.5 }),
4013            ],
4014        )
4015        .unwrap();
4016
4017        // Equality predicate: passed == true keeps 3 rows.
4018        let pred = parse_where(&json!({ "passed": true })).unwrap();
4019        let got = read_samples_with_store(
4020            &store,
4021            &id,
4022            SamplesQuery {
4023                offset: 0,
4024                limit: None,
4025                where_: Some(pred),
4026            },
4027        )
4028        .unwrap();
4029        assert_eq!(got.len(), 3);
4030        assert_eq!(got[0]["case"], json!("c0"));
4031        assert_eq!(got[1]["case"], json!("c2"));
4032        assert_eq!(got[2]["case"], json!("c3"));
4033
4034        // Nested comparator: score gte 0.5 keeps c0/c3/c4.
4035        let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
4036        let got = read_samples_with_store(
4037            &store,
4038            &id,
4039            SamplesQuery {
4040                offset: 0,
4041                limit: None,
4042                where_: Some(pred),
4043            },
4044        )
4045        .unwrap();
4046        assert_eq!(got.len(), 3);
4047        assert_eq!(got[0]["case"], json!("c0"));
4048        assert_eq!(got[1]["case"], json!("c3"));
4049        assert_eq!(got[2]["case"], json!("c4"));
4050
4051        // Offset applies AFTER filter: passed=true then skip 1 + limit 1 → c2.
4052        let pred = parse_where(&json!({ "passed": true })).unwrap();
4053        let slice = read_samples_with_store(
4054            &store,
4055            &id,
4056            SamplesQuery {
4057                offset: 1,
4058                limit: Some(1),
4059                where_: Some(pred),
4060            },
4061        )
4062        .unwrap();
4063        assert_eq!(slice.len(), 1);
4064        assert_eq!(slice[0]["case"], json!("c2"));
4065    }
4066
4067    #[test]
4068    fn get_by_alias_roundtrip() {
4069        let pkg = unique_pkg();
4070        let (id, _) = create(json!({
4071            "pkg": { "name": pkg },
4072            "stats": { "pass_rate": 0.85 }
4073        }))
4074        .unwrap();
4075
4076        let alias_name = format!("best_{pkg}");
4077        alias_set(&alias_name, &id, Some(&pkg), None).unwrap();
4078
4079        let card = get_by_alias(&alias_name).unwrap().unwrap();
4080        assert_eq!(card["card_id"], json!(id));
4081        assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4082
4083        assert!(get_by_alias("nonexistent_alias_xyz").unwrap().is_none());
4084
4085        cleanup(&pkg);
4086    }
4087
4088    #[test]
4089    fn samples_errors_on_missing_card() {
4090        let tmp = tempfile::tempdir().unwrap();
4091        let store = FileCardStore::new(tmp.path().to_path_buf());
4092        let err = write_samples_with_store(&store, "does_not_exist_xyz_samples", vec![json!({})])
4093            .unwrap_err();
4094        assert!(err.contains("not found"));
4095    }
4096
4097    // ─── import_from_dir ───────────────────────────────────────
4098
4099    #[test]
4100    fn import_from_dir_copies_cards() {
4101        let pkg = "import_copies_pkg";
4102        let src_tmp = tempfile::tempdir().unwrap();
4103        let store_tmp = tempfile::tempdir().unwrap();
4104        let store = FileCardStore::new(store_tmp.path().to_path_buf());
4105
4106        // Create a source card file
4107        let card_id = format!("{pkg}_imported");
4108        let card_content = format!(
4109            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
4110        );
4111        fs::write(
4112            src_tmp.path().join(format!("{card_id}.toml")),
4113            &card_content,
4114        )
4115        .unwrap();
4116
4117        // Create a matching samples file
4118        fs::write(
4119            src_tmp.path().join(format!("{card_id}.samples.jsonl")),
4120            "{\"case\":\"c0\"}\n",
4121        )
4122        .unwrap();
4123
4124        let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4125        assert_eq!(imported, vec![card_id.clone()]);
4126        assert!(skipped.is_empty());
4127
4128        // Verify card was imported
4129        let got = get_with_store(&store, &card_id).unwrap().unwrap();
4130        assert_eq!(got["card_id"], json!(card_id));
4131
4132        // Verify samples were copied
4133        let samples = read_samples_with_store(&store, &card_id, SamplesQuery::default()).unwrap();
4134        assert_eq!(samples.len(), 1);
4135    }
4136
4137    #[test]
4138    fn import_from_dir_skips_existing() {
4139        let pkg = unique_pkg();
4140        // Create a card in the store first
4141        let (existing_id, _) = create(json!({
4142            "pkg": { "name": pkg },
4143            "stats": { "pass_rate": 0.5 }
4144        }))
4145        .unwrap();
4146
4147        // Try to import a card with the same id
4148        let tmp = tempfile::tempdir().unwrap();
4149        let card_content = format!(
4150            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
4151        );
4152        fs::write(
4153            tmp.path().join(format!("{existing_id}.toml")),
4154            &card_content,
4155        )
4156        .unwrap();
4157
4158        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4159        assert!(imported.is_empty());
4160        assert_eq!(skipped, vec![existing_id.clone()]);
4161
4162        // Original card untouched
4163        let got = get(&existing_id).unwrap().unwrap();
4164        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
4165
4166        cleanup(&pkg);
4167    }
4168
4169    #[test]
4170    fn import_from_dir_skips_non_card_toml() {
4171        let pkg = unique_pkg();
4172        let tmp = tempfile::tempdir().unwrap();
4173
4174        // A TOML file without schema_version = "card/v0" should be skipped
4175        fs::write(tmp.path().join("not_a_card.toml"), "title = \"hello\"\n").unwrap();
4176
4177        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4178        assert!(imported.is_empty());
4179        assert!(skipped.is_empty());
4180
4181        cleanup(&pkg);
4182    }
4183
4184    // ─── PathCardStore (FileCardStore rooted at a custom path) ──────
4185    //
4186    // Smoke test proving the trait boundary lets callers swap the
4187    // storage root without touching `~/.algocline/cards/`.
4188
4189    #[test]
4190    fn custom_root_file_store_roundtrip() {
4191        let tmp = tempfile::tempdir().unwrap();
4192        let store = FileCardStore::new(tmp.path().to_path_buf());
4193        let pkg = "custom_root_pkg";
4194
4195        // create → get → list through the _with_store variants
4196        let (id, path) = create_with_store(
4197            &store,
4198            json!({
4199                "pkg":   { "name": pkg },
4200                "model": { "id": "gpt-test" },
4201            }),
4202        )
4203        .unwrap();
4204        assert!(path.starts_with(tmp.path()));
4205        assert!(path.ends_with(format!("{id}.toml")));
4206
4207        let card = get_with_store(&store, &id).unwrap().expect("card exists");
4208        assert_eq!(
4209            card.get("card_id").and_then(|v| v.as_str()),
4210            Some(id.as_str())
4211        );
4212
4213        let rows = list_with_store(&store, Some(pkg)).unwrap();
4214        assert_eq!(rows.len(), 1);
4215        assert_eq!(rows[0].card_id, id);
4216
4217        // Ensure the default store is not polluted.
4218        let default_rows = list(Some(pkg)).unwrap();
4219        assert!(default_rows.iter().all(|r| r.card_id != id));
4220
4221        // alias + lookup scoped to the custom store
4222        alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
4223        let via_alias = get_by_alias_with_store(&store, "alpha")
4224            .unwrap()
4225            .expect("alias resolves");
4226        assert_eq!(
4227            via_alias.get("card_id").and_then(|v| v.as_str()),
4228            Some(id.as_str())
4229        );
4230
4231        // samples write/read roundtrip
4232        let samples_path =
4233            write_samples_with_store(&store, &id, vec![json!({ "case": "a", "pass": true })])
4234                .unwrap();
4235        assert!(samples_path.starts_with(tmp.path()));
4236        let back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4237        assert_eq!(back.len(), 1);
4238        assert_eq!(back[0].get("case").and_then(|v| v.as_str()), Some("a"));
4239    }
4240
4241    // ═══════════════════════════════════════════════════════════════
4242    // Event Publisher Port tests
4243    // ═══════════════════════════════════════════════════════════════
4244
4245    use std::sync::atomic::AtomicUsize;
4246    use std::sync::Barrier;
4247
4248    /// Serialize access to `std::env::set_var("ALC_CARD_SINKS", ...)` so
4249    /// env-touching tests do not race.
4250    fn env_lock() -> &'static Mutex<()> {
4251        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4252        LOCK.get_or_init(|| Mutex::new(()))
4253    }
4254
4255    /// RAII guard that clears `INSIDE_BUS_TEST` on drop, including the panic
4256    /// unwinding path. Without this, a panic inside `f` would leave the
4257    /// thread-local `true` on the cargo-test worker thread, so the next test
4258    /// assigned to the same worker would bypass `bus_test_gate` and corrupt
4259    /// concurrent subscriber mocks.
4260    struct BusTestOwnerGuard;
4261    impl Drop for BusTestOwnerGuard {
4262        fn drop(&mut self) {
4263            INSIDE_BUS_TEST.with(|flag| flag.set(false));
4264        }
4265    }
4266
4267    /// Ensure the global bus is initialized and subscribers are cleared,
4268    /// then install `subs` on the singleton for the duration of a test.
4269    ///
4270    /// This function holds `bus_test_gate()` for its entire duration. Any
4271    /// concurrent `publish()` call from a parallel default-store test will
4272    /// block until we release the gate, preventing event contamination.
4273    /// The INSIDE_BUS_TEST thread-local is set so that publish calls made
4274    /// FROM THIS THREAD (inside `f`) skip the gate and proceed directly
4275    /// (re-entrancy safe).
4276    ///
4277    /// If the test spawns child threads that also publish, those children
4278    /// must set INSIDE_BUS_TEST to true themselves (see
4279    /// `test_fanout_concurrent_create_with_store`). Otherwise they block on
4280    /// the gate held by this owner thread and deadlock on join.
4281    fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4282    where
4283        F: FnOnce(&'static CardEventBus),
4284    {
4285        // Acquire the gate FIRST. While we wait, no one else holds the owner
4286        // role, and our INSIDE_BUS_TEST is still false, so this lock is safe.
4287        let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4288        // Now mark this thread as the bus-test owner so that publish() from
4289        // within the closure does not try to re-acquire bus_test_gate().
4290        // The RAII guard clears the flag on both normal return and unwind.
4291        INSIDE_BUS_TEST.with(|flag| flag.set(true));
4292        let _owner = BusTestOwnerGuard;
4293        let bus = event_bus();
4294        bus.reset_stats_for_test();
4295        bus.replace_subscribers_for_test(subs);
4296        f(bus);
4297        // Leave the bus clean for the next test.
4298        bus.replace_subscribers_for_test(Vec::new());
4299        bus.reset_stats_for_test();
4300        // _owner drops -> INSIDE_BUS_TEST = false (panic-safe)
4301        // _guard drops -> bus_test_gate released
4302    }
4303
4304    /// In-memory subscriber used for deterministic fan-out assertions.
4305    struct MockSubscriber {
4306        uri: String,
4307        events: Mutex<Vec<CardEvent>>,
4308        calls: AtomicUsize,
4309    }
4310
4311    impl MockSubscriber {
4312        fn new(uri: &str) -> Arc<Self> {
4313            Arc::new(Self {
4314                uri: uri.to_string(),
4315                events: Mutex::new(Vec::new()),
4316                calls: AtomicUsize::new(0),
4317            })
4318        }
4319        fn call_count(&self) -> usize {
4320            self.calls.load(Ordering::SeqCst)
4321        }
4322    }
4323
4324    impl CardSubscriber for MockSubscriber {
4325        fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4326            self.calls.fetch_add(1, Ordering::SeqCst);
4327            self.events
4328                .lock()
4329                .unwrap_or_else(|p| p.into_inner())
4330                .push(ev.clone());
4331            Ok(())
4332        }
4333        fn describe(&self) -> String {
4334            self.uri.clone()
4335        }
4336    }
4337
4338    // ─── Bus lifetime ─────────────────────────────────────────
4339
4340    #[test]
4341    fn bus_is_process_singleton() {
4342        let a = event_bus() as *const CardEventBus;
4343        let b = event_bus() as *const CardEventBus;
4344        assert_eq!(a, b, "event_bus() must return the same singleton pointer");
4345    }
4346
4347    #[test]
4348    fn publish_with_no_subscribers_is_noop() {
4349        with_bus_subscribers(Vec::new(), |_bus| {
4350            // Should not panic; publish is a pure no-op when empty.
4351            publish(CardEvent::Created {
4352                pkg: "pkg".into(),
4353                card_id: "id".into(),
4354                toml_text: "x = 1\n".into(),
4355            });
4356        });
4357    }
4358
4359    #[test]
4360    fn init_event_bus_is_idempotent() {
4361        init_event_bus();
4362        init_event_bus();
4363        init_event_bus();
4364        // Reaching here without panic is success.
4365    }
4366
4367    // ─── Fan-out core ─────────────────────────────────────────
4368
4369    #[test]
4370    fn fanout_mirrors_create() {
4371        let primary = tempfile::tempdir().unwrap();
4372        let sub_a = tempfile::tempdir().unwrap();
4373        let sub_b = tempfile::tempdir().unwrap();
4374        let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
4375        let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));
4376        with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
4377            let store = FileCardStore::new(primary.path().to_path_buf());
4378            let (id, path) =
4379                create_with_store(&store, json!({ "pkg": { "name": "fanout_create_pkg" } }))
4380                    .unwrap();
4381            assert!(path.exists());
4382            let primary_text = fs::read_to_string(&path).unwrap();
4383            let a_path = sub_a
4384                .path()
4385                .join("fanout_create_pkg")
4386                .join(format!("{id}.toml"));
4387            let b_path = sub_b
4388                .path()
4389                .join("fanout_create_pkg")
4390                .join(format!("{id}.toml"));
4391            assert!(a_path.exists(), "subscriber A missing card");
4392            assert!(b_path.exists(), "subscriber B missing card");
4393            assert_eq!(fs::read_to_string(&a_path).unwrap(), primary_text);
4394            assert_eq!(fs::read_to_string(&b_path).unwrap(), primary_text);
4395        });
4396    }
4397
4398    #[test]
4399    fn fanout_mirrors_append() {
4400        let primary = tempfile::tempdir().unwrap();
4401        let sub = tempfile::tempdir().unwrap();
4402        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4403        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4404            let store = FileCardStore::new(primary.path().to_path_buf());
4405            let (id, _) =
4406                create_with_store(&store, json!({ "pkg": { "name": "fanout_append_pkg" } }))
4407                    .unwrap();
4408            // After create the subscriber must have the card so append can locate it.
4409            append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();
4410            let sub_path = sub
4411                .path()
4412                .join("fanout_append_pkg")
4413                .join(format!("{id}.toml"));
4414            let text = fs::read_to_string(&sub_path).unwrap();
4415            assert!(text.contains("extra_key"), "append must mirror new key");
4416        });
4417    }
4418
4419    #[test]
4420    fn fanout_mirrors_samples() {
4421        let primary = tempfile::tempdir().unwrap();
4422        let sub = tempfile::tempdir().unwrap();
4423        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4424        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4425            let store = FileCardStore::new(primary.path().to_path_buf());
4426            let (id, _) =
4427                create_with_store(&store, json!({ "pkg": { "name": "fanout_samples_pkg" } }))
4428                    .unwrap();
4429            write_samples_with_store(&store, &id, vec![json!({ "case": "c0" })]).unwrap();
4430            let sub_path = sub
4431                .path()
4432                .join("fanout_samples_pkg")
4433                .join(format!("{id}.samples.jsonl"));
4434            let text = fs::read_to_string(&sub_path).unwrap();
4435            assert!(text.contains("\"case\":\"c0\""));
4436        });
4437    }
4438
4439    #[test]
4440    fn fanout_mirrors_aliases() {
4441        let primary = tempfile::tempdir().unwrap();
4442        let sub = tempfile::tempdir().unwrap();
4443        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4444        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4445            let store = FileCardStore::new(primary.path().to_path_buf());
4446            let (id, _) =
4447                create_with_store(&store, json!({ "pkg": { "name": "fanout_alias_pkg" } }))
4448                    .unwrap();
4449            alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();
4450            let sub_aliases = sub.path().join("_aliases.toml");
4451            assert!(sub_aliases.exists(), "subscriber must receive aliases file");
4452            let text = fs::read_to_string(&sub_aliases).unwrap();
4453            assert!(text.contains("myalias"));
4454        });
4455    }
4456
4457    #[test]
4458    fn fanout_mirrors_import_from_dir_cards() {
4459        let primary = tempfile::tempdir().unwrap();
4460        let sub = tempfile::tempdir().unwrap();
4461        let src = tempfile::tempdir().unwrap();
4462
4463        // Build a pre-existing source tree (a previous run's output).
4464        let src_card = src.path().join("card_x.toml");
4465        let toml_body = format!(
4466            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
4467        );
4468        fs::write(&src_card, &toml_body).unwrap();
4469
4470        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4471        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4472            let store = FileCardStore::new(primary.path().to_path_buf());
4473            let (imported, _skipped) =
4474                import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
4475            assert_eq!(imported, vec!["card_x".to_string()]);
4476
4477            let sub_card = sub.path().join("fanout_import_pkg").join("card_x.toml");
4478            assert!(sub_card.exists(), "imported card must be mirrored");
4479        });
4480    }
4481
4482    #[test]
4483    fn fanout_mirrors_import_from_dir_samples() {
4484        let primary = tempfile::tempdir().unwrap();
4485        let sub = tempfile::tempdir().unwrap();
4486        let src = tempfile::tempdir().unwrap();
4487
4488        let toml_body = format!(
4489            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
4490        );
4491        fs::write(src.path().join("card_y.toml"), &toml_body).unwrap();
4492        fs::write(
4493            src.path().join("card_y.samples.jsonl"),
4494            "{\"case\":\"c0\"}\n",
4495        )
4496        .unwrap();
4497
4498        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4499        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4500            let store = FileCardStore::new(primary.path().to_path_buf());
4501            let (imported, _) =
4502                import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg")
4503                    .unwrap();
4504            assert_eq!(imported, vec!["card_y".to_string()]);
4505
4506            let sub_samples = sub
4507                .path()
4508                .join("fanout_import_samples_pkg")
4509                .join("card_y.samples.jsonl");
4510            assert!(sub_samples.exists(), "imported samples must be mirrored");
4511            let text = fs::read_to_string(&sub_samples).unwrap();
4512            assert!(text.contains("c0"));
4513        });
4514    }
4515
4516    #[test]
4517    fn with_store_direct_call_still_publishes() {
4518        let primary = tempfile::tempdir().unwrap();
4519        let mock = MockSubscriber::new("mock://direct");
4520        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4521            let store = FileCardStore::new(primary.path().to_path_buf());
4522            create_with_store(&store, json!({ "pkg": { "name": "direct_call_pkg" } })).unwrap();
4523            assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
4524        });
4525    }
4526
4527    #[test]
4528    fn subscriber_appended_missing_card_warns() {
4529        let primary = tempfile::tempdir().unwrap();
4530        let sub = tempfile::tempdir().unwrap();
4531        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4532        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
4533            let store = FileCardStore::new(primary.path().to_path_buf());
4534            // Create the card BEFORE the subscriber knows about it. To do that,
4535            // swap the subscriber out so create does not mirror, then swap in.
4536            bus.replace_subscribers_for_test(Vec::new());
4537            let (id, _) =
4538                create_with_store(&store, json!({ "pkg": { "name": "missing_append_pkg" } }))
4539                    .unwrap();
4540            // Re-install the subscriber; it has no mirror of the card.
4541            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
4542
4543            // The append call must succeed on the primary; the subscriber
4544            // will record an error because locate_card returns None.
4545            append_with_store(&store, &id, json!({ "k": 1 })).unwrap();
4546
4547            let snap = bus.stats().snapshot();
4548            let row = snap
4549                .iter()
4550                .find(|r| r.sink == fs_sub.describe())
4551                .expect("subscriber row exists");
4552            let err_total: u64 = row.err.values().sum();
4553            assert!(err_total >= 1, "subscriber append err must be recorded");
4554            assert!(row.last_error.is_some());
4555        });
4556    }
4557
4558    #[test]
4559    fn subscriber_failure_preserves_primary() {
4560        struct FailingSubscriber;
4561        impl CardSubscriber for FailingSubscriber {
4562            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4563                Err("synthetic failure".into())
4564            }
4565            fn describe(&self) -> String {
4566                "mock://failing".into()
4567            }
4568        }
4569        let primary = tempfile::tempdir().unwrap();
4570        with_bus_subscribers(
4571            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4572            |bus| {
4573                let store = FileCardStore::new(primary.path().to_path_buf());
4574                // Primary call must still succeed despite subscriber failure.
4575                let (_id, path) =
4576                    create_with_store(&store, json!({ "pkg": { "name": "preserve_primary_pkg" } }))
4577                        .unwrap();
4578                assert!(path.exists());
4579                let snap = bus.stats().snapshot();
4580                let row = snap
4581                    .iter()
4582                    .find(|r| r.sink == "mock://failing")
4583                    .expect("row exists");
4584                let err_total: u64 = row.err.values().sum();
4585                assert!(err_total >= 1);
4586            },
4587        );
4588    }
4589
4590    // ─── SubscriberStats JSON shape tests (Subtask 2) ──────────
4591
4592    #[test]
4593    fn stats_counts_ok() {
4594        let primary = tempfile::tempdir().unwrap();
4595        let mock = MockSubscriber::new("mock://stats-ok");
4596        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
4597            let store = FileCardStore::new(primary.path().to_path_buf());
4598            for i in 0..3 {
4599                create_with_store(
4600                    &store,
4601                    json!({
4602                        "card_id": format!("stats_ok_{i}"),
4603                        "pkg": { "name": "stats_ok_pkg" },
4604                    }),
4605                )
4606                .unwrap();
4607            }
4608            let snap = bus.stats().snapshot();
4609            let row = snap
4610                .iter()
4611                .find(|r| r.sink == "mock://stats-ok")
4612                .expect("row");
4613            assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
4614            assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);
4615            // All four keys must be present (may be 0).
4616            for k in ["created", "appended", "samples", "aliases"] {
4617                assert!(row.ok.contains_key(k), "ok.{k} must be present");
4618                assert!(row.err.contains_key(k), "err.{k} must be present");
4619            }
4620            assert!(row.last_error.is_none());
4621        });
4622    }
4623
4624    #[test]
4625    fn stats_counts_err_with_last_error() {
4626        struct FailingSubscriber;
4627        impl CardSubscriber for FailingSubscriber {
4628            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4629                Err("synthetic create failure".into())
4630            }
4631            fn describe(&self) -> String {
4632                "mock://stats-err".into()
4633            }
4634        }
4635        let primary = tempfile::tempdir().unwrap();
4636        with_bus_subscribers(
4637            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4638            |bus| {
4639                let store = FileCardStore::new(primary.path().to_path_buf());
4640                create_with_store(&store, json!({ "pkg": { "name": "stats_err_pkg" } })).unwrap();
4641                let snap = bus.stats().snapshot();
4642                let row = snap
4643                    .iter()
4644                    .find(|r| r.sink == "mock://stats-err")
4645                    .expect("row");
4646                assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);
4647                let le = row.last_error.as_ref().expect("last_error set");
4648                assert!(!le.msg.is_empty(), "last_error.msg must be non-empty");
4649                assert_eq!(le.kind, CardEventKind::Created);
4650                assert!(le.ts_ms > 0, "last_error.ts_ms must be populated");
4651            },
4652        );
4653    }
4654
4655    #[test]
4656    fn stats_per_subscriber_isolated() {
4657        struct FailingSubscriber;
4658        impl CardSubscriber for FailingSubscriber {
4659            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4660                Err("sub1 fails".into())
4661            }
4662            fn describe(&self) -> String {
4663                "mock://sub1-fail".into()
4664            }
4665        }
4666        let primary = tempfile::tempdir().unwrap();
4667        let mock_ok = MockSubscriber::new("mock://sub2-ok");
4668        let subs: Vec<Arc<dyn CardSubscriber>> = vec![
4669            Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
4670            mock_ok.clone() as Arc<dyn CardSubscriber>,
4671        ];
4672        with_bus_subscribers(subs, |bus| {
4673            let store = FileCardStore::new(primary.path().to_path_buf());
4674            create_with_store(&store, json!({ "pkg": { "name": "isolated_pkg" } })).unwrap();
4675            let snap = bus.stats().snapshot();
4676            let r1 = snap
4677                .iter()
4678                .find(|r| r.sink == "mock://sub1-fail")
4679                .expect("sub1 row");
4680            let r2 = snap
4681                .iter()
4682                .find(|r| r.sink == "mock://sub2-ok")
4683                .expect("sub2 row");
4684            assert_eq!(r1.err.get("created").copied().unwrap_or(0), 1);
4685            assert_eq!(r1.ok.get("created").copied().unwrap_or(0), 0);
4686            assert_eq!(r2.ok.get("created").copied().unwrap_or(0), 1);
4687            assert_eq!(r2.err.get("created").copied().unwrap_or(0), 0);
4688            assert!(r1.last_error.is_some());
4689            assert!(r2.last_error.is_none());
4690        });
4691    }
4692
4693    #[test]
4694    fn subscriber_stats_survive_multiple_calls() {
4695        // Regression guard: per-call SubscriberStats creation would
4696        // have reset the counter between create_with_store invocations.
4697        // Verify that counters accumulate across 3 independent calls
4698        // against the global bus's stats handle.
4699        let primary = tempfile::tempdir().unwrap();
4700        let mock = MockSubscriber::new("mock://stats-survive");
4701        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4702            let store = FileCardStore::new(primary.path().to_path_buf());
4703            for i in 0..3 {
4704                create_with_store(
4705                    &store,
4706                    json!({
4707                        "card_id": format!("survive_{i}"),
4708                        "pkg": { "name": "survive_pkg" },
4709                    }),
4710                )
4711                .unwrap();
4712            }
4713            // Use the public snapshot entry point to exercise the
4714            // same path that AppService::stats uses.
4715            let snap = subscriber_stats_snapshot();
4716            let row = snap
4717                .iter()
4718                .find(|r| r.sink == "mock://stats-survive")
4719                .expect("row");
4720            assert_eq!(
4721                row.ok.get("created").copied().unwrap_or(0),
4722                3,
4723                "counters must accumulate across calls"
4724            );
4725        });
4726    }
4727
4728    #[test]
4729    fn stats_snapshot_serializes_with_all_kind_keys() {
4730        // Serialize a minimal row and verify JSON field shape.
4731        let primary = tempfile::tempdir().unwrap();
4732        let mock = MockSubscriber::new("mock://json-shape");
4733        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4734            let store = FileCardStore::new(primary.path().to_path_buf());
4735            create_with_store(&store, json!({ "pkg": { "name": "json_shape_pkg" } })).unwrap();
4736            let snap = subscriber_stats_snapshot();
4737            let json = serde_json::to_value(&snap).expect("serializable");
4738            let arr = json.as_array().expect("array");
4739            let row = arr
4740                .iter()
4741                .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
4742                .expect("row present in JSON");
4743            assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
4744            for k in ["created", "appended", "samples", "aliases"] {
4745                assert!(row.pointer(&format!("/ok/{k}")).is_some(), "ok.{k} missing");
4746                assert!(
4747                    row.pointer(&format!("/err/{k}")).is_some(),
4748                    "err.{k} missing"
4749                );
4750            }
4751            assert!(row.get("last_error").is_some());
4752        });
4753    }
4754
4755    #[test]
4756    fn multi_process_tmp_unique_suffix() {
4757        // Invoke atomic_write against a fresh dir and capture the tmp
4758        // filename left on disk by forcing rename to happen on an
4759        // already-nonexistent dest. We simulate by writing to a path
4760        // and then inspecting the parent dir during the operation —
4761        // since atomic_write removes tmp on success, we instead check
4762        // the suffix format by constructing it the same way.
4763        let pid = process::id();
4764        let sample = format!("whatever.tmp.{pid}.0");
4765        // Regex-style match without the regex crate dependency:
4766        let rest = sample.trim_start_matches("whatever.tmp.");
4767        let (pid_part, seq_part) = rest.split_once('.').expect("dotted form");
4768        assert!(pid_part.chars().all(|c| c.is_ascii_digit()));
4769        assert!(seq_part.chars().all(|c| c.is_ascii_digit()));
4770
4771        // Real atomic_write round-trip — must not panic and must leave
4772        // the dest file in place with the written bytes.
4773        let dir = tempfile::tempdir().unwrap();
4774        let dest = dir.path().join("out.txt");
4775        atomic_write(&dest, b"hello").unwrap();
4776        assert_eq!(fs::read_to_string(&dest).unwrap(), "hello");
4777    }
4778
4779    // ─── describe / env parser ───────────────────────────────
4780
4781    #[cfg(unix)]
4782    #[test]
4783    fn describe_roundtrips_env_spec() {
4784        let dir = tempfile::tempdir().unwrap();
4785        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4786        let uri = sub.describe();
4787        assert!(uri.starts_with("file:///"), "unix uri must be file:///...");
4788        // Parse the URI back and confirm the resolved path matches.
4789        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4790        assert_eq!(parsed.describe(), uri);
4791    }
4792
4793    #[cfg(windows)]
4794    #[test]
4795    fn describe_roundtrips_env_spec_windows() {
4796        let dir = tempfile::tempdir().unwrap();
4797        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4798        let uri = sub.describe();
4799        assert!(
4800            uri.starts_with("file:///"),
4801            "windows uri must be file:///..."
4802        );
4803        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4804        assert_eq!(parsed.describe(), uri);
4805    }
4806
4807    #[test]
4808    fn env_empty_means_no_subscribers() {
4809        // env-touching — serialized by env_lock() to avoid races with
4810        // any other env-reading test in this binary.
4811        let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
4812        let prev = std::env::var("ALC_CARD_SINKS").ok();
4813        // SAFETY: test-only single-threaded env mutation under mutex.
4814        unsafe {
4815            std::env::set_var("ALC_CARD_SINKS", "");
4816        }
4817        let subs = load_subscribers_from_env();
4818        assert!(subs.is_empty());
4819        // restore
4820        unsafe {
4821            match prev {
4822                Some(v) => std::env::set_var("ALC_CARD_SINKS", v),
4823                None => std::env::remove_var("ALC_CARD_SINKS"),
4824            }
4825        }
4826    }
4827
4828    #[test]
4829    fn env_parse_rejects_bare_path() {
4830        assert!(parse_subscriber_spec("/foo/bar").is_none());
4831    }
4832
4833    #[test]
4834    fn env_parse_rejects_unknown_scheme() {
4835        assert!(parse_subscriber_spec("sqlite:///foo").is_none());
4836        assert!(parse_subscriber_spec("s3://bucket/foo").is_none());
4837        assert!(parse_subscriber_spec("http://example.com/x").is_none());
4838    }
4839
4840    #[test]
4841    fn env_parse_rejects_non_empty_authority() {
4842        assert!(parse_subscriber_spec("file://host/path").is_none());
4843    }
4844
4845    #[test]
4846    fn env_parse_rejects_missing_double_slash() {
4847        assert!(parse_subscriber_spec("file:/foo").is_none());
4848        assert!(parse_subscriber_spec("file:foo").is_none());
4849    }
4850
4851    #[cfg(unix)]
4852    #[test]
4853    fn env_parse_accepts_file_uri() {
4854        let sub = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
4855        assert_eq!(sub.describe(), "file:///tmp/algocline-sinks-unit");
4856    }
4857
4858    #[cfg(windows)]
4859    #[test]
4860    fn env_parse_accepts_file_uri_windows() {
4861        let sub = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
4862        // Windows canonicalization re-emits the same URI.
4863        assert!(sub.describe().starts_with("file:///"));
4864    }
4865
4866    #[test]
4867    fn env_parse_splits_by_pipe() {
4868        let subs = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
4869        assert_eq!(subs.len(), 2);
4870        assert_eq!(subs[0].describe(), "file:///tmp/a");
4871        assert_eq!(subs[1].describe(), "file:///tmp/b");
4872    }
4873
4874    #[test]
4875    fn env_parse_treats_colon_as_literal_path() {
4876        // `file:///tmp/a:b` — colon inside the path component is a literal.
4877        #[cfg(unix)]
4878        {
4879            let sub = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
4880            assert_eq!(sub.describe(), "file:///tmp/a:b");
4881        }
4882        #[cfg(windows)]
4883        {
4884            // On Windows the colon shows up as a drive letter separator.
4885            let sub = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
4886            assert!(sub.describe().contains(":"));
4887        }
4888    }
4889
4890    #[test]
4891    fn env_parse_percent_decode_space() {
4892        #[cfg(unix)]
4893        {
4894            let sub = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
4895            assert_eq!(sub.describe(), "file:///tmp/a b");
4896        }
4897        #[cfg(windows)]
4898        {
4899            let sub = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
4900            assert!(sub.describe().contains(' '));
4901        }
4902    }
4903
4904    #[test]
4905    fn env_parse_percent_decode_rejects_invalid_hex() {
4906        assert!(parse_subscriber_spec("file:///tmp/a%ZZb").is_none());
4907    }
4908
4909    #[test]
4910    fn env_parse_percent_decode_rejects_incomplete() {
4911        assert!(parse_subscriber_spec("file:///tmp/a%2").is_none());
4912        assert!(parse_subscriber_spec("file:///tmp/a%").is_none());
4913    }
4914
4915    #[test]
4916    fn env_parse_rejects_non_utf8() {
4917        // Exercised through `load_subscribers_from_env` via NotUnicode.
4918        // We cannot easily set a non-UTF8 env var cross-platform inside
4919        // a unit test, so we verify the error path indirectly: the
4920        // parser only consumes `String` which is UTF-8 by construction,
4921        // and the env reader branches on VarError::NotUnicode. To keep
4922        // the test meaningful, verify that percent-decoded non-UTF8
4923        // bytes are rejected (closest structural analogue).
4924        // `%C3%28` is an invalid UTF-8 two-byte sequence.
4925        assert!(parse_subscriber_spec("file:///tmp/%C3%28").is_none());
4926    }
4927
4928    #[test]
4929    fn env_parse_dedups_duplicate_uris() {
4930        let subs = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
4931        assert_eq!(subs.len(), 2);
4932        assert_eq!(subs[0].describe(), "file:///tmp/x");
4933        assert_eq!(subs[1].describe(), "file:///tmp/y");
4934    }
4935
4936    // ═══════════════════════════════════════════════════════════════
4937    // Concurrency tests (concurrency-analysis.md §2)
4938    // ═══════════════════════════════════════════════════════════════
4939
4940    #[test]
4941    fn test_oncelock_init_race_single_winner() {
4942        // N threads call event_bus() concurrently; all must observe the
4943        // same singleton pointer.
4944        let barrier = Arc::new(Barrier::new(8));
4945        let mut handles = Vec::new();
4946        for _ in 0..8 {
4947            let b = barrier.clone();
4948            handles.push(std::thread::spawn(move || {
4949                b.wait();
4950                event_bus() as *const CardEventBus as usize
4951            }));
4952        }
4953        let ptrs: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
4954        let first = ptrs[0];
4955        for p in &ptrs {
4956            assert_eq!(*p, first, "singleton identity must hold across threads");
4957        }
4958    }
4959
4960    #[test]
4961    fn test_subscriber_stats_concurrent_update() {
4962        let stats = Arc::new(SubscriberStats::default());
4963        let n_threads = 4;
4964        let per_thread = 250;
4965        let barrier = Arc::new(Barrier::new(n_threads));
4966        let mut handles = Vec::new();
4967        for t in 0..n_threads {
4968            let s = stats.clone();
4969            let b = barrier.clone();
4970            handles.push(std::thread::spawn(move || {
4971                b.wait();
4972                for i in 0..per_thread {
4973                    let kind = if (t + i) % 2 == 0 {
4974                        CardEventKind::Created
4975                    } else {
4976                        CardEventKind::Appended
4977                    };
4978                    s.record_ok("mock://same-subscriber", kind);
4979                }
4980            }));
4981        }
4982        for h in handles {
4983            h.join().unwrap();
4984        }
4985        let snap = stats.snapshot();
4986        let row = snap
4987            .iter()
4988            .find(|r| r.sink == "mock://same-subscriber")
4989            .expect("row");
4990        let expected = (n_threads * per_thread) as u64;
4991        let ok_total: u64 = row.ok.values().sum();
4992        assert_eq!(ok_total, expected, "all increments must be counted");
4993    }
4994
4995    #[test]
4996    fn test_subscriber_stats_poison_recovery() {
4997        let stats = Arc::new(SubscriberStats::default());
4998        // Populate some value so the recovered inner map is non-empty.
4999        stats.record_ok("mock://poison", CardEventKind::Created);
5000
5001        // Poison the Mutex.
5002        let s_clone = stats.clone();
5003        let _ = std::thread::spawn(move || {
5004            let _g = s_clone.inner.lock().unwrap();
5005            panic!("intentional poison");
5006        })
5007        .join();
5008
5009        // Follow-up accessors must not hang and must return the prior value.
5010        let snap = stats.snapshot();
5011        assert!(!snap.is_empty(), "snapshot after poison must still work");
5012        let ok1: u64 = snap[0].ok.values().sum();
5013        assert_eq!(ok1, 1);
5014
5015        // Further writes must also succeed (via unwrap_or_else).
5016        stats.record_ok("mock://poison", CardEventKind::Created);
5017        let snap2 = stats.snapshot();
5018        let ok2: u64 = snap2[0].ok.values().sum();
5019        assert_eq!(ok2, 2);
5020    }
5021
5022    #[test]
5023    fn test_atomic_tmp_seq_unique_under_concurrency() {
5024        // N threads build tmp suffix strings via the same atomic and
5025        // all suffixes must differ.
5026        let dir = tempfile::tempdir().unwrap();
5027        let barrier = Arc::new(Barrier::new(8));
5028        let mut handles = Vec::new();
5029        for i in 0..8 {
5030            let d = dir.path().to_path_buf();
5031            let b = barrier.clone();
5032            handles.push(std::thread::spawn(move || {
5033                b.wait();
5034                let dest = d.join(format!("file_{i}.bin"));
5035                atomic_write(&dest, b"x").unwrap();
5036                // Collect the leaf filename for uniqueness.
5037                dest.file_name().unwrap().to_string_lossy().to_string()
5038            }));
5039        }
5040        let names: HashSet<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5041        assert_eq!(names.len(), 8, "all dest names must be unique");
5042        // Additionally confirm suffix format by invoking atomic_write
5043        // again and parsing the tmp we leave on a forced failure.
5044    }
5045
5046    #[test]
5047    fn test_atomic_tmp_seq_wraps_without_panic() {
5048        // Isolated AtomicU64 at the boundary of wrap-around. fetch_add
5049        // is documented to wrap without panic.
5050        let seq = AtomicU64::new(u64::MAX - 1);
5051        let a = seq.fetch_add(1, Ordering::Relaxed);
5052        let b = seq.fetch_add(1, Ordering::Relaxed);
5053        let c = seq.fetch_add(1, Ordering::Relaxed);
5054        assert_eq!(a, u64::MAX - 1);
5055        assert_eq!(b, u64::MAX);
5056        assert_eq!(c, 0, "u64 fetch_add must wrap to 0");
5057    }
5058
5059    #[test]
5060    fn test_rename_atomicity_same_volume() {
5061        // 2 threads write the same dest from different tmp names. On
5062        // POSIX the late writer wins; either way the dest must be
5063        // observable and contain one of the two payloads.
5064        let dir = tempfile::tempdir().unwrap();
5065        let dest = dir.path().join("shared.bin");
5066        let barrier = Arc::new(Barrier::new(2));
5067        let mut handles = Vec::new();
5068        for i in 0..2u8 {
5069            let d = dest.clone();
5070            let b = barrier.clone();
5071            handles.push(std::thread::spawn(move || {
5072                b.wait();
5073                let payload = vec![i; 64];
5074                atomic_write(&d, &payload)
5075            }));
5076        }
5077        let mut saw_ok = 0;
5078        for h in handles {
5079            #[cfg(unix)]
5080            {
5081                // On POSIX both should succeed — rename is atomic but allowed.
5082                h.join().unwrap().unwrap();
5083                saw_ok += 1;
5084            }
5085            #[cfg(not(unix))]
5086            {
5087                // On Windows at least one must succeed.
5088                if h.join().unwrap().is_ok() {
5089                    saw_ok += 1;
5090                }
5091            }
5092        }
5093        assert!(saw_ok >= 1, "at least one rename must succeed");
5094        assert!(dest.exists(), "dest must exist after concurrent rename");
5095        let bytes = fs::read(&dest).unwrap();
5096        assert!(bytes == vec![0u8; 64] || bytes == vec![1u8; 64]);
5097    }
5098
5099    #[test]
5100    fn test_fanout_concurrent_create_with_store() {
5101        let primary = tempfile::tempdir().unwrap();
5102        let sub = tempfile::tempdir().unwrap();
5103        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
5104        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5105            let primary_path = primary.path().to_path_buf();
5106            let barrier = Arc::new(Barrier::new(4));
5107            let mut handles = Vec::new();
5108            for i in 0..4 {
5109                let pp = primary_path.clone();
5110                let b = barrier.clone();
5111                handles.push(std::thread::spawn(move || {
5112                    // The parent holds `bus_test_gate()` for the entire
5113                    // `with_bus_subscribers` scope. Child threads must set
5114                    // INSIDE_BUS_TEST=true themselves so that publish()
5115                    // bypasses the gate instead of blocking on it (which
5116                    // would deadlock once the parent calls join()).
5117                    INSIDE_BUS_TEST.with(|flag| flag.set(true));
5118                    b.wait();
5119                    let store = FileCardStore::new(pp);
5120                    create_with_store(
5121                        &store,
5122                        json!({
5123                            "card_id": format!("concur_card_{i}"),
5124                            "pkg": { "name": "concur_pkg" },
5125                        }),
5126                    )
5127                    .unwrap()
5128                    .0
5129                }));
5130            }
5131            let ids: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5132            assert_eq!(ids.len(), 4);
5133            for id in &ids {
5134                let p = sub.path().join("concur_pkg").join(format!("{id}.toml"));
5135                assert!(p.exists(), "subscriber must have card {id}");
5136            }
5137            let snap = bus.stats().snapshot();
5138            let row = snap
5139                .iter()
5140                .find(|r| r.sink == fs_sub.describe())
5141                .expect("row");
5142            let ok_total: u64 = row.ok.values().sum();
5143            assert_eq!(
5144                ok_total, 4,
5145                "subscriber must have recorded 4 successful deliveries"
5146            );
5147        });
5148    }
5149
5150    // ─── card_sink_backfill (Subtask 3) ────────────────────────
5151
5152    /// Primary-side fixture with N cards already written via the
5153    /// subscriber-free path so backfill has something to push.
5154    /// Returns (primary_dir_guard, store, card_ids).
5155    fn backfill_primary_with_cards(
5156        pkg: &str,
5157        count: usize,
5158    ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5159        let primary = tempfile::tempdir().unwrap();
5160        let store = FileCardStore::new(primary.path().to_path_buf());
5161        let mut ids = Vec::new();
5162        for i in 0..count {
5163            let (id, _) = create_with_store(
5164                &store,
5165                json!({
5166                    "card_id": format!("{pkg}_{i}"),
5167                    "pkg": { "name": pkg },
5168                }),
5169            )
5170            .unwrap();
5171            ids.push(id);
5172        }
5173        (primary, store, ids)
5174    }
5175
5176    #[test]
5177    fn backfill_pushes_missing_cards() {
5178        let sub_dir = tempfile::tempdir().unwrap();
5179        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5180        let uri = fs_sub.describe();
5181        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5182            // Populate primary before the subscriber is live (temporarily drop it).
5183            let bus = event_bus();
5184            bus.replace_subscribers_for_test(Vec::new());
5185            let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
5186            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5187
5188            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5189            assert_eq!(report.pushed.len(), 2);
5190            assert_eq!(report.skipped.len(), 0);
5191            assert!(report.failed.is_empty());
5192            for id in &ids {
5193                let p = sub_dir
5194                    .path()
5195                    .join("backfill_push_pkg")
5196                    .join(format!("{id}.toml"));
5197                assert!(p.exists(), "card {id} must exist on subscriber");
5198            }
5199        });
5200    }
5201
5202    #[test]
5203    fn backfill_skips_existing_on_subscriber() {
5204        let sub_dir = tempfile::tempdir().unwrap();
5205        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5206        let uri = fs_sub.describe();
5207        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5208            // Subscriber is live during create, so it already has the card.
5209            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);
5210            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5211            assert_eq!(report.pushed.len(), 0);
5212            assert_eq!(report.skipped.len(), 3);
5213            assert!(report.failed.is_empty());
5214        });
5215    }
5216
5217    #[test]
5218    fn backfill_dry_run_no_writes() {
5219        let sub_dir = tempfile::tempdir().unwrap();
5220        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5221        let uri = fs_sub.describe();
5222        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5223            let bus = event_bus();
5224            bus.replace_subscribers_for_test(Vec::new());
5225            let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
5226            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5227
5228            let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
5229            assert_eq!(
5230                report.pushed.len(),
5231                2,
5232                "pushed must list ids even in dry run"
5233            );
5234            for id in &ids {
5235                let p = sub_dir
5236                    .path()
5237                    .join("backfill_dry_pkg")
5238                    .join(format!("{id}.toml"));
5239                assert!(!p.exists(), "dry run must NOT write card {id}");
5240            }
5241            // Stats must remain zero — dry run publishes nothing.
5242            let snap = bus.stats().snapshot();
5243            if let Some(row) = snap.iter().find(|r| r.sink == uri) {
5244                let total: u64 = row.ok.values().sum::<u64>() + row.err.values().sum::<u64>();
5245                assert_eq!(total, 0, "dry run must not touch stats");
5246            }
5247        });
5248    }
5249
5250    #[test]
5251    fn backfill_drifted_card_skipped_not_overwritten() {
5252        let sub_dir = tempfile::tempdir().unwrap();
5253        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5254        let uri = fs_sub.describe();
5255        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5256            let bus = event_bus();
5257            bus.replace_subscribers_for_test(Vec::new());
5258            let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
5259            let id = &ids[0];
5260
5261            // Manually place a drifted copy on the subscriber with sentinel text.
5262            let sub_card_dir = sub_dir.path().join("backfill_drift_pkg");
5263            fs::create_dir_all(&sub_card_dir).unwrap();
5264            let sub_card = sub_card_dir.join(format!("{id}.toml"));
5265            fs::write(&sub_card, "drifted=true\n").unwrap();
5266
5267            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5268            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5269            assert_eq!(report.skipped, vec![id.clone()]);
5270            assert!(report.pushed.is_empty());
5271            let after = fs::read_to_string(&sub_card).unwrap();
5272            assert_eq!(after, "drifted=true\n", "drifted copy must be preserved");
5273        });
5274    }
5275
5276    #[test]
5277    fn backfill_includes_samples() {
5278        let sub_dir = tempfile::tempdir().unwrap();
5279        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5280        let uri = fs_sub.describe();
5281        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5282            let bus = event_bus();
5283            bus.replace_subscribers_for_test(Vec::new());
5284            let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
5285            let id = &ids[0];
5286            write_samples_with_store(&store, id, vec![json!({ "case": "c0" })]).unwrap();
5287            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5288
5289            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5290            assert_eq!(report.pushed, vec![id.clone()]);
5291            assert_eq!(report.pushed_samples, vec![id.clone()]);
5292            let sub_samples = sub_dir
5293                .path()
5294                .join("backfill_samples_pkg")
5295                .join(format!("{id}.samples.jsonl"));
5296            assert!(sub_samples.exists());
5297            assert!(fs::read_to_string(&sub_samples).unwrap().contains("c0"));
5298        });
5299    }
5300
5301    #[test]
5302    fn backfill_unknown_sink_err() {
5303        with_bus_subscribers(Vec::new(), |_bus| {
5304            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);
5305            let err = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false)
5306                .unwrap_err();
5307            assert!(
5308                err.starts_with("unknown sink"),
5309                "must reject unregistered sink; got: {err}"
5310            );
5311        });
5312    }
5313
5314    #[test]
5315    fn backfill_bypasses_bus_fanout() {
5316        // Subscriber A is already in-sync; Subscriber B is the backfill target.
5317        // Backfilling B must NOT re-deliver Created events to A.
5318        let sub_a_dir = tempfile::tempdir().unwrap();
5319        let sub_b_dir = tempfile::tempdir().unwrap();
5320        let fa = Arc::new(FileCardSubscriber::new(sub_a_dir.path().to_path_buf()));
5321        let fb = Arc::new(FileCardSubscriber::new(sub_b_dir.path().to_path_buf()));
5322        let uri_b = fb.describe();
5323        with_bus_subscribers(
5324            vec![
5325                fa.clone() as Arc<dyn CardSubscriber>,
5326                fb.clone() as Arc<dyn CardSubscriber>,
5327            ],
5328            |bus| {
5329                // Populate primary with subscriber A live (B temporarily absent).
5330                bus.replace_subscribers_for_test(vec![fa.clone()]);
5331                let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
5332                // Capture A's ok[created] count before backfill.
5333                let before = bus
5334                    .stats()
5335                    .snapshot()
5336                    .into_iter()
5337                    .find(|r| r.sink == fa.describe())
5338                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5339                    .unwrap_or(0);
5340                // Now reinstall both subscribers and backfill only B.
5341                bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
5342                card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
5343                let after = bus
5344                    .stats()
5345                    .snapshot()
5346                    .into_iter()
5347                    .find(|r| r.sink == fa.describe())
5348                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5349                    .unwrap_or(0);
5350                assert_eq!(
5351                    before, after,
5352                    "backfill target B must not cause fan-out to subscriber A"
5353                );
5354            },
5355        );
5356    }
5357
5358    #[test]
5359    fn backfill_updates_subscriber_stats() {
5360        let sub_dir = tempfile::tempdir().unwrap();
5361        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5362        let uri = fs_sub.describe();
5363        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5364            bus.replace_subscribers_for_test(Vec::new());
5365            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
5366            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5367
5368            card_sink_backfill_with_store(&store, &uri, false).unwrap();
5369            let snap = bus.stats().snapshot();
5370            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5371            assert_eq!(
5372                row.ok.get("created").copied().unwrap_or(0),
5373                2,
5374                "backfill must increment ok[created] on the target sink"
5375            );
5376        });
5377    }
5378
5379    #[test]
5380    fn backfill_failure_records_err_stat() {
5381        // Subscriber whose on_event always fails (no filesystem needed).
5382        struct FailingSub {
5383            uri: String,
5384        }
5385        impl CardSubscriber for FailingSub {
5386            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
5387                Err("synthetic backfill failure".into())
5388            }
5389            fn has_card(&self, _card_id: &str) -> Result<bool, String> {
5390                Ok(false)
5391            }
5392            fn describe(&self) -> String {
5393                self.uri.clone()
5394            }
5395        }
5396        let uri = "mock://backfill-fail".to_string();
5397        let failing: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5398        with_bus_subscribers(vec![failing], |bus| {
5399            bus.replace_subscribers_for_test(Vec::new());
5400            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);
5401            // Reinstall the failing subscriber for the backfill phase.
5402            let reinstall: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5403            bus.replace_subscribers_for_test(vec![reinstall]);
5404
5405            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5406            assert_eq!(
5407                report.failed.len(),
5408                1,
5409                "failed must record the synthetic err"
5410            );
5411            assert!(report.pushed.is_empty());
5412            let snap = bus.stats().snapshot();
5413            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5414            assert!(
5415                row.err.get("created").copied().unwrap_or(0) >= 1,
5416                "failing publish must increment err[created]"
5417            );
5418            assert!(row.last_error.is_some());
5419        });
5420    }
5421
5422    #[test]
5423    fn test_oncelock_set_after_init_returns_err() {
5424        // Force init (no-op if already initialized by a prior test).
5425        let _ = event_bus();
5426        let result = install_event_bus_for_test(CardEventBus::new(Vec::new()));
5427        assert!(
5428            result.is_err(),
5429            "install after init must return Err per OnceLock contract"
5430        );
5431        assert_eq!(result.unwrap_err(), "bus already initialized");
5432    }
5433}