use futures::Future;
use std::{sync::Arc, pin::Pin};
use cs_trace::{Tracer, child};
use connection_utils::Channel;
use tokio::{sync::{mpsc::{self, Sender}, Mutex}};
use webrtc::data_channel::{RTCDataChannel, data_channel_message::DataChannelMessage};
mod async_read;
mod async_write;
/// A WebRTC data channel wrapped together with a queue of received messages.
///
/// Instances are built by `RtcChannel::new`, which also installs the data
/// channel callbacks that feed `read_message_queue`.
pub struct RtcChannel {
/// Child tracer created from the caller's tracer under the name "rtc-channel".
trace: Box<dyn Tracer>,
/// Id of the underlying data channel, copied once at construction time.
id: u16,
/// Label of the underlying data channel, copied once at construction time.
label: String,
/// Shared handle to the underlying data channel, guarded by a tokio `Mutex`.
channel: Arc<Mutex<Arc<RTCDataChannel>>>,
/// Slot for an in-flight read future; starts out as `None`.
// NOTE(review): presumably driven by the `async_read` submodule - confirm.
current_read_future: Option<Pin<Box<dyn Future<Output = Result<DataChannelMessage, std::io::Error>>>>>,
/// Slot for an in-flight write future; starts out as `None`.
// NOTE(review): presumably driven by the `async_write` submodule - confirm.
current_write_future: Option<Pin<Box<dyn Future<Output = Result<usize, std::io::Error>>>>>,
/// Receiving half of the bounded queue that the data channel's message
/// handler pushes incoming messages into.
read_message_queue: Arc<Mutex<mpsc::Receiver<DataChannelMessage>>>,
}
// SAFETY / NOTE(review): `RtcChannel` stores boxed `dyn Future` values whose
// types carry no `Send`/`Sync` bound, so these marker traits have to be
// asserted manually rather than derived by the compiler. Whether that is
// actually sound depends on which futures end up in `current_read_future` and
// `current_write_future`, which is decided by code outside this block -
// TODO confirm that only thread-safe futures are ever stored there.
unsafe impl Send for RtcChannel {}
unsafe impl Sync for RtcChannel {}
/// Hooks up the data channel callbacks so received messages land in `sender`.
///
/// An `on_open` handler is registered on `channel`. When that handler runs it
/// registers an `on_message` handler on the same channel, and that handler
/// forwards every [`DataChannelMessage`] it receives into the queue.
///
/// # Panics
///
/// The message handler panics with
/// "Cannot send data channel message to the queue." when sending into the
/// queue fails.
async fn setup_listeners(
    channel: Arc<RTCDataChannel>,
    sender: Arc<Sender<DataChannelMessage>>,
) {
    // The open handler is `move`, so it needs its own handle to the channel
    // to be able to register the message handler later on.
    let opened_channel = Arc::clone(&channel);

    channel
        .on_open(Box::new(move || {
            Box::pin(async move {
                opened_channel
                    .on_message(Box::new(move |message: DataChannelMessage| {
                        // Each invocation gets a fresh handle so the spawned
                        // future can own it independently of the handler.
                        let queue = Arc::clone(&sender);

                        Box::pin(async move {
                            queue
                                .send(message)
                                .await
                                .expect("Cannot send data channel message to the queue.");
                        })
                    }))
                    .await;
            })
        }))
        .await;
}
impl RtcChannel {
    /// Builds an [`RtcChannel`] around an existing data channel.
    ///
    /// Creates a bounded message queue (capacity 100), installs the channel
    /// listeners that feed it, and snapshots the channel's id and label.
    ///
    /// * `trace` - parent tracer; a child named "rtc-channel" is derived from it.
    /// * `channel` - the data channel to wrap.
    pub async fn new(
        trace: &Box<dyn Tracer>,
        channel: Arc<RTCDataChannel>,
    ) -> RtcChannel {
        let (queue_tx, queue_rx) = mpsc::channel(100);

        // Listeners get their own handle to the channel plus the sending half
        // of the queue; the receiving half stays with the returned value.
        setup_listeners(Arc::clone(&channel), Arc::new(queue_tx)).await;

        // Read these before `channel` is moved into the mutex below.
        let id = channel.id();
        let label = channel.label().to_string();

        RtcChannel {
            trace: child!(trace, "rtc-channel"),
            id,
            label,
            channel: Arc::new(Mutex::new(channel)),
            current_read_future: None,
            current_write_future: None,
            read_message_queue: Arc::new(Mutex::new(queue_rx)),
        }
    }

    /// Returns the data channel id recorded when this value was created.
    pub fn id(&self) -> u16 {
        self.id
    }

    /// Returns the data channel label recorded when this value was created.
    pub fn label(&self) -> &String {
        &self.label
    }
}
/// `Channel` implementation backed by the values stored on the struct.
impl Channel for RtcChannel {
    /// Returns the stored channel id.
    fn id(&self) -> u16 {
        self.id
    }

    /// Returns a reference to the stored channel label.
    fn label(&self) -> &String {
        &self.label
    }
}