1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//! This module provides an interface connecting all components of the program:
//! Traits for Event Subject and Message Broker are defined, facilitating a main loop function based on these abstractions.

use either::Either;
use std::{collections::HashSet, error::Error, fmt::Debug, future::Future};
use tracing::{info, instrument};

use crate::ObserverConfig;

/// Describes structures that can act as an Event Subject, such as an IRC Chat Monitor or any system
/// that needs real-time monitoring and dynamic state alignment based on external conditions.
pub trait EventSubject: Sized + Send + Sync {
    /// Configuration needed to connect to the event subject.
    type Config: Send + Sync;
    /// Connection interface for the event subject.
    type Connection;
    /// Any Error type returned by operations on the event subject.
    type Error: Error + 'static;

    /// Establishes connection to the event subject using specific configuration. This should enable
    /// further operations without additional preparations.
    fn connect(config: &Self::Config) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Actions that will be performed on initialization of the program.
    fn on_init<ML: MsgListener>(
        &mut self,
        msg_broker: &mut ML,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Actions that will be performed whenever we receive a notification from the Message Listener.
    fn on_notify<ML: MsgListener>(
        &mut self,
        msg_broker: &mut ML,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

/// Describes structures capable of listening to a Message Broker system, facilitating dynamic system adjustments
/// based on Message Broker notifications.
pub trait MsgListener: Sized + Send + Sync {
    /// Configuration for connecting to the message broker system.
    type Config: Send + Sync;
    /// Connection interface for the message broker.
    type Connection;
    /// Message type received through the broker.
    type Message: Debug + Send;
    /// Any Error type returned by operations on the message broker.
    type Error: Error + 'static;

    /// Establishes connection to the message broker using the provided configuration.
    // #lizard forgives the complexity # this is a bizarre false-positive
    fn connect(config: &Self::Config) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Subscribes to notifications, enabling message receipt for state alignments.
    fn subscribe_to_notifications(
        &mut self,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
    /// Provides a stream of messages, allowing continuous monitoring and response to broker notifications.
    fn notification_stream(&mut self) -> impl Future<Output = Option<Self::Message>> + Send;

    /// Get the current state of a dataset.
    // This function is not used within this crate itself, but is sometimes necessary to implement the `Observer` trait
    fn get_current_state(
        &mut self,
    ) -> impl Future<Output = Result<HashSet<String>, <Self as MsgListener>::Error>> + Send;
}

/// Manages a controller given an abstract configuration. This controller orchestrates the interaction between
/// an Event Subject (e.g., a chat monitor) and a Broker, ensuring the Event Subject's state aligns with broker information.
///
/// This function expects a configuration that implements the [`AbstractObserverConfig`] trait, providing necessary
/// details for establishing connections to both the Event Subject and the Broker. After establishing connections,
/// it continuously adjusts the Event Subject's monitoring state based on real-time broker notifications.
///
/// This loop ensures the Event Subject remains synchronized with the desired state as defined by the broker,
/// reacting to updates indefinitely until halted.
///
/// # Errors
/// Returns an error if connecting or aligning states fails, encapsulating errors from both the Event Subject and the Message Broker.
#[instrument(skip(config))]
pub(crate) async fn observer_main_loop<ML, ES>(
    config: ObserverConfig<ML, ES>,
) -> Result<(), Either<ML::Error, ES::Error>>
where
    ML: MsgListener,
    ML::Config: Debug,
    ES: EventSubject,
    ES::Config: Debug,
{
    let mut msg_listener = ML::connect(config.listener_config())
        .await
        .map_err(Either::Left)?;
    msg_listener
        .subscribe_to_notifications()
        .await
        .map_err(Either::Left)?;

    let mut event_subject = ES::connect(config.subject_config())
        .await
        .map_err(Either::Right)?;

    // Align the initial state before entering the main observation loop.
    event_subject
        .on_init(&mut msg_listener)
        .await
        .map_err(Either::Right)?;
    info!("Initial operations performed. Entering main loop to monitor for notifications.");

    // Main loop for processing incoming messages and aligning state accordingly.
    while let Some(msg) = msg_listener.notification_stream().await {
        // Log received messages for debugging and tracking.
        info!("Received message: {:?}", msg);

        // Attempt to realign the state on each message received from the message broker.
        event_subject
            .on_notify(&mut msg_listener)
            .await
            .map_err(Either::Right)?;
    }

    // If loop exits (e.g., through a stop condition not shown here), the function will conclude.
    Ok(())
}