1use std::collections::{HashMap, HashSet};
87use std::fs;
88use std::path::{Path, PathBuf};
89use std::process;
90use std::sync::atomic::{AtomicU64, Ordering};
91use std::sync::{Arc, Mutex, OnceLock};
92
93use serde::{Serialize, Serializer};
94use serde_json::{json, Value as Json};
95
96pub const SCHEMA_VERSION: &str = "card/v0";
97
/// Storage backend used by the card API for cards, aliases and per-card samples.
///
/// Every method reports failure as a human-readable `String`. Implementors must be
/// `Send + Sync`.
pub trait CardStore: Send + Sync {
    /// Stores `toml_text` as a new card `card_id` belonging to package `pkg` and returns the
    /// locator of the stored card.
    ///
    /// NOTE(review): presumably refuses to replace an existing card — confirm against the
    /// implementation.
    fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;

    /// Replaces the stored text of the existing card `card_id` with `toml_text` and returns
    /// its locator.
    fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String>;

    /// Looks up the locator of `card_id`; callers in this file treat `Ok(None)` as
    /// "card not found".
    fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String>;

    /// Reads the stored text of `card_id`; callers in this file treat `Ok(None)` as
    /// "card not found".
    fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String>;

    /// Lists stored cards as `(pkg, locator)` pairs.
    ///
    /// NOTE(review): `pkg_filter` presumably restricts the listing to one package — confirm
    /// against the implementation.
    fn list_card_locators(
        &self,
        pkg_filter: Option<&str>,
    ) -> Result<Vec<(String, PathBuf)>, String>;

    /// Reads the text behind a locator previously returned by [`CardStore::list_card_locators`].
    fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String>;

    /// Reads the full alias list.
    fn read_aliases(&self) -> Result<Vec<Alias>, String>;
    /// Persists `aliases` as the full alias list (callers pass the complete, updated list).
    fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String>;

    /// Reports whether samples are stored for `card_id`.
    fn samples_exists(&self, card_id: &str) -> Result<bool, String>;

    /// Stores `jsonl_text` as the samples of `card_id` and returns where it was written.
    ///
    /// NOTE(review): the parameter name suggests JSON Lines content — confirm.
    fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String>;

    /// Reads the samples text of `card_id`, if any.
    fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String>;

    /// Imports cards found in `source_dir` into package `pkg`.
    ///
    /// NOTE(review): the meaning of the two returned id lists (presumably imported vs.
    /// skipped) is not visible here — confirm against the implementation.
    fn import_from_dir(
        &self,
        source_dir: &Path,
        pkg: &str,
    ) -> Result<(Vec<String>, Vec<String>), String>;
}
195
196fn default_store() -> Result<FileCardStore, String> {
198 FileCardStore::from_home()
199}
200
201fn cards_dir() -> Result<PathBuf, String> {
203 let home = dirs::home_dir().ok_or("Cannot determine home directory")?;
204 let dir = home.join(".algocline").join("cards");
205 if !dir.exists() {
206 fs::create_dir_all(&dir).map_err(|e| format!("Failed to create cards dir: {e}"))?;
207 }
208 Ok(dir)
209}
210
/// Checks that `name` is safe to use as a single path component.
///
/// A name is rejected when it is empty, contains `/`, `\` or a NUL character, or contains
/// the sequence `..`. `kind` only labels the error message.
fn validate_name(name: &str, kind: &str) -> Result<(), String> {
    let has_forbidden_char = name.chars().any(|c| matches!(c, '/' | '\\' | '\0'));
    let acceptable = !name.is_empty() && !has_forbidden_char && !name.contains("..");
    if acceptable {
        Ok(())
    } else {
        Err(format!("Invalid {kind} name: '{name}'"))
    }
}
222
/// Hashes the bytes of `s` with the djb2 recurrence (`h = h * 33 + byte`, seed 5381,
/// wrapping on overflow) and renders the 64-bit result as 16 lowercase hex digits.
fn djb2_hex(s: &str) -> String {
    let h = s
        .bytes()
        .fold(5381u64, |acc, byte| acc.wrapping_mul(33).wrapping_add(u64::from(byte)));
    format!("{h:016x}")
}
231
232fn hash6(s: &str) -> String {
239 let hex = djb2_hex(s);
240 let start = hex.len().saturating_sub(6);
241 hex[start..].to_string()
242}
243
244fn stable_json(v: &Json) -> String {
246 let mut buf = String::new();
247 stable_json_into(v, &mut buf);
248 buf
249}
250fn stable_json_into(v: &Json, buf: &mut String) {
251 match v {
252 Json::Null => buf.push_str("null"),
253 Json::Bool(b) => buf.push_str(if *b { "true" } else { "false" }),
254 Json::Number(n) => buf.push_str(&n.to_string()),
255 Json::String(s) => {
256 buf.push('"');
257 buf.push_str(s);
258 buf.push('"');
259 }
260 Json::Array(a) => {
261 buf.push('[');
262 for (i, item) in a.iter().enumerate() {
263 if i > 0 {
264 buf.push(',');
265 }
266 stable_json_into(item, buf);
267 }
268 buf.push(']');
269 }
270 Json::Object(m) => {
271 let mut keys: Vec<&String> = m.keys().collect();
272 keys.sort();
273 buf.push('{');
274 for (i, k) in keys.iter().enumerate() {
275 if i > 0 {
276 buf.push(',');
277 }
278 buf.push('"');
279 buf.push_str(k);
280 buf.push_str("\":");
281 stable_json_into(&m[*k], buf);
282 }
283 buf.push('}');
284 }
285 }
286}
287
/// Derives a short, purely alphanumeric tag from a model id.
///
/// A leading `claude-` (or, failing that, `gpt-`) is removed, every character that is not
/// ASCII alphanumeric is dropped, and `"model"` is returned when nothing remains.
fn short_model(id: &str) -> String {
    const FALLBACK: &str = "model";

    let without_vendor = ["claude-", "gpt-"]
        .iter()
        .find_map(|&prefix| id.strip_prefix(prefix))
        .unwrap_or(id);
    let compact: String = without_vendor
        .chars()
        .filter(char::is_ascii_alphanumeric)
        .collect();

    if compact.is_empty() {
        FALLBACK.to_string()
    } else {
        compact
    }
}
310
311fn now_rfc3339() -> String {
313 let secs = std::time::SystemTime::now()
314 .duration_since(std::time::UNIX_EPOCH)
315 .map(|d| d.as_secs())
316 .unwrap_or(0) as i64;
317 let days = secs.div_euclid(86400);
318 let tod = secs.rem_euclid(86400);
319 let (y, mo, d) = civil_from_days(days);
320 let hh = tod / 3600;
321 let mm = (tod % 3600) / 60;
322 let ss = tod % 60;
323 format!("{y:04}-{mo:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
324}
325
326fn now_compact() -> String {
332 let secs = std::time::SystemTime::now()
333 .duration_since(std::time::UNIX_EPOCH)
334 .map(|d| d.as_secs())
335 .unwrap_or(0) as i64;
336 let days = secs.div_euclid(86400);
337 let tod = secs.rem_euclid(86400);
338 let (y, mo, d) = civil_from_days(days);
339 let hh = tod / 3600;
340 let mm = (tod % 3600) / 60;
341 let ss = tod % 60;
342 format!("{y:04}{mo:02}{d:02}T{hh:02}{mm:02}{ss:02}")
343}
344
/// Converts a day count relative to 1970-01-01 (negative for earlier dates) into a
/// `(year, month, day)` civil date.
///
/// Works in 400-year eras of 146097 days with the year internally starting on 1 March, so
/// the leap day falls at the end of the internal year.
fn civil_from_days(z: i64) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146097;

    // Shift the origin from 1970-01-01 to 0000-03-01.
    let shifted = z + 719468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0) through February (11).
    let march_based_month = (5 * day_of_year + 2) / 153;

    let day = (day_of_year - (153 * march_based_month + 2) / 5 + 1) as u32;
    let month = (if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    }) as u32;
    // January and February belong to the following civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    (year as i32, month, day)
}
359
360fn json_to_toml(v: Json) -> Result<toml::Value, String> {
363 Ok(match v {
364 Json::Null => return Err("TOML does not support null values".into()),
365 Json::Bool(b) => toml::Value::Boolean(b),
366 Json::Number(n) => {
367 if let Some(i) = n.as_i64() {
368 toml::Value::Integer(i)
369 } else if let Some(f) = n.as_f64() {
370 toml::Value::Float(f)
371 } else {
372 return Err(format!("Unsupported number: {n}"));
373 }
374 }
375 Json::String(s) => toml::Value::String(s),
376 Json::Array(a) => {
377 let mut out = Vec::with_capacity(a.len());
378 for item in a {
379 if !item.is_null() {
380 out.push(json_to_toml(item)?);
381 }
382 }
383 toml::Value::Array(out)
384 }
385 Json::Object(m) => {
386 let mut table = toml::map::Map::new();
387 for (k, val) in m {
388 if val.is_null() {
389 continue;
390 }
391 table.insert(k, json_to_toml(val)?);
392 }
393 toml::Value::Table(table)
394 }
395 })
396}
397
398fn toml_to_json(v: toml::Value) -> Json {
400 match v {
401 toml::Value::String(s) => Json::String(s),
402 toml::Value::Integer(i) => json!(i),
403 toml::Value::Float(f) => json!(f),
404 toml::Value::Boolean(b) => Json::Bool(b),
405 toml::Value::Datetime(dt) => Json::String(dt.to_string()),
406 toml::Value::Array(a) => Json::Array(a.into_iter().map(toml_to_json).collect()),
407 toml::Value::Table(t) => {
408 let mut m = serde_json::Map::new();
409 for (k, v) in t {
410 m.insert(k, toml_to_json(v));
411 }
412 Json::Object(m)
413 }
414 }
415}
416
417fn require_pkg_name(input: &Json) -> Result<String, String> {
419 let name = input
420 .get("pkg")
421 .and_then(|p| p.get("name"))
422 .and_then(|n| n.as_str())
423 .ok_or_else(|| "alc.card.create: pkg.name is required".to_string())?
424 .to_string();
425 validate_name(&name, "pkg")?;
426 Ok(name)
427}
428
429pub fn create(input: Json) -> Result<(String, PathBuf), String> {
431 create_with_store(&default_store()?, input)
432}
433
434pub fn create_with_store(
436 store: &dyn CardStore,
437 mut input: Json,
438) -> Result<(String, PathBuf), String> {
439 if !input.is_object() {
440 return Err("alc.card.create: input must be a table".into());
441 }
442 let pkg_name = require_pkg_name(&input)?;
443 let obj = input.as_object_mut().unwrap();
444
445 obj.entry("schema_version".to_string())
447 .or_insert_with(|| json!(SCHEMA_VERSION));
448 obj.entry("created_at".to_string())
449 .or_insert_with(|| json!(now_rfc3339()));
450 obj.entry("created_by".to_string())
451 .or_insert_with(|| json!(format!("alc@{}", env!("CARGO_PKG_VERSION"))));
452
453 if let Some(params) = obj.get("params").cloned() {
455 if params.is_object() {
456 let fp = djb2_hex(&stable_json(¶ms));
457 obj.insert("param_fingerprint".to_string(), json!(fp));
458 }
459 }
460
461 let card_id = match obj.get("card_id").and_then(|v| v.as_str()) {
463 Some(id) if !id.is_empty() => id.to_string(),
464 _ => {
465 let model_id = obj
466 .get("model")
467 .and_then(|m| m.get("id"))
468 .and_then(|v| v.as_str())
469 .unwrap_or("");
470 let model_short = short_model(model_id);
471 let ts = now_compact();
472 let fp_seed = stable_json(&Json::Object(obj.clone()));
473 let h = hash6(&fp_seed);
474 format!("{pkg_name}_{model_short}_{ts}_{h}")
475 }
476 };
477 validate_name(&card_id, "card_id")?;
478 obj.insert("card_id".to_string(), json!(card_id.clone()));
479
480 let toml_val = json_to_toml(input)?;
481 let text = toml::to_string_pretty(&toml_val)
482 .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
483 let path = store.write_new_card(&pkg_name, &card_id, &text)?;
484
485 publish(CardEvent::Created {
486 pkg: pkg_name.clone(),
487 card_id: card_id.clone(),
488 toml_text: text,
489 });
490
491 Ok((card_id, path))
492}
493
494pub fn get(card_id: &str) -> Result<Option<Json>, String> {
496 get_with_store(&default_store()?, card_id)
497}
498
499pub fn get_with_store(store: &dyn CardStore, card_id: &str) -> Result<Option<Json>, String> {
501 let text = match store.read_card_text(card_id)? {
502 Some(t) => t,
503 None => return Ok(None),
504 };
505 let val: toml::Value =
506 toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
507 Ok(Some(toml_to_json(val)))
508}
509
510#[derive(Debug, Clone)]
512pub struct Summary {
513 pub card_id: String,
514 pub pkg: String,
515 pub created_at: Option<String>,
516 pub model: Option<String>,
517 pub scenario: Option<String>,
518 pub pass_rate: Option<f64>,
519}
520
521impl Summary {
522 fn to_json(&self) -> Json {
523 let mut m = serde_json::Map::new();
524 m.insert("card_id".into(), json!(self.card_id));
525 m.insert("pkg".into(), json!(self.pkg));
526 if let Some(v) = &self.created_at {
527 m.insert("created_at".into(), json!(v));
528 }
529 if let Some(v) = &self.model {
530 m.insert("model".into(), json!(v));
531 }
532 if let Some(v) = &self.scenario {
533 m.insert("scenario".into(), json!(v));
534 }
535 if let Some(v) = self.pass_rate {
536 m.insert("pass_rate".into(), json!(v));
537 }
538 Json::Object(m)
539 }
540}
541
542fn summarize(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<Summary> {
543 let text = store.read_locator_text(locator).ok().flatten()?;
544 let val: toml::Value = toml::from_str(&text).ok()?;
545 let card_id = val
546 .get("card_id")
547 .and_then(|v| v.as_str())
548 .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
549 .to_string();
550 let created_at = val
551 .get("created_at")
552 .and_then(|v| v.as_str())
553 .map(String::from);
554 let model = val
555 .get("model")
556 .and_then(|m| m.get("id"))
557 .and_then(|v| v.as_str())
558 .map(String::from);
559 let scenario = val
560 .get("scenario")
561 .and_then(|s| s.get("name"))
562 .and_then(|v| v.as_str())
563 .map(String::from);
564 let pass_rate = val
565 .get("stats")
566 .and_then(|s| s.get("pass_rate"))
567 .and_then(|v| v.as_float());
568 Some(Summary {
569 card_id,
570 pkg: pkg.to_string(),
571 created_at,
572 model,
573 scenario,
574 pass_rate,
575 })
576}
577
578pub fn list(pkg_filter: Option<&str>) -> Result<Vec<Summary>, String> {
580 list_with_store(&default_store()?, pkg_filter)
581}
582
583pub fn list_with_store(
585 store: &dyn CardStore,
586 pkg_filter: Option<&str>,
587) -> Result<Vec<Summary>, String> {
588 let locators = store.list_card_locators(pkg_filter)?;
589 let mut out = Vec::with_capacity(locators.len());
590 for (pkg, loc) in &locators {
591 if let Some(s) = summarize(store, loc, pkg) {
592 out.push(s);
593 }
594 }
595
596 out.sort_by(|a, b| {
600 b.created_at
601 .cmp(&a.created_at)
602 .then_with(|| b.card_id.cmp(&a.card_id))
603 });
604 Ok(out)
605}
606
607pub fn summaries_to_json(rows: &[Summary]) -> Json {
608 Json::Array(rows.iter().map(|s| s.to_json()).collect())
609}
610
611pub fn append(card_id: &str, fields: Json) -> Result<Json, String> {
624 append_with_store(&default_store()?, card_id, fields)
625}
626
627pub fn append_with_store(
629 store: &dyn CardStore,
630 card_id: &str,
631 fields: Json,
632) -> Result<Json, String> {
633 let text = store
634 .read_card_text(card_id)?
635 .ok_or_else(|| format!("alc.card.append: card '{card_id}' not found"))?;
636 let fields_obj = match fields {
637 Json::Object(m) => m,
638 _ => return Err("alc.card.append: fields must be a table".into()),
639 };
640
641 let existing: toml::Value =
642 toml::from_str(&text).map_err(|e| format!("Failed to parse card '{card_id}': {e}"))?;
643 let mut existing_json = toml_to_json(existing);
644 let existing_obj = existing_json
645 .as_object_mut()
646 .ok_or_else(|| format!("Card '{card_id}' is not a table"))?;
647
648 for (k, v) in fields_obj {
649 if existing_obj.contains_key(&k) {
650 return Err(format!(
651 "alc.card.append: key '{k}' already set on card '{card_id}' (immutable)"
652 ));
653 }
654 if !v.is_null() {
655 existing_obj.insert(k, v);
656 }
657 }
658
659 let toml_val = json_to_toml(existing_json.clone())?;
660 let text = toml::to_string_pretty(&toml_val)
661 .map_err(|e| format!("Failed to serialize card TOML: {e}"))?;
662 store.overwrite_card(card_id, &text)?;
663
664 publish(CardEvent::Appended {
665 card_id: card_id.to_string(),
666 toml_text: text,
667 });
668
669 Ok(existing_json)
670}
671
672#[derive(Debug, Clone)]
673pub struct Alias {
674 pub name: String,
675 pub card_id: String,
676 pub pkg: Option<String>,
677 pub set_at: String,
678 pub note: Option<String>,
679}
680
681impl Alias {
682 fn to_json(&self) -> Json {
683 let mut m = serde_json::Map::new();
684 m.insert("name".into(), json!(self.name));
685 m.insert("card_id".into(), json!(self.card_id));
686 if let Some(p) = &self.pkg {
687 m.insert("pkg".into(), json!(p));
688 }
689 m.insert("set_at".into(), json!(self.set_at));
690 if let Some(n) = &self.note {
691 m.insert("note".into(), json!(n));
692 }
693 Json::Object(m)
694 }
695}
696
697pub fn alias_set(
703 name: &str,
704 card_id: &str,
705 pkg: Option<&str>,
706 note: Option<&str>,
707) -> Result<Alias, String> {
708 alias_set_with_store(&default_store()?, name, card_id, pkg, note)
709}
710
711pub fn alias_set_with_store(
713 store: &dyn CardStore,
714 name: &str,
715 card_id: &str,
716 pkg: Option<&str>,
717 note: Option<&str>,
718) -> Result<Alias, String> {
719 validate_name(name, "alias")?;
720 if store.find_card_locator(card_id)?.is_none() {
721 return Err(format!("alc.card.alias_set: card '{card_id}' not found"));
722 }
723 let mut aliases = store.read_aliases()?;
724 aliases.retain(|a| a.name != name);
725 let entry = Alias {
726 name: name.to_string(),
727 card_id: card_id.to_string(),
728 pkg: pkg.map(String::from),
729 set_at: now_rfc3339(),
730 note: note.map(String::from),
731 };
732 aliases.push(entry.clone());
733 store.write_aliases(&aliases)?;
734
735 match serialize_aliases_toml(&aliases) {
741 Ok(text) => publish(CardEvent::AliasesWritten { toml_text: text }),
742 Err(e) => tracing::warn!(error = %e, "alias_set: failed to serialize aliases for publish"),
743 }
744
745 Ok(entry)
746}
747
748fn serialize_aliases_toml(aliases: &[Alias]) -> Result<String, String> {
752 let mut arr = Vec::with_capacity(aliases.len());
753 for a in aliases {
754 let mut t = toml::map::Map::new();
755 t.insert("name".into(), toml::Value::String(a.name.clone()));
756 t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
757 if let Some(p) = &a.pkg {
758 t.insert("pkg".into(), toml::Value::String(p.clone()));
759 }
760 t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
761 if let Some(n) = &a.note {
762 t.insert("note".into(), toml::Value::String(n.clone()));
763 }
764 arr.push(toml::Value::Table(t));
765 }
766 let mut root = toml::map::Map::new();
767 root.insert("alias".into(), toml::Value::Array(arr));
768 toml::to_string_pretty(&toml::Value::Table(root))
769 .map_err(|e| format!("Failed to serialize aliases: {e}"))
770}
771
772pub fn get_by_alias(name: &str) -> Result<Option<Json>, String> {
778 get_by_alias_with_store(&default_store()?, name)
779}
780
781pub fn get_by_alias_with_store(store: &dyn CardStore, name: &str) -> Result<Option<Json>, String> {
783 validate_name(name, "alias")?;
784 let aliases = store.read_aliases()?;
785 let Some(alias) = aliases.into_iter().find(|a| a.name == name) else {
786 return Ok(None);
787 };
788 match get_with_store(store, &alias.card_id)? {
789 Some(card) => Ok(Some(card)),
790 None => Err(format!(
791 "alc.card.get_by_alias: alias '{name}' points at missing card '{}'",
792 alias.card_id
793 )),
794 }
795}
796
797pub fn alias_list(pkg_filter: Option<&str>) -> Result<Vec<Alias>, String> {
799 alias_list_with_store(&default_store()?, pkg_filter)
800}
801
802pub fn alias_list_with_store(
804 store: &dyn CardStore,
805 pkg_filter: Option<&str>,
806) -> Result<Vec<Alias>, String> {
807 let mut aliases = store.read_aliases()?;
808 if let Some(p) = pkg_filter {
809 aliases.retain(|a| a.pkg.as_deref() == Some(p));
810 }
811 Ok(aliases)
812}
813
814pub fn aliases_to_json(rows: &[Alias]) -> Json {
815 Json::Array(rows.iter().map(|a| a.to_json()).collect())
816}
817
/// Comparison operators available in a where clause.
#[derive(Debug, Clone, PartialEq)]
pub enum CmpOp {
    Eq,
    Ne,
    Lt,
    Lte,
    Gt,
    Gte,
    In,
    Nin,
    Exists,
    Contains,
    StartsWith,
}

impl CmpOp {
    /// Maps an operator key as written in a where clause (`"eq"`, `"starts_with"`, ...) to its
    /// operator. Matching is exact and case-sensitive; unknown keys give `None`.
    fn from_key(k: &str) -> Option<Self> {
        const OPERATORS: [(&str, CmpOp); 11] = [
            ("eq", CmpOp::Eq),
            ("ne", CmpOp::Ne),
            ("lt", CmpOp::Lt),
            ("lte", CmpOp::Lte),
            ("gt", CmpOp::Gt),
            ("gte", CmpOp::Gte),
            ("in", CmpOp::In),
            ("nin", CmpOp::Nin),
            ("exists", CmpOp::Exists),
            ("contains", CmpOp::Contains),
            ("starts_with", CmpOp::StartsWith),
        ];
        OPERATORS
            .iter()
            .find(|(name, _)| *name == k)
            .map(|(_, op)| op.clone())
    }
}
886
/// A single field test: the card value found by following `path` is compared against
/// `value` using `op`.
#[derive(Debug, Clone)]
pub struct Comparison {
    /// Object keys to follow from the card root to the field under test.
    pub path: Vec<String>,
    /// Operator to apply.
    pub op: CmpOp,
    /// Operand taken from the where clause.
    pub value: Json,
}
895
/// Parsed where clause: a boolean tree over field comparisons.
#[derive(Debug, Clone)]
pub enum Predicate {
    /// True when every sub-predicate is true (an empty list is true).
    And(Vec<Predicate>),
    /// True when at least one sub-predicate is true (an empty list is false).
    Or(Vec<Predicate>),
    /// Negation of the inner predicate.
    Not(Box<Predicate>),
    /// Leaf comparison on a single field.
    Cmp(Comparison),
}
904
905fn is_operator_object(obj: &serde_json::Map<String, Json>) -> bool {
908 if obj.is_empty() {
909 return false;
910 }
911 obj.keys().all(|k| CmpOp::from_key(k).is_some())
912}
913
914pub fn parse_where(value: &Json) -> Result<Predicate, String> {
919 parse_predicate(value, &[])
920}
921
922fn parse_predicate(value: &Json, prefix: &[String]) -> Result<Predicate, String> {
923 let obj = value
924 .as_object()
925 .ok_or_else(|| "where clause must be a table".to_string())?;
926
927 let mut clauses: Vec<Predicate> = Vec::new();
928
929 for (key, val) in obj {
930 match key.as_str() {
931 "_and" => {
932 let arr = val
933 .as_array()
934 .ok_or_else(|| "_and must be an array of sub-predicates".to_string())?;
935 let mut subs = Vec::with_capacity(arr.len());
936 for sub in arr {
937 subs.push(parse_predicate(sub, prefix)?);
938 }
939 clauses.push(Predicate::And(subs));
940 }
941 "_or" => {
942 let arr = val
943 .as_array()
944 .ok_or_else(|| "_or must be an array of sub-predicates".to_string())?;
945 let mut subs = Vec::with_capacity(arr.len());
946 for sub in arr {
947 subs.push(parse_predicate(sub, prefix)?);
948 }
949 clauses.push(Predicate::Or(subs));
950 }
951 "_not" => {
952 clauses.push(Predicate::Not(Box::new(parse_predicate(val, prefix)?)));
953 }
954 _ => {
955 let mut new_path = prefix.to_vec();
957 new_path.push(key.clone());
958
959 match val {
960 Json::Object(m) if is_operator_object(m) => {
961 for (op_key, op_val) in m {
963 let op = CmpOp::from_key(op_key).expect("validated above");
964 clauses.push(Predicate::Cmp(Comparison {
965 path: new_path.clone(),
966 op,
967 value: op_val.clone(),
968 }));
969 }
970 }
971 Json::Object(_) => {
972 clauses.push(parse_predicate(val, &new_path)?);
974 }
975 _ => {
976 clauses.push(Predicate::Cmp(Comparison {
978 path: new_path,
979 op: CmpOp::Eq,
980 value: val.clone(),
981 }));
982 }
983 }
984 }
985 }
986 }
987
988 if clauses.len() == 1 {
989 Ok(clauses.remove(0))
990 } else {
991 Ok(Predicate::And(clauses))
992 }
993}
994
995fn fetch_path<'a>(card: &'a Json, path: &[String]) -> Option<&'a Json> {
997 let mut node = card;
998 for key in path {
999 let obj = node.as_object()?;
1000 node = obj.get(key)?;
1001 }
1002 Some(node)
1003}
1004
1005fn json_cmp(a: &Json, b: &Json) -> Option<std::cmp::Ordering> {
1008 match (a, b) {
1009 (Json::Number(x), Json::Number(y)) => {
1010 let xf = x.as_f64()?;
1011 let yf = y.as_f64()?;
1012 xf.partial_cmp(&yf)
1013 }
1014 (Json::String(x), Json::String(y)) => Some(x.cmp(y)),
1015 (Json::Bool(x), Json::Bool(y)) => Some(x.cmp(y)),
1016 _ => None,
1017 }
1018}
1019
1020fn json_eq(a: &Json, b: &Json) -> bool {
1021 match (a, b) {
1022 (Json::Number(x), Json::Number(y)) => match (x.as_f64(), y.as_f64()) {
1023 (Some(xf), Some(yf)) => xf == yf,
1024 _ => a == b,
1025 },
1026 _ => a == b,
1027 }
1028}
1029
1030fn eval_cmp(cmp: &Comparison, card: &Json) -> bool {
1031 let actual = fetch_path(card, &cmp.path);
1032 let exists = actual.is_some();
1033
1034 match cmp.op {
1035 CmpOp::Exists => {
1036 let want = cmp.value.as_bool().unwrap_or(true);
1037 exists == want
1038 }
1039 CmpOp::Ne => match actual {
1040 None => true,
1041 Some(v) => !json_eq(v, &cmp.value),
1042 },
1043 CmpOp::Nin => match actual {
1044 None => true,
1045 Some(v) => match cmp.value.as_array() {
1046 Some(arr) => !arr.iter().any(|e| json_eq(e, v)),
1047 None => false,
1048 },
1049 },
1050 CmpOp::Eq => actual.is_some_and(|v| json_eq(v, &cmp.value)),
1051 CmpOp::In => actual.is_some_and(|v| match cmp.value.as_array() {
1052 Some(arr) => arr.iter().any(|e| json_eq(e, v)),
1053 None => false,
1054 }),
1055 CmpOp::Lt | CmpOp::Lte | CmpOp::Gt | CmpOp::Gte => {
1056 let Some(v) = actual else { return false };
1057 let Some(ord) = json_cmp(v, &cmp.value) else {
1058 return false;
1059 };
1060 use std::cmp::Ordering::{Equal, Greater, Less};
1061 matches!(
1062 (&cmp.op, ord),
1063 (CmpOp::Lt, Less)
1064 | (CmpOp::Lte, Less | Equal)
1065 | (CmpOp::Gt, Greater)
1066 | (CmpOp::Gte, Greater | Equal)
1067 )
1068 }
1069 CmpOp::Contains => {
1070 let Some(Json::String(haystack)) = actual else {
1071 return false;
1072 };
1073 let Some(needle) = cmp.value.as_str() else {
1074 return false;
1075 };
1076 haystack.contains(needle)
1077 }
1078 CmpOp::StartsWith => {
1079 let Some(Json::String(haystack)) = actual else {
1080 return false;
1081 };
1082 let Some(needle) = cmp.value.as_str() else {
1083 return false;
1084 };
1085 haystack.starts_with(needle)
1086 }
1087 }
1088}
1089
1090pub fn eval_predicate(pred: &Predicate, card: &Json) -> bool {
1092 match pred {
1093 Predicate::And(subs) => subs.iter().all(|p| eval_predicate(p, card)),
1094 Predicate::Or(subs) => subs.iter().any(|p| eval_predicate(p, card)),
1095 Predicate::Not(sub) => !eval_predicate(sub, card),
1096 Predicate::Cmp(c) => eval_cmp(c, card),
1097 }
1098}
1099
/// One sort key of an `order_by` specification.
#[derive(Debug, Clone)]
pub struct OrderKey {
    /// Field path, one element per dot-separated segment.
    pub path: Vec<String>,
    /// True when the key was written with a leading `-` (descending).
    pub desc: bool,
}

