genserver/lib.rs
1#![feature(impl_trait_in_assoc_type)]
2//! # genserver: generate a server
3//!
4//! This is a neat little create for building async actor-based applications,
5//! inspired by Elixir's GenServer, powered by Tokio.
6//!
7//! This crate is currently nightly-only, and requires unstable features
8//! (`#![feature(type_alias_impl_trait, impl_trait_in_assoc_type)]`).
9//!
10//! ## Introduction
11//!
12//! Erlang's OTP has reached legend amongst computer people for its rock solid
13//! reliability, super cool concurrency, failure handling, hot code reloading,
14//! and web scale innovation. [This excellent documentary
15//! film](https://www.youtube.com/watch?v=rRbY3TMUcgQ) provides a good overview of
16//! some of the coolness of Erlang.
17//!
18//! This crate has nothing to do Erlang, but it takes inspiration from Elixir
19//! (Erlang's younger, ever more hip successor) and its GenServer. Here we
20//! provide a method and interface for creating very simple actors which can
21//! call other actors and reply to messages.
22//!
23//! Due to some quirks of the current Rust async implementation, this crate has
24//! to do a few janky things to make everything work nice, however the end
25//! result is a nice little API for building legendary actor-based systems. For
26//! example, Rust doesn't yet support async methods in traits, but we can
27//! provide similar behaviour by enabling a few unstable features.
28//!
29//! Underneath the hood, Tokio is leveraged for its sweet concurrency
30//! primitives. You can generate as many servers as you'd like, and each is
31//! spawned into its own async context (like a thread, but not exactly).
32//!
33//! We can build some really cool stuff using this very simple abstraction. And
34//! since it all runs within an async context, we can build super fast web scale
35//! technology in no time. Plus we get all the benefits of Rust, especially the
36//! bragging rights.
37//!
38//! For more usage examples (aside from what's in the docs), check out the tests
39//! in [`tests/test.rs`](https://github.com/brndnmtthws/genserver/blob/main/tests/test.rs)
40//! within this crate.
41//!
42//! ## Quickstart
43//!
44//! To get started, you'll need to do 3 things:
45//!
46//! * Define one or more servers, which respond to _calls_ and _casts_, and
47//! implement the [`GenServer`] trait.
48//! * Define a registry with [`make_registry`], which starts and managers the
49//! servers you define
50//! * Enable the `type_alias_impl_trait` features at the crate level.
51//!
52//! When we make a _call_, the call will block until it returns a result from
53//! the server. When we make a _cast_, the cast returns immediately after it's
54//! dispatched to the server.
55//!
56//! We need to start our registry within an async context. Refer
57//!
58//! Here is a minimal code example showing a server, `MyServer`, which
59//! implements [`GenServer`]. We then create a registry and start it.
60//!
61//! ```
62//! // this feature must be enabled at the crate level
63//! #![feature(type_alias_impl_trait, impl_trait_in_assoc_type)]
64//!
65//! use std::future::Future;
66//!
67//! use genserver::{make_registry, GenServer};
68//!
69//! struct MyServer {
70//! // Any state your server needs can go right
71//! // in here. We keep a registry around in case
72//! // we want to call any other servers from our
73//! // server.
74//! registry: MyRegistry,
75//! }
76//!
77//! impl GenServer for MyServer {
78//! // Message type for this server.
79//! type Message = String;
80//! // The type of the registry defined by the `make_registry` attribute macro.
81//! type Registry = MyRegistry;
82//! // The response type for calls.
83//! type Response = String;
84//!
85//! // The call response type, a future, which returns Self::Response.
86//! type CallResponse<'a> = impl Future<Output = Self::Response> + 'a;
87//! // The cast response type, also a future, which returns unit.
88//! type CastResponse<'a> = impl Future<Output = ()> + 'a;
89//!
90//! fn new(registry: Self::Registry) -> Self {
91//! // When are server is started, the registry passes a copy
92//! // of itself here. We keep it around so we can call
93//! // other servers from this one.
94//! Self { registry }
95//! }
96//!
97//! // Calls to handle_call will block until our `Response` is returned.
98//! // Because they return a future, we can return an async block here.
99//! fn handle_call(&mut self, message: Self::Message) -> Self::CallResponse<'_> {
100//! println!("handle_call received {}", message);
101//! std::future::ready("returned from handle_call".into())
102//! }
103//!
104//! // Casts always return (), because they do not block callers and return
105//! // immediately.
106//! fn handle_cast(&mut self, message: Self::Message) -> Self::CastResponse<'_> {
107//! println!("handle_cast received {}", message);
108//! std::future::ready(())
109//! }
110//! }
111//!
112//! #[make_registry{
113//! myserver: MyServer
114//! }]
115//! struct MyRegistry;
116//!
117//! tokio_test::block_on(async {
118//! let registry = MyRegistry::start().await;
119//!
120//! let response = registry
121//! .call_myserver("calling myserver".into())
122//! .await
123//! .unwrap();
124//! registry
125//! .cast_myserver("casting to myserver".into())
126//! .await
127//! .unwrap();
128//! });
129//! ```
130//!
131//! Note that in the code above, `MyServer::handle_call` and
132//! `MyServer::handle_cast` return futures. That means you can include an async
133//! block within the function and return it. Refer to [`GenServer`] for more
134//! details.
135//!
136//! ## Supplying initialization data to your server
137//!
138//! With Elixir's GenServer, you can initialize state with the `init()` method.
139//! However, in Rust this is a bit tricky due to the type rules, and it makes
140//! the implementation much more complicated.
141//!
142//! The [`GenServer`] trait lets you control the initialization of your server
143//! with the `new()` method. However, there's no way to pass additional
144//! parameters to `new()`. Instead, you have 2 options:
145//!
146//! 1. You can simply create a message with initial parameters, and send that
147//! message to your server immediately after launching. This is the best option,
148//! though it has some drawbacks such as needing to make most fields optional.
149//! 2. You could add fields to your [`Registry`]. This is the least preferred
150//! option, as the registry should be immutable and easy to make many copies of.
151//!
152//! ## Handling state within servers
153//!
154//! Each server instance you create is essentially an ordinary struct, but it's
155//! owned and created by the registry. Moving state into your struct can be
156//! accomplished by sending it messages with state (Pro Tip: use enums for
157//! messages). Your server struct can contain any arbitrary state you want.
158//!
159//! For example, you can create a server like this:
160//!
161//! ```
162//! use std::collections::HashMap;
163//!
164//! pub struct MyStatefulServer {
165//! map: HashMap<String, String>,
166//! }
167//! ```
168//!
169//! And in your `new()` implementation for [`GenServer::new()`], you can
170//! initialize `map` with an empty hashmap using `map: HashMap::new()`. You can
171//! also use [`Option`] to wrap fields which require state initialization after
172//! your server is started, and send a message to your server to initialize that
173//! state.
174//!
175//! ## Changing channel queue size
176//!
177//! We use bounded queues, which provide backpressure when queues are full. This
178//! is probably not something you'll need to worry much about, but if (for some
179//! reason) you want to change the channel queue length, it can be done so by
180//! implementing the [`GenServer::channel_queue_size()`] method for your server.
181//!
182//! ## Scaling to lots of things
183//!
184//! Each [`GenServer`] is spawned with `tokio::task::spawn()` into its own
185//! main loop, but within each loop all tasks are executed within a local
186//! thread. Thus, each individual server instance is thread-bound. In order to
187//! increase parallelism, you need to split up your service into more
188//! `GenServer` instances.
189//!
190//! Creating new servers is relatively cheap, and making calls between them is
191//! also cheap. The more separate servers you introduce, the higher a level of
192//! parallelism you can achieve. For a processing pipeline that needs to be
193//! highly parallelized, simply create a new server for each step or stage of
194//! your pipeline and have each stage hand off to the next.
195pub mod joinset;
196
197use std::future::Future;
198
199/// Makes a registry.
200///
201/// This attribute applies to structs and converts an ordinary struct into a
202/// registry. While you can pass any ordinary struct to this attribute, it's
203/// recommended that you don't add any fields or methods to the registry crate.
204///
205/// You must specify the servers you want to register as an argument, in `name:
206/// Type` pairs.
207///
208/// For example, if we use the following code:
209///
210/// ```compile_fail
211/// #[make_registry {
212/// first_server: FirstServer,
213/// second_server: SecondServer,
214/// }]
215/// struct MyRegistry;
216/// ```
217///
218/// This will generate a registry called `MyRegistry` which implements the
219/// [`Registry`] trait. In the example above, we specified 2 servers,
220/// named `first_server` and `second_server`, and are of the type `FirstServer`
221/// and `SecondServer` respectively.
222///
223/// For each server registered with this registry, the `call_{name}()`,
224/// `call_{name}_with_timeout()`, and `cast_{name}()` will be generated for this
225/// registry which can be used to make calls and casts to each server. You can
226/// (if you wish) define multiple separate instances of the same type, so long
227/// as each one has a unique name, much like you would when defining struct
228/// fields.
229///
230/// Using the example above, we'd generate functions with the following names in
231/// our registry:
232///
233/// | Call fn | Call fn with timeout | Cast fn (non-blocking) |
234/// |---|---|---|
235/// |`call_first_server`|`call_first_server_with_timeout`|`cast_first_server`|
236/// |`call_second_server`|`call_second_server_with_timeout`|`cast_second_server`|
237///
238/// This macro will additionally derive the [`Clone`] trait.
239///
240/// Note that this example will not compile because the [`GenServer`] trait is
241/// not implemented for this example case.
242///
243/// The full generated code for the example above is as follows:
244///
245/// ```compile_fail
246/// struct MyRegistry {
247/// first_server_tx: tokio::sync::mpsc::Sender<(
248/// <FirstServer as genserver::GenServer>::Message,
249/// Option<tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Response>>,
250/// )>,
251/// second_server_tx: tokio::sync::mpsc::Sender<(
252/// <SecondServer as genserver::GenServer>::Message,
253/// Option<tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Response>>,
254/// )>,
255/// }
256/// #[automatically_derived]
257/// impl ::core::clone::Clone for MyRegistry {
258/// #[inline]
259/// fn clone(&self) -> MyRegistry {
260/// MyRegistry {
261/// first_server_tx: ::core::clone::Clone::clone(&self.first_server_tx),
262/// second_server_tx: ::core::clone::Clone::clone(&self.second_server_tx),
263/// }
264/// }
265/// }
266/// impl genserver::Registry for MyRegistry {}
267/// impl MyRegistry {
268/// pub async fn start() -> Self {
269/// let (first_server_tx, mut first_server_rx) = tokio::sync::mpsc::channel::<(
270/// <FirstServer as genserver::GenServer>::Message,
271/// Option<
272/// tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Response>,
273/// >,
274/// )>(1_000);
275/// let (second_server_tx, mut second_server_rx) = tokio::sync::mpsc::channel::<(
276/// <SecondServer as genserver::GenServer>::Message,
277/// Option<
278/// tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Response>,
279/// >,
280/// )>(1_000);
281/// let mut registry = Self {
282/// first_server_tx,
283/// second_server_tx,
284/// };
285/// {
286/// let local_registry = registry.clone();
287/// tokio::spawn(async move {
288/// let mut handler = FirstServer::new(local_registry);
289/// while let Some((message, oneshot)) = first_server_rx.recv().await {
290/// if let Some(oneshot) = oneshot {
291/// let Response = handler.handle_call(message).await;
292/// oneshot.send(Response).ok();
293/// } else {
294/// handler.handle_cast(message).await;
295/// }
296/// }
297/// });
298/// }
299/// {
300/// let local_registry = registry.clone();
301/// tokio::spawn(async move {
302/// let mut handler = SecondServer::new(local_registry);
303/// while let Some((message, oneshot)) = second_server_rx.recv().await {
304/// if let Some(oneshot) = oneshot {
305/// let Response = handler.handle_call(message).await;
306/// oneshot.send(Response).ok();
307/// } else {
308/// handler.handle_cast(message).await;
309/// }
310/// }
311/// });
312/// }
313/// registry
314/// }
315///
316/// pub async fn call_first_server(
317/// &self,
318/// message: <FirstServer as genserver::GenServer>::Message,
319/// ) -> Result<
320/// <FirstServer as genserver::GenServer>::Response,
321/// genserver::Error<(
322/// <FirstServer as genserver::GenServer>::Message,
323/// Option<
324/// tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Message>,
325/// >,
326/// )>,
327/// > {
328/// let (oneshot_tx, oneshot_rx) =
329/// tokio::sync::oneshot::channel::<<FirstServer as genserver::GenServer>::Response>();
330/// self.first_server_tx
331/// .send((message, Some(oneshot_tx)))
332/// .await?;
333/// let Response = oneshot_rx.await?;
334/// Ok(Response)
335/// }
336///
337/// pub async fn cast_first_server(
338/// &self,
339/// message: <FirstServer as genserver::GenServer>::Message,
340/// ) -> Result<
341/// (),
342/// genserver::Error<(
343/// <FirstServer as genserver::GenServer>::Message,
344/// Option<
345/// tokio::sync::oneshot::Sender<<FirstServer as genserver::GenServer>::Message>,
346/// >,
347/// )>,
348/// > {
349/// self.first_server_tx.send((message, None)).await?;
350/// Ok(())
351/// }
352///
353/// pub async fn call_second_server(
354/// &self,
355/// message: <SecondServer as genserver::GenServer>::Message,
356/// ) -> Result<
357/// <SecondServer as genserver::GenServer>::Response,
358/// genserver::Error<(
359/// <SecondServer as genserver::GenServer>::Message,
360/// Option<
361/// tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Message>,
362/// >,
363/// )>,
364/// > {
365/// let (oneshot_tx, oneshot_rx) =
366/// tokio::sync::oneshot::channel::<<SecondServer as genserver::GenServer>::Response>();
367/// self.second_server_tx
368/// .send((message, Some(oneshot_tx)))
369/// .await?;
370/// let Response = oneshot_rx.await?;
371/// Ok(Response)
372/// }
373///
374/// pub async fn cast_second_server(
375/// &self,
376/// message: <SecondServer as genserver::GenServer>::Message,
377/// ) -> Result<
378/// (),
379/// genserver::Error<(
380/// <SecondServer as genserver::GenServer>::Message,
381/// Option<
382/// tokio::sync::oneshot::Sender<<SecondServer as genserver::GenServer>::Message>,
383/// >,
384/// )>,
385/// > {
386/// self.second_server_tx.send((message, None)).await?;
387/// Ok(())
388/// }
389/// }
390/// ```
391///
392/// ## Adding your own state
393///
394/// If you'd like to add your own state to a registry, you can do so like you
395/// normally would with any other struct, so long as the fields are named (you
396/// can't use an unnamed field struct). Any types you add must also implement
397/// [`Send`], [`Sync`], and [`Clone`].
398///
399/// When you specify fields in a struct, it will create a `new()` method which
400/// takes all those fields by value for initialization. For example, this code:
401///
402/// ```
403/// use std::sync::atomic::{AtomicUsize, Ordering};
404/// use std::sync::Arc;
405///
406/// use genserver::make_registry;
407///
408/// #[make_registry{}]
409/// struct MyRegistry {
410/// counter: Arc<AtomicUsize>,
411/// }
412/// ```
413///
414/// Will generate a registry with a `new()` which has the following signature:
415/// ```compile_fail
416/// pub async fn start(counter: Arc<AtomicUsize>) -> Self {
417/// // ...
418/// }
419/// ```
420
421pub use genserver_codegen::make_registry;
422
423/// Error wrapper type.
424#[derive(Debug)]
425pub enum Error<M, R> {
426 OneshotRecvError(tokio::sync::oneshot::error::RecvError),
427 MpscSendError(
428 tokio::sync::mpsc::error::SendError<(M, Option<tokio::sync::oneshot::Sender<R>>)>,
429 ),
430 Timeout,
431}
432
433impl<M, R> From<tokio::sync::oneshot::error::RecvError> for Error<M, R> {
434 fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
435 Self::OneshotRecvError(err)
436 }
437}
438
439impl<M, R> From<tokio::sync::mpsc::error::SendError<(M, Option<tokio::sync::oneshot::Sender<R>>)>>
440 for Error<M, R>
441{
442 fn from(
443 err: tokio::sync::mpsc::error::SendError<(M, Option<tokio::sync::oneshot::Sender<R>>)>,
444 ) -> Self {
445 Self::MpscSendError(err)
446 }
447}
448
449/// The `GenServer` trait lets you generate a server, by implementing the trait.
450pub trait GenServer {
451 /// Specifies the type of messages this server can receive.
452 type Message;
453 /// Specifies the response from calls.
454 type Response;
455 /// Specifies the name of the registry type created with [`make_registry`].
456 type Registry;
457 /// Specifies the call response type, which must implement
458 /// [std::future::Future].
459 type CallResponse<'a>: Future<Output = Self::Response>
460 where
461 Self: 'a;
462 /// Specifies the call response type, which must implement
463 /// [std::future::Future] and return `()`.
464 type CastResponse<'a>: Future<Output = ()>
465 where
466 Self: 'a;
467
468 /// Creates a new server, and receives a copy of the current registry.
469 ///
470 /// You should never need to call this method yourself, as it's called for
471 /// you by the registry.
472 fn new(registry: Self::Registry) -> Self;
473 /// This function will be called whenever this server receives a call.
474 fn handle_call(&mut self, message: Self::Message) -> Self::CallResponse<'_>;
475 /// This function will be called whenever this server receives a cast.
476 fn handle_cast(&mut self, message: Self::Message) -> Self::CastResponse<'_>;
477
478 /// Reimplement this method to change the channel queue size for your
479 /// server. Defaults to 1000 messages.
480 fn channel_queue_size() -> usize {
481 1_000
482 }
483}
484
485/// Trait for registries created with [`make_registry`].
486pub trait Registry {
487 /// Shuts down a registry.
488 fn shutdown(&mut self);
489}