// Crate-wide lint level: any use of `unsafe` inside this crate is rejected at compile time.
#![forbid(unsafe_code)]
10
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock, PoisonError, RwLock};

pub use irox_time;
use irox_time::epoch::UnixTimestamp;
use irox_time::Duration;
use irox_tools::random::{Random, PRNG};
use irox_tools::sync::SynchronizedOptional;
use irox_tools::vec::RetainTake;
20
21pub mod console;
22pub mod read;
23pub mod write;
24
25static RAND: OnceLock<Mutex<Random>> = OnceLock::new();
26
27fn get_random_id() -> u64 {
29 if let Ok(mut rand) = RAND.get_or_init(|| Mutex::new(Random::default())).lock() {
30 return rand.next_u64();
31 };
32 Random::default().next_u64()
33}
34
35pub trait ProgressPrinter {
38 fn track_task_progress(&self, task: &Task);
40}
41
/// The unit that a task's element counter is expressed in.
///
/// [`TaskElementUnits::None`] is the default: a plain, unit-less count.
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Hash)]
pub enum TaskElementUnits {
    /// Plain element count with no unit attached.
    #[default]
    None,
    /// The counter counts bytes.
    Bytes,
    /// The counter counts bits.
    Bits,
}
48
49#[derive(Debug)]
50struct TaskInner {
51 id: AtomicU64,
52 name: String,
53 counter: AtomicU64,
54 max_elements: AtomicU64,
55 current_status: SynchronizedOptional<String>,
56 _element_units: TaskElementUnits,
57 created: UnixTimestamp,
58 started: OnceLock<UnixTimestamp>,
59 ended: OnceLock<UnixTimestamp>,
60 remaining: RwLock<Duration>,
61 children: RwLock<Vec<Task>>,
62}
63
64#[derive(Debug, Clone)]
76pub struct Task {
77 inner: Arc<TaskInner>,
78 cancelled: Arc<AtomicBool>,
79}
80
81impl Task {
82 #[must_use]
84 pub fn new(id: u64, name: String, max_elements: u64) -> Task {
85 let inner = TaskInner {
86 id: AtomicU64::new(id),
87 name,
88 max_elements: AtomicU64::new(max_elements),
89 _element_units: TaskElementUnits::None,
90 counter: AtomicU64::new(0),
91 current_status: SynchronizedOptional::empty(),
92 children: RwLock::new(Vec::new()),
93 created: UnixTimestamp::now(),
94 started: OnceLock::new(),
95 ended: OnceLock::new(),
96 remaining: RwLock::new(Duration::default()),
97 };
98 Task {
99 inner: Arc::new(inner),
100 cancelled: Arc::new(AtomicBool::new(false)),
101 }
102 }
103
104 #[must_use]
106 pub fn new_infinite(id: u64, name: String) -> Task {
107 Self::new(id, name, u64::MAX)
108 }
109
110 #[must_use]
112 pub fn new_infinite_named(name: String) -> Task {
113 let id = get_random_id();
114 Task::new_infinite(id, name)
115 }
116
117 #[must_use]
119 pub fn new_named(name: String, max_elements: u64) -> Task {
120 let id = get_random_id();
121 Task::new(id, name, max_elements)
122 }
123
124 #[must_use]
126 pub fn current_progress_count(&self) -> u64 {
127 self.inner.counter.load(Ordering::SeqCst)
128 }
129
130 pub fn set_current_progress_count(&self, current_progress: u64) {
132 self.inner.counter.store(current_progress, Ordering::SeqCst);
133 }
134
135 #[must_use]
137 pub fn max_elements(&self) -> u64 {
138 self.inner.max_elements.load(Ordering::SeqCst)
139 }
140
141 pub fn set_max_elements(&self, max_elements: u64) {
142 self.inner
143 .max_elements
144 .store(max_elements, Ordering::SeqCst)
145 }
146
147 #[must_use]
149 pub fn current_progress_frac(&self) -> f64 {
150 let cur = self.current_progress_count() as f64;
151 let max = self.max_elements() as f64;
152 cur / max
153 }
154
155 #[must_use]
157 pub fn get_id(&self) -> u64 {
158 self.inner.id.load(Ordering::SeqCst)
159 }
160
161 #[must_use]
163 pub fn get_name(&self) -> &str {
164 self.inner.name.as_str()
165 }
166
167 #[must_use]
169 pub fn get_created(&self) -> UnixTimestamp {
170 self.inner.created
171 }
172
173 #[must_use]
175 pub fn get_started(&self) -> Option<&UnixTimestamp> {
176 self.inner.started.get()
177 }
178
179 pub fn mark_one_completed(&self) {
181 let completed = self.inner.counter.fetch_add(1, Ordering::SeqCst);
182 self.update_remaining();
183 if completed == self.max_elements() {
184 self.mark_ended();
185 }
186 }
187
188 fn update_remaining(&self) {
189 let completed = self.inner.counter.load(Ordering::SeqCst);
190 if completed > 0 {
191 if let Some(started) = self.get_started() {
192 let mult = 1. / self.current_progress_frac();
193 let elapsed = started.elapsed();
194 let est_end = elapsed * mult;
195 if let Ok(mut remaining) = self.inner.remaining.write() {
196 *remaining = est_end - elapsed;
197 }
198 }
199 }
200 }
201
202 pub fn mark_all_completed(&self) {
204 self.inner
205 .counter
206 .store(self.max_elements(), Ordering::SeqCst);
207 if let Ok(mut remaining) = self.inner.remaining.write() {
208 *remaining = Duration::default();
209 }
210 self.mark_ended();
211 }
212
213 pub fn mark_some_completed(&self, completed: u64) {
215 self.inner.counter.fetch_add(completed, Ordering::SeqCst);
216 self.update_remaining()
217 }
218
219 pub fn get_remaining_time(&self) -> Duration {
220 if let Ok(remaining) = self.inner.remaining.read() {
221 return *remaining;
222 }
223 Duration::default()
224 }
225
226 pub fn mark_started(&self) {
228 let _res = self.inner.started.set(UnixTimestamp::now());
229 }
230
231 #[must_use]
233 pub fn get_ended(&self) -> Option<&UnixTimestamp> {
234 self.inner.ended.get()
235 }
236
237 pub fn mark_ended(&self) {
239 let _res = self.inner.ended.set(UnixTimestamp::now());
240 }
241
242 #[must_use]
244 pub fn num_children(&self) -> usize {
245 let read = self.inner.children.read();
246 let Ok(read) = read else {
247 return 0;
248 };
249 read.len()
250 }
251
252 pub fn each_child<F: FnMut(&Task)>(&self, func: F) {
254 let read = self.inner.children.read();
255 let Ok(read) = read else {
256 return;
257 };
258 read.iter().for_each(func)
259 }
260
261 pub fn clean_completed_children(&self) -> Vec<Task> {
262 if let Ok(mut write) = self.inner.children.write() {
263 return write.retain_take(Task::is_complete);
264 }
265 vec![]
266 }
267
268 #[must_use]
271 pub fn new_child_task(&self, id: u64, name: String, max_elements: u64) -> Task {
272 loop {
273 let write = self.inner.children.write();
274 if let Ok(mut write) = write {
275 let task = Task::new(id, name, max_elements);
276 let t2 = task.clone();
277 write.push(task);
278 return t2;
279 };
280 }
281 }
282
283 pub fn push_new_child_task(&self, task: Task) {
286 let write = self.inner.children.write();
287 if let Ok(mut write) = write {
288 write.push(task)
289 }
290 }
291
292 #[must_use]
294 pub fn is_complete(&self) -> bool {
295 self.inner.ended.get().is_some() || self.current_progress_frac() >= 1.
296 }
297
298 pub fn cancel(&self) {
301 self.cancelled.store(true, Ordering::Relaxed);
302 self.each_child(|ch| {
303 ch.cancel();
304 })
305 }
306
307 #[must_use]
310 pub fn is_cancelled(&self) -> bool {
311 self.cancelled.load(Ordering::Relaxed)
312 }
313
314 #[must_use]
316 pub fn current_status(&self) -> Option<Arc<String>> {
317 self.inner.current_status.get()
318 }
319
320 pub fn set_current_status<T: AsRef<str>>(&self, status: Option<T>) {
322 let _res = self
323 .inner
324 .current_status
325 .set(status.map(|v| v.as_ref().to_string()));
326 }
327}
328
/// Scales a number into a "human" magnitude using decimal (power-of-1000) prefixes.
///
/// Takes an expression of type `f64` (evaluated exactly once) and evaluates to a tuple
/// `(scaled, prefix)`, where `prefix` is one of `""`, `"K"`, `"M"`, `"G"`, `"T"`, `"P"`,
/// `"E"`, or `"?"` for anything larger, and `scaled` is the input divided by the matching
/// power of 1000.
///
/// The magnitude is picked from `log10(1 + value) / 3`, truncated. Negative or `NaN`
/// inputs therefore get no prefix, because the float-to-`u32` cast saturates to `0`.
#[macro_export]
macro_rules! get_human {
    ($inp:expr) => {{
        // Bind once so an arbitrary expression is not evaluated twice.
        let value = $inp;
        let magnitude = ((1. + value).log10() / 3.) as u32;
        let prefix = match magnitude {
            0 => "",
            1 => "K",
            2 => "M",
            3 => "G",
            4 => "T",
            5 => "P",
            6 => "E",
            _ => "?",
        };
        (value / 10f64.powf(3. * f64::from(magnitude)), prefix)
    }};
}