livekit_protocol/
observer.rs

1// Copyright 2023 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{pin::Pin, sync::Arc};
16
17use futures_util::{
18    sink::Sink,
19    task::{Context, Poll},
20};
21use parking_lot::Mutex;
22use tokio::sync::mpsc;
23
24#[derive(Clone, Debug)]
25pub struct Dispatcher<T>
26where
27    T: Clone,
28{
29    senders: Arc<Mutex<Vec<mpsc::UnboundedSender<T>>>>,
30}
31
32impl<T> Default for Dispatcher<T>
33where
34    T: Clone,
35{
36    fn default() -> Self {
37        Self { senders: Default::default() }
38    }
39}
40
41impl<T> Dispatcher<T>
42where
43    T: Clone,
44{
45    pub fn register(&self) -> mpsc::UnboundedReceiver<T> {
46        let (tx, rx) = mpsc::unbounded_channel();
47        self.senders.lock().push(tx);
48        rx
49    }
50
51    pub fn dispatch(&self, msg: &T) {
52        self.senders.lock().retain(|sender| sender.send(msg.clone()).is_ok());
53    }
54
55    pub fn clear(&self) {
56        self.senders.lock().clear();
57    }
58}
59
60impl<T> Sink<T> for Dispatcher<T>
61where
62    T: Clone,
63{
64    type Error = ();
65
66    fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
67        Poll::Ready(Ok(()))
68    }
69
70    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
71        self.dispatch(&item);
72        Ok(())
73    }
74
75    fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
76        Poll::Ready(Ok(()))
77    }
78
79    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
80        Poll::Ready(Ok(()))
81    }
82}