Skip to main content

opcua_client/session/services/subscriptions/
event_loop.rs

1use std::{sync::Arc, time::Instant};
2
3use futures::Stream;
4use opcua_types::StatusCode;
5
6use crate::{
7    session::services::subscriptions::event_loop_state::{
8        SubscriptionCache, SubscriptionEventLoopState,
9    },
10    Session,
11};
12
13/// An event on the subscription event loop.
14#[derive(Debug)]
15pub enum SubscriptionActivity {
16    /// A publish request received a successful response.
17    Publish,
18    /// A publish request failed, either due to a timeout or an error.
19    /// The publish request will typically be retried.
20    PublishFailed(StatusCode),
21    /// Fatal failure, a publishing request has failed fatally in a way
22    /// that indicates it will not recover on its own.
23    /// This typically means the client has lost connection to the server.
24    /// When this is received by the session event loop it triggers a session restart.
25    FatalFailure(StatusCode),
26}
27
28/// An event loop for running periodic subscription tasks.
29///
30/// This handles publshing on a fixed interval, republishing failed requests,
31/// and subscription keep-alive.
32pub(crate) struct SubscriptionEventLoop {
33    session: Arc<Session>,
34    trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
35}
36
37impl SubscriptionEventLoop {
38    /// Create a new subscription event loop for `session`
39    ///
40    /// # Arguments
41    ///
42    ///  * `session` - A shared reference to an [AsyncSession].
43    ///  * `trigger_publish_recv` - A channel used to transmit external publish triggers.
44    ///    This is used to trigger publish outside of the normal schedule, for example when
45    ///    a new subscription is created.
46    pub(crate) fn new(
47        session: Arc<Session>,
48        trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
49    ) -> Self {
50        Self {
51            trigger_publish_recv,
52            session,
53        }
54    }
55
56    /// Run the subscription event loop, returning a stream that produces
57    /// [SubscriptionActivity] enums, reporting activity to the session event loop.
58    pub(crate) fn run(self) -> impl Stream<Item = SubscriptionActivity> {
59        let session_ref = self.session.clone();
60
61        futures::stream::unfold(
62            SubscriptionEventLoopState::new(
63                self.session.session_id(),
64                self.trigger_publish_recv,
65                self.session.publish_limits_watch_rx.clone(),
66                move || {
67                    let session = session_ref.clone();
68                    async move { session.publish().await }
69                },
70                SessionSubscriptionCache {
71                    inner: self.session.clone(),
72                },
73            ),
74            |mut state| async move {
75                let res = state.iter_loop().await;
76                Some((res, state))
77            },
78        )
79    }
80}
81
82struct SessionSubscriptionCache {
83    inner: Arc<Session>,
84}
85
86impl SubscriptionCache for SessionSubscriptionCache {
87    fn next_publish_time(&mut self, update: bool) -> Option<Instant> {
88        self.inner.next_publish_time(update)
89    }
90}