use std::rc::Rc;
use std::marker::PhantomData;
use dvcompute_utils::simulation::arrival::*;
use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::observable::*;
use dvcompute::simulation::observable::observer::*;
use dvcompute::simulation::observable::disposable::*;
use dvcompute::simulation::stream::*;
use crate::simulation::block::*;
use crate::simulation::block::ops::*;
use crate::simulation::transact::*;
/// Marker trait for generator blocks.
///
/// It declares no methods of its own: an implementor is simply a [`Block`]
/// whose input is a `BlockFn<T, ()>` and whose output is `()`.
pub trait GeneratorBlock<T>:
    Block<Input = BlockFn<T, ()>, Output = ()>
{
}
/// Creates a generator block fed by the arrivals of `observable`, using a
/// priority of `0` for the transacts.
///
/// This is a shortcut for [`observable_generator_block`] called with `0`.
#[inline]
pub fn observable_generator_block0<T, O>(observable: O) -> impl GeneratorBlock<Rc<Transact<T>>>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          T: Clone + 'static
{
    // Zero is the priority used when the caller does not specify one.
    observable_generator_block(observable, 0)
}
/// Creates a generator block fed by the arrivals of `observable`, using the
/// fixed `priority` for the transacts.
///
/// The constant is wrapped with `return_event` and handed over to
/// [`observable_generator_block_c`] as the priority computation.
#[inline]
pub fn observable_generator_block<T, O>(observable: O, priority: isize) -> impl GeneratorBlock<Rc<Transact<T>>>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          T: Clone + 'static
{
    observable_generator_block_c(observable, move || return_event(priority))
}
/// Creates a generator block fed by the arrivals of `observable`, where the
/// transact priority is obtained from a computation.
///
/// # Arguments
///
/// * `observable` - source of `Rc<Arrival<T>>` messages.
/// * `priority` - function that is called for each arrival and returns an
///   event computation yielding the priority passed to `Transact::new`.
#[inline]
pub fn observable_generator_block_c<T, O, F, M>(observable: O, priority: F) -> impl GeneratorBlock<Rc<Transact<T>>>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    ObservableGenerator {
        observable,
        priority,
        _phantom1: PhantomData,
        _phantom2: PhantomData,
    }
}
/// Creates a generator block fed by the arrivals of `stream`, using a
/// priority of `0` for the transacts.
///
/// This is a shortcut for [`stream_generator_block`] called with `0`.
#[inline]
pub fn stream_generator_block0<T>(stream: Stream<Rc<Arrival<T>>>) -> impl GeneratorBlock<Rc<Transact<T>>>
    where T: Clone + 'static
{
    // Zero is the priority used when the caller does not specify one.
    stream_generator_block(stream, 0)
}
/// Creates a generator block fed by the arrivals of `stream`, using the
/// fixed `priority` for the transacts.
///
/// The constant is wrapped with `return_event` and handed over to
/// [`stream_generator_block_c`] as the priority computation.
#[inline]
pub fn stream_generator_block<T>(stream: Stream<Rc<Arrival<T>>>, priority: isize) -> impl GeneratorBlock<Rc<Transact<T>>>
    where T: Clone + 'static
{
    stream_generator_block_c(stream, move || return_event(priority))
}
/// Creates a generator block fed by the arrivals of `stream`, where the
/// transact priority is obtained from a computation.
///
/// # Arguments
///
/// * `stream` - stream of `Rc<Arrival<T>>` items.
/// * `priority` - function that is called for each arrival and returns an
///   event computation yielding the priority passed to `Transact::new`.
#[inline]
pub fn stream_generator_block_c<T, F, M>(stream: Stream<Rc<Arrival<T>>>, priority: F) -> impl GeneratorBlock<Rc<Transact<T>>>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    StreamGenerator {
        stream,
        priority,
        _phantom1: PhantomData,
        _phantom2: PhantomData,
    }
}
/// Generator block state built by `observable_generator_block_c`.
///
/// The type parameters `M` and `T` appear only in the `PhantomData` marker
/// fields; no values of those types are stored.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
struct ObservableGenerator<T, O, F, M> {
    /// The source of arrivals the block subscribes to when it is run.
    observable: O,

    /// Called for each arrival to obtain the priority computation.
    priority: F,

    /// Marker for the type returned by `priority`.
    _phantom1: PhantomData<M>,

