1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::sync::Arc;

use async_recursion::async_recursion;
use tokio::task::JoinHandle;

use crate::task::Task;

/// A task that has been handed to tokio, together with the handle of the
/// tokio task that was spawned for it.
struct TaskHandle {
    /// The scheduled task; `add` pushes it back onto the queue when it
    /// preempts this entry.
    task: Task,
    /// Join handle returned by `spawn_task`; `add` calls `abort()` on it to
    /// preempt the task.
    handle: JoinHandle<()>,
}
/// Shared scheduler state. Each field sits behind its own async mutex; `add`
/// locks them in the order `tid` -> `tasks` -> `current`, and `next` locks
/// `tasks` -> `current`.
struct _EventQueue {
    /// Id that will be given to the next task passed to `add`.
    tid: tokio::sync::Mutex<usize>,
    /// Tasks waiting to be spawned. `add` sorts this in descending timestamp
    /// order, so `pop()` returns the task with the smallest timestamp.
    tasks: tokio::sync::Mutex<Vec<Task>>,
    /// The most recently spawned task and its join handle, if any.
    current: tokio::sync::Mutex<Option<TaskHandle>>,
}
impl _EventQueue {
    /// Builds an empty queue: ids start at 1, nothing is pending and nothing
    /// is recorded as current.
    fn new() -> Self {
        let tid = tokio::sync::Mutex::new(1_usize);
        let tasks = tokio::sync::Mutex::new(Vec::<Task>::new());
        let current = tokio::sync::Mutex::new(None::<TaskHandle>);
        Self { tid, tasks, current }
    }
}
/// Spawns a tokio task that drives `task` to completion and then keeps the
/// scheduler moving.
///
/// The spawned future:
/// 1. awaits a clone of `task`,
/// 2. passes the task back to `add` (which calls `ready()` on it again and
///    queues it only if it still has a timestamp),
/// 3. calls `next` to start whatever is now at the head of the queue.
///
/// Returns the join handle of the spawned tokio task. This function itself
/// never suspends; it is `async` only so that callers can `.await` it.
async fn spawn_task(sq: Arc<_EventQueue>, task: &Task) -> JoinHandle<()> {
    let job = task.clone();
    let worker = async move {
        // Await a copy so the task itself can still be handed to `add` below.
        job.clone().await;
        add(Arc::clone(&sq), job).await;
        next(Arc::clone(&sq)).await;
    };
    tokio::spawn(worker)
}

/// Clears the `current` slot and, if any task is pending, spawns the one at
/// the tail of the queue and records it as the new current task.
///
/// Locks `tasks` and then `current`, and holds both until it returns.
#[async_recursion]
async fn next(sq: Arc<_EventQueue>) {
    let mut pending = sq.tasks.lock().await;
    let mut running = sq.current.lock().await;

    // Forget the previous entry first; dropping its `JoinHandle` does not
    // cancel the tokio task behind it.
    *running = None;

    let Some(task) = pending.pop() else {
        return;
    };
    let handle = spawn_task(Arc::clone(&sq), &task).await;
    *running = Some(TaskHandle { task, handle });
}
/// Stamps `task` with the next sequential id, lets it compute its timestamp
/// via `Task::ready`, and schedules it on `sq`.
///
/// A task whose `timestamp` is still `None` after `ready()` is dropped without
/// being queued (its id is consumed regardless). Otherwise:
///
/// * if `current` holds a task and the new task's timestamp is strictly
///   earlier, the tokio task behind `current` is aborted and its `Task` goes
///   back into the queue;
/// * the new task is queued and the queue is re-sorted in descending
///   timestamp order, so `pop()` always yields the earliest task;
/// * if `current` is empty at that point, the earliest queued task is spawned
///   and recorded as current.
///
/// All three locks are held for the whole call, acquired in the order
/// `tid` -> `tasks` -> `current` (`next` uses `tasks` -> `current`).
#[async_recursion]
async fn add(sq: Arc<_EventQueue>, mut task: Task) {
    let mut tid = sq.tid.lock().await;
    let mut queue_guard = sq.tasks.lock().await;
    let mut current_guard = sq.current.lock().await;

    task.id = Some(*tid);
    *tid += 1;

    task.ready();
    let Some(new_timestamp) = task.timestamp else {
        return;
    };

    // Decide about preemption without losing state: the slot is emptied only
    // when the running task really is preempted; in every other case
    // (including a running task without a timestamp) it is put back untouched.
    //
    // NOTE(review): `current` may still hold the handle of a task whose body
    // has already finished and which is waiting to re-enter `add`/`next` (see
    // `spawn_task`). Aborting it here re-queues a task that has already run --
    // confirm whether that is acceptable.
    match current_guard.take() {
        Some(running)
            if matches!(
                running.task.timestamp,
                Some(running_timestamp) if new_timestamp < running_timestamp
            ) =>
        {
            running.handle.abort();
            queue_guard.push(running.task);
        }
        other => *current_guard = other,
    }

    queue_guard.push(task);
    // Descending order, earliest task last. Comparing the `Option`s directly
    // gives the same order as unwrapping while every entry has a timestamp,
    // and cannot panic.
    queue_guard.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));

    if current_guard.is_none() {
        if let Some(earliest) = queue_guard.pop() {
            let handle = spawn_task(sq.clone(), &earliest).await;
            *current_guard = Some(TaskHandle {
                task: earliest,
                handle,
            });
        }
    }
}

/// Public entry point of the scheduler: a handle to the shared queue state.
pub struct SchedQueue {
    /// Queue state shared with the tokio tasks spawned for scheduled tasks.
    eq: Arc<_EventQueue>,
}
impl SchedQueue {
    /// Creates a scheduler with an empty queue.
    pub fn new() -> Self {
        let eq = Arc::new(_EventQueue::new());
        Self { eq }
    }

    /// Submits `task` to the scheduler by forwarding it, together with a
    /// handle to the shared state, to the module-level `add` function.
    pub async fn add(&self, task: Task) {
        let shared = Arc::clone(&self.eq);
        add(shared, task).await;
    }
}