use std::collections::HashMap;
use std::sync::Arc;
use crate::application::error::SyncError;
use crate::domain::fingerprint::FileFingerprint;
use crate::domain::location::LocationId;
use crate::domain::topology_delta::{
ContentChangedFile, DiscoveredFile, TopologyDelta, VanishedFile,
};
use crate::domain::topology_file::TopologyFile;
use crate::infra::backend::ProgressFn;
use crate::infra::location_file_store::LocationFileStore;
use crate::infra::location_scanner::{LocationScanner, ScannedFile};
use crate::infra::topology_file_store::TopologyFileStore;
/// A single error reported while scanning a location.
///
/// `TopologyScanner::scan_all` builds these from the error entries returned
/// by each location scanner and returns them in `ScanResult::scan_errors`.
#[derive(Debug, Clone)]
pub struct TopologyScanError {
/// Path the error refers to, copied verbatim from the scanner's error entry.
// NOTE(review): presumably relative to the location root — confirm against `LocationScanner`.
pub path: String,
/// Error description, copied verbatim from the scanner's error entry.
pub error: String,
}
/// Outcome of `TopologyScanner::scan_all`.
pub struct ScanResult {
/// Deltas computed by comparing the scanned files with the stored topology.
pub deltas: Vec<TopologyDelta>,
/// Total number of files collected from every location that scanned successfully.
pub scanned: usize,
/// Errors reported by the individual location scans.
pub scan_errors: Vec<TopologyScanError>,
}
/// Scans every configured location and derives topology deltas from the result.
pub struct TopologyScanner {
/// Store queried for the active topology files that scanned files are matched against.
topology_files: Arc<dyn TopologyFileStore>,
/// Store queried per location for the known files, used to detect vanished files.
location_files: Arc<dyn LocationFileStore>,
/// Location scanners; `scan_all` runs them one after another in this order.
scanners: Vec<Arc<dyn LocationScanner>>,
}
impl TopologyScanner {
    /// Creates a scanner over the given stores and per-location scanners.
    pub fn new(
        topology_files: Arc<dyn TopologyFileStore>,
        location_files: Arc<dyn LocationFileStore>,
        scanners: Vec<Arc<dyn LocationScanner>>,
    ) -> Self {
        Self {
            scanners,
            location_files,
            topology_files,
        }
    }

    /// Scans every location in turn and computes the resulting topology deltas.
    ///
    /// * `excludes` is forwarded unchanged to each location scanner.
    /// * Locations contained in `skip_locations` are not scanned at all.
    /// * `progress`, when present, receives one message per successfully
    ///   scanned location.
    ///
    /// A location whose scan fails is logged and skipped: it contributes no
    /// files, and the failure is not added to `ScanResult::scan_errors`.
    ///
    /// # Errors
    ///
    /// Returns the `SyncError` raised while computing the deltas (store
    /// queries). A failing location scan does not produce an error.
    pub async fn scan_all(
        &self,
        excludes: &[glob::Pattern],
        skip_locations: &std::collections::HashSet<crate::domain::location::LocationId>,
        progress: Option<&ProgressFn>,
    ) -> Result<ScanResult, SyncError> {
        let location_total = self.scanners.len();
        let mut files: Vec<ScannedFile> = Vec::new();
        let mut scan_errors: Vec<TopologyScanError> = Vec::new();
        tracing::info!(locations = location_total, "topology_scan: starting");

        for (idx, scanner) in self.scanners.iter().enumerate() {
            let loc_id = scanner.location_id().clone();
            if skip_locations.contains(&loc_id) {
                tracing::warn!(
                    location = %loc_id,
                    "topology_scan: skipping location (ensure failed)"
                );
                continue;
            }
            tracing::info!(
                location = %loc_id,
                index = idx,
                total = location_total,
                "topology_scan: scanning location"
            );

            // A failed location is best-effort: log it and move on to the next one.
            let result = match scanner.scan(excludes).await {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!(
                        location = %loc_id,
                        error = %e,
                        "topology_scan: location failed"
                    );
                    continue;
                }
            };

            let entry_count = result.files.len();
            let error_count = result.errors.len();
            tracing::info!(
                location = %loc_id,
                entries = entry_count,
                errors = error_count,
                "topology_scan: location done"
            );
            if let Some(cb) = progress {
                let message = format!(
                    "scan: {loc_id} done ({entry_count} files) [{}/{location_total}]",
                    idx + 1
                );
                cb(&message);
            }

            files.extend(result.files);
            scan_errors.extend(result.errors.into_iter().map(|scan_err| TopologyScanError {
                path: scan_err.path,
                error: scan_err.error,
            }));
        }

