1#![forbid(unsafe_code)]
15
16use std::collections::HashMap;
17use std::future::Future;
18use std::io::{ErrorKind, IoSlice, IoSliceMut, Read};
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, Mutex};
22use std::time::{Duration, Instant};
23
24use bytes::Bytes;
25use tokio::sync::{mpsc, watch, Mutex as AsyncMutex, Notify, Semaphore};
26use zmux::{
27 build_open_metadata_prefix, parse_stream_metadata_bytes_view, read_varint, AsyncBoxFuture,
28 AsyncDuplexStreamHandle, AsyncRecvStreamHandle, AsyncSendStreamHandle, AsyncSession,
29 AsyncStreamHandle, OpenOptions, OpenRequest, OpenSend, Result, StreamMetadata, WritePayload,
30 CAPABILITY_OPEN_METADATA, CAPABILITY_PRIORITY_HINTS, CAPABILITY_STREAM_GROUPS,
31};
32
33const STREAM_PRELUDE_MAX_PAYLOAD: u64 = 16 << 10;
34const QUINN_WRITE_VECTORED_COALESCE_MAX_BYTES: usize = 64 << 10;
35const OPEN_METADATA_CAPABILITIES: u64 =
36 CAPABILITY_OPEN_METADATA | CAPABILITY_PRIORITY_HINTS | CAPABILITY_STREAM_GROUPS;
37pub const DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT: Duration = Duration::from_secs(5);
38pub const DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT: usize = 8;
39pub const MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT: usize = 1024;
40const ACCEPTED_PRELUDE_RESULT_QUEUE_CAP: usize = 32;
41const MAX_REASON_STATS_CODES: usize = 1024;
42
43static DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT_VALUE: AtomicUsize =
44 AtomicUsize::new(DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT);
45
46#[derive(Debug, Clone, Copy, Default)]
47struct AdapterAddresses {
48 local_addr: Option<SocketAddr>,
49 peer_addr: Option<SocketAddr>,
50}
51
52pub fn target_claims() -> &'static [zmux::Claim] {
53 &[zmux::Claim::StreamAdapterProfileV1]
54}
55
56pub fn target_implementation_profiles() -> &'static [zmux::ImplementationProfile] {
57 &[]
58}
59
60pub fn target_suites() -> &'static [zmux::ConformanceSuite] {
61 &[zmux::ConformanceSuite::StreamAdapterProfile]
62}
63
64fn default_accepted_prelude_max_concurrent() -> usize {
65 let current = DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT_VALUE.load(Ordering::Acquire);
66 if current > 0 {
67 current.min(MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT)
68 } else {
69 1
70 }
71}
72
73fn set_default_accepted_prelude_max_concurrent(max: usize) {
74 let max = if max == 0 {
75 DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT
76 } else {
77 max.min(MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT)
78 };
79 DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT_VALUE.store(max, Ordering::Release);
80}
81
82#[derive(Debug, Clone, Default, PartialEq, Eq)]
83pub struct AcceptedStreamMetadata {
84 pub metadata: StreamMetadata,
85 pub metadata_valid: bool,
86}
87
88impl AcceptedStreamMetadata {
89 pub fn metadata(&self) -> &StreamMetadata {
90 &self.metadata
91 }
92
93 pub fn is_metadata_valid(&self) -> bool {
94 self.metadata_valid
95 }
96
97 pub fn open_info(&self) -> &[u8] {
98 self.metadata.open_info()
99 }
100
101 pub fn has_open_info(&self) -> bool {
102 self.metadata.has_open_info()
103 }
104}
105
106#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
107pub enum AcceptedPreludeReadTimeout {
108 #[default]
109 Default,
110 Disabled,
111 Timeout(Duration),
112}
113
114impl AcceptedPreludeReadTimeout {
115 fn normalize(self) -> Option<Duration> {
116 match self {
117 Self::Default => Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT),
118 Self::Disabled => None,
119 Self::Timeout(timeout) if timeout.is_zero() => {
120 Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT)
121 }
122 Self::Timeout(timeout) => Some(timeout),
123 }
124 }
125}
126
127#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
128pub struct SessionOptions {
129 pub accepted_prelude_read_timeout: AcceptedPreludeReadTimeout,
130 pub accepted_prelude_max_concurrent: Option<usize>,
131 pub local_addr: Option<SocketAddr>,
132 pub peer_addr: Option<SocketAddr>,
133}
134
135impl SessionOptions {
136 pub fn default_accepted_prelude_max_concurrent() -> usize {
141 default_accepted_prelude_max_concurrent()
142 }
143
144 pub fn set_default_accepted_prelude_max_concurrent(max: usize) {
150 set_default_accepted_prelude_max_concurrent(max);
151 }
152
153 #[must_use]
154 pub fn new() -> Self {
155 Self::default()
156 }
157
158 #[must_use]
159 pub fn accepted_prelude_read_timeout(mut self, timeout: Duration) -> Self {
160 self.accepted_prelude_read_timeout = AcceptedPreludeReadTimeout::Timeout(timeout);
161 self
162 }
163
164 #[must_use]
165 pub fn disable_accepted_prelude_read_timeout(mut self) -> Self {
166 self.accepted_prelude_read_timeout = AcceptedPreludeReadTimeout::Disabled;
167 self
168 }
169
170 #[must_use]
174 pub fn accepted_prelude_max_concurrent(mut self, max: usize) -> Self {
175 self.accepted_prelude_max_concurrent = Some(max);
176 self
177 }
178
179 #[must_use]
180 pub fn local_addr(mut self, addr: SocketAddr) -> Self {
181 self.local_addr = Some(addr);
182 self
183 }
184
185 #[must_use]
186 pub fn peer_addr(mut self, addr: SocketAddr) -> Self {
187 self.peer_addr = Some(addr);
188 self
189 }
190
191 #[must_use]
192 pub fn addresses(
193 mut self,
194 local_addr: Option<SocketAddr>,
195 peer_addr: Option<SocketAddr>,
196 ) -> Self {
197 self.local_addr = local_addr;
198 self.peer_addr = peer_addr;
199 self
200 }
201}
202
203pub fn build_stream_prelude(opts: &OpenOptions) -> Result<Vec<u8>> {
204 let prelude = build_open_metadata_prefix(
205 OPEN_METADATA_CAPABILITIES,
206 opts.initial_priority(),
207 opts.initial_group(),
208 opts.open_info_bytes(),
209 STREAM_PRELUDE_MAX_PAYLOAD,
210 )?;
211 if prelude.is_empty() {
212 Ok(vec![0])
213 } else {
214 Ok(prelude)
215 }
216}
217
218pub fn read_stream_prelude<R: Read>(reader: &mut R) -> Result<AcceptedStreamMetadata> {
219 let (metadata_len, prefix_len) =
220 read_varint(reader).map_err(|_| protocol_prelude_error("parse stream prelude length"))?;
221 if metadata_len == 0 {
222 return Ok(AcceptedStreamMetadata {
223 metadata: StreamMetadata::default(),
224 metadata_valid: true,
225 });
226 }
227 let metadata_len = checked_prelude_metadata_len(metadata_len, prefix_len)?;
228 let mut metadata_raw = vec![0u8; metadata_len];
229 reader.read_exact(&mut metadata_raw).map_err(|err| {
230 if err.kind() == ErrorKind::UnexpectedEof {
231 protocol_prelude_error("unexpected EOF in stream prelude")
232 } else {
233 err.into()
234 }
235 })?;
236 let (metadata, metadata_valid) = parse_stream_metadata_bytes_view(&metadata_raw)
237 .map_err(|_| protocol_prelude_error("malformed stream prelude metadata"))?;
238 Ok(AcceptedStreamMetadata {
239 metadata: metadata.try_to_owned()?,
240 metadata_valid,
241 })
242}
243
244fn normalize_accepted_prelude_read_timeout(opts: SessionOptions) -> Option<Duration> {
245 opts.accepted_prelude_read_timeout.normalize()
246}
247
248fn normalize_accepted_prelude_max_concurrent(max: Option<usize>) -> usize {
249 match max {
250 Some(max) if max > 0 => max.min(MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT),
251 _ => default_accepted_prelude_max_concurrent(),
252 }
253}
254
255#[derive(Clone)]
256pub struct QuinnSession {
257 conn: quinn::Connection,
258 accepted_prelude_read_timeout: Option<Duration>,
259 local_addr: Option<SocketAddr>,
260 peer_addr: Option<SocketAddr>,
261 prepare_sem: Arc<Semaphore>,
262 active: Arc<ActiveCounters>,
263 internal: Arc<InternalTasks>,
264 stats: Arc<AdapterStats>,
265 accept_shutdown: watch::Sender<()>,
266 bidi_accept: Arc<AsyncMutex<Option<mpsc::Receiver<Result<QuinnStream>>>>>,
267 uni_accept: Arc<AsyncMutex<Option<mpsc::Receiver<Result<QuinnRecvStream>>>>>,
268}
269
270impl QuinnSession {
271 #[must_use]
272 pub fn new(conn: quinn::Connection) -> Self {
273 Self::with_options(conn, SessionOptions::default())
274 }
275
276 #[must_use]
277 pub fn with_options(conn: quinn::Connection, opts: SessionOptions) -> Self {
278 let (accept_shutdown, _) = watch::channel(());
279 let local_addr = opts.local_addr;
280 let peer_addr = opts.peer_addr.or_else(|| Some(conn.remote_address()));
281 Self {
282 conn,
283 accepted_prelude_read_timeout: normalize_accepted_prelude_read_timeout(opts),
284 local_addr,
285 peer_addr,
286 prepare_sem: Arc::new(Semaphore::new(normalize_accepted_prelude_max_concurrent(
287 opts.accepted_prelude_max_concurrent,
288 ))),
289 active: Arc::new(ActiveCounters::default()),
290 internal: Arc::new(InternalTasks::default()),
291 stats: Arc::new(AdapterStats::new()),
292 accept_shutdown,
293 bidi_accept: Arc::new(AsyncMutex::new(None)),
294 uni_accept: Arc::new(AsyncMutex::new(None)),
295 }
296 }
297
298 pub fn local_addr(&self) -> Option<SocketAddr> {
299 self.local_addr
300 }
301
302 pub fn peer_addr(&self) -> Option<SocketAddr> {
303 self.peer_addr
304 }
305
306 pub async fn accept_stream(&self) -> Result<QuinnStream> {
307 let mut receiver = self.bidi_accept.lock().await;
308 if receiver.is_none() {
309 let (tx, rx) = mpsc::channel(ACCEPTED_PRELUDE_RESULT_QUEUE_CAP);
310 *receiver = Some(rx);
311 self.spawn_bidi_accept_loop(tx);
312 }
313 let receiver = receiver.as_mut().expect("bidi accept receiver initialized");
314 loop {
315 match receiver.recv().await {
316 Some(Ok(stream)) => {
317 self.stats.note_accepted_stream();
318 return Ok(stream);
319 }
320 Some(Err(err)) if accepted_prelude_rejectable(&err) => continue,
321 Some(Err(err)) => return Err(err),
322 None => return Err(zmux::Error::session_closed()),
323 }
324 }
325 }
326
327 pub async fn accept_stream_timeout(&self, timeout: Duration) -> Result<QuinnStream> {
328 with_timeout(self.accept_stream(), timeout, "accept").await
329 }
330
331 pub async fn accept_uni_stream(&self) -> Result<QuinnRecvStream> {
332 let mut receiver = self.uni_accept.lock().await;
333 if receiver.is_none() {
334 let (tx, rx) = mpsc::channel(ACCEPTED_PRELUDE_RESULT_QUEUE_CAP);
335 *receiver = Some(rx);
336 self.spawn_uni_accept_loop(tx);
337 }
338 let receiver = receiver.as_mut().expect("uni accept receiver initialized");
339 loop {
340 match receiver.recv().await {
341 Some(Ok(stream)) => {
342 self.stats.note_accepted_stream();
343 return Ok(stream);
344 }
345 Some(Err(err)) if accepted_prelude_rejectable(&err) => continue,
346 Some(Err(err)) => return Err(err),
347 None => return Err(zmux::Error::session_closed()),
348 }
349 }
350 }
351
352 pub async fn accept_uni_stream_timeout(&self, timeout: Duration) -> Result<QuinnRecvStream> {
353 with_timeout(self.accept_uni_stream(), timeout, "accept").await
354 }
355
356 fn spawn_bidi_accept_loop(&self, tx: mpsc::Sender<Result<QuinnStream>>) {
357 let conn = self.conn.clone();
358 let timeout = self.accepted_prelude_read_timeout;
359 let addresses = AdapterAddresses {
360 local_addr: self.local_addr,
361 peer_addr: self.peer_addr,
362 };
363 let sem = self.prepare_sem.clone();
364 let active = self.active.clone();
365 let stats = self.stats.clone();
366 let internal = self.internal.clone();
367 let mut shutdown = self.accept_shutdown.subscribe();
368 internal.clone().spawn(async move {
369 loop {
370 let (send, recv) = match accept_bi_or_shutdown(&conn, &mut shutdown).await {
371 Some(Ok(streams)) => streams,
372 None => return,
373 Some(Err(err)) => {
374 publish_bidi_accept_result(
375 &tx,
376 Err(translate_connection_error(err)),
377 &conn,
378 )
379 .await;
380 return;
381 }
382 };
383 let permit = match acquire_prepare_permit_or_shutdown(&sem, &mut shutdown).await {
384 Some(Ok(permit)) => permit,
385 None => return,
386 Some(Err(_)) => {
387 let _ = tx.send(Err(zmux::Error::session_closed())).await;
388 return;
389 }
390 };
391 let tx = tx.clone();
392 let active = active.clone();
393 let stats = stats.clone();
394 let conn = conn.clone();
395 let internal = internal.clone();
396 let mut worker_shutdown = shutdown.clone();
397 internal.spawn(async move {
398 let _permit = permit;
399 if let Some(result) = prepare_accepted_bidi_stream_or_shutdown(
400 send,
401 recv,
402 timeout,
403 addresses,
404 active,
405 stats,
406 &mut worker_shutdown,
407 )
408 .await
409 {
410 publish_bidi_accept_result(&tx, result, &conn).await;
411 }
412 });
413 }
414 });
415 }
416
417 fn spawn_uni_accept_loop(&self, tx: mpsc::Sender<Result<QuinnRecvStream>>) {
418 let conn = self.conn.clone();
419 let timeout = self.accepted_prelude_read_timeout;
420 let addresses = AdapterAddresses {
421 local_addr: self.local_addr,
422 peer_addr: self.peer_addr,
423 };
424 let sem = self.prepare_sem.clone();
425 let active = self.active.clone();
426 let stats = self.stats.clone();
427 let internal = self.internal.clone();
428 let mut shutdown = self.accept_shutdown.subscribe();
429 internal.clone().spawn(async move {
430 loop {
431 let recv = match accept_uni_or_shutdown(&conn, &mut shutdown).await {
432 Some(Ok(recv)) => recv,
433 None => return,
434 Some(Err(err)) => {
435 publish_uni_accept_result(&tx, Err(translate_connection_error(err)), &conn)
436 .await;
437 return;
438 }
439 };
440 let permit = match acquire_prepare_permit_or_shutdown(&sem, &mut shutdown).await {
441 Some(Ok(permit)) => permit,
442 None => return,
443 Some(Err(_)) => {
444 let _ = tx.send(Err(zmux::Error::session_closed())).await;
445 return;
446 }
447 };
448 let tx = tx.clone();
449 let active = active.clone();
450 let stats = stats.clone();
451 let conn = conn.clone();
452 let internal = internal.clone();
453 let mut worker_shutdown = shutdown.clone();
454 internal.spawn(async move {
455 let _permit = permit;
456 if let Some(result) = prepare_accepted_uni_stream_or_shutdown(
457 recv,
458 timeout,
459 addresses,
460 active,
461 stats,
462 &mut worker_shutdown,
463 )
464 .await
465 {
466 publish_uni_accept_result(&tx, result, &conn).await;
467 }
468 });
469 }
470 });
471 }
472
473 pub async fn open_stream(&self) -> Result<QuinnStream> {
474 self.open_stream_with(OpenRequest::new()).await
475 }
476
477 pub async fn open_stream_with(&self, request: impl Into<OpenRequest>) -> Result<QuinnStream> {
478 let (opts, timeout) = request.into().into_parts();
479 let open = self.open_stream_inner(opts);
480 match timeout {
481 Some(timeout) => with_timeout(open, timeout, "open").await,
482 None => open.await,
483 }
484 }
485
486 async fn open_stream_inner(&self, opts: OpenOptions) -> Result<QuinnStream> {
487 let prelude = PreludeState::local(opts)?;
488 let started_at = Instant::now();
489 let (send, recv) = self
490 .conn
491 .open_bi()
492 .await
493 .map_err(translate_connection_error)?;
494 let stream = QuinnStream::local(
495 send,
496 recv,
497 prelude,
498 self.stats.clone(),
499 self.local_addr,
500 self.peer_addr,
501 );
502 if let Err(err) = stream.maybe_send_open_prelude_on_open().await {
503 stream.discard_after_open_error(&err).await;
504 return Err(err);
505 }
506 self.stats.note_open_latency(started_at, Instant::now());
507 Ok(stream.with_active(self.active.clone(), ActiveKind::LocalBidi))
508 }
509
510 pub async fn open_uni_stream(&self) -> Result<QuinnSendStream> {
511 self.open_uni_stream_with(OpenRequest::new()).await
512 }
513
514 pub async fn open_uni_stream_with(
515 &self,
516 request: impl Into<OpenRequest>,
517 ) -> Result<QuinnSendStream> {
518 let (opts, timeout) = request.into().into_parts();
519 let open = self.open_uni_stream_inner(opts);
520 match timeout {
521 Some(timeout) => with_timeout(open, timeout, "open").await,
522 None => open.await,
523 }
524 }
525
526 async fn open_uni_stream_inner(&self, opts: OpenOptions) -> Result<QuinnSendStream> {
527 let prelude = PreludeState::local(opts)?;
528 let started_at = Instant::now();
529 let send = self
530 .conn
531 .open_uni()
532 .await
533 .map_err(translate_connection_error)?;
534 let stream = QuinnSendStream::local(
535 send,
536 prelude,
537 self.stats.clone(),
538 self.local_addr,
539 self.peer_addr,
540 );
541 if let Err(err) = stream.maybe_send_open_prelude_on_open().await {
542 stream.discard_after_open_error(&err).await;
543 return Err(err);
544 }
545 self.stats.note_open_latency(started_at, Instant::now());
546 Ok(stream.with_active(self.active.clone(), ActiveKind::LocalUni))
547 }
548
549 pub async fn open_and_send<'a>(&self, request: impl Into<OpenSend<'a>>) -> Result<QuinnStream> {
550 let (opts, payload, timeout) = request.into().into_parts();
551 let requested = payload.checked_len()?;
552 let start = Instant::now();
553 let mut open = OpenRequest::new().options(opts);
554 if let Some(timeout) = timeout {
555 ensure_positive_session_timeout(timeout, "open", zmux::ErrorOperation::Open)?;
556 open = open.timeout(timeout);
557 }
558 let stream = self
559 .open_stream_with(open)
560 .await
561 .map_err(|err| err.with_session_context(zmux::ErrorOperation::Open))?;
562 if requested == 0 {
563 return Ok(stream);
564 }
565 let write_result: Result<()> = async {
566 let timeout = timeout
567 .map(|timeout| remaining_write_timeout(start, timeout))
568 .transpose()?;
569 match timeout {
570 Some(timeout) => {
571 stream
572 .write_all_timeout(payload, timeout)
573 .await
574 .map_err(|err| {
575 err.with_stream_context(
576 zmux::ErrorOperation::Write,
577 zmux::ErrorDirection::Write,
578 )
579 })?;
580 }
581 None => {
582 stream.write_all(payload).await?;
583 }
584 }
585 Ok(())
586 }
587 .await;
588 if let Err(err) = write_result {
589 stream.discard_after_open_error(&err).await;
590 return Err(err);
591 }
592 Ok(stream)
593 }
594
595 pub async fn open_uni_and_send<'a>(
596 &self,
597 request: impl Into<OpenSend<'a>>,
598 ) -> Result<QuinnSendStream> {
599 let (opts, payload, timeout) = request.into().into_parts();
600 let requested = payload.checked_len()?;
601 let start = Instant::now();
602 let mut open = OpenRequest::new().options(opts);
603 if let Some(timeout) = timeout {
604 ensure_positive_session_timeout(timeout, "open", zmux::ErrorOperation::Open)?;
605 open = open.timeout(timeout);
606 }
607 let stream = self
608 .open_uni_stream_with(open)
609 .await
610 .map_err(|err| err.with_session_context(zmux::ErrorOperation::Open))?;
611 let write_result: Result<()> = async {
612 let timeout = timeout
613 .map(|timeout| remaining_write_timeout(start, timeout))
614 .transpose()?;
615 let n = match (payload, timeout) {
616 (WritePayload::Bytes(data), Some(timeout)) => stream
617 .write_final_timeout(WritePayload::Bytes(data), timeout)
618 .await
619 .map_err(|err| {
620 err.with_stream_context(
621 zmux::ErrorOperation::Write,
622 zmux::ErrorDirection::Write,
623 )
624 })?,
625 (WritePayload::Bytes(data), None) => {
626 stream.write_final(WritePayload::Bytes(data)).await?
627 }
628 (WritePayload::Vectored(parts), Some(timeout)) => stream
629 .write_vectored_final_timeout(parts, timeout)
630 .await
631 .map_err(|err| {
632 err.with_stream_context(
633 zmux::ErrorOperation::Write,
634 zmux::ErrorDirection::Write,
635 )
636 })?,
637 (WritePayload::Vectored(parts), None) => stream.write_vectored_final(parts).await?,
638 };
639 validate_progress(n, requested)?;
640 Ok(())
641 }
642 .await;
643 if let Err(err) = write_result {
644 stream.discard_after_open_error(&err).await;
645 return Err(err);
646 }
647 Ok(stream)
648 }
649
650 fn close_now(&self, code: quinn::VarInt, reason: &[u8]) {
651 let _ = self.accept_shutdown.send(());
652 self.conn.close(code, reason);
653 }
654
655 pub async fn close(&self) -> Result<()> {
656 self.close_now(quinn_varint(0), &[]);
657 self.wait().await
658 }
659
660 pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
661 let code = checked_session_quinn_varint(code, zmux::ErrorOperation::Close)?;
662 self.close_now(code, reason.as_bytes());
663 Ok(())
664 }
665
666 pub async fn wait(&self) -> Result<()> {
667 let result = translate_wait_error(self.conn.closed().await);
668 self.internal.wait_idle().await;
669 result
670 }
671
672 pub async fn wait_timeout(&self, timeout: Duration) -> Result<bool> {
673 if self.is_closed() {
674 return Ok(true);
675 }
676 if timeout.is_zero() {
677 return Ok(false);
678 }
679 match tokio::time::timeout(timeout, self.wait()).await {
680 Ok(result) => result.map(|_| true),
681 Err(_) => Ok(false),
682 }
683 }
684
685 pub fn is_closed(&self) -> bool {
686 self.conn.close_reason().is_some()
687 }
688
689 pub fn close_error(&self) -> Option<zmux::Error> {
690 self.conn
691 .close_reason()
692 .and_then(|err| translate_wait_error(err).err())
693 }
694
695 pub fn state(&self) -> zmux::SessionState {
696 if self.is_closed() {
697 zmux::SessionState::Closed
698 } else {
699 zmux::SessionState::Ready
700 }
701 }
702
703 pub fn stats(&self) -> zmux::SessionStats {
704 let active_streams = self.active.snapshot();
705 let adapter_stats = self.stats.snapshot();
706 zmux::SessionStats {
707 state: self.state(),
708 sent_frames: 0,
709 received_frames: 0,
710 sent_data_bytes: adapter_stats.sent_data_bytes,
711 received_data_bytes: adapter_stats.received_data_bytes,
712 open_streams: usize::try_from(active_streams.total).unwrap_or(usize::MAX),
713 accepted_streams: adapter_stats.accepted_streams,
714 active_streams,
715 provisional: Default::default(),
716 accept_backlog: Default::default(),
717 retention: Default::default(),
718 memory: Default::default(),
719 abuse: Default::default(),
720 hidden: zmux::HiddenStateStats {
721 refused: adapter_stats.hidden_refused,
722 ..Default::default()
723 },
724 reasons: adapter_stats.reasons,
725 diagnostics: Default::default(),
726 pressure: Default::default(),
727 flush: adapter_stats.flush,
728 telemetry: adapter_stats.telemetry,
729 progress: adapter_stats.progress,
730 blocked_write_total: adapter_stats.blocked_write_total,
731 writer_queue: Default::default(),
732 liveness: Default::default(),
733 }
734 }
735}
736
737async fn with_timeout<T>(
738 fut: impl Future<Output = Result<T>>,
739 timeout: Duration,
740 operation: &'static str,
741) -> Result<T> {
742 if timeout.is_zero() {
743 return Err(zmux::Error::timeout(operation));
744 }
745 tokio::time::timeout(timeout, fut)
746 .await
747 .map_err(|_| zmux::Error::timeout(operation))?
748}
749
750async fn with_optional_timeout<T>(
751 fut: impl Future<Output = Result<T>>,
752 timeout: Option<Duration>,
753 operation: &'static str,
754) -> Result<T> {
755 match timeout {
756 Some(timeout) => with_timeout(fut, timeout, operation).await,
757 None => fut.await,
758 }
759}
760
761fn ensure_positive_session_timeout(
762 timeout: Duration,
763 operation_name: &'static str,
764 operation: zmux::ErrorOperation,
765) -> Result<()> {
766 if timeout.is_zero() {
767 Err(zmux::Error::timeout(operation_name).with_session_context(operation))
768 } else {
769 Ok(())
770 }
771}
772
773fn remaining_timeout(start: Instant, timeout: Duration) -> Option<Duration> {
774 timeout
775 .checked_sub(start.elapsed())
776 .filter(|duration| !duration.is_zero())
777}
778
779fn timeout_to_deadline(timeout: Option<Duration>) -> Option<Instant> {
780 timeout.and_then(|timeout| Instant::now().checked_add(timeout))
781}
782
783fn remaining_write_timeout(start: Instant, timeout: Duration) -> Result<Duration> {
784 remaining_timeout(start, timeout).ok_or_else(|| {
785 zmux::Error::timeout("write")
786 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
787 })
788}
789
790fn timeout_until(deadline: Option<Instant>, operation: &'static str) -> Result<Option<Duration>> {
791 match deadline {
792 Some(deadline) => deadline
793 .checked_duration_since(Instant::now())
794 .filter(|timeout| !timeout.is_zero())
795 .map(Some)
796 .ok_or_else(|| zmux::Error::timeout(operation)),
797 None => Ok(None),
798 }
799}
800
801#[derive(Debug)]
802struct AdapterStats {
803 origin: Instant,
804 sent_data_bytes: AtomicU64,
805 received_data_bytes: AtomicU64,
806 accepted_streams: AtomicU64,
807 hidden_refused: AtomicU64,
808 flush_count: AtomicU64,
809 blocked_write_nanos: AtomicU64,
810 last_inbound_frame_at: AtomicU64,
811 last_control_progress_at: AtomicU64,
812 last_transport_write_at: AtomicU64,
813 last_stream_progress_at: AtomicU64,
814 last_application_progress_at: AtomicU64,
815 last_flush_at: AtomicU64,
816 last_flush_bytes: AtomicUsize,
817 last_open_latency_nanos: AtomicU64,
818 reasons: Mutex<AdapterReasonStats>,
819}
820
821#[derive(Debug, Clone)]
822struct AdapterStatsSnapshot {
823 sent_data_bytes: u64,
824 received_data_bytes: u64,
825 accepted_streams: u64,
826 hidden_refused: u64,
827 flush: zmux::FlushStats,
828 telemetry: zmux::TelemetryStats,
829 progress: zmux::ProgressStats,
830 blocked_write_total: Duration,
831 reasons: zmux::ReasonStats,
832}
833
834#[derive(Debug, Default)]
835struct InternalTasks {
836 active: AtomicUsize,
837 idle: Notify,
838}
839
840impl InternalTasks {
841 fn spawn<F>(self: Arc<Self>, fut: F)
842 where
843 F: Future<Output = ()> + Send + 'static,
844 {
845 self.active.fetch_add(1, Ordering::AcqRel);
846 tokio::spawn(async move {
847 fut.await;
848 if self.active.fetch_sub(1, Ordering::AcqRel) == 1 {
849 self.idle.notify_waiters();
850 }
851 });
852 }
853
854 async fn wait_idle(&self) {
855 loop {
856 let notified = self.idle.notified();
857 if self.active.load(Ordering::Acquire) == 0 {
858 return;
859 }
860 notified.await;
861 }
862 }
863}
864
865#[derive(Debug, Default)]
866struct AdapterReasonStats {
867 reset: HashMap<u64, u64>,
868 reset_overflow: u64,
869 abort: HashMap<u64, u64>,
870 abort_overflow: u64,
871}
872
873impl AdapterStats {
874 const NO_DURATION: u64 = u64::MAX;
875
876 fn new() -> Self {
877 Self {
878 origin: Instant::now(),
879 sent_data_bytes: AtomicU64::new(0),
880 received_data_bytes: AtomicU64::new(0),
881 accepted_streams: AtomicU64::new(0),
882 hidden_refused: AtomicU64::new(0),
883 flush_count: AtomicU64::new(0),
884 blocked_write_nanos: AtomicU64::new(0),
885 last_inbound_frame_at: AtomicU64::new(0),
886 last_control_progress_at: AtomicU64::new(0),
887 last_transport_write_at: AtomicU64::new(0),
888 last_stream_progress_at: AtomicU64::new(0),
889 last_application_progress_at: AtomicU64::new(0),
890 last_flush_at: AtomicU64::new(0),
891 last_flush_bytes: AtomicUsize::new(0),
892 last_open_latency_nanos: AtomicU64::new(Self::NO_DURATION),
893 reasons: Mutex::new(AdapterReasonStats::default()),
894 }
895 }
896
897 fn snapshot(&self) -> AdapterStatsSnapshot {
898 let last_open_latency = match self.last_open_latency_nanos.load(Ordering::Relaxed) {
899 Self::NO_DURATION => None,
900 nanos => Some(Duration::from_nanos(nanos)),
901 };
902 let flush_count = self.flush_count.load(Ordering::Relaxed);
903 AdapterStatsSnapshot {
904 sent_data_bytes: self.sent_data_bytes.load(Ordering::Relaxed),
905 received_data_bytes: self.received_data_bytes.load(Ordering::Relaxed),
906 accepted_streams: self.accepted_streams.load(Ordering::Relaxed),
907 hidden_refused: self.hidden_refused.load(Ordering::Relaxed),
908 flush: zmux::FlushStats {
909 count: flush_count,
910 last_at: self.instant_for_event(self.last_flush_at.load(Ordering::Relaxed)),
911 last_frames: u64::from(flush_count != 0),
912 last_bytes: self.last_flush_bytes.load(Ordering::Relaxed),
913 },
914 telemetry: zmux::TelemetryStats {
915 last_open_latency,
916 send_rate_estimate_bytes_per_second: 0,
917 },
918 progress: zmux::ProgressStats {
919 inbound_frame_at: self
920 .instant_for_event(self.last_inbound_frame_at.load(Ordering::Relaxed)),
921 control_progress_at: self
922 .instant_for_event(self.last_control_progress_at.load(Ordering::Relaxed)),
923 transport_write_at: self
924 .instant_for_event(self.last_transport_write_at.load(Ordering::Relaxed)),
925 stream_progress_at: self
926 .instant_for_event(self.last_stream_progress_at.load(Ordering::Relaxed)),
927 application_progress_at: self
928 .instant_for_event(self.last_application_progress_at.load(Ordering::Relaxed)),
929 ping_sent_at: None,
930 pong_at: None,
931 },
932 blocked_write_total: Duration::from_nanos(
933 self.blocked_write_nanos.load(Ordering::Relaxed),
934 ),
935 reasons: self.reasons.lock().unwrap().snapshot(),
936 }
937 }
938
939 fn note_accepted_stream(&self) {
940 saturating_add_atomic_u64(&self.accepted_streams, 1);
941 }
942
943 fn note_hidden_refused(&self) {
944 saturating_add_atomic_u64(&self.hidden_refused, 1);
945 }
946
947 fn note_reset_reason(&self, code: u64) {
948 self.reasons.lock().unwrap().note_reset(code);
949 }
950
951 fn note_abort_reason(&self, code: u64) {
952 self.reasons.lock().unwrap().note_abort(code);
953 }
954
955 fn note_control_progress(&self) {
956 self.note_control_progress_at(Instant::now());
957 }
958
959 fn note_control_progress_at(&self, at: Instant) {
960 self.last_control_progress_at
961 .store(self.event_nanos(at), Ordering::Relaxed);
962 }
963
964 fn note_data_read(&self, bytes: usize, at: Instant) {
965 if bytes == 0 {
966 return;
967 }
968 let event = self.event_nanos(at);
969 saturating_add_atomic_u64(&self.received_data_bytes, usize_to_u64_saturating(bytes));
970 self.last_inbound_frame_at.store(event, Ordering::Relaxed);
971 self.last_stream_progress_at.store(event, Ordering::Relaxed);
972 self.last_application_progress_at
973 .store(event, Ordering::Relaxed);
974 }
975
976 fn note_data_write(&self, bytes: usize, at: Instant) {
977 if bytes == 0 {
978 return;
979 }
980 let event = self.event_nanos(at);
981 saturating_add_atomic_u64(&self.sent_data_bytes, usize_to_u64_saturating(bytes));
982 self.last_stream_progress_at.store(event, Ordering::Relaxed);
983 self.last_application_progress_at
984 .store(event, Ordering::Relaxed);
985 }
986
987 fn note_flush(&self, bytes: usize, at: Instant) {
988 let event = self.event_nanos(at);
989 saturating_add_atomic_u64(&self.flush_count, 1);
990 self.last_transport_write_at.store(event, Ordering::Relaxed);
991 self.last_flush_at.store(event, Ordering::Relaxed);
992 self.last_flush_bytes.store(bytes, Ordering::Relaxed);
993 }
994
995 fn note_write_wait(&self, started_at: Instant, completed_at: Instant) {
996 let elapsed = completed_at.saturating_duration_since(started_at);
997 saturating_add_atomic_u64(
998 &self.blocked_write_nanos,
999 duration_nanos_saturating(elapsed),
1000 );
1001 }
1002
1003 fn note_open_latency(&self, started_at: Instant, completed_at: Instant) {
1004 let elapsed = completed_at.saturating_duration_since(started_at);
1005 self.last_open_latency_nanos
1006 .store(duration_nanos_saturating(elapsed), Ordering::Relaxed);
1007 self.last_stream_progress_at
1008 .store(self.event_nanos(completed_at), Ordering::Relaxed);
1009 }
1010
1011 fn event_nanos(&self, at: Instant) -> u64 {
1012 duration_nanos_saturating(at.saturating_duration_since(self.origin)).max(1)
1013 }
1014
1015 fn instant_for_event(&self, nanos: u64) -> Option<Instant> {
1016 if nanos == 0 {
1017 None
1018 } else {
1019 self.origin.checked_add(Duration::from_nanos(nanos))
1020 }
1021 }
1022}
1023
1024impl AdapterReasonStats {
1025 fn note_reset(&mut self, code: u64) {
1026 note_reason(&mut self.reset, &mut self.reset_overflow, code);
1027 }
1028
1029 fn note_abort(&mut self, code: u64) {
1030 note_reason(&mut self.abort, &mut self.abort_overflow, code);
1031 }
1032
1033 fn snapshot(&self) -> zmux::ReasonStats {
1034 zmux::ReasonStats {
1035 reset: self.reset.clone(),
1036 reset_overflow: self.reset_overflow,
1037 abort: self.abort.clone(),
1038 abort_overflow: self.abort_overflow,
1039 }
1040 }
1041}
1042
1043#[derive(Debug, Default)]
1044struct ActiveCounters {
1045 local_bidi: AtomicUsize,
1046 local_uni: AtomicUsize,
1047 peer_bidi: AtomicUsize,
1048 peer_uni: AtomicUsize,
1049}
1050
1051impl ActiveCounters {
1052 fn add(&self, kind: ActiveKind) {
1053 saturating_increment_atomic_usize(self.counter(kind));
1054 }
1055
1056 fn done(&self, kind: ActiveKind) {
1057 let counter = self.counter(kind);
1058 let mut current = counter.load(Ordering::Relaxed);
1059 while current != 0 {
1060 match counter.compare_exchange_weak(
1061 current,
1062 current - 1,
1063 Ordering::Relaxed,
1064 Ordering::Relaxed,
1065 ) {
1066 Ok(_) => return,
1067 Err(next) => current = next,
1068 }
1069 }
1070 }
1071
1072 fn snapshot(&self) -> zmux::ActiveStreamStats {
1073 let local_bidi = usize_to_u64_saturating(self.local_bidi.load(Ordering::Relaxed));
1074 let local_uni = usize_to_u64_saturating(self.local_uni.load(Ordering::Relaxed));
1075 let peer_bidi = usize_to_u64_saturating(self.peer_bidi.load(Ordering::Relaxed));
1076 let peer_uni = usize_to_u64_saturating(self.peer_uni.load(Ordering::Relaxed));
1077 zmux::ActiveStreamStats {
1078 local_bidi,
1079 local_uni,
1080 peer_bidi,
1081 peer_uni,
1082 total: local_bidi
1083 .saturating_add(local_uni)
1084 .saturating_add(peer_bidi)
1085 .saturating_add(peer_uni),
1086 }
1087 }
1088
1089 fn counter(&self, kind: ActiveKind) -> &AtomicUsize {
1090 match kind {
1091 ActiveKind::LocalBidi => &self.local_bidi,
1092 ActiveKind::LocalUni => &self.local_uni,
1093 ActiveKind::PeerBidi => &self.peer_bidi,
1094 ActiveKind::PeerUni => &self.peer_uni,
1095 }
1096 }
1097}
1098
1099#[derive(Debug, Clone, Copy)]
1100enum ActiveKind {
1101 LocalBidi,
1102 LocalUni,
1103 PeerBidi,
1104 PeerUni,
1105}
1106
1107#[derive(Debug)]
1108struct ActiveGuard {
1109 counters: Arc<ActiveCounters>,
1110 kind: ActiveKind,
1111 tracked: AtomicBool,
1112}
1113
1114#[derive(Debug, Default)]
1115struct TerminalErrors {
1116 read: Option<zmux::Error>,
1117 write: Option<zmux::Error>,
1118}
1119
1120impl ActiveGuard {
1121 fn new(counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
1122 counters.add(kind);
1123 Self {
1124 counters,
1125 kind,
1126 tracked: AtomicBool::new(true),
1127 }
1128 }
1129
1130 fn finish(&self) {
1131 if self.tracked.swap(false, Ordering::AcqRel) {
1132 self.counters.done(self.kind);
1133 }
1134 }
1135}
1136
1137impl Drop for ActiveGuard {
1138 fn drop(&mut self) {
1139 self.finish();
1140 }
1141}
1142
1143async fn accept_bi_or_shutdown(
1144 conn: &quinn::Connection,
1145 shutdown: &mut watch::Receiver<()>,
1146) -> Option<std::result::Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>> {
1147 tokio::select! {
1148 result = conn.accept_bi() => Some(result),
1149 _ = shutdown.changed() => None,
1150 }
1151}
1152
1153async fn accept_uni_or_shutdown(
1154 conn: &quinn::Connection,
1155 shutdown: &mut watch::Receiver<()>,
1156) -> Option<std::result::Result<quinn::RecvStream, quinn::ConnectionError>> {
1157 tokio::select! {
1158 result = conn.accept_uni() => Some(result),
1159 _ = shutdown.changed() => None,
1160 }
1161}
1162
1163async fn acquire_prepare_permit_or_shutdown(
1164 sem: &Arc<Semaphore>,
1165 shutdown: &mut watch::Receiver<()>,
1166) -> Option<std::result::Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError>> {
1167 let permit = sem.clone().acquire_owned();
1168 tokio::select! {
1169 result = permit => Some(result),
1170 _ = shutdown.changed() => None,
1171 }
1172}
1173
1174async fn prepare_accepted_bidi_stream_or_shutdown(
1175 send: quinn::SendStream,
1176 recv: quinn::RecvStream,
1177 timeout: Option<Duration>,
1178 addresses: AdapterAddresses,
1179 active: Arc<ActiveCounters>,
1180 stats: Arc<AdapterStats>,
1181 shutdown: &mut watch::Receiver<()>,
1182) -> Option<Result<QuinnStream>> {
1183 tokio::select! {
1184 result = prepare_accepted_bidi_stream(send, recv, timeout, addresses, active, stats) => result,
1185 _ = shutdown.changed() => None,
1186 }
1187}
1188
1189async fn prepare_accepted_uni_stream_or_shutdown(
1190 recv: quinn::RecvStream,
1191 timeout: Option<Duration>,
1192 addresses: AdapterAddresses,
1193 active: Arc<ActiveCounters>,
1194 stats: Arc<AdapterStats>,
1195 shutdown: &mut watch::Receiver<()>,
1196) -> Option<Result<QuinnRecvStream>> {
1197 tokio::select! {
1198 result = prepare_accepted_uni_stream(recv, timeout, addresses, active, stats) => result,
1199 _ = shutdown.changed() => None,
1200 }
1201}
1202
1203async fn prepare_accepted_bidi_stream(
1204 mut send: quinn::SendStream,
1205 mut recv: quinn::RecvStream,
1206 timeout: Option<Duration>,
1207 addresses: AdapterAddresses,
1208 active: Arc<ActiveCounters>,
1209 stats: Arc<AdapterStats>,
1210) -> Option<Result<QuinnStream>> {
1211 match read_accepted_metadata_with_timeout(&mut recv, timeout, true).await {
1212 Ok(metadata) => {
1213 stats.note_control_progress();
1214 let stream = QuinnStream::accepted(
1215 send,
1216 recv,
1217 metadata,
1218 stats,
1219 addresses.local_addr,
1220 addresses.peer_addr,
1221 );
1222 Some(Ok(stream.with_active(active, ActiveKind::PeerBidi)))
1223 }
1224 Err(err) if accepted_prelude_rejectable(&err) => {
1225 stats.note_hidden_refused();
1226 let _ = recv.stop(quinn_varint(zmux::ErrorCode::Protocol.as_u64()));
1227 let _ = send.reset(quinn_varint(zmux::ErrorCode::Protocol.as_u64()));
1228 None
1229 }
1230 Err(err) => Some(Err(err)),
1231 }
1232}
1233
1234async fn prepare_accepted_uni_stream(
1235 mut recv: quinn::RecvStream,
1236 timeout: Option<Duration>,
1237 addresses: AdapterAddresses,
1238 active: Arc<ActiveCounters>,
1239 stats: Arc<AdapterStats>,
1240) -> Option<Result<QuinnRecvStream>> {
1241 match read_accepted_metadata_with_timeout(&mut recv, timeout, false).await {
1242 Ok(metadata) => {
1243 stats.note_control_progress();
1244 let stream = QuinnRecvStream::accepted(
1245 recv,
1246 metadata,
1247 stats,
1248 addresses.local_addr,
1249 addresses.peer_addr,
1250 );
1251 Some(Ok(stream.with_active(active, ActiveKind::PeerUni)))
1252 }
1253 Err(err) if accepted_prelude_rejectable(&err) => {
1254 stats.note_hidden_refused();
1255 let _ = recv.stop(quinn_varint(zmux::ErrorCode::Protocol.as_u64()));
1256 None
1257 }
1258 Err(err) => Some(Err(err)),
1259 }
1260}
1261
1262async fn publish_bidi_accept_result(
1263 tx: &mpsc::Sender<Result<QuinnStream>>,
1264 result: Result<QuinnStream>,
1265 conn: &quinn::Connection,
1266) {
1267 let send = tx.send(result);
1268 tokio::pin!(send);
1269 tokio::select! {
1270 biased;
1271 result = &mut send => {
1272 if let Err(err) = result {
1273 if let Ok(stream) = err.0 {
1274 let _ = stream
1275 .close_with_error(zmux::ErrorCode::Cancelled.as_u64(), "")
1276 .await;
1277 }
1278 }
1279 }
1280 _ = conn.closed() => {}
1281 }
1282}
1283
1284async fn publish_uni_accept_result(
1285 tx: &mpsc::Sender<Result<QuinnRecvStream>>,
1286 result: Result<QuinnRecvStream>,
1287 conn: &quinn::Connection,
1288) {
1289 let send = tx.send(result);
1290 tokio::pin!(send);
1291 tokio::select! {
1292 biased;
1293 result = &mut send => {
1294 if let Err(err) = result {
1295 if let Ok(stream) = err.0 {
1296 let _ = stream
1297 .close_with_error(zmux::ErrorCode::Cancelled.as_u64(), "")
1298 .await;
1299 }
1300 }
1301 }
1302 _ = conn.closed() => {}
1303 }
1304}
1305
1306async fn read_accepted_metadata_with_timeout(
1307 recv: &mut quinn::RecvStream,
1308 timeout: Option<Duration>,
1309 bidirectional: bool,
1310) -> Result<AcceptedStreamMetadata> {
1311 let fut = read_accepted_metadata(recv);
1312 match timeout {
1313 Some(timeout) => tokio::time::timeout(timeout, fut).await.map_err(|_| {
1314 zmux::Error::timeout("accept").with_stream_context(
1315 zmux::ErrorOperation::Accept,
1316 accepted_direction(bidirectional),
1317 )
1318 })?,
1319 None => fut.await,
1320 }
1321}
1322
1323fn accepted_direction(bidirectional: bool) -> zmux::ErrorDirection {
1324 if bidirectional {
1325 zmux::ErrorDirection::Both
1326 } else {
1327 zmux::ErrorDirection::Read
1328 }
1329}
1330
1331async fn read_accepted_metadata(recv: &mut quinn::RecvStream) -> Result<AcceptedStreamMetadata> {
1332 read_stream_prelude_quinn(recv).await
1333}
1334
1335fn accepted_prelude_rejectable(err: &zmux::Error) -> bool {
1336 err.scope() == zmux::ErrorScope::Stream
1337 && (err.operation() == zmux::ErrorOperation::Accept
1338 || err.operation() == zmux::ErrorOperation::Read)
1339 && (err.is_timeout()
1340 || err.is_error_code(zmux::ErrorCode::Protocol)
1341 || matches!(
1342 err.termination_kind(),
1343 zmux::TerminationKind::Abort
1344 | zmux::TerminationKind::Reset
1345 | zmux::TerminationKind::Stopped
1346 ))
1347}
1348
1349async fn read_stream_prelude_quinn(recv: &mut quinn::RecvStream) -> Result<AcceptedStreamMetadata> {
1350 let first = read_byte(recv).await?;
1351 let prefix_len = match first >> 6 {
1352 0 => 1usize,
1353 1 => 2,
1354 2 => 4,
1355 _ => 8,
1356 };
1357 let mut prefix = [0u8; 8];
1358 prefix[0] = first;
1359 read_exact_quinn(recv, &mut prefix[1..prefix_len]).await?;
1360 let (metadata_len, _) = zmux::parse_varint(&prefix[..prefix_len])
1361 .map_err(|_| protocol_prelude_error("parse stream prelude length"))?;
1362 if metadata_len == 0 {
1363 return Ok(AcceptedStreamMetadata {
1364 metadata: StreamMetadata::default(),
1365 metadata_valid: true,
1366 });
1367 }
1368 let metadata_len = checked_prelude_metadata_len(metadata_len, prefix_len)?;
1369 let mut payload = vec![0u8; metadata_len];
1370 read_exact_quinn(recv, &mut payload).await?;
1371 let (metadata, metadata_valid) = parse_stream_metadata_bytes_view(&payload)
1372 .map_err(|_| protocol_prelude_error("malformed stream prelude metadata"))?;
1373 Ok(AcceptedStreamMetadata {
1374 metadata: metadata.try_to_owned()?,
1375 metadata_valid,
1376 })
1377}
1378
1379async fn read_byte(recv: &mut quinn::RecvStream) -> Result<u8> {
1380 let mut buf = [0u8; 1];
1381 read_exact_quinn(recv, &mut buf).await?;
1382 Ok(buf[0])
1383}
1384
1385async fn read_exact_quinn(recv: &mut quinn::RecvStream, mut dst: &mut [u8]) -> Result<()> {
1386 while !dst.is_empty() {
1387 let n = recv
1388 .read(dst)
1389 .await
1390 .map_err(translate_read_error)?
1391 .ok_or_else(|| protocol_prelude_error("unexpected EOF in stream prelude"))?;
1392 if n == 0 {
1393 return Err(protocol_prelude_error("zero-length read in stream prelude"));
1394 }
1395 if n > dst.len() {
1396 return Err(protocol_prelude_error(
1397 "stream prelude read reported invalid progress",
1398 ));
1399 }
1400 let (_, rest) = dst.split_at_mut(n);
1401 dst = rest;
1402 }
1403 Ok(())
1404}
1405
1406#[derive(Debug)]
1407struct PreludeState {
1408 send_prelude: bool,
1409 prelude_sent: bool,
1410 prelude: Bytes,
1411 prelude_offset: usize,
1412 metadata: StreamMetadata,
1413}
1414
1415impl PreludeState {
1416 fn local(opts: OpenOptions) -> Result<Self> {
1417 let prelude = build_stream_prelude(&opts)?;
1418 let (priority, group, open_info) = opts.into_parts();
1419 Ok(Self {
1420 send_prelude: true,
1421 prelude_sent: false,
1422 prelude: Bytes::from(prelude),
1423 prelude_offset: 0,
1424 metadata: StreamMetadata {
1425 priority,
1426 group,
1427 open_info,
1428 },
1429 })
1430 }
1431
1432 fn accepted(metadata: AcceptedStreamMetadata) -> Self {
1433 Self {
1434 send_prelude: false,
1435 prelude_sent: true,
1436 prelude: Bytes::new(),
1437 prelude_offset: 0,
1438 metadata: metadata.metadata,
1439 }
1440 }
1441
1442 fn has_peer_visible_open_metadata(&self) -> bool {
1443 self.metadata.priority.is_some()
1444 || self.metadata.group.is_some()
1445 || !self.metadata.open_info.is_empty()
1446 }
1447
1448 fn update_pre_open_metadata(&mut self, update: zmux::MetadataUpdate) -> Result<bool> {
1449 if self.prelude_sent {
1450 return Ok(false);
1451 }
1452 if update.priority.is_none() && update.group.is_none() {
1453 return Err(zmux::Error::local("zmux: metadata update has no fields"));
1454 }
1455 if let Some(priority) = update.priority {
1456 self.metadata.priority = Some(priority);
1457 }
1458 if let Some(group) = update.group {
1459 self.metadata.group = Some(group);
1460 }
1461 let mut opts = OpenOptions::new().open_info(&self.metadata.open_info);
1462 if let Some(priority) = self.metadata.priority {
1463 opts = opts.priority(priority);
1464 }
1465 if let Some(group) = self.metadata.group {
1466 opts = opts.group(group);
1467 }
1468 self.prelude = Bytes::from(build_stream_prelude(&opts)?);
1469 self.prelude_offset = 0;
1470 Ok(true)
1471 }
1472}
1473
1474fn prelude_open_info(prelude: &Mutex<PreludeState>) -> Vec<u8> {
1475 prelude.lock().unwrap().metadata.open_info.clone()
1476}
1477
1478fn append_prelude_open_info_to(prelude: &Mutex<PreludeState>, dst: &mut Vec<u8>) {
1479 dst.extend_from_slice(prelude.lock().unwrap().metadata.open_info());
1480}
1481
1482fn prelude_open_info_len(prelude: &Mutex<PreludeState>) -> usize {
1483 prelude.lock().unwrap().metadata.open_info.len()
1484}
1485
1486fn prelude_has_open_info(prelude: &Mutex<PreludeState>) -> bool {
1487 prelude_open_info_len(prelude) != 0
1488}
1489
1490pub struct QuinnStream {
1491 stream_id: u64,
1492 opened_locally: bool,
1493 local_addr: Option<SocketAddr>,
1494 peer_addr: Option<SocketAddr>,
1495 send: AsyncMutex<quinn::SendStream>,
1496 recv: AsyncMutex<quinn::RecvStream>,
1497 prelude: Mutex<PreludeState>,
1498 read_deadline: Mutex<Option<Instant>>,
1499 write_deadline: Mutex<Option<Instant>>,
1500 terminal: Mutex<TerminalErrors>,
1501 stats: Arc<AdapterStats>,
1502 active: Option<ActiveGuard>,
1503 read_closed: AtomicBool,
1504 write_closed: AtomicBool,
1505}
1506
1507impl QuinnStream {
1508 fn local(
1509 send: quinn::SendStream,
1510 recv: quinn::RecvStream,
1511 prelude: PreludeState,
1512 stats: Arc<AdapterStats>,
1513 local_addr: Option<SocketAddr>,
1514 peer_addr: Option<SocketAddr>,
1515 ) -> Self {
1516 let stream_id = quinn_stream_id(send.id());
1517 Self {
1518 stream_id,
1519 opened_locally: true,
1520 local_addr,
1521 peer_addr,
1522 send: AsyncMutex::new(send),
1523 recv: AsyncMutex::new(recv),
1524 prelude: Mutex::new(prelude),
1525 read_deadline: Mutex::new(None),
1526 write_deadline: Mutex::new(None),
1527 terminal: Mutex::new(TerminalErrors::default()),
1528 stats,
1529 active: None,
1530 read_closed: AtomicBool::new(false),
1531 write_closed: AtomicBool::new(false),
1532 }
1533 }
1534
1535 fn accepted(
1536 send: quinn::SendStream,
1537 recv: quinn::RecvStream,
1538 metadata: AcceptedStreamMetadata,
1539 stats: Arc<AdapterStats>,
1540 local_addr: Option<SocketAddr>,
1541 peer_addr: Option<SocketAddr>,
1542 ) -> Self {
1543 let stream_id = quinn_stream_id(send.id());
1544 Self {
1545 stream_id,
1546 opened_locally: false,
1547 local_addr,
1548 peer_addr,
1549 send: AsyncMutex::new(send),
1550 recv: AsyncMutex::new(recv),
1551 prelude: Mutex::new(PreludeState::accepted(metadata)),
1552 read_deadline: Mutex::new(None),
1553 write_deadline: Mutex::new(None),
1554 terminal: Mutex::new(TerminalErrors::default()),
1555 stats,
1556 active: None,
1557 read_closed: AtomicBool::new(false),
1558 write_closed: AtomicBool::new(false),
1559 }
1560 }
1561
1562 fn with_active(mut self, counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
1563 self.active = Some(ActiveGuard::new(counters, kind));
1564 self
1565 }
1566
1567 fn mark_read_closed(&self) {
1568 self.mark_read_closed_with(None);
1569 }
1570
1571 fn mark_read_closed_with(&self, err: Option<zmux::Error>) {
1572 if self.read_closed.load(Ordering::Acquire) {
1573 return;
1574 }
1575 let first = {
1576 let mut terminal = self.terminal.lock().unwrap();
1577 if self.read_closed.load(Ordering::Acquire) {
1578 false
1579 } else {
1580 if let Some(err) = err {
1581 terminal.read = Some(err);
1582 }
1583 self.read_closed.store(true, Ordering::Release);
1584 true
1585 }
1586 };
1587 if first {
1588 self.maybe_finish_active();
1589 }
1590 }
1591
1592 fn mark_write_closed(&self) {
1593 self.mark_write_closed_with(None);
1594 }
1595
1596 fn mark_write_closed_with(&self, err: Option<zmux::Error>) {
1597 if self.write_closed.load(Ordering::Acquire) {
1598 return;
1599 }
1600 let first = {
1601 let mut terminal = self.terminal.lock().unwrap();
1602 if self.write_closed.load(Ordering::Acquire) {
1603 false
1604 } else {
1605 if let Some(err) = err {
1606 terminal.write = Some(err);
1607 }
1608 self.write_closed.store(true, Ordering::Release);
1609 true
1610 }
1611 };
1612 if first {
1613 self.maybe_finish_active();
1614 }
1615 }
1616
1617 fn maybe_finish_active(&self) {
1618 if self.read_closed.load(Ordering::Acquire) && self.write_closed.load(Ordering::Acquire) {
1619 if let Some(active) = &self.active {
1620 active.finish();
1621 }
1622 }
1623 }
1624
1625 pub fn stream_id(&self) -> u64 {
1626 self.stream_id
1627 }
1628
1629 pub fn is_opened_locally(&self) -> bool {
1630 self.opened_locally
1631 }
1632
1633 pub fn is_bidirectional(&self) -> bool {
1634 true
1635 }
1636
1637 pub fn is_read_closed(&self) -> bool {
1638 self.read_closed.load(Ordering::Acquire)
1639 }
1640
1641 pub fn is_write_closed(&self) -> bool {
1642 self.write_closed.load(Ordering::Acquire)
1643 }
1644
1645 pub fn metadata(&self) -> StreamMetadata {
1646 self.prelude.lock().unwrap().metadata.clone()
1647 }
1648
1649 pub fn open_info(&self) -> Vec<u8> {
1650 prelude_open_info(&self.prelude)
1651 }
1652
1653 pub fn append_open_info_to(&self, dst: &mut Vec<u8>) {
1654 append_prelude_open_info_to(&self.prelude, dst);
1655 }
1656
1657 pub fn open_info_len(&self) -> usize {
1658 prelude_open_info_len(&self.prelude)
1659 }
1660
1661 pub fn has_open_info(&self) -> bool {
1662 prelude_has_open_info(&self.prelude)
1663 }
1664
1665 pub fn local_addr(&self) -> Option<SocketAddr> {
1666 self.local_addr
1667 }
1668
1669 pub fn peer_addr(&self) -> Option<SocketAddr> {
1670 self.peer_addr
1671 }
1672
1673 pub fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
1674 *self.read_deadline.lock().unwrap() = deadline;
1675 Ok(())
1676 }
1677
1678 pub fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
1679 *self.write_deadline.lock().unwrap() = deadline;
1680 Ok(())
1681 }
1682
1683 pub fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
1684 self.set_read_deadline(deadline)?;
1685 self.set_write_deadline(deadline)
1686 }
1687
1688 pub fn set_read_timeout(&self, timeout: Option<Duration>) -> Result<()> {
1689 self.set_read_deadline(timeout_to_deadline(timeout))
1690 }
1691
1692 pub fn set_write_timeout(&self, timeout: Option<Duration>) -> Result<()> {
1693 self.set_write_deadline(timeout_to_deadline(timeout))
1694 }
1695
1696 pub fn set_timeout(&self, timeout: Option<Duration>) -> Result<()> {
1697 self.set_deadline(timeout_to_deadline(timeout))
1698 }
1699
1700 fn read_timeout_from_deadline(&self) -> Result<Option<Duration>> {
1701 timeout_until(*self.read_deadline.lock().unwrap(), "read")
1702 }
1703
1704 fn write_timeout_from_deadline(&self) -> Result<Option<Duration>> {
1705 timeout_until(*self.write_deadline.lock().unwrap(), "write")
1706 }
1707
1708 fn read_terminal_error(&self) -> zmux::Error {
1709 self.terminal
1710 .lock()
1711 .unwrap()
1712 .read
1713 .clone()
1714 .unwrap_or_else(local_read_closed_error)
1715 }
1716
1717 fn write_terminal_error(&self) -> zmux::Error {
1718 self.terminal
1719 .lock()
1720 .unwrap()
1721 .write
1722 .clone()
1723 .unwrap_or_else(local_write_closed_error)
1724 }
1725
1726 pub async fn update_metadata(&self, update: zmux::MetadataUpdate) -> Result<()> {
1727 let mut send = self.send.lock().await;
1728 let should_flush = {
1729 let mut state = self.prelude.lock().unwrap();
1730 match state.update_pre_open_metadata(update)? {
1731 true => true,
1732 false => return Err(priority_update_unavailable()),
1733 }
1734 };
1735 if should_flush {
1736 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
1737 self.mark_write_closed_with(Some(err.clone()));
1738 return Err(err);
1739 }
1740 }
1741 Ok(())
1742 }
1743
1744 pub async fn read(&self, dst: &mut [u8]) -> Result<usize> {
1745 let timeout = self.read_timeout_from_deadline()?;
1746 with_optional_timeout(self.read_inner(dst), timeout, "read").await
1747 }
1748
1749 async fn read_inner(&self, dst: &mut [u8]) -> Result<usize> {
1750 if dst.is_empty() {
1751 return Ok(0);
1752 }
1753 if self.read_closed.load(Ordering::Acquire) {
1754 return Err(self.read_terminal_error());
1755 }
1756 let mut recv = self.recv.lock().await;
1757 match recv.read(dst).await {
1758 Ok(Some(n)) => {
1759 self.stats.note_data_read(n, Instant::now());
1760 Ok(n)
1761 }
1762 Ok(None) => {
1763 self.mark_read_closed();
1764 Ok(0)
1765 }
1766 Err(err) => {
1767 if let quinn::ReadError::Reset(code) = &err {
1768 self.stats.note_reset_reason((*code).into_inner());
1769 }
1770 let err = translate_read_error(err);
1771 self.mark_read_closed_with(Some(err.clone()));
1772 Err(err)
1773 }
1774 }
1775 }
1776
1777 pub async fn read_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<usize> {
1778 with_timeout(self.read(dst), timeout, "read").await
1779 }
1780
1781 pub async fn read_exact(&self, dst: &mut [u8]) -> Result<()> {
1782 let timeout = self.read_timeout_from_deadline()?;
1783 with_optional_timeout(self.read_exact_inner(dst), timeout, "read").await
1784 }
1785
1786 async fn read_exact_inner(&self, mut dst: &mut [u8]) -> Result<()> {
1787 if dst.is_empty() {
1788 return Ok(());
1789 }
1790 if self.read_closed.load(Ordering::Acquire) {
1791 return Err(self.read_terminal_error());
1792 }
1793 let mut recv = self.recv.lock().await;
1794 while !dst.is_empty() {
1795 match recv.read(dst).await {
1796 Ok(Some(n)) => {
1797 if n == 0 {
1798 return Err(unexpected_eof_error());
1799 }
1800 self.stats.note_data_read(n, Instant::now());
1801 let (_, rest) = dst.split_at_mut(n);
1802 dst = rest;
1803 }
1804 Ok(None) => {
1805 self.mark_read_closed();
1806 return Err(unexpected_eof_error());
1807 }
1808 Err(err) => {
1809 if let quinn::ReadError::Reset(code) = &err {
1810 self.stats.note_reset_reason((*code).into_inner());
1811 }
1812 let err = translate_read_error(err);
1813 self.mark_read_closed_with(Some(err.clone()));
1814 return Err(err);
1815 }
1816 }
1817 }
1818 Ok(())
1819 }
1820
1821 pub async fn read_exact_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<()> {
1822 with_timeout(self.read_exact(dst), timeout, "read").await
1823 }
1824
1825 pub async fn read_vectored(&self, dsts: &mut [IoSliceMut<'_>]) -> Result<usize> {
1826 match dsts.iter_mut().find(|dst| !dst.is_empty()) {
1827 Some(dst) => self.read(dst).await,
1828 None => Ok(0),
1829 }
1830 }
1831
1832 pub async fn read_vectored_timeout(
1833 &self,
1834 dsts: &mut [IoSliceMut<'_>],
1835 timeout: Duration,
1836 ) -> Result<usize> {
1837 with_timeout(self.read_vectored(dsts), timeout, "read").await
1838 }
1839
1840 pub async fn write(&self, src: &[u8]) -> Result<usize> {
1841 let timeout = self.write_timeout_from_deadline()?;
1842 with_optional_timeout(self.write_inner(src), timeout, "write").await
1843 }
1844
1845 async fn write_inner(&self, src: &[u8]) -> Result<usize> {
1846 if src.is_empty() {
1847 return Ok(0);
1848 }
1849 if self.write_closed.load(Ordering::Acquire) {
1850 return Err(self.write_terminal_error());
1851 }
1852 let mut send = self.send.lock().await;
1853 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
1854 self.mark_write_closed_with(Some(err.clone()));
1855 return Err(err);
1856 }
1857 match write_payload_once(&mut send, src, &self.stats).await {
1858 Ok(n) => Ok(n),
1859 Err(err) => {
1860 self.mark_write_closed_with(Some(err.clone()));
1861 Err(err)
1862 }
1863 }
1864 }
1865
1866 pub async fn write_timeout(&self, src: &[u8], timeout: Duration) -> Result<usize> {
1867 with_timeout(self.write(src), timeout, "write").await
1868 }
1869
1870 pub async fn write_all<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<()> {
1871 let timeout = self.write_timeout_from_deadline()?;
1872 with_optional_timeout(self.write_all_inner(src.into()), timeout, "write").await
1873 }
1874
1875 pub async fn write_all_timeout<'a>(
1876 &self,
1877 src: impl Into<WritePayload<'a>>,
1878 timeout: Duration,
1879 ) -> Result<()> {
1880 with_timeout(self.write_all(src), timeout, "write").await
1881 }
1882
1883 async fn write_all_inner(&self, payload: WritePayload<'_>) -> Result<()> {
1884 match payload {
1885 WritePayload::Bytes(data) => self.write_all_bytes_inner(data.as_ref()).await,
1886 WritePayload::Vectored(parts) => {
1887 for part in parts {
1888 if !part.is_empty() {
1889 self.write_all_bytes_inner(part.as_ref()).await?;
1890 }
1891 }
1892 Ok(())
1893 }
1894 }
1895 }
1896
1897 async fn write_all_bytes_inner(&self, src: &[u8]) -> Result<()> {
1898 if src.is_empty() {
1899 return Ok(());
1900 }
1901 if self.write_closed.load(Ordering::Acquire) {
1902 return Err(self.write_terminal_error());
1903 }
1904 let mut send = self.send.lock().await;
1905 let result = async {
1906 ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
1907 write_payload_all(&mut send, src, &self.stats).await
1908 }
1909 .await;
1910 if result.is_err() {
1911 self.mark_write_closed_with(result.as_ref().err().cloned());
1912 }
1913 result
1914 }
1915
1916 pub async fn write_vectored(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1917 let timeout = self.write_timeout_from_deadline()?;
1918 with_optional_timeout(self.write_vectored_inner(parts), timeout, "write").await
1919 }
1920
1921 async fn write_vectored_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1922 let total = total_bytes(parts.iter().map(|part| part.len()))?;
1923 if total == 0 {
1924 return Ok(0);
1925 }
1926 if self.write_closed.load(Ordering::Acquire) {
1927 return Err(self.write_terminal_error());
1928 }
1929 let mut send = self.send.lock().await;
1930 let result = async {
1931 ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
1932 write_io_slices_once(&mut send, parts, total, &self.stats).await
1933 }
1934 .await;
1935 if let Err(err) = &result {
1936 self.mark_write_closed_with(Some(err.clone()));
1937 }
1938 result
1939 }
1940
1941 pub async fn write_vectored_timeout(
1942 &self,
1943 parts: &[IoSlice<'_>],
1944 timeout: Duration,
1945 ) -> Result<usize> {
1946 with_timeout(self.write_vectored(parts), timeout, "write").await
1947 }
1948
1949 pub async fn write_final<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<usize> {
1950 let timeout = self.write_timeout_from_deadline()?;
1951 with_optional_timeout(self.write_final_inner(src.into()), timeout, "write").await
1952 }
1953
1954 async fn write_final_inner(&self, payload: WritePayload<'_>) -> Result<usize> {
1955 match payload {
1956 WritePayload::Bytes(data) => self.write_final_bytes_inner(data.as_ref()).await,
1957 WritePayload::Vectored(parts) => self.write_vectored_final_inner(parts).await,
1958 }
1959 }
1960
1961 async fn write_final_bytes_inner(&self, src: &[u8]) -> Result<usize> {
1962 if self.write_closed.load(Ordering::Acquire) {
1963 return Err(self.write_terminal_error());
1964 }
1965 let mut send = self.send.lock().await;
1966 match write_all_final(&self.prelude, &mut send, src, &self.stats).await {
1967 Ok(n) => {
1968 self.mark_write_closed();
1969 Ok(n)
1970 }
1971 Err(err) => {
1972 self.mark_write_closed_with(Some(err.clone()));
1973 Err(err)
1974 }
1975 }
1976 }
1977
1978 pub async fn write_final_timeout<'a>(
1979 &self,
1980 src: impl Into<WritePayload<'a>>,
1981 timeout: Duration,
1982 ) -> Result<usize> {
1983 with_timeout(self.write_final(src), timeout, "write").await
1984 }
1985
1986 pub async fn write_vectored_final(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1987 let timeout = self.write_timeout_from_deadline()?;
1988 with_optional_timeout(self.write_vectored_final_inner(parts), timeout, "write").await
1989 }
1990
1991 async fn write_vectored_final_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
1992 if self.write_closed.load(Ordering::Acquire) {
1993 return Err(self.write_terminal_error());
1994 }
1995 let mut send = self.send.lock().await;
1996 match write_io_slices_final(&self.prelude, &mut send, parts, &self.stats).await {
1997 Ok(n) => {
1998 self.mark_write_closed();
1999 Ok(n)
2000 }
2001 Err(err) => {
2002 self.mark_write_closed_with(Some(err.clone()));
2003 Err(err)
2004 }
2005 }
2006 }
2007
2008 pub async fn write_vectored_final_timeout(
2009 &self,
2010 parts: &[IoSlice<'_>],
2011 timeout: Duration,
2012 ) -> Result<usize> {
2013 with_timeout(self.write_vectored_final(parts), timeout, "write").await
2014 }
2015
2016 pub async fn close_read(&self) -> Result<()> {
2017 self.cancel_read(zmux::ErrorCode::Cancelled.as_u64()).await
2018 }
2019
2020 pub async fn cancel_read(&self, code: u64) -> Result<()> {
2021 if self.read_closed.load(Ordering::Acquire) {
2022 return Err(self.read_terminal_error());
2023 }
2024 let code =
2025 checked_quinn_varint(code, zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)?;
2026 self.cancel_read_varint(code).await
2027 }
2028
2029 async fn cancel_read_varint(&self, code: quinn::VarInt) -> Result<()> {
2030 let mut send = self.send.lock().await;
2031 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2032 self.mark_write_closed_with(Some(err.clone()));
2033 return Err(err);
2034 }
2035 drop(send);
2036 let mut recv = self.recv.lock().await;
2037 let result = recv.stop(code).map_err(translate_read_closed_stream);
2038 drop(recv);
2039 if result.is_ok() {
2040 self.stats.note_control_progress();
2041 }
2042 self.mark_read_closed();
2043 result
2044 }
2045
2046 pub async fn close_write(&self) -> Result<()> {
2047 let timeout = self.write_timeout_from_deadline()?;
2048 with_optional_timeout(self.close_write_inner(), timeout, "write").await
2049 }
2050
2051 async fn close_write_inner(&self) -> Result<()> {
2052 if self.write_closed.load(Ordering::Acquire) {
2053 return Err(self.write_terminal_error());
2054 }
2055 let mut send = self.send.lock().await;
2056 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2057 self.mark_write_closed_with(Some(err.clone()));
2058 return Err(err);
2059 }
2060 let result = finish_send(&mut send, &self.stats).await;
2061 drop(send);
2062 self.mark_write_closed_with(result.as_ref().err().cloned());
2063 result
2064 }
2065
2066 pub async fn cancel_write(&self, code: u64) -> Result<()> {
2067 if self.write_closed.load(Ordering::Acquire) {
2068 return Err(self.write_terminal_error());
2069 }
2070 let code = checked_quinn_varint(
2071 code,
2072 zmux::ErrorOperation::Write,
2073 zmux::ErrorDirection::Write,
2074 )?;
2075 self.cancel_write_varint(code).await
2076 }
2077
2078 async fn cancel_write_varint(&self, code: quinn::VarInt) -> Result<()> {
2079 if self.write_closed.load(Ordering::Acquire) {
2080 return Err(self.write_terminal_error());
2081 }
2082 let mut send = self.send.lock().await;
2083 let result = send.reset(code).map_err(translate_write_closed_stream);
2084 drop(send);
2085 if result.is_ok() {
2086 self.stats.note_control_progress();
2087 self.stats.note_reset_reason(code.into_inner());
2088 }
2089 self.mark_write_closed_with(Some(local_stream_application_error(
2090 code.into_inner(),
2091 "",
2092 zmux::ErrorOperation::Write,
2093 zmux::ErrorDirection::Write,
2094 zmux::TerminationKind::Reset,
2095 )));
2096 result
2097 }
2098
2099 pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
2100 if self.read_closed.load(Ordering::Acquire) && self.write_closed.load(Ordering::Acquire) {
2101 return Err(self.write_terminal_error());
2102 }
2103 let code = checked_quinn_varint(
2104 code,
2105 zmux::ErrorOperation::Close,
2106 zmux::ErrorDirection::Both,
2107 )?;
2108 let terminal = local_stream_application_error(
2109 code.into_inner(),
2110 reason,
2111 zmux::ErrorOperation::Close,
2112 zmux::ErrorDirection::Both,
2113 zmux::TerminationKind::Abort,
2114 );
2115 {
2116 let mut recv = self.recv.lock().await;
2117 let _ = recv.stop(code);
2118 }
2119 {
2120 let mut send = self.send.lock().await;
2121 let _ = send.reset(code);
2122 }
2123 self.stats.note_control_progress();
2124 self.stats.note_abort_reason(code.into_inner());
2125 self.mark_read_closed_with(Some(terminal.clone()));
2126 self.mark_write_closed_with(Some(terminal));
2127 Ok(())
2128 }
2129
2130 pub async fn close(&self) -> Result<()> {
2131 let write = if self.write_closed.load(Ordering::Acquire) {
2132 Ok(())
2133 } else {
2134 self.close_write().await
2135 };
2136 let read = if self.read_closed.load(Ordering::Acquire) {
2137 Ok(())
2138 } else {
2139 self.close_read().await
2140 };
2141 write.and(read)
2142 }
2143
2144 async fn maybe_send_open_prelude_on_open(&self) -> Result<()> {
2145 if !self
2146 .prelude
2147 .lock()
2148 .unwrap()
2149 .has_peer_visible_open_metadata()
2150 {
2151 return Ok(());
2152 }
2153 let mut send = self.send.lock().await;
2154 ensure_open_prelude(&self.prelude, &mut send, &self.stats).await
2155 }
2156
2157 async fn discard_after_open_error(&self, err: &zmux::Error) {
2158 let code = open_error_cleanup_code(err);
2159 {
2160 let mut recv = self.recv.lock().await;
2161 let _ = recv.stop(code);
2162 }
2163 {
2164 let mut send = self.send.lock().await;
2165 let _ = send.reset(code);
2166 }
2167 self.mark_read_closed();
2168 self.mark_write_closed();
2169 }
2170}
2171
2172pub struct QuinnSendStream {
2173 stream_id: u64,
2174 local_addr: Option<SocketAddr>,
2175 peer_addr: Option<SocketAddr>,
2176 send: AsyncMutex<quinn::SendStream>,
2177 prelude: Mutex<PreludeState>,
2178 write_deadline: Mutex<Option<Instant>>,
2179 terminal: Mutex<TerminalErrors>,
2180 stats: Arc<AdapterStats>,
2181 active: Option<ActiveGuard>,
2182 write_closed: AtomicBool,
2183}
2184
2185impl QuinnSendStream {
2186 fn local(
2187 send: quinn::SendStream,
2188 prelude: PreludeState,
2189 stats: Arc<AdapterStats>,
2190 local_addr: Option<SocketAddr>,
2191 peer_addr: Option<SocketAddr>,
2192 ) -> Self {
2193 let stream_id = quinn_stream_id(send.id());
2194 Self {
2195 stream_id,
2196 local_addr,
2197 peer_addr,
2198 send: AsyncMutex::new(send),
2199 prelude: Mutex::new(prelude),
2200 write_deadline: Mutex::new(None),
2201 terminal: Mutex::new(TerminalErrors::default()),
2202 stats,
2203 active: None,
2204 write_closed: AtomicBool::new(false),
2205 }
2206 }
2207
2208 fn with_active(mut self, counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
2209 self.active = Some(ActiveGuard::new(counters, kind));
2210 self
2211 }
2212
2213 fn mark_write_closed(&self) {
2214 self.mark_write_closed_with(None);
2215 }
2216
2217 fn mark_write_closed_with(&self, err: Option<zmux::Error>) {
2218 if self.write_closed.load(Ordering::Acquire) {
2219 return;
2220 }
2221 let first = {
2222 let mut terminal = self.terminal.lock().unwrap();
2223 if self.write_closed.load(Ordering::Acquire) {
2224 false
2225 } else {
2226 if let Some(err) = err {
2227 terminal.write = Some(err);
2228 }
2229 self.write_closed.store(true, Ordering::Release);
2230 true
2231 }
2232 };
2233 if first {
2234 if let Some(active) = &self.active {
2235 active.finish();
2236 }
2237 }
2238 }
2239
2240 pub fn stream_id(&self) -> u64 {
2241 self.stream_id
2242 }
2243
2244 pub fn is_opened_locally(&self) -> bool {
2245 true
2246 }
2247
2248 pub fn is_bidirectional(&self) -> bool {
2249 false
2250 }
2251
2252 pub fn is_write_closed(&self) -> bool {
2253 self.write_closed.load(Ordering::Acquire)
2254 }
2255
2256 pub fn metadata(&self) -> StreamMetadata {
2257 self.prelude.lock().unwrap().metadata.clone()
2258 }
2259
2260 pub fn open_info(&self) -> Vec<u8> {
2261 prelude_open_info(&self.prelude)
2262 }
2263
2264 pub fn append_open_info_to(&self, dst: &mut Vec<u8>) {
2265 append_prelude_open_info_to(&self.prelude, dst);
2266 }
2267
2268 pub fn open_info_len(&self) -> usize {
2269 prelude_open_info_len(&self.prelude)
2270 }
2271
2272 pub fn has_open_info(&self) -> bool {
2273 prelude_has_open_info(&self.prelude)
2274 }
2275
2276 pub fn local_addr(&self) -> Option<SocketAddr> {
2277 self.local_addr
2278 }
2279
2280 pub fn peer_addr(&self) -> Option<SocketAddr> {
2281 self.peer_addr
2282 }
2283
2284 pub fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2285 *self.write_deadline.lock().unwrap() = deadline;
2286 Ok(())
2287 }
2288
2289 pub fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2290 self.set_write_deadline(deadline)
2291 }
2292
2293 pub fn set_write_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2294 self.set_write_deadline(timeout_to_deadline(timeout))
2295 }
2296
2297 pub fn set_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2298 self.set_deadline(timeout_to_deadline(timeout))
2299 }
2300
2301 fn write_timeout_from_deadline(&self) -> Result<Option<Duration>> {
2302 timeout_until(*self.write_deadline.lock().unwrap(), "write")
2303 }
2304
2305 fn write_terminal_error(&self) -> zmux::Error {
2306 self.terminal
2307 .lock()
2308 .unwrap()
2309 .write
2310 .clone()
2311 .unwrap_or_else(local_write_closed_error)
2312 }
2313
2314 pub async fn update_metadata(&self, update: zmux::MetadataUpdate) -> Result<()> {
2315 let mut send = self.send.lock().await;
2316 let should_flush = {
2317 let mut state = self.prelude.lock().unwrap();
2318 match state.update_pre_open_metadata(update)? {
2319 true => true,
2320 false => return Err(priority_update_unavailable()),
2321 }
2322 };
2323 if should_flush {
2324 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2325 self.mark_write_closed_with(Some(err.clone()));
2326 return Err(err);
2327 }
2328 }
2329 Ok(())
2330 }
2331
2332 pub async fn write(&self, src: &[u8]) -> Result<usize> {
2333 let timeout = self.write_timeout_from_deadline()?;
2334 with_optional_timeout(self.write_inner(src), timeout, "write").await
2335 }
2336
2337 async fn write_inner(&self, src: &[u8]) -> Result<usize> {
2338 if src.is_empty() {
2339 return Ok(0);
2340 }
2341 if self.write_closed.load(Ordering::Acquire) {
2342 return Err(self.write_terminal_error());
2343 }
2344 let mut send = self.send.lock().await;
2345 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2346 self.mark_write_closed_with(Some(err.clone()));
2347 return Err(err);
2348 }
2349 match write_payload_once(&mut send, src, &self.stats).await {
2350 Ok(n) => Ok(n),
2351 Err(err) => {
2352 self.mark_write_closed_with(Some(err.clone()));
2353 Err(err)
2354 }
2355 }
2356 }
2357
2358 pub async fn write_timeout(&self, src: &[u8], timeout: Duration) -> Result<usize> {
2359 with_timeout(self.write(src), timeout, "write").await
2360 }
2361
2362 pub async fn write_all<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<()> {
2363 let timeout = self.write_timeout_from_deadline()?;
2364 with_optional_timeout(self.write_all_inner(src.into()), timeout, "write").await
2365 }
2366
2367 pub async fn write_all_timeout<'a>(
2368 &self,
2369 src: impl Into<WritePayload<'a>>,
2370 timeout: Duration,
2371 ) -> Result<()> {
2372 with_timeout(self.write_all(src), timeout, "write").await
2373 }
2374
2375 async fn write_all_inner(&self, payload: WritePayload<'_>) -> Result<()> {
2376 match payload {
2377 WritePayload::Bytes(data) => self.write_all_bytes_inner(data.as_ref()).await,
2378 WritePayload::Vectored(parts) => {
2379 for part in parts {
2380 if !part.is_empty() {
2381 self.write_all_bytes_inner(part.as_ref()).await?;
2382 }
2383 }
2384 Ok(())
2385 }
2386 }
2387 }
2388
2389 async fn write_all_bytes_inner(&self, src: &[u8]) -> Result<()> {
2390 if src.is_empty() {
2391 return Ok(());
2392 }
2393 if self.write_closed.load(Ordering::Acquire) {
2394 return Err(self.write_terminal_error());
2395 }
2396 let mut send = self.send.lock().await;
2397 let result = async {
2398 ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
2399 write_payload_all(&mut send, src, &self.stats).await
2400 }
2401 .await;
2402 if result.is_err() {
2403 self.mark_write_closed_with(result.as_ref().err().cloned());
2404 }
2405 result
2406 }
2407
2408 pub async fn write_vectored(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2409 let timeout = self.write_timeout_from_deadline()?;
2410 with_optional_timeout(self.write_vectored_inner(parts), timeout, "write").await
2411 }
2412
2413 async fn write_vectored_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2414 let total = total_bytes(parts.iter().map(|part| part.len()))?;
2415 if total == 0 {
2416 return Ok(0);
2417 }
2418 if self.write_closed.load(Ordering::Acquire) {
2419 return Err(self.write_terminal_error());
2420 }
2421 let mut send = self.send.lock().await;
2422 let result = async {
2423 ensure_open_prelude(&self.prelude, &mut send, &self.stats).await?;
2424 write_io_slices_once(&mut send, parts, total, &self.stats).await
2425 }
2426 .await;
2427 if let Err(err) = &result {
2428 self.mark_write_closed_with(Some(err.clone()));
2429 }
2430 result
2431 }
2432
2433 pub async fn write_vectored_timeout(
2434 &self,
2435 parts: &[IoSlice<'_>],
2436 timeout: Duration,
2437 ) -> Result<usize> {
2438 with_timeout(self.write_vectored(parts), timeout, "write").await
2439 }
2440
2441 pub async fn write_final<'a>(&self, src: impl Into<WritePayload<'a>>) -> Result<usize> {
2442 let timeout = self.write_timeout_from_deadline()?;
2443 with_optional_timeout(self.write_final_inner(src.into()), timeout, "write").await
2444 }
2445
2446 async fn write_final_inner(&self, payload: WritePayload<'_>) -> Result<usize> {
2447 match payload {
2448 WritePayload::Bytes(data) => self.write_final_bytes_inner(data.as_ref()).await,
2449 WritePayload::Vectored(parts) => self.write_vectored_final_inner(parts).await,
2450 }
2451 }
2452
2453 async fn write_final_bytes_inner(&self, src: &[u8]) -> Result<usize> {
2454 if self.write_closed.load(Ordering::Acquire) {
2455 return Err(self.write_terminal_error());
2456 }
2457 let mut send = self.send.lock().await;
2458 match write_all_final(&self.prelude, &mut send, src, &self.stats).await {
2459 Ok(n) => {
2460 self.mark_write_closed();
2461 Ok(n)
2462 }
2463 Err(err) => {
2464 self.mark_write_closed_with(Some(err.clone()));
2465 Err(err)
2466 }
2467 }
2468 }
2469
2470 pub async fn write_final_timeout<'a>(
2471 &self,
2472 src: impl Into<WritePayload<'a>>,
2473 timeout: Duration,
2474 ) -> Result<usize> {
2475 with_timeout(self.write_final(src), timeout, "write").await
2476 }
2477
2478 pub async fn write_vectored_final(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2479 let timeout = self.write_timeout_from_deadline()?;
2480 with_optional_timeout(self.write_vectored_final_inner(parts), timeout, "write").await
2481 }
2482
2483 async fn write_vectored_final_inner(&self, parts: &[IoSlice<'_>]) -> Result<usize> {
2484 if self.write_closed.load(Ordering::Acquire) {
2485 return Err(self.write_terminal_error());
2486 }
2487 let mut send = self.send.lock().await;
2488 match write_io_slices_final(&self.prelude, &mut send, parts, &self.stats).await {
2489 Ok(n) => {
2490 self.mark_write_closed();
2491 Ok(n)
2492 }
2493 Err(err) => {
2494 self.mark_write_closed_with(Some(err.clone()));
2495 Err(err)
2496 }
2497 }
2498 }
2499
2500 pub async fn write_vectored_final_timeout(
2501 &self,
2502 parts: &[IoSlice<'_>],
2503 timeout: Duration,
2504 ) -> Result<usize> {
2505 with_timeout(self.write_vectored_final(parts), timeout, "write").await
2506 }
2507
2508 pub async fn close_write(&self) -> Result<()> {
2509 let timeout = self.write_timeout_from_deadline()?;
2510 with_optional_timeout(self.close_write_inner(), timeout, "write").await
2511 }
2512
2513 async fn close_write_inner(&self) -> Result<()> {
2514 if self.write_closed.load(Ordering::Acquire) {
2515 return Err(self.write_terminal_error());
2516 }
2517 let mut send = self.send.lock().await;
2518 if let Err(err) = ensure_open_prelude(&self.prelude, &mut send, &self.stats).await {
2519 self.mark_write_closed_with(Some(err.clone()));
2520 return Err(err);
2521 }
2522 let result = finish_send(&mut send, &self.stats).await;
2523 drop(send);
2524 self.mark_write_closed_with(result.as_ref().err().cloned());
2525 result
2526 }
2527
2528 pub async fn cancel_write(&self, code: u64) -> Result<()> {
2529 if self.write_closed.load(Ordering::Acquire) {
2530 return Err(self.write_terminal_error());
2531 }
2532 let code = checked_quinn_varint(
2533 code,
2534 zmux::ErrorOperation::Write,
2535 zmux::ErrorDirection::Write,
2536 )?;
2537 self.cancel_write_varint(code).await
2538 }
2539
2540 async fn cancel_write_varint(&self, code: quinn::VarInt) -> Result<()> {
2541 if self.write_closed.load(Ordering::Acquire) {
2542 return Err(self.write_terminal_error());
2543 }
2544 let mut send = self.send.lock().await;
2545 let result = send.reset(code).map_err(translate_write_closed_stream);
2546 drop(send);
2547 if result.is_ok() {
2548 self.stats.note_control_progress();
2549 self.stats.note_reset_reason(code.into_inner());
2550 }
2551 self.mark_write_closed_with(Some(local_stream_application_error(
2552 code.into_inner(),
2553 "",
2554 zmux::ErrorOperation::Write,
2555 zmux::ErrorDirection::Write,
2556 zmux::TerminationKind::Reset,
2557 )));
2558 result
2559 }
2560
2561 pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
2562 if self.write_closed.load(Ordering::Acquire) {
2563 return Err(self.write_terminal_error());
2564 }
2565 let code = checked_quinn_varint(
2566 code,
2567 zmux::ErrorOperation::Close,
2568 zmux::ErrorDirection::Write,
2569 )?;
2570 let terminal = local_stream_application_error(
2571 code.into_inner(),
2572 reason,
2573 zmux::ErrorOperation::Close,
2574 zmux::ErrorDirection::Write,
2575 zmux::TerminationKind::Abort,
2576 );
2577 let mut send = self.send.lock().await;
2578 let _ = send.reset(code);
2579 drop(send);
2580 self.stats.note_control_progress();
2581 self.stats.note_abort_reason(code.into_inner());
2582 self.mark_write_closed_with(Some(terminal));
2583 Ok(())
2584 }
2585
2586 pub async fn close(&self) -> Result<()> {
2587 if self.write_closed.load(Ordering::Acquire) {
2588 Ok(())
2589 } else {
2590 self.close_write().await
2591 }
2592 }
2593
2594 async fn maybe_send_open_prelude_on_open(&self) -> Result<()> {
2595 if !self
2596 .prelude
2597 .lock()
2598 .unwrap()
2599 .has_peer_visible_open_metadata()
2600 {
2601 return Ok(());
2602 }
2603 let mut send = self.send.lock().await;
2604 ensure_open_prelude(&self.prelude, &mut send, &self.stats).await
2605 }
2606
2607 async fn discard_after_open_error(&self, err: &zmux::Error) {
2608 let code = open_error_cleanup_code(err);
2609 let mut send = self.send.lock().await;
2610 let _ = send.reset(code);
2611 drop(send);
2612 self.mark_write_closed();
2613 }
2614}
2615
2616pub struct QuinnRecvStream {
2617 stream_id: u64,
2618 local_addr: Option<SocketAddr>,
2619 peer_addr: Option<SocketAddr>,
2620 recv: AsyncMutex<quinn::RecvStream>,
2621 prelude: Mutex<PreludeState>,
2622 read_deadline: Mutex<Option<Instant>>,
2623 terminal: Mutex<TerminalErrors>,
2624 stats: Arc<AdapterStats>,
2625 active: Option<ActiveGuard>,
2626 read_closed: AtomicBool,
2627}
2628
2629impl QuinnRecvStream {
2630 fn accepted(
2631 recv: quinn::RecvStream,
2632 metadata: AcceptedStreamMetadata,
2633 stats: Arc<AdapterStats>,
2634 local_addr: Option<SocketAddr>,
2635 peer_addr: Option<SocketAddr>,
2636 ) -> Self {
2637 let stream_id = quinn_stream_id(recv.id());
2638 Self {
2639 stream_id,
2640 local_addr,
2641 peer_addr,
2642 recv: AsyncMutex::new(recv),
2643 prelude: Mutex::new(PreludeState::accepted(metadata)),
2644 read_deadline: Mutex::new(None),
2645 terminal: Mutex::new(TerminalErrors::default()),
2646 stats,
2647 active: None,
2648 read_closed: AtomicBool::new(false),
2649 }
2650 }
2651
2652 fn with_active(mut self, counters: Arc<ActiveCounters>, kind: ActiveKind) -> Self {
2653 self.active = Some(ActiveGuard::new(counters, kind));
2654 self
2655 }
2656
2657 fn mark_read_closed(&self) {
2658 self.mark_read_closed_with(None);
2659 }
2660
2661 fn mark_read_closed_with(&self, err: Option<zmux::Error>) {
2662 if self.read_closed.load(Ordering::Acquire) {
2663 return;
2664 }
2665 let first = {
2666 let mut terminal = self.terminal.lock().unwrap();
2667 if self.read_closed.load(Ordering::Acquire) {
2668 false
2669 } else {
2670 if let Some(err) = err {
2671 terminal.read = Some(err);
2672 }
2673 self.read_closed.store(true, Ordering::Release);
2674 true
2675 }
2676 };
2677 if first {
2678 if let Some(active) = &self.active {
2679 active.finish();
2680 }
2681 }
2682 }
2683
2684 pub fn stream_id(&self) -> u64 {
2685 self.stream_id
2686 }
2687
2688 pub fn is_opened_locally(&self) -> bool {
2689 false
2690 }
2691
2692 pub fn is_bidirectional(&self) -> bool {
2693 false
2694 }
2695
2696 pub fn is_read_closed(&self) -> bool {
2697 self.read_closed.load(Ordering::Acquire)
2698 }
2699
2700 pub fn metadata(&self) -> StreamMetadata {
2701 self.prelude.lock().unwrap().metadata.clone()
2702 }
2703
2704 pub fn open_info(&self) -> Vec<u8> {
2705 prelude_open_info(&self.prelude)
2706 }
2707
2708 pub fn append_open_info_to(&self, dst: &mut Vec<u8>) {
2709 append_prelude_open_info_to(&self.prelude, dst);
2710 }
2711
2712 pub fn open_info_len(&self) -> usize {
2713 prelude_open_info_len(&self.prelude)
2714 }
2715
2716 pub fn has_open_info(&self) -> bool {
2717 prelude_has_open_info(&self.prelude)
2718 }
2719
2720 pub fn local_addr(&self) -> Option<SocketAddr> {
2721 self.local_addr
2722 }
2723
2724 pub fn peer_addr(&self) -> Option<SocketAddr> {
2725 self.peer_addr
2726 }
2727
2728 pub fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2729 *self.read_deadline.lock().unwrap() = deadline;
2730 Ok(())
2731 }
2732
2733 pub fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2734 self.set_read_deadline(deadline)
2735 }
2736
2737 pub fn set_read_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2738 self.set_read_deadline(timeout_to_deadline(timeout))
2739 }
2740
2741 pub fn set_timeout(&self, timeout: Option<Duration>) -> Result<()> {
2742 self.set_deadline(timeout_to_deadline(timeout))
2743 }
2744
2745 fn read_timeout_from_deadline(&self) -> Result<Option<Duration>> {
2746 timeout_until(*self.read_deadline.lock().unwrap(), "read")
2747 }
2748
2749 fn read_terminal_error(&self) -> zmux::Error {
2750 self.terminal
2751 .lock()
2752 .unwrap()
2753 .read
2754 .clone()
2755 .unwrap_or_else(local_read_closed_error)
2756 }
2757
2758 pub async fn read(&self, dst: &mut [u8]) -> Result<usize> {
2759 let timeout = self.read_timeout_from_deadline()?;
2760 with_optional_timeout(self.read_inner(dst), timeout, "read").await
2761 }
2762
2763 async fn read_inner(&self, dst: &mut [u8]) -> Result<usize> {
2764 if dst.is_empty() {
2765 return Ok(0);
2766 }
2767 if self.read_closed.load(Ordering::Acquire) {
2768 return Err(self.read_terminal_error());
2769 }
2770 let mut recv = self.recv.lock().await;
2771 match recv.read(dst).await {
2772 Ok(Some(n)) => {
2773 self.stats.note_data_read(n, Instant::now());
2774 Ok(n)
2775 }
2776 Ok(None) => {
2777 self.mark_read_closed();
2778 Ok(0)
2779 }
2780 Err(err) => {
2781 if let quinn::ReadError::Reset(code) = &err {
2782 self.stats.note_reset_reason((*code).into_inner());
2783 }
2784 let err = translate_read_error(err);
2785 self.mark_read_closed_with(Some(err.clone()));
2786 Err(err)
2787 }
2788 }
2789 }
2790
2791 pub async fn read_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<usize> {
2792 with_timeout(self.read(dst), timeout, "read").await
2793 }
2794
2795 pub async fn read_exact(&self, dst: &mut [u8]) -> Result<()> {
2796 let timeout = self.read_timeout_from_deadline()?;
2797 with_optional_timeout(self.read_exact_inner(dst), timeout, "read").await
2798 }
2799
2800 async fn read_exact_inner(&self, mut dst: &mut [u8]) -> Result<()> {
2801 if dst.is_empty() {
2802 return Ok(());
2803 }
2804 if self.read_closed.load(Ordering::Acquire) {
2805 return Err(self.read_terminal_error());
2806 }
2807 let mut recv = self.recv.lock().await;
2808 while !dst.is_empty() {
2809 match recv.read(dst).await {
2810 Ok(Some(n)) => {
2811 if n == 0 {
2812 return Err(unexpected_eof_error());
2813 }
2814 self.stats.note_data_read(n, Instant::now());
2815 let (_, rest) = dst.split_at_mut(n);
2816 dst = rest;
2817 }
2818 Ok(None) => {
2819 self.mark_read_closed();
2820 return Err(unexpected_eof_error());
2821 }
2822 Err(err) => {
2823 if let quinn::ReadError::Reset(code) = &err {
2824 self.stats.note_reset_reason((*code).into_inner());
2825 }
2826 let err = translate_read_error(err);
2827 self.mark_read_closed_with(Some(err.clone()));
2828 return Err(err);
2829 }
2830 }
2831 }
2832 Ok(())
2833 }
2834
2835 pub async fn read_exact_timeout(&self, dst: &mut [u8], timeout: Duration) -> Result<()> {
2836 with_timeout(self.read_exact(dst), timeout, "read").await
2837 }
2838
2839 pub async fn read_vectored(&self, dsts: &mut [IoSliceMut<'_>]) -> Result<usize> {
2840 match dsts.iter_mut().find(|dst| !dst.is_empty()) {
2841 Some(dst) => self.read(dst).await,
2842 None => Ok(0),
2843 }
2844 }
2845
2846 pub async fn read_vectored_timeout(
2847 &self,
2848 dsts: &mut [IoSliceMut<'_>],
2849 timeout: Duration,
2850 ) -> Result<usize> {
2851 with_timeout(self.read_vectored(dsts), timeout, "read").await
2852 }
2853
2854 pub async fn close_read(&self) -> Result<()> {
2855 self.cancel_read(zmux::ErrorCode::Cancelled.as_u64()).await
2856 }
2857
2858 pub async fn cancel_read(&self, code: u64) -> Result<()> {
2859 if self.read_closed.load(Ordering::Acquire) {
2860 return Err(self.read_terminal_error());
2861 }
2862 let code =
2863 checked_quinn_varint(code, zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)?;
2864 self.cancel_read_varint(code).await
2865 }
2866
2867 async fn cancel_read_varint(&self, code: quinn::VarInt) -> Result<()> {
2868 let mut recv = self.recv.lock().await;
2869 let result = recv.stop(code).map_err(translate_read_closed_stream);
2870 drop(recv);
2871 if result.is_ok() {
2872 self.stats.note_control_progress();
2873 }
2874 self.mark_read_closed();
2875 result
2876 }
2877
2878 pub async fn close_with_error(&self, code: u64, reason: &str) -> Result<()> {
2879 if self.read_closed.load(Ordering::Acquire) {
2880 return Err(self.read_terminal_error());
2881 }
2882 let code = checked_quinn_varint(
2883 code,
2884 zmux::ErrorOperation::Close,
2885 zmux::ErrorDirection::Read,
2886 )?;
2887 let terminal = local_stream_application_error(
2888 code.into_inner(),
2889 reason,
2890 zmux::ErrorOperation::Close,
2891 zmux::ErrorDirection::Read,
2892 zmux::TerminationKind::Abort,
2893 );
2894 let mut recv = self.recv.lock().await;
2895 let _ = recv.stop(code);
2896 drop(recv);
2897 self.stats.note_control_progress();
2898 self.stats.note_abort_reason(code.into_inner());
2899 self.mark_read_closed_with(Some(terminal));
2900 Ok(())
2901 }
2902
2903 pub async fn close(&self) -> Result<()> {
2904 if self.read_closed.load(Ordering::Acquire) {
2905 Ok(())
2906 } else {
2907 self.close_read().await
2908 }
2909 }
2910}
2911
2912impl AsyncStreamHandle for QuinnStream {
2913 fn stream_id(&self) -> u64 {
2914 QuinnStream::stream_id(self)
2915 }
2916
2917 fn is_opened_locally(&self) -> bool {
2918 QuinnStream::is_opened_locally(self)
2919 }
2920
2921 fn is_bidirectional(&self) -> bool {
2922 QuinnStream::is_bidirectional(self)
2923 }
2924
2925 fn open_info_len(&self) -> usize {
2926 QuinnStream::open_info_len(self)
2927 }
2928
2929 fn has_open_info(&self) -> bool {
2930 QuinnStream::has_open_info(self)
2931 }
2932
2933 fn append_open_info_to(&self, dst: &mut Vec<u8>) {
2934 QuinnStream::append_open_info_to(self, dst)
2935 }
2936
2937 fn open_info(&self) -> Vec<u8> {
2938 QuinnStream::open_info(self)
2939 }
2940
2941 fn metadata(&self) -> StreamMetadata {
2942 QuinnStream::metadata(self)
2943 }
2944
2945 fn local_addr(&self) -> Option<SocketAddr> {
2946 QuinnStream::local_addr(self)
2947 }
2948
2949 fn peer_addr(&self) -> Option<SocketAddr> {
2950 QuinnStream::peer_addr(self)
2951 }
2952
2953 fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
2954 QuinnStream::set_deadline(self, deadline)
2955 }
2956
2957 fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
2958 Box::pin(async move { QuinnStream::close(self).await })
2959 }
2960
2961 fn close_with_error<'a>(
2962 &'a self,
2963 code: u64,
2964 reason: &'a str,
2965 ) -> AsyncBoxFuture<'a, Result<()>> {
2966 Box::pin(async move { QuinnStream::close_with_error(self, code, reason).await })
2967 }
2968}
2969
2970impl AsyncRecvStreamHandle for QuinnStream {
2971 fn read<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
2972 Box::pin(async move { QuinnStream::read(self, dst).await })
2973 }
2974
2975 fn read_vectored<'a>(
2976 &'a self,
2977 dsts: &'a mut [IoSliceMut<'_>],
2978 ) -> AsyncBoxFuture<'a, Result<usize>> {
2979 Box::pin(async move { QuinnStream::read_vectored(self, dsts).await })
2980 }
2981
2982 fn read_timeout<'a>(
2983 &'a self,
2984 dst: &'a mut [u8],
2985 timeout: Duration,
2986 ) -> AsyncBoxFuture<'a, Result<usize>> {
2987 Box::pin(async move { QuinnStream::read_timeout(self, dst, timeout).await })
2988 }
2989
2990 fn read_vectored_timeout<'a>(
2991 &'a self,
2992 dsts: &'a mut [IoSliceMut<'_>],
2993 timeout: Duration,
2994 ) -> AsyncBoxFuture<'a, Result<usize>> {
2995 Box::pin(async move { QuinnStream::read_vectored_timeout(self, dsts, timeout).await })
2996 }
2997
2998 fn read_exact<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<()>> {
2999 Box::pin(async move { QuinnStream::read_exact(self, dst).await })
3000 }
3001
3002 fn read_exact_timeout<'a>(
3003 &'a self,
3004 dst: &'a mut [u8],
3005 timeout: Duration,
3006 ) -> AsyncBoxFuture<'a, Result<()>> {
3007 Box::pin(async move { QuinnStream::read_exact_timeout(self, dst, timeout).await })
3008 }
3009
3010 fn is_read_closed(&self) -> bool {
3011 QuinnStream::is_read_closed(self)
3012 }
3013
3014 fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3015 QuinnStream::set_read_deadline(self, deadline)
3016 }
3017
3018 fn close_read(&self) -> AsyncBoxFuture<'_, Result<()>> {
3019 Box::pin(async move { QuinnStream::close_read(self).await })
3020 }
3021
3022 fn cancel_read(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3023 Box::pin(async move { QuinnStream::cancel_read(self, code).await })
3024 }
3025}
3026
3027impl AsyncSendStreamHandle for QuinnStream {
3028 fn write<'a>(&'a self, src: &'a [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
3029 Box::pin(async move { QuinnStream::write(self, src).await })
3030 }
3031
3032 fn write_all<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<()>> {
3033 Box::pin(async move { QuinnStream::write_all(self, src).await })
3034 }
3035
3036 fn write_all_timeout<'a>(
3037 &'a self,
3038 src: WritePayload<'a>,
3039 timeout: Duration,
3040 ) -> AsyncBoxFuture<'a, Result<()>> {
3041 Box::pin(async move { QuinnStream::write_all_timeout(self, src, timeout).await })
3042 }
3043
3044 fn write_timeout<'a>(
3045 &'a self,
3046 src: &'a [u8],
3047 timeout: Duration,
3048 ) -> AsyncBoxFuture<'a, Result<usize>> {
3049 Box::pin(async move { QuinnStream::write_timeout(self, src, timeout).await })
3050 }
3051
3052 fn write_vectored<'a>(&'a self, parts: &'a [IoSlice<'_>]) -> AsyncBoxFuture<'a, Result<usize>> {
3053 Box::pin(async move { QuinnStream::write_vectored(self, parts).await })
3054 }
3055
3056 fn write_vectored_timeout<'a>(
3057 &'a self,
3058 parts: &'a [IoSlice<'_>],
3059 timeout: Duration,
3060 ) -> AsyncBoxFuture<'a, Result<usize>> {
3061 Box::pin(async move { QuinnStream::write_vectored_timeout(self, parts, timeout).await })
3062 }
3063
3064 fn write_final<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<usize>> {
3065 Box::pin(async move { QuinnStream::write_final(self, src).await })
3066 }
3067
3068 fn write_final_timeout<'a>(
3069 &'a self,
3070 src: WritePayload<'a>,
3071 timeout: Duration,
3072 ) -> AsyncBoxFuture<'a, Result<usize>> {
3073 Box::pin(async move { QuinnStream::write_final_timeout(self, src, timeout).await })
3074 }
3075
3076 fn write_vectored_final<'a>(
3077 &'a self,
3078 parts: &'a [IoSlice<'_>],
3079 ) -> AsyncBoxFuture<'a, Result<usize>> {
3080 Box::pin(async move { QuinnStream::write_vectored_final(self, parts).await })
3081 }
3082
3083 fn write_vectored_final_timeout<'a>(
3084 &'a self,
3085 parts: &'a [IoSlice<'_>],
3086 timeout: Duration,
3087 ) -> AsyncBoxFuture<'a, Result<usize>> {
3088 Box::pin(
3089 async move { QuinnStream::write_vectored_final_timeout(self, parts, timeout).await },
3090 )
3091 }
3092
3093 fn is_write_closed(&self) -> bool {
3094 QuinnStream::is_write_closed(self)
3095 }
3096
3097 fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3098 QuinnStream::set_write_deadline(self, deadline)
3099 }
3100
3101 fn update_metadata(&self, update: zmux::MetadataUpdate) -> AsyncBoxFuture<'_, Result<()>> {
3102 Box::pin(async move { QuinnStream::update_metadata(self, update).await })
3103 }
3104
3105 fn close_write(&self) -> AsyncBoxFuture<'_, Result<()>> {
3106 Box::pin(async move { QuinnStream::close_write(self).await })
3107 }
3108
3109 fn cancel_write(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3110 Box::pin(async move { QuinnStream::cancel_write(self, code).await })
3111 }
3112}
3113
3114impl AsyncDuplexStreamHandle for QuinnStream {}
3115
3116impl AsyncStreamHandle for QuinnSendStream {
3117 fn stream_id(&self) -> u64 {
3118 QuinnSendStream::stream_id(self)
3119 }
3120
3121 fn is_opened_locally(&self) -> bool {
3122 QuinnSendStream::is_opened_locally(self)
3123 }
3124
3125 fn is_bidirectional(&self) -> bool {
3126 QuinnSendStream::is_bidirectional(self)
3127 }
3128
3129 fn open_info_len(&self) -> usize {
3130 QuinnSendStream::open_info_len(self)
3131 }
3132
3133 fn has_open_info(&self) -> bool {
3134 QuinnSendStream::has_open_info(self)
3135 }
3136
3137 fn append_open_info_to(&self, dst: &mut Vec<u8>) {
3138 QuinnSendStream::append_open_info_to(self, dst)
3139 }
3140
3141 fn open_info(&self) -> Vec<u8> {
3142 QuinnSendStream::open_info(self)
3143 }
3144
3145 fn metadata(&self) -> StreamMetadata {
3146 QuinnSendStream::metadata(self)
3147 }
3148
3149 fn local_addr(&self) -> Option<SocketAddr> {
3150 QuinnSendStream::local_addr(self)
3151 }
3152
3153 fn peer_addr(&self) -> Option<SocketAddr> {
3154 QuinnSendStream::peer_addr(self)
3155 }
3156
3157 fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3158 QuinnSendStream::set_deadline(self, deadline)
3159 }
3160
3161 fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
3162 Box::pin(async move { QuinnSendStream::close(self).await })
3163 }
3164
3165 fn close_with_error<'a>(
3166 &'a self,
3167 code: u64,
3168 reason: &'a str,
3169 ) -> AsyncBoxFuture<'a, Result<()>> {
3170 Box::pin(async move { QuinnSendStream::close_with_error(self, code, reason).await })
3171 }
3172}
3173
3174impl AsyncSendStreamHandle for QuinnSendStream {
3175 fn write<'a>(&'a self, src: &'a [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
3176 Box::pin(async move { QuinnSendStream::write(self, src).await })
3177 }
3178
3179 fn write_all<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<()>> {
3180 Box::pin(async move { QuinnSendStream::write_all(self, src).await })
3181 }
3182
3183 fn write_all_timeout<'a>(
3184 &'a self,
3185 src: WritePayload<'a>,
3186 timeout: Duration,
3187 ) -> AsyncBoxFuture<'a, Result<()>> {
3188 Box::pin(async move { QuinnSendStream::write_all_timeout(self, src, timeout).await })
3189 }
3190
3191 fn write_timeout<'a>(
3192 &'a self,
3193 src: &'a [u8],
3194 timeout: Duration,
3195 ) -> AsyncBoxFuture<'a, Result<usize>> {
3196 Box::pin(async move { QuinnSendStream::write_timeout(self, src, timeout).await })
3197 }
3198
3199 fn write_vectored<'a>(&'a self, parts: &'a [IoSlice<'_>]) -> AsyncBoxFuture<'a, Result<usize>> {
3200 Box::pin(async move { QuinnSendStream::write_vectored(self, parts).await })
3201 }
3202
3203 fn write_vectored_timeout<'a>(
3204 &'a self,
3205 parts: &'a [IoSlice<'_>],
3206 timeout: Duration,
3207 ) -> AsyncBoxFuture<'a, Result<usize>> {
3208 Box::pin(async move { QuinnSendStream::write_vectored_timeout(self, parts, timeout).await })
3209 }
3210
3211 fn write_final<'a>(&'a self, src: WritePayload<'a>) -> AsyncBoxFuture<'a, Result<usize>> {
3212 Box::pin(async move { QuinnSendStream::write_final(self, src).await })
3213 }
3214
3215 fn write_final_timeout<'a>(
3216 &'a self,
3217 src: WritePayload<'a>,
3218 timeout: Duration,
3219 ) -> AsyncBoxFuture<'a, Result<usize>> {
3220 Box::pin(async move { QuinnSendStream::write_final_timeout(self, src, timeout).await })
3221 }
3222
3223 fn write_vectored_final<'a>(
3224 &'a self,
3225 parts: &'a [IoSlice<'_>],
3226 ) -> AsyncBoxFuture<'a, Result<usize>> {
3227 Box::pin(async move { QuinnSendStream::write_vectored_final(self, parts).await })
3228 }
3229
3230 fn write_vectored_final_timeout<'a>(
3231 &'a self,
3232 parts: &'a [IoSlice<'_>],
3233 timeout: Duration,
3234 ) -> AsyncBoxFuture<'a, Result<usize>> {
3235 Box::pin(async move {
3236 QuinnSendStream::write_vectored_final_timeout(self, parts, timeout).await
3237 })
3238 }
3239
3240 fn is_write_closed(&self) -> bool {
3241 QuinnSendStream::is_write_closed(self)
3242 }
3243
3244 fn set_write_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3245 QuinnSendStream::set_write_deadline(self, deadline)
3246 }
3247
3248 fn update_metadata(&self, update: zmux::MetadataUpdate) -> AsyncBoxFuture<'_, Result<()>> {
3249 Box::pin(async move { QuinnSendStream::update_metadata(self, update).await })
3250 }
3251
3252 fn close_write(&self) -> AsyncBoxFuture<'_, Result<()>> {
3253 Box::pin(async move { QuinnSendStream::close_write(self).await })
3254 }
3255
3256 fn cancel_write(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3257 Box::pin(async move { QuinnSendStream::cancel_write(self, code).await })
3258 }
3259}
3260
3261impl AsyncStreamHandle for QuinnRecvStream {
3262 fn stream_id(&self) -> u64 {
3263 QuinnRecvStream::stream_id(self)
3264 }
3265
3266 fn is_opened_locally(&self) -> bool {
3267 QuinnRecvStream::is_opened_locally(self)
3268 }
3269
3270 fn is_bidirectional(&self) -> bool {
3271 QuinnRecvStream::is_bidirectional(self)
3272 }
3273
3274 fn open_info_len(&self) -> usize {
3275 QuinnRecvStream::open_info_len(self)
3276 }
3277
3278 fn has_open_info(&self) -> bool {
3279 QuinnRecvStream::has_open_info(self)
3280 }
3281
3282 fn append_open_info_to(&self, dst: &mut Vec<u8>) {
3283 QuinnRecvStream::append_open_info_to(self, dst)
3284 }
3285
3286 fn open_info(&self) -> Vec<u8> {
3287 QuinnRecvStream::open_info(self)
3288 }
3289
3290 fn metadata(&self) -> StreamMetadata {
3291 QuinnRecvStream::metadata(self)
3292 }
3293
3294 fn local_addr(&self) -> Option<SocketAddr> {
3295 QuinnRecvStream::local_addr(self)
3296 }
3297
3298 fn peer_addr(&self) -> Option<SocketAddr> {
3299 QuinnRecvStream::peer_addr(self)
3300 }
3301
3302 fn set_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3303 QuinnRecvStream::set_deadline(self, deadline)
3304 }
3305
3306 fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
3307 Box::pin(async move { QuinnRecvStream::close(self).await })
3308 }
3309
3310 fn close_with_error<'a>(
3311 &'a self,
3312 code: u64,
3313 reason: &'a str,
3314 ) -> AsyncBoxFuture<'a, Result<()>> {
3315 Box::pin(async move { QuinnRecvStream::close_with_error(self, code, reason).await })
3316 }
3317}
3318
3319impl AsyncRecvStreamHandle for QuinnRecvStream {
3320 fn read<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<usize>> {
3321 Box::pin(async move { QuinnRecvStream::read(self, dst).await })
3322 }
3323
3324 fn read_vectored<'a>(
3325 &'a self,
3326 dsts: &'a mut [IoSliceMut<'_>],
3327 ) -> AsyncBoxFuture<'a, Result<usize>> {
3328 Box::pin(async move { QuinnRecvStream::read_vectored(self, dsts).await })
3329 }
3330
3331 fn read_timeout<'a>(
3332 &'a self,
3333 dst: &'a mut [u8],
3334 timeout: Duration,
3335 ) -> AsyncBoxFuture<'a, Result<usize>> {
3336 Box::pin(async move { QuinnRecvStream::read_timeout(self, dst, timeout).await })
3337 }
3338
3339 fn read_vectored_timeout<'a>(
3340 &'a self,
3341 dsts: &'a mut [IoSliceMut<'_>],
3342 timeout: Duration,
3343 ) -> AsyncBoxFuture<'a, Result<usize>> {
3344 Box::pin(async move { QuinnRecvStream::read_vectored_timeout(self, dsts, timeout).await })
3345 }
3346
3347 fn read_exact<'a>(&'a self, dst: &'a mut [u8]) -> AsyncBoxFuture<'a, Result<()>> {
3348 Box::pin(async move { QuinnRecvStream::read_exact(self, dst).await })
3349 }
3350
3351 fn read_exact_timeout<'a>(
3352 &'a self,
3353 dst: &'a mut [u8],
3354 timeout: Duration,
3355 ) -> AsyncBoxFuture<'a, Result<()>> {
3356 Box::pin(async move { QuinnRecvStream::read_exact_timeout(self, dst, timeout).await })
3357 }
3358
3359 fn is_read_closed(&self) -> bool {
3360 QuinnRecvStream::is_read_closed(self)
3361 }
3362
3363 fn set_read_deadline(&self, deadline: Option<Instant>) -> Result<()> {
3364 QuinnRecvStream::set_read_deadline(self, deadline)
3365 }
3366
3367 fn close_read(&self) -> AsyncBoxFuture<'_, Result<()>> {
3368 Box::pin(async move { QuinnRecvStream::close_read(self).await })
3369 }
3370
3371 fn cancel_read(&self, code: u64) -> AsyncBoxFuture<'_, Result<()>> {
3372 Box::pin(async move { QuinnRecvStream::cancel_read(self, code).await })
3373 }
3374}
3375
3376impl AsyncSession for QuinnSession {
3377 type Stream = QuinnStream;
3378 type SendStream = QuinnSendStream;
3379 type RecvStream = QuinnRecvStream;
3380
3381 fn accept_stream(&self) -> AsyncBoxFuture<'_, Result<Self::Stream>> {
3382 Box::pin(async move { QuinnSession::accept_stream(self).await })
3383 }
3384
3385 fn accept_stream_timeout(&self, timeout: Duration) -> AsyncBoxFuture<'_, Result<Self::Stream>> {
3386 Box::pin(async move { QuinnSession::accept_stream_timeout(self, timeout).await })
3387 }
3388
3389 fn accept_uni_stream(&self) -> AsyncBoxFuture<'_, Result<Self::RecvStream>> {
3390 Box::pin(async move { QuinnSession::accept_uni_stream(self).await })
3391 }
3392
3393 fn accept_uni_stream_timeout(
3394 &self,
3395 timeout: Duration,
3396 ) -> AsyncBoxFuture<'_, Result<Self::RecvStream>> {
3397 Box::pin(async move { QuinnSession::accept_uni_stream_timeout(self, timeout).await })
3398 }
3399
3400 fn open_stream_with(&self, request: OpenRequest) -> AsyncBoxFuture<'_, Result<Self::Stream>> {
3401 Box::pin(async move { QuinnSession::open_stream_with(self, request).await })
3402 }
3403
3404 fn open_uni_stream_with(
3405 &self,
3406 request: OpenRequest,
3407 ) -> AsyncBoxFuture<'_, Result<Self::SendStream>> {
3408 Box::pin(async move { QuinnSession::open_uni_stream_with(self, request).await })
3409 }
3410
3411 fn open_and_send<'a>(
3412 &'a self,
3413 request: OpenSend<'a>,
3414 ) -> AsyncBoxFuture<'a, Result<Self::Stream>> {
3415 Box::pin(async move { QuinnSession::open_and_send(self, request).await })
3416 }
3417
3418 fn open_uni_and_send<'a>(
3419 &'a self,
3420 request: OpenSend<'a>,
3421 ) -> AsyncBoxFuture<'a, Result<Self::SendStream>> {
3422 Box::pin(async move { QuinnSession::open_uni_and_send(self, request).await })
3423 }
3424
3425 fn ping<'a>(&'a self, _echo: &'a [u8]) -> AsyncBoxFuture<'a, Result<Duration>> {
3426 Box::pin(async move {
3427 Err(adapter_session_control_unavailable(
3428 zmux::ErrorOperation::Ping,
3429 "zmux: feature not supported by adapter: ping",
3430 ))
3431 })
3432 }
3433
3434 fn ping_timeout<'a>(
3435 &'a self,
3436 _echo: &'a [u8],
3437 _timeout: Duration,
3438 ) -> AsyncBoxFuture<'a, Result<Duration>> {
3439 Box::pin(async move {
3440 Err(adapter_session_control_unavailable(
3441 zmux::ErrorOperation::Ping,
3442 "zmux: feature not supported by adapter: ping_timeout",
3443 ))
3444 })
3445 }
3446
3447 fn go_away(
3448 &self,
3449 _last_accepted_bidi: u64,
3450 _last_accepted_uni: u64,
3451 ) -> AsyncBoxFuture<'_, Result<()>> {
3452 Box::pin(async move {
3453 Err(adapter_session_control_unavailable(
3454 zmux::ErrorOperation::Close,
3455 "zmux: feature not supported by adapter: go_away",
3456 ))
3457 })
3458 }
3459
3460 fn go_away_with_error<'a>(
3461 &'a self,
3462 _last_accepted_bidi: u64,
3463 _last_accepted_uni: u64,
3464 _code: u64,
3465 _reason: &'a str,
3466 ) -> AsyncBoxFuture<'a, Result<()>> {
3467 Box::pin(async move {
3468 Err(adapter_session_control_unavailable(
3469 zmux::ErrorOperation::Close,
3470 "zmux: feature not supported by adapter: go_away_with_error",
3471 ))
3472 })
3473 }
3474
3475 fn close(&self) -> AsyncBoxFuture<'_, Result<()>> {
3476 Box::pin(async move { QuinnSession::close(self).await })
3477 }
3478
3479 fn close_with_error<'a>(
3480 &'a self,
3481 code: u64,
3482 reason: &'a str,
3483 ) -> AsyncBoxFuture<'a, Result<()>> {
3484 Box::pin(async move { QuinnSession::close_with_error(self, code, reason).await })
3485 }
3486
3487 fn wait(&self) -> AsyncBoxFuture<'_, Result<()>> {
3488 Box::pin(async move { QuinnSession::wait(self).await })
3489 }
3490
3491 fn wait_timeout(&self, timeout: Duration) -> AsyncBoxFuture<'_, Result<bool>> {
3492 Box::pin(async move { QuinnSession::wait_timeout(self, timeout).await })
3493 }
3494
3495 fn is_closed(&self) -> bool {
3496 QuinnSession::is_closed(self)
3497 }
3498
3499 fn local_addr(&self) -> Option<SocketAddr> {
3500 QuinnSession::local_addr(self)
3501 }
3502
3503 fn peer_addr(&self) -> Option<SocketAddr> {
3504 QuinnSession::peer_addr(self)
3505 }
3506
3507 fn close_error(&self) -> Option<zmux::Error> {
3508 QuinnSession::close_error(self)
3509 }
3510
3511 fn state(&self) -> zmux::SessionState {
3512 QuinnSession::state(self)
3513 }
3514
3515 fn stats(&self) -> zmux::SessionStats {
3516 QuinnSession::stats(self)
3517 }
3518
3519 fn peer_go_away_error(&self) -> Option<zmux::PeerGoAwayError> {
3520 None
3521 }
3522
3523 fn peer_close_error(&self) -> Option<zmux::PeerCloseError> {
3524 None
3525 }
3526
3527 fn local_preface(&self) -> zmux::Preface {
3528 adapter_empty_preface()
3529 }
3530
3531 fn peer_preface(&self) -> zmux::Preface {
3532 adapter_empty_preface()
3533 }
3534
3535 fn negotiated(&self) -> zmux::Negotiated {
3536 adapter_empty_negotiated()
3537 }
3538}
3539
3540async fn ensure_open_prelude(
3541 state: &Mutex<PreludeState>,
3542 send: &mut quinn::SendStream,
3543 stats: &AdapterStats,
3544) -> Result<()> {
3545 loop {
3546 let chunk = {
3547 let state = state.lock().unwrap();
3548 if !state.send_prelude || state.prelude_sent {
3549 return Ok(());
3550 }
3551 state.prelude.slice(state.prelude_offset..)
3552 };
3553 if chunk.is_empty() {
3554 let mut state = state.lock().unwrap();
3555 state.prelude_sent = true;
3556 state.prelude = Bytes::new();
3557 drop(state);
3558 return Ok(());
3559 }
3560 let started_at = Instant::now();
3561 let n = send.write(&chunk).await.map_err(translate_write_error)?;
3562 let completed_at = Instant::now();
3563 if n == 0 {
3564 return Err(zmux::Error::local("zmux-quinn: zero-length prelude write"));
3565 }
3566 if n > chunk.len() {
3567 return Err(zmux::Error::local(
3568 "zmux-quinn: prelude write reported invalid progress",
3569 ));
3570 }
3571 stats.note_write_wait(started_at, completed_at);
3572 stats.note_flush(n, completed_at);
3573 stats.note_control_progress_at(completed_at);
3574 let mut state = state.lock().unwrap();
3575 state.prelude_offset = state.prelude_offset.saturating_add(n);
3576 if state.prelude_offset >= state.prelude.len() {
3577 state.prelude_sent = true;
3578 state.prelude = Bytes::new();
3579 state.prelude_offset = 0;
3580 drop(state);
3581 return Ok(());
3582 }
3583 }
3584}
3585
3586async fn write_all_final(
3587 state: &Mutex<PreludeState>,
3588 send: &mut quinn::SendStream,
3589 src: &[u8],
3590 stats: &AdapterStats,
3591) -> Result<usize> {
3592 ensure_open_prelude(state, send, stats).await?;
3593 if !src.is_empty() {
3594 write_payload_all(send, src, stats).await?;
3595 }
3596 finish_send(send, stats).await?;
3597 Ok(src.len())
3598}
3599
3600async fn write_io_slices_final(
3601 state: &Mutex<PreludeState>,
3602 send: &mut quinn::SendStream,
3603 parts: &[IoSlice<'_>],
3604 stats: &AdapterStats,
3605) -> Result<usize> {
3606 let total = total_bytes(parts.iter().map(|part| part.len()))?;
3607 ensure_open_prelude(state, send, stats).await?;
3608 if total == 0 {
3609 finish_send(send, stats).await?;
3610 return Ok(0);
3611 }
3612 if let Some(single) = single_non_empty_io_slice(parts) {
3613 write_payload_all(send, single, stats).await?;
3614 finish_send(send, stats).await?;
3615 return Ok(total);
3616 }
3617 if total <= QUINN_WRITE_VECTORED_COALESCE_MAX_BYTES {
3618 let mut coalesced = Vec::with_capacity(total);
3619 for part in parts {
3620 coalesced.extend_from_slice(part.as_ref());
3621 }
3622 write_payload_all(send, &coalesced, stats).await?;
3623 } else {
3624 for part in parts {
3625 if !part.is_empty() {
3626 write_payload_all(send, part.as_ref(), stats).await?;
3627 }
3628 }
3629 }
3630 finish_send(send, stats).await?;
3631 Ok(total)
3632}
3633
3634async fn write_io_slices_once(
3635 send: &mut quinn::SendStream,
3636 parts: &[IoSlice<'_>],
3637 total: usize,
3638 stats: &AdapterStats,
3639) -> Result<usize> {
3640 if total == 0 {
3641 return Ok(0);
3642 }
3643 if let Some(single) = single_non_empty_io_slice(parts) {
3644 return write_payload_once(send, single, stats).await;
3645 }
3646
3647 let prefix_len = total.min(QUINN_WRITE_VECTORED_COALESCE_MAX_BYTES);
3648 let mut coalesced = Vec::with_capacity(prefix_len);
3649 for part in parts {
3650 if coalesced.len() == prefix_len {
3651 break;
3652 }
3653 let remaining = prefix_len - coalesced.len();
3654 let bytes = part.as_ref();
3655 let take = bytes.len().min(remaining);
3656 coalesced.extend_from_slice(&bytes[..take]);
3657 }
3658 write_payload_once(send, &coalesced, stats).await
3659}
3660
3661fn single_non_empty_io_slice<'a>(parts: &'a [IoSlice<'_>]) -> Option<&'a [u8]> {
3662 let mut single = None;
3663 for part in parts {
3664 if part.is_empty() {
3665 continue;
3666 }
3667 if single.is_some() {
3668 return None;
3669 }
3670 single = Some(part.as_ref());
3671 }
3672 single
3673}
3674
3675async fn write_payload_once(
3676 send: &mut quinn::SendStream,
3677 src: &[u8],
3678 stats: &AdapterStats,
3679) -> Result<usize> {
3680 let started_at = Instant::now();
3681 let n = send.write(src).await.map_err(translate_write_error)?;
3682 let completed_at = Instant::now();
3683 if n == 0 {
3684 return Err(zmux::Error::local("zmux-quinn: zero-length stream write"));
3685 }
3686 if n > src.len() {
3687 return Err(
3688 zmux::Error::local("zmux-quinn: stream write reported invalid progress")
3689 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3690 );
3691 }
3692 stats.note_write_wait(started_at, completed_at);
3693 stats.note_flush(n, completed_at);
3694 stats.note_data_write(n, completed_at);
3695 Ok(n)
3696}
3697
3698async fn write_payload_all(
3699 send: &mut quinn::SendStream,
3700 src: &[u8],
3701 stats: &AdapterStats,
3702) -> Result<()> {
3703 let started_at = Instant::now();
3704 send.write_all(src).await.map_err(translate_write_error)?;
3705 let completed_at = Instant::now();
3706 stats.note_write_wait(started_at, completed_at);
3707 stats.note_flush(src.len(), completed_at);
3708 stats.note_data_write(src.len(), completed_at);
3709 Ok(())
3710}
3711
3712async fn finish_send(send: &mut quinn::SendStream, stats: &AdapterStats) -> Result<()> {
3713 send.finish().map_err(translate_write_closed_stream)?;
3714 stats.note_control_progress();
3715 Ok(())
3716}
3717
3718fn total_bytes(lengths: impl IntoIterator<Item = usize>) -> Result<usize> {
3719 lengths.into_iter().try_fold(0usize, |total, len| {
3720 total.checked_add(len).ok_or_else(|| {
3721 zmux::Error::local("zmux-quinn: vectored payload length exceeds usize")
3722 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3723 })
3724 })
3725}
3726
3727fn validate_progress(n: usize, requested: usize) -> Result<()> {
3728 if n > requested {
3729 Err(
3730 zmux::Error::local("zmux-quinn: write reported invalid progress")
3731 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3732 )
3733 } else {
3734 Ok(())
3735 }
3736}
3737
3738fn checked_prelude_metadata_len(metadata_len: u64, prefix_len: usize) -> Result<usize> {
3739 let prefix_len = u64::try_from(prefix_len)
3740 .map_err(|_| protocol_prelude_error("stream prelude prefix length exceeds u64"))?;
3741 if metadata_len
3742 .checked_add(prefix_len)
3743 .is_none_or(|len| len > STREAM_PRELUDE_MAX_PAYLOAD)
3744 {
3745 return Err(protocol_prelude_error(
3746 "stream prelude exceeds adapter payload cap",
3747 ));
3748 }
3749 usize::try_from(metadata_len)
3750 .map_err(|_| protocol_prelude_error("stream prelude length exceeds usize"))
3751}
3752
3753fn usize_to_u64_saturating(value: usize) -> u64 {
3754 u64::try_from(value).unwrap_or(u64::MAX)
3755}
3756
3757fn duration_nanos_saturating(duration: Duration) -> u64 {
3758 u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX - 1)
3759}
3760
3761fn note_reason(counts: &mut HashMap<u64, u64>, overflow: &mut u64, code: u64) {
3762 if let Some(count) = counts.get_mut(&code) {
3763 *count = count.saturating_add(1);
3764 } else if counts.len() < MAX_REASON_STATS_CODES {
3765 counts.insert(code, 1);
3766 } else {
3767 *overflow = overflow.saturating_add(1);
3768 }
3769}
3770
3771fn saturating_add_atomic_u64(counter: &AtomicU64, delta: u64) {
3772 if delta == 0 {
3773 return;
3774 }
3775 let mut current = counter.load(Ordering::Relaxed);
3776 loop {
3777 let next = current.saturating_add(delta);
3778 match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
3779 Ok(_) => return,
3780 Err(observed) => current = observed,
3781 }
3782 }
3783}
3784
3785fn saturating_increment_atomic_usize(counter: &AtomicUsize) {
3786 let mut current = counter.load(Ordering::Relaxed);
3787 loop {
3788 if current == usize::MAX {
3789 return;
3790 }
3791 match counter.compare_exchange_weak(
3792 current,
3793 current + 1,
3794 Ordering::Relaxed,
3795 Ordering::Relaxed,
3796 ) {
3797 Ok(_) => return,
3798 Err(observed) => current = observed,
3799 }
3800 }
3801}
3802
3803fn quinn_stream_id(id: quinn::StreamId) -> u64 {
3804 quinn::VarInt::from(id).into_inner()
3805}
3806
3807fn quinn_varint(value: u64) -> quinn::VarInt {
3808 quinn::VarInt::from_u64(value).unwrap_or(quinn::VarInt::MAX)
3809}
3810
3811fn open_error_cleanup_code(err: &zmux::Error) -> quinn::VarInt {
3812 quinn_varint(
3813 err.numeric_code()
3814 .unwrap_or(zmux::ErrorCode::Cancelled.as_u64()),
3815 )
3816}
3817
3818fn checked_quinn_varint(
3819 value: u64,
3820 operation: zmux::ErrorOperation,
3821 direction: zmux::ErrorDirection,
3822) -> Result<quinn::VarInt> {
3823 quinn::VarInt::from_u64(value).map_err(|_| {
3824 zmux::Error::local("zmux-quinn: QUIC application error code exceeds varint62")
3825 .with_stream_context(operation, direction)
3826 })
3827}
3828
3829fn checked_session_quinn_varint(
3830 value: u64,
3831 operation: zmux::ErrorOperation,
3832) -> Result<quinn::VarInt> {
3833 quinn::VarInt::from_u64(value).map_err(|_| {
3834 zmux::Error::local("zmux-quinn: QUIC application error code exceeds varint62")
3835 .with_session_context(operation)
3836 })
3837}
3838
3839fn protocol_prelude_error(reason: &str) -> zmux::Error {
3840 zmux::Error::application(
3841 zmux::ErrorCode::Protocol.as_u64(),
3842 format!("zmux-quinn: {reason}"),
3843 )
3844 .with_source(zmux::ErrorSource::Remote)
3845 .with_stream_context(zmux::ErrorOperation::Accept, zmux::ErrorDirection::Both)
3846 .with_termination_kind(zmux::TerminationKind::Abort)
3847}
3848
3849fn local_stream_application_error(
3850 code: u64,
3851 reason: &str,
3852 operation: zmux::ErrorOperation,
3853 direction: zmux::ErrorDirection,
3854 termination_kind: zmux::TerminationKind,
3855) -> zmux::Error {
3856 zmux::Error::application(code, reason)
3857 .with_source(zmux::ErrorSource::Local)
3858 .with_stream_context(operation, direction)
3859 .with_termination_kind(termination_kind)
3860}
3861
3862fn priority_update_unavailable() -> zmux::Error {
3863 zmux::Error::local(
3864 "zmux: feature not supported by adapter: metadata update requires negotiated priority_update",
3865 )
3866 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3867}
3868
3869fn adapter_session_control_unavailable(
3870 operation: zmux::ErrorOperation,
3871 message: &'static str,
3872) -> zmux::Error {
3873 zmux::Error::local(message).with_session_context(operation)
3874}
3875
3876fn adapter_empty_preface() -> zmux::Preface {
3877 zmux::Preface {
3878 preface_version: 0,
3879 role: zmux::Role::Initiator,
3880 tie_breaker_nonce: 0,
3881 min_proto: 0,
3882 max_proto: 0,
3883 capabilities: 0,
3884 settings: zmux::default_settings(),
3885 }
3886}
3887
3888fn adapter_empty_negotiated() -> zmux::Negotiated {
3889 zmux::Negotiated {
3890 proto: 0,
3891 capabilities: 0,
3892 local_role: zmux::Role::Initiator,
3893 peer_role: zmux::Role::Initiator,
3894 peer_settings: zmux::default_settings(),
3895 }
3896}
3897
3898fn local_read_closed_error() -> zmux::Error {
3899 zmux::Error::read_closed()
3900 .with_source(zmux::ErrorSource::Local)
3901 .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3902}
3903
3904fn unexpected_eof_error() -> zmux::Error {
3905 zmux::Error::io(std::io::Error::new(
3906 ErrorKind::UnexpectedEof,
3907 "failed to fill whole buffer",
3908 ))
3909 .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3910}
3911
3912fn local_write_closed_error() -> zmux::Error {
3913 zmux::Error::write_closed()
3914 .with_source(zmux::ErrorSource::Local)
3915 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3916}
3917
3918fn translate_read_closed_stream(_: quinn::ClosedStream) -> zmux::Error {
3919 zmux::Error::read_closed()
3920 .with_source(zmux::ErrorSource::Local)
3921 .with_stream_context(zmux::ErrorOperation::Close, zmux::ErrorDirection::Read)
3922}
3923
3924fn translate_write_closed_stream(_: quinn::ClosedStream) -> zmux::Error {
3925 zmux::Error::write_closed()
3926 .with_source(zmux::ErrorSource::Local)
3927 .with_stream_context(zmux::ErrorOperation::Close, zmux::ErrorDirection::Write)
3928}
3929
3930fn translate_read_error(err: quinn::ReadError) -> zmux::Error {
3931 match err {
3932 quinn::ReadError::Reset(code) => zmux::Error::application(code.into_inner(), "")
3933 .with_source(zmux::ErrorSource::Remote)
3934 .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3935 .with_termination_kind(zmux::TerminationKind::Reset),
3936 quinn::ReadError::ConnectionLost(err) => translate_connection_error(err),
3937 quinn::ReadError::ClosedStream => zmux::Error::read_closed()
3938 .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read),
3939 quinn::ReadError::IllegalOrderedRead => {
3940 zmux::Error::local("zmux-quinn: illegal ordered read")
3941 .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read)
3942 }
3943 quinn::ReadError::ZeroRttRejected => zmux::Error::local("zmux-quinn: 0-RTT rejected")
3944 .with_stream_context(zmux::ErrorOperation::Read, zmux::ErrorDirection::Read),
3945 }
3946}
3947
3948fn translate_write_error(err: quinn::WriteError) -> zmux::Error {
3949 match err {
3950 quinn::WriteError::Stopped(code) => zmux::Error::application(code.into_inner(), "")
3951 .with_source(zmux::ErrorSource::Remote)
3952 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write)
3953 .with_termination_kind(zmux::TerminationKind::Stopped),
3954 quinn::WriteError::ConnectionLost(err) => translate_connection_error(err),
3955 quinn::WriteError::ClosedStream => zmux::Error::write_closed()
3956 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3957 quinn::WriteError::ZeroRttRejected => zmux::Error::local("zmux-quinn: 0-RTT rejected")
3958 .with_stream_context(zmux::ErrorOperation::Write, zmux::ErrorDirection::Write),
3959 }
3960}
3961
3962fn translate_connection_error(err: quinn::ConnectionError) -> zmux::Error {
3963 match err {
3964 quinn::ConnectionError::ApplicationClosed(close) => {
3965 let reason = String::from_utf8_lossy(&close.reason).into_owned();
3966 zmux::Error::application(close.error_code.into_inner(), reason)
3967 .with_source(zmux::ErrorSource::Remote)
3968 .with_session_context(zmux::ErrorOperation::Close)
3969 .with_termination_kind(zmux::TerminationKind::SessionTermination)
3970 }
3971 quinn::ConnectionError::LocallyClosed => zmux::Error::session_closed()
3972 .with_source(zmux::ErrorSource::Local)
3973 .with_session_context(zmux::ErrorOperation::Close),
3974 quinn::ConnectionError::TimedOut => zmux::Error::new(
3975 zmux::ErrorCode::IdleTimeout,
3976 "zmux-quinn: QUIC connection idle timeout",
3977 )
3978 .with_source(zmux::ErrorSource::Transport)
3979 .with_session_context(zmux::ErrorOperation::Close)
3980 .with_termination_kind(zmux::TerminationKind::Timeout),
3981 other => zmux::Error::local(format!("zmux-quinn: {other}"))
3982 .with_source(zmux::ErrorSource::Transport)
3983 .with_session_context(zmux::ErrorOperation::Close),
3984 }
3985}
3986
3987fn translate_wait_error(err: quinn::ConnectionError) -> Result<()> {
3988 match err {
3989 quinn::ConnectionError::LocallyClosed => Ok(()),
3990 quinn::ConnectionError::ApplicationClosed(close)
3991 if close.error_code.into_inner() == 0 && close.reason.is_empty() =>
3992 {
3993 Ok(())
3994 }
3995 other => Err(translate_connection_error(other)),
3996 }
3997}
3998
3999#[cfg(test)]
4000mod tests {
4001 use super::*;
4002
4003 #[test]
4004 fn adapter_publishes_stream_adapter_claim_only() {
4005 assert_eq!(target_claims(), &[zmux::Claim::StreamAdapterProfileV1]);
4006 assert!(target_implementation_profiles().is_empty());
4007 assert_eq!(
4008 target_suites(),
4009 &[zmux::ConformanceSuite::StreamAdapterProfile]
4010 );
4011 }
4012
4013 #[test]
4014 fn prelude_round_trip_open_info_and_priority() {
4015 let opts = OpenOptions::new()
4016 .priority(7)
4017 .group(11)
4018 .open_info(&[1, 2, 3, 4]);
4019 let prelude = build_stream_prelude(&opts).unwrap();
4020 let parsed = read_stream_prelude(&mut prelude.as_slice()).unwrap();
4021 assert!(parsed.metadata_valid);
4022 assert!(parsed.is_metadata_valid());
4023 assert!(parsed.has_open_info());
4024 assert_eq!(parsed.open_info(), [1, 2, 3, 4]);
4025 assert_eq!(parsed.metadata.priority, Some(7));
4026 assert_eq!(parsed.metadata().priority, Some(7));
4027 assert_eq!(parsed.metadata.group, Some(11));
4028 assert_eq!(parsed.metadata.open_info, [1, 2, 3, 4]);
4029 }
4030
4031 #[test]
4032 fn empty_prelude_is_one_zero_byte() {
4033 let prelude = build_stream_prelude(&OpenOptions::default()).unwrap();
4034 assert_eq!(prelude, vec![0]);
4035 let parsed = read_stream_prelude(&mut prelude.as_slice()).unwrap();
4036 assert!(parsed.metadata_valid);
4037 assert_eq!(parsed.metadata, StreamMetadata::default());
4038 }
4039
4040 #[test]
4041 fn duplicate_singleton_metadata_is_ignored() {
4042 let mut prelude = Vec::new();
4043 let metadata = [
4044 zmux::METADATA_STREAM_PRIORITY as u8,
4045 1,
4046 5,
4047 zmux::METADATA_STREAM_PRIORITY as u8,
4048 1,
4049 6,
4050 ];
4051 zmux::append_varint(&mut prelude, metadata.len() as u64).unwrap();
4052 prelude.extend_from_slice(&metadata);
4053
4054 let parsed = read_stream_prelude(&mut prelude.as_slice()).unwrap();
4055 assert!(!parsed.metadata_valid);
4056 assert_eq!(parsed.metadata, StreamMetadata::default());
4057 }
4058
4059 #[test]
4060 fn malformed_metadata_is_rejected() {
4061 let mut prelude = Vec::new();
4062 let metadata = [zmux::METADATA_STREAM_PRIORITY as u8, 2, 5];
4063 zmux::append_varint(&mut prelude, metadata.len() as u64).unwrap();
4064 prelude.extend_from_slice(&metadata);
4065
4066 let err = read_stream_prelude(&mut prelude.as_slice()).unwrap_err();
4067 assert!(err.is_error_code(zmux::ErrorCode::Protocol));
4068 assert_eq!(err.scope(), zmux::ErrorScope::Stream);
4069 assert_eq!(err.source(), zmux::ErrorSource::Remote);
4070 assert_eq!(err.direction(), zmux::ErrorDirection::Both);
4071 }
4072
4073 #[test]
4074 fn session_option_defaults_match_go_adapter_bounds() {
4075 let previous_default = SessionOptions::default_accepted_prelude_max_concurrent();
4076 SessionOptions::set_default_accepted_prelude_max_concurrent(0);
4077 assert_eq!(
4078 normalize_accepted_prelude_max_concurrent(None),
4079 DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT
4080 );
4081 assert_eq!(
4082 SessionOptions::default_accepted_prelude_max_concurrent(),
4083 DEFAULT_ACCEPTED_PRELUDE_MAX_CONCURRENT
4084 );
4085 SessionOptions::set_default_accepted_prelude_max_concurrent(5);
4086 assert_eq!(SessionOptions::default_accepted_prelude_max_concurrent(), 5);
4087 assert_eq!(normalize_accepted_prelude_max_concurrent(None), 5);
4088 assert_eq!(normalize_accepted_prelude_max_concurrent(Some(0)), 5);
4089 SessionOptions::set_default_accepted_prelude_max_concurrent(
4090 MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT + 1,
4091 );
4092 assert_eq!(
4093 SessionOptions::default_accepted_prelude_max_concurrent(),
4094 MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT
4095 );
4096 SessionOptions::set_default_accepted_prelude_max_concurrent(previous_default);
4097 assert_eq!(
4098 normalize_accepted_prelude_read_timeout(SessionOptions::default()),
4099 Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT)
4100 );
4101 let custom = SessionOptions::new()
4102 .accepted_prelude_read_timeout(Duration::from_secs(2))
4103 .accepted_prelude_max_concurrent(16);
4104 assert_eq!(
4105 normalize_accepted_prelude_read_timeout(custom),
4106 Some(Duration::from_secs(2))
4107 );
4108 assert_eq!(
4109 normalize_accepted_prelude_max_concurrent(custom.accepted_prelude_max_concurrent),
4110 16
4111 );
4112 assert_eq!(
4113 normalize_accepted_prelude_read_timeout(
4114 SessionOptions::new()
4115 .accepted_prelude_read_timeout(Duration::from_secs(2))
4116 .disable_accepted_prelude_read_timeout()
4117 ),
4118 None
4119 );
4120 assert_eq!(
4121 normalize_accepted_prelude_read_timeout(
4122 SessionOptions::new()
4123 .disable_accepted_prelude_read_timeout()
4124 .accepted_prelude_read_timeout(Duration::from_secs(2))
4125 ),
4126 Some(Duration::from_secs(2))
4127 );
4128 assert_eq!(
4129 normalize_accepted_prelude_read_timeout(SessionOptions {
4130 accepted_prelude_read_timeout: AcceptedPreludeReadTimeout::Timeout(Duration::ZERO),
4131 ..SessionOptions::default()
4132 }),
4133 Some(DEFAULT_ACCEPTED_PRELUDE_READ_TIMEOUT)
4134 );
4135 assert_eq!(
4136 normalize_accepted_prelude_read_timeout(SessionOptions {
4137 accepted_prelude_read_timeout: AcceptedPreludeReadTimeout::Disabled,
4138 ..SessionOptions::default()
4139 }),
4140 None
4141 );
4142 assert_eq!(normalize_accepted_prelude_max_concurrent(Some(3)), 3);
4143 assert_eq!(
4144 normalize_accepted_prelude_max_concurrent(Some(
4145 MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT + 1
4146 )),
4147 MAX_ACCEPTED_PRELUDE_MAX_CONCURRENT
4148 );
4149 }
4150
4151 #[test]
4152 fn total_bytes_counts_empty_vectored_parts() {
4153 let parts = [IoSlice::new(b"abc"), IoSlice::new(b""), IoSlice::new(b"de")];
4154 assert_eq!(total_bytes(parts.iter().map(|part| part.len())).unwrap(), 5);
4155 }
4156
4157 #[test]
4158 fn total_bytes_rejects_usize_overflow() {
4159 let err = total_bytes([usize::MAX, 1]).unwrap_err();
4160 assert_eq!(err.scope(), zmux::ErrorScope::Stream);
4161 assert_eq!(err.source(), zmux::ErrorSource::Local);
4162 assert_eq!(err.operation(), zmux::ErrorOperation::Write);
4163 assert_eq!(err.direction(), zmux::ErrorDirection::Write);
4164 }
4165
4166 #[test]
4167 fn support_saturating_helpers_clamp_at_integer_limits() {
4168 let counter = AtomicU64::new(u64::MAX);
4169 saturating_add_atomic_u64(&counter, 1);
4170 assert_eq!(counter.load(Ordering::Relaxed), u64::MAX);
4171
4172 let counter = AtomicU64::new(u64::MAX - 1);
4173 saturating_add_atomic_u64(&counter, 2);
4174 assert_eq!(counter.load(Ordering::Relaxed), u64::MAX);
4175
4176 let counter = AtomicUsize::new(usize::MAX);
4177 saturating_increment_atomic_usize(&counter);
4178 assert_eq!(counter.load(Ordering::Relaxed), usize::MAX);
4179
4180 assert_eq!(
4181 duration_nanos_saturating(Duration::from_secs(u64::MAX)),
4182 u64::MAX - 1
4183 );
4184 }
4185
4186 #[test]
4187 fn adapter_reason_stats_bound_distinct_codes_and_count_overflow() {
4188 let mut reasons = AdapterReasonStats::default();
4189 let overflow = 5;
4190 for i in 0..MAX_REASON_STATS_CODES + overflow {
4191 let code = u64::try_from(i).unwrap();
4192 reasons.note_reset(10_000 + code);
4193 reasons.note_abort(20_000 + code);
4194 }
4195 reasons.note_reset(10_000);
4196 reasons.note_abort(20_000);
4197
4198 let snapshot = reasons.snapshot();
4199 assert_eq!(snapshot.reset.len(), MAX_REASON_STATS_CODES);
4200 assert_eq!(snapshot.abort.len(), MAX_REASON_STATS_CODES);
4201 assert_eq!(snapshot.reset_overflow, u64::try_from(overflow).unwrap());
4202 assert_eq!(snapshot.abort_overflow, u64::try_from(overflow).unwrap());
4203 assert_eq!(snapshot.reset.get(&10_000), Some(&2));
4204 assert_eq!(snapshot.abort.get(&20_000), Some(&2));
4205 }
4206
4207 #[test]
4208 fn single_non_empty_io_slice_skips_empty_parts() {
4209 let parts = [IoSlice::new(b""), IoSlice::new(b"abc"), IoSlice::new(b"")];
4210 assert_eq!(single_non_empty_io_slice(&parts), Some(&b"abc"[..]));
4211
4212 let parts = [IoSlice::new(b"abc"), IoSlice::new(b"de")];
4213 assert_eq!(single_non_empty_io_slice(&parts), None);
4214
4215 let parts = [IoSlice::new(b""), IoSlice::new(b"")];
4216 assert_eq!(single_non_empty_io_slice(&parts), None);
4217 }
4218
4219 #[test]
4220 fn prelude_metadata_len_rejects_cap_overflow() {
4221 let err = checked_prelude_metadata_len(STREAM_PRELUDE_MAX_PAYLOAD, 1).unwrap_err();
4222 assert!(err.is_error_code(zmux::ErrorCode::Protocol));
4223 assert_eq!(
4224 checked_prelude_metadata_len(STREAM_PRELUDE_MAX_PAYLOAD - 1, 1).unwrap(),
4225 usize::try_from(STREAM_PRELUDE_MAX_PAYLOAD - 1).unwrap()
4226 );
4227 }
4228
4229 #[test]
4230 fn active_guard_finishes_once_before_drop() {
4231 let counters = Arc::new(ActiveCounters::default());
4232 let guard = ActiveGuard::new(counters.clone(), ActiveKind::LocalBidi);
4233 assert_eq!(counters.snapshot().total, 1);
4234 guard.finish();
4235 assert_eq!(counters.snapshot().total, 0);
4236 drop(guard);
4237 assert_eq!(counters.snapshot().total, 0);
4238 }
4239
4240 #[test]
4241 fn adapter_stats_snapshot_reports_progress_without_ping_fields() {
4242 let stats = AdapterStats::new();
4243 let now = Instant::now();
4244 stats.note_accepted_stream();
4245 stats.note_hidden_refused();
4246 stats.note_reset_reason(7);
4247 stats.note_abort_reason(9);
4248 stats.note_data_write(7, now);
4249 stats.note_data_read(3, now);
4250 stats.note_flush(7, now);
4251 stats.note_control_progress_at(now);
4252 stats.note_open_latency(now, now + Duration::from_millis(2));
4253 let snapshot = stats.snapshot();
4254 assert_eq!(snapshot.accepted_streams, 1);
4255 assert_eq!(snapshot.hidden_refused, 1);
4256 assert_eq!(snapshot.reasons.reset.get(&7), Some(&1));
4257 assert_eq!(snapshot.reasons.abort.get(&9), Some(&1));
4258 assert_eq!(snapshot.sent_data_bytes, 7);
4259 assert_eq!(snapshot.received_data_bytes, 3);
4260 assert_eq!(snapshot.flush.count, 1);
4261 assert_eq!(snapshot.flush.last_bytes, 7);
4262 assert_eq!(
4263 snapshot.telemetry.last_open_latency,
4264 Some(Duration::from_millis(2))
4265 );
4266 assert!(snapshot.progress.control_progress_at.is_some());
4267 assert!(snapshot.progress.transport_write_at.is_some());
4268 assert!(snapshot.progress.stream_progress_at.is_some());
4269 assert!(snapshot.progress.application_progress_at.is_some());
4270 assert!(snapshot.progress.inbound_frame_at.is_some());
4271 assert!(snapshot.progress.ping_sent_at.is_none());
4272 assert!(snapshot.progress.pong_at.is_none());
4273 }
4274}