chunked_wal/wal/
closed_chunk_reader.rs1use std::collections::BTreeMap;
2use std::io;
3
4use crate::ChunkId;
5use crate::WALRecord;
6use crate::WalTypes;
7use crate::chunk::closed_chunk::ClosedChunk;
8use crate::types::Segment;
9
10#[derive(Debug, Clone)]
11pub struct ClosedChunkReader<W>
12where W: WalTypes
13{
14 chunks: BTreeMap<ChunkId, ClosedChunk<W>>,
15}
16
17impl<W> ClosedChunkReader<W>
18where W: WalTypes
19{
20 pub(crate) fn new(chunks: BTreeMap<ChunkId, ClosedChunk<W>>) -> Self {
21 Self { chunks }
22 }
23
24 pub fn read_record(
25 &self,
26 chunk_id: ChunkId,
27 segment: Segment,
28 ) -> Result<WALRecord<W>, io::Error> {
29 let closed = self.chunks.get(&chunk_id).ok_or_else(|| {
30 io::Error::new(
31 io::ErrorKind::NotFound,
32 format!("Chunk not found: {}", chunk_id),
33 )
34 })?;
35
36 closed.chunk.read_record(segment)
37 }
38}
39
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::io;
    use std::os::unix::fs::FileExt;
    use std::sync::Arc;
    use std::sync::mpsc::SyncSender;

    use codeq::Decode;
    use codeq::Encode;

    use crate::Chunk;
    use crate::ChunkId;
    use crate::Config;
    use crate::WALRecord;
    use crate::WalTypes;
    use crate::chunk::closed_chunk::ClosedChunk;
    use crate::chunk::open_chunk::OpenChunk;
    use crate::types::Segment;
    use crate::wal::closed_chunk_reader::ClosedChunkReader;

    /// Type id encoded in front of every [`TestAction`] payload.
    const TEST_ACTION_TYPE: u32 = 1;

    /// Minimal action used by these tests: a single string payload.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TestAction(String);

    impl Encode for TestAction {
        /// Encodes the type id followed by the string payload and returns the
        /// sum of the two `encode` results.
        fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
            let n_type = TEST_ACTION_TYPE.encode(&mut w)?;
            let n_body = self.0.encode(&mut w)?;
            Ok(n_type + n_body)
        }

        fn type_id(&self) -> Option<u32> {
            Some(TEST_ACTION_TYPE)
        }
    }

    impl Decode for TestAction {
        /// Decodes a type id and, if it is [`TEST_ACTION_TYPE`], the string
        /// payload that follows; any other id is rejected as `InvalidData`.
        fn decode<R: io::Read>(mut r: R) -> Result<Self, io::Error> {
            match u32::decode(&mut r)? {
                TEST_ACTION_TYPE => Ok(TestAction(String::decode(&mut r)?)),
                type_id => Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("unexpected action type id {}", type_id),
                )),
            }
        }
    }

    /// `WalTypes` implementation wiring the test action into the WAL.
    #[derive(Debug, Default, Clone, PartialEq, Eq)]
    struct TestWal;

    impl WalTypes for TestWal {
        type Action = TestAction;
        type Checkpoint = String;
        type Callback = SyncSender<Result<(), io::Error>>;
    }

    /// Builds an action record carrying `v`.
    fn action(v: &str) -> WALRecord<TestWal> {
        WALRecord::Action(TestAction(String::from(v)))
    }

    /// Builds a reader holding a single chunk (id 0) that contains an empty
    /// checkpoint followed by `action("val")`, and returns it together with
    /// the segment of the record at index 1.
    fn build_reader() -> Result<(ClosedChunkReader<TestWal>, Segment), io::Error> {
        let chunk_id = ChunkId(0);

        let td = tempfile::tempdir()?;
        let config = Arc::new(Config::new(td.path().to_str().unwrap()));

        let mut open = OpenChunk::<WALRecord<TestWal>>::create(
            config.clone(),
            chunk_id,
            WALRecord::Checkpoint(String::new()),
        )?;
        open.append_record(&action("val"))?;

        // Write the pending bytes directly at the current end of the chunk
        // file so they are on disk before the chunk is reopened below.
        let data = open.take_pending_data();
        let offset = open.chunk.f.metadata()?.len();
        open.chunk.f.write_all_at(&data, offset)?;

        // Reopen the chunk from disk and check it holds the expected records.
        let (chunk, records) =
            Chunk::<WALRecord<TestWal>>::open_with_truncate(config, chunk_id, true)?;
        assert_eq!(
            vec![WALRecord::Checkpoint(String::new()), action("val")],
            records
        );

        let segment = chunk.record_segment(1);

        let mut chunks = BTreeMap::new();
        chunks.insert(
            chunk_id,
            ClosedChunk::new(chunk, Arc::new("val".to_string())),
        );

        Ok((ClosedChunkReader::new(chunks), segment))
    }

    #[test]
    fn test_read_record() -> Result<(), io::Error> {
        let (reader, segment) = build_reader()?;

        let got = reader.read_record(ChunkId(0), segment)?;
        assert_eq!(action("val"), got);

        Ok(())
    }

    #[test]
    fn test_read_record_returns_not_found_for_missing_chunk() -> Result<(), io::Error> {
        let (reader, segment) = build_reader()?;

        // Only chunk 0 exists, so looking up chunk 1 must fail.
        let result = reader.read_record(ChunkId(1), segment);
        let err = result.unwrap_err();

        assert_eq!(io::ErrorKind::NotFound, err.kind());
        assert_eq!(
            "Chunk not found: ChunkId(00_000_000_000_000_000_001)",
            err.to_string()
        );

        Ok(())
    }
}