1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#![doc = include_str!("runner.md")]

use async_channel as ac;
use std::thread::{self, JoinHandle, Thread};
use std::time::{Duration, Instant};

/// Control surface for a runner whose loop is stopped from the outside.
///
/// In this file the trait is implemented for `Runner<Task>`.
pub trait ExternalRunnerExt {
    /// Stores the task that a later call to `start` will run.
    fn set_task(&mut self, t: Task);
    /// Starts running the stored task; on failure the error is a short
    /// human-readable description.
    fn start(&mut self) -> Result<(), &str>;
    /// Consumes the runner and stops it; on failure the error is a short
    /// human-readable description.
    fn close(self) -> Result<(), &'static str>;
}
/// Control surface for a runner whose task decides by itself when to stop.
///
/// In this file the trait is implemented for `Runner<TaskWithHandle>`.
pub trait InternalRunnerExt {
    /// Stores the task that a later call to `start` will run.
    fn set_task(&mut self, t: TaskWithHandle);
    /// Starts running the stored task; on failure the error is a short
    /// human-readable description.
    fn start(&mut self) -> Result<(), &str>;
    /// Consumes the runner and waits for it to finish; on failure the error
    /// is a short human-readable description.
    fn join(self) -> Result<(), &'static str>;
}

/// Boxed, sendable closure that a runner invokes once per loop iteration.
pub type Task = Box<dyn FnMut() + Send + 'static>;
/// Boxed, sendable closure invoked once per loop iteration; returning `true`
/// makes the runner's loop exit.
pub type TaskWithHandle = Box<dyn FnMut() -> bool + Send + 'static>;

/// Runs a task repeatedly on a dedicated thread, pacing iterations by
/// `interval`.
///
/// `T` is the type of the stored task (`Task` or `TaskWithHandle` in this
/// file).
pub struct Runner<T> {
    /// Task waiting to be started; taken out (left as `None`) on start.
    task: Option<T>,
    /// Target duration of one loop iteration; the worker sleeps for whatever
    /// part of it the task did not use.
    interval: Duration,
    /// Sending half used to tell the worker thread to stop, when one exists.
    s: Option<ac::Sender<u8>>,
    /// Receiving half for the signals the worker thread sends back.
    r: Option<ac::Receiver<u8>>,
    /// Join handle of the worker thread; `None` until a task is started.
    thread: Option<JoinHandle<()>>,
}

impl<T> Runner<T> {
    /// Creates an idle runner that paces its loop by `interval`.
    ///
    /// No task is stored and no thread is spawned yet.
    pub fn new(interval: Duration) -> Runner<T> {
        Self {
            task: None,
            interval,
            s: None,
            r: None,
            thread: None,
        }
    }

    /// Removes and returns the stored task.
    ///
    /// # Errors
    ///
    /// Returns `"task not set"` when no task is stored.
    fn take_task(&mut self) -> Result<T, &'static str> {
        match self.task.take() {
            Some(stored) => Ok(stored),
            None => Err("task not set"),
        }
    }

    /// Spawns the worker thread for an externally stopped task.
    ///
    /// Two capacity-1 channels are created: one carries the stop request to
    /// the worker, the other carries the worker's signals back. The worker
    /// sends one signal on start-up, then calls `task` once per iteration
    /// until a stop request is pending, and finally sends one more signal.
    fn _start_runner(&mut self, mut task: Task) {
        let period = self.interval;
        let (ack_tx, ack_rx) = ac::bounded::<u8>(1);
        let (stop_tx, stop_rx) = ac::bounded::<u8>(1);

        self.s = Some(stop_tx);
        self.r = Some(ack_rx);

        let worker = thread::spawn(move || {
            // Start-up signal.
            ack_tx.send_blocking(0).unwrap();

            // Keep iterating for as long as no stop request can be read.
            while stop_rx.try_recv().is_err() {
                let tick = Instant::now();

                task();

                // Sleep only for the part of the period the task left unused.
                if let Some(rest) = period.checked_sub(tick.elapsed()) {
                    spin_sleep::sleep(rest);
                }
            }

            // Stopped signal.
            if ack_tx.send_blocking(0).is_err() {
                panic!("[task-controler] Error sending stopped signal");
            }
        });
        self.thread = Some(worker);
    }

    /// Returns the `Thread` handle of the worker thread.
    ///
    /// # Panics
    ///
    /// Panics when no worker thread handle is stored.
    pub fn get_thread_ref(&self) -> &Thread {
        let handle = self.thread.as_ref().unwrap();
        handle.thread()
    }
}

