use super::compass_app_system::CompassAppSystemParameters;
use super::{
compass_app_ops as ops, compass_map_matching as map_matching_ops, CompassBuilderInventory,
};
use crate::app::compass::compass_app_config::CompassAppConfig;
use crate::app::compass::response::response_persistence_policy::ResponsePersistencePolicy;
use crate::{
app::{compass::CompassAppError, search::SearchApp},
plugin::{input::InputPlugin, output::OutputPlugin},
};
use kdam::Bar;
use rayon::current_num_threads;
use routee_compass_core::algorithm::search::SearchAlgorithm;
use routee_compass_core::model::cost::cost_model_service::CostModelService;
use routee_compass_core::model::map::MapModel;
use routee_compass_core::model::network::Graph;
use routee_compass_core::model::state::StateModel;
use serde_json::Value;
use std::{
path::Path,
sync::{Arc, Mutex},
};
use routee_compass_core::algorithm::map_matching::MapMatchingAlgorithm;
pub struct CompassApp {
pub search_app: Arc<SearchApp>,
pub input_plugins: Vec<Arc<dyn InputPlugin>>,
pub output_plugins: Vec<Arc<dyn OutputPlugin>>,
pub system_parameters: CompassAppSystemParameters,
pub map_matching_algorithm: Arc<dyn MapMatchingAlgorithm>,
}
impl TryFrom<&Path> for CompassApp {
type Error = CompassAppError;
fn try_from(conf_file: &Path) -> Result<Self, Self::Error> {
let config = CompassAppConfig::try_from(conf_file)?;
let builder = CompassBuilderInventory::new()?;
let compass_app = CompassApp::new(&config, &builder)?;
Ok(compass_app)
}
}
impl CompassApp {
pub fn new(
config: &CompassAppConfig,
builder: &CompassBuilderInventory,
) -> Result<Self, CompassAppError> {
let state_model = match &config.state {
Some(state_config) => Arc::new(StateModel::new(state_config.clone())),
None => Arc::new(StateModel::empty()),
};
let cost_model_service = CostModelService::try_from(&config.cost)?;
let label_model_service = builder.build_label_model_service(&config.label)?;
log::info!("app termination model: {:?}", config.termination);
let traversal_model_services = ops::with_timing("traversal models", || {
config.build_traversal_model_services(builder)
})?;
let constraint_model_services = ops::with_timing("constraint models", || {
config.build_constraint_model_services(builder)
})?;
let graph = ops::with_timing("graph", || Ok(Arc::new(Graph::try_from(&config.graph)?)))?;
let map_model = ops::with_timing("map model", || {
let mm = MapModel::new(graph.clone(), config.mapping.clone()).map_err(|e| {
CompassAppError::BuildFailure(format!("unable to load MapModel from config: {e}"))
})?;
Ok(Arc::new(mm))
})?;
let search_algorithm = SearchAlgorithm::from(&config.algorithm);
let search_app = Arc::new(SearchApp::new(
search_algorithm,
graph,
map_model,
state_model,
traversal_model_services,
constraint_model_services,
cost_model_service,
config.termination.clone(),
label_model_service,
config.system.default_edge_list,
));
let input_plugins = ops::with_timing("input plugins", || {
Ok(builder.build_input_plugins(&config.plugin.input_plugins)?)
})?;
let output_plugins = ops::with_timing("output plugins", || {
Ok(builder.build_output_plugins(&config.plugin.output_plugins)?)
})?;
let map_matching_algorithm = ops::with_timing("map matching algorithm", || {
Ok(builder.build_map_matching_algorithm(&config.map_matching)?)
})?;
let app = CompassApp {
search_app,
input_plugins,
output_plugins,
system_parameters: config.system.clone(),
map_matching_algorithm,
};
Ok(app)
}
pub fn run(
&self,
queries: &mut Vec<Value>,
config: Option<&Value>,
) -> Result<Vec<Value>, CompassAppError> {
let override_config_opt: Option<CompassAppSystemParameters> = match config {
Some(c) => serde_json::from_value(c.clone())?,
None => None,
};
let parallelism = override_config_opt
.as_ref()
.and_then(|c| c.parallelism)
.or(self.system_parameters.parallelism)
.unwrap_or(1);
let response_persistence_policy = override_config_opt
.as_ref()
.and_then(|c| c.response_persistence_policy)
.or(self.system_parameters.response_persistence_policy)
.unwrap_or_default();
let response_output_policy = override_config_opt
.as_ref()
.and_then(|c| c.response_output_policy.clone())
.or(self.system_parameters.response_output_policy.clone())
.unwrap_or_default();
let response_writer = response_output_policy.build()?;
let input_plugin_result = ops::apply_input_plugins(
queries,
&self.input_plugins,
self.search_app.clone(),
parallelism,
)?;
let (processed_inputs, input_errors) = input_plugin_result;
let mut load_balanced_inputs =
ops::apply_load_balancing_policy(processed_inputs, parallelism, 1.0)?;
log::info!(
"creating {} parallel batches across {} threads to run queries",
parallelism,
current_num_threads(),
);
let proc_batch_sizes = load_balanced_inputs
.iter()
.map(|qs| qs.len())
.collect::<Vec<_>>();
log::info!("queries assigned per executor: {proc_batch_sizes:?}");
let num_balanced_inputs = load_balanced_inputs
.iter()
.flatten()
.collect::<Vec<_>>()
.len();
let search_pb = Bar::builder()
.total(num_balanced_inputs)
.animation("fillup")
.desc("search")
.build()
.map_err(|e| {
CompassAppError::InternalError(format!("could not build progress bar: {e}"))
})?;
let search_pb_shared = Arc::new(Mutex::new(search_pb));
let run_query_result = match response_persistence_policy {
ResponsePersistencePolicy::PersistResponseInMemory => ops::run_batch_with_responses(
&mut load_balanced_inputs,
&self.output_plugins,
&self.search_app,
&response_writer,
search_pb_shared,
)?,
ResponsePersistencePolicy::DiscardResponseFromMemory => {
ops::run_batch_without_responses(
&mut load_balanced_inputs,
&self.output_plugins,
&self.search_app,
&response_writer,
search_pb_shared,
)?
}
};
eprintln!();
response_writer.close()?;
let run_result = run_query_result
.chain(input_errors)
.collect();
Ok(run_result)
}
}
impl CompassApp {
pub fn map_match(
&self,
queries: &[Value],
config: Option<&Value>,
) -> Result<Vec<Value>, CompassAppError> {
let parallelism = self.get_parallelism(config)?;
log::info!(
"running {} map match queries with parallelism {} across {} threads",
queries.len(),
parallelism,
current_num_threads(),
);
ops::run_batch(queries, parallelism, "map matching", |q| {
self.run_single_map_match(q)
})
}
pub fn run_calculate_path(
&self,
queries: &[Value],
config: Option<&Value>,
) -> Result<Vec<Value>, CompassAppError> {
let parallelism = self.get_parallelism(config)?;
ops::run_batch(queries, parallelism, "calculating paths", |q| {
self.run_single_calculate_path(q)
})
}
fn run_single_map_match(&self, query: &Value) -> Value {
match map_matching_ops::run_single_map_match(
query,
&self.search_app,
&self.map_matching_algorithm,
) {
Ok(response) => response,
Err(e) => serde_json::json!({
"request": query,
"error": e.to_string()
}),
}
}
fn run_single_calculate_path(&self, query: &Value) -> Value {
match ops::run_single_calculate_path(query, &self.search_app, &self.output_plugins) {
Ok(response) => response,
Err(e) => serde_json::json!({
"request": query,
"error": e.to_string()
}),
}
}
fn get_parallelism(&self, config: Option<&Value>) -> Result<usize, CompassAppError> {
let override_config_opt: Option<CompassAppSystemParameters> = match config {
Some(c) => serde_json::from_value(c.clone())?,
None => None,
};
let parallelism = override_config_opt
.as_ref()
.and_then(|c| c.parallelism)
.or(self.system_parameters.parallelism)
.unwrap_or(1);
Ok(parallelism)
}
}
#[cfg(test)]
mod tests {
use super::CompassApp;
use crate::app::compass::CompassAppError;
use routee_compass_core::config::CompassConfigurationError;
use std::path::PathBuf;
#[test]
fn test_e2e_dist_speed_time_traversal() {
let conf_file_test = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("src")
.join("app")
.join("compass")
.join("test")
.join("speeds_test")
.join("speeds_test.toml");
let conf_file_debug = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("src")
.join("app")
.join("compass")
.join("test")
.join("speeds_test")
.join("speeds_debug.toml");
println!(
"attempting to load '{}'",
conf_file_test.to_str().unwrap_or_default()
);
let app = match CompassApp::try_from(conf_file_test.as_path()) {
Ok(a) => Ok(a),
Err(CompassAppError::CompassConfigurationError(
CompassConfigurationError::FileNormalizationNotFound(..),
)) => {
println!(
"attempting to load '{}'",
conf_file_debug.to_str().unwrap_or_default()
);
CompassApp::try_from(conf_file_debug.as_path())
}
Err(other) => panic!("{}", other),
}
.unwrap();
let query = serde_json::json!({
"origin_vertex": 0,
"destination_vertex": 2
});
let mut queries = vec![query];
let result = app.run(&mut queries, None).expect("run failed");
assert_eq!(result.len(), 1, "expected one result");
let route_0 = result[0].get("route").expect("result has no route");
let path_0 = route_0.get("path").expect("result route has no path");
let edge_ids = path_0
.as_array()
.expect("path should be an array of edge IDs");
assert!(!edge_ids.is_empty(), "Path should not be empty");
assert!(
!edge_ids.is_empty() && edge_ids.len() <= 2,
"Path should contain 1-2 edges for route from vertex 0 to vertex 2"
);
let expected_path = serde_json::json!(vec![0, 2]);
assert_eq!(path_0, &expected_path);
}
#[test]
fn test_run_calculate_path() {
let conf_file_test = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("src")
.join("app")
.join("compass")
.join("test")
.join("speeds_test")
.join("speeds_test.toml");
let app = CompassApp::try_from(conf_file_test.as_path()).unwrap();
let query = serde_json::json!({
"path": [
{"edge_id": 0},
{"edge_id": 2}
]
});
let queries = vec![query];
let results = app
.run_calculate_path(&queries, None)
.expect("run_calculate_path failed");
assert_eq!(results.len(), 1, "expected one result");
let result = &results[0];
if let Some(err) = result.get("error") {
panic!("{:?}", err);
}
let route = result.get("route").expect("result should have route");
let path = route.get("path").expect("route should have path");
assert_eq!(path, &serde_json::json!(vec![0, 2]));
let traversal_summary = route
.get("traversal_summary")
.expect("route should have traversal_summary");
assert!(
traversal_summary.get("edge_distance").is_some(),
"summary should have edge_distance"
);
assert!(
traversal_summary.get("edge_time").is_some(),
"summary should have edge_time"
);
}
}