opcua_client/session/services/subscriptions/
event_loop_state.rs1use std::{future::Future, time::Instant};
2
3use futures::{stream::FuturesUnordered, StreamExt};
4use opcua_types::StatusCode;
5use tokio::{select, sync::watch::Receiver};
6use tracing::debug;
7
8use crate::{
9 session::{services::subscriptions::PublishLimits, session_debug, session_error},
10 SubscriptionActivity,
11};
12
13pub trait SubscriptionCache {
18 fn next_publish_time(&mut self, set_last_publish: bool) -> Option<Instant>;
21}
22
23pub struct SubscriptionEventLoopState<T, R, S> {
35 trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
36 futures: FuturesUnordered<T>,
37 last_external_trigger: Instant,
38 waiting_for_response: bool,
41 no_active_subscription: bool,
44 publish_limits_rx: Receiver<PublishLimits>,
46 publish_source: R,
47 subscription_cache: S,
48 session_id: u32,
49}
50
51enum ActivityOrNext {
52 Activity(SubscriptionActivity),
53 Next(Option<Instant>),
54}
55
56impl<T: Future<Output = Result<bool, StatusCode>>, R: Fn() -> T, S: SubscriptionCache>
57 SubscriptionEventLoopState<T, R, S>
58{
59 pub fn new(
71 session_id: u32,
72 trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
73 publish_limits_rx: Receiver<PublishLimits>,
74 publish_source: R,
75 subscription_cache: S,
76 ) -> Self {
77 let last_external_trigger = *trigger_publish_recv.borrow();
78 Self {
79 last_external_trigger,
80 trigger_publish_recv,
81 futures: FuturesUnordered::new(),
82 waiting_for_response: false,
83 no_active_subscription: true,
84 publish_limits_rx,
85 publish_source,
86 subscription_cache,
87 session_id,
88 }
89 }
90
91 fn wait_for_next_tick(
92 &self,
93 next_publish: Option<Instant>,
94 ) -> impl Future<Output = ()> + 'static {
95 let should_wait_for_response = self.waiting_for_response && !self.futures.is_empty();
97 async move {
98 if should_wait_for_response {
99 futures::future::pending().await
100 } else if let Some(next_publish) = next_publish {
101 tokio::time::sleep_until(next_publish.into()).await;
102 } else {
103 futures::future::pending().await
104 }
105 }
106 }
107
108 async fn wait_for_next_publish(&mut self) -> Result<bool, StatusCode> {
109 if self.futures.is_empty() {
110 futures::future::pending().await
111 } else {
112 self.futures
113 .next()
114 .await
115 .unwrap_or(Err(StatusCode::BadInvalidState))
116 }
117 }
118
119 fn session_id(&self) -> u32 {
120 self.session_id
121 }
122
123 pub async fn iter_loop(&mut self) -> SubscriptionActivity {
125 let mut next = self.subscription_cache.next_publish_time(false);
126 let mut recv = self.trigger_publish_recv.clone();
127 loop {
128 match self.tick(next, &mut recv).await {
129 ActivityOrNext::Activity(a) => return a,
130 ActivityOrNext::Next(n) => next = n,
131 }
132 }
133 }
134
135 async fn tick(
136 &mut self,
137 mut next_publish: Option<Instant>,
138 recv: &mut Receiver<Instant>,
139 ) -> ActivityOrNext {
140 let last_external_trigger = self.last_external_trigger;
141 select! {
142 v = recv.wait_for(|i| i > &last_external_trigger) => {
143 if let Ok(v) = v {
144 if !self.waiting_for_response {
145 debug!("Sending publish due to external trigger");
146 self.futures.push((self.publish_source)());
148 next_publish = self.subscription_cache.next_publish_time(true);
149 self.last_external_trigger = *v;
150 } else {
151 debug!("Skipping publish due BadTooManyPublishRequests");
152 }
153 }
154 self.no_active_subscription = false;
155 ActivityOrNext::Next(next_publish)
156 }
157 _ = self.wait_for_next_tick(next_publish) => {
158 if !self.no_active_subscription && self.futures.len()
159 < self
160 .publish_limits_rx
161 .borrow()
162 .max_publish_requests
163 {
164 if !self.waiting_for_response {
165 debug!("Sending publish due to internal tick");
166 self.futures.push((self.publish_source)());
167 } else {
168 debug!("Skipping publish due BadTooManyPublishRequests");
169 }
170 }
171 ActivityOrNext::Next(self.subscription_cache.next_publish_time(true))
172 }
173 res = self.wait_for_next_publish() => {
174 match res {
175 Ok(more_notifications) => {
176 if more_notifications
177 || self.futures.len()
178 < self
179 .publish_limits_rx
180 .borrow()
181 .min_publish_requests
182 {
183 if !self.waiting_for_response {
184 debug!("Sending publish after receiving response");
185 self.futures.push((self.publish_source)());
186 self.subscription_cache.next_publish_time(true);
190 } else {
191 debug!("Skipping publish due BadTooManyPublishRequests");
192 }
193 }
194 self.waiting_for_response = false;
195 self.no_active_subscription = false;
196 ActivityOrNext::Activity(SubscriptionActivity::Publish)
197 }
198 Err(e) => {
199 match e {
200 StatusCode::BadTimeout => {
201 session_debug!(self, "Publish request timed out");
202 }
203 StatusCode::BadTooManyPublishRequests => {
204 session_debug!(
205 self,
206 "Server returned BadTooManyPublishRequests, backing off",
207 );
208 self.waiting_for_response = true;
209 }
210 StatusCode::BadSessionClosed
211 | StatusCode::BadSessionIdInvalid => {
212 session_error!(self, "Publish response indicates session is dead");
214 return ActivityOrNext::Activity(SubscriptionActivity::FatalFailure(e))
215 }
216 StatusCode::BadNoSubscription => {
217 session_debug!(
218 self,
219 "Publish response indicates that there are no subscriptions"
220 );
221 self.no_active_subscription = true;
222 },
223 _ => ()
224 }
225 ActivityOrNext::Activity(SubscriptionActivity::PublishFailed(e))
226 }
227 }
228 },
229 }
230 }
231}