1use std::{collections::BTreeMap, net::SocketAddr, sync::Arc, time::Duration};
16
17use amaru_kernel::{EraHistory, NetworkMagic, Peer, Point, Tip};
18use amaru_ouroboros::{ConnectionId, ToSocketAddrs};
19use pure_stage::{DeserializerGuards, Effects, StageRef, register_data_deserializer};
20use tracing::instrument;
21
22use crate::{
23 accept,
24 accept::PullAccept,
25 blockfetch::Blocks,
26 chainsync::ChainSyncInitiatorMsg,
27 connection::{self, ConnectionMessage},
28 network_effects::{Network, NetworkOps},
29 protocol::Role,
30};
31
32#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
33pub enum ManagerMessage {
34 AddPeer(Peer),
35 ConnectionDied(Peer, ConnectionId, Role),
41 Connect(Peer),
43 Accepted(Peer, ConnectionId),
44 RemovePeer(Peer),
45 Listen(SocketAddr),
46 FetchBlocks {
47 peer: Peer,
48 from: Point,
49 through: Point,
50 cr: StageRef<Blocks>,
51 },
52 NewTip(Tip),
53}
54
55impl ManagerMessage {
56 fn message_type(&self) -> &'static str {
57 match self {
58 ManagerMessage::AddPeer(_) => "AddPeer",
59 ManagerMessage::ConnectionDied(..) => "ConnectionDied",
60 ManagerMessage::Connect(_) => "Connect",
61 ManagerMessage::Accepted(..) => "Accepted",
62 ManagerMessage::RemovePeer(_) => "RemovePeer",
63 ManagerMessage::Listen(_) => "Listen",
64 ManagerMessage::FetchBlocks { .. } => "FetchBlocks",
65 ManagerMessage::NewTip(_) => "NewTip",
66 }
67 }
68}
69
70#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
71pub struct Manager {
72 peers: BTreeMap<Peer, ConnectionState>,
73 magic: NetworkMagic,
74 config: ManagerConfig,
75 era_history: Arc<EraHistory>,
76 chain_sync: StageRef<ChainSyncInitiatorMsg>,
77}
78
79#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
80enum ConnectionState {
81 Scheduled,
82 Connected(ConnectionId, StageRef<ConnectionMessage>),
83 Disconnecting,
85}
86
87impl Manager {
88 pub fn new(
89 magic: NetworkMagic,
90 config: ManagerConfig,
91 era_history: Arc<EraHistory>,
92 chain_sync: StageRef<ChainSyncInitiatorMsg>,
93 ) -> Self {
94 Self { peers: BTreeMap::new(), magic, config, era_history, chain_sync }
95 }
96
97 pub fn config(&self) -> ManagerConfig {
98 self.config
99 }
100}
101
102#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
104pub struct ManagerConfig {
105 pub connection_timeout: Duration,
106 pub reconnect_delay: Duration,
107 pub accept_interval: Duration,
108}
109
110impl ManagerConfig {
111 pub fn with_reconnect_delay(mut self, reconnect_delay: Duration) -> Self {
112 self.reconnect_delay = reconnect_delay;
113 self
114 }
115
116 pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> Self {
117 self.connection_timeout = connection_timeout;
118 self
119 }
120
121 pub fn with_accept_interval(mut self, accept_interval: Duration) -> Self {
122 self.accept_interval = accept_interval;
123 self
124 }
125}
126
127impl Default for ManagerConfig {
128 fn default() -> Self {
129 Self {
130 connection_timeout: Duration::from_secs(10),
131 reconnect_delay: Duration::from_secs(2),
132 accept_interval: Duration::from_millis(100),
133 }
134 }
135}
136
137#[instrument(name = "manager", skip_all, fields(message_type = msg.message_type()))]
145pub async fn stage(mut manager: Manager, msg: ManagerMessage, eff: Effects<ManagerMessage>) -> Manager {
146 match msg {
147 ManagerMessage::AddPeer(peer) => {
148 match manager.peers.get_mut(&peer) {
149 Some(ConnectionState::Connected(..) | ConnectionState::Scheduled) => {
150 tracing::info!(%peer, "discarding connection request, already connected or scheduled");
151 return manager;
152 }
153 Some(s @ ConnectionState::Disconnecting) => {
154 tracing::info!(%peer, "adding peer while still disconnecting");
155 *s = ConnectionState::Scheduled;
157 }
158 None => {
159 tracing::info!(%peer, "adding peer");
160 manager.peers.insert(peer.clone(), ConnectionState::Scheduled);
161 }
162 }
163 eff.send(eff.me_ref(), ManagerMessage::Connect(peer)).await;
164 }
165 ManagerMessage::Connect(peer) => {
166 let entry = match manager.peers.get_mut(&peer) {
168 Some(ConnectionState::Connected(..)) => {
169 tracing::debug!(%peer, "discarding connection request, already connected");
170 return manager;
171 }
172 Some(entry @ ConnectionState::Scheduled) => entry,
173 Some(ConnectionState::Disconnecting) => {
174 tracing::debug!(%peer, "discarding connection request, already disconnecting");
175 return manager;
176 }
177 None => {
178 tracing::debug!(%peer, "discarding connection request, not added");
179 return manager;
180 }
181 };
182 let addr = ToSocketAddrs::String(peer.to_string());
183 let conn_id = match Network::new(&eff).connect(addr, manager.config.connection_timeout).await {
184 Ok(conn_id) => conn_id,
185 Err(err) => {
186 tracing::error!(?err, %peer, reconnecting_in=?manager.config.reconnect_delay, "failed to connect to peer. Scheduling reconnect");
187 eff.schedule_after(ManagerMessage::Connect(peer), manager.config.reconnect_delay).await;
188 assert_eq!(*entry, ConnectionState::Scheduled);
189 return manager;
190 }
191 };
192 tracing::info!(?conn_id, %peer, "connected to peer");
193 start_connection_stage(&mut manager, &eff, peer, conn_id, Role::Initiator).await;
194 }
195 ManagerMessage::Accepted(peer, conn_id) => {
196 match manager.peers.get(&peer) {
197 Some(ConnectionState::Connected(..)) => {
198 tracing::debug!(%peer, "already connected. Closing the newly accepted connection");
199 close_connection(&eff, &peer, conn_id).await;
200 return manager;
201 }
202 Some(ConnectionState::Disconnecting) => {
203 tracing::debug!(%peer, "already disconnecting, the previous connection will be closed with ConnectionDied, the newly accepted connection will be closed now");
204 close_connection(&eff, &peer, conn_id).await;
205 return manager;
206 }
207 Some(ConnectionState::Scheduled) => {
208 unreachable!(
209 "Accepted peers are initiators. They will schedule reconnections on their side so this case cannot happen."
210 )
211 }
212 None => {}
213 };
214 start_connection_stage(&mut manager, &eff, peer, conn_id, Role::Responder).await;
215 }
216 ManagerMessage::RemovePeer(peer) => {
217 let Some(entry) = manager.peers.get_mut(&peer) else {
218 tracing::info!(%peer, "disconnect request ignored, not connected");
219 return manager;
220 };
221 match entry {
222 ConnectionState::Connected(_conn_id, connection) => {
223 eff.send(connection, ConnectionMessage::Disconnect).await;
224 *entry = ConnectionState::Disconnecting;
225 }
226 ConnectionState::Scheduled | ConnectionState::Disconnecting => {
227 tracing::info!(%peer, "removing currently disconnected peer");
228 manager.peers.remove(&peer);
229 }
230 }
231 }
232 ManagerMessage::ConnectionDied(peer, conn_id, role) => {
233 close_connection(&eff, &peer, conn_id).await;
234 let Some(peer_state) = manager.peers.get_mut(&peer) else {
235 tracing::debug!(%peer, "connection died, peer already removed");
236 return manager;
237 };
238 match peer_state {
239 ConnectionState::Connected(conn_id_new, ..) if *conn_id_new != conn_id => {
240 tracing::debug!(%peer, "previously terminated connection closed");
241 }
242 ConnectionState::Connected(..) => {
243 if role == Role::Initiator {
245 tracing::info!(%peer, reconnecting_in=?manager.config.reconnect_delay, "initiator connection died, scheduling reconnect");
246 eff.schedule_after(ManagerMessage::Connect(peer), manager.config.reconnect_delay).await;
247 *peer_state = ConnectionState::Scheduled;
248 } else {
249 tracing::info!(%peer, "responder connection died, removing peer");
250 manager.peers.remove(&peer);
251 }
252 }
253 ConnectionState::Scheduled => {
254 tracing::debug!(%peer, "initiator connection died, reconnect already scheduled");
255 }
256 ConnectionState::Disconnecting => {
257 tracing::debug!(%peer, "peer terminated after removal");
258 manager.peers.remove(&peer);
259 }
260 }
261 }
262 ManagerMessage::FetchBlocks { peer, from, through, cr } => {
263 tracing::trace!(?from, ?through, %peer, "fetching blocks");
264 if let Some(ConnectionState::Connected(_, connection)) = manager.peers.get(&peer) {
265 eff.send(connection, ConnectionMessage::FetchBlocks { from, through, cr }).await;
266 } else {
267 tracing::error!(%peer, "peer not found");
268 eff.send(&cr, Blocks::default()).await;
269 }
270 }
271 ManagerMessage::Listen(listen_addr) => {
272 let network = Network::new(&eff);
273 match network.listen(listen_addr).await {
276 Ok(listen_addr) => {
277 tracing::info!(%listen_addr, "listening");
278 let accept_stage = eff.stage("accept", accept::stage).await;
279 let accept_stage = eff.supervise(accept_stage, ManagerMessage::Listen(listen_addr));
282 let accept_stage = eff
283 .wire_up(accept_stage, accept::AcceptState::new(eff.me(), manager.config(), listen_addr))
284 .await;
285 eff.send(&accept_stage, PullAccept).await;
286 }
287 Err(error) => {
288 tracing::error!(%listen_addr, %error, "cannot listen");
289 return eff.terminate().await;
290 }
291 }
292 }
293 ManagerMessage::NewTip(tip) => {
294 for peer in &manager.peers {
296 if let ConnectionState::Connected(_, connection) = peer.1 {
297 eff.send(connection, ConnectionMessage::NewTip(tip)).await;
298 }
299 }
300 }
301 }
302 manager
303}
304
305async fn close_connection(eff: &Effects<ManagerMessage>, peer: &Peer, conn_id: ConnectionId) {
307 if let Err(err) = Network::new(eff).close(conn_id).await {
308 tracing::error!(?err, %peer, "failed to close connection");
309 }
310}
311
312async fn start_connection_stage(
314 manager: &mut Manager,
315 eff: &Effects<ManagerMessage>,
316 peer: Peer,
317 conn_id: ConnectionId,
318 role: Role,
319) {
320 let connection = eff.stage(format!("{conn_id}-{peer}"), connection::stage).await;
321 let connection = eff.supervise(connection, ManagerMessage::ConnectionDied(peer.clone(), conn_id, role));
322 let connection = eff
323 .wire_up(
324 connection,
325 connection::Connection::new(
326 peer.clone(),
327 conn_id,
328 role,
329 manager.config,
330 manager.magic,
331 manager.chain_sync.clone(),
332 manager.era_history.clone(),
333 ),
334 )
335 .await;
336 eff.send(&connection, ConnectionMessage::Initialize).await;
337 manager.peers.insert(peer, ConnectionState::Connected(conn_id, connection));
338}
339
340pub fn register_deserializers() -> DeserializerGuards {
341 vec![register_data_deserializer::<Manager>().boxed(), register_data_deserializer::<ManagerMessage>().boxed()]
342}