Skip to main content

moq_net/model/
frame.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::task::{Poll, ready};
4
5use bytes::buf::UninitSlice;
6use bytes::{BufMut, Bytes};
7
8use crate::{Error, Result};
9
/// Maximum payload size accepted for a single frame on the wire.
///
/// The receive path preallocates a buffer from the declared frame size, so an
/// untrusted peer could otherwise request a multi-gigabyte allocation with a
/// single varint. Subscribers reject frames whose declared size exceeds this.
///
/// Matches the per-group cache cap (`MAX_GROUP_CACHE`), so a single frame may fill
/// a group. 16 MiB was too tight for a high-bitrate CMAF fragment carried as one
/// frame; 32 MiB covers that while keeping the per-frame preallocation bounded.
///
/// Enforced in two places: on wire decode (the subscriber-side rejection described
/// in the first paragraph) and in
/// [`GroupProducer::create_frame`](super::group::GroupProducer::create_frame), which rejects an
/// oversized frame *before* `produce()` allocates its buffer. A direct `FrameProducer::new` still
/// allocates ahead of the [`append_frame`](super::group::GroupProducer::append_frame) backstop;
/// closing that gap means a fallible constructor, which is a breaking change left to a separate `dev` PR.
pub(crate) const MAX_FRAME_SIZE: u64 = 32 * 1024 * 1024;
26
/// A chunk of data with an upfront size.
///
/// Note that this is just the header: it carries the declared payload length
/// and nothing else.
/// You use [FrameProducer] and [FrameConsumer] to deal with the frame payload, potentially chunked.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Frame {
	/// Total payload size in bytes. Declared up front so consumers can preallocate.
	///
	/// [FrameProducer::new] allocates its buffer from this value.
	pub size: u64,
}
37
38impl Frame {
39	/// Create a new producer for the frame.
40	pub fn produce(self) -> FrameProducer {
41		FrameProducer::new(self)
42	}
43}
44
45impl From<usize> for Frame {
46	fn from(size: usize) -> Self {
47		Self { size: size as u64 }
48	}
49}
50
51impl From<u64> for Frame {
52	fn from(size: u64) -> Self {
53		Self { size }
54	}
55}
56
57impl From<u32> for Frame {
58	fn from(size: u32) -> Self {
59		Self { size: size as u64 }
60	}
61}
62
63impl From<u16> for Frame {
64	fn from(size: u16) -> Self {
65		Self { size: size as u64 }
66	}
67}
68
/// Single-allocation buffer shared between a [FrameProducer] and many [FrameConsumer]s.
///
/// Internally an [Arc] over a thin pointer + length owning a heap allocation. The
/// data pointer is stable for the life of any clone, so [Bytes] views taken via
/// [Bytes::from_owner] remain valid. [Clone] is cheap (one atomic increment).
///
/// The producer writes through the raw pointer (sole writer); `written` provides
/// happens-before for cross-thread reads. Implements [AsRef]<[u8]> directly so it
/// can be passed to [Bytes::from_owner] without an extra wrapper newtype.
///
/// The `AsRef` view only ever covers the first `written` bytes, never the
/// unwritten tail of the allocation.
#[derive(Clone)]
struct FrameBuf(Arc<FrameBufInner>);
80
/// Heap storage behind a [FrameBuf]: the raw allocation plus the published length.
struct FrameBufInner {
	// Owned heap allocation of `capacity` bytes (zero-initialized).
	// Produced by `Box::into_raw` in `FrameBuf::new` and released in `Drop`.
	data: *mut u8,
	// Length of the allocation in bytes; fixed at construction.
	capacity: usize,
	// Number of leading bytes the producer has published. Stored with Release
	// by the producer and loaded with Acquire by readers.
	written: AtomicUsize,
}
87
// Safety: `data` is owned (Box-allocated, freed in Drop); the producer is the
// sole writer; consumers only read bytes `< written`, which was set via Release
// after the corresponding writes completed (Acquire pairs on the consumer side).
// The raw pointer is the only field that blocks the auto-derived Send/Sync,
// hence the manual impls.
unsafe impl Send for FrameBufInner {}
unsafe impl Sync for FrameBufInner {}
93
94impl Drop for FrameBufInner {
95	fn drop(&mut self) {
96		// Safety: data was obtained from `Box::into_raw` of a `Box<[u8]>` of
97		// length `capacity` and is not aliased at drop (Arc refcount hit 0).
98		unsafe {
99			let slice = std::ptr::slice_from_raw_parts_mut(self.data, self.capacity);
100			drop(Box::from_raw(slice));
101		}
102	}
103}
104
105impl FrameBuf {
106	fn new(size: usize) -> Self {
107		let boxed: Box<[u8]> = vec![0u8; size].into_boxed_slice();
108		let capacity = boxed.len();
109		let data = Box::into_raw(boxed) as *mut u8;
110		Self(Arc::new(FrameBufInner {
111			data,
112			capacity,
113			written: AtomicUsize::new(0),
114		}))
115	}
116
117	fn capacity(&self) -> usize {
118		self.0.capacity
119	}
120
121	fn written(&self, ord: Ordering) -> usize {
122		self.0.written.load(ord)
123	}
124
125	/// Safety: caller must be the sole producer (FrameProducer-as-BufMut invariant).
126	unsafe fn data_ptr(&self) -> *mut u8 {
127		self.0.data
128	}
129
130	/// Safety: caller must be the sole producer; `new_written` must be `<= capacity`.
131	unsafe fn store_written(&self, new_written: usize) {
132		// Release pairs with consumers' Acquire load to publish prior writes.
133		self.0.written.store(new_written, Ordering::Release);
134	}
135}
136
137impl AsRef<[u8]> for FrameBuf {
138	fn as_ref(&self) -> &[u8] {
139		// Snapshot the initialized region (bytes the producer has written so far).
140		// Acquire pairs with the producer's Release on `written`.
141		let written = self.0.written.load(Ordering::Acquire);
142		// Safety: data..data+written is initialized (zero-init at alloc + producer
143		// writes up to `written`). The Arc keeps the allocation alive while any
144		// reference to the slice lives.
145		unsafe { std::slice::from_raw_parts(self.0.data, written) }
146	}
147}
148
/// Completion state shared between a frame's producer and its consumers.
///
/// The payload bytes themselves live in the `FrameBuf`; this only records how
/// the frame ended.
#[derive(Default, Debug)]
struct FrameState {
	// Whether the producer signaled a clean finish (written == capacity).
	// Set by `finish` and by `advance_mut` when the last byte lands.
	fin: bool,
	// The error that aborted the frame, if any.
	abort: Option<Error>,
}
156
/// Writes a frame's payload in one or more chunks.
///
/// The total bytes written must exactly match [Frame::size].
/// Call [Self::finish] after writing all bytes to verify correctness.
///
/// Implements [BufMut] so the receive path can write directly into the
/// pre-allocated buffer (e.g. via `tokio::io::AsyncReadExt::read_buf`).
pub struct FrameProducer {
	// The frame header; exposed through `Deref`.
	info: Frame,
	// Shared fin/abort state, also used to wake waiting consumers.
	state: kio::Producer<FrameState>,
	// Payload buffer shared with every consumer created by `consume`.
	buf: FrameBuf,
}
169
170impl std::ops::Deref for FrameProducer {
171	type Target = Frame;
172
173	fn deref(&self) -> &Self::Target {
174		&self.info
175	}
176}
177
178impl FrameProducer {
179	/// Create a new frame producer for the given frame header.
180	pub fn new(info: Frame) -> Self {
181		let buf = FrameBuf::new(info.size as usize);
182		Self {
183			info,
184			state: kio::Producer::new(FrameState::default()),
185			buf,
186		}
187	}
188
189	/// Write a chunk of data to the frame.
190	///
191	/// Returns [Error::WrongSize] if the chunk would exceed the remaining bytes.
192	pub fn write<B: Into<Bytes>>(&mut self, chunk: B) -> Result<()> {
193		let chunk = chunk.into();
194		if chunk.len() > self.remaining_mut() {
195			return Err(Error::WrongSize);
196		}
197		// Surface aborts before writing.
198		self.bail_if_aborted()?;
199		self.put_slice(&chunk);
200		Ok(())
201	}
202
203	/// Verify that all bytes have been written.
204	///
205	/// Returns [Error::WrongSize] if the bytes written don't match [Frame::size].
206	pub fn finish(&mut self) -> Result<()> {
207		let written = self.buf.written(Ordering::Acquire);
208		if written != self.buf.capacity() {
209			return Err(Error::WrongSize);
210		}
211		// Mark fin (idempotent if `advance_mut` already set it on the last byte).
212		let mut state = self.modify()?;
213		state.fin = true;
214		Ok(())
215	}
216
217	/// Abort the frame with the given error.
218	pub fn abort(&mut self, err: Error) -> Result<()> {
219		let mut guard = self.modify()?;
220		guard.abort = Some(err);
221		guard.close();
222		Ok(())
223	}
224
225	/// Create a new consumer for the frame.
226	pub fn consume(&self) -> FrameConsumer {
227		FrameConsumer {
228			info: self.info.clone(),
229			state: self.state.consume(),
230			buf: self.buf.clone(),
231			read_idx: 0,
232		}
233	}
234
235	/// Block until there are no active consumers.
236	pub async fn unused(&self) -> Result<()> {
237		self.state
238			.unused()
239			.await
240			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
241	}
242
243	fn modify(&mut self) -> Result<kio::Mut<'_, FrameState>> {
244		self.state
245			.write()
246			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
247	}
248
249	fn bail_if_aborted(&self) -> Result<()> {
250		let state = self.state.read();
251		if let Some(err) = &state.abort {
252			return Err(err.clone());
253		}
254		Ok(())
255	}
256}
257
258// Safety: `chunk_mut` returns a slice into the producer-private region of the
259// buffer (`[written..capacity]`). Sole-writer invariant: even though
260// `FrameProducer` is `Clone`, the API exposes BufMut only via `&mut self`,
261// and existing callers never share a single producer between concurrent writers
262// (group.rs clones a handle for `abort` / `consume` only). The defensive
263// `assert!` in `advance_mut` panics loudly if that invariant is ever violated.
264unsafe impl BufMut for FrameProducer {
265	fn remaining_mut(&self) -> usize {
266		self.buf.capacity() - self.buf.written(Ordering::Acquire)
267	}
268
269	fn chunk_mut(&mut self) -> &mut UninitSlice {
270		let written = self.buf.written(Ordering::Acquire);
271		let cap = self.buf.capacity();
272		// Safety: writes to `[written..cap]` are unaliased — consumers only ever
273		// read `[..written]`, and we hold `&mut self`. The slice's lifetime is
274		// tied to `&mut self` by the function signature.
275		unsafe {
276			let ptr = self.buf.data_ptr().add(written);
277			UninitSlice::from_raw_parts_mut(ptr, cap - written)
278		}
279	}
280
281	unsafe fn advance_mut(&mut self, cnt: usize) {
282		let cap = self.buf.capacity();
283		let prev = self.buf.written(Ordering::Relaxed);
284		assert!(
285			prev + cnt <= cap,
286			"advance_mut past frame.size: prev={prev} cnt={cnt} cap={cap}"
287		);
288		// Safety: sole-writer invariant + bounds-checked above.
289		unsafe { self.buf.store_written(prev + cnt) };
290
291		// Briefly take the kio write lock to wake waiters; drop of `Mut`
292		// triggers kio's notify. Also flip `fin` if we just filled the buffer.
293		if let Ok(mut state) = self.state.write() {
294			if prev + cnt == cap {
295				state.fin = true;
296			}
297		}
298	}
299}
300
301impl Clone for FrameProducer {
302	fn clone(&self) -> Self {
303		Self {
304			info: self.info.clone(),
305			state: self.state.clone(),
306			buf: self.buf.clone(),
307		}
308	}
309}
310
311impl From<Frame> for FrameProducer {
312	fn from(info: Frame) -> Self {
313		FrameProducer::new(info)
314	}
315}
316
/// Used to consume a frame's worth of data, streaming as bytes arrive.
///
/// Created via [FrameProducer::consume]. Reads hand out zero-copy [Bytes]
/// slices of the shared buffer.
#[derive(Clone)]
pub struct FrameConsumer {
	// Copy of the frame header; exposed through `Deref`.
	info: Frame,
	// Shared fin/abort state, polled to wait for progress.
	state: kio::Consumer<FrameState>,
	// Payload buffer shared with the producer.
	buf: FrameBuf,
	// Byte offset into the buffer; cloned consumers inherit this offset and
	// read independently from there.
	read_idx: usize,
}
327
328impl std::ops::Deref for FrameConsumer {
329	type Target = Frame;
330
331	fn deref(&self) -> &Self::Target {
332		&self.info
333	}
334}
335
336impl FrameConsumer {
337	// A helper to automatically apply Dropped if the state is closed without an error.
338	fn poll<F, R>(&self, waiter: &kio::Waiter, f: F) -> Poll<Result<R>>
339	where
340		F: Fn(&kio::Ref<'_, FrameState>) -> Poll<Result<R>>,
341	{
342		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
343			Ok(res) => res,
344			Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
345		})
346	}
347
348	fn snapshot(&self, read_idx: usize) -> Option<Bytes> {
349		// Acquire pairs with the producer's Release on `written`, making the
350		// bytes in `[..written]` visible to this thread.
351		let written = self.buf.written(Ordering::Acquire);
352		if written > read_idx {
353			Some(Bytes::from_owner(self.buf.clone()).slice(read_idx..written))
354		} else {
355			None
356		}
357	}
358
359	/// Poll for all remaining data without blocking.
360	///
361	/// Waits until the frame is finished (written == size); then returns the
362	/// remaining bytes from `read_idx` to the end as a single zero-copy slice.
363	pub fn poll_read_all(&mut self, waiter: &kio::Waiter) -> Poll<Result<Bytes>> {
364		let read_idx = self.read_idx;
365		let res = ready!(self.poll(waiter, |state| {
366			if state.fin {
367				return Poll::Ready(Ok(()));
368			}
369			if let Some(err) = &state.abort {
370				return Poll::Ready(Err(err.clone()));
371			}
372			Poll::Pending
373		}));
374		match res {
375			Ok(()) => {
376				// Frame is finished: written == capacity.
377				let bytes = self
378					.snapshot(read_idx)
379					.unwrap_or_else(|| Bytes::from_owner(self.buf.clone()).slice(read_idx..read_idx));
380				self.read_idx = self.buf.capacity();
381				Poll::Ready(Ok(bytes))
382			}
383			Err(e) => Poll::Ready(Err(e)),
384		}
385	}
386
387	/// Return all of the remaining bytes, blocking until the frame is finished.
388	pub async fn read_all(&mut self) -> Result<Bytes> {
389		kio::wait(|waiter| self.poll_read_all(waiter)).await
390	}
391
392	/// Poll for all remaining bytes (split into a single-element vec for backwards
393	/// compatibility with the previous chunk-based API).
394	pub fn poll_read_all_chunks(&mut self, waiter: &kio::Waiter) -> Poll<Result<Vec<Bytes>>> {
395		let bytes = ready!(self.poll_read_all(waiter)?);
396		Poll::Ready(Ok(if bytes.is_empty() { Vec::new() } else { vec![bytes] }))
397	}
398
399	/// Poll for the next chunk of bytes since the last read.
400	///
401	/// Returns whatever bytes have been written since the consumer's `read_idx` —
402	/// could span multiple producer writes. Returns `None` once the frame is
403	/// finished and all bytes have been consumed.
404	pub fn poll_read_chunk(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<Bytes>>> {
405		let read_idx = self.read_idx;
406		let res = ready!(self.poll(waiter, |state| {
407			let written = self.buf.written(Ordering::Acquire);
408			if written > read_idx {
409				return Poll::Ready(Ok(Some(written)));
410			}
411			if state.fin {
412				return Poll::Ready(Ok(None));
413			}
414			if let Some(err) = &state.abort {
415				return Poll::Ready(Err(err.clone()));
416			}
417			Poll::Pending
418		}));
419		match res {
420			Ok(Some(written)) => {
421				let bytes = Bytes::from_owner(self.buf.clone()).slice(read_idx..written);
422				self.read_idx = written;
423				Poll::Ready(Ok(Some(bytes)))
424			}
425			Ok(None) => Poll::Ready(Ok(None)),
426			Err(e) => Poll::Ready(Err(e)),
427		}
428	}
429
430	/// Return the next chunk of bytes since the last read.
431	pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
432		kio::wait(|waiter| self.poll_read_chunk(waiter)).await
433	}
434
435	/// Poll for the next chunk; for backwards compatibility, wraps
436	/// [Self::poll_read_chunk] in a vec (single element if any data is available).
437	pub fn poll_read_chunks(&mut self, waiter: &kio::Waiter) -> Poll<Result<Vec<Bytes>>> {
438		match ready!(self.poll_read_chunk(waiter)?) {
439			Some(b) => Poll::Ready(Ok(vec![b])),
440			None => Poll::Ready(Ok(Vec::new())),
441		}
442	}
443
444	/// Read the next chunk into a vector (single element if available, empty on eof).
445	pub async fn read_chunks(&mut self) -> Result<Vec<Bytes>> {
446		kio::wait(|waiter| self.poll_read_chunks(waiter)).await
447	}
448}
449
#[cfg(test)]
mod test {
	use super::*;
	use futures::FutureExt;

