Skip to main content

atomr_remote/endpoint/
mod.rs

1//! Per-association `Endpoint` actor.
2//! akka.net: `Remote/EndpointWriter.cs`, `Remote/EndpointReader.cs`,
3//! `Remote/Endpoint.cs`.
4//!
5//! Each endpoint owns one peer (`Address` + UID). The writer half pumps
6//! outbound user/system payloads (with heartbeats when idle); the reader
7//! half consumes inbound payloads dispatched by the `EndpointManager`
8//! and routes them up to the local `ActorSystem`. Both halves cooperate
9//! with the [`AckedSendBuffer`] / [`AckedReceiveBuffer`] for
10//! sliding-window reliable delivery.
11
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::sync::{mpsc, Notify};
16
17use atomr_core::actor::Address;
18
19use crate::acked_delivery::{AckedReceiveBuffer, AckedSendBuffer, SeqNo};
20use crate::envelope::RemoteEnvelope;
21use crate::pdu::{AkkaPdu, DisassociateReason};
22use crate::settings::RemoteSettings;
23use crate::transport::AkkaProtocolTransport;
24
25/// Inbound payload destined for the local `ActorSystem`. Produced by the
26/// reader half and consumed by the local dispatcher (set on the
27/// provider).
28#[derive(Debug)]
29pub struct InboundEnvelope {
30    pub envelope: RemoteEnvelope,
31}
32
33/// Outbound commands accepted by the writer half.
34#[derive(Debug)]
35pub enum EndpointCmd {
36    Send(RemoteEnvelope),
37    SendSystem(RemoteEnvelope),
38    /// Resend the buffered window after a reconnect.
39    ResendBuffer,
40    /// Apply an inbound `Ack` to the send buffer.
41    ApplyAck(crate::pdu::AckInfo),
42    /// Drain and disassociate.
43    Shutdown(DisassociateReason),
44}
45
46/// PDUs the manager dispatches to this endpoint's reader half.
47#[derive(Debug)]
48pub enum InboundPdu {
49    Payload(RemoteEnvelope),
50    Ack(crate::pdu::AckInfo),
51}
52
53#[derive(Clone)]
54pub struct EndpointHandle {
55    pub remote: Address,
56    pub remote_uid: u64,
57    cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
58    pdu_tx: mpsc::UnboundedSender<InboundPdu>,
59    shutdown: Arc<Notify>,
60}
61
62impl EndpointHandle {
63    pub fn send(&self, env: RemoteEnvelope) {
64        let _ = self.cmd_tx.send(EndpointCmd::Send(env));
65    }
66    pub fn send_system(&self, env: RemoteEnvelope) {
67        let _ = self.cmd_tx.send(EndpointCmd::SendSystem(env));
68    }
69    pub fn resend(&self) {
70        let _ = self.cmd_tx.send(EndpointCmd::ResendBuffer);
71    }
72    pub fn apply_ack(&self, ack: crate::pdu::AckInfo) {
73        let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
74    }
75    /// Hand off an inbound PDU (called by the manager dispatch task).
76    pub fn deliver(&self, pdu: InboundPdu) {
77        let _ = self.pdu_tx.send(pdu);
78    }
79    pub fn shutdown(&self, reason: DisassociateReason) {
80        let _ = self.cmd_tx.send(EndpointCmd::Shutdown(reason));
81        self.shutdown.notify_waiters();
82    }
83}
84
85/// Spawn an endpoint reader/writer pair. The returned handle is what
86/// `RemoteActorRef::tell_serialized` ultimately writes to.
87pub fn spawn_endpoint(
88    protocol: Arc<AkkaProtocolTransport>,
89    settings: RemoteSettings,
90    remote: Address,
91    remote_uid: u64,
92    inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
93) -> EndpointHandle {
94    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<EndpointCmd>();
95    let (pdu_tx, pdu_rx) = mpsc::unbounded_channel::<InboundPdu>();
96    let shutdown = Arc::new(Notify::new());
97
98    let cmd_tx_for_reader = cmd_tx.clone();
99    let writer = EndpointWriter {
100        protocol: protocol.clone(),
101        settings: settings.clone(),
102        remote: remote.clone(),
103        remote_uid,
104        seq: SeqNo::default(),
105        send_buf: AckedSendBuffer::new(settings.ack_window),
106        cmd_rx,
107        shutdown: shutdown.clone(),
108    };
109    let reader = EndpointReader {
110        remote: remote.clone(),
111        recv_buf: AckedReceiveBuffer::new(),
112        inbound_sink,
113        pdu_rx,
114        cmd_tx: cmd_tx_for_reader,
115        protocol: protocol.clone(),
116        shutdown: shutdown.clone(),
117    };
118
119    tokio::spawn(writer.run());
120    tokio::spawn(reader.run());
121
122    EndpointHandle { remote, remote_uid, cmd_tx, pdu_tx, shutdown }
123}
124
125struct EndpointWriter {
126    protocol: Arc<AkkaProtocolTransport>,
127    settings: RemoteSettings,
128    remote: Address,
129    #[allow(dead_code)]
130    remote_uid: u64,
131    seq: SeqNo,
132    send_buf: AckedSendBuffer,
133    cmd_rx: mpsc::UnboundedReceiver<EndpointCmd>,
134    shutdown: Arc<Notify>,
135}
136
137impl EndpointWriter {
138    async fn run(mut self) {
139        let mut heartbeat = tokio::time::interval(self.settings.heartbeat_interval);
140        heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
141        let _ = heartbeat.tick().await;
142
143        loop {
144            tokio::select! {
145                _ = self.shutdown.notified() => {
146                    let _ = self.protocol.send_pdu(
147                        &self.remote,
148                        AkkaPdu::Disassociate(DisassociateReason::Normal),
149                    ).await;
150                    break;
151                }
152                cmd = self.cmd_rx.recv() => {
153                    let Some(cmd) = cmd else { break };
154                    match cmd {
155                        EndpointCmd::Send(mut env) | EndpointCmd::SendSystem(mut env) => {
156                            env.seq_no = self.seq.advance();
157                            let _ = self.send_buf.push(env.clone());
158                            if let Err(e) = self
159                                .protocol
160                                .send_pdu(&self.remote, AkkaPdu::Payload(env))
161                                .await
162                            {
163                                tracing::warn!(remote = %self.remote, "send failed: {e}");
164                            }
165                        }
166                        EndpointCmd::ApplyAck(ack) => {
167                            self.send_buf.apply_ack(&ack);
168                        }
169                        EndpointCmd::ResendBuffer => {
170                            let envs = self.send_buf.drain_resend();
171                            for e in envs {
172                                let _ = self
173                                    .protocol
174                                    .send_pdu(&self.remote, AkkaPdu::Payload(e))
175                                    .await;
176                            }
177                        }
178                        EndpointCmd::Shutdown(reason) => {
179                            let _ = self
180                                .protocol
181                                .send_pdu(&self.remote, AkkaPdu::Disassociate(reason))
182                                .await;
183                            break;
184                        }
185                    }
186                }
187                _ = heartbeat.tick() => {
188                    let _ = self
189                        .protocol
190                        .send_pdu(&self.remote, AkkaPdu::Heartbeat)
191                        .await;
192                }
193            }
194        }
195    }
196}
197
198struct EndpointReader {
199    remote: Address,
200    recv_buf: AckedReceiveBuffer,
201    inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
202    pdu_rx: mpsc::UnboundedReceiver<InboundPdu>,
203    cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
204    protocol: Arc<AkkaProtocolTransport>,
205    shutdown: Arc<Notify>,
206}
207
208impl EndpointReader {
209    async fn run(mut self) {
210        let mut ack_tick = tokio::time::interval(Duration::from_millis(200));
211        ack_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
212        let _ = ack_tick.tick().await;
213
214        loop {
215            tokio::select! {
216                _ = self.shutdown.notified() => break,
217                pdu = self.pdu_rx.recv() => {
218                    let Some(pdu) = pdu else { break };
219                    match pdu {
220                        InboundPdu::Payload(env) => {
221                            if self.recv_buf.observe(env.seq_no) {
222                                let _ = self.inbound_sink.send(InboundEnvelope { envelope: env });
223                            }
224                        }
225                        InboundPdu::Ack(ack) => {
226                            let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
227                        }
228                    }
229                }
230                _ = ack_tick.tick() => {
231                    let ack = self.recv_buf.ack();
232                    if ack.cumulative_ack > 0 {
233                        let _ = self
234                            .protocol
235                            .send_pdu(&self.remote, AkkaPdu::Ack(ack))
236                            .await;
237                    }
238                }
239            }
240        }
241    }
242}