cog-task 1.2.0

A general-purpose low-latency application to run cognitive tasks
Documentation
use crate::action::nil::StatefulNil;
use crate::action::{Action, ActionSignal, StatefulAction};
use crate::comm::{QReader, QWriter, Signal, MAX_QUEUE_SIZE};
use crate::resource::{IoManager, Key, LoggerSignal, ResourceManager};
use crate::server::{AsyncSignal, Atomic, Block, Config, Env, ServerSignal};
use eframe::egui;
use eyre::{eyre, Context, Error, Result};
use serde_cbor::{from_slice, Value};
use std::collections::{BTreeSet, VecDeque};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug)]
pub enum SyncSignal {
    UpdateGraph,
    KeyPress(Instant, BTreeSet<Key>),
    Emit(Instant, Signal),
    Error(Error),
    Repaint,
    Finish,
    Go,
}

pub struct SyncProcessor {
    ctx: egui::Context,
    atomic: Atomic,
    sync_reader: QReader<SyncSignal>,
    sync_writer: QWriter<SyncSignal>,
    async_writer: QWriter<AsyncSignal>,
    server_writer: QWriter<ServerSignal>,
}

impl PartialEq for SyncSignal {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (SyncSignal::UpdateGraph, SyncSignal::UpdateGraph) => true,
            (SyncSignal::KeyPress(t1, _), SyncSignal::KeyPress(t2, _)) => t1 == t2,
            (SyncSignal::Emit(_, _), SyncSignal::Emit(_, _)) => false,
            (SyncSignal::Repaint, SyncSignal::Repaint) => true,
            (SyncSignal::Finish, SyncSignal::Finish) => true,
            _ => false,
        }
    }
}

impl Eq for SyncSignal {}

impl From<Signal> for SyncSignal {
    fn from(signal: Signal) -> Self {
        SyncSignal::Emit(Instant::now(), signal)
    }
}

