use std::ops::DerefMut;
use std::sync::{Arc, RwLock, Weak};

use super::operation::DeleteOperation;
use crate::Opstamp;

// The DeleteQueue is conceptually similar to a single-producer,
// multiple-consumer broadcast channel.
//
// All consumers will receive all messages.
//
// Consumers of the delete queue hold a `DeleteCursor`,
// which points to a specific position in the `DeleteQueue`.
//
// New consumers can be created in two ways:
// - calling `delete_queue.cursor()` returns a cursor that will include all future delete operations
//   (and possibly some past ones; the client is in charge of checking the opstamps),
// - cloning an existing cursor returns a new cursor at the exact same position, which can
//   then advance independently of the original cursor.
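//
// A minimal usage sketch (assuming `op_1` and `op_2` are `DeleteOperation`s
// with opstamps 1 and 2):
//
//     let queue = DeleteQueue::new();
//     let mut cursor = queue.cursor();    // sees operations pushed from now on
//     queue.push(op_1);
//     queue.push(op_2);
//     assert_eq!(cursor.get().unwrap().opstamp, 1);
//     cursor.advance();
//     let mut cursor2 = cursor.clone();   // same position, advances independently
//     assert_eq!(cursor.get().unwrap().opstamp, 2);
//     assert_eq!(cursor2.get().unwrap().opstamp, 2);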
#[derive(Default)]
struct InnerDeleteQueue {
    writer: Vec<DeleteOperation>,
    last_block: Weak<Block>,
}

#[derive(Clone)]
pub struct DeleteQueue {
    inner: Arc<RwLock<InnerDeleteQueue>>,
}

impl DeleteQueue {
    // Creates a new delete queue.
    pub fn new() -> DeleteQueue {
        DeleteQueue {
            inner: Arc::default(),
        }
    }

    fn get_last_block(&self) -> Arc<Block> {
        {
            // Try to get the last block by simply acquiring the read lock.
            let rlock = self.inner.read().unwrap();
            if let Some(block) = rlock.last_block.upgrade() {
                return block;
            }
        }
        // It failed. Let's double-check after acquiring the write lock, as someone could have
        // called `get_last_block` right after we released the read lock.
        let mut wlock = self.inner.write().unwrap();
        if let Some(block) = wlock.last_block.upgrade() {
            return block;
        }
        let block = Arc::new(Block {
            operations: Arc::new([]),
            next: NextBlock::from(self.clone()),
        });
        wlock.last_block = Arc::downgrade(&block);
        block
    }

    // Creates a new cursor that makes it possible to
    // consume future delete operations.
    //
    // Past delete operations are not accessible.
    pub fn cursor(&self) -> DeleteCursor {
        let last_block = self.get_last_block();
        let operations_len = last_block.operations.len();
        DeleteCursor {
            block: last_block,
            pos: operations_len,
        }
    }

    // Appends a new delete operation.
    pub fn push(&self, delete_operation: DeleteOperation) {
        self.inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer")
            .writer
            .push(delete_operation);
    }

    // The DeleteQueue is a linked list of blocks of
    // delete operations.
    //
    // Writing happens by simply appending to a vec.
    // `.flush()` takes this vec of pending delete operations,
    // creates a new read-only block from it,
    // and appends it to the linked list.
    //
    // `.flush()` happens when, for instance,
    // a consumer reaches the end of the last read-only block.
    // It then asks the delete queue whether there happen to
    // be some unflushed operations.
    //
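    // A rough sketch of the effect of a flush (illustrative only; the exact
    // state shown here is an assumption, not a dump of the real fields):
    //
    //     queue.push(op_a);          // pending writer vec: [op_a]
    //     queue.push(op_b);          // pending writer vec: [op_a, op_b]
    //     queue.flush();             // writer vec drained into a new
    //                                // read-only block [op_a, op_b]
    //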
    fn flush(&self) -> Option<Arc<Block>> {
        let mut self_wlock = self
            .inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer");

        if self_wlock.writer.is_empty() {
            return None;
        }

        let delete_operations = std::mem::take(&mut self_wlock.writer);

        let new_block = Arc::new(Block {
            operations: Arc::from(delete_operations.into_boxed_slice()),
            next: NextBlock::from(self.clone()),
        });

        self_wlock.last_block = Arc::downgrade(&new_block);
        Some(new_block)
    }
}

enum InnerNextBlock {
    Writer(DeleteQueue),
    Closed(Arc<Block>),
}

struct NextBlock(RwLock<InnerNextBlock>);

impl From<DeleteQueue> for NextBlock {
    fn from(delete_queue: DeleteQueue) -> NextBlock {
        NextBlock(RwLock::new(InnerNextBlock::Writer(delete_queue)))
    }
}

impl NextBlock {
    fn next_block(&self) -> Option<Arc<Block>> {
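        // Fast path: if the next block has already been materialized by another
        // consumer, return it while holding only the read lock.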
        {
            let next_read_lock = self
                .0
                .read()
                .expect("Failed to acquire write lock in delete queue");
            if let InnerNextBlock::Closed(ref block) = *next_read_lock {
                return Some(Arc::clone(block));
            }
        }
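        // Slow path: re-check under the write lock (another thread may have closed
        // this block in the meantime), then ask the writer to flush its pending
        // operations into a new block.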
        let next_block;
        {
            let mut next_write_lock = self
                .0
                .write()
                .expect("Failed to acquire write lock in delete queue");
            match *next_write_lock {
                InnerNextBlock::Closed(ref block) => {
                    return Some(Arc::clone(block));
                }
                InnerNextBlock::Writer(ref writer) => match writer.flush() {
                    Some(flushed_next_block) => {
                        next_block = flushed_next_block;
                    }
                    None => {
                        return None;
                    }
                },
            }
            *next_write_lock.deref_mut() = InnerNextBlock::Closed(Arc::clone(&next_block));
            Some(next_block)
        }
    }
}

struct Block {
    operations: Arc<[DeleteOperation]>,
    next: NextBlock,
}

#[derive(Clone)]
pub struct DeleteCursor {
    block: Arc<Block>,
    pos: usize,
}

impl DeleteCursor {
    /// Skips operations and positions the cursor so that
    /// - either all of the delete operations currently in the queue are consumed and the next
    ///   `get` will return `None`,
    /// - or the next `get` will return the first operation with an
    ///   `opstamp >= target_opstamp`.
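    ///
    /// A minimal sketch of the intended use (assuming a `DeleteQueue` named `queue`
    /// and a `DeleteOperation` `op_5` with opstamp 5):
    ///
    /// ```ignore
    /// let mut cursor = queue.cursor();
    /// queue.push(op_5);
    /// cursor.skip_to(5);
    /// assert_eq!(cursor.get().unwrap().opstamp, 5);
    /// ```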
    pub fn skip_to(&mut self, target_opstamp: Opstamp) {
        // TODO: Could be optimized since operations are grouped into blocks.
        while self.is_behind_opstamp(target_opstamp) {
            self.advance();
        }
    }

    #[allow(clippy::wrong_self_convention)]
    fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
        self.get()
            .map(|operation| operation.opstamp < target_opstamp)
            .unwrap_or(false)
    }

    /// If the current block has been entirely
    /// consumed, try to load the next one.
    ///
    /// Returns `true` if, after this attempt,
    /// the cursor is on a block that has not
    /// been entirely consumed.
    /// Returns `false` if we have reached the end of the queue.
    fn load_block_if_required(&mut self) -> bool {
        if self.pos >= self.block.operations.len() {
            // We have consumed this block's operations entirely.
            // Ask the writer whether it has more for us.
            match self.block.next.next_block() {
                Some(block) => {
                    self.block = block;
                    self.pos = 0;
                    true
                }
                None => false,
            }
        } else {
            true
        }
    }

    /// Advance to the next delete operation.
    /// Returns true if and only if there is such an operation.
    pub fn advance(&mut self) -> bool {
        if self.load_block_if_required() {
            self.pos += 1;
            true
        } else {
            false
        }
    }

    /// Get the current delete operation.
    /// Calling `.get` does not advance the cursor.
    pub fn get(&mut self) -> Option<&DeleteOperation> {
        if self.load_block_if_required() {
            Some(&self.block.operations[self.pos])
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests {

    use super::{DeleteOperation, DeleteQueue};
    use crate::query::{Explanation, Scorer, Weight};
    use crate::{DocId, Score, SegmentReader};

    struct DummyWeight;
    impl Weight for DummyWeight {
        fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
            Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
        }

        fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
            Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
        }
    }

    #[test]
    fn test_deletequeue() {
        let delete_queue = DeleteQueue::new();

        let make_op = |i: usize| DeleteOperation {
            opstamp: i as u64,
            target: Box::new(DummyWeight),
        };

        delete_queue.push(make_op(1));
        delete_queue.push(make_op(2));

        let snapshot = delete_queue.cursor();
        {
            let mut operations_it = snapshot.clone();
            assert_eq!(operations_it.get().unwrap().opstamp, 1);
            operations_it.advance();
            assert_eq!(operations_it.get().unwrap().opstamp, 2);
            operations_it.advance();
            assert!(operations_it.get().is_none());
            operations_it.advance();

            let mut snapshot2 = delete_queue.cursor();
            assert!(snapshot2.get().is_none());
            delete_queue.push(make_op(3));
            assert_eq!(snapshot2.get().unwrap().opstamp, 3);
            assert_eq!(operations_it.get().unwrap().opstamp, 3);
            assert_eq!(operations_it.get().unwrap().opstamp, 3);
            operations_it.advance();
            assert!(operations_it.get().is_none());
            operations_it.advance();
        }
        {
            let mut operations_it = snapshot;
            assert_eq!(operations_it.get().unwrap().opstamp, 1);
            operations_it.advance();
            assert_eq!(operations_it.get().unwrap().opstamp, 2);
            operations_it.advance();
            assert_eq!(operations_it.get().unwrap().opstamp, 3);
            operations_it.advance();
            assert!(operations_it.get().is_none());
        }
    }
}