use serde::{Deserialize, Serialize};
use aristo_core::canon::cache::CanonMatchesFile;
use aristo_core::canon::{ClusterSuggestion, Disposition, PrefixTier, SuggestedEntry};
use aristo_core::index::AnnotationId;
use crate::commands::index::workspace_or_error;
use crate::filter::Filter;
use crate::pipeline::queue::{self, QueueDir};
use crate::session::types::ItemRef;
use crate::{CliError, CliResult, Workspace};
/// Name of the queue pipeline holding proof-tree suggestion tasks; every
/// `QueueDir::for_pipeline` call in this module passes it.
pub(crate) const PIPELINE: &str = "canon-suggestions";
/// One queued proof-tree suggestion cluster.
///
/// Written to the `canon-suggestions` queue with `toml::to_string_pretty` by
/// `route_suggestions_into_queue` and parsed back by
/// `read_all_tasks_with_paths`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct SuggestionTask {
    /// Primary canon ids whose clusters were merged into this task; a primary
    /// is appended only if not already present.
    pub for_canon_ids: Vec<String>,
    /// The cluster's objective entry, if any. Omitted from the serialized
    /// task when `None`, and defaulted to `None` when absent on read.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub objective: Option<SuggestedMatch>,
    /// Sibling entries that survived local-state dedup (see `dedup_two`).
    pub siblings: Vec<SuggestedMatch>,
    /// The `now` string passed when the task was routed (an RFC 3339
    /// timestamp in the tests — NOTE(review): confirm callers always pass one).
    pub discovered_at: String,
}
/// A single canon entry (objective or sibling) recorded inside a
/// [`SuggestionTask`], built from a `SuggestedEntry` by `from_entry`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct SuggestedMatch {
    pub canon_id: String,
    pub version: String,
    pub canonical_text: String,
    pub scope: String,
    pub prefix_tier: PrefixTier,
    /// Omitted from the serialized task when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub backed_by: Option<String>,
    /// Review state; `from_entry` always initializes it to `Disposition::Open`.
    pub disposition: Disposition,
}
impl SuggestedMatch {
fn from_entry(entry: &SuggestedEntry) -> Self {
Self {
canon_id: entry.canon_id.clone(),
version: entry.version.clone(),
canonical_text: entry.canonical_text.clone(),
scope: entry.scope.clone(),
prefix_tier: entry.prefix_tier,
backed_by: entry.backed_by.clone(),
disposition: Disposition::Open,
}
}
}
impl SuggestionTask {
pub(crate) fn key(&self) -> &str {
match &self.objective {
Some(obj) => &obj.canon_id,
None => self.for_canon_ids.first().map(String::as_str).unwrap_or(""),
}
}
fn from_cluster(cluster: &ClusterSuggestion, siblings: &[SuggestedEntry], now: &str) -> Self {
Self {
for_canon_ids: vec![cluster.for_canon_id.clone()],
objective: cluster.objective.as_ref().map(SuggestedMatch::from_entry),
siblings: siblings.iter().map(SuggestedMatch::from_entry).collect(),
discovered_at: now.to_string(),
}
}
}
/// Canon ids already known locally; suggested siblings found in any of these
/// sets are dropped by `dedup_two`. Populated by `local_state`.
pub(crate) struct LocalState {
    /// Ids of canon-bound index entries, with the `aristos:`/`kanon:` prefix stripped.
    pub bound: std::collections::BTreeSet<String>,
    /// Ids appearing in pending or accepted matches of the match cache.
    pub in_cache: std::collections::BTreeSet<String>,
    /// Ids taken from string fingerprints in the intent-review rejection log.
    pub rejected: std::collections::BTreeSet<String>,
}
impl LocalState {
    /// True when `canon_id` is in any of the bound, cached, or rejected sets,
    /// so it should not be suggested again.
    fn contains(&self, canon_id: &str) -> bool {
        [&self.bound, &self.in_cache, &self.rejected]
            .iter()
            .any(|set| set.contains(canon_id))
    }
}
/// Returns the cluster's siblings that are not already held locally (bound,
/// cached, or rejected — see `LocalState::contains`), in their original order.
fn dedup_two<'a>(cluster: &'a ClusterSuggestion, local: &LocalState) -> Vec<&'a SuggestedEntry> {
    let mut fresh = Vec::new();
    for sibling in &cluster.siblings {
        if local.contains(&sibling.canon_id) {
            continue;
        }
        fresh.push(sibling);
    }
    fresh
}
pub(crate) fn route_suggestions_into_queue(
qdir: &QueueDir,
suggestions: &[Option<ClusterSuggestion>],
local: &LocalState,
now: &str,
) -> CliResult<usize> {
let mut tasks: std::collections::BTreeMap<String, SuggestionTask> =
std::collections::BTreeMap::new();
for cluster in suggestions.iter().flatten() {
let siblings: Vec<&SuggestedEntry> = dedup_two(cluster, local);
if siblings.is_empty() && cluster.objective.is_none() {
continue;
}
let owned: Vec<SuggestedEntry> = siblings.into_iter().cloned().collect();
let task = SuggestionTask::from_cluster(cluster, &owned, now);
let key = task.key().to_string();
match tasks.get_mut(&key) {
Some(existing) => {
if !existing.for_canon_ids.contains(&cluster.for_canon_id) {
existing.for_canon_ids.push(cluster.for_canon_id.clone());
}
for s in task.siblings {
if !existing.siblings.iter().any(|e| e.canon_id == s.canon_id) {
existing.siblings.push(s);
}
}
}
None => {
tasks.insert(key, task);
}
}
}
qdir.ensure_dirs()?;
let mut written = 0usize;
for (key, task) in tasks {
let id = AnnotationId::parse(&key).map_err(|e| CliError::Other {
message: format!("suggestion cluster key `{key}` is not a valid id: {e}"),
exit_code: 1,
})?;
let toml_text = toml::to_string_pretty(&task).map_err(|e| CliError::Other {
message: format!("serialize suggestion task: {e}"),
exit_code: 1,
})?;
queue::enqueue(qdir, &id, &toml_text)?;
written += 1;
}
Ok(written)
}
pub(crate) fn local_state(ws: &Workspace, cache: &CanonMatchesFile) -> CliResult<LocalState> {
use std::collections::BTreeSet;
let mut bound = BTreeSet::new();
let index_path = ws.index_path();
if index_path.is_file() {
let raw = std::fs::read_to_string(&index_path).map_err(CliError::Io)?;
if let Ok(index) = toml::from_str::<aristo_core::index::IndexFile>(&raw) {
for id in index.entries.keys() {
if id.is_canon_bound() {
let bare = id
.as_str()
.strip_prefix("aristos:")
.or_else(|| id.as_str().strip_prefix("kanon:"))
.unwrap_or(id.as_str());
bound.insert(bare.to_string());
}
}
}
}
let mut in_cache = BTreeSet::new();
for entry in cache.entries.values() {
for m in &entry.pending_matches {
in_cache.insert(m.canon_id.clone());
}
for m in &entry.accepted_matches {
in_cache.insert(m.canon_id.clone());
}
}
let mut rejected = BTreeSet::new();
for entry in crate::session::rejections::read_for_kind(ws, INTENT_REVIEW_KIND)? {
if let Some(canon_id) = entry.fingerprint.as_str() {
rejected.insert(canon_id.to_string());
}
}
Ok(LocalState {
bound,
in_cache,
rejected,
})
}
/// Session kind whose rejection log `local_state` reads to build
/// `LocalState::rejected`; ids rejected there are filtered out of routed siblings.
pub(crate) const INTENT_REVIEW_KIND: &str = "intent-review";
/// JSON payload printed by `run_counts`.
#[derive(Debug, Clone, PartialEq, Serialize)]
struct Counts {
    /// Primary canon matches from the match cache.
    matches: Bucket,
    /// Proof-tree suggestion tasks from the queue.
    suggestions: Bucket,
}
/// A pair of counters inside [`Counts`]; what `new` and `pending` mean is
/// decided per bucket by `run_counts`.
#[derive(Debug, Clone, PartialEq, Serialize)]
struct Bucket {
    new: usize,
    pending: usize,
}
/// Entry point for `aristo canon suggestions`.
///
/// In `counts` mode it prints the JSON counts summary and ignores the other
/// arguments. Otherwise it shows one cluster when `objective` is given, or lists
/// every queued cluster that passes `filter`.
pub(crate) fn run(objective: Option<String>, counts: bool, filter: Vec<Filter>) -> CliResult<()> {
    let ws = workspace_or_error()?;
    match (counts, objective) {
        (true, _) => run_counts(&ws),
        (false, Some(obj)) => run_show(&ws, &obj),
        (false, None) => run_list(&ws, &filter),
    }
}
/// Strips one leading tier prefix (`aristos:` is tried before `kanon:`) from a
/// canon id. Ids with neither prefix are returned unchanged.
fn bare(id: &str) -> &str {
    ["aristos:", "kanon:"]
        .iter()
        .find_map(|&prefix| id.strip_prefix(prefix))
        .unwrap_or(id)
}
/// Reports whether `task` satisfies every filter.
///
/// Only `Filter::Parent` is applied here: it requires the task key to equal
/// the bare form of the requested id. Every other filter kind is accepted.
fn task_passes_filter(task: &SuggestionTask, filter: &[Filter]) -> bool {
    !filter.iter().any(|f| match f {
        Filter::Parent(parent) => task.key() != bare(parent),
        _ => false,
    })
}
/// Reads every queued suggestion task (pending first, then claimed) and
/// discards the file paths.
fn read_all_tasks(ws: &Workspace) -> CliResult<Vec<SuggestionTask>> {
    let with_paths = read_all_tasks_with_paths(ws)?;
    let tasks = with_paths.into_iter().map(|(task, _path)| task).collect();
    Ok(tasks)
}
/// Loads every task file from the pending and claimed queue directories,
/// each paired with its path.
///
/// Pending tasks come before claimed ones, and within each directory files are
/// ordered by path. A missing directory is skipped, directory entries that
/// cannot be read are ignored, and non-file entries are skipped.
///
/// # Errors
/// Fails if a directory cannot be listed, a task file cannot be read, or a
/// file does not parse as a `SuggestionTask`.
pub(crate) fn read_all_tasks_with_paths(
    ws: &Workspace,
) -> CliResult<Vec<(SuggestionTask, std::path::PathBuf)>> {
    let qdir = QueueDir::for_pipeline(ws, PIPELINE);
    let mut found = Vec::new();
    for dir in [qdir.pending_dir(), qdir.claimed_dir()] {
        if !dir.is_dir() {
            continue;
        }
        let mut files = Vec::new();
        for dir_entry in std::fs::read_dir(&dir)?.flatten() {
            let path = dir_entry.path();
            if path.is_file() {
                files.push(path);
            }
        }
        files.sort();
        for path in files {
            let text = std::fs::read_to_string(&path).map_err(CliError::Io)?;
            let parsed = toml::from_str::<SuggestionTask>(&text).map_err(|e| CliError::Other {
                message: format!("parse suggestion task {}: {e}", path.display()),
                exit_code: 1,
            })?;
            found.push((parsed, path));
        }
    }
    Ok(found)
}
/// Returns the first queued task (pending before claimed) whose key equals
/// `key` exactly, together with the path it was read from.
pub(crate) fn find_task_by_key(
    ws: &Workspace,
    key: &str,
) -> CliResult<Option<(SuggestionTask, std::path::PathBuf)>> {
    let all = read_all_tasks_with_paths(ws)?;
    let hit = all.into_iter().find(|(task, _)| task.key() == key);
    Ok(hit)
}
pub(crate) fn member_independently_held(
ws: &Workspace,
cache: &CanonMatchesFile,
canon_id: &str,
) -> CliResult<bool> {
let local = local_state(ws, cache)?;
Ok(local.bound.contains(canon_id) || local.in_cache.contains(canon_id))
}
/// Prints a one-line summary of every queued cluster that passes `filter`,
/// or a hint when none are queued.
fn run_list(ws: &Workspace, filter: &[Filter]) -> CliResult<()> {
    let mut tasks = read_all_tasks(ws)?;
    tasks.retain(|t| task_passes_filter(t, filter));

    if tasks.is_empty() {
        println!(
            "ok: no proof-tree suggestions queued. Run `aristo stamp` (signed in) \
             to populate the queue."
        );
        return Ok(());
    }

    println!("proof-tree suggestions ({} cluster(s)):", tasks.len());
    for task in &tasks {
        let obj = match &task.objective {
            Some(o) => o.canon_id.as_str(),
            None => "(siblings-only — no objective yet)",
        };
        let sibling_count = task.siblings.len();
        let primaries = task.for_canon_ids.join(", ");
        println!(" {obj} ({sibling_count} sibling(s), for {primaries})");
    }
    println!();
    println!(
        "review with `aristo session start intent-review`, or inspect one with \
         `aristo canon suggestions <objective>`."
    );
    Ok(())
}
fn run_show(ws: &Workspace, objective: &str) -> CliResult<()> {
let tasks = read_all_tasks(ws)?;
let task = tasks
.iter()
.find(|t| t.key() == objective)
.ok_or_else(|| CliError::Other {
message: format!(
"no queued suggestion cluster `{objective}`.\n\
hint: list clusters with `aristo canon suggestions`."
),
exit_code: 1,
})?;
match &task.objective {
Some(obj) => {
println!(
"objective: {} {} ({} tier)",
obj.canon_id,
obj.version,
obj.prefix_tier.as_prefix()
);
println!(" {}", obj.canonical_text);
}
None => println!("objective: (siblings-only — no objective entry yet)"),
}
println!("dragged in by: {}", task.for_canon_ids.join(", "));
println!("siblings ({}):", task.siblings.len());
for s in &task.siblings {
println!(
" {} {} ({} tier)",
s.canon_id,
s.version,
s.prefix_tier.as_prefix()
);
println!(" {}", s.canonical_text);
if let Some(backed_by) = &s.backed_by {
println!(" backed by: {backed_by}");
}
}
println!();
println!("card detail for any entry: `aristo canon show <canon_id>`.");
Ok(())
}
/// Returns `(open primary matches in the cache, files in the pending queue dir)`.
///
/// Only matches with `Disposition::Open` are counted. Queue files are counted
/// without being parsed, and a missing pending directory counts as zero.
///
/// # Errors
/// Fails if the match cache cannot be read or the pending directory cannot be
/// listed.
fn pending_raw(ws: &Workspace) -> CliResult<(usize, usize)> {
    let cache = CanonMatchesFile::read(&ws.canon_matches_path()).map_err(CliError::Io)?;
    let mut open_matches = 0usize;
    for entry in cache.entries.values() {
        open_matches += entry
            .pending_matches
            .iter()
            .filter(|m| matches!(m.disposition, Disposition::Open))
            .count();
    }

    let qdir = QueueDir::for_pipeline(ws, PIPELINE);
    let pending_dir = qdir.pending_dir();
    let unclaimed = if !pending_dir.is_dir() {
        0
    } else {
        std::fs::read_dir(&pending_dir)?
            .flatten()
            .filter(|e| e.path().is_file())
            .count()
    };
    Ok((open_matches, unclaimed))
}
/// Total canon work awaiting review: open matches plus unclaimed suggestion
/// tasks, or 0 if anything could not be read.
#[aristo::intent(
    "`canon_pending` (#10) counts the canon work awaiting the user's review: \
     open primary matches in the cache PLUS unclaimed suggestion tasks in the \
     queue. Claimed tasks are excluded — they're already in flight, not \
     waiting. It is tolerant: any read error yields 0, because it feeds a \
     nudge and a nudge must never fail a workflow on missing or malformed \
     cache state.",
    verify = "test",
    id = "canon_pending_counts_open_matches_plus_unclaimed_suggestions"
)]
pub(crate) fn pending_total(ws: &Workspace) -> usize {
    match pending_raw(ws) {
        Ok((open_matches, unclaimed_tasks)) => open_matches + unclaimed_tasks,
        Err(_) => 0,
    }
}
fn run_counts(ws: &Workspace) -> CliResult<()> {
let (match_pending, pending_in_queue) = pending_raw(ws)?;
let tasks = read_all_tasks(ws)?;
let claimed_in_queue = tasks.len().saturating_sub(pending_in_queue);
let counts = Counts {
matches: Bucket {
new: 0,
pending: match_pending,
},
suggestions: Bucket {
new: pending_in_queue,
pending: claimed_in_queue,
},
};
println!(
"{}",
serde_json::to_string(&counts).map_err(|e| CliError::Other {
message: format!("serialize counts: {e}"),
exit_code: 1,
})?
);
Ok(())
}
/// Rejection-log fingerprint for a suggested canon entry: the canon id as a
/// JSON string. This matches the `fingerprint.as_str()` lookup in `local_state`.
pub(crate) fn rejection_fingerprint(canon_id: &str) -> serde_json::Value {
    serde_json::Value::from(canon_id)
}

