norgopolis_module/
invoker_service.rs

1use std::pin::Pin;
2
3use crate::module_communication::{invoker_server::Invoker, Invocation, MessagePack};
4use futures::Stream;
5use tonic::{Request, Response, Status};
6
7#[crate::async_trait]
8pub trait Service {
9    type Stream: Stream<Item = Result<MessagePack, Status>> + Send;
10
11    async fn call(
12        &self,
13        fn_name: String,
14        args: Option<MessagePack>,
15    ) -> Result<Self::Stream, Status>;
16}
17
18pub struct InvokerService<T> {
19    service: T,
20    tx: tokio::sync::mpsc::UnboundedSender<()>,
21}
22
23impl<T> InvokerService<T>
24where
25    T: Service,
26{
27    pub fn new(service: T, tx: tokio::sync::mpsc::UnboundedSender<()>) -> InvokerService<T> {
28        InvokerService { service, tx }
29    }
30}
31
32#[tonic::async_trait]
33impl<T> Invoker for InvokerService<T>
34where
35    T: Service + Sync + Send + 'static,
36{
37    type InvokeStream = Pin<Box<dyn Stream<Item = Result<MessagePack, Status>> + Send>>;
38
39    async fn invoke(
40        &self,
41        request: Request<Invocation>,
42    ) -> Result<Response<Self::InvokeStream>, Status> {
43        let invocation = request.into_inner();
44
45        let _ = self.tx.send(());
46
47        let response = self
48            .service
49            .call(invocation.function_name, invocation.args)
50            .await?;
51
52        Ok(Response::new(Box::pin(response)))
53    }
54}