dvcompute 2.0.0

Discrete event simulation library (sequential simulation)
Documentation
// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::marker::PhantomData;

use crate::simulation;
use crate::simulation::Point;
use crate::simulation::ref_comp::RefComp;
use crate::simulation::simulation::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::strategy::*;
use crate::simulation::resource::*;

use dvcompute_utils::grc::Grc;

/// The unbounded queues that gather their statistics when simulating.
pub mod stats;

/// A type synonym for the ordinary FIFO queue, also known as the FCFS
/// (First Come - First Serviced) queue.
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, T>;

/// A type synonym for the ordinary LIFO queue, also known as the LCFS
/// (Last Come - First Serviced) queue.
pub type LCFSQueue<T> = Queue<LCFSStrategy, FCFSStrategy, T>;

/// Represents an optimized unbounded queue by using the specified strategies for internal storing (in memory), `SM`,
/// and dequeueing (output), `SO`, where `T` denotes the type of items stored in the queue.
pub struct Queue<SM, SO, T>
    where SM: QueueStrategy,
          SO: QueueStrategy + 'static
{
    /// The queue store.
    queue_store: QueueStorageBox<T, SM::Priority>,

    /// The dequeue resource.
    dequeue_resource: Grc<Resource<SO>>,

    /// The queue size.
    count: RefComp<isize>
}

/// Create a new unbounded FCFS (a.k.a FIFO) queue by the specified capacity.
#[inline]
pub fn new_fcfs_queue<T>() -> NewQueue<FCFSStrategy, FCFSStrategy, T>
    where T: 'static
{
    NewQueue {
        storing_strategy: FCFSStrategy::Instance,
        dequeue_strategy: FCFSStrategy::Instance,
        _phantom: PhantomData
    }
}

/// Create a new unbounded LCFS (a.k.a LIFO) queue by the specified capacity.
#[inline]
pub fn new_lcfs_queue<T>() -> NewQueue<LCFSStrategy, FCFSStrategy, T>
    where T: 'static
{
    NewQueue {
        storing_strategy: LCFSStrategy::Instance,
        dequeue_strategy: FCFSStrategy::Instance,
        _phantom: PhantomData
    }
}

