1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
use std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt, TryStreamExt};
use tracing::warn;
use crate::{
retry::{ExponentialBackoff, SessionRetryPolicy},
session::{session_error, session_warn},
transport::{Connector, SecureChannelEventLoop, Transport, TransportPollResult},
};
use opcua_types::{
AttributeId, QualifiedName, ReadValueId, StatusCode, TimestampsToReturn, VariableId,
};
use super::{
connect::{SessionConnectMode, SessionConnector},
services::subscriptions::event_loop::{SubscriptionActivity, SubscriptionEventLoop},
Session, SessionState,
};
/// Events that can occur while polling the session event loop.
///
/// Clients can use these events to monitor the session, for example to detect
/// disconnects, reconnects, publish failures, etc.
#[derive(Debug)]
#[non_exhaustive]
pub enum SessionPollResult {
    /// A message was sent to or received from the server.
    Transport(TransportPollResult),
    /// The connection to the server was lost, with the inner [`StatusCode`] as the reason.
    ConnectionLost(StatusCode),
    /// An attempt to (re)connect to the server failed with the inner [`StatusCode`].
    /// Another attempt will be made after a backoff delay.
    ReconnectFailed(StatusCode),
    /// The session was (re)connected; the connection mode is given by the inner
    /// [`SessionConnectMode`].
    Reconnected(SessionConnectMode),
    /// The session performed some periodic activity, such as a keep-alive.
    SessionActivity(SessionActivity),
    /// The session performed some subscription-related activity.
    Subscription(SubscriptionActivity),
    /// The session is beginning to (re)connect to the server.
    BeginConnect,
    /// A disconnect started by the event loop itself has finished. The event loop
    /// starts such a disconnect after too many failed keep-alives or after a fatal
    /// error from the subscription loop.
    FinishedDisconnect,
}
/// Everything the event loop owns while the session is connected to a server.
struct ConnectedState<T>
where
    T: Transport + Send + Sync + 'static,
{
    /// Event loop of the secure channel carrying this connection.
    channel: SecureChannelEventLoop<T>,
    /// Stream of periodic session activity (keep-alives).
    keep_alive: BoxStream<'static, SessionActivity>,
    /// Stream of subscription-related activity.
    subscriptions: BoxStream<'static, SubscriptionActivity>,
    /// Number of consecutive failed keep-alives; reset to zero on a successful one.
    current_failed_keep_alive_count: u64,
    /// Set once the event loop has scheduled a disconnect, so only one is started.
    currently_closing: bool,
    /// Disconnect scheduled by the event loop. Until one is needed this is a
    /// future that never resolves.
    disconnect_fut: BoxFuture<'static, Result<(), StatusCode>>,
}
/// States of the session event loop's state machine.
// `Connected` is larger than the other variants. Given how the state is moved
// around that is not generally a problem, since `Connected` should be by far
// the most common state.
#[allow(clippy::large_enum_variant)]
enum SessionEventLoopState<T>
where
    T: Transport + Send + Sync + 'static,
{
    /// Connected to the server and servicing the session.
    Connected(ConnectedState<T>),
    /// Trying to connect: the connector, the retry backoff, and the instant
    /// at which the next attempt is made.
    Connecting(SessionConnector, ExponentialBackoff, Instant),
    /// Not connected; the next poll begins a new series of connection attempts.
    Disconnected,
}
/// The session event loop drives the client. It must be polled for anything to happen at all.
///
/// Drive it with `run`, `spawn`, or by polling the stream returned from `enter`.
#[must_use = "The session event loop must be started for the session to work"]
pub struct SessionEventLoop<T>
where
    T: Connector + Send + Sync + 'static,
{
    /// The session this loop drives.
    inner: Arc<Session>,
    /// Cloned into the subscription event loop created for each new connection.
    trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
    /// Policy used to create the backoff for a series of connection attempts.
    retry: SessionRetryPolicy,
    /// Time between keep-alive reads.
    keep_alive_interval: Duration,
    /// Consecutive failed keep-alives tolerated before disconnecting; `0` disables the limit.
    max_failed_keep_alive_count: u64,
    /// Used to open the transport when (re)connecting.
    connector: T,
}
impl<T: Connector + Send + Sync + 'static> SessionEventLoop<T> {
pub(crate) fn new(
inner: Arc<Session>,
retry: SessionRetryPolicy,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
keep_alive_interval: Duration,
max_failed_keep_alive_count: u64,
connector: T,
) -> Self {
Self {
inner,
retry,
trigger_publish_recv,
keep_alive_interval,
max_failed_keep_alive_count,
connector,
}
}
/// Convenience method for running the session event loop until completion,
/// this method will return once the session is closed manually, or
/// after it fails to reconnect.
///
/// # Returns
///
/// * `StatusCode` - [Status code](StatusCode) indicating how the session terminated.
pub async fn run(self) -> StatusCode {
let stream = self.enter();
tokio::pin!(stream);
loop {
let r = stream.try_next().await;
match r {
Ok(None) => break StatusCode::Good,
Err(e) => break e,
_ => (),
}
}
}
/// Convenience method for running the session event loop until completion on a tokio task.
/// This method will return a [`JoinHandle`](tokio::task::JoinHandle) that will terminate
/// once the session is closed manually, or after it fails to reconnect.
///
/// # Returns
///
/// * `JoinHandle<StatusCode>` - Handle to a tokio task wrapping the event loop.
pub fn spawn(self) -> tokio::task::JoinHandle<StatusCode> {
tokio::task::spawn(self.run())
}
/// Start the event loop, returning a stream that must be polled until it is closed.
/// The stream will return `None` when the transport is closed manually, or
/// `Some(Err(StatusCode))` when the stream fails to reconnect after a loss of connection.
///
/// It yields events from normal session operation, which can be used to take specific actions
/// based on changes to the session state.
pub fn enter(self) -> impl Stream<Item = Result<SessionPollResult, StatusCode>> {
futures::stream::try_unfold(
(self, SessionEventLoopState::Disconnected),
|(slf, state)| async move {
let (res, state) = match state {
SessionEventLoopState::Connected(mut state) => {
tokio::select! {
r = state.channel.poll() => {
if let TransportPollResult::Closed(code) = r {
session_warn!(slf.inner, "Transport disconnected: {code}");
let _ = slf.inner.state_watch_tx.send(SessionState::Disconnected);
let should_reconnect = slf.inner.should_reconnect.load(Ordering::Relaxed);
if !should_reconnect {
return Ok(None);
}
Ok((
SessionPollResult::ConnectionLost(code),
SessionEventLoopState::Disconnected,
))
} else {
Ok((
SessionPollResult::Transport(r),
SessionEventLoopState::Connected(state),
))
}
}
r = state.keep_alive.next() => {
// Should never be null, fail out
let Some(r) = r else {
session_error!(slf.inner, "Session activity loop ended unexpectedly");
return Err(StatusCode::BadUnexpectedError);
};
match r {
SessionActivity::KeepAliveSucceeded => state.current_failed_keep_alive_count = 0,
SessionActivity::KeepAliveFailed(status_code) => {
session_warn!(slf.inner, "Keep alive failed: {status_code}");
state.current_failed_keep_alive_count += 1;
if !state.currently_closing
&& state.current_failed_keep_alive_count >= slf.max_failed_keep_alive_count
&& slf.max_failed_keep_alive_count != 0
{
session_error!(slf.inner, "Maximum number of failed keep-alives exceed limit, session will be closed.");
state.currently_closing = true;
let s = slf.inner.clone();
state.disconnect_fut = async move {
s.disconnect_inner(false, false).await
}.boxed();
}
},
}
Ok((
SessionPollResult::SessionActivity(r),
SessionEventLoopState::Connected(state),
))
}
r = state.subscriptions.next() => {
// Should never be null, fail out
let Some(r) = r else {
session_error!(slf.inner, "Subscription event loop ended unexpectedly");
return Err(StatusCode::BadUnexpectedError);
};
if let SubscriptionActivity::FatalFailure(e) = &r {
if !state.currently_closing {
session_error!(slf.inner, "Fatal error from subscription loop ({e}), session will be closed.");
state.currently_closing = true;
let s = slf.inner.clone();
state.disconnect_fut = async move {
s.disconnect_inner(false, false).await
}.boxed();
}
}
Ok((
SessionPollResult::Subscription(r),
SessionEventLoopState::Connected(state),
))
}
_ = &mut state.disconnect_fut => {
// Do nothing, if this terminates we will very soon be transitioning
// to a disconnected state.
Ok((
SessionPollResult::FinishedDisconnect,
SessionEventLoopState::Connected(state)
))
}
}
}
SessionEventLoopState::Disconnected => {
let connector = SessionConnector::new(slf.inner.clone());
let _ = slf.inner.state_watch_tx.send(SessionState::Connecting);
Ok((
SessionPollResult::BeginConnect,
SessionEventLoopState::Connecting(
connector,
slf.retry.new_backoff(),
Instant::now(),
),
))
}
SessionEventLoopState::Connecting(connector, mut backoff, next_try) => {
tokio::time::sleep_until(next_try.into()).await;
match connector.try_connect(&slf.connector).await {
Ok((channel, result)) => {
let _ = slf.inner.state_watch_tx.send(SessionState::Connected);
Ok((
SessionPollResult::Reconnected(result),
SessionEventLoopState::Connected(ConnectedState {
channel,
keep_alive: SessionActivityLoop::new(
slf.inner.clone(),
slf.keep_alive_interval,
)
.run()
.boxed(),
subscriptions: SubscriptionEventLoop::new(
slf.inner.clone(),
slf.trigger_publish_recv.clone(),
)
.run()
.boxed(),
current_failed_keep_alive_count: 0,
currently_closing: false,
disconnect_fut: futures::future::pending().boxed(),
}),
))
}
Err(e) => {
warn!("Failed to connect to server, status code: {e}");
match backoff.next() {
Some(x) => Ok((
SessionPollResult::ReconnectFailed(e),
SessionEventLoopState::Connecting(
connector,
backoff,
Instant::now() + x,
),
)),
None => Err(e),
}
}
}
}
}?;
Ok(Some((res, (slf, state))))
},
)
}
}
/// Periodic activity performed by the session.
///
/// Reported to clients through `SessionPollResult::SessionActivity`.
#[derive(Clone, Debug)]
pub enum SessionActivity {
    /// A keep-alive read was sent to the server, and the response reported the
    /// server as running.
    KeepAliveSucceeded,
    /// A keep-alive request failed, or the server reported that it is not running.
    /// The inner [`StatusCode`] gives the reason.
    KeepAliveFailed(StatusCode),
}
/// Timer events that drive the periodic session activity.
enum SessionTickEvent {
    /// The keep-alive interval has elapsed.
    KeepAlive,
}
struct SessionIntervals {
keep_alive: tokio::time::Interval,
}
impl SessionIntervals {
fn new(keep_alive_interval: Duration) -> Self {
let mut keep_alive = tokio::time::interval(keep_alive_interval);
keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Self { keep_alive }
}
async fn next(&mut self) -> SessionTickEvent {
tokio::select! {
_ = self.keep_alive.tick() => SessionTickEvent::KeepAlive
}
}
}
/// Producer of the periodic activity of a connected session.
///
/// Currently this only performs keep-alives. On every tick of the keep-alive
/// timer it reads the server state and reports whether the server is running.
struct SessionActivityLoop {
    /// The session to perform activity on.
    inner: Arc<Session>,
    /// Timers deciding when activity happens.
    tick_gen: SessionIntervals,
}

