cog-task 1.2.0

A general-purpose low-latency application to run cognitive tasks
Documentation
use crate::action::{Action, ActionSignal, Props, StatefulAction, DEFAULT, INFINITE};
use crate::comm::{QWriter, Signal, SignalId};
use crate::resource::{
    Evaluator, Interpreter, IoManager, LoggerSignal, OptionalPath, OptionalString, ResourceAddr,
    ResourceManager, ResourceValue,
};
use crate::server::{AsyncSignal, Config, State, SyncSignal};
use eyre::{eyre, Context, Error, Result};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_cbor::Value;
use std::collections::{BTreeMap, BTreeSet};
use std::time::Instant;

#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Function {
    #[serde(default)]
    name: String,
    #[serde(default)]
    expr: OptionalString,
    #[serde(default)]
    src: OptionalPath,
    #[serde(default)]
    init_expr: OptionalString,
    #[serde(default)]
    init_src: OptionalPath,
    #[serde(default)]
    vars: BTreeMap<String, Value>,
    #[serde(default)]
    interpreter: Interpreter,
    #[serde(default = "defaults::on_start")]
    on_start: bool,
    #[serde(default = "defaults::on_change")]
    on_change: bool,
    #[serde(default)]
    once: bool,
    #[serde(default)]
    in_mapping: BTreeMap<SignalId, String>,
    #[serde(default)]
    in_update: SignalId,
    #[serde(default)]
    lo_response: SignalId,
    #[serde(default)]
    out_result: SignalId,
}

stateful!(Function {
    name: String,
    vars: BTreeMap<String, Value>,
    evaluator: Evaluator,
    on_start: bool,
    on_change: bool,
    once: bool,
    in_mapping: BTreeMap<SignalId, String>,
    in_update: SignalId,
    lo_response: SignalId,
    out_result: SignalId,
});

mod defaults {
    pub fn on_start() -> bool {
        true
    }

    pub fn on_change() -> bool {
        true
    }
}

impl Action for Function {
    #[inline(always)]
    fn init(self) -> Result<Box<dyn Action>, Error>
    where
        Self: 'static + Sized,
    {
        match (self.expr.is_some(), self.src.is_some()) {
            (false, false) => Err(eyre!("`expr` and `src` cannot both be empty."))?,
            (true, true) => Err(eyre!("Only one of `expr` and `src` should be set."))?,
            _ => {}
        };

        if self.init_expr.is_some() && self.init_src.is_some() {
            return Err(eyre!(
                "Only one of `init_expr` and `init_src` should be set."
            ));
        }

        let re = Regex::new(r"^[[:alpha:]][[:word:]]*$").unwrap();
        for (_, var) in self.in_mapping.iter() {
            if var.as_str() == "self" {
                return Err(eyre!(
                    "Reserved variable (\"self\") of Fn cannot be included in `in_mapping`."
                ));
            } else if !re.is_match(var) {
                return Err(eyre!("Invalid variable name ({var}) in `in_mapping`."));
            }
        }

        if self.out_result != 0
            && (self.in_mapping.contains_key(&self.out_result) || self.in_update == self.out_result)
        {
            return Err(eyre!("Recursive expression not allowed."));
        }

        if self.in_update != 0 && self.in_mapping.contains_key(&self.in_update) {
            return Err(eyre!("`in_update` cannot overlap with `in_mapping`."));
        }

        Ok(Box::new(self))
    }

    #[inline]
    fn in_signals(&self) -> BTreeSet<SignalId> {
        let mut signals: BTreeSet<_> = self.in_mapping.keys().cloned().collect();
        signals.extend([self.in_update, self.lo_response]);
        signals
    }

    #[inline]
    fn out_signals(&self) -> BTreeSet<SignalId> {
        BTreeSet::from([self.lo_response, self.out_result])
    }

    fn resources(&self, _config: &Config) -> Vec<ResourceAddr> {
        let mut resources = vec![];
        if let OptionalPath::Some(src) = &self.src {
            resources.push(ResourceAddr::Text(src.clone()));
        }
        if let OptionalPath::Some(src) = &self.init_src {
            resources.push(ResourceAddr::Text(src.clone()));
        }
        resources
    }

    fn stateful(
        &self,
        _io: &IoManager,
        res: &ResourceManager,
        config: &Config,
        _sync_writer: &QWriter<SyncSignal>,
        _async_writer: &QWriter<AsyncSignal>,
    ) -> Result<Box<dyn StatefulAction>> {
        let interpreter = self.interpreter.or(&config.interpreter());

        let init = if let OptionalPath::Some(src) = &self.init_src {
            match res.fetch(&ResourceAddr::Text(src.clone()))? {
                ResourceValue::Text(expr) => (*expr).clone(),
                _ => return Err(eyre!("Resource address and value types don't match.")),
            }
        } else if let OptionalString::Some(expr) = &self.init_expr {
            expr.clone()
        } else {
            "".to_owned()
        }
        .trim()
        .to_owned();

        let expr = if let OptionalPath::Some(src) = &self.src {
            match res.fetch(&ResourceAddr::Text(src.clone()))? {
                ResourceValue::Text(expr) => (*expr).clone(),
                _ => return Err(eyre!("Resource address and value types don't match.")),
            }
        } else if let OptionalString::Some(expr) = &self.expr {
            expr.clone()
        } else {
            "".to_owned()
        }
        .trim()
        .to_owned();

        if expr.is_empty() {
            return Err(eyre!("Fn expression cannot be empty."));
        }

        let mut vars = self.vars.clone();
        vars.entry("self".to_owned()).or_insert(Value::Null);

        for (_, var) in self.in_mapping.iter() {
            if !vars.contains_key(var) {
                return Err(eyre!("Undefined variable ({var}) in `in_mapping`."));
            }
        }

        let evaluator = interpreter
            .parse(&init, &expr, &mut vars)
            .wrap_err("Failed to initialize function evaluator.")?;

        Ok(Box::new(StatefulFunction {
            done: false,
            name: self.name.clone(),
            vars,
            evaluator,
            on_start: self.on_start,
            on_change: self.on_change,
            once: self.once,
            in_mapping: self.in_mapping.clone(),
            in_update: self.in_update,
            lo_response: self.lo_response,
            out_result: self.out_result,
        }))
    }
}

