webrtc_ice/agent/
agent_transport.rs

1use std::io;
2use std::sync::atomic::Ordering;
3
4use arc_swap::ArcSwapOption;
5use async_trait::async_trait;
6use portable_atomic::AtomicBool;
7use util::Conn;
8
9use super::*;
10use crate::error::*;
11
12impl Agent {
13    /// Connects to the remote agent, acting as the controlling ice agent.
14    /// The method blocks until at least one ice candidate pair has successfully connected.
15    ///
16    /// The operation will be cancelled if `cancel_rx` either receives a message or its channel
17    /// closes.
18    pub async fn dial(
19        &self,
20        mut cancel_rx: mpsc::Receiver<()>,
21        remote_ufrag: String,
22        remote_pwd: String,
23    ) -> Result<Arc<impl Conn>> {
24        let (on_connected_rx, agent_conn) = {
25            self.internal
26                .start_connectivity_checks(true, remote_ufrag, remote_pwd)
27                .await?;
28
29            let mut on_connected_rx = self.internal.on_connected_rx.lock().await;
30            (
31                on_connected_rx.take(),
32                Arc::clone(&self.internal.agent_conn),
33            )
34        };
35
36        if let Some(mut on_connected_rx) = on_connected_rx {
37            // block until pair selected
38            tokio::select! {
39                _ = on_connected_rx.recv() => {},
40                _ = cancel_rx.recv() => {
41                    return Err(Error::ErrCanceledByCaller);
42                }
43            }
44        }
45        Ok(agent_conn)
46    }
47
48    /// Connects to the remote agent, acting as the controlled ice agent.
49    /// The method blocks until at least one ice candidate pair has successfully connected.
50    ///
51    /// The operation will be cancelled if `cancel_rx` either receives a message or its channel
52    /// closes.
53    pub async fn accept(
54        &self,
55        mut cancel_rx: mpsc::Receiver<()>,
56        remote_ufrag: String,
57        remote_pwd: String,
58    ) -> Result<Arc<impl Conn>> {
59        let (on_connected_rx, agent_conn) = {
60            self.internal
61                .start_connectivity_checks(false, remote_ufrag, remote_pwd)
62                .await?;
63
64            let mut on_connected_rx = self.internal.on_connected_rx.lock().await;
65            (
66                on_connected_rx.take(),
67                Arc::clone(&self.internal.agent_conn),
68            )
69        };
70
71        if let Some(mut on_connected_rx) = on_connected_rx {
72            // block until pair selected
73            tokio::select! {
74                _ = on_connected_rx.recv() => {},
75                _ = cancel_rx.recv() => {
76                    return Err(Error::ErrCanceledByCaller);
77                }
78            }
79        }
80
81        Ok(agent_conn)
82    }
83}
84
/// Connection handle handed out by the agent; implements [`Conn`] on top of the
/// candidate pairs tracked by the agent.
pub(crate) struct AgentConn {
    /// Currently selected candidate pair, if any. Used for sending and for
    /// reporting the local/remote addresses.
    pub(crate) selected_pair: ArcSwapOption<CandidatePair>,
    /// All known candidate pairs; scanned when looking for the best available
    /// or best valid pair.
    pub(crate) checklist: Mutex<Vec<Arc<CandidatePair>>>,

