rice_c/
stream.rs

1// Copyright (C) 2025 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! A [`Stream`] in an ICE [`Agent`](crate::agent::Agent).
10
11use crate::{candidate::TransportType, mut_override};
12
/// An ICE [`Stream`]
///
/// This is a handle around a raw `RiceStream` pointer from the FFI layer.  Cloning the handle
/// calls `rice_stream_ref()` and dropping it calls `rice_stream_unref()`.
#[derive(Debug)]
pub struct Stream {
    // Raw FFI handle; passed to `rice_stream_unref()` when this value is dropped.
    ffi: *mut crate::ffi::RiceStream,
}
18
// NOTE(review): `Stream` only stores a raw pointer, so these impls are manual promises that the
// underlying `RiceStream` may be moved to and shared between threads.  Presumably the C library
// synchronises access internally -- confirm against the FFI documentation.
unsafe impl Send for Stream {}
unsafe impl Sync for Stream {}
21
22impl Clone for Stream {
23    fn clone(&self) -> Self {
24        Self {
25            ffi: unsafe { crate::ffi::rice_stream_ref(self.ffi) },
26        }
27    }
28}
29
30impl Drop for Stream {
31    fn drop(&mut self) {
32        unsafe { crate::ffi::rice_stream_unref(self.ffi) };
33    }
34}
35
36impl Stream {
37    pub(crate) fn from_c_full(stream: *mut crate::ffi::RiceStream) -> Self {
38        Self { ffi: stream }
39    }
40
41    pub fn id(&self) -> usize {
42        unsafe { crate::ffi::rice_stream_get_id(self.ffi) }
43    }
44
45    /// Add a `Component` to this stream.
46    pub fn add_component(&self) -> crate::component::Component {
47        unsafe {
48            crate::component::Component::from_c_full(
49                crate::ffi::rice_stream_add_component(self.ffi),
50                self.id(),
51            )
52        }
53    }
54
55    /// Retrieve a `Component` from this stream.  If the index doesn't exist or a component is not
56    /// available at that index, `None` is returned
57    pub fn component(&self, id: usize) -> Option<crate::component::Component> {
58        let ret = unsafe { crate::ffi::rice_stream_get_component(self.ffi, id) };
59        if ret.is_null() {
60            None
61        } else {
62            Some(crate::component::Component::from_c_full(ret, self.id()))
63        }
64    }
65
66    /// Retreive the previouly set local ICE credentials for this `Stream`.
67    pub fn local_credentials(&self) -> Option<Credentials> {
68        let ret = unsafe { crate::ffi::rice_stream_get_local_credentials(self.ffi) };
69        if ret.is_null() {
70            None
71        } else {
72            Some(Credentials::from_c_full(ret))
73        }
74    }
75
76    /// Retreive the previouly set remote ICE credentials for this `Stream`.
77    pub fn remote_credentials(&self) -> Option<Credentials> {
78        let ret = unsafe { crate::ffi::rice_stream_get_remote_credentials(self.ffi) };
79        if ret.is_null() {
80            None
81        } else {
82            Some(Credentials::from_c_full(ret))
83        }
84    }
85
86    /// Set local ICE credentials for this `Stream`.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// # use rice_c::agent::Agent;
92    /// # use rice_c::stream::Credentials;
93    /// let mut agent = Agent::default();
94    /// let stream = agent.add_stream();
95    /// let credentials = Credentials::new("user", "pass");
96    /// stream.set_local_credentials(&credentials);
97    /// assert_eq!(stream.local_credentials(), Some(credentials));
98    /// ```
99    pub fn set_local_credentials(&self, credentials: &Credentials) {
100        unsafe {
101            crate::ffi::rice_stream_set_local_credentials(self.ffi, credentials.into_c_none())
102        }
103    }
104
105    /// Set remote ICE credentials for this `Stream`.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// # use rice_c::agent::Agent;
111    /// # use rice_c::stream::Credentials;
112    /// let agent = Agent::default();
113    /// let stream = agent.add_stream();
114    /// let credentials = Credentials::new("user", "pass");
115    /// stream.set_remote_credentials(&credentials);
116    /// assert_eq!(stream.remote_credentials(), Some(credentials));
117    /// ```
118    pub fn set_remote_credentials(&self, credentials: &Credentials) {
119        unsafe {
120            crate::ffi::rice_stream_set_remote_credentials(self.ffi, credentials.into_c_none())
121        }
122    }
123
124    /// Signal the end of local candidates.  Calling this function may allow ICE processing to
125    /// complete.
126    pub fn end_of_local_candidates(&self) {
127        unsafe { crate::ffi::rice_stream_end_of_local_candidates(self.ffi) }
128    }
129
130    /// Add a remote candidate for connection checks for use with this stream
131    pub fn add_remote_candidate(&self, cand: &crate::candidate::Candidate) {
132        unsafe { crate::ffi::rice_stream_add_remote_candidate(self.ffi, cand.as_c()) }
133    }
134
135    /// Indicate that no more candidates are expected from the peer.  This may allow the ICE
136    /// process to complete.
137    pub fn end_of_remote_candidates(&self) {
138        unsafe { crate::ffi::rice_stream_end_of_remote_candidates(self.ffi) }
139    }
140
141    /// Add a local candidate for this stream.
142    ///
143    /// Returns whether the candidate was added internally.
144    pub fn add_local_gathered_candidate(&self, gathered: GatheredCandidate) -> bool {
145        unsafe { crate::ffi::rice_stream_add_local_gathered_candidate(self.ffi, &gathered.ffi) }
146    }
147
148    /// Provide a reply to the
149    /// [`AgentPoll::AllocateSocket`](crate::agent::AgentPoll::AllocateSocket) request.  The
150    /// `component_id`, `transport`, `from`, and `to` values must match exactly with the request.
151    pub fn allocated_socket(
152        &self,
153        component_id: usize,
154        transport: TransportType,
155        from: &crate::Address,
156        to: &crate::Address,
157        socket_addr: Option<crate::Address>,
158    ) {
159        let socket_addr = if let Some(addr) = socket_addr {
160            addr.into_c_full()
161        } else {
162            core::ptr::null_mut()
163        };
164        unsafe {
165            crate::ffi::rice_stream_handle_allocated_socket(
166                self.ffi,
167                component_id,
168                transport.into(),
169                from.as_c(),
170                to.as_c(),
171                socket_addr,
172            )
173        }
174    }
175
176    /// The list of component ids available in this stream
177    pub fn component_ids(&self) -> Vec<usize> {
178        unsafe {
179            let mut len = 0;
180            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, core::ptr::null_mut());
181            let mut ret = vec![0; len];
182            crate::ffi::rice_stream_component_ids(self.ffi, &mut len, ret.as_mut_ptr());
183            ret.resize(len.min(ret.len()), 0);
184            ret
185        }
186    }
187
188    /// Provide the stream with data that has been received on an external socket.  The returned
189    /// value indicates what has been done with the data and any application data that has been
190    /// received.
191    pub fn handle_incoming_data<'a>(
192        &self,
193        component_id: usize,
194        transport: TransportType,
195        from: crate::Address,
196        to: crate::Address,
197        data: &'a [u8],
198        now_micros: u64,
199    ) -> StreamIncomingDataReply<'a> {
200        unsafe {
201            let mut stream_ret = crate::ffi::RiceStreamIncomingData::default();
202            crate::ffi::rice_stream_handle_incoming_data(
203                self.ffi,
204                component_id,
205                transport.into(),
206                from.as_c(),
207                to.as_c(),
208                data.as_ptr(),
209                data.len(),
210                now_micros,
211                &mut stream_ret,
212            );
213            let mut ret = StreamIncomingDataReply {
214                handled: stream_ret.handled,
215                have_more_data: stream_ret.have_more_data,
216                data: None,
217            };
218            if !stream_ret.data.ptr.is_null() && stream_ret.data.size > 0 {
219                ret.data = Some(data);
220            }
221            ret
222        }
223    }
224
225    /// Poll for any received data.
226    ///
227    /// Must be called after `handle_incoming_data` if `have_more_data` is `true`.
228    pub fn poll_recv(&self) -> Option<PollRecv> {
229        unsafe {
230            let mut len = 0;
231            let mut component_id = 0;
232            let ptr = crate::ffi::rice_stream_poll_recv(self.ffi, &mut component_id, &mut len);
233            if ptr.is_null() {
234                return None;
235            }
236            let slice = core::slice::from_raw_parts(ptr, len);
237            Some(PollRecv {
238                component_id,
239                data: RecvData { data: slice },
240            })
241        }
242    }
243}
244
/// Data received from a peer, as returned by [`Stream::poll_recv()`].
#[derive(Debug)]
pub struct PollRecv {
    /// The component id reported by the FFI alongside the data.
    pub component_id: usize,
    /// The received bytes.
    pub data: RecvData,
}
251
/// Received data returned by [`Stream::poll_recv()`].
///
/// Dereferences to the received bytes.  The buffer is handed to `rice_free_data()` when this
/// value is dropped.
#[derive(Debug)]
pub struct RecvData {
    // Declared `'static` but really lives only until `Drop` frees it; the field is private so
    // the slice can only be borrowed through `Deref`, for as long as this value is borrowed.
    data: &'static [u8],
}
257
258impl core::ops::Deref for RecvData {
259    type Target = [u8];
260    fn deref(&self) -> &Self::Target {
261        self.data
262    }
263}
264
265impl Drop for RecvData {
266    fn drop(&mut self) {
267        unsafe { crate::ffi::rice_free_data(mut_override(self.data.as_ptr())) }
268    }
269}
270
/// Return value to [`Stream::handle_incoming_data`].
///
/// The lifetime `'a` is that of the buffer passed to [`Stream::handle_incoming_data`].
#[derive(Debug)]
pub struct StreamIncomingDataReply<'a> {
    /// Some of the data was handled
    pub handled: bool,
    /// Data was received in addition to any in the `data` field that could be retrieved with
    /// [`Stream::poll_recv`].
    pub have_more_data: bool,
    /// Any application data that could be parsed from the incoming data.  Borrowed from the
    /// buffer that was passed in.
    pub data: Option<&'a [u8]>,
}
282
/// A set of ICE/TURN credentials.
///
/// Wraps a raw `RiceCredentials` pointer.  Cloning goes through `rice_credentials_copy()`,
/// comparison through `rice_credentials_eq()`, and dropping calls `rice_credentials_free()`.
#[derive(Debug)]
pub struct Credentials {
    // Raw FFI handle; passed to `rice_credentials_free()` when this value is dropped.
    ffi: *mut crate::ffi::RiceCredentials,
}
288
289impl Credentials {
290    /// Create a new set of ICE/TURN credentials with the provided username and password.
291    pub fn new(ufrag: &str, passwd: &str) -> Self {
292        let ufrag = std::ffi::CString::new(ufrag).unwrap();
293        let passwd = std::ffi::CString::new(passwd).unwrap();
294        unsafe {
295            Self {
296                ffi: crate::ffi::rice_credentials_new(ufrag.as_ptr(), passwd.as_ptr()),
297            }
298        }
299    }
300
301    pub(crate) fn from_c_full(ffi: *mut crate::ffi::RiceCredentials) -> Self {
302        Self { ffi }
303    }
304
305    #[allow(clippy::wrong_self_convention)]
306    pub(crate) fn into_c_none(&self) -> *const crate::ffi::RiceCredentials {
307        self.ffi
308    }
309}
310
311impl PartialEq for Credentials {
312    fn eq(&self, other: &Self) -> bool {
313        unsafe { crate::ffi::rice_credentials_eq(self.ffi, other.ffi) }
314    }
315}
316
317impl Clone for Credentials {
318    fn clone(&self) -> Self {
319        Self {
320            ffi: unsafe { crate::ffi::rice_credentials_copy(self.ffi) },
321        }
322    }
323}
324
325impl Drop for Credentials {
326    fn drop(&mut self) {
327        unsafe { crate::ffi::rice_credentials_free(self.ffi) }
328    }
329}
330
/// A locally gathered candidate.
///
/// Holds the FFI `RiceGatheredCandidate` by value, i.e. a candidate together with a
/// `turn_agent` pointer.
// NOTE(review): no `Drop` impl is visible in this file for this type; confirm who releases the
// candidate and `turn_agent` when a value is dropped without being added to a stream.
#[derive(Debug)]
pub struct GatheredCandidate {
    pub(crate) ffi: crate::ffi::RiceGatheredCandidate,
}
336
337impl GatheredCandidate {
338    pub(crate) fn from_c_full(ffi: crate::ffi::RiceGatheredCandidate) -> Self {
339        Self { ffi }
340    }
341
342    pub fn take(&mut self) -> Self {
343        unsafe {
344            let mut ffi = crate::ffi::RiceGatheredCandidate {
345                candidate: crate::ffi::RiceCandidate::zeroed(),
346                turn_agent: self.ffi.turn_agent,
347            };
348            crate::ffi::rice_candidate_copy_into(&self.ffi.candidate, &mut ffi.candidate);
349            self.ffi.turn_agent = core::ptr::null_mut();
350            Self { ffi }
351        }
352    }
353
354    /// The [`Candidate`](crate::candidate::Candidate).
355    pub fn candidate(&self) -> crate::candidate::Candidate {
356        unsafe { crate::candidate::Candidate::from_c_none(&self.ffi.candidate) }
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::{Agent, AgentPoll};

    // Drives candidate gathering for a single TCP component with a STUN server configured and
    // checks the sequence of `AgentPoll` values the agent produces.  Every `unreachable!()`
    // makes the test panic if the agent returns a different variant than expected at that step.
    #[test]
    fn gather_candidates() {
        let addr: crate::Address = "192.168.0.1:1000".parse().unwrap();
        let stun_addr: crate::Address = "102.168.0.200:2000".parse().unwrap();
        let agent = Agent::builder().build();
        let stream = agent.add_stream();
        let component = stream.add_component();
        let transport = TransportType::Tcp;
        let local_credentials = Credentials::new("luser", "lpass");
        let remote_credentials = Credentials::new("ruser", "rpass");

        agent.add_stun_server(transport, stun_addr);
        stream.set_local_credentials(&local_credentials);
        stream.set_remote_credentials(&remote_credentials);
        component.gather_candidates([(transport, addr)]).unwrap();

        // First the agent is expected to ask for a socket to be allocated.
        let AgentPoll::AllocateSocket(ref alloc) = agent.poll(0) else {
            unreachable!()
        };
        // Keep the request parameters; the reply below must repeat them.
        let from = &alloc.from;
        let to = &alloc.to;
        let component_id = alloc.component_id;

        // Two gathered candidates are expected next.
        let AgentPoll::GatheredCandidate(ref _candidate) = agent.poll(0) else {
            unreachable!()
        };

        let AgentPoll::GatheredCandidate(ref _candidate) = agent.poll(0) else {
            unreachable!()
        };

        // With nothing else to report, the agent is expected to ask to be polled again later.
        let AgentPoll::WaitUntilMicros(_now) = agent.poll(0) else {
            unreachable!()
        };

        // Answer the allocation request with the local address of the "connected" TCP socket.
        let tcp_from_addr: crate::Address = "192.168.200.4:3000".parse().unwrap();
        stream.allocated_socket(
            component_id,
            TransportType::Tcp,
            from,
            to,
            Some(tcp_from_addr),
        );

        // Once the socket is available there must be something to transmit.
        let _ = agent.poll_transmit(0).unwrap();

        let _ = agent.poll(0);
        let _ = agent.poll(0);
    }
}