use std::any::Any;
use std::error;
use std::panic::{self, AssertUnwindSafe};
use std::sync::Arc;
use ciborium;
use serde::de::DeserializeOwned;
use crate::endpoints::{
EventSinkInfoRegistry, EventSinkRegistry, EventSourceRegistry, QuerySourceRegistry,
};
use crate::server::services::from_bench_error;
use crate::simulation::{BenchError, Injector, SimInit, Simulation};
use super::super::codegen::simulation::*;
use super::{bench_not_built_error, from_simulation_error, timestamp_to_monotonic, to_error};
/// Error returned by `ciborium` when the serialized bench configuration cannot
/// be decoded from the request bytes.
type DeserializationError = ciborium::de::Error<std::io::Error>;
/// Type-erased bench generator stored by `BuildService`.
///
/// The closure takes the serialized configuration bytes. The outer `Result`
/// carries a configuration deserialization failure, while the inner `Result`
/// carries the outcome of the user-provided generator itself.
type SimGen = Box<
dyn FnMut(&[u8]) -> Result<Result<SimInit, Box<dyn error::Error>>, DeserializationError>
+ Send
+ 'static,
>;
/// Service that builds a simulation bench from a serialized configuration and
/// later turns that bench into a `Simulation` through `init` or `restore`.
pub(crate) struct BuildService {
/// Bench generator invoked on every build request.
sim_gen: SimGen,
/// Bench produced by the last successful build, stored together with its
/// event sink registry and shared handles to the event source and query
/// source registries.
///
/// This is `None` until a build succeeds, and becomes `None` again once
/// `init` or `restore` has taken the bench.
bench: Option<(
SimInit,
EventSinkRegistry,
Arc<EventSourceRegistry>,
Arc<QuerySourceRegistry>,
)>,
}
impl BuildService {
    /// Creates a build service around a user-provided bench generator.
    ///
    /// The generator `sim_gen` takes a configuration of type `I` and returns
    /// either a `SimInit` or an arbitrary boxed error. It is wrapped into a
    /// closure that first decodes `I` from the raw request bytes with
    /// `ciborium::from_reader`, which lets the service store the generator
    /// without being generic over the configuration type.
    ///
    /// No bench is available until `build` succeeds.
    pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
    where
        F: FnMut(I) -> Result<SimInit, Box<dyn error::Error>> + Send + 'static,
        I: DeserializeOwned,
    {
        let sim_gen = move |serialized_cfg: &[u8]| -> Result<
            Result<SimInit, Box<dyn error::Error>>,
            DeserializationError,
        > {
            // A decoding failure is reported through the outer `Result`; the
            // generator's own outcome is passed through as the inner one.
            let cfg = ciborium::from_reader(serialized_cfg)?;
            Ok(sim_gen(cfg))
        };

        Self {
            sim_gen: Box::new(sim_gen),
            bench: None,
        }
    }

    /// Builds a simulation bench from the configuration carried by `request`.
    ///
    /// On success, the bench, its event sink registry and shared handles to
    /// the event source and query source registries are stored in the service
    /// for a later `init` or `restore`. The returned tuple holds the event
    /// sink info registry, the shared event source registry, the shared query
    /// source registry and an injector obtained from the bench.
    ///
    /// On failure, the bench stored by an earlier successful build (if any) is
    /// left untouched.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the configuration cannot be deserialized
    ///   (see `from_config_deserialization_error`),
    /// - the bench generator returns an error (see `from_general_bench_error`),
    /// - the bench generator panics (see `from_panic`).
    pub(crate) fn build(
        &mut self,
        request: BuildRequest,
    ) -> Result<
        (
            EventSinkInfoRegistry,
            Arc<EventSourceRegistry>,
            Arc<QuerySourceRegistry>,
            Injector,
        ),
        Error,
    > {
        // The generator is user code, so a panic is caught and reported as an
        // error instead of unwinding through the service. `AssertUnwindSafe`
        // is needed because the closure borrows `self` mutably.
        let build_outcome = panic::catch_unwind(AssertUnwindSafe(|| {
            (self.sim_gen)(&request.cfg)
                .map_err(from_config_deserialization_error)
                .and_then(|bench_result| bench_result.map_err(from_general_bench_error))
        }))
        .map_err(from_panic)?;
        let mut bench = build_outcome?;

        let (
            event_sink_registry,
            event_sink_info_registry,
            event_source_registry,
            query_source_registry,
        ) = bench.take_endpoints().into_parts();
        let event_source_registry = Arc::new(event_source_registry);
        let query_source_registry = Arc::new(query_source_registry);
        let injector = bench.injector();

        // Keep the bench and a handle to each shared registry so that the
        // simulation can be initialized or restored later on.
        self.bench = Some((
            bench,
            event_sink_registry,
            Arc::clone(&event_source_registry),
            Arc::clone(&query_source_registry),
        ));

        Ok((
            event_sink_info_registry,
            event_source_registry,
            query_source_registry,
            injector,
        ))
    }