	// All reads use `now_or_never()`: a `None` means the consumer would block.

	#[test]
	fn single_chunk_roundtrip() {
		let mut producer = Frame { size: 5 }.produce();
		producer.write(Bytes::from_static(b"hello")).unwrap();
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let data = consumer.read_all().now_or_never().unwrap().unwrap();
		assert_eq!(data, Bytes::from_static(b"hello"));
	}

	#[test]
	fn multi_chunk_read_all() {
		let mut producer = Frame { size: 10 }.produce();
		producer.write(Bytes::from_static(b"hello")).unwrap();
		producer.write(Bytes::from_static(b"world")).unwrap();
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let data = consumer.read_all().now_or_never().unwrap().unwrap();
		assert_eq!(data, Bytes::from_static(b"helloworld"));
	}

	#[test]
	fn read_chunk_sequential() {
		let mut producer = Frame { size: 10 }.produce();
		producer.write(Bytes::from_static(b"hello")).unwrap();
		// Each read_chunk returns whatever is new since the last call,
		// which may span multiple writes.
		let mut consumer = producer.consume();
		let c1 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c1, Some(Bytes::from_static(b"hello")));

		producer.write(Bytes::from_static(b"world")).unwrap();
		producer.finish().unwrap();

		let c2 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c2, Some(Bytes::from_static(b"world")));
		let c3 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c3, None);
	}

	// Two writes made before the first read come back as one coalesced chunk.
	#[test]
	fn read_chunks_coalesces_writes() {
		let mut producer = Frame { size: 10 }.produce();
		producer.write(Bytes::from_static(b"hello")).unwrap();
		producer.write(Bytes::from_static(b"world")).unwrap();
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let chunks = consumer.read_chunks().now_or_never().unwrap().unwrap();
		assert_eq!(chunks.len(), 1);
		assert_eq!(chunks[0], Bytes::from_static(b"helloworld"));
	}

	// `poll_read_all_chunks` wraps the whole remaining payload in one element.
	#[test]
	fn read_all_chunks_single_element() {
		let mut producer = Frame { size: 10 }.produce();
		producer.write(Bytes::from_static(b"hello")).unwrap();
		producer.write(Bytes::from_static(b"world")).unwrap();
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let chunks = kio::wait(|waiter| consumer.poll_read_all_chunks(waiter))
			.now_or_never()
			.unwrap()
			.unwrap();
		assert_eq!(chunks, vec![Bytes::from_static(b"helloworld")]);
	}

	// An empty frame yields no chunks at all rather than one empty chunk.
	#[test]
	fn read_all_chunks_empty_frame() {
		let mut producer = Frame { size: 0 }.produce();
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let chunks = kio::wait(|waiter| consumer.poll_read_all_chunks(waiter))
			.now_or_never()
			.unwrap()
			.unwrap();
		assert!(chunks.is_empty());
	}

	#[test]
	fn finish_checks_remaining() {
		let mut producer = Frame { size: 5 }.produce();
		producer.write(Bytes::from_static(b"hi")).unwrap();
		let err = producer.finish().unwrap_err();
		assert!(matches!(err, Error::WrongSize));
	}

	#[test]
	fn write_too_many_bytes() {
		let mut producer = Frame { size: 3 }.produce();
		let err = producer.write(Bytes::from_static(b"toolong")).unwrap_err();
		assert!(matches!(err, Error::WrongSize));
	}

	#[test]
	fn abort_propagates() {
		let mut producer = Frame { size: 5 }.produce();
		let mut consumer = producer.consume();
		producer.abort(Error::Cancel).unwrap();

		let err = consumer.read_all().now_or_never().unwrap().unwrap_err();
		assert!(matches!(err, Error::Cancel));
	}

	// A consumer that already drained the published bytes sees the abort next.
	#[test]
	fn abort_after_partial_read() {
		let mut producer = Frame { size: 6 }.produce();
		let mut consumer = producer.consume();

		producer.write(Bytes::from_static(b"foo")).unwrap();
		let c1 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c1, Some(Bytes::from_static(b"foo")));

		producer.abort(Error::Cancel).unwrap();
		let err = consumer.read_chunk().now_or_never().unwrap().unwrap_err();
		assert!(matches!(err, Error::Cancel));
	}

	#[test]
	fn empty_frame() {
		let mut producer = Frame { size: 0 }.produce();
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let data = consumer.read_all().now_or_never().unwrap().unwrap();
		assert_eq!(data, Bytes::new());
	}

	#[tokio::test]
	async fn pending_then_ready() {
		let mut producer = Frame { size: 5 }.produce();
		let mut consumer = producer.consume();

		// Consumer blocks because no data yet.
		assert!(consumer.read_all().now_or_never().is_none());

		producer.write(Bytes::from_static(b"hello")).unwrap();
		producer.finish().unwrap();

		let data = consumer.read_all().now_or_never().unwrap().unwrap();
		assert_eq!(data, Bytes::from_static(b"hello"));
	}

	#[test]
	fn buf_mut_roundtrip() {
		// Exercise the BufMut path that the receive loop uses via `read_buf`.
		let mut producer = Frame { size: 12 }.produce();
		assert_eq!(producer.remaining_mut(), 12);
		producer.put_slice(b"hello");
		assert_eq!(producer.remaining_mut(), 7);
		producer.put_slice(b" world!");
		assert_eq!(producer.remaining_mut(), 0);
		producer.finish().unwrap();

		let mut consumer = producer.consume();
		let data = consumer.read_all().now_or_never().unwrap().unwrap();
		assert_eq!(data, Bytes::from_static(b"hello world!"));
	}

	#[test]
	#[should_panic(expected = "advance_mut past frame.size")]
	fn buf_mut_advance_past_capacity_panics() {
		let mut producer = Frame { size: 4 }.produce();
		// Safety violation on purpose: cnt > remaining_mut().
		unsafe { producer.advance_mut(5) };
	}

	#[test]
	fn read_chunk_streams_partial_writes() {
		let mut producer = Frame { size: 6 }.produce();
		let mut consumer = producer.consume();

		producer.write(Bytes::from_static(b"foo")).unwrap();
		let c1 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c1, Some(Bytes::from_static(b"foo")));

		// No new data → pending.
		assert!(consumer.read_chunk().now_or_never().is_none());

		producer.write(Bytes::from_static(b"bar")).unwrap();
		producer.finish().unwrap();
		let c2 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c2, Some(Bytes::from_static(b"bar")));
		let c3 = consumer.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(c3, None);
	}

	#[test]
	fn cloned_consumer_independent_cursor() {
		let mut producer = Frame { size: 10 }.produce();
		let mut c1 = producer.consume();
		producer.write(Bytes::from_static(b"hello")).unwrap();

		// c1 reads the first 5 bytes, then we clone — c2 inherits c1's cursor.
		let chunk = c1.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(chunk, Some(Bytes::from_static(b"hello")));
		let mut c2 = c1.clone();

		producer.write(Bytes::from_static(b"world")).unwrap();
		producer.finish().unwrap();

		// Both consumers now see "world" as their next chunk.
		let chunk = c1.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(chunk, Some(Bytes::from_static(b"world")));
		let chunk = c2.read_chunk().now_or_never().unwrap().unwrap();
		assert_eq!(chunk, Some(Bytes::from_static(b"world")));
	}
}