agner_init_ack/
channel.rs1use agner_utils::result_err_flatten::ResultErrFlattenIn;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::oneshot;
6
7use agner_actors::{ActorID, Exit};
8
9pub fn new() -> (InitAckTx, InitAckRx) {
10 let (tx, rx) = oneshot::channel();
11 (InitAckTx(tx), InitAckRx(rx))
12}
13
14#[derive(Debug)]
15pub struct InitAckTx(oneshot::Sender<Result<ActorID, Exit>>);
16
17#[derive(Debug)]
18#[pin_project::pin_project]
19pub struct InitAckRx(#[pin] oneshot::Receiver<Result<ActorID, Exit>>);
20
21impl InitAckTx {
22 pub fn ok(self, actor_id: ActorID) {
23 let _ = self.0.send(Ok(actor_id));
24 }
25
26 pub fn err(self, reason: impl Into<Exit>) {
27 let _ = self.0.send(Err(reason.into()));
28 }
29
30 pub fn ack(self, result: Result<ActorID, impl Into<Exit>>) {
31 match result {
32 Ok(actor_id) => self.ok(actor_id),
33 Err(err) => self.err(err),
34 }
35 }
36}
37
38impl Future for InitAckRx {
39 type Output = Result<ActorID, Exit>;
40
41 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42 let this = self.project();
43 let out = futures::ready!(this.0.poll(cx)).ok();
44 let out = out.ok_or_else(Exit::no_actor).err_flatten_in();
45 Poll::Ready(out)
46 }
47}