use std::time::Duration;
use sequenced_broadcast::{SequencedReceiver, SequencedSender};
use tokio::sync::mpsc::channel;
use tokio_util::sync::CancellationToken;
use crate::state::{DeterministicState, FlushedUpdater, FollowUpdater};
use super::task_and_cancel::TaskAndCancel;
/// Worker that sits between an upstream sequenced receiver and a downstream
/// sequenced sender while owning a `FollowUpdater`.
///
/// In the code of this file, every `(seq, action)` pair taken from `rx` is
/// handed to `updater.queue`, and a clone of the value `queue` returns is sent
/// on `tx` under the same sequence number.
pub struct FollowWorker<D: DeterministicState> {
/// Updater that received actions are queued on; `update()` is invoked on it
/// whenever `update_ready()` reports `true`.
updater: FollowUpdater<D>,
/// Upstream source of sequenced authority actions.
rx: SequencedReceiver<D::AuthorityAction>,
/// Downstream sender; its receiving half is handed out by `FollowWorker::new`.
tx: SequencedSender<D::AuthorityAction>,
}
/// A sequenced receiver bundled with a `FlushedUpdater`.
///
/// Nothing is checked at construction time; whether the two halves agree on
/// the sequence number is tested by `is_valid_pair` / `try_into_valid`.
pub struct SequencedRxAndUpdater<D: DeterministicState> {
/// Receiver of sequenced authority actions.
pub rx: SequencedReceiver<D::AuthorityAction>,
/// Updater whose `accept_seq()` is compared against `rx.next_seq()`.
pub updater: FlushedUpdater<D>,
}
/// Newtype around a `SequencedRxAndUpdater` whose `is_valid_pair` check passed.
///
/// The inner field is private; in the visible code the only place that
/// constructs this wrapper is `SequencedRxAndUpdater::try_into_valid`.
pub struct ValidSequencedRxAndUpdater<D: DeterministicState>(SequencedRxAndUpdater<D>);
impl<D: DeterministicState> SequencedRxAndUpdater<D> {
    /// Reports whether the receiver and the updater agree on the sequence
    /// number, i.e. `rx.next_seq()` equals `updater.accept_seq()`.
    pub fn is_valid_pair(&self) -> bool {
        let receiver_next = self.rx.next_seq();
        let updater_accepts = self.updater.accept_seq();
        receiver_next == updater_accepts
    }

    /// Converts this pair into a [`ValidSequencedRxAndUpdater`] when
    /// [`is_valid_pair`](Self::is_valid_pair) holds.
    ///
    /// # Errors
    ///
    /// Hands the pair back untouched as `Err(self)` when the check fails, so
    /// the caller keeps ownership of both halves.
    pub fn try_into_valid(
        self,
    ) -> Result<ValidSequencedRxAndUpdater<D>, SequencedRxAndUpdater<D>> {
        if !self.is_valid_pair() {
            return Err(self);
        }
        Ok(ValidSequencedRxAndUpdater(self))
    }
}
impl<D: DeterministicState> FollowWorker<D> where D::AuthorityAction: Clone {
pub fn new(rx_and_updater: ValidSequencedRxAndUpdater<D>) -> (Self, SequencedReceiver<D::AuthorityAction>) {
let (ch_tx, ch_rx) = channel(2048);
let seq = rx_and_updater.0.updater.accept_seq();
let seq_tx = SequencedSender::new(seq, ch_tx);
let seq_rx = SequencedReceiver::new(seq, ch_rx);
(
FollowWorker {
updater: rx_and_updater.0.updater.into_follow(),
rx: rx_and_updater.0.rx,
tx: seq_tx,
},
seq_rx
)
}
pub fn spawn(mut self) -> TaskAndCancel<Self> {
TaskAndCancel::spawn(|cancel| async move {
self.run_till_canceled(cancel).await;
self
})
}
pub async fn run_till_canceled(&mut self, cancel: CancellationToken) {
loop {
tokio::task::yield_now().await;
let (seq, action) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(50)) => {
if self.updater.update_ready() {
self.updater.update();
}
continue;
}
_ = cancel.cancelled() => break,
msg_opt = self.rx.recv() => {
let Some(msg) = msg_opt else { break };
msg
}
};
let authority = self.updater.queue(seq, action);
self.tx.safe_send(seq, authority.clone()).await.unwrap();
let mut remaining = 128;
while let Ok((seq, action)) = self.rx.try_recv() {
let authority = self.updater.queue(seq, action);
self.tx.safe_send(seq, authority.clone()).await.unwrap();
if remaining == 0 {
break;
}
remaining -= 1;
}
if self.updater.update_ready() {
self.updater.update();
}
}
while let Ok((seq, action)) = self.rx.try_recv() {
let authority = self.updater.queue(seq, action);
self.tx.safe_send(seq, authority.clone()).await.unwrap();
}
if self.updater.update_ready() {
self.updater.update();
}
}
pub fn into_flushed(self) -> FlushedUpdater<D> {
self.updater.into_flushed()
}
}