use std::sync::Arc;
use prosa_macros::proc_settings;
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::core::adaptor::{Adaptor, MaybeAsync};
use crate::core::error::{BusError, ProcError};
use crate::core::msg::{InternalMsg, Msg};
use crate::core::proc::{Proc, ProcBusParam, proc};
use super::adaptor::StubAdaptor;
extern crate self as prosa;
/// Settings of the stub processor.
///
/// NOTE(review): `#[proc_settings]` presumably injects further fields shared by
/// all processor settings (the constructor below fills the rest with
/// `..Default::default()`) — confirm against the macro.
#[proc_settings]
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct StubSettings {
/// Names of the services the stub processor registers on startup
/// (passed to `add_service_proc` by `StubProc::internal_run`).
pub service_names: Vec<String>,
}
impl StubSettings {
pub fn new(service_names: Vec<String>) -> StubSettings {
StubSettings {
service_names,
..Default::default()
}
}
pub fn add_service_name(&mut self, service_name: String) {
self.service_names.push(service_name);
}
}
/// Stub processor: receives requests for the services listed in its
/// `StubSettings` and answers them through a `StubAdaptor`.
///
/// The struct declares no fields itself. NOTE(review): the members used by
/// `internal_run` (`proc`, `settings`, `internal_rx_queue`, `service`) are
/// presumably generated by the `#[proc]` attribute — confirm against the macro.
#[proc(settings = prosa::stub::proc::StubSettings)]
pub struct StubProc {}
#[proc]
impl<A> Proc<A> for StubProc
where
    A: 'static + Adaptor + StubAdaptor<M> + std::marker::Send + std::marker::Sync,
{
    /// Main loop of the stub processor.
    ///
    /// Builds the adaptor, declares the processor and the service names found in
    /// its settings, then handles internal messages until `InternalMsg::Shutdown`
    /// is received:
    ///
    /// - `Request`: the payload is handed to `StubAdaptor::process_request` and the
    ///   result is returned to the sender, either immediately
    ///   (`MaybeAsync::Ready`) or from a spawned Tokio task (`MaybeAsync::Future`).
    /// - `Service`: replaces the processor's service table.
    /// - `Shutdown`: terminates the adaptor, removes the processor and returns.
    ///
    /// # Errors
    ///
    /// Returns an error when the adaptor cannot be created, when declaring the
    /// processor or its services fails, when removing the processor on shutdown
    /// fails, or when a request carries no data (`BusError::NoData`).
    ///
    /// # Panics
    ///
    /// Panics if an `InternalMsg::Response` or `InternalMsg::Error` is received.
    /// `InternalMsg::Command` and `InternalMsg::Config` are not implemented yet
    /// and hit `todo!()`.
    async fn internal_run(&mut self) -> Result<(), Box<dyn ProcError + Send + Sync>> {
        // Adaptor that produces the stubbed responses.
        let adaptor = Arc::new(A::new(self)?);

        // Declare the processor, then the services it answers for.
        self.proc.add_proc().await?;
        self.proc
            .add_service_proc(self.settings.service_names.clone())
            .await?;

        loop {
            // A `None` from `recv()` is ignored and the queue is polled again.
            // NOTE(review): if this queue can ever be closed, the loop would spin
            // without making progress — confirm a sender always stays alive.
            if let Some(msg) = self.internal_rx_queue.recv().await {
                match msg {
                    InternalMsg::Request(mut msg) => {
                        // The payload is moved out of the message here, so everything
                        // below must use `request_data` rather than `msg.get_data()`.
                        let request_data = msg.take_data().ok_or(BusError::NoData)?;
                        let enter_span = msg.enter_span();
                        // Log the payload that was actually taken: reading it back from
                        // `msg` at this point would only report that the data is gone.
                        debug!(name: "stub_proc_request", target: "prosa::stub::proc", parent: msg.get_span(), proc_name = self.name(), stub_service = msg.get_service(), "{:?}", request_data);
                        match adaptor.process_request(msg.get_service(), request_data) {
                            MaybeAsync::Ready(resp) => {
                                debug!(name: "stub_proc_response", target: "prosa::stub::proc", parent: msg.get_span(), stub_service = msg.get_service(), "{resp:?}");
                                // The guard borrows `msg`; release it before `msg` is consumed.
                                drop(enter_span);
                                // Best effort: a failure to reach the sender is ignored.
                                let _ = msg.return_result_to_sender(resp);
                            }
                            MaybeAsync::Future(future_resp) => {
                                // Release the guard so `msg` can move into the spawned task.
                                drop(enter_span);
                                tokio::spawn(async move {
                                    // NOTE(review): this guard stays alive across the `.await`
                                    // below. If it is a `tracing` span guard, the tracing docs
                                    // advise against holding it over an await point (consider
                                    // `Instrument` instead) — confirm.
                                    let enter_span = msg.enter_span();
                                    let resp = future_resp.await;
                                    debug!(name: "stub_proc_response", target: "prosa::stub::proc", parent: msg.get_span(), stub_service = msg.get_service(), "{resp:?}");
                                    drop(enter_span);
                                    // Best effort: a failure to reach the sender is ignored.
                                    let _ = msg.return_result_to_sender(resp);
                                });
                            }
                        }
                    }
                    // The stub never sends requests, so responses and errors are unexpected.
                    InternalMsg::Response(msg) => panic!(
                        "The stub processor {} receive a response {:?}",
                        self.get_proc_id(),
                        msg
                    ),
                    InternalMsg::Error(err) => panic!(
                        "The stub processor {} receive an error {:?}",
                        self.get_proc_id(),
                        err
                    ),
                    // Not implemented yet: receiving either message panics.
                    InternalMsg::Command(_) => todo!(),
                    InternalMsg::Config => todo!(),
                    InternalMsg::Service(table) => self.service = table,
                    InternalMsg::Shutdown => {
                        adaptor.terminate();
                        self.proc.remove_proc(None).await?;
                        return Ok(());
                    }
                }
            }
        }
    }
}