webrtc_connection/
rtc_channel.rs

1use futures::Future;
2use std::{sync::Arc, pin::Pin}; 
3use cs_trace::{Tracer, child};
4use connection_utils::Channel;
5use tokio::{sync::{mpsc::{self, Sender}, Mutex}};
6use webrtc::data_channel::{RTCDataChannel, data_channel_message::DataChannelMessage};
7
8mod async_read;
9mod async_write;
10
11pub struct RtcChannel {
12    trace: Box<dyn Tracer>,
13    id: u16,
14    label: String,
15    channel: Arc<Mutex<Arc<RTCDataChannel>>>,
16    current_read_future: Option<Pin<Box<dyn Future<Output = Result<DataChannelMessage, std::io::Error>>>>>,
17    current_write_future: Option<Pin<Box<dyn Future<Output = Result<usize, std::io::Error>>>>>,
18    read_message_queue: Arc<Mutex<mpsc::Receiver<DataChannelMessage>>>,
19}
20
21// TODO: fix
22unsafe impl Send for RtcChannel {}
23unsafe impl Sync for RtcChannel {}
24
25async fn setup_listeners(
26    channel: Arc<RTCDataChannel>,
27    sender: Arc<Sender<DataChannelMessage>>,
28) {
29    let on_message_channel = Arc::clone(&channel);
30    channel.on_open(Box::new(move || {
31        return Box::pin(async move {
32            on_message_channel.on_message(Box::new(move |message: DataChannelMessage| {
33                let read_sender = Arc::clone(&sender);
34                return Box::pin(async move {
35                    read_sender
36                        .send(message)
37                        .await
38                        .expect("Cannot send data channel message to the queue.");
39                });
40            })).await;
41        });
42    })).await;
43}
44
45impl RtcChannel {
46    pub async fn new(
47        trace: &Box<dyn Tracer>,
48        channel: Arc<RTCDataChannel>,
49    ) -> RtcChannel {
50        let (
51            read_queue_sender,
52            read_message_queue,
53        ) = mpsc::channel(100);
54
55        setup_listeners(
56            Arc::clone(&channel),
57            Arc::new(read_queue_sender),
58        ).await;
59
60        return RtcChannel {
61            id: channel.id(),
62            label: channel.label().to_string(),
63            trace: child!(trace, "rtc-channel"),
64            channel: Arc::new(Mutex::new(channel)),
65            read_message_queue: Arc::new(Mutex::new(read_message_queue)),
66            current_read_future: None,
67            current_write_future: None,
68        };
69    }
70
71    pub fn id(&self) -> u16 {
72        return self.id;
73    }
74
75    pub fn label(&self) -> &String {
76        return &self.label;
77    }
78}
79
80impl Channel for RtcChannel {
81    fn id(&self) -> u16 {
82        return self.id;
83    }
84
85
86    fn label(&self) ->  &String {
87        return &self.label;
88    }
89}