use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use crate::index::segment_worker::SegmentWorker;
/// Stall threshold: `FlushControl::wait_if_stalled` blocks its caller while
/// the in-flight flush counter is greater than or equal to this value.
const MAX_CONCURRENT_FLUSHES: usize = 4;
/// Coordinates pending and in-flight segment flushes.
///
/// All mutable state lives behind a single mutex; a condition variable is
/// used to park callers of `wait_if_stalled` until `flush_completed` lowers
/// the in-flight counter.
pub struct FlushControl {
/// Mutex-guarded queue of pending workers plus the in-flight counter.
inner: Mutex<FlushControlInner>,
/// Notified by `flush_completed`; waited on by `wait_if_stalled`.
stall_cvar: Condvar,
}
/// State protected by `FlushControl::inner`.
struct FlushControlInner {
/// Workers queued for flushing, kept in FIFO order (pushed at the back,
/// popped from the front, or drained all at once).
flush_queue: VecDeque<SegmentWorker>,
/// Number of flushes currently counted as in flight; compared against
/// `MAX_CONCURRENT_FLUSHES` to decide whether callers must stall.
flushing_count: usize,
}
impl Default for FlushControl {
fn default() -> Self {
Self::new()
}
}
impl FlushControl {
    /// Creates a controller with no pending workers and no flushes in flight.
    pub fn new() -> Self {
        let initial_state = FlushControlInner {
            flush_queue: VecDeque::new(),
            flushing_count: 0,
        };
        Self {
            inner: Mutex::new(initial_state),
            stall_cvar: Condvar::new(),
        }
    }

    /// Records that one flush has finished and wakes every stalled waiter.
    ///
    /// The in-flight counter is decremented with saturation, so calling this
    /// when the counter is already zero leaves it at zero.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn flush_completed(&self) {
        {
            let mut state = self.inner.lock().unwrap();
            state.flushing_count = state.flushing_count.saturating_sub(1);
        } // guard released here, before waiters are woken

        self.stall_cvar.notify_all();
    }

    /// Blocks the calling thread for as long as the in-flight counter is at
    /// or above `MAX_CONCURRENT_FLUSHES`; returns immediately otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn wait_if_stalled(&self) {
        let state = self.inner.lock().unwrap();
        // `wait_while` re-evaluates the predicate after every wake-up, so
        // spurious wake-ups cannot let a caller through early.
        let _released = self
            .stall_cvar
            .wait_while(state, |s| s.flushing_count >= MAX_CONCURRENT_FLUSHES)
            .unwrap();
    }

    /// Removes every queued worker and returns them in queue order.
    ///
    /// The in-flight counter is not touched.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn drain_pending(&self) -> Vec<SegmentWorker> {
        let mut state = self.inner.lock().unwrap();
        state.flush_queue.drain(..).collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    // Test-only helpers for feeding the queue and inspecting internal state.
    impl FlushControl {
        /// Appends `worker` to the back of the pending queue.
        fn enqueue_for_flush(&self, worker: SegmentWorker) {
            let mut state = self.inner.lock().unwrap();
            state.flush_queue.push_back(worker);
        }

        /// Pops the oldest pending worker, counting it as in flight.
        /// Returns `None` (and leaves the counter alone) when the queue is empty.
        fn next_pending_flush(&self) -> Option<SegmentWorker> {
            let mut state = self.inner.lock().unwrap();
            let worker = state.flush_queue.pop_front()?;
            state.flushing_count += 1;
            Some(worker)
        }

        /// Number of workers still waiting in the queue.
        fn pending_count(&self) -> usize {
            let state = self.inner.lock().unwrap();
            state.flush_queue.len()
        }

        /// Current value of the in-flight counter.
        fn flushing_count(&self) -> usize {
            let state = self.inner.lock().unwrap();
            state.flushing_count
        }
    }

    /// Builds a `SegmentWorker` named `name` on top of an in-memory directory.
    fn make_worker(name: &str) -> SegmentWorker {
        use crate::store::memory::MemoryDirectory;
        use std::sync::{Arc, Mutex};

        let boxed: Box<dyn crate::store::Directory> = Box::new(MemoryDirectory::new());
        let shared_dir = Arc::new(Mutex::new(boxed));
        SegmentWorker::new(name.to_string(), HashMap::new(), 0, shared_dir)
    }

    #[test]
    fn test_flush_control_default() {
        let control = FlushControl::default();

        assert_eq!(control.pending_count(), 0);
        assert_eq!(control.flushing_count(), 0);
    }

    #[test]
    fn test_enqueue_and_dequeue() {
        let control = FlushControl::new();
        control.enqueue_for_flush(make_worker("_0"));
        control.enqueue_for_flush(make_worker("_1"));
        assert_eq!(control.pending_count(), 2);

        // Workers come back in insertion order, each bumping the counter.
        let first = control.next_pending_flush().unwrap();
        assert_eq!(first.segment_name(), "_0");
        assert_eq!(control.flushing_count(), 1);

        let second = control.next_pending_flush().unwrap();
        assert_eq!(second.segment_name(), "_1");
        assert_eq!(control.flushing_count(), 2);

        assert!(control.next_pending_flush().is_none());
    }

    #[test]
    fn test_flush_completed_decrements() {
        let control = FlushControl::new();
        control.enqueue_for_flush(make_worker("_0"));

        let _in_flight = control.next_pending_flush();
        assert_eq!(control.flushing_count(), 1);

        control.flush_completed();
        assert_eq!(control.flushing_count(), 0);
    }

    #[test]
    fn test_drain_pending() {
        let control = FlushControl::new();
        control.enqueue_for_flush(make_worker("_0"));
        control.enqueue_for_flush(make_worker("_1"));

        let drained = control.drain_pending();

        assert_eq!(drained.len(), 2);
        assert_eq!(control.pending_count(), 0);
    }
}