use std::{
sync::{Arc, atomic},
time::{Duration, SystemTime},
};
use super::{
error::{BusError, ProcError},
queue::{InternalMsgQueue, SendError},
service::{ProcService, ServiceError, ServiceTable},
};
use tracing::{Level, Span, event, info_span, span};
pub use prosa_utils::msg::tvf::Tvf;
#[derive(Debug)]
pub enum InternalMainMsg<M>
where
M: Sized + Clone + Tvf,
{
NewProcQueue(ProcService<M>),
DeleteProc(u32, Option<Box<dyn ProcError + Send + Sync>>),
DeleteProcQueue(u32, u32),
NewProcService(Vec<String>, u32),
NewService(Vec<String>, u32, u32),
DeleteProcService(Vec<String>, u32),
DeleteService(Vec<String>, u32, u32),
Command(String),
Shutdown(String),
}
#[derive(Debug)]
pub enum InternalMsg<M>
where
M: Sized + Clone + Tvf,
{
Request(RequestMsg<M>),
Response(ResponseMsg<M>),
Error(ErrorMsg<M>),
Command(String),
Config,
Service(Arc<ServiceTable<M>>),
Shutdown,
}
#[cfg_attr(doc, aquamarine::aquamarine)]
pub trait Msg<M>
where
M: Sized + Clone + Tvf,
{
fn get_id(&self) -> u64;
fn get_service(&self) -> &String;
fn get_span(&self) -> &Span;
fn get_span_mut(&mut self) -> &mut Span;
fn enter_span(&self) -> span::Entered<'_>;
fn elapsed(&self) -> Duration;
fn get_data(&self) -> Result<&M, BusError>;
fn get_data_mut(&mut self) -> Result<&mut M, BusError>;
fn take_data(&mut self) -> Option<M>;
fn take_data_if<P>(&mut self, predicate: P) -> Option<M>
where
P: FnOnce(&mut M) -> bool;
}
pub(crate) static ATOMIC_INTERNAL_MSG_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
#[derive(Debug)]
pub struct RequestMsg<M>
where
M: Sized + Clone + Tvf,
{
id: u64,
service: String,
span: Span,
data: Option<M>,
begin_time: SystemTime,
response_queue: InternalMsgQueue<M>,
}
impl<M> Msg<M> for RequestMsg<M>
where
M: Sized + Clone + Tvf,
{
fn get_id(&self) -> u64 {
self.id
}
fn get_service(&self) -> &String {
&self.service
}
fn get_span(&self) -> &Span {
&self.span
}
fn get_span_mut(&mut self) -> &mut Span {
&mut self.span
}
fn enter_span(&self) -> span::Entered<'_> {
self.span.enter()
}
fn elapsed(&self) -> Duration {
self.begin_time.elapsed().unwrap_or(Duration::new(0, 0))
}
fn get_data(&self) -> Result<&M, BusError> {
self.data.as_ref().ok_or(BusError::NoData)
}
fn get_data_mut(&mut self) -> Result<&mut M, BusError> {
self.data.as_mut().ok_or(BusError::NoData)
}
fn take_data(&mut self) -> Option<M> {
self.data.take()
}
fn take_data_if<P>(&mut self, predicate: P) -> Option<M>
where
P: FnOnce(&mut M) -> bool,
{
self.data.take_if(predicate)
}
}
impl<M> RequestMsg<M>
where
M: Sized + Clone + Tvf,
{
pub fn new(service: String, data: M, response_queue: impl Into<InternalMsgQueue<M>>) -> Self {
let begin_time = SystemTime::now();
let span = info_span!("prosa::Msg", service = service);
RequestMsg {
id: ATOMIC_INTERNAL_MSG_ID.fetch_add(1, atomic::Ordering::Relaxed),
service,
data: Some(data),
begin_time,
span,
response_queue: response_queue.into(),
}
}
pub fn new_with_trace_id(
service: String,
data: M,
response_queue: impl Into<InternalMsgQueue<M>>,
trace_id: tracing::span::Id,
) -> Self {
let begin_time = SystemTime::now();
let span = info_span!(parent: trace_id, "prosa::Msg", service = service);
RequestMsg {
id: ATOMIC_INTERNAL_MSG_ID.fetch_add(1, atomic::Ordering::Relaxed),
service,
data: Some(data),
begin_time,
span,
response_queue: response_queue.into(),
}
}
pub fn return_to_sender(mut self, resp: M) -> Result<(), SendError<M>> {
let response_queue = self.response_queue.take();
response_queue
.send(InternalMsg::Response(ResponseMsg::from_request(self, resp)))
.map_err(|e| {
e.map(|i| {
if let InternalMsg::Response(mut resp) = i {
resp.take_data().unwrap()
} else {
panic!("Expected InternalMsg::Response")
}
})
})
}
pub fn return_error_to_sender(
mut self,
data: Option<M>,
err: ServiceError,
) -> Result<(), SendError<Option<M>>> {
let response_queue = self.response_queue.take();
response_queue
.send(InternalMsg::Error(ErrorMsg::from_request(self, data, err)))
.map_err(|e| {
e.map(|i| {
if let InternalMsg::Error(mut err) = i {
err.take_data()
} else {
panic!("Expected InternalMsg::Error")
}
})
})
}
pub fn return_result_to_sender(
self,
result: Result<M, ServiceError>,
) -> Result<(), SendError<Option<M>>> {
match result {
Ok(resp) => self.return_to_sender(resp).map_err(|e| e.map(|m| Some(m))),
Err(err) => self.return_error_to_sender(None, err),
}
}
}
#[derive(Debug)]
pub struct ResponseMsg<M>
where
M: Sized + Clone + Tvf,
{
id: u64,
service: String,
span: Span,
response_time: SystemTime,
data: Option<M>,
}
impl<M> ResponseMsg<M>
where
M: Sized + Clone + Tvf,
{
pub fn from_request(request: RequestMsg<M>, resp_data: M) -> Self {
ResponseMsg {
id: request.id,
service: request.service,
span: request.span,
response_time: request.begin_time,
data: Some(resp_data),
}
}
}
impl<M> Msg<M> for ResponseMsg<M>
where
M: Sized + Clone + Tvf,
{
fn get_id(&self) -> u64 {
self.id
}
fn get_service(&self) -> &String {
&self.service
}
fn get_span(&self) -> &Span {
&self.span
}
fn get_span_mut(&mut self) -> &mut Span {
&mut self.span
}
fn enter_span(&self) -> span::Entered<'_> {
self.span.enter()
}
fn elapsed(&self) -> Duration {
self.response_time.elapsed().unwrap_or(Duration::new(0, 0))
}
fn get_data(&self) -> Result<&M, BusError> {
self.data.as_ref().ok_or(BusError::NoData)
}
fn get_data_mut(&mut self) -> Result<&mut M, BusError> {
self.data.as_mut().ok_or(BusError::NoData)
}
fn take_data(&mut self) -> Option<M> {
self.data.take()
}
fn take_data_if<P>(&mut self, predicate: P) -> Option<M>
where
P: FnOnce(&mut M) -> bool,
{
self.data.take_if(predicate)
}
}
#[derive(Debug)]
pub struct ErrorMsg<M>
where
M: Sized + Clone + Tvf,
{
id: u64,
service: String,
span: Span,
error_time: SystemTime,
data: Option<M>,
err: ServiceError,
}
impl<M> ErrorMsg<M>
where
M: Sized + Clone + Tvf,
{
pub fn from_request(request: RequestMsg<M>, data: Option<M>, err: ServiceError) -> Self {
ErrorMsg {
id: request.id,
service: request.service,
span: request.span,
error_time: request.begin_time,
data: data.or(request.data),
err,
}
}
pub fn get_err(&self) -> &ServiceError {
&self.err
}
pub fn into_err(self) -> ServiceError {
self.err
}
}
impl<M> Msg<M> for ErrorMsg<M>
where
M: Sized + Clone + Tvf,
{
fn get_id(&self) -> u64 {
self.id
}
fn get_service(&self) -> &String {
&self.service
}
fn get_span(&self) -> &Span {
&self.span
}
fn get_span_mut(&mut self) -> &mut Span {
&mut self.span
}
fn enter_span(&self) -> span::Entered<'_> {
let enter = self.span.enter();
event!(Level::WARN, "{}", self.err);
enter
}
fn elapsed(&self) -> Duration {
self.error_time.elapsed().unwrap_or(Duration::new(0, 0))
}
fn get_data(&self) -> Result<&M, BusError> {
self.data.as_ref().ok_or(BusError::NoData)
}
fn get_data_mut(&mut self) -> Result<&mut M, BusError> {
self.data.as_mut().ok_or(BusError::NoData)
}
fn take_data(&mut self) -> Option<M> {
self.data.take()
}
fn take_data_if<P>(&mut self, predicate: P) -> Option<M>
where
P: FnOnce(&mut M) -> bool,
{
self.data.take_if(predicate)
}
}