impl OrderKey {
    /// Parses a key such as `created_at` or `-stats.pass_rate`.
    ///
    /// Exactly one leading `-` marks descending order; the rest is split on `.` into path
    /// segments.
    ///
    /// # Errors
    /// Fails when `raw` is empty or any path segment is empty.
    fn parse(raw: &str) -> Result<Self, String> {
        if raw.is_empty() {
            return Err("order_by key must not be empty".into());
        }
        let desc = raw.starts_with('-');
        let dotted = raw.strip_prefix('-').unwrap_or(raw);

        let mut path = Vec::new();
        for segment in dotted.split('.') {
            if segment.is_empty() {
                return Err(format!("invalid order_by key: '{raw}'"));
            }
            path.push(segment.to_string());
        }
        Ok(Self { path, desc })
    }
}
1128
1129pub fn parse_order_by(value: &Json) -> Result<Vec<OrderKey>, String> {
1133 match value {
1134 Json::String(s) => Ok(vec![OrderKey::parse(s)?]),
1135 Json::Array(arr) => {
1136 let mut out = Vec::with_capacity(arr.len());
1137 for v in arr {
1138 let s = v
1139 .as_str()
1140 .ok_or_else(|| "order_by array must contain strings".to_string())?;
1141 out.push(OrderKey::parse(s)?);
1142 }
1143 Ok(out)
1144 }
1145 _ => Err("order_by must be a string or array of strings".into()),
1146 }
1147}
1148
/// Parameters of a card query.
#[derive(Debug, Default, Clone)]
pub struct FindQuery {
    /// Package filter handed to the store listing.
    pub pkg: Option<String>,
    /// Optional predicate; only cards satisfying it are returned.
    pub where_: Option<Predicate>,
    /// Sort keys, highest priority first. When empty, results are ordered by `created_at`
    /// descending, then `card_id` descending.
    pub order_by: Vec<OrderKey>,
    /// Maximum number of results (unlimited when `None`).
    pub limit: Option<usize>,
    /// Number of leading results to skip after sorting (`0` when `None`).
    pub offset: Option<usize>,
}
1161
/// A fully loaded card: its complete JSON content plus the summary derived from it.
#[derive(Debug, Clone)]
struct CardRow {
    /// Whole card converted to JSON; predicates and sort keys are evaluated against this.
    full: Json,
    /// Condensed view of the same card.
    summary: Summary,
}
1172
1173fn load_full(store: &dyn CardStore, locator: &std::path::Path, pkg: &str) -> Option<CardRow> {
1175 let text = store.read_locator_text(locator).ok().flatten()?;
1176 let val: toml::Value = toml::from_str(&text).ok()?;
1177 let json = toml_to_json(val);
1178
1179 let card_id = json
1180 .get("card_id")
1181 .and_then(|v| v.as_str())
1182 .or_else(|| locator.file_stem().and_then(|s| s.to_str()))?
1183 .to_string();
1184 let created_at = json
1185 .get("created_at")
1186 .and_then(|v| v.as_str())
1187 .map(String::from);
1188 let model = json
1189 .get("model")
1190 .and_then(|m| m.get("id"))
1191 .and_then(|v| v.as_str())
1192 .map(String::from);
1193 let scenario = json
1194 .get("scenario")
1195 .and_then(|s| s.get("name"))
1196 .and_then(|v| v.as_str())
1197 .map(String::from);
1198 let pass_rate = json
1199 .get("stats")
1200 .and_then(|s| s.get("pass_rate"))
1201 .and_then(|v| v.as_f64());
1202
1203 Some(CardRow {
1204 full: json,
1205 summary: Summary {
1206 card_id,
1207 pkg: pkg.to_string(),
1208 created_at,
1209 model,
1210 scenario,
1211 pass_rate,
1212 },
1213 })
1214}
1215
1216fn order_cards(a: &CardRow, b: &CardRow, keys: &[OrderKey]) -> std::cmp::Ordering {
1218 use std::cmp::Ordering;
1219 for k in keys {
1220 let va = fetch_path(&a.full, &k.path);
1221 let vb = fetch_path(&b.full, &k.path);
1222 let ord = match (va, vb) {
1223 (None, None) => Ordering::Equal,
1224 (None, Some(_)) => Ordering::Greater, (Some(_), None) => Ordering::Less,
1226 (Some(x), Some(y)) => json_cmp(x, y).unwrap_or(Ordering::Equal),
1227 };
1228 let ord = if k.desc { ord.reverse() } else { ord };
1229 if ord != Ordering::Equal {
1230 return ord;
1231 }
1232 }
1233 Ordering::Equal
1234}
1235
/// Dotted field paths that can be sorted using only the data held in a [`Summary`].
///
/// A query whose sort keys all appear here (and which has no where clause) is answered
/// from summaries instead of fully loaded cards.
const SUMMARY_SORT_FIELDS: &[&str] = &[
    "card_id",
    "created_at",
    "stats.pass_rate",
    "scenario.name",
    "model.id",
];
1244
1245fn is_lightweight_query(q: &FindQuery) -> bool {
1248 q.where_.is_none()
1249 && q.order_by
1250 .iter()
1251 .all(|k| SUMMARY_SORT_FIELDS.contains(&k.path.join(".").as_str()))
1252}
1253
1254fn order_summaries(a: &Summary, b: &Summary, keys: &[OrderKey]) -> std::cmp::Ordering {
1256 use std::cmp::Ordering;
1257 for k in keys {
1258 let key_str = k.path.join(".");
1259 let ord = match key_str.as_str() {
1260 "card_id" => a.card_id.cmp(&b.card_id),
1261 "created_at" => a.created_at.cmp(&b.created_at),
1262 "stats.pass_rate" => match (a.pass_rate, b.pass_rate) {
1263 (None, None) => Ordering::Equal,
1264 (None, Some(_)) => Ordering::Greater,
1265 (Some(_), None) => Ordering::Less,
1266 (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(Ordering::Equal),
1267 },
1268 "scenario.name" => a.scenario.cmp(&b.scenario),
1269 "model.id" => a.model.cmp(&b.model),
1270 _ => Ordering::Equal,
1271 };
1272 let ord = if k.desc { ord.reverse() } else { ord };
1273 if ord != Ordering::Equal {
1274 return ord;
1275 }
1276 }
1277 Ordering::Equal
1278}
1279
1280pub fn find(q: FindQuery) -> Result<Vec<Summary>, String> {
1286 find_with_store(&default_store()?, q)
1287}
1288
1289pub fn find_with_store(store: &dyn CardStore, q: FindQuery) -> Result<Vec<Summary>, String> {
1291 if is_lightweight_query(&q) {
1293 let mut rows = list_with_store(store, q.pkg.as_deref())?;
1294 if q.order_by.is_empty() {
1295 rows.sort_by(|a, b| {
1296 b.created_at
1297 .cmp(&a.created_at)
1298 .then_with(|| b.card_id.cmp(&a.card_id))
1299 });
1300 } else {
1301 rows.sort_by(|a, b| order_summaries(a, b, &q.order_by));
1302 }
1303 let out: Vec<Summary> = rows
1304 .into_iter()
1305 .skip(q.offset.unwrap_or(0))
1306 .take(q.limit.unwrap_or(usize::MAX))
1307 .collect();
1308 return Ok(out);
1309 }
1310
1311 let all_rows = scan_cards(store, q.pkg.as_deref())?;
1313
1314 let mut rows: Vec<CardRow> = if let Some(pred) = &q.where_ {
1316 all_rows
1317 .into_iter()
1318 .filter(|row| eval_predicate(pred, &row.full))
1319 .collect()
1320 } else {
1321 all_rows
1322 };
1323
1324 if q.order_by.is_empty() {
1326 rows.sort_by(|a, b| {
1327 b.summary
1328 .created_at
1329 .cmp(&a.summary.created_at)
1330 .then_with(|| b.summary.card_id.cmp(&a.summary.card_id))
1331 });
1332 } else {
1333 rows.sort_by(|a, b| order_cards(a, b, &q.order_by));
1334 }
1335
1336 let out: Vec<Summary> = rows
1338 .into_iter()
1339 .skip(q.offset.unwrap_or(0))
1340 .take(q.limit.unwrap_or(usize::MAX))
1341 .map(|r| r.summary)
1342 .collect();
1343
1344 Ok(out)
1345}
1346
/// Direction in which a lineage walk follows `prior_card_id` links.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineageDirection {
    /// Towards ancestors (the default).
    #[default]
    Up,
    /// Towards descendants.
    Down,
    /// Both ancestors and descendants.
    Both,
}

