Skip to main content

zmux_quinn/
lib.rs

1//! Optional QUIC adapter support for zmux.
2//!
3//! Adapter streams start with `varint(metadata_len) + STREAM-METADATA-TLV...`
4//! when open-time metadata is present.
5//!
6//! Mapping rules:
7//! - bidirectional and unidirectional open / accept map directly to QUIC
8//!   streams;
9//! - open-time zmux metadata is carried in the per-stream prelude;
10//! - accepted-stream prelude parsing is concurrency-bounded;
11//! - post-open metadata updates return an unsupported error;
12//! - QUIC stream termination carries numeric codes.
13
14#![forbid(unsafe_code)]
15
16use std::collections::HashMap;
17use std::future::Future;
18use std::io::{ErrorKind, IoSlice, IoSliceMut, Read};
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, Mutex};
22use std::time::{Duration, Instant};
23
24use bytes::Bytes;
25use tokio::sync::{mpsc, watch, Mutex as AsyncMutex, Notify, Semaphore};
26use zmux::{
27    build_open_metadata_prefix, parse_stream_metadata_bytes_view, read_varint, AsyncBoxFuture,
28    AsyncDuplexStreamHandle, AsyncRecvStreamHandle, AsyncSendStreamHandle, AsyncSession,
29    AsyncStreamHandle, OpenOptions, OpenRequest, OpenSend, Result, StreamMetadata, WritePayload,
30    CAPABILITY_OPEN_METADATA, CAPABILITY_PRIORITY_HINTS, CAPABILITY_STREAM_GROUPS,
31};
32
33const STREAM_PRELUDE_MAX_PAYLOAD: u64 = 16 << 10;
34const QUINN_WRITE_VECTORED_COALESCE_MAX_BYTES: usize = 64 << 10;
35const OPEN_METADATA_CAPABILITIES: u64 =
36    CAPABILITY_OPEN_METADATA | CAPABILITY_PRIORITY_HINTS | CAPABILITY_STREAM_GROUPS;
37pub const DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT: Duration = Duration::from_secs(5);
38pub const DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT: usize = 8;
39pub const MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT: usize = 1024;
40const ACCEPTED_PRELUDE_RESULT_QUEUE_CAP: usize = 32;
41const MAX_REASON_STATS_CODES: usize = 1024;
42
43static DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT_VALUE: AtomicUsize =
44    AtomicUsize::new(DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT);
45
/// Optional local/peer socket addresses carried alongside adapter streams.
///
/// Both fields default to `None`.
#[derive(Debug, Clone, Copy, Default)]
struct AdapterAddresses {
    /// Local endpoint address, when known.
    local_addr: Option<SocketAddr>,
    /// Remote endpoint address, when known.
    peer_addr: Option<SocketAddr>,
}
51
52pub fn target_claims() -> &'static [zmux::Claim] {
53    &[zmux::Claim::StreamAdapterProfileV1]
54}
55
56pub fn target_implementation_profiles() -> &'static [zmux::ImplementationProfile] {
57    &[]
58}
59
60pub fn target_suites() -> &'static [zmux::ConformanceSuite] {
61    &[zmux::ConformanceSuite::StreamAdapterProfile]
62}
63
64fn default_accepted_prelude_max_concurrent() -> usize {
65    let current = DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT_VALUE.load(Ordering::Acquire);
66    if current > 0 {
67        current.min(MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT)
68    } else {
69        1
70    }
71}
72
73fn set_default_accepted_prelude_max_concurrent(max: usize) {
74    let max = if max == 0 {
75        DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT
76    } else {
77        max.min(MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT)
78    };
79    DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT_VALUE.store(max, Ordering::Release);
80}
81
82#[derive(Debug, Clone, Default, PartialEq, Eq)]
83pub struct AcceptedStreamMetadata {
84    pub metadata: StreamMetadata,
85    pub metadata_valid: bool,
86}
87
88impl AcceptedStreamMetadata {
89    pub fn metadata(&self) -> &StreamMetadata {
90        &self.metadata
91    }
92
93    pub fn is_metadata_valid(&self) -> bool {
94        self.metadata_valid
95    }
96
97    pub fn open_info(&self) -> &[u8] {
98        self.metadata.open_info()
99    }
100
101    pub fn has_open_info(&self) -> bool {
102        self.metadata.has_open_info()
103    }
104}
105
106#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
107pub enum AcceptedPreludeReadTimeout {
108    #[default]
109    Default,
110    Disabled,
111    Timeout(Duration),
112}
113
114impl AcceptedPreludeReadTimeout {
115    fn normalize(self) -> Option<Duration> {
116        match self {
117            Self::Default => Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT),
118            Self::Disabled => None,
119            Self::Timeout(timeout) if timeout.is_zero() => {
120                Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT)
121            }
122            Self::Timeout(timeout) => Some(timeout),
123        }
124    }
125}
126
127#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
128pub struct SessionOptions {
129    pub accepted_prelude_read_timeout: AcceptedPreludeReadTimeout,
130    pub accepted_prelude_max_concurrent: Option<usize>,
131    pub local_addr: Option<SocketAddr>,
132    pub peer_addr: Option<SocketAddr>,
133}
134
135impl SessionOptions {
136    /// Returns the process-wide default accepted prelude parsing concurrency.
137    ///
138    /// `SessionOptions::default()` follows this value until a session explicitly
139    /// sets `accepted_prelude_max_concurrent`.
140    pub fn default_accepted_prelude_max_concurrent() -> usize {
141        default_accepted_prelude_max_concurrent()
142    }
143
144    /// Updates the process-wide accepted prelude parsing concurrency default.
145    ///
146    /// Values above `MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT` are clamped. Passing
147    /// zero restores the built-in `DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT`
148    /// value.
149    pub fn set_default_accepted_prelude_max_concurrent(max: usize) {
150        set_default_accepted_prelude_max_concurrent(max);
151    }
152
153    #[must_use]
154    pub fn new() -> Self {
155        Self::default()
156    }
157
158    #[must_use]
159    pub fn accepted_prelude_read_timeout(mut self, timeout: Duration) -> Self {
160        self.accepted_prelude_read_timeout = AcceptedPreludeReadTimeout::Timeout(timeout);
161        self
162    }
163
164    #[must_use]
165    pub fn disable_accepted_prelude_read_timeout(mut self) -> Self {
166        self.accepted_prelude_read_timeout = AcceptedPreludeReadTimeout::Disabled;
167        self
168    }
169
170    /// Sets this session's accepted prelude parsing concurrency.
171    ///
172    /// Passing zero leaves the session on the current process-wide default.
173    #[must_use]
174    pub fn accepted_prelude_max_concurrent(mut self, max: usize) -> Self {
175        self.accepted_prelude_max_concurrent = Some(max);
176        self
177    }
178
179    #[must_use]
180    pub fn local_addr(mut self, addr: SocketAddr) -> Self {
181        self.local_addr = Some(addr);
182        self
183    }
184
185    #[must_use]
186    pub fn peer_addr(mut self, addr: SocketAddr) -> Self {
187        self.peer_addr = Some(addr);
188        self
189    }
190
191    #[must_use]
192    pub fn addresses(
193        mut self,
194        local_addr: Option<SocketAddr>,
195        peer_addr: Option<SocketAddr>,
196    ) -> Self {
197        self.local_addr = local_addr;
198        self.peer_addr = peer_addr;
199        self
200    }
201}
202
203pub fn build_stream_prelude(opts: &OpenOptions) -> Result<Vec<u8>> {
204    let prelude = build_open_metadata_prefix(
205        OPEN_METADATA_CAPABILITIES,
206        opts.initial_priority(),
207        opts.initial_group(),
208        opts.open_info_bytes(),
209        STREAM_PRELUDE_MAX_PAYLOAD,
210    )?;
211    if prelude.is_empty() {
212        Ok(vec![0])
213    } else {
214        Ok(prelude)
215    }
216}
217
218pub fn read_stream_prelude<R: Read>(reader: &mut R) -> Result<AcceptedStreamMetadata> {
219    let (metadata_len, prefix_len) =
220        read_varint(reader).map_err(|_| protocol_prelude_error("parse stream prelude length"))?;
221    if metadata_len == 0 {
222        return Ok(AcceptedStreamMetadata {
223            metadata: StreamMetadata::default(),
224            metadata_valid: true,
225        });
226    }
227    let metadata_len = checked_prelude_metadata_len(metadata_len, prefix_len)?;
228    let mut metadata_raw = vec![0u8; metadata_len];
229    reader.read_exact(&mut metadata_raw).map_err(|err| {
230        if err.kind() == ErrorKind::UnexpectedEof {
231            protocol_prelude_error("unexpected EOF in stream prelude")
232        } else {
233            err.into()
234        }
235    })?;
236    let (metadata, metadata_valid) = parse_stream_metadata_bytes_view(&metadata_raw)
237        .map_err(|_| protocol_prelude_error("malformed stream prelude metadata"))?;
238    Ok(AcceptedStreamMetadata {
239        metadata: metadata.try_to_owned()?,
240        metadata_valid,
241    })
242}
243
244fn normalize_accepted_prelude_read_timeout(opts: SessionOptions) -> Option<Duration> {
245    opts.accepted_prelude_read_timeout.normalize()
246}
247
248fn normalize_accepted_prelude_max_concurrent(max: Option<usize>) -> usize {
249    match max {
250        Some(max) if max > 0 => max.min(MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT),
251        _ => default_accepted_prelude_max_concurrent(),
252    }
253}
254
255#[derive(Clone)]
256pub struct QuinnSession {
257    conn: quinn::Connection,
258    accepted_prelude_read_timeout: Option<Duration>,
259    local_addr: Option<SocketAddr>,
260    peer_addr: Option<SocketAddr>,
261    prepare_sem: Arc<Semaphore>,
262    active: Arc<ActiveCounters>,
263    internal: Arc<InternalTasks>,
264    stats: Arc<AdapterStats>,
265    accept_shutdown: watch::Sender<()>,
266    bidi_accept: Arc<AsyncMutex<Option<mpsc::Receiver<Result<QuinnStream>>>>>,
267    uni_accept: Arc<AsyncMutex<Option<mpsc::Receiver<Result<QuinnRecvStream>>>>>,
268}
269
270impl QuinnSession {
271    #[must_use]
272    pub fn new(conn: quinn::Connection) -> Self {
273        Self::with_options(conn, SessionOptions::default())
274    }
275
276    #[must_use]
277    pub fn with_options(conn: quinn::Connection, opts: SessionOptions) -> Self {
278        let (accept_shutdown, _) = watch::channel(());
279        let local_addr = opts.local_addr;
280        let peer_addr = opts.peer_addr.or_else(|| Some(conn.remote_address()));
281        Self {
282            conn,
283            accepted_prelude_read_timeout: normalize_accepted_prelude_read_timeout(opts),
284            local_addr,
285            peer_addr,
286            prepare_sem: Arc::new(Semaphore::new(normalize_accepted_prelude_max_concurrent(
287                opts.accepted_prelude_max_concurrent,
288            ))),
289            active: Arc::new(ActiveCounters::default()),
290            internal: Arc::new(InternalTasks::default()),
291            stats: Arc::new(AdapterStats::new()),
292            accept_shutdown,
293            bidi_accept: Arc::new(AsyncMutex::new(None)),
294            uni_accept: Arc::new(AsyncMutex::new(None)),
295        }
296    }
297
298    pub fn local_addr(&self) -> Option<SocketAddr> {
299        self.local_addr
300    }
301
302    pub fn peer_addr(&self) -> Option<SocketAddr> {
303        self.peer_addr
304    }
305
306    pub async fn accept_stream(&self) -> Result<QuinnStream> {
307        let mut receiver = self.bidi_accept.lock().await;
308        if receiver.is_none() {
309            let (tx, rx) = mpsc::channel(ACCEPTED_PRELUDE_RESULT_QUEUE_CAP);
310            *receiver = Some(rx);
311            self.spawn_bidi_accept_loop(tx);
312        }
313        let receiver = receiver.as_mut().expect("bidi accept receiver initialized");
314        loop {
315            match receiver.recv().await {
316                Some(Ok(stream)) => {
317                    self.stats.note_accepted_stream();
318                    return Ok(stream);
319                }
320                Some(Err(err)) if accepted_prelude_rejectable(&err) => continue,
321                Some(Err(err)) => return Err(err),
322                None => return Err(zmux::Error::session_closed()),
323            }
324        }
325    }
326
327    pub async fn accept_stream_timeout(&self, timeout: Duration) -> Result<QuinnStream> {
328        with_timeout(self.accept_stream(), timeout, "accept").await
329    }
330
331    pub async fn accept_uni_stream(&self) -> Result<QuinnRecvStream> {
332        let mut receiver = self.uni_accept.lock().await;
333        if receiver.is_none() {
334            let (tx, rx) = mpsc::channel(ACCEPTED_PRELUDE_RESULT_QUEUE_CAP);
335            *receiver = Some(rx);
336            self.spawn_uni_accept_loop(tx);
337        }
338        let receiver = receiver.as_mut().expect("uni accept receiver initialized");
339        loop {
340            match receiver.recv().await {
341                Some(Ok(stream)) => {
342                    self.stats.note_accepted_stream();
343                    return Ok(stream);
344                }
345                Some(Err(err)) if accepted_prelude_rejectable(&err) => continue,
346                Some(Err(err)) => return Err(err),
347                None => return Err(zmux::Error::session_closed()),
348            }
349        }
350    }
351
352    pub async fn accept_uni_stream_timeout(&self, timeout: Duration) -> Result<QuinnRecvStream> {
353        with_timeout(self.accept_uni_stream(), timeout, "accept").await
354    }
355
356    fn spawn_bidi_accept_loop(&self, tx: mpsc::Sender<Result<QuinnStream>>) {
357        let conn = self.conn.clone();
358        let timeout = self.accepted_prelude_read_timeout;
359        let addresses = AdapterAddresses {
360            local_addr: self.local_addr,
361            peer_addr: self.peer_addr,
362        };
363        let sem = self.prepare_sem.clone();
364        let active = self.active.clone();
365        let stats = self.stats.clone();
366        let internal = self.internal.clone();
367        let mut shutdown = self.accept_shutdown.subscribe();
368        internal.clone().spawn(async move {
369            loop {
370                let (send, recv) = match accept_bi_or_shutdown(&conn, &mut shutdown).await {
371                    Some(Ok(streams)) => streams,
372                    None => return,
373                    Some(Err(err)) => {
374                        publish_bidi_accept_result(
375                            &tx,
376                            Err(translate_connection_error(err)),
377                            &conn,
378                        )
379                        .await;
380                        return;
381                    }
382                };
383                let permit = match acquire_prepare_permit_or_shutdown(&sem, &mut shutdown).await {
384                    Some(Ok(permit)) => permit,
385                    None => return,
386                    Some(Err(_)) => {
387                        let _ = tx.send(Err(zmux::Error::session_closed())).await;
388                        return;
389                    }
390                };
391                let tx = tx.clone();
392                let active = active.clone();
393                let stats = stats.clone();
394                let conn = conn.clone();
395                let internal = internal.clone();
396                let mut worker_shutdown = shutdown.clone();
397                internal.spawn(async move {
398                    let _permit = permit;
399                    if let Some(result) = prepare_accepted_bidi_stream_or_shutdown(
400                        send,
401                        recv,
402                        timeout,
403                        addresses,
404                        active,
405                        stats,
406                        &mut worker_shutdown,
407                    )
408                    .await
409                    {
410                        publish_bidi_accept_result(&tx, result, &conn).await;
411                    }
412                });
413            }
414        });
415    }
416
417    fn spawn_uni_accept_loop(&self, tx: mpsc::Sender<Result<QuinnRecvStream>>) {
418        let conn = self.conn.clone();
419        let timeout = self.accepted_prelude_read_timeout;
420        let addresses = AdapterAddresses {
421            local_addr: self.local_addr,
422            peer_addr: self.peer_addr,
423        };
424        let sem = self.prepare_sem.clone();
425        let active = self.active.clone();
426        let stats = self.stats.clone();
427        let internal = self.internal.clone();
428        let mut shutdown = self.accept_shutdown.subscribe();
429        internal.clone().spawn(async move {
430            loop {
431                let recv = match accept_uni_or_shutdown(&conn, &mut shutdown).await {
432                    Some(Ok(recv)) => recv,
433                    None => return,
434                    Some(Err(err)) => {
435                        publish_uni_accept_result(&tx, Err(translate_connection_error(err)), &conn)
436                            .await;
437                        return;
438                    }
439                };
440                let permit = match acquire_prepare_permit_or_shutdown(&sem, &mut shutdown).await {
441                    Some(Ok(permit)) => permit,
442                    None => return,
443                    Some(Err(_)) => {
444                        let _ = tx.send(Err(zmux::Error::session_closed())).await;
445                        return;
446                    }
447                };
448                let tx = tx.clone();
449                let active = active.clone();
450                let stats = stats.clone();
451                let conn = conn.clone();
452                let internal = internal.clone();
453                let mut worker_shutdown = shutdown.clone();
454                internal.spawn(async move {
455                    let _permit = permit;
456                    if let Some(result) = prepare_accepted_uni_stream_or_shutdown(
457                        recv,
458                        timeout,
459                        addresses,
460                        active,
461                        stats,
462                        &mut worker_shutdown,
463                    )
464                    .await
465                    {
466                        publish_uni_accept_result(&tx, result, &conn).await;
467                    }
468                });
469            }
470        });
471    }
472
473    pub async fn open_stream(&self) -> Result<QuinnStream> {
474        self.open_stream_with(OpenRequest::new()).await
475    }
476
477    pub async fn open_stream_with(&self, request: impl Into<OpenRequest>) -> Result<QuinnStream> {
478        let (opts, timeout) = request.into().into_parts();
479        let open = self.open_stream_inner(opts);
480        match timeout {
481            Some(timeout) => with_timeout(open, timeout, "open").await,
482            None => open.await,
483        }
484    }
485
486    async fn open_stream_inner(&self, opts: OpenOptions) -> Result<QuinnStream> {
487        let prelude = PreludeState::local(opts)?;
488        let started_at = Instant::now();
489        let (send, recv) = self
490            .conn
491            .open_bi()
492            .await
493            .map_err(translate_connection_error)?;
494        let stream = QuinnStream::local(
495            send,
496            recv,
497            prelude,
498            self.stats.clone(),
499            self.local_addr,
500            self.peer_addr,
501        );
502        if let Err(err) = stream.maybe_send_open_prelude_on_open().await {
503            stream.discard_after_open_error(&err).await;
504            return Err(err);
505        }
506        self.stats.note_open_latency(started_at, Instant::now());
507        Ok(stream.with_active(self.active.clone(), ActiveKind::LocalBidi))
508    }
509
510    pub async fn open_uni_stream(&self) -> Result<QuinnSendStream> {
511        self.open_uni_stream_with(OpenRequest::new()).await
512    }
513
514    pub async fn open_uni_stream_with(
515        &self,
516        request: impl Into<OpenRequest>,
517    ) -> Result<QuinnSendStream> {
518        let (opts, timeout) = request.into().into_parts();
519        let open = self.open_uni_stream_inner(opts);
520        match timeout {
521            Some(timeout) => with_timeout(open, timeout, "open").await,
522            None => open.await,
523        }
524    }
525
526    async fn open_uni_stream_inner(&self, opts: OpenOptions) -> Result<QuinnSendStream> {
527        let prelude = PreludeState::local(opts)?;
528        let started_at = Instant::now();
529        let send = self
530            .conn
531            .open_uni()
532            .await
533            .map_err(translate_connection_error)?;
534        let stream = QuinnSendStream::local(
535            send,
536            prelude,
537            self.stats.clone(),
538            self.local_addr,
539            self.peer_addr,
540        );
541        if let Err(err) = stream.maybe_send_open_prelude_on_open().await {
542            stream.discard_after_open_error(&err).await;
543            return Err(err);
544        }
545        self.stats.note_open_latency(started_at, Instant::now());
546        Ok(stream.with_active(self.active.clone(), ActiveKind::LocalUni))
547    }
548
549    pub async fn open_and_send<'a>(&self, request: impl Into<OpenSend<'a>>) -> Result<QuinnStream> {
550        let (opts, payload, timeout) = request.into().into_parts();
551        let requested = payload.checked_len()?;
552        let start = Instant::now();
553        let mut open = OpenRequest::new().options(opts);
554        if let Some(timeout) = timeout {
555            ensure_positive_session_timeout(timeout, "open", zmux::ErrorOperation::Open)?;
556            open = open.timeout(timeout);
557        }
558        let stream = self
559            .open_stream_with(open)
560            .await
561            .map_err(|err| err.with_session_context(zmux::ErrorOperation::Open))?;
562        if requested == 0 {
563            return Ok(stream);
564        }
565        let write_result: Result<()> = async {
566            let timeout = timeout
567                .map(|timeout| remaining_write_timeout(start, timeout))
568                .transpose()?;
569            match timeout {
570                Some(timeout) => {
571                    stream
572                        .write_all_timeout(payload, timeout)
573                        .await
574                        .map_err(|err| {
575                            err.with_stream_context(
576                                zmux::ErrorOperation::Write,
577                                zmux::ErrorDirection::Write,
578                            )
579                        })?;
580                }
581                None => {
582                    stream.write_all(payload).await?;
583                }
584            }
585            Ok(())
586        }
587        .await;
588        if let Err(err) = write_result {
589            stream.discard_after_open_error(&err).await;
590            return Err(err);
591        }
592        Ok(stream)
593    }
594
595    pub async fn open_uni_and_send<'a>(
596        &self,
597        request: impl Into<OpenSend<'a>>,
598    ) -> Result<QuinnSendStream> {
599        let (opts, payload, timeout) = request.into().into_parts();
600        let requested = payload.checked_len()?;
601        let start = Instant::now();
602        let mut open = OpenRequest::new().options(opts);
603        if let Some(timeout) = timeout {
604            ensure_positive_session_timeout(timeout, "open", zmux::ErrorOperation::Open)?;
605            open = open.timeout(timeout);
606        }
607        let stream = self
608            .open_uni_stream_with(open)
609            .await
610            .map_err(|err| err.with_session_context(zmux::ErrorOperation::Open))?;
611        let write_result: Result<()> = async {
612            let timeout = timeout
613                .map(|timeout| remaining_write_timeout(start, timeout))
614                .transpose()?;
615            let n = match (payload, timeout) {
616                (WritePayload::Bytes(data), Some(timeout)) => stream
617                    .write_final_timeout(WritePayload::Bytes(data), timeout)
618                    .await
619                    .map_err(|err| {
620                        err.with_stream_context(
621                            zmux::ErrorOperation::Write,
622                            zmux::ErrorDirection::Write,
623                        )
624                    })?,
625                (WritePayload::Bytes(data), None) => {
626                    stream.write_final(WritePayload::Bytes(data)).await?
627                }
628                (WritePayload::Vectored(parts), Some(timeout)) => stream
629                    .write_vectored_final_timeout(parts, timeout)
630                    .await
631                    .map_err(|err| {
632                        err.with_stream_context(
633                            zmux::ErrorOperation::Write,
634                            zmux::ErrorDirection::Write,
635                        )
636                    })?,
637                (WritePayload::Vectored(parts), None) => stream.write_vectored_final(parts).await?,
638            };
639            validate_progress(n, requested)?;
640            Ok(())
641        }
642        .await;
643        if let Err(err) = write_result {
644            stream.discard_after_open_error(&err).await;
645            return Err(err);
646        }
647        Ok(stream)
648    }
649
650    fn close_now(&self, code: quinn::VarInt, reason: &[u8]) {
651        let _ = self.accept_shutdown.send(());
652        self.conn.close(code, reason);
653    }
654
655    pub async fn close(&self) -> Result<()> {
656        self.close_now(quinn_varint(0), &[]);
657        self.wait().await
658    }
659
660    pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
661        let code = checked_session_quinn_varint(code, zmux::ErrorOperation::Close)?;
662        self.close_now(code, reason.as_bytes());
663        Ok(())
664    }
665
666    pub async fn wait(&self) -> Result<()> {
667        let result = translate_wait_error(self.conn.closed().await);
668        self.internal.wait_idle().await;
669        result
670    }
671
672    pub async fn wait_timeout(&self, timeout: Duration) -> Result<bool> {
673        if self.is_closed() {
674            return Ok(true);
675        }
676        if timeout.is_zero() {
677            return Ok(false);
678        }
679        match tokio::time::timeout(timeout, self.wait()).await {
680            Ok(result) => result.map(|_| true),
681            Err(_) => Ok(false),
682        }
683    }
684
685    pub fn is_closed(&self) -> bool {
686        self.conn.close_reason().is_some()
687    }
688
689    pub fn close_error(&self) -> Option<zmux::Error> {
690        self.conn
691            .close_reason()
692            .and_then(|err| translate_wait_error(err).err())
693    }
694
695    pub fn state(&self) -> zmux::SessionState {
696        if self.is_closed() {
697            zmux::SessionState::Closed
698        } else {
699            zmux::SessionState::Ready
700        }
701    }
702
703    pub fn stats(&self) -> zmux::SessionStats {
704        let active_streams = self.active.snapshot();
705        let adapter_stats = self.stats.snapshot();
706        zmux::SessionStats {
707            state: self.state(),
708            sent_frames: 0,
709            received_frames: 0,
710            sent_data_bytes: adapter_stats.sent_data_bytes,
711            received_data_bytes: adapter_stats.received_data_bytes,
712            open_streams: usize::try_from(active_streams.total).unwrap_or(usize::MAX),
713            accepted_streams: adapter_stats.accepted_streams,
714            active_streams,
715            provisional: Default::default(),
716            accept_backlog: Default::default(),
717            retention: Default::default(),
718            memory: Default::default(),
719            abuse: Default::default(),
720            hidden: zmux::HiddenStateStats {
721                refused: adapter_stats.hidden_refused,
722                ..Default::default()
723            },
724            reasons: adapter_stats.reasons,
725            diagnostics: Default::default(),
726            pressure: Default::default(),
727            flush: adapter_stats.flush,
728            telemetry: adapter_stats.telemetry,
729            progress: adapter_stats.progress,
730            blocked_write_total: adapter_stats.blocked_write_total,
731            writer_queue: Default::default(),
732            liveness: Default::default(),
733        }
734    }
735}
736
737async fn with_timeout<T>(
738    fut: impl Future<Output = Result<T>>,
739    timeout: Duration,
740    operation: &'static str,
741) -> Result<T> {
742    if timeout.is_zero() {
743        return Err(zmux::Error::timeout(operation));
744    }
745    tokio::time::timeout(timeout, fut)
746        .await
747        .map_err(|_| zmux::Error::timeout(operation))?
748}
749
750async fn with_optional_timeout<T>(
751    fut: impl Future<Output = Result<T>>,
752    timeout: Option<Duration>,
753    operation: &'static str,
754) -> Result<T> {
755    match timeout {
756        Some(timeout) => with_timeout(fut, timeout, operation).await,
757        None => fut.await,
758    }
759}
760
761fn ensure_positive_session_timeout(
762    timeout: Duration,
763    operation_name: &'static str,
764    operation: zmux::ErrorOperation,
765) -> Result<()> {
766    if timeout.is_zero() {
767        Err(zmux::Error::timeout(operation_name).with_session_context(operation))
768    } else {
769        Ok(())
770    }
771}
772
/// Returns how much of `timeout` is left since `start`.
///
/// Yields `None` when the budget is fully used up, including when exactly
/// zero time remains.
fn remaining_timeout(start: Instant, timeout: Duration) -> Option<Duration> {
    let left = timeout.checked_sub(start.elapsed())?;
    if left.is_zero() {
        None
    } else {
        Some(left)
    }
}
778
/// Converts an optional timeout into an absolute deadline from now.
///
/// Returns `None` when no timeout is given or when adding it to the
/// current instant overflows.
fn timeout_to_deadline(timeout: Option<Duration>) -> Option<Instant> {
    let span = timeout?;
    Instant::now().checked_add(span)
}
782
783fn remaining_write_timeout(start: Instant, timeout: Duration) -> Result<Duration> {
784    remaining_timeout(start, timeout).ok_or_else(|| {
785        zmux::Error::timeout("write")
786            .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
787    })
788}
789
790fn timeout_until(deadline: Option<Instant>, operation: &'static str) -> Result<Option<Duration>> {
791    match deadline {
792        Some(deadline) => deadline
793            .checked_duration_since(Instant::now())
794            .filter(|timeout| !timeout.is_zero())
795            .map(Some)
796            .ok_or_else(|| zmux::Error::timeout(operation)),
797        None => Ok(None),
798    }
799}
800
801#[derive(Debug)]
802struct AdapterStats {
803    origin: Instant,
804    sent_data_bytes: AtomicU64,
805    received_data_bytes: AtomicU64,
806    accepted_streams: AtomicU64,
807    hidden_refused: AtomicU64,
808    flush_count: AtomicU64,
809    blocked_write_nanos: AtomicU64,
810    last_inbound_frame_at: AtomicU64,
811    last_control_progress_at: AtomicU64,
812    last_transport_write_at: AtomicU64,
813    last_stream_progress_at: AtomicU64,
814    last_application_progress_at: AtomicU64,
815    last_flush_at: AtomicU64,
816    last_flush_bytes: AtomicUsize,
817    last_open_latency_nanos: AtomicU64,
818    reasons: Mutex<AdapterReasonStats>,
819}
820
821#[derive(Debug, Clone)]
822struct AdapterStatsSnapshot {
823    sent_data_bytes: u64,
824    received_data_bytes: u64,
825    accepted_streams: u64,
826    hidden_refused: u64,
827    flush: zmux::FlushStats,
828    telemetry: zmux::TelemetryStats,
829    progress: zmux::ProgressStats,
830    blocked_write_total: Duration,
831    reasons: zmux::ReasonStats,
832}
833
834#[derive(Debug, Default)]
835struct InternalTasks {
836    active: AtomicUsize,
837    idle: Notify,
838}
839
840impl InternalTasks {
841    fn spawn<F>(self: Arc<Self>, fut: F)
842    where
843        F: Future<Output = ()> + Send + 'static,
844    {
845        self.active.fetch_add(1, Ordering::AcqRel);
846        tokio::spawn(async move {
847            fut.await;
848            if self.active.fetch_sub(1, Ordering::AcqRel) == 1 {
849                self.idle.notify_waiters();
850            }
851        });
852    }
853
854    async fn wait_idle(&self) {
855        loop {
856            let notified = self.idle.notified();
857            if self.active.load(Ordering::Acquire) == 0 {
858                return;
859            }
860            notified.await;
861        }
862    }
863}
864
/// Tallies of stream reset and abort reason codes.
///
/// Both maps and their overflow counters are updated through `note_reason`
/// (defined elsewhere in this file).
// NOTE(review): presumably the overflow counters absorb codes once a map
// reaches `MAX_REASON_STATS_CODES` entries — confirm against `note_reason`.
#[derive(Debug, Default)]
struct AdapterReasonStats {
    /// Reset tallies keyed by numeric reason code.
    reset: HashMap<u64, u64>,
    /// Overflow counter paired with `reset`.
    reset_overflow: u64,
    /// Abort tallies keyed by numeric reason code.
    abort: HashMap<u64, u64>,
    /// Overflow counter paired with `abort`.
    abort_overflow: u64,
}
872
873impl AdapterStats {
874    const NO_DURATION: u64 = u64::MAX;
875
876    fn new() -> Self {
877        Self {
878            origin: Instant::now(),
879            sent_data_bytes: AtomicU64::new(0),
880            received_data_bytes: AtomicU64::new(0),
881            accepted_streams: AtomicU64::new(0),
882            hidden_refused: AtomicU64::new(0),
883            flush_count: AtomicU64::new(0),
884            blocked_write_nanos: AtomicU64::new(0),
885            last_inbound_frame_at: AtomicU64::new(0),
886            last_control_progress_at: AtomicU64::new(0),
887            last_transport_write_at: AtomicU64::new(0),
888            last_stream_progress_at: AtomicU64::new(0),
889            last_application_progress_at: AtomicU64::new(0),
890            last_flush_at: AtomicU64::new(0),
891            last_flush_bytes: AtomicUsize::new(0),
892            last_open_latency_nanos: AtomicU64::new(Self::NO_DURATION),
893            reasons: Mutex::new(AdapterReasonStats::default()),
894        }
895    }
896
897    fn snapshot(&self) -> AdapterStatsSnapshot {
898        let last_open_latency = match self.last_open_latency_nanos.load(Ordering::Relaxed) {
899            Self::NO_DURATION => None,
900            nanos => Some(Duration::from_nanos(nanos)),
901        };
902        let flush_count = self.flush_count.load(Ordering::Relaxed);
903        AdapterStatsSnapshot {
904            sent_data_bytes: self.sent_data_bytes.load(Ordering::Relaxed),
905            received_data_bytes: self.received_data_bytes.load(Ordering::Relaxed),
906            accepted_streams: self.accepted_streams.load(Ordering::Relaxed),
907            hidden_refused: self.hidden_refused.load(Ordering::Relaxed),
908            flush: zmux::FlushStats {
909                count: flush_count,
910                last_at: self.instant_for_event(self.last_flush_at.load(Ordering::Relaxed)),
911                last_frames: u64::from(flush_count != 0),
912                last_bytes: self.last_flush_bytes.load(Ordering::Relaxed),
913            },
914            telemetry: zmux::TelemetryStats {
915                last_open_latency,
916                send_rate_estimate_bytes_per_second: 0,
917            },
918            progress: zmux::ProgressStats {
919                inbound_frame_at: self
920                    .instant_for_event(self.last_inbound_frame_at.load(Ordering::Relaxed)),
921                control_progress_at: self
922                    .instant_for_event(self.last_control_progress_at.load(Ordering::Relaxed)),
923                transport_write_at: self
924                    .instant_for_event(self.last_transport_write_at.load(Ordering::Relaxed)),
925                stream_progress_at: self
926                    .instant_for_event(self.last_stream_progress_at.load(Ordering::Relaxed)),
927                application_progress_at: self
928                    .instant_for_event(self.last_application_progress_at.load(Ordering::Relaxed)),
929                ping_sent_at: None,
930                pong_at: None,
931            },
932            blocked_write_total: Duration::from_nanos(
933                self.blocked_write_nanos.load(Ordering::Relaxed),
934            ),
935            reasons: self.reasons.lock().unwrap().snapshot(),
936        }
937    }
938
939    fn note_accepted_stream(&self) {
940        saturating_add_atomic_u64(&self.accepted_streams, 1);
941    }
942
943    fn note_hidden_refused(&self) {
944        saturating_add_atomic_u64(&self.hidden_refused, 1);
945    }
946
947    fn note_reset_reason(&self, code: u64) {
948        self.reasons.lock().unwrap().note_reset(code);
949    }
950
951    fn note_abort_reason(&self, code: u64) {
952        self.reasons.lock().unwrap().note_abort(code);
953    }
954
955    fn note_control_progress(&self) {
956        self.note_control_progress_at(Instant::now());
957    }
958
959    fn note_control_progress_at(&self, at: Instant) {
960        self.last_control_progress_at
961            .store(self.event_nanos(at), Ordering::Relaxed);
962    }
963
964    fn note_data_read(&self, bytes: usize, at: Instant) {
965        if bytes == 0 {
966            return;
967        }
968        let event = self.event_nanos(at);
969        saturating_add_atomic_u64(&self.received_data_bytes, usize_to_u64_saturating(bytes));
970        self.last_inbound_frame_at.store(event, Ordering::Relaxed);
971        self.last_stream_progress_at.store(event, Ordering::Relaxed);
972        self.last_application_progress_at
973            .store(event, Ordering::Relaxed);
974    }
975
976    fn note_data_write(&self, bytes: usize, at: Instant) {
977        if bytes == 0 {
978            return;
979        }
980        let event = self.event_nanos(at);
981        saturating_add_atomic_u64(&self.sent_data_bytes, usize_to_u64_saturating(bytes));
982        self.last_stream_progress_at.store(event, Ordering::Relaxed);
983        self.last_application_progress_at
984            .store(event, Ordering::Relaxed);
985    }
986
987    fn note_flush(&self, bytes: usize, at: Instant) {
988        let event = self.event_nanos(at);
989        saturating_add_atomic_u64(&self.flush_count, 1);
990        self.last_transport_write_at.store(event, Ordering::Relaxed);
991        self.last_flush_at.store(event, Ordering::Relaxed);
992        self.last_flush_bytes.store(bytes, Ordering::Relaxed);
993    }
994
995    fn note_write_wait(&self, started_at: Instant, completed_at: Instant) {
996        let elapsed = completed_at.saturating_duration_since(started_at);
997        saturating_add_atomic_u64(
998            &self.blocked_write_nanos,
999            duration_nanos_saturating(elapsed),
1000        );
1001    }
1002
1003    fn note_open_latency(&self, started_at: Instant, completed_at: Instant) {
1004        let elapsed = completed_at.saturating_duration_since(started_at);
1005        self.last_open_latency_nanos
1006            .store(duration_nanos_saturating(elapsed), Ordering::Relaxed);
1007        self.last_stream_progress_at
1008            .store(self.event_nanos(completed_at), Ordering::Relaxed);
1009    }
1010
1011    fn event_nanos(&self, at: Instant) -> u64 {
1012        duration_nanos_saturating(at.saturating_duration_since(self.origin)).max(1)
1013    }
1014
1015    fn instant_for_event(&self, nanos: u64) -> Option<Instant> {
1016        if nanos == 0 {
1017            None
1018        } else {
1019            self.origin.checked_add(Duration::from_nanos(nanos))
1020        }
1021    }
1022}
1023
1024impl AdapterReasonStats {
1025    fn note_reset(&mut self, code: u64) {
1026        note_reason(&mut self.reset, &mut self.reset_overflow, code);
1027    }
1028
1029    fn note_abort(&mut self, code: u64) {
1030        note_reason(&mut self.abort, &mut self.abort_overflow, code);
1031    }
1032
1033    fn snapshot(&self) -> zmux::ReasonStats {
1034        zmux::ReasonStats {
1035            reset: self.reset.clone(),
1036            reset_overflow: self.reset_overflow,
1037            abort: self.abort.clone(),
1038            abort_overflow: self.abort_overflow,
1039        }
1040    }
1041}
1042
1043#[derive(Debug, Default)]
1044struct ActiveCounters {
1045    local_bidi: AtomicUsize,
1046    local_uni: AtomicUsize,
1047    peer_bidi: AtomicUsize,
1048    peer_uni: AtomicUsize,
1049}
1050
1051impl ActiveCounters {
1052    fn add(&self, kind: ActiveKind) {
1053        saturating_increment_atomic_usize(self.counter(kind));
1054    }
1055
1056    fn done(&self, kind: ActiveKind) {
1057        let counter = self.counter(kind);
1058        let mut current = counter.load(Ordering::Relaxed);
1059        while current != 0 {
1060            match counter.compare_exchange_weak(
1061                current,
1062                current - 1,
1063                Ordering::Relaxed,
1064                Ordering::Relaxed,
1065            ) {
1066                Ok(_) => return,
1067                Err(next) => current = next,
1068            }
1069        }
1070    }
1071
1072    fn snapshot(&self) -> zmux::ActiveStreamStats {
1073        let local_bidi = usize_to_u64_saturating(self.local_bidi.load(Ordering::Relaxed));
1074        let local_uni = usize_to_u64_saturating(self.local_uni.load(Ordering::Relaxed));
1075        let peer_bidi = usize_to_u64_saturating(self.peer_bidi.load(Ordering::Relaxed));
1076        let peer_uni = usize_to_u64_saturating(self.peer_uni.load(Ordering::Relaxed));
1077        zmux::ActiveStreamStats {
1078            local_bidi,
1079            local_uni,
1080            peer_bidi,
1081            peer_uni,
1082            total: local_bidi
1083                .saturating_add(local_uni)
1084                .saturating_add(peer_bidi)
1085                .saturating_add(peer_uni),
1086        }
1087    }
1088
1089    fn counter(&self, kind: ActiveKind) -> &AtomicUsize {
1090        match kind {
1091            ActiveKind::LocalBidi => &self.local_bidi,
1092            ActiveKind::LocalUni => &self.local_uni,
1093            ActiveKind::PeerBidi => &self.peer_bidi,
1094            ActiveKind::PeerUni => &self.peer_uni,
1095        }
1096    }
1097}
1098
/// Selects which `ActiveCounters` counter a stream is tracked under.
#[derive(Debug, Clone, Copy)]
enum ActiveKind {
    /// Tracked under `ActiveCounters::local_bidi`.
    LocalBidi,
    /// Tracked under `ActiveCounters::local_uni`.
    LocalUni,
    /// Tracked under `ActiveCounters::peer_bidi`; used for accepted bidirectional streams.
    PeerBidi,
    /// Tracked under `ActiveCounters::peer_uni`; used for accepted unidirectional streams.
    PeerUni,
}
1106
/// Holds one active-stream slot in `ActiveCounters` and releases it at most
/// once, either through `finish` or when dropped.
#[derive(Debug)]
struct ActiveGuard {
    /// Counter set the slot was taken from.
    counters: Arc<ActiveCounters>,
    /// Which counter the slot belongs to.
    kind: ActiveKind,
    /// `true` while the slot is still held; cleared by the first `finish`.
    tracked: AtomicBool,
}
1113
/// Errors remembered for each closed stream half so later calls can report
/// the same failure again.
#[derive(Debug, Default)]
struct TerminalErrors {
    /// Error recorded when the read half closed, if any.
    read: Option<zmux::Error>,
    /// Error recorded when the write half closed, if any.
    write: Option<zmux::Error>,
}
1119
1120impl ActiveGuard {
1121    fn new(counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
1122        counters.add(kind);
1123        Self {
1124            counters,
1125            kind,
1126            tracked: AtomicBool::new(true),
1127        }
1128    }
1129
1130    fn finish(&self) {
1131        if self.tracked.swap(false, Ordering::AcqRel) {
1132            self.counters.done(self.kind);
1133        }
1134    }
1135}
1136
1137impl Drop for ActiveGuard {
1138    fn drop(&mut self) {
1139        self.finish();
1140    }
1141}
1142
1143async fn accept_bi_or_shutdown(
1144    conn: &quinn::Connection,
1145    shutdown: &mut watch::Receiver<()>,
1146) -> Option<std::result::Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>> {
1147    tokio::select! {
1148        result = conn.accept_bi() => Some(result),
1149        _ = shutdown.changed() => None,
1150    }
1151}
1152
1153async fn accept_uni_or_shutdown(
1154    conn: &quinn::Connection,
1155    shutdown: &mut watch::Receiver<()>,
1156) -> Option<std::result::Result<quinn::RecvStream, quinn::ConnectionError>> {
1157    tokio::select! {
1158        result = conn.accept_uni() => Some(result),
1159        _ = shutdown.changed() => None,
1160    }
1161}
1162
1163async fn acquire_prepare_permit_or_shutdown(
1164    sem: &Arc<Semaphore>,
1165    shutdown: &mut watch::Receiver<()>,
1166) -> Option<std::result::Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError>> {
1167    let permit = sem.clone().acquire_owned();
1168    tokio::select! {
1169        result = permit => Some(result),
1170        _ = shutdown.changed() => None,
1171    }
1172}
1173
1174async fn prepare_accepted_bidi_stream_or_shutdown(
1175    send: quinn::SendStream,
1176    recv: quinn::RecvStream,
1177    timeout: Option<Duration>,
1178    addresses: AdapterAddresses,
1179    active: Arc<ActiveCounters>,
1180    stats: Arc<AdapterStats>,
1181    shutdown: &mut watch::Receiver<()>,
1182) -> Option<Result<QuinnStream>> {
1183    tokio::select! {
1184        result = prepare_accepted_bidi_stream(send, recv, timeout, addresses, active, stats) => result,
1185        _ = shutdown.changed() => None,
1186    }
1187}
1188
1189async fn prepare_accepted_uni_stream_or_shutdown(
1190    recv: quinn::RecvStream,
1191    timeout: Option<Duration>,
1192    addresses: AdapterAddresses,
1193    active: Arc<ActiveCounters>,
1194    stats: Arc<AdapterStats>,
1195    shutdown: &mut watch::Receiver<()>,
1196) -> Option<Result<QuinnRecvStream>> {
1197    tokio::select! {
1198        result = prepare_accepted_uni_stream(recv, timeout, addresses, active, stats) => result,
1199        _ = shutdown.changed() => None,
1200    }
1201}
1202
1203async fn prepare_accepted_bidi_stream(
1204    mut send: quinn::SendStream,
1205    mut recv: quinn::RecvStream,
1206    timeout: Option<Duration>,
1207    addresses: AdapterAddresses,
1208    active: Arc<ActiveCounters>,
1209    stats: Arc<AdapterStats>,
1210) -> Option<Result<QuinnStream>> {
1211    match read_accepted_metadata_with_timeout(&mut recv, timeout, true).await {
1212        Ok(metadata) => {
1213            stats.note_control_progress();
1214            let stream = QuinnStream::accepted(
1215                send,
1216                recv,
1217                metadata,
1218                stats,
1219                addresses.local_addr,
1220                addresses.peer_addr,
1221            );
1222            Some(Ok(stream.with_active(active, ActiveKind::PeerBidi)))
1223        }
1224        Err(err) if accepted_prelude_rejectable(&err) => {
1225            stats.note_hidden_refused();
1226            let _ = recv.stop(quinn_varint(zmux::ErrorCode::Protocol.as_u64()));
1227            let _ = send.reset(quinn_varint(zmux::ErrorCode::Protocol.as_u64()));
1228            None
1229        }
1230        Err(err) => Some(Err(err)),
1231    }
1232}
1233
1234async fn prepare_accepted_uni_stream(
1235    mut recv: quinn::RecvStream,
1236    timeout: Option<Duration>,
1237    addresses: AdapterAddresses,
1238    active: Arc<ActiveCounters>,
1239    stats: Arc<AdapterStats>,
1240) -> Option<Result<QuinnRecvStream>> {
1241    match read_accepted_metadata_with_timeout(&mut recv, timeout, false).await {
1242        Ok(metadata) => {
1243            stats.note_control_progress();
1244            let stream = QuinnRecvStream::accepted(
1245                recv,
1246                metadata,
1247                stats,
1248                addresses.local_addr,
1249                addresses.peer_addr,
1250            );
1251            Some(Ok(stream.with_active(active, ActiveKind::PeerUni)))
1252        }
1253        Err(err) if accepted_prelude_rejectable(&err) => {
1254            stats.note_hidden_refused();
1255            let _ = recv.stop(quinn_varint(zmux::ErrorCode::Protocol.as_u64()));
1256            None
1257        }
1258        Err(err) => Some(Err(err)),
1259    }
1260}
1261
1262async fn publish_bidi_accept_result(
1263    tx: &mpsc::Sender<Result<QuinnStream>>,
1264    result: Result<QuinnStream>,
1265    conn: &quinn::Connection,
1266) {
1267    let send = tx.send(result);
1268    tokio::pin!(send);
1269    tokio::select! {
1270        biased;
1271        result = &mut send => {
1272            if let Err(err) = result {
1273                if let Ok(stream) = err.0 {
1274                    let _ = stream
1275                        .close_with_error(zmux::ErrorCode::Cancelled.as_u64(), "")
1276                        .await;
1277                }
1278            }
1279        }
1280        _ = conn.closed() => {}
1281    }
1282}
1283
1284async fn publish_uni_accept_result(
1285    tx: &mpsc::Sender<Result<QuinnRecvStream>>,
1286    result: Result<QuinnRecvStream>,
1287    conn: &quinn::Connection,
1288) {
1289    let send = tx.send(result);
1290    tokio::pin!(send);
1291    tokio::select! {
1292        biased;
1293        result = &mut send => {
1294            if let Err(err) = result {
1295                if let Ok(stream) = err.0 {
1296                    let _ = stream
1297                        .close_with_error(zmux::ErrorCode::Cancelled.as_u64(), "")
1298                        .await;
1299                }
1300            }
1301        }
1302        _ = conn.closed() => {}
1303    }
1304}
1305
1306async fn read_accepted_metadata_with_timeout(
1307    recv: &mut quinn::RecvStream,
1308    timeout: Option<Duration>,
1309    bidirectional: bool,
1310) -> Result<AcceptedStreamMetadata> {
1311    let fut = read_accepted_metadata(recv);
1312    match timeout {
1313        Some(timeout) => tokio::time::timeout(timeout, fut).await.map_err(|_| {
1314            zmux::Error::timeout("accept").with_stream_context(
1315                zmux::ErrorOperation::Accept,
1316                accepted_direction(bidirectional),
1317            )
1318        })?,
1319        None => fut.await,
1320    }
1321}
1322
1323fn accepted_direction(bidirectional: bool) -> zmux::ErrorDirection {
1324    if bidirectional {
1325        zmux::ErrorDirection::Both
1326    } else {
1327        zmux::ErrorDirection::Read
1328    }
1329}
1330
/// Reads the open-time metadata of an accepted stream by delegating to
/// `read_stream_prelude_quinn`.
async fn read_accepted_metadata(recv: &mut quinn::RecvStream) -> Result<AcceptedStreamMetadata> {
    read_stream_prelude_quinn(recv).await
}
1334
1335fn accepted_prelude_rejectable(err: &zmux::Error) -> bool {
1336    err.scope() == zmux::ErrorScope::Stream
1337        && (err.operation() == zmux::ErrorOperation::Accept
1338            || err.operation() == zmux::ErrorOperation::Read)
1339        && (err.is_timeout()
1340            || err.is_error_code(zmux::ErrorCode::Protocol)
1341            || matches!(
1342                err.termination_kind(),
1343                zmux::TerminationKind::Abort
1344                    | zmux::TerminationKind::Reset
1345                    | zmux::TerminationKind::Stopped
1346            ))
1347}
1348
1349async fn read_stream_prelude_quinn(recv: &mut quinn::RecvStream) -> Result<AcceptedStreamMetadata> {
1350    let first = read_byte(recv).await?;
1351    let prefix_len = match first >> 6 {
1352        0 => 1usize,
1353        1 => 2,
1354        2 => 4,
1355        _ => 8,
1356    };
1357    let mut prefix = [0u8; 8];
1358    prefix[0] = first;
1359    read_exact_quinn(recv, &mut prefix[1..prefix_len]).await?;
1360    let (metadata_len, _) = zmux::parse_varint(&prefix[..prefix_len])
1361        .map_err(|_| protocol_prelude_error("parse stream prelude length"))?;
1362    if metadata_len == 0 {
1363        return Ok(AcceptedStreamMetadata {
1364            metadata: StreamMetadata::default(),
1365            metadata_valid: true,
1366        });
1367    }
1368    let metadata_len = checked_prelude_metadata_len(metadata_len, prefix_len)?;
1369    let mut payload = vec![0u8; metadata_len];
1370    read_exact_quinn(recv, &mut payload).await?;
1371    let (metadata, metadata_valid) = parse_stream_metadata_bytes_view(&payload)
1372        .map_err(|_| protocol_prelude_error("malformed stream prelude metadata"))?;
1373    Ok(AcceptedStreamMetadata {
1374        metadata: metadata.try_to_owned()?,
1375        metadata_valid,
1376    })
1377}
1378
1379async fn read_byte(recv: &mut quinn::RecvStream) -> Result<u8> {
1380    let mut buf = [0u8; 1];
1381    read_exact_quinn(recv, &mut buf).await?;
1382    Ok(buf[0])
1383}
1384
1385async fn read_exact_quinn(recv: &mut quinn::RecvStream, mut dst: &mut [u8]) -> Result<()> {
1386    while !dst.is_empty() {
1387        let n = recv
1388            .read(dst)
1389            .await
1390            .map_err(translate_read_error)?
1391            .ok_or_else(|| protocol_prelude_error("unexpected EOF in stream prelude"))?;
1392        if n == 0 {
1393            return Err(protocol_prelude_error("zero-length read in stream prelude"));
1394        }
1395        if n > dst.len() {
1396            return Err(protocol_prelude_error(
1397                "stream prelude read reported invalid progress",
1398            ));
1399        }
1400        let (_, rest) = dst.split_at_mut(n);
1401        dst = rest;
1402    }
1403    Ok(())
1404}
1405
/// Per-stream prelude bookkeeping: the encoded open-time prelude, how much of
/// it has been sent, and the metadata it represents.
#[derive(Debug)]
struct PreludeState {
    /// `true` for locally opened streams, `false` for accepted ones.
    send_prelude: bool,
    /// Once `true`, `update_pre_open_metadata` declines further changes.
    prelude_sent: bool,
    /// Encoded prelude bytes; empty for accepted streams.
    prelude: Bytes,
    /// Reset to 0 whenever the prelude is rebuilt.
    // NOTE(review): presumably the count of prelude bytes already written — confirm
    // against the writer that consumes this field.
    prelude_offset: usize,
    /// Metadata reported for the stream.
    metadata: StreamMetadata,
}
1414
1415impl PreludeState {
1416    fn local(opts: OpenOptions) -> Result<Self> {
1417        let prelude = build_stream_prelude(&opts)?;
1418        let (priority, group, open_info) = opts.into_parts();
1419        Ok(Self {
1420            send_prelude: true,
1421            prelude_sent: false,
1422            prelude: Bytes::from(prelude),
1423            prelude_offset: 0,
1424            metadata: StreamMetadata {
1425                priority,
1426                group,
1427                open_info,
1428            },
1429        })
1430    }
1431
1432    fn accepted(metadata: AcceptedStreamMetadata) -> Self {
1433        Self {
1434            send_prelude: false,
1435            prelude_sent: true,
1436            prelude: Bytes::new(),
1437            prelude_offset: 0,
1438            metadata: metadata.metadata,
1439        }
1440    }
1441
1442    fn has_peer_visible_open_metadata(&self) -> bool {
1443        self.metadata.priority.is_some()
1444            || self.metadata.group.is_some()
1445            || !self.metadata.open_info.is_empty()
1446    }
1447
1448    fn update_pre_open_metadata(&mut self, update: zmux::MetadataUpdate) -> Result<bool> {
1449        if self.prelude_sent {
1450            return Ok(false);
1451        }
1452        if update.priority.is_none() && update.group.is_none() {
1453            return Err(zmux::Error::local("zmux: metadata update has no fields"));
1454        }
1455        if let Some(priority) = update.priority {
1456            self.metadata.priority = Some(priority);
1457        }
1458        if let Some(group) = update.group {
1459            self.metadata.group = Some(group);
1460        }
1461        let mut opts = OpenOptions::new().open_info(&self.metadata.open_info);
1462        if let Some(priority) = self.metadata.priority {
1463            opts = opts.priority(priority);
1464        }
1465        if let Some(group) = self.metadata.group {
1466            opts = opts.group(group);
1467        }
1468        self.prelude = Bytes::from(build_stream_prelude(&opts)?);
1469        self.prelude_offset = 0;
1470        Ok(true)
1471    }
1472}
1473
1474fn prelude_open_info(prelude: &Mutex<PreludeState>) -> Vec<u8> {
1475    prelude.lock().unwrap().metadata.open_info.clone()
1476}
1477
1478fn append_prelude_open_info_to(prelude: &Mutex<PreludeState>, dst: &mut Vec<u8>) {
1479    dst.extend_from_slice(prelude.lock().unwrap().metadata.open_info());
1480}
1481
1482fn prelude_open_info_len(prelude: &Mutex<PreludeState>) -> usize {
1483    prelude.lock().unwrap().metadata.open_info.len()
1484}
1485
1486fn prelude_has_open_info(prelude: &Mutex<PreludeState>) -> bool {
1487    prelude_open_info_len(prelude) != 0
1488}
1489
/// Bidirectional zmux stream backed by a pair of quinn send/receive streams.
pub struct QuinnStream {
    /// Identifier derived from the send stream's id at construction.
    stream_id: u64,
    /// `true` when built via `local`, `false` when built via `accepted`.
    opened_locally: bool,
    /// Local socket address supplied at construction, if any.
    local_addr: Option<SocketAddr>,
    /// Peer socket address supplied at construction, if any.
    peer_addr: Option<SocketAddr>,
    /// Send half, guarded by an async mutex.
    send: AsyncMutex<quinn::SendStream>,
    /// Receive half, guarded by an async mutex.
    recv: AsyncMutex<quinn::RecvStream>,
    /// Open-time prelude bytes and the metadata they encode.
    prelude: Mutex<PreludeState>,
    /// Absolute deadline applied to reads; `None` disables it.
    read_deadline: Mutex<Option<Instant>>,
    /// Absolute deadline applied to writes; `None` disables it.
    write_deadline: Mutex<Option<Instant>>,
    /// Errors remembered for each closed half.
    terminal: Mutex<TerminalErrors>,
    /// Shared adapter statistics.
    stats: Arc<AdapterStats>,
    /// Active-stream slot, released once both halves are closed or on drop.
    active: Option<ActiveGuard>,
    /// Set once the read half is closed.
    read_closed: AtomicBool,
    /// Set once the write half is closed.
    write_closed: AtomicBool,
}
1506
1507impl QuinnStream {
1508    fn local(
1509        send: quinn::SendStream,
1510        recv: quinn::RecvStream,
1511        prelude: PreludeState,
1512        stats: Arc<AdapterStats>,
1513        local_addr: Option<SocketAddr>,
1514        peer_addr: Option<SocketAddr>,
1515    ) -> Self {
1516        let stream_id = quinn_stream_id(send.id());
1517        Self {
1518            stream_id,
1519            opened_locally: true,
1520            local_addr,
1521            peer_addr,
1522            send: AsyncMutex::new(send),
1523            recv: AsyncMutex::new(recv),
1524            prelude: Mutex::new(prelude),
1525            read_deadline: Mutex::new(None),
1526            write_deadline: Mutex::new(None),
1527            terminal: Mutex::new(TerminalErrors::default()),
1528            stats,
1529            active: None,
1530            read_closed: AtomicBool::new(false),
1531            write_closed: AtomicBool::new(false),
1532        }
1533    }
1534
1535    fn accepted(
1536        send: quinn::SendStream,
1537        recv: quinn::RecvStream,
1538        metadata: AcceptedStreamMetadata,
1539        stats: Arc<AdapterStats>,
1540        local_addr: Option<SocketAddr>,
1541        peer_addr: Option<SocketAddr>,
1542    ) -> Self {
1543        let stream_id = quinn_stream_id(send.id());
1544        Self {
1545            stream_id,
1546            opened_locally: false,
1547            local_addr,
1548            peer_addr,
1549            send: AsyncMutex::new(send),
1550            recv: AsyncMutex::new(recv),
1551            prelude: Mutex::new(PreludeState::accepted(metadata)),
1552            read_deadline: Mutex::new(None),
1553            write_deadline: Mutex::new(None),
1554            terminal: Mutex::new(TerminalErrors::default()),
1555            stats,
1556            active: None,
1557            read_closed: AtomicBool::new(false),
1558            write_closed: AtomicBool::new(false),
1559        }
1560    }
1561
1562    fn with_active(mut self, counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
1563        self.active = Some(ActiveGuard::new(counters, kind));
1564        self
1565    }
1566
    /// Marks the read half closed without recording an error.
    fn mark_read_closed(&self) {
        self.mark_read_closed_with(None);
    }
1570
1571    fn mark_read_closed_with(&self, err: Option<zmux::Error>) {
1572        if self.read_closed.load(Ordering::Acquire) {
1573            return;
1574        }
1575        let first = {
1576            let mut terminal = self.terminal.lock().unwrap();
1577            if self.read_closed.load(Ordering::Acquire) {
1578                false
1579            } else {
1580                if let Some(err) = err {
1581                    terminal.read = Some(err);
1582                }
1583                self.read_closed.store(true, Ordering::Release);
1584                true
1585            }
1586        };
1587        if first {
1588            self.maybe_finish_active();
1589        }
1590    }
1591
    /// Marks the write half closed without recording an error.
    fn mark_write_closed(&self) {
        self.mark_write_closed_with(None);
    }
1595
1596    fn mark_write_closed_with(&self, err: Option<zmux::Error>) {
1597        if self.write_closed.load(Ordering::Acquire) {
1598            return;
1599        }
1600        let first = {
1601            let mut terminal = self.terminal.lock().unwrap();
1602            if self.write_closed.load(Ordering::Acquire) {
1603                false
1604            } else {
1605                if let Some(err) = err {
1606                    terminal.write = Some(err);
1607                }
1608                self.write_closed.store(true, Ordering::Release);
1609                true
1610            }
1611        };
1612        if first {
1613            self.maybe_finish_active();
1614        }
1615    }
1616
1617    fn maybe_finish_active(&self) {
1618        if self.read_closed.load(Ordering::Acquire) && self.write_closed.load(Ordering::Acquire) {
1619            if let Some(active) = &self.active {
1620                active.finish();
1621            }
1622        }
1623    }
1624
    /// Returns the stream identifier captured when this handle was built.
    pub fn stream_id(&self) -> u64 {
        self.stream_id
    }
1628
    /// Returns `true` when this side opened the stream, `false` when it was accepted.
    pub fn is_opened_locally(&self) -> bool {
        self.opened_locally
    }
1632
    /// Always `true`: a `QuinnStream` has both a send and a receive half.
    pub fn is_bidirectional(&self) -> bool {
        true
    }
1636
    /// Returns `true` once the read half has been marked closed.
    pub fn is_read_closed(&self) -> bool {
        self.read_closed.load(Ordering::Acquire)
    }
1640
    /// Returns `true` once the write half has been marked closed.
    pub fn is_write_closed(&self) -> bool {
        self.write_closed.load(Ordering::Acquire)
    }
1644
    /// Returns a copy of the stream's current metadata.
    pub fn metadata(&self) -> StreamMetadata {
        self.prelude.lock().unwrap().metadata.clone()
    }
1648
    /// Returns a copy of the stream's open info bytes.
    pub fn open_info(&self) -> Vec<u8> {
        prelude_open_info(&self.prelude)
    }
1652
    /// Appends the stream's open info bytes to `dst`.
    pub fn append_open_info_to(&self, dst: &mut Vec<u8>) {
        append_prelude_open_info_to(&self.prelude, dst);
    }
1656
    /// Returns the length in bytes of the stream's open info.
    pub fn open_info_len(&self) -> usize {
        prelude_open_info_len(&self.prelude)
    }
1660
    /// Returns `true` when the stream carries non-empty open info.
    pub fn has_open_info(&self) -> bool {
        prelude_has_open_info(&self.prelude)
    }
1664
    /// Returns the local socket address recorded at construction, if any.
    pub fn local_addr(&self) -> Option<SocketAddr> {
        self.local_addr
    }
1668
    /// Returns the peer socket address recorded at construction, if any.
    pub fn peer_addr(&self) -> Option<SocketAddr> {
        self.peer_addr
    }
1672
1673    pub fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
1674        *self.read_deadline.lock().unwrap() = deadline;
1675        Ok(())
1676    }
1677
1678    pub fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
1679        *self.write_deadline.lock().unwrap() = deadline;
1680        Ok(())
1681    }
1682
1683    pub fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
1684        self.set_read_deadline(deadline)?;
1685        self.set_write_deadline(deadline)
1686    }
1687
1688    pub fn set_read_timeout(&self, timeout: Option<Duration>) -> Result<()> {
1689        self.set_read_deadline(timeout_to_deadline(timeout))
1690    }
1691
1692    pub fn set_write_timeout(&self, timeout: Option<Duration>) -> Result<()> {
1693        self.set_write_deadline(timeout_to_deadline(timeout))
1694    }
1695
1696    pub fn set_timeout(&self, timeout: Option<Duration>) -> Result<()> {
1697        self.set_deadline(timeout_to_deadline(timeout))
1698    }
1699
1700    fn read_timeout_from_deadline(&self) -> Result<Option<Duration>> {
1701        timeout_until(*self.read_deadline.lock().unwrap(), "read")
1702    }
1703
1704    fn write_timeout_from_deadline(&self) -> Result<Option<Duration>> {
1705        timeout_until(*self.write_deadline.lock().unwrap(), "write")
1706    }
1707
1708    fn read_terminal_error(&self) -> zmux::Error {
1709        self.terminal
1710            .lock()
1711            .unwrap()
1712            .read
1713            .clone()
1714            .unwrap_or_else(local_read_closed_error)
1715    }
1716
1717    fn write_terminal_error(&self) -> zmux::Error {
1718        self.terminal
1719            .lock()
1720            .unwrap()
1721            .write
1722            .clone()
1723            .unwrap_or_else(local_write_closed_error)
1724    }
1725
1726    pub async fn update_metadata(&self, update: zmux::MetadataUpdate) -> Result<()> {
1727        let mut send = self.send.lock().await;
1728        let should_flush = {
1729            let mut state = self.prelude.lock().unwrap();
1730            match state.update_pre_open_metadata(update)? {
1731                true => true,
1732                false => return Err(priority_update_unavailable()),
1733            }
1734        };
1735        if should_flush {
1736            if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
1737                self.mark_write_closed_with(Some(err.clone()));
1738                return Err(err);
1739            }
1740        }
1741        Ok(())
1742    }
1743
1744    pub async fn read(&self, dst: &mut [u8]) -> Result<usize> {
1745        let timeout = self.read_timeout_from_deadline()?;
1746        with_optional_timeout(self.read_inner(dst), timeout, "read").await
1747    }
1748
1749    async fn read_inner(&self, dst: &mut [u8]) -> Result<usize> {
1750        if dst.is_empty() {
1751            return Ok(0);
1752        }
1753        if self.read_closed.load(Ordering::Acquire) {
1754            return Err(self.read_terminal_error());
1755        }
1756        let mut recv = self.recv.lock().await;
1757        match recv.read(dst).await {
1758            Ok(Some(n)) => {
1759                self.stats.note_data_read(n, Instant::now());
1760                Ok(n)
1761            }
1762            Ok(None) => {
1763                self.mark_read_closed();
1764                Ok(0)
1765            }
1766            Err(err) => {
1767                if let quinn::ReadError::Reset(code) = &err {
1768                    self.stats.note_reset_reason((*code).into_inner());
1769                }
1770                let err = translate_read_error(err);
1771                self.mark_read_closed_with(Some(err.clone()));
1772                Err(err)
1773            }
1774        }
1775    }
1776
1777    pub async fn read_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<usize> {
1778        with_timeout(self.read(dst), timeout, "read").await
1779    }
1780
1781    pub async fn read_exact(&self, dst: &mut [u8]) -> Result<()> {
1782        let timeout = self.read_timeout_from_deadline()?;
1783        with_optional_timeout(self.read_exact_inner(dst), timeout, "read").await
1784    }
1785
1786    async fn read_exact_inner(&self, mut dst: &mut [u8]) -> Result<()> {
1787        if dst.is_empty() {
1788            return Ok(());
1789        }
1790        if self.read_closed.load(Ordering::Acquire) {
1791            return Err(self.read_terminal_error());
1792        }
1793        let mut recv = self.recv.lock().await;
1794        while !dst.is_empty() {
1795            match recv.read(dst).await {
1796                Ok(Some(n)) => {
1797                    if n == 0 {
1798                        return Err(unexpected_eof_error());
1799                    }
1800                    self.stats.note_data_read(n, Instant::now());
1801                    let (_, rest) = dst.split_at_mut(n);
1802                    dst = rest;
1803                }
1804                Ok(None) => {
1805                    self.mark_read_closed();
1806                    return Err(unexpected_eof_error());
1807                }
1808                Err(err) => {
1809                    if let quinn::ReadError::Reset(code) = &err {
1810                        self.stats.note_reset_reason((*code).into_inner());
1811                    }
1812                    let err = translate_read_error(err);
1813                    self.mark_read_closed_with(Some(err.clone()));
1814                    return Err(err);
1815                }
1816            }
1817        }
1818        Ok(())
1819    }
1820
1821    pub async fn read_exact_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<()> {
1822        with_timeout(self.read_exact(dst), timeout, "read").await
1823    }
1824
1825    pub async fn read_vectored(&self, dsts: &mut [IoSliceMut<'_>]) -> Result<usize> {
1826        match dsts.iter_mut().find(|dst| !dst.is_empty()) {
1827            Some(dst) => self.read(dst).await,
1828            None => Ok(0),
1829        }
1830    }
1831
1832    pub async fn read_vectored_timeout(
1833        &self,
1834        dsts: &mut [IoSliceMut<'_>],
1835        timeout: Duration,
1836    ) -> Result<usize> {
1837        with_timeout(self.read_vectored(dsts), timeout, "read").await
1838    }
1839
1840    pub async fn write(&self, src: &[u8]) -> Result<usize> {
1841        let timeout = self.write_timeout_from_deadline()?;
1842        with_optional_timeout(self.write_inner(src), timeout, "write").await
1843    }
1844
1845    async fn write_inner(&self, src: &[u8]) -> Result<usize> {
1846        if src.is_empty() {
1847            return Ok(0);
1848        }
1849        if self.write_closed.load(Ordering::Acquire) {
1850            return Err(self.write_terminal_error());
1851        }
1852        let mut send = self.send.lock().await;
1853        if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
1854            self.mark_write_closed_with(Some(err.clone()));
1855            return Err(err);
1856        }
1857        match write_payload_once(&mut send, src, &self.stats).await {
1858            Ok(n) => Ok(n),
1859            Err(err) => {
1860                self.mark_write_closed_with(Some(err.clone()));
1861                Err(err)
1862            }
1863        }
1864    }
1865
1866    pub async fn write_timeout(&self, src: &[u8], timeout: Duration) -> Result<usize> {
1867        with_timeout(self.write(src), timeout, "write").await
1868    }
1869
1870    pub async fn write_all<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<()> {
1871        let timeout = self.write_timeout_from_deadline()?;
1872        with_optional_timeout(self.write_all_inner(src.into()), timeout, "write").await
1873    }
1874
1875    pub async fn write_all_timeout<'a>(
1876        &self,
1877        src: impl Into<WritePayload<'a>>,
1878        timeout: Duration,
1879    ) -> Result<()> {
1880        with_timeout(self.write_all(src), timeout, "write").await
1881    }
1882
1883    async fn write_all_inner(&self, payload: WritePayload<'_>) -> Result<()> {
1884        match payload {
1885            WritePayload::Bytes(data) => self.write_all_bytes_inner(data.as_ref()).await,
1886            WritePayload::Vectored(parts) => {
1887                for part in parts {
1888                    if !part.is_empty() {
1889                        self.write_all_bytes_inner(part.as_ref()).await?;
1890                    }
1891                }
1892                Ok(())
1893            }
1894        }
1895    }
1896
1897    async fn write_all_bytes_inner(&self, src: &[u8]) -> Result<()> {
1898        if src.is_empty() {
1899            return Ok(());
1900        }
1901        if self.write_closed.load(Ordering::Acquire) {
1902            return Err(self.write_terminal_error());
1903        }
1904        let mut send = self.send.lock().await;
1905        let result = async {
1906            ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
1907            write_payload_all(&mut send, src, &self.stats).await
1908        }
1909        .await;
1910        if result.is_err() {
1911            self.mark_write_closed_with(result.as_ref().err().cloned());
1912        }
1913        result
1914    }
1915
1916    pub async fn write_vectored(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1917        let timeout = self.write_timeout_from_deadline()?;
1918        with_optional_timeout(self.write_vectored_inner(parts), timeout, "write").await
1919    }
1920
1921    async fn write_vectored_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1922        let total = total_bytes(parts.iter().map(|part| part.len()))?;
1923        if total == 0 {
1924            return Ok(0);
1925        }
1926        if self.write_closed.load(Ordering::Acquire) {
1927            return Err(self.write_terminal_error());
1928        }
1929        let mut send = self.send.lock().await;
1930        let result = async {
1931            ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
1932            write_io_slices_once(&mut send, parts, total, &self.stats).await
1933        }
1934        .await;
1935        if let Err(err) = &result {
1936            self.mark_write_closed_with(Some(err.clone()));
1937        }
1938        result
1939    }
1940
1941    pub async fn write_vectored_timeout(
1942        &self,
1943        parts: &[IoSlice<'_>],
1944        timeout: Duration,
1945    ) -> Result<usize> {
1946        with_timeout(self.write_vectored(parts), timeout, "write").await
1947    }
1948
1949    pub async fn write_final<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<usize> {
1950        let timeout = self.write_timeout_from_deadline()?;
1951        with_optional_timeout(self.write_final_inner(src.into()), timeout, "write").await
1952    }
1953
1954    async fn write_final_inner(&self, payload: WritePayload<'_>) -> Result<usize> {
1955        match payload {
1956            WritePayload::Bytes(data) => self.write_final_bytes_inner(data.as_ref()).await,
1957            WritePayload::Vectored(parts) => self.write_vectored_final_inner(parts).await,
1958        }
1959    }
1960
1961    async fn write_final_bytes_inner(&self, src: &[u8]) -> Result<usize> {
1962        if self.write_closed.load(Ordering::Acquire) {
1963            return Err(self.write_terminal_error());
1964        }
1965        let mut send = self.send.lock().await;
1966        match write_all_final(&self.prelude, &mut send, src, &self.stats).await {
1967            Ok(n) => {
1968                self.mark_write_closed();
1969                Ok(n)
1970            }
1971            Err(err) => {
1972                self.mark_write_closed_with(Some(err.clone()));
1973                Err(err)
1974            }
1975        }
1976    }
1977
1978    pub async fn write_final_timeout<'a>(
1979        &self,
1980        src: impl Into<WritePayload<'a>>,
1981        timeout: Duration,
1982    ) -> Result<usize> {
1983        with_timeout(self.write_final(src), timeout, "write").await
1984    }
1985
1986    pub async fn write_vectored_final(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1987        let timeout = self.write_timeout_from_deadline()?;
1988        with_optional_timeout(self.write_vectored_final_inner(parts), timeout, "write").await
1989    }
1990
1991    async fn write_vectored_final_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1992        if self.write_closed.load(Ordering::Acquire) {
1993            return Err(self.write_terminal_error());
1994        }
1995        let mut send = self.send.lock().await;
1996        match write_io_slices_final(&self.prelude, &mut send, parts, &self.stats).await {
1997            Ok(n) => {
1998                self.mark_write_closed();
1999                Ok(n)
2000            }
2001            Err(err) => {
2002                self.mark_write_closed_with(Some(err.clone()));
2003                Err(err)
2004            }
2005        }
2006    }
2007
2008    pub async fn write_vectored_final_timeout(
2009        &self,
2010        parts: &[IoSlice<'_>],
2011        timeout: Duration,
2012    ) -> Result<usize> {
2013        with_timeout(self.write_vectored_final(parts), timeout, "write").await
2014    }
2015
2016    pub async fn close_read(&self) -> Result<()> {
2017        self.cancel_read(zmux::ErrorCode::Cancelled.as_u64()).await
2018    }
2019
2020    pub async fn cancel_read(&self, code: u64) -> Result<()> {
2021        if self.read_closed.load(Ordering::Acquire) {
2022            return Err(self.read_terminal_error());
2023        }
2024        let code =
2025            checked_quinn_varint(code, zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)?;
2026        self.cancel_read_varint(code).await
2027    }
2028
2029    async fn cancel_read_varint(&self, code: quinn::VarInt) -> Result<()> {
2030        let mut send = self.send.lock().await;
2031        if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2032            self.mark_write_closed_with(Some(err.clone()));
2033            return Err(err);
2034        }
2035        drop(send);
2036        let mut recv = self.recv.lock().await;
2037        let result = recv.stop(code).map_err(translate_read_closed_stream);
2038        drop(recv);
2039        if result.is_ok() {
2040            self.stats.note_control_progress();
2041        }
2042        self.mark_read_closed();
2043        result
2044    }
2045
2046    pub async fn close_write(&self) -> Result<()> {
2047        let timeout = self.write_timeout_from_deadline()?;
2048        with_optional_timeout(self.close_write_inner(), timeout, "write").await
2049    }
2050
2051    async fn close_write_inner(&self) -> Result<()> {
2052        if self.write_closed.load(Ordering::Acquire) {
2053            return Err(self.write_terminal_error());
2054        }
2055        let mut send = self.send.lock().await;
2056        if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2057            self.mark_write_closed_with(Some(err.clone()));
2058            return Err(err);
2059        }
2060        let result = finish_send(&mut send, &self.stats).await;
2061        drop(send);
2062        self.mark_write_closed_with(result.as_ref().err().cloned());
2063        result
2064    }
2065
2066    pub async fn cancel_write(&self, code: u64) -> Result<()> {
2067        if self.write_closed.load(Ordering::Acquire) {
2068            return Err(self.write_terminal_error());
2069        }
2070        let code = checked_quinn_varint(
2071            code,
2072            zmux::ErrorOperation::Write,
2073            zmux::ErrorDirection::Write,
2074        )?;
2075        self.cancel_write_varint(code).await
2076    }
2077
2078    async fn cancel_write_varint(&self, code: quinn::VarInt) -> Result<()> {
2079        if self.write_closed.load(Ordering::Acquire) {
2080            return Err(self.write_terminal_error());
2081        }
2082        let mut send = self.send.lock().await;
2083        let result = send.reset(code).map_err(translate_write_closed_stream);
2084        drop(send);
2085        if result.is_ok() {
2086            self.stats.note_control_progress();
2087            self.stats.note_reset_reason(code.into_inner());
2088        }
2089        self.mark_write_closed_with(Some(local_stream_application_error(
2090            code.into_inner(),
2091            "",
2092            zmux::ErrorOperation::Write,
2093            zmux::ErrorDirection::Write,
2094            zmux::TerminationKind::Reset,
2095        )));
2096        result
2097    }
2098
2099    pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
2100        if self.read_closed.load(Ordering::Acquire) && self.write_closed.load(Ordering::Acquire) {
2101            return Err(self.write_terminal_error());
2102        }
2103        let code = checked_quinn_varint(
2104            code,
2105            zmux::ErrorOperation::Close,
2106            zmux::ErrorDirection::Both,
2107        )?;
2108        let terminal = local_stream_application_error(
2109            code.into_inner(),
2110            reason,
2111            zmux::ErrorOperation::Close,
2112            zmux::ErrorDirection::Both,
2113            zmux::TerminationKind::Abort,
2114        );
2115        {
2116            let mut recv = self.recv.lock().await;
2117            let _ = recv.stop(code);
2118        }
2119        {
2120            let mut send = self.send.lock().await;
2121            let _ = send.reset(code);
2122        }
2123        self.stats.note_control_progress();
2124        self.stats.note_abort_reason(code.into_inner());
2125        self.mark_read_closed_with(Some(terminal.clone()));
2126        self.mark_write_closed_with(Some(terminal));
2127        Ok(())
2128    }
2129
2130    pub async fn close(&self) -> Result<()> {
2131        let write = if self.write_closed.load(Ordering::Acquire) {
2132            Ok(())
2133        } else {
2134            self.close_write().await
2135        };
2136        let read = if self.read_closed.load(Ordering::Acquire) {
2137            Ok(())
2138        } else {
2139            self.close_read().await
2140        };
2141        write.and(read)
2142    }
2143
2144    async fn maybe_send_open_prelude_on_open(&self) -> Result<()> {
2145        if !self
2146            .prelude
2147            .lock()
2148            .unwrap()
2149            .has_peer_visible_open_metadata()
2150        {
2151            return Ok(());
2152        }
2153        let mut send = self.send.lock().await;
2154        ensure_open_prelude(&self.prelude, &mut send, &self.stats).await
2155    }
2156
2157    async fn discard_after_open_error(&self, err: &zmux::Error) {
2158        let code = open_error_cleanup_code(err);
2159        {
2160            let mut recv = self.recv.lock().await;
2161            let _ = recv.stop(code);
2162        }
2163        {
2164            let mut send = self.send.lock().await;
2165            let _ = send.reset(code);
2166        }
2167        self.mark_read_closed();
2168        self.mark_write_closed();
2169    }
2170}
2171
2172pub struct QuinnSendStream {
2173    stream_id: u64,
2174    local_addr: Option<SocketAddr>,
2175    peer_addr: Option<SocketAddr>,
2176    send: AsyncMutex<quinn::SendStream>,
2177    prelude: Mutex<PreludeState>,
2178    write_deadline: Mutex<Option<Instant>>,
2179    terminal: Mutex<TerminalErrors>,
2180    stats: Arc<AdapterStats>,
2181    active: Option<ActiveGuard>,
2182    write_closed: AtomicBool,
2183}
2184
2185impl QuinnSendStream {
2186    fn local(
2187        send: quinn::SendStream,
2188        prelude: PreludeState,
2189        stats: Arc<AdapterStats>,
2190        local_addr: Option<SocketAddr>,
2191        peer_addr: Option<SocketAddr>,
2192    ) -> Self {
2193        let stream_id = quinn_stream_id(send.id());
2194        Self {
2195            stream_id,
2196            local_addr,
2197            peer_addr,
2198            send: AsyncMutex::new(send),
2199            prelude: Mutex::new(prelude),
2200            write_deadline: Mutex::new(None),
2201            terminal: Mutex::new(TerminalErrors::default()),
2202            stats,
2203            active: None,
2204            write_closed: AtomicBool::new(false),
2205        }
2206    }
2207
2208    fn with_active(mut self, counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
2209        self.active = Some(ActiveGuard::new(counters, kind));
2210        self
2211    }
2212
2213    fn mark_write_closed(&self) {
2214        self.mark_write_closed_with(None);
2215    }
2216
2217    fn mark_write_closed_with(&self, err: Option<zmux::Error>) {
2218        if self.write_closed.load(Ordering::Acquire) {
2219            return;
2220        }
2221        let first = {
2222            let mut terminal = self.terminal.lock().unwrap();
2223            if self.write_closed.load(Ordering::Acquire) {
2224                false
2225            } else {
2226                if let Some(err) = err {
2227                    terminal.write = Some(err);
2228                }
2229                self.write_closed.store(true, Ordering::Release);
2230                true
2231            }
2232        };
2233        if first {
2234            if let Some(active) = &self.active {
2235                active.finish();
2236            }
2237        }
2238    }
2239
2240    pub fn stream_id(&self) -> u64 {
2241        self.stream_id
2242    }
2243
2244    pub fn is_opened_locally(&self) -> bool {
2245        true
2246    }
2247
2248    pub fn is_bidirectional(&self) -> bool {
2249        false
2250    }
2251
2252    pub fn is_write_closed(&self) -> bool {
2253        self.write_closed.load(Ordering::Acquire)
2254    }
2255
2256    pub fn metadata(&self) -> StreamMetadata {
2257        self.prelude.lock().unwrap().metadata.clone()
2258    }
2259
2260    pub fn open_info(&self) -> Vec<u8> {
2261        prelude_open_info(&self.prelude)
2262    }
2263
2264    pub fn append_open_info_to(&self, dst: &mut Vec<u8>) {
2265        append_prelude_open_info_to(&self.prelude, dst);
2266    }
2267
2268    pub fn open_info_len(&self) -> usize {
2269        prelude_open_info_len(&self.prelude)
2270    }
2271
2272    pub fn has_open_info(&self) -> bool {
2273        prelude_has_open_info(&self.prelude)
2274    }
2275
2276    pub fn local_addr(&self) -> Option<SocketAddr> {
2277        self.local_addr
2278    }
2279
2280    pub fn peer_addr(&self) -> Option<SocketAddr> {
2281        self.peer_addr
2282    }
2283
2284    pub fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2285        *self.write_deadline.lock().unwrap() = deadline;
2286        Ok(())
2287    }
2288
2289    pub fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2290        self.set_write_deadline(deadline)
2291    }
2292
2293    pub fn set_write_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2294        self.set_write_deadline(timeout_to_deadline(timeout))
2295    }
2296
2297    pub fn set_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2298        self.set_deadline(timeout_to_deadline(timeout))
2299    }
2300
2301    fn write_timeout_from_deadline(&self) -> Result<Option<Duration>> {
2302        timeout_until(*self.write_deadline.lock().unwrap(), "write")
2303    }
2304
2305    fn write_terminal_error(&self) -> zmux::Error {
2306        self.terminal
2307            .lock()
2308            .unwrap()
2309            .write
2310            .clone()
2311            .unwrap_or_else(local_write_closed_error)
2312    }
2313
2314    pub async fn update_metadata(&self, update: zmux::MetadataUpdate) -> Result<()> {
2315        let mut send = self.send.lock().await;
2316        let should_flush = {
2317            let mut state = self.prelude.lock().unwrap();
2318            match state.update_pre_open_metadata(update)? {
2319                true => true,
2320                false => return Err(priority_update_unavailable()),
2321            }
2322        };
2323        if should_flush {
2324            if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2325                self.mark_write_closed_with(Some(err.clone()));
2326                return Err(err);
2327            }
2328        }
2329        Ok(())
2330    }
2331
2332    pub async fn write(&self, src: &[u8]) -> Result<usize> {
2333        let timeout = self.write_timeout_from_deadline()?;
2334        with_optional_timeout(self.write_inner(src), timeout, "write").await
2335    }
2336
2337    async fn write_inner(&self, src: &[u8]) -> Result<usize> {
2338        if src.is_empty() {
2339            return Ok(0);
2340        }
2341        if self.write_closed.load(Ordering::Acquire) {
2342            return Err(self.write_terminal_error());
2343        }
2344        let mut send = self.send.lock().await;
2345        if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2346            self.mark_write_closed_with(Some(err.clone()));
2347            return Err(err);
2348        }
2349        match write_payload_once(&mut send, src, &self.stats).await {
2350            Ok(n) => Ok(n),
2351            Err(err) => {
2352                self.mark_write_closed_with(Some(err.clone()));
2353                Err(err)
2354            }
2355        }
2356    }
2357
2358    pub async fn write_timeout(&self, src: &[u8], timeout: Duration) -> Result<usize> {
2359        with_timeout(self.write(src), timeout, "write").await
2360    }
2361
2362    pub async fn write_all<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<()> {
2363        let timeout = self.write_timeout_from_deadline()?;
2364        with_optional_timeout(self.write_all_inner(src.into()), timeout, "write").await
2365    }
2366
2367    pub async fn write_all_timeout<'a>(
2368        &self,
2369        src: impl Into<WritePayload<'a>>,
2370        timeout: Duration,
2371    ) -> Result<()> {
2372        with_timeout(self.write_all(src), timeout, "write").await
2373    }
2374
2375    async fn write_all_inner(&self, payload: WritePayload<'_>) -> Result<()> {
2376        match payload {
2377            WritePayload::Bytes(data) => self.write_all_bytes_inner(data.as_ref()).await,
2378            WritePayload::Vectored(parts) => {
2379                for part in parts {
2380                    if !part.is_empty() {
2381                        self.write_all_bytes_inner(part.as_ref()).await?;
2382                    }
2383                }
2384                Ok(())
2385            }
2386        }
2387    }
2388
2389    async fn write_all_bytes_inner(&self, src: &[u8]) -> Result<()> {
2390        if src.is_empty() {
2391            return Ok(());
2392        }
2393        if self.write_closed.load(Ordering::Acquire) {
2394            return Err(self.write_terminal_error());
2395        }
2396        let mut send = self.send.lock().await;
2397        let result = async {
2398            ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
2399            write_payload_all(&mut send, src, &self.stats).await
2400        }
2401        .await;
2402        if result.is_err() {
2403            self.mark_write_closed_with(result.as_ref().err().cloned());
2404        }
2405        result
2406    }
2407
2408    pub async fn write_vectored(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2409        let timeout = self.write_timeout_from_deadline()?;
2410        with_optional_timeout(self.write_vectored_inner(parts), timeout, "write").await
2411    }
2412
2413    async fn write_vectored_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2414        let total = total_bytes(parts.iter().map(|part| part.len()))?;
2415        if total == 0 {
2416            return Ok(0);
2417        }
2418        if self.write_closed.load(Ordering::Acquire) {
2419            return Err(self.write_terminal_error());
2420        }
2421        let mut send = self.send.lock().await;
2422        let result = async {
2423            ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
2424            write_io_slices_once(&mut send, parts, total, &self.stats).await
2425        }
2426        .await;
2427        if let Err(err) = &result {
2428            self.mark_write_closed_with(Some(err.clone()));
2429        }
2430        result
2431    }
2432
2433    pub async fn write_vectored_timeout(
2434        &self,
2435        parts: &[IoSlice<'_>],
2436        timeout: Duration,
2437    ) -> Result<usize> {
2438        with_timeout(self.write_vectored(parts), timeout, "write").await
2439    }
2440
2441    pub async fn write_final<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<usize> {
2442        let timeout = self.write_timeout_from_deadline()?;
2443        with_optional_timeout(self.write_final_inner(src.into()), timeout, "write").await
2444    }
2445
2446    async fn write_final_inner(&self, payload: WritePayload<'_>) -> Result<usize> {
2447        match payload {
2448            WritePayload::Bytes(data) => self.write_final_bytes_inner(data.as_ref()).await,
2449            WritePayload::Vectored(parts) => self.write_vectored_final_inner(parts).await,
2450        }
2451    }
2452
2453    async fn write_final_bytes_inner(&self, src: &[u8]) -> Result<usize> {
2454        if self.write_closed.load(Ordering::Acquire) {
2455            return Err(self.write_terminal_error());
2456        }
2457        let mut send = self.send.lock().await;
2458        match write_all_final(&self.prelude, &mut send, src, &self.stats).await {
2459            Ok(n) => {
2460                self.mark_write_closed();
2461                Ok(n)
2462            }
2463            Err(err) => {
2464                self.mark_write_closed_with(Some(err.clone()));
2465                Err(err)
2466            }
2467        }
2468    }
2469
2470    pub async fn write_final_timeout<'a>(
2471        &self,
2472        src: impl Into<WritePayload<'a>>,
2473        timeout: Duration,
2474    ) -> Result<usize> {
2475        with_timeout(self.write_final(src), timeout, "write").await
2476    }
2477
2478    pub async fn write_vectored_final(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2479        let timeout = self.write_timeout_from_deadline()?;
2480        with_optional_timeout(self.write_vectored_final_inner(parts), timeout, "write").await
2481    }
2482
2483    async fn write_vectored_final_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2484        if self.write_closed.load(Ordering::Acquire) {
2485            return Err(self.write_terminal_error());
2486        }
2487        let mut send = self.send.lock().await;
2488        match write_io_slices_final(&self.prelude, &mut send, parts, &self.stats).await {
2489            Ok(n) => {
2490                self.mark_write_closed();
2491                Ok(n)
2492            }
2493            Err(err) => {
2494                self.mark_write_closed_with(Some(err.clone()));
2495                Err(err)
2496            }
2497        }
2498    }
2499
2500    pub async fn write_vectored_final_timeout(
2501        &self,
2502        parts: &[IoSlice<'_>],
2503        timeout: Duration,
2504    ) -> Result<usize> {
2505        with_timeout(self.write_vectored_final(parts), timeout, "write").await
2506    }
2507
2508    pub async fn close_write(&self) -> Result<()> {
2509        let timeout = self.write_timeout_from_deadline()?;
2510        with_optional_timeout(self.close_write_inner(), timeout, "write").await
2511    }
2512
2513    async fn close_write_inner(&self) -> Result<()> {
2514        if self.write_closed.load(Ordering::Acquire) {
2515            return Err(self.write_terminal_error());
2516        }
2517        let mut send = self.send.lock().await;
2518        if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2519            self.mark_write_closed_with(Some(err.clone()));
2520            return Err(err);
2521        }
2522        let result = finish_send(&mut send, &self.stats).await;
2523        drop(send);
2524        self.mark_write_closed_with(result.as_ref().err().cloned());
2525        result
2526    }
2527
2528    pub async fn cancel_write(&self, code: u64) -> Result<()> {
2529        if self.write_closed.load(Ordering::Acquire) {
2530            return Err(self.write_terminal_error());
2531        }
2532        let code = checked_quinn_varint(
2533            code,
2534            zmux::ErrorOperation::Write,
2535            zmux::ErrorDirection::Write,
2536        )?;
2537        self.cancel_write_varint(code).await
2538    }
2539
2540    async fn cancel_write_varint(&self, code: quinn::VarInt) -> Result<()> {
2541        if self.write_closed.load(Ordering::Acquire) {
2542            return Err(self.write_terminal_error());
2543        }
2544        let mut send = self.send.lock().await;
2545        let result = send.reset(code).map_err(translate_write_closed_stream);
2546        drop(send);
2547        if result.is_ok() {
2548            self.stats.note_control_progress();
2549            self.stats.note_reset_reason(code.into_inner());
2550        }
2551        self.mark_write_closed_with(Some(local_stream_application_error(
2552            code.into_inner(),
2553            "",
2554            zmux::ErrorOperation::Write,
2555            zmux::ErrorDirection::Write,
2556            zmux::TerminationKind::Reset,
2557        )));
2558        result
2559    }
2560
2561    pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
2562        if self.write_closed.load(Ordering::Acquire) {
2563            return Err(self.write_terminal_error());
2564        }
2565        let code = checked_quinn_varint(
2566            code,
2567            zmux::ErrorOperation::Close,
2568            zmux::ErrorDirection::Write,
2569        )?;
2570        let terminal = local_stream_application_error(
2571            code.into_inner(),
2572            reason,
2573            zmux::ErrorOperation::Close,
2574            zmux::ErrorDirection::Write,
2575            zmux::TerminationKind::Abort,
2576        );
2577        let mut send = self.send.lock().await;
2578        let _ = send.reset(code);
2579        drop(send);
2580        self.stats.note_control_progress();
2581        self.stats.note_abort_reason(code.into_inner());
2582        self.mark_write_closed_with(Some(terminal));
2583        Ok(())
2584    }
2585
2586    pub async fn close(&self) -> Result<()> {
2587        if self.write_closed.load(Ordering::Acquire) {
2588            Ok(())
2589        } else {
2590            self.close_write().await
2591        }
2592    }
2593
2594    async fn maybe_send_open_prelude_on_open(&self) -> Result<()> {
2595        if !self
2596            .prelude
2597            .lock()
2598            .unwrap()
2599            .has_peer_visible_open_metadata()
2600        {
2601            return Ok(());
2602        }
2603        let mut send = self.send.lock().await;
2604        ensure_open_prelude(&self.prelude, &mut send, &self.stats).await
2605    }
2606
2607    async fn discard_after_open_error(&self, err: &zmux::Error) {
2608        let code = open_error_cleanup_code(err);
2609        let mut send = self.send.lock().await;
2610        let _ = send.reset(code);
2611        drop(send);
2612        self.mark_write_closed();
2613    }
2614}
2615
/// Receive-only adapter stream wrapping a `quinn::RecvStream`.
pub struct QuinnRecvStream {
    // Stream id computed from the QUIC stream id when the handle is built.
    stream_id: u64,
    // Addresses supplied by the creator of the handle, returned as-is.
    local_addr: Option<SocketAddr>,
    peer_addr: Option<SocketAddr>,
    // Underlying QUIC receive half; the async mutex is held across reads.
    recv: AsyncMutex<quinn::RecvStream>,
    // Prelude state seeded from the accepted stream metadata.
    prelude: Mutex<PreludeState>,
    // Optional deadline applied to `read` and `read_exact`.
    read_deadline: Mutex<Option<Instant>>,
    // Terminal errors; the `read` slot is reported once the read side is closed.
    terminal: Mutex<TerminalErrors>,
    // Shared adapter statistics.
    stats: Arc<AdapterStats>,
    // Optional guard whose `finish` is called the first time the read side closes.
    active: Option<ActiveGuard>,
    // Set once the read side has been closed.
    read_closed: AtomicBool,
}
2628
2629impl QuinnRecvStream {
2630    fn accepted(
2631        recv: quinn::RecvStream,
2632        metadata: AcceptedStreamMetadata,
2633        stats: Arc<AdapterStats>,
2634        local_addr: Option<SocketAddr>,
2635        peer_addr: Option<SocketAddr>,
2636    ) -> Self {
2637        let stream_id = quinn_stream_id(recv.id());
2638        Self {
2639            stream_id,
2640            local_addr,
2641            peer_addr,
2642            recv: AsyncMutex::new(recv),
2643            prelude: Mutex::new(PreludeState::accepted(metadata)),
2644            read_deadline: Mutex::new(None),
2645            terminal: Mutex::new(TerminalErrors::default()),
2646            stats,
2647            active: None,
2648            read_closed: AtomicBool::new(false),
2649        }
2650    }
2651
2652    fn with_active(mut self, counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
2653        self.active = Some(ActiveGuard::new(counters, kind));
2654        self
2655    }
2656
2657    fn mark_read_closed(&self) {
2658        self.mark_read_closed_with(None);
2659    }
2660
2661    fn mark_read_closed_with(&self, err: Option<zmux::Error>) {
2662        if self.read_closed.load(Ordering::Acquire) {
2663            return;
2664        }
2665        let first = {
2666            let mut terminal = self.terminal.lock().unwrap();
2667            if self.read_closed.load(Ordering::Acquire) {
2668                false
2669            } else {
2670                if let Some(err) = err {
2671                    terminal.read = Some(err);
2672                }
2673                self.read_closed.store(true, Ordering::Release);
2674                true
2675            }
2676        };
2677        if first {
2678            if let Some(active) = &self.active {
2679                active.finish();
2680            }
2681        }
2682    }
2683
2684    pub fn stream_id(&self) -> u64 {
2685        self.stream_id
2686    }
2687
2688    pub fn is_opened_locally(&self) -> bool {
2689        false
2690    }
2691
2692    pub fn is_bidirectional(&self) -> bool {
2693        false
2694    }
2695
2696    pub fn is_read_closed(&self) -> bool {
2697        self.read_closed.load(Ordering::Acquire)
2698    }
2699
2700    pub fn metadata(&self) -> StreamMetadata {
2701        self.prelude.lock().unwrap().metadata.clone()
2702    }
2703
2704    pub fn open_info(&self) -> Vec<u8> {
2705        prelude_open_info(&self.prelude)
2706    }
2707
2708    pub fn append_open_info_to(&self, dst: &mut Vec<u8>) {
2709        append_prelude_open_info_to(&self.prelude, dst);
2710    }
2711
2712    pub fn open_info_len(&self) -> usize {
2713        prelude_open_info_len(&self.prelude)
2714    }
2715
2716    pub fn has_open_info(&self) -> bool {
2717        prelude_has_open_info(&self.prelude)
2718    }
2719
2720    pub fn local_addr(&self) -> Option<SocketAddr> {
2721        self.local_addr
2722    }
2723
2724    pub fn peer_addr(&self) -> Option<SocketAddr> {
2725        self.peer_addr
2726    }
2727
2728    pub fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2729        *self.read_deadline.lock().unwrap() = deadline;
2730        Ok(())
2731    }
2732
2733    pub fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2734        self.set_read_deadline(deadline)
2735    }
2736
2737    pub fn set_read_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2738        self.set_read_deadline(timeout_to_deadline(timeout))
2739    }
2740
2741    pub fn set_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2742        self.set_deadline(timeout_to_deadline(timeout))
2743    }
2744
2745    fn read_timeout_from_deadline(&self) -> Result<Option<Duration>> {
2746        timeout_until(*self.read_deadline.lock().unwrap(), "read")
2747    }
2748
2749    fn read_terminal_error(&self) -> zmux::Error {
2750        self.terminal
2751            .lock()
2752            .unwrap()
2753            .read
2754            .clone()
2755            .unwrap_or_else(local_read_closed_error)
2756    }
2757
2758    pub async fn read(&self, dst: &mut [u8]) -> Result<usize> {
2759        let timeout = self.read_timeout_from_deadline()?;
2760        with_optional_timeout(self.read_inner(dst), timeout, "read").await
2761    }
2762
2763    async fn read_inner(&self, dst: &mut [u8]) -> Result<usize> {
2764        if dst.is_empty() {
2765            return Ok(0);
2766        }
2767        if self.read_closed.load(Ordering::Acquire) {
2768            return Err(self.read_terminal_error());
2769        }
2770        let mut recv = self.recv.lock().await;
2771        match recv.read(dst).await {
2772            Ok(Some(n)) => {
2773                self.stats.note_data_read(n, Instant::now());
2774                Ok(n)
2775            }
2776            Ok(None) => {
2777                self.mark_read_closed();
2778                Ok(0)
2779            }
2780            Err(err) => {
2781                if let quinn::ReadError::Reset(code) = &err {
2782                    self.stats.note_reset_reason((*code).into_inner());
2783                }
2784                let err = translate_read_error(err);
2785                self.mark_read_closed_with(Some(err.clone()));
2786                Err(err)
2787            }
2788        }
2789    }
2790
2791    pub async fn read_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<usize> {
2792        with_timeout(self.read(dst), timeout, "read").await
2793    }
2794
2795    pub async fn read_exact(&self, dst: &mut [u8]) -> Result<()> {
2796        let timeout = self.read_timeout_from_deadline()?;
2797        with_optional_timeout(self.read_exact_inner(dst), timeout, "read").await
2798    }
2799
2800    async fn read_exact_inner(&self, mut dst: &mut [u8]) -> Result<()> {
2801        if dst.is_empty() {
2802            return Ok(());
2803        }
2804        if self.read_closed.load(Ordering::Acquire) {
2805            return Err(self.read_terminal_error());
2806        }
2807        let mut recv = self.recv.lock().await;
2808        while !dst.is_empty() {
2809            match recv.read(dst).await {
2810                Ok(Some(n)) => {
2811                    if n == 0 {
2812                        return Err(unexpected_eof_error());
2813                    }
2814                    self.stats.note_data_read(n, Instant::now());
2815                    let (_, rest) = dst.split_at_mut(n);
2816                    dst = rest;
2817                }
2818                Ok(None) => {
2819                    self.mark_read_closed();
2820                    return Err(unexpected_eof_error());
2821                }
2822                Err(err) => {
2823                    if let quinn::ReadError::Reset(code) = &err {
2824                        self.stats.note_reset_reason((*code).into_inner());
2825                    }
2826                    let err = translate_read_error(err);
2827                    self.mark_read_closed_with(Some(err.clone()));
2828                    return Err(err);
2829                }
2830            }
2831        }
2832        Ok(())
2833    }
2834
2835    pub async fn read_exact_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<()> {
2836        with_timeout(self.read_exact(dst), timeout, "read").await
2837    }
2838
2839    pub async fn read_vectored(&self, dsts: &mut [IoSliceMut<'_>]) -> Result<usize> {
2840        match dsts.iter_mut().find(|dst| !dst.is_empty()) {
2841            Some(dst) => self.read(dst).await,
2842            None => Ok(0),
2843        }
2844    }
2845
2846    pub async fn read_vectored_timeout(
2847        &self,
2848        dsts: &mut [IoSliceMut<'_>],
2849        timeout: Duration,
2850    ) -> Result<usize> {
2851        with_timeout(self.read_vectored(dsts), timeout, "read").await
2852    }
2853
2854    pub async fn close_read(&self) -> Result<()> {
2855        self.cancel_read(zmux::ErrorCode::Cancelled.as_u64()).await
2856    }
2857
2858    pub async fn cancel_read(&self, code: u64) -> Result<()> {
2859        if self.read_closed.load(Ordering::Acquire) {
2860            return Err(self.read_terminal_error());
2861        }
2862        let code =
2863            checked_quinn_varint(code, zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)?;
2864        self.cancel_read_varint(code).await
2865    }
2866
2867    async fn cancel_read_varint(&self, code: quinn::VarInt) -> Result<()> {
2868        let mut recv = self.recv.lock().await;
2869        let result = recv.stop(code).map_err(translate_read_closed_stream);
2870        drop(recv);
2871        if result.is_ok() {
2872            self.stats.note_control_progress();
2873        }
2874        self.mark_read_closed();
2875        result
2876    }
2877
2878    pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
2879        if self.read_closed.load(Ordering::Acquire) {
2880            return Err(self.read_terminal_error());
2881        }
2882        let code = checked_quinn_varint(
2883            code,
2884            zmux::ErrorOperation::Close,
2885            zmux::ErrorDirection::Read,
2886        )?;
2887        let terminal = local_stream_application_error(
2888            code.into_inner(),
2889            reason,
2890            zmux::ErrorOperation::Close,
2891            zmux::ErrorDirection::Read,
2892            zmux::TerminationKind::Abort,
2893        );
2894        let mut recv = self.recv.lock().await;
2895        let _ = recv.stop(code);
2896        drop(recv);
2897        self.stats.note_control_progress();
2898        self.stats.note_abort_reason(code.into_inner());
2899        self.mark_read_closed_with(Some(terminal));
2900        Ok(())
2901    }
2902
2903    pub async fn close(&self) -> Result<()> {
2904        if self.read_closed.load(Ordering::Acquire) {
2905            Ok(())
2906        } else {
2907            self.close_read().await
2908        }
2909    }
2910}
2911
2912impl AsyncStreamHandle for QuinnStream {
2913    fn stream_id(&self) -> u64 {
2914        QuinnStream::stream_id(self)
2915    }
2916
2917    fn is_opened_locally(&self) -> bool {
2918        QuinnStream::is_opened_locally(self)
2919    }
2920
2921    fn is_bidirectional(&self) -> bool {
2922        QuinnStream::is_bidirectional(self)
2923    }
2924
2925    fn open_info_len(&self) -> usize {
2926        QuinnStream::open_info_len(self)
2927    }
2928
2929    fn has_open_info(&self) -> bool {
2930        QuinnStream::has_open_info(self)
2931    }
2932
2933    fn append_open_info_to(&self, dst: &mut Vec<u8>) {
2934        QuinnStream::append_open_info_to(self, dst)
2935    }
2936
2937    fn open_info(&self) -> Vec<u8> {
2938        QuinnStream::open_info(self)
2939    }
2940
2941    fn metadata(&self) -> StreamMetadata {
2942        QuinnStream::metadata(self)
2943    }
2944
2945    fn local_addr(&self) -> Option<SocketAddr> {
2946        QuinnStream::local_addr(self)
2947    }
2948
2949    fn peer_addr(&self) -> Option<SocketAddr> {
2950        QuinnStream::peer_addr(self)
2951    }
2952
2953    fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2954        QuinnStream::set_deadline(self, deadline)
2955    }
2956
2957    fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
2958        Box::pin(async move { QuinnStream::close(self).await })
2959    }
2960
2961    fn close_with_error<'a>(
2962        &'a self,
2963        code: u64,
2964        reason: &'a str,
2965    ) -> AsyncBoxFuture<'a, Result<()>> {
2966        Box::pin(async move { QuinnStream::close_with_error(self, code, reason).await })
2967    }
2968}
2969
2970impl AsyncRecvStreamHandle for QuinnStream {
2971    fn read<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
2972        Box::pin(async move { QuinnStream::read(self, dst).await })
2973    }
2974
2975    fn read_vectored<'a>(
2976        &'a self,
2977        dsts: &'a mut [IoSliceMut<'_>],
2978    ) -> AsyncBoxFuture<'a, Result<usize>> {
2979        Box::pin(async move { QuinnStream::read_vectored(self, dsts).await })
2980    }
2981
2982    fn read_timeout<'a>(
2983        &'a self,
2984        dst: &'a mut [u8],
2985        timeout: Duration,
2986    ) -> AsyncBoxFuture<'a, Result<usize>> {
2987        Box::pin(async move { QuinnStream::read_timeout(self, dst, timeout).await })
2988    }
2989
2990    fn read_vectored_timeout<'a>(
2991        &'a self,
2992        dsts: &'a mut [IoSliceMut<'_>],
2993        timeout: Duration,
2994    ) -> AsyncBoxFuture<'a, Result<usize>> {
2995        Box::pin(async move { QuinnStream::read_vectored_timeout(self, dsts, timeout).await })
2996    }
2997
2998    fn read_exact<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<()>> {
2999        Box::pin(async move { QuinnStream::read_exact(self, dst).await })
3000    }
3001
3002    fn read_exact_timeout<'a>(
3003        &'a self,
3004        dst: &'a mut [u8],
3005        timeout: Duration,
3006    ) -> AsyncBoxFuture<'a, Result<()>> {
3007        Box::pin(async move { QuinnStream::read_exact_timeout(self, dst, timeout).await })
3008    }
3009
3010    fn is_read_closed(&self) -> bool {
3011        QuinnStream::is_read_closed(self)
3012    }
3013
3014    fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3015        QuinnStream::set_read_deadline(self, deadline)
3016    }
3017
3018    fn close_read(&self) -> AsyncBoxFuture<'_, Result<()>> {
3019        Box::pin(async move { QuinnStream::close_read(self).await })
3020    }
3021
3022    fn cancel_read(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3023        Box::pin(async move { QuinnStream::cancel_read(self, code).await })
3024    }
3025}
3026
3027impl AsyncSendStreamHandle for QuinnStream {
3028    fn write<'a>(&'a self, src: &'a [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
3029        Box::pin(async move { QuinnStream::write(self, src).await })
3030    }
3031
3032    fn write_all<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<()>> {
3033        Box::pin(async move { QuinnStream::write_all(self, src).await })
3034    }
3035
3036    fn write_all_timeout<'a>(
3037        &'a self,
3038        src: WritePayload<'a>,
3039        timeout: Duration,
3040    ) -> AsyncBoxFuture<'a, Result<()>> {
3041        Box::pin(async move { QuinnStream::write_all_timeout(self, src, timeout).await })
3042    }
3043
3044    fn write_timeout<'a>(
3045        &'a self,
3046        src: &'a [u8],
3047        timeout: Duration,
3048    ) -> AsyncBoxFuture<'a, Result<usize>> {
3049        Box::pin(async move { QuinnStream::write_timeout(self, src, timeout).await })
3050    }
3051
3052    fn write_vectored<'a>(&'a self, parts: &'a [IoSlice<'_>]) -> AsyncBoxFuture<'a, Result<usize>> {
3053        Box::pin(async move { QuinnStream::write_vectored(self, parts).await })
3054    }
3055
3056    fn write_vectored_timeout<'a>(
3057        &'a self,
3058        parts: &'a [IoSlice<'_>],
3059        timeout: Duration,
3060    ) -> AsyncBoxFuture<'a, Result<usize>> {
3061        Box::pin(async move { QuinnStream::write_vectored_timeout(self, parts, timeout).await })
3062    }
3063
3064    fn write_final<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<usize>> {
3065        Box::pin(async move { QuinnStream::write_final(self, src).await })
3066    }
3067
3068    fn write_final_timeout<'a>(
3069        &'a self,
3070        src: WritePayload<'a>,
3071        timeout: Duration,
3072    ) -> AsyncBoxFuture<'a, Result<usize>> {
3073        Box::pin(async move { QuinnStream::write_final_timeout(self, src, timeout).await })
3074    }
3075
3076    fn write_vectored_final<'a>(
3077        &'a self,
3078        parts: &'a [IoSlice<'_>],
3079    ) -> AsyncBoxFuture<'a, Result<usize>> {
3080        Box::pin(async move { QuinnStream::write_vectored_final(self, parts).await })
3081    }
3082
3083    fn write_vectored_final_timeout<'a>(
3084        &'a self,
3085        parts: &'a [IoSlice<'_>],
3086        timeout: Duration,
3087    ) -> AsyncBoxFuture<'a, Result<usize>> {
3088        Box::pin(
3089            async move { QuinnStream::write_vectored_final_timeout(self, parts, timeout).await },
3090        )
3091    }
3092
3093    fn is_write_closed(&self) -> bool {
3094        QuinnStream::is_write_closed(self)
3095    }
3096
3097    fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3098        QuinnStream::set_write_deadline(self, deadline)
3099    }
3100
3101    fn update_metadata(&self, update: zmux::MetadataUpdate) -> AsyncBoxFuture<'_, Result<()>> {
3102        Box::pin(async move { QuinnStream::update_metadata(self, update).await })
3103    }
3104
3105    fn close_write(&self) -> AsyncBoxFuture<'_, Result<()>> {
3106        Box::pin(async move { QuinnStream::close_write(self).await })
3107    }
3108
3109    fn cancel_write(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3110        Box::pin(async move { QuinnStream::cancel_write(self, code).await })
3111    }
3112}
3113
// Empty impl: `QuinnStream` opts into `AsyncDuplexStreamHandle` without
// defining any items here.
impl AsyncDuplexStreamHandle for QuinnStream {}
3115
3116impl AsyncStreamHandle for QuinnSendStream {
3117    fn stream_id(&self) -> u64 {
3118        QuinnSendStream::stream_id(self)
3119    }
3120
3121    fn is_opened_locally(&self) -> bool {
3122        QuinnSendStream::is_opened_locally(self)
3123    }
3124
3125    fn is_bidirectional(&self) -> bool {
3126        QuinnSendStream::is_bidirectional(self)
3127    }
3128
3129    fn open_info_len(&self) -> usize {
3130        QuinnSendStream::open_info_len(self)
3131    }
3132
3133    fn has_open_info(&self) -> bool {
3134        QuinnSendStream::has_open_info(self)
3135    }
3136
3137    fn append_open_info_to(&self, dst: &mut Vec<u8>) {
3138        QuinnSendStream::append_open_info_to(self, dst)
3139    }
3140
3141    fn open_info(&self) -> Vec<u8> {
3142        QuinnSendStream::open_info(self)
3143    }
3144
3145    fn metadata(&self) -> StreamMetadata {
3146        QuinnSendStream::metadata(self)
3147    }
3148
3149    fn local_addr(&self) -> Option<SocketAddr> {
3150        QuinnSendStream::local_addr(self)
3151    }
3152
3153    fn peer_addr(&self) -> Option<SocketAddr> {
3154        QuinnSendStream::peer_addr(self)
3155    }
3156
3157    fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3158        QuinnSendStream::set_deadline(self, deadline)
3159    }
3160
3161    fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
3162        Box::pin(async move { QuinnSendStream::close(self).await })
3163    }
3164
3165    fn close_with_error<'a>(
3166        &'a self,
3167        code: u64,
3168        reason: &'a str,
3169    ) -> AsyncBoxFuture<'a, Result<()>> {
3170        Box::pin(async move { QuinnSendStream::close_with_error(self, code, reason).await })
3171    }
3172}
3173
3174impl AsyncSendStreamHandle for QuinnSendStream {
3175    fn write<'a>(&'a self, src: &'a [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
3176        Box::pin(async move { QuinnSendStream::write(self, src).await })
3177    }
3178
3179    fn write_all<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<()>> {
3180        Box::pin(async move { QuinnSendStream::write_all(self, src).await })
3181    }
3182
3183    fn write_all_timeout<'a>(
3184        &'a self,
3185        src: WritePayload<'a>,
3186        timeout: Duration,
3187    ) -> AsyncBoxFuture<'a, Result<()>> {
3188        Box::pin(async move { QuinnSendStream::write_all_timeout(self, src, timeout).await })
3189    }
3190
3191    fn write_timeout<'a>(
3192        &'a self,
3193        src: &'a [u8],
3194        timeout: Duration,
3195    ) -> AsyncBoxFuture<'a, Result<usize>> {
3196        Box::pin(async move { QuinnSendStream::write_timeout(self, src, timeout).await })
3197    }
3198
3199    fn write_vectored<'a>(&'a self, parts: &'a [IoSlice<'_>]) -> AsyncBoxFuture<'a, Result<usize>> {
3200        Box::pin(async move { QuinnSendStream::write_vectored(self, parts).await })
3201    }
3202
3203    fn write_vectored_timeout<'a>(
3204        &'a self,
3205        parts: &'a [IoSlice<'_>],
3206        timeout: Duration,
3207    ) -> AsyncBoxFuture<'a, Result<usize>> {
3208        Box::pin(async move { QuinnSendStream::write_vectored_timeout(self, parts, timeout).await })
3209    }
3210
3211    fn write_final<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<usize>> {
3212        Box::pin(async move { QuinnSendStream::write_final(self, src).await })
3213    }
3214
3215    fn write_final_timeout<'a>(
3216        &'a self,
3217        src: WritePayload<'a>,
3218        timeout: Duration,
3219    ) -> AsyncBoxFuture<'a, Result<usize>> {
3220        Box::pin(async move { QuinnSendStream::write_final_timeout(self, src, timeout).await })
3221    }
3222
3223    fn write_vectored_final<'a>(
3224        &'a self,
3225        parts: &'a [IoSlice<'_>],
3226    ) -> AsyncBoxFuture<'a, Result<usize>> {
3227        Box::pin(async move { QuinnSendStream::write_vectored_final(self, parts).await })
3228    }
3229
3230    fn write_vectored_final_timeout<'a>(
3231        &'a self,
3232        parts: &'a [IoSlice<'_>],
3233        timeout: Duration,
3234    ) -> AsyncBoxFuture<'a, Result<usize>> {
3235        Box::pin(async move {
3236            QuinnSendStream::write_vectored_final_timeout(self, parts, timeout).await
3237        })
3238    }
3239
3240    fn is_write_closed(&self) -> bool {
3241        QuinnSendStream::is_write_closed(self)
3242    }
3243
3244    fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3245        QuinnSendStream::set_write_deadline(self, deadline)
3246    }
3247
3248    fn update_metadata(&self, update: zmux::MetadataUpdate) -> AsyncBoxFuture<'_, Result<()>> {
3249        Box::pin(async move { QuinnSendStream::update_metadata(self, update).await })
3250    }
3251
3252    fn close_write(&self) -> AsyncBoxFuture<'_, Result<()>> {
3253        Box::pin(async move { QuinnSendStream::close_write(self).await })
3254    }
3255
3256    fn cancel_write(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3257        Box::pin(async move { QuinnSendStream::cancel_write(self, code).await })
3258    }
3259}
3260
3261impl AsyncStreamHandle for QuinnRecvStream {
3262    fn stream_id(&self) -> u64 {
3263        QuinnRecvStream::stream_id(self)
3264    }
3265
3266    fn is_opened_locally(&self) -> bool {
3267        QuinnRecvStream::is_opened_locally(self)
3268    }
3269
3270    fn is_bidirectional(&self) -> bool {
3271        QuinnRecvStream::is_bidirectional(self)
3272    }
3273
3274    fn open_info_len(&self) -> usize {
3275        QuinnRecvStream::open_info_len(self)
3276    }
3277
3278    fn has_open_info(&self) -> bool {
3279        QuinnRecvStream::has_open_info(self)
3280    }
3281
3282    fn append_open_info_to(&self, dst: &mut Vec<u8>) {
3283        QuinnRecvStream::append_open_info_to(self, dst)
3284    }
3285
3286    fn open_info(&self) -> Vec<u8> {
3287        QuinnRecvStream::open_info(self)
3288    }
3289
3290    fn metadata(&self) -> StreamMetadata {
3291        QuinnRecvStream::metadata(self)
3292    }
3293
3294    fn local_addr(&self) -> Option<SocketAddr> {
3295        QuinnRecvStream::local_addr(self)
3296    }
3297
3298    fn peer_addr(&self) -> Option<SocketAddr> {
3299        QuinnRecvStream::peer_addr(self)
3300    }
3301
3302    fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3303        QuinnRecvStream::set_deadline(self, deadline)
3304    }
3305
3306    fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
3307        Box::pin(async move { QuinnRecvStream::close(self).await })
3308    }
3309
3310    fn close_with_error<'a>(
3311        &'a self,
3312        code: u64,
3313        reason: &'a str,
3314    ) -> AsyncBoxFuture<'a, Result<()>> {
3315        Box::pin(async move { QuinnRecvStream::close_with_error(self, code, reason).await })
3316    }
3317}
3318
3319impl AsyncRecvStreamHandle for QuinnRecvStream {
3320    fn read<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
3321        Box::pin(async move { QuinnRecvStream::read(self, dst).await })
3322    }
3323
3324    fn read_vectored<'a>(
3325        &'a self,
3326        dsts: &'a mut [IoSliceMut<'_>],
3327    ) -> AsyncBoxFuture<'a, Result<usize>> {
3328        Box::pin(async move { QuinnRecvStream::read_vectored(self, dsts).await })
3329    }
3330
3331    fn read_timeout<'a>(
3332        &'a self,
3333        dst: &'a mut [u8],
3334        timeout: Duration,
3335    ) -> AsyncBoxFuture<'a, Result<usize>> {
3336        Box::pin(async move { QuinnRecvStream::read_timeout(self, dst, timeout).await })
3337    }
3338
3339    fn read_vectored_timeout<'a>(
3340        &'a self,
3341        dsts: &'a mut [IoSliceMut<'_>],
3342        timeout: Duration,
3343    ) -> AsyncBoxFuture<'a, Result<usize>> {
3344        Box::pin(async move { QuinnRecvStream::read_vectored_timeout(self, dsts, timeout).await })
3345    }
3346
3347    fn read_exact<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<()>> {
3348        Box::pin(async move { QuinnRecvStream::read_exact(self, dst).await })
3349    }
3350
3351    fn read_exact_timeout<'a>(
3352        &'a self,
3353        dst: &'a mut [u8],
3354        timeout: Duration,
3355    ) -> AsyncBoxFuture<'a, Result<()>> {
3356        Box::pin(async move { QuinnRecvStream::read_exact_timeout(self, dst, timeout).await })
3357    }
3358
3359    fn is_read_closed(&self) -> bool {
3360        QuinnRecvStream::is_read_closed(self)
3361    }
3362
3363    fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3364        QuinnRecvStream::set_read_deadline(self, deadline)
3365    }
3366
3367    fn close_read(&self) -> AsyncBoxFuture<'_, Result<()>> {
3368        Box::pin(async move { QuinnRecvStream::close_read(self).await })
3369    }
3370
3371    fn cancel_read(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3372        Box::pin(async move { QuinnRecvStream::cancel_read(self, code).await })
3373    }
3374}
3375
3376impl AsyncSession for QuinnSession {
3377    type Stream = QuinnStream;
3378    type SendStream = QuinnSendStream;
3379    type RecvStream = QuinnRecvStream;
3380
3381    fn accept_stream(&self) -> AsyncBoxFuture<'_, Result<Self::Stream>> {
3382        Box::pin(async move { QuinnSession::accept_stream(self).await })
3383    }
3384
3385    fn accept_stream_timeout(&self, timeout: Duration) -> AsyncBoxFuture<'_, Result<Self::Stream>> {
3386        Box::pin(async move { QuinnSession::accept_stream_timeout(self, timeout).await })
3387    }
3388
3389    fn accept_uni_stream(&self) -> AsyncBoxFuture<'_, Result<Self::RecvStream>> {
3390        Box::pin(async move { QuinnSession::accept_uni_stream(self).await })
3391    }
3392
3393    fn accept_uni_stream_timeout(
3394        &self,
3395        timeout: Duration,
3396    ) -> AsyncBoxFuture<'_, Result<Self::RecvStream>> {
3397        Box::pin(async move { QuinnSession::accept_uni_stream_timeout(self, timeout).await })
3398    }
3399
3400    fn open_stream_with(&self, request: OpenRequest) -> AsyncBoxFuture<'_, Result<Self::Stream>> {
3401        Box::pin(async move { QuinnSession::open_stream_with(self, request).await })
3402    }
3403
3404    fn open_uni_stream_with(
3405        &self,
3406        request: OpenRequest,
3407    ) -> AsyncBoxFuture<'_, Result<Self::SendStream>> {
3408        Box::pin(async move { QuinnSession::open_uni_stream_with(self, request).await })
3409    }
3410
3411    fn open_and_send<'a>(
3412        &'a self,
3413        request: OpenSend<'a>,
3414    ) -> AsyncBoxFuture<'a, Result<Self::Stream>> {
3415        Box::pin(async move { QuinnSession::open_and_send(self, request).await })
3416    }
3417
3418    fn open_uni_and_send<'a>(
3419        &'a self,
3420        request: OpenSend<'a>,
3421    ) -> AsyncBoxFuture<'a, Result<Self::SendStream>> {
3422        Box::pin(async move { QuinnSession::open_uni_and_send(self, request).await })
3423    }
3424
3425    fn ping<'a>(&'a self, _echo: &'a [u8]) -> AsyncBoxFuture<'a, Result<Duration>> {
3426        Box::pin(async move {
3427            Err(adapter_session_control_unavailable(
3428                zmux::ErrorOperation::Ping,
3429                "zmux: feature not supported by adapter: ping",
3430            ))
3431        })
3432    }
3433
3434    fn ping_timeout<'a>(
3435        &'a self,
3436        _echo: &'a [u8],
3437        _timeout: Duration,
3438    ) -> AsyncBoxFuture<'a, Result<Duration>> {
3439        Box::pin(async move {
3440            Err(adapter_session_control_unavailable(
3441                zmux::ErrorOperation::Ping,
3442                "zmux: feature not supported by adapter: ping_timeout",
3443            ))
3444        })
3445    }
3446
3447    fn go_away(
3448        &self,
3449        _last_accepted_bidi: u64,
3450        _last_accepted_uni: u64,
3451    ) -> AsyncBoxFuture<'_, Result<()>> {
3452        Box::pin(async move {
3453            Err(adapter_session_control_unavailable(
3454                zmux::ErrorOperation::Close,
3455                "zmux: feature not supported by adapter: go_away",
3456            ))
3457        })
3458    }
3459
3460    fn go_away_with_error<'a>(
3461        &'a self,
3462        _last_accepted_bidi: u64,
3463        _last_accepted_uni: u64,
3464        _code: u64,
3465        _reason: &'a str,
3466    ) -> AsyncBoxFuture<'a, Result<()>> {
3467        Box::pin(async move {
3468            Err(adapter_session_control_unavailable(
3469                zmux::ErrorOperation::Close,
3470                "zmux: feature not supported by adapter: go_away_with_error",
3471            ))
3472        })
3473    }
3474
3475    fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
3476        Box::pin(async move { QuinnSession::close(self).await })
3477    }
3478
3479    fn close_with_error<'a>(
3480        &'a self,
3481        code: u64,
3482        reason: &'a str,
3483    ) -> AsyncBoxFuture<'a, Result<()>> {
3484        Box::pin(async move { QuinnSession::close_with_error(self, code, reason).await })
3485    }
3486
3487    fn wait(&self) -> AsyncBoxFuture<'_, Result<()>> {
3488        Box::pin(async move { QuinnSession::wait(self).await })
3489    }
3490
3491    fn wait_timeout(&self, timeout: Duration) -> AsyncBoxFuture<'_, Result<bool>> {
3492        Box::pin(async move { QuinnSession::wait_timeout(self, timeout).await })
3493    }
3494
3495    fn is_closed(&self) -> bool {
3496        QuinnSession::is_closed(self)
3497    }
3498
3499    fn local_addr(&self) -> Option<SocketAddr> {
3500        QuinnSession::local_addr(self)
3501    }
3502
3503    fn peer_addr(&self) -> Option<SocketAddr> {
3504        QuinnSession::peer_addr(self)
3505    }
3506
3507    fn close_error(&self) -> Option<zmux::Error> {
3508        QuinnSession::close_error(self)
3509    }
3510
3511    fn state(&self) -> zmux::SessionState {
3512        QuinnSession::state(self)
3513    }
3514
3515    fn stats(&self) -> zmux::SessionStats {
3516        QuinnSession::stats(self)
3517    }
3518
3519    fn peer_go_away_error(&self) -> Option<zmux::PeerGoAwayError> {
3520        None
3521    }
3522
3523    fn peer_close_error(&self) -> Option<zmux::PeerCloseError> {
3524        None
3525    }
3526
    /// Returns the placeholder preface built by `adapter_empty_preface`.
    fn local_preface(&self) -> zmux::Preface {
        adapter_empty_preface()
    }
