1use std::{fmt, marker::PhantomData};
17
18use async_trait::async_trait;
19use tokio::sync::{mpsc, oneshot};
20
21use crate::{
22 errors::{PostmanError, PuppetError},
23 executor::Executor,
24 pid::Pid,
25 prelude::CriticalError,
26 puppet::{Context, Handler, Puppet, ResponseFor},
27};
28
29pub trait Message: fmt::Debug + Send + 'static {}
40impl<T> Message for T where T: fmt::Debug + Send + 'static {}
41
42#[async_trait]
53pub trait Envelope<P>: Send
54where
55 P: Puppet,
56{
57 async fn handle_message(&mut self, puppet: &mut P, ctx: &mut Context<P>);
59 async fn reply_error(&mut self, ctx: &Context<P>, err: PuppetError);
61}
62
63pub type ReplySender<T> = oneshot::Sender<Result<T, PuppetError>>;
68pub type ReplyReceiver<T> = oneshot::Receiver<Result<T, PuppetError>>;
73
74pub struct Packet<P, E>
83where
84 P: Handler<E>,
85 E: Message,
86{
87 message: Option<E>,
88 reply_address: Option<ReplySender<ResponseFor<P, E>>>,
89 _phantom: PhantomData<P>,
90}
91
92impl<P, E> Packet<P, E>
93where
94 P: Handler<E>,
95 E: Message,
96{
97 #[must_use]
105 pub fn without_reply(message: E) -> Self {
106 Self {
107 message: Some(message),
108 reply_address: None,
109 _phantom: PhantomData,
110 }
111 }
112
113 #[must_use]
121 pub fn with_reply(
122 message: E,
123 reply_address: oneshot::Sender<Result<ResponseFor<P, E>, PuppetError>>,
124 ) -> Self {
125 Self {
126 message: Some(message),
127 reply_address: Some(reply_address),
128 _phantom: PhantomData,
129 }
130 }
131}
132
133#[async_trait]
134impl<P, E> Envelope<P> for Packet<P, E>
135where
136 P: Handler<E>,
137 E: Message + 'static,
138{
139 async fn handle_message(&mut self, puppet: &mut P, ctx: &mut Context<P>) {
140 if let Some(msg) = self.message.take() {
141 let reply_address = self.reply_address.take();
142 if let Err(err) =
143 <P as Handler<E>>::Executor::execute(puppet, ctx, msg, reply_address).await
144 {
145 self.reply_error(ctx, err).await;
146 }
147 } else {
148 let err = ctx.critical_error("Packet has no message");
149 self.reply_error(ctx, err).await;
150 }
151 }
152 async fn reply_error(&mut self, ctx: &Context<P>, err: PuppetError) {
153 if let Some(reply_address) = self.reply_address.take() {
154 if reply_address.send(Err(err)).is_err() {
155 let err =
156 CriticalError::new(ctx.pid, "Failed to send response over the oneshot channel");
157 ctx.report_unrecoverable_failure(err);
158 }
159 }
160 }
161}
162
163pub struct ServicePacket {
172 pub(crate) cmd: Option<ServiceCommand>,
173 pub(crate) reply_address: Option<oneshot::Sender<Result<(), PuppetError>>>,
174}
175
176impl ServicePacket {
177 #[must_use]
178 pub fn with_reply(
186 cmd: ServiceCommand,
187 reply_address: oneshot::Sender<Result<(), PuppetError>>,
188 ) -> Self {
189 Self {
190 cmd: Some(cmd),
191 reply_address: Some(reply_address),
192 }
193 }
194
195 #[must_use]
203 pub fn without_reply(cmd: ServiceCommand) -> Self {
204 Self {
205 cmd: Some(cmd),
206 reply_address: None,
207 }
208 }
209
210 pub(crate) async fn handle_command<P>(
225 &mut self,
226 puppet: &mut P,
227 ctx: &mut Context<P>,
228 ) -> Result<(), PuppetError>
229 where
230 P: Puppet,
231 {
232 let cmd = self
233 .cmd
234 .take()
235 .ok_or_else(|| PuppetError::critical(ctx.pid, "ServicePacket has no command"))?;
236
237 let reply_address = self
238 .reply_address
239 .take()
240 .ok_or_else(|| PuppetError::critical(ctx.pid, "ServicePacket has no reply address"))?;
241
242 let response = ctx.handle_command(puppet, cmd).await;
243
244 if let Err(PuppetError::Critical(err)) = &response {
245 ctx.report_failure(puppet, err.clone()).await?;
246 }
247
248 reply_address.send(response).map_err(|_err| {
249 PuppetError::critical(ctx.pid, "Failed to send response over the oneshot channel")
250 })?;
251
252 Ok(())
253 }
254
255 pub(crate) fn reply_error(&mut self, err: PuppetError) {
260 if let Some(reply_address) = self.reply_address.take() {
261 let _ = reply_address.send(Err(err));
262 }
263 }
264}
265
266#[derive(Debug, Clone, strum::Display, PartialEq, Eq)]
271pub enum RestartStage {
272 Start,
273 Stop,
274}
275
276#[derive(Debug, Clone, strum::Display)]
284pub enum ServiceCommand {
285 Start,
286 Stop,
287 Restart { stage: Option<RestartStage> },
288 ReportFailure { pid: Pid, error: PuppetError },
289 Fail,
290}
291
292#[derive(Debug)]
293pub struct Postman<P>
294where
295 P: Puppet,
296{
297 tx: tokio::sync::mpsc::UnboundedSender<Box<dyn Envelope<P>>>,
298}
299
300impl<P> Clone for Postman<P>
301where
302 P: Puppet,
303{
304 fn clone(&self) -> Self {
305 Self {
306 tx: self.tx.clone(),
307 }
308 }
309}
310
311impl<P> Postman<P>
312where
313 P: Puppet,
314{
315 #[must_use]
316 pub fn new(tx: tokio::sync::mpsc::UnboundedSender<Box<dyn Envelope<P>>>) -> Self {
317 Self { tx }
318 }
319
320 pub(crate) fn send<E>(&self, message: E) -> Result<(), PostmanError>
321 where
322 P: Handler<E>,
323 E: Message + 'static,
324 {
325 let packet = Packet::<P, E>::without_reply(message);
326 self.tx.send(Box::new(packet)).map_err(|_e| {
327 PostmanError::SendError {
328 puppet: Pid::new::<P>(),
329 }
330 })?;
331 Ok(())
332 }
333
334 pub(crate) async fn send_and_await_response<E>(
335 &self,
336 message: E,
337 duration: Option<std::time::Duration>,
338 ) -> Result<ResponseFor<P, E>, PostmanError>
339 where
340 P: Handler<E>,
341 E: Message + 'static,
342 {
343 let (res_tx, res_rx) =
344 tokio::sync::oneshot::channel::<Result<ResponseFor<P, E>, PuppetError>>();
345
346 let packet = Packet::<P, E>::with_reply(message, res_tx);
347 self.tx.send(Box::new(packet)).map_err(|_e| {
348 PostmanError::SendError {
349 puppet: Pid::new::<P>(),
350 }
351 })?;
352
353 if let Some(duration) = duration {
354 (tokio::time::timeout(duration, res_rx).await).map_or_else(
355 |_| {
356 Err(PostmanError::ResponseReceiveError {
357 puppet: Pid::new::<P>(),
358 })
359 },
360 |inner_res| {
361 inner_res.map_or_else(
362 |_| {
363 Err(PostmanError::ResponseReceiveError {
364 puppet: Pid::new::<P>(),
365 })
366 },
367 |res| res.map_err(PostmanError::from),
368 )
369 },
370 )
371 } else {
372 (res_rx.await).map_or_else(
373 |_| {
374 Err(PostmanError::ResponseReceiveError {
375 puppet: Pid::new::<P>(),
376 })
377 },
378 |res| res.map_err(PostmanError::from),
379 )
380 }
381 }
382}
383
384#[derive(Debug, Clone)]
385pub struct ServicePostman {
386 tx: tokio::sync::mpsc::Sender<ServicePacket>,
387}
388
389impl ServicePostman {
390 #[must_use]
391 pub fn new(tx: tokio::sync::mpsc::Sender<ServicePacket>) -> Self {
392 Self { tx }
393 }
394
395 pub(crate) async fn send(
396 &self,
397 puppet: Pid,
398 command: ServiceCommand,
399 ) -> Result<(), PostmanError> {
400 let packet = ServicePacket::without_reply(command);
401 self.tx
402 .send(packet)
403 .await
404 .map_err(|_e| PostmanError::SendError { puppet })
405 }
406
407 pub(crate) async fn send_and_await_response(
408 &self,
409 puppet: Pid,
410 command: ServiceCommand,
411 duration: Option<std::time::Duration>,
412 ) -> Result<(), PostmanError> {
413 let (res_tx, res_rx) = tokio::sync::oneshot::channel::<Result<(), PuppetError>>();
414 let packet = ServicePacket::with_reply(command, res_tx);
415 self.tx
416 .send(packet)
417 .await
418 .map_err(|_e| PostmanError::SendError { puppet })?;
419
420 if let Some(duration) = duration {
421 (tokio::time::timeout(duration, res_rx).await).map_or(
422 Err(PostmanError::ResponseReceiveError { puppet }),
423 |inner_res| {
424 inner_res.map_or(Err(PostmanError::ResponseReceiveError { puppet }), |res| {
425 res.map_err(PostmanError::from)
426 })
427 },
428 )
429 } else {
430 (res_rx.await).map_or(Err(PostmanError::ResponseReceiveError { puppet }), |res| {
431 res.map_err(PostmanError::from)
432 })
433 }
434 }
435}
436
437pub(crate) struct Mailbox<P>
438where
439 P: Puppet,
440{
441 rx: mpsc::UnboundedReceiver<Box<dyn Envelope<P>>>,
442}
443
444impl<P> fmt::Debug for Mailbox<P>
445where
446 P: Puppet,
447{
448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449 f.debug_struct("Mailbox").field("rx", &self.rx).finish()
450 }
451}
452
453impl<P> Mailbox<P>
454where
455 P: Puppet,
456{
457 pub fn new(rx: mpsc::UnboundedReceiver<Box<dyn Envelope<P>>>) -> Self {
458 Self { rx }
459 }
460 pub async fn recv(&mut self) -> Option<Box<dyn Envelope<P>>> {
461 self.rx.recv().await
462 }
463}
464
465#[derive(Debug)]
466pub(crate) struct ServiceMailbox {
467 rx: tokio::sync::mpsc::Receiver<ServicePacket>,
468}
469
470impl ServiceMailbox {
471 pub fn new(rx: tokio::sync::mpsc::Receiver<ServicePacket>) -> Self {
472 Self { rx }
473 }
474 pub async fn recv(&mut self) -> Option<ServicePacket> {
475 self.rx.recv().await
476 }
477}