1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
pub mod invoker_service;
mod stdio_service;

use std::time::Duration;

use futures::FutureExt;
use invoker_service::InvokerService;
use invoker_service::Service;
use module_communication::invoker_server::InvokerServer;
use stdio_service::StdioService;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;

pub use norgopolis_protos::module_communication;
pub use tonic::async_trait;
pub use tonic::{Code, Status};

/// Hook through which a module service is told to shut down.
///
/// [`Module::start`] calls [`Shutdown::shutdown`] from its background
/// watchdog task when no keepalive message is waiting at the moment the
/// five-minute timer fires. What "shutting down" means is left entirely to
/// the implementor.
pub trait Shutdown {
    /// Invoked (through a shared reference) when the module should stop.
    fn shutdown(&self);
}

// TODO: Make the timeout for the module configurable via Module::new().timeout(xyz).start()
/// Entry point for running a module service.
///
/// The type carries no state; it only namespaces the associated
/// [`Module::start`] function.
pub struct Module {}

impl Module {
    pub async fn start<S>(service: S) -> Result<(), anyhow::Error>
    where
        S: Service + Shutdown + Sync + Send + Copy + 'static,
    {
        let (keepalive_tx, mut keepalive_rx) = tokio::sync::mpsc::channel::<()>(1);

        tokio::spawn(async move {
            sleep(Duration::from_secs(60 * 5)).await;

            if keepalive_rx.recv().now_or_never().is_none() {
                service.shutdown();
            }
        });

        let (stdin, stdout) = (tokio::io::stdin(), tokio::io::stdout());
        let stdio_service = StdioService { stdin, stdout };

        // TODO: Do this in a better way
        // `once_stream` doesn't work :/
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<StdioService, anyhow::Error>>(1);
        tx.send(Ok(stdio_service)).await?;

        Ok(Server::builder()
            .add_service(InvokerServer::new(InvokerService::new(
                service,
                keepalive_tx,
            )))
            .serve_with_incoming(ReceiverStream::new(rx))
            .await?)
    }
}