moq_transfork/model/
router.rs

1use std::ops;
2
3use tokio::sync::{mpsc, oneshot};
4
5use crate::{Error, Track, TrackConsumer, TrackProducer};
6
7/// Used to respond to arbitrary track requests.
8pub struct Router {
9	/// The maximum number of requests that can be queued before blocking.
10	pub capacity: usize,
11}
12
13impl Router {
14	pub fn produce(&self) -> (RouterProducer, RouterConsumer) {
15		let (send, recv) = mpsc::channel(self.capacity);
16
17		let writer = RouterProducer::new(recv);
18		let reader = RouterConsumer::new(send);
19
20		(writer, reader)
21	}
22}
23
24impl Default for Router {
25	fn default() -> Self {
26		Self { capacity: 32 }
27	}
28}
29
30/// Receive broadcast/track requests and return if we can fulfill them.
31pub struct RouterProducer {
32	queue: mpsc::Receiver<RouterRequest>,
33}
34
35impl RouterProducer {
36	fn new(queue: mpsc::Receiver<RouterRequest>) -> Self {
37		Self { queue }
38	}
39
40	pub async fn requested(&mut self) -> Option<RouterRequest> {
41		self.queue.recv().await
42	}
43}
44
45/// Subscribe to abitrary broadcast/tracks.
46#[derive(Clone)]
47pub struct RouterConsumer {
48	queue: mpsc::Sender<RouterRequest>,
49}
50
51impl RouterConsumer {
52	fn new(queue: mpsc::Sender<RouterRequest>) -> Self {
53		Self { queue }
54	}
55
56	pub async fn subscribe(&self, track: Track) -> Result<TrackConsumer, Error> {
57		let (send, recv) = oneshot::channel();
58		let request = RouterRequest { track, reply: send };
59
60		if self.queue.send(request).await.is_err() {
61			return Err(Error::Cancel);
62		}
63
64		recv.await.map_err(|_| Error::Cancel)?
65	}
66
67	pub async fn closed(&self) {
68		self.queue.closed().await;
69	}
70}
71
72/// An outstanding request for a path.
73pub struct RouterRequest {
74	pub track: Track,
75	reply: oneshot::Sender<Result<TrackConsumer, Error>>,
76}
77
78impl RouterRequest {
79	pub fn serve(self, reader: TrackConsumer) {
80		self.reply.send(Ok(reader)).ok();
81	}
82
83	pub fn produce(self) -> TrackProducer {
84		let (writer, reader) = self.track.produce();
85		self.reply.send(Ok(reader)).ok();
86		writer
87	}
88
89	pub fn close(self, error: Error) {
90		self.reply.send(Err(error)).ok();
91	}
92}
93
94impl ops::Deref for RouterRequest {
95	type Target = Track;
96
97	fn deref(&self) -> &Self::Target {
98		&self.track
99	}
100}