smoldot/libp2p/collection/
multi_stream.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18use super::{
19    super::{
20        connection::{established, noise, webrtc_framing},
21        read_write::ReadWrite,
22    },
23    ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
24    CoordinatorToConnectionInner, NotificationsOutErr, PeerId, ShutdownCause, SubstreamFate,
25    SubstreamId,
26};
27
28use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
29use core::{
30    hash::Hash,
31    ops::{Add, Sub},
32    time::Duration,
33};
34
35/// State machine dedicated to a single multi-stream connection.
36pub struct MultiStreamConnectionTask<TNow, TSubId> {
37    connection: MultiStreamConnectionTaskInner<TNow, TSubId>,
38}
39enum MultiStreamConnectionTaskInner<TNow, TSubId> {
40    /// Connection is still in its handshake phase.
41    Handshake {
42        /// Substream that has been opened to perform the handshake, if any.
43        opened_substream: Option<(TSubId, webrtc_framing::WebRtcFraming)>,
44
45        /// Noise handshake in progress. Always `Some`, except to be temporarily extracted.
46        handshake: Option<noise::HandshakeInProgress>,
47
48        /// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`],
49        /// that have been opened. For each substream, contains a boolean indicating whether the
50        /// substream is outbound (`true`) or inbound (`false`).
51        ///
52        /// Due to the asynchronous nature of the protocol, it is not a logic error to open
53        /// additional substreams before the handshake has finished. The remote might think that
54        /// the handshake has finished while the local node hasn't finished processing it yet.
55        ///
56        /// These substreams aren't processed as long as the handshake hasn't finished. It is,
57        /// however, important to remember that substreams have been opened.
58        extra_open_substreams: hashbrown::HashMap<TSubId, bool, fnv::FnvBuildHasher>,
59
60        /// State machine used once the connection has been established. Unused during the
61        /// handshake, but created ahead of time. Always `Some`, except to be temporarily
62        /// extracted.
63        established: Option<established::MultiStream<TNow, TSubId, Option<SubstreamId>>>,
64    },
65
66    /// Connection has been fully established.
67    Established {
68        established: established::MultiStream<TNow, TSubId, Option<SubstreamId>>,
69
70        /// If `Some`, contains the substream that was used for the handshake. This substream
71        /// is meant to be closed as soon as possible.
72        handshake_substream: Option<TSubId>,
73
74        /// If `Some`, then no `HandshakeFinished` message has been sent back yet.
75        handshake_finished_message_to_send: Option<PeerId>,
76
77        /// Because outgoing substream ids are assigned by the coordinator, we maintain a mapping
78        /// of the "outer ids" to "inner ids".
79        outbound_substreams_map:
80            hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,
81
82        /// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
83        /// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
84        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
85        /// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
86        // TODO: this works only because SubstreamIds aren't reused
87        notifications_in_close_acknowledgments:
88            hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,
89
90        /// Messages about inbound accept cancellations to send back.
91        inbound_accept_cancel_events: VecDeque<established::SubstreamId>,
92    },
93
94    /// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
95    /// message has been sent and is waiting to be acknowledged.
96    ShutdownWaitingAck {
97        /// What has initiated the shutdown.
98        initiator: ShutdownInitiator,
99
100        /// `None` if the [`ConnectionToCoordinatorInner::StartShutdown`] message has already
101        /// been sent to the coordinator. `Some` if the message hasn't been sent yet.
102        start_shutdown_message_to_send: Option<Option<ShutdownCause>>,
103
104        /// `true` if the [`ConnectionToCoordinatorInner::ShutdownFinished`] message has already
105        /// been sent to the coordinator.
106        shutdown_finish_message_sent: bool,
107    },
108
109    /// Connection has finished its shutdown and its shutdown has been acknowledged. There is
110    /// nothing more to do except stop the connection task.
111    ShutdownAcked {
112        /// What has initiated the shutdown.
113        initiator: ShutdownInitiator,
114    },
115}
116
117#[derive(Debug, Copy, Clone, PartialEq, Eq)]
118enum ShutdownInitiator {
119    /// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
120    Coordinator,
121    /// [`MultiStreamConnectionTask::reset`] has been called.
122    Api,
123}
124
125impl<TNow, TSubId> MultiStreamConnectionTask<TNow, TSubId>
126where
127    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
128    TSubId: Clone + PartialEq + Eq + Hash,
129{
130    // Note that the parameters of this function are a bit rough and undocumented, as this is
131    // a function only called from the parent module.
132    pub(super) fn new(
133        randomness_seed: [u8; 32],
134        when_connection_start: TNow,
135        handshake: noise::HandshakeInProgress,
136        max_inbound_substreams: usize,
137        substreams_capacity: usize,
138        max_protocol_name_len: usize,
139        ping_protocol: Arc<str>,
140    ) -> Self {
141        MultiStreamConnectionTask {
142            connection: MultiStreamConnectionTaskInner::Handshake {
143                // TODO: the handshake doesn't have a timeout
144                handshake: Some(handshake),
145                opened_substream: None,
146                extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher(
147                    0,
148                    Default::default(),
149                ),
150                established: Some(established::MultiStream::webrtc(established::Config {
151                    max_inbound_substreams,
152                    substreams_capacity,
153                    max_protocol_name_len,
154                    randomness_seed,
155                    ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
156                    ping_interval: Duration::from_secs(20),   // TODO: hardcoded
157                    ping_timeout: Duration::from_secs(10),    // TODO: hardcoded
158                    first_out_ping: when_connection_start, // TODO: only start the ping after the Noise handshake has ended
159                })),
160            },
161        }
162    }
163
164    /// Pulls a message to send back to the coordinator.
165    ///
166    /// This function takes ownership of `self` and optionally yields it back. If the first
167    /// option contains `None`, then no more message will be generated and the
168    /// [`MultiStreamConnectionTask`] has vanished. This will happen after the connection has been
169    /// shut down or reset.
170    /// It is possible for `self` to not be yielded back even if substreams are still open, in
171    /// which case the API user should abruptly reset the connection, for example by sending a
172    /// TCP RST flag.
173    ///
174    /// If any message is returned, it is the responsibility of the API user to send it to the
175    /// coordinator.
176    /// Do not attempt to buffer the message being returned, as it would work against the
177    /// back-pressure strategy used internally. As soon as a message is returned, it should be
178    /// delivered. If the coordinator is busy at the moment a message should be delivered, then
179    /// the entire thread of execution dedicated to this [`MultiStreamConnectionTask`] should be
180    /// paused until the coordinator is ready and the message delivered.
181    ///
182    /// Messages aren't generated spontaneously. In other words, you don't need to periodically
183    /// call this function just in case there's a new message. Messages are always generated after
184    /// [`MultiStreamConnectionTask::substream_read_write`],
185    /// [`MultiStreamConnectionTask::add_substream`], or [`MultiStreamConnectionTask::reset`]
186    /// has been called. Multiple messages can happen in a row.
187    ///
188    /// Because this function frees space in a buffer, processing substreams again after it
189    /// has returned might read/write more data and generate an event again. In other words,
190    /// the API user should call [`MultiStreamConnectionTask::substream_read_write`] and
191    /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] repeatedly in a loop until no
192    /// more message is generated.
193    pub fn pull_message_to_coordinator(
194        mut self,
195    ) -> (Option<Self>, Option<ConnectionToCoordinator>) {
196        match &mut self.connection {
197            MultiStreamConnectionTaskInner::Handshake { .. } => (Some(self), None),
198            MultiStreamConnectionTaskInner::Established {
199                established,
200                outbound_substreams_map,
201                handshake_finished_message_to_send,
202                notifications_in_close_acknowledgments,
203                inbound_accept_cancel_events,
204                ..
205            } => {
206                if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
207                    return (
208                        Some(self),
209                        Some(ConnectionToCoordinator {
210                            inner: ConnectionToCoordinatorInner::HandshakeFinished(remote_peer_id),
211                        }),
212                    );
213                }
214
215                if let Some(substream_id) = inbound_accept_cancel_events.pop_front() {
216                    return (
217                        Some(self),
218                        Some(ConnectionToCoordinator {
219                            inner: ConnectionToCoordinatorInner::InboundAcceptedCancel {
220                                id: substream_id,
221                            },
222                        }),
223                    );
224                }
225
226                let event = match established.pull_event() {
227                    Some(established::Event::NewOutboundSubstreamsForbidden) => {
228                        // TODO: handle properly
229                        self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
230                            start_shutdown_message_to_send: Some(None),
231                            shutdown_finish_message_sent: false,
232                            initiator: ShutdownInitiator::Coordinator,
233                        };
234                        Some(ConnectionToCoordinatorInner::StartShutdown(None))
235                    }
236                    Some(established::Event::InboundError(err)) => {
237                        Some(ConnectionToCoordinatorInner::InboundError(err))
238                    }
239                    Some(established::Event::InboundNegotiated { id, protocol_name }) => {
240                        Some(ConnectionToCoordinatorInner::InboundNegotiated { id, protocol_name })
241                    }
242                    Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
243                        notifications_in_close_acknowledgments.insert(id);
244                        None
245                    }
246                    Some(established::Event::InboundAcceptedCancel { id, .. }) => {
247                        Some(ConnectionToCoordinatorInner::InboundAcceptedCancel { id })
248                    }
249                    Some(established::Event::RequestIn { id, request, .. }) => {
250                        Some(ConnectionToCoordinatorInner::RequestIn { id, request })
251                    }
252                    Some(established::Event::Response {
253                        response,
254                        user_data,
255                        ..
256                    }) => {
257                        let Some(outer_substream_id) = user_data else {
258                            panic!()
259                        };
260                        outbound_substreams_map.remove(&outer_substream_id).unwrap();
261                        Some(ConnectionToCoordinatorInner::Response {
262                            response,
263                            id: outer_substream_id,
264                        })
265                    }
266                    Some(established::Event::NotificationsInOpen { id, handshake, .. }) => {
267                        Some(ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake })
268                    }
269                    Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
270                        notifications_in_close_acknowledgments.insert(id);
271                        Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
272                    }
273                    Some(established::Event::NotificationIn { id, notification }) => {
274                        Some(ConnectionToCoordinatorInner::NotificationIn { id, notification })
275                    }
276                    Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
277                        notifications_in_close_acknowledgments.insert(id);
278                        Some(ConnectionToCoordinatorInner::NotificationsInClose { id, outcome })
279                    }
280                    Some(established::Event::NotificationsOutResult { id, result }) => {
281                        let (outer_substream_id, result) = match result {
282                            Ok(r) => {
283                                let Some(outer_substream_id) = established[id] else {
284                                    panic!()
285                                };
286                                (outer_substream_id, Ok(r))
287                            }
288                            Err((err, ud)) => {
289                                let Some(outer_substream_id) = ud else {
290                                    panic!()
291                                };
292                                outbound_substreams_map.remove(&outer_substream_id);
293                                (outer_substream_id, Err(NotificationsOutErr::Substream(err)))
294                            }
295                        };
296
297                        Some(ConnectionToCoordinatorInner::NotificationsOutResult {
298                            id: outer_substream_id,
299                            result,
300                        })
301                    }
302                    Some(established::Event::NotificationsOutCloseDemanded { id }) => {
303                        let Some(outer_substream_id) = established[id] else {
304                            panic!()
305                        };
306                        Some(
307                            ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
308                                id: outer_substream_id,
309                            },
310                        )
311                    }
312                    Some(established::Event::NotificationsOutReset { user_data, .. }) => {
313                        let Some(outer_substream_id) = user_data else {
314                            panic!()
315                        };
316                        outbound_substreams_map.remove(&outer_substream_id);
317                        Some(ConnectionToCoordinatorInner::NotificationsOutReset {
318                            id: outer_substream_id,
319                        })
320                    }
321                    Some(established::Event::PingOutSuccess { ping_time }) => {
322                        Some(ConnectionToCoordinatorInner::PingOutSuccess { ping_time })
323                    }
324                    Some(established::Event::PingOutFailed) => {
325                        Some(ConnectionToCoordinatorInner::PingOutFailed)
326                    }
327                    None => None,
328                };
329
330                (
331                    Some(self),
332                    event.map(|ev| ConnectionToCoordinator { inner: ev }),
333                )
334            }
335            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
336                start_shutdown_message_to_send,
337                shutdown_finish_message_sent,
338                ..
339            } => {
340                if let Some(reason) = start_shutdown_message_to_send.take() {
341                    debug_assert!(!*shutdown_finish_message_sent);
342                    (
343                        Some(self),
344                        Some(ConnectionToCoordinator {
345                            inner: ConnectionToCoordinatorInner::StartShutdown(reason),
346                        }),
347                    )
348                } else if !*shutdown_finish_message_sent {
349                    debug_assert!(start_shutdown_message_to_send.is_none());
350                    *shutdown_finish_message_sent = true;
351                    (
352                        Some(self),
353                        Some(ConnectionToCoordinator {
354                            inner: ConnectionToCoordinatorInner::ShutdownFinished,
355                        }),
356                    )
357                } else {
358                    (Some(self), None)
359                }
360            }
361            MultiStreamConnectionTaskInner::ShutdownAcked { .. } => (None, None),
362        }
363    }
364
365    /// Injects a message that has been pulled from the coordinator.
366    ///
367    /// Calling this function might generate data to send to the connection. You should call
368    /// [`MultiStreamConnectionTask::desired_outbound_substreams`] and
369    /// [`MultiStreamConnectionTask::substream_read_write`] after this function has returned.
370    pub fn inject_coordinator_message(&mut self, now: &TNow, message: CoordinatorToConnection) {
371        match (message.inner, &mut self.connection) {
372            (
373                CoordinatorToConnectionInner::AcceptInbound {
374                    substream_id,
375                    inbound_ty,
376                },
377                MultiStreamConnectionTaskInner::Established {
378                    established,
379                    notifications_in_close_acknowledgments,
380                    inbound_accept_cancel_events,
381                    ..
382                },
383            ) => {
384                if !notifications_in_close_acknowledgments.remove(&substream_id) {
385                    established.accept_inbound(substream_id, inbound_ty, None);
386                } else {
387                    inbound_accept_cancel_events.push_back(substream_id)
388                }
389            }
390            (
391                CoordinatorToConnectionInner::RejectInbound { substream_id },
392                MultiStreamConnectionTaskInner::Established {
393                    established,
394                    notifications_in_close_acknowledgments,
395                    ..
396                },
397            ) => {
398                if !notifications_in_close_acknowledgments.remove(&substream_id) {
399                    established.reject_inbound(substream_id);
400                }
401            }
402            (
403                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
404                MultiStreamConnectionTaskInner::Handshake {
405                    established: Some(established),
406                    ..
407                }
408                | MultiStreamConnectionTaskInner::Established { established, .. },
409            ) => {
410                established.set_max_protocol_name_len(new_max_length);
411            }
412            (
413                CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. },
414                MultiStreamConnectionTaskInner::Handshake {
415                    established: None, ..
416                },
417            ) => {
418                unreachable!()
419            }
420            (
421                CoordinatorToConnectionInner::StartRequest {
422                    protocol_name,
423                    request_data,
424                    timeout,
425                    max_response_size,
426                    substream_id,
427                },
428                MultiStreamConnectionTaskInner::Established {
429                    established,
430                    outbound_substreams_map,
431                    ..
432                },
433            ) => {
434                let inner_substream_id = established.add_request(
435                    protocol_name,
436                    request_data,
437                    now.clone() + timeout,
438                    max_response_size,
439                    Some(substream_id),
440                );
441                let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
442                debug_assert!(_prev_value.is_none());
443            }
444            (
445                CoordinatorToConnectionInner::OpenOutNotifications {
446                    max_handshake_size,
447                    protocol_name,
448                    handshake,
449                    handshake_timeout,
450                    substream_id: outer_substream_id,
451                },
452                MultiStreamConnectionTaskInner::Established {
453                    established,
454                    outbound_substreams_map,
455                    ..
456                },
457            ) => {
458                let inner_substream_id = established.open_notifications_substream(
459                    protocol_name,
460                    max_handshake_size,
461                    handshake,
462                    now.clone() + handshake_timeout,
463                    Some(outer_substream_id),
464                );
465
466                let _prev_value =
467                    outbound_substreams_map.insert(outer_substream_id, inner_substream_id);
468                debug_assert!(_prev_value.is_none());
469            }
470            (
471                CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
472                MultiStreamConnectionTaskInner::Established {
473                    established,
474                    outbound_substreams_map,
475                    ..
476                },
477            ) => {
478                // It is possible that the remote has closed the outbound notification substream
479                // while the `CloseOutNotifications` message was being delivered, or that the API
480                // user close the substream before the message about the substream being closed
481                // was delivered to the coordinator.
482                if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
483                    established.close_out_notifications_substream(inner_substream_id);
484                }
485            }
486            (
487                CoordinatorToConnectionInner::QueueNotification {
488                    substream_id,
489                    notification,
490                },
491                MultiStreamConnectionTaskInner::Established {
492                    established,
493                    outbound_substreams_map,
494                    ..
495                },
496            ) => {
497                // It is possible that the remote has closed the outbound notification substream
498                // while a `QueueNotification` message was being delivered, or that the API user
499                // queued a notification before the message about the substream being closed was
500                // delivered to the coordinator.
501                // If that happens, we intentionally silently discard the message, causing the
502                // notification to not be sent. This is consistent with the guarantees about
503                // notifications delivered that are documented in the public API.
504                if let Some(inner_substream_id) = outbound_substreams_map.get(&substream_id) {
505                    established.write_notification_unbounded(*inner_substream_id, notification);
506                }
507            }
508            (
509                CoordinatorToConnectionInner::AnswerRequest {
510                    substream_id,
511                    response,
512                },
513                MultiStreamConnectionTaskInner::Established { established, .. },
514            ) => match established.respond_in_request(substream_id, response) {
515                Ok(()) => {}
516                Err(established::RespondInRequestError::SubstreamClosed) => {
517                    // As documented, answering an obsolete request is simply ignored.
518                }
519            },
520            (
521                CoordinatorToConnectionInner::AcceptInNotifications {
522                    substream_id,
523                    handshake,
524                    max_notification_size,
525                },
526                MultiStreamConnectionTaskInner::Established {
527                    established,
528                    notifications_in_close_acknowledgments,
529                    ..
530                },
531            ) => {
532                if !notifications_in_close_acknowledgments.remove(&substream_id) {
533                    established.accept_in_notifications_substream(
534                        substream_id,
535                        handshake,
536                        max_notification_size,
537                    );
538                }
539            }
540            (
541                CoordinatorToConnectionInner::RejectInNotifications { substream_id },
542                MultiStreamConnectionTaskInner::Established {
543                    established,
544                    notifications_in_close_acknowledgments,
545                    ..
546                },
547            ) => {
548                if !notifications_in_close_acknowledgments.remove(&substream_id) {
549                    established.reject_in_notifications_substream(substream_id);
550                }
551            }
552            (
553                CoordinatorToConnectionInner::CloseInNotifications {
554                    substream_id,
555                    timeout,
556                },
557                MultiStreamConnectionTaskInner::Established {
558                    established,
559                    notifications_in_close_acknowledgments,
560                    ..
561                },
562            ) => {
563                if !notifications_in_close_acknowledgments.remove(&substream_id) {
564                    established
565                        .close_in_notifications_substream(substream_id, now.clone() + timeout);
566                }
567            }
568            (
569                CoordinatorToConnectionInner::StartShutdown { .. },
570                MultiStreamConnectionTaskInner::Handshake { .. }
571                | MultiStreamConnectionTaskInner::Established { .. },
572            ) => {
573                // TODO: implement proper shutdown
574                self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
575                    start_shutdown_message_to_send: Some(None),
576                    shutdown_finish_message_sent: false,
577                    initiator: ShutdownInitiator::Coordinator,
578                };
579            }
580            (
581                CoordinatorToConnectionInner::AcceptInbound { .. }
582                | CoordinatorToConnectionInner::RejectInbound { .. }
583                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
584                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
585                | CoordinatorToConnectionInner::RejectInNotifications { .. }
586                | CoordinatorToConnectionInner::CloseInNotifications { .. }
587                | CoordinatorToConnectionInner::StartRequest { .. }
588                | CoordinatorToConnectionInner::AnswerRequest { .. }
589                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
590                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
591                | CoordinatorToConnectionInner::QueueNotification { .. },
592                MultiStreamConnectionTaskInner::Handshake { .. }
593                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
594            ) => unreachable!(),
595            (
596                CoordinatorToConnectionInner::AcceptInbound { .. }
597                | CoordinatorToConnectionInner::RejectInbound { .. }
598                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
599                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
600                | CoordinatorToConnectionInner::RejectInNotifications { .. }
601                | CoordinatorToConnectionInner::CloseInNotifications { .. }
602                | CoordinatorToConnectionInner::StartRequest { .. }
603                | CoordinatorToConnectionInner::AnswerRequest { .. }
604                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
605                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
606                | CoordinatorToConnectionInner::QueueNotification { .. },
607                MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. },
608            )
609            | (
610                CoordinatorToConnectionInner::StartShutdown,
611                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
612                    initiator: ShutdownInitiator::Api,
613                    ..
614                },
615            ) => {
616                // There might still be some messages coming from the coordinator after the
617                // connection task has sent a message indicating that it has shut down. This is
618                // due to the concurrent nature of the API and doesn't indicate a bug. These
619                // messages are simply ignored by the connection task.
620            }
621            (
622                CoordinatorToConnectionInner::ShutdownFinishedAck,
623                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
624                    start_shutdown_message_to_send: start_shutdown_message_sent,
625                    shutdown_finish_message_sent,
626                    initiator,
627                },
628            ) => {
629                debug_assert!(
630                    start_shutdown_message_sent.is_none() && *shutdown_finish_message_sent
631                );
632                self.connection = MultiStreamConnectionTaskInner::ShutdownAcked {
633                    initiator: *initiator,
634                };
635            }
636            (
637                CoordinatorToConnectionInner::StartShutdown,
638                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
639                    initiator: ShutdownInitiator::Coordinator,
640                    ..
641                }
642                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
643            ) => unreachable!(),
644            (CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
645        }
646    }
647
648    /// Returns the number of new outbound substreams that the state machine would like to see
649    /// opened.
650    ///
651    /// This value doesn't change automatically over time but only after a call to
652    /// [`MultiStreamConnectionTask::substream_read_write`],
653    /// [`MultiStreamConnectionTask::inject_coordinator_message`],
654    /// [`MultiStreamConnectionTask::add_substream`], or
655    /// [`MultiStreamConnectionTask::reset_substream`].
656    ///
657    /// Note that the user is expected to track the number of substreams that are currently being
658    /// opened. For example, if this function returns 2 and there are already 2 substreams
659    /// currently being opened, then there is no need to open any additional one.
660    pub fn desired_outbound_substreams(&self) -> u32 {
661        match &self.connection {
662            MultiStreamConnectionTaskInner::Handshake {
663                opened_substream, ..
664            } => {
665                if opened_substream.is_none() {
666                    1
667                } else {
668                    0
669                }
670            }
671            MultiStreamConnectionTaskInner::Established { established, .. } => {
672                established.desired_outbound_substreams()
673            }
674            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
675            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => 0,
676        }
677    }
678
679    /// Notifies the state machine that a new substream has been opened.
680    ///
681    /// `outbound` indicates whether the substream has been opened by the remote (`false`) or
682    /// locally (`true`).
683    ///
684    /// If `outbound` is `true`, then the value returned by
685    /// [`MultiStreamConnectionTask::desired_outbound_substreams`] will decrease by one.
686    ///
687    /// # Panic
688    ///
689    /// Panics if there already exists a substream with an identical identifier.
690    ///
691    pub fn add_substream(&mut self, id: TSubId, outbound: bool) {
692        match &mut self.connection {
693            MultiStreamConnectionTaskInner::Handshake {
694                opened_substream: opened_substream @ None,
695                ..
696            } if outbound => {
697                *opened_substream = Some((id, webrtc_framing::WebRtcFraming::new()));
698            }
699            MultiStreamConnectionTaskInner::Handshake {
700                opened_substream,
701                extra_open_substreams,
702                ..
703            } => {
704                assert!(
705                    opened_substream
706                        .as_ref()
707                        .map_or(true, |(open, _)| *open != id)
708                );
709                // TODO: add a limit to the number allowed?
710                let _was_in = extra_open_substreams.insert(id, outbound);
711                assert!(_was_in.is_none());
712            }
713            MultiStreamConnectionTaskInner::Established { established, .. } => {
714                established.add_substream(id, outbound)
715            }
716            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
717            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
718                // TODO: reset the substream or something?
719            }
720        }
721    }
722
723    /// Sets the state of the connection to "reset".
724    ///
725    /// This should be called if the remote abruptly closes the connection, such as with a TCP/IP
726    /// RST flag.
727    ///
728    /// After this function has been called, it is illegal to call
729    /// [`MultiStreamConnectionTask::substream_read_write`] or
730    /// [`MultiStreamConnectionTask::reset`] again.
731    ///
732    /// Calling this function might have generated messages for the coordinator.
733    /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards in
734    /// order to process these messages.
735    ///
736    /// # Panic
737    ///
738    /// Panics if [`MultiStreamConnectionTask::reset`] has been called in the past.
739    ///
740    pub fn reset(&mut self) {
741        match self.connection {
742            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
743                initiator: ShutdownInitiator::Api,
744                ..
745            }
746            | MultiStreamConnectionTaskInner::ShutdownAcked {
747                initiator: ShutdownInitiator::Api,
748                ..
749            } => {
750                // It is illegal to call `reset` a second time.
751                panic!()
752            }
753            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
754                ref mut initiator, ..
755            }
756            | MultiStreamConnectionTaskInner::ShutdownAcked {
757                ref mut initiator, ..
758            } => {
759                // Mark the initiator as being the API in order to track proper API usage.
760                *initiator = ShutdownInitiator::Api;
761            }
762            _ => {
763                self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
764                    initiator: ShutdownInitiator::Api,
765                    shutdown_finish_message_sent: false,
766                    start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
767                };
768            }
769        }
770    }
771
772    /// Returns `true` if [`MultiStreamConnectionTask::reset`] has been called in the past.
773    pub fn is_reset_called(&self) -> bool {
774        matches!(
775            self.connection,
776            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
777                initiator: ShutdownInitiator::Api,
778                ..
779            } | MultiStreamConnectionTaskInner::ShutdownAcked {
780                initiator: ShutdownInitiator::Api,
781                ..
782            }
783        )
784    }
785
786    /// Immediately destroys the substream with the given identifier.
787    ///
788    /// The given identifier is now considered invalid by the state machine.
789    ///
790    /// # Panic
791    ///
792    /// Panics if there is no substream with that identifier.
793    ///
794    pub fn reset_substream(&mut self, substream_id: &TSubId) {
795        match &mut self.connection {
796            MultiStreamConnectionTaskInner::Established {
797                handshake_substream,
798                ..
799            } if handshake_substream
800                .as_ref()
801                .map_or(false, |s| s == substream_id) =>
802            {
803                *handshake_substream = None;
804            }
805            MultiStreamConnectionTaskInner::Established { established, .. } => {
806                established.reset_substream(substream_id)
807            }
808            MultiStreamConnectionTaskInner::Handshake {
809                opened_substream: Some((opened_substream, _)),
810                ..
811            } if opened_substream == substream_id => {
812                // TODO: the handshake has failed, kill the connection?
813            }
814            MultiStreamConnectionTaskInner::Handshake {
815                extra_open_substreams,
816                ..
817            } => {
818                let _was_in = extra_open_substreams.remove(substream_id).is_some();
819                assert!(_was_in);
820            }
821            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
822            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
823                // TODO: panic if substream id invalid?
824            }
825        }
826    }
827
828    /// Reads/writes data on the substream.
829    ///
830    /// If the method returns [`SubstreamFate::Reset`], then the substream is now considered dead
831    /// according to the state machine and its identifier is now invalid. If the reading or
832    /// writing side of the substream was still open, then the user should reset that substream.
833    ///
834    /// In the case of a WebRTC connection, the [`ReadWrite::incoming_buffer`] and
835    /// [`ReadWrite::write_bytes_queueable`] must always be `Some`.
836    ///
837    /// # Panic
838    ///
839    /// Panics if there is no substream with that identifier.
840    /// Panics if this is a WebRTC connection, and the reading or writing side is closed.
841    ///
842    #[must_use]
843    pub fn substream_read_write(
844        &mut self,
845        substream_id: &TSubId,
846        read_write: &mut ReadWrite<TNow>,
847    ) -> SubstreamFate {
848        // In WebRTC, the reading and writing sides are never closed.
849        // Note that the `established::MultiStream` state machine also performs this check, but
850        // we do it here again because we're not necessarily in the ̀`established` state.
851        assert!(
852            read_write.expected_incoming_bytes.is_some()
853                && read_write.write_bytes_queueable.is_some()
854        );
855
856        match &mut self.connection {
857            MultiStreamConnectionTaskInner::Handshake {
858                handshake,
859                opened_substream: Some((opened_handshake_substream, handshake_webrtc_framing)),
860                established,
861                extra_open_substreams,
862            } if opened_handshake_substream == substream_id => {
863                // TODO: check the handshake timeout
864
865                // Progress the Noise handshake.
866                let handshake_outcome = {
867                    // The Noise data is not directly the data of the substream. Instead,
868                    // everything is wrapped within a Protobuf frame.
869                    let mut with_framing = match handshake_webrtc_framing.read_write(read_write) {
870                        Ok(f) => f,
871                        Err(_err) => {
872                            // TODO: not great for diagnostic to just ignore the error; also, the connection should just reset entirely
873                            return SubstreamFate::Reset;
874                        }
875                    };
876                    handshake.take().unwrap().read_write(&mut with_framing)
877                };
878
879                match handshake_outcome {
880                    Ok(noise::NoiseHandshake::InProgress(handshake_update)) => {
881                        *handshake = Some(handshake_update);
882                        SubstreamFate::Continue
883                    }
884                    Err(_err) => return SubstreamFate::Reset, // TODO: /!\
885                    Ok(noise::NoiseHandshake::Success {
886                        cipher: _,
887                        remote_peer_id,
888                    }) => {
889                        // The handshake has succeeded and we will transition into "established"
890                        // mode.
891                        let mut established = established.take().unwrap();
892                        for (substream_id, outbound) in extra_open_substreams.drain() {
893                            established.add_substream(substream_id, outbound);
894                        }
895
896                        self.connection = MultiStreamConnectionTaskInner::Established {
897                            established,
898                            handshake_finished_message_to_send: Some(remote_peer_id),
899                            handshake_substream: None, // TODO: do properly
900                            outbound_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
901                                0,
902                                Default::default(),
903                            ),
904                            notifications_in_close_acknowledgments:
905                                hashbrown::HashSet::with_capacity_and_hasher(2, Default::default()),
906                            inbound_accept_cancel_events: VecDeque::with_capacity(2),
907                        };
908
909                        // TODO: hacky
910                        SubstreamFate::Reset
911                    }
912                }
913            }
914            MultiStreamConnectionTaskInner::Established {
915                handshake_substream,
916                ..
917            } if handshake_substream
918                .as_ref()
919                .map_or(false, |s| s == substream_id) =>
920            {
921                // Close the writing side. If the reading side is closed, we indicate that the
922                // substream is dead. If the reading side is still open, we indicate that it's not
923                // dead and simply wait for the remote to close it.
924                // TODO: kill the connection if the remote sends more data?
925                read_write.close_write();
926                if read_write.expected_incoming_bytes.is_none() {
927                    *handshake_substream = None;
928                    SubstreamFate::Reset
929                } else {
930                    SubstreamFate::Continue
931                }
932            }
933            MultiStreamConnectionTaskInner::Established { established, .. } => {
934                established.substream_read_write(substream_id, read_write)
935            }
936            MultiStreamConnectionTaskInner::Handshake {
937                extra_open_substreams,
938                ..
939            } => {
940                assert!(extra_open_substreams.contains_key(substream_id));
941                // Don't do anything. Don't read or write. Instead we wait for the handshake to
942                // be finished.
943                SubstreamFate::Continue
944            }
945            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
946            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
947                // TODO: panic if substream id invalid?
948                SubstreamFate::Reset
949            }
950        }
951    }
952}