use crate::graphs::arc_list_graph;
use crate::prelude::sort_pairs::KMergeIters;
use crate::prelude::*;
use anyhow::{Context, Result, ensure};
use dsi_progress_logger::prelude::*;
use lender::*;
use tempfile::Builder;
use value_traits::slices::SliceByValue;
pub fn permute(
graph: &impl SequentialGraph,
perm: &impl SliceByValue<Value = usize>,
memory_usage: MemoryUsage,
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>>> {
ensure!(
perm.len() == graph.num_nodes(),
"The given permutation has {} values and thus it's incompatible with a graph with {} nodes.",
perm.len(),
graph.num_nodes(),
);
let dir = Builder::new().prefix("permute_").tempdir()?;
log::info!(
"Creating a temporary directory for the sorted pairs: {}",
dir.path().display()
);
let mut sorted = SortPairs::new(memory_usage, dir.path())?;
let pgraph = PermutedGraph { graph, perm };
let mut pl = ProgressLogger::default();
pl.item_name("node")
.expected_updates(Some(graph.num_nodes()));
pl.start("Creating batches...");
for_!( (src, succ) in pgraph.iter() {
for dst in succ {
sorted.push(src, dst)?;
}
pl.light_update();
});
let edges = sorted.iter().context("Could not read arcs")?;
let sorted = arc_list_graph::ArcListGraph::new_labeled(graph.num_nodes(), edges);
pl.done();
Ok(Left(sorted))
}
pub fn permute_split<S, P>(
graph: &S,
perm: &P,
memory_usage: MemoryUsage,
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>>>
where
S: SequentialGraph + SplitLabeling,
P: SliceByValue<Value = usize> + Send + Sync + Clone,
for<'a> <S as SequentialLabeling>::Lender<'a>: Send + Sync + Clone + ExactSizeLender,
{
ensure!(
perm.len() == graph.num_nodes(),
"The given permutation has {} values and thus it's incompatible with a graph with {} nodes.",
perm.len(),
graph.num_nodes(),
);
let pgraph = PermutedGraph { graph, perm };
let num_threads = rayon::current_num_threads();
let mut dirs = vec![];
let edges = rayon::in_place_scope(|scope| {
let (tx, rx) = crossbeam_channel::unbounded();
for (thread_id, iter) in pgraph.split_iter(num_threads).enumerate() {
let tx = tx.clone();
let dir = Builder::new()
.prefix(&format!("permute_split_{thread_id}_"))
.tempdir()
.expect("Could not create a temporary directory");
let dir_path = dir.path().to_path_buf();
dirs.push(dir);
scope.spawn(move |_| {
log::debug!("Spawned thread {thread_id}");
let mut sorted = SortPairs::new(memory_usage / num_threads, dir_path).unwrap();
for_!( (src, succ) in iter {
for dst in succ {
sorted.push(src, dst).unwrap();
}
});
tx.send(sorted.iter().context("Could not read arcs").unwrap())
.expect("Could not send the sorted pairs");
log::debug!("Thread {thread_id} finished");
});
}
drop(tx);
log::debug!("Waiting for threads to finish");
rx.into_rayon_iter().sum()
});
log::debug!("All threads finished");
Ok(Left(arc_list_graph::ArcListGraph::new_labeled(
graph.num_nodes(),
edges,
)))
}