//! This module provides the interface connecting all components of the program.
//! It defines the [`EventSubject`] and [`MsgListener`] traits, along with a main loop function
//! built on those abstractions.
use either::Either;
use std::{collections::HashSet, error::Error, fmt::Debug, future::Future};
use tracing::{info, instrument};
use crate::ObserverConfig;
/// An event subject: a system that needs real-time monitoring and dynamic state alignment
/// based on external conditions (an IRC chat monitor is one example).
pub trait EventSubject: Sized + Send + Sync {
    /// Settings required to connect to the event subject.
    type Config: Send + Sync;

    /// Connection interface of the event subject.
    type Connection;

    /// Error type produced by operations on the event subject.
    type Error: Error + 'static;

    /// Connects to the event subject described by `config`.
    ///
    /// The returned value should be usable for further operations without any additional
    /// preparation.
    fn connect(config: &Self::Config) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Hook holding the actions performed when the program initializes.
    ///
    /// `msg_broker` is the message listener handed to the hook.
    fn on_init<ML>(
        &mut self,
        msg_broker: &mut ML,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send
    where
        ML: MsgListener;

    /// Hook holding the actions performed every time a notification arrives from the
    /// message listener.
    ///
    /// `msg_broker` is the message listener handed to the hook.
    fn on_notify<ML>(
        &mut self,
        msg_broker: &mut ML,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send
    where
        ML: MsgListener;
}
/// A listener attached to a message broker system, whose notifications drive dynamic
/// adjustments of the system.
pub trait MsgListener: Sized + Send + Sync {
    /// Settings required to connect to the message broker system.
    type Config: Send + Sync;

    /// Connection interface of the message broker.
    type Connection;

    /// Type of the messages received through the broker.
    type Message: Debug + Send;

    /// Error type produced by operations on the message broker.
    type Error: Error + 'static;

    /// Connects to the message broker described by `config`.
    // #lizard forgives the complexity # this is a bizarre false-positive
    fn connect(config: &Self::Config) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Subscribes to broker notifications so that messages can be received for state
    /// alignment.
    fn subscribe_to_notifications(
        &mut self,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Yields broker messages one at a time, allowing continuous monitoring of notifications.
    ///
    /// Each call resolves to `Some(message)` or to `None`; `observer_main_loop` in this
    /// module stops polling once `None` is returned.
    fn notification_stream(&mut self) -> impl Future<Output = Option<Self::Message>> + Send;

    /// Fetches the current state of a dataset as a set of strings.
    // Not called from within this crate itself, but sometimes needed when implementing the
    // `Observer` trait.
    fn get_current_state(
        &mut self,
    ) -> impl Future<Output = Result<HashSet<String>, Self::Error>> + Send;
}
/// Runs the controller that keeps an Event Subject (e.g., a chat monitor) aligned with the
/// information published by a Message Broker.
///
/// The supplied [`ObserverConfig`] provides the settings for both sides. The function:
///
/// 1. connects the message listener and subscribes it to notifications,
/// 2. connects the event subject and runs its [`EventSubject::on_init`] hook once,
/// 3. waits for broker messages and runs [`EventSubject::on_notify`] for each one.
///
/// Step 3 repeats until [`MsgListener::notification_stream`] resolves to `None`, at which
/// point the function returns `Ok(())`.
///
/// # Errors
/// Stops at the first failure. Errors from the message listener are returned as
/// `Either::Left`, errors from the event subject as `Either::Right`.
#[instrument(skip(config))]
pub(crate) async fn observer_main_loop<ML, ES>(
    config: ObserverConfig<ML, ES>,
) -> Result<(), Either<ML::Error, ES::Error>>
where
    ML: MsgListener,
    ML::Config: Debug,
    ES: EventSubject,
    ES::Config: Debug,
{
    // Broker side first: connect, then subscribe, before the subject is touched.
    let mut broker = ML::connect(config.listener_config())
        .await
        .map_err(Either::Left)?;
    broker
        .subscribe_to_notifications()
        .await
        .map_err(Either::Left)?;

    // Subject side: connect, then run the one-off initialization hook.
    let mut subject = ES::connect(config.subject_config())
        .await
        .map_err(Either::Right)?;
    subject
        .on_init(&mut broker)
        .await
        .map_err(Either::Right)?;

    info!("Initial operations performed. Entering main loop to monitor for notifications.");

    loop {
        // A `None` from the listener means there is nothing more to process.
        let Some(notification) = broker.notification_stream().await else {
            break;
        };

        // Record every received message for debugging and tracking.
        info!("Received message: {:?}", notification);

        // Every message triggers the subject's notification hook.
        subject
            .on_notify(&mut broker)
            .await
            .map_err(Either::Right)?;
    }

    // Reached only after the notification stream has resolved to `None`.
    Ok(())
}