1use std::collections::BTreeMap;
2use std::io;
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6use codeq::OffsetSize;
7use codeq::error_context_ext::ErrorContextExt;
8use log::info;
9
10use crate::ChunkId;
11use crate::Config;
12use crate::Types;
13use crate::WALRecord;
14use crate::api::raft_log_writer::RaftLogWriter;
15use crate::api::state_machine::StateMachine;
16use crate::api::wal::WAL;
17use crate::chunk::Chunk;
18use crate::chunk::closed_chunk::ClosedChunk;
19use crate::chunk::open_chunk::OpenChunk;
20use crate::errors::LogIndexNotFound;
21use crate::errors::RaftLogStateError;
22use crate::file_lock::FileLock;
23use crate::num::format_pad_u64;
24use crate::raft_log::access_state::AccessStat;
25use crate::raft_log::dump::RefDump;
26use crate::raft_log::dump_raft_log::DumpRaftLog;
27use crate::raft_log::stat::ChunkStat;
28use crate::raft_log::stat::Stat;
29use crate::raft_log::state_machine::RaftLogStateMachine;
30use crate::raft_log::state_machine::raft_log_state::RaftLogState;
31use crate::raft_log::wal::RaftLogWAL;
32use crate::types::Segment;
33
#[derive(Debug)]
pub struct RaftLog<T: Types> {
    /// Shared configuration (directory path, chunk naming, etc.).
    pub(crate) config: Arc<Config>,

    /// Lock on the data directory, held for the lifetime of this instance.
    /// Never read after construction; presumably guards against a second
    /// process opening the same directory — TODO confirm in `FileLock`.
    _dir_lock: FileLock,

    /// The write-ahead log: one open (appendable) chunk plus the set of
    /// closed chunks.
    pub(crate) wal: RaftLogWAL<T>,

    /// In-memory state rebuilt by replaying WAL records; holds the log
    /// index map, the current `RaftLogState`, and the payload cache.
    pub(crate) state_machine: RaftLogStateMachine<T>,

    /// Paths of chunk files made obsolete by `purge()`. They are only
    /// actually removed after the next `flush()`, which drains this list
    /// and forwards the paths to the WAL worker.
    removed_chunks: Vec<String>,

    /// Atomic payload-cache hit/miss counters updated by `read()`.
    access_stat: AccessStat,
}
61
62impl<T: Types> RaftLogWriter<T> for RaftLog<T> {
63 fn save_user_data(
64 &mut self,
65 user_data: Option<T::UserData>,
66 ) -> Result<Segment, io::Error> {
67 let mut state = self.log_state().clone();
68 state.user_data = user_data;
69 let record = WALRecord::State(state);
70 self.append_and_apply(&record)
71 }
72
73 fn save_vote(&mut self, vote: T::Vote) -> Result<Segment, io::Error> {
74 let record = WALRecord::SaveVote(vote.clone());
75 self.append_and_apply(&record)
76 }
77
78 fn append<I>(&mut self, entries: I) -> Result<Segment, io::Error>
79 where I: IntoIterator<Item = (T::LogId, T::LogPayload)> {
80 for (log_id, payload) in entries {
81 let record = WALRecord::Append(log_id, payload);
82 self.append_and_apply(&record)?;
83 }
84 Ok(self.wal.last_segment())
85 }
86
87 fn truncate(&mut self, index: u64) -> Result<Segment, io::Error> {
89 let purged = self.log_state().purged.as_ref();
90
91 let log_id = if index == T::next_log_index(purged) {
92 purged.cloned()
93 } else {
94 let log_id = self.get_log_id(index - 1)?;
95 Some(log_id)
96 };
97
98 let record = WALRecord::TruncateAfter(log_id);
99 self.append_and_apply(&record)
100 }
101
102 fn purge(&mut self, upto: T::LogId) -> Result<Segment, io::Error> {
103 let purged = self.log_state().purged.as_ref();
107
108 info!(
109 "RaftLog purge upto: {:?}; current purged: {:?}",
110 upto, purged
111 );
112
113 if T::log_index(&upto) < T::next_log_index(purged) {
114 return Ok(self.wal.last_segment());
115 }
116
117 let record = WALRecord::PurgeUpto(upto.clone());
118 let res = self.append_and_apply(&record)?;
119
120 while let Some((_chunk_id, closed)) = self.wal.closed.first_key_value()
125 {
126 if closed.state.last.as_ref() > Some(&upto) {
127 break;
128 }
129 let (chunk_id, _r) = self.wal.closed.pop_first().unwrap();
130 let path = self.config.chunk_path(chunk_id);
131 info!(
132 "RaftLog: scheduled to remove chunk after next flush: {}",
133 path
134 );
135 self.removed_chunks.push(path);
136 }
137
138 Ok(res)
139 }
140
141 fn commit(&mut self, log_id: T::LogId) -> Result<Segment, io::Error> {
142 let record = WALRecord::Commit(log_id);
143 self.append_and_apply(&record)
144 }
145
146 fn flush(
147 &mut self,
148 callback: Option<T::Callback>,
149 ) -> Result<(), io::Error> {
150 self.wal.send_flush(callback)?;
151
152 if !self.removed_chunks.is_empty() {
153 let chunk_ids = self.removed_chunks.drain(..).collect::<Vec<_>>();
154 self.wal.send_remove_chunks(chunk_ids)?;
155 }
156
157 Ok(())
158 }
159}
160
impl<T: Types> RaftLog<T> {
    /// Snapshots the in-memory state into an owned [`DumpRaftLog`]:
    /// the log state, all log index entries, the payload cache contents
    /// and the closed chunks. Hit/miss counters start at zero in the dump.
    pub fn dump_data(&self) -> DumpRaftLog<T> {
        let logs = self.state_machine.log.values().cloned().collect::<Vec<_>>();
        let cache =
            self.state_machine.payload_cache.read().unwrap().cache.clone();
        let chunks = self.wal.closed.clone();

        DumpRaftLog {
            state: self.state_machine.log_state.clone(),
            logs,
            cache,
            chunks,
            cache_hit: 0,
            cache_miss: 0,
        }
    }

