opcua_client/session/services/subscriptions/event_loop.rs
1use std::{sync::Arc, time::Instant};
2
3use futures::Stream;
4use opcua_types::StatusCode;
5
6use crate::{
7 session::services::subscriptions::event_loop_state::{
8 SubscriptionCache, SubscriptionEventLoopState,
9 },
10 Session,
11};
12
13/// An event on the subscription event loop.
14#[derive(Debug)]
15pub enum SubscriptionActivity {
16 /// A publish request received a successful response.
17 Publish,
18 /// A publish request failed, either due to a timeout or an error.
19 /// The publish request will typically be retried.
20 PublishFailed(StatusCode),
21 /// Fatal failure, a publishing request has failed fatally in a way
22 /// that indicates it will not recover on its own.
23 /// This typically means the client has lost connection to the server.
24 /// When this is received by the session event loop it triggers a session restart.
25 FatalFailure(StatusCode),
26}
27
28/// An event loop for running periodic subscription tasks.
29///
30/// This handles publshing on a fixed interval, republishing failed requests,
31/// and subscription keep-alive.
32pub(crate) struct SubscriptionEventLoop {
33 session: Arc<Session>,
34 trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
35}
36
37impl SubscriptionEventLoop {
38 /// Create a new subscription event loop for `session`
39 ///
40 /// # Arguments
41 ///
42 /// * `session` - A shared reference to an [AsyncSession].
43 /// * `trigger_publish_recv` - A channel used to transmit external publish triggers.
44 /// This is used to trigger publish outside of the normal schedule, for example when
45 /// a new subscription is created.
46 pub(crate) fn new(
47 session: Arc<Session>,
48 trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
49 ) -> Self {
50 Self {
51 trigger_publish_recv,
52 session,
53 }
54 }
55
56 /// Run the subscription event loop, returning a stream that produces
57 /// [SubscriptionActivity] enums, reporting activity to the session event loop.
58 pub(crate) fn run(self) -> impl Stream<Item = SubscriptionActivity> {
59 let session_ref = self.session.clone();
60
61 futures::stream::unfold(
62 SubscriptionEventLoopState::new(
63 self.session.session_id(),
64 self.trigger_publish_recv,
65 self.session.publish_limits_watch_rx.clone(),
66 move || {
67 let session = session_ref.clone();
68 async move { session.publish().await }
69 },
70 SessionSubscriptionCache {
71 inner: self.session.clone(),
72 },
73 ),
74 |mut state| async move {
75 let res = state.iter_loop().await;
76 Some((res, state))
77 },
78 )
79 }
80}
81
82struct SessionSubscriptionCache {
83 inner: Arc<Session>,
84}
85
86impl SubscriptionCache for SessionSubscriptionCache {
87 fn next_publish_time(&mut self, update: bool) -> Option<Instant> {
88 self.inner.next_publish_time(update)
89 }
90}