elfo_core/
request_table.rs

1use std::{fmt, marker::PhantomData};
2
3use futures_intrusive::sync::ManualResetEvent;
4use parking_lot::Mutex;
5use slotmap::{new_key_type, Key, SlotMap};
6use smallvec::SmallVec;
7
8use crate::{addr::Addr, address_book::AddressBook, envelope::Envelope};
9
10pub(crate) struct RequestTable {
11    owner: Addr,
12    notifier: ManualResetEvent,
13    requests: Mutex<SlotMap<RequestId, RequestInfo>>,
14}
15
16assert_impl_all!(RequestTable: Sync);
17
18type Data = SmallVec<[Option<Envelope>; 1]>;
19
20#[derive(Default)]
21struct RequestInfo {
22    remainder: usize,
23    data: Data,
24    collect_all: bool,
25}
26
27new_key_type! {
28    pub struct RequestId;
29}
30
31impl RequestTable {
32    pub(crate) fn new(owner: Addr) -> Self {
33        Self {
34            owner,
35            notifier: ManualResetEvent::new(false),
36            requests: Mutex::new(SlotMap::default()),
37        }
38    }
39
40    pub(crate) fn new_request(&self, book: AddressBook, collect_all: bool) -> ResponseToken<()> {
41        let mut requests = self.requests.lock();
42        let request_id = requests.insert(RequestInfo {
43            remainder: 1,
44            data: Data::new(),
45            collect_all,
46        });
47        ResponseToken::new(self.owner, request_id, book)
48    }
49
50    pub(crate) fn clone_token(&self, token: &ResponseToken<()>) -> Option<ResponseToken<()>> {
51        debug_assert_eq!(token.sender, self.owner);
52        let mut requests = self.requests.lock();
53        requests.get_mut(token.request_id)?.remainder += 1;
54        let book = token.book.clone();
55        Some(ResponseToken::new(token.sender, token.request_id, book))
56    }
57
58    pub(crate) fn respond(&self, mut token: ResponseToken<()>, envelope: Envelope) {
59        self.resolve(token.sender, token.request_id, Some(envelope));
60        token.forget();
61    }
62
63    pub(crate) async fn wait(&self, request_id: RequestId) -> Data {
64        let mut n = 0;
65
66        loop {
67            self.notifier.wait().await;
68
69            {
70                let mut requests = self.requests.lock();
71                let request = requests.get(request_id).expect("unknown request");
72
73                if request.remainder == 0 {
74                    let info = requests.remove(request_id).expect("under lock");
75
76                    // TODO: use another approach.
77                    if requests.values().all(|info| info.remainder != 0) {
78                        self.notifier.reset();
79                    }
80
81                    break info.data;
82                }
83            }
84
85            // XXX: dirty fix to avoid high CPU usage.
86            n += 1;
87            if n % 10 == 0 {
88                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
89            } else {
90                tokio::task::yield_now().await;
91            }
92        }
93    }
94
95    fn resolve(&self, sender: Addr, request_id: RequestId, envelope: Option<Envelope>) {
96        // TODO: should we have another strategy for panics?
97        debug_assert_eq!(sender, self.owner);
98        let mut requests = self.requests.lock();
99
100        // `None` here means the request was with `collect_all = false` and
101        // the response has been recieved already.
102        let request = ward!(requests.get_mut(request_id));
103
104        // Extra responses (in `any` case).
105        if request.remainder == 0 {
106            debug_assert!(!request.collect_all);
107            return;
108        }
109
110        if !request.collect_all {
111            if envelope.is_some() {
112                request.data.push(envelope);
113                request.remainder = 0;
114                self.notifier.set();
115                return;
116            }
117        } else {
118            request.data.push(envelope);
119        }
120
121        request.remainder -= 1;
122        if request.remainder == 0 {
123            self.notifier.set();
124        }
125    }
126}
127
128#[must_use]
129pub struct ResponseToken<T> {
130    pub(crate) sender: Addr,
131    pub(crate) request_id: RequestId,
132    book: AddressBook,
133    marker: PhantomData<T>,
134}
135
136impl ResponseToken<()> {
137    pub(crate) fn new(sender: Addr, request_id: RequestId, book: AddressBook) -> Self {
138        Self {
139            sender,
140            request_id,
141            book,
142            marker: PhantomData,
143        }
144    }
145
146    pub(crate) fn into_typed<T>(mut self) -> ResponseToken<T> {
147        let token = ResponseToken {
148            sender: self.sender,
149            request_id: self.request_id,
150            book: self.book.clone(),
151            marker: PhantomData,
152        };
153        self.forget();
154        token
155    }
156}
157
158impl<R> ResponseToken<R> {
159    pub(crate) fn forgotten(book: AddressBook) -> Self {
160        Self {
161            sender: Addr::NULL,
162            request_id: RequestId::null(),
163            book,
164            marker: PhantomData,
165        }
166    }
167
168    pub(crate) fn into_untyped(mut self) -> ResponseToken<()> {
169        let token = ResponseToken {
170            sender: self.sender,
171            request_id: self.request_id,
172            book: self.book.clone(),
173            marker: PhantomData,
174        };
175        self.forget();
176        token
177    }
178
179    pub(crate) fn is_forgotten(&self) -> bool {
180        self.request_id == RequestId::null()
181    }
182
183    fn forget(&mut self) {
184        self.request_id = RequestId::null();
185    }
186}
187
188impl<T> Drop for ResponseToken<T> {
189    fn drop(&mut self) {
190        // We use the special value of `RequestId` to reduce memory usage.
191        if self.request_id.is_null() {
192            return;
193        }
194
195        let object = ward!(self.book.get(self.sender));
196        let actor = ward!(object.as_actor());
197        actor
198            .request_table()
199            .resolve(self.sender, self.request_id, None);
200    }
201}
202
203impl<T> fmt::Debug for ResponseToken<T> {
204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205        f.debug_struct("ResponseToken").finish()
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    use std::sync::Arc;
214
215    use elfo_macros::message;
216
217    use crate::{actor::ActorMeta, assert_msg_eq, envelope::MessageKind, scope::Scope};
218
219    #[message(elfo = crate)]
220    #[derive(PartialEq)]
221    struct Num(u32);
222
223    fn envelope(addr: Addr, num: Num) -> Envelope {
224        Scope::test(
225            addr,
226            Arc::new(ActorMeta {
227                group: "test".into(),
228                key: String::new(),
229            }),
230        )
231        .sync_within(|| Envelope::new(num, MessageKind::Regular { sender: addr }).upcast())
232    }
233
234    #[tokio::test]
235    async fn one_request_one_response() {
236        let addr = Addr::from_bits(1);
237        let table = Arc::new(RequestTable::new(addr));
238        let book = AddressBook::new();
239
240        for _ in 0..3 {
241            let token = table.new_request(book.clone(), true);
242            let request_id = token.request_id;
243
244            let table1 = table.clone();
245            tokio::spawn(async move {
246                table1.respond(token, envelope(addr, Num(42)));
247            });
248
249            let mut data = table.wait(request_id).await;
250
251            assert_eq!(data.len(), 1);
252            assert_msg_eq!(data.pop().unwrap().unwrap(), Num(42));
253        }
254    }
255
256    async fn one_request_many_response(collect_all: bool, ignore: bool) {
257        let addr = Addr::from_bits(1);
258        let table = Arc::new(RequestTable::new(addr));
259        let token = table.new_request(AddressBook::new(), collect_all);
260        let request_id = token.request_id;
261
262        let n = 5;
263        for i in 1..n {
264            let table1 = table.clone();
265            let token = table.clone_token(&token).unwrap();
266            tokio::spawn(async move {
267                if !ignore {
268                    table1.respond(token, envelope(addr, Num(i)));
269                } else {
270                    // TODO: test a real `Drop`.
271                    table1.resolve(addr, request_id, None);
272                }
273            });
274        }
275
276        if !ignore {
277            table.respond(token, envelope(addr, Num(0)));
278        } else {
279            // TODO: test a real `Drop`.
280            table.resolve(addr, request_id, None);
281        }
282
283        let mut data = table.wait(request_id).await;
284
285        let expected_len = if ignore {
286            0
287        } else if collect_all {
288            n as usize
289        } else {
290            1
291        };
292        assert_eq!(data.len(), expected_len);
293
294        for (i, envelope) in data.drain(..).enumerate() {
295            if ignore {
296                assert!(envelope.is_none());
297            } else {
298                assert_msg_eq!(envelope.unwrap(), Num(i as u32));
299            }
300        }
301    }
302
303    #[tokio::test]
304    async fn one_request_many_response_all() {
305        one_request_many_response(true, false).await;
306    }
307
308    #[tokio::test]
309    async fn one_request_many_response_all_ignored() {
310        one_request_many_response(false, true).await;
311    }
312
313    #[tokio::test]
314    async fn one_request_many_response_any() {
315        one_request_many_response(false, false).await;
316    }
317
318    #[tokio::test]
319    async fn one_request_many_response_any_ignored() {
320        one_request_many_response(false, true).await;
321    }
322
323    // TODO: check many requests.
324    // TODO: check `Drop`.
325
326    #[tokio::test]
327    async fn late_resolve() {
328        let addr = Addr::from_bits(1);
329        let table = Arc::new(RequestTable::new(addr));
330        let book = AddressBook::new();
331
332        let token = table.new_request(book.clone(), false);
333        let token1 = table.clone_token(&token).unwrap();
334        let request_id = token.request_id;
335
336        let table1 = table.clone();
337        tokio::spawn(async move {
338            table1.respond(token, envelope(addr, Num(42)));
339        });
340
341        let _data = table.wait(request_id).await;
342        table.respond(token1, envelope(addr, Num(43)));
343    }
344}