s2n_quic_transport/connection/api.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Defines the public QUIC connection API
5
6use crate::{
7 connection::{self, ConnectionApi, OpenToken},
8 stream::{ops, Stream, StreamError, StreamId},
9};
10use bytes::Bytes;
11use core::{
12 any::Any,
13 fmt,
14 sync::atomic::{self, Ordering},
15 task::{Context, Poll},
16};
17use s2n_quic_core::{
18 application,
19 application::ServerName,
20 inet::SocketAddress,
21 query::{Query, QueryMut},
22 stream::StreamType,
23};
24
25/// A QUIC connection
26pub struct Connection {
27 /// The inner connection API implementation
28 ///
29 /// This uses a dynamically-dispatched interface to hide all of the connection's
30 /// generic parameters and allows applications to interact with connections in a
31 /// straightforward manner.
32 pub(super) api: ConnectionApi,
33
34 /// The open token associated with each connection handle.
35 ///
36 /// This is used to correctly track `poll_open_stream` requests.
37 open_token: OpenToken,
38}
39
40impl fmt::Debug for Connection {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 // TODO print interesting virtual fields
43 f.debug_struct("Connection").finish()
44 }
45}
46
47impl Drop for Connection {
48 fn drop(&mut self) {
49 debug_assert!(
50 self.api.application_handle_count().load(Ordering::Acquire) > 0,
51 "application_handle_count underflowed"
52 );
53
54 // Safety
55 //
56 // The use of Ordering and fence mirrors the `Arc` implementation in
57 // the standard library.
58 //
59 // This fence is needed to prevent reordering of use of the data and
60 // deletion of the data. Because it is marked `Release`, the decreasing
61 // of the reference count synchronizes with this `Acquire` fence. This
62 // means that use of the data happens before decreasing the reference
63 // count, which happens before this fence, which happens before the
64 // deletion of the data.
65 // https://github.com/rust-lang/rust/blob/e012a191d768adeda1ee36a99ef8b92d51920154/library/alloc/src/sync.rs#L1637
66
67 // If the connection wasn't closed before, close it now to make sure
68 // all Streams terminate.
69 //
70 // Only close the connection if this is the last application handle.
71 // Otherwise, just drop `api`, which decrements the strong count.
72 if self
73 .api
74 .application_handle_count()
75 .fetch_sub(1, Ordering::Release)
76 != 1
77 {
78 return;
79 }
80
81 atomic::fence(Ordering::Acquire);
82 self.api.close_connection(None);
83 }
84}
85
86impl Clone for Connection {
87 fn clone(&self) -> Self {
88 // Safety
89 //
90 // Using a relaxed ordering is alright here, as knowledge of the
91 // original reference prevents other threads from erroneously deleting
92 // the object.
93 // https://github.com/rust-lang/rust/blob/e012a191d768adeda1ee36a99ef8b92d51920154/library/alloc/src/sync.rs#L1329
94 self.api
95 .application_handle_count()
96 .fetch_add(1, Ordering::Relaxed);
97 Self {
98 api: self.api.clone(),
99 // don't clone the open token - each instance should have its own token
100 open_token: OpenToken::new(),
101 }
102 }
103}
104
105impl Connection {
106 pub(crate) fn new(api: ConnectionApi) -> Self {
107 // Safety
108 //
109 // Using a relaxed ordering is alright here, as knowledge of the
110 // original reference prevents other threads from erroneously deleting
111 // the object.
112 // https://github.com/rust-lang/rust/blob/e012a191d768adeda1ee36a99ef8b92d51920154/library/alloc/src/sync.rs#L1329
113 api.application_handle_count()
114 .fetch_add(1, Ordering::Relaxed);
115 Self {
116 api,
117 open_token: OpenToken::new(),
118 }
119 }
120
121 /// Accepts an incoming [`Stream`]
122 ///
123 /// The method will return
124 /// - `Poll::Ready(Ok(Some(stream, stream_type)))` if a [`Stream`] was accepted
125 /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
126 /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
127 /// - `Poll::Pending` if no new [`Stream`] of the given type was accepted by the connection yet.
128 /// In this case the caller must retry calling [`Self::poll_accept`].
129 /// For this purpose the method will save the [`core::task::Waker`]
130 /// which is provided as part of the [`Context`] parameter, and notify it
131 /// as soon as retrying the method will yield a different result.
132 #[inline]
133 pub fn poll_accept(
134 &mut self,
135 stream_type: Option<StreamType>,
136 context: &Context,
137 ) -> Poll<Result<Option<Stream>, connection::Error>> {
138 self.api.poll_accept(&self.api, stream_type, context)
139 }
140
141 #[inline]
142 pub fn poll_open_stream(
143 &mut self,
144 stream_type: StreamType,
145 context: &Context,
146 ) -> Poll<Result<Stream, connection::Error>> {
147 self.api
148 .poll_open_stream(&self.api, stream_type, &mut self.open_token, context)
149 }
150
151 #[inline]
152 pub fn poll_request(
153 &self,
154 stream_id: StreamId,
155 request: &mut ops::Request,
156 context: Option<&Context>,
157 ) -> Result<ops::Response, StreamError> {
158 self.api.poll_request(stream_id, request, context)
159 }
160
161 /// Closes the Connection with the provided error code
162 ///
163 /// This will immediately terminate all outstanding streams.
164 #[inline]
165 pub fn close(&self, error_code: application::Error) {
166 self.api.close_connection(Some(error_code));
167 }
168
169 #[inline]
170 pub fn server_name(&self) -> Result<Option<ServerName>, connection::Error> {
171 self.api.server_name()
172 }
173
174 #[inline]
175 pub fn application_protocol(&self) -> Result<Bytes, connection::Error> {
176 self.api.application_protocol()
177 }
178 #[inline]
179 pub fn take_tls_context(&self) -> Option<Box<dyn Any + Send>> {
180 self.api.take_tls_context()
181 }
182 #[inline]
183 pub fn id(&self) -> u64 {
184 self.api.id()
185 }
186
187 #[inline]
188 pub fn ping(&self) -> Result<(), connection::Error> {
189 self.api.ping()
190 }
191
192 pub fn keep_alive(&self, enabled: bool) -> Result<(), connection::Error> {
193 self.api.keep_alive(enabled)
194 }
195
196 #[inline]
197 pub fn local_address(&self) -> Result<SocketAddress, connection::Error> {
198 self.api.local_address()
199 }
200
201 #[inline]
202 pub fn remote_address(&self) -> Result<SocketAddress, connection::Error> {
203 self.api.remote_address()
204 }
205
206 #[inline]
207 pub fn query_event_context(&self, query: &mut dyn Query) -> Result<(), connection::Error> {
208 self.api.query_event_context(query)
209 }
210
211 #[inline]
212 pub fn query_event_context_mut(
213 &self,
214 query: &mut dyn QueryMut,
215 ) -> Result<(), connection::Error> {
216 self.api.query_event_context_mut(query)
217 }
218
219 #[inline]
220 pub fn datagram_mut(&self, query: &mut dyn QueryMut) -> Result<(), connection::Error> {
221 self.api.datagram_mut(query)
222 }
223}