/// Session item reference for a suggested canon entry, built as an opaque
/// reference from the canon id.
pub(crate) fn rejection_item_ref(canon_id: &str) -> ItemRef {
    let id = canon_id;
    ItemRef::from_opaque(id)
}
#[cfg(test)]
mod tests {
    use super::*;
    use aristo_core::canon::{Relationship, SuggestedEntry, VerificationMetadata};
    use std::collections::BTreeSet;

    /// Builds a `SuggestedEntry` for `canon_id`. `Aristos`-tier entries get a
    /// `backed_by` note; `Kanon`-tier entries get none.
    fn entry(canon_id: &str, rel: Relationship, tier: PrefixTier) -> SuggestedEntry {
        SuggestedEntry {
            canon_id: canon_id.into(),
            version: "v0.1.0".into(),
            canonical_text: format!("text for {canon_id}"),
            scope: "turso".into(),
            prefix_tier: tier,
            backed_by: match tier {
                PrefixTier::Aristos => Some("golden model + proofs".into()),
                PrefixTier::Kanon => None,
            },
            verification: VerificationMetadata::default(),
            relationship: rel,
        }
    }

    /// Builds a cluster for `primary`, with an optional `Kanon`-tier parent
    /// objective and `Aristos`-tier siblings.
    fn cluster(primary: &str, objective: Option<&str>, siblings: &[&str]) -> ClusterSuggestion {
        ClusterSuggestion {
            for_canon_id: primary.into(),
            objective: objective.map(|o| entry(o, Relationship::Parent, PrefixTier::Kanon)),
            siblings: siblings
                .iter()
                .map(|s| entry(s, Relationship::Sibling, PrefixTier::Aristos))
                .collect(),
        }
    }

