// dvcompute 2.0.0
//
// Discrete event simulation library (sequential simulation)
// Documentation
// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::cell::RefCell;

use crate::simulation::Point;
use crate::simulation::observable::*;
use crate::simulation::composite::*;
use crate::simulation::event::Event;

use dvcompute_utils::grc::Grc;

/// A growing record of signal values paired with the modeling times at which
/// they were recorded.
///
/// Both vectors are wrapped in `RefCell` so that they can be updated through a
/// shared reference to the history.
pub struct ObservableHistory<T: 'static> {
    /// Modeling times of the recorded values.
    times: RefCell<Vec<f64>>,
    /// Recorded values, kept in the same order as `times`.
    values: RefCell<Vec<T>>,
}

/// A plain, owned snapshot of a signal history: the recorded values and the
/// modeling times they were recorded at.
#[derive(Clone)]
pub struct ObservableHistoryData<T: 'static> {
    /// Modeling times of the recorded values.
    pub times: Vec<f64>,
    /// Recorded values, in the same order as `times`.
    pub values: Vec<T>,
}

impl<T> ObservableHistory<T> {

    /// Return the number of values currently stored in the history.
    ///
    /// The point `_p` is not consulted by the computation.
    pub fn len_at(&self, _p: &Point) -> usize {
        self.values.borrow().len()
    }

    /// Build an event that yields the number of values stored in `hist`
    /// at the point where the event is run.
    pub fn len(hist: Grc<ObservableHistory<T>>) -> impl Event<Item = usize> + Clone {
        cons_event(move |point: &Point| {
            let count = hist.len_at(point);
            Ok(count)
        })
    }

    /** Create a new signal history by the optional initial value, where the values can be emitted by the source provided. */
    pub fn new<O>(comp: O, init: Option<T>, source: Option<Grc<ObservableSource<T>>>) -> CompositeBox<Grc<ObservableHistory<T>>>
        where O: Observable<Message = T> + 'static, 
              T: Clone + 'static
    {
        let hist = Grc::new(ObservableHistory {
            times: RefCell::new(vec![]), 
            values: RefCell::new(vec![])
        });

        let source_clone = source.clone();

        return_composite(hist)
            .and_then(move |hist| {
                if let Some(init) = init {
                    cons_event(move |p: &Point| {
                        {
                            let mut times = hist.times.borrow_mut();
                            let mut values = hist.values.borrow_mut();

                            times.push(p.time);
                            values.push(init.clone());
                        }

                        if let Some(source) = source_clone {
                            source.trigger_at(&init, p)?;
                        }

                        Result::Ok(hist)
                    })
                    .into_composite()
                    .into_boxed()

                } else {
                    return_composite(hist)
                        .into_boxed()
                }
            })
            .and_then(move |hist| {
                comp.subscribe({
                    let hist = hist.clone();
                    cons_observer(move |msg: &T, p: &Point| {
                        {
                            let mut times = hist.times.borrow_mut();
                            let mut values = hist.values.borrow_mut();

                            times.push(p.time);
                            values.push(msg.clone());
                        }

                        match &source {
                            None => {},
                            Some(source) => {
                                source.trigger_at(msg, p)?;
                            }
                        }

                        Result::Ok(())
                    })
                })
                .into_composite()
                .and_then(move |h| {
                    disposable_composite(h)
                })
                .and_then(move |()| {
                    return_composite(hist)
                })
            })
            .into_boxed()
    }

