Skip to main content

amaru_protocols/
manager.rs

// Copyright 2025 PRAGMA
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::{collections::BTreeMap, net::SocketAddr, sync::Arc, time::Duration};
16
17use amaru_kernel::{EraHistory, NetworkMagic, Peer, Point, Tip};
18use amaru_ouroboros::{ConnectionId, ToSocketAddrs};
19use pure_stage::{DeserializerGuards, Effects, StageRef, register_data_deserializer};
20use tracing::instrument;
21
22use crate::{
23    accept,
24    accept::PullAccept,
25    blockfetch::Blocks,
26    chainsync::ChainSyncInitiatorMsg,
27    connection::{self, ConnectionMessage},
28    network_effects::{Network, NetworkOps},
29    protocol::Role,
30};
31
32#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
33pub enum ManagerMessage {
34    AddPeer(Peer),
35    /// Internal message sent from the connection stage only!
36    ///
37    /// Must contain the connection ID so that we can then close the actual socket;
38    /// the `peers` entry could already have been removed by RemovePeer.
39    // TODO move to separate message type
40    ConnectionDied(Peer, ConnectionId, Role),
41    // TODO move to separate message type
42    Connect(Peer),
43    Accepted(Peer, ConnectionId),
44    RemovePeer(Peer),
45    Listen(SocketAddr),
46    FetchBlocks {
47        peer: Peer,
48        from: Point,
49        through: Point,
50        cr: StageRef<Blocks>,
51    },
52    NewTip(Tip),
53}
54
55impl ManagerMessage {
56    fn message_type(&self) -> &'static str {
57        match self {
58            ManagerMessage::AddPeer(_) => "AddPeer",
59            ManagerMessage::ConnectionDied(..) => "ConnectionDied",
60            ManagerMessage::Connect(_) => "Connect",
61            ManagerMessage::Accepted(..) => "Accepted",
62            ManagerMessage::RemovePeer(_) => "RemovePeer",
63            ManagerMessage::Listen(_) => "Listen",
64            ManagerMessage::FetchBlocks { .. } => "FetchBlocks",
65            ManagerMessage::NewTip(_) => "NewTip",
66        }
67    }
68}
69
70#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
71pub struct Manager {
72    peers: BTreeMap<Peer, ConnectionState>,
73    magic: NetworkMagic,
74    config: ManagerConfig,
75    era_history: Arc<EraHistory>,
76    chain_sync: StageRef<ChainSyncInitiatorMsg>,
77}
78
79#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
80enum ConnectionState {
81    Scheduled,
82    Connected(ConnectionId, StageRef<ConnectionMessage>),
83    // Does not contain the connection ID because that will be received in the ConnectionDied message.
84    Disconnecting,
85}
86
87impl Manager {
88    pub fn new(
89        magic: NetworkMagic,
90        config: ManagerConfig,
91        era_history: Arc<EraHistory>,
92        chain_sync: StageRef<ChainSyncInitiatorMsg>,
93    ) -> Self {
94        Self { peers: BTreeMap::new(), magic, config, era_history, chain_sync }
95    }
96
97    pub fn config(&self) -> ManagerConfig {
98        self.config
99    }
100}
101
102/// Parameters for the Manager: connection timeout, reconnection delay, etc...
103#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
104pub struct ManagerConfig {
105    pub connection_timeout: Duration,
106    pub reconnect_delay: Duration,
107    pub accept_interval: Duration,
108}
109
110impl ManagerConfig {
111    pub fn with_reconnect_delay(mut self, reconnect_delay: Duration) -> Self {
112        self.reconnect_delay = reconnect_delay;
113        self
114    }
115
116    pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> Self {
117        self.connection_timeout = connection_timeout;
118        self
119    }
120
121    pub fn with_accept_interval(mut self, accept_interval: Duration) -> Self {
122        self.accept_interval = accept_interval;
123        self
124    }
125}
126
127impl Default for ManagerConfig {
128    fn default() -> Self {
129        Self {
130            connection_timeout: Duration::from_secs(10),
131            reconnect_delay: Duration::from_secs(2),
132            accept_interval: Duration::from_millis(100),
133        }
134    }
135}
136
137/// The manager stage is responsible for managing the connections to the peers.
138///
139/// The semantics of the operations are as follows:
140/// - AddPeer: add a peer to the manager unless that peer is already added
141/// - RemovePeer: remove a peer from the manager, which will terminate a connection if currently connected
142///
143/// A peer can be added right after being removed even though the socket will be closed asynchronously.
144#[instrument(name = "manager", skip_all, fields(message_type = msg.message_type()))]
145pub async fn stage(mut manager: Manager, msg: ManagerMessage, eff: Effects<ManagerMessage>) -> Manager {
146    match msg {
147        ManagerMessage::AddPeer(peer) => {
148            match manager.peers.get_mut(&peer) {
149                Some(ConnectionState::Connected(..) | ConnectionState::Scheduled) => {
150                    tracing::info!(%peer, "discarding connection request, already connected or scheduled");
151                    return manager;
152                }
153                Some(s @ ConnectionState::Disconnecting) => {
154                    tracing::info!(%peer, "adding peer while still disconnecting");
155                    // old connection stage will report ConnectionDied which will close the socket
156                    *s = ConnectionState::Scheduled;
157                }
158                None => {
159                    tracing::info!(%peer, "adding peer");
160                    manager.peers.insert(peer.clone(), ConnectionState::Scheduled);
161                }
162            }
163            eff.send(eff.me_ref(), ManagerMessage::Connect(peer)).await;
164        }
165        ManagerMessage::Connect(peer) => {
166            // TODO(network) slow connection will block the manager, should delegate to a child stage
167            let entry = match manager.peers.get_mut(&peer) {
168                Some(ConnectionState::Connected(..)) => {
169                    tracing::debug!(%peer, "discarding connection request, already connected");
170                    return manager;
171                }
172                Some(entry @ ConnectionState::Scheduled) => entry,
173                Some(ConnectionState::Disconnecting) => {
174                    tracing::debug!(%peer, "discarding connection request, already disconnecting");
175                    return manager;
176                }
177                None => {
178                    tracing::debug!(%peer, "discarding connection request, not added");
179                    return manager;
180                }
181            };
182            let addr = ToSocketAddrs::String(peer.to_string());
183            let conn_id = match Network::new(&eff).connect(addr, manager.config.connection_timeout).await {
184                Ok(conn_id) => conn_id,
185                Err(err) => {
186                    tracing::error!(?err, %peer, reconnecting_in=?manager.config.reconnect_delay, "failed to connect to peer. Scheduling reconnect");
187                    eff.schedule_after(ManagerMessage::Connect(peer), manager.config.reconnect_delay).await;
188                    assert_eq!(*entry, ConnectionState::Scheduled);
189                    return manager;
190                }
191            };
192            tracing::info!(?conn_id, %peer, "connected to peer");
193            start_connection_stage(&mut manager, &eff, peer, conn_id, Role::Initiator).await;
194        }
195        ManagerMessage::Accepted(peer, conn_id) => {
196            match manager.peers.get(&peer) {
197                Some(ConnectionState::Connected(..)) => {
198                    tracing::debug!(%peer, "already connected. Closing the newly accepted connection");
199                    close_connection(&eff, &peer, conn_id).await;
200                    return manager;
201                }
202                Some(ConnectionState::Disconnecting) => {
203                    tracing::debug!(%peer, "already disconnecting, the previous connection will be closed with ConnectionDied, the newly accepted connection will be closed now");
204                    close_connection(&eff, &peer, conn_id).await;
205                    return manager;
206                }
207                Some(ConnectionState::Scheduled) => {
208                    unreachable!(
209                        "Accepted peers are initiators. They will schedule reconnections on their side so this case cannot happen."
210                    )
211                }
212                None => {}
213            };
214            start_connection_stage(&mut manager, &eff, peer, conn_id, Role::Responder).await;
215        }
216        ManagerMessage::RemovePeer(peer) => {
217            let Some(entry) = manager.peers.get_mut(&peer) else {
218                tracing::info!(%peer, "disconnect request ignored, not connected");
219                return manager;
220            };
221            match entry {
222                ConnectionState::Connected(_conn_id, connection) => {
223                    eff.send(connection, ConnectionMessage::Disconnect).await;
224                    *entry = ConnectionState::Disconnecting;
225                }
226                ConnectionState::Scheduled | ConnectionState::Disconnecting => {
227                    tracing::info!(%peer, "removing currently disconnected peer");
228                    manager.peers.remove(&peer);
229                }
230            }
231        }
232        ManagerMessage::ConnectionDied(peer, conn_id, role) => {
233            close_connection(&eff, &peer, conn_id).await;
234            let Some(peer_state) = manager.peers.get_mut(&peer) else {
235                tracing::debug!(%peer, "connection died, peer already removed");
236                return manager;
237            };
238            match peer_state {
239                ConnectionState::Connected(conn_id_new, ..) if *conn_id_new != conn_id => {
240                    tracing::debug!(%peer, "previously terminated connection closed");
241                }
242                ConnectionState::Connected(..) => {
243                    // Only reconnect on the initiator side
244                    if role == Role::Initiator {
245                        tracing::info!(%peer, reconnecting_in=?manager.config.reconnect_delay, "initiator connection died, scheduling reconnect");
246                        eff.schedule_after(ManagerMessage::Connect(peer), manager.config.reconnect_delay).await;
247                        *peer_state = ConnectionState::Scheduled;
248                    } else {
249                        tracing::info!(%peer, "responder connection died, removing peer");
250                        manager.peers.remove(&peer);
251                    }
252                }
253                ConnectionState::Scheduled => {
254                    tracing::debug!(%peer, "initiator connection died, reconnect already scheduled");
255                }
256                ConnectionState::Disconnecting => {
257                    tracing::debug!(%peer, "peer terminated after removal");
258                    manager.peers.remove(&peer);
259                }
260            }
261        }
262        ManagerMessage::FetchBlocks { peer, from, through, cr } => {
263            tracing::trace!(?from, ?through, %peer, "fetching blocks");
264            if let Some(ConnectionState::Connected(_, connection)) = manager.peers.get(&peer) {
265                eff.send(connection, ConnectionMessage::FetchBlocks { from, through, cr }).await;
266            } else {
267                tracing::error!(%peer, "peer not found");
268                eff.send(&cr, Blocks::default()).await;
269            }
270        }
271        ManagerMessage::Listen(listen_addr) => {
272            let network = Network::new(&eff);
273            // If we cannot listen to this address we terminate the node because this means that
274            // the configuration needs to be reviewed.
275            match network.listen(listen_addr).await {
276                Ok(listen_addr) => {
277                    tracing::info!(%listen_addr, "listening");
278                    let accept_stage = eff.stage("accept", accept::stage).await;
279                    // If the accept stage fails, the tombstone message triggers a restart.
280                    // The listen() call is idempotent and will clean up the old listener.
281                    let accept_stage = eff.supervise(accept_stage, ManagerMessage::Listen(listen_addr));
282                    let accept_stage = eff
283                        .wire_up(accept_stage, accept::AcceptState::new(eff.me(), manager.config(), listen_addr))
284                        .await;
285                    eff.send(&accept_stage, PullAccept).await;
286                }
287                Err(error) => {
288                    tracing::error!(%listen_addr, %error, "cannot listen");
289                    return eff.terminate().await;
290                }
291            }
292        }
293        ManagerMessage::NewTip(tip) => {
294            // forward to all peers
295            for peer in &manager.peers {
296                if let ConnectionState::Connected(_, connection) = peer.1 {
297                    eff.send(connection, ConnectionMessage::NewTip(tip)).await;
298                }
299            }
300        }
301    }
302    manager
303}
304
305/// Close the connection and log any errors.
306async fn close_connection(eff: &Effects<ManagerMessage>, peer: &Peer, conn_id: ConnectionId) {
307    if let Err(err) = Network::new(eff).close(conn_id).await {
308        tracing::error!(?err, %peer, "failed to close connection");
309    }
310}
311
312/// Start a stage to handle the connection lifecycle.
313async fn start_connection_stage(
314    manager: &mut Manager,
315    eff: &Effects<ManagerMessage>,
316    peer: Peer,
317    conn_id: ConnectionId,
318    role: Role,
319) {
320    let connection = eff.stage(format!("{conn_id}-{peer}"), connection::stage).await;
321    let connection = eff.supervise(connection, ManagerMessage::ConnectionDied(peer.clone(), conn_id, role));
322    let connection = eff
323        .wire_up(
324            connection,
325            connection::Connection::new(
326                peer.clone(),
327                conn_id,
328                role,
329                manager.config,
330                manager.magic,
331                manager.chain_sync.clone(),
332                manager.era_history.clone(),
333            ),
334        )
335        .await;
336    eff.send(&connection, ConnectionMessage::Initialize).await;
337    manager.peers.insert(peer, ConnectionState::Connected(conn_id, connection));
338}
339
340pub fn register_deserializers() -> DeserializerGuards {
341    vec![register_data_deserializer::<Manager>().boxed(), register_data_deserializer::<ManagerMessage>().boxed()]
342}