rice_proto/
component.rs

1// Copyright (C) 2024 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! A [`Component`] in an ICE [`Stream`](crate::stream::Stream)
10
11use std::net::SocketAddr;
12use std::time::Instant;
13
14use stun_proto::agent::Transmit;
15use stun_proto::types::message::{Message, MessageWriteVec, BINDING};
16use stun_proto::types::prelude::MessageWrite;
17use turn_client_proto::api::{DelayedTransmitBuild, TurnClientApi};
18
19use crate::candidate::{CandidatePair, CandidateType, TransportType};
20
21use crate::agent::{Agent, AgentError};
22use crate::conncheck::transmit_send;
23pub use crate::conncheck::SelectedPair;
24use crate::gathering::StunGatherer;
25use turn_client_proto::types::TurnCredentials;
26
27pub const RTP: usize = 1;
28pub const RTCP: usize = 2;
29
30/// A [`Component`] in an ICE [`Stream`](crate::stream::Stream)
31#[repr(C)]
32pub struct Component<'a> {
33    agent: &'a Agent,
34    stream_id: usize,
35    component_id: usize,
36}
37
38impl<'a> Component<'a> {
39    pub(crate) fn from_stream(agent: &'a Agent, stream_id: usize, component_id: usize) -> Self {
40        Self {
41            agent,
42            stream_id,
43            component_id,
44        }
45    }
46
47    /// The component identifier within a particular ICE [`Stream`](crate::stream::Stream)
48    pub fn id(&self) -> usize {
49        self.component_id
50    }
51
52    /// Retrieve the current state of a `Component`
53    ///
54    /// # Examples
55    ///
56    /// The initial state is `ComponentState::New`
57    ///
58    /// ```
59    /// # use rice_proto::component::{Component, ComponentConnectionState};
60    /// # use rice_proto::agent::Agent;
61    /// # use rice_proto::stream::Stream;
62    /// let mut agent = Agent::default();
63    /// let stream_id = agent.add_stream();
64    /// let mut stream = agent.mut_stream(stream_id).unwrap();
65    /// let component_id = stream.add_component().unwrap();
66    /// let component = stream.component(component_id).unwrap();
67    /// assert_eq!(component.state(), ComponentConnectionState::New);
68    /// ```
69    pub fn state(&self) -> ComponentConnectionState {
70        let stream = self.agent.stream_state(self.stream_id).unwrap();
71        let component = stream.component_state(self.component_id).unwrap();
72        component.state
73    }
74
75    /// The [`CandidatePair`] this component has selected to send/receive data with.  This will not
76    /// be valid until the [`Component`] has reached [`ComponentConnectionState::Connected`]
77    pub fn selected_pair(&self) -> Option<&CandidatePair> {
78        let stream = self.agent.stream_state(self.stream_id).unwrap();
79        let component = stream.component_state(self.component_id).unwrap();
80        component
81            .selected_pair
82            .as_ref()
83            .map(|pair| pair.candidate_pair())
84    }
85}
86
87/// A mutable component in an ICE [`Stream`](crate::stream::Stream)
88#[repr(C)]
89pub struct ComponentMut<'a> {
90    agent: &'a mut Agent,
91    stream_id: usize,
92    component_id: usize,
93}
94
95impl<'a> std::ops::Deref for ComponentMut<'a> {
96    type Target = Component<'a>;
97
98    fn deref(&self) -> &Self::Target {
99        unsafe { std::mem::transmute(self) }
100    }
101}
102
103impl<'a> ComponentMut<'a> {
104    pub(crate) fn from_stream(agent: &'a mut Agent, stream_id: usize, component_id: usize) -> Self {
105        Self {
106            agent,
107            stream_id,
108            component_id,
109        }
110    }
111
112    /// Start gathering candidates for this component.  The parent
113    /// [`Agent::poll`](crate::agent::Agent::poll) is used to progress
114    /// the gathering.
115    pub fn gather_candidates(
116        &mut self,
117        sockets: Vec<(TransportType, SocketAddr)>,
118        stun_servers: Vec<(TransportType, SocketAddr)>,
119        turn_servers: Vec<(TransportType, SocketAddr, TurnCredentials)>,
120    ) -> Result<(), AgentError> {
121        let stream = self.agent.mut_stream_state(self.stream_id).unwrap();
122        let component = stream.mut_component_state(self.component_id).unwrap();
123        component.gather_candidates(sockets, stun_servers, turn_servers)
124    }
125
126    /// Set the pair that will be used to send/receive data.  This will override the ICE
127    /// negotiation chosen value.
128    pub fn set_selected_pair(&mut self, selected: CandidatePair) -> Result<(), AgentError> {
129        let stream = self.agent.mut_stream_state(self.stream_id).unwrap();
130        let checklist_id = stream.checklist_id;
131        let checklist = self
132            .agent
133            .checklistset
134            .mut_list(checklist_id)
135            .ok_or(AgentError::ResourceNotFound)?;
136        let (agent_id, agent) = if let Some((agent_id, agent)) = checklist.mut_agent_for_5tuple(
137            selected.local.transport_type,
138            selected.local.base_address,
139            selected.remote.address,
140        ) {
141            (agent_id, agent)
142        } else {
143            let agent_id = checklist
144                .add_agent_for_5tuple(
145                    selected.local.transport_type,
146                    selected.local.base_address,
147                    selected.remote.address,
148                )
149                .0;
150            let agent = checklist.mut_agent_by_id(agent_id).unwrap();
151            (agent_id, agent)
152        };
153        if !agent.is_validated_peer(selected.remote.address) {
154            // ensure that we can receive from the provided remote address.
155            let transmit = agent
156                .send_request(
157                    Message::builder_request(BINDING, MessageWriteVec::new()).finish(),
158                    selected.remote.address,
159                    Instant::now(),
160                )
161                .unwrap();
162            let msg = Message::from_bytes(&transmit.data).unwrap();
163            let response = Message::builder_success(&msg, MessageWriteVec::new()).finish();
164            let response = Message::from_bytes(&response).unwrap();
165            agent.handle_stun(response, selected.remote.address);
166        }
167
168        let selected_pair = SelectedPair::new(selected, agent_id);
169        self.set_selected_pair_with_agent(selected_pair);
170        Ok(())
171    }
172
173    pub(crate) fn set_selected_pair_with_agent(&mut self, selected: SelectedPair) {
174        let stream = self.agent.mut_stream_state(self.stream_id).unwrap();
175        let component = stream.mut_component_state(self.component_id).unwrap();
176        component.selected_pair = Some(selected);
177    }
178
179    /// Send data to the peer using the selected pair.  This will not succeed until the
180    /// [`Component`] has reached [`ComponentConnectionState::Connected`]
181    pub fn send<T: AsRef<[u8]> + std::fmt::Debug>(
182        &mut self,
183        data: T,
184        now: Instant,
185    ) -> Result<Transmit<Box<[u8]>>, AgentError> {
186        // TODO: store statistics about bytes/packets sent
187        let stream = self.agent.stream_state(self.stream_id).unwrap();
188        let checklist_id = stream.checklist_id;
189        let component = stream.component_state(self.component_id).unwrap();
190        let selected_pair = component
191            .selected_pair
192            .as_ref()
193            .ok_or(AgentError::ResourceNotFound)?;
194        let pair = selected_pair.candidate_pair();
195        let local_candidate_type = pair.local.candidate_type;
196        let local_transport = pair.local.transport_type;
197        let local_addr = pair.local.address;
198        let remote_addr = pair.remote.address;
199        let stun_agent_id = selected_pair.stun_agent_id();
200
201        let data_len = data.as_ref().len();
202
203        let checklist = self.agent.checklistset.mut_list(checklist_id).unwrap();
204        if local_candidate_type == CandidateType::Relayed {
205            let turn_client = checklist
206                .mut_turn_client_by_allocated_address(local_transport, local_addr)
207                .ok_or(AgentError::ResourceNotFound)?
208                .1;
209            let transmit = turn_client
210                .send_to(local_transport, remote_addr, data, now)
211                .map_err(|_| AgentError::ResourceNotFound)?
212                .unwrap();
213            trace!(
214                "sending {} bytes from {} {} through TURN server {} with allocation {local_transport} {local_addr} to {remote_addr}",
215                data_len, transmit.transport, transmit.from, transmit.to,
216            );
217            Ok(Transmit::new(
218                transmit.data.build().into_boxed_slice(),
219                transmit.transport,
220                transmit.from,
221                transmit.to,
222            ))
223        } else {
224            let stun_agent = checklist
225                .agent_by_id(stun_agent_id)
226                .ok_or(AgentError::ResourceNotFound)?;
227            trace!(
228                "sending {} bytes directly over {local_transport} {local_addr} -> {remote_addr}",
229                data_len
230            );
231            let transmit = stun_agent.send_data(data, remote_addr);
232            let transport = transmit.transport;
233            Ok(transmit.reinterpret_data(|data| transmit_send(transport, data.as_ref())))
234        }
235    }
236}
237
238#[derive(Debug, Default, PartialEq, Eq)]
239pub(crate) enum GatherProgress {
240    #[default]
241    New,
242    InProgress,
243    Completed,
244}
245
246/// The state of a component
247#[derive(Copy, Clone, Debug, PartialEq, Eq)]
248#[repr(C)]
249pub enum ComponentConnectionState {
250    /// Component is in initial state and no connectivity checks are in progress.
251    New,
252    /// Connectivity checks are in progress for this candidate
253    Connecting,
254    /// A [`CandidatePair`](crate::candidate::CandidatePair`) has been selected for this component
255    Connected,
256    /// No connection could be found for this Component
257    Failed,
258}
259
260#[derive(Debug)]
261pub(crate) struct ComponentState {
262    pub(crate) id: usize,
263    state: ComponentConnectionState,
264    selected_pair: Option<SelectedPair>,
265    pub(crate) gather_state: GatherProgress,
266    pub(crate) gatherer: Option<StunGatherer>,
267}
268
269impl ComponentState {
270    pub(crate) fn new(id: usize) -> Self {
271        Self {
272            id,
273            state: ComponentConnectionState::New,
274            selected_pair: None,
275            gather_state: GatherProgress::New,
276            gatherer: None,
277        }
278    }
279
280    pub(crate) fn gather_candidates(
281        &mut self,
282        sockets: Vec<(TransportType, SocketAddr)>,
283        stun_servers: Vec<(TransportType, SocketAddr)>,
284        turn_servers: Vec<(TransportType, SocketAddr, TurnCredentials)>,
285    ) -> Result<(), AgentError> {
286        if self.gather_state != GatherProgress::New {
287            return Err(AgentError::AlreadyInProgress);
288        }
289
290        self.gatherer = Some(StunGatherer::new(
291            self.id,
292            sockets,
293            stun_servers,
294            turn_servers,
295        ));
296        self.gather_state = GatherProgress::InProgress;
297
298        Ok(())
299    }
300
301    #[tracing::instrument(name = "set_component_state", level = "debug", skip(self, state))]
302    pub(crate) fn set_state(&mut self, state: ComponentConnectionState) -> bool {
303        if self.state != state {
304            debug!(old_state = ?self.state, new_state = ?state, "setting");
305            self.state = state;
306            true
307        } else {
308            false
309        }
310    }
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316    use crate::agent::Agent;
317    use crate::candidate::Candidate;
318
319    #[test]
320    fn initial_state_new() {
321        let _log = crate::tests::test_init_log();
322        let mut agent = Agent::builder().build();
323        let sid = agent.add_stream();
324        let mut s = agent.mut_stream(sid).unwrap();
325        let cid = s.add_component().unwrap();
326        let c = s.component(cid).unwrap();
327        assert_eq!(c.state(), ComponentConnectionState::New);
328    }
329
330    #[test]
331    fn send_recv() {
332        let _log = crate::tests::test_init_log();
333        let mut agent = Agent::builder().controlling(false).build();
334        let stream_id = agent.add_stream();
335        let mut stream = agent.mut_stream(stream_id).unwrap();
336        let send_id = stream.add_component().unwrap();
337        let local_addr = "127.0.0.1:1000".parse().unwrap();
338        let remote_addr = "127.0.0.1:2000".parse().unwrap();
339        let now = Instant::now();
340
341        let local_cand = Candidate::builder(
342            send_id,
343            CandidateType::Host,
344            TransportType::Udp,
345            "0",
346            local_addr,
347        )
348        .build();
349        let remote_cand = Candidate::builder(
350            send_id,
351            CandidateType::Host,
352            TransportType::Udp,
353            "0",
354            remote_addr,
355        )
356        .build();
357        let candidate_pair = CandidatePair::new(local_cand, remote_cand);
358
359        let mut send = stream.mut_component(send_id).unwrap();
360        send.set_selected_pair(candidate_pair.clone()).unwrap();
361        assert_eq!(send.selected_pair().unwrap(), &candidate_pair);
362
363        let data = vec![3; 4];
364        let transmit = send.send(&data, now).unwrap();
365        assert_eq!(transmit.transport, TransportType::Udp);
366        assert_eq!(transmit.from, local_addr);
367        assert_eq!(transmit.to, remote_addr);
368        assert_eq!(transmit.data.as_ref(), data.as_slice());
369
370        let recved_data = vec![7; 6];
371        let ret = stream.handle_incoming_data(
372            send_id,
373            Transmit::new(
374                recved_data.clone(),
375                TransportType::Udp,
376                remote_addr,
377                local_addr,
378            ),
379            now,
380        );
381        assert_eq!(recved_data.as_slice(), ret.data.unwrap().as_ref());
382        assert!(!ret.handled);
383        assert!(!ret.have_more_data);
384
385        // Unknown remote is ignored
386        let recved_data2 = vec![9; 12];
387        let ret = stream.handle_incoming_data(
388            send_id,
389            Transmit::new(recved_data2, TransportType::Udp, local_addr, local_addr),
390            now,
391        );
392        assert!(ret.data.is_none());
393        assert!(!ret.handled);
394        assert!(!ret.have_more_data);
395    }
396}