    /// Create a history of `comp` that keeps one entry per grid cell of the given `size`.
    ///
    /// * If `init` is `Some`, it is recorded first at the time of the point at which
    ///   the composite is run.
    /// * For each message: when the history is empty, or the message falls into a
    ///   different grid cell than the last entry, a new entry is appended. Otherwise
    ///   the last entry's time is overwritten with the current time and its value is
    ///   replaced by `accum(last_value, msg)`.
    ///
    /// The subscription handle is registered through `disposable_composite`.
    pub fn new_grid<O, F>(comp: O, size: usize, accum: F, init: Option<T>) -> CompositeBox<Grc<ObservableHistory<T>>>
        where O: Observable<Message = T> + 'static,
              T: Clone + 'static,
              F: Fn(&T, &T) -> T + 'static
    {
        let history = Grc::new(ObservableHistory {
            times: RefCell::new(Vec::new()),
            values: RefCell::new(Vec::new())
        });

        return_composite(history)
            .and_then(move |history| {
                match init {
                    Some(first) => {
                        cons_event(move |p: &Point| {
                            {
                                // The borrows must end before `history` is moved out below.
                                let (mut ts, mut vs) =
                                    (history.times.borrow_mut(), history.values.borrow_mut());

                                ts.push(p.time);
                                vs.push(first.clone());
                            }

                            Ok(history)
                        })
                        .into_composite()
                        .into_boxed()
                    },
                    None => {
                        return_composite(history)
                            .into_boxed()
                    }
                }
            })
            .and_then(move |history| {
                let observer = {
                    let history = history.clone();
                    cons_observer(move |msg: &T, p: &Point| {
                        let mut times = history.times.borrow_mut();
                        let mut values = history.values.borrow_mut();

                        let last_time = times.last().copied();

                        match last_time {
                            None => {
                                times.push(p.time);
                                values.push(msg.clone());
                            },
                            Some(prev_time) => {
                                let specs = &p.run.specs;

                                let prev_cell = specs.grid_index(prev_time, size);
                                let this_cell = specs.grid_index(p.time, size);

                                // NOTE(review): `prev_cell - 1` is evaluated only when both
                                // indices are equal; if `grid_index` returns an unsigned type
                                // and can be 0, this subtraction would underflow. Confirm
                                // against `grid_index` / `grid_time`.
                                let opens_new_cell = (prev_cell != this_cell)
                                    || (prev_time <= specs.grid_time(prev_cell - 1, size));

                                if opens_new_cell {
                                    times.push(p.time);
                                    values.push(msg.clone());
                                } else {
                                    // Same cell: fold the message into the last entry.
                                    let last = times.len() - 1;
                                    times[last] = p.time;

                                    let merged = accum(&values[last], msg);
                                    values[last] = merged;
                                }
                            }
                        }

                        Ok(())
                    })
                };

                comp.subscribe(observer)
                    .into_composite()
                    .and_then(move |handle| {
                        disposable_composite(handle)
                    })
                    .and_then(move |()| {
                        return_composite(history)
                    })
            })
            .into_boxed()
    }

