rice_c/
stream.rs

1// Copyright (C) 2025 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! A [`Stream`] in an ICE [`Agent`](crate::agent::Agent).
10
11use sans_io_time::Instant;
12
13use crate::{candidate::TransportType, mut_override};
14
15/// An ICE [`Stream`]
16#[derive(Debug)]
17pub struct Stream {
18    ffi: *mut crate::ffi::RiceStream,
19}
20
21unsafe impl Send for Stream {}
22unsafe impl Sync for Stream {}
23
24impl Clone for Stream {
25    fn clone(&self) -> Self {
26        Self {
27            ffi: unsafe { crate::ffi::rice_stream_ref(self.ffi) },
28        }
29    }
30}
31
32impl Drop for Stream {
33    fn drop(&mut self) {
34        unsafe { crate::ffi::rice_stream_unref(self.ffi) };
35    }
36}
37
38impl Stream {
39    pub(crate) fn from_c_full(stream: *mut crate::ffi::RiceStream) -> Self {
40        Self { ffi: stream }
41    }
42
43    /// An agent-global unique identifier for the ICE stream.
44    pub fn id(&self) -> usize {
45        unsafe { crate::ffi::rice_stream_get_id(self.ffi) }
46    }
47
48    /// The [`Agent`](crate::agent::Agent) that handles this [`Stream`].
49    pub fn agent(&self) -> crate::agent::Agent {
50        unsafe { crate::agent::Agent::from_c_full(crate::ffi::rice_stream_get_agent(self.ffi)) }
51    }
52
53    /// Add a `Component` to this stream.
54    pub fn add_component(&self) -> crate::component::Component {
55        unsafe {
56            crate::component::Component::from_c_full(
57                crate::ffi::rice_stream_add_component(self.ffi),
58                self.id(),
59            )
60        }
61    }
62
63    /// Retrieve a `Component` from this stream.  If the index doesn't exist or a component is not
64    /// available at that index, `None` is returned
65    pub fn component(&self, id: usize) -> Option<crate::component::Component> {
66        let ret = unsafe { crate::ffi::rice_stream_get_component(self.ffi, id) };
67        if ret.is_null() {
68            None
69        } else {
70            Some(crate::component::Component::from_c_full(ret, self.id()))
71        }
72    }
73
74    /// Retreive the previouly set local ICE credentials for this `Stream`.
75    pub fn local_credentials(&self) -> Option<Credentials> {
76        let ret = unsafe { crate::ffi::rice_stream_get_local_credentials(self.ffi) };
77        if ret.is_null() {
78            None
79        } else {
80            Some(Credentials::from_c_full(ret))
81        }
82    }
83
84    /// Retreive the previouly set remote ICE credentials for this `Stream`.
85    pub fn remote_credentials(&self) -> Option<Credentials> {
86        let ret = unsafe { crate::ffi::rice_stream_get_remote_credentials(self.ffi) };
87        if ret.is_null() {
88            None
89        } else {
90            Some(Credentials::from_c_full(ret))
91        }
92    }
93
94    /// Set local ICE credentials for this `Stream`.
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// # use rice_c::agent::Agent;
100    /// # use rice_c::stream::Credentials;
101    /// let mut agent = Agent::default();
102    /// let stream = agent.add_stream();
103    /// let credentials = Credentials::new("user", "pass");
104    /// stream.set_local_credentials(&credentials);
105    /// assert_eq!(stream.local_credentials(), Some(credentials));
106    /// ```
107    pub fn set_local_credentials(&self, credentials: &Credentials) {
108        unsafe {
109            crate::ffi::rice_stream_set_local_credentials(self.ffi, credentials.into_c_none())
110        }
111    }
112
113    /// Set remote ICE credentials for this `Stream`.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// # use rice_c::agent::Agent;
119    /// # use rice_c::stream::Credentials;
120    /// let agent = Agent::default();
121    /// let stream = agent.add_stream();
122    /// let credentials = Credentials::new("user", "pass");
123    /// stream.set_remote_credentials(&credentials);
124    /// assert_eq!(stream.remote_credentials(), Some(credentials));
125    /// ```
126    pub fn set_remote_credentials(&self, credentials: &Credentials) {
127        unsafe {
128            crate::ffi::rice_stream_set_remote_credentials(self.ffi, credentials.into_c_none())
129        }
130    }
131
132    /// Signal the end of local candidates.  Calling this function may allow ICE processing to
133    /// complete.
134    pub fn end_of_local_candidates(&self) {
135        unsafe { crate::ffi::rice_stream_end_of_local_candidates(self.ffi) }
136    }
137
138    /// Add a remote candidate for connection checks for use with this stream
139    pub fn add_remote_candidate(&self, cand: &crate::candidate::Candidate) {
140        unsafe { crate::ffi::rice_stream_add_remote_candidate(self.ffi, cand.as_c()) }
141    }
142
143    /// Indicate that no more candidates are expected from the peer.  This may allow the ICE
144    /// process to complete.
145    pub fn end_of_remote_candidates(&self) {
146        unsafe { crate::ffi::rice_stream_end_of_remote_candidates(self.ffi) }
147    }
148
149    /// Add a local candidate for this stream.
150    ///
151    /// Returns whether the candidate was added internally.
152    pub fn add_local_gathered_candidate(&self, gathered: GatheredCandidate) -> bool {
153        unsafe { crate::ffi::rice_stream_add_local_gathered_candidate(self.ffi, &gathered.ffi) }
154    }
155
156    /// Provide a reply to the
157    /// [`AgentPoll::AllocateSocket`](crate::agent::AgentPoll::AllocateSocket) request.  The
158    /// `component_id`, `transport`, `from`, and `to` values must match exactly with the request.
159    pub fn allocated_socket(
160        &self,
161        component_id: usize,
162        transport: TransportType,
163        from: &crate::Address,
164        to: &crate::Address,
165        socket_addr: Option<crate::Address>,
166    ) {
167        let socket_addr = if let Some(addr) = socket_addr {
168            addr.into_c_full()
169        } else {
170            core::ptr::null_mut()
171        };
172        unsafe {
173            crate::ffi::rice_stream_handle_allocated_socket(
174                self.ffi,
175                component_id,
176                transport.into(),
177                from.as_c(),
178                to.as_c(),
179                socket_addr,
180            )
181        }
182    }
183
184    /// The list of component ids available in this stream
185    pub fn component_ids(&self) -> Vec<usize> {
186        unsafe {
187            let mut len = 0;
188            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, core::ptr::null_mut());
189            let mut ret = vec![0; len];
190            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, ret.as_mut_ptr());
191            ret.resize(len.min(ret.len()), 0);
192            ret
193        }
194    }
195
196    /// Provide the stream with data that has been received on an external socket.  The returned
197    /// value indicates what has been done with the data and any application data that has been
198    /// received.
199    pub fn handle_incoming_data<'a>(
200        &self,
201        component_id: usize,
202        transport: TransportType,
203        from: crate::Address,
204        to: crate::Address,
205        data: &'a [u8],
206        now: Instant,
207    ) -> StreamIncomingDataReply<'a> {
208        unsafe {
209            let mut stream_ret = crate::ffi::RiceStreamIncomingData::default();
210            crate::ffi::rice_stream_handle_incoming_data(
211                self.ffi,
212                component_id,
213                transport.into(),
214                from.as_c(),
215                to.as_c(),
216                data.as_ptr(),
217                data.len(),
218                now.as_nanos(),
219                &mut stream_ret,
220            );
221            let mut ret = StreamIncomingDataReply {
222                handled: stream_ret.handled,
223                have_more_data: stream_ret.have_more_data,
224                data: None,
225            };
226            if !stream_ret.data.ptr.is_null() && stream_ret.data.size > 0 {
227                ret.data = Some(data);
228            }
229            ret
230        }
231    }
232
233    /// Poll for any received data.
234    ///
235    /// Must be called after `handle_incoming_data` if `have_more_data` is `true`.
236    pub fn poll_recv(&self) -> Option<PollRecv> {
237        unsafe {
238            let mut len = 0;
239            let mut component_id = 0;
240            let ptr = crate::ffi::rice_stream_poll_recv(self.ffi, &mut component_id, &mut len);
241            if ptr.is_null() {
242                return None;
243            }
244            let slice = core::slice::from_raw_parts(ptr, len);
245            Some(PollRecv {
246                component_id,
247                data: RecvData { data: slice },
248            })
249        }
250    }
251}
252
253/// Data that should be sent to a peer as a result of calling [`Stream::poll_recv()`].
254#[derive(Debug)]
255pub struct PollRecv {
256    /// The component id that the data was received for.
257    pub component_id: usize,
258    /// The received data.
259    pub data: RecvData,
260}
261
/// Received data handed out by the C API.
///
/// Dereferences to the received bytes.  The underlying buffer is released with
/// `rice_free_data` when this value is dropped.
#[derive(Debug)]
pub struct RecvData {
    // Not truly `'static`: the bytes are only valid until `Drop` frees them, which is why the
    // slice is never exposed with this lifetime (only through `Deref`, bounded by `&self`).
    data: &'static [u8],
}
267
268impl core::ops::Deref for RecvData {
269    type Target = [u8];
270    fn deref(&self) -> &Self::Target {
271        self.data
272    }
273}
274
275impl Drop for RecvData {
276    fn drop(&mut self) {
277        unsafe { crate::ffi::rice_free_data(mut_override(self.data.as_ptr())) }
278    }
279}
280
/// The outcome of a call to [`Stream::handle_incoming_data`].
#[derive(Debug)]
pub struct StreamIncomingDataReply<'a> {
    /// Whether some of the data was handled.
    pub handled: bool,
    /// Whether further data, beyond anything in the `data` field, was received and can be
    /// retrieved with [`Stream::poll_recv`].
    pub have_more_data: bool,
    /// Any application data that could be parsed from the incoming data.
    pub data: Option<&'a [u8]>,
}
292
293/// A set of ICE/TURN credentials.
294#[derive(Debug)]
295pub struct Credentials {
296    ffi: *mut crate::ffi::RiceCredentials,
297}
298
299impl Credentials {
300    /// Create a new set of ICE/TURN credentials with the provided username and password.
301    pub fn new(ufrag: &str, passwd: &str) -> Self {
302        let ufrag = std::ffi::CString::new(ufrag).unwrap();
303        let passwd = std::ffi::CString::new(passwd).unwrap();
304        unsafe {
305            Self {
306                ffi: crate::ffi::rice_credentials_new(ufrag.as_ptr(), passwd.as_ptr()),
307            }
308        }
309    }
310
311    pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceCredentials) -> Self {
312        Self { ffi }
313    }
314
315    #[allow(clippy::wrong_self_convention)]
316    pub(crate) fn into_c_none(&self) -> *const crate::ffi::RiceCredentials {
317        self.ffi
318    }
319}
320
321impl PartialEq for Credentials {
322    fn eq(&self, other: &Self) -> bool {
323        unsafe { crate::ffi::rice_credentials_eq(self.ffi, other.ffi) }
324    }
325}
326
327impl Clone for Credentials {
328    fn clone(&self) -> Self {
329        Self {
330            ffi: unsafe { crate::ffi::rice_credentials_copy(self.ffi) },
331        }
332    }
333}
334
335impl Drop for Credentials {
336    fn drop(&mut self) {
337        unsafe { crate::ffi::rice_credentials_free(self.ffi) }
338    }
339}
340
341/// A locally gathered candidate.
342#[derive(Debug)]
343pub struct GatheredCandidate {
344    pub(crate) ffi: crate::ffi::RiceGatheredCandidate,
345}
346
347unsafe impl Send for GatheredCandidate {}
348
349impl GatheredCandidate {
350    pub(crate) fn from_c_full(ffi: crate::ffi::RiceGatheredCandidate) -> Self {
351        Self { ffi }
352    }
353
354    /// Consume the contents of the mutable reference without leaving an invalid invariant.
355    ///
356    /// THis is useful when handling
357    /// [`AgentGatheredCandidate`](crate::agent::AgentGatheredCandidate).
358    pub fn take(&mut self) -> Self {
359        unsafe {
360            let mut ffi = crate::ffi::RiceGatheredCandidate {
361                candidate: crate::ffi::RiceCandidate::zeroed(),
362                turn_agent: self.ffi.turn_agent,
363            };
364            crate::ffi::rice_candidate_copy_into(&self.ffi.candidate, &mut ffi.candidate);
365            self.ffi.turn_agent = core::ptr::null_mut();
366            Self { ffi }
367        }
368    }
369
370    /// The [`Candidate`](crate::candidate::Candidate).
371    pub fn candidate(&self) -> crate::candidate::Candidate {
372        unsafe { crate::candidate::Candidate::from_c_none(&self.ffi.candidate) }
373    }
374}
375
#[cfg(test)]
mod tests {
    use sans_io_time::Instant;