    /// A `LocalState` with nothing bound, cached, or rejected.
    fn empty_local() -> LocalState {
        LocalState {
            bound: BTreeSet::new(),
            in_cache: BTreeSet::new(),
            rejected: BTreeSet::new(),
        }
    }

    /// A suggestions queue dir inside a fresh temp workspace. Keep the
    /// returned `TempDir` alive for the whole test.
    fn fresh_qdir() -> (tempfile::TempDir, QueueDir) {
        let tmp = tempfile::tempdir().unwrap();
        let ws = Workspace {
            root: tmp.path().to_path_buf(),
        };
        let qdir = QueueDir::for_pipeline(&ws, PIPELINE);
        (tmp, qdir)
    }

    /// Parses every file in the pending queue directory, in path order.
    fn read_queue(qdir: &QueueDir) -> Vec<SuggestionTask> {
        let mut out = Vec::new();
        let mut paths: Vec<_> = std::fs::read_dir(qdir.pending_dir())
            .unwrap()
            .filter_map(Result::ok)
            .map(|e| e.path())
            .collect();
        paths.sort();
        for p in paths {
            let raw = std::fs::read_to_string(p).unwrap();
            out.push(toml::from_str::<SuggestionTask>(&raw).unwrap());
        }
        out
    }

    #[test]
    fn bare_strips_a_single_tier_prefix() {
        assert_eq!(bare("aristos:x"), "x");
        assert_eq!(bare("kanon:x"), "x");
        assert_eq!(bare("x"), "x");
        assert_eq!(bare("kanon:aristos:x"), "aristos:x");
    }