3530
    /// Returns the same placeholder preface as `local_preface`, built by
    /// `adapter_empty_preface`.
    fn peer_preface(&self) -> zmux::Preface {
        adapter_empty_preface()
    }
3534
    /// Returns the placeholder negotiation result built by `adapter_empty_negotiated`.
    fn negotiated(&self) -> zmux::Negotiated {
        adapter_empty_negotiated()
    }
3538}
3539
3540async fn ensure_open_prelude(
3541    state: &Mutex<PreludeState>,
3542    send: &mut quinn::SendStream,
3543    stats: &AdapterStats,
3544) -> Result<()> {
3545    loop {
3546        let chunk = {
3547            let state = state.lock().unwrap();
3548            if !state.send_prelude || state.prelude_sent {
3549                return Ok(());
3550            }
3551            state.prelude.slice(state.prelude_offset..)
3552        };
3553        if chunk.is_empty() {
3554            let mut state = state.lock().unwrap();
3555            state.prelude_sent = true;
3556            state.prelude = Bytes::new();
3557            drop(state);
3558            return Ok(());
3559        }
3560        let started_at = Instant::now();
3561        let n = send.write(&chunk).await.map_err(translate_write_error)?;
3562        let completed_at = Instant::now();
3563        if n == 0 {
3564            return Err(zmux::Error::local("zmux-quinn: zero-length prelude write"));
3565        }
3566        if n > chunk.len() {
3567            return Err(zmux::Error::local(
3568                "zmux-quinn: prelude write reported invalid progress",
3569            ));
3570        }
3571        stats.note_write_wait(started_at, completed_at);
3572        stats.note_flush(n, completed_at);
3573        stats.note_control_progress_at(completed_at);
3574        let mut state = state.lock().unwrap();
3575        state.prelude_offset = state.prelude_offset.saturating_add(n);
3576        if state.prelude_offset >= state.prelude.len() {
3577            state.prelude_sent = true;
3578            state.prelude = Bytes::new();
3579            state.prelude_offset = 0;
3580            drop(state);
3581            return Ok(());
3582        }
3583    }
3584}
3585
3586async fn write_all_final(
3587    state: &Mutex<PreludeState>,
3588    send: &mut quinn::SendStream,
3589    src: &[u8],
3590    stats: &AdapterStats,
3591) -> Result<usize> {
3592    ensure_open_prelude(state, send, stats).await?;
3593    if !src.is_empty() {
3594        write_payload_all(send, src, stats).await?;
3595    }
3596    finish_send(send, stats).await?;
3597    Ok(src.len())
3598}
3599
3600async fn write_io_slices_final(
3601    state: &Mutex<PreludeState>,
3602    send: &mut quinn::SendStream,
3603    parts: &[IoSlice<'_>],
3604    stats: &AdapterStats,
3605) -> Result<usize> {
3606    let total = total_bytes(parts.iter().map(|part| part.len()))?;
3607    ensure_open_prelude(state, send, stats).await?;
3608    if total == 0 {
3609        finish_send(send, stats).await?;
3610        return Ok(0);
3611    }
3612    if let Some(single) = single_non_empty_io_slice(parts) {
3613        write_payload_all(send, single, stats).await?;
3614        finish_send(send, stats).await?;
3615        return Ok(total);
3616    }
3617    if total <= QUINN_WRITE_VECTORED_COALESCE_MAX_BYTES {
3618        let mut coalesced = Vec::with_capacity(total);
3619        for part in parts {
3620            coalesced.extend_from_slice(part.as_ref());
3621        }
3622        write_payload_all(send, &coalesced, stats).await?;
3623    } else {
3624        for part in parts {
3625            if !part.is_empty() {
3626                write_payload_all(send, part.as_ref(), stats).await?;
3627            }
3628        }
3629    }
3630    finish_send(send, stats).await?;
3631    Ok(total)
3632}
3633
3634async fn write_io_slices_once(
3635    send: &mut quinn::SendStream,
3636    parts: &[IoSlice<'_>],
3637    total: usize,
3638    stats: &AdapterStats,
3639) -> Result<usize> {
3640    if total == 0 {
3641        return Ok(0);
3642    }
3643    if let Some(single) = single_non_empty_io_slice(parts) {
3644        return write_payload_once(send, single, stats).await;
3645    }
3646
3647    let prefix_len = total.min(QUINN_WRITE_VECTORED_COALESCE_MAX_BYTES);
3648    let mut coalesced = Vec::with_capacity(prefix_len);
3649    for part in parts {
3650        if coalesced.len() == prefix_len {
3651            break;
3652        }
3653        let remaining = prefix_len - coalesced.len();
3654        let bytes = part.as_ref();
3655        let take = bytes.len().min(remaining);
3656        coalesced.extend_from_slice(&bytes[..take]);
3657    }
3658    write_payload_once(send, &coalesced, stats).await
3659}
3660
/// Returns the bytes of the only non-empty slice in `parts`.
///
/// Yields `None` when every slice is empty (including an empty `parts`) or when two
/// or more slices carry data.
fn single_non_empty_io_slice<'a>(parts: &'a [IoSlice<'_>]) -> Option<&'a [u8]> {
    let mut populated = parts.iter().filter(|part| !part.is_empty());
    let first: &'a [u8] = populated.next()?;
    match populated.next() {
        // A second slice with data means there is no single candidate.
        Some(_) => None,
        None => Some(first),
    }
}
3674
3675async fn write_payload_once(
3676    send: &mut quinn::SendStream,
3677    src: &[u8],
3678    stats: &AdapterStats,
3679) -> Result<usize> {
3680    let started_at = Instant::now();
3681    let n = send.write(src).await.map_err(translate_write_error)?;
3682    let completed_at = Instant::now();
3683    if n == 0 {
3684        return Err(zmux::Error::local("zmux-quinn: zero-length stream write"));
3685    }
3686    if n > src.len() {
3687        return Err(
3688            zmux::Error::local("zmux-quinn: stream write reported invalid progress")
3689                .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3690        );
3691    }
3692    stats.note_write_wait(started_at, completed_at);
3693    stats.note_flush(n, completed_at);
3694    stats.note_data_write(n, completed_at);
3695    Ok(n)
3696}
3697
3698async fn write_payload_all(
3699    send: &mut quinn::SendStream,
3700    src: &[u8],
3701    stats: &AdapterStats,
3702) -> Result<()> {
3703    let started_at = Instant::now();
3704    send.write_all(src).await.map_err(translate_write_error)?;
3705    let completed_at = Instant::now();
3706    stats.note_write_wait(started_at, completed_at);
3707    stats.note_flush(src.len(), completed_at);
3708    stats.note_data_write(src.len(), completed_at);
3709    Ok(())
3710}
3711
3712async fn finish_send(send: &mut quinn::SendStream, stats: &AdapterStats) -> Result<()> {
3713    send.finish().map_err(translate_write_closed_stream)?;
3714    stats.note_control_progress();
3715    Ok(())
3716}
3717
3718fn total_bytes(lengths: impl IntoIterator<Item = usize>) -> Result<usize> {
3719    lengths.into_iter().try_fold(0usize, |total, len| {
3720        total.checked_add(len).ok_or_else(|| {
3721            zmux::Error::local("zmux-quinn: vectored payload length exceeds usize")
3722                .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3723        })
3724    })
3725}
3726
3727fn validate_progress(n: usize, requested: usize) -> Result<()> {
3728    if n > requested {
3729        Err(
3730            zmux::Error::local("zmux-quinn: write reported invalid progress")
3731                .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3732        )
3733    } else {
3734        Ok(())
3735    }
3736}
3737
3738fn checked_prelude_metadata_len(metadata_len: u64, prefix_len: usize) -> Result<usize> {
3739    let prefix_len = u64::try_from(prefix_len)
3740        .map_err(|_| protocol_prelude_error("stream prelude prefix length exceeds u64"))?;
3741    if metadata_len
3742        .checked_add(prefix_len)
3743        .is_none_or(|len| len > STREAM_PRELUDE_MAX_PAYLOAD)
3744    {
3745        return Err(protocol_prelude_error(
3746            "stream prelude exceeds adapter payload cap",
3747        ));
3748    }
3749    usize::try_from(metadata_len)
3750        .map_err(|_| protocol_prelude_error("stream prelude length exceeds usize"))
3751}
3752
/// Converts `value` to `u64`, clamping to `u64::MAX` if it does not fit.
fn usize_to_u64_saturating(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(converted) => converted,
        Err(_) => u64::MAX,
    }
}
3756
/// Converts `duration` to whole nanoseconds as a `u64`.
///
/// Durations whose nanosecond count does not fit in `u64` map to `u64::MAX - 1`.
/// NOTE(review): the ceiling is deliberately one below `u64::MAX`; presumably
/// `u64::MAX` is reserved by a consumer of these values. Confirm before changing.
fn duration_nanos_saturating(duration: Duration) -> u64 {
    match u64::try_from(duration.as_nanos()) {
        Ok(nanos) => nanos,
        Err(_) => u64::MAX - 1,
    }
}
3760
3761fn note_reason(counts: &mut HashMap<u64, u64>, overflow: &mut u64, code: u64) {
3762    if let Some(count) = counts.get_mut(&code) {
3763        *count = count.saturating_add(1);
3764    } else if counts.len() < MAX_REASON_STATS_CODES {
3765        counts.insert(code, 1);
3766    } else {
3767        *overflow = overflow.saturating_add(1);
3768    }
3769}
3770
/// Adds `delta` to `counter`, clamping at `u64::MAX` instead of wrapping.
///
/// A zero `delta` returns without touching the counter. All atomic operations use
/// `Ordering::Relaxed`.
fn saturating_add_atomic_u64(counter: &AtomicU64, delta: u64) {
    if delta != 0 {
        // The closure always yields `Some`, so the retry loop inside `fetch_update`
        // ends with a successful store and the returned `Result` carries no news.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |seen| {
            Some(seen.saturating_add(delta))
        });
    }
}
3784
/// Increments `counter` by one unless it already holds `usize::MAX`.
///
/// All atomic operations use `Ordering::Relaxed`.
fn saturating_increment_atomic_usize(counter: &AtomicUsize) {
    // `checked_add` yields `None` at `usize::MAX`, which makes `fetch_update` stop
    // without storing anything.
    let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |seen| {
        seen.checked_add(1)
    });
}
3802
3803fn quinn_stream_id(id: quinn::StreamId) -> u64 {
3804    quinn::VarInt::from(id).into_inner()
3805}
3806
3807fn quinn_varint(value: u64) -> quinn::VarInt {
3808    quinn::VarInt::from_u64(value).unwrap_or(quinn::VarInt::MAX)
3809}
3810
3811fn open_error_cleanup_code(err: &zmux::Error) -> quinn::VarInt {
3812    quinn_varint(
3813        err.numeric_code()
3814            .unwrap_or(zmux::ErrorCode::Cancelled.as_u64()),
3815    )
3816}
3817
3818fn checked_quinn_varint(
3819    value: u64,
3820    operation: zmux::ErrorOperation,
3821    direction: zmux::ErrorDirection,
3822) -> Result<quinn::VarInt> {
3823    quinn::VarInt::from_u64(value).map_err(|_| {
3824        zmux::Error::local("zmux-quinn: QUIC application error code exceeds varint62")
3825            .with_stream_context(operation, direction)
3826    })
3827}
3828
3829fn checked_session_quinn_varint(
3830    value: u64,
3831    operation: zmux::ErrorOperation,
3832) -> Result<quinn::VarInt> {
3833    quinn::VarInt::from_u64(value).map_err(|_| {
3834        zmux::Error::local("zmux-quinn: QUIC application error code exceeds varint62")
3835            .with_session_context(operation)
3836    })
3837}
3838
3839fn protocol_prelude_error(reason: &str) -> zmux::Error {
3840    zmux::Error::application(
3841        zmux::ErrorCode::Protocol.as_u64(),
3842        format!("zmux-quinn: {reason}"),
3843    )
3844    .with_source(zmux::ErrorSource::Remote)
3845    .with_stream_context(zmux::ErrorOperation::Accept, zmux::ErrorDirection::Both)
3846    .with_termination_kind(zmux::TerminationKind::Abort)
3847}
3848
3849fn local_stream_application_error(
3850    code: u64,
3851    reason: &str,
3852    operation: zmux::ErrorOperation,
3853    direction: zmux::ErrorDirection,
3854    termination_kind: zmux::TerminationKind,
3855) -> zmux::Error {
3856    zmux::Error::application(code, reason)
3857        .with_source(zmux::ErrorSource::Local)
3858        .with_stream_context(operation, direction)
3859        .with_termination_kind(termination_kind)
3860}
3861
3862fn priority_update_unavailable() -> zmux::Error {
3863    zmux::Error::local(
3864        "zmux: feature not supported by adapter: metadata update requires negotiated priority_update",
3865    )
3866        .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3867}
3868
3869fn adapter_session_control_unavailable(
3870    operation: zmux::ErrorOperation,
3871    message: &'static str,
3872) -> zmux::Error {
3873    zmux::Error::local(message).with_session_context(operation)
3874}
3875
3876fn adapter_empty_preface() -> zmux::Preface {
3877    zmux::Preface {
3878        preface_version: 0,
3879        role: zmux::Role::Initiator,
3880        tie_breaker_nonce: 0,
3881        min_proto: 0,
3882        max_proto: 0,
3883        capabilities: 0,
3884        settings: zmux::default_settings(),
3885    }
3886}
3887
3888fn adapter_empty_negotiated() -> zmux::Negotiated {
3889    zmux::Negotiated {
3890        proto: 0,
3891        capabilities: 0,
3892        local_role: zmux::Role::Initiator,
3893        peer_role: zmux::Role::Initiator,
3894        peer_settings: zmux::default_settings(),
3895    }
3896}
3897
3898fn local_read_closed_error() -> zmux::Error {
3899    zmux::Error::read_closed()
3900        .with_source(zmux::ErrorSource::Local)
3901        .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3902}
3903
3904fn unexpected_eof_error() -> zmux::Error {
3905    zmux::Error::io(std::io::Error::new(
3906        ErrorKind::UnexpectedEof,
3907        "failed to fill whole buffer",
3908    ))
3909    .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3910}
3911
3912fn local_write_closed_error() -> zmux::Error {
3913    zmux::Error::write_closed()
3914        .with_source(zmux::ErrorSource::Local)
3915        .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3916}
3917
3918fn translate_read_closed_stream(_: quinn::ClosedStream) -> zmux::Error {
3919    zmux::Error::read_closed()
3920        .with_source(zmux::ErrorSource::Local)
3921        .with_stream_context(zmux::ErrorOperation::Close, zmux::ErrorDirection::Read)
3922}
3923
3924fn translate_write_closed_stream(_: quinn::ClosedStream) -> zmux::Error {
3925    zmux::Error::write_closed()
3926        .with_source(zmux::ErrorSource::Local)
3927        .with_stream_context(zmux::ErrorOperation::Close, zmux::ErrorDirection::Write)
3928}
3929
3930fn translate_read_error(err: quinn::ReadError) -> zmux::Error {
3931    match err {
3932        quinn::ReadError::Reset(code) => zmux::Error::application(code.into_inner(), "")
3933            .with_source(zmux::ErrorSource::Remote)
3934            .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3935            .with_termination_kind(zmux::TerminationKind::Reset),
3936        quinn::ReadError::ConnectionLost(err) => translate_connection_error(err),
3937        quinn::ReadError::ClosedStream => zmux::Error::read_closed()
3938            .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read),
3939        quinn::ReadError::IllegalOrderedRead => {
3940            zmux::Error::local("zmux-quinn: illegal ordered read")
3941                .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3942        }
3943        quinn::ReadError::ZeroRttRejected => zmux::Error::local("zmux-quinn: 0-RTT rejected")
3944            .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read),
3945    }
3946}
3947
3948fn translate_write_error(err: quinn::WriteError) -> zmux::Error {
3949    match err {
3950        quinn::WriteError::Stopped(code) => zmux::Error::application(code.into_inner(), "")
3951            .with_source(zmux::ErrorSource::Remote)
3952            .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3953            .with_termination_kind(zmux::TerminationKind::Stopped),
3954        quinn::WriteError::ConnectionLost(err) => translate_connection_error(err),
3955        quinn::WriteError::ClosedStream => zmux::Error::write_closed()
3956            .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3957        quinn::WriteError::ZeroRttRejected => zmux::Error::local("zmux-quinn: 0-RTT rejected")
3958            .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3959    }
3960}
3961
3962fn translate_connection_error(err: quinn::ConnectionError) -> zmux::Error {
3963    match err {
3964        quinn::ConnectionError::ApplicationClosed(close) => {
3965            let reason = String::from_utf8_lossy(&close.reason).into_owned();
3966            zmux::Error::application(close.error_code.into_inner(), reason)
3967                .with_source(zmux::ErrorSource::Remote)
3968                .with_session_context(zmux::ErrorOperation::Close)
3969                .with_termination_kind(zmux::TerminationKind::SessionTermination)
3970        }
3971        quinn::ConnectionError::LocallyClosed => zmux::Error::session_closed()
3972            .with_source(zmux::ErrorSource::Local)
3973            .with_session_context(zmux::ErrorOperation::Close),
3974        quinn::ConnectionError::TimedOut => zmux::Error::new(
3975            zmux::ErrorCode::IdleTimeout,
3976            "zmux-quinn: QUIC connection idle timeout",
3977        )
3978        .with_source(zmux::ErrorSource::Transport)
3979        .with_session_context(zmux::ErrorOperation::Close)
3980        .with_termination_kind(zmux::TerminationKind::Timeout),
3981        other => zmux::Error::local(format!("zmux-quinn: {other}"))
3982            .with_source(zmux::ErrorSource::Transport)
3983            .with_session_context(zmux::ErrorOperation::Close),
3984    }
3985}
3986
3987fn translate_wait_error(err: quinn::ConnectionError) -> Result<()> {
3988    match err {
3989        quinn::ConnectionError::LocallyClosed => Ok(()),
3990        quinn::ConnectionError::ApplicationClosed(close)
3991            if close.error_code.into_inner() == 0 && close.reason.is_empty() =>
3992        {
3993            Ok(())
3994        }
3995        other => Err(translate_connection_error(other)),
3996    }
3997}
3998
// Unit tests for the adapter's pure helpers: prelude encoding/parsing, option
// normalization, saturating arithmetic, bounded reason statistics and stats
// snapshots. None of them opens a QUIC connection.
#[cfg(test)]
mod tests {
    use super::*;

