1use std::fmt::Debug;
4use std::io::BufRead;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::{mem, u64};
8
9use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
10use log::error;
11use num_derive::FromPrimitive;
12use num_traits::FromPrimitive;
13use protobuf::Message;
14
15use crate::codec::{self, NumberEncoder};
16use crate::memtable::EntryIndex;
17use crate::metrics::StopWatch;
18use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext, ReactiveBytes};
19use crate::util::{crc32, lz4};
20use crate::{perf_context, Error, Result};
21
// Log batch header: 8 bytes of (length << 8 | compression type) followed by
// an 8-byte offset to the item section.
pub(crate) const LOG_BATCH_HEADER_LEN: usize = 16;
// Size of the CRC32 trailer appended to each encoded section.
pub(crate) const LOG_BATCH_CHECKSUM_LEN: usize = 4;

// On-disk type tags for the three kinds of log items.
const TYPE_ENTRIES: u8 = 0x01;
const TYPE_COMMAND: u8 = 0x02;
const TYPE_KV: u8 = 0x3;

// On-disk sub-type tags for `Command`.
const CMD_CLEAN: u8 = 0x01;
const CMD_COMPACT: u8 = 0x02;

// Default capacity reserved for a batch's item vector.
const DEFAULT_LOG_ITEM_BATCH_CAP: usize = 64;
// Encode buffers are shrunk back to at most this size after draining.
const MAX_LOG_BATCH_BUFFER_CAP: usize = 8 * 1024 * 1024;
// Upper bound on total entry payload bytes accepted by a single batch.
const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = i32::MAX as usize;
36
/// Glue trait connecting a protobuf-encoded log entry type to the raft index
/// it carries.
pub trait MessageExt: Send + Sync {
    /// Concrete entry message type stored in the log.
    type Entry: Message + Clone + PartialEq;

