use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use display_more::DisplayOptionExt;
use crate::RaftTypeConfig;
use crate::replication::inflight_append::InflightAppend;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
/// A shareable FIFO queue of [`InflightAppend`] records.
///
/// The queue lives behind an `Arc<Mutex<..>>`, so the derived `Clone` produces
/// another handle to the same underlying queue rather than a copy of its
/// contents.
#[derive(Clone)]
pub(crate) struct InflightAppendQueue<C>
where C: RaftTypeConfig
{
// Shared, mutex-protected storage: new entries are appended at the back and
// drained from the front.
queue: Arc<Mutex<VecDeque<InflightAppend<C>>>>,
}
impl<C> InflightAppendQueue<C>
where C: RaftTypeConfig
{
    /// Creates an empty queue whose backing storage is pre-sized for 32 entries.
    pub(crate) fn new() -> Self {
        let entries = VecDeque::with_capacity(32);
        Self {
            queue: Arc::new(Mutex::new(entries)),
        }
    }

    /// Appends a new in-flight record for `log_id` to the back of the queue.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn push(&self, log_id: Option<LogIdOf<C>>) {
        // The lock is taken before the record is built, so records are
        // constructed in the same order they are enqueued.
        // NOTE(review): presumably `InflightAppend::new` stamps the sending
        // time, which would make this ordering significant — confirm.
        let mut guard = self.queue.lock().unwrap();
        let entry = InflightAppend::new(log_id);
        tracing::debug!("Inflight queue push: {}", entry);
        guard.push_back(entry);
    }

    /// Removes, from the front, every record whose `last_log_id` is less than
    /// or equal to `matching`, stopping at the first record that is greater.
    ///
    /// Returns the `sending_time` of the last record removed, or `None` when
    /// nothing was removed (including when the queue is empty).
    ///
    /// The comparison uses `Option` ordering, so a `matching` of `None` only
    /// removes records whose `last_log_id` is also `None`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn drain_acked(&self, matching: &Option<LogIdOf<C>>) -> Option<InstantOf<C>> {
        let mut guard = self.queue.lock().unwrap();

        tracing::debug!(
            "Inflight queue drain_acked: matching: {}; data: {:?}",
            matching.display(),
            guard.as_slices()
        );

        let mut newest_acked = None;

        // Peek at the head; continue only while it is covered by `matching`.
        while let Some(head) = guard.front().filter(|entry| matching >= &entry.last_log_id) {
            newest_acked = Some(head.sending_time);
            guard.pop_front();
        }

        newest_acked
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::testing::UTConfig;
    use crate::engine::testing::log_id;

    /// The queue type under test, specialised to the unit-test config.
    type Queue = InflightAppendQueue<UTConfig>;

    /// Builds a queue with one in-flight record per given log id, pushed in order.
    fn queue_of(ids: Vec<Option<LogIdOf<UTConfig>>>) -> Queue {
        let queue = Queue::new();
        for id in ids {
            queue.push(id);
        }
        queue
    }

    /// A fresh queue has nothing to drain.
    #[test]
    fn test_new() {
        let queue = Queue::new();

        assert_eq!(queue.drain_acked(&None), None);
    }

    /// A `None` matching id acknowledges no record that carries a log id.
    #[test]
    fn test_push_and_drain_acked_none_matching() {
        let queue = queue_of(vec![Some(log_id(1, 1, 5)), Some(log_id(1, 1, 10))]);

        assert_eq!(queue.drain_acked(&None), None);
        assert_eq!(queue.queue.lock().unwrap().len(), 2);
    }

    /// Only the records up to and including the matching id are removed.
    #[test]
    fn test_drain_acked_partial() {
        let queue = queue_of(vec![
            Some(log_id(1, 1, 5)),
            Some(log_id(1, 1, 10)),
            Some(log_id(1, 1, 15)),
        ]);

        // Capture the timestamp of the last record expected to be acknowledged.
        let want = queue.queue.lock().unwrap()[1].sending_time;

        let got = queue.drain_acked(&Some(log_id(1, 1, 10)));
        assert_eq!(got, Some(want));

        let remaining = queue.queue.lock().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].last_log_id, Some(log_id(1, 1, 15)));
    }

    /// A matching id beyond every record empties the queue.
    #[test]
    fn test_drain_acked_all() {
        let queue = queue_of(vec![Some(log_id(1, 1, 5)), Some(log_id(1, 1, 10))]);

        let want = queue.queue.lock().unwrap()[1].sending_time;

        let got = queue.drain_acked(&Some(log_id(1, 1, 20)));
        assert_eq!(got, Some(want));
        assert_eq!(queue.queue.lock().unwrap().len(), 0);
    }

    /// Draining an empty queue yields nothing, whatever the matching id.
    #[test]
    fn test_drain_acked_empty_queue() {
        let queue = Queue::new();

        assert_eq!(queue.drain_acked(&Some(log_id(1, 1, 10))), None);
    }

    /// A record with no log id is acknowledged by a `None` matching id.
    #[test]
    fn test_drain_acked_with_none_log_id() {
        let queue = queue_of(vec![None, Some(log_id(1, 1, 5))]);

        let want = queue.queue.lock().unwrap()[0].sending_time;

        let got = queue.drain_acked(&None);
        assert_eq!(got, Some(want));

        let remaining = queue.queue.lock().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].last_log_id, Some(log_id(1, 1, 5)));
    }
}