s2n_quic/connection/
handle.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4macro_rules! impl_handle_api {
5    (| $handle:ident, $dispatch:ident | $dispatch_body:expr) => {
6        /// Opens a new [`LocalStream`](`crate::stream::LocalStream`) with a specific type
7        ///
8        /// The method will return
9        ///  - `Ok(stream)` if a stream of the requested type was opened
10        ///  - `Err(stream_error)` if the stream could not be opened due to an error
11        ///
12        /// # Examples
13        ///
14        /// ```rust,no_run
15        /// # async fn test() -> s2n_quic::connection::Result<()> {
16        /// #   use s2n_quic::stream;
17        /// #   let mut handle: s2n_quic::connection::Handle = todo!();
18        /// #
19        /// while let Ok(stream) = handle.open_stream(stream::Type::Bidirectional).await {
20        ///     println!("Stream opened from {:?}", stream.connection().remote_addr());
21        /// }
22        /// #
23        /// #   Ok(())
24        /// # }
25        /// ```
26        #[inline]
27        pub async fn open_stream(
28            &mut self,
29            stream_type: $crate::stream::Type,
30        ) -> $crate::connection::Result<$crate::stream::LocalStream> {
31            futures::future::poll_fn(|cx| self.poll_open_stream(stream_type, cx)).await
32        }
33
34        /// Polls opening a [`LocalStream`](`crate::stream::LocalStream`) with a specific type
35        ///
36        /// The method will return
37        /// - `Poll::Ready(Ok(stream))` if a stream of the requested type was opened
38        /// - `Poll::Ready(Err(stream_error))` if the stream could not be opened due to an error
39        /// - `Poll::Pending` if the stream has not been opened yet
40        #[inline]
41        pub fn poll_open_stream(
42            &mut self,
43            stream_type: $crate::stream::Type,
44            cx: &mut core::task::Context,
45        ) -> core::task::Poll<$crate::connection::Result<$crate::stream::LocalStream>> {
46            s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
47                use s2n_quic_core::stream::StreamType;
48                use $crate::stream::{BidirectionalStream, SendStream};
49
50                Ok(
51                    match core::task::ready!(self.0.poll_open_stream(stream_type, cx))? {
52                        stream if stream_type == StreamType::Unidirectional => {
53                            SendStream::new(stream.into()).into()
54                        }
55                        stream => BidirectionalStream::new(stream).into(),
56                    },
57                )
58                .into()
59            })
60        }
61
62        /// Opens a new [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
63        ///
64        /// The method will return
65        ///  - `Ok(stream)` if a bidirectional stream was opened
66        ///  - `Err(stream_error)` if the stream could not be opened due to an error
67        ///
68        /// # Examples
69        ///
70        /// ```rust,no_run
71        /// # async fn test() -> s2n_quic::connection::Result<()> {
72        /// #   let mut handle: s2n_quic::connection::Handle = todo!();
73        /// #
74        /// while let Ok(mut stream) = handle.open_bidirectional_stream().await {
75        ///     println!("Stream opened from {:?}", stream.connection().remote_addr());
76        /// }
77        /// #
78        /// #   Ok(())
79        /// # }
80        /// ```
81        #[inline]
82        pub async fn open_bidirectional_stream(
83            &mut self,
84        ) -> $crate::connection::Result<$crate::stream::BidirectionalStream> {
85            futures::future::poll_fn(|cx| self.poll_open_bidirectional_stream(cx)).await
86        }
87
88        /// Polls opening a [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
89        ///
90        /// The method will return
91        /// - `Poll::Ready(Ok(stream))` if a bidirectional stream was opened
92        /// - `Poll::Ready(Err(stream_error))` if the stream could not be opened due to an error
93        /// - `Poll::Pending` if the stream has not been opened yet
94        #[inline]
95        pub fn poll_open_bidirectional_stream(
96            &mut self,
97            cx: &mut core::task::Context,
98        ) -> core::task::Poll<$crate::connection::Result<$crate::stream::BidirectionalStream>> {
99            s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
100                use s2n_quic_core::stream::StreamType;
101                use $crate::stream::BidirectionalStream;
102
103                let stream =
104                    core::task::ready!(self.0.poll_open_stream(StreamType::Bidirectional, cx))?;
105
106                Ok(BidirectionalStream::new(stream)).into()
107            })
108        }
109
110        /// Opens a [`SendStream`](`crate::stream::SendStream`)
111        ///
112        /// # Examples
113        ///
114        /// ```rust,no_run
115        /// # async fn test() -> s2n_quic::connection::Result<()> {
116        /// #   let mut connection: s2n_quic::connection::Handle = todo!();
117        /// #
118        /// let stream = connection.open_send_stream().await?;
119        /// println!("Send stream opened with id: {}", stream.id());
120        /// #
121        /// #   Ok(())
122        /// # }
123        /// ```
124        #[inline]
125        pub async fn open_send_stream(
126            &mut self,
127        ) -> $crate::connection::Result<$crate::stream::SendStream> {
128            futures::future::poll_fn(|cx| self.poll_open_send_stream(cx)).await
129        }
130
131        /// Polls opening a [`SendStream`](`crate::stream::SendStream`)
132        #[inline]
133        pub fn poll_open_send_stream(
134            &mut self,
135            cx: &mut core::task::Context,
136        ) -> core::task::Poll<$crate::connection::Result<$crate::stream::SendStream>> {
137            s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
138                use s2n_quic_core::stream::StreamType;
139                use $crate::stream::SendStream;
140
141                let stream =
142                    core::task::ready!(self.0.poll_open_stream(StreamType::Unidirectional, cx))?;
143
144                Ok(SendStream::new(stream.into())).into()
145            })
146        }
147
148        /// Returns the local address that this connection is bound to.
149        #[inline]
150        pub fn local_addr(&self) -> $crate::connection::Result<std::net::SocketAddr> {
151            self.0.local_address().map(std::net::SocketAddr::from)
152        }
153
154        /// Returns the remote address that this connection is connected to.
155        #[inline]
156        pub fn remote_addr(&self) -> $crate::connection::Result<std::net::SocketAddr> {
157            self.0.remote_address().map(std::net::SocketAddr::from)
158        }
159
160        /// Returns the negotiated server name the connection is using.
161        #[inline]
162        pub fn server_name(&self) -> $crate::connection::Result<Option<$crate::server::Name>> {
163            self.0.server_name()
164        }
165
166        /// Returns the negotiated application protocol the connection is using
167        #[inline]
168        pub fn application_protocol(&self) -> $crate::connection::Result<::bytes::Bytes> {
169            self.0.application_protocol()
170        }
171
172        /// Takes the context provided by the TLS provider.
173        ///
174        /// This functionality is useful when you need to pass information from the TLS provider to the
175        /// application. This could include things like certificate information or application-specific data.
176        ///
177        /// Calling this function a second time will always return `None` so applications should
178        /// store the context elsewhere if it is needed in multiple locations.
179        #[inline]
180        pub fn take_tls_context(&mut self) -> Option<std::boxed::Box<dyn core::any::Any + Send>> {
181            self.0.take_tls_context()
182        }
183
184        /// Returns the internal identifier for the [`Connection`](`crate::Connection`)
185        ///
186        /// Note: This internal identifier is not the same as the connection ID included in packet
187        /// headers as described in [QUIC Transport RFC](https://www.rfc-editor.org/rfc/rfc9000.html#name-connection-id)
188        #[inline]
189        pub fn id(&self) -> u64 {
190            self.0.id()
191        }
192
193        /// Sends a Ping frame to the peer
194        #[inline]
195        pub fn ping(&mut self) -> $crate::connection::Result<()> {
196            self.0.ping()
197        }
198
199        /// Enables or disables the connection to actively keep the connection alive with the peer
200        ///
201        /// This can be useful for maintaining connections beyond the configured idle timeout. The
202        /// connection will continue to be held open until the keep alive is disabled or the
203        /// connection is no longer able to be maintained due to connectivity.
204        #[inline]
205        pub fn keep_alive(&mut self, enabled: bool) -> $crate::connection::Result<()> {
206            self.0.keep_alive(enabled)
207        }
208
209        /// Closes the Connection with the provided error code
210        ///
211        /// This will immediately terminate all outstanding streams.
212        ///
213        /// # Examples
214        ///
215        /// ```rust,no_run
216        /// # async fn test() -> s2n_quic::connection::Result<()> {
217        /// #   let mut connection: s2n_quic::connection::Handle = todo!();
218        /// #
219        /// const MY_ERROR_CODE:u32 = 99;
220        /// connection.close(MY_ERROR_CODE.into());
221        /// #
222        /// #   Ok(())
223        /// # }
224        /// ```
225        #[inline]
226        pub fn close(&self, error_code: $crate::application::Error) {
227            self.0.close(error_code)
228        }
229
230        /// API for querying the connection's
231        /// [`Subscriber::ConnectionContext`](crate::provider::event::Subscriber::ConnectionContext).
232        ///
233        /// The ConnectionContext provides a mechanism for users to provide a custom
234        /// type and update it on each event. The query APIs (check
235        /// [`Self::query_event_context_mut`] for mutable version) provide a way to inspect the
236        /// ConnectionContext outside of events.
237        ///
238        /// This function takes a `FnOnce(&EventContext) -> Outcome`, where `EventContext`
239        /// represents the type of `ConnectionContext`. If the `EventContext` type matches
240        /// any of the types of the configured Subscriber's context, the query is executed
241        /// and `Ok(Outcome)` is returned, else
242        /// `Err(`[`query::Error`](s2n_quic_core::query::Error)`)`.
243        ///
244        /// Given that it is possible to compose Subscriber, which can have different
245        /// ConnectionContext types, this function traverses all Subscribers, executes
246        /// and short-circuiting on the first match.
247        ///
248        /// # Examples
249        ///
250        /// ```no_run
251        /// use s2n_quic::{provider::event::{events, query, Subscriber}, Connection, Server};
252        ///
253        /// struct MySubscriber{}
254        ///
255        /// impl Subscriber for MySubscriber {
256        ///     type ConnectionContext = MyEventContext;
257        ///     fn create_connection_context(
258        ///        &mut self, _meta: &events::ConnectionMeta,
259        ///        _info: &events::ConnectionInfo,
260        ///     ) -> Self::ConnectionContext {
261        ///         MyEventContext { request: 0 }
262        ///     }
263        ///  }
264        ///
265        /// #[derive(Clone, Copy)]
266        /// pub struct MyEventContext {
267        ///     request: u64,
268        /// }
269        ///
270        /// let mut server = Server::builder()
271        ///   .with_event(MySubscriber {}).unwrap()
272        ///   .start().unwrap();
273        /// # let connection: Connection = todo!();
274        ///
275        /// let outcome: Result<MyEventContext, query::Error> = connection
276        ///     .query_event_context(|event_context: &MyEventContext| *event_context);
277        ///
278        /// match outcome {
279        ///     Ok(event_context) => {
280        ///         // `MyEventContext` matched a Subscriber::ConnectionContext and the
281        ///         // query executed.
282        ///         //
283        ///         // use the value event_context for logging, etc..
284        ///     }
285        ///     Err(query::Error::ConnectionLockPoisoned) => {
286        ///         // The query did not execute because of a connection error.
287        ///         //
288        ///         // log an error, panic, etc..
289        ///     }
290        ///     Err(query::Error::ContextTypeMismatch) => {
291        ///         // `MyEventContext` failed to match any Subscriber::ConnectionContext
292        ///         // and the query did not execute.
293        ///         //
294        ///         // log an error, panic, etc..
295        ///     }
296        ///     Err(_) => {
297        ///         // We encountered an unknown error so handle it generically, e.g. log,
298        ///         // panic, etc.
299        ///     }
300        /// }
301        /// ```
302        ///
303        /// # Traverse order
304        /// Let's demonstrate the traversal order for matching on ConnectionContext in the
305        /// example below. We provide a composed Subscriber type (Foo, Bar), where both
306        /// Foo and Bar have a ConnectionContext type of `u64`. The query traverse order
307        /// is as follows:
308        /// - `(Foo::ConnectionContext, Bar::ConnectionContext)`
309        /// - `Foo::ConnectionContext`
310        /// - `Bar::ConnectionContext`
311        ///
312        /// Note: In this example the type `u64` will always match `Foo::u64` and
313        /// `Bar::u64` will never be matched. If this is undesirable, applications should
314        /// make unique associated `ConnectionContext`s by creating new types.
315        ///
316        /// ```no_run
317        /// use s2n_quic::{provider::event::{events, Subscriber}, Connection, Server};
318        ///
319        /// struct Foo {}
320        ///
321        /// impl Subscriber for Foo {
322        ///    type ConnectionContext = u64;
323        ///    fn create_connection_context(
324        ///        &mut self, _meta: &events::ConnectionMeta,
325        ///        _info: &events::ConnectionInfo,
326        ///    ) -> Self::ConnectionContext { 0 }
327        /// }
328        ///
329        /// struct Bar {}
330        ///
331        /// impl Subscriber for Bar {
332        ///    type ConnectionContext = u64;
333        ///    fn create_connection_context(
334        ///        &mut self, _meta: &events::ConnectionMeta,
335        ///        _info: &events::ConnectionInfo,
336        ///    ) -> Self::ConnectionContext { 0 }
337        /// }
338        ///
339        /// let mut server = Server::builder()
340        ///     .with_event((Foo {}, Bar {})).unwrap()
341        ///     .start().unwrap();
342        /// # let connection: Connection = todo!();
343        ///
344        /// // Matches Foo.
345        /// //
346        /// // Note: Because the `ConnectionContext` type is the same for
347        /// // both `Foo` and `Bar`, only `Foo`'s context will be matched.
348        /// let _ = connection.query_event_context(|ctx: &u64| *ctx );
349        ///
350        /// // Matches (Foo, Bar).
351        /// let _ = connection.query_event_context(|ctx: &(u64, u64)| ctx.0 );
352        /// ```
353        pub fn query_event_context<Query, EventContext, Outcome>(
354            &self,
355            query: Query,
356        ) -> core::result::Result<Outcome, s2n_quic_core::query::Error>
357        where
358            Query: FnOnce(&EventContext) -> Outcome,
359            EventContext: 'static,
360        {
361            use s2n_quic_core::query;
362            let mut query = query::Once::new(query);
363
364            self.0
365                .query_event_context(&mut query)
366                .map_err(|_| query::Error::ConnectionLockPoisoned)?;
367
368            query.into()
369        }
370
371        /// API for querying the connection's
372        /// [`Subscriber::ConnectionContext`](crate::provider::event::Subscriber::ConnectionContext).
373        ///
374        /// Similar to [`Self::query_event_context`] but provides
375        /// mutable access to `ConnectionContext`.
376        ///
377        /// ```ignore
378        /// let outcome = connection
379        ///     .query_event_context(
380        ///         |event_context: &MyEventContext| event_context.request += 1
381        ///     );
382        /// ```
383        pub fn query_event_context_mut<Query, EventContext, Outcome>(
384            &mut self,
385            query: Query,
386        ) -> core::result::Result<Outcome, s2n_quic_core::query::Error>
387        where
388            Query: FnOnce(&mut EventContext) -> Outcome,
389            EventContext: 'static,
390        {
391            use s2n_quic_core::query;
392            let mut query = query::Once::new_mut(query);
393
394            self.0
395                .query_event_context_mut(&mut query)
396                .map_err(|_| query::Error::ConnectionLockPoisoned)?;
397
398            query.into()
399        }
400
401        /// API for querying the connection's datagram endpoint.
402        ///
403        ///  Provides mutable access to `Sender` or `Receiver`.
404        ///
405        /// ```ignore
406        /// let outcome = connection
407        ///     .datagram_mut(
408        ///         |sender: &MySender| sender.send_datagram(Bytes::from_static(&[1, 2, 3]));
409        ///     );
410        /// ```
411        pub fn datagram_mut<Query, ProviderType, Outcome>(
412            &self,
413            query: Query,
414        ) -> core::result::Result<Outcome, s2n_quic_core::query::Error>
415        where
416            Query: FnOnce(&mut ProviderType) -> Outcome,
417            ProviderType: 'static,
418        {
419            use s2n_quic_core::query;
420            let mut query = query::Once::new_mut(query);
421
422            self.0
423                .datagram_mut(&mut query)
424                .map_err(|_| query::Error::ConnectionLockPoisoned)?;
425
426            query.into()
427        }
428    };
429}
430
431#[derive(Clone, Debug)]
432pub struct Handle(pub(crate) s2n_quic_transport::connection::Connection);
433
434impl Handle {
435    impl_handle_api!(|handle, call| call!(handle));
436}