annatto/
progress.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::{error::AnnattoError, workflow::StatusMessage, workflow::StatusSender, StepID};
use log::{info, warn};
use std::sync::{Arc, Mutex};

/// Mutable part of a [`ProgressReporter`], kept behind a mutex.
struct ProgressState {
    /// Channel on which status messages are sent. When this is `None`,
    /// `info`/`warn` fall back to the `log` macros and `worked` sends nothing.
    tx: Option<StatusSender>,
    /// Sum of all `finished_work` values passed to `worked` so far.
    accumulated_finished_work: usize,
}

/// Reports progress, informational messages and warnings for a single
/// workflow step, either through a status channel or the `log` macros.
pub struct ProgressReporter {
    /// Sender and accumulated work counter, guarded by a mutex so they can be
    /// updated through a shared (`&self`) reference.
    state: Arc<Mutex<ProgressState>>,
    /// Total amount of work for this step; `None` when the reporter was
    /// created with `new_unknown_total_work`.
    total_work: Option<usize>,
    /// Identifier of the step; sent as `id` in every progress message.
    step_id: StepID,
}

impl ProgressReporter {
    pub fn new(
        tx: Option<StatusSender>,
        step_id: StepID,
        total_work: usize,
    ) -> Result<ProgressReporter, AnnattoError> {
        let state = ProgressState {
            tx,
            accumulated_finished_work: 0,
        };
        let reporter = ProgressReporter {
            state: Arc::new(Mutex::new(state)),
            step_id,
            total_work: Some(total_work),
        };
        // Send a first status report so any listener can get the total number of steps to perform
        reporter.worked(0)?;
        Ok(reporter)
    }

    pub fn new_unknown_total_work(
        tx: Option<StatusSender>,
        step_id: StepID,
    ) -> Result<ProgressReporter, AnnattoError> {
        let state = ProgressState {
            tx,
            accumulated_finished_work: 0,
        };
        let reporter = ProgressReporter {
            state: Arc::new(Mutex::new(state)),
            step_id,
            total_work: None,
        };
        // Send a first status report so any listener can get the total number of steps to perform
        reporter.worked(0)?;
        Ok(reporter)
    }

    pub fn info(&self, msg: &str) -> Result<(), AnnattoError> {
        let state = self.state.lock()?;
        if let Some(ref tx) = state.tx {
            tx.send(StatusMessage::Info(msg.to_string()))?;
        } else {
            info!("{}", msg);
        }
        Ok(())
    }

    pub fn warn(&self, msg: &str) -> Result<(), AnnattoError> {
        let state = self.state.lock()?;
        if let Some(ref tx) = state.tx {
            tx.send(StatusMessage::Warning(msg.to_string()))?;
        } else {
            warn!("{}", msg);
        }
        Ok(())
    }

    pub fn worked(&self, finished_work: usize) -> Result<(), AnnattoError> {
        let mut state = self.state.lock()?;
        state.accumulated_finished_work += finished_work;

        if let Some(ref tx) = state.tx {
            tx.send(StatusMessage::Progress {
                id: self.step_id.clone(),
                total_work: self.total_work,
                finished_work: state.accumulated_finished_work,
            })?;
        }
        Ok(())
    }
}