1use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
98pub trait CardStore: Send + Sync {
122 fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
133
134 fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;
140
141 fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;
143
144 fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;
146
147 fn list_card_locators(
154 &self,
155 pkg_filter: Option<&str>,
156 ) -> Result<Vec<(String, PathBuf)>, String>;
157
158 fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;
162
163 fn read_aliases(&self) -> Result<Vec<Alias>, String>;
166 fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;
167
168 fn samples_exists(&self, card_id: &str) -> Result<bool, String>;
172
173 fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;
179
180 fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;
183
184 fn import_from_dir(
190 &self,
191 source_dir: &Path,
192 pkg: &str,
193 ) -> Result<(Vec<String>, Vec<String>), String>;
194}
195
196fn validate_name(name: &str, kind: &str) -> Result<(), String> {
197 if name.is_empty()
198 || name.contains('/')
199 || name.contains('\\')
200 || name.contains("..")
201 || name.contains('\0')
202 {
203 return Err(format!("Invalid {kind} name: '{name}'"));
204 }
205 Ok(())
206}
207
208fn djb2_hex(s: &str) -> String {
210 let mut h: u64 = 5381;
211 for b in s.bytes() {
212 h = h.wrapping_mul(33).wrapping_add(b as u64);
213 }
214 format!("{h:016x}")
215}
216
217fn hash6(s: &str) -> String {
224 let hex = djb2_hex(s);
225 let start = hex.len().saturating_sub(6);
226 hex[start..].to_string()
227}
228
229fn stable_json(v: &Json) -> String {
231 let mut buf = String::new();
232 stable_json_into(v, &mut buf);
233 buf
234}
235fn stable_json_into(v: &Json, buf: &mut String) {
236 match v {
237 Json::Null => buf.push_str("null"),
238 Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
239 Json::Number(n) => buf.push_str(&n.to_string()),
240 Json::String(s) => {
241 buf.push('"');
242 buf.push_str(s);
243 buf.push('"');
244 }
245 Json::Array(a) => {
246 buf.push('[');
247 for (i, item) in a.iter().enumerate() {
248 if i > 0 {
249 buf.push(',');
250 }
251 stable_json_into(item, buf);
252 }
253 buf.push(']');
254 }
255 Json::Object(m) => {
256 let mut keys: Vec<&String> = m.keys().collect();
257 keys.sort();
258 buf.push('{');
259 for (i, k) in keys.iter().enumerate() {
260 if i > 0 {
261 buf.push(',');
262 }
263 buf.push('"');
264 buf.push_str(k);
265 buf.push_str("\":");
266 stable_json_into(&m[*k], buf);
267 }
268 buf.push('}');
269 }
270 }
271}
272
273fn short_model(id: &str) -> String {
276 if id.is_empty() {
277 return "model".into();
278 }
279 let stripped = id
281 .strip_prefix("claude-")
282 .or_else(|| id.strip_prefix("gpt-"))
283 .unwrap_or(id);
284 let s: String = stripped
286 .chars()
287 .filter(|c| c.is_ascii_alphanumeric())
288 .collect();
289 if s.is_empty() {
290 "model".into()
291 } else {
292 s
293 }
294}
295
296fn now_rfc3339() -> String {
298 let secs = std::time::SystemTime::now()
299 .duration_since(std::time::UNIX_EPOCH)
300 .map(|d| d.as_secs())
301 .unwrap_or(0) as i64;
302 let days = secs.div_euclid(86400);
303 let tod = secs.rem_euclid(86400);
304 let (y, mo, d) = civil_from_days(days);
305 let hh = tod / 3600;
306 let mm = (tod % 3600) / 60;
307 let ss = tod % 60;
308 format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
309}
310
311fn now_compact() -> String {
317 let secs = std::time::SystemTime::now()
318 .duration_since(std::time::UNIX_EPOCH)
319 .map(|d| d.as_secs())
320 .unwrap_or(0) as i64;
321 let days = secs.div_euclid(86400);
322 let tod = secs.rem_euclid(86400);
323 let (y, mo, d) = civil_from_days(days);
324 let hh = tod / 3600;
325 let mm = (tod % 3600) / 60;
326 let ss = tod % 60;
327 format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
328}
329
330fn civil_from_days(z: i64) -> (i32, u32, u32) {
332 let z = z + 719468;
333 let era = if z >= 0 { z } else { z - 146096 } / 146097;
334 let doe = (z - era * 146097) as u64;
335 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
336 let y = yoe as i64 + era * 400;
337 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
338 let mp = (5 * doy + 2) / 153;
339 let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
340 let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
341 let y = y + if m <= 2 { 1 } else { 0 };
342 (y as i32, m, d)
343}
344
345fn json_to_toml(v: Json) -> Result<toml::Value, String> {
348 Ok(match v {
349 Json::Null => return Err("TOML does not support null values".into()),
350 Json::Bool(b) => toml::Value::Boolean(b),
351 Json::Number(n) => {
352 if let Some(i) = n.as_i64() {
353 toml::Value::Integer(i)
354 } else if let Some(f) = n.as_f64() {
355 toml::Value::Float(f)
356 } else {
357 return Err(format!("Unsupported number: {n}"));
358 }
359 }
360 Json::String(s) => toml::Value::String(s),
361 Json::Array(a) => {
362 let mut out = Vec::with_capacity(a.len());
363 for item in a {
364 if !item.is_null() {
365 out.push(json_to_toml(item)?);
366 }
367 }
368 toml::Value::Array(out)
369 }
370 Json::Object(m) => {
371 let mut table = toml::map::Map::new();
372 for (k, val) in m {
373 if val.is_null() {
374 continue;
375 }
376 table.insert(k, json_to_toml(val)?);
377 }
378 toml::Value::Table(table)
379 }
380 })
381}
382
383fn toml_to_json(v: toml::Value) -> Json {
385 match v {
386 toml::Value::String(s) => Json::String(s),
387 toml::Value::Integer(i) => json!(i),
388 toml::Value::Float(f) => json!(f),
389 toml::Value::Boolean(b) => Json::Bool(b),
390 toml::Value::Datetime(dt) => Json::String(dt.to_string()),
391 toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
392 toml::Value::Table(t) => {
393 let mut m = serde_json::Map::new();
394 for (k, v) in t {
395 m.insert(k, toml_to_json(v));
396 }
397 Json::Object(m)
398 }
399 }
400}
401
402fn require_pkg_name(input: &Json) -> Result<String, String> {
404 let name = input
405 .get("pkg")
406 .and_then(|p| p.get("name"))
407 .and_then(|n| n.as_str())
408 .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
409 .to_string();
410 validate_name(&name, "pkg")?;
411 Ok(name)
412}
413
414pub fn create_with_store(
416 store: &dyn CardStore,
417 mut input: Json,
418) -> Result<(String, PathBuf), String> {
419 if !input.is_object() {
420 return Err("alc.card.create: input must be a table".into());
421 }
422 let pkg_name = require_pkg_name(&input)?;
423 let obj = input.as_object_mut().unwrap();
424
425 obj.entry("schema_version".to_string())
427 .or_insert_with(|| json!(SCHEMA_VERSION));
428 obj.entry("created_at".to_string())
429 .or_insert_with(|| json!(now_rfc3339()));
430 obj.entry("created_by".to_string())
431 .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
432
433 if let Some(params) = obj.get("params").cloned() {
435 if params.is_object() {
436 let fp = djb2_hex(&stable_json(¶ms));
437 obj.insert("param_fingerprint".to_string(), json!(fp));
438 }
439 }
440
441 let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
443 Some(id) if !id.is_empty() => id.to_string(),
444 _ => {
445 let model_id = obj
446 .get("model")
447 .and_then(|m| m.get("id"))
448 .and_then(|v| v.as_str())
449 .unwrap_or("");
450 let model_short = short_model(model_id);
451 let ts = now_compact();
452 let fp_seed = stable_json(&Json::Object(obj.clone()));
453 let h = hash6(&fp_seed);
454 format!("{pkg_name}_{model_short}_{ts}_{h}")
455 }
456 };
457 validate_name(&card_id, "card_id")?;
458 obj.insert("card_id".to_string(), json!(card_id.clone()));
459
460 let toml_val = json_to_toml(input)?;
461 let text = toml::to_string_pretty(&toml_val)
462 .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
463 let path = store.write_new_card(&pkg_name, &card_id, &text)?;
464
465 publish(CardEvent::Created {
466 pkg: pkg_name.clone(),
467 card_id: card_id.clone(),
468 toml_text: text,
469 });
470
471 Ok((card_id, path))
472}
473
474pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
476 let text = match store.read_card_text(card_id)? {
477 Some(t) => t,
478 None => return Ok(None),
479 };
480 let val: toml::Value =
481 toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
482 Ok(Some(toml_to_json(val)))
483}
484
485#[derive(Debug, Clone)]
487pub struct Summary {
488 pub card_id: String,
489 pub pkg: String,
490 pub created_at: Option<String>,
491 pub model: Option<String>,
492 pub scenario: Option<String>,
493 pub pass_rate: Option<f64>,
494}
495
496impl Summary {
497 fn to_json(&self) -> Json {
498 let mut m = serde_json::Map::new();
499 m.insert("card_id".into(), json!(self.card_id));
500 m.insert("pkg".into(), json!(self.pkg));
501 if let Some(v) = &self.created_at {
502 m.insert("created_at".into(), json!(v));
503 }
504 if let Some(v) = &self.model {
505 m.insert("model".into(), json!(v));
506 }
507 if let Some(v) = &self.scenario {
508 m.insert("scenario".into(), json!(v));
509 }
510 if let Some(v) = self.pass_rate {
511 m.insert("pass_rate".into(), json!(v));
512 }
513 Json::Object(m)
514 }
515}
516
517fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
518 let text = store.read_locator_text(locator).ok().flatten()?;
519 let val: toml::Value = toml::from_str(&text).ok()?;
520 let card_id = val
521 .get("card_id")
522 .and_then(|v| v.as_str())
523 .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
524 .to_string();
525 let created_at = val
526 .get("created_at")
527 .and_then(|v| v.as_str())
528 .map(String::from);
529 let model = val
530 .get("model")
531 .and_then(|m| m.get("id"))
532 .and_then(|v| v.as_str())
533 .map(String::from);
534 let scenario = val
535 .get("scenario")
536 .and_then(|s| s.get("name"))
537 .and_then(|v| v.as_str())
538 .map(String::from);
539 let pass_rate = val
540 .get("stats")
541 .and_then(|s| s.get("pass_rate"))
542 .and_then(|v| v.as_float());
543 Some(Summary {
544 card_id,
545 pkg: pkg.to_string(),
546 created_at,
547 model,
548 scenario,
549 pass_rate,
550 })
551}
552
553pub fn list_with_store(
555 store: &dyn CardStore,
556 pkg_filter: Option<&str>,
557) -> Result<Vec<Summary>, String> {
558 let locators = store.list_card_locators(pkg_filter)?;
559 let mut out = Vec::with_capacity(locators.len());
560 for (pkg, loc) in &locators {
561 if let Some(s) = summarize(store, loc, pkg) {
562 out.push(s);
563 }
564 }
565
566 out.sort_by(|a, b| {
570 b.created_at
571 .cmp(&a.created_at)
572 .then_with(|| b.card_id.cmp(&a.card_id))
573 });
574 Ok(out)
575}
576
577pub fn summaries_to_json(rows: &[Summary]) -> Json {
578 Json::Array(rows.iter().map(|s| s.to_json()).collect())
579}
580
581pub fn append_with_store(
594 store: &dyn CardStore,
595 card_id: &str,
596 fields: Json,
597) -> Result<Json, String> {
598 let text = store
599 .read_card_text(card_id)?
600 .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
601 let fields_obj = match fields {
602 Json::Object(m) => m,
603 _ => return Err("alc.card.append: fields must be a table".into()),
604 };
605
606 let existing: toml::Value =
607 toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
608 let mut existing_json = toml_to_json(existing);
609 let existing_obj = existing_json
610 .as_object_mut()
611 .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
612
613 for (k, v) in fields_obj {
614 if existing_obj.contains_key(&k) {
615 return Err(format!(
616 "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
617 ));
618 }
619 if !v.is_null() {
620 existing_obj.insert(k, v);
621 }
622 }
623
624 let toml_val = json_to_toml(existing_json.clone())?;
625 let text = toml::to_string_pretty(&toml_val)
626 .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
627 store.overwrite_card(card_id, &text)?;
628
629 publish(CardEvent::Appended {
630 card_id: card_id.to_string(),
631 toml_text: text,
632 });
633
634 Ok(existing_json)
635}
636
637#[derive(Debug, Clone)]
638pub struct Alias {
639 pub name: String,
640 pub card_id: String,
641 pub pkg: Option<String>,
642 pub set_at: String,
643 pub note: Option<String>,
644}
645
646impl Alias {
647 fn to_json(&self) -> Json {
648 let mut m = serde_json::Map::new();
649 m.insert("name".into(), json!(self.name));
650 m.insert("card_id".into(), json!(self.card_id));
651 if let Some(p) = &self.pkg {
652 m.insert("pkg".into(), json!(p));
653 }
654 m.insert("set_at".into(), json!(self.set_at));
655 if let Some(n) = &self.note {
656 m.insert("note".into(), json!(n));
657 }
658 Json::Object(m)
659 }
660}
661
662pub fn alias_set_with_store(
668 store: &dyn CardStore,
669 name: &str,
670 card_id: &str,
671 pkg: Option<&str>,
672 note: Option<&str>,
673) -> Result<Alias, String> {
674 validate_name(name, "alias")?;
675 if store.find_card_locator(card_id)?.is_none() {
676 return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
677 }
678 let mut aliases = store.read_aliases()?;
679 aliases.retain(|a| a.name != name);
680 let entry = Alias {
681 name: name.to_string(),
682 card_id: card_id.to_string(),
683 pkg: pkg.map(String::from),
684 set_at: now_rfc3339(),
685 note: note.map(String::from),
686 };
687 aliases.push(entry.clone());
688 store.write_aliases(&aliases)?;
689
690 match serialize_aliases_toml(&aliases) {
696 Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
697 Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
698 }
699
700 Ok(entry)
701}
702
703fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
707 let mut arr = Vec::with_capacity(aliases.len());
708 for a in aliases {
709 let mut t = toml::map::Map::new();
710 t.insert("name".into(), toml::Value::String(a.name.clone()));
711 t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
712 if let Some(p) = &a.pkg {
713 t.insert("pkg".into(), toml::Value::String(p.clone()));
714 }
715 t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
716 if let Some(n) = &a.note {
717 t.insert("note".into(), toml::Value::String(n.clone()));
718 }
719 arr.push(toml::Value::Table(t));
720 }
721 let mut root = toml::map::Map::new();
722 root.insert("alias".into(), toml::Value::Array(arr));
723 toml::to_string_pretty(&toml::Value::Table(root))
724 .map_err(|e| format!("Failed to serialize aliases: {e}"))
725}
726
727pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
733 validate_name(name, "alias")?;
734 let aliases = store.read_aliases()?;
735 let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
736 return Ok(None);
737 };
738 match get_with_store(store, &alias.card_id)? {
739 Some(card) => Ok(Some(card)),
740 None => Err(format!(
741 "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
742 alias.card_id
743 )),
744 }
745}
746
747pub fn alias_list_with_store(
749 store: &dyn CardStore,
750 pkg_filter: Option<&str>,
751) -> Result<Vec<Alias>, String> {
752 let mut aliases = store.read_aliases()?;
753 if let Some(p) = pkg_filter {
754 aliases.retain(|a| a.pkg.as_deref() == Some(p));
755 }
756 Ok(aliases)
757}
758
759pub fn aliases_to_json(rows: &[Alias]) -> Json {
760 Json::Array(rows.iter().map(|a| a.to_json()).collect())
761}
762
763#[derive(Debug, Clone, PartialEq)]
797pub enum CmpOp {
798 Eq,
799 Ne,
800 Lt,
801 Lte,
802 Gt,
803 Gte,
804 In,
805 Nin,
806 Exists,
807 Contains,
808 StartsWith,
809}
810
811impl CmpOp {
812 fn from_key(k: &str) -> Option<Self> {
813 Some(match k {
814 "eq" => Self::Eq,
815 "ne" => Self::Ne,
816 "lt" => Self::Lt,
817 "lte" => Self::Lte,
818 "gt" => Self::Gt,
819 "gte" => Self::Gte,
820 "in" => Self::In,
821 "nin" => Self::Nin,
822 "exists" => Self::Exists,
823 "contains" => Self::Contains,
824 "starts_with" => Self::StartsWith,
825 _ => return None,
826 })
827 }
828}
829
830#[derive(Debug, Clone)]
833pub struct Comparison {
834 pub path: Vec<String>,
835 pub op: CmpOp,
836 pub value: Json,
837}
838
839#[derive(Debug, Clone)]
841pub enum Predicate {
842 And(Vec<Predicate>),
843 Or(Vec<Predicate>),
844 Not(Box<Predicate>),
845 Cmp(Comparison),
846}
847
848fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
851 if obj.is_empty() {
852 return false;
853 }
854 obj.keys().all(|k| CmpOp::from_key(k).is_some())
855}
856
857pub fn parse_where(value: &Json) -> Result<Predicate, String> {
862 parse_predicate(value, &[])
863}
864
865fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
866 let obj = value
867 .as_object()
868 .ok_or_else(|| "where clause must be a table".to_string())?;
869
870 let mut clauses: Vec<Predicate> = Vec::new();
871
872 for (key, val) in obj {
873 match key.as_str() {
874 "_and" => {
875 let arr = val
876 .as_array()
877 .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
878 let mut subs = Vec::with_capacity(arr.len());
879 for sub in arr {
880 subs.push(parse_predicate(sub, prefix)?);
881 }
882 clauses.push(Predicate::And(subs));
883 }
884 "_or" => {
885 let arr = val
886 .as_array()
887 .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
888 let mut subs = Vec::with_capacity(arr.len());
889 for sub in arr {
890 subs.push(parse_predicate(sub, prefix)?);
891 }
892 clauses.push(Predicate::Or(subs));
893 }
894 "_not" => {
895 clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
896 }
897 _ => {
898 let mut new_path = prefix.to_vec();
900 new_path.push(key.clone());
901
902 match val {
903 Json::Object(m) if is_operator_object(m) => {
904 for (op_key, op_val) in m {
906 let op = CmpOp::from_key(op_key).expect("validated above");
907 clauses.push(Predicate::Cmp(Comparison {
908 path: new_path.clone(),
909 op,
910 value: op_val.clone(),
911 }));
912 }
913 }
914 Json::Object(_) => {
915 clauses.push(parse_predicate(val, &new_path)?);
917 }
918 _ => {
919 clauses.push(Predicate::Cmp(Comparison {
921 path: new_path,
922 op: CmpOp::Eq,
923 value: val.clone(),
924 }));
925 }
926 }
927 }
928 }
929 }
930
931 if clauses.len() == 1 {
932 Ok(clauses.remove(0))
933 } else {
934 Ok(Predicate::And(clauses))
935 }
936}
937
938fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
940 let mut node = card;
941 for key in path {
942 let obj = node.as_object()?;
943 node = obj.get(key)?;
944 }
945 Some(node)
946}
947
948fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
951 match (a, b) {
952 (Json::Number(x), Json::Number(y)) => {
953 let xf = x.as_f64()?;
954 let yf = y.as_f64()?;
955 xf.partial_cmp(&yf)
956 }
957 (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
958 (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
959 _ => None,
960 }
961}
962
963fn json_eq(a: &Json, b: &Json) -> bool {
964 match (a, b) {
965 (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
966 (Some(xf), Some(yf)) => xf == yf,
967 _ => a == b,
968 },
969 _ => a == b,
970 }
971}
972
973fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
974 let actual = fetch_path(card, &cmp.path);
975 let exists = actual.is_some();
976
977 match cmp.op {
978 CmpOp::Exists => {
979 let want = cmp.value.as_bool().unwrap_or(true);
980 exists == want
981 }
982 CmpOp::Ne => match actual {
983 None => true,
984 Some(v) => !json_eq(v, &cmp.value),
985 },
986 CmpOp::Nin => match actual {
987 None => true,
988 Some(v) => match cmp.value.as_array() {
989 Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
990 None => false,
991 },
992 },
993 CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
994 CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
995 Some(arr) => arr.iter().any(|e| json_eq(e, v)),
996 None => false,
997 }),
998 CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
999 let Some(v) = actual else { return false };
1000 let Some(ord) = json_cmp(v, &cmp.value) else {
1001 return false;
1002 };
1003 use std::cmp::Ordering::{Equal, Greater, Less};
1004 matches!(
1005 (&cmp.op, ord),
1006 (CmpOp::Lt, Less)
1007 | (CmpOp::Lte, Less | Equal)
1008 | (CmpOp::Gt, Greater)
1009 | (CmpOp::Gte, Greater | Equal)
1010 )
1011 }
1012 CmpOp::Contains => {
1013 let Some(Json::String(haystack)) = actual else {
1014 return false;
1015 };
1016 let Some(needle) = cmp.value.as_str() else {
1017 return false;
1018 };
1019 haystack.contains(needle)
1020 }
1021 CmpOp::StartsWith => {
1022 let Some(Json::String(haystack)) = actual else {
1023 return false;
1024 };
1025 let Some(needle) = cmp.value.as_str() else {
1026 return false;
1027 };
1028 haystack.starts_with(needle)
1029 }
1030 }
1031}
1032
1033pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1035 match pred {
1036 Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1037 Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1038 Predicate::Not(sub) => !eval_predicate(sub, card),
1039 Predicate::Cmp(c) => eval_cmp(c, card),
1040 }
1041}
1042
1043#[derive(Debug, Clone)]
1049pub struct OrderKey {
1050 pub path: Vec<String>,
1051 pub desc: bool,
1052}
1053
1054impl OrderKey {
1055 fn parse(raw: &str) -> Result<Self, String> {
1056 if raw.is_empty() {
1057 return Err("order_by key must not be empty".into());
1058 }
1059 let (desc, rest) = if let Some(r) = raw.strip_prefix('-') {
1060 (true, r)
1061 } else {
1062 (false, raw)
1063 };
1064 let path: Vec<String> = rest.split('.').map(|s| s.to_string()).collect();
1065 if path.iter().any(|p| p.is_empty()) {
1066 return Err(format!("invalid order_by key: '{raw}'"));
1067 }
1068 Ok(Self { path, desc })
1069 }
1070}
1071
1072pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1076 match value {
1077 Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1078 Json::Array(arr) => {
1079 let mut out = Vec::with_capacity(arr.len());
1080 for v in arr {
1081 let s = v
1082 .as_str()
1083 .ok_or_else(|| "order_by array must contain strings".to_string())?;
1084 out.push(OrderKey::parse(s)?);
1085 }
1086 Ok(out)
1087 }
1088 _ => Err("order_by must be a string or array of strings".into()),
1089 }
1090}
1091
1092#[derive(Debug, Default, Clone)]
1094pub struct FindQuery {
1095 pub pkg: Option<String>,
1097 pub where_: Option<Predicate>,
1099 pub order_by: Vec<OrderKey>,
1101 pub limit: Option<usize>,
1102 pub offset: Option<usize>,
1103}
1104
1105#[derive(Debug, Clone)]
1111struct CardRow {
1112 full: Json,
1113 summary: Summary,
1114}
1115
1116fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1118 let text = store.read_locator_text(locator).ok().flatten()?;
1119 let val: toml::Value = toml::from_str(&text).ok()?;
1120 let json = toml_to_json(val);
1121
1122 let card_id = json
1123 .get("card_id")
1124 .and_then(|v| v.as_str())
1125 .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1126 .to_string();
1127 let created_at = json
1128 .get("created_at")
1129 .and_then(|v| v.as_str())
1130 .map(String::from);
1131 let model = json
1132 .get("model")
1133 .and_then(|m| m.get("id"))
1134 .and_then(|v| v.as_str())
1135 .map(String::from);
1136 let scenario = json
1137 .get("scenario")
1138 .and_then(|s| s.get("name"))
1139 .and_then(|v| v.as_str())
1140 .map(String::from);
1141 let pass_rate = json
1142 .get("stats")
1143 .and_then(|s| s.get("pass_rate"))
1144 .and_then(|v| v.as_f64());
1145
1146 Some(CardRow {
1147 full: json,
1148 summary: Summary {
1149 card_id,
1150 pkg: pkg.to_string(),
1151 created_at,
1152 model,
1153 scenario,
1154 pass_rate,
1155 },
1156 })
1157}
1158
1159fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1161 use std::cmp::Ordering;
1162 for k in keys {
1163 let va = fetch_path(&a.full, &k.path);
1164 let vb = fetch_path(&b.full, &k.path);
1165 let ord = match (va, vb) {
1166 (None, None) => Ordering::Equal,
1167 (None, Some(_)) => Ordering::Greater, (Some(_), None) => Ordering::Less,
1169 (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1170 };
1171 let ord = if k.desc { ord.reverse() } else { ord };
1172 if ord != Ordering::Equal {
1173 return ord;
1174 }
1175 }
1176 Ordering::Equal
1177}
1178
1179const SUMMARY_SORT_FIELDS: &[&str] = &[
1181 "card_id",
1182 "created_at",
1183 "stats.pass_rate",
1184 "scenario.name",
1185 "model.id",
1186];
1187
1188fn is_lightweight_query(q: &FindQuery) -> bool {
1191 q.where_.is_none()
1192 && q.order_by
1193 .iter()
1194 .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1195}
1196
1197fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1199 use std::cmp::Ordering;
1200 for k in keys {
1201 let key_str = k.path.join(".");
1202 let ord = match key_str.as_str() {
1203 "card_id" => a.card_id.cmp(&b.card_id),
1204 "created_at" => a.created_at.cmp(&b.created_at),
1205 "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1206 (None, None) => Ordering::Equal,
1207 (None, Some(_)) => Ordering::Greater,
1208 (Some(_), None) => Ordering::Less,
1209 (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1210 },
1211 "scenario.name" => a.scenario.cmp(&b.scenario),
1212 "model.id" => a.model.cmp(&b.model),
1213 _ => Ordering::Equal,
1214 };
1215 let ord = if k.desc { ord.reverse() } else { ord };
1216 if ord != Ordering::Equal {
1217 return ord;
1218 }
1219 }
1220 Ordering::Equal
1221}
1222
1223pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1229 if is_lightweight_query(&q) {
1231 let mut rows = list_with_store(store, q.pkg.as_deref())?;
1232 if q.order_by.is_empty() {
1233 rows.sort_by(|a, b| {
1234 b.created_at
1235 .cmp(&a.created_at)
1236 .then_with(|| b.card_id.cmp(&a.card_id))
1237 });
1238 } else {
1239 rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1240 }
1241 let out: Vec<Summary> = rows
1242 .into_iter()
1243 .skip(q.offset.unwrap_or(0))
1244 .take(q.limit.unwrap_or(usize::MAX))
1245 .collect();
1246 return Ok(out);
1247 }
1248
1249 let all_rows = scan_cards(store, q.pkg.as_deref())?;
1251
1252 let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1254 all_rows
1255 .into_iter()
1256 .filter(|row| eval_predicate(pred, &row.full))
1257 .collect()
1258 } else {
1259 all_rows
1260 };
1261
1262 if q.order_by.is_empty() {
1264 rows.sort_by(|a, b| {
1265 b.summary
1266 .created_at
1267 .cmp(&a.summary.created_at)
1268 .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1269 });
1270 } else {
1271 rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1272 }
1273
1274 let out: Vec<Summary> = rows
1276 .into_iter()
1277 .skip(q.offset.unwrap_or(0))
1278 .take(q.limit.unwrap_or(usize::MAX))
1279 .map(|r| r.summary)
1280 .collect();
1281
1282 Ok(out)
1283}
1284
1285#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1301pub enum LineageDirection {
1302 #[default]
1303 Up,
1304 Down,
1305 Both,
1306}
1307
1308impl LineageDirection {
1309 pub fn parse(s: &str) -> Result<Self, String> {
1310 match s {
1311 "up" => Ok(Self::Up),
1312 "down" => Ok(Self::Down),
1313 "both" => Ok(Self::Both),
1314 other => Err(format!(
1315 "direction must be 'up', 'down', or 'both' (got '{other}')"
1316 )),
1317 }
1318 }
1319}
1320
1321#[derive(Debug, Clone, Default)]
1323pub struct LineageQuery {
1324 pub card_id: String,
1325 pub direction: LineageDirection,
1326 pub depth: Option<usize>,
1328 pub include_stats: bool,
1330 pub relation_filter: Option<Vec<String>>,
1333}
1334
1335#[derive(Debug, Clone)]
1340pub struct LineageNode {
1341 pub card_id: String,
1342 pub pkg: String,
1343 pub prior_card_id: Option<String>,
1344 pub prior_relation: Option<String>,
1345 pub depth: i32,
1346 pub stats: Option<Json>,
1347}
1348
1349#[derive(Debug, Clone)]
1351pub struct LineageEdge {
1352 pub from: String,
1353 pub to: String,
1354 pub relation: Option<String>,
1355}
1356
1357#[derive(Debug, Clone)]
1359pub struct LineageResult {
1360 pub root: String,
1361 pub nodes: Vec<LineageNode>,
1362 pub edges: Vec<LineageEdge>,
1363 pub truncated: bool,
1364}
1365
1366const DEFAULT_LINEAGE_DEPTH: usize = 10;
1367
1368fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1371 let meta = card.get("metadata");
1372 let prior_card_id = meta
1373 .and_then(|m| m.get("prior_card_id"))
1374 .and_then(|v| v.as_str())
1375 .map(String::from);
1376 let prior_relation = meta
1377 .and_then(|m| m.get("prior_relation"))
1378 .and_then(|v| v.as_str())
1379 .map(String::from);
1380 (prior_card_id, prior_relation)
1381}
1382
1383fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1385 let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1386 let stats = if include_stats {
1387 row.full.get("stats").cloned()
1388 } else {
1389 None
1390 };
1391 LineageNode {
1392 card_id: row.summary.card_id.clone(),
1393 pkg: row.summary.pkg.clone(),
1394 prior_card_id,
1395 prior_relation,
1396 depth,
1397 stats,
1398 }
1399}
1400
1401fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
1404 match filter {
1405 None => true,
1406 Some(allowed) => match relation {
1407 Some(r) => allowed.iter().any(|a| a == r),
1408 None => false,
1409 },
1410 }
1411}
1412
1413struct CardIndex {
1415 cards: std::collections::HashMap<String, CardRow>,
1417 children: std::collections::HashMap<String, Vec<String>>,
1419}
1420
1421fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1425 let rows = scan_cards(store, None)?;
1426
1427 let mut cards = std::collections::HashMap::with_capacity(rows.len());
1428 let mut children: std::collections::HashMap<String, Vec<String>> =
1429 std::collections::HashMap::new();
1430
1431 for row in rows {
1432 let id = row.summary.card_id.clone();
1433 let (prior_id, _) = lineage_fields(&row.full);
1434 if let Some(parent) = prior_id {
1435 children.entry(parent).or_default().push(id.clone());
1436 }
1437 cards.insert(id, row);
1438 }
1439 Ok(CardIndex { cards, children })
1440}
1441
1442fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1446 let locators = store.list_card_locators(pkg_filter)?;
1447 let mut rows = Vec::with_capacity(locators.len());
1448 for (pkg, loc) in &locators {
1449 if let Some(row) = load_full(store, loc, pkg) {
1450 rows.push(row);
1451 }
1452 }
1453 Ok(rows)
1454}
1455
1456struct LineageCtx<'a> {
1458 index: &'a CardIndex,
1459 relation_filter: &'a Option<Vec<String>>,
1460 include_stats: bool,
1461 max_depth: usize,
1462}
1463
1464struct LineageAccum {
1466 nodes: Vec<LineageNode>,
1467 edges: Vec<LineageEdge>,
1468 visited: std::collections::HashSet<String>,
1469 truncated: bool,
1470}
1471
1472fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1474 let mut cur = start_id.to_string();
1475 for step in 1..=ctx.max_depth {
1476 let Some(row) = ctx.index.cards.get(&cur) else {
1477 return;
1478 };
1479 let (prior_id, prior_rel) = lineage_fields(&row.full);
1480 let Some(prior_id) = prior_id else {
1481 return;
1482 };
1483 if !relation_passes(ctx.relation_filter, &prior_rel) {
1484 return;
1485 }
1486 if acc.visited.contains(&prior_id) {
1487 return;
1488 }
1489 let Some(parent) = ctx.index.cards.get(&prior_id) else {
1490 return;
1491 };
1492 acc.nodes
1493 .push(make_node(parent, -(step as i32), ctx.include_stats));
1494 acc.edges.push(LineageEdge {
1495 from: row.summary.card_id.clone(),
1496 to: parent.summary.card_id.clone(),
1497 relation: prior_rel,
1498 });
1499 acc.visited.insert(prior_id.clone());
1500 cur = prior_id;
1501 }
1502 if let Some(row) = ctx.index.cards.get(&cur) {
1504 let (prior_id, _) = lineage_fields(&row.full);
1505 if prior_id
1506 .as_ref()
1507 .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1508 {
1509 acc.truncated = true;
1510 }
1511 }
1512}
1513
1514fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1517 let mut frontier: Vec<String> = vec![start_id.to_string()];
1518
1519 for depth in 1..=ctx.max_depth {
1520 let mut next_frontier: Vec<String> = Vec::new();
1521 for parent_id in &frontier {
1522 let children = match ctx.index.children.get(parent_id) {
1523 Some(c) => c,
1524 None => continue,
1525 };
1526 for child_id in children {
1527 if acc.visited.contains(child_id) {
1528 continue;
1529 }
1530 let Some(child) = ctx.index.cards.get(child_id) else {
1531 continue;
1532 };
1533 let (_, prior_rel) = lineage_fields(&child.full);
1534 if !relation_passes(ctx.relation_filter, &prior_rel) {
1535 continue;
1536 }
1537 acc.nodes
1538 .push(make_node(child, depth as i32, ctx.include_stats));
1539 acc.edges.push(LineageEdge {
1540 from: child.summary.card_id.clone(),
1541 to: parent_id.clone(),
1542 relation: prior_rel,
1543 });
1544 acc.visited.insert(child_id.clone());
1545 next_frontier.push(child_id.clone());
1546 }
1547 }
1548 if next_frontier.is_empty() {
1549 return;
1550 }
1551 frontier = next_frontier;
1552 }
1553 for parent_id in &frontier {
1556 let children = match ctx.index.children.get(parent_id) {
1557 Some(c) => c,
1558 None => continue,
1559 };
1560 for child_id in children {
1561 if acc.visited.contains(child_id) {
1562 continue;
1563 }
1564 let Some(child) = ctx.index.cards.get(child_id) else {
1565 continue;
1566 };
1567 let (_, prior_rel) = lineage_fields(&child.full);
1568 if relation_passes(ctx.relation_filter, &prior_rel) {
1569 acc.truncated = true;
1570 return;
1571 }
1572 }
1573 }
1574}
1575
1576pub fn lineage_with_store(
1578 store: &dyn CardStore,
1579 q: LineageQuery,
1580) -> Result<Option<LineageResult>, String> {
1581 let index = load_card_index(store)?;
1582 let Some(root_row) = index.cards.get(&q.card_id) else {
1583 return Ok(None);
1584 };
1585
1586 let ctx = LineageCtx {
1587 index: &index,
1588 relation_filter: &q.relation_filter,
1589 include_stats: q.include_stats,
1590 max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1591 };
1592 let mut acc = LineageAccum {
1593 nodes: Vec::new(),
1594 edges: Vec::new(),
1595 visited: std::collections::HashSet::new(),
1596 truncated: false,
1597 };
1598
1599 acc.nodes.push(make_node(root_row, 0, q.include_stats));
1600 acc.visited.insert(q.card_id.clone());
1601
1602 if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1603 walk_up(&q.card_id, &ctx, &mut acc);
1604 }
1605 if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1606 walk_down(&q.card_id, &ctx, &mut acc);
1607 }
1608
1609 Ok(Some(LineageResult {
1610 root: q.card_id,
1611 nodes: acc.nodes,
1612 edges: acc.edges,
1613 truncated: acc.truncated,
1614 }))
1615}
1616
1617pub fn lineage_to_json(r: &LineageResult) -> Json {
1619 let nodes: Vec<Json> = r
1620 .nodes
1621 .iter()
1622 .map(|n| {
1623 let mut m = serde_json::Map::new();
1624 m.insert("card_id".into(), json!(n.card_id));
1625 m.insert("pkg".into(), json!(n.pkg));
1626 m.insert("depth".into(), json!(n.depth));
1627 if let Some(p) = &n.prior_card_id {
1628 m.insert("prior_card_id".into(), json!(p));
1629 }
1630 if let Some(rel) = &n.prior_relation {
1631 m.insert("prior_relation".into(), json!(rel));
1632 }
1633 if let Some(s) = &n.stats {
1634 m.insert("stats".into(), s.clone());
1635 }
1636 Json::Object(m)
1637 })
1638 .collect();
1639 let edges: Vec<Json> = r
1640 .edges
1641 .iter()
1642 .map(|e| {
1643 let mut m = serde_json::Map::new();
1644 m.insert("from".into(), json!(e.from));
1645 m.insert("to".into(), json!(e.to));
1646 if let Some(rel) = &e.relation {
1647 m.insert("relation".into(), json!(rel));
1648 }
1649 Json::Object(m)
1650 })
1651 .collect();
1652 json!({
1653 "root": r.root,
1654 "nodes": nodes,
1655 "edges": edges,
1656 "truncated": r.truncated,
1657 })
1658}
1659
1660pub fn import_from_dir_with_store(
1680 store: &dyn CardStore,
1681 source_dir: &std::path::Path,
1682 pkg: &str,
1683) -> Result<(Vec<String>, Vec<String>), String> {
1684 let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1685 for card_id in &imported {
1686 match store.read_card_text(card_id) {
1687 Ok(Some(toml_text)) => publish(CardEvent::Created {
1688 pkg: pkg.to_string(),
1689 card_id: card_id.clone(),
1690 toml_text,
1691 }),
1692 Ok(None) => {
1693 tracing::warn!(
1694 card_id = %card_id,
1695 "import_from_dir: read_card_text returned None after import; skipping publish"
1696 );
1697 }
1698 Err(e) => {
1699 tracing::warn!(
1700 card_id = %card_id,
1701 error = %e,
1702 "import_from_dir: read_card_text failed after import; skipping publish"
1703 );
1704 }
1705 }
1706 match store.read_samples_text(card_id) {
1708 Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1709 card_id: card_id.clone(),
1710 jsonl_text,
1711 }),
1712 Ok(None) => {}
1713 Err(e) => {
1714 tracing::warn!(
1715 card_id = %card_id,
1716 error = %e,
1717 "import_from_dir: read_samples_text failed after import; skipping publish"
1718 );
1719 }
1720 }
1721 }
1722 Ok((imported, skipped))
1723}
1724
1725pub fn write_samples_with_store(
1731 store: &dyn CardStore,
1732 card_id: &str,
1733 samples: Vec<Json>,
1734) -> Result<PathBuf, String> {
1735 if store.samples_exists(card_id)? {
1736 return Err(format!(
1737 "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1738 ));
1739 }
1740 let mut buf = String::new();
1741 for (idx, s) in samples.iter().enumerate() {
1742 let line = serde_json::to_string(s).map_err(|e| {
1743 format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1744 })?;
1745 buf.push_str(&line);
1746 buf.push('\n');
1747 }
1748 let path = store.write_samples_text(card_id, &buf)?;
1749
1750 publish(CardEvent::SamplesWritten {
1751 card_id: card_id.to_string(),
1752 jsonl_text: buf,
1753 });
1754
1755 Ok(path)
1756}
1757
1758#[derive(Debug, Default, Clone)]
1760pub struct SamplesQuery {
1761 pub offset: usize,
1763 pub limit: Option<usize>,
1765 pub where_: Option<Predicate>,
1768}
1769
1770pub fn read_samples_with_store(
1780 store: &dyn CardStore,
1781 card_id: &str,
1782 q: SamplesQuery,
1783) -> Result<Vec<Json>, String> {
1784 let text = match store.read_samples_text(card_id)? {
1785 Some(t) => t,
1786 None => return Ok(Vec::new()),
1787 };
1788 let mut matched: usize = 0;
1789 let mut out = Vec::new();
1790 for (i, line) in text.lines().enumerate() {
1791 if line.trim().is_empty() {
1792 continue;
1793 }
1794 let val: Json = serde_json::from_str(line)
1795 .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1796 if let Some(pred) = &q.where_ {
1797 if !eval_predicate(pred, &val) {
1798 continue;
1799 }
1800 }
1801 if matched < q.offset {
1802 matched += 1;
1803 continue;
1804 }
1805 if let Some(lim) = q.limit {
1806 if out.len() >= lim {
1807 break;
1808 }
1809 }
1810 matched += 1;
1811 out.push(val);
1812 }
1813 Ok(out)
1814}
1815
1816pub struct FileCardStore {
1830 root: PathBuf,
1831}
1832
1833impl FileCardStore {
1834 pub fn new(root: PathBuf) -> Self {
1836 Self { root }
1837 }
1838
1839 pub fn root(&self) -> &Path {
1841 &self.root
1842 }
1843
1844 pub fn create(&self, input: Json) -> Result<(String, PathBuf), String> {
1852 create_with_store(self, input)
1853 }
1854
1855 pub fn get(&self, card_id: &str) -> Result<Option<Json>, String> {
1856 get_with_store(self, card_id)
1857 }
1858
1859 pub fn list(&self, pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
1860 list_with_store(self, pkg_filter)
1861 }
1862
1863 pub fn append(&self, card_id: &str, fields: Json) -> Result<Json, String> {
1864 append_with_store(self, card_id, fields)
1865 }
1866
1867 pub fn alias_set(
1868 &self,
1869 name: &str,
1870 card_id: &str,
1871 pkg: Option<&str>,
1872 note: Option<&str>,
1873 ) -> Result<Alias, String> {
1874 alias_set_with_store(self, name, card_id, pkg, note)
1875 }
1876
1877 pub fn alias_list(&self, pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
1878 alias_list_with_store(self, pkg_filter)
1879 }
1880
1881 pub fn get_by_alias(&self, name: &str) -> Result<Option<Json>, String> {
1882 get_by_alias_with_store(self, name)
1883 }
1884
1885 pub fn find(&self, q: FindQuery) -> Result<Vec<Summary>, String> {
1886 find_with_store(self, q)
1887 }
1888
1889 pub fn write_samples(&self, card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1890 write_samples_with_store(self, card_id, samples)
1891 }
1892
1893 pub fn read_samples(&self, card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1894 read_samples_with_store(self, card_id, q)
1895 }
1896
1897 pub fn lineage(&self, q: LineageQuery) -> Result<Option<LineageResult>, String> {
1898 lineage_with_store(self, q)
1899 }
1900
1901 pub fn card_sink_backfill(
1902 &self,
1903 sink: &str,
1904 dry_run: bool,
1905 ) -> Result<SinkBackfillReport, String> {
1906 card_sink_backfill_with_store(self, sink, dry_run)
1907 }
1908
1909 fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1913 validate_name(pkg, "pkg")?;
1914 let dir = self.root.join(pkg);
1915 if !dir.exists() {
1916 fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1917 }
1918 Ok(dir)
1919 }
1920
1921 fn aliases_path(&self) -> PathBuf {
1923 self.root.join("_aliases.toml")
1924 }
1925
1926 fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1930 let card_path = self
1931 .find_card_locator(card_id)?
1932 .ok_or_else(|| format!("card '{card_id}' not found"))?;
1933 let dir = card_path
1934 .parent()
1935 .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1936 Ok(dir.join(format!("{card_id}.samples.jsonl")))
1937 }
1938}
1939
1940impl CardStore for FileCardStore {
1941 fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1942 let dir = self.pkg_dir(pkg)?;
1943 let path = dir.join(format!("{card_id}.toml"));
1944 if path.exists() {
1945 return Err(format!(
1946 "alc.card.create: card '{card_id}' already exists (immutable)"
1947 ));
1948 }
1949 let tmp = path.with_extension("toml.tmp");
1950 fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1951 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1952 Ok(path)
1953 }
1954
1955 fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1956 let path = self
1957 .find_card_locator(card_id)?
1958 .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1959 let tmp = path.with_extension("toml.tmp");
1960 fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1961 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1962 Ok(path)
1963 }
1964
1965 fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1966 validate_name(card_id, "card_id")?;
1967 if !self.root.exists() {
1968 return Ok(None);
1969 }
1970 let entries =
1971 fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1972 for entry in entries.flatten() {
1973 let p = entry.path();
1974 if p.is_dir() {
1975 let candidate = p.join(format!("{card_id}.toml"));
1976 if candidate.exists() {
1977 return Ok(Some(candidate));
1978 }
1979 }
1980 }
1981 Ok(None)
1982 }
1983
1984 fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
1985 let Some(path) = self.find_card_locator(card_id)? else {
1986 return Ok(None);
1987 };
1988 let text = fs::read_to_string(&path)
1989 .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
1990 Ok(Some(text))
1991 }
1992
1993 fn list_card_locators(
1994 &self,
1995 pkg_filter: Option<&str>,
1996 ) -> Result<Vec<(String, PathBuf)>, String> {
1997 if !self.root.exists() {
1998 return Ok(Vec::new());
1999 }
2000 let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2001 validate_name(p, "pkg")?;
2002 let d = self.root.join(p);
2003 if d.is_dir() {
2004 vec![d]
2005 } else {
2006 return Ok(Vec::new());
2007 }
2008 } else {
2009 fs::read_dir(&self.root)
2010 .map_err(|e| format!("Failed to read cards dir: {e}"))?
2011 .flatten()
2012 .map(|e| e.path())
2013 .filter(|p| p.is_dir())
2014 .collect()
2015 };
2016
2017 let mut out = Vec::new();
2018 for pdir in pkg_dirs {
2019 let pkg = pdir
2020 .file_name()
2021 .and_then(|s| s.to_str())
2022 .unwrap_or("")
2023 .to_string();
2024 let entries = match fs::read_dir(&pdir) {
2025 Ok(e) => e,
2026 Err(_) => continue,
2027 };
2028 for entry in entries.flatten() {
2029 let p = entry.path();
2030 if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2031 continue;
2032 }
2033 out.push((pkg.clone(), p));
2034 }
2035 }
2036 Ok(out)
2037 }
2038
2039 fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2040 match fs::read_to_string(locator) {
2041 Ok(text) => Ok(Some(text)),
2042 Err(_) => Ok(None),
2043 }
2044 }
2045
2046 fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2047 let path = self.aliases_path();
2048 if !path.exists() {
2049 return Ok(Vec::new());
2050 }
2051 let text =
2052 fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2053 let val: toml::Value =
2054 toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2055 let arr = val
2056 .get("alias")
2057 .and_then(|v| v.as_array())
2058 .cloned()
2059 .unwrap_or_default();
2060 let mut out = Vec::with_capacity(arr.len());
2061 for entry in arr {
2062 let t = match entry {
2063 toml::Value::Table(t) => t,
2064 _ => continue,
2065 };
2066 let name = match t.get("name").and_then(|v| v.as_str()) {
2067 Some(s) => s.to_string(),
2068 None => continue,
2069 };
2070 let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2071 Some(s) => s.to_string(),
2072 None => continue,
2073 };
2074 out.push(Alias {
2075 name,
2076 card_id,
2077 pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2078 set_at: t
2079 .get("set_at")
2080 .and_then(|v| v.as_str())
2081 .map(String::from)
2082 .unwrap_or_default(),
2083 note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2084 });
2085 }
2086 Ok(out)
2087 }
2088
2089 fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2090 if !self.root.exists() {
2093 fs::create_dir_all(&self.root)
2094 .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2095 }
2096 let path = self.aliases_path();
2097 let mut arr = Vec::with_capacity(aliases.len());
2098 for a in aliases {
2099 let mut t = toml::map::Map::new();
2100 t.insert("name".into(), toml::Value::String(a.name.clone()));
2101 t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2102 if let Some(p) = &a.pkg {
2103 t.insert("pkg".into(), toml::Value::String(p.clone()));
2104 }
2105 t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2106 if let Some(n) = &a.note {
2107 t.insert("note".into(), toml::Value::String(n.clone()));
2108 }
2109 arr.push(toml::Value::Table(t));
2110 }
2111 let mut root = toml::map::Map::new();
2112 root.insert("alias".into(), toml::Value::Array(arr));
2113 let text = toml::to_string_pretty(&toml::Value::Table(root))
2114 .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2115 let tmp = path.with_extension("toml.tmp");
2116 fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2117 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2118 Ok(())
2119 }
2120
2121 fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2122 let path = self.samples_path(card_id)?;
2123 Ok(path.exists())
2124 }
2125
2126 fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2127 let path = self.samples_path(card_id)?;
2128 if path.exists() {
2129 return Err(format!(
2130 "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2131 ));
2132 }
2133 let tmp = path.with_extension("jsonl.tmp");
2134 fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2135 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2136 Ok(path)
2137 }
2138
2139 fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2140 let path = self.samples_path(card_id)?;
2141 if !path.exists() {
2142 return Ok(None);
2143 }
2144 let text =
2145 fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2146 Ok(Some(text))
2147 }
2148
2149 fn import_from_dir(
2150 &self,
2151 source_dir: &Path,
2152 pkg: &str,
2153 ) -> Result<(Vec<String>, Vec<String>), String> {
2154 validate_name(pkg, "pkg")?;
2155 let dest = self.pkg_dir(pkg)?;
2156 let mut imported = Vec::new();
2157 let mut skipped = Vec::new();
2158
2159 let entries =
2160 fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2161
2162 for entry in entries.flatten() {
2163 let path = entry.path();
2164 let fname = match path.file_name().and_then(|n| n.to_str()) {
2165 Some(n) => n.to_string(),
2166 None => continue,
2167 };
2168
2169 if !fname.ends_with(".toml") {
2170 continue;
2171 }
2172
2173 let card_id = fname.trim_end_matches(".toml");
2174 let dest_toml = dest.join(&fname);
2175
2176 if dest_toml.exists() {
2177 skipped.push(card_id.to_string());
2178 continue;
2179 }
2180
2181 let text = fs::read_to_string(&path)
2182 .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2183 let val: toml::Value = toml::from_str(&text)
2184 .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2185 if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2186 continue;
2187 }
2188
2189 fs::copy(&path, &dest_toml)
2190 .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2191
2192 let samples_name = format!("{card_id}.samples.jsonl");
2193 let samples_src = source_dir.join(&samples_name);
2194 if samples_src.exists() {
2195 let samples_dest = dest.join(&samples_name);
2196 if !samples_dest.exists() {
2197 fs::copy(&samples_src, &samples_dest)
2198 .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2199 }
2200 }
2201
2202 imported.push(card_id.to_string());
2203 }
2204
2205 Ok((imported, skipped))
2206 }
2207}
2208
2209#[derive(Debug, Clone)]
2227pub enum CardEvent {
2228 Created {
2230 pkg: String,
2231 card_id: String,
2232 toml_text: String,
2233 },
2234 Appended { card_id: String, toml_text: String },
2236 SamplesWritten { card_id: String, jsonl_text: String },
2238 AliasesWritten { toml_text: String },
2240}
2241
2242#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
2246pub enum CardEventKind {
2247 Created,
2248 Appended,
2249 SamplesWritten,
2250 AliasesWritten,
2251}
2252
2253impl CardEventKind {
2254 pub fn as_str(self) -> &'static str {
2256 match self {
2257 CardEventKind::Created => "created",
2258 CardEventKind::Appended => "appended",
2259 CardEventKind::SamplesWritten => "samples_written",
2260 CardEventKind::AliasesWritten => "aliases_written",
2261 }
2262 }
2263
2264 pub fn json_key(self) -> &'static str {
2268 match self {
2269 CardEventKind::Created => "created",
2270 CardEventKind::Appended => "appended",
2271 CardEventKind::SamplesWritten => "samples",
2272 CardEventKind::AliasesWritten => "aliases",
2273 }
2274 }
2275
2276 pub fn all() -> [CardEventKind; 4] {
2280 [
2281 CardEventKind::Created,
2282 CardEventKind::Appended,
2283 CardEventKind::SamplesWritten,
2284 CardEventKind::AliasesWritten,
2285 ]
2286 }
2287}
2288
2289impl Serialize for CardEventKind {
2290 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2291 s.serialize_str(self.json_key())
2292 }
2293}
2294
2295impl CardEvent {
2296 pub fn kind(&self) -> CardEventKind {
2298 match self {
2299 CardEvent::Created { .. } => CardEventKind::Created,
2300 CardEvent::Appended { .. } => CardEventKind::Appended,
2301 CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2302 CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2303 }
2304 }
2305}
2306
2307pub trait CardSubscriber: Send + Sync {
2315 fn on_event(&self, ev: &CardEvent) -> Result<(), String>;
2319
2320 fn describe(&self) -> String;
2324
2325 fn has_card(&self, _card_id: &str) -> Result<bool, String> {
2333 Ok(false)
2334 }
2335}
2336
2337#[derive(Debug, Clone, Serialize)]
2342pub struct LastError {
2343 pub kind: CardEventKind,
2344 pub msg: String,
2345 pub ts_ms: u64,
2346}
2347
2348#[derive(Default, Debug)]
2352pub struct PerSubscriber {
2353 pub ok: HashMap<CardEventKind, u64>,
2354 pub err: HashMap<CardEventKind, u64>,
2355 pub last_error: Option<LastError>,
2356}
2357
2358#[derive(Default, Debug)]
2361pub struct SubscriberStats {
2362 inner: Mutex<HashMap<String, PerSubscriber>>,
2363}
2364
2365impl SubscriberStats {
2366 pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2368 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2369 let entry = g.entry(key.to_string()).or_default();
2370 let c = entry.ok.entry(kind).or_insert(0);
2371 *c = c.saturating_add(1);
2372 }
2373
2374 pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2378 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2379 let entry = g.entry(key.to_string()).or_default();
2380 let c = entry.err.entry(kind).or_insert(0);
2381 *c = c.saturating_add(1);
2382 entry.last_error = Some(LastError {
2383 kind,
2384 msg: err.to_string(),
2385 ts_ms: now_ms(),
2386 });
2387 }
2388
2389 pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2398 let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2399 let mut rows = Vec::with_capacity(g.len());
2400 for (sink, ps) in g.iter() {
2401 let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2402 let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2403 for k in CardEventKind::all() {
2404 ok.insert(
2405 k.json_key().to_string(),
2406 ps.ok.get(&k).copied().unwrap_or(0),
2407 );
2408 err.insert(
2409 k.json_key().to_string(),
2410 ps.err.get(&k).copied().unwrap_or(0),
2411 );
2412 }
2413 rows.push(SubscriberHealthRow {
2414 sink: sink.clone(),
2415 ok,
2416 err,
2417 last_error: ps.last_error.clone(),
2418 });
2419 }
2420 rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2423 rows
2424 }
2425}
2426
2427fn now_ms() -> u64 {
2431 std::time::SystemTime::now()
2432 .duration_since(std::time::UNIX_EPOCH)
2433 .unwrap_or_default()
2434 .as_millis() as u64
2435}
2436
2437#[derive(Debug, Clone, Serialize)]
2440pub struct SubscriberHealthRow {
2441 pub sink: String,
2442 pub ok: HashMap<String, u64>,
2443 pub err: HashMap<String, u64>,
2444 pub last_error: Option<LastError>,
2445}
2446
2447pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2452 event_bus().stats().snapshot()
2453}
2454
2455fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
2462 static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
2463 let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
2464 let pid = process::id();
2465 if let Some(parent) = dest.parent() {
2466 if !parent.as_os_str().is_empty() && !parent.exists() {
2467 fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
2468 }
2469 }
2470 let mut tmp = dest.as_os_str().to_owned();
2471 tmp.push(format!(".tmp.{pid}.{seq}"));
2472 let tmp_path = PathBuf::from(tmp);
2473 fs::write(&tmp_path, bytes).map_err(|e| format!("subscriber write tmp: {e}"))?;
2474 fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
2475}
2476
2477fn canonical_file_uri(root: &Path) -> String {
2481 let p = root.to_string_lossy();
2482 #[cfg(unix)]
2483 {
2484 format!("file://{p}")
2485 }
2486 #[cfg(windows)]
2487 {
2488 format!("file:///{}", p.replace('\\', "/"))
2489 }
2490 #[cfg(not(any(unix, windows)))]
2491 {
2492 format!("file://{p}")
2493 }
2494}
2495
2496pub struct FileCardSubscriber {
2503 root: PathBuf,
2504 uri: String,
2505}
2506
2507impl FileCardSubscriber {
2508 pub fn new(root: PathBuf) -> Self {
2511 let uri = canonical_file_uri(&root);
2512 Self { root, uri }
2513 }
2514
2515 pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2519 validate_name(card_id, "card_id")?;
2520 if !self.root.exists() {
2521 return Ok(None);
2522 }
2523 let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2524 for entry in entries.flatten() {
2525 let p = entry.path();
2526 if p.is_dir() {
2527 let candidate = p.join(format!("{card_id}.toml"));
2528 if candidate.exists() {
2529 return Ok(Some(candidate));
2530 }
2531 }
2532 }
2533 Ok(None)
2534 }
2535
2536 fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2537 validate_name(pkg, "pkg")?;
2538 let dir = self.root.join(pkg);
2539 if !dir.exists() {
2540 fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2541 }
2542 Ok(dir)
2543 }
2544
2545 fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2546 validate_name(card_id, "card_id")?;
2547 let dir = self.ensure_pkg_dir(pkg)?;
2548 let dest = dir.join(format!("{card_id}.toml"));
2549 atomic_write(&dest, toml_text.as_bytes())
2550 }
2551
2552 fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2553 match self.locate_card(card_id)? {
2554 Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2555 None => Err(format!(
2556 "subscriber append: card '{card_id}' missing at {}",
2557 self.uri
2558 )),
2559 }
2560 }
2561
2562 fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2563 let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2564 format!(
2565 "subscriber samples: card '{card_id}' missing at {}",
2566 self.uri
2567 )
2568 })?;
2569 let dir = card_path
2570 .parent()
2571 .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2572 let dest = dir.join(format!("{card_id}.samples.jsonl"));
2573 atomic_write(&dest, jsonl_text.as_bytes())
2574 }
2575
2576 fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2577 if !self.root.exists() {
2578 fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2579 }
2580 let dest = self.root.join("_aliases.toml");
2581 atomic_write(&dest, toml_text.as_bytes())
2582 }
2583}
2584
2585impl CardSubscriber for FileCardSubscriber {
2586 fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2587 match ev {
2588 CardEvent::Created {
2589 pkg,
2590 card_id,
2591 toml_text,
2592 } => self.write_created(pkg, card_id, toml_text),
2593 CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2594 CardEvent::SamplesWritten {
2595 card_id,
2596 jsonl_text,
2597 } => self.write_samples(card_id, jsonl_text),
2598 CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2599 }
2600 }
2601
2602 fn describe(&self) -> String {
2603 self.uri.clone()
2604 }
2605
2606 fn has_card(&self, card_id: &str) -> Result<bool, String> {
2610 Ok(self.locate_card(card_id)?.is_some())
2611 }
2612}
2613
2614pub struct CardEventBus {
2621 subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
2622 stats: Arc<SubscriberStats>,
2623}
2624
2625impl CardEventBus {
2626 pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2629 Self {
2630 subscribers: Mutex::new(subscribers),
2631 stats: Arc::new(SubscriberStats::default()),
2632 }
2633 }
2634
2635 pub fn stats(&self) -> &Arc<SubscriberStats> {
2637 &self.stats
2638 }
2639
2640 pub fn publish(&self, ev: &CardEvent) {
2644 let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2645 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2646 guard.clone()
2647 };
2648 for sub in &subs_snapshot {
2649 let key = sub.describe();
2650 match sub.on_event(ev) {
2651 Ok(()) => self.stats.record_ok(&key, ev.kind()),
2652 Err(e) => {
2653 tracing::warn!(
2654 subscriber = %key,
2655 kind = ev.kind().as_str(),
2656 error = %e,
2657 "card subscriber failed"
2658 );
2659 self.stats.record_err(&key, ev.kind(), &e);
2660 }
2661 }
2662 }
2663 }
2664
2665 pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2669 let hit: Option<Arc<dyn CardSubscriber>> = {
2670 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2671 guard.iter().find(|s| s.describe() == target).cloned()
2672 };
2673 let Some(sub) = hit else {
2674 return Err(format!("subscriber not registered: {target}"));
2675 };
2676 let key = sub.describe();
2677 match sub.on_event(ev) {
2678 Ok(()) => {
2679 self.stats.record_ok(&key, ev.kind());
2680 Ok(())
2681 }
2682 Err(e) => {
2683 tracing::warn!(
2684 subscriber = %key,
2685 kind = ev.kind().as_str(),
2686 error = %e,
2687 "card subscriber failed (publish_to)"
2688 );
2689 self.stats.record_err(&key, ev.kind(), &e);
2690 Err(e)
2691 }
2692 }
2693 }
2694
2695 pub fn subscriber_uris(&self) -> Vec<String> {
2697 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2698 guard.iter().map(|s| s.describe()).collect()
2699 }
2700
2701 pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2706 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2707 guard.iter().find(|s| s.describe() == uri).cloned()
2708 }
2709
2710 #[cfg(any(test, feature = "test-support"))]
2713 pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2714 let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2715 *guard = subs;
2716 }
2717
2718 #[cfg(any(test, feature = "test-support"))]
2720 pub fn reset_stats_for_test(&self) {
2721 let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2722 g.clear();
2723 }
2724}
2725
2726static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2727
2728pub fn event_bus() -> &'static CardEventBus {
2731 CARD_EVENT_BUS.get_or_init(|| {
2732 let subs = load_subscribers_from_env();
2733 CardEventBus::new(subs)
2734 })
2735}
2736
2737pub fn init_event_bus() {
2742 let bus = event_bus();
2743 let uris = bus.subscriber_uris();
2744 if uris.is_empty() {
2745 tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2746 } else {
2747 for uri in &uris {
2748 tracing::info!(subscriber = %uri, "card sink registered");
2749 }
2750 }
2751}
2752
2753#[cfg(any(test, feature = "test-support"))]
2755pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
2756 CARD_EVENT_BUS
2757 .set(bus)
2758 .map_err(|_| "bus already initialized".to_string())
2759}
2760
2761pub fn publish(ev: CardEvent) {
2763 #[cfg(test)]
2769 {
2770 let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2771 if !is_test_owner {
2772 let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2774 event_bus().publish(&ev);
2775 return;
2776 }
2777 }
2778 event_bus().publish(&ev);
2779}
2780
2781#[cfg(test)]
2784fn bus_test_gate() -> &'static Mutex<()> {
2785 static GATE: OnceLock<Mutex<()>> = OnceLock::new();
2786 GATE.get_or_init(|| Mutex::new(()))
2787}
2788
2789#[cfg(test)]
2792thread_local! {
2793 static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
2794}
2795
2796#[derive(Debug, Clone, Default, Serialize)]
2803pub struct SinkBackfillReport {
2804 pub sink: String,
2805 pub pushed: Vec<String>,
2806 pub skipped: Vec<String>,
2807 pub failed: Vec<(String, String)>,
2808 pub pushed_samples: Vec<String>,
2809}
2810
2811pub fn card_sink_backfill_with_store(
2831 store: &dyn CardStore,
2832 sink: &str,
2833 dry_run: bool,
2834) -> Result<SinkBackfillReport, String> {
2835 let bus = event_bus();
2836 let sub = bus
2837 .find_subscriber(sink)
2838 .ok_or_else(|| format!("unknown sink: {sink}"))?;
2839
2840 let locators = store.list_card_locators(None)?;
2841
2842 let mut report = SinkBackfillReport {
2843 sink: sink.to_string(),
2844 ..Default::default()
2845 };
2846
2847 for (pkg, locator) in locators {
2848 let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2849 Some(s) => s.to_string(),
2850 None => continue,
2851 };
2852
2853 match sub.has_card(&card_id) {
2854 Ok(true) => {
2855 report.skipped.push(card_id);
2856 continue;
2857 }
2858 Ok(false) => {}
2859 Err(e) => {
2860 tracing::warn!(
2861 card_id = %card_id,
2862 error = %e,
2863 "backfill: has_card failed; treating as skipped"
2864 );
2865 report.skipped.push(card_id);
2866 continue;
2867 }
2868 }
2869
2870 let toml_text = match store.read_locator_text(&locator) {
2871 Ok(Some(t)) => t,
2872 Ok(None) => {
2873 report.skipped.push(card_id);
2875 continue;
2876 }
2877 Err(e) => {
2878 tracing::warn!(
2879 card_id = %card_id,
2880 error = %e,
2881 "backfill: read_locator_text failed; treating as skipped"
2882 );
2883 report.skipped.push(card_id);
2884 continue;
2885 }
2886 };
2887
2888 if dry_run {
2889 report.pushed.push(card_id.clone());
2890 if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2891 report.pushed_samples.push(card_id);
2892 }
2893 continue;
2894 }
2895
2896 let ev = CardEvent::Created {
2897 pkg: pkg.clone(),
2898 card_id: card_id.clone(),
2899 toml_text,
2900 };
2901 match bus.publish_to(sink, &ev) {
2902 Ok(()) => report.pushed.push(card_id.clone()),
2903 Err(e) => {
2904 report.failed.push((card_id, e));
2905 continue;
2906 }
2907 }
2908
2909 if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2910 let ev = CardEvent::SamplesWritten {
2911 card_id: card_id.clone(),
2912 jsonl_text,
2913 };
2914 match bus.publish_to(sink, &ev) {
2915 Ok(()) => report.pushed_samples.push(card_id),
2916 Err(e) => {
2917 report.failed.push((card_id, format!("samples: {e}")));
2918 }
2919 }
2920 }
2921 }
2922
2923 Ok(report)
2924}
2925
2926fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2932 let raw = match std::env::var("ALC_CARD_SINKS") {
2933 Ok(v) => v,
2934 Err(std::env::VarError::NotPresent) => return Vec::new(),
2935 Err(std::env::VarError::NotUnicode(_)) => {
2936 tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2937 return Vec::new();
2938 }
2939 };
2940 parse_subscribers_from_str(&raw)
2941}
2942
2943fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2947 if raw.is_empty() {
2948 return Vec::new();
2949 }
2950 let mut seen: HashSet<String> = HashSet::new();
2951 let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2952 for spec in raw.split('|') {
2953 let spec = spec.trim();
2954 if spec.is_empty() {
2955 continue;
2956 }
2957 let Some(sub) = parse_subscriber_spec(spec) else {
2958 continue;
2959 };
2960 let uri = sub.describe();
2961 if !seen.insert(uri.clone()) {
2962 tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2963 continue;
2964 }
2965 out.push(sub);
2966 }
2967 out
2968}
2969
2970fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2972 let Some(colon_idx) = spec.find(':') else {
2974 tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2975 return None;
2976 };
2977 let scheme = &spec[..colon_idx];
2978 let rest = &spec[colon_idx + 1..];
2979 if scheme != "file" {
2980 tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
2981 return None;
2982 }
2983 let Some(after_slash) = rest.strip_prefix("//") else {
2985 tracing::error!(spec, "file URI missing '//'");
2986 return None;
2987 };
2988 let Some(path_start) = after_slash.find('/') else {
2990 tracing::error!(spec, "file URI has no path component");
2991 return None;
2992 };
2993 let authority = &after_slash[..path_start];
2994 let encoded_path = &after_slash[path_start..];
2995 if !authority.is_empty() {
2996 tracing::error!(
2997 spec,
2998 authority,
2999 "file URI with non-empty authority is rejected"
3000 );
3001 return None;
3002 }
3003 let path = decode_file_uri_path(encoded_path)?;
3004 Some(Arc::new(FileCardSubscriber::new(path)))
3005}
3006
3007fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3013 let decoded = percent_decode(encoded)?;
3014 #[cfg(windows)]
3015 {
3016 let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3018 Some(PathBuf::from(trimmed))
3019 }
3020 #[cfg(not(windows))]
3021 {
3022 Some(PathBuf::from(decoded))
3023 }
3024}
3025
3026fn percent_decode(src: &str) -> Option<String> {
3029 let bytes = src.as_bytes();
3030 let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3031 let mut i = 0;
3032 while i < bytes.len() {
3033 let b = bytes[i];
3034 if b == b'%' {
3035 if i + 2 >= bytes.len() {
3036 tracing::error!(src, "percent-encoded sequence truncated");
3037 return None;
3038 }
3039 let hi = (bytes[i + 1] as char).to_digit(16);
3040 let lo = (bytes[i + 2] as char).to_digit(16);
3041 match (hi, lo) {
3042 (Some(h), Some(l)) => {
3043 out.push(((h << 4) | l) as u8);
3044 i += 3;
3045 }
3046 _ => {
3047 tracing::error!(src, "percent-encoded sequence has non-hex digits");
3048 return None;
3049 }
3050 }
3051 } else {
3052 out.push(b);
3053 i += 1;
3054 }
3055 }
3056 match String::from_utf8(out) {
3057 Ok(s) => Some(s),
3058 Err(_) => {
3059 tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3060 None
3061 }
3062 }
3063}
3064
3065#[cfg(test)]
3066mod tests {
3067 use super::*;
3068
3069 fn shared_store() -> &'static FileCardStore {
3077 static STORE: OnceLock<FileCardStore> = OnceLock::new();
3078 STORE.get_or_init(|| {
3079 let tmp = tempfile::tempdir().expect("test tempdir");
3080 let root = tmp.path().to_path_buf();
3083 std::mem::forget(tmp);
3084 FileCardStore::new(root)
3085 })
3086 }
3087
3088 fn create(input: Json) -> Result<(String, PathBuf), String> {
3095 create_with_store(shared_store(), input)
3096 }
3097
3098 fn get(card_id: &str) -> Result<Option<Json>, String> {
3099 get_with_store(shared_store(), card_id)
3100 }
3101
3102 fn list(pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
3103 list_with_store(shared_store(), pkg_filter)
3104 }
3105
3106 fn append(card_id: &str, fields: Json) -> Result<Json, String> {
3107 append_with_store(shared_store(), card_id, fields)
3108 }
3109
3110 fn alias_set(
3111 name: &str,
3112 card_id: &str,
3113 pkg: Option<&str>,
3114 note: Option<&str>,
3115 ) -> Result<Alias, String> {
3116 alias_set_with_store(shared_store(), name, card_id, pkg, note)
3117 }
3118
3119 fn alias_list(pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
3120 alias_list_with_store(shared_store(), pkg_filter)
3121 }
3122
3123 fn get_by_alias(name: &str) -> Result<Option<Json>, String> {
3124 get_by_alias_with_store(shared_store(), name)
3125 }
3126
3127 fn find(q: FindQuery) -> Result<Vec<Summary>, String> {
3128 find_with_store(shared_store(), q)
3129 }
3130
3131 fn lineage(q: LineageQuery) -> Result<Option<LineageResult>, String> {
3137 lineage_with_store(shared_store(), q)
3138 }
3139
3140 fn import_from_dir(
3141 source_dir: &std::path::Path,
3142 pkg: &str,
3143 ) -> Result<(Vec<String>, Vec<String>), String> {
3144 import_from_dir_with_store(shared_store(), source_dir, pkg)
3145 }
3146
3147 fn unique_pkg() -> String {
3148 let ns = std::time::SystemTime::now()
3149 .duration_since(std::time::UNIX_EPOCH)
3150 .unwrap()
3151 .as_nanos();
3152 format!("_test_card_{ns}")
3153 }
3154
3155 fn cleanup(pkg: &str) {
3156 if let Ok(d) = shared_store().pkg_dir(pkg) {
3157 let _ = fs::remove_dir_all(&d);
3158 }
3159 }
3160
3161 #[test]
3162 fn minimum_valid_card() {
3163 let pkg = unique_pkg();
3164 let input = json!({ "pkg": { "name": pkg } });
3165 let (id, path) = create(input).unwrap();
3166 assert!(path.exists());
3167 assert!(id.starts_with(&pkg));
3168
3169 let got = get(&id).unwrap().unwrap();
3170 assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
3171 assert_eq!(got["card_id"], json!(id));
3172 assert_eq!(got["pkg"]["name"], json!(pkg));
3173 assert!(got.get("created_at").is_some());
3174 assert!(got.get("created_by").is_some());
3175
3176 cleanup(&pkg);
3177 }
3178
3179 #[test]
3180 fn create_rejects_missing_pkg_name() {
3181 let err = create(json!({})).unwrap_err();
3182 assert!(err.contains("pkg.name"));
3183 }
3184
3185 #[test]
3186 fn create_is_immutable() {
3187 let pkg = unique_pkg();
3188 let input = json!({
3189 "card_id": "fixed_id_001",
3190 "pkg": { "name": pkg }
3191 });
3192 create(input.clone()).unwrap();
3193 let err = create(input).unwrap_err();
3194 assert!(err.contains("already exists"));
3195 cleanup(&pkg);
3196 }
3197
3198 #[test]
3199 fn create_injects_param_fingerprint() {
3200 let pkg = unique_pkg();
3201 let input = json!({
3202 "pkg": { "name": pkg },
3203 "params": { "depth": 3, "temperature": 0.0 }
3204 });
3205 let (id, _) = create(input).unwrap();
3206 let got = get(&id).unwrap().unwrap();
3207 assert!(got["param_fingerprint"].is_string());
3208 cleanup(&pkg);
3209 }
3210
3211 #[test]
3212 fn list_returns_newest_first() {
3213 let pkg = unique_pkg();
3214 let (id1, _) = create(json!({
3216 "card_id": format!("{pkg}_a"),
3217 "pkg": { "name": pkg },
3218 "created_at": "2025-01-01T00:00:00Z"
3219 }))
3220 .unwrap();
3221 let (id2, _) = create(json!({
3222 "card_id": format!("{pkg}_b"),
3223 "pkg": { "name": pkg },
3224 "created_at": "2026-01-01T00:00:00Z"
3225 }))
3226 .unwrap();
3227
3228 let rows = list(Some(&pkg)).unwrap();
3229 assert_eq!(rows.len(), 2);
3230 assert_eq!(rows[0].card_id, id2); assert_eq!(rows[1].card_id, id1);
3232
3233 cleanup(&pkg);
3234 }
3235
3236 #[test]
3237 fn list_extracts_summary_fields() {
3238 let pkg = unique_pkg();
3239 let (id, _) = create(json!({
3240 "pkg": { "name": pkg },
3241 "model": { "id": "claude-opus-4-6" },
3242 "scenario": { "name": "gsm8k_sample100" },
3243 "stats": { "pass_rate": 0.82 }
3244 }))
3245 .unwrap();
3246
3247 let rows = list(Some(&pkg)).unwrap();
3248 let row = rows.iter().find(|r| r.card_id == id).unwrap();
3249 assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
3250 assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
3251 assert_eq!(row.pass_rate, Some(0.82));
3252
3253 cleanup(&pkg);
3254 }
3255
3256 #[test]
3257 fn get_missing_returns_none() {
3258 assert!(get("does_not_exist_xyz").unwrap().is_none());
3259 }
3260
3261 #[test]
3262 fn card_id_embeds_compact_timestamp() {
3263 let pkg = unique_pkg();
3264 let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3265 let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
3269 let parts: Vec<&str> = tail.split('_').collect();
3270 assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
3272 let ts = parts[1];
3273 assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
3274 assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
3275 cleanup(&pkg);
3276 }
3277
3278 #[test]
3279 fn now_compact_format() {
3280 let s = now_compact();
3281 assert_eq!(s.len(), 15);
3282 assert_eq!(s.chars().nth(8), Some('T'));
3283 for (i, c) in s.chars().enumerate() {
3285 if i != 8 {
3286 assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3287 }
3288 }
3289 }
3290
3291 #[test]
3292 fn short_model_variants() {
3293 assert_eq!(short_model("claude-opus-4-6"), "opus46");
3294 assert_eq!(short_model("gpt-4o"), "4o");
3295 assert_eq!(short_model(""), "model");
3296 }
3297
3298 #[test]
3299 fn two_cards_same_second_different_stats_get_distinct_ids() {
3300 let pkg = unique_pkg();
3301 let input1 = json!({
3302 "pkg": { "name": pkg },
3303 "scenario": { "name": "gsm8k" },
3304 "stats": { "pass_rate": 0.4 }
3305 });
3306 let input2 = json!({
3307 "pkg": { "name": pkg },
3308 "scenario": { "name": "gsm8k" },
3309 "stats": { "pass_rate": 0.9 }
3310 });
3311 let (id1, _) = create(input1).unwrap();
3312 let (id2, _) = create(input2).unwrap();
3313 assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
3314 cleanup(&pkg);
3315 }
3316
3317 #[test]
3320 fn append_adds_new_fields() {
3321 let pkg = unique_pkg();
3322 let (id, _) = create(json!({
3323 "pkg": { "name": pkg },
3324 "stats": { "pass_rate": 0.5 }
3325 }))
3326 .unwrap();
3327
3328 let merged = append(
3329 &id,
3330 json!({
3331 "caveats": { "notes": "rescored after fix" },
3332 "metadata": { "reviewer": "yn" }
3333 }),
3334 )
3335 .unwrap();
3336 assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
3337 assert_eq!(merged["metadata"]["reviewer"], json!("yn"));
3338
3339 let got = get(&id).unwrap().unwrap();
3341 assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
3342 assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3344
3345 cleanup(&pkg);
3346 }
3347
3348 #[test]
3349 fn append_rejects_existing_key() {
3350 let pkg = unique_pkg();
3351 let (id, _) = create(json!({
3352 "pkg": { "name": pkg },
3353 "stats": { "pass_rate": 0.5 }
3354 }))
3355 .unwrap();
3356
3357 let err = append(&id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
3358 assert!(err.contains("already set"), "got: {err}");
3359 let got = get(&id).unwrap().unwrap();
3361 assert_eq!(got["stats"]["pass_rate"], json!(0.5));
3362
3363 cleanup(&pkg);
3364 }
3365
3366 #[test]
3367 fn append_errors_on_missing_card() {
3368 let err = append("does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
3369 assert!(err.contains("not found"));
3370 }
3371
3372 #[test]
3375 fn alias_set_and_list_roundtrip() {
3376 let pkg = unique_pkg();
3377 let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
3378
3379 let alias_name = format!("test_alias_{}", &pkg);
3380 alias_set(&alias_name, &id, Some(&pkg), Some("smoke")).unwrap();
3381
3382 let rows = alias_list(Some(&pkg)).unwrap();
3383 let a = rows.iter().find(|a| a.name == alias_name).unwrap();
3384 assert_eq!(a.card_id, id);
3385 assert_eq!(a.pkg.as_deref(), Some(pkg.as_str()));
3386 assert_eq!(a.note.as_deref(), Some("smoke"));
3387 assert!(!a.set_at.is_empty());
3388
3389 let (id2, _) = create(json!({
3391 "card_id": format!("{pkg}_b"),
3392 "pkg": { "name": pkg }
3393 }))
3394 .unwrap();
3395 alias_set(&alias_name, &id2, Some(&pkg), None).unwrap();
3396 let rows = alias_list(Some(&pkg)).unwrap();
3397 let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
3398 assert_eq!(matching.len(), 1, "alias should be unique by name");
3399 assert_eq!(matching[0].card_id, id2);
3400
3401 let store = shared_store();
3403 let remaining: Vec<Alias> = store
3404 .read_aliases()
3405 .unwrap()
3406 .into_iter()
3407 .filter(|a| a.name != alias_name)
3408 .collect();
3409 store.write_aliases(&remaining).unwrap();
3410 cleanup(&pkg);
3411 }
3412
3413 #[test]
3414 fn alias_set_rejects_unknown_card() {
3415 let err = alias_set("x", "does_not_exist_xyz", None, None).unwrap_err();
3416 assert!(err.contains("not found"));
3417 }
3418
3419 fn where_from(v: Json) -> Predicate {
3422 parse_where(&v).expect("parse where")
3423 }
3424
3425 fn order_from(v: Json) -> Vec<OrderKey> {
3426 parse_order_by(&v).expect("parse order_by")
3427 }
3428
3429 #[test]
3430 fn find_where_nested_eq_and_gte() {
3431 let pkg = unique_pkg();
3432 create(json!({
3433 "card_id": format!("{pkg}_low"),
3434 "pkg": { "name": pkg },
3435 "scenario": { "name": "gsm8k" },
3436 "stats": { "pass_rate": 0.4 }
3437 }))
3438 .unwrap();
3439 create(json!({
3440 "card_id": format!("{pkg}_high"),
3441 "pkg": { "name": pkg },
3442 "scenario": { "name": "gsm8k" },
3443 "stats": { "pass_rate": 0.9 }
3444 }))
3445 .unwrap();
3446 create(json!({
3447 "card_id": format!("{pkg}_other"),
3448 "pkg": { "name": pkg },
3449 "scenario": { "name": "other" },
3450 "stats": { "pass_rate": 1.0 }
3451 }))
3452 .unwrap();
3453
3454 let rows = find(FindQuery {
3456 pkg: Some(pkg.clone()),
3457 where_: Some(where_from(json!({
3458 "scenario": { "name": "gsm8k" },
3459 }))),
3460 order_by: order_from(json!("-stats.pass_rate")),
3461 ..Default::default()
3462 })
3463 .unwrap();
3464 assert_eq!(rows.len(), 2);
3465 assert_eq!(rows[0].pass_rate, Some(0.9));
3466 assert_eq!(rows[1].pass_rate, Some(0.4));
3467
3468 let rows = find(FindQuery {
3470 pkg: Some(pkg.clone()),
3471 where_: Some(where_from(json!({
3472 "stats": { "pass_rate": { "gte": 0.8 } },
3473 }))),
3474 order_by: order_from(json!("-stats.pass_rate")),
3475 ..Default::default()
3476 })
3477 .unwrap();
3478 assert_eq!(rows.len(), 2);
3479 assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));
3480
3481 let rows = find(FindQuery {
3483 pkg: Some(pkg.clone()),
3484 order_by: order_from(json!("-stats.pass_rate")),
3485 limit: Some(1),
3486 ..Default::default()
3487 })
3488 .unwrap();
3489 assert_eq!(rows.len(), 1);
3490 assert_eq!(rows[0].pass_rate, Some(1.0));
3491
3492 cleanup(&pkg);
3493 }
3494
3495 #[test]
3496 fn find_where_implicit_eq_and_logical() {
3497 let pkg = unique_pkg();
3498 create(json!({
3499 "card_id": format!("{pkg}_a"),
3500 "pkg": { "name": pkg },
3501 "model": { "id": "claude-opus-4-6" },
3502 "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
3503 }))
3504 .unwrap();
3505 create(json!({
3506 "card_id": format!("{pkg}_b"),
3507 "pkg": { "name": pkg },
3508 "model": { "id": "claude-opus-4-6" },
3509 "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
3510 }))
3511 .unwrap();
3512 create(json!({
3513 "card_id": format!("{pkg}_c"),
3514 "pkg": { "name": pkg },
3515 "model": { "id": "claude-haiku-4-5-20251001" },
3516 "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
3517 }))
3518 .unwrap();
3519
3520 let rows = find(FindQuery {
3522 pkg: Some(pkg.clone()),
3523 where_: Some(where_from(json!({
3524 "stats": { "equilibrium_position": "dead" },
3525 }))),
3526 ..Default::default()
3527 })
3528 .unwrap();
3529 assert_eq!(rows.len(), 1);
3530 assert!(rows[0].card_id.ends_with("_a"));
3531
3532 let rows = find(FindQuery {
3534 pkg: Some(pkg.clone()),
3535 where_: Some(where_from(json!({
3536 "_or": [
3537 { "stats": { "equilibrium_position": "dead" } },
3538 { "stats": { "survival_rate": { "gte": 0.9 } } },
3539 ],
3540 }))),
3541 ..Default::default()
3542 })
3543 .unwrap();
3544 assert_eq!(rows.len(), 2);
3545
3546 let rows = find(FindQuery {
3548 pkg: Some(pkg.clone()),
3549 where_: Some(where_from(json!({
3550 "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
3551 }))),
3552 ..Default::default()
3553 })
3554 .unwrap();
3555 assert_eq!(rows.len(), 2);
3556
3557 let rows = find(FindQuery {
3559 pkg: Some(pkg.clone()),
3560 where_: Some(where_from(json!({
3561 "stats": {
3562 "equilibrium_position": { "in": ["dead", "fragile"] },
3563 },
3564 }))),
3565 ..Default::default()
3566 })
3567 .unwrap();
3568 assert_eq!(rows.len(), 2);
3569
3570 let rows = find(FindQuery {
3573 pkg: Some(pkg.clone()),
3574 where_: Some(where_from(json!({
3575 "strategy_params": { "temperature": { "exists": false } },
3576 }))),
3577 ..Default::default()
3578 })
3579 .unwrap();
3580 assert_eq!(rows.len(), 3, "none of the cards have strategy_params");
3581
3582 cleanup(&pkg);
3583 }
3584
3585 #[test]
3586 fn find_order_by_multi_key() {
3587 let pkg = unique_pkg();
3588 create(json!({
3589 "card_id": format!("{pkg}_a"),
3590 "pkg": { "name": pkg },
3591 "stats": { "pass_rate": 0.5 }
3592 }))
3593 .unwrap();
3594 create(json!({
3595 "card_id": format!("{pkg}_b"),
3596 "pkg": { "name": pkg },
3597 "stats": { "pass_rate": 0.9 }
3598 }))
3599 .unwrap();
3600 create(json!({
3601 "card_id": format!("{pkg}_c"),
3602 "pkg": { "name": pkg },
3603 "stats": { "pass_rate": 0.9 }
3604 }))
3605 .unwrap();
3606
3607 let rows = find(FindQuery {
3608 pkg: Some(pkg.clone()),
3609 order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
3610 ..Default::default()
3611 })
3612 .unwrap();
3613 assert_eq!(rows.len(), 3);
3614 assert_eq!(rows[0].pass_rate, Some(0.9));
3615 assert_eq!(rows[1].pass_rate, Some(0.9));
3616 assert_eq!(rows[2].pass_rate, Some(0.5));
3617 assert!(rows[0].card_id < rows[1].card_id);
3619
3620 cleanup(&pkg);
3621 }
3622
3623 #[test]
3624 fn find_offset_and_limit() {
3625 let pkg = unique_pkg();
3626 for i in 0..5 {
3627 create(json!({
3628 "card_id": format!("{pkg}_{i}"),
3629 "pkg": { "name": pkg },
3630 "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
3631 }))
3632 .unwrap();
3633 }
3634
3635 let rows = find(FindQuery {
3636 pkg: Some(pkg.clone()),
3637 order_by: order_from(json!("-stats.pass_rate")),
3638 offset: Some(1),
3639 limit: Some(2),
3640 ..Default::default()
3641 })
3642 .unwrap();
3643 assert_eq!(rows.len(), 2);
3644 let pr0 = rows[0].pass_rate.unwrap();
3646 let pr1 = rows[1].pass_rate.unwrap();
3647 assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
3648 assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");
3649
3650 cleanup(&pkg);
3651 }
3652
3653 #[test]
3654 fn parse_where_rejects_non_object() {
3655 assert!(parse_where(&json!("not an object")).is_err());
3656 assert!(parse_where(&json!(42)).is_err());
3657 }
3658
3659 #[test]
3660 fn parse_order_by_accepts_string_and_array() {
3661 let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3662 assert_eq!(k.len(), 1);
3663 assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3664 assert!(k[0].desc);
3665
3666 let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3667 assert_eq!(k.len(), 2);
3668 assert!(!k[0].desc);
3669 assert!(k[1].desc);
3670 }
3671
3672 #[test]
3673 fn find_where_string_ops_contains_and_starts_with() {
3674 let pkg = unique_pkg();
3675 create(json!({
3676 "card_id": format!("{pkg}_a"),
3677 "pkg": { "name": pkg },
3678 "model": { "id": "claude-opus-4-6" },
3679 "metadata": { "tag": "experiment_alpha" },
3680 }))
3681 .unwrap();
3682 create(json!({
3683 "card_id": format!("{pkg}_b"),
3684 "pkg": { "name": pkg },
3685 "model": { "id": "claude-haiku-4-5-20251001" },
3686 "metadata": { "tag": "experiment_beta" },
3687 }))
3688 .unwrap();
3689 create(json!({
3690 "card_id": format!("{pkg}_c"),
3691 "pkg": { "name": pkg },
3692 "model": { "id": "claude-sonnet-4-5" },
3693 "metadata": { "tag": "baseline" },
3694 }))
3695 .unwrap();
3696
3697 let rows = find(FindQuery {
3699 pkg: Some(pkg.clone()),
3700 where_: Some(where_from(json!({
3701 "metadata": { "tag": { "contains": "experiment" } },
3702 }))),
3703 ..Default::default()
3704 })
3705 .unwrap();
3706 assert_eq!(rows.len(), 2);
3707
3708 let rows = find(FindQuery {
3710 pkg: Some(pkg.clone()),
3711 where_: Some(where_from(json!({
3712 "model": { "id": { "starts_with": "claude-opus" } },
3713 }))),
3714 ..Default::default()
3715 })
3716 .unwrap();
3717 assert_eq!(rows.len(), 1);
3718 assert!(rows[0].card_id.ends_with("_a"));
3719
3720 let rows = find(FindQuery {
3722 pkg: Some(pkg.clone()),
3723 where_: Some(where_from(json!({
3724 "metadata": { "missing_field": { "contains": "x" } },
3725 }))),
3726 ..Default::default()
3727 })
3728 .unwrap();
3729 assert_eq!(rows.len(), 0);
3730
3731 let rows = find(FindQuery {
3733 pkg: Some(pkg.clone()),
3734 where_: Some(where_from(json!({
3735 "metadata": { "tag": { "starts_with": 42 } },
3736 }))),
3737 ..Default::default()
3738 })
3739 .unwrap();
3740 assert_eq!(rows.len(), 0);
3741
3742 cleanup(&pkg);
3743 }
3744
3745 #[test]
3746 fn where_missing_field_ne_is_true() {
3747 let pkg = unique_pkg();
3748 create(json!({
3749 "card_id": format!("{pkg}_x"),
3750 "pkg": { "name": pkg },
3751 }))
3752 .unwrap();
3753
3754 let rows = find(FindQuery {
3755 pkg: Some(pkg.clone()),
3756 where_: Some(where_from(json!({
3757 "strategy_params": { "temperature": { "ne": 0.5 } },
3758 }))),
3759 ..Default::default()
3760 })
3761 .unwrap();
3762 assert_eq!(rows.len(), 1, "missing field is ne to anything");
3763
3764 cleanup(&pkg);
3765 }
3766
3767 fn create_child(pkg: &str, suffix: &str, parent_id: &str, relation: &str) -> String {
3771 let (id, _) = create(json!({
3772 "card_id": format!("{pkg}_{suffix}"),
3773 "pkg": { "name": pkg },
3774 "stats": { "pass_rate": 0.5 },
3775 "metadata": {
3776 "prior_card_id": parent_id,
3777 "prior_relation": relation,
3778 },
3779 }))
3780 .unwrap();
3781 id
3782 }
3783
3784 #[test]
3785 fn lineage_up_walks_prior_card_id_chain() {
3786 let pkg = unique_pkg();
3787 let (a, _) = create(json!({
3789 "card_id": format!("{pkg}_a"),
3790 "pkg": { "name": pkg },
3791 }))
3792 .unwrap();
3793 let b = create_child(&pkg, "b", &a, "rerun_of");
3794 let c = create_child(&pkg, "c", &b, "rerun_of");
3795
3796 let res = lineage(LineageQuery {
3797 card_id: c.clone(),
3798 direction: LineageDirection::Up,
3799 depth: None,
3800 include_stats: false,
3801 relation_filter: None,
3802 })
3803 .unwrap()
3804 .expect("lineage result");
3805
3806 assert_eq!(res.root, c);
3807 assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
3808 assert_eq!(res.nodes[0].card_id, c);
3809 assert_eq!(res.nodes[0].depth, 0);
3810 assert_eq!(res.nodes[1].card_id, b);
3811 assert_eq!(res.nodes[1].depth, -1);
3812 assert_eq!(res.nodes[2].card_id, a);
3813 assert_eq!(res.nodes[2].depth, -2);
3814 assert_eq!(res.edges.len(), 2);
3815 assert!(!res.truncated);
3816
3817 cleanup(&pkg);
3818 }
3819
3820 #[test]
3821 fn lineage_down_walks_descendants_breadth_first() {
3822 let pkg = unique_pkg();
3823 let (a, _) = create(json!({
3825 "card_id": format!("{pkg}_a"),
3826 "pkg": { "name": pkg },
3827 }))
3828 .unwrap();
3829 let _b = create_child(&pkg, "b", &a, "sweep_variant");
3830 let c = create_child(&pkg, "c", &a, "sweep_variant");
3831 let _d = create_child(&pkg, "d", &c, "rerun_of");
3832
3833 let res = lineage(LineageQuery {
3834 card_id: a.clone(),
3835 direction: LineageDirection::Down,
3836 depth: None,
3837 include_stats: false,
3838 relation_filter: None,
3839 })
3840 .unwrap()
3841 .expect("lineage result");
3842
3843 assert_eq!(res.nodes.len(), 4);
3845 assert_eq!(res.edges.len(), 3);
3846 assert!(!res.truncated);
3847
3848 cleanup(&pkg);
3849 }
3850
3851 #[test]
3852 fn lineage_depth_truncation_sets_flag() {
3853 let pkg = unique_pkg();
3854 let (a, _) = create(json!({
3855 "card_id": format!("{pkg}_a"),
3856 "pkg": { "name": pkg },
3857 }))
3858 .unwrap();
3859 let b = create_child(&pkg, "b", &a, "rerun_of");
3860 let _c = create_child(&pkg, "c", &b, "rerun_of");
3861
3862 let res = lineage(LineageQuery {
3863 card_id: a,
3864 direction: LineageDirection::Down,
3865 depth: Some(1),
3866 include_stats: false,
3867 relation_filter: None,
3868 })
3869 .unwrap()
3870 .unwrap();
3871 assert_eq!(res.nodes.len(), 2, "root + 1 level");
3872 assert!(res.truncated, "should be truncated at depth=1");
3873
3874 cleanup(&pkg);
3875 }
3876
3877 #[test]
3878 fn lineage_relation_filter_skips_unlisted() {
3879 let pkg = unique_pkg();
3880 let (a, _) = create(json!({
3881 "card_id": format!("{pkg}_a"),
3882 "pkg": { "name": pkg },
3883 }))
3884 .unwrap();
3885 let _b = create_child(&pkg, "b", &a, "sweep_variant");
3886 let _c = create_child(&pkg, "c", &a, "rerun_of");
3887
3888 let res = lineage(LineageQuery {
3889 card_id: a,
3890 direction: LineageDirection::Down,
3891 depth: None,
3892 include_stats: false,
3893 relation_filter: Some(vec!["sweep_variant".to_string()]),
3894 })
3895 .unwrap()
3896 .unwrap();
3897 assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
3898 assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));
3899
3900 cleanup(&pkg);
3901 }
3902
3903 #[test]
3904 fn lineage_missing_card_returns_none() {
3905 let res = lineage(LineageQuery {
3906 card_id: "nonexistent_card_id_xyz".into(),
3907 direction: LineageDirection::Up,
3908 depth: None,
3909 include_stats: false,
3910 relation_filter: None,
3911 })
3912 .unwrap();
3913 assert!(res.is_none());
3914 }
3915
3916 #[test]
3922 fn write_and_read_samples_roundtrip() {
3923 let tmp = tempfile::tempdir().unwrap();
3924 let store = FileCardStore::new(tmp.path().to_path_buf());
3925 let (id, _) = create_with_store(
3926 &store,
3927 json!({
3928 "pkg": { "name": "roundtrip_pkg" },
3929 "stats": { "pass_rate": 0.5 }
3930 }),
3931 )
3932 .unwrap();
3933
3934 let samples = vec![
3935 json!({ "case": "c0", "passed": true, "score": 1.0 }),
3936 json!({ "case": "c1", "passed": false, "score": 0.0 }),
3937 json!({ "case": "c2", "passed": true, "score": 0.75 }),
3938 ];
3939 let path = write_samples_with_store(&store, &id, samples.clone()).unwrap();
3940 assert!(path.exists());
3941 assert!(path.to_string_lossy().ends_with(".samples.jsonl"));
3942
3943 let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
3944 assert_eq!(got.len(), 3);
3945 assert_eq!(got[0]["case"], json!("c0"));
3946 assert_eq!(got[2]["score"], json!(0.75));
3947
3948 let slice = read_samples_with_store(
3950 &store,
3951 &id,
3952 SamplesQuery {
3953 offset: 1,
3954 limit: Some(1),
3955 where_: None,
3956 },
3957 )
3958 .unwrap();
3959 assert_eq!(slice.len(), 1);
3960 assert_eq!(slice[0]["case"], json!("c1"));
3961 }
3962
3963 #[test]
3964 fn write_samples_is_write_once() {
3965 let tmp = tempfile::tempdir().unwrap();
3966 let store = FileCardStore::new(tmp.path().to_path_buf());
3967 let (id, _) =
3968 create_with_store(&store, json!({ "pkg": { "name": "write_once_pkg" } })).unwrap();
3969 write_samples_with_store(&store, &id, vec![json!({ "x": 1 })]).unwrap();
3970 let err = write_samples_with_store(&store, &id, vec![json!({ "x": 2 })]).unwrap_err();
3971 assert!(err.contains("already exist"), "got: {err}");
3972 }
3973
3974 #[test]
3986 fn read_samples_empty_when_absent() {
3987 let tmp = tempfile::tempdir().unwrap();
3988 let store = FileCardStore::new(tmp.path().to_path_buf());
3989 let (id, _) = create_with_store(
3990 &store,
3991 json!({ "pkg": { "name": "read_samples_empty_pkg" } }),
3992 )
3993 .unwrap();
3994 let got = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
3995 assert!(got.is_empty());
3996 }
3997
3998 #[test]
3999 fn read_samples_where_filters_rows() {
4000 let tmp = tempfile::tempdir().unwrap();
4001 let store = FileCardStore::new(tmp.path().to_path_buf());
4002 let (id, _) =
4003 create_with_store(&store, json!({ "pkg": { "name": "where_filter_pkg" } })).unwrap();
4004 write_samples_with_store(
4005 &store,
4006 &id,
4007 vec![
4008 json!({ "case": "c0", "passed": true, "score": 1.0 }),
4009 json!({ "case": "c1", "passed": false, "score": 0.0 }),
4010 json!({ "case": "c2", "passed": true, "score": 0.25 }),
4011 json!({ "case": "c3", "passed": true, "score": 0.75 }),
4012 json!({ "case": "c4", "passed": false, "score": 0.5 }),
4013 ],
4014 )
4015 .unwrap();
4016
4017 let pred = parse_where(&json!({ "passed": true })).unwrap();
4019 let got = read_samples_with_store(
4020 &store,
4021 &id,
4022 SamplesQuery {
4023 offset: 0,
4024 limit: None,
4025 where_: Some(pred),
4026 },
4027 )
4028 .unwrap();
4029 assert_eq!(got.len(), 3);
4030 assert_eq!(got[0]["case"], json!("c0"));
4031 assert_eq!(got[1]["case"], json!("c2"));
4032 assert_eq!(got[2]["case"], json!("c3"));
4033
4034 let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
4036 let got = read_samples_with_store(
4037 &store,
4038 &id,
4039 SamplesQuery {
4040 offset: 0,
4041 limit: None,
4042 where_: Some(pred),
4043 },
4044 )
4045 .unwrap();
4046 assert_eq!(got.len(), 3);
4047 assert_eq!(got[0]["case"], json!("c0"));
4048 assert_eq!(got[1]["case"], json!("c3"));
4049 assert_eq!(got[2]["case"], json!("c4"));
4050
4051 let pred = parse_where(&json!({ "passed": true })).unwrap();
4053 let slice = read_samples_with_store(
4054 &store,
4055 &id,
4056 SamplesQuery {
4057 offset: 1,
4058 limit: Some(1),
4059 where_: Some(pred),
4060 },
4061 )
4062 .unwrap();
4063 assert_eq!(slice.len(), 1);
4064 assert_eq!(slice[0]["case"], json!("c2"));
4065 }
4066
4067 #[test]
4068 fn get_by_alias_roundtrip() {
4069 let pkg = unique_pkg();
4070 let (id, _) = create(json!({
4071 "pkg": { "name": pkg },
4072 "stats": { "pass_rate": 0.85 }
4073 }))
4074 .unwrap();
4075
4076 let alias_name = format!("best_{pkg}");
4077 alias_set(&alias_name, &id, Some(&pkg), None).unwrap();
4078
4079 let card = get_by_alias(&alias_name).unwrap().unwrap();
4080 assert_eq!(card["card_id"], json!(id));
4081 assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4082
4083 assert!(get_by_alias("nonexistent_alias_xyz").unwrap().is_none());
4084
4085 cleanup(&pkg);
4086 }
4087
4088 #[test]
4089 fn samples_errors_on_missing_card() {
4090 let tmp = tempfile::tempdir().unwrap();
4091 let store = FileCardStore::new(tmp.path().to_path_buf());
4092 let err = write_samples_with_store(&store, "does_not_exist_xyz_samples", vec![json!({})])
4093 .unwrap_err();
4094 assert!(err.contains("not found"));
4095 }
4096
4097 #[test]
4100 fn import_from_dir_copies_cards() {
4101 let pkg = "import_copies_pkg";
4102 let src_tmp = tempfile::tempdir().unwrap();
4103 let store_tmp = tempfile::tempdir().unwrap();
4104 let store = FileCardStore::new(store_tmp.path().to_path_buf());
4105
4106 let card_id = format!("{pkg}_imported");
4108 let card_content = format!(
4109 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
4110 );
4111 fs::write(
4112 src_tmp.path().join(format!("{card_id}.toml")),
4113 &card_content,
4114 )
4115 .unwrap();
4116
4117 fs::write(
4119 src_tmp.path().join(format!("{card_id}.samples.jsonl")),
4120 "{\"case\":\"c0\"}\n",
4121 )
4122 .unwrap();
4123
4124 let (imported, skipped) = import_from_dir_with_store(&store, src_tmp.path(), pkg).unwrap();
4125 assert_eq!(imported, vec![card_id.clone()]);
4126 assert!(skipped.is_empty());
4127
4128 let got = get_with_store(&store, &card_id).unwrap().unwrap();
4130 assert_eq!(got["card_id"], json!(card_id));
4131
4132 let samples = read_samples_with_store(&store, &card_id, SamplesQuery::default()).unwrap();
4134 assert_eq!(samples.len(), 1);
4135 }
4136
4137 #[test]
4138 fn import_from_dir_skips_existing() {
4139 let pkg = unique_pkg();
4140 let (existing_id, _) = create(json!({
4142 "pkg": { "name": pkg },
4143 "stats": { "pass_rate": 0.5 }
4144 }))
4145 .unwrap();
4146
4147 let tmp = tempfile::tempdir().unwrap();
4149 let card_content = format!(
4150 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
4151 );
4152 fs::write(
4153 tmp.path().join(format!("{existing_id}.toml")),
4154 &card_content,
4155 )
4156 .unwrap();
4157
4158 let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4159 assert!(imported.is_empty());
4160 assert_eq!(skipped, vec![existing_id.clone()]);
4161
4162 let got = get(&existing_id).unwrap().unwrap();
4164 assert_eq!(got["stats"]["pass_rate"], json!(0.5));
4165
4166 cleanup(&pkg);
4167 }
4168
4169 #[test]
4170 fn import_from_dir_skips_non_card_toml() {
4171 let pkg = unique_pkg();
4172 let tmp = tempfile::tempdir().unwrap();
4173
4174 fs::write(tmp.path().join("not_a_card.toml"), "title = \"hello\"\n").unwrap();
4176
4177 let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
4178 assert!(imported.is_empty());
4179 assert!(skipped.is_empty());
4180
4181 cleanup(&pkg);
4182 }
4183
4184 #[test]
4190 fn custom_root_file_store_roundtrip() {
4191 let tmp = tempfile::tempdir().unwrap();
4192 let store = FileCardStore::new(tmp.path().to_path_buf());
4193 let pkg = "custom_root_pkg";
4194
4195 let (id, path) = create_with_store(
4197 &store,
4198 json!({
4199 "pkg": { "name": pkg },
4200 "model": { "id": "gpt-test" },
4201 }),
4202 )
4203 .unwrap();
4204 assert!(path.starts_with(tmp.path()));
4205 assert!(path.ends_with(format!("{id}.toml")));
4206
4207 let card = get_with_store(&store, &id).unwrap().expect("card exists");
4208 assert_eq!(
4209 card.get("card_id").and_then(|v| v.as_str()),
4210 Some(id.as_str())
4211 );
4212
4213 let rows = list_with_store(&store, Some(pkg)).unwrap();
4214 assert_eq!(rows.len(), 1);
4215 assert_eq!(rows[0].card_id, id);
4216
4217 let default_rows = list(Some(pkg)).unwrap();
4219 assert!(default_rows.iter().all(|r| r.card_id != id));
4220
4221 alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
4223 let via_alias = get_by_alias_with_store(&store, "alpha")
4224 .unwrap()
4225 .expect("alias resolves");
4226 assert_eq!(
4227 via_alias.get("card_id").and_then(|v| v.as_str()),
4228 Some(id.as_str())
4229 );
4230
4231 let samples_path =
4233 write_samples_with_store(&store, &id, vec![json!({ "case": "a", "pass": true })])
4234 .unwrap();
4235 assert!(samples_path.starts_with(tmp.path()));
4236 let back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
4237 assert_eq!(back.len(), 1);
4238 assert_eq!(back[0].get("case").and_then(|v| v.as_str()), Some("a"));
4239 }
4240
4241 use std::sync::atomic::AtomicUsize;
4246 use std::sync::Barrier;
4247
4248 fn env_lock() -> &'static Mutex<()> {
4251 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4252 LOCK.get_or_init(|| Mutex::new(()))
4253 }
4254
4255 struct BusTestOwnerGuard;
4261 impl Drop for BusTestOwnerGuard {
4262 fn drop(&mut self) {
4263 INSIDE_BUS_TEST.with(|flag| flag.set(false));
4264 }
4265 }
4266
4267 fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4282 where
4283 F: FnOnce(&'static CardEventBus),
4284 {
4285 let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4288 INSIDE_BUS_TEST.with(|flag| flag.set(true));
4292 let _owner = BusTestOwnerGuard;
4293 let bus = event_bus();
4294 bus.reset_stats_for_test();
4295 bus.replace_subscribers_for_test(subs);
4296 f(bus);
4297 bus.replace_subscribers_for_test(Vec::new());
4299 bus.reset_stats_for_test();
4300 }
4303
4304 struct MockSubscriber {
4306 uri: String,
4307 events: Mutex<Vec<CardEvent>>,
4308 calls: AtomicUsize,
4309 }
4310
4311 impl MockSubscriber {
4312 fn new(uri: &str) -> Arc<Self> {
4313 Arc::new(Self {
4314 uri: uri.to_string(),
4315 events: Mutex::new(Vec::new()),
4316 calls: AtomicUsize::new(0),
4317 })
4318 }
4319 fn call_count(&self) -> usize {
4320 self.calls.load(Ordering::SeqCst)
4321 }
4322 }
4323
4324 impl CardSubscriber for MockSubscriber {
4325 fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4326 self.calls.fetch_add(1, Ordering::SeqCst);
4327 self.events
4328 .lock()
4329 .unwrap_or_else(|p| p.into_inner())
4330 .push(ev.clone());
4331 Ok(())
4332 }
4333 fn describe(&self) -> String {
4334 self.uri.clone()
4335 }
4336 }
4337
4338 #[test]
4341 fn bus_is_process_singleton() {
4342 let a = event_bus() as *const CardEventBus;
4343 let b = event_bus() as *const CardEventBus;
4344 assert_eq!(a, b, "event_bus() must return the same singleton pointer");
4345 }
4346
4347 #[test]
4348 fn publish_with_no_subscribers_is_noop() {
4349 with_bus_subscribers(Vec::new(), |_bus| {
4350 publish(CardEvent::Created {
4352 pkg: "pkg".into(),
4353 card_id: "id".into(),
4354 toml_text: "x = 1\n".into(),
4355 });
4356 });
4357 }
4358
4359 #[test]
4360 fn init_event_bus_is_idempotent() {
4361 init_event_bus();
4362 init_event_bus();
4363 init_event_bus();
4364 }
4366
4367 #[test]
4370 fn fanout_mirrors_create() {
4371 let primary = tempfile::tempdir().unwrap();
4372 let sub_a = tempfile::tempdir().unwrap();
4373 let sub_b = tempfile::tempdir().unwrap();
4374 let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
4375 let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));
4376 with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
4377 let store = FileCardStore::new(primary.path().to_path_buf());
4378 let (id, path) =
4379 create_with_store(&store, json!({ "pkg": { "name": "fanout_create_pkg" } }))
4380 .unwrap();
4381 assert!(path.exists());
4382 let primary_text = fs::read_to_string(&path).unwrap();
4383 let a_path = sub_a
4384 .path()
4385 .join("fanout_create_pkg")
4386 .join(format!("{id}.toml"));
4387 let b_path = sub_b
4388 .path()
4389 .join("fanout_create_pkg")
4390 .join(format!("{id}.toml"));
4391 assert!(a_path.exists(), "subscriber A missing card");
4392 assert!(b_path.exists(), "subscriber B missing card");
4393 assert_eq!(fs::read_to_string(&a_path).unwrap(), primary_text);
4394 assert_eq!(fs::read_to_string(&b_path).unwrap(), primary_text);
4395 });
4396 }
4397
4398 #[test]
4399 fn fanout_mirrors_append() {
4400 let primary = tempfile::tempdir().unwrap();
4401 let sub = tempfile::tempdir().unwrap();
4402 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4403 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4404 let store = FileCardStore::new(primary.path().to_path_buf());
4405 let (id, _) =
4406 create_with_store(&store, json!({ "pkg": { "name": "fanout_append_pkg" } }))
4407 .unwrap();
4408 append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();
4410 let sub_path = sub
4411 .path()
4412 .join("fanout_append_pkg")
4413 .join(format!("{id}.toml"));
4414 let text = fs::read_to_string(&sub_path).unwrap();
4415 assert!(text.contains("extra_key"), "append must mirror new key");
4416 });
4417 }
4418
4419 #[test]
4420 fn fanout_mirrors_samples() {
4421 let primary = tempfile::tempdir().unwrap();
4422 let sub = tempfile::tempdir().unwrap();
4423 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4424 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4425 let store = FileCardStore::new(primary.path().to_path_buf());
4426 let (id, _) =
4427 create_with_store(&store, json!({ "pkg": { "name": "fanout_samples_pkg" } }))
4428 .unwrap();
4429 write_samples_with_store(&store, &id, vec![json!({ "case": "c0" })]).unwrap();
4430 let sub_path = sub
4431 .path()
4432 .join("fanout_samples_pkg")
4433 .join(format!("{id}.samples.jsonl"));
4434 let text = fs::read_to_string(&sub_path).unwrap();
4435 assert!(text.contains("\"case\":\"c0\""));
4436 });
4437 }
4438
4439 #[test]
4440 fn fanout_mirrors_aliases() {
4441 let primary = tempfile::tempdir().unwrap();
4442 let sub = tempfile::tempdir().unwrap();
4443 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4444 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4445 let store = FileCardStore::new(primary.path().to_path_buf());
4446 let (id, _) =
4447 create_with_store(&store, json!({ "pkg": { "name": "fanout_alias_pkg" } }))
4448 .unwrap();
4449 alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();
4450 let sub_aliases = sub.path().join("_aliases.toml");
4451 assert!(sub_aliases.exists(), "subscriber must receive aliases file");
4452 let text = fs::read_to_string(&sub_aliases).unwrap();
4453 assert!(text.contains("myalias"));
4454 });
4455 }
4456
4457 #[test]
4458 fn fanout_mirrors_import_from_dir_cards() {
4459 let primary = tempfile::tempdir().unwrap();
4460 let sub = tempfile::tempdir().unwrap();
4461 let src = tempfile::tempdir().unwrap();
4462
4463 let src_card = src.path().join("card_x.toml");
4465 let toml_body = format!(
4466 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
4467 );
4468 fs::write(&src_card, &toml_body).unwrap();
4469
4470 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4471 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4472 let store = FileCardStore::new(primary.path().to_path_buf());
4473 let (imported, _skipped) =
4474 import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
4475 assert_eq!(imported, vec!["card_x".to_string()]);
4476
4477 let sub_card = sub.path().join("fanout_import_pkg").join("card_x.toml");
4478 assert!(sub_card.exists(), "imported card must be mirrored");
4479 });
4480 }
4481
4482 #[test]
4483 fn fanout_mirrors_import_from_dir_samples() {
4484 let primary = tempfile::tempdir().unwrap();
4485 let sub = tempfile::tempdir().unwrap();
4486 let src = tempfile::tempdir().unwrap();
4487
4488 let toml_body = format!(
4489 "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
4490 );
4491 fs::write(src.path().join("card_y.toml"), &toml_body).unwrap();
4492 fs::write(
4493 src.path().join("card_y.samples.jsonl"),
4494 "{\"case\":\"c0\"}\n",
4495 )
4496 .unwrap();
4497
4498 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4499 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
4500 let store = FileCardStore::new(primary.path().to_path_buf());
4501 let (imported, _) =
4502 import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg")
4503 .unwrap();
4504 assert_eq!(imported, vec!["card_y".to_string()]);
4505
4506 let sub_samples = sub
4507 .path()
4508 .join("fanout_import_samples_pkg")
4509 .join("card_y.samples.jsonl");
4510 assert!(sub_samples.exists(), "imported samples must be mirrored");
4511 let text = fs::read_to_string(&sub_samples).unwrap();
4512 assert!(text.contains("c0"));
4513 });
4514 }
4515
4516 #[test]
4517 fn with_store_direct_call_still_publishes() {
4518 let primary = tempfile::tempdir().unwrap();
4519 let mock = MockSubscriber::new("mock://direct");
4520 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4521 let store = FileCardStore::new(primary.path().to_path_buf());
4522 create_with_store(&store, json!({ "pkg": { "name": "direct_call_pkg" } })).unwrap();
4523 assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
4524 });
4525 }
4526
4527 #[test]
4528 fn subscriber_appended_missing_card_warns() {
4529 let primary = tempfile::tempdir().unwrap();
4530 let sub = tempfile::tempdir().unwrap();
4531 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
4532 with_bus_subscribers(vec![fs_sub.clone()], |bus| {
4533 let store = FileCardStore::new(primary.path().to_path_buf());
4534 bus.replace_subscribers_for_test(Vec::new());
4537 let (id, _) =
4538 create_with_store(&store, json!({ "pkg": { "name": "missing_append_pkg" } }))
4539 .unwrap();
4540 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
4542
4543 append_with_store(&store, &id, json!({ "k": 1 })).unwrap();
4546
4547 let snap = bus.stats().snapshot();
4548 let row = snap
4549 .iter()
4550 .find(|r| r.sink == fs_sub.describe())
4551 .expect("subscriber row exists");
4552 let err_total: u64 = row.err.values().sum();
4553 assert!(err_total >= 1, "subscriber append err must be recorded");
4554 assert!(row.last_error.is_some());
4555 });
4556 }
4557
4558 #[test]
4559 fn subscriber_failure_preserves_primary() {
4560 struct FailingSubscriber;
4561 impl CardSubscriber for FailingSubscriber {
4562 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4563 Err("synthetic failure".into())
4564 }
4565 fn describe(&self) -> String {
4566 "mock://failing".into()
4567 }
4568 }
4569 let primary = tempfile::tempdir().unwrap();
4570 with_bus_subscribers(
4571 vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4572 |bus| {
4573 let store = FileCardStore::new(primary.path().to_path_buf());
4574 let (_id, path) =
4576 create_with_store(&store, json!({ "pkg": { "name": "preserve_primary_pkg" } }))
4577 .unwrap();
4578 assert!(path.exists());
4579 let snap = bus.stats().snapshot();
4580 let row = snap
4581 .iter()
4582 .find(|r| r.sink == "mock://failing")
4583 .expect("row exists");
4584 let err_total: u64 = row.err.values().sum();
4585 assert!(err_total >= 1);
4586 },
4587 );
4588 }
4589
4590 #[test]
4593 fn stats_counts_ok() {
4594 let primary = tempfile::tempdir().unwrap();
4595 let mock = MockSubscriber::new("mock://stats-ok");
4596 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
4597 let store = FileCardStore::new(primary.path().to_path_buf());
4598 for i in 0..3 {
4599 create_with_store(
4600 &store,
4601 json!({
4602 "card_id": format!("stats_ok_{i}"),
4603 "pkg": { "name": "stats_ok_pkg" },
4604 }),
4605 )
4606 .unwrap();
4607 }
4608 let snap = bus.stats().snapshot();
4609 let row = snap
4610 .iter()
4611 .find(|r| r.sink == "mock://stats-ok")
4612 .expect("row");
4613 assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
4614 assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);
4615 for k in ["created", "appended", "samples", "aliases"] {
4617 assert!(row.ok.contains_key(k), "ok.{k} must be present");
4618 assert!(row.err.contains_key(k), "err.{k} must be present");
4619 }
4620 assert!(row.last_error.is_none());
4621 });
4622 }
4623
4624 #[test]
4625 fn stats_counts_err_with_last_error() {
4626 struct FailingSubscriber;
4627 impl CardSubscriber for FailingSubscriber {
4628 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4629 Err("synthetic create failure".into())
4630 }
4631 fn describe(&self) -> String {
4632 "mock://stats-err".into()
4633 }
4634 }
4635 let primary = tempfile::tempdir().unwrap();
4636 with_bus_subscribers(
4637 vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>],
4638 |bus| {
4639 let store = FileCardStore::new(primary.path().to_path_buf());
4640 create_with_store(&store, json!({ "pkg": { "name": "stats_err_pkg" } })).unwrap();
4641 let snap = bus.stats().snapshot();
4642 let row = snap
4643 .iter()
4644 .find(|r| r.sink == "mock://stats-err")
4645 .expect("row");
4646 assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);
4647 let le = row.last_error.as_ref().expect("last_error set");
4648 assert!(!le.msg.is_empty(), "last_error.msg must be non-empty");
4649 assert_eq!(le.kind, CardEventKind::Created);
4650 assert!(le.ts_ms > 0, "last_error.ts_ms must be populated");
4651 },
4652 );
4653 }
4654
4655 #[test]
4656 fn stats_per_subscriber_isolated() {
4657 struct FailingSubscriber;
4658 impl CardSubscriber for FailingSubscriber {
4659 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
4660 Err("sub1 fails".into())
4661 }
4662 fn describe(&self) -> String {
4663 "mock://sub1-fail".into()
4664 }
4665 }
4666 let primary = tempfile::tempdir().unwrap();
4667 let mock_ok = MockSubscriber::new("mock://sub2-ok");
4668 let subs: Vec<Arc<dyn CardSubscriber>> = vec![
4669 Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
4670 mock_ok.clone() as Arc<dyn CardSubscriber>,
4671 ];
4672 with_bus_subscribers(subs, |bus| {
4673 let store = FileCardStore::new(primary.path().to_path_buf());
4674 create_with_store(&store, json!({ "pkg": { "name": "isolated_pkg" } })).unwrap();
4675 let snap = bus.stats().snapshot();
4676 let r1 = snap
4677 .iter()
4678 .find(|r| r.sink == "mock://sub1-fail")
4679 .expect("sub1 row");
4680 let r2 = snap
4681 .iter()
4682 .find(|r| r.sink == "mock://sub2-ok")
4683 .expect("sub2 row");
4684 assert_eq!(r1.err.get("created").copied().unwrap_or(0), 1);
4685 assert_eq!(r1.ok.get("created").copied().unwrap_or(0), 0);
4686 assert_eq!(r2.ok.get("created").copied().unwrap_or(0), 1);
4687 assert_eq!(r2.err.get("created").copied().unwrap_or(0), 0);
4688 assert!(r1.last_error.is_some());
4689 assert!(r2.last_error.is_none());
4690 });
4691 }
4692
4693 #[test]
4694 fn subscriber_stats_survive_multiple_calls() {
4695 let primary = tempfile::tempdir().unwrap();
4700 let mock = MockSubscriber::new("mock://stats-survive");
4701 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4702 let store = FileCardStore::new(primary.path().to_path_buf());
4703 for i in 0..3 {
4704 create_with_store(
4705 &store,
4706 json!({
4707 "card_id": format!("survive_{i}"),
4708 "pkg": { "name": "survive_pkg" },
4709 }),
4710 )
4711 .unwrap();
4712 }
4713 let snap = subscriber_stats_snapshot();
4716 let row = snap
4717 .iter()
4718 .find(|r| r.sink == "mock://stats-survive")
4719 .expect("row");
4720 assert_eq!(
4721 row.ok.get("created").copied().unwrap_or(0),
4722 3,
4723 "counters must accumulate across calls"
4724 );
4725 });
4726 }
4727
4728 #[test]
4729 fn stats_snapshot_serializes_with_all_kind_keys() {
4730 let primary = tempfile::tempdir().unwrap();
4732 let mock = MockSubscriber::new("mock://json-shape");
4733 with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
4734 let store = FileCardStore::new(primary.path().to_path_buf());
4735 create_with_store(&store, json!({ "pkg": { "name": "json_shape_pkg" } })).unwrap();
4736 let snap = subscriber_stats_snapshot();
4737 let json = serde_json::to_value(&snap).expect("serializable");
4738 let arr = json.as_array().expect("array");
4739 let row = arr
4740 .iter()
4741 .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
4742 .expect("row present in JSON");
4743 assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
4744 for k in ["created", "appended", "samples", "aliases"] {
4745 assert!(row.pointer(&format!("/ok/{k}")).is_some(), "ok.{k} missing");
4746 assert!(
4747 row.pointer(&format!("/err/{k}")).is_some(),
4748 "err.{k} missing"
4749 );
4750 }
4751 assert!(row.get("last_error").is_some());
4752 });
4753 }
4754
4755 #[test]
4756 fn multi_process_tmp_unique_suffix() {
4757 let pid = process::id();
4764 let sample = format!("whatever.tmp.{pid}.0");
4765 let rest = sample.trim_start_matches("whatever.tmp.");
4767 let (pid_part, seq_part) = rest.split_once('.').expect("dotted form");
4768 assert!(pid_part.chars().all(|c| c.is_ascii_digit()));
4769 assert!(seq_part.chars().all(|c| c.is_ascii_digit()));
4770
4771 let dir = tempfile::tempdir().unwrap();
4774 let dest = dir.path().join("out.txt");
4775 atomic_write(&dest, b"hello").unwrap();
4776 assert_eq!(fs::read_to_string(&dest).unwrap(), "hello");
4777 }
4778
4779 #[cfg(unix)]
4782 #[test]
4783 fn describe_roundtrips_env_spec() {
4784 let dir = tempfile::tempdir().unwrap();
4785 let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4786 let uri = sub.describe();
4787 assert!(uri.starts_with("file:///"), "unix uri must be file:///...");
4788 let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4790 assert_eq!(parsed.describe(), uri);
4791 }
4792
4793 #[cfg(windows)]
4794 #[test]
4795 fn describe_roundtrips_env_spec_windows() {
4796 let dir = tempfile::tempdir().unwrap();
4797 let sub = FileCardSubscriber::new(dir.path().to_path_buf());
4798 let uri = sub.describe();
4799 assert!(
4800 uri.starts_with("file:///"),
4801 "windows uri must be file:///..."
4802 );
4803 let parsed = parse_subscriber_spec(&uri).expect("round-trip parse");
4804 assert_eq!(parsed.describe(), uri);
4805 }
4806
4807 #[test]
4808 fn env_empty_means_no_subscribers() {
4809 let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
4812 let prev = std::env::var("ALC_CARD_SINKS").ok();
4813 unsafe {
4815 std::env::set_var("ALC_CARD_SINKS", "");
4816 }
4817 let subs = load_subscribers_from_env();
4818 assert!(subs.is_empty());
4819 unsafe {
4821 match prev {
4822 Some(v) => std::env::set_var("ALC_CARD_SINKS", v),
4823 None => std::env::remove_var("ALC_CARD_SINKS"),
4824 }
4825 }
4826 }
4827
4828 #[test]
4829 fn env_parse_rejects_bare_path() {
4830 assert!(parse_subscriber_spec("/foo/bar").is_none());
4831 }
4832
4833 #[test]
4834 fn env_parse_rejects_unknown_scheme() {
4835 assert!(parse_subscriber_spec("sqlite:///foo").is_none());
4836 assert!(parse_subscriber_spec("s3://bucket/foo").is_none());
4837 assert!(parse_subscriber_spec("http://example.com/x").is_none());
4838 }
4839
4840 #[test]
4841 fn env_parse_rejects_non_empty_authority() {
4842 assert!(parse_subscriber_spec("file://host/path").is_none());
4843 }
4844
4845 #[test]
4846 fn env_parse_rejects_missing_double_slash() {
4847 assert!(parse_subscriber_spec("file:/foo").is_none());
4848 assert!(parse_subscriber_spec("file:foo").is_none());
4849 }
4850
4851 #[cfg(unix)]
4852 #[test]
4853 fn env_parse_accepts_file_uri() {
4854 let sub = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
4855 assert_eq!(sub.describe(), "file:///tmp/algocline-sinks-unit");
4856 }
4857
4858 #[cfg(windows)]
4859 #[test]
4860 fn env_parse_accepts_file_uri_windows() {
4861 let sub = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
4862 assert!(sub.describe().starts_with("file:///"));
4864 }
4865
4866 #[test]
4867 fn env_parse_splits_by_pipe() {
4868 let subs = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
4869 assert_eq!(subs.len(), 2);
4870 assert_eq!(subs[0].describe(), "file:///tmp/a");
4871 assert_eq!(subs[1].describe(), "file:///tmp/b");
4872 }
4873
4874 #[test]
4875 fn env_parse_treats_colon_as_literal_path() {
4876 #[cfg(unix)]
4878 {
4879 let sub = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
4880 assert_eq!(sub.describe(), "file:///tmp/a:b");
4881 }
4882 #[cfg(windows)]
4883 {
4884 let sub = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
4886 assert!(sub.describe().contains(":"));
4887 }
4888 }
4889
4890 #[test]
4891 fn env_parse_percent_decode_space() {
4892 #[cfg(unix)]
4893 {
4894 let sub = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
4895 assert_eq!(sub.describe(), "file:///tmp/a b");
4896 }
4897 #[cfg(windows)]
4898 {
4899 let sub = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
4900 assert!(sub.describe().contains(' '));
4901 }
4902 }
4903
4904 #[test]
4905 fn env_parse_percent_decode_rejects_invalid_hex() {
4906 assert!(parse_subscriber_spec("file:///tmp/a%ZZb").is_none());
4907 }
4908
4909 #[test]
4910 fn env_parse_percent_decode_rejects_incomplete() {
4911 assert!(parse_subscriber_spec("file:///tmp/a%2").is_none());
4912 assert!(parse_subscriber_spec("file:///tmp/a%").is_none());
4913 }
4914
4915 #[test]
4916 fn env_parse_rejects_non_utf8() {
4917 assert!(parse_subscriber_spec("file:///tmp/%C3%28").is_none());
4926 }
4927
4928 #[test]
4929 fn env_parse_dedups_duplicate_uris() {
4930 let subs = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
4931 assert_eq!(subs.len(), 2);
4932 assert_eq!(subs[0].describe(), "file:///tmp/x");
4933 assert_eq!(subs[1].describe(), "file:///tmp/y");
4934 }
4935
4936 #[test]
4941 fn test_oncelock_init_race_single_winner() {
4942 let barrier = Arc::new(Barrier::new(8));
4945 let mut handles = Vec::new();
4946 for _ in 0..8 {
4947 let b = barrier.clone();
4948 handles.push(std::thread::spawn(move || {
4949 b.wait();
4950 event_bus() as *const CardEventBus as usize
4951 }));
4952 }
4953 let ptrs: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
4954 let first = ptrs[0];
4955 for p in &ptrs {
4956 assert_eq!(*p, first, "singleton identity must hold across threads");
4957 }
4958 }
4959
4960 #[test]
4961 fn test_subscriber_stats_concurrent_update() {
4962 let stats = Arc::new(SubscriberStats::default());
4963 let n_threads = 4;
4964 let per_thread = 250;
4965 let barrier = Arc::new(Barrier::new(n_threads));
4966 let mut handles = Vec::new();
4967 for t in 0..n_threads {
4968 let s = stats.clone();
4969 let b = barrier.clone();
4970 handles.push(std::thread::spawn(move || {
4971 b.wait();
4972 for i in 0..per_thread {
4973 let kind = if (t + i) % 2 == 0 {
4974 CardEventKind::Created
4975 } else {
4976 CardEventKind::Appended
4977 };
4978 s.record_ok("mock://same-subscriber", kind);
4979 }
4980 }));
4981 }
4982 for h in handles {
4983 h.join().unwrap();
4984 }
4985 let snap = stats.snapshot();
4986 let row = snap
4987 .iter()
4988 .find(|r| r.sink == "mock://same-subscriber")
4989 .expect("row");
4990 let expected = (n_threads * per_thread) as u64;
4991 let ok_total: u64 = row.ok.values().sum();
4992 assert_eq!(ok_total, expected, "all increments must be counted");
4993 }
4994
4995 #[test]
4996 fn test_subscriber_stats_poison_recovery() {
4997 let stats = Arc::new(SubscriberStats::default());
4998 stats.record_ok("mock://poison", CardEventKind::Created);
5000
5001 let s_clone = stats.clone();
5003 let _ = std::thread::spawn(move || {
5004 let _g = s_clone.inner.lock().unwrap();
5005 panic!("intentional poison");
5006 })
5007 .join();
5008
5009 let snap = stats.snapshot();
5011 assert!(!snap.is_empty(), "snapshot after poison must still work");
5012 let ok1: u64 = snap[0].ok.values().sum();
5013 assert_eq!(ok1, 1);
5014
5015 stats.record_ok("mock://poison", CardEventKind::Created);
5017 let snap2 = stats.snapshot();
5018 let ok2: u64 = snap2[0].ok.values().sum();
5019 assert_eq!(ok2, 2);
5020 }
5021
5022 #[test]
5023 fn test_atomic_tmp_seq_unique_under_concurrency() {
5024 let dir = tempfile::tempdir().unwrap();
5027 let barrier = Arc::new(Barrier::new(8));
5028 let mut handles = Vec::new();
5029 for i in 0..8 {
5030 let d = dir.path().to_path_buf();
5031 let b = barrier.clone();
5032 handles.push(std::thread::spawn(move || {
5033 b.wait();
5034 let dest = d.join(format!("file_{i}.bin"));
5035 atomic_write(&dest, b"x").unwrap();
5036 dest.file_name().unwrap().to_string_lossy().to_string()
5038 }));
5039 }
5040 let names: HashSet<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5041 assert_eq!(names.len(), 8, "all dest names must be unique");
5042 }
5045
5046 #[test]
5047 fn test_atomic_tmp_seq_wraps_without_panic() {
5048 let seq = AtomicU64::new(u64::MAX - 1);
5051 let a = seq.fetch_add(1, Ordering::Relaxed);
5052 let b = seq.fetch_add(1, Ordering::Relaxed);
5053 let c = seq.fetch_add(1, Ordering::Relaxed);
5054 assert_eq!(a, u64::MAX - 1);
5055 assert_eq!(b, u64::MAX);
5056 assert_eq!(c, 0, "u64 fetch_add must wrap to 0");
5057 }
5058
5059 #[test]
5060 fn test_rename_atomicity_same_volume() {
5061 let dir = tempfile::tempdir().unwrap();
5065 let dest = dir.path().join("shared.bin");
5066 let barrier = Arc::new(Barrier::new(2));
5067 let mut handles = Vec::new();
5068 for i in 0..2u8 {
5069 let d = dest.clone();
5070 let b = barrier.clone();
5071 handles.push(std::thread::spawn(move || {
5072 b.wait();
5073 let payload = vec![i; 64];
5074 atomic_write(&d, &payload)
5075 }));
5076 }
5077 let mut saw_ok = 0;
5078 for h in handles {
5079 #[cfg(unix)]
5080 {
5081 h.join().unwrap().unwrap();
5083 saw_ok += 1;
5084 }
5085 #[cfg(not(unix))]
5086 {
5087 if h.join().unwrap().is_ok() {
5089 saw_ok += 1;
5090 }
5091 }
5092 }
5093 assert!(saw_ok >= 1, "at least one rename must succeed");
5094 assert!(dest.exists(), "dest must exist after concurrent rename");
5095 let bytes = fs::read(&dest).unwrap();
5096 assert!(bytes == vec![0u8; 64] || bytes == vec![1u8; 64]);
5097 }
5098
5099 #[test]
5100 fn test_fanout_concurrent_create_with_store() {
5101 let primary = tempfile::tempdir().unwrap();
5102 let sub = tempfile::tempdir().unwrap();
5103 let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
5104 with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5105 let primary_path = primary.path().to_path_buf();
5106 let barrier = Arc::new(Barrier::new(4));
5107 let mut handles = Vec::new();
5108 for i in 0..4 {
5109 let pp = primary_path.clone();
5110 let b = barrier.clone();
5111 handles.push(std::thread::spawn(move || {
5112 INSIDE_BUS_TEST.with(|flag| flag.set(true));
5118 b.wait();
5119 let store = FileCardStore::new(pp);
5120 create_with_store(
5121 &store,
5122 json!({
5123 "card_id": format!("concur_card_{i}"),
5124 "pkg": { "name": "concur_pkg" },
5125 }),
5126 )
5127 .unwrap()
5128 .0
5129 }));
5130 }
5131 let ids: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
5132 assert_eq!(ids.len(), 4);
5133 for id in &ids {
5134 let p = sub.path().join("concur_pkg").join(format!("{id}.toml"));
5135 assert!(p.exists(), "subscriber must have card {id}");
5136 }
5137 let snap = bus.stats().snapshot();
5138 let row = snap
5139 .iter()
5140 .find(|r| r.sink == fs_sub.describe())
5141 .expect("row");
5142 let ok_total: u64 = row.ok.values().sum();
5143 assert_eq!(
5144 ok_total, 4,
5145 "subscriber must have recorded 4 successful deliveries"
5146 );
5147 });
5148 }
5149
5150 fn backfill_primary_with_cards(
5156 pkg: &str,
5157 count: usize,
5158 ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5159 let primary = tempfile::tempdir().unwrap();
5160 let store = FileCardStore::new(primary.path().to_path_buf());
5161 let mut ids = Vec::new();
5162 for i in 0..count {
5163 let (id, _) = create_with_store(
5164 &store,
5165 json!({
5166 "card_id": format!("{pkg}_{i}"),
5167 "pkg": { "name": pkg },
5168 }),
5169 )
5170 .unwrap();
5171 ids.push(id);
5172 }
5173 (primary, store, ids)
5174 }
5175
5176 #[test]
5177 fn backfill_pushes_missing_cards() {
5178 let sub_dir = tempfile::tempdir().unwrap();
5179 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5180 let uri = fs_sub.describe();
5181 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5182 let bus = event_bus();
5184 bus.replace_subscribers_for_test(Vec::new());
5185 let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
5186 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5187
5188 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5189 assert_eq!(report.pushed.len(), 2);
5190 assert_eq!(report.skipped.len(), 0);
5191 assert!(report.failed.is_empty());
5192 for id in &ids {
5193 let p = sub_dir
5194 .path()
5195 .join("backfill_push_pkg")
5196 .join(format!("{id}.toml"));
5197 assert!(p.exists(), "card {id} must exist on subscriber");
5198 }
5199 });
5200 }
5201
5202 #[test]
5203 fn backfill_skips_existing_on_subscriber() {
5204 let sub_dir = tempfile::tempdir().unwrap();
5205 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5206 let uri = fs_sub.describe();
5207 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5208 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);
5210 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5211 assert_eq!(report.pushed.len(), 0);
5212 assert_eq!(report.skipped.len(), 3);
5213 assert!(report.failed.is_empty());
5214 });
5215 }
5216
5217 #[test]
5218 fn backfill_dry_run_no_writes() {
5219 let sub_dir = tempfile::tempdir().unwrap();
5220 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5221 let uri = fs_sub.describe();
5222 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5223 let bus = event_bus();
5224 bus.replace_subscribers_for_test(Vec::new());
5225 let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
5226 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5227
5228 let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
5229 assert_eq!(
5230 report.pushed.len(),
5231 2,
5232 "pushed must list ids even in dry run"
5233 );
5234 for id in &ids {
5235 let p = sub_dir
5236 .path()
5237 .join("backfill_dry_pkg")
5238 .join(format!("{id}.toml"));
5239 assert!(!p.exists(), "dry run must NOT write card {id}");
5240 }
5241 let snap = bus.stats().snapshot();
5243 if let Some(row) = snap.iter().find(|r| r.sink == uri) {
5244 let total: u64 = row.ok.values().sum::<u64>() + row.err.values().sum::<u64>();
5245 assert_eq!(total, 0, "dry run must not touch stats");
5246 }
5247 });
5248 }
5249
5250 #[test]
5251 fn backfill_drifted_card_skipped_not_overwritten() {
5252 let sub_dir = tempfile::tempdir().unwrap();
5253 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5254 let uri = fs_sub.describe();
5255 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5256 let bus = event_bus();
5257 bus.replace_subscribers_for_test(Vec::new());
5258 let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
5259 let id = &ids[0];
5260
5261 let sub_card_dir = sub_dir.path().join("backfill_drift_pkg");
5263 fs::create_dir_all(&sub_card_dir).unwrap();
5264 let sub_card = sub_card_dir.join(format!("{id}.toml"));
5265 fs::write(&sub_card, "drifted=true\n").unwrap();
5266
5267 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5268 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5269 assert_eq!(report.skipped, vec![id.clone()]);
5270 assert!(report.pushed.is_empty());
5271 let after = fs::read_to_string(&sub_card).unwrap();
5272 assert_eq!(after, "drifted=true\n", "drifted copy must be preserved");
5273 });
5274 }
5275
5276 #[test]
5277 fn backfill_includes_samples() {
5278 let sub_dir = tempfile::tempdir().unwrap();
5279 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5280 let uri = fs_sub.describe();
5281 with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
5282 let bus = event_bus();
5283 bus.replace_subscribers_for_test(Vec::new());
5284 let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
5285 let id = &ids[0];
5286 write_samples_with_store(&store, id, vec![json!({ "case": "c0" })]).unwrap();
5287 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5288
5289 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5290 assert_eq!(report.pushed, vec![id.clone()]);
5291 assert_eq!(report.pushed_samples, vec![id.clone()]);
5292 let sub_samples = sub_dir
5293 .path()
5294 .join("backfill_samples_pkg")
5295 .join(format!("{id}.samples.jsonl"));
5296 assert!(sub_samples.exists());
5297 assert!(fs::read_to_string(&sub_samples).unwrap().contains("c0"));
5298 });
5299 }
5300
5301 #[test]
5302 fn backfill_unknown_sink_err() {
5303 with_bus_subscribers(Vec::new(), |_bus| {
5304 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);
5305 let err = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false)
5306 .unwrap_err();
5307 assert!(
5308 err.starts_with("unknown sink"),
5309 "must reject unregistered sink; got: {err}"
5310 );
5311 });
5312 }
5313
5314 #[test]
5315 fn backfill_bypasses_bus_fanout() {
5316 let sub_a_dir = tempfile::tempdir().unwrap();
5319 let sub_b_dir = tempfile::tempdir().unwrap();
5320 let fa = Arc::new(FileCardSubscriber::new(sub_a_dir.path().to_path_buf()));
5321 let fb = Arc::new(FileCardSubscriber::new(sub_b_dir.path().to_path_buf()));
5322 let uri_b = fb.describe();
5323 with_bus_subscribers(
5324 vec![
5325 fa.clone() as Arc<dyn CardSubscriber>,
5326 fb.clone() as Arc<dyn CardSubscriber>,
5327 ],
5328 |bus| {
5329 bus.replace_subscribers_for_test(vec![fa.clone()]);
5331 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
5332 let before = bus
5334 .stats()
5335 .snapshot()
5336 .into_iter()
5337 .find(|r| r.sink == fa.describe())
5338 .map(|r| r.ok.get("created").copied().unwrap_or(0))
5339 .unwrap_or(0);
5340 bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
5342 card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
5343 let after = bus
5344 .stats()
5345 .snapshot()
5346 .into_iter()
5347 .find(|r| r.sink == fa.describe())
5348 .map(|r| r.ok.get("created").copied().unwrap_or(0))
5349 .unwrap_or(0);
5350 assert_eq!(
5351 before, after,
5352 "backfill target B must not cause fan-out to subscriber A"
5353 );
5354 },
5355 );
5356 }
5357
5358 #[test]
5359 fn backfill_updates_subscriber_stats() {
5360 let sub_dir = tempfile::tempdir().unwrap();
5361 let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
5362 let uri = fs_sub.describe();
5363 with_bus_subscribers(vec![fs_sub.clone()], |bus| {
5364 bus.replace_subscribers_for_test(Vec::new());
5365 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
5366 bus.replace_subscribers_for_test(vec![fs_sub.clone()]);
5367
5368 card_sink_backfill_with_store(&store, &uri, false).unwrap();
5369 let snap = bus.stats().snapshot();
5370 let row = snap.iter().find(|r| r.sink == uri).expect("row");
5371 assert_eq!(
5372 row.ok.get("created").copied().unwrap_or(0),
5373 2,
5374 "backfill must increment ok[created] on the target sink"
5375 );
5376 });
5377 }
5378
5379 #[test]
5380 fn backfill_failure_records_err_stat() {
5381 struct FailingSub {
5383 uri: String,
5384 }
5385 impl CardSubscriber for FailingSub {
5386 fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
5387 Err("synthetic backfill failure".into())
5388 }
5389 fn has_card(&self, _card_id: &str) -> Result<bool, String> {
5390 Ok(false)
5391 }
5392 fn describe(&self) -> String {
5393 self.uri.clone()
5394 }
5395 }
5396 let uri = "mock://backfill-fail".to_string();
5397 let failing: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5398 with_bus_subscribers(vec![failing], |bus| {
5399 bus.replace_subscribers_for_test(Vec::new());
5400 let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);
5401 let reinstall: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
5403 bus.replace_subscribers_for_test(vec![reinstall]);
5404
5405 let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
5406 assert_eq!(
5407 report.failed.len(),
5408 1,
5409 "failed must record the synthetic err"
5410 );
5411 assert!(report.pushed.is_empty());
5412 let snap = bus.stats().snapshot();
5413 let row = snap.iter().find(|r| r.sink == uri).expect("row");
5414 assert!(
5415 row.err.get("created").copied().unwrap_or(0) >= 1,
5416 "failing publish must increment err[created]"
5417 );
5418 assert!(row.last_error.is_some());
5419 });
5420 }
5421
5422 #[test]
5423 fn test_oncelock_set_after_init_returns_err() {
5424 let _ = event_bus();
5426 let result = install_event_bus_for_test(CardEventBus::new(Vec::new()));
5427 assert!(
5428 result.is_err(),
5429 "install after init must return Err per OnceLock contract"
5430 );
5431 assert_eq!(result.unwrap_err(), "bus already initialized");
5432 }
5433}