        let scanned = files.len();
        let deltas = self.compute_topology_deltas(&files).await?;
        tracing::info!(
            scanned,
            deltas = deltas.len(),
            errors = scan_errors.len(),
            "topology_scan: delta generation complete"
        );
        Ok(ScanResult {
            deltas,
            scanned,
            scan_errors,
        })
    }

    /// Compares the scanned files with the stored topology and returns the deltas.
    ///
    /// Scanned files are grouped by origin. Within each origin every file is
    /// classified by `match_and_classify`; afterwards the origin's known
    /// location files are checked and reported as `Vanished` when they are
    /// source-eligible, were not matched during classification, and their path
    /// is absent from the scan.
    ///
    /// Only origins that contributed at least one scanned file are examined,
    /// so an origin with no scanned files yields no `Vanished` deltas here.
    /// Origins are visited in hash-map iteration order.
    async fn compute_topology_deltas(
        &self,
        scanned: &[ScannedFile],
    ) -> Result<Vec<TopologyDelta>, SyncError> {
        let all_tfs = self.topology_files.list_active(None, None).await?;

        let mut by_origin: HashMap<&LocationId, Vec<&ScannedFile>> = HashMap::new();
        for entry in scanned {
            by_origin.entry(&entry.origin).or_default().push(entry);
        }

        let mut deltas = Vec::new();
        for (&origin, entries) in &by_origin {
            // Topology files matched by this origin's scan; reset for every origin.
            let mut matched_tf_ids: std::collections::HashSet<String> =
                std::collections::HashSet::new();
            deltas.extend(
                entries
                    .iter()
                    .filter_map(|entry| match_and_classify(entry, &all_tfs, &mut matched_tf_ids)),
            );

            let origin_lfs = self.location_files.list_by_location(origin).await?;
            let scanned_paths: std::collections::HashSet<&str> = entries
                .iter()
                .map(|entry| entry.relative_path.as_str())
                .collect();

            for lf in &origin_lfs {
                if !lf.state().is_source_eligible() {
                    continue;
                }
                let tf_id = lf.file_id();
                let lf_path = lf.relative_path();
                // Still accounted for: either matched above or present at the same path.
                if matched_tf_ids.contains(tf_id) || scanned_paths.contains(lf_path) {
                    continue;
                }
                deltas.push(TopologyDelta::Vanished(VanishedFile {
                    topology_file_id: tf_id.to_string(),
                    relative_path: lf_path.to_string(),
                    origin: origin.clone(),
                }));
            }
        }
        Ok(deltas)
    }
}
/// Classifies one scanned file against the active topology files.
///
/// Matching happens in two steps, each taking the first hit in `all_tfs` order:
///
/// 1. **By path** — a topology file with the same relative path. Its id is
///    recorded in `matched_tf_ids`; the result is `ContentChanged` when
///    `fingerprint_changed` reports a difference and `None` otherwise.
/// 2. **By content digest** — only when the scan carries a content digest. A
///    topology file whose canonical digest equals it is recorded in
///    `matched_tf_ids` and reported as `Renamed`.
///
/// When neither step matches, the file is reported as `Discovered`.
fn match_and_classify(
    entry: &ScannedFile,
    all_tfs: &[TopologyFile],
    matched_tf_ids: &mut std::collections::HashSet<String>,
) -> Option<TopologyDelta> {
    let path_match = all_tfs
        .iter()
        .find(|tf| tf.relative_path() == entry.relative_path);

    if let Some(tf) = path_match {
        matched_tf_ids.insert(tf.id().to_string());

        if !fingerprint_changed(tf, &entry.fingerprint) {
            tracing::trace!(
                path = %entry.relative_path,
                tf_id = %tf.id(),
                origin = %entry.origin,
                "match_and_classify: ByPath + fingerprint unchanged → skip"
            );
            return None;
        }

        tracing::debug!(
            path = %entry.relative_path,
            tf_id = %tf.id(),
            origin = %entry.origin,
            size = entry.fingerprint.size,
            "match_and_classify: ByPath → ContentChanged"
        );
        return Some(TopologyDelta::ContentChanged(ContentChangedFile {
            topology_file_id: tf.id().to_string(),
            relative_path: entry.relative_path.clone(),
            file_type: entry.file_type,
            old_fingerprint: extract_tf_fingerprint(tf),
            new_fingerprint: entry.fingerprint.clone(),
            origin: entry.origin.clone(),
            embedded_id: entry.embedded_id.clone(),
        }));
    }

    // No path match: fall back to the content digest, if the scan produced one.
    let hash_match = entry
        .fingerprint
        .content_digest
        .as_ref()
        .and_then(|scan_cd| {
            all_tfs.iter().find(|tf| {
                matches!(tf.canonical_digest(), Some(canonical) if canonical == scan_cd)
            })
        });

    if let Some(tf) = hash_match {
        matched_tf_ids.insert(tf.id().to_string());
        tracing::debug!(
            scan_path = %entry.relative_path,
            tf_path = %tf.relative_path(),
            tf_id = %tf.id(),
            origin = %entry.origin,
            "match_and_classify: ByHash → Renamed"
        );
        return Some(TopologyDelta::Renamed(
            crate::domain::topology_delta::RenamedFile {
                topology_file_id: tf.id().to_string(),
                old_path: tf.relative_path().to_string(),
                new_path: entry.relative_path.clone(),
                file_type: entry.file_type,
                fingerprint: entry.fingerprint.clone(),
                origin: entry.origin.clone(),
                embedded_id: entry.embedded_id.clone(),
            },
        ));
    }

    tracing::debug!(
        path = %entry.relative_path,
        origin = %entry.origin,
        size = entry.fingerprint.size,
        content_digest = ?entry.fingerprint.content_digest,
        "match_and_classify: Discovered (no match in {} topology_files)",
        all_tfs.len()
    );
    Some(TopologyDelta::Discovered(DiscoveredFile {
        relative_path: entry.relative_path.clone(),
        file_type: entry.file_type,
        fingerprint: entry.fingerprint.clone(),
        origin: entry.origin.clone(),
        embedded_id: entry.embedded_id.clone(),
    }))
}
/// Reports whether the scanned fingerprint differs from the stored topology file.
///
/// Only the content digest is compared:
/// * scan has no content digest → `false` (nothing to compare against);
/// * scan has a digest but the topology file has no canonical hash → `true`;
/// * both present → `true` exactly when the two values differ.
fn fingerprint_changed(tf: &TopologyFile, scan_fp: &FileFingerprint) -> bool {
    let stored = tf.canonical_hash();
    match scan_fp.content_digest.as_ref() {
        None => false,
        Some(digest) => match stored {
            Some(db) => db != digest.0.as_str(),
            None => true,
        },
    }
}
/// Builds a `FileFingerprint` describing the stored state of a topology file.
///
/// Only the content digest is populated, taken from the file's canonical
/// hash when it has one. The byte digest, meta digest and modification time
/// are left as `None` and the size is reported as `0`.
fn extract_tf_fingerprint(tf: &TopologyFile) -> FileFingerprint {
    let content_digest = tf
        .canonical_hash()
        .map(|hash| crate::domain::digest::ContentDigest(hash.to_string()));

    FileFingerprint {
        byte_digest: None,
        content_digest,
        meta_digest: None,
        size: 0,
        modified_at: None,
    }
}