1use std::{
2 sync::{atomic::Ordering, Arc},
3 time::{Duration, Instant},
4};
5
6use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt, TryStreamExt};
7use tracing::warn;
8
9use crate::{
10 retry::{ExponentialBackoff, SessionRetryPolicy},
11 session::{session_error, session_warn},
12 transport::{Connector, SecureChannelEventLoop, Transport, TransportPollResult},
13};
14use opcua_types::{
15 AttributeId, QualifiedName, ReadValueId, StatusCode, TimestampsToReturn, VariableId,
16};
17
18use super::{
19 connect::{SessionConnectMode, SessionConnector},
20 services::subscriptions::event_loop::{SubscriptionActivity, SubscriptionEventLoop},
21 Session, SessionState,
22};
23
24#[derive(Debug)]
28#[non_exhaustive]
29pub enum SessionPollResult {
30 Transport(TransportPollResult),
32 ConnectionLost(StatusCode),
34 ReconnectFailed(StatusCode),
36 Reconnected(SessionConnectMode),
38 SessionActivity(SessionActivity),
40 Subscription(SubscriptionActivity),
42 BeginConnect,
44 FinishedDisconnect,
46}
47
48struct ConnectedState<T: Transport + Send + Sync + 'static> {
49 channel: SecureChannelEventLoop<T>,
50 keep_alive: BoxStream<'static, SessionActivity>,
51 subscriptions: BoxStream<'static, SubscriptionActivity>,
52 current_failed_keep_alive_count: u64,
53 currently_closing: bool,
54 disconnect_fut: BoxFuture<'static, Result<(), StatusCode>>,
55}
56
57#[allow(clippy::large_enum_variant)]
60enum SessionEventLoopState<T: Transport + Send + Sync + 'static> {
61 Connected(ConnectedState<T>),
62 Connecting(SessionConnector, ExponentialBackoff, Instant),
63 Disconnected,
64}
65
66#[must_use = "The session event loop must be started for the session to work"]
68pub struct SessionEventLoop<T: Connector + Send + Sync + 'static> {
69 inner: Arc<Session>,
70 trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
71 retry: SessionRetryPolicy,
72 keep_alive_interval: Duration,
73 max_failed_keep_alive_count: u64,
74 connector: T,
75}
76
77impl<T: Connector + Send + Sync + 'static> SessionEventLoop<T> {
78 pub(crate) fn new(
79 inner: Arc<Session>,
80 retry: SessionRetryPolicy,
81 trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
82 keep_alive_interval: Duration,
83 max_failed_keep_alive_count: u64,
84 connector: T,
85 ) -> Self {
86 Self {
87 inner,
88 retry,
89 trigger_publish_recv,
90 keep_alive_interval,
91 max_failed_keep_alive_count,
92 connector,
93 }
94 }
95
96 pub async fn run(self) -> StatusCode {
104 let stream = self.enter();
105 tokio::pin!(stream);
106 loop {
107 let r = stream.try_next().await;
108
109 match r {
110 Ok(None) => break StatusCode::Good,
111 Err(e) => break e,
112 _ => (),
113 }
114 }
115 }
116
117 pub fn spawn(self) -> tokio::task::JoinHandle<StatusCode> {
125 tokio::task::spawn(self.run())
126 }
127
128 pub fn enter(self) -> impl Stream<Item = Result<SessionPollResult, StatusCode>> {
135 futures::stream::try_unfold(
136 (self, SessionEventLoopState::Disconnected),
137 |(slf, state)| async move {
138 let (res, state) = match state {
139 SessionEventLoopState::Connected(mut state) => {
140 tokio::select! {
141 r = state.channel.poll() => {
142 if let TransportPollResult::Closed(code) = r {
143 session_warn!(slf.inner, "Transport disconnected: {code}");
144 let _ = slf.inner.state_watch_tx.send(SessionState::Disconnected);
145
146 let should_reconnect = slf.inner.should_reconnect.load(Ordering::Relaxed);
147 if !should_reconnect {
148 return Ok(None);
149 }
150
151 Ok((
152 SessionPollResult::ConnectionLost(code),
153 SessionEventLoopState::Disconnected,
154 ))
155 } else {
156 Ok((
157 SessionPollResult::Transport(r),
158 SessionEventLoopState::Connected(state),
159 ))
160 }
161 }
162 r = state.keep_alive.next() => {
163 let Some(r) = r else {
165 session_error!(slf.inner, "Session activity loop ended unexpectedly");
166 return Err(StatusCode::BadUnexpectedError);
167 };
168
169 match r {
170 SessionActivity::KeepAliveSucceeded => state.current_failed_keep_alive_count = 0,
171 SessionActivity::KeepAliveFailed(status_code) => {
172 session_warn!(slf.inner, "Keep alive failed: {status_code}");
173 state.current_failed_keep_alive_count += 1;
174 if !state.currently_closing
175 && state.current_failed_keep_alive_count >= slf.max_failed_keep_alive_count
176 && slf.max_failed_keep_alive_count != 0
177 {
178 session_error!(slf.inner, "Maximum number of failed keep-alives exceed limit, session will be closed.");
179 state.currently_closing = true;
180 let s = slf.inner.clone();
181 state.disconnect_fut = async move {
182 s.disconnect_inner(false, false).await
183 }.boxed();
184 }
185 },
186 }
187
188 Ok((
189 SessionPollResult::SessionActivity(r),
190 SessionEventLoopState::Connected(state),
191 ))
192 }
193 r = state.subscriptions.next() => {
194 let Some(r) = r else {
196 session_error!(slf.inner, "Subscription event loop ended unexpectedly");
197 return Err(StatusCode::BadUnexpectedError);
198 };
199
200 if let SubscriptionActivity::FatalFailure(e) = &r {
201 if !state.currently_closing {
202 session_error!(slf.inner, "Fatal error from subscription loop ({e}), session will be closed.");
203 state.currently_closing = true;
204 let s = slf.inner.clone();
205 state.disconnect_fut = async move {
206 s.disconnect_inner(false, false).await
207 }.boxed();
208 }
209 }
210
211 Ok((
212 SessionPollResult::Subscription(r),
213 SessionEventLoopState::Connected(state),
214 ))
215 }
216 _ = &mut state.disconnect_fut => {
217 Ok((
220 SessionPollResult::FinishedDisconnect,
221 SessionEventLoopState::Connected(state)
222 ))
223 }
224 }
225 }
226 SessionEventLoopState::Disconnected => {
227 let connector = SessionConnector::new(slf.inner.clone());
228
229 let _ = slf.inner.state_watch_tx.send(SessionState::Connecting);
230
231 Ok((
232 SessionPollResult::BeginConnect,
233 SessionEventLoopState::Connecting(
234 connector,
235 slf.retry.new_backoff(),
236 Instant::now(),
237 ),
238 ))
239 }
240 SessionEventLoopState::Connecting(connector, mut backoff, next_try) => {
241 tokio::time::sleep_until(next_try.into()).await;
242
243 match connector.try_connect(&slf.connector).await {
244 Ok((channel, result)) => {
245 let _ = slf.inner.state_watch_tx.send(SessionState::Connected);
246 Ok((
247 SessionPollResult::Reconnected(result),
248 SessionEventLoopState::Connected(ConnectedState {
249 channel,
250 keep_alive: SessionActivityLoop::new(
251 slf.inner.clone(),
252 slf.keep_alive_interval,
253 )
254 .run()
255 .boxed(),
256 subscriptions: SubscriptionEventLoop::new(
257 slf.inner.clone(),
258 slf.trigger_publish_recv.clone(),
259 )
260 .run()
261 .boxed(),
262 current_failed_keep_alive_count: 0,
263 currently_closing: false,
264 disconnect_fut: futures::future::pending().boxed(),
265 }),
266 ))
267 }
268 Err(e) => {
269 warn!("Failed to connect to server, status code: {e}");
270 match backoff.next() {
271 Some(x) => Ok((
272 SessionPollResult::ReconnectFailed(e),
273 SessionEventLoopState::Connecting(
274 connector,
275 backoff,
276 Instant::now() + x,
277 ),
278 )),
279 None => Err(e),
280 }
281 }
282 }
283 }
284 }?;
285
286 Ok(Some((res, (slf, state))))
287 },
288 )
289 }
290}
291
292#[derive(Debug, Clone)]
294pub enum SessionActivity {
295 KeepAliveSucceeded,
297 KeepAliveFailed(StatusCode),
299}
300
/// Kinds of periodic work emitted by [`SessionIntervals`].
enum SessionTickEvent {
    /// It is time to perform a keep-alive.
    KeepAlive,
}
304
305struct SessionIntervals {
306 keep_alive: tokio::time::Interval,
307}
308
309impl SessionIntervals {
310 fn new(keep_alive_interval: Duration) -> Self {
311 let mut keep_alive = tokio::time::interval(keep_alive_interval);
312 keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314 Self { keep_alive }
315 }
316
317 async fn next(&mut self) -> SessionTickEvent {
318 tokio::select! {
319 _ = self.keep_alive.tick() => SessionTickEvent::KeepAlive
320 }
321 }
322}
323
324struct SessionActivityLoop {
325 inner: Arc<Session>,
326 tick_gen: SessionIntervals,
327}
328
329impl SessionActivityLoop {
330 fn new(inner: Arc<Session>, keep_alive_interval: Duration) -> Self {
331 Self {
332 inner,
333 tick_gen: SessionIntervals::new(keep_alive_interval),
334 }
335 }
336
337 fn run(self) -> impl Stream<Item = SessionActivity> {
338 futures::stream::unfold(self, |mut slf| async move {
339 match slf.tick_gen.next().await {
340 SessionTickEvent::KeepAlive => {
341 let now = Instant::now();
342 let res = slf
343 .inner
344 .read(
345 &[ReadValueId {
346 node_id: VariableId::Server_ServerStatus_State.into(),
347 attribute_id: AttributeId::Value as u32,
348 index_range: Default::default(),
349 data_encoding: QualifiedName::null(),
350 }],
351 TimestampsToReturn::Server,
352 1f64,
353 )
354 .await;
355 let elapsed = now.elapsed();
356
357 let data_value = match res.map(|r| r.into_iter().next()) {
358 Ok(Some(data_value)) => {
359 slf.inner
362 .publish_limits_watch_tx
363 .send_modify(|limits| limits.update_message_roundtrip(elapsed));
364 data_value
365 }
366 Ok(None) => {
369 return Some((
370 SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
371 slf,
372 ))
373 }
374 Err(e) => return Some((SessionActivity::KeepAliveFailed(e), slf)),
375 };
376
377 match data_value.value.and_then(|v| v.try_cast_to().ok()) {
378 Some(0) => Some((SessionActivity::KeepAliveSucceeded, slf)),
379 Some(s) => {
380 warn!("Keep alive failed, non-running status code {s}");
381 Some((
382 SessionActivity::KeepAliveFailed(StatusCode::BadServerHalted),
383 slf,
384 ))
385 }
386 None => Some((
387 SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
388 slf,
389 )),
390 }
391 }
392 }
393 })
394 }
395}