dvcompute_gpss_cons 1.3.4

Discrete event simulation library (support of GPSS-like DSL language for conservative distributed simulation)
Documentation
// Copyright (c) 2020-2022  David Sorokin <david.sorokin@gmail.com>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::rc::Rc;
use std::marker::PhantomData;

use dvcompute_utils::simulation::arrival::*;

use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::observable::*;
use dvcompute::simulation::observable::observer::*;
use dvcompute::simulation::observable::disposable::*;
use dvcompute::simulation::stream::*;

use crate::simulation::block::*;
use crate::simulation::block::ops::*;
use crate::simulation::transact::*;

/// The generator block.
pub trait GeneratorBlock<T>: Block<Input = BlockFn<T, ()>, Output = ()> {}

/// Return a generator block by the specified observable using zero priority.
#[inline]
pub fn observable_generator_block0<T, O>(observable: O) -> impl GeneratorBlock<Rc<Transact<T>>>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          T: Clone + 'static
{
    observable_generator_block(observable, 0)
}

/// Return a generator block by the specified observable and priority.
#[inline]
pub fn observable_generator_block<T, O>(observable: O, priority: isize) -> impl GeneratorBlock<Rc<Transact<T>>>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          T: Clone + 'static
{
    observable_generator_block_c(observable, move || { return_event(priority) })
}

/// Return a generator block by the specified observable and priority.
#[inline]
pub fn observable_generator_block_c<T, O, F, M>(observable: O, priority: F) -> impl GeneratorBlock<Rc<Transact<T>>>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    ObservableGenerator { observable: observable, priority: priority, _phantom1: PhantomData, _phantom2: PhantomData }
}

/// Return a generator block by the specified stream using zero priority.
#[inline]
pub fn stream_generator_block0<T>(stream: Stream<Rc<Arrival<T>>>) -> impl GeneratorBlock<Rc<Transact<T>>>
    where T: Clone + 'static
{
    stream_generator_block(stream, 0)
}

/// Return a generator block by the specified stream and priority.
#[inline]
pub fn stream_generator_block<T>(stream: Stream<Rc<Arrival<T>>>, priority: isize) -> impl GeneratorBlock<Rc<Transact<T>>>
    where T: Clone + 'static
{
    stream_generator_block_c(stream, move || { return_event(priority) })
}

/// Return a generator block by the specified stream and priority.
#[inline]
pub fn stream_generator_block_c<T, F, M>(stream: Stream<Rc<Arrival<T>>>, priority: F) -> impl GeneratorBlock<Rc<Transact<T>>>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    StreamGenerator { stream: stream, priority: priority, _phantom1: PhantomData, _phantom2: PhantomData }
}

/// The generator block by the specified observable and priority.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
struct ObservableGenerator<T, O, F, M> {

    /// The observable computation of time delays.
    observable: O,

    /// The computation of priorities.
    priority: F,

    /// To keep the type parameter.
    _phantom1: PhantomData<M>,

    /// To keep the type parameter.
    _phantom2: PhantomData<T>
}

impl<T, O, F, M> Block for ObservableGenerator<T, O, F, M>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    type Input  = BlockFn<Rc<Transact<T>>, ()>;
    type Output = ();

    #[doc(hidden)]
    fn call_block<C>(self, a: Self::Input, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Output>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let cont = ProcessBoxCont::new(cont);
        self.call_block_boxed(a, cont, pid, p)
    }

    #[doc(hidden)]
    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            revoke_process_boxed(cont, pid, p)
        } else {
            let ObservableGenerator { observable, priority, _phantom1, _phantom2 } = self;
            let h = {
                observable.subscribe({
                    cons_observer(move |arrival: &Rc<Arrival<T>>, p| {
                        let chain = a.clone();
                        priority()
                            .flat_map(move |priority| {
                                Transact::new(arrival.clone(), priority)
                                    .into_event()
                                    .flat_map(move |t| {
                                        let t = Rc::new(t);
                                        Transact::take(t.clone())
                                            .flat_map(move |()| {
                                                let block = chain.next();
                                                block.run(t)
                                            })
                                            .run()
                                    })
                            })
                            .call_event(p)
                    })
                }).call_event(p)?
            };
            never_process()
                .finally({
                    h.into_event().into_process()
                })
                .call_process_boxed(cont, pid, p)
        }
    }
}

impl<T, O, F, M> GeneratorBlock<Rc<Transact<T>>> for ObservableGenerator<T, O, F, M>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{}

/// The generator block by the specified stream and priority.
#[must_use = "computations are lazy and do nothing unless to be run"]
struct StreamGenerator<T, F, M> {

  /// The stream computation of time delays.
  stream: Stream<Rc<Arrival<T>>>,

  /// The computation of priorities.
  priority: F,

  /// To keep the type parameter.
  _phantom1: PhantomData<M>,

  /// To keep the type parameter.
  _phantom2: PhantomData<T>
}

impl<T, F, M> Block for StreamGenerator<T, F, M>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    type Input  = BlockFn<Rc<Transact<T>>, ()>;
    type Output = ();

    #[doc(hidden)]
    fn call_block<C>(self, a: Self::Input, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Output>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let cont = ProcessBoxCont::new(cont);
        self.call_block_boxed(a, cont, pid, p)
    }

    #[doc(hidden)]
    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            revoke_process_boxed(cont, pid, p)
        } else {
            let StreamGenerator { stream, priority, _phantom1, _phantom2 } = self;
            generator_stream_loop(stream, priority, a)
                .call_process_boxed(cont, pid, p)
        }
    }
}

/// Handle the specified stream.
fn generator_stream_loop<T, F, M>(stream: Stream<Rc<Arrival<T>>>, priority: F, a: BlockFn<Rc<Transact<T>>, ()>) -> ProcessBox<()>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    let Stream::Cons(comp) = stream;
    comp.flat_map(move |(arrival, xs)| {
        let chain = a.clone();
        priority().flat_map(move |priority| {
            Transact::new(arrival, priority)
                .into_event()
                .flat_map(move |t| {
                    let t = Rc::new(t);
                    Transact::take(t.clone())
                        .flat_map(move |()| {
                            let block = chain.next();
                            block.run(t)
                        })
                        .run()
                })
        })
        .into_process()
        .flat_map(move |()| {
            generator_stream_loop(xs, priority, a)
        })
    }).into_boxed()
}

impl<T, F, M> GeneratorBlock<Rc<Transact<T>>> for StreamGenerator<T, F, M>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{}