pptr/
message.rs

//! Defines message types and communication primitives for the puppet system.
//!
//! This module provides the core components for message-based communication between puppets
//! and the service that manages them. It defines traits for messages and envelopes, as well
//! as structs for packets and postmen.
//!
//! The main types and traits in this module include:
//!
//! - [`Message`]: A marker trait for types that can be used as messages.
//! - [`Envelope`]: A trait for message envelopes that can be handled by puppets.
//! - [`Packet`]: A struct representing a message packet with an optional reply address.
//! - [`ServicePacket`]: A struct representing a packet sent to a service for processing.
//! - [`ServiceCommand`]: An enum listing the commands that can be sent to a service.
//! - [`Postman`]: A struct for sending messages to puppets.
//! - [`ServicePostman`]: A struct for sending commands to services.
16use std::{fmt, marker::PhantomData};
17
18use async_trait::async_trait;
19use tokio::sync::{mpsc, oneshot};
20
21use crate::{
22    errors::{PostmanError, PuppetError},
23    executor::Executor,
24    pid::Pid,
25    prelude::CriticalError,
26    puppet::{Context, Handler, Puppet, ResponseFor},
27};
28
/// Marker trait for values that can travel through the puppet messaging system.
///
/// Nothing has to be implemented by hand: the blanket implementation below covers every type
/// that is
///
/// - [`fmt::Debug`], so a message can be rendered for debugging and error reporting,
/// - [`Send`], so a message can be moved across thread boundaries, and
/// - `'static`, so a message holds no borrow with a shorter lifetime.
pub trait Message: fmt::Debug + Send + 'static {}

