Skip to main content

amaru_protocols/
network_effects.rs

// Copyright 2025 PRAGMA
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::{
16    fmt::{Display, Formatter},
17    io::ErrorKind,
18    net::SocketAddr,
19    num::NonZeroUsize,
20    time::Duration,
21};
22
23use amaru_kernel::{NonEmptyBytes, Peer};
24use amaru_ouroboros::{ConnectionId, ConnectionsResource, ToSocketAddrs};
25use pure_stage::{BoxFuture, Effects, ExternalEffect, ExternalEffectAPI, Resources, SendData};
26
27pub fn register_deserializers() -> pure_stage::DeserializerGuards {
28    vec![
29        pure_stage::register_data_deserializer::<ListenEffect>().boxed(),
30        pure_stage::register_data_deserializer::<AcceptEffect>().boxed(),
31        pure_stage::register_data_deserializer::<ConnectEffect>().boxed(),
32        pure_stage::register_data_deserializer::<SendEffect>().boxed(),
33        pure_stage::register_data_deserializer::<RecvEffect>().boxed(),
34        pure_stage::register_data_deserializer::<CloseEffect>().boxed(),
35    ]
36}
37
38pub trait NetworkOps {
39    fn listen(&self, addr: SocketAddr) -> BoxFuture<'static, Result<SocketAddr, ListenError>>;
40
41    fn accept(&self, listener_addr: SocketAddr) -> BoxFuture<'static, Result<(Peer, ConnectionId), AcceptError>>;
42
43    fn connect(&self, addr: ToSocketAddrs, timeout: Duration)
44    -> BoxFuture<'static, Result<ConnectionId, ConnectError>>;
45
46    fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, Result<(), SendError>>;
47
48    fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, Result<NonEmptyBytes, ReceiveError>>;
49
50    fn close(&self, conn: ConnectionId) -> BoxFuture<'static, Result<(), CloseError>>;
51}
52
53pub struct Network<'a, T>(&'a Effects<T>);
54
55impl<'a, T> Network<'a, T> {
56    pub fn new(eff: &'a Effects<T>) -> Self {
57        Network(eff)
58    }
59}
60
61impl<T> NetworkOps for Network<'_, T> {
62    fn listen(&self, addr: SocketAddr) -> BoxFuture<'static, Result<SocketAddr, ListenError>> {
63        self.0.external(ListenEffect { addr })
64    }
65
66    fn accept(&self, listener_addr: SocketAddr) -> BoxFuture<'static, Result<(Peer, ConnectionId), AcceptError>> {
67        self.0.external(AcceptEffect { listener_addr })
68    }
69
70    fn connect(
71        &self,
72        addr: ToSocketAddrs,
73        timeout: Duration,
74    ) -> BoxFuture<'static, Result<ConnectionId, ConnectError>> {
75        self.0.external(ConnectEffect { addr, timeout })
76    }
77
78    fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, Result<(), SendError>> {
79        self.0.external(SendEffect { conn, data })
80    }
81
82    fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, Result<NonEmptyBytes, ReceiveError>> {
83        self.0.external(RecvEffect { conn, bytes })
84    }
85
86    fn close(&self, conn: ConnectionId) -> BoxFuture<'static, Result<(), CloseError>> {
87        self.0.external(CloseEffect { conn })
88    }
89}
90
91#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
92pub struct ListenEffect {
93    pub addr: SocketAddr,
94}
95
96impl ExternalEffect for ListenEffect {
97    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
98        Self::wrap(async move {
99            #[expect(clippy::expect_used)]
100            let resource =
101                resources.get::<ConnectionsResource>().expect("ListenEffect requires a ConnectionsResource").clone();
102            resource.listen(self.addr).await.map_err(|e| ListenError(format!("{e}")))
103        })
104    }
105}
106
107impl ExternalEffectAPI for ListenEffect {
108    type Response = Result<SocketAddr, ListenError>;
109}
110
111#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
112pub struct ListenError(String);
113
114impl Display for ListenError {
115    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116        let ListenError(error) = self;
117        write!(f, "ListenError: {error}")
118    }
119}
120
121#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
122pub struct AcceptEffect {
123    pub listener_addr: SocketAddr,
124}
125
126impl ExternalEffect for AcceptEffect {
127    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
128        Self::wrap(async move {
129            #[expect(clippy::expect_used)]
130            let resource =
131                resources.get::<ConnectionsResource>().expect("AcceptEffect requires a ConnectionsResource").clone();
132            #[expect(clippy::wildcard_enum_match_arm)]
133            resource.accept(self.listener_addr).await.map_err(|e| match e.kind() {
134                ErrorKind::ConnectionAborted => AcceptError::ConnectionAborted,
135                other => AcceptError::Other(format!("{other}")),
136            })
137        })
138    }
139}
140
141impl ExternalEffectAPI for AcceptEffect {
142    type Response = Result<(Peer, ConnectionId), AcceptError>;
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
146pub enum AcceptError {
147    ConnectionAborted,
148    Other(String),
149}
150
151impl Display for AcceptError {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        match self {
154            AcceptError::ConnectionAborted => {
155                write!(f, "AcceptError: connection aborted")
156            }
157            AcceptError::Other(e) => write!(f, "AcceptError: {e}"),
158        }
159    }
160}
161#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
162pub struct ConnectEffect {
163    pub addr: ToSocketAddrs,
164    pub timeout: Duration,
165}
166
167impl ExternalEffect for ConnectEffect {
168    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
169        Self::wrap(async move {
170            #[expect(clippy::expect_used)]
171            let resource =
172                resources.get::<ConnectionsResource>().expect("ConnectEffect requires a ConnectionsResource").clone();
173            resource
174                .connect_addrs(self.addr.clone(), self.timeout)
175                .await
176                .map_err(|e| ConnectError { addr: self.addr, error: format!("{e}") })
177        })
178    }
179}
180
181impl ExternalEffectAPI for ConnectEffect {
182    type Response = Result<ConnectionId, ConnectError>;
183}
184
185#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
186pub struct ConnectError {
187    addr: ToSocketAddrs,
188    error: String,
189}
190
191impl Display for ConnectError {
192    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
193        let ConnectError { addr, error } = self;
194        write!(f, "ConnectError on {addr:?}: {error}")
195    }
196}
197
198#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
199pub struct SendEffect {
200    pub conn: ConnectionId,
201    pub data: NonEmptyBytes,
202}
203
204impl ExternalEffect for SendEffect {
205    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
206        Self::wrap(async move {
207            #[expect(clippy::expect_used)]
208            let resource =
209                resources.get::<ConnectionsResource>().expect("SendEffect requires a ConnectionsResource").clone();
210            resource.send(self.conn, self.data).await.map_err(|e| SendError { conn: self.conn, error: format!("{e}") })
211        })
212    }
213}
214
215impl ExternalEffectAPI for SendEffect {
216    type Response = Result<(), SendError>;
217}
218
219#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
220pub struct SendError {
221    conn: ConnectionId,
222    error: String,
223}
224
225impl Display for SendError {
226    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
227        let SendError { conn, error } = self;
228        write!(f, "SendError on {conn:?}: {error}")
229    }
230}
231
232#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
233pub struct RecvEffect {
234    pub conn: ConnectionId,
235    pub bytes: NonZeroUsize,
236}
237
238impl ExternalEffect for RecvEffect {
239    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
240        Self::wrap(async move {
241            #[expect(clippy::expect_used)]
242            let resource =
243                resources.get::<ConnectionsResource>().expect("RecvEffect requires a ConnectionsResource").clone();
244            resource
245                .recv(self.conn, self.bytes)
246                .await
247                .map_err(|e| ReceiveError { conn: self.conn, error: format!("{e}") })
248        })
249    }
250}
251
252impl ExternalEffectAPI for RecvEffect {
253    type Response = Result<NonEmptyBytes, ReceiveError>;
254}
255
256#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
257pub struct ReceiveError {
258    conn: ConnectionId,
259    error: String,
260}
261
262impl Display for ReceiveError {
263    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
264        let ReceiveError { conn, error } = self;
265        write!(f, "ReceiveError on {conn:?}: {error}")
266    }
267}
268
269#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
270pub struct CloseEffect {
271    pub conn: ConnectionId,
272}
273
274impl ExternalEffect for CloseEffect {
275    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
276        Self::wrap(async move {
277            #[expect(clippy::expect_used)]
278            let resource =
279                resources.get::<ConnectionsResource>().expect("CloseEffect requires a ConnectionsResource").clone();
280            resource.close(self.conn).await.map_err(|e| CloseError { conn: self.conn, error: format!("{e}") })
281        })
282    }
283}
284
285impl ExternalEffectAPI for CloseEffect {
286    type Response = Result<(), CloseError>;
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
290pub struct CloseError {
291    conn: ConnectionId,
292    error: String,
293}
294
295impl Display for CloseError {
296    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
297        let CloseError { conn, error } = self;
298        write!(f, "CloseError on {conn:?}: {error}")
299    }
300}
301
/// Test helper: connect to an upstream node.
///
/// The target is read from the `PEER` environment variable and falls back to
/// `127.0.0.1:3000` when the variable cannot be read. Address resolution and the
/// connection attempt together are bounded by a 5 second timeout; the same duration
/// is also passed to `conn.connect` as its own timeout.
///
/// # Errors
///
/// Fails if the overall timeout elapses, if the address cannot be resolved, or if the
/// connection attempt itself fails.
#[cfg(test)]
pub async fn create_connection(conn: &dyn amaru_ouroboros::ConnectionProvider) -> anyhow::Result<ConnectionId> {
    use amaru_network::socket_addr::resolve;

    let limit = Duration::from_secs(5);
    tokio::time::timeout(limit, async {
        let target = std::env::var("PEER").unwrap_or_else(|_| "127.0.0.1:3000".to_string());
        let resolved = resolve(ToSocketAddrs::String(target)).await?;
        let connection = conn.connect(resolved, limit).await?;
        Ok(connection)
    })
    .await?
}