impl LineageDirection {
    /// Parses `"up"`, `"down"` or `"both"` (exact, case-sensitive).
    ///
    /// # Errors
    /// Any other input yields a message naming the accepted values.
    pub fn parse(s: &str) -> Result<Self, String> {
        const NAMES: [(&str, LineageDirection); 3] = [
            ("up", LineageDirection::Up),
            ("down", LineageDirection::Down),
            ("both", LineageDirection::Both),
        ];
        if let Some(&(_, direction)) = NAMES.iter().find(|(label, _)| *label == s) {
            return Ok(direction);
        }
        let other = s;
        Err(format!(
            "direction must be 'up', 'down', or 'both' (got '{other}')"
        ))
    }
}
1382
/// Parameters of a lineage query.
#[derive(Debug, Clone, Default)]
pub struct LineageQuery {
    /// Card the walk starts from.
    pub card_id: String,
    /// Which way to follow lineage links.
    pub direction: LineageDirection,
    /// Maximum number of steps to walk.
    /// NOTE(review): presumably falls back to `DEFAULT_LINEAGE_DEPTH` when `None` — confirm
    /// at the use site.
    pub depth: Option<usize>,
    /// Whether each returned node should carry a copy of the card's `stats`.
    pub include_stats: bool,
    /// When set, only edges whose relation is in this list are followed; edges without a
    /// relation are then rejected.
    pub relation_filter: Option<Vec<String>>,
}
1396
/// A card reached during a lineage walk.
#[derive(Debug, Clone)]
pub struct LineageNode {
    /// Id of the card.
    pub card_id: String,
    /// Package of the card.
    pub pkg: String,
    /// The card's `metadata.prior_card_id`, when present.
    pub prior_card_id: Option<String>,
    /// The card's `metadata.prior_relation`, when present.
    pub prior_relation: Option<String>,
    /// Signed distance from the start card; ancestors found by the upward walk get negative
    /// values (`-1` for the direct parent).
    pub depth: i32,
    /// Copy of the card's `stats`, populated only when stats were requested.
    pub stats: Option<Json>,
}
1410
/// A lineage link between two cards.
///
/// The upward walk records edges with `from` = the child card and `to` = the card named by
/// the child's `prior_card_id`.
#[derive(Debug, Clone)]
pub struct LineageEdge {
    /// Card id the edge starts at.
    pub from: String,
    /// Card id the edge points to.
    pub to: String,
    /// The `prior_relation` recorded for this link, when present.
    pub relation: Option<String>,
}
1418
/// Outcome of a lineage query.
#[derive(Debug, Clone)]
pub struct LineageResult {
    /// Id of the card the query started from.
    /// NOTE(review): inferred from the field name; the code that fills this struct is not
    /// shown here — confirm.
    pub root: String,
    /// Cards reached by the walk.
    pub nodes: Vec<LineageNode>,
    /// Links followed by the walk.
    pub edges: Vec<LineageEdge>,
    /// Whether the walk was cut short by the depth limit while more cards were reachable.
    /// NOTE(review): semantics taken from the upward walk; confirm for the downward walk.
    pub truncated: bool,
}
1427
/// Default depth limit for lineage walks.
/// NOTE(review): presumably applied when `LineageQuery::depth` is `None`; the use site is
/// not shown here — confirm.
const DEFAULT_LINEAGE_DEPTH: usize = 10;
1429
1430fn lineage_fields(card: &Json) -> (Option<String>, Option<String>) {
1433 let meta = card.get("metadata");
1434 let prior_card_id = meta
1435 .and_then(|m| m.get("prior_card_id"))
1436 .and_then(|v| v.as_str())
1437 .map(String::from);
1438 let prior_relation = meta
1439 .and_then(|m| m.get("prior_relation"))
1440 .and_then(|v| v.as_str())
1441 .map(String::from);
1442 (prior_card_id, prior_relation)
1443}
1444
1445fn make_node(row: &CardRow, depth: i32, include_stats: bool) -> LineageNode {
1447 let (prior_card_id, prior_relation) = lineage_fields(&row.full);
1448 let stats = if include_stats {
1449 row.full.get("stats").cloned()
1450 } else {
1451 None
1452 };
1453 LineageNode {
1454 card_id: row.summary.card_id.clone(),
1455 pkg: row.summary.pkg.clone(),
1456 prior_card_id,
1457 prior_relation,
1458 depth,
1459 stats,
1460 }
1461}
1462
/// Decides whether an edge with the given `relation` may be followed.
///
/// Without a filter every edge passes. With a filter, the edge passes only when it has a
/// relation and that relation is listed.
fn relation_passes(filter: &Option<Vec<String>>, relation: &Option<String>) -> bool {
    match filter.as_deref() {
        None => true,
        Some(allowed) => matches!(relation, Some(r) if allowed.contains(r)),
    }
}
1474
/// In-memory index over all cards, used by lineage walks.
struct CardIndex {
    /// Every loaded card, keyed by card id.
    cards: std::collections::HashMap<String, CardRow>,
    /// For each `prior_card_id` value, the ids of the cards that name it as their prior.
    children: std::collections::HashMap<String, Vec<String>>,
}
1482
1483fn load_card_index(store: &dyn CardStore) -> Result<CardIndex, String> {
1487 let rows = scan_cards(store, None)?;
1488
1489 let mut cards = std::collections::HashMap::with_capacity(rows.len());
1490 let mut children: std::collections::HashMap<String, Vec<String>> =
1491 std::collections::HashMap::new();
1492
1493 for row in rows {
1494 let id = row.summary.card_id.clone();
1495 let (prior_id, _) = lineage_fields(&row.full);
1496 if let Some(parent) = prior_id {
1497 children.entry(parent).or_default().push(id.clone());
1498 }
1499 cards.insert(id, row);
1500 }
1501 Ok(CardIndex { cards, children })
1502}
1503
1504fn scan_cards(store: &dyn CardStore, pkg_filter: Option<&str>) -> Result<Vec<CardRow>, String> {
1508 let locators = store.list_card_locators(pkg_filter)?;
1509 let mut rows = Vec::with_capacity(locators.len());
1510 for (pkg, loc) in &locators {
1511 if let Some(row) = load_full(store, loc, pkg) {
1512 rows.push(row);
1513 }
1514 }
1515 Ok(rows)
1516}
1517
/// Read-only inputs shared by `walk_up` and `walk_down`.
struct LineageCtx<'a> {
    /// Index of all cards being traversed.
    index: &'a CardIndex,
    /// Optional allow-list of relation labels; evaluated with `relation_passes`.
    relation_filter: &'a Option<Vec<String>>,
    /// Forwarded to `make_node` to decide whether nodes carry `stats`.
    include_stats: bool,
    /// Maximum number of steps taken in each direction.
    max_depth: usize,
}
1525
/// Mutable traversal state accumulated while walking a lineage.
struct LineageAccum {
    /// Nodes emitted so far.
    nodes: Vec<LineageNode>,
    /// Edges emitted so far (`from` = descendant, `to` = ancestor).
    edges: Vec<LineageEdge>,
    /// Card ids already emitted; used to avoid revisiting a card.
    visited: std::collections::HashSet<String>,
    /// Set when a walk stopped at the depth budget with more cards reachable.
    truncated: bool,
}
1533
1534fn walk_up(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1536 let mut cur = start_id.to_string();
1537 for step in 1..=ctx.max_depth {
1538 let Some(row) = ctx.index.cards.get(&cur) else {
1539 return;
1540 };
1541 let (prior_id, prior_rel) = lineage_fields(&row.full);
1542 let Some(prior_id) = prior_id else {
1543 return;
1544 };
1545 if !relation_passes(ctx.relation_filter, &prior_rel) {
1546 return;
1547 }
1548 if acc.visited.contains(&prior_id) {
1549 return;
1550 }
1551 let Some(parent) = ctx.index.cards.get(&prior_id) else {
1552 return;
1553 };
1554 acc.nodes
1555 .push(make_node(parent, -(step as i32), ctx.include_stats));
1556 acc.edges.push(LineageEdge {
1557 from: row.summary.card_id.clone(),
1558 to: parent.summary.card_id.clone(),
1559 relation: prior_rel,
1560 });
1561 acc.visited.insert(prior_id.clone());
1562 cur = prior_id;
1563 }
1564 if let Some(row) = ctx.index.cards.get(&cur) {
1566 let (prior_id, _) = lineage_fields(&row.full);
1567 if prior_id
1568 .as_ref()
1569 .is_some_and(|p| ctx.index.cards.contains_key(p) && !acc.visited.contains(p))
1570 {
1571 acc.truncated = true;
1572 }
1573 }
1574}
1575
1576fn walk_down(start_id: &str, ctx: &LineageCtx<'_>, acc: &mut LineageAccum) {
1579 let mut frontier: Vec<String> = vec![start_id.to_string()];
1580
1581 for depth in 1..=ctx.max_depth {
1582 let mut next_frontier: Vec<String> = Vec::new();
1583 for parent_id in &frontier {
1584 let children = match ctx.index.children.get(parent_id) {
1585 Some(c) => c,
1586 None => continue,
1587 };
1588 for child_id in children {
1589 if acc.visited.contains(child_id) {
1590 continue;
1591 }
1592 let Some(child) = ctx.index.cards.get(child_id) else {
1593 continue;
1594 };
1595 let (_, prior_rel) = lineage_fields(&child.full);
1596 if !relation_passes(ctx.relation_filter, &prior_rel) {
1597 continue;
1598 }
1599 acc.nodes
1600 .push(make_node(child, depth as i32, ctx.include_stats));
1601 acc.edges.push(LineageEdge {
1602 from: child.summary.card_id.clone(),
1603 to: parent_id.clone(),
1604 relation: prior_rel,
1605 });
1606 acc.visited.insert(child_id.clone());
1607 next_frontier.push(child_id.clone());
1608 }
1609 }
1610 if next_frontier.is_empty() {
1611 return;
1612 }
1613 frontier = next_frontier;
1614 }
1615 for parent_id in &frontier {
1618 let children = match ctx.index.children.get(parent_id) {
1619 Some(c) => c,
1620 None => continue,
1621 };
1622 for child_id in children {
1623 if acc.visited.contains(child_id) {
1624 continue;
1625 }
1626 let Some(child) = ctx.index.cards.get(child_id) else {
1627 continue;
1628 };
1629 let (_, prior_rel) = lineage_fields(&child.full);
1630 if relation_passes(ctx.relation_filter, &prior_rel) {
1631 acc.truncated = true;
1632 return;
1633 }
1634 }
1635 }
1636}
1637
1638pub fn lineage(q: LineageQuery) -> Result<Option<LineageResult>, String> {
1640 lineage_with_store(&default_store()?, q)
1641}
1642
1643pub fn lineage_with_store(
1645 store: &dyn CardStore,
1646 q: LineageQuery,
1647) -> Result<Option<LineageResult>, String> {
1648 let index = load_card_index(store)?;
1649 let Some(root_row) = index.cards.get(&q.card_id) else {
1650 return Ok(None);
1651 };
1652
1653 let ctx = LineageCtx {
1654 index: &index,
1655 relation_filter: &q.relation_filter,
1656 include_stats: q.include_stats,
1657 max_depth: q.depth.unwrap_or(DEFAULT_LINEAGE_DEPTH),
1658 };
1659 let mut acc = LineageAccum {
1660 nodes: Vec::new(),
1661 edges: Vec::new(),
1662 visited: std::collections::HashSet::new(),
1663 truncated: false,
1664 };
1665
1666 acc.nodes.push(make_node(root_row, 0, q.include_stats));
1667 acc.visited.insert(q.card_id.clone());
1668
1669 if matches!(q.direction, LineageDirection::Up | LineageDirection::Both) {
1670 walk_up(&q.card_id, &ctx, &mut acc);
1671 }
1672 if matches!(q.direction, LineageDirection::Down | LineageDirection::Both) {
1673 walk_down(&q.card_id, &ctx, &mut acc);
1674 }
1675
1676 Ok(Some(LineageResult {
1677 root: q.card_id,
1678 nodes: acc.nodes,
1679 edges: acc.edges,
1680 truncated: acc.truncated,
1681 }))
1682}
1683
1684pub fn lineage_to_json(r: &LineageResult) -> Json {
1686 let nodes: Vec<Json> = r
1687 .nodes
1688 .iter()
1689 .map(|n| {
1690 let mut m = serde_json::Map::new();
1691 m.insert("card_id".into(), json!(n.card_id));
1692 m.insert("pkg".into(), json!(n.pkg));
1693 m.insert("depth".into(), json!(n.depth));
1694 if let Some(p) = &n.prior_card_id {
1695 m.insert("prior_card_id".into(), json!(p));
1696 }
1697 if let Some(rel) = &n.prior_relation {
1698 m.insert("prior_relation".into(), json!(rel));
1699 }
1700 if let Some(s) = &n.stats {
1701 m.insert("stats".into(), s.clone());
1702 }
1703 Json::Object(m)
1704 })
1705 .collect();
1706 let edges: Vec<Json> = r
1707 .edges
1708 .iter()
1709 .map(|e| {
1710 let mut m = serde_json::Map::new();
1711 m.insert("from".into(), json!(e.from));
1712 m.insert("to".into(), json!(e.to));
1713 if let Some(rel) = &e.relation {
1714 m.insert("relation".into(), json!(rel));
1715 }
1716 Json::Object(m)
1717 })
1718 .collect();
1719 json!({
1720 "root": r.root,
1721 "nodes": nodes,
1722 "edges": edges,
1723 "truncated": r.truncated,
1724 })
1725}
1726
1727pub fn import_from_dir(
1747 source_dir: &std::path::Path,
1748 pkg: &str,
1749) -> Result<(Vec<String>, Vec<String>), String> {
1750 import_from_dir_with_store(&default_store()?, source_dir, pkg)
1751}
1752
1753pub fn import_from_dir_with_store(
1755 store: &dyn CardStore,
1756 source_dir: &std::path::Path,
1757 pkg: &str,
1758) -> Result<(Vec<String>, Vec<String>), String> {
1759 let (imported, skipped) = store.import_from_dir(source_dir, pkg)?;
1760 for card_id in &imported {
1761 match store.read_card_text(card_id) {
1762 Ok(Some(toml_text)) => publish(CardEvent::Created {
1763 pkg: pkg.to_string(),
1764 card_id: card_id.clone(),
1765 toml_text,
1766 }),
1767 Ok(None) => {
1768 tracing::warn!(
1769 card_id = %card_id,
1770 "import_from_dir: read_card_text returned None after import; skipping publish"
1771 );
1772 }
1773 Err(e) => {
1774 tracing::warn!(
1775 card_id = %card_id,
1776 error = %e,
1777 "import_from_dir: read_card_text failed after import; skipping publish"
1778 );
1779 }
1780 }
1781 match store.read_samples_text(card_id) {
1783 Ok(Some(jsonl_text)) => publish(CardEvent::SamplesWritten {
1784 card_id: card_id.clone(),
1785 jsonl_text,
1786 }),
1787 Ok(None) => {}
1788 Err(e) => {
1789 tracing::warn!(
1790 card_id = %card_id,
1791 error = %e,
1792 "import_from_dir: read_samples_text failed after import; skipping publish"
1793 );
1794 }
1795 }
1796 }
1797 Ok((imported, skipped))
1798}
1799
1800pub fn write_samples(card_id: &str, samples: Vec<Json>) -> Result<PathBuf, String> {
1806 write_samples_with_store(&default_store()?, card_id, samples)
1807}
1808
1809pub fn write_samples_with_store(
1811 store: &dyn CardStore,
1812 card_id: &str,
1813 samples: Vec<Json>,
1814) -> Result<PathBuf, String> {
1815 if store.samples_exists(card_id)? {
1816 return Err(format!(
1817 "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
1818 ));
1819 }
1820 let mut buf = String::new();
1821 for (idx, s) in samples.iter().enumerate() {
1822 let line = serde_json::to_string(s).map_err(|e| {
1823 format!("alc.card.write_samples: failed to serialize sample #{idx}: {e}")
1824 })?;
1825 buf.push_str(&line);
1826 buf.push('\n');
1827 }
1828 let path = store.write_samples_text(card_id, &buf)?;
1829
1830 publish(CardEvent::SamplesWritten {
1831 card_id: card_id.to_string(),
1832 jsonl_text: buf,
1833 });
1834
1835 Ok(path)
1836}
1837
/// Paging and filtering options for `read_samples`.
#[derive(Debug, Default, Clone)]
pub struct SamplesQuery {
    /// Number of matching rows to skip before collecting results.
    pub offset: usize,
    /// Maximum number of rows to return; `None` means no cap.
    pub limit: Option<usize>,
    /// Optional predicate; rows for which it evaluates to false are
    /// dropped before `offset`/`limit` are applied.
    pub where_: Option<Predicate>,
}
1849
1850pub fn read_samples(card_id: &str, q: SamplesQuery) -> Result<Vec<Json>, String> {
1860 read_samples_with_store(&default_store()?, card_id, q)
1861}
1862
1863pub fn read_samples_with_store(
1865 store: &dyn CardStore,
1866 card_id: &str,
1867 q: SamplesQuery,
1868) -> Result<Vec<Json>, String> {
1869 let text = match store.read_samples_text(card_id)? {
1870 Some(t) => t,
1871 None => return Ok(Vec::new()),
1872 };
1873 let mut matched: usize = 0;
1874 let mut out = Vec::new();
1875 for (i, line) in text.lines().enumerate() {
1876 if line.trim().is_empty() {
1877 continue;
1878 }
1879 let val: Json = serde_json::from_str(line)
1880 .map_err(|e| format!("Failed to parse sample line {i}: {e}"))?;
1881 if let Some(pred) = &q.where_ {
1882 if !eval_predicate(pred, &val) {
1883 continue;
1884 }
1885 }
1886 if matched < q.offset {
1887 matched += 1;
1888 continue;
1889 }
1890 if let Some(lim) = q.limit {
1891 if out.len() >= lim {
1892 break;
1893 }
1894 }
1895 matched += 1;
1896 out.push(val);
1897 }
1898 Ok(out)
1899}
1900
/// Filesystem-backed [`CardStore`].
pub struct FileCardStore {
    /// Directory holding one sub-directory per pkg plus `_aliases.toml`.
    root: PathBuf,
}
1916
1917impl FileCardStore {
1918 pub fn new(root: PathBuf) -> Self {
1920 Self { root }
1921 }
1922
1923 pub fn from_home() -> Result<Self, String> {
1925 Ok(Self { root: cards_dir()? })
1926 }
1927
1928 fn pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
1932 validate_name(pkg, "pkg")?;
1933 let dir = self.root.join(pkg);
1934 if !dir.exists() {
1935 fs::create_dir_all(&dir).map_err(|e| format!("Failed to create pkg dir: {e}"))?;
1936 }
1937 Ok(dir)
1938 }
1939
1940 fn aliases_path(&self) -> PathBuf {
1942 self.root.join("_aliases.toml")
1943 }
1944
1945 fn samples_path(&self, card_id: &str) -> Result<PathBuf, String> {
1949 let card_path = self
1950 .find_card_locator(card_id)?
1951 .ok_or_else(|| format!("card '{card_id}' not found"))?;
1952 let dir = card_path
1953 .parent()
1954 .ok_or_else(|| format!("card '{card_id}' has no parent directory"))?;
1955 Ok(dir.join(format!("{card_id}.samples.jsonl")))
1956 }
1957}
1958
1959impl CardStore for FileCardStore {
1960 fn write_new_card(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1961 let dir = self.pkg_dir(pkg)?;
1962 let path = dir.join(format!("{card_id}.toml"));
1963 if path.exists() {
1964 return Err(format!(
1965 "alc.card.create: card '{card_id}' already exists (immutable)"
1966 ));
1967 }
1968 let tmp = path.with_extension("toml.tmp");
1969 fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1970 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1971 Ok(path)
1972 }
1973
1974 fn overwrite_card(&self, card_id: &str, toml_text: &str) -> Result<PathBuf, String> {
1975 let path = self
1976 .find_card_locator(card_id)?
1977 .ok_or_else(|| format!("alc.card.overwrite: card '{card_id}' not found"))?;
1978 let tmp = path.with_extension("toml.tmp");
1979 fs::write(&tmp, toml_text).map_err(|e| format!("Failed to write card tmp: {e}"))?;
1980 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename card file: {e}"))?;
1981 Ok(path)
1982 }
1983
1984 fn find_card_locator(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
1985 validate_name(card_id, "card_id")?;
1986 if !self.root.exists() {
1987 return Ok(None);
1988 }
1989 let entries =
1990 fs::read_dir(&self.root).map_err(|e| format!("Failed to read cards dir: {e}"))?;
1991 for entry in entries.flatten() {
1992 let p = entry.path();
1993 if p.is_dir() {
1994 let candidate = p.join(format!("{card_id}.toml"));
1995 if candidate.exists() {
1996 return Ok(Some(candidate));
1997 }
1998 }
1999 }
2000 Ok(None)
2001 }
2002
2003 fn read_card_text(&self, card_id: &str) -> Result<Option<String>, String> {
2004 let Some(path) = self.find_card_locator(card_id)? else {
2005 return Ok(None);
2006 };
2007 let text = fs::read_to_string(&path)
2008 .map_err(|e| format!("Failed to read card '{card_id}': {e}"))?;
2009 Ok(Some(text))
2010 }
2011
2012 fn list_card_locators(
2013 &self,
2014 pkg_filter: Option<&str>,
2015 ) -> Result<Vec<(String, PathBuf)>, String> {
2016 if !self.root.exists() {
2017 return Ok(Vec::new());
2018 }
2019 let pkg_dirs: Vec<PathBuf> = if let Some(p) = pkg_filter {
2020 validate_name(p, "pkg")?;
2021 let d = self.root.join(p);
2022 if d.is_dir() {
2023 vec![d]
2024 } else {
2025 return Ok(Vec::new());
2026 }
2027 } else {
2028 fs::read_dir(&self.root)
2029 .map_err(|e| format!("Failed to read cards dir: {e}"))?
2030 .flatten()
2031 .map(|e| e.path())
2032 .filter(|p| p.is_dir())
2033 .collect()
2034 };
2035
2036 let mut out = Vec::new();
2037 for pdir in pkg_dirs {
2038 let pkg = pdir
2039 .file_name()
2040 .and_then(|s| s.to_str())
2041 .unwrap_or("")
2042 .to_string();
2043 let entries = match fs::read_dir(&pdir) {
2044 Ok(e) => e,
2045 Err(_) => continue,
2046 };
2047 for entry in entries.flatten() {
2048 let p = entry.path();
2049 if p.extension().and_then(|s| s.to_str()) != Some("toml") {
2050 continue;
2051 }
2052 out.push((pkg.clone(), p));
2053 }
2054 }
2055 Ok(out)
2056 }
2057
2058 fn read_locator_text(&self, locator: &Path) -> Result<Option<String>, String> {
2059 match fs::read_to_string(locator) {
2060 Ok(text) => Ok(Some(text)),
2061 Err(_) => Ok(None),
2062 }
2063 }
2064
2065 fn read_aliases(&self) -> Result<Vec<Alias>, String> {
2066 let path = self.aliases_path();
2067 if !path.exists() {
2068 return Ok(Vec::new());
2069 }
2070 let text =
2071 fs::read_to_string(&path).map_err(|e| format!("Failed to read aliases file: {e}"))?;
2072 let val: toml::Value =
2073 toml::from_str(&text).map_err(|e| format!("Failed to parse aliases file: {e}"))?;
2074 let arr = val
2075 .get("alias")
2076 .and_then(|v| v.as_array())
2077 .cloned()
2078 .unwrap_or_default();
2079 let mut out = Vec::with_capacity(arr.len());
2080 for entry in arr {
2081 let t = match entry {
2082 toml::Value::Table(t) => t,
2083 _ => continue,
2084 };
2085 let name = match t.get("name").and_then(|v| v.as_str()) {
2086 Some(s) => s.to_string(),
2087 None => continue,
2088 };
2089 let card_id = match t.get("card_id").and_then(|v| v.as_str()) {
2090 Some(s) => s.to_string(),
2091 None => continue,
2092 };
2093 out.push(Alias {
2094 name,
2095 card_id,
2096 pkg: t.get("pkg").and_then(|v| v.as_str()).map(String::from),
2097 set_at: t
2098 .get("set_at")
2099 .and_then(|v| v.as_str())
2100 .map(String::from)
2101 .unwrap_or_default(),
2102 note: t.get("note").and_then(|v| v.as_str()).map(String::from),
2103 });
2104 }
2105 Ok(out)
2106 }
2107
2108 fn write_aliases(&self, aliases: &[Alias]) -> Result<(), String> {
2109 if !self.root.exists() {
2112 fs::create_dir_all(&self.root)
2113 .map_err(|e| format!("Failed to create cards dir: {e}"))?;
2114 }
2115 let path = self.aliases_path();
2116 let mut arr = Vec::with_capacity(aliases.len());
2117 for a in aliases {
2118 let mut t = toml::map::Map::new();
2119 t.insert("name".into(), toml::Value::String(a.name.clone()));
2120 t.insert("card_id".into(), toml::Value::String(a.card_id.clone()));
2121 if let Some(p) = &a.pkg {
2122 t.insert("pkg".into(), toml::Value::String(p.clone()));
2123 }
2124 t.insert("set_at".into(), toml::Value::String(a.set_at.clone()));
2125 if let Some(n) = &a.note {
2126 t.insert("note".into(), toml::Value::String(n.clone()));
2127 }
2128 arr.push(toml::Value::Table(t));
2129 }
2130 let mut root = toml::map::Map::new();
2131 root.insert("alias".into(), toml::Value::Array(arr));
2132 let text = toml::to_string_pretty(&toml::Value::Table(root))
2133 .map_err(|e| format!("Failed to serialize aliases: {e}"))?;
2134 let tmp = path.with_extension("toml.tmp");
2135 fs::write(&tmp, &text).map_err(|e| format!("Failed to write aliases tmp: {e}"))?;
2136 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename aliases file: {e}"))?;
2137 Ok(())
2138 }
2139
2140 fn samples_exists(&self, card_id: &str) -> Result<bool, String> {
2141 let path = self.samples_path(card_id)?;
2142 Ok(path.exists())
2143 }
2144
2145 fn write_samples_text(&self, card_id: &str, jsonl_text: &str) -> Result<PathBuf, String> {
2146 let path = self.samples_path(card_id)?;
2147 if path.exists() {
2148 return Err(format!(
2149 "alc.card.write_samples: samples already exist for card '{card_id}' (write-once)"
2150 ));
2151 }
2152 let tmp = path.with_extension("jsonl.tmp");
2153 fs::write(&tmp, jsonl_text).map_err(|e| format!("Failed to write samples tmp: {e}"))?;
2154 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename samples file: {e}"))?;
2155 Ok(path)
2156 }
2157
2158 fn read_samples_text(&self, card_id: &str) -> Result<Option<String>, String> {
2159 let path = self.samples_path(card_id)?;
2160 if !path.exists() {
2161 return Ok(None);
2162 }
2163 let text =
2164 fs::read_to_string(&path).map_err(|e| format!("Failed to read samples file: {e}"))?;
2165 Ok(Some(text))
2166 }
2167
2168 fn import_from_dir(
2169 &self,
2170 source_dir: &Path,
2171 pkg: &str,
2172 ) -> Result<(Vec<String>, Vec<String>), String> {
2173 validate_name(pkg, "pkg")?;
2174 let dest = self.pkg_dir(pkg)?;
2175 let mut imported = Vec::new();
2176 let mut skipped = Vec::new();
2177
2178 let entries =
2179 fs::read_dir(source_dir).map_err(|e| format!("Failed to read card source dir: {e}"))?;
2180
2181 for entry in entries.flatten() {
2182 let path = entry.path();
2183 let fname = match path.file_name().and_then(|n| n.to_str()) {
2184 Some(n) => n.to_string(),
2185 None => continue,
2186 };
2187
2188 if !fname.ends_with(".toml") {
2189 continue;
2190 }
2191
2192 let card_id = fname.trim_end_matches(".toml");
2193 let dest_toml = dest.join(&fname);
2194
2195 if dest_toml.exists() {
2196 skipped.push(card_id.to_string());
2197 continue;
2198 }
2199
2200 let text = fs::read_to_string(&path)
2201 .map_err(|e| format!("Failed to read card file '{fname}': {e}"))?;
2202 let val: toml::Value = toml::from_str(&text)
2203 .map_err(|e| format!("Failed to parse card file '{fname}': {e}"))?;
2204 if val.get("schema_version").and_then(|v| v.as_str()) != Some(SCHEMA_VERSION) {
2205 continue;
2206 }
2207
2208 fs::copy(&path, &dest_toml)
2209 .map_err(|e| format!("Failed to copy card '{fname}': {e}"))?;
2210
2211 let samples_name = format!("{card_id}.samples.jsonl");
2212 let samples_src = source_dir.join(&samples_name);
2213 if samples_src.exists() {
2214 let samples_dest = dest.join(&samples_name);
2215 if !samples_dest.exists() {
2216 fs::copy(&samples_src, &samples_dest)
2217 .map_err(|e| format!("Failed to copy samples '{samples_name}': {e}"))?;
2218 }
2219 }
2220
2221 imported.push(card_id.to_string());
2222 }
2223
2224 Ok((imported, skipped))
2225 }
2226}
2227
/// A change notification delivered to every [`CardSubscriber`].
#[derive(Debug, Clone)]
pub enum CardEvent {
    /// A card was created in `pkg`; `toml_text` is its full TOML text.
    Created {
        pkg: String,
        card_id: String,
        toml_text: String,
    },
    /// Carries replacement TOML text for an existing card (the file
    /// subscriber overwrites the located card file with it).
    Appended { card_id: String, toml_text: String },
    /// Samples were written for a card; `jsonl_text` is the JSON Lines text.
    SamplesWritten { card_id: String, jsonl_text: String },
    /// Carries the full text of the aliases file.
    AliasesWritten { toml_text: String },
}
2260
/// Payload-free discriminant of [`CardEvent`], usable as a map key
/// (e.g. for the per-kind counters in `PerSubscriber`).
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub enum CardEventKind {
    Created,
    Appended,
    SamplesWritten,
    AliasesWritten,
}
2271
2272impl CardEventKind {
2273 pub fn as_str(self) -> &'static str {
2275 match self {
2276 CardEventKind::Created => "created",
2277 CardEventKind::Appended => "appended",
2278 CardEventKind::SamplesWritten => "samples_written",
2279 CardEventKind::AliasesWritten => "aliases_written",
2280 }
2281 }
2282
2283 pub fn json_key(self) -> &'static str {
2287 match self {
2288 CardEventKind::Created => "created",
2289 CardEventKind::Appended => "appended",
2290 CardEventKind::SamplesWritten => "samples",
2291 CardEventKind::AliasesWritten => "aliases",
2292 }
2293 }
2294
2295 pub fn all() -> [CardEventKind; 4] {
2299 [
2300 CardEventKind::Created,
2301 CardEventKind::Appended,
2302 CardEventKind::SamplesWritten,
2303 CardEventKind::AliasesWritten,
2304 ]
2305 }
2306}
2307
2308impl Serialize for CardEventKind {
2309 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2310 s.serialize_str(self.json_key())
2311 }
2312}
2313
2314impl CardEvent {
2315 pub fn kind(&self) -> CardEventKind {
2317 match self {
2318 CardEvent::Created { .. } => CardEventKind::Created,
2319 CardEvent::Appended { .. } => CardEventKind::Appended,
2320 CardEvent::SamplesWritten { .. } => CardEventKind::SamplesWritten,
2321 CardEvent::AliasesWritten { .. } => CardEventKind::AliasesWritten,
2322 }
2323 }
2324}
2325
/// A sink that receives [`CardEvent`]s from the [`CardEventBus`].
pub trait CardSubscriber: Send + Sync {
    /// Handle one event. An `Err` is logged and counted by the bus; it
    /// does not stop delivery to other subscribers.
    fn on_event(&self, ev: &CardEvent) -> Result<(), String>;

