1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
mod command;
mod stream;
mod stream_description;
mod wire;
use std::{
sync::{Arc, Weak},
time::{Duration, Instant},
};
use derivative::Derivative;
use self::{stream::Stream, wire::Message};
use super::ConnectionPoolInner;
use crate::{
error::{ErrorKind, Result},
event::cmap::{
CmapEventHandler,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionClosedEvent,
ConnectionClosedReason,
ConnectionCreatedEvent,
ConnectionReadyEvent,
},
options::{StreamAddress, TlsOptions},
};
pub(crate) use command::{Command, CommandResponse};
pub(crate) use stream_description::StreamDescription;
pub(crate) use wire::next_request_id;
/// User-facing information about a connection to the database.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
/// A driver-generated identifier that uniquely identifies the connection.
pub id: u32,
/// The address that the connection is connected to.
pub address: StreamAddress,
}
/// A wrapper around Stream that contains all the CMAP information needed to maintain a connection.
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) struct Connection {
pub(super) id: u32,
pub(super) address: StreamAddress,
pub(super) generation: u32,
/// The cached StreamDescription from the connection's handshake.
pub(super) stream_description: Option<StreamDescription>,
/// Marks the time when the connection was checked into the pool and established. This is used
/// to detect if the connection is idle.
ready_and_available_time: Option<Instant>,
/// The connection will have a weak reference to its pool when it's checked out. When it's
/// A reference to the pool that maintains the connection. If the connection is currently
/// currently checked into the pool, this will be None.
pub(super) pool: Option<Weak<ConnectionPoolInner>>,
stream: Stream,
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
}
impl Connection {
/// Constructs a new connection.
pub(crate) fn new(
id: u32,
address: StreamAddress,
generation: u32,
connect_timeout: Option<Duration>,
tls_options: Option<TlsOptions>,
handler: Option<Arc<dyn CmapEventHandler>>,
) -> Result<Self> {
let conn = Self {
id,
generation,
pool: None,
stream_description: None,
ready_and_available_time: None,
stream: Stream::connect(address.clone(), connect_timeout, tls_options)?,
address,
handler,
};
Ok(conn)
}
pub(crate) fn info(&self) -> ConnectionInfo {
ConnectionInfo {
id: self.id,
address: self.address.clone(),
}
}
pub(crate) fn address(&self) -> &StreamAddress {
&self.address
}
/// In order to check a connection back into the pool when it's dropped, we need to be able to
/// replace it with something. The `null` method facilitates this by creating a dummy connection
/// which can be passed to `std::mem::replace` to be dropped in place of the original
/// connection.
fn null() -> Self {
Self {
id: 0,
address: StreamAddress {
hostname: Default::default(),
port: None,
},
generation: 0,
pool: None,
stream_description: Some(Default::default()),
ready_and_available_time: None,
stream: Stream::Null,
handler: None,
}
}
/// Helper to mark the time that the connection was checked into the pool for the purpose of
/// detecting when it becomes idle.
pub(super) fn mark_checked_in(&mut self) {
self.pool.take();
self.ready_and_available_time = Some(Instant::now());
}
/// Helper to mark that the connection has been checked out of the pool. This ensures that the
/// connection is not marked as idle based on the time that it's checked out and that it has a
/// reference to the pool.
pub(super) fn mark_checked_out(&mut self, pool: Weak<ConnectionPoolInner>) {
self.pool = Some(pool);
self.ready_and_available_time.take();
}
/// Checks if the connection is idle.
pub(super) fn is_idle(&self, max_idle_time: Option<Duration>) -> bool {
self.ready_and_available_time
.and_then(|ready_and_available_time| {
max_idle_time.map(|max_idle_time| {
Instant::now().duration_since(ready_and_available_time) >= max_idle_time
})
})
.unwrap_or(false)
}
/// Checks if the connection is stale.
pub(super) fn is_stale(&self, current_generation: u32) -> bool {
self.generation != current_generation
}
/// Helper to create a `ConnectionCheckedOutEvent` for the connection.
pub(super) fn checked_out_event(&self) -> ConnectionCheckedOutEvent {
ConnectionCheckedOutEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
/// Helper to create a `ConnectionCheckedInEvent` for the connection.
pub(super) fn checked_in_event(&self) -> ConnectionCheckedInEvent {
ConnectionCheckedInEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
/// Helper to create a `ConnectionReadyEvent` for the connection.
pub(super) fn ready_event(&self) -> ConnectionReadyEvent {
ConnectionReadyEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
/// Helper to create a `ConnectionReadyEvent` for the connection.
pub(super) fn created_event(&self) -> ConnectionCreatedEvent {
ConnectionCreatedEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
/// Helper to create a `ConnectionReadyEvent` for the connection.
pub(super) fn closed_event(&self, reason: ConnectionClosedReason) -> ConnectionClosedEvent {
ConnectionClosedEvent {
address: self.address.clone(),
connection_id: self.id,
reason,
}
}
/// Executes a `Command` and returns a `CommandResponse` containing the result from the server.
///
/// An `Ok(...)` result simply means the server received the command and that the driver
/// driver received the response; it does not imply anything about the success of the command
/// itself.
pub(crate) fn send_command(
&mut self,
command: Command,
request_id: impl Into<Option<i32>>,
) -> Result<CommandResponse> {
let message = Message::with_command(command, request_id.into());
message.write_to(&mut self.stream)?;
let response_message = Message::read_from(&mut self.stream)?;
CommandResponse::new(self.address.clone(), response_message)
}
/// Gets the connection's StreamDescription.
pub(crate) fn stream_description(&self) -> Result<&StreamDescription> {
self.stream_description.as_ref().ok_or_else(|| {
ErrorKind::OperationError {
message: "Stream checked out but not handshaked".to_string(),
}
.into()
})
}
}
impl Drop for Connection {
fn drop(&mut self) {
// If the connection has a weak reference to a pool, that means that the connection is being
// dropped when it's checked out. If the pool is still alive, it should check itself back
// in. Otherwise, the connection should close itself and emit a ConnectionClosed event
// (because the `close` helper was not called explicitly).
//
// If the connection does not have a weak reference to a pool, then the connection is being
// dropped while it's not checked out. This means that the pool called the close helper
// explicitly, so we don't add it back to the pool or emit any events.
if let Some(ref weak_pool_ref) = self.pool {
if let Some(strong_pool_ref) = weak_pool_ref.upgrade() {
strong_pool_ref.check_in(std::mem::replace(self, Self::null()));
} else if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(
self.closed_event(ConnectionClosedReason::PoolClosed),
);
}
}
}
}