smoldot/libp2p/collection/multi_stream.rs
1// Smoldot
2// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18use super::{
19 super::{
20 connection::{established, noise, webrtc_framing},
21 read_write::ReadWrite,
22 },
23 ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
24 CoordinatorToConnectionInner, NotificationsOutErr, PeerId, ShutdownCause, SubstreamFate,
25 SubstreamId,
26};
27
28use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
29use core::{
30 hash::Hash,
31 ops::{Add, Sub},
32 time::Duration,
33};
34
/// State machine dedicated to a single multi-stream connection.
pub struct MultiStreamConnectionTask<TNow, TSubId> {
    /// Current phase of the connection. The phase transitions (handshake → established →
    /// shutdown) are kept private to this module.
    connection: MultiStreamConnectionTaskInner<TNow, TSubId>,
}
/// Actual state of a [`MultiStreamConnectionTask`]. Variants follow the lifecycle of the
/// connection: `Handshake` → `Established` → `ShutdownWaitingAck` → `ShutdownAcked`.
enum MultiStreamConnectionTaskInner<TNow, TSubId> {
    /// Connection is still in its handshake phase.
    Handshake {
        /// Substream that has been opened to perform the handshake, if any.
        opened_substream: Option<(TSubId, webrtc_framing::WebRtcFraming)>,

        /// Noise handshake in progress. Always `Some`, except to be temporarily extracted.
        handshake: Option<noise::HandshakeInProgress>,

        /// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`],
        /// that have been opened. For each substream, contains a boolean indicating whether the
        /// substream is outbound (`true`) or inbound (`false`).
        ///
        /// Due to the asynchronous nature of the protocol, it is not a logic error to open
        /// additional substreams before the handshake has finished. The remote might think that
        /// the handshake has finished while the local node hasn't finished processing it yet.
        ///
        /// These substreams aren't processed as long as the handshake hasn't finished. It is,
        /// however, important to remember that substreams have been opened.
        extra_open_substreams: hashbrown::HashMap<TSubId, bool, fnv::FnvBuildHasher>,

        /// State machine used once the connection has been established. Unused during the
        /// handshake, but created ahead of time. Always `Some`, except to be temporarily
        /// extracted.
        established: Option<established::MultiStream<TNow, TSubId, Option<SubstreamId>>>,
    },

    /// Connection has been fully established.
    Established {
        /// State machine of the established connection. The user data attached to each
        /// substream is the [`SubstreamId`] assigned by the coordinator for outbound
        /// substreams, or `None` for substreams the coordinator doesn't track.
        established: established::MultiStream<TNow, TSubId, Option<SubstreamId>>,

        /// If `Some`, contains the substream that was used for the handshake. This substream
        /// is meant to be closed as soon as possible.
        handshake_substream: Option<TSubId>,

        /// If `Some`, then no `HandshakeFinished` message has been sent back yet.
        handshake_finished_message_to_send: Option<PeerId>,

        /// Because outgoing substream ids are assigned by the coordinator, we maintain a mapping
        /// of the "outer ids" to "inner ids".
        outbound_substreams_map:
            hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,

        /// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
        /// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
        /// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
        // TODO: this works only because SubstreamIds aren't reused
        notifications_in_close_acknowledgments:
            hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,

        /// Messages about inbound accept cancellations to send back.
        inbound_accept_cancel_events: VecDeque<established::SubstreamId>,
    },

    /// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
    /// message has been sent and is waiting to be acknowledged.
    ShutdownWaitingAck {
        /// What has initiated the shutdown.
        initiator: ShutdownInitiator,

        /// `None` if the [`ConnectionToCoordinatorInner::StartShutdown`] message has already
        /// been sent to the coordinator. `Some` if the message hasn't been sent yet.
        start_shutdown_message_to_send: Option<Option<ShutdownCause>>,

        /// `true` if the [`ConnectionToCoordinatorInner::ShutdownFinished`] message has already
        /// been sent to the coordinator.
        shutdown_finish_message_sent: bool,
    },

