s2n_quic/connection/handle.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4macro_rules! impl_handle_api {
5 (| $handle:ident, $dispatch:ident | $dispatch_body:expr) => {
6 /// Opens a new [`LocalStream`](`crate::stream::LocalStream`) with a specific type
7 ///
8 /// The method will return
9 /// - `Ok(stream)` if a stream of the requested type was opened
10 /// - `Err(stream_error)` if the stream could not be opened due to an error
11 ///
12 /// # Examples
13 ///
14 /// ```rust,no_run
15 /// # async fn test() -> s2n_quic::connection::Result<()> {
16 /// # use s2n_quic::stream;
17 /// # let mut handle: s2n_quic::connection::Handle = todo!();
18 /// #
19 /// while let Ok(stream) = handle.open_stream(stream::Type::Bidirectional).await {
20 /// println!("Stream opened from {:?}", stream.connection().remote_addr());
21 /// }
22 /// #
23 /// # Ok(())
24 /// # }
25 /// ```
26 #[inline]
27 pub async fn open_stream(
28 &mut self,
29 stream_type: $crate::stream::Type,
30 ) -> $crate::connection::Result<$crate::stream::LocalStream> {
31 futures::future::poll_fn(|cx| self.poll_open_stream(stream_type, cx)).await
32 }
33
34 /// Polls opening a [`LocalStream`](`crate::stream::LocalStream`) with a specific type
35 ///
36 /// The method will return
37 /// - `Poll::Ready(Ok(stream))` if a stream of the requested type was opened
38 /// - `Poll::Ready(Err(stream_error))` if the stream could not be opened due to an error
39 /// - `Poll::Pending` if the stream has not been opened yet
40 #[inline]
41 pub fn poll_open_stream(
42 &mut self,
43 stream_type: $crate::stream::Type,
44 cx: &mut core::task::Context,
45 ) -> core::task::Poll<$crate::connection::Result<$crate::stream::LocalStream>> {
46 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
47 use s2n_quic_core::stream::StreamType;
48 use $crate::stream::{BidirectionalStream, SendStream};
49
50 Ok(
51 match core::task::ready!(self.0.poll_open_stream(stream_type, cx))? {
52 stream if stream_type == StreamType::Unidirectional => {
53 SendStream::new(stream.into()).into()
54 }
55 stream => BidirectionalStream::new(stream).into(),
56 },
57 )
58 .into()
59 })
60 }
61
62 /// Opens a new [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
63 ///
64 /// The method will return
65 /// - `Ok(stream)` if a bidirectional stream was opened
66 /// - `Err(stream_error)` if the stream could not be opened due to an error
67 ///
68 /// # Examples
69 ///
70 /// ```rust,no_run
71 /// # async fn test() -> s2n_quic::connection::Result<()> {
72 /// # let mut handle: s2n_quic::connection::Handle = todo!();
73 /// #
74 /// while let Ok(mut stream) = handle.open_bidirectional_stream().await {
75 /// println!("Stream opened from {:?}", stream.connection().remote_addr());
76 /// }
77 /// #
78 /// # Ok(())
79 /// # }
80 /// ```
81 #[inline]
82 pub async fn open_bidirectional_stream(
83 &mut self,
84 ) -> $crate::connection::Result<$crate::stream::BidirectionalStream> {
85 futures::future::poll_fn(|cx| self.poll_open_bidirectional_stream(cx)).await
86 }
87
88 /// Polls opening a [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
89 ///
90 /// The method will return
91 /// - `Poll::Ready(Ok(stream))` if a bidirectional stream was opened
92 /// - `Poll::Ready(Err(stream_error))` if the stream could not be opened due to an error
93 /// - `Poll::Pending` if the stream has not been opened yet
94 #[inline]
95 pub fn poll_open_bidirectional_stream(
96 &mut self,
97 cx: &mut core::task::Context,
98 ) -> core::task::Poll<$crate::connection::Result<$crate::stream::BidirectionalStream>> {
99 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
100 use s2n_quic_core::stream::StreamType;
101 use $crate::stream::BidirectionalStream;
102
103 let stream =
104 core::task::ready!(self.0.poll_open_stream(StreamType::Bidirectional, cx))?;
105
106 Ok(BidirectionalStream::new(stream)).into()
107 })
108 }
109
110 /// Opens a [`SendStream`](`crate::stream::SendStream`)
111 ///
112 /// # Examples
113 ///
114 /// ```rust,no_run
115 /// # async fn test() -> s2n_quic::connection::Result<()> {
116 /// # let mut connection: s2n_quic::connection::Handle = todo!();
117 /// #
118 /// let stream = connection.open_send_stream().await?;
119 /// println!("Send stream opened with id: {}", stream.id());
120 /// #
121 /// # Ok(())
122 /// # }
123 /// ```
124 #[inline]
125 pub async fn open_send_stream(
126 &mut self,
127 ) -> $crate::connection::Result<$crate::stream::SendStream> {
128 futures::future::poll_fn(|cx| self.poll_open_send_stream(cx)).await
129 }
130
131 /// Polls opening a [`SendStream`](`crate::stream::SendStream`)
132 #[inline]
133 pub fn poll_open_send_stream(
134 &mut self,
135 cx: &mut core::task::Context,
136 ) -> core::task::Poll<$crate::connection::Result<$crate::stream::SendStream>> {
137 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
138 use s2n_quic_core::stream::StreamType;
139 use $crate::stream::SendStream;
140
141 let stream =
142 core::task::ready!(self.0.poll_open_stream(StreamType::Unidirectional, cx))?;
143
144 Ok(SendStream::new(stream.into())).into()
145 })
146 }
147
148 /// Returns the local address that this connection is bound to.
149 #[inline]
150 pub fn local_addr(&self) -> $crate::connection::Result<std::net::SocketAddr> {
151 self.0.local_address().map(std::net::SocketAddr::from)
152 }
153
154 /// Returns the remote address that this connection is connected to.
155 #[inline]
156 pub fn remote_addr(&self) -> $crate::connection::Result<std::net::SocketAddr> {
157 self.0.remote_address().map(std::net::SocketAddr::from)
158 }
159
160 /// Returns the negotiated server name the connection is using.
161 #[inline]
162 pub fn server_name(&self) -> $crate::connection::Result<Option<$crate::server::Name>> {
163 self.0.server_name()
164 }
165
166 /// Returns the negotiated application protocol the connection is using
167 #[inline]
168 pub fn application_protocol(&self) -> $crate::connection::Result<::bytes::Bytes> {
169 self.0.application_protocol()
170 }
171
172 /// Takes the context provided by the TLS provider.
173 ///
174 /// This functionality is useful when you need to pass information from the TLS provider to the
175 /// application. This could include things like certificate information or application-specific data.
176 ///
177 /// Calling this function a second time will always return `None` so applications should
178 /// store the context elsewhere if it is needed in multiple locations.
179 #[inline]
180 pub fn take_tls_context(&mut self) -> Option<std::boxed::Box<dyn core::any::Any + Send>> {
181 self.0.take_tls_context()
182 }
183
184 /// Returns the internal identifier for the [`Connection`](`crate::Connection`)
185 ///
186 /// Note: This internal identifier is not the same as the connection ID included in packet
187 /// headers as described in [QUIC Transport RFC](https://www.rfc-editor.org/rfc/rfc9000.html#name-connection-id)
188 #[inline]
189 pub fn id(&self) -> u64 {
190 self.0.id()
191 }
192
193 /// Sends a Ping frame to the peer
194 #[inline]
195 pub fn ping(&mut self) -> $crate::connection::Result<()> {
196 self.0.ping()
197 }
198
199 /// Enables or disables the connection to actively keep the connection alive with the peer
200 ///
201 /// This can be useful for maintaining connections beyond the configured idle timeout. The
202 /// connection will continue to be held open until the keep alive is disabled or the
203 /// connection is no longer able to be maintained due to connectivity.
204 #[inline]
205 pub fn keep_alive(&mut self, enabled: bool) -> $crate::connection::Result<()> {
206 self.0.keep_alive(enabled)
207 }
208
209 /// Closes the Connection with the provided error code
210 ///
211 /// This will immediately terminate all outstanding streams.
212 ///
213 /// # Examples
214 ///
215 /// ```rust,no_run
216 /// # async fn test() -> s2n_quic::connection::Result<()> {
217 /// # let mut connection: s2n_quic::connection::Handle = todo!();
218 /// #
219 /// const MY_ERROR_CODE:u32 = 99;
220 /// connection.close(MY_ERROR_CODE.into());
221 /// #
222 /// # Ok(())
223 /// # }
224 /// ```
225 #[inline]
226 pub fn close(&self, error_code: $crate::application::Error) {
227 self.0.close(error_code)
228 }
229
230 /// API for querying the connection's
231 /// [`Subscriber::ConnectionContext`](crate::provider::event::Subscriber::ConnectionContext).
232 ///
233 /// The ConnectionContext provides a mechanism for users to provide a custom
234 /// type and update it on each event. The query APIs (check
235 /// [`Self::query_event_context_mut`] for mutable version) provide a way to inspect the
236 /// ConnectionContext outside of events.
237 ///
238 /// This function takes a `FnOnce(&EventContext) -> Outcome`, where `EventContext`
239 /// represents the type of `ConnectionContext`. If the `EventContext` type matches
240 /// any of the types of the configured Subscriber's context, the query is executed
241 /// and `Ok(Outcome)` is returned, else
242 /// `Err(`[`query::Error`](s2n_quic_core::query::Error)`)`.
243 ///
244 /// Given that it is possible to compose Subscriber, which can have different
245 /// ConnectionContext types, this function traverses all Subscribers, executes
246 /// and short-circuiting on the first match.
247 ///
248 /// # Examples
249 ///
250 /// ```no_run
251 /// use s2n_quic::{provider::event::{events, query, Subscriber}, Connection, Server};
252 ///
253 /// struct MySubscriber{}
254 ///
255 /// impl Subscriber for MySubscriber {
256 /// type ConnectionContext = MyEventContext;
257 /// fn create_connection_context(
258 /// &mut self, _meta: &events::ConnectionMeta,
259 /// _info: &events::ConnectionInfo,
260 /// ) -> Self::ConnectionContext {
261 /// MyEventContext { request: 0 }
262 /// }
263 /// }
264 ///
265 /// #[derive(Clone, Copy)]
266 /// pub struct MyEventContext {
267 /// request: u64,
268 /// }
269 ///
270 /// let mut server = Server::builder()
271 /// .with_event(MySubscriber {}).unwrap()
272 /// .start().unwrap();
273 /// # let connection: Connection = todo!();
274 ///
275 /// let outcome: Result<MyEventContext, query::Error> = connection
276 /// .query_event_context(|event_context: &MyEventContext| *event_context);
277 ///
278 /// match outcome {
279 /// Ok(event_context) => {
280 /// // `MyEventContext` matched a Subscriber::ConnectionContext and the
281 /// // query executed.
282 /// //
283 /// // use the value event_context for logging, etc..
284 /// }
285 /// Err(query::Error::ConnectionLockPoisoned) => {
286 /// // The query did not execute because of a connection error.
287 /// //
288 /// // log an error, panic, etc..
289 /// }
290 /// Err(query::Error::ContextTypeMismatch) => {
291 /// // `MyEventContext` failed to match any Subscriber::ConnectionContext
292 /// // and the query did not execute.
293 /// //
294 /// // log an error, panic, etc..
295 /// }
296 /// Err(_) => {
297 /// // We encountered an unknown error so handle it generically, e.g. log,
298 /// // panic, etc.
299 /// }
300 /// }
301 /// ```
302 ///
303 /// # Traverse order
304 /// Let's demonstrate the traversal order for matching on ConnectionContext in the
305 /// example below. We provide a composed Subscriber type (Foo, Bar), where both
306 /// Foo and Bar have a ConnectionContext type of `u64`. The query traverse order
307 /// is as follows:
308 /// - `(Foo::ConnectionContext, Bar::ConnectionContext)`
309 /// - `Foo::ConnectionContext`
310 /// - `Bar::ConnectionContext`
311 ///
312 /// Note: In this example the type `u64` will always match `Foo::u64` and
313 /// `Bar::u64` will never be matched. If this is undesirable, applications should
314 /// make unique associated `ConnectionContext`s by creating new types.
315 ///
316 /// ```no_run
317 /// use s2n_quic::{provider::event::{events, Subscriber}, Connection, Server};
318 ///
319 /// struct Foo {}
320 ///
321 /// impl Subscriber for Foo {
322 /// type ConnectionContext = u64;
323 /// fn create_connection_context(
324 /// &mut self, _meta: &events::ConnectionMeta,
325 /// _info: &events::ConnectionInfo,
326 /// ) -> Self::ConnectionContext { 0 }
327 /// }
328 ///
329 /// struct Bar {}
330 ///
331 /// impl Subscriber for Bar {
332 /// type ConnectionContext = u64;
333 /// fn create_connection_context(
334 /// &mut self, _meta: &events::ConnectionMeta,
335 /// _info: &events::ConnectionInfo,
336 /// ) -> Self::ConnectionContext { 0 }
337 /// }
338 ///
339 /// let mut server = Server::builder()
340 /// .with_event((Foo {}, Bar {})).unwrap()
341 /// .start().unwrap();
342 /// # let connection: Connection = todo!();
343 ///
344 /// // Matches Foo.
345 /// //
346 /// // Note: Because the `ConnectionContext` type is the same for
347 /// // both `Foo` and `Bar`, only `Foo`'s context will be matched.
348 /// let _ = connection.query_event_context(|ctx: &u64| *ctx );
349 ///
350 /// // Matches (Foo, Bar).
351 /// let _ = connection.query_event_context(|ctx: &(u64, u64)| ctx.0 );
352 /// ```
353 pub fn query_event_context<Query, EventContext, Outcome>(
354 &self,
355 query: Query,
356 ) -> core::result::Result<Outcome, s2n_quic_core::query::Error>
357 where
358 Query: FnOnce(&EventContext) -> Outcome,
359 EventContext: 'static,
360 {
361 use s2n_quic_core::query;
362 let mut query = query::Once::new(query);
363
364 self.0
365 .query_event_context(&mut query)
366 .map_err(|_| query::Error::ConnectionLockPoisoned)?;
367
368 query.into()
369 }
370
371 /// API for querying the connection's
372 /// [`Subscriber::ConnectionContext`](crate::provider::event::Subscriber::ConnectionContext).
373 ///
374 /// Similar to [`Self::query_event_context`] but provides
375 /// mutable access to `ConnectionContext`.
376 ///
377 /// ```ignore
378 /// let outcome = connection
379 /// .query_event_context(
380 /// |event_context: &MyEventContext| event_context.request += 1
381 /// );
382 /// ```
383 pub fn query_event_context_mut<Query, EventContext, Outcome>(
384 &mut self,
385 query: Query,
386 ) -> core::result::Result<Outcome, s2n_quic_core::query::Error>
387 where
388 Query: FnOnce(&mut EventContext) -> Outcome,
389 EventContext: 'static,
390 {
391 use s2n_quic_core::query;
392 let mut query = query::Once::new_mut(query);
393
394 self.0
395 .query_event_context_mut(&mut query)
396 .map_err(|_| query::Error::ConnectionLockPoisoned)?;
397
398 query.into()
399 }
400
401 /// API for querying the connection's datagram endpoint.
402 ///
403 /// Provides mutable access to `Sender` or `Receiver`.
404 ///
405 /// ```ignore
406 /// let outcome = connection
407 /// .datagram_mut(
408 /// |sender: &MySender| sender.send_datagram(Bytes::from_static(&[1, 2, 3]));
409 /// );
410 /// ```
411 pub fn datagram_mut<Query, ProviderType, Outcome>(
412 &self,
413 query: Query,
414 ) -> core::result::Result<Outcome, s2n_quic_core::query::Error>
415 where
416 Query: FnOnce(&mut ProviderType) -> Outcome,
417 ProviderType: 'static,
418 {
419 use s2n_quic_core::query;
420 let mut query = query::Once::new_mut(query);
421
422 self.0
423 .datagram_mut(&mut query)
424 .map_err(|_| query::Error::ConnectionLockPoisoned)?;
425
426 query.into()
427 }
428 };
429}
430
431#[derive(Clone, Debug)]
432pub struct Handle(pub(crate) s2n_quic_transport::connection::Connection);
433
434impl Handle {
435 impl_handle_api!(|handle, call| call!(handle));
436}