    /// Initializes a simulation from the previously built bench, starting at
    /// the time specified in `request`.
    ///
    /// The start time is validated before the stored bench is taken, so a
    /// request with a missing or invalid time does not consume the bench. Once
    /// taken, the bench is consumed even if the initialization itself fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - no start time is provided (`ErrorCode::MissingArgument`),
    /// - the provided start time cannot be converted by
    ///   `timestamp_to_monotonic` (`ErrorCode::InvalidTime`),
    /// - no bench is available (see `bench_not_built_error`),
    /// - the bench initialization fails (see `from_simulation_error`).
    pub(crate) fn init(
        &mut self,
        request: InitRequest,
    ) -> Result<
        (
            Simulation,
            EventSinkRegistry,
            Arc<EventSourceRegistry>,
            Arc<QuerySourceRegistry>,
        ),
        Error,
    > {
        let timestamp = request.time.ok_or_else(|| {
            to_error(
                ErrorCode::MissingArgument,
                "simulation start time not provided",
            )
        })?;
        // A timestamp that is present but rejected by the conversion is a
        // different failure from a missing one and is reported as such.
        let start_time = timestamp_to_monotonic(timestamp).ok_or_else(|| {
            to_error(
                ErrorCode::InvalidTime,
                "simulation start time is invalid or out of range",
            )
        })?;

        let (bench, event_sink_registry, event_source_registry, query_source_registry) =
            self.bench.take().ok_or_else(bench_not_built_error)?;

        bench
            .init(start_time)
            .map_err(from_simulation_error)
            .map(|simulation| {
                (
                    simulation,
                    event_sink_registry,
                    event_source_registry,
                    query_source_registry,
                )
            })
    }

    /// Restores a simulation from the previously built bench, using the
    /// serialized state carried by `request`.
    ///
    /// The stored bench is taken and consumed, even if the restoration fails.
    ///
    /// # Errors
    ///
    /// Returns an error if no bench is available (see `bench_not_built_error`)
    /// or if the bench restoration fails (see `from_simulation_error`).
    pub(crate) fn restore(
        &mut self,
        request: RestoreRequest,
    ) -> Result<
        (
            Simulation,
            EventSinkRegistry,
            Arc<EventSourceRegistry>,
            Arc<QuerySourceRegistry>,
        ),
        Error,
    > {
        let (bench, event_sink_registry, event_source_registry, query_source_registry) =
            self.bench.take().ok_or_else(bench_not_built_error)?;

        bench
            .restore(&request.state[..])
            .map_err(from_simulation_error)
            .map(|simulation| {
                (
                    simulation,
                    event_sink_registry,
                    event_source_registry,
                    query_source_registry,
                )
            })
    }
}
/// Converts the payload of a panic raised by the bench builder into an
/// `ErrorCode::BenchPanic` error.
///
/// The panic message is included in the error when the payload is a `&str` or
/// a `String`; any other payload type yields a generic message.
fn from_panic(payload: Box<dyn Any + Send>) -> Error {
    // Try the `&str` payload first, then fall back to a `String` payload.
    let detail: Option<&str> = payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str));

    let description = match detail {
        Some(text) => format!(
            "the simulation bench builder has panicked with the following message: `{text}`",
        ),
        None => String::from("the simulation bench builder has panicked"),
    };

    to_error(ErrorCode::BenchPanic, description)
}
/// Converts a configuration deserialization failure into an
/// `ErrorCode::InvalidMessage` error that embeds the underlying error's
/// description.
fn from_config_deserialization_error(error: DeserializationError) -> Error {
    let message =
        format!("the simulation bench configuration could not be deserialized: {error}",);

    to_error(ErrorCode::InvalidMessage, message)
}
fn from_general_bench_error(error: Box<dyn error::Error>) -> Error {
match error.downcast::<BenchError>() {
Ok(bench_err) => from_bench_error(*bench_err),
Err(error) => to_error(
ErrorCode::BenchError,
format!("simulation bench building has failed with the following error: {error}",),
),
}
}