raft_log/raft_log/
dump_raft_log.rs

1use std::collections::BTreeMap;
2use std::io;
3
4use crate::chunk::closed_chunk::ClosedChunk;
5use crate::raft_log::log_data::LogData;
6use crate::raft_log::state_machine::raft_log_state::RaftLogState;
7use crate::ChunkId;
8use crate::Types;
9use crate::WALRecord;
10
11/// A struct that contains a snapshot of RaftLog data for inspection or
12/// debugging.
13///
14/// It includes the log state, log entries, cache entries and closed chunks.
15pub struct DumpRaftLog<T: Types> {
16    pub(crate) state: RaftLogState<T>,
17
18    pub(crate) logs: Vec<LogData<T>>,
19    pub(crate) cache: BTreeMap<T::LogId, T::LogPayload>,
20    pub(crate) chunks: BTreeMap<ChunkId, ClosedChunk<T>>,
21
22    pub(crate) cache_hit: usize,
23    pub(crate) cache_miss: usize,
24}
25
26impl<T: Types> DumpRaftLog<T> {
27    /// Returns a reference to the RaftLog state machine state
28    pub fn state(&self) -> &RaftLogState<T> {
29        &self.state
30    }
31
32    /// Returns an iterator that yields log entries in order
33    ///
34    /// The iterator yields Result<(log_id, payload), io::Error> pairs. The
35    /// payload is retrieved either from cache or by reading from the
36    /// underlying chunk storage.
37    pub fn iter(&mut self) -> DumpRaftLogIter<T> {
38        DumpRaftLogIter { i: 0, data: self }
39    }
40}
41
42/// An iterator over log entries in a DumpData
43///
44/// It yields Result<(log_id, payload), io::Error> pairs. The payload is
45/// retrieved either from cache or by reading from the underlying chunk storage.
46///
47/// # Errors
48/// The iterator may return io::Error if:
49/// - The chunk containing a log entry is not found
50/// - There is an error reading a record from storage
51pub struct DumpRaftLogIter<'a, T: Types> {
52    i: usize,
53    data: &'a mut DumpRaftLog<T>,
54}
55
56impl<T: Types> DumpRaftLogIter<'_, T> {
57    /// Reads a log payload from the chunk storage
58    ///
59    /// # Errors
60    /// Returns io::Error if:
61    /// - The chunk is not found
62    /// - There is an error reading the record
63    fn read_log_payload(
64        &self,
65        data: &LogData<T>,
66    ) -> Result<T::LogPayload, io::Error> {
67        let chunk_id = data.chunk_id;
68        let segment = data.record_segment;
69        let closed = self.data.chunks.get(&chunk_id).ok_or_else(|| {
70            io::Error::new(
71                io::ErrorKind::NotFound,
72                format!(
73                    "Chunk not found: {}; when:(DumpRaftLogIter open cache-miss read)",
74                    chunk_id
75                ),
76            )
77        })?;
78
79        let record = closed.chunk.read_record(segment)?;
80
81        if let WALRecord::Append(log_id, payload) = record {
82            debug_assert_eq!(log_id, data.log_id);
83            Ok(payload)
84        } else {
85            panic!("Expect Record::Append but: {:?}", record);
86        }
87    }
88}
89
90impl<T: Types> Iterator for DumpRaftLogIter<'_, T> {
91    type Item = Result<(T::LogId, T::LogPayload), io::Error>;
92
93    fn next(&mut self) -> Option<Self::Item> {
94        if self.i >= self.data.logs.len() {
95            return None;
96        }
97
98        let data = &self.data.logs[self.i];
99        self.i += 1;
100
101        let log_id = data.log_id.clone();
102        let payload = self.data.cache.get(&log_id).cloned();
103
104        if let Some(payload) = payload {
105            self.data.cache_hit += 1;
106            Some(Ok((log_id, payload)))
107        } else {
108            self.data.cache_miss += 1;
109            Some(self.read_log_payload(data).map(|payload| (log_id, payload)))
110        }
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use std::io;
117
118    use indoc::indoc;
119
120    use crate::api::raft_log_writer::blocking_flush;
121    use crate::api::raft_log_writer::RaftLogWriter;
122    use crate::raft_log::state_machine::raft_log_state::RaftLogState;
123    use crate::testing::ss;
124    use crate::testing::TestTypes;
125    use crate::tests::context::TestContext;
126    use crate::RaftLog;
127
128    #[test]
129    fn test_dump_data() -> Result<(), io::Error> {
130        let mut ctx = TestContext::new()?;
131        let config = &mut ctx.config;
132
133        config.chunk_max_records = Some(5);
134        config.log_cache_max_items = Some(3);
135
136        let mut rl = ctx.new_raft_log()?;
137
138        build_sample_data(&mut rl)?;
139
140        let mut data = rl.dump_data();
141        assert_eq!(data.state(), &RaftLogState {
142            vote: None,
143            last: Some((2, 7)),
144            committed: Some((1, 2)),
145            purged: Some((1, 1)),
146            user_data: None,
147        });
148
149        let mut iter = data.iter();
150
151        let mut actual = vec![];
152        while let Some(Ok((log_id, payload))) = iter.next() {
153            actual.push(format!("{:?}: {}", log_id, payload));
154        }
155
156        let want = vec![
157            ss("(2, 2): world"),
158            ss("(2, 3): foo"),
159            ss("(2, 4): world"),
160            ss("(2, 5): foo"),
161            ss("(2, 6): bar"),
162            ss("(2, 7): wow"),
163        ];
164
165        assert_eq!(actual, want);
166
167        assert_eq!(data.cache_hit, 4);
168        assert_eq!(data.cache_miss, 2);
169
170        Ok(())
171    }
172
173    fn build_sample_data(
174        rl: &mut RaftLog<TestTypes>,
175    ) -> Result<String, io::Error> {
176        assert_eq!(rl.config.chunk_max_records, Some(5));
177
178        let logs = [
179            //
180            ((1, 0), ss("hi")),
181            ((1, 1), ss("hello")),
182            ((1, 2), ss("world")),
183            ((1, 3), ss("foo")),
184        ];
185        rl.append(logs)?;
186
187        rl.truncate(2)?;
188
189        let logs = [
190            //
191            ((2, 2), ss("world")),
192            ((2, 3), ss("foo")),
193        ];
194        rl.append(logs)?;
195
196        rl.commit((1, 2))?;
197        rl.purge((1, 1))?;
198        blocking_flush(rl)?;
199
200        let logs = [
201            //
202            ((2, 4), ss("world")),
203            ((2, 5), ss("foo")),
204            ((2, 6), ss("bar")),
205            ((2, 7), ss("wow")),
206        ];
207        rl.append(logs)?;
208
209        blocking_flush(rl)?;
210
211        let dumped = indoc! {r#"
212        RaftLog:
213        ChunkId(00_000_000_000_000_000_000)
214          R-00000: [000_000_000, 000_000_018) 18: State(RaftLogState { vote: None, last: None, committed: None, purged: None, user_data: None })
215          R-00001: [000_000_018, 000_000_052) 34: Append((1, 0), "hi")
216          R-00002: [000_000_052, 000_000_089) 37: Append((1, 1), "hello")
217          R-00003: [000_000_089, 000_000_126) 37: Append((1, 2), "world")
218          R-00004: [000_000_126, 000_000_161) 35: Append((1, 3), "foo")
219        ChunkId(00_000_000_000_000_000_161)
220          R-00000: [000_000_000, 000_000_034) 34: State(RaftLogState { vote: None, last: Some((1, 3)), committed: None, purged: None, user_data: None })
221          R-00001: [000_000_034, 000_000_063) 29: TruncateAfter(Some((1, 1)))
222          R-00002: [000_000_063, 000_000_100) 37: Append((2, 2), "world")
223          R-00003: [000_000_100, 000_000_135) 35: Append((2, 3), "foo")
224          R-00004: [000_000_135, 000_000_163) 28: Commit((1, 2))
225        ChunkId(00_000_000_000_000_000_324)
226          R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
227          R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1))
228          R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world")
229          R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo")
230          R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar")
231        ChunkId(00_000_000_000_000_000_509)
232          R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
233          R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow")
234        "#};
235        Ok(dumped.to_string())
236    }
237}