norgopolis_client/lib.rs
1//! This crate provides functionality to easily connect to a `norgopolis` instance and interact with
2//! its modules.
3//!
4//! This Rust crate provides a simple and lightweight layer for communicating with norgopolis.
5//! To establish a connection, use the `connect` function. By default Norgopolis runs on port `62020`:
6//!
//! ```rs
//! use norgopolis_client;
//!
//! #[tokio::main]
//! async fn main() {
//!     // The handle must be mutable: every `invoke*` method takes `&mut self`.
//!     let mut connection = norgopolis_client::connect(&"localhost".into(), &"62020".into())
//!         .await
//!         .expect("Unable to connect to server!");
//!
//!     // Invokes a specific module's function without any parameters.
//!     // The closure will be executed for every return value provided. Return values are streamed back
//!     // over time, hence the `await`.
//!     connection.invoke("module-name".to_string(), "function-name".to_string(), None, |response: YourExpectedResponse| println!("{:#?}", response))
//!         .await
//!         .unwrap();
//! }
//! ```
24//!
25//! If the `autostart-server` feature flag is enabled, this client will look for a binary called `norgopolis-server`
26//! on the host system and will auto-execute it if a connection could not be initially established.
27//!
28//! The server will be forked into a separate system process and will automatically shut down after 5 minutes
29//! of inactivity.
30
31use futures::FutureExt;
32use std::{
33 future::Future,
34 io::{BufReader, Read},
35 net::ToSocketAddrs,
36 process::{Command, Stdio},
37};
38
39pub use norgopolis_protos::client_communication::MessagePack;
40use norgopolis_protos::client_communication::{forwarder_client::ForwarderClient, Invocation};
41
42use serde::de::DeserializeOwned;
43use tonic::{transport::Channel, Request, Response, Status, Streaming};
44
45/// Defines a connection to a Norgopolis instance.
46pub struct ConnectionHandle(ForwarderClient<Channel>);
47
48impl ConnectionHandle {
49 /// Invokes a function of a given module running under Norgopolis.
50 /// Returns a future to the response stream.
51 ///
52 /// It's recommended to use the non-raw functions if you do not need greater control
53 /// over the data being sent.
54 pub fn invoke_raw(
55 &mut self,
56 module: String,
57 function_name: String,
58 args: Option<MessagePack>,
59 ) -> impl Future<Output = Result<Response<Streaming<MessagePack>>, Status>> + '_ {
60 self.0.forward(Request::new(Invocation {
61 module,
62 function_name,
63 args,
64 }))
65 }
66
67 /// Invokes a function of a given module running under Norgopolis.
68 ///
69 /// On every received message a callback will be executed with the raw
70 /// MessagePack return data.
71 ///
72 /// It's recommended to use the non-raw functions if you do not need greater control
73 /// over the data being sent.
74 pub async fn invoke_raw_callback<F>(
75 &mut self,
76 module: String,
77 function_name: String,
78 args: Option<MessagePack>,
79 callback: F,
80 ) -> anyhow::Result<()>
81 where
82 F: Fn(MessagePack),
83 {
84 self.invoke_raw(module, function_name, args)
85 .then(|response| async move {
86 let mut response = response?.into_inner();
87
88 while let Some(data) = response.message().await? {
89 callback(data);
90 }
91
92 Ok::<(), anyhow::Error>(())
93 })
94 .await?;
95
96 Ok(())
97 }
98
99 /// High-level function to invoke a given module's function.
100 ///
101 /// Will execute a callback on every received return message, but
102 /// will also automatically deserialize the received MessagePack into
103 /// a struct of your choice.
104 ///
105 /// Example:
106 /// ```rs
107 /// // Automatically deserialize the MessagePack into a String.
108 /// connection.invoke("module-name".to_string(), "function-name".to_string(), None, |response: String| println!("{}", response))
109 /// .await
110 /// .unwrap();
111 /// ```
112 pub async fn invoke<TargetStruct, F>(
113 &mut self,
114 module: String,
115 function_name: String,
116 args: Option<MessagePack>,
117 callback: F,
118 ) -> anyhow::Result<()>
119 where
120 F: Fn(TargetStruct),
121 TargetStruct: DeserializeOwned,
122 {
123 self.invoke_raw(module, function_name, args)
124 .then(|response| async move {
125 let mut response = response?.into_inner();
126
127 while let Some(data) = response.message().await? {
128 callback(rmp_serde::from_slice::<TargetStruct>(data.data.as_slice()).unwrap());
129 }
130
131 Ok::<(), anyhow::Error>(())
132 })
133 .await?;
134
135 Ok(())
136 }
137
138 /// Invokes a function of a given module running under norgopolis.
139 ///
140 /// Instead of streaming return values back over time, this function waits until all possible
141 /// return values have been received and then returns a vector of outputs.
142 pub async fn invoke_collect<TargetStruct>(
143 &mut self,
144 module: String,
145 function_name: String,
146 args: Option<MessagePack>,
147 ) -> anyhow::Result<Vec<TargetStruct>>
148 where
149 TargetStruct: DeserializeOwned,
150 {
151 let response = self.invoke_raw(module, function_name, args).await;
152
153 let mut response = response?.into_inner();
154 let mut result: Vec<TargetStruct> = Vec::new();
155
156 while let Some(data) = response.message().await? {
157 result.push(rmp_serde::from_slice::<TargetStruct>(data.data.as_slice()).unwrap());
158 }
159
160 Ok(result)
161 }
162}
163
164/// Establish a connection with a running Norgopolis instance.
165///
166/// If the `autostart-server` feature flag is enabled, will attempt to also spawn a Norgopolis
167/// instance if one is not already running.
168pub async fn connect(ip: &String, port: &String) -> anyhow::Result<ConnectionHandle> {
169 let address = format!("{}:{}", ip, port);
170
171 Ok(ConnectionHandle(
172 match ForwarderClient::connect("http://".to_string() + &address).await {
173 Ok(connection) => connection,
174 Err(err) => {
175 if cfg!(feature = "autostart-server")
176 && address
177 .to_socket_addrs()?
178 .all(|socket| socket.ip().is_loopback())
179 {
180 let command = Command::new("norgopolis-server")
181 .stdout(Stdio::piped())
182 .spawn()?;
183
184 if let Some(stdout) = command.stdout {
185 let mut buffer = BufReader::new(stdout);
186 let mut str = [b' '; 5];
187
188 buffer.read_exact(&mut str)?;
189
190 if str.starts_with(b"ready") {
191 return Ok(ConnectionHandle(
192 ForwarderClient::connect("http://".to_string() + ip + ":" + port)
193 .await?,
194 ));
195 }
196 }
197 }
198
199 return Err(err.into());
200 }
201 },
202 ))
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Connects to `127.0.0.1:62020`, calls `echo` on the `hello-world` module
    /// with the string `"hello"`, and prints every `String` response.
    ///
    /// Fails (panics) if connecting, encoding the argument, or the invocation
    /// returns an error.
    #[tokio::test]
    async fn establish_connection() {
        let ip: String = "127.0.0.1".into();
        let port: String = "62020".into();

        let mut connection = connect(&ip, &port).await.unwrap();

        let module = "hello-world".to_string();
        let function = "echo".to_string();
        let payload = MessagePack::encode("hello".to_string()).unwrap();

        let outcome = connection
            .invoke(module, function, Some(payload), |response: String| {
                println!("{}", response)
            })
            .await;

        outcome.unwrap();
    }
}
223}