// cog_task/server/scheduler/p_sync.rs
use crate::action::nil::StatefulNil;
use crate::action::{Action, ActionSignal, StatefulAction};
use crate::comm::{QReader, QWriter, Signal, MAX_QUEUE_SIZE};
use crate::resource::{IoManager, Key, LoggerSignal, ResourceManager};
use crate::server::{AsyncSignal, Atomic, Block, Config, Env, ServerSignal};
use eframe::egui;
use eyre::{eyre, Context, Error, Result};
use serde_cbor::{from_slice, Value};
use std::collections::{BTreeSet, VecDeque};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

14#[derive(Debug)]
15pub enum SyncSignal {
16 UpdateGraph,
17 KeyPress(Instant, BTreeSet<Key>),
18 Emit(Instant, Signal),
19 Error(Error),
20 Repaint,
21 Finish,
22 Go,
23}
24
25pub struct SyncProcessor {
26 ctx: egui::Context,
27 atomic: Atomic,
28 sync_reader: QReader<SyncSignal>,
29 sync_writer: QWriter<SyncSignal>,
30 async_writer: QWriter<AsyncSignal>,
31 server_writer: QWriter<ServerSignal>,
32}
33
34impl PartialEq for SyncSignal {
35 fn eq(&self, other: &Self) -> bool {
36 match (self, other) {
37 (SyncSignal::UpdateGraph, SyncSignal::UpdateGraph) => true,
38 (SyncSignal::KeyPress(t1, _), SyncSignal::KeyPress(t2, _)) => t1 == t2,
39 (SyncSignal::Emit(_, _), SyncSignal::Emit(_, _)) => false,
40 (SyncSignal::Repaint, SyncSignal::Repaint) => true,
41 (SyncSignal::Finish, SyncSignal::Finish) => true,
42 _ => false,
43 }
44 }
45}
46
47impl Eq for SyncSignal {}
48
49impl From<Signal> for SyncSignal {
50 fn from(signal: Signal) -> Self {
51 SyncSignal::Emit(Instant::now(), signal)
52 }
53}
54
55impl SyncProcessor {
56 pub fn spawn(
57 block: &Block,
58 env: &Env,
59 config: &Config,
60 ctx: &egui::Context,
61 async_writer: &QWriter<AsyncSignal>,
62 server_writer: &QWriter<ServerSignal>,
63 ) -> Result<(QWriter<SyncSignal>, Atomic)> {
64 let sync_reader = QReader::new();
65 let sync_writer = sync_reader.writer();
66 let atomic = Arc::new(Mutex::new((
67 Box::new(StatefulNil::new()) as Box<dyn StatefulAction>,
68 block.default_state().clone(),
69 )));
70 let mut proc = Self {
71 ctx: ctx.clone(),
72 atomic,
73 sync_reader,
74 sync_writer,
75 async_writer: async_writer.clone(),
76 server_writer: server_writer.clone(),
77 };
78
79 let sync_writer = proc.sync_writer.clone();
80 let atomic = proc.atomic.clone();
81
82 let env = env.clone();
83 let config = config.clone();
84 let tree = block.action_tree_vec();
85 let resources = block.resources(&config);
86 let tex_manager = ctx.tex_manager();
87
88 thread::spawn(move || {
89 let io_manager = match IoManager::new(&config) {
90 Ok(io) => io,
91 Err(e) => {
92 proc.server_writer.push(ServerSignal::BlockCrashed(
93 e.wrap_err("Failed to initialize IO manager."),
94 ));
95 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
96 proc.ctx.request_repaint();
97 return;
98 }
99 };
100
101 let mut res_manager = match ResourceManager::new(&config) {
102 Ok(r) => r,
103 Err(e) => {
104 proc.server_writer.push(ServerSignal::BlockCrashed(
105 e.wrap_err("Failed to initialize resource manager."),
106 ));
107 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
108 proc.ctx.request_repaint();
109 return;
110 }
111 };
112
113 if let Err(e) = res_manager.preload_block(resources, tex_manager, &config, &env) {
114 proc.server_writer.push(ServerSignal::BlockCrashed(
115 e.wrap_err("Failed to load resources for block."),
116 ));
117 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
118 proc.ctx.request_repaint();
119 return;
120 }
121
122 let tree = match from_slice::<Box<dyn Action>>(&tree) {
123 Ok(tree) => tree,
124 Err(e) => {
125 proc.server_writer.push(ServerSignal::BlockCrashed(eyre!(
126 "Failed to transfer action tree to sync process:\n{e:?}"
127 )));
128 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
129 proc.ctx.request_repaint();
130 return;
131 }
132 };
133
134 let tree = match tree.stateful(
135 &io_manager,
136 &res_manager,
137 &config,
138 &proc.sync_writer,
139 &proc.async_writer,
140 ) {
141 Ok(t) => t,
142 Err(e) => {
143 proc.server_writer.push(ServerSignal::BlockCrashed(
144 e.wrap_err("Failed to make action tree stateful."),
145 ));
146 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
147 proc.ctx.request_repaint();
148 return;
149 }
150 };
151
152 proc.server_writer.push(ServerSignal::LoadComplete);
153 proc.ctx.request_repaint();
154
155 loop {
156 match proc.sync_reader.pop() {
157 Some(SyncSignal::Go) => break,
158 None => return,
159 _ => {}
160 }
161 }
162 thread::sleep(Duration::from_secs(1));
163
164 if let Err(e) = proc.start(tree).wrap_err("Failed to start block.") {
165 proc.server_writer.push(ServerSignal::BlockCrashed(e));
166 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
167 proc.ctx.request_repaint();
168 return;
169 }
170
171 'mainloop: while let Ok(signals) = proc.sync_reader.poll() {
172 let mut n_signal = signals.len();
173 let mut signals = VecDeque::from(signals);
174 while let Some(signal) = signals.pop_front() {
175 #[cfg(debug_assertions)]
176 println!("{signal:?}");
177
178 let news = match signal {
179 SyncSignal::UpdateGraph => {
180 let (tree, state) = &mut *proc.atomic.lock().unwrap();
181 tree.update(
182 &ActionSignal::UpdateGraph,
183 &mut proc.sync_writer,
184 &mut proc.async_writer,
185 state,
186 )
187 .wrap_err("Failed to update graph.")
188 }
189 SyncSignal::KeyPress(time, keys) => {
190 let (tree, state) = &mut *proc.atomic.lock().unwrap();
191 tree.update(
192 &ActionSignal::KeyPress(time, keys),
193 &mut proc.sync_writer,
194 &mut proc.async_writer,
195 state,
196 )
197 .wrap_err("Failed to process key press.")
198 }
199 SyncSignal::Emit(time, signal) => {
200 let (tree, state) = &mut *proc.atomic.lock().unwrap();
201
202 let mut changed = BTreeSet::new();
203 for (k, v) in signal.into_iter() {
204 if k > 0 {
205 state.insert(k, v);
206 changed.insert(k);
207 }
208 }
209
210 tree.update(
211 &ActionSignal::StateChanged(time, changed),
212 &mut proc.sync_writer,
213 &mut proc.async_writer,
214 state,
215 )
216 .wrap_err("Failed to emit signal.")
217 }
218 SyncSignal::Error(e) => Err(e),
219 SyncSignal::Repaint => {
220 proc.ctx.request_repaint();
221 Ok(Signal::none())
222 }
223 SyncSignal::Finish => break 'mainloop,
224 SyncSignal::Go => Ok(Signal::none()),
225 };
226
227 let news = match news {
228 Ok(s) => s,
229 Err(e) => {
230 proc.server_writer.push(ServerSignal::BlockCrashed(e));
231 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
232 proc.ctx.request_repaint();
233 return;
234 }
235 };
236
237 if !news.is_empty() {
238 n_signal += 1;
239 if n_signal > MAX_QUEUE_SIZE {
240 proc.server_writer.push(ServerSignal::BlockCrashed(eyre!(
241 "Number of signals in a single poll exceeded MAX_QUEUE_SIZE."
242 )));
243 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
244 proc.ctx.request_repaint();
245 return;
246 } else {
247 signals.push_front(news.into());
248 }
249 }
250
251 let (tree, state) = &mut *proc.atomic.lock().unwrap();
252 let is_over = tree
253 .is_over()
254 .wrap_err_with(|| eyre!("Failed to check if action over: {tree:?}"));
255
256 let is_over = match is_over {
257 Ok(c) => c,
258 Err(e) => {
259 proc.server_writer.push(ServerSignal::BlockCrashed(e));
260 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
261 let _ = tree.stop(&mut proc.sync_writer, &mut proc.async_writer, state);
262 *tree = Box::new(StatefulNil::new());
263 proc.ctx.request_repaint();
264 return;
265 }
266 };
267
268 if is_over {
269 proc.server_writer.push(ServerSignal::BlockFinished);
270 if let Err(e) =
271 tree.stop(&mut proc.sync_writer, &mut proc.async_writer, state)
272 {
273 println!("Failed to graciously finish task:\n{e:?}");
274 }
275 *tree = Box::new(StatefulNil::new());
276 proc.ctx.request_repaint();
277 }
278 }
279 }
280
281 proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
282 proc.ctx.request_repaint();
283 });
284
285 Ok((sync_writer, atomic))
286 }
287
288 fn start(&mut self, root: Box<dyn StatefulAction>) -> Result<()> {
289 let (tree, state) = &mut *self.atomic.lock().unwrap();
290
291 self.async_writer.push(LoggerSignal::Append(
292 "main".to_owned(),
293 ("start".to_owned(), Value::Text("ok".to_owned())),
294 ));
295
296 *tree = root;
297 let news = tree.start(&mut self.sync_writer, &mut self.async_writer, state)?;
298 if !news.is_empty() {
299 self.sync_writer.push(SyncSignal::from(news));
300 }
301
302 Ok(())
303 }
304}