Skip to main content

rice_c/
stream.rs

1// Copyright (C) 2025 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8//
9// SPDX-License-Identifier: MIT OR Apache-2.0
10
11//! A [`Stream`] in an ICE [`Agent`](crate::agent::Agent).
12
13use sans_io_time::Instant;
14
15use crate::{
16    candidate::{Candidate, TransportType},
17    mut_override,
18};
19
20/// An ICE [`Stream`]
21#[derive(Debug)]
22pub struct Stream {
23    ffi: *mut crate::ffi::RiceStream,
24}
25
26unsafe impl Send for Stream {}
27unsafe impl Sync for Stream {}
28
29impl Clone for Stream {
30    fn clone(&self) -> Self {
31        Self {
32            ffi: unsafe { crate::ffi::rice_stream_ref(self.ffi) },
33        }
34    }
35}
36
37impl Drop for Stream {
38    fn drop(&mut self) {
39        unsafe { crate::ffi::rice_stream_unref(self.ffi) };
40    }
41}
42
43impl Stream {
44    pub(crate) fn from_c_full(stream: *mut crate::ffi::RiceStream) -> Self {
45        Self { ffi: stream }
46    }
47
48    /// An agent-global unique identifier for the ICE stream.
49    pub fn id(&self) -> usize {
50        unsafe { crate::ffi::rice_stream_get_id(self.ffi) }
51    }
52
53    /// The [`Agent`](crate::agent::Agent) that handles this [`Stream`].
54    pub fn agent(&self) -> crate::agent::Agent {
55        unsafe { crate::agent::Agent::from_c_full(crate::ffi::rice_stream_get_agent(self.ffi)) }
56    }
57
58    /// Add a `Component` to this stream.
59    pub fn add_component(&self) -> crate::component::Component {
60        unsafe {
61            crate::component::Component::from_c_full(
62                crate::ffi::rice_stream_add_component(self.ffi),
63                self.id(),
64            )
65        }
66    }
67
68    /// Retrieve a `Component` from this stream.  If the index doesn't exist or a component is not
69    /// available at that index, `None` is returned
70    pub fn component(&self, id: usize) -> Option<crate::component::Component> {
71        let ret = unsafe { crate::ffi::rice_stream_get_component(self.ffi, id) };
72        if ret.is_null() {
73            None
74        } else {
75            Some(crate::component::Component::from_c_full(ret, self.id()))
76        }
77    }
78
79    /// Retreive the previouly set local ICE credentials for this `Stream`.
80    pub fn local_credentials(&self) -> Option<Credentials> {
81        let ret = unsafe { crate::ffi::rice_stream_get_local_credentials(self.ffi) };
82        if ret.is_null() {
83            None
84        } else {
85            Some(Credentials::from_c_full(ret))
86        }
87    }
88
89    /// Retreive the previouly set remote ICE credentials for this `Stream`.
90    pub fn remote_credentials(&self) -> Option<Credentials> {
91        let ret = unsafe { crate::ffi::rice_stream_get_remote_credentials(self.ffi) };
92        if ret.is_null() {
93            None
94        } else {
95            Some(Credentials::from_c_full(ret))
96        }
97    }
98
99    /// Set local ICE credentials for this `Stream`.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// # use rice_c::agent::Agent;
105    /// # use rice_c::stream::Credentials;
106    /// let mut agent = Agent::default();
107    /// let stream = agent.add_stream();
108    /// let credentials = Credentials::new("user", "pass");
109    /// stream.set_local_credentials(&credentials);
110    /// assert_eq!(stream.local_credentials(), Some(credentials));
111    /// ```
112    pub fn set_local_credentials(&self, credentials: &Credentials) {
113        unsafe {
114            crate::ffi::rice_stream_set_local_credentials(self.ffi, credentials.into_c_none())
115        }
116    }
117
118    /// Set remote ICE credentials for this `Stream`.
119    ///
120    /// # Examples
121    ///
122    /// ```
123    /// # use rice_c::agent::Agent;
124    /// # use rice_c::stream::Credentials;
125    /// let agent = Agent::default();
126    /// let stream = agent.add_stream();
127    /// let credentials = Credentials::new("user", "pass");
128    /// stream.set_remote_credentials(&credentials);
129    /// assert_eq!(stream.remote_credentials(), Some(credentials));
130    /// ```
131    pub fn set_remote_credentials(&self, credentials: &Credentials) {
132        unsafe {
133            crate::ffi::rice_stream_set_remote_credentials(self.ffi, credentials.into_c_none())
134        }
135    }
136
137    /// Signal the end of local candidates.  Calling this function may allow ICE processing to
138    /// complete.
139    pub fn end_of_local_candidates(&self) {
140        unsafe { crate::ffi::rice_stream_end_of_local_candidates(self.ffi) }
141    }
142
143    /// Retrieve previously set local candidates for connection checks from this stream.
144    pub fn local_candidates(&self) -> Vec<Candidate> {
145        unsafe {
146            let mut len = 0;
147            crate::ffi::rice_stream_get_local_candidates(self.ffi, &mut len, core::ptr::null_mut());
148            let mut ret = vec![crate::ffi::RiceCandidate::zeroed(); len];
149            crate::ffi::rice_stream_get_local_candidates(self.ffi, &mut len, ret.as_mut_ptr());
150            ret.into_iter()
151                .map(|cand| Candidate::from_c_none(&cand))
152                .collect()
153        }
154    }
155
156    /// Add a remote candidate for connection checks for use with this stream
157    pub fn add_remote_candidate(&self, cand: &crate::candidate::Candidate) {
158        unsafe { crate::ffi::rice_stream_add_remote_candidate(self.ffi, cand.as_c()) }
159    }
160
161    /// Indicate that no more candidates are expected from the peer.  This may allow the ICE
162    /// process to complete.
163    pub fn end_of_remote_candidates(&self) {
164        unsafe { crate::ffi::rice_stream_end_of_remote_candidates(self.ffi) }
165    }
166
167    /// Retrieve previously set remote candidates for connection checks from this stream.
168    pub fn remote_candidates(&self) -> Vec<Candidate> {
169        unsafe {
170            let mut len = 0;
171            crate::ffi::rice_stream_get_remote_candidates(
172                self.ffi,
173                &mut len,
174                core::ptr::null_mut(),
175            );
176            let mut ret = vec![crate::ffi::RiceCandidate::zeroed(); len];
177            crate::ffi::rice_stream_get_remote_candidates(self.ffi, &mut len, ret.as_mut_ptr());
178            ret.into_iter().map(Candidate::from_c_full).collect()
179        }
180    }
181
182    /// Add a local candidate for this stream.
183    ///
184    /// Returns whether the candidate was added internally.
185    pub fn add_local_gathered_candidate(&self, gathered: GatheredCandidate) -> bool {
186        unsafe { crate::ffi::rice_stream_add_local_gathered_candidate(self.ffi, &gathered.ffi) }
187    }
188
189    /// Provide a reply to the
190    /// [`AgentPoll::AllocateSocket`](crate::agent::AgentPoll::AllocateSocket) request.  The
191    /// `component_id`, `transport`, `from`, and `to` values must match exactly with the request.
192    pub fn allocated_socket(
193        &self,
194        component_id: usize,
195        transport: TransportType,
196        from: &crate::Address,
197        to: &crate::Address,
198        socket_addr: Option<crate::Address>,
199        now: Instant,
200    ) {
201        let socket_addr = if let Some(addr) = socket_addr {
202            addr.into_c_full()
203        } else {
204            core::ptr::null_mut()
205        };
206        unsafe {
207            crate::ffi::rice_stream_handle_allocated_socket(
208                self.ffi,
209                component_id,
210                transport.into(),
211                from.as_c(),
212                to.as_c(),
213                socket_addr,
214                now.as_nanos(),
215            )
216        }
217    }
218
219    /// The list of component ids available in this stream
220    pub fn component_ids(&self) -> Vec<usize> {
221        unsafe {
222            let mut len = 0;
223            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, core::ptr::null_mut());
224            let mut ret = vec![0; len];
225            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, ret.as_mut_ptr());
226            ret.resize(len.min(ret.len()), 0);
227            ret
228        }
229    }
230
231    /// Provide the stream with data that has been received on an external socket.  The returned
232    /// value indicates what has been done with the data and any application data that has been
233    /// received.
234    pub fn handle_incoming_data<'a>(
235        &self,
236        component_id: usize,
237        transport: TransportType,
238        from: crate::Address,
239        to: crate::Address,
240        data: &'a [u8],
241        now: Instant,
242    ) -> StreamIncomingDataReply<'a> {
243        unsafe {
244            let mut stream_ret = crate::ffi::RiceStreamIncomingData::default();
245            crate::ffi::rice_stream_handle_incoming_data(
246                self.ffi,
247                component_id,
248                transport.into(),
249                from.as_c(),
250                to.as_c(),
251                data.as_ptr(),
252                data.len(),
253                now.as_nanos(),
254                &mut stream_ret,
255            );
256            let mut ret = StreamIncomingDataReply {
257                handled: stream_ret.handled,
258                have_more_data: stream_ret.have_more_data,
259                data: None,
260            };
261            if !stream_ret.data.ptr.is_null() && stream_ret.data.size > 0 {
262                ret.data = Some(data);
263            }
264            ret
265        }
266    }
267
268    /// Poll for any received data.
269    ///
270    /// Must be called after `handle_incoming_data` if `have_more_data` is `true`.
271    pub fn poll_recv(&self) -> Option<PollRecv> {
272        unsafe {
273            let mut len = 0;
274            let mut component_id = 0;
275            let ptr = crate::ffi::rice_stream_poll_recv(self.ffi, &mut component_id, &mut len);
276            if ptr.is_null() {
277                return None;
278            }
279            let slice = core::slice::from_raw_parts(ptr, len);
280            Some(PollRecv {
281                component_id,
282                data: RecvData { data: slice },
283            })
284        }
285    }
286}
287
288/// Data that should be sent to a peer as a result of calling [`Stream::poll_recv()`].
289#[derive(Debug)]
290pub struct PollRecv {
291    /// The component id that the data was received for.
292    pub component_id: usize,
293    /// The received data.
294    pub data: RecvData,
295}
296
/// An owned buffer of received bytes.
///
/// Dereferences to `[u8]`.  The buffer is handed back to the C library with `rice_free_data()`
/// when this value is dropped.
#[derive(Debug)]
pub struct RecvData {
    // Declared `'static`, but the memory is released in `Drop`; the slice must never be exposed
    // with a lifetime longer than a borrow of `self`.
    data: &'static [u8],
}
302
303impl core::ops::Deref for RecvData {
304    type Target = [u8];
305    fn deref(&self) -> &Self::Target {
306        self.data
307    }
308}
309
310impl Drop for RecvData {
311    fn drop(&mut self) {
312        unsafe { crate::ffi::rice_free_data(mut_override(self.data.as_ptr())) }
313    }
314}
315
/// The outcome of a call to [`Stream::handle_incoming_data`].
#[derive(Debug)]
pub struct StreamIncomingDataReply<'a> {
    /// Whether some of the data was handled.
    pub handled: bool,
    /// Whether further data, in addition to anything in the `data` field, is waiting to be
    /// retrieved with [`Stream::poll_recv`].
    pub have_more_data: bool,
    /// Application data that could be parsed from the incoming data, if any.
    pub data: Option<&'a [u8]>,
}
327
328/// A set of ICE/TURN credentials.
329#[derive(Debug)]
330pub struct Credentials {
331    ffi: *mut crate::ffi::RiceCredentials,
332}
333
334impl Credentials {
335    /// Create a new set of ICE/TURN credentials with the provided username and password.
336    pub fn new(ufrag: &str, passwd: &str) -> Self {
337        let ufrag = std::ffi::CString::new(ufrag).unwrap();
338        let passwd = std::ffi::CString::new(passwd).unwrap();
339        unsafe {
340            Self {
341                ffi: crate::ffi::rice_credentials_new(ufrag.as_ptr(), passwd.as_ptr()),
342            }
343        }
344    }
345
346    pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceCredentials) -> Self {
347        Self { ffi }
348    }
349
350    #[allow(clippy::wrong_self_convention)]
351    pub(crate) fn into_c_none(&self) -> *const crate::ffi::RiceCredentials {
352        self.ffi
353    }
354}
355
356impl PartialEq for Credentials {
357    fn eq(&self, other: &Self) -> bool {
358        unsafe { crate::ffi::rice_credentials_eq(self.ffi, other.ffi) }
359    }
360}
361
362impl Clone for Credentials {
363    fn clone(&self) -> Self {
364        Self {
365            ffi: unsafe { crate::ffi::rice_credentials_copy(self.ffi) },
366        }
367    }
368}
369
370impl Drop for Credentials {
371    fn drop(&mut self) {
372        unsafe { crate::ffi::rice_credentials_free(self.ffi) }
373    }
374}
375
376/// A locally gathered candidate.
377#[derive(Debug)]
378pub struct GatheredCandidate {
379    pub(crate) ffi: crate::ffi::RiceGatheredCandidate,
380}
381
382unsafe impl Send for GatheredCandidate {}
383
384impl GatheredCandidate {
385    pub(crate) fn from_c_full(ffi: crate::ffi::RiceGatheredCandidate) -> Self {
386        Self { ffi }
387    }
388
389    /// Consume the contents of the mutable reference without leaving an invalid invariant.
390    ///
391    /// THis is useful when handling
392    /// [`AgentGatheredCandidate`](crate::agent::AgentGatheredCandidate).
393    pub fn take(&mut self) -> Self {
394        unsafe {
395            let mut ffi = crate::ffi::RiceGatheredCandidate {
396                candidate: crate::ffi::RiceCandidate::zeroed(),
397                turn_agent: self.ffi.turn_agent,
398            };
399            crate::ffi::rice_candidate_copy_into(&self.ffi.candidate, &mut ffi.candidate);
400            self.ffi.turn_agent = core::ptr::null_mut();
401            Self { ffi }
402        }
403    }
404
405    /// The [`Candidate`].
406    pub fn candidate(&self) -> crate::candidate::Candidate {
407        unsafe { crate::candidate::Candidate::from_c_none(&self.ffi.candidate) }
408    }
409}
410
#[cfg(test)]
mod tests {
    use sans_io_time::Instant;

