h3_datagram/
datagram_handler.rs1use std::{error::Error, fmt::Display, future::poll_fn, marker::PhantomData, sync::Arc};
5
6use crate::{
7 datagram::Datagram,
8 quic_traits::{DatagramConnectionExt, RecvDatagram, SendDatagram, SendDatagramErrorIncoming},
9};
10use bytes::Buf;
11use h3::{
12 error::{connection_error_creators::CloseStream, ConnectionError, StreamError},
13 quic::{self, StreamId},
14 ConnectionState, SharedState,
15};
16
17#[derive(Debug)]
19pub struct DatagramSender<H: SendDatagram<B>, B: Buf> {
20 pub(crate) handler: H,
21 pub(crate) _marker: PhantomData<B>,
22 pub(crate) shared_state: Arc<SharedState>,
23 pub(crate) stream_id: StreamId,
24}
25
26impl<H, B> ConnectionState for DatagramSender<H, B>
27where
28 H: SendDatagram<B>,
29 B: Buf,
30{
31 fn shared_state(&self) -> &SharedState {
32 self.shared_state.as_ref()
33 }
34}
35
36impl<H, B> DatagramSender<H, B>
37where
38 H: SendDatagram<B>,
39 B: Buf,
40{
41 pub fn send_datagram(&mut self, data: B) -> Result<(), SendDatagramError> {
43 let encoded_datagram = Datagram::new(self.stream_id, data);
44 match self.handler.send_datagram(encoded_datagram.encode()) {
45 Ok(()) => Ok(()),
46 Err(e) => Err(self.handle_send_datagram_error(e)),
47 }
48 }
49
50 fn handle_send_datagram_error(
51 &mut self,
52 error: SendDatagramErrorIncoming,
53 ) -> SendDatagramError {
54 match error {
55 SendDatagramErrorIncoming::NotAvailable => SendDatagramError::NotAvailable,
56 SendDatagramErrorIncoming::TooLarge => SendDatagramError::TooLarge,
57 SendDatagramErrorIncoming::ConnectionError(error) => {
58 self.set_conn_error_and_wake(error.clone());
59 SendDatagramError::ConnectionError(ConnectionError::Remote(error))
60 }
61 }
62 }
63}
64
65#[derive(Debug)]
66pub struct DatagramReader<H: RecvDatagram> {
67 pub(crate) handler: H,
68 pub(crate) shared_state: Arc<SharedState>,
69}
70
71impl<H> ConnectionState for DatagramReader<H>
72where
73 H: RecvDatagram,
74{
75 fn shared_state(&self) -> &SharedState {
76 self.shared_state.as_ref()
77 }
78}
79
80impl<H> CloseStream for DatagramReader<H> where H: RecvDatagram {}
81
82impl<H> DatagramReader<H>
83where
84 H: RecvDatagram,
85{
86 pub async fn read_datagram(&mut self) -> Result<Datagram<H::Buffer>, StreamError> {
88 match poll_fn(|cx| self.handler.poll_incoming_datagram(cx)).await {
89 Ok(datagram) => Datagram::decode(datagram)
90 .map_err(|err| self.handle_connection_error_on_stream(err)),
91 Err(err) => Err(self.handle_quic_stream_error(
92 quic::StreamErrorIncoming::ConnectionErrorIncoming {
93 connection_error: err,
94 },
95 )),
96 }
97 }
98}
99
100pub trait HandleDatagramsExt<C, B>: ConnectionState
101where
102 B: Buf,
103 C: quic::Connection<B> + DatagramConnectionExt<B>,
104{
105 fn get_datagram_sender(&self, stream_id: StreamId)
107 -> DatagramSender<C::SendDatagramHandler, B>;
108 fn get_datagram_reader(&self) -> DatagramReader<C::RecvDatagramHandler>;
110}
111
112#[derive(Debug)]
114#[non_exhaustive]
115pub enum SendDatagramError {
116 #[non_exhaustive]
120 NotAvailable,
121 #[non_exhaustive]
123 TooLarge,
124 #[non_exhaustive]
126 ConnectionError(ConnectionError),
127}
128
129impl Display for SendDatagramError {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 match self {
132 SendDatagramError::NotAvailable => write!(f, "Datagrams are not available"),
133 SendDatagramError::TooLarge => write!(f, "Datagram is too large"),
134 SendDatagramError::ConnectionError(e) => write!(f, "Connection error: {}", e),
135 }
136 }
137}
138
139impl Error for SendDatagramError {}