use std::collections::{HashMap, HashSet};
use tracing::{debug, trace, warn};
use super::distribute::DistributeAction;
use super::location::LocationId;
use super::transfer::TransferKind;
/// Computes routing trees over the location graph.
///
/// Trees are returned as lists of directed `(from, to)` edges. Callers in this
/// module derive hop dependencies from edge order: an edge only depends on an
/// *earlier* edge that delivered to its `from` location. If an edge is listed
/// before the edge feeding its source, no dependency is recorded for it.
pub trait Topology: Send + Sync {
    /// Returns the edges of a routing tree starting at `origin` toward every
    /// location in `required_dests`.
    ///
    /// What "optimal" means, and how unreachable destinations are handled, is
    /// up to the implementation. `plan_distribution` treats any destination not
    /// covered by a returned edge as unreachable.
    fn optimal_tree(
        &self,
        origin: &LocationId,
        required_dests: &HashSet<LocationId>,
    ) -> Vec<(LocationId, LocationId)>;

    /// Like [`Topology::optimal_tree`], but any location in `sources` may act
    /// as a root.
    ///
    /// The default implementation does not exploit multiple sources. It
    /// delegates to `optimal_tree` from a single element of `sources`, chosen
    /// by `HashSet` iteration order (i.e. arbitrarily). It returns an empty
    /// tree when `sources` is empty. Implementations that can route from
    /// several roots should override it.
    fn optimal_tree_multi_source(
        &self,
        sources: &HashSet<LocationId>,
        required_dests: &HashSet<LocationId>,
    ) -> Vec<(LocationId, LocationId)> {
        sources
            .iter()
            .next()
            .map(|root| self.optimal_tree(root, required_dests))
            .unwrap_or_default()
    }
}
/// One hop of a distribution plan for a single file.
///
/// For `Sync` hops the file is carried from `src` to `dest`. The test helper
/// `plan_deletes` builds `Delete` hops with `src == dest == ` the delete target.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlannedTransfer {
/// Topology file identifier this hop applies to.
pub file_id: String,
/// Location the hop starts from.
pub src: LocationId,
/// Location the hop ends at.
pub dest: LocationId,
/// Kind of hop. In this module: `Sync` for sends/updates, `Delete` for deletes.
pub kind: TransferKind,
/// Index, within the same planned `Vec`, of an earlier transfer whose `dest`
/// equals this transfer's `src` (the latest such one). `None` when no earlier
/// transfer delivers to `src`. Presumably the referenced transfer must
/// complete before this one can start; confirm with the executor.
pub depends_on_index: Option<usize>,
}
/// Converts an ordered list of tree edges into [`PlannedTransfer`]s for one file.
///
/// Each edge `(src, dest)` becomes one transfer of `kind`, in the same order.
/// A transfer's `depends_on_index` is the index of the most recent *earlier*
/// edge whose `dest` is this edge's `src`, or `None` if there is no such edge.
/// Indices are relative to `tree_edges` (and therefore to the returned `Vec`).
///
/// Runs in a single pass. A map from each location to the index of the latest
/// edge that arrived there replaces a backwards scan over all previous edges
/// per edge, which was quadratic in the number of edges.
pub fn edges_to_planned_transfers(
    tree_edges: &[(LocationId, LocationId)],
    file_id: &str,
    kind: TransferKind,
) -> Vec<PlannedTransfer> {
    // Latest edge index at which each location was reached as a destination.
    let mut last_arrival: HashMap<&LocationId, usize> = HashMap::with_capacity(tree_edges.len());
    let mut result = Vec::with_capacity(tree_edges.len());
    for (i, (src, dest)) in tree_edges.iter().enumerate() {
        // Look up before recording this edge, so only edges with index < i are
        // considered (matching a scan over `0..i`, including for self-loops).
        let depends_on = last_arrival.get(src).copied();
        last_arrival.insert(dest, i);
        result.push(PlannedTransfer {
            file_id: file_id.to_string(),
            src: src.clone(),
            dest: dest.clone(),
            kind,
            depends_on_index: depends_on,
        });
    }
    result
}
/// Plans the transfers needed to carry out `actions` over `topology`.
///
/// Actions are grouped by `(file_id, source, kind)` (see `DistributeGroup`),
/// and each group's targets are routed together. For each group:
///
/// 1. Targets listed in `pending_dests[file_id]` are dropped. Presumably a
///    transfer to them is already in flight. If none remain, the group is
///    skipped.
/// 2. The group's source plus every location in `existing_presences[file_id]`
///    form the source set. Targets already in that set are dropped. If none
///    remain, the group is skipped.
/// 3. Only the remaining targets are passed to
///    [`Topology::optimal_tree_multi_source`]. The returned edges become
///    [`PlannedTransfer`]s. Targets not covered by any edge are reported with
///    `warn!` and left unplanned.
///
/// `depends_on_index` values in the result index into the returned `Vec`.
/// Groups are iterated from a `HashMap`, so the relative order of different
/// groups in the output is not deterministic. Within a group, transfers follow
/// the edge order returned by the topology.
pub fn plan_distribution(
    actions: &[DistributeAction],
    topology: &dyn Topology,
    pending_dests: &HashMap<String, HashSet<LocationId>>,
    existing_presences: &HashMap<String, HashSet<LocationId>>,
) -> Vec<PlannedTransfer> {
    trace!(
        actions = actions.len(),
        pending_dests = pending_dests.len(),
        existing_presences = existing_presences.len(),
        "plan_distribution: start"
    );
    let mut groups: HashMap<DistributeGroup, HashSet<LocationId>> = HashMap::new();
    for action in actions {
        groups
            .entry(DistributeGroup::from_action(action))
            .or_default()
            .insert(action.target().clone());
    }
    trace!(groups = groups.len(), "plan_distribution: grouped");

    let empty_pending = HashSet::new();
    let empty_presences = HashSet::new();
    let mut all_transfers = Vec::new();
    let mut skipped_all_pending = 0usize;
    let mut skipped_all_existing = 0usize;
    let mut unreachable_targets = 0usize;

    for (group, mut targets) in groups {
        let pending = pending_dests.get(&group.file_id).unwrap_or(&empty_pending);
        let pre_filter = targets.len();
        targets.retain(|t| !pending.contains(t));
        if targets.is_empty() {
            skipped_all_pending += 1;
            trace!(
                file_id = %group.file_id,
                kind = ?group.kind,
                pre_filter = pre_filter,
                "plan_distribution: all targets pending, skip"
            );
            continue;
        }

        // Every location that already holds the file can act as a source.
        let existing = existing_presences
            .get(&group.file_id)
            .unwrap_or(&empty_presences);
        let mut sources = existing.clone();
        sources.insert(group.source.clone());

        // NOTE(review): for `Delete` actions, `DistributeGroup::from_action`
        // uses the delete target as the group source. Assuming
        // `DistributeAction::target()` returns that same target, every delete
        // target is then also in `sources`, so delete groups always take the
        // "already present" skip below and are never planned here. Confirm
        // that deletes are planned elsewhere.
        let effective_targets: HashSet<LocationId> = targets
            .into_iter()
            .filter(|t| !sources.contains(t))
            .collect();
        if effective_targets.is_empty() {
            skipped_all_existing += 1;
            trace!(
                file_id = %group.file_id,
                kind = ?group.kind,
                sources = ?location_strings(&sources),
                "plan_distribution: all targets already present in sources, skip"
            );
            continue;
        }
        trace!(
            file_id = %group.file_id,
            kind = ?group.kind,
            sources = ?location_strings(&sources),
            targets = ?location_strings(&effective_targets),
            "plan_distribution: computing optimal tree"
        );

        // Route only to targets that still lack the file. Passing locations
        // that are already sources would let a topology that roots the tree at a
        // single source (such as the default `optimal_tree_multi_source`) plan
        // redundant hops to them.
        let tree_edges = topology.optimal_tree_multi_source(&sources, &effective_targets);
        let covered_dests: HashSet<&LocationId> = tree_edges.iter().map(|(_, d)| d).collect();
        let missing: Vec<&LocationId> = effective_targets
            .iter()
            .filter(|t| !covered_dests.contains(t))
            .collect();
        if !missing.is_empty() {
            unreachable_targets += missing.len();
            warn!(
                file_id = %group.file_id,
                kind = ?group.kind,
                sources = ?location_strings(&sources),
                unreachable = ?location_strings(missing.iter().copied()),
                "plan_distribution: targets unreachable from sources — no route in graph, transfer not planned"
            );
        }
        trace!(
            file_id = %group.file_id,
            edges = tree_edges.len(),
            "plan_distribution: tree computed"
        );

        // Dependency indices from `edges_to_planned_transfers` are local to this
        // group's edge list. Shift them so they index into `all_transfers`.
        let base_offset = all_transfers.len();
        all_transfers.extend(
            edges_to_planned_transfers(&tree_edges, &group.file_id, group.kind)
                .into_iter()
                .map(|mut pt| {
                    pt.depends_on_index = pt.depends_on_index.map(|i| i + base_offset);
                    pt
                }),
        );
    }

    if skipped_all_pending > 0 || skipped_all_existing > 0 || unreachable_targets > 0 {
        debug!(
            total_transfers = all_transfers.len(),
            skipped_all_pending,
            skipped_all_existing,
            unreachable_targets,
            "plan_distribution: done (with skips)"
        );
    } else {
        trace!(
            total_transfers = all_transfers.len(),
            "plan_distribution: done"
        );
    }
    all_transfers
}

