#![feature(impl_trait_in_assoc_type)]
//! # genserver: generate a server
//!
//! This is a neat little crate for building async actor-based applications,
//! inspired by Elixir's GenServer, powered by Tokio.
//!
//! This crate is currently nightly-only, and requires unstable features
//! (`#![feature(type_alias_impl_trait, impl_trait_in_assoc_type)]`).
//!
//! ## Introduction
//!
//! Erlang's OTP has reached legendary status amongst computer people for its
//! rock solid reliability, super cool concurrency, failure handling, hot code
//! reloading, and web scale innovation. [This excellent documentary
//! film](https://www.youtube.com/watch?v=rRbY3TMUcgQ) provides a good overview of
//! some of the coolness of Erlang.
//!
//! This crate has nothing to do with Erlang, but it takes inspiration from
//! Elixir (Erlang's younger, even more hip successor) and its GenServer. Here
//! we provide a method and interface for creating very simple actors which can
//! call other actors and reply to messages.
//!
//! Due to some quirks of the current Rust async implementation, this crate has
//! to do a few janky things to make everything work nicely. However, the end
//! result is a nice little API for building legendary actor-based systems. For
//! example, Rust doesn't yet support async methods in traits, but we can
//! provide similar behaviour by enabling a few unstable features.
//!
//! Underneath the hood, Tokio is leveraged for its sweet concurrency
//! primitives. You can generate as many servers as you'd like, and each is
//! spawned into its own async context (like a thread, but not exactly).
//!
//! We can build some really cool stuff using this very simple abstraction. And
//! since it all runs within an async context, we can build super fast web scale
//! technology in no time. Plus we get all the benefits of Rust, especially the
//! bragging rights.
//!
//! For more usage examples (aside from what's in the docs), check out the tests
//! in [`tests/test.rs`](https://github.com/brndnmtthws/genserver/blob/main/tests/test.rs)
//! within this crate.
//!
//! ## Quickstart
//!
//! To get started, you'll need to do 3 things:
//!
//! * Define one or more servers, which respond to _calls_ and _casts_, and
//!   implement the [`GenServer`] trait.
//! * Define a registry with [`make_registry`], which starts and manages the
//!   servers you define.
//! * Enable the `type_alias_impl_trait` and `impl_trait_in_assoc_type`
//!   features at the crate level.
//!
//! When we make a _call_, the call will block until it returns a result from
//! the server. When we make a _cast_, the cast returns immediately after it's
//! dispatched to the server.
//!
//! We need to start our registry within an async context.
//!
//! Here is a minimal code example showing a server, `MyServer`, which
//! implements [`GenServer`]. We then create a registry and start it.
//!
//! ```
//! // these features must be enabled at the crate level
//! #![feature(type_alias_impl_trait, impl_trait_in_assoc_type)]
//!
//! use std::future::Future;
//!
//! use genserver::{make_registry, GenServer};
//!
//! struct MyServer {
//!     // Any state your server needs can go right
//!     // in here. We keep a registry around in case
//!     // we want to call any other servers from our
//!     // server.
//!     registry: MyRegistry,
//! }
//!
//! impl GenServer for MyServer {
//!     // Message type for this server.
//!     type Message = String;
//!     // The type of the registry defined by the `make_registry` attribute macro.
//!     type Registry = MyRegistry;
//!     // The response type for calls.
//!     type Response = String;
//!
//!     // The call response type, a future, which returns Self::Response.
//!     type CallResponse<'a> = impl Future<Output = Self::Response> + 'a;
//!     // The cast response type, also a future, which returns unit.
//!     type CastResponse<'a> = impl Future<Output = ()> + 'a;
//!
//!     fn new(registry: Self::Registry) -> Self {
//!         // When our server is started, the registry passes a copy
//!         // of itself here. We keep it around so we can call
//!         // other servers from this one.
//!         Self { registry }
//!     }
//!
//!     // Calls to handle_call will block until our `Response` is returned.
//!     // Because they return a future, we can return an async block here.
//!     fn handle_call(&mut self, message: Self::Message) -> Self::CallResponse<'_> {
//!         println!("handle_call received {}", message);
//!         std::future::ready("returned from handle_call".into())
//!     }
//!
//!     // Casts always return (), because they do not block callers and return
//!     // immediately.
//!     fn handle_cast(&mut self, message: Self::Message) -> Self::CastResponse<'_> {
//!         println!("handle_cast received {}", message);
//!         std::future::ready(())
//!     }
//! }
//!
//! #[make_registry{
//!     myserver: MyServer
//! }]
//! struct MyRegistry;
//!
//! tokio_test::block_on(async {
//!     let registry = MyRegistry::start().await;
//!
//!     let response = registry
//!         .call_myserver("calling myserver".into())
//!         .await
//!         .unwrap();
//!     // The call hands back whatever `handle_call` resolved to.
//!     assert_eq!(response, "returned from handle_call");
//!     registry
//!         .cast_myserver("casting to myserver".into())
//!         .await
//!         .unwrap();
//! });
//! ```
//!
//! Note that in the code above, `MyServer::handle_call` and
//! `MyServer::handle_cast` return futures. That means you can include an async
//! block within the function and return it. Refer to [`GenServer`] for more
//! details.
//!
//! ## Supplying initialization data to your server
//!
//! With Elixir's GenServer, you can initialize state with the `init()` method.
//! However, in Rust this is a bit tricky due to the type rules, and it makes
//! the implementation much more complicated.
//!
//! The [`GenServer`] trait lets you control the initialization of your server
//! with the `new()` method. However, there's no way to pass additional
//! parameters to `new()`. Instead, you have 2 options:
//!
//! 1. You can simply create a message with initial parameters, and send that
//!    message to your server immediately after launching. This is the best
//!    option, though it has some drawbacks such as needing to make most fields
//!    optional.
//! 2. You could add fields to your [`Registry`]. This is the least preferred
//!    option, as the registry should be immutable and easy to make many copies
//!    of.
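//!
//! As a rough illustration of the first option, here is a minimal sketch that
//! leaves out the async plumbing and the [`GenServer`] trait entirely. The
//! names `Message`, `Greeter`, and `handle` are hypothetical and only stand in
//! for your own message type, server struct, and `handle_call` body:
//!
//! ```rust
//! /// Hypothetical message type: one variant carries the initial parameters.
//! enum Message {
//!     Init { greeting: String },
//!     Greet(String),
//! }
//!
//! /// Hypothetical server state; `greeting` stays `None` until `Init` arrives.
//! struct Greeter {
//!     greeting: Option<String>,
//! }
//!
//! impl Greeter {
//!     /// Plays the role of `new()`: no extra parameters are available here.
//!     fn new() -> Self {
//!         Self { greeting: None }
//!     }
//!
//!     /// Roughly what a `handle_call` body might do with each message.
//!     fn handle(&mut self, message: Message) -> String {
//!         match message {
//!             Message::Init { greeting } => {
//!                 self.greeting = Some(greeting);
//!                 "initialized".to_string()
//!             }
//!             Message::Greet(name) => match &self.greeting {
//!                 Some(greeting) => format!("{greeting}, {name}!"),
//!                 None => "not initialized yet".to_string(),
//!             },
//!         }
//!     }
//! }
//! ```
//!
//! The `Option` wrapper is the drawback mentioned above: every handler has to
//! decide what to do when the `Init` message has not arrived yet.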
//!
//! ## Handling state within servers
//!
//! Each server instance you create is essentially an ordinary struct, but it's
//! created and owned by the registry. Moving state into your struct can be
//! accomplished by sending it messages with state (Pro Tip: use enums for
//! messages). Your server struct can contain any arbitrary state you want.
//!
//! For example, you can create a server like this:
//!
//! ```
//! use std::collections::HashMap;
//!
//! pub struct MyStatefulServer {
//!     map: HashMap<String, String>,
//! }
//! ```
//!
//! In your implementation of [`GenServer::new()`], you can initialize `map`
//! with an empty hashmap using `map: HashMap::new()`. You can also use
//! [`Option`] to wrap fields which require state initialization after your
//! server is started, and send a message to your server to initialize that
//! state.
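//!
//! To make the "use enums for messages" tip concrete, here is a small sketch
//! of how such a server might update its map. It is plain synchronous Rust
//! rather than a real [`GenServer`] implementation, and `MapMessage` and
//! `handle` are hypothetical names chosen for illustration:
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical message enum: each variant carries the data it needs.
//! pub enum MapMessage {
//!     Put(String, String),
//!     Get(String),
//! }
//!
//! pub struct MyStatefulServer {
//!     map: HashMap<String, String>,
//! }
//!
//! impl MyStatefulServer {
//!     pub fn new() -> Self {
//!         Self { map: HashMap::new() }
//!     }
//!
//!     /// Returns the previous value for `Put`, or the stored value for `Get`.
//!     pub fn handle(&mut self, message: MapMessage) -> Option<String> {
//!         match message {
//!             MapMessage::Put(key, value) => self.map.insert(key, value),
//!             MapMessage::Get(key) => self.map.get(&key).cloned(),
//!         }
//!     }
//! }
//! ```
//!
//! In a real server the same `match` would sit inside `handle_call` or
//! `handle_cast`, with the enum as the server's `Message` type.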
//!
//! ## Changing channel queue size
//!
//! We use bounded queues, which provide backpressure when queues are full. This
//! is probably not something you'll need to worry much about, but if (for some
//! reason) you want to change the channel queue length, you can do so by
//! overriding the [`GenServer::channel_queue_size()`] method for your server.
//! The default is 1000 messages.
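//!
//! If backpressure is unfamiliar, the sketch below shows the idea using the
//! standard library's bounded `sync_channel` as a stand-in for the Tokio
//! channel this crate uses. The helper `accepted_before_full` is hypothetical
//! and not part of this crate:
//!
//! ```rust
//! use std::sync::mpsc::{sync_channel, TrySendError};
//!
//! /// Tries to push `attempts` messages into a queue of size `capacity` that
//! /// nobody is draining, and returns how many were accepted.
//! fn accepted_before_full(capacity: usize, attempts: usize) -> usize {
//!     // Keep the receiver alive so the channel stays open.
//!     let (tx, _rx) = sync_channel::<usize>(capacity);
//!     let mut accepted = 0;
//!     for i in 0..attempts {
//!         match tx.try_send(i) {
//!             Ok(()) => accepted += 1,
//!             // The queue is full: a blocking `send` would wait here instead.
//!             Err(TrySendError::Full(_)) => break,
//!             Err(TrySendError::Disconnected(_)) => break,
//!         }
//!     }
//!     accepted
//! }
//! ```
//!
//! With a capacity of 3 and nobody receiving, only the first 3 sends are
//! accepted. A larger queue absorbs bigger bursts before senders have to wait.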
//!
//! ## Scaling to lots of things
//!
//! Each [`GenServer`] is spawned with `tokio::task::spawn()` into its own
//! main loop, but within each loop all tasks are executed within a local
//! thread. Thus, each individual server instance is thread-bound. In order to
//! increase parallelism, you need to split up your service into more
//! `GenServer` instances.
//!
//! Creating new servers is relatively cheap, and making calls between them is
//! also cheap. The more separate servers you introduce, the higher the level of
//! parallelism you can achieve. For a processing pipeline that needs to be
//! highly parallelized, simply create a new server for each step or stage of
//! your pipeline and have each stage hand off to the next.
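//!
//! The shape of such a pipeline can be sketched with OS threads and standard
//! library channels standing in for `GenServer` instances and their queues.
//! This is only an analogy under that assumption, and `stage` and
//! `run_pipeline` are hypothetical helpers, not part of this crate:
//!
//! ```rust
//! use std::sync::mpsc::{channel, Receiver};
//! use std::thread;
//!
//! /// Spawns one stage: applies `f` to each item from `input` and forwards it.
//! fn stage<F>(input: Receiver<u64>, f: F) -> Receiver<u64>
//! where
//!     F: Fn(u64) -> u64 + Send + 'static,
//! {
//!     let (tx, rx) = channel();
//!     thread::spawn(move || {
//!         for item in input {
//!             if tx.send(f(item)).is_err() {
//!                 break; // the downstream stage hung up
//!             }
//!         }
//!     });
//!     rx
//! }
//!
//! /// Runs a two-stage pipeline (double, then add one) over `items`.
//! fn run_pipeline(items: Vec<u64>) -> Vec<u64> {
//!     let (tx, rx) = channel();
//!     let doubled = stage(rx, |x| x * 2);
//!     let output = stage(doubled, |x| x + 1);
//!     for item in items {
//!         tx.send(item).unwrap();
//!     }
//!     drop(tx); // closing the input lets every stage finish
//!     output.into_iter().collect()
//! }
//! ```
//!
//! Each stage works on its own item while the next stage handles the previous
//! one, which is the same effect you get by giving each step its own server.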
pub mod joinset;

use std::future::Future;

/// Makes a registry.
///
/// This attribute applies to structs and converts an ordinary struct into a
/// registry. While you can pass any ordinary struct to this attribute, it's
/// recommended that you don't add any fields or methods to the registry struct.
///
/// You must specify the servers you want to register as an argument, in
/// `name: Type` pairs.
///
/// For example, if we use the following code:
///
/// ```compile_fail
/// #[make_registry {
///     first_server: FirstServer,
///     second_server: SecondServer,
/// }]
/// struct MyRegistry;
/// ```
///
/// This will generate a registry called `MyRegistry` which implements the
/// [`Registry`] trait. In the example above, we specified 2 servers, named
/// `first_server` and `second_server`, which are of type `FirstServer` and
/// `SecondServer` respectively.
///
/// For each server registered with this registry, the `call_{name}()`,
/// `call_{name}_with_timeout()`, and `cast_{name}()` methods will be generated,
/// which can be used to make calls and casts to each server. You can
/// (if you wish) define multiple separate instances of the same type, so long
/// as each one has a unique name, much like you would when defining struct
/// fields.
///
/// Using the example above, we'd generate functions with the following names in
/// our registry:
///
/// | Call fn | Call fn with timeout | Cast fn (non-blocking) |
/// |---|---|---|
/// |`call_first_server`|`call_first_server_with_timeout`|`cast_first_server`|
/// |`call_second_server`|`call_second_server_with_timeout`|`cast_second_server`|
///
/// This macro will additionally derive the [`Clone`] trait.
///
/// Note that this example will not compile because the [`GenServer`] trait is
/// not implemented for `FirstServer` and `SecondServer` in this snippet.
///
/// The generated code for the example above looks like this (the
/// `_with_timeout` variants are not shown):
///
/// ```compile_fail
/// struct MyRegistry {
///     first_server_tx: tokio::sync::mpsc::Sender<(
///         <FirstServer as genserver::GenServer>::Message,
///         Option<tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Response>>,
///     )>,
///     second_server_tx: tokio::sync::mpsc::Sender<(
///         <SecondServer as genserver::GenServer>::Message,
///         Option<tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Response>>,
///     )>,
/// }
/// #[automatically_derived]
/// impl ::core::clone::Clone for MyRegistry {
///     #[inline]
///     fn clone(&self) -> MyRegistry {
///         MyRegistry {
///             first_server_tx: ::core::clone::Clone::clone(&self.first_server_tx),
///             second_server_tx: ::core::clone::Clone::clone(&self.second_server_tx),
///         }
///     }
/// }
/// impl genserver::Registry for MyRegistry {}
/// impl MyRegistry {
///     pub async fn start() -> Self {
///         let (first_server_tx, mut first_server_rx) = tokio::sync::mpsc::channel::<(
///             <FirstServer as genserver::GenServer>::Message,
///             Option<
///                 tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Response>,
///             >,
///         )>(1_000);
///         let (second_server_tx, mut second_server_rx) = tokio::sync::mpsc::channel::<(
///             <SecondServer as genserver::GenServer>::Message,
///             Option<
///                 tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Response>,
///             >,
///         )>(1_000);
///         let mut registry = Self {
///             first_server_tx,
///             second_server_tx,
///         };
///         {
///             let local_registry = registry.clone();
///             tokio::spawn(async move {
///                 let mut handler = FirstServer::new(local_registry);
///                 while let Some((message, oneshot)) = first_server_rx.recv().await {
///                     if let Some(oneshot) = oneshot {
///                         let Response = handler.handle_call(message).await;
///                         oneshot.send(Response).ok();
///                     } else {
///                         handler.handle_cast(message).await;
///                     }
///                 }
///             });
///         }
///         {
///             let local_registry = registry.clone();
///             tokio::spawn(async move {
///                 let mut handler = SecondServer::new(local_registry);
///                 while let Some((message, oneshot)) = second_server_rx.recv().await {
///                     if let Some(oneshot) = oneshot {
///                         let Response = handler.handle_call(message).await;
///                         oneshot.send(Response).ok();
///                     } else {
///                         handler.handle_cast(message).await;
///                     }
///                 }
///             });
///         }
///         registry
///     }
///
///     pub async fn call_first_server(
///         &self,
///         message: <FirstServer as genserver::GenServer>::Message,
///     ) -> Result<
///         <FirstServer as genserver::GenServer>::Response,
///         genserver::Error<(
///             <FirstServer as genserver::GenServer>::Message,
///             Option<
///                 tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Message>,
///             >,
///         )>,
///     > {
///         let (oneshot_tx, oneshot_rx) =
///             tokio::sync::oneshot::channel::<<FirstServer as genserver::GenServer>::Response>();
///         self.first_server_tx
///             .send((message, Some(oneshot_tx)))
///             .await?;
///         let Response = oneshot_rx.await?;
///         Ok(Response)
///     }
///
///     pub async fn cast_first_server(
///         &self,
///         message: <FirstServer as genserver::GenServer>::Message,
///     ) -> Result<
///         (),
///         genserver::Error<(
///             <FirstServer as genserver::GenServer>::Message,
///             Option<
///                 tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Message>,
///             >,
///         )>,
///     > {
///         self.first_server_tx.send((message, None)).await?;
///         Ok(())
///     }
///
///     pub async fn call_second_server(
///         &self,
///         message: <SecondServer as genserver::GenServer>::Message,
///     ) -> Result<
///         <SecondServer as genserver::GenServer>::Response,
///         genserver::Error<(
///             <SecondServer as genserver::GenServer>::Message,
///             Option<
///                 tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Message>,
///             >,
///         )>,
///     > {
///         let (oneshot_tx, oneshot_rx) =
///             tokio::sync::oneshot::channel::<<SecondServer as genserver::GenServer>::Response>();
///         self.second_server_tx
///             .send((message, Some(oneshot_tx)))
///             .await?;
///         let Response = oneshot_rx.await?;
///         Ok(Response)
///     }
///
///     pub async fn cast_second_server(
///         &self,
///         message: <SecondServer as genserver::GenServer>::Message,
///     ) -> Result<
///         (),
///         genserver::Error<(
///             <SecondServer as genserver::GenServer>::Message,
///             Option<
///                 tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Message>,
///             >,
///         )>,
///     > {
///         self.second_server_tx.send((message, None)).await?;
///         Ok(())
///     }
/// }
/// ```
///
/// ## Adding your own state
///
/// If you'd like to add your own state to a registry, you can do so like you
/// normally would with any other struct, so long as the fields are named
/// (tuple structs aren't supported). Any types you add must also implement
/// [`Send`], [`Sync`], and [`Clone`].
///
/// When you specify fields in a struct, the generated `start()` method takes
/// all those fields by value for initialization. For example, this code:
///
/// ```
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
///
/// use genserver::make_registry;
///
/// #[make_registry{}]
/// struct MyRegistry {
///     counter: Arc<AtomicUsize>,
/// }
/// ```
///
/// will generate a registry with a `start()` method which has the following
/// signature:
///
/// ```compile_fail
/// pub async fn start(counter: Arc<AtomicUsize>) -> Self {
///     // ...
/// }
/// ```

pub use genserver_codegen::make_registry;

/// Error wrapper type.
///
/// `M` is the server's message type and `R` is its response type.
#[derive(Debug)]
pub enum Error<M, R> {
    /// The response channel was closed before a response was received.
    OneshotRecvError(tokio::sync::oneshot::error::RecvError),
    /// The message could not be sent because the server's channel is closed.
    MpscSendError(
        tokio::sync::mpsc::error::SendError<(M, Option<tokio::sync::oneshot::Sender<R>>)>,
    ),
    /// The call did not complete before its timeout elapsed.
    Timeout,
}

impl<M, R> From<tokio::sync::oneshot::error::RecvError> for Error<M, R> {
    fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
        Self::OneshotRecvError(err)
    }
}

impl<M, R> From<tokio::sync::mpsc::error::SendError<(M, Option<tokio::sync::oneshot::Sender<R>>)>>
    for Error<M, R>
{
    fn from(
        err: tokio::sync::mpsc::error::SendError<(M, Option<tokio::sync::oneshot::Sender<R>>)>,
    ) -> Self {
        Self::MpscSendError(err)
    }
}

/// The `GenServer` trait lets you generate a server by implementing the trait.
pub trait GenServer {
    /// Specifies the type of messages this server can receive.
    type Message;
    /// Specifies the response from calls.
    type Response;
    /// Specifies the name of the registry type created with [`make_registry`].
    type Registry;
    /// Specifies the call response type, which must implement
    /// [`std::future::Future`] and return `Self::Response`.
    type CallResponse<'a>: Future<Output = Self::Response>
    where
        Self: 'a;
    /// Specifies the cast response type, which must implement
    /// [`std::future::Future`] and return `()`.
    type CastResponse<'a>: Future<Output = ()>
    where
        Self: 'a;

    /// Creates a new server, and receives a copy of the current registry.
    ///
    /// You should never need to call this method yourself, as it's called for
    /// you by the registry.
    fn new(registry: Self::Registry) -> Self;
    /// This function will be called whenever this server receives a call.
    fn handle_call(&mut self, message: Self::Message) -> Self::CallResponse<'_>;
    /// This function will be called whenever this server receives a cast.
    fn handle_cast(&mut self, message: Self::Message) -> Self::CastResponse<'_>;

    /// Override this method to change the channel queue size for your
    /// server. Defaults to 1000 messages.
    fn channel_queue_size() -> usize {
        1_000
    }
}

/// Trait for registries created with [`make_registry`].
pub trait Registry {
    /// Shuts down a registry.
    fn shutdown(&mut self);
}
487    /// Shuts down a registry.
488    fn shutdown(&mut self);
489}