use either::Either;
use std::{collections::HashSet, error::Error, fmt::Debug, future::Future};
use tracing::{info, instrument};
use crate::ObserverConfig;
/// The "subject" half of the observer: a component that is connected from a
/// configuration value and then driven through two asynchronous hooks, each of
/// which is handed mutable access to a [`MsgListener`].
///
/// Every future returned by this trait must be `Send`.
pub trait EventSubject: Send + Sync + Sized {
    /// Settings borrowed by [`EventSubject::connect`].
    type Config: Send + Sync;

    /// Connection type associated with the implementor. No method declared in
    /// this trait mentions it.
    type Connection;

    /// Error produced by every fallible operation of this trait.
    type Error: Error + 'static;

    /// Asynchronously builds a subject from `config`.
    fn connect(config: &Self::Config) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Initialisation hook. In this file `observer_main_loop` awaits it once,
    /// after both sides are connected and before the notification loop starts.
    fn on_init<ML>(
        &mut self,
        msg_broker: &mut ML,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send
    where
        ML: MsgListener;

    /// Notification hook. In this file `observer_main_loop` awaits it once per
    /// message yielded by the listener; the message itself is not passed in,
    /// only the listener.
    fn on_notify<ML>(
        &mut self,
        msg_broker: &mut ML,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send
    where
        ML: MsgListener;
}
/// The "listener" half of the observer: a source of notifications that is
/// connected from a configuration value, subscribed, and then polled for
/// messages one at a time.
///
/// Every future returned by this trait must be `Send`.
pub trait MsgListener: Send + Sync + Sized {
    /// Settings borrowed by [`MsgListener::connect`].
    type Config: Send + Sync;

    /// Connection type associated with the implementor. No method declared in
    /// this trait mentions it.
    type Connection;

    /// A single notification as yielded by
    /// [`MsgListener::notification_stream`].
    type Message: Send + Debug;

    /// Error produced by every fallible operation of this trait.
    type Error: Error + 'static;

    /// Asynchronously builds a listener from `config`.
    fn connect(config: &Self::Config) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Registers the listener for notifications.
    fn subscribe_to_notifications(
        &mut self,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Resolves to the next message, or to `None`. In this file
    /// `observer_main_loop` stops looping on `None`.
    fn notification_stream(&mut self) -> impl Future<Output = Option<Self::Message>> + Send;

    /// Resolves to a set of strings describing the current state; what each
    /// string stands for is left to the implementor.
    fn get_current_state(
        &mut self,
    ) -> impl Future<Output = Result<HashSet<String>, Self::Error>> + Send;
}
/// Wires a [`MsgListener`] to an [`EventSubject`] and relays notifications
/// until the listener has nothing more to yield.
///
/// Sequence:
/// 1. connect the listener using `config.listener_config()` and subscribe it
///    to notifications;
/// 2. connect the subject using `config.subject_config()` and run its
///    `on_init` hook with the listener;
/// 3. for every message returned by `notification_stream`, log it and run the
///    subject's `on_notify` hook with the listener (the message is only
///    logged, it is not forwarded to the hook).
///
/// Returns `Ok(())` once `notification_stream` resolves to `None`.
///
/// # Errors
///
/// The first failure aborts the function: listener failures are returned as
/// `Either::Left`, subject failures as `Either::Right`.
#[instrument(skip(config))]
pub(crate) async fn observer_main_loop<ML, ES>(
    config: ObserverConfig<ML, ES>,
) -> Result<(), Either<ML::Error, ES::Error>>
where
    ML: MsgListener,
    ML::Config: Debug,
    ES: EventSubject,
    ES::Config: Debug,
{
    // Listener side first: it has to be connected and subscribed before the
    // subject's init hook gets to use it.
    let mut listener = ML::connect(config.listener_config())
        .await
        .map_err(Either::Left)?;
    listener
        .subscribe_to_notifications()
        .await
        .map_err(Either::Left)?;

    // Subject side: connect, then run its one-off initialisation.
    let mut subject = ES::connect(config.subject_config())
        .await
        .map_err(Either::Right)?;
    subject
        .on_init(&mut listener)
        .await
        .map_err(Either::Right)?;

    info!("Initial operations performed. Entering main loop to monitor for notifications.");

    loop {
        // `None` from the listener ends the loop normally.
        let Some(msg) = listener.notification_stream().await else {
            break Ok(());
        };
        info!("Received message: {:?}", msg);
        subject
            .on_notify(&mut listener)
            .await
            .map_err(Either::Right)?;
    }
}