Skip to main content

rice_c/
stream.rs

1// Copyright (C) 2025 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8//
9// SPDX-License-Identifier: MIT OR Apache-2.0
10
11//! A [`Stream`] in an ICE [`Agent`](crate::agent::Agent).
12
13use sans_io_time::Instant;
14
15use crate::{candidate::TransportType, mut_override};
16
17/// An ICE [`Stream`]
18#[derive(Debug)]
19pub struct Stream {
20    ffi: *mut crate::ffi::RiceStream,
21}
22
23unsafe impl Send for Stream {}
24unsafe impl Sync for Stream {}
25
26impl Clone for Stream {
27    fn clone(&self) -> Self {
28        Self {
29            ffi: unsafe { crate::ffi::rice_stream_ref(self.ffi) },
30        }
31    }
32}
33
34impl Drop for Stream {
35    fn drop(&mut self) {
36        unsafe { crate::ffi::rice_stream_unref(self.ffi) };
37    }
38}
39
40impl Stream {
41    pub(crate) fn from_c_full(stream: *mut crate::ffi::RiceStream) -> Self {
42        Self { ffi: stream }
43    }
44
45    /// An agent-global unique identifier for the ICE stream.
46    pub fn id(&self) -> usize {
47        unsafe { crate::ffi::rice_stream_get_id(self.ffi) }
48    }
49
50    /// The [`Agent`](crate::agent::Agent) that handles this [`Stream`].
51    pub fn agent(&self) -> crate::agent::Agent {
52        unsafe { crate::agent::Agent::from_c_full(crate::ffi::rice_stream_get_agent(self.ffi)) }
53    }
54
55    /// Add a `Component` to this stream.
56    pub fn add_component(&self) -> crate::component::Component {
57        unsafe {
58            crate::component::Component::from_c_full(
59                crate::ffi::rice_stream_add_component(self.ffi),
60                self.id(),
61            )
62        }
63    }
64
65    /// Retrieve a `Component` from this stream.  If the index doesn't exist or a component is not
66    /// available at that index, `None` is returned
67    pub fn component(&self, id: usize) -> Option<crate::component::Component> {
68        let ret = unsafe { crate::ffi::rice_stream_get_component(self.ffi, id) };
69        if ret.is_null() {
70            None
71        } else {
72            Some(crate::component::Component::from_c_full(ret, self.id()))
73        }
74    }
75
76    /// Retreive the previouly set local ICE credentials for this `Stream`.
77    pub fn local_credentials(&self) -> Option<Credentials> {
78        let ret = unsafe { crate::ffi::rice_stream_get_local_credentials(self.ffi) };
79        if ret.is_null() {
80            None
81        } else {
82            Some(Credentials::from_c_full(ret))
83        }
84    }
85
86    /// Retreive the previouly set remote ICE credentials for this `Stream`.
87    pub fn remote_credentials(&self) -> Option<Credentials> {
88        let ret = unsafe { crate::ffi::rice_stream_get_remote_credentials(self.ffi) };
89        if ret.is_null() {
90            None
91        } else {
92            Some(Credentials::from_c_full(ret))
93        }
94    }
95
96    /// Set local ICE credentials for this `Stream`.
97    ///
98    /// # Examples
99    ///
100    /// ```
101    /// # use rice_c::agent::Agent;
102    /// # use rice_c::stream::Credentials;
103    /// let mut agent = Agent::default();
104    /// let stream = agent.add_stream();
105    /// let credentials = Credentials::new("user", "pass");
106    /// stream.set_local_credentials(&credentials);
107    /// assert_eq!(stream.local_credentials(), Some(credentials));
108    /// ```
109    pub fn set_local_credentials(&self, credentials: &Credentials) {
110        unsafe {
111            crate::ffi::rice_stream_set_local_credentials(self.ffi, credentials.into_c_none())
112        }
113    }
114
115    /// Set remote ICE credentials for this `Stream`.
116    ///
117    /// # Examples
118    ///
119    /// ```
120    /// # use rice_c::agent::Agent;
121    /// # use rice_c::stream::Credentials;
122    /// let agent = Agent::default();
123    /// let stream = agent.add_stream();
124    /// let credentials = Credentials::new("user", "pass");
125    /// stream.set_remote_credentials(&credentials);
126    /// assert_eq!(stream.remote_credentials(), Some(credentials));
127    /// ```
128    pub fn set_remote_credentials(&self, credentials: &Credentials) {
129        unsafe {
130            crate::ffi::rice_stream_set_remote_credentials(self.ffi, credentials.into_c_none())
131        }
132    }
133
134    /// Signal the end of local candidates.  Calling this function may allow ICE processing to
135    /// complete.
136    pub fn end_of_local_candidates(&self) {
137        unsafe { crate::ffi::rice_stream_end_of_local_candidates(self.ffi) }
138    }
139
140    /// Add a remote candidate for connection checks for use with this stream
141    pub fn add_remote_candidate(&self, cand: &crate::candidate::Candidate) {
142        unsafe { crate::ffi::rice_stream_add_remote_candidate(self.ffi, cand.as_c()) }
143    }
144
145    /// Indicate that no more candidates are expected from the peer.  This may allow the ICE
146    /// process to complete.
147    pub fn end_of_remote_candidates(&self) {
148        unsafe { crate::ffi::rice_stream_end_of_remote_candidates(self.ffi) }
149    }
150
151    /// Add a local candidate for this stream.
152    ///
153    /// Returns whether the candidate was added internally.
154    pub fn add_local_gathered_candidate(&self, gathered: GatheredCandidate) -> bool {
155        unsafe { crate::ffi::rice_stream_add_local_gathered_candidate(self.ffi, &gathered.ffi) }
156    }
157
158    /// Provide a reply to the
159    /// [`AgentPoll::AllocateSocket`](crate::agent::AgentPoll::AllocateSocket) request.  The
160    /// `component_id`, `transport`, `from`, and `to` values must match exactly with the request.
161    pub fn allocated_socket(
162        &self,
163        component_id: usize,
164        transport: TransportType,
165        from: &crate::Address,
166        to: &crate::Address,
167        socket_addr: Option<crate::Address>,
168        now: Instant,
169    ) {
170        let socket_addr = if let Some(addr) = socket_addr {
171            addr.into_c_full()
172        } else {
173            core::ptr::null_mut()
174        };
175        unsafe {
176            crate::ffi::rice_stream_handle_allocated_socket(
177                self.ffi,
178                component_id,
179                transport.into(),
180                from.as_c(),
181                to.as_c(),
182                socket_addr,
183                now.as_nanos(),
184            )
185        }
186    }
187
188    /// The list of component ids available in this stream
189    pub fn component_ids(&self) -> Vec<usize> {
190        unsafe {
191            let mut len = 0;
192            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, core::ptr::null_mut());
193            let mut ret = vec![0; len];
194            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, ret.as_mut_ptr());
195            ret.resize(len.min(ret.len()), 0);
196            ret
197        }
198    }
199
200    /// Provide the stream with data that has been received on an external socket.  The returned
201    /// value indicates what has been done with the data and any application data that has been
202    /// received.
203    pub fn handle_incoming_data<'a>(
204        &self,
205        component_id: usize,
206        transport: TransportType,
207        from: crate::Address,
208        to: crate::Address,
209        data: &'a [u8],
210        now: Instant,
211    ) -> StreamIncomingDataReply<'a> {
212        unsafe {
213            let mut stream_ret = crate::ffi::RiceStreamIncomingData::default();
214            crate::ffi::rice_stream_handle_incoming_data(
215                self.ffi,
216                component_id,
217                transport.into(),
218                from.as_c(),
219                to.as_c(),
220                data.as_ptr(),
221                data.len(),
222                now.as_nanos(),
223                &mut stream_ret,
224            );
225            let mut ret = StreamIncomingDataReply {
226                handled: stream_ret.handled,
227                have_more_data: stream_ret.have_more_data,
228                data: None,
229            };
230            if !stream_ret.data.ptr.is_null() && stream_ret.data.size > 0 {
231                ret.data = Some(data);
232            }
233            ret
234        }
235    }
236
237    /// Poll for any received data.
238    ///
239    /// Must be called after `handle_incoming_data` if `have_more_data` is `true`.
240    pub fn poll_recv(&self) -> Option<PollRecv> {
241        unsafe {
242            let mut len = 0;
243            let mut component_id = 0;
244            let ptr = crate::ffi::rice_stream_poll_recv(self.ffi, &mut component_id, &mut len);
245            if ptr.is_null() {
246                return None;
247            }
248            let slice = core::slice::from_raw_parts(ptr, len);
249            Some(PollRecv {
250                component_id,
251                data: RecvData { data: slice },
252            })
253        }
254    }
255}
256
257/// Data that should be sent to a peer as a result of calling [`Stream::poll_recv()`].
258#[derive(Debug)]
259pub struct PollRecv {
260    /// The component id that the data was received for.
261    pub component_id: usize,
262    /// The received data.
263    pub data: RecvData,
264}
265
/// A buffer of received data.
///
/// Dereferences to the received bytes. The buffer is released with `rice_free_data()` when
/// this value is dropped.
#[derive(Debug)]
pub struct RecvData {
    // The `'static` lifetime is a placeholder: the slice is only valid until `Drop` frees it.
    // The field is private and `Deref` ties any borrow to `&self`, so the `'static` lifetime is
    // never handed out.
    data: &'static [u8],
}
271
272impl core::ops::Deref for RecvData {
273    type Target = [u8];
274    fn deref(&self) -> &Self::Target {
275        self.data
276    }
277}
278
279impl Drop for RecvData {
280    fn drop(&mut self) {
281        unsafe { crate::ffi::rice_free_data(mut_override(self.data.as_ptr())) }
282    }
283}
284
/// The outcome of a call to [`Stream::handle_incoming_data`].
#[derive(Debug)]
pub struct StreamIncomingDataReply<'a> {
    /// Whether some of the data was handled.
    pub handled: bool,
    /// Whether further data, beyond anything in the `data` field, is waiting to be retrieved
    /// with [`Stream::poll_recv`].
    pub have_more_data: bool,
    /// Application data that could be parsed from the incoming data, if any.
    pub data: Option<&'a [u8]>,
}
296
297/// A set of ICE/TURN credentials.
298#[derive(Debug)]
299pub struct Credentials {
300    ffi: *mut crate::ffi::RiceCredentials,
301}
302
303impl Credentials {
304    /// Create a new set of ICE/TURN credentials with the provided username and password.
305    pub fn new(ufrag: &str, passwd: &str) -> Self {
306        let ufrag = std::ffi::CString::new(ufrag).unwrap();
307        let passwd = std::ffi::CString::new(passwd).unwrap();
308        unsafe {
309            Self {
310                ffi: crate::ffi::rice_credentials_new(ufrag.as_ptr(), passwd.as_ptr()),
311            }
312        }
313    }
314
315    pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceCredentials) -> Self {
316        Self { ffi }
317    }
318
319    #[allow(clippy::wrong_self_convention)]
320    pub(crate) fn into_c_none(&self) -> *const crate::ffi::RiceCredentials {
321        self.ffi
322    }
323}
324
325impl PartialEq for Credentials {
326    fn eq(&self, other: &Self) -> bool {
327        unsafe { crate::ffi::rice_credentials_eq(self.ffi, other.ffi) }
328    }
329}
330
331impl Clone for Credentials {
332    fn clone(&self) -> Self {
333        Self {
334            ffi: unsafe { crate::ffi::rice_credentials_copy(self.ffi) },
335        }
336    }
337}
338
339impl Drop for Credentials {
340    fn drop(&mut self) {
341        unsafe { crate::ffi::rice_credentials_free(self.ffi) }
342    }
343}
344
345/// A locally gathered candidate.
346#[derive(Debug)]
347pub struct GatheredCandidate {
348    pub(crate) ffi: crate::ffi::RiceGatheredCandidate,
349}
350
351unsafe impl Send for GatheredCandidate {}
352
353impl GatheredCandidate {
354    pub(crate) fn from_c_full(ffi: crate::ffi::RiceGatheredCandidate) -> Self {
355        Self { ffi }
356    }
357
358    /// Consume the contents of the mutable reference without leaving an invalid invariant.
359    ///
360    /// THis is useful when handling
361    /// [`AgentGatheredCandidate`](crate::agent::AgentGatheredCandidate).
362    pub fn take(&mut self) -> Self {
363        unsafe {
364            let mut ffi = crate::ffi::RiceGatheredCandidate {
365                candidate: crate::ffi::RiceCandidate::zeroed(),
366                turn_agent: self.ffi.turn_agent,
367            };
368            crate::ffi::rice_candidate_copy_into(&self.ffi.candidate, &mut ffi.candidate);
369            self.ffi.turn_agent = core::ptr::null_mut();
370            Self { ffi }
371        }
372    }
373
374    /// The [`Candidate`](crate::candidate::Candidate).
375    pub fn candidate(&self) -> crate::candidate::Candidate {
376        unsafe { crate::candidate::Candidate::from_c_none(&self.ffi.candidate) }
377    }
378}
379
#[cfg(test)]
mod tests {
    use sans_io_time::Instant;

