1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
use crate::core::connection::request_connection::RequestConnection;
use crate::core::connection::RequestConnectionEnum;
use crate::prelude::*;
use linear_type::Linear;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::warn;

/// A request hub: the part of an actor that is used to send requests to other actors, which
/// reply at a later point in time.
pub trait IsOutRequestHub<M>: HasActivate + Send + Sync + 'static
where
    M: IsInboundMessage,
{
    /// Creates the request hub for the actor called `actor_name`.
    ///
    /// `sender` is an unbounded sender for messages of the inbound message type `M`
    /// (presumably the actor's own inbound channel - confirm against the call site).
    fn from_parent_and_sender(
        actor_name: &str,
        sender: &tokio::sync::mpsc::UnboundedSender<M>,
    ) -> Self;
}

/// A request message with a reply channel.
///
/// Sending a reply consumes this struct.
///
/// # Panics
///
/// The struct must not be dropped before a reply is sent. The reply must be sent using
/// [`RequestWithReplyChannel::reply`] or [`RequestWithReplyChannel::reply_from_request`]. If the
/// struct is dropped without a reply having been sent, the dropping thread panics. In this case,
/// the thread of the reply receiver continues with a warning.
///
/// The reply does not need to be sent immediately, however. This struct can be stored and
/// forwarded, as long as a reply is sent before it is dropped.
///
/// The intention of the panic behavior is to make it easier to catch subtle programming
/// errors early, where a reply is not sent by mistake. The stack trace shows the location
/// where the request struct was dropped (before a reply was sent).
///
/// This behavior might change in the future.
#[derive(Debug)]
pub struct RequestWithReplyChannel<Request, Reply> {
    /// The request.
    pub request: Request,
    /// One-shot sender on which the reply is delivered, wrapped in [`Linear`].
    ///
    /// NOTE(review): the `Linear` wrapper is presumably what enforces the
    /// "must reply before drop" rule described above - confirm against `linear_type`.
    pub(crate) reply_channel: Linear<tokio::sync::oneshot::Sender<ReplyMessage<Reply>>>,
}

/// A trait for request messages.
///
/// It exposes the payload types of a request message as associated types, so generic code can
/// name them without knowing the concrete message type.
pub trait IsRequestWithReplyChannel: Send + Sync + 'static + Debug {
    /// The request type.
    type Request;
    /// The reply type.
    type Reply;
}

/// Exposes the request and reply payload types of [`RequestWithReplyChannel`] through
/// [`IsRequestWithReplyChannel`].
impl<
        Request: Send + Sync + 'static + Clone + Debug,
        Reply: Send + Sync + 'static + Clone + Debug,
    > IsRequestWithReplyChannel for RequestWithReplyChannel<Request, Reply>
{
    // The associated request type is the request payload (the `Request` generic parameter),
    // not the reply payload.
    type Request = Request;
    type Reply = Reply;
}

impl<Request, Reply: Debug> RequestWithReplyChannel<Request, Reply> {
    /// Computes the reply by applying `func` to the request payload and sends it back on the
    /// reply channel. Consumes the request message.
    ///
    /// # Panics
    ///
    /// Panics if the reply cannot be sent on the reply channel.
    pub fn reply_from_request<F>(self, func: F)
    where
        F: FnOnce(Request) -> Reply,
    {
        // Split the message so the payload can be handed to `func` by value.
        let Self {
            request,
            reply_channel,
        } = self;
        let message = ReplyMessage {
            reply: func(request),
        };

        reply_channel.into_inner().send(message).unwrap();
    }

    /// Sends `reply` back on the reply channel. Consumes the request message.
    ///
    /// # Panics
    ///
    /// Panics if the reply cannot be sent on the reply channel.
    pub fn reply(self, reply: Reply) {
        self.reply_channel
            .into_inner()
            .send(ReplyMessage { reply })
            .unwrap();
    }
}

