moq_transfork/model/
router.rs1use std::ops;
2
3use tokio::sync::{mpsc, oneshot};
4
5use crate::{Error, Track, TrackConsumer, TrackProducer};
6
7pub struct Router {
9 pub capacity: usize,
11}
12
13impl Router {
14 pub fn produce(&self) -> (RouterProducer, RouterConsumer) {
15 let (send, recv) = mpsc::channel(self.capacity);
16
17 let writer = RouterProducer::new(recv);
18 let reader = RouterConsumer::new(send);
19
20 (writer, reader)
21 }
22}
23
24impl Default for Router {
25 fn default() -> Self {
26 Self { capacity: 32 }
27 }
28}
29
30pub struct RouterProducer {
32 queue: mpsc::Receiver<RouterRequest>,
33}
34
35impl RouterProducer {
36 fn new(queue: mpsc::Receiver<RouterRequest>) -> Self {
37 Self { queue }
38 }
39
40 pub async fn requested(&mut self) -> Option<RouterRequest> {
41 self.queue.recv().await
42 }
43}
44
45#[derive(Clone)]
47pub struct RouterConsumer {
48 queue: mpsc::Sender<RouterRequest>,
49}
50
51impl RouterConsumer {
52 fn new(queue: mpsc::Sender<RouterRequest>) -> Self {
53 Self { queue }
54 }
55
56 pub async fn subscribe(&self, track: Track) -> Result<TrackConsumer, Error> {
57 let (send, recv) = oneshot::channel();
58 let request = RouterRequest { track, reply: send };
59
60 if self.queue.send(request).await.is_err() {
61 return Err(Error::Cancel);
62 }
63
64 recv.await.map_err(|_| Error::Cancel)?
65 }
66
67 pub async fn closed(&self) {
68 self.queue.closed().await;
69 }
70}
71
72pub struct RouterRequest {
74 pub track: Track,
75 reply: oneshot::Sender<Result<TrackConsumer, Error>>,
76}
77
78impl RouterRequest {
79 pub fn serve(self, reader: TrackConsumer) {
80 self.reply.send(Ok(reader)).ok();
81 }
82
83 pub fn produce(self) -> TrackProducer {
84 let (writer, reader) = self.track.produce();
85 self.reply.send(Ok(reader)).ok();
86 writer
87 }
88
89 pub fn close(self, error: Error) {
90 self.reply.send(Err(error)).ok();
91 }
92}
93
94impl ops::Deref for RouterRequest {
95 type Target = Track;
96
97 fn deref(&self) -> &Self::Target {
98 &self.track
99 }
100}