1use std::{
20 any::TypeId,
21 sync::mpsc::{channel, Receiver, Sender},
22 time::Instant,
23};
24
25use crate::{DeliveryStats, Handler, Message, Service};
26
27pub struct Envelope<S> {
29 message: Box<dyn EnvelopeT<S>>,
30}
31
32impl<S: Service> Envelope<S> {
33 pub fn wrap<M>(message: M) -> Self
34 where
35 M: Message,
36 S: Handler<M>,
37 {
38 Self::wrap_with_reply(message).0
39 }
40
41 pub fn wrap_with_reply<M>(message: M) -> (Self, Receiver<M::Reply>)
42 where
43 M: Message,
44 S: Handler<M>,
45 {
46 let (ack_sender, ack_receiver) = channel();
47 (
48 Envelope {
49 message: Box::new(EnvelopeTImpl {
50 timestamp: Instant::now(),
51 message: Some(message),
52 ack_sender,
53 }),
54 },
55 ack_receiver,
56 )
57 }
58
59 pub fn deliver_to(&mut self, service: &mut S) -> Result<DeliveryStats, &str> {
60 self.message.handle(service)
61 }
62
63 pub fn message_type_id(&self) -> Option<TypeId> {
64 self.message.type_id()
65 }
66}
67
68trait EnvelopeT<S: Service>: Send {
69 fn type_id(&self) -> Option<TypeId>;
70 fn handle(&mut self, service: &mut S) -> Result<DeliveryStats, &str>;
71}
72struct EnvelopeTImpl<M>
73where
74 M: Message,
75{
76 message: Option<M>,
77 ack_sender: Sender<M::Reply>,
78 timestamp: Instant,
79}
80impl<S: Service + Handler<M>, M: Message> EnvelopeT<S> for EnvelopeTImpl<M> {
81 fn type_id(&self) -> Option<TypeId> {
82 self.message.as_ref().map(|m| m.type_id())
83 }
84
85 fn handle(&mut self, service: &mut S) -> Result<DeliveryStats, &'static str> {
86 if let Some(message) = self.message.take() {
87 let processing_at = Instant::now();
88 let r = service.deliver(message);
89
90 let queued = processing_at - self.timestamp;
91 let processing = Instant::now() - processing_at;
92
93 let _error = self.ack_sender.send(r);
95 Ok(DeliveryStats { queued, processing })
96 } else {
97 Err("Attempt to deliver multiple times")
98 }
99 }
100}