    /// The adapter advertises exactly the stream-adapter claim and suite, and no
    /// implementation profiles.
    #[test]
    fn adapter_publishes_stream_adapter_claim_only() {
        assert_eq!(target_claims(), &[zmux::Claim::StreamAdapterProfileV1]);
        assert!(target_implementation_profiles().is_empty());
        assert_eq!(
            target_suites(),
            &[zmux::ConformanceSuite::StreamAdapterProfile]
        );
    }

    /// Priority, group and open-info survive a build/parse round trip of the prelude.
    #[test]
    fn prelude_round_trip_open_info_and_priority() {
        let opts = OpenOptions::new()
            .priority(7)
            .group(11)
            .open_info(&[1, 2, 3, 4]);
        let prelude = build_stream_prelude(&opts).unwrap();
        let parsed = read_stream_prelude(&mut prelude.as_slice()).unwrap();
        assert!(parsed.metadata_valid);
        assert!(parsed.is_metadata_valid());
        assert!(parsed.has_open_info());
        assert_eq!(parsed.open_info(), [1, 2, 3, 4]);
        assert_eq!(parsed.metadata.priority, Some(7));
        assert_eq!(parsed.metadata().priority, Some(7));
        assert_eq!(parsed.metadata.group, Some(11));
        assert_eq!(parsed.metadata.open_info, [1, 2, 3, 4]);
    }

