use std::collections::VecDeque;
use crate::RaftTypeConfig;
use crate::config::Config;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::command_scheduler::CommandScheduler;
use crate::engine::pending_responds::PendingResponds;
use crate::engine::respond_command::PendingRespond;
/// Buffer holding a queue of [`Command`]s together with the `Respond`
/// commands that have been postponed.
///
/// `SM` defaults to `()` and is only forwarded to [`Command`].
#[derive(Debug)]
pub(crate) struct EngineOutput<C, SM = ()>
where C: RaftTypeConfig
{
/// Command queue: `push_command` appends at the back, `pop_command` takes
/// from the front, and `postpone_command` may put a command back at the front.
pub(crate) commands: VecDeque<Command<C, SM>>,
/// Postponed `Respond` commands, stored in a separate queue per
/// [`Condition`] variant (`on_log_io`, `on_log_flush`, `on_apply`, `on_snapshot`).
pub(crate) pending_responds: PendingResponds<C>,
}
impl<C, SM> Default for EngineOutput<C, SM>
where C: RaftTypeConfig
{
    /// Builds an output with an empty command queue and a default
    /// [`PendingResponds`].
    ///
    /// This impl places no `Default` bound on `C` or `SM`.
    fn default() -> Self {
        let commands = VecDeque::new();
        let pending_responds = PendingResponds::default();

        Self {
            commands,
            pending_responds,
        }
    }
}
impl<C, SM> EngineOutput<C, SM>
where C: RaftTypeConfig
{
    /// Creates an output whose command queue is pre-allocated for
    /// `command_buffer_size` entries.
    ///
    /// The pending responds are built with a fixed capacity argument of 1024.
    pub(crate) fn new(command_buffer_size: usize) -> Self {
        let commands = VecDeque::with_capacity(command_buffer_size);
        let pending_responds = PendingResponds::new(1024);

        Self {
            commands,
            pending_responds,
        }
    }

    /// Returns the number of commands currently in the command queue.
    ///
    /// Postponed responds are not counted.
    pub(crate) fn len(&self) -> usize {
        self.commands.len()
    }

    /// Appends `cmd` to the back of the command queue.
    pub(crate) fn push_command(&mut self, cmd: Command<C, SM>) {
        tracing::debug!("push command: {:?}", cmd);
        self.commands.push_back(cmd);
    }

    /// Postpones `cmd`.
    ///
    /// A `Command::Respond` is moved into the pending queue that matches its
    /// condition and `Ok(())` is returned. Any other command cannot be parked:
    /// it is put back at the front of the command queue and an `Err` with a
    /// static explanation is returned.
    ///
    /// # Panics
    ///
    /// Panics if `cmd` is a `Command::Respond` that carries no condition.
    pub(crate) fn postpone_command(&mut self, cmd: Command<C, SM>) -> Result<(), &'static str> {
        tracing::debug!("postpone command: {:?}", cmd);

        // Guard: only `Respond` commands can be parked; everything else goes
        // straight back to the head of the queue.
        let (when, resp) = match cmd {
            Command::Respond { when, resp } => (when, resp),
            other => {
                self.commands.push_front(other);
                return Err("Put back to the front of command queue");
            }
        };

        let condition = match when {
            Some(condition) => condition,
            None => unreachable!("Respond command to postpone must have a condition"),
        };

        // No wildcard arm here, so adding a `Condition` variant forces this
        // dispatch to be revisited.
        let queues = &mut self.pending_responds;
        match condition {
            Condition::IOFlushed { io_id } => {
                queues.on_log_io.push_back(PendingRespond::new(io_id, resp));
            }
            Condition::LogFlushed { log_id } => {
                queues.on_log_flush.push_back(PendingRespond::new(log_id, resp));
            }
            Condition::Applied { log_id } => {
                queues.on_apply.push_back(PendingRespond::new(log_id, resp));
            }
            Condition::Snapshot { log_id } => {
                queues.on_snapshot.push_back(PendingRespond::new(log_id, resp));
            }
        }

        Ok(())
    }

    /// Removes and returns the command at the front of the queue, or `None`
    /// when the queue is empty.
    pub(crate) fn pop_command(&mut self) -> Option<Command<C, SM>> {
        self.commands.pop_front()
    }

    /// Iterates over the queued commands from front to back without removing them.
    pub(crate) fn iter_commands(&self) -> impl Iterator<Item = &Command<C, SM>> {
        self.commands.iter()
    }

    /// Test helper: drains every queued command into a `Vec`, front to back,
    /// leaving the queue empty.
    #[cfg(test)]
    pub(crate) fn take_commands(&mut self) -> Vec<Command<C, SM>> {
        self.commands.drain(..).collect::<Vec<_>>()
    }

    /// Test helper: discards every queued command.
    #[cfg(test)]
    pub(crate) fn clear_commands(&mut self) {
        self.commands.clear();
    }

    /// Runs the [`CommandScheduler`] over this output using `config`.
    ///
    /// Only `merge_front_append_entries` is invoked; presumably it coalesces
    /// append-entries commands at the front of the queue — see
    /// `CommandScheduler` for the exact behavior.
    pub(crate) fn sched_commands(&mut self, config: &Config) {
        CommandScheduler::new(config, self).merge_front_append_entries();
    }
}