1use std::{
2 collections::BTreeMap,
3 future::{self, Future},
4 panic::Location,
5 sync::{
6 atomic::{AtomicUsize, Ordering},
7 Arc, LazyLock,
8 },
9 thread,
10 time::Duration,
11};
12
13use futures_intrusive::timer::{LocalTimer, StdClock, TimerService};
14use serde::{de::Deserializer, Deserialize};
15use serde_value::Value;
16use tokio::{sync::oneshot, task};
17
18use elfo_core::{
19 ActorGroup, Addr, Blueprint, Context, Envelope, Local, Message, MoveOwnership, Request,
20 ResponseToken,
21 _priv::do_start,
22 addr::NodeLaunchId,
23 errors::{RequestError, TrySendError},
24 message, msg,
25 routers::{MapRouter, Outcome},
26 scope::{self, Scope},
27 topology::Topology,
28};
29
/// Number of times [`Proxy::sync`] yields to the tokio scheduler before returning.
const SYNC_YIELD_COUNT: usize = 32;
31
/// A handle used to exchange messages with the actor group mounted as "subject".
///
/// Every operation is executed within the stored [`Scope`].
pub struct Proxy {
    /// Context through which messages are sent, received and responded to.
    context: ProxyContext,
    /// Scope that wraps every operation performed by this proxy.
    scope: Scope,
    /// Address of the "subject" group; passed to `Context::finished()` by `finished()`.
    subject_addr: Addr,
    /// How long `recv()` waits for an envelope before panicking.
    recv_timeout: Duration,
}
39
/// Context type of the actors backing proxies.
// NOTE(review): presumably `()` is the config type and `usize` the routing key
// (the testers' router emits `Outcome::Unicast(usize)`) — confirm against `Context`.
type ProxyContext = Context<(), usize>;
41
42impl Proxy {
43 pub fn addr(&self) -> Addr {
45 self.context.addr()
46 }
47
48 pub fn node_launch_id(&self) -> NodeLaunchId {
53 self.scope.node_launch_id()
54 }
55
56 #[track_caller]
58 pub fn unbounded_send<M: Message>(&self, message: M) {
59 self.scope.clone().sync_within(|| {
60 let name = message.name();
61 if let Err(err) = self.context.unbounded_send(message) {
62 panic!("cannot send {name} ({err}) unboundedly")
63 }
64 })
65 }
66
67 #[track_caller]
69 pub fn unbounded_send_to<M: Message>(&self, recipient: Addr, message: M) {
70 self.scope.clone().sync_within(|| {
71 let name = message.name();
72 if let Err(err) = self.context.unbounded_send_to(recipient, message) {
73 panic!("cannot send {name} ({err}) unboundedly")
74 }
75 })
76 }
77
78 #[track_caller]
80 pub fn send<M: Message>(&self, message: M) -> impl Future<Output = ()> + '_ {
81 let location = Location::caller();
82 self.scope.clone().within(async move {
83 let name = message.name();
84 if let Err(err) = self.context.send(message).await {
85 panic!("cannot send {name} ({err}) at {location}");
86 }
87 })
88 }
89
90 #[track_caller]
92 pub fn send_to<M: Message>(
93 &self,
94 recipient: Addr,
95 message: M,
96 ) -> impl Future<Output = ()> + '_ {
97 let location = Location::caller();
98 self.scope.clone().within(async move {
99 let name = message.name();
100 if let Err(err) = self.context.send_to(recipient, message).await {
101 panic!("cannot send {name} ({err}) at {location}");
102 }
103 })
104 }
105
106 #[track_caller]
108 pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
109 self.scope
110 .clone()
111 .sync_within(|| self.context.try_send(message))
112 }
113
114 #[track_caller]
116 pub fn try_send_to<M: Message>(
117 &self,
118 recipient: Addr,
119 message: M,
120 ) -> Result<(), TrySendError<M>> {
121 self.scope
122 .clone()
123 .sync_within(|| self.context.try_send_to(recipient, message))
124 }
125
126 pub fn request_fallible<R: Request>(
128 &self,
129 request: R,
130 ) -> impl Future<Output = Result<R::Response, RequestError>> {
131 let context = self.context.pruned();
132 self.scope
133 .clone()
134 .within(async move { context.request(request).resolve().await })
135 }
136
137 #[track_caller]
139 pub fn request<R: Request>(&self, request: R) -> impl Future<Output = R::Response> {
140 let location = Location::caller();
141 let context = self.context.pruned();
142 self.scope.clone().within(async move {
143 let name = request.name();
144 match context.request(request).resolve().await {
145 Ok(response) => response,
146 Err(err) => panic!("cannot send {name} ({err}) at {location}"),
147 }
148 })
149 }
150
151 pub fn request_to_fallible<R: Request>(
153 &self,
154 recipient: Addr,
155 request: R,
156 ) -> impl Future<Output = Result<R::Response, RequestError>> {
157 let context = self.context.pruned();
158 self.scope
159 .clone()
160 .within(async move { context.request_to(recipient, request).resolve().await })
161 }
162
163 #[track_caller]
165 pub fn request_to<R: Request>(
166 &self,
167 recipient: Addr,
168 request: R,
169 ) -> impl Future<Output = R::Response> {
170 let location = Location::caller();
171 let context = self.context.pruned();
172 self.scope.clone().within(async move {
173 let name = request.name();
174 match context.request_to(recipient, request).resolve().await {
175 Ok(response) => response,
176 Err(err) => panic!("cannot send {name} ({err}) at {location}"),
177 }
178 })
179 }
180
181 pub fn respond<R: Request>(&self, token: ResponseToken<R>, response: R::Response) {
183 self.scope
184 .clone()
185 .sync_within(|| self.context.respond(token, response))
186 }
187
188 #[track_caller]
190 pub fn recv(&mut self) -> impl Future<Output = Envelope> + '_ {
191 static STD_CLOCK: LazyLock<StdClock> = LazyLock::new(StdClock::new);
193 static TIMER_SERVICE: LazyLock<Arc<TimerService>> = LazyLock::new(|| {
194 let timer_service = Arc::new(TimerService::new(&*STD_CLOCK));
195 thread::spawn({
196 let timer_service = timer_service.clone();
197 move || loop {
198 std::thread::sleep(Duration::from_millis(25));
199 timer_service.check_expirations();
200 }
201 });
202 timer_service
203 });
204
205 let location = Location::caller();
206 self.scope.clone().within(async move {
207 tokio::select! {
208 Some(envelope) = self.context.recv() => {
209 envelope
210 },
211 _ = TIMER_SERVICE.delay(self.recv_timeout) => {
212 panic!(
213 "timeout ({:?}) while receiving a message at {}",
214 self.recv_timeout, location,
215 );
216 }
217 }
218 })
219 }
220
221 pub async fn try_recv(&mut self) -> Option<Envelope> {
223 self.scope
224 .clone()
225 .within(async move { self.context.try_recv().await.ok() })
226 .await
227 }
228
229 pub async fn sync(&mut self) {
234 for _ in 0..SYNC_YIELD_COUNT {
236 task::yield_now().await;
237 }
238 }
239
240 pub fn set_recv_timeout(&mut self, recv_timeout: Duration) {
242 self.recv_timeout = recv_timeout;
243 }
244
245 pub async fn subproxy(&self) -> Proxy {
248 let f = async {
249 self.context
250 .request_to(self.context.group(), CreateSubproxy)
251 .resolve()
252 .await
253 .expect("cannot create a new subpoxy")
254 };
255
256 let ProxyCreated { context, scope } = self.scope.clone().within(f).await;
257
258 Proxy {
259 context: context.into_inner(),
260 scope: scope.into_inner(),
261 subject_addr: self.subject_addr,
262 recv_timeout: self.recv_timeout,
263 }
264 }
265
266 pub async fn finished(&self) {
268 let fut = self.context.finished(self.subject_addr);
269 self.scope.clone().within(fut).await
270 }
271
272 pub fn close(&self) {
274 self.scope.clone().sync_within(|| self.context.close());
275 }
276}
277
/// Request sent by `Proxy::subproxy()` to the testers group; the response is
/// a [`ProxyCreated`] carrying the context and scope of the new proxy.
#[message(ret = ProxyCreated)]
struct CreateSubproxy;
280
/// Everything needed to build a [`Proxy`] on the test's side.
#[message(part)]
struct ProxyCreated {
    /// Context of the tester actor that backs the proxy.
    context: Local<ProxyContext>,
    /// Scope exposed by that tester actor.
    scope: Local<Scope>,
}
286
287fn testers(tx: oneshot::Sender<ProxyCreated>) -> Blueprint {
288 let tx = MoveOwnership::from(tx);
289 let key = AtomicUsize::new(1); ActorGroup::new()
292 .router(MapRouter::new(move |envelope| {
293 msg!(match envelope {
294 CreateSubproxy => Outcome::Unicast(key.fetch_add(1, Ordering::SeqCst)),
295 _ => Outcome::Unicast(0),
296 })
297 }))
298 .exec(move |mut ctx| {
299 let tx = tx.clone();
300 async move {
301 if let Some(tx) = tx.take() {
307 let _ = tx.send(ProxyCreated {
308 context: ctx.into(),
309 scope: scope::expose().into(),
310 });
311 } else {
312 let envelope = ctx.recv().await.unwrap();
313 let (_, token) = crate::extract_request::<CreateSubproxy>(envelope);
314
315 ctx.pruned().respond(
316 token,
317 ProxyCreated {
318 scope: scope::expose().into(),
319 context: ctx.into(),
320 },
321 );
322 }
323
324 future::pending::<()>().await;
326 }
327 })
328}
329
330#[doc(hidden)]
331#[instability::unstable]
332pub async fn proxy_with_route<F>(
333 blueprint: Blueprint,
334 route_filter: F,
335 config: impl for<'de> Deserializer<'de>,
336) -> Proxy
337where
338 F: Fn(&Envelope) -> bool + Send + Sync + 'static,
339{
340 let _ = tracing_subscriber::fmt()
343 .with_target(false)
344 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
345 .with_test_writer()
346 .try_init();
347
348 let config = Value::deserialize(config).expect("invalid config");
349 let mut map = BTreeMap::new();
350 map.insert(Value::String("subject".into()), config);
351 let config = Value::Map(map);
352
353 let topology = Topology::empty();
354 let subject = topology.local("subject");
355 let testers = topology.local("system.testers");
356 let configurers = topology.local("system.configurers").entrypoint();
357
358 let subject_addr = subject.addr();
359
360 testers.route_all_to(&subject);
361 subject.route_to(&testers, route_filter);
362
363 configurers.mount(elfo_configurer::fixture(&topology, config));
364 subject.mount(blueprint);
365
366 let (tx, rx) = oneshot::channel();
367 testers.mount(self::testers(tx));
368 do_start(topology, false, |_, _| future::ready(()))
369 .await
370 .expect("cannot start");
371
372 let ProxyCreated { context, scope } = rx.await.expect("cannot create main proxy");
373
374 Proxy {
375 context: context.into_inner(),
376 scope: scope.into_inner(),
377 subject_addr,
378 recv_timeout: Duration::from_millis(150),
379 }
380}
381
382pub async fn proxy(blueprint: Blueprint, config: impl for<'de> Deserializer<'de>) -> Proxy {
385 proxy_with_route(blueprint, |_| true, config).await
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    use elfo_core::{assert_msg_eq, config::AnyConfig, message, msg};

    /// Plain message sent to the subject.
    #[message]
    #[derive(PartialEq)]
    struct SomeMessage;

    /// Request answered by the sample subject with `42`.
    #[message(ret = u32)]
    #[derive(PartialEq)]
    struct SomeRequest;

    /// Message the sample subject sends back in reply to `SomeMessage`.
    #[message]
    #[derive(PartialEq)]
    struct SomeMessage2;

    /// The subject sends a message right at startup; the proxy must still get it.
    #[tokio::test]
    async fn it_handles_race_at_startup() {
        let eager_sender = ActorGroup::new().exec(|ctx| async move {
            ctx.send(SomeMessage).await.unwrap();
        });
        let mut main = super::proxy(eager_sender, AnyConfig::default()).await;

        let received = main.recv().await;
        assert_msg_eq!(received, SomeMessage);
    }

    /// Creates a proxy for a subject that replies to `SomeMessage` with
    /// `SomeMessage2` and answers `SomeRequest` with `42`.
    async fn sample() -> Proxy {
        let responder = ActorGroup::new().exec(|mut ctx| async move {
            while let Some(envelope) = ctx.recv().await {
                let origin = envelope.sender();
                msg!(match envelope {
                    SomeMessage => ctx.send_to(origin, SomeMessage2).await.unwrap(),
                    (SomeRequest, token) => ctx.respond(token, 42),
                });
            }
        });

        super::proxy(responder, AnyConfig::default()).await
    }

    #[tokio::test]
    async fn main_proxy_works() {
        let mut main = sample().await;

        let answer = main.request(SomeRequest).await;
        assert_eq!(answer, 42);

        main.send(SomeMessage).await;
        let received = main.recv().await;
        assert_msg_eq!(received, SomeMessage2);
    }

    #[tokio::test]
    async fn subproxy_works() {
        let main = sample().await;
        let mut sub = main.subproxy().await;

        let answer = sub.request(SomeRequest).await;
        assert_eq!(answer, 42);

        sub.send(SomeMessage).await;
        let received = sub.recv().await;
        assert_msg_eq!(received, SomeMessage2);
    }
}