use std::{future::Future, time::Instant};
use futures::{stream::FuturesUnordered, StreamExt};
use opcua_types::StatusCode;
use tokio::{select, sync::watch::Receiver};
use tracing::debug;
use crate::{
session::{services::subscriptions::PublishLimits, session_debug, session_error},
SubscriptionActivity,
};
pub trait SubscriptionCache {
fn next_publish_time(&mut self, set_last_publish: bool) -> Option<Instant>;
}
pub struct SubscriptionEventLoopState<T, R, S> {
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
futures: FuturesUnordered<T>,
last_external_trigger: Instant,
waiting_for_response: bool,
no_active_subscription: bool,
publish_limits_rx: Receiver<PublishLimits>,
publish_source: R,
subscription_cache: S,
session_id: u32,
}
enum ActivityOrNext {
Activity(SubscriptionActivity),
Next(Option<Instant>),
}
impl<T: Future<Output = Result<bool, StatusCode>>, R: Fn() -> T, S: SubscriptionCache>
SubscriptionEventLoopState<T, R, S>
{
pub fn new(
session_id: u32,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
publish_limits_rx: Receiver<PublishLimits>,
publish_source: R,
subscription_cache: S,
) -> Self {
let last_external_trigger = *trigger_publish_recv.borrow();
Self {
last_external_trigger,
trigger_publish_recv,
futures: FuturesUnordered::new(),
waiting_for_response: false,
no_active_subscription: true,
publish_limits_rx,
publish_source,
subscription_cache,
session_id,
}
}
fn wait_for_next_tick(
&self,
next_publish: Option<Instant>,
) -> impl Future<Output = ()> + 'static {
let should_wait_for_response = self.waiting_for_response && !self.futures.is_empty();
async move {
if should_wait_for_response {
futures::future::pending().await
} else if let Some(next_publish) = next_publish {
tokio::time::sleep_until(next_publish.into()).await;
} else {
futures::future::pending().await
}
}
}
async fn wait_for_next_publish(&mut self) -> Result<bool, StatusCode> {
if self.futures.is_empty() {
futures::future::pending().await
} else {
self.futures
.next()
.await
.unwrap_or(Err(StatusCode::BadInvalidState))
}
}
fn session_id(&self) -> u32 {
self.session_id
}
pub async fn iter_loop(&mut self) -> SubscriptionActivity {
let mut next = self.subscription_cache.next_publish_time(false);
let mut recv = self.trigger_publish_recv.clone();
loop {
match self.tick(next, &mut recv).await {
ActivityOrNext::Activity(a) => return a,
ActivityOrNext::Next(n) => next = n,
}
}
}
async fn tick(
&mut self,
mut next_publish: Option<Instant>,
recv: &mut Receiver<Instant>,
) -> ActivityOrNext {
let last_external_trigger = self.last_external_trigger;
select! {
v = recv.wait_for(|i| i > &last_external_trigger) => {
if let Ok(v) = v {
if !self.waiting_for_response {
debug!("Sending publish due to external trigger");
self.futures.push((self.publish_source)());
next_publish = self.subscription_cache.next_publish_time(true);
self.last_external_trigger = *v;
} else {
debug!("Skipping publish due BadTooManyPublishRequests");
}
}
self.no_active_subscription = false;
ActivityOrNext::Next(next_publish)
}
_ = self.wait_for_next_tick(next_publish) => {
if !self.no_active_subscription && self.futures.len()
< self
.publish_limits_rx
.borrow()
.max_publish_requests
{
if !self.waiting_for_response {
debug!("Sending publish due to internal tick");
self.futures.push((self.publish_source)());
} else {
debug!("Skipping publish due BadTooManyPublishRequests");
}
}
ActivityOrNext::Next(self.subscription_cache.next_publish_time(true))
}
res = self.wait_for_next_publish() => {
match res {
Ok(more_notifications) => {
if more_notifications
|| self.futures.len()
< self
.publish_limits_rx
.borrow()
.min_publish_requests
{
if !self.waiting_for_response {
debug!("Sending publish after receiving response");
self.futures.push((self.publish_source)());
self.subscription_cache.next_publish_time(true);
} else {
debug!("Skipping publish due BadTooManyPublishRequests");
}
}
self.waiting_for_response = false;
self.no_active_subscription = false;
ActivityOrNext::Activity(SubscriptionActivity::Publish)
}
Err(e) => {
match e {
StatusCode::BadTimeout => {
session_debug!(self, "Publish request timed out");
}
StatusCode::BadTooManyPublishRequests => {
session_debug!(
self,
"Server returned BadTooManyPublishRequests, backing off",
);
self.waiting_for_response = true;
}
StatusCode::BadSessionClosed
| StatusCode::BadSessionIdInvalid => {
session_error!(self, "Publish response indicates session is dead");
return ActivityOrNext::Activity(SubscriptionActivity::FatalFailure(e))
}
StatusCode::BadNoSubscription => {
session_debug!(
self,
"Publish response indicates that there are no subscriptions"
);
self.no_active_subscription = true;
},
_ => ()
}
ActivityOrNext::Activity(SubscriptionActivity::PublishFailed(e))
}
}
},
}
}
}