    /// Identifier of this subscriber. The bus uses it as the statistics
    /// key and to look a subscriber up by name.
    fn describe(&self) -> String;

    /// Whether the sink already holds `card_id`. The default
    /// implementation always answers `Ok(false)`.
    fn has_card(&self, _card_id: &str) -> Result<bool, String> {
        Ok(false)
    }
}
2355
/// The most recent delivery failure recorded for one subscriber.
#[derive(Debug, Clone, Serialize)]
pub struct LastError {
    /// Kind of the event whose delivery failed.
    pub kind: CardEventKind,
    /// Error message returned by the subscriber.
    pub msg: String,
    /// Time the failure was recorded, in milliseconds since the Unix epoch.
    pub ts_ms: u64,
}
2366
/// Delivery counters kept for a single subscriber.
#[derive(Default, Debug)]
pub struct PerSubscriber {
    /// Successful deliveries, per event kind.
    pub ok: HashMap<CardEventKind, u64>,
    /// Failed deliveries, per event kind.
    pub err: HashMap<CardEventKind, u64>,
    /// Latest failure, if any has been recorded.
    pub last_error: Option<LastError>,
}
2376
/// Mutex-guarded delivery statistics for all subscribers.
#[derive(Default, Debug)]
pub struct SubscriberStats {
    /// Subscriber key (its `describe()` string) → counters.
    inner: Mutex<HashMap<String, PerSubscriber>>,
}
2383
2384impl SubscriberStats {
2385 pub fn record_ok(&self, key: &str, kind: CardEventKind) {
2387 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2388 let entry = g.entry(key.to_string()).or_default();
2389 let c = entry.ok.entry(kind).or_insert(0);
2390 *c = c.saturating_add(1);
2391 }
2392
2393 pub fn record_err(&self, key: &str, kind: CardEventKind, err: &str) {
2397 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2398 let entry = g.entry(key.to_string()).or_default();
2399 let c = entry.err.entry(kind).or_insert(0);
2400 *c = c.saturating_add(1);
2401 entry.last_error = Some(LastError {
2402 kind,
2403 msg: err.to_string(),
2404 ts_ms: now_ms(),
2405 });
2406 }
2407
2408 pub fn snapshot(&self) -> Vec<SubscriberHealthRow> {
2417 let g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
2418 let mut rows = Vec::with_capacity(g.len());
2419 for (sink, ps) in g.iter() {
2420 let mut ok: HashMap<String, u64> = HashMap::with_capacity(4);
2421 let mut err: HashMap<String, u64> = HashMap::with_capacity(4);
2422 for k in CardEventKind::all() {
2423 ok.insert(
2424 k.json_key().to_string(),
2425 ps.ok.get(&k).copied().unwrap_or(0),
2426 );
2427 err.insert(
2428 k.json_key().to_string(),
2429 ps.err.get(&k).copied().unwrap_or(0),
2430 );
2431 }
2432 rows.push(SubscriberHealthRow {
2433 sink: sink.clone(),
2434 ok,
2435 err,
2436 last_error: ps.last_error.clone(),
2437 });
2438 }
2439 rows.sort_by(|a, b| a.sink.cmp(&b.sink));
2442 rows
2443 }
2444}
2445
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
2455
/// One row of `SubscriberStats::snapshot()`: the counters of a single sink.
#[derive(Debug, Clone, Serialize)]
pub struct SubscriberHealthRow {
    /// Subscriber key (its `describe()` string).
    pub sink: String,
    /// Successful deliveries, keyed by `CardEventKind::json_key()`.
    pub ok: HashMap<String, u64>,
    /// Failed deliveries, keyed by `CardEventKind::json_key()`.
    pub err: HashMap<String, u64>,
    /// Latest failure recorded for this sink, if any.
    pub last_error: Option<LastError>,
}
2465
2466pub fn subscriber_stats_snapshot() -> Vec<SubscriberHealthRow> {
2471 event_bus().stats().snapshot()
2472}
2473
/// Write `bytes` to `dest` via a uniquely named sibling temp file that is
/// then renamed over `dest`.
///
/// The temp name is `<dest>.tmp.<pid>.<seq>`, where `seq` is a
/// process-wide counter, so concurrent writers never share a temp file.
/// Missing parent directories of `dest` are created first.
///
/// If writing the temp file or renaming it fails, the temp file is removed
/// (best effort) so failed writes do not accumulate stray `.tmp.*` files.
///
/// # Errors
/// Returns a message describing the failed step (mkdir, write, or rename).
fn atomic_write(dest: &Path, bytes: &[u8]) -> Result<(), String> {
    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
    let pid = process::id();
    if let Some(parent) = dest.parent() {
        if !parent.as_os_str().is_empty() && !parent.exists() {
            fs::create_dir_all(parent).map_err(|e| format!("subscriber mkdir: {e}"))?;
        }
    }
    // Append to the full file name so the temp file stays next to `dest`.
    let mut tmp = dest.as_os_str().to_owned();
    tmp.push(format!(".tmp.{pid}.{seq}"));
    let tmp_path = PathBuf::from(tmp);

    let outcome = fs::write(&tmp_path, bytes)
        .map_err(|e| format!("subscriber write tmp: {e}"))
        .and_then(|()| {
            fs::rename(&tmp_path, dest).map_err(|e| format!("subscriber rename: {e}"))
        });
    if outcome.is_err() {
        // Cleanup failure is ignored: the original error is what matters.
        let _ = fs::remove_file(&tmp_path);
    }
    outcome
}
2495
/// Render `root` as a `file://` URI string.
///
/// On Windows, backslashes become forward slashes and the path is prefixed
/// with `file:///`; on every other target the (lossily converted) path is
/// appended to `file://` unchanged. No percent-encoding is applied.
fn canonical_file_uri(root: &Path) -> String {
    let p = root.to_string_lossy();
    if cfg!(windows) {
        let forward = p.replace('\\', "/");
        format!("file:///{}", forward)
    } else {
        format!("file://{p}")
    }
}
2514
/// A [`CardSubscriber`] that mirrors card events into a directory tree.
pub struct FileCardSubscriber {
    /// Directory that receives one sub-directory per pkg and `_aliases.toml`.
    root: PathBuf,
    /// `file://` URI derived from `root`; returned by `describe()`.
    uri: String,
}
2525
2526impl FileCardSubscriber {
2527 pub fn new(root: PathBuf) -> Self {
2530 let uri = canonical_file_uri(&root);
2531 Self { root, uri }
2532 }
2533
2534 pub fn locate_card(&self, card_id: &str) -> Result<Option<PathBuf>, String> {
2538 validate_name(card_id, "card_id")?;
2539 if !self.root.exists() {
2540 return Ok(None);
2541 }
2542 let entries = fs::read_dir(&self.root).map_err(|e| format!("subscriber read_dir: {e}"))?;
2543 for entry in entries.flatten() {
2544 let p = entry.path();
2545 if p.is_dir() {
2546 let candidate = p.join(format!("{card_id}.toml"));
2547 if candidate.exists() {
2548 return Ok(Some(candidate));
2549 }
2550 }
2551 }
2552 Ok(None)
2553 }
2554
2555 fn ensure_pkg_dir(&self, pkg: &str) -> Result<PathBuf, String> {
2556 validate_name(pkg, "pkg")?;
2557 let dir = self.root.join(pkg);
2558 if !dir.exists() {
2559 fs::create_dir_all(&dir).map_err(|e| format!("subscriber mkdir: {e}"))?;
2560 }
2561 Ok(dir)
2562 }
2563
2564 fn write_created(&self, pkg: &str, card_id: &str, toml_text: &str) -> Result<(), String> {
2565 validate_name(card_id, "card_id")?;
2566 let dir = self.ensure_pkg_dir(pkg)?;
2567 let dest = dir.join(format!("{card_id}.toml"));
2568 atomic_write(&dest, toml_text.as_bytes())
2569 }
2570
2571 fn write_appended(&self, card_id: &str, toml_text: &str) -> Result<(), String> {
2572 match self.locate_card(card_id)? {
2573 Some(dest) => atomic_write(&dest, toml_text.as_bytes()),
2574 None => Err(format!(
2575 "subscriber append: card '{card_id}' missing at {}",
2576 self.uri
2577 )),
2578 }
2579 }
2580
2581 fn write_samples(&self, card_id: &str, jsonl_text: &str) -> Result<(), String> {
2582 let card_path = self.locate_card(card_id)?.ok_or_else(|| {
2583 format!(
2584 "subscriber samples: card '{card_id}' missing at {}",
2585 self.uri
2586 )
2587 })?;
2588 let dir = card_path
2589 .parent()
2590 .ok_or_else(|| format!("subscriber samples: card '{card_id}' has no parent dir"))?;
2591 let dest = dir.join(format!("{card_id}.samples.jsonl"));
2592 atomic_write(&dest, jsonl_text.as_bytes())
2593 }
2594
2595 fn write_aliases(&self, toml_text: &str) -> Result<(), String> {
2596 if !self.root.exists() {
2597 fs::create_dir_all(&self.root).map_err(|e| format!("subscriber mkdir: {e}"))?;
2598 }
2599 let dest = self.root.join("_aliases.toml");
2600 atomic_write(&dest, toml_text.as_bytes())
2601 }
2602}
2603
2604impl CardSubscriber for FileCardSubscriber {
2605 fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
2606 match ev {
2607 CardEvent::Created {
2608 pkg,
2609 card_id,
2610 toml_text,
2611 } => self.write_created(pkg, card_id, toml_text),
2612 CardEvent::Appended { card_id, toml_text } => self.write_appended(card_id, toml_text),
2613 CardEvent::SamplesWritten {
2614 card_id,
2615 jsonl_text,
2616 } => self.write_samples(card_id, jsonl_text),
2617 CardEvent::AliasesWritten { toml_text } => self.write_aliases(toml_text),
2618 }
2619 }
2620
2621 fn describe(&self) -> String {
2622 self.uri.clone()
2623 }
2624
2625 fn has_card(&self, card_id: &str) -> Result<bool, String> {
2629 Ok(self.locate_card(card_id)?.is_some())
2630 }
2631}
2632
/// Fan-out point that delivers [`CardEvent`]s to registered subscribers
/// and records per-subscriber delivery statistics.
pub struct CardEventBus {
    /// Registered subscribers; cloned out of the lock before delivery.
    subscribers: Mutex<Vec<Arc<dyn CardSubscriber>>>,
    /// Delivery counters, keyed by each subscriber's `describe()` string.
    stats: Arc<SubscriberStats>,
}
2643
2644impl CardEventBus {
2645 pub fn new(subscribers: Vec<Arc<dyn CardSubscriber>>) -> Self {
2648 Self {
2649 subscribers: Mutex::new(subscribers),
2650 stats: Arc::new(SubscriberStats::default()),
2651 }
2652 }
2653
2654 pub fn stats(&self) -> &Arc<SubscriberStats> {
2656 &self.stats
2657 }
2658
2659 pub fn publish(&self, ev: &CardEvent) {
2663 let subs_snapshot: Vec<Arc<dyn CardSubscriber>> = {
2664 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2665 guard.clone()
2666 };
2667 for sub in &subs_snapshot {
2668 let key = sub.describe();
2669 match sub.on_event(ev) {
2670 Ok(()) => self.stats.record_ok(&key, ev.kind()),
2671 Err(e) => {
2672 tracing::warn!(
2673 subscriber = %key,
2674 kind = ev.kind().as_str(),
2675 error = %e,
2676 "card subscriber failed"
2677 );
2678 self.stats.record_err(&key, ev.kind(), &e);
2679 }
2680 }
2681 }
2682 }
2683
2684 pub fn publish_to(&self, target: &str, ev: &CardEvent) -> Result<(), String> {
2688 let hit: Option<Arc<dyn CardSubscriber>> = {
2689 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2690 guard.iter().find(|s| s.describe() == target).cloned()
2691 };
2692 let Some(sub) = hit else {
2693 return Err(format!("subscriber not registered: {target}"));
2694 };
2695 let key = sub.describe();
2696 match sub.on_event(ev) {
2697 Ok(()) => {
2698 self.stats.record_ok(&key, ev.kind());
2699 Ok(())
2700 }
2701 Err(e) => {
2702 tracing::warn!(
2703 subscriber = %key,
2704 kind = ev.kind().as_str(),
2705 error = %e,
2706 "card subscriber failed (publish_to)"
2707 );
2708 self.stats.record_err(&key, ev.kind(), &e);
2709 Err(e)
2710 }
2711 }
2712 }
2713
2714 pub fn subscriber_uris(&self) -> Vec<String> {
2716 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2717 guard.iter().map(|s| s.describe()).collect()
2718 }
2719
2720 pub fn find_subscriber(&self, uri: &str) -> Option<Arc<dyn CardSubscriber>> {
2725 let guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2726 guard.iter().find(|s| s.describe() == uri).cloned()
2727 }
2728
2729 #[cfg(any(test, feature = "test-support"))]
2732 pub fn replace_subscribers_for_test(&self, subs: Vec<Arc<dyn CardSubscriber>>) {
2733 let mut guard = self.subscribers.lock().unwrap_or_else(|p| p.into_inner());
2734 *guard = subs;
2735 }
2736
2737 #[cfg(any(test, feature = "test-support"))]
2739 pub fn reset_stats_for_test(&self) {
2740 let mut g = self.stats.inner.lock().unwrap_or_else(|p| p.into_inner());
2741 g.clear();
2742 }
2743}
2744
/// Process-wide event bus; initialized on first use by `event_bus()`
/// (or installed explicitly by `install_event_bus_for_test`).
static CARD_EVENT_BUS: OnceLock<CardEventBus> = OnceLock::new();
2746
2747pub fn event_bus() -> &'static CardEventBus {
2750 CARD_EVENT_BUS.get_or_init(|| {
2751 let subs = load_subscribers_from_env();
2752 CardEventBus::new(subs)
2753 })
2754}
2755
2756pub fn init_event_bus() {
2761 let bus = event_bus();
2762 let uris = bus.subscriber_uris();
2763 if uris.is_empty() {
2764 tracing::info!("card sinks: no subscribers configured (ALC_CARD_SINKS unset)");
2765 } else {
2766 for uri in &uris {
2767 tracing::info!(subscriber = %uri, "card sink registered");
2768 }
2769 }
2770}
2771
/// Test support: install `bus` as the process-wide event bus.
///
/// # Errors
/// Fails when the bus has already been initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn install_event_bus_for_test(bus: CardEventBus) -> Result<(), String> {
    match CARD_EVENT_BUS.set(bus) {
        Ok(()) => Ok(()),
        Err(_) => Err("bus already initialized".to_string()),
    }
}
2779
2780pub fn publish(ev: CardEvent) {
2782 #[cfg(test)]
2788 {
2789 let is_test_owner = INSIDE_BUS_TEST.with(|f| f.get());
2790 if !is_test_owner {
2791 let _gate = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
2793 event_bus().publish(&ev);
2794 return;
2795 }
2796 }
2797 event_bus().publish(&ev);
2798}
2799
/// Test-only, process-wide mutex taken by `publish` on threads that have
/// not set `INSIDE_BUS_TEST`.
#[cfg(test)]
fn bus_test_gate() -> &'static Mutex<()> {
    static GATE: OnceLock<Mutex<()>> = OnceLock::new();
    GATE.get_or_init(Mutex::default)
}
2807
#[cfg(test)]
thread_local! {
    /// Per-thread flag read by `publish`: when `true`, the thread publishes
    /// without taking the bus test gate. Starts as `false`.
    static INSIDE_BUS_TEST: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}