    /// Returns the raft log index of entry `e`.
    fn index(e: &Self::Entry) -> u64;
}
44
/// Compression algorithm applied to the entries section of a log batch.
/// The discriminant is the on-disk representation.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum CompressionType {
    None = 0,
    Lz4 = 1,
}
52
53impl CompressionType {
54 pub fn from_u8(t: u8) -> Result<Self> {
55 if t <= CompressionType::Lz4 as u8 {
56 Ok(unsafe { mem::transmute(t) })
57 } else {
58 Err(Error::Corruption(format!(
59 "Unrecognized compression type: {t}"
60 )))
61 }
62 }
63
64 pub fn to_u8(self) -> u8 {
65 self as u8
66 }
67}
68
// A byte-slice cursor: decoding functions advance the slice in place.
type SliceReader<'a> = &'a [u8];
70
/// A run of consecutive entry indexes belonging to one raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EntryIndexes(pub Vec<EntryIndex>);
75
76impl EntryIndexes {
77 pub fn decode(buf: &mut SliceReader, entries_size: &mut u32) -> Result<Self> {
78 let mut count = codec::decode_var_u64(buf)?;
79 let mut entry_indexes = Vec::with_capacity(count as usize);
80 let mut index = 0;
81 if count > 0 {
82 index = codec::decode_var_u64(buf)?;
83 }
84 while count > 0 {
85 let t = codec::decode_var_u64(buf)?;
86 let entry_len = (t as u32) - *entries_size;
87 let entry_index = EntryIndex {
88 index,
89 entry_offset: *entries_size,
90 entry_len,
91 ..Default::default()
92 };
93 *entries_size += entry_len;
94 entry_indexes.push(entry_index);
95 index += 1;
96 count -= 1;
97 }
98 Ok(Self(entry_indexes))
99 }
100
101 pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
102 let count = self.0.len() as u64;
103 buf.encode_var_u64(count)?;
104 if count > 0 {
105 buf.encode_var_u64(self.0[0].index)?;
106 }
107 for ei in self.0.iter() {
108 buf.encode_var_u64((ei.entry_offset + ei.entry_len) as u64)?;
109 }
110 Ok(())
111 }
112
113 fn approximate_size(&self) -> usize {
114 8 + if self.0.is_empty() { 0 } else { 8 } + 8 * self.0.len()
115 }
116}
117
/// An administrative operation recorded in the log for one raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Command {
    // Remove all data belonging to the raft group.
    Clean,
    // Drop log entries with index strictly less than `index`.
    Compact { index: u64 },
}
125
126impl Command {
127 pub fn encode(&self, vec: &mut Vec<u8>) {
128 match *self {
129 Command::Clean => {
130 vec.push(CMD_CLEAN);
131 }
132 Command::Compact { index } => {
133 vec.push(CMD_COMPACT);
134 vec.encode_var_u64(index).unwrap();
135 }
136 }
137 }
138
139 pub fn decode(buf: &mut SliceReader) -> Result<Command> {
140 let command_type = codec::read_u8(buf)?;
141 match command_type {
142 CMD_CLEAN => Ok(Command::Clean),
143 CMD_COMPACT => {
144 let index = codec::decode_var_u64(buf)?;
145 Ok(Command::Compact { index })
146 }
147 _ => Err(Error::Corruption(format!(
148 "Unrecognized command type: {command_type}"
149 ))),
150 }
151 }
152
153 fn approximate_size(&self) -> usize {
154 match &self {
155 Command::Clean => 1, Command::Compact { .. } => 1 + 8, }
158 }
159}
160
/// Kind of key-value operation carried by a log item.
/// The discriminant is the on-disk representation; note it starts at 1.
#[repr(u8)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum OpType {
    Put = 1,
    Del = 2,
}
167
168impl OpType {
169 pub fn from_u8(t: u8) -> Result<Self> {
170 if t <= OpType::Del as u8 {
171 Ok(unsafe { mem::transmute(t) })
172 } else {
173 Err(Error::Corruption(format!("Unrecognized op type: {t}")))
174 }
175 }
176
177 pub fn to_u8(self) -> u8 {
178 self as u8
179 }
180}
181
/// A key-value operation on a raft group's metadata store.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct KeyValue {
    pub op_type: OpType,
    pub key: Vec<u8>,
    // `Some` for `Put`, `None` for `Del`.
    pub value: Option<Vec<u8>>,
    // File that physically holds this record; attached after write/decode.
    pub file_id: Option<FileId>,
}
191
192impl KeyValue {
193 pub fn new(op_type: OpType, key: Vec<u8>, value: Option<Vec<u8>>) -> KeyValue {
194 KeyValue {
195 op_type,
196 key,
197 value,
198 file_id: None,
199 }
200 }
201
202 pub fn decode(buf: &mut SliceReader) -> Result<KeyValue> {
203 let op_type = OpType::from_u8(codec::read_u8(buf)?)?;
204 let k_len = codec::decode_var_u64(buf)? as usize;
205 let key = &buf[..k_len];
206 buf.consume(k_len);
207 match op_type {
208 OpType::Put => {
209 let v_len = codec::decode_var_u64(buf)? as usize;
210 let value = &buf[..v_len];
211 buf.consume(v_len);
212 Ok(KeyValue::new(
213 OpType::Put,
214 key.to_vec(),
215 Some(value.to_vec()),
216 ))
217 }
218 OpType::Del => Ok(KeyValue::new(OpType::Del, key.to_vec(), None)),
219 }
220 }
221
222 pub fn encode(&self, vec: &mut Vec<u8>) -> Result<()> {
223 vec.push(self.op_type.to_u8());
224 vec.encode_var_u64(self.key.len() as u64)?;
225 vec.extend_from_slice(self.key.as_slice());
226 match self.op_type {
227 OpType::Put => {
228 vec.encode_var_u64(self.value.as_ref().unwrap().len() as u64)?;
229 vec.extend_from_slice(self.value.as_ref().unwrap().as_slice());
230 }
231 OpType::Del => {}
232 }
233 Ok(())
234 }
235
236 fn approximate_size(&self) -> usize {
237 1 + 8 + self.key.len() + 8 + self.value.as_ref().map_or_else(|| 0, |v| v.len())
238 }
239}
240
/// One logical operation targeting a single raft group.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogItem {
    pub raft_group_id: u64,
    pub content: LogItemContent,
}
248
/// Payload of a [`LogItem`]: entry indexes, an admin command, or a KV op.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LogItemContent {
    EntryIndexes(EntryIndexes),
    Command(Command),
    Kv(KeyValue),
}
255
256impl LogItem {
257 pub fn new_entry_indexes(raft_group_id: u64, entry_indexes: Vec<EntryIndex>) -> LogItem {
258 LogItem {
259 raft_group_id,
260 content: LogItemContent::EntryIndexes(EntryIndexes(entry_indexes)),
261 }
262 }
263
264 pub fn new_command(raft_group_id: u64, command: Command) -> LogItem {
265 LogItem {
266 raft_group_id,
267 content: LogItemContent::Command(command),
268 }
269 }
270
271 pub fn new_kv(
272 raft_group_id: u64,
273 op_type: OpType,
274 key: Vec<u8>,
275 value: Option<Vec<u8>>,
276 ) -> LogItem {
277 LogItem {
278 raft_group_id,
279 content: LogItemContent::Kv(KeyValue::new(op_type, key, value)),
280 }
281 }
282
283 pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
284 buf.encode_var_u64(self.raft_group_id)?;
285 match &self.content {
286 LogItemContent::EntryIndexes(entry_indexes) => {
287 buf.push(TYPE_ENTRIES);
288 entry_indexes.encode(buf)?;
289 }
290 LogItemContent::Command(command) => {
291 buf.push(TYPE_COMMAND);
292 command.encode(buf);
293 }
294 LogItemContent::Kv(kv) => {
295 buf.push(TYPE_KV);
296 kv.encode(buf)?;
297 }
298 }
299 Ok(())
300 }
301
302 pub fn decode(buf: &mut SliceReader, entries_size: &mut u32) -> Result<LogItem> {
303 let raft_group_id = codec::decode_var_u64(buf)?;
304 let item_type = buf.read_u8()?;
305 let content = match item_type {
306 TYPE_ENTRIES => {
307 let entry_indexes = EntryIndexes::decode(buf, entries_size)?;
308 LogItemContent::EntryIndexes(entry_indexes)
309 }
310 TYPE_COMMAND => {
311 let cmd = Command::decode(buf)?;
312 LogItemContent::Command(cmd)
313 }
314 TYPE_KV => {
315 let kv = KeyValue::decode(buf)?;
316 LogItemContent::Kv(kv)
317 }
318 _ => {
319 return Err(Error::Corruption(format!(
320 "Unrecognized log item type: {item_type}"
321 )));
322 }
323 };
324 Ok(LogItem {
325 raft_group_id,
326 content,
327 })
328 }
329
330 fn approximate_size(&self) -> usize {
331 match &self.content {
332 LogItemContent::EntryIndexes(entry_indexes) => {
333 8 + 1 + entry_indexes.approximate_size()
334 }
335 LogItemContent::Command(cmd) => 8 + 1 + cmd.approximate_size(),
336 LogItemContent::Kv(kv) => 8 + 1 + kv.approximate_size(),
337 }
338 }
339}
340
// Draining iterator over a batch's items; leaves the vector empty but keeps
// its allocation.
pub(crate) type LogItemDrain<'a> = std::vec::Drain<'a, LogItem>;
342
/// A collection of log items plus bookkeeping for their encoded form.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogItemBatch {
    items: Vec<LogItem>,
    // Running upper bound of the items' encoded size.
    item_size: usize,
    // Total payload bytes of all referenced entries.
    entries_size: u32,
    // CRC32 of the encoded item section, recorded by `encode`.
    checksum: u32,
}
353
354impl Default for LogItemBatch {
355 fn default() -> Self {
356 Self::with_capacity(0)
357 }
358}
359
impl LogItemBatch {
    /// Creates an empty batch with room for `cap` items.
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            items: Vec::with_capacity(cap),
            item_size: 0,
            entries_size: 0,
            checksum: 0,
        }
    }

    /// Consumes the batch, yielding its items.
    pub fn into_items(self) -> Vec<LogItem> {
        self.items
    }

    /// Iterates over the items in insertion order.
    pub fn iter(&self) -> std::slice::Iter<LogItem> {
        self.items.iter()
    }

    /// Drains all items, resetting the bookkeeping counters so the batch can
    /// be reused.
    pub fn drain(&mut self) -> LogItemDrain {
        self.item_size = 0;
        self.entries_size = 0;
        self.checksum = 0;
        self.items.drain(..)
    }

    /// Appends all items of `rhs` into `self`, leaving `rhs` empty.
    ///
    /// Entry offsets in `rhs` are rebased onto the end of `self`'s entry
    /// payload, mirroring how `LogBatch::merge` concatenates the buffers.
    pub fn merge(&mut self, rhs: &mut LogItemBatch) {
        for item in &mut rhs.items {
            if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
                for ei in entry_indexes.0.iter_mut() {
                    ei.entry_offset += self.entries_size;
                }
            }
        }
        self.item_size += rhs.item_size;
        rhs.item_size = 0;
        self.entries_size += rhs.entries_size;
        rhs.entries_size = 0;
        self.items.append(&mut rhs.items);
    }

    /// Stamps the compression type (decided when the entries were encoded)
    /// onto every entry index.
    pub(crate) fn finish_populate(&mut self, compression_type: CompressionType) {
        for item in self.items.iter_mut() {
            if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
                for ei in entry_indexes.0.iter_mut() {
                    ei.compression_type = compression_type;
                }
            }
        }
    }

    /// Patches the checksum trailer at the end of `buf` just before writing.
    ///
    /// `encode` left a zero placeholder there; this writes the recorded item
    /// checksum, XOR-obfuscated with the target file's signature (if any) so
    /// a batch replayed into the wrong file fails verification.
    pub(crate) fn prepare_write(
        &self,
        buf: &mut [u8],
        file_context: &LogFileContext,
    ) -> Result<()> {
        if !buf.is_empty() {
            let mut footer_checksum = self.checksum;
            let footer_checksum_offset = buf.len() - LOG_BATCH_CHECKSUM_LEN;
            if let Some(signature) = file_context.get_signature() {
                footer_checksum ^= signature;
            }
            (&mut buf[footer_checksum_offset..]).write_u32::<LittleEndian>(footer_checksum)?;
        }
        Ok(())
    }

    /// Records the physical location `handle` on every item after a
    /// successful write, so entries and KVs can later be read back.
    pub(crate) fn finish_write(&mut self, handle: FileBlockHandle) {
        for item in self.items.iter_mut() {
            match &mut item.content {
                LogItemContent::EntryIndexes(entry_indexes) => {
                    for ei in entry_indexes.0.iter_mut() {
                        ei.entries = Some(handle);
                    }
                }
                LogItemContent::Kv(kv) => {
                    debug_assert!(kv.file_id.is_none());
                    kv.file_id = Some(handle.id);
                }
                _ => {}
            }
        }
    }

    /// Adds entry indexes for `region_id`, assigning each its offset into
    /// the accumulated entry payload.
    pub fn add_entry_indexes(&mut self, region_id: u64, mut entry_indexes: Vec<EntryIndex>) {
        for ei in entry_indexes.iter_mut() {
            ei.entry_offset = self.entries_size;
            self.entries_size += ei.entry_len;
        }
        let item = LogItem::new_entry_indexes(region_id, entry_indexes);
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Adds an admin command for `region_id`.
    pub fn add_command(&mut self, region_id: u64, cmd: Command) {
        let item = LogItem::new_command(region_id, cmd);
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Adds a key deletion for `region_id`.
    pub fn delete(&mut self, region_id: u64, key: Vec<u8>) {
        let item = LogItem::new_kv(region_id, OpType::Del, key, None);
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Adds a key-value put whose value is the serialized protobuf `s`.
    pub fn put_message<S: Message>(&mut self, region_id: u64, key: Vec<u8>, s: &S) -> Result<()> {
        self.put(region_id, key, s.write_to_bytes()?);
        Ok(())
    }

    /// Adds a raw key-value put for `region_id`.
    pub fn put(&mut self, region_id: u64, key: Vec<u8>, value: Vec<u8>) {
        let item = LogItem::new_kv(region_id, OpType::Put, key, Some(value));
        self.item_size += item.approximate_size();
        self.items.push(item);
    }

    /// Appends the encoded item section to `buf`: item count, each item,
    /// then a zero checksum placeholder. The real checksum over the section
    /// is recorded in `self.checksum` and written by `prepare_write`.
    pub fn encode(&mut self, buf: &mut Vec<u8>) -> Result<()> {
        let offset = buf.len();
        let count = self.items.len() as u64;
        buf.encode_var_u64(count)?;
        for item in self.items.iter() {
            item.encode(buf)?;
        }
        self.checksum = crc32(&buf[offset..]);
        buf.encode_u32_le(0)?;
        Ok(())
    }

    /// Decodes an item section, verifying its (possibly signature-obfuscated)
    /// checksum, and attaches `entries`/`compression_type` to every decoded
    /// entry index and KV so they can be read back later.
    pub fn decode(
        buf: &mut SliceReader,
        entries: FileBlockHandle,
        compression_type: CompressionType,
        file_context: &LogFileContext,
    ) -> Result<LogItemBatch> {
        let checksum = verify_checksum_with_signature(buf, file_context.get_signature())?;
        // Strip the checksum trailer before decoding the payload.
        *buf = &buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN];
        let count = codec::decode_var_u64(buf)?;
        let mut items = LogItemBatch::with_capacity(count as usize);
        let mut entries_size = 0;
        for _ in 0..count {
            let item = LogItem::decode(buf, &mut entries_size)?;
            items.item_size += item.approximate_size();
            items.items.push(item);
        }
        items.entries_size = entries_size;

        for item in items.items.iter_mut() {
            if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
                for ei in entry_indexes.0.iter_mut() {
                    ei.compression_type = compression_type;
                    ei.entries = Some(entries);
                }
            } else if let LogItemContent::Kv(kv) = &mut item.content {
                kv.file_id = Some(entries.id);
            }
        }
        items.checksum = checksum;
        Ok(items)
    }

    /// Upper-bound estimate of the encoded section size in bytes.
    pub fn approximate_size(&self) -> usize {
        // var-u64 count + items + checksum trailer.
        8 + self.item_size + LOG_BATCH_CHECKSUM_LEN
    }

    /// Returns the first entry index of the first entry-carrying item, if
    /// any.
    pub fn entry_index(&self) -> Option<EntryIndex> {
        for item in &self.items {
            if let LogItemContent::EntryIndexes(entry_indexes) = &item.content {
                return entry_indexes.0.first().cloned();
            }
        }
        None
    }
}
553
/// Lifecycle of `LogBatch::buf`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum BufState {
    // Mutable: entries may still be appended after the reserved header.
    Open,
    // `finish_populate` ran: fields are (header offset into `buf`,
    // length of the encoded entries section after the header).
    // NOTE(review): tuple meaning inferred from `finish_populate` and
    // `prepare_write`; confirm against callers before relying on it.
    Encoded(usize, usize),
    // `prepare_write` ran: same payload as `Encoded`, checksum patched in,
    // ready to hand to the pipe log.
    Sealed(usize, usize),
    // A mutation is in flight; transient state used for panic safety.
    Incomplete,
}
577
/// A batch of writes: entry payloads plus the items describing them.
/// Entry bytes accumulate in `buf` (after a reserved header) while metadata
/// accumulates in `item_batch`; `finish_populate`/`prepare_write` turn both
/// into one writable byte stream.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct LogBatch {
    item_batch: LogItemBatch,
    buf_state: BufState,
    buf: Vec<u8>,
}
603
604impl Default for LogBatch {
605 fn default() -> Self {
606 Self::with_capacity(DEFAULT_LOG_ITEM_BATCH_CAP)
607 }
608}
609
impl LogBatch {
    /// Creates an empty batch with room for `cap` items. The encode buffer
    /// reserves space for the batch header up front.
    pub fn with_capacity(cap: usize) -> Self {
        let mut buf = Vec::with_capacity(4096);
        buf.resize(LOG_BATCH_HEADER_LEN, 0);
        Self {
            item_batch: LogItemBatch::with_capacity(cap),
            buf_state: BufState::Open,
            buf,
        }
    }