    /// Default open options encode to a single zero byte that parses back to
    /// default metadata.
    #[test]
    fn empty_prelude_is_one_zero_byte() {
        let prelude = build_stream_prelude(&OpenOptions::default()).unwrap();
        assert_eq!(prelude, vec![0]);
        let parsed = read_stream_prelude(&mut prelude.as_slice()).unwrap();
        assert!(parsed.metadata_valid);
        assert_eq!(parsed.metadata, StreamMetadata::default());
    }

    /// A prelude repeating the priority TLV parses without error, but is flagged
    /// as invalid metadata and yields default metadata.
    #[test]
    fn duplicate_singleton_metadata_is_ignored() {
        let mut prelude = Vec::new();
        // Two priority TLVs (type, length 1, value) back to back.
        let metadata = [
            zmux::METADATA_STREAM_PRIORITY as u8,
            1,
            5,
            zmux::METADATA_STREAM_PRIORITY as u8,
            1,
            6,
        ];
        zmux::append_varint(&mut prelude, metadata.len() as u64).unwrap();
        prelude.extend_from_slice(&metadata);

        let parsed = read_stream_prelude(&mut prelude.as_slice()).unwrap();
        assert!(!parsed.metadata_valid);
        assert_eq!(parsed.metadata, StreamMetadata::default());
    }

