hydra/gen_server.rs
1use std::future::Future;
2use std::time::Duration;
3
4use tokio::sync::oneshot;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use crate::CallError;
10use crate::Dest;
11use crate::Dests;
12use crate::ExitReason;
13use crate::From;
14use crate::GenServerOptions;
15use crate::Message;
16use crate::Pid;
17use crate::Process;
18use crate::Receivable;
19use crate::Reference;
20use crate::SystemMessage;
21
22/// Unique message type for a [GenServer] cast, call, and reply.
23#[derive(Debug, Serialize, Deserialize)]
24enum GenServerMessage<T: Send + 'static> {
25 #[serde(rename = "$gen_cast")]
26 Cast(T),
27 #[serde(rename = "$gen_call")]
28 Call(From, T),
29 #[serde(rename = "$gen_reply")]
30 CallReply(Reference, T),
31 #[serde(rename = "$gen_stop")]
32 Stop(ExitReason),
33}
34
35/// A trait for implementing the server of a client-server relation.
36///
37/// A [GenServer] is a process like any other hydra process and it can be used to keep state,
38/// execute code asynchronously and so on.
39///
40/// The advantage of using a generic server process (GenServer) implemented using this
41/// trait is that it will have a standard set of trait functions and include functionality
42/// for tracing and error reporting.
43///
44/// It will also fit into a supervision tree.
45///
46/// ## Example
47/// Let's start with a code example and then explore the available callbacks. Imagine we want to implement a service with a GenServer that works like a stack, allowing us to push and pop elements. We'll customize a generic GenServer with our own module by implementing three callbacks.
48///
49/// ```ignore
50/// #[derive(Debug, Serialize, Deserialize)]
51/// enum StackMessage {
52/// Pop,
53/// PopResult(String),
54/// Push(String),
55/// }
56///
57/// struct Stack {
58/// stack: Vec<String>,
59/// }
60///
61/// impl Stack {
62/// pub fn with_entries(entries: Vec<&'static str>) -> Self {
63/// Self {
64/// stack: Vec::from_iter(entries.into_iter().map(Into::into)),
65/// }
66/// }
67/// }
68///
69/// impl GenServer for Stack {
70/// type Message = StackMessage;
71///
72/// async fn init(&mut self) -> Result<(), ExitReason> {
73/// Ok(())
74/// }
75///
76/// async fn handle_call(&mut self, message: Self::Message, _from: From) -> Result<Option<Self::Message>, ExitReason> {
77/// match message {
78/// StackMessage::Pop => Ok(Some(StackMessage::PopResult(self.stack.remove(0)))),
79/// _ => unreachable!(),
80/// }
81/// }
82///
83/// async fn handle_cast(&mut self, message: Self::Message) -> Result<(), ExitReason> {
84/// match message {
85/// StackMessage::Push(value) => self.stack.insert(0, value),
86/// _ => unreachable!(),
87/// }
88/// Ok(())
89/// }
90/// }
91/// ```
92///
93/// We leave the process machinery of startup, message passing, and the message loop to the GenServer.
94/// We can now use the GenServer methods to interact with the service by creating a process and sending it messages:
95/// ```ignore
96/// // Start the server.
97/// let pid = Stack::with_entries(vec![String::from("hello"), String::from("world")])
98/// .start_link(GenServerOptions::new())
99/// .await
100/// .expect("Failed to start stack!");
101///
102/// // This is the client.
103/// Stack::call(pid, StackMessage::Pop, None)
104/// .await
105/// .expect("Stack call failed!");
106/// // => StackMessage::PopResult("hello")
107///
108/// Stack::cast(pid, StackMessage::Push(String::from("rust")))
109///
110/// Stack::call(pid, StackMessage::Pop, None)
111/// .await
112/// .expect("Stack call failed!");
113/// // => StackMessage::PopResult("rust")
114/// ```
115pub trait GenServer: Sized + Send + 'static {
116 /// The message type that this server will use.
117 type Message: Receivable;
118
119 /// Invoked when the server is started. `start_link` or `start` will block until it returns.
120 fn init(&mut self) -> impl Future<Output = Result<(), ExitReason>> + Send;
121
122 /// Starts a [GenServer] process without links.
123 fn start(
124 self,
125 options: GenServerOptions,
126 ) -> impl Future<Output = Result<Pid, ExitReason>> + Send {
127 async { start_gen_server(self, options, false).await }
128 }
129
130 /// Starts a [GenServer] process linked to the current process.
131 fn start_link(
132 self,
133 options: GenServerOptions,
134 ) -> impl Future<Output = Result<Pid, ExitReason>> + Send {
135 async { start_gen_server(self, options, true).await }
136 }
137
138 /// Synchronously stops the server with the given `reason`.
139 ///
140 /// The `terminate` callback of the given `server` will be invoked before exiting. This function returns an error if the process
141 /// exits with a reason other than the given `reason`.
142 ///
143 /// The default timeout is infinity.
144 fn stop<T: Into<Dest>>(
145 server: T,
146 reason: ExitReason,
147 timeout: Option<Duration>,
148 ) -> impl Future<Output = Result<(), ExitReason>> {
149 async move {
150 let server = server.into();
151 let monitor = Process::monitor(server.clone());
152
153 Process::send(
154 server,
155 GenServerMessage::<Self::Message>::Stop(reason.clone()),
156 );
157
158 let receiver = Process::receiver()
159 .for_message::<GenServerMessage<Self::Message>>()
160 .select(|message| matches!(message, Message::System(SystemMessage::ProcessDown(_, tag, _)) if *tag == monitor));
161
162 let result = match timeout {
163 Some(duration) => Process::timeout(duration, receiver).await,
164 None => Ok(receiver.await),
165 };
166
167 match result {
168 Ok(Message::System(SystemMessage::ProcessDown(_, _, exit_reason))) => {
169 if reason == exit_reason {
170 Ok(())
171 } else {
172 Err(exit_reason)
173 }
174 }
175 Err(_) => {
176 Process::demonitor(monitor);
177
178 Err(ExitReason::from("timeout"))
179 }
180 _ => unreachable!(),
181 }
182 }
183 }
184
185 /// Casts a request to the `servers` without waiting for a response.
186 ///
187 /// It is unknown whether the destination server successfully handled the request.
188 ///
189 /// See [Process::send] for performance trade-offs.
190 fn cast<T: Into<Dests>>(servers: T, message: Self::Message) {
191 Process::send(servers, GenServerMessage::Cast(message));
192 }
193
194 /// Casts a request to the `servers` after the given `duration` without waiting for a response.
195 ///
196 /// It is unknown whether the destination server successfully handled the request.
197 ///
198 /// See [Process::send] for performance trade-offs.
199 fn cast_after<T: Into<Dests>>(
200 servers: T,
201 message: Self::Message,
202 duration: Duration,
203 ) -> Reference {
204 Process::send_after(servers, GenServerMessage::Cast(message), duration)
205 }
206
207 /// Makes a synchronous call to the `server` and waits for it's reply.
208 ///
209 /// The client sends the given `message` to the server and waits until a reply
210 /// arrives or a timeout occurs. `handle_call` will be called on the server to handle the request.
211 ///
212 /// The default timeout is 5000ms.
213 fn call<T: Into<Dest>>(
214 server: T,
215 message: Self::Message,
216 timeout: Option<Duration>,
217 ) -> impl Future<Output = Result<Self::Message, CallError>> + Send {
218 let server = server.into();
219
220 async move {
221 let monitor = if server.is_local() {
222 Process::monitor(server.clone())
223 } else {
224 Process::monitor_alias(server.clone(), true)
225 };
226
227 let from = From::new(Process::current(), monitor, server.is_remote());
228
229 Process::send(server, GenServerMessage::Call(from, message));
230
231 let receiver = Process::receiver()
232 .for_message::<GenServerMessage<Self::Message>>()
233 .select(|message| {
234 match message {
235 Message::User(GenServerMessage::CallReply(tag, _)) => {
236 // Make sure the tag matches the monitor.
237 *tag == monitor
238 }
239 Message::System(SystemMessage::ProcessDown(_, tag, _)) => {
240 // Make sure the tag matches the monitor.
241 *tag == monitor
242 }
243 _ => false,
244 }
245 });
246
247 let result =
248 Process::timeout(timeout.unwrap_or(Duration::from_millis(5000)), receiver).await;
249
250 match result {
251 Ok(Message::User(GenServerMessage::CallReply(_, message))) => {
252 Process::demonitor(monitor);
253
254 Ok(message)
255 }
256 Ok(Message::System(SystemMessage::ProcessDown(_, _, reason))) => {
257 Err(CallError::ServerDown(reason))
258 }
259 Err(timeout) => {
260 Process::demonitor(monitor);
261
262 // Drop a stale reply that may already be in the process message inbox.
263 Process::receiver()
264 .for_message::<GenServerMessage<Self::Message>>()
265 .remove(|message| matches!(message, Message::User(GenServerMessage::CallReply(tag, _)) if *tag == monitor));
266
267 Err(CallError::Timeout(timeout))
268 }
269 _ => unreachable!(),
270 }
271 }
272 }
273
274 /// Replies to a client.
275 ///
276 /// This function can be used to explicitly send a reply to a client that called `call` when the
277 /// reply cannot be specified in the return value of `handle_call`.
278 ///
279 /// `client` must be the `from` argument accepted by `handle_call` callbacks.
280 ///
281 /// Note that `reply` can be called from any process, not just the [GenServer] that originally received the call
282 /// (as long as the GenServer communicated the `from` argument somehow).
283 fn reply(from: From, message: Self::Message) {
284 if from.is_alias() {
285 Process::send(from.tag(), GenServerMessage::CallReply(from.tag(), message));
286 } else {
287 Process::send(from.pid(), GenServerMessage::CallReply(from.tag(), message));
288 }
289 }
290
291 /// Invoked when the server is about to exit. It should do any cleanup required.
292 ///
293 /// `terminate` is useful for cleanup that requires access to the [GenServer]'s state. However, it is not
294 /// guaranteed that `terminate` is called when a [GenServer] exits. Therefore, important cleanup should be done
295 /// using process links and/or monitors. A monitoring process will receive the same `reason` that would be passed to `terminate`.
296 ///
297 /// `terminate` is called if:
298 /// - The [GenServer] traps exits (using [Process::flags]) and the parent process sends an exit signal.
299 /// - A callback (except `init`) returns stop with a given reason.
300 /// - The `stop` method is called on a [GenServer].
301 fn terminate(&mut self, reason: ExitReason) -> impl Future<Output = ()> + Send {
302 async move {
303 let _ = reason;
304 }
305 }
306
307 /// Invoked to handle asynchronous `cast` messages.
308 fn handle_cast(
309 &mut self,
310 message: Self::Message,
311 ) -> impl Future<Output = Result<(), ExitReason>> + Send {
312 async move {
313 let _ = message;
314
315 unimplemented!();
316 }
317 }
318
319 /// Invoked to handle all other messages.
320 fn handle_info(
321 &mut self,
322 info: Message<Self::Message>,
323 ) -> impl Future<Output = Result<(), ExitReason>> + Send {
324 async move {
325 let _ = info;
326
327 Ok(())
328 }
329 }
330
331 /// Invoked to handle synchronous `call` messages. `call` will block until a reply is received
332 /// (unless the call times out or nodes are disconnected).
333 ///
334 /// `from` is a struct containing the callers [Pid] and a [Reference] that uniquely identifies the call.
335 fn handle_call(
336 &mut self,
337 message: Self::Message,
338 from: From,
339 ) -> impl Future<Output = Result<Option<Self::Message>, ExitReason>> + Send {
340 async move {
341 let _ = message;
342 let _ = from;
343
344 unimplemented!();
345 }
346 }
347}
348
349/// Internal [GenServer] start routine.
350async fn start_gen_server<T: GenServer>(
351 gen_server: T,
352 options: GenServerOptions,
353 link: bool,
354) -> Result<Pid, ExitReason> {
355 let (tx, rx) = oneshot::channel::<Result<(), ExitReason>>();
356
357 let parent: Option<Pid> = link.then(Process::current);
358
359 let server = async move {
360 let mut gen_server = gen_server;
361 let mut options = options;
362
363 let parent = parent.unwrap_or(Process::current());
364
365 let registered = if let Some(name) = options.name.take() {
366 Process::register(Process::current(), name).is_ok()
367 } else {
368 true
369 };
370
371 if !registered {
372 tx.send(Err(ExitReason::from("already_started")))
373 .expect("Failed to notify parent process!");
374 return;
375 }
376
377 let timeout = if let Some(duration) = options.timeout.take() {
378 Process::timeout(duration, gen_server.init()).await
379 } else {
380 Ok(gen_server.init().await)
381 };
382
383 match timeout {
384 Ok(Ok(())) => {
385 tx.send(Ok(())).expect("Failed to notify parent process!");
386 }
387 Ok(Err(reason)) => {
388 tx.send(Err(reason.clone()))
389 .expect("Failed to notify parent process!");
390 return Process::exit(Process::current(), reason);
391 }
392 Err(_) => {
393 tx.send(Err(ExitReason::from("timeout")))
394 .expect("Failed to notify parent process!");
395 return Process::exit(Process::current(), ExitReason::from("timeout"));
396 }
397 }
398
399 loop {
400 let message: Message<GenServerMessage<T::Message>> = Process::receive().await;
401
402 match message {
403 Message::User(GenServerMessage::Cast(message)) => {
404 if let Err(reason) = gen_server.handle_cast(message).await {
405 gen_server.terminate(reason.clone()).await;
406
407 return Process::exit(Process::current(), reason);
408 }
409 }
410 Message::User(GenServerMessage::Call(from, message)) => {
411 match gen_server.handle_call(message, from).await {
412 Ok(Some(message)) => {
413 T::reply(from, message);
414 }
415 Ok(None) => {
416 // Server must reply using `GenServer::reply(from, message)`.
417 }
418 Err(reason) => {
419 gen_server.terminate(reason.clone()).await;
420
421 return Process::exit(Process::current(), reason);
422 }
423 }
424 }
425 Message::User(GenServerMessage::CallReply(_, message)) => {
426 if let Err(reason) = gen_server.handle_info(Message::User(message)).await {
427 gen_server.terminate(reason.clone()).await;
428
429 return Process::exit(Process::current(), reason);
430 }
431 }
432 Message::User(GenServerMessage::Stop(reason)) => {
433 gen_server.terminate(reason.clone()).await;
434
435 return Process::exit(Process::current(), reason);
436 }
437 Message::System(system) => match system {
438 SystemMessage::Exit(epid, reason) if epid == parent => {
439 gen_server.terminate(reason.clone()).await;
440
441 return Process::exit(Process::current(), reason);
442 }
443 _ => {
444 if let Err(reason) = gen_server.handle_info(Message::System(system)).await {
445 gen_server.terminate(reason.clone()).await;
446
447 return Process::exit(Process::current(), reason);
448 }
449 }
450 },
451 }
452 }
453 };
454
455 let pid = if link {
456 Process::spawn_link(server)
457 } else {
458 Process::spawn(server)
459 };
460
461 rx.await
462 .map_err(|_| ExitReason::from("unknown"))?
463 .map(|_| pid)
464}