    /// Moves all content of `rhs` into `self`, leaving `rhs` empty but
    /// reusable.
    ///
    /// # Errors
    /// Returns `Error::Full` if the combined entry payload would exceed the
    /// per-batch size limit.
    pub fn merge(&mut self, rhs: &mut Self) -> Result<()> {
        debug_assert!(self.buf_state == BufState::Open && rhs.buf_state == BufState::Open);
        let max_entries_size = (|| {
            fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
            MAX_LOG_ENTRIES_SIZE_PER_BATCH
        })();
        if !rhs.buf.is_empty() {
            // Both buffers carry a header, hence the `* 2` allowance.
            if rhs.buf.len() + self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN * 2 {
                return Err(Error::Full);
            }
            // Mark both incomplete so a panic mid-copy is detectable.
            self.buf_state = BufState::Incomplete;
            rhs.buf_state = BufState::Incomplete;
            self.buf.extend_from_slice(&rhs.buf[LOG_BATCH_HEADER_LEN..]);
            rhs.buf.shrink_to(MAX_LOG_BATCH_BUFFER_CAP);
            rhs.buf.truncate(LOG_BATCH_HEADER_LEN);
        }
        self.item_batch.merge(&mut rhs.item_batch);
        self.buf_state = BufState::Open;
        rhs.buf_state = BufState::Open;
        Ok(())
    }

    /// Serializes `entries` into the buffer and records their indexes under
    /// `region_id`.
    ///
    /// # Errors
    /// Returns `Error::Full` (leaving the batch unchanged) if the payload
    /// would exceed the per-batch size limit.
    pub fn add_entries<M: MessageExt>(
        &mut self,
        region_id: u64,
        entries: &[M::Entry],
    ) -> Result<()> {
        debug_assert!(self.buf_state == BufState::Open);
        if entries.is_empty() {
            return Ok(());
        }

        let mut entry_indexes = Vec::with_capacity(entries.len());
        self.buf_state = BufState::Incomplete;
        let old_buf_len = self.buf.len();
        let max_entries_size = (|| {
            fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
            MAX_LOG_ENTRIES_SIZE_PER_BATCH
        })();
        for e in entries {
            let buf_offset = self.buf.len();
            e.write_to_vec(&mut self.buf)?;
            if self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN {
                // Roll back everything appended by this call.
                self.buf.truncate(old_buf_len);
                self.buf_state = BufState::Open;
                return Err(Error::Full);
            }
            entry_indexes.push(EntryIndex {
                index: M::index(e),
                entry_len: (self.buf.len() - buf_offset) as u32,
                ..Default::default()
            });
        }
        self.item_batch.add_entry_indexes(region_id, entry_indexes);
        self.buf_state = BufState::Open;
        Ok(())
    }

