use assert_cmd::Command;
use predicates::prelude::PredicateBooleanExt;
use predicates::str::contains;
use std::fs;
use std::path::{Path, PathBuf};
const ZERO_HASH: &str = "sha256:0000000000000000000000000000000000000000000000000000000000000000";
fn aristo_in(dir: &Path) -> Command {
let mut cmd = Command::cargo_bin("aristo").unwrap();
cmd.current_dir(dir);
cmd.env_remove("ARETTA_TOKEN");
cmd
}
fn run_git(repo: &Path, args: &[&str]) {
let out = std::process::Command::new("git")
.args(args)
.current_dir(repo)
.output()
.expect("git");
assert!(
out.status.success(),
"git {args:?} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
fn lean_workspace_with_canon_entry(dir: &Path) {
fs::write(dir.join("aristo.toml"), "").unwrap();
fs::create_dir_all(dir.join(".aristo")).unwrap();
fs::create_dir_all(dir.join("src")).unwrap();
let body = format!(
"[__meta__]\nschema_version = 1\n\n\
[\"aristos:foo\"]\nkind = \"intent\"\ntext = \"the property\"\n\
verify = \"full\"\nstatus = \"unknown\"\n\
text_hash = \"{ZERO_HASH}\"\nbody_hash = \"{ZERO_HASH}\"\n\
file = \"src/foo.rs\"\nsite = \"fn foo (line 42)\"\n\
covered_region = \"function\"\n\
linked = \"arta_op4q3z9NbV\"\n",
);
fs::write(dir.join(".aristo/index.toml"), body).unwrap();
}
fn init_repo_with_pushed_head(dir: &Path) -> tempfile::TempDir {
let bare = tempfile::tempdir().unwrap();
run_git(bare.path(), &["init", "--bare", "-q"]);
run_git(dir, &["init", "-q", "-b", "main"]);
run_git(dir, &["config", "user.email", "t@x"]);
run_git(dir, &["config", "user.name", "t"]);
run_git(dir, &["config", "commit.gpgsign", "false"]);
run_git(
dir,
&[
"remote",
"add",
"origin",
"https://github.com/owner/repo.git",
],
);
run_git(
dir,
&["remote", "set-url", "origin", bare.path().to_str().unwrap()],
);
fs::write(dir.join("README"), b"seed").unwrap();
run_git(dir, &["add", "-A"]);
run_git(dir, &["commit", "-q", "--no-verify", "-m", "init"]);
run_git(dir, &["push", "-q", "origin", "main"]);
run_git(
dir,
&[
"remote",
"set-url",
"origin",
"https://github.com/owner/repo.git",
],
);
bare
}
fn workspace_with_one_canon_bound_full_intent(dir: &Path) {
aristo_in(dir).arg("init").assert().success();
let body = format!(
"[__meta__]\nschema_version = 1\n\n\
[\"aristos:foo\"]\nkind = \"intent\"\ntext = \"the property\"\n\
verify = \"full\"\nstatus = \"unknown\"\n\
text_hash = \"{ZERO_HASH}\"\nbody_hash = \"{ZERO_HASH}\"\n\
file = \"src/foo.rs\"\nsite = \"fn foo (line 42)\"\n\
covered_region = \"function\"\n\
linked = \"arta_op4q3z9NbV\"\n",
);
fs::write(dir.join(".aristo/index.toml"), body).unwrap();
let matches = r#"
[__meta__]
schema_version = 1
["aristos:foo"]
last_match_text_hash = "blake3:test"
canon_fetched_at = "2026-05-24T00:00:00Z"
[["aristos:foo".accepted_matches]]
canon_id = "foo"
version = "v0.1.0"
canonical_text = "the property"
canon_version = "v0.2.0"
confidence = 0.95
prefix_tier = "aristos:"
backed_by = "test backing"
accepted_at = "2026-05-24T00:00:00Z"
bound_at = "2026-05-24T00:00:00Z"
"#;
fs::write(dir.join(".aristo/canon-matches.toml"), matches).unwrap();
}
fn write_aretta_token(home: &Path, server_url: &str) {
let creds_dir = home.join(".config/aristo");
fs::create_dir_all(&creds_dir).unwrap();
let body = format!(
r#"
[aretta]
token = "arta_test_token_xxx"
server = "{server_url}"
user_login = "tester"
user_id = 1
repo = "owner/repo"
issued_at = "2026-05-24T00:00:00Z"
"#
);
fs::write(creds_dir.join("credentials"), body).unwrap();
}
fn write_full_fixture(dir: &Path, body: &str) -> PathBuf {
let path = dir.join("verify-fixture.json");
fs::write(&path, body).unwrap();
path
}
fn one_annotation_fixture(
session: &str,
ann_status: &str,
test_status: &str,
summary: &str,
) -> String {
format!(
r#"{{
"post": {{ "session_id": "{session}", "view_url": "https://x", "plan_size": 1 }},
"gets": [
{{
"session_id": "{session}",
"status": "done",
"user_commit_sha": "abc1234567890",
"canon_version": "v0.1.0",
"started_at": "2026-05-24T00:00:00Z",
"completed_at": "2026-05-24T00:01:00Z",
"annotations": [
{{
"annotation_id": "arta_op4q3z9NbV",
"canon_id": "foo",
"version": "v0.1.0",
"scope": "turso",
"tier": "aristos:",
"source_path": "src/foo.rs:42",
"status": "{ann_status}",
"tests": [{{ "test_binary": "foo_conform", "status": "{test_status}", "duration_ms": 12 }}]
}}
],
"summary": {summary}
}}
]
}}"#
)
}
fn expectations_toml(dir: &Path) -> String {
fs::read_to_string(dir.join(".aristo/expectations.toml"))
.unwrap_or_else(|e| panic!("expected .aristo/expectations.toml to exist: {e}"))
}
#[test]
fn accept_writes_expectations_file_keyed_on_prefixed_canon_id() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"turso reports initialized from a file-existence proxy",
])
.assert()
.success()
.stdout(contains("aristos:foo"));
let toml = expectations_toml(tmp.path());
assert!(
toml.contains("[\"aristos:foo\"]"),
"keyed on prefixed id; got:\n{toml}"
);
assert!(
toml.contains("turso reports initialized from a file-existence proxy"),
"reason recorded verbatim; got:\n{toml}"
);
assert!(
toml.contains("schema_version"),
"meta header present; got:\n{toml}"
);
}
#[test]
fn accept_records_optional_tracking_ref() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"tracked upstream",
"--tracking",
"https://github.com/tursodatabase/turso/issues/1234",
])
.assert()
.success();
let toml = expectations_toml(tmp.path());
assert!(
toml.contains("https://github.com/tursodatabase/turso/issues/1234"),
"tracking ref recorded; got:\n{toml}"
);
}
#[test]
fn accept_requires_a_reason() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args(["verify", "--accept", "aristos:foo"])
.assert()
.failure()
.code(2)
.stderr(contains("because"));
assert!(
!tmp.path().join(".aristo/expectations.toml").exists(),
"a rejected --accept must not write the expectations file"
);
}
#[test]
fn accept_bare_canon_suffix_resolves_to_prefixed_entry() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args(["verify", "--accept", "foo", "--because", "shorthand works"])
.assert()
.success();
let toml = expectations_toml(tmp.path());
assert!(
toml.contains("[\"aristos:foo\"]"),
"bare suffix resolves to the prefixed id; got:\n{toml}"
);
}
#[test]
fn accept_rejects_unknown_canon_id() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:nonexistent",
"--because",
"x",
])
.assert()
.failure()
.stderr(contains("nonexistent"));
assert!(
!tmp.path().join(".aristo/expectations.toml").exists(),
"unknown id must not write the expectations file"
);
}
#[test]
fn accept_rejects_opaque_server_id() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args(["verify", "--accept", "arta_op4q3z9NbV", "--because", "x"])
.assert()
.failure()
.stderr(contains("arta_"));
}
#[test]
fn accept_is_idempotent_and_updates_the_reason() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args(["verify", "--accept", "foo", "--because", "first reason"])
.assert()
.success();
aristo_in(tmp.path())
.args(["verify", "--accept", "foo", "--because", "second reason"])
.assert()
.success();
let toml = expectations_toml(tmp.path());
assert!(
toml.contains("second reason"),
"reason updated; got:\n{toml}"
);
assert!(
!toml.contains("first reason"),
"old reason replaced; got:\n{toml}"
);
assert_eq!(
toml.matches("[\"aristos:foo\"]").count(),
1,
"exactly one entry after re-accept; got:\n{toml}"
);
}
#[test]
fn waived_failing_annotation_renders_accepted_gap_and_exits_zero() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"turso uses a file-existence proxy; tracked upstream",
])
.assert()
.success();
let fixture = one_annotation_fixture(
"01HMGAP",
"failed",
"fail",
r#"{ "total_annotations": 1, "verified": 0, "failed": 1, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }"#,
);
let fixture_path = write_full_fixture(tmp.path(), &fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.success()
.stdout(contains("KNOWN PROPERTY FAILURE"))
.stdout(contains(
"turso uses a file-existence proxy; tracked upstream",
))
.stdout(contains("EXPECTED TO FAIL").not())
.stdout(contains("Known limitation").not());
}
#[test]
fn waived_failure_with_a_report_renders_the_full_known_failure_card() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"turso file-existence proxy is the root cause",
])
.assert()
.success();
let report = r#"{
"property": {
"canon_id": "foo",
"statement": "The initialized flag is set true only after a successful header sync.",
"impl_source": { "path": "src/foo.rs", "line": 42, "snippet": "prepare" }
},
"scenario": { "op_trace": [], "seed": "seed" },
"verdict": {},
"relation": {
"kind": "state_eq", "description": "StateEq", "compared": ["initialized"], "ignored": ["max_frame"]
},
"finding": {
"kind": "state_eq",
"expected": { "label": "reference", "fields": [ { "field": "initialized", "value": "false" } ] },
"actual": { "label": "turso (observed)", "fields": [ { "field": "initialized", "value": "true" } ] },
"divergence": [ { "field": "initialized", "expected": "false", "actual": "true", "provenance": "non-durable header reads as present" } ]
}
}"#;
let fixture = format!(
r#"{{
"post": {{ "session_id": "01HMFULL", "view_url": "https://x", "plan_size": 1 }},
"gets": [{{
"session_id": "01HMFULL", "status": "done", "user_commit_sha": "abc1234567890",
"canon_version": "v0.1.0", "started_at": "2026-05-24T00:00:00Z", "completed_at": "2026-05-24T00:01:00Z",
"annotations": [{{
"annotation_id": "arta_op4q3z9NbV", "canon_id": "foo", "version": "v0.1.0",
"scope": "turso", "tier": "aristos:", "source_path": "src/foo.rs:42", "status": "failed",
"tests": [{{ "test_binary": "foo_conform", "status": "fail", "report": {report} }}]
}}],
"summary": {{ "total_annotations": 1, "verified": 0, "failed": 1, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }}
}}]
}}"#
);
let fixture_path = write_full_fixture(tmp.path(), &fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.success()
.stdout(contains("KNOWN PROPERTY FAILURE"))
.stdout(contains("turso file-existence proxy is the root cause"))
.stdout(contains("The initialized flag is set true"))
.stdout(contains("divergence observed"))
.stdout(contains("expected").and(contains("initialized = false")))
.stdout(contains("actual").and(contains("initialized = true")))
.stdout(contains("PROPERTY VIOLATED").not())
.stdout(contains("EXPECTED TO FAIL").not());
}
#[test]
fn waived_annotation_that_now_passes_trips_the_strict_ratchet() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"stale gap",
])
.assert()
.success();
let fixture = one_annotation_fixture(
"01HMRATCHET",
"verified",
"pass",
r#"{ "total_annotations": 1, "verified": 1, "failed": 0, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }"#,
);
let fixture_path = write_full_fixture(tmp.path(), &fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.failure()
.stdout(contains("accepted gap now passes"))
.stdout(contains("expectations.toml"))
.stdout(contains("aristos:foo"));
}
#[test]
fn unwaived_failure_is_unchanged_and_still_red() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
let fixture = one_annotation_fixture(
"01HMRED",
"failed",
"fail",
r#"{ "total_annotations": 1, "verified": 0, "failed": 1, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }"#,
);
let fixture_path = write_full_fixture(tmp.path(), &fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.failure()
.stdout(contains("KNOWN PROPERTY FAILURE").not())
.stdout(contains("aristo verify --accept aristos:foo --because"));
}
#[test]
fn accept_rejects_an_empty_reason() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args(["verify", "--accept", "foo", "--because", " "])
.assert()
.failure()
.code(2)
.stderr(contains("non-empty reason"));
assert!(
!tmp.path().join(".aristo/expectations.toml").exists(),
"a blank reason must not write the expectations file"
);
}
#[test]
fn accept_conflicts_with_dispatch_and_ci_flags() {
let tmp = tempfile::tempdir().unwrap();
lean_workspace_with_canon_entry(tmp.path());
aristo_in(tmp.path())
.args(["verify", "--accept", "foo", "--because", "x", "--check"])
.assert()
.failure()
.code(2);
assert!(
!tmp.path().join(".aristo/expectations.toml").exists(),
"a rejected flag combo must not write the expectations file"
);
}
#[test]
fn malformed_expectations_hard_errors_verify_wait() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
fs::write(
tmp.path().join(".aristo/expectations.toml"),
"this is = = not valid toml [[[",
)
.unwrap();
let fixture = one_annotation_fixture(
"01HMBAD",
"failed",
"fail",
r#"{ "total_annotations": 1, "verified": 0, "failed": 1, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }"#,
);
let fixture_path = write_full_fixture(tmp.path(), &fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.failure()
.stderr(contains("expectations.toml"));
}
#[test]
fn waived_annotation_with_an_operational_test_is_red_not_an_accepted_gap() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"the property gap",
])
.assert()
.success();
let fixture = r#"{
"post": { "session_id": "01HMOP", "view_url": "https://x", "plan_size": 1 },
"gets": [{
"session_id": "01HMOP", "status": "done", "user_commit_sha": "abc1234567890",
"canon_version": "v0.1.0", "started_at": "2026-05-24T00:00:00Z", "completed_at": "2026-05-24T00:01:00Z",
"annotations": [{
"annotation_id": "arta_op4q3z9NbV", "canon_id": "foo", "version": "v0.1.0",
"scope": "turso", "tier": "aristos:", "source_path": "src/foo.rs:42", "status": "failed",
"tests": [
{ "test_binary": "foo_conform", "status": "fail" },
{ "test_binary": "foo_build", "status": "build_failed" }
]
}],
"summary": { "total_annotations": 1, "verified": 0, "failed": 1, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }
}]
}"#;
let fixture_path = write_full_fixture(tmp.path(), fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.failure()
.stdout(contains("KNOWN PROPERTY FAILURE").not());
}
#[test]
fn orphan_waiver_warns_when_it_matches_no_annotation() {
let tmp = tempfile::tempdir().unwrap();
let _bare = init_repo_with_pushed_head(tmp.path());
workspace_with_one_canon_bound_full_intent(tmp.path());
let home = tempfile::tempdir().unwrap();
write_aretta_token(home.path(), "https://example.test");
aristo_in(tmp.path())
.args([
"verify",
"--accept",
"aristos:foo",
"--because",
"drifted gap",
])
.assert()
.success();
let fixture = r#"{
"post": { "session_id": "01HMORPH", "view_url": "https://x", "plan_size": 1 },
"gets": [{
"session_id": "01HMORPH", "status": "done", "user_commit_sha": "abc1234567890",
"canon_version": "v0.1.0", "started_at": "2026-05-24T00:00:00Z", "completed_at": "2026-05-24T00:01:00Z",
"annotations": [],
"summary": { "total_annotations": 0, "verified": 0, "failed": 0, "build_failed": 0, "inconclusive": 0, "no_coverage": 0 }
}]
}"#;
let fixture_path = write_full_fixture(tmp.path(), fixture);
aristo_in(tmp.path())
.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path().join(".config"))
.env("ARISTO_CANON_VERIFY_FIXTURE", &fixture_path)
.env("ARISTO_VERIFY_POLL_MS", "1")
.args(["verify", "--wait"])
.assert()
.success()
.stderr(contains("aristos:foo"))
.stderr(contains("matched no annotation"));
}