/// Blanket implementation: every `Debug + Send + 'static` type is a [`Message`].
impl<T: fmt::Debug + Send + 'static> Message for T {}
41
42/// An envelope trait is the visitor pattern for message handling.
43///
44/// This trait allows sending messages that implement the `Envelope` trait,
45/// enabling the sending of any type as a message.
46///
47/// The `handle_message` method takes a mutable reference to the puppet and the
48/// context on which the message should be executed.
49///
50/// The `reply_error` method is a convenience function for sending an error as a
51/// response.
52#[async_trait]
53pub trait Envelope<P>: Send
54where
55    P: Puppet,
56{
57    /// Handles the message using the provided puppet and context.
58    async fn handle_message(&mut self, puppet: &mut P, ctx: &mut Context<P>);
59    /// Sends an error as a response using the provided context.
60    async fn reply_error(&mut self, ctx: &Context<P>, err: PuppetError);
61}
62
63/// A type alias for a one-shot sender used to send a reply.
64///
65/// The `ReplySender` is a type alias for `oneshot::Sender` that sends a `Result` containing either
66/// the response of type `T` or a `PuppetError`.
67pub type ReplySender<T> = oneshot::Sender<Result<T, PuppetError>>;
68/// A type alias for a one-shot receiver used to receive a reply.
69///
70/// The `ReplyReceiver` is a type alias for `oneshot::Receiver` that receives a `Result` containing
71/// either the response of type `T` or a `PuppetError`.
72pub type ReplyReceiver<T> = oneshot::Receiver<Result<T, PuppetError>>;
73
74/// Represents a packet that wraps a message and specifies its type and reply address.
75///
76/// The `Packet` struct is used to encapsulate a message of type `E` along with an optional reply
77/// address. It is generic over two type parameters:
78/// - `P`: The handler type that can handle the message type `E`.
79/// - `E`: The message type that implements the `Message` trait.
80///
81/// The packet can be created with or without a reply address using the provided constructor methods.
82pub struct Packet<P, E>
83where
84    P: Handler<E>,
85    E: Message,
86{
87    message: Option<E>,
88    reply_address: Option<ReplySender<ResponseFor<P, E>>>,
89    _phantom: PhantomData<P>,
90}
91
92impl<P, E> Packet<P, E>
93where
94    P: Handler<E>,
95    E: Message,
96{
97    /// Creates a new `Packet` without a reply address.
98    ///
99    /// This method is a shorthand for creating a packet that does not expect a response.
100    ///
101    /// # Returns
102    ///
103    /// A new `Packet` instance containing the provided message and no reply address.
104    #[must_use]
105    pub fn without_reply(message: E) -> Self {
106        Self {
107            message: Some(message),
108            reply_address: None,
109            _phantom: PhantomData,
110        }
111    }
112
113    /// Creates a new `Packet` with a reply address.
114    ///
115    /// This method is a shorthand for creating a packet that expects a response.
116    ///
117    /// # Returns
118    ///
119    /// A new `Packet` instance containing the provided message and reply address.
120    #[must_use]
121    pub fn with_reply(
122        message: E,
123        reply_address: oneshot::Sender<Result<ResponseFor<P, E>, PuppetError>>,
124    ) -> Self {
125        Self {
126            message: Some(message),
127            reply_address: Some(reply_address),
128            _phantom: PhantomData,
129        }
130    }
131}
132
133#[async_trait]
134impl<P, E> Envelope<P> for Packet<P, E>
135where
136    P: Handler<E>,
137    E: Message + 'static,
138{
139    async fn handle_message(&mut self, puppet: &mut P, ctx: &mut Context<P>) {
140        if let Some(msg) = self.message.take() {
141            let reply_address = self.reply_address.take();
142            if let Err(err) =
143                <P as Handler<E>>::Executor::execute(puppet, ctx, msg, reply_address).await
144            {
145                self.reply_error(ctx, err).await;
146            }
147        } else {
148            let err = ctx.critical_error("Packet has no message");
149            self.reply_error(ctx, err).await;
150        }
151    }
152    async fn reply_error(&mut self, ctx: &Context<P>, err: PuppetError) {
153        if let Some(reply_address) = self.reply_address.take() {
154            if reply_address.send(Err(err)).is_err() {
155                let err =
156                    CriticalError::new(ctx.pid, "Failed to send response over the oneshot channel");
157                ctx.report_unrecoverable_failure(err);
158            }
159        }
160    }
161}
162
163/// Represents a packet of data sent to a service for processing.
164///
165/// A `ServicePacket` contains an `ServiceCommand` and an reply address.
166/// The reply address is used to send a response back to the sender of the packet.
167///
168/// The `cmd` field holds the command to be executed by the service.
169/// The `reply_address` field contains a `Sender` for sending the result of the command
170/// back to the caller, if a reply is expected.
171pub struct ServicePacket {
172    pub(crate) cmd: Option<ServiceCommand>,
173    pub(crate) reply_address: Option<oneshot::Sender<Result<(), PuppetError>>>,
174}
175
176impl ServicePacket {
177    #[must_use]
178    /// Creates a new `ServicePacket` with a reply address.
179    ///
180    /// This method is a shorthand for creating a service packet that expects a response.
181    ///
182    /// # Returns
183    ///
184    /// A new `ServicePacket` instance containing the provided command and reply address.
185    pub fn with_reply(
186        cmd: ServiceCommand,
187        reply_address: oneshot::Sender<Result<(), PuppetError>>,
188    ) -> Self {
189        Self {
190            cmd: Some(cmd),
191            reply_address: Some(reply_address),
192        }
193    }
194
195    /// Creates a new `ServicePacket` without a reply address.
196    ///
197    /// This method is a shorthand for creating a service packet that does not expect a response.
198    ///
199    /// # Returns
200    ///
201    /// A new `ServicePacket` instance containing the provided command and no reply address.
202    #[must_use]
203    pub fn without_reply(cmd: ServiceCommand) -> Self {
204        Self {
205            cmd: Some(cmd),
206            reply_address: None,
207        }
208    }
209
210    /// Handles the command contained in the `ServicePacket`.
211    ///
212    /// This method takes ownership of the command and reply address stored in the `ServicePacket`,
213    /// handles the command using the provided `puppet` and `ctx`, and sends the response back
214    /// through the reply address.
215    ///
216    /// # Errors
217    ///
218    /// Returns a `PuppetError` if:
219    /// - The `ServicePacket` has no command.
220    /// - The `ServicePacket` has no reply address.
221    /// - Sending the response over the oneshot channel fails.
222    ///
223    /// If the command handling results in a critical error, it is reported using `ctx.report_failure()`.
224    pub(crate) async fn handle_command<P>(
225        &mut self,
226        puppet: &mut P,
227        ctx: &mut Context<P>,
228    ) -> Result<(), PuppetError>
229    where
230        P: Puppet,
231    {
232        let cmd = self
233            .cmd
234            .take()
235            .ok_or_else(|| PuppetError::critical(ctx.pid, "ServicePacket has no command"))?;
236
237        let reply_address = self
238            .reply_address
239            .take()
240            .ok_or_else(|| PuppetError::critical(ctx.pid, "ServicePacket has no reply address"))?;
241
242        let response = ctx.handle_command(puppet, cmd).await;
243
244        if let Err(PuppetError::Critical(err)) = &response {
245            ctx.report_failure(puppet, err.clone()).await?;
246        }
247
248        reply_address.send(response).map_err(|_err| {
249            PuppetError::critical(ctx.pid, "Failed to send response over the oneshot channel")
250        })?;
251
252        Ok(())
253    }
254
255    /// Replies with an error to the sender of the `ServicePacket`.
256    ///
257    /// If the `ServicePacket` has a reply address, this method sends the provided error
258    /// over the oneshot channel. If the send operation fails, the error is silently ignored.
259    pub(crate) fn reply_error(&mut self, err: PuppetError) {
260        if let Some(reply_address) = self.reply_address.take() {
261            let _ = reply_address.send(Err(err));
262        }
263    }
264}
265
266/// Represents the stages of a restart operation.
267///
268/// - `Start`: Indicates the start of the restart process.
269/// - `Stop`: Indicates the stop phase of the restart process.
270#[derive(Debug, Clone, strum::Display, PartialEq, Eq)]
271pub enum RestartStage {
272    Start,
273    Stop,
274}
275
276/// Represents the commands that can be sent to a service.
277///
278/// - `Start`: Starts the puppet.
279/// - `Stop`: Stops the puppet.
280/// - `Restart`: Restarts the puppet. The `stage` field indicates the current stage of the restart process.
281/// - `ReportFailure`: Reports a failure in the service identified by `pid` with the given `error`.
282/// - `Fail`: Indicates a failure in the puppet.
283#[derive(Debug, Clone, strum::Display)]
284pub enum ServiceCommand {
285    Start,
286    Stop,
287    Restart { stage: Option<RestartStage> },
288    ReportFailure { pid: Pid, error: PuppetError },
289    Fail,
290}
291
292#[derive(Debug)]
293pub struct Postman<P>
294where
295    P: Puppet,
296{
297    tx: tokio::sync::mpsc::UnboundedSender<Box<dyn Envelope<P>>>,
298}
299
300impl<P> Clone for Postman<P>
301where
302    P: Puppet,
303{
304    fn clone(&self) -> Self {
305        Self {
306            tx: self.tx.clone(),
307        }
308    }
309}
310
311impl<P> Postman<P>
312where
313    P: Puppet,
314{
315    #[must_use]
316    pub fn new(tx: tokio::sync::mpsc::UnboundedSender<Box<dyn Envelope<P>>>) -> Self {
317        Self { tx }
318    }
319
320    pub(crate) fn send<E>(&self, message: E) -> Result<(), PostmanError>
321    where
322        P: Handler<E>,
323        E: Message + 'static,
324    {
325        let packet = Packet::<P, E>::without_reply(message);
326        self.tx.send(Box::new(packet)).map_err(|_e| {
327            PostmanError::SendError {
328                puppet: Pid::new::<P>(),
329            }
330        })?;
331        Ok(())
332    }
333
334    pub(crate) async fn send_and_await_response<E>(
335        &self,
336        message: E,
337        duration: Option<std::time::Duration>,
338    ) -> Result<ResponseFor<P, E>, PostmanError>
339    where
340        P: Handler<E>,
341        E: Message + 'static,
342    {
343        let (res_tx, res_rx) =
344            tokio::sync::oneshot::channel::<Result<ResponseFor<P, E>, PuppetError>>();
345
346        let packet = Packet::<P, E>::with_reply(message, res_tx);
347        self.tx.send(Box::new(packet)).map_err(|_e| {
348            PostmanError::SendError {
349                puppet: Pid::new::<P>(),
350            }
351        })?;
352
353        if let Some(duration) = duration {
354            (tokio::time::timeout(duration, res_rx).await).map_or_else(
355                |_| {
356                    Err(PostmanError::ResponseReceiveError {
357                        puppet: Pid::new::<P>(),
358                    })
359                },
360                |inner_res| {
361                    inner_res.map_or_else(
362                        |_| {
363                            Err(PostmanError::ResponseReceiveError {
364                                puppet: Pid::new::<P>(),
365                            })
366                        },
367                        |res| res.map_err(PostmanError::from),
368                    )
369                },
370            )
371        } else {
372            (res_rx.await).map_or_else(
373                |_| {
374                    Err(PostmanError::ResponseReceiveError {
375                        puppet: Pid::new::<P>(),
376                    })
377                },
378                |res| res.map_err(PostmanError::from),
379            )
380        }
381    }
382}
383
384#[derive(Debug, Clone)]
385pub struct ServicePostman {
386    tx: tokio::sync::mpsc::Sender<ServicePacket>,
387}
388
389impl ServicePostman {
390    #[must_use]
391    pub fn new(tx: tokio::sync::mpsc::Sender<ServicePacket>) -> Self {
392        Self { tx }
393    }
394
395    pub(crate) async fn send(
396        &self,
397        puppet: Pid,
398        command: ServiceCommand,
399    ) -> Result<(), PostmanError> {
400        let packet = ServicePacket::without_reply(command);
401        self.tx
402            .send(packet)
403            .await
404            .map_err(|_e| PostmanError::SendError { puppet })
405    }
406
407    pub(crate) async fn send_and_await_response(
408        &self,
409        puppet: Pid,
410        command: ServiceCommand,
411        duration: Option<std::time::Duration>,
412    ) -> Result<(), PostmanError> {
413        let (res_tx, res_rx) = tokio::sync::oneshot::channel::<Result<(), PuppetError>>();
414        let packet = ServicePacket::with_reply(command, res_tx);
415        self.tx
416            .send(packet)
417            .await
418            .map_err(|_e| PostmanError::SendError { puppet })?;
419
420        if let Some(duration) = duration {
421            (tokio::time::timeout(duration, res_rx).await).map_or(
422                Err(PostmanError::ResponseReceiveError { puppet }),
423                |inner_res| {
424                    inner_res.map_or(Err(PostmanError::ResponseReceiveError { puppet }), |res| {
425                        res.map_err(PostmanError::from)
426                    })
427                },
428            )
429        } else {
430            (res_rx.await).map_or(Err(PostmanError::ResponseReceiveError { puppet }), |res| {
431                res.map_err(PostmanError::from)
432            })
433        }
434    }
435}
436
437pub(crate) struct Mailbox<P>
438where
439    P: Puppet,
440{
441    rx: mpsc::UnboundedReceiver<Box<dyn Envelope<P>>>,
442}
443
444impl<P> fmt::Debug for Mailbox<P>
445where
446    P: Puppet,
447{
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        f.debug_struct("Mailbox").field("rx", &self.rx).finish()
450    }
451}
452
453impl<P> Mailbox<P>
454where
455    P: Puppet,
456{
457    pub fn new(rx: mpsc::UnboundedReceiver<Box<dyn Envelope<P>>>) -> Self {
458        Self { rx }
459    }
460    pub async fn recv(&mut self) -> Option<Box<dyn Envelope<P>>> {
461        self.rx.recv().await
462    }
463}
464
465#[derive(Debug)]
466pub(crate) struct ServiceMailbox {
467    rx: tokio::sync::mpsc::Receiver<ServicePacket>,
468}
469
470impl ServiceMailbox {
471    pub fn new(rx: tokio::sync::mpsc::Receiver<ServicePacket>) -> Self {
472        Self { rx }
473    }
474    pub async fn recv(&mut self) -> Option<ServicePacket> {
475        self.rx.recv().await
476    }
477}