    #[test]
    fn pending_total_is_zero_for_a_fresh_workspace() {
        let tmp = tempfile::tempdir().unwrap();
        let ws = Workspace {
            root: tmp.path().to_path_buf(),
        };
        assert_eq!(pending_total(&ws), 0);
    }

    #[test]
    fn pending_total_counts_open_matches_and_unclaimed_tasks() {
        let tmp = tempfile::tempdir().unwrap();
        let ws = Workspace {
            root: tmp.path().to_path_buf(),
        };
        std::fs::create_dir_all(ws.aristo_dir()).unwrap();
        std::fs::write(
            ws.canon_matches_path(),
            r#"
[__meta__]
schema_version = 1
[foo]
last_match_text_hash = "blake3:x"
canon_fetched_at = "2026-06-15T09:14:22Z"
[[foo.pending_matches]]
canon_id = "x"
version = "v0.1.0"
canonical_text = "y"
canon_version = "v0.2.0"
confidence = 0.9
prefix_tier = "aristos:"
linked = "arta_x"
disposition = "open"
found_at = "2026-06-15T09:14:22Z"
found_by = "aristo stamp"
"#,
        )
        .unwrap();
        let qdir = QueueDir::for_pipeline(&ws, PIPELINE);
        std::fs::create_dir_all(qdir.pending_dir()).unwrap();
        std::fs::write(qdir.pending_dir().join("t1.toml"), "x").unwrap();
        std::fs::write(qdir.pending_dir().join("t2.toml"), "x").unwrap();
        assert_eq!(pending_total(&ws), 3, "1 open match + 2 unclaimed tasks");
    }