2814
/// Outcome of backfilling one sink with the cards of a store.
#[derive(Debug, Clone, Default, Serialize)]
pub struct SinkBackfillReport {
    /// Identifier of the sink that was backfilled.
    pub sink: String,
    /// Card ids pushed to the sink (in a dry run: ids that would be pushed).
    pub pushed: Vec<String>,
    /// Card ids not pushed: already present in the sink, or not checkable /
    /// readable at the time of the run.
    pub skipped: Vec<String>,
    /// Failed pushes — presumably `(card_id, error message)` pairs; the code
    /// that fills this is outside the reviewed section (TODO confirm).
    pub failed: Vec<(String, String)>,
    /// Card ids whose samples were pushed as well (in a dry run: cards that
    /// have readable samples).
    pub pushed_samples: Vec<String>,
}
2829
2830pub fn card_sink_backfill(sink: &str, dry_run: bool) -> Result<SinkBackfillReport, String> {
2846 let store = default_store()?;
2847 card_sink_backfill_with_store(&store, sink, dry_run)
2848}
2849
2850pub fn card_sink_backfill_with_store(
2854 store: &dyn CardStore,
2855 sink: &str,
2856 dry_run: bool,
2857) -> Result<SinkBackfillReport, String> {
2858 let bus = event_bus();
2859 let sub = bus
2860 .find_subscriber(sink)
2861 .ok_or_else(|| format!("unknown sink: {sink}"))?;
2862
2863 let locators = store.list_card_locators(None)?;
2864
2865 let mut report = SinkBackfillReport {
2866 sink: sink.to_string(),
2867 ..Default::default()
2868 };
2869
2870 for (pkg, locator) in locators {
2871 let card_id = match locator.file_stem().and_then(|s| s.to_str()) {
2872 Some(s) => s.to_string(),
2873 None => continue,
2874 };
2875
2876 match sub.has_card(&card_id) {
2877 Ok(true) => {
2878 report.skipped.push(card_id);
2879 continue;
2880 }
2881 Ok(false) => {}
2882 Err(e) => {
2883 tracing::warn!(
2884 card_id = %card_id,
2885 error = %e,
2886 "backfill: has_card failed; treating as skipped"
2887 );
2888 report.skipped.push(card_id);
2889 continue;
2890 }
2891 }
2892
2893 let toml_text = match store.read_locator_text(&locator) {
2894 Ok(Some(t)) => t,
2895 Ok(None) => {
2896 report.skipped.push(card_id);
2898 continue;
2899 }
2900 Err(e) => {
2901 tracing::warn!(
2902 card_id = %card_id,
2903 error = %e,
2904 "backfill: read_locator_text failed; treating as skipped"
2905 );
2906 report.skipped.push(card_id);
2907 continue;
2908 }
2909 };
2910
2911 if dry_run {
2912 report.pushed.push(card_id.clone());
2913 if matches!(store.read_samples_text(&card_id), Ok(Some(_))) {
2914 report.pushed_samples.push(card_id);
2915 }
2916 continue;
2917 }
2918
2919 let ev = CardEvent::Created {
2920 pkg: pkg.clone(),
2921 card_id: card_id.clone(),
2922 toml_text,
2923 };
2924 match bus.publish_to(sink, &ev) {
2925 Ok(()) => report.pushed.push(card_id.clone()),
2926 Err(e) => {
2927 report.failed.push((card_id, e));
2928 continue;
2929 }
2930 }
2931
2932 if let Ok(Some(jsonl_text)) = store.read_samples_text(&card_id) {
2933 let ev = CardEvent::SamplesWritten {
2934 card_id: card_id.clone(),
2935 jsonl_text,
2936 };
2937 match bus.publish_to(sink, &ev) {
2938 Ok(()) => report.pushed_samples.push(card_id),
2939 Err(e) => {
2940 report.failed.push((card_id, format!("samples: {e}")));
2941 }
2942 }
2943 }
2944 }
2945
2946 Ok(report)
2947}
2948
2949fn load_subscribers_from_env() -> Vec<Arc<dyn CardSubscriber>> {
2955 let raw = match std::env::var("ALC_CARD_SINKS") {
2956 Ok(v) => v,
2957 Err(std::env::VarError::NotPresent) => return Vec::new(),
2958 Err(std::env::VarError::NotUnicode(_)) => {
2959 tracing::error!("ALC_CARD_SINKS contains non-UTF8 bytes; ignoring entire variable");
2960 return Vec::new();
2961 }
2962 };
2963 parse_subscribers_from_str(&raw)
2964}
2965
2966fn parse_subscribers_from_str(raw: &str) -> Vec<Arc<dyn CardSubscriber>> {
2970 if raw.is_empty() {
2971 return Vec::new();
2972 }
2973 let mut seen: HashSet<String> = HashSet::new();
2974 let mut out: Vec<Arc<dyn CardSubscriber>> = Vec::new();
2975 for spec in raw.split('|') {
2976 let spec = spec.trim();
2977 if spec.is_empty() {
2978 continue;
2979 }
2980 let Some(sub) = parse_subscriber_spec(spec) else {
2981 continue;
2982 };
2983 let uri = sub.describe();
2984 if !seen.insert(uri.clone()) {
2985 tracing::warn!(subscriber = %uri, "duplicate ALC_CARD_SINKS entry; keeping first");
2986 continue;
2987 }
2988 out.push(sub);
2989 }
2990 out
2991}
2992
2993fn parse_subscriber_spec(spec: &str) -> Option<Arc<dyn CardSubscriber>> {
2995 let Some(colon_idx) = spec.find(':') else {
2997 tracing::error!(spec, "ALC_CARD_SINKS entry missing URI scheme");
2998 return None;
2999 };
3000 let scheme = &spec[..colon_idx];
3001 let rest = &spec[colon_idx + 1..];
3002 if scheme != "file" {
3003 tracing::error!(spec, scheme, "ALC_CARD_SINKS entry has unknown scheme");
3004 return None;
3005 }
3006 let Some(after_slash) = rest.strip_prefix("//") else {
3008 tracing::error!(spec, "file URI missing '//'");
3009 return None;
3010 };
3011 let Some(path_start) = after_slash.find('/') else {
3013 tracing::error!(spec, "file URI has no path component");
3014 return None;
3015 };
3016 let authority = &after_slash[..path_start];
3017 let encoded_path = &after_slash[path_start..];
3018 if !authority.is_empty() {
3019 tracing::error!(
3020 spec,
3021 authority,
3022 "file URI with non-empty authority is rejected"
3023 );
3024 return None;
3025 }
3026 let path = decode_file_uri_path(encoded_path)?;
3027 Some(Arc::new(FileCardSubscriber::new(path)))
3028}
3029
3030fn decode_file_uri_path(encoded: &str) -> Option<PathBuf> {
3036 let decoded = percent_decode(encoded)?;
3037 #[cfg(windows)]
3038 {
3039 let trimmed = decoded.strip_prefix('/').unwrap_or(&decoded);
3041 Some(PathBuf::from(trimmed))
3042 }
3043 #[cfg(not(windows))]
3044 {
3045 Some(PathBuf::from(decoded))
3046 }
3047}
3048
3049fn percent_decode(src: &str) -> Option<String> {
3052 let bytes = src.as_bytes();
3053 let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
3054 let mut i = 0;
3055 while i < bytes.len() {
3056 let b = bytes[i];
3057 if b == b'%' {
3058 if i + 2 >= bytes.len() {
3059 tracing::error!(src, "percent-encoded sequence truncated");
3060 return None;
3061 }
3062 let hi = (bytes[i + 1] as char).to_digit(16);
3063 let lo = (bytes[i + 2] as char).to_digit(16);
3064 match (hi, lo) {
3065 (Some(h), Some(l)) => {
3066 out.push(((h << 4) | l) as u8);
3067 i += 3;
3068 }
3069 _ => {
3070 tracing::error!(src, "percent-encoded sequence has non-hex digits");
3071 return None;
3072 }
3073 }
3074 } else {
3075 out.push(b);
3076 i += 1;
3077 }
3078 }
3079 match String::from_utf8(out) {
3080 Ok(s) => Some(s),
3081 Err(_) => {
3082 tracing::error!(src, "percent-decoded bytes are not valid UTF-8");
3083 None
3084 }
3085 }
3086}
3087
3088#[cfg(test)]
3089mod tests {
3090 use super::*;
3091
3092 fn unique_pkg() -> String {
3093 let ns = std::time::SystemTime::now()
3094 .duration_since(std::time::UNIX_EPOCH)
3095 .unwrap()
3096 .as_nanos();
3097 format!("_test_card_{ns}")
3098 }
3099
3100 fn cleanup(pkg: &str) {
3101 if let Ok(store) = default_store() {
3102 if let Ok(d) = store.pkg_dir(pkg) {
3103 let _ = fs::remove_dir_all(&d);
3104 }
3105 }
3106 }
3107
    /// A card created from nothing but `pkg.name` is written to disk, gets an
    /// id prefixed with the package name, and reads back with the schema
    /// version, its id, the package name and `created_at` / `created_by` keys.
    #[test]
    fn minimum_valid_card() {
        let pkg = unique_pkg();
        let input = json!({ "pkg": { "name": pkg } });
        let (id, path) = create(input).unwrap();
        assert!(path.exists());
        assert!(id.starts_with(&pkg));

        // Read the card back and check the fields `create` is expected to fill in.
        let got = get(&id).unwrap().unwrap();
        assert_eq!(got["schema_version"], json!(SCHEMA_VERSION));
        assert_eq!(got["card_id"], json!(id));
        assert_eq!(got["pkg"]["name"], json!(pkg));
        assert!(got.get("created_at").is_some());
        assert!(got.get("created_by").is_some());

        cleanup(&pkg);
    }
3125
    /// `create` on an empty object fails, and the error text mentions `pkg.name`.
    #[test]
    fn create_rejects_missing_pkg_name() {
        let err = create(json!({})).unwrap_err();
        assert!(err.contains("pkg.name"));
    }
3131
    /// Creating a card twice with the same explicit `card_id` fails the second
    /// time with an "already exists" error.
    #[test]
    fn create_is_immutable() {
        let pkg = unique_pkg();
        let input = json!({
            "card_id": "fixed_id_001",
            "pkg": { "name": pkg }
        });
        create(input.clone()).unwrap();
        let err = create(input).unwrap_err();
        assert!(err.contains("already exists"));
        cleanup(&pkg);
    }
3144
    /// A card created with a `params` table reads back with a string-valued
    /// `param_fingerprint` field that the input did not contain.
    #[test]
    fn create_injects_param_fingerprint() {
        let pkg = unique_pkg();
        let input = json!({
            "pkg": { "name": pkg },
            "params": { "depth": 3, "temperature": 0.0 }
        });
        let (id, _) = create(input).unwrap();
        let got = get(&id).unwrap().unwrap();
        assert!(got["param_fingerprint"].is_string());
        cleanup(&pkg);
    }
3157
    /// `list` for a package returns the card with the later `created_at` first.
    #[test]
    fn list_returns_newest_first() {
        let pkg = unique_pkg();
        // Explicit ids and timestamps make the expected order deterministic.
        let (id1, _) = create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
            "created_at": "2025-01-01T00:00:00Z"
        }))
        .unwrap();
        let (id2, _) = create(json!({
            "card_id": format!("{pkg}_b"),
            "pkg": { "name": pkg },
            "created_at": "2026-01-01T00:00:00Z"
        }))
        .unwrap();

        let rows = list(Some(&pkg)).unwrap();
        assert_eq!(rows.len(), 2);
        // The 2026 card comes before the 2025 card.
        assert_eq!(rows[0].card_id, id2);
        assert_eq!(rows[1].card_id, id1);

        cleanup(&pkg);
    }
3182
    /// The rows returned by `list` expose `model.id`, `scenario.name` and
    /// `stats.pass_rate` of the card as `model`, `scenario` and `pass_rate`.
    #[test]
    fn list_extracts_summary_fields() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({
            "pkg": { "name": pkg },
            "model": { "id": "claude-opus-4-6" },
            "scenario": { "name": "gsm8k_sample100" },
            "stats": { "pass_rate": 0.82 }
        }))
        .unwrap();

        let rows = list(Some(&pkg)).unwrap();
        let row = rows.iter().find(|r| r.card_id == id).unwrap();
        assert_eq!(row.model.as_deref(), Some("claude-opus-4-6"));
        assert_eq!(row.scenario.as_deref(), Some("gsm8k_sample100"));
        assert_eq!(row.pass_rate, Some(0.82));

        cleanup(&pkg);
    }
3202
    /// Looking up an id that does not exist is `Ok(None)`, not an error.
    #[test]
    fn get_missing_returns_none() {
        assert!(get("does_not_exist_xyz").unwrap().is_none());
    }
3207
    /// An auto-generated card id is `{pkg}_` followed by three `_`-separated
    /// segments, the middle one being a 15-character timestamp with `T` at
    /// index 8.
    #[test]
    fn card_id_embeds_compact_timestamp() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
        let tail = id.strip_prefix(&format!("{pkg}_")).unwrap();
        // NOTE(review): the first and last segments are not inspected here —
        // presumably a model tag and a hash; confirm against the id generator.
        let parts: Vec<&str> = tail.split('_').collect();
        assert_eq!(parts.len(), 3, "unexpected card_id shape: {id}");
        let ts = parts[1];
        assert_eq!(ts.len(), 15, "timestamp segment wrong length: {ts}");
        assert!(ts.chars().nth(8) == Some('T'), "missing T separator: {ts}");
        cleanup(&pkg);
    }
3224
3225 #[test]
3226 fn now_compact_format() {
3227 let s = now_compact();
3228 assert_eq!(s.len(), 15);
3229 assert_eq!(s.chars().nth(8), Some('T'));
3230 for (i, c) in s.chars().enumerate() {
3232 if i != 8 {
3233 assert!(c.is_ascii_digit(), "non-digit at pos {i}: {s}");
3234 }
3235 }
3236 }
3237
3238 #[test]
3239 fn short_model_variants() {
3240 assert_eq!(short_model("claude-opus-4-6"), "opus46");
3241 assert_eq!(short_model("gpt-4o"), "4o");
3242 assert_eq!(short_model(""), "model");
3243 }
3244
    /// Two cards created back to back that differ only in `stats.pass_rate`
    /// receive different auto-generated ids.
    #[test]
    fn two_cards_same_second_different_stats_get_distinct_ids() {
        let pkg = unique_pkg();
        let input1 = json!({
            "pkg": { "name": pkg },
            "scenario": { "name": "gsm8k" },
            "stats": { "pass_rate": 0.4 }
        });
        let input2 = json!({
            "pkg": { "name": pkg },
            "scenario": { "name": "gsm8k" },
            "stats": { "pass_rate": 0.9 }
        });
        let (id1, _) = create(input1).unwrap();
        let (id2, _) = create(input2).unwrap();
        assert_ne!(id1, id2, "distinct stats must yield distinct card_ids");
        cleanup(&pkg);
    }
3263
    /// `append` adds previously absent top-level sections, returns the merged
    /// card, persists it, and leaves existing fields untouched.
    #[test]
    fn append_adds_new_fields() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.5 }
        }))
        .unwrap();

        let merged = append(
            &id,
            json!({
                "caveats": { "notes": "rescored after fix" },
                "metadata": { "reviewer": "yn" }
            }),
        )
        .unwrap();
        assert_eq!(merged["caveats"]["notes"], json!("rescored after fix"));
        assert_eq!(merged["metadata"]["reviewer"], json!("yn"));

        // The stored card carries the new section and the original stats.
        let got = get(&id).unwrap().unwrap();
        assert_eq!(got["caveats"]["notes"], json!("rescored after fix"));
        assert_eq!(got["stats"]["pass_rate"], json!(0.5));

        cleanup(&pkg);
    }
3294
    /// `append` refuses to overwrite a value that is already set: it errors
    /// with "already set" and the stored value stays as it was.
    #[test]
    fn append_rejects_existing_key() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.5 }
        }))
        .unwrap();

        let err = append(&id, json!({ "stats": { "pass_rate": 0.9 } })).unwrap_err();
        assert!(err.contains("already set"), "got: {err}");
        // The rejected append must not have changed the card on disk.
        let got = get(&id).unwrap().unwrap();
        assert_eq!(got["stats"]["pass_rate"], json!(0.5));

        cleanup(&pkg);
    }
3312
    /// `append` to an id that does not exist fails with a "not found" error.
    #[test]
    fn append_errors_on_missing_card() {
        let err = append("does_not_exist_xyz", json!({ "x": 1 })).unwrap_err();
        assert!(err.contains("not found"));
    }
3318
    /// An alias set with `alias_set` shows up in `alias_list` with its card
    /// id, package, note and a non-empty `set_at`; setting the same name again
    /// repoints it instead of adding a second entry.
    #[test]
    fn alias_set_and_list_roundtrip() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();

        let alias_name = format!("test_alias_{}", &pkg);
        alias_set(&alias_name, &id, Some(&pkg), Some("smoke")).unwrap();

        let rows = alias_list(Some(&pkg)).unwrap();
        let a = rows.iter().find(|a| a.name == alias_name).unwrap();
        assert_eq!(a.card_id, id);
        assert_eq!(a.pkg.as_deref(), Some(pkg.as_str()));
        assert_eq!(a.note.as_deref(), Some("smoke"));
        assert!(!a.set_at.is_empty());

        // Re-setting the same alias name must repoint it to the second card.
        let (id2, _) = create(json!({
            "card_id": format!("{pkg}_b"),
            "pkg": { "name": pkg }
        }))
        .unwrap();
        alias_set(&alias_name, &id2, Some(&pkg), None).unwrap();
        let rows = alias_list(Some(&pkg)).unwrap();
        let matching: Vec<&Alias> = rows.iter().filter(|a| a.name == alias_name).collect();
        assert_eq!(matching.len(), 1, "alias should be unique by name");
        assert_eq!(matching[0].card_id, id2);

        // Remove this test's alias from the store's alias list again;
        // `cleanup` only removes the package directory.
        let store = default_store().unwrap();
        let remaining: Vec<Alias> = store
            .read_aliases()
            .unwrap()
            .into_iter()
            .filter(|a| a.name != alias_name)
            .collect();
        store.write_aliases(&remaining).unwrap();
        cleanup(&pkg);
    }
3359
    /// `alias_set` pointing at a card id that does not exist fails with a
    /// "not found" error.
    #[test]
    fn alias_set_rejects_unknown_card() {
        let err = alias_set("x", "does_not_exist_xyz", None, None).unwrap_err();
        assert!(err.contains("not found"));
    }
3365
3366 fn where_from(v: Json) -> Predicate {
3369 parse_where(&v).expect("parse where")
3370 }
3371
3372 fn order_from(v: Json) -> Vec<OrderKey> {
3373 parse_order_by(&v).expect("parse order_by")
3374 }
3375
    /// `find` supports nested implicit-equality filters, the `gte` operator,
    /// descending ordering by a dotted path, and `limit`.
    #[test]
    fn find_where_nested_eq_and_gte() {
        let pkg = unique_pkg();
        create(json!({
            "card_id": format!("{pkg}_low"),
            "pkg": { "name": pkg },
            "scenario": { "name": "gsm8k" },
            "stats": { "pass_rate": 0.4 }
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_high"),
            "pkg": { "name": pkg },
            "scenario": { "name": "gsm8k" },
            "stats": { "pass_rate": 0.9 }
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_other"),
            "pkg": { "name": pkg },
            "scenario": { "name": "other" },
            "stats": { "pass_rate": 1.0 }
        }))
        .unwrap();

        // Nested equality on scenario.name, best pass rate first.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "scenario": { "name": "gsm8k" },
            }))),
            order_by: order_from(json!("-stats.pass_rate")),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].pass_rate, Some(0.9));
        assert_eq!(rows[1].pass_rate, Some(0.4));

        // `gte` on stats.pass_rate keeps the 0.9 and 1.0 cards.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "stats": { "pass_rate": { "gte": 0.8 } },
            }))),
            order_by: order_from(json!("-stats.pass_rate")),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);
        assert!(rows.iter().all(|r| r.pass_rate.unwrap() >= 0.8));

        // No filter, limit 1: only the top-ranked card comes back.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            order_by: order_from(json!("-stats.pass_rate")),
            limit: Some(1),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].pass_rate, Some(1.0));

        cleanup(&pkg);
    }
3441
    /// `find` where-clauses: implicit equality on a string, `_or`, `_not`,
    /// `in`, and `exists: false` on a field no card has.
    #[test]
    fn find_where_implicit_eq_and_logical() {
        let pkg = unique_pkg();
        create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
            "model": { "id": "claude-opus-4-6" },
            "stats": { "equilibrium_position": "dead", "survival_rate": 0.0 }
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_b"),
            "pkg": { "name": pkg },
            "model": { "id": "claude-opus-4-6" },
            "stats": { "equilibrium_position": "niche_leader", "survival_rate": 1.0 }
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_c"),
            "pkg": { "name": pkg },
            "model": { "id": "claude-haiku-4-5-20251001" },
            "stats": { "equilibrium_position": "fragile", "survival_rate": 0.2 }
        }))
        .unwrap();

        // Implicit equality: only card `_a` is "dead".
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "stats": { "equilibrium_position": "dead" },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows[0].card_id.ends_with("_a"));

        // `_or`: "dead" (card a) or survival_rate >= 0.9 (card b).
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "_or": [
                    { "stats": { "equilibrium_position": "dead" } },
                    { "stats": { "survival_rate": { "gte": 0.9 } } },
                ],
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);

        // `_not`: everything except the haiku card.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "_not": { "model": { "id": "claude-haiku-4-5-20251001" } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);

        // `in`: membership in a list of allowed values.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "stats": {
                    "equilibrium_position": { "in": ["dead", "fragile"] },
                },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);

        // `exists: false` matches cards that lack the field entirely.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "strategy_params": { "temperature": { "exists": false } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 3, "none of the cards have strategy_params");

        cleanup(&pkg);
    }
3531
    /// With two order keys, ties on the first (descending pass rate) are
    /// broken by the second (ascending `card_id`).
    #[test]
    fn find_order_by_multi_key() {
        let pkg = unique_pkg();
        create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.5 }
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_b"),
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.9 }
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_c"),
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.9 }
        }))
        .unwrap();

        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            order_by: order_from(json!(["-stats.pass_rate", "card_id"])),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].pass_rate, Some(0.9));
        assert_eq!(rows[1].pass_rate, Some(0.9));
        assert_eq!(rows[2].pass_rate, Some(0.5));
        // The two 0.9 cards are ordered by ascending card_id.
        assert!(rows[0].card_id < rows[1].card_id);

        cleanup(&pkg);
    }
