dvcompute 2.0.0

Discrete event simulation library (sequential simulation)
Documentation
// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::ops::Deref;

use crate::simulation;
use crate::simulation::point::Point;
use crate::simulation::observable::*;
use crate::simulation::observable::disposable::*;
use crate::simulation::observable::observer::*;
use crate::simulation::event::Event;
use crate::simulation::event::EventBox;
use crate::simulation::event::cons_event;
use crate::simulation::ref_comp::RefComp;

use dvcompute_utils::grc::Grc;
use dvcompute_utils::grc::Weak;

/// The source of `Observable` computation.
#[derive(Clone)]
pub struct ObservableSource<M> where M: 'static {

    /// The list of observers.
    observers: Grc<RefComp<ObserverList<M>>>
}

impl<M> ObservableSource<M> {

    /// Create a new source of observable messages.
    pub fn new() -> ObservableSource<M> {
        ObservableSource {
            observers: Grc::new(RefComp::new(ObserverList::Nil))
        }
    }

    /// Trigger the message at the specified time point.
    pub fn trigger_at(&self, message: &M, p: &Point) -> simulation::Result<()> {
        let observers = self.observers.read_at(p);
        trigger_observers(&observers, message, p)
    }

    /// Trigger the message within the `Event` computation.
    pub fn trigger(&self, message: M) -> Trigger<M> {
        Trigger { observers: self.observers.clone(), message: message }
    }

    /// Publish the observable.
    pub fn publish(&self) -> Publish<M> where M: 'static {
        Publish { observers: Grc::downgrade(&self.observers) }
    }
}

/// The trigger computation by cloning the reference.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Trigger<M> where M: 'static {

    /// The source of observable messages.
    observers: Grc<RefComp<ObserverList<M>>>,

    /// The message.
    message: M
}

impl<M> Event for Trigger<M> {

    type Item = ();

    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<()> {
        let Trigger { observers, message } = self;
        let observers = observers.read_at(p);
        trigger_observers(&observers, &message, p)
    }
}

/// The publishing computation.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Publish<M> where M: 'static {

    /// The source of observable messages.
    observers: Weak<RefComp<ObserverList<M>>>
}

impl<M> Observable for Publish<M> where M: 'static {

    type Message = M;

    #[inline]
    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
        where O: Observer<Message = Self::Message, Item = ()> + 'static
    {
        let Publish { observers } = self;
        cons_event(move |p: &Point| {
            match observers.upgrade() {
                None => {
                    Result::Ok(empty_disposable().into_boxed())
                },
                Some(observers) => {
                    let observer   = Grc::new(observer.into_boxed());
                    let disposable = SourceDisposable {
                        observers: Grc::downgrade(&observers),
                        observer: Grc::downgrade(&observer)
                    };
                    let disposable = disposable.into_boxed();
                    let observers0 = observers.read_at(p);
                    let observers2 = add_observer(observer, Grc::new(observers0));
                    observers.write_at(observers2, p);

                    Result::Ok(disposable)
                }
            }
        }).into_boxed()
    }
}

/// The disposable object.
struct SourceDisposable<M> where M: 'static {

    /// The source of observable messages.
    observers: Weak<RefComp<ObserverList<M>>>,

    /// The observer that should be disposed.
    observer: Weak<ObserverBox<M, ()>>
}

impl<M> Disposable for SourceDisposable<M>
    where M: 'static
{
    #[doc(hidden)]
    #[inline]
    fn dispose(self, p: &Point) -> simulation::Result<()> {
        let SourceDisposable { observers, observer } = self;
        match observers.upgrade() {
            None => (),
            Some(observers) => {
                match observer.upgrade() {
                    None => (),
                    Some(observer) => {
                        let observers0 = observers.read_at(p);
                        let observers2 = delete_observer(observer, &observers0);
                        observers.write_at(observers2, p);
                    }
                }
            }
        }
        Result::Ok(())
    }
}

impl<M> Clone for SourceDisposable<M> {

    fn clone(&self) -> Self {
        SourceDisposable {
            observers: self.observers.clone(),
            observer: self.observer.clone()
        }
    }
}

/// The list of observers.
enum ObserverList<M> where M: 'static {

    /// The cons cell.
    Cons(Grc<ObserverBox<M, ()>>, Grc<ObserverList<M>>),

    /// An empty list.
    Nil
}

impl<M> Clone for ObserverList<M> {

    fn clone(&self) -> Self {
        match self {
            &ObserverList::Cons(ref observer, ref tail) => {
                ObserverList::Cons(observer.clone(), tail.clone())
            },
            &ObserverList::Nil => ObserverList::Nil
        }
    }
}

/// Trigger the observers passing in the specified message to them.
fn trigger_observers<M>(observers: &ObserverList<M>, m: &M, p: &Point) -> simulation::Result<()> {
    let mut observers = observers;
    loop {
        match observers {
            &ObserverList::Nil => return Result::Ok(()),
            &ObserverList::Cons(ref observer, ref tail) => {
                match observer.call(m, p) {
                    Result::Ok(()) => (),
                    e@Result::Err(_) => return e
                }
                observers = tail.deref();
            }
        }
    }
}

/// Add a new observer to the list.
#[inline]
fn add_observer<M>(observer: Grc<ObserverBox<M, ()>>, observers: Grc<ObserverList<M>>) -> ObserverList<M> {
    ObserverList::Cons(observer, observers)
}

/// Remove the specified observer from the list.
fn delete_observer<M>(observer: Grc<ObserverBox<M, ()>>, observers: &ObserverList<M>) -> ObserverList<M> {
    let init          = observers;
    let mut first     = ObserverList::Nil;
    let mut observers = observers;
    loop {
        match observers {
            &ObserverList::Nil => {
                return init.clone();
            },
            &ObserverList::Cons(ref head, ref tail) if Grc::ptr_eq(&observer, head) => {
                return append_rev_observers(&first, tail.clone());
            },
            &ObserverList::Cons(ref head, ref tail) => {
                first     = ObserverList::Cons(head.clone(), Grc::new(first));
                observers = tail.deref();
            }
        }
    }
}

/// Append the reversed observers to the list tail.
fn append_rev_observers<M>(observers: &ObserverList<M>, tail: Grc<ObserverList<M>>) -> ObserverList<M> {
    let mut result    = tail;
    let mut observers = observers;
    loop {
        match observers {
            &ObserverList::Nil => {
                return (*result).clone(); // it is almost impossible
            },
            &ObserverList::Cons(ref observer, ref tail) => {
                observers = tail.deref();
                let list  = ObserverList::Cons(observer.clone(), result);
                match observers {
                    &ObserverList::Nil => {
                        return list;
                    },
                    &ObserverList::Cons(_, _) => {
                        result = Grc::new(list);
                    }
                }
            }
        }
    }
}