// mrecordlog 0.2.0
//
// Quickwit's shared record log.
// Documentation
use std::borrow::Cow;

use bytes::Buf;

use crate::MultiRecordLog;

/// Collects the payload of every record of `queue`, reading from position 0 onwards.
///
/// # Panics
///
/// Panics if the value returned by `range` cannot be unwrapped, or if the positions
/// yielded are not exactly `0, 1, 2, ...` in iteration order.
fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Vec<Cow<'a, [u8]>> {
    multi_record_log
        .range(queue, 0u64..)
        .unwrap()
        // Pair each record with the position it is expected to carry.
        .zip(0u64..)
        .map(|((pos, payload), expected_pos)| {
            assert_eq!(pos, expected_pos);
            payload
        })
        .collect()
}

/// Opening a log on a freshly created temporary directory must succeed.
#[tokio::test]
async fn test_multi_record_log_new() {
    let temp_dir = tempfile::tempdir().unwrap();
    let open_result = MultiRecordLog::open(temp_dir.path()).await;
    open_result.unwrap();
}

/// A queue must be reported as existing right after it has been created.
#[tokio::test]
async fn test_multi_record_log_create_queue() {
    let temp_dir = tempfile::tempdir().unwrap();
    let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    record_log.create_queue("queue").await.unwrap();
    let queue_found = record_log.queue_exists("queue");
    assert!(queue_found);
}

/// A queue created in one session must still be reported as existing after the log has
/// been dropped and opened again on the same directory.
#[tokio::test]
async fn test_multi_record_log_create_queue_after_reopen() {
    let temp_dir = tempfile::tempdir().unwrap();

    // First session: create the queue, then drop the log before reopening.
    let mut first_session = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    first_session.create_queue("queue").await.unwrap();
    drop(first_session);

    // Second session: only look the queue up.
    let reopened = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    let queue_found = reopened.queue_exists("queue");
    assert!(queue_found);
}

/// Two records appended to a single queue are read back in order, and the log reports
/// file number 0 only.
#[tokio::test]
async fn test_multi_record_log_simple() {
    let temp_dir = tempfile::tempdir().unwrap();
    let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    record_log.create_queue("queue").await.unwrap();

    // The same table drives the appends and the expectation.
    let payloads: [&[u8]; 2] = [b"hello", b"happy"];
    for payload in payloads {
        record_log
            .append_record("queue", None, payload)
            .await
            .unwrap();
    }

    assert_eq!(&read_all_records(&record_log, "queue"), &payloads);
    assert_eq!(&record_log.list_file_numbers(), &[0]);
}

#[tokio::test]
async fn test_multi_record_log_chained() {
    let tempdir = tempfile::tempdir().unwrap();
    {
        let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
        multi_record_log.create_queue("queue").await.unwrap();
        multi_record_log
            .append_record(
                "queue",
                None,
                b"world".chain(&b" "[..]).chain(&b"order"[..]),
            )
            .await
            .unwrap();
        multi_record_log
            .append_record(
                "queue",
                None,
                b"nice"[..].chain(&b" "[..]).chain(&b"day"[..]),
            )
            .await
            .unwrap();
        assert_eq!(
            &read_all_records(&multi_record_log, "queue"),
            &[b"world order".as_slice(), b"nice day".as_slice()]
        );
        assert_eq!(&multi_record_log.list_file_numbers(), &[0]);
    }
}

/// Records appended in one session are read back unchanged after the log has been
/// dropped and opened again on the same directory.
#[tokio::test]
async fn test_multi_record_log_reopen() {
    let temp_dir = tempfile::tempdir().unwrap();
    let payloads: [&[u8]; 2] = [b"hello", b"happy"];

    // Writing session; the log is dropped at the end of this scope.
    {
        let mut writer = MultiRecordLog::open(temp_dir.path()).await.unwrap();
        writer.create_queue("queue").await.unwrap();
        for payload in payloads {
            writer
                .append_record("queue", None, payload)
                .await
                .unwrap();
        }
    }

    // Reading session on the same directory.
    let reader = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    assert_eq!(&read_all_records(&reader, "queue"), &payloads);
    assert_eq!(&reader.list_file_numbers(), &[0]);
}

/// Interleaved appends to two queues are read back per queue in append order, and a
/// further append after reopening lands behind the records written earlier.
#[tokio::test]
async fn test_multi_record_log() {
    let temp_dir = tempfile::tempdir().unwrap();

    // First session: create both queues and interleave the appends.
    let mut first_session = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    first_session.create_queue("queue1").await.unwrap();
    first_session.create_queue("queue2").await.unwrap();
    let appends = [
        ("queue1", &b"hello"[..]),
        ("queue2", &b"maitre"[..]),
        ("queue1", &b"happy"[..]),
        ("queue1", &b"tax"[..]),
        ("queue2", &b"corbeau"[..]),
    ];
    for (queue, payload) in appends {
        first_session
            .append_record(queue, None, payload)
            .await
            .unwrap();
    }
    let expected_queue1 = [&b"hello"[..], &b"happy"[..], &b"tax"[..]];
    let expected_queue2 = [&b"maitre"[..], &b"corbeau"[..]];
    assert_eq!(&read_all_records(&first_session, "queue1"), &expected_queue1);
    assert_eq!(&read_all_records(&first_session, "queue2"), &expected_queue2);
    assert_eq!(&first_session.list_file_numbers(), &[0]);
    drop(first_session);

    // Second session: append once more to the first queue.
    let mut second_session = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    second_session
        .append_record("queue1", None, &b"bubu"[..])
        .await
        .unwrap();
    let expected_after_reopen = [&b"hello"[..], &b"happy"[..], &b"tax"[..], &b"bubu"[..]];
    assert_eq!(
        &read_all_records(&second_session, "queue1"),
        &expected_after_reopen
    );
    assert_eq!(&second_session.list_file_numbers(), &[0]);
}

