use std::collections::{HashMap, HashSet};
use tracing::trace;
use super::digest::CrossLocationIdentity;
use super::file_type::FileType;
use super::fingerprint::FileFingerprint;
use super::location::LocationId;
use super::location_file::LocationFile;
use super::topology_file::TopologyFile;
/// One planned operation against a single target location for a single topology file.
#[derive(Debug, Clone)]
pub enum DistributeAction {
/// Emitted when the target has no `LocationFile` entry for the file.
Send(SendAction),
/// Emitted when the target already has an entry that is `Missing` or whose
/// fingerprint identity does not match the latest one.
Update(UpdateAction),
/// Removal of the file from a target. Within this file it is only constructed by the
/// test-only `distribute_delete_actions` and the unit tests — presumably the reason
/// for the `dead_code` allowance (TODO confirm).
#[allow(dead_code)] Delete(DeleteAction),
}
/// Payload of [`DistributeAction::Send`].
#[derive(Debug, Clone)]
pub struct SendAction {
// Id of the topology file (taken from `TopologyFile::id()`).
pub(crate) topology_file_id: String,
// Relative path of the topology file (taken from `TopologyFile::relative_path()`).
pub(crate) relative_path: String,
// File type of the topology file; not read anywhere in this file.
#[allow(dead_code)] pub(crate) file_type: FileType,
// Location the action is directed at.
pub(crate) target: LocationId,
// Location selected by `pick_source` to act as the source.
pub(crate) source: LocationId,
}
/// Payload of [`DistributeAction::Update`].
#[derive(Debug, Clone)]
pub struct UpdateAction {
// Id of the topology file (taken from `TopologyFile::id()`).
pub(crate) topology_file_id: String,
// Relative path of the topology file (taken from `TopologyFile::relative_path()`).
pub(crate) relative_path: String,
// Location whose existing entry is to be refreshed.
pub(crate) target: LocationId,
// Location selected by `pick_source` to act as the source.
pub(crate) source: LocationId,
}
/// Payload of [`DistributeAction::Delete`]; unlike send/update it carries no source.
#[derive(Debug, Clone)]
pub struct DeleteAction {
// Id of the deleted topology file.
pub(crate) topology_file_id: String,
// Relative path of the deleted topology file.
pub(crate) relative_path: String,
// Location that holds a `LocationFile` entry for the deleted file.
pub(crate) target: LocationId,
}
/// Report that two or more ingest origins hold non-matching versions of the same file.
///
/// Produced by `detect_conflict`; serializable via serde.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ConflictEntry {
// Id of the topology file the conflict belongs to.
pub(crate) topology_file_id: String,
// Relative path of that topology file.
pub(crate) relative_path: String,
// One entry per origin that has a `LocationFile` for the file (at least two).
pub(crate) variants: Vec<ConflictVariant>,
}
impl ConflictEntry {
    /// Id of the topology file this conflict was detected for.
    pub fn topology_file_id(&self) -> &str {
        self.topology_file_id.as_str()
    }

    /// Relative path recorded for the conflicting file.
    pub fn relative_path(&self) -> &str {
        self.relative_path.as_str()
    }

    /// The per-location versions that were compared.
    pub fn variants(&self) -> &[ConflictVariant] {
        self.variants.as_slice()
    }
}
/// The version of a conflicting file as seen at one origin location.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ConflictVariant {
// Origin location holding this version.
pub(crate) location_id: LocationId,
// Fingerprint cloned from that location's `LocationFile`.
pub(crate) fingerprint: FileFingerprint,
}
impl ConflictVariant {
    /// Location that holds this version of the file.
    pub fn location_id(&self) -> &LocationId {
        let Self { location_id, .. } = self;
        location_id
    }

