1use std::net::SocketAddr;
12use std::time::Instant;
13
14use stun_proto::agent::Transmit;
15use stun_proto::types::message::{Message, MessageWriteVec, BINDING};
16use stun_proto::types::prelude::MessageWrite;
17use turn_client_proto::api::{DelayedTransmitBuild, TurnClientApi};
18
19use crate::candidate::{CandidatePair, CandidateType, TransportType};
20
21use crate::agent::{Agent, AgentError};
22use crate::conncheck::transmit_send;
23pub use crate::conncheck::SelectedPair;
24use crate::gathering::StunGatherer;
25use turn_client_proto::types::TurnCredentials;
26
27pub const RTP: usize = 1;
28pub const RTCP: usize = 2;
29
30#[repr(C)]
32pub struct Component<'a> {
33 agent: &'a Agent,
34 stream_id: usize,
35 component_id: usize,
36}
37
38impl<'a> Component<'a> {
39 pub(crate) fn from_stream(agent: &'a Agent, stream_id: usize, component_id: usize) -> Self {
40 Self {
41 agent,
42 stream_id,
43 component_id,
44 }
45 }
46
47 pub fn id(&self) -> usize {
49 self.component_id
50 }
51
52 pub fn state(&self) -> ComponentConnectionState {
70 let stream = self.agent.stream_state(self.stream_id).unwrap();
71 let component = stream.component_state(self.component_id).unwrap();
72 component.state
73 }
74
75 pub fn selected_pair(&self) -> Option<&CandidatePair> {
78 let stream = self.agent.stream_state(self.stream_id).unwrap();
79 let component = stream.component_state(self.component_id).unwrap();
80 component
81 .selected_pair
82 .as_ref()
83 .map(|pair| pair.candidate_pair())
84 }
85}
86
87#[repr(C)]
89pub struct ComponentMut<'a> {
90 agent: &'a mut Agent,
91 stream_id: usize,
92 component_id: usize,
93}
94
95impl<'a> std::ops::Deref for ComponentMut<'a> {
96 type Target = Component<'a>;
97
98 fn deref(&self) -> &Self::Target {
99 unsafe { std::mem::transmute(self) }
100 }
101}
102
103impl<'a> ComponentMut<'a> {
104 pub(crate) fn from_stream(agent: &'a mut Agent, stream_id: usize, component_id: usize) -> Self {
105 Self {
106 agent,
107 stream_id,
108 component_id,
109 }
110 }
111
112 pub fn gather_candidates(
116 &mut self,
117 sockets: Vec<(TransportType, SocketAddr)>,
118 stun_servers: Vec<(TransportType, SocketAddr)>,
119 turn_servers: Vec<(TransportType, SocketAddr, TurnCredentials)>,
120 ) -> Result<(), AgentError> {
121 let stream = self.agent.mut_stream_state(self.stream_id).unwrap();
122 let component = stream.mut_component_state(self.component_id).unwrap();
123 component.gather_candidates(sockets, stun_servers, turn_servers)
124 }
125
126 pub fn set_selected_pair(&mut self, selected: CandidatePair) -> Result<(), AgentError> {
129 let stream = self.agent.mut_stream_state(self.stream_id).unwrap();
130 let checklist_id = stream.checklist_id;
131 let checklist = self
132 .agent
133 .checklistset
134 .mut_list(checklist_id)
135 .ok_or(AgentError::ResourceNotFound)?;
136 let (agent_id, agent) = if let Some((agent_id, agent)) = checklist.mut_agent_for_5tuple(
137 selected.local.transport_type,
138 selected.local.base_address,
139 selected.remote.address,
140 ) {
141 (agent_id, agent)
142 } else {
143 let agent_id = checklist
144 .add_agent_for_5tuple(
145 selected.local.transport_type,
146 selected.local.base_address,
147 selected.remote.address,
148 )
149 .0;
150 let agent = checklist.mut_agent_by_id(agent_id).unwrap();
151 (agent_id, agent)
152 };
153 if !agent.is_validated_peer(selected.remote.address) {
154 let transmit = agent
156 .send_request(
157 Message::builder_request(BINDING, MessageWriteVec::new()).finish(),
158 selected.remote.address,
159 Instant::now(),
160 )
161 .unwrap();
162 let msg = Message::from_bytes(&transmit.data).unwrap();
163 let response = Message::builder_success(&msg, MessageWriteVec::new()).finish();
164 let response = Message::from_bytes(&response).unwrap();
165 agent.handle_stun(response, selected.remote.address);
166 }
167
168 let selected_pair = SelectedPair::new(selected, agent_id);
169 self.set_selected_pair_with_agent(selected_pair);
170 Ok(())
171 }
172
173 pub(crate) fn set_selected_pair_with_agent(&mut self, selected: SelectedPair) {
174 let stream = self.agent.mut_stream_state(self.stream_id).unwrap();
175 let component = stream.mut_component_state(self.component_id).unwrap();
176 component.selected_pair = Some(selected);
177 }
178
179 pub fn send<T: AsRef<[u8]> + std::fmt::Debug>(
182 &mut self,
183 data: T,
184 now: Instant,
185 ) -> Result<Transmit<Box<[u8]>>, AgentError> {
186 let stream = self.agent.stream_state(self.stream_id).unwrap();
188 let checklist_id = stream.checklist_id;
189 let component = stream.component_state(self.component_id).unwrap();
190 let selected_pair = component
191 .selected_pair
192 .as_ref()
193 .ok_or(AgentError::ResourceNotFound)?;
194 let pair = selected_pair.candidate_pair();
195 let local_candidate_type = pair.local.candidate_type;
196 let local_transport = pair.local.transport_type;
197 let local_addr = pair.local.address;
198 let remote_addr = pair.remote.address;
199 let stun_agent_id = selected_pair.stun_agent_id();
200
201 let data_len = data.as_ref().len();
202
203 let checklist = self.agent.checklistset.mut_list(checklist_id).unwrap();
204 if local_candidate_type == CandidateType::Relayed {
205 let turn_client = checklist
206 .mut_turn_client_by_allocated_address(local_transport, local_addr)
207 .ok_or(AgentError::ResourceNotFound)?
208 .1;
209 let transmit = turn_client
210 .send_to(local_transport, remote_addr, data, now)
211 .map_err(|_| AgentError::ResourceNotFound)?
212 .unwrap();
213 trace!(
214 "sending {} bytes from {} {} through TURN server {} with allocation {local_transport} {local_addr} to {remote_addr}",
215 data_len, transmit.transport, transmit.from, transmit.to,
216 );
217 Ok(Transmit::new(
218 transmit.data.build().into_boxed_slice(),
219 transmit.transport,
220 transmit.from,
221 transmit.to,
222 ))
223 } else {
224 let stun_agent = checklist
225 .agent_by_id(stun_agent_id)
226 .ok_or(AgentError::ResourceNotFound)?;
227 trace!(
228 "sending {} bytes directly over {local_transport} {local_addr} -> {remote_addr}",
229 data_len
230 );
231 let transmit = stun_agent.send_data(data, remote_addr);
232 let transport = transmit.transport;
233 Ok(transmit.reinterpret_data(|data| transmit_send(transport, data.as_ref())))
234 }
235 }
236}
237
238#[derive(Debug, Default, PartialEq, Eq)]
239pub(crate) enum GatherProgress {
240 #[default]
241 New,
242 InProgress,
243 Completed,
244}
245
246#[derive(Copy, Clone, Debug, PartialEq, Eq)]
248#[repr(C)]
249pub enum ComponentConnectionState {
250 New,
252 Connecting,
254 Connected,
256 Failed,
258}
259
260#[derive(Debug)]
261pub(crate) struct ComponentState {
262 pub(crate) id: usize,
263 state: ComponentConnectionState,
264 selected_pair: Option<SelectedPair>,
265 pub(crate) gather_state: GatherProgress,
266 pub(crate) gatherer: Option<StunGatherer>,
267}
268
269impl ComponentState {
270 pub(crate) fn new(id: usize) -> Self {
271 Self {
272 id,
273 state: ComponentConnectionState::New,
274 selected_pair: None,
275 gather_state: GatherProgress::New,
276 gatherer: None,
277 }
278 }
279
280 pub(crate) fn gather_candidates(
281 &mut self,
282 sockets: Vec<(TransportType, SocketAddr)>,
283 stun_servers: Vec<(TransportType, SocketAddr)>,
284 turn_servers: Vec<(TransportType, SocketAddr, TurnCredentials)>,
285 ) -> Result<(), AgentError> {
286 if self.gather_state != GatherProgress::New {
287 return Err(AgentError::AlreadyInProgress);
288 }
289
290 self.gatherer = Some(StunGatherer::new(
291 self.id,
292 sockets,
293 stun_servers,
294 turn_servers,
295 ));
296 self.gather_state = GatherProgress::InProgress;
297
298 Ok(())
299 }
300
301 #[tracing::instrument(name = "set_component_state", level = "debug", skip(self, state))]
302 pub(crate) fn set_state(&mut self, state: ComponentConnectionState) -> bool {
303 if self.state != state {
304 debug!(old_state = ?self.state, new_state = ?state, "setting");
305 self.state = state;
306 true
307 } else {
308 false
309 }
310 }
311}
312
313#[cfg(test)]
314mod tests {
315 use super::*;
316 use crate::agent::Agent;
317 use crate::candidate::Candidate;
318
319 #[test]
320 fn initial_state_new() {
321 let _log = crate::tests::test_init_log();
322 let mut agent = Agent::builder().build();
323 let sid = agent.add_stream();
324 let mut s = agent.mut_stream(sid).unwrap();
325 let cid = s.add_component().unwrap();
326 let c = s.component(cid).unwrap();
327 assert_eq!(c.state(), ComponentConnectionState::New);
328 }
329
330 #[test]
331 fn send_recv() {
332 let _log = crate::tests::test_init_log();
333 let mut agent = Agent::builder().controlling(false).build();
334 let stream_id = agent.add_stream();
335 let mut stream = agent.mut_stream(stream_id).unwrap();
336 let send_id = stream.add_component().unwrap();
337 let local_addr = "127.0.0.1:1000".parse().unwrap();
338 let remote_addr = "127.0.0.1:2000".parse().unwrap();
339 let now = Instant::now();
340
341 let local_cand = Candidate::builder(
342 send_id,
343 CandidateType::Host,
344 TransportType::Udp,
345 "0",
346 local_addr,
347 )
348 .build();
349 let remote_cand = Candidate::builder(
350 send_id,
351 CandidateType::Host,
352 TransportType::Udp,
353 "0",
354 remote_addr,
355 )
356 .build();
357 let candidate_pair = CandidatePair::new(local_cand, remote_cand);
358
359 let mut send = stream.mut_component(send_id).unwrap();
360 send.set_selected_pair(candidate_pair.clone()).unwrap();
361 assert_eq!(send.selected_pair().unwrap(), &candidate_pair);
362
363 let data = vec![3; 4];
364 let transmit = send.send(&data, now).unwrap();
365 assert_eq!(transmit.transport, TransportType::Udp);
366 assert_eq!(transmit.from, local_addr);
367 assert_eq!(transmit.to, remote_addr);
368 assert_eq!(transmit.data.as_ref(), data.as_slice());
369
370 let recved_data = vec![7; 6];
371 let ret = stream.handle_incoming_data(
372 send_id,
373 Transmit::new(
374 recved_data.clone(),
375 TransportType::Udp,
376 remote_addr,
377 local_addr,
378 ),
379 now,
380 );
381 assert_eq!(recved_data.as_slice(), ret.data.unwrap().as_ref());
382 assert!(!ret.handled);
383 assert!(!ret.have_more_data);
384
385 let recved_data2 = vec![9; 12];
387 let ret = stream.handle_incoming_data(
388 send_id,
389 Transmit::new(recved_data2, TransportType::Udp, local_addr, local_addr),
390 now,
391 );
392 assert!(ret.data.is_none());
393 assert!(!ret.handled);
394 assert!(!ret.have_more_data);
395 }
396}