use std::sync::Arc;
use async_trait::async_trait;
use cs_trace::{Tracer, child};
use cs_utils::futures::wait;
use tokio::sync::mpsc::Receiver;
use webrtc::peer_connection::RTCPeerConnection;
use connection_utils::{Channel, Connected, TOnRemoteChannelReader};
use anyhow::{self, bail};
use crate::RtcChannel;
/// State of an RTC connection once it has been established.
///
/// Holds the underlying peer connection together with the channel sources
/// that the `Connected` implementation in this file hands out.
pub struct RtcConnectionConnected {
/// Tracer passed on to every `RtcChannel` created by `channel()`.
trace: Box<dyn Tracer>,
/// Shared handle to the peer connection; used to create new data channels.
peer_connection: Arc<RTCPeerConnection>,
/// Receiver of channels; `None` while it is lent out through
/// `on_remote_channel()` and `Some` again after `off_remote_channel()`.
on_data_channel_source: Option<Receiver<Box<dyn Channel>>>,
/// Channel supplied at construction time, if any; it is returned (and
/// cleared) by the first call to `channel()`.
initial_data_channel: Option<Box<dyn Channel>>,
}
impl RtcConnectionConnected {
    /// Builds the connected-state wrapper around an established peer connection.
    ///
    /// * `trace` - parent tracer; a child tracer named `"connected"` is
    ///   derived from it with `child!` and stored in the new value.
    /// * `peer_connection` - shared handle to the underlying peer connection.
    /// * `initial_data_channel` - optional channel that the first call to
    ///   `channel()` returns instead of creating a new one.
    /// * `on_data_channel_source` - receiver of channels, later lent out via
    ///   `on_remote_channel()`.
    ///
    /// The visible body never fails; it always returns `Ok`.
    pub fn new(
        trace: &Box<dyn Tracer>,
        peer_connection: Arc<RTCPeerConnection>,
        initial_data_channel: Option<Box<dyn Channel>>,
        on_data_channel_source: Receiver<Box<dyn Channel>>,
    ) -> anyhow::Result<RtcConnectionConnected> {
        let connected_trace = child!(trace, "connected");

        let connected = Self {
            trace: connected_trace,
            peer_connection,
            on_data_channel_source: Some(on_data_channel_source),
            initial_data_channel,
        };

        Ok(connected)
    }
}
#[async_trait]
impl Connected for RtcConnectionConnected {
fn on_remote_channel(&mut self) -> anyhow::Result<TOnRemoteChannelReader> {
match self.on_data_channel_source.take() {
None => bail!("on_remote_channel listener not found, already taken?"),
Some(on_channel) => return Ok(on_channel),
};
}
fn off_remote_channel(&mut self, on_data_channel: TOnRemoteChannelReader) -> anyhow::Result<()> {
self.on_data_channel_source = Some(on_data_channel);
return Ok(());
}
async fn channel(&mut self, label: String) -> anyhow::Result<Box<dyn Channel>> {
if let Some(channel) = self.initial_data_channel.take() {
wait(50).await;
return Ok(channel);
}
let data_channel = self
.peer_connection
.create_data_channel(label.as_ref(), None)
.await
.expect("Cannot create data channel.");
let data_stream = RtcChannel::new(
&self.trace,
data_channel,
).await;
return Ok(Box::new(data_stream));
}
async fn disconnect(mut self) -> anyhow::Result<()> {
todo!();
}
}