    /// Marker for the arrival payload type.
    _phantom2: PhantomData<T>,
}
// Running the block subscribes to the observable; every received arrival is
// turned into a transact that is passed to the downstream block chain.
impl<T, O, F, M> Block for ObservableGenerator<T, O, F, M>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    type Input = BlockFn<Rc<Transact<T>>, ()>;
    type Output = ();

    #[doc(hidden)]
    fn call_block<C>(self, a: Self::Input, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Output>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        // Box the continuation so that both entry points share one code path.
        self.call_block_boxed(a, ProcessBoxCont::new(cont), pid, p)
    }

    #[doc(hidden)]
    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
        // Nothing is subscribed if the process has already been cancelled.
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let ObservableGenerator { observable, priority, .. } = self;

        // Invoked for every arrival: compute the priority, create the
        // transact, take it and run the next block of the chain with it.
        let observer = cons_observer(move |arrival: &Rc<Arrival<T>>, point| {
            let next_blocks = a.clone();
            let on_arrival = priority().flat_map(move |prio| {
                Transact::new(Rc::clone(arrival), prio)
                    .into_event()
                    .flat_map(move |transact| {
                        let transact = Rc::new(transact);
                        Transact::take(Rc::clone(&transact))
                            .flat_map(move |()| next_blocks.next().run(transact))
                            .run()
                    })
            });
            on_arrival.call_event(point)
        });

        let subscription = observable.subscribe(observer).call_event(p)?;

        // The generator process itself is `never_process()`; the finalizer
        // attached to it is built from the subscription handle.
        never_process()
            .finally(subscription.into_event().into_process())
            .call_process_boxed(cont, pid, p)
    }
}
// Marks the observable-based generator as a `GeneratorBlock`; the trait has
// no items to implement.
impl<T, O, F, M> GeneratorBlock<Rc<Transact<T>>> for ObservableGenerator<T, O, F, M>
    where O: Observable<Message = Rc<Arrival<T>>> + 'static,
          F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
}
/// Generator block state built by `stream_generator_block_c`.
///
/// The type parameter `M` appears only in a `PhantomData` marker field; no
/// value of that type is stored.
#[must_use = "computations are lazy and do nothing unless to be run"]
struct StreamGenerator<T, F, M> {
    /// The stream of arrivals consumed when the block is run.
    stream: Stream<Rc<Arrival<T>>>,

    /// Called for each arrival to obtain the priority computation.
    priority: F,

    /// Marker for the type returned by `priority`.
    _phantom1: PhantomData<M>,

    /// Marker for the arrival payload type.
    _phantom2: PhantomData<T>,
}
// Running the block hands the stream over to `generator_stream_loop`, which
// turns the arrivals into transacts one by one.
impl<T, F, M> Block for StreamGenerator<T, F, M>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    type Input = BlockFn<Rc<Transact<T>>, ()>;
    type Output = ();

    #[doc(hidden)]
    fn call_block<C>(self, a: Self::Input, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Output>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        // Box the continuation so that both entry points share one code path.
        self.call_block_boxed(a, ProcessBoxCont::new(cont), pid, p)
    }

    #[doc(hidden)]
    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
        // The stream is not touched if the process has already been cancelled.
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let StreamGenerator { stream, priority, .. } = self;
        generator_stream_loop(stream, priority, a).call_process_boxed(cont, pid, p)
    }
}
/// Builds the process that consumes `stream` item by item.
///
/// For each arrival the priority is computed by calling `priority`, a
/// transact is created from the arrival and that priority, the transact is
/// taken, and the next block of the chain `a` is run with it. The function
/// then calls itself on the rest of the stream, so the resulting process is
/// defined recursively and returned boxed.
fn generator_stream_loop<T, F, M>(stream: Stream<Rc<Arrival<T>>>, priority: F, a: BlockFn<Rc<Transact<T>>, ()>) -> ProcessBox<()>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
    let Stream::Cons(head) = stream;

    let looped = head.flat_map(move |(arrival, rest)| {
        let next_blocks = a.clone();

        // Event that creates the transact and starts its block chain.
        let on_arrival = priority().flat_map(move |prio| {
            Transact::new(arrival, prio)
                .into_event()
                .flat_map(move |transact| {
                    let transact = Rc::new(transact);
                    Transact::take(Rc::clone(&transact))
                        .flat_map(move |()| next_blocks.next().run(transact))
                        .run()
                })
        });

        // Continue with the remaining part of the stream afterwards.
        on_arrival
            .into_process()
            .flat_map(move |()| generator_stream_loop(rest, priority, a))
    });

    looped.into_boxed()
}
// Marks the stream-based generator as a `GeneratorBlock`; the trait has no
// items to implement.
impl<T, F, M> GeneratorBlock<Rc<Transact<T>>> for StreamGenerator<T, F, M>
    where F: Fn() -> M + 'static,
          M: Event<Item = isize> + 'static,
          T: Clone + 'static
{
}