1use core::time::Duration;
14
15use crate::{candidate::TransportType, mut_override, stream::Stream};
16
17use sans_io_time::Instant;
18
19#[derive(Debug)]
21pub struct Agent {
22 ffi: *mut crate::ffi::RiceAgent,
23}
24
25unsafe impl Send for Agent {}
26unsafe impl Sync for Agent {}
27
28impl Clone for Agent {
29 fn clone(&self) -> Self {
30 Self {
31 ffi: unsafe { crate::ffi::rice_agent_ref(self.ffi) },
32 }
33 }
34}
35
36impl Drop for Agent {
37 fn drop(&mut self) {
38 unsafe { crate::ffi::rice_agent_unref(self.ffi) }
39 }
40}
41
42impl Default for Agent {
43 fn default() -> Self {
44 Agent::builder().build()
45 }
46}
47
48impl Agent {
49 pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceAgent) -> Self {
50 Self { ffi }
51 }
52
53 pub fn builder() -> AgentBuilder {
55 AgentBuilder::default()
56 }
57
58 pub fn id(&self) -> u64 {
60 unsafe { crate::ffi::rice_agent_id(self.ffi) }
61 }
62
63 pub fn timing_advance(&self) -> Duration {
69 unsafe { Duration::from_nanos(crate::ffi::rice_agent_get_timing_advance(self.ffi)) }
70 }
71
72 pub fn set_timing_advance(&mut self, ta: Duration) {
78 unsafe {
79 crate::ffi::rice_agent_set_timing_advance(self.ffi, ta.as_nanos() as u64);
80 }
81 }
82
83 pub fn set_request_retransmits(
99 &self,
100 initial: Duration,
101 max: Duration,
102 retransmits: u32,
103 final_retransmit_timeout: Duration,
104 ) {
105 unsafe {
106 crate::ffi::rice_agent_set_request_retransmits(
107 self.ffi,
108 initial.as_nanos() as u64,
109 max.as_nanos() as u64,
110 retransmits,
111 final_retransmit_timeout.as_nanos() as u64,
112 );
113 }
114 }
115
116 pub fn add_stream(&self) -> crate::stream::Stream {
128 unsafe { Stream::from_c_full(crate::ffi::rice_agent_add_stream(self.ffi)) }
129 }
130
131 pub fn stream(&self, id: usize) -> Option<crate::stream::Stream> {
133 let ret = unsafe { crate::ffi::rice_agent_get_stream(self.ffi, id) };
134 if ret.is_null() {
135 None
136 } else {
137 Some(crate::stream::Stream::from_c_full(ret))
138 }
139 }
140
141 pub fn close(&self, now: Instant) {
144 unsafe { crate::ffi::rice_agent_close(self.ffi, now.as_nanos()) }
145 }
146
147 pub fn controlling(&self) -> bool {
150 unsafe { crate::ffi::rice_agent_get_controlling(self.ffi) }
151 }
152
153 pub fn add_stun_server(
155 &self,
156 transport: crate::candidate::TransportType,
157 addr: crate::Address,
158 ) {
159 unsafe { crate::ffi::rice_agent_add_stun_server(self.ffi, transport.into(), addr.as_c()) }
160 }
161
162 pub fn poll(&self, now: Instant) -> AgentPoll {
166 let mut ret = crate::ffi::RiceAgentPoll {
167 tag: crate::ffi::RICE_AGENT_POLL_CLOSED,
168 field1: crate::ffi::RiceAgentPoll__bindgen_ty_1 {
169 field1: core::mem::ManuallyDrop::new(
170 crate::ffi::RiceAgentPoll__bindgen_ty_1__bindgen_ty_1 {
171 wait_until_nanos: 0,
172 },
173 ),
174 },
175 };
176
177 unsafe {
178 crate::ffi::rice_agent_poll_init(&mut ret);
179 crate::ffi::rice_agent_poll(self.ffi, now.as_nanos(), &mut ret);
180 }
181
182 AgentPoll::from_c_full(ret)
183 }
184
185 pub fn poll_transmit(&self, now: Instant) -> Option<AgentTransmit> {
190 let mut ret = crate::ffi::RiceTransmit {
191 stream_id: 0,
192 transport: crate::ffi::RICE_TRANSPORT_TYPE_UDP,
193 from: core::ptr::null(),
194 to: core::ptr::null(),
195 data: crate::ffi::RiceDataImpl {
196 ptr: core::ptr::null_mut(),
197 size: 0,
198 },
199 };
200 unsafe { crate::ffi::rice_agent_poll_transmit(self.ffi, now.as_nanos(), &mut ret) }
201 if ret.from.is_null() || ret.to.is_null() {
202 return None;
203 }
204 Some(AgentTransmit::from_c_full(ret))
205 }
206 }
208
/// Request retransmission settings remembered by [`AgentBuilder`] until the
/// agent is built.
///
/// The fields mirror, one to one, the arguments of
/// `Agent::set_request_retransmits()`.
#[derive(Clone, Debug, Eq, PartialEq)]
struct RequestRto {
    /// Value passed as the `initial` argument.
    initial: Duration,
    /// Value passed as the `max` argument.
    max: Duration,
    /// Value passed as the `retransmits` argument.
    retransmits: u32,
    /// Value passed as the `final_retransmit_timeout` argument.
    final_retransmit_timeout: Duration,
}
216
217impl RequestRto {
218 fn from_parts(
219 initial: Duration,
220 max: Duration,
221 retransmits: u32,
222 final_retransmit_timeout: Duration,
223 ) -> Self {
224 Self {
225 initial,
226 max,
227 retransmits,
228 final_retransmit_timeout,
229 }
230 }
231}
232
233#[derive(Debug)]
235pub struct AgentBuilder {
236 trickle_ice: bool,
237 controlling: bool,
238 timing_advance: Duration,
239 rto: Option<RequestRto>,
240}
241
242impl Default for AgentBuilder {
243 fn default() -> Self {
244 Self {
245 trickle_ice: false,
246 controlling: false,
247 timing_advance: Duration::from_millis(50),
248 rto: None,
249 }
250 }
251}
252
253impl AgentBuilder {
254 pub fn trickle_ice(mut self, trickle_ice: bool) -> Self {
256 self.trickle_ice = trickle_ice;
257 self
258 }
259
260 pub fn controlling(mut self, controlling: bool) -> Self {
263 self.controlling = controlling;
264 self
265 }
266
267 pub fn timing_advance(mut self, ta: Duration) -> Self {
273 self.timing_advance = ta;
274 self
275 }
276
277 pub fn request_retransmits(
293 mut self,
294 initial: Duration,
295 max: Duration,
296 retransmits: u32,
297 final_retransmit_timeout: Duration,
298 ) -> Self {
299 self.rto = Some(RequestRto::from_parts(
300 initial,
301 max,
302 retransmits,
303 final_retransmit_timeout,
304 ));
305 self
306 }
307
308 pub fn build(self) -> Agent {
310 unsafe {
311 let ffi = crate::ffi::rice_agent_new(self.controlling, self.trickle_ice);
312 crate::ffi::rice_agent_set_timing_advance(ffi, self.timing_advance.as_nanos() as u64);
313 let ret = Agent { ffi };
314 if let Some(rto) = self.rto {
315 ret.set_request_retransmits(
316 rto.initial,
317 rto.max,
318 rto.retransmits,
319 rto.final_retransmit_timeout,
320 );
321 }
322 ret
323 }
324 }
325}
326
327#[derive(Debug, Default)]
329pub enum AgentPoll {
330 #[default]
332 Closed,
333 WaitUntilNanos(i64),
335 AllocateSocket(AgentSocket),
338 RemoveSocket(AgentSocket),
341 SelectedPair(AgentSelectedPair),
343 ComponentStateChange(AgentComponentStateChange),
345 GatheredCandidate(AgentGatheredCandidate),
347 GatheringComplete(AgentGatheringComplete),
349}
350
351impl AgentPoll {
352 fn from_c_full(ffi: crate::ffi::RiceAgentPoll) -> Self {
353 unsafe {
354 match ffi.tag {
355 crate::ffi::RICE_AGENT_POLL_CLOSED => Self::Closed,
356 crate::ffi::RICE_AGENT_POLL_WAIT_UNTIL_NANOS => Self::WaitUntilNanos(
357 core::mem::ManuallyDrop::into_inner(ffi.field1.field1).wait_until_nanos,
358 ),
359 crate::ffi::RICE_AGENT_POLL_ALLOCATE_SOCKET => {
360 let ty = core::mem::ManuallyDrop::into_inner(ffi.field1.field2).allocate_socket;
361 Self::AllocateSocket(AgentSocket {
362 stream_id: ty.stream_id,
363 component_id: ty.component_id,
364 transport: ty.transport.into(),
365 from: crate::Address::from_c_full(mut_override(ty.from)),
366 to: crate::Address::from_c_full(mut_override(ty.to)),
367 })
368 }
369 crate::ffi::RICE_AGENT_POLL_REMOVE_SOCKET => {
370 let ty = core::mem::ManuallyDrop::into_inner(ffi.field1.field3).remove_socket;
371 Self::RemoveSocket(AgentSocket {
372 stream_id: ty.stream_id,
373 component_id: ty.component_id,
374 transport: ty.transport.into(),
375 from: crate::Address::from_c_full(mut_override(ty.from)),
376 to: crate::Address::from_c_full(mut_override(ty.to)),
377 })
378 }
379 crate::ffi::RICE_AGENT_POLL_SELECTED_PAIR => {
380 let mut ty =
381 core::mem::ManuallyDrop::into_inner(ffi.field1.field4).selected_pair;
382 let local = crate::candidate::Candidate::from_c_none(&ty.local);
383 let remote = crate::candidate::Candidate::from_c_none(&ty.remote);
384 crate::ffi::rice_candidate_clear(&mut ty.local);
385 crate::ffi::rice_candidate_clear(&mut ty.remote);
386 let turn = if !ty.local_turn_local_addr.is_null()
387 && !ty.local_turn_remote_addr.is_null()
388 {
389 Some(SelectedTurn {
390 transport: ty.local_turn_transport.into(),
391 local_addr: crate::Address::from_c_none(ty.local_turn_local_addr),
392 remote_addr: crate::Address::from_c_none(ty.local_turn_remote_addr),
393 })
394 } else {
395 None
396 };
397 crate::ffi::rice_address_free(mut_override(ty.local_turn_local_addr));
398 ty.local_turn_local_addr = core::ptr::null_mut();
399 crate::ffi::rice_address_free(mut_override(ty.local_turn_remote_addr));
400 ty.local_turn_remote_addr = core::ptr::null_mut();
401 Self::SelectedPair(AgentSelectedPair {
402 stream_id: ty.stream_id,
403 component_id: ty.component_id,
404 local,
405 remote,
406 turn,
407 })
408 }
409 crate::ffi::RICE_AGENT_POLL_COMPONENT_STATE_CHANGE => {
410 let ty = core::mem::ManuallyDrop::into_inner(ffi.field1.field5)
411 .component_state_change;
412 Self::ComponentStateChange(AgentComponentStateChange {
413 stream_id: ty.stream_id,
414 component_id: ty.component_id,
415 state: ty.state.into(),
416 })
417 }
418 crate::ffi::RICE_AGENT_POLL_GATHERED_CANDIDATE => {
419 let ty =
420 core::mem::ManuallyDrop::into_inner(ffi.field1.field6).gathered_candidate;
421 let stream_id = ty.stream_id;
422 let gathered = crate::stream::GatheredCandidate::from_c_full(ty.gathered);
423 Self::GatheredCandidate(AgentGatheredCandidate {
424 stream_id,
425 gathered,
426 })
427 }
428 crate::ffi::RICE_AGENT_POLL_GATHERING_COMPLETE => {
429 let ty =
430 core::mem::ManuallyDrop::into_inner(ffi.field1.field7).gathering_complete;
431 Self::GatheringComplete(AgentGatheringComplete {
432 stream_id: ty.stream_id,
433 component_id: ty.component_id,
434 })
435 }
436 tag => panic!("Unkown AgentPoll value {tag:x?}"),
437 }
438 }
439 }
440}
441
442impl Drop for AgentPoll {
443 fn drop(&mut self) {
444 unsafe {
445 if let Self::GatheredCandidate(gathered) = self {
446 let mut ret = crate::ffi::RiceAgentPoll {
447 tag: crate::ffi::RICE_AGENT_POLL_GATHERED_CANDIDATE,
448 field1: crate::ffi::RiceAgentPoll__bindgen_ty_1 {
449 field6: core::mem::ManuallyDrop::new(
450 crate::ffi::RiceAgentPoll__bindgen_ty_1__bindgen_ty_6 {
451 gathered_candidate: crate::ffi::RiceAgentGatheredCandidate {
452 stream_id: gathered.stream_id,
453 gathered: crate::stream::GatheredCandidate::take(
454 &mut gathered.gathered,
455 )
456 .ffi,
457 },
458 },
459 ),
460 },
461 };
462 crate::ffi::rice_agent_poll_clear(&raw mut ret);
463 }
464 }
465 }
466}
467
468#[derive(Debug)]
470pub struct AgentTransmit {
471 pub stream_id: usize,
473 pub from: crate::Address,
475 pub to: crate::Address,
477 pub transport: crate::candidate::TransportType,
479 pub data: &'static [u8],
481}
482
483impl AgentTransmit {
484 pub(crate) fn from_c_full(ffi: crate::ffi::RiceTransmit) -> Self {
485 unsafe {
486 let data = ffi.data.ptr;
487 let len = ffi.data.size;
488 let data = core::slice::from_raw_parts(data, len);
489 AgentTransmit {
490 stream_id: ffi.stream_id,
491 from: crate::Address::from_c_full(mut_override(ffi.from)),
492 to: crate::Address::from_c_full(mut_override(ffi.to)),
493 transport: ffi.transport.into(),
494 data,
495 }
496 }
497 }
498}
499
500impl Drop for AgentTransmit {
501 fn drop(&mut self) {
502 unsafe {
503 let mut transmit = crate::ffi::RiceTransmit {
504 stream_id: self.stream_id,
505 from: core::ptr::null_mut(),
506 to: core::ptr::null_mut(),
507 transport: self.transport.into(),
508 data: crate::ffi::RiceDataImpl::to_c(self.data),
509 };
510 crate::ffi::rice_transmit_clear(&mut transmit);
511 }
512 }
513}
514
515#[derive(Debug)]
517pub struct AgentSocket {
518 pub stream_id: usize,
520 pub component_id: usize,
522 pub transport: crate::candidate::TransportType,
524 pub from: crate::Address,
526 pub to: crate::Address,
528}
529
530#[derive(Debug)]
532pub struct AgentSelectedPair {
533 pub stream_id: usize,
535 pub component_id: usize,
537 pub local: crate::candidate::Candidate,
539 pub remote: crate::candidate::Candidate,
541 pub turn: Option<SelectedTurn>,
543}
544
545#[derive(Debug)]
547pub struct SelectedTurn {
548 pub transport: TransportType,
550 pub local_addr: crate::Address,
552 pub remote_addr: crate::Address,
554}
555
556#[derive(Debug)]
558#[repr(C)]
559pub struct AgentComponentStateChange {
560 pub stream_id: usize,
562 pub component_id: usize,
564 pub state: crate::component::ComponentConnectionState,
566}
567
568#[derive(Debug)]
570#[repr(C)]
571pub struct AgentGatheredCandidate {
572 pub stream_id: usize,
574 pub gathered: crate::stream::GatheredCandidate,
576}
577
/// Payload of [`AgentPoll::GatheringComplete`].
#[repr(C)]
#[derive(Debug)]
pub struct AgentGatheringComplete {
    /// Identifier of the stream reported with the event.
    pub stream_id: usize,
    /// Identifier of the component reported with the event.
    pub component_id: usize,
}
587
588#[derive(Debug, Copy, Clone, PartialEq, Eq)]
590#[repr(i32)]
591pub enum AgentError {
592 Failed = crate::ffi::RICE_ERROR_FAILED,
594 ResourceNotFound = crate::ffi::RICE_ERROR_RESOURCE_NOT_FOUND,
596 AlreadyInProgress = crate::ffi::RICE_ERROR_ALREADY_IN_PROGRESS,
598}
599
600impl core::fmt::Display for AgentError {
601 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602 match self {
603 Self::Failed => write!(f, "Failed"),
604 Self::ResourceNotFound => write!(f, "Resource Not Found"),
605 Self::AlreadyInProgress => write!(f, "Already In Progress"),
606 }
607 }
608}
609
610impl AgentError {
611 pub(crate) fn from_c(value: crate::ffi::RiceError) -> Result<(), AgentError> {
612 match value {
613 crate::ffi::RICE_ERROR_SUCCESS => Ok(()),
614 crate::ffi::RICE_ERROR_FAILED => Err(AgentError::Failed),
615 crate::ffi::RICE_ERROR_RESOURCE_NOT_FOUND => Err(AgentError::ResourceNotFound),
616 crate::ffi::RICE_ERROR_ALREADY_IN_PROGRESS => Err(AgentError::AlreadyInProgress),
617 val => panic!("unknown RiceError value {val:x?}"),
618 }
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// The controlling flag, the id shared between clones, and stream lookup
    /// by id all round-trip through the C library.
    #[test]
    fn agent_getters() {
        let builder = Agent::builder().trickle_ice(false).controlling(true);
        let agent = builder.build();
        assert!(agent.controlling());

        let cloned_id = agent.clone().id();
        assert_eq!(agent.id(), cloned_id);

        let stream = agent.add_stream();
        let looked_up = agent.stream(stream.id()).unwrap();
        assert_eq!(stream.id(), looked_up.id());
    }

    /// Building an agent with retransmission parameters set must not crash.
    #[test]
    fn agent_build_request_retransmits() {
        let initial = Duration::from_millis(500);
        let max = Duration::from_secs(1);
        let final_retransmit_timeout = Duration::from_secs(10);

        let _agent = Agent::builder()
            .request_retransmits(initial, max, 10, final_retransmit_timeout)
            .build();
    }
}