moq_transfork/model/
group.rs

1//! A group is a stream of frames, split into a [Producer] and [Consumer] handle.
2//!
3//! A [Producer] writes an ordered stream of frames.
4//! Frames can be written all at once, or in chunks.
5//!
6//! A [Consumer] reads an ordered stream of frames.
7//! The reader can be cloned, in which case each reader receives a copy of each frame. (fanout)
8//!
9//! The stream is closed with [ServeError::MoqError] when all writers or readers are dropped.
10use bytes::Bytes;
11use std::ops;
12use tokio::sync::watch;
13
14use crate::Error;
15
16use super::{Frame, FrameConsumer, FrameProducer};
17
18/// An independent group of frames.
19#[derive(Clone, PartialEq, Debug)]
20pub struct Group {
21	// The sequence number of the group within the track.
22	// NOTE: These may be received out of order
23	pub sequence: u64,
24}
25
26impl Group {
27	pub fn new(sequence: u64) -> Group {
28		Self { sequence }
29	}
30
31	pub fn produce(self) -> (GroupProducer, GroupConsumer) {
32		let (send, recv) = watch::channel(GroupState::default());
33
34		let writer = GroupProducer::new(send, self.clone());
35		let reader = GroupConsumer::new(recv, self);
36
37		(writer, reader)
38	}
39}
40
41#[derive(Debug)]
42struct GroupState {
43	// The frames that has been written thus far
44	frames: Vec<FrameConsumer>,
45
46	// Set when the writer or all readers are dropped.
47	closed: Result<(), Error>,
48}
49
50impl Default for GroupState {
51	fn default() -> Self {
52		Self {
53			frames: Vec::new(),
54			closed: Ok(()),
55		}
56	}
57}
58
59/// Create a group, frame-by-frame.
60#[derive(Clone, Debug)]
61pub struct GroupProducer {
62	// Mutable stream state.
63	state: watch::Sender<GroupState>,
64
65	// Immutable stream state.
66	pub info: Group,
67}
68
69impl GroupProducer {
70	fn new(state: watch::Sender<GroupState>, info: Group) -> Self {
71		Self { state, info }
72	}
73
74	// Write a frame in one go
75	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
76		let frame = frame.into();
77		self.create_frame(frame.len()).write(frame);
78	}
79
80	// Create a frame with an upfront size
81	pub fn create_frame(&mut self, size: usize) -> FrameProducer {
82		let (writer, reader) = Frame::new(size).produce();
83		self.state.send_modify(|state| state.frames.push(reader));
84		writer
85	}
86
87	pub fn frame_count(&self) -> usize {
88		self.state.borrow().frames.len()
89	}
90
91	/// Create a new consumer for the group.
92	pub fn subscribe(&self) -> GroupConsumer {
93		GroupConsumer::new(self.state.subscribe(), self.info.clone())
94	}
95
96	/// Close the stream with an error.
97	pub fn close(self, err: Error) {
98		self.state.send_modify(|state| {
99			state.closed = Err(err);
100		});
101	}
102}
103
104impl ops::Deref for GroupProducer {
105	type Target = Group;
106
107	fn deref(&self) -> &Self::Target {
108		&self.info
109	}
110}
111
112/// Consume a group, frame-by-frame.
113#[derive(Clone, Debug)]
114pub struct GroupConsumer {
115	// Modify the stream state.
116	state: watch::Receiver<GroupState>,
117
118	// Immutable stream state.
119	pub info: Group,
120
121	// The number of frames we've read.
122	// NOTE: Cloned readers inherit this offset, but then run in parallel.
123	index: usize,
124
125	// Used to make read_frame cancel safe.
126	active: Option<FrameConsumer>,
127}
128
129impl GroupConsumer {
130	fn new(state: watch::Receiver<GroupState>, group: Group) -> Self {
131		Self {
132			state,
133			info: group,
134			index: 0,
135			active: None,
136		}
137	}
138
139	// Read the next frame.
140	pub async fn read_frame(&mut self) -> Result<Option<Bytes>, Error> {
141		// In order to be cancel safe, we need to save the active frame.
142		// That way if this method gets caneclled, we can resume where we left off.
143		if self.active.is_none() {
144			self.active = match self.next_frame().await? {
145				Some(frame) => Some(frame),
146				None => return Ok(None),
147			};
148		};
149
150		// Read the frame in one go, which is cancel safe.
151		let frame = self.active.as_mut().unwrap().read_all().await?;
152		self.active = None;
153
154		Ok(Some(frame))
155	}
156
157	// Return a reader for the next frame.
158	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>, Error> {
159		// Just in case someone called read_frame, cancelled it, then called next_frame.
160		if let Some(frame) = self.active.take() {
161			return Ok(Some(frame));
162		}
163
164		loop {
165			{
166				let state = self.state.borrow_and_update();
167
168				if let Some(frame) = state.frames.get(self.index).cloned() {
169					self.index += 1;
170					return Ok(Some(frame));
171				}
172
173				state.closed.clone()?;
174			}
175
176			if self.state.changed().await.is_err() {
177				return Ok(None);
178			}
179		}
180	}
181
182	pub async fn closed(&self) -> Result<(), Error> {
183		match self.state.clone().wait_for(|state| state.closed.is_err()).await {
184			Ok(state) => state.closed.clone(),
185			Err(_) => Ok(()),
186		}
187	}
188}
189
190impl ops::Deref for GroupConsumer {
191	type Target = Group;
192
193	fn deref(&self) -> &Self::Target {
194		&self.info
195	}
196}