1use log::*;
2use quiche::h3::NameValue;
3
4use std::net::{self, SocketAddr};
5use std::net::ToSocketAddrs;
6use std::collections::HashMap;
7use std::error::Error;
8use std::sync::Arc;
9
10use tokio::io::{AsyncWriteExt, AsyncReadExt};
11use tokio::net::{UdpSocket, TcpStream};
12use tokio::sync::mpsc::{self, UnboundedSender};
13use tokio::time::{self, Duration};
14
15use ring::rand::*;
16
17use crate::common::*;
18
19#[derive(PartialEq, Debug)]
20enum Content {
21 Headers {
22 headers: Vec<quiche::h3::Header>,
23 },
24 Data {
25 data: Vec<u8>,
26 },
27 Datagram {
28 payload: Vec<u8>,
29 },
30 Finished,
31}
32
33#[derive(Debug)]
34struct ToSend {
35 stream_id: u64, content: Content,
37 finished: bool,
38}
39
40struct QuicReceived {
41 recv_info: quiche::RecvInfo,
42 data: Vec<u8>,
43}
44
/// Error returned by `Server::run` when no socket has been bound yet.
#[derive(Debug, Clone)]
struct RunBeforeBindError;

impl std::fmt::Display for RunBeforeBindError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("bind(listen_addr) has to be called before run()")
    }
}

