cog_task/server/scheduler/
mod.rs

1pub mod p_async;
2pub mod p_sync;
3
4pub use p_async::*;
5pub use p_sync::*;
6
7use crate::action::StatefulAction;
8use crate::comm::QWriter;
9use crate::resource::{LoggerSignal, TAG_ACTION, TAG_CONFIG, TAG_INFO};
10use crate::server::{Config, Info, Server, ServerSignal};
11use eframe::egui;
12use eframe::egui::{CentralPanel, CursorIcon, Frame};
13use eyre::Result;
14use serde_cbor::{ser::to_vec, Value};
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant, SystemTime};
18
19pub type State = BTreeMap<u16, Value>;
20pub type Atomic = Arc<Mutex<(Box<dyn StatefulAction>, State)>>;
21
22pub struct Scheduler {
23    atomic: Atomic,
24    info: Info,
25    last_esc: Option<SystemTime>,
26    config: Config,
27    ctx: egui::Context,
28    sync_writer: QWriter<SyncSignal>,
29    async_writer: QWriter<AsyncSignal>,
30    server_writer: QWriter<ServerSignal>,
31}
32
33impl Scheduler {
34    pub fn new(server: &Server, ctx: &egui::Context) -> Result<Self> {
35        let env = server.env();
36        let task = server.task();
37        let block = server.active_block().unwrap();
38        let info = Info::new(server, task, block);
39        let config = block.config(server.config());
40
41        let server_writer = server.callback_channel();
42        let mut async_writer = AsyncProcessor::spawn(&info, &config, &server_writer)?;
43        let (sync_writer, atomic) =
44            SyncProcessor::spawn(block, env, &config, ctx, &async_writer, &server_writer)?;
45
46        async_writer.push(LoggerSignal::Extend(
47            "main".to_owned(),
48            vec![
49                (
50                    "info".to_owned(),
51                    Value::Tag(TAG_INFO, Box::new(Value::Bytes(to_vec(&info).unwrap()))),
52                ),
53                (
54                    "config".to_owned(),
55                    Value::Tag(TAG_CONFIG, Box::new(Value::Bytes(to_vec(&config).unwrap()))),
56                ),
57                (
58                    "tree".to_owned(),
59                    Value::Tag(TAG_ACTION, Box::new(Value::Bytes(block.action_tree_vec()))),
60                ),
61            ],
62        ));
63
64        Ok(Self {
65            atomic,
66            info,
67            last_esc: None,
68            config,
69            ctx: ctx.clone(),
70            sync_writer,
71            async_writer,
72            server_writer,
73        })
74    }
75
76    #[inline(always)]
77    pub fn info(&self) -> &Info {
78        &self.info
79    }
80
81    #[inline(always)]
82    pub fn config(&self) -> &Config {
83        &self.config
84    }
85
86    pub fn request_interrupt(&mut self) {
87        self.async_writer.push(LoggerSignal::Append(
88            "main".to_owned(),
89            (
90                "interrupt".to_owned(),
91                Value::Text("user request".to_owned()),
92            ),
93        ));
94
95        self.server_writer.push(ServerSignal::BlockInterrupted);
96        self.ctx.request_repaint();
97    }
98
99    pub fn show(&mut self, ui: &mut egui::Ui) -> Result<()> {
100        if ui.input().key_pressed(egui::Key::Escape) {
101            let time = SystemTime::now();
102            if let Some(t) = self.last_esc.take() {
103                if time.duration_since(t).unwrap() < Duration::from_millis(300) {
104                    self.request_interrupt();
105                    return Ok(());
106                }
107            }
108            self.last_esc = Some(time);
109        }
110
111        let keys_pressed: BTreeSet<_> = ui
112            .input()
113            .keys_down
114            .iter()
115            .filter_map(|k| {
116                if ui.input().key_pressed(*k) {
117                    Some(k.into())
118                } else {
119                    None
120                }
121            })
122            .collect();
123        if !keys_pressed.is_empty() {
124            self.sync_writer
125                .push(SyncSignal::KeyPress(Instant::now(), keys_pressed))
126        }
127
128        let result = {
129            let (tree, state) = &mut *self.atomic.lock().unwrap();
130            CentralPanel::default()
131                .frame(Frame::default().fill(self.config.background().into()))
132                .show_inside(ui, |ui| {
133                    if tree.props().visual() {
134                        tree.show(ui, &mut self.sync_writer, &mut self.async_writer, state)
135                    } else {
136                        ui.output().cursor_icon = CursorIcon::None;
137                        Ok(())
138                    }
139                })
140                .inner
141        };
142
143        if let Err(e) = &result {
144            self.async_writer.push(LoggerSignal::Append(
145                "main".to_owned(),
146                ("crash".to_owned(), Value::Text(format!("{e:#?}"))),
147            ));
148        }
149
150        result
151    }
152
153    pub fn sync_writer(&mut self) -> &mut QWriter<SyncSignal> {
154        &mut self.sync_writer
155    }
156
157    pub fn async_writer(&mut self) -> &mut QWriter<AsyncSignal> {
158        &mut self.async_writer
159    }
160}
161
162impl Drop for Scheduler {
163    fn drop(&mut self) {
164        self.async_writer.push(LoggerSignal::Append(
165            "main".to_owned(),
166            ("finish".to_owned(), Value::Text("ok".to_owned())),
167        ));
168
169        self.sync_writer.push(SyncSignal::Finish);
170        self.async_writer.push(AsyncSignal::Finish);
171    }
172}