    /// Create a signal history of `comp` that is flushed in batches, and return an
    /// observable of those batches.
    ///
    /// A history is built with `ObservableHistory::new`, using an internal source that is
    /// triggered for the initial value (if any) and for every recorded message. Whenever
    /// that source fires and the history holds more than `limit_size` entries, an IO event
    /// is enqueued at the current time; when it runs and the history is still over the
    /// limit, the history is taken (emptied) and the taken data is emitted.
    ///
    /// In addition, an IO event is enqueued at the stop time of the run. It emits whatever
    /// is left in the history, unless the history is empty or the composite has been
    /// disposed in the meantime.
    ///
    /// Errors from the enqueued events and from triggering the output source are propagated.
    pub fn io_with_limit_size<O>(comp: O, limit_size: usize, init: Option<T>) -> CompositeBox<ObservableBox<ObservableHistoryData<T>>>
        where O: Observable<Message = T> + 'static, 
              T: Clone + 'static
    {
        // Fired by `ObservableHistory::new` after each value has been recorded.
        let signal_src = Grc::new(ObservableSource::new());

        // Carries the batches of history data that are taken out of the history.
        let txs_src = Grc::new(ObservableSource::new());
        let txs_src_clone = txs_src.clone();

        ObservableHistory::new(comp, init, Some(signal_src.clone()))
            .and_then(move |hist: Grc<ObservableHistory<T>>| {
                // Handles for the stop-time flush below; the originals are moved into the observer.
                let txs_src_clone = txs_src.clone();
                let hist_clone = hist.clone();

                signal_src
                    .publish()
                    .subscribe(cons_observer(move |_msg: &T, p: &Point| {
                        let len = hist.len_at(p);
                        // Strictly greater: the history may hold up to `limit_size` entries.
                        if len > limit_size {
                            let comp = {
                                let txs_src = txs_src.clone();
                                let hist = hist.clone();

                                // NOTE(review): presumably defers the flush to the IO
                                // synchronization at the current time — confirm against
                                // `enqueue_io_event`.
                                enqueue_io_event(p.time, cons_event(move |p: &Point| {
                                    // The length is checked again when the enqueued event runs.
                                    let len = hist.len_at(p);
                                    if len > limit_size {
                                        let comp = 
                                            ObservableHistory::take(hist)
                                                .and_then(move |txs: ObservableHistoryData<T>| {
                                                    txs_src.trigger(txs)
                                                });
                                                
                                        comp.call_event(p)?;
                                    }

                                    Result::Ok(())
                                })
                                .into_boxed())
                            };

                            comp.call_event(p)?;
                        }

                        Result::Ok(())
                    }))
                    .into_composite()
                    .and_then(|h: DisposableBox| {
                        disposable_composite(h)
                    })
                    .and_then(move |()| {
                        // `true` until the composite is disposed; guards the stop-time flush.
                        let flag = Grc::new(RefComp::new(true));
                        let flag_clone = flag.clone();

                        cons_event(move |p: &Point| {
                            // Final flush at the stop time of the run.
                            enqueue_io_event(p.run.specs.stop_time, 
                                cons_event(move |p: &Point| {
                                    if flag.read_at(p) && (hist_clone.len_at(p) > 0) {
                                        let comp = 
                                        ObservableHistory::take(hist_clone)
                                            .and_then(move |txs: ObservableHistoryData<T>| {
                                                txs_src_clone.trigger(txs)
                                            });
                                            
                                        comp.call_event(p)?;
                                    }

                                    Result::Ok(())
                                }).into_boxed())
                            .call_event(p)
                        })
                        .into_composite()
                        .and_then(move |()| {
                            // Disposal switches the flag off so that the stop-time flush is skipped.
                            disposable_composite(cons_disposable(move |p: &Point| {
                                flag_clone.write_at(false, p);
                                Result::Ok(())
                            }))
                        })
                    })
            })
            .map(move |()| {
                // Expose the batches as a boxed observable.
                txs_src_clone.publish()
                    .into_boxed()
            })
            .into_boxed()
    }

    /// Build an event that appends `item` to the history, stamped with the time
    /// of the point at which the event is run.
    pub fn push(hist: Grc<Self>, item: T) -> impl Event<Item = ()> {
        cons_event(move |point: &Point| {
            let (mut ts, mut vs) = (hist.times.borrow_mut(), hist.values.borrow_mut());

            ts.push(point.time);
            vs.push(item);

            Ok(())
        })
    }

    /// Build an event that returns a copy of the recorded times and values,
    /// leaving the history itself untouched.
    pub fn read(hist: Grc<Self>) -> impl Event<Item = ObservableHistoryData<T>>
        where T: Clone + 'static
    {
        cons_event(move |_p: &Point| {
            let (ts, vs) = (hist.times.borrow(), hist.values.borrow());

            let snapshot = ObservableHistoryData {
                times: ts.clone(),
                values: vs.clone()
            };

            Ok(snapshot)
        })
    }

    /// Build an event that moves the recorded times and values out of the history
    /// and returns them, leaving the history empty.
    pub fn take(hist: Grc<Self>) -> impl Event<Item = ObservableHistoryData<T>> {
        cons_event(move |_p: &Point| {
            // `RefCell::take` swaps in an empty vector and hands back the old contents.
            let times = hist.times.take();
            let values = hist.values.take();

            Ok(ObservableHistoryData { times, values })
        })
    }

    /// Build an event that discards every recorded time and value.
    pub fn reset(hist: Grc<Self>) -> impl Event<Item = ()> {
        cons_event(move |_p: &Point| {
            let (mut ts, mut vs) = (hist.times.borrow_mut(), hist.values.borrow_mut());

            ts.clear();
            vs.clear();

            Ok(())
        })
    }
}