raft_engine/
log_batch.rs

1// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
2
3use std::fmt::Debug;
4use std::io::BufRead;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::{mem, u64};
8
9use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
10use log::error;
11use num_derive::FromPrimitive;
12use num_traits::FromPrimitive;
13use protobuf::Message;
14
15use crate::codec::{self, NumberEncoder};
16use crate::memtable::EntryIndex;
17use crate::metrics::StopWatch;
18use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext, ReactiveBytes};
19use crate::util::{crc32, lz4};
20use crate::{perf_context, Error, Result};
21
// Length of the per-batch header: u56 length | u8 compression type | u64
// item (footer) offset.
pub(crate) const LOG_BATCH_HEADER_LEN: usize = 16;
// Length of a crc32 checksum.
pub(crate) const LOG_BATCH_CHECKSUM_LEN: usize = 4;

// On-disk tags for the log item types serialized in a batch footer.
const TYPE_ENTRIES: u8 = 0x01;
const TYPE_COMMAND: u8 = 0x02;
const TYPE_KV: u8 = 0x3;

// On-disk tags for the command variants.
const CMD_CLEAN: u8 = 0x01;
const CMD_COMPACT: u8 = 0x02;

// Default item capacity of a `LogBatch`.
const DEFAULT_LOG_ITEM_BATCH_CAP: usize = 64;
// Encode buffers larger than this are shrunk back to this capacity when the
// batch is drained.
const MAX_LOG_BATCH_BUFFER_CAP: usize = 8 * 1024 * 1024;
// 2GiB, The maximum content length accepted by lz4 compression.
const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = i32::MAX as usize;
36
/// `MessageExt` trait allows for probing log index from a specific type of
/// protobuf messages.
pub trait MessageExt: Send + Sync {
    /// The concrete protobuf entry type.
    type Entry: Message + Clone + PartialEq;

