Skip to main content

studiole_command/services/
command_events.rs

1use crate::prelude::*;
2use tokio::spawn;
3use tokio::sync::broadcast::error::RecvError;
4use tracing::{error, warn};
5
6/// Collect and query command lifecycle events.
7pub struct CommandEvents<T: ICommandInfo> {
8    mediator: Arc<CommandMediator<T>>,
9    events: Arc<Mutex<Vec<T::Event>>>,
10    handle: Mutex<Option<JoinHandle<()>>>,
11}
12
/// Aggregate counts of command lifecycle events.
///
/// A plain value snapshot: one counter per event kind. It is cheap to copy
/// and can be compared directly, which keeps assertions on counts concise.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct CommandEventCounts {
    /// Number of queued events received.
    pub queued: usize,
    /// Number of executing events received.
    pub executing: usize,
    /// Number of succeeded events received.
    pub succeeded: usize,
    /// Number of failed events received.
    pub failed: usize,
}
25
26impl<T: ICommandInfo + 'static> CommandEvents<T> {
27    /// Create a new [`CommandEvents`] backed by a [`CommandMediator`].
28    #[must_use]
29    pub fn new(mediator: Arc<CommandMediator<T>>) -> Self {
30        Self {
31            mediator,
32            events: Arc::default(),
33            handle: Mutex::default(),
34        }
35    }
36
37    /// Start listening for events from the [`CommandMediator`].
38    pub async fn start(&self) {
39        let mut handle_guard = self.handle.lock().await;
40        if handle_guard.is_some() {
41            return;
42        }
43        let mediator = self.mediator.clone();
44        let mut receiver = mediator.subscribe();
45        let events = self.events.clone();
46        let handle = spawn(async move {
47            loop {
48                match receiver.recv().await {
49                    Err(RecvError::Lagged(count)) => {
50                        warn!("CommandEvents missed {count} events due to lagging");
51                    }
52                    Err(RecvError::Closed) => {
53                        error!("Event pipe was closed. CommandEvents can't proceed.");
54                        break;
55                    }
56                    Ok(event) => {
57                        let mut events_guard = events.lock().await;
58                        events_guard.push(event);
59                        drop(events_guard);
60                    }
61                }
62            }
63        });
64        *handle_guard = Some(handle);
65    }
66
67    /// Lock and return the collected events.
68    pub async fn get(&self) -> MutexGuard<'_, Vec<T::Event>> {
69        self.events.lock().await
70    }
71
72    /// Count events by [`EventKind`].
73    pub async fn count(&self) -> CommandEventCounts {
74        let mut counts = CommandEventCounts::default();
75        let events = self.events.lock().await;
76        for event in events.iter() {
77            match event.get_kind() {
78                EventKind::Queued => counts.queued += 1,
79                EventKind::Executing => counts.executing += 1,
80                EventKind::Succeeded => counts.succeeded += 1,
81                EventKind::Failed => counts.failed += 1,
82            }
83        }
84        counts
85    }
86}
87
88impl CommandEventCounts {
89    /// Estimated number of commands currently waiting in the queue.
90    ///
91    /// - Returns `None` if the subtraction overflows
92    /// - For accuracy, [`CommandEvents::start`] must be called before any events occur
93    #[must_use]
94    pub fn get_currently_queued(&self) -> Option<usize> {
95        self.queued.checked_sub(self.executing)
96    }
97
98    /// Estimated number of commands currently being executed.
99    ///
100    /// - Returns `None` if the subtraction overflows
101    /// - For accuracy, [`CommandEvents::start`] must be called before any events occur
102    #[must_use]
103    pub fn get_currently_executing(&self) -> Option<usize> {
104        self.executing
105            .checked_sub(self.succeeded)?
106            .checked_sub(self.failed)
107    }
108}
109
110impl<T: ICommandInfo + 'static> Service for CommandEvents<T> {
111    type Error = ServiceError;
112
113    async fn from_services(services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
114        Ok(Self::new(services.get_service().await?))
115    }
116}