webrtc_connection/
rtc_channel.rs1use futures::Future;
2use std::{sync::Arc, pin::Pin};
3use cs_trace::{Tracer, child};
4use connection_utils::Channel;
5use tokio::{sync::{mpsc::{self, Sender}, Mutex}};
6use webrtc::data_channel::{RTCDataChannel, data_channel_message::DataChannelMessage};
7
8mod async_read;
9mod async_write;
10
11pub struct RtcChannel {
12 trace: Box<dyn Tracer>,
13 id: u16,
14 label: String,
15 channel: Arc<Mutex<Arc<RTCDataChannel>>>,
16 current_read_future: Option<Pin<Box<dyn Future<Output = Result<DataChannelMessage, std::io::Error>>>>>,
17 current_write_future: Option<Pin<Box<dyn Future<Output = Result<usize, std::io::Error>>>>>,
18 read_message_queue: Arc<Mutex<mpsc::Receiver<DataChannelMessage>>>,
19}
20
21unsafe impl Send for RtcChannel {}
23unsafe impl Sync for RtcChannel {}
24
25async fn setup_listeners(
26 channel: Arc<RTCDataChannel>,
27 sender: Arc<Sender<DataChannelMessage>>,
28) {
29 let on_message_channel = Arc::clone(&channel);
30 channel.on_open(Box::new(move || {
31 return Box::pin(async move {
32 on_message_channel.on_message(Box::new(move |message: DataChannelMessage| {
33 let read_sender = Arc::clone(&sender);
34 return Box::pin(async move {
35 read_sender
36 .send(message)
37 .await
38 .expect("Cannot send data channel message to the queue.");
39 });
40 })).await;
41 });
42 })).await;
43}
44
45impl RtcChannel {
46 pub async fn new(
47 trace: &Box<dyn Tracer>,
48 channel: Arc<RTCDataChannel>,
49 ) -> RtcChannel {
50 let (
51 read_queue_sender,
52 read_message_queue,
53 ) = mpsc::channel(100);
54
55 setup_listeners(
56 Arc::clone(&channel),
57 Arc::new(read_queue_sender),
58 ).await;
59
60 return RtcChannel {
61 id: channel.id(),
62 label: channel.label().to_string(),
63 trace: child!(trace, "rtc-channel"),
64 channel: Arc::new(Mutex::new(channel)),
65 read_message_queue: Arc::new(Mutex::new(read_message_queue)),
66 current_read_future: None,
67 current_write_future: None,
68 };
69 }
70
71 pub fn id(&self) -> u16 {
72 return self.id;
73 }
74
75 pub fn label(&self) -> &String {
76 return &self.label;
77 }
78}
79
80impl Channel for RtcChannel {
81 fn id(&self) -> u16 {
82 return self.id;
83 }
84
85
86 fn label(&self) -> &String {
87 return &self.label;
88 }
89}