use std::ops::DerefMut;
use std::sync::{Arc, RwLock, Weak};
use super::operation::DeleteOperation;
use crate::Opstamp;
/// Lock-protected state shared by every clone of a `DeleteQueue`.
#[derive(Default)]
struct InnerDeleteQueue {
/// Operations that were pushed but have not yet been moved into a `Block`.
writer: Vec<DeleteOperation>,
/// Weak handle to the most recently created block. Being weak, it does not
/// keep that block alive on its own.
last_block: Weak<Block>,
}
/// Handle onto a queue of delete operations.
///
/// Cloning the handle clones the `Arc`, so all clones share the same
/// underlying state.
#[derive(Clone)]
pub struct DeleteQueue {
/// Shared state, guarded by a reader-writer lock.
inner: Arc<RwLock<InnerDeleteQueue>>,
}
impl DeleteQueue {
    /// Creates an empty delete queue: no pending operations and no live block.
    pub fn new() -> DeleteQueue {
        DeleteQueue {
            inner: Arc::default(),
        }
    }

    /// Returns the most recently created block, creating a fresh empty block
    /// if no such block is currently alive.
    ///
    /// # Panics
    ///
    /// Panics if the queue's lock has been poisoned.
    fn get_last_block(&self) -> Arc<Block> {
        {
            // Fast path: a read lock is enough when the last block is still alive.
            let rlock = self
                .inner
                .read()
                .expect("Failed to acquire read lock on delete queue");
            if let Some(block) = rlock.last_block.upgrade() {
                return block;
            }
        }
        // The read lock was released above, so another caller may have
        // installed a block in the meantime: check again under the write lock.
        let mut wlock = self
            .inner
            .write()
            .expect("Failed to acquire write lock on delete queue");
        if let Some(block) = wlock.last_block.upgrade() {
            return block;
        }
        // No live block: create an empty one whose successor will be produced
        // by flushing this queue.
        let block = Arc::new(Block {
            operations: Arc::new([]),
            next: NextBlock::from(self.clone()),
        });
        wlock.last_block = Arc::downgrade(&block);
        block
    }

    /// Creates a cursor positioned at the end of the last block.
    ///
    /// Operations already stored in that block are skipped; the cursor only
    /// yields operations from blocks that follow it.
    ///
    /// # Panics
    ///
    /// Panics if the queue's lock has been poisoned.
    pub fn cursor(&self) -> DeleteCursor {
        let block = self.get_last_block();
        let pos = block.operations.len();
        DeleteCursor { block, pos }
    }

    /// Appends a delete operation to the list of pending operations.
    ///
    /// # Panics
    ///
    /// Panics if the queue's lock has been poisoned.
    pub fn push(&self, delete_operation: DeleteOperation) {
        self.inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer")
            .writer
            .push(delete_operation);
    }

    /// Moves all pending operations into a new block, records it as the last
    /// block and returns it.
    ///
    /// Returns `None`, leaving the queue untouched, when nothing is pending.
    ///
    /// # Panics
    ///
    /// Panics if the queue's lock has been poisoned.
    fn flush(&self) -> Option<Arc<Block>> {
        let mut self_wlock = self
            .inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer");
        if self_wlock.writer.is_empty() {
            return None;
        }
        // Take the pending operations, leaving an empty `Vec` behind.
        let delete_operations = std::mem::take(&mut self_wlock.writer);
        let new_block = Arc::new(Block {
            operations: Arc::from(delete_operations.into_boxed_slice()),
            next: NextBlock::from(self.clone()),
        });
        self_wlock.last_block = Arc::downgrade(&new_block);
        Some(new_block)
    }
}
/// State of the link from a block to its successor.
enum InnerNextBlock {
/// No successor has been recorded yet; holds the queue that is flushed to
/// produce one.
Writer(DeleteQueue),
/// The successor block has been recorded.
Closed(Arc<Block>),
}
/// Lock-protected link from a block to the block that follows it.
struct NextBlock(RwLock<InnerNextBlock>);
impl From<DeleteQueue> for NextBlock {
    /// Builds a link with no successor recorded yet, backed by `delete_queue`.
    fn from(delete_queue: DeleteQueue) -> NextBlock {
        let initial_state = InnerNextBlock::Writer(delete_queue);
        let guarded_state = RwLock::new(initial_state);
        NextBlock(guarded_state)
    }
}
impl NextBlock {
    /// Returns the block that follows the one owning this link.
    ///
    /// If a successor has already been recorded, it is returned directly.
    /// Otherwise the backing queue is flushed: when that yields a new block,
    /// the block is recorded as the successor and returned; when the queue
    /// has nothing pending, `None` is returned and the link is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the link's lock (or the queue's lock, during the flush) has
    /// been poisoned.
    fn next_block(&self) -> Option<Arc<Block>> {
        {
            // Fast path: a read lock is enough once the successor is recorded.
            let next_read_lock = self
                .0
                .read()
                .expect("Failed to acquire read lock in delete queue");
            if let InnerNextBlock::Closed(ref block) = *next_read_lock {
                return Some(Arc::clone(block));
            }
        }
        let mut next_write_lock = self
            .0
            .write()
            .expect("Failed to acquire write lock in delete queue");
        let next_block = match *next_write_lock {
            // The read lock was released above, so the successor may have
            // been recorded in the meantime.
            InnerNextBlock::Closed(ref block) => return Some(Arc::clone(block)),
            // The flush runs while the write lock on this link is held;
            // `?` returns `None` when there was nothing to flush.
            InnerNextBlock::Writer(ref writer) => writer.flush()?,
        };
        *next_write_lock.deref_mut() = InnerNextBlock::Closed(Arc::clone(&next_block));
        Some(next_block)
    }
}
/// A batch of delete operations together with the link to the next batch.
struct Block {
/// The operations stored in this block.
operations: Arc<[DeleteOperation]>,
/// Link to the block that follows this one.
next: NextBlock,
}
/// A position within the chain of delete-operation blocks.
///
/// Cloning yields an independent cursor at the same position.
#[derive(Clone)]
pub struct DeleteCursor {
/// The block the cursor currently points into.
block: Arc<Block>,
/// Index into `block.operations`; it equals the block's length once every
/// operation of the block has been passed.
pos: usize,
}
impl DeleteCursor {
    /// Advances the cursor until the current operation has an opstamp that is
    /// not below `target_opstamp`, or until no operation is available.
    pub fn skip_to(&mut self, target_opstamp: Opstamp) {
        loop {
            if !self.is_behind_opstamp(target_opstamp) {
                break;
            }
            self.advance();
        }
    }