    /// Fingerprint recorded for the file at that location.
    pub fn fingerprint(&self) -> &FileFingerprint {
        let Self { fingerprint, .. } = self;
        fingerprint
    }
}
/// Output of `distribute_actions`: the planned actions plus any conflicts found on the way.
#[derive(Debug)]
pub struct DistributeResult {
// Send/Update actions, appended file by file in the order the topology files were given.
pub actions: Vec<DistributeAction>,
// One entry per topology file whose ingest origins disagree.
pub conflicts: Vec<ConflictEntry>,
}
impl std::fmt::Display for ConflictEntry {
    /// Renders the entry as `CONFLICT <relative_path> [<location>, <location>, ...]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "CONFLICT {} [", self.relative_path)?;

        // Print the first location bare, then every further one behind a separator.
        let mut remaining = self.variants.iter();
        if let Some(first) = remaining.next() {
            write!(f, "{}", first.location_id)?;
        }
        for variant in remaining {
            write!(f, ", ")?;
            write!(f, "{}", variant.location_id)?;
        }

        write!(f, "]")
    }
}
impl DistributeAction {
pub fn topology_file_id(&self) -> &str {
match self {
Self::Send(a) => &a.topology_file_id,
Self::Update(a) => &a.topology_file_id,
Self::Delete(a) => &a.topology_file_id,
}
}
#[cfg(test)]
pub fn relative_path(&self) -> &str {
match self {
Self::Send(a) => &a.relative_path,
Self::Update(a) => &a.relative_path,
Self::Delete(a) => &a.relative_path,
}
}
pub fn target(&self) -> &LocationId {
match self {
Self::Send(a) => &a.target,
Self::Update(a) => &a.target,
Self::Delete(a) => &a.target,
}
}
#[cfg(test)]
pub fn is_send(&self) -> bool {
matches!(self, Self::Send(_))
}
#[cfg(test)]
pub fn is_update(&self) -> bool {
matches!(self, Self::Update(_))
}
pub fn is_delete(&self) -> bool {
matches!(self, Self::Delete(_))
}
}
impl std::fmt::Display for DistributeAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Send(a) => write!(f, "SEND {} → [{}]", a.relative_path, a.target),
Self::Update(a) => write!(f, "UPDATE {} → [{}]", a.relative_path, a.target),
Self::Delete(a) => write!(f, "DELETE {} @ [{}]", a.relative_path, a.target),
}
}
}
/// Plans Send/Update actions for every topology file and collects detected conflicts.
///
/// * `topology_files` – files to distribute; processed in the given order.
/// * `location_files` – known per-location entries, keyed by topology file id.
/// * `target_locations` – locations that may receive actions.
/// * `ingest_origins` – per topology file id, the locations treated as ingest origins;
///   a file without an entry is handled as having no origins.
///
/// Returns the accumulated actions and conflicts.
pub fn distribute_actions(
    topology_files: &[&TopologyFile],
    location_files: &HashMap<String, Vec<&LocationFile>>,
    target_locations: &[LocationId],
    ingest_origins: &HashMap<String, HashSet<LocationId>>,
) -> DistributeResult {
    trace!(
        topology_files = topology_files.len(),
        target_locations = target_locations.len(),
        ingest_origins = ingest_origins.len(),
        "distribute_actions: start"
    );

    // Shared stand-in for files that have no recorded origins.
    let no_origins: HashSet<LocationId> = HashSet::new();
    let mut result = DistributeResult {
        actions: Vec::new(),
        conflicts: Vec::new(),
    };

    for &tf in topology_files {
        let origins = match ingest_origins.get(tf.id()) {
            Some(known) => known,
            None => &no_origins,
        };
        distribute_file(
            tf,
            origins,
            location_files,
            target_locations,
            &mut result.actions,
            &mut result.conflicts,
        );
    }

    trace!(
        actions = result.actions.len(),
        conflicts = result.conflicts.len(),
        "distribute_actions: done"
    );
    result
}
/// Plans the actions for one topology file and records a conflict if its origins disagree.
///
/// A detected conflict is pushed to `conflicts` but does not stop planning; it is passed on
/// as a flag to `emit_action_for_target`. If no source location can be picked, nothing is
/// emitted. Targets that are ingest origins, or that equal the picked source, are skipped.
fn distribute_file(
    tf: &TopologyFile,
    origins: &HashSet<LocationId>,
    location_files: &HashMap<String, Vec<&LocationFile>>,
    target_locations: &[LocationId],
    actions: &mut Vec<DistributeAction>,
    conflicts: &mut Vec<ConflictEntry>,
) {
    let file_id = tf.id();

    let conflict = detect_conflict(file_id, tf.relative_path(), origins, location_files);
    let has_conflict = conflict.is_some();
    // `Option` is an iterator of zero or one items, so this pushes only when a conflict exists.
    conflicts.extend(conflict);

    let source = match pick_source(file_id, origins, location_files) {
        Some(location) => location,
        None => {
            trace!(file_id = %file_id, path = %tf.relative_path(), "no source found, skip");
            return;
        }
    };

    // Index the known entries of this file by location for per-target lookup.
    let known_files: &[&LocationFile] = location_files
        .get(file_id)
        .map(|files| files.as_slice())
        .unwrap_or(&[]);
    let mut lf_by_location: HashMap<&LocationId, &&LocationFile> =
        HashMap::with_capacity(known_files.len());
    for lf in known_files {
        lf_by_location.insert(lf.location_id(), lf);
    }

    let latest_fp = latest_fingerprint(file_id, origins, location_files);

    let candidates = target_locations
        .iter()
        .filter(|candidate| !origins.contains(*candidate) && *candidate != &source);
    for target in candidates {
        emit_action_for_target(
            tf,
            target,
            &source,
            &lf_by_location,
            latest_fp.as_ref(),
            has_conflict,
            actions,
        );
    }
}
/// Decides which action, if any, a single target needs for `tf` and appends it to `actions`.
///
/// * No entry for the target → `Send` (even when `has_conflict` is set).
/// * Entry exists and `has_conflict` is set → nothing.
/// * Entry is not a distribute target, or is `Syncing` → nothing.
/// * Entry is `Missing` → `Update`.
/// * Otherwise → `Update` only if `latest_fp` is present and its cross-location identity
///   does not match the target's fingerprint.
fn emit_action_for_target(
    tf: &TopologyFile,
    target: &LocationId,
    source: &LocationId,
    lf_by_location: &HashMap<&LocationId, &&LocationFile>,
    latest_fp: Option<&FileFingerprint>,
    has_conflict: bool,
    actions: &mut Vec<DistributeAction>,
) {
    let file_id = tf.id();

    let Some(existing) = lf_by_location.get(target) else {
        trace!(file_id = %file_id, target = %target, source = %source, "Send");
        actions.push(DistributeAction::Send(SendAction {
            topology_file_id: file_id.to_string(),
            relative_path: tf.relative_path().to_string(),
            file_type: tf.file_type(),
            target: target.clone(),
            source: source.clone(),
        }));
        return;
    };

    // With a conflict pending, an existing copy is never touched.
    if has_conflict {
        return;
    }
    if !existing.state().is_distribute_target()
        || existing.state() == super::location_file::LocationFileState::Syncing
    {
        return;
    }

    let needs_update = if existing.state() == super::location_file::LocationFileState::Missing {
        trace!(file_id = %file_id, target = %target, source = %source, "Update (missing)");
        true
    } else {
        match latest_fp {
            None => {
                trace!(file_id = %file_id, target = %target, "no latest fingerprint, skip Update");
                false
            }
            Some(fp) => {
                let target_id = CrossLocationIdentity::from_fingerprint(existing.fingerprint());
                let latest_id = CrossLocationIdentity::from_fingerprint(fp);
                let stale = !latest_id.matches(&target_id);
                if stale {
                    trace!(file_id = %file_id, target = %target, source = %source, "Update");
                }
                stale
            }
        }
    };

    if needs_update {
        actions.push(DistributeAction::Update(UpdateAction {
            topology_file_id: file_id.to_string(),
            relative_path: tf.relative_path().to_string(),
            target: target.clone(),
            source: source.clone(),
        }));
    }
}
/// Builds one `Delete` action per (deleted file, target) pair where the target currently
/// has a `LocationFile` entry for that file. Test builds only.
///
/// Actions are ordered by file, then by the order of `target_locations`.
#[cfg(test)]
pub fn distribute_delete_actions(
    deleted_topology_files: &[&TopologyFile],
    location_files: &HashMap<String, Vec<&LocationFile>>,
    target_locations: &[LocationId],
) -> Vec<DistributeAction> {
    let mut actions = Vec::new();

    for tf in deleted_topology_files {
        let file_id = tf.id();

        // Locations that hold an entry for this file; empty when the file is unknown.
        let holders: HashSet<&LocationId> = match location_files.get(file_id) {
            Some(lfs) => lfs.iter().map(|lf| lf.location_id()).collect(),
            None => HashSet::new(),
        };

        actions.extend(
            target_locations
                .iter()
                .filter(|target| holders.contains(*target))
                .map(|target| {
                    DistributeAction::Delete(DeleteAction {
                        topology_file_id: file_id.to_string(),
                        relative_path: tf.relative_path().to_string(),
                        target: target.clone(),
                    })
                }),
        );
    }

    actions
}
/// Reports a conflict when two or more ingest origins hold non-matching versions of a file.
///
/// Returns `None` when there are fewer than two origins, when the file has no entry in
/// `location_files`, when fewer than two origins actually have a `LocationFile`, or when
/// every origin's fingerprint identity matches the first one's.
///
/// Origins are visited in sorted order so that the reported `variants` — and the variant
/// used as the comparison baseline — are stable across runs; iterating the `HashSet`
/// directly would yield an arbitrary order each time.
fn detect_conflict(
    file_id: &str,
    relative_path: &str,
    origins: &HashSet<LocationId>,
    location_files: &HashMap<String, Vec<&LocationFile>>,
) -> Option<ConflictEntry> {
    if origins.len() < 2 {
        return None;
    }
    let lfs = location_files.get(file_id)?;

    // Same deterministic ordering that `latest_fingerprint` applies to origins.
    let mut ordered_origins: Vec<&LocationId> = origins.iter().collect();
    ordered_origins.sort();

    // Keep only origins that actually have an entry for this file.
    let variants: Vec<ConflictVariant> = ordered_origins
        .into_iter()
        .filter_map(|origin| {
            lfs.iter()
                .find(|lf| lf.location_id() == origin)
                .map(|lf| ConflictVariant {
                    location_id: origin.clone(),
                    fingerprint: lf.fingerprint().clone(),
                })
        })
        .collect();

    if variants.len() < 2 {
        return None;
    }

    // Every other variant is compared against the first (smallest-id) one.
    // NOTE(review): if `CrossLocationIdentity::matches` is not transitive, the verdict
    // depends on which variant is the baseline — another reason to fix the order.
    let base_identity = CrossLocationIdentity::from_fingerprint(&variants[0].fingerprint);
    let all_match = variants[1..]
        .iter()
        .all(|v| base_identity.matches(&CrossLocationIdentity::from_fingerprint(&v.fingerprint)));
    if all_match {
        return None;
    }

    Some(ConflictEntry {
        topology_file_id: file_id.to_string(),
        relative_path: relative_path.to_string(),
        variants,
    })
}
/// Chooses the location a file is distributed from.
///
/// The smallest ingest origin wins when there is one. Otherwise the smallest location
/// among the file's source-eligible `LocationFile`s is used. Returns `None` when neither
/// yields a candidate.
fn pick_source(
    file_id: &str,
    origins: &HashSet<LocationId>,
    location_files: &HashMap<String, Vec<&LocationFile>>,
) -> Option<LocationId> {
    origins.iter().min().cloned().or_else(|| {
        location_files.get(file_id).and_then(|lfs| {
            lfs.iter()
                .filter(|lf| lf.state().is_source_eligible())
                .map(|lf| lf.location_id())
                .min()
                .cloned()
        })
    })
}
/// Returns the fingerprint that targets are compared against.
///
/// Origins are tried in sorted order and the first one that has a `LocationFile` supplies
/// the fingerprint. If no origin has one, the source-eligible `LocationFile` with the
/// smallest location id is used. Returns `None` when the file has no entry in
/// `location_files` or no candidate is found.
fn latest_fingerprint(
    file_id: &str,
    origins: &HashSet<LocationId>,
    location_files: &HashMap<String, Vec<&LocationFile>>,
) -> Option<FileFingerprint> {
    let lfs = location_files.get(file_id)?;

    let mut ordered_origins: Vec<&LocationId> = origins.iter().collect();
    ordered_origins.sort();

    let from_origin = ordered_origins.into_iter().find_map(|origin| {
        lfs.iter()
            .copied()
            .find(|lf| lf.location_id() == origin)
    });

    // Fall back to the eligible entry with the smallest location id.
    let chosen = from_origin.or_else(|| {
        lfs.iter()
            .copied()
            .filter(|lf| lf.state().is_source_eligible())
            .min_by_key(|lf| lf.location_id())
    });

    chosen.map(|lf| lf.fingerprint().clone())
}
// Unit tests for the action accessors, the Display output and the distribute planning functions.
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::test_helpers::{cloud, local, local_fp, pod};
use crate::domain::topology_file::TopologyFile;
use std::collections::{HashMap, HashSet};
// A Send action exposes its id, path and target, and reports only `is_send`.
#[test]
fn send_action_accessors() {
let action = DistributeAction::Send(SendAction {
topology_file_id: "tf-1".into(),
relative_path: "output/001.png".into(),
file_type: FileType::Image,
target: pod(),
source: local(),
});
assert_eq!(action.topology_file_id(), "tf-1");
assert_eq!(action.relative_path(), "output/001.png");
assert_eq!(action.target(), &pod());
assert!(action.is_send());
assert!(!action.is_update());
assert!(!action.is_delete());
}
// An Update action reports `is_update`.
#[test]
fn update_action_accessors() {
let action = DistributeAction::Update(UpdateAction {
topology_file_id: "tf-1".into(),
relative_path: "output/002.png".into(),
target: cloud(),
source: local(),
});
assert!(action.is_update());
}
// A Delete action reports `is_delete` and exposes its target.
#[test]
fn delete_action_accessors() {
let action = DistributeAction::Delete(DeleteAction {
topology_file_id: "tf-1".into(),
relative_path: "output/gone.png".into(),
target: pod(),
});
assert!(action.is_delete());
assert_eq!(action.target(), &pod());
}
// Display output of a Send action starts with the "SEND" verb.
#[test]
fn display_send_action() {
let action = DistributeAction::Send(SendAction {
topology_file_id: "tf-1".into(),
relative_path: "a.png".into(),
file_type: FileType::Image,
target: pod(),
source: local(),
});
assert!(action.to_string().starts_with("SEND"));
}
// Display output of a Delete action starts with the "DELETE" verb.
#[test]
fn display_delete_action() {
let action = DistributeAction::Delete(DeleteAction {
topology_file_id: "tf-1".into(),
relative_path: "a.png".into(),
target: pod(),
});
assert!(action.to_string().starts_with("DELETE"));
}
// Helper: builds an image TopologyFile for the given relative path.
fn make_tf(path: &str) -> TopologyFile {
TopologyFile::new(path.to_string(), FileType::Image).unwrap()
}
// Helper: builds a LocationFile via `LocationFile::new` (state left to the constructor).
fn make_lf(file_id: &str, location: &LocationId, fp: &FileFingerprint) -> LocationFile {
LocationFile::new(
file_id.to_string(),
location.clone(),
"dummy.png".to_string(),
fp.clone(),
None,
)
.unwrap()
}
// Helper: builds a LocationFile in an explicit state via `LocationFile::reconstitute`.
fn make_lf_with_state(
file_id: &str,
location: &LocationId,
fp: &FileFingerprint,
state: crate::domain::location_file::LocationFileState,
) -> LocationFile {
use chrono::Utc;
LocationFile::reconstitute(
file_id.to_string(),
location.clone(),
"dummy.png".to_string(),
fp.clone(),
state,
None,
Utc::now(),
)
}
// File exists only at the origin (local): both other targets receive a Send.
#[test]
fn distribute_sends_to_location_without_file() {
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("h1", 1024));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local()]));
let result = distribute_actions(
&[&tf],
&location_files,
&[local(), pod(), cloud()],
&origins,
);
assert_eq!(result.actions.len(), 2);
assert!(result.actions.iter().all(|a| a.is_send()));
let targets: HashSet<_> = result.actions.iter().map(|a| a.target().clone()).collect();
assert!(targets.contains(&pod()));
assert!(targets.contains(&cloud()));
assert!(result.conflicts.is_empty());
}
// Target holds a copy with a different fingerprint than the origin: it receives an Update.
#[test]
fn distribute_updates_stale_location() {
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("new_hash", 2048));
let lf_pod = make_lf(tf.id(), &pod(), &local_fp("old_hash", 1024));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local()]));
let result = distribute_actions(&[&tf], &location_files, &[local(), pod()], &origins);
assert_eq!(result.actions.len(), 1);
assert!(result.actions[0].is_update());
assert_eq!(result.actions[0].target(), &pod());
assert!(result.conflicts.is_empty());
}
// Target holds a copy with the same fingerprint as the origin: no action is planned.
#[test]
fn distribute_skips_up_to_date_location() {
let tf = make_tf("output/001.png");
let fp = local_fp("same_hash", 1024);
let lf_local = make_lf(tf.id(), &local(), &fp);
let lf_pod = make_lf(tf.id(), &pod(), &fp);
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local()]));
let result = distribute_actions(&[&tf], &location_files, &[local(), pod()], &origins);
assert_eq!(result.actions.len(), 0, "fingerprint一致 → skip");
assert!(result.conflicts.is_empty());
}
// Target entry is Archived: no action even though its fingerprint differs.
#[test]
fn distribute_skips_archived_location() {
use crate::domain::location_file::LocationFileState;
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("new", 2048));
let lf_pod = make_lf_with_state(
tf.id(),
&pod(),
&local_fp("old", 1024),
LocationFileState::Archived,
);
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local()]));
let result = distribute_actions(&[&tf], &location_files, &[local(), pod()], &origins);
assert_eq!(result.actions.len(), 0, "Archived → skip");
}
// Target entry is Syncing: no action even though its fingerprint differs.
#[test]
fn distribute_skips_syncing_location() {
use crate::domain::location_file::LocationFileState;
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("new", 2048));
let lf_pod = make_lf_with_state(
tf.id(),
&pod(),
&local_fp("old", 1024),
LocationFileState::Syncing,
);
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local()]));
let result = distribute_actions(&[&tf], &location_files, &[local(), pod()], &origins);
assert_eq!(result.actions.len(), 0, "Syncing → skip");
}
// Both local and pod are origins: neither is targeted, only cloud receives a Send.
#[test]
fn distribute_excludes_ingest_origin() {
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("h1", 1024));
let lf_pod = make_lf(tf.id(), &pod(), &local_fp("h1", 1024));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local(), pod()]));
let result = distribute_actions(
&[&tf],
&location_files,
&[local(), pod(), cloud()],
&origins,
);
assert_eq!(result.actions.len(), 1);
assert!(result.actions[0].is_send());
assert_eq!(result.actions[0].target(), &cloud());
assert!(result.conflicts.is_empty());
}
// No origins recorded: the existing local entry serves as source and pod receives a Send.
#[test]
fn distribute_picks_active_source_when_no_origin() {
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("h1", 1024));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local]);
let origins: HashMap<String, HashSet<LocationId>> = HashMap::new();
let result = distribute_actions(&[&tf], &location_files, &[local(), pod()], &origins);
assert_eq!(result.actions.len(), 1);
assert!(result.actions[0].is_send());
assert_eq!(result.actions[0].target(), &pod());
}
// Two origins with different fingerprints: one conflict with two variants is reported,
// and cloud (which has no copy) still receives a Send.
#[test]
fn distribute_detects_conflict_when_origins_have_different_fingerprints() {
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("hash_local", 1024));
let lf_pod = make_lf(tf.id(), &pod(), &local_fp("hash_pod", 2048));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local(), pod()]));
let result = distribute_actions(
&[&tf],
&location_files,
&[local(), pod(), cloud()],
&origins,
);
assert_eq!(result.conflicts.len(), 1);
let conflict = &result.conflicts[0];
assert_eq!(conflict.topology_file_id(), tf.id());
assert_eq!(conflict.relative_path(), "output/001.png");
assert_eq!(conflict.variants().len(), 2);
assert_eq!(result.actions.len(), 1);
assert!(result.actions[0].is_send());
assert_eq!(result.actions[0].target(), &cloud());
}
// Two origins with the same fingerprint: no conflict, cloud receives a Send.
#[test]
fn distribute_no_conflict_when_origins_have_same_fingerprint() {
let tf = make_tf("output/001.png");
let fp = local_fp("same_hash", 1024);
let lf_local = make_lf(tf.id(), &local(), &fp);
let lf_pod = make_lf(tf.id(), &pod(), &fp);
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local(), pod()]));
let result = distribute_actions(
&[&tf],
&location_files,
&[local(), pod(), cloud()],
&origins,
);
assert!(result.conflicts.is_empty());
assert_eq!(result.actions.len(), 1);
assert!(result.actions[0].is_send());
assert_eq!(result.actions[0].target(), &cloud());
}
// Conflicting origins: the only planned action is the Send to cloud; no Update appears.
#[test]
fn distribute_conflict_skips_update_but_allows_send() {
let tf = make_tf("output/001.png");
let lf_local = make_lf(tf.id(), &local(), &local_fp("v_local", 1024));
let lf_pod = make_lf(tf.id(), &pod(), &local_fp("v_pod", 2048));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let mut origins = HashMap::new();
origins.insert(tf.id().to_string(), HashSet::from([local(), pod()]));
let result = distribute_actions(
&[&tf],
&location_files,
&[local(), pod(), cloud()],
&origins,
);
assert_eq!(result.conflicts.len(), 1);
assert_eq!(result.actions.len(), 1);
assert!(result.actions[0].is_send());
assert_eq!(result.actions[0].target(), &cloud());
}
// Deleted file with entries at local and pod: exactly those two get a Delete, cloud does not.
#[test]
fn distribute_delete_targets_existing_locations() {
let mut tf = make_tf("output/001.png");
tf.mark_deleted();
let lf_local = make_lf(tf.id(), &local(), &local_fp("h1", 1024));
let lf_pod = make_lf(tf.id(), &pod(), &local_fp("h1", 1024));
let mut location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
location_files.insert(tf.id().to_string(), vec![&lf_local, &lf_pod]);
let actions =
distribute_delete_actions(&[&tf], &location_files, &[local(), pod(), cloud()]);
assert_eq!(actions.len(), 2);
assert!(actions.iter().all(|a| a.is_delete()));
let targets: HashSet<_> = actions.iter().map(|a| a.target().clone()).collect();
assert!(targets.contains(&local()));
assert!(targets.contains(&pod()));
assert!(!targets.contains(&cloud()));
}
// Deleted file with no LocationFile entries at all: no Delete action is produced.
#[test]
fn distribute_delete_skips_location_without_file() {
let mut tf = make_tf("output/001.png");
tf.mark_deleted();
let location_files: HashMap<String, Vec<&LocationFile>> = HashMap::new();
let actions = distribute_delete_actions(&[&tf], &location_files, &[local(), pod()]);
assert_eq!(actions.len(), 0, "LocationFileなし → Deleteなし");
}
}