masquerade_proxy/
server.rs

1use log::*;
2use quiche::h3::NameValue;
3
4use std::net::{self, SocketAddr};
5use std::net::ToSocketAddrs;
6use std::collections::HashMap;
7use std::error::Error;
8use std::sync::Arc;
9
10use tokio::io::{AsyncWriteExt, AsyncReadExt};
11use tokio::net::{UdpSocket, TcpStream};
12use tokio::sync::mpsc::{self, UnboundedSender};
13use tokio::time::{self, Duration};
14
15use ring::rand::*;
16
17use crate::common::*;
18
19#[derive(PartialEq, Debug)]
20enum Content {
21    Headers {
22        headers: Vec<quiche::h3::Header>,
23    },
24    Data {
25        data: Vec<u8>,
26    },
27    Datagram {
28        payload: Vec<u8>,
29    },
30    Finished,
31}
32
33#[derive(Debug)]
34struct ToSend {
35    stream_id: u64, // or flow_id for DATAGRAM
36    content: Content,
37    finished: bool,
38}
39
40struct QuicReceived {
41    recv_info: quiche::RecvInfo,
42    data: Vec<u8>,
43}
44
/**
 * Error returned when the server is asked to run before it has been bound
 * to a listening address.
 */
#[derive(Debug, Clone)]
struct RunBeforeBindError;

impl std::fmt::Display for RunBeforeBindError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("bind(listen_addr) has to be called before run()")
    }
}