/// Renders locations via `to_string` for use in structured log fields.
fn location_strings<'a>(locations: impl IntoIterator<Item = &'a LocationId>) -> Vec<String> {
    locations.into_iter().map(|l| l.to_string()).collect()
}
/// Grouping key used by `plan_distribution`. Actions that share a file, an
/// origin location and a transfer kind have their targets routed in one tree.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DistributeGroup {
    file_id: String,
    source: LocationId,
    kind: TransferKind,
}

impl DistributeGroup {
    /// Derives the grouping key for `action`.
    ///
    /// Sends and updates are both keyed as `TransferKind::Sync` from their
    /// `source`. A delete carries no source, so its `target` stands in as the
    /// group's source, with `TransferKind::Delete`.
    fn from_action(action: &DistributeAction) -> Self {
        let (file_id, origin, kind) = match action {
            DistributeAction::Send(send) => {
                (&send.topology_file_id, &send.source, TransferKind::Sync)
            }
            DistributeAction::Update(update) => {
                (&update.topology_file_id, &update.source, TransferKind::Sync)
            }
            DistributeAction::Delete(delete) => {
                (&delete.topology_file_id, &delete.target, TransferKind::Delete)
            }
        };
        Self {
            file_id: file_id.clone(),
            source: origin.clone(),
            kind,
        }
    }
}
/// Test helper that plans one standalone `Delete` transfer per `Delete` action.
///
/// Each transfer has `src == dest ==` the delete target and no dependency.
/// Targets listed in `pending_dests` for the action's file are skipped. Send
/// and update actions are ignored.
#[cfg(test)]
pub fn plan_deletes(
    delete_actions: &[DistributeAction],
    pending_dests: &HashMap<String, HashSet<LocationId>>,
) -> Vec<PlannedTransfer> {
    let no_pending = HashSet::new();
    let mut planned = Vec::new();
    for action in delete_actions {
        let delete = match action {
            DistributeAction::Delete(delete) => delete,
            DistributeAction::Send(_) | DistributeAction::Update(_) => continue,
        };
        let pending = pending_dests
            .get(&delete.topology_file_id)
            .unwrap_or(&no_pending);
        if pending.contains(&delete.target) {
            continue;
        }
        planned.push(PlannedTransfer {
            file_id: delete.topology_file_id.clone(),
            src: delete.target.clone(),
            dest: delete.target.clone(),
            kind: TransferKind::Delete,
            depends_on_index: None,
        });
    }
    planned
}
#[cfg(test)]
mod tests {
    // Unit tests for distribution planning over small, fixed `RouteGraph`s.
    use super::*;
    use crate::domain::distribute::{DeleteAction, DistributeAction, SendAction, UpdateAction};
    use crate::domain::file_type::FileType;
    use crate::domain::graph::{EdgeCost, RouteGraph};

