1use crate::buffer::WriteBuffer;
2use crate::stream::{ReadStream, StartError as StreamStartError, Stream, StreamType};
3
4#[cfg(feature = "msquic-2-5")]
5use msquic_v2_5 as msquic;
6#[cfg(feature = "msquic-seera")]
7use seera_msquic as msquic;
8
9use std::collections::VecDeque;
10use std::fs::File;
11use std::future::Future;
12use std::io::{Seek, SeekFrom, Write};
13use std::net::SocketAddr;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::sync::{Arc, Mutex};
17use std::task::{Context, Poll, Waker};
18
19use bytes::Bytes;
20use libc::c_void;
21use msquic::ffi::QUIC_TLS_SECRETS__bindgen_ty_1;
22use thiserror::Error;
23use tracing::{info, trace};
24
25#[derive(Clone)]
26pub struct Connection(Arc<ConnectionInstance>);
27
28impl Connection {
29 pub fn new(registration: &msquic::Registration) -> Result<Self, ConnectionError> {
33 let inner = Arc::new(ConnectionInner::new(ConnectionState::Open, None, None));
34 let inner_in_ev = inner.clone();
35 let msquic_conn = msquic::Connection::open(registration, move |conn_ref, ev| {
36 inner_in_ev.callback_handler_impl(conn_ref, ev)
37 })
38 .map_err(ConnectionError::OtherError)?;
39 let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
40 trace!(
41 "ConnectionInstance({:p}, Inner: {:p}) Open by local",
42 instance,
43 instance.inner
44 );
45 Ok(Self(instance))
46 }
47
48 pub(crate) fn from_raw(
49 #[cfg(feature = "msquic-2-5")] handle: msquic::ffi::HQUIC,
50 #[cfg(not(feature = "msquic-2-5"))] msquic_conn: msquic::Connection,
51 tls_secrets: Option<Box<msquic::ffi::QUIC_TLS_SECRETS>>,
52 sslkeylog_file: Option<File>,
53 ) -> Self {
54 #[cfg(feature = "msquic-2-5")]
55 let msquic_conn = unsafe { msquic::Connection::from_raw(handle) };
56 let inner = Arc::new(ConnectionInner::new(
57 ConnectionState::Connected,
58 tls_secrets,
59 sslkeylog_file,
60 ));
61 let inner_in_ev = inner.clone();
62 msquic_conn.set_callback_handler(move |conn_ref, ev| {
63 inner_in_ev.callback_handler_impl(conn_ref, ev)
64 });
65 let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
66 trace!(
67 "ConnectionInstance({:p}, Inner: {:p}) Open by peer",
68 instance,
69 instance.inner
70 );
71 Self(instance)
72 }
73
74 pub fn start<'a>(
76 &'a self,
77 configuration: &'a msquic::Configuration,
78 host: &'a str,
79 port: u16,
80 ) -> ConnectionStart<'a> {
81 ConnectionStart {
82 conn: self,
83 configuration,
84 host,
85 port,
86 }
87 }
88
89 fn poll_start(
91 &self,
92 cx: &mut Context<'_>,
93 configuration: &msquic::Configuration,
94 host: &str,
95 port: u16,
96 ) -> Poll<Result<(), StartError>> {
97 let mut exclusive = self.0.exclusive.lock().unwrap();
98 match exclusive.state {
99 ConnectionState::Open => {
100 self.0
101 .msquic_conn
102 .start(configuration, host, port)
103 .map_err(StartError::OtherError)?;
104 exclusive.state = ConnectionState::Connecting;
105 }
106 ConnectionState::Connecting => {}
107 ConnectionState::Connected => return Poll::Ready(Ok(())),
108 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
109 return Poll::Ready(Err(StartError::ConnectionLost(
110 exclusive.error.as_ref().expect("error").clone(),
111 )));
112 }
113 }
114 exclusive.start_waiters.push(cx.waker().clone());
115 Poll::Pending
116 }
117
118 pub fn open_outbound_stream(
120 &self,
121 stream_type: StreamType,
122 fail_on_blocked: bool,
123 ) -> OpenOutboundStream<'_> {
124 OpenOutboundStream {
125 conn: &self.0,
126 stream_type: Some(stream_type),
127 stream: None,
128 fail_on_blocked,
129 }
130 }
131
132 pub fn accept_inbound_stream(&self) -> AcceptInboundStream<'_> {
134 AcceptInboundStream { conn: self }
135 }
136
137 pub fn poll_accept_inbound_stream(
139 &self,
140 cx: &mut Context<'_>,
141 ) -> Poll<Result<Stream, StreamStartError>> {
142 let mut exclusive = self.0.exclusive.lock().unwrap();
143 match exclusive.state {
144 ConnectionState::Open => {
145 return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
146 }
147 ConnectionState::Connecting => {
148 exclusive.start_waiters.push(cx.waker().clone());
149 return Poll::Pending;
150 }
151 ConnectionState::Connected => {}
152 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
153 return Poll::Ready(Err(StreamStartError::ConnectionLost(
154 exclusive.error.as_ref().expect("error").clone(),
155 )));
156 }
157 }
158
159 if !exclusive.inbound_streams.is_empty() {
160 return Poll::Ready(Ok(exclusive.inbound_streams.pop_front().unwrap()));
161 }
162 exclusive.inbound_stream_waiters.push(cx.waker().clone());
163 Poll::Pending
164 }
165
166 pub fn accept_inbound_uni_stream(&self) -> AcceptInboundUniStream<'_> {
168 AcceptInboundUniStream { conn: self }
169 }
170
171 pub fn poll_accept_inbound_uni_stream(
173 &self,
174 cx: &mut Context<'_>,
175 ) -> Poll<Result<ReadStream, StreamStartError>> {
176 let mut exclusive = self.0.exclusive.lock().unwrap();
177 match exclusive.state {
178 ConnectionState::Open => {
179 return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
180 }
181 ConnectionState::Connecting => {
182 exclusive.start_waiters.push(cx.waker().clone());
183 return Poll::Pending;
184 }
185 ConnectionState::Connected => {}
186 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
187 return Poll::Ready(Err(StreamStartError::ConnectionLost(
188 exclusive.error.as_ref().expect("error").clone(),
189 )));
190 }
191 }
192
193 if !exclusive.inbound_uni_streams.is_empty() {
194 return Poll::Ready(Ok(exclusive.inbound_uni_streams.pop_front().unwrap()));
195 }
196 exclusive
197 .inbound_uni_stream_waiters
198 .push(cx.waker().clone());
199 Poll::Pending
200 }
201
202 pub fn poll_receive_datagram(
204 &self,
205 cx: &mut Context<'_>,
206 ) -> Poll<Result<Bytes, DgramReceiveError>> {
207 let mut exclusive = self.0.exclusive.lock().unwrap();
208 match exclusive.state {
209 ConnectionState::Open => {
210 return Poll::Ready(Err(DgramReceiveError::ConnectionNotStarted));
211 }
212 ConnectionState::Connecting => {
213 exclusive.start_waiters.push(cx.waker().clone());
214 return Poll::Pending;
215 }
216 ConnectionState::Connected => {}
217 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
218 return Poll::Ready(Err(DgramReceiveError::ConnectionLost(
219 exclusive.error.as_ref().expect("error").clone(),
220 )));
221 }
222 }
223
224 if let Some(buf) = exclusive.recv_buffers.pop_front() {
225 Poll::Ready(Ok(buf))
226 } else {
227 exclusive.recv_waiters.push(cx.waker().clone());
228 Poll::Pending
229 }
230 }
231
232 pub fn poll_send_datagram(
234 &self,
235 cx: &mut Context<'_>,
236 buf: &Bytes,
237 ) -> Poll<Result<(), DgramSendError>> {
238 let mut exclusive = self.0.exclusive.lock().unwrap();
239 match exclusive.state {
240 ConnectionState::Open => {
241 return Poll::Ready(Err(DgramSendError::ConnectionNotStarted));
242 }
243 ConnectionState::Connecting => {
244 exclusive.start_waiters.push(cx.waker().clone());
245 return Poll::Pending;
246 }
247 ConnectionState::Connected => {}
248 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
249 return Poll::Ready(Err(DgramSendError::ConnectionLost(
250 exclusive.error.as_ref().expect("error").clone(),
251 )));
252 }
253 }
254
255 let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
256 let _ = write_buf.put_zerocopy(buf);
257 let buffers = unsafe {
258 let (data, len) = write_buf.get_buffers();
259 std::slice::from_raw_parts(data, len)
260 };
261 let res = unsafe {
262 self.0.msquic_conn.datagram_send(
263 buffers,
264 msquic::SendFlags::NONE,
265 write_buf.into_raw() as *const _,
266 )
267 }
268 .map_err(DgramSendError::OtherError);
269 Poll::Ready(res)
270 }
271
272 pub fn send_datagram(&self, buf: &Bytes) -> Result<(), DgramSendError> {
274 let mut exclusive = self.0.exclusive.lock().unwrap();
275 match exclusive.state {
276 ConnectionState::Open => {
277 return Err(DgramSendError::ConnectionNotStarted);
278 }
279 ConnectionState::Connecting => {
280 return Err(DgramSendError::ConnectionNotStarted);
281 }
282 ConnectionState::Connected => {}
283 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
284 return Err(DgramSendError::ConnectionLost(
285 exclusive.error.as_ref().expect("error").clone(),
286 ));
287 }
288 }
289
290 if !exclusive.dgram_send_enabled {
291 return Err(DgramSendError::Denied);
292 }
293 if buf.len() > exclusive.dgram_max_send_length as usize {
294 return Err(DgramSendError::TooBig);
295 }
296
297 let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
298 let _ = write_buf.put_zerocopy(buf);
299 let buffers = unsafe {
300 let (data, len) = write_buf.get_buffers();
301 std::slice::from_raw_parts(data, len)
302 };
303 unsafe {
304 self.0.msquic_conn.datagram_send(
305 buffers,
306 msquic::SendFlags::NONE,
307 write_buf.into_raw() as *const _,
308 )
309 }
310 .map_err(DgramSendError::OtherError)?;
311 Ok(())
312 }
313
314 pub fn poll_shutdown(
316 &self,
317 cx: &mut Context<'_>,
318 error_code: u64,
319 ) -> Poll<Result<(), ShutdownError>> {
320 let mut exclusive = self.0.exclusive.lock().unwrap();
321 match exclusive.state {
322 ConnectionState::Open => {
323 return Poll::Ready(Err(ShutdownError::ConnectionNotStarted));
324 }
325 ConnectionState::Connecting => {
326 exclusive.start_waiters.push(cx.waker().clone());
327 return Poll::Pending;
328 }
329 ConnectionState::Connected => {
330 self.0
331 .msquic_conn
332 .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
333 exclusive.state = ConnectionState::Shutdown;
334 }
335 ConnectionState::Shutdown => {}
336 ConnectionState::ShutdownComplete => {
337 if let Some(ConnectionError::ShutdownByLocal) = &exclusive.error {
338 return Poll::Ready(Ok(()));
339 } else {
340 return Poll::Ready(Err(ShutdownError::ConnectionLost(
341 exclusive.error.as_ref().expect("error").clone(),
342 )));
343 }
344 }
345 }
346
347 exclusive.shutdown_waiters.push(cx.waker().clone());
348 Poll::Pending
349 }
350
351 pub fn shutdown(&self, error_code: u64) -> Result<(), ShutdownError> {
353 let mut exclusive = self.0.exclusive.lock().unwrap();
354 match exclusive.state {
355 ConnectionState::Open | ConnectionState::Connecting => {
356 return Err(ShutdownError::ConnectionNotStarted);
357 }
358 ConnectionState::Connected => {
359 self.0
360 .msquic_conn
361 .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
362 exclusive.state = ConnectionState::Shutdown;
363 }
364 _ => {}
365 }
366 Ok(())
367 }
368
369 pub fn get_local_addr(&self) -> Result<SocketAddr, ConnectionError> {
371 self.0
372 .msquic_conn
373 .get_local_addr()
374 .map(|addr| addr.as_socket().expect("socket addr"))
375 .map_err(ConnectionError::OtherError)
376 }
377
378 pub fn get_remote_addr(&self) -> Result<SocketAddr, ConnectionError> {
380 self.0
381 .msquic_conn
382 .get_remote_addr()
383 .map(|addr| addr.as_socket().expect("socket addr"))
384 .map_err(ConnectionError::OtherError)
385 }
386
387 pub fn set_share_binding(&self, share: bool) -> Result<(), ConnectionError> {
389 let share: u8 = if share { 1 } else { 0 };
390 unsafe {
391 msquic::Api::set_param(
392 self.0.msquic_conn.as_raw(),
393 msquic::ffi::QUIC_PARAM_CONN_SHARE_UDP_BINDING,
394 std::mem::size_of::<u8>() as u32,
395 &share as *const _ as *const _,
396 )
397 }
398 .map_err(ConnectionError::OtherError)
399 }
400
401 #[cfg(feature = "msquic-seera")]
403 pub fn add_path(
404 &self,
405 local_addr: SocketAddr,
406 remote_addr: SocketAddr,
407 ) -> Result<(), ConnectionError> {
408 unsafe {
409 msquic::Api::set_param(
410 self.0.msquic_conn.as_raw(),
411 msquic::ffi::QUIC_PARAM_CONN_ADD_PATH,
412 std::mem::size_of::<msquic::ffi::QUIC_PATH_PARAM>() as u32,
413 &msquic::ffi::QUIC_PATH_PARAM {
414 LocalAddress: &mut msquic::Addr::from(local_addr) as *mut _ as *mut _,
415 RemoteAddress: &mut msquic::Addr::from(remote_addr) as *mut _ as *mut _,
416 } as *const _ as *const _,
417 )
418 }
419 .map_err(ConnectionError::OtherError)
420 }
421
422 #[cfg(feature = "msquic-seera")]
424 pub fn activate_path(
425 &self,
426 local_addr: SocketAddr,
427 remote_addr: SocketAddr,
428 ) -> Result<(), ConnectionError> {
429 unsafe {
430 msquic::Api::set_param(
431 self.0.msquic_conn.as_raw(),
432 msquic::ffi::QUIC_PARAM_CONN_ACTIVATE_PATH,
433 std::mem::size_of::<msquic::ffi::QUIC_PATH_PARAM>() as u32,
434 &msquic::ffi::QUIC_PATH_PARAM {
435 LocalAddress: &mut msquic::Addr::from(local_addr) as *mut _ as *mut _,
436 RemoteAddress: &mut msquic::Addr::from(remote_addr) as *mut _ as *mut _,
437 } as *const _ as *const _,
438 )
439 }
440 .map_err(ConnectionError::OtherError)
441 }
442
443 #[cfg(feature = "msquic-seera")]
445 pub fn remove_path(
446 &self,
447 local_addr: SocketAddr,
448 remote_addr: SocketAddr,
449 ) -> Result<(), ConnectionError> {
450 unsafe {
451 msquic::Api::set_param(
452 self.0.msquic_conn.as_raw(),
453 msquic::ffi::QUIC_PARAM_CONN_REMOVE_PATH,
454 std::mem::size_of::<msquic::ffi::QUIC_PATH_PARAM>() as u32,
455 &msquic::ffi::QUIC_PATH_PARAM {
456 LocalAddress: &mut msquic::Addr::from(local_addr) as *mut _ as *mut _,
457 RemoteAddress: &mut msquic::Addr::from(remote_addr) as *mut _ as *mut _,
458 } as *const _ as *const _,
459 )
460 }
461 .map_err(ConnectionError::OtherError)
462 }
463
464 #[cfg(feature = "msquic-seera")]
466 pub fn add_bound_addr(&self, addr: SocketAddr) -> Result<(), ConnectionError> {
467 unsafe {
468 msquic::Api::set_param(
469 self.0.msquic_conn.as_raw(),
470 msquic::ffi::QUIC_PARAM_CONN_ADD_BOUND_ADDRESS,
471 std::mem::size_of::<msquic::Addr>() as u32,
472 &msquic::Addr::from(addr) as *const _ as *const _,
473 )
474 }
475 .map_err(ConnectionError::OtherError)
476 }
477
478 #[cfg(feature = "msquic-seera")]
480 pub fn add_observed_addr(
481 &self,
482 addr: SocketAddr,
483 observed_addr: SocketAddr,
484 ) -> Result<(), ConnectionError> {
485 unsafe {
486 msquic::Api::set_param(
487 self.0.msquic_conn.as_raw(),
488 msquic::ffi::QUIC_PARAM_CONN_ADD_OBSERVED_ADDRESS,
489 std::mem::size_of::<msquic::ffi::QUIC_ADD_OBSERVED_ADDRESS>() as u32,
490 &msquic::ffi::QUIC_ADD_OBSERVED_ADDRESS {
491 LocalAddress: &mut msquic::Addr::from(addr) as *mut _ as *mut _,
492 ObservedAddress: &mut msquic::Addr::from(observed_addr) as *mut _ as *mut _,
493 } as *const _ as *const _,
494 )
495 }
496 .map_err(ConnectionError::OtherError)
497 }
498
499 #[cfg(feature = "msquic-seera")]
501 pub fn remove_bound_addr(&self, addr: SocketAddr) -> Result<(), ConnectionError> {
502 unsafe {
503 msquic::Api::set_param(
504 self.0.msquic_conn.as_raw(),
505 msquic::ffi::QUIC_PARAM_CONN_REMOVE_BOUND_ADDRESS,
506 std::mem::size_of::<msquic::Addr>() as u32,
507 &msquic::Addr::from(addr) as *const _ as *const _,
508 )
509 }
510 .map_err(ConnectionError::OtherError)
511 }
512
513 #[cfg(feature = "msquic-seera")]
515 pub fn add_candidate_addr(
516 &self,
517 host_addr: SocketAddr,
518 observed_addr: SocketAddr,
519 ) -> Result<(), ConnectionError> {
520 unsafe {
521 msquic::Api::set_param(
522 self.0.msquic_conn.as_raw(),
523 msquic::ffi::QUIC_PARAM_CONN_ADD_CANDIDATE_ADDRESS,
524 std::mem::size_of::<msquic::ffi::QUIC_CANDIDATE_ADDRESS>() as u32,
525 &msquic::ffi::QUIC_CANDIDATE_ADDRESS {
526 HostAddress: &mut msquic::Addr::from(host_addr) as *mut _ as *mut _,
527 ObservedAddress: &mut msquic::Addr::from(observed_addr) as *mut _ as *mut _,
528 } as *const _ as *const _,
529 )
530 }
531 .map_err(ConnectionError::OtherError)
532 }
533
534 #[cfg(feature = "msquic-seera")]
536 pub fn remove_candidate_addr(
537 &self,
538 host_addr: SocketAddr,
539 observed_addr: SocketAddr,
540 ) -> Result<(), ConnectionError> {
541 unsafe {
542 msquic::Api::set_param(
543 self.0.msquic_conn.as_raw(),
544 msquic::ffi::QUIC_PARAM_CONN_REMOVE_CANDIDATE_ADDRESS,
545 std::mem::size_of::<msquic::ffi::QUIC_CANDIDATE_ADDRESS>() as u32,
546 &msquic::ffi::QUIC_CANDIDATE_ADDRESS {
547 HostAddress: &mut msquic::Addr::from(host_addr) as *mut _ as *mut _,
548 ObservedAddress: &mut msquic::Addr::from(observed_addr) as *mut _ as *mut _,
549 } as *const _ as *const _,
550 )
551 }
552 .map_err(ConnectionError::OtherError)
553 }
554
555 pub fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<ConnectionEvent, EventError>> {
557 let mut exclusive = self.0.exclusive.lock().unwrap();
558 match exclusive.state {
559 ConnectionState::Open => {
560 return Poll::Ready(Err(EventError::ConnectionNotStarted));
561 }
562 ConnectionState::Connecting => {
563 exclusive.start_waiters.push(cx.waker().clone());
564 return Poll::Pending;
565 }
566 ConnectionState::Connected | ConnectionState::Shutdown => {}
567 ConnectionState::ShutdownComplete => {
568 return Poll::Ready(Err(EventError::ConnectionLost(
569 exclusive.error.as_ref().expect("error").clone(),
570 )));
571 }
572 }
573
574 if exclusive.events.is_empty() {
575 exclusive.event_waiters.push(cx.waker().clone());
576 Poll::Pending
577 } else {
578 Poll::Ready(Ok(exclusive.events.pop_front().unwrap()))
579 }
580 }
581
582 pub fn set_sslkeylog_file(&self, file: File) -> Result<(), ConnectionError> {
584 let mut exclusive = self.0.exclusive.lock().unwrap();
585 if exclusive.sslkeylog_file.is_some() {
586 return Err(ConnectionError::SslKeyLogFileAlreadySet);
587 }
588 if exclusive.tls_secrets.is_none() {
589 exclusive.tls_secrets = Some(Box::new(msquic::ffi::QUIC_TLS_SECRETS {
590 SecretLength: 0,
591 ClientRandom: [0; 32],
592 IsSet: QUIC_TLS_SECRETS__bindgen_ty_1 {
593 _bitfield_align_1: [0; 0],
594 _bitfield_1: QUIC_TLS_SECRETS__bindgen_ty_1::new_bitfield_1(
595 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
596 ),
597 },
598 ClientEarlyTrafficSecret: [0; 64],
599 ClientHandshakeTrafficSecret: [0; 64],
600 ServerHandshakeTrafficSecret: [0; 64],
601 ClientTrafficSecret0: [0; 64],
602 ServerTrafficSecret0: [0; 64],
603 }));
604 unsafe {
605 msquic::Api::set_param(
606 self.0.msquic_conn.as_raw(),
607 msquic::ffi::QUIC_PARAM_CONN_TLS_SECRETS,
608 std::mem::size_of::<msquic::ffi::QUIC_TLS_SECRETS>() as u32,
609 exclusive.tls_secrets.as_ref().unwrap().as_ref() as *const _ as *const _,
610 )
611 }
612 .map_err(ConnectionError::OtherError)?;
613 }
614 exclusive.sslkeylog_file = Some(file);
615 Ok(())
616 }
617}
618
619struct ConnectionInstance {
620 inner: Arc<ConnectionInner>,
621 msquic_conn: msquic::Connection,
622}
623
624impl Deref for ConnectionInstance {
625 type Target = ConnectionInner;
626
627 fn deref(&self) -> &Self::Target {
628 &self.inner
629 }
630}
631
632impl Drop for ConnectionInstance {
633 fn drop(&mut self) {
634 trace!("ConnectionInstance({:p}) dropping", self);
635 }
636}
637
638struct ConnectionInner {
639 exclusive: Mutex<ConnectionInnerExclusive>,
640}
641
642struct ConnectionInnerExclusive {
643 state: ConnectionState,
644 error: Option<ConnectionError>,
645 start_waiters: Vec<Waker>,
646 inbound_stream_waiters: Vec<Waker>,
647 inbound_uni_stream_waiters: Vec<Waker>,
648 inbound_streams: VecDeque<crate::stream::Stream>,
649 inbound_uni_streams: VecDeque<crate::stream::ReadStream>,
650 recv_buffers: VecDeque<Bytes>,
651 recv_waiters: Vec<Waker>,
652 write_pool: Vec<WriteBuffer>,
653 dgram_send_enabled: bool,
654 dgram_max_send_length: u16,
655 shutdown_waiters: Vec<Waker>,
656 events: VecDeque<ConnectionEvent>,
657 event_waiters: Vec<Waker>,
658 sslkeylog_file: Option<File>,
659 tls_secrets: Option<Box<msquic::ffi::QUIC_TLS_SECRETS>>,
660}
661
662impl ConnectionInner {
663 fn new(
664 state: ConnectionState,
665 tls_secrets: Option<Box<msquic::ffi::QUIC_TLS_SECRETS>>,
666 sslkeylog_file: Option<File>,
667 ) -> Self {
668 Self {
669 exclusive: Mutex::new(ConnectionInnerExclusive {
670 state,
671 error: None,
672 start_waiters: Vec::new(),
673 inbound_stream_waiters: Vec::new(),
674 inbound_uni_stream_waiters: Vec::new(),
675 inbound_streams: VecDeque::new(),
676 inbound_uni_streams: VecDeque::new(),
677 recv_buffers: VecDeque::new(),
678 recv_waiters: Vec::new(),
679 write_pool: Vec::new(),
680 dgram_send_enabled: false,
681 dgram_max_send_length: 0,
682 shutdown_waiters: Vec::new(),
683 events: VecDeque::new(),
684 event_waiters: Vec::new(),
685 sslkeylog_file,
686 tls_secrets,
687 }),
688 }
689 }
690
691 fn handle_event_connected(
692 &self,
693 _session_resumed: bool,
694 _negotiated_alpn: &[u8],
695 ) -> Result<(), msquic::Status> {
696 trace!("ConnectionInner({:p}) Connected", self);
697
698 let mut exclusive = self.exclusive.lock().unwrap();
699 match (
700 exclusive.tls_secrets.take(),
701 exclusive.sslkeylog_file.take(),
702 ) {
703 (Some(tls_secrets), Some(mut file)) => {
704 info!("ConnectionInner({:p}) Writing TLS secrets to file", self);
705 let client_random = if tls_secrets.IsSet.ClientRandom() != 0 {
706 hex::encode(tls_secrets.ClientRandom)
707 } else {
708 String::new()
709 };
710
711 let _ = file.seek(SeekFrom::End(0));
712
713 if tls_secrets.IsSet.ClientEarlyTrafficSecret() != 0 {
714 let _ = writeln!(
715 file,
716 "CLIENT_EARLY_TRAFFIC_SECRET {} {}",
717 client_random,
718 hex::encode(
719 &tls_secrets.ClientEarlyTrafficSecret
720 [0..tls_secrets.SecretLength as usize]
721 )
722 );
723 }
724
725 if tls_secrets.IsSet.ClientHandshakeTrafficSecret() != 0 {
726 let _ = writeln!(
727 file,
728 "CLIENT_HANDSHAKE_TRAFFIC_SECRET {} {}",
729 client_random,
730 hex::encode(
731 &tls_secrets.ClientHandshakeTrafficSecret
732 [0..tls_secrets.SecretLength as usize]
733 )
734 );
735 }
736
737 if tls_secrets.IsSet.ServerHandshakeTrafficSecret() != 0 {
738 let _ = writeln!(
739 file,
740 "SERVER_HANDSHAKE_TRAFFIC_SECRET {} {}",
741 client_random,
742 hex::encode(
743 &tls_secrets.ServerHandshakeTrafficSecret
744 [0..tls_secrets.SecretLength as usize]
745 )
746 );
747 }
748
749 if tls_secrets.IsSet.ClientTrafficSecret0() != 0 {
750 let _ = writeln!(
751 file,
752 "CLIENT_TRAFFIC_SECRET_0 {} {}",
753 client_random,
754 hex::encode(
755 &tls_secrets.ClientTrafficSecret0[0..tls_secrets.SecretLength as usize]
756 )
757 );
758 }
759
760 if tls_secrets.IsSet.ServerTrafficSecret0() != 0 {
761 let _ = writeln!(
762 file,
763 "SERVER_TRAFFIC_SECRET_0 {} {}",
764 client_random,
765 hex::encode(
766 &tls_secrets.ServerTrafficSecret0[0..tls_secrets.SecretLength as usize]
767 )
768 );
769 }
770 exclusive.tls_secrets = Some(tls_secrets);
771 }
772 _ => { }
773 }
774 exclusive.state = ConnectionState::Connected;
775 exclusive
776 .start_waiters
777 .drain(..)
778 .for_each(|waker| waker.wake());
779 Ok(())
780 }
781
782 fn handle_event_shutdown_initiated_by_transport(
783 &self,
784 status: msquic::Status,
785 error_code: u64,
786 ) -> Result<(), msquic::Status> {
787 trace!(
788 "ConnectionInner({:p}) Transport shutdown {:?}",
789 self,
790 status
791 );
792
793 let mut exclusive = self.exclusive.lock().unwrap();
794 exclusive.state = ConnectionState::Shutdown;
795 exclusive.error = Some(ConnectionError::ShutdownByTransport(status, error_code));
796 exclusive
797 .start_waiters
798 .drain(..)
799 .for_each(|waker| waker.wake());
800 exclusive
801 .inbound_stream_waiters
802 .drain(..)
803 .for_each(|waker| waker.wake());
804 Ok(())
805 }
806
807 fn handle_event_shutdown_initiated_by_peer(
808 &self,
809 error_code: u64,
810 ) -> Result<(), msquic::Status> {
811 trace!("ConnectionInner({:p}) App shutdown {}", self, error_code);
812
813 let mut exclusive = self.exclusive.lock().unwrap();
814 exclusive.state = ConnectionState::Shutdown;
815 exclusive.error = Some(ConnectionError::ShutdownByPeer(error_code));
816 exclusive
817 .start_waiters
818 .drain(..)
819 .for_each(|waker| waker.wake());
820 exclusive
821 .inbound_stream_waiters
822 .drain(..)
823 .for_each(|waker| waker.wake());
824 Ok(())
825 }
826
827 fn handle_event_shutdown_complete(
828 &self,
829 handshake_completed: bool,
830 peer_acknowledged_shutdown: bool,
831 app_close_in_progress: bool,
832 ) -> Result<(), msquic::Status> {
833 trace!("ConnectionInner({:p}) Shutdown complete: handshake_completed={}, peer_acknowledged_shutdown={}, app_close_in_progress={}",
834 self, handshake_completed, peer_acknowledged_shutdown, app_close_in_progress
835 );
836
837 {
838 let mut exclusive = self.exclusive.lock().unwrap();
839 exclusive.state = ConnectionState::ShutdownComplete;
840 if exclusive.error.is_none() {
841 exclusive.error = Some(ConnectionError::ShutdownByLocal);
842 }
843 exclusive
844 .start_waiters
845 .drain(..)
846 .for_each(|waker| waker.wake());
847 exclusive
848 .inbound_stream_waiters
849 .drain(..)
850 .for_each(|waker| waker.wake());
851 exclusive
852 .shutdown_waiters
853 .drain(..)
854 .for_each(|waker| waker.wake());
855 exclusive
856 .event_waiters
857 .drain(..)
858 .for_each(|waker| waker.wake());
859 }
860 Ok(())
861 }
862
863 fn handle_event_peer_stream_started(
864 &self,
865 stream: msquic::StreamRef,
866 flags: msquic::StreamOpenFlags,
867 ) -> Result<(), msquic::Status> {
868 let stream_type = if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
869 == msquic::StreamOpenFlags::UNIDIRECTIONAL
870 {
871 StreamType::Unidirectional
872 } else {
873 StreamType::Bidirectional
874 };
875 trace!(
876 "ConnectionInner({:p}) Peer stream started {:?}",
877 self,
878 stream_type
879 );
880
881 let stream = Stream::from_raw(unsafe { stream.as_raw() }, stream_type);
882 if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
883 == msquic::StreamOpenFlags::UNIDIRECTIONAL
884 {
885 if let (Some(read_stream), None) = stream.split() {
886 let mut exclusive = self.exclusive.lock().unwrap();
887 exclusive.inbound_uni_streams.push_back(read_stream);
888 exclusive
889 .inbound_uni_stream_waiters
890 .drain(..)
891 .for_each(|waker| waker.wake());
892 } else {
893 unreachable!();
894 }
895 } else {
896 {
897 let mut exclusive = self.exclusive.lock().unwrap();
898 exclusive.inbound_streams.push_back(stream);
899 exclusive
900 .inbound_stream_waiters
901 .drain(..)
902 .for_each(|waker| waker.wake());
903 }
904 }
905
906 Ok(())
907 }
908
909 fn handle_event_streams_available(
910 &self,
911 bidirectional_count: u16,
912 unidirectional_count: u16,
913 ) -> Result<(), msquic::Status> {
914 trace!(
915 "ConnectionInner({:p}) Streams available bidirectional_count:{} unidirectional_count:{}",
916 self,
917 bidirectional_count,
918 unidirectional_count
919 );
920 Ok(())
921 }
922
923 fn handle_event_datagram_state_changed(
924 &self,
925 send_enabled: bool,
926 max_send_length: u16,
927 ) -> Result<(), msquic::Status> {
928 trace!(
929 "ConnectionInner({:p}) Datagram state changed send_enabled:{} max_send_length:{}",
930 self,
931 send_enabled,
932 max_send_length
933 );
934 let mut exclusive = self.exclusive.lock().unwrap();
935 exclusive.dgram_send_enabled = send_enabled;
936 exclusive.dgram_max_send_length = max_send_length;
937 Ok(())
938 }
939
940 fn handle_event_datagram_received(
941 &self,
942 buffer: &msquic::BufferRef,
943 _flags: msquic::ReceiveFlags,
944 ) -> Result<(), msquic::Status> {
945 trace!("ConnectionInner({:p}) Datagram received", self);
946 let buf = Bytes::copy_from_slice(buffer.as_bytes());
947 {
948 let mut exclusive = self.exclusive.lock().unwrap();
949 exclusive.recv_buffers.push_back(buf);
950 exclusive
951 .recv_waiters
952 .drain(..)
953 .for_each(|waker| waker.wake());
954 }
955 Ok(())
956 }
957
958 fn handle_event_datagram_send_state_changed(
959 &self,
960 client_context: *const c_void,
961 state: msquic::DatagramSendState,
962 ) -> Result<(), msquic::Status> {
963 trace!(
964 "ConnectionInner({:p}) Datagram send state changed state:{:?}",
965 self,
966 state
967 );
968 match state {
969 msquic::DatagramSendState::Sent | msquic::DatagramSendState::Canceled => {
970 let mut write_buf = unsafe { WriteBuffer::from_raw(client_context) };
971 let mut exclusive = self.exclusive.lock().unwrap();
972 write_buf.reset();
973 exclusive.write_pool.push(write_buf);
974 }
975 _ => {}
976 }
977 Ok(())
978 }
979
980 #[cfg(feature = "msquic-seera")]
981 fn handle_event_notify_observed_address(
982 &self,
983 local_address: &msquic::Addr,
984 observed_address: &msquic::Addr,
985 ) -> Result<(), msquic::Status> {
986 let local_address = local_address.as_socket().expect("socket addr");
987 let observed_address = observed_address.as_socket().expect("socket addr");
988 trace!(
989 "ConnectionInner({:p}) Notify observed address local_address:{} observed_address:{}",
990 self,
991 local_address,
992 observed_address
993 );
994 let mut exclusive = self.exclusive.lock().unwrap();
995 exclusive
996 .events
997 .push_back(ConnectionEvent::NotifyObservedAddress {
998 local_address,
999 observed_address,
1000 });
1001 exclusive
1002 .event_waiters
1003 .drain(..)
1004 .for_each(|waker| waker.wake());
1005 Ok(())
1006 }
1007
1008 #[cfg(feature = "msquic-seera")]
1009 fn handle_event_notify_remote_address_added(
1010 &self,
1011 address: &msquic::Addr,
1012 sequence_number: u64,
1013 ) -> Result<(), msquic::Status> {
1014 let address = address.as_socket().expect("socket addr");
1015 trace!(
1016 "ConnectionInner({:p}) Notify remote address added address:{} sequence_number:{}",
1017 self,
1018 address,
1019 sequence_number
1020 );
1021 let mut exclusive = self.exclusive.lock().unwrap();
1022 exclusive
1023 .events
1024 .push_back(ConnectionEvent::NotifyRemoteAddressAdded {
1025 address,
1026 sequence_number,
1027 });
1028 exclusive
1029 .event_waiters
1030 .drain(..)
1031 .for_each(|waker| waker.wake());
1032 Ok(())
1033 }
1034
1035 #[cfg(feature = "msquic-seera")]
1036 fn handle_event_path_validated(
1037 &self,
1038 local_address: &msquic::Addr,
1039 remote_address: &msquic::Addr,
1040 ) -> Result<(), msquic::Status> {
1041 let local_address = local_address.as_socket().expect("socket addr");
1042 let remote_address = remote_address.as_socket().expect("socket addr");
1043 trace!(
1044 "ConnectionInner({:p}) path validated local_address:{} remote_address:{}",
1045 self,
1046 local_address,
1047 remote_address
1048 );
1049 let mut exclusive = self.exclusive.lock().unwrap();
1050 exclusive.events.push_back(ConnectionEvent::PathValidated {
1051 local_address,
1052 remote_address,
1053 });
1054 exclusive
1055 .event_waiters
1056 .drain(..)
1057 .for_each(|waker| waker.wake());
1058 Ok(())
1059 }
1060
1061 #[cfg(feature = "msquic-seera")]
1062 fn handle_event_notify_remote_address_removed(
1063 &self,
1064 sequence_number: u64,
1065 ) -> Result<(), msquic::Status> {
1066 trace!(
1067 "ConnectionInner({:p}) Notify remote address removed sequence_number:{}",
1068 self,
1069 sequence_number
1070 );
1071 let mut exclusive = self.exclusive.lock().unwrap();
1072 exclusive
1073 .events
1074 .push_back(ConnectionEvent::NotifyRemoteAddressRemoved { sequence_number });
1075 exclusive
1076 .event_waiters
1077 .drain(..)
1078 .for_each(|waker| waker.wake());
1079 Ok(())
1080 }
1081
1082 fn callback_handler_impl(
1083 &self,
1084 _connection: msquic::ConnectionRef,
1085 ev: msquic::ConnectionEvent,
1086 ) -> Result<(), msquic::Status> {
1087 match ev {
1088 msquic::ConnectionEvent::Connected {
1089 session_resumed,
1090 negotiated_alpn,
1091 } => self.handle_event_connected(session_resumed, negotiated_alpn),
1092 msquic::ConnectionEvent::ShutdownInitiatedByTransport { status, error_code } => {
1093 self.handle_event_shutdown_initiated_by_transport(status, error_code)
1094 }
1095 msquic::ConnectionEvent::ShutdownInitiatedByPeer { error_code } => {
1096 self.handle_event_shutdown_initiated_by_peer(error_code)
1097 }
1098 msquic::ConnectionEvent::ShutdownComplete {
1099 handshake_completed,
1100 peer_acknowledged_shutdown,
1101 app_close_in_progress,
1102 } => self.handle_event_shutdown_complete(
1103 handshake_completed,
1104 peer_acknowledged_shutdown,
1105 app_close_in_progress,
1106 ),
1107 msquic::ConnectionEvent::PeerStreamStarted { stream, flags } => {
1108 self.handle_event_peer_stream_started(stream, flags)
1109 }
1110 msquic::ConnectionEvent::StreamsAvailable {
1111 bidirectional_count,
1112 unidirectional_count,
1113 } => self.handle_event_streams_available(bidirectional_count, unidirectional_count),
1114 msquic::ConnectionEvent::DatagramStateChanged {
1115 send_enabled,
1116 max_send_length,
1117 } => self.handle_event_datagram_state_changed(send_enabled, max_send_length),
1118 msquic::ConnectionEvent::DatagramReceived { buffer, flags } => {
1119 self.handle_event_datagram_received(buffer, flags)
1120 }
1121 msquic::ConnectionEvent::DatagramSendStateChanged {
1122 client_context,
1123 state,
1124 } => self.handle_event_datagram_send_state_changed(client_context, state),
1125 #[cfg(feature = "msquic-seera")]
1126 msquic::ConnectionEvent::NotifyObservedAddress {
1127 local_address,
1128 observed_address,
1129 } => self.handle_event_notify_observed_address(local_address, observed_address),
1130 #[cfg(feature = "msquic-seera")]
1131 msquic::ConnectionEvent::NotifyRemoteAddressAdded {
1132 address,
1133 sequence_number,
1134 } => self.handle_event_notify_remote_address_added(address, sequence_number),
1135 #[cfg(feature = "msquic-seera")]
1136 msquic::ConnectionEvent::PathValidated {
1137 local_address,
1138 remote_address,
1139 } => self.handle_event_path_validated(local_address, remote_address),
1140 #[cfg(feature = "msquic-seera")]
1141 msquic::ConnectionEvent::NotifyRemoteAddressRemoved { sequence_number } => {
1142 self.handle_event_notify_remote_address_removed(sequence_number)
1143 }
1144 _ => {
1145 trace!("ConnectionInner({:p}) Other callback", self);
1146 Ok(())
1147 }
1148 }
1149 }
1150}
1151impl Drop for ConnectionInner {
1152 fn drop(&mut self) {
1153 trace!("ConnectionInner({:p}) dropping", self);
1154 }
1155}
1156
1157#[derive(Debug, PartialEq)]
1158enum ConnectionState {
1159 Open,
1160 Connecting,
1161 Connected,
1162 Shutdown,
1163 ShutdownComplete,
1164}
1165
1166#[derive(Clone, Debug, PartialEq, Eq)]
1168pub enum ConnectionEvent {
1169 NotifyObservedAddress {
1171 local_address: SocketAddr,
1172 observed_address: SocketAddr,
1173 },
1174 NotifyRemoteAddressAdded {
1176 address: SocketAddr,
1177 sequence_number: u64,
1178 },
1179 PathValidated {
1181 local_address: SocketAddr,
1182 remote_address: SocketAddr,
1183 },
1184 NotifyRemoteAddressRemoved { sequence_number: u64 },
1186}
1187
1188#[derive(Debug, Error, Clone)]
1190pub enum ConnectionError {
1191 #[error("connection shutdown by transport: status {0:?}, error 0x{1:x}")]
1192 ShutdownByTransport(msquic::Status, u64),
1193 #[error("connection shutdown by peer: error 0x{0:x}")]
1194 ShutdownByPeer(u64),
1195 #[error("connection shutdown by local")]
1196 ShutdownByLocal,
1197 #[error("connection closed")]
1198 ConnectionClosed,
1199 #[error("SSL key log file already set")]
1200 SslKeyLogFileAlreadySet,
1201 #[error("other error: status {0:?}")]
1202 OtherError(msquic::Status),
1203}
1204
1205#[derive(Debug, Error, Clone)]
1207pub enum DgramReceiveError {
1208 #[error("connection not started yet")]
1209 ConnectionNotStarted,
1210 #[error("connection lost")]
1211 ConnectionLost(#[from] ConnectionError),
1212 #[error("other error: status {0:?}")]
1213 OtherError(msquic::Status),
1214}
1215
1216#[derive(Debug, Error, Clone)]
1218pub enum DgramSendError {
1219 #[error("connection not started yet")]
1220 ConnectionNotStarted,
1221 #[error("not allowed for sending dgram")]
1222 Denied,
1223 #[error("exceeded maximum data size for sending dgram")]
1224 TooBig,
1225 #[error("connection lost")]
1226 ConnectionLost(#[from] ConnectionError),
1227 #[error("other error: status {0:?}")]
1228 OtherError(msquic::Status),
1229}
1230
1231#[derive(Debug, Error, Clone)]
1233pub enum StartError {
1234 #[error("connection lost")]
1235 ConnectionLost(#[from] ConnectionError),
1236 #[error("other error: status {0:?}")]
1237 OtherError(msquic::Status),
1238}
1239
1240#[derive(Debug, Error, Clone)]
1242pub enum ShutdownError {
1243 #[error("connection not started yet")]
1244 ConnectionNotStarted,
1245 #[error("connection lost")]
1246 ConnectionLost(#[from] ConnectionError),
1247 #[error("other error: status {0:?}")]
1248 OtherError(msquic::Status),
1249}
1250
1251#[derive(Debug, Error, Clone)]
1253pub enum EventError {
1254 #[error("connection not started yet")]
1255 ConnectionNotStarted,
1256 #[error("connection lost")]
1257 ConnectionLost(#[from] ConnectionError),
1258 #[error("other error: status {0:?}")]
1259 OtherError(msquic::Status),
1260}
1261
1262pub struct ConnectionStart<'a> {
1264 conn: &'a Connection,
1265 configuration: &'a msquic::Configuration,
1266 host: &'a str,
1267 port: u16,
1268}
1269
1270impl Future for ConnectionStart<'_> {
1271 type Output = Result<(), StartError>;
1272
1273 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1274 self.conn
1275 .poll_start(cx, self.configuration, self.host, self.port)
1276 }
1277}
1278
1279pub struct OpenOutboundStream<'a> {
1281 conn: &'a ConnectionInstance,
1282 stream_type: Option<crate::stream::StreamType>,
1283 stream: Option<crate::stream::Stream>,
1284 fail_on_blocked: bool,
1285}
1286
1287impl Future for OpenOutboundStream<'_> {
1288 type Output = Result<crate::stream::Stream, StreamStartError>;
1289
1290 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1291 let this = self.get_mut();
1292 let OpenOutboundStream {
1293 conn,
1294 ref mut stream_type,
1295 ref mut stream,
1296 fail_on_blocked: fail_blocked,
1297 ..
1298 } = *this;
1299
1300 let mut exclusive = conn.inner.exclusive.lock().unwrap();
1301 match exclusive.state {
1302 ConnectionState::Open => {
1303 return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
1304 }
1305 ConnectionState::Connecting => {
1306 exclusive.start_waiters.push(cx.waker().clone());
1307 return Poll::Pending;
1308 }
1309 ConnectionState::Connected => {}
1310 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
1311 return Poll::Ready(Err(StreamStartError::ConnectionLost(
1312 exclusive.error.as_ref().expect("error").clone(),
1313 )));
1314 }
1315 }
1316 if stream.is_none() {
1317 match Stream::open(&conn.msquic_conn, stream_type.take().unwrap()) {
1318 Ok(new_stream) => {
1319 *stream = Some(new_stream);
1320 }
1321 Err(e) => return Poll::Ready(Err(e)),
1322 }
1323 }
1324 stream
1325 .as_mut()
1326 .unwrap()
1327 .poll_start(cx, fail_blocked)
1328 .map(|res| res.map(|_| stream.take().unwrap()))
1329 }
1330}
1331
1332pub struct AcceptInboundStream<'a> {
1334 conn: &'a Connection,
1335}
1336
1337impl Future for AcceptInboundStream<'_> {
1338 type Output = Result<Stream, StreamStartError>;
1339
1340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1341 self.conn.poll_accept_inbound_stream(cx)
1342 }
1343}
1344
1345pub struct AcceptInboundUniStream<'a> {
1347 conn: &'a Connection,
1348}
1349
1350impl Future for AcceptInboundUniStream<'_> {
1351 type Output = Result<ReadStream, StreamStartError>;
1352
1353 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1354 self.conn.poll_accept_inbound_uni_stream(cx)
1355 }
1356}