livekit_protocol/
observer.rs1use std::{pin::Pin, sync::Arc};
16
17use futures_util::{
18 sink::Sink,
19 task::{Context, Poll},
20};
21use parking_lot::Mutex;
22use tokio::sync::mpsc;
23
24#[derive(Clone, Debug)]
25pub struct Dispatcher<T>
26where
27 T: Clone,
28{
29 senders: Arc<Mutex<Vec<mpsc::UnboundedSender<T>>>>,
30}
31
32impl<T> Default for Dispatcher<T>
33where
34 T: Clone,
35{
36 fn default() -> Self {
37 Self { senders: Default::default() }
38 }
39}
40
41impl<T> Dispatcher<T>
42where
43 T: Clone,
44{
45 pub fn register(&self) -> mpsc::UnboundedReceiver<T> {
46 let (tx, rx) = mpsc::unbounded_channel();
47 self.senders.lock().push(tx);
48 rx
49 }
50
51 pub fn dispatch(&self, msg: &T) {
52 self.senders.lock().retain(|sender| sender.send(msg.clone()).is_ok());
53 }
54
55 pub fn clear(&self) {
56 self.senders.lock().clear();
57 }
58}
59
60impl<T> Sink<T> for Dispatcher<T>
61where
62 T: Clone,
63{
64 type Error = ();
65
66 fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
67 Poll::Ready(Ok(()))
68 }
69
70 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
71 self.dispatch(&item);
72 Ok(())
73 }
74
75 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
76 Poll::Ready(Ok(()))
77 }
78
79 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
80 Poll::Ready(Ok(()))
81 }
82}