    /// Connection has finished its shutdown and its shutdown has been acknowledged. There is
    /// nothing more to do except stop the connection task.
    ShutdownAcked {
        /// What has initiated the shutdown.
        initiator: ShutdownInitiator,
    },
}
116
/// Keeps track of which side started the shutdown. Used to detect incorrect API usage (calling
/// [`MultiStreamConnectionTask::reset`] twice) and to ignore `StartShutdown` messages that cross
/// paths with an API-initiated shutdown.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
    /// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
    Coordinator,
    /// [`MultiStreamConnectionTask::reset`] has been called.
    Api,
}
124
125impl<TNow, TSubId> MultiStreamConnectionTask<TNow, TSubId>
126where
127 TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
128 TSubId: Clone + PartialEq + Eq + Hash,
129{
130 // Note that the parameters of this function are a bit rough and undocumented, as this is
131 // a function only called from the parent module.
132 pub(super) fn new(
133 randomness_seed: [u8; 32],
134 when_connection_start: TNow,
135 handshake: noise::HandshakeInProgress,
136 max_inbound_substreams: usize,
137 substreams_capacity: usize,
138 max_protocol_name_len: usize,
139 ping_protocol: Arc<str>,
140 ) -> Self {
141 MultiStreamConnectionTask {
142 connection: MultiStreamConnectionTaskInner::Handshake {
143 // TODO: the handshake doesn't have a timeout
144 handshake: Some(handshake),
145 opened_substream: None,
146 extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher(
147 0,
148 Default::default(),
149 ),
150 established: Some(established::MultiStream::webrtc(established::Config {
151 max_inbound_substreams,
152 substreams_capacity,
153 max_protocol_name_len,
154 randomness_seed,
155 ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
156 ping_interval: Duration::from_secs(20), // TODO: hardcoded
157 ping_timeout: Duration::from_secs(10), // TODO: hardcoded
158 first_out_ping: when_connection_start, // TODO: only start the ping after the Noise handshake has ended
159 })),
160 },
161 }
162 }
163
    /// Pulls a message to send back to the coordinator.
    ///
    /// This function takes ownership of `self` and optionally yields it back. If the first
    /// option contains `None`, then no more message will be generated and the
    /// [`MultiStreamConnectionTask`] has vanished. This will happen after the connection has been
    /// shut down or reset.
    /// It is possible for `self` to not be yielded back even if substreams are still open, in
    /// which case the API user should abruptly reset the connection, for example by sending a
    /// TCP RST flag.
    ///
    /// If any message is returned, it is the responsibility of the API user to send it to the
    /// coordinator.
    /// Do not attempt to buffer the message being returned, as it would work against the
    /// back-pressure strategy used internally. As soon as a message is returned, it should be
    /// delivered. If the coordinator is busy at the moment a message should be delivered, then
    /// the entire thread of execution dedicated to this [`MultiStreamConnectionTask`] should be
    /// paused until the coordinator is ready and the message delivered.
    ///
    /// Messages aren't generated spontaneously. In other words, you don't need to periodically
    /// call this function just in case there's a new message. Messages are always generated after
    /// [`MultiStreamConnectionTask::substream_read_write`],
    /// [`MultiStreamConnectionTask::add_substream`], or [`MultiStreamConnectionTask::reset`]
    /// has been called. Multiple messages can happen in a row.
    ///
    /// Because this function frees space in a buffer, processing substreams again after it
    /// has returned might read/write more data and generate an event again. In other words,
    /// the API user should call [`MultiStreamConnectionTask::substream_read_write`] and
    /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] repeatedly in a loop until no
    /// more message is generated.
    pub fn pull_message_to_coordinator(
        mut self,
    ) -> (Option<Self>, Option<ConnectionToCoordinator>) {
        match &mut self.connection {
            // No message is ever generated while the handshake is still in progress.
            MultiStreamConnectionTaskInner::Handshake { .. } => (Some(self), None),
            MultiStreamConnectionTaskInner::Established {
                established,
                outbound_substreams_map,
                handshake_finished_message_to_send,
                notifications_in_close_acknowledgments,
                inbound_accept_cancel_events,
                ..
            } => {
                // A pending `HandshakeFinished` message is sent before anything else, so that
                // the coordinator learns the remote `PeerId` first.
                if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
                    return (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::HandshakeFinished(remote_peer_id),
                        }),
                    );
                }

                // Next, flush the queue of `InboundAcceptedCancel` messages. These are queued
                // when the coordinator accepts a substream whose closure was already reported
                // (see `inject_coordinator_message`).
                if let Some(substream_id) = inbound_accept_cancel_events.pop_front() {
                    return (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::InboundAcceptedCancel {
                                id: substream_id,
                            },
                        }),
                    );
                }

                // Translate the next event of the inner state machine, if any, into a message
                // for the coordinator.
                let event = match established.pull_event() {
                    Some(established::Event::NewOutboundSubstreamsForbidden) => {
                        // TODO: handle properly
                        self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                            start_shutdown_message_to_send: Some(None),
                            shutdown_finish_message_sent: false,
                            initiator: ShutdownInitiator::Coordinator,
                        };
                        Some(ConnectionToCoordinatorInner::StartShutdown(None))
                    }
                    Some(established::Event::InboundError(err)) => {
                        Some(ConnectionToCoordinatorInner::InboundError(err))
                    }
                    Some(established::Event::InboundNegotiated { id, protocol_name }) => {
                        Some(ConnectionToCoordinatorInner::InboundNegotiated { id, protocol_name })
                    }
                    Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
                        // Remember the cancellation so that a later accept/reject of this
                        // substream by the coordinator can be dismissed. No message is sent.
                        notifications_in_close_acknowledgments.insert(id);
                        None
                    }
                    Some(established::Event::InboundAcceptedCancel { id, .. }) => {
                        Some(ConnectionToCoordinatorInner::InboundAcceptedCancel { id })
                    }
                    Some(established::Event::RequestIn { id, request, .. }) => {
                        Some(ConnectionToCoordinatorInner::RequestIn { id, request })
                    }
                    Some(established::Event::Response {
                        response,
                        user_data,
                        ..
                    }) => {
                        // Outbound requests always carry the coordinator-assigned "outer"
                        // substream ID as user data; the request is finished, so the mapping
                        // is removed.
                        let Some(outer_substream_id) = user_data else {
                            panic!()
                        };
                        outbound_substreams_map.remove(&outer_substream_id).unwrap();
                        Some(ConnectionToCoordinatorInner::Response {
                            response,
                            id: outer_substream_id,
                        })
                    }
                    Some(established::Event::NotificationsInOpen { id, handshake, .. }) => {
                        Some(ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake })
                    }
                    Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
                        notifications_in_close_acknowledgments.insert(id);
                        Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
                    }
                    Some(established::Event::NotificationIn { id, notification }) => {
                        Some(ConnectionToCoordinatorInner::NotificationIn { id, notification })
                    }
                    Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
                        notifications_in_close_acknowledgments.insert(id);
                        Some(ConnectionToCoordinatorInner::NotificationsInClose { id, outcome })
                    }
                    Some(established::Event::NotificationsOutResult { id, result }) => {
                        // On success the substream stays open and keeps its entry in
                        // `outbound_substreams_map`; on failure the entry is removed.
                        let (outer_substream_id, result) = match result {
                            Ok(r) => {
                                let Some(outer_substream_id) = established[id] else {
                                    panic!()
                                };
                                (outer_substream_id, Ok(r))
                            }
                            Err((err, ud)) => {
                                let Some(outer_substream_id) = ud else {
                                    panic!()
                                };
                                outbound_substreams_map.remove(&outer_substream_id);
                                (outer_substream_id, Err(NotificationsOutErr::Substream(err)))
                            }
                        };

                        Some(ConnectionToCoordinatorInner::NotificationsOutResult {
                            id: outer_substream_id,
                            result,
                        })
                    }
                    Some(established::Event::NotificationsOutCloseDemanded { id }) => {
                        let Some(outer_substream_id) = established[id] else {
                            panic!()
                        };
                        Some(
                            ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
                                id: outer_substream_id,
                            },
                        )
                    }
                    Some(established::Event::NotificationsOutReset { user_data, .. }) => {
                        let Some(outer_substream_id) = user_data else {
                            panic!()
                        };
                        outbound_substreams_map.remove(&outer_substream_id);
                        Some(ConnectionToCoordinatorInner::NotificationsOutReset {
                            id: outer_substream_id,
                        })
                    }
                    Some(established::Event::PingOutSuccess { ping_time }) => {
                        Some(ConnectionToCoordinatorInner::PingOutSuccess { ping_time })
                    }
                    Some(established::Event::PingOutFailed) => {
                        Some(ConnectionToCoordinatorInner::PingOutFailed)
                    }
                    None => None,
                };

                (
                    Some(self),
                    event.map(|ev| ConnectionToCoordinator { inner: ev }),
                )
            }
            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                start_shutdown_message_to_send,
                shutdown_finish_message_sent,
                ..
            } => {
                // The `StartShutdown` message, if not sent yet, always precedes the
                // `ShutdownFinished` message.
                if let Some(reason) = start_shutdown_message_to_send.take() {
                    debug_assert!(!*shutdown_finish_message_sent);
                    (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::StartShutdown(reason),
                        }),
                    )
                } else if !*shutdown_finish_message_sent {
                    debug_assert!(start_shutdown_message_to_send.is_none());
                    *shutdown_finish_message_sent = true;
                    (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::ShutdownFinished,
                        }),
                    )
                } else {
                    (Some(self), None)
                }
            }
            // Shutdown fully acknowledged: the task is finished, and `self` is intentionally
            // not yielded back.
            MultiStreamConnectionTaskInner::ShutdownAcked { .. } => (None, None),
        }
    }