    /// Returns the log index of entry `e`.
    fn index(e: &Self::Entry) -> u64;
}
44
/// Types of compression.
///
/// The `repr(u8)` discriminant doubles as the on-disk representation; see
/// `from_u8`/`to_u8`.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum CompressionType {
    None = 0,
    Lz4 = 1,
}
52
53impl CompressionType {
54    pub fn from_u8(t: u8) -> Result<Self> {
55        if t <= CompressionType::Lz4 as u8 {
56            Ok(unsafe { mem::transmute(t) })
57        } else {
58            Err(Error::Corruption(format!(
59                "Unrecognized compression type: {t}"
60            )))
61        }
62    }
63
64    pub fn to_u8(self) -> u8 {
65        self as u8
66    }
67}
68
// A shrinking byte slice used as a cursor while decoding.
type SliceReader<'a> = &'a [u8];
70
// Format:
// { count | first index | [ tail offsets ] }
/// A list of consecutive log entry indexes belonging to one Raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EntryIndexes(pub Vec<EntryIndex>);
75
76impl EntryIndexes {
77    pub fn decode(buf: &mut SliceReader, entries_size: &mut u32) -> Result<Self> {
78        let mut count = codec::decode_var_u64(buf)?;
79        let mut entry_indexes = Vec::with_capacity(count as usize);
80        let mut index = 0;
81        if count > 0 {
82            index = codec::decode_var_u64(buf)?;
83        }
84        while count > 0 {
85            let t = codec::decode_var_u64(buf)?;
86            let entry_len = (t as u32) - *entries_size;
87            let entry_index = EntryIndex {
88                index,
89                entry_offset: *entries_size,
90                entry_len,
91                ..Default::default()
92            };
93            *entries_size += entry_len;
94            entry_indexes.push(entry_index);
95            index += 1;
96            count -= 1;
97        }
98        Ok(Self(entry_indexes))
99    }
100
101    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
102        let count = self.0.len() as u64;
103        buf.encode_var_u64(count)?;
104        if count > 0 {
105            buf.encode_var_u64(self.0[0].index)?;
106        }
107        for ei in self.0.iter() {
108            buf.encode_var_u64((ei.entry_offset + ei.entry_len) as u64)?;
109        }
110        Ok(())
111    }
112
113    fn approximate_size(&self) -> usize {
114        8 /*count*/ + if self.0.is_empty() { 0 } else { 8 } /*first index*/ + 8 /*tail offset*/ * self.0.len()
115    }
116}
117
// Format:
// { type | (index) }
/// Administrative command attached to a Raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Command {
    Clean,
    Compact { index: u64 },
}
125
126impl Command {
127    pub fn encode(&self, vec: &mut Vec<u8>) {
128        match *self {
129            Command::Clean => {
130                vec.push(CMD_CLEAN);
131            }
132            Command::Compact { index } => {
133                vec.push(CMD_COMPACT);
134                vec.encode_var_u64(index).unwrap();
135            }
136        }
137    }
138
139    pub fn decode(buf: &mut SliceReader) -> Result<Command> {
140        let command_type = codec::read_u8(buf)?;
141        match command_type {
142            CMD_CLEAN => Ok(Command::Clean),
143            CMD_COMPACT => {
144                let index = codec::decode_var_u64(buf)?;
145                Ok(Command::Compact { index })
146            }
147            _ => Err(Error::Corruption(format!(
148                "Unrecognized command type: {command_type}"
149            ))),
150        }
151    }
152
153    fn approximate_size(&self) -> usize {
154        match &self {
155            Command::Clean => 1,              /* type */
156            Command::Compact { .. } => 1 + 8, /* type + index */
157        }
158    }
159}
160
/// Operation type of a key value record.
///
/// The `repr(u8)` discriminant doubles as the on-disk representation; note
/// that `0` is intentionally not a valid value.
#[repr(u8)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum OpType {
    Put = 1,
    Del = 2,
}
167
168impl OpType {
169    pub fn from_u8(t: u8) -> Result<Self> {
170        if t <= OpType::Del as u8 {
171            Ok(unsafe { mem::transmute(t) })
172        } else {
173            Err(Error::Corruption(format!("Unrecognized op type: {t}")))
174        }
175    }
176
177    pub fn to_u8(self) -> u8 {
178        self as u8
179    }
180}
181
// Format:
// { op_type | key len | key | ( value len | value ) }
/// A key value record attached to one Raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct KeyValue {
    pub op_type: OpType,
    pub key: Vec<u8>,
    // `None` for `OpType::Del`.
    pub value: Option<Vec<u8>>,
    // File the record was written to; filled in by `finish_write`/`decode`.
    pub file_id: Option<FileId>,
}
191
192impl KeyValue {
193    pub fn new(op_type: OpType, key: Vec<u8>, value: Option<Vec<u8>>) -> KeyValue {
194        KeyValue {
195            op_type,
196            key,
197            value,
198            file_id: None,
199        }
200    }
201
202    pub fn decode(buf: &mut SliceReader) -> Result<KeyValue> {
203        let op_type = OpType::from_u8(codec::read_u8(buf)?)?;
204        let k_len = codec::decode_var_u64(buf)? as usize;
205        let key = &buf[..k_len];
206        buf.consume(k_len);
207        match op_type {
208            OpType::Put => {
209                let v_len = codec::decode_var_u64(buf)? as usize;
210                let value = &buf[..v_len];
211                buf.consume(v_len);
212                Ok(KeyValue::new(
213                    OpType::Put,
214                    key.to_vec(),
215                    Some(value.to_vec()),
216                ))
217            }
218            OpType::Del => Ok(KeyValue::new(OpType::Del, key.to_vec(), None)),
219        }
220    }
221
222    pub fn encode(&self, vec: &mut Vec<u8>) -> Result<()> {
223        vec.push(self.op_type.to_u8());
224        vec.encode_var_u64(self.key.len() as u64)?;
225        vec.extend_from_slice(self.key.as_slice());
226        match self.op_type {
227            OpType::Put => {
228                vec.encode_var_u64(self.value.as_ref().unwrap().len() as u64)?;
229                vec.extend_from_slice(self.value.as_ref().unwrap().as_slice());
230            }
231            OpType::Del => {}
232        }
233        Ok(())
234    }
235
236    fn approximate_size(&self) -> usize {
237        1 /*op*/ + 8 /*k_len*/ + self.key.len() + 8 /*v_len*/ + self.value.as_ref().map_or_else(|| 0, |v| v.len())
238    }
239}
240
// Format:
// { 8 byte region id | 1 byte type | item }
/// A single log item: a payload attached to one Raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogItem {
    pub raft_group_id: u64,
    pub content: LogItemContent,
}
248
/// The payload of a [`LogItem`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LogItemContent {
    EntryIndexes(EntryIndexes),
    Command(Command),
    Kv(KeyValue),
}
255
256impl LogItem {
257    pub fn new_entry_indexes(raft_group_id: u64, entry_indexes: Vec<EntryIndex>) -> LogItem {
258        LogItem {
259            raft_group_id,
260            content: LogItemContent::EntryIndexes(EntryIndexes(entry_indexes)),
261        }
262    }
263
264    pub fn new_command(raft_group_id: u64, command: Command) -> LogItem {
265        LogItem {
266            raft_group_id,
267            content: LogItemContent::Command(command),
268        }
269    }
270
271    pub fn new_kv(
272        raft_group_id: u64,
273        op_type: OpType,
274        key: Vec<u8>,
275        value: Option<Vec<u8>>,
276    ) -> LogItem {
277        LogItem {
278            raft_group_id,
279            content: LogItemContent::Kv(KeyValue::new(op_type, key, value)),
280        }
281    }
282
283    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
284        buf.encode_var_u64(self.raft_group_id)?;
285        match &self.content {
286            LogItemContent::EntryIndexes(entry_indexes) => {
287                buf.push(TYPE_ENTRIES);
288                entry_indexes.encode(buf)?;
289            }
290            LogItemContent::Command(command) => {
291                buf.push(TYPE_COMMAND);
292                command.encode(buf);
293            }
294            LogItemContent::Kv(kv) => {
295                buf.push(TYPE_KV);
296                kv.encode(buf)?;
297            }
298        }
299        Ok(())
300    }
301
302    pub fn decode(buf: &mut SliceReader, entries_size: &mut u32) -> Result<LogItem> {
303        let raft_group_id = codec::decode_var_u64(buf)?;
304        let item_type = buf.read_u8()?;
305        let content = match item_type {
306            TYPE_ENTRIES => {
307                let entry_indexes = EntryIndexes::decode(buf, entries_size)?;
308                LogItemContent::EntryIndexes(entry_indexes)
309            }
310            TYPE_COMMAND => {
311                let cmd = Command::decode(buf)?;
312                LogItemContent::Command(cmd)
313            }
314            TYPE_KV => {
315                let kv = KeyValue::decode(buf)?;
316                LogItemContent::Kv(kv)
317            }
318            _ => {
319                return Err(Error::Corruption(format!(
320                    "Unrecognized log item type: {item_type}"
321                )));
322            }
323        };
324        Ok(LogItem {
325            raft_group_id,
326            content,
327        })
328    }
329
330    fn approximate_size(&self) -> usize {
331        match &self.content {
332            LogItemContent::EntryIndexes(entry_indexes) => {
333                8 /*r_id*/ + 1 /*type*/ + entry_indexes.approximate_size()
334            }
335            LogItemContent::Command(cmd) => 8 + 1 + cmd.approximate_size(),
336            LogItemContent::Kv(kv) => 8 + 1 + kv.approximate_size(),
337        }
338    }
339}
340
/// Draining iterator over the items of a [`LogItemBatch`].
pub(crate) type LogItemDrain<'a> = std::vec::Drain<'a, LogItem>;
342
/// A lean batch of log item, without entry data.
// Format:
// { item count | [items] | crc32 }
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogItemBatch {
    items: Vec<LogItem>,
    // Approximate encoded size of `items`, maintained incrementally.
    item_size: usize,
    // Total byte size of the entries referenced by `items`.
    entries_size: u32,
    // crc32 of the encoded items; computed by `encode`, restored by `decode`.
    checksum: u32,
}
353
impl Default for LogItemBatch {
    fn default() -> Self {
        // An empty batch; the items vector allocates on first push.
        Self::with_capacity(0)
    }
}
359
impl LogItemBatch {
    /// Creates a new batch with space for `cap` items.
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            items: Vec::with_capacity(cap),
            item_size: 0,
            entries_size: 0,
            checksum: 0,
        }
    }

    // TODO: Clean up these interfaces.
    pub fn into_items(self) -> Vec<LogItem> {
        self.items
    }

    pub fn iter(&self) -> std::slice::Iter<LogItem> {
        self.items.iter()
    }

    /// Drains all items, resetting the accumulated sizes and checksum.
    pub fn drain(&mut self) -> LogItemDrain {
        self.item_size = 0;
        self.entries_size = 0;
        self.checksum = 0;
        self.items.drain(..)
    }

    /// Moves all items of `rhs` into `self`, leaving `rhs` empty.
    ///
    /// Entry offsets in `rhs` are rebased onto the tail of `self`'s entry
    /// data so they stay valid after the two batches are concatenated.
    pub fn merge(&mut self, rhs: &mut LogItemBatch) {
        for item in &mut rhs.items {
            if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
                for ei in entry_indexes.0.iter_mut() {
                    ei.entry_offset += self.entries_size;
                }
            }
        }
        self.item_size += rhs.item_size;
        rhs.item_size = 0;
        self.entries_size += rhs.entries_size;
        rhs.entries_size = 0;
        self.items.append(&mut rhs.items);
    }

    /// Records the compression type chosen for the entry block into every
    /// entry index of this batch.
    pub(crate) fn finish_populate(&mut self, compression_type: CompressionType) {
        for item in self.items.iter_mut() {
            if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
                for ei in entry_indexes.0.iter_mut() {
                    ei.compression_type = compression_type;
                }
            }
        }
    }

    /// Prepare the `write` by signing a checksum, so-called `signature`,
    /// into the encoded buffer corresponding to `LogBatch`.
    ///
    /// The `signature` is both generated by the given `LogFileContext`.
    /// That is, the final checksum of each `LogBatch` consists of this
    /// `signature` and the original `checksum` of the contents.
    pub(crate) fn prepare_write(
        &self,
        buf: &mut [u8],
        file_context: &LogFileContext,
    ) -> Result<()> {
        if !buf.is_empty() {
            let mut footer_checksum = self.checksum;
            // Fill the placeholder (checksum) in `LogItemBatch` by inserting the
            // signature into the encoded bytes.
            let footer_checksum_offset = buf.len() - LOG_BATCH_CHECKSUM_LEN;
            if let Some(signature) = file_context.get_signature() {
                // The final checksum is generated by `signature` ***XOR***
                // `original checksum of buf`.
                footer_checksum ^= signature;
            }
            (&mut buf[footer_checksum_offset..]).write_u32::<LittleEndian>(footer_checksum)?;
        }
        Ok(())
    }

    /// Updates every item with the final disk location of the batch.
    pub(crate) fn finish_write(&mut self, handle: FileBlockHandle) {
        for item in self.items.iter_mut() {
            match &mut item.content {
                LogItemContent::EntryIndexes(entry_indexes) => {
                    for ei in entry_indexes.0.iter_mut() {
                        // No assert!(ei.entries.is_none):
                        // It's possible that batch containing rewritten index already
                        // has entries location.
                        ei.entries = Some(handle);
                    }
                }
                LogItemContent::Kv(kv) => {
                    debug_assert!(kv.file_id.is_none());
                    kv.file_id = Some(handle.id);
                }
                _ => {}
            }
        }
    }

    /// Adds entry indexes for `region_id`, rebasing their offsets onto the
    /// tail of this batch's accumulated entry data.
    pub fn add_entry_indexes(&mut self, region_id: u64, mut entry_indexes: Vec<EntryIndex>) {
        for ei in entry_indexes.iter_mut() {
            ei.entry_offset = self.entries_size;
            self.entries_size += ei.entry_len;
        }
        let item = LogItem::new_entry_indexes(region_id, entry_indexes);
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Adds a command for `region_id`.
    pub fn add_command(&mut self, region_id: u64, cmd: Command) {
        let item = LogItem::new_command(region_id, cmd);
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Adds a key deletion for `region_id`.
    pub fn delete(&mut self, region_id: u64, key: Vec<u8>) {
        let item = LogItem::new_kv(region_id, OpType::Del, key, None);
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Serializes `s` and stores it under `key` for `region_id`.
    pub fn put_message<S: Message>(&mut self, region_id: u64, key: Vec<u8>, s: &S) -> Result<()> {
        self.put(region_id, key, s.write_to_bytes()?);
        Ok(())
    }

    /// Stores `value` under `key` for `region_id`.
    pub fn put(&mut self, region_id: u64, key: Vec<u8>, value: Vec<u8>) {
        let item = LogItem::new_kv(region_id, OpType::Put, key, Some(value));
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Encodes the batch as { item count | [items] | crc32 } and appends it
    /// to `buf`. The trailing checksum written here is only a placeholder;
    /// the real (possibly signed) value is filled by `prepare_write`.
    pub fn encode(&mut self, buf: &mut Vec<u8>) -> Result<()> {
        let offset = buf.len();
        let count = self.items.len() as u64;
        buf.encode_var_u64(count)?;
        for item in self.items.iter() {
            item.encode(buf)?;
        }
        self.checksum = crc32(&buf[offset..]);
        // Just leave a placeholder for the final checksum, which will be filled
        // by later `prepare_write(...)` progress.
        buf.encode_u32_le(0)?;
        Ok(())
    }

    /// Decodes a `LogItemBatch` from bytes of footer. `entries` is the block
    /// location of encoded entries.
    pub fn decode(
        buf: &mut SliceReader,
        entries: FileBlockHandle,
        compression_type: CompressionType,
        file_context: &LogFileContext,
    ) -> Result<LogItemBatch> {
        // Validate the checksum of each LogItemBatch by the signature.
        let checksum = verify_checksum_with_signature(buf, file_context.get_signature())?;
        // Strip the trailing checksum before decoding the items.
        // NOTE(review): assumes `verify_checksum_with_signature` already
        // rejects buffers shorter than LOG_BATCH_CHECKSUM_LEN — confirm in
        // its implementation.
        *buf = &buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN];
        let count = codec::decode_var_u64(buf)?;
        let mut items = LogItemBatch::with_capacity(count as usize);
        let mut entries_size = 0;
        for _ in 0..count {
            let item = LogItem::decode(buf, &mut entries_size)?;
            items.item_size += item.approximate_size();
            items.items.push(item);
        }
        items.entries_size = entries_size;

        // Backfill the file location and compression type, which are only
        // known at read time.
        for item in items.items.iter_mut() {
            if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
                for ei in entry_indexes.0.iter_mut() {
                    ei.compression_type = compression_type;
                    ei.entries = Some(entries);
                }
            } else if let LogItemContent::Kv(kv) = &mut item.content {
                kv.file_id = Some(entries.id);
            }
        }
        items.checksum = checksum;
        Ok(items)
    }

    /// Upper-bound estimate of the encoded size of this batch.
    pub fn approximate_size(&self) -> usize {
        8 /*count*/ + self.item_size + LOG_BATCH_CHECKSUM_LEN
    }

    /// Returns the first [`EntryIndex`] appeared in this batch.
    pub fn entry_index(&self) -> Option<EntryIndex> {
        for item in &self.items {
            if let LogItemContent::EntryIndexes(entry_indexes) = &item.content {
                return entry_indexes.0.first().cloned();
            }
        }
        None
    }
}
553
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum BufState {
    /// Buffer contains header and optionally entries.
    /// # Invariants
    /// LOG_BATCH_HEADER_LEN <= buf.len()
    Open,
    /// Buffer contains header, entries and footer; the footer checksum is
    /// still the placeholder written by `LogItemBatch::encode` and will be
    /// signed by `prepare_write`, which transitions to `Sealed`.
    /// # Content
    /// (header_offset, entries_len)
    /// # Invariants
    /// LOG_BATCH_HEADER_LEN <= buf.len()
    Encoded(usize, usize),
    /// Buffer contains header, entries and footer; ready to be written. The
    /// footer may be signed with extra information depending on the format
    /// version.
    /// # Content
    /// (header_offset, entries_len)
    Sealed(usize, usize),
    /// Buffer is undergoing writes. User operation will panic under this state.
    Incomplete,
}
577
/// A batch of log items.
///
/// Encoding format:
/// - header = { u56 len | u8 compression type | u64 item offset }
/// - entries = { [entry..] (optionally compressed) | crc32 }
/// - footer = { item batch }
///
/// Size restriction:
/// - The total size of log entries must not exceed 2GiB.
///
/// Error will be raised if a to-be-added log item cannot fit within those
/// limits.
// Calling order:
// 1. Insert some log items
// 2. [`finish_populate`]
// 3. Write to disk with [`encoded_bytes`]
// 4. Update disk location with [`finish_write`]
// 5. Clean up the memory states with [`drain`]. Step 4 can be skipped if the states are not used
// (e.g. to apply memtable).
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct LogBatch {
    // Metadata of the batch: entry indexes, commands and key values.
    item_batch: LogItemBatch,
    // Tracks which stage of the calling order `buf` is in.
    buf_state: BufState,
    // Encode buffer: header placeholder, entry data, and (once populated)
    // checksum + footer.
    buf: Vec<u8>,
}
603
impl Default for LogBatch {
    fn default() -> Self {
        // Default to the standard item capacity.
        Self::with_capacity(DEFAULT_LOG_ITEM_BATCH_CAP)
    }
}
609
610impl LogBatch {
611    /// Creates a new, empty log batch capable of holding at least `cap` log
612    /// items.
613    pub fn with_capacity(cap: usize) -> Self {
614        let mut buf = Vec::with_capacity(4096);
615        buf.resize(LOG_BATCH_HEADER_LEN, 0);
616        Self {
617            item_batch: LogItemBatch::with_capacity(cap),
618            buf_state: BufState::Open,
619            buf,
620        }
621    }
622
    /// Moves all log items of `rhs` into `Self`, leaving `rhs` empty.
    ///
    /// Returns `Error::Full` without modifying either batch if the combined
    /// entry data would exceed the per-batch size limit.
    pub fn merge(&mut self, rhs: &mut Self) -> Result<()> {
        debug_assert!(self.buf_state == BufState::Open && rhs.buf_state == BufState::Open);
        let max_entries_size = (|| {
            fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
            MAX_LOG_ENTRIES_SIZE_PER_BATCH
        })();
        if !rhs.buf.is_empty() {
            // Each buffer carries one LOG_BATCH_HEADER_LEN of non-entry
            // slack, hence the `* 2` allowance on the limit.
            if rhs.buf.len() + self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN * 2 {
                return Err(Error::Full);
            }
            // Mark both buffers incomplete while they are being spliced.
            self.buf_state = BufState::Incomplete;
            rhs.buf_state = BufState::Incomplete;
            // Append rhs's entry data (skipping its header placeholder), then
            // reset rhs's buffer back to an empty header-only state.
            self.buf.extend_from_slice(&rhs.buf[LOG_BATCH_HEADER_LEN..]);
            rhs.buf.shrink_to(MAX_LOG_BATCH_BUFFER_CAP);
            rhs.buf.truncate(LOG_BATCH_HEADER_LEN);
        }
        self.item_batch.merge(&mut rhs.item_batch);
        self.buf_state = BufState::Open;
        rhs.buf_state = BufState::Open;
        Ok(())
    }
645
    /// Adds some protobuf log entries into the log batch.
    ///
    /// Returns `Error::Full` (leaving the batch unchanged) if appending the
    /// entries would push the encoded entry data past the per-batch limit.
    pub fn add_entries<M: MessageExt>(
        &mut self,
        region_id: u64,
        entries: &[M::Entry],
    ) -> Result<()> {
        debug_assert!(self.buf_state == BufState::Open);
        if entries.is_empty() {
            return Ok(());
        }

        let mut entry_indexes = Vec::with_capacity(entries.len());
        self.buf_state = BufState::Incomplete;
        let old_buf_len = self.buf.len();
        let max_entries_size = (|| {
            fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
            MAX_LOG_ENTRIES_SIZE_PER_BATCH
        })();
        for e in entries {
            let buf_offset = self.buf.len();
            e.write_to_vec(&mut self.buf)?;
            if self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN {
                // Roll back everything appended by this call.
                self.buf.truncate(old_buf_len);
                self.buf_state = BufState::Open;
                return Err(Error::Full);
            }
            // Only the log index and encoded length are known here; offsets
            // and file location are filled in later.
            entry_indexes.push(EntryIndex {
                index: M::index(e),
                entry_len: (self.buf.len() - buf_offset) as u32,
                ..Default::default()
            });
        }
        self.item_batch.add_entry_indexes(region_id, entry_indexes);
        self.buf_state = BufState::Open;
        Ok(())
    }
682
    /// Adds some log entries with specified encoded data into the log batch.
    /// Assumes there are the same amount of entry indexes as the encoded data
    /// vectors.
    ///
    /// Returns `Error::Full` (leaving the batch unchanged) if the data would
    /// push the batch past the per-batch entry size limit.
    pub(crate) fn add_raw_entries(
        &mut self,
        region_id: u64,
        mut entry_indexes: Vec<EntryIndex>,
        entries: Vec<Vec<u8>>,
    ) -> Result<()> {
        debug_assert!(entry_indexes.len() == entries.len());
        debug_assert!(self.buf_state == BufState::Open);
        if entry_indexes.is_empty() {
            return Ok(());
        }

        self.buf_state = BufState::Incomplete;
        let old_buf_len = self.buf.len();
        let max_entries_size = (|| {
            fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
            MAX_LOG_ENTRIES_SIZE_PER_BATCH
        })();
        for (ei, e) in entry_indexes.iter_mut().zip(entries.iter()) {
            // Check the size limit before copying, then record the entry's
            // length at its new position in the buffer.
            if e.len() + self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN {
                self.buf.truncate(old_buf_len);
                self.buf_state = BufState::Open;
                return Err(Error::Full);
            }
            let buf_offset = self.buf.len();
            self.buf.extend(e);
            ei.entry_len = (self.buf.len() - buf_offset) as u32;
        }
        self.item_batch.add_entry_indexes(region_id, entry_indexes);
        self.buf_state = BufState::Open;
        Ok(())
    }
718
    /// Adds a command into the log batch.
    ///
    /// Commands are metadata-only; the encode buffer is untouched.
    pub fn add_command(&mut self, region_id: u64, cmd: Command) {
        self.item_batch.add_command(region_id, cmd);
    }
723
    /// Removes a key value pair from the log batch.
    // NOTE(review): unlike `put`/`put_message`, internal keys are not
    // rejected here — confirm that deleting them is intended to be allowed.
    pub fn delete(&mut self, region_id: u64, key: Vec<u8>) {
        self.item_batch.delete(region_id, key);
    }
728
729    /// Adds a protobuf key value pair into the log batch.
730    pub fn put_message<S: Message>(&mut self, region_id: u64, key: Vec<u8>, s: &S) -> Result<()> {
731        if crate::is_internal_key(&key, None) {
732            return Err(Error::InvalidArgument(format!(
733                "key prefix `{:?}` reserved for internal use",
734                crate::INTERNAL_KEY_PREFIX
735            )));
736        }
737        self.item_batch.put_message(region_id, key, s)
738    }
739
740    /// Adds a key value pair into the log batch.
741    pub fn put(&mut self, region_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
742        if crate::is_internal_key(&key, None) {
743            return Err(Error::InvalidArgument(format!(
744                "key prefix `{:?}` reserved for internal use",
745                crate::INTERNAL_KEY_PREFIX
746            )));
747        }
748        self.item_batch.put(region_id, key, value);
749        Ok(())
750    }
751
    /// Adds a key value pair into the log batch, skipping the internal-key
    /// check performed by `put`. For crate-internal use.
    pub(crate) fn put_unchecked(&mut self, region_id: u64, key: Vec<u8>, value: Vec<u8>) {
        self.item_batch.put(region_id, key, value);
    }
755
    /// Returns true if the log batch contains no log item.
    // Entry data is only ever added together with an index item, so an empty
    // item list implies an empty batch.
    pub fn is_empty(&self) -> bool {
        self.item_batch.items.is_empty()
    }
760
    /// Notifies the completion of log item population. User must not add any
    /// more log content after this call. Returns the length of encoded data.
    ///
    /// Internally, encodes and optionally compresses log entries. Sets the
    /// compression type to each entry index.
    pub(crate) fn finish_populate(
        &mut self,
        compression_threshold: usize,
        compression_level: Option<usize>,
    ) -> Result<usize> {
        let _t = StopWatch::new(perf_context!(log_populating_duration));
        debug_assert!(self.buf_state == BufState::Open);
        if self.is_empty() {
            self.buf_state = BufState::Encoded(self.buf.len(), 0);
            return Ok(0);
        }
        self.buf_state = BufState::Incomplete;

        // entries
        // Compress only when enabled and the entry data meets the threshold.
        // NOTE(review): `lz4::append_compress_block` appears to append the
        // compressed block after the existing bytes, with the batch then
        // re-rooted at `header_offset` (the last 16 bytes of the original
        // region become the new header slot) — confirm against its impl.
        let (header_offset, compression_type) = if compression_threshold > 0
            && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
        {
            let buf_len = self.buf.len();
            lz4::append_compress_block(
                &mut self.buf,
                LOG_BATCH_HEADER_LEN,
                compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
            )?;
            (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
        } else {
            (0, CompressionType::None)
        };

        // checksum
        // Appended only when there is entry data after the active header.
        if self.buf.len() > header_offset + LOG_BATCH_HEADER_LEN {
            let checksum = crc32(&self.buf[header_offset + LOG_BATCH_HEADER_LEN..]);
            self.buf.encode_u32_le(checksum)?;
        }
        // `footer_roffset` records the start offset of encoded `self.item_batch`
        let footer_roffset = self.buf.len() - header_offset;

        // footer
        self.item_batch.encode(&mut self.buf)?;
        self.item_batch.finish_populate(compression_type);

        // header
        // Packed as (total length << 8) | compression type, i.e. u56 + u8.
        let len =
            (((self.buf.len() - header_offset) as u64) << 8) | u64::from(compression_type.to_u8());
        (&mut self.buf[header_offset..header_offset + 8]).write_u64::<BigEndian>(len)?;
        (&mut self.buf[header_offset + 8..header_offset + 16])
            .write_u64::<BigEndian>(footer_roffset as u64)?;

        #[cfg(feature = "failpoints")]
        {
            // Deliberately corrupt one byte of the footer / entries to
            // exercise recovery paths in tests.
            // NOTE(review): these indexes are only correct relative to the
            // buffer start when no compression occurred (header_offset == 0)
            // — confirm the failpoint tests run uncompressed.
            let corrupted_items = || {
                fail::fail_point!("log_batch::corrupted_items", |_| true);
                false
            };
            if corrupted_items() {
                self.buf[footer_roffset] += 1;
            }
            let corrupted_entries = || {
                fail::fail_point!("log_batch::corrupted_entries", |_| true);
                false
            };
            if corrupted_entries() {
                assert!(footer_roffset > LOG_BATCH_HEADER_LEN);
                self.buf[footer_roffset - 1] += 1;
            }
        }

        self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
        Ok(self.buf.len() - header_offset)
    }
835
    /// Make preparations for the write of `LogBatch`.
    ///
    /// Signs the footer checksum with `file_context` and transitions the
    /// buffer into the `Sealed` state.
    #[inline]
    pub(crate) fn prepare_write(&mut self, file_context: &LogFileContext) -> Result<()> {
        match self.buf_state {
            // `BufState::Sealed` means that `LogBatch` is under a repeated state of dumping.
            BufState::Encoded(header_offset, entries_len)
            | BufState::Sealed(header_offset, entries_len) => {
                // Only the trailing LOG_BATCH_CHECKSUM_LEN bytes of the
                // passed slice are rewritten by `LogItemBatch::prepare_write`.
                self.item_batch
                    .prepare_write(&mut self.buf[header_offset + entries_len..], file_context)?;
                self.buf_state = BufState::Sealed(header_offset, entries_len);
            }
            _ => unreachable!(),
        }
        Ok(())
    }
851
    /// Returns a slice of bytes containing encoded data of this log batch.
    /// Assumes called after a successful call of [`prepare_write`].
    pub(crate) fn encoded_bytes(&self) -> &[u8] {
        match self.buf_state {
            // Bytes before `header_offset` are stale (pre-compression data)
            // and must not be written out.
            BufState::Sealed(header_offset, _) => &self.buf[header_offset..],
            _ => unreachable!(),
        }
    }
860
    /// Notifies the completion of a storage write with the written location.
    ///
    /// Internally sets the file locations of each log entry indexes.
    pub(crate) fn finish_write(&mut self, mut handle: FileBlockHandle) {
        debug_assert!(matches!(self.buf_state, BufState::Sealed(_, _)));
        if !self.is_empty() {
            // adjust log batch handle to log entries handle.
            handle.offset += LOG_BATCH_HEADER_LEN as u64;
            match self.buf_state {
                BufState::Sealed(_, entries_len) => {
                    // The written block must at least contain header +
                    // entries; the remainder is the footer.
                    debug_assert!(LOG_BATCH_HEADER_LEN + entries_len < handle.len);
                    handle.len = entries_len;
                }
                _ => unreachable!(),
            }
        }
        self.item_batch.finish_write(handle);
    }
879
880    /// Consumes log items into an iterator.
881    pub(crate) fn drain(&mut self) -> LogItemDrain {
882        debug_assert!(!matches!(self.buf_state, BufState::Incomplete));
883
884        self.buf.shrink_to(MAX_LOG_BATCH_BUFFER_CAP);
885        self.buf.truncate(LOG_BATCH_HEADER_LEN);
886        self.buf_state = BufState::Open;
887        self.item_batch.drain()
888    }
889
890    /// Returns approximate encoded size of this log batch. Might be larger
891    /// than the actual size.
892    pub fn approximate_size(&self) -> usize {
893        if self.is_empty() {
894            0
895        } else {
896            match self.buf_state {
897                BufState::Open => {
898                    self.buf.len() + LOG_BATCH_CHECKSUM_LEN + self.item_batch.approximate_size()
899                }
900                BufState::Encoded(header_offset, _) => self.buf.len() - header_offset,
901                BufState::Sealed(header_offset, _) => self.buf.len() - header_offset,
902                s => {
903                    error!("querying incomplete log batch with state {s:?}");
904                    0
905                }
906            }
907        }
908    }
909
910    /// Returns header information from some bytes.
911    ///
912    /// The information includes:
913    ///
914    /// + The offset of log items
915    /// + The compression type of entries
916    /// + The total length of this log batch.
917    pub(crate) fn decode_header(buf: &mut SliceReader) -> Result<(usize, CompressionType, usize)> {
918        if buf.len() < LOG_BATCH_HEADER_LEN {
919            return Err(Error::Corruption(format!(
920                "Log batch header too short: {}",
921                buf.len()
922            )));
923        }
924
925        let len_and_type = codec::decode_u64(buf)? as usize;
926        let compression_type = CompressionType::from_u8(len_and_type as u8)?;
927        let len = len_and_type >> 8;
928        let offset = codec::decode_u64(buf)? as usize;
929        if offset > len {
930            return Err(Error::Corruption(
931                "Log item offset exceeds log batch length".to_owned(),
932            ));
933        } else if offset < LOG_BATCH_HEADER_LEN {
934            return Err(Error::Corruption(
935                "Log item offset is smaller than log batch header length".to_owned(),
936            ));
937        }
938        Ok((offset, compression_type, len))
939    }
940
941    /// Unfolds bytes of multiple user entries from an encoded block.
942    pub(crate) fn decode_entries_block(
943        buf: &[u8],
944        handle: FileBlockHandle,
945        compression: CompressionType,
946    ) -> Result<Vec<u8>> {
947        if handle.len > 0 {
948            let _ = verify_checksum_with_signature(&buf[0..handle.len], None)?;
949            match compression {
950                CompressionType::None => Ok(buf[..handle.len - LOG_BATCH_CHECKSUM_LEN].to_owned()),
951                CompressionType::Lz4 => {
952                    let decompressed =
953                        lz4::decompress_block(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])?;
954                    Ok(decompressed)
955                }
956            }
957        } else {
958            Ok(Vec::new())
959        }
960    }
961}
962
impl ReactiveBytes for LogBatch {
    // Seals the batch against `ctx` (signing the item-section checksum) and
    // returns the encoded payload. Assumes the batch has already been
    // populated; `prepare_write` panics on any other buffer state.
    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8] {
        self.prepare_write(ctx).unwrap();
        self.encoded_bytes()
    }
}
969
970/// Verifies the checksum of a slice of bytes that sequentially holds data and
971/// checksum. The checksum field may be signed by XOR-ing with an u32.
972///
973/// Returns the checksum of the buffer without signature.
974fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<u32> {
975    if buf.len() <= LOG_BATCH_CHECKSUM_LEN {
976        return Err(Error::Corruption(format!(
977            "Content too short {}",
978            buf.len()
979        )));
980    }
981    let actual = crc32(&buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN]);
982    let mut expected = codec::decode_u32_le(&mut &buf[buf.len() - LOG_BATCH_CHECKSUM_LEN..])?;
983    if let Some(signature) = signature {
984        expected ^= signature;
985    }
986    if actual != expected {
987        return Err(Error::Corruption(format!(
988            "Checksum expected {expected} but got {actual}"
989        )));
990    }
991    Ok(actual)
992}
993
// Process-wide counter for allocating atomic-group ids; only uniqueness
// between concurrent groups matters (see `AtomicGroupBuilder::default`).
lazy_static! {
    static ref ATOMIC_GROUP_ID: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
}
// Internal key that marks a log batch as part of an atomic group.
pub(crate) const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
// Value layout: <status> (a single `AtomicGroupStatus` byte).
const ATOMIC_GROUP_VALUE_LEN: usize = 1;
1000
// Position of a log batch within an atomic group. Stored as the one-byte
// value of the `ATOMIC_GROUP_KEY` marker.
#[repr(u8)]
#[derive(Clone, Copy, FromPrimitive, Debug, PartialEq)]
pub(crate) enum AtomicGroupStatus {
    Begin = 0,
    Middle = 1,
    End = 2,
}
1008
1009impl AtomicGroupStatus {
1010    /// Whether the log batch with `item` belongs to an atomic group.
1011    pub fn parse(item: &LogItem) -> Option<(u64, AtomicGroupStatus)> {
1012        if let LogItemContent::Kv(KeyValue {
1013            op_type,
1014            key,
1015            value,
1016            ..
1017        }) = &item.content
1018        {
1019            if *op_type == OpType::Put
1020                && crate::is_internal_key(key, Some(ATOMIC_GROUP_KEY))
1021                && value.as_ref().unwrap().len() == ATOMIC_GROUP_VALUE_LEN
1022            {
1023                let value = &mut value.as_ref().unwrap().as_slice();
1024                return Some((
1025                    item.raft_group_id,
1026                    AtomicGroupStatus::from_u8(value[0]).unwrap(),
1027                ));
1028            }
1029        }
1030        None
1031    }
1032}
1033
/// Group multiple log batches as an atomic operation.
///
/// Caveats:
/// (1) The atomicity is provided at persistent level. This means, once an
/// atomic group fails, the in-memory value will be inconsistent with what can
/// be recovered from on-disk data files.
/// (2) The recovery replay order will be different from original write order.
/// Log batches in a completed atomic group will be replayed as if they were
/// written together at an arbitrary time point within the group.
/// (3) Atomic group is implemented by embedding normal key-values into user
/// writes. These keys have internal key prefix and will not be replayed into
/// memtable. However, when read by an older version, they will behave as user
/// keys. They may also belong to Raft Group that doesn't exist before.
///
/// In practice, we only use atomic group for rewrite operation. (In fact,
/// atomic group markers in append queue are simply ignored.) Rewrite doesn't
/// change the value of entries, just locations. So first issue doesn't affect
/// correctness. There could only be one worker doing the rewrite. So second
/// issue doesn't change observed write order because there's no mixed write.
pub(crate) struct AtomicGroupBuilder {
    // Unique id shared by every marker this builder emits.
    id: u64,
    // Last status flushed; `None` until `begin` is called.
    status: Option<AtomicGroupStatus>,
}
1057
impl Default for AtomicGroupBuilder {
    // Creates a builder with a fresh group id drawn from the global counter.
    fn default() -> Self {
        Self {
            // We only care there's no collision between concurrent groups.
            id: ATOMIC_GROUP_ID.fetch_add(1, Ordering::Relaxed),
            status: None,
        }
    }
}
1067
1068impl AtomicGroupBuilder {
1069    /// Each log batch can only carry one atomic group marker. If multiple are
1070    /// present only the first is recognized.
1071    pub fn begin(&mut self, lb: &mut LogBatch) {
1072        fail::fail_point!("atomic_group::begin");
1073        assert_eq!(self.status, None);
1074        self.status = Some(AtomicGroupStatus::Begin);
1075        self.flush(lb);
1076    }
1077
1078    pub fn add(&mut self, lb: &mut LogBatch) {
1079        fail::fail_point!("atomic_group::add");
1080        assert!(matches!(
1081            self.status,
1082            Some(AtomicGroupStatus::Begin | AtomicGroupStatus::Middle)
1083        ));
1084        self.status = Some(AtomicGroupStatus::Middle);
1085        self.flush(lb);
1086    }
1087
1088    pub fn end(&mut self, lb: &mut LogBatch) {
1089        assert!(matches!(
1090            self.status,
1091            Some(AtomicGroupStatus::Begin | AtomicGroupStatus::Middle)
1092        ));
1093        self.status = Some(AtomicGroupStatus::End);
1094        self.flush(lb);
1095    }
1096
1097    #[inline]
1098    fn flush(&self, lb: &mut LogBatch) {
1099        let mut s = Vec::with_capacity(ATOMIC_GROUP_VALUE_LEN);
1100        s.push(self.status.unwrap() as u8);
1101        lb.put_unchecked(self.id, crate::make_internal_key(ATOMIC_GROUP_KEY), s);
1102    }
1103
1104    #[cfg(test)]
1105    pub fn with_id(id: u64) -> Self {
1106        Self { id, status: None }
1107    }
1108}
1109
1110#[cfg(test)]
1111mod tests {
1112    use super::*;
1113    use crate::pipe_log::{LogQueue, Version};
1114    use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt};
1115    use protobuf::parse_from_bytes;
1116    use raft::eraftpb::Entry;
1117    use strum::IntoEnumIterator;
1118
1119    fn decode_entries_from_bytes<M: MessageExt>(
1120        buf: &[u8],
1121        entry_indexes: &[EntryIndex],
1122        _encoded: bool,
1123    ) -> Vec<M::Entry> {
1124        let mut entries = Vec::with_capacity(entry_indexes.len());
1125        for ei in entry_indexes {
1126            let block =
1127                LogBatch::decode_entries_block(buf, ei.entries.unwrap(), ei.compression_type)
1128                    .unwrap();
1129            entries.push(
1130                parse_from_bytes(
1131                    &block[ei.entry_offset as usize..(ei.entry_offset + ei.entry_len) as usize],
1132                )
1133                .unwrap(),
1134            );
1135        }
1136        entries
1137    }
1138
1139    #[test]
1140    fn test_entry_indexes_enc_dec() {
1141        fn encode_and_decode(entry_indexes: &mut [EntryIndex]) -> EntryIndexes {
1142            let mut entries_size = 0;
1143            for idx in entry_indexes.iter_mut() {
1144                idx.entry_offset = entries_size;
1145                entries_size += idx.entry_len;
1146            }
1147            let entry_indexes = EntryIndexes(entry_indexes.to_vec());
1148
1149            let mut encoded = vec![];
1150            entry_indexes.encode(&mut encoded).unwrap();
1151            let mut bytes_slice = encoded.as_slice();
1152            let mut decoded_entries_size = 0;
1153            let decoded_indexes =
1154                EntryIndexes::decode(&mut bytes_slice, &mut decoded_entries_size).unwrap();
1155            assert_eq!(bytes_slice.len(), 0);
1156            assert!(decoded_indexes.approximate_size() >= encoded.len());
1157            assert_eq!(decoded_entries_size, entries_size);
1158            decoded_indexes
1159        }
1160
1161        let entry_indexes = vec![Vec::new(), generate_entry_indexes_opt(7, 17, None)];
1162        for mut idxs in entry_indexes.into_iter() {
1163            let decoded = encode_and_decode(&mut idxs);
1164            assert_eq!(idxs, decoded.0);
1165        }
1166
1167        let mut entry_indexes_with_file_id =
1168            generate_entry_indexes_opt(7, 17, Some(FileId::new(LogQueue::Append, 7)));
1169        let mut decoded = encode_and_decode(&mut entry_indexes_with_file_id);
1170        assert_ne!(entry_indexes_with_file_id, decoded.0);
1171        for i in decoded.0.iter_mut() {
1172            i.entries = None;
1173        }
1174        assert_ne!(entry_indexes_with_file_id, decoded.0);
1175    }
1176
1177    #[test]
1178    fn test_command_enc_dec() {
1179        let cmds = vec![Command::Clean, Command::Compact { index: 7 }];
1180        let invalid_command_type = 7;
1181        for cmd in cmds.into_iter() {
1182            let mut encoded = vec![];
1183            cmd.encode(&mut encoded);
1184            let mut bytes_slice = encoded.as_slice();
1185            let decoded_cmd = Command::decode(&mut bytes_slice).unwrap();
1186            assert_eq!(bytes_slice.len(), 0);
1187            assert!(decoded_cmd.approximate_size() >= encoded.len());
1188            assert_eq!(cmd, decoded_cmd);
1189
1190            encoded[0] = invalid_command_type;
1191            let expected = format!("Unrecognized command type: {invalid_command_type}");
1192            assert!(matches!(
1193                Command::decode(&mut encoded.as_slice()),
1194                Err(Error::Corruption(m)) if m == expected
1195            ));
1196        }
1197    }
1198
1199    #[test]
1200    fn test_kv_enc_dec() {
1201        let kvs = vec![
1202            KeyValue::new(OpType::Put, b"put".to_vec(), Some(b"put_v".to_vec())),
1203            KeyValue::new(OpType::Del, b"del".to_vec(), None),
1204        ];
1205        let invalid_op_type = 7;
1206        for kv in kvs.into_iter() {
1207            let mut encoded = vec![];
1208            kv.encode(&mut encoded).unwrap();
1209            let mut bytes_slice = encoded.as_slice();
1210            let decoded_kv = KeyValue::decode(&mut bytes_slice).unwrap();
1211            assert_eq!(bytes_slice.len(), 0);
1212            assert!(decoded_kv.approximate_size() >= encoded.len());
1213            assert_eq!(kv, decoded_kv);
1214
1215            encoded[0] = invalid_op_type;
1216            let expected = format!("Unrecognized op type: {invalid_op_type}");
1217            assert!(matches!(
1218                KeyValue::decode(&mut encoded.as_slice()),
1219                Err(Error::Corruption(m)) if m == expected
1220            ));
1221        }
1222
1223        let del_with_value = KeyValue::new(OpType::Del, b"del".to_vec(), Some(b"del_v".to_vec()));
1224        let mut encoded = vec![];
1225        del_with_value.encode(&mut encoded).unwrap();
1226        let mut bytes_slice = encoded.as_slice();
1227        let decoded_kv = KeyValue::decode(&mut bytes_slice).unwrap();
1228        assert_eq!(bytes_slice.len(), 0);
1229        assert!(decoded_kv.value.is_none());
1230    }
1231
    #[test]
    fn test_log_item_enc_dec() {
        // One item of each content kind: entry indexes, command, key-value.
        let items = vec![
            LogItem::new_entry_indexes(7, generate_entry_indexes_opt(7, 17, None)),
            LogItem::new_command(17, Command::Compact { index: 7 }),
            LogItem::new_kv(27, OpType::Put, b"key".to_vec(), Some(b"value".to_vec())),
        ];
        let invalid_log_item_type = 7;
        for mut item in items.into_iter() {
            // Assign entry offsets sequentially, tracking the total payload
            // size to compare against the decode output below.
            let mut entries_size = 0;
            if let LogItemContent::EntryIndexes(EntryIndexes(indexes)) = &mut item.content {
                for index in indexes.iter_mut() {
                    index.entry_offset = entries_size;
                    entries_size += index.entry_len;
                }
            }
            let mut encoded = vec![];
            item.encode(&mut encoded).unwrap();
            let mut bytes_slice = encoded.as_slice();
            let mut decoded_entries_size = 0;
            let decoded_item =
                LogItem::decode(&mut bytes_slice, &mut decoded_entries_size).unwrap();
            // Round-trip must consume every byte and preserve the item.
            assert_eq!(bytes_slice.len(), 0);
            assert_eq!(decoded_entries_size, entries_size);
            assert!(decoded_item.approximate_size() >= encoded.len());
            assert_eq!(item, decoded_item);

            // consume raft group id.
            bytes_slice = encoded.as_slice();
            codec::decode_var_u64(&mut bytes_slice).unwrap();
            // Corrupt the item type byte (first byte after the varint group
            // id) and expect a corruption error.
            let next_u8 = encoded.len() - bytes_slice.len();
            encoded[next_u8] = invalid_log_item_type;
            let expected = format!("Unrecognized log item type: {invalid_log_item_type}");
            assert!(matches!(
                LogItem::decode(&mut encoded.as_slice(), &mut decoded_entries_size),
                Err(Error::Corruption(m)) if m == expected
            ));
        }
    }