impl SessionActivityLoop {
    /// Create an activity loop for `inner` that performs a keep-alive every
    /// `keep_alive_interval`.
    fn new(inner: Arc<Session>, keep_alive_interval: Duration) -> Self {
        let tick_gen = SessionIntervals::new(keep_alive_interval);
        Self { inner, tick_gen }
    }

    /// Perform a single keep-alive by reading the value of `Server_ServerStatus_State`.
    ///
    /// Success requires that the read returns a value that casts to `0`. Any other
    /// value, a missing value, or a failed read is reported as a failure. The roundtrip
    /// time of a read that returns a value is fed into the session's publish limits.
    async fn keep_alive(session: &Session) -> SessionActivity {
        let started = Instant::now();
        let read_result = session
            .read(
                &[ReadValueId {
                    node_id: VariableId::Server_ServerStatus_State.into(),
                    attribute_id: AttributeId::Value as u32,
                    index_range: Default::default(),
                    data_encoding: QualifiedName::null(),
                }],
                TimestampsToReturn::Server,
                1f64,
            )
            .await;
        let roundtrip = started.elapsed();

        let data_value = match read_result {
            Err(e) => return SessionActivity::KeepAliveFailed(e),
            Ok(values) => match values.into_iter().next() {
                Some(first) => {
                    // Only successful requests update the roundtrip time, so that
                    // timeouts do not skew the measurement.
                    session
                        .publish_limits_watch_tx
                        .send_modify(|limits| limits.update_message_roundtrip(roundtrip));
                    first
                }
                // One node was requested, so an empty response would be a server
                // bug; treat it as a failure.
                None => {
                    return SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse)
                }
            },
        };

        match data_value.value.and_then(|v| v.try_cast_to().ok()) {
            Some(0) => SessionActivity::KeepAliveSucceeded,
            Some(s) => {
                warn!("Keep alive failed, non-running status code {s}");
                SessionActivity::KeepAliveFailed(StatusCode::BadServerHalted)
            }
            None => SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
        }
    }

    /// Turn the loop into an endless stream, yielding one activity per timer tick.
    fn run(self) -> impl Stream<Item = SessionActivity> {
        futures::stream::unfold(self, |mut slf| async move {
            let activity = match slf.tick_gen.next().await {
                SessionTickEvent::KeepAlive => Self::keep_alive(&slf.inner).await,
            };
            Some((activity, slf))
        })
    }
}