use std::{mem, sync::Arc, time::Duration};
use crossbeam_queue::SegQueue;
use futures_channel::mpsc::UnboundedReceiver;
use futures_util::{Future, Stream, StreamExt};
use log::{debug, error, trace};
use sm::{sm, Event};
use crate::{
channel::command::Command,
channel::retry::Retry,
channel::state::worker::{Variant::*, *},
contracts::Envelope,
timeout,
transmitter::{Response, Transmitter},
};
sm! {
worker {
InitialStates { Receiving }
TimeoutExpired {
Receiving => Sending,
Waiting => Sending
}
FlushRequested {
Receiving => Sending
}
CloseRequested {
Receiving => Sending,
Waiting => Stopped
}
ItemsSentAndContinue {
Sending => Receiving
}
ItemsSentAndStop {
Sending => Stopped
}
RetryRequested {
Sending => Waiting
}
RetryExhausted {
Waiting => Receiving
}
TerminateRequested {
Receiving => Stopped,
Sending => Stopped,
Waiting => Stopped
}
}
}
pub struct Worker {
transmitter: Transmitter,
items: Arc<SegQueue<Envelope>>,
command_receiver: UnboundedReceiver<Command>,
interval: Duration,
}
impl Worker {
pub fn new(
transmitter: Transmitter,
items: Arc<SegQueue<Envelope>>,
command_receiver: UnboundedReceiver<Command>,
interval: Duration,
) -> Self {
Self {
transmitter,
items,
command_receiver,
interval,
}
}
pub async fn run(mut self) {
let mut state = Machine::new(Receiving).as_enum();
let mut items: Vec<Envelope> = Default::default();
let mut retry = Retry::default();
loop {
state = match state {
InitialReceiving(m) => self.handle_receiving(m, &mut items).await,
ReceivingByItemsSentAndContinue(m) => self.handle_receiving(m, &mut items).await,
ReceivingByRetryExhausted(m) => self.handle_receiving(m, &mut items).await,
SendingByTimeoutExpired(m) => self.handle_sending_with_retry(m, &mut items, &mut retry).await,
SendingByFlushRequested(m) => self.handle_sending_with_retry(m, &mut items, &mut retry).await,
SendingByCloseRequested(m) => self.handle_sending_once_and_terminate(m, &mut items, &mut retry).await,
WaitingByRetryRequested(m) => self.handle_waiting(m, &mut retry).await,
StoppedByItemsSentAndStop(_) => break,
StoppedByCloseRequested(_) => break,
StoppedByTerminateRequested(_) => break,
}
}
}
async fn handle_receiving<E: Event>(&mut self, m: Machine<Receiving, E>, items: &mut Vec<Envelope>) -> Variant {
debug!("Receiving messages triggered by {:?}", m.trigger());
let timeout = timeout::sleep(self.interval);
items.clear();
loop {
tokio::select! {
command = self.command_receiver.next() => {
match command {
Some(command) => {
trace!("Command received: {}", command);
match command {
Command::Flush => return m.transition(FlushRequested).as_enum(),
Command::Terminate => return m.transition(TerminateRequested).as_enum(),
Command::Close => return m.transition(CloseRequested).as_enum(),
}
},
None => {
error!("commands channel closed");
return m.transition(TerminateRequested).as_enum()
},
}
},
_ = timeout => {
debug!("Timeout expired");
return m.transition(TimeoutExpired).as_enum()
},
}
}
}
async fn handle_sending_with_retry<E: Event>(
&mut self,
m: Machine<Sending, E>,
items: &mut Vec<Envelope>,
retry: &mut Retry,
) -> Variant {
*retry = Retry::exponential();
self.handle_sending(m, items).await
}
async fn handle_sending_once_and_terminate<E: Event>(
&mut self,
m: Machine<Sending, E>,
items: &mut Vec<Envelope>,
retry: &mut Retry,
) -> Variant {
*retry = Retry::once();
let cloned = m.clone(); self.handle_sending(m, items).await;
cloned.transition(TerminateRequested).as_enum()
}
async fn handle_sending<E: Event>(&mut self, m: Machine<Sending, E>, items: &mut Vec<Envelope>) -> Variant {
while let Some(item) = self.items.pop() {
items.push(item);
}
debug!(
"Sending {} telemetry items triggered by {:?}",
items.len(),
m.trigger().unwrap()
);
if items.is_empty() {
debug!("Nothing to send. Continue to wait");
m.transition(ItemsSentAndContinue).as_enum()
} else {
match self.transmitter.send(mem::take(items)).await {
Ok(Response::Success) => m.transition(ItemsSentAndContinue).as_enum(),
Ok(Response::Retry(retry_items)) => {
*items = retry_items;
m.transition(RetryRequested).as_enum()
}
Ok(Response::Throttled(_retry_after, retry_items)) => {
*items = retry_items;
m.transition(RetryRequested).as_enum()
}
Ok(Response::NoRetry) => m.transition(ItemsSentAndContinue).as_enum(),
Err(err) => {
debug!("Error occurred during sending telemetry items: {}", err);
m.transition(RetryRequested).as_enum()
}
}
}
}
async fn handle_waiting<E: Event>(&mut self, m: Machine<Waiting, E>, retry: &mut Retry) -> Variant {
if let Some(timeout) = retry.next() {
debug!(
"Waiting for retry timeout {:?} or stop command triggered by {:?}",
timeout,
m.state()
);
let timeout = timeout::sleep(timeout);
tokio::select! {
command = skip_flush(&mut self.command_receiver) => {
match command {
Some(Command::Terminate) => m.transition(TerminateRequested).as_enum(),
Some(Command::Close) => m.transition(CloseRequested).as_enum(),
Some(Command::Flush) => panic!("whoops Flush is not supported here"),
None => {
error!("commands channel closed");
m.transition(TerminateRequested).as_enum()
}
}
},
_ = timeout => {
debug!("Retry timeout expired");
m.transition(TimeoutExpired).as_enum()
},
}
} else {
debug!("All retries exhausted by {:?}", m.state());
m.transition(RetryExhausted).as_enum()
}
}
}
fn skip_flush<St>(stream: &mut St) -> SkipFlush<'_, St> {
SkipFlush { stream }
}
struct SkipFlush<'a, St: ?Sized> {
stream: &'a mut St,
}
impl<St: ?Sized + Stream<Item = Command> + Unpin> Future for SkipFlush<'_, St> {
type Output = Option<St::Item>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
match self.stream.poll_next_unpin(cx) {
std::task::Poll::Ready(Some(Command::Flush)) => std::task::Poll::Pending,
std::task::Poll::Ready(command) => std::task::Poll::Ready(command),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}