norgopolis_module/lib.rs
1//! # A Library for Creating Norgopolis Modules
2//!
3//! For information about Norgopolis, consult https://github.com/nvim-neorg/norgopolis.
4//!
5//! This library exposes an API for creating and maintaining a connection to the Norgopolis router.
6//! Norgopolis modules provide specific sets of functionality, for example multithreaded parsing, database
7//! access, etc. All of the default modules created by the Neorg team are built on top of this library.
8//!
9//! # Usage
10//!
11//! ### General Setup
12//!
13//! First, create a struct for your module. Name it whatever you'd like:
14//!
15//! ```rs
16//! use norgopolis_module::{
17//! invoker_service::Service, module_communication::MessagePack, Code, Module, Status,
18//! };
19//!
20//! #[derive(Default)]
21//! struct MyModule {
22//! // add any data or state you might need to maintain here...
23//! }
24//! ```
25//!
26//! Second, implement the `norgopolis_module::invoker_service::Service` trait for your struct.
27//! This forces you to implement a `call` function which will be invoked any time someone routes
28//! a message to your module. Since async traits are not stabilized within Rust yet, tag your
29//! trait implementation with `#[norgopolis_module::async_trait]`:
30//!
31//! ```rs
32//! use tokio_stream::wrappers::UnboundedReceiverStream;
33//!
34//! #[norgopolis_module::async_trait]
35//! impl Service for MyModule {
36//! type Stream = UnboundedReceiverStream<Result<MessagePack, Status>>;
37//!
38//! async fn call(
39//! &self,
40//! function: String,
41//! args: Option<MessagePack>,
42//! ) -> Result<Self::Stream, Status> {
43//! todo!()
44//! }
45//! }
46//! ```
47//!
48//! ##### `Stream`
49//!
50//! The `Stream` type defines what sort of data will be returned back via gRPC. We recommend
51//! that you set it to `UnboundedReceiverStream<Result<MessagePack, Status>>`. This means that
52//! given one request your module will be able to return an infinite amount of MessagePack responses,
53//! or a status code in case something went wrong.
54//!
55//! ##### `call`
56//!
57//! The `call` function gets invoked whenever a client routes a message to you. The message contains:
58//! - The function that they would like to invoke
59//! - An optional set of parameters they would like to supply to the function.
60//!
61//! ### Creating the Basic Glue
62//!
63//! In the `call` function it's recommended to match over all possible function names that your module
64//! supports and returning an error code if it's unsupported:
65//!
66//! ```rs
67//! match function.as_str() {
68//! "my-function" => todo!(),
69//! _ => Err(Status::new(Code::NotFound, "Requested function not found!")),
70//! }
71//! ```
72//!
73//! > [!IMPORTANT]
74//! > It's always better to return *some* sort of status code over panicking.
75//! > Panicking will terminate the connection to Norgopolis and the user will not receive
76//! > any sort of error or warning.
77//!
78//! ### Decoding the Parameters
79//!
80//! If your function takes in any amount of parameters then now is the time to decode them.
81//! If your parameter is complex (e.g. a dictionary) then it's recommended to create a struct
82//! designated for it. Be sure to derive `serde::Serialize`:
83//!
84//! ```rs
85//! #[derive(serde::Serialize)]
86//! struct MyParameters {
87//! name: String,
88//! }
89//! ```
90//!
91//! Aftewards, it's a simple matter of running `decode` on your arguments:
92//!
93//! ```rs
94//! match function.as_str() {
95//! "my-function" => {
96//! let args: MyParameters = args
97//! .unwrap() // WARNING: Don't actually use unwrap() in your code :)
98//! .decode()
99//! .map_err(|err| Status::new(Code::InvalidArgument, err.to_string()))?;
100//!
101//! // TODO: Do something with the parameters...
102//! },
103//! }
104//! ```
105//!
106//! We manually provide the type of `args` so that Rust knows what type to serialize to.
107//! Afterwards we wrap any possible errors into a status code which can be returned back to the client.
108//!
109//! ### Sending Data back to the Client
110//!
111//! Now that we have all of the input data in check we can process our data and return it back to the client.
112//! The way we do this is in the form of a data stream. Thanks to data streams we can return long segments of
113//! data over time instead of having to return the whole data upfront. When we return a segment of data, we
114//! also return it in the form of a `Result<>`. This is because individual segments of data may contain errors,
115//! but the whole process can complete succesfully. You should return errors from the `call` function when there
116//! is an irrecoverable error, but should send back an error packet when a *portion* of the internal logic fails.
117//!
118//! Let's showcase all of this via an example:
119//!
120//! ```rs
121//! match function.as_str() {
122//! "my-function" => {
123//! let args: MyParameters = args
124//! .unwrap() // WARNING: Don't actually use unwrap() in your code :)
125//! .decode()
126//! .map_err(|err| Status::new(Code::InvalidArgument, err.to_string()))?;
127//!
128//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
129//!
130//! // We send back an Ok() packet to the client with an encoded message of our choice
131//! // (it can be anything that's serializable with serde!)
132//! tx.send(Ok(MessagePack::encode(format!("Hello, {}!", args.name)))).unwrap();
133//!
134//! Ok(UnboundedReceiverStream::new(rx))
135//! },
136//! }
137//! ```
138//!
139//! First, we create a sender and receiver via tokio's `unbounded_channel()`. This allows us to send data to the client
140//! and for the client to read data from the module. All return messages have to be encoded via `MessagePack::encode`.
141//!
142//! ### Running the Module
143//!
144//! Now that we have all of the code set up, create an asynchronous main function. In here we will instantiate our
145//! module and kick it into full gear:
146//!
147//! ```rs
148//! #[tokio::main]
149//! async fn main() {
150//! Module::new().start(MyModule::default())
151//! .await
152//! .unwrap()
153//! }
154//! ```
155//!
156//! Voila! You now have a fundamental understanding of how modules communicate with Norgopolis and how to write your own
157//! norgopolis module. Happy coding!
158
159pub mod invoker_service;
160mod stdio_service;
161
162use std::time::Duration;
163
164use futures::FutureExt;
165use invoker_service::InvokerService;
166use invoker_service::Service;
167use module_communication::invoker_server::InvokerServer;
168use stdio_service::StdioService;
169use tokio::time::sleep;
170use tokio_stream::wrappers::ReceiverStream;
171use tonic::transport::Server;
172
173pub use norgopolis_protos::module_communication;
174pub use tonic::async_trait;
175pub use tonic::{Code, Status};
176
177/// Describes a module that can communicate with Norgopolis
178/// over stdin/stdout.
179pub struct Module {
180 /// Timeout duration for the module. If no messages are received by the module after this time
181 /// has passed the module will automatically shut down.
182 ///
183 /// Default is 5 minutes.
184 pub timeout: Duration,
185}
186
187impl Default for Module {
188 fn default() -> Self {
189 Self::new()
190 }
191}
192
193impl Module {
194 pub fn new() -> Self {
195 Module {
196 timeout: Duration::from_secs(60 * 5),
197 }
198 }
199
200 pub fn timeout(self, timeout: Duration) -> Self {
201 Module { timeout }
202 }
203
204 pub async fn start<S>(self, service: S) -> Result<(), anyhow::Error>
205 where
206 S: Service + Sync + Send + 'static,
207 {
208 let (keepalive_tx, mut keepalive_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
209
210 tokio::spawn(async move {
211 sleep(self.timeout).await;
212
213 if keepalive_rx.recv().now_or_never().is_none() {
214 std::process::exit(0);
215 }
216
217 // Drain the remained of the messages.
218 while keepalive_rx.recv().now_or_never().is_some() {}
219 });
220
221 let (stdin, stdout) = (tokio::io::stdin(), tokio::io::stdout());
222 let stdio_service = StdioService { stdin, stdout };
223
224 // TODO: Do this in a better way
225 // `once_stream` doesn't work :/
226 let (tx, rx) = tokio::sync::mpsc::channel::<Result<StdioService, anyhow::Error>>(1);
227 tx.send(Ok(stdio_service)).await?;
228
229 Ok(Server::builder()
230 .add_service(InvokerServer::new(InvokerService::new(
231 service,
232 keepalive_tx,
233 )))
234 .serve_with_incoming(ReceiverStream::new(rx))
235 .await?)
236 }
237}