use core::time::Duration;
use crate::{candidate::TransportType, mut_override, stream::Stream};
use sans_io_time::Instant;
/// Handle to an agent that lives inside the C library, stored as a raw `RiceAgent` pointer.
///
/// `Clone` obtains its pointer from `rice_agent_ref` and `Drop` passes the pointer to
/// `rice_agent_unref`.
#[derive(Debug)]
pub struct Agent {
// Raw pointer handed out by the C API; every method forwards it to a `rice_agent_*` call.
ffi: *mut crate::ffi::RiceAgent,
}
// NOTE(review): these impls assert that the C library allows the agent pointer to be moved to
// and shared between threads. Nothing in this file enforces that - confirm against the C API.
unsafe impl Send for Agent {}
unsafe impl Sync for Agent {}
impl Clone for Agent {
    /// Create another handle wrapping the pointer returned by `rice_agent_ref`.
    fn clone(&self) -> Self {
        // SAFETY: `self.ffi` is the pointer this handle was created with and has not yet been
        // released, because release only happens in `Drop`.
        let ffi = unsafe { crate::ffi::rice_agent_ref(self.ffi) };
        Self { ffi }
    }
}
impl Drop for Agent {
    /// Hand this handle's pointer back to the C library through `rice_agent_unref`.
    fn drop(&mut self) {
        // SAFETY: `self.ffi` is the pointer this handle was created with; it is not used again
        // after this call because the handle is being destroyed.
        unsafe {
            crate::ffi::rice_agent_unref(self.ffi);
        }
    }
}
impl Default for Agent {
fn default() -> Self {
Agent::builder().build()
}
}
impl Agent {
    /// Wrap a raw `RiceAgent` pointer without taking an additional reference.
    ///
    /// The returned handle passes the pointer to `rice_agent_unref` when dropped, so the caller
    /// gives up the reference it held.
    pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceAgent) -> Self {
        Self { ffi }
    }

    /// Create an [`AgentBuilder`] with default settings.
    pub fn builder() -> AgentBuilder {
        AgentBuilder::default()
    }

    /// Convert a [`Duration`] to whole nanoseconds, clamping to `u64::MAX`.
    ///
    /// `Duration::as_nanos` returns a `u128`; a bare `as u64` cast would silently wrap for
    /// durations longer than `u64::MAX` nanoseconds and hand the C API an arbitrary small value.
    fn duration_to_nanos_saturating(duration: Duration) -> u64 {
        // The cast is lossless because the value has just been clamped into the `u64` range.
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }

    /// The identifier reported by `rice_agent_id` for this agent.
    pub fn id(&self) -> u64 {
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe { crate::ffi::rice_agent_id(self.ffi) }
    }

    /// The timing advance currently reported by the C API, interpreted as nanoseconds.
    pub fn timing_advance(&self) -> Duration {
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe { Duration::from_nanos(crate::ffi::rice_agent_get_timing_advance(self.ffi)) }
    }

    /// Set the timing advance, passed to the C API in nanoseconds.
    ///
    /// Durations that do not fit in a `u64` nanosecond count are clamped to `u64::MAX`.
    // NOTE(review): this is the only setter taking `&mut self`; kept as-is for compatibility.
    pub fn set_timing_advance(&mut self, ta: Duration) {
        let ta_nanos = Self::duration_to_nanos_saturating(ta);
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe {
            crate::ffi::rice_agent_set_timing_advance(self.ffi, ta_nanos);
        }
    }

    /// Configure request retransmission through `rice_agent_set_request_retransmits`.
    ///
    /// * `initial`, `max`, `final_retransmit_timeout` - forwarded as nanosecond counts, each
    ///   clamped to `u64::MAX` if it does not fit.
    /// * `retransmits` - forwarded unchanged.
    pub fn set_request_retransmits(
        &self,
        initial: Duration,
        max: Duration,
        retransmits: u32,
        final_retransmit_timeout: Duration,
    ) {
        let initial_nanos = Self::duration_to_nanos_saturating(initial);
        let max_nanos = Self::duration_to_nanos_saturating(max);
        let final_nanos = Self::duration_to_nanos_saturating(final_retransmit_timeout);
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe {
            crate::ffi::rice_agent_set_request_retransmits(
                self.ffi,
                initial_nanos,
                max_nanos,
                retransmits,
                final_nanos,
            );
        }
    }

    /// Add a stream to the agent and return a handle to it.
    pub fn add_stream(&self) -> crate::stream::Stream {
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe { Stream::from_c_full(crate::ffi::rice_agent_add_stream(self.ffi)) }
    }

    /// Look up a stream by `id`.
    ///
    /// Returns `None` when `rice_agent_get_stream` yields a null pointer.
    pub fn stream(&self, id: usize) -> Option<crate::stream::Stream> {
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        let ret = unsafe { crate::ffi::rice_agent_get_stream(self.ffi, id) };
        if ret.is_null() {
            None
        } else {
            Some(crate::stream::Stream::from_c_full(ret))
        }
    }

    /// Ask the C API to close the agent, passing `now` as a nanosecond timestamp.
    pub fn close(&self, now: Instant) {
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe { crate::ffi::rice_agent_close(self.ffi, now.as_nanos()) }
    }

    /// The value reported by `rice_agent_get_controlling` for this agent.
    pub fn controlling(&self) -> bool {
        // SAFETY: `self.ffi` is the pointer held by this handle, still unreleased.
        unsafe { crate::ffi::rice_agent_get_controlling(self.ffi) }
    }

    /// Pass a STUN server `addr` and its `transport` to `rice_agent_add_stun_server`.
    pub fn add_stun_server(
        &self,
        transport: crate::candidate::TransportType,
        addr: crate::Address,
    ) {
        // SAFETY: `self.ffi` is the pointer held by this handle; `addr` stays alive for the
        // duration of the call.
        unsafe { crate::ffi::rice_agent_add_stun_server(self.ffi, transport.into(), addr.as_c()) }
    }

    /// Poll the agent at time `now` and convert the C result into an [`AgentPoll`].
    pub fn poll(&self, now: Instant) -> AgentPoll {
        // Placeholder value; it is re-initialised by `rice_agent_poll_init` before use.
        let mut ret = crate::ffi::RiceAgentPoll {
            tag: crate::ffi::RICE_AGENT_POLL_CLOSED,
            field1: crate::ffi::RiceAgentPoll__bindgen_ty_1 {
                field1: core::mem::ManuallyDrop::new(
                    crate::ffi::RiceAgentPoll__bindgen_ty_1__bindgen_ty_1 {
                        wait_until_nanos: 0,
                    },
                ),
            },
        };
        // SAFETY: `ret` is a valid, exclusively borrowed poll value and `self.ffi` is the
        // pointer held by this handle.
        unsafe {
            crate::ffi::rice_agent_poll_init(&mut ret);
            crate::ffi::rice_agent_poll(self.ffi, now.as_nanos(), &mut ret);
        }
        AgentPoll::from_c_full(ret)
    }

    /// Poll the agent for data to send at time `now`.
    ///
    /// Returns `None` when the C API leaves either the `from` or the `to` address null.
    pub fn poll_transmit(&self, now: Instant) -> Option<AgentTransmit> {
        let mut ret = crate::ffi::RiceTransmit {
            stream_id: 0,
            transport: crate::ffi::RICE_TRANSPORT_TYPE_UDP,
            from: core::ptr::null(),
            to: core::ptr::null(),
            data: crate::ffi::RiceDataImpl {
                ptr: core::ptr::null_mut(),
                size: 0,
            },
        };
        // SAFETY: `ret` is a valid, exclusively borrowed transmit value and `self.ffi` is the
        // pointer held by this handle.
        unsafe { crate::ffi::rice_agent_poll_transmit(self.ffi, now.as_nanos(), &mut ret) }
        if ret.from.is_null() || ret.to.is_null() {
            return None;
        }
        Some(AgentTransmit::from_c_full(ret))
    }
}
/// Request retransmission settings remembered by the builder until the agent is created.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RequestRto {
    initial: Duration,
    max: Duration,
    retransmits: u32,
    final_retransmit_timeout: Duration,
}