/// A reply to a request.
///
/// Thin wrapper around the reply payload; this is the type that travels over the one-shot
/// reply channel of a [`RequestWithReplyChannel`].
#[derive(Debug, Clone, Default)]
pub struct ReplyMessage<Reply> {
    /// The reply value.
    pub reply: Reply,
}

/// OutRequestChannel is a connection for sending requests to other actors (and receiving replies
/// later).
pub struct OutRequestChannel<Request, Reply, M: IsInboundMessage> {
    /// Unique name of the request channel.
    pub name: String,
    /// Name of the actor that sends the request messages.
    pub actor_name: String,

    /// Register of the connections over which request messages are sent.
    pub(crate) connection_register: RequestConnectionEnum<RequestWithReplyChannel<Request, Reply>>,
    /// Sender used to pass a received reply on as a message of type `M`.
    pub(crate) sender: tokio::sync::mpsc::UnboundedSender<M>,
}

impl<Request, Reply, M> HasActivate for OutRequestChannel<Request, Reply, M>
where
    M: IsInboundMessage,
{
    /// Builds a new channel value from clones of the names and the sender, plus whatever
    /// `extract` on the connection register yields.
    fn extract(&mut self) -> Self {
        let name = self.name.clone();
        let actor_name = self.actor_name.clone();
        let connection_register = self.connection_register.extract();
        let sender = self.sender.clone();

        Self {
            name,
            actor_name,
            connection_register,
            sender,
        }
    }

    /// Activates the channel by activating its connection register.
    fn activate(&mut self) {
        self.connection_register.activate();
    }
}

impl<
        Request: Clone + Send + Sync + std::fmt::Debug + 'static,
        Reply: Clone + Send + Sync + std::fmt::Debug + 'static,
        M: IsInboundMessageNew<ReplyMessage<Reply>>,
    > OutRequestChannel<Request, Reply, M>
{
    /// Creates a new out-request channel for the actor.
    pub fn new(
        name: String,
        actor_name: &str,
        sender: &tokio::sync::mpsc::UnboundedSender<M>,
    ) -> Self {
        Self {
            name: name.clone(),
            actor_name: actor_name.to_owned(),
            connection_register: RequestConnectionEnum::new(),
            sender: sender.clone(),
        }
    }

    /// Connects the out-request channel from this actor to the in-request channel of another actor.
    pub fn connect<Me: IsInRequestMessageNew<RequestWithReplyChannel<Request, Reply>>>(
        &mut self,
        _ctx: &mut Hollywood,
        inbound: &mut InRequestChannel<RequestWithReplyChannel<Request, Reply>, Me>,
    ) {
        self.connection_register.push(Arc::new(RequestConnection {
            sender: inbound.sender.as_ref().clone(),
            inbound_channel: inbound.name.clone(),
            phantom: PhantomData {},
        }));
    }

    /// Sends a request message to the connected in-request channel of other actors.
    pub fn send_request(&self, msg: Request) {
        let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel();
        let msg = RequestWithReplyChannel {
            request: msg,
            reply_channel: Linear::new(reply_sender),
        };
        self.connection_register.send(msg);

        let sender = self.sender.clone();
        let name = self.name.clone();

        tokio::spawn(async move {
            match reply_receiver.await {
                Ok(r) => match sender.send(M::new(name, r)) {
                    Ok(_) => {}
                    Err(e) => {
                        warn!("Error sending request: {:?}", e);
                    }
                },
                Err(e) => {
                    warn!("Reply receiver error: {:?}", e);
                }
            };
        });
    }
}

/// An empty request hub - used for actors that do not have any request channels.
///
/// The struct has no fields and therefore carries no state.
#[derive(Debug, Clone, Default)]
pub struct NullOutRequests {}

impl<M: IsInboundMessage> IsOutRequestHub<M> for NullOutRequests {
    fn from_parent_and_sender(
        _actor_name: &str,
        _sender: &tokio::sync::mpsc::UnboundedSender<M>,
    ) -> Self {
        Self {}
    }
}

impl HasActivate for NullOutRequests {
    fn extract(&mut self) -> Self {
        Self {}
    }

    fn activate(&mut self) {}
}