1use sans_io_time::Instant;
14
15use crate::{
16 candidate::{Candidate, TransportType},
17 mut_override,
18};
19
20#[derive(Debug)]
22pub struct Stream {
23 ffi: *mut crate::ffi::RiceStream,
24}
25
26unsafe impl Send for Stream {}
27unsafe impl Sync for Stream {}
28
29impl Clone for Stream {
30 fn clone(&self) -> Self {
31 Self {
32 ffi: unsafe { crate::ffi::rice_stream_ref(self.ffi) },
33 }
34 }
35}
36
37impl Drop for Stream {
38 fn drop(&mut self) {
39 unsafe { crate::ffi::rice_stream_unref(self.ffi) };
40 }
41}
42
43impl Stream {
44 pub(crate) fn from_c_full(stream: *mut crate::ffi::RiceStream) -> Self {
45 Self { ffi: stream }
46 }
47
48 pub fn id(&self) -> usize {
50 unsafe { crate::ffi::rice_stream_get_id(self.ffi) }
51 }
52
53 pub fn agent(&self) -> crate::agent::Agent {
55 unsafe { crate::agent::Agent::from_c_full(crate::ffi::rice_stream_get_agent(self.ffi)) }
56 }
57
58 pub fn add_component(&self) -> crate::component::Component {
60 unsafe {
61 crate::component::Component::from_c_full(
62 crate::ffi::rice_stream_add_component(self.ffi),
63 self.id(),
64 )
65 }
66 }
67
68 pub fn component(&self, id: usize) -> Option<crate::component::Component> {
71 let ret = unsafe { crate::ffi::rice_stream_get_component(self.ffi, id) };
72 if ret.is_null() {
73 None
74 } else {
75 Some(crate::component::Component::from_c_full(ret, self.id()))
76 }
77 }
78
79 pub fn local_credentials(&self) -> Option<Credentials> {
81 let ret = unsafe { crate::ffi::rice_stream_get_local_credentials(self.ffi) };
82 if ret.is_null() {
83 None
84 } else {
85 Some(Credentials::from_c_full(ret))
86 }
87 }
88
89 pub fn remote_credentials(&self) -> Option<Credentials> {
91 let ret = unsafe { crate::ffi::rice_stream_get_remote_credentials(self.ffi) };
92 if ret.is_null() {
93 None
94 } else {
95 Some(Credentials::from_c_full(ret))
96 }
97 }
98
99 pub fn set_local_credentials(&self, credentials: &Credentials) {
113 unsafe {
114 crate::ffi::rice_stream_set_local_credentials(self.ffi, credentials.into_c_none())
115 }
116 }
117
118 pub fn set_remote_credentials(&self, credentials: &Credentials) {
132 unsafe {
133 crate::ffi::rice_stream_set_remote_credentials(self.ffi, credentials.into_c_none())
134 }
135 }
136
137 pub fn end_of_local_candidates(&self) {
140 unsafe { crate::ffi::rice_stream_end_of_local_candidates(self.ffi) }
141 }
142
143 pub fn local_candidates(&self) -> Vec<Candidate> {
145 unsafe {
146 let mut len = 0;
147 crate::ffi::rice_stream_get_local_candidates(self.ffi, &mut len, core::ptr::null_mut());
148 let mut ret = vec![crate::ffi::RiceCandidate::zeroed(); len];
149 crate::ffi::rice_stream_get_local_candidates(self.ffi, &mut len, ret.as_mut_ptr());
150 ret.into_iter()
151 .map(|cand| Candidate::from_c_none(&cand))
152 .collect()
153 }
154 }
155
156 pub fn add_remote_candidate(&self, cand: &crate::candidate::Candidate) {
158 unsafe { crate::ffi::rice_stream_add_remote_candidate(self.ffi, cand.as_c()) }
159 }
160
161 pub fn end_of_remote_candidates(&self) {
164 unsafe { crate::ffi::rice_stream_end_of_remote_candidates(self.ffi) }
165 }
166
167 pub fn remote_candidates(&self) -> Vec<Candidate> {
169 unsafe {
170 let mut len = 0;
171 crate::ffi::rice_stream_get_remote_candidates(
172 self.ffi,
173 &mut len,
174 core::ptr::null_mut(),
175 );
176 let mut ret = vec![crate::ffi::RiceCandidate::zeroed(); len];
177 crate::ffi::rice_stream_get_remote_candidates(self.ffi, &mut len, ret.as_mut_ptr());
178 ret.into_iter().map(Candidate::from_c_full).collect()
179 }
180 }
181
182 pub fn add_local_gathered_candidate(&self, gathered: GatheredCandidate) -> bool {
186 unsafe { crate::ffi::rice_stream_add_local_gathered_candidate(self.ffi, &gathered.ffi) }
187 }
188
189 pub fn allocated_socket(
193 &self,
194 component_id: usize,
195 transport: TransportType,
196 from: &crate::Address,
197 to: &crate::Address,
198 socket_addr: Option<crate::Address>,
199 now: Instant,
200 ) {
201 let socket_addr = if let Some(addr) = socket_addr {
202 addr.into_c_full()
203 } else {
204 core::ptr::null_mut()
205 };
206 unsafe {
207 crate::ffi::rice_stream_handle_allocated_socket(
208 self.ffi,
209 component_id,
210 transport.into(),
211 from.as_c(),
212 to.as_c(),
213 socket_addr,
214 now.as_nanos(),
215 )
216 }
217 }
218
219 pub fn component_ids(&self) -> Vec<usize> {
221 unsafe {
222 let mut len = 0;
223 crate::ffi::rice_stream_component_ids(self.ffi, &mut len, core::ptr::null_mut());
224 let mut ret = vec![0; len];
225 crate::ffi::rice_stream_component_ids(self.ffi, &mut len, ret.as_mut_ptr());
226 ret.resize(len.min(ret.len()), 0);
227 ret
228 }
229 }
230
231 pub fn handle_incoming_data<'a>(
235 &self,
236 component_id: usize,
237 transport: TransportType,
238 from: crate::Address,
239 to: crate::Address,
240 data: &'a [u8],
241 now: Instant,
242 ) -> StreamIncomingDataReply<'a> {
243 unsafe {
244 let mut stream_ret = crate::ffi::RiceStreamIncomingData::default();
245 crate::ffi::rice_stream_handle_incoming_data(
246 self.ffi,
247 component_id,
248 transport.into(),
249 from.as_c(),
250 to.as_c(),
251 data.as_ptr(),
252 data.len(),
253 now.as_nanos(),
254 &mut stream_ret,
255 );
256 let mut ret = StreamIncomingDataReply {
257 handled: stream_ret.handled,
258 have_more_data: stream_ret.have_more_data,
259 data: None,
260 };
261 if !stream_ret.data.ptr.is_null() && stream_ret.data.size > 0 {
262 ret.data = Some(data);
263 }
264 ret
265 }
266 }
267
268 pub fn poll_recv(&self) -> Option<PollRecv> {
272 unsafe {
273 let mut len = 0;
274 let mut component_id = 0;
275 let ptr = crate::ffi::rice_stream_poll_recv(self.ffi, &mut component_id, &mut len);
276 if ptr.is_null() {
277 return None;
278 }
279 let slice = core::slice::from_raw_parts(ptr, len);
280 Some(PollRecv {
281 component_id,
282 data: RecvData { data: slice },
283 })
284 }
285 }
286}
287
288#[derive(Debug)]
290pub struct PollRecv {
291 pub component_id: usize,
293 pub data: RecvData,
295}
296
297#[derive(Debug)]
299pub struct RecvData {
300 data: &'static [u8],
301}
302
303impl core::ops::Deref for RecvData {
304 type Target = [u8];
305 fn deref(&self) -> &Self::Target {
306 self.data
307 }
308}
309
310impl Drop for RecvData {
311 fn drop(&mut self) {
312 unsafe { crate::ffi::rice_free_data(mut_override(self.data.as_ptr())) }
313 }
314}
315
316#[derive(Debug)]
318pub struct StreamIncomingDataReply<'a> {
319 pub handled: bool,
321 pub have_more_data: bool,
324 pub data: Option<&'a [u8]>,
326}
327
328#[derive(Debug)]
330pub struct Credentials {
331 ffi: *mut crate::ffi::RiceCredentials,
332}
333
334impl Credentials {
335 pub fn new(ufrag: &str, passwd: &str) -> Self {
337 let ufrag = std::ffi::CString::new(ufrag).unwrap();
338 let passwd = std::ffi::CString::new(passwd).unwrap();
339 unsafe {
340 Self {
341 ffi: crate::ffi::rice_credentials_new(ufrag.as_ptr(), passwd.as_ptr()),
342 }
343 }
344 }
345
346 pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceCredentials) -> Self {
347 Self { ffi }
348 }
349
350 #[allow(clippy::wrong_self_convention)]
351 pub(crate) fn into_c_none(&self) -> *const crate::ffi::RiceCredentials {
352 self.ffi
353 }
354}
355
356impl PartialEq for Credentials {
357 fn eq(&self, other: &Self) -> bool {
358 unsafe { crate::ffi::rice_credentials_eq(self.ffi, other.ffi) }
359 }
360}
361
362impl Clone for Credentials {
363 fn clone(&self) -> Self {
364 Self {
365 ffi: unsafe { crate::ffi::rice_credentials_copy(self.ffi) },
366 }
367 }
368}
369
370impl Drop for Credentials {
371 fn drop(&mut self) {
372 unsafe { crate::ffi::rice_credentials_free(self.ffi) }
373 }
374}
375
376#[derive(Debug)]
378pub struct GatheredCandidate {
379 pub(crate) ffi: crate::ffi::RiceGatheredCandidate,
380}
381
382unsafe impl Send for GatheredCandidate {}
383
384impl GatheredCandidate {
385 pub(crate) fn from_c_full(ffi: crate::ffi::RiceGatheredCandidate) -> Self {
386 Self { ffi }
387 }
388
389 pub fn take(&mut self) -> Self {
394 unsafe {
395 let mut ffi = crate::ffi::RiceGatheredCandidate {
396 candidate: crate::ffi::RiceCandidate::zeroed(),
397 turn_agent: self.ffi.turn_agent,
398 };
399 crate::ffi::rice_candidate_copy_into(&self.ffi.candidate, &mut ffi.candidate);
400 self.ffi.turn_agent = core::ptr::null_mut();
401 Self { ffi }
402 }
403 }
404
405 pub fn candidate(&self) -> crate::candidate::Candidate {
407 unsafe { crate::candidate::Candidate::from_c_none(&self.ffi.candidate) }
408 }
409}
410
411#[cfg(test)]
412mod tests {
413 use sans_io_time::Instant;
414
415 use super::*;
416 use crate::agent::{Agent, AgentPoll};
417
418 #[test]
419 fn getters() {
420 let _log = crate::tests::test_init_log();
421 let agent = Agent::builder().build();
422 let stream = agent.add_stream();
423 let component = stream.add_component();
424
425 assert_eq!(stream.id(), stream.clone().id());
426 assert_eq!(agent.id(), stream.agent().id());
427 assert_eq!(
428 component.id(),
429 stream.component(component.id()).unwrap().id()
430 );
431 assert!(stream.component(0).is_none());
432 }
433
434 #[test]
435 fn gather_candidates() {
436 let addr: crate::Address = "192.168.0.1:1000".parse().unwrap();
437 let stun_addr: crate::Address = "102.168.0.200:2000".parse().unwrap();
438 let agent = Agent::builder().build();
439 let stream = agent.add_stream();
440 let component = stream.add_component();
441 let transport = TransportType::Tcp;
442 let local_credentials = Credentials::new("luser", "lpass");
443 let remote_credentials = Credentials::new("ruser", "rpass");
444
445 agent.add_stun_server(transport, stun_addr);
446 stream.set_local_credentials(&local_credentials);
447 stream.set_remote_credentials(&remote_credentials);
448 component
449 .gather_candidates([(transport, &addr)], [])
450 .unwrap();
451
452 let AgentPoll::AllocateSocket(ref alloc) = agent.poll(Instant::ZERO) else {
453 unreachable!()
454 };
455 let from = &alloc.from;
456 let to = &alloc.to;
457 let component_id = alloc.component_id;
458
459 let AgentPoll::GatheredCandidate(ref _candidate) = agent.poll(Instant::ZERO) else {
460 unreachable!()
461 };
462
463 let AgentPoll::GatheredCandidate(ref _candidate) = agent.poll(Instant::ZERO) else {
464 unreachable!()
465 };
466
467 let AgentPoll::WaitUntilNanos(now) = agent.poll(Instant::ZERO) else {
468 unreachable!()
469 };
470
471 let tcp_from_addr: crate::Address = "192.168.200.4:3000".parse().unwrap();
472 stream.allocated_socket(
473 component_id,
474 TransportType::Tcp,
475 from,
476 to,
477 Some(tcp_from_addr),
478 Instant::from_nanos(now),
479 );
480
481 let _ = agent.poll_transmit(Instant::ZERO).unwrap();
482
483 let _ = agent.poll(Instant::ZERO);
484 let _ = agent.poll(Instant::ZERO);
485 }
486}