impl RequestRto {
    /// Bundle the four retransmission parameters into one value, storing each unchanged.
    fn from_parts(
        initial: Duration,
        max: Duration,
        retransmits: u32,
        final_retransmit_timeout: Duration,
    ) -> Self {
        RequestRto {
            final_retransmit_timeout,
            retransmits,
            max,
            initial,
        }
    }
}
/// Collects the settings used to create an [`Agent`]; obtained from [`Agent::builder`].
#[derive(Debug)]
pub struct AgentBuilder {
// Second argument passed to `rice_agent_new` by `build()`.
trickle_ice: bool,
// First argument passed to `rice_agent_new` by `build()`.
controlling: bool,
// Applied to the new agent through `rice_agent_set_timing_advance`.
timing_advance: Duration,
// Retransmission settings; only applied by `build()` when present.
rto: Option<RequestRto>,
}
impl Default for AgentBuilder {
fn default() -> Self {
Self {
trickle_ice: false,
controlling: false,
timing_advance: Duration::from_millis(50),
rto: None,
}
}
}
impl AgentBuilder {
    /// Choose the `trickle_ice` flag passed to `rice_agent_new`.
    pub fn trickle_ice(mut self, trickle_ice: bool) -> Self {
        self.trickle_ice = trickle_ice;
        self
    }