    /// A TLV whose declared length (2) exceeds the bytes present (1) is rejected
    /// with a remote, stream-scoped protocol error covering both directions.
    #[test]
    fn malformed_metadata_is_rejected() {
        let mut prelude = Vec::new();
        let metadata = [zmux::METADATA_STREAM_PRIORITY as u8, 2, 5];
        zmux::append_varint(&mut prelude, metadata.len() as u64).unwrap();
        prelude.extend_from_slice(&metadata);

        let err = read_stream_prelude(&mut prelude.as_slice()).unwrap_err();
        assert!(err.is_error_code(zmux::ErrorCode::Protocol));
        assert_eq!(err.scope(), zmux::ErrorScope::Stream);
        assert_eq!(err.source(), zmux::ErrorSource::Remote);
        assert_eq!(err.direction(), zmux::ErrorDirection::Both);
    }

    /// Exercises normalization of the accepted-prelude concurrency limit and read
    /// timeout: zero falls back to the default, oversized values clamp to the
    /// maximum, and the last timeout setter applied wins.
    #[test]
    fn session_option_defaults_match_go_adapter_bounds() {
        // The default concurrency is process-global; remember it so it can be
        // restored after the assertions that change it.
        let previous_default = SessionOptions::default_accepted_prelude_max_concurrent();
        SessionOptions::set_default_accepted_prelude_max_concurrent(0);
        assert_eq!(
            normalize_accepted_prelude_max_concurrent(None),
            DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT
        );
        assert_eq!(
            SessionOptions::default_accepted_prelude_max_concurrent(),
            DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT
        );
        SessionOptions::set_default_accepted_prelude_max_concurrent(5);
        assert_eq!(SessionOptions::default_accepted_prelude_max_concurrent(), 5);
        assert_eq!(normalize_accepted_prelude_max_concurrent(None), 5);
        assert_eq!(normalize_accepted_prelude_max_concurrent(Some(0)), 5);
        SessionOptions::set_default_accepted_prelude_max_concurrent(
            MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT + 1,
        );
        assert_eq!(
            SessionOptions::default_accepted_prelude_max_concurrent(),
            MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT
        );
        SessionOptions::set_default_accepted_prelude_max_concurrent(previous_default);
        assert_eq!(
            normalize_accepted_prelude_read_timeout(SessionOptions::default()),
            Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT)
        );
        let custom = SessionOptions::new()
            .accepted_prelude_read_timeout(Duration::from_secs(2))
            .accepted_prelude_max_concurrent(16);
        assert_eq!(
            normalize_accepted_prelude_read_timeout(custom),
            Some(Duration::from_secs(2))
        );
        assert_eq!(
            normalize_accepted_prelude_max_concurrent(custom.accepted_prelude_max_concurrent),
            16
        );
        // Disabling after setting a timeout turns the timeout off...
        assert_eq!(
            normalize_accepted_prelude_read_timeout(
                SessionOptions::new()
                    .accepted_prelude_read_timeout(Duration::from_secs(2))
                    .disable_accepted_prelude_read_timeout()
            ),
            None
        );
        // ...and setting a timeout after disabling turns it back on.
        assert_eq!(
            normalize_accepted_prelude_read_timeout(
                SessionOptions::new()
                    .disable_accepted_prelude_read_timeout()
                    .accepted_prelude_read_timeout(Duration::from_secs(2))
            ),
            Some(Duration::from_secs(2))
        );
        // A zero timeout normalizes to the default rather than to "no timeout".
        assert_eq!(
            normalize_accepted_prelude_read_timeout(SessionOptions {
                accepted_prelude_read_timeout: AcceptedPreludeReadTimeout::Timeout(Duration::ZERO),
                ..SessionOptions::default()
            }),
            Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT)
        );
        assert_eq!(
            normalize_accepted_prelude_read_timeout(SessionOptions {
                accepted_prelude_read_timeout: AcceptedPreludeReadTimeout::Disabled,
                ..SessionOptions::default()
            }),
            None
        );
        assert_eq!(normalize_accepted_prelude_max_concurrent(Some(3)), 3);
        assert_eq!(
            normalize_accepted_prelude_max_concurrent(Some(
                MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT + 1
            )),
            MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT
        );
    }

    /// Empty slices contribute zero to the vectored total.
    #[test]
    fn total_bytes_counts_empty_vectored_parts() {
        let parts = [IoSlice::new(b"abc"), IoSlice::new(b""), IoSlice::new(b"de")];
        assert_eq!(total_bytes(parts.iter().map(|part| part.len())).unwrap(), 5);
    }

    /// A sum that overflows `usize` yields a local, stream-scoped write error.
    #[test]
    fn total_bytes_rejects_usize_overflow() {
        let err = total_bytes([usize::MAX, 1]).unwrap_err();
        assert_eq!(err.scope(), zmux::ErrorScope::Stream);
        assert_eq!(err.source(), zmux::ErrorSource::Local);
        assert_eq!(err.operation(), zmux::ErrorOperation::Write);
        assert_eq!(err.direction(), zmux::ErrorDirection::Write);
    }

    /// The saturating helpers clamp instead of wrapping at their upper bounds.
    #[test]
    fn support_saturating_helpers_clamp_at_integer_limits() {
        let counter = AtomicU64::new(u64::MAX);
        saturating_add_atomic_u64(&counter, 1);
        assert_eq!(counter.load(Ordering::Relaxed), u64::MAX);

        let counter = AtomicU64::new(u64::MAX - 1);
        saturating_add_atomic_u64(&counter, 2);
        assert_eq!(counter.load(Ordering::Relaxed), u64::MAX);

        let counter = AtomicUsize::new(usize::MAX);
        saturating_increment_atomic_usize(&counter);
        assert_eq!(counter.load(Ordering::Relaxed), usize::MAX);

        // The nanosecond conversion clamps to one below `u64::MAX`.
        assert_eq!(
            duration_nanos_saturating(Duration::from_secs(u64::MAX)),
            u64::MAX - 1
        );
    }

    /// Reason statistics track at most `MAX_REASON_STATS_CODES` distinct codes per
    /// category; extra distinct codes are counted as overflow, while already
    /// tracked codes keep incrementing.
    #[test]
    fn adapter_reason_stats_bound_distinct_codes_and_count_overflow() {
        let mut reasons = AdapterReasonStats::default();
        let overflow = 5;
        for i in 0..MAX_REASON_STATS_CODES + overflow {
            let code = u64::try_from(i).unwrap();
            reasons.note_reset(10_000 + code);
            reasons.note_abort(20_000 + code);
        }
        // Second hit on the first code of each category.
        reasons.note_reset(10_000);
        reasons.note_abort(20_000);

        let snapshot = reasons.snapshot();
        assert_eq!(snapshot.reset.len(), MAX_REASON_STATS_CODES);
        assert_eq!(snapshot.abort.len(), MAX_REASON_STATS_CODES);
        assert_eq!(snapshot.reset_overflow, u64::try_from(overflow).unwrap());
        assert_eq!(snapshot.abort_overflow, u64::try_from(overflow).unwrap());
        assert_eq!(snapshot.reset.get(&10_000), Some(&2));
        assert_eq!(snapshot.abort.get(&20_000), Some(&2));
    }

    /// Only a lone non-empty slice is returned; two non-empty slices or none at
    /// all give `None`.
    #[test]
    fn single_non_empty_io_slice_skips_empty_parts() {
        let parts = [IoSlice::new(b""), IoSlice::new(b"abc"), IoSlice::new(b"")];
        assert_eq!(single_non_empty_io_slice(&parts), Some(&b"abc"[..]));

        let parts = [IoSlice::new(b"abc"), IoSlice::new(b"de")];
        assert_eq!(single_non_empty_io_slice(&parts), None);

        let parts = [IoSlice::new(b""), IoSlice::new(b"")];
        assert_eq!(single_non_empty_io_slice(&parts), None);
    }

    /// Metadata length plus prefix length must stay within the prelude payload
    /// cap: exactly at the cap is accepted, one byte over is a protocol error.
    #[test]
    fn prelude_metadata_len_rejects_cap_overflow() {
        let err = checked_prelude_metadata_len(STREAM_PRELUDE_MAX_PAYLOAD, 1).unwrap_err();
        assert!(err.is_error_code(zmux::ErrorCode::Protocol));
        assert_eq!(
            checked_prelude_metadata_len(STREAM_PRELUDE_MAX_PAYLOAD - 1, 1).unwrap(),
            usize::try_from(STREAM_PRELUDE_MAX_PAYLOAD - 1).unwrap()
        );
    }

    /// Finishing a guard releases its active count, and dropping it afterwards
    /// does not release it a second time.
    #[test]
    fn active_guard_finishes_once_before_drop() {
        let counters = Arc::new(ActiveCounters::default());
        let guard = ActiveGuard::new(counters.clone(), ActiveKind::LocalBidi);
        assert_eq!(counters.snapshot().total, 1);
        guard.finish();
        assert_eq!(counters.snapshot().total, 0);
        drop(guard);
        assert_eq!(counters.snapshot().total, 0);
    }

    /// A stats snapshot reflects every recorded event and leaves the ping/pong
    /// progress timestamps unset.
    #[test]
    fn adapter_stats_snapshot_reports_progress_without_ping_fields() {
        let stats = AdapterStats::new();
        let now = Instant::now();
        stats.note_accepted_stream();
        stats.note_hidden_refused();
        stats.note_reset_reason(7);
        stats.note_abort_reason(9);
        stats.note_data_write(7, now);
        stats.note_data_read(3, now);
        stats.note_flush(7, now);
        stats.note_control_progress_at(now);
        stats.note_open_latency(now, now + Duration::from_millis(2));
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.accepted_streams, 1);
        assert_eq!(snapshot.hidden_refused, 1);
        assert_eq!(snapshot.reasons.reset.get(&7), Some(&1));
        assert_eq!(snapshot.reasons.abort.get(&9), Some(&1));
        assert_eq!(snapshot.sent_data_bytes, 7);
        assert_eq!(snapshot.received_data_bytes, 3);
        assert_eq!(snapshot.flush.count, 1);
        assert_eq!(snapshot.flush.last_bytes, 7);
        assert_eq!(
            snapshot.telemetry.last_open_latency,
            Some(Duration::from_millis(2))
        );
        assert!(snapshot.progress.control_progress_at.is_some());
        assert!(snapshot.progress.transport_write_at.is_some());
        assert!(snapshot.progress.stream_progress_at.is_some());
        assert!(snapshot.progress.application_progress_at.is_some());
        assert!(snapshot.progress.inbound_frame_at.is_some());
        assert!(snapshot.progress.ping_sent_at.is_none());
        assert!(snapshot.progress.pong_at.is_none());
    }
}