norgopolis_client/
lib.rs

1//! This crate provides functionality to easily connect to a `norgopolis` instance and interact with
2//! its modules.
3//!
4//! This Rust crate provides a simple and lightweight layer for communicating with norgopolis.
5//! To establish a connection, use the `connect` function. By default Norgopolis runs on port `62020`:
6//!
7//! ```rs
8//! use norgopolis_client;
9//!
10//! #[tokio::main]
11//! async fn main() {
12//!     let connection = norgopolis_client::connect(&"localhost".into(), &"62020".into())
13//!         .await
14//!         .expect("Unable to connect to server!");
15//!
16//!     // Invokes a specific module's function without any parameters.
17//!     // The closure will be executed for every return value provided. Return values are streamed back
18//!     // over time, hence the `await`.
19//!     connection.invoke("module-name", "function-name", None, |response: YourExpectedResponse| println!("{:#?}", response))
20//!         .await
21//!         .unwrap();
22//! }
23//! ```
24//!
25//! If the `autostart-server` feature flag is enabled, this client will look for a binary called `norgopolis-server`
26//! on the host system and will auto-execute it if a connection could not be initially established.
27//!
28//! The server will be forked into a separate system process and will automatically shut down after 5 minutes
29//! of inactivity.
30
31use futures::FutureExt;
32use std::{
33    future::Future,
34    io::{BufReader, Read},
35    net::ToSocketAddrs,
36    process::{Command, Stdio},
37};
38
39pub use norgopolis_protos::client_communication::MessagePack;
40use norgopolis_protos::client_communication::{forwarder_client::ForwarderClient, Invocation};
41
42use serde::de::DeserializeOwned;
43use tonic::{transport::Channel, Request, Response, Status, Streaming};
44
45/// Defines a connection to a Norgopolis instance.
46pub struct ConnectionHandle(ForwarderClient<Channel>);
47
48impl ConnectionHandle {
49    /// Invokes a function of a given module running under Norgopolis.
50    /// Returns a future to the response stream.
51    ///
52    /// It's recommended to use the non-raw functions if you do not need greater control
53    /// over the data being sent.
54    pub fn invoke_raw(
55        &mut self,
56        module: String,
57        function_name: String,
58        args: Option<MessagePack>,
59    ) -> impl Future<Output = Result<Response<Streaming<MessagePack>>, Status>> + '_ {
60        self.0.forward(Request::new(Invocation {
61            module,
62            function_name,
63            args,
64        }))
65    }
66
67    /// Invokes a function of a given module running under Norgopolis.
68    ///
69    /// On every received message a callback will be executed with the raw
70    /// MessagePack return data.
71    ///
72    /// It's recommended to use the non-raw functions if you do not need greater control
73    /// over the data being sent.
74    pub async fn invoke_raw_callback<F>(
75        &mut self,
76        module: String,
77        function_name: String,
78        args: Option<MessagePack>,
79        callback: F,
80    ) -> anyhow::Result<()>
81    where
82        F: Fn(MessagePack),
83    {
84        self.invoke_raw(module, function_name, args)
85            .then(|response| async move {
86                let mut response = response?.into_inner();
87
88                while let Some(data) = response.message().await? {
89                    callback(data);
90                }
91
92                Ok::<(), anyhow::Error>(())
93            })
94            .await?;
95
96        Ok(())
97    }
98
99    /// High-level function to invoke a given module's function.
100    ///
101    /// Will execute a callback on every received return message, but
102    /// will also automatically deserialize the received MessagePack into
103    /// a struct of your choice.
104    ///
105    /// Example:
106    /// ```rs
107    ///  // Automatically deserialize the MessagePack into a String.
108    ///  connection.invoke("module-name".to_string(), "function-name".to_string(), None, |response: String| println!("{}", response))
109    ///      .await
110    ///      .unwrap();
111    ///  ```
112    pub async fn invoke<TargetStruct, F>(
113        &mut self,
114        module: String,
115        function_name: String,
116        args: Option<MessagePack>,
117        callback: F,
118    ) -> anyhow::Result<()>
119    where
120        F: Fn(TargetStruct),
121        TargetStruct: DeserializeOwned,
122    {
123        self.invoke_raw(module, function_name, args)
124            .then(|response| async move {
125                let mut response = response?.into_inner();
126
127                while let Some(data) = response.message().await? {
128                    callback(rmp_serde::from_slice::<TargetStruct>(data.data.as_slice()).unwrap());
129                }
130
131                Ok::<(), anyhow::Error>(())
132            })
133            .await?;
134
135        Ok(())
136    }
137
138    /// Invokes a function of a given module running under norgopolis.
139    ///
140    /// Instead of streaming return values back over time, this function waits until all possible
141    /// return values have been received and then returns a vector of outputs.
142    pub async fn invoke_collect<TargetStruct>(
143        &mut self,
144        module: String,
145        function_name: String,
146        args: Option<MessagePack>,
147    ) -> anyhow::Result<Vec<TargetStruct>>
148    where
149        TargetStruct: DeserializeOwned,
150    {
151        let response = self.invoke_raw(module, function_name, args).await;
152
153        let mut response = response?.into_inner();
154        let mut result: Vec<TargetStruct> = Vec::new();
155
156        while let Some(data) = response.message().await? {
157            result.push(rmp_serde::from_slice::<TargetStruct>(data.data.as_slice()).unwrap());
158        }
159
160        Ok(result)
161    }
162}
163
164/// Establish a connection with a running Norgopolis instance.
165///
166/// If the `autostart-server` feature flag is enabled, will attempt to also spawn a Norgopolis
167/// instance if one is not already running.
168pub async fn connect(ip: &String, port: &String) -> anyhow::Result<ConnectionHandle> {
169    let address = format!("{}:{}", ip, port);
170
171    Ok(ConnectionHandle(
172        match ForwarderClient::connect("http://".to_string() + &address).await {
173            Ok(connection) => connection,
174            Err(err) => {
175                if cfg!(feature = "autostart-server")
176                    && address
177                        .to_socket_addrs()?
178                        .all(|socket| socket.ip().is_loopback())
179                {
180                    let command = Command::new("norgopolis-server")
181                        .stdout(Stdio::piped())
182                        .spawn()?;
183
184                    if let Some(stdout) = command.stdout {
185                        let mut buffer = BufReader::new(stdout);
186                        let mut str = [b' '; 5];
187
188                        buffer.read_exact(&mut str)?;
189
190                        if str.starts_with(b"ready") {
191                            return Ok(ConnectionHandle(
192                                ForwarderClient::connect("http://".to_string() + ip + ":" + port)
193                                    .await?,
194                            ));
195                        }
196                    }
197                }
198
199                return Err(err.into());
200            }
201        },
202    ))
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208
209    #[tokio::test]
210    async fn establish_connection() {
211        connect(&"127.0.0.1".into(), &"62020".into())
212            .await
213            .unwrap()
214            .invoke(
215                "hello-world".to_string(),
216                "echo".to_string(),
217                Some(MessagePack::encode("hello".to_string()).unwrap()),
218                |response: String| println!("{}", response),
219            )
220            .await
221            .unwrap();
222    }
223}