    /// Like `add_entries` but takes pre-serialized entry bytes paired with
    /// their indexes (used when rewriting existing data).
    pub(crate) fn add_raw_entries(
        &mut self,
        region_id: u64,
        mut entry_indexes: Vec<EntryIndex>,
        entries: Vec<Vec<u8>>,
    ) -> Result<()> {
        debug_assert!(entry_indexes.len() == entries.len());
        debug_assert!(self.buf_state == BufState::Open);
        if entry_indexes.is_empty() {
            return Ok(());
        }

        self.buf_state = BufState::Incomplete;
        let old_buf_len = self.buf.len();
        let max_entries_size = (|| {
            fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
            MAX_LOG_ENTRIES_SIZE_PER_BATCH
        })();
        for (ei, e) in entry_indexes.iter_mut().zip(entries.iter()) {
            if e.len() + self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN {
                // Roll back everything appended by this call.
                self.buf.truncate(old_buf_len);
                self.buf_state = BufState::Open;
                return Err(Error::Full);
            }
            let buf_offset = self.buf.len();
            self.buf.extend(e);
            ei.entry_len = (self.buf.len() - buf_offset) as u32;
        }
        self.item_batch.add_entry_indexes(region_id, entry_indexes);
        self.buf_state = BufState::Open;
        Ok(())
    }

    /// Records an admin command for `region_id`.
    pub fn add_command(&mut self, region_id: u64, cmd: Command) {
        self.item_batch.add_command(region_id, cmd);
    }

    /// Records a key deletion for `region_id`.
    pub fn delete(&mut self, region_id: u64, key: Vec<u8>) {
        self.item_batch.delete(region_id, key);
    }

    /// Records a put of the serialized protobuf `s` under `key`.
    ///
    /// # Errors
    /// Rejects keys carrying the reserved internal prefix.
    pub fn put_message<S: Message>(&mut self, region_id: u64, key: Vec<u8>, s: &S) -> Result<()> {
        if crate::is_internal_key(&key, None) {
            return Err(Error::InvalidArgument(format!(
                "key prefix `{:?}` reserved for internal use",
                crate::INTERNAL_KEY_PREFIX
            )));
        }
        self.item_batch.put_message(region_id, key, s)
    }

    /// Records a raw key-value put.
    ///
    /// # Errors
    /// Rejects keys carrying the reserved internal prefix.
    pub fn put(&mut self, region_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        if crate::is_internal_key(&key, None) {
            return Err(Error::InvalidArgument(format!(
                "key prefix `{:?}` reserved for internal use",
                crate::INTERNAL_KEY_PREFIX
            )));
        }
        self.item_batch.put(region_id, key, value);
        Ok(())
    }

    /// Internal-only put that bypasses the reserved-prefix check.
    pub(crate) fn put_unchecked(&mut self, region_id: u64, key: Vec<u8>, value: Vec<u8>) {
        self.item_batch.put(region_id, key, value);
    }

    /// True if no items have been added (entry bytes alone don't count;
    /// entries always come with an item).
    pub fn is_empty(&self) -> bool {
        self.item_batch.items.is_empty()
    }