3569
    /// `offset` and `limit` are applied to the ordered result: out of five
    /// cards ranked by descending pass rate, offset 1 / limit 2 returns the
    /// second and third.
    #[test]
    fn find_offset_and_limit() {
        let pkg = unique_pkg();
        // Pass rates 0.1, 0.2, ..., 0.5.
        for i in 0..5 {
            create(json!({
                "card_id": format!("{pkg}_{i}"),
                "pkg": { "name": pkg },
                "stats": { "pass_rate": 0.1 * (i + 1) as f64 }
            }))
            .unwrap();
        }

        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            order_by: order_from(json!("-stats.pass_rate")),
            offset: Some(1),
            limit: Some(2),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);
        // Compared with a tolerance because the rates are computed floats.
        let pr0 = rows[0].pass_rate.unwrap();
        let pr1 = rows[1].pass_rate.unwrap();
        assert!((pr0 - 0.4).abs() < 1e-9, "got {pr0}");
        assert!((pr1 - 0.3).abs() < 1e-9, "got {pr1}");

        cleanup(&pkg);
    }
3599
3600 #[test]
3601 fn parse_where_rejects_non_object() {
3602 assert!(parse_where(&json!("not an object")).is_err());
3603 assert!(parse_where(&json!(42)).is_err());
3604 }
3605
3606 #[test]
3607 fn parse_order_by_accepts_string_and_array() {
3608 let k = parse_order_by(&json!("-stats.pass_rate")).unwrap();
3609 assert_eq!(k.len(), 1);
3610 assert_eq!(k[0].path, vec!["stats", "pass_rate"]);
3611 assert!(k[0].desc);
3612
3613 let k = parse_order_by(&json!(["created_at", "-stats.n"])).unwrap();
3614 assert_eq!(k.len(), 2);
3615 assert!(!k[0].desc);
3616 assert!(k[1].desc);
3617 }
3618
    /// String operators in `find`: `contains` and `starts_with` match on
    /// string fields, and match nothing when the field is missing or the
    /// operand is not a string.
    #[test]
    fn find_where_string_ops_contains_and_starts_with() {
        let pkg = unique_pkg();
        create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
            "model": { "id": "claude-opus-4-6" },
            "metadata": { "tag": "experiment_alpha" },
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_b"),
            "pkg": { "name": pkg },
            "model": { "id": "claude-haiku-4-5-20251001" },
            "metadata": { "tag": "experiment_beta" },
        }))
        .unwrap();
        create(json!({
            "card_id": format!("{pkg}_c"),
            "pkg": { "name": pkg },
            "model": { "id": "claude-sonnet-4-5" },
            "metadata": { "tag": "baseline" },
        }))
        .unwrap();

        // `contains`: two tags include "experiment".
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "metadata": { "tag": { "contains": "experiment" } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 2);

        // `starts_with`: only card `_a` has an opus model id.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "model": { "id": { "starts_with": "claude-opus" } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows[0].card_id.ends_with("_a"));

        // A string operator on a missing field matches nothing.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "metadata": { "missing_field": { "contains": "x" } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 0);

        // A non-string operand parses but matches nothing.
        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "metadata": { "tag": { "starts_with": 42 } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 0);

        cleanup(&pkg);
    }
3691
    /// A `ne` comparison against a field the card does not have counts as a
    /// match.
    #[test]
    fn where_missing_field_ne_is_true() {
        let pkg = unique_pkg();
        create(json!({
            "card_id": format!("{pkg}_x"),
            "pkg": { "name": pkg },
        }))
        .unwrap();

        let rows = find(FindQuery {
            pkg: Some(pkg.clone()),
            where_: Some(where_from(json!({
                "strategy_params": { "temperature": { "ne": 0.5 } },
            }))),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(rows.len(), 1, "missing field is ne to anything");

        cleanup(&pkg);
    }
3713
3714 fn create_child(pkg: &str, suffix: &str, parent_id: &str, relation: &str) -> String {
3718 let (id, _) = create(json!({
3719 "card_id": format!("{pkg}_{suffix}"),
3720 "pkg": { "name": pkg },
3721 "stats": { "pass_rate": 0.5 },
3722 "metadata": {
3723 "prior_card_id": parent_id,
3724 "prior_relation": relation,
3725 },
3726 }))
3727 .unwrap();
3728 id
3729 }
3730
    /// Walking lineage `Up` from the newest card in an a ← b ← c chain returns
    /// the root first, then each ancestor at depth -1, -2, with one edge per
    /// hop and no truncation.
    #[test]
    fn lineage_up_walks_prior_card_id_chain() {
        let pkg = unique_pkg();
        let (a, _) = create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
        }))
        .unwrap();
        let b = create_child(&pkg, "b", &a, "rerun_of");
        let c = create_child(&pkg, "c", &b, "rerun_of");

        let res = lineage(LineageQuery {
            card_id: c.clone(),
            direction: LineageDirection::Up,
            depth: None,
            include_stats: false,
            relation_filter: None,
        })
        .unwrap()
        .expect("lineage result");

        assert_eq!(res.root, c);
        assert_eq!(res.nodes.len(), 3, "root + 2 ancestors");
        // Ancestors carry negative depths, one step per hop.
        assert_eq!(res.nodes[0].card_id, c);
        assert_eq!(res.nodes[0].depth, 0);
        assert_eq!(res.nodes[1].card_id, b);
        assert_eq!(res.nodes[1].depth, -1);
        assert_eq!(res.nodes[2].card_id, a);
        assert_eq!(res.nodes[2].depth, -2);
        assert_eq!(res.edges.len(), 2);
        assert!(!res.truncated);

        cleanup(&pkg);
    }
3766
    /// Walking lineage `Down` from the root of a small tree (a → b, a → c,
    /// c → d) returns all four nodes and three edges without truncation.
    #[test]
    fn lineage_down_walks_descendants_breadth_first() {
        let pkg = unique_pkg();
        let (a, _) = create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
        }))
        .unwrap();
        let _b = create_child(&pkg, "b", &a, "sweep_variant");
        let c = create_child(&pkg, "c", &a, "sweep_variant");
        let _d = create_child(&pkg, "d", &c, "rerun_of");

        let res = lineage(LineageQuery {
            card_id: a.clone(),
            direction: LineageDirection::Down,
            depth: None,
            include_stats: false,
            relation_filter: None,
        })
        .unwrap()
        .expect("lineage result");

        // Only the counts are checked here, not the visiting order.
        assert_eq!(res.nodes.len(), 4);
        assert_eq!(res.edges.len(), 3);
        assert!(!res.truncated);

        cleanup(&pkg);
    }
3797
    /// With `depth: Some(1)` on a three-card chain, only the root and its
    /// direct child are returned and `truncated` is set.
    #[test]
    fn lineage_depth_truncation_sets_flag() {
        let pkg = unique_pkg();
        let (a, _) = create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
        }))
        .unwrap();
        let b = create_child(&pkg, "b", &a, "rerun_of");
        let _c = create_child(&pkg, "c", &b, "rerun_of");

        let res = lineage(LineageQuery {
            card_id: a,
            direction: LineageDirection::Down,
            depth: Some(1),
            include_stats: false,
            relation_filter: None,
        })
        .unwrap()
        .unwrap();
        assert_eq!(res.nodes.len(), 2, "root + 1 level");
        assert!(res.truncated, "should be truncated at depth=1");

        cleanup(&pkg);
    }
3823
    /// With a `relation_filter`, only children linked by a listed relation are
    /// followed: the `rerun_of` child is left out.
    #[test]
    fn lineage_relation_filter_skips_unlisted() {
        let pkg = unique_pkg();
        let (a, _) = create(json!({
            "card_id": format!("{pkg}_a"),
            "pkg": { "name": pkg },
        }))
        .unwrap();
        let _b = create_child(&pkg, "b", &a, "sweep_variant");
        let _c = create_child(&pkg, "c", &a, "rerun_of");

        let res = lineage(LineageQuery {
            card_id: a,
            direction: LineageDirection::Down,
            depth: None,
            include_stats: false,
            relation_filter: Some(vec!["sweep_variant".to_string()]),
        })
        .unwrap()
        .unwrap();
        assert_eq!(res.nodes.len(), 2, "root + only sweep_variant child");
        assert_eq!(res.edges[0].relation.as_deref(), Some("sweep_variant"));

        cleanup(&pkg);
    }
3849
    /// Asking for the lineage of an id that does not exist is `Ok(None)`.
    #[test]
    fn lineage_missing_card_returns_none() {
        let res = lineage(LineageQuery {
            card_id: "nonexistent_card_id_xyz".into(),
            direction: LineageDirection::Up,
            depth: None,
            include_stats: false,
            relation_filter: None,
        })
        .unwrap();
        assert!(res.is_none());
    }
3862
    /// Samples written with `write_samples` land in a `.samples.jsonl` file
    /// and read back in order; `offset` / `limit` select a slice.
    #[test]
    fn write_and_read_samples_roundtrip() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.5 }
        }))
        .unwrap();

        let samples = vec![
            json!({ "case": "c0", "passed": true, "score": 1.0 }),
            json!({ "case": "c1", "passed": false, "score": 0.0 }),
            json!({ "case": "c2", "passed": true, "score": 0.75 }),
        ];
        let path = write_samples(&id, samples.clone()).unwrap();
        assert!(path.exists());
        assert!(path.to_string_lossy().ends_with(".samples.jsonl"));

        // Default query: every row, in write order.
        let got = read_samples(&id, SamplesQuery::default()).unwrap();
        assert_eq!(got.len(), 3);
        assert_eq!(got[0]["case"], json!("c0"));
        assert_eq!(got[2]["score"], json!(0.75));

        // offset 1 / limit 1 selects exactly the second row.
        let slice = read_samples(
            &id,
            SamplesQuery {
                offset: 1,
                limit: Some(1),
                where_: None,
            },
        )
        .unwrap();
        assert_eq!(slice.len(), 1);
        assert_eq!(slice[0]["case"], json!("c1"));

        cleanup(&pkg);
    }
3903
    /// A second `write_samples` for the same card fails with an
    /// "already exist" error.
    #[test]
    fn write_samples_is_write_once() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
        write_samples(&id, vec![json!({ "x": 1 })]).unwrap();
        let err = write_samples(&id, vec![json!({ "x": 2 })]).unwrap_err();
        assert!(err.contains("already exist"), "got: {err}");
        cleanup(&pkg);
    }
3913
    /// Reading samples for a card that has none yields an empty list, not an
    /// error.
    #[test]
    fn read_samples_empty_when_absent() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
        let got = read_samples(&id, SamplesQuery::default()).unwrap();
        assert!(got.is_empty());
        cleanup(&pkg);
    }
3922
    /// `read_samples` applies a where-predicate to the rows, keeps the write
    /// order, and applies `offset` / `limit` to the filtered rows.
    #[test]
    fn read_samples_where_filters_rows() {
        let pkg = unique_pkg();
        let (id, _) = create(json!({ "pkg": { "name": pkg } })).unwrap();
        write_samples(
            &id,
            vec![
                json!({ "case": "c0", "passed": true, "score": 1.0 }),
                json!({ "case": "c1", "passed": false, "score": 0.0 }),
                json!({ "case": "c2", "passed": true, "score": 0.25 }),
                json!({ "case": "c3", "passed": true, "score": 0.75 }),
                json!({ "case": "c4", "passed": false, "score": 0.5 }),
            ],
        )
        .unwrap();

        // Implicit equality on a boolean: the three passed rows.
        let pred = parse_where(&json!({ "passed": true })).unwrap();
        let got = read_samples(
            &id,
            SamplesQuery {
                offset: 0,
                limit: None,
                where_: Some(pred),
            },
        )
        .unwrap();
        assert_eq!(got.len(), 3);
        assert_eq!(got[0]["case"], json!("c0"));
        assert_eq!(got[1]["case"], json!("c2"));
        assert_eq!(got[2]["case"], json!("c3"));

        // `gte` on score: rows scoring at least 0.5.
        let pred = parse_where(&json!({ "score": { "gte": 0.5 } })).unwrap();
        let got = read_samples(
            &id,
            SamplesQuery {
                offset: 0,
                limit: None,
                where_: Some(pred),
            },
        )
        .unwrap();
        assert_eq!(got.len(), 3);
        assert_eq!(got[0]["case"], json!("c0"));
        assert_eq!(got[1]["case"], json!("c3"));
        assert_eq!(got[2]["case"], json!("c4"));

        // offset/limit count filtered rows: the second passed row is c2.
        let pred = parse_where(&json!({ "passed": true })).unwrap();
        let slice = read_samples(
            &id,
            SamplesQuery {
                offset: 1,
                limit: Some(1),
                where_: Some(pred),
            },
        )
        .unwrap();
        assert_eq!(slice.len(), 1);
        assert_eq!(slice[0]["case"], json!("c2"));

        cleanup(&pkg);
    }
3987
3988 #[test]
3989 fn get_by_alias_roundtrip() {
3990 let pkg = unique_pkg();
3991 let (id, _) = create(json!({
3992 "pkg": { "name": pkg },
3993 "stats": { "pass_rate": 0.85 }
3994 }))
3995 .unwrap();
3996
3997 let alias_name = format!("best_{pkg}");
3998 alias_set(&alias_name, &id, Some(&pkg), None).unwrap();
3999
4000 let card = get_by_alias(&alias_name).unwrap().unwrap();
4001 assert_eq!(card["card_id"], json!(id));
4002 assert_eq!(card["stats"]["pass_rate"], json!(0.85));
4003
4004 assert!(get_by_alias("nonexistent_alias_xyz").unwrap().is_none());
4005
4006 cleanup(&pkg);
4007 }
4008
    /// `write_samples` for a card id that does not exist fails with a
    /// "not found" error.
    #[test]
    fn samples_errors_on_missing_card() {
        let err = write_samples("does_not_exist_xyz_samples", vec![json!({})]).unwrap_err();
        assert!(err.contains("not found"));
    }
4014
    /// `import_from_dir` brings in a card TOML together with its sibling
    /// `.samples.jsonl` file: the card is reported as imported and both the
    /// card and its samples are readable afterwards.
    #[test]
    fn import_from_dir_copies_cards() {
        let pkg = unique_pkg();
        let tmp = tempfile::tempdir().unwrap();

        // A minimal card file in the source directory.
        let card_id = format!("{pkg}_imported");
        let card_content = format!(
            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{card_id}\"\npkg = \"{pkg}\"\n"
        );
        fs::write(tmp.path().join(format!("{card_id}.toml")), &card_content).unwrap();

        // A one-row samples file next to it.
        fs::write(
            tmp.path().join(format!("{card_id}.samples.jsonl")),
            "{\"case\":\"c0\"}\n",
        )
        .unwrap();

        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
        assert_eq!(imported, vec![card_id.clone()]);
        assert!(skipped.is_empty());

        let got = get(&card_id).unwrap().unwrap();
        assert_eq!(got["card_id"], json!(card_id));

        let samples = read_samples(&card_id, SamplesQuery::default()).unwrap();
        assert_eq!(samples.len(), 1);

        cleanup(&pkg);
    }
4050
    /// Importing a card whose id already exists in the store reports it as
    /// skipped and leaves the stored card unchanged.
    #[test]
    fn import_from_dir_skips_existing() {
        let pkg = unique_pkg();
        let (existing_id, _) = create(json!({
            "pkg": { "name": pkg },
            "stats": { "pass_rate": 0.5 }
        }))
        .unwrap();

        // A source file that reuses the existing id but carries no stats.
        let tmp = tempfile::tempdir().unwrap();
        let card_content = format!(
            "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"{existing_id}\"\npkg = \"{pkg}\"\n"
        );
        fs::write(
            tmp.path().join(format!("{existing_id}.toml")),
            &card_content,
        )
        .unwrap();

        let (imported, skipped) = import_from_dir(tmp.path(), &pkg).unwrap();
        assert!(imported.is_empty());
        assert_eq!(skipped, vec![existing_id.clone()]);

        // The original card (with its stats) must still be the one stored.
        let got = get(&existing_id).unwrap().unwrap();
        assert_eq!(got["stats"]["pass_rate"], json!(0.5));

        cleanup(&pkg);
    }
