mm1_node/runtime/context/
impl_context_api.rs

1use std::future::Future;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::subnet::NetMask;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::futures::timeout::FutureTimeoutExt;
9use mm1_common::types::Never;
10use mm1_core::context::{
11    Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit, RecvErrorKind, SendErrorKind,
12    Start, Stop, Watching,
13};
14use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
15use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
16use tokio::sync::oneshot;
17use tokio::time::Instant;
18use tracing::trace;
19
20use crate::runtime::config::EffectiveActorConfig;
21use crate::runtime::context::ActorContext;
22use crate::runtime::runnable::BoxedRunnable;
23use crate::runtime::sys_call::SysCall;
24use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
25use crate::runtime::{container, mq};
26
27impl Quit for ActorContext {
28    async fn quit_ok(&mut self) -> Never {
29        self.call.invoke(SysCall::Exit(Ok(()))).await;
30        std::future::pending().await
31    }
32
33    async fn quit_err<E>(&mut self, reason: E) -> Never
34    where
35        E: std::error::Error + Send + Sync + 'static,
36    {
37        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
38        std::future::pending().await
39    }
40}
41
42impl Fork for ActorContext {
43    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
44        let address_lease = self
45            .subnet_pool
46            .lease(NetMask::M_64)
47            .map_err(|lease_error| {
48                ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
49            })?;
50        let actor_address = address_lease.address;
51
52        let (tx_priority, rx_priority) = mq::unbounded();
53        let (tx_regular, rx_regular) = mq::bounded(
54            self.rt_config
55                .actor_config(&self.actor_key)
56                .fork_inbox_size(),
57        );
58        let tx_system_weak = self.tx_system_weak.clone();
59        let tx_system = tx_system_weak
60            .upgrade()
61            .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "tx_system_weak.upgrade"))?;
62
63        self.call
64            .invoke(SysCall::ForkAdded(
65                address_lease.address,
66                tx_priority.downgrade(),
67            ))
68            .await;
69
70        let () = self
71            .rt_api
72            .register(address_lease, tx_system, tx_priority, tx_regular);
73
74        let subnet_pool = self.subnet_pool.clone();
75        let rt_api = self.rt_api.clone();
76        let rt_config = self.rt_config.clone();
77
78        let actor_key = self.actor_key.clone();
79
80        let context = Self {
81            rt_api,
82            rt_config,
83            actor_address,
84            rx_priority,
85            rx_regular,
86            tx_system_weak,
87            call: self.call.clone(),
88            actor_key,
89            subnet_pool,
90            ack_to: None,
91            unregister_on_drop: true,
92        };
93
94        Ok(context)
95    }
96
97    async fn run<F, Fut>(self, fun: F)
98    where
99        F: FnOnce(Self) -> Fut,
100        F: Send + 'static,
101        Fut: std::future::Future + Send + 'static,
102    {
103        let call = self.call.clone();
104        let fut = fun(self).map(|_| ()).boxed();
105        call.invoke(SysCall::Spawn(fut)).await;
106    }
107}
108
109impl Now for ActorContext {
110    type Instant = Instant;
111
112    fn now(&self) -> Self::Instant {
113        Instant::now()
114    }
115}
116
117impl Start<BoxedRunnable<Self>> for ActorContext {
118    fn spawn(
119        &mut self,
120        runnable: BoxedRunnable<Self>,
121        link: bool,
122    ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
123        do_spawn(self, runnable, None, link.then_some(self.actor_address))
124    }
125
126    fn start(
127        &mut self,
128        runnable: BoxedRunnable<Self>,
129        link: bool,
130        start_timeout: Duration,
131    ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
132        do_start(self, runnable, link, start_timeout)
133    }
134}
135
136impl Stop for ActorContext {
137    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
138        let this = self.actor_address;
139        let out = do_exit(self, this, peer).is_ok();
140        std::future::ready(out)
141    }
142
143    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
144        let out = do_kill(self, peer).is_ok();
145        std::future::ready(out)
146    }
147}
148
149impl Linking for ActorContext {
150    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
151        let this = self.actor_address;
152        do_link(self, this, peer)
153    }
154
155    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
156        let this = self.actor_address;
157        do_unlink(self, this, peer)
158    }
159
160    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
161        do_set_trap_exit(self, enable)
162    }
163}
164
165impl Watching for ActorContext {
166    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
167        do_watch(self, peer)
168    }
169
170    fn unwatch(
171        &mut self,
172        watch_ref: mm1_proto_system::WatchRef,
173    ) -> impl Future<Output = ()> + Send {
174        do_unwatch(self, watch_ref)
175    }
176}
177
178impl InitDone for ActorContext {
179    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
180        do_init_done(self, address)
181    }
182}
183
184impl Messaging for ActorContext {
185    fn address(&self) -> Address {
186        self.actor_address
187    }
188
189    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
190        let (priority, inbound_opt) = tokio::select! {
191            biased;
192
193            inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
194            inbound_opt = self.rx_regular.recv() => (false, inbound_opt),
195        };
196
197        trace!(
198            "received [priority: {}; inbound: {:?}]",
199            priority, inbound_opt
200        );
201
202        inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
203    }
204
205    async fn close(&mut self) {
206        self.rx_regular.close();
207        self.rx_priority.close();
208    }
209
210    fn send(
211        &mut self,
212        envelope: Envelope,
213    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
214        std::future::ready(do_send(self, envelope))
215    }
216}
217
218async fn do_start(
219    context: &mut ActorContext,
220    runnable: BoxedRunnable<ActorContext>,
221    link: bool,
222    timeout: Duration,
223) -> Result<Address, ErrorOf<StartErrorKind>> {
224    let this_address = context.actor_address;
225
226    let mut fork = context
227        .fork()
228        .await
229        .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
230
231    let fork_address = fork.actor_address;
232    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
233        .await
234        .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
235
236    let envelope = match fork.recv().timeout(timeout).await {
237        Err(_elapsed) => {
238            do_kill(context, spawned_address)
239                .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
240
241            // TODO: should we ensure termination with a `system::Watch`?
242
243            return Err(ErrorOf::new(
244                StartErrorKind::Timeout,
245                "no init-ack within timeout",
246            ))
247        },
248        Ok(recv_result) => {
249            recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
250        },
251    };
252
253    dispatch!(match envelope {
254        mm1_proto_system::InitAck { address } => {
255            if link {
256                do_link(context, this_address, address).await;
257            }
258            Ok(address)
259        },
260
261        mm1_proto_system::Exited { .. } => {
262            Err(ErrorOf::new(
263                StartErrorKind::Exited,
264                "exited before init-ack",
265            ))
266        },
267
268        unexpected @ _ => {
269            Err(ErrorOf::new(
270                StartErrorKind::InternalError,
271                format!("unexpected message: {:?}", unexpected),
272            ))
273        },
274    })
275}
276
277async fn do_spawn(
278    context: &mut ActorContext,
279    runnable: BoxedRunnable<ActorContext>,
280    ack_to: Option<Address>,
281    link_to: impl IntoIterator<Item = Address>,
282) -> Result<Address, ErrorOf<SpawnErrorKind>> {
283    let actor_key = context.actor_key.child(runnable.func_name());
284    let actor_config = context.rt_config.actor_config(&actor_key);
285    let execute_on = context.rt_api.choose_executor(actor_config.runtime_key());
286
287    trace!("starting [ack-to: {:?}]", ack_to);
288
289    let subnet_lease = context
290        .rt_api
291        .request_address(actor_config.netmask())
292        .await
293        .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
294
295    trace!("subnet-lease: {}", subnet_lease.net_address());
296
297    let rt_api = context.rt_api.clone();
298    let rt_config = context.rt_config.clone();
299    let container = container::Container::create(
300        container::ContainerArgs {
301            ack_to,
302            // FIXME: can we make it IntoIterator too?
303            link_to: link_to.into_iter().collect(),
304            actor_key,
305
306            subnet_lease,
307            rt_api,
308            rt_config,
309        },
310        runnable,
311    )
312    .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
313    let actor_address = container.actor_address();
314
315    trace!("actor-address: {}", actor_address);
316
317    // TODO: maybe keep it somewhere too?
318    let _join_handle = execute_on.spawn(container.run());
319
320    Ok(actor_address)
321}
322
323fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
324    context.rt_api.sys_send(
325        peer,
326        SysMsg::Link(SysLink::Exit {
327            sender:   this,
328            receiver: peer,
329            reason:   ExitReason::Terminate,
330        }),
331    )
332}
333
334fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
335    context.rt_api.sys_send(peer, SysMsg::Kill)
336}
337
338async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
339    context
340        .call
341        .invoke(SysCall::Link {
342            sender:   this,
343            receiver: peer,
344        })
345        .await
346}
347
348async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
349    context
350        .call
351        .invoke(SysCall::Unlink {
352            sender:   this,
353            receiver: peer,
354        })
355        .await
356}
357
358async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
359    context.call.invoke(SysCall::TrapExit(enable)).await;
360}
361
362async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
363    let this = context.actor_address;
364    let (reply_tx, reply_rx) = oneshot::channel();
365    context
366        .call
367        .invoke(SysCall::Watch {
368            sender: this,
369            receiver: peer,
370            reply_tx,
371        })
372        .await;
373    reply_rx.await.expect("sys-call remained unanswered")
374}
375
376async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
377    let this = context.actor_address;
378    context
379        .call
380        .invoke(SysCall::Unwatch {
381            sender: this,
382            watch_ref,
383        })
384        .await;
385}
386
387async fn do_init_done(context: &mut ActorContext, address: Address) {
388    let message = InitAck { address };
389    let Some(ack_to_address) = context.ack_to.take() else {
390        return;
391    };
392    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
393    let _ = context.rt_api.send(true, envelope.into_erased());
394}
395
396fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
397    trace!("sending [outbound: {:?}]", outbound);
398    context
399        .rt_api
400        .send(false, outbound)
401        .map_err(|k| ErrorOf::new(k, ""))
402}