    fn local() -> LocationId {
        LocationId::local()
    }

    fn cloud() -> LocationId {
        LocationId::new("cloud").unwrap()
    }

    fn pod() -> LocationId {
        LocationId::new("pod").unwrap()
    }

    /// Single edge: local → cloud.
    fn simple_graph() -> RouteGraph {
        let mut g = RouteGraph::new();
        g.add(local(), cloud());
        g
    }

    /// Two-hop chain local → pod → cloud, with no direct local → cloud edge.
    fn chain_graph() -> RouteGraph {
        let mut g = RouteGraph::new();
        g.add_with_cost(local(), pod(), EdgeCost::new(1.0, 10).unwrap());
        g.add_with_cost(pod(), cloud(), EdgeCost::new(2.0, 10).unwrap());
        g
    }

    /// The chain from `chain_graph` plus a direct local → cloud edge built with a
    /// larger first `EdgeCost` argument (10.0) than either chain hop.
    fn chain_with_direct_graph() -> RouteGraph {
        let mut g = RouteGraph::new();
        g.add_with_cost(local(), pod(), EdgeCost::new(1.0, 10).unwrap());
        g.add_with_cost(pod(), cloud(), EdgeCost::new(2.0, 10).unwrap());
        g.add_with_cost(local(), cloud(), EdgeCost::new(10.0, 10).unwrap());
        g
    }

