Skip to main content

elfo_test/
proxy.rs

1use std::{
2    collections::BTreeMap,
3    future::{self, Future},
4    panic::Location,
5    sync::{
6        atomic::{AtomicUsize, Ordering},
7        Arc, LazyLock,
8    },
9    thread,
10    time::Duration,
11};
12
13use futures_intrusive::timer::{LocalTimer, StdClock, TimerService};
14use serde::{de::Deserializer, Deserialize};
15use serde_value::Value;
16use tokio::{sync::oneshot, task};
17
18use elfo_core::{
19    ActorGroup, Addr, Blueprint, Context, Envelope, Local, Message, MoveOwnership, Request,
20    ResponseToken,
21    _priv::do_start,
22    addr::NodeLaunchId,
23    errors::{RequestError, TrySendError},
24    message, msg,
25    routers::{MapRouter, Outcome},
26    scope::{self, Scope},
27    topology::Topology,
28};
29
/// How many times `Proxy::sync()` yields to the runtime before returning.
const SYNC_YIELD_COUNT: usize = 32;
31
/// A proxy for testing actors.
///
/// Bundles an actor context with the scope that every proxy operation is
/// executed within, so a test can exchange messages with the subject.
pub struct Proxy {
    /// Context used to send, request, respond and receive on behalf of the proxy.
    context: ProxyContext,
    /// Scope that each operation is wrapped in (via `within` / `sync_within`).
    scope: Scope,
    /// Address of the group under test; `finished()` waits on it.
    subject_addr: Addr,
    /// How long `recv()` waits for a message before panicking.
    recv_timeout: Duration,
}
39
/// The context type held by a proxy.
///
/// The `usize` parameter matches the keys produced by the router in `testers`
/// (`0` for the main proxy, `1..` for subproxies).
type ProxyContext = Context<(), usize>;
41
42impl Proxy {
    /// Returns an address of the proxy.
    ///
    /// Delegates to the proxy's underlying context.
    pub fn addr(&self) -> Addr {
        self.context.addr()
    }
47
    /// Returns a launch ID of the topology.
    ///
    /// It can be used to distinguish produced artifacts (logs, dumps, metrics)
    /// from different concurrent tests if the custom implementation is used.
    ///
    /// The value is read from the scope this proxy runs its operations within.
    pub fn node_launch_id(&self) -> NodeLaunchId {
        self.scope.node_launch_id()
    }
55
56    /// See [`Context::unbounded_send()`] for details.
57    #[track_caller]
58    pub fn unbounded_send<M: Message>(&self, message: M) {
59        self.scope.clone().sync_within(|| {
60            let name = message.name();
61            if let Err(err) = self.context.unbounded_send(message) {
62                panic!("cannot send {name} ({err}) unboundedly")
63            }
64        })
65    }
66
67    /// See [`Context::unbounded_send_to()`] for details.
68    #[track_caller]
69    pub fn unbounded_send_to<M: Message>(&self, recipient: Addr, message: M) {
70        self.scope.clone().sync_within(|| {
71            let name = message.name();
72            if let Err(err) = self.context.unbounded_send_to(recipient, message) {
73                panic!("cannot send {name} ({err}) unboundedly")
74            }
75        })
76    }
77
78    /// See [`Context::send()`] for details.
79    #[track_caller]
80    pub fn send<M: Message>(&self, message: M) -> impl Future<Output = ()> + '_ {
81        let location = Location::caller();
82        self.scope.clone().within(async move {
83            let name = message.name();
84            if let Err(err) = self.context.send(message).await {
85                panic!("cannot send {name} ({err}) at {location}");
86            }
87        })
88    }
89
90    /// See [`Context::send_to()`] for details.
91    #[track_caller]
92    pub fn send_to<M: Message>(
93        &self,
94        recipient: Addr,
95        message: M,
96    ) -> impl Future<Output = ()> + '_ {
97        let location = Location::caller();
98        self.scope.clone().within(async move {
99            let name = message.name();
100            if let Err(err) = self.context.send_to(recipient, message).await {
101                panic!("cannot send {name} ({err}) at {location}");
102            }
103        })
104    }
105
106    /// See [`Context::try_send()`] for details.
107    #[track_caller]
108    pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
109        self.scope
110            .clone()
111            .sync_within(|| self.context.try_send(message))
112    }
113
114    /// See [`Context::try_send_to()`] for details.
115    #[track_caller]
116    pub fn try_send_to<M: Message>(
117        &self,
118        recipient: Addr,
119        message: M,
120    ) -> Result<(), TrySendError<M>> {
121        self.scope
122            .clone()
123            .sync_within(|| self.context.try_send_to(recipient, message))
124    }
125
126    /// Same as [`Self::request`], but doesn't unwraps the error.
127    pub fn request_fallible<R: Request>(
128        &self,
129        request: R,
130    ) -> impl Future<Output = Result<R::Response, RequestError>> {
131        let context = self.context.pruned();
132        self.scope
133            .clone()
134            .within(async move { context.request(request).resolve().await })
135    }
136
137    /// See [`Context::request()`] for details.
138    #[track_caller]
139    pub fn request<R: Request>(&self, request: R) -> impl Future<Output = R::Response> {
140        let location = Location::caller();
141        let context = self.context.pruned();
142        self.scope.clone().within(async move {
143            let name = request.name();
144            match context.request(request).resolve().await {
145                Ok(response) => response,
146                Err(err) => panic!("cannot send {name} ({err}) at {location}"),
147            }
148        })
149    }
150
151    /// Same as [`Self::request_to`], but doesn't unwraps the errors.
152    pub fn request_to_fallible<R: Request>(
153        &self,
154        recipient: Addr,
155        request: R,
156    ) -> impl Future<Output = Result<R::Response, RequestError>> {
157        let context = self.context.pruned();
158        self.scope
159            .clone()
160            .within(async move { context.request_to(recipient, request).resolve().await })
161    }
162
163    /// See [`Context::request_to()`] for details.
164    #[track_caller]
165    pub fn request_to<R: Request>(
166        &self,
167        recipient: Addr,
168        request: R,
169    ) -> impl Future<Output = R::Response> {
170        let location = Location::caller();
171        let context = self.context.pruned();
172        self.scope.clone().within(async move {
173            let name = request.name();
174            match context.request_to(recipient, request).resolve().await {
175                Ok(response) => response,
176                Err(err) => panic!("cannot send {name} ({err}) at {location}"),
177            }
178        })
179    }
180
181    /// See [`Context::respond()`] for details.
182    pub fn respond<R: Request>(&self, token: ResponseToken<R>, response: R::Response) {
183        self.scope
184            .clone()
185            .sync_within(|| self.context.respond(token, response))
186    }
187
188    /// See [`Context::recv()`] for details.
189    #[track_caller]
190    pub fn recv(&mut self) -> impl Future<Output = Envelope> + '_ {
191        // We use a separate timer here to avoid interaction with the tokio's timer.
192        static STD_CLOCK: LazyLock<StdClock> = LazyLock::new(StdClock::new);
193        static TIMER_SERVICE: LazyLock<Arc<TimerService>> = LazyLock::new(|| {
194            let timer_service = Arc::new(TimerService::new(&*STD_CLOCK));
195            thread::spawn({
196                let timer_service = timer_service.clone();
197                move || loop {
198                    std::thread::sleep(Duration::from_millis(25));
199                    timer_service.check_expirations();
200                }
201            });
202            timer_service
203        });
204
205        let location = Location::caller();
206        self.scope.clone().within(async move {
207            tokio::select! {
208                Some(envelope) = self.context.recv() => {
209                    envelope
210                },
211                _ = TIMER_SERVICE.delay(self.recv_timeout) => {
212                    panic!(
213                        "timeout ({:?}) while receiving a message at {}",
214                        self.recv_timeout, location,
215                    );
216                }
217            }
218        })
219    }
220
221    /// See [`Context::try_recv()`] for details.
222    pub async fn try_recv(&mut self) -> Option<Envelope> {
223        self.scope
224            .clone()
225            .within(async move { self.context.try_recv().await.ok() })
226            .await
227    }
228
    /// Waits until the testable actor handles all previously sent messages.
    ///
    /// Now it's implemented as multiple calls `yield_now()`,
    /// but the implementation can be changed in the future.
    ///
    /// Note that the current implementation is best-effort: it yields to the
    /// runtime `SYNC_YIELD_COUNT` times and doesn't check the subject's state.
    pub async fn sync(&mut self) {
        // TODO: it should probably be `request(Ping).await`.
        for _ in 0..SYNC_YIELD_COUNT {
            task::yield_now().await;
        }
    }
