cog_task/server/scheduler/
p_sync.rs

1use crate::action::nil::StatefulNil;
2use crate::action::{Action, ActionSignal, StatefulAction};
3use crate::comm::{QReader, QWriter, Signal, MAX_QUEUE_SIZE};
4use crate::resource::{IoManager, Key, LoggerSignal, ResourceManager};
5use crate::server::{AsyncSignal, Atomic, Block, Config, Env, ServerSignal};
6use eframe::egui;
7use eyre::{eyre, Context, Error, Result};
8use serde_cbor::{from_slice, Value};
9use std::collections::{BTreeSet, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::{Duration, Instant};
13
14#[derive(Debug)]
15pub enum SyncSignal {
16    UpdateGraph,
17    KeyPress(Instant, BTreeSet<Key>),
18    Emit(Instant, Signal),
19    Error(Error),
20    Repaint,
21    Finish,
22    Go,
23}
24
25pub struct SyncProcessor {
26    ctx: egui::Context,
27    atomic: Atomic,
28    sync_reader: QReader<SyncSignal>,
29    sync_writer: QWriter<SyncSignal>,
30    async_writer: QWriter<AsyncSignal>,
31    server_writer: QWriter<ServerSignal>,
32}
33
34impl PartialEq for SyncSignal {
35    fn eq(&self, other: &Self) -> bool {
36        match (self, other) {
37            (SyncSignal::UpdateGraph, SyncSignal::UpdateGraph) => true,
38            (SyncSignal::KeyPress(t1, _), SyncSignal::KeyPress(t2, _)) => t1 == t2,
39            (SyncSignal::Emit(_, _), SyncSignal::Emit(_, _)) => false,
40            (SyncSignal::Repaint, SyncSignal::Repaint) => true,
41            (SyncSignal::Finish, SyncSignal::Finish) => true,
42            _ => false,
43        }
44    }
45}
46
47impl Eq for SyncSignal {}
48
49impl From<Signal> for SyncSignal {
50    fn from(signal: Signal) -> Self {
51        SyncSignal::Emit(Instant::now(), signal)
52    }
53}
54
55impl SyncProcessor {
56    pub fn spawn(
57        block: &Block,
58        env: &Env,
59        config: &Config,
60        ctx: &egui::Context,
61        async_writer: &QWriter<AsyncSignal>,
62        server_writer: &QWriter<ServerSignal>,
63    ) -> Result<(QWriter<SyncSignal>, Atomic)> {
64        let sync_reader = QReader::new();
65        let sync_writer = sync_reader.writer();
66        let atomic = Arc::new(Mutex::new((
67            Box::new(StatefulNil::new()) as Box<dyn StatefulAction>,
68            block.default_state().clone(),
69        )));
70        let mut proc = Self {
71            ctx: ctx.clone(),
72            atomic,
73            sync_reader,
74            sync_writer,
75            async_writer: async_writer.clone(),
76            server_writer: server_writer.clone(),
77        };
78
79        let sync_writer = proc.sync_writer.clone();
80        let atomic = proc.atomic.clone();
81
82        let env = env.clone();
83        let config = config.clone();
84        let tree = block.action_tree_vec();
85        let resources = block.resources(&config);
86        let tex_manager = ctx.tex_manager();
87
88        thread::spawn(move || {
89            let io_manager = match IoManager::new(&config) {
90                Ok(io) => io,
91                Err(e) => {
92                    proc.server_writer.push(ServerSignal::BlockCrashed(
93                        e.wrap_err("Failed to initialize IO manager."),
94                    ));
95                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
96                    proc.ctx.request_repaint();
97                    return;
98                }
99            };
100
101            let mut res_manager = match ResourceManager::new(&config) {
102                Ok(r) => r,
103                Err(e) => {
104                    proc.server_writer.push(ServerSignal::BlockCrashed(
105                        e.wrap_err("Failed to initialize resource manager."),
106                    ));
107                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
108                    proc.ctx.request_repaint();
109                    return;
110                }
111            };
112
113            if let Err(e) = res_manager.preload_block(resources, tex_manager, &config, &env) {
114                proc.server_writer.push(ServerSignal::BlockCrashed(
115                    e.wrap_err("Failed to load resources for block."),
116                ));
117                proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
118                proc.ctx.request_repaint();
119                return;
120            }
121
122            let tree = match from_slice::<Box<dyn Action>>(&tree) {
123                Ok(tree) => tree,
124                Err(e) => {
125                    proc.server_writer.push(ServerSignal::BlockCrashed(eyre!(
126                        "Failed to transfer action tree to sync process:\n{e:?}"
127                    )));
128                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
129                    proc.ctx.request_repaint();
130                    return;
131                }
132            };
133
134            let tree = match tree.stateful(
135                &io_manager,
136                &res_manager,
137                &config,
138                &proc.sync_writer,
139                &proc.async_writer,
140            ) {
141                Ok(t) => t,
142                Err(e) => {
143                    proc.server_writer.push(ServerSignal::BlockCrashed(
144                        e.wrap_err("Failed to make action tree stateful."),
145                    ));
146                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
147                    proc.ctx.request_repaint();
148                    return;
149                }
150            };
151
152            proc.server_writer.push(ServerSignal::LoadComplete);
153            proc.ctx.request_repaint();
154
155            loop {
156                match proc.sync_reader.pop() {
157                    Some(SyncSignal::Go) => break,
158                    None => return,
159                    _ => {}
160                }
161            }
162            thread::sleep(Duration::from_secs(1));
163
164            if let Err(e) = proc.start(tree).wrap_err("Failed to start block.") {
165                proc.server_writer.push(ServerSignal::BlockCrashed(e));
166                proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
167                proc.ctx.request_repaint();
168                return;
169            }
170
171            'mainloop: while let Ok(signals) = proc.sync_reader.poll() {
172                let mut n_signal = signals.len();
173                let mut signals = VecDeque::from(signals);
174                while let Some(signal) = signals.pop_front() {
175                    #[cfg(debug_assertions)]
176                    println!("{signal:?}");
177
178                    let news = match signal {
179                        SyncSignal::UpdateGraph => {
180                            let (tree, state) = &mut *proc.atomic.lock().unwrap();
181                            tree.update(
182                                &ActionSignal::UpdateGraph,
183                                &mut proc.sync_writer,
184                                &mut proc.async_writer,
185                                state,
186                            )
187                            .wrap_err("Failed to update graph.")
188                        }
189                        SyncSignal::KeyPress(time, keys) => {
190                            let (tree, state) = &mut *proc.atomic.lock().unwrap();
191                            tree.update(
192                                &ActionSignal::KeyPress(time, keys),
193                                &mut proc.sync_writer,
194                                &mut proc.async_writer,
195                                state,
196                            )
197                            .wrap_err("Failed to process key press.")
198                        }
199                        SyncSignal::Emit(time, signal) => {
200                            let (tree, state) = &mut *proc.atomic.lock().unwrap();
201
202                            let mut changed = BTreeSet::new();
203                            for (k, v) in signal.into_iter() {
204                                if k > 0 {
205                                    state.insert(k, v);
206                                    changed.insert(k);
207                                }
208                            }
209
210                            tree.update(
211                                &ActionSignal::StateChanged(time, changed),
212                                &mut proc.sync_writer,
213                                &mut proc.async_writer,
214                                state,
215                            )
216                            .wrap_err("Failed to emit signal.")
217                        }
218                        SyncSignal::Error(e) => Err(e),
219                        SyncSignal::Repaint => {
220                            proc.ctx.request_repaint();
221                            Ok(Signal::none())
222                        }
223                        SyncSignal::Finish => break 'mainloop,
224                        SyncSignal::Go => Ok(Signal::none()),
225                    };
226
227                    let news = match news {
228                        Ok(s) => s,
229                        Err(e) => {
230                            proc.server_writer.push(ServerSignal::BlockCrashed(e));
231                            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
232                            proc.ctx.request_repaint();
233                            return;
234                        }
235                    };
236
237                    if !news.is_empty() {
238                        n_signal += 1;
239                        if n_signal > MAX_QUEUE_SIZE {
240                            proc.server_writer.push(ServerSignal::BlockCrashed(eyre!(
241                                "Number of signals in a single poll exceeded MAX_QUEUE_SIZE."
242                            )));
243                            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
244                            proc.ctx.request_repaint();
245                            return;
246                        } else {
247                            signals.push_front(news.into());
248                        }
249                    }
250
251                    let (tree, state) = &mut *proc.atomic.lock().unwrap();
252                    let is_over = tree
253                        .is_over()
254                        .wrap_err_with(|| eyre!("Failed to check if action over: {tree:?}"));
255
256                    let is_over = match is_over {
257                        Ok(c) => c,
258                        Err(e) => {
259                            proc.server_writer.push(ServerSignal::BlockCrashed(e));
260                            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
261                            let _ = tree.stop(&mut proc.sync_writer, &mut proc.async_writer, state);
262                            *tree = Box::new(StatefulNil::new());
263                            proc.ctx.request_repaint();
264                            return;
265                        }
266                    };
267
268                    if is_over {
269                        proc.server_writer.push(ServerSignal::BlockFinished);
270                        if let Err(e) =
271                            tree.stop(&mut proc.sync_writer, &mut proc.async_writer, state)
272                        {
273                            println!("Failed to graciously finish task:\n{e:?}");
274                        }
275                        *tree = Box::new(StatefulNil::new());
276                        proc.ctx.request_repaint();
277                    }
278                }
279            }
280
281            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
282            proc.ctx.request_repaint();
283        });
284
285        Ok((sync_writer, atomic))
286    }
287
288    fn start(&mut self, root: Box<dyn StatefulAction>) -> Result<()> {
289        let (tree, state) = &mut *self.atomic.lock().unwrap();
290
291        self.async_writer.push(LoggerSignal::Append(
292            "main".to_owned(),
293            ("start".to_owned(), Value::Text("ok".to_owned())),
294        ));
295
296        *tree = root;
297        let news = tree.start(&mut self.sync_writer, &mut self.async_writer, state)?;
298        if !news.is_empty() {
299            self.sync_writer.push(SyncSignal::from(news));
300        }
301
302        Ok(())
303    }
304}