studiole_command/services/
command_events.rs1use crate::prelude::*;
2use tokio::spawn;
3use tokio::sync::broadcast::error::RecvError;
4use tracing::{error, warn};
5
6pub struct CommandEvents<T: ICommandInfo> {
8 mediator: Arc<CommandMediator<T>>,
9 events: Arc<Mutex<Vec<T::Event>>>,
10 handle: Mutex<Option<JoinHandle<()>>>,
11}
12
/// Tally of recorded command events, one counter per event kind.
///
/// Every counter is a running total of events of that kind, not the number
/// of commands currently in that state; use
/// [`CommandEventCounts::get_currently_queued`] and
/// [`CommandEventCounts::get_currently_executing`] to derive the latter.
///
/// The type is plain data, so it is cheap to copy and can be compared
/// directly (for example with `assert_eq!`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct CommandEventCounts {
    /// Number of `Queued` events recorded.
    pub queued: usize,
    /// Number of `Executing` events recorded.
    pub executing: usize,
    /// Number of `Succeeded` events recorded.
    pub succeeded: usize,
    /// Number of `Failed` events recorded.
    pub failed: usize,
}
25
26impl<T: ICommandInfo + 'static> CommandEvents<T> {
27 #[must_use]
29 pub fn new(mediator: Arc<CommandMediator<T>>) -> Self {
30 Self {
31 mediator,
32 events: Arc::default(),
33 handle: Mutex::default(),
34 }
35 }
36
37 pub async fn start(&self) {
39 let mut handle_guard = self.handle.lock().await;
40 if handle_guard.is_some() {
41 return;
42 }
43 let mediator = self.mediator.clone();
44 let mut receiver = mediator.subscribe();
45 let events = self.events.clone();
46 let handle = spawn(async move {
47 loop {
48 match receiver.recv().await {
49 Err(RecvError::Lagged(count)) => {
50 warn!("CommandEvents missed {count} events due to lagging");
51 }
52 Err(RecvError::Closed) => {
53 error!("Event pipe was closed. CommandEvents can't proceed.");
54 break;
55 }
56 Ok(event) => {
57 let mut events_guard = events.lock().await;
58 events_guard.push(event);
59 drop(events_guard);
60 }
61 }
62 }
63 });
64 *handle_guard = Some(handle);
65 }
66
67 pub async fn get(&self) -> MutexGuard<'_, Vec<T::Event>> {
69 self.events.lock().await
70 }
71
72 pub async fn count(&self) -> CommandEventCounts {
74 let mut counts = CommandEventCounts::default();
75 let events = self.events.lock().await;
76 for event in events.iter() {
77 match event.get_kind() {
78 EventKind::Queued => counts.queued += 1,
79 EventKind::Executing => counts.executing += 1,
80 EventKind::Succeeded => counts.succeeded += 1,
81 EventKind::Failed => counts.failed += 1,
82 }
83 }
84 counts
85 }
86}
87
88impl CommandEventCounts {
89 #[must_use]
94 pub fn get_currently_queued(&self) -> Option<usize> {
95 self.queued.checked_sub(self.executing)
96 }
97
98 #[must_use]
103 pub fn get_currently_executing(&self) -> Option<usize> {
104 self.executing
105 .checked_sub(self.succeeded)?
106 .checked_sub(self.failed)
107 }
108}
109
110impl<T: ICommandInfo + 'static> Service for CommandEvents<T> {
111 type Error = ServiceError;
112
113 async fn from_services(services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
114 Ok(Self::new(services.get_service().await?))
115 }
116}