Skip to main content

algocline_engine/
card.rs

1//! Card storage — immutable run-result snapshots.
2//!
3//! A Card is a frozen record of a strategy run: identity, parameters,
4//! model, scenario, aggregate stats, and (optionally) per-case detail.
5//! Cards are **immutable** — once written they are never modified, only
6//! annotated via additive `append`.  Mutable **aliases** point to a
7//! Card and can be rebound freely.
8//!
9//! ## Design principles
10//!
11//! 1. **Minimal REQUIRED, maximal OPTIONAL** — v0 needs only 4 fields;
12//!    lightweight "ran this pkg" records and heavy optimize snapshots
13//!    share the same schema.
14//! 2. **Immutable append-only** — no overwrite, no delete.  New data is
15//!    added via `append` (new top-level keys only) or by creating a new
16//!    Card with a fresh `card_id`.
17//! 3. **Two-tier storage** — TOML for human-readable aggregate, JSONL
18//!    sidecar for machine-parseable per-case detail.
19//! 4. **File-primary** — files are the source of truth; in-memory state
20//!    is cache.  Cards can be copied, diffed, and version-controlled.
21//!
22//! ## Storage layout (two-tier)
23//!
24//! | Tier | File | Content |
25//! |------|------|---------|
26//! | **Tier 1** | `~/.algocline/cards/{pkg}/{card_id}.toml` | Aggregate scalars, decisions, identity, params |
27//! | **Tier 2** | `~/.algocline/cards/{pkg}/{card_id}.samples.jsonl` | Per-case raw data (JSONL, write-once) |
28//!
29//! Tier 1 holds a shareable summary (a few KB). Tier 2 holds per-case
30//! detail ��� the engine does not interpret its columns; packages define
31//! their own schema.
32//!
33//! Alias table: `~/.algocline/cards/_aliases.toml` (global).
34//!
35//! ## card_id naming
36//!
37//! `{pkg}_{model_short}_{compact_ts}_{hash6}`
38//!
39//! - `compact_ts`: `YYYYMMDDTHHMMSS` in UTC
40//! - `hash6`: first 6 hex chars of DJB2 param fingerprint
41//! - Example: `cot_opus46_20260412T061500_a3f9c1`
42//!
43//! ## v0 schema (frozen)
44//!
45//! ### REQUIRED (minimum valid Card)
46//!
47//! | Field | Type | Example |
48//! |-------|------|---------|
49//! | `schema_version` | string | `"card/v0"` |
50//! | `card_id` | string | `"cot_opus46_20260412T061500_a3f9c1"` |
51//! | `created_at` | string (RFC 3339) | `"2026-04-12T06:15:00Z"` |
52//! | `[pkg].name` | string | `"cot"` |
53//!
54//! ### OPTIONAL (auto-injected where possible)
55//!
56//! | Section | Fields |
57//! |---------|--------|
58//! | `[pkg]` | `version`, `category`, `source`, `source_ref`, `source_sha` |
59//! | `[runtime]` | `alc_version`, `lua_version`, `host_os`, `git_sha` |
60//! | `[model]` | `provider`, `id`, `id_short`, `cutoff` |
61//! | `[params]` | Free-form ctx snapshot; `param_fingerprint` for DJB2 hash |
62//! | `[strategy_params]` | Strategy-tunable parameters surfaced for sweeps / optimizers (e.g. `alpha`, `temperature`, `depth`). Free-form, but `where`-queryable as a first-class section |
63//! | `[scenario]` | `name`, `source`, `case_count`, `grader` |
64//! | `[stats]` | `pass_rate`, `mean_score`, `std`, `median`, `min`, `max`, `n` |
65//! | `[stats.by_bucket]` | Disaggregated sub-bucket stats (array of tables) |
66//! | `[cost]` | `llm_calls`, `input_tokens`, `output_tokens`, `elapsed_ms`, `usd_estimate` |
67//! | `[optimize]` | `target`, `search`, `rounds_used`, `top_k` (for optimize Cards) |
68//! | `[metadata]` | Free-form escape hatch. Recognized lineage conventions: `prior_card_id` (parent Card id), `prior_relation` (relation kind, e.g. `"sweep_variant"`, `"reflection_of"`, `"derived_from"`) |
69//!
70//! ## Lua API (`alc.card.*`)
71//!
72//! | Function | Description |
73//! |----------|-------------|
74//! | `create(table)` | Write new Card (Tier 1). Returns `{ card_id, path }` |
75//! | `get(card_id)` | Read Card by id. Returns table or nil |
76//! | `list(filter?)` | List Cards as summaries (newest first) |
77//! | `find(query?)` | Prisma-style `where` DSL + dotted-path `order_by` + `offset`/`limit` |
78//! | `append(card_id, fields)` | Additive-only annotation (new keys only) |
79//! | `alias_set(name, card_id, opts?)` | Pin mutable alias |
80//! | `alias_list(filter?)` | List aliases |
81//! | `get_by_alias(name)` | Resolve alias → full Card |
82//! | `write_samples(card_id, samples)` | Write Tier 2 sidecar (write-once) |
83//! | `read_samples(card_id, opts?)` | Read Tier 2 with `where` filtering + offset/limit paging |
84//! | `lineage(query)` | Walk ancestry/descendants via `metadata.prior_card_id` |
85
86use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
98// ═══════════════════════════════════════════════════════════════
99// CardStore trait — physical I/O abstraction.
100// ═══════════════════════════════════════════════════════════════
101//
102// Card domain logic (schema / Query DSL / Lineage) is backend-
103// neutral. Only the physical read/write layer is swappable.
104//
105// The default backend is `FileCardStore`, which preserves the
106// legacy `~/.algocline/cards/{pkg}/{card_id}.toml` layout
107// byte-for-byte. Alternative backends (PathCardStore, SqliteCardStore,
108// MemoryCardStore) can be added by implementing this trait.
109//
110// Locators are `PathBuf` values. For FileCardStore they are real
111// filesystem paths; for non-file backends they are synthetic paths
112// (e.g. `sqlite:///db.sqlite#card/{id}`) — the value is opaque to
113// the domain layer and only exposed via the Lua `alc.card.create`
114// / `alc.card.write_samples` return values.
115
116/// Storage backend for Cards.
117///
118/// Implementations must be `Send + Sync` so that they can be shared
119/// across Lua host threads safely. All methods may fail with an
120/// error `String` describing the backend-specific failure.
121pub trait CardStore: Send + Sync {
122    // ─── Card CRUD ─────────────────────────────────────────────
123
124    /// Write a new Card (Tier 1 TOML).
125    ///
126    /// The caller has already:
127    ///   - validated `pkg` and `card_id` via [`validate_name`]
128    ///   - serialized `toml_text` with `toml::to_string_pretty`
129    ///
130    /// Fails if a Card with the same id already exists
131    /// (immutability).  Returns the locator of the written Card.
132    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
133
134    /// Overwrite an existing Card (append flow).
135    ///
136    /// Append is additive-only w.r.t. keys, but the underlying
137    /// TOML file is rewritten in place; callers must have validated
138    /// the additive-only constraint before calling this.
139    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
140
141    /// Locate a Card file by id. Returns `None` if not found.
142    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;
143
144    /// Read a Card's raw TOML text by id. Returns `None` if missing.
145    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;
146
147    /// List `(pkg, locator)` pairs for every Card file in the store.
148    ///
149    /// When `pkg_filter` is `Some(name)`, restrict to that pkg
150    /// subdir. Non-existent pkg subdir yields an empty Vec.
151    ///
152    /// Order is implementation-defined — callers sort explicitly.
153    fn list_card_locators(
154        &self,
155        pkg_filter: Option<&str>,
156    ) -> Result<Vec<(String, PathBuf)>, String>;
157
158    /// Read raw TOML text from a locator returned by
159    /// [`Self::list_card_locators`]. `Ok(None)` on read failure so
160    /// scans can skip corrupt files without aborting.
161    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;
162
163    // ─── Alias table ───────────────────────────────────────────
164
165    fn read_aliases(&self) -> Result<Vec<Alias>, String>;
166    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;
167
168    // ─── Samples sidecar ───────────────────────────────────────
169
170    /// Check whether a samples sidecar exists for `card_id`.
171    fn samples_exists(&self, card_id: &str) -> Result<bool, String>;
172
173    /// Write a samples sidecar (write-once).
174    ///
175    /// `jsonl_text` is the complete JSONL payload (one JSON line
176    /// per sample, `\n`-terminated). Fails if a sidecar already
177    /// exists. Returns the locator.
178    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;
179
180    /// Read a samples sidecar as raw JSONL text. Returns `None`
181    /// when no sidecar exists (samples are optional).
182    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;
183
184    // ─── Import ────────────────────────────────────────────────
185
186    /// Import Card files from `source_dir` into the store under
187    /// `pkg`. First-writer wins (existing Cards are skipped).
188    /// Returns `(imported, skipped)` card_id lists.
189    fn import_from_dir(
190        &self,
191        source_dir: &Path,
192        pkg: &str,
193    ) -> Result<(Vec<String>, Vec<String>), String>;
194}
195
196/// Return the default backend (File-backed, `~/.algocline/cards/`).
197fn default_store() -> Result<FileCardStore, String> {
198    FileCardStore::from_home()
199}
200
201/// Resolve the cards root directory, creating it if needed.
202fn cards_dir() -> Result<PathBuf, String> {
203    let home = dirs::home_dir().ok_or("Cannot determine home directory")?;
204    let dir = home.join(".algocline").join("cards");
205    if !dir.exists() {
206        fs::create_dir_all(&dir).map_err(|e| format!("Failed to create cards dir: {e}"))?;
207    }
208    Ok(dir)
209}
210
211fn validate_name(name: &str, kind: &str) -> Result<(), String> {
212    if name.is_empty()
213        || name.contains('/')
214        || name.contains('\\')
215        || name.contains("..")
216        || name.contains('\0')
217    {
218        return Err(format!("Invalid {kind} name: '{name}'"));
219    }
220    Ok(())
221}
222
223/// DJB2 hash, hex-encoded. Used for param_fingerprint and card_id hash segment.
224fn djb2_hex(s: &str) -> String {
225    let mut h: u64 = 5381;
226    for b in s.bytes() {
227        h = h.wrapping_mul(33).wrapping_add(b as u64);
228    }
229    format!("{h:016x}")
230}
231
232/// Short-hash: last 6 hex chars of djb2.
233///
234/// DJB2's high bits are dominated by the `5381 * 33^n` term (same for any
235/// input of equal length), so the top hex digits collide easily for same-
236/// length inputs that differ only in a few byte positions. The low bits,
237/// driven by the most-recent bytes, mix well enough for short-hash use.
238fn hash6(s: &str) -> String {
239    let hex = djb2_hex(s);
240    let start = hex.len().saturating_sub(6);
241    hex[start..].to_string()
242}
243
244/// Stable serialization of a JSON value for hashing (sorted keys).
245fn stable_json(v: &Json) -> String {
246    let mut buf = String::new();
247    stable_json_into(v, &mut buf);
248    buf
249}
250fn stable_json_into(v: &Json, buf: &mut String) {
251    match v {
252        Json::Null => buf.push_str("null"),
253        Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
254        Json::Number(n) => buf.push_str(&n.to_string()),
255        Json::String(s) => {
256            buf.push('"');
257            buf.push_str(s);
258            buf.push('"');
259        }
260        Json::Array(a) => {
261            buf.push('[');
262            for (i, item) in a.iter().enumerate() {
263                if i > 0 {
264                    buf.push(',');
265                }
266                stable_json_into(item, buf);
267            }
268            buf.push(']');
269        }
270        Json::Object(m) => {
271            let mut keys: Vec<&String> = m.keys().collect();
272            keys.sort();
273            buf.push('{');
274            for (i, k) in keys.iter().enumerate() {
275                if i > 0 {
276                    buf.push(',');
277                }
278                buf.push('"');
279                buf.push_str(k);
280                buf.push_str("\":");
281                stable_json_into(&m[*k], buf);
282            }
283            buf.push('}');
284        }
285    }
286}
287
288/// Derive a short model id (e.g. "claude-opus-4-6" -> "opus46").
289/// v0: best-effort. Falls back to "model" if input is empty.
290fn short_model(id: &str) -> String {
291    if id.is_empty() {
292        return "model".into();
293    }
294    // Strip common vendor prefixes.
295    let stripped = id
296        .strip_prefix("claude-")
297        .or_else(|| id.strip_prefix("gpt-"))
298        .unwrap_or(id);
299    // Keep alnum only.
300    let s: String = stripped
301        .chars()
302        .filter(|c| c.is_ascii_alphanumeric())
303        .collect();
304    if s.is_empty() {
305        "model".into()
306    } else {
307        s
308    }
309}
310
311/// RFC3339 UTC "YYYY-MM-DDTHH:MM:SSZ" from current system time.
312fn now_rfc3339() -> String {
313    let secs = std::time::SystemTime::now()
314        .duration_since(std::time::UNIX_EPOCH)
315        .map(|d| d.as_secs())
316        .unwrap_or(0) as i64;
317    let days = secs.div_euclid(86400);
318    let tod = secs.rem_euclid(86400);
319    let (y, mo, d) = civil_from_days(days);
320    let hh = tod / 3600;
321    let mm = (tod % 3600) / 60;
322    let ss = tod % 60;
323    format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
324}
325
326/// YYYYMMDDTHHMMSS for current UTC time (compact, sortable).
327///
328/// Used in card_id generation so that:
329///   - rapid successive runs don't collide on the hash6 segment
330///   - string sort of card_id = chronological order
331fn now_compact() -> String {
332    let secs = std::time::SystemTime::now()
333        .duration_since(std::time::UNIX_EPOCH)
334        .map(|d| d.as_secs())
335        .unwrap_or(0) as i64;
336    let days = secs.div_euclid(86400);
337    let tod = secs.rem_euclid(86400);
338    let (y, mo, d) = civil_from_days(days);
339    let hh = tod / 3600;
340    let mm = (tod % 3600) / 60;
341    let ss = tod % 60;
342    format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
343}
344
345/// Howard Hinnant's civil_from_days algorithm.
346fn civil_from_days(z: i64) -> (i32, u32, u32) {
347    let z = z + 719468;
348    let era = if z >= 0 { z } else { z - 146096 } / 146097;
349    let doe = (z - era * 146097) as u64;
350    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
351    let y = yoe as i64 + era * 400;
352    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
353    let mp = (5 * doy + 2) / 153;
354    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
355    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
356    let y = y + if m <= 2 { 1 } else { 0 };
357    (y as i32, m, d)
358}
359
360/// Converter: serde_json::Value -> toml::Value.
361/// Nulls are dropped (TOML has no null). Mixed-type arrays are allowed in TOML 1.0+.
362fn json_to_toml(v: Json) -> Result<toml::Value, String> {
363    Ok(match v {
364        Json::Null => return Err("TOML does not support null values".into()),
365        Json::Bool(b) => toml::Value::Boolean(b),
366        Json::Number(n) => {
367            if let Some(i) = n.as_i64() {
368                toml::Value::Integer(i)
369            } else if let Some(f) = n.as_f64() {
370                toml::Value::Float(f)
371            } else {
372                return Err(format!("Unsupported number: {n}"));
373            }
374        }
375        Json::String(s) => toml::Value::String(s),
376        Json::Array(a) => {
377            let mut out = Vec::with_capacity(a.len());
378            for item in a {
379                if !item.is_null() {
380                    out.push(json_to_toml(item)?);
381                }
382            }
383            toml::Value::Array(out)
384        }
385        Json::Object(m) => {
386            let mut table = toml::map::Map::new();
387            for (k, val) in m {
388                if val.is_null() {
389                    continue;
390                }
391                table.insert(k, json_to_toml(val)?);
392            }
393            toml::Value::Table(table)
394        }
395    })
396}
397
398/// Converter: toml::Value -> serde_json::Value (for alc.card.get()).
399fn toml_to_json(v: toml::Value) -> Json {
400    match v {
401        toml::Value::String(s) => Json::String(s),
402        toml::Value::Integer(i) => json!(i),
403        toml::Value::Float(f) => json!(f),
404        toml::Value::Boolean(b) => Json::Bool(b),
405        toml::Value::Datetime(dt) => Json::String(dt.to_string()),
406        toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
407        toml::Value::Table(t) => {
408            let mut m = serde_json::Map::new();
409            for (k, v) in t {
410                m.insert(k, toml_to_json(v));
411            }
412            Json::Object(m)
413        }
414    }
415}
416
417/// Extract [pkg].name from an input JSON object. REQUIRED.
418fn require_pkg_name(input: &Json) -> Result<String, String> {
419    let name = input
420        .get("pkg")
421        .and_then(|p| p.get("name"))
422        .and_then(|n| n.as_str())
423        .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
424        .to_string();
425    validate_name(&name, "pkg")?;
426    Ok(name)
427}
428
429/// Main create entry. Returns (card_id, absolute_path).
430pub fn create(input: Json) -> Result<(String, PathBuf), String> {
431    create_with_store(&default_store()?, input)
432}
433
434/// Create a new Card backed by `store`. See [`create`] for the default-store variant.
435pub fn create_with_store(
436    store: &dyn CardStore,
437    mut input: Json,
438) -> Result<(String, PathBuf), String> {
439    if !input.is_object() {
440        return Err("alc.card.create: input must be a table".into());
441    }
442    let pkg_name = require_pkg_name(&input)?;
443    let obj = input.as_object_mut().unwrap();
444
445    // ─── Auto-inject REQUIRED fields ──────────────────────────
446    obj.entry("schema_version".to_string())
447        .or_insert_with(|| json!(SCHEMA_VERSION));
448    obj.entry("created_at".to_string())
449        .or_insert_with(|| json!(now_rfc3339()));
450    obj.entry("created_by".to_string())
451        .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
452
453    // ─── param_fingerprint (if [params] present) ──────────────
454    if let Some(params) = obj.get("params").cloned() {
455        if params.is_object() {
456            let fp = djb2_hex(&stable_json(&params));
457            obj.insert("param_fingerprint".to_string(), json!(fp));
458        }
459    }
460
461    // ─── card_id generation (if absent) ───────────────────────
462    let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
463        Some(id) if !id.is_empty() => id.to_string(),
464        _ => {
465            let model_id = obj
466                .get("model")
467                .and_then(|m| m.get("id"))
468                .and_then(|v| v.as_str())
469                .unwrap_or("");
470            let model_short = short_model(model_id);
471            let ts = now_compact();
472            let fp_seed = stable_json(&Json::Object(obj.clone()));
473            let h = hash6(&fp_seed);
474            format!("{pkg_name}_{model_short}_{ts}_{h}")
475        }
476    };
477    validate_name(&card_id, "card_id")?;
478    obj.insert("card_id".to_string(), json!(card_id.clone()));
479
480    let toml_val = json_to_toml(input)?;
481    let text = toml::to_string_pretty(&toml_val)
482        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
483    let path = store.write_new_card(&pkg_name, &card_id, &text)?;
484
485    publish(CardEvent::Created {
486        pkg: pkg_name.clone(),
487        card_id: card_id.clone(),
488        toml_text: text,
489    });
490
491    Ok((card_id, path))
492}
493
494/// Read a Card by id. Returns None if not found.
495pub fn get(card_id: &str) -> Result<Option<Json>, String> {
496    get_with_store(&default_store()?, card_id)
497}
498
499/// Read a Card from `store`. See [`get`] for the default-store variant.
500pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
501    let text = match store.read_card_text(card_id)? {
502        Some(t) => t,
503        None => return Ok(None),
504    };
505    let val: toml::Value =
506        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
507    Ok(Some(toml_to_json(val)))
508}
509
510/// Summary row for `alc.card.list()`.
511#[derive(Debug, Clone)]
512pub struct Summary {
513    pub card_id: String,
514    pub pkg: String,
515    pub created_at: Option<String>,
516    pub model: Option<String>,
517    pub scenario: Option<String>,
518    pub pass_rate: Option<f64>,
519}
520
521impl Summary {
522    fn to_json(&self) -> Json {
523        let mut m = serde_json::Map::new();
524        m.insert("card_id".into(), json!(self.card_id));
525        m.insert("pkg".into(), json!(self.pkg));
526        if let Some(v) = &self.created_at {
527            m.insert("created_at".into(), json!(v));
528        }
529        if let Some(v) = &self.model {
530            m.insert("model".into(), json!(v));
531        }
532        if let Some(v) = &self.scenario {
533            m.insert("scenario".into(), json!(v));
534        }
535        if let Some(v) = self.pass_rate {
536            m.insert("pass_rate".into(), json!(v));
537        }
538        Json::Object(m)
539    }
540}
541
542fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
543    let text = store.read_locator_text(locator).ok().flatten()?;
544    let val: toml::Value = toml::from_str(&text).ok()?;
545    let card_id = val
546        .get("card_id")
547        .and_then(|v| v.as_str())
548        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
549        .to_string();
550    let created_at = val
551        .get("created_at")
552        .and_then(|v| v.as_str())
553        .map(String::from);
554    let model = val
555        .get("model")
556        .and_then(|m| m.get("id"))
557        .and_then(|v| v.as_str())
558        .map(String::from);
559    let scenario = val
560        .get("scenario")
561        .and_then(|s| s.get("name"))
562        .and_then(|v| v.as_str())
563        .map(String::from);
564    let pass_rate = val
565        .get("stats")
566        .and_then(|s| s.get("pass_rate"))
567        .and_then(|v| v.as_float());
568    Some(Summary {
569        card_id,
570        pkg: pkg.to_string(),
571        created_at,
572        model,
573        scenario,
574        pass_rate,
575    })
576}
577
578/// List cards. `pkg_filter = Some("name")` restricts to that pkg subdir.
579pub fn list(pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
580    list_with_store(&default_store()?, pkg_filter)
581}
582
583/// List cards from `store`. See [`list`] for the default-store variant.
584pub fn list_with_store(
585    store: &dyn CardStore,
586    pkg_filter: Option<&str>,
587) -> Result<Vec<Summary>, String> {
588    let locators = store.list_card_locators(pkg_filter)?;
589    let mut out = Vec::with_capacity(locators.len());
590    for (pkg, loc) in &locators {
591        if let Some(s) = summarize(store, loc, pkg) {
592            out.push(s);
593        }
594    }
595
596    // Sort newest first. card_id embeds a compact UTC timestamp so it's
597    // naturally chronological; we still prefer created_at when present
598    // (some callers may override it), falling back to card_id.
599    out.sort_by(|a, b| {
600        b.created_at
601            .cmp(&a.created_at)
602            .then_with(|| b.card_id.cmp(&a.card_id))
603    });
604    Ok(out)
605}
606
607pub fn summaries_to_json(rows: &[Summary]) -> Json {
608    Json::Array(rows.iter().map(|s| s.to_json()).collect())
609}
610
611// ───────────────────────────────────────────────────────────────
612// P1 API: append / alias_{set,list} / find
613// ───────────────────────────────────────────────────────────────
614
615/// Append new top-level fields to an existing Card.
616///
617/// Semantics: **additive only**. If any top-level key in `fields` already
618/// exists in the Card, the call fails — Cards are immutable w.r.t. existing
619/// data. New top-level keys are inserted and the Card file is rewritten
620/// atomically.
621///
622/// Returns the merged Card JSON.
623pub fn append(card_id: &str, fields: Json) -> Result<Json, String> {
624    append_with_store(&default_store()?, card_id, fields)
625}
626
627/// Append to a Card in `store`. See [`append`] for the default-store variant.
628pub fn append_with_store(
629    store: &dyn CardStore,
630    card_id: &str,
631    fields: Json,
632) -> Result<Json, String> {
633    let text = store
634        .read_card_text(card_id)?
635        .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
636    let fields_obj = match fields {
637        Json::Object(m) => m,
638        _ => return Err("alc.card.append: fields must be a table".into()),
639    };
640
641    let existing: toml::Value =
642        toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
643    let mut existing_json = toml_to_json(existing);
644    let existing_obj = existing_json
645        .as_object_mut()
646        .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
647
648    for (k, v) in fields_obj {
649        if existing_obj.contains_key(&k) {
650            return Err(format!(
651                "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
652            ));
653        }
654        if !v.is_null() {
655            existing_obj.insert(k, v);
656        }
657    }
658
659    let toml_val = json_to_toml(existing_json.clone())?;
660    let text = toml::to_string_pretty(&toml_val)
661        .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
662    store.overwrite_card(card_id, &text)?;
663
664    publish(CardEvent::Appended {
665        card_id: card_id.to_string(),
666        toml_text: text,
667    });
668
669    Ok(existing_json)
670}
671
672#[derive(Debug, Clone)]
673pub struct Alias {
674    pub name: String,
675    pub card_id: String,
676    pub pkg: Option<String>,
677    pub set_at: String,
678    pub note: Option<String>,
679}
680
681impl Alias {
682    fn to_json(&self) -> Json {
683        let mut m = serde_json::Map::new();
684        m.insert("name".into(), json!(self.name));
685        m.insert("card_id".into(), json!(self.card_id));
686        if let Some(p) = &self.pkg {
687            m.insert("pkg".into(), json!(p));
688        }
689        m.insert("set_at".into(), json!(self.set_at));
690        if let Some(n) = &self.note {
691            m.insert("note".into(), json!(n));
692        }
693        Json::Object(m)
694    }
695}
696
697/// Bind (or rebind) an alias to a Card.
698///
699/// Validates that `card_id` exists. If an alias with the same `name` already
700/// exists it is overwritten — the alias table is intentionally mutable even
701/// though the Cards themselves are not.
702pub fn alias_set(
703    name: &str,
704    card_id: &str,
705    pkg: Option<&str>,
706    note: Option<&str>,
707) -> Result<Alias, String> {
708    alias_set_with_store(&default_store()?, name, card_id, pkg, note)
709}
710
711/// Bind an alias in `store`. See [`alias_set`] for the default-store variant.
712pub fn alias_set_with_store(
713    store: &dyn CardStore,
714    name: &str,
715    card_id: &str,
716    pkg: Option<&str>,
717    note: Option<&str>,
718) -> Result<Alias, String> {
719    validate_name(name, "alias")?;
720    if store.find_card_locator(card_id)?.is_none() {
721        return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
722    }
723    let mut aliases = store.read_aliases()?;
724    aliases.retain(|a| a.name != name);
725    let entry = Alias {
726        name: name.to_string(),
727        card_id: card_id.to_string(),
728        pkg: pkg.map(String::from),
729        set_at: now_rfc3339(),
730        note: note.map(String::from),
731    };
732    aliases.push(entry.clone());
733    store.write_aliases(&aliases)?;
734
735    // Mirror the full alias table to subscribers as TOML text. The
736    // primary FileCardStore already serialized it internally; we
737    // re-serialize here using the same shape so subscribers stay in
738    // byte-for-byte parity. A serialization failure is best-effort
739    // (log + skip).
740    match serialize_aliases_toml(&aliases) {
741        Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
742        Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
743    }
744
745    Ok(entry)
746}
747
748/// Serialize `aliases` to a TOML document matching the primary
749/// `_aliases.toml` layout. Broken out so that subscribers can receive
750/// the exact byte-for-byte dump.
751fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
752    let mut arr = Vec::with_capacity(aliases.len());
753    for a in aliases {
754        let mut t = toml::map::Map::new();
755        t.insert("name".into(), toml::Value::String(a.name.clone()));
756        t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
757        if let Some(p) = &a.pkg {
758            t.insert("pkg".into(), toml::Value::String(p.clone()));
759        }
760        t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
761        if let Some(n) = &a.note {
762            t.insert("note".into(), toml::Value::String(n.clone()));
763        }
764        arr.push(toml::Value::Table(t));
765    }
766    let mut root = toml::map::Map::new();
767    root.insert("alias".into(), toml::Value::Array(arr));
768    toml::to_string_pretty(&toml::Value::Table(root))
769        .map_err(|e| format!("Failed to serialize aliases: {e}"))
770}
771
772/// Resolve an alias name to its bound Card and return the full Card JSON.
773///
774/// Shortcut for `alias_list → filter → get`. Returns `None` when the alias
775/// does not exist. Errors when the alias points at a missing Card — that
776/// would indicate a corrupt alias table (the target was deleted out of band).
777pub fn get_by_alias(name: &str) -> Result<Option<Json>, String> {
778    get_by_alias_with_store(&default_store()?, name)
779}
780
781/// Resolve an alias in `store`. See [`get_by_alias`] for the default-store variant.
782pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
783    validate_name(name, "alias")?;
784    let aliases = store.read_aliases()?;
785    let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
786        return Ok(None);
787    };
788    match get_with_store(store, &alias.card_id)? {
789        Some(card) => Ok(Some(card)),
790        None => Err(format!(
791            "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
792            alias.card_id
793        )),
794    }
795}
796
797/// List aliases, optionally filtered by pkg.
798pub fn alias_list(pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
799    alias_list_with_store(&default_store()?, pkg_filter)
800}
801
802/// List aliases from `store`. See [`alias_list`] for the default-store variant.
803pub fn alias_list_with_store(
804    store: &dyn CardStore,
805    pkg_filter: Option<&str>,
806) -> Result<Vec<Alias>, String> {
807    let mut aliases = store.read_aliases()?;
808    if let Some(p) = pkg_filter {
809        aliases.retain(|a| a.pkg.as_deref() == Some(p));
810    }
811    Ok(aliases)
812}
813
814pub fn aliases_to_json(rows: &[Alias]) -> Json {
815    Json::Array(rows.iter().map(|a| a.to_json()).collect())
816}
817
818// ═══════════════════════════════════════════════════════════════
819// Where DSL — Prisma/Mongo-style nested predicates
820// ═══════════════════════════════════════════════════════════════
821//
822// See `workspace/tasks/card-dsl/design.md` for the full spec.
823//
824// Syntax (JSON form, as received from Lua / MCP):
825//
826//   where = {
827//     pkg: "cot",                                      // implicit eq
828//     stats: { pass_rate: { gte: 0.8 }, n: { gte: 30 } },
829//     strategy_params: { temperature: { gte: 0.7 } },
830//     prior_card_id: { exists: true },
831//     _or: [ {...}, {...} ],                           // logical ops
832//     _not: { model: { id: "claude-haiku-4-5-20251001" } },
833//   }
834//
835// Semantics:
836//   * Multiple keys in the same object → implicit AND.
837//   * Nested object value → section (path extension).
838//   * Object whose every key is a reserved operator name → leaf operator
839//     object; applies the operators to the value at the current path.
840//   * Scalar/array value → implicit eq.
841//   * Reserved logical keys: `_and` / `_or` / `_not`.
842//   * Reserved operator keys: `eq ne lt lte gt gte in nin exists
843//     contains starts_with`.  Card schemas must not use these names as
844//     field names under any section.
845//
846// Missing-field comparison:
847//   * `eq/lt/lte/gt/gte/in/contains/starts_with` → false on missing
848//   * `ne/nin`                                   → true  on missing
849//   * `exists`                                   → explicit
850//
851
852/// Single comparison operator.
853#[derive(Debug, Clone, PartialEq)]
854pub enum CmpOp {
855    Eq,
856    Ne,
857    Lt,
858    Lte,
859    Gt,
860    Gte,
861    In,
862    Nin,
863    Exists,
864    Contains,
865    StartsWith,
866}
867
868impl CmpOp {
869    fn from_key(k: &str) -> Option<Self> {
870        Some(match k {
871            "eq" => Self::Eq,
872            "ne" => Self::Ne,
873            "lt" => Self::Lt,
874            "lte" => Self::Lte,
875            "gt" => Self::Gt,
876            "gte" => Self::Gte,
877            "in" => Self::In,
878            "nin" => Self::Nin,
879            "exists" => Self::Exists,
880            "contains" => Self::Contains,
881            "starts_with" => Self::StartsWith,
882            _ => return None,
883        })
884    }
885}
886
887/// One parsed comparison: `path` points at a nested field,
888/// `op` + `value` describe how to compare it.
889#[derive(Debug, Clone)]
890pub struct Comparison {
891    pub path: Vec<String>,
892    pub op: CmpOp,
893    pub value: Json,
894}
895
896/// Parsed predicate tree.
897#[derive(Debug, Clone)]
898pub enum Predicate {
899    And(Vec<Predicate>),
900    Or(Vec<Predicate>),
901    Not(Box<Predicate>),
902    Cmp(Comparison),
903}
904
905/// Is `obj` entirely composed of reserved operator keys?
906/// Empty objects return false (meaningless as an operator object).
907fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
908    if obj.is_empty() {
909        return false;
910    }
911    obj.keys().all(|k| CmpOp::from_key(k).is_some())
912}
913
914/// Parse a `where` JSON value into a `Predicate`.
915///
916/// `prefix` is the current nested-key path as we descend through
917/// section objects.
918pub fn parse_where(value: &Json) -> Result<Predicate, String> {
919    parse_predicate(value, &[])
920}
921
922fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
923    let obj = value
924        .as_object()
925        .ok_or_else(|| "where clause must be a table".to_string())?;
926
927    let mut clauses: Vec<Predicate> = Vec::new();
928
929    for (key, val) in obj {
930        match key.as_str() {
931            "_and" => {
932                let arr = val
933                    .as_array()
934                    .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
935                let mut subs = Vec::with_capacity(arr.len());
936                for sub in arr {
937                    subs.push(parse_predicate(sub, prefix)?);
938                }
939                clauses.push(Predicate::And(subs));
940            }
941            "_or" => {
942                let arr = val
943                    .as_array()
944                    .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
945                let mut subs = Vec::with_capacity(arr.len());
946                for sub in arr {
947                    subs.push(parse_predicate(sub, prefix)?);
948                }
949                clauses.push(Predicate::Or(subs));
950            }
951            "_not" => {
952                clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
953            }
954            _ => {
955                // Field key — extend the current path.
956                let mut new_path = prefix.to_vec();
957                new_path.push(key.clone());
958
959                match val {
960                    Json::Object(m) if is_operator_object(m) => {
961                        // Leaf: operator object at this path.
962                        for (op_key, op_val) in m {
963                            let op = CmpOp::from_key(op_key).expect("validated above");
964                            clauses.push(Predicate::Cmp(Comparison {
965                                path: new_path.clone(),
966                                op,
967                                value: op_val.clone(),
968                            }));
969                        }
970                    }
971                    Json::Object(_) => {
972                        // Nested section: recurse with extended path.
973                        clauses.push(parse_predicate(val, &new_path)?);
974                    }
975                    _ => {
976                        // Scalar/array: implicit eq.
977                        clauses.push(Predicate::Cmp(Comparison {
978                            path: new_path,
979                            op: CmpOp::Eq,
980                            value: val.clone(),
981                        }));
982                    }
983                }
984            }
985        }
986    }
987
988    if clauses.len() == 1 {
989        Ok(clauses.remove(0))
990    } else {
991        Ok(Predicate::And(clauses))
992    }
993}
994
995/// Fetch a nested value from a Card JSON by dotted path.
996fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
997    let mut node = card;
998    for key in path {
999        let obj = node.as_object()?;
1000        node = obj.get(key)?;
1001    }
1002    Some(node)
1003}
1004
1005/// Compare two JSON scalars with a numeric/string/bool comparator.
1006/// Returns None when the types aren't comparable.
1007fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
1008    match (a, b) {
1009        (Json::Number(x), Json::Number(y)) => {
1010            let xf = x.as_f64()?;
1011            let yf = y.as_f64()?;
1012            xf.partial_cmp(&yf)
1013        }
1014        (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
1015        (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
1016        _ => None,
1017    }
1018}
1019
1020fn json_eq(a: &Json, b: &Json) -> bool {
1021    match (a, b) {
1022        (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
1023            (Some(xf), Some(yf)) => xf == yf,
1024            _ => a == b,
1025        },
1026        _ => a == b,
1027    }
1028}
1029
1030fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
1031    let actual = fetch_path(card, &cmp.path);
1032    let exists = actual.is_some();
1033
1034    match cmp.op {
1035        CmpOp::Exists => {
1036            let want = cmp.value.as_bool().unwrap_or(true);
1037            exists == want
1038        }
1039        CmpOp::Ne => match actual {
1040            None => true,
1041            Some(v) => !json_eq(v, &cmp.value),
1042        },
1043        CmpOp::Nin => match actual {
1044            None => true,
1045            Some(v) => match cmp.value.as_array() {
1046                Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
1047                None => false,
1048            },
1049        },
1050        CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
1051        CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
1052            Some(arr) => arr.iter().any(|e| json_eq(e, v)),
1053            None => false,
1054        }),
1055        CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
1056            let Some(v) = actual else { return false };
1057            let Some(ord) = json_cmp(v, &cmp.value) else {
1058                return false;
1059            };
1060            use std::cmp::Ordering::{Equal, Greater, Less};
1061            matches!(
1062                (&cmp.op, ord),
1063                (CmpOp::Lt, Less)
1064                    | (CmpOp::Lte, Less | Equal)
1065                    | (CmpOp::Gt, Greater)
1066                    | (CmpOp::Gte, Greater | Equal)
1067            )
1068        }
1069        CmpOp::Contains => {
1070            let Some(Json::String(haystack)) = actual else {
1071                return false;
1072            };
1073            let Some(needle) = cmp.value.as_str() else {
1074                return false;
1075            };
1076            haystack.contains(needle)
1077        }
1078        CmpOp::StartsWith => {
1079            let Some(Json::String(haystack)) = actual else {
1080                return false;
1081            };
1082            let Some(needle) = cmp.value.as_str() else {
1083                return false;
1084            };
1085            haystack.starts_with(needle)
1086        }
1087    }
1088}
1089
1090/// Evaluate a predicate tree against a full Card JSON.
1091pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1092    match pred {
1093        Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1094        Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1095        Predicate::Not(sub) => !eval_predicate(sub, card),
1096        Predicate::Cmp(c) => eval_cmp(c, card),
1097    }
1098}
1099
1100// ───────────────────────────────────────────────────────────────
1101// Order-by
1102// ───────────────────────────────────────────────────────────────
1103
1104/// Parsed sort key: path with optional descending flag.
1105#[derive(Debug, Clone)]
1106pub struct OrderKey {
1107    pub path: Vec<String>,
1108    pub desc: bool,
1109}
1110
1111impl OrderKey {
1112    fn parse(raw: &str) -> Result<Self, String> {
1113        if raw.is_empty() {
1114            return Err("order_by key must not be empty".into());
1115        }
1116        let (desc, rest) = if let Some(r) = raw.strip_prefix('-') {
1117            (true, r)
1118        } else {
1119            (false, raw)
1120        };
1121        let path: Vec<String> = rest.split('.').map(|s| s.to_string()).collect();
1122        if path.iter().any(|p| p.is_empty()) {
1123            return Err(format!("invalid order_by key: '{raw}'"));
1124        }
1125        Ok(Self { path, desc })
1126    }
1127}
1128
1129/// Parse an order_by JSON value.  Accepts:
1130///   - a string: `"stats.pass_rate"` or `"-stats.pass_rate"`
1131///   - an array of strings: `["-stats.pass_rate", "created_at"]`
1132pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1133    match value {
1134        Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1135        Json::Array(arr) => {
1136            let mut out = Vec::with_capacity(arr.len());
1137            for v in arr {
1138                let s = v
1139                    .as_str()
1140                    .ok_or_else(|| "order_by array must contain strings".to_string())?;
1141                out.push(OrderKey::parse(s)?);
1142            }
1143            Ok(out)
1144        }
1145        _ => Err("order_by must be a string or array of strings".into()),
1146    }
1147}
1148
1149/// Query parameters for `find`.
1150#[derive(Debug, Default, Clone)]
1151pub struct FindQuery {
1152    /// Restrict scan to a single pkg subdir (I/O optimization).
1153    pub pkg: Option<String>,
1154    /// Prisma-style predicate tree.
1155    pub where_: Option<Predicate>,
1156    /// Sort keys (dotted paths, optional `-` prefix for desc).
1157    pub order_by: Vec<OrderKey>,
1158    pub limit: Option<usize>,
1159    pub offset: Option<usize>,
1160}
1161
1162/// A loaded Card row flowing through the find() pipeline.
1163///
1164/// `full` is the whole Card JSON (used by `where` evaluation and
1165/// `order_by` dotted-path lookup); `summary` is the projection
1166/// returned to callers.
1167#[derive(Debug, Clone)]
1168struct CardRow {
1169    full: Json,
1170    summary: Summary,
1171}
1172
1173/// Load a single Card file into a `CardRow`.
1174fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1175    let text = store.read_locator_text(locator).ok().flatten()?;
1176    let val: toml::Value = toml::from_str(&text).ok()?;
1177    let json = toml_to_json(val);
1178
1179    let card_id = json
1180        .get("card_id")
1181        .and_then(|v| v.as_str())
1182        .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1183        .to_string();
1184    let created_at = json
1185        .get("created_at")
1186        .and_then(|v| v.as_str())
1187        .map(String::from);
1188    let model = json
1189        .get("model")
1190        .and_then(|m| m.get("id"))
1191        .and_then(|v| v.as_str())
1192        .map(String::from);
1193    let scenario = json
1194        .get("scenario")
1195        .and_then(|s| s.get("name"))
1196        .and_then(|v| v.as_str())
1197        .map(String::from);
1198    let pass_rate = json
1199        .get("stats")
1200        .and_then(|s| s.get("pass_rate"))
1201        .and_then(|v| v.as_f64());
1202
1203    Some(CardRow {
1204        full: json,
1205        summary: Summary {
1206            card_id,
1207            pkg: pkg.to_string(),
1208            created_at,
1209            model,
1210            scenario,
1211            pass_rate,
1212        },
1213    })
1214}
1215
1216/// Compare two Card rows according to an ordered list of sort keys.
1217fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1218    use std::cmp::Ordering;
1219    for k in keys {
1220        let va = fetch_path(&a.full, &k.path);
1221        let vb = fetch_path(&b.full, &k.path);
1222        let ord = match (va, vb) {
1223            (None, None) => Ordering::Equal,
1224            (None, Some(_)) => Ordering::Greater, // missing sorts last
1225            (Some(_), None) => Ordering::Less,
1226            (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1227        };
1228        let ord = if k.desc { ord.reverse() } else { ord };
1229        if ord != Ordering::Equal {
1230            return ord;
1231        }
1232    }
1233    Ordering::Equal
1234}
1235
1236/// Summary-only fields that can be sorted without loading full TOML.
1237const SUMMARY_SORT_FIELDS: &[&str] = &[
1238    "card_id",
1239    "created_at",
1240    "stats.pass_rate",
1241    "scenario.name",
1242    "model.id",
1243];
1244
1245/// Return true when the query can be answered with lightweight Summary
1246/// rows (no full-TOML load needed).
1247fn is_lightweight_query(q: &FindQuery) -> bool {
1248    q.where_.is_none()
1249        && q.order_by
1250            .iter()
1251            .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1252}
1253
1254/// Sort Summary rows using order_by keys that map to Summary fields.
1255fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1256    use std::cmp::Ordering;
1257    for k in keys {
1258        let key_str = k.path.join(".");
1259        let ord = match key_str.as_str() {
1260            "card_id" => a.card_id.cmp(&b.card_id),
1261            "created_at" => a.created_at.cmp(&b.created_at),
1262            "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1263                (None, None) => Ordering::Equal,
1264                (None, Some(_)) => Ordering::Greater,
1265                (Some(_), None) => Ordering::Less,
1266                (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1267            },
1268            "scenario.name" => a.scenario.cmp(&b.scenario),
1269            "model.id" => a.model.cmp(&b.model),
1270            _ => Ordering::Equal,
1271        };
1272        let ord = if k.desc { ord.reverse() } else { ord };
1273        if ord != Ordering::Equal {
1274            return ord;
1275        }
1276    }
1277    Ordering::Equal
1278}
1279
1280/// Filter/sort Cards across the store using the `where` DSL.
1281///
1282/// When no `where` clause is specified and `order_by` only references
1283/// summary-level fields, uses the lightweight `list()` path to avoid
1284/// loading full TOML.  Otherwise loads full TOML per Card.
1285pub fn find(q: FindQuery) -> Result<Vec<Summary>, String> {
1286    find_with_store(&default_store()?, q)
1287}
1288
1289/// Filter/sort Cards from `store`. See [`find`] for the default-store variant.
1290pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1291    // Fast path: lightweight query, no full-TOML load needed.
1292    if is_lightweight_query(&q) {
1293        let mut rows = list_with_store(store, q.pkg.as_deref())?;
1294        if q.order_by.is_empty() {
1295            rows.sort_by(|a, b| {
1296                b.created_at
1297                    .cmp(&a.created_at)
1298                    .then_with(|| b.card_id.cmp(&a.card_id))
1299            });
1300        } else {
1301            rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1302        }
1303        let out: Vec<Summary> = rows
1304            .into_iter()
1305            .skip(q.offset.unwrap_or(0))
1306            .take(q.limit.unwrap_or(usize::MAX))
1307            .collect();
1308        return Ok(out);
1309    }
1310
1311    // Full path: load entire TOML for where evaluation / arbitrary order_by.
1312    let all_rows = scan_cards(store, q.pkg.as_deref())?;
1313
1314    // Filter by where.
1315    let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1316        all_rows
1317            .into_iter()
1318            .filter(|row| eval_predicate(pred, &row.full))
1319            .collect()
1320    } else {
1321        all_rows
1322    };
1323
1324    // Sort.
1325    if q.order_by.is_empty() {
1326        rows.sort_by(|a, b| {
1327            b.summary
1328                .created_at
1329                .cmp(&a.summary.created_at)
1330                .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1331        });
1332    } else {
1333        rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1334    }
1335
1336    // Offset + limit.
1337    let out: Vec<Summary> = rows
1338        .into_iter()
1339        .skip(q.offset.unwrap_or(0))
1340        .take(q.limit.unwrap_or(usize::MAX))
1341        .map(|r| r.summary)
1342        .collect();
1343
1344    Ok(out)
1345}
1346
1347// ───────────────────────────────────────────────────────────────
1348// Lineage walker
1349// ───────────────────────────────────────────────────────────────
1350//
1351// Cards form a tree (typically, not strictly a DAG) via the
1352// `metadata.prior_card_id` convention. `lineage()` walks that tree
1353// either up (toward ancestors) or down (toward descendants) or both,
1354// up to a configurable depth, optionally filtered by `prior_relation`.
1355//
1356// Up-walk is O(depth) — each step reads one parent Card.
1357// Down-walk is O(N_cards × depth) — we scan the whole store at each
1358// level. For the current scale (hundreds to low thousands of cards)
1359// this is fine; if the store grows we can build a prior_card_id index.
1360
1361/// Walk direction for `lineage`.
1362#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1363pub enum LineageDirection {
1364    #[default]
1365    Up,
1366    Down,
1367    Both,
1368}
1369
1370impl LineageDirection {
1371    pub fn parse(s: &str) -> Result<Self, String> {
1372        match s {
1373            "up" => Ok(Self::Up),
1374            "down" => Ok(Self::Down),
1375            "both" => Ok(Self::Both),
1376            other => Err(format!(
1377                "direction must be 'up', 'down', or 'both' (got '{other}')"
1378            )),
1379        }
1380    }
1381}
1382
1383/// Query parameters for `lineage`.
1384#[derive(Debug, Clone, Default)]
1385pub struct LineageQuery {
1386    pub card_id: String,
1387    pub direction: LineageDirection,
1388    /// Max traversal depth. Default 10.
1389    pub depth: Option<usize>,
1390    /// Include a per-node `stats` field (full [stats] section).
1391    pub include_stats: bool,
1392    /// If set, only edges whose `prior_relation` is in this list are
1393    /// followed.  The root is always included regardless.
1394    pub relation_filter: Option<Vec<String>>,
1395}
1396
1397/// One node in the lineage result.
1398///
1399/// `depth` is the signed distance from the root: negative for
1400/// ancestors (up-walk), 0 for the root, positive for descendants.
1401#[derive(Debug, Clone)]
1402pub struct LineageNode {
1403    pub card_id: String,
1404    pub pkg: String,
1405    pub prior_card_id: Option<String>,
1406    pub prior_relation: Option<String>,
1407    pub depth: i32,
1408    pub stats: Option<Json>,
1409}
1410
1411/// One edge in the lineage result (child → parent, always).
1412#[derive(Debug, Clone)]
1413pub struct LineageEdge {
1414    pub from: String,
1415    pub to: String,
1416    pub relation: Option<String>,
1417}
1418
1419/// Full lineage walk result.
1420#[derive(Debug, Clone)]
1421pub struct LineageResult {
1422    pub root: String,
1423    pub nodes: Vec<LineageNode>,
1424    pub edges: Vec<LineageEdge>,
1425    pub truncated: bool,
1426}
1427
1428const DEFAULT_LINEAGE_DEPTH: usize = 10;
1429
1430/// Extract the lineage fields from a Card JSON.
1431/// Returns (prior_card_id, prior_relation).
1432fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1433    let meta = card.get("metadata");
1434    let prior_card_id = meta
1435        .and_then(|m| m.get("prior_card_id"))
1436        .and_then(|v| v.as_str())
1437        .map(String::from);
1438    let prior_relation = meta
1439        .and_then(|m| m.get("prior_relation"))
1440        .and_then(|v| v.as_str())
1441        .map(String::from);
1442    (prior_card_id, prior_relation)
1443}
1444
1445/// Build a LineageNode from a loaded CardRow at a given depth.
1446fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1447    let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1448    let stats = if include_stats {
1449        row.full.get("stats").cloned()
1450    } else {
1451        None
1452    };
1453    LineageNode {
1454        card_id: row.summary.card_id.clone(),
1455        pkg: row.summary.pkg.clone(),
1456        prior_card_id,
1457        prior_relation,
1458        depth,
1459        stats,
1460    }
1461}
1462
1463/// Check whether `relation` passes the relation_filter (None means no
1464/// filter, which always passes).
1465fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
1466    match filter {
1467        None => true,
1468        Some(allowed) => match relation {
1469            Some(r) => allowed.iter().any(|a| a == r),
1470            None => false,
1471        },
1472    }
1473}
1474
1475/// Full in-memory card index with forward and reverse lineage maps.
1476struct CardIndex {
1477    /// card_id → CardRow
1478    cards: std::collections::HashMap<String, CardRow>,
1479    /// parent card_id → Vec<child card_id> (reverse lineage index)
1480    children: std::collections::HashMap<String, Vec<String>>,
1481}
1482
1483/// Load all Cards in the store once, keyed by card_id.
1484/// Also builds a reverse index (parent → children) so that
1485/// `walk_down` is O(result_size) instead of O(N_cards × depth).
1486fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1487    let rows = scan_cards(store, None)?;
1488
1489    let mut cards = std::collections::HashMap::with_capacity(rows.len());
1490    let mut children: std::collections::HashMap<String, Vec<String>> =
1491        std::collections::HashMap::new();
1492
1493    for row in rows {
1494        let id = row.summary.card_id.clone();
1495        let (prior_id, _) = lineage_fields(&row.full);
1496        if let Some(parent) = prior_id {
1497            children.entry(parent).or_default().push(id.clone());
1498        }
1499        cards.insert(id, row);
1500    }
1501    Ok(CardIndex { cards, children })
1502}
1503
1504/// Scan all Cards in the store, loading full TOML for each. When
1505/// `pkg_filter` is provided, only that pkg subdir is scanned. Shared
1506/// between `find` and `load_card_index`.
1507fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1508    let locators = store.list_card_locators(pkg_filter)?;
1509    let mut rows = Vec::with_capacity(locators.len());
1510    for (pkg, loc) in &locators {
1511        if let Some(row) = load_full(store, loc, pkg) {
1512            rows.push(row);
1513        }
1514    }
1515    Ok(rows)
1516}
1517
1518/// Invariant context passed through the lineage walkers.
1519struct LineageCtx<'a> {
1520    index: &'a CardIndex,
1521    relation_filter: &'a Option<Vec<String>>,
1522    include_stats: bool,
1523    max_depth: usize,
1524}
1525
1526/// Mutable accumulator for one lineage walk.
1527struct LineageAccum {
1528    nodes: Vec<LineageNode>,
1529    edges: Vec<LineageEdge>,
1530    visited: std::collections::HashSet<String>,
1531    truncated: bool,
1532}
1533
1534/// Walk ancestors via `metadata.prior_card_id`.
1535fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1536    let mut cur = start_id.to_string();
1537    for step in 1..=ctx.max_depth {
1538        let Some(row) = ctx.index.cards.get(&cur) else {
1539            return;
1540        };
1541        let (prior_id, prior_rel) = lineage_fields(&row.full);
1542        let Some(prior_id) = prior_id else {
1543            return;
1544        };
1545        if !relation_passes(ctx.relation_filter, &prior_rel) {
1546            return;
1547        }
1548        if acc.visited.contains(&prior_id) {
1549            return;
1550        }
1551        let Some(parent) = ctx.index.cards.get(&prior_id) else {
1552            return;
1553        };
1554        acc.nodes
1555            .push(make_node(parent, -(step as i32), ctx.include_stats));
1556        acc.edges.push(LineageEdge {
1557            from: row.summary.card_id.clone(),
1558            to: parent.summary.card_id.clone(),
1559            relation: prior_rel,
1560        });
1561        acc.visited.insert(prior_id.clone());
1562        cur = prior_id;
1563    }
1564    // Depth exhausted but another unwalked parent exists → truncated.
1565    if let Some(row) = ctx.index.cards.get(&cur) {
1566        let (prior_id, _) = lineage_fields(&row.full);
1567        if prior_id
1568            .as_ref()
1569            .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1570        {
1571            acc.truncated = true;
1572        }
1573    }
1574}
1575
1576/// Walk descendants using the reverse index (parent → children),
1577/// breadth-first.  O(result_size) instead of O(N_cards × depth).
1578fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1579    let mut frontier: Vec<String> = vec![start_id.to_string()];
1580
1581    for depth in 1..=ctx.max_depth {
1582        let mut next_frontier: Vec<String> = Vec::new();
1583        for parent_id in &frontier {
1584            let children = match ctx.index.children.get(parent_id) {
1585                Some(c) => c,
1586                None => continue,
1587            };
1588            for child_id in children {
1589                if acc.visited.contains(child_id) {
1590                    continue;
1591                }
1592                let Some(child) = ctx.index.cards.get(child_id) else {
1593                    continue;
1594                };
1595                let (_, prior_rel) = lineage_fields(&child.full);
1596                if !relation_passes(ctx.relation_filter, &prior_rel) {
1597                    continue;
1598                }
1599                acc.nodes
1600                    .push(make_node(child, depth as i32, ctx.include_stats));
1601                acc.edges.push(LineageEdge {
1602                    from: child.summary.card_id.clone(),
1603                    to: parent_id.clone(),
1604                    relation: prior_rel,
1605                });
1606                acc.visited.insert(child_id.clone());
1607                next_frontier.push(child_id.clone());
1608            }
1609        }
1610        if next_frontier.is_empty() {
1611            return;
1612        }
1613        frontier = next_frontier;
1614    }
1615    // Frontier still has nodes but depth is exhausted: check for
1616    // unwalked children at the next level.
1617    for parent_id in &frontier {
1618        let children = match ctx.index.children.get(parent_id) {
1619            Some(c) => c,
1620            None => continue,
1621        };
1622        for child_id in children {
1623            if acc.visited.contains(child_id) {
1624                continue;
1625            }
1626            let Some(child) = ctx.index.cards.get(child_id) else {
1627                continue;
1628            };
1629            let (_, prior_rel) = lineage_fields(&child.full);
1630            if relation_passes(ctx.relation_filter, &prior_rel) {
1631                acc.truncated = true;
1632                return;
1633            }
1634        }
1635    }
1636}
1637
1638/// Walk the lineage tree from `q.card_id`.
1639pub fn lineage(q: LineageQuery) -> Result<Option<LineageResult>, String> {
1640    lineage_with_store(&default_store()?, q)
1641}
1642
1643/// Walk the lineage tree in `store`. See [`lineage`] for the default-store variant.
1644pub fn lineage_with_store(
1645    store: &dyn CardStore,
1646    q: LineageQuery,
1647) -> Result<Option<LineageResult>, String> {
1648    let index = load_card_index(store)?;
1649    let Some(root_row) = index.cards.get(&q.card_id) else {
1650        return Ok(None);
1651    };
1652
1653    let ctx = LineageCtx {
1654        index: &index,
1655        relation_filter: &q.relation_filter,
1656        include_stats: q.include_stats,
1657        max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1658    };
1659    let mut acc = LineageAccum {
1660        nodes: Vec::new(),
1661        edges: Vec::new(),
1662        visited: std::collections::HashSet::new(),
1663        truncated: false,
1664    };
1665
1666    acc.nodes.push(make_node(root_row, 0, q.include_stats));
1667    acc.visited.insert(q.card_id.clone());
1668
1669    if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1670        walk_up(&q.card_id, &ctx, &mut acc);
1671    }
1672    if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1673        walk_down(&q.card_id, &ctx, &mut acc);
1674    }
1675
1676    Ok(Some(LineageResult {
1677        root: q.card_id,
1678        nodes: acc.nodes,
1679        edges: acc.edges,
1680        truncated: acc.truncated,
1681    }))
1682}
1683
1684/// Render a LineageResult as JSON for the service layer.
1685pub fn lineage_to_json(r: &LineageResult) -> Json {
1686    let nodes: Vec<Json> = r
1687        .nodes
1688        .iter()
1689        .map(|n| {
1690            let mut m = serde_json::Map::new();
1691            m.insert("card_id".into(), json!(n.card_id));
1692            m.insert("pkg".into(), json!(n.pkg));
1693            m.insert("depth".into(), json!(n.depth));
1694            if let Some(p) = &n.prior_card_id {
1695                m.insert("prior_card_id".into(), json!(p));
1696            }
1697            if let Some(rel) = &n.prior_relation {
1698                m.insert("prior_relation".into(), json!(rel));
1699            }
1700            if let Some(s) = &n.stats {
1701                m.insert("stats".into(), s.clone());
1702            }
1703            Json::Object(m)
1704        })
1705        .collect();
1706    let edges: Vec<Json> = r
1707        .edges
1708        .iter()
1709        .map(|e| {
1710            let mut m = serde_json::Map::new();
1711            m.insert("from".into(), json!(e.from));
1712            m.insert("to".into(), json!(e.to));
1713            if let Some(rel) = &e.relation {
1714                m.insert("relation".into(), json!(rel));
1715            }
1716            Json::Object(m)
1717        })
1718        .collect();
1719    json!({
1720        "root": r.root,
1721        "nodes": nodes,
1722        "edges": edges,
1723        "truncated": r.truncated,
1724    })
1725}
1726
1727// ───────────────────────────────────────────────────────────────
1728// Samples sidecar: per-case detail written alongside a Card as
1729// `{pkg}/{card_id}.samples.jsonl`. Write-once to preserve Card
1730// immutability: once a Card has a samples file, it cannot be
1731// rewritten — mismatched per-case data would break auditability.
1732// ───────────────────────────────────────────────────────────────
1733
1734// ───────────────────────────────────────────────────────────────
1735// Card import: copy Card files from an external directory into the
1736// local cards store. Used by `alc_card_install` (Card Collections)
1737// and by `alc_pkg_install` (Pkg-bundled cards/).
1738// ───────────────────────────────────────────────────────────────
1739
1740/// Import Card files from `source_dir` into `~/.algocline/cards/{pkg}/`.
1741///
1742/// Copies `*.toml` and `*.samples.jsonl` files. Existing cards with the
1743/// same id are skipped (first-writer wins — Card immutability).
1744///
1745/// Returns `(imported, skipped)` card_id lists.
1746pub fn import_from_dir(
1747    source_dir: &std::path::Path,
1748    pkg: &str,
1749) -> Result<(Vec<String>, Vec<String>), String> {
1750    import_from_dir_with_store(&default_store()?, source_dir, pkg)
1751}
1752
1753/// Import Card files into `store`. See [`import_from_dir`] for the default-store variant.
1754pub fn import_from_dir_with_store(
1755    store: &dyn CardStore,
1756    source_dir: &std::path::Path,
1757    pkg: &str,
1758) -> Result<(Vec<String>, Vec<String>), String> {
1759    let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1760    for card_id in &imported {
1761        match store.read_card_text(card_id) {
1762            Ok(Some(toml_text)) => publish(CardEvent::Created {
1763                pkg: pkg.to_string(),
1764                card_id: card_id.clone(),
1765                toml_text,
1766            }),
1767            Ok(None) => {
1768                tracing::warn!(
1769                    card_id = %card_id,
1770                    "import_from_dir: read_card_text returned None after import; skipping publish"
1771                );
1772            }
1773            Err(e) => {
1774                tracing::warn!(
1775                    card_id = %card_id,
1776                    error = %e,
1777                    "import_from_dir: read_card_text failed after import; skipping publish"
1778                );
1779            }
1780        }
1781        // Samples are optional — best-effort.
1782        match store.read_samples_text(card_id) {
1783            Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1784                card_id: card_id.clone(),
1785                jsonl_text,
1786            }),
1787            Ok(None) => {}
1788            Err(e) => {
1789                tracing::warn!(
1790                    card_id = %card_id,
1791                    error = %e,
1792                    "import_from_dir: read_samples_text failed after import; skipping publish"
1793                );
1794            }
1795        }
1796    }
1797    Ok((imported, skipped))
1798}
1799
1800/// Write per-case samples to `{card_id}.samples.jsonl` (write-once).
1801///
1802/// Each `samples` entry is serialized as one compact JSON line.
1803/// Fails if a samples file already exists for this card — mirrors
1804/// the immutability guarantee of Cards themselves.
1805pub fn write_samples(card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1806    write_samples_with_store(&default_store()?, card_id, samples)
1807}
1808
1809/// Write samples via `store`. See [`write_samples`] for the default-store variant.
1810pub fn write_samples_with_store(
1811    store: &dyn CardStore,
1812    card_id: &str,
1813    samples: Vec<Json>,
1814) -> Result<PathBuf, String> {
1815    if store.samples_exists(card_id)? {
1816        return Err(format!(
1817            "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1818        ));
1819    }
1820    let mut buf = String::new();
1821    for (idx, s) in samples.iter().enumerate() {
1822        let line = serde_json::to_string(s).map_err(|e| {
1823            format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1824        })?;
1825        buf.push_str(&line);
1826        buf.push('\n');
1827    }
1828    let path = store.write_samples_text(card_id, &buf)?;
1829
1830    publish(CardEvent::SamplesWritten {
1831        card_id: card_id.to_string(),
1832        jsonl_text: buf,
1833    });
1834
1835    Ok(path)
1836}
1837
1838/// Query parameters for `read_samples`.
1839#[derive(Debug, Default, Clone)]
1840pub struct SamplesQuery {
1841    /// Skip this many matched rows (after `where` filtering).
1842    pub offset: usize,
1843    /// Max matched rows to return.
1844    pub limit: Option<usize>,
1845    /// Optional `where` predicate applied to each sample row.
1846    /// The row JSON is the full line object (no section wrapping).
1847    pub where_: Option<Predicate>,
1848}
1849
1850/// Read per-case samples from `{card_id}.samples.jsonl`.
1851///
1852/// Streams the JSONL file line by line; rows are parsed, optionally
1853/// filtered by `q.where_`, then paged by `offset` + `limit`.  Offset
1854/// applies to the **post-filter** stream, matching Prisma/SQL
1855/// semantics.
1856///
1857/// Returns an empty Vec if no samples file exists (Cards without
1858/// per-case details are the common case, not an error).
1859pub fn read_samples(card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1860    read_samples_with_store(&default_store()?, card_id, q)
1861}
1862
1863/// Read samples from `store`. See [`read_samples`] for the default-store variant.
1864pub fn read_samples_with_store(
1865    store: &dyn CardStore,
1866    card_id: &str,
1867    q: SamplesQuery,
1868) -> Result<Vec<Json>, String> {
1869    let text = match store.read_samples_text(card_id)? {
1870        Some(t) => t,
1871        None => return Ok(Vec::new()),
1872    };
1873    let mut matched: usize = 0;
1874    let mut out = Vec::new();
1875    for (i, line) in text.lines().enumerate() {
1876        if line.trim().is_empty() {
1877            continue;
1878        }
1879        let val: Json = serde_json::from_str(line)
1880            .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1881        if let Some(pred) = &q.where_ {
1882            if !eval_predicate(pred, &val) {
1883                continue;
1884            }
1885        }
1886        if matched < q.offset {
1887            matched += 1;
1888            continue;
1889        }
1890        if let Some(lim) = q.limit {
1891            if out.len() >= lim {
1892                break;
1893            }
1894        }
1895        matched += 1;
1896        out.push(val);
1897    }
1898    Ok(out)
1899}
1900
1901// ═══════════════════════════════════════════════════════════════
1902// FileCardStore — default backend.
1903// ═══════════════════════════════════════════════════════════════
1904//
1905// Stores Cards as TOML files under `{root}/{pkg}/{card_id}.toml`,
1906// samples as `{root}/{pkg}/{card_id}.samples.jsonl`, and the alias
1907// table as `{root}/_aliases.toml`.
1908//
1909// `root` defaults to `~/.algocline/cards/` via `from_home()`. Tests
1910// may use `new(tmpdir)` to redirect storage to a scratch directory.
1911
1912/// File-backed implementation of [`CardStore`].
1913pub struct FileCardStore {
1914    root: PathBuf,
1915}
1916
1917impl FileCardStore {
1918    /// Construct a store rooted at an explicit path.
1919    pub fn new(root: PathBuf) -> Self {
1920        Self { root }
1921    }
1922
1923    /// Construct the default store rooted at `~/.algocline/cards/`.
1924    pub fn from_home() -> Result<Self, String> {
1925        Ok(Self { root: cards_dir()? })
1926    }
1927
1928    /// Returns the absolute path to the per-pkg subdirectory,
1929    /// creating it when missing. Validates `pkg` to prevent path
1930    /// traversal.
1931    fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1932        validate_name(pkg, "pkg")?;
1933        let dir = self.root.join(pkg);
1934        if !dir.exists() {
1935            fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1936        }
1937        Ok(dir)
1938    }
1939
1940    /// Path of the global alias table.
1941    fn aliases_path(&self) -> PathBuf {
1942        self.root.join("_aliases.toml")
1943    }
1944
1945    /// Path of the samples sidecar for `card_id`. Errors if the
1946    /// Card itself does not exist — samples without a parent Card
1947    /// are meaningless and we refuse to create orphans.
1948    fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1949        let card_path = self
1950            .find_card_locator(card_id)?
1951            .ok_or_else(|| format!("card '{card_id}' not found"))?;
1952        let dir = card_path
1953            .parent()
1954            .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1955        Ok(dir.join(format!("{card_id}.samples.jsonl")))
1956    }
1957}
1958
1959impl CardStore for FileCardStore {
1960    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1961        let dir = self.pkg_dir(pkg)?;
1962        let path = dir.join(format!("{card_id}.toml"));
1963        if path.exists() {
1964            return Err(format!(
1965                "alc.card.create: card '{card_id}' already exists (immutable)"
1966            ));
1967        }
1968        let tmp = path.with_extension("toml.tmp");
1969        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1970        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1971        Ok(path)
1972    }
1973
1974    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1975        let path = self
1976            .find_card_locator(card_id)?
1977            .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1978        let tmp = path.with_extension("toml.tmp");
1979        fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1980        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1981        Ok(path)
1982    }
1983
1984    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1985        validate_name(card_id, "card_id")?;
1986        if !self.root.exists() {
1987            return Ok(None);
1988        }
1989        let entries =
1990            fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1991        for entry in entries.flatten() {
1992            let p = entry.path();
1993            if p.is_dir() {
1994                let candidate = p.join(format!("{card_id}.toml"));
1995                if candidate.exists() {
1996                    return Ok(Some(candidate));
1997                }
1998            }
1999        }
2000        Ok(None)
2001    }
2002
2003    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
2004        let Some(path) = self.find_card_locator(card_id)? else {
2005            return Ok(None);
2006        };
2007        let text = fs::read_to_string(&path)
2008            .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
2009        Ok(Some(text))
2010    }
2011
2012    fn list_card_locators(
2013        &self,
2014        pkg_filter: Option<&str>,
2015    ) -> Result<Vec<(String, PathBuf)>, String> {
2016        if !self.root.exists() {
2017            return Ok(Vec::new());
2018        }
2019        let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2020            validate_name(p, "pkg")?;
2021            let d = self.root.join(p);
2022            if d.is_dir() {
2023                vec![d]
2024            } else {
2025                return Ok(Vec::new());
2026            }
2027        } else {
2028            fs::read_dir(&self.root)
2029                .map_err(|e| format!("Failed to read cards dir: {e}"))?
2030                .flatten()
2031                .map(|e| e.path())
2032                .filter(|p| p.is_dir())
2033                .collect()
2034        };
2035
2036        let mut out = Vec::new();
2037        for pdir in pkg_dirs {
2038            let pkg = pdir
2039                .file_name()
2040                .and_then(|s| s.to_str())
2041                .unwrap_or("")
2042                .to_string();
2043            let entries = match fs::read_dir(&pdir) {
2044                Ok(e) => e,
2045                Err(_) => continue,
2046            };
2047            for entry in entries.flatten() {
2048                let p = entry.path();
2049                if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2050                    continue;
2051                }
2052                out.push((pkg.clone(), p));
2053            }
2054        }
2055        Ok(out)
2056    }
2057
2058    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2059        match fs::read_to_string(locator) {
2060            Ok(text) => Ok(Some(text)),
2061            Err(_) => Ok(None),
2062        }
2063    }
2064
2065    fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2066        let path = self.aliases_path();
2067        if !path.exists() {
2068            return Ok(Vec::new());
2069        }
2070        let text =
2071            fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2072        let val: toml::Value =
2073            toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2074        let arr = val
2075            .get("alias")
2076            .and_then(|v| v.as_array())
2077            .cloned()
2078            .unwrap_or_default();
2079        let mut out = Vec::with_capacity(arr.len());
2080        for entry in arr {
2081            let t = match entry {
2082                toml::Value::Table(t) => t,
2083                _ => continue,
2084            };
2085            let name = match t.get("name").and_then(|v| v.as_str()) {
2086                Some(s) => s.to_string(),
2087                None => continue,
2088            };
2089            let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2090                Some(s) => s.to_string(),
2091                None => continue,
2092            };
2093            out.push(Alias {
2094                name,
2095                card_id,
2096                pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2097                set_at: t
2098                    .get("set_at")
2099                    .and_then(|v| v.as_str())
2100                    .map(String::from)
2101                    .unwrap_or_default(),
2102                note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2103            });
2104        }
2105        Ok(out)
2106    }
2107
2108    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2109        // Ensure the cards root exists so aliases can be written to
2110        // a brand-new store (mirrors the behaviour of `cards_dir()`).
2111        if !self.root.exists() {
2112            fs::create_dir_all(&self.root)
2113                .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2114        }
2115        let path = self.aliases_path();
2116        let mut arr = Vec::with_capacity(aliases.len());
2117        for a in aliases {
2118            let mut t = toml::map::Map::new();
2119            t.insert("name".into(), toml::Value::String(a.name.clone()));
2120            t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2121            if let Some(p) = &a.pkg {
2122                t.insert("pkg".into(), toml::Value::String(p.clone()));
2123            }
2124            t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2125            if let Some(n) = &a.note {
2126                t.insert("note".into(), toml::Value::String(n.clone()));
2127            }
2128            arr.push(toml::Value::Table(t));
2129        }
2130        let mut root = toml::map::Map::new();
2131        root.insert("alias".into(), toml::Value::Array(arr));
2132        let text = toml::to_string_pretty(&toml::Value::Table(root))
2133            .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2134        let tmp = path.with_extension("toml.tmp");
2135        fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2136        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2137        Ok(())
2138    }
2139
2140    fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2141        let path = self.samples_path(card_id)?;
2142        Ok(path.exists())
2143    }
2144
2145    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2146        let path = self.samples_path(card_id)?;
2147        if path.exists() {
2148            return Err(format!(
2149                "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2150            ));
2151        }
2152        let tmp = path.with_extension("jsonl.tmp");
2153        fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2154        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2155        Ok(path)
2156    }
2157
2158    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2159        let path = self.samples_path(card_id)?;
2160        if !path.exists() {
2161            return Ok(None);
2162        }
2163        let text =
2164            fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2165        Ok(Some(text))
2166    }
2167
2168    fn import_from_dir(
2169        &self,
2170        source_dir: &Path,
2171        pkg: &str,
2172    ) -> Result<(Vec<String>, Vec<String>), String> {
2173        validate_name(pkg, "pkg")?;
2174        let dest = self.pkg_dir(pkg)?;
2175        let mut imported = Vec::new();
2176        let mut skipped = Vec::new();
2177
2178        let entries =
2179            fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2180
2181        for entry in entries.flatten() {
2182            let path = entry.path();
2183            let fname = match path.file_name().and_then(|n| n.to_str()) {
2184                Some(n) => n.to_string(),
2185                None => continue,
2186            };
2187
2188            if !fname.ends_with(".toml") {
2189                continue;
2190            }
2191
2192            let card_id = fname.trim_end_matches(".toml");
2193            let dest_toml = dest.join(&fname);
2194
2195            if dest_toml.exists() {
2196                skipped.push(card_id.to_string());
2197                continue;
2198            }
2199
2200            let text = fs::read_to_string(&path)
2201                .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2202            let val: toml::Value = toml::from_str(&text)
2203                .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2204            if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2205                continue;
2206            }
2207
2208            fs::copy(&path, &dest_toml)
2209                .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2210
2211            let samples_name = format!("{card_id}.samples.jsonl");
2212            let samples_src = source_dir.join(&samples_name);
2213            if samples_src.exists() {
2214                let samples_dest = dest.join(&samples_name);
2215                if !samples_dest.exists() {
2216                    fs::copy(&samples_src, &samples_dest)
2217                        .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2218                }
2219            }
2220
2221            imported.push(card_id.to_string());
2222        }
2223
2224        Ok((imported, skipped))
2225    }
2226}
2227
2228// ═══════════════════════════════════════════════════════════════
2229// Event Publisher Port — Card sink fan-out (v1)
2230// ═══════════════════════════════════════════════════════════════
2231//
2232// Storage port (`CardStore` / `FileCardStore`) stays pure CRUD. The
2233// Event publisher port below mirrors every successful Card write to a
2234// set of subscriber backends configured via the `ALC_CARD_SINKS`
2235// environment variable. Fan-out is best-effort and strictly serial;
2236// subscriber failures never propagate to the primary Card API.
2237
2238// ─── CardEvent / CardEventKind ─────────────────────────────────
2239
2240/// A Card-level event emitted from the write path.
2241///
2242/// Each variant carries the already-serialized payload text so that
2243/// subscribers can persist the exact bytes that were written to the
2244/// primary store (byte-for-byte parity).
2245#[derive(Debug, Clone)]
2246pub enum CardEvent {
2247    /// A brand-new Card was written.
2248    Created {
2249        pkg: String,
2250        card_id: String,
2251        toml_text: String,
2252    },
2253    /// An existing Card had new top-level keys merged in.
2254    Appended { card_id: String, toml_text: String },
2255    /// A samples JSONL sidecar was written for `card_id`.
2256    SamplesWritten { card_id: String, jsonl_text: String },
2257    /// The global alias table was rewritten.
2258    AliasesWritten { toml_text: String },
2259}
2260
2261/// Lightweight discriminant for `CardEvent`. Used as a `HashMap` key in
2262/// `SubscriberStats` so that ok/err counters can be tracked per event
2263/// kind without holding the full payload.
2264#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
2265pub enum CardEventKind {
2266    Created,
2267    Appended,
2268    SamplesWritten,
2269    AliasesWritten,
2270}
2271
2272impl CardEventKind {
2273    /// Stable string tag used in `tracing` log fields.
2274    pub fn as_str(self) -> &'static str {
2275        match self {
2276            CardEventKind::Created => "created",
2277            CardEventKind::Appended => "appended",
2278            CardEventKind::SamplesWritten => "samples_written",
2279            CardEventKind::AliasesWritten => "aliases_written",
2280        }
2281    }
2282
2283    /// Short JSON key for the public `alc_stats` snapshot.
2284    /// Distinct from `as_str()` so that tracing logs keep their
2285    /// verbose form while the JSON surface stays compact.
2286    pub fn json_key(self) -> &'static str {
2287        match self {
2288            CardEventKind::Created => "created",
2289            CardEventKind::Appended => "appended",
2290            CardEventKind::SamplesWritten => "samples",
2291            CardEventKind::AliasesWritten => "aliases",
2292        }
2293    }
2294
2295    /// All kinds in stable display order. Used by `SubscriberHealthRow`
2296    /// to emit all four counter keys even when a counter is zero, so
2297    /// that downstream consumers can rely on field presence.
2298    pub fn all() -> [CardEventKind; 4] {
2299        [
2300            CardEventKind::Created,
2301            CardEventKind::Appended,
2302            CardEventKind::SamplesWritten,
2303            CardEventKind::AliasesWritten,
2304        ]
2305    }
2306}
2307
2308impl Serialize for CardEventKind {
2309    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2310        s.serialize_str(self.json_key())
2311    }
2312}
2313
2314impl CardEvent {
2315    /// Return the `CardEventKind` discriminant for this event.
2316    pub fn kind(&self) -> CardEventKind {
2317        match self {
2318            CardEvent::Created { .. } => CardEventKind::Created,
2319            CardEvent::Appended { .. } => CardEventKind::Appended,
2320            CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2321            CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2322        }
2323    }
2324}
2325
2326// ─── CardSubscriber trait ──────────────────────────────────────
2327
2328/// A downstream backend that receives `CardEvent`s in best-effort,
2329/// serial fan-out order.
2330///
2331/// Implementations must be `Send + Sync` so that the bus can hold
2332/// `Arc<dyn CardSubscriber>` safely across threads.
2333pub trait CardSubscriber: Send + Sync {
2334    /// Handle one event. Returning `Err` records a failure in
2335    /// `SubscriberStats` and emits a `tracing::warn!`, but does not
2336    /// propagate to the caller of the `_with_store` API.
2337    fn on_event(&self, ev: &CardEvent) -> Result<(), String>;
2338
2339    /// Canonical identity URI for this subscriber. Used as the key in
2340    /// `SubscriberStats` and as the match target for
2341    /// `CardEventBus::publish_to`.
2342    fn describe(&self) -> String;
2343
2344    /// Best-effort check for whether `card_id` already exists in this
2345    /// subscriber. Used by `alc_card_sink_backfill` to skip cards that
2346    /// are already mirrored (drift-safe: we never overwrite).
2347    ///
2348    /// Default implementation returns `Ok(false)` so subscribers that
2349    /// cannot cheaply answer (e.g. future network-backed sinks) always
2350    /// get the push. Override when a cheap local check is possible.
2351    fn has_card(&self, _card_id: &str) -> Result<bool, String> {
2352        Ok(false)
2353    }
2354}
2355
2356// ─── SubscriberStats ───────────────────────────────────────────
2357
2358/// Most recent delivery failure for a single subscriber. Exposed via
2359/// `SubscriberHealthRow.last_error` in the `alc_stats` JSON snapshot.
2360#[derive(Debug, Clone, Serialize)]
2361pub struct LastError {
2362    pub kind: CardEventKind,
2363    pub msg: String,
2364    pub ts_ms: u64,
2365}
2366
2367/// Per-subscriber counter state. Held inside `SubscriberStats` under a
2368/// `Mutex`; `snapshot` clones the fields into an owned `SubscriberHealthRow`
2369/// while the lock is held, so the lock window stays short.
2370#[derive(Default, Debug)]
2371pub struct PerSubscriber {
2372    pub ok: HashMap<CardEventKind, u64>,
2373    pub err: HashMap<CardEventKind, u64>,
2374    pub last_error: Option<LastError>,
2375}
2376
2377/// Process-wide per-subscriber statistics, keyed by subscriber URI
2378/// (the value returned by `CardSubscriber::describe`).
2379#[derive(Default, Debug)]
2380pub struct SubscriberStats {
2381    inner: Mutex<HashMap<String, PerSubscriber>>,
2382}
2383
2384impl SubscriberStats {
2385    /// Record a successful event delivery.
2386    pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2387        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2388        let entry = g.entry(key.to_string()).or_default();
2389        let c = entry.ok.entry(kind).or_insert(0);
2390        *c = c.saturating_add(1);
2391    }
2392
2393    /// Record a delivery failure together with the error message.
2394    /// The failure kind, message, and timestamp overwrite `last_error`
2395    /// — there is no ring buffer; only the most recent failure is kept.
2396    pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2397        let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2398        let entry = g.entry(key.to_string()).or_default();
2399        let c = entry.err.entry(kind).or_insert(0);
2400        *c = c.saturating_add(1);
2401        entry.last_error = Some(LastError {
2402            kind,
2403            msg: err.to_string(),
2404            ts_ms: now_ms(),
2405        });
2406    }
2407
2408    /// Take a point-in-time snapshot of all subscribers. The returned
2409    /// `Vec` is owned — the internal lock is released before this
2410    /// function returns, so callers can hold it freely.
2411    ///
2412    /// All four `CardEventKind` keys (`created / appended / samples /
2413    /// aliases`) are always present in both `ok` and `err`, defaulting
2414    /// to 0 if no event of that kind has been recorded. This lets
2415    /// downstream consumers rely on field presence.
2416    pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2417        let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2418        let mut rows = Vec::with_capacity(g.len());
2419        for (sink, ps) in g.iter() {
2420            let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2421            let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2422            for k in CardEventKind::all() {
2423                ok.insert(
2424                    k.json_key().to_string(),
2425                    ps.ok.get(&k).copied().unwrap_or(0),
2426                );
2427                err.insert(
2428                    k.json_key().to_string(),
2429                    ps.err.get(&k).copied().unwrap_or(0),
2430                );
2431            }
2432            rows.push(SubscriberHealthRow {
2433                sink: sink.clone(),
2434                ok,
2435                err,
2436                last_error: ps.last_error.clone(),
2437            });
2438        }
2439        // Stable output ordering (by sink URI) so that the JSON dump is
2440        // deterministic across runs — useful for snapshot tests.
2441        rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2442        rows
2443    }
2444}
2445
2446/// Unix-epoch milliseconds used by `LastError.ts_ms`. Uses
2447/// `unwrap_or_default` so no panic can escape even if the system
2448/// clock is misconfigured.
2449fn now_ms() -> u64 {
2450    std::time::SystemTime::now()
2451        .duration_since(std::time::UNIX_EPOCH)
2452        .unwrap_or_default()
2453        .as_millis() as u64
2454}
2455
2456/// Snapshot row for a single subscriber, serialized directly into the
2457/// `alc_stats` JSON output as one element of the `card_sinks` array.
2458#[derive(Debug, Clone, Serialize)]
2459pub struct SubscriberHealthRow {
2460    pub sink: String,
2461    pub ok: HashMap<String, u64>,
2462    pub err: HashMap<String, u64>,
2463    pub last_error: Option<LastError>,
2464}
2465
2466/// Public entry point: snapshot of all process-wide subscriber stats.
2467/// Wrapper around `event_bus().stats().snapshot()` so that downstream
2468/// crates (notably `algocline-app`) do not need a handle to the
2469/// `CardEventBus` singleton.
2470pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2471    event_bus().stats().snapshot()
2472}
2473
2474// ─── FileCardSubscriber — file-URI backend ─────────────────────
2475
2476/// Atomically write `bytes` to `dest` by staging to a unique
2477/// `{dest}.tmp.{pid}.{seq}` sibling and renaming. The `{pid}.{seq}`
2478/// suffix prevents concurrent writers (same or different processes)
2479/// from colliding on the tmp path.
2480fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
2481    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
2482    let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
2483    let pid = process::id();
2484    if let Some(parent) = dest.parent() {
2485        if !parent.as_os_str().is_empty() && !parent.exists() {
2486            fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
2487        }
2488    }
2489    let mut tmp = dest.as_os_str().to_owned();
2490    tmp.push(format!(".tmp.{pid}.{seq}"));
2491    let tmp_path = PathBuf::from(tmp);
2492    fs::write(&tmp_path, bytes).map_err(|e| format!("subscriber write tmp: {e}"))?;
2493    fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
2494}
2495
2496/// Canonical `file://` URI for a local directory path. The result is
2497/// the inverse of [`decode_file_uri_path`] — the pair round-trips
2498/// through the `ALC_CARD_SINKS` env spec.
2499fn canonical_file_uri(root: &Path) -> String {
2500    let p = root.to_string_lossy();
2501    #[cfg(unix)]
2502    {
2503        format!("file://{p}")
2504    }
2505    #[cfg(windows)]
2506    {
2507        format!("file:///{}", p.replace('\\', "/"))
2508    }
2509    #[cfg(not(any(unix, windows)))]
2510    {
2511        format!("file://{p}")
2512    }
2513}
2514
2515/// A subscriber that mirrors events to a local directory using the
2516/// same two-tier layout as [`FileCardStore`]:
2517///
2518/// - `{root}/{pkg}/{card_id}.toml` — Card TOML
2519/// - `{root}/{pkg}/{card_id}.samples.jsonl` — samples sidecar
2520/// - `{root}/_aliases.toml` — global alias table
2521pub struct FileCardSubscriber {
2522    root: PathBuf,
2523    uri: String,
2524}
2525
2526impl FileCardSubscriber {
2527    /// Construct a subscriber rooted at `root`. The canonical URI is
2528    /// computed once and returned from [`Self::describe`].
2529    pub fn new(root: PathBuf) -> Self {
2530        let uri = canonical_file_uri(&root);
2531        Self { root, uri }
2532    }
2533
2534    /// Scan the subscriber root for a Card with `card_id` under any
2535    /// `{pkg}` subdirectory. Returns `Ok(None)` when the root itself
2536    /// does not exist yet (subscribers are write-once lazy).
2537    pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2538        validate_name(card_id, "card_id")?;
2539        if !self.root.exists() {
2540            return Ok(None);
2541        }
2542        let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2543        for entry in entries.flatten() {
2544            let p = entry.path();
2545            if p.is_dir() {
2546                let candidate = p.join(format!("{card_id}.toml"));
2547                if candidate.exists() {
2548                    return Ok(Some(candidate));
2549                }
2550            }
2551        }
2552        Ok(None)
2553    }
2554
2555    fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2556        validate_name(pkg, "pkg")?;
2557        let dir = self.root.join(pkg);
2558        if !dir.exists() {
2559            fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2560        }
2561        Ok(dir)
2562    }
2563
2564    fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2565        validate_name(card_id, "card_id")?;
2566        let dir = self.ensure_pkg_dir(pkg)?;
2567        let dest = dir.join(format!("{card_id}.toml"));
2568        atomic_write(&dest, toml_text.as_bytes())
2569    }
2570
2571    fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2572        match self.locate_card(card_id)? {
2573            Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2574            None => Err(format!(
2575                "subscriber append: card '{card_id}' missing at {}",
2576                self.uri
2577            )),
2578        }
2579    }
2580
2581    fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2582        let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2583            format!(
2584                "subscriber samples: card '{card_id}' missing at {}",
2585                self.uri
2586            )
2587        })?;
2588        let dir = card_path
2589            .parent()
2590            .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2591        let dest = dir.join(format!("{card_id}.samples.jsonl"));
2592        atomic_write(&dest, jsonl_text.as_bytes())
2593    }
2594
2595    fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2596        if !self.root.exists() {
2597            fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2598        }
2599        let dest = self.root.join("_aliases.toml");
2600        atomic_write(&dest, toml_text.as_bytes())
2601    }
2602}
2603
2604impl CardSubscriber for FileCardSubscriber {
2605    fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2606        match ev {
2607            CardEvent::Created {
2608                pkg,
2609                card_id,
2610                toml_text,
2611            } => self.write_created(pkg, card_id, toml_text),
2612            CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2613            CardEvent::SamplesWritten {
2614                card_id,
2615                jsonl_text,
2616            } => self.write_samples(card_id, jsonl_text),
2617            CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2618        }
2619    }
2620
2621    fn describe(&self) -> String {
2622        self.uri.clone()
2623    }
2624
2625    /// Delegates to [`FileCardSubscriber::locate_card`]. A non-existent
2626    /// root (subscriber has never been written to) returns `Ok(false)`,
2627    /// which is the correct "backfill needed" answer.
2628    fn has_card(&self, card_id: &str) -> Result<bool, String> {
2629        Ok(self.locate_card(card_id)?.is_some())
2630    }
2631}
2632
2633// ─── CardEventBus + OnceLock singleton ─────────────────────────
2634
2635/// Process-wide fan-out bus. Subscribers are registered once at startup
2636/// (from `ALC_CARD_SINKS`) and stored behind a `Mutex` so that tests
2637/// can swap them out via `replace_subscribers_for_test` without losing
2638/// the singleton identity.
2639pub struct CardEventBus {
2640    subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
2641    stats: Arc<SubscriberStats>,
2642}
2643
2644impl CardEventBus {
2645    /// Build a bus from an explicit subscriber list. Used both by the
2646    /// env loader and by `install_event_bus_for_test`.
2647    pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2648        Self {
2649            subscribers: Mutex::new(subscribers),
2650            stats: Arc::new(SubscriberStats::default()),
2651        }
2652    }
2653
2654    /// Shared handle to the per-subscriber counters.
2655    pub fn stats(&self) -> &Arc<SubscriberStats> {
2656        &self.stats
2657    }
2658
2659    /// Fan out `ev` to every registered subscriber serially. Subscriber
2660    /// failures are counted in `SubscriberStats` and logged but do not
2661    /// propagate.
2662    pub fn publish(&self, ev: &CardEvent) {
2663        let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2664            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2665            guard.clone()
2666        };
2667        for sub in &subs_snapshot {
2668            let key = sub.describe();
2669            match sub.on_event(ev) {
2670                Ok(()) => self.stats.record_ok(&key, ev.kind()),
2671                Err(e) => {
2672                    tracing::warn!(
2673                        subscriber = %key,
2674                        kind = ev.kind().as_str(),
2675                        error = %e,
2676                        "card subscriber failed"
2677                    );
2678                    self.stats.record_err(&key, ev.kind(), &e);
2679                }
2680            }
2681        }
2682    }
2683
2684    /// Deliver `ev` to exactly one subscriber identified by
2685    /// `target` URI. Returns `Err` when no subscriber matches or the
2686    /// subscriber itself fails (backfill path needs the caller to know).
2687    pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2688        let hit: Option<Arc<dyn CardSubscriber>> = {
2689            let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2690            guard.iter().find(|s| s.describe() == target).cloned()
2691        };
2692        let Some(sub) = hit else {
2693            return Err(format!("subscriber not registered: {target}"));
2694        };
2695        let key = sub.describe();
2696        match sub.on_event(ev) {
2697            Ok(()) => {
2698                self.stats.record_ok(&key, ev.kind());
2699                Ok(())
2700            }
2701            Err(e) => {
2702                tracing::warn!(
2703                    subscriber = %key,
2704                    kind = ev.kind().as_str(),
2705                    error = %e,
2706                    "card subscriber failed (publish_to)"
2707                );
2708                self.stats.record_err(&key, ev.kind(), &e);
2709                Err(e)
2710            }
2711        }
2712    }
2713
2714    /// List every subscriber URI currently registered on the bus.
2715    pub fn subscriber_uris(&self) -> Vec<String> {
2716        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2717        guard.iter().map(|s| s.describe()).collect()
2718    }
2719
2720    /// Look up a subscriber by URI (as returned by `describe`). Returns
2721    /// `None` when no subscriber matches. Used by
2722    /// `alc_card_sink_backfill` to dispatch has_card checks against a
2723    /// specific sink.
2724    pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2725        let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2726        guard.iter().find(|s| s.describe() == uri).cloned()
2727    }
2728
2729    /// Replace the subscriber list in place while preserving singleton
2730    /// identity and shared `SubscriberStats`. Test-only.
2731    #[cfg(any(test, feature = "test-support"))]
2732    pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2733        let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2734        *guard = subs;
2735    }
2736
2737    /// Reset the per-subscriber counters. Test-only helper.
2738    #[cfg(any(test, feature = "test-support"))]
2739    pub fn reset_stats_for_test(&self) {
2740        let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2741        g.clear();
2742    }
2743}
2744
2745static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2746
2747/// Return the process-wide `CardEventBus` singleton, initializing it
2748/// on the first call from the `ALC_CARD_SINKS` environment variable.
2749pub fn event_bus() -> &'static CardEventBus {
2750    CARD_EVENT_BUS.get_or_init(|| {
2751        let subs = load_subscribers_from_env();
2752        CardEventBus::new(subs)
2753    })
2754}
2755
2756/// Eagerly initialize the bus. Idempotent and safe to call multiple
2757/// times; intended for startup hooks (`main.rs`) so that subscriber
2758/// registration `tracing::info!` lines are emitted at boot rather than
2759/// on the first Card write.
2760pub fn init_event_bus() {
2761    let bus = event_bus();
2762    let uris = bus.subscriber_uris();
2763    if uris.is_empty() {
2764        tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2765    } else {
2766        for uri in &uris {
2767            tracing::info!(subscriber = %uri, "card sink registered");
2768        }
2769    }
2770}
2771
2772/// Install a test-built bus. Fails once the singleton is already set.
2773#[cfg(any(test, feature = "test-support"))]
2774pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
2775    CARD_EVENT_BUS
2776        .set(bus)
2777        .map_err(|_| "bus already initialized".to_string())
2778}
2779
2780/// Convenience wrapper: publish through the singleton.
2781pub fn publish(ev: CardEvent) {
2782    // In test builds, serialize publishing with the subscriber-test mutex so
2783    // that default-store tests running in parallel cannot inject events into
2784    // a subscriber test's mock while the mock is active.  The owning test
2785    // thread sets INSIDE_BUS_TEST to true so it does NOT try to acquire the
2786    // same lock it already holds (re-entrancy safe).
2787    #[cfg(test)]
2788    {
2789        let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2790        if !is_test_owner {
2791            // Block until any active subscriber test releases the lock.
2792            let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2793            event_bus().publish(&ev);
2794            return;
2795        }
2796    }
2797    event_bus().publish(&ev);
2798}
2799
2800/// Mutex used in tests to serialise subscriber-test setup against concurrent
2801/// publishes from default-store tests running in parallel.
2802#[cfg(test)]
2803fn bus_test_gate() -> &'static Mutex<()> {
2804    static GATE: OnceLock<Mutex<()>> = OnceLock::new();
2805    GATE.get_or_init(|| Mutex::new(()))
2806}
2807
2808// Thread-local flag: set to `true` by the thread running inside
2809// `with_bus_subscribers` so that `publish` skips the gate (re-entrancy).
2810#[cfg(test)]
2811thread_local! {
2812    static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
2813}
2814
2815// ─── alc_card_sink_backfill ────────────────────────────────────
2816
2817/// Result of a [`card_sink_backfill`] run. One row per card the tool
2818/// touched (classified as pushed / skipped / failed / pushed_samples).
2819/// The `failed` entries carry the error message so an operator can
2820/// triage read-only mounts etc.
2821#[derive(Debug, Clone, Default, Serialize)]
2822pub struct SinkBackfillReport {
2823    pub sink: String,
2824    pub pushed: Vec<String>,
2825    pub skipped: Vec<String>,
2826    pub failed: Vec<(String, String)>,
2827    pub pushed_samples: Vec<String>,
2828}
2829
2830/// Backfill one subscriber (`sink` URI) from the primary store.
2831///
2832/// Steps:
2833/// 1. Look up the target subscriber on the event bus; fail fast if
2834///    the URI is not registered so the caller gets an immediate
2835///    error rather than a silent no-op.
2836/// 2. Enumerate every `(pkg, card_id)` pair from the default
2837///    [`CardStore`].
2838/// 3. For each pair, ask the subscriber whether it already has that
2839///    card (`CardSubscriber::has_card`). If yes → skipped (drift-safe,
2840///    no overwrite). If no → read the primary TOML and `publish_to`
2841///    the one target sink. Samples are mirrored the same way.
2842/// 4. `dry_run = true` short-circuits step 3: the report lists
2843///    what would have been pushed but no `publish_to` is issued,
2844///    so `SubscriberStats` does not increment.
2845pub fn card_sink_backfill(sink: &str, dry_run: bool) -> Result<SinkBackfillReport, String> {
2846    let store = default_store()?;
2847    card_sink_backfill_with_store(&store, sink, dry_run)
2848}
2849
2850/// `card_sink_backfill` with an injectable [`CardStore`]. Tests drive
2851/// this directly against a tempdir-backed [`FileCardStore`] to avoid
2852/// touching the user's real `~/.algocline/cards/`.
2853pub fn card_sink_backfill_with_store(
2854    store: &dyn CardStore,
2855    sink: &str,
2856    dry_run: bool,
2857) -> Result<SinkBackfillReport, String> {
2858    let bus = event_bus();
2859    let sub = bus
2860        .find_subscriber(sink)
2861        .ok_or_else(|| format!("unknown sink: {sink}"))?;
2862
2863    let locators = store.list_card_locators(None)?;
2864
2865    let mut report = SinkBackfillReport {
2866        sink: sink.to_string(),
2867        ..Default::default()
2868    };
2869
2870    for (pkg, locator) in locators {
2871        let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2872            Some(s) => s.to_string(),
2873            None => continue,
2874        };
2875
2876        match sub.has_card(&card_id) {
2877            Ok(true) => {
2878                report.skipped.push(card_id);
2879                continue;
2880            }
2881            Ok(false) => {}
2882            Err(e) => {
2883                tracing::warn!(
2884                    card_id = %card_id,
2885                    error = %e,
2886                    "backfill: has_card failed; treating as skipped"
2887                );
2888                report.skipped.push(card_id);
2889                continue;
2890            }
2891        }
2892
2893        let toml_text = match store.read_locator_text(&locator) {
2894            Ok(Some(t)) => t,
2895            Ok(None) => {
2896                // Unreadable / corrupt on primary. Do not panic; skip.
2897                report.skipped.push(card_id);
2898                continue;
2899            }
2900            Err(e) => {
2901                tracing::warn!(
2902                    card_id = %card_id,
2903                    error = %e,
2904                    "backfill: read_locator_text failed; treating as skipped"
2905                );
2906                report.skipped.push(card_id);
2907                continue;
2908            }
2909        };
2910
2911        if dry_run {
2912            report.pushed.push(card_id.clone());
2913            if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2914                report.pushed_samples.push(card_id);
2915            }
2916            continue;
2917        }
2918
2919        let ev = CardEvent::Created {
2920            pkg: pkg.clone(),
2921            card_id: card_id.clone(),
2922            toml_text,
2923        };
2924        match bus.publish_to(sink, &ev) {
2925            Ok(()) => report.pushed.push(card_id.clone()),
2926            Err(e) => {
2927                report.failed.push((card_id, e));
2928                continue;
2929            }
2930        }
2931
2932        if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2933            let ev = CardEvent::SamplesWritten {
2934                card_id: card_id.clone(),
2935                jsonl_text,
2936            };
2937            match bus.publish_to(sink, &ev) {
2938                Ok(()) => report.pushed_samples.push(card_id),
2939                Err(e) => {
2940                    report.failed.push((card_id, format!("samples: {e}")));
2941                }
2942            }
2943        }
2944    }
2945
2946    Ok(report)
2947}
2948
2949// ─── ALC_CARD_SINKS env parser ─────────────────────────────────
2950
2951/// Read `ALC_CARD_SINKS` and build one subscriber per accepted spec.
2952/// Malformed entries are logged and skipped; duplicate URIs are
2953/// first-wins. Non-UTF8 values reject the whole env.
2954fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2955    let raw = match std::env::var("ALC_CARD_SINKS") {
2956        Ok(v) => v,
2957        Err(std::env::VarError::NotPresent) => return Vec::new(),
2958        Err(std::env::VarError::NotUnicode(_)) => {
2959            tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2960            return Vec::new();
2961        }
2962    };
2963    parse_subscribers_from_str(&raw)
2964}
2965
2966/// Parse a `|`-separated list of subscriber URIs (the same format used
2967/// by `ALC_CARD_SINKS`). Extracted so tests can exercise the parser
2968/// without touching process environment.
2969fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2970    if raw.is_empty() {
2971        return Vec::new();
2972    }
2973    let mut seen: HashSet<String> = HashSet::new();
2974    let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2975    for spec in raw.split('|') {
2976        let spec = spec.trim();
2977        if spec.is_empty() {
2978            continue;
2979        }
2980        let Some(sub) = parse_subscriber_spec(spec) else {
2981            continue;
2982        };
2983        let uri = sub.describe();
2984        if !seen.insert(uri.clone()) {
2985            tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2986            continue;
2987        }
2988        out.push(sub);
2989    }
2990    out
2991}
2992
2993/// Parse one subscriber spec. v1 only accepts `file:///absolute/path`.
2994fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2995    // scheme required
2996    let Some(colon_idx) = spec.find(':') else {
2997        tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2998        return None;
2999    };
3000    let scheme = &spec[..colon_idx];
3001    let rest = &spec[colon_idx + 1..];
3002    if scheme != "file" {
3003        tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
3004        return None;
3005    }
3006    // scheme-specific: must start with `//`
3007    let Some(after_slash) = rest.strip_prefix("//") else {
3008        tracing::error!(spec, "file URI missing '//'");
3009        return None;
3010    };
3011    // split authority / path. path begins with '/'.
3012    let Some(path_start) = after_slash.find('/') else {
3013        tracing::error!(spec, "file URI has no path component");
3014        return None;
3015    };
3016    let authority = &after_slash[..path_start];
3017    let encoded_path = &after_slash[path_start..];
3018    if !authority.is_empty() {
3019        tracing::error!(
3020            spec,
3021            authority,
3022            "file URI with non-empty authority is rejected"
3023        );
3024        return None;
3025    }
3026    let path = decode_file_uri_path(encoded_path)?;
3027    Some(Arc::new(FileCardSubscriber::new(path)))
3028}
3029
3030/// Decode the path portion of a `file://` URI into a `PathBuf`.
3031///
3032/// Unix: a leading `/` is preserved (absolute path).
3033/// Windows: the leading `/` is stripped so that `/C:/a/b` becomes
3034/// `C:/a/b`.
3035fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3036    let decoded = percent_decode(encoded)?;
3037    #[cfg(windows)]
3038    {
3039        // Strip the leading slash so "/C:/foo" -> "C:/foo".
3040        let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3041        Some(PathBuf::from(trimmed))
3042    }
3043    #[cfg(not(windows))]
3044    {
3045        Some(PathBuf::from(decoded))
3046    }
3047}
3048
3049/// Percent-decode a URI path segment. Returns `None` on invalid or
3050/// truncated `%XX` sequences.
3051fn percent_decode(src: &str) -> Option<String> {
3052    let bytes = src.as_bytes();
3053    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3054    let mut i = 0;
3055    while i < bytes.len() {
3056        let b = bytes[i];
3057        if b == b'%' {
3058            if i + 2 >= bytes.len() {
3059                tracing::error!(src, "percent-encoded sequence truncated");
3060                return None;
3061            }
3062            let hi = (bytes[i + 1] as char).to_digit(16);
3063            let lo = (bytes[i + 2] as char).to_digit(16);
3064            match (hi, lo) {
3065                (Some(h), Some(l)) => {
3066                    out.push(((h << 4) | l) as u8);
3067                    i += 3;
3068                }
3069                _ => {
3070                    tracing::error!(src, "percent-encoded sequence has non-hex digits");
3071                    return None;
3072                }
3073            }
3074        } else {
3075            out.push(b);
3076            i += 1;
3077        }
3078    }
3079    match String::from_utf8(out) {
3080        Ok(s) => Some(s),
3081        Err(_) => {
3082            tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3083            None
3084        }
3085    }
3086}
3087
3088#[cfg(test)]
3089mod tests {
3090    use super::*;
3091
3092    fn unique_pkg() -> String {
3093        let ns = std::time::SystemTime::now()
3094            .duration_since(std::time::UNIX_EPOCH)
3095            .unwrap()
3096            .as_nanos();
3097        format!("_test_card_{ns}")
3098    }
3099
3100    fn cleanup(pkg: &str) {
3101        if let Ok(store) = default_store() {
3102            if let Ok(d) = store.pkg_dir(pkg) {
3103                let _ = fs::remove_dir_all(&d);
3104            }
3105        }
3106    }
3107
3108    #[test]
3109    fn minimum_valid_card() {
3110        let pkg = unique_pkg();
3111        let input = json!({ "pkg": { "name": pkg } });
3112        let (id, path) = create(input).unwrap();
3113        assert!(path.exists());
3114        assert!(id.starts_with(&pkg));
3115
3116        let got = get(&id).unwrap().unwrap();
3117        assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
3118        assert_eq!(got["card_id"], json!(id));
3119        assert_eq!(got["pkg"]["name"], json!(pkg));
3120        assert!(got.get("created_at").is_some());
3121        assert!(got.get("created_by").is_some());
3122
3123        cleanup(&pkg);
3124    }
3125
3126    #[test]
3127    fn create_rejects_missing_pkg_name() {
3128        let err = create(json!({})).unwrap_err();
3129        assert!(err.contains("pkg.name"));
3130    }
3131
3132    #[test]
3133    fn create_is_immutable() {
3134        let pkg = unique_pkg();
3135        let input = json!({
3136            "card_id": "fixed_id_001",
3137            "pkg": { "name": pkg }
3138        });
3139        create(input.clone()).unwrap();
3140        let err = create(input).unwrap_err();
3141        assert!(err.contains("already exists"));
3142        cleanup(&pkg);
3143    }
3144
3145    #[test]
3146    fn create_injects_param_fingerprint() {
3147        let pkg = unique_pkg();
3148        let input = json!({
3149            "pkg": { "name": pkg },
3150            "params": { "depth": 3, "temperature": 0.0 }
3151        });
3152        let (id, _) = create(input).unwrap();
3153        let got = get(&id).unwrap().unwrap();
3154        assert!(got["param_fingerprint"].is_string());
3155        cleanup(&pkg);
3156    }
3157
3158    #[test]
3159    fn list_returns_newest_first() {
3160        let pkg = unique_pkg();
3161        // First card
3162        let (id1, _) = create(json!({
3163            "card_id": format!("{pkg}_a"),
3164            "pkg": { "name": pkg },
3165            "created_at": "2025-01-01T00:00:00Z"
3166        }))
3167        .unwrap();
3168        let (id2, _) = create(json!({
3169            "card_id": format!("{pkg}_b"),
3170            "pkg": { "name": pkg },
3171            "created_at": "2026-01-01T00:00:00Z"
3172        }))
3173        .unwrap();
3174
3175        let rows = list(Some(&pkg)).unwrap();
3176        assert_eq!(rows.len(), 2);
3177        assert_eq!(rows[0].card_id, id2); // newer first
3178        assert_eq!(rows[1].card_id, id1);
3179
3180        cleanup(&pkg);
3181    }
3182
3183    #[test]
3184    fn list_extracts_summary_fields() {
3185        let pkg = unique_pkg();
3186        let (id, _) = create(json!({
3187            "pkg": { "name": pkg },
3188            "model": { "id": "claude-opus-4-6" },
3189            "scenario": { "name": "gsm8k_sample100" },
3190            "stats": { "pass_rate": 0.82 }
3191        }))
3192        .unwrap();
3193
3194        let rows = list(Some(&pkg)).unwrap();
3195        let row = rows.iter().find(|r| r.card_id == id).unwrap();
3196        assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
3197        assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
3198        assert_eq!(row.pass_rate, Some(0.82));
3199
3200        cleanup(&pkg);
3201    }
3202
3203    #[test]
3204    fn get_missing_returns_none() {
3205        assert!(get("does_not_exist_xyz").unwrap().is_none());
3206    }
3207
3208    #[test]
3209    fn card_id_embeds_compact_timestamp() {
3210        let pkg = unique_pkg();
3211        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3212        // Expect: {pkg}_{model}_{YYYYMMDDTHHMMSS}_{hash6}
3213        // After removing the pkg prefix, there should be a segment
3214        // containing 'T' separating date and time.
3215        let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
3216        let parts: Vec<&str> = tail.split('_').collect();
3217        // parts = [model_short, YYYYMMDDTHHMMSS, hash6]
3218        assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
3219        let ts = parts[1];
3220        assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
3221        assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
3222        cleanup(&pkg);
3223    }
3224
3225    #[test]
3226    fn now_compact_format() {
3227        let s = now_compact();
3228        assert_eq!(s.len(), 15);
3229        assert_eq!(s.chars().nth(8), Some('T'));
3230        // All other positions are digits
3231        for (i, c) in s.chars().enumerate() {
3232            if i != 8 {
3233                assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3234            }
3235        }
3236    }
3237
3238    #[test]
3239    fn short_model_variants() {
3240        assert_eq!(short_model("claude-opus-4-6"), "opus46");
3241        assert_eq!(short_model("gpt-4o"), "4o");
3242        assert_eq!(short_model(""), "model");
3243    }
3244
3245    #[test]
3246    fn two_cards_same_second_different_stats_get_distinct_ids() {
3247        let pkg = unique_pkg();
3248        let input1 = json!({
3249            "pkg": { "name": pkg },
3250            "scenario": { "name": "gsm8k" },
3251            "stats": { "pass_rate": 0.4 }
3252        });
3253        let input2 = json!({
3254            "pkg": { "name": pkg },
3255            "scenario": { "name": "gsm8k" },
3256            "stats": { "pass_rate": 0.9 }
3257        });
3258        let (id1, _) = create(input1).unwrap();
3259        let (id2, _) = create(input2).unwrap();
3260        assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
3261        cleanup(&pkg);
3262    }
3263
3264    // ─── P1: append ────────────────────────────────────────────
3265
3266    #[test]
3267    fn append_adds_new_fields() {
3268        let pkg = unique_pkg();
3269        let (id, _) = create(json!({
3270            "pkg": { "name": pkg },
3271            "stats": { "pass_rate": 0.5 }
3272        }))
3273        .unwrap();
3274
3275        let merged = append(
3276            &id,
3277            json!({
3278                "caveats": { "notes": "rescored after fix" },
3279                "metadata": { "reviewer": "yn" }
3280            }),
3281        )
3282        .unwrap();
3283        assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
3284        assert_eq!(merged["metadata"]["reviewer"], json!("yn"));
3285
3286        // Persisted
3287        let got = get(&id).unwrap().unwrap();
3288        assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
3289        // Existing field untouched
3290        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3291
3292        cleanup(&pkg);
3293    }
3294
3295    #[test]
3296    fn append_rejects_existing_key() {
3297        let pkg = unique_pkg();
3298        let (id, _) = create(json!({
3299            "pkg": { "name": pkg },
3300            "stats": { "pass_rate": 0.5 }
3301        }))
3302        .unwrap();
3303
3304        let err = append(&id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
3305        assert!(err.contains("already set"), "got: {err}");
3306        // Verify original value still there
3307        let got = get(&id).unwrap().unwrap();
3308        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3309
3310        cleanup(&pkg);
3311    }
3312
3313    #[test]
3314    fn append_errors_on_missing_card() {
3315        let err = append("does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
3316        assert!(err.contains("not found"));
3317    }
3318
3319    // ─── P1: alias_set / alias_list ────────────────────────────
3320
3321    #[test]
3322    fn alias_set_and_list_roundtrip() {
3323        let pkg = unique_pkg();
3324        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3325
3326        let alias_name = format!("test_alias_{}", &pkg);
3327        alias_set(&alias_name, &id, Some(&pkg), Some("smoke")).unwrap();
3328
3329        let rows = alias_list(Some(&pkg)).unwrap();
3330        let a = rows.iter().find(|a| a.name == alias_name).unwrap();
3331        assert_eq!(a.card_id, id);
3332        assert_eq!(a.pkg.as_deref(), Some(pkg.as_str()));
3333        assert_eq!(a.note.as_deref(), Some("smoke"));
3334        assert!(!a.set_at.is_empty());
3335
3336        // Rebind to a new card
3337        let (id2, _) = create(json!({
3338            "card_id": format!("{pkg}_b"),
3339            "pkg": { "name": pkg }
3340        }))
3341        .unwrap();
3342        alias_set(&alias_name, &id2, Some(&pkg), None).unwrap();
3343        let rows = alias_list(Some(&pkg)).unwrap();
3344        let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
3345        assert_eq!(matching.len(), 1, "alias should be unique by name");
3346        assert_eq!(matching[0].card_id, id2);
3347
3348        // Cleanup: remove our alias from the file
3349        let store = default_store().unwrap();
3350        let remaining: Vec<Alias> = store
3351            .read_aliases()
3352            .unwrap()
3353            .into_iter()
3354            .filter(|a| a.name != alias_name)
3355            .collect();
3356        store.write_aliases(&remaining).unwrap();
3357        cleanup(&pkg);
3358    }
3359
3360    #[test]
3361    fn alias_set_rejects_unknown_card() {
3362        let err = alias_set("x", "does_not_exist_xyz", None, None).unwrap_err();
3363        assert!(err.contains("not found"));
3364    }
3365
3366    // ─── find + where DSL ───────────────────────────────────────
3367
3368    fn where_from(v: Json) -> Predicate {
3369        parse_where(&v).expect("parse where")
3370    }
3371
3372    fn order_from(v: Json) -> Vec<OrderKey> {
3373        parse_order_by(&v).expect("parse order_by")
3374    }
3375
3376    #[test]
3377    fn find_where_nested_eq_and_gte() {
3378        let pkg = unique_pkg();
3379        create(json!({
3380            "card_id": format!("{pkg}_low"),
3381            "pkg": { "name": pkg },
3382            "scenario": { "name": "gsm8k" },
3383            "stats": { "pass_rate": 0.4 }
3384        }))
3385        .unwrap();
3386        create(json!({
3387            "card_id": format!("{pkg}_high"),
3388            "pkg": { "name": pkg },
3389            "scenario": { "name": "gsm8k" },
3390            "stats": { "pass_rate": 0.9 }
3391        }))
3392        .unwrap();
3393        create(json!({
3394            "card_id": format!("{pkg}_other"),
3395            "pkg": { "name": pkg },
3396            "scenario": { "name": "other" },
3397            "stats": { "pass_rate": 1.0 }
3398        }))
3399        .unwrap();
3400
3401        // scenario eq via nested object
3402        let rows = find(FindQuery {
3403            pkg: Some(pkg.clone()),
3404            where_: Some(where_from(json!({
3405                "scenario": { "name": "gsm8k" },
3406            }))),
3407            order_by: order_from(json!("-stats.pass_rate")),
3408            ..Default::default()
3409        })
3410        .unwrap();
3411        assert_eq!(rows.len(), 2);
3412        assert_eq!(rows[0].pass_rate, Some(0.9));
3413        assert_eq!(rows[1].pass_rate, Some(0.4));
3414
3415        // gte operator
3416        let rows = find(FindQuery {
3417            pkg: Some(pkg.clone()),
3418            where_: Some(where_from(json!({
3419                "stats": { "pass_rate": { "gte": 0.8 } },
3420            }))),
3421            order_by: order_from(json!("-stats.pass_rate")),
3422            ..Default::default()
3423        })
3424        .unwrap();
3425        assert_eq!(rows.len(), 2);
3426        assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));
3427
3428        // limit
3429        let rows = find(FindQuery {
3430            pkg: Some(pkg.clone()),
3431            order_by: order_from(json!("-stats.pass_rate")),
3432            limit: Some(1),
3433            ..Default::default()
3434        })
3435        .unwrap();
3436        assert_eq!(rows.len(), 1);
3437        assert_eq!(rows[0].pass_rate, Some(1.0));
3438
3439        cleanup(&pkg);
3440    }
3441
3442    #[test]
3443    fn find_where_implicit_eq_and_logical() {
3444        let pkg = unique_pkg();
3445        create(json!({
3446            "card_id": format!("{pkg}_a"),
3447            "pkg": { "name": pkg },
3448            "model": { "id": "claude-opus-4-6" },
3449            "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
3450        }))
3451        .unwrap();
3452        create(json!({
3453            "card_id": format!("{pkg}_b"),
3454            "pkg": { "name": pkg },
3455            "model": { "id": "claude-opus-4-6" },
3456            "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
3457        }))
3458        .unwrap();
3459        create(json!({
3460            "card_id": format!("{pkg}_c"),
3461            "pkg": { "name": pkg },
3462            "model": { "id": "claude-haiku-4-5-20251001" },
3463            "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
3464        }))
3465        .unwrap();
3466
3467        // implicit eq on sparse stats field
3468        let rows = find(FindQuery {
3469            pkg: Some(pkg.clone()),
3470            where_: Some(where_from(json!({
3471                "stats": { "equilibrium_position": "dead" },
3472            }))),
3473            ..Default::default()
3474        })
3475        .unwrap();
3476        assert_eq!(rows.len(), 1);
3477        assert!(rows[0].card_id.ends_with("_a"));
3478
3479        // _or
3480        let rows = find(FindQuery {
3481            pkg: Some(pkg.clone()),
3482            where_: Some(where_from(json!({
3483                "_or": [
3484                    { "stats": { "equilibrium_position": "dead" } },
3485                    { "stats": { "survival_rate": { "gte": 0.9 } } },
3486                ],
3487            }))),
3488            ..Default::default()
3489        })
3490        .unwrap();
3491        assert_eq!(rows.len(), 2);
3492
3493        // _not
3494        let rows = find(FindQuery {
3495            pkg: Some(pkg.clone()),
3496            where_: Some(where_from(json!({
3497                "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
3498            }))),
3499            ..Default::default()
3500        })
3501        .unwrap();
3502        assert_eq!(rows.len(), 2);
3503
3504        // in operator
3505        let rows = find(FindQuery {
3506            pkg: Some(pkg.clone()),
3507            where_: Some(where_from(json!({
3508                "stats": {
3509                    "equilibrium_position": { "in": ["dead", "fragile"] },
3510                },
3511            }))),
3512            ..Default::default()
3513        })
3514        .unwrap();
3515        assert_eq!(rows.len(), 2);
3516
3517        // exists false (sparse field missing on haiku card? all have it, so test on
3518        // a field that only some have)
3519        let rows = find(FindQuery {
3520            pkg: Some(pkg.clone()),
3521            where_: Some(where_from(json!({
3522                "strategy_params": { "temperature": { "exists": false } },
3523            }))),
3524            ..Default::default()
3525        })
3526        .unwrap();
3527        assert_eq!(rows.len(), 3, "none of the cards have strategy_params");
3528
3529        cleanup(&pkg);
3530    }
3531
3532    #[test]
3533    fn find_order_by_multi_key() {
3534        let pkg = unique_pkg();
3535        create(json!({
3536            "card_id": format!("{pkg}_a"),
3537            "pkg": { "name": pkg },
3538            "stats": { "pass_rate": 0.5 }
3539        }))
3540        .unwrap();
3541        create(json!({
3542            "card_id": format!("{pkg}_b"),
3543            "pkg": { "name": pkg },
3544            "stats": { "pass_rate": 0.9 }
3545        }))
3546        .unwrap();
3547        create(json!({
3548            "card_id": format!("{pkg}_c"),
3549            "pkg": { "name": pkg },
3550            "stats": { "pass_rate": 0.9 }
3551        }))
3552        .unwrap();
3553
3554        let rows = find(FindQuery {
3555            pkg: Some(pkg.clone()),
3556            order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
3557            ..Default::default()
3558        })
3559        .unwrap();
3560        assert_eq!(rows.len(), 3);
3561        assert_eq!(rows[0].pass_rate, Some(0.9));
3562        assert_eq!(rows[1].pass_rate, Some(0.9));
3563        assert_eq!(rows[2].pass_rate, Some(0.5));
3564        // Tiebreak by card_id ascending
3565        assert!(rows[0].card_id < rows[1].card_id);
3566
3567        cleanup(&pkg);
3568    }
3569
3570    #[test]
3571    fn find_offset_and_limit() {
3572        let pkg = unique_pkg();
3573        for i in 0..5 {
3574            create(json!({
3575                "card_id": format!("{pkg}_{i}"),
3576                "pkg": { "name": pkg },
3577                "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
3578            }))
3579            .unwrap();
3580        }
3581
3582        let rows = find(FindQuery {
3583            pkg: Some(pkg.clone()),
3584            order_by: order_from(json!("-stats.pass_rate")),
3585            offset: Some(1),
3586            limit: Some(2),
3587            ..Default::default()
3588        })
3589        .unwrap();
3590        assert_eq!(rows.len(), 2);
3591        // Best is 0.5, after offset=1 we start at 0.4 then 0.3.
3592        let pr0 = rows[0].pass_rate.unwrap();
3593        let pr1 = rows[1].pass_rate.unwrap();
3594        assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
3595        assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");
3596
3597        cleanup(&pkg);
3598    }
3599
3600    #[test]
3601    fn parse_where_rejects_non_object() {
3602        assert!(parse_where(&json!("not an object")).is_err());
3603        assert!(parse_where(&json!(42)).is_err());
3604    }
3605
3606    #[test]
3607    fn parse_order_by_accepts_string_and_array() {
3608        let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3609        assert_eq!(k.len(), 1);
3610        assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3611        assert!(k[0].desc);
3612
3613        let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3614        assert_eq!(k.len(), 2);
3615        assert!(!k[0].desc);
3616        assert!(k[1].desc);
3617    }
3618
3619    #[test]
3620    fn find_where_string_ops_contains_and_starts_with() {
3621        let pkg = unique_pkg();
3622        create(json!({
3623            "card_id": format!("{pkg}_a"),
3624            "pkg": { "name": pkg },
3625            "model": { "id": "claude-opus-4-6" },
3626            "metadata": { "tag": "experiment_alpha" },
3627        }))
3628        .unwrap();
3629        create(json!({
3630            "card_id": format!("{pkg}_b"),
3631            "pkg": { "name": pkg },
3632            "model": { "id": "claude-haiku-4-5-20251001" },
3633            "metadata": { "tag": "experiment_beta" },
3634        }))
3635        .unwrap();
3636        create(json!({
3637            "card_id": format!("{pkg}_c"),
3638            "pkg": { "name": pkg },
3639            "model": { "id": "claude-sonnet-4-5" },
3640            "metadata": { "tag": "baseline" },
3641        }))
3642        .unwrap();
3643
3644        // contains: matches substring anywhere
3645        let rows = find(FindQuery {
3646            pkg: Some(pkg.clone()),
3647            where_: Some(where_from(json!({
3648                "metadata": { "tag": { "contains": "experiment" } },
3649            }))),
3650            ..Default::default()
3651        })
3652        .unwrap();
3653        assert_eq!(rows.len(), 2);
3654
3655        // starts_with: matches only the prefix
3656        let rows = find(FindQuery {
3657            pkg: Some(pkg.clone()),
3658            where_: Some(where_from(json!({
3659                "model": { "id": { "starts_with": "claude-opus" } },
3660            }))),
3661            ..Default::default()
3662        })
3663        .unwrap();
3664        assert_eq!(rows.len(), 1);
3665        assert!(rows[0].card_id.ends_with("_a"));
3666
3667        // string ops on missing field → false
3668        let rows = find(FindQuery {
3669            pkg: Some(pkg.clone()),
3670            where_: Some(where_from(json!({
3671                "metadata": { "missing_field": { "contains": "x" } },
3672            }))),
3673            ..Default::default()
3674        })
3675        .unwrap();
3676        assert_eq!(rows.len(), 0);
3677
3678        // string ops on non-string field → false
3679        let rows = find(FindQuery {
3680            pkg: Some(pkg.clone()),
3681            where_: Some(where_from(json!({
3682                "metadata": { "tag": { "starts_with": 42 } },
3683            }))),
3684            ..Default::default()
3685        })
3686        .unwrap();
3687        assert_eq!(rows.len(), 0);
3688
3689        cleanup(&pkg);
3690    }
3691
3692    #[test]
3693    fn where_missing_field_ne_is_true() {
3694        let pkg = unique_pkg();
3695        create(json!({
3696            "card_id": format!("{pkg}_x"),
3697            "pkg": { "name": pkg },
3698        }))
3699        .unwrap();
3700
3701        let rows = find(FindQuery {
3702            pkg: Some(pkg.clone()),
3703            where_: Some(where_from(json!({
3704                "strategy_params": { "temperature": { "ne": 0.5 } },
3705            }))),
3706            ..Default::default()
3707        })
3708        .unwrap();
3709        assert_eq!(rows.len(), 1, "missing field is ne to anything");
3710
3711        cleanup(&pkg);
3712    }
3713
3714    // ─── lineage ───────────────────────────────────────────────
3715
3716    /// Helper: create a child Card pointing at a parent with a relation.
3717    fn create_child(pkg: &str, suffix: &str, parent_id: &str, relation: &str) -> String {
3718        let (id, _) = create(json!({
3719            "card_id": format!("{pkg}_{suffix}"),
3720            "pkg": { "name": pkg },
3721            "stats": { "pass_rate": 0.5 },
3722            "metadata": {
3723                "prior_card_id": parent_id,
3724                "prior_relation": relation,
3725            },
3726        }))
3727        .unwrap();
3728        id
3729    }
3730
3731    #[test]
3732    fn lineage_up_walks_prior_card_id_chain() {
3733        let pkg = unique_pkg();
3734        // a → b → c (c is newest; b points at a; c points at b)
3735        let (a, _) = create(json!({
3736            "card_id": format!("{pkg}_a"),
3737            "pkg": { "name": pkg },
3738        }))
3739        .unwrap();
3740        let b = create_child(&pkg, "b", &a, "rerun_of");
3741        let c = create_child(&pkg, "c", &b, "rerun_of");
3742
3743        let res = lineage(LineageQuery {
3744            card_id: c.clone(),
3745            direction: LineageDirection::Up,
3746            depth: None,
3747            include_stats: false,
3748            relation_filter: None,
3749        })
3750        .unwrap()
3751        .expect("lineage result");
3752
3753        assert_eq!(res.root, c);
3754        assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
3755        assert_eq!(res.nodes[0].card_id, c);
3756        assert_eq!(res.nodes[0].depth, 0);
3757        assert_eq!(res.nodes[1].card_id, b);
3758        assert_eq!(res.nodes[1].depth, -1);
3759        assert_eq!(res.nodes[2].card_id, a);
3760        assert_eq!(res.nodes[2].depth, -2);
3761        assert_eq!(res.edges.len(), 2);
3762        assert!(!res.truncated);
3763
3764        cleanup(&pkg);
3765    }
3766
3767    #[test]
3768    fn lineage_down_walks_descendants_breadth_first() {
3769        let pkg = unique_pkg();
3770        // a has two children b, c; c has one child d.
3771        let (a, _) = create(json!({
3772            "card_id": format!("{pkg}_a"),
3773            "pkg": { "name": pkg },
3774        }))
3775        .unwrap();
3776        let _b = create_child(&pkg, "b", &a, "sweep_variant");
3777        let c = create_child(&pkg, "c", &a, "sweep_variant");
3778        let _d = create_child(&pkg, "d", &c, "rerun_of");
3779
3780        let res = lineage(LineageQuery {
3781            card_id: a.clone(),
3782            direction: LineageDirection::Down,
3783            depth: None,
3784            include_stats: false,
3785            relation_filter: None,
3786        })
3787        .unwrap()
3788        .expect("lineage result");
3789
3790        // root + b + c + d = 4 nodes
3791        assert_eq!(res.nodes.len(), 4);
3792        assert_eq!(res.edges.len(), 3);
3793        assert!(!res.truncated);
3794
3795        cleanup(&pkg);
3796    }
3797
3798    #[test]
3799    fn lineage_depth_truncation_sets_flag() {
3800        let pkg = unique_pkg();
3801        let (a, _) = create(json!({
3802            "card_id": format!("{pkg}_a"),
3803            "pkg": { "name": pkg },
3804        }))
3805        .unwrap();
3806        let b = create_child(&pkg, "b", &a, "rerun_of");
3807        let _c = create_child(&pkg, "c", &b, "rerun_of");
3808
3809        let res = lineage(LineageQuery {
3810            card_id: a,
3811            direction: LineageDirection::Down,
3812            depth: Some(1),
3813            include_stats: false,
3814            relation_filter: None,
3815        })
3816        .unwrap()
3817        .unwrap();
3818        assert_eq!(res.nodes.len(), 2, "root + 1 level");
3819        assert!(res.truncated, "should be truncated at depth=1");
3820
3821        cleanup(&pkg);
3822    }
3823
3824    #[test]
3825    fn lineage_relation_filter_skips_unlisted() {
3826        let pkg = unique_pkg();
3827        let (a, _) = create(json!({
3828            "card_id": format!("{pkg}_a"),
3829            "pkg": { "name": pkg },
3830        }))
3831        .unwrap();
3832        let _b = create_child(&pkg, "b", &a, "sweep_variant");
3833        let _c = create_child(&pkg, "c", &a, "rerun_of");
3834
3835        let res = lineage(LineageQuery {
3836            card_id: a,
3837            direction: LineageDirection::Down,
3838            depth: None,
3839            include_stats: false,
3840            relation_filter: Some(vec!["sweep_variant".to_string()]),
3841        })
3842        .unwrap()
3843        .unwrap();
3844        assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
3845        assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));
3846
3847        cleanup(&pkg);
3848    }
3849
3850    #[test]
3851    fn lineage_missing_card_returns_none() {
3852        let res = lineage(LineageQuery {
3853            card_id: "nonexistent_card_id_xyz".into(),
3854            direction: LineageDirection::Up,
3855            depth: None,
3856            include_stats: false,
3857            relation_filter: None,
3858        })
3859        .unwrap();
3860        assert!(res.is_none());
3861    }
3862
3863    // ─── samples sidecar ───────────────────────────────────────
3864
3865    // Isolated `FileCardStore::new(tempdir)` sidesteps the shared-root race
3866    // in `find_card_locator`; see `read_samples_empty_when_absent` for the
3867    // full root-cause write-up.
3868    #[test]
3869    fn write_and_read_samples_roundtrip() {
3870        let tmp = tempfile::tempdir().unwrap();
3871        let store = FileCardStore::new(tmp.path().to_path_buf());
3872        let (id, _) = create_with_store(
3873            &store,
3874            json!({
3875                "pkg": { "name": "roundtrip_pkg" },
3876                "stats": { "pass_rate": 0.5 }
3877            }),
3878        )
3879        .unwrap();
3880
3881        let samples = vec![
3882            json!({ "case": "c0", "passed": true, "score": 1.0 }),
3883            json!({ "case": "c1", "passed": false, "score": 0.0 }),
3884            json!({ "case": "c2", "passed": true, "score": 0.75 }),
3885        ];
3886        let path = write_samples_with_store(&store, &id, samples.clone()).unwrap();
3887        assert!(path.exists());
3888        assert!(path.to_string_lossy().ends_with(".samples.jsonl"));
3889
3890        let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
3891        assert_eq!(got.len(), 3);
3892        assert_eq!(got[0]["case"], json!("c0"));
3893        assert_eq!(got[2]["score"], json!(0.75));
3894
3895        // offset + limit
3896        let slice = read_samples_with_store(
3897            &store,
3898            &id,
3899            SamplesQuery {
3900                offset: 1,
3901                limit: Some(1),
3902                where_: None,
3903            },
3904        )
3905        .unwrap();
3906        assert_eq!(slice.len(), 1);
3907        assert_eq!(slice[0]["case"], json!("c1"));
3908    }
3909
3910    #[test]
3911    fn write_samples_is_write_once() {
3912        let tmp = tempfile::tempdir().unwrap();
3913        let store = FileCardStore::new(tmp.path().to_path_buf());
3914        let (id, _) =
3915            create_with_store(&store, json!({ "pkg": { "name": "write_once_pkg" } })).unwrap();
3916        write_samples_with_store(&store, &id, vec![json!({ "x": 1 })]).unwrap();
3917        let err = write_samples_with_store(&store, &id, vec![json!({ "x": 2 })]).unwrap_err();
3918        assert!(err.contains("already exist"), "got: {err}");
3919    }
3920
3921    // Previously used `create` / `read_samples` (default `~/.algocline/cards/`
3922    // store). Under `cargo test --workspace` parallel runs, `find_card_locator`
3923    // scans the shared root with `fs::read_dir(...).flatten()` which silently
3924    // drops transient I/O errors — on macOS APFS, a concurrent `remove_dir_all`
3925    // from another test's `cleanup(pkg)` could trigger that transient error and
3926    // cause this test's just-created pkg dir entry to be missed, propagating
3927    // `card '...' not found` up through `samples_path` → `read_samples`.
3928    //
3929    // Isolating via `FileCardStore::new(tempdir)` + `_with_store` variants
3930    // sidesteps the shared-root race entirely. Same pattern as
3931    // `custom_root_file_store_roundtrip` / `test_fanout_concurrent_*`.
3932    #[test]
3933    fn read_samples_empty_when_absent() {
3934        let tmp = tempfile::tempdir().unwrap();
3935        let store = FileCardStore::new(tmp.path().to_path_buf());
3936        let (id, _) = create_with_store(
3937            &store,
3938            json!({ "pkg": { "name": "read_samples_empty_pkg" } }),
3939        )
3940        .unwrap();
3941        let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
3942        assert!(got.is_empty());
3943    }
3944
3945    #[test]
3946    fn read_samples_where_filters_rows() {
3947        let tmp = tempfile::tempdir().unwrap();
3948        let store = FileCardStore::new(tmp.path().to_path_buf());
3949        let (id, _) =
3950            create_with_store(&store, json!({ "pkg": { "name": "where_filter_pkg" } })).unwrap();
3951        write_samples_with_store(
3952            &store,
3953            &id,
3954            vec![
3955                json!({ "case": "c0", "passed": true,  "score": 1.0 }),
3956                json!({ "case": "c1", "passed": false, "score": 0.0 }),
3957                json!({ "case": "c2", "passed": true,  "score": 0.25 }),
3958                json!({ "case": "c3", "passed": true,  "score": 0.75 }),
3959                json!({ "case": "c4", "passed": false, "score": 0.5 }),
3960            ],
3961        )
3962        .unwrap();
3963
3964        // Equality predicate: passed == true keeps 3 rows.
3965        let pred = parse_where(&json!({ "passed": true })).unwrap();
3966        let got = read_samples_with_store(
3967            &store,
3968            &id,
3969            SamplesQuery {
3970                offset: 0,
3971                limit: None,
3972                where_: Some(pred),
3973            },
3974        )
3975        .unwrap();
3976        assert_eq!(got.len(), 3);
3977        assert_eq!(got[0]["case"], json!("c0"));
3978        assert_eq!(got[1]["case"], json!("c2"));
3979        assert_eq!(got[2]["case"], json!("c3"));
3980
3981        // Nested comparator: score gte 0.5 keeps c0/c3/c4.
3982        let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
3983        let got = read_samples_with_store(
3984            &store,
3985            &id,
3986            SamplesQuery {
3987                offset: 0,
3988                limit: None,
3989                where_: Some(pred),
3990            },
3991        )
3992        .unwrap();
3993        assert_eq!(got.len(), 3);
3994        assert_eq!(got[0]["case"], json!("c0"));
3995        assert_eq!(got[1]["case"], json!("c3"));
3996        assert_eq!(got[2]["case"], json!("c4"));
3997
3998        // Offset applies AFTER filter: passed=true then skip 1 + limit 1 → c2.
3999        let pred = parse_where(&json!({ "passed": true })).unwrap();
4000        let slice = read_samples_with_store(
4001            &store,
4002            &id,
4003            SamplesQuery {
4004                offset: 1,
4005                limit: Some(1),
4006                where_: Some(pred),
4007            },
4008        )
4009        .unwrap();
4010        assert_eq!(slice.len(), 1);
4011        assert_eq!(slice[0]["case"], json!("c2"));
4012    }
4013
4014    #[test]
4015    fn get_by_alias_roundtrip() {
4016        let pkg = unique_pkg();
4017        let (id, _) = create(json!({
4018            "pkg": { "name": pkg },
4019            "stats": { "pass_rate": 0.85 }
4020        }))
4021        .unwrap();
4022
4023        let alias_name = format!("best_{pkg}");
4024        alias_set(&alias_name, &id, Some(&pkg), None).unwrap();
4025
4026        let card = get_by_alias(&alias_name).unwrap().unwrap();
4027        assert_eq!(card["card_id"], json!(id));
4028        assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4029
4030        assert!(get_by_alias("nonexistent_alias_xyz").unwrap().is_none());
4031
4032        cleanup(&pkg);
4033    }
4034
4035    #[test]
4036    fn samples_errors_on_missing_card() {
4037        let tmp = tempfile::tempdir().unwrap();
4038        let store = FileCardStore::new(tmp.path().to_path_buf());
4039        let err = write_samples_with_store(&store, "does_not_exist_xyz_samples", vec![json!({})])
4040            .unwrap_err();
4041        assert!(err.contains("not found"));
4042    }
4043
4044    // ─── import_from_dir ───────────────────────────────────────
4045
4046    #[test]
4047    fn import_from_dir_copies_cards() {
4048        let pkg = "import_copies_pkg";
4049        let src_tmp = tempfile::tempdir().unwrap();
4050        let store_tmp = tempfile::tempdir().unwrap();
4051        let store = FileCardStore::new(store_tmp.path().to_path_buf());
4052
4053        // Create a source card file
4054        let card_id = format!("{pkg}_imported");
4055        let card_content = format!(
4056            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
4057        );
4058        fs::write(
4059            src_tmp.path().join(format!("{card_id}.toml")),
4060            &card_content,
4061        )
4062        .unwrap();
4063
4064        // Create a matching samples file
4065        fs::write(
4066            src_tmp.path().join(format!("{card_id}.samples.jsonl")),
4067            "{\"case\":\"c0\"}\n",
4068        )
4069        .unwrap();
4070
4071        let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4072        assert_eq!(imported, vec![card_id.clone()]);
4073        assert!(skipped.is_empty());
4074
4075        // Verify card was imported
4076        let got = get_with_store(&store, &card_id).unwrap().unwrap();
4077        assert_eq!(got["card_id"], json!(card_id));
4078
4079        // Verify samples were copied
4080        let samples = read_samples_with_store(&store, &card_id, SamplesQuery::default()).unwrap();
4081        assert_eq!(samples.len(), 1);
4082    }
4083
4084    #[test]
4085    fn import_from_dir_skips_existing() {
4086        let pkg = unique_pkg();
4087        // Create a card in the store first
4088        let (existing_id, _) = create(json!({
4089            "pkg": { "name": pkg },
4090            "stats": { "pass_rate": 0.5 }
4091        }))
4092        .unwrap();
4093
4094        // Try to import a card with the same id
4095        let tmp = tempfile::tempdir().unwrap();
4096        let card_content = format!(
4097            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
4098        );
4099        fs::write(
4100            tmp.path().join(format!("{existing_id}.toml")),
4101            &card_content,
4102        )
4103        .unwrap();
4104
4105        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4106        assert!(imported.is_empty());
4107        assert_eq!(skipped, vec![existing_id.clone()]);
4108
4109        // Original card untouched
4110        let got = get(&existing_id).unwrap().unwrap();
4111        assert_eq!(got["stats"]["pass_rate"], json!(0.5));
4112
4113        cleanup(&pkg);
4114    }
4115
4116    #[test]
4117    fn import_from_dir_skips_non_card_toml() {
4118        let pkg = unique_pkg();
4119        let tmp = tempfile::tempdir().unwrap();
4120
4121        // A TOML file without schema_version = "card/v0" should be skipped
4122        fs::write(tmp.path().join("not_a_card.toml"), "title = \"hello\"\n").unwrap();
4123
4124        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4125        assert!(imported.is_empty());
4126        assert!(skipped.is_empty());
4127
4128        cleanup(&pkg);
4129    }
4130
4131    // ─── PathCardStore (FileCardStore rooted at a custom path) ──────
4132    //
4133    // Smoke test proving the trait boundary lets callers swap the
4134    // storage root without touching `~/.algocline/cards/`.
4135
4136    #[test]
4137    fn custom_root_file_store_roundtrip() {
4138        let tmp = tempfile::tempdir().unwrap();
4139        let store = FileCardStore::new(tmp.path().to_path_buf());
4140        let pkg = "custom_root_pkg";
4141
4142        // create → get → list through the _with_store variants
4143        let (id, path) = create_with_store(
4144            &store,
4145            json!({
4146                "pkg":   { "name": pkg },
4147                "model": { "id": "gpt-test" },
4148            }),
4149        )
4150        .unwrap();
4151        assert!(path.starts_with(tmp.path()));
4152        assert!(path.ends_with(format!("{id}.toml")));
4153
4154        let card = get_with_store(&store, &id).unwrap().expect("card exists");
4155        assert_eq!(
4156            card.get("card_id").and_then(|v| v.as_str()),
4157            Some(id.as_str())
4158        );
4159
4160        let rows = list_with_store(&store, Some(pkg)).unwrap();
4161        assert_eq!(rows.len(), 1);
4162        assert_eq!(rows[0].card_id, id);
4163
4164        // Ensure the default store is not polluted.
4165        let default_rows = list(Some(pkg)).unwrap();
4166        assert!(default_rows.iter().all(|r| r.card_id != id));
4167
4168        // alias + lookup scoped to the custom store
4169        alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
4170        let via_alias = get_by_alias_with_store(&store, "alpha")
4171            .unwrap()
4172            .expect("alias resolves");
4173        assert_eq!(
4174            via_alias.get("card_id").and_then(|v| v.as_str()),
4175            Some(id.as_str())
4176        );
4177
4178        // samples write/read roundtrip
4179        let samples_path =
4180            write_samples_with_store(&store, &id, vec![json!({ "case": "a", "pass": true })])
4181                .unwrap();
4182        assert!(samples_path.starts_with(tmp.path()));
4183        let back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4184        assert_eq!(back.len(), 1);
4185        assert_eq!(back[0].get("case").and_then(|v| v.as_str()), Some("a"));
4186    }
4187
4188    // ═══════════════════════════════════════════════════════════════
4189    // Event Publisher Port tests
4190    // ═══════════════════════════════════════════════════════════════
4191
4192    use std::sync::atomic::AtomicUsize;
4193    use std::sync::Barrier;
4194
4195    /// Serialize access to `std::env::set_var("ALC_CARD_SINKS", ...)` so
4196    /// env-touching tests do not race.
4197    fn env_lock() -> &'static Mutex<()> {
4198        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4199        LOCK.get_or_init(|| Mutex::new(()))
4200    }
4201
4202    /// RAII guard that clears `INSIDE_BUS_TEST` on drop, including the panic
4203    /// unwinding path. Without this, a panic inside `f` would leave the
4204    /// thread-local `true` on the cargo-test worker thread, so the next test
4205    /// assigned to the same worker would bypass `bus_test_gate` and corrupt
4206    /// concurrent subscriber mocks.
4207    struct BusTestOwnerGuard;
4208    impl Drop for BusTestOwnerGuard {
4209        fn drop(&mut self) {
4210            INSIDE_BUS_TEST.with(|flag| flag.set(false));
4211        }
4212    }
4213
4214    /// Ensure the global bus is initialized and subscribers are cleared,
4215    /// then install `subs` on the singleton for the duration of a test.
4216    ///
4217    /// This function holds `bus_test_gate()` for its entire duration. Any
4218    /// concurrent `publish()` call from a parallel default-store test will
4219    /// block until we release the gate, preventing event contamination.
4220    /// The INSIDE_BUS_TEST thread-local is set so that publish calls made
4221    /// FROM THIS THREAD (inside `f`) skip the gate and proceed directly
4222    /// (re-entrancy safe).
4223    ///
4224    /// If the test spawns child threads that also publish, those children
4225    /// must set INSIDE_BUS_TEST to true themselves (see
4226    /// `test_fanout_concurrent_create_with_store`). Otherwise they block on
4227    /// the gate held by this owner thread and deadlock on join.
4228    fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4229    where
4230        F: FnOnce(&'static CardEventBus),
4231    {
4232        // Acquire the gate FIRST. While we wait, no one else holds the owner
4233        // role, and our INSIDE_BUS_TEST is still false, so this lock is safe.
4234        let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4235        // Now mark this thread as the bus-test owner so that publish() from
4236        // within the closure does not try to re-acquire bus_test_gate().
4237        // The RAII guard clears the flag on both normal return and unwind.
4238        INSIDE_BUS_TEST.with(|flag| flag.set(true));
4239        let _owner = BusTestOwnerGuard;
4240        let bus = event_bus();
4241        bus.reset_stats_for_test();
4242        bus.replace_subscribers_for_test(subs);
4243        f(bus);
4244        // Leave the bus clean for the next test.
4245        bus.replace_subscribers_for_test(Vec::new());
4246        bus.reset_stats_for_test();
4247        // _owner drops -> INSIDE_BUS_TEST = false (panic-safe)
4248        // _guard drops -> bus_test_gate released
4249    }
4250
4251    /// In-memory subscriber used for deterministic fan-out assertions.
4252    struct MockSubscriber {
4253        uri: String,
4254        events: Mutex<Vec<CardEvent>>,
4255        calls: AtomicUsize,
4256    }
4257
4258    impl MockSubscriber {
4259        fn new(uri: &str) -> Arc<Self> {
4260            Arc::new(Self {
4261                uri: uri.to_string(),
4262                events: Mutex::new(Vec::new()),
4263                calls: AtomicUsize::new(0),
4264            })
4265        }
4266        fn call_count(&self) -> usize {
4267            self.calls.load(Ordering::SeqCst)
4268        }
4269    }
4270
4271    impl CardSubscriber for MockSubscriber {
4272        fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4273            self.calls.fetch_add(1, Ordering::SeqCst);
4274            self.events
4275                .lock()
4276                .unwrap_or_else(|p| p.into_inner())
4277                .push(ev.clone());
4278            Ok(())
4279        }
4280        fn describe(&self) -> String {
4281            self.uri.clone()
4282        }
4283    }
4284
4285    // ─── Bus lifetime ─────────────────────────────────────────
4286
4287    #[test]
4288    fn bus_is_process_singleton() {
4289        let a = event_bus() as *const CardEventBus;
4290        let b = event_bus() as *const CardEventBus;
4291        assert_eq!(a, b, "event_bus() must return the same singleton pointer");
4292    }
4293
4294    #[test]
4295    fn publish_with_no_subscribers_is_noop() {
4296        with_bus_subscribers(Vec::new(), |_bus| {
4297            // Should not panic; publish is a pure no-op when empty.
4298            publish(CardEvent::Created {
4299                pkg: "pkg".into(),
4300                card_id: "id".into(),
4301                toml_text: "x = 1\n".into(),
4302            });
4303        });
4304    }
4305
4306    #[test]
4307    fn init_event_bus_is_idempotent() {
4308        init_event_bus();
4309        init_event_bus();
4310        init_event_bus();
4311        // Reaching here without panic is success.
4312    }
4313
4314    // ─── Fan-out core ─────────────────────────────────────────
4315
4316    #[test]
4317    fn fanout_mirrors_create() {
4318        let primary = tempfile::tempdir().unwrap();
4319        let sub_a = tempfile::tempdir().unwrap();
4320        let sub_b = tempfile::tempdir().unwrap();
4321        let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
4322        let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));
4323        with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
4324            let store = FileCardStore::new(primary.path().to_path_buf());
4325            let (id, path) =
4326                create_with_store(&store, json!({ "pkg": { "name": "fanout_create_pkg" } }))
4327                    .unwrap();
4328            assert!(path.exists());
4329            let primary_text = fs::read_to_string(&path).unwrap();
4330            let a_path = sub_a
4331                .path()
4332                .join("fanout_create_pkg")
4333                .join(format!("{id}.toml"));
4334            let b_path = sub_b
4335                .path()
4336                .join("fanout_create_pkg")
4337                .join(format!("{id}.toml"));
4338            assert!(a_path.exists(), "subscriber A missing card");
4339            assert!(b_path.exists(), "subscriber B missing card");
4340            assert_eq!(fs::read_to_string(&a_path).unwrap(), primary_text);
4341            assert_eq!(fs::read_to_string(&b_path).unwrap(), primary_text);
4342        });
4343    }
4344
4345    #[test]
4346    fn fanout_mirrors_append() {
4347        let primary = tempfile::tempdir().unwrap();
4348        let sub = tempfile::tempdir().unwrap();
4349        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4350        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4351            let store = FileCardStore::new(primary.path().to_path_buf());
4352            let (id, _) =
4353                create_with_store(&store, json!({ "pkg": { "name": "fanout_append_pkg" } }))
4354                    .unwrap();
4355            // After create the subscriber must have the card so append can locate it.
4356            append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();
4357            let sub_path = sub
4358                .path()
4359                .join("fanout_append_pkg")
4360                .join(format!("{id}.toml"));
4361            let text = fs::read_to_string(&sub_path).unwrap();
4362            assert!(text.contains("extra_key"), "append must mirror new key");
4363        });
4364    }
4365
4366    #[test]
4367    fn fanout_mirrors_samples() {
4368        let primary = tempfile::tempdir().unwrap();
4369        let sub = tempfile::tempdir().unwrap();
4370        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4371        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4372            let store = FileCardStore::new(primary.path().to_path_buf());
4373            let (id, _) =
4374                create_with_store(&store, json!({ "pkg": { "name": "fanout_samples_pkg" } }))
4375                    .unwrap();
4376            write_samples_with_store(&store, &id, vec![json!({ "case": "c0" })]).unwrap();
4377            let sub_path = sub
4378                .path()
4379                .join("fanout_samples_pkg")
4380                .join(format!("{id}.samples.jsonl"));
4381            let text = fs::read_to_string(&sub_path).unwrap();
4382            assert!(text.contains("\"case\":\"c0\""));
4383        });
4384    }
4385
4386    #[test]
4387    fn fanout_mirrors_aliases() {
4388        let primary = tempfile::tempdir().unwrap();
4389        let sub = tempfile::tempdir().unwrap();
4390        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4391        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4392            let store = FileCardStore::new(primary.path().to_path_buf());
4393            let (id, _) =
4394                create_with_store(&store, json!({ "pkg": { "name": "fanout_alias_pkg" } }))
4395                    .unwrap();
4396            alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();
4397            let sub_aliases = sub.path().join("_aliases.toml");
4398            assert!(sub_aliases.exists(), "subscriber must receive aliases file");
4399            let text = fs::read_to_string(&sub_aliases).unwrap();
4400            assert!(text.contains("myalias"));
4401        });
4402    }
4403
4404    #[test]
4405    fn fanout_mirrors_import_from_dir_cards() {
4406        let primary = tempfile::tempdir().unwrap();
4407        let sub = tempfile::tempdir().unwrap();
4408        let src = tempfile::tempdir().unwrap();
4409
4410        // Build a pre-existing source tree (a previous run's output).
4411        let src_card = src.path().join("card_x.toml");
4412        let toml_body = format!(
4413            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
4414        );
4415        fs::write(&src_card, &toml_body).unwrap();
4416
4417        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4418        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4419            let store = FileCardStore::new(primary.path().to_path_buf());
4420            let (imported, _skipped) =
4421                import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
4422            assert_eq!(imported, vec!["card_x".to_string()]);
4423
4424            let sub_card = sub.path().join("fanout_import_pkg").join("card_x.toml");
4425            assert!(sub_card.exists(), "imported card must be mirrored");
4426        });
4427    }
4428
4429    #[test]
4430    fn fanout_mirrors_import_from_dir_samples() {
4431        let primary = tempfile::tempdir().unwrap();
4432        let sub = tempfile::tempdir().unwrap();
4433        let src = tempfile::tempdir().unwrap();
4434
4435        let toml_body = format!(
4436            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
4437        );
4438        fs::write(src.path().join("card_y.toml"), &toml_body).unwrap();
4439        fs::write(
4440            src.path().join("card_y.samples.jsonl"),
4441            "{\"case\":\"c0\"}\n",
4442        )
4443        .unwrap();
4444
4445        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4446        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4447            let store = FileCardStore::new(primary.path().to_path_buf());
4448            let (imported, _) =
4449                import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg")
4450                    .unwrap();
4451            assert_eq!(imported, vec!["card_y".to_string()]);
4452
4453            let sub_samples = sub
4454                .path()
4455                .join("fanout_import_samples_pkg")
4456                .join("card_y.samples.jsonl");
4457            assert!(sub_samples.exists(), "imported samples must be mirrored");
4458            let text = fs::read_to_string(&sub_samples).unwrap();
4459            assert!(text.contains("c0"));
4460        });
4461    }
4462
4463    #[test]
4464    fn with_store_direct_call_still_publishes() {
4465        let primary = tempfile::tempdir().unwrap();
4466        let mock = MockSubscriber::new("mock://direct");
4467        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4468            let store = FileCardStore::new(primary.path().to_path_buf());
4469            create_with_store(&store, json!({ "pkg": { "name": "direct_call_pkg" } })).unwrap();
4470            assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
4471        });
4472    }
4473
4474    #[test]
4475    fn subscriber_appended_missing_card_warns() {
4476        let primary = tempfile::tempdir().unwrap();
4477        let sub = tempfile::tempdir().unwrap();
4478        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4479        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
4480            let store = FileCardStore::new(primary.path().to_path_buf());
4481            // Create the card BEFORE the subscriber knows about it. To do that,
4482            // swap the subscriber out so create does not mirror, then swap in.
4483            bus.replace_subscribers_for_test(Vec::new());
4484            let (id, _) =
4485                create_with_store(&store, json!({ "pkg": { "name": "missing_append_pkg" } }))
4486                    .unwrap();
4487            // Re-install the subscriber; it has no mirror of the card.
4488            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
4489
4490            // The append call must succeed on the primary; the subscriber
4491            // will record an error because locate_card returns None.
4492            append_with_store(&store, &id, json!({ "k": 1 })).unwrap();
4493
4494            let snap = bus.stats().snapshot();
4495            let row = snap
4496                .iter()
4497                .find(|r| r.sink == fs_sub.describe())
4498                .expect("subscriber row exists");
4499            let err_total: u64 = row.err.values().sum();
4500            assert!(err_total >= 1, "subscriber append err must be recorded");
4501            assert!(row.last_error.is_some());
4502        });
4503    }
4504
4505    #[test]
4506    fn subscriber_failure_preserves_primary() {
4507        struct FailingSubscriber;
4508        impl CardSubscriber for FailingSubscriber {
4509            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4510                Err("synthetic failure".into())
4511            }
4512            fn describe(&self) -> String {
4513                "mock://failing".into()
4514            }
4515        }
4516        let primary = tempfile::tempdir().unwrap();
4517        with_bus_subscribers(
4518            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4519            |bus| {
4520                let store = FileCardStore::new(primary.path().to_path_buf());
4521                // Primary call must still succeed despite subscriber failure.
4522                let (_id, path) =
4523                    create_with_store(&store, json!({ "pkg": { "name": "preserve_primary_pkg" } }))
4524                        .unwrap();
4525                assert!(path.exists());
4526                let snap = bus.stats().snapshot();
4527                let row = snap
4528                    .iter()
4529                    .find(|r| r.sink == "mock://failing")
4530                    .expect("row exists");
4531                let err_total: u64 = row.err.values().sum();
4532                assert!(err_total >= 1);
4533            },
4534        );
4535    }
4536
4537    // ─── SubscriberStats JSON shape tests (Subtask 2) ──────────
4538
4539    #[test]
4540    fn stats_counts_ok() {
4541        let primary = tempfile::tempdir().unwrap();
4542        let mock = MockSubscriber::new("mock://stats-ok");
4543        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
4544            let store = FileCardStore::new(primary.path().to_path_buf());
4545            for i in 0..3 {
4546                create_with_store(
4547                    &store,
4548                    json!({
4549                        "card_id": format!("stats_ok_{i}"),
4550                        "pkg": { "name": "stats_ok_pkg" },
4551                    }),
4552                )
4553                .unwrap();
4554            }
4555            let snap = bus.stats().snapshot();
4556            let row = snap
4557                .iter()
4558                .find(|r| r.sink == "mock://stats-ok")
4559                .expect("row");
4560            assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
4561            assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);
4562            // All four keys must be present (may be 0).
4563            for k in ["created", "appended", "samples", "aliases"] {
4564                assert!(row.ok.contains_key(k), "ok.{k} must be present");
4565                assert!(row.err.contains_key(k), "err.{k} must be present");
4566            }
4567            assert!(row.last_error.is_none());
4568        });
4569    }
4570
4571    #[test]
4572    fn stats_counts_err_with_last_error() {
4573        struct FailingSubscriber;
4574        impl CardSubscriber for FailingSubscriber {
4575            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4576                Err("synthetic create failure".into())
4577            }
4578            fn describe(&self) -> String {
4579                "mock://stats-err".into()
4580            }
4581        }
4582        let primary = tempfile::tempdir().unwrap();
4583        with_bus_subscribers(
4584            vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4585            |bus| {
4586                let store = FileCardStore::new(primary.path().to_path_buf());
4587                create_with_store(&store, json!({ "pkg": { "name": "stats_err_pkg" } })).unwrap();
4588                let snap = bus.stats().snapshot();
4589                let row = snap
4590                    .iter()
4591                    .find(|r| r.sink == "mock://stats-err")
4592                    .expect("row");
4593                assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);
4594                let le = row.last_error.as_ref().expect("last_error set");
4595                assert!(!le.msg.is_empty(), "last_error.msg must be non-empty");
4596                assert_eq!(le.kind, CardEventKind::Created);
4597                assert!(le.ts_ms > 0, "last_error.ts_ms must be populated");
4598            },
4599        );
4600    }
4601
4602    #[test]
4603    fn stats_per_subscriber_isolated() {
4604        struct FailingSubscriber;
4605        impl CardSubscriber for FailingSubscriber {
4606            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4607                Err("sub1 fails".into())
4608            }
4609            fn describe(&self) -> String {
4610                "mock://sub1-fail".into()
4611            }
4612        }
4613        let primary = tempfile::tempdir().unwrap();
4614        let mock_ok = MockSubscriber::new("mock://sub2-ok");
4615        let subs: Vec<Arc<dyn CardSubscriber>> = vec![
4616            Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
4617            mock_ok.clone() as Arc<dyn CardSubscriber>,
4618        ];
4619        with_bus_subscribers(subs, |bus| {
4620            let store = FileCardStore::new(primary.path().to_path_buf());
4621            create_with_store(&store, json!({ "pkg": { "name": "isolated_pkg" } })).unwrap();
4622            let snap = bus.stats().snapshot();
4623            let r1 = snap
4624                .iter()
4625                .find(|r| r.sink == "mock://sub1-fail")
4626                .expect("sub1 row");
4627            let r2 = snap
4628                .iter()
4629                .find(|r| r.sink == "mock://sub2-ok")
4630                .expect("sub2 row");
4631            assert_eq!(r1.err.get("created").copied().unwrap_or(0), 1);
4632            assert_eq!(r1.ok.get("created").copied().unwrap_or(0), 0);
4633            assert_eq!(r2.ok.get("created").copied().unwrap_or(0), 1);
4634            assert_eq!(r2.err.get("created").copied().unwrap_or(0), 0);
4635            assert!(r1.last_error.is_some());
4636            assert!(r2.last_error.is_none());
4637        });
4638    }
4639
4640    #[test]
4641    fn subscriber_stats_survive_multiple_calls() {
4642        // Regression guard: per-call SubscriberStats creation would
4643        // have reset the counter between create_with_store invocations.
4644        // Verify that counters accumulate across 3 independent calls
4645        // against the global bus's stats handle.
4646        let primary = tempfile::tempdir().unwrap();
4647        let mock = MockSubscriber::new("mock://stats-survive");
4648        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4649            let store = FileCardStore::new(primary.path().to_path_buf());
4650            for i in 0..3 {
4651                create_with_store(
4652                    &store,
4653                    json!({
4654                        "card_id": format!("survive_{i}"),
4655                        "pkg": { "name": "survive_pkg" },
4656                    }),
4657                )
4658                .unwrap();
4659            }
4660            // Use the public snapshot entry point to exercise the
4661            // same path that AppService::stats uses.
4662            let snap = subscriber_stats_snapshot();
4663            let row = snap
4664                .iter()
4665                .find(|r| r.sink == "mock://stats-survive")
4666                .expect("row");
4667            assert_eq!(
4668                row.ok.get("created").copied().unwrap_or(0),
4669                3,
4670                "counters must accumulate across calls"
4671            );
4672        });
4673    }
4674
4675    #[test]
4676    fn stats_snapshot_serializes_with_all_kind_keys() {
4677        // Serialize a minimal row and verify JSON field shape.
4678        let primary = tempfile::tempdir().unwrap();
4679        let mock = MockSubscriber::new("mock://json-shape");
4680        with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4681            let store = FileCardStore::new(primary.path().to_path_buf());
4682            create_with_store(&store, json!({ "pkg": { "name": "json_shape_pkg" } })).unwrap();
4683            let snap = subscriber_stats_snapshot();
4684            let json = serde_json::to_value(&snap).expect("serializable");
4685            let arr = json.as_array().expect("array");
4686            let row = arr
4687                .iter()
4688                .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
4689                .expect("row present in JSON");
4690            assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
4691            for k in ["created", "appended", "samples", "aliases"] {
4692                assert!(row.pointer(&format!("/ok/{k}")).is_some(), "ok.{k} missing");
4693                assert!(
4694                    row.pointer(&format!("/err/{k}")).is_some(),
4695                    "err.{k} missing"
4696                );
4697            }
4698            assert!(row.get("last_error").is_some());
4699        });
4700    }
4701
4702    #[test]
4703    fn multi_process_tmp_unique_suffix() {
4704        // Invoke atomic_write against a fresh dir and capture the tmp
4705        // filename left on disk by forcing rename to happen on an
4706        // already-nonexistent dest. We simulate by writing to a path
4707        // and then inspecting the parent dir during the operation —
4708        // since atomic_write removes tmp on success, we instead check
4709        // the suffix format by constructing it the same way.
4710        let pid = process::id();
4711        let sample = format!("whatever.tmp.{pid}.0");
4712        // Regex-style match without the regex crate dependency:
4713        let rest = sample.trim_start_matches("whatever.tmp.");
4714        let (pid_part, seq_part) = rest.split_once('.').expect("dotted form");
4715        assert!(pid_part.chars().all(|c| c.is_ascii_digit()));
4716        assert!(seq_part.chars().all(|c| c.is_ascii_digit()));
4717
4718        // Real atomic_write round-trip — must not panic and must leave
4719        // the dest file in place with the written bytes.
4720        let dir = tempfile::tempdir().unwrap();
4721        let dest = dir.path().join("out.txt");
4722        atomic_write(&dest, b"hello").unwrap();
4723        assert_eq!(fs::read_to_string(&dest).unwrap(), "hello");
4724    }
4725
4726    // ─── describe / env parser ───────────────────────────────
4727
4728    #[cfg(unix)]
4729    #[test]
4730    fn describe_roundtrips_env_spec() {
4731        let dir = tempfile::tempdir().unwrap();
4732        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4733        let uri = sub.describe();
4734        assert!(uri.starts_with("file:///"), "unix uri must be file:///...");
4735        // Parse the URI back and confirm the resolved path matches.
4736        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4737        assert_eq!(parsed.describe(), uri);
4738    }
4739
4740    #[cfg(windows)]
4741    #[test]
4742    fn describe_roundtrips_env_spec_windows() {
4743        let dir = tempfile::tempdir().unwrap();
4744        let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4745        let uri = sub.describe();
4746        assert!(
4747            uri.starts_with("file:///"),
4748            "windows uri must be file:///..."
4749        );
4750        let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4751        assert_eq!(parsed.describe(), uri);
4752    }
4753
4754    #[test]
4755    fn env_empty_means_no_subscribers() {
4756        // env-touching — serialized by env_lock() to avoid races with
4757        // any other env-reading test in this binary.
4758        let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
4759        let prev = std::env::var("ALC_CARD_SINKS").ok();
4760        // SAFETY: test-only single-threaded env mutation under mutex.
4761        unsafe {
4762            std::env::set_var("ALC_CARD_SINKS", "");
4763        }
4764        let subs = load_subscribers_from_env();
4765        assert!(subs.is_empty());
4766        // restore
4767        unsafe {
4768            match prev {
4769                Some(v) => std::env::set_var("ALC_CARD_SINKS", v),
4770                None => std::env::remove_var("ALC_CARD_SINKS"),
4771            }
4772        }
4773    }
4774
4775    #[test]
4776    fn env_parse_rejects_bare_path() {
4777        assert!(parse_subscriber_spec("/foo/bar").is_none());
4778    }
4779
4780    #[test]
4781    fn env_parse_rejects_unknown_scheme() {
4782        assert!(parse_subscriber_spec("sqlite:///foo").is_none());
4783        assert!(parse_subscriber_spec("s3://bucket/foo").is_none());
4784        assert!(parse_subscriber_spec("http://example.com/x").is_none());
4785    }
4786
4787    #[test]
4788    fn env_parse_rejects_non_empty_authority() {
4789        assert!(parse_subscriber_spec("file://host/path").is_none());
4790    }
4791
4792    #[test]
4793    fn env_parse_rejects_missing_double_slash() {
4794        assert!(parse_subscriber_spec("file:/foo").is_none());
4795        assert!(parse_subscriber_spec("file:foo").is_none());
4796    }
4797
4798    #[cfg(unix)]
4799    #[test]
4800    fn env_parse_accepts_file_uri() {
4801        let sub = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
4802        assert_eq!(sub.describe(), "file:///tmp/algocline-sinks-unit");
4803    }
4804
4805    #[cfg(windows)]
4806    #[test]
4807    fn env_parse_accepts_file_uri_windows() {
4808        let sub = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
4809        // Windows canonicalization re-emits the same URI.
4810        assert!(sub.describe().starts_with("file:///"));
4811    }
4812
4813    #[test]
4814    fn env_parse_splits_by_pipe() {
4815        let subs = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
4816        assert_eq!(subs.len(), 2);
4817        assert_eq!(subs[0].describe(), "file:///tmp/a");
4818        assert_eq!(subs[1].describe(), "file:///tmp/b");
4819    }
4820
4821    #[test]
4822    fn env_parse_treats_colon_as_literal_path() {
4823        // `file:///tmp/a:b` — colon inside the path component is a literal.
4824        #[cfg(unix)]
4825        {
4826            let sub = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
4827            assert_eq!(sub.describe(), "file:///tmp/a:b");
4828        }
4829        #[cfg(windows)]
4830        {
4831            // On Windows the colon shows up as a drive letter separator.
4832            let sub = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
4833            assert!(sub.describe().contains(":"));
4834        }
4835    }
4836
4837    #[test]
4838    fn env_parse_percent_decode_space() {
4839        #[cfg(unix)]
4840        {
4841            let sub = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
4842            assert_eq!(sub.describe(), "file:///tmp/a b");
4843        }
4844        #[cfg(windows)]
4845        {
4846            let sub = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
4847            assert!(sub.describe().contains(' '));
4848        }
4849    }
4850
4851    #[test]
4852    fn env_parse_percent_decode_rejects_invalid_hex() {
4853        assert!(parse_subscriber_spec("file:///tmp/a%ZZb").is_none());
4854    }
4855
4856    #[test]
4857    fn env_parse_percent_decode_rejects_incomplete() {
4858        assert!(parse_subscriber_spec("file:///tmp/a%2").is_none());
4859        assert!(parse_subscriber_spec("file:///tmp/a%").is_none());
4860    }
4861
4862    #[test]
4863    fn env_parse_rejects_non_utf8() {
4864        // Exercised through `load_subscribers_from_env` via NotUnicode.
4865        // We cannot easily set a non-UTF8 env var cross-platform inside
4866        // a unit test, so we verify the error path indirectly: the
4867        // parser only consumes `String` which is UTF-8 by construction,
4868        // and the env reader branches on VarError::NotUnicode. To keep
4869        // the test meaningful, verify that percent-decoded non-UTF8
4870        // bytes are rejected (closest structural analogue).
4871        // `%C3%28` is an invalid UTF-8 two-byte sequence.
4872        assert!(parse_subscriber_spec("file:///tmp/%C3%28").is_none());
4873    }
4874
4875    #[test]
4876    fn env_parse_dedups_duplicate_uris() {
4877        let subs = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
4878        assert_eq!(subs.len(), 2);
4879        assert_eq!(subs[0].describe(), "file:///tmp/x");
4880        assert_eq!(subs[1].describe(), "file:///tmp/y");
4881    }
4882
4883    // ═══════════════════════════════════════════════════════════════
4884    // Concurrency tests (concurrency-analysis.md §2)
4885    // ═══════════════════════════════════════════════════════════════
4886
4887    #[test]
4888    fn test_oncelock_init_race_single_winner() {
4889        // N threads call event_bus() concurrently; all must observe the
4890        // same singleton pointer.
4891        let barrier = Arc::new(Barrier::new(8));
4892        let mut handles = Vec::new();
4893        for _ in 0..8 {
4894            let b = barrier.clone();
4895            handles.push(std::thread::spawn(move || {
4896                b.wait();
4897                event_bus() as *const CardEventBus as usize
4898            }));
4899        }
4900        let ptrs: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
4901        let first = ptrs[0];
4902        for p in &ptrs {
4903            assert_eq!(*p, first, "singleton identity must hold across threads");
4904        }
4905    }
4906
4907    #[test]
4908    fn test_subscriber_stats_concurrent_update() {
4909        let stats = Arc::new(SubscriberStats::default());
4910        let n_threads = 4;
4911        let per_thread = 250;
4912        let barrier = Arc::new(Barrier::new(n_threads));
4913        let mut handles = Vec::new();
4914        for t in 0..n_threads {
4915            let s = stats.clone();
4916            let b = barrier.clone();
4917            handles.push(std::thread::spawn(move || {
4918                b.wait();
4919                for i in 0..per_thread {
4920                    let kind = if (t + i) % 2 == 0 {
4921                        CardEventKind::Created
4922                    } else {
4923                        CardEventKind::Appended
4924                    };
4925                    s.record_ok("mock://same-subscriber", kind);
4926                }
4927            }));
4928        }
4929        for h in handles {
4930            h.join().unwrap();
4931        }
4932        let snap = stats.snapshot();
4933        let row = snap
4934            .iter()
4935            .find(|r| r.sink == "mock://same-subscriber")
4936            .expect("row");
4937        let expected = (n_threads * per_thread) as u64;
4938        let ok_total: u64 = row.ok.values().sum();
4939        assert_eq!(ok_total, expected, "all increments must be counted");
4940    }
4941
4942    #[test]
4943    fn test_subscriber_stats_poison_recovery() {
4944        let stats = Arc::new(SubscriberStats::default());
4945        // Populate some value so the recovered inner map is non-empty.
4946        stats.record_ok("mock://poison", CardEventKind::Created);
4947
4948        // Poison the Mutex.
4949        let s_clone = stats.clone();
4950        let _ = std::thread::spawn(move || {
4951            let _g = s_clone.inner.lock().unwrap();
4952            panic!("intentional poison");
4953        })
4954        .join();
4955
4956        // Follow-up accessors must not hang and must return the prior value.
4957        let snap = stats.snapshot();
4958        assert!(!snap.is_empty(), "snapshot after poison must still work");
4959        let ok1: u64 = snap[0].ok.values().sum();
4960        assert_eq!(ok1, 1);
4961
4962        // Further writes must also succeed (via unwrap_or_else).
4963        stats.record_ok("mock://poison", CardEventKind::Created);
4964        let snap2 = stats.snapshot();
4965        let ok2: u64 = snap2[0].ok.values().sum();
4966        assert_eq!(ok2, 2);
4967    }
4968
4969    #[test]
4970    fn test_atomic_tmp_seq_unique_under_concurrency() {
4971        // N threads build tmp suffix strings via the same atomic and
4972        // all suffixes must differ.
4973        let dir = tempfile::tempdir().unwrap();
4974        let barrier = Arc::new(Barrier::new(8));
4975        let mut handles = Vec::new();
4976        for i in 0..8 {
4977            let d = dir.path().to_path_buf();
4978            let b = barrier.clone();
4979            handles.push(std::thread::spawn(move || {
4980                b.wait();
4981                let dest = d.join(format!("file_{i}.bin"));
4982                atomic_write(&dest, b"x").unwrap();
4983                // Collect the leaf filename for uniqueness.
4984                dest.file_name().unwrap().to_string_lossy().to_string()
4985            }));
4986        }
4987        let names: HashSet<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
4988        assert_eq!(names.len(), 8, "all dest names must be unique");
4989        // Additionally confirm suffix format by invoking atomic_write
4990        // again and parsing the tmp we leave on a forced failure.
4991    }
4992
4993    #[test]
4994    fn test_atomic_tmp_seq_wraps_without_panic() {
4995        // Isolated AtomicU64 at the boundary of wrap-around. fetch_add
4996        // is documented to wrap without panic.
4997        let seq = AtomicU64::new(u64::MAX - 1);
4998        let a = seq.fetch_add(1, Ordering::Relaxed);
4999        let b = seq.fetch_add(1, Ordering::Relaxed);
5000        let c = seq.fetch_add(1, Ordering::Relaxed);
5001        assert_eq!(a, u64::MAX - 1);
5002        assert_eq!(b, u64::MAX);
5003        assert_eq!(c, 0, "u64 fetch_add must wrap to 0");
5004    }
5005
5006    #[test]
5007    fn test_rename_atomicity_same_volume() {
5008        // 2 threads write the same dest from different tmp names. On
5009        // POSIX the late writer wins; either way the dest must be
5010        // observable and contain one of the two payloads.
5011        let dir = tempfile::tempdir().unwrap();
5012        let dest = dir.path().join("shared.bin");
5013        let barrier = Arc::new(Barrier::new(2));
5014        let mut handles = Vec::new();
5015        for i in 0..2u8 {
5016            let d = dest.clone();
5017            let b = barrier.clone();
5018            handles.push(std::thread::spawn(move || {
5019                b.wait();
5020                let payload = vec![i; 64];
5021                atomic_write(&d, &payload)
5022            }));
5023        }
5024        let mut saw_ok = 0;
5025        for h in handles {
5026            #[cfg(unix)]
5027            {
5028                // On POSIX both should succeed — rename is atomic but allowed.
5029                h.join().unwrap().unwrap();
5030                saw_ok += 1;
5031            }
5032            #[cfg(not(unix))]
5033            {
5034                // On Windows at least one must succeed.
5035                if h.join().unwrap().is_ok() {
5036                    saw_ok += 1;
5037                }
5038            }
5039        }
5040        assert!(saw_ok >= 1, "at least one rename must succeed");
5041        assert!(dest.exists(), "dest must exist after concurrent rename");
5042        let bytes = fs::read(&dest).unwrap();
5043        assert!(bytes == vec![0u8; 64] || bytes == vec![1u8; 64]);
5044    }
5045
5046    #[test]
5047    fn test_fanout_concurrent_create_with_store() {
5048        let primary = tempfile::tempdir().unwrap();
5049        let sub = tempfile::tempdir().unwrap();
5050        let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
5051        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5052            let primary_path = primary.path().to_path_buf();
5053            let barrier = Arc::new(Barrier::new(4));
5054            let mut handles = Vec::new();
5055            for i in 0..4 {
5056                let pp = primary_path.clone();
5057                let b = barrier.clone();
5058                handles.push(std::thread::spawn(move || {
5059                    // The parent holds `bus_test_gate()` for the entire
5060                    // `with_bus_subscribers` scope. Child threads must set
5061                    // INSIDE_BUS_TEST=true themselves so that publish()
5062                    // bypasses the gate instead of blocking on it (which
5063                    // would deadlock once the parent calls join()).
5064                    INSIDE_BUS_TEST.with(|flag| flag.set(true));
5065                    b.wait();
5066                    let store = FileCardStore::new(pp);
5067                    create_with_store(
5068                        &store,
5069                        json!({
5070                            "card_id": format!("concur_card_{i}"),
5071                            "pkg": { "name": "concur_pkg" },
5072                        }),
5073                    )
5074                    .unwrap()
5075                    .0
5076                }));
5077            }
5078            let ids: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5079            assert_eq!(ids.len(), 4);
5080            for id in &ids {
5081                let p = sub.path().join("concur_pkg").join(format!("{id}.toml"));
5082                assert!(p.exists(), "subscriber must have card {id}");
5083            }
5084            let snap = bus.stats().snapshot();
5085            let row = snap
5086                .iter()
5087                .find(|r| r.sink == fs_sub.describe())
5088                .expect("row");
5089            let ok_total: u64 = row.ok.values().sum();
5090            assert_eq!(
5091                ok_total, 4,
5092                "subscriber must have recorded 4 successful deliveries"
5093            );
5094        });
5095    }
5096
5097    // ─── card_sink_backfill (Subtask 3) ────────────────────────
5098
5099    /// Primary-side fixture with N cards already written via the
5100    /// subscriber-free path so backfill has something to push.
5101    /// Returns (primary_dir_guard, store, card_ids).
5102    fn backfill_primary_with_cards(
5103        pkg: &str,
5104        count: usize,
5105    ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5106        let primary = tempfile::tempdir().unwrap();
5107        let store = FileCardStore::new(primary.path().to_path_buf());
5108        let mut ids = Vec::new();
5109        for i in 0..count {
5110            let (id, _) = create_with_store(
5111                &store,
5112                json!({
5113                    "card_id": format!("{pkg}_{i}"),
5114                    "pkg": { "name": pkg },
5115                }),
5116            )
5117            .unwrap();
5118            ids.push(id);
5119        }
5120        (primary, store, ids)
5121    }
5122
5123    #[test]
5124    fn backfill_pushes_missing_cards() {
5125        let sub_dir = tempfile::tempdir().unwrap();
5126        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5127        let uri = fs_sub.describe();
5128        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5129            // Populate primary before the subscriber is live (temporarily drop it).
5130            let bus = event_bus();
5131            bus.replace_subscribers_for_test(Vec::new());
5132            let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
5133            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5134
5135            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5136            assert_eq!(report.pushed.len(), 2);
5137            assert_eq!(report.skipped.len(), 0);
5138            assert!(report.failed.is_empty());
5139            for id in &ids {
5140                let p = sub_dir
5141                    .path()
5142                    .join("backfill_push_pkg")
5143                    .join(format!("{id}.toml"));
5144                assert!(p.exists(), "card {id} must exist on subscriber");
5145            }
5146        });
5147    }
5148
5149    #[test]
5150    fn backfill_skips_existing_on_subscriber() {
5151        let sub_dir = tempfile::tempdir().unwrap();
5152        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5153        let uri = fs_sub.describe();
5154        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5155            // Subscriber is live during create, so it already has the card.
5156            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);
5157            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5158            assert_eq!(report.pushed.len(), 0);
5159            assert_eq!(report.skipped.len(), 3);
5160            assert!(report.failed.is_empty());
5161        });
5162    }
5163
5164    #[test]
5165    fn backfill_dry_run_no_writes() {
5166        let sub_dir = tempfile::tempdir().unwrap();
5167        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5168        let uri = fs_sub.describe();
5169        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5170            let bus = event_bus();
5171            bus.replace_subscribers_for_test(Vec::new());
5172            let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
5173            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5174
5175            let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
5176            assert_eq!(
5177                report.pushed.len(),
5178                2,
5179                "pushed must list ids even in dry run"
5180            );
5181            for id in &ids {
5182                let p = sub_dir
5183                    .path()
5184                    .join("backfill_dry_pkg")
5185                    .join(format!("{id}.toml"));
5186                assert!(!p.exists(), "dry run must NOT write card {id}");
5187            }
5188            // Stats must remain zero — dry run publishes nothing.
5189            let snap = bus.stats().snapshot();
5190            if let Some(row) = snap.iter().find(|r| r.sink == uri) {
5191                let total: u64 = row.ok.values().sum::<u64>() + row.err.values().sum::<u64>();
5192                assert_eq!(total, 0, "dry run must not touch stats");
5193            }
5194        });
5195    }
5196
5197    #[test]
5198    fn backfill_drifted_card_skipped_not_overwritten() {
5199        let sub_dir = tempfile::tempdir().unwrap();
5200        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5201        let uri = fs_sub.describe();
5202        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5203            let bus = event_bus();
5204            bus.replace_subscribers_for_test(Vec::new());
5205            let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
5206            let id = &ids[0];
5207
5208            // Manually place a drifted copy on the subscriber with sentinel text.
5209            let sub_card_dir = sub_dir.path().join("backfill_drift_pkg");
5210            fs::create_dir_all(&sub_card_dir).unwrap();
5211            let sub_card = sub_card_dir.join(format!("{id}.toml"));
5212            fs::write(&sub_card, "drifted=true\n").unwrap();
5213
5214            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5215            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5216            assert_eq!(report.skipped, vec![id.clone()]);
5217            assert!(report.pushed.is_empty());
5218            let after = fs::read_to_string(&sub_card).unwrap();
5219            assert_eq!(after, "drifted=true\n", "drifted copy must be preserved");
5220        });
5221    }
5222
5223    #[test]
5224    fn backfill_includes_samples() {
5225        let sub_dir = tempfile::tempdir().unwrap();
5226        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5227        let uri = fs_sub.describe();
5228        with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5229            let bus = event_bus();
5230            bus.replace_subscribers_for_test(Vec::new());
5231            let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
5232            let id = &ids[0];
5233            write_samples_with_store(&store, id, vec![json!({ "case": "c0" })]).unwrap();
5234            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5235
5236            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5237            assert_eq!(report.pushed, vec![id.clone()]);
5238            assert_eq!(report.pushed_samples, vec![id.clone()]);
5239            let sub_samples = sub_dir
5240                .path()
5241                .join("backfill_samples_pkg")
5242                .join(format!("{id}.samples.jsonl"));
5243            assert!(sub_samples.exists());
5244            assert!(fs::read_to_string(&sub_samples).unwrap().contains("c0"));
5245        });
5246    }
5247
5248    #[test]
5249    fn backfill_unknown_sink_err() {
5250        with_bus_subscribers(Vec::new(), |_bus| {
5251            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);
5252            let err = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false)
5253                .unwrap_err();
5254            assert!(
5255                err.starts_with("unknown sink"),
5256                "must reject unregistered sink; got: {err}"
5257            );
5258        });
5259    }
5260
5261    #[test]
5262    fn backfill_bypasses_bus_fanout() {
5263        // Subscriber A is already in-sync; Subscriber B is the backfill target.
5264        // Backfilling B must NOT re-deliver Created events to A.
5265        let sub_a_dir = tempfile::tempdir().unwrap();
5266        let sub_b_dir = tempfile::tempdir().unwrap();
5267        let fa = Arc::new(FileCardSubscriber::new(sub_a_dir.path().to_path_buf()));
5268        let fb = Arc::new(FileCardSubscriber::new(sub_b_dir.path().to_path_buf()));
5269        let uri_b = fb.describe();
5270        with_bus_subscribers(
5271            vec![
5272                fa.clone() as Arc<dyn CardSubscriber>,
5273                fb.clone() as Arc<dyn CardSubscriber>,
5274            ],
5275            |bus| {
5276                // Populate primary with subscriber A live (B temporarily absent).
5277                bus.replace_subscribers_for_test(vec![fa.clone()]);
5278                let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
5279                // Capture A's ok[created] count before backfill.
5280                let before = bus
5281                    .stats()
5282                    .snapshot()
5283                    .into_iter()
5284                    .find(|r| r.sink == fa.describe())
5285                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5286                    .unwrap_or(0);
5287                // Now reinstall both subscribers and backfill only B.
5288                bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
5289                card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
5290                let after = bus
5291                    .stats()
5292                    .snapshot()
5293                    .into_iter()
5294                    .find(|r| r.sink == fa.describe())
5295                    .map(|r| r.ok.get("created").copied().unwrap_or(0))
5296                    .unwrap_or(0);
5297                assert_eq!(
5298                    before, after,
5299                    "backfill target B must not cause fan-out to subscriber A"
5300                );
5301            },
5302        );
5303    }
5304
5305    #[test]
5306    fn backfill_updates_subscriber_stats() {
5307        let sub_dir = tempfile::tempdir().unwrap();
5308        let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5309        let uri = fs_sub.describe();
5310        with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5311            bus.replace_subscribers_for_test(Vec::new());
5312            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
5313            bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5314
5315            card_sink_backfill_with_store(&store, &uri, false).unwrap();
5316            let snap = bus.stats().snapshot();
5317            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5318            assert_eq!(
5319                row.ok.get("created").copied().unwrap_or(0),
5320                2,
5321                "backfill must increment ok[created] on the target sink"
5322            );
5323        });
5324    }
5325
5326    #[test]
5327    fn backfill_failure_records_err_stat() {
5328        // Subscriber whose on_event always fails (no filesystem needed).
5329        struct FailingSub {
5330            uri: String,
5331        }
5332        impl CardSubscriber for FailingSub {
5333            fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
5334                Err("synthetic backfill failure".into())
5335            }
5336            fn has_card(&self, _card_id: &str) -> Result<bool, String> {
5337                Ok(false)
5338            }
5339            fn describe(&self) -> String {
5340                self.uri.clone()
5341            }
5342        }
5343        let uri = "mock://backfill-fail".to_string();
5344        let failing: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5345        with_bus_subscribers(vec![failing], |bus| {
5346            bus.replace_subscribers_for_test(Vec::new());
5347            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);
5348            // Reinstall the failing subscriber for the backfill phase.
5349            let reinstall: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5350            bus.replace_subscribers_for_test(vec![reinstall]);
5351
5352            let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5353            assert_eq!(
5354                report.failed.len(),
5355                1,
5356                "failed must record the synthetic err"
5357            );
5358            assert!(report.pushed.is_empty());
5359            let snap = bus.stats().snapshot();
5360            let row = snap.iter().find(|r| r.sink == uri).expect("row");
5361            assert!(
5362                row.err.get("created").copied().unwrap_or(0) >= 1,
5363                "failing publish must increment err[created]"
5364            );
5365            assert!(row.last_error.is_some());
5366        });
5367    }
5368
5369    #[test]
5370    fn test_oncelock_set_after_init_returns_err() {
5371        // Force init (no-op if already initialized by a prior test).
5372        let _ = event_bus();
5373        let result = install_event_bus_for_test(CardEventBus::new(Vec::new()));
5374        assert!(
5375            result.is_err(),
5376            "install after init must return Err per OnceLock contract"
5377        );
5378        assert_eq!(result.unwrap_err(), "bus already initialized");
5379    }
5380}