/// Appends across several open/drop cycles and checks the positions handed out: 0, then
/// 1, and still 2 after `truncate("queue", 1)` was applied in an earlier session.
#[tokio::test]
async fn test_multi_record_position_known_after_truncate() {
    let temp_dir = tempfile::tempdir().unwrap();

    // Session 1: create the queue and append the first record.
    {
        let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
        record_log.create_queue("queue").await.unwrap();
        let first_pos = record_log
            .append_record("queue", None, &b"1"[..])
            .await
            .unwrap();
        assert_eq!(first_pos, Some(0));
    }

    // Session 2: append a second record.
    {
        let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
        let second_pos = record_log
            .append_record("queue", None, &b"2"[..])
            .await
            .unwrap();
        assert_eq!(second_pos, Some(1));
        assert_eq!(&record_log.list_file_numbers(), &[0]);
    }

    // Session 3: truncate at position 1.
    {
        let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
        record_log.truncate("queue", 1).await.unwrap();
        assert_eq!(&record_log.list_file_numbers(), &[0]);
    }

    // Session 4: the next append must be assigned position 2.
    {
        let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
        let third_pos = record_log
            .append_record("queue", None, &b"hello"[..])
            .await
            .unwrap();
        assert_eq!(third_pos, Some(2));
    }
}

#[tokio::test]
async fn test_multi_insert_truncate() {
    let tempdir = tempfile::tempdir().unwrap();
    {
        let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
        multi_record_log.create_queue("queue").await.unwrap();
        assert_eq!(
            multi_record_log
                .append_records(
                    "queue",
                    None,
                    [b"1", b"2", b"3", b"4"].into_iter().map(|r| r.as_slice())
                )
                .await
                .unwrap(),
            Some(3)
        );
        assert_eq!(
            &read_all_records(&multi_record_log, "queue"),
            &[
                b"1".as_slice(),
                b"2".as_slice(),
                b"3".as_slice(),
                b"4".as_slice()
            ]
        );

        multi_record_log.truncate("queue", 0).await.unwrap();
        assert_eq!(
            &multi_record_log
                .range("queue", ..)
                .unwrap()
                .map(|(_, payload)| payload)
                .collect::<Vec<_>>(),
            &[b"2".as_slice(), b"3".as_slice(), b"4".as_slice()]
        )
    }
    {
        let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
        multi_record_log.truncate("queue", 1).await.unwrap();

        assert_eq!(
            &multi_record_log
                .range("queue", ..)
                .unwrap()
                .map(|(_, payload)| payload)
                .collect::<Vec<_>>(),
            &[b"3".as_slice(), b"4".as_slice()]
        )
    }
    {
        let multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
        assert_eq!(
            &multi_record_log
                .range("queue", ..)
                .unwrap()
                .map(|(_, payload)| payload)
                .collect::<Vec<_>>(),
            &[b"3".as_slice(), b"4".as_slice()]
        )
    }
}

/// After truncating at position 1 and appending a third record, every range form used
/// here (unbounded, `2..`, and "excluded 1 .. unbounded") yields exactly `(2, b"3")`.
#[tokio::test]
async fn test_truncate_range_correct_pos() {
    use std::ops::Bound::{Excluded, Unbounded};

    let temp_dir = tempfile::tempdir().unwrap();
    let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();
    record_log.create_queue("queue").await.unwrap();

    let first_pos = record_log
        .append_record("queue", None, &b"1"[..])
        .await
        .unwrap();
    assert_eq!(first_pos, Some(0));

    let second_pos = record_log
        .append_record("queue", None, &b"2"[..])
        .await
        .unwrap();
    assert_eq!(second_pos, Some(1));

    record_log.truncate("queue", 1).await.unwrap();

    let third_pos = record_log
        .append_record("queue", None, &b"3"[..])
        .await
        .unwrap();
    assert_eq!(third_pos, Some(2));

    // The single record every query below is expected to return.
    let expected = [(2, Cow::Borrowed(&b"3"[..]))];

    assert_eq!(
        record_log.range("queue", ..).unwrap().collect::<Vec<_>>(),
        &expected
    );
    assert_eq!(
        record_log.range("queue", 2..).unwrap().collect::<Vec<_>>(),
        &expected
    );
    assert_eq!(
        record_log
            .range("queue", (Excluded(1), Unbounded))
            .unwrap()
            .collect::<Vec<_>>(),
        &expected
    );
}

/// Tracks `in_memory_size` through the life of a queue: zero on a fresh log, positive
/// after creating a queue, larger after an append, and smaller again after a truncate.
#[tokio::test]
async fn test_multi_record_size() {
    let temp_dir = tempfile::tempdir().unwrap();
    let mut record_log = MultiRecordLog::open(temp_dir.path()).await.unwrap();

    let size_when_empty = record_log.in_memory_size();
    assert_eq!(size_when_empty, 0);

    record_log.create_queue("queue").await.unwrap();
    let size_after_create = record_log.in_memory_size();
    assert!(size_after_create > 0);

    record_log
        .append_record("queue", None, &b"hello"[..])
        .await
        .unwrap();
    let size_after_append = record_log.in_memory_size();
    assert!(size_after_create < size_after_append);

    record_log.truncate("queue", 0).await.unwrap();
    let size_after_truncate = record_log.in_memory_size();
    assert!(size_after_append > size_after_truncate);
}