1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use {
    crossbeam::channel::{self, bounded, select, Receiver},
    std::thread,
    termimad::TimedEvent,
};

pub enum Either<A, B> {
    First(A),
    Second(B),
}

#[derive(Debug, Clone)]
pub enum ComputationResult<V> {
    NotComputed, // not computed but will probably be
    Done(V),
    None, // nothing to compute, cancelled, failed, etc.
}
impl<V> ComputationResult<V> {
    pub fn is_done(&self) -> bool {
        matches!(&self, Self::Done(_))
    }
    pub fn is_not_computed(&self) -> bool {
        matches!(&self, Self::NotComputed)
    }
    pub fn is_some(&self) -> bool {
        !matches!(&self, Self::None)
    }
    pub fn is_none(&self) -> bool {
        matches!(&self, Self::None)
    }
}

/// The dam controls the flow of events.
/// A dam is used in broot to manage long computations and,
/// when the user presses a key, either tell the computation
/// to stop (the computation function checking `has_event`)
/// or drop the computation.
pub struct Dam {
    receiver: Receiver<TimedEvent>,
    in_dam: Option<TimedEvent>,
}

impl Dam {
    pub fn from(receiver: Receiver<TimedEvent>) -> Self {
        Self {
            receiver,
            in_dam: None,
        }
    }
    pub fn unlimited() -> Self {
        Self::from(channel::never())
    }

    /// provide an observer which can be used for periodic
    /// check a task can be used.
    /// The observer can safely be moved to another thread
    /// but Be careful not to use it
    /// after the event listener started again. In any case
    /// using try_compute should be preferred for immediate
    /// return to the ui thread.

    pub fn observer(&self) -> DamObserver {
        DamObserver::from(self)
    }

    /// launch the computation on a new thread and return
    /// when it finishes or when a new event appears on
    /// the channel.
    /// Note that the task itself isn't interrupted so that
    /// this should not be used when many tasks are expected
    /// to be launched (or it would result in many working
    /// threads uselessly working in the background) : use
    /// dam.has_event from inside the task whenever possible.
    pub fn try_compute<V: Send + 'static, F: Send + 'static + FnOnce() -> ComputationResult<V>>(
        &mut self,
        f: F,
    ) -> ComputationResult<V> {
        let (comp_sender, comp_receiver) = bounded(1);
        thread::spawn(move || {
            let comp_res = time!("comp in dam", f());
            if comp_sender.send(comp_res).is_err() {
                debug!("no channel at end of computation");
            }
        });
        self.select(comp_receiver)
    }

    pub fn select<V>(
        &mut self,
        comp_receiver: Receiver<ComputationResult<V>>,
    ) -> ComputationResult<V> {
        if self.in_dam.is_some() {
            // should probably not happen
            debug!("There's already an event in dam");
            ComputationResult::None
        } else {
            select! {
                recv(self.receiver) -> event => {
                    // interruption
                    debug!("dam interrupts computation");
                    self.in_dam = event.ok();
                    ComputationResult::None
                }
                recv(comp_receiver) -> comp_res => {
                    // computation finished
                    comp_res.unwrap_or(ComputationResult::None)
                }
            }
        }
    }

    /// non blocking
    pub fn has_event(&self) -> bool {
        !self.receiver.is_empty()
    }

    /// block until next event (including the one which
    ///  may have been pushed back into the dam).
    /// no event means the source is dead (i.e. we
    /// must quit broot)
    /// There's no event kept in dam after this call.
    pub fn next_event(&mut self) -> Option<TimedEvent> {
        if self.in_dam.is_some() {
            self.in_dam.take()
        } else {
            match self.receiver.recv() {
                Ok(event) => Some(event),
                Err(_) => {
                    debug!("dead dam"); // should be logged once
                    None
                }
            }
        }
    }

    // or maybed return either Option<TimedEvent> or Option<T> ?
    pub fn next<T>(&mut self, other: &Receiver<T>) -> Either<Option<TimedEvent>, Option<T>> {
        if self.in_dam.is_some() {
            Either::First(self.in_dam.take())
        } else {
            select! {
                recv(self.receiver) -> event => Either::First(match event {
                    Ok(event) => Some(event),
                    Err(_) => {
                        debug!("dead dam"); // should be logged once
                        None
                    }
                }),
                recv(other) -> o => Either::Second(match o {
                    Ok(o) => Some(o),
                    Err(_) => {
                        debug!("dead other");
                        None
                    }
                }),
            }
        }
    }
}

pub struct DamObserver {
    receiver: Receiver<TimedEvent>,
}
impl DamObserver {
    pub fn from(dam: &Dam) -> Self {
        Self {
            receiver: dam.receiver.clone(),
        }
    }
    /// be careful that this can be used as a thread
    /// stop condition only before the event receiver
    /// start being active to avoid a race condition.
    pub fn has_event(&self) -> bool {
        !self.receiver.is_empty()
    }
}

/// wraps either a computation in progress, or a finished
/// one (even a failed or useless one).
/// This can be stored in a map to avoid starting computations
/// more than once.
#[derive(Debug, Clone)]
pub enum Computation<V> {
    InProgress(Receiver<ComputationResult<V>>),
    Finished(ComputationResult<V>),
}