impl StatefulAction for StatefulFunction {
    impl_stateful!();

    #[inline(always)]
    fn props(&self) -> Props {
        if self.once { DEFAULT } else { INFINITE }.into()
    }

    fn start(
        &mut self,
        sync_writer: &mut QWriter<SyncSignal>,
        async_writer: &mut QWriter<AsyncSignal>,
        state: &State,
    ) -> Result<Signal> {
        for (id, var) in self.in_mapping.iter() {
            if let Some(entry) = self.vars.get_mut(var) {
                if let Some(value) = state.get(id) {
                    *entry = value.clone();
                }
            }
        }

        if self.on_start {
            if self.once && self.lo_response == 0 {
                self.done = true;
                sync_writer.push(SyncSignal::UpdateGraph);
            }

            self.eval(sync_writer, async_writer)
                .wrap_err("Failed to evaluate function.")
        } else {
            Ok(Signal::none())
        }
    }

    fn update(
        &mut self,
        signal: &ActionSignal,
        sync_writer: &mut QWriter<SyncSignal>,
        async_writer: &mut QWriter<AsyncSignal>,
        state: &State,
    ) -> Result<Signal> {
        let mut news: Vec<(SignalId, Value)> = vec![];
        let mut changed = false;
        let mut updated = false;
        if let ActionSignal::StateChanged(_, signal) = signal {
            for id in signal {
                if let Some(var) = self.in_mapping.get(id) {
                    if let Some(entry) = self.vars.get_mut(var) {
                        *entry = state.get(id).unwrap().clone();
                    }
                    changed = true;
                }

                if *id == self.lo_response {
                    let result = state.get(id).unwrap();
                    self.vars.insert("self".to_owned(), result.clone());

                    if !self.name.is_empty() {
                        async_writer.push(LoggerSignal::Append(
                            "function".to_owned(),
                            (self.name.clone(), result.clone()),
                        ));
                    }

                    if self.out_result > 0 {
                        news.push((self.out_result, result.clone()));
                    }

                    if self.once {
                        self.done = true;
                        sync_writer.push(SyncSignal::UpdateGraph);
                    }
                }
            }

            if signal.contains(&self.in_update) {
                updated = true;
            }
        }

        if (changed && self.on_change) || updated {
            news.extend(
                self.eval(sync_writer, async_writer)
                    .wrap_err("Failed to evaluate function.")?,
            );
        }

        Ok(news.into())
    }

    fn debug(&self) -> Vec<(&str, String)> {
        <dyn StatefulAction>::debug(self)
            .into_iter()
            .chain([("name", format!("{:?}", self.name))])
            .collect()
    }
}

impl StatefulFunction {
    #[inline(always)]
    fn eval(
        &mut self,
        sync_writer: &mut QWriter<SyncSignal>,
        async_writer: &mut QWriter<AsyncSignal>,
    ) -> Result<Signal> {
        if self.lo_response > 0 {
            self.eval_lazy(sync_writer)
        } else {
            self.eval_blocking(async_writer)
        }
    }

    fn eval_blocking(&mut self, async_writer: &mut QWriter<AsyncSignal>) -> Result<Signal> {
        let result = self.evaluator.eval(&mut self.vars)?;

        self.vars.insert("self".to_owned(), result.clone());

        if !self.name.is_empty() {
            async_writer.push(LoggerSignal::Append(
                "function".to_owned(),
                (self.name.clone(), result.clone()),
            ));
        }

        if self.out_result > 0 {
            Ok(vec![(self.out_result, result)].into())
        } else {
            Ok(Signal::none())
        }
    }

    fn eval_lazy(&mut self, sync_writer: &mut QWriter<SyncSignal>) -> Result<Signal> {
        let loopback = {
            let signal_id = self.lo_response;
            let mut sync_writer = sync_writer.clone();

            Box::new(move |value: Value| {
                sync_writer.push(SyncSignal::Emit(
                    Instant::now(),
                    Signal::from(vec![(signal_id, value)]),
                ));
            })
        };

        let error = {
            let mut sync_writer = sync_writer.clone();

            Box::new(move |e: Error| {
                sync_writer.push(SyncSignal::Error(e));
            })
        };

        self.evaluator.eval_lazy(&mut self.vars, loopback, error)?;
        Ok(Signal::none())
    }
}