    use super::*;
    use crate::agent::{Agent, AgentPoll};

    /// The accessors of a stream agree with the objects it was created from.
    #[test]
    fn getters() {
        let _log = crate::tests::test_init_log();
        let agent = Agent::builder().build();
        let stream = agent.add_stream();
        let component = stream.add_component();

        // A clone refers to the same stream.
        assert_eq!(stream.id(), stream.clone().id());
        // The stream reports the agent that created it.
        assert_eq!(agent.id(), stream.agent().id());
        // Looking the component up by id returns the same component.
        let looked_up = stream.component(component.id()).unwrap();
        assert_eq!(component.id(), looked_up.id());
        // No component is available at index 0.
        assert!(stream.component(0).is_none());
    }

    /// Drive a TCP gather up to the point where the allocated socket is handed back.
    #[test]
    fn gather_candidates() {
        let local_addr: crate::Address = "192.168.0.1:1000".parse().unwrap();
        let stun_addr: crate::Address = "102.168.0.200:2000".parse().unwrap();
        let agent = Agent::builder().build();
        let stream = agent.add_stream();
        let component = stream.add_component();
        let transport = TransportType::Tcp;
        let local_credentials = Credentials::new("luser", "lpass");
        let remote_credentials = Credentials::new("ruser", "rpass");

        agent.add_stun_server(transport, stun_addr);
        stream.set_local_credentials(&local_credentials);
        stream.set_remote_credentials(&remote_credentials);
        component
            .gather_candidates([(transport, &local_addr)], [])
            .unwrap();

        // The first poll is expected to request a socket allocation.
        let AgentPoll::AllocateSocket(ref request) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };
        let from = &request.from;
        let to = &request.to;
        let component_id = request.component_id;

        // Two gathered candidates are expected next.
        let AgentPoll::GatheredCandidate(ref _first) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        let AgentPoll::GatheredCandidate(ref _second) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        // After that the agent asks to be woken up at a later time.
        let AgentPoll::WaitUntilNanos(wakeup) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        // Answer the allocation request with the address of the allocated socket.
        let tcp_from_addr: crate::Address = "192.168.200.4:3000".parse().unwrap();
        stream.allocated_socket(
            component_id,
            TransportType::Tcp,
            from,
            to,
            Some(tcp_from_addr),
            Instant::from_nanos(wakeup),
        );

        // There must now be something to transmit.
        let _ = agent.poll_transmit(Instant::ZERO).unwrap();

        let _ = agent.poll(Instant::ZERO);
        let _ = agent.poll(Instant::ZERO);
    }
}