mod algorithm;
mod astar;
mod features;
mod long_range;
mod scenario;
use std::collections::HashSet;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, mpsc};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tracing::{debug, info};
use crate::{ArrayIndex, RevrtRoutingSolutions, Solution, error::Result};
use algorithm::Algorithm;
use algorithm::AlgorithmType;
use scenario::Scenario;
/// Percentage of the caller-supplied memory limit that `cache_budget_bytes`
/// sets aside as the cache size passed to the `Scenario` constructors. The
/// constructors in this file hand the remaining bytes to
/// `per_rayon_worker_memory_budget`.
const CACHE_BUDGET_PERCENT: u64 = 25;
/// Router that owns its `Scenario` and `Algorithm` directly.
///
/// Built with `Routing::new` or `Routing::new_wth_swap`; routes are computed
/// with `Routing::compute`.
pub(super) struct Routing {
/// Scenario built from the store path and cost function at construction time.
scenario: Scenario,
/// Algorithm selected by name at construction time.
algorithm: Algorithm,
}
impl Routing {
    /// Computes routes from each point in `start` towards the `end` points.
    ///
    /// The work is delegated to `compute_route_attempt_result`; start points
    /// for which no solution is found contribute nothing to the returned
    /// iterator.
    pub(super) fn compute(
        &mut self,
        start: &[ArrayIndex],
        end: Vec<ArrayIndex>,
    ) -> impl Iterator<Item = Solution<ArrayIndex, f32>> {
        debug!("Starting compute with {} start points", start.len());
        let solutions =
            compute_route_attempt_result(&self.scenario, &self.algorithm, start, end.as_slice());
        solutions.into_iter()
    }

    /// Builds a router over the store at `store_path`.
    ///
    /// `mem_limit_bytes` is split in two: `cache_budget_bytes` decides the
    /// cache size given to `Scenario::new`, and the remainder is divided by
    /// `per_rayon_worker_memory_budget` before being given to the algorithm.
    ///
    /// # Errors
    ///
    /// Returns an error when `algorithm` cannot be parsed into an
    /// `AlgorithmType` or when `Scenario::new` fails.
    pub(super) fn new<P: AsRef<std::path::Path>>(
        store_path: P,
        cost_function: crate::cost::CostFunction,
        mem_limit_bytes: u64,
        algorithm: &str,
    ) -> Result<Self> {
        let selection = AlgorithmType::from_str(algorithm)?;
        let cache_bytes = cache_budget_bytes(mem_limit_bytes);
        let worker_pool_bytes = mem_limit_bytes - cache_bytes;
        let scenario = Scenario::new(store_path, cost_function, cache_bytes)?;
        let per_worker_bytes = per_rayon_worker_memory_budget(worker_pool_bytes);

        Ok(Self {
            scenario,
            algorithm: Algorithm::from_selection(selection, per_worker_bytes),
        })
    }

