use core::clone::Clone;
use core::default::Default;
use core::future::Future;
use core::pin::Pin;
use core::time::Duration;
use std::boxed::Box;
use futures_util::StreamExt;
use octseq::Octets;
use tracing::trace;
use crate::base::message_builder::AdditionalBuilder;
use crate::base::{Message, StreamTarget};
use super::message::Request;
use super::service::{Service, ServiceFeedback, ServiceResult};
use super::util::mk_error_response;
/// The processing state of a [`ServiceInvoker`].
///
/// The state is read via `ServiceInvoker::status()` and changed via
/// `ServiceInvoker::set_status()`, either in response to feedback received
/// from the service or because the service reported an error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InvokerStatus {
    /// No transaction is in progress.
    ///
    /// This status is set when the service signals the end of a
    /// transaction.
    Normal,

    /// A transaction is in progress.
    ///
    /// This status is set when the service signals the beginning of a
    /// transaction.
    InTransaction,

    /// Processing of the current response stream should stop.
    ///
    /// This status is set when the service response stream yields an error.
    /// While it is set, `ServiceInvoker::dispatch()` stops consuming the
    /// response stream after handling the current item.
    Aborting,
}
/// Drives a [`Service`] call and forwards the responses it produces.
///
/// Implementors supply the state accessors ([`status`], [`set_status`]),
/// the reconfiguration hook ([`reconfigure`]) and the output sink
/// ([`enqueue_response`]). The provided methods then take care of calling
/// the service, walking the resulting response stream, acting on service
/// feedback and turning service errors into error responses.
///
/// # Type parameters
///
/// - `RequestOctets`: the octets type backing the request message.
/// - `Svc`: the service that is called with each request.
/// - `RequestMeta`: the metadata type carried by the request.
/// - `EnqueueMeta`: caller-defined data handed unchanged to every
///   [`enqueue_response`] call made while dispatching a request.
///
/// [`status`]: Self::status
/// [`set_status`]: Self::set_status
/// [`reconfigure`]: Self::reconfigure
/// [`enqueue_response`]: Self::enqueue_response
pub trait ServiceInvoker<RequestOctets, Svc, RequestMeta, EnqueueMeta>
where
    RequestOctets: Octets + Send + Sync + 'static,
    RequestMeta: Clone + Default + Send + 'static,
    Svc: Service<RequestOctets, RequestMeta>,
    EnqueueMeta: Send + 'static,
{
    /// Calls `svc` with `request` and processes every item of the response
    /// stream it returns.
    ///
    /// Each stream item is passed to [`process_response_stream_item`]; if
    /// that yields a response it is handed to [`enqueue_response`] together
    /// with `enqueue_meta`. After each item the invoker status is checked,
    /// and processing stops early once it is [`InvokerStatus::Aborting`].
    /// Because the check happens after enqueueing, the error response
    /// generated for a failed stream item is still enqueued before the
    /// loop is left.
    ///
    /// The returned future borrows `self` mutably until it completes.
    ///
    /// [`process_response_stream_item`]: Self::process_response_stream_item
    /// [`enqueue_response`]: Self::enqueue_response
    fn dispatch(
        &mut self,
        request: Request<RequestOctets, RequestMeta>,
        svc: Svc,
        enqueue_meta: EnqueueMeta,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>
    where
        Self: Send,
    {
        Box::pin(async move {
            // The request is moved into the service call below, so grab
            // everything that is needed from it afterwards up front.
            let request_id = request.message().header().id();
            let req_msg = request.message().clone();

            // Hand the request to the service.
            trace!("Calling service for request id {request_id}");
            let mut stream = svc.call(request).await;

            // Walk the stream of results produced by the service.
            trace!("Awaiting service call results for request id {request_id}");
            while let Some(item) = stream.next().await {
                trace!("Processing service call result for request id {request_id}");

                // Not every stream item carries a response to send.
                let outcome =
                    self.process_response_stream_item(item, &req_msg);
                if let Some(response) = outcome {
                    self.enqueue_response(response, &enqueue_meta).await;
                }

                // Handling the item may have switched us into abort mode.
                if self.status() == InvokerStatus::Aborting {
                    trace!("Aborting response stream processing for request id {request_id}");
                    break;
                }
            }

            trace!("Finished processing service call results for request id {request_id}");
        })
    }

    /// Converts a single item of the service response stream into the
    /// response, if any, that should be enqueued for it.
    ///
    /// - For a successful item, any feedback it carries is passed to
    ///   [`process_feedback`] and the item's optional response is returned
    ///   as is.
    /// - For a failed item, the status is set to
    ///   [`InvokerStatus::Aborting`] and an error response for `req_msg`
    ///   is returned, built from the response code reported by the error.
    ///
    /// [`process_feedback`]: Self::process_feedback
    fn process_response_stream_item(
        &mut self,
        stream_item: ServiceResult<Svc::Target>,
        req_msg: &Message<RequestOctets>,
    ) -> Option<AdditionalBuilder<StreamTarget<Svc::Target>>> {
        // Deal with the failure case first so the rest of the function
        // only has to handle a successful call result.
        let call_result = match stream_item {
            Ok(call_result) => call_result,
            Err(err) => {
                self.set_status(InvokerStatus::Aborting);
                return Some(mk_error_response(req_msg, err.rcode().into()));
            }
        };

        let (response, feedback) = call_result.into_inner();
        if let Some(fb) = feedback {
            self.process_feedback(fb);
        }
        response
    }

    /// Acts on feedback received from the service.
    ///
    /// - `Reconfigure` is forwarded to [`reconfigure`].
    /// - `BeginTransaction` sets the status to
    ///   [`InvokerStatus::InTransaction`].
    /// - `EndTransaction` sets the status to [`InvokerStatus::Normal`].
    ///
    /// [`reconfigure`]: Self::reconfigure
    fn process_feedback(&mut self, feedback: ServiceFeedback) {
        let next_status = match feedback {
            ServiceFeedback::Reconfigure { idle_timeout } => {
                // Reconfiguration does not affect the invoker status.
                self.reconfigure(idle_timeout);
                return;
            }
            ServiceFeedback::BeginTransaction => InvokerStatus::InTransaction,
            ServiceFeedback::EndTransaction => InvokerStatus::Normal,
        };
        self.set_status(next_status);
    }

    /// Returns the current status of the invoker.
    fn status(&self) -> InvokerStatus;

    /// Sets the status of the invoker.
    fn set_status(&mut self, status: InvokerStatus);

    /// Called when the service sends `ServiceFeedback::Reconfigure`.
    ///
    /// `idle_timeout` is the value carried by the feedback; how it is
    /// applied (including what `None` means) is up to the implementor.
    fn reconfigure(&self, idle_timeout: Option<Duration>);

    /// Enqueues `response` for delivery.
    ///
    /// Called by [`dispatch`] for every response produced while processing
    /// a request, with `meta` being a reference to the `enqueue_meta`
    /// value that was passed to [`dispatch`].
    ///
    /// [`dispatch`]: Self::dispatch
    fn enqueue_response<'a>(
        &'a self,
        response: AdditionalBuilder<StreamTarget<Svc::Target>>,
        meta: &'a EnqueueMeta,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}