1use std::collections::BTreeMap;
2use std::io;
3
4use crate::chunk::closed_chunk::ClosedChunk;
5use crate::raft_log::log_data::LogData;
6use crate::raft_log::state_machine::raft_log_state::RaftLogState;
7use crate::ChunkId;
8use crate::Types;
9use crate::WALRecord;
10
11pub struct DumpRaftLog<T: Types> {
16 pub(crate) state: RaftLogState<T>,
17
18 pub(crate) logs: Vec<LogData<T>>,
19 pub(crate) cache: BTreeMap<T::LogId, T::LogPayload>,
20 pub(crate) chunks: BTreeMap<ChunkId, ClosedChunk<T>>,
21
22 pub(crate) cache_hit: usize,
23 pub(crate) cache_miss: usize,
24}
25
26impl<T: Types> DumpRaftLog<T> {
27 pub fn state(&self) -> &RaftLogState<T> {
29 &self.state
30 }
31
32 pub fn iter(&mut self) -> DumpRaftLogIter<T> {
38 DumpRaftLogIter { i: 0, data: self }
39 }
40}
41
42pub struct DumpRaftLogIter<'a, T: Types> {
52 i: usize,
53 data: &'a mut DumpRaftLog<T>,
54}
55
56impl<T: Types> DumpRaftLogIter<'_, T> {
57 fn read_log_payload(
64 &self,
65 data: &LogData<T>,
66 ) -> Result<T::LogPayload, io::Error> {
67 let chunk_id = data.chunk_id;
68 let segment = data.record_segment;
69 let closed = self.data.chunks.get(&chunk_id).ok_or_else(|| {
70 io::Error::new(
71 io::ErrorKind::NotFound,
72 format!(
73 "Chunk not found: {}; when:(DumpRaftLogIter open cache-miss read)",
74 chunk_id
75 ),
76 )
77 })?;
78
79 let record = closed.chunk.read_record(segment)?;
80
81 if let WALRecord::Append(log_id, payload) = record {
82 debug_assert_eq!(log_id, data.log_id);
83 Ok(payload)
84 } else {
85 panic!("Expect Record::Append but: {:?}", record);
86 }
87 }
88}
89
90impl<T: Types> Iterator for DumpRaftLogIter<'_, T> {
91 type Item = Result<(T::LogId, T::LogPayload), io::Error>;
92
93 fn next(&mut self) -> Option<Self::Item> {
94 if self.i >= self.data.logs.len() {
95 return None;
96 }
97
98 let data = &self.data.logs[self.i];
99 self.i += 1;
100
101 let log_id = data.log_id.clone();
102 let payload = self.data.cache.get(&log_id).cloned();
103
104 if let Some(payload) = payload {
105 self.data.cache_hit += 1;
106 Some(Ok((log_id, payload)))
107 } else {
108 self.data.cache_miss += 1;
109 Some(self.read_log_payload(data).map(|payload| (log_id, payload)))
110 }
111 }
112}
113
#[cfg(test)]
mod tests {
    use std::io;

    use indoc::indoc;

    use crate::api::raft_log_writer::blocking_flush;
    use crate::api::raft_log_writer::RaftLogWriter;
    use crate::raft_log::state_machine::raft_log_state::RaftLogState;
    use crate::testing::ss;
    use crate::testing::TestTypes;
    use crate::tests::context::TestContext;
    use crate::RaftLog;

    /// Dumps a log built by `build_sample_data` and checks the reported state,
    /// the yielded entries, and the cache hit/miss counters.
    #[test]
    fn test_dump_data() -> Result<(), io::Error> {
        let mut ctx = TestContext::new()?;
        let config = &mut ctx.config;

        config.chunk_max_records = Some(5);
        config.log_cache_max_items = Some(3);

        let mut rl = ctx.new_raft_log()?;

        build_sample_data(&mut rl)?;

        let mut data = rl.dump_data();
        assert_eq!(data.state(), &RaftLogState {
            vote: None,
            last: Some((2, 7)),
            committed: Some((1, 2)),
            purged: Some((1, 1)),
            user_data: None,
        });

        // Propagate read errors with `?` instead of letting the first `Err`
        // silently end the loop and surface as a confusing length mismatch.
        let mut actual = vec![];
        for entry in data.iter() {
            let (log_id, payload) = entry?;
            actual.push(format!("{:?}: {}", log_id, payload));
        }

        let want = vec![
            ss("(2, 2): world"),
            ss("(2, 3): foo"),
            ss("(2, 4): world"),
            ss("(2, 5): foo"),
            ss("(2, 6): bar"),
            ss("(2, 7): wow"),
        ];

        assert_eq!(actual, want);

        assert_eq!(data.cache_hit, 4);
        assert_eq!(data.cache_miss, 2);

        Ok(())
    }

    /// Fills `rl` with a fixed sequence of append / truncate / commit / purge
    /// operations, flushing twice along the way.
    ///
    /// Requires `chunk_max_records == Some(5)`. Returns a textual listing of
    /// chunks and records (presumably the expected layout after these
    /// operations); `test_dump_data` does not inspect the returned text.
    fn build_sample_data(rl: &mut RaftLog<TestTypes>) -> Result<String, io::Error> {
        assert_eq!(rl.config.chunk_max_records, Some(5));

        let logs = [
            ((1, 0), ss("hi")),
            ((1, 1), ss("hello")),
            ((1, 2), ss("world")),
            ((1, 3), ss("foo")),
        ];
        rl.append(logs)?;

        rl.truncate(2)?;

        let logs = [
            ((2, 2), ss("world")),
            ((2, 3), ss("foo")),
        ];
        rl.append(logs)?;

        rl.commit((1, 2))?;
        rl.purge((1, 1))?;
        blocking_flush(rl)?;

        let logs = [
            ((2, 4), ss("world")),
            ((2, 5), ss("foo")),
            ((2, 6), ss("bar")),
            ((2, 7), ss("wow")),
        ];
        rl.append(logs)?;

        blocking_flush(rl)?;

        let dumped = indoc! {r#"
            RaftLog:
            ChunkId(00_000_000_000_000_000_000)
            R-00000: [000_000_000, 000_000_018) 18: State(RaftLogState { vote: None, last: None, committed: None, purged: None, user_data: None })
            R-00001: [000_000_018, 000_000_052) 34: Append((1, 0), "hi")
            R-00002: [000_000_052, 000_000_089) 37: Append((1, 1), "hello")
            R-00003: [000_000_089, 000_000_126) 37: Append((1, 2), "world")
            R-00004: [000_000_126, 000_000_161) 35: Append((1, 3), "foo")
            ChunkId(00_000_000_000_000_000_161)
            R-00000: [000_000_000, 000_000_034) 34: State(RaftLogState { vote: None, last: Some((1, 3)), committed: None, purged: None, user_data: None })
            R-00001: [000_000_034, 000_000_063) 29: TruncateAfter(Some((1, 1)))
            R-00002: [000_000_063, 000_000_100) 37: Append((2, 2), "world")
            R-00003: [000_000_100, 000_000_135) 35: Append((2, 3), "foo")
            R-00004: [000_000_135, 000_000_163) 28: Commit((1, 2))
            ChunkId(00_000_000_000_000_000_324)
            R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
            R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1))
            R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world")
            R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo")
            R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar")
            ChunkId(00_000_000_000_000_000_509)
            R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
            R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow")
        "#};
        Ok(dumped.to_string())
    }
}