    /// Finalizes the in-memory encoding: optionally compresses the entries,
    /// appends their checksum and the item section, and fills in the header.
    /// Returns the total encoded length starting at the header.
    pub(crate) fn finish_populate(
        &mut self,
        compression_threshold: usize,
        compression_level: Option<usize>,
    ) -> Result<usize> {
        let _t = StopWatch::new(perf_context!(log_populating_duration));
        debug_assert!(self.buf_state == BufState::Open);
        if self.is_empty() {
            self.buf_state = BufState::Encoded(self.buf.len(), 0);
            return Ok(0);
        }
        self.buf_state = BufState::Incomplete;

        // Compression appends the compressed block after the original bytes;
        // `header_offset` then points at the start of the compressed copy
        // (the uncompressed prefix is abandoned in place).
        let (header_offset, compression_type) = if compression_threshold > 0
            && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
        {
            let buf_len = self.buf.len();
            lz4::append_compress_block(
                &mut self.buf,
                LOG_BATCH_HEADER_LEN,
                compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
            )?;
            (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
        } else {
            (0, CompressionType::None)
        };

        // Checksum of the (possibly compressed) entries section.
        if self.buf.len() > header_offset + LOG_BATCH_HEADER_LEN {
            let checksum = crc32(&self.buf[header_offset + LOG_BATCH_HEADER_LEN..]);
            self.buf.encode_u32_le(checksum)?;
        }
        // Offset of the item section, relative to the header.
        let footer_roffset = self.buf.len() - header_offset;

        self.item_batch.encode(&mut self.buf)?;
        self.item_batch.finish_populate(compression_type);

        // Header word 1: total length shifted left 8 bits, low byte holds
        // the compression type. Header word 2: item section offset.
        let len =
            (((self.buf.len() - header_offset) as u64) << 8) | u64::from(compression_type.to_u8());
        (&mut self.buf[header_offset..header_offset + 8]).write_u64::<BigEndian>(len)?;
        (&mut self.buf[header_offset + 8..header_offset + 16])
            .write_u64::<BigEndian>(footer_roffset as u64)?;

        #[cfg(feature = "failpoints")]
        {
            // Test hooks: flip a byte in the item/entries section to simulate
            // on-disk corruption.
            let corrupted_items = || {
                fail::fail_point!("log_batch::corrupted_items", |_| true);
                false
            };
            if corrupted_items() {
                self.buf[footer_roffset] += 1;
            }
            let corrupted_entries = || {
                fail::fail_point!("log_batch::corrupted_entries", |_| true);
                false
            };
            if corrupted_entries() {
                assert!(footer_roffset > LOG_BATCH_HEADER_LEN);
                self.buf[footer_roffset - 1] += 1;
            }
        }

        self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
        Ok(self.buf.len() - header_offset)
    }

    /// Patches the item-section checksum with the target file's signature.
    /// Must run after `finish_populate`; idempotent on a sealed batch.
    #[inline]
    pub(crate) fn prepare_write(&mut self, file_context: &LogFileContext) -> Result<()> {
        match self.buf_state {
            BufState::Encoded(header_offset, entries_len)
            | BufState::Sealed(header_offset, entries_len) => {
                self.item_batch
                    .prepare_write(&mut self.buf[header_offset + entries_len..], file_context)?;
                self.buf_state = BufState::Sealed(header_offset, entries_len);
            }
            _ => unreachable!(),
        }
        Ok(())
    }

    /// The finished byte stream to hand to the pipe log. Panics unless the
    /// batch is sealed.
    pub(crate) fn encoded_bytes(&self) -> &[u8] {
        match self.buf_state {
            BufState::Sealed(header_offset, _) => &self.buf[header_offset..],
            _ => unreachable!(),
        }
    }

    /// Records the write location on all items. The handle is narrowed to
    /// the entries section (header skipped, item section excluded).
    pub(crate) fn finish_write(&mut self, mut handle: FileBlockHandle) {
        debug_assert!(matches!(self.buf_state, BufState::Sealed(_, _)));
        if !self.is_empty() {
            handle.offset += LOG_BATCH_HEADER_LEN as u64;
            match self.buf_state {
                BufState::Sealed(_, entries_len) => {
                    debug_assert!(LOG_BATCH_HEADER_LEN + entries_len < handle.len);
                    handle.len = entries_len;
                }
                _ => unreachable!(),
            }
        }
        self.item_batch.finish_write(handle);
    }

    /// Drains all items and resets the buffer for reuse, shedding excess
    /// capacity.
    pub(crate) fn drain(&mut self) -> LogItemDrain {
        debug_assert!(!matches!(self.buf_state, BufState::Incomplete));

        self.buf.shrink_to(MAX_LOG_BATCH_BUFFER_CAP);
        self.buf.truncate(LOG_BATCH_HEADER_LEN);
        self.buf_state = BufState::Open;
        self.item_batch.drain()
    }

    /// Estimated encoded size in the current state; 0 for an empty batch or
    /// one caught mid-mutation.
    pub fn approximate_size(&self) -> usize {
        if self.is_empty() {
            0
        } else {
            match self.buf_state {
                BufState::Open => {
                    self.buf.len() + LOG_BATCH_CHECKSUM_LEN + self.item_batch.approximate_size()
                }
                BufState::Encoded(header_offset, _) => self.buf.len() - header_offset,
                BufState::Sealed(header_offset, _) => self.buf.len() - header_offset,
                s => {
                    error!("querying incomplete log batch with state {s:?}");
                    0
                }
            }
        }
    }

    /// Parses a batch header, returning (item section offset, compression
    /// type, total batch length).
    ///
    /// # Errors
    /// Returns `Error::Corruption` if the header is truncated or the offset
    /// is out of range.
    pub(crate) fn decode_header(buf: &mut SliceReader) -> Result<(usize, CompressionType, usize)> {
        if buf.len() < LOG_BATCH_HEADER_LEN {
            return Err(Error::Corruption(format!(
                "Log batch header too short: {}",
                buf.len()
            )));
        }

        let len_and_type = codec::decode_u64(buf)? as usize;
        // Low byte is the compression type, upper 56 bits the length.
        let compression_type = CompressionType::from_u8(len_and_type as u8)?;
        let len = len_and_type >> 8;
        let offset = codec::decode_u64(buf)? as usize;
        if offset > len {
            return Err(Error::Corruption(
                "Log item offset exceeds log batch length".to_owned(),
            ));
        } else if offset < LOG_BATCH_HEADER_LEN {
            return Err(Error::Corruption(
                "Log item offset is smaller than log batch header length".to_owned(),
            ));
        }
        Ok((offset, compression_type, len))
    }

    /// Verifies and (if needed) decompresses an entries block read back from
    /// a file. An empty handle yields an empty vector.
    pub(crate) fn decode_entries_block(
        buf: &[u8],
        handle: FileBlockHandle,
        compression: CompressionType,
    ) -> Result<Vec<u8>> {
        if handle.len > 0 {
            let _ = verify_checksum_with_signature(&buf[0..handle.len], None)?;
            match compression {
                CompressionType::None => Ok(buf[..handle.len - LOG_BATCH_CHECKSUM_LEN].to_owned()),
                CompressionType::Lz4 => {
                    let decompressed =
                        lz4::decompress_block(&buf[..handle.len - LOG_BATCH_CHECKSUM_LEN])?;
                    Ok(decompressed)
                }
            }
        } else {
            Ok(Vec::new())
        }
    }
}
962
impl ReactiveBytes for LogBatch {
    // Seals the batch against `ctx` and exposes the finished byte stream.
    // NOTE(review): the `unwrap` assumes the batch was already populated
    // (`finish_populate` ran) — confirm against the pipe-log caller.
    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8] {
        self.prepare_write(ctx).unwrap();
        self.encoded_bytes()
    }
}
969
970fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<u32> {
975 if buf.len() <= LOG_BATCH_CHECKSUM_LEN {
976 return Err(Error::Corruption(format!(
977 "Content too short {}",
978 buf.len()
979 )));
980 }
981 let actual = crc32(&buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN]);
982 let mut expected = codec::decode_u32_le(&mut &buf[buf.len() - LOG_BATCH_CHECKSUM_LEN..])?;
983 if let Some(signature) = signature {
984 expected ^= signature;
985 }
986 if actual != expected {
987 return Err(Error::Corruption(format!(
988 "Checksum expected {expected} but got {actual}"
989 )));
990 }
991 Ok(actual)
992}
993
lazy_static! {
    // Process-wide source of unique atomic-group ids.
    static ref ATOMIC_GROUP_ID: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
}
// Internal key under which atomic-group status markers are stored.
pub(crate) const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
// Encoded length of an atomic-group status value (one status byte).
const ATOMIC_GROUP_VALUE_LEN: usize = 1;
1000
/// Position of a log batch inside an atomic group of batches.
#[repr(u8)]
#[derive(Clone, Copy, FromPrimitive, Debug, PartialEq)]
pub(crate) enum AtomicGroupStatus {
    Begin = 0,
    Middle = 1,
    End = 2,
}
1008
1009impl AtomicGroupStatus {
1010 pub fn parse(item: &LogItem) -> Option<(u64, AtomicGroupStatus)> {
1012 if let LogItemContent::Kv(KeyValue {
1013 op_type,
1014 key,
1015 value,
1016 ..
1017 }) = &item.content
1018 {
1019 if *op_type == OpType::Put
1020 && crate::is_internal_key(key, Some(ATOMIC_GROUP_KEY))
1021 && value.as_ref().unwrap().len() == ATOMIC_GROUP_VALUE_LEN
1022 {
1023 let value = &mut value.as_ref().unwrap().as_slice();
1024 return Some((
1025 item.raft_group_id,
1026 AtomicGroupStatus::from_u8(value[0]).unwrap(),
1027 ));
1028 }
1029 }
1030 None
1031 }
1032}
1033
/// Stamps a sequence of log batches with Begin/Middle/End markers so they
/// are applied atomically during recovery.
pub(crate) struct AtomicGroupBuilder {
    // Unique id shared by all markers of this group.
    id: u64,
    // Last status flushed; `None` until `begin` is called.
    status: Option<AtomicGroupStatus>,
}
1057
impl Default for AtomicGroupBuilder {
    fn default() -> Self {
        Self {
            // Relaxed suffices: ids only need to be unique, not ordered.
            id: ATOMIC_GROUP_ID.fetch_add(1, Ordering::Relaxed),
            status: None,
        }
    }
}
1067
1068impl AtomicGroupBuilder {
1069 pub fn begin(&mut self, lb: &mut LogBatch) {
1072 fail::fail_point!("atomic_group::begin");
1073 assert_eq!(self.status, None);
1074 self.status = Some(AtomicGroupStatus::Begin);
1075 self.flush(lb);
1076 }
1077
1078 pub fn add(&mut self, lb: &mut LogBatch) {
1079 fail::fail_point!("atomic_group::add");
1080 assert!(matches!(
1081 self.status,
1082 Some(AtomicGroupStatus::Begin | AtomicGroupStatus::Middle)
1083 ));
1084 self.status = Some(AtomicGroupStatus::Middle);
1085 self.flush(lb);
1086 }
1087
1088 pub fn end(&mut self, lb: &mut LogBatch) {
1089 assert!(matches!(
1090 self.status,
1091 Some(AtomicGroupStatus::Begin | AtomicGroupStatus::Middle)
1092 ));
1093 self.status = Some(AtomicGroupStatus::End);
1094 self.flush(lb);
1095 }
1096
1097 #[inline]
1098 fn flush(&self, lb: &mut LogBatch) {
1099 let mut s = Vec::with_capacity(ATOMIC_GROUP_VALUE_LEN);
1100 s.push(self.status.unwrap() as u8);
1101 lb.put_unchecked(self.id, crate::make_internal_key(ATOMIC_GROUP_KEY), s);
1102 }
1103
1104 #[cfg(test)]
1105 pub fn with_id(id: u64) -> Self {
1106 Self { id, status: None }
1107 }
1108}
1109
1110#[cfg(test)]
1111mod tests {
1112 use super::*;
1113 use crate::pipe_log::{LogQueue, Version};
1114 use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt};
1115 use protobuf::parse_from_bytes;
1116 use raft::eraftpb::Entry;
1117 use strum::IntoEnumIterator;
1118
1119 fn decode_entries_from_bytes<M: MessageExt>(
1120 buf: &[u8],
1121 entry_indexes: &[EntryIndex],
1122 _encoded: bool,
1123 ) -> Vec<M::Entry> {
1124 let mut entries = Vec::with_capacity(entry_indexes.len());
1125 for ei in entry_indexes {
1126 let block =
1127 LogBatch::decode_entries_block(buf, ei.entries.unwrap(), ei.compression_type)
1128 .unwrap();
1129 entries.push(
1130 parse_from_bytes(
1131 &block[ei.entry_offset as usize..(ei.entry_offset + ei.entry_len) as usize],
1132 )
1133 .unwrap(),
1134 );
1135 }
1136 entries
1137 }
1138
1139 #[test]
1140 fn test_entry_indexes_enc_dec() {
1141 fn encode_and_decode(entry_indexes: &mut [EntryIndex]) -> EntryIndexes {
1142 let mut entries_size = 0;
1143 for idx in entry_indexes.iter_mut() {
1144 idx.entry_offset = entries_size;
1145 entries_size += idx.entry_len;
1146 }
1147 let entry_indexes = EntryIndexes(entry_indexes.to_vec());
1148
1149 let mut encoded = vec![];
1150 entry_indexes.encode(&mut encoded).unwrap();
1151 let mut bytes_slice = encoded.as_slice();
1152 let mut decoded_entries_size = 0;
1153 let decoded_indexes =
1154 EntryIndexes::decode(&mut bytes_slice, &mut decoded_entries_size).unwrap();
1155 assert_eq!(bytes_slice.len(), 0);
1156 assert!(decoded_indexes.approximate_size() >= encoded.len());
1157 assert_eq!(decoded_entries_size, entries_size);
1158 decoded_indexes
1159 }
1160
1161 let entry_indexes = vec![Vec::new(), generate_entry_indexes_opt(7, 17, None)];
1162 for mut idxs in entry_indexes.into_iter() {
1163 let decoded = encode_and_decode(&mut idxs);
1164 assert_eq!(idxs, decoded.0);
1165 }
1166
1167 let mut entry_indexes_with_file_id =
1168 generate_entry_indexes_opt(7, 17, Some(FileId::new(LogQueue::Append, 7)));
1169 let mut decoded = encode_and_decode(&mut entry_indexes_with_file_id);
1170 assert_ne!(entry_indexes_with_file_id, decoded.0);
1171 for i in decoded.0.iter_mut() {
1172 i.entries = None;
1173 }
1174 assert_ne!(entry_indexes_with_file_id, decoded.0);
1175 }
1176
1177 #[test]
1178 fn test_command_enc_dec() {
1179 let cmds = vec![Command::Clean, Command::Compact { index: 7 }];
1180 let invalid_command_type = 7;
1181 for cmd in cmds.into_iter() {
1182 let mut encoded = vec![];
1183 cmd.encode(&mut encoded);
1184 let mut bytes_slice = encoded.as_slice();
1185 let decoded_cmd = Command::decode(&mut bytes_slice).unwrap();
1186 assert_eq!(bytes_slice.len(), 0);
1187 assert!(decoded_cmd.approximate_size() >= encoded.len());
1188 assert_eq!(cmd, decoded_cmd);
1189
1190 encoded[0] = invalid_command_type;
1191 let expected = format!("Unrecognized command type: {invalid_command_type}");
1192 assert!(matches!(
1193 Command::decode(&mut encoded.as_slice()),
1194 Err(Error::Corruption(m)) if m == expected
1195 ));
1196 }
1197 }
1198
1199 #[test]
1200 fn test_kv_enc_dec() {
1201 let kvs = vec![
1202 KeyValue::new(OpType::Put, b"put".to_vec(), Some(b"put_v".to_vec())),
1203 KeyValue::new(OpType::Del, b"del".to_vec(), None),
1204 ];
1205 let invalid_op_type = 7;
1206 for kv in kvs.into_iter() {
1207 let mut encoded = vec![];
1208 kv.encode(&mut encoded).unwrap();
1209 let mut bytes_slice = encoded.as_slice();
1210 let decoded_kv = KeyValue::decode(&mut bytes_slice).unwrap();
1211 assert_eq!(bytes_slice.len(), 0);
1212 assert!(decoded_kv.approximate_size() >= encoded.len());
1213 assert_eq!(kv, decoded_kv);
1214
1215 encoded[0] = invalid_op_type;
1216 let expected = format!("Unrecognized op type: {invalid_op_type}");
1217 assert!(matches!(
1218 KeyValue::decode(&mut encoded.as_slice()),
1219 Err(Error::Corruption(m)) if m == expected
1220 ));
1221 }
1222
1223 let del_with_value = KeyValue::new(OpType::Del, b"del".to_vec(), Some(b"del_v".to_vec()));
1224 let mut encoded = vec![];
1225 del_with_value.encode(&mut encoded).unwrap();
1226 let mut bytes_slice = encoded.as_slice();
1227 let decoded_kv = KeyValue::decode(&mut bytes_slice).unwrap();
1228 assert_eq!(bytes_slice.len(), 0);
1229 assert!(decoded_kv.value.is_none());
1230 }
1231
1232 #[test]
1233 fn test_log_item_enc_dec() {
1234 let items = vec![
1235 LogItem::new_entry_indexes(7, generate_entry_indexes_opt(7, 17, None)),
1236 LogItem::new_command(17, Command::Compact { index: 7 }),
1237 LogItem::new_kv(27, OpType::Put, b"key".to_vec(), Some(b"value".to_vec())),
1238 ];
1239 let invalid_log_item_type = 7;
1240 for mut item in items.into_iter() {
1241 let mut entries_size = 0;
1242 if let LogItemContent::EntryIndexes(EntryIndexes(indexes)) = &mut item.content {
1243 for index in indexes.iter_mut() {
1244 index.entry_offset = entries_size;
1245 entries_size += index.entry_len;
1246 }
1247 }
1248 let mut encoded = vec![];
1249 item.encode(&mut encoded).unwrap();
1250 let mut bytes_slice = encoded.as_slice();
1251 let mut decoded_entries_size = 0;
1252 let decoded_item =
1253 LogItem::decode(&mut bytes_slice, &mut decoded_entries_size).unwrap();
1254 assert_eq!(bytes_slice.len(), 0);
1255 assert_eq!(decoded_entries_size, entries_size);
1256 assert!(decoded_item.approximate_size() >= encoded.len());
1257 assert_eq!(item, decoded_item);
1258
1259 bytes_slice = encoded.as_slice();
1261 codec::decode_var_u64(&mut bytes_slice).unwrap();
1262 let next_u8 = encoded.len() - bytes_slice.len();
1263 encoded[next_u8] = invalid_log_item_type;
1264 let expected = format!("Unrecognized log item type: {invalid_log_item_type}");
1265 assert!(matches!(
1266 LogItem::decode(&mut encoded.as_slice(), &mut decoded_entries_size),
1267 Err(Error::Corruption(m)) if m == expected
1268 ));
1269 }
1270 }
1271
1272 #[test]
1273 fn test_log_item_batch_enc_dec() {
1274 let mut batches = vec![LogItemBatch::default()];
1275 let mut batch = LogItemBatch::default();
1276 batch.add_entry_indexes(7, generate_entry_indexes_opt(1, 5, None ));
1277 batch.add_entry_indexes(
1278 7 + 100,
1279 generate_entry_indexes_opt(100, 105, None ),
1280 );
1281 batch.add_command(7, Command::Clean);
1282 batch.put(7, b"key".to_vec(), b"value".to_vec());
1283 batch.delete(7, b"key2".to_vec());
1284 batches.push(batch);
1285
1286 for batch in batches.into_iter() {
1287 for compression_type in [CompressionType::Lz4, CompressionType::None] {
1288 let mut batch = batch.clone();
1289 batch.finish_populate(compression_type);
1290 let mut encoded_batch = vec![];
1291 batch.encode(&mut encoded_batch).unwrap();
1292 let file_context =
1293 LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
1294 batch
1295 .prepare_write(&mut encoded_batch, &file_context)
1296 .unwrap();
1297 batch.finish_write(FileBlockHandle::dummy(LogQueue::Append));
1298 let decoded_batch = LogItemBatch::decode(
1299 &mut encoded_batch.as_slice(),
1300 FileBlockHandle::dummy(LogQueue::Append),
1301 compression_type,
1302 &file_context,
1303 )
1304 .unwrap();
1305 assert!(decoded_batch.approximate_size() >= encoded_batch.len());
1306 assert_eq!(batch, decoded_batch);
1307 }
1308 }
1309 }
1310
    #[test]
    fn test_log_batch_enc_dec() {
        // Full encode/decode round trip of a LogBatch, including negative
        // cases: wrong compression type, wrong signature (file id), and
        // wrong version must all fail to decode.
        fn decode_and_encode(
            mut batch: LogBatch,
            compress: bool,
            version: Version,
            entry_data: &[u8],
        ) {
            // Reading bytes or finishing a write before the batch is sealed
            // is a programming error and must panic.
            assert!(catch_unwind_silent(|| batch.encoded_bytes()).is_err());
            assert!(catch_unwind_silent(
                || batch.finish_write(FileBlockHandle::dummy(LogQueue::Append))
            )
            .is_err());
            let mocked_file_block_handle = FileBlockHandle {
                id: FileId::new(LogQueue::Append, 12),
                len: 0,
                offset: 0,
            };
            let old_approximate_size = batch.approximate_size();
            // `usize::from(compress)` maps false/true to compression level 0/1.
            let len = batch.finish_populate(usize::from(compress), None).unwrap();
            // approximate_size is an upper bound before populate, and exact after.
            assert!(old_approximate_size >= len);
            assert_eq!(batch.approximate_size(), len);
            let mut batch_handle = mocked_file_block_handle;
            batch_handle.len = len;
            let file_context = LogFileContext::new(batch_handle.id, version);
            batch.prepare_write(&file_context).unwrap();
            batch.finish_write(batch_handle);
            let encoded = batch.encoded_bytes();
            assert_eq!(encoded.len(), len);
            // An empty batch encodes to zero bytes and cannot be decoded.
            if len < LOG_BATCH_HEADER_LEN {
                assert_eq!(len, 0);
                let expected = "Log batch header too short: 0";
                assert!(matches!(
                    LogBatch::decode_header(&mut &*encoded),
                    Err(Error::Corruption(m)) if m == expected
                ));
                return;
            }

            let item_batch = batch.item_batch.clone();
            let mut bytes_slice = encoded;
            // Header yields the item-section offset, the compression type,
            // and the total batch length.
            let (offset, compression_type, len) =
                LogBatch::decode_header(&mut bytes_slice).unwrap();
            assert_eq!(len, encoded.len());
            assert_eq!(bytes_slice.len() + LOG_BATCH_HEADER_LEN, encoded.len());
            // The entries section sits between the header and the item section.
            let mut entries_handle = mocked_file_block_handle;
            entries_handle.offset = LOG_BATCH_HEADER_LEN as u64;
            entries_handle.len = offset - LOG_BATCH_HEADER_LEN;
            let file_context = LogFileContext::new(entries_handle.id, version);
            {
                // Decoding with the WRONG compression type still succeeds here:
                // the item section itself is never compressed, and the entries
                // payload is only unpacked later.
                LogItemBatch::decode(
                    &mut &encoded[offset..],
                    entries_handle,
                    if compression_type == CompressionType::None {
                        CompressionType::Lz4
                    } else {
                        CompressionType::None
                    },
                    &file_context,
                )
                .unwrap();
                // With signing enabled, a mismatched file id changes the
                // signature and must fail the checksum.
                if version.has_log_signing() {
                    LogItemBatch::decode(
                        &mut &encoded[offset..],
                        entries_handle,
                        compression_type,
                        &LogFileContext::new(FileId::new(LogQueue::Append, u64::MAX), version),
                    )
                    .unwrap_err();
                }
                // A mismatched format version must also fail.
                LogItemBatch::decode(
                    &mut &encoded[offset..],
                    entries_handle,
                    compression_type,
                    &LogFileContext::new(
                        file_context.id,
                        if version == Version::V1 {
                            Version::V2
                        } else {
                            Version::V1
                        },
                    ),
                )
                .unwrap_err();
            }
            // The matching context decodes back to the original item batch.
            let decoded_item_batch = LogItemBatch::decode(
                &mut &encoded[offset..],
                entries_handle,
                compression_type,
                &file_context,
            )
            .unwrap();
            assert_eq!(decoded_item_batch, item_batch);
            assert!(decoded_item_batch.approximate_size() >= len - offset);

            // Finally decode the entry payloads and compare them with the
            // generator output for each index range.
            let entries = &encoded[LOG_BATCH_HEADER_LEN..offset];
            for item in decoded_item_batch.items.iter() {
                if let LogItemContent::EntryIndexes(entry_indexes) = &item.content {
                    if !entry_indexes.0.is_empty() {
                        let (begin, end) = (
                            entry_indexes.0.first().unwrap().index,
                            entry_indexes.0.last().unwrap().index + 1,
                        );
                        let origin_entries = generate_entries(begin, end, Some(entry_data));
                        let decoded_entries =
                            decode_entries_from_bytes::<Entry>(entries, &entry_indexes.0, false);
                        assert_eq!(origin_entries, decoded_entries);
                    }
                }
            }
        }

        // Fixtures: an empty batch, a mixed batch (entries + command + kvs),
        // and an entries-only batch spanning two regions.
        let mut batches = vec![(LogBatch::default(), Vec::new())];
        let mut batch = LogBatch::default();
        let entry_data = vec![b'x'; 1024];
        batch
            .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
            .unwrap();
        batch.add_command(7, Command::Clean);
        batch.put(7, b"key".to_vec(), b"value".to_vec()).unwrap();
        batch.delete(7, b"key2".to_vec());
        batch
            .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
            .unwrap();
        batches.push((batch, entry_data));
        let mut batch = LogBatch::default();
        batch
            .add_entries::<Entry>(17, &generate_entries(0, 1, None))
            .unwrap();
        batch
            .add_entries::<Entry>(27, &generate_entries(1, 11, None))
            .unwrap();
        batches.push((batch, Vec::new()));

        // Run every fixture under every version x compression combination.
        for version in Version::iter() {
            for compress in [true, false] {
                for (batch, entry_data) in batches.clone().into_iter() {
                    decode_and_encode(batch, compress, version, &entry_data);
                }
            }
        }
    }