    fn send_action(file_id: &str, source: LocationId, target: LocationId) -> DistributeAction {
        DistributeAction::Send(SendAction {
            topology_file_id: file_id.into(),
            relative_path: format!("{file_id}.png"),
            file_type: FileType::Image,
            target,
            source,
        })
    }

    fn update_action(file_id: &str, source: LocationId, target: LocationId) -> DistributeAction {
        DistributeAction::Update(UpdateAction {
            topology_file_id: file_id.into(),
            relative_path: format!("{file_id}.png"),
            target,
            source,
        })
    }

    fn delete_action(file_id: &str, target: LocationId) -> DistributeAction {
        DistributeAction::Delete(DeleteAction {
            topology_file_id: file_id.into(),
            relative_path: format!("{file_id}.png"),
            target,
        })
    }

    #[test]
    fn plan_distribution_single_send() {
        let actions = vec![send_action("f1", local(), cloud())];
        let planned =
            plan_distribution(&actions, &simple_graph(), &HashMap::new(), &HashMap::new());
        assert_eq!(planned.len(), 1);
        assert_eq!(planned[0].file_id, "f1");
        assert_eq!(planned[0].src, local());
        assert_eq!(planned[0].dest, cloud());
        assert_eq!(planned[0].kind, TransferKind::Sync);
        assert_eq!(planned[0].depends_on_index, None);
    }

    #[test]
    fn plan_distribution_groups_same_file_targets() {
        let actions = vec![
            send_action("f1", local(), pod()),
            send_action("f1", local(), cloud()),
        ];
        let planned = plan_distribution(&actions, &chain_graph(), &HashMap::new(), &HashMap::new());
        assert_eq!(planned.len(), 2);
        assert_eq!(planned[0].src, local());
        assert_eq!(planned[0].dest, pod());
        assert_eq!(planned[0].depends_on_index, None);
        // The pod → cloud hop depends on the local → pod hop.
        assert_eq!(planned[1].src, pod());
        assert_eq!(planned[1].dest, cloud());
        assert_eq!(planned[1].depends_on_index, Some(0));
    }

    #[test]
    fn plan_distribution_respects_pending() {
        let actions = vec![
            send_action("f1", local(), pod()),
            send_action("f1", local(), cloud()),
        ];
        let mut pending = HashMap::new();
        pending.insert("f1".to_string(), HashSet::from([cloud()]));
        let planned = plan_distribution(&actions, &chain_graph(), &pending, &HashMap::new());
        assert_eq!(planned.len(), 1);
        assert_eq!(planned[0].dest, pod());
    }

    #[test]
    fn plan_distribution_update_uses_sync_kind() {
        let actions = vec![update_action("f1", local(), cloud())];
        let planned =
            plan_distribution(&actions, &simple_graph(), &HashMap::new(), &HashMap::new());
        assert_eq!(planned.len(), 1);
        assert_eq!(planned[0].kind, TransferKind::Sync);
    }

    #[test]
    fn plan_distribution_multiple_files() {
        let actions = vec![
            send_action("f1", local(), cloud()),
            send_action("f2", local(), cloud()),
        ];
        let planned =
            plan_distribution(&actions, &simple_graph(), &HashMap::new(), &HashMap::new());
        assert_eq!(planned.len(), 2);
        let file_ids: HashSet<_> = planned.iter().map(|p| p.file_id.as_str()).collect();
        assert!(file_ids.contains("f1"));
        assert!(file_ids.contains("f2"));
    }

    #[test]
    fn plan_distribution_chain_with_direct_picks_optimal() {
        let actions = vec![
            send_action("f1", local(), pod()),
            send_action("f1", local(), cloud()),
        ];
        let planned = plan_distribution(
            &actions,
            &chain_with_direct_graph(),
            &HashMap::new(),
            &HashMap::new(),
        );
        assert_eq!(planned.len(), 2);
        assert_eq!(planned[0].src, local());
        assert_eq!(planned[0].dest, pod());
        assert_eq!(planned[1].src, pod());
        assert_eq!(planned[1].dest, cloud());
    }

