Skip to main content

opcua_client/session/services/subscriptions/
event_loop_state.rs

1use std::{future::Future, time::Instant};
2
3use futures::{stream::FuturesUnordered, StreamExt};
4use opcua_types::StatusCode;
5use tokio::{select, sync::watch::Receiver};
6use tracing::debug;
7
8use crate::{
9    session::{services::subscriptions::PublishLimits, session_debug, session_error},
10    SubscriptionActivity,
11};
12
/// Abstraction over whatever keeps track of the active subscriptions.
///
/// The event loop only needs a handle to something that tracks subscriptions
/// so that it can ask when the next publish request should be sent.
pub trait SubscriptionCache {
    /// Return the time at which the next publish should be sent, or `None` if
    /// the cache has no publish to schedule.
    ///
    /// When `set_last_publish` is true, the last publish time is also updated to
    /// now, which affects the result of later calls to this method.
    fn next_publish_time(&mut self, set_last_publish: bool) -> Option<Instant>;
}
22
23/// The state machine for the subscription event loop.
24///
25/// This is made generic and removed from the subscription event loop to make it
26/// possible for users to implement their own event loop that doesn't depend on the
27/// `Session`, which can allow for several useful features that we are unlikely to implement
28/// in the `Session` itself, such as:
29///
30///  - Backpressure, letting users replace the `publish` implementation with one that
31///    waits for the consumer to be ready before passing the publish response to the
32///    event loop.
33///  - Custom subscription caches, for example for persisting subscription state.
34pub struct SubscriptionEventLoopState<T, R, S> {
35    trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
36    futures: FuturesUnordered<T>,
37    last_external_trigger: Instant,
38    // This is true if the client has received BadTooManyPublishRequests
39    // and is waiting for a response before making further requests.
40    waiting_for_response: bool,
41    // This is true if the client has received a no_subscriptions response,
42    // and is waiting for a manual trigger or successful response before resuming publishing.
43    no_active_subscription: bool,
44    /// Receiver for publish limits updates
45    publish_limits_rx: Receiver<PublishLimits>,
46    publish_source: R,
47    subscription_cache: S,
48    session_id: u32,
49}
50
51enum ActivityOrNext {
52    Activity(SubscriptionActivity),
53    Next(Option<Instant>),
54}
55
56impl<T: Future<Output = Result<bool, StatusCode>>, R: Fn() -> T, S: SubscriptionCache>
57    SubscriptionEventLoopState<T, R, S>
58{
59    /// Construct a new subscription cache.
60    ///
61    /// # Arguments
62    ///
63    /// * `session_id` - The session id for logging purposes.
64    /// * `trigger_publish_recv` - A channel used to transmit external publish triggers.
65    ///   This is used to trigger publish outside of the normal schedule, for example when
66    ///   a new subscription is created.
67    /// * `publish_limits_rx` - A channel used to receive updates to publish limits.
68    /// * `publish_source` - A function that produces a future that performs a publish operation.
69    /// * `subscription_cache` - An implementation of the [SubscriptionCache] trait.
70    pub fn new(
71        session_id: u32,
72        trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
73        publish_limits_rx: Receiver<PublishLimits>,
74        publish_source: R,
75        subscription_cache: S,
76    ) -> Self {
77        let last_external_trigger = *trigger_publish_recv.borrow();
78        Self {
79            last_external_trigger,
80            trigger_publish_recv,
81            futures: FuturesUnordered::new(),
82            waiting_for_response: false,
83            no_active_subscription: true,
84            publish_limits_rx,
85            publish_source,
86            subscription_cache,
87            session_id,
88        }
89    }
90
91    fn wait_for_next_tick(
92        &self,
93        next_publish: Option<Instant>,
94    ) -> impl Future<Output = ()> + 'static {
95        // Deliberately create a future that doesn't capture `self` at all.
96        let should_wait_for_response = self.waiting_for_response && !self.futures.is_empty();
97        async move {
98            if should_wait_for_response {
99                futures::future::pending().await
100            } else if let Some(next_publish) = next_publish {
101                tokio::time::sleep_until(next_publish.into()).await;
102            } else {
103                futures::future::pending().await
104            }
105        }
106    }
107
108    async fn wait_for_next_publish(&mut self) -> Result<bool, StatusCode> {
109        if self.futures.is_empty() {
110            futures::future::pending().await
111        } else {
112            self.futures
113                .next()
114                .await
115                .unwrap_or(Err(StatusCode::BadInvalidState))
116        }
117    }
118
119    fn session_id(&self) -> u32 {
120        self.session_id
121    }
122
123    /// Run an iteration of the event loop, returning each time a publish message is received.
124    pub async fn iter_loop(&mut self) -> SubscriptionActivity {
125        let mut next = self.subscription_cache.next_publish_time(false);
126        let mut recv = self.trigger_publish_recv.clone();
127        loop {
128            match self.tick(next, &mut recv).await {
129                ActivityOrNext::Activity(a) => return a,
130                ActivityOrNext::Next(n) => next = n,
131            }
132        }
133    }
134
135    async fn tick(
136        &mut self,
137        mut next_publish: Option<Instant>,
138        recv: &mut Receiver<Instant>,
139    ) -> ActivityOrNext {
140        let last_external_trigger = self.last_external_trigger;
141        select! {
142            v = recv.wait_for(|i| i > &last_external_trigger) => {
143                if let Ok(v) = v {
144                    if !self.waiting_for_response {
145                        debug!("Sending publish due to external trigger");
146                        // On an external trigger, we always publish.
147                        self.futures.push((self.publish_source)());
148                        next_publish = self.subscription_cache.next_publish_time(true);
149                        self.last_external_trigger = *v;
150                    } else {
151                        debug!("Skipping publish due BadTooManyPublishRequests");
152                    }
153                }
154                self.no_active_subscription = false;
155                ActivityOrNext::Next(next_publish)
156            }
157            _ = self.wait_for_next_tick(next_publish) => {
158                if !self.no_active_subscription && self.futures.len()
159                    < self
160                        .publish_limits_rx
161                        .borrow()
162                        .max_publish_requests
163                {
164                    if !self.waiting_for_response {
165                        debug!("Sending publish due to internal tick");
166                        self.futures.push((self.publish_source)());
167                    } else {
168                        debug!("Skipping publish due BadTooManyPublishRequests");
169                    }
170                }
171                ActivityOrNext::Next(self.subscription_cache.next_publish_time(true))
172            }
173            res = self.wait_for_next_publish() => {
174                match res {
175                    Ok(more_notifications) => {
176                        if more_notifications
177                            || self.futures.len()
178                                < self
179                                    .publish_limits_rx
180                                    .borrow()
181                                    .min_publish_requests
182                        {
183                            if !self.waiting_for_response {
184                                debug!("Sending publish after receiving response");
185                                self.futures.push((self.publish_source)());
186                                // Set the last publish time to to avoid a buildup
187                                // of publish requests if exhausting the queue takes
188                                // more time than a single publishing interval.
189                                self.subscription_cache.next_publish_time(true);
190                            } else {
191                                debug!("Skipping publish due BadTooManyPublishRequests");
192                            }
193                        }
194                        self.waiting_for_response = false;
195                        self.no_active_subscription = false;
196                        ActivityOrNext::Activity(SubscriptionActivity::Publish)
197                    }
198                    Err(e) => {
199                        match e {
200                            StatusCode::BadTimeout => {
201                                session_debug!(self, "Publish request timed out");
202                            }
203                            StatusCode::BadTooManyPublishRequests => {
204                                session_debug!(
205                                    self,
206                                    "Server returned BadTooManyPublishRequests, backing off",
207                                );
208                                self.waiting_for_response = true;
209                            }
210                            StatusCode::BadSessionClosed
211                            | StatusCode::BadSessionIdInvalid => {
212                                // If this happens we will probably eventually fail keep-alive, defer to that.
213                                session_error!(self, "Publish response indicates session is dead");
214                                return ActivityOrNext::Activity(SubscriptionActivity::FatalFailure(e))
215                            }
216                            StatusCode::BadNoSubscription => {
217                                session_debug!(
218                                    self,
219                                    "Publish response indicates that there are no subscriptions"
220                                );
221                                self.no_active_subscription = true;
222                            },
223                            _ => ()
224                        }
225                        ActivityOrNext::Activity(SubscriptionActivity::PublishFailed(e))
226                    }
227                }
228            },
229        }
230    }
231}