impl ExternalRunnerExt for Runner<Task> {
    /// Stores `t` as the task to run, replacing any previously stored task.
    fn set_task(&mut self, t: Task) {
        self.task = Some(t);
    }

    /// Spawns the worker thread that runs the stored task once per interval.
    ///
    /// The spawning logic lives in `_start_runner`; this method only supplies
    /// the stored task.
    ///
    /// NOTE(review): calling this while a worker is already running replaces
    /// the stored join handle and channels, so the earlier worker can no
    /// longer be signalled through this runner.
    ///
    /// # Errors
    ///
    /// Returns `"task not set"` when no task has been stored, instead of
    /// panicking.
    fn start(&mut self) -> Result<(), &str> {
        let task = self.take_task()?;
        self._start_runner(task);
        Ok(())
    }

    /// Signals the worker to stop, waits for a signal back and joins the
    /// thread. The runner is consumed by the call.
    ///
    /// Do not use this for a task that ends its own loop by returning `true`;
    /// use `join()` for those.
    ///
    /// # Errors
    ///
    /// * `"no task running"` when no worker thread was started.
    /// * `"runner channels missing"` when the stop/ack channels are absent.
    /// * `"runner thread panicked"` when the worker thread ended in a panic.
    /// * `"runner thread exited before it could be signalled"` when the
    ///   handshake failed although the thread ended without panicking.
    fn close(mut self) -> Result<(), &'static str> {
        let worker = self.thread.take().ok_or("no task running")?;

        // Without the channels the worker cannot be told to stop, and joining
        // it would block forever, so bail out before the join.
        let (stop_tx, ack_rx) = match (self.s.as_ref(), self.r.as_ref()) {
            (Some(tx), Some(rx)) => (tx, rx),
            _ => return Err("runner channels missing"),
        };

        // Both calls fail only once the worker has dropped its channel ends,
        // i.e. it is already exiting, so the join below cannot hang then.
        let handshake_ok = stop_tx.send_blocking(0).is_ok() && ack_rx.recv_blocking().is_ok();

        // The channel ends stay alive until after the join so the worker's
        // final signal can still be delivered.
        match (worker.join(), handshake_ok) {
            (Ok(()), true) => Ok(()),
            (Ok(()), false) => Err("runner thread exited before it could be signalled"),
            (Err(_), _) => Err("runner thread panicked"),
        }
    }
}

impl InternalRunnerExt for Runner<TaskWithHandle> {
    /// Stores `t` as the task to run, replacing any previously stored task.
    fn set_task(&mut self, t: TaskWithHandle) {
        self.task = Some(t);
    }

    /// Spawns the worker thread that runs the stored task once per interval
    /// until the task returns `true`.
    ///
    /// # Errors
    ///
    /// Returns `"task not set"` when no task has been stored, instead of
    /// panicking.
    fn start(&mut self) -> Result<(), &'static str> {
        let mut task = self.take_task()?;

        // Only a worker-to-runner channel is needed: the task stops itself,
        // so there is no stop sender.
        let (sub_sender, main_receiver) = ac::bounded::<u8>(1);

        self.s = None;
        self.r = Some(main_receiver);

        let interval = self.interval;
        self.thread = Some(thread::spawn(move || {
            // Start-up signal; this unwrap panics the worker if the receiving
            // half stored in the runner is already gone.
            sub_sender.send_blocking(0).unwrap();
            loop {
                let frame_start = Instant::now();

                // `true` from the task ends the worker.
                if task() {
                    return;
                }

                // Sleep only for the part of the interval the task left unused.
                if let Some(gap) = interval.checked_sub(frame_start.elapsed()) {
                    spin_sleep::sleep(gap);
                }
            }
        }));
        Ok(())
    }

    /// Waits for the worker thread to finish. The runner is consumed by the
    /// call.
    ///
    /// # Errors
    ///
    /// * `"no task running"` when no worker thread was started.
    /// * `"runner thread panicked"` when the worker thread ended in a panic.
    fn join(mut self) -> Result<(), &'static str> {
        let worker = self.thread.take().ok_or("no task running")?;
        worker.join().map_err(|_| "runner thread panicked")
    }
}

/// Creates an idle runner for a `Task`, paced by `interval`.
///
/// The returned runner implements `ExternalRunnerExt`.
pub fn new_external_close_runner(interval: Duration) -> Runner<Task> {
    Runner::<Task>::new(interval)
}

/// Creates an idle runner for a `TaskWithHandle`, paced by `interval`.
///
/// The returned runner implements `InternalRunnerExt`.
pub fn new_internal_close_runner(interval: Duration) -> Runner<TaskWithHandle> {
    Runner::<TaskWithHandle>::new(interval)
}