use std::{
collections::BTreeMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};
use log::warn;
use crate::{
core::orchestrator::{JobInfo, Orchestrator},
mpsc::{
channel::{Channel, ReceiverChannel, SenderChannel},
err::SenderError,
},
task::{Message, Task},
};
use super::Node;
/// Handler driven by an [`InNode`]: it consumes items of type `TIn` and may
/// hand back a single value of type `TCollected` once the stream is over.
pub trait In<TIn: Send, TCollected> {
    /// Consumes one input item.
    fn run(&mut self, input: TIn);

    /// Consumes the handler and returns whatever it collected, or `None`
    /// when there is nothing to return.
    fn finalize(self) -> Option<TCollected>;

    /// Tells the node whether messages must be delivered in their original
    /// order. Handlers that do not override this are unordered.
    fn is_ordered(&self) -> bool {
        false
    }
}
/// Lets any `FnMut(TIn) -> TCollected` callable be used as an [`In`] handler.
///
/// The callable is invoked once per input, but the value it returns is thrown
/// away and `finalize` always yields `None`, so nothing is collected through
/// this implementation. `is_ordered` is not overridden and therefore reports
/// `false`.
impl<TIn: Send, TCollected, F: FnMut(TIn) -> TCollected> In<TIn, TCollected> for F {
    /// Calls the wrapped callable with `input`, discarding its result.
    fn run(&mut self, input: TIn) {
        let _ = self(input);
    }