1459
1460 #[test]
1461 fn test_log_batch_merge() {
1462 let region_id = 8;
1463 let mut entries = Vec::new();
1464 let mut kvs = Vec::new();
1465 let data = vec![b'x'; 1024];
1466 let file_id = FileId::dummy(LogQueue::Append);
1467 let file_context = LogFileContext::new(file_id, Version::default());
1468
1469 let mut batch1 = LogBatch::default();
1470 entries.push(generate_entries(1, 11, Some(&data)));
1471 batch1
1472 .add_entries::<Entry>(region_id, entries.last().unwrap())
1473 .unwrap();
1474 for i in 0..=2 {
1475 let (k, v) = (format!("k{i}").into_bytes(), format!("v{i}").into_bytes());
1476 batch1.put(region_id, k.clone(), v.clone()).unwrap();
1477 kvs.push((k, v));
1478 }
1479
1480 batch1.merge(&mut LogBatch::default()).unwrap();
1481
1482 let mut batch2 = LogBatch::default();
1483 entries.push(generate_entries(11, 21, Some(&data)));
1484 batch2
1485 .add_entries::<Entry>(region_id, entries.last().unwrap())
1486 .unwrap();
1487 for i in 3..=5 {
1488 let (k, v) = (format!("k{i}").into_bytes(), format!("v{i}").into_bytes());
1489 batch2.put(region_id, k.clone(), v.clone()).unwrap();
1490 kvs.push((k, v));
1491 }
1492
1493 batch1.merge(&mut batch2).unwrap();
1494 assert!(batch2.is_empty());
1495
1496 let len = batch1.finish_populate(0, None).unwrap();
1497 batch1.prepare_write(&file_context).unwrap();
1498 let encoded = batch1.encoded_bytes();
1499 assert_eq!(len, encoded.len());
1500
1501 let (offset, compression_type, len) = LogBatch::decode_header(&mut &*encoded).unwrap();
1503 assert_eq!(encoded.len(), len);
1504 let decoded_item_batch = LogItemBatch::decode(
1505 &mut &encoded[offset..],
1506 FileBlockHandle {
1507 id: file_id,
1508 offset: 0,
1509 len: offset - LOG_BATCH_HEADER_LEN,
1510 },
1511 compression_type,
1512 &file_context,
1513 )
1514 .unwrap();
1515
1516 let entry_bytes = &encoded[LOG_BATCH_HEADER_LEN..offset];
1518 for item in decoded_item_batch.items.iter() {
1519 match &item.content {
1520 LogItemContent::EntryIndexes(entry_indexes) => {
1521 let decoded_entries =
1522 decode_entries_from_bytes::<Entry>(entry_bytes, &entry_indexes.0, false);
1523 assert_eq!(entries.remove(0), decoded_entries);
1524 }
1525 LogItemContent::Kv(kv) => {
1526 let (k, v) = kvs.remove(0);
1527 assert_eq!(OpType::Put, kv.op_type);
1528 assert_eq!(k, kv.key);
1529 assert_eq!(&v, kv.value.as_ref().unwrap());
1530 }
1531 _ => unreachable!(),
1532 }
1533 }
1534 }
1535
1536 #[test]
1537 fn test_empty_log_batch() {
1538 let mut batch = LogBatch::default();
1539 assert!(batch.is_empty());
1540 batch.add_entries::<Entry>(0, &Vec::new()).unwrap();
1541 assert!(batch.is_empty());
1542 batch.add_raw_entries(0, Vec::new(), Vec::new()).unwrap();
1543 assert!(batch.is_empty());
1544 {
1546 let mocked_file_block_handles = FileBlockHandle {
1547 id: FileId::new(LogQueue::Append, 12),
1548 len: 0,
1549 offset: 0,
1550 };
1551 let buf_len = batch.buf.len();
1552 let len = batch.finish_populate(1, None).unwrap();
1553 assert!(len == 0);
1554 assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
1555 let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
1556 batch.prepare_write(&file_context).unwrap();
1557 assert!(batch.is_empty());
1558 assert_eq!(batch.buf_state, BufState::Sealed(buf_len, 0));
1559 }
1560 }
1561
1562 #[test]
1563 fn test_internal_key() {
1564 let mut batch = LogBatch::default();
1565 assert!(matches!(
1566 batch
1567 .put(0, crate::make_internal_key(&[0]), b"v".to_vec())
1568 .unwrap_err(),
1569 Error::InvalidArgument(_)
1570 ));
1571 assert!(matches!(
1572 batch
1573 .put_message(0, crate::make_internal_key(ATOMIC_GROUP_KEY), &Entry::new())
1574 .unwrap_err(),
1575 Error::InvalidArgument(_)
1576 ));
1577 }
1578
1579 #[test]
1580 fn test_header_corruption() {
1581 let region_id = 7;
1582 let data = vec![b'x'; 16];
1583 let mut batch = LogBatch::default();
1584 batch
1585 .add_entries::<Entry>(region_id, &generate_entries(1, 11, Some(&data)))
1586 .unwrap();
1587 batch
1588 .put(region_id, b"key".to_vec(), b"value".to_vec())
1589 .unwrap();
1590 batch.finish_populate(1, None).unwrap();
1592 let file_context = LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
1593 batch.prepare_write(&file_context).unwrap();
1594 let encoded = batch.encoded_bytes();
1595
1596 let mut copy = encoded.to_owned();
1597 copy.truncate(LOG_BATCH_HEADER_LEN - 1);
1598 assert!(LogBatch::decode_header(&mut copy.as_slice())
1599 .unwrap_err()
1600 .to_string()
1601 .contains("Log batch header too short"));
1602
1603 let mut copy = encoded.to_owned();
1604 (&mut copy[LOG_BATCH_HEADER_LEN - 8..LOG_BATCH_HEADER_LEN])
1605 .write_u64::<BigEndian>(encoded.len() as u64 + 1)
1606 .unwrap();
1607 assert!(LogBatch::decode_header(&mut copy.as_slice())
1608 .unwrap_err()
1609 .to_string()
1610 .contains("Log item offset exceeds log batch length"));
1611
1612 let mut copy = encoded.to_owned();
1613 (&mut copy[LOG_BATCH_HEADER_LEN - 8..LOG_BATCH_HEADER_LEN])
1614 .write_u64::<BigEndian>(LOG_BATCH_HEADER_LEN as u64 - 1)
1615 .unwrap();
1616 assert!(LogBatch::decode_header(&mut copy.as_slice())
1617 .unwrap_err()
1618 .to_string()
1619 .contains("Log item offset is smaller than log batch header length"));
1620 }
1621
1622 #[cfg(feature = "nightly")]
1623 #[bench]
1624 fn bench_log_batch_add_entry_and_encode(b: &mut test::Bencher) {
1625 use rand::{thread_rng, Rng};
1626 fn details(log_batch: &mut LogBatch, entries: &[Entry], regions: usize) {
1627 for _ in 0..regions {
1628 log_batch
1629 .add_entries::<Entry>(thread_rng().gen(), entries)
1630 .unwrap();
1631 }
1632 log_batch.finish_populate(0, None).unwrap();
1633 let _ = log_batch.drain();
1634 }
1635 let data: Vec<u8> = (0..128).map(|_| thread_rng().gen()).collect();
1636 let entries = generate_entries(1, 11, Some(&data));
1637 let mut log_batch = LogBatch::default();
1638 details(&mut log_batch, &entries, 100);
1640 b.iter(move || {
1641 details(&mut log_batch, &entries, 100);
1642 });
1643 }
1644
1645 #[test]
1646 fn test_log_batch_sign_signature_repeatedly() {
1647 let mut batch = LogBatch::default();
1649 batch
1650 .add_entries::<Entry>(17, &generate_entries(0, 1, None))
1651 .unwrap();
1652 batch
1653 .add_entries::<Entry>(27, &generate_entries(1, 11, None))
1654 .unwrap();
1655
1656 let mocked_file_block_handles = [
1657 FileBlockHandle {
1658 id: FileId::new(LogQueue::Append, 12),
1659 len: 0,
1660 offset: 0,
1661 },
1662 FileBlockHandle {
1663 id: FileId::new(LogQueue::Append, 18),
1664 len: 0,
1665 offset: 0,
1666 },
1667 FileBlockHandle {
1668 id: FileId::new(LogQueue::Append, 2001),
1669 len: 0,
1670 offset: 0,
1671 },
1672 ];
1673 let old_approximate_size = batch.approximate_size();
1674 let len = batch.finish_populate(1, None).unwrap();
1675 assert!(old_approximate_size >= len);
1676 assert_eq!(batch.approximate_size(), len);
1677 let checksum = batch.item_batch.checksum;
1678
1679 for handle in mocked_file_block_handles {
1682 let mut batch_handle = handle;
1683 batch_handle.len = len;
1684 let file_context = LogFileContext::new(batch_handle.id, Version::V2);
1685 batch.prepare_write(&file_context).unwrap();
1686 assert_eq!(batch.approximate_size(), len);
1687 let encoded = batch.encoded_bytes();
1688 assert_eq!(encoded.len(), len);
1689 let mut bytes_slice = encoded;
1690 let (offset, _, _) = LogBatch::decode_header(&mut bytes_slice).unwrap();
1691 let expected =
1692 verify_checksum_with_signature(&encoded[offset..], file_context.get_signature())
1693 .unwrap();
1694 assert_eq!(expected, checksum);
1695 }
1696 }
1697}