pub trait Indicator {
type Input;
type Output;
fn update(&mut self, input: Self::Input) -> Option<Self::Output>;
fn reset(&mut self);
fn warmup_period(&self) -> usize;
fn is_ready(&self) -> bool;
fn name(&self) -> &'static str;
}
pub trait BatchExt: Indicator {
fn batch(&mut self, inputs: &[Self::Input]) -> Vec<Option<Self::Output>>
where
Self::Input: Clone,
{
let mut out = Vec::with_capacity(inputs.len());
for x in inputs {
out.push(self.update(x.clone()));
}
out
}
#[cfg(feature = "parallel")]
fn batch_parallel<F>(
inputs_per_asset: &[Vec<Self::Input>],
make: F,
) -> Vec<Vec<Option<Self::Output>>>
where
Self: Sized + Send,
Self::Input: Sync + Clone,
Self::Output: Send,
F: Fn() -> Self + Sync + Send,
{
use rayon::prelude::*;
inputs_per_asset
.par_iter()
.map(|series| {
let mut ind = make();
ind.batch(series)
})
.collect()
}
}
impl<T: Indicator> BatchExt for T {}
#[derive(Debug, Clone)]
pub struct Chain<A, B>
where
A: Indicator<Input = f64, Output = f64>,
B: Indicator<Input = f64>,
{
first: A,
second: B,
}
impl<A, B> Chain<A, B>
where
A: Indicator<Input = f64, Output = f64>,
B: Indicator<Input = f64>,
{
pub const fn new(first: A, second: B) -> Self {
Self { first, second }
}
pub fn then<C>(self, third: C) -> Chain<Self, C>
where
C: Indicator<Input = f64>,
Self: Indicator<Input = f64, Output = f64>,
{
Chain::new(self, third)
}
pub const fn first(&self) -> &A {
&self.first
}
pub const fn second(&self) -> &B {
&self.second
}
}
impl<A, B> Indicator for Chain<A, B>
where
A: Indicator<Input = f64, Output = f64>,
B: Indicator<Input = f64>,
{
type Input = f64;
type Output = B::Output;
fn update(&mut self, input: f64) -> Option<Self::Output> {
self.first.update(input).and_then(|v| self.second.update(v))
}
fn reset(&mut self) {
self.first.reset();
self.second.reset();
}
fn warmup_period(&self) -> usize {
self.first.warmup_period() + self.second.warmup_period()
}
fn is_ready(&self) -> bool {
self.first.is_ready() && self.second.is_ready()
}
fn name(&self) -> &'static str {
"Chain"
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, Default)]
struct Identity {
seen: bool,
}
impl Indicator for Identity {
type Input = f64;
type Output = f64;
fn update(&mut self, input: f64) -> Option<f64> {
self.seen = true;
Some(input)
}
fn reset(&mut self) {
self.seen = false;
}
fn warmup_period(&self) -> usize {
0
}
fn is_ready(&self) -> bool {
self.seen
}
fn name(&self) -> &'static str {
"Identity"
}
}
#[derive(Debug, Default)]
struct Doubler {
seen: bool,
}
impl Indicator for Doubler {
type Input = f64;
type Output = f64;
fn update(&mut self, input: f64) -> Option<f64> {
self.seen = true;
Some(input * 2.0)
}
fn reset(&mut self) {
self.seen = false;
}
fn warmup_period(&self) -> usize {
0
}
fn is_ready(&self) -> bool {
self.seen
}
fn name(&self) -> &'static str {
"Doubler"
}
}
#[test]
fn batch_replays_update() {
let mut id = Identity::default();
let out = id.batch(&[1.0, 2.0, 3.0]);
assert_eq!(out, vec![Some(1.0), Some(2.0), Some(3.0)]);
}
#[test]
fn chain_pipes_first_into_second() {
let mut c = Chain::new(Doubler::default(), Doubler::default());
assert_eq!(c.update(5.0), Some(20.0));
}
#[test]
fn chain_is_ready_only_after_both_stages_emit() {
let mut c = Chain::new(Doubler::default(), Doubler::default());
assert!(!c.is_ready());
c.update(1.0);
assert!(c.is_ready());
}
#[test]
fn chain_reset_propagates() {
let mut c = Chain::new(Doubler::default(), Doubler::default());
c.update(1.0);
assert!(c.is_ready());
c.reset();
assert!(!c.is_ready());
}
#[test]
fn chain_three_levels_via_then() {
let c = Chain::new(Doubler::default(), Doubler::default()).then(Doubler::default());
let mut c = c;
assert_eq!(c.update(1.0), Some(8.0));
}
#[cfg(feature = "parallel")]
#[test]
fn batch_parallel_runs_independent_instances() {
let series: Vec<Vec<f64>> = vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0]];
let out = Doubler::batch_parallel(&series, Doubler::default);
assert_eq!(out.len(), 2);
assert_eq!(out[0], vec![Some(2.0), Some(4.0), Some(6.0)]);
assert_eq!(out[1], vec![Some(8.0), Some(10.0), Some(12.0)]);
}
}