Skip to main content

studiole_command/services/
cli_progress.rs

1use crate::prelude::*;
2use indicatif::ProgressBar;
3use tokio::spawn;
4use tokio::sync::broadcast::error::RecvError;
5use tracing::{error, warn};
6
7/// Display command progress as a terminal progress bar.
8pub struct CliProgress<T: ICommandInfo> {
9    mediator: Arc<CommandMediator<T>>,
10    bar: Arc<ProgressBar>,
11    handle: Mutex<Option<JoinHandle<()>>>,
12    finished: Arc<Mutex<bool>>,
13}
14
15impl<T: ICommandInfo + 'static> CliProgress<T> {
16    /// Create a new [`CliProgress`] backed by a [`CommandMediator`].
17    #[must_use]
18    pub fn new(mediator: Arc<CommandMediator<T>>) -> Self {
19        Self {
20            mediator,
21            bar: Arc::new(ProgressBar::new(0)),
22            handle: Mutex::default(),
23            finished: Arc::new(Mutex::new(false)),
24        }
25    }
26
27    /// Start listening for events and updating the progress bar.
28    pub async fn start(&self) {
29        let mut handle_guard = self.handle.lock().await;
30        if handle_guard.is_some() {
31            return;
32        }
33        let mediator = self.mediator.clone();
34        let mut receiver = mediator.subscribe();
35        let bar = self.bar.clone();
36        let finished = self.finished.clone();
37        let mut total: u64 = 0;
38        let handle = spawn(async move {
39            while !*finished.lock().await {
40                let event = match receiver.recv().await {
41                    Err(RecvError::Lagged(count)) => {
42                        warn!("CLI Progress missed {count} events due to lagging");
43                        continue;
44                    }
45                    Err(RecvError::Closed) => {
46                        error!("Event pipe was closed. CLI Progress can't proceed.");
47                        break;
48                    }
49                    Ok(event) => event,
50                };
51                match event.get_kind() {
52                    EventKind::Queued => {
53                        total += 1;
54                        bar.set_length(total);
55                    }
56                    EventKind::Executing => {}
57                    EventKind::Succeeded | EventKind::Failed => {
58                        bar.inc(1);
59                    }
60                }
61            }
62        });
63        *handle_guard = Some(handle);
64    }
65
66    /// Stop listening and mark the progress bar as finished.
67    pub async fn finish(&self) {
68        let mut finished_guard = self.finished.lock().await;
69        *finished_guard = true;
70        drop(finished_guard);
71        let mut handle_guard = self.handle.lock().await;
72        if let Some(handle) = handle_guard.take() {
73            handle.abort();
74        }
75        drop(handle_guard);
76        self.bar.finish();
77    }
78}
79
80impl<T: ICommandInfo + 'static> Service for CliProgress<T> {
81    type Error = ServiceError;
82
83    async fn from_services(services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
84        Ok(Self::new(services.get_service().await?))
85    }
86}