1271
    #[test]
    fn test_log_item_batch_enc_dec() {
        // An empty batch plus one populated with every item kind.
        let mut batches = vec![LogItemBatch::default()];
        let mut batch = LogItemBatch::default();
        batch.add_entry_indexes(7, generate_entry_indexes_opt(1, 5, None /* file_id */));
        batch.add_entry_indexes(
            7 + 100,
            generate_entry_indexes_opt(100, 105, None /* file_id */),
        );
        batch.add_command(7, Command::Clean);
        batch.put(7, b"key".to_vec(), b"value".to_vec());
        batch.delete(7, b"key2".to_vec());
        batches.push(batch);

        for batch in batches.into_iter() {
            for compression_type in [CompressionType::Lz4, CompressionType::None] {
                let mut batch = batch.clone();
                batch.finish_populate(compression_type);
                let mut encoded_batch = vec![];
                batch.encode(&mut encoded_batch).unwrap();
                let file_context =
                    LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
                // Sign the encoded bytes and mark the write finished so the
                // batch reaches the same state a real write would leave it in
                // before comparing with the decoded copy.
                batch
                    .prepare_write(&mut encoded_batch, &file_context)
                    .unwrap();
                batch.finish_write(FileBlockHandle::dummy(LogQueue::Append));
                let decoded_batch = LogItemBatch::decode(
                    &mut encoded_batch.as_slice(),
                    FileBlockHandle::dummy(LogQueue::Append),
                    compression_type,
                    &file_context,
                )
                .unwrap();
                assert!(decoded_batch.approximate_size() >= encoded_batch.len());
                assert_eq!(batch, decoded_batch);
            }
        }
    }