    /// Returns `true` when an operation is available at the cursor and its
    /// opstamp is strictly below `target_opstamp`.
    // Takes `&mut self` because `get` may have to move the cursor onto the
    // next block, hence the lint exemption.
    #[allow(clippy::wrong_self_convention)]
    fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
        match self.get() {
            Some(operation) => operation.opstamp < target_opstamp,
            None => false,
        }
    }

    /// Makes sure the cursor points at an operation, moving to the start of
    /// the next block when the current one is exhausted.
    ///
    /// Returns `false` when the current block is exhausted and no next block
    /// is available; the cursor is left unchanged in that case.
    fn load_block_if_required(&mut self) -> bool {
        let block_has_remaining = self.pos < self.block.operations.len();
        if block_has_remaining {
            return true;
        }
        let following = self.block.next.next_block();
        if let Some(next) = following {
            self.block = next;
            self.pos = 0;
            return true;
        }
        false
    }

    /// Moves past the operation the cursor currently points at.
    ///
    /// Returns `true` if there was such an operation (loading the next block
    /// if needed); returns `false`, without moving, otherwise.
    pub fn advance(&mut self) -> bool {
        let positioned = self.load_block_if_required();
        if positioned {
            self.pos += 1;
        }
        positioned
    }

    /// Returns the operation the cursor currently points at without moving
    /// past it, or `None` when no operation is available.
    pub fn get(&mut self) -> Option<&DeleteOperation> {
        if !self.load_block_if_required() {
            return None;
        }
        Some(&self.block.operations[self.pos])
    }
}
#[cfg(test)]
mod tests {
use super::{DeleteOperation, DeleteQueue};
use crate::query::{Explanation, Scorer, Weight};
use crate::{DocId, Score, SegmentReader};
// Placeholder `Weight` used only to build `DeleteOperation` values; both
// methods always return an error.
struct DummyWeight;
impl Weight for DummyWeight {
fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
}
fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
}
}
// Checks cursor behaviour across pushes: operations pushed before and after
// a cursor is created, repeated `get` calls, and `advance` past the end.
#[test]
fn test_deletequeue() {
let delete_queue = DeleteQueue::new();
// Builds a delete operation whose opstamp is `i`.
let make_op = |i: usize| DeleteOperation {
opstamp: i as u64,
target: Box::new(DummyWeight),
};
delete_queue.push(make_op(1));
delete_queue.push(make_op(2));
// Operations 1 and 2 are still pending (not in any block) at this point,
// so this cursor will see them.
let snapshot = delete_queue.cursor();
{
let mut operations_it = snapshot.clone();
assert_eq!(operations_it.get().unwrap().opstamp, 1);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 2);
operations_it.advance();
// Nothing more is pending: `get` yields `None` and `advance` is a no-op.
assert!(operations_it.get().is_none());
operations_it.advance();
// A cursor created now starts after operations 1 and 2.
let mut snapshot2 = delete_queue.cursor();
assert!(snapshot2.get().is_none());
delete_queue.push(make_op(3));
// Both cursors observe the newly pushed operation.
assert_eq!(snapshot2.get().unwrap().opstamp, 3);
assert_eq!(operations_it.get().unwrap().opstamp, 3);
// `get` does not move the cursor: calling it again yields the same operation.
assert_eq!(operations_it.get().unwrap().opstamp, 3);
operations_it.advance();
assert!(operations_it.get().is_none());
operations_it.advance();
}
{
// The original cursor was not moved by its clone: it still replays
// operations 1, 2 and 3 in order.
let mut operations_it = snapshot;
assert_eq!(operations_it.get().unwrap().opstamp, 1);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 2);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 3);
operations_it.advance();
assert!(operations_it.get().is_none());
}
}
}