use std::time::Duration;
use opentelemetry::{KeyValue, metrics::Histogram};
use prosa_macros::{proc, proc_settings};
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::{
core::{
adaptor::Adaptor,
error::ProcError,
msg::{InternalMsg, Msg, RequestMsg},
proc::{Proc, ProcBusParam as _},
service::ServiceError,
},
event::speed::Regulator,
};
use super::adaptor::InjAdaptor;
extern crate self as prosa;
#[proc_settings]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct InjSettings {
pub service_name: String,
#[serde(default = "InjSettings::default_max_speed")]
pub max_speed: f64,
#[serde(default = "InjSettings::default_timeout_threshold")]
pub timeout_threshold: Duration,
#[serde(default = "InjSettings::default_max_concurrents_send")]
pub max_concurrents_send: u32,
#[serde(default = "InjSettings::default_speed_interval")]
pub speed_interval: u16,
}
impl InjSettings {
fn default_max_speed() -> f64 {
5.0
}
fn default_timeout_threshold() -> Duration {
Duration::new(10, 0)
}
fn default_max_concurrents_send() -> u32 {
1
}
fn default_speed_interval() -> u16 {
15
}
pub fn new(service_name: String) -> InjSettings {
InjSettings {
service_name,
max_speed: InjSettings::default_max_speed(),
timeout_threshold: InjSettings::default_timeout_threshold(),
max_concurrents_send: InjSettings::default_max_concurrents_send(),
speed_interval: InjSettings::default_speed_interval(),
..Default::default()
}
}
pub fn set_service_name(&mut self, service_name: String) {
self.service_name = service_name;
}
pub fn get_regulator(&self) -> Regulator {
Regulator::new(
self.max_speed,
self.timeout_threshold,
self.max_concurrents_send,
self.speed_interval,
)
}
}
#[proc_settings]
impl Default for InjSettings {
fn default() -> InjSettings {
InjSettings {
service_name: Default::default(),
max_speed: InjSettings::default_max_speed(),
timeout_threshold: InjSettings::default_timeout_threshold(),
max_concurrents_send: InjSettings::default_max_concurrents_send(),
speed_interval: InjSettings::default_speed_interval(),
}
}
}
#[proc(settings = prosa::inj::proc::InjSettings)]
pub struct InjProc {}
#[proc]
impl InjProc {
async fn process_internal<A>(
&mut self,
msg: InternalMsg<M>,
adaptor: &mut A,
regulator: &mut Regulator,
next_transaction: &mut Option<M>,
meter_trans_duration: &Histogram<f64>,
) -> Result<(), Box<dyn ProcError + Send + Sync>>
where
A: Adaptor + InjAdaptor<M> + std::marker::Send + std::marker::Sync,
{
match msg {
InternalMsg::Request(msg) => panic!(
"The inj processor {} receive a request {:?}",
self.get_proc_id(),
msg
),
InternalMsg::Response(mut msg) => {
let response_data = msg.take_data();
let _enter_span = msg.enter_span();
meter_trans_duration.record(
msg.elapsed().as_secs_f64(),
&[
KeyValue::new("proc", self.name().to_string()),
KeyValue::new("service", msg.get_service().clone()),
KeyValue::new("err_code", "0".to_string()),
],
);
if let Some(response) = response_data {
debug!(name: "resp_inj_proc", target: "prosa::inj::proc", proc_name = self.name(), service = msg.get_service(), "{:?}", response);
adaptor.process_response(response, msg.get_service())?;
regulator.notify_receive_transaction(msg.elapsed());
let _ = next_transaction.get_or_insert(adaptor.build_transaction());
}
}
InternalMsg::Error(err_msg) => {
let enter_span = err_msg.enter_span();
meter_trans_duration.record(
err_msg.elapsed().as_secs_f64(),
&[
KeyValue::new("proc", self.name().to_string()),
KeyValue::new("service", err_msg.get_service().clone()),
KeyValue::new("err_code", err_msg.get_err().get_code().to_string()),
],
);
debug!(name: "resp_err_inj_proc", target: "prosa::inj::proc", proc_name = self.name(), service = err_msg.get_service(), "{:?}", err_msg.get_err());
match err_msg.get_err() {
ServiceError::Timeout(_, overhead) => {
regulator.add_tick_overhead(Duration::from_millis(*overhead));
}
ServiceError::UnableToReachService(_) => {
regulator.add_tick_overhead(self.settings.timeout_threshold);
}
_ => {
drop(enter_span);
return Err(Box::new(err_msg.into_err()));
}
}
regulator.notify_receive_transaction(err_msg.elapsed());
let _ = next_transaction.get_or_insert(adaptor.build_transaction());
}
InternalMsg::Command(_) => todo!(),
InternalMsg::Config => todo!(),
InternalMsg::Service(table) => self.service = table,
InternalMsg::Shutdown => {
adaptor.terminate();
self.proc.remove_proc(None).await?;
return Ok(());
}
}
Ok(())
}
}
#[proc]
impl<A> Proc<A> for InjProc
where
A: Adaptor + InjAdaptor<M> + std::marker::Send + std::marker::Sync,
{
async fn internal_run(&mut self) -> Result<(), Box<dyn ProcError + Send + Sync>> {
let mut adaptor = A::new(self)?;
let meter = self.proc.meter("prosa_inj");
let meter_trans_duration = meter
.f64_histogram("prosa_inj_request_duration")
.with_description("inj transaction processing duration")
.with_unit("seconds")
.build();
self.proc.add_proc().await?;
let mut regulator = self.settings.get_regulator();
let mut next_transaction = Some(adaptor.build_transaction());
while !self.service.exist_proc_service(&self.settings.service_name) {
if let Some(msg) = self.internal_rx_queue.recv().await {
self.process_internal(
msg,
&mut adaptor,
&mut regulator,
&mut next_transaction,
&meter_trans_duration,
)
.await?;
}
}
self.service
.get_proc_service(&self.settings.service_name)
.unwrap()
.proc_queue
.send(InternalMsg::Request(RequestMsg::new(
self.settings.service_name.clone(),
next_transaction.take().unwrap(),
self.proc.get_service_queue(),
)))
.await?;
regulator.notify_send_transaction();
loop {
tokio::select! {
Some(msg) = self.internal_rx_queue.recv() => {
self.process_internal(msg, &mut adaptor, &mut regulator, &mut next_transaction, &meter_trans_duration).await?;
}
_ = regulator.tick(), if self.service.exist_proc_service(&self.settings.service_name) => {
if let Some(service) = self.service.get_proc_service(&self.settings.service_name) {
let trans = if let Some(transaction) = next_transaction.take() {
RequestMsg::new(self.settings.service_name.clone(), transaction, self.proc.get_service_queue())
} else {
RequestMsg::new(self.settings.service_name.clone(), adaptor.build_transaction(), self.proc.get_service_queue())
};
debug!(name: "inj_proc", target: "prosa::inj::proc", parent: trans.get_span(), proc_name = self.name(), service = self.settings.service_name, "{:?}", trans.get_data());
service.proc_queue.send(InternalMsg::Request(trans)).await?;
regulator.notify_send_transaction();
}
},
};
}
}
}