    /// Same as `Routing::new`, but builds the scenario with
    /// `Scenario::new_with_swap`, forwarding `swap_fp` to it.
    ///
    /// NOTE(review): the name is spelled `new_wth_swap` (sic) and `swap_fp`
    /// comes *before* `mem_limit_bytes` here, whereas
    /// `ParRouting::new_with_swap` takes them in the opposite order. Both are
    /// part of the existing interface and are left untouched.
    ///
    /// # Errors
    ///
    /// Returns an error when `algorithm` cannot be parsed into an
    /// `AlgorithmType` or when `Scenario::new_with_swap` fails.
    pub(super) fn new_wth_swap<P: AsRef<std::path::Path>>(
        store_path: P,
        cost_function: crate::cost::CostFunction,
        swap_fp: PathBuf,
        mem_limit_bytes: u64,
        algorithm: &str,
    ) -> Result<Self> {
        let selection = AlgorithmType::from_str(algorithm)?;
        let cache_bytes = cache_budget_bytes(mem_limit_bytes);
        let worker_pool_bytes = mem_limit_bytes - cache_bytes;
        let scenario = Scenario::new_with_swap(store_path, cost_function, cache_bytes, swap_fp)?;
        let per_worker_bytes = per_rayon_worker_memory_budget(worker_pool_bytes);

        Ok(Self {
            scenario,
            algorithm: Algorithm::from_selection(selection, per_worker_bytes),
        })
    }
}
/// One unit of work consumed by `ParRouting::lazy_scout`.
pub(super) struct RouteDefinition {
/// Identifier sent back through the channel together with this definition's solutions.
pub(super) route_id: u32,
/// Start points; a solution is attempted from each of them.
pub(super) start_inds: Vec<ArrayIndex>,
/// Candidate end points (collected into a `Vec` before routing).
pub(super) end_inds: HashSet<ArrayIndex>,
}
/// Router whose `Scenario` and `Algorithm` sit behind `Arc`s, so that
/// `ParRouting::lazy_scout` can move cloned handles into the task it spawns.
pub(super) struct ParRouting {
/// Shared scenario built at construction time.
scenario: Arc<Scenario>,
/// Shared algorithm selected by name at construction time.
algorithm: Arc<Algorithm>,
}
impl ParRouting {
pub(super) fn new<P: AsRef<std::path::Path>>(
store_path: P,
cost_function: crate::cost::CostFunction,
mem_limit_bytes: u64,
algorithm: &str,
) -> Result<Self> {
let algorithm = AlgorithmType::from_str(algorithm)?;
let cache_size = cache_budget_bytes(mem_limit_bytes);
let rayon_worker_total_budget_bytes = mem_limit_bytes - cache_size;
let scenario = Scenario::new(store_path, cost_function, cache_size)?;
Ok(Self {
scenario: Arc::new(scenario),
algorithm: Arc::new(Algorithm::from_selection(
algorithm,
per_rayon_worker_memory_budget(rayon_worker_total_budget_bytes),
)),
})
}
pub(super) fn new_with_swap<P: AsRef<std::path::Path>>(
store_path: P,
cost_function: crate::cost::CostFunction,
mem_limit_bytes: u64,
swap_fp: PathBuf,
algorithm: &str,
) -> Result<Self> {
let algorithm = AlgorithmType::from_str(algorithm)?;
let cache_size = cache_budget_bytes(mem_limit_bytes);
let rayon_worker_total_budget_bytes = mem_limit_bytes - cache_size;
let scenario = Scenario::new_with_swap(store_path, cost_function, cache_size, swap_fp)?;
Ok(Self {
scenario: Arc::new(scenario),
algorithm: Arc::new(Algorithm::from_selection(
algorithm,
per_rayon_worker_memory_budget(rayon_worker_total_budget_bytes),
)),
})
}
pub(super) fn lazy_scout<I>(
&self,
route_definitions: I,
tx: mpsc::Sender<(u32, RevrtRoutingSolutions)>,
) where
I: IntoParallelIterator<Item = RouteDefinition> + Send + 'static,
I::Iter: Send,
{
let scenario = Arc::clone(&self.scenario);
let algorithm = Arc::clone(&self.algorithm);
rayon::spawn(move || {
let _ = route_definitions.into_par_iter().try_for_each_with(
tx,
|sender,
RouteDefinition {
route_id,
start_inds,
end_inds,
}| {
debug!("Computing routes between {start_inds:?} and {end_inds:?}");
let result = compute_route_attempt_result(
&scenario,
&algorithm,
&start_inds,
&end_inds.iter().cloned().collect::<Vec<_>>(),
);
let num_routes = result.len();
debug!("Finished computing {num_routes} route(s) to {end_inds:?}");
sender.send((route_id, result))
},
);
});
}
}
/// Solves every start point in parallel and gathers the successful solutions.
///
/// Each element of `start` is passed to `compute_solution_for_start`; start
/// points for which that helper returns `None` are left out of the result.
fn compute_route_attempt_result(
    scenario: &Scenario,
    algorithm: &Algorithm,
    start: &[ArrayIndex],
    end: &[ArrayIndex],
) -> RevrtRoutingSolutions {
    let solve_from =
        |origin: &ArrayIndex| compute_solution_for_start(scenario, algorithm, origin, end);

    start.into_par_iter().filter_map(solve_from).collect()
}
/// Finds a solution for a single start point, retrying with progressively
/// more soft-barrier groups dropped.
///
/// Attempts run in order for `0..=scenario.soft_barrier_group_count()`
/// dropped groups and stop at the first attempt that yields a solution. That
/// solution is tagged with `scenario.dropped_barrier_layers(..)` for the
/// attempt that produced it. Returns `None` when every attempt fails.
fn compute_solution_for_start(
    scenario: &Scenario,
    algorithm: &Algorithm,
    start_point: &ArrayIndex,
    end: &[ArrayIndex],
) -> Option<Solution<ArrayIndex, f32>> {
    let grid_shape = scenario.grid_shape();
    let max_dropped_groups = scenario.soft_barrier_group_count();

    (0..=max_dropped_groups).find_map(|dropped| {
        // The first attempt (nothing dropped) is not a retry, so it is not logged.
        if dropped > 0 {
            info!(
                "Retrying route from {:?} with {} soft-barrier group(s) dropped",
                start_point, dropped
            );
        }

        algorithm
            .compute(
                start_point,
                end,
                |p| scenario.successors_for_attempt(p, dropped),
                |p| end.contains(p),
                grid_shape,
            )
            .map(|found| found.record_dropped_barriers(scenario.dropped_barrier_layers(dropped)))
    })
}
/// Scale factor shared by `cost_as_u64` (multiplies by it) and `unscaled_cost`
/// (divides by it) when converting between `f32` costs and `u64` values.
const PRECISION_SCALAR: f32 = 1e4;
/// Converts an `f32` cost to a `u64` by scaling it with `PRECISION_SCALAR`.
///
/// The float-to-integer `as` cast truncates toward zero and saturates:
/// negative values and NaN become `0`, and values too large for `u64`
/// become `u64::MAX`.
fn cost_as_u64(cost: f32) -> u64 {
    (cost * PRECISION_SCALAR) as u64
}
/// Converts a scaled `u64` cost back to `f32` by dividing by
/// `PRECISION_SCALAR` (the inverse scaling of `cost_as_u64`).
fn unscaled_cost(cost: u64) -> f32 {
    let scaled = cost as f32;
    scaled / PRECISION_SCALAR
}
/// Returns the share of `mem_limit_bytes` reserved for the cache:
/// `CACHE_BUDGET_PERCENT` percent of the limit, rounded down.
///
/// The product is computed in `u128` so that very large limits (for example
/// `u64::MAX` used as "no limit") cannot overflow. A plain `u64`
/// multiplication would panic in debug builds and silently wrap to a wrong
/// budget in release builds. For every input whose `u64` product fits, the
/// result is unchanged.
fn cache_budget_bytes(mem_limit_bytes: u64) -> u64 {
    let budget = u128::from(mem_limit_bytes) * u128::from(CACHE_BUDGET_PERCENT) / 100;
    // With a percentage of at most 100 the budget always fits in `u64`; the
    // clamp only makes the narrowing cast lossless for any constant value.
    budget.min(u128::from(u64::MAX)) as u64
}
/// Divides `total_budget_bytes` evenly (integer division) by the number of
/// threads reported by `rayon::current_num_threads()`.
///
/// The thread count is clamped to at least one so the division can never be
/// by zero. The split is logged at debug level.
fn per_rayon_worker_memory_budget(total_budget_bytes: u64) -> u64 {
    let workers = std::cmp::max(rayon::current_num_threads(), 1) as u64;
    let share = total_budget_bytes / workers;

    debug!(
        "Splitting {} bytes across {} Rayon workers ({} bytes per worker)",
        total_budget_bytes, workers, share
    );

    share
}
#[cfg(test)]
mod tests {
use super::{Algorithm, AlgorithmType, Scenario, compute_route_attempt_result};
use crate::ArrayIndex;
/// Checks that dropped soft barriers are reported per start point rather
/// than for the whole batch.
///
/// NOTE(review): the comments below assume `dimensions(1, 3, 5)` describes a
/// 3-row by 5-column grid whose `Values` are laid out row by row, and that
/// `ArrayIndex::new` takes `(row, col)` — confirm against `ZarrTestBuilder`.
/// Under that reading the middle row is a hard barrier separating the top and
/// bottom rows, and the top row has a single soft-barrier cell at column 2.
#[test]
fn compute_route_attempt_result_tracks_dropped_barriers_per_start() {
let store = crate::dataset::samples::ZarrTestBuilder::new()
.dimensions(1, 3, 5)
.chunks(1, 3, 5)
.layer(crate::dataset::samples::LayerConfig::ones("cost"))
// Hard barrier: only the second group of five values is 1.0.
.layer(crate::dataset::samples::LayerConfig::new(
"hard_barrier",
crate::dataset::samples::FillStrategy::Values(vec![
0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
]),
))
// Soft barrier: a single 1.0 at the third value of the first group.
.layer(crate::dataset::samples::LayerConfig::new(
"soft_barrier",
crate::dataset::samples::FillStrategy::Values(vec![
0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
]),
))
.build()
.unwrap();
// Only `soft_barrier` carries a `barrier_importance`; `hard_barrier` has none.
let cost_function = crate::cost::CostFunction::from_json(
r#"{
"cost_layers": [{"layer_name": "cost"}],
"barrier_layers": [
{
"layer_name": "hard_barrier",
"barrier_operator": "eq",
"barrier_threshold": 1.0
},
{
"layer_name": "soft_barrier",
"barrier_operator": "eq",
"barrier_threshold": 1.0,
"barrier_importance": 1
}
],
"ignore_invalid_costs": false
}"#,
)
.unwrap();
let scenario = Scenario::new(store.path(), cost_function, 1_000).unwrap();
let algorithm = Algorithm::from_selection(AlgorithmType::Dijkstra, 8 * 1024 * 1024);
// Two start points and two end points, routed in a single call.
let start = [ArrayIndex::new(0, 0), ArrayIndex::new(2, 0)];
let end = [ArrayIndex::new(0, 4), ArrayIndex::new(2, 4)];
let result = compute_route_attempt_result(&scenario, &algorithm, &start, &end);
// Both start points must produce a solution.
assert_eq!(result.len(), 2);
// The route starting at (0, 0) must end at (0, 4) and report `soft_barrier` as dropped.
let top_solution = result
.iter()
.find(|solution| solution.route().first() == Some(&ArrayIndex::new(0, 0)))
.unwrap();
assert_eq!(top_solution.route().last(), Some(&ArrayIndex::new(0, 4)));
assert_eq!(
top_solution.dropped_barrier_layers(),
&vec!["soft_barrier".to_string()]
);
// The route starting at (2, 0) must end at (2, 4) without dropping any barrier layer.
let bottom_solution = result
.iter()
.find(|solution| solution.route().first() == Some(&ArrayIndex::new(2, 0)))
.unwrap();
assert_eq!(bottom_solution.route().last(), Some(&ArrayIndex::new(2, 4)));
assert!(bottom_solution.dropped_barrier_layers().is_empty());
}
}