    /// Inbound data buffer that `recv` reads from.
    pub(crate) buffer: Buffer,
    /// Running total of bytes returned by `recv`.
    pub(crate) bytes_received: AtomicUsize,
    /// Running total of bytes accounted for by `send`.
    pub(crate) bytes_sent: AtomicUsize,
    /// When `true`, `recv` and `send` fail with "Conn is closed".
    pub(crate) done: AtomicBool,
}
94
95impl AgentConn {
96    pub(crate) fn new() -> Self {
97        Self {
98            selected_pair: ArcSwapOption::empty(),
99            checklist: Mutex::new(vec![]),
100            // Make sure the buffer doesn't grow indefinitely.
101            // NOTE: We actually won't get anywhere close to this limit.
102            // SRTP will constantly read from the endpoint and drop packets if it's full.
103            buffer: Buffer::new(0, MAX_BUFFER_SIZE),
104            bytes_received: AtomicUsize::new(0),
105            bytes_sent: AtomicUsize::new(0),
106            done: AtomicBool::new(false),
107        }
108    }
109    pub(crate) fn get_selected_pair(&self) -> Option<Arc<CandidatePair>> {
110        self.selected_pair.load().clone()
111    }
112
113    pub(crate) async fn get_best_available_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
114        let mut best: Option<&Arc<CandidatePair>> = None;
115
116        let checklist = self.checklist.lock().await;
117        for p in &*checklist {
118            if p.state.load(Ordering::SeqCst) == CandidatePairState::Failed as u8 {
119                continue;
120            }
121
122            if let Some(b) = &mut best {
123                if b.priority() < p.priority() {
124                    *b = p;
125                }
126            } else {
127                best = Some(p);
128            }
129        }
130
131        best.cloned()
132    }
133
134    pub(crate) async fn get_best_valid_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
135        let mut best: Option<&Arc<CandidatePair>> = None;
136
137        let checklist = self.checklist.lock().await;
138        for p in &*checklist {
139            if p.state.load(Ordering::SeqCst) != CandidatePairState::Succeeded as u8 {
140                continue;
141            }
142
143            if let Some(b) = &mut best {
144                if b.priority() < p.priority() {
145                    *b = p;
146                }
147            } else {
148                best = Some(p);
149            }
150        }
151
152        best.cloned()
153    }
154
155    /// Returns the number of bytes sent.
156    pub fn bytes_sent(&self) -> usize {
157        self.bytes_sent.load(Ordering::SeqCst)
158    }
159
160    /// Returns the number of bytes received.
161    pub fn bytes_received(&self) -> usize {
162        self.bytes_received.load(Ordering::SeqCst)
163    }
164}
165
166#[async_trait]
167impl Conn for AgentConn {
168    async fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> {
169        Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into())
170    }
171
172    async fn recv(&self, buf: &mut [u8]) -> std::result::Result<usize, util::Error> {
173        if self.done.load(Ordering::SeqCst) {
174            return Err(io::Error::new(io::ErrorKind::Other, "Conn is closed").into());
175        }
176
177        let n = match self.buffer.read(buf, None).await {
178            Ok(n) => n,
179            Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()),
180        };
181        self.bytes_received.fetch_add(n, Ordering::SeqCst);
182
183        Ok(n)
184    }
185
186    async fn recv_from(
187        &self,
188        buf: &mut [u8],
189    ) -> std::result::Result<(usize, SocketAddr), util::Error> {
190        if let Some(raddr) = self.remote_addr() {
191            let n = self.recv(buf).await?;
192            Ok((n, raddr))
193        } else {
194            Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into())
195        }
196    }
197
198    async fn send(&self, buf: &[u8]) -> std::result::Result<usize, util::Error> {
199        if self.done.load(Ordering::SeqCst) {
200            return Err(io::Error::new(io::ErrorKind::Other, "Conn is closed").into());
201        }
202
203        if is_message(buf) {
204            return Err(util::Error::Other("ErrIceWriteStunMessage".into()));
205        }
206
207        let result = if let Some(pair) = self.get_selected_pair() {
208            pair.write(buf).await
209        } else if let Some(pair) = self.get_best_available_candidate_pair().await {
210            pair.write(buf).await
211        } else {
212            Ok(0)
213        };
214
215        match result {
216            Ok(n) => {
217                self.bytes_sent.fetch_add(buf.len(), Ordering::SeqCst);
218                Ok(n)
219            }
220            Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()),
221        }
222    }
223
224    async fn send_to(
225        &self,
226        _buf: &[u8],
227        _target: SocketAddr,
228    ) -> std::result::Result<usize, util::Error> {
229        Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into())
230    }
231
232    fn local_addr(&self) -> std::result::Result<SocketAddr, util::Error> {
233        if let Some(pair) = self.get_selected_pair() {
234            Ok(pair.local.addr())
235        } else {
236            Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Addr Not Available").into())
237        }
238    }
239
240    fn remote_addr(&self) -> Option<SocketAddr> {
241        self.get_selected_pair().map(|pair| pair.remote.addr())
242    }
243
244    async fn close(&self) -> std::result::Result<(), util::Error> {
245        Ok(())
246    }
247
248    fn as_any(&self) -> &(dyn std::any::Any + Send + Sync) {
249        self
250    }
251}