    /// Always `None`: a bare callable keeps no collected state.
    fn finalize(self) -> Option<TCollected> {
        Option::None
    }
}
/// Terminal node of a pipeline: messages passed to `send` are forwarded over a
/// channel to a worker job that feeds them to an [`In`] handler, and the value
/// the handler returns from `finalize` can be retrieved with `collect`.
pub struct InNode<TIn, TCollected>
where
TIn: Send,
{
/// Sending half of the channel whose receiving half is owned by the worker job.
channel: SenderChannel<Message<TIn>>,
/// Value of the handler's `is_ordered()` captured at construction time.
ordered: bool,
/// Messages that arrived ahead of their turn, keyed by their order.
/// For a parked `Task::New` the message's own order field carries the
/// `rec_id` that was passed to `send`.
storage: Mutex<BTreeMap<usize, Message<TIn>>>,
/// Order the node expects next; advanced after an in-order `Task::New` and,
/// in ordered mode, after an in-order `Task::Dropped`.
counter: AtomicUsize,
/// Slot the worker job fills with the handler's collected value (when it is
/// `Some`); emptied by `collect`.
result: Arc<Mutex<Option<TCollected>>>,
/// Handle of the worker job returned by the orchestrator; `wait` blocks on it.
job_info: JobInfo,
}
impl<TIn, TCollected> Node<TIn, TCollected> for InNode<TIn, TCollected>
where
    TIn: Send + 'static,
    TCollected: Send + 'static,
{
    /// Routes one message towards the worker job.
    ///
    /// In ordered mode, a message whose `order` differs from the internal
    /// counter is parked in the storage map (keyed by `order`) and the storage
    /// is then scanned for messages that have become deliverable. Otherwise:
    ///
    /// * `Task::New` is forwarded on the channel and the counter is advanced;
    /// * `Task::Dropped` is not forwarded and only advances the counter when
    ///   the node is ordered;
    /// * `Task::Terminate` is forwarded and leaves the counter untouched.
    ///
    /// `rec_id` is only used when a `Task::New` has to be parked: it is kept in
    /// the parked message's order field and handed back to `send` when that
    /// message is flushed.
    ///
    /// As written this method always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics when the channel refuses a message, and (through the storage
    /// helpers) when the storage lock cannot be acquired.
    fn send(&self, input: Message<TIn>, rec_id: usize) -> Result<(), SenderError> {
        let Message { op, order } = input;

        // Guard clause: an ordered node parks anything that is not next in line.
        // NOTE(review): an in-order send does not flush parked successors; they
        // wait for the next out-of-order arrival — confirm this is intended.
        if self.ordered && order != self.counter.load(Ordering::Acquire) {
            let tag = if matches!(&op, Task::New(_)) {
                rec_id
            } else {
                order
            };
            self.save_to_storage(Message::new(op, tag), order);
            self.send_pending();
            return Ok(());
        }

        // (forward on the channel?, advance the counter afterwards?)
        let (forward, advance) = match &op {
            Task::New(_) => (true, true),
            Task::Dropped => (false, self.ordered),
            Task::Terminate => (true, false),
        };

        if forward && self.channel.send(Message::new(op, order)).is_err() {
            panic!("Error: Cannot send message!");
        }
        if advance {
            self.counter.fetch_add(1, Ordering::AcqRel);
        }
        Ok(())
    }

    /// Waits for the worker job to finish, then takes the collected value out
    /// of the shared result slot.
    ///
    /// Returns `None` when the worker stored nothing.
    ///
    /// # Panics
    ///
    /// Panics if the result lock cannot be acquired.
    fn collect(mut self) -> Option<TCollected> {
        self.wait();
        let mut slot = match self.result.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("Error: Cannot collect results in."),
        };
        // `take` on an empty slot already yields `None`.
        slot.take()
    }

    /// This node always runs as a single replica.
    fn get_num_of_replicas(&self) -> usize {
        1
    }
}
impl<TIn, TCollected> InNode<TIn, TCollected>
where
    TIn: Send + 'static,
    TCollected: Send + 'static,
{
    /// Builds the node and submits its worker job to `orchestrator`.
    ///
    /// A channel is created with the wait policy taken from the orchestrator's
    /// configuration. The receiving half and `handler` move into the job, which
    /// runs [`InNode::rts`] and, when that returns `Some`, stores the value in
    /// the result slot shared with the node. The handler's `is_ordered()` is
    /// read once here.
    ///
    /// # Panics
    ///
    /// The worker job panics if the result lock cannot be acquired.
    pub fn new(
        handler: Box<dyn In<TIn, TCollected> + Send + Sync>,
        orchestrator: Arc<Orchestrator>,
    ) -> InNode<TIn, TCollected> {
        let (receiver, sender) =
            Channel::channel(orchestrator.get_configuration().get_wait_policy());
        let result = Arc::new(Mutex::new(None));
        let ordered = handler.is_ordered();
        let shared = Arc::clone(&result);

        let worker = move || {
            if let Some(collected) = Self::rts(handler, receiver) {
                let mut slot = match shared.lock() {
                    Ok(guard) => guard,
                    Err(_) => panic!("Error: Cannot collect results."),
                };
                *slot = Some(collected);
            }
        };
        // Exactly one job is pushed, so its info is the first element returned.
        let job_info = orchestrator.push_jobs(vec![worker]).remove(0);

        InNode {
            channel: sender,
            ordered,
            storage: Mutex::new(BTreeMap::new()),
            counter: AtomicUsize::new(0),
            result,
            job_info,
        }
    }

    /// Worker loop: feeds every `Task::New` payload to `node` until a
    /// `Task::Terminate` arrives, then returns `node.finalize()`.
    ///
    /// `Task::Dropped` messages and empty receives are skipped; receive errors
    /// are logged with `warn!` and the loop keeps going.
    fn rts(
        mut node: Box<dyn In<TIn, TCollected>>,
        channel: ReceiverChannel<Message<TIn>>,
    ) -> Option<TCollected> {
        loop {
            match channel.receive() {
                Ok(Some(Message {
                    op: Task::New(arg), ..
                })) => node.run(arg),
                Ok(Some(Message {
                    op: Task::Terminate,
                    ..
                })) => break,
                Ok(Some(Message {
                    op: Task::Dropped, ..
                }))
                | Ok(None) => {}
                // NOTE(review): a persistent receive error would keep this loop
                // running indefinitely — confirm `receive` cannot fail that way.
                Err(e) => {
                    warn!("Error: {}", e);
                }
            }
        }
        node.finalize()
    }

    /// Blocks until the worker job has finished.
    fn wait(&mut self) {
        self.job_info.wait()
    }

    /// Parks `task` in the storage map under the key `order`.
    ///
    /// # Panics
    ///
    /// Panics if the storage lock cannot be acquired.
    fn save_to_storage(&self, task: Message<TIn>, order: usize) {
        let mut pending = match self.storage.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("Error: Cannot lock the storage!"),
        };
        pending.insert(order, task);
    }

    /// Delivers parked messages for as long as the one matching the current
    /// counter value is present in the storage.
    ///
    /// Each delivered message is re-submitted through `send` with the counter
    /// value as its order. A parked `Task::New` passes along the `rec_id` it
    /// was stored with; the other task kinds pass `0`.
    ///
    /// # Panics
    ///
    /// Panics if the storage lock cannot be acquired or if `send` reports an
    /// error.
    fn send_pending(&self) {
        let mut pending = match self.storage.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("Error: Cannot lock the storage!"),
        };
        // The storage lock stays held while `send` runs. This relies on `send`
        // taking its in-order path for a message whose order equals the counter,
        // because parking it again would try to take this same lock.
        loop {
            let next = self.counter.load(Ordering::Acquire);
            let (op, tag) = match pending.remove(&next) {
                Some(Message { op, order }) => (op, order),
                None => break,
            };
            let rec_id = if matches!(&op, Task::New(_)) { tag } else { 0 };
            if self.send(Message::new(op, next), rec_id).is_err() {
                panic!("Error: Cannot send message!");
            }
        }
    }
}