use std::sync::Mutex;
use futures_util::StreamExt;
use tokio::time as tokio_time;
use crate::endpoints::EventSinkRegistry;
use crate::path::Path as NexosimPath;
use super::super::codegen::simulation::*;
use super::{simulation_not_started_error, to_error, to_positive_duration};
pub(crate) enum MonitorService {
Halted,
Started {
event_sink_registry: EventSinkRegistry,
},
}
impl MonitorService {
pub(crate) fn try_read_events(
&mut self,
request: TryReadEventsRequest,
) -> Result<Vec<Vec<u8>>, Error> {
match self {
Self::Started {
event_sink_registry,
} => {
let sink_path: &NexosimPath = &request
.sink
.ok_or_else(|| to_error(ErrorCode::MissingArgument, "missing event sink path"))?
.segments
.into();
let sink = match event_sink_registry.get_entry_mut(sink_path) {
Ok(sink) => sink,
Err(_) => {
return Err(if event_sink_registry.has_sink(sink_path) {
to_error(
ErrorCode::SinkReadRace,
format!(
"attempting concurrent read operation on sink '{sink_path}'"
),
)
} else {
sink_not_found_error(sink_path)
});
}
};
let mut encoded_events = Vec::new();
while let Some(encoded_event) = sink.try_read() {
match encoded_event {
Ok(encoded_event) => encoded_events.push(encoded_event),
Err(e) => {
return Err(to_error(
ErrorCode::InvalidMessage,
format!(
"the event from sink '{sink_path}' could not be serialized from type '{}': {e}",
sink.event_type_name(),
),
));
}
}
}
Ok(encoded_events)
}
Self::Halted => Err(simulation_not_started_error()),
}
}
pub(crate) fn enable_sink(&mut self, request: EnableSinkRequest) -> Result<(), Error> {
match self {
Self::Started {
event_sink_registry,
} => {
let sink_path: &NexosimPath = &request
.sink
.ok_or_else(|| to_error(ErrorCode::MissingArgument, "missing event sink path"))?
.segments
.into();
if let Ok(sink) = event_sink_registry.get_entry_mut(sink_path) {
sink.enable();
Ok(())
} else {
Err(if event_sink_registry.has_sink(sink_path) {
to_error(
ErrorCode::SinkReadRace,
format!(
"attempting to enable sink '{sink_path}' while a read operation is ongoing"
),
)
} else {
sink_not_found_error(sink_path)
})
}
}
Self::Halted => Err(simulation_not_started_error()),
}
}
pub(crate) fn disable_sink(&mut self, request: DisableSinkRequest) -> Result<(), Error> {
match self {
Self::Started {
event_sink_registry,
} => {
let sink_path: &NexosimPath = &request
.sink
.ok_or_else(|| to_error(ErrorCode::MissingArgument, "missing event sink path"))?
.segments
.into();
if let Ok(sink) = event_sink_registry.get_entry_mut(sink_path) {
sink.disable();
Ok(())
} else {
Err(if event_sink_registry.has_sink(sink_path) {
to_error(
ErrorCode::SinkReadRace,
format!(
"attempting to disable sink '{sink_path}' while a read operation is ongoing"
),
)
} else {
sink_not_found_error(sink_path)
})
}
}
Self::Halted => Err(simulation_not_started_error()),
}
}
}
pub(crate) async fn monitor_service_read_event(
service: &Mutex<MonitorService>,
request: ReadEventRequest,
) -> Result<Vec<u8>, Error> {
let sink_path: &NexosimPath = &request
.sink
.ok_or_else(|| to_error(ErrorCode::MissingArgument, "missing event sink path"))?
.segments
.into();
let mut sink = match &mut *service.lock().unwrap() {
MonitorService::Started {
event_sink_registry,
} => {
match event_sink_registry.rent_entry(sink_path) {
Ok(sink) => sink,
Err(_) => {
return Err(if event_sink_registry.has_sink(sink_path) {
to_error(
ErrorCode::SinkReadRace,
format!("attempting concurrent read operation on sink '{sink_path}'"),
)
} else {
sink_not_found_error(sink_path)
});
}
}
}
MonitorService::Halted => return Err(simulation_not_started_error()),
};
let event = match request.timeout {
Some(timeout) => {
let timeout = to_positive_duration(timeout).ok_or_else(|| {
to_error(
ErrorCode::InvalidTimeout,
"the specified timeout is negative",
)
})?;
tokio_time::timeout(timeout, sink.next())
.await
.map_err(|_| {
to_error(
ErrorCode::SinkReadTimeout,
format!("the read operation on sink '{sink_path}' timed out",),
)
})?
}
None => sink.next().await,
};
let reply = event
.ok_or_else(|| {
to_error(
ErrorCode::SinkTerminated,
format!("sink '{sink_path}' has not sender"),
)
})
.and_then(|s| {
s.map_err(|e|
to_error(
ErrorCode::InvalidMessage,
format!(
"the event from sink '{sink_path}' could not be serialized from type '{}': {e}",
sink.event_type_name(),
),
))
});
match &mut *service.lock().unwrap() {
MonitorService::Started {
event_sink_registry,
} => {
event_sink_registry.replace_entry(sink_path, sink).unwrap(); }
MonitorService::Halted => return Err(simulation_not_started_error()),
};
reply
}
fn sink_not_found_error(sink: &NexosimPath) -> Error {
to_error(
ErrorCode::SinkNotFound,
format!("no sink is registered with the name '{sink}'"),
)
}