    use super::*;
    use crate::agent::{Agent, AgentPoll};

    /// Gathers TCP candidates against a STUN server and answers the resulting socket
    /// allocation request, checking the sequence of values returned by `Agent::poll`.
    #[test]
    fn gather_candidates() {
        let local_addr: crate::Address = "192.168.0.1:1000".parse().unwrap();
        let stun_server: crate::Address = "102.168.0.200:2000".parse().unwrap();
        let agent = Agent::builder().build();
        let stream = agent.add_stream();
        let component = stream.add_component();
        let transport = TransportType::Tcp;
        let local_creds = Credentials::new("luser", "lpass");
        let remote_creds = Credentials::new("ruser", "rpass");

        agent.add_stun_server(transport, stun_server);
        stream.set_local_credentials(&local_creds);
        stream.set_remote_credentials(&remote_creds);
        component
            .gather_candidates([(transport, &local_addr)], [])
            .unwrap();

        // The first poll must ask for a socket to be allocated.
        let AgentPoll::AllocateSocket(ref request) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };
        let request_from = &request.from;
        let request_to = &request.to;
        let request_component = request.component_id;

        // The next two polls must each report a gathered candidate.
        let AgentPoll::GatheredCandidate(ref _first) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        let AgentPoll::GatheredCandidate(ref _second) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        // After that the agent must ask to be woken at a later time.
        let AgentPoll::WaitUntilNanos(wakeup_nanos) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        // Answer the allocation request with the values taken from the request itself.
        let allocated_addr: crate::Address = "192.168.200.4:3000".parse().unwrap();
        stream.allocated_socket(
            request_component,
            TransportType::Tcp,
            request_from,
            request_to,
            Some(allocated_addr),
            Instant::from_nanos(wakeup_nanos),
        );

        // There must now be something to transmit.
        let _ = agent.poll_transmit(Instant::ZERO).unwrap();

        let _ = agent.poll(Instant::ZERO);
        let _ = agent.poll(Instant::ZERO);
    }
}
439}