norgopolis_module/
lib.rs

//! # A Library for Creating Norgopolis Modules
//!
//! For information about Norgopolis, consult <https://github.com/nvim-neorg/norgopolis>.
//!
//! This library exposes an API for creating and maintaining a connection to the Norgopolis router.
//! Norgopolis modules provide specific sets of functionality, for example multithreaded parsing, database
//! access, etc. All of the default modules created by the Neorg team are built on top of this library.
//!
//! # Usage
//!
//! ### General Setup
//!
//! First, create a struct for your module. Name it whatever you'd like:
//!
//! ```rs
//! use norgopolis_module::{
//!     invoker_service::Service, module_communication::MessagePack, Code, Module, Status,
//! };
//!
//! #[derive(Default)]
//! struct MyModule {
//!     // add any data or state you might need to maintain here...
//! }
//! ```
//!
//! Second, implement the `norgopolis_module::invoker_service::Service` trait for your struct.
//! This requires you to implement a `call` function, which is invoked any time someone routes
//! a message to your module. Because `call` is an async function inside a trait, tag your
//! trait implementation with `#[norgopolis_module::async_trait]`:
//!
//! ```rs
//! use tokio_stream::wrappers::UnboundedReceiverStream;
//!
//! #[norgopolis_module::async_trait]
//! impl Service for MyModule {
//!     type Stream = UnboundedReceiverStream<Result<MessagePack, Status>>;
//!
//!     async fn call(
//!         &self,
//!         function: String,
//!         args: Option<MessagePack>,
//!     ) -> Result<Self::Stream, Status> {
//!         todo!()
//!     }
//! }
//! ```
//!
//! ##### `Stream`
//!
//! The `Stream` type defines what sort of data will be returned via gRPC. We recommend
//! that you set it to `UnboundedReceiverStream<Result<MessagePack, Status>>`. This means that,
//! for a single request, your module can return any number of MessagePack responses,
//! or a status code in case something went wrong.
//!
//! ##### `call`
//!
//! The `call` function gets invoked whenever a client routes a message to you. The message contains:
//! - The function that they would like to invoke.
//! - An optional set of parameters they would like to supply to the function.
//!
//! ### Creating the Basic Glue
//!
//! In the `call` function it's recommended to match over all function names that your module
//! supports and to return an error code for any name it does not support:
//!
//! ```rs
//! match function.as_str() {
//!     "my-function" => todo!(),
//!     _ => Err(Status::new(Code::NotFound, "Requested function not found!")),
//! }
//! ```
//!
//! > [!IMPORTANT]
//! > It's always better to return *some* sort of status code over panicking.
//! > Panicking will terminate the connection to Norgopolis and the user will not receive
//! > any sort of error or warning.
//!
//! ### Decoding the Parameters
//!
//! If your function takes any parameters, now is the time to decode them.
//! If your parameter is complex (e.g. a dictionary) then it's recommended to create a struct
//! designated for it. Be sure to derive `serde::Deserialize`, since `decode` deserializes
//! the incoming MessagePack data into your struct:
//!
//! ```rs
//! #[derive(serde::Deserialize)]
//! struct MyParameters {
//!     name: String,
//! }
//! ```
//!
//! Afterwards, it's a simple matter of running `decode` on your arguments:
//!
//! ```rs
//! match function.as_str() {
//!     "my-function" => {
//!         let args: MyParameters = args
//!             .unwrap() // WARNING: Don't actually use unwrap() in your code :)
//!             .decode()
//!             .map_err(|err| Status::new(Code::InvalidArgument, err.to_string()))?;
//!
//!         // TODO: Do something with the parameters...
//!     },
//! }
//! ```
//!
//! We manually provide the type of `args` so that Rust knows what type to deserialize to.
//! Afterwards we wrap any possible errors into a status code which can be returned to the client.
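//!
//! As the warning in the snippet says, `unwrap()` panics when the client sends no arguments at all.
//! One way to avoid that is to turn the `Option` into a `Result` first. The sketch below is
//! self-contained, so `Code`, `Status` and `require_args` are hypothetical stand-ins rather than
//! the real `tonic` types; it assumes the real `Status::new(Code::InvalidArgument, ...)` behaves
//! as in the snippets above:
//!
//! ```rs
//! // Hypothetical stand-ins for `tonic::Code` and `tonic::Status`,
//! // so that this sketch compiles without any dependencies.
//! #[derive(Debug, PartialEq)]
//! enum Code {
//!     InvalidArgument,
//! }
//!
//! #[derive(Debug, PartialEq)]
//! struct Status {
//!     code: Code,
//!     message: String,
//! }
//!
//! impl Status {
//!     fn new(code: Code, message: impl Into<String>) -> Self {
//!         Status { code, message: message.into() }
//!     }
//! }
//!
//! /// Hypothetical helper: turns a missing argument payload into a status
//! /// code instead of a panic.
//! fn require_args<T>(args: Option<T>) -> Result<T, Status> {
//!     args.ok_or_else(|| Status::new(Code::InvalidArgument, "This function requires arguments!"))
//! }
//! ```
//!
//! Under these assumptions, `require_args(args)?` could take the place of `args.unwrap()` in
//! the `match` arm, so that a missing payload reaches the client as a status code.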
//!
//! ### Sending Data back to the Client
//!
//! Now that the input data is decoded, we can process it and return the result to the client.
//! We do this through a data stream, which lets us return long segments of data over time instead of
//! having to return everything upfront. Each segment is itself wrapped in a `Result`, because an
//! individual segment may fail while the process as a whole still completes successfully. Return an
//! error from the `call` function itself when the failure is unrecoverable, and send back an error
//! packet when only a *portion* of the internal logic fails.
//!
//! Let's showcase all of this via an example:
//!
//! ```rs
//! match function.as_str() {
//!     "my-function" => {
//!         let args: MyParameters = args
//!             .unwrap() // WARNING: Don't actually use unwrap() in your code :)
//!             .decode()
//!             .map_err(|err| Status::new(Code::InvalidArgument, err.to_string()))?;
//!
//!         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
//!
//!         // We send back an Ok() packet to the client with an encoded message of our choice
//!         // (it can be anything that's serializable with serde!).
//!         // `send` only fails if `rx` was dropped, which cannot have happened yet.
//!         tx.send(Ok(MessagePack::encode(format!("Hello, {}!", args.name)))).unwrap();
//!
//!         Ok(UnboundedReceiverStream::new(rx))
//!     },
//!     _ => Err(Status::new(Code::NotFound, "Requested function not found!")),
//! }
//! ```
//!
//! We create a sender and a receiver via tokio's `unbounded_channel()`. The module writes packets to the
//! sender (`tx`), while the receiver (`rx`) is wrapped in an `UnboundedReceiverStream` and returned so that
//! the packets can be streamed to the client. All return messages have to be encoded via `MessagePack::encode`.
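//!
//! The example above only ever sends an `Ok` packet. To illustrate the difference between a failed
//! *packet* and a failed *call*, here is a rough, self-contained sketch. It swaps in `std::sync::mpsc`
//! for tokio's unbounded channel and plain `String`s for `MessagePack` and `Status`, and `greet_all`
//! is a hypothetical function, so treat it as an illustration of the pattern rather than module code:
//!
//! ```rs
//! use std::sync::mpsc::{channel, Receiver};
//!
//! // Stand-in for `Result<MessagePack, Status>`: one packet of the response stream.
//! type Packet = Result<String, String>;
//!
//! /// Hypothetical function body: greets every name and reports bad entries
//! /// as error packets instead of aborting the whole call.
//! fn greet_all(names: Vec<String>) -> Result<Receiver<Packet>, String> {
//!     // Nothing useful can be streamed at all: fail the whole call.
//!     if names.is_empty() {
//!         return Err("Expected at least one name!".to_string());
//!     }
//!
//!     let (tx, rx) = channel();
//!
//!     for name in names {
//!         let packet = if name.is_empty() {
//!             // Only this item is bad: send an error packet and keep going.
//!             Err("Encountered an empty name!".to_string())
//!         } else {
//!             Ok(format!("Hello, {}!", name))
//!         };
//!
//!         // Sending only fails if the receiver was dropped; `rx` is still alive here.
//!         tx.send(packet).map_err(|err| err.to_string())?;
//!     }
//!
//!     Ok(rx)
//! }
//! ```
//!
//! For the names `"Ada"`, `""` and `"Bob"` the receiver yields an `Ok`, an `Err` and another `Ok` packet,
//! whereas an empty list fails the call before any stream is created.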
//!
//! ### Running the Module
//!
//! Now that we have all of the code set up, create an asynchronous main function. In here we will instantiate our
//! module and kick it into full gear:
//!
//! ```rs
//! #[tokio::main]
//! async fn main() {
//!     Module::new().start(MyModule::default())
//!         .await
//!         .unwrap()
//! }
//! ```
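//!
//! By default a module shuts itself down after 5 minutes without messages; the `timeout` builder
//! method on `Module` changes that. The sketch below mirrors only the builder part of `Module` as a
//! local stand-in so that it compiles without the rest of the crate. `short_lived_module` is a
//! hypothetical helper and the 30 second value is an arbitrary example:
//!
//! ```rs
//! use std::time::Duration;
//!
//! // Local stand-in that mirrors the builder part of `norgopolis_module::Module`.
//! struct Module {
//!     timeout: Duration,
//! }
//!
//! impl Module {
//!     fn new() -> Self {
//!         Module { timeout: Duration::from_secs(60 * 5) }
//!     }
//!
//!     fn timeout(self, timeout: Duration) -> Self {
//!         Module { timeout }
//!     }
//! }
//!
//! /// Hypothetical helper: a module that shuts down after 30 idle seconds.
//! fn short_lived_module() -> Module {
//!     Module::new().timeout(Duration::from_secs(30))
//! }
//! ```
//!
//! In a real module the call to `.timeout(...)` would sit between `Module::new()` and `.start(...)`.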
//!
//! Voila! You now have a fundamental understanding of how modules communicate with Norgopolis and how to write your own
//! Norgopolis module. Happy coding!

pub mod invoker_service;
mod stdio_service;

use std::time::Duration;

use futures::FutureExt;
use invoker_service::InvokerService;
use invoker_service::Service;
use module_communication::invoker_server::InvokerServer;
use stdio_service::StdioService;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;

pub use norgopolis_protos::module_communication;
pub use tonic::async_trait;
pub use tonic::{Code, Status};

/// Describes a module that can communicate with Norgopolis
/// over stdin/stdout.
pub struct Module {
    /// Timeout duration for the module. If no messages are received by the module after this time
    /// has passed the module will automatically shut down.
    ///
    /// Default is 5 minutes.
    pub timeout: Duration,
}

impl Default for Module {
    fn default() -> Self {
        Self::new()
    }
}

impl Module {
    /// Creates a module with the default timeout of 5 minutes.
    pub fn new() -> Self {
        Module {
            timeout: Duration::from_secs(60 * 5),
        }
    }

    /// Sets the idle timeout after which the module shuts itself down.
    pub fn timeout(self, timeout: Duration) -> Self {
        Module { timeout }
    }

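    /// Serves `service` to Norgopolis over this process's stdin and stdout.
    ///
    /// Returns an error if the gRPC server could not be set up or stops with an error.
    pub async fn start<S>(self, service: S) -> Result<(), anyhow::Error>
    where
        S: Service + Sync + Send + 'static,
    {
        let (keepalive_tx, mut keepalive_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
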
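        tokio::spawn(async move {
            // Check for activity once per timeout window instead of only once,
            // so that the module also shuts down when it goes idle later on.
            loop {
                sleep(self.timeout).await;

                // `now_or_never()` yields `None` when no keepalive message is queued,
                // i.e. the module was idle for the whole window.
                if keepalive_rx.recv().now_or_never().is_none() {
                    std::process::exit(0);
                }

                // Drain the remainder of the messages. Matching on `Some(Some(()))`
                // also stops on a closed channel instead of spinning forever.
                while let Some(Some(())) = keepalive_rx.recv().now_or_never() {}
            }
        });
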
        let (stdin, stdout) = (tokio::io::stdin(), tokio::io::stdout());
        let stdio_service = StdioService { stdin, stdout };

        // TODO: Do this in a better way
        // `once_stream` doesn't work :/
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<StdioService, anyhow::Error>>(1);
        tx.send(Ok(stdio_service)).await?;

        Ok(Server::builder()
            .add_service(InvokerServer::new(InvokerService::new(
                service,
                keepalive_tx,
            )))
            .serve_with_incoming(ReceiverStream::new(rx))
            .await?)
    }
}