webrtc_ice/agent/
agent_transport.rs1use std::io;
2use std::sync::atomic::Ordering;
3
4use arc_swap::ArcSwapOption;
5use async_trait::async_trait;
6use portable_atomic::AtomicBool;
7use util::Conn;
8
9use super::*;
10use crate::error::*;
11
12impl Agent {
13 pub async fn dial(
19 &self,
20 mut cancel_rx: mpsc::Receiver<()>,
21 remote_ufrag: String,
22 remote_pwd: String,
23 ) -> Result<Arc<impl Conn>> {
24 let (on_connected_rx, agent_conn) = {
25 self.internal
26 .start_connectivity_checks(true, remote_ufrag, remote_pwd)
27 .await?;
28
29 let mut on_connected_rx = self.internal.on_connected_rx.lock().await;
30 (
31 on_connected_rx.take(),
32 Arc::clone(&self.internal.agent_conn),
33 )
34 };
35
36 if let Some(mut on_connected_rx) = on_connected_rx {
37 tokio::select! {
39 _ = on_connected_rx.recv() => {},
40 _ = cancel_rx.recv() => {
41 return Err(Error::ErrCanceledByCaller);
42 }
43 }
44 }
45 Ok(agent_conn)
46 }
47
48 pub async fn accept(
54 &self,
55 mut cancel_rx: mpsc::Receiver<()>,
56 remote_ufrag: String,
57 remote_pwd: String,
58 ) -> Result<Arc<impl Conn>> {
59 let (on_connected_rx, agent_conn) = {
60 self.internal
61 .start_connectivity_checks(false, remote_ufrag, remote_pwd)
62 .await?;
63
64 let mut on_connected_rx = self.internal.on_connected_rx.lock().await;
65 (
66 on_connected_rx.take(),
67 Arc::clone(&self.internal.agent_conn),
68 )
69 };
70
71 if let Some(mut on_connected_rx) = on_connected_rx {
72 tokio::select! {
74 _ = on_connected_rx.recv() => {},
75 _ = cancel_rx.recv() => {
76 return Err(Error::ErrCanceledByCaller);
77 }
78 }
79 }
80
81 Ok(agent_conn)
82 }
83}
84
85pub(crate) struct AgentConn {
86 pub(crate) selected_pair: ArcSwapOption<CandidatePair>,
87 pub(crate) checklist: Mutex<Vec<Arc<CandidatePair>>>,
88
89 pub(crate) buffer: Buffer,
90 pub(crate) bytes_received: AtomicUsize,
91 pub(crate) bytes_sent: AtomicUsize,
92 pub(crate) done: AtomicBool,
93}
94
95impl AgentConn {
96 pub(crate) fn new() -> Self {
97 Self {
98 selected_pair: ArcSwapOption::empty(),
99 checklist: Mutex::new(vec![]),
100 buffer: Buffer::new(0, MAX_BUFFER_SIZE),
104 bytes_received: AtomicUsize::new(0),
105 bytes_sent: AtomicUsize::new(0),
106 done: AtomicBool::new(false),
107 }
108 }
109 pub(crate) fn get_selected_pair(&self) -> Option<Arc<CandidatePair>> {
110 self.selected_pair.load().clone()
111 }
112
113 pub(crate) async fn get_best_available_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
114 let mut best: Option<&Arc<CandidatePair>> = None;
115
116 let checklist = self.checklist.lock().await;
117 for p in &*checklist {
118 if p.state.load(Ordering::SeqCst) == CandidatePairState::Failed as u8 {
119 continue;
120 }
121
122 if let Some(b) = &mut best {
123 if b.priority() < p.priority() {
124 *b = p;
125 }
126 } else {
127 best = Some(p);
128 }
129 }
130
131 best.cloned()
132 }
133
134 pub(crate) async fn get_best_valid_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
135 let mut best: Option<&Arc<CandidatePair>> = None;
136
137 let checklist = self.checklist.lock().await;
138 for p in &*checklist {
139 if p.state.load(Ordering::SeqCst) != CandidatePairState::Succeeded as u8 {
140 continue;
141 }
142
143 if let Some(b) = &mut best {
144 if b.priority() < p.priority() {
145 *b = p;
146 }
147 } else {
148 best = Some(p);
149 }
150 }
151
152 best.cloned()
153 }
154
155 pub fn bytes_sent(&self) -> usize {
157 self.bytes_sent.load(Ordering::SeqCst)
158 }
159
160 pub fn bytes_received(&self) -> usize {
162 self.bytes_received.load(Ordering::SeqCst)
163 }
164}
165
166#[async_trait]
167impl Conn for AgentConn {
168 async fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> {
169 Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into())
170 }
171
172 async fn recv(&self, buf: &mut [u8]) -> std::result::Result<usize, util::Error> {
173 if self.done.load(Ordering::SeqCst) {
174 return Err(io::Error::new(io::ErrorKind::Other, "Conn is closed").into());
175 }
176
177 let n = match self.buffer.read(buf, None).await {
178 Ok(n) => n,
179 Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()),
180 };
181 self.bytes_received.fetch_add(n, Ordering::SeqCst);
182
183 Ok(n)
184 }
185
186 async fn recv_from(
187 &self,
188 buf: &mut [u8],
189 ) -> std::result::Result<(usize, SocketAddr), util::Error> {
190 if let Some(raddr) = self.remote_addr() {
191 let n = self.recv(buf).await?;
192 Ok((n, raddr))
193 } else {
194 Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into())
195 }
196 }
197
198 async fn send(&self, buf: &[u8]) -> std::result::Result<usize, util::Error> {
199 if self.done.load(Ordering::SeqCst) {
200 return Err(io::Error::new(io::ErrorKind::Other, "Conn is closed").into());
201 }
202
203 if is_message(buf) {
204 return Err(util::Error::Other("ErrIceWriteStunMessage".into()));
205 }
206
207 let result = if let Some(pair) = self.get_selected_pair() {
208 pair.write(buf).await
209 } else if let Some(pair) = self.get_best_available_candidate_pair().await {
210 pair.write(buf).await
211 } else {
212 Ok(0)
213 };
214
215 match result {
216 Ok(n) => {
217 self.bytes_sent.fetch_add(buf.len(), Ordering::SeqCst);
218 Ok(n)
219 }
220 Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()),
221 }
222 }
223
224 async fn send_to(
225 &self,
226 _buf: &[u8],
227 _target: SocketAddr,
228 ) -> std::result::Result<usize, util::Error> {
229 Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into())
230 }
231
232 fn local_addr(&self) -> std::result::Result<SocketAddr, util::Error> {
233 if let Some(pair) = self.get_selected_pair() {
234 Ok(pair.local.addr())
235 } else {
236 Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Addr Not Available").into())
237 }
238 }
239
240 fn remote_addr(&self) -> Option<SocketAddr> {
241 self.get_selected_pair().map(|pair| pair.remote.addr())
242 }
243
244 async fn close(&self) -> std::result::Result<(), util::Error> {
245 Ok(())
246 }
247
248 fn as_any(&self) -> &(dyn std::any::Any + Send + Sync) {
249 self
250 }
251}