    #[test]
    fn plan_distribution_multi_source_picks_cheaper_relay() {
        let mut g = RouteGraph::new();
        g.add_with_cost(local(), pod(), EdgeCost::new(1.0, 10).unwrap());
        g.add_with_cost(pod(), cloud(), EdgeCost::new(2.0, 10).unwrap());
        g.add_with_cost(local(), cloud(), EdgeCost::new(5.0, 10).unwrap());
        g.add_with_cost(cloud(), local(), EdgeCost::new(5.0, 10).unwrap());
        g.add_with_cost(cloud(), pod(), EdgeCost::new(2.0, 10).unwrap());
        let actions = vec![send_action("f1", local(), cloud())];
        // Both local and pod already hold f1, so only a single hop into cloud
        // is expected, and it should come from pod.
        let mut existing = HashMap::new();
        existing.insert("f1".to_string(), HashSet::from([local(), pod()]));
        let planned = plan_distribution(&actions, &g, &HashMap::new(), &existing);
        assert_eq!(planned.len(), 1);
        assert_eq!(planned[0].src, pod());
        assert_eq!(planned[0].dest, cloud());
    }

    #[test]
    fn plan_distribution_no_existing_presences_uses_source() {
        let mut g = RouteGraph::new();
        g.add_with_cost(local(), pod(), EdgeCost::new(1.0, 10).unwrap());
        g.add_with_cost(pod(), cloud(), EdgeCost::new(2.0, 10).unwrap());
        g.add_with_cost(local(), cloud(), EdgeCost::new(5.0, 10).unwrap());
        let actions = vec![send_action("f1", local(), cloud())];
        let planned = plan_distribution(&actions, &g, &HashMap::new(), &HashMap::new());
        assert_eq!(planned.len(), 2);
        assert_eq!(planned[0].src, local());
        assert_eq!(planned[0].dest, pod());
        assert_eq!(planned[1].src, pod());
        assert_eq!(planned[1].dest, cloud());
    }

    #[test]
    fn plan_deletes_creates_individual_transfers() {
        let actions = vec![delete_action("f1", pod()), delete_action("f1", cloud())];
        let planned = plan_deletes(&actions, &HashMap::new());
        assert_eq!(planned.len(), 2);
        assert!(planned.iter().all(|p| p.kind == TransferKind::Delete));
        assert!(planned.iter().all(|p| p.depends_on_index.is_none()));
        let dests: HashSet<_> = planned.iter().map(|p| p.dest.clone()).collect();
        assert!(dests.contains(&pod()));
        assert!(dests.contains(&cloud()));
    }

    #[test]
    fn plan_deletes_respects_pending() {
        let actions = vec![delete_action("f1", pod()), delete_action("f1", cloud())];
        let mut pending = HashMap::new();
        pending.insert("f1".to_string(), HashSet::from([cloud()]));
        let planned = plan_deletes(&actions, &pending);
        assert_eq!(planned.len(), 1);
        assert_eq!(planned[0].dest, pod());
    }

    #[test]
    fn plan_deletes_ignores_non_delete_actions() {
        let actions = vec![
            send_action("f1", local(), cloud()),
            delete_action("f2", pod()),
        ];
        let planned = plan_deletes(&actions, &HashMap::new());
        assert_eq!(planned.len(), 1);
        assert_eq!(planned[0].file_id, "f2");
    }

    #[test]
    fn plan_distribution_multi_file_chain_depends_on_index_offset() {
        // Group order is not deterministic, so locate hops by content and check
        // that each file's second hop points at that same file's first hop.
        let actions = vec![
            send_action("f1", local(), pod()),
            send_action("f1", local(), cloud()),
            send_action("f2", local(), pod()),
            send_action("f2", local(), cloud()),
        ];
        let planned = plan_distribution(&actions, &chain_graph(), &HashMap::new(), &HashMap::new());
        assert_eq!(planned.len(), 4);
        let f1: Vec<_> = planned.iter().filter(|p| p.file_id == "f1").collect();
        let f2: Vec<_> = planned.iter().filter(|p| p.file_id == "f2").collect();
        assert_eq!(f1.len(), 2);
        assert_eq!(f2.len(), 2);
        let f1_hop1_idx = planned
            .iter()
            .position(|p| p.file_id == "f1" && p.dest == pod())
            .unwrap();
        let f1_hop2 = planned
            .iter()
            .find(|p| p.file_id == "f1" && p.dest == cloud())
            .unwrap();
        assert_eq!(f1_hop2.depends_on_index, Some(f1_hop1_idx));
        let f2_hop1_idx = planned
            .iter()
            .position(|p| p.file_id == "f2" && p.dest == pod())
            .unwrap();
        let f2_hop2 = planned
            .iter()
            .find(|p| p.file_id == "f2" && p.dest == cloud())
            .unwrap();
        assert_eq!(f2_hop2.depends_on_index, Some(f2_hop1_idx));
        assert_ne!(f2_hop2.depends_on_index, Some(f1_hop1_idx));
    }
}