Skip to main content

opcua_client/session/
event_loop.rs

1use std::{
2    sync::{atomic::Ordering, Arc},
3    time::{Duration, Instant},
4};
5
6use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt, TryStreamExt};
7use tracing::warn;
8
9use crate::{
10    retry::{ExponentialBackoff, SessionRetryPolicy},
11    session::{session_error, session_warn},
12    transport::{Connector, SecureChannelEventLoop, Transport, TransportPollResult},
13};
14use opcua_types::{
15    AttributeId, QualifiedName, ReadValueId, StatusCode, TimestampsToReturn, VariableId,
16};
17
18use super::{
19    connect::{SessionConnectMode, SessionConnector},
20    services::subscriptions::event_loop::{SubscriptionActivity, SubscriptionEventLoop},
21    Session, SessionState,
22};
23
24/// A list of possible events that happens while polling the session.
25/// The client can use this list to monitor events such as disconnects,
26/// publish failures, etc.
27#[derive(Debug)]
28#[non_exhaustive]
29pub enum SessionPollResult {
30    /// A message was sent to or received from the server.
31    Transport(TransportPollResult),
32    /// Connection was lost with the inner [`StatusCode`].
33    ConnectionLost(StatusCode),
34    /// Reconnecting to the server failed with the inner [`StatusCode`].
35    ReconnectFailed(StatusCode),
36    /// Session was reconnected, the mode is given by the innner [`SessionConnectMode`]
37    Reconnected(SessionConnectMode),
38    /// The session performed some periodic activity.
39    SessionActivity(SessionActivity),
40    /// The session performed some subscription-related activity.
41    Subscription(SubscriptionActivity),
42    /// The session begins (re)connecting to the server.
43    BeginConnect,
44    /// Disconnect due to a keep alive terminated.
45    FinishedDisconnect,
46}
47
48struct ConnectedState<T: Transport + Send + Sync + 'static> {
49    channel: SecureChannelEventLoop<T>,
50    keep_alive: BoxStream<'static, SessionActivity>,
51    subscriptions: BoxStream<'static, SubscriptionActivity>,
52    current_failed_keep_alive_count: u64,
53    currently_closing: bool,
54    disconnect_fut: BoxFuture<'static, Result<(), StatusCode>>,
55}
56
57// The way this is passed around, the Connected state being larger is
58// not generally a problem, since it should be the most common state by far.
59#[allow(clippy::large_enum_variant)]
60enum SessionEventLoopState<T: Transport + Send + Sync + 'static> {
61    Connected(ConnectedState<T>),
62    Connecting(SessionConnector, ExponentialBackoff, Instant),
63    Disconnected,
64}
65
66/// The session event loop drives the client. It must be polled for anything to happen at all.
67#[must_use = "The session event loop must be started for the session to work"]
68pub struct SessionEventLoop<T: Connector + Send + Sync + 'static> {
69    inner: Arc<Session>,
70    trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
71    retry: SessionRetryPolicy,
72    keep_alive_interval: Duration,
73    max_failed_keep_alive_count: u64,
74    connector: T,
75}
76
77impl<T: Connector + Send + Sync + 'static> SessionEventLoop<T> {
78    pub(crate) fn new(
79        inner: Arc<Session>,
80        retry: SessionRetryPolicy,
81        trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
82        keep_alive_interval: Duration,
83        max_failed_keep_alive_count: u64,
84        connector: T,
85    ) -> Self {
86        Self {
87            inner,
88            retry,
89            trigger_publish_recv,
90            keep_alive_interval,
91            max_failed_keep_alive_count,
92            connector,
93        }
94    }
95
96    /// Convenience method for running the session event loop until completion,
97    /// this method will return once the session is closed manually, or
98    /// after it fails to reconnect.
99    ///
100    /// # Returns
101    ///
102    /// * `StatusCode` - [Status code](StatusCode) indicating how the session terminated.
103    pub async fn run(self) -> StatusCode {
104        let stream = self.enter();
105        tokio::pin!(stream);
106        loop {
107            let r = stream.try_next().await;
108
109            match r {
110                Ok(None) => break StatusCode::Good,
111                Err(e) => break e,
112                _ => (),
113            }
114        }
115    }
116
117    /// Convenience method for running the session event loop until completion on a tokio task.
118    /// This method will return a [`JoinHandle`](tokio::task::JoinHandle) that will terminate
119    /// once the session is closed manually, or after it fails to reconnect.
120    ///
121    /// # Returns
122    ///
123    /// * `JoinHandle<StatusCode>` - Handle to a tokio task wrapping the event loop.
124    pub fn spawn(self) -> tokio::task::JoinHandle<StatusCode> {
125        tokio::task::spawn(self.run())
126    }
127
128    /// Start the event loop, returning a stream that must be polled until it is closed.
129    /// The stream will return `None` when the transport is closed manually, or
130    /// `Some(Err(StatusCode))` when the stream fails to reconnect after a loss of connection.
131    ///
132    /// It yields events from normal session operation, which can be used to take specific actions
133    /// based on changes to the session state.
134    pub fn enter(self) -> impl Stream<Item = Result<SessionPollResult, StatusCode>> {
135        futures::stream::try_unfold(
136            (self, SessionEventLoopState::Disconnected),
137            |(slf, state)| async move {
138                let (res, state) = match state {
139                    SessionEventLoopState::Connected(mut state) => {
140                        tokio::select! {
141                            r = state.channel.poll() => {
142                                if let TransportPollResult::Closed(code) = r {
143                                    session_warn!(slf.inner, "Transport disconnected: {code}");
144                                    let _ = slf.inner.state_watch_tx.send(SessionState::Disconnected);
145
146                                    let should_reconnect = slf.inner.should_reconnect.load(Ordering::Relaxed);
147                                    if !should_reconnect {
148                                        return Ok(None);
149                                    }
150
151                                    Ok((
152                                        SessionPollResult::ConnectionLost(code),
153                                        SessionEventLoopState::Disconnected,
154                                    ))
155                                } else {
156                                    Ok((
157                                        SessionPollResult::Transport(r),
158                                        SessionEventLoopState::Connected(state),
159                                    ))
160                                }
161                            }
162                            r = state.keep_alive.next() => {
163                                // Should never be null, fail out
164                                let Some(r) = r else {
165                                    session_error!(slf.inner, "Session activity loop ended unexpectedly");
166                                    return Err(StatusCode::BadUnexpectedError);
167                                };
168
169                                match r {
170                                    SessionActivity::KeepAliveSucceeded => state.current_failed_keep_alive_count = 0,
171                                    SessionActivity::KeepAliveFailed(status_code) => {
172                                        session_warn!(slf.inner, "Keep alive failed: {status_code}");
173                                        state.current_failed_keep_alive_count += 1;
174                                        if !state.currently_closing
175                                            && state.current_failed_keep_alive_count >= slf.max_failed_keep_alive_count
176                                            && slf.max_failed_keep_alive_count != 0
177                                        {
178                                            session_error!(slf.inner, "Maximum number of failed keep-alives exceed limit, session will be closed.");
179                                            state.currently_closing = true;
180                                            let s = slf.inner.clone();
181                                            state.disconnect_fut = async move {
182                                                s.disconnect_inner(false, false).await
183                                            }.boxed();
184                                        }
185                                    },
186                                }
187
188                                Ok((
189                                    SessionPollResult::SessionActivity(r),
190                                    SessionEventLoopState::Connected(state),
191                                ))
192                            }
193                            r = state.subscriptions.next() => {
194                                // Should never be null, fail out
195                                let Some(r) = r else {
196                                    session_error!(slf.inner, "Subscription event loop ended unexpectedly");
197                                    return Err(StatusCode::BadUnexpectedError);
198                                };
199
200                                if let SubscriptionActivity::FatalFailure(e) = &r {
201                                    if !state.currently_closing {
202                                        session_error!(slf.inner, "Fatal error from subscription loop ({e}), session will be closed.");
203                                        state.currently_closing = true;
204                                        let s = slf.inner.clone();
205                                        state.disconnect_fut = async move {
206                                            s.disconnect_inner(false, false).await
207                                        }.boxed();
208                                    }
209                                }
210
211                                Ok((
212                                    SessionPollResult::Subscription(r),
213                                    SessionEventLoopState::Connected(state),
214                                ))
215                            }
216                            _ = &mut state.disconnect_fut => {
217                                // Do nothing, if this terminates we will very soon be transitioning
218                                // to a disconnected state.
219                                Ok((
220                                    SessionPollResult::FinishedDisconnect,
221                                    SessionEventLoopState::Connected(state)
222                                ))
223                            }
224                        }
225                    }
226                    SessionEventLoopState::Disconnected => {
227                        let connector = SessionConnector::new(slf.inner.clone());
228
229                        let _ = slf.inner.state_watch_tx.send(SessionState::Connecting);
230
231                        Ok((
232                            SessionPollResult::BeginConnect,
233                            SessionEventLoopState::Connecting(
234                                connector,
235                                slf.retry.new_backoff(),
236                                Instant::now(),
237                            ),
238                        ))
239                    }
240                    SessionEventLoopState::Connecting(connector, mut backoff, next_try) => {
241                        tokio::time::sleep_until(next_try.into()).await;
242
243                        match connector.try_connect(&slf.connector).await {
244                            Ok((channel, result)) => {
245                                let _ = slf.inner.state_watch_tx.send(SessionState::Connected);
246                                Ok((
247                                    SessionPollResult::Reconnected(result),
248                                    SessionEventLoopState::Connected(ConnectedState {
249                                        channel,
250                                        keep_alive: SessionActivityLoop::new(
251                                            slf.inner.clone(),
252                                            slf.keep_alive_interval,
253                                        )
254                                        .run()
255                                        .boxed(),
256                                        subscriptions: SubscriptionEventLoop::new(
257                                            slf.inner.clone(),
258                                            slf.trigger_publish_recv.clone(),
259                                        )
260                                        .run()
261                                        .boxed(),
262                                        current_failed_keep_alive_count: 0,
263                                        currently_closing: false,
264                                        disconnect_fut: futures::future::pending().boxed(),
265                                    }),
266                                ))
267                            }
268                            Err(e) => {
269                                warn!("Failed to connect to server, status code: {e}");
270                                match backoff.next() {
271                                    Some(x) => Ok((
272                                        SessionPollResult::ReconnectFailed(e),
273                                        SessionEventLoopState::Connecting(
274                                            connector,
275                                            backoff,
276                                            Instant::now() + x,
277                                        ),
278                                    )),
279                                    None => Err(e),
280                                }
281                            }
282                        }
283                    }
284                }?;
285
286                Ok(Some((res, (slf, state))))
287            },
288        )
289    }
290}
291
292/// Periodic activity performed by the session.
293#[derive(Debug, Clone)]
294pub enum SessionActivity {
295    /// A keep alive request was sent to the server and a response was received with a successful state.
296    KeepAliveSucceeded,
297    /// A keep alive request was sent to the server, but it failed or the server was in an invalid state.
298    KeepAliveFailed(StatusCode),
299}
300
/// Identifies which periodic session timer fired.
enum SessionTickEvent {
    /// The keep-alive period elapsed.
    KeepAlive,
}
304
305struct SessionIntervals {
306    keep_alive: tokio::time::Interval,
307}
308
309impl SessionIntervals {
310    fn new(keep_alive_interval: Duration) -> Self {
311        let mut keep_alive = tokio::time::interval(keep_alive_interval);
312        keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314        Self { keep_alive }
315    }
316
317    async fn next(&mut self) -> SessionTickEvent {
318        tokio::select! {
319            _ = self.keep_alive.tick() => SessionTickEvent::KeepAlive
320        }
321    }
322}
323
324struct SessionActivityLoop {
325    inner: Arc<Session>,
326    tick_gen: SessionIntervals,
327}
328
329impl SessionActivityLoop {
330    fn new(inner: Arc<Session>, keep_alive_interval: Duration) -> Self {
331        Self {
332            inner,
333            tick_gen: SessionIntervals::new(keep_alive_interval),
334        }
335    }
336
337    fn run(self) -> impl Stream<Item = SessionActivity> {
338        futures::stream::unfold(self, |mut slf| async move {
339            match slf.tick_gen.next().await {
340                SessionTickEvent::KeepAlive => {
341                    let now = Instant::now();
342                    let res = slf
343                        .inner
344                        .read(
345                            &[ReadValueId {
346                                node_id: VariableId::Server_ServerStatus_State.into(),
347                                attribute_id: AttributeId::Value as u32,
348                                index_range: Default::default(),
349                                data_encoding: QualifiedName::null(),
350                            }],
351                            TimestampsToReturn::Server,
352                            1f64,
353                        )
354                        .await;
355                    let elapsed = now.elapsed();
356
357                    let data_value = match res.map(|r| r.into_iter().next()) {
358                        Ok(Some(data_value)) => {
359                            // Only update if the request was successful to avoid
360                            // skewing the roundtrip time by processing timeouts.
361                            slf.inner
362                                .publish_limits_watch_tx
363                                .send_modify(|limits| limits.update_message_roundtrip(elapsed));
364                            data_value
365                        }
366                        // Should not be possible, this would be a bug in
367                        // the server, assume everything is terrible.
368                        Ok(None) => {
369                            return Some((
370                                SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
371                                slf,
372                            ))
373                        }
374                        Err(e) => return Some((SessionActivity::KeepAliveFailed(e), slf)),
375                    };
376
377                    match data_value.value.and_then(|v| v.try_cast_to().ok()) {
378                        Some(0) => Some((SessionActivity::KeepAliveSucceeded, slf)),
379                        Some(s) => {
380                            warn!("Keep alive failed, non-running status code {s}");
381                            Some((
382                                SessionActivity::KeepAliveFailed(StatusCode::BadServerHalted),
383                                slf,
384                            ))
385                        }
386                        None => Some((
387                            SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
388                            slf,
389                        )),
390                    }
391                }
392            }
393        })
394    }
395}