impl Error for RunBeforeBindError {}
54
55struct Client {
59 conn: quiche::Connection,
60 quic_receiver: mpsc::UnboundedReceiver<QuicReceived>,
61 socket: Arc<UdpSocket>,
62}
63
64type ClientMap = HashMap<quiche::ConnectionId<'static>, mpsc::UnboundedSender<QuicReceived>>;
65
66pub struct Server {
67 socket: Option<Arc<UdpSocket>>,
68}
69
70impl Server {
71 pub fn new() -> Server {
72 Server { socket: None }
73 }
74
75 pub fn listen_addr(&self) -> Option<SocketAddr> {
79 return self.socket.clone().map(|socket| socket.local_addr().unwrap())
80 }
81
82 pub async fn bind<T: tokio::net::ToSocketAddrs>(&mut self, listen_addr: T) -> Result<(), Box<dyn Error>> {
86 debug!("creating UDP socket");
87
88 let socket = UdpSocket::bind(listen_addr).await?;
90 debug!("listening on {}", socket.local_addr().unwrap());
91
92 self.socket = Some(Arc::new(socket));
93 Ok(())
94 }
95
96 pub async fn run(&self) -> Result<(), Box<dyn Error>> {
97 if self.socket.is_none() {
98 return Err(Box::new(RunBeforeBindError))
99 }
100 let socket = self.socket.clone().unwrap();
101
102 let mut buf = [0; 65535];
103 let mut out = [0; MAX_DATAGRAM_SIZE];
104
105 let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
107
108 config
109 .load_cert_chain_from_pem_file("example_cert/cert.crt")
110 .unwrap();
111 config
112 .load_priv_key_from_pem_file("example_cert/cert.key")
113 .unwrap();
114
115 config
116 .set_application_protos(quiche::h3::APPLICATION_PROTOCOL)
117 .unwrap();
118
119 config.set_max_idle_timeout(1000);
121 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
122 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
123 config.set_initial_max_data(10_000_000);
124 config.set_initial_max_stream_data_bidi_local(1_000_000);
125 config.set_initial_max_stream_data_bidi_remote(1_000_000);
126 config.set_initial_max_stream_data_uni(1_000_000);
127 config.set_initial_max_streams_bidi(100);
128 config.set_initial_max_streams_uni(100);
129 config.set_disable_active_migration(true);
130 config.enable_dgram(true, 1000, 1000);
131 config.enable_early_data();
132
133 let rng = SystemRandom::new();
134 let conn_id_seed =
135 ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
136
137 let mut clients = ClientMap::new();
138 let local_addr = socket.local_addr().unwrap();
141
142 'read: loop {
143 let (len, from) = match socket.recv_from(&mut buf).await {
144 Ok(v) => v,
145
146 Err(e) => {
147 panic!("recv_from() failed: {:?}", e);
148 },
149 };
150
151 debug!("got {} bytes", len);
152
153 let pkt_buf = &mut buf[..len];
154
155 let hdr = match quiche::Header::from_slice(
157 pkt_buf,
158 quiche::MAX_CONN_ID_LEN,
159 ) {
160 Ok(v) => v,
161
162 Err(e) => {
163 error!("Parsing packet header failed: {:?}", e);
164 continue 'read;
165 },
166 };
167
168 debug!("got packet {:?}", hdr);
169
170 let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
171 let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
172 let conn_id = conn_id.to_vec().into();
173
174 let tx = if !clients.contains_key(&hdr.dcid) &&
177 !clients.contains_key(&conn_id)
178 {
179 if hdr.ty != quiche::Type::Initial {
181 error!("Packet is not Initial");
182 continue 'read;
183 }
184
185 if !quiche::version_is_supported(hdr.version) {
186 warn!("Doing version negotiation");
187
188 let len =
189 quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out)
190 .unwrap();
191
192 let out = &out[..len];
193
194 if let Err(e) = socket.send_to(out, from).await {
195 if e.kind() == std::io::ErrorKind::WouldBlock {
196 debug!("send_to() would block");
197 break;
198 }
199
200 panic!("send_to() failed: {:?}", e);
201 }
202 continue 'read;
203 }
204
205 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
206 scid.copy_from_slice(&conn_id);
207
208 let scid = quiche::ConnectionId::from_ref(&scid);
209
210 let token = hdr.token.as_ref().unwrap();
212
213 if token.is_empty() {
215 warn!("Doing stateless retry");
216
217 let new_token = mint_token(&hdr, &from);
218
219 let len = quiche::retry(
220 &hdr.scid,
221 &hdr.dcid,
222 &scid,
223 &new_token,
224 hdr.version,
225 &mut out,
226 )
227 .unwrap();
228
229 let out = &out[..len];
230
231 if let Err(e) = socket.send_to(out, from).await {
232 if e.kind() == std::io::ErrorKind::WouldBlock {
233 debug!("send_to() would block");
234 break;
235 }
236
237 panic!("send_to() failed: {:?}", e);
238 }
239 continue 'read;
240 }
241
242 let odcid = validate_token(&from, token);
243
244 if odcid.is_none() {
247 error!("Invalid address validation token");
248 continue 'read;
249 }
250
251 if scid.len() != hdr.dcid.len() {
252 error!("Invalid destination connection ID");
253 continue 'read;
254 }
255
256 let scid = hdr.dcid.clone();
259
260 debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
261
262 let conn = quiche::accept(
263 &scid,
264 odcid.as_ref(),
265 local_addr,
266 from,
267 &mut config,
268 )
269 .unwrap();
270
271 let (tx, rx) = mpsc::unbounded_channel();
272
273 let client = Client {
274 conn,
275 quic_receiver: rx,
276 socket: socket.clone(),
277 };
278
279 clients.insert(scid.clone(), tx);
280
281 tokio::spawn(async move {
282 handle_client(client).await
283 });
284
285 clients.get(&scid).unwrap()
286 } else {
287 match clients.get(&hdr.dcid) {
288 Some(v) => v,
289
290 None => clients.get(&conn_id).unwrap(),
291 }
292 };
293
294 let recv_info = quiche::RecvInfo {
295 to: socket.local_addr().unwrap(),
296 from,
297 };
298
299 match tx.send(QuicReceived { recv_info, data: pkt_buf.to_vec() }) {
300 Ok(_) => {},
301 _ => {
302 debug!("Error sending to {:?}", &hdr.dcid);
303 clients.remove(&hdr.dcid);
304 }
305 }
306
307 }
308
309 Ok(())
310 }
311}
312
313
314async fn handle_client(mut client: Client) {
318 let mut http3_conn: Option<quiche::h3::Connection> = None;
319 let mut connect_streams: HashMap<u64, UnboundedSender<Vec<u8>>> = HashMap::new(); let mut connect_sockets: HashMap<u64, UnboundedSender<Vec<u8>>> = HashMap::new(); let (http3_sender, mut http3_receiver) = mpsc::unbounded_channel::<ToSend>();
322
323 let mut buf = [0; 65535];
324 let mut out = [0; MAX_DATAGRAM_SIZE];
325
326 let timeout = 5000; let sleep = tokio::time::sleep(Duration::from_millis(timeout));
328 tokio::pin!(sleep);
329
330 let mut http3_retry_send: Option<ToSend> = None;
331 let mut interval = time::interval(Duration::from_millis(20));
332 loop {
333 tokio::select! {
334 http3_to_send = http3_receiver.recv(), if http3_conn.is_some() && http3_retry_send.is_none() => {
336 if http3_to_send.is_none() {
337 unreachable!()
338 }
339 let mut to_send = http3_to_send.unwrap();
340 let http3_conn = http3_conn.as_mut().unwrap();
341 loop {
342 let result = match &to_send.content {
343 Content::Headers { headers } => {
344 debug!("sending http3 response {:?}", hdrs_to_strings(&headers));
345 http3_conn.send_response(&mut client.conn, to_send.stream_id, headers, to_send.finished)
346 },
347 Content::Data { data } => {
348 debug!("sending http3 data of {} bytes", data.len());
349 let mut written = 0;
350 loop {
351 if written >= data.len() {
352 break Ok(())
353 }
354 match http3_conn.send_body(&mut client.conn, to_send.stream_id, &data[written..], to_send.finished) {
355 Ok(v) => written += v,
356 Err(e) => {
357 to_send = ToSend { stream_id: to_send.stream_id, content: Content::Data { data: data[written..].to_vec() }, finished: to_send.finished };
358 break Err(e)
359 },
360 }
361 debug!("written http3 data {} of {} bytes", written, data.len());
362 }
363 },
364 Content::Datagram { payload } => {
365 debug!("sending http3 datagram of {} bytes", payload.len());
366 http3_conn.send_dgram(&mut client.conn, to_send.stream_id, &payload)
367 },
368 Content::Finished => todo!(),
369 };
370 match result {
371 Ok(_) => {},
372 Err(quiche::h3::Error::StreamBlocked | quiche::h3::Error::Done) => {
373 debug!("Connection {} stream {} stream blocked, retry later", client.conn.trace_id(), to_send.stream_id);
374 http3_retry_send = Some(to_send);
375 break;
376 },
377 Err(e) => {
378 error!("Connection {} stream {} send failed {:?}", client.conn.trace_id(), to_send.stream_id, e);
379 client.conn.stream_shutdown(to_send.stream_id, quiche::Shutdown::Write, 0);
380 connect_streams.remove(&to_send.stream_id);
381 }
382 };
383 to_send = match http3_receiver.try_recv() {
384 Ok(v) => v,
385 Err(e) => break,
386 };
387 }
388 },
389
390 recvd = client.quic_receiver.recv() => {
392 match recvd {
393 Some(mut quic_received) => {
394 let read = match client.conn.recv(&mut quic_received.data, quic_received.recv_info) {
395 Ok(v) => v,
396 Err(e) => {
397 error!("Error when quic recv(): {}", e);
398 break
399 }
400 };
401 debug!("{} processed {} bytes", client.conn.trace_id(), read);
402
403 },
404 None => {
405 break },
407 }
408 if (client.conn.is_in_early_data() || client.conn.is_established()) &&
411 http3_conn.is_none()
412 {
413 debug!(
414 "{} QUIC handshake completed, now trying HTTP/3",
415 client.conn.trace_id()
416 );
417
418 let h3_config = quiche::h3::Config::new().unwrap();
419 let h3_conn = match quiche::h3::Connection::with_transport(
420 &mut client.conn,
421 &h3_config,
422 ) {
423 Ok(v) => v,
424
425 Err(e) => {
426 error!("failed to create HTTP/3 connection: {}", e);
427 continue;
428 },
429 };
430
431 http3_conn = Some(h3_conn);
433 }
434
435 if http3_conn.is_some() {
436 let http3_conn = http3_conn.as_mut().unwrap();
438 loop {
439 match http3_conn.poll(&mut client.conn) {
440 Ok((
441 stream_id,
442 quiche::h3::Event::Headers { list: headers, .. },
443 )) => {
444 info!(
445 "{} got request {:?} on stream id {}",
446 client.conn.trace_id(),
447 hdrs_to_strings(&headers),
448 stream_id
449 );
450
451 let mut method = None;
452 let mut authority = None;
453 let mut protocol = None;
454 let mut scheme = None;
455 let mut path = None;
456
457 for hdr in headers.iter() {
459 match hdr.name() {
460 b":method" => method = Some(hdr.value()),
461 b":authority" => authority = Some(std::str::from_utf8(hdr.value()).unwrap()),
462 b":protocol" => protocol = Some(hdr.value()),
463 b":scheme" => scheme = Some(hdr.value()),
464 b":path" => path = Some(hdr.value()),
465 _ => (),
466 }
467 }
468
469 match method {
470 Some(b"CONNECT") => {
471 if let Some(authority) = authority {
472 if protocol == Some(b"connect-udp") && scheme.is_some() && path.is_some() {
473 let path = path.unwrap();
474 if let Some(peer_addr) = path_to_socketaddr(path) {
475 debug!("connecting udp to {} at {} from authority {}", std::str::from_utf8(&path).unwrap(), peer_addr, authority);
476 let http3_sender_clone_1 = http3_sender.clone();
477 let http3_sender_clone_2 = http3_sender.clone();
478 let (udp_sender, mut udp_receiver) = mpsc::unbounded_channel::<Vec<u8>>();
479 let flow_id = stream_id / 4;
480 connect_sockets.insert(flow_id, udp_sender);
481 tokio::spawn(async move {
482 let socket = match UdpSocket::bind("0.0.0.0:0").await {
483 Ok(v) => v,
484 Err(e) => {
485 error!("Error binding UDP socket");
486 return
487 }
488 };
489 if socket.connect(peer_addr).await.is_err() {
490 error!("Error connecting to UDP {}", peer_addr);
491 return
492 };
493 let socket = Arc::new(socket);
494 let socket_clone = socket.clone();
495 let read_task = tokio::spawn(async move {
496 let mut buf = [0; 65527]; loop {
498 let read = match socket_clone.recv(&mut buf).await {
499 Ok(v) => v,
500 Err(e) => {
501 error!("Error reading from UDP {} on stream id {}: {}", peer_addr, stream_id, e);
502 break
503 },
504 };
505 if read == 0 {
506 debug!("UDP connection closed from {}", peer_addr); break
508 }
509 debug!("read {} bytes from UDP from {} for flow {}", read, peer_addr, flow_id);
510 let data = wrap_udp_connect_payload(0, &buf[..read]);
511 http3_sender_clone_1.send(ToSend { stream_id: flow_id, content: Content::Datagram { payload: data }, finished: false });
512 }
513 });
514 let write_task = tokio::spawn(async move {
515 loop {
516 let data = match udp_receiver.recv().await {
517 Some(v) => v,
518 None => {
519 debug!("UDP receiver channel closed for flow {}", flow_id);
520 break
521 },
522 };
523 let (context_id, payload) = decode_var_int(&data);
524 assert_eq!(context_id, 0, "received UDP Proxying Datagram with non-zero Context ID");
525
526 trace!("start sending on UDP");
527 let bytes_written = match socket.send(payload).await {
528 Ok(v) => v,
529 Err(e) => {
530 error!("Error writing to UDP {} on flow id {}: {}", peer_addr, flow_id, e);
531 return
532 },
533 };
534 if bytes_written < payload.len() {
535 debug!("Partially sent {} bytes of UDP packet of length {}", bytes_written, payload.len());
536 }
537 debug!("written {} bytes from UDP to {} for flow {}", payload.len(), peer_addr, flow_id);
538 }
539 });
540 let headers = vec![
541 quiche::h3::Header::new(b":status", b"200"),
542 ];
543 http3_sender_clone_2.send(ToSend { stream_id, content: Content::Headers { headers }, finished: false }).expect("channel send failed");
544 tokio::join!(read_task, write_task);
545 });
546 }
547 } else if let Ok(target_url) = if authority.contains("://") { url::Url::parse(authority) } else {url::Url::parse(format!("scheme://{}", authority).as_str())} {
548 debug!("connecting to url {} from authority {}", target_url, authority);
549 if let Ok(mut socket_addrs) = target_url.to_socket_addrs() {
550 let peer_addr = socket_addrs.next().unwrap();
551 let http3_sender_clone_1 = http3_sender.clone();
552 let http3_sender_clone_2 = http3_sender.clone();
553 let (tcp_sender, mut tcp_receiver) = mpsc::unbounded_channel::<Vec<u8>>();
554 connect_streams.insert(stream_id, tcp_sender);
555 tokio::spawn(async move {
556 let stream = match TcpStream::connect(peer_addr).await {
557 Ok(v) => v,
558 Err(e) => {
559 error!("Error connecting TCP to {}: {}", peer_addr, e);
560 return
561 }
562 };
563 debug!("connecting to url {} {}", target_url, target_url.to_socket_addrs().unwrap().next().unwrap());
564 let (mut read_half, mut write_half) = stream.into_split();
565 let read_task = tokio::spawn(async move {
566 let mut buf = [0; 65535];
567 loop {
568 let read = match read_half.read(&mut buf).await {
569 Ok(v) => v,
570 Err(e) => {
571 error!("Error reading from TCP {}: {}", peer_addr, e);
572 break
573 },
574 };
575 if read == 0 {
576 debug!("TCP connection closed from {}", peer_addr);
577 break
578 }
579 debug!("read {} bytes from TCP from {} for stream {}", read, peer_addr, stream_id);
580 http3_sender_clone_1.send(ToSend { stream_id: stream_id, content: Content::Data { data: buf[..read].to_vec() }, finished: false });
581 }
582 });
583 let write_task = tokio::spawn(async move {
584 loop {
585 let data = match tcp_receiver.recv().await {
586 Some(v) => v,
587 None => {
588 debug!("TCP receiver channel closed for stream {}", stream_id);
589 break
590 },
591 };
592 trace!("start sending on TCP");
593 let mut pos = 0;
594 while pos < data.len() {
595 let bytes_written = match write_half.write(&data[pos..]).await {
596 Ok(v) => v,
597 Err(e) => {
598 error!("Error writing to TCP {} on stream id {}: {}", peer_addr, stream_id, e);
599 return
600 },
601 };
602 pos += bytes_written;
603 }
604 debug!("written {} bytes from TCP to {} for stream {}", data.len(), peer_addr, stream_id);
605 }
606 });
607 let headers = vec![
608 quiche::h3::Header::new(b":status", b"200"),
609 quiche::h3::Header::new(b"content-length", b"0"), ];
611 http3_sender_clone_2.send(ToSend { stream_id, content: Content::Headers { headers }, finished: false }).expect("channel send failed");
612 tokio::join!(read_task, write_task);
613 });
614 } else {
615 }
617 } else {
618 }
620 } else {
621 }
623 },
624
625 _ => {},
626 };
627 },
628
629 Ok((stream_id, quiche::h3::Event::Data)) => {
630 info!(
631 "{} got data on stream id {}",
632 client.conn.trace_id(),
633 stream_id
634 );
635 if connect_streams.contains_key(&stream_id) {
636 while let Ok(read) = http3_conn.recv_body(&mut client.conn, stream_id, &mut buf) {
637 debug!(
638 "got {} bytes of data on stream {}",
639 read, stream_id
640 );
641 trace!("{}", unsafe {
642 std::str::from_utf8_unchecked(&buf[..read])
643 });
644 let data = &buf[..read];
645 connect_streams.get(&stream_id).unwrap().send(data.to_vec()).expect("channel send failed");
646 }
647 }
648 },
649
650 Ok((_stream_id, quiche::h3::Event::Finished)) => (), Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (), Ok((flow_id, quiche::h3::Event::Datagram)) => {
655 info!(
656 "{} got datagram on flow id {}",
657 client.conn.trace_id(),
658 flow_id
659 );
660 if connect_sockets.contains_key(&flow_id) {
661 match http3_conn.recv_dgram(&mut client.conn, &mut buf) {
662 Ok((read, recvd_flow_id, flow_id_len)) => {
663 debug!("got {} bytes of datagram on flow {}", read - flow_id_len, flow_id);
664 assert_eq!(flow_id, recvd_flow_id, "flow id by recv_dgram does not match");
665 trace!("{}", unsafe {
666 std::str::from_utf8_unchecked(&buf[flow_id_len..read])
667 });
668 let data = &buf[flow_id_len..read];
669 connect_sockets.get(&flow_id).unwrap().send(data.to_vec()).expect("channel send failed");
670 },
671 Err(e) => {
672 error!("error recv_dgram(): {}", e);
673 break;
674 }
675 }
676 }
677 ()
678 },
679
680 Ok((
681 _prioritized_element_id,
682 quiche::h3::Event::PriorityUpdate,
683 )) => (),
684
685 Ok((_goaway_id, quiche::h3::Event::GoAway)) => (),
686
687 Err(quiche::h3::Error::Done) => {
688 break;
689 },
690
691 Err(e) => {
692 error!(
693 "{} HTTP/3 error {:?}",
694 client.conn.trace_id(),
695 e
696 );
697
698 break;
699 },
700 }
701 }
702 }
703 },
704
705 _ = interval.tick(), if http3_conn.is_some() && http3_retry_send.is_some() => {
707 let mut to_send = http3_retry_send.unwrap();
708 let http3_conn = http3_conn.as_mut().unwrap();
709 let result = match &to_send.content {
710 Content::Headers { headers } => {
711 debug!("retry sending http3 response {:?}", hdrs_to_strings(&headers));
712 http3_conn.send_response(&mut client.conn, to_send.stream_id, headers, to_send.finished)
713 },
714 Content::Data { data } => {
715 debug!("retry sending http3 data of {} bytes", data.len());
716 let mut written = 0;
717 loop {
718 if written >= data.len() {
719 break Ok(())
720 }
721 match http3_conn.send_body(&mut client.conn, to_send.stream_id, &data[written..], to_send.finished) {
722 Ok(v) => written += v,
723 Err(e) => {
724 to_send = ToSend { stream_id: to_send.stream_id, content: Content::Data { data: data[written..].to_vec() }, finished: to_send.finished };
725 break Err(e)
726 },
727 }
728 debug!("written http3 data {} of {} bytes", written, data.len());
729 }
730 },
731 Content::Datagram { payload } => {
732 debug!("retry sending http3 datagram of {} bytes", payload.len());
733 http3_conn.send_dgram(&mut client.conn, to_send.stream_id, &payload)
734 },
735 Content::Finished => todo!(),
736 };
737 match result {
738 Ok(_) => {
739 http3_retry_send = None;
740 },
741 Err(quiche::h3::Error::StreamBlocked | quiche::h3::Error::Done) => {
742 debug!("Connection {} stream {} stream blocked, retry later", client.conn.trace_id(), to_send.stream_id);
743 http3_retry_send = Some(to_send);
744 },
745 Err(e) => {
746 error!("Connection {} stream {} send failed {:?}", client.conn.trace_id(), to_send.stream_id, e);
747 client.conn.stream_shutdown(to_send.stream_id, quiche::Shutdown::Write, 0);
748 connect_streams.remove(&to_send.stream_id);
749 http3_retry_send = None;
750 }
751 };
752 },
753
754 () = &mut sleep => {
755 trace!("timeout elapsed");
756 sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout));
757
758 if client.conn.is_closed() {
759 info!(
760 "{} connection collected {:?}",
761 client.conn.trace_id(),
762 client.conn.stats()
763 );
764 }
765 },
766 else => break,
767 }
768 loop {
770 let (write, send_info) = match client.conn.send(&mut out) {
771 Ok(v) => v,
772
773 Err(quiche::Error::Done) => {
774 debug!("QUIC connection {} done writing", client.conn.trace_id());
775 break;
776 },
777
778 Err(e) => {
779 error!("QUIC connection {} send failed: {:?}", client.conn.trace_id(), e);
780
781 client.conn.close(false, 0x1, b"fail").ok();
782 break;
783 },
784 };
785
786 match client.socket.send_to(&out[..write], send_info.to).await {
787 Ok(written) => debug!("{} written {} bytes out of {}", client.conn.trace_id(), written, write),
788 Err(e) => panic!("UDP socket send_to() failed: {:?}", e),
789 }
790 }
791 }
792
793}
794
795fn mint_token(hdr: &quiche::Header, src: &net::SocketAddr) -> Vec<u8> {
804 let mut token = Vec::new();
805
806 token.extend_from_slice(b"quiche");
807
808 let addr = match src.ip() {
810 std::net::IpAddr::V4(a) => a.octets().to_vec(),
811 std::net::IpAddr::V6(a) => a.octets().to_vec(),
812 };
813
814 token.extend_from_slice(&addr);
815 token.extend_from_slice(&hdr.dcid);
816
817 token
818}
819
820fn validate_token<'a>(
828 src: &net::SocketAddr, token: &'a [u8],
829) -> Option<quiche::ConnectionId<'a>> {
830 if token.len() < 6 {
831 return None;
832 }
833
834 if &token[..6] != b"quiche" {
835 return None;
836 }
837
838 let token = &token[6..];
839
840 let addr = match src.ip() {
841 std::net::IpAddr::V4(a) => a.octets().to_vec(),
842 std::net::IpAddr::V6(a) => a.octets().to_vec(),
843 };
844
845 if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() {
846 return None;
847 }
848
849 Some(quiche::ConnectionId::from_ref(&token[addr.len()..]))
850}
851
852fn path_to_socketaddr(path: &[u8]) -> Option<net::SocketAddr> {
856 let mut split_iter = std::io::BufRead::split(path, b'/');
858 let mut second_last = None;
859 let mut last = None;
860 while let Some(curr) = split_iter.next() {
861 if let Ok(curr) = curr {
862 second_last = last;
863 last = Some(curr);
864 } else {
865 return None
866 }
867 }
868 if second_last.is_some() && last.is_some() {
869 let second_last = second_last.unwrap();
870 let last = last.unwrap();
871 let second_last = std::str::from_utf8(&second_last);
872 let last = std::str::from_utf8(&last);
873 if second_last.is_ok() && last.is_ok() {
874 let url_str = format!("scheme://{}:{}/", second_last.unwrap(), last.unwrap());
875 let url = url::Url::parse(&url_str);
876 if let Ok(url) = url {
877 let socket_addrs = url.to_socket_addrs();
878 if let Ok(mut socket_addrs) = socket_addrs {
879 return socket_addrs.next()
880 }
881 }
882 }
883 }
884
885 None
886}