atomr_remote/endpoint/
mod.rs1use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::{mpsc, Notify};
14
15use atomr_core::actor::Address;
16
17use crate::acked_delivery::{AckedReceiveBuffer, AckedSendBuffer, SeqNo};
18use crate::envelope::RemoteEnvelope;
19use crate::pdu::{AkkaPdu, DisassociateReason};
20use crate::settings::RemoteSettings;
21use crate::transport::AkkaProtocolTransport;
22
23#[derive(Debug)]
27pub struct InboundEnvelope {
28 pub envelope: RemoteEnvelope,
29}
30
31#[derive(Debug)]
33pub enum EndpointCmd {
34 Send(RemoteEnvelope),
35 SendSystem(RemoteEnvelope),
36 ResendBuffer,
38 ApplyAck(crate::pdu::AckInfo),
40 Shutdown(DisassociateReason),
42}
43
44#[derive(Debug)]
46pub enum InboundPdu {
47 Payload(RemoteEnvelope),
48 Ack(crate::pdu::AckInfo),
49}
50
51#[derive(Clone)]
52pub struct EndpointHandle {
53 pub remote: Address,
54 pub remote_uid: u64,
55 cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
56 pdu_tx: mpsc::UnboundedSender<InboundPdu>,
57 shutdown: Arc<Notify>,
58}
59
60impl EndpointHandle {
61 pub fn send(&self, env: RemoteEnvelope) {
62 let _ = self.cmd_tx.send(EndpointCmd::Send(env));
63 }
64 pub fn send_system(&self, env: RemoteEnvelope) {
65 let _ = self.cmd_tx.send(EndpointCmd::SendSystem(env));
66 }
67 pub fn resend(&self) {
68 let _ = self.cmd_tx.send(EndpointCmd::ResendBuffer);
69 }
70 pub fn apply_ack(&self, ack: crate::pdu::AckInfo) {
71 let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
72 }
73 pub fn deliver(&self, pdu: InboundPdu) {
75 let _ = self.pdu_tx.send(pdu);
76 }
77 pub fn shutdown(&self, reason: DisassociateReason) {
78 let _ = self.cmd_tx.send(EndpointCmd::Shutdown(reason));
79 self.shutdown.notify_waiters();
80 }
81}
82
83pub fn spawn_endpoint(
86 protocol: Arc<AkkaProtocolTransport>,
87 settings: RemoteSettings,
88 remote: Address,
89 remote_uid: u64,
90 inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
91) -> EndpointHandle {
92 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<EndpointCmd>();
93 let (pdu_tx, pdu_rx) = mpsc::unbounded_channel::<InboundPdu>();
94 let shutdown = Arc::new(Notify::new());
95
96 let cmd_tx_for_reader = cmd_tx.clone();
97 let writer = EndpointWriter {
98 protocol: protocol.clone(),
99 settings: settings.clone(),
100 remote: remote.clone(),
101 remote_uid,
102 seq: SeqNo::default(),
103 send_buf: AckedSendBuffer::new(settings.ack_window),
104 cmd_rx,
105 shutdown: shutdown.clone(),
106 };
107 let reader = EndpointReader {
108 remote: remote.clone(),
109 recv_buf: AckedReceiveBuffer::new(),
110 inbound_sink,
111 pdu_rx,
112 cmd_tx: cmd_tx_for_reader,
113 protocol: protocol.clone(),
114 shutdown: shutdown.clone(),
115 };
116
117 tokio::spawn(writer.run());
118 tokio::spawn(reader.run());
119
120 EndpointHandle { remote, remote_uid, cmd_tx, pdu_tx, shutdown }
121}
122
123struct EndpointWriter {
124 protocol: Arc<AkkaProtocolTransport>,
125 settings: RemoteSettings,
126 remote: Address,
127 #[allow(dead_code)]
128 remote_uid: u64,
129 seq: SeqNo,
130 send_buf: AckedSendBuffer,
131 cmd_rx: mpsc::UnboundedReceiver<EndpointCmd>,
132 shutdown: Arc<Notify>,
133}
134
135impl EndpointWriter {
136 async fn run(mut self) {
137 let mut heartbeat = tokio::time::interval(self.settings.heartbeat_interval);
138 heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
139 let _ = heartbeat.tick().await;
140
141 loop {
142 tokio::select! {
143 _ = self.shutdown.notified() => {
144 let _ = self.protocol.send_pdu(
145 &self.remote,
146 AkkaPdu::Disassociate(DisassociateReason::Normal),
147 ).await;
148 break;
149 }
150 cmd = self.cmd_rx.recv() => {
151 let Some(cmd) = cmd else { break };
152 match cmd {
153 EndpointCmd::Send(mut env) | EndpointCmd::SendSystem(mut env) => {
154 env.seq_no = self.seq.advance();
155 let _ = self.send_buf.push(env.clone());
156 if let Err(e) = self
157 .protocol
158 .send_pdu(&self.remote, AkkaPdu::Payload(env))
159 .await
160 {
161 tracing::warn!(remote = %self.remote, "send failed: {e}");
162 }
163 }
164 EndpointCmd::ApplyAck(ack) => {
165 self.send_buf.apply_ack(&ack);
166 }
167 EndpointCmd::ResendBuffer => {
168 let envs = self.send_buf.drain_resend();
169 for e in envs {
170 let _ = self
171 .protocol
172 .send_pdu(&self.remote, AkkaPdu::Payload(e))
173 .await;
174 }
175 }
176 EndpointCmd::Shutdown(reason) => {
177 let _ = self
178 .protocol
179 .send_pdu(&self.remote, AkkaPdu::Disassociate(reason))
180 .await;
181 break;
182 }
183 }
184 }
185 _ = heartbeat.tick() => {
186 let _ = self
187 .protocol
188 .send_pdu(&self.remote, AkkaPdu::Heartbeat)
189 .await;
190 }
191 }
192 }
193 }
194}
195
196struct EndpointReader {
197 remote: Address,
198 recv_buf: AckedReceiveBuffer,
199 inbound_sink: mpsc::UnboundedSender<InboundEnvelope>,
200 pdu_rx: mpsc::UnboundedReceiver<InboundPdu>,
201 cmd_tx: mpsc::UnboundedSender<EndpointCmd>,
202 protocol: Arc<AkkaProtocolTransport>,
203 shutdown: Arc<Notify>,
204}
205
206impl EndpointReader {
207 async fn run(mut self) {
208 let mut ack_tick = tokio::time::interval(Duration::from_millis(200));
209 ack_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
210 let _ = ack_tick.tick().await;
211
212 loop {
213 tokio::select! {
214 _ = self.shutdown.notified() => break,
215 pdu = self.pdu_rx.recv() => {
216 let Some(pdu) = pdu else { break };
217 match pdu {
218 InboundPdu::Payload(env) => {
219 if self.recv_buf.observe(env.seq_no) {
220 let _ = self.inbound_sink.send(InboundEnvelope { envelope: env });
221 }
222 }
223 InboundPdu::Ack(ack) => {
224 let _ = self.cmd_tx.send(EndpointCmd::ApplyAck(ack));
225 }
226 }
227 }
228 _ = ack_tick.tick() => {
229 let ack = self.recv_buf.ack();
230 if ack.cumulative_ack > 0 {
231 let _ = self
232 .protocol
233 .send_pdu(&self.remote, AkkaPdu::Ack(ack))
234 .await;
235 }
236 }
237 }
238 }
239 }
240}