studiole_command/services/
cli_progress.rs1use crate::prelude::*;
2use indicatif::ProgressBar;
3use tokio::spawn;
4use tokio::sync::broadcast::error::RecvError;
5use tracing::{error, warn};
6
7pub struct CliProgress<T: ICommandInfo> {
9 mediator: Arc<CommandMediator<T>>,
10 bar: Arc<ProgressBar>,
11 handle: Mutex<Option<JoinHandle<()>>>,
12 finished: Arc<Mutex<bool>>,
13}
14
15impl<T: ICommandInfo + 'static> CliProgress<T> {
16 #[must_use]
18 pub fn new(mediator: Arc<CommandMediator<T>>) -> Self {
19 Self {
20 mediator,
21 bar: Arc::new(ProgressBar::new(0)),
22 handle: Mutex::default(),
23 finished: Arc::new(Mutex::new(false)),
24 }
25 }
26
27 pub async fn start(&self) {
29 let mut handle_guard = self.handle.lock().await;
30 if handle_guard.is_some() {
31 return;
32 }
33 let mediator = self.mediator.clone();
34 let mut receiver = mediator.subscribe();
35 let bar = self.bar.clone();
36 let finished = self.finished.clone();
37 let mut total: u64 = 0;
38 let handle = spawn(async move {
39 while !*finished.lock().await {
40 let event = match receiver.recv().await {
41 Err(RecvError::Lagged(count)) => {
42 warn!("CLI Progress missed {count} events due to lagging");
43 continue;
44 }
45 Err(RecvError::Closed) => {
46 error!("Event pipe was closed. CLI Progress can't proceed.");
47 break;
48 }
49 Ok(event) => event,
50 };
51 match event.get_kind() {
52 EventKind::Queued => {
53 total += 1;
54 bar.set_length(total);
55 }
56 EventKind::Executing => {}
57 EventKind::Succeeded | EventKind::Failed => {
58 bar.inc(1);
59 }
60 }
61 }
62 });
63 *handle_guard = Some(handle);
64 }
65
66 pub async fn finish(&self) {
68 let mut finished_guard = self.finished.lock().await;
69 *finished_guard = true;
70 drop(finished_guard);
71 let mut handle_guard = self.handle.lock().await;
72 if let Some(handle) = handle_guard.take() {
73 handle.abort();
74 }
75 drop(handle_guard);
76 self.bar.finish();
77 }
78}
79
80impl<T: ICommandInfo + 'static> Service for CliProgress<T> {
81 type Error = ServiceError;
82
83 async fn from_services(services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
84 Ok(Self::new(services.get_service().await?))
85 }
86}