1310
    #[test]
    fn test_log_batch_enc_dec() {
        // Full encode/decode round-trip of a `LogBatch` for one combination
        // of compression and format version, including negative cases.
        fn decode_and_encode(
            mut batch: LogBatch,
            compress: bool,
            version: Version,
            entry_data: &[u8],
        ) {
            // Test call protocol violation.
            assert!(catch_unwind_silent(|| batch.encoded_bytes()).is_err());
            assert!(catch_unwind_silent(
                || batch.finish_write(FileBlockHandle::dummy(LogQueue::Append))
            )
            .is_err());
            let mocked_file_block_handle = FileBlockHandle {
                id: FileId::new(LogQueue::Append, 12),
                len: 0,
                offset: 0,
            };
            // Populate, seal and "write" the batch.
            let old_approximate_size = batch.approximate_size();
            let len = batch.finish_populate(usize::from(compress), None).unwrap();
            assert!(old_approximate_size >= len);
            assert_eq!(batch.approximate_size(), len);
            let mut batch_handle = mocked_file_block_handle;
            batch_handle.len = len;
            let file_context = LogFileContext::new(batch_handle.id, version);
            batch.prepare_write(&file_context).unwrap();
            batch.finish_write(batch_handle);
            let encoded = batch.encoded_bytes();
            assert_eq!(encoded.len(), len);
            // An empty batch encodes to nothing; its header cannot decode.
            if len < LOG_BATCH_HEADER_LEN {
                assert_eq!(len, 0);
                let expected = "Log batch header too short: 0";
                assert!(matches!(
                    LogBatch::decode_header(&mut &*encoded),
                    Err(Error::Corruption(m)) if m == expected
                ));
                return;
            }

            let item_batch = batch.item_batch.clone();
            // decode item batch
            let mut bytes_slice = encoded;
            let (offset, compression_type, len) =
                LogBatch::decode_header(&mut bytes_slice).unwrap();
            assert_eq!(len, encoded.len());
            assert_eq!(bytes_slice.len() + LOG_BATCH_HEADER_LEN, encoded.len());
            let mut entries_handle = mocked_file_block_handle;
            entries_handle.offset = LOG_BATCH_HEADER_LEN as u64;
            entries_handle.len = offset - LOG_BATCH_HEADER_LEN;
            let file_context = LogFileContext::new(entries_handle.id, version);
            {
                // Decoding with wrong compression type is okay.
                LogItemBatch::decode(
                    &mut &encoded[offset..],
                    entries_handle,
                    if compression_type == CompressionType::None {
                        CompressionType::Lz4
                    } else {
                        CompressionType::None
                    },
                    &file_context,
                )
                .unwrap();
                // Decode with wrong file number.
                if version.has_log_signing() {
                    LogItemBatch::decode(
                        &mut &encoded[offset..],
                        entries_handle,
                        compression_type,
                        &LogFileContext::new(FileId::new(LogQueue::Append, u64::MAX), version),
                    )
                    .unwrap_err();
                }
                // Decode with wrong version.
                LogItemBatch::decode(
                    &mut &encoded[offset..],
                    entries_handle,
                    compression_type,
                    &LogFileContext::new(
                        file_context.id,
                        if version == Version::V1 {
                            Version::V2
                        } else {
                            Version::V1
                        },
                    ),
                )
                .unwrap_err();
            }
            // The correct parameters must round-trip the item batch.
            let decoded_item_batch = LogItemBatch::decode(
                &mut &encoded[offset..],
                entries_handle,
                compression_type,
                &file_context,
            )
            .unwrap();
            assert_eq!(decoded_item_batch, item_batch);
            assert!(decoded_item_batch.approximate_size() >= len - offset);

            // Decode the entries block and compare against regenerated
            // originals.
            let entries = &encoded[LOG_BATCH_HEADER_LEN..offset];
            for item in decoded_item_batch.items.iter() {
                if let LogItemContent::EntryIndexes(entry_indexes) = &item.content {
                    if !entry_indexes.0.is_empty() {
                        let (begin, end) = (
                            entry_indexes.0.first().unwrap().index,
                            entry_indexes.0.last().unwrap().index + 1,
                        );
                        let origin_entries = generate_entries(begin, end, Some(entry_data));
                        let decoded_entries =
                            decode_entries_from_bytes::<Entry>(entries, &entry_indexes.0, false);
                        assert_eq!(origin_entries, decoded_entries);
                    }
                }
            }
        }

        // Fixtures: an empty batch, a mixed batch with payloads, and a
        // batch of empty entries.
        let mut batches = vec![(LogBatch::default(), Vec::new())];
        let mut batch = LogBatch::default();
        let entry_data = vec![b'x'; 1024];
        batch
            .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
            .unwrap();
        batch.add_command(7, Command::Clean);
        batch.put(7, b"key".to_vec(), b"value".to_vec()).unwrap();
        batch.delete(7, b"key2".to_vec());
        batch
            .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
            .unwrap();
        batches.push((batch, entry_data));
        let mut batch = LogBatch::default();
        batch
            .add_entries::<Entry>(17, &generate_entries(0, 1, None))
            .unwrap();
        batch
            .add_entries::<Entry>(27, &generate_entries(1, 11, None))
            .unwrap();
        batches.push((batch, Vec::new()));

        // Validate with different Versions
        for version in Version::iter() {
            for compress in [true, false] {
                for (batch, entry_data) in batches.clone().into_iter() {
                    decode_and_encode(batch, compress, version, &entry_data);
                }
            }
        }
    }