364
    /// Injects a message that has been pulled from the coordinator.
    ///
    /// Calling this function might generate data to send to the connection. You should call
    /// [`MultiStreamConnectionTask::desired_outbound_substreams`] and
    /// [`MultiStreamConnectionTask::substream_read_write`] after this function has returned.
    pub fn inject_coordinator_message(&mut self, now: &TNow, message: CoordinatorToConnection) {
        // The effect of each message depends on the current phase of the connection, hence the
        // dispatch on the `(message, state)` pair.
        match (message.inner, &mut self.connection) {
            (
                CoordinatorToConnectionInner::AcceptInbound {
                    substream_id,
                    inbound_ty,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    inbound_accept_cancel_events,
                    ..
                },
            ) => {
                // If the closure of this substream was already reported, the acceptance is
                // dismissed and an `InboundAcceptedCancel` message is queued instead.
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.accept_inbound(substream_id, inbound_ty, None);
                } else {
                    inbound_accept_cancel_events.push_back(substream_id)
                }
            }
            (
                CoordinatorToConnectionInner::RejectInbound { substream_id },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                // A rejection of an already-closed substream is simply dismissed.
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.reject_inbound(substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
                MultiStreamConnectionTaskInner::Handshake {
                    established: Some(established),
                    ..
                }
                | MultiStreamConnectionTaskInner::Established { established, .. },
            ) => {
                established.set_max_protocol_name_len(new_max_length);
            }
            (
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. },
                MultiStreamConnectionTaskInner::Handshake {
                    established: None, ..
                },
            ) => {
                // `established` is only `None` while temporarily extracted; a coordinator
                // message can never be processed at that moment.
                unreachable!()
            }
            (
                CoordinatorToConnectionInner::StartRequest {
                    protocol_name,
                    request_data,
                    timeout,
                    max_response_size,
                    substream_id,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // The coordinator-assigned `substream_id` is stored as the substream's user
                // data and mapped to the inner id.
                let inner_substream_id = established.add_request(
                    protocol_name,
                    request_data,
                    now.clone() + timeout,
                    max_response_size,
                    Some(substream_id),
                );
                let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
                debug_assert!(_prev_value.is_none());
            }
            (
                CoordinatorToConnectionInner::OpenOutNotifications {
                    max_handshake_size,
                    protocol_name,
                    handshake,
                    handshake_timeout,
                    substream_id: outer_substream_id,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                let inner_substream_id = established.open_notifications_substream(
                    protocol_name,
                    max_handshake_size,
                    handshake,
                    now.clone() + handshake_timeout,
                    Some(outer_substream_id),
                );

                let _prev_value =
                    outbound_substreams_map.insert(outer_substream_id, inner_substream_id);
                debug_assert!(_prev_value.is_none());
            }
            (
                CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // It is possible that the remote has closed the outbound notification substream
                // while the `CloseOutNotifications` message was being delivered, or that the API
                // user close the substream before the message about the substream being closed
                // was delivered to the coordinator.
                if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
                    established.close_out_notifications_substream(inner_substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::QueueNotification {
                    substream_id,
                    notification,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // It is possible that the remote has closed the outbound notification substream
                // while a `QueueNotification` message was being delivered, or that the API user
                // queued a notification before the message about the substream being closed was
                // delivered to the coordinator.
                // If that happens, we intentionally silently discard the message, causing the
                // notification to not be sent. This is consistent with the guarantees about
                // notifications delivered that are documented in the public API.
                if let Some(inner_substream_id) = outbound_substreams_map.get(&substream_id) {
                    established.write_notification_unbounded(*inner_substream_id, notification);
                }
            }
            (
                CoordinatorToConnectionInner::AnswerRequest {
                    substream_id,
                    response,
                },
                MultiStreamConnectionTaskInner::Established { established, .. },
            ) => match established.respond_in_request(substream_id, response) {
                Ok(()) => {}
                Err(established::RespondInRequestError::SubstreamClosed) => {
                    // As documented, answering an obsolete request is simply ignored.
                }
            },
            (
                CoordinatorToConnectionInner::AcceptInNotifications {
                    substream_id,
                    handshake,
                    max_notification_size,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                // Accepting an already-closed notifications substream is dismissed.
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.accept_in_notifications_substream(
                        substream_id,
                        handshake,
                        max_notification_size,
                    );
                }
            }
            (
                CoordinatorToConnectionInner::RejectInNotifications { substream_id },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.reject_in_notifications_substream(substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::CloseInNotifications {
                    substream_id,
                    timeout,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established
                        .close_in_notifications_substream(substream_id, now.clone() + timeout);
                }
            }
            (
                CoordinatorToConnectionInner::StartShutdown { .. },
                MultiStreamConnectionTaskInner::Handshake { .. }
                | MultiStreamConnectionTaskInner::Established { .. },
            ) => {
                // TODO: implement proper shutdown
                self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    start_shutdown_message_to_send: Some(None),
                    shutdown_finish_message_sent: false,
                    initiator: ShutdownInitiator::Coordinator,
                };
            }
            (
                CoordinatorToConnectionInner::AcceptInbound { .. }
                | CoordinatorToConnectionInner::RejectInbound { .. }
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
                | CoordinatorToConnectionInner::StartRequest { .. }
                | CoordinatorToConnectionInner::AnswerRequest { .. }
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
                | CoordinatorToConnectionInner::QueueNotification { .. },
                MultiStreamConnectionTaskInner::Handshake { .. }
                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
            ) => unreachable!(),
            (
                CoordinatorToConnectionInner::AcceptInbound { .. }
                | CoordinatorToConnectionInner::RejectInbound { .. }
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
                | CoordinatorToConnectionInner::StartRequest { .. }
                | CoordinatorToConnectionInner::AnswerRequest { .. }
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
                | CoordinatorToConnectionInner::QueueNotification { .. },
                MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. },
            )
            | (
                CoordinatorToConnectionInner::StartShutdown,
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Api,
                    ..
                },
            ) => {
                // There might still be some messages coming from the coordinator after the
                // connection task has sent a message indicating that it has shut down. This is
                // due to the concurrent nature of the API and doesn't indicate a bug. These
                // messages are simply ignored by the connection task.
            }
            (
                CoordinatorToConnectionInner::ShutdownFinishedAck,
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    start_shutdown_message_to_send: start_shutdown_message_sent,
                    shutdown_finish_message_sent,
                    initiator,
                },
            ) => {
                // The acknowledgment can only arrive after both shutdown messages were sent.
                debug_assert!(
                    start_shutdown_message_sent.is_none() && *shutdown_finish_message_sent
                );
                self.connection = MultiStreamConnectionTaskInner::ShutdownAcked {
                    initiator: *initiator,
                };
            }
            (
                CoordinatorToConnectionInner::StartShutdown,
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Coordinator,
                    ..
                }
                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
            ) => unreachable!(),
            (CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
        }
    }
