use std::{sync::Arc, time::Instant};
use futures::Stream;
use opcua_types::StatusCode;
use crate::{
session::services::subscriptions::event_loop_state::{
SubscriptionCache, SubscriptionEventLoopState,
},
Session,
};
#[derive(Debug)]
pub enum SubscriptionActivity {
Publish,
PublishFailed(StatusCode),
FatalFailure(StatusCode),
}
pub(crate) struct SubscriptionEventLoop {
session: Arc<Session>,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
}
impl SubscriptionEventLoop {
pub(crate) fn new(
session: Arc<Session>,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
) -> Self {
Self {
trigger_publish_recv,
session,
}
}
pub(crate) fn run(self) -> impl Stream<Item = SubscriptionActivity> {
let session_ref = self.session.clone();
futures::stream::unfold(
SubscriptionEventLoopState::new(
self.session.session_id(),
self.trigger_publish_recv,
self.session.publish_limits_watch_rx.clone(),
move || {
let session = session_ref.clone();
async move { session.publish().await }
},
SessionSubscriptionCache {
inner: self.session.clone(),
},
),
|mut state| async move {
let res = state.iter_loop().await;
Some((res, state))
},
)
}
}
struct SessionSubscriptionCache {
inner: Arc<Session>,
}
impl SubscriptionCache for SessionSubscriptionCache {
fn next_publish_time(&mut self, update: bool) -> Option<Instant> {
self.inner.next_publish_time(update)
}
}