1use std::{
16 fmt::{Display, Formatter},
17 io::ErrorKind,
18 net::SocketAddr,
19 num::NonZeroUsize,
20 time::Duration,
21};
22
23use amaru_kernel::{NonEmptyBytes, Peer};
24use amaru_ouroboros::{ConnectionId, ConnectionsResource, ToSocketAddrs};
25use pure_stage::{BoxFuture, Effects, ExternalEffect, ExternalEffectAPI, Resources, SendData};
26
27pub fn register_deserializers() -> pure_stage::DeserializerGuards {
28 vec![
29 pure_stage::register_data_deserializer::<ListenEffect>().boxed(),
30 pure_stage::register_data_deserializer::<AcceptEffect>().boxed(),
31 pure_stage::register_data_deserializer::<ConnectEffect>().boxed(),
32 pure_stage::register_data_deserializer::<SendEffect>().boxed(),
33 pure_stage::register_data_deserializer::<RecvEffect>().boxed(),
34 pure_stage::register_data_deserializer::<CloseEffect>().boxed(),
35 ]
36}
37
38pub trait NetworkOps {
39 fn listen(&self, addr: SocketAddr) -> BoxFuture<'static, Result<SocketAddr, ListenError>>;
40
41 fn accept(&self, listener_addr: SocketAddr) -> BoxFuture<'static, Result<(Peer, ConnectionId), AcceptError>>;
42
43 fn connect(&self, addr: ToSocketAddrs, timeout: Duration)
44 -> BoxFuture<'static, Result<ConnectionId, ConnectError>>;
45
46 fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, Result<(), SendError>>;
47
48 fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, Result<NonEmptyBytes, ReceiveError>>;
49
50 fn close(&self, conn: ConnectionId) -> BoxFuture<'static, Result<(), CloseError>>;
51}
52
53pub struct Network<'a, T>(&'a Effects<T>);
54
55impl<'a, T> Network<'a, T> {
56 pub fn new(eff: &'a Effects<T>) -> Self {
57 Network(eff)
58 }
59}
60
61impl<T> NetworkOps for Network<'_, T> {
62 fn listen(&self, addr: SocketAddr) -> BoxFuture<'static, Result<SocketAddr, ListenError>> {
63 self.0.external(ListenEffect { addr })
64 }
65
66 fn accept(&self, listener_addr: SocketAddr) -> BoxFuture<'static, Result<(Peer, ConnectionId), AcceptError>> {
67 self.0.external(AcceptEffect { listener_addr })
68 }
69
70 fn connect(
71 &self,
72 addr: ToSocketAddrs,
73 timeout: Duration,
74 ) -> BoxFuture<'static, Result<ConnectionId, ConnectError>> {
75 self.0.external(ConnectEffect { addr, timeout })
76 }
77
78 fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, Result<(), SendError>> {
79 self.0.external(SendEffect { conn, data })
80 }
81
82 fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, Result<NonEmptyBytes, ReceiveError>> {
83 self.0.external(RecvEffect { conn, bytes })
84 }
85
86 fn close(&self, conn: ConnectionId) -> BoxFuture<'static, Result<(), CloseError>> {
87 self.0.external(CloseEffect { conn })
88 }
89}
90
91#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
92pub struct ListenEffect {
93 pub addr: SocketAddr,
94}
95
96impl ExternalEffect for ListenEffect {
97 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
98 Self::wrap(async move {
99 #[expect(clippy::expect_used)]
100 let resource =
101 resources.get::<ConnectionsResource>().expect("ListenEffect requires a ConnectionsResource").clone();
102 resource.listen(self.addr).await.map_err(|e| ListenError(format!("{e}")))
103 })
104 }
105}
106
107impl ExternalEffectAPI for ListenEffect {
108 type Response = Result<SocketAddr, ListenError>;
109}
110
111#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
112pub struct ListenError(String);
113
114impl Display for ListenError {
115 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116 let ListenError(error) = self;
117 write!(f, "ListenError: {error}")
118 }
119}
120
121#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
122pub struct AcceptEffect {
123 pub listener_addr: SocketAddr,
124}
125
126impl ExternalEffect for AcceptEffect {
127 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
128 Self::wrap(async move {
129 #[expect(clippy::expect_used)]
130 let resource =
131 resources.get::<ConnectionsResource>().expect("AcceptEffect requires a ConnectionsResource").clone();
132 #[expect(clippy::wildcard_enum_match_arm)]
133 resource.accept(self.listener_addr).await.map_err(|e| match e.kind() {
134 ErrorKind::ConnectionAborted => AcceptError::ConnectionAborted,
135 other => AcceptError::Other(format!("{other}")),
136 })
137 })
138 }
139}
140
141impl ExternalEffectAPI for AcceptEffect {
142 type Response = Result<(Peer, ConnectionId), AcceptError>;
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
146pub enum AcceptError {
147 ConnectionAborted,
148 Other(String),
149}
150
151impl Display for AcceptError {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 match self {
154 AcceptError::ConnectionAborted => {
155 write!(f, "AcceptError: connection aborted")
156 }
157 AcceptError::Other(e) => write!(f, "AcceptError: {e}"),
158 }
159 }
160}
161#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
162pub struct ConnectEffect {
163 pub addr: ToSocketAddrs,
164 pub timeout: Duration,
165}
166
167impl ExternalEffect for ConnectEffect {
168 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
169 Self::wrap(async move {
170 #[expect(clippy::expect_used)]
171 let resource =
172 resources.get::<ConnectionsResource>().expect("ConnectEffect requires a ConnectionsResource").clone();
173 resource
174 .connect_addrs(self.addr.clone(), self.timeout)
175 .await
176 .map_err(|e| ConnectError { addr: self.addr, error: format!("{e}") })
177 })
178 }
179}
180
181impl ExternalEffectAPI for ConnectEffect {
182 type Response = Result<ConnectionId, ConnectError>;
183}
184
185#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
186pub struct ConnectError {
187 addr: ToSocketAddrs,
188 error: String,
189}
190
191impl Display for ConnectError {
192 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
193 let ConnectError { addr, error } = self;
194 write!(f, "ConnectError on {addr:?}: {error}")
195 }
196}
197
198#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
199pub struct SendEffect {
200 pub conn: ConnectionId,
201 pub data: NonEmptyBytes,
202}
203
204impl ExternalEffect for SendEffect {
205 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
206 Self::wrap(async move {
207 #[expect(clippy::expect_used)]
208 let resource =
209 resources.get::<ConnectionsResource>().expect("SendEffect requires a ConnectionsResource").clone();
210 resource.send(self.conn, self.data).await.map_err(|e| SendError { conn: self.conn, error: format!("{e}") })
211 })
212 }
213}
214
215impl ExternalEffectAPI for SendEffect {
216 type Response = Result<(), SendError>;
217}
218
219#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
220pub struct SendError {
221 conn: ConnectionId,
222 error: String,
223}
224
225impl Display for SendError {
226 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
227 let SendError { conn, error } = self;
228 write!(f, "SendError on {conn:?}: {error}")
229 }
230}
231
232#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
233pub struct RecvEffect {
234 pub conn: ConnectionId,
235 pub bytes: NonZeroUsize,
236}
237
238impl ExternalEffect for RecvEffect {
239 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
240 Self::wrap(async move {
241 #[expect(clippy::expect_used)]
242 let resource =
243 resources.get::<ConnectionsResource>().expect("RecvEffect requires a ConnectionsResource").clone();
244 resource
245 .recv(self.conn, self.bytes)
246 .await
247 .map_err(|e| ReceiveError { conn: self.conn, error: format!("{e}") })
248 })
249 }
250}
251
252impl ExternalEffectAPI for RecvEffect {
253 type Response = Result<NonEmptyBytes, ReceiveError>;
254}
255
256#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
257pub struct ReceiveError {
258 conn: ConnectionId,
259 error: String,
260}
261
262impl Display for ReceiveError {
263 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
264 let ReceiveError { conn, error } = self;
265 write!(f, "ReceiveError on {conn:?}: {error}")
266 }
267}
268
269#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
270pub struct CloseEffect {
271 pub conn: ConnectionId,
272}
273
274impl ExternalEffect for CloseEffect {
275 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
276 Self::wrap(async move {
277 #[expect(clippy::expect_used)]
278 let resource =
279 resources.get::<ConnectionsResource>().expect("CloseEffect requires a ConnectionsResource").clone();
280 resource.close(self.conn).await.map_err(|e| CloseError { conn: self.conn, error: format!("{e}") })
281 })
282 }
283}
284
285impl ExternalEffectAPI for CloseEffect {
286 type Response = Result<(), CloseError>;
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
290pub struct CloseError {
291 conn: ConnectionId,
292 error: String,
293}
294
295impl Display for CloseError {
296 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
297 let CloseError { conn, error } = self;
298 write!(f, "CloseError on {conn:?}: {error}")
299 }
300}
301
/// Test helper: open a connection through `conn` to the peer named by the `PEER`
/// environment variable, falling back to `127.0.0.1:3000` when it is not set or
/// cannot be read.
///
/// Address resolution plus connecting is bounded by a 5-second overall timeout,
/// and the connect call itself is given a 5-second timeout as well.
///
/// # Errors
///
/// Returns an error if the overall timeout elapses, if resolution fails, or if
/// the connection attempt fails.
#[cfg(test)]
pub async fn create_connection(conn: &dyn amaru_ouroboros::ConnectionProvider) -> anyhow::Result<ConnectionId> {
    use amaru_network::socket_addr::resolve;

    let deadline = Duration::from_secs(5);
    tokio::time::timeout(deadline, async {
        let peer = std::env::var("PEER").unwrap_or_else(|_| "127.0.0.1:3000".to_string());
        let resolved = resolve(ToSocketAddrs::String(peer)).await?;
        Ok(conn.connect(resolved, deadline).await?)
    })
    .await?
}