    /// Returns a borrowing dump view over this instance (no data copied
    /// except the `Arc` config handle).
    pub fn dump(&self) -> RefDump<'_, T> {
        RefDump {
            config: self.config.clone(),
            raft_log: self,
        }
    }

    /// Borrows the configuration.
    pub fn config(&self) -> &Config {
        self.config.as_ref()
    }

    /// Opens (or creates) a raft log in `config.dir`.
    ///
    /// Steps:
    /// 1. Acquire the directory lock.
    /// 2. Enumerate chunk files and replay each chunk's records into a
    ///    fresh state machine, verifying chunks are offset-contiguous.
    /// 3. Re-open the last closed chunk for appending, or create a new
    ///    open chunk (seeded with the current state) if that is not
    ///    possible.
    ///
    /// # Errors
    /// I/O errors from locking, directory listing, chunk opening or
    /// record replay; `InvalidData` if a gap between chunks is found.
    pub fn open(config: Arc<Config>) -> Result<Self, io::Error> {
        let dir_lock = FileLock::new(config.clone())
            .context(|| format!("open RaftLog in '{}'", config.dir))?;

        let chunk_ids = Self::load_chunk_ids(&config)?;

        let mut sm = RaftLogStateMachine::new(&config);
        let mut closed = BTreeMap::new();
        // End offset of the previously replayed chunk; used both for the
        // contiguity check and as the start offset of a brand-new chunk.
        let mut prev_end_offset = None;
        // Last log id seen at the end of the previous chunk.
        let mut last_log_id = None;

        for chunk_id in chunk_ids.iter().copied() {
            // Presumably marks payloads up to the previous chunk's last
            // log id as evictable from the cache — TODO confirm semantics
            // in `payload_cache`.
            sm.payload_cache.write().unwrap().set_last_evictable(last_log_id);

            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;

            let (chunk, records) = Chunk::open(config.clone(), chunk_id)?;

            // Replay each record with its global byte segment, derived
            // from consecutive entries of `global_offsets`.
            for (i, record) in records.into_iter().enumerate() {
                let start = chunk.global_offsets[i];
                let end = chunk.global_offsets[i + 1];
                let seg = Segment::new(start, end - start);
                sm.apply(&record, chunk_id, seg)?;
            }

            prev_end_offset = Some(chunk.last_segment().end().0);
            last_log_id = sm.log_state.last.clone();

            // Each closed chunk remembers the log state as of its end.
            closed.insert(
                chunk_id,
                ClosedChunk::new(chunk, sm.log_state.clone()),
            );
        }

        let open = Self::reopen_last_closed(&mut closed);

        // If the last closed chunk cannot be re-opened (or none exist),
        // start a new chunk at the end offset of the last one (offset 0
        // for an empty directory), seeded with the current state.
        let open = if let Some(open) = open {
            open
        } else {
            OpenChunk::create(
                config.clone(),
                ChunkId(prev_end_offset.unwrap_or_default()),
                WALRecord::State(sm.log_state.clone()),
            )?
        };

        let cache = sm.payload_cache.clone();

        let wal = RaftLogWAL::new(config.clone(), closed, open, cache);

        let s = Self {
            config,
            _dir_lock: dir_lock,
            state_machine: sm,
            wal,
            access_stat: Default::default(),
            removed_chunks: vec![],
        };

        Ok(s)
    }

    /// Verifies that `chunk_id` starts exactly where the previous chunk
    /// ended. The first chunk (no previous end) always passes.
    ///
    /// # Errors
    /// `InvalidData` describing the gap, if the offsets do not match.
    fn ensure_consecutive_chunks(
        prev_end_offset: Option<u64>,
        chunk_id: ChunkId,
    ) -> Result<(), io::Error> {
        let Some(prev_end) = prev_end_offset else {
            return Ok(());
        };

        if prev_end != chunk_id.offset() {
            let message = format!(
                "Gap between chunks: {} -> {}; Can not open, \
                 fix this error and re-open",
                format_pad_u64(prev_end),
                format_pad_u64(chunk_id.offset()),
            );
            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
        }

        Ok(())
    }

    /// Removes the last closed chunk from `closed_chunks` and converts it
    /// back into the open (appendable) chunk.
    ///
    /// Returns `None` when there is no closed chunk, or when the last one
    /// is marked truncated (in that case it is left in the map and a new
    /// open chunk must be created instead).
    fn reopen_last_closed(
        closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<T>>,
    ) -> Option<OpenChunk<T>> {
        // Inner scope so the borrow from `iter()` ends before `pop_last`.
        {
            let (_chunk_id, closed) = closed_chunks.iter().last()?;

            if closed.chunk.truncated.is_some() {
                return None;
            }
        }

        let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
        let open = OpenChunk::new(last.chunk);
        Some(open)
    }

    /// Scans `config.dir` for chunk files and returns their ids sorted
    /// ascending. The lock file is skipped; files whose names do not
    /// parse as chunk names are logged at `warn` level and ignored.
    pub fn load_chunk_ids(config: &Config) -> Result<Vec<ChunkId>, io::Error> {
        let path = &config.dir;
        let entries = std::fs::read_dir(path)?;
        let mut chunk_ids = vec![];
        for entry in entries {
            let entry = entry?;
            let file_name = entry.file_name();

            let fn_str = file_name.to_string_lossy();
            if fn_str == FileLock::LOCK_FILE_NAME {
                continue;
            }

            let res = Config::parse_chunk_file_name(&fn_str);

            match res {
                Ok(offset) => {
                    chunk_ids.push(ChunkId(offset));
                }
                Err(err) => {
                    log::warn!(
                        "Ignore invalid WAL file name: '{}': {}",
                        fn_str,
                        err
                    );
                    continue;
                }
            };
        }

        chunk_ids.sort();

        Ok(chunk_ids)
    }

    /// Writes `state` as a full `State` record, replacing the current log
    /// state once applied.
    pub fn update_state(
        &mut self,
        state: RaftLogState<T>,
    ) -> Result<Segment, io::Error> {
        let record = WALRecord::State(state);
        self.append_and_apply(&record)
    }

    /// Returns an iterator over log entries with index in `from..to`
    /// (half-open). Each payload is served from the payload cache when
    /// present, otherwise loaded from the WAL; the corresponding
    /// hit/miss counter in `access_stat` is bumped either way.
    pub fn read(
        &self,
        from: u64,
        to: u64,
    ) -> impl Iterator<Item = Result<(T::LogId, T::LogPayload), io::Error>> + '_
    {
        self.state_machine.log.range(from..to).map(|(_, log_data)| {
            let log_id = log_data.log_id.clone();

            // The read-lock guard is dropped before the (possibly slow)
            // WAL load below.
            let payload =
                self.state_machine.payload_cache.read().unwrap().get(&log_id);

            let payload = if let Some(payload) = payload {
                self.access_stat.cache_hit.fetch_add(1, Ordering::Relaxed);
                payload
            } else {
                self.access_stat.cache_miss.fetch_add(1, Ordering::Relaxed);
                self.wal.load_log_payload(log_data)?
            };

            Ok((log_id, payload))
        })
    }

    /// Borrows the current log state.
    pub fn log_state(&self) -> &RaftLogState<T> {
        &self.state_machine.log_state
    }

    /// Mutable access to the log state; test/internal use only.
    #[allow(dead_code)]
    pub(crate) fn log_state_mut(&mut self) -> &mut RaftLogState<T> {
        &mut self.state_machine.log_state
    }

    /// Collects statistics about all chunks and the payload cache.
    pub fn stat(&self) -> Stat<T> {
        let closed =
            self.wal.closed.values().map(|c| c.stat()).collect::<Vec<_>>();

        // The open chunk has no precomputed stat; assemble one in place.
        let open = &self.wal.open;
        let open_stat = ChunkStat {
            chunk_id: open.chunk.chunk_id(),
            records_count: open.chunk.records_count() as u64,
            global_start: open.chunk.global_start(),
            global_end: open.chunk.global_end(),
            size: open.chunk.chunk_size(),
            log_state: self.log_state().clone(),
        };
        let cache = self.state_machine.payload_cache.read().unwrap();

        Stat {
            closed_chunks: closed,
            open_chunk: open_stat,

            payload_cache_last_evictable: cache.last_evictable().cloned(),
            payload_cache_item_count: cache.item_count() as u64,
            payload_cache_max_item: cache.max_items() as u64,
            payload_cache_size: cache.total_size() as u64,
            payload_cache_capacity: cache.capacity() as u64,

            payload_cache_miss: self
                .access_stat
                .cache_miss
                .load(Ordering::Relaxed),
            payload_cache_hit: self
                .access_stat
                .cache_hit
                .load(Ordering::Relaxed),
        }
    }

    /// Borrows the cache hit/miss counters.
    pub fn access_stat(&self) -> &AccessStat {
        &self.access_stat
    }

    /// Blocks until the WAL worker has no pending work.
    pub fn wait_worker_idle(&self) {
        self.wal.wait_worker_idle();
    }

    /// Evicts all currently evictable entries from the payload cache.
    pub fn drain_cache_evictable(&self) {
        self.state_machine.payload_cache.write().unwrap().drain_evictable();
    }

    /// Looks up the log id stored at `index`.
    ///
    /// # Errors
    /// `LogIndexNotFound` (converted into `RaftLogStateError`) when the
    /// index is absent from the in-memory log map.
    fn get_log_id(&self, index: u64) -> Result<T::LogId, RaftLogStateError<T>> {
        let entry = self
            .state_machine
            .log
            .get(&index)
            .ok_or_else(|| LogIndexNotFound::new(index))?;
        Ok(entry.log_id.clone())
    }

    /// Core write path: append `rec` to the WAL, apply it to the state
    /// machine (tagged with the open chunk id and the record's segment),
    /// then close the open chunk if it has become full. Order matters:
    /// the record must be durable in the WAL before it mutates state.
    fn append_and_apply(
        &mut self,
        rec: &WALRecord<T>,
    ) -> Result<Segment, io::Error> {
        WAL::append(&mut self.wal, rec)?;
        StateMachine::apply(
            &mut self.state_machine,
            rec,
            self.wal.open.chunk.chunk_id(),
            self.wal.last_segment(),
        )?;

        self.wal
            .try_close_full_chunk(|| self.state_machine.log_state.clone())?;

        Ok(self.wal.last_segment())
    }

    /// Total bytes on disk: from the start of the oldest closed chunk
    /// (or of the open chunk when no closed chunk exists) to the end of
    /// the open chunk.
    pub fn on_disk_size(&self) -> u64 {
        let end = self.wal.open.chunk.global_end();
        let open_start = self.wal.open.chunk.global_start();
        let first_closed_start = self
            .wal
            .closed
            .first_key_value()
            .map(|(_, v)| v.chunk.global_start())
            .unwrap_or(open_start);

        end - first_closed_start
    }
}