use std::sync::{Arc, Weak};
use cs_trace::{Tracer, child};
use cs_utils::futures::GenericCodec;
use tokio::{io::{AsyncRead, AsyncWrite}, sync::Mutex};
use tokio_util::codec::Framed;
use futures::{stream::{SplitStream, SplitSink}, SinkExt, StreamExt};
use webrtc::{peer_connection::{RTCPeerConnection, peer_connection_state::RTCPeerConnectionState}, ice_transport::ice_candidate::RTCIceCandidate};
use crate::types::RTCSignalingMessage;
/// Registers an ICE-candidate handler on `peer_connection`.
///
/// For every candidate reported by the connection the handler either:
///
/// * pushes it onto `pending_candidates` when the connection has no remote
///   description yet, or
/// * sends it through `signaling_sink` as an
///   [`RTCSignalingMessage::IceCandidate`].
///
/// A `None` candidate is traced and otherwise ignored. The handler only keeps
/// a weak reference to the connection; if it can no longer be upgraded the
/// handler does nothing.
///
/// A failure to send a candidate is logged as a warning instead of panicking
/// inside the connection's callback.
///
/// # Errors
///
/// As written this function always returns `Ok(())`; registration itself has
/// no failure path here.
pub async fn on_ice_candidate<AsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
    trace: Box<dyn Tracer>,
    peer_connection: Arc<RTCPeerConnection>,
    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
    pending_candidates: Arc<Mutex<Vec<RTCIceCandidate>>>,
) -> anyhow::Result<()> {
    // Weak handle so the callback stored on the connection does not hold a
    // strong reference back to that same connection.
    let weak_peer_connection: Weak<RTCPeerConnection> = Arc::downgrade(&peer_connection);

    peer_connection
        .on_ice_candidate(Box::new(move |maybe_candidate: Option<RTCIceCandidate>| {
            trace.trace(&format!("found ICE candidate: {:?}", &maybe_candidate));

            let remote_candidate = match maybe_candidate {
                Some(candidate) => candidate,
                None => return Box::pin(async move {}),
            };
            let connection = match weak_peer_connection.upgrade() {
                Some(connection) => connection,
                None => return Box::pin(async move {}),
            };

            let async_trace = child!(trace, "async");
            let pending_candidates = Arc::clone(&pending_candidates);
            let signaling_sink = Arc::clone(&signaling_sink);

            Box::pin(async move {
                {
                    // Take the queue lock *before* looking at the remote
                    // description: the check and the push then happen under
                    // one lock, so a reader that first sets the remote
                    // description and then reads the queue under this lock
                    // (as `on_signal` does) cannot miss the candidate.
                    let mut queued = pending_candidates.lock().await;
                    if connection.remote_description().await.is_none() {
                        async_trace.trace("adding to pending candidates");
                        queued.push(remote_candidate);
                        return;
                    }
                }

                async_trace.trace("sending candidate to the remote side");
                let ice_candidate_message = RTCSignalingMessage::IceCandidate(remote_candidate);
                let send_result = signaling_sink
                    .lock()
                    .await
                    .send(ice_candidate_message)
                    .await;

                // A broken signaling channel is an I/O condition, not a bug:
                // report it rather than panic in the callback.
                if let Err(error) = send_result {
                    async_trace.warn(&format!("Failed to send codec message: {:?}", error));
                }
            })
        }))
        .await;

    Ok(())
}
pub async fn on_peer_connection_state_change(
trace: Box<dyn Tracer>,
peer_connection: Arc<RTCPeerConnection>,
connection_signal_sink: Arc<tokio::sync::mpsc::Sender<()>>,
) -> anyhow::Result<()> {
let result = peer_connection
.on_peer_connection_state_change(
Box::new(move |new_peer_state: RTCPeerConnectionState| {
trace.info(
&format!("peer connection state has changed to: [{}]", new_peer_state),
);
match new_peer_state {
RTCPeerConnectionState::Connected | RTCPeerConnectionState::Failed => {
let connection_signal_sink = Arc::clone(&connection_signal_sink);
return Box::pin(async move {
connection_signal_sink
.send(())
.await
.unwrap();
});
},
_ => {
return Box::pin(async move {});
},
}
}),
).await;
return Ok(result);
}
/// Consumes signaling messages from `signaling_source` until the stream ends
/// and applies them to `peer_connection`.
///
/// * `SessionDescription`: set as the remote description. When
///   `should_create_answers` is `true`, an answer is created, sent through
///   `signaling_sink`, and then set as the local description. Afterwards every
///   candidate queued in `pending_candidates` is sent through `signaling_sink`.
///   The queue is drained while doing so, so a later session description does
///   not re-send candidates that already went out.
/// * `IceCandidate`: converted with `to_json` and added to the connection.
///
/// # Errors
///
/// Returns the first error produced by decoding a message, by any
/// peer-connection call, or by sending on `signaling_sink`. Processing stops
/// at that point.
pub async fn on_signal<AsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
    trace: Box<dyn Tracer>,
    peer_connection: Arc<RTCPeerConnection>,
    signaling_source: Arc<Mutex<SplitStream<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
    pending_candidates: Arc<Mutex<Vec<RTCIceCandidate>>>,
    should_create_answers: bool,
) -> anyhow::Result<()> {
    trace.trace("listen");

    while let Some(maybe_message) = signaling_source.lock().await.next().await {
        match maybe_message? {
            RTCSignalingMessage::SessionDescription(remote_session) => {
                trace.trace(&format!("got remote session: {:?}", &remote_session));
                peer_connection.set_remote_description(remote_session).await?;
                trace.trace("remote description is set");

                if should_create_answers {
                    let answer = peer_connection.create_answer(None).await?;

                    trace.trace(&format!("sending answer: {:?}", &answer));
                    signaling_sink
                        .lock()
                        .await
                        .send(RTCSignalingMessage::SessionDescription(answer.clone()))
                        .await?;

                    peer_connection.set_local_description(answer).await?;
                    trace.trace("answer local description is set");
                }

                trace.trace("sending pending candidates");
                // Move the queued candidates out (leaving an empty queue) and
                // release the queue lock before doing any network I/O.
                let queued = std::mem::take(&mut *pending_candidates.lock().await);
                for candidate in queued {
                    signaling_sink
                        .lock()
                        .await
                        .send(RTCSignalingMessage::IceCandidate(candidate))
                        .await?;
                }
                trace.trace("pending candidates sent");
            }
            RTCSignalingMessage::IceCandidate(remote_candidate) => {
                trace.trace(&format!("got remote candidate: {:?}", &remote_candidate));

                let remote_candidate = remote_candidate.to_json().await?;
                peer_connection.add_ice_candidate(remote_candidate).await?;
            }
        }
    }

    trace.warn("finished");
    Ok(())
}