239
    /// Sets message wait time for `recv` call.
    ///
    /// Once this time elapses without a message, `recv` panics.
    /// Subproxies created afterwards start with the same value.
    pub fn set_recv_timeout(&mut self, recv_timeout: Duration) {
        self.recv_timeout = recv_timeout;
    }
244
245    /// Creates a subproxy with a different address.
246    /// The main purpose is to test `send_to(..)` and `request_to(..)` calls.
247    pub async fn subproxy(&self) -> Proxy {
248        let f = async {
249            self.context
250                .request_to(self.context.group(), CreateSubproxy)
251                .resolve()
252                .await
253                .expect("cannot create a new subpoxy")
254        };
255
256        let ProxyCreated { context, scope } = self.scope.clone().within(f).await;
257
258        Proxy {
259            context: context.into_inner(),
260            scope: scope.into_inner(),
261            subject_addr: self.subject_addr,
262            recv_timeout: self.recv_timeout,
263        }
264    }
265
266    /// Waits until the testable actor finishes.
267    pub async fn finished(&self) {
268        let fut = self.context.finished(self.subject_addr);
269        self.scope.clone().within(fut).await
270    }
271
272    /// Closes a mailbox of the proxy.
273    pub fn close(&self) {
274        self.scope.clone().sync_within(|| self.context.close());
275    }
276}
277
// Internal request asking the testers group to create one more proxy actor.
// The router in `testers` assigns a fresh key to every such request.
#[message(ret = ProxyCreated)]
struct CreateSubproxy;
280
// Carries everything needed to build a `Proxy` out of a freshly started
// tester actor. Used both as the response to `CreateSubproxy` and as the
// payload of the oneshot channel for the main proxy.
#[message(part)]
struct ProxyCreated {
    // The tester actor's own context, handed over to the proxy.
    context: Local<ProxyContext>,
    // The scope exposed by the tester actor at creation time.
    scope: Local<Scope>,
}
286
287fn testers(tx: oneshot::Sender<ProxyCreated>) -> Blueprint {
288    let tx = MoveOwnership::from(tx);
289    let key = AtomicUsize::new(1); // 0 is reserved for the main proxy
290
291    ActorGroup::new()
292        .router(MapRouter::new(move |envelope| {
293            msg!(match envelope {
294                CreateSubproxy => Outcome::Unicast(key.fetch_add(1, Ordering::SeqCst)),
295                _ => Outcome::Unicast(0),
296            })
297        }))
298        .exec(move |mut ctx| {
299            let tx = tx.clone();
300            async move {
301                // It would be nice to use the code in the `else` branch also for the main
302                // proxy. Unfortunately, the main proxy can receive messages from the subject
303                // before receiving the `CreateSubproxy` message. That's why we need to use
304                // a dedicated oneshot channel for the main proxy.
305                // See the `it_handles_race_at_startup` test for an example.
306                if let Some(tx) = tx.take() {
307                    let _ = tx.send(ProxyCreated {
308                        context: ctx.into(),
309                        scope: scope::expose().into(),
310                    });
311                } else {
312                    let envelope = ctx.recv().await.unwrap();
313                    let (_, token) = crate::extract_request::<CreateSubproxy>(envelope);
314
315                    ctx.pruned().respond(
316                        token,
317                        ProxyCreated {
318                            scope: scope::expose().into(),
319                            context: ctx.into(),
320                        },
321                    );
322                }
323
324                // We don't track the lifetime of sent context for now, so keep the actor alive.
325                future::pending::<()>().await;
326            }
327        })
328}
329
330#[doc(hidden)]
331#[instability::unstable]
332pub async fn proxy_with_route<F>(
333    blueprint: Blueprint,
334    route_filter: F,
335    config: impl for<'de> Deserializer<'de>,
336) -> Proxy
337where
338    F: Fn(&Envelope) -> bool + Send + Sync + 'static,
339{
340    // Initialize logging but skip errors if the logger is already initialized.
341    // It occurs when tests are run in the same process.
342    let _ = tracing_subscriber::fmt()
343        .with_target(false)
344        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
345        .with_test_writer()
346        .try_init();
347
348    let config = Value::deserialize(config).expect("invalid config");
349    let mut map = BTreeMap::new();
350    map.insert(Value::String("subject".into()), config);
351    let config = Value::Map(map);
352
353    let topology = Topology::empty();
354    let subject = topology.local("subject");
355    let testers = topology.local("system.testers");
356    let configurers = topology.local("system.configurers").entrypoint();
357
358    let subject_addr = subject.addr();
359
360    testers.route_all_to(&subject);
361    subject.route_to(&testers, route_filter);
362
363    configurers.mount(elfo_configurer::fixture(&topology, config));
364    subject.mount(blueprint);
365
366    let (tx, rx) = oneshot::channel();
367    testers.mount(self::testers(tx));
368    do_start(topology, false, |_, _| future::ready(()))
369        .await
370        .expect("cannot start");
371
372    let ProxyCreated { context, scope } = rx.await.expect("cannot create main proxy");
373
374    Proxy {
375        context: context.into_inner(),
376        scope: scope.into_inner(),
377        subject_addr,
378        recv_timeout: Duration::from_millis(150),
379    }
380}
381
382/// Creates a proxy for testing actors.
383/// See examples in the repository for more details how to use it.
384pub async fn proxy(blueprint: Blueprint, config: impl for<'de> Deserializer<'de>) -> Proxy {
385    proxy_with_route(blueprint, |_| true, config).await
386}
387
// Tests of the proxy itself, using small ad-hoc subjects.
#[cfg(test)]
mod tests {
    use super::*;