    use super::*;
    use crate::agent::{Agent, AgentPoll};

    /// Walks a TCP gather through the agent's poll loop: a socket allocation request, two
    /// gathered candidates, a wait, and then the reply to the allocation request.
    #[test]
    fn gather_candidates() {
        let host_addr: crate::Address = "192.168.0.1:1000".parse().unwrap();
        let stun_server: crate::Address = "102.168.0.200:2000".parse().unwrap();
        let agent = Agent::builder().build();
        let stream = agent.add_stream();
        let component = stream.add_component();
        let transport = TransportType::Tcp;
        let local = Credentials::new("luser", "lpass");
        let remote = Credentials::new("ruser", "rpass");

        agent.add_stun_server(transport, stun_server);
        stream.set_local_credentials(&local);
        stream.set_remote_credentials(&remote);
        component
            .gather_candidates([(transport, &host_addr)], [])
            .unwrap();

        // The first poll must ask for a socket to be allocated.
        let AgentPoll::AllocateSocket(ref request) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };
        let component_id = request.component_id;
        let from = &request.from;
        let to = &request.to;

        // The next two polls must each report a gathered candidate.
        let AgentPoll::GatheredCandidate(ref _first) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        let AgentPoll::GatheredCandidate(ref _second) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        // After that the agent has nothing to do but wait.
        let AgentPoll::WaitUntilNanos(_deadline) = agent.poll(Instant::ZERO) else {
            unreachable!()
        };

        // Answer the allocation request with exactly the values it carried.
        let socket_local_addr: crate::Address = "192.168.200.4:3000".parse().unwrap();
        stream.allocated_socket(component_id, transport, from, to, Some(socket_local_addr));

        // Supplying the socket must produce something to transmit.
        let _ = agent.poll_transmit(Instant::ZERO).unwrap();

        let _ = agent.poll(Instant::ZERO);
        let _ = agent.poll(Instant::ZERO);
    }
}