use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use prosa_utils::msg::tvf::Tvf;
use tokio::sync::mpsc;
use tracing::span;
use tracing::{Level, Span, event};
use super::{
error::ProcError,
service::{ProcService, ServiceError, ServiceTable},
};
/// Internal control messages; judging by the name they are addressed to the main task.
///
/// NOTE(review): the `u32` fields are not described anywhere in this file. They are
/// presumably processor identifiers (first `u32`) and queue identifiers (second `u32`)
/// — confirm against the code that handles these messages.
#[derive(Debug)]
pub enum InternalMainMsg<M>
where
M: Sized + Clone + Tvf,
{
/// Carries the [`ProcService`] describing a new processor queue.
NewProcQueue(ProcService<M>),
/// Removal of a processor, with an optional boxed [`ProcError`] attached
/// (presumably the reason it stopped — confirm).
DeleteProc(u32, Option<Box<dyn ProcError + Send + Sync>>),
/// Removal of a single processor queue (presumably processor id, queue id — confirm).
DeleteProcQueue(u32, u32),
/// Declares the listed service names for a processor.
NewProcService(Vec<String>, u32),
/// Declares the listed service names; takes one more `u32` than `NewProcService`
/// (presumably the queue id — confirm).
NewService(Vec<String>, u32, u32),
/// Withdraws the listed service names for a processor.
DeleteProcService(Vec<String>, u32),
/// Withdraws the listed service names; takes one more `u32` than `DeleteProcService`
/// (presumably the queue id — confirm).
DeleteService(Vec<String>, u32, u32),
/// Free-form textual command.
Command(String),
/// Shutdown request carrying a string (presumably the reason — confirm).
Shutdown(String),
}
/// Internal message envelope exchanged over the `mpsc` queues used in this module.
///
/// `RequestMsg::return_to_sender` and `RequestMsg::return_error_to_sender` push the
/// `Response` and `Error` variants onto a request's response queue.
#[derive(Debug)]
pub enum InternalMsg<M>
where
M: Sized + Clone + Tvf,
{
/// A request, wrapped in [`RequestMsg`].
Request(RequestMsg<M>),
/// A successful answer to a request, wrapped in [`ResponseMsg`].
Response(ResponseMsg<M>),
/// A failed answer to a request, wrapped in [`ErrorMsg`].
Error(ErrorMsg<M>),
/// Free-form textual command.
Command(String),
/// Configuration notification without payload
/// (presumably asks the receiver to reload its configuration — confirm).
Config,
/// Shared [`ServiceTable`] (presumably the updated table pushed to processors — confirm).
Service(Arc<ServiceTable<M>>),
/// Shutdown notification without payload.
Shutdown,
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Accessors common to the message types of this module.
///
/// Implemented in this file by [`RequestMsg`], [`ResponseMsg`] and [`ErrorMsg`].
pub trait Msg<M>
where
    M: Sized + Clone + Tvf,
{
    /// Returns the message identifier.
    fn get_id(&self) -> u64;

    /// Returns the name of the service the message relates to.
    fn get_service(&self) -> &String;

    /// Returns the tracing span attached to the message.
    fn get_span(&self) -> &Span;

    /// Returns the attached tracing span mutably.
    fn get_span_mut(&mut self) -> &mut Span;

    /// Enters the attached span and returns the guard; the span is exited when the
    /// guard is dropped.
    ///
    /// The guard borrows from `self`. The lifetime is written out as `'_` so the
    /// declaration matches the implementations and the borrow is visible to readers.
    fn enter_span(&self) -> span::Entered<'_>;

    /// Returns the time elapsed since the timestamp carried by the message.
    fn elapsed(&self) -> Duration;

    /// Returns the message payload.
    fn get_data(&self) -> &M;

    /// Returns the message payload mutably.
    fn get_data_mut(&mut self) -> &mut M;
}
/// A request travelling to a service, together with the queue on which its answer
/// must be sent back.
///
/// NOTE(review): `begin_time` is a `SystemTime`, which is not monotonic; if it is only
/// used to measure elapsed time, `std::time::Instant` may be a better fit — confirm
/// that no caller needs the wall-clock value.
#[derive(Debug)]
pub struct RequestMsg<M>
where
M: Sized + Clone + Tvf,
{
// Identifier supplied by the caller of `new`.
id: u64,
// Name of the targeted service; also recorded as the `service` field of the span.
service: String,
// Tracing span created in `new` and handed over to the response or error message.
span: Span,
// Request payload.
data: M,
// Wall-clock time captured when the request was built.
begin_time: SystemTime,
// Queue on which `return_to_sender` / `return_error_to_sender` push the answer.
response_queue: mpsc::Sender<InternalMsg<M>>,
}
impl<M> Msg<M> for RequestMsg<M>
where
M: Sized + Clone + Tvf,
{
fn get_id(&self) -> u64 {
self.id
}
fn get_service(&self) -> &String {
&self.service
}
fn get_span(&self) -> &Span {
&self.span
}
fn get_span_mut(&mut self) -> &mut Span {
&mut self.span
}
fn enter_span(&self) -> span::Entered<'_> {
self.span.enter()
}
fn elapsed(&self) -> Duration {
self.begin_time.elapsed().unwrap_or(Duration::new(0, 0))
}
fn get_data(&self) -> &M {
&self.data
}
fn get_data_mut(&mut self) -> &mut M {
&mut self.data
}
}
impl<M> RequestMsg<M>
where
M: Sized + Clone + Tvf,
{
pub fn new(
id: u64,
service: String,
data: M,
response_queue: mpsc::Sender<InternalMsg<M>>,
) -> Self {
let begin_time = SystemTime::now();
let span = span!(Level::INFO, "prosa::Msg", service = service);
RequestMsg {
id,
service,
data,
begin_time,
span,
response_queue,
}
}
pub async fn return_to_sender(
self,
resp: M,
) -> Result<(), tokio::sync::mpsc::error::SendError<InternalMsg<M>>> {
self.response_queue
.send(InternalMsg::Response(ResponseMsg {
id: self.id,
service: self.service,
span: self.span,
response_time: self.begin_time,
data: resp,
}))
.await
}
pub async fn return_error_to_sender(
self,
data: Option<M>,
err: ServiceError,
) -> Result<(), tokio::sync::mpsc::error::SendError<InternalMsg<M>>> {
self.response_queue
.send(InternalMsg::Error(ErrorMsg {
id: self.id,
service: self.service,
span: self.span,
error_time: self.begin_time,
data: data.unwrap_or(self.data),
err,
}))
.await
}
}
/// Successful answer to a [`RequestMsg`].
///
/// The only construction site visible in this file is `RequestMsg::return_to_sender`.
#[derive(Debug)]
pub struct ResponseMsg<M>
where
M: Sized + Clone + Tvf,
{
// Identifier copied from the originating request.
id: u64,
// Service name copied from the originating request.
service: String,
// Span moved over from the originating request.
span: Span,
// Reference time for `elapsed()`; `return_to_sender` fills it with the request's
// `begin_time`, not with the time the response was produced.
response_time: SystemTime,
// Response payload.
data: M,
}
impl<M> Msg<M> for ResponseMsg<M>
where
M: Sized + Clone + Tvf,
{
fn get_id(&self) -> u64 {
self.id
}
fn get_service(&self) -> &String {
&self.service
}
fn get_span(&self) -> &Span {
&self.span
}
fn get_span_mut(&mut self) -> &mut Span {
&mut self.span
}
fn enter_span(&self) -> span::Entered<'_> {
self.span.enter()
}
fn elapsed(&self) -> Duration {
self.response_time.elapsed().unwrap_or(Duration::new(0, 0))
}
fn get_data(&self) -> &M {
&self.data
}
fn get_data_mut(&mut self) -> &mut M {
&mut self.data
}
}
/// Failed answer to a request: a payload plus the [`ServiceError`] that describes
/// the failure.
#[derive(Debug)]
pub struct ErrorMsg<M>
where
M: Sized + Clone + Tvf,
{
// Message identifier.
id: u64,
// Name of the service the error relates to.
service: String,
// Span attached to the message; `enter_span` logs the error inside it.
span: Span,
// Reference time for `elapsed()`: the current time when built with `ErrorMsg::new`,
// or the request's `begin_time` when built by `RequestMsg::return_error_to_sender`.
error_time: SystemTime,
// Payload returned along with the error.
data: M,
// The error itself.
err: ServiceError,
}
/// [`Msg`] accessors for an error message.
impl<M> Msg<M> for ErrorMsg<M>
where
    M: Sized + Clone + Tvf,
{
    /// Identifier carried by the error message.
    fn get_id(&self) -> u64 {
        self.id
    }

    /// Name of the service the error relates to.
    fn get_service(&self) -> &String {
        &self.service
    }

    /// Span attached to the error message.
    fn get_span(&self) -> &Span {
        &self.span
    }

    /// Mutable access to the error span.
    fn get_span_mut(&mut self) -> &mut Span {
        &mut self.span
    }

    /// Enters the span and emits the error as a `WARN` event inside it.
    ///
    /// The event is emitted on every call, not only the first one. The span is exited
    /// when the returned guard is dropped.
    fn enter_span(&self) -> span::Entered<'_> {
        let guard = self.span.enter();
        event!(Level::WARN, "{}", self.err);
        guard
    }

    /// Time elapsed since `error_time`.
    ///
    /// Falls back to a zero duration when the system clock reports a time earlier
    /// than `error_time`.
    fn elapsed(&self) -> Duration {
        self.error_time.elapsed().unwrap_or_default()
    }

    /// Payload carried with the error.
    fn get_data(&self) -> &M {
        &self.data
    }

    /// Mutable access to the payload carried with the error.
    fn get_data_mut(&mut self) -> &mut M {
        &mut self.data
    }
}
impl<M> ErrorMsg<M>
where
    M: Sized + Clone + Tvf,
{
    /// Builds an error message from its parts, stamping it with the current system time.
    ///
    /// Unlike an error produced through `RequestMsg::return_error_to_sender`, the
    /// reference time used by `elapsed()` is the moment of this call.
    pub fn new(id: u64, service: String, span: Span, data: M, err: ServiceError) -> Self {
        let error_time = SystemTime::now();

        Self {
            id,
            service,
            span,
            error_time,
            data,
            err,
        }
    }

    /// Returns the service error carried by the message.
    pub fn get_err(&self) -> &ServiceError {
        &self.err
    }
}