1459
    #[test]
    fn test_log_batch_merge() {
        let region_id = 8;
        let mut entries = Vec::new();
        let mut kvs = Vec::new();
        let data = vec![b'x'; 1024];
        let file_id = FileId::dummy(LogQueue::Append);
        let file_context = LogFileContext::new(file_id, Version::default());

        // First batch: entries [1, 11) and kvs k0..=k2.
        let mut batch1 = LogBatch::default();
        entries.push(generate_entries(1, 11, Some(&data)));
        batch1
            .add_entries::<Entry>(region_id, entries.last().unwrap())
            .unwrap();
        for i in 0..=2 {
            let (k, v) = (format!("k{i}").into_bytes(), format!("v{i}").into_bytes());
            batch1.put(region_id, k.clone(), v.clone()).unwrap();
            kvs.push((k, v));
        }

        // Merging an empty batch is a no-op.
        batch1.merge(&mut LogBatch::default()).unwrap();

        // Second batch: entries [11, 21) and kvs k3..=k5.
        let mut batch2 = LogBatch::default();
        entries.push(generate_entries(11, 21, Some(&data)));
        batch2
            .add_entries::<Entry>(region_id, entries.last().unwrap())
            .unwrap();
        for i in 3..=5 {
            let (k, v) = (format!("k{i}").into_bytes(), format!("v{i}").into_bytes());
            batch2.put(region_id, k.clone(), v.clone()).unwrap();
            kvs.push((k, v));
        }

        // Merge drains batch2 into batch1.
        batch1.merge(&mut batch2).unwrap();
        assert!(batch2.is_empty());

        let len = batch1.finish_populate(0, None).unwrap();
        batch1.prepare_write(&file_context).unwrap();
        let encoded = batch1.encoded_bytes();
        assert_eq!(len, encoded.len());

        // decode item batch
        let (offset, compression_type, len) = LogBatch::decode_header(&mut &*encoded).unwrap();
        assert_eq!(encoded.len(), len);
        let decoded_item_batch = LogItemBatch::decode(
            &mut &encoded[offset..],
            FileBlockHandle {
                id: file_id,
                offset: 0,
                len: offset - LOG_BATCH_HEADER_LEN,
            },
            compression_type,
            &file_context,
        )
        .unwrap();

        // decode and assert entries
        let entry_bytes = &encoded[LOG_BATCH_HEADER_LEN..offset];
        for item in decoded_item_batch.items.iter() {
            match &item.content {
                LogItemContent::EntryIndexes(entry_indexes) => {
                    // Merged items must replay in original insertion order.
                    let decoded_entries =
                        decode_entries_from_bytes::<Entry>(entry_bytes, &entry_indexes.0, false);
                    assert_eq!(entries.remove(0), decoded_entries);
                }
                LogItemContent::Kv(kv) => {
                    let (k, v) = kvs.remove(0);
                    assert_eq!(OpType::Put, kv.op_type);
                    assert_eq!(k, kv.key);
                    assert_eq!(&v, kv.value.as_ref().unwrap());
                }
                _ => unreachable!(),
            }
        }
    }