4082
/// A TOML file that is not a card must be ignored by the import: it shows up
/// in neither the imported nor the skipped list.
#[test]
fn import_from_dir_skips_non_card_toml() {
    let pkg = unique_pkg();
    let source = tempfile::tempdir().unwrap();

    let stray = source.path().join("not_a_card.toml");
    fs::write(stray, "title = \"hello\"\n").unwrap();

    let (imported, skipped) = import_from_dir(source.path(), &pkg).unwrap();
    assert!(imported.is_empty());
    assert!(skipped.is_empty());

    cleanup(&pkg);
}
4097
/// Exercises create / get / list / alias / samples against a `FileCardStore`
/// rooted in a temp directory and checks that every artefact lands under that
/// root, while the default `list` does not see the card.
#[test]
fn custom_root_file_store_roundtrip() {
    let root = tempfile::tempdir().unwrap();
    let store = FileCardStore::new(root.path().to_path_buf());
    let pkg = "custom_root_pkg";

    // Create: the card file must live under the custom root.
    let spec = json!({
        "pkg": { "name": pkg },
        "model": { "id": "gpt-test" },
    });
    let (id, card_path) = create_with_store(&store, spec).unwrap();
    assert!(card_path.starts_with(root.path()));
    assert!(card_path.ends_with(format!("{id}.toml")));

    // Get: the stored card carries the id returned by create.
    let fetched = get_with_store(&store, &id).unwrap().expect("card exists");
    assert_eq!(
        fetched.get("card_id").and_then(|field| field.as_str()),
        Some(id.as_str())
    );

    // List: exactly one row for this package in the custom store.
    let listed = list_with_store(&store, Some(pkg)).unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].card_id, id);

    // The store-less `list` must not contain the card.
    let default_rows = list(Some(pkg)).unwrap();
    assert!(default_rows.iter().all(|row| row.card_id != id));

    // Alias: set and resolve through the same store.
    alias_set_with_store(&store, "alpha", &id, Some(pkg), None).unwrap();
    let aliased = get_by_alias_with_store(&store, "alpha")
        .unwrap()
        .expect("alias resolves");
    assert_eq!(
        aliased.get("card_id").and_then(|field| field.as_str()),
        Some(id.as_str())
    );

    // Samples: written under the custom root and readable back.
    let rows = vec![json!({ "case": "a", "pass": true })];
    let samples_path = write_samples_with_store(&store, &id, rows).unwrap();
    assert!(samples_path.starts_with(root.path()));
    let read_back = read_samples_with_store(&store, &id, SamplesQuery::default()).unwrap();
    assert_eq!(read_back.len(), 1);
    assert_eq!(
        read_back[0].get("case").and_then(|field| field.as_str()),
        Some("a")
    );
}
4154
4155 use std::sync::atomic::AtomicUsize;
4160 use std::sync::Barrier;
4161
/// Returns the process-wide mutex that tests take before mutating
/// environment variables. The mutex is created on first use.
fn env_lock() -> &'static Mutex<()> {
    static ENV_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();
    ENV_MUTEX.get_or_init(Mutex::default)
}
4168
4169 struct BusTestOwnerGuard;
4175 impl Drop for BusTestOwnerGuard {
4176 fn drop(&mut self) {
4177 INSIDE_BUS_TEST.with(|flag| flag.set(false));
4178 }
4179 }
4180
4181 fn with_bus_subscribers<F>(subs: Vec<Arc<dyn CardSubscriber>>, f: F)
4196 where
4197 F: FnOnce(&'static CardEventBus),
4198 {
4199 let _guard = bus_test_gate().lock().unwrap_or_else(|p| p.into_inner());
4202 INSIDE_BUS_TEST.with(|flag| flag.set(true));
4206 let _owner = BusTestOwnerGuard;
4207 let bus = event_bus();
4208 bus.reset_stats_for_test();
4209 bus.replace_subscribers_for_test(subs);
4210 f(bus);
4211 bus.replace_subscribers_for_test(Vec::new());
4213 bus.reset_stats_for_test();
4214 }
4217
4218 struct MockSubscriber {
4220 uri: String,
4221 events: Mutex<Vec<CardEvent>>,
4222 calls: AtomicUsize,
4223 }
4224
4225 impl MockSubscriber {
4226 fn new(uri: &str) -> Arc<Self> {
4227 Arc::new(Self {
4228 uri: uri.to_string(),
4229 events: Mutex::new(Vec::new()),
4230 calls: AtomicUsize::new(0),
4231 })
4232 }
4233 fn call_count(&self) -> usize {
4234 self.calls.load(Ordering::SeqCst)
4235 }
4236 }
4237
4238 impl CardSubscriber for MockSubscriber {
4239 fn on_event(&self, ev: &CardEvent) -> Result<(), String> {
4240 self.calls.fetch_add(1, Ordering::SeqCst);
4241 self.events
4242 .lock()
4243 .unwrap_or_else(|p| p.into_inner())
4244 .push(ev.clone());
4245 Ok(())
4246 }
4247 fn describe(&self) -> String {
4248 self.uri.clone()
4249 }
4250 }
4251
/// Two consecutive `event_bus()` calls must hand back the same instance.
#[test]
fn bus_is_process_singleton() {
    let first: *const CardEventBus = event_bus();
    let second: *const CardEventBus = event_bus();
    assert_eq!(
        first, second,
        "event_bus() must return the same singleton pointer"
    );
}
4260
/// Publishing while the bus has no subscribers must simply return.
#[test]
fn publish_with_no_subscribers_is_noop() {
    let event = CardEvent::Created {
        pkg: "pkg".into(),
        card_id: "id".into(),
        toml_text: "x = 1\n".into(),
    };
    with_bus_subscribers(Vec::new(), |_bus| {
        publish(event);
    });
}
4272
/// Calling `init_event_bus` repeatedly must not panic.
#[test]
fn init_event_bus_is_idempotent() {
    for _ in 0..3 {
        init_event_bus();
    }
}
4280
/// A card created in the primary store must appear, byte-for-byte, under
/// each of two file subscribers.
#[test]
fn fanout_mirrors_create() {
    let primary = tempfile::tempdir().unwrap();
    let sub_a = tempfile::tempdir().unwrap();
    let sub_b = tempfile::tempdir().unwrap();
    let fa = Arc::new(FileCardSubscriber::new(sub_a.path().to_path_buf()));
    let fb = Arc::new(FileCardSubscriber::new(sub_b.path().to_path_buf()));

    with_bus_subscribers(vec![fa.clone(), fb.clone()], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "fanout_create_pkg" } });
        let (id, primary_path) = create_with_store(&store, spec).unwrap();
        assert!(primary_path.exists());
        let primary_text = fs::read_to_string(&primary_path).unwrap();

        // Expected mirror location: <subscriber root>/<pkg>/<id>.toml
        let file_name = format!("{id}.toml");
        let mirror_a = sub_a.path().join("fanout_create_pkg").join(&file_name);
        let mirror_b = sub_b.path().join("fanout_create_pkg").join(&file_name);

        assert!(mirror_a.exists(), "subscriber A missing card");
        assert!(mirror_b.exists(), "subscriber B missing card");
        assert_eq!(fs::read_to_string(&mirror_a).unwrap(), primary_text);
        assert_eq!(fs::read_to_string(&mirror_b).unwrap(), primary_text);
    });
}
4311
/// A key appended to a card in the primary store must show up in the
/// subscriber's copy of that card.
#[test]
fn fanout_mirrors_append() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));

    with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "fanout_append_pkg" } });
        let (id, _) = create_with_store(&store, spec).unwrap();

        append_with_store(&store, &id, json!({ "extra_key": 42 })).unwrap();

        let mirrored = sub
            .path()
            .join("fanout_append_pkg")
            .join(format!("{id}.toml"));
        let mirrored_text = fs::read_to_string(&mirrored).unwrap();
        assert!(
            mirrored_text.contains("extra_key"),
            "append must mirror new key"
        );
    });
}
4332
/// Samples written for a card in the primary store must be mirrored to the
/// subscriber as `<pkg>/<id>.samples.jsonl`.
#[test]
fn fanout_mirrors_samples() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));

    with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "fanout_samples_pkg" } });
        let (id, _) = create_with_store(&store, spec).unwrap();

        let rows = vec![json!({ "case": "c0" })];
        write_samples_with_store(&store, &id, rows).unwrap();

        let mirrored = sub
            .path()
            .join("fanout_samples_pkg")
            .join(format!("{id}.samples.jsonl"));
        let mirrored_text = fs::read_to_string(&mirrored).unwrap();
        assert!(mirrored_text.contains("\"case\":\"c0\""));
    });
}
4352
/// Setting an alias in the primary store must produce an `_aliases.toml`
/// under the subscriber root that mentions the alias name.
#[test]
fn fanout_mirrors_aliases() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));

    with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "fanout_alias_pkg" } });
        let (id, _) = create_with_store(&store, spec).unwrap();

        alias_set_with_store(&store, "myalias", &id, Some("fanout_alias_pkg"), None).unwrap();

        let mirrored = sub.path().join("_aliases.toml");
        assert!(mirrored.exists(), "subscriber must receive aliases file");
        let mirrored_text = fs::read_to_string(&mirrored).unwrap();
        assert!(mirrored_text.contains("myalias"));
    });
}
4370
/// A card imported from a source directory into the primary store must also
/// be mirrored to the subscriber.
#[test]
fn fanout_mirrors_import_from_dir_cards() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let src = tempfile::tempdir().unwrap();

    // Source card to import.
    let card_text = format!(
        "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_x\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_pkg\"\n"
    );
    let card_path = src.path().join("card_x.toml");
    fs::write(&card_path, &card_text).unwrap();

    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
    with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let (imported, _skipped) =
            import_from_dir_with_store(&store, src.path(), "fanout_import_pkg").unwrap();
        assert_eq!(imported, vec!["card_x".to_string()]);

        let mirrored = sub.path().join("fanout_import_pkg").join("card_x.toml");
        assert!(mirrored.exists(), "imported card must be mirrored");
    });
}
4395
/// Importing a card together with its samples sidecar must mirror the
/// samples file to the subscriber as well.
#[test]
fn fanout_mirrors_import_from_dir_samples() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let src = tempfile::tempdir().unwrap();

    // Source card plus a one-row samples sidecar.
    let card_text = format!(
        "schema_version = \"{SCHEMA_VERSION}\"\ncard_id = \"card_y\"\ncreated_at = \"2026-01-01T00:00:00Z\"\n[pkg]\nname = \"fanout_import_samples_pkg\"\n"
    );
    fs::write(src.path().join("card_y.toml"), &card_text).unwrap();
    let sidecar = src.path().join("card_y.samples.jsonl");
    fs::write(sidecar, "{\"case\":\"c0\"}\n").unwrap();

    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));
    with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let (imported, _) =
            import_from_dir_with_store(&store, src.path(), "fanout_import_samples_pkg").unwrap();
        assert_eq!(imported, vec!["card_y".to_string()]);

        let mirrored = sub
            .path()
            .join("fanout_import_samples_pkg")
            .join("card_y.samples.jsonl");
        assert!(mirrored.exists(), "imported samples must be mirrored");
        let mirrored_text = fs::read_to_string(&mirrored).unwrap();
        assert!(mirrored_text.contains("c0"));
    });
}
4429
/// Calling `create_with_store` directly must deliver exactly one event to a
/// registered subscriber.
#[test]
fn with_store_direct_call_still_publishes() {
    let primary = tempfile::tempdir().unwrap();
    let mock = MockSubscriber::new("mock://direct");
    with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "direct_call_pkg" } });
        create_with_store(&store, spec).unwrap();
        assert_eq!(mock.call_count(), 1, "direct _with_store call must publish");
    });
}
4440
/// If the subscriber never received the card, a later append must still
/// succeed on the primary while the subscriber's stats row records an error.
#[test]
fn subscriber_appended_missing_card_warns() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));

    with_bus_subscribers(vec![fs_sub.clone()], |bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());

        // Create with no subscribers attached so the mirror misses the card.
        bus.replace_subscribers_for_test(Vec::new());
        let spec = json!({ "pkg": { "name": "missing_append_pkg" } });
        let (id, _) = create_with_store(&store, spec).unwrap();
        bus.replace_subscribers_for_test(vec![fs_sub.clone()]);

        // The primary append itself must not fail.
        append_with_store(&store, &id, json!({ "k": 1 })).unwrap();

        let sink = fs_sub.describe();
        let snapshot = bus.stats().snapshot();
        let row = snapshot
            .iter()
            .find(|candidate| candidate.sink == sink)
            .expect("subscriber row exists");
        let err_total = row.err.values().sum::<u64>();
        assert!(err_total >= 1, "subscriber append err must be recorded");
        assert!(row.last_error.is_some());
    });
}
4471
/// A subscriber that always fails must not prevent the primary write; the
/// failure is only reflected in that subscriber's error counters.
#[test]
fn subscriber_failure_preserves_primary() {
    /// Subscriber whose `on_event` always returns an error.
    struct FailingSubscriber;

    impl CardSubscriber for FailingSubscriber {
        fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
            Err("synthetic failure".into())
        }

        fn describe(&self) -> String {
            "mock://failing".into()
        }
    }

    let primary = tempfile::tempdir().unwrap();
    let subs = vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>];
    with_bus_subscribers(subs, |bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "preserve_primary_pkg" } });
        let (_id, card_path) = create_with_store(&store, spec).unwrap();
        assert!(card_path.exists());

        let snapshot = bus.stats().snapshot();
        let row = snapshot
            .iter()
            .find(|candidate| candidate.sink == "mock://failing")
            .expect("row exists");
        let err_total = row.err.values().sum::<u64>();
        assert!(err_total >= 1);
    });
}
4503
/// Three successful creates must yield `ok.created == 3`, no errors, and a
/// stats row that carries every event-kind key in both `ok` and `err`.
#[test]
fn stats_counts_ok() {
    let primary = tempfile::tempdir().unwrap();
    let mock = MockSubscriber::new("mock://stats-ok");
    with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        for n in 0..3 {
            let spec = json!({
                "card_id": format!("stats_ok_{n}"),
                "pkg": { "name": "stats_ok_pkg" },
            });
            create_with_store(&store, spec).unwrap();
        }

        let snapshot = bus.stats().snapshot();
        let row = snapshot
            .iter()
            .find(|candidate| candidate.sink == "mock://stats-ok")
            .expect("row");
        assert_eq!(row.ok.get("created").copied().unwrap_or(0), 3);
        assert_eq!(row.err.get("created").copied().unwrap_or(0), 0);

        // Every kind must be present even when its counter is zero.
        for k in ["created", "appended", "samples", "aliases"] {
            assert!(row.ok.contains_key(k), "ok.{k} must be present");
            assert!(row.err.contains_key(k), "err.{k} must be present");
        }
        assert!(row.last_error.is_none());
    });
}
4537
/// A failing subscriber must bump `err.created` and populate `last_error`
/// with a message, the event kind and a timestamp.
#[test]
fn stats_counts_err_with_last_error() {
    /// Subscriber whose `on_event` always returns an error.
    struct FailingSubscriber;

    impl CardSubscriber for FailingSubscriber {
        fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
            Err("synthetic create failure".into())
        }

        fn describe(&self) -> String {
            "mock://stats-err".into()
        }
    }

    let primary = tempfile::tempdir().unwrap();
    let subs = vec![Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>];
    with_bus_subscribers(subs, |bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "stats_err_pkg" } });
        create_with_store(&store, spec).unwrap();

        let snapshot = bus.stats().snapshot();
        let row = snapshot
            .iter()
            .find(|candidate| candidate.sink == "mock://stats-err")
            .expect("row");
        assert_eq!(row.err.get("created").copied().unwrap_or(0), 1);

        let last = row.last_error.as_ref().expect("last_error set");
        assert!(!last.msg.is_empty(), "last_error.msg must be non-empty");
        assert_eq!(last.kind, CardEventKind::Created);
        assert!(last.ts_ms > 0, "last_error.ts_ms must be populated");
    });
}
4568
/// With one failing and one succeeding subscriber registered together, each
/// must get its own stats row and the counters must not bleed across rows.
#[test]
fn stats_per_subscriber_isolated() {
    /// Subscriber whose `on_event` always returns an error.
    struct FailingSubscriber;

    impl CardSubscriber for FailingSubscriber {
        fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
            Err("sub1 fails".into())
        }

        fn describe(&self) -> String {
            "mock://sub1-fail".into()
        }
    }

    let primary = tempfile::tempdir().unwrap();
    let mock_ok = MockSubscriber::new("mock://sub2-ok");
    let subs: Vec<Arc<dyn CardSubscriber>> = vec![
        Arc::new(FailingSubscriber) as Arc<dyn CardSubscriber>,
        mock_ok.clone() as Arc<dyn CardSubscriber>,
    ];

    with_bus_subscribers(subs, |bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "isolated_pkg" } });
        create_with_store(&store, spec).unwrap();

        let snapshot = bus.stats().snapshot();
        let failing = snapshot
            .iter()
            .find(|candidate| candidate.sink == "mock://sub1-fail")
            .expect("sub1 row");
        let healthy = snapshot
            .iter()
            .find(|candidate| candidate.sink == "mock://sub2-ok")
            .expect("sub2 row");

        assert_eq!(failing.err.get("created").copied().unwrap_or(0), 1);
        assert_eq!(failing.ok.get("created").copied().unwrap_or(0), 0);
        assert_eq!(healthy.ok.get("created").copied().unwrap_or(0), 1);
        assert_eq!(healthy.err.get("created").copied().unwrap_or(0), 0);
        assert!(failing.last_error.is_some());
        assert!(healthy.last_error.is_none());
    });
}
4606
/// Counters read through `subscriber_stats_snapshot` must accumulate over
/// several creates instead of being reset per call.
#[test]
fn subscriber_stats_survive_multiple_calls() {
    let primary = tempfile::tempdir().unwrap();
    let mock = MockSubscriber::new("mock://stats-survive");
    with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        for n in 0..3 {
            let spec = json!({
                "card_id": format!("survive_{n}"),
                "pkg": { "name": "survive_pkg" },
            });
            create_with_store(&store, spec).unwrap();
        }

        let snapshot = subscriber_stats_snapshot();
        let row = snapshot
            .iter()
            .find(|candidate| candidate.sink == "mock://stats-survive")
            .expect("row");
        let created_ok = row.ok.get("created").copied().unwrap_or(0);
        assert_eq!(created_ok, 3, "counters must accumulate across calls");
    });
}
4641
/// The stats snapshot must serialize to a JSON array whose row for the mock
/// exposes `sink`, `last_error`, and every event kind under `ok` and `err`.
#[test]
fn stats_snapshot_serializes_with_all_kind_keys() {
    let primary = tempfile::tempdir().unwrap();
    let mock = MockSubscriber::new("mock://json-shape");
    with_bus_subscribers(vec![mock.clone() as Arc<dyn CardSubscriber>], |_bus| {
        let store = FileCardStore::new(primary.path().to_path_buf());
        let spec = json!({ "pkg": { "name": "json_shape_pkg" } });
        create_with_store(&store, spec).unwrap();

        let snapshot = subscriber_stats_snapshot();
        let encoded = serde_json::to_value(&snapshot).expect("serializable");
        let rows = encoded.as_array().expect("array");
        let row = rows
            .iter()
            .find(|r| r.get("sink").and_then(|s| s.as_str()) == Some("mock://json-shape"))
            .expect("row present in JSON");

        assert_eq!(row.get("sink").unwrap(), "mock://json-shape");
        for k in ["created", "appended", "samples", "aliases"] {
            let ok_pointer = format!("/ok/{k}");
            let err_pointer = format!("/err/{k}");
            assert!(row.pointer(&ok_pointer).is_some(), "ok.{k} missing");
            assert!(row.pointer(&err_pointer).is_some(), "err.{k} missing");
        }
        // The key must exist even when its value is null.
        assert!(row.get("last_error").is_some());
    });
}
4668
/// Checks the `<name>.tmp.<pid>.<seq>` suffix shape on a locally built sample
/// string, then confirms `atomic_write` produces the destination file with
/// the expected contents.
#[test]
fn multi_process_tmp_unique_suffix() {
    // Shape check on a sample name built from this process's pid.
    let sample = format!("whatever.tmp.{}.0", process::id());
    let suffix = sample.trim_start_matches("whatever.tmp.");
    let (pid_part, seq_part) = suffix.split_once('.').expect("dotted form");
    assert!(pid_part.chars().all(|ch| ch.is_ascii_digit()));
    assert!(seq_part.chars().all(|ch| ch.is_ascii_digit()));

    // End-to-end write into a fresh temp directory.
    let scratch = tempfile::tempdir().unwrap();
    let dest = scratch.path().join("out.txt");
    atomic_write(&dest, b"hello").unwrap();
    let written = fs::read_to_string(&dest).unwrap();
    assert_eq!(written, "hello");
}
4692
/// On Unix, the URI a file subscriber reports via `describe` must parse back
/// into a subscriber that reports the same URI.
#[cfg(unix)]
#[test]
fn describe_roundtrips_env_spec() {
    let root = tempfile::tempdir().unwrap();
    let original = FileCardSubscriber::new(root.path().to_path_buf());
    let uri = original.describe();
    assert!(uri.starts_with("file:///"), "unix uri must be file:///...");

    let reparsed = parse_subscriber_spec(&uri).expect("round-trip parse");
    assert_eq!(reparsed.describe(), uri);
}
4706
/// On Windows, the URI a file subscriber reports via `describe` must parse
/// back into a subscriber that reports the same URI.
#[cfg(windows)]
#[test]
fn describe_roundtrips_env_spec_windows() {
    let root = tempfile::tempdir().unwrap();
    let original = FileCardSubscriber::new(root.path().to_path_buf());
    let uri = original.describe();
    assert!(
        uri.starts_with("file:///"),
        "windows uri must be file:///..."
    );

    let reparsed = parse_subscriber_spec(&uri).expect("round-trip parse");
    assert_eq!(reparsed.describe(), uri);
}
4720
/// An empty `ALC_CARD_SINKS` value must yield no subscribers.
///
/// The variable's previous state is captured with `var_os` (so a non-UTF-8
/// value is preserved) and restored by a drop guard. A failing assertion or a
/// panic inside the loader therefore cannot leak the empty value into other
/// tests running in the same process.
#[test]
fn env_empty_means_no_subscribers() {
    /// Restores `ALC_CARD_SINKS` to the captured state when dropped.
    struct RestoreSinksEnv(Option<std::ffi::OsString>);

    impl Drop for RestoreSinksEnv {
        fn drop(&mut self) {
            // SAFETY: this guard is declared after the `env_lock()` guard and
            // is therefore dropped first, so the environment is mutated while
            // the lock is still held.
            // NOTE(review): the lock only covers code that takes `env_lock()`;
            // other threads reading the environment are not serialized by it.
            unsafe {
                match self.0.take() {
                    Some(previous) => std::env::set_var("ALC_CARD_SINKS", previous),
                    None => std::env::remove_var("ALC_CARD_SINKS"),
                }
            }
        }
    }

    // A poisoned lock is recovered: a previous test's panic must not cascade.
    let _g = env_lock().lock().unwrap_or_else(|p| p.into_inner());
    let _restore = RestoreSinksEnv(std::env::var_os("ALC_CARD_SINKS"));

    // SAFETY: mutation happens while `env_lock()` is held (see the note on
    // the drop guard about what that lock does and does not cover).
    unsafe {
        std::env::set_var("ALC_CARD_SINKS", "");
    }

    let subs = load_subscribers_from_env();
    assert!(subs.is_empty());
}
4741
/// A plain filesystem path without a URI scheme is not a valid spec.
#[test]
fn env_parse_rejects_bare_path() {
    let parsed = parse_subscriber_spec("/foo/bar");
    assert!(parsed.is_none());
}
4746
/// Specs using a scheme other than `file` must be rejected.
#[test]
fn env_parse_rejects_unknown_scheme() {
    let sqlite = parse_subscriber_spec("sqlite:///foo");
    assert!(sqlite.is_none());

    let s3 = parse_subscriber_spec("s3://bucket/foo");
    assert!(s3.is_none());

    let http = parse_subscriber_spec("http://example.com/x");
    assert!(http.is_none());
}
4753
/// A `file://` URI that names a host must be rejected.
#[test]
fn env_parse_rejects_non_empty_authority() {
    let parsed = parse_subscriber_spec("file://host/path");
    assert!(parsed.is_none());
}
4758
/// `file:` specs that lack the `//` after the scheme must be rejected.
#[test]
fn env_parse_rejects_missing_double_slash() {
    let single_slash = parse_subscriber_spec("file:/foo");
    assert!(single_slash.is_none());

    let no_slash = parse_subscriber_spec("file:foo");
    assert!(no_slash.is_none());
}
4764
/// On Unix, a well-formed `file:///` URI is accepted and described verbatim.
#[cfg(unix)]
#[test]
fn env_parse_accepts_file_uri() {
    let parsed = parse_subscriber_spec("file:///tmp/algocline-sinks-unit").expect("accepted");
    let described = parsed.describe();
    assert_eq!(described, "file:///tmp/algocline-sinks-unit");
}
4771
/// On Windows, a drive-letter `file:///` URI is accepted and its description
/// keeps the `file:///` prefix.
#[cfg(windows)]
#[test]
fn env_parse_accepts_file_uri_windows() {
    let parsed = parse_subscriber_spec("file:///C:/algocline-sinks-unit").expect("accepted");
    let described = parsed.describe();
    assert!(described.starts_with("file:///"));
}
4779
/// `|` separates individual subscriber specs, and order is preserved.
#[test]
fn env_parse_splits_by_pipe() {
    let parsed = parse_subscribers_from_str("file:///tmp/a|file:///tmp/b");
    assert_eq!(parsed.len(), 2);

    let first = parsed[0].describe();
    assert_eq!(first, "file:///tmp/a");
    let second = parsed[1].describe();
    assert_eq!(second, "file:///tmp/b");
}
4787
/// A `:` inside the path portion is ordinary path text, not a separator.
#[test]
fn env_parse_treats_colon_as_literal_path() {
    #[cfg(unix)]
    {
        let parsed = parse_subscriber_spec("file:///tmp/a:b").expect("accepted");
        let described = parsed.describe();
        assert_eq!(described, "file:///tmp/a:b");
    }
    #[cfg(windows)]
    {
        let parsed = parse_subscriber_spec("file:///C:/a:b").expect("accepted");
        let described = parsed.describe();
        assert!(described.contains(":"));
    }
}
4803
/// `%20` in the URI path is decoded to a literal space.
#[test]
fn env_parse_percent_decode_space() {
    #[cfg(unix)]
    {
        let parsed = parse_subscriber_spec("file:///tmp/a%20b").expect("accepted");
        let described = parsed.describe();
        assert_eq!(described, "file:///tmp/a b");
    }
    #[cfg(windows)]
    {
        let parsed = parse_subscriber_spec("file:///C:/a%20b").expect("accepted");
        let described = parsed.describe();
        assert!(described.contains(' '));
    }
}
4817
/// A percent escape with non-hex digits makes the whole spec invalid.
#[test]
fn env_parse_percent_decode_rejects_invalid_hex() {
    let parsed = parse_subscriber_spec("file:///tmp/a%ZZb");
    assert!(parsed.is_none());
}
4822
/// A percent escape truncated to one or zero hex digits is rejected.
#[test]
fn env_parse_percent_decode_rejects_incomplete() {
    let one_digit = parse_subscriber_spec("file:///tmp/a%2");
    assert!(one_digit.is_none());

    let no_digits = parse_subscriber_spec("file:///tmp/a%");
    assert!(no_digits.is_none());
}
4828
/// Percent escapes that decode to an invalid UTF-8 sequence (`0xC3 0x28`)
/// must cause the spec to be rejected.
#[test]
fn env_parse_rejects_non_utf8() {
    let parsed = parse_subscriber_spec("file:///tmp/%C3%28");
    assert!(parsed.is_none());
}
4841
/// A URI listed twice yields a single subscriber; first-seen order is kept.
#[test]
fn env_parse_dedups_duplicate_uris() {
    let parsed = parse_subscribers_from_str("file:///tmp/x|file:///tmp/x|file:///tmp/y");
    assert_eq!(parsed.len(), 2);

    let first = parsed[0].describe();
    assert_eq!(first, "file:///tmp/x");
    let second = parsed[1].describe();
    assert_eq!(second, "file:///tmp/y");
}
4849
/// Eight threads released together by a barrier must all observe the same
/// `event_bus()` address.
#[test]
fn test_oncelock_init_race_single_winner() {
    let barrier = Arc::new(Barrier::new(8));
    let workers: Vec<_> = (0..8)
        .map(|_| {
            let start = barrier.clone();
            std::thread::spawn(move || {
                start.wait();
                // Return the address as an integer so it can cross threads.
                event_bus() as *const CardEventBus as usize
            })
        })
        .collect();

    let addresses: Vec<usize> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();
    let first = addresses[0];
    for address in &addresses {
        assert_eq!(
            *address, first,
            "singleton identity must hold across threads"
        );
    }
}
4873
/// Four threads each record 250 successes against the same sink; the summed
/// `ok` counters in the snapshot must equal the total number of calls.
#[test]
fn test_subscriber_stats_concurrent_update() {
    let stats = Arc::new(SubscriberStats::default());
    let n_threads = 4;
    let per_thread = 250;
    let barrier = Arc::new(Barrier::new(n_threads));

    let mut workers = Vec::new();
    for t in 0..n_threads {
        let shared = stats.clone();
        let start = barrier.clone();
        let worker = std::thread::spawn(move || {
            start.wait();
            for i in 0..per_thread {
                // Alternate between two kinds so both counters are exercised.
                let kind = if (t + i) % 2 == 0 {
                    CardEventKind::Created
                } else {
                    CardEventKind::Appended
                };
                shared.record_ok("mock://same-subscriber", kind);
            }
        });
        workers.push(worker);
    }
    for worker in workers {
        worker.join().unwrap();
    }

    let snapshot = stats.snapshot();
    let row = snapshot
        .iter()
        .find(|candidate| candidate.sink == "mock://same-subscriber")
        .expect("row");
    let expected = (n_threads * per_thread) as u64;
    let ok_total = row.ok.values().sum::<u64>();
    assert_eq!(ok_total, expected, "all increments must be counted");
}
4908
/// After a thread panics while holding the stats mutex, both `snapshot` and
/// `record_ok` must keep working and earlier counts must be preserved.
#[test]
fn test_subscriber_stats_poison_recovery() {
    let stats = Arc::new(SubscriberStats::default());
    stats.record_ok("mock://poison", CardEventKind::Created);

    // Poison the inner mutex by panicking while its guard is alive.
    let poisoner = stats.clone();
    let outcome = std::thread::spawn(move || {
        let _held = poisoner.inner.lock().unwrap();
        panic!("intentional poison");
    })
    .join();
    let _ = outcome;

    let before = stats.snapshot();
    assert!(!before.is_empty(), "snapshot after poison must still work");
    let ok_before = before[0].ok.values().sum::<u64>();
    assert_eq!(ok_before, 1);

    stats.record_ok("mock://poison", CardEventKind::Created);
    let after = stats.snapshot();
    let ok_after = after[0].ok.values().sum::<u64>();
    assert_eq!(ok_after, 2);
}
4935
/// Eight threads call `atomic_write` at the same moment into one directory,
/// each with its own destination; every write must succeed and the eight
/// destination names must be distinct.
#[test]
fn test_atomic_tmp_seq_unique_under_concurrency() {
    let dir = tempfile::tempdir().unwrap();
    let barrier = Arc::new(Barrier::new(8));

    let workers: Vec<_> = (0..8)
        .map(|i| {
            let root = dir.path().to_path_buf();
            let start = barrier.clone();
            std::thread::spawn(move || {
                start.wait();
                let dest = root.join(format!("file_{i}.bin"));
                atomic_write(&dest, b"x").unwrap();
                dest.file_name().unwrap().to_string_lossy().to_string()
            })
        })
        .collect();

    let names: HashSet<String> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();
    assert_eq!(names.len(), 8, "all dest names must be unique");
}
4959
/// `AtomicU64::fetch_add` must wrap from `u64::MAX` to `0` rather than panic.
#[test]
fn test_atomic_tmp_seq_wraps_without_panic() {
    let counter = AtomicU64::new(u64::MAX - 1);

    // Each call returns the value held before the increment.
    let observed = [
        counter.fetch_add(1, Ordering::Relaxed),
        counter.fetch_add(1, Ordering::Relaxed),
        counter.fetch_add(1, Ordering::Relaxed),
    ];

    assert_eq!(observed[0], u64::MAX - 1);
    assert_eq!(observed[1], u64::MAX);
    assert_eq!(observed[2], 0, "u64 fetch_add must wrap to 0");
}
4972
/// Two threads race `atomic_write` onto the same destination. Afterwards the
/// file must exist and hold exactly one writer's full payload, never a mix.
/// On Unix both writes must succeed; elsewhere at least one must.
#[test]
fn test_rename_atomicity_same_volume() {
    let dir = tempfile::tempdir().unwrap();
    let dest = dir.path().join("shared.bin");
    let barrier = Arc::new(Barrier::new(2));

    let writers: Vec<_> = (0..2u8)
        .map(|fill| {
            let target = dest.clone();
            let start = barrier.clone();
            std::thread::spawn(move || {
                start.wait();
                // 64 bytes of the writer's own index identify the winner.
                let payload = vec![fill; 64];
                atomic_write(&target, &payload)
            })
        })
        .collect();

    let mut saw_ok = 0;
    for writer in writers {
        #[cfg(unix)]
        {
            writer.join().unwrap().unwrap();
            saw_ok += 1;
        }
        #[cfg(not(unix))]
        {
            if writer.join().unwrap().is_ok() {
                saw_ok += 1;
            }
        }
    }

    assert!(saw_ok >= 1, "at least one rename must succeed");
    assert!(dest.exists(), "dest must exist after concurrent rename");
    let contents = fs::read(&dest).unwrap();
    assert!(contents == vec![0u8; 64] || contents == vec![1u8; 64]);
}
5012
/// Four threads create cards concurrently in the primary store; every card
/// must be mirrored to the subscriber and the subscriber's stats must show
/// four successful deliveries.
#[test]
fn test_fanout_concurrent_create_with_store() {
    let primary = tempfile::tempdir().unwrap();
    let sub = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub.path().to_path_buf()));

    with_bus_subscribers(vec![fs_sub.clone()], |bus| {
        let primary_path = primary.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(4));

        let workers: Vec<_> = (0..4)
            .map(|i| {
                let root = primary_path.clone();
                let start = barrier.clone();
                std::thread::spawn(move || {
                    // Each worker sets the flag for itself before publishing.
                    INSIDE_BUS_TEST.with(|inside| inside.set(true));
                    start.wait();
                    let store = FileCardStore::new(root);
                    let spec = json!({
                        "card_id": format!("concur_card_{i}"),
                        "pkg": { "name": "concur_pkg" },
                    });
                    create_with_store(&store, spec).unwrap().0
                })
            })
            .collect();

        let ids: Vec<String> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();
        assert_eq!(ids.len(), 4);
        for id in &ids {
            let mirrored = sub.path().join("concur_pkg").join(format!("{id}.toml"));
            assert!(mirrored.exists(), "subscriber must have card {id}");
        }

        let sink = fs_sub.describe();
        let snapshot = bus.stats().snapshot();
        let row = snapshot
            .iter()
            .find(|candidate| candidate.sink == sink)
            .expect("row");
        let ok_total = row.ok.values().sum::<u64>();
        assert_eq!(
            ok_total, 4,
            "subscriber must have recorded 4 successful deliveries"
        );
    });
}
5063
5064 fn backfill_primary_with_cards(
5070 pkg: &str,
5071 count: usize,
5072 ) -> (tempfile::TempDir, FileCardStore, Vec<String>) {
5073 let primary = tempfile::tempdir().unwrap();
5074 let store = FileCardStore::new(primary.path().to_path_buf());
5075 let mut ids = Vec::new();
5076 for i in 0..count {
5077 let (id, _) = create_with_store(
5078 &store,
5079 json!({
5080 "card_id": format!("{pkg}_{i}"),
5081 "pkg": { "name": pkg },
5082 }),
5083 )
5084 .unwrap();
5085 ids.push(id);
5086 }
5087 (primary, store, ids)
5088 }
5089
/// Cards created while the subscriber was detached must be pushed to it by a
/// non-dry-run backfill.
#[test]
fn backfill_pushes_missing_cards() {
    let sub_dir = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
    let uri = fs_sub.describe();

    with_bus_subscribers(vec![fs_sub.clone()], |bus| {
        // Detach the subscriber while seeding so the cards are not mirrored.
        bus.replace_subscribers_for_test(Vec::new());
        let (_primary, store, ids) = backfill_primary_with_cards("backfill_push_pkg", 2);
        bus.replace_subscribers_for_test(vec![fs_sub.clone()]);

        let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
        assert_eq!(report.pushed.len(), 2);
        assert_eq!(report.skipped.len(), 0);
        assert!(report.failed.is_empty());

        let pkg_dir = sub_dir.path().join("backfill_push_pkg");
        for id in &ids {
            let mirrored = pkg_dir.join(format!("{id}.toml"));
            assert!(mirrored.exists(), "card {id} must exist on subscriber");
        }
    });
}
5115
/// Cards that were created with the subscriber attached must all be reported
/// as skipped by a subsequent backfill.
#[test]
fn backfill_skips_existing_on_subscriber() {
    let sub_dir = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
    let uri = fs_sub.describe();

    with_bus_subscribers(vec![fs_sub.clone()], |_bus| {
        // Seeding happens with the subscriber installed on the bus.
        let (_primary, store, _ids) = backfill_primary_with_cards("backfill_skip_pkg", 3);

        let report = card_sink_backfill_with_store(&store, &uri, false).unwrap();
        assert_eq!(report.pushed.len(), 0);
        assert_eq!(report.skipped.len(), 3);
        assert!(report.failed.is_empty());
    });
}
5130
/// A dry-run backfill must list the cards it would push without writing any
/// file to the subscriber and without touching the subscriber's stats.
#[test]
fn backfill_dry_run_no_writes() {
    let sub_dir = tempfile::tempdir().unwrap();
    let fs_sub = Arc::new(FileCardSubscriber::new(sub_dir.path().to_path_buf()));
    let uri = fs_sub.describe();

    with_bus_subscribers(vec![fs_sub.clone()], |bus| {
        // Detach the subscriber while seeding so the cards are not mirrored.
        bus.replace_subscribers_for_test(Vec::new());
        let (_primary, store, ids) = backfill_primary_with_cards("backfill_dry_pkg", 2);
        bus.replace_subscribers_for_test(vec![fs_sub.clone()]);

        let report = card_sink_backfill_with_store(&store, &uri, true).unwrap();
        assert_eq!(
            report.pushed.len(),
            2,
            "pushed must list ids even in dry run"
        );

        let pkg_dir = sub_dir.path().join("backfill_dry_pkg");
        for id in &ids {
            let mirrored = pkg_dir.join(format!("{id}.toml"));
            assert!(!mirrored.exists(), "dry run must NOT write card {id}");
        }

        // A missing stats row is acceptable; a present one must be all zeros.
        let snapshot = bus.stats().snapshot();
        if let Some(row) = snapshot.iter().find(|candidate| candidate.sink == uri) {
            let ok_total = row.ok.values().sum::<u64>();
            let err_total = row.err.values().sum::<u64>();
            let total: u64 = ok_total + err_total;
            assert_eq!(total, 0, "dry run must not touch stats");
        }
    });
}
5163
/// A card file already present on the subscriber with different content must
/// be reported as skipped and left untouched.
///
/// One primary card is created with no subscribers on the bus. A file named
/// `<id>.toml` holding `drifted=true` is then written by hand into the
/// subscriber's package directory before the subscriber is installed and the
/// backfill runs. The report must list that id under `skipped`, `pushed` must
/// be empty, and the hand-written file must read back unchanged.
#[test]
fn backfill_drifted_card_skipped_not_overwritten() {
    // Content planted on the subscriber side; compared byte-for-byte afterwards.
    const DRIFTED_BODY: &str = "drifted=true\n";

    let sink_root = tempfile::tempdir().unwrap();
    let sink = Arc::new(FileCardSubscriber::new(sink_root.path().to_path_buf()));
    let sink_uri = sink.describe();

    with_bus_subscribers(vec![sink.clone()], |_bus| {
        let bus = event_bus();
        bus.replace_subscribers_for_test(Vec::new());
        let (_primary, store, ids) = backfill_primary_with_cards("backfill_drift_pkg", 1);
        let drifted_id = &ids[0];

        // Plant the diverging copy directly on disk.
        let pkg_dir = sink_root.path().join("backfill_drift_pkg");
        fs::create_dir_all(&pkg_dir).unwrap();
        let planted = pkg_dir.join(format!("{drifted_id}.toml"));
        fs::write(&planted, DRIFTED_BODY).unwrap();

        bus.replace_subscribers_for_test(vec![sink.clone()]);
        let outcome = card_sink_backfill_with_store(&store, &sink_uri, false).unwrap();

        assert_eq!(outcome.skipped, vec![drifted_id.clone()]);
        assert!(outcome.pushed.is_empty());

        let on_disk = fs::read_to_string(&planted).unwrap();
        assert_eq!(on_disk, DRIFTED_BODY, "drifted copy must be preserved");
    });
}
5189
/// Backfill must carry a card's samples file along with the card itself.
///
/// One primary card is created and given a single sample (`{"case": "c0"}`)
/// while the bus has no subscribers. After the file subscriber is installed
/// and the backfill runs, the report must list the id in both `pushed` and
/// `pushed_samples`, and `<id>.samples.jsonl` under the subscriber's package
/// directory must exist and contain the text `c0`.
#[test]
fn backfill_includes_samples() {
    let sink_root = tempfile::tempdir().unwrap();
    let sink = Arc::new(FileCardSubscriber::new(sink_root.path().to_path_buf()));
    let sink_uri = sink.describe();

    with_bus_subscribers(vec![sink.clone()], |_bus| {
        let bus = event_bus();
        bus.replace_subscribers_for_test(Vec::new());
        let (_primary, store, ids) = backfill_primary_with_cards("backfill_samples_pkg", 1);
        let card_id = &ids[0];
        write_samples_with_store(&store, card_id, vec![json!({ "case": "c0" })]).unwrap();
        bus.replace_subscribers_for_test(vec![sink.clone()]);

        let outcome = card_sink_backfill_with_store(&store, &sink_uri, false).unwrap();
        assert_eq!(outcome.pushed, vec![card_id.clone()]);
        assert_eq!(outcome.pushed_samples, vec![card_id.clone()]);

        let samples_file = sink_root
            .path()
            .join("backfill_samples_pkg")
            .join(format!("{card_id}.samples.jsonl"));
        assert!(samples_file.exists());
        let samples_text = fs::read_to_string(&samples_file).unwrap();
        assert!(samples_text.contains("c0"));
    });
}
5214
/// Backfill against a URI that matches no subscriber must return an error.
///
/// Runs with an empty subscriber list, creates one primary card, and asks for
/// a backfill to `file:///nonexistent/sink`. The returned error text must
/// start with `unknown sink`.
#[test]
fn backfill_unknown_sink_err() {
    with_bus_subscribers(Vec::new(), |_bus| {
        let (_primary, store, _ids) = backfill_primary_with_cards("backfill_unknown_pkg", 1);

        let outcome = card_sink_backfill_with_store(&store, "file:///nonexistent/sink", false);
        let err = outcome.unwrap_err();

        assert!(
            err.starts_with("unknown sink"),
            "must reject unregistered sink; got: {err}"
        );
    });
}
5227
/// Backfilling one sink must not change another sink's `created` counter.
///
/// Two file subscribers, A and B, are set up. Primary cards are created with
/// only A on the bus, A's `ok["created"]` count is read from the stats
/// snapshot, both subscribers are installed, and a backfill targeting B's URI
/// runs. A's count is then read again and must equal the earlier value.
#[test]
fn backfill_bypasses_bus_fanout() {
    let root_a = tempfile::tempdir().unwrap();
    let root_b = tempfile::tempdir().unwrap();
    let fa = Arc::new(FileCardSubscriber::new(root_a.path().to_path_buf()));
    let fb = Arc::new(FileCardSubscriber::new(root_b.path().to_path_buf()));
    let uri_a = fa.describe();
    let uri_b = fb.describe();

    with_bus_subscribers(
        vec![
            fa.clone() as Arc<dyn CardSubscriber>,
            fb.clone() as Arc<dyn CardSubscriber>,
        ],
        |bus| {
            // Reads A's ok["created"] count; a missing row or key counts as 0.
            let created_on_a = || {
                bus.stats()
                    .snapshot()
                    .into_iter()
                    .find(|row| row.sink == uri_a)
                    .map(|row| row.ok.get("created").copied().unwrap_or(0))
                    .unwrap_or(0)
            };

            bus.replace_subscribers_for_test(vec![fa.clone()]);
            let (_primary, store, _ids) = backfill_primary_with_cards("backfill_bypass_pkg", 2);
            let before = created_on_a();

            bus.replace_subscribers_for_test(vec![fa.clone(), fb.clone()]);
            card_sink_backfill_with_store(&store, &uri_b, false).unwrap();
            let after = created_on_a();

            assert_eq!(
                before, after,
                "backfill target B must not cause fan-out to subscriber A"
            );
        },
    );
}
5271
/// A successful backfill must be reflected in the target sink's stats row.
///
/// Two primary cards are created while the bus has no subscribers, the file
/// subscriber is installed, and the backfill runs against its URI. The stats
/// snapshot must then contain a row for that URI whose `ok["created"]` count
/// is exactly 2.
#[test]
fn backfill_updates_subscriber_stats() {
    let sink_root = tempfile::tempdir().unwrap();
    let sink = Arc::new(FileCardSubscriber::new(sink_root.path().to_path_buf()));
    let sink_uri = sink.describe();

    with_bus_subscribers(vec![sink.clone()], |bus| {
        bus.replace_subscribers_for_test(Vec::new());
        let (_primary, store, _ids) = backfill_primary_with_cards("backfill_stats_pkg", 2);
        bus.replace_subscribers_for_test(vec![sink.clone()]);

        card_sink_backfill_with_store(&store, &sink_uri, false).unwrap();

        let rows = bus.stats().snapshot();
        let sink_row = rows.iter().find(|r| r.sink == sink_uri).expect("row");
        let created = sink_row.ok.get("created").copied().unwrap_or(0);
        assert_eq!(
            created, 2,
            "backfill must increment ok[created] on the target sink"
        );
    });
}
5292
/// A subscriber that rejects every event must show up in the report's
/// `failed` list and in the sink's error stats.
///
/// Uses a local subscriber whose `on_event` always returns an error and whose
/// `has_card` always answers `false`. One primary card is created with no
/// subscribers on the bus, a fresh failing subscriber is installed, and the
/// backfill runs against its URI. The report must hold exactly one failure
/// and no pushes; the stats row for the URI must have `err["created"]` of at
/// least 1 and a `last_error` set.
#[test]
fn backfill_failure_records_err_stat() {
    /// Subscriber that fails every delivery and reports no card as present.
    struct FailingSub {
        uri: String,
    }

    impl CardSubscriber for FailingSub {
        fn on_event(&self, _ev: &CardEvent) -> Result<(), String> {
            Err(String::from("synthetic backfill failure"))
        }

        fn has_card(&self, _card_id: &str) -> Result<bool, String> {
            Ok(false)
        }

        fn describe(&self) -> String {
            self.uri.clone()
        }
    }

    let uri = String::from("mock://backfill-fail");
    let initial: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });

    with_bus_subscribers(vec![initial], |bus| {
        bus.replace_subscribers_for_test(Vec::new());
        let (_primary, store, _ids) = backfill_primary_with_cards("backfill_fail_pkg", 1);

        // `initial` was moved into the call above, so build a second instance
        // with the same URI to install for the backfill itself.
        let replacement: Arc<dyn CardSubscriber> = Arc::new(FailingSub { uri: uri.clone() });
        bus.replace_subscribers_for_test(vec![replacement]);

        let outcome = card_sink_backfill_with_store(&store, &uri, false).unwrap();
        assert_eq!(
            outcome.failed.len(),
            1,
            "failed must record the synthetic err"
        );
        assert!(outcome.pushed.is_empty());

        let rows = bus.stats().snapshot();
        let sink_row = rows.iter().find(|r| r.sink == uri).expect("row");
        let created_errs = sink_row.err.get("created").copied().unwrap_or(0);
        assert!(
            created_errs >= 1,
            "failing publish must increment err[created]"
        );
        assert!(sink_row.last_error.is_some());
    });
}
5335
/// Installing a bus after `event_bus()` has already been called must fail.
///
/// Calls `event_bus()` first, then attempts `install_event_bus_for_test` with
/// a new bus built from an empty subscriber list. The call must return an
/// error equal to `bus already initialized`.
#[test]
fn test_oncelock_set_after_init_returns_err() {
    // Call the accessor first; its return value is not needed here.
    let _ = event_bus();

    let late_bus = CardEventBus::new(Vec::new());
    let install_outcome = install_event_bus_for_test(late_bus);

    assert!(
        install_outcome.is_err(),
        "install after init must return Err per OnceLock contract"
    );
    assert_eq!(install_outcome.unwrap_err(), "bus already initialized");
}
5347}