use crate::terminal::{Terminal, TerminalWriter};
use crate::{fmt_log, CliState};
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::{Acquire, Release};
use indicatif::ProgressBar;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::broadcast::Receiver;
use tokio::time::sleep;
const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(20);
#[derive(Debug, Clone, PartialEq)]
pub enum Notification {
Message(String),
Progress(String),
ProgressFinishWithMessage(String),
ProgressFinishAndClear(),
}
impl Notification {
pub fn contents(&self) -> Option<&str> {
match self {
Notification::Message(contents) => Some(contents),
Notification::Progress(contents) => Some(contents),
Notification::ProgressFinishWithMessage(contents) => Some(contents),
Notification::ProgressFinishAndClear() => None,
}
}
pub fn message(contents: impl Into<String>) -> Self {
Self::Message(contents.into())
}
pub fn progress(contents: impl Into<String>) -> Self {
Self::Progress(contents.into())
}
pub fn progress_finish(contents: impl Into<Option<String>>) -> Self {
match contents.into() {
Some(contents) => Self::ProgressFinishWithMessage(contents),
None => Self::ProgressFinishAndClear(),
}
}
}
pub struct NotificationHandle {
stop: Arc<AtomicBool>,
}
impl Drop for NotificationHandle {
fn drop(&mut self) {
self.stop.store(true, Release);
}
}
#[derive(Debug)]
pub struct NotificationHandler<T: TerminalWriter + Debug + Send + 'static> {
rx: Receiver<Notification>,
progress_bar: Option<ProgressBar>,
terminal: Terminal<T>,
stop: Arc<AtomicBool>,
}
impl<T: TerminalWriter + Debug + Send + 'static> NotificationHandler<T> {
pub fn start(cli_state: &CliState, terminal: Terminal<T>) -> NotificationHandle {
let stop = Arc::new(AtomicBool::new(false));
let _self = NotificationHandler {
rx: cli_state.subscribe_to_notifications(),
terminal,
progress_bar: None,
stop: stop.clone(),
};
_self.run();
NotificationHandle { stop }
}
pub fn run(mut self) {
tokio::spawn(async move {
loop {
select! {
_ = sleep(REPORTING_CHANNEL_POLL_DELAY) => {
if self.stop.load(Acquire) {
while let Ok(notification) = self.rx.try_recv() {
self.handle_notification(notification);
}
break;
}
}
notification = self.rx.recv() => {
if let Ok(notification) = notification {
self.handle_notification(notification);
}
else {
break;
}
}
}
}
});
}
fn handle_notification(&mut self, notification: Notification) {
match notification {
Notification::Message(contents) => {
let _ = self.terminal.write_line(contents);
}
Notification::Progress(contents) => {
if self.terminal.can_use_progress_bar() {
if self.progress_bar.is_none() {
self.progress_bar = self.terminal.spinner();
}
if let Some(pb) = self.progress_bar.as_ref() {
pb.set_message(contents);
}
}
else {
let _ = self.terminal.write_line(fmt_log!("{}", contents));
}
}
Notification::ProgressFinishWithMessage(contents) => {
if let Some(pb) = self.progress_bar.take() {
pb.finish_with_message(contents);
}
}
Notification::ProgressFinishAndClear() => {
if let Some(pb) = self.progress_bar.take() {
pb.finish_and_clear();
}
}
}
}
}