#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/doc_assets/proc.svg"))]
use super::adaptor::Adaptor;
use super::error::{BusError, ProcError};
use super::{main::Main, msg::InternalMsg, service::ProcService};
use config::{Config, ConfigError, File};
use glob::glob;
use log::{error, info, warn};
use prosa_utils::msg::tvf::Tvf;
use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio::{runtime, spawn};
pub use prosa_macros::proc;
pub use prosa_macros::proc_settings;
pub trait ProcSettings {
fn get_adaptor_config_path(&self) -> Option<&String>;
fn get_proc_restart_delay(&self) -> (Duration, u32);
fn get_adaptor_config<C>(&self) -> Result<C, ::config::ConfigError>
where
C: serde::de::Deserialize<'static>,
{
if let Some(config_path) = &self.get_adaptor_config_path() {
Config::builder()
.add_source(
glob(config_path)
.unwrap()
.map(|path| File::from(path.unwrap()))
.collect::<Vec<_>>(),
)
.build()?
.try_deserialize()
} else {
Err(ConfigError::NotFound(
"No configuration set for processor's adaptor".to_string(),
))
}
}
}
pub trait ProcBusParam {
fn get_proc_id(&self) -> u32;
fn name(&self) -> &str;
}
impl Debug for dyn ProcBusParam {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "Processor[{}] {}", self.get_proc_id(), self.name())
}
}
pub trait ProcEpilogue {
fn get_proc_restart_delay(&self) -> (std::time::Duration, u32);
fn remove_proc(
&self,
err: Option<Box<dyn ProcError + Send + Sync>>,
) -> impl std::future::Future<Output = Result<(), BusError>> + Send;
fn is_stopping(&self) -> bool;
}
#[derive(Debug, Clone)]
pub struct ProcParam<M>
where
M: Sized + Clone + Tvf,
{
id: u32,
queue: mpsc::Sender<InternalMsg<M>>,
main: Main<M>,
}
impl<M> ProcBusParam for ProcParam<M>
where
M: Sized + Clone + Tvf,
{
fn get_proc_id(&self) -> u32 {
self.id
}
fn name(&self) -> &str {
self.main.name()
}
}
impl<M> ProcParam<M>
where
M: Sized + Clone + Debug + Tvf + Default + 'static + std::marker::Send + std::marker::Sync,
{
pub fn new(id: u32, queue: mpsc::Sender<InternalMsg<M>>, main: Main<M>) -> ProcParam<M> {
ProcParam { id, queue, main }
}
pub fn get_service_queue(&self) -> mpsc::Sender<InternalMsg<M>> {
self.queue.clone()
}
pub async fn add_proc(&self) -> Result<(), BusError> {
self.main
.add_proc_queue(ProcService::new_proc(self, 0))
.await?;
Ok(())
}
pub async fn remove_proc(
&self,
err: Option<Box<dyn ProcError + Send + Sync>>,
) -> Result<(), BusError> {
self.main.remove_proc(self.id, err).await?;
Ok(())
}
pub async fn add_proc_queue(
&self,
queue: mpsc::Sender<InternalMsg<M>>,
queue_id: u32,
) -> Result<(), BusError> {
self.main
.add_proc_queue(ProcService::new(self, queue, queue_id))
.await?;
Ok(())
}
pub async fn remove_proc_queue(&self, queue_id: u32) -> Result<(), BusError> {
self.main.remove_proc_queue(self.id, queue_id).await?;
Ok(())
}
pub async fn add_service_proc(&self, names: Vec<String>) -> Result<(), BusError> {
self.main
.add_service_proc(names, self.get_proc_id())
.await?;
Ok(())
}
pub async fn add_service(&self, names: Vec<String>, queue_id: u32) -> Result<(), BusError> {
self.main
.add_service(names, self.get_proc_id(), queue_id)
.await?;
Ok(())
}
pub async fn remove_service_proc(&self, names: Vec<String>) -> Result<(), BusError> {
self.main
.remove_service_proc(names, self.get_proc_id())
.await?;
Ok(())
}
pub async fn remove_service(&self, names: Vec<String>, queue_id: u32) -> Result<(), BusError> {
self.main
.remove_service(names, self.get_proc_id(), queue_id)
.await?;
Ok(())
}
pub fn is_stopping(&self) -> bool {
self.main.is_stopping()
}
pub fn meter(&self, name: impl Into<Cow<'static, str>>) -> opentelemetry::metrics::Meter {
self.main.meter(name)
}
pub fn logger(&self, name: impl Into<Cow<'static, str>>) -> opentelemetry_sdk::logs::Logger {
self.main.logger(name)
}
pub fn tracer(&self, name: impl Into<Cow<'static, str>>) -> opentelemetry_sdk::trace::Tracer {
self.main.tracer(name)
}
}
pub trait ProcConfig<M>
where
M: 'static
+ std::marker::Send
+ std::marker::Sync
+ std::marker::Sized
+ std::clone::Clone
+ std::fmt::Debug
+ prosa_utils::msg::tvf::Tvf
+ std::default::Default,
{
type Settings;
fn create(proc_id: u32, main: Main<M>, settings: Self::Settings) -> Self;
fn create_raw(proc_id: u32, main: Main<M>) -> Self
where
Self: Sized,
Self::Settings: Default,
{
Self::create(proc_id, main, Self::Settings::default())
}
fn get_proc_param(&self) -> &ProcParam<M>;
}
macro_rules! proc_run {
( $self:ident, $proc_name:ident ) => {
info!(
"Run processor {} on {} threads",
$proc_name,
$self.get_proc_threads()
);
let proc_restart_delay = $self.get_proc_restart_delay();
let mut wait_time = proc_restart_delay.0;
loop {
if let Err(proc_err) = $self.internal_run($proc_name.clone()).await {
if $self.is_stopping() {
let _ = $self.remove_proc(None).await;
return;
}
let recovery_duration = proc_err.recovery_duration();
if proc_err.recoverable() {
warn!(
"Processor {} encounter an error `{}`. Will restart after {}ms",
$proc_name,
proc_err,
(wait_time + recovery_duration).as_millis()
);
if $self.remove_proc(Some(proc_err)).await.is_err() {
return;
}
} else {
error!(
"Processor {} encounter a fatal error `{}`",
$proc_name, proc_err
);
let _ = $self.remove_proc(Some(proc_err)).await;
return;
}
sleep(wait_time + recovery_duration).await;
} else {
let _ = $self.remove_proc(None).await;
return;
}
if wait_time.as_secs() < proc_restart_delay.1 as u64 {
wait_time += proc_restart_delay.0;
wait_time *= 2;
}
}
};
}
#[cfg_attr(doc, aquamarine::aquamarine)]
pub trait Proc<A>: ProcEpilogue
where
A: Adaptor,
{
fn internal_run(
&mut self,
name: String,
) -> impl std::future::Future<Output = Result<(), Box<dyn ProcError + Send + Sync>>> + Send;
fn get_proc_threads(&self) -> usize {
1
}
fn run(mut self, proc_name: String)
where
Self: Sized + 'static + std::marker::Send,
{
match self.get_proc_threads() {
0 => {
spawn(async move {
proc_run!(self, proc_name);
});
}
1 => {
std::thread::Builder::new()
.name(proc_name.clone())
.spawn(move || {
runtime::Builder::new_current_thread()
.enable_all()
.thread_name(proc_name.clone())
.build()
.unwrap()
.block_on(async {
proc_run!(self, proc_name);
})
})
.unwrap();
}
n => {
std::thread::Builder::new()
.name(proc_name.clone())
.spawn(move || {
runtime::Builder::new_multi_thread()
.worker_threads(n)
.enable_all()
.thread_name(proc_name.clone())
.build()
.unwrap()
.block_on(async {
proc_run!(self, proc_name);
})
})
.unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use prosa_macros::proc_settings;
use serde::Serialize;
extern crate self as prosa;
#[test]
fn test_proc_settings() {
#[proc_settings]
#[derive(Debug, Serialize)]
struct TestProcSettings {
name: String,
}
#[proc_settings]
impl Default for TestProcSettings {
fn default() -> Self {
let _test_settings = TestProcSettings {
name: "test".into(),
};
TestProcSettings {
name: "test".into(),
}
}
}
let test_proc_settings = TestProcSettings::default();
assert_eq!("test", test_proc_settings.name);
}
}