use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use async_broadcast::{Receiver, Sender, broadcast};
use futures_core::Stream;
use pin_project_lite::pin_project;
use crate::{Progress, ProgressUpdate, State};
/// Handle used by a running task to publish progress updates.
///
/// The updater remembers the task's `total` and `current` values and sends a
/// [`ProgressUpdate`] through a broadcast [`Sender`] whenever one of its
/// reporting methods is called.
///
/// Dropping an updater that has not been marked completed broadcasts a
/// `State::Cancelled` update (see the `Drop` impl).
///
/// NOTE(review): the type derives `Clone`, and each clone carries its own
/// `completed` flag. Dropping a clone that was never completed will therefore
/// presumably broadcast `Cancelled` even while other clones are still
/// reporting — confirm this is intended.
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[derive(Debug, Clone)]
pub struct ProgressUpdater {
// Total amount of work, as last set by `new` or `set_total`.
total: u64,
// Most recent progress value passed to `update` / `update_with_message`.
current: u64,
// Set once `complete` has run; suppresses the `Cancelled` update on drop.
completed: bool,
// Broadcast channel endpoint that every update is pushed into.
sender: Sender<ProgressUpdate>,
}
impl ProgressUpdater {
const fn new(total: u64, sender: Sender<ProgressUpdate>) -> Self {
Self {
total,
current: 0,
completed: false,
sender,
}
}
pub fn update_with_message(&mut self, current: u64, message: impl Into<String>) {
self.current = current;
let update = ProgressUpdate::new(self.total, current, State::Working, Some(message.into()));
self.broadcast(update);
}
pub fn update(&mut self, current: u64) {
self.current = current;
let update = ProgressUpdate::new(self.total, current, State::Working, None);
self.broadcast(update);
}
pub fn pause(&self) {
let update = ProgressUpdate::new(self.total, self.current, State::Paused, None);
self.broadcast(update);
}
pub fn complete(&mut self) {
if !self.completed {
self.completed = true;
let update = ProgressUpdate::new(self.total, self.current, State::Completed, None);
self.broadcast(update);
}
}
pub fn pause_with_message(&self, message: impl Into<String>) {
let update = ProgressUpdate::new(
self.total,
self.current,
State::Paused,
Some(message.into()),
);
self.broadcast(update);
}
pub fn set_total(&mut self, total: u64) {
self.total = total;
let update = ProgressUpdate::new(self.total, self.current, State::Working, None);
self.broadcast(update);
}
fn broadcast(&self, update: ProgressUpdate) {
let _ = self.sender.try_broadcast(update);
}
pub fn cancel(self) {
}
}
impl Drop for ProgressUpdater {
    /// Broadcasts a `State::Cancelled` update when the updater goes away
    /// without having been completed.
    ///
    /// The send is best-effort: the result of `try_broadcast` is discarded.
    fn drop(&mut self) {
        // A completed task has already published its terminal state.
        if self.completed {
            return;
        }

        let cancelled = ProgressUpdate::new(self.total, self.current, State::Cancelled, None);
        let _ = self.sender.try_broadcast(cancelled);
    }
}
pin_project! {
/// Future wrapper that pairs a user future with the receiving end of its
/// progress channel.
///
/// Polling is forwarded to `fut`; `receiver` is only cloned to hand out
/// progress streams.
struct ProgressFuture<Fut>
where
Fut: Future,
{
// Receiving end of the progress channel; cloned by `Progress::progress`.
receiver: Receiver<ProgressUpdate>,
// The wrapped future. Structurally pinned so it can be polled in place.
#[pin]
fut: Fut,
}
}
impl<Fut> Future for ProgressFuture<Fut>
where
    Fut: Future,
{
    type Output = Fut::Output;

    /// Polls the wrapped future and returns its result unchanged; the stored
    /// receiver is not touched here.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        this.fut.poll(cx)
    }
}
impl<Fut> Progress for ProgressFuture<Fut>
where
Fut: Future,
{
fn progress(&self) -> impl Stream<Item = ProgressUpdate> + Unpin + Send + 'static {
self.receiver.clone()
}
}
/// Wraps a future with progress reporting.
///
/// A broadcast channel is created, a [`ProgressUpdater`] for a task of
/// `total` units is passed to `f`, and the future returned by `f` is wrapped
/// so that it also implements [`Progress`]. Awaiting the returned value
/// yields exactly the output of the inner future.
///
/// # Buffering
///
/// The channel holds at most 32 updates and runs in overflow mode: when it is
/// full, the oldest buffered update is discarded to make room for the newest
/// one. This matters because the returned future keeps a receiver that is
/// never read; without overflow mode that receiver would pin the first 32
/// updates in the queue and every later update — including the final
/// completed/cancelled one — would be silently dropped.
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub fn progress<F, Fut>(total: u64, f: F) -> impl Progress<Output = Fut::Output>
where
    F: FnOnce(ProgressUpdater) -> Fut,
    Fut: Future,
{
    /// Maximum number of updates buffered in the channel.
    const CAPACITY: usize = 32;

    let (mut sender, receiver) = broadcast(CAPACITY);
    // Prefer losing the oldest update over losing the newest: the receiver
    // stored in `ProgressFuture` is never drained, so a non-overflowing
    // channel would fill up for good after `CAPACITY` messages.
    sender.set_overflow(true);

    let updater = ProgressUpdater::new(total, sender);
    let fut = f(updater);
    ProgressFuture { receiver, fut }
}