// chunked-wal 0.2.0
//
// Chunked write-ahead log implementation
// Documentation
use std::collections::BTreeMap;
use std::io;

use crate::ChunkId;
use crate::WALRecord;
use crate::WalTypes;
use crate::chunk::closed_chunk::ClosedChunk;
use crate::types::Segment;

/// Holds a set of closed chunks, keyed by [`ChunkId`], so that records
/// stored in them can be read back by chunk id and segment.
#[derive(Debug, Clone)]
pub struct ClosedChunkReader<W: WalTypes> {
    /// The closed chunks this reader can serve, indexed by chunk id.
    chunks: BTreeMap<ChunkId, ClosedChunk<W>>,
}

impl<W> ClosedChunkReader<W>
where W: WalTypes
{
    pub(crate) fn new(chunks: BTreeMap<ChunkId, ClosedChunk<W>>) -> Self {
        Self { chunks }
    }

    pub fn read_record(
        &self,
        chunk_id: ChunkId,
        segment: Segment,
    ) -> Result<WALRecord<W>, io::Error> {
        let closed = self.chunks.get(&chunk_id).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("Chunk not found: {}", chunk_id),
            )
        })?;

        closed.chunk.read_record(segment)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::io;
    use std::os::unix::fs::FileExt;
    use std::sync::Arc;
    use std::sync::mpsc::SyncSender;

    use codeq::Decode;
    use codeq::Encode;

    use crate::Chunk;
    use crate::ChunkId;
    use crate::Config;
    use crate::WALRecord;
    use crate::WalTypes;
    use crate::chunk::closed_chunk::ClosedChunk;
    use crate::chunk::open_chunk::OpenChunk;
    use crate::types::Segment;
    use crate::wal::closed_chunk_reader::ClosedChunkReader;

    /// Type tag that `TestAction` writes ahead of its payload when encoding
    /// and checks for when decoding.
    const TEST_ACTION_TYPE: u32 = 1;

    /// Minimal action payload used by these tests: a newtype around a
    /// `String`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TestAction(String);

    impl Encode for TestAction {
        /// Writes the type tag followed by the wrapped string and returns
        /// the sum of the byte counts reported by the two nested encodes.
        fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
            let tag_len = TEST_ACTION_TYPE.encode(&mut w)?;
            let payload_len = self.0.encode(&mut w)?;
            Ok(tag_len + payload_len)
        }

        /// Reports the same tag that `encode` writes first.
        fn type_id(&self) -> Option<u32> {
            Some(TEST_ACTION_TYPE)
        }
    }

    impl Decode for TestAction {
        /// Reads a `u32` type tag and, if it matches `TEST_ACTION_TYPE`,
        /// the string payload that follows it.
        ///
        /// A mismatching tag yields an `InvalidData` error and the payload
        /// is not read.
        fn decode<R: io::Read>(mut r: R) -> Result<Self, io::Error> {
            let tag = u32::decode(&mut r)?;

            if tag == TEST_ACTION_TYPE {
                let payload = String::decode(&mut r)?;
                Ok(Self(payload))
            } else {
                Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("unexpected action type id {}", tag),
                ))
            }
        }
    }

    /// Unit marker type that carries the `WalTypes` associated types used
    /// throughout these tests.
    #[derive(Debug, Default, Clone, PartialEq, Eq)]
    struct TestWal;

    // Binds the WAL's associated types to simple test-friendly types.
    impl WalTypes for TestWal {
        // Actions are the tagged string newtype defined above.
        type Action = TestAction;
        // Checkpoints are plain strings.
        type Checkpoint = String;
        // Callbacks are sync-channel senders carrying an I/O result.
        type Callback = SyncSender<Result<(), io::Error>>;
    }

    /// Shorthand for a `WALRecord::Action` wrapping a `TestAction` that owns
    /// a copy of `v`.
    fn action(v: &str) -> WALRecord<TestWal> {
        let payload = TestAction(String::from(v));
        WALRecord::Action(payload)
    }

    /// Test fixture: writes a chunk with id 0 into a fresh temporary
    /// directory, reopens it, and wraps it in a `ClosedChunkReader`.
    ///
    /// Returns the reader together with the segment reported for record
    /// index 1 of the reopened chunk.
    fn build_reader() -> Result<(ClosedChunkReader<TestWal>, Segment), io::Error>
    {
        // NOTE(review): `tmp` is dropped when this function returns, while
        // the reader is handed to the caller — presumably reads keep working
        // through the chunk's already-open file handle; confirm.
        let tmp = tempfile::tempdir()?;
        let config = Arc::new(Config::new(tmp.path().to_str().unwrap()));
        let chunk_id = ChunkId(0);

        // Create the chunk with an empty checkpoint as its initial record,
        // then append a single "val" action.
        let mut writer = OpenChunk::<WALRecord<TestWal>>::create(
            Arc::clone(&config),
            chunk_id,
            WALRecord::Checkpoint(String::new()),
        )?;
        writer.append_record(&action("val"))?;

        // Write the pending bytes by hand at the current end of the file.
        let pending = writer.take_pending_data();
        let end_of_file = writer.chunk.f.metadata()?.len();
        writer.chunk.f.write_all_at(&pending, end_of_file)?;

        // Reopen from disk and check that both records are read back.
        let (chunk, records) = Chunk::<WALRecord<TestWal>>::open_with_truncate(
            config, chunk_id, true,
        )?;
        let expected = vec![WALRecord::Checkpoint(String::new()), action("val")];
        assert_eq!(expected, records);

        // Record index 1 is the "val" action in the list asserted above.
        let segment = chunk.record_segment(1);

        let closed = ClosedChunk::new(chunk, Arc::new("val".to_string()));
        let mut chunks = BTreeMap::new();
        chunks.insert(chunk_id, closed);

        Ok((ClosedChunkReader::new(chunks), segment))
    }

    /// Reading the fixture's segment from chunk 0 yields the "val" action.
    #[test]
    fn test_read_record() -> Result<(), io::Error> {
        let (reader, segment) = build_reader()?;

        let got = reader.read_record(ChunkId(0), segment)?;
        assert_eq!(action("val"), got);

        Ok(())
    }

    /// Asking for a chunk id the reader does not hold fails with `NotFound`
    /// and an error message that names the requested id.
    #[test]
    fn test_read_record_returns_not_found_for_missing_chunk()
    -> Result<(), io::Error> {
        let (reader, segment) = build_reader()?;

        // The fixture only registers chunk 0, so chunk 1 is absent.
        let missing = ChunkId(1);
        let err = reader.read_record(missing, segment).unwrap_err();

        assert_eq!(io::ErrorKind::NotFound, err.kind());

        let message = err.to_string();
        assert_eq!(
            "Chunk not found: ChunkId(00_000_000_000_000_000_001)",
            message
        );

        Ok(())
    }
}