impl SyncProcessor {
    pub fn spawn(
        block: &Block,
        env: &Env,
        config: &Config,
        ctx: &egui::Context,
        async_writer: &QWriter<AsyncSignal>,
        server_writer: &QWriter<ServerSignal>,
    ) -> Result<(QWriter<SyncSignal>, Atomic)> {
        let sync_reader = QReader::new();
        let sync_writer = sync_reader.writer();
        let atomic = Arc::new(Mutex::new((
            Box::new(StatefulNil::new()) as Box<dyn StatefulAction>,
            block.default_state().clone(),
        )));
        let mut proc = Self {
            ctx: ctx.clone(),
            atomic,
            sync_reader,
            sync_writer,
            async_writer: async_writer.clone(),
            server_writer: server_writer.clone(),
        };

        let sync_writer = proc.sync_writer.clone();
        let atomic = proc.atomic.clone();

        let env = env.clone();
        let config = config.clone();
        let tree = block.action_tree_vec();
        let resources = block.resources(&config);
        let tex_manager = ctx.tex_manager();

        thread::spawn(move || {
            let io_manager = match IoManager::new(&config) {
                Ok(io) => io,
                Err(e) => {
                    proc.server_writer.push(ServerSignal::BlockCrashed(
                        e.wrap_err("Failed to initialize IO manager."),
                    ));
                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                    proc.ctx.request_repaint();
                    return;
                }
            };

            let mut res_manager = match ResourceManager::new(&config) {
                Ok(r) => r,
                Err(e) => {
                    proc.server_writer.push(ServerSignal::BlockCrashed(
                        e.wrap_err("Failed to initialize resource manager."),
                    ));
                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                    proc.ctx.request_repaint();
                    return;
                }
            };

            if let Err(e) = res_manager.preload_block(resources, tex_manager, &config, &env) {
                proc.server_writer.push(ServerSignal::BlockCrashed(
                    e.wrap_err("Failed to load resources for block."),
                ));
                proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                proc.ctx.request_repaint();
                return;
            }

            let tree = match from_slice::<Box<dyn Action>>(&tree) {
                Ok(tree) => tree,
                Err(e) => {
                    proc.server_writer.push(ServerSignal::BlockCrashed(eyre!(
                        "Failed to transfer action tree to sync process:\n{e:?}"
                    )));
                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                    proc.ctx.request_repaint();
                    return;
                }
            };

            let tree = match tree.stateful(
                &io_manager,
                &res_manager,
                &config,
                &proc.sync_writer,
                &proc.async_writer,
            ) {
                Ok(t) => t,
                Err(e) => {
                    proc.server_writer.push(ServerSignal::BlockCrashed(
                        e.wrap_err("Failed to make action tree stateful."),
                    ));
                    proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                    proc.ctx.request_repaint();
                    return;
                }
            };

            proc.server_writer.push(ServerSignal::LoadComplete);
            proc.ctx.request_repaint();

            loop {
                match proc.sync_reader.pop() {
                    Some(SyncSignal::Go) => break,
                    None => return,
                    _ => {}
                }
            }
            thread::sleep(Duration::from_secs(1));

            if let Err(e) = proc.start(tree).wrap_err("Failed to start block.") {
                proc.server_writer.push(ServerSignal::BlockCrashed(e));
                proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                proc.ctx.request_repaint();
                return;
            }

            'mainloop: while let Ok(signals) = proc.sync_reader.poll() {
                let mut n_signal = signals.len();
                let mut signals = VecDeque::from(signals);
                while let Some(signal) = signals.pop_front() {
                    #[cfg(debug_assertions)]
                    println!("{signal:?}");

                    let news = match signal {
                        SyncSignal::UpdateGraph => {
                            let (tree, state) = &mut *proc.atomic.lock().unwrap();
                            tree.update(
                                &ActionSignal::UpdateGraph,
                                &mut proc.sync_writer,
                                &mut proc.async_writer,
                                state,
                            )
                            .wrap_err("Failed to update graph.")
                        }
                        SyncSignal::KeyPress(time, keys) => {
                            let (tree, state) = &mut *proc.atomic.lock().unwrap();
                            tree.update(
                                &ActionSignal::KeyPress(time, keys),
                                &mut proc.sync_writer,
                                &mut proc.async_writer,
                                state,
                            )
                            .wrap_err("Failed to process key press.")
                        }
                        SyncSignal::Emit(time, signal) => {
                            let (tree, state) = &mut *proc.atomic.lock().unwrap();

                            let mut changed = BTreeSet::new();
                            for (k, v) in signal.into_iter() {
                                if k > 0 {
                                    state.insert(k, v);
                                    changed.insert(k);
                                }
                            }

                            tree.update(
                                &ActionSignal::StateChanged(time, changed),
                                &mut proc.sync_writer,
                                &mut proc.async_writer,
                                state,
                            )
                            .wrap_err("Failed to emit signal.")
                        }
                        SyncSignal::Error(e) => Err(e),
                        SyncSignal::Repaint => {
                            proc.ctx.request_repaint();
                            Ok(Signal::none())
                        }
                        SyncSignal::Finish => break 'mainloop,
                        SyncSignal::Go => Ok(Signal::none()),
                    };

                    let news = match news {
                        Ok(s) => s,
                        Err(e) => {
                            proc.server_writer.push(ServerSignal::BlockCrashed(e));
                            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                            proc.ctx.request_repaint();
                            return;
                        }
                    };

                    if !news.is_empty() {
                        n_signal += 1;
                        if n_signal > MAX_QUEUE_SIZE {
                            proc.server_writer.push(ServerSignal::BlockCrashed(eyre!(
                                "Number of signals in a single poll exceeded MAX_QUEUE_SIZE."
                            )));
                            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                            proc.ctx.request_repaint();
                            return;
                        } else {
                            signals.push_front(news.into());
                        }
                    }

                    let (tree, state) = &mut *proc.atomic.lock().unwrap();
                    let is_over = tree
                        .is_over()
                        .wrap_err_with(|| eyre!("Failed to check if action over: {tree:?}"));

                    let is_over = match is_over {
                        Ok(c) => c,
                        Err(e) => {
                            proc.server_writer.push(ServerSignal::BlockCrashed(e));
                            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
                            let _ = tree.stop(&mut proc.sync_writer, &mut proc.async_writer, state);
                            *tree = Box::new(StatefulNil::new());
                            proc.ctx.request_repaint();
                            return;
                        }
                    };

                    if is_over {
                        proc.server_writer.push(ServerSignal::BlockFinished);
                        if let Err(e) =
                            tree.stop(&mut proc.sync_writer, &mut proc.async_writer, state)
                        {
                            println!("Failed to graciously finish task:\n{e:?}");
                        }
                        *tree = Box::new(StatefulNil::new());
                        proc.ctx.request_repaint();
                    }
                }
            }

            proc.server_writer.push(ServerSignal::SyncComplete(Ok(())));
            proc.ctx.request_repaint();
        });

        Ok((sync_writer, atomic))
    }

    fn start(&mut self, root: Box<dyn StatefulAction>) -> Result<()> {
        let (tree, state) = &mut *self.atomic.lock().unwrap();

        self.async_writer.push(LoggerSignal::Append(
            "main".to_owned(),
            ("start".to_owned(), Value::Text("ok".to_owned())),
        ));

        *tree = root;
        let news = tree.start(&mut self.sync_writer, &mut self.async_writer, state)?;
        if !news.is_empty() {
            self.sync_writer.push(SyncSignal::from(news));
        }

        Ok(())
    }
}