use crate::{
    connection::{ConnectionError, SinkConnection},
    message::Message,
    transforms::kafka::sink_cluster::connections::ConnectionState,
};
use anyhow::{Result, anyhow};
use std::time::{Duration, Instant};
use super::AuthorizeScramOverMtls;

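/// Wraps a SinkConnection that is recreated before its delegation token can expire,
/// draining any remaining responses from the connection it replaces.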
pub struct ScramOverMtlsConnection {
    connection: SinkConnection,
    /// When a connection is recreated to avoid timeouts,
    /// the old connection will be kept around until all responses have been received from it.
    old_connection: Option<SinkConnection>,
    created_at: Instant,
    timeout: Duration,
}

impl ScramOverMtlsConnection {
    pub fn new(
        connection: SinkConnection,
        old_connection: Option<ScramOverMtlsConnection>,
        authorize_scram_over_mtls: &Option<AuthorizeScramOverMtls>,
    ) -> Result<Self> {
        let old_connection = old_connection
            .map(|x| x.into_old_connection())
            .transpose()?
            .flatten();
        Ok(ScramOverMtlsConnection {
            connection,
            old_connection,
            created_at: Instant::now(),
            timeout: Self::calculate_timeout(authorize_scram_over_mtls),
        })
    }

    fn into_old_connection(self) -> Result<Option<SinkConnection>> {
        if self.old_connection.is_some() {
            return Err(anyhow!(
                "The connection to be replaced had an old_connection. For this to occur a response needs to have been pending for longer than the timeout period which indicates other problems."
            ));
        }
        if self.connection.pending_requests_count() == 0 {
            Ok(None)
        } else {
            Ok(Some(self.connection))
        }
    }

    /// Attempts to receive messages; if there are no messages available it returns immediately
    /// without appending anything to `responses`.
    /// If there is a problem with the connection an error is returned.
    pub fn try_recv_into(&mut self, responses: &mut Vec<Message>) -> Result<(), ConnectionError> {
        // Ensure the old connection is completely drained before receiving from the new connection.
        if let Some(old_connection) = &mut self.old_connection {
            old_connection.try_recv_into(responses)?;
            if old_connection.pending_requests_count() == 0 {
                self.old_connection = None;
                self.connection.try_recv_into(responses)?;
            }
            Ok(())
        } else {
            self.connection.try_recv_into(responses)
        }
    }

    /// Send messages.
    /// If there is a problem with the connection an error is returned.
    pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
        self.connection.send(messages)
    }

    /// Receives messages; if there are no messages available it waits until there are.
    /// If there is a problem with the connection an error is returned.
    pub async fn recv(&mut self) -> Result<Vec<Message>, ConnectionError> {
        // Ensure the old connection is completely drained before receiving from the new connection.
        if let Some(old_connection) = &mut self.old_connection {
            let mut received = old_connection.recv().await?;
            if old_connection.pending_requests_count() == 0 {
                self.old_connection = None;
                // Do not use the `recv` method here: we already have at least one message from the
                // previous `recv`, so we call `try_recv_into` instead to avoid blocking.
                self.connection.try_recv_into(&mut received)?;
            }
            Ok(received)
        } else {
            self.connection.recv().await
        }
    }

    pub fn get_error(&mut self) -> Option<ConnectionError> {
        self.connection.get_error()
    }

    pub fn pending_requests_count(&self) -> usize {
        self.connection.pending_requests_count()
            + self
                .old_connection
                .as_ref()
                .map(|x| x.pending_requests_count())
                .unwrap_or_default()
    }

    fn calculate_timeout(authorize_scram_over_mtls: &Option<AuthorizeScramOverMtls>) -> Duration {
        // The delegation token is recreated after `0.5 * delegation_token_lifetime`.
        // Consider what happens when we match that timing for our connection timeout,
        // in this timeline going from left to right:
        //
        //     create token t1      create token t2
        // |--------------------|--------------------|
        //                      ^ all connections created after this point use token t2 instead of token t1
        //                      |
        //   token t1 lifetime  |
        // |-----------------------------------------|
        //                      |                    ^
        //                      | after this point, connections still alive that were authed with token t1 will be closed by the broker.
        //                      |                    |
        //                      |                    |
        //                      |                    |
        //                      |  token t2 lifetime |
        //                      |  |-----------------------------------------|
        //                      |                    ^ all connections created after this point use token t2
        //                      |                    |
        //                      |                    |
        //                      |                    |
        //                      | connection lifetime using token t1         |
        //                      | |--------------------|                     |
        // This case is fine, the connection exists entirely within the lifetime of token t1.
        //                      |                    |
        //                      |                    |
        //                      |                    |
        //                      |                    | connection lifetime using token t2
        //                      |                    | |--------------------|
        // This case is fine, the connection exists entirely within the lifetime of token t2.
        //                      |                    |
        //                      |                    |
        //                      |                    |
        //                      |       connection lifetime using token t?
        //                      |          |--------------------|
        // This case is a race condition.
        // We could start with either token t2 or t1.
        // If we start with t1 we could go past the end of t1's lifetime.
        // To avoid this issue we reduce the size of the connection lifetime by a further 25%.
        //
        // At low values of delegation_token_lifetime all of this falls apart since something
        // like a VM migration could delay shotover execution for many seconds.
        // However for sufficiently large delegation_token_lifetime values (> 1 hour) this should be fine.
        authorize_scram_over_mtls
            .as_ref()
            .unwrap()
            .delegation_token_lifetime
            .mul_f32(
                // match token recreation time
                0.5 *
                // further reduce connection timeout
                0.75,
            )
    }

    pub fn state(&self, recent_instant: Instant) -> ConnectionState {
        // Since we can't be 100% exact with time anyway, we use a recent instant that can be reused to reduce syscalls.
        if recent_instant.duration_since(self.created_at) > self.timeout {
            ConnectionState::AtRiskOfAuthTokenExpiry
        } else {
            ConnectionState::Open
        }
    }
}
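
// Illustrative sketch only (not from the original source): these tests exercise the
// plain `Duration` arithmetic behind `calculate_timeout` and the age comparison in
// `state`. They deliberately avoid constructing `AuthorizeScramOverMtls` or
// `SinkConnection`, which would require a live Kafka setup.
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    #[test]
    fn timeout_is_37_5_percent_of_token_lifetime() {
        // 0.5 (match token recreation time) * 0.75 (extra safety margin) = 0.375,
        // so e.g. a 1 hour delegation_token_lifetime gives a 22.5 minute timeout.
        let lifetime = Duration::from_secs(3600);
        assert_eq!(lifetime.mul_f32(0.5 * 0.75), Duration::from_secs(1350));
    }

    #[test]
    fn connection_older_than_timeout_is_at_risk() {
        // Mirrors the comparison in `state`: once a connection's age exceeds the
        // timeout it would be reported as AtRiskOfAuthTokenExpiry.
        let timeout = Duration::from_secs(1350);
        let created_at = Instant::now();
        let recent_instant = created_at + Duration::from_secs(1351);
        assert!(recent_instant.duration_since(created_at) > timeout);
    }
}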