use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use crate::domain::path::canonicalize_scopes;
use crate::domain::root::resolve_archive_path;
use crate::domain::scope::ScopeMatch;
use crate::domain::source::Source;
use crate::domain::{FactEntry, FactValue};
use crate::expr::filter::{self, Filter};
use crate::expr::{BuiltinKey, BuiltinKeyVisibility, FactType, Modifier, ModifierCategory};
use crate::repo::{self, Connection, Db};
/// Top-level structure of a cluster manifest TOML file (`[meta]` + `[output]`).
#[derive(Serialize, Deserialize)]
pub struct ManifestConfig {
/// Query/scope metadata and lock-file bookkeeping.
pub meta: ManifestMeta,
/// Destination settings for the archive output.
pub output: ManifestOutput,
}
/// The `[meta]` section: the query that defines the cluster, plus bookkeeping
/// about the companion `.lock` file.
#[derive(Serialize, Deserialize)]
pub struct ManifestMeta {
/// Expanded `--where` filter expressions used to select sources.
pub query: Vec<String>,
/// Scope prefix(es); multiple scopes are stored joined with ", "
/// (see `generate`), and `refresh` splits on the same separator.
#[serde(skip_serializing_if = "Option::is_none")]
pub scope: Option<String>,
/// RFC 3339 timestamp of the last (re)generation.
pub generated_at: String,
/// SHA-256 hex digest of the `.lock` file; empty when no sources matched.
pub lock_hash: String,
}
/// The `[output]` section: where archived files should be placed.
#[derive(Serialize, Deserialize)]
pub struct ManifestOutput {
/// Output path pattern; written as "(unknown)" by `generate` until set.
pub pattern: String,
/// Database id of the archive root resolved from the destination.
pub archive_root_id: i64,
/// Base directory (relative to the archive root) for archived files.
pub base_dir: String,
}
/// One line of the JSON-lines `.lock` file: a snapshot of a source file's
/// identity and (optional) content hash at generation time.
///
/// Field order is preserved from the original definition so serialized
/// output is unchanged.
#[derive(Serialize, Deserialize, Clone)]
pub struct LockEntry {
    /// Source row id in the database.
    pub id: i64,
    /// Id of the root the source belongs to.
    pub root_id: i64,
    /// Full path of the source file (as produced by `Source::path`).
    pub path: String,
    /// Device number recorded for the source.
    pub device: i64,
    /// Inode number recorded for the source.
    pub inode: i64,
    /// File size in bytes.
    pub size: i64,
    /// Modification time (numeric, as stored for the source).
    pub mtime: i64,
    /// Partial-content hash recorded for the source.
    pub partial_hash: String,
    /// Content object id, when the file has been fully hashed.
    pub object_id: Option<i64>,
    /// Hash algorithm of the content object (e.g. "sha256"), if hashed.
    pub hash_type: Option<String>,
    /// Hex digest of the content object, if hashed.
    pub hash_value: Option<String>,
}
impl LockEntry {
/// Build a lock entry from a scanned source plus its (optional) content
/// hash algorithm and digest, copying identity fields verbatim.
pub fn from_source(
source: &Source,
hash_type: Option<String>,
hash_value: Option<String>,
) -> Self {
Self {
id: source.id,
root_id: source.root_id,
path: source.path(),
device: source.device,
inode: source.inode,
size: source.size,
mtime: source.mtime,
partial_hash: source.partial_hash.clone(),
object_id: source.object_id,
hash_type,
hash_value,
}
}
}
/// Flags controlling manifest/lock generation.
pub struct GenerateOptions {
/// Overwrite an existing manifest file instead of bailing.
pub force: bool,
/// Include sources whose content is already archived (also relaxes the
/// "source"-role root filter in `query_sources`).
pub include_archived: bool,
/// List already-archived files individually instead of only a count.
pub show_archived: bool,
/// Proceed even when multiple sources share identical content.
pub allow_duplicates: bool,
}
/// Summary of a successful lock-file generation.
struct LockGenerationResult {
/// Number of lock entries written to the lock file.
source_count: usize,
/// Fact keys present on every source: (key, dominant type, description).
full_coverage_facts: Vec<(String, FactType, String)>,
}
/// Query sources matching scope + filters, report skipped/excluded files on
/// stderr, reject duplicate content (unless allowed), and write the
/// JSON-lines lock file.
///
/// Returns `Ok(None)` when no sources matched; in that case no lock file is
/// written (an existing one is left untouched — callers handle cleanup).
fn generate_lock(
conn: &mut Connection,
scope_prefixes: &[String],
filters: &[Filter],
lock_path: &Path,
options: &GenerateOptions,
) -> Result<Option<LockGenerationResult>> {
let (sources, archived, excluded_count, unhashed_count, all_facts) =
query_sources(conn, scope_prefixes, filters, options.include_archived)?;
// Surface everything that was silently dropped, so counts add up for the user.
if excluded_count > 0 {
eprintln!("Skipped {excluded_count} excluded sources");
}
if unhashed_count > 0 {
eprintln!("Skipped {unhashed_count} sources without content hash");
eprintln!(" To discover: run 'canon ls --unhashed' with your scope/pattern");
eprintln!(
" To include: import hashes via worklist pipeline, then run 'canon cluster refresh'"
);
eprintln!(" To permanently exclude: use 'canon exclude set' with your pattern AND 'NOT content.hash.sha256?'");
}
if !archived.is_empty() {
eprintln!("Excluded {} files already in archive(s)", archived.len());
if options.show_archived {
eprintln!("Archived files:");
for (source_path, archive_path) in &archived {
eprintln!(" {source_path} -> {archive_path}");
}
} else {
eprintln!("Use --show-archived to list them");
}
}
if sources.is_empty() {
return Ok(None);
}
// Duplicate content is an error by default: archiving two identical files
// is usually a mistake the user should resolve first.
if !options.allow_duplicates {
let duplicate_groups = find_source_duplicates(&sources);
if !duplicate_groups.is_empty() {
let total_dup_sources: usize = duplicate_groups.iter().map(|(_, v)| v.len()).sum();
bail!(
"Found {} duplicate groups ({} sources with identical content)\n\
Use `canon ls --duplicates` to see details (supports [path] and --where filters).\n\
Use `canon exclude duplicates --prefer <path>` to resolve.\n\
Use --allow-duplicates to include them anyway.",
duplicate_groups.len(),
total_dup_sources
);
}
}
// Coverage analysis must happen before `sources` is consumed by reporting.
let full_coverage_facts = collect_full_coverage_facts(&sources, &all_facts);
write_lock_file(lock_path, &sources)?;
Ok(Some(LockGenerationResult {
source_count: sources.len(),
full_coverage_facts,
}))
}
/// Generate a manifest TOML plus companion `.lock` file for the given scope
/// paths and filters.
///
/// `original_filters` and `expanded_filters` are parallel lists; originals
/// that differ from their expansion are preserved as `# Original:` comments
/// above the `query` key. Refuses to overwrite an existing manifest unless
/// `options.force` is set.
pub fn generate(
db: &mut Db,
scope_paths: &[PathBuf],
original_filters: &[String],
expanded_filters: &[String],
dest: &Path,
output_path: &Path,
options: &GenerateOptions,
) -> Result<()> {
if output_path.exists() && !options.force {
bail!(
"Output file '{}' already exists.\n\
Use `cluster refresh` to update the lock file, or -f/--force to overwrite.",
output_path.display()
);
}
// An unconstrained query (no scope, no filters) would match everything.
if scope_paths.is_empty() && expanded_filters.is_empty() {
bail!("At least one of path or --where filter is required");
}
let conn = db.conn_mut();
let all_roots = repo::root::fetch_all(conn)?;
let (archive_root_id, _archive_root_path, base_dir) = resolve_archive_path(&all_roots, dest)?;
let scope_prefixes = canonicalize_scopes(scope_paths)?;
let parsed_filters: Vec<Filter> = expanded_filters
.iter()
.map(|f| Filter::parse(f))
.collect::<Result<Vec<_>>>()?;
// The lock file lives next to the manifest, with a `.lock` extension.
let lock_path = output_path.with_extension("lock");
let result = generate_lock(conn, &scope_prefixes, &parsed_filters, &lock_path, options)?;
let result = match result {
Some(r) => r,
None => {
println!("No sources matched the query");
return Ok(());
}
};
let lock_hash = hash_file(&lock_path)?;
let fact_help = generate_fact_help(result.source_count, &result.full_coverage_facts);
let config = ManifestConfig {
meta: ManifestMeta {
query: expanded_filters.to_vec(),
// NOTE(review): multiple scopes are joined with ", " and `refresh`
// splits on the same separator — a path containing ", " would not
// round-trip correctly. Confirm such paths cannot occur.
scope: if scope_prefixes.len() == 1 {
Some(scope_prefixes[0].clone())
} else if scope_prefixes.is_empty() {
None
} else {
Some(scope_prefixes.join(", "))
},
generated_at: current_timestamp(),
lock_hash,
},
output: ManifestOutput {
// Placeholder — presumably filled in by the user after generation.
pattern: "(unknown)".to_string(),
archive_root_id,
base_dir,
},
};
let toml_str =
toml::to_string_pretty(&config).context("Failed to serialize manifest config")?;
// Keep the pre-expansion filter text visible as comments above `query`.
let comment_lines: Vec<String> = original_filters
.iter()
.zip(expanded_filters.iter())
.filter(|(orig, exp)| orig != exp)
.map(|(orig, _)| format!("# Original: {orig}"))
.collect();
let toml_str = if comment_lines.is_empty() {
toml_str
} else {
inject_comments_before_key(&toml_str, "query", &comment_lines)
};
// Append the commented fact-help reference block after the TOML body.
let toml_with_help = format!("{}\n\n{}", toml_str.trim_end(), fact_help);
fs::write(output_path, &toml_with_help)
.with_context(|| format!("Failed to write manifest to {}", output_path.display()))?;
println!(
"Generated manifest: {} ({} sources in {})",
output_path.display(),
result.source_count,
lock_path.display()
);
Ok(())
}
/// Re-run the query stored in an existing manifest and rewrite its `.lock`
/// file and meta fields (`lock_hash`, `generated_at`) in place.
///
/// When no sources match, the stale lock file is deleted and the recorded
/// hash is cleared (the fact-help block is not re-appended in that case).
pub fn refresh(db: &mut Db, config_path: &Path, options: &GenerateOptions) -> Result<()> {
let conn = db.conn_mut();
let config_content = fs::read_to_string(config_path)
.with_context(|| format!("Failed to read config: {}", config_path.display()))?;
let mut config: ManifestConfig = toml::from_str(&config_content)
.with_context(|| format!("Failed to parse config: {}", config_path.display()))?;
// NOTE(review): inverse of the ", " join done by `generate`; a scope path
// containing ", " would be split incorrectly — confirm this is safe.
let scope_prefixes: Vec<String> = match &config.meta.scope {
Some(s) => s.split(", ").map(|p| p.to_string()).collect(),
None => vec![],
};
let parsed_filters: Vec<Filter> = config
.meta
.query
.iter()
.map(|f| Filter::parse(f))
.collect::<Result<Vec<_>>>()?;
let lock_path = config_path.with_extension("lock");
let result = generate_lock(conn, &scope_prefixes, &parsed_filters, &lock_path, options)?;
match result {
Some(r) => {
// Sources matched: refresh hash, timestamp and fact-help block.
let lock_hash = hash_file(&lock_path)?;
config.meta.lock_hash = lock_hash;
config.meta.generated_at = current_timestamp();
let fact_help = generate_fact_help(r.source_count, &r.full_coverage_facts);
let toml_str =
toml::to_string_pretty(&config).context("Failed to serialize manifest config")?;
let toml_with_help = format!("{}\n\n{}", toml_str.trim_end(), fact_help);
fs::write(config_path, &toml_with_help)
.with_context(|| format!("Failed to write config: {}", config_path.display()))?;
println!(
"Refreshed lock file: {} ({} sources)",
lock_path.display(),
r.source_count
);
}
None => {
// No matches: drop the stale lock file and clear the recorded hash.
if lock_path.exists() {
fs::remove_file(&lock_path)?;
}
config.meta.lock_hash = String::new();
config.meta.generated_at = current_timestamp();
let toml_str =
toml::to_string_pretty(&config).context("Failed to serialize manifest config")?;
fs::write(config_path, &toml_str)
.with_context(|| format!("Failed to write config: {}", config_path.display()))?;
println!("No sources matched the query");
}
}
Ok(())
}
/// Serialize `sources` to `lock_path` as JSON-lines: one `LockEntry` object
/// per line, buffered, flushed explicitly so write errors surface.
fn write_lock_file(lock_path: &Path, sources: &[LockEntry]) -> Result<()> {
    let file = File::create(lock_path)
        .with_context(|| format!("Failed to create lock file: {}", lock_path.display()))?;
    let mut out = BufWriter::new(file);
    sources.iter().try_for_each(|entry| -> Result<()> {
        serde_json::to_writer(&mut out, entry)
            .with_context(|| format!("Failed to write lock entry for {}", entry.path))?;
        writeln!(out)?;
        Ok(())
    })?;
    // Flush explicitly: Drop would swallow any flush error.
    out.flush()?;
    Ok(())
}
/// Compute the SHA-256 digest of a file's contents, streamed in fixed-size
/// chunks, returned as a lowercase hex string.
pub fn hash_file(path: &Path) -> Result<String> {
    let file = File::open(path)
        .with_context(|| format!("Failed to open file for hashing: {}", path.display()))?;
    let mut reader = BufReader::new(file);
    let mut digest = Sha256::new();
    let mut chunk = [0u8; 8192];
    // Feed the hasher until EOF (read returns 0).
    let mut n = reader.read(&mut chunk)?;
    while n != 0 {
        digest.update(&chunk[..n]);
        n = reader.read(&mut chunk)?;
    }
    Ok(format!("{:x}", digest.finalize()))
}
/// Fetch, scope-match, filter and hash-resolve candidate sources.
///
/// Returns a tuple of:
/// - lock entries for includable sources,
/// - `(source_path, archive_path)` pairs for sources whose content is
///   already archived (only populated when `include_archived` is false),
/// - count of in-scope sources dropped because they are marked excluded,
/// - count of sources dropped for having no content hash,
/// - all facts keyed by source id (for coverage analysis downstream).
fn query_sources(
conn: &mut Connection,
scope_prefixes: &[String],
filters: &[Filter],
include_archived: bool,
) -> Result<(
Vec<LockEntry>,
Vec<(String, String)>,
usize,
usize,
HashMap<i64, Vec<FactEntry>>,
)> {
// Fetch every root id; per-source filters below decide what survives
// (the tests expect is_active() to drop sources under suspended roots).
let root_ids: Vec<i64> = conn
.prepare("SELECT id FROM roots")?
.query_map([], |row| row.get(0))?
.collect::<Result<Vec<_>, _>>()?;
let all_sources = repo::source::batch_fetch_by_roots(conn, &root_ids)?;
let scopes = ScopeMatch::classify_all(scope_prefixes);
let mut excluded_count = 0usize;
// Filter order matters: only active, role-matching, in-scope sources are
// counted by the side-effecting exclusion filter below.
let filtered: Vec<_> = all_sources
.into_iter()
.filter(|s| s.is_active())
.filter(|s| include_archived || s.is_from_role("source"))
.filter(|s| s.matches_scope(&scopes))
.filter(|s| {
if s.is_excluded() {
excluded_count += 1;
false
} else {
true
}
})
.collect();
// Apply --where filters (evaluated in the database against source ids).
let filtered_sources = if filters.is_empty() {
filtered
} else {
let source_ids: Vec<i64> = filtered.iter().map(|s| s.id).collect();
let filtered_ids = filter::apply_filters(conn, &source_ids, filters)?;
let filtered_id_set: HashSet<i64> = filtered_ids.into_iter().collect();
filtered
.into_iter()
.filter(|s| filtered_id_set.contains(&s.id))
.collect()
};
// Sources without a content object cannot be locked; count them for the
// user-facing report in generate_lock.
let mut unhashed_count = 0;
let hashed_sources: Vec<Source> = filtered_sources
.into_iter()
.filter(|s| {
if s.object_id.is_none() {
unhashed_count += 1;
false
} else {
true
}
})
.collect();
// Batch-load objects, archive locations and facts to avoid per-source queries.
let object_ids: Vec<i64> = hashed_sources.iter().filter_map(|s| s.object_id).collect();
let objects = repo::object::batch_fetch_by_ids(conn, &object_ids)?;
let archive_paths = repo::object::batch_find_archive_paths(conn, &object_ids)?;
let source_ids: Vec<i64> = hashed_sources.iter().map(|s| s.id).collect();
let all_facts = repo::fact::batch_fetch_for_sources(conn, &source_ids)?;
let mut sources = Vec::new();
let mut archived = Vec::new();
for source in hashed_sources {
let (hash_type, hash_value) = source
.object_id
.and_then(|oid| objects.get(&oid))
.map(|obj| (Some(obj.hash_type.clone()), Some(obj.hash_value.clone())))
.unwrap_or((None, None));
// First archive location of this object, if any copy is archived.
let archive_path = source
.object_id
.and_then(|oid| archive_paths.get(&oid))
.and_then(|paths| paths.first())
.cloned();
let lock_entry = LockEntry::from_source(&source, hash_type, hash_value);
if let Some(arch_path) = archive_path {
if include_archived {
sources.push(lock_entry);
} else {
archived.push((lock_entry.path.clone(), arch_path));
}
} else {
sources.push(lock_entry);
}
}
Ok((sources, archived, excluded_count, unhashed_count, all_facts))
}
/// Current UTC time rendered as an RFC 3339 string.
fn current_timestamp() -> String {
    let now = chrono::Utc::now();
    now.to_rfc3339()
}
/// Tally of how many sources carry a given fact key, broken down by the
/// value type observed (Path values are counted as text).
#[derive(Default)]
struct FactTypeTracker {
count: usize,
text_count: usize,
num_count: usize,
time_count: usize,
}
impl FactTypeTracker {
    /// Record one observation of `fact_type`, bumping the total and the
    /// matching per-type tally (Path facts count as text).
    fn add(&mut self, fact_type: FactType) {
        self.count += 1;
        match fact_type {
            FactType::Num => self.num_count += 1,
            FactType::Time => self.time_count += 1,
            FactType::Text | FactType::Path => self.text_count += 1,
        }
    }
    /// True when observations span more than one value type.
    fn has_mixed_types(&self) -> bool {
        let distinct = [self.text_count, self.num_count, self.time_count]
            .iter()
            .filter(|&&c| c > 0)
            .count();
        distinct > 1
    }
    /// The most frequent type; ties resolve Time over Num over Text.
    fn dominant_type(&self) -> FactType {
        if self.time_count >= self.text_count && self.time_count >= self.num_count {
            FactType::Time
        } else if self.num_count >= self.text_count {
            FactType::Num
        } else {
            FactType::Text
        }
    }
    /// Human-readable tally like "3 time, 1 text"; zero counts are omitted,
    /// order is always time, text, num.
    fn type_breakdown(&self) -> String {
        let tallies = [
            (self.time_count, "time"),
            (self.text_count, "text"),
            (self.num_count, "num"),
        ];
        tallies
            .iter()
            .filter(|(n, _)| *n > 0)
            .map(|(n, label)| format!("{n} {label}"))
            .collect::<Vec<_>>()
            .join(", ")
    }
}
fn collect_full_coverage_facts(
sources: &[LockEntry],
all_facts: &HashMap<i64, Vec<FactEntry>>,
) -> Vec<(String, FactType, String)> {
use std::collections::HashSet;
if sources.is_empty() {
return Vec::new();
}
let source_count = sources.len();
let mut fact_counts: HashMap<String, FactTypeTracker> = HashMap::new();
let mut seen_keys: HashSet<String> = HashSet::new();
for source in sources {
if let Some(facts) = all_facts.get(&source.id) {
for fact in facts {
let fact_type = match &fact.value {
FactValue::Text(_) => FactType::Text,
FactValue::Num(_) => FactType::Num,
FactValue::Time(_) => FactType::Time,
FactValue::Path(_) => FactType::Path,
};
let seen_key = format!("{}:{}", source.id, fact.key);
if !seen_keys.contains(&seen_key) {
fact_counts
.entry(fact.key.clone())
.or_default()
.add(fact_type);
seen_keys.insert(seen_key);
}
}
}
}
let mut mixed_type_warnings: Vec<(String, String)> = Vec::new();
for (key, tracker) in &fact_counts {
if tracker.count == source_count && tracker.has_mixed_types() {
mixed_type_warnings.push((key.clone(), tracker.type_breakdown()));
}
}
if !mixed_type_warnings.is_empty() {
mixed_type_warnings.sort_by(|a, b| a.0.cmp(&b.0));
eprintln!("Warning: some facts have inconsistent types across sources:");
for (key, breakdown) in &mixed_type_warnings {
eprintln!(" {key}: {breakdown}");
}
eprintln!(" Type-specific modifiers (|year, |month, etc.) may fail on mismatched values.");
eprintln!(" To fix: delete outliers with 'canon facts delete <key> --on object --value-type <minority-type>'");
}
let mut full_coverage: Vec<(String, FactType, String)> = fact_counts
.into_iter()
.filter(|(_, tracker)| tracker.count == source_count)
.map(|(key, tracker)| {
let description = get_fact_description(&key);
(key, tracker.dominant_type(), description)
})
.collect();
full_coverage.sort_by(|a, b| a.0.cmp(&b.0));
full_coverage
}
/// Description for a built-in fact key, or an empty string when the key is
/// not a known built-in (or has no description).
// NOTE(review): `BuiltinKey::from_str` is chained with `and_then`, so it
// appears to return Option rather than std's FromStr Result — confirm.
fn get_fact_description(key: &str) -> String {
BuiltinKey::from_str(key)
.and_then(|k| k.description())
.map(|s| s.to_string())
.unwrap_or_default()
}
/// Build the commented "available facts" help text appended to a generated
/// manifest: built-in keys, fully-covered content facts, modifiers and
/// aliases usable in the output pattern.
///
/// Uses `write!` into the output String (via a function-scope trait import)
/// instead of `push_str(&format!(...))`, avoiding one intermediate String
/// allocation per formatted line. Writing to a String is infallible, so the
/// `fmt::Result` values are deliberately ignored.
fn generate_fact_help(
    source_count: usize,
    full_coverage_facts: &[(String, FactType, String)],
) -> String {
    use std::fmt::Write as _;
    use strum::IntoEnumIterator;
    if source_count == 0 {
        return String::new();
    }
    let mut help = String::new();
    let _ = write!(
        help,
        "# Available facts for pattern (100% coverage on {source_count} sources in this cluster):\n"
    );
    help.push_str("#\n");
    help.push_str("# Built-in:\n");
    for key in BuiltinKey::iter() {
        // Non-default keys are hidden from the manifest help.
        if key.visibility() != BuiltinKeyVisibility::Default {
            continue;
        }
        let name: &'static str = key.into();
        let desc = key.description().unwrap_or("");
        let _ = write!(
            help,
            "# {:18} {:6} - {}\n",
            name,
            key.fact_type().as_str(),
            desc
        );
    }
    let _ = write!(
        help,
        "# {:18} {:6} - {}\n",
        "object.hash", "text", "Content hash (if hashed)"
    );
    help.push_str("#\n");
    if !full_coverage_facts.is_empty() {
        help.push_str("# Content facts:\n");
        for (key, fact_type, description) in full_coverage_facts {
            let desc_part = if description.is_empty() {
                String::new()
            } else {
                format!(" - {description}")
            };
            let _ = write!(help, "# {:18} {:6}{}\n", key, fact_type.as_str(), desc_part);
        }
        help.push_str("#\n");
    }
    // Collect "|name" spellings for each modifier category.
    let time_mods: Vec<_> = Modifier::iter()
        .filter(|m| m.category() == ModifierCategory::Time)
        .map(|m| {
            let name: &'static str = m.into();
            format!("|{name}")
        })
        .collect();
    let string_mods: Vec<_> = Modifier::iter()
        .filter(|m| m.category() == ModifierCategory::String)
        .map(|m| {
            let name: &'static str = m.into();
            format!("|{name}")
        })
        .collect();
    help.push_str("# Modifiers:\n");
    let _ = write!(help, "# Time: {}\n", time_mods.join(" "));
    let _ = write!(help, "# String: {}\n", string_mods.join(" "));
    help.push_str("# Path: [0] [-1] [1:3] etc.\n");
    help.push_str("#\n");
    help.push_str("# Aliases:\n");
    for key in BuiltinKey::iter() {
        if let Some(expansion) = key.expansion() {
            let name: &'static str = key.into();
            let _ = write!(help, "# {{{name}}} → {{{expansion}}}\n");
        }
    }
    help.push('\n');
    help
}
/// Group source ids by their shared content object and return only groups
/// containing more than one source (i.e. duplicate content), as
/// `(object_id, source_ids)` pairs. Unhashed sources are ignored.
fn find_source_duplicates(sources: &[LockEntry]) -> Vec<(i64, Vec<i64>)> {
    let mut groups: HashMap<i64, Vec<i64>> = HashMap::new();
    let pairs = sources
        .iter()
        .filter_map(|s| s.object_id.map(|oid| (oid, s.id)));
    for (object_id, source_id) in pairs {
        groups.entry(object_id).or_default().push(source_id);
    }
    groups
        .into_iter()
        .filter(|(_, ids)| ids.len() > 1)
        .collect()
}
/// Copy `toml_str` line by line, inserting `comments` immediately before
/// every line that starts with `"{key} = "`. Every output line (including
/// the last) is newline-terminated.
fn inject_comments_before_key(toml_str: &str, key: &str, comments: &[String]) -> String {
    let marker = format!("{key} = ");
    let mut out = String::with_capacity(toml_str.len() + comments.len() * 40);
    for line in toml_str.lines() {
        if line.starts_with(&marker) {
            for comment in comments {
                out.push_str(comment);
                out.push('\n');
            }
        }
        out.push_str(line);
        out.push('\n');
    }
    out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::repo::open_in_memory_for_test;
use rusqlite::Connection as RusqliteConnection;
// Fresh in-memory database with the project schema applied.
fn setup_test_db() -> RusqliteConnection {
open_in_memory_for_test()
}
// Insert a root row; returns its id.
fn insert_root(conn: &RusqliteConnection, path: &str, role: &str, suspended: bool) -> i64 {
conn.execute(
"INSERT INTO roots (path, role, suspended) VALUES (?, ?, ?)",
rusqlite::params![path, role, suspended as i64],
)
.unwrap();
conn.last_insert_rowid()
}
// Insert a content object (always sha256); returns its id.
fn insert_object(conn: &RusqliteConnection, hash: &str, excluded: bool) -> i64 {
conn.execute(
"INSERT INTO objects (hash_type, hash_value, excluded) VALUES ('sha256', ?, ?)",
rusqlite::params![hash, excluded as i64],
)
.unwrap();
conn.last_insert_rowid()
}
// Insert a source row with fixed size/mtime and optional object link.
fn insert_source(
conn: &RusqliteConnection,
root_id: i64,
rel_path: &str,
object_id: Option<i64>,
excluded: bool,
) -> i64 {
conn.execute(
"INSERT INTO sources (root_id, rel_path, object_id, size, mtime, partial_hash, scanned_at, last_seen_at, device, inode, excluded)
VALUES (?, ?, ?, 1000, 1704067200, '', 0, 0, 0, 0, ?)",
rusqlite::params![root_id, rel_path, object_id, excluded as i64],
)
.unwrap();
conn.last_insert_rowid()
}
// Sources under suspended roots must not appear in query results.
#[test]
fn test_cluster_excludes_suspended_roots() {
let mut conn = setup_test_db();
let active_root = insert_root(&conn, "/active", "source", false);
let suspended_root = insert_root(&conn, "/suspended", "source", true);
let obj1 = insert_object(&conn, "hash1", false);
let obj2 = insert_object(&conn, "hash2", false);
insert_source(&conn, active_root, "file1.jpg", Some(obj1), false);
insert_source(&conn, suspended_root, "file2.jpg", Some(obj2), false);
let (sources, _archived, _excluded_count, _unhashed_count, _facts) =
query_sources(&mut conn, &[], &[], false).unwrap();
assert_eq!(
sources.len(),
1,
"Should exclude sources from suspended roots"
);
assert_eq!(sources[0].path, "/active/file1.jpg");
}
// Both source-level and object-level exclusion flags must drop a source,
// and both must be counted in excluded_count.
#[test]
fn test_cluster_excludes_excluded_sources() {
let mut conn = setup_test_db();
let root = insert_root(&conn, "/photos", "source", false);
let normal_obj = insert_object(&conn, "normal_hash", false);
insert_source(&conn, root, "normal.jpg", Some(normal_obj), false);
let source_excl_obj = insert_object(&conn, "source_excl_hash", false);
insert_source(
&conn,
root,
"source_excluded.jpg",
Some(source_excl_obj),
true,
);
let object_excl_obj = insert_object(&conn, "object_excl_hash", true);
insert_source(
&conn,
root,
"object_excluded.jpg",
Some(object_excl_obj),
false,
);
let (sources, _archived, excluded_count, _unhashed_count, _facts) =
query_sources(&mut conn, &[], &[], false).unwrap();
assert_eq!(
sources.len(),
1,
"Should exclude both source-level and object-level excluded"
);
assert_eq!(sources[0].path, "/photos/normal.jpg");
assert_eq!(excluded_count, 2, "Should count both excluded sources");
}
// Archive detection is per source file, not per unique object: three
// sources sharing one archived object all count as archived.
#[test]
fn test_cluster_archive_detection_counts_sources_not_objects() {
let mut conn = setup_test_db();
let source_root = insert_root(&conn, "/photos", "source", false);
let archive_root = insert_root(&conn, "/archive", "archive", false);
let archived_obj = insert_object(&conn, "archived_hash", false);
insert_source(&conn, source_root, "photo1.jpg", Some(archived_obj), false);
insert_source(&conn, source_root, "photo2.jpg", Some(archived_obj), false);
insert_source(&conn, source_root, "photo3.jpg", Some(archived_obj), false);
let unarchived_obj = insert_object(&conn, "unarchived_hash", false);
insert_source(
&conn,
source_root,
"photo4.jpg",
Some(unarchived_obj),
false,
);
insert_source(&conn, archive_root, "backup.jpg", Some(archived_obj), false);
let (sources, archived, _excluded_count, _unhashed_count, _facts) =
query_sources(&mut conn, &[], &[], false).unwrap();
assert_eq!(
archived.len(),
3,
"Should detect 3 SOURCES as already archived, not 1 unique object"
);
assert_eq!(
sources.len(),
1,
"Only unarchived source should be in sources"
);
assert_eq!(sources[0].path, "/photos/photo4.jpg");
}
}