use std::collections::BTreeMap;
use std::io;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use codeq::error_context_ext::ErrorContextExt;
use codeq::OffsetSize;
use log::info;

use crate::api::raft_log_writer::RaftLogWriter;
use crate::api::state_machine::StateMachine;
use crate::api::wal::WAL;
use crate::chunk::closed_chunk::ClosedChunk;
use crate::chunk::open_chunk::OpenChunk;
use crate::chunk::Chunk;
use crate::errors::LogIndexNotFound;
use crate::errors::RaftLogStateError;
use crate::file_lock::FileLock;
use crate::num::format_pad_u64;
use crate::raft_log::access_state::AccessStat;
use crate::raft_log::dump::RefDump;
use crate::raft_log::dump_raft_log::DumpRaftLog;
use crate::raft_log::stat::ChunkStat;
use crate::raft_log::stat::Stat;
use crate::raft_log::state_machine::raft_log_state::RaftLogState;
use crate::raft_log::state_machine::RaftLogStateMachine;
use crate::raft_log::wal::RaftLogWAL;
use crate::types::Segment;
use crate::ChunkId;
use crate::Config;
use crate::Types;
use crate::WALRecord;

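/// A WAL-based Raft log store.
///
/// On disk, the log is a series of chunk files: all but the last are closed
/// (read-only), and the last one is open for appending. An in-memory state
/// machine indexes every record so reads do not have to scan the WAL.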
#[derive(Debug)]
pub struct RaftLog<T: Types> {
    pub(crate) config: Arc<Config>,

    /// Guards the data directory against concurrent opens.
    _dir_lock: FileLock,

    pub(crate) wal: RaftLogWAL<T>,

    pub(crate) state_machine: RaftLogStateMachine<T>,

    /// Paths of chunk files that are fully purged and will be removed after
    /// the next flush.
    removed_chunks: Vec<String>,

    /// Payload cache hit/miss counters.
    access_stat: AccessStat,
}

impl<T: Types> RaftLogWriter<T> for RaftLog<T> {
    fn save_user_data(
        &mut self,
        user_data: Option<T::UserData>,
    ) -> Result<Segment, io::Error> {
        let mut state = self.log_state().clone();
        state.user_data = user_data;
        let record = WALRecord::State(state);
        self.append_and_apply(&record)
    }

    fn save_vote(&mut self, vote: T::Vote) -> Result<Segment, io::Error> {
        let record = WALRecord::SaveVote(vote);
        self.append_and_apply(&record)
    }

    fn append<I>(&mut self, entries: I) -> Result<Segment, io::Error>
    where I: IntoIterator<Item = (T::LogId, T::LogPayload)> {
        for (log_id, payload) in entries {
            let record = WALRecord::Append(log_id, payload);
            self.append_and_apply(&record)?;
        }
        Ok(self.wal.last_segment())
    }

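    /// Truncates the log: removes all entries at and after `index`.
    ///
    /// The truncation point is expressed as "truncate after this log id":
    /// the log id of the entry at `index - 1`, or the purged log id when
    /// `index` points right past the purged range.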
    fn truncate(&mut self, index: u64) -> Result<Segment, io::Error> {
        let purged = self.log_state().purged.as_ref();

        let log_id = if index == T::next_log_index(purged) {
            purged.cloned()
        } else {
            let log_id = self.get_log_id(index - 1)?;
            Some(log_id)
        };

        let record = WALRecord::TruncateAfter(log_id);
        self.append_and_apply(&record)
    }

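    /// Purges logs up to and including `upto`.
    ///
    /// If `upto` is already purged, this is a no-op. Otherwise, closed
    /// chunks whose last log id is at or before `upto` are scheduled for
    /// removal; their files are deleted after the next flush.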
    fn purge(&mut self, upto: T::LogId) -> Result<Segment, io::Error> {
        let purged = self.log_state().purged.as_ref();

        info!(
            "RaftLog purge upto: {:?}; current purged: {:?}",
            upto, purged
        );

        if T::log_index(&upto) < T::next_log_index(purged) {
            return Ok(self.wal.last_segment());
        }

        let record = WALRecord::PurgeUpto(upto.clone());
        let res = self.append_and_apply(&record)?;

        // Remove closed chunks, from oldest to newest, whose data is
        // entirely covered by the purge.
        while let Some((_chunk_id, closed)) = self.wal.closed.first_key_value()
        {
            if closed.state.last.as_ref() > Some(&upto) {
                break;
            }
            let (chunk_id, _r) = self.wal.closed.pop_first().unwrap();
            let path = self.config.chunk_path(chunk_id);
            info!(
                "RaftLog: scheduled to remove chunk after next flush: {}",
                path
            );
            self.removed_chunks.push(path);
        }

        Ok(res)
    }

    fn commit(&mut self, log_id: T::LogId) -> Result<Segment, io::Error> {
        let record = WALRecord::Commit(log_id);
        self.append_and_apply(&record)
    }

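    /// Submits a flush request to the WAL; `callback` is invoked when the
    /// flush completes.
    ///
    /// Chunk files scheduled for removal by `purge()` are handed to the WAL
    /// for deletion only after the flush request has been queued.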
    fn flush(&mut self, callback: T::Callback) -> Result<(), io::Error> {
        self.wal.send_flush(callback)?;

        if !self.removed_chunks.is_empty() {
            let paths = self.removed_chunks.drain(..).collect::<Vec<_>>();
            self.wal.send_remove_chunks(paths)?;
        }

        Ok(())
    }
}

impl<T: Types> RaftLog<T> {
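    /// Takes a snapshot of the in-memory data for dumping: the log state,
    /// the log index, the payload cache contents and the closed chunks.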
    pub fn dump_data(&self) -> DumpRaftLog<T> {
        let logs = self.state_machine.log.values().cloned().collect::<Vec<_>>();
        let cache =
            self.state_machine.payload_cache.read().unwrap().cache.clone();
        let chunks = self.wal.closed.clone();

        DumpRaftLog {
            state: self.state_machine.log_state.clone(),
            logs,
            cache,
            chunks,
            cache_hit: 0,
            cache_miss: 0,
        }
    }

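    /// Returns a borrowed dump handle over this `RaftLog`.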
    pub fn dump(&self) -> RefDump<'_, T> {
        RefDump {
            config: self.config.clone(),
            raft_log: self,
        }
    }

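    /// Returns the configuration this instance was opened with.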
    pub fn config(&self) -> &Config {
        self.config.as_ref()
    }

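    /// Opens the Raft log stored in `config.dir`, creating it if empty.
    ///
    /// Opening proceeds in these steps:
    /// 1. Acquire the directory lock so only one instance uses the directory.
    /// 2. Scan the directory for chunk files and sort them by offset.
    /// 3. Replay every record of every chunk into a fresh state machine,
    ///    verifying that consecutive chunks leave no gap between them.
    /// 4. Reopen the last closed chunk for appending, or create a new open
    ///    chunk if there is none (or if the last one was truncated).
    ///
    /// A minimal usage sketch; the `Types` implementation (`MyTypes` here)
    /// and the way the `Config` is built are placeholder assumptions, not
    /// part of this file:
    ///
    /// ```text
    /// let config = Arc::new(Config {
    ///     dir: "/tmp/raft-log".to_string(),
    ///     ..Default::default()
    /// });
    /// let raft_log = RaftLog::<MyTypes>::open(config)?;
    /// ```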
    pub fn open(config: Arc<Config>) -> Result<Self, io::Error> {
        let dir_lock = FileLock::new(config.clone())
            .context(|| format!("open RaftLog in '{}'", config.dir))?;

        let chunk_ids = Self::load_chunk_ids(&config)?;

        let mut sm = RaftLogStateMachine::new(&config);
        let mut closed = BTreeMap::new();
        let mut prev_end_offset = None;
        let mut last_log_id = None;

        for chunk_id in chunk_ids.iter().copied() {
            // Everything up to the end of the previous chunk may be evicted
            // from the payload cache.
            sm.payload_cache.write().unwrap().set_last_evictable(last_log_id);

            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;

            let (chunk, records) = Chunk::open(config.clone(), chunk_id)?;

            for (i, record) in records.into_iter().enumerate() {
                let start = chunk.global_offsets[i];
                let end = chunk.global_offsets[i + 1];
                let seg = Segment::new(start, end - start);
                sm.apply(&record, chunk_id, seg)?;
            }

            prev_end_offset = Some(chunk.last_segment().end().0);
            last_log_id = sm.log_state.last.clone();

            closed.insert(
                chunk_id,
                ClosedChunk::new(chunk, sm.log_state.clone()),
            );
        }

        // Reopen the last chunk for appending; if that is not possible,
        // start a new chunk that records the current log state.
        let open = match Self::reopen_last_closed(&mut closed) {
            Some(open) => open,
            None => OpenChunk::create(
                config.clone(),
                ChunkId(prev_end_offset.unwrap_or_default()),
                WALRecord::State(sm.log_state.clone()),
            )?,
        };

        let cache = sm.payload_cache.clone();

        let wal = RaftLogWAL::new(config.clone(), closed, open, cache);

        let s = Self {
            config,
            _dir_lock: dir_lock,
            state_machine: sm,
            wal,
            access_stat: Default::default(),
            removed_chunks: vec![],
        };

        Ok(s)
    }

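    /// Verifies that `chunk_id` starts exactly where the previous chunk
    /// ended, i.e. that there is no gap in the global WAL offsets.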
    fn ensure_consecutive_chunks(
        prev_end_offset: Option<u64>,
        chunk_id: ChunkId,
    ) -> Result<(), io::Error> {
        let Some(prev_end) = prev_end_offset else {
            return Ok(());
        };

        if prev_end != chunk_id.offset() {
            let message = format!(
                "Gap between chunks: {} -> {}; cannot open; \
                 fix this error and re-open",
                format_pad_u64(prev_end),
                format_pad_u64(chunk_id.offset()),
            );
            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
        }

        Ok(())
    }

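    /// Removes the last closed chunk from `closed_chunks` and reopens it for
    /// appending.
    ///
    /// Returns `None` if there are no closed chunks, or if the last chunk
    /// was truncated; a truncated chunk is never reopened for appending.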
    fn reopen_last_closed(
        closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<T>>,
    ) -> Option<OpenChunk<T>> {
        {
            let (_chunk_id, closed) = closed_chunks.last_key_value()?;

            if closed.chunk.truncated.is_some() {
                return None;
            }
        }

        let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
        let open = OpenChunk::new(last.chunk);
        Some(open)
    }

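    /// Lists the chunk ids found in the data directory, sorted ascending.
    ///
    /// The lock file is skipped, and file names that do not parse as chunk
    /// file names are ignored with a warning.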
    pub fn load_chunk_ids(config: &Config) -> Result<Vec<ChunkId>, io::Error> {
        let path = &config.dir;
        let entries = std::fs::read_dir(path)?;
        let mut chunk_ids = vec![];
        for entry in entries {
            let entry = entry?;
            let file_name = entry.file_name();

            let fn_str = file_name.to_string_lossy();
            if fn_str == FileLock::LOCK_FILE_NAME {
                continue;
            }

            let res = Config::parse_chunk_file_name(&fn_str);

            match res {
                Ok(offset) => {
                    chunk_ids.push(ChunkId(offset));
                }
                Err(err) => {
                    log::warn!(
                        "Ignore invalid WAL file name: '{}': {}",
                        fn_str,
                        err
                    );
                }
            }
        }

        chunk_ids.sort();

        Ok(chunk_ids)
    }

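    /// Writes a complete `RaftLogState` to the WAL as a `State` record and
    /// applies it to the in-memory state machine.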
    pub fn update_state(
        &mut self,
        state: RaftLogState<T>,
    ) -> Result<Segment, io::Error> {
        let record = WALRecord::State(state);
        self.append_and_apply(&record)
    }

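    /// Returns an iterator over the log entries in the half-open index range
    /// `[from, to)`.
    ///
    /// Each payload is served from the payload cache when present; on a
    /// cache miss it is read back from the WAL. Hits and misses are counted
    /// in `access_stat`.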
    pub fn read(
        &self,
        from: u64,
        to: u64,
    ) -> impl Iterator<Item = Result<(T::LogId, T::LogPayload), io::Error>> + '_
    {
        self.state_machine.log.range(from..to).map(|(_, log_data)| {
            let log_id = log_data.log_id.clone();

            let payload =
                self.state_machine.payload_cache.read().unwrap().get(&log_id);

            let payload = if let Some(payload) = payload {
                self.access_stat.cache_hit.fetch_add(1, Ordering::Relaxed);
                payload
            } else {
                self.access_stat.cache_miss.fetch_add(1, Ordering::Relaxed);
                self.wal.load_log_payload(log_data)?
            };

            Ok((log_id, payload))
        })
    }

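    /// Returns the current in-memory `RaftLogState`.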
    pub fn log_state(&self) -> &RaftLogState<T> {
        &self.state_machine.log_state
    }

    #[allow(dead_code)]
    pub(crate) fn log_state_mut(&mut self) -> &mut RaftLogState<T> {
        &mut self.state_machine.log_state
    }

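    /// Collects statistics: per-chunk stats for all closed chunks and the
    /// open chunk, plus payload-cache sizes and hit/miss counters.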
    pub fn stat(&self) -> Stat<T> {
        let closed =
            self.wal.closed.values().map(|c| c.stat()).collect::<Vec<_>>();

        let open = &self.wal.open;
        let open_stat = ChunkStat {
            chunk_id: open.chunk.chunk_id(),
            records_count: open.chunk.records_count() as u64,
            global_start: open.chunk.global_start(),
            global_end: open.chunk.global_end(),
            size: open.chunk.chunk_size(),
            log_state: self.log_state().clone(),
        };
        let cache = self.state_machine.payload_cache.read().unwrap();

        Stat {
            closed_chunks: closed,
            open_chunk: open_stat,

            payload_cache_last_evictable: cache.last_evictable().cloned(),
            payload_cache_item_count: cache.item_count() as u64,
            payload_cache_max_item: cache.max_items() as u64,
            payload_cache_size: cache.total_size() as u64,
            payload_cache_capacity: cache.capacity() as u64,

            payload_cache_miss: self
                .access_stat
                .cache_miss
                .load(Ordering::Relaxed),
            payload_cache_hit: self
                .access_stat
                .cache_hit
                .load(Ordering::Relaxed),
        }
    }

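    /// Returns the payload-cache access counters.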
    pub fn access_stat(&self) -> &AccessStat {
        &self.access_stat
    }

    fn get_log_id(&self, index: u64) -> Result<T::LogId, RaftLogStateError<T>> {
        let entry = self
            .state_machine
            .log
            .get(&index)
            .ok_or_else(|| LogIndexNotFound::new(index))?;
        Ok(entry.log_id.clone())
    }

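    /// Appends `rec` to the WAL, applies it to the in-memory state machine,
    /// and closes the open chunk if it has become full.
    ///
    /// Returns the WAL segment the record was written to.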
    fn append_and_apply(
        &mut self,
        rec: &WALRecord<T>,
    ) -> Result<Segment, io::Error> {
        WAL::append(&mut self.wal, rec)?;
        StateMachine::apply(
            &mut self.state_machine,
            rec,
            self.wal.open.chunk.chunk_id(),
            self.wal.last_segment(),
        )?;

        self.wal
            .try_close_full_chunk(|| self.state_machine.log_state.clone())?;

        Ok(self.wal.last_segment())
    }

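    /// Returns the total on-disk size in bytes: the distance from the start
    /// of the oldest closed chunk (or of the open chunk, if none are closed)
    /// to the end of the open chunk.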
    pub fn on_disk_size(&self) -> u64 {
        let end = self.wal.open.chunk.global_end();
        let open_start = self.wal.open.chunk.global_start();
        let first_closed_start = self
            .wal
            .closed
            .first_key_value()
            .map(|(_, v)| v.chunk.global_start())
            .unwrap_or(open_start);

        end - first_closed_start
    }
}