use crate::core::error::ProcError;
use super::{
msg::InternalMsg,
proc::{ProcBusParam, ProcParam},
};
use opentelemetry::{KeyValue, metrics::AsyncInstrument};
use prosa_utils::msg::tvf::{Tvf, TvfError};
use std::{
collections::{HashMap, HashSet},
fmt::{self, Debug},
sync::atomic,
};
use thiserror::Error;
use tokio::sync::mpsc;
/// Table of the services known locally, indexed by service name.
///
/// Each service name is associated with the list of [`ProcService`] entries registered for it
/// and an atomic counter that is advanced when a lookup has to pick one entry among several.
#[derive(Debug, Default)]
pub struct ServiceTable<M>
where
M: Sized + Clone + Tvf,
{
/// Service name -> (registered entries for that name, round-robin counter used by lookups).
table: HashMap<Box<str>, (Vec<ProcService<M>>, atomic::AtomicU64)>,
}
impl<M> ServiceTable<M>
where
    M: Sized + Clone + Tvf,
{
    /// Returns `true` when no service name is registered in the table.
    ///
    /// A name whose entry list has been emptied by one of the `remove_*` methods is still
    /// counted as registered.
    pub fn is_empty(&self) -> bool {
        self.table.is_empty()
    }

    /// Returns the number of service names held by the table (including names whose entry list
    /// is currently empty).
    pub fn len(&self) -> usize {
        self.table.len()
    }

    /// Reports the content of the table on the given asynchronous instrument.
    ///
    /// For every service name, one observation tagged `type = "node"` carries the number of
    /// entries registered for that name. Then, for every distinct processor id serving that
    /// name, one observation of value `1` tagged `type = "link"` describes the
    /// service -> processor relation.
    pub(crate) fn observe_metrics(&self, services_meter: &dyn AsyncInstrument<u64>) {
        for (name, (services, _)) in &self.table {
            services_meter.observe(
                services.len() as u64,
                &[
                    KeyValue::new("type", "node"),
                    KeyValue::new("id", name.to_string()),
                ],
            );

            // A processor may expose the same service on several queues: report its link once.
            let mut linked_procs = HashSet::new();
            for service in services {
                if !linked_procs.insert(service.proc_id) {
                    continue;
                }

                services_meter.observe(
                    1,
                    &[
                        KeyValue::new("type", "link"),
                        KeyValue::new("id", format!("{name}/{}", service.proc_id)),
                        KeyValue::new("source", name.to_string()),
                        KeyValue::new("target", i64::from(service.proc_id)),
                    ],
                );
            }
        }
    }

    /// Returns `true` when at least one entry is registered for the service `name`.
    pub fn exist_proc_service(&self, name: &str) -> bool {
        matches!(self.table.get(name), Some((services, _)) if !services.is_empty())
    }

    /// Picks an entry able to serve the service `name`.
    ///
    /// Returns `None` when the name is unknown or has no entry left. When a single entry is
    /// registered it is returned directly and the counter is left untouched. Otherwise the
    /// counter is advanced and used to select the entries in turn.
    pub fn get_proc_service(&self, name: &str) -> Option<&ProcService<M>> {
        let (services, cursor) = self.table.get(name)?;
        match services.len() {
            0 => None,
            1 => services.first(),
            count => {
                let turn = cursor.fetch_add(1, atomic::Ordering::Relaxed);
                // Reduce in `u64` *before* narrowing: the remainder is always below `count`, so
                // the final cast is lossless on every target, and the position agrees with the
                // one computed by the `Clone` implementation.
                services.get((turn % count as u64) as usize)
            }
        }
    }

    /// Registers `proc_service` for the service `name`.
    ///
    /// Nothing is added when an equal entry (same processor id and queue id) is already
    /// registered for that name. A new name starts with its counter at `0`.
    pub fn add_service(&mut self, name: &str, proc_service: ProcService<M>) {
        if let Some((services, _)) = self.table.get_mut(name) {
            // `ProcService` equality compares the processor id and the queue id.
            if !services.contains(&proc_service) {
                services.push(proc_service);
            }
        } else {
            // Look up by `&str` first so the key is only allocated for a new name.
            self.table
                .insert(name.into(), (vec![proc_service], atomic::AtomicU64::new(0)));
        }
    }

    /// Removes, for the service `name` only, every entry belonging to the processor `proc_id`.
    ///
    /// The name itself stays in the table even if its entry list becomes empty.
    pub fn remove_service_proc(&mut self, name: &str, proc_id: u32) {
        if let Some((services, _)) = self.table.get_mut(name) {
            services.retain(|service| service.proc_id != proc_id);
        }
    }

    /// Removes, for the service `name` only, the entry matching both `proc_id` and `queue_id`.
    ///
    /// The name itself stays in the table even if its entry list becomes empty.
    pub fn remove_service(&mut self, name: &str, proc_id: u32, queue_id: u32) {
        if let Some((services, _)) = self.table.get_mut(name) {
            services.retain(|service| service.proc_id != proc_id || service.queue_id != queue_id);
        }
    }

    /// Removes every entry belonging to the processor `proc_id`, whatever the service name.
    ///
    /// Service names are kept in the table even if their entry list becomes empty.
    pub fn remove_proc_services(&mut self, proc_id: u32) {
        for (services, _) in self.table.values_mut() {
            services.retain(|service| service.proc_id != proc_id);
        }
    }

    /// Removes every entry matching both `proc_id` and `queue_id`, whatever the service name.
    ///
    /// Service names are kept in the table even if their entry list becomes empty.
    pub fn remove_proc_queue_services(&mut self, proc_id: u32, queue_id: u32) {
        for (services, _) in self.table.values_mut() {
            services.retain(|service| service.proc_id != proc_id || service.queue_id != queue_id);
        }
    }
}
impl<M> Clone for ServiceTable<M>
where
M: Sized + Clone + Tvf,
{
fn clone(&self) -> Self {
Self {
table: self
.table
.iter()
.map(|(k, (s, rr))| {
let rr = atomic::AtomicU64::new(if !s.is_empty() {
rr.load(atomic::Ordering::Relaxed) % s.len() as u64
} else {
0
});
(k.clone(), (s.clone(), rr))
})
.collect(),
}
}
}
impl<M> fmt::Display for ServiceTable<M>
where
    M: Sized + Clone + Tvf,
{
    /// Writes one `Service name` line per service, each followed by one tab-indented
    /// `Processor ID` line per registered entry. Formatting stops at the first write error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.table.iter().try_for_each(|(name, (services, _))| {
            writeln!(f, "Service name: {name}")?;
            services
                .iter()
                .try_for_each(|service| writeln!(f, "\tProcessor ID: {}", service.proc_id))
        })
    }
}
/// Entry of the service table: identifies a processor queue and holds the channel sender used
/// to send it [`InternalMsg`] values.
///
/// Two entries compare equal when both their processor id and queue id match (see the
/// `PartialEq` implementation); the name and the sender are not part of the comparison.
#[derive(Debug, Clone)]
pub struct ProcService<M>
where
M: Sized + Clone + Tvf,
{
/// Identifier of the processor that owns this entry.
proc_id: u32,
/// Name of the processor, returned by `ProcBusParam::name`.
proc_name: Box<str>,
/// Identifier of the queue inside the processor.
queue_id: u32,
/// Sender side of the tokio mpsc channel carrying `InternalMsg<M>` for this entry.
pub proc_queue: mpsc::Sender<InternalMsg<M>>,
}
impl<M> ProcService<M>
where
    M: Sized + Clone + Debug + Tvf + Default + 'static + std::marker::Send + std::marker::Sync,
{
    /// Builds an entry for the processor described by `proc`, bound to an explicit queue.
    ///
    /// The processor id and name are read from `proc`; `proc_queue` and `queue_id` are stored
    /// as given.
    pub fn new(
        proc: &ProcParam<M>,
        proc_queue: mpsc::Sender<InternalMsg<M>>,
        queue_id: u32,
    ) -> ProcService<M> {
        let proc_id = proc.get_proc_id();
        let proc_name: Box<str> = proc.name().into();

        Self {
            proc_id,
            proc_name,
            queue_id,
            proc_queue,
        }
    }

    /// Builds an entry for the processor described by `proc`, using the queue returned by
    /// `proc.get_service_queue()`.
    pub fn new_proc(proc: &ProcParam<M>, queue_id: u32) -> ProcService<M> {
        Self::new(proc, proc.get_service_queue(), queue_id)
    }

    /// Returns the identifier of the processor that owns this entry.
    pub fn get_proc_id(&self) -> u32 {
        let Self { proc_id, .. } = self;
        *proc_id
    }

    /// Returns the identifier of the queue this entry refers to.
    pub fn get_queue_id(&self) -> u32 {
        let Self { queue_id, .. } = self;
        *queue_id
    }
}
impl<M> ProcBusParam for ProcService<M>
where
    M: Sized + Clone + Tvf,
{
    /// Returns the identifier of the processor stored in this entry.
    fn get_proc_id(&self) -> u32 {
        let Self { proc_id, .. } = self;
        *proc_id
    }

    /// Returns the name of the processor stored in this entry.
    fn name(&self) -> &str {
        &self.proc_name
    }
}
impl<M> PartialEq for ProcService<M>
where
    M: Sized + Clone + Tvf,
{
    /// Two entries are equal when they designate the same processor id and the same queue id.
    /// The processor name and the channel sender are ignored.
    fn eq(&self, other: &Self) -> bool {
        (self.proc_id, self.queue_id) == (other.proc_id, other.queue_id)
    }
}
/// Errors that can be reported for a service.
///
/// The `String` carried by each variant is inserted in the error message where the service is
/// quoted; the `From<TvfError>` conversion stores a description of the TVF failure there
/// instead. `get_code` maps each variant to a numeric code.
#[derive(Debug, Eq, Error, PartialEq)]
pub enum ServiceError {
/// No error occurred on the service (code `0`).
#[error("No error on the service `{0}`")]
NoError(String),
/// The service could not be reached (code `1`).
#[error("The service `{0}` can't be reach")]
UnableToReachService(String),
/// The service did not respond within the given number of milliseconds (code `2`).
#[error("The service `{0}` didn't respond before {1} ms")]
Timeout(String, u64),
/// A protocol error occurred (code `3`).
#[error("The service `{0}` made a protocol error")]
ProtocolError(String),
}
impl ServiceError {
pub fn get_code(&self) -> u8 {
match self {
ServiceError::NoError(_) => 0,
ServiceError::UnableToReachService(_) => 1,
ServiceError::Timeout(_, _) => 2,
ServiceError::ProtocolError(_) => 3,
}
}
}
impl ProcError for ServiceError {
/// Every service error is reported as recoverable, whatever the variant.
fn recoverable(&self) -> bool {
true
}
}
impl From<TvfError> for ServiceError {
    /// Converts a TVF error into a [`ServiceError::ProtocolError`].
    ///
    /// The carried string describes which kind of TVF failure occurred (missing field, type
    /// mismatch, conversion or serialization), with the original detail when there is one.
    fn from(err: TvfError) -> Self {
        let detail = match err {
            TvfError::FieldNotFound(id) => format!("on TVF field {id}"),
            TvfError::TypeMismatch => String::from("on TVF type"),
            TvfError::ConvertionError(str) => format!("on TVF convertion {str}"),
            TvfError::SerializationError(str) => format!("on TVF serialization {str}"),
        };

        ServiceError::ProtocolError(detail)
    }
}