impl Error for RunBeforeBindError {}
54
55/**
56 * Client for each QUIC connection
57 */
58struct Client {
59    conn: quiche::Connection,
60    quic_receiver: mpsc::UnboundedReceiver<QuicReceived>,
61    socket: Arc<UdpSocket>,
62}
63
64type ClientMap = HashMap<quiche::ConnectionId<'static>, mpsc::UnboundedSender<QuicReceived>>;
65
66pub struct Server {
67    socket: Option<Arc<UdpSocket>>,
68}
69
70impl Server {
71    pub fn new() -> Server {
72        Server { socket: None }
73    }
74
75    /**
76     * Get the socket address the server is bound to. Returns None if server is not bound to a socket yet
77     */
78    pub fn listen_addr(&self) -> Option<SocketAddr> {
79        return self.socket.clone().map(|socket| socket.local_addr().unwrap())
80    }
81
82    /**
83     * Bind the server to listen to an address
84     */
85    pub async fn bind<T: tokio::net::ToSocketAddrs>(&mut self, listen_addr: T) -> Result<(), Box<dyn Error>> {
86        debug!("creating UDP socket");
87    
88        // Create the UDP listening socket, and register it with the event loop.
89        let socket = UdpSocket::bind(listen_addr).await?;
90        debug!("listening on {}", socket.local_addr().unwrap());
91        
92        self.socket = Some(Arc::new(socket));
93        Ok(())
94    }
95
96    pub async fn run(&self) -> Result<(), Box<dyn Error>> {
97        if self.socket.is_none() {
98            return Err(Box::new(RunBeforeBindError))
99        }
100        let socket = self.socket.clone().unwrap();
101
102        let mut buf = [0; 65535];
103        let mut out = [0; MAX_DATAGRAM_SIZE];
104    
105        // Create the configuration for the QUIC connections.
106        let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
107    
108        config
109            .load_cert_chain_from_pem_file("example_cert/cert.crt")
110            .unwrap();
111        config
112            .load_priv_key_from_pem_file("example_cert/cert.key")
113            .unwrap();
114    
115        config
116            .set_application_protos(quiche::h3::APPLICATION_PROTOCOL)
117            .unwrap();
118    
119        // TODO: allow custom configuration of the following parameters and also consider the defaults more carefully
120        config.set_max_idle_timeout(1000);
121        config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
122        config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
123        config.set_initial_max_data(10_000_000);
124        config.set_initial_max_stream_data_bidi_local(1_000_000);
125        config.set_initial_max_stream_data_bidi_remote(1_000_000);
126        config.set_initial_max_stream_data_uni(1_000_000);
127        config.set_initial_max_streams_bidi(100);
128        config.set_initial_max_streams_uni(100);
129        config.set_disable_active_migration(true);
130        config.enable_dgram(true, 1000, 1000); 
131        config.enable_early_data();
132    
133        let rng = SystemRandom::new();
134        let conn_id_seed =
135            ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
136    
137        let mut clients = ClientMap::new();
138        // let mut tcp_connections = TokenMap::new();
139    
140        let local_addr = socket.local_addr().unwrap();
141    
142        'read: loop {
143            let (len, from) = match socket.recv_from(&mut buf).await {
144                Ok(v) => v,
145    
146                Err(e) => {
147                    panic!("recv_from() failed: {:?}", e);
148                },
149            };
150    
151            debug!("got {} bytes", len);
152    
153            let pkt_buf = &mut buf[..len];
154    
155            // Parse the QUIC packet's header.
156            let hdr = match quiche::Header::from_slice(
157                pkt_buf,
158                quiche::MAX_CONN_ID_LEN,
159            ) {
160                Ok(v) => v,
161    
162                Err(e) => {
163                    error!("Parsing packet header failed: {:?}", e);
164                    continue 'read;
165                },
166            };
167    
168            debug!("got packet {:?}", hdr);
169    
170            let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
171            let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
172            let conn_id = conn_id.to_vec().into();
173    
174            // Lookup a connection based on the packet's connection ID. If there
175            // is no connection matching, create a new one.
176            let tx = if !clients.contains_key(&hdr.dcid) &&
177                !clients.contains_key(&conn_id)
178            {
179                // TODO: move initialization to client task
180                if hdr.ty != quiche::Type::Initial {
181                    error!("Packet is not Initial");
182                    continue 'read;
183                }
184    
185                if !quiche::version_is_supported(hdr.version) {
186                    warn!("Doing version negotiation");
187    
188                    let len =
189                        quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out)
190                            .unwrap();
191    
192                    let out = &out[..len];
193    
194                    if let Err(e) = socket.send_to(out, from).await {
195                        if e.kind() == std::io::ErrorKind::WouldBlock {
196                            debug!("send_to() would block");
197                            break;
198                        }
199    
200                        panic!("send_to() failed: {:?}", e);
201                    }
202                    continue 'read;
203                }
204    
205                let mut scid = [0; quiche::MAX_CONN_ID_LEN];
206                scid.copy_from_slice(&conn_id);
207    
208                let scid = quiche::ConnectionId::from_ref(&scid);
209    
210                // Token is always present in Initial packets.
211                let token = hdr.token.as_ref().unwrap();
212    
213                // Do stateless retry if the client didn't send a token.
214                if token.is_empty() {
215                    warn!("Doing stateless retry");
216    
217                    let new_token = mint_token(&hdr, &from);
218    
219                    let len = quiche::retry(
220                        &hdr.scid,
221                        &hdr.dcid,
222                        &scid,
223                        &new_token,
224                        hdr.version,
225                        &mut out,
226                    )
227                    .unwrap();
228    
229                    let out = &out[..len];
230    
231                    if let Err(e) = socket.send_to(out, from).await {
232                        if e.kind() == std::io::ErrorKind::WouldBlock {
233                            debug!("send_to() would block");
234                            break;
235                        }
236    
237                        panic!("send_to() failed: {:?}", e);
238                    }
239                    continue 'read;
240                }
241    
242                let odcid = validate_token(&from, token);
243    
244                // The token was not valid, meaning the retry failed, so
245                // drop the packet.
246                if odcid.is_none() {
247                    error!("Invalid address validation token");
248                    continue 'read;
249                }
250    
251                if scid.len() != hdr.dcid.len() {
252                    error!("Invalid destination connection ID");
253                    continue 'read;
254                }
255    
256                // Reuse the source connection ID we sent in the Retry packet,
257                // instead of changing it again.
258                let scid = hdr.dcid.clone();
259    
260                debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
261    
262                let conn = quiche::accept(
263                    &scid,
264                    odcid.as_ref(),
265                    local_addr,
266                    from,
267                    &mut config,
268                )
269                .unwrap();
270    
271                let (tx, rx) = mpsc::unbounded_channel();
272    
273                let client = Client {
274                    conn,
275                    quic_receiver: rx,
276                    socket: socket.clone(),
277                };
278    
279                clients.insert(scid.clone(), tx);
280    
281                tokio::spawn(async move {
282                    handle_client(client).await
283                });
284    
285                clients.get(&scid).unwrap()
286            } else {
287                match clients.get(&hdr.dcid) {
288                    Some(v) => v,
289    
290                    None => clients.get(&conn_id).unwrap(),
291                }
292            };
293    
294            let recv_info = quiche::RecvInfo {
295                to: socket.local_addr().unwrap(),
296                from,
297            };
298            
299            match tx.send(QuicReceived { recv_info, data: pkt_buf.to_vec() }) {
300                Ok(_) => {},
301                _ => {
302                    debug!("Error sending to {:?}", &hdr.dcid);
303                    clients.remove(&hdr.dcid);
304                }
305            }
306    
307        }
308    
309        Ok(())
310    }
311}
312
313
314/**
315 * Client handler that handles the connection for a single client
316 */
317async fn handle_client(mut client: Client) {
318    let mut http3_conn: Option<quiche::h3::Connection> = None;
319    let mut connect_streams: HashMap<u64, UnboundedSender<Vec<u8>>> = HashMap::new(); // for TCP CONNECT
320    let mut connect_sockets: HashMap<u64, UnboundedSender<Vec<u8>>> = HashMap::new(); // for CONNECT UDP
321    let (http3_sender, mut http3_receiver) = mpsc::unbounded_channel::<ToSend>();
322
323    let mut buf = [0; 65535];
324    let mut out = [0; MAX_DATAGRAM_SIZE];
325
326    let timeout = 5000; // milliseconds
327    let sleep = tokio::time::sleep(Duration::from_millis(timeout));
328    tokio::pin!(sleep);
329
330    let mut http3_retry_send: Option<ToSend> = None;
331    let mut interval = time::interval(Duration::from_millis(20));
332    loop {
333        tokio::select! {
334            // Send pending HTTP3 data in channel to HTTP3 connection on QUIC
335            http3_to_send = http3_receiver.recv(), if http3_conn.is_some() && http3_retry_send.is_none() => {
336                if http3_to_send.is_none() {
337                    unreachable!()
338                }
339                let mut to_send = http3_to_send.unwrap();
340                let http3_conn = http3_conn.as_mut().unwrap();
341                loop {
342                    let result = match &to_send.content {
343                        Content::Headers { headers } => {
344                            debug!("sending http3 response {:?}", hdrs_to_strings(&headers));
345                            http3_conn.send_response(&mut client.conn, to_send.stream_id, headers, to_send.finished)
346                        },
347                        Content::Data { data } => {
348                            debug!("sending http3 data of {} bytes", data.len());
349                            let mut written = 0;
350                            loop {
351                                if written >= data.len() {
352                                    break Ok(())
353                                }
354                                match http3_conn.send_body(&mut client.conn, to_send.stream_id, &data[written..], to_send.finished) {
355                                    Ok(v) => written += v,
356                                    Err(e) => {
357                                        to_send = ToSend { stream_id: to_send.stream_id, content: Content::Data { data: data[written..].to_vec() }, finished: to_send.finished };
358                                        break Err(e)
359                                    },
360                                }
361                                debug!("written http3 data {} of {} bytes", written, data.len());
362                            }
363                        },
364                        Content::Datagram { payload } => {
365                            debug!("sending http3 datagram of {} bytes", payload.len());
366                            http3_conn.send_dgram(&mut client.conn, to_send.stream_id, &payload)
367                        },
368                        Content::Finished => todo!(),
369                    };
370                    match result {
371                        Ok(_) => {},
372                        Err(quiche::h3::Error::StreamBlocked | quiche::h3::Error::Done) => {
373                            debug!("Connection {} stream {} stream blocked, retry later", client.conn.trace_id(), to_send.stream_id);
374                            http3_retry_send = Some(to_send);
375                            break; 
376                        },
377                        Err(e) => {
378                            error!("Connection {} stream {} send failed {:?}", client.conn.trace_id(), to_send.stream_id, e);
379                            client.conn.stream_shutdown(to_send.stream_id, quiche::Shutdown::Write, 0);
380                            connect_streams.remove(&to_send.stream_id);
381                        }
382                    };
383                    to_send = match http3_receiver.try_recv() {
384                        Ok(v) => v,
385                        Err(e) => break,
386                    };
387                }
388            },
389
390            // handle QUIC received data
391            recvd = client.quic_receiver.recv() => {
392                match recvd {
393                    Some(mut quic_received) => {
394                        let read = match client.conn.recv(&mut quic_received.data, quic_received.recv_info) {
395                            Ok(v) => v,
396                            Err(e) => {
397                                error!("Error when quic recv(): {}", e);
398                                break
399                            }
400                        };
401                        debug!("{} processed {} bytes", client.conn.trace_id(), read);
402                        
403                    },
404                    None => {
405                        break // channel closed on the other side. Should not happen?
406                    },
407                }
408                // Create a new HTTP/3 connection as soon as the QUIC connection
409                // is established.
410                if (client.conn.is_in_early_data() || client.conn.is_established()) &&
411                    http3_conn.is_none()
412                {
413                    debug!(
414                        "{} QUIC handshake completed, now trying HTTP/3",
415                        client.conn.trace_id()
416                    );
417
418                    let h3_config = quiche::h3::Config::new().unwrap();
419                    let h3_conn = match quiche::h3::Connection::with_transport(
420                        &mut client.conn,
421                        &h3_config,
422                    ) {
423                        Ok(v) => v,
424
425                        Err(e) => {
426                            error!("failed to create HTTP/3 connection: {}", e);
427                            continue;
428                        },
429                    };
430
431                    // TODO: sanity check h3 connection before adding to map
432                    http3_conn = Some(h3_conn);
433                }
434
435                if http3_conn.is_some() {
436                    // Process HTTP/3 events.
437                    let http3_conn = http3_conn.as_mut().unwrap();
438                    loop {
439                        match http3_conn.poll(&mut client.conn) {
440                            Ok((
441                                stream_id,
442                                quiche::h3::Event::Headers { list: headers, .. },
443                            )) => {
444                                info!(
445                                    "{} got request {:?} on stream id {}",
446                                    client.conn.trace_id(),
447                                    hdrs_to_strings(&headers),
448                                    stream_id
449                                );
450                            
451                                let mut method = None;
452                                let mut authority = None;
453                                let mut protocol = None;
454                                let mut scheme = None;
455                                let mut path = None;
456                            
457                                // Look for the request's path and method.
458                                for hdr in headers.iter() {
459                                    match hdr.name() {
460                                        b":method" => method = Some(hdr.value()),
461                                        b":authority" => authority = Some(std::str::from_utf8(hdr.value()).unwrap()),
462                                        b":protocol" => protocol = Some(hdr.value()),
463                                        b":scheme" => scheme = Some(hdr.value()),
464                                        b":path" => path = Some(hdr.value()),
465                                        _ => (),
466                                    }
467                                }
468                            
469                                match method {
470                                    Some(b"CONNECT") => {
471                                        if let Some(authority) = authority {
472                                            if protocol == Some(b"connect-udp") && scheme.is_some() && path.is_some() {
473                                                let path = path.unwrap();
474                                                if let Some(peer_addr) = path_to_socketaddr(path) {
475                                                    debug!("connecting udp to {} at {} from authority {}", std::str::from_utf8(&path).unwrap(), peer_addr, authority);
476                                                    let http3_sender_clone_1 = http3_sender.clone();
477                                                    let http3_sender_clone_2 = http3_sender.clone();
478                                                    let (udp_sender, mut udp_receiver) = mpsc::unbounded_channel::<Vec<u8>>();
479                                                    let flow_id = stream_id / 4;
480                                                    connect_sockets.insert(flow_id, udp_sender);
481                                                    tokio::spawn(async move {
482                                                        let socket = match UdpSocket::bind("0.0.0.0:0").await {
483                                                            Ok(v) => v,
484                                                            Err(e) => {
485                                                                error!("Error binding UDP socket");
486                                                                return
487                                                            }
488                                                        };
489                                                        if socket.connect(peer_addr).await.is_err() {
490                                                            error!("Error connecting to UDP {}", peer_addr);
491                                                            return
492                                                        };
493                                                        let socket = Arc::new(socket);
494                                                        let socket_clone = socket.clone();
495                                                        let read_task = tokio::spawn(async move {
496                                                            let mut buf = [0; 65527]; // max length of UDP Proxying Payload, ref: https://www.rfc-editor.org/rfc/rfc9298.html#name-http-datagram-payload-forma
497                                                            loop {
498                                                                let read = match socket_clone.recv(&mut buf).await {
499                                                                    Ok(v) => v,
500                                                                    Err(e) => {
501                                                                        error!("Error reading from UDP {} on stream id {}: {}", peer_addr, stream_id, e);
502                                                                        break
503                                                                    },
504                                                                };
505                                                                if read == 0 {
506                                                                    debug!("UDP connection closed from {}", peer_addr); // do we need this check?
507                                                                    break
508                                                                }
509                                                                debug!("read {} bytes from UDP from {} for flow {}", read, peer_addr, flow_id);
510                                                                let data = wrap_udp_connect_payload(0, &buf[..read]);
511                                                                http3_sender_clone_1.send(ToSend { stream_id: flow_id, content: Content::Datagram { payload: data }, finished: false });
512                                                            }
513                                                        });
514                                                        let write_task = tokio::spawn(async move {
515                                                            loop {
516                                                                let data = match udp_receiver.recv().await {
517                                                                    Some(v) => v,
518                                                                    None => {
519                                                                        debug!("UDP receiver channel closed for flow {}", flow_id);
520                                                                        break
521                                                                    },
522                                                                };
523                                                                let (context_id, payload) = decode_var_int(&data);
524                                                                assert_eq!(context_id, 0, "received UDP Proxying Datagram with non-zero Context ID");
525
526                                                                trace!("start sending on UDP");
527                                                                let bytes_written = match socket.send(payload).await {
528                                                                    Ok(v) => v,
529                                                                    Err(e) => {
530                                                                        error!("Error writing to UDP {} on flow id {}: {}", peer_addr, flow_id, e);
531                                                                        return
532                                                                    },
533                                                                };
534                                                                if bytes_written < payload.len() {
535                                                                    debug!("Partially sent {} bytes of UDP packet of length {}", bytes_written, payload.len());
536                                                                }
537                                                                debug!("written {} bytes from UDP to {} for flow {}", payload.len(), peer_addr, flow_id);
538                                                            }
539                                                        });
540                                                        let headers = vec![
541                                                            quiche::h3::Header::new(b":status", b"200"),
542                                                        ];
543                                                        http3_sender_clone_2.send(ToSend { stream_id, content: Content::Headers { headers }, finished: false }).expect("channel send failed");
544                                                        tokio::join!(read_task, write_task);
545                                                    });
546                                                }
547                                            } else if let Ok(target_url) = if authority.contains("://") { url::Url::parse(authority) } else {url::Url::parse(format!("scheme://{}", authority).as_str())} {
548                                                debug!("connecting to url {} from authority {}", target_url, authority);
549                                                if let Ok(mut socket_addrs) = target_url.to_socket_addrs() {
550                                                    let peer_addr = socket_addrs.next().unwrap();
551                                                    let http3_sender_clone_1 = http3_sender.clone();
552                                                    let http3_sender_clone_2 = http3_sender.clone();
553                                                    let (tcp_sender, mut tcp_receiver) = mpsc::unbounded_channel::<Vec<u8>>();
554                                                    connect_streams.insert(stream_id, tcp_sender);
555                                                    tokio::spawn(async move {
556                                                        let stream = match TcpStream::connect(peer_addr).await {
557                                                            Ok(v) => v,
558                                                            Err(e) => {
559                                                                error!("Error connecting TCP to {}: {}", peer_addr, e);
560                                                                return
561                                                            }
562                                                        };
563                                                        debug!("connecting to url {} {}", target_url, target_url.to_socket_addrs().unwrap().next().unwrap());
564                                                        let (mut read_half, mut write_half) = stream.into_split();
565                                                        let read_task = tokio::spawn(async move {
566                                                            let mut buf = [0; 65535];
567                                                            loop {
568                                                                let read = match read_half.read(&mut buf).await {
569                                                                    Ok(v) => v,
570                                                                    Err(e) => {
571                                                                        error!("Error reading from TCP {}: {}", peer_addr, e);
572                                                                        break
573                                                                    },
574                                                                };
575                                                                if read == 0 {
576                                                                    debug!("TCP connection closed from {}", peer_addr);
577                                                                    break
578                                                                }
579                                                                debug!("read {} bytes from TCP from {} for stream {}", read, peer_addr, stream_id);
580                                                                http3_sender_clone_1.send(ToSend { stream_id: stream_id, content: Content::Data { data: buf[..read].to_vec() }, finished: false });
581                                                            }
582                                                        });
583                                                        let write_task = tokio::spawn(async move {
584                                                            loop {
585                                                                let data = match tcp_receiver.recv().await {
586                                                                    Some(v) => v,
587                                                                    None => {
588                                                                        debug!("TCP receiver channel closed for stream {}", stream_id);
589                                                                        break
590                                                                    },
591                                                                };
592                                                                trace!("start sending on TCP");
593                                                                let mut pos = 0;
594                                                                while pos < data.len() {
595                                                                    let bytes_written = match write_half.write(&data[pos..]).await {
596                                                                        Ok(v) => v,
597                                                                        Err(e) => {
598                                                                            error!("Error writing to TCP {} on stream id {}: {}", peer_addr, stream_id, e);
599                                                                            return
600                                                                        },
601                                                                    };
602                                                                    pos += bytes_written;
603                                                                }
604                                                                debug!("written {} bytes from TCP to {} for stream {}", data.len(), peer_addr, stream_id);
605                                                            }
606                                                        });
607                                                        let headers = vec![
608                                                            quiche::h3::Header::new(b":status", b"200"),
609                                                            quiche::h3::Header::new(b"content-length", b"0"), // NOTE: is this needed?
610                                                        ];
611                                                        http3_sender_clone_2.send(ToSend { stream_id, content: Content::Headers { headers }, finished: false }).expect("channel send failed");
612                                                        tokio::join!(read_task, write_task);
613                                                    });
614                                                } else {
615                                                    // TODO: send error
616                                                }
617                                            } else {
618                                                // TODO: send error
619                                            }
620                                        } else {
621                                            // TODO: send error
622                                        }
623                                    },
624                            
625                                    _ => {},
626                                };
627                            },
628
629                            Ok((stream_id, quiche::h3::Event::Data)) => {
630                                info!(
631                                    "{} got data on stream id {}",
632                                    client.conn.trace_id(),
633                                    stream_id
634                                );
635                                if connect_streams.contains_key(&stream_id) {
636                                    while let Ok(read) = http3_conn.recv_body(&mut client.conn, stream_id, &mut buf) {
637                                        debug!(
638                                            "got {} bytes of data on stream {}",
639                                            read, stream_id
640                                        );
641                                        trace!("{}", unsafe {
642                                            std::str::from_utf8_unchecked(&buf[..read])
643                                        });
644                                        let data = &buf[..read];
645                                        connect_streams.get(&stream_id).unwrap().send(data.to_vec()).expect("channel send failed");
646                                    }
647                                }
648                            },
649
650                            Ok((_stream_id, quiche::h3::Event::Finished)) => (), // TODO: Add to the queue
651
652                            Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (), // TODO: Add to the queue
653
654                            Ok((flow_id, quiche::h3::Event::Datagram)) => {
655                                info!(
656                                    "{} got datagram on flow id {}",
657                                    client.conn.trace_id(),
658                                    flow_id
659                                );
660                                if connect_sockets.contains_key(&flow_id) {
661                                    match http3_conn.recv_dgram(&mut client.conn, &mut buf) {
662                                        Ok((read, recvd_flow_id, flow_id_len)) => {
663                                            debug!("got {} bytes of datagram on flow {}", read - flow_id_len, flow_id);
664                                            assert_eq!(flow_id, recvd_flow_id, "flow id by recv_dgram does not match");
665                                            trace!("{}", unsafe {
666                                                std::str::from_utf8_unchecked(&buf[flow_id_len..read])
667                                            });
668                                            let data = &buf[flow_id_len..read];
669                                            connect_sockets.get(&flow_id).unwrap().send(data.to_vec()).expect("channel send failed");
670                                        },
671                                        Err(e) => {
672                                            error!("error recv_dgram(): {}", e);
673                                            break;
674                                        }
675                                    }
676                                }
677                                ()
678                            },
679
680                            Ok((
681                                _prioritized_element_id,
682                                quiche::h3::Event::PriorityUpdate,
683                            )) => (),
684
685                            Ok((_goaway_id, quiche::h3::Event::GoAway)) => (),
686
687                            Err(quiche::h3::Error::Done) => {
688                                break;
689                            },
690
691                            Err(e) => {
692                                error!(
693                                    "{} HTTP/3 error {:?}",
694                                    client.conn.trace_id(),
695                                    e
696                                );
697                                
698                                break;
699                            },
700                        }
701                    }
702                }
703            },
704
705            // Retry sending in case of stream blocking
706            _ = interval.tick(), if http3_conn.is_some() && http3_retry_send.is_some() => {
707                let mut to_send = http3_retry_send.unwrap();
708                let http3_conn = http3_conn.as_mut().unwrap();
709                let result = match &to_send.content {
710                    Content::Headers { headers } => {
711                        debug!("retry sending http3 response {:?}", hdrs_to_strings(&headers));
712                        http3_conn.send_response(&mut client.conn, to_send.stream_id, headers, to_send.finished)
713                    },
714                    Content::Data { data } => {
715                        debug!("retry sending http3 data of {} bytes", data.len());
716                        let mut written = 0;
717                        loop {
718                            if written >= data.len() {
719                                break Ok(())
720                            }
721                            match http3_conn.send_body(&mut client.conn, to_send.stream_id, &data[written..], to_send.finished) {
722                                Ok(v) => written += v,
723                                Err(e) => {
724                                    to_send = ToSend { stream_id: to_send.stream_id, content: Content::Data { data: data[written..].to_vec() }, finished: to_send.finished };
725                                    break Err(e)
726                                },
727                            }
728                            debug!("written http3 data {} of {} bytes", written, data.len());
729                        }
730                    },
731                    Content::Datagram { payload } => {
732                        debug!("retry sending http3 datagram of {} bytes", payload.len());
733                        http3_conn.send_dgram(&mut client.conn, to_send.stream_id, &payload)
734                    },
735                    Content::Finished => todo!(),
736                };
737                match result {
738                    Ok(_) => {
739                        http3_retry_send = None;
740                    },
741                    Err(quiche::h3::Error::StreamBlocked | quiche::h3::Error::Done) => {
742                        debug!("Connection {} stream {} stream blocked, retry later", client.conn.trace_id(), to_send.stream_id);
743                        http3_retry_send = Some(to_send);
744                    },
745                    Err(e) => {
746                        error!("Connection {} stream {} send failed {:?}", client.conn.trace_id(), to_send.stream_id, e);
747                        client.conn.stream_shutdown(to_send.stream_id, quiche::Shutdown::Write, 0);
748                        connect_streams.remove(&to_send.stream_id);
749                        http3_retry_send = None;
750                    }
751                };
752            },
753
754            () = &mut sleep => {
755                trace!("timeout elapsed");
756                sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout));
757
758                if client.conn.is_closed() {
759                    info!(
760                        "{} connection collected {:?}",
761                        client.conn.trace_id(),
762                        client.conn.stats()
763                    );
764                }
765            },
766            else => break,
767        }
768        // Send pending QUIC packets
769        loop {
770            let (write, send_info) = match client.conn.send(&mut out) {
771                Ok(v) => v,
772
773                Err(quiche::Error::Done) => {
774                    debug!("QUIC connection {} done writing", client.conn.trace_id());
775                    break;
776                },
777
778                Err(e) => {
779                    error!("QUIC connection {} send failed: {:?}", client.conn.trace_id(), e);
780
781                    client.conn.close(false, 0x1, b"fail").ok();
782                    break;
783                },
784            };
785
786            match client.socket.send_to(&out[..write], send_info.to).await {
787                Ok(written) => debug!("{} written {} bytes out of {}", client.conn.trace_id(), written, write),
788                Err(e) => panic!("UDP socket send_to() failed: {:?}", e),
789            }
790        }
791    }
792    
793}
794
795/// Generate a stateless retry token.
796///
797/// The token includes the static string `"quiche"` followed by the IP address
798/// of the client and by the original destination connection ID generated by the
799/// client.
800///
801/// Note that this function is only an example and doesn't do any cryptographic
802/// authenticate of the token. *It should not be used in production system*.
803fn mint_token(hdr: &quiche::Header, src: &net::SocketAddr) -> Vec<u8> {
804    let mut token = Vec::new();
805
806    token.extend_from_slice(b"quiche");
807
808    // TODO: add cryptographic token
809    let addr = match src.ip() {
810        std::net::IpAddr::V4(a) => a.octets().to_vec(),
811        std::net::IpAddr::V6(a) => a.octets().to_vec(),
812    };
813
814    token.extend_from_slice(&addr);
815    token.extend_from_slice(&hdr.dcid);
816
817    token
818}
819
820/// Validates a stateless retry token.
821///
822/// This checks that the ticket includes the `"quiche"` static string, and that
823/// the client IP address matches the address stored in the ticket.
824///
825/// Note that this function is only an example and doesn't do any cryptographic
826/// authenticate of the token. *It should not be used in production system*.
827fn validate_token<'a>(
828    src: &net::SocketAddr, token: &'a [u8],
829) -> Option<quiche::ConnectionId<'a>> {
830    if token.len() < 6 {
831        return None;
832    }
833
834    if &token[..6] != b"quiche" {
835        return None;
836    }
837
838    let token = &token[6..];
839
840    let addr = match src.ip() {
841        std::net::IpAddr::V4(a) => a.octets().to_vec(),
842        std::net::IpAddr::V6(a) => a.octets().to_vec(),
843    };
844
845    if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() {
846        return None;
847    }
848
849    Some(quiche::ConnectionId::from_ref(&token[addr.len()..]))
850}
851
852/**
853 * Parse pseudo-header path for CONNECT UDP to SocketAddr
854 */
855fn path_to_socketaddr(path: &[u8]) -> Option<net::SocketAddr> {
856    // for now, let's assume path pattern is "/something.../target-host/target-port/"
857    let mut split_iter = std::io::BufRead::split(path, b'/');
858    let mut second_last = None;
859    let mut last = None;
860    while let Some(curr) = split_iter.next() {
861        if let Ok(curr) = curr {
862            second_last = last;
863            last = Some(curr);
864        } else {
865            return None
866        }
867    }
868    if second_last.is_some() && last.is_some() {
869        let second_last = second_last.unwrap();
870        let last = last.unwrap();
871        let second_last = std::str::from_utf8(&second_last);
872        let last = std::str::from_utf8(&last);
873        if second_last.is_ok() && last.is_ok() {
874            let url_str = format!("scheme://{}:{}/", second_last.unwrap(), last.unwrap());
875            let url = url::Url::parse(&url_str);
876            if let Ok(url) = url {
877                let socket_addrs = url.to_socket_addrs();
878                if let Ok(mut socket_addrs) = socket_addrs {
879                    return socket_addrs.next()
880                }
881            }
882        }
883    }
884
885    None
886}