Skip to main content

atomr_remote/endpoint/
mod.rs

1//! Per-association `Endpoint` actor.
2//!
3//! Each endpoint owns one peer (`Address` + UID). The writer half pumps
4//! outbound user/system payloads (with heartbeats when idle); the reader
5//! half consumes inbound payloads dispatched by the `EndpointManager`
6//! and routes them up to the local `ActorSystem`. Both halves cooperate
7//! with the [`AckedSendBuffer`] / [`AckedReceiveBuffer`] for
8//! sliding-window reliable delivery.
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::{mpsc, Notify};
14
15use atomr_core::actor::Address;
16
17use crate::acked_delivery::{AckedReceiveBuffer, AckedSendBuffer, SeqNo};
18use crate::envelope::RemoteEnvelope;
19use crate::pdu::{AkkaPdu, DisassociateReason};
20use crate::settings::RemoteSettings;
21use crate::transport::AkkaProtocolTransport;
22
23/// Inbound payload destined for the local `ActorSystem`. Produced by the
24/// reader half and consumed by the local dispatcher (set on the
25/// provider).
26#[derive(Debug)]
27pub struct InboundEnvelope {
28    pub envelope: RemoteEnvelope,
29}
30
31/// Outbound commands accepted by the writer half.
32#[derive(Debug)]
33pub enum EndpointCmd {
34    Send(RemoteEnvelope),
35    SendSystem(RemoteEnvelope),
36    /// Resend the buffered window after a reconnect.
37    ResendBuffer,
38    /// Apply an inbound `Ack` to the send buffer.
39    ApplyAck(crate::pdu::AckInfo),
40    /// Drain and disassociate.
41    Shutdown(DisassociateReason),
42}
43
44/// PDUs the manager dispatches to this endpoint's reader half.
45#[derive(Debug)]
46pub enum InboundPdu {
47    Payload(RemoteEnvelope),
48    Ack(crate::pdu::AckInfo),
49}
50
51#[derive(Clone)]
52pub struct EndpointHandle {
53    pub remote: Address,
54    pub remote_uid: u64,
55    cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
56    pdu_tx: mpsc::UnboundedSender<InboundPdu>,
57    shutdown: Arc<Notify>,
58}
59
60impl EndpointHandle {
61    pub fn send(&self, env: RemoteEnvelope) {
62        let _ = self.cmd_tx.send(EndpointCmd::Send(env));
63    }
64    pub fn send_system(&self, env: RemoteEnvelope) {
65        let _ = self.cmd_tx.send(EndpointCmd::SendSystem(env));
66    }
67    pub fn resend(&self) {
68        let _ = self.cmd_tx.send(EndpointCmd::ResendBuffer);
69    }
70    pub fn apply_ack(&self, ack: crate::pdu::AckInfo) {
71        let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
72    }
73    /// Hand off an inbound PDU (called by the manager dispatch task).
74    pub fn deliver(&self, pdu: InboundPdu) {
75        let _ = self.pdu_tx.send(pdu);
76    }
77    pub fn shutdown(&self, reason: DisassociateReason) {
78        let _ = self.cmd_tx.send(EndpointCmd::Shutdown(reason));
79        self.shutdown.notify_waiters();
80    }
81}
82
83/// Spawn an endpoint reader/writer pair. The returned handle is what
84/// `RemoteActorRef::tell_serialized` ultimately writes to.
85pub fn spawn_endpoint(
86    protocol: Arc<AkkaProtocolTransport>,
87    settings: RemoteSettings,
88    remote: Address,
89    remote_uid: u64,
90    inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
91) -> EndpointHandle {
92    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<EndpointCmd>();
93    let (pdu_tx, pdu_rx) = mpsc::unbounded_channel::<InboundPdu>();
94    let shutdown = Arc::new(Notify::new());
95
96    let cmd_tx_for_reader = cmd_tx.clone();
97    let writer = EndpointWriter {
98        protocol: protocol.clone(),
99        settings: settings.clone(),
100        remote: remote.clone(),
101        remote_uid,
102        seq: SeqNo::default(),
103        send_buf: AckedSendBuffer::new(settings.ack_window),
104        cmd_rx,
105        shutdown: shutdown.clone(),
106    };
107    let reader = EndpointReader {
108        remote: remote.clone(),
109        recv_buf: AckedReceiveBuffer::new(),
110        inbound_sink,
111        pdu_rx,
112        cmd_tx: cmd_tx_for_reader,
113        protocol: protocol.clone(),
114        shutdown: shutdown.clone(),
115    };
116
117    tokio::spawn(writer.run());
118    tokio::spawn(reader.run());
119
120    EndpointHandle { remote, remote_uid, cmd_tx, pdu_tx, shutdown }
121}
122
123struct EndpointWriter {
124    protocol: Arc<AkkaProtocolTransport>,
125    settings: RemoteSettings,
126    remote: Address,
127    #[allow(dead_code)]
128    remote_uid: u64,
129    seq: SeqNo,
130    send_buf: AckedSendBuffer,
131    cmd_rx: mpsc::UnboundedReceiver<EndpointCmd>,
132    shutdown: Arc<Notify>,
133}
134
135impl EndpointWriter {
136    async fn run(mut self) {
137        let mut heartbeat = tokio::time::interval(self.settings.heartbeat_interval);
138        heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
139        let _ = heartbeat.tick().await;
140
141        loop {
142            tokio::select! {
143                _ = self.shutdown.notified() => {
144                    let _ = self.protocol.send_pdu(
145                        &self.remote,
146                        AkkaPdu::Disassociate(DisassociateReason::Normal),
147                    ).await;
148                    break;
149                }
150                cmd = self.cmd_rx.recv() => {
151                    let Some(cmd) = cmd else { break };
152                    match cmd {
153                        EndpointCmd::Send(mut env) | EndpointCmd::SendSystem(mut env) => {
154                            env.seq_no = self.seq.advance();
155                            let _ = self.send_buf.push(env.clone());
156                            if let Err(e) = self
157                                .protocol
158                                .send_pdu(&self.remote, AkkaPdu::Payload(env))
159                                .await
160                            {
161                                tracing::warn!(remote = %self.remote, "send failed: {e}");
162                            }
163                        }
164                        EndpointCmd::ApplyAck(ack) => {
165                            self.send_buf.apply_ack(&ack);
166                        }
167                        EndpointCmd::ResendBuffer => {
168                            let envs = self.send_buf.drain_resend();
169                            for e in envs {
170                                let _ = self
171                                    .protocol
172                                    .send_pdu(&self.remote, AkkaPdu::Payload(e))
173                                    .await;
174                            }
175                        }
176                        EndpointCmd::Shutdown(reason) => {
177                            let _ = self
178                                .protocol
179                                .send_pdu(&self.remote, AkkaPdu::Disassociate(reason))
180                                .await;
181                            break;
182                        }
183                    }
184                }
185                _ = heartbeat.tick() => {
186                    let _ = self
187                        .protocol
188                        .send_pdu(&self.remote, AkkaPdu::Heartbeat)
189                        .await;
190                }
191            }
192        }
193    }
194}
195
196struct EndpointReader {
197    remote: Address,
198    recv_buf: AckedReceiveBuffer,
199    inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
200    pdu_rx: mpsc::UnboundedReceiver<InboundPdu>,
201    cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
202    protocol: Arc<AkkaProtocolTransport>,
203    shutdown: Arc<Notify>,
204}
205
206impl EndpointReader {
207    async fn run(mut self) {
208        let mut ack_tick = tokio::time::interval(Duration::from_millis(200));
209        ack_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
210        let _ = ack_tick.tick().await;
211
212        loop {
213            tokio::select! {
214                _ = self.shutdown.notified() => break,
215                pdu = self.pdu_rx.recv() => {
216                    let Some(pdu) = pdu else { break };
217                    match pdu {
218                        InboundPdu::Payload(env) => {
219                            if self.recv_buf.observe(env.seq_no) {
220                                let _ = self.inbound_sink.send(InboundEnvelope { envelope: env });
221                            }
222                        }
223                        InboundPdu::Ack(ack) => {
224                            let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
225                        }
226                    }
227                }
228                _ = ack_tick.tick() => {
229                    let ack = self.recv_buf.ack();
230                    if ack.cumulative_ack > 0 {
231                        let _ = self
232                            .protocol
233                            .send_pdu(&self.remote, AkkaPdu::Ack(ack))
234                            .await;
235                    }
236                }
237            }
238        }
239    }
240}