use crate::RaftTypeConfig;
use crate::batch::Batch;
use crate::config::Config;
use crate::engine::Command;
use crate::engine::engine_output::EngineOutput;
pub(crate) struct CommandScheduler<'a, C, SM = ()>
where C: RaftTypeConfig
{
config: &'a Config,
output: &'a mut EngineOutput<C, SM>,
}
impl<'a, C, SM> CommandScheduler<'a, C, SM>
where C: RaftTypeConfig
{
pub(crate) fn new(config: &'a Config, output: &'a mut EngineOutput<C, SM>) -> Self {
Self { config, output }
}
pub(crate) fn merge_front_append_entries(&mut self) {
let max_entries = self.config.max_append_entries();
let commands = &mut self.output.commands;
let Some(c) = commands.pop_front() else {
return;
};
let Command::AppendEntries {
committed_vote,
mut entries,
} = c
else {
commands.push_front(c);
return;
};
let mut n = 0;
while let Some(next) = commands.pop_front() {
match next {
Command::AppendEntries {
committed_vote: next_vote,
entries: next_entries,
} if next_vote == committed_vote && entries.len() as u64 + next_entries.len() as u64 <= max_entries => {
n += 1;
entries.extend(next_entries);
}
_ => {
commands.push_front(next);
break;
}
}
}
tracing::debug!(
"CommandScheduler: merged {} AppendEntries, entries size: {}",
n,
entries.len()
);
commands.push_front(Command::AppendEntries {
committed_vote,
entries,
});
}
}
#[cfg(test)]
mod tests {
use super::CommandScheduler;
use crate::batch::Batch;
use crate::config::Config;
use crate::engine::Command;
use crate::engine::engine_output::EngineOutput;
use crate::engine::testing::UTConfig;
use crate::engine::testing::UTLeaderId;
use crate::impls::Vote;
use crate::testing::blank_ent;
use crate::vote::raft_vote::RaftVoteExt;
type C = UTConfig;
fn committed_vote(term: u64, node_id: u64) -> crate::vote::committed::CommittedVote<UTLeaderId> {
Vote::<UTLeaderId>::new(term, node_id).into_committed()
}
fn config_with_max_append(max_append_entries: u64) -> Config {
Config {
max_append_entries: Some(max_append_entries),
..Default::default()
}
}
#[test]
fn test_merge_empty_queue() {
let config = Config::default();
let mut output: EngineOutput<C> = EngineOutput::new(8);
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert!(output.commands.is_empty());
}
#[test]
fn test_merge_single_command() {
let config = Config::default();
let mut output: EngineOutput<C> = EngineOutput::new(8);
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 1)]),
});
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert_eq!(output.commands.len(), 1);
}
#[test]
fn test_merge_front_not_append_entries() {
let config = Config::default();
let mut output: EngineOutput<C> = EngineOutput::new(8);
output.push_command(Command::SaveVote { vote: Vote::new(1, 0) });
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 1)]),
});
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert_eq!(output.commands.len(), 2);
}
#[test]
fn test_merge_consecutive_same_vote() {
let config = Config::default();
let mut output: EngineOutput<C> = EngineOutput::new(8);
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 1)]),
});
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 2)]),
});
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 3)]),
});
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert_eq!(output.commands.len(), 1);
if let Command::AppendEntries { entries, .. } = &output.commands[0] {
assert_eq!(entries.len(), 3);
} else {
panic!("Expected AppendEntries");
}
}
#[test]
fn test_merge_stops_at_different_vote() {
let config = Config::default();
let mut output: EngineOutput<C> = EngineOutput::new(8);
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 1)]),
});
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(2, 0), entries: Batch::of([blank_ent::<UTConfig>(2, 0, 2)]),
});
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert_eq!(output.commands.len(), 2);
}
#[test]
fn test_merge_stops_at_non_append_entries() {
let config = Config::default();
let mut output: EngineOutput<C> = EngineOutput::new(8);
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 1)]),
});
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 2)]),
});
output.push_command(Command::SaveVote { vote: Vote::new(1, 0) });
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 3)]),
});
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert_eq!(output.commands.len(), 3);
if let Command::AppendEntries { entries, .. } = &output.commands[0] {
assert_eq!(entries.len(), 2);
} else {
panic!("Expected AppendEntries");
}
}
#[test]
fn test_merge_stops_at_max_entries() {
let config = config_with_max_append(3);
let mut output: EngineOutput<C> = EngineOutput::new(8);
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 1), blank_ent::<UTConfig>(1, 0, 2)]),
});
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 3)]),
});
output.push_command(Command::AppendEntries {
committed_vote: committed_vote(1, 0),
entries: Batch::of([blank_ent::<UTConfig>(1, 0, 4)]),
});
let mut scheduler = CommandScheduler::new(&config, &mut output);
scheduler.merge_front_append_entries();
assert_eq!(output.commands.len(), 2);
if let Command::AppendEntries { entries, .. } = &output.commands[0] {
assert_eq!(entries.len(), 3);
} else {
panic!("Expected AppendEntries");
}
}
}