1use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
98pub trait CardStore: Send + Sync {
122 fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
133
134 fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
140
141 fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;
143
144 fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;
146
147 fn list_card_locators(
154 &self,
155 pkg_filter: Option<&str>,
156 ) -> Result<Vec<(String, PathBuf)>, String>;
157
158 fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;
162
163 fn read_aliases(&self) -> Result<Vec<Alias>, String>;
166 fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;
167
168 fn samples_exists(&self, card_id: &str) -> Result<bool, String>;
172
173 fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;
179
180 fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;
183
184 fn import_from_dir(
190 &self,
191 source_dir: &Path,
192 pkg: &str,
193 ) -> Result<(Vec<String>, Vec<String>), String>;
194}
195
196fn validate_name(name: &str, kind: &str) -> Result<(), String> {
197 if name.is_empty()
198 || name.contains('/')
199 || name.contains('\\')
200 || name.contains("..")
201 || name.contains('\0')
202 {
203 return Err(format!("Invalid {kind} name: '{name}'"));
204 }
205 Ok(())
206}
207
208fn djb2_hex(s: &str) -> String {
210 let mut h: u64 = 5381;
211 for b in s.bytes() {
212 h = h.wrapping_mul(33).wrapping_add(b as u64);
213 }
214 format!("{h:016x}")
215}
216
217fn hash6(s: &str) -> String {
224 let hex = djb2_hex(s);
225 let start = hex.len().saturating_sub(6);
226 hex[start..].to_string()
227}
228
229fn stable_json(v: &Json) -> String {
231 let mut buf = String::new();
232 stable_json_into(v, &mut buf);
233 buf
234}
235fn stable_json_into(v: &Json, buf: &mut String) {
236 match v {
237 Json::Null => buf.push_str("null"),
238 Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
239 Json::Number(n) => buf.push_str(&n.to_string()),
240 Json::String(s) => {
241 buf.push('"');
242 buf.push_str(s);
243 buf.push('"');
244 }
245 Json::Array(a) => {
246 buf.push('[');
247 for (i, item) in a.iter().enumerate() {
248 if i > 0 {
249 buf.push(',');
250 }
251 stable_json_into(item, buf);
252 }
253 buf.push(']');
254 }
255 Json::Object(m) => {
256 let mut keys: Vec<&String> = m.keys().collect();
257 keys.sort();
258 buf.push('{');
259 for (i, k) in keys.iter().enumerate() {
260 if i > 0 {
261 buf.push(',');
262 }
263 buf.push('"');
264 buf.push_str(k);
265 buf.push_str("\":");
266 stable_json_into(&m[*k], buf);
267 }
268 buf.push('}');
269 }
270 }
271}
272
273fn short_model(id: &str) -> String {
276 if id.is_empty() {
277 return "model".into();
278 }
279 let stripped = id
281 .strip_prefix("claude-")
282 .or_else(|| id.strip_prefix("gpt-"))
283 .unwrap_or(id);
284 let s: String = stripped
286 .chars()
287 .filter(|c| c.is_ascii_alphanumeric())
288 .collect();
289 if s.is_empty() {
290 "model".into()
291 } else {
292 s
293 }
294}
295
296fn now_rfc3339() -> String {
298 let secs = std::time::SystemTime::now()
299 .duration_since(std::time::UNIX_EPOCH)
300 .map(|d| d.as_secs())
301 .unwrap_or(0) as i64;
302 let days = secs.div_euclid(86400);
303 let tod = secs.rem_euclid(86400);
304 let (y, mo, d) = civil_from_days(days);
305 let hh = tod / 3600;
306 let mm = (tod % 3600) / 60;
307 let ss = tod % 60;
308 format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
309}
310
311fn now_compact() -> String {
317 let secs = std::time::SystemTime::now()
318 .duration_since(std::time::UNIX_EPOCH)
319 .map(|d| d.as_secs())
320 .unwrap_or(0) as i64;
321 let days = secs.div_euclid(86400);
322 let tod = secs.rem_euclid(86400);
323 let (y, mo, d) = civil_from_days(days);
324 let hh = tod / 3600;
325 let mm = (tod % 3600) / 60;
326 let ss = tod % 60;
327 format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
328}
329
330fn civil_from_days(z: i64) -> (i32, u32, u32) {
332 let z = z + 719468;
333 let era = if z >= 0 { z } else { z - 146096 } / 146097;
334 let doe = (z - era * 146097) as u64;
335 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
336 let y = yoe as i64 + era * 400;
337 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
338 let mp = (5 * doy + 2) / 153;
339 let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
340 let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
341 let y = y + if m <= 2 { 1 } else { 0 };
342 (y as i32, m, d)
343}
344
345fn json_to_toml(v: Json) -> Result<toml::Value, String> {
348 Ok(match v {
349 Json::Null => return Err("TOML does not support null values".into()),
350 Json::Bool(b) => toml::Value::Boolean(b),
351 Json::Number(n) => {
352 if let Some(i) = n.as_i64() {
353 toml::Value::Integer(i)
354 } else if let Some(f) = n.as_f64() {
355 toml::Value::Float(f)
356 } else {
357 return Err(format!("Unsupported number: {n}"));
358 }
359 }
360 Json::String(s) => toml::Value::String(s),
361 Json::Array(a) => {
362 let mut out = Vec::with_capacity(a.len());
363 for item in a {
364 if !item.is_null() {
365 out.push(json_to_toml(item)?);
366 }
367 }
368 toml::Value::Array(out)
369 }
370 Json::Object(m) => {
371 let mut table = toml::map::Map::new();
372 for (k, val) in m {
373 if val.is_null() {
374 continue;
375 }
376 table.insert(k, json_to_toml(val)?);
377 }
378 toml::Value::Table(table)
379 }
380 })
381}
382
383fn toml_to_json(v: toml::Value) -> Json {
385 match v {
386 toml::Value::String(s) => Json::String(s),
387 toml::Value::Integer(i) => json!(i),
388 toml::Value::Float(f) => json!(f),
389 toml::Value::Boolean(b) => Json::Bool(b),
390 toml::Value::Datetime(dt) => Json::String(dt.to_string()),
391 toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
392 toml::Value::Table(t) => {
393 let mut m = serde_json::Map::new();
394 for (k, v) in t {
395 m.insert(k, toml_to_json(v));
396 }
397 Json::Object(m)
398 }
399 }
400}
401
402fn require_pkg_name(input: &Json) -> Result<String, String> {
404 let name = input
405 .get("pkg")
406 .and_then(|p| p.get("name"))
407 .and_then(|n| n.as_str())
408 .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
409 .to_string();
410 validate_name(&name, "pkg")?;
411 Ok(name)
412}
413
414pub fn create_with_store(
416 store: &dyn CardStore,
417 mut input: Json,
418) -> Result<(String, PathBuf), String> {
419 if !input.is_object() {
420 return Err("alc.card.create: input must be a table".into());
421 }
422 let pkg_name = require_pkg_name(&input)?;
423 let obj = input.as_object_mut().unwrap();
424
425 obj.entry("schema_version".to_string())
427 .or_insert_with(|| json!(SCHEMA_VERSION));
428 obj.entry("created_at".to_string())
429 .or_insert_with(|| json!(now_rfc3339()));
430 obj.entry("created_by".to_string())
431 .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
432
433 if let Some(params) = obj.get("params").cloned() {
435 if params.is_object() {
436 let fp = djb2_hex(&stable_json(¶ms));
437 obj.insert("param_fingerprint".to_string(), json!(fp));
438 }
439 }
440
441 let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
443 Some(id) if !id.is_empty() => id.to_string(),
444 _ => {
445 let model_id = obj
446 .get("model")
447 .and_then(|m| m.get("id"))
448 .and_then(|v| v.as_str())
449 .unwrap_or("");
450 let model_short = short_model(model_id);
451 let ts = now_compact();
452 let fp_seed = stable_json(&Json::Object(obj.clone()));
453 let h = hash6(&fp_seed);
454 format!("{pkg_name}_{model_short}_{ts}_{h}")
455 }
456 };
457 validate_name(&card_id, "card_id")?;
458 obj.insert("card_id".to_string(), json!(card_id.clone()));
459
460 let toml_val = json_to_toml(input)?;
461 let text = toml::to_string_pretty(&toml_val)
462 .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
463 let path = store.write_new_card(&pkg_name, &card_id, &text)?;
464
465 publish(CardEvent::Created {
466 pkg: pkg_name.clone(),
467 card_id: card_id.clone(),
468 toml_text: text,
469 });
470
471 Ok((card_id, path))
472}
473
474pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
476 let text = match store.read_card_text(card_id)? {
477 Some(t) => t,
478 None => return Ok(None),
479 };
480 let val: toml::Value =
481 toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
482 Ok(Some(toml_to_json(val)))
483}
484
485#[derive(Debug, Clone)]
487pub struct Summary {
488 pub card_id: String,
489 pub pkg: String,
490 pub created_at: Option<String>,
491 pub model: Option<String>,
492 pub scenario: Option<String>,
493 pub pass_rate: Option<f64>,
494}
495
496impl Summary {
497 fn to_json(&self) -> Json {
498 let mut m = serde_json::Map::new();
499 m.insert("card_id".into(), json!(self.card_id));
500 m.insert("pkg".into(), json!(self.pkg));
501 if let Some(v) = &self.created_at {
502 m.insert("created_at".into(), json!(v));
503 }
504 if let Some(v) = &self.model {
505 m.insert("model".into(), json!(v));
506 }
507 if let Some(v) = &self.scenario {
508 m.insert("scenario".into(), json!(v));
509 }
510 if let Some(v) = self.pass_rate {
511 m.insert("pass_rate".into(), json!(v));
512 }
513 Json::Object(m)
514 }
515}
516
517fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
518 let text = store.read_locator_text(locator).ok().flatten()?;
519 let val: toml::Value = toml::from_str(&text).ok()?;
520 let card_id = val
521 .get("card_id")
522 .and_then(|v| v.as_str())
523 .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
524 .to_string();
525 let created_at = val
526 .get("created_at")
527 .and_then(|v| v.as_str())
528 .map(String::from);
529 let model = val
530 .get("model")
531 .and_then(|m| m.get("id"))
532 .and_then(|v| v.as_str())
533 .map(String::from);
534 let scenario = val
535 .get("scenario")
536 .and_then(|s| s.get("name"))
537 .and_then(|v| v.as_str())
538 .map(String::from);
539 let pass_rate = val
540 .get("stats")
541 .and_then(|s| s.get("pass_rate"))
542 .and_then(|v| v.as_float());
543 Some(Summary {
544 card_id,
545 pkg: pkg.to_string(),
546 created_at,
547 model,
548 scenario,
549 pass_rate,
550 })
551}
552
553pub fn list_with_store(
555 store: &dyn CardStore,
556 pkg_filter: Option<&str>,
557) -> Result<Vec<Summary>, String> {
558 let locators = store.list_card_locators(pkg_filter)?;
559 let mut out = Vec::with_capacity(locators.len());
560 for (pkg, loc) in &locators {
561 if let Some(s) = summarize(store, loc, pkg) {
562 out.push(s);
563 }
564 }
565
566 out.sort_by(|a, b| {
570 b.created_at
571 .cmp(&a.created_at)
572 .then_with(|| b.card_id.cmp(&a.card_id))
573 });
574 Ok(out)
575}
576
577pub fn summaries_to_json(rows: &[Summary]) -> Json {
578 Json::Array(rows.iter().map(|s| s.to_json()).collect())
579}
580
581pub fn append_with_store(
594 store: &dyn CardStore,
595 card_id: &str,
596 fields: Json,
597) -> Result<Json, String> {
598 let text = store
599 .read_card_text(card_id)?
600 .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
601 let fields_obj = match fields {
602 Json::Object(m) => m,
603 _ => return Err("alc.card.append: fields must be a table".into()),
604 };
605
606 let existing: toml::Value =
607 toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
608 let mut existing_json = toml_to_json(existing);
609 let existing_obj = existing_json
610 .as_object_mut()
611 .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
612
613 for (k, v) in fields_obj {
614 if existing_obj.contains_key(&k) {
615 return Err(format!(
616 "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
617 ));
618 }
619 if !v.is_null() {
620 existing_obj.insert(k, v);
621 }
622 }
623
624 let toml_val = json_to_toml(existing_json.clone())?;
625 let text = toml::to_string_pretty(&toml_val)
626 .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
627 store.overwrite_card(card_id, &text)?;
628
629 publish(CardEvent::Appended {
630 card_id: card_id.to_string(),
631 toml_text: text,
632 });
633
634 Ok(existing_json)
635}
636
637#[derive(Debug, Clone)]
638pub struct Alias {
639 pub name: String,
640 pub card_id: String,
641 pub pkg: Option<String>,
642 pub set_at: String,
643 pub note: Option<String>,
644}
645
646impl Alias {
647 fn to_json(&self) -> Json {
648 let mut m = serde_json::Map::new();
649 m.insert("name".into(), json!(self.name));
650 m.insert("card_id".into(), json!(self.card_id));
651 if let Some(p) = &self.pkg {
652 m.insert("pkg".into(), json!(p));
653 }
654 m.insert("set_at".into(), json!(self.set_at));
655 if let Some(n) = &self.note {
656 m.insert("note".into(), json!(n));
657 }
658 Json::Object(m)
659 }
660}
661
662pub fn alias_set_with_store(
668 store: &dyn CardStore,
669 name: &str,
670 card_id: &str,
671 pkg: Option<&str>,
672 note: Option<&str>,
673) -> Result<Alias, String> {
674 validate_name(name, "alias")?;
675 if store.find_card_locator(card_id)?.is_none() {
676 return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
677 }
678 let mut aliases = store.read_aliases()?;
679 aliases.retain(|a| a.name != name);
680 let entry = Alias {
681 name: name.to_string(),
682 card_id: card_id.to_string(),
683 pkg: pkg.map(String::from),
684 set_at: now_rfc3339(),
685 note: note.map(String::from),
686 };
687 aliases.push(entry.clone());
688 store.write_aliases(&aliases)?;
689
690 match serialize_aliases_toml(&aliases) {
696 Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
697 Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
698 }
699
700 Ok(entry)
701}
702
703fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
707 let mut arr = Vec::with_capacity(aliases.len());
708 for a in aliases {
709 let mut t = toml::map::Map::new();
710 t.insert("name".into(), toml::Value::String(a.name.clone()));
711 t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
712 if let Some(p) = &a.pkg {
713 t.insert("pkg".into(), toml::Value::String(p.clone()));
714 }
715 t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
716 if let Some(n) = &a.note {
717 t.insert("note".into(), toml::Value::String(n.clone()));
718 }
719 arr.push(toml::Value::Table(t));
720 }
721 let mut root = toml::map::Map::new();
722 root.insert("alias".into(), toml::Value::Array(arr));
723 toml::to_string_pretty(&toml::Value::Table(root))
724 .map_err(|e| format!("Failed to serialize aliases: {e}"))
725}
726
727pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
733 validate_name(name, "alias")?;
734 let aliases = store.read_aliases()?;
735 let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
736 return Ok(None);
737 };
738 match get_with_store(store, &alias.card_id)? {
739 Some(card) => Ok(Some(card)),
740 None => Err(format!(
741 "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
742 alias.card_id
743 )),
744 }
745}
746
747pub fn alias_list_with_store(
749 store: &dyn CardStore,
750 pkg_filter: Option<&str>,
751) -> Result<Vec<Alias>, String> {
752 let mut aliases = store.read_aliases()?;
753 if let Some(p) = pkg_filter {
754 aliases.retain(|a| a.pkg.as_deref() == Some(p));
755 }
756 Ok(aliases)
757}
758
759pub fn aliases_to_json(rows: &[Alias]) -> Json {
760 Json::Array(rows.iter().map(|a| a.to_json()).collect())
761}
762
763#[derive(Debug, Clone, PartialEq)]
797pub enum CmpOp {
798 Eq,
799 Ne,
800 Lt,
801 Lte,
802 Gt,
803 Gte,
804 In,
805 Nin,
806 Exists,
807 Contains,
808 StartsWith,
809}
810
811impl CmpOp {
812 fn from_key(k: &str) -> Option<Self> {
813 Some(match k {
814 "eq" => Self::Eq,
815 "ne" => Self::Ne,
816 "lt" => Self::Lt,
817 "lte" => Self::Lte,
818 "gt" => Self::Gt,
819 "gte" => Self::Gte,
820 "in" => Self::In,
821 "nin" => Self::Nin,
822 "exists" => Self::Exists,
823 "contains" => Self::Contains,
824 "starts_with" => Self::StartsWith,
825 _ => return None,
826 })
827 }
828}
829
830#[derive(Debug, Clone)]
833pub struct Comparison {
834 pub path: Vec<String>,
835 pub op: CmpOp,
836 pub value: Json,
837}
838
839#[derive(Debug, Clone)]
841pub enum Predicate {
842 And(Vec<Predicate>),
843 Or(Vec<Predicate>),
844 Not(Box<Predicate>),
845 Cmp(Comparison),
846}
847
848fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
851 if obj.is_empty() {
852 return false;
853 }
854 obj.keys().all(|k| CmpOp::from_key(k).is_some())
855}
856
857pub fn parse_where(value: &Json) -> Result<Predicate, String> {
862 parse_predicate(value, &[])
863}
864
865fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
866 let obj = value
867 .as_object()
868 .ok_or_else(|| "where clause must be a table".to_string())?;
869
870 let mut clauses: Vec<Predicate> = Vec::new();
871
872 for (key, val) in obj {
873 match key.as_str() {
874 "_and" => {
875 let arr = val
876 .as_array()
877 .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
878 let mut subs = Vec::with_capacity(arr.len());
879 for sub in arr {
880 subs.push(parse_predicate(sub, prefix)?);
881 }
882 clauses.push(Predicate::And(subs));
883 }
884 "_or" => {
885 let arr = val
886 .as_array()
887 .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
888 let mut subs = Vec::with_capacity(arr.len());
889 for sub in arr {
890 subs.push(parse_predicate(sub, prefix)?);
891 }
892 clauses.push(Predicate::Or(subs));
893 }
894 "_not" => {
895 clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
896 }
897 _ => {
898 let mut new_path = prefix.to_vec();
900 new_path.push(key.clone());
901
902 match val {
903 Json::Object(m) if is_operator_object(m) => {
904 for (op_key, op_val) in m {
906 let op = CmpOp::from_key(op_key).expect("validated above");
907 clauses.push(Predicate::Cmp(Comparison {
908 path: new_path.clone(),
909 op,
910 value: op_val.clone(),
911 }));
912 }
913 }
914 Json::Object(_) => {
915 clauses.push(parse_predicate(val, &new_path)?);
917 }
918 _ => {
919 clauses.push(Predicate::Cmp(Comparison {
921 path: new_path,
922 op: CmpOp::Eq,
923 value: val.clone(),
924 }));
925 }
926 }
927 }
928 }
929 }
930
931 if clauses.len() == 1 {
932 Ok(clauses.remove(0))
933 } else {
934 Ok(Predicate::And(clauses))
935 }
936}
937
938fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
940 let mut node = card;
941 for key in path {
942 let obj = node.as_object()?;
943 node = obj.get(key)?;
944 }
945 Some(node)
946}
947
948fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
951 match (a, b) {
952 (Json::Number(x), Json::Number(y)) => {
953 let xf = x.as_f64()?;
954 let yf = y.as_f64()?;
955 xf.partial_cmp(&yf)
956 }
957 (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
958 (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
959 _ => None,
960 }
961}
962
963fn json_eq(a: &Json, b: &Json) -> bool {
964 match (a, b) {
965 (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
966 (Some(xf), Some(yf)) => xf == yf,
967 _ => a == b,
968 },
969 _ => a == b,
970 }
971}
972
973fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
974 let actual = fetch_path(card, &cmp.path);
975 let exists = actual.is_some();
976
977 match cmp.op {
978 CmpOp::Exists => {
979 let want = cmp.value.as_bool().unwrap_or(true);
980 exists == want
981 }
982 CmpOp::Ne => match actual {
983 None => true,
984 Some(v) => !json_eq(v, &cmp.value),
985 },
986 CmpOp::Nin => match actual {
987 None => true,
988 Some(v) => match cmp.value.as_array() {
989 Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
990 None => false,
991 },
992 },
993 CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
994 CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
995 Some(arr) => arr.iter().any(|e| json_eq(e, v)),
996 None => false,
997 }),
998 CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
999 let Some(v) = actual else { return false };
1000 let Some(ord) = json_cmp(v, &cmp.value) else {
1001 return false;
1002 };
1003 use std::cmp::Ordering::{Equal, Greater, Less};
1004 matches!(
1005 (&cmp.op, ord),
1006 (CmpOp::Lt, Less)
1007 | (CmpOp::Lte, Less | Equal)
1008 | (CmpOp::Gt, Greater)
1009 | (CmpOp::Gte, Greater | Equal)
1010 )
1011 }
1012 CmpOp::Contains => {
1013 let Some(Json::String(haystack)) = actual else {
1014 return false;
1015 };
1016 let Some(needle) = cmp.value.as_str() else {
1017 return false;
1018 };
1019 haystack.contains(needle)
1020 }
1021 CmpOp::StartsWith => {
1022 let Some(Json::String(haystack)) = actual else {
1023 return false;
1024 };
1025 let Some(needle) = cmp.value.as_str() else {
1026 return false;
1027 };
1028 haystack.starts_with(needle)
1029 }
1030 }
1031}
1032
1033pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1035 match pred {
1036 Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1037 Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1038 Predicate::Not(sub) => !eval_predicate(sub, card),
1039 Predicate::Cmp(c) => eval_cmp(c, card),
1040 }
1041}
1042
1043#[derive(Debug, Clone)]
1049pub struct OrderKey {
1050 pub path: Vec<String>,
1051 pub desc: bool,
1052}
1053
1054impl OrderKey {
1055 fn parse(raw: &str) -> Result<Self, String> {
1056 if raw.is_empty() {
1057 return Err("order_by key must not be empty".into());
1058 }
1059 let (desc, rest) = if let Some(r) = raw.strip_prefix('-') {
1060 (true, r)
1061 } else {
1062 (false, raw)
1063 };
1064 let path: Vec<String> = rest.split('.').map(|s| s.to_string()).collect();
1065 if path.iter().any(|p| p.is_empty()) {
1066 return Err(format!("invalid order_by key: '{raw}'"));
1067 }
1068 Ok(Self { path, desc })
1069 }
1070}
1071
1072pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1076 match value {
1077 Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1078 Json::Array(arr) => {
1079 let mut out = Vec::with_capacity(arr.len());
1080 for v in arr {
1081 let s = v
1082 .as_str()
1083 .ok_or_else(|| "order_by array must contain strings".to_string())?;
1084 out.push(OrderKey::parse(s)?);
1085 }
1086 Ok(out)
1087 }
1088 _ => Err("order_by must be a string or array of strings".into()),
1089 }
1090}
1091
1092#[derive(Debug, Default, Clone)]
1094pub struct FindQuery {
1095 pub pkg: Option<String>,
1097 pub where_: Option<Predicate>,
1099 pub order_by: Vec<OrderKey>,
1101 pub limit: Option<usize>,
1102 pub offset: Option<usize>,
1103}
1104
1105#[derive(Debug, Clone)]
1111struct CardRow {
1112 full: Json,
1113 summary: Summary,
1114}
1115
1116fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1118 let text = store.read_locator_text(locator).ok().flatten()?;
1119 let val: toml::Value = toml::from_str(&text).ok()?;
1120 let json = toml_to_json(val);
1121
1122 let card_id = json
1123 .get("card_id")
1124 .and_then(|v| v.as_str())
1125 .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1126 .to_string();
1127 let created_at = json
1128 .get("created_at")
1129 .and_then(|v| v.as_str())
1130 .map(String::from);
1131 let model = json
1132 .get("model")
1133 .and_then(|m| m.get("id"))
1134 .and_then(|v| v.as_str())
1135 .map(String::from);
1136 let scenario = json
1137 .get("scenario")
1138 .and_then(|s| s.get("name"))
1139 .and_then(|v| v.as_str())
1140 .map(String::from);
1141 let pass_rate = json
1142 .get("stats")
1143 .and_then(|s| s.get("pass_rate"))
1144 .and_then(|v| v.as_f64());
1145
1146 Some(CardRow {
1147 full: json,
1148 summary: Summary {
1149 card_id,
1150 pkg: pkg.to_string(),
1151 created_at,
1152 model,
1153 scenario,
1154 pass_rate,
1155 },
1156 })
1157}
1158
1159fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1161 use std::cmp::Ordering;
1162 for k in keys {
1163 let va = fetch_path(&a.full, &k.path);
1164 let vb = fetch_path(&b.full, &k.path);
1165 let ord = match (va, vb) {
1166 (None, None) => Ordering::Equal,
1167 (None, Some(_)) => Ordering::Greater, (Some(_), None) => Ordering::Less,
1169 (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1170 };
1171 let ord = if k.desc { ord.reverse() } else { ord };
1172 if ord != Ordering::Equal {
1173 return ord;
1174 }
1175 }
1176 Ordering::Equal
1177}
1178
1179const SUMMARY_SORT_FIELDS: &[&str] = &[
1181 "card_id",
1182 "created_at",
1183 "stats.pass_rate",
1184 "scenario.name",
1185 "model.id",
1186];
1187
1188fn is_lightweight_query(q: &FindQuery) -> bool {
1191 q.where_.is_none()
1192 && q.order_by
1193 .iter()
1194 .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1195}
1196
1197fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1199 use std::cmp::Ordering;
1200 for k in keys {
1201 let key_str = k.path.join(".");
1202 let ord = match key_str.as_str() {
1203 "card_id" => a.card_id.cmp(&b.card_id),
1204 "created_at" => a.created_at.cmp(&b.created_at),
1205 "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1206 (None, None) => Ordering::Equal,
1207 (None, Some(_)) => Ordering::Greater,
1208 (Some(_), None) => Ordering::Less,
1209 (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1210 },
1211 "scenario.name" => a.scenario.cmp(&b.scenario),
1212 "model.id" => a.model.cmp(&b.model),
1213 _ => Ordering::Equal,
1214 };
1215 let ord = if k.desc { ord.reverse() } else { ord };
1216 if ord != Ordering::Equal {
1217 return ord;
1218 }
1219 }
1220 Ordering::Equal
1221}
1222
1223pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1229 if is_lightweight_query(&q) {
1231 let mut rows = list_with_store(store, q.pkg.as_deref())?;
1232 if q.order_by.is_empty() {
1233 rows.sort_by(|a, b| {
1234 b.created_at
1235 .cmp(&a.created_at)
1236 .then_with(|| b.card_id.cmp(&a.card_id))
1237 });
1238 } else {
1239 rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1240 }
1241 let out: Vec<Summary> = rows
1242 .into_iter()
1243 .skip(q.offset.unwrap_or(0))
1244 .take(q.limit.unwrap_or(usize::MAX))
1245 .collect();
1246 return Ok(out);
1247 }
1248
1249 let all_rows = scan_cards(store, q.pkg.as_deref())?;
1251
1252 let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1254 all_rows
1255 .into_iter()
1256 .filter(|row| eval_predicate(pred, &row.full))
1257 .collect()
1258 } else {
1259 all_rows
1260 };
1261
1262 if q.order_by.is_empty() {
1264 rows.sort_by(|a, b| {
1265 b.summary
1266 .created_at
1267 .cmp(&a.summary.created_at)
1268 .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1269 });
1270 } else {
1271 rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1272 }
1273
1274 let out: Vec<Summary> = rows
1276 .into_iter()
1277 .skip(q.offset.unwrap_or(0))
1278 .take(q.limit.unwrap_or(usize::MAX))
1279 .map(|r| r.summary)
1280 .collect();
1281
1282 Ok(out)
1283}
1284
1285#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1301pub enum LineageDirection {
1302 #[default]
1303 Up,
1304 Down,
1305 Both,
1306}
1307
1308impl LineageDirection {
1309 pub fn parse(s: &str) -> Result<Self, String> {
1310 match s {
1311 "up" => Ok(Self::Up),
1312 "down" => Ok(Self::Down),
1313 "both" => Ok(Self::Both),
1314 other => Err(format!(
1315 "direction must be 'up', 'down', or 'both' (got '{other}')"
1316 )),
1317 }
1318 }
1319}
1320
1321#[derive(Debug, Clone, Default)]
1323pub struct LineageQuery {
1324 pub card_id: String,
1325 pub direction: LineageDirection,
1326 pub depth: Option<usize>,
1328 pub include_stats: bool,
1330 pub relation_filter: Option<Vec<String>>,
1333}
1334
1335#[derive(Debug, Clone)]
1340pub struct LineageNode {
1341 pub card_id: String,
1342 pub pkg: String,
1343 pub prior_card_id: Option<String>,
1344 pub prior_relation: Option<String>,
1345 pub depth: i32,
1346 pub stats: Option<Json>,
1347}
1348
1349#[derive(Debug, Clone)]
1351pub struct LineageEdge {
1352 pub from: String,
1353 pub to: String,
1354 pub relation: Option<String>,
1355}
1356
1357#[derive(Debug, Clone)]
1359pub struct LineageResult {
1360 pub root: String,
1361 pub nodes: Vec<LineageNode>,
1362 pub edges: Vec<LineageEdge>,
1363 pub truncated: bool,
1364}
1365
1366const DEFAULT_LINEAGE_DEPTH: usize = 10;
1367
1368fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1371 let meta = card.get("metadata");
1372 let prior_card_id = meta
1373 .and_then(|m| m.get("prior_card_id"))
1374 .and_then(|v| v.as_str())
1375 .map(String::from);
1376 let prior_relation = meta
1377 .and_then(|m| m.get("prior_relation"))
1378 .and_then(|v| v.as_str())
1379 .map(String::from);
1380 (prior_card_id, prior_relation)
1381}
1382
1383fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1385 let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1386 let stats = if include_stats {
1387 row.full.get("stats").cloned()
1388 } else {
1389 None
1390 };
1391 LineageNode {
1392 card_id: row.summary.card_id.clone(),
1393 pkg: row.summary.pkg.clone(),
1394 prior_card_id,
1395 prior_relation,
1396 depth,
1397 stats,
1398 }
1399}
1400
1401fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
1404 match filter {
1405 None => true,
1406 Some(allowed) => match relation {
1407 Some(r) => allowed.iter().any(|a| a == r),
1408 None => false,
1409 },
1410 }
1411}
1412
1413struct CardIndex {
1415 cards: std::collections::HashMap<String, CardRow>,
1417 children: std::collections::HashMap<String, Vec<String>>,
1419}
1420
1421fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1425 let rows = scan_cards(store, None)?;
1426
1427 let mut cards = std::collections::HashMap::with_capacity(rows.len());
1428 let mut children: std::collections::HashMap<String, Vec<String>> =
1429 std::collections::HashMap::new();
1430
1431 for row in rows {
1432 let id = row.summary.card_id.clone();
1433 let (prior_id, _) = lineage_fields(&row.full);
1434 if let Some(parent) = prior_id {
1435 children.entry(parent).or_default().push(id.clone());
1436 }
1437 cards.insert(id, row);
1438 }
1439 Ok(CardIndex { cards, children })
1440}
1441
1442fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1446 let locators = store.list_card_locators(pkg_filter)?;
1447 let mut rows = Vec::with_capacity(locators.len());
1448 for (pkg, loc) in &locators {
1449 if let Some(row) = load_full(store, loc, pkg) {
1450 rows.push(row);
1451 }
1452 }
1453 Ok(rows)
1454}
1455
1456struct LineageCtx<'a> {
1458 index: &'a CardIndex,
1459 relation_filter: &'a Option<Vec<String>>,
1460 include_stats: bool,
1461 max_depth: usize,
1462}
1463
1464struct LineageAccum {
1466 nodes: Vec<LineageNode>,
1467 edges: Vec<LineageEdge>,
1468 visited: std::collections::HashSet<String>,
1469 truncated: bool,
1470}
1471
1472fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1474 let mut cur = start_id.to_string();
1475 for step in 1..=ctx.max_depth {
1476 let Some(row) = ctx.index.cards.get(&cur) else {
1477 return;
1478 };
1479 let (prior_id, prior_rel) = lineage_fields(&row.full);
1480 let Some(prior_id) = prior_id else {
1481 return;
1482 };
1483 if !relation_passes(ctx.relation_filter, &prior_rel) {
1484 return;
1485 }
1486 if acc.visited.contains(&prior_id) {
1487 return;
1488 }
1489 let Some(parent) = ctx.index.cards.get(&prior_id) else {
1490 return;
1491 };
1492 acc.nodes
1493 .push(make_node(parent, -(step as i32), ctx.include_stats));
1494 acc.edges.push(LineageEdge {
1495 from: row.summary.card_id.clone(),
1496 to: parent.summary.card_id.clone(),
1497 relation: prior_rel,
1498 });
1499 acc.visited.insert(prior_id.clone());
1500 cur = prior_id;
1501 }
1502 if let Some(row) = ctx.index.cards.get(&cur) {
1504 let (prior_id, _) = lineage_fields(&row.full);
1505 if prior_id
1506 .as_ref()
1507 .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1508 {
1509 acc.truncated = true;
1510 }
1511 }
1512}
1513
1514fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1517 let mut frontier: Vec<String> = vec![start_id.to_string()];
1518
1519 for depth in 1..=ctx.max_depth {
1520 let mut next_frontier: Vec<String> = Vec::new();
1521 for parent_id in &frontier {
1522 let children = match ctx.index.children.get(parent_id) {
1523 Some(c) => c,
1524 None => continue,
1525 };
1526 for child_id in children {
1527 if acc.visited.contains(child_id) {
1528 continue;
1529 }
1530 let Some(child) = ctx.index.cards.get(child_id) else {
1531 continue;
1532 };
1533 let (_, prior_rel) = lineage_fields(&child.full);
1534 if !relation_passes(ctx.relation_filter, &prior_rel) {
1535 continue;
1536 }
1537 acc.nodes
1538 .push(make_node(child, depth as i32, ctx.include_stats));
1539 acc.edges.push(LineageEdge {
1540 from: child.summary.card_id.clone(),
1541 to: parent_id.clone(),
1542 relation: prior_rel,
1543 });
1544 acc.visited.insert(child_id.clone());
1545 next_frontier.push(child_id.clone());
1546 }
1547 }
1548 if next_frontier.is_empty() {
1549 return;
1550 }
1551 frontier = next_frontier;
1552 }
1553 for parent_id in &frontier {
1556 let children = match ctx.index.children.get(parent_id) {
1557 Some(c) => c,
1558 None => continue,
1559 };
1560 for child_id in children {
1561 if acc.visited.contains(child_id) {
1562 continue;
1563 }
1564 let Some(child) = ctx.index.cards.get(child_id) else {
1565 continue;
1566 };
1567 let (_, prior_rel) = lineage_fields(&child.full);
1568 if relation_passes(ctx.relation_filter, &prior_rel) {
1569 acc.truncated = true;
1570 return;
1571 }
1572 }
1573 }
1574}
1575
1576pub fn lineage_with_store(
1578 store: &dyn CardStore,
1579 q: LineageQuery,
1580) -> Result<Option<LineageResult>, String> {
1581 let index = load_card_index(store)?;
1582 let Some(root_row) = index.cards.get(&q.card_id) else {
1583 return Ok(None);
1584 };
1585
1586 let ctx = LineageCtx {
1587 index: &index,
1588 relation_filter: &q.relation_filter,
1589 include_stats: q.include_stats,
1590 max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1591 };
1592 let mut acc = LineageAccum {
1593 nodes: Vec::new(),
1594 edges: Vec::new(),
1595 visited: std::collections::HashSet::new(),
1596 truncated: false,
1597 };
1598
1599 acc.nodes.push(make_node(root_row, 0, q.include_stats));
1600 acc.visited.insert(q.card_id.clone());
1601
1602 if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1603 walk_up(&q.card_id, &ctx, &mut acc);
1604 }
1605 if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1606 walk_down(&q.card_id, &ctx, &mut acc);
1607 }
1608
1609 Ok(Some(LineageResult {
1610 root: q.card_id,
1611 nodes: acc.nodes,
1612 edges: acc.edges,
1613 truncated: acc.truncated,
1614 }))
1615}
1616
1617pub fn lineage_to_json(r: &LineageResult) -> Json {
1619 let nodes: Vec<Json> = r
1620 .nodes
1621 .iter()
1622 .map(|n| {
1623 let mut m = serde_json::Map::new();
1624 m.insert("card_id".into(), json!(n.card_id));
1625 m.insert("pkg".into(), json!(n.pkg));
1626 m.insert("depth".into(), json!(n.depth));
1627 if let Some(p) = &n.prior_card_id {
1628 m.insert("prior_card_id".into(), json!(p));
1629 }
1630 if let Some(rel) = &n.prior_relation {
1631 m.insert("prior_relation".into(), json!(rel));
1632 }
1633 if let Some(s) = &n.stats {
1634 m.insert("stats".into(), s.clone());
1635 }
1636 Json::Object(m)
1637 })
1638 .collect();
1639 let edges: Vec<Json> = r
1640 .edges
1641 .iter()
1642 .map(|e| {
1643 let mut m = serde_json::Map::new();
1644 m.insert("from".into(), json!(e.from));
1645 m.insert("to".into(), json!(e.to));
1646 if let Some(rel) = &e.relation {
1647 m.insert("relation".into(), json!(rel));
1648 }
1649 Json::Object(m)
1650 })
1651 .collect();
1652 json!({
1653 "root": r.root,
1654 "nodes": nodes,
1655 "edges": edges,
1656 "truncated": r.truncated,
1657 })
1658}
1659
1660pub fn import_from_dir_with_store(
1680 store: &dyn CardStore,
1681 source_dir: &std::path::Path,
1682 pkg: &str,
1683) -> Result<(Vec<String>, Vec<String>), String> {
1684 let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1685 for card_id in &imported {
1686 match store.read_card_text(card_id) {
1687 Ok(Some(toml_text)) => publish(CardEvent::Created {
1688 pkg: pkg.to_string(),
1689 card_id: card_id.clone(),
1690 toml_text,
1691 }),
1692 Ok(None) => {
1693 tracing::warn!(
1694 card_id = %card_id,
1695 "import_from_dir: read_card_text returned None after import; skipping publish"
1696 );
1697 }
1698 Err(e) => {
1699 tracing::warn!(
1700 card_id = %card_id,
1701 error = %e,
1702 "import_from_dir: read_card_text failed after import; skipping publish"
1703 );
1704 }
1705 }
1706 match store.read_samples_text(card_id) {
1708 Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1709 card_id: card_id.clone(),
1710 jsonl_text,
1711 }),
1712 Ok(None) => {}
1713 Err(e) => {
1714 tracing::warn!(
1715 card_id = %card_id,
1716 error = %e,
1717 "import_from_dir: read_samples_text failed after import; skipping publish"
1718 );
1719 }
1720 }
1721 }
1722 Ok((imported, skipped))
1723}
1724
1725pub fn write_samples_with_store(
1731 store: &dyn CardStore,
1732 card_id: &str,
1733 samples: Vec<Json>,
1734) -> Result<PathBuf, String> {
1735 if store.samples_exists(card_id)? {
1736 return Err(format!(
1737 "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1738 ));
1739 }
1740 let mut buf = String::new();
1741 for (idx, s) in samples.iter().enumerate() {
1742 let line = serde_json::to_string(s).map_err(|e| {
1743 format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1744 })?;
1745 buf.push_str(&line);
1746 buf.push('\n');
1747 }
1748 let path = store.write_samples_text(card_id, &buf)?;
1749
1750 publish(CardEvent::SamplesWritten {
1751 card_id: card_id.to_string(),
1752 jsonl_text: buf,
1753 });
1754
1755 Ok(path)
1756}
1757
1758#[derive(Debug, Default, Clone)]
1760pub struct SamplesQuery {
1761 pub offset: usize,
1763 pub limit: Option<usize>,
1765 pub where_: Option<Predicate>,
1768}
1769
1770pub fn read_samples_with_store(
1780 store: &dyn CardStore,
1781 card_id: &str,
1782 q: SamplesQuery,
1783) -> Result<Vec<Json>, String> {
1784 let text = match store.read_samples_text(card_id)? {
1785 Some(t) => t,
1786 None => return Ok(Vec::new()),
1787 };
1788 let mut matched: usize = 0;
1789 let mut out = Vec::new();
1790 for (i, line) in text.lines().enumerate() {
1791 if line.trim().is_empty() {
1792 continue;
1793 }
1794 let val: Json = serde_json::from_str(line)
1795 .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1796 if let Some(pred) = &q.where_ {
1797 if !eval_predicate(pred, &val) {
1798 continue;
1799 }
1800 }
1801 if matched < q.offset {
1802 matched += 1;
1803 continue;
1804 }
1805 if let Some(lim) = q.limit {
1806 if out.len() >= lim {
1807 break;
1808 }
1809 }
1810 matched += 1;
1811 out.push(val);
1812 }
1813 Ok(out)
1814}
1815
1816pub struct FileCardStore {
1830 root: PathBuf,
1831}
1832
1833impl FileCardStore {
1834 pub fn new(root: PathBuf) -> Self {
1836 Self { root }
1837 }
1838
1839 pub fn root(&self) -> &Path {
1841 &self.root
1842 }
1843
1844 pub fn create(&self, input: Json) -> Result<(String, PathBuf), String> {
1852 create_with_store(self, input)
1853 }
1854
1855 pub fn get(&self, card_id: &str) -> Result<Option<Json>, String> {
1856 get_with_store(self, card_id)
1857 }
1858
1859 pub fn list(&self, pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
1860 list_with_store(self, pkg_filter)
1861 }
1862
1863 pub fn append(&self, card_id: &str, fields: Json) -> Result<Json, String> {
1864 append_with_store(self, card_id, fields)
1865 }
1866
1867 pub fn alias_set(
1868 &self,
1869 name: &str,
1870 card_id: &str,
1871 pkg: Option<&str>,
1872 note: Option<&str>,
1873 ) -> Result<Alias, String> {
1874 alias_set_with_store(self, name, card_id, pkg, note)
1875 }
1876
1877 pub fn alias_list(&self, pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
1878 alias_list_with_store(self, pkg_filter)
1879 }
1880
1881 pub fn get_by_alias(&self, name: &str) -> Result<Option<Json>, String> {
1882 get_by_alias_with_store(self, name)
1883 }
1884
1885 pub fn find(&self, q: FindQuery) -> Result<Vec<Summary>, String> {
1886 find_with_store(self, q)
1887 }
1888
1889 pub fn write_samples(&self, card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1890 write_samples_with_store(self, card_id, samples)
1891 }
1892
1893 pub fn read_samples(&self, card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1894 read_samples_with_store(self, card_id, q)
1895 }
1896
1897 pub fn lineage(&self, q: LineageQuery) -> Result<Option<LineageResult>, String> {
1898 lineage_with_store(self, q)
1899 }
1900
1901 pub fn card_sink_backfill(
1902 &self,
1903 sink: &str,
1904 dry_run: bool,
1905 ) -> Result<SinkBackfillReport, String> {
1906 card_sink_backfill_with_store(self, sink, dry_run)
1907 }
1908
1909 fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1913 validate_name(pkg, "pkg")?;
1914 let dir = self.root.join(pkg);
1915 if !dir.exists() {
1916 fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1917 }
1918 Ok(dir)
1919 }
1920
1921 fn aliases_path(&self) -> PathBuf {
1923 self.root.join("_aliases.toml")
1924 }
1925
1926 fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1930 let card_path = self
1931 .find_card_locator(card_id)?
1932 .ok_or_else(|| format!("card '{card_id}' not found"))?;
1933 let dir = card_path
1934 .parent()
1935 .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1936 Ok(dir.join(format!("{card_id}.samples.jsonl")))
1937 }
1938}
1939
1940impl CardStore for FileCardStore {
1941 fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1942 let dir = self.pkg_dir(pkg)?;
1943 let path = dir.join(format!("{card_id}.toml"));
1944 if path.exists() {
1945 return Err(format!(
1946 "alc.card.create: card '{card_id}' already exists (immutable)"
1947 ));
1948 }
1949 let tmp = path.with_extension("toml.tmp");
1950 fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1951 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1952 Ok(path)
1953 }
1954
1955 fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1956 let path = self
1957 .find_card_locator(card_id)?
1958 .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1959 let tmp = path.with_extension("toml.tmp");
1960 fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1961 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1962 Ok(path)
1963 }
1964
1965 fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1966 validate_name(card_id, "card_id")?;
1967 if !self.root.exists() {
1968 return Ok(None);
1969 }
1970 let entries =
1971 fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1972 for entry in entries.flatten() {
1973 let p = entry.path();
1974 if p.is_dir() {
1975 let candidate = p.join(format!("{card_id}.toml"));
1976 if candidate.exists() {
1977 return Ok(Some(candidate));
1978 }
1979 }
1980 }
1981 Ok(None)
1982 }
1983
1984 fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
1985 let Some(path) = self.find_card_locator(card_id)? else {
1986 return Ok(None);
1987 };
1988 let text = fs::read_to_string(&path)
1989 .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
1990 Ok(Some(text))
1991 }
1992
1993 fn list_card_locators(
1994 &self,
1995 pkg_filter: Option<&str>,
1996 ) -> Result<Vec<(String, PathBuf)>, String> {
1997 if !self.root.exists() {
1998 return Ok(Vec::new());
1999 }
2000 let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2001 validate_name(p, "pkg")?;
2002 let d = self.root.join(p);
2003 if d.is_dir() {
2004 vec![d]
2005 } else {
2006 return Ok(Vec::new());
2007 }
2008 } else {
2009 fs::read_dir(&self.root)
2010 .map_err(|e| format!("Failed to read cards dir: {e}"))?
2011 .flatten()
2012 .map(|e| e.path())
2013 .filter(|p| p.is_dir())
2014 .collect()
2015 };
2016
2017 let mut out = Vec::new();
2018 for pdir in pkg_dirs {
2019 let pkg = pdir
2020 .file_name()
2021 .and_then(|s| s.to_str())
2022 .unwrap_or("")
2023 .to_string();
2024 let entries = match fs::read_dir(&pdir) {
2025 Ok(e) => e,
2026 Err(_) => continue,
2027 };
2028 for entry in entries.flatten() {
2029 let p = entry.path();
2030 if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2031 continue;
2032 }
2033 out.push((pkg.clone(), p));
2034 }
2035 }
2036 Ok(out)
2037 }
2038
2039 fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2040 match fs::read_to_string(locator) {
2041 Ok(text) => Ok(Some(text)),
2042 Err(_) => Ok(None),
2043 }
2044 }
2045
2046 fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2047 let path = self.aliases_path();
2048 if !path.exists() {
2049 return Ok(Vec::new());
2050 }
2051 let text =
2052 fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2053 let val: toml::Value =
2054 toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2055 let arr = val
2056 .get("alias")
2057 .and_then(|v| v.as_array())
2058 .cloned()
2059 .unwrap_or_default();
2060 let mut out = Vec::with_capacity(arr.len());
2061 for entry in arr {
2062 let t = match entry {
2063 toml::Value::Table(t) => t,
2064 _ => continue,
2065 };
2066 let name = match t.get("name").and_then(|v| v.as_str()) {
2067 Some(s) => s.to_string(),
2068 None => continue,
2069 };
2070 let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2071 Some(s) => s.to_string(),
2072 None => continue,
2073 };
2074 out.push(Alias {
2075 name,
2076 card_id,
2077 pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2078 set_at: t
2079 .get("set_at")
2080 .and_then(|v| v.as_str())
2081 .map(String::from)
2082 .unwrap_or_default(),
2083 note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2084 });
2085 }
2086 Ok(out)
2087 }
2088
2089 fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2090 if !self.root.exists() {
2093 fs::create_dir_all(&self.root)
2094 .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2095 }
2096 let path = self.aliases_path();
2097 let mut arr = Vec::with_capacity(aliases.len());
2098 for a in aliases {
2099 let mut t = toml::map::Map::new();
2100 t.insert("name".into(), toml::Value::String(a.name.clone()));
2101 t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2102 if let Some(p) = &a.pkg {
2103 t.insert("pkg".into(), toml::Value::String(p.clone()));
2104 }
2105 t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2106 if let Some(n) = &a.note {
2107 t.insert("note".into(), toml::Value::String(n.clone()));
2108 }
2109 arr.push(toml::Value::Table(t));
2110 }
2111 let mut root = toml::map::Map::new();
2112 root.insert("alias".into(), toml::Value::Array(arr));
2113 let text = toml::to_string_pretty(&toml::Value::Table(root))
2114 .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2115 let tmp = path.with_extension("toml.tmp");
2116 fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2117 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2118 Ok(())
2119 }
2120
2121 fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2122 let path = self.samples_path(card_id)?;
2123 Ok(path.exists())
2124 }
2125
2126 fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2127 let path = self.samples_path(card_id)?;
2128 if path.exists() {
2129 return Err(format!(
2130 "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2131 ));
2132 }
2133 let tmp = path.with_extension("jsonl.tmp");
2134 fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2135 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2136 Ok(path)
2137 }
2138
2139 fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2140 let path = self.samples_path(card_id)?;
2141 if !path.exists() {
2142 return Ok(None);
2143 }
2144 let text =
2145 fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2146 Ok(Some(text))
2147 }
2148
2149 fn import_from_dir(
2150 &self,
2151 source_dir: &Path,
2152 pkg: &str,
2153 ) -> Result<(Vec<String>, Vec<String>), String> {
2154 validate_name(pkg, "pkg")?;
2155 let dest = self.pkg_dir(pkg)?;
2156 let mut imported = Vec::new();
2157 let mut skipped = Vec::new();
2158
2159 let entries =
2160 fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2161
2162 for entry in entries.flatten() {
2163 let path = entry.path();
2164 let fname = match path.file_name().and_then(|n| n.to_str()) {
2165 Some(n) => n.to_string(),
2166 None => continue,
2167 };
2168
2169 if !fname.ends_with(".toml") {
2170 continue;
2171 }
2172
2173 let card_id = fname.trim_end_matches(".toml");
2174 let dest_toml = dest.join(&fname);
2175
2176 if dest_toml.exists() {
2177 skipped.push(card_id.to_string());
2178 continue;
2179 }
2180
2181 let text = fs::read_to_string(&path)
2182 .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2183 let val: toml::Value = toml::from_str(&text)
2184 .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2185 if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2186 continue;
2187 }
2188
2189 fs::copy(&path, &dest_toml)
2190 .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2191
2192 let samples_name = format!("{card_id}.samples.jsonl");
2193 let samples_src = source_dir.join(&samples_name);
2194 if samples_src.exists() {
2195 let samples_dest = dest.join(&samples_name);
2196 if !samples_dest.exists() {
2197 fs::copy(&samples_src, &samples_dest)
2198 .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2199 }
2200 }
2201
2202 imported.push(card_id.to_string());
2203 }
2204
2205 Ok((imported, skipped))
2206 }
2207}
2208
2209#[derive(Debug, Clone)]
2227pub enum CardEvent {
2228 Created {
2230 pkg: String,
2231 card_id: String,
2232 toml_text: String,
2233 },
2234 Appended { card_id: String, toml_text: String },
2236 SamplesWritten { card_id: String, jsonl_text: String },
2238 AliasesWritten { toml_text: String },
2240}
2241
2242#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
2246pub enum CardEventKind {
2247 Created,
2248 Appended,
2249 SamplesWritten,
2250 AliasesWritten,
2251}
2252
2253impl CardEventKind {
2254 pub fn as_str(self) -> &'static str {
2256 match self {
2257 CardEventKind::Created => "created",
2258 CardEventKind::Appended => "appended",
2259 CardEventKind::SamplesWritten => "samples_written",
2260 CardEventKind::AliasesWritten => "aliases_written",
2261 }
2262 }
2263
2264 pub fn json_key(self) -> &'static str {
2268 match self {
2269 CardEventKind::Created => "created",
2270 CardEventKind::Appended => "appended",
2271 CardEventKind::SamplesWritten => "samples",
2272 CardEventKind::AliasesWritten => "aliases",
2273 }
2274 }
2275
2276 pub fn all() -> [CardEventKind; 4] {
2280 [
2281 CardEventKind::Created,
2282 CardEventKind::Appended,
2283 CardEventKind::SamplesWritten,
2284 CardEventKind::AliasesWritten,
2285 ]
2286 }
2287}
2288
2289impl Serialize for CardEventKind {
2290 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2291 s.serialize_str(self.json_key())
2292 }
2293}
2294
2295impl CardEvent {
2296 pub fn kind(&self) -> CardEventKind {
2298 match self {
2299 CardEvent::Created { .. } => CardEventKind::Created,
2300 CardEvent::Appended { .. } => CardEventKind::Appended,
2301 CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2302 CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2303 }
2304 }
2305}
2306
2307pub trait CardSubscriber: Send + Sync {
2315 fn on_event(&self, ev: &CardEvent) -> Result<(), String>;
2319
2320 fn describe(&self) -> String;
2324
2325 fn has_card(&self, _card_id: &str) -> Result<bool, String> {
2333 Ok(false)
2334 }
2335}
2336
2337#[derive(Debug, Clone, Serialize)]
2342pub struct LastError {
2343 pub kind: CardEventKind,
2344 pub msg: String,
2345 pub ts_ms: u64,
2346}
2347
2348#[derive(Default, Debug)]
2352pub struct PerSubscriber {
2353 pub ok: HashMap<CardEventKind, u64>,
2354 pub err: HashMap<CardEventKind, u64>,
2355 pub last_error: Option<LastError>,
2356}
2357
2358#[derive(Default, Debug)]
2361pub struct SubscriberStats {
2362 inner: Mutex<HashMap<String, PerSubscriber>>,
2363}
2364
2365impl SubscriberStats {
2366 pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2368 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2369 let entry = g.entry(key.to_string()).or_default();
2370 let c = entry.ok.entry(kind).or_insert(0);
2371 *c = c.saturating_add(1);
2372 }
2373
2374 pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2378 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2379 let entry = g.entry(key.to_string()).or_default();
2380 let c = entry.err.entry(kind).or_insert(0);
2381 *c = c.saturating_add(1);
2382 entry.last_error = Some(LastError {
2383 kind,
2384 msg: err.to_string(),
2385 ts_ms: now_ms(),
2386 });
2387 }
2388
2389 pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2398 let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2399 let mut rows = Vec::with_capacity(g.len());
2400 for (sink, ps) in g.iter() {
2401 let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2402 let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2403 for k in CardEventKind::all() {
2404 ok.insert(
2405 k.json_key().to_string(),
2406 ps.ok.get(&k).copied().unwrap_or(0),
2407 );
2408 err.insert(
2409 k.json_key().to_string(),
2410 ps.err.get(&k).copied().unwrap_or(0),
2411 );
2412 }
2413 rows.push(SubscriberHealthRow {
2414 sink: sink.clone(),
2415 ok,
2416 err,
2417 last_error: ps.last_error.clone(),
2418 });
2419 }
2420 rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2423 rows
2424 }
2425}
2426
2427fn now_ms() -> u64 {
2431 std::time::SystemTime::now()
2432 .duration_since(std::time::UNIX_EPOCH)
2433 .unwrap_or_default()
2434 .as_millis() as u64
2435}
2436
2437#[derive(Debug, Clone, Serialize)]
2440pub struct SubscriberHealthRow {
2441 pub sink: String,
2442 pub ok: HashMap<String, u64>,
2443 pub err: HashMap<String, u64>,
2444 pub last_error: Option<LastError>,
2445}
2446
2447pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2452 event_bus().stats().snapshot()
2453}
2454
2455fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
2462 static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
2463 let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
2464 let pid = process::id();
2465 if let Some(parent) = dest.parent() {
2466 if !parent.as_os_str().is_empty() && !parent.exists() {
2467 fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
2468 }
2469 }
2470 let mut tmp = dest.as_os_str().to_owned();
2471 tmp.push(format!(".tmp.{pid}.{seq}"));
2472 let tmp_path = PathBuf::from(tmp);
2473 fs::write(&tmp_path, bytes).map_err(|e| format!("subscriber write tmp: {e}"))?;
2474 fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
2475}
2476
2477fn canonical_file_uri(root: &Path) -> String {
2481 let p = root.to_string_lossy();
2482 #[cfg(unix)]
2483 {
2484 format!("file://{p}")
2485 }
2486 #[cfg(windows)]
2487 {
2488 format!("file:///{}", p.replace('\\', "/"))
2489 }
2490 #[cfg(not(any(unix, windows)))]
2491 {
2492 format!("file://{p}")
2493 }
2494}
2495
2496pub struct FileCardSubscriber {
2503 root: PathBuf,
2504 uri: String,
2505}
2506
2507impl FileCardSubscriber {
2508 pub fn new(root: PathBuf) -> Self {
2511 let uri = canonical_file_uri(&root);
2512 Self { root, uri }
2513 }
2514
2515 pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2519 validate_name(card_id, "card_id")?;
2520 if !self.root.exists() {
2521 return Ok(None);
2522 }
2523 let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2524 for entry in entries.flatten() {
2525 let p = entry.path();
2526 if p.is_dir() {
2527 let candidate = p.join(format!("{card_id}.toml"));
2528 if candidate.exists() {
2529 return Ok(Some(candidate));
2530 }
2531 }
2532 }
2533 Ok(None)
2534 }
2535
2536 fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2537 validate_name(pkg, "pkg")?;
2538 let dir = self.root.join(pkg);
2539 if !dir.exists() {
2540 fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2541 }
2542 Ok(dir)
2543 }
2544
2545 fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2546 validate_name(card_id, "card_id")?;
2547 let dir = self.ensure_pkg_dir(pkg)?;
2548 let dest = dir.join(format!("{card_id}.toml"));
2549 atomic_write(&dest, toml_text.as_bytes())
2550 }
2551
2552 fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2553 match self.locate_card(card_id)? {
2554 Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2555 None => Err(format!(
2556 "subscriber append: card '{card_id}' missing at {}",
2557 self.uri
2558 )),
2559 }
2560 }
2561
2562 fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2563 let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2564 format!(
2565 "subscriber samples: card '{card_id}' missing at {}",
2566 self.uri
2567 )
2568 })?;
2569 let dir = card_path
2570 .parent()
2571 .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2572 let dest = dir.join(format!("{card_id}.samples.jsonl"));
2573 atomic_write(&dest, jsonl_text.as_bytes())
2574 }
2575
2576 fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2577 if !self.root.exists() {
2578 fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2579 }
2580 let dest = self.root.join("_aliases.toml");
2581 atomic_write(&dest, toml_text.as_bytes())
2582 }
2583}
2584
2585impl CardSubscriber for FileCardSubscriber {
2586 fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2587 match ev {
2588 CardEvent::Created {
2589 pkg,
2590 card_id,
2591 toml_text,
2592 } => self.write_created(pkg, card_id, toml_text),
2593 CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2594 CardEvent::SamplesWritten {
2595 card_id,
2596 jsonl_text,
2597 } => self.write_samples(card_id, jsonl_text),
2598 CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2599 }
2600 }
2601
2602 fn describe(&self) -> String {
2603 self.uri.clone()
2604 }
2605
2606 fn has_card(&self, card_id: &str) -> Result<bool, String> {
2610 Ok(self.locate_card(card_id)?.is_some())
2611 }
2612}
2613
2614pub struct CardEventBus {
2621 subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
2622 stats: Arc<SubscriberStats>,
2623}
2624
2625impl CardEventBus {
2626 pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2629 Self {
2630 subscribers: Mutex::new(subscribers),
2631 stats: Arc::new(SubscriberStats::default()),
2632 }
2633 }
2634
2635 pub fn stats(&self) -> &Arc<SubscriberStats> {
2637 &self.stats
2638 }
2639
2640 pub fn publish(&self, ev: &CardEvent) {
2644 let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2645 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2646 guard.clone()
2647 };
2648 for sub in &subs_snapshot {
2649 let key = sub.describe();
2650 match sub.on_event(ev) {
2651 Ok(()) => self.stats.record_ok(&key, ev.kind()),
2652 Err(e) => {
2653 tracing::warn!(
2654 subscriber = %key,
2655 kind = ev.kind().as_str(),
2656 error = %e,
2657 "card subscriber failed"
2658 );
2659 self.stats.record_err(&key, ev.kind(), &e);
2660 }
2661 }
2662 }
2663 }
2664
2665 pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2669 let hit: Option<Arc<dyn CardSubscriber>> = {
2670 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2671 guard.iter().find(|s| s.describe() == target).cloned()
2672 };
2673 let Some(sub) = hit else {
2674 return Err(format!("subscriber not registered: {target}"));
2675 };
2676 let key = sub.describe();
2677 match sub.on_event(ev) {
2678 Ok(()) => {
2679 self.stats.record_ok(&key, ev.kind());
2680 Ok(())
2681 }
2682 Err(e) => {
2683 tracing::warn!(
2684 subscriber = %key,
2685 kind = ev.kind().as_str(),
2686 error = %e,
2687 "card subscriber failed (publish_to)"
2688 );
2689 self.stats.record_err(&key, ev.kind(), &e);
2690 Err(e)
2691 }
2692 }
2693 }
2694
2695 pub fn subscriber_uris(&self) -> Vec<String> {
2697 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2698 guard.iter().map(|s| s.describe()).collect()
2699 }
2700
2701 pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2706 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2707 guard.iter().find(|s| s.describe() == uri).cloned()
2708 }
2709
2710 #[cfg(any(test, feature = "test-support"))]
2713 pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2714 let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2715 *guard = subs;
2716 }
2717
2718 #[cfg(any(test, feature = "test-support"))]
2720 pub fn reset_stats_for_test(&self) {
2721 let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2722 g.clear();
2723 }
2724}
2725
2726static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2727
2728pub fn event_bus() -> &'static CardEventBus {
2731 CARD_EVENT_BUS.get_or_init(|| {
2732 let subs = load_subscribers_from_env();
2733 CardEventBus::new(subs)
2734 })
2735}
2736
2737pub fn init_event_bus() {
2742 let bus = event_bus();
2743 let uris = bus.subscriber_uris();
2744 if uris.is_empty() {
2745 tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2746 } else {
2747 for uri in &uris {
2748 tracing::info!(subscriber = %uri, "card sink registered");
2749 }
2750 }
2751}
2752
2753#[cfg(any(test, feature = "test-support"))]
2755pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
2756 CARD_EVENT_BUS
2757 .set(bus)
2758 .map_err(|_| "bus already initialized".to_string())
2759}
2760
2761pub fn publish(ev: CardEvent) {
2763 #[cfg(test)]
2769 {
2770 let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2771 if !is_test_owner {
2772 let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2774 event_bus().publish(&ev);
2775 return;
2776 }
2777 }
2778 event_bus().publish(&ev);
2779}
2780
2781#[cfg(test)]
2784fn bus_test_gate() -> &'static Mutex<()> {
2785 static GATE: OnceLock<Mutex<()>> = OnceLock::new();
2786 GATE.get_or_init(|| Mutex::new(()))
2787}
2788
2789#[cfg(test)]
2792thread_local! {
2793 static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
2794}
2795
2796#[derive(Debug, Clone, Default, Serialize)]
2803pub struct SinkBackfillReport {
2804 pub sink: String,
2805 pub pushed: Vec<String>,
2806 pub skipped: Vec<String>,
2807 pub failed: Vec<(String, String)>,
2808 pub pushed_samples: Vec<String>,
2809}
2810
2811pub fn card_sink_backfill_with_store(
2831 store: &dyn CardStore,
2832 sink: &str,
2833 dry_run: bool,
2834) -> Result<SinkBackfillReport, String> {
2835 let bus = event_bus();
2836 let sub = bus
2837 .find_subscriber(sink)
2838 .ok_or_else(|| format!("unknown sink: {sink}"))?;
2839
2840 let locators = store.list_card_locators(None)?;
2841
2842 let mut report = SinkBackfillReport {
2843 sink: sink.to_string(),
2844 ..Default::default()
2845 };
2846
2847 for (pkg, locator) in locators {
2848 let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2849 Some(s) => s.to_string(),
2850 None => continue,
2851 };
2852
2853 match sub.has_card(&card_id) {
2854 Ok(true) => {
2855 report.skipped.push(card_id);
2856 continue;
2857 }
2858 Ok(false) => {}
2859 Err(e) => {
2860 tracing::warn!(
2861 card_id = %card_id,
2862 error = %e,
2863 "backfill: has_card failed; treating as skipped"
2864 );
2865 report.skipped.push(card_id);
2866 continue;
2867 }
2868 }
2869
2870 let toml_text = match store.read_locator_text(&locator) {
2871 Ok(Some(t)) => t,
2872 Ok(None) => {
2873 report.skipped.push(card_id);
2875 continue;
2876 }
2877 Err(e) => {
2878 tracing::warn!(
2879 card_id = %card_id,
2880 error = %e,
2881 "backfill: read_locator_text failed; treating as skipped"
2882 );
2883 report.skipped.push(card_id);
2884 continue;
2885 }
2886 };
2887
2888 if dry_run {
2889 report.pushed.push(card_id.clone());
2890 if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2891 report.pushed_samples.push(card_id);
2892 }
2893 continue;
2894 }
2895
2896 let ev = CardEvent::Created {
2897 pkg: pkg.clone(),
2898 card_id: card_id.clone(),
2899 toml_text,
2900 };
2901 match bus.publish_to(sink, &ev) {
2902 Ok(()) => report.pushed.push(card_id.clone()),
2903 Err(e) => {
2904 report.failed.push((card_id, e));
2905 continue;
2906 }
2907 }
2908
2909 if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2910 let ev = CardEvent::SamplesWritten {
2911 card_id: card_id.clone(),
2912 jsonl_text,
2913 };
2914 match bus.publish_to(sink, &ev) {
2915 Ok(()) => report.pushed_samples.push(card_id),
2916 Err(e) => {
2917 report.failed.push((card_id, format!("samples: {e}")));
2918 }
2919 }
2920 }
2921 }
2922
2923 Ok(report)
2924}
2925
2926fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2932 let raw = match std::env::var("ALC_CARD_SINKS") {
2933 Ok(v) => v,
2934 Err(std::env::VarError::NotPresent) => return Vec::new(),
2935 Err(std::env::VarError::NotUnicode(_)) => {
2936 tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2937 return Vec::new();
2938 }
2939 };
2940 parse_subscribers_from_str(&raw)
2941}
2942
2943fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2947 if raw.is_empty() {
2948 return Vec::new();
2949 }
2950 let mut seen: HashSet<String> = HashSet::new();
2951 let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2952 for spec in raw.split('|') {
2953 let spec = spec.trim();
2954 if spec.is_empty() {
2955 continue;
2956 }
2957 let Some(sub) = parse_subscriber_spec(spec) else {
2958 continue;
2959 };
2960 let uri = sub.describe();
2961 if !seen.insert(uri.clone()) {
2962 tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2963 continue;
2964 }
2965 out.push(sub);
2966 }
2967 out
2968}
2969
2970fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2972 let Some(colon_idx) = spec.find(':') else {
2974 tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2975 return None;
2976 };
2977 let scheme = &spec[..colon_idx];
2978 let rest = &spec[colon_idx + 1..];
2979 if scheme != "file" {
2980 tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
2981 return None;
2982 }
2983 let Some(after_slash) = rest.strip_prefix("//") else {
2985 tracing::error!(spec, "file URI missing '//'");
2986 return None;
2987 };
2988 let Some(path_start) = after_slash.find('/') else {
2990 tracing::error!(spec, "file URI has no path component");
2991 return None;
2992 };
2993 let authority = &after_slash[..path_start];
2994 let encoded_path = &after_slash[path_start..];
2995 if !authority.is_empty() {
2996 tracing::error!(
2997 spec,
2998 authority,
2999 "file URI with non-empty authority is rejected"
3000 );
3001 return None;
3002 }
3003 let path = decode_file_uri_path(encoded_path)?;
3004 Some(Arc::new(FileCardSubscriber::new(path)))
3005}
3006
3007fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3013 let decoded = percent_decode(encoded)?;
3014 #[cfg(windows)]
3015 {
3016 let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3018 Some(PathBuf::from(trimmed))
3019 }
3020 #[cfg(not(windows))]
3021 {
3022 Some(PathBuf::from(decoded))
3023 }
3024}
3025
3026fn percent_decode(src: &str) -> Option<String> {
3029 let bytes = src.as_bytes();
3030 let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3031 let mut i = 0;
3032 while i < bytes.len() {
3033 let b = bytes[i];
3034 if b == b'%' {
3035 if i + 2 >= bytes.len() {
3036 tracing::error!(src, "percent-encoded sequence truncated");
3037 return None;
3038 }
3039 let hi = (bytes[i + 1] as char).to_digit(16);
3040 let lo = (bytes[i + 2] as char).to_digit(16);
3041 match (hi, lo) {
3042 (Some(h), Some(l)) => {
3043 out.push(((h << 4) | l) as u8);
3044 i += 3;
3045 }
3046 _ => {
3047 tracing::error!(src, "percent-encoded sequence has non-hex digits");
3048 return None;
3049 }
3050 }
3051 } else {
3052 out.push(b);
3053 i += 1;
3054 }
3055 }
3056 match String::from_utf8(out) {
3057 Ok(s) => Some(s),
3058 Err(_) => {
3059 tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3060 None
3061 }
3062 }
3063}
3064
3065#[cfg(test)]
3066mod tests {
3067 use super::*;
3068
3069 #[test]
3070 fn minimum_valid_card() {
3071 let tmp = tempfile::tempdir().unwrap();
3072 let store = FileCardStore::new(tmp.path().to_path_buf());
3073 let pkg = "minimum_valid_pkg";
3074 let input = json!({ "pkg": { "name": pkg } });
3075 let (id, path) = create_with_store(&store, input).unwrap();
3076 assert!(path.exists());
3077 assert!(id.starts_with(pkg));
3078
3079 let got = get_with_store(&store, &id).unwrap().unwrap();
3080 assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
3081 assert_eq!(got["card_id"], json!(id));
3082 assert_eq!(got["pkg"]["name"], json!(pkg));
3083 assert!(got.get("created_at").is_some());
3084 assert!(got.get("created_by").is_some());
3085 }
3086
3087 #[test]
3088 fn create_rejects_missing_pkg_name() {
3089 let tmp = tempfile::tempdir().unwrap();
3090 let store = FileCardStore::new(tmp.path().to_path_buf());
3091 let err = create_with_store(&store, json!({})).unwrap_err();
3092 assert!(err.contains("pkg.name"));
3093 }
3094
3095 #[test]
3096 fn create_is_immutable() {
3097 let tmp = tempfile::tempdir().unwrap();
3098 let store = FileCardStore::new(tmp.path().to_path_buf());
3099 let pkg = "immutable_pkg";
3100 let input = json!({
3101 "card_id": "fixed_id_001",
3102 "pkg": { "name": pkg }
3103 });
3104 create_with_store(&store, input.clone()).unwrap();
3105 let err = create_with_store(&store, input).unwrap_err();
3106 assert!(err.contains("already exists"));
3107 }
3108
3109 #[test]
3110 fn create_injects_param_fingerprint() {
3111 let tmp = tempfile::tempdir().unwrap();
3112 let store = FileCardStore::new(tmp.path().to_path_buf());
3113 let pkg = "fingerprint_pkg";
3114 let input = json!({
3115 "pkg": { "name": pkg },
3116 "params": { "depth": 3, "temperature": 0.0 }
3117 });
3118 let (id, _) = create_with_store(&store, input).unwrap();
3119 let got = get_with_store(&store, &id).unwrap().unwrap();
3120 assert!(got["param_fingerprint"].is_string());
3121 }
3122
3123 #[test]
3124 fn list_returns_newest_first() {
3125 let tmp = tempfile::tempdir().unwrap();
3126 let store = FileCardStore::new(tmp.path().to_path_buf());
3127 let pkg = "list_newest_pkg";
3128 let (id1, _) = create_with_store(
3129 &store,
3130 json!({
3131 "card_id": format!("{pkg}_a"),
3132 "pkg": { "name": pkg },
3133 "created_at": "2025-01-01T00:00:00Z"
3134 }),
3135 )
3136 .unwrap();
3137 let (id2, _) = create_with_store(
3138 &store,
3139 json!({
3140 "card_id": format!("{pkg}_b"),
3141 "pkg": { "name": pkg },
3142 "created_at": "2026-01-01T00:00:00Z"
3143 }),
3144 )
3145 .unwrap();
3146
3147 let rows = list_with_store(&store, Some(pkg)).unwrap();
3148 assert_eq!(rows.len(), 2);
3149 assert_eq!(rows[0].card_id, id2); assert_eq!(rows[1].card_id, id1);
3151 }
3152
3153 #[test]
3154 fn list_extracts_summary_fields() {
3155 let tmp = tempfile::tempdir().unwrap();
3156 let store = FileCardStore::new(tmp.path().to_path_buf());
3157 let pkg = "list_summary_pkg";
3158 let (id, _) = create_with_store(
3159 &store,
3160 json!({
3161 "pkg": { "name": pkg },
3162 "model": { "id": "claude-opus-4-6" },
3163 "scenario": { "name": "gsm8k_sample100" },
3164 "stats": { "pass_rate": 0.82 }
3165 }),
3166 )
3167 .unwrap();
3168
3169 let rows = list_with_store(&store, Some(pkg)).unwrap();
3170 let row = rows.iter().find(|r| r.card_id == id).unwrap();
3171 assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
3172 assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
3173 assert_eq!(row.pass_rate, Some(0.82));
3174 }
3175
3176 #[test]
3177 fn get_missing_returns_none() {
3178 let tmp = tempfile::tempdir().unwrap();
3179 let store = FileCardStore::new(tmp.path().to_path_buf());
3180 assert!(get_with_store(&store, "does_not_exist_xyz")
3181 .unwrap()
3182 .is_none());
3183 }
3184
3185 #[test]
3186 fn card_id_embeds_compact_timestamp() {
3187 let tmp = tempfile::tempdir().unwrap();
3188 let store = FileCardStore::new(tmp.path().to_path_buf());
3189 let pkg = "ts_embed_pkg";
3190 let (id, _) = create_with_store(&store, json!({ "pkg": { "name": pkg } })).unwrap();
3191 let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
3195 let parts: Vec<&str> = tail.split('_').collect();
3196 assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
3198 let ts = parts[1];
3199 assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
3200 assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
3201 }
3202
3203 #[test]
3204 fn now_compact_format() {
3205 let s = now_compact();
3206 assert_eq!(s.len(), 15);
3207 assert_eq!(s.chars().nth(8), Some('T'));
3208 for (i, c) in s.chars().enumerate() {
3210 if i != 8 {
3211 assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3212 }
3213 }
3214 }
3215
3216 #[test]
3217 fn short_model_variants() {
3218 assert_eq!(short_model("claude-opus-4-6"), "opus46");
3219 assert_eq!(short_model("gpt-4o"), "4o");
3220 assert_eq!(short_model(""), "model");
3221 }
3222
3223 #[test]
3224 fn two_cards_same_second_different_stats_get_distinct_ids() {
3225 let tmp = tempfile::tempdir().unwrap();
3226 let store = FileCardStore::new(tmp.path().to_path_buf());
3227 let pkg = "distinct_ids_pkg";
3228 let input1 = json!({
3229 "pkg": { "name": pkg },
3230 "scenario": { "name": "gsm8k" },
3231 "stats": { "pass_rate": 0.4 }
3232 });
3233 let input2 = json!({
3234 "pkg": { "name": pkg },
3235 "scenario": { "name": "gsm8k" },
3236 "stats": { "pass_rate": 0.9 }
3237 });
3238 let (id1, _) = create_with_store(&store, input1).unwrap();
3239 let (id2, _) = create_with_store(&store, input2).unwrap();
3240 assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
3241 }
3242
3243 #[test]
3246 fn append_adds_new_fields() {
3247 let tmp = tempfile::tempdir().unwrap();
3248 let store = FileCardStore::new(tmp.path().to_path_buf());
3249 let pkg = "append_new_fields_pkg";
3250 let (id, _) = create_with_store(
3251 &store,
3252 json!({
3253 "pkg": { "name": pkg },
3254 "stats": { "pass_rate": 0.5 }
3255 }),
3256 )
3257 .unwrap();
3258
3259 let merged = append_with_store(
3260 &store,
3261 &id,
3262 json!({
3263 "caveats": { "notes": "rescored after fix" },
3264 "metadata": { "reviewer": "yn" }
3265 }),
3266 )
3267 .unwrap();
3268 assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
3269 assert_eq!(merged["metadata"]["reviewer"], json!("yn"));
3270
3271 let got = get_with_store(&store, &id).unwrap().unwrap();
3273 assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
3274 assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3276 }
3277
3278 #[test]
3279 fn append_rejects_existing_key() {
3280 let tmp = tempfile::tempdir().unwrap();
3281 let store = FileCardStore::new(tmp.path().to_path_buf());
3282 let pkg = "append_reject_key_pkg";
3283 let (id, _) = create_with_store(
3284 &store,
3285 json!({
3286 "pkg": { "name": pkg },
3287 "stats": { "pass_rate": 0.5 }
3288 }),
3289 )
3290 .unwrap();
3291
3292 let err =
3293 append_with_store(&store, &id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
3294 assert!(err.contains("already set"), "got: {err}");
3295 let got = get_with_store(&store, &id).unwrap().unwrap();
3297 assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3298 }
3299
3300 #[test]
3301 fn append_errors_on_missing_card() {
3302 let tmp = tempfile::tempdir().unwrap();
3303 let store = FileCardStore::new(tmp.path().to_path_buf());
3304 let err = append_with_store(&store, "does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
3305 assert!(err.contains("not found"));
3306 }
3307
3308 #[test]
3311 fn alias_set_and_list_roundtrip() {
3312 let tmp = tempfile::tempdir().unwrap();
3313 let store = FileCardStore::new(tmp.path().to_path_buf());
3314 let pkg = "alias_roundtrip_pkg";
3315 let (id, _) = create_with_store(&store, json!({ "pkg": { "name": pkg } })).unwrap();
3316
3317 let alias_name = "test_alias_roundtrip";
3318 alias_set_with_store(&store, alias_name, &id, Some(pkg), Some("smoke")).unwrap();
3319
3320 let rows = alias_list_with_store(&store, Some(pkg)).unwrap();
3321 let a = rows.iter().find(|a| a.name == alias_name).unwrap();
3322 assert_eq!(a.card_id, id);
3323 assert_eq!(a.pkg.as_deref(), Some(pkg));
3324 assert_eq!(a.note.as_deref(), Some("smoke"));
3325 assert!(!a.set_at.is_empty());
3326
3327 let (id2, _) = create_with_store(
3329 &store,
3330 json!({
3331 "card_id": format!("{pkg}_b"),
3332 "pkg": { "name": pkg }
3333 }),
3334 )
3335 .unwrap();
3336 alias_set_with_store(&store, alias_name, &id2, Some(pkg), None).unwrap();
3337 let rows = alias_list_with_store(&store, Some(pkg)).unwrap();
3338 let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
3339 assert_eq!(matching.len(), 1, "alias should be unique by name");
3340 assert_eq!(matching[0].card_id, id2);
3341 }
3342
3343 #[test]
3344 fn alias_set_rejects_unknown_card() {
3345 let tmp = tempfile::tempdir().unwrap();
3346 let store = FileCardStore::new(tmp.path().to_path_buf());
3347 let err = alias_set_with_store(&store, "x", "does_not_exist_xyz", None, None).unwrap_err();
3348 assert!(err.contains("not found"));
3349 }
3350
3351 fn where_from(v: Json) -> Predicate {
3354 parse_where(&v).expect("parse where")
3355 }
3356
3357 fn order_from(v: Json) -> Vec<OrderKey> {
3358 parse_order_by(&v).expect("parse order_by")
3359 }
3360
3361 #[test]
3362 fn find_where_nested_eq_and_gte() {
3363 let tmp = tempfile::tempdir().unwrap();
3364 let store = FileCardStore::new(tmp.path().to_path_buf());
3365 let pkg = "find_nested_eq_pkg";
3366 create_with_store(
3367 &store,
3368 json!({
3369 "card_id": format!("{pkg}_low"),
3370 "pkg": { "name": pkg },
3371 "scenario": { "name": "gsm8k" },
3372 "stats": { "pass_rate": 0.4 }
3373 }),
3374 )
3375 .unwrap();
3376 create_with_store(
3377 &store,
3378 json!({
3379 "card_id": format!("{pkg}_high"),
3380 "pkg": { "name": pkg },
3381 "scenario": { "name": "gsm8k" },
3382 "stats": { "pass_rate": 0.9 }
3383 }),
3384 )
3385 .unwrap();
3386 create_with_store(
3387 &store,
3388 json!({
3389 "card_id": format!("{pkg}_other"),
3390 "pkg": { "name": pkg },
3391 "scenario": { "name": "other" },
3392 "stats": { "pass_rate": 1.0 }
3393 }),
3394 )
3395 .unwrap();
3396
3397 let rows = find_with_store(
3399 &store,
3400 FindQuery {
3401 pkg: Some(pkg.to_string()),
3402 where_: Some(where_from(json!({
3403 "scenario": { "name": "gsm8k" },
3404 }))),
3405 order_by: order_from(json!("-stats.pass_rate")),
3406 ..Default::default()
3407 },
3408 )
3409 .unwrap();
3410 assert_eq!(rows.len(), 2);
3411 assert_eq!(rows[0].pass_rate, Some(0.9));
3412 assert_eq!(rows[1].pass_rate, Some(0.4));
3413
3414 let rows = find_with_store(
3416 &store,
3417 FindQuery {
3418 pkg: Some(pkg.to_string()),
3419 where_: Some(where_from(json!({
3420 "stats": { "pass_rate": { "gte": 0.8 } },
3421 }))),
3422 order_by: order_from(json!("-stats.pass_rate")),
3423 ..Default::default()
3424 },
3425 )
3426 .unwrap();
3427 assert_eq!(rows.len(), 2);
3428 assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));
3429
3430 let rows = find_with_store(
3432 &store,
3433 FindQuery {
3434 pkg: Some(pkg.to_string()),
3435 order_by: order_from(json!("-stats.pass_rate")),
3436 limit: Some(1),
3437 ..Default::default()
3438 },
3439 )
3440 .unwrap();
3441 assert_eq!(rows.len(), 1);
3442 assert_eq!(rows[0].pass_rate, Some(1.0));
3443 }
3444
3445 #[test]
3446 fn find_where_implicit_eq_and_logical() {
3447 let tmp = tempfile::tempdir().unwrap();
3448 let store = FileCardStore::new(tmp.path().to_path_buf());
3449 let pkg = "find_implicit_eq_pkg";
3450 create_with_store(
3451 &store,
3452 json!({
3453 "card_id": format!("{pkg}_a"),
3454 "pkg": { "name": pkg },
3455 "model": { "id": "claude-opus-4-6" },
3456 "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
3457 }),
3458 )
3459 .unwrap();
3460 create_with_store(
3461 &store,
3462 json!({
3463 "card_id": format!("{pkg}_b"),
3464 "pkg": { "name": pkg },
3465 "model": { "id": "claude-opus-4-6" },
3466 "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
3467 }),
3468 )
3469 .unwrap();
3470 create_with_store(
3471 &store,
3472 json!({
3473 "card_id": format!("{pkg}_c"),
3474 "pkg": { "name": pkg },
3475 "model": { "id": "claude-haiku-4-5-20251001" },
3476 "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
3477 }),
3478 )
3479 .unwrap();
3480
3481 let rows = find_with_store(
3483 &store,
3484 FindQuery {
3485 pkg: Some(pkg.to_string()),
3486 where_: Some(where_from(json!({
3487 "stats": { "equilibrium_position": "dead" },
3488 }))),
3489 ..Default::default()
3490 },
3491 )
3492 .unwrap();
3493 assert_eq!(rows.len(), 1);
3494 assert!(rows[0].card_id.ends_with("_a"));
3495
3496 let rows = find_with_store(
3498 &store,
3499 FindQuery {
3500 pkg: Some(pkg.to_string()),
3501 where_: Some(where_from(json!({
3502 "_or": [
3503 { "stats": { "equilibrium_position": "dead" } },
3504 { "stats": { "survival_rate": { "gte": 0.9 } } },
3505 ],
3506 }))),
3507 ..Default::default()
3508 },
3509 )
3510 .unwrap();
3511 assert_eq!(rows.len(), 2);
3512
3513 let rows = find_with_store(
3515 &store,
3516 FindQuery {
3517 pkg: Some(pkg.to_string()),
3518 where_: Some(where_from(json!({
3519 "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
3520 }))),
3521 ..Default::default()
3522 },
3523 )
3524 .unwrap();
3525 assert_eq!(rows.len(), 2);
3526
3527 let rows = find_with_store(
3529 &store,
3530 FindQuery {
3531 pkg: Some(pkg.to_string()),
3532 where_: Some(where_from(json!({
3533 "stats": {
3534 "equilibrium_position": { "in": ["dead", "fragile"] },
3535 },
3536 }))),
3537 ..Default::default()
3538 },
3539 )
3540 .unwrap();
3541 assert_eq!(rows.len(), 2);
3542
3543 let rows = find_with_store(
3546 &store,
3547 FindQuery {
3548 pkg: Some(pkg.to_string()),
3549 where_: Some(where_from(json!({
3550 "strategy_params": { "temperature": { "exists": false } },
3551 }))),
3552 ..Default::default()
3553 },
3554 )
3555 .unwrap();
3556 assert_eq!(rows.len(), 3, "none of the cards have strategy_params");
3557 }
3558
3559 #[test]
3560 fn find_order_by_multi_key() {
3561 let tmp = tempfile::tempdir().unwrap();
3562 let store = FileCardStore::new(tmp.path().to_path_buf());
3563 let pkg = "find_order_multi_pkg";
3564 create_with_store(
3565 &store,
3566 json!({
3567 "card_id": format!("{pkg}_a"),
3568 "pkg": { "name": pkg },
3569 "stats": { "pass_rate": 0.5 }
3570 }),
3571 )
3572 .unwrap();
3573 create_with_store(
3574 &store,
3575 json!({
3576 "card_id": format!("{pkg}_b"),
3577 "pkg": { "name": pkg },
3578 "stats": { "pass_rate": 0.9 }
3579 }),
3580 )
3581 .unwrap();
3582 create_with_store(
3583 &store,
3584 json!({
3585 "card_id": format!("{pkg}_c"),
3586 "pkg": { "name": pkg },
3587 "stats": { "pass_rate": 0.9 }
3588 }),
3589 )
3590 .unwrap();
3591
3592 let rows = find_with_store(
3593 &store,
3594 FindQuery {
3595 pkg: Some(pkg.to_string()),
3596 order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
3597 ..Default::default()
3598 },
3599 )
3600 .unwrap();
3601 assert_eq!(rows.len(), 3);
3602 assert_eq!(rows[0].pass_rate, Some(0.9));
3603 assert_eq!(rows[1].pass_rate, Some(0.9));
3604 assert_eq!(rows[2].pass_rate, Some(0.5));
3605 assert!(rows[0].card_id < rows[1].card_id);
3607 }
3608
3609 #[test]
3610 fn find_offset_and_limit() {
3611 let tmp = tempfile::tempdir().unwrap();
3612 let store = FileCardStore::new(tmp.path().to_path_buf());
3613 let pkg = "find_offset_limit_pkg";
3614 for i in 0..5 {
3615 create_with_store(
3616 &store,
3617 json!({
3618 "card_id": format!("{pkg}_{i}"),
3619 "pkg": { "name": pkg },
3620 "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
3621 }),
3622 )
3623 .unwrap();
3624 }
3625
3626 let rows = find_with_store(
3627 &store,
3628 FindQuery {
3629 pkg: Some(pkg.to_string()),
3630 order_by: order_from(json!("-stats.pass_rate")),
3631 offset: Some(1),
3632 limit: Some(2),
3633 ..Default::default()
3634 },
3635 )
3636 .unwrap();
3637 assert_eq!(rows.len(), 2);
3638 let pr0 = rows[0].pass_rate.unwrap();
3640 let pr1 = rows[1].pass_rate.unwrap();
3641 assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
3642 assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");
3643 }
3644
3645 #[test]
3646 fn parse_where_rejects_non_object() {
3647 assert!(parse_where(&json!("not an object")).is_err());
3648 assert!(parse_where(&json!(42)).is_err());
3649 }
3650
3651 #[test]
3652 fn parse_order_by_accepts_string_and_array() {
3653 let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3654 assert_eq!(k.len(), 1);
3655 assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3656 assert!(k[0].desc);
3657
3658 let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3659 assert_eq!(k.len(), 2);
3660 assert!(!k[0].desc);
3661 assert!(k[1].desc);
3662 }
3663
3664 #[test]
3665 fn find_where_string_ops_contains_and_starts_with() {
3666 let tmp = tempfile::tempdir().unwrap();
3667 let store = FileCardStore::new(tmp.path().to_path_buf());
3668 let pkg = "find_string_ops_pkg";
3669 create_with_store(
3670 &store,
3671 json!({
3672 "card_id": format!("{pkg}_a"),
3673 "pkg": { "name": pkg },
3674 "model": { "id": "claude-opus-4-6" },
3675 "metadata": { "tag": "experiment_alpha" },
3676 }),
3677 )
3678 .unwrap();
3679 create_with_store(
3680 &store,
3681 json!({
3682 "card_id": format!("{pkg}_b"),
3683 "pkg": { "name": pkg },
3684 "model": { "id": "claude-haiku-4-5-20251001" },
3685 "metadata": { "tag": "experiment_beta" },
3686 }),
3687 )
3688 .unwrap();
3689 create_with_store(
3690 &store,
3691 json!({
3692 "card_id": format!("{pkg}_c"),
3693 "pkg": { "name": pkg },
3694 "model": { "id": "claude-sonnet-4-5" },
3695 "metadata": { "tag": "baseline" },
3696 }),
3697 )
3698 .unwrap();
3699
3700 let rows = find_with_store(
3702 &store,
3703 FindQuery {
3704 pkg: Some(pkg.to_string()),
3705 where_: Some(where_from(json!({
3706 "metadata": { "tag": { "contains": "experiment" } },
3707 }))),
3708 ..Default::default()
3709 },
3710 )
3711 .unwrap();
3712 assert_eq!(rows.len(), 2);
3713
3714 let rows = find_with_store(
3716 &store,
3717 FindQuery {
3718 pkg: Some(pkg.to_string()),
3719 where_: Some(where_from(json!({
3720 "model": { "id": { "starts_with": "claude-opus" } },
3721 }))),
3722 ..Default::default()
3723 },
3724 )
3725 .unwrap();
3726 assert_eq!(rows.len(), 1);
3727 assert!(rows[0].card_id.ends_with("_a"));
3728
3729 let rows = find_with_store(
3731 &store,
3732 FindQuery {
3733 pkg: Some(pkg.to_string()),
3734 where_: Some(where_from(json!({
3735 "metadata": { "missing_field": { "contains": "x" } },
3736 }))),
3737 ..Default::default()
3738 },
3739 )
3740 .unwrap();
3741 assert_eq!(rows.len(), 0);
3742
3743 let rows = find_with_store(
3745 &store,
3746 FindQuery {
3747 pkg: Some(pkg.to_string()),
3748 where_: Some(where_from(json!({
3749 "metadata": { "tag": { "starts_with": 42 } },
3750 }))),
3751 ..Default::default()
3752 },
3753 )
3754 .unwrap();
3755 assert_eq!(rows.len(), 0);
3756 }
3757
3758 #[test]
3759 fn where_missing_field_ne_is_true() {
3760 let tmp = tempfile::tempdir().unwrap();
3761 let store = FileCardStore::new(tmp.path().to_path_buf());
3762 let pkg = "where_missing_ne_pkg";
3763 create_with_store(
3764 &store,
3765 json!({
3766 "card_id": format!("{pkg}_x"),
3767 "pkg": { "name": pkg },
3768 }),
3769 )
3770 .unwrap();
3771
3772 let rows = find_with_store(
3773 &store,
3774 FindQuery {
3775 pkg: Some(pkg.to_string()),
3776 where_: Some(where_from(json!({
3777 "strategy_params": { "temperature": { "ne": 0.5 } },
3778 }))),
3779 ..Default::default()
3780 },
3781 )
3782 .unwrap();
3783 assert_eq!(rows.len(), 1, "missing field is ne to anything");
3784 }
3785
3786 fn create_child(
3790 store: &FileCardStore,
3791 pkg: &str,
3792 suffix: &str,
3793 parent_id: &str,
3794 relation: &str,
3795 ) -> String {
3796 let (id, _) = create_with_store(
3797 store,
3798 json!({
3799 "card_id": format!("{pkg}_{suffix}"),
3800 "pkg": { "name": pkg },
3801 "stats": { "pass_rate": 0.5 },
3802 "metadata": {
3803 "prior_card_id": parent_id,
3804 "prior_relation": relation,
3805 },
3806 }),
3807 )
3808 .unwrap();
3809 id
3810 }
3811
3812 #[test]
3813 fn lineage_up_walks_prior_card_id_chain() {
3814 let tmp = tempfile::tempdir().unwrap();
3815 let store = FileCardStore::new(tmp.path().to_path_buf());
3816 let pkg = "lineage_up_pkg";
3817 let (a, _) = create_with_store(
3819 &store,
3820 json!({
3821 "card_id": format!("{pkg}_a"),
3822 "pkg": { "name": pkg },
3823 }),
3824 )
3825 .unwrap();
3826 let b = create_child(&store, pkg, "b", &a, "rerun_of");
3827 let c = create_child(&store, pkg, "c", &b, "rerun_of");
3828
3829 let res = lineage_with_store(
3830 &store,
3831 LineageQuery {
3832 card_id: c.clone(),
3833 direction: LineageDirection::Up,
3834 depth: None,
3835 include_stats: false,
3836 relation_filter: None,
3837 },
3838 )
3839 .unwrap()
3840 .expect("lineage result");
3841
3842 assert_eq!(res.root, c);
3843 assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
3844 assert_eq!(res.nodes[0].card_id, c);
3845 assert_eq!(res.nodes[0].depth, 0);
3846 assert_eq!(res.nodes[1].card_id, b);
3847 assert_eq!(res.nodes[1].depth, -1);
3848 assert_eq!(res.nodes[2].card_id, a);
3849 assert_eq!(res.nodes[2].depth, -2);
3850 assert_eq!(res.edges.len(), 2);
3851 assert!(!res.truncated);
3852 }
3853
3854 #[test]
3855 fn lineage_down_walks_descendants_breadth_first() {
3856 let tmp = tempfile::tempdir().unwrap();
3857 let store = FileCardStore::new(tmp.path().to_path_buf());
3858 let pkg = "lineage_down_pkg";
3859 let (a, _) = create_with_store(
3861 &store,
3862 json!({
3863 "card_id": format!("{pkg}_a"),
3864 "pkg": { "name": pkg },
3865 }),
3866 )
3867 .unwrap();
3868 let _b = create_child(&store, pkg, "b", &a, "sweep_variant");
3869 let c = create_child(&store, pkg, "c", &a, "sweep_variant");
3870 let _d = create_child(&store, pkg, "d", &c, "rerun_of");
3871
3872 let res = lineage_with_store(
3873 &store,
3874 LineageQuery {
3875 card_id: a.clone(),
3876 direction: LineageDirection::Down,
3877 depth: None,
3878 include_stats: false,
3879 relation_filter: None,
3880 },
3881 )
3882 .unwrap()
3883 .expect("lineage result");
3884
3885 assert_eq!(res.nodes.len(), 4);
3887 assert_eq!(res.edges.len(), 3);
3888 assert!(!res.truncated);
3889 }
3890
3891 #[test]
3892 fn lineage_depth_truncation_sets_flag() {
3893 let tmp = tempfile::tempdir().unwrap();
3894 let store = FileCardStore::new(tmp.path().to_path_buf());
3895 let pkg = "lineage_depth_pkg";
3896 let (a, _) = create_with_store(
3897 &store,
3898 json!({
3899 "card_id": format!("{pkg}_a"),
3900 "pkg": { "name": pkg },
3901 }),
3902 )
3903 .unwrap();
3904 let b = create_child(&store, pkg, "b", &a, "rerun_of");
3905 let _c = create_child(&store, pkg, "c", &b, "rerun_of");
3906
3907 let res = lineage_with_store(
3908 &store,
3909 LineageQuery {
3910 card_id: a,
3911 direction: LineageDirection::Down,
3912 depth: Some(1),
3913 include_stats: false,
3914 relation_filter: None,
3915 },
3916 )
3917 .unwrap()
3918 .unwrap();
3919 assert_eq!(res.nodes.len(), 2, "root + 1 level");
3920 assert!(res.truncated, "should be truncated at depth=1");
3921 }
3922
3923 #[test]
3924 fn lineage_relation_filter_skips_unlisted() {
3925 let tmp = tempfile::tempdir().unwrap();
3926 let store = FileCardStore::new(tmp.path().to_path_buf());
3927 let pkg = "lineage_filter_pkg";
3928 let (a, _) = create_with_store(
3929 &store,
3930 json!({
3931 "card_id": format!("{pkg}_a"),
3932 "pkg": { "name": pkg },
3933 }),
3934 )
3935 .unwrap();
3936 let _b = create_child(&store, pkg, "b", &a, "sweep_variant");
3937 let _c = create_child(&store, pkg, "c", &a, "rerun_of");
3938
3939 let res = lineage_with_store(
3940 &store,
3941 LineageQuery {
3942 card_id: a,
3943 direction: LineageDirection::Down,
3944 depth: None,
3945 include_stats: false,
3946 relation_filter: Some(vec!["sweep_variant".to_string()]),
3947 },
3948 )
3949 .unwrap()
3950 .unwrap();
3951 assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
3952 assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));
3953 }
3954
3955 #[test]
3956 fn lineage_missing_card_returns_none() {
3957 let tmp = tempfile::tempdir().unwrap();
3958 let store = FileCardStore::new(tmp.path().to_path_buf());
3959 let res = lineage_with_store(
3960 &store,
3961 LineageQuery {
3962 card_id: "nonexistent_card_id_xyz".into(),
3963 direction: LineageDirection::Up,
3964 depth: None,
3965 include_stats: false,
3966 relation_filter: None,
3967 },
3968 )
3969 .unwrap();
3970 assert!(res.is_none());
3971 }
3972
3973 #[test]
3979 fn write_and_read_samples_roundtrip() {
3980 let tmp = tempfile::tempdir().unwrap();
3981 let store = FileCardStore::new(tmp.path().to_path_buf());
3982 let (id, _) = create_with_store(
3983 &store,
3984 json!({
3985 "pkg": { "name": "roundtrip_pkg" },
3986 "stats": { "pass_rate": 0.5 }
3987 }),
3988 )
3989 .unwrap();
3990
3991 let samples = vec![
3992 json!({ "case": "c0", "passed": true, "score": 1.0 }),
3993 json!({ "case": "c1", "passed": false, "score": 0.0 }),
3994 json!({ "case": "c2", "passed": true, "score": 0.75 }),
3995 ];
3996 let path = write_samples_with_store(&store, &id, samples.clone()).unwrap();
3997 assert!(path.exists());
3998 assert!(path.to_string_lossy().ends_with(".samples.jsonl"));
3999
4000 let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4001 assert_eq!(got.len(), 3);
4002 assert_eq!(got[0]["case"], json!("c0"));
4003 assert_eq!(got[2]["score"], json!(0.75));
4004
4005 let slice = read_samples_with_store(
4007 &store,
4008 &id,
4009 SamplesQuery {
4010 offset: 1,
4011 limit: Some(1),
4012 where_: None,
4013 },
4014 )
4015 .unwrap();
4016 assert_eq!(slice.len(), 1);
4017 assert_eq!(slice[0]["case"], json!("c1"));
4018 }
4019
4020 #[test]
4021 fn write_samples_is_write_once() {
4022 let tmp = tempfile::tempdir().unwrap();
4023 let store = FileCardStore::new(tmp.path().to_path_buf());
4024 let (id, _) =
4025 create_with_store(&store, json!({ "pkg": { "name": "write_once_pkg" } })).unwrap();
4026 write_samples_with_store(&store, &id, vec![json!({ "x": 1 })]).unwrap();
4027 let err = write_samples_with_store(&store, &id, vec![json!({ "x": 2 })]).unwrap_err();
4028 assert!(err.contains("already exist"), "got: {err}");
4029 }
4030
4031 #[test]
4043 fn read_samples_empty_when_absent() {
4044 let tmp = tempfile::tempdir().unwrap();
4045 let store = FileCardStore::new(tmp.path().to_path_buf());
4046 let (id, _) = create_with_store(
4047 &store,
4048 json!({ "pkg": { "name": "read_samples_empty_pkg" } }),
4049 )
4050 .unwrap();
4051 let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4052 assert!(got.is_empty());
4053 }
4054
4055 #[test]
4056 fn read_samples_where_filters_rows() {
4057 let tmp = tempfile::tempdir().unwrap();
4058 let store = FileCardStore::new(tmp.path().to_path_buf());
4059 let (id, _) =
4060 create_with_store(&store, json!({ "pkg": { "name": "where_filter_pkg" } })).unwrap();
4061 write_samples_with_store(
4062 &store,
4063 &id,
4064 vec![
4065 json!({ "case": "c0", "passed": true, "score": 1.0 }),
4066 json!({ "case": "c1", "passed": false, "score": 0.0 }),
4067 json!({ "case": "c2", "passed": true, "score": 0.25 }),
4068 json!({ "case": "c3", "passed": true, "score": 0.75 }),
4069 json!({ "case": "c4", "passed": false, "score": 0.5 }),
4070 ],
4071 )
4072 .unwrap();
4073
4074 let pred = parse_where(&json!({ "passed": true })).unwrap();
4076 let got = read_samples_with_store(
4077 &store,
4078 &id,
4079 SamplesQuery {
4080 offset: 0,
4081 limit: None,
4082 where_: Some(pred),
4083 },
4084 )
4085 .unwrap();
4086 assert_eq!(got.len(), 3);
4087 assert_eq!(got[0]["case"], json!("c0"));
4088 assert_eq!(got[1]["case"], json!("c2"));
4089 assert_eq!(got[2]["case"], json!("c3"));
4090
4091 let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
4093 let got = read_samples_with_store(
4094 &store,
4095 &id,
4096 SamplesQuery {
4097 offset: 0,
4098 limit: None,
4099 where_: Some(pred),
4100 },
4101 )
4102 .unwrap();
4103 assert_eq!(got.len(), 3);
4104 assert_eq!(got[0]["case"], json!("c0"));
4105 assert_eq!(got[1]["case"], json!("c3"));
4106 assert_eq!(got[2]["case"], json!("c4"));
4107
4108 let pred = parse_where(&json!({ "passed": true })).unwrap();
4110 let slice = read_samples_with_store(
4111 &store,
4112 &id,
4113 SamplesQuery {
4114 offset: 1,
4115 limit: Some(1),
4116 where_: Some(pred),
4117 },
4118 )
4119 .unwrap();
4120 assert_eq!(slice.len(), 1);
4121 assert_eq!(slice[0]["case"], json!("c2"));
4122 }
4123
4124 #[test]
4125 fn get_by_alias_roundtrip() {
4126 let tmp = tempfile::tempdir().unwrap();
4127 let store = FileCardStore::new(tmp.path().to_path_buf());
4128 let pkg = "get_by_alias_pkg";
4129 let (id, _) = create_with_store(
4130 &store,
4131 json!({
4132 "pkg": { "name": pkg },
4133 "stats": { "pass_rate": 0.85 }
4134 }),
4135 )
4136 .unwrap();
4137
4138 let alias_name = "best_by_alias";
4139 alias_set_with_store(&store, alias_name, &id, Some(pkg), None).unwrap();
4140
4141 let card = get_by_alias_with_store(&store, alias_name)
4142 .unwrap()
4143 .unwrap();
4144 assert_eq!(card["card_id"], json!(id));
4145 assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4146
4147 assert!(get_by_alias_with_store(&store, "nonexistent_alias_xyz")
4148 .unwrap()
4149 .is_none());
4150 }
4151
4152 #[test]
4153 fn samples_errors_on_missing_card() {
4154 let tmp = tempfile::tempdir().unwrap();
4155 let store = FileCardStore::new(tmp.path().to_path_buf());
4156 let err = write_samples_with_store(&store, "does_not_exist_xyz_samples", vec![json!({})])
4157 .unwrap_err();
4158 assert!(err.contains("not found"));
4159 }
4160
4161 #[test]
4164 fn import_from_dir_copies_cards() {
4165 let pkg = "import_copies_pkg";
4166 let src_tmp = tempfile::tempdir().unwrap();
4167 let store_tmp = tempfile::tempdir().unwrap();
4168 let store = FileCardStore::new(store_tmp.path().to_path_buf());
4169
4170 let card_id = format!("{pkg}_imported");
4172 let card_content = format!(
4173 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
4174 );
4175 fs::write(
4176 src_tmp.path().join(format!("{card_id}.toml")),
4177 &card_content,
4178 )
4179 .unwrap();
4180
4181 fs::write(
4183 src_tmp.path().join(format!("{card_id}.samples.jsonl")),
4184 "{\"case\":\"c0\"}\n",
4185 )
4186 .unwrap();
4187
4188 let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4189 assert_eq!(imported, vec![card_id.clone()]);
4190 assert!(skipped.is_empty());
4191
4192 let got = get_with_store(&store, &card_id).unwrap().unwrap();
4194 assert_eq!(got["card_id"], json!(card_id));
4195
4196 let samples = read_samples_with_store(&store, &card_id, SamplesQuery::default()).unwrap();
4198 assert_eq!(samples.len(), 1);
4199 }
4200
4201 #[test]
4202 fn import_from_dir_skips_existing() {
4203 let store_tmp = tempfile::tempdir().unwrap();
4204 let store = FileCardStore::new(store_tmp.path().to_path_buf());
4205 let pkg = "import_skips_existing_pkg";
4206 let (existing_id, _) = create_with_store(
4208 &store,
4209 json!({
4210 "pkg": { "name": pkg },
4211 "stats": { "pass_rate": 0.5 }
4212 }),
4213 )
4214 .unwrap();
4215
4216 let src_tmp = tempfile::tempdir().unwrap();
4218 let card_content = format!(
4219 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
4220 );
4221 fs::write(
4222 src_tmp.path().join(format!("{existing_id}.toml")),
4223 &card_content,
4224 )
4225 .unwrap();
4226
4227 let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4228 assert!(imported.is_empty());
4229 assert_eq!(skipped, vec![existing_id.clone()]);
4230
4231 let got = get_with_store(&store, &existing_id).unwrap().unwrap();
4233 assert_eq!(got["stats"]["pass_rate"], json!(0.5));
4234 }
4235
4236 #[test]
4237 fn import_from_dir_skips_non_card_toml() {
4238 let store_tmp = tempfile::tempdir().unwrap();
4239 let store = FileCardStore::new(store_tmp.path().to_path_buf());
4240 let pkg = "import_skips_non_card_pkg";
4241 let src_tmp = tempfile::tempdir().unwrap();
4242
4243 fs::write(
4245 src_tmp.path().join("not_a_card.toml"),
4246 "title = \"hello\"\n",
4247 )
4248 .unwrap();
4249
4250 let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4251 assert!(imported.is_empty());
4252 assert!(skipped.is_empty());
4253 }
4254
4255 #[test]
4261 fn custom_root_file_store_roundtrip() {
4262 let tmp = tempfile::tempdir().unwrap();
4263 let store = FileCardStore::new(tmp.path().to_path_buf());
4264 let pkg = "custom_root_pkg";
4265
4266 let (id, path) = create_with_store(
4268 &store,
4269 json!({
4270 "pkg": { "name": pkg },
4271 "model": { "id": "gpt-test" },
4272 }),
4273 )
4274 .unwrap();
4275 assert!(path.starts_with(tmp.path()));
4276 assert!(path.ends_with(format!("{id}.toml")));
4277
4278 let card = get_with_store(&store, &id).unwrap().expect("card exists");
4279 assert_eq!(
4280 card.get("card_id").and_then(|v| v.as_str()),
4281 Some(id.as_str())
4282 );
4283
4284 let rows = list_with_store(&store, Some(pkg)).unwrap();
4285 assert_eq!(rows.len(), 1);
4286 assert_eq!(rows[0].card_id, id);
4287
4288 let other_tmp = tempfile::tempdir().unwrap();
4290 let other_store = FileCardStore::new(other_tmp.path().to_path_buf());
4291 let default_rows = list_with_store(&other_store, Some(pkg)).unwrap();
4292 assert!(default_rows.iter().all(|r| r.card_id != id));
4293
4294 alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
4296 let via_alias = get_by_alias_with_store(&store, "alpha")
4297 .unwrap()
4298 .expect("alias resolves");
4299 assert_eq!(
4300 via_alias.get("card_id").and_then(|v| v.as_str()),
4301 Some(id.as_str())
4302 );
4303
4304 let samples_path =
4306 write_samples_with_store(&store, &id, vec![json!({ "case": "a", "pass": true })])
4307 .unwrap();
4308 assert!(samples_path.starts_with(tmp.path()));
4309 let back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4310 assert_eq!(back.len(), 1);
4311 assert_eq!(back[0].get("case").and_then(|v| v.as_str()), Some("a"));
4312 }
4313
4314 use std::sync::atomic::AtomicUsize;
4319 use std::sync::Barrier;
4320
4321 fn env_lock() -> &'static Mutex<()> {
4324 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4325 LOCK.get_or_init(|| Mutex::new(()))
4326 }
4327
4328 struct BusTestOwnerGuard;
4334 impl Drop for BusTestOwnerGuard {
4335 fn drop(&mut self) {
4336 INSIDE_BUS_TEST.with(|flag| flag.set(false));
4337 }
4338 }
4339
4340 fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4355 where
4356 F: FnOnce(&'static CardEventBus),
4357 {
4358 let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4361 INSIDE_BUS_TEST.with(|flag| flag.set(true));
4365 let _owner = BusTestOwnerGuard;
4366 let bus = event_bus();
4367 bus.reset_stats_for_test();
4368 bus.replace_subscribers_for_test(subs);
4369 f(bus);
4370 bus.replace_subscribers_for_test(Vec::new());
4372 bus.reset_stats_for_test();
4373 }
4376
4377 struct MockSubscriber {
4379 uri: String,
4380 events: Mutex<Vec<CardEvent>>,
4381 calls: AtomicUsize,
4382 }
4383
4384 impl MockSubscriber {
4385 fn new(uri: &str) -> Arc<Self> {
4386 Arc::new(Self {
4387 uri: uri.to_string(),
4388 events: Mutex::new(Vec::new()),
4389 calls: AtomicUsize::new(0),
4390 })
4391 }
4392 fn call_count(&self) -> usize {
4393 self.calls.load(Ordering::SeqCst)
4394 }
4395 }
4396
4397 impl CardSubscriber for MockSubscriber {
4398 fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4399 self.calls.fetch_add(1, Ordering::SeqCst);
4400 self.events
4401 .lock()
4402 .unwrap_or_else(|p| p.into_inner())
4403 .push(ev.clone());
4404 Ok(())
4405 }
4406 fn describe(&self) -> String {
4407 self.uri.clone()
4408 }
4409 }
4410
4411 #[test]
4414 fn bus_is_process_singleton() {
4415 let a = event_bus() as *const CardEventBus;
4416 let b = event_bus() as *const CardEventBus;
4417 assert_eq!(a, b, "event_bus() must return the same singleton pointer");
4418 }
4419
4420 #[test]
4421 fn publish_with_no_subscribers_is_noop() {
4422 with_bus_subscribers(Vec::new(), |_bus| {
4423 publish(CardEvent::Created {
4425 pkg: "pkg".into(),
4426 card_id: "id".into(),
4427 toml_text: "x = 1\n".into(),
4428 });
4429 });
4430 }
4431
4432 #[test]
4433 fn init_event_bus_is_idempotent() {
4434 init_event_bus();
4435 init_event_bus();
4436 init_event_bus();
4437 }
4439
4440 #[test]
4443 fn fanout_mirrors_create() {
4444 let primary = tempfile::tempdir().unwrap();
4445 let sub_a = tempfile::tempdir().unwrap();
4446 let sub_b = tempfile::tempdir().unwrap();
4447 let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
4448 let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));
4449 with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
4450 let store = FileCardStore::new(primary.path().to_path_buf());
4451 let (id, path) =
4452 create_with_store(&store, json!({ "pkg": { "name": "fanout_create_pkg" } }))
4453 .unwrap();
4454 assert!(path.exists());
4455 let primary_text = fs::read_to_string(&path).unwrap();
4456 let a_path = sub_a
4457 .path()
4458 .join("fanout_create_pkg")
4459 .join(format!("{id}.toml"));
4460 let b_path = sub_b
4461 .path()
4462 .join("fanout_create_pkg")
4463 .join(format!("{id}.toml"));
4464 assert!(a_path.exists(), "subscriber A missing card");
4465 assert!(b_path.exists(), "subscriber B missing card");
4466 assert_eq!(fs::read_to_string(&a_path).unwrap(), primary_text);
4467 assert_eq!(fs::read_to_string(&b_path).unwrap(), primary_text);
4468 });
4469 }
4470
4471 #[test]
4472 fn fanout_mirrors_append() {
4473 let primary = tempfile::tempdir().unwrap();
4474 let sub = tempfile::tempdir().unwrap();
4475 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4476 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4477 let store = FileCardStore::new(primary.path().to_path_buf());
4478 let (id, _) =
4479 create_with_store(&store, json!({ "pkg": { "name": "fanout_append_pkg" } }))
4480 .unwrap();
4481 append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();
4483 let sub_path = sub
4484 .path()
4485 .join("fanout_append_pkg")
4486 .join(format!("{id}.toml"));
4487 let text = fs::read_to_string(&sub_path).unwrap();
4488 assert!(text.contains("extra_key"), "append must mirror new key");
4489 });
4490 }
4491
4492 #[test]
4493 fn fanout_mirrors_samples() {
4494 let primary = tempfile::tempdir().unwrap();
4495 let sub = tempfile::tempdir().unwrap();
4496 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4497 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4498 let store = FileCardStore::new(primary.path().to_path_buf());
4499 let (id, _) =
4500 create_with_store(&store, json!({ "pkg": { "name": "fanout_samples_pkg" } }))
4501 .unwrap();
4502 write_samples_with_store(&store, &id, vec![json!({ "case": "c0" })]).unwrap();
4503 let sub_path = sub
4504 .path()
4505 .join("fanout_samples_pkg")
4506 .join(format!("{id}.samples.jsonl"));
4507 let text = fs::read_to_string(&sub_path).unwrap();
4508 assert!(text.contains("\"case\":\"c0\""));
4509 });
4510 }
4511
4512 #[test]
4513 fn fanout_mirrors_aliases() {
4514 let primary = tempfile::tempdir().unwrap();
4515 let sub = tempfile::tempdir().unwrap();
4516 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4517 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4518 let store = FileCardStore::new(primary.path().to_path_buf());
4519 let (id, _) =
4520 create_with_store(&store, json!({ "pkg": { "name": "fanout_alias_pkg" } }))
4521 .unwrap();
4522 alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();
4523 let sub_aliases = sub.path().join("_aliases.toml");
4524 assert!(sub_aliases.exists(), "subscriber must receive aliases file");
4525 let text = fs::read_to_string(&sub_aliases).unwrap();
4526 assert!(text.contains("myalias"));
4527 });
4528 }
4529
4530 #[test]
4531 fn fanout_mirrors_import_from_dir_cards() {
4532 let primary = tempfile::tempdir().unwrap();
4533 let sub = tempfile::tempdir().unwrap();
4534 let src = tempfile::tempdir().unwrap();
4535
4536 let src_card = src.path().join("card_x.toml");
4538 let toml_body = format!(
4539 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
4540 );
4541 fs::write(&src_card, &toml_body).unwrap();
4542
4543 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4544 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4545 let store = FileCardStore::new(primary.path().to_path_buf());
4546 let (imported, _skipped) =
4547 import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
4548 assert_eq!(imported, vec!["card_x".to_string()]);
4549
4550 let sub_card = sub.path().join("fanout_import_pkg").join("card_x.toml");
4551 assert!(sub_card.exists(), "imported card must be mirrored");
4552 });
4553 }
4554
4555 #[test]
4556 fn fanout_mirrors_import_from_dir_samples() {
4557 let primary = tempfile::tempdir().unwrap();
4558 let sub = tempfile::tempdir().unwrap();
4559 let src = tempfile::tempdir().unwrap();
4560
4561 let toml_body = format!(
4562 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
4563 );
4564 fs::write(src.path().join("card_y.toml"), &toml_body).unwrap();
4565 fs::write(
4566 src.path().join("card_y.samples.jsonl"),
4567 "{\"case\":\"c0\"}\n",
4568 )
4569 .unwrap();
4570
4571 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4572 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4573 let store = FileCardStore::new(primary.path().to_path_buf());
4574 let (imported, _) =
4575 import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg")
4576 .unwrap();
4577 assert_eq!(imported, vec!["card_y".to_string()]);
4578
4579 let sub_samples = sub
4580 .path()
4581 .join("fanout_import_samples_pkg")
4582 .join("card_y.samples.jsonl");
4583 assert!(sub_samples.exists(), "imported samples must be mirrored");
4584 let text = fs::read_to_string(&sub_samples).unwrap();
4585 assert!(text.contains("c0"));
4586 });
4587 }
4588
4589 #[test]
4590 fn with_store_direct_call_still_publishes() {
4591 let primary = tempfile::tempdir().unwrap();
4592 let mock = MockSubscriber::new("mock://direct");
4593 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4594 let store = FileCardStore::new(primary.path().to_path_buf());
4595 create_with_store(&store, json!({ "pkg": { "name": "direct_call_pkg" } })).unwrap();
4596 assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
4597 });
4598 }
4599
4600 #[test]
4601 fn subscriber_appended_missing_card_warns() {
4602 let primary = tempfile::tempdir().unwrap();
4603 let sub = tempfile::tempdir().unwrap();
4604 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4605 with_bus_subscribers(vec![fs_sub.clone()], |bus| {
4606 let store = FileCardStore::new(primary.path().to_path_buf());
4607 bus.replace_subscribers_for_test(Vec::new());
4610 let (id, _) =
4611 create_with_store(&store, json!({ "pkg": { "name": "missing_append_pkg" } }))
4612 .unwrap();
4613 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
4615
4616 append_with_store(&store, &id, json!({ "k": 1 })).unwrap();
4619
4620 let snap = bus.stats().snapshot();
4621 let row = snap
4622 .iter()
4623 .find(|r| r.sink == fs_sub.describe())
4624 .expect("subscriber row exists");
4625 let err_total: u64 = row.err.values().sum();
4626 assert!(err_total >= 1, "subscriber append err must be recorded");
4627 assert!(row.last_error.is_some());
4628 });
4629 }
4630
4631 #[test]
4632 fn subscriber_failure_preserves_primary() {
4633 struct FailingSubscriber;
4634 impl CardSubscriber for FailingSubscriber {
4635 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4636 Err("synthetic failure".into())
4637 }
4638 fn describe(&self) -> String {
4639 "mock://failing".into()
4640 }
4641 }
4642 let primary = tempfile::tempdir().unwrap();
4643 with_bus_subscribers(
4644 vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4645 |bus| {
4646 let store = FileCardStore::new(primary.path().to_path_buf());
4647 let (_id, path) =
4649 create_with_store(&store, json!({ "pkg": { "name": "preserve_primary_pkg" } }))
4650 .unwrap();
4651 assert!(path.exists());
4652 let snap = bus.stats().snapshot();
4653 let row = snap
4654 .iter()
4655 .find(|r| r.sink == "mock://failing")
4656 .expect("row exists");
4657 let err_total: u64 = row.err.values().sum();
4658 assert!(err_total >= 1);
4659 },
4660 );
4661 }
4662
4663 #[test]
4666 fn stats_counts_ok() {
4667 let primary = tempfile::tempdir().unwrap();
4668 let mock = MockSubscriber::new("mock://stats-ok");
4669 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
4670 let store = FileCardStore::new(primary.path().to_path_buf());
4671 for i in 0..3 {
4672 create_with_store(
4673 &store,
4674 json!({
4675 "card_id": format!("stats_ok_{i}"),
4676 "pkg": { "name": "stats_ok_pkg" },
4677 }),
4678 )
4679 .unwrap();
4680 }
4681 let snap = bus.stats().snapshot();
4682 let row = snap
4683 .iter()
4684 .find(|r| r.sink == "mock://stats-ok")
4685 .expect("row");
4686 assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
4687 assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);
4688 for k in ["created", "appended", "samples", "aliases"] {
4690 assert!(row.ok.contains_key(k), "ok.{k} must be present");
4691 assert!(row.err.contains_key(k), "err.{k} must be present");
4692 }
4693 assert!(row.last_error.is_none());
4694 });
4695 }
4696
4697 #[test]
4698 fn stats_counts_err_with_last_error() {
4699 struct FailingSubscriber;
4700 impl CardSubscriber for FailingSubscriber {
4701 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4702 Err("synthetic create failure".into())
4703 }
4704 fn describe(&self) -> String {
4705 "mock://stats-err".into()
4706 }
4707 }
4708 let primary = tempfile::tempdir().unwrap();
4709 with_bus_subscribers(
4710 vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4711 |bus| {
4712 let store = FileCardStore::new(primary.path().to_path_buf());
4713 create_with_store(&store, json!({ "pkg": { "name": "stats_err_pkg" } })).unwrap();
4714 let snap = bus.stats().snapshot();
4715 let row = snap
4716 .iter()
4717 .find(|r| r.sink == "mock://stats-err")
4718 .expect("row");
4719 assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);
4720 let le = row.last_error.as_ref().expect("last_error set");
4721 assert!(!le.msg.is_empty(), "last_error.msg must be non-empty");
4722 assert_eq!(le.kind, CardEventKind::Created);
4723 assert!(le.ts_ms > 0, "last_error.ts_ms must be populated");
4724 },
4725 );
4726 }
4727
4728 #[test]
4729 fn stats_per_subscriber_isolated() {
4730 struct FailingSubscriber;
4731 impl CardSubscriber for FailingSubscriber {
4732 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4733 Err("sub1 fails".into())
4734 }
4735 fn describe(&self) -> String {
4736 "mock://sub1-fail".into()
4737 }
4738 }
4739 let primary = tempfile::tempdir().unwrap();
4740 let mock_ok = MockSubscriber::new("mock://sub2-ok");
4741 let subs: Vec<Arc<dyn CardSubscriber>> = vec![
4742 Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
4743 mock_ok.clone() as Arc<dyn CardSubscriber>,
4744 ];
4745 with_bus_subscribers(subs, |bus| {
4746 let store = FileCardStore::new(primary.path().to_path_buf());
4747 create_with_store(&store, json!({ "pkg": { "name": "isolated_pkg" } })).unwrap();
4748 let snap = bus.stats().snapshot();
4749 let r1 = snap
4750 .iter()
4751 .find(|r| r.sink == "mock://sub1-fail")
4752 .expect("sub1 row");
4753 let r2 = snap
4754 .iter()
4755 .find(|r| r.sink == "mock://sub2-ok")
4756 .expect("sub2 row");
4757 assert_eq!(r1.err.get("created").copied().unwrap_or(0), 1);
4758 assert_eq!(r1.ok.get("created").copied().unwrap_or(0), 0);
4759 assert_eq!(r2.ok.get("created").copied().unwrap_or(0), 1);
4760 assert_eq!(r2.err.get("created").copied().unwrap_or(0), 0);
4761 assert!(r1.last_error.is_some());
4762 assert!(r2.last_error.is_none());
4763 });
4764 }
4765
4766 #[test]
4767 fn subscriber_stats_survive_multiple_calls() {
4768 let primary = tempfile::tempdir().unwrap();
4773 let mock = MockSubscriber::new("mock://stats-survive");
4774 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4775 let store = FileCardStore::new(primary.path().to_path_buf());
4776 for i in 0..3 {
4777 create_with_store(
4778 &store,
4779 json!({
4780 "card_id": format!("survive_{i}"),
4781 "pkg": { "name": "survive_pkg" },
4782 }),
4783 )
4784 .unwrap();
4785 }
4786 let snap = subscriber_stats_snapshot();
4789 let row = snap
4790 .iter()
4791 .find(|r| r.sink == "mock://stats-survive")
4792 .expect("row");
4793 assert_eq!(
4794 row.ok.get("created").copied().unwrap_or(0),
4795 3,
4796 "counters must accumulate across calls"
4797 );
4798 });
4799 }
4800
4801 #[test]
4802 fn stats_snapshot_serializes_with_all_kind_keys() {
4803 let primary = tempfile::tempdir().unwrap();
4805 let mock = MockSubscriber::new("mock://json-shape");
4806 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4807 let store = FileCardStore::new(primary.path().to_path_buf());
4808 create_with_store(&store, json!({ "pkg": { "name": "json_shape_pkg" } })).unwrap();
4809 let snap = subscriber_stats_snapshot();
4810 let json = serde_json::to_value(&snap).expect("serializable");
4811 let arr = json.as_array().expect("array");
4812 let row = arr
4813 .iter()
4814 .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
4815 .expect("row present in JSON");
4816 assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
4817 for k in ["created", "appended", "samples", "aliases"] {
4818 assert!(row.pointer(&format!("/ok/{k}")).is_some(), "ok.{k} missing");
4819 assert!(
4820 row.pointer(&format!("/err/{k}")).is_some(),
4821 "err.{k} missing"
4822 );
4823 }
4824 assert!(row.get("last_error").is_some());
4825 });
4826 }
4827
4828 #[test]
4829 fn multi_process_tmp_unique_suffix() {
4830 let pid = process::id();
4837 let sample = format!("whatever.tmp.{pid}.0");
4838 let rest = sample.trim_start_matches("whatever.tmp.");
4840 let (pid_part, seq_part) = rest.split_once('.').expect("dotted form");
4841 assert!(pid_part.chars().all(|c| c.is_ascii_digit()));
4842 assert!(seq_part.chars().all(|c| c.is_ascii_digit()));
4843
4844 let dir = tempfile::tempdir().unwrap();
4847 let dest = dir.path().join("out.txt");
4848 atomic_write(&dest, b"hello").unwrap();
4849 assert_eq!(fs::read_to_string(&dest).unwrap(), "hello");
4850 }
4851
4852 #[cfg(unix)]
4855 #[test]
4856 fn describe_roundtrips_env_spec() {
4857 let dir = tempfile::tempdir().unwrap();
4858 let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4859 let uri = sub.describe();
4860 assert!(uri.starts_with("file:///"), "unix uri must be file:///...");
4861 let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4863 assert_eq!(parsed.describe(), uri);
4864 }
4865
4866 #[cfg(windows)]
4867 #[test]
4868 fn describe_roundtrips_env_spec_windows() {
4869 let dir = tempfile::tempdir().unwrap();
4870 let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4871 let uri = sub.describe();
4872 assert!(
4873 uri.starts_with("file:///"),
4874 "windows uri must be file:///..."
4875 );
4876 let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4877 assert_eq!(parsed.describe(), uri);
4878 }
4879
4880 #[test]
4881 fn env_empty_means_no_subscribers() {
4882 let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
4885 let prev = std::env::var("ALC_CARD_SINKS").ok();
4886 unsafe {
4888 std::env::set_var("ALC_CARD_SINKS", "");
4889 }
4890 let subs = load_subscribers_from_env();
4891 assert!(subs.is_empty());
4892 unsafe {
4894 match prev {
4895 Some(v) => std::env::set_var("ALC_CARD_SINKS", v),
4896 None => std::env::remove_var("ALC_CARD_SINKS"),
4897 }
4898 }
4899 }
4900
4901 #[test]
4902 fn env_parse_rejects_bare_path() {
4903 assert!(parse_subscriber_spec("/foo/bar").is_none());
4904 }
4905
4906 #[test]
4907 fn env_parse_rejects_unknown_scheme() {
4908 assert!(parse_subscriber_spec("sqlite:///foo").is_none());
4909 assert!(parse_subscriber_spec("s3://bucket/foo").is_none());
4910 assert!(parse_subscriber_spec("http://example.com/x").is_none());
4911 }
4912
4913 #[test]
4914 fn env_parse_rejects_non_empty_authority() {
4915 assert!(parse_subscriber_spec("file://host/path").is_none());
4916 }
4917
4918 #[test]
4919 fn env_parse_rejects_missing_double_slash() {
4920 assert!(parse_subscriber_spec("file:/foo").is_none());
4921 assert!(parse_subscriber_spec("file:foo").is_none());
4922 }
4923
4924 #[cfg(unix)]
4925 #[test]
4926 fn env_parse_accepts_file_uri() {
4927 let sub = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
4928 assert_eq!(sub.describe(), "file:///tmp/algocline-sinks-unit");
4929 }
4930
4931 #[cfg(windows)]
4932 #[test]
4933 fn env_parse_accepts_file_uri_windows() {
4934 let sub = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
4935 assert!(sub.describe().starts_with("file:///"));
4937 }
4938
4939 #[test]
4940 fn env_parse_splits_by_pipe() {
4941 let subs = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
4942 assert_eq!(subs.len(), 2);
4943 assert_eq!(subs[0].describe(), "file:///tmp/a");
4944 assert_eq!(subs[1].describe(), "file:///tmp/b");
4945 }
4946
4947 #[test]
4948 fn env_parse_treats_colon_as_literal_path() {
4949 #[cfg(unix)]
4951 {
4952 let sub = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
4953 assert_eq!(sub.describe(), "file:///tmp/a:b");
4954 }
4955 #[cfg(windows)]
4956 {
4957 let sub = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
4959 assert!(sub.describe().contains(":"));
4960 }
4961 }
4962
4963 #[test]
4964 fn env_parse_percent_decode_space() {
4965 #[cfg(unix)]
4966 {
4967 let sub = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
4968 assert_eq!(sub.describe(), "file:///tmp/a b");
4969 }
4970 #[cfg(windows)]
4971 {
4972 let sub = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
4973 assert!(sub.describe().contains(' '));
4974 }
4975 }
4976
4977 #[test]
4978 fn env_parse_percent_decode_rejects_invalid_hex() {
4979 assert!(parse_subscriber_spec("file:///tmp/a%ZZb").is_none());
4980 }
4981
4982 #[test]
4983 fn env_parse_percent_decode_rejects_incomplete() {
4984 assert!(parse_subscriber_spec("file:///tmp/a%2").is_none());
4985 assert!(parse_subscriber_spec("file:///tmp/a%").is_none());
4986 }
4987
4988 #[test]
4989 fn env_parse_rejects_non_utf8() {
4990 assert!(parse_subscriber_spec("file:///tmp/%C3%28").is_none());
4999 }
5000
5001 #[test]
5002 fn env_parse_dedups_duplicate_uris() {
5003 let subs = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
5004 assert_eq!(subs.len(), 2);
5005 assert_eq!(subs[0].describe(), "file:///tmp/x");
5006 assert_eq!(subs[1].describe(), "file:///tmp/y");
5007 }
5008
5009 #[test]
5014 fn test_oncelock_init_race_single_winner() {
5015 let barrier = Arc::new(Barrier::new(8));
5018 let mut handles = Vec::new();
5019 for _ in 0..8 {
5020 let b = barrier.clone();
5021 handles.push(std::thread::spawn(move || {
5022 b.wait();
5023 event_bus() as *const CardEventBus as usize
5024 }));
5025 }
5026 let ptrs: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5027 let first = ptrs[0];
5028 for p in &ptrs {
5029 assert_eq!(*p, first, "singleton identity must hold across threads");
5030 }
5031 }
5032
5033 #[test]
5034 fn test_subscriber_stats_concurrent_update() {
5035 let stats = Arc::new(SubscriberStats::default());
5036 let n_threads = 4;
5037 let per_thread = 250;
5038 let barrier = Arc::new(Barrier::new(n_threads));
5039 let mut handles = Vec::new();
5040 for t in 0..n_threads {
5041 let s = stats.clone();
5042 let b = barrier.clone();
5043 handles.push(std::thread::spawn(move || {
5044 b.wait();
5045 for i in 0..per_thread {
5046 let kind = if (t + i) % 2 == 0 {
5047 CardEventKind::Created
5048 } else {
5049 CardEventKind::Appended
5050 };
5051 s.record_ok("mock://same-subscriber", kind);
5052 }
5053 }));
5054 }
5055 for h in handles {
5056 h.join().unwrap();
5057 }
5058 let snap = stats.snapshot();
5059 let row = snap
5060 .iter()
5061 .find(|r| r.sink == "mock://same-subscriber")
5062 .expect("row");
5063 let expected = (n_threads * per_thread) as u64;
5064 let ok_total: u64 = row.ok.values().sum();
5065 assert_eq!(ok_total, expected, "all increments must be counted");
5066 }
5067
5068 #[test]
5069 fn test_subscriber_stats_poison_recovery() {
5070 let stats = Arc::new(SubscriberStats::default());
5071 stats.record_ok("mock://poison", CardEventKind::Created);
5073
5074 let s_clone = stats.clone();
5076 let _ = std::thread::spawn(move || {
5077 let _g = s_clone.inner.lock().unwrap();
5078 panic!("intentional poison");
5079 })
5080 .join();
5081
5082 let snap = stats.snapshot();
5084 assert!(!snap.is_empty(), "snapshot after poison must still work");
5085 let ok1: u64 = snap[0].ok.values().sum();
5086 assert_eq!(ok1, 1);
5087
5088 stats.record_ok("mock://poison", CardEventKind::Created);
5090 let snap2 = stats.snapshot();
5091 let ok2: u64 = snap2[0].ok.values().sum();
5092 assert_eq!(ok2, 2);
5093 }
5094
5095 #[test]
5096 fn test_atomic_tmp_seq_unique_under_concurrency() {
5097 let dir = tempfile::tempdir().unwrap();
5100 let barrier = Arc::new(Barrier::new(8));
5101 let mut handles = Vec::new();
5102 for i in 0..8 {
5103 let d = dir.path().to_path_buf();
5104 let b = barrier.clone();
5105 handles.push(std::thread::spawn(move || {
5106 b.wait();
5107 let dest = d.join(format!("file_{i}.bin"));
5108 atomic_write(&dest, b"x").unwrap();
5109 dest.file_name().unwrap().to_string_lossy().to_string()
5111 }));
5112 }
5113 let names: HashSet<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5114 assert_eq!(names.len(), 8, "all dest names must be unique");
5115 }
5118
5119 #[test]
5120 fn test_atomic_tmp_seq_wraps_without_panic() {
5121 let seq = AtomicU64::new(u64::MAX - 1);
5124 let a = seq.fetch_add(1, Ordering::Relaxed);
5125 let b = seq.fetch_add(1, Ordering::Relaxed);
5126 let c = seq.fetch_add(1, Ordering::Relaxed);
5127 assert_eq!(a, u64::MAX - 1);
5128 assert_eq!(b, u64::MAX);
5129 assert_eq!(c, 0, "u64 fetch_add must wrap to 0");
5130 }
5131
5132 #[test]
5133 fn test_rename_atomicity_same_volume() {
5134 let dir = tempfile::tempdir().unwrap();
5138 let dest = dir.path().join("shared.bin");
5139 let barrier = Arc::new(Barrier::new(2));
5140 let mut handles = Vec::new();
5141 for i in 0..2u8 {
5142 let d = dest.clone();
5143 let b = barrier.clone();
5144 handles.push(std::thread::spawn(move || {
5145 b.wait();
5146 let payload = vec![i; 64];
5147 atomic_write(&d, &payload)
5148 }));
5149 }
5150 let mut saw_ok = 0;
5151 for h in handles {
5152 #[cfg(unix)]
5153 {
5154 h.join().unwrap().unwrap();
5156 saw_ok += 1;
5157 }
5158 #[cfg(not(unix))]
5159 {
5160 if h.join().unwrap().is_ok() {
5162 saw_ok += 1;
5163 }
5164 }
5165 }
5166 assert!(saw_ok >= 1, "at least one rename must succeed");
5167 assert!(dest.exists(), "dest must exist after concurrent rename");
5168 let bytes = fs::read(&dest).unwrap();
5169 assert!(bytes == vec![0u8; 64] || bytes == vec![1u8; 64]);
5170 }
5171
5172 #[test]
5173 fn test_fanout_concurrent_create_with_store() {
5174 let primary = tempfile::tempdir().unwrap();
5175 let sub = tempfile::tempdir().unwrap();
5176 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
5177 with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5178 let primary_path = primary.path().to_path_buf();
5179 let barrier = Arc::new(Barrier::new(4));
5180 let mut handles = Vec::new();
5181 for i in 0..4 {
5182 let pp = primary_path.clone();
5183 let b = barrier.clone();
5184 handles.push(std::thread::spawn(move || {
5185 INSIDE_BUS_TEST.with(|flag| flag.set(true));
5191 b.wait();
5192 let store = FileCardStore::new(pp);
5193 create_with_store(
5194 &store,
5195 json!({
5196 "card_id": format!("concur_card_{i}"),
5197 "pkg": { "name": "concur_pkg" },
5198 }),
5199 )
5200 .unwrap()
5201 .0
5202 }));
5203 }
5204 let ids: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5205 assert_eq!(ids.len(), 4);
5206 for id in &ids {
5207 let p = sub.path().join("concur_pkg").join(format!("{id}.toml"));
5208 assert!(p.exists(), "subscriber must have card {id}");
5209 }
5210 let snap = bus.stats().snapshot();
5211 let row = snap
5212 .iter()
5213 .find(|r| r.sink == fs_sub.describe())
5214 .expect("row");
5215 let ok_total: u64 = row.ok.values().sum();
5216 assert_eq!(
5217 ok_total, 4,
5218 "subscriber must have recorded 4 successful deliveries"
5219 );
5220 });
5221 }
5222
5223 fn backfill_primary_with_cards(
5229 pkg: &str,
5230 count: usize,
5231 ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5232 let primary = tempfile::tempdir().unwrap();
5233 let store = FileCardStore::new(primary.path().to_path_buf());
5234 let mut ids = Vec::new();
5235 for i in 0..count {
5236 let (id, _) = create_with_store(
5237 &store,
5238 json!({
5239 "card_id": format!("{pkg}_{i}"),
5240 "pkg": { "name": pkg },
5241 }),
5242 )
5243 .unwrap();
5244 ids.push(id);
5245 }
5246 (primary, store, ids)
5247 }
5248
5249 #[test]
5250 fn backfill_pushes_missing_cards() {
5251 let sub_dir = tempfile::tempdir().unwrap();
5252 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5253 let uri = fs_sub.describe();
5254 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5255 let bus = event_bus();
5257 bus.replace_subscribers_for_test(Vec::new());
5258 let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
5259 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5260
5261 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5262 assert_eq!(report.pushed.len(), 2);
5263 assert_eq!(report.skipped.len(), 0);
5264 assert!(report.failed.is_empty());
5265 for id in &ids {
5266 let p = sub_dir
5267 .path()
5268 .join("backfill_push_pkg")
5269 .join(format!("{id}.toml"));
5270 assert!(p.exists(), "card {id} must exist on subscriber");
5271 }
5272 });
5273 }
5274
5275 #[test]
5276 fn backfill_skips_existing_on_subscriber() {
5277 let sub_dir = tempfile::tempdir().unwrap();
5278 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5279 let uri = fs_sub.describe();
5280 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5281 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);
5283 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5284 assert_eq!(report.pushed.len(), 0);
5285 assert_eq!(report.skipped.len(), 3);
5286 assert!(report.failed.is_empty());
5287 });
5288 }
5289
5290 #[test]
5291 fn backfill_dry_run_no_writes() {
5292 let sub_dir = tempfile::tempdir().unwrap();
5293 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5294 let uri = fs_sub.describe();
5295 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5296 let bus = event_bus();
5297 bus.replace_subscribers_for_test(Vec::new());
5298 let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
5299 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5300
5301 let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
5302 assert_eq!(
5303 report.pushed.len(),
5304 2,
5305 "pushed must list ids even in dry run"
5306 );
5307 for id in &ids {
5308 let p = sub_dir
5309 .path()
5310 .join("backfill_dry_pkg")
5311 .join(format!("{id}.toml"));
5312 assert!(!p.exists(), "dry run must NOT write card {id}");
5313 }
5314 let snap = bus.stats().snapshot();
5316 if let Some(row) = snap.iter().find(|r| r.sink == uri) {
5317 let total: u64 = row.ok.values().sum::<u64>() + row.err.values().sum::<u64>();
5318 assert_eq!(total, 0, "dry run must not touch stats");
5319 }
5320 });
5321 }
5322
5323 #[test]
5324 fn backfill_drifted_card_skipped_not_overwritten() {
5325 let sub_dir = tempfile::tempdir().unwrap();
5326 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5327 let uri = fs_sub.describe();
5328 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5329 let bus = event_bus();
5330 bus.replace_subscribers_for_test(Vec::new());
5331 let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
5332 let id = &ids[0];
5333
5334 let sub_card_dir = sub_dir.path().join("backfill_drift_pkg");
5336 fs::create_dir_all(&sub_card_dir).unwrap();
5337 let sub_card = sub_card_dir.join(format!("{id}.toml"));
5338 fs::write(&sub_card, "drifted=true\n").unwrap();
5339
5340 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5341 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5342 assert_eq!(report.skipped, vec![id.clone()]);
5343 assert!(report.pushed.is_empty());
5344 let after = fs::read_to_string(&sub_card).unwrap();
5345 assert_eq!(after, "drifted=true\n", "drifted copy must be preserved");
5346 });
5347 }
5348
5349 #[test]
5350 fn backfill_includes_samples() {
5351 let sub_dir = tempfile::tempdir().unwrap();
5352 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5353 let uri = fs_sub.describe();
5354 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5355 let bus = event_bus();
5356 bus.replace_subscribers_for_test(Vec::new());
5357 let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
5358 let id = &ids[0];
5359 write_samples_with_store(&store, id, vec![json!({ "case": "c0" })]).unwrap();
5360 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5361
5362 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5363 assert_eq!(report.pushed, vec![id.clone()]);
5364 assert_eq!(report.pushed_samples, vec![id.clone()]);
5365 let sub_samples = sub_dir
5366 .path()
5367 .join("backfill_samples_pkg")
5368 .join(format!("{id}.samples.jsonl"));
5369 assert!(sub_samples.exists());
5370 assert!(fs::read_to_string(&sub_samples).unwrap().contains("c0"));
5371 });
5372 }
5373
5374 #[test]
5375 fn backfill_unknown_sink_err() {
5376 with_bus_subscribers(Vec::new(), |_bus| {
5377 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);
5378 let err = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false)
5379 .unwrap_err();
5380 assert!(
5381 err.starts_with("unknown sink"),
5382 "must reject unregistered sink; got: {err}"
5383 );
5384 });
5385 }
5386
5387 #[test]
5388 fn backfill_bypasses_bus_fanout() {
5389 let sub_a_dir = tempfile::tempdir().unwrap();
5392 let sub_b_dir = tempfile::tempdir().unwrap();
5393 let fa = Arc::new(FileCardSubscriber::new(sub_a_dir.path().to_path_buf()));
5394 let fb = Arc::new(FileCardSubscriber::new(sub_b_dir.path().to_path_buf()));
5395 let uri_b = fb.describe();
5396 with_bus_subscribers(
5397 vec![
5398 fa.clone() as Arc<dyn CardSubscriber>,
5399 fb.clone() as Arc<dyn CardSubscriber>,
5400 ],
5401 |bus| {
5402 bus.replace_subscribers_for_test(vec![fa.clone()]);
5404 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
5405 let before = bus
5407 .stats()
5408 .snapshot()
5409 .into_iter()
5410 .find(|r| r.sink == fa.describe())
5411 .map(|r| r.ok.get("created").copied().unwrap_or(0))
5412 .unwrap_or(0);
5413 bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
5415 card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
5416 let after = bus
5417 .stats()
5418 .snapshot()
5419 .into_iter()
5420 .find(|r| r.sink == fa.describe())
5421 .map(|r| r.ok.get("created").copied().unwrap_or(0))
5422 .unwrap_or(0);
5423 assert_eq!(
5424 before, after,
5425 "backfill target B must not cause fan-out to subscriber A"
5426 );
5427 },
5428 );
5429 }
5430
5431 #[test]
5432 fn backfill_updates_subscriber_stats() {
5433 let sub_dir = tempfile::tempdir().unwrap();
5434 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5435 let uri = fs_sub.describe();
5436 with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5437 bus.replace_subscribers_for_test(Vec::new());
5438 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
5439 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5440
5441 card_sink_backfill_with_store(&store, &uri, false).unwrap();
5442 let snap = bus.stats().snapshot();
5443 let row = snap.iter().find(|r| r.sink == uri).expect("row");
5444 assert_eq!(
5445 row.ok.get("created").copied().unwrap_or(0),
5446 2,
5447 "backfill must increment ok[created] on the target sink"
5448 );
5449 });
5450 }
5451
5452 #[test]
5453 fn backfill_failure_records_err_stat() {
5454 struct FailingSub {
5456 uri: String,
5457 }
5458 impl CardSubscriber for FailingSub {
5459 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
5460 Err("synthetic backfill failure".into())
5461 }
5462 fn has_card(&self, _card_id: &str) -> Result<bool, String> {
5463 Ok(false)
5464 }
5465 fn describe(&self) -> String {
5466 self.uri.clone()
5467 }
5468 }
5469 let uri = "mock://backfill-fail".to_string();
5470 let failing: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5471 with_bus_subscribers(vec![failing], |bus| {
5472 bus.replace_subscribers_for_test(Vec::new());
5473 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);
5474 let reinstall: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5476 bus.replace_subscribers_for_test(vec![reinstall]);
5477
5478 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5479 assert_eq!(
5480 report.failed.len(),
5481 1,
5482 "failed must record the synthetic err"
5483 );
5484 assert!(report.pushed.is_empty());
5485 let snap = bus.stats().snapshot();
5486 let row = snap.iter().find(|r| r.sink == uri).expect("row");
5487 assert!(
5488 row.err.get("created").copied().unwrap_or(0) >= 1,
5489 "failing publish must increment err[created]"
5490 );
5491 assert!(row.last_error.is_some());
5492 });
5493 }
5494
5495 #[test]
5496 fn test_oncelock_set_after_init_returns_err() {
5497 let _ = event_bus();
5499 let result = install_event_bus_for_test(CardEventBus::new(Vec::new()));
5500 assert!(
5501 result.is_err(),
5502 "install after init must return Err per OnceLock contract"
5503 );
5504 assert_eq!(result.unwrap_err(), "bus already initialized");
5505 }
5506}