1use std::{fmt, marker::PhantomData};
2
3use futures_intrusive::sync::ManualResetEvent;
4use parking_lot::Mutex;
5use slotmap::{new_key_type, Key, SlotMap};
6use smallvec::SmallVec;
7
8use crate::{addr::Addr, address_book::AddressBook, envelope::Envelope};
9
10pub(crate) struct RequestTable {
11 owner: Addr,
12 notifier: ManualResetEvent,
13 requests: Mutex<SlotMap<RequestId, RequestInfo>>,
14}
15
16assert_impl_all!(RequestTable: Sync);
17
18type Data = SmallVec<[Option<Envelope>; 1]>;
19
20#[derive(Default)]
21struct RequestInfo {
22 remainder: usize,
23 data: Data,
24 collect_all: bool,
25}
26
27new_key_type! {
28 pub struct RequestId;
29}
30
31impl RequestTable {
32 pub(crate) fn new(owner: Addr) -> Self {
33 Self {
34 owner,
35 notifier: ManualResetEvent::new(false),
36 requests: Mutex::new(SlotMap::default()),
37 }
38 }
39
40 pub(crate) fn new_request(&self, book: AddressBook, collect_all: bool) -> ResponseToken<()> {
41 let mut requests = self.requests.lock();
42 let request_id = requests.insert(RequestInfo {
43 remainder: 1,
44 data: Data::new(),
45 collect_all,
46 });
47 ResponseToken::new(self.owner, request_id, book)
48 }
49
50 pub(crate) fn clone_token(&self, token: &ResponseToken<()>) -> Option<ResponseToken<()>> {
51 debug_assert_eq!(token.sender, self.owner);
52 let mut requests = self.requests.lock();
53 requests.get_mut(token.request_id)?.remainder += 1;
54 let book = token.book.clone();
55 Some(ResponseToken::new(token.sender, token.request_id, book))
56 }
57
58 pub(crate) fn respond(&self, mut token: ResponseToken<()>, envelope: Envelope) {
59 self.resolve(token.sender, token.request_id, Some(envelope));
60 token.forget();
61 }
62
63 pub(crate) async fn wait(&self, request_id: RequestId) -> Data {
64 let mut n = 0;
65
66 loop {
67 self.notifier.wait().await;
68
69 {
70 let mut requests = self.requests.lock();
71 let request = requests.get(request_id).expect("unknown request");
72
73 if request.remainder == 0 {
74 let info = requests.remove(request_id).expect("under lock");
75
76 if requests.values().all(|info| info.remainder != 0) {
78 self.notifier.reset();
79 }
80
81 break info.data;
82 }
83 }
84
85 n += 1;
87 if n % 10 == 0 {
88 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
89 } else {
90 tokio::task::yield_now().await;
91 }
92 }
93 }
94
95 fn resolve(&self, sender: Addr, request_id: RequestId, envelope: Option<Envelope>) {
96 debug_assert_eq!(sender, self.owner);
98 let mut requests = self.requests.lock();
99
100 let request = ward!(requests.get_mut(request_id));
103
104 if request.remainder == 0 {
106 debug_assert!(!request.collect_all);
107 return;
108 }
109
110 if !request.collect_all {
111 if envelope.is_some() {
112 request.data.push(envelope);
113 request.remainder = 0;
114 self.notifier.set();
115 return;
116 }
117 } else {
118 request.data.push(envelope);
119 }
120
121 request.remainder -= 1;
122 if request.remainder == 0 {
123 self.notifier.set();
124 }
125 }
126}
127
128#[must_use]
129pub struct ResponseToken<T> {
130 pub(crate) sender: Addr,
131 pub(crate) request_id: RequestId,
132 book: AddressBook,
133 marker: PhantomData<T>,
134}
135
136impl ResponseToken<()> {
137 pub(crate) fn new(sender: Addr, request_id: RequestId, book: AddressBook) -> Self {
138 Self {
139 sender,
140 request_id,
141 book,
142 marker: PhantomData,
143 }
144 }
145
146 pub(crate) fn into_typed<T>(mut self) -> ResponseToken<T> {
147 let token = ResponseToken {
148 sender: self.sender,
149 request_id: self.request_id,
150 book: self.book.clone(),
151 marker: PhantomData,
152 };
153 self.forget();
154 token
155 }
156}
157
158impl<R> ResponseToken<R> {
159 pub(crate) fn forgotten(book: AddressBook) -> Self {
160 Self {
161 sender: Addr::NULL,
162 request_id: RequestId::null(),
163 book,
164 marker: PhantomData,
165 }
166 }
167
168 pub(crate) fn into_untyped(mut self) -> ResponseToken<()> {
169 let token = ResponseToken {
170 sender: self.sender,
171 request_id: self.request_id,
172 book: self.book.clone(),
173 marker: PhantomData,
174 };
175 self.forget();
176 token
177 }
178
179 pub(crate) fn is_forgotten(&self) -> bool {
180 self.request_id == RequestId::null()
181 }
182
183 fn forget(&mut self) {
184 self.request_id = RequestId::null();
185 }
186}
187
188impl<T> Drop for ResponseToken<T> {
189 fn drop(&mut self) {
190 if self.request_id.is_null() {
192 return;
193 }
194
195 let object = ward!(self.book.get(self.sender));
196 let actor = ward!(object.as_actor());
197 actor
198 .request_table()
199 .resolve(self.sender, self.request_id, None);
200 }
201}
202
203impl<T> fmt::Debug for ResponseToken<T> {
204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205 f.debug_struct("ResponseToken").finish()
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use elfo_macros::message;

    use crate::{actor::ActorMeta, assert_msg_eq, envelope::MessageKind, scope::Scope};

    /// Minimal message type used as a response payload.
    #[message(elfo = crate)]
    #[derive(PartialEq)]
    struct Num(u32);

    /// Wraps `num` into a regular envelope sent by `addr`, built inside a
    /// test scope.
    fn envelope(addr: Addr, num: Num) -> Envelope {
        Scope::test(
            addr,
            Arc::new(ActorMeta {
                group: "test".into(),
                key: String::new(),
            }),
        )
        .sync_within(|| Envelope::new(num, MessageKind::Regular { sender: addr }).upcast())
    }

    /// A single token answered from another task; repeated to check that the
    /// table can be reused for subsequent requests.
    #[tokio::test]
    async fn one_request_one_response() {
        let addr = Addr::from_bits(1);
        let table = Arc::new(RequestTable::new(addr));
        let book = AddressBook::new();

        for _ in 0..3 {
            let token = table.new_request(book.clone(), true);
            let request_id = token.request_id;

            let table1 = table.clone();
            tokio::spawn(async move {
                table1.respond(token, envelope(addr, Num(42)));
            });

            let mut data = table.wait(request_id).await;

            assert_eq!(data.len(), 1);
            assert_msg_eq!(data.pop().unwrap().unwrap(), Num(42));
        }
    }

    /// Issues one request with `n` tokens and resolves each of them, either
    /// with a response (`ignore == false`) or without one (`ignore == true`).
    async fn one_request_many_response(collect_all: bool, ignore: bool) {
        let addr = Addr::from_bits(1);
        let table = Arc::new(RequestTable::new(addr));
        let token = table.new_request(AddressBook::new(), collect_all);
        let request_id = token.request_id;

        let n = 5;
        for i in 1..n {
            let table1 = table.clone();
            let token = table.clone_token(&token).unwrap();
            tokio::spawn(async move {
                if !ignore {
                    table1.respond(token, envelope(addr, Num(i)));
                } else {
                    // Imitates a dropped token. The real `Drop` does nothing
                    // here because the address book is empty.
                    table1.resolve(addr, request_id, None);
                }
            });
        }

        if !ignore {
            table.respond(token, envelope(addr, Num(0)));
        } else {
            // Imitates a dropped token, see above.
            table.resolve(addr, request_id, None);
        }

        let mut data = table.wait(request_id).await;

        // "All" mode records one entry per token, `None` included, so the
        // length is `n` whether or not the tokens were ignored. "Any" mode
        // records only the first real response, if there was one.
        let expected_len = if collect_all {
            n as usize
        } else if ignore {
            0
        } else {
            1
        };
        assert_eq!(data.len(), expected_len);

        for (i, envelope) in data.drain(..).enumerate() {
            if ignore {
                assert!(envelope.is_none());
            } else {
                assert_msg_eq!(envelope.unwrap(), Num(i as u32));
            }
        }
    }

    #[tokio::test]
    async fn one_request_many_response_all() {
        one_request_many_response(true, false).await;
    }

    #[tokio::test]
    async fn one_request_many_response_all_ignored() {
        // Must run in "all" mode; with `false` this would merely duplicate
        // `one_request_many_response_any_ignored`.
        one_request_many_response(true, true).await;
    }

    #[tokio::test]
    async fn one_request_many_response_any() {
        one_request_many_response(false, false).await;
    }

    #[tokio::test]
    async fn one_request_many_response_any_ignored() {
        one_request_many_response(false, true).await;
    }

    /// A response that arrives after the request has been taken by `wait`
    /// must be silently discarded.
    #[tokio::test]
    async fn late_resolve() {
        let addr = Addr::from_bits(1);
        let table = Arc::new(RequestTable::new(addr));
        let book = AddressBook::new();

        let token = table.new_request(book.clone(), false);
        let token1 = table.clone_token(&token).unwrap();
        let request_id = token.request_id;

        let table1 = table.clone();
        tokio::spawn(async move {
            table1.respond(token, envelope(addr, Num(42)));
        });

        let _data = table.wait(request_id).await;
        table.respond(token1, envelope(addr, Num(43)));
    }
}