norgopolis_module/
invoker_service.rs1use std::pin::Pin;
2
3use crate::module_communication::{invoker_server::Invoker, Invocation, MessagePack};
4use futures::Stream;
5use tonic::{Request, Response, Status};
6
7#[crate::async_trait]
8pub trait Service {
9 type Stream: Stream<Item = Result<MessagePack, Status>> + Send;
10
11 async fn call(
12 &self,
13 fn_name: String,
14 args: Option<MessagePack>,
15 ) -> Result<Self::Stream, Status>;
16}
17
18pub struct InvokerService<T> {
19 service: T,
20 tx: tokio::sync::mpsc::UnboundedSender<()>,
21}
22
23impl<T> InvokerService<T>
24where
25 T: Service,
26{
27 pub fn new(service: T, tx: tokio::sync::mpsc::UnboundedSender<()>) -> InvokerService<T> {
28 InvokerService { service, tx }
29 }
30}
31
32#[tonic::async_trait]
33impl<T> Invoker for InvokerService<T>
34where
35 T: Service + Sync + Send + 'static,
36{
37 type InvokeStream = Pin<Box<dyn Stream<Item = Result<MessagePack, Status>> + Send>>;
38
39 async fn invoke(
40 &self,
41 request: Request<Invocation>,
42 ) -> Result<Response<Self::InvokeStream>, Status> {
43 let invocation = request.into_inner();
44
45 let _ = self.tx.send(());
46
47 let response = self
48 .service
49 .call(invocation.function_name, invocation.args)
50 .await?;
51
52 Ok(Response::new(Box::pin(response)))
53 }
54}