Skip to main content

chunked_wal/wal/
closed_chunk_reader.rs

1use std::collections::BTreeMap;
2use std::io;
3
4use crate::ChunkId;
5use crate::WALRecord;
6use crate::WalTypes;
7use crate::chunk::closed_chunk::ClosedChunk;
8use crate::types::Segment;
9
10#[derive(Debug, Clone)]
11pub struct ClosedChunkReader<W>
12where W: WalTypes
13{
14    chunks: BTreeMap<ChunkId, ClosedChunk<W>>,
15}
16
17impl<W> ClosedChunkReader<W>
18where W: WalTypes
19{
20    pub(crate) fn new(chunks: BTreeMap<ChunkId, ClosedChunk<W>>) -> Self {
21        Self { chunks }
22    }
23
24    pub fn read_record(
25        &self,
26        chunk_id: ChunkId,
27        segment: Segment,
28    ) -> Result<WALRecord<W>, io::Error> {
29        let closed = self.chunks.get(&chunk_id).ok_or_else(|| {
30            io::Error::new(
31                io::ErrorKind::NotFound,
32                format!("Chunk not found: {}", chunk_id),
33            )
34        })?;
35
36        closed.chunk.read_record(segment)
37    }
38}
39
40#[cfg(test)]
41mod tests {
42    use std::collections::BTreeMap;
43    use std::io;
44    use std::os::unix::fs::FileExt;
45    use std::sync::Arc;
46    use std::sync::mpsc::SyncSender;
47
48    use codeq::Decode;
49    use codeq::Encode;
50
51    use crate::Chunk;
52    use crate::ChunkId;
53    use crate::Config;
54    use crate::WALRecord;
55    use crate::WalTypes;
56    use crate::chunk::closed_chunk::ClosedChunk;
57    use crate::chunk::open_chunk::OpenChunk;
58    use crate::types::Segment;
59    use crate::wal::closed_chunk_reader::ClosedChunkReader;
60
61    const TEST_ACTION_TYPE: u32 = 1;
62
63    #[derive(Debug, Clone, PartialEq, Eq)]
64    struct TestAction(String);
65
66    impl Encode for TestAction {
67        fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
68            let mut n = TEST_ACTION_TYPE.encode(&mut w)?;
69            n += self.0.encode(&mut w)?;
70            Ok(n)
71        }
72
73        fn type_id(&self) -> Option<u32> {
74            Some(TEST_ACTION_TYPE)
75        }
76    }
77
78    impl Decode for TestAction {
79        fn decode<R: io::Read>(mut r: R) -> Result<Self, io::Error> {
80            let type_id = u32::decode(&mut r)?;
81            if type_id != TEST_ACTION_TYPE {
82                return Err(io::Error::new(
83                    io::ErrorKind::InvalidData,
84                    format!("unexpected action type id {}", type_id),
85                ));
86            }
87
88            Ok(Self(String::decode(&mut r)?))
89        }
90    }
91
92    #[derive(Debug, Default, Clone, PartialEq, Eq)]
93    struct TestWal;
94
95    impl WalTypes for TestWal {
96        type Action = TestAction;
97        type Checkpoint = String;
98        type Callback = SyncSender<Result<(), io::Error>>;
99    }
100
101    fn action(v: &str) -> WALRecord<TestWal> {
102        WALRecord::Action(TestAction(v.to_string()))
103    }
104
105    fn build_reader() -> Result<(ClosedChunkReader<TestWal>, Segment), io::Error>
106    {
107        let td = tempfile::tempdir()?;
108        let config = Config::new(td.path().to_str().unwrap());
109        let config = Arc::new(config);
110        let chunk_id = ChunkId(0);
111
112        let mut open = OpenChunk::<WALRecord<TestWal>>::create(
113            config.clone(),
114            chunk_id,
115            WALRecord::Checkpoint(String::new()),
116        )?;
117        open.append_record(&action("val"))?;
118        let data = open.take_pending_data();
119        let offset = open.chunk.f.metadata()?.len();
120        open.chunk.f.write_all_at(&data, offset)?;
121
122        let (chunk, records) = Chunk::<WALRecord<TestWal>>::open_with_truncate(
123            config, chunk_id, true,
124        )?;
125        assert_eq!(
126            vec![WALRecord::Checkpoint(String::new()), action("val"),],
127            records
128        );
129
130        let segment = chunk.record_segment(1);
131
132        let chunks = BTreeMap::from([(
133            chunk_id,
134            ClosedChunk::new(chunk, Arc::new("val".to_string())),
135        )]);
136
137        Ok((ClosedChunkReader::new(chunks), segment))
138    }
139
140    #[test]
141    fn test_read_record() -> Result<(), io::Error> {
142        let (reader, segment) = build_reader()?;
143
144        assert_eq!(action("val"), reader.read_record(ChunkId(0), segment)?);
145
146        Ok(())
147    }
148
149    #[test]
150    fn test_read_record_returns_not_found_for_missing_chunk()
151    -> Result<(), io::Error> {
152        let (reader, segment) = build_reader()?;
153
154        let err = reader.read_record(ChunkId(1), segment).unwrap_err();
155
156        assert_eq!(io::ErrorKind::NotFound, err.kind());
157        assert_eq!(
158            "Chunk not found: ChunkId(00_000_000_000_000_000_001)",
159            err.to_string()
160        );
161
162        Ok(())
163    }
164}