use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};
use std::task::{Poll, Waker};
use librice_proto::candidate::CandidatePair;
pub use librice_proto::component::ComponentConnectionState;
use crate::agent::AgentError;
use crate::socket::StunChannel;
use futures::prelude::*;
pub const RTP: usize = 1;
pub const RTCP: usize = 2;
#[derive(Debug, Clone)]
pub struct Component {
weak_agent: Weak<Mutex<librice_proto::agent::Agent>>,
stream_id: usize,
pub(crate) id: usize,
pub(crate) inner: Arc<Mutex<ComponentInner>>,
}
impl Component {
pub(crate) fn new(
weak_agent: Weak<Mutex<librice_proto::agent::Agent>>,
stream_id: usize,
id: usize,
) -> Self {
Self {
weak_agent,
stream_id,
id,
inner: Arc::new(Mutex::new(ComponentInner::new(id))),
}
}
pub fn id(&self) -> usize {
self.id
}
pub fn state(&self) -> ComponentConnectionState {
let Some(agent) = self.weak_agent.upgrade() else {
return ComponentConnectionState::Failed;
};
let agent = agent.lock().unwrap();
if let Some(stream) = agent.stream(self.stream_id) {
stream
.component(self.id)
.map(|component| component.state())
.unwrap_or(ComponentConnectionState::Failed)
} else {
ComponentConnectionState::Failed
}
}
pub async fn send(&self, data: &[u8]) -> Result<(), AgentError> {
let (local_agent, channel, to) = {
let inner = self.inner.lock().unwrap();
let selected_pair = inner.selected_pair.as_ref().ok_or(std::io::Error::new(
std::io::ErrorKind::NotFound,
"No selected pair",
))?;
let local_agent = selected_pair.proto.stun_agent().clone();
let to = selected_pair.proto.candidate_pair().remote.address;
trace!("sending {} bytes to {}", data.len(), to);
(local_agent, selected_pair.socket.clone(), to)
};
let transmit;
{
let local_agent = local_agent.lock().unwrap();
transmit = local_agent.send_data(data, to);
}
channel.send_to(&transmit.data, transmit.to).await?;
Ok(())
}
pub fn recv(&self) -> impl Stream<Item = Vec<u8>> {
ComponentRecv {
inner: self.inner.clone(),
}
}
pub(crate) fn set_selected_pair(&self, selected: SelectedPair) {
self.inner.lock().unwrap().set_selected_pair(selected)
}
pub fn selected_pair(&self) -> Option<CandidatePair> {
self.inner
.lock()
.unwrap()
.selected_pair
.clone()
.map(|selected| selected.proto.candidate_pair().clone())
}
}
#[derive(Debug)]
pub(crate) struct ComponentInner {
id: usize,
selected_pair: Option<SelectedPair>,
received_data: VecDeque<Vec<u8>>,
recv_waker: Option<Waker>,
}
impl ComponentInner {
fn new(id: usize) -> Self {
Self {
id,
selected_pair: None,
received_data: VecDeque::default(),
recv_waker: None,
}
}
#[tracing::instrument(
skip(self),
fields(
component_id = self.id
)
)]
fn set_selected_pair(&mut self, selected: SelectedPair) {
debug!("setting");
self.selected_pair = Some(selected);
}
#[tracing::instrument(
name = "component_incoming_data"
skip(self, data)
fields(
data.len = data.len()
)
)]
pub(crate) fn handle_incoming_data(&mut self, data: Vec<u8>) {
self.received_data.push_back(data);
if let Some(waker) = self.recv_waker.take() {
waker.wake();
}
}
}
#[doc(hidden)]
pub struct ComponentRecv {
inner: Arc<Mutex<ComponentInner>>,
}
impl futures::Stream for ComponentRecv {
type Item = Vec<u8>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut inner = self.inner.lock().unwrap();
if let Some(data) = inner.received_data.pop_front() {
return Poll::Ready(Some(data));
}
inner.recv_waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
#[derive(Debug, Clone)]
pub(crate) struct SelectedPair {
proto: librice_proto::component::SelectedPair,
socket: StunChannel,
}
impl SelectedPair {
pub(crate) fn new(pair: librice_proto::component::SelectedPair, socket: StunChannel) -> Self {
Self {
proto: pair,
socket,
}
}
}
#[cfg(test)]
mod tests {
use async_std::net::UdpSocket;
use librice_proto::candidate::{Candidate, CandidateType};
use stun_proto::agent::StunAgent;
use stun_proto::types::TransportType;
use super::*;
use crate::{agent::Agent, socket::UdpSocketChannel};
fn init() {
crate::tests::test_init_log();
}
#[test]
fn initial_state_new() {
init();
let agent = Agent::builder().build();
let s = agent.add_stream();
let c = s.add_component().unwrap();
assert_eq!(c.state(), ComponentConnectionState::New);
}
#[test]
fn send_recv() {
init();
async_std::task::block_on(async move {
let agent = Agent::builder().controlling(false).build();
let s = agent.add_stream();
let send = s.add_component().unwrap();
let local_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let remote_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let local_addr = local_socket.local_addr().unwrap();
let remote_addr = remote_socket.local_addr().unwrap();
let local_channel = StunChannel::Udp(UdpSocketChannel::new(local_socket));
let local_agent = StunAgent::builder(TransportType::Udp, local_addr).build();
let local_cand =
Candidate::builder(0, CandidateType::Host, TransportType::Udp, "0", local_addr)
.build();
let remote_cand =
Candidate::builder(0, CandidateType::Host, TransportType::Udp, "0", remote_addr)
.build();
let candidate_pair = CandidatePair::new(local_cand, remote_cand);
let local_agent = Arc::new(Mutex::new(local_agent));
let selected_pair = SelectedPair {
proto: librice_proto::component::SelectedPair::new(candidate_pair, local_agent),
socket: local_channel,
};
send.set_selected_pair(selected_pair.clone());
assert_eq!(
selected_pair.proto.candidate_pair(),
&send.selected_pair().unwrap()
);
let data = vec![3; 4];
send.send(&data).await.unwrap();
let mut recved = vec![0; 16];
let (len, from) = remote_socket.recv_from(&mut recved).await.unwrap();
let recved = &recved[..len];
assert_eq!(from, local_addr);
assert_eq!(recved, data);
});
}
}