// dvcompute 2.0.0
//
// Discrete event simulation library (sequential simulation)
// Documentation
// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::simulation::event::*;
use crate::simulation::process::*;

/// Random streams.
pub mod random;

/// Additional operations.
pub mod ops;

/// Build a stream whose items are produced by the processes that `f` supplies.
///
/// `f` is called once to obtain the process for the head item. When that
/// process yields its value, the tail of the stream is built by handing `f`
/// over to `repeat_process` again.
pub fn repeat_process<T, F, M>(f: F) -> Stream<T>
    where F: Fn() -> M + 'static,
          M: Process<Item = T> + 'static,
          T: 'static
{
    // Obtain the head process first: `f` is moved into the continuation below.
    let head = f();
    let cell = head.and_then(move |item| {
        let tail = repeat_process(f);
        return_process((item, tail))
    });
    Stream::Cons(cell.into_boxed())
}

/// Create a stream that never yields an item.
///
/// The cons-cell is the boxed result of `never_process()`.
pub fn empty_stream<T>() -> Stream<T>
    where T: 'static
{
    Stream::Cons(never_process().into_boxed())
}

/// Create a stream that yields `val` as its only item.
///
/// The tail following that item is `empty_stream()`.
pub fn singleton_stream<T>(val: T) -> Stream<T>
    where T: 'static
{
    let tail: Stream<T> = empty_stream();
    let cell = return_process((val, tail));
    Stream::Cons(cell.into_boxed())
}

/// An infinite stream of data.
///
/// A stream is a single cons-cell: a boxed process whose result is the next
/// item of type `T` paired with the rest of the stream.
pub enum Stream<T> {

    /// The cons-cell: a boxed process yielding the head item together with
    /// the stream that follows it.
    Cons(ProcessBox<(T, Stream<T>)>)
}

impl<T> Stream<T> {

    /// Run the stream computation.
    pub fn run(self) -> ProcessBox<(T, Self)> {
        let Stream::Cons(y) = self;
        y
    }

    /// Map the stream according the specified function.
    pub fn map<F, B>(self, f: F) -> Stream<B>
        where F: Fn(T) -> B + 'static,
              B: 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    let b = f(a);
                    return_process((b, xs.map(f)))
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Compose the stream.
    pub fn mapc<F, M, B>(self, f: F) -> Stream<B>
        where F: Fn(T) -> M + 'static,
              M: Event<Item = B> + 'static,
              B: 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    f(a).into_process().and_then(move |b| {
                        return_process((b, xs.mapc(f)))
                    })
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Accumulator that outputs a value determined by the specified function.
    pub fn accum<F, M, B, Acc>(self, f: F, acc: Acc) -> Stream<B>
        where F: Fn(Acc, T) -> M + 'static,
              M: Event<Item = (Acc, B)> + 'static,
              B: 'static,
              T: 'static,
              Acc: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    f(acc, a).into_process().and_then(move |(acc, b)| {
                        return_process((b, xs.accum(f, acc)))
                    })
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Filter only those data values that satisfy to the specified predicate.
    pub fn filter<F>(self, pred: F) -> Self
        where F: Fn(&T) -> bool + 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    if pred(&a) {
                        return_process((a, xs.filter(pred))).into_boxed()
                    } else {
                        let Stream::Cons(comp) = xs.filter(pred);
                        comp
                    }
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Filter only those data values that satisfy to the specified predicate.
    pub fn filterc<F, M>(self, pred: F) -> Self
        where F: Fn(&T) -> M + 'static,
              M: Event<Item = bool> + 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    pred(&a).into_process().and_then(move |b| {
                        if b {
                            return_process((a, xs.filterc(pred))).into_boxed()
                        } else {
                            let Stream::Cons(comp) = xs.filterc(pred);
                            comp
                        }
                    })
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Return the prefix of the stream of the specified length.
    pub fn take(self, n: isize) -> Self
        where T: 'static
    {
        if n <= 0 {
            empty_stream()
        } else {
            let y = {
                let Stream::Cons(comp) = self;
                comp.and_then(move |(a, xs)| {
                        return_process((a, xs.take(n - 1)))
                    })
                    .into_boxed()
            };
            Stream::Cons(y)
        }
    }

    /// Return the longest prefix of the stream of elements that satisfy the predicate.
    pub fn take_while<F>(self, pred: F) -> Self
        where F: Fn(&T) -> bool + 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    if pred(&a) {
                        return_process((a, xs.take_while(pred))).into_boxed()
                    } else {
                        never_process().into_boxed()
                    }
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Return the longest prefix of the stream of elements that satisfy the computations.
    pub fn take_while_c<F, M>(self, pred: F) -> Self
        where F: Fn(&T) -> M + 'static,
              M: Event<Item = bool> + 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    pred(&a).into_process().and_then(move |b| {
                        if b {
                            return_process((a, xs.take_while_c(pred))).into_boxed()
                        } else {
                            never_process().into_boxed()
                        }
                    })
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Return the suffix of the stream after the specified first elements.
    pub fn drop(self, n: isize) -> Self
        where T: 'static
    {
        if n <= 0 {
            self
        } else {
            let y = {
                let Stream::Cons(comp) = self;
                comp.and_then(move |(_, xs)| {
                        xs.drop(n - 1).run()
                    })
                    .into_boxed()
            };
            Stream::Cons(y)
        }
    }

    /// Return the suffix of the stream of elements remaining after `take_while`.
    pub fn drop_while<F>(self, pred: F) -> Self
        where F: Fn(&T) -> bool + 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    if pred(&a) {
                        xs.drop_while(pred).run()
                    } else {
                        return_process((a, xs)).into_boxed()
                    }
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }

    /// Return the suffix of the stream of elements remaining after `take_while_c`.
    pub fn drop_while_c<F, M>(self, pred: F) -> Self
        where F: Fn(&T) -> M + 'static,
              M: Event<Item = bool> + 'static,
              T: 'static
    {
        let y = {
            let Stream::Cons(comp) = self;
            comp.and_then(move |(a, xs)| {
                    pred(&a).into_process().and_then(move |b| {
                        if b {
                            xs.drop_while_c(pred).run()
                        } else {
                            return_process((a, xs)).into_boxed()
                        }
                    })
                })
                .into_boxed()
        };
        Stream::Cons(y)
    }
}