Skip to main content

hydra/
gen_server.rs

1use std::future::Future;
2use std::time::Duration;
3
4use tokio::sync::oneshot;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use crate::CallError;
10use crate::Dest;
11use crate::Dests;
12use crate::ExitReason;
13use crate::From;
14use crate::GenServerOptions;
15use crate::Message;
16use crate::Pid;
17use crate::Process;
18use crate::Receivable;
19use crate::Reference;
20use crate::SystemMessage;
21
22/// Unique message type for a [GenServer] cast, call, and reply.
23#[derive(Debug, Serialize, Deserialize)]
24enum GenServerMessage<T: Send + 'static> {
25    #[serde(rename = "$gen_cast")]
26    Cast(T),
27    #[serde(rename = "$gen_call")]
28    Call(From, T),
29    #[serde(rename = "$gen_reply")]
30    CallReply(Reference, T),
31    #[serde(rename = "$gen_stop")]
32    Stop(ExitReason),
33}
34
35/// A trait for implementing the server of a client-server relation.
36///
37/// A [GenServer] is a process like any other hydra process and it can be used to keep state,
38/// execute code asynchronously and so on.
39///
40/// The advantage of using a generic server process (GenServer) implemented using this
41/// trait is that it will have a standard set of trait functions and include functionality
42/// for tracing and error reporting.
43///
44/// It will also fit into a supervision tree.
45///
46/// ## Example
47/// Let's start with a code example and then explore the available callbacks. Imagine we want to implement a service with a GenServer that works like a stack, allowing us to push and pop elements. We'll customize a generic GenServer with our own module by implementing three callbacks.
48///
49/// ```ignore
50/// #[derive(Debug, Serialize, Deserialize)]
51/// enum StackMessage {
52///     Pop,
53///     PopResult(String),
54///     Push(String),
55/// }
56///
57/// struct Stack {
58///     stack: Vec<String>,
59/// }
60///
61/// impl Stack {
62///     pub fn with_entries(entries: Vec<&'static str>) -> Self {
63///         Self {
64///             stack: Vec::from_iter(entries.into_iter().map(Into::into)),
65///         }
66///     }
67/// }
68///
69/// impl GenServer for Stack {
70///     type Message = StackMessage;
71///
72///     async fn init(&mut self) -> Result<(), ExitReason> {
73///         Ok(())
74///     }
75///
76///     async fn handle_call(&mut self, message: Self::Message, _from: From) -> Result<Option<Self::Message>, ExitReason> {
77///         match message {
78///             StackMessage::Pop => Ok(Some(StackMessage::PopResult(self.stack.remove(0)))),
79///             _ => unreachable!(),
80///         }
81///     }
82///
83///     async fn handle_cast(&mut self, message: Self::Message) -> Result<(), ExitReason> {
84///         match message {
85///             StackMessage::Push(value) => self.stack.insert(0, value),
86///             _ => unreachable!(),
87///         }
88///         Ok(())
89///     }
90/// }
91/// ```
92///
93/// We leave the process machinery of startup, message passing, and the message loop to the GenServer.
94/// We can now use the GenServer methods to interact with the service by creating a process and sending it messages:
95/// ```ignore
96/// // Start the server.
97/// let pid = Stack::with_entries(vec![String::from("hello"), String::from("world")])
98///             .start_link(GenServerOptions::new())
99///             .await
100///             .expect("Failed to start stack!");
101///
102/// // This is the client.
103/// Stack::call(pid, StackMessage::Pop, None)
104///         .await
105///         .expect("Stack call failed!");
106/// // => StackMessage::PopResult("hello")
107///
108/// Stack::cast(pid, StackMessage::Push(String::from("rust")))
109///
110/// Stack::call(pid, StackMessage::Pop, None)
111///         .await
112///         .expect("Stack call failed!");
113/// // => StackMessage::PopResult("rust")
114/// ```
115pub trait GenServer: Sized + Send + 'static {
116    /// The message type that this server will use.
117    type Message: Receivable;
118
119    /// Invoked when the server is started. `start_link` or `start` will block until it returns.
120    fn init(&mut self) -> impl Future<Output = Result<(), ExitReason>> + Send;
121
122    /// Starts a [GenServer] process without links.
123    fn start(
124        self,
125        options: GenServerOptions,
126    ) -> impl Future<Output = Result<Pid, ExitReason>> + Send {
127        async { start_gen_server(self, options, false).await }
128    }
129
130    /// Starts a [GenServer] process linked to the current process.
131    fn start_link(
132        self,
133        options: GenServerOptions,
134    ) -> impl Future<Output = Result<Pid, ExitReason>> + Send {
135        async { start_gen_server(self, options, true).await }
136    }
137
138    /// Synchronously stops the server with the given `reason`.
139    ///
140    /// The `terminate` callback of the given `server` will be invoked before exiting. This function returns an error if the process
141    /// exits with a reason other than the given `reason`.
142    ///
143    /// The default timeout is infinity.
144    fn stop<T: Into<Dest>>(
145        server: T,
146        reason: ExitReason,
147        timeout: Option<Duration>,
148    ) -> impl Future<Output = Result<(), ExitReason>> {
149        async move {
150            let server = server.into();
151            let monitor = Process::monitor(server.clone());
152
153            Process::send(
154                server,
155                GenServerMessage::<Self::Message>::Stop(reason.clone()),
156            );
157
158            let receiver = Process::receiver()
159                .for_message::<GenServerMessage<Self::Message>>()
160                .select(|message| matches!(message, Message::System(SystemMessage::ProcessDown(_, tag, _)) if *tag == monitor));
161
162            let result = match timeout {
163                Some(duration) => Process::timeout(duration, receiver).await,
164                None => Ok(receiver.await),
165            };
166
167            match result {
168                Ok(Message::System(SystemMessage::ProcessDown(_, _, exit_reason))) => {
169                    if reason == exit_reason {
170                        Ok(())
171                    } else {
172                        Err(exit_reason)
173                    }
174                }
175                Err(_) => {
176                    Process::demonitor(monitor);
177
178                    Err(ExitReason::from("timeout"))
179                }
180                _ => unreachable!(),
181            }
182        }
183    }
184
185    /// Casts a request to the `servers` without waiting for a response.
186    ///
187    /// It is unknown whether the destination server successfully handled the request.
188    ///
189    /// See [Process::send] for performance trade-offs.
190    fn cast<T: Into<Dests>>(servers: T, message: Self::Message) {
191        Process::send(servers, GenServerMessage::Cast(message));
192    }
193
194    /// Casts a request to the `servers` after the given `duration` without waiting for a response.
195    ///
196    /// It is unknown whether the destination server successfully handled the request.
197    ///
198    /// See [Process::send] for performance trade-offs.
199    fn cast_after<T: Into<Dests>>(
200        servers: T,
201        message: Self::Message,
202        duration: Duration,
203    ) -> Reference {
204        Process::send_after(servers, GenServerMessage::Cast(message), duration)
205    }
206
207    /// Makes a synchronous call to the `server` and waits for it's reply.
208    ///
209    /// The client sends the given `message` to the server and waits until a reply
210    /// arrives or a timeout occurs. `handle_call` will be called on the server to handle the request.
211    ///
212    /// The default timeout is 5000ms.
213    fn call<T: Into<Dest>>(
214        server: T,
215        message: Self::Message,
216        timeout: Option<Duration>,
217    ) -> impl Future<Output = Result<Self::Message, CallError>> + Send {
218        let server = server.into();
219
220        async move {
221            let monitor = if server.is_local() {
222                Process::monitor(server.clone())
223            } else {
224                Process::monitor_alias(server.clone(), true)
225            };
226
227            let from = From::new(Process::current(), monitor, server.is_remote());
228
229            Process::send(server, GenServerMessage::Call(from, message));
230
231            let receiver = Process::receiver()
232                .for_message::<GenServerMessage<Self::Message>>()
233                .select(|message| {
234                    match message {
235                        Message::User(GenServerMessage::CallReply(tag, _)) => {
236                            // Make sure the tag matches the monitor.
237                            *tag == monitor
238                        }
239                        Message::System(SystemMessage::ProcessDown(_, tag, _)) => {
240                            // Make sure the tag matches the monitor.
241                            *tag == monitor
242                        }
243                        _ => false,
244                    }
245                });
246
247            let result =
248                Process::timeout(timeout.unwrap_or(Duration::from_millis(5000)), receiver).await;
249
250            match result {
251                Ok(Message::User(GenServerMessage::CallReply(_, message))) => {
252                    Process::demonitor(monitor);
253
254                    Ok(message)
255                }
256                Ok(Message::System(SystemMessage::ProcessDown(_, _, reason))) => {
257                    Err(CallError::ServerDown(reason))
258                }
259                Err(timeout) => {
260                    Process::demonitor(monitor);
261
262                    // Drop a stale reply that may already be in the process message inbox.
263                    Process::receiver()
264                        .for_message::<GenServerMessage<Self::Message>>()
265                        .remove(|message| matches!(message, Message::User(GenServerMessage::CallReply(tag, _)) if *tag == monitor));
266
267                    Err(CallError::Timeout(timeout))
268                }
269                _ => unreachable!(),
270            }
271        }
272    }
273
274    /// Replies to a client.
275    ///
276    /// This function can be used to explicitly send a reply to a client that called `call` when the
277    /// reply cannot be specified in the return value of `handle_call`.
278    ///
279    /// `client` must be the `from` argument accepted by `handle_call` callbacks.
280    ///
281    /// Note that `reply` can be called from any process, not just the [GenServer] that originally received the call
282    /// (as long as the GenServer communicated the `from` argument somehow).
283    fn reply(from: From, message: Self::Message) {
284        if from.is_alias() {
285            Process::send(from.tag(), GenServerMessage::CallReply(from.tag(), message));
286        } else {
287            Process::send(from.pid(), GenServerMessage::CallReply(from.tag(), message));
288        }
289    }
290
291    /// Invoked when the server is about to exit. It should do any cleanup required.
292    ///
293    /// `terminate` is useful for cleanup that requires access to the [GenServer]'s state. However, it is not
294    /// guaranteed that `terminate` is called when a [GenServer] exits. Therefore, important cleanup should be done
295    /// using process links and/or monitors. A monitoring process will receive the same `reason` that would be passed to `terminate`.
296    ///
297    /// `terminate` is called if:
298    /// - The [GenServer] traps exits (using [Process::flags]) and the parent process sends an exit signal.
299    /// - A callback (except `init`) returns stop with a given reason.
300    /// - The `stop` method is called on a [GenServer].
301    fn terminate(&mut self, reason: ExitReason) -> impl Future<Output = ()> + Send {
302        async move {
303            let _ = reason;
304        }
305    }
306
307    /// Invoked to handle asynchronous `cast` messages.
308    fn handle_cast(
309        &mut self,
310        message: Self::Message,
311    ) -> impl Future<Output = Result<(), ExitReason>> + Send {
312        async move {
313            let _ = message;
314
315            unimplemented!();
316        }
317    }
318
319    /// Invoked to handle all other messages.
320    fn handle_info(
321        &mut self,
322        info: Message<Self::Message>,
323    ) -> impl Future<Output = Result<(), ExitReason>> + Send {
324        async move {
325            let _ = info;
326
327            Ok(())
328        }
329    }
330
331    /// Invoked to handle synchronous `call` messages. `call` will block until a reply is received
332    /// (unless the call times out or nodes are disconnected).
333    ///
334    /// `from` is a struct containing the callers [Pid] and a [Reference] that uniquely identifies the call.
335    fn handle_call(
336        &mut self,
337        message: Self::Message,
338        from: From,
339    ) -> impl Future<Output = Result<Option<Self::Message>, ExitReason>> + Send {
340        async move {
341            let _ = message;
342            let _ = from;
343
344            unimplemented!();
345        }
346    }
347}
348
349/// Internal [GenServer] start routine.
350async fn start_gen_server<T: GenServer>(
351    gen_server: T,
352    options: GenServerOptions,
353    link: bool,
354) -> Result<Pid, ExitReason> {
355    let (tx, rx) = oneshot::channel::<Result<(), ExitReason>>();
356
357    let parent: Option<Pid> = link.then(Process::current);
358
359    let server = async move {
360        let mut gen_server = gen_server;
361        let mut options = options;
362
363        let parent = parent.unwrap_or(Process::current());
364
365        let registered = if let Some(name) = options.name.take() {
366            Process::register(Process::current(), name).is_ok()
367        } else {
368            true
369        };
370
371        if !registered {
372            tx.send(Err(ExitReason::from("already_started")))
373                .expect("Failed to notify parent process!");
374            return;
375        }
376
377        let timeout = if let Some(duration) = options.timeout.take() {
378            Process::timeout(duration, gen_server.init()).await
379        } else {
380            Ok(gen_server.init().await)
381        };
382
383        match timeout {
384            Ok(Ok(())) => {
385                tx.send(Ok(())).expect("Failed to notify parent process!");
386            }
387            Ok(Err(reason)) => {
388                tx.send(Err(reason.clone()))
389                    .expect("Failed to notify parent process!");
390                return Process::exit(Process::current(), reason);
391            }
392            Err(_) => {
393                tx.send(Err(ExitReason::from("timeout")))
394                    .expect("Failed to notify parent process!");
395                return Process::exit(Process::current(), ExitReason::from("timeout"));
396            }
397        }
398
399        loop {
400            let message: Message<GenServerMessage<T::Message>> = Process::receive().await;
401
402            match message {
403                Message::User(GenServerMessage::Cast(message)) => {
404                    if let Err(reason) = gen_server.handle_cast(message).await {
405                        gen_server.terminate(reason.clone()).await;
406
407                        return Process::exit(Process::current(), reason);
408                    }
409                }
410                Message::User(GenServerMessage::Call(from, message)) => {
411                    match gen_server.handle_call(message, from).await {
412                        Ok(Some(message)) => {
413                            T::reply(from, message);
414                        }
415                        Ok(None) => {
416                            // Server must reply using `GenServer::reply(from, message)`.
417                        }
418                        Err(reason) => {
419                            gen_server.terminate(reason.clone()).await;
420
421                            return Process::exit(Process::current(), reason);
422                        }
423                    }
424                }
425                Message::User(GenServerMessage::CallReply(_, message)) => {
426                    if let Err(reason) = gen_server.handle_info(Message::User(message)).await {
427                        gen_server.terminate(reason.clone()).await;
428
429                        return Process::exit(Process::current(), reason);
430                    }
431                }
432                Message::User(GenServerMessage::Stop(reason)) => {
433                    gen_server.terminate(reason.clone()).await;
434
435                    return Process::exit(Process::current(), reason);
436                }
437                Message::System(system) => match system {
438                    SystemMessage::Exit(epid, reason) if epid == parent => {
439                        gen_server.terminate(reason.clone()).await;
440
441                        return Process::exit(Process::current(), reason);
442                    }
443                    _ => {
444                        if let Err(reason) = gen_server.handle_info(Message::System(system)).await {
445                            gen_server.terminate(reason.clone()).await;
446
447                            return Process::exit(Process::current(), reason);
448                        }
449                    }
450                },
451            }
452        }
453    };
454
455    let pid = if link {
456        Process::spawn_link(server)
457    } else {
458        Process::spawn(server)
459    };
460
461    rx.await
462        .map_err(|_| ExitReason::from("unknown"))?
463        .map(|_| pid)
464}