    #[test]
    fn pending_total_excludes_non_open_matches() {
        let tmp = tempfile::tempdir().unwrap();
        let ws = Workspace {
            root: tmp.path().to_path_buf(),
        };
        std::fs::create_dir_all(ws.aristo_dir()).unwrap();
        std::fs::write(
            ws.canon_matches_path(),
            r#"
[__meta__]
schema_version = 1
[foo]
last_match_text_hash = "blake3:x"
canon_fetched_at = "2026-06-15T09:14:22Z"
[[foo.pending_matches]]
canon_id = "x"
version = "v0.1.0"
canonical_text = "y"
canon_version = "v0.2.0"
confidence = 0.9
prefix_tier = "aristos:"
linked = "arta_x"
disposition = "skipped"
found_at = "2026-06-15T09:14:22Z"
found_by = "aristo stamp"
"#,
        )
        .unwrap();
        assert_eq!(pending_total(&ws), 0);
    }

    #[test]
    fn route_writes_one_task_per_cluster() {
        let (_tmp, qdir) = fresh_qdir();
        let suggestions = vec![Some(cluster("p1", Some("obj_a"), &["s1", "s2"]))];
        let n = route_suggestions_into_queue(
            &qdir,
            &suggestions,
            &empty_local(),
            "2026-06-05T00:00:00Z",
        )
        .unwrap();
        assert_eq!(n, 1);
        let tasks = read_queue(&qdir);
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].objective.as_ref().unwrap().canon_id, "obj_a");
        assert_eq!(tasks[0].siblings.len(), 2);
        assert_eq!(tasks[0].for_canon_ids, vec!["p1"]);
    }

    #[test]
    fn dedup_two_drops_bound_pending_and_rejected_members() {
        let (_tmp, qdir) = fresh_qdir();
        let mut local = empty_local();
        local.bound.insert("s1".into());
        local.in_cache.insert("s2".into());
        local.rejected.insert("s3".into());
        let suggestions = vec![Some(cluster(
            "p1",
            Some("obj_a"),
            &["s1", "s2", "s3", "s4"],
        ))];
        route_suggestions_into_queue(&qdir, &suggestions, &local, "2026-06-05T00:00:00Z").unwrap();
        let tasks = read_queue(&qdir);
        assert_eq!(tasks.len(), 1);
        let surviving: Vec<&str> = tasks[0]
            .siblings
            .iter()
            .map(|s| s.canon_id.as_str())
            .collect();
        assert_eq!(surviving, vec!["s4"]);
    }

    #[test]
    fn dedup_three_collapses_clusters_sharing_an_objective() {
        let (_tmp, qdir) = fresh_qdir();
        let suggestions = vec![
            Some(cluster("p1", Some("obj_a"), &["s1", "s2"])),
            Some(cluster("p2", Some("obj_a"), &["s2", "s3"])),
        ];
        let n = route_suggestions_into_queue(
            &qdir,
            &suggestions,
            &empty_local(),
            "2026-06-05T00:00:00Z",
        )
        .unwrap();
        assert_eq!(n, 1, "two primaries, same objective → one task");
        let tasks = read_queue(&qdir);
        assert_eq!(tasks.len(), 1);
        let mut ids: Vec<&str> = tasks[0]
            .siblings
            .iter()
            .map(|s| s.canon_id.as_str())
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["s1", "s2", "s3"], "siblings union, deduped");
        let mut primaries = tasks[0].for_canon_ids.clone();
        primaries.sort();
        assert_eq!(primaries, vec!["p1", "p2"]);
    }

    #[test]
    fn distinct_objectives_stay_separate_tasks() {
        let (_tmp, qdir) = fresh_qdir();
        let suggestions = vec![
            Some(cluster("p1", Some("obj_a"), &["s1"])),
            Some(cluster("p2", Some("obj_b"), &["s2"])),
        ];
        let n = route_suggestions_into_queue(
            &qdir,
            &suggestions,
            &empty_local(),
            "2026-06-05T00:00:00Z",
        )
        .unwrap();
        assert_eq!(n, 2);
    }

    #[test]
    fn cluster_fully_consumed_by_dedup_and_no_objective_is_dropped() {
        let (_tmp, qdir) = fresh_qdir();
        let mut local = empty_local();
        local.bound.insert("s1".into());
        let suggestions = vec![Some(cluster("p1", None, &["s1"]))];
        let n = route_suggestions_into_queue(&qdir, &suggestions, &local, "2026-06-05T00:00:00Z")
            .unwrap();
        assert_eq!(n, 0);
        assert!(read_queue(&qdir).is_empty());
    }

    #[test]
    fn siblings_only_cluster_keyed_by_primary() {
        let (_tmp, qdir) = fresh_qdir();
        let suggestions = vec![Some(cluster("p1", None, &["s1"]))];
        route_suggestions_into_queue(&qdir, &suggestions, &empty_local(), "2026-06-05T00:00:00Z")
            .unwrap();
        let tasks = read_queue(&qdir);
        assert_eq!(tasks.len(), 1);
        assert!(tasks[0].objective.is_none());
        assert_eq!(tasks[0].key(), "p1");
    }

    #[test]
    fn null_cluster_entries_are_skipped() {
        let (_tmp, qdir) = fresh_qdir();
        let suggestions = vec![None, Some(cluster("p1", Some("obj_a"), &["s1"]))];
        let n = route_suggestions_into_queue(
            &qdir,
            &suggestions,
            &empty_local(),
            "2026-06-05T00:00:00Z",
        )
        .unwrap();
        assert_eq!(n, 1);
    }
}