    /// Choose the `controlling` flag passed to `rice_agent_new`.
    pub fn controlling(mut self, controlling: bool) -> Self {
        self.controlling = controlling;
        self
    }

    /// Choose the timing advance applied to the agent when it is built.
    pub fn timing_advance(mut self, ta: Duration) -> Self {
        self.timing_advance = ta;
        self
    }

    /// Record request retransmission settings to apply when the agent is built.
    ///
    /// The values are forwarded to [`Agent::set_request_retransmits`] by [`Self::build`].
    pub fn request_retransmits(
        mut self,
        initial: Duration,
        max: Duration,
        retransmits: u32,
        final_retransmit_timeout: Duration,
    ) -> Self {
        self.rto = Some(RequestRto::from_parts(
            initial,
            max,
            retransmits,
            final_retransmit_timeout,
        ));
        self
    }

    /// Create the [`Agent`] and apply the collected settings to it.
    ///
    /// A timing advance that does not fit in a `u64` nanosecond count is clamped to `u64::MAX`
    /// rather than silently wrapped.
    pub fn build(self) -> Agent {
        // `as_nanos` yields a `u128`; clamp first so the cast below cannot lose information.
        let timing_advance_nanos =
            self.timing_advance.as_nanos().min(u128::from(u64::MAX)) as u64;
        // SAFETY: the pointer returned by `rice_agent_new` is used immediately and then owned
        // by the `Agent` constructed below, which releases it on drop.
        let ffi = unsafe {
            let ffi = crate::ffi::rice_agent_new(self.controlling, self.trickle_ice);
            crate::ffi::rice_agent_set_timing_advance(ffi, timing_advance_nanos);
            ffi
        };
        let agent = Agent { ffi };
        if let Some(rto) = self.rto {
            agent.set_request_retransmits(
                rto.initial,
                rto.max,
                rto.retransmits,
                rto.final_retransmit_timeout,
            );
        }
        agent
    }
}
/// Result of polling an [`Agent`], converted from the C `RiceAgentPoll` tagged union.
#[derive(Debug, Default)]
pub enum AgentPoll {
/// Produced for the `RICE_AGENT_POLL_CLOSED` tag; also the `Default` value.
#[default]
Closed,
/// Carries the `wait_until_nanos` value reported by the C API.
WaitUntilNanos(i64),
/// Socket details reported with the `RICE_AGENT_POLL_ALLOCATE_SOCKET` tag.
AllocateSocket(AgentSocket),
/// Socket details reported with the `RICE_AGENT_POLL_REMOVE_SOCKET` tag.
RemoveSocket(AgentSocket),
/// Candidate pair reported with the `RICE_AGENT_POLL_SELECTED_PAIR` tag.
SelectedPair(AgentSelectedPair),
/// State reported with the `RICE_AGENT_POLL_COMPONENT_STATE_CHANGE` tag.
ComponentStateChange(AgentComponentStateChange),
/// Candidate reported with the `RICE_AGENT_POLL_GATHERED_CANDIDATE` tag.
GatheredCandidate(AgentGatheredCandidate),
/// Stream and component reported with the `RICE_AGENT_POLL_GATHERING_COMPLETE` tag.
GatheringComplete(AgentGatheringComplete),
}
impl AgentPoll {
fn from_c_full(ffi: crate::ffi::RiceAgentPoll) -> Self {
unsafe {
match ffi.tag {
crate::ffi::RICE_AGENT_POLL_CLOSED => Self::Closed,
crate::ffi::RICE_AGENT_POLL_WAIT_UNTIL_NANOS => Self::WaitUntilNanos(
core::mem::ManuallyDrop::into_inner(ffi.field1.field1).wait_until_nanos,
),
crate::ffi::RICE_AGENT_POLL_ALLOCATE_SOCKET => {
let ty = core::mem::ManuallyDrop::into_inner(ffi.field1.field2).allocate_socket;
Self::AllocateSocket(AgentSocket {
stream_id: ty.stream_id,
component_id: ty.component_id,
transport: ty.transport.into(),
from: crate::Address::from_c_full(mut_override(ty.from)),
to: crate::Address::from_c_full(mut_override(ty.to)),
})
}
crate::ffi::RICE_AGENT_POLL_REMOVE_SOCKET => {
let ty = core::mem::ManuallyDrop::into_inner(ffi.field1.field3).remove_socket;
Self::RemoveSocket(AgentSocket {
stream_id: ty.stream_id,
component_id: ty.component_id,
transport: ty.transport.into(),
from: crate::Address::from_c_full(mut_override(ty.from)),
to: crate::Address::from_c_full(mut_override(ty.to)),
})
}
crate::ffi::RICE_AGENT_POLL_SELECTED_PAIR => {
let mut ty =
core::mem::ManuallyDrop::into_inner(ffi.field1.field4).selected_pair;
let local = crate::candidate::Candidate::from_c_none(&ty.local);
let remote = crate::candidate::Candidate::from_c_none(&ty.remote);
crate::ffi::rice_candidate_clear(&mut ty.local);
crate::ffi::rice_candidate_clear(&mut ty.remote);
let turn = if !ty.local_turn_local_addr.is_null()
&& !ty.local_turn_remote_addr.is_null()
{
Some(SelectedTurn {
transport: ty.local_turn_transport.into(),
local_addr: crate::Address::from_c_none(ty.local_turn_local_addr),
remote_addr: crate::Address::from_c_none(ty.local_turn_remote_addr),
})
} else {
None
};
crate::ffi::rice_address_free(mut_override(ty.local_turn_local_addr));
ty.local_turn_local_addr = core::ptr::null_mut();
crate::ffi::rice_address_free(mut_override(ty.local_turn_remote_addr));
ty.local_turn_remote_addr = core::ptr::null_mut();
Self::SelectedPair(AgentSelectedPair {
stream_id: ty.stream_id,
component_id: ty.component_id,
local,
remote,
turn,
})
}
crate::ffi::RICE_AGENT_POLL_COMPONENT_STATE_CHANGE => {
let ty = core::mem::ManuallyDrop::into_inner(ffi.field1.field5)
.component_state_change;
Self::ComponentStateChange(AgentComponentStateChange {
stream_id: ty.stream_id,
component_id: ty.component_id,
state: ty.state.into(),
})
}
crate::ffi::RICE_AGENT_POLL_GATHERED_CANDIDATE => {
let ty =
core::mem::ManuallyDrop::into_inner(ffi.field1.field6).gathered_candidate;
let stream_id = ty.stream_id;
let gathered = crate::stream::GatheredCandidate::from_c_full(ty.gathered);
Self::GatheredCandidate(AgentGatheredCandidate {
stream_id,
gathered,
})
}
crate::ffi::RICE_AGENT_POLL_GATHERING_COMPLETE => {
let ty =
core::mem::ManuallyDrop::into_inner(ffi.field1.field7).gathering_complete;
Self::GatheringComplete(AgentGatheringComplete {
stream_id: ty.stream_id,
component_id: ty.component_id,
})
}
tag => panic!("Unkown AgentPoll value {tag:x?}"),
}
}
}
}
impl Drop for AgentPoll {
// Only the `GatheredCandidate` variant is handled here: its gathered data is taken out,
// wrapped back into a C `RiceAgentPoll` and handed to `rice_agent_poll_clear`.
// Every other variant falls through without any C call.
fn drop(&mut self) {
unsafe {
if let Self::GatheredCandidate(gathered) = self {
// Rebuild the C representation of this poll result around the taken-out data.
let mut ret = crate::ffi::RiceAgentPoll {
tag: crate::ffi::RICE_AGENT_POLL_GATHERED_CANDIDATE,
field1: crate::ffi::RiceAgentPoll__bindgen_ty_1 {
field6: core::mem::ManuallyDrop::new(
crate::ffi::RiceAgentPoll__bindgen_ty_1__bindgen_ty_6 {
gathered_candidate: crate::ffi::RiceAgentGatheredCandidate {
stream_id: gathered.stream_id,
// `take` moves the contents out of the field that stays behind in `self`.
gathered: crate::stream::GatheredCandidate::take(
&mut gathered.gathered,
)
.ffi,
},
},
),
},
};
// The C API clears the rebuilt value.
crate::ffi::rice_agent_poll_clear(&raw mut ret);
}
}
}
}
/// Data the agent wants sent, as returned by [`Agent::poll_transmit`].
///
/// Dropping this value passes `data` back to the C API through `rice_transmit_clear`.
#[derive(Debug)]
pub struct AgentTransmit {
/// Stream id copied from the C `RiceTransmit`.
pub stream_id: usize,
/// Address taken from the `from` pointer of the C `RiceTransmit`.
pub from: crate::Address,
/// Address taken from the `to` pointer of the C `RiceTransmit`.
pub to: crate::Address,
/// Transport converted from the C `RiceTransmit`.
pub transport: crate::candidate::TransportType,
/// Bytes to send, viewed directly from the pointer and size supplied by the C API.
// NOTE(review): typed `'static`, but the bytes are handed to `rice_transmit_clear` when this
// value is dropped - copies of this reference presumably must not outlive it; confirm.
pub data: &'static [u8],
}
impl AgentTransmit {
    /// Convert a C `RiceTransmit` into its Rust representation, consuming `ffi`.
    ///
    /// The data is not copied: the returned slice views the pointer and size held in `ffi`.
    pub(crate) fn from_c_full(ffi: crate::ffi::RiceTransmit) -> Self {
        // SAFETY: relies on the C API providing a `data.ptr` that is valid for `data.size`
        // bytes, and `from`/`to` pointers that `Address::from_c_full` may take over.
        // NOTE(review): `slice::from_raw_parts` requires a non-null pointer even for an empty
        // slice - confirm the C API never reports a null `data.ptr` alongside valid addresses.
        unsafe {
            AgentTransmit {
                data: core::slice::from_raw_parts(ffi.data.ptr, ffi.data.size),
                stream_id: ffi.stream_id,
                from: crate::Address::from_c_full(mut_override(ffi.from)),
                to: crate::Address::from_c_full(mut_override(ffi.to)),
                transport: ffi.transport.into(),
            }
        }
    }
}
impl Drop for AgentTransmit {
// Rebuilds a C `RiceTransmit` around the data slice and hands it to `rice_transmit_clear`.
fn drop(&mut self) {
unsafe {
let mut transmit = crate::ffi::RiceTransmit {
stream_id: self.stream_id,
// The addresses are passed as null here; presumably the `from`/`to` `Address` fields
// release themselves when dropped - TODO confirm.
from: core::ptr::null_mut(),
to: core::ptr::null_mut(),
transport: self.transport.into(),
data: crate::ffi::RiceDataImpl::to_c(self.data),
};
crate::ffi::rice_transmit_clear(&mut transmit);
}
}
}
/// Socket description carried by [`AgentPoll::AllocateSocket`] and [`AgentPoll::RemoveSocket`].
#[derive(Debug)]
pub struct AgentSocket {
/// Stream id copied from the C poll result.
pub stream_id: usize,
/// Component id copied from the C poll result.
pub component_id: usize,
/// Transport converted from the C poll result.
pub transport: crate::candidate::TransportType,
/// Address taken from the `from` pointer of the C poll result.
pub from: crate::Address,
/// Address taken from the `to` pointer of the C poll result.
pub to: crate::Address,
}
/// Candidate pair carried by [`AgentPoll::SelectedPair`].
#[derive(Debug)]
pub struct AgentSelectedPair {
/// Stream id copied from the C poll result.
pub stream_id: usize,
/// Component id copied from the C poll result.
pub component_id: usize,
/// Local candidate converted from the C poll result.
pub local: crate::candidate::Candidate,
/// Remote candidate converted from the C poll result.
pub remote: crate::candidate::Candidate,
/// TURN details; only present when the C poll result supplied both TURN addresses.
pub turn: Option<SelectedTurn>,
}
/// TURN details attached to an [`AgentSelectedPair`].
#[derive(Debug)]
pub struct SelectedTurn {
/// Converted from `local_turn_transport` of the C poll result.
pub transport: TransportType,
/// Converted from `local_turn_local_addr` of the C poll result.
pub local_addr: crate::Address,
/// Converted from `local_turn_remote_addr` of the C poll result.
pub remote_addr: crate::Address,
}
/// Payload of [`AgentPoll::ComponentStateChange`].
#[derive(Debug)]
#[repr(C)]
pub struct AgentComponentStateChange {
/// Stream id copied from the C poll result.
pub stream_id: usize,
/// Component id copied from the C poll result.
pub component_id: usize,
/// Connection state converted from the C poll result.
pub state: crate::component::ComponentConnectionState,
}
/// Payload of [`AgentPoll::GatheredCandidate`].
#[derive(Debug)]
#[repr(C)]
pub struct AgentGatheredCandidate {
/// Stream id copied from the C poll result.
pub stream_id: usize,
/// Gathered candidate data taken over from the C poll result.
pub gathered: crate::stream::GatheredCandidate,
}
/// Payload of [`AgentPoll::GatheringComplete`].
#[derive(Debug)]
#[repr(C)]
pub struct AgentGatheringComplete {
/// Stream id copied from the C poll result.
pub stream_id: usize,
/// Component id copied from the C poll result.
pub component_id: usize,
}
/// Error codes of the C API, with discriminants equal to the matching `RICE_ERROR_*` values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum AgentError {
/// Corresponds to `RICE_ERROR_FAILED`.
Failed = crate::ffi::RICE_ERROR_FAILED,
/// Corresponds to `RICE_ERROR_RESOURCE_NOT_FOUND`.
ResourceNotFound = crate::ffi::RICE_ERROR_RESOURCE_NOT_FOUND,
/// Corresponds to `RICE_ERROR_ALREADY_IN_PROGRESS`.
AlreadyInProgress = crate::ffi::RICE_ERROR_ALREADY_IN_PROGRESS,
}
impl core::fmt::Display for AgentError {
    /// Write a short human-readable name for the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let text = match self {
            Self::Failed => "Failed",
            Self::ResourceNotFound => "Resource Not Found",
            Self::AlreadyInProgress => "Already In Progress",
        };
        f.write_str(text)
    }
}
impl AgentError {
    /// Translate a C `RiceError` code into a `Result`.
    ///
    /// `RICE_ERROR_SUCCESS` becomes `Ok(())`; the three known error codes become the matching
    /// [`AgentError`] variant.
    ///
    /// # Panics
    ///
    /// Panics on any other value.
    pub(crate) fn from_c(value: crate::ffi::RiceError) -> Result<(), AgentError> {
        let error = match value {
            crate::ffi::RICE_ERROR_SUCCESS => return Ok(()),
            crate::ffi::RICE_ERROR_FAILED => AgentError::Failed,
            crate::ffi::RICE_ERROR_RESOURCE_NOT_FOUND => AgentError::ResourceNotFound,
            crate::ffi::RICE_ERROR_ALREADY_IN_PROGRESS => AgentError::AlreadyInProgress,
            val => panic!("unknown RiceError value {val:x?}"),
        };
        Err(error)
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Builder flags are visible through the getters, a clone reports the same id as the
// original, and a stream that was just added can be looked up again by its id.
#[test]
fn agent_getters() {
let agent = Agent::builder()
.trickle_ice(false)
.controlling(true)
.build();
assert!(agent.controlling());
assert_eq!(agent.id(), agent.clone().id());
let stream = agent.add_stream();
assert_eq!(stream.id(), agent.stream(stream.id()).unwrap().id());
}
// Smoke test: building an agent with explicit retransmission settings must not crash.
#[test]
fn agent_build_request_retransmits() {
let _agent = Agent::builder()
.request_retransmits(
Duration::from_millis(500),
Duration::from_secs(1),
10,
Duration::from_secs(10),
)
.build();
}
}