//! # Introduction
//!
//! A struct for recording the execution status of async tasks, with lock-free and async methods.
//!
//! It can host `Future`s and query whether they are **not found**, **successful**, **failed**, or **running**.
//!
//! - Depends on `tokio` with feature `rt`, so other async runtimes cannot be used.
//! - Depends on [scc](https://crates.io/crates/scc) for a lock-free and async `HashSet`.
//!
//! Use this crate if:
//! - It is easy to generate a **unique** `task_id` (not necessarily a `String`) for each future (task).
//! - Tasks might fail and you want to run them again, but no task may succeed more than once.
//! - You want to record and query all succeeded and failed tasks.
//! - You want to handle every task in the same state (e.g. `success`).
//!
//! [Example](https://github.com/Ayana-chan/ipfs_storage_cruster/tree/master/crates/async_tasks_recorder/examples).
//!
//! A recorder can only use one `task_id` type. The type of `task_id` should be:
//! - `Eq + Hash + Clone + Send + Sync + 'static`
//! - Cheap to clone (wrapping it in an `Arc` can help).
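//!
//! As a minimal sketch of these bounds (the `assert_task_id` helper and the composite `JobId` type
//! are hypothetical, not part of this crate), a compile-time check shows that common id types qualify,
//! and that an `Arc<str>` field keeps cloning cheap because only a reference count is bumped:
//!
//! ```rust
//! use std::hash::Hash;
//! use std::sync::Arc;
//!
//! // Hypothetical helper: compiles only if `T` satisfies the `task_id` bounds.
//! fn assert_task_id<T: Eq + Hash + Clone + Send + Sync + 'static>() {}
//!
//! // Hypothetical composite id; cloning copies a `u64` and bumps a reference count.
//! #[derive(Clone, PartialEq, Eq, Hash, Debug)]
//! struct JobId {
//!     user: u64,
//!     name: Arc<str>,
//! }
//!
//! fn check_id_types() {
//!     assert_task_id::<u64>();
//!     assert_task_id::<String>();
//!     assert_task_id::<Arc<str>>();
//!     assert_task_id::<JobId>();
//! }
//! ```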
//!
//! And remember, you can put **anything** inside the `Future` to achieve the functionality you want.
//! For example:
//! - Handle your `Result` inside the `Future`, and then return an empty result `Result<(), ()>`.
//! - Send a message to a oneshot channel at the end of the `Future` to notify the upper level that "this task is done".
//!   If sending may not complete immediately, consider wrapping it in `tokio::spawn`.
//! - Set other callback functions.
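//!
//! A minimal sketch of the first two ideas, using only `std` as an assumption: a hypothetical fallible
//! job `fetch` is wrapped so that the recorder would only see `Result<(), ()>`, while the real value is
//! sent through a `std::sync::mpsc` channel (standing in for a oneshot channel). The small `block_on`
//! only replaces the tokio runtime so that the sketch runs on its own; with this crate, the wrapped
//! future would be passed to `launch` instead.
//!
//! ```rust
//! use std::future::Future;
//! use std::pin::pin;
//! use std::sync::{mpsc, Arc};
//! use std::task::{Context, Poll, Wake, Waker};
//! use std::thread::{self, Thread};
//!
//! // Stand-in executor (assumption): polls one future on the current thread.
//! struct ThreadWaker(Thread);
//!
//! impl Wake for ThreadWaker {
//!     fn wake(self: Arc<Self>) {
//!         self.0.unpark();
//!     }
//! }
//!
//! fn block_on<F: Future>(fut: F) -> F::Output {
//!     let mut fut = pin!(fut);
//!     let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
//!     let mut cx = Context::from_waker(&waker);
//!     loop {
//!         match fut.as_mut().poll(&mut cx) {
//!             Poll::Ready(out) => return out,
//!             Poll::Pending => thread::park(),
//!         }
//!     }
//! }
//!
//! // Hypothetical fallible job whose real output matters to the caller.
//! async fn fetch(n: u32) -> Result<u32, String> {
//!     if n % 2 == 0 {
//!         Ok(n * 10)
//!     } else {
//!         Err(format!("{n} is odd"))
//!     }
//! }
//!
//! // Handle the `Result` inside the future and report it through the channel,
//! // so the recorder only needs to know success or failure.
//! fn wrap(n: u32, tx: mpsc::Sender<Result<u32, String>>) -> impl Future<Output = Result<(), ()>> {
//!     async move {
//!         let res = fetch(n).await;
//!         let ok = res.is_ok();
//!         // Notify the upper level that "this task is done"; ignore a dropped receiver.
//!         let _ = tx.send(res);
//!         if ok { Ok(()) } else { Err(()) }
//!     }
//! }
//! ```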
//!
//! > If anything is unclear, it is recommended to read the source code directly (about 100 lines).
//!
//! **NOTE**: This crate uses three `HashSet`s to make it easy to handle all tasks in the same state.
//! However, `scc::HashSet` has less contention for **single** accesses as it grows larger.
//! Therefore, if you don't need to handle every task in the same state,
//! just use a single `scc::HashMap` (`task_id` \-\> `task_status`) to build a simpler implementation,
//! which might have less contention and fewer clones, but is more expensive to iterate over.
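//!
//! A sketch of that simpler single-map design, assuming a `std::sync::Mutex<HashMap>` as a stand-in
//! for `scc::HashMap` (so, unlike `scc`, it takes a blocking lock); `MapRecorder`, `Status`, and the
//! method names are hypothetical:
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::hash::Hash;
//! use std::sync::Mutex;
//!
//! // Hypothetical per-task status; a missing key plays the role of `NotFound`.
//! #[derive(Clone, Copy, Debug, PartialEq, Eq)]
//! enum Status {
//!     Working,
//!     Success,
//!     Failed,
//! }
//!
//! // Single-map recorder; `Mutex<HashMap>` stands in for `scc::HashMap`.
//! struct MapRecorder<T> {
//!     tasks: Mutex<HashMap<T, Status>>,
//! }
//!
//! impl<T: Eq + Hash + Clone> MapRecorder<T> {
//!     fn new() -> Self {
//!         MapRecorder { tasks: Mutex::new(HashMap::new()) }
//!     }
//!
//!     // Mark the task `Working` unless it is already `Working` or `Success`;
//!     // returns whether the caller may run it.
//!     fn try_start(&self, id: &T) -> bool {
//!         let mut tasks = self.tasks.lock().unwrap();
//!         if matches!(tasks.get(id), Some(Status::Working | Status::Success)) {
//!             return false;
//!         }
//!         tasks.insert(id.clone(), Status::Working);
//!         true
//!     }
//!
//!     // Record the outcome of a run that `try_start` allowed.
//!     fn finish(&self, id: &T, ok: bool) {
//!         let status = if ok { Status::Success } else { Status::Failed };
//!         self.tasks.lock().unwrap().insert(id.clone(), status);
//!     }
//!
//!     fn query(&self, id: &T) -> Option<Status> {
//!         self.tasks.lock().unwrap().get(id).copied()
//!     }
//!
//!     // Collecting every task in one state needs a full scan of the map.
//!     fn ids_in(&self, status: Status) -> Vec<T> {
//!         self.tasks
//!             .lock()
//!             .unwrap()
//!             .iter()
//!             .filter(|(_, s)| **s == status)
//!             .map(|(id, _)| id.clone())
//!             .collect()
//!     }
//! }
//! ```
//!
//! `ids_in` has to scan the whole map, while the three-set design can read one container directly.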
//!
//! # Usage
//!
//! Launch a task with a **unique** `task_id` and a `Future` by [launch](AsyncTasksRecoder::launch).
//!
//! Query the state of the task with its `task_id`
//! by [query_task_state](AsyncTasksRecoder::query_task_state) or [query_task_state_quick](AsyncTasksRecoder::query_task_state_quick).
//!
//! # Theory & Design
//!
//! ## Abstract Model
//! Here is the three-level structure for thinking about tasks' status:
//! - Level 0: `real_none`, `real_failed`, `real_working`, `real_success` : **Exact status** of the tasks in the CPU (seen by God).
//! - Level 1: `failed_tasks`, `working_tasks`, `success_tasks` : **Containers** to store `task_id`s (a `task_id` can be stored in 0 to 2 containers simultaneously).
//! - Level 2: `Not Found`, `Failed`, `Working`, `Success` : **States** of the task that could be obtained by `query_task_state`.
//!
//! ## State Transition Diagram
//! - `Not Found` \-\-\-\-\> `Working`
//! - `Working` \<\-\-\-\> `Failed`
//! - `Working` \-\-\-\-\> `Success`
//!
//! If you treat `Not Found` as equivalent to `Failed`, then:
//!
//! `Failed` \<\-\-\-\> `Working` \-\-\-\-\> `Success`
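//!
//! The diagram can be encoded as a transition check. In this sketch, `State` mirrors [TaskState] and
//! the helpers `is_allowed` and `follows_diagram` are hypothetical; the trace is assumed to record
//! every state change, since periodic sampling could skip a short `Working` phase.
//!
//! ```rust
//! // Mirrors this crate's `TaskState` so the sketch is self-contained.
//! #[derive(Clone, Copy, Debug, PartialEq, Eq)]
//! enum State {
//!     NotFound,
//!     Working,
//!     Success,
//!     Failed,
//! }
//!
//! // Edges of the diagram above; staying in the same state is always allowed.
//! fn is_allowed(from: State, to: State) -> bool {
//!     from == to
//!         || matches!(
//!             (from, to),
//!             (State::NotFound, State::Working)
//!                 | (State::Working, State::Failed)
//!                 | (State::Failed, State::Working)
//!                 | (State::Working, State::Success)
//!         )
//! }
//!
//! // Checks a trace that records every state change of one task.
//! fn follows_diagram(trace: &[State]) -> bool {
//!     trace.windows(2).all(|w| is_allowed(w[0], w[1]))
//! }
//! ```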
//!
//! ## Nature
//! ### About Task
//! 1. A task is **launched** by passing a `Future<Output=Result<R, E>>` with a unique `task_id`.
//! 2. A task is `real_success` when it returns `Ok(R)`, and `real_failed` when it returns `Err(E)`.
//! 3. Different futures with **the same `task_id`** are considered **the same task**.
//! 4. The same task **can only be `real_success` once**, e.g. a purchase process launched with its unique process ID as `task_id` never succeeds more than once.
//!
//! ### About Task State
//! 1. If a task's state is `Success`, it must be `real_success`, i.e. $\text{Success}(id) \rightarrow \text{real\_success}(id)$.
//! 2. If a task's state is `Failed`, it may be in any status, but mostly `real_failed`.
//! 3. If a task's state is `Working`, it may be in any status, but mostly `real_working`.
//! 4. If a task's state is `Not Found`, it may be in any status, but mostly `real_none`.
//!
//! ### About Task State Transition
//! 1. Any task's state can be **queried** at any time.
//! 2. The initial state of a task is `Not Found`, and it does not change immediately when `launch` is called.
//! 3. Always, when a task whose state is `Failed` or `Not Found` is launched, it will be `Working` at some future moment.
//! 4. Always, when a task is `Working`, it will eventually be `Failed` or `Success`, i.e. $\Box (\text{Working}(id) \rightarrow \lozenge(\text{Failed}(id) \vee \text{Success}(id)))$.
//! 5. Always, when a task is `Success`, it stays `Success` forever, i.e. $\Box (\text{Success}(id) \rightarrow \Box \text{Success}(id))$.
//!
//! ### Other
//! The relationship between states and containers is described at [query_task_state](AsyncTasksRecoder::query_task_state).
//!
//! Further propositions and proofs are given at [AsyncTasksRecoder](AsyncTasksRecoder).
//!
//! Use [query_task_state_quick](AsyncTasksRecoder::query_task_state_quick) for less contention.
//!

use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;

/// Thread-safe.
///
/// All fields are public, so the containers can be customized (e.g. their initial capacity)
/// before being handed to [AsyncTasksRecoder::new_with_task_manager].
#[derive(Default, Debug)]
pub struct TaskManager<T>
    where T: Eq + Hash {
    /// Usually the hottest (most contended) container.
    pub working_tasks: scc::HashSet<T>,
    pub success_tasks: scc::HashSet<T>,
    pub failed_tasks: scc::HashSet<T>,
}

impl<T> TaskManager<T>
    where T: Eq + Hash {
    /// Create a default, empty `TaskManager`.
    pub fn new() -> Self {
        TaskManager {
            working_tasks: scc::HashSet::new(),
            success_tasks: scc::HashSet::new(),
            failed_tasks: scc::HashSet::new(),
        }
    }
}

#[derive(Eq, PartialEq, Debug, Clone)]
pub enum TaskState {
    /// running or pending
    Working,
    Success,
    Failed,
    NotFound,
}

/// `Arc` is used internally, so a clone shares the same `TaskManager`,
/// which means you can share an `AsyncTasksRecoder` by cloning it.
///
/// # Usage
///
/// Launch a task with a **unique** `task_id` and a `Future` by [launch](AsyncTasksRecoder::launch).
///
/// Query the state of the task with its `task_id`
/// by [query_task_state](AsyncTasksRecoder::query_task_state) or [query_task_state_quick](AsyncTasksRecoder::query_task_state_quick).
///
/// # Further Propositions & Proofs
///
/// ## P01
/// **A task (or tasks with the same `task_id`) won't be executed again after its first success.**
///
/// When a task fails, nothing breaks: failure just means the task can be launched again.
/// So if this proposition (**P01**) holds, there is almost nothing to worry about.
/// For further discussion, please refer to **P02**.
///
/// From now on, only the success case is considered.
///
/// `working_tasks` plays the role of a lock,
/// which allows only one task with a given `task_id` at a time to execute the code between `insert` and `remove`.
/// Moreover, a succeeded `task_id` is already in `success_tasks` before it is removed from `working_tasks`.
///
/// An equivalent pseudocode can be obtained:
/// - `working_tasks` becomes a **mutex** (maybe a **spin lock**) for one `task_id`.
/// - `success_tasks` becomes an atomic boolean, which can only change from false to true.
/// - An execution of a task becomes incrementing an atomic integer (`count`).
///
/// Therefore, if `count` never exceeds 1, the task is executed only once.
///
/// ```not_rust
/// let working_task_id = mutex::new();
/// let success_task_id = atomic(false);
/// let count = atomic(0);
/// launch_multi_thread {
///     working_task_id.lock();
///     if success_task_id.get() {
///         working_task_id.unlock();
///         exit();
///     }
///     count.add(1);
///     success_task_id.set(true);
///     working_task_id.unlock();
/// }
/// assert_eq!(count.get(), 1);
/// ```
///
/// Because the check and the `set` happen inside the same critical section,
/// `success_task_id.set(true)` can only be executed once, and every later entry calls `exit()`.
/// This means `count.add(1)` is executed only once, too. Q.E.D.
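///
/// Assuming OS threads and `std::sync` primitives are an acceptable stand-in for tokio tasks and
/// `scc::HashSet`, the pseudocode can be run directly; `run_p01` is a hypothetical helper, and
/// dropping the mutex guard plays the role of `unlock()`:
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
/// use std::sync::{Arc, Mutex};
/// use std::thread;
///
/// // Runs the P01 pseudocode on `threads` OS threads and returns the final `count`.
/// fn run_p01(threads: usize) -> usize {
///     let working_task_id = Arc::new(Mutex::new(())); // one lock for one `task_id`
///     let success_task_id = Arc::new(AtomicBool::new(false)); // only goes false -> true
///     let count = Arc::new(AtomicUsize::new(0)); // executions of the task body
///
///     let handles: Vec<_> = (0..threads)
///         .map(|_| {
///             let working = Arc::clone(&working_task_id);
///             let success = Arc::clone(&success_task_id);
///             let count = Arc::clone(&count);
///             thread::spawn(move || {
///                 // lock(); the guard also unlocks on the early `return` (`exit()`)
///                 let _guard = working.lock().unwrap();
///                 if success.load(Ordering::SeqCst) {
///                     return;
///                 }
///                 count.fetch_add(1, Ordering::SeqCst);
///                 success.store(true, Ordering::SeqCst);
///             })
///         })
///         .collect();
///     for handle in handles {
///         handle.join().unwrap();
///     }
///     count.load(Ordering::SeqCst)
/// }
/// ```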
///
/// ## P02
/// **Task failure does no harm to the recorder.**
///
/// Taking failure into account, the pseudocode becomes:
///
/// ```not_rust
/// let working_task_id = mutex::new();
/// let success_task_id = atomic(false);
/// let failed_task_id = atomic(false); // Initially not in `failed_tasks`, but not important
/// let count = atomic(0);
/// launch_multi_thread {
///     working_task_id.lock();
///     if success_task_id.get() {
///         working_task_id.unlock();
///         exit();
///     }
///     // Here should be `real_working`
///     failed_task_id.set(false); // So it shouldn't be `Failed`, just remove from `failed_tasks`
///     count.add(1);
///     if real_success { // outcome of this execution
///         success_task_id.set(true);
///     } else {
///         // `real_failed`
///         failed_task_id.set(true); // become `Failed`
///     }
///     working_task_id.unlock();
/// }
/// // `count` may exceed 1, since failed executions are launched again,
/// // but `success_task_id.set(true)` is still executed only once (P01)
/// ```
///
/// Within a launch (the critical section guarded by `working_tasks`), the initial value of `failed_task_id` is ignored.
/// Therefore, it doesn't matter whether changes to `failed_tasks` are atomic with respect to launches.
///
/// From the perspective of `query_task_state`,
/// `failed_tasks` is only meaningful when `task_id` is in it.
///
/// A `task_id` is in `failed_tasks` only after it becomes `real_failed` and before it is retried (the next `real_working`),
/// which is exactly what the `Failed` state should mean.
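///
/// The same argument can be sketched sequentially with explicit containers. This assumes
/// `Mutex<HashSet>` in place of `scc::HashSet` and a hypothetical `succeeds` callback that decides the
/// outcome of each run; with a task that fails twice before succeeding, the body runs three times,
/// the task is recorded as succeeded, and `failed` no longer holds it.
///
/// ```rust
/// use std::collections::HashSet;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Mutex;
///
/// // Hypothetical model of `launch`; `Mutex<HashSet>` stands in for `scc::HashSet`.
/// #[derive(Default)]
/// struct Model {
///     working: Mutex<HashSet<u32>>,
///     success: Mutex<HashSet<u32>>,
///     failed: Mutex<HashSet<u32>>,
///     runs: AtomicUsize,
/// }
///
/// impl Model {
///     // `succeeds(run)` decides whether the `run`-th execution is `real_success`.
///     fn launch(&self, id: u32, succeeds: impl Fn(usize) -> bool) {
///         if !self.working.lock().unwrap().insert(id) {
///             return; // already working
///         }
///         if self.success.lock().unwrap().contains(&id) {
///             self.working.lock().unwrap().remove(&id);
///             return; // already succeeded
///         }
///         // now `real_working`, so no longer `Failed`
///         self.failed.lock().unwrap().remove(&id);
///         let run = self.runs.fetch_add(1, Ordering::SeqCst);
///         if succeeds(run) {
///             self.success.lock().unwrap().insert(id);
///         } else {
///             self.failed.lock().unwrap().insert(id);
///         }
///         self.working.lock().unwrap().remove(&id);
///     }
/// }
/// ```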
///
/// ## P03
/// **No state ever turns back to `Not Found`.**
///
/// From the pseudocode in **P02**:
///
/// ```not_rust
/// // enter `working_task_id`
/// if real_success {
///     success_task_id.set(true);
/// } else {
///     // `real_failed`
///     failed_task_id.set(true); // Become `Failed`
/// }
/// // leave `working_task_id`
/// ```
///
/// It can be seen that once the task has entered `working_tasks`,
/// by the time it exits `working_tasks`
/// it is already in either `failed_tasks` or `success_tasks`.
///
/// Moreover, `launch` removes a `task_id` from `failed_tasks` only after inserting it into `working_tasks`,
/// and never removes it from `success_tasks`. So after its first `Working`, the task is always in at least one of the containers,
/// and it won't be `Not Found` again. Q.E.D.
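///
/// The ordering argument can be replayed step by step. This is a sequential sketch under stated
/// assumptions: `Snapshot`, `replay_launch`, and `never_not_found` are hypothetical, concurrency and
/// the early-exit paths are left out, and each step mirrors one container operation of `launch`:
///
/// ```rust
/// // Membership of one `task_id` in (`working_tasks`, `success_tasks`, `failed_tasks`).
/// type Snapshot = (bool, bool, bool);
///
/// // Replays, in `launch` order, the container operations of one execution
/// // starting from `start`, recording the membership after each step.
/// fn replay_launch(start: Snapshot, succeeded: bool) -> Vec<Snapshot> {
///     let (_, mut success, mut failed) = start;
///     let mut steps = Vec::new();
///     let mut working = true; // insert into `working_tasks`
///     steps.push((working, success, failed));
///     failed = false; // remove from `failed_tasks`
///     steps.push((working, success, failed));
///     if succeeded {
///         success = true; // insert into `success_tasks`
///     } else {
///         failed = true; // insert into `failed_tasks`
///     }
///     steps.push((working, success, failed));
///     working = false; // remove from `working_tasks`, only after recording the outcome
///     steps.push((working, success, failed));
///     steps
/// }
///
/// // `Not Found` would mean the id is in none of the containers.
/// fn never_not_found(steps: &[Snapshot]) -> bool {
///     steps.iter().all(|&(w, s, f)| w || s || f)
/// }
/// ```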
#[derive(Default, Debug, Clone)]
pub struct AsyncTasksRecoder<T>
    where T: Eq + Hash + Clone + Send + 'static {
    task_manager: Arc<TaskManager<T>>,
}

impl<T> AsyncTasksRecoder<T>
    where T: Eq + Hash + Clone + Send + Sync + 'static {
    /// Create a completely new `AsyncTasksRecoder`.
    pub fn new() -> Self {
        AsyncTasksRecoder {
            task_manager: TaskManager::new().into(),
        }
    }

    /// Create from an existing `TaskManager`.
    pub fn new_with_task_manager(task_manager: TaskManager<T>) -> Self {
        AsyncTasksRecoder {
            task_manager: task_manager.into(),
        }
    }

    /// Create from an `Arc` of a `TaskManager`.
    pub fn new_with_task_manager_arc(task_manager: Arc<TaskManager<T>>) -> Self {
        AsyncTasksRecoder {
            task_manager,
        }
    }

    /// Launch a task that returns a `Result`.
    ///
    /// If the task is already `Working` or `Success`, this call does nothing.
    ///
    /// The task's output is discarded (only whether it is `Ok` is recorded), so use other means,
    /// such as a channel or a shared variable, to handle the return value.
    ///
    /// - `task_id`: Uniquely identifies a task. Different `Future`s with **the same `task_id`** are considered **the same task**.
    /// - `task`: A `Future` to be executed automatically (it is spawned with `tokio::spawn`).
    pub async fn launch<Fut, R, E>(&self, task_id: T, task: Fut)
        where Fut: Future<Output=Result<R, E>> + Send + 'static,
              R: Send,
              E: Send {
        // insert working -> check success -> remove failed -> work -> insert success/failed -> remove working
        // `working_tasks` plays the role of a lock: inserting fails if the task is already working
        if self.task_manager.working_tasks.insert_async(task_id.clone()).await.is_err() {
            // already working
            return;
        }
        if self.task_manager.success_tasks.contains_async(&task_id).await {
            // already succeeded: release the "lock" so that `working_tasks` stays accurate
            self.task_manager.working_tasks.remove_async(&task_id).await;
            return;
        }
        // remove from `failed_tasks` if contained
        self.task_manager.failed_tasks.remove_async(&task_id).await;

        // the spawned task needs its own handle to the shared containers
        let task_manager = self.task_manager.clone();

        // start
        let _task = tokio::spawn(async move {
            // only success or failure is recorded; the output itself is dropped here
            let succeeded = task.await.is_ok();
            if succeeded {
                let _ = task_manager.success_tasks.insert_async(task_id.clone()).await;
            } else {
                let _ = task_manager.failed_tasks.insert_async(task_id.clone()).await;
            }
            // leave `working_tasks` only after the outcome is recorded (see P03)
            task_manager.working_tasks.remove_async(&task_id).await;
        });
    }

    /// Query the state of a task by `task_id`.
    ///
    /// Query priority of containers: `success_tasks` -> `failed_tasks` -> `working_tasks`.
    ///
    /// **NOTE**: `working_tasks` usually has more contention.
    ///
    /// If the `task_id` is in none of the containers, return `NotFound`.
    /// This only occurs before the first launch or during a very short period right after it.
    ///
    /// Note that if `T` is `String`, then the parameter `task_id` is `&String` instead of `&str`.
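    ///
    /// As an illustration of the priority above, a hypothetical pure function can map one instant's
    /// container membership to a state. `State` mirrors [TaskState] only to keep the sketch
    /// self-contained; the real method performs three separate lookups rather than reading one snapshot.
    ///
    /// ```rust
    /// // Mirrors `TaskState` so the sketch is self-contained.
    /// #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    /// enum State {
    ///     Working,
    ///     Success,
    ///     Failed,
    ///     NotFound,
    /// }
    ///
    /// // Same priority as `query_task_state`: success -> failed -> working.
    /// fn state_from(in_success: bool, in_failed: bool, in_working: bool) -> State {
    ///     if in_success {
    ///         State::Success
    ///     } else if in_failed {
    ///         State::Failed
    ///     } else if in_working {
    ///         State::Working
    ///     } else {
    ///         State::NotFound
    ///     }
    /// }
    ///
    /// // Like `query_task_state_quick`: `working_tasks` is never consulted.
    /// fn quick_state_from(in_success: bool, in_failed: bool) -> State {
    ///     state_from(in_success, in_failed, true)
    /// }
    /// ```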
    pub async fn query_task_state(&self, task_id: &T) -> TaskState {
        if self.task_manager.success_tasks.contains_async(task_id).await {
            return TaskState::Success;
        }

        if self.task_manager.failed_tasks.contains_async(task_id).await {
            return TaskState::Failed;
        }

        if self.task_manager.working_tasks.contains_async(task_id).await {
            return TaskState::Working;
        }

        TaskState::NotFound
    }

    /// Return `Working` if the `task_id` is in neither `success_tasks` nor `failed_tasks`.
    ///
    /// `working_tasks` is not queried, so there is less contention.
    ///
    /// It returns `Working` even if the `task_id` has never been launched.
    ///
    /// Use this if you are certain that the task is launched at some point in the past or future,
    /// and you don't care when the launch occurs
    /// (because the first launch always turns into `Working` at some point).
    pub async fn query_task_state_quick(&self, task_id: &T) -> TaskState {
        if self.task_manager.success_tasks.contains_async(task_id).await {
            return TaskState::Success;
        }

        if self.task_manager.failed_tasks.contains_async(task_id).await {
            return TaskState::Failed;
        }

        TaskState::Working
    }

    /// Get a cloned `Arc` of `task_manager`.
    /// Then you can do anything you want (all containers are public).
    pub fn get_task_manager_arc(&self) -> Arc<TaskManager<T>> {
        self.task_manager.clone()
    }

    /// Get a reference to `success_tasks`.
    pub fn get_success_tasks_ref(&self) -> &scc::HashSet<T> {
        &self.task_manager.success_tasks
    }

    /// Get a reference to `working_tasks`.
    pub fn get_working_tasks_ref(&self) -> &scc::HashSet<T> {
        &self.task_manager.working_tasks
    }

    /// Get a reference to `failed_tasks`.
    pub fn get_failed_tasks_ref(&self) -> &scc::HashSet<T> {
        &self.task_manager.failed_tasks
    }
}