1535
1536    #[test]
1537    fn test_empty_log_batch() {
1538        let mut batch = LogBatch::default();
1539        assert!(batch.is_empty());
1540        batch.add_entries::<Entry>(0, &Vec::new()).unwrap();
1541        assert!(batch.is_empty());
1542        batch.add_raw_entries(0, Vec::new(), Vec::new()).unwrap();
1543        assert!(batch.is_empty());
1544        // Encoding empty LogBatch.
1545        {
1546            let mocked_file_block_handles = FileBlockHandle {
1547                id: FileId::new(LogQueue::Append, 12),
1548                len: 0,
1549                offset: 0,
1550            };
1551            let buf_len = batch.buf.len();
1552            let len = batch.finish_populate(1, None).unwrap();
1553            assert!(len == 0);
1554            assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
1555            let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
1556            batch.prepare_write(&file_context).unwrap();
1557            assert!(batch.is_empty());
1558            assert_eq!(batch.buf_state, BufState::Sealed(buf_len, 0));
1559        }
1560    }
1561
1562    #[test]
1563    fn test_internal_key() {
1564        let mut batch = LogBatch::default();
1565        assert!(matches!(
1566            batch
1567                .put(0, crate::make_internal_key(&[0]), b"v".to_vec())
1568                .unwrap_err(),
1569            Error::InvalidArgument(_)
1570        ));
1571        assert!(matches!(
1572            batch
1573                .put_message(0, crate::make_internal_key(ATOMIC_GROUP_KEY), &Entry::new())
1574                .unwrap_err(),
1575            Error::InvalidArgument(_)
1576        ));
1577    }
1578
1579    #[test]
1580    fn test_header_corruption() {
1581        let region_id = 7;
1582        let data = vec![b'x'; 16];
1583        let mut batch = LogBatch::default();
1584        batch
1585            .add_entries::<Entry>(region_id, &generate_entries(1, 11, Some(&data)))
1586            .unwrap();
1587        batch
1588            .put(region_id, b"key".to_vec(), b"value".to_vec())
1589            .unwrap();
1590        // enable compression so that len_and_type > len.
1591        batch.finish_populate(1, None).unwrap();
1592        let file_context = LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
1593        batch.prepare_write(&file_context).unwrap();
1594        let encoded = batch.encoded_bytes();
1595
1596        let mut copy = encoded.to_owned();
1597        copy.truncate(LOG_BATCH_HEADER_LEN - 1);
1598        assert!(LogBatch::decode_header(&mut copy.as_slice())
1599            .unwrap_err()
1600            .to_string()
1601            .contains("Log batch header too short"));
1602
1603        let mut copy = encoded.to_owned();
1604        (&mut copy[LOG_BATCH_HEADER_LEN - 8..LOG_BATCH_HEADER_LEN])
1605            .write_u64::<BigEndian>(encoded.len() as u64 + 1)
1606            .unwrap();
1607        assert!(LogBatch::decode_header(&mut copy.as_slice())
1608            .unwrap_err()
1609            .to_string()
1610            .contains("Log item offset exceeds log batch length"));
1611
1612        let mut copy = encoded.to_owned();
1613        (&mut copy[LOG_BATCH_HEADER_LEN - 8..LOG_BATCH_HEADER_LEN])
1614            .write_u64::<BigEndian>(LOG_BATCH_HEADER_LEN as u64 - 1)
1615            .unwrap();
1616        assert!(LogBatch::decode_header(&mut copy.as_slice())
1617            .unwrap_err()
1618            .to_string()
1619            .contains("Log item offset is smaller than log batch header length"));
1620    }
1621
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_log_batch_add_entry_and_encode(b: &mut test::Bencher) {
        use rand::{thread_rng, Rng};
        // Adds `entries` under `regions` random raft groups, encodes the
        // batch, then drains it so the buffer is reused across iterations.
        fn details(log_batch: &mut LogBatch, entries: &[Entry], regions: usize) {
            for _ in 0..regions {
                log_batch
                    .add_entries::<Entry>(thread_rng().gen(), entries)
                    .unwrap();
            }
            log_batch.finish_populate(0, None).unwrap();
            let _ = log_batch.drain();
        }
        let data: Vec<u8> = (0..128).map(|_| thread_rng().gen()).collect();
        let entries = generate_entries(1, 11, Some(&data));
        let mut log_batch = LogBatch::default();
        // warm up
        details(&mut log_batch, &entries, 100);
        b.iter(move || {
            details(&mut log_batch, &entries, 100);
        });
    }
