1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use futures::FutureExt;
use std::future::Future;

use norgopolis_protos::client_communication::{forwarder_client::ForwarderClient, Invocation};
pub use norgopolis_protos::client_communication::MessagePack;

use serde::de::DeserializeOwned;
use tonic::{transport::Channel, Request, Response, Status, Streaming};

pub struct ConnectionHandle(ForwarderClient<Channel>);

impl ConnectionHandle {
    pub fn invoke_raw<'a>(
        &'a mut self,
        module: String,
        function_name: String,
        args: Option<MessagePack>,
    ) -> impl Future<Output = Result<Response<Streaming<MessagePack>>, Status>> + 'a {
        self.0.forward(Request::new(Invocation {
            module,
            function_name,
            args,
        }))
    }

    pub async fn invoke<'a, TargetStruct, F>(
        &'a mut self,
        module: String,
        function_name: String,
        args: Option<MessagePack>,
        callback: F
    ) -> anyhow::Result<()>
        where F: Fn(TargetStruct),
        TargetStruct: DeserializeOwned,
    {
        self.invoke_raw(module, function_name, args).then(|response| async move {
            let mut response = response?.into_inner();

            while let Some(data) = response.message().await? {
                callback(rmp_serde::from_slice::<TargetStruct>(data.data.as_slice()).unwrap());
            }

            Ok::<(), anyhow::Error>(())
        }).await?;

        Ok(())
    }

    pub async fn invoke_collect<TargetStruct>(
        &mut self,
        module: String,
        function_name: String,
        args: Option<MessagePack>,
    ) -> anyhow::Result<Vec<TargetStruct>>
    where
        TargetStruct: DeserializeOwned,
    {
        let response = self.invoke_raw(module, function_name, args).await;

        let mut response = response?.into_inner();
        let mut result: Vec<TargetStruct> = Vec::new();

        while let Some(data) = response.message().await? {
            result.push(rmp_serde::from_slice::<TargetStruct>(data.data.as_slice()).unwrap());
        }

        Ok(result)
    }
}

pub async fn connect(ip: &String, port: &String) -> anyhow::Result<ConnectionHandle> {
    // TODO: Spin up the server if it doesn't already exist
    // NOTE: Perhaps make server spinup a feature flag?
    Ok(ConnectionHandle(
        ForwarderClient::connect("http://".to_string() + ip + ":" + port).await?,
    ))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn establish_connection() {
        connect(&"127.0.0.1".into(), &"62020".into()).await.unwrap().invoke("test-module".to_string(), "func-name".to_string(), None, |response: (String,)| println!("{}", response.0)).await.unwrap();
    }
}