    use elfo_core::{assert_msg_eq, config::AnyConfig, message, msg};

    // A plain message sent to the subject (and by the subject in the startup test).
    #[message]
    #[derive(PartialEq)]
    struct SomeMessage;

    // A request that the sample subject answers with `42`.
    #[message(ret = u32)]
    #[derive(PartialEq)]
    struct SomeRequest;

    // The sample subject's reply to `SomeMessage`.
    #[message]
    #[derive(PartialEq)]
    struct SomeMessage2;

    // The subject sends a message right after it starts; the main proxy
    // must still receive it.
    #[tokio::test]
    async fn it_handles_race_at_startup() {
        let mut proxy = super::proxy(
            ActorGroup::new().exec(|ctx| async move {
                ctx.send(SomeMessage).await.unwrap();
            }),
            AnyConfig::default(),
        )
        .await;

        assert_msg_eq!(proxy.recv().await, SomeMessage);
    }

    // Creates a proxy around a subject that replies to the sender of
    // `SomeMessage` with `SomeMessage2` and responds to `SomeRequest` with `42`.
    async fn sample() -> Proxy {
        super::proxy(
            ActorGroup::new().exec(|mut ctx| async move {
                while let Some(envelope) = ctx.recv().await {
                    let addr = envelope.sender();
                    msg!(match envelope {
                        SomeMessage => ctx.send_to(addr, SomeMessage2).await.unwrap(),
                        (SomeRequest, token) => ctx.respond(token, 42),
                    });
                }
            }),
            AnyConfig::default(),
        )
        .await
    }

    // Request/response and send/receive through the main proxy.
    #[tokio::test]
    async fn main_proxy_works() {
        let mut proxy = sample().await;
        assert_eq!(proxy.request(SomeRequest).await, 42);
        proxy.send(SomeMessage).await;
        assert_msg_eq!(proxy.recv().await, SomeMessage2);
    }

    // The same exchange through a subproxy: the reply is sent to the
    // subproxy's address and is received by the subproxy.
    #[tokio::test]
    async fn subproxy_works() {
        let proxy = sample().await;
        let mut subproxy = proxy.subproxy().await;
        assert_eq!(subproxy.request(SomeRequest).await, 42);
        subproxy.send(SomeMessage).await;
        assert_msg_eq!(subproxy.recv().await, SomeMessage2);
    }
}
451}