impl<SM, SO, T> Queue<SM, SO, T>
    where SM: QueueStrategy + 'static,
          SO: QueueStrategy + 'static,
          T: Clone + 'static
{
    /// Create a new unbounded queue by the specified strategies.
    #[inline]
    pub fn new(storing_strategy: SM, dequeue_strategy: SO) -> NewQueue<SM, SO, T> {
        NewQueue {
            storing_strategy: storing_strategy,
            dequeue_strategy: dequeue_strategy,
            _phantom: PhantomData
        }
    }

    /// Test whether the queue is empty.
    #[inline]
    pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
        cons_event(move |p| {
            Result::Ok(queue.count.read_at(p) == 0)
        })
    }

    /// Return the current queue size.
    #[inline]
    pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| {
            Result::Ok(queue.count.read_at(p))
        })
    }

    /// Dequeue by suspending the process if the queue is empty.
    pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
        request_resource(queue.dequeue_resource.clone())
            .and_then(move |()| {
                cons_event(move |p| {
                    queue.dequeue_extract(p)
                })
                .into_process()
            })
    }

    /// Dequeue with output prioerity by suspending the process if the queue is empty.
    pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
        where SO::Priority: Clone
    {
        request_resource_with_priority(queue.dequeue_resource.clone(), po)
            .and_then(move |()| {
                cons_event(move |p| {
                    queue.dequeue_extract(p)
                })
                .into_process()
            })
    }

    /// Try to dequeue immediately.
    pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
        try_request_resource_within_event(queue.dequeue_resource.clone())
            .and_then(move |f| {
                if f {
                    cons_event(move |p| {
                        let x = queue.dequeue_extract(p)?;
                        Result::Ok(Some(x))
                    }).into_boxed()
                } else {
                    return_event(None)
                        .into_boxed()
                }
            })
    }

    /// Remove the item from the queue and return a flag indicating
    /// whether the item was found and actually removed.
    pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
        where T: PartialEq
    {
        let pred = move |x: &T| { *x == item };
        Queue::delete_by(queue, pred)
            .map(|x| { x.is_some() })
    }

    /// Remove the specified item from the queue.
    pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
        where T: PartialEq
    {
        let pred = move |x: &T| { *x == item };
        Queue::delete_by(queue, pred)
            .map(|_| ())
    }

    /// Remove an item satisfying the specified predicate and return the item if found.
    pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
        where F: Fn(&T) -> bool + 'static
    {
        try_request_resource_within_event(queue.dequeue_resource.clone())
            .and_then(move |f| {
                if f {
                    cons_event(move |p| {
                        let pred = move |x: &T| { pred(x) };
                        let pred = Box::new(pred);
                        match queue.queue_store.remove_boxed_by(pred, p) {
                            None => {
                                release_resource_within_event(queue.dequeue_resource.clone())
                                    .call_event(p)?;
                                Result::Ok(None)
                            },
                            Some(i) => {
                                let x = queue.dequeue_post_extract(i, p)?;
                                Result::Ok(Some(x))
                            }
                        }
                    }).into_boxed()
                } else {
                    return_event(None)
                        .into_boxed()
                }
            })
    }

    /// Test whether there is an item satisfying the specified predicate.
    pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
        where F: Fn(&T) -> bool + 'static
    {
        cons_event(move |p| {
            let pred = move |x: &T| { pred(x) };
            let pred = Box::new(pred);
            Result::Ok(queue.queue_store.exists_boxed(pred, p))
        })
    }

    /// Find an item satisfying the specified predicate.
    pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
        where F: Fn(&T) -> bool + 'static,
              T: Clone
    {
        cons_event(move |p| {
            let pred = move |x: &T| { pred(x) };
            let pred = Box::new(pred);
            Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.clone() }))
        })
    }

    /// Clear the queue.
    pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
        cons_event(move |p| {
            loop {
                let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
                match x {
                    None => return Result::Ok(()),
                    Some(_) => {}
                }
            }
        })
    }

    /// Enqueue the item.
    #[inline]
    pub fn enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
        cons_event(move |p| {
            queue.enqueue_store(item, p)
        })
    }

    /// Enqueue the item with storing priority.
    #[inline]
    pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()>
        where SM::Priority: Clone
    {
        cons_event(move |p| {
            queue.enqueue_store_with_priority(pm, item, p)
        })
    }

    /// Extract an item by the dequeue request.
    fn dequeue_extract(&self, p: &Point) -> simulation::Result<T> {
        let i = self.queue_store.pop(p).unwrap();
        self.dequeue_post_extract(i, p)
    }

    /// A post action after extracting the item by the dequeue request.
    fn dequeue_post_extract(&self, i: T, p: &Point) -> simulation::Result<T> {
        let c  = self.count.read_at(p);
        let c2 = c - 1;
        self.count.write_at(c2, p);
        Result::Ok(i)
    }

    /// Store the item.
    fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
        self.queue_store.push(item, p);
        let c  = self.count.read_at(p);
        let c2 = c + 1;
        self.count.write_at(c2, p);
        release_resource_within_event(self.dequeue_resource.clone())
            .call_event(p)
    }

    /// Store the item with priority.
    fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
        self.queue_store.push_with_priority(pm, item, p);
        let c  = self.count.read_at(p);
        let c2 = c + 1;
        self.count.write_at(c2, p);
        release_resource_within_event(self.dequeue_resource.clone())
            .call_event(p)
    }
}

/// Computation that creates a new `Queue`.
#[derive(Clone)]
pub struct NewQueue<SM, SO, T> {

    /// The storing strategy.
    storing_strategy: SM,

    /// The output strategy.
    dequeue_strategy: SO,

    /// To keep the type parameter.
    _phantom: PhantomData<T>
}

impl<SM, SO, T> Event for NewQueue<SM, SO, T>
    where SM: QueueStrategy,
          SO: QueueStrategy + 'static,
          T: 'static
{
    type Item = Queue<SM, SO, T>;

    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let NewQueue { storing_strategy, dequeue_strategy, _phantom } = self;
        let queue_store = storing_strategy.new_storage();
        let dequeue_resource = {
            Resource::<SO>::new_with_max_count(dequeue_strategy, 0, None)
                .call_simulation(p.run)?
        };
        Result::Ok(Queue {
            queue_store: queue_store,
            dequeue_resource: Grc::new(dequeue_resource),
            count: RefComp::new(0),
        })
    }
}