1644
1645    #[test]
1646    fn test_log_batch_sign_signature_repeatedly() {
1647        // Set a LogBatch and encode the LogBatch by `finish_populate`.
1648        let mut batch = LogBatch::default();
1649        batch
1650            .add_entries::<Entry>(17, &generate_entries(0, 1, None))
1651            .unwrap();
1652        batch
1653            .add_entries::<Entry>(27, &generate_entries(1, 11, None))
1654            .unwrap();
1655
1656        let mocked_file_block_handles = [
1657            FileBlockHandle {
1658                id: FileId::new(LogQueue::Append, 12),
1659                len: 0,
1660                offset: 0,
1661            },
1662            FileBlockHandle {
1663                id: FileId::new(LogQueue::Append, 18),
1664                len: 0,
1665                offset: 0,
1666            },
1667            FileBlockHandle {
1668                id: FileId::new(LogQueue::Append, 2001),
1669                len: 0,
1670                offset: 0,
1671            },
1672        ];
1673        let old_approximate_size = batch.approximate_size();
1674        let len = batch.finish_populate(1, None).unwrap();
1675        assert!(old_approximate_size >= len);
1676        assert_eq!(batch.approximate_size(), len);
1677        let checksum = batch.item_batch.checksum;
1678
1679        // Repeatedly sign signature to this batch, followed by decoding the signature
1680        // and verifying the checksum.
1681        for handle in mocked_file_block_handles {
1682            let mut batch_handle = handle;
1683            batch_handle.len = len;
1684            let file_context = LogFileContext::new(batch_handle.id, Version::V2);
1685            batch.prepare_write(&file_context).unwrap();
1686            assert_eq!(batch.approximate_size(), len);
1687            let encoded = batch.encoded_bytes();
1688            assert_eq!(encoded.len(), len);
1689            let mut bytes_slice = encoded;
1690            let (offset, _, _) = LogBatch::decode_header(&mut bytes_slice).unwrap();
1691            let expected =
1692                verify_checksum_with_signature(&encoded[offset..], file_context.get_signature())
1693                    .unwrap();
1694            assert_eq!(expected, checksum);
1695        }
1696    }
1697}