647
648 /// Returns the number of new outbound substreams that the state machine would like to see
649 /// opened.
650 ///
651 /// This value doesn't change automatically over time but only after a call to
652 /// [`MultiStreamConnectionTask::substream_read_write`],
653 /// [`MultiStreamConnectionTask::inject_coordinator_message`],
654 /// [`MultiStreamConnectionTask::add_substream`], or
655 /// [`MultiStreamConnectionTask::reset_substream`].
656 ///
657 /// Note that the user is expected to track the number of substreams that are currently being
658 /// opened. For example, if this function returns 2 and there are already 2 substreams
659 /// currently being opened, then there is no need to open any additional one.
660 pub fn desired_outbound_substreams(&self) -> u32 {
661 match &self.connection {
662 MultiStreamConnectionTaskInner::Handshake {
663 opened_substream, ..
664 } => {
665 if opened_substream.is_none() {
666 1
667 } else {
668 0
669 }
670 }
671 MultiStreamConnectionTaskInner::Established { established, .. } => {
672 established.desired_outbound_substreams()
673 }
674 MultiStreamConnectionTaskInner::ShutdownAcked { .. }
675 | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => 0,
676 }
677 }
678
679 /// Notifies the state machine that a new substream has been opened.
680 ///
681 /// `outbound` indicates whether the substream has been opened by the remote (`false`) or
682 /// locally (`true`).
683 ///
684 /// If `outbound` is `true`, then the value returned by
685 /// [`MultiStreamConnectionTask::desired_outbound_substreams`] will decrease by one.
686 ///
687 /// # Panic
688 ///
689 /// Panics if there already exists a substream with an identical identifier.
690 ///
691 pub fn add_substream(&mut self, id: TSubId, outbound: bool) {
692 match &mut self.connection {
693 MultiStreamConnectionTaskInner::Handshake {
694 opened_substream: opened_substream @ None,
695 ..
696 } if outbound => {
697 *opened_substream = Some((id, webrtc_framing::WebRtcFraming::new()));
698 }
699 MultiStreamConnectionTaskInner::Handshake {
700 opened_substream,
701 extra_open_substreams,
702 ..
703 } => {
704 assert!(
705 opened_substream
706 .as_ref()
707 .map_or(true, |(open, _)| *open != id)
708 );
709 // TODO: add a limit to the number allowed?
710 let _was_in = extra_open_substreams.insert(id, outbound);
711 assert!(_was_in.is_none());
712 }
713 MultiStreamConnectionTaskInner::Established { established, .. } => {
714 established.add_substream(id, outbound)
715 }
716 MultiStreamConnectionTaskInner::ShutdownAcked { .. }
717 | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
718 // TODO: reset the substream or something?
719 }
720 }
721 }
722
723 /// Sets the state of the connection to "reset".
724 ///
725 /// This should be called if the remote abruptly closes the connection, such as with a TCP/IP
726 /// RST flag.
727 ///
728 /// After this function has been called, it is illegal to call
729 /// [`MultiStreamConnectionTask::substream_read_write`] or
730 /// [`MultiStreamConnectionTask::reset`] again.
731 ///
732 /// Calling this function might have generated messages for the coordinator.
733 /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards in
734 /// order to process these messages.
735 ///
736 /// # Panic
737 ///
738 /// Panics if [`MultiStreamConnectionTask::reset`] has been called in the past.
739 ///
740 pub fn reset(&mut self) {
741 match self.connection {
742 MultiStreamConnectionTaskInner::ShutdownWaitingAck {
743 initiator: ShutdownInitiator::Api,
744 ..
745 }
746 | MultiStreamConnectionTaskInner::ShutdownAcked {
747 initiator: ShutdownInitiator::Api,
748 ..
749 } => {
750 // It is illegal to call `reset` a second time.
751 panic!()
752 }
753 MultiStreamConnectionTaskInner::ShutdownWaitingAck {
754 ref mut initiator, ..
755 }
756 | MultiStreamConnectionTaskInner::ShutdownAcked {
757 ref mut initiator, ..
758 } => {
759 // Mark the initiator as being the API in order to track proper API usage.
760 *initiator = ShutdownInitiator::Api;
761 }
762 _ => {
763 self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
764 initiator: ShutdownInitiator::Api,
765 shutdown_finish_message_sent: false,
766 start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
767 };
768 }
769 }
770 }
771
772 /// Returns `true` if [`MultiStreamConnectionTask::reset`] has been called in the past.
773 pub fn is_reset_called(&self) -> bool {
774 matches!(
775 self.connection,
776 MultiStreamConnectionTaskInner::ShutdownWaitingAck {
777 initiator: ShutdownInitiator::Api,
778 ..
779 } | MultiStreamConnectionTaskInner::ShutdownAcked {
780 initiator: ShutdownInitiator::Api,
781 ..
782 }
783 )
784 }
785
786 /// Immediately destroys the substream with the given identifier.
787 ///
788 /// The given identifier is now considered invalid by the state machine.
789 ///
790 /// # Panic
791 ///
792 /// Panics if there is no substream with that identifier.
793 ///
794 pub fn reset_substream(&mut self, substream_id: &TSubId) {
795 match &mut self.connection {
796 MultiStreamConnectionTaskInner::Established {
797 handshake_substream,
798 ..
799 } if handshake_substream
800 .as_ref()
801 .map_or(false, |s| s == substream_id) =>
802 {
803 *handshake_substream = None;
804 }
805 MultiStreamConnectionTaskInner::Established { established, .. } => {
806 established.reset_substream(substream_id)
807 }
808 MultiStreamConnectionTaskInner::Handshake {
809 opened_substream: Some((opened_substream, _)),
810 ..
811 } if opened_substream == substream_id => {
812 // TODO: the handshake has failed, kill the connection?
813 }
814 MultiStreamConnectionTaskInner::Handshake {
815 extra_open_substreams,
816 ..
817 } => {
818 let _was_in = extra_open_substreams.remove(substream_id).is_some();
819 assert!(_was_in);
820 }
821 MultiStreamConnectionTaskInner::ShutdownAcked { .. }
822 | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
823 // TODO: panic if substream id invalid?
824 }
825 }
826 }
827
    /// Reads/writes data on the substream.
    ///
    /// If the method returns [`SubstreamFate::Reset`], then the substream is now considered dead
    /// according to the state machine and its identifier is now invalid. If the reading or
    /// writing side of the substream was still open, then the user should reset that substream.
    ///
    /// In the case of a WebRTC connection, the [`ReadWrite::incoming_buffer`] and
    /// [`ReadWrite::write_bytes_queueable`] must always be `Some`.
    ///
    /// # Panic
    ///
    /// Panics if there is no substream with that identifier.
    /// Panics if this is a WebRTC connection, and the reading or writing side is closed.
    ///
    #[must_use]
    pub fn substream_read_write(
        &mut self,
        substream_id: &TSubId,
        read_write: &mut ReadWrite<TNow>,
    ) -> SubstreamFate {
        // In WebRTC, the reading and writing sides are never closed.
        // Note that the `established::MultiStream` state machine also performs this check, but
        // we do it here again because we're not necessarily in the `established` state.
        assert!(
            read_write.expected_incoming_bytes.is_some()
                && read_write.write_bytes_queueable.is_some()
        );

        match &mut self.connection {
            // This is the substream dedicated to the Noise handshake.
            MultiStreamConnectionTaskInner::Handshake {
                handshake,
                opened_substream: Some((opened_handshake_substream, handshake_webrtc_framing)),
                established,
                extra_open_substreams,
            } if opened_handshake_substream == substream_id => {
                // TODO: check the handshake timeout

                // Progress the Noise handshake.
                let handshake_outcome = {
                    // The Noise data is not directly the data of the substream. Instead,
                    // everything is wrapped within a Protobuf frame.
                    let mut with_framing = match handshake_webrtc_framing.read_write(read_write) {
                        Ok(f) => f,
                        Err(_err) => {
                            // TODO: not great for diagnostic to just ignore the error; also, the connection should just reset entirely
                            return SubstreamFate::Reset;
                        }
                    };
                    // `handshake` is always `Some` outside of this temporary extraction.
                    handshake.take().unwrap().read_write(&mut with_framing)
                };

                match handshake_outcome {
                    Ok(noise::NoiseHandshake::InProgress(handshake_update)) => {
                        // Handshake not finished yet; put the updated state machine back.
                        *handshake = Some(handshake_update);
                        SubstreamFate::Continue
                    }
                    Err(_err) => return SubstreamFate::Reset, // TODO: /!\
                    Ok(noise::NoiseHandshake::Success {
                        cipher: _,
                        remote_peer_id,
                    }) => {
                        // The handshake has succeeded and we will transition into "established"
                        // mode.
                        // Substreams opened while the handshake was in progress are handed over
                        // to the established state machine.
                        let mut established = established.take().unwrap();
                        for (substream_id, outbound) in extra_open_substreams.drain() {
                            established.add_substream(substream_id, outbound);
                        }

                        self.connection = MultiStreamConnectionTaskInner::Established {
                            established,
                            handshake_finished_message_to_send: Some(remote_peer_id),
                            handshake_substream: None, // TODO: do properly
                            outbound_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
                                0,
                                Default::default(),
                            ),
                            notifications_in_close_acknowledgments:
                                hashbrown::HashSet::with_capacity_and_hasher(2, Default::default()),
                            inbound_accept_cancel_events: VecDeque::with_capacity(2),
                        };

                        // TODO: hacky
                        SubstreamFate::Reset
                    }
                }
            }
            // Lingering handshake substream after the connection became established.
            MultiStreamConnectionTaskInner::Established {
                handshake_substream,
                ..
            } if handshake_substream
                .as_ref()
                .map_or(false, |s| s == substream_id) =>
            {
                // Close the writing side. If the reading side is closed, we indicate that the
                // substream is dead. If the reading side is still open, we indicate that it's not
                // dead and simply wait for the remote to close it.
                // TODO: kill the connection if the remote sends more data?
                read_write.close_write();
                if read_write.expected_incoming_bytes.is_none() {
                    *handshake_substream = None;
                    SubstreamFate::Reset
                } else {
                    SubstreamFate::Continue
                }
            }
            MultiStreamConnectionTaskInner::Established { established, .. } => {
                established.substream_read_write(substream_id, read_write)
            }
            MultiStreamConnectionTaskInner::Handshake {
                extra_open_substreams,
                ..
            } => {
                // As documented, panic if the substream identifier is unknown.
                assert!(extra_open_substreams.contains_key(substream_id));
                // Don't do anything. Don't read or write. Instead we wait for the handshake to
                // be finished.
                SubstreamFate::Continue
            }
            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
                // TODO: panic if substream id invalid?
                SubstreamFate::Reset
            }
        }
    }
952}