s2n_quic_transport/connection/
api.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Defines the public QUIC connection API
5
6use crate::{
7    connection::{self, ConnectionApi, OpenToken},
8    stream::{ops, Stream, StreamError, StreamId},
9};
10use bytes::Bytes;
11use core::{
12    any::Any,
13    fmt,
14    sync::atomic::{self, Ordering},
15    task::{Context, Poll},
16};
17use s2n_quic_core::{
18    application,
19    application::ServerName,
20    inet::SocketAddress,
21    query::{Query, QueryMut},
22    stream::StreamType,
23};
24
25/// A QUIC connection
26pub struct Connection {
27    /// The inner connection API implementation
28    ///
29    /// This uses a dynamically-dispatched interface to hide all of the connection's
30    /// generic parameters and allows applications to interact with connections in a
31    /// straightforward manner.
32    pub(super) api: ConnectionApi,
33
34    /// The open token associated with each connection handle.
35    ///
36    /// This is used to correctly track `poll_open_stream` requests.
37    open_token: OpenToken,
38}
39
40impl fmt::Debug for Connection {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        // TODO print interesting virtual fields
43        f.debug_struct("Connection").finish()
44    }
45}
46
47impl Drop for Connection {
48    fn drop(&mut self) {
49        debug_assert!(
50            self.api.application_handle_count().load(Ordering::Acquire) > 0,
51            "application_handle_count underflowed"
52        );
53
54        // Safety
55        //
56        // The use of Ordering and fence mirrors the `Arc` implementation in
57        // the standard library.
58        //
59        // This fence is needed to prevent reordering of use of the data and
60        // deletion of the data.  Because it is marked `Release`, the decreasing
61        // of the reference count synchronizes with this `Acquire` fence. This
62        // means that use of the data happens before decreasing the reference
63        // count, which happens before this fence, which happens before the
64        // deletion of the data.
65        // https://github.com/rust-lang/rust/blob/e012a191d768adeda1ee36a99ef8b92d51920154/library/alloc/src/sync.rs#L1637
66
67        // If the connection wasn't closed before, close it now to make sure
68        // all Streams terminate.
69        //
70        // Only close the connection if this is the last application handle.
71        // Otherwise, just drop `api`, which decrements the strong count.
72        if self
73            .api
74            .application_handle_count()
75            .fetch_sub(1, Ordering::Release)
76            != 1
77        {
78            return;
79        }
80
81        atomic::fence(Ordering::Acquire);
82        self.api.close_connection(None);
83    }
84}
85
86impl Clone for Connection {
87    fn clone(&self) -> Self {
88        // Safety
89        //
90        // Using a relaxed ordering is alright here, as knowledge of the
91        // original reference prevents other threads from erroneously deleting
92        // the object.
93        // https://github.com/rust-lang/rust/blob/e012a191d768adeda1ee36a99ef8b92d51920154/library/alloc/src/sync.rs#L1329
94        self.api
95            .application_handle_count()
96            .fetch_add(1, Ordering::Relaxed);
97        Self {
98            api: self.api.clone(),
99            // don't clone the open token - each instance should have its own token
100            open_token: OpenToken::new(),
101        }
102    }
103}
104
105impl Connection {
106    pub(crate) fn new(api: ConnectionApi) -> Self {
107        // Safety
108        //
109        // Using a relaxed ordering is alright here, as knowledge of the
110        // original reference prevents other threads from erroneously deleting
111        // the object.
112        // https://github.com/rust-lang/rust/blob/e012a191d768adeda1ee36a99ef8b92d51920154/library/alloc/src/sync.rs#L1329
113        api.application_handle_count()
114            .fetch_add(1, Ordering::Relaxed);
115        Self {
116            api,
117            open_token: OpenToken::new(),
118        }
119    }
120
121    /// Accepts an incoming [`Stream`]
122    ///
123    /// The method will return
124    /// - `Poll::Ready(Ok(Some(stream, stream_type)))` if a [`Stream`] was accepted
125    /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
126    /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
127    /// - `Poll::Pending` if no new [`Stream`] of the given type was accepted by the connection yet.
128    ///   In this case the caller must retry calling [`Self::poll_accept`].
129    ///   For this purpose the method will save the [`core::task::Waker`]
130    ///   which is provided as part of the [`Context`] parameter, and notify it
131    ///   as soon as retrying the method will yield a different result.
132    #[inline]
133    pub fn poll_accept(
134        &mut self,
135        stream_type: Option<StreamType>,
136        context: &Context,
137    ) -> Poll<Result<Option<Stream>, connection::Error>> {
138        self.api.poll_accept(&self.api, stream_type, context)
139    }
140
141    #[inline]
142    pub fn poll_open_stream(
143        &mut self,
144        stream_type: StreamType,
145        context: &Context,
146    ) -> Poll<Result<Stream, connection::Error>> {
147        self.api
148            .poll_open_stream(&self.api, stream_type, &mut self.open_token, context)
149    }
150
151    #[inline]
152    pub fn poll_request(
153        &self,
154        stream_id: StreamId,
155        request: &mut ops::Request,
156        context: Option<&Context>,
157    ) -> Result<ops::Response, StreamError> {
158        self.api.poll_request(stream_id, request, context)
159    }
160
161    /// Closes the Connection with the provided error code
162    ///
163    /// This will immediately terminate all outstanding streams.
164    #[inline]
165    pub fn close(&self, error_code: application::Error) {
166        self.api.close_connection(Some(error_code));
167    }
168
169    #[inline]
170    pub fn server_name(&self) -> Result<Option<ServerName>, connection::Error> {
171        self.api.server_name()
172    }
173
174    #[inline]
175    pub fn application_protocol(&self) -> Result<Bytes, connection::Error> {
176        self.api.application_protocol()
177    }
178    #[inline]
179    pub fn take_tls_context(&self) -> Option<Box<dyn Any + Send>> {
180        self.api.take_tls_context()
181    }
182    #[inline]
183    pub fn id(&self) -> u64 {
184        self.api.id()
185    }
186
187    #[inline]
188    pub fn ping(&self) -> Result<(), connection::Error> {
189        self.api.ping()
190    }
191
192    pub fn keep_alive(&self, enabled: bool) -> Result<(), connection::Error> {
193        self.api.keep_alive(enabled)
194    }
195
196    #[inline]
197    pub fn local_address(&self) -> Result<SocketAddress, connection::Error> {
198        self.api.local_address()
199    }
200
201    #[inline]
202    pub fn remote_address(&self) -> Result<SocketAddress, connection::Error> {
203        self.api.remote_address()
204    }
205
206    #[inline]
207    pub fn query_event_context(&self, query: &mut dyn Query) -> Result<(), connection::Error> {
208        self.api.query_event_context(query)
209    }
210
211    #[inline]
212    pub fn query_event_context_mut(
213        &self,
214        query: &mut dyn QueryMut,
215    ) -> Result<(), connection::Error> {
216        self.api.query_event_context_mut(query)
217    }
218
219    #[inline]
220    pub fn datagram_mut(&self, query: &mut dyn QueryMut) -> Result<(), connection::Error> {
221        self.api.datagram_mut(query)
222    }
223}