atomr_remote/endpoint/
mod.rs1use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::sync::{mpsc, Notify};
16
17use atomr_core::actor::Address;
18
19use crate::acked_delivery::{AckedReceiveBuffer, AckedSendBuffer, SeqNo};
20use crate::envelope::RemoteEnvelope;
21use crate::pdu::{AkkaPdu, DisassociateReason};
22use crate::settings::RemoteSettings;
23use crate::transport::AkkaProtocolTransport;
24
25#[derive(Debug)]
29pub struct InboundEnvelope {
30 pub envelope: RemoteEnvelope,
31}
32
33#[derive(Debug)]
35pub enum EndpointCmd {
36 Send(RemoteEnvelope),
37 SendSystem(RemoteEnvelope),
38 ResendBuffer,
40 ApplyAck(crate::pdu::AckInfo),
42 Shutdown(DisassociateReason),
44}
45
46#[derive(Debug)]
48pub enum InboundPdu {
49 Payload(RemoteEnvelope),
50 Ack(crate::pdu::AckInfo),
51}
52
53#[derive(Clone)]
54pub struct EndpointHandle {
55 pub remote: Address,
56 pub remote_uid: u64,
57 cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
58 pdu_tx: mpsc::UnboundedSender<InboundPdu>,
59 shutdown: Arc<Notify>,
60}
61
62impl EndpointHandle {
63 pub fn send(&self, env: RemoteEnvelope) {
64 let _ = self.cmd_tx.send(EndpointCmd::Send(env));
65 }
66 pub fn send_system(&self, env: RemoteEnvelope) {
67 let _ = self.cmd_tx.send(EndpointCmd::SendSystem(env));
68 }
69 pub fn resend(&self) {
70 let _ = self.cmd_tx.send(EndpointCmd::ResendBuffer);
71 }
72 pub fn apply_ack(&self, ack: crate::pdu::AckInfo) {
73 let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
74 }
75 pub fn deliver(&self, pdu: InboundPdu) {
77 let _ = self.pdu_tx.send(pdu);
78 }
79 pub fn shutdown(&self, reason: DisassociateReason) {
80 let _ = self.cmd_tx.send(EndpointCmd::Shutdown(reason));
81 self.shutdown.notify_waiters();
82 }
83}
84
85pub fn spawn_endpoint(
88 protocol: Arc<AkkaProtocolTransport>,
89 settings: RemoteSettings,
90 remote: Address,
91 remote_uid: u64,
92 inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
93) -> EndpointHandle {
94 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<EndpointCmd>();
95 let (pdu_tx, pdu_rx) = mpsc::unbounded_channel::<InboundPdu>();
96 let shutdown = Arc::new(Notify::new());
97
98 let cmd_tx_for_reader = cmd_tx.clone();
99 let writer = EndpointWriter {
100 protocol: protocol.clone(),
101 settings: settings.clone(),
102 remote: remote.clone(),
103 remote_uid,
104 seq: SeqNo::default(),
105 send_buf: AckedSendBuffer::new(settings.ack_window),
106 cmd_rx,
107 shutdown: shutdown.clone(),
108 };
109 let reader = EndpointReader {
110 remote: remote.clone(),
111 recv_buf: AckedReceiveBuffer::new(),
112 inbound_sink,
113 pdu_rx,
114 cmd_tx: cmd_tx_for_reader,
115 protocol: protocol.clone(),
116 shutdown: shutdown.clone(),
117 };
118
119 tokio::spawn(writer.run());
120 tokio::spawn(reader.run());
121
122 EndpointHandle { remote, remote_uid, cmd_tx, pdu_tx, shutdown }
123}
124
125struct EndpointWriter {
126 protocol: Arc<AkkaProtocolTransport>,
127 settings: RemoteSettings,
128 remote: Address,
129 #[allow(dead_code)]
130 remote_uid: u64,
131 seq: SeqNo,
132 send_buf: AckedSendBuffer,
133 cmd_rx: mpsc::UnboundedReceiver<EndpointCmd>,
134 shutdown: Arc<Notify>,
135}
136
137impl EndpointWriter {
138 async fn run(mut self) {
139 let mut heartbeat = tokio::time::interval(self.settings.heartbeat_interval);
140 heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
141 let _ = heartbeat.tick().await;
142
143 loop {
144 tokio::select! {
145 _ = self.shutdown.notified() => {
146 let _ = self.protocol.send_pdu(
147 &self.remote,
148 AkkaPdu::Disassociate(DisassociateReason::Normal),
149 ).await;
150 break;
151 }
152 cmd = self.cmd_rx.recv() => {
153 let Some(cmd) = cmd else { break };
154 match cmd {
155 EndpointCmd::Send(mut env) | EndpointCmd::SendSystem(mut env) => {
156 env.seq_no = self.seq.advance();
157 let _ = self.send_buf.push(env.clone());
158 if let Err(e) = self
159 .protocol
160 .send_pdu(&self.remote, AkkaPdu::Payload(env))
161 .await
162 {
163 tracing::warn!(remote = %self.remote, "send failed: {e}");
164 }
165 }
166 EndpointCmd::ApplyAck(ack) => {
167 self.send_buf.apply_ack(&ack);
168 }
169 EndpointCmd::ResendBuffer => {
170 let envs = self.send_buf.drain_resend();
171 for e in envs {
172 let _ = self
173 .protocol
174 .send_pdu(&self.remote, AkkaPdu::Payload(e))
175 .await;
176 }
177 }
178 EndpointCmd::Shutdown(reason) => {
179 let _ = self
180 .protocol
181 .send_pdu(&self.remote, AkkaPdu::Disassociate(reason))
182 .await;
183 break;
184 }
185 }
186 }
187 _ = heartbeat.tick() => {
188 let _ = self
189 .protocol
190 .send_pdu(&self.remote, AkkaPdu::Heartbeat)
191 .await;
192 }
193 }
194 }
195 }
196}
197
198struct EndpointReader {
199 remote: Address,
200 recv_buf: AckedReceiveBuffer,
201 inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
202 pdu_rx: mpsc::UnboundedReceiver<InboundPdu>,
203 cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
204 protocol: Arc<AkkaProtocolTransport>,
205 shutdown: Arc<Notify>,
206}
207
208impl EndpointReader {
209 async fn run(mut self) {
210 let mut ack_tick = tokio::time::interval(Duration::from_millis(200));
211 ack_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
212 let _ = ack_tick.tick().await;
213
214 loop {
215 tokio::select! {
216 _ = self.shutdown.notified() => break,
217 pdu = self.pdu_rx.recv() => {
218 let Some(pdu) = pdu else { break };
219 match pdu {
220 InboundPdu::Payload(env) => {
221 if self.recv_buf.observe(env.seq_no) {
222 let _ = self.inbound_sink.send(InboundEnvelope { envelope: env });
223 }
224 }
225 InboundPdu::Ack(ack) => {
226 let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
227 }
228 }
229 }
230 _ = ack_tick.tick() => {
231 let ack = self.recv_buf.ack();
232 if ack.cumulative_ack > 0 {
233 let _ = self
234 .protocol
235 .send_pdu(&self.remote, AkkaPdu::Ack(ack))
236 .await;
237 }
238 }
239 }
240 }
241 }
242}