1use std::collections::{HashMap, HashSet};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::time::SystemTime;
7
8use bytes::{Bytes, BytesMut};
9use crabka_protocol::records::{HEADER_LEN, RecordBatch};
10
11use crate::config::LogConfig;
12use crate::error::LogError;
13use crate::leader_epoch_checkpoint::LeaderEpochCheckpoint;
14use crate::name;
15use crate::retention;
16use crate::segment::{RawSegmentRead, Segment};
17use crate::txn_index::{AbortedTxn, TxnIndex};
18
/// A partition log stored in one directory as a sequence of segments.
///
/// Older segments are sealed and read-only; the newest one is the active
/// segment and receives all appends.
#[derive(Debug)]
#[allow(clippy::struct_field_names)]
pub struct Log {
    /// Directory that holds the segment files.
    dir: PathBuf,
    /// Shared configuration; replaced through `set_config` and copied out
    /// through `config_snapshot`.
    config: std::sync::Arc<std::sync::RwLock<LogConfig>>,
    /// Sealed segments in ascending base-offset order.
    segments: Vec<Segment>,
    /// Segment currently receiving appends.
    active: Option<Segment>,
    /// Explicit lower bound for the log start offset; `log_start_offset`
    /// reports the larger of this value and the first segment's base offset.
    log_start_override: Option<i64>,

    /// Last stable offset as maintained by the append path.
    lso: i64,

    /// Open transactions: producer id -> base offset of the first
    /// transactional batch appended for that producer.
    pending: HashMap<i64, i64>,

    /// Aborted-transaction index belonging to the active segment.
    active_txn_index: TxnIndex,

    /// Leader-epoch checkpoint; gains an entry whenever an appended batch
    /// carries an epoch newer than the latest recorded one.
    epoch_checkpoint: LeaderEpochCheckpoint,
}
61
/// Result of a decoded read (`Log::read`).
#[derive(Debug)]
pub struct ReadOutput {
    /// Base offset of the first returned batch. When no batch is returned this
    /// is the requested offset, or the log end offset if the request was at or
    /// past the end of the log.
    pub start_offset: i64,
    /// Decoded batches in offset order.
    pub batches: Vec<RecordBatch>,
}
77
/// Result of a raw (undecoded) read (`Log::read_raw`).
#[derive(Debug, Clone)]
pub struct RawRead {
    /// Start offset reported by the first segment that contributed bytes, or
    /// the requested fetch offset when nothing was read.
    pub start_offset: i64,
    /// Concatenated bytes of every chunk that was read.
    pub bytes: Bytes,
    /// Length of `bytes`.
    pub total: usize,
}
90
91impl RawRead {
92 fn empty(off: i64) -> Self {
93 Self {
94 start_offset: off,
95 bytes: Bytes::new(),
96 total: 0,
97 }
98 }
99}
100
/// Description of one sealed segment, as produced by `Log::tierable_segments`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentExport {
    /// Base offset of the segment.
    pub base_offset: i64,
    /// Last offset attributed to the segment: the next segment's base offset
    /// minus one.
    pub last_offset: i64,
    /// Largest timestamp in the segment; `-1` when the segment reports
    /// `i64::MIN`.
    pub max_timestamp: i64,
    /// Size of the segment as reported by `Segment::size_bytes`.
    pub size_bytes: u64,
    /// Path of the segment's log file.
    pub log_path: PathBuf,
    /// Path of the segment's offset index.
    pub offset_index_path: PathBuf,
    /// Path of the segment's time index.
    pub time_index_path: PathBuf,
    /// Path of the segment's transaction index, if that file exists on disk.
    pub transaction_index_path: Option<PathBuf>,
    /// `(epoch, start_offset)` pairs overlapping this segment, with each start
    /// offset clamped to be no lower than `base_offset`.
    pub leader_epochs: Vec<(i32, i64)>,
}
129
130impl Log {
131 pub fn open(dir: impl AsRef<Path>, config: LogConfig) -> Result<Self, LogError> {
135 let dir = dir.as_ref().to_path_buf();
136 fs::create_dir_all(&dir)?;
137
138 crate::recovery::swap_orphan_recover(&dir)?;
141
142 let mut base_offsets: Vec<i64> = Vec::new();
143 for entry in fs::read_dir(&dir)? {
144 let entry = entry?;
145 let Ok(file_name) = entry.file_name().into_string() else {
146 continue; };
148 if let Ok(base) = name::parse_log_filename(&file_name) {
149 base_offsets.push(base);
150 }
151 }
152 base_offsets.sort_unstable();
153 base_offsets.dedup();
154
155 let mut segments: Vec<Segment> = Vec::with_capacity(base_offsets.len());
156 let mut active: Option<Segment> = None;
157 for (i, base) in base_offsets.iter().enumerate() {
158 if i + 1 < base_offsets.len() {
159 let mut seg = Segment::open(&dir, *base)?;
160 seg.seal();
161 segments.push(seg);
162 } else {
163 active = Some(Segment::open_active(&dir, *base, config.validate_on_open)?);
164 }
165 }
166
167 let active = match active {
168 Some(s) => s,
169 None => Segment::create(&dir, 0)?,
170 };
171
172 let active_txn_index = TxnIndex::open(active.txn_index_path())?;
173 let epoch_checkpoint = LeaderEpochCheckpoint::open(active.leader_epoch_checkpoint_path())?;
174 let lso = active.last_offset() + 1;
176
177 let config = std::sync::Arc::new(std::sync::RwLock::new(config));
178
179 Ok(Self {
180 dir,
181 config,
182 segments,
183 active: Some(active),
184 log_start_override: None,
185 lso,
186 pending: HashMap::new(),
187 active_txn_index,
188 epoch_checkpoint,
189 })
190 }
191
192 #[must_use]
197 pub fn dir(&self) -> &Path {
198 &self.dir
199 }
200
201 #[must_use]
203 pub fn log_start_offset(&self) -> i64 {
204 let derived = if let Some(first) = self.segments.first() {
205 first.base_offset()
206 } else if let Some(active) = &self.active {
207 active.base_offset()
208 } else {
209 0
210 };
211 if let Some(o) = self.log_start_override {
212 return derived.max(o);
213 }
214 derived
215 }
216
217 pub fn set_log_start_offset(&mut self, new_start: i64) -> Result<(), LogError> {
229 if new_start < 0 {
230 return Err(LogError::InvalidArgument(
231 "set_log_start_offset: new_start must be >= 0".into(),
232 ));
233 }
234 self.log_start_override = Some(new_start);
235 Ok(())
236 }
237
/// Deprecated alias kept for tests; forwards to `set_log_start_offset`.
#[deprecated(note = "use set_log_start_offset")]
#[cfg(any(test, feature = "test-helpers"))]
pub fn test_set_log_start_offset(&mut self, new_start: i64) -> Result<(), LogError> {
    Self::set_log_start_offset(self, new_start)
}
244
245 pub fn reset_to(&mut self, new_base: i64) -> Result<(), LogError> {
252 if new_base < 0 {
253 return Err(LogError::OffsetMismatch {
254 expected: 0,
255 actual: new_base,
256 });
257 }
258
259 while let Some(popped) = self.segments.pop() {
261 let base = popped.base_offset();
262 drop(popped);
263 let _ = fs::remove_file(name::log_path(&self.dir, base));
264 let _ = fs::remove_file(name::index_path(&self.dir, base));
265 let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
266 }
267
268 if let Some(active) = self.active.take() {
270 let base = active.base_offset();
271 drop(active);
272 let _ = fs::remove_file(name::log_path(&self.dir, base));
273 let _ = fs::remove_file(name::index_path(&self.dir, base));
274 let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
275 }
276
277 self.log_start_override = None;
279
280 let new_active = Segment::create(&self.dir, new_base)?;
281 self.active_txn_index = TxnIndex::open(new_active.txn_index_path())?;
282 self.pending.clear(); self.lso = new_active.last_offset() + 1; self.active = Some(new_active);
285 Ok(())
286 }
287
288 #[must_use]
290 pub fn log_end_offset(&self) -> i64 {
291 if let Some(active) = &self.active {
292 return active.last_offset() + 1;
293 }
294 0
295 }
296
297 #[must_use]
303 pub fn size_bytes(&self) -> u64 {
304 let sealed: u64 = self
305 .segments
306 .iter()
307 .map(super::segment::Segment::size_bytes)
308 .sum();
309 sealed + self.active.as_ref().map_or(0, Segment::size_bytes)
310 }
311
312 #[must_use]
317 pub fn lso(&self) -> i64 {
318 self.lso
319 }
320
321 pub fn close(self) {
324 drop(self);
325 }
326
327 pub fn set_config(&self, new: LogConfig) {
334 *self.config.write().unwrap() = new;
335 }
336
337 #[must_use]
340 pub fn config_snapshot(&self) -> LogConfig {
341 self.config.read().unwrap().clone()
342 }
343
344 #[must_use]
354 pub fn aborted_in_range(&self, start: i64, end: i64) -> Vec<crate::txn_index::AbortedTxn> {
355 self.active_txn_index
356 .aborted_in_range(start, end)
357 .copied()
358 .collect()
359 }
360
361 pub fn append(&mut self, batch: &mut RecordBatch) -> Result<i64, LogError> {
366 let leader_epoch = batch.partition_leader_epoch;
367 let assigned_base = self.log_end_offset();
368 batch.base_offset = assigned_base;
369 self.append_preserving_offset(batch)?;
370 if leader_epoch >= 0
373 && self
374 .epoch_checkpoint
375 .latest_epoch()
376 .is_none_or(|e| leader_epoch > e)
377 {
378 self.epoch_checkpoint.append(leader_epoch, assigned_base)?;
379 }
380 Ok(assigned_base)
381 }
382
383 #[must_use]
385 pub fn epoch_checkpoint(&self) -> &LeaderEpochCheckpoint {
386 &self.epoch_checkpoint
387 }
388
389 pub fn append_at(&mut self, batch: &mut RecordBatch, offset: i64) -> Result<(), LogError> {
401 let expected = self.log_end_offset();
402 if offset != expected {
403 return Err(LogError::OffsetMismatch {
404 expected,
405 actual: offset,
406 });
407 }
408 let leader_epoch = batch.partition_leader_epoch;
409 batch.base_offset = offset;
410 self.append_preserving_offset(batch)?;
411 if leader_epoch >= 0
415 && self
416 .epoch_checkpoint
417 .latest_epoch()
418 .is_none_or(|e| leader_epoch > e)
419 {
420 self.epoch_checkpoint.append(leader_epoch, offset)?;
421 }
422 Ok(())
423 }
424
425 fn append_preserving_offset(&mut self, batch: &mut RecordBatch) -> Result<(), LogError> {
431 let (segment_bytes, index_interval_bytes, flush_on_append) = {
432 let cfg = self.config.read().unwrap();
433 (
434 cfg.segment_bytes,
435 cfg.index_interval_bytes,
436 cfg.flush_on_append,
437 )
438 };
439
440 let should_roll = match &self.active {
441 Some(seg) => seg.size_bytes() >= segment_bytes,
442 None => false,
443 };
444 if should_roll {
445 self.roll_active_segment()?;
446 }
447
448 let active = self
449 .active
450 .as_mut()
451 .expect("active segment must exist after Log::open");
452 active.append(batch, index_interval_bytes)?;
453
454 if flush_on_append {
455 active.flush()?;
456 }
457
458 let pid = batch.producer_id;
460 if batch.attributes.is_control_batch() {
461 let marker_type = batch
464 .records
465 .first()
466 .and_then(|r| r.key.as_deref())
467 .and_then(parse_control_marker_type);
468 if let Some(start) = self.pending.remove(&pid) {
469 let last = batch.base_offset + i64::from(batch.last_offset_delta);
470 if marker_type == Some(0)
471 {
473 self.active_txn_index.append(AbortedTxn {
474 start_offset: start,
475 last_offset: last,
476 producer_id: pid,
477 })?;
478 }
479 }
480 if self.pending.is_empty() {
482 self.lso = self.log_end_offset();
483 }
484 } else if batch.attributes.is_transactional() && pid >= 0 {
485 self.pending.entry(pid).or_insert(batch.base_offset);
487 } else {
489 if self.pending.is_empty() {
491 self.lso = self.log_end_offset();
492 }
493 }
494
495 Ok(())
496 }
497
498 fn roll_active_segment(&mut self) -> Result<(), LogError> {
499 let new_base = self.log_end_offset();
500 let mut old = self
501 .active
502 .take()
503 .expect("active segment must exist before rolling");
504 old.seal();
505 self.segments.push(old);
506 let new_seg = Segment::create(&self.dir, new_base)?;
507 self.active_txn_index = TxnIndex::open(new_seg.txn_index_path())?;
508 self.active = Some(new_seg);
509 Ok(())
510 }
511
512 pub fn read(&self, offset: i64, max_bytes: usize) -> Result<ReadOutput, LogError> {
516 let log_start = self.log_start_offset();
517 let log_end = self.log_end_offset();
518 if offset < log_start {
519 return Err(LogError::OffsetTooLow {
520 requested: offset,
521 log_start,
522 });
523 }
524 if offset >= log_end {
525 return Ok(ReadOutput {
526 start_offset: log_end,
527 batches: Vec::new(),
528 });
529 }
530
531 let mut batches: Vec<RecordBatch> = Vec::new();
532 let mut current_offset = offset;
533 let mut remaining = max_bytes;
534
535 for seg in &self.segments {
536 if seg.last_offset() < current_offset {
537 continue;
538 }
539 let bs = seg.read(current_offset, remaining)?;
540 if !bs.is_empty() {
541 let consumed: usize = bs.iter().map(RecordBatch::encoded_len).sum();
542 remaining = remaining.saturating_sub(consumed);
543 let last = bs.last().expect("non-empty by branch");
544 current_offset = last.base_offset + i64::from(last.last_offset_delta) + 1;
545 batches.extend(bs);
546 if remaining == 0 {
547 break;
548 }
549 }
550 }
551
552 if (remaining > 0 || batches.is_empty())
553 && let Some(active) = &self.active
554 && current_offset <= active.last_offset()
555 {
556 let bs = active.read(current_offset, remaining.max(1))?;
557 batches.extend(bs);
558 }
559
560 let start_offset = batches.first().map_or(offset, |b| b.base_offset);
561 Ok(ReadOutput {
562 start_offset,
563 batches,
564 })
565 }
566
567 pub fn read_raw(
571 &self,
572 fetch_offset: i64,
573 limit_offset: i64,
574 max_bytes: usize,
575 ) -> Result<RawRead, LogError> {
576 let log_start = self.log_start_offset();
577 if fetch_offset < log_start {
578 return Err(LogError::OffsetTooLow {
579 requested: fetch_offset,
580 log_start,
581 });
582 }
583 if fetch_offset >= limit_offset {
584 return Ok(RawRead::empty(fetch_offset));
585 }
586
587 let mut chunks: Vec<Bytes> = Vec::new();
588 let mut start_offset = fetch_offset;
589 let mut current = fetch_offset;
590 let mut remaining = max_bytes;
591 let mut got_first = false;
592
593 for seg in &self.segments {
594 if seg.last_offset() < current {
595 continue;
596 }
597 let r: RawSegmentRead =
598 seg.read_raw(current, limit_offset, remaining.max(HEADER_LEN))?;
599 if !r.is_empty() {
600 if !got_first {
601 start_offset = r.start_offset;
602 got_first = true;
603 }
604 remaining = remaining.saturating_sub(r.bytes.len());
605 current = r.last_offset + 1;
606 chunks.push(r.bytes);
607 if remaining == 0 || current >= limit_offset {
608 break;
609 }
610 }
611 }
612
613 if (remaining > 0 || !got_first)
614 && current < limit_offset
615 && let Some(active) = &self.active
616 && current <= active.last_offset()
617 {
618 let r = active.read_raw(current, limit_offset, remaining.max(HEADER_LEN))?;
619 if !r.is_empty() {
620 if !got_first {
621 start_offset = r.start_offset;
622 }
623 chunks.push(r.bytes);
624 }
625 }
626
627 let bytes = match chunks.len() {
628 0 => Bytes::new(),
629 1 => chunks.pop().expect("len==1"),
630 _ => {
631 let total: usize = chunks.iter().map(Bytes::len).sum();
632 let mut b = BytesMut::with_capacity(total);
633 for c in &chunks {
634 b.extend_from_slice(c);
635 }
636 b.freeze()
637 }
638 };
639 let total = bytes.len();
640 Ok(RawRead {
641 start_offset,
642 bytes,
643 total,
644 })
645 }
646
647 pub fn truncate_to(&mut self, offset: i64) -> Result<(), LogError> {
650 let log_start = self.log_start_offset();
651 let log_end = self.log_end_offset();
652 if offset >= log_end {
653 return Ok(()); }
655 if offset < log_start {
656 return Err(LogError::OffsetTooLow {
657 requested: offset,
658 log_start,
659 });
660 }
661
662 while let Some(last_sealed) = self.segments.last() {
664 if last_sealed.base_offset() >= offset {
665 let popped = self.segments.pop().expect("non-empty by while-let");
666 let base = popped.base_offset();
667 drop(popped);
668 let _ = fs::remove_file(name::log_path(&self.dir, base));
669 let _ = fs::remove_file(name::index_path(&self.dir, base));
670 let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
671 } else {
672 break;
673 }
674 }
675
676 if let Some(active) = &self.active
678 && active.base_offset() >= offset
679 {
680 let base = active.base_offset();
681 self.active = None;
682 let _ = fs::remove_file(name::log_path(&self.dir, base));
683 let _ = fs::remove_file(name::index_path(&self.dir, base));
684 let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
685 }
686
687 if self.active.is_none() {
690 if let Some(mut seg) = self.segments.pop() {
691 let rel = u32::try_from(offset - seg.base_offset())
692 .map_err(|_| LogError::BadSegmentName("offset overflow".into()))?;
693 seg.truncate_to_relative(rel)?;
694 self.active_txn_index = TxnIndex::open(seg.txn_index_path())?;
695 self.active = Some(seg);
696 } else {
697 let new_seg = Segment::create(&self.dir, offset)?;
698 self.active_txn_index = TxnIndex::open(new_seg.txn_index_path())?;
699 self.active = Some(new_seg);
700 }
701 } else if let Some(active) = self.active.as_mut()
702 && active.last_offset() >= offset
703 {
704 let rel = u32::try_from(offset - active.base_offset())
707 .map_err(|_| LogError::BadSegmentName("offset overflow".into()))?;
708 active.truncate_to_relative(rel)?;
709 self.active_txn_index = TxnIndex::open(active.txn_index_path())?;
710 }
711 self.lso = self.lso.min(self.log_end_offset());
713 self.epoch_checkpoint
717 .truncate_from_end(self.log_end_offset())?;
718 Ok(())
719 }
720
721 pub fn trim_to_offset(&mut self, target: i64) -> Result<i64, LogError> {
733 if target < 0 {
734 return Err(LogError::InvalidArgument(
735 "trim_to_offset: target must be >= 0".into(),
736 ));
737 }
738 let leo = self.log_end_offset();
739 let target = target.min(leo);
740 let log_start = self.log_start_offset();
741 if target <= log_start {
742 return Ok(log_start);
743 }
744
745 let active_base = self.active.as_ref().map_or(leo, Segment::base_offset);
752 let next_bases: Vec<i64> = self
753 .segments
754 .iter()
755 .map(Segment::base_offset)
756 .skip(1)
757 .chain(std::iter::once(active_base))
758 .collect();
759
760 let mut to_drop: Vec<i64> = Vec::new();
761 for (seg, next_base) in self.segments.iter().zip(next_bases.iter()) {
762 if *next_base <= target {
763 to_drop.push(seg.base_offset());
764 } else {
765 break;
766 }
767 }
768
769 let drop_set: HashSet<i64> = to_drop.iter().copied().collect();
770 self.segments
771 .retain(|s| !drop_set.contains(&s.base_offset()));
772 for base in &to_drop {
773 let _ = retention::delete_segment_files(&self.dir, *base);
774 }
775
776 let new_log_start = self
780 .segments
781 .first()
782 .map_or(active_base, Segment::base_offset);
783 if target > new_log_start {
784 self.set_log_start_offset(target)?;
785 }
786 Ok(self.log_start_offset())
787 }
788
789 pub fn tick(&mut self, now: SystemTime) -> Result<(), LogError> {
794 if self.config.read().unwrap().remote_storage_enable {
796 return Ok(());
797 }
798 let sealed_refs: Vec<&Segment> = self.segments.iter().collect();
799 let active_size = self.active.as_ref().map_or(0, Segment::size_bytes);
800
801 let cfg_guard = self.config.read().unwrap();
802 let time_evict = retention::time_based_evict(&sealed_refs, &cfg_guard, now);
803 let size_evict = retention::size_based_evict(&sealed_refs, active_size, &cfg_guard);
804 drop(cfg_guard);
805
806 let mut to_evict: Vec<i64> = time_evict;
808 let mut seen: HashSet<i64> = to_evict.iter().copied().collect();
809 for base in size_evict {
810 if seen.insert(base) {
811 to_evict.push(base);
812 }
813 }
814
815 let total_segments = self.segments.len() + usize::from(self.active.is_some());
818 if to_evict.len() >= total_segments {
819 to_evict.truncate(total_segments.saturating_sub(1));
820 }
821
822 let evict: HashSet<i64> = to_evict.iter().copied().collect();
823 self.segments.retain(|s| !evict.contains(&s.base_offset()));
824 for base in to_evict {
825 let _ = retention::delete_segment_files(&self.dir, base);
826 }
827 Ok(())
828 }
829
830 #[must_use]
834 pub fn local_log_start_offset(&self) -> i64 {
835 self.log_start_offset()
836 }
837
838 #[must_use]
845 pub fn offset_for_timestamp(&self, target_ts: i64) -> Option<(i64, i64)> {
846 for seg in &self.segments {
847 if seg.max_timestamp() >= target_ts
848 && let Some(hit) = seg.offset_for_timestamp(target_ts)
849 {
850 return Some(hit);
851 }
852 }
853 if let Some(active) = &self.active
854 && active.max_timestamp() >= target_ts
855 {
856 return active.offset_for_timestamp(target_ts);
857 }
858 None
859 }
860
861 #[must_use]
867 pub fn max_timestamp_offset_and_ts(&self) -> Option<(i64, i64)> {
868 let mut best: Option<(i64, i64)> = None; let candidates = self.segments.iter().chain(self.active.as_ref());
870 for seg in candidates {
871 if let Some((offset, ts)) = seg.offset_of_max_timestamp()
872 && best.is_none_or(|(best_ts, _)| ts > best_ts)
873 {
874 best = Some((ts, offset));
875 }
876 }
877 best.map(|(ts, offset)| (offset, ts))
878 }
879
880 #[must_use]
884 pub fn offset_of_max_timestamp(&self) -> i64 {
885 self.max_timestamp_offset_and_ts()
886 .map_or_else(|| self.log_start_offset(), |(offset, _)| offset)
887 }
888
889 pub fn delete_local_segments_through(&mut self, target: i64) -> Result<usize, LogError> {
904 if target < 0 {
905 return Err(LogError::InvalidArgument(
906 "delete_local_segments_through: target must be >= 0".into(),
907 ));
908 }
909 if target <= self.local_log_start_offset() {
910 return Ok(0);
911 }
912
913 let active_base = self
917 .active
918 .as_ref()
919 .map_or_else(|| self.log_end_offset(), Segment::base_offset);
920 let next_bases: Vec<i64> = self
921 .segments
922 .iter()
923 .map(Segment::base_offset)
924 .skip(1)
925 .chain(std::iter::once(active_base))
926 .collect();
927
928 let to_drop: Vec<i64> = self
929 .segments
930 .iter()
931 .zip(next_bases.iter())
932 .filter_map(|(seg, next_base)| {
933 let last = *next_base - 1;
934 (last < target).then(|| seg.base_offset())
935 })
936 .collect();
937
938 let removed = to_drop.len();
939 let drop_set: HashSet<i64> = to_drop.iter().copied().collect();
940 self.segments
941 .retain(|s| !drop_set.contains(&s.base_offset()));
942 for base in &to_drop {
943 let _ = retention::delete_segment_files(&self.dir, *base);
944 }
945
946 self.log_start_override = Some(target);
949
950 Ok(removed)
951 }
952
953 #[must_use]
963 pub fn tierable_segments(&self) -> Vec<SegmentExport> {
964 let mut epoch_entries = self.epoch_checkpoint.entries().to_vec();
967 epoch_entries.sort_by_key(|e| e.start_offset);
968 let active_base = self
969 .active
970 .as_ref()
971 .map_or_else(|| self.log_end_offset(), Segment::base_offset);
972 let next_bases: Vec<i64> = self
973 .segments
974 .iter()
975 .map(Segment::base_offset)
976 .skip(1)
977 .chain(std::iter::once(active_base))
978 .collect();
979
980 self.segments
981 .iter()
982 .zip(next_bases)
983 .map(|(seg, next_base)| {
984 let base = seg.base_offset();
985 let last = next_base - 1;
986 let max_ts = seg.max_timestamp();
987 let txn = name::txnindex_path(&self.dir, base);
988 SegmentExport {
989 base_offset: base,
990 last_offset: last,
991 max_timestamp: if max_ts == i64::MIN { -1 } else { max_ts },
992 size_bytes: seg.size_bytes(),
993 log_path: name::log_path(&self.dir, base),
994 offset_index_path: name::index_path(&self.dir, base),
995 time_index_path: name::timeindex_path(&self.dir, base),
996 transaction_index_path: txn.exists().then_some(txn),
997 leader_epochs: epochs_for_range(&epoch_entries, base, last),
998 }
999 })
1000 .collect()
1001 }
1002
1003 pub fn compact(&mut self) -> Result<(), LogError> {
1010 if self.segments.is_empty() {
1011 return Ok(());
1012 }
1013
1014 let cfg_guard = self.config.read().unwrap();
1015 if cfg_guard.cleanup_policy != crate::CleanupPolicy::Compact {
1016 return Ok(());
1017 }
1018 let index_interval = cfg_guard.index_interval_bytes;
1019 drop(cfg_guard);
1020
1021 let consumed_bases: Vec<i64> = self.segments.iter().map(Segment::base_offset).collect();
1022
1023 let rewrite = {
1029 let sealed_refs: Vec<&Segment> = self.segments.iter().collect();
1030 let offset_map = crate::compact::build_offset_map(&sealed_refs)?;
1031 crate::compact::rewrite_segments(&self.dir, &sealed_refs, &offset_map, index_interval)?
1032 };
1033
1034 self.segments.clear();
1035 crate::compact::atomic_swap(&self.dir, &consumed_bases, &rewrite)?;
1036
1037 let mut new_seg = Segment::open_active(&self.dir, rewrite.new_base_offset, true)?;
1040 new_seg.seal();
1041 self.segments.push(new_seg);
1042 Ok(())
1043 }
1044}
1045
1046fn epochs_for_range(
1054 sorted: &[crate::leader_epoch_checkpoint::EpochEntry],
1055 base: i64,
1056 last: i64,
1057) -> Vec<(i32, i64)> {
1058 let mut out = Vec::new();
1059 for (i, e) in sorted.iter().enumerate() {
1060 let end = sorted.get(i + 1).map_or(i64::MAX, |n| n.start_offset);
1062 if e.start_offset <= last && end > base {
1063 out.push((e.epoch, e.start_offset.max(base)));
1064 }
1065 }
1066 out
1067}
1068
/// Extracts the marker type from a control-record key.
///
/// The key is read as two big-endian `i16`s: a version (ignored) followed by
/// the marker type. Returns `None` when the key is shorter than four bytes;
/// extra trailing bytes are ignored.
fn parse_control_marker_type(key: &[u8]) -> Option<i16> {
    match key {
        [_version_hi, _version_lo, type_hi, type_lo, ..] => {
            Some(i16::from_be_bytes([*type_hi, *type_lo]))
        }
        _ => None,
    }
}
1080
1081#[cfg(test)]
1082mod tests {
1083 use super::*;
1084 use crate::leader_epoch_checkpoint::EpochEntry;
1085 use assert2::assert;
1086 use bytes::Bytes;
1087 use crabka_protocol::records::{Attributes, Record};
1088 use tempfile::tempdir;
1089
1090 fn sample_batch(n: i32) -> RecordBatch {
1091 let mut b = RecordBatch {
1092 base_offset: 0, max_timestamp: 0,
1094 last_offset_delta: n - 1,
1095 ..RecordBatch::default()
1096 };
1097 for i in 0..n {
1098 b.records.push(Record {
1099 offset_delta: i,
1100 key: Some(Bytes::from(format!("k{i}"))),
1101 value: Some(Bytes::from(format!("v{i}"))),
1102 ..Default::default()
1103 });
1104 }
1105 b
1106 }
1107
1108 fn test_log() -> (tempfile::TempDir, Log) {
1109 let dir = tempdir().unwrap();
1110 let log = Log::open(dir.path(), LogConfig::default()).unwrap();
1111 (dir, log)
1112 }
1113
1114 fn test_batch_at(_off: i64) -> RecordBatch {
1115 let mut b = RecordBatch {
1117 base_offset: 0,
1118 base_timestamp: 1_000,
1119 max_timestamp: 1_000,
1120 last_offset_delta: 0,
1121 ..RecordBatch::default()
1122 };
1123 b.records.push(Record {
1124 offset_delta: 0,
1125 value: Some(Bytes::from("v")),
1126 ..Default::default()
1127 });
1128 b
1129 }
1130
/// A raw read of the whole log returns exactly the bytes of the appended
/// batches as encoded after offset assignment.
#[test]
fn log_read_raw_spans_and_is_byte_exact() {
    let (tmp, mut log) = test_log();
    let mut expected = bytes::BytesMut::new();
    for off in 0..4i64 {
        let mut batch = test_batch_at(off);
        log.append(&mut batch).unwrap();
        batch.encode(&mut expected).unwrap();
    }
    let expected = expected.freeze();

    let end = log.log_end_offset();
    let raw = log.read_raw(0, end, 10 * 1024 * 1024).unwrap();
    assert!(raw.start_offset == 0);
    assert!(raw.total == expected.len());
    assert!(&raw.bytes[..] == &expected[..]);
    drop(tmp);
}
1148
/// With a tiny segment size the log rolls, and a raw read across sealed and
/// active segments is still byte-exact and decodes to the appended batches.
#[test]
fn log_read_raw_spans_multiple_segments() {
    let tmp = tempdir().unwrap();
    let small_segments = LogConfig {
        segment_bytes: 100,
        ..LogConfig::default()
    };
    let mut log = Log::open(tmp.path(), small_segments).unwrap();

    let batch_count: i64 = 6;
    let mut expected = bytes::BytesMut::new();
    let mut expected_bases = Vec::new();
    for off in 0..batch_count {
        let mut batch = test_batch_at(off);
        expected_bases.push(log.append(&mut batch).unwrap());
        batch.encode(&mut expected).unwrap();
    }
    let expected = expected.freeze();

    assert!(
        !log.segments.is_empty(),
        "expected >=1 sealed segment (segment roll); got 0"
    );
    assert!(log.active.is_some());

    let end = log.log_end_offset();
    let raw = log.read_raw(0, end, 10 * 1024 * 1024).unwrap();
    assert!(raw.start_offset == 0);
    assert!(raw.total == expected.len());
    assert!(
        &raw.bytes[..] == &expected[..],
        "raw bytes must be byte-exact across the segment seam"
    );

    // Decode the raw bytes back and compare the base offsets.
    let mut rest: &[u8] = &raw.bytes;
    let mut decoded_bases = Vec::new();
    while !rest.is_empty() {
        let batch = crabka_protocol::records::RecordBatch::decode(&mut rest).unwrap();
        decoded_bases.push(batch.base_offset);
    }
    assert!(decoded_bases == expected_bases);
    drop(tmp);
}
1201
/// A log opened on an empty directory starts and ends at offset 0.
#[test]
fn open_empty_dir_creates_first_segment() {
    let (_tmp, log) = test_log();
    assert!(log.log_start_offset() == 0);
    assert!(log.log_end_offset() == 0);
    log.close();
}
1210
/// `dir()` reports the directory the log was opened on.
#[test]
fn dir_returns_open_path() {
    let (tmp, log) = test_log();
    assert!(log.dir() == tmp.path());
}
1220
/// Opening an empty directory creates the offset-0 log file on disk.
#[test]
fn open_creates_log_file() {
    let (tmp, log) = test_log();
    drop(log);
    let first_log_file = tmp.path().join("00000000000000000000.log");
    assert!(first_log_file.exists());
}
1229
/// Consecutive appends receive back-to-back base offsets.
#[test]
fn append_assigns_monotonic_offsets() {
    let (_tmp, mut log) = test_log();
    let mut three = sample_batch(3);
    let mut two = sample_batch(2);
    let first_base = log.append(&mut three).unwrap();
    assert!(first_base == 0);
    let second_base = log.append(&mut two).unwrap();
    assert!(second_base == 3);
    assert!(log.log_end_offset() == 5);
}
1240
/// `append_at` with the expected offset stores the batch at that offset.
#[test]
fn append_at_matching_offset_preserves_caller_offset() {
    let (_tmp, mut log) = test_log();

    let mut first = sample_batch(3);
    log.append_at(&mut first, 0).unwrap();
    assert!(first.base_offset == 0);
    assert!(log.log_end_offset() == 3);

    let mut second = sample_batch(2);
    log.append_at(&mut second, 3).unwrap();
    assert!(second.base_offset == 3);
    assert!(log.log_end_offset() == 5);
}
1257
/// `append_at` rejects an offset that is not the log end and leaves the log
/// untouched.
#[test]
fn append_at_with_mismatched_offset_errors() {
    let (_tmp, mut log) = test_log();
    let mut batch = sample_batch(2);
    let outcome = log.append_at(&mut batch, 7);
    let err = outcome.unwrap_err();
    assert!(matches!(
        err,
        LogError::OffsetMismatch {
            expected: 0,
            actual: 7
        }
    ));
    assert!(log.log_end_offset() == 0);
}
1274
/// Three appended batches are all returned by an unbounded read from 0.
#[test]
fn append_then_read_back_in_order() {
    let (_tmp, mut log) = test_log();
    for _round in 0..3 {
        log.append(&mut sample_batch(2)).unwrap();
    }
    let ReadOutput {
        start_offset,
        batches,
    } = log.read(0, usize::MAX).unwrap();
    assert!(batches.len() == 3);
    assert!(start_offset == 0);
}
1287
/// Reading below the log start fails with `OffsetTooLow`.
#[test]
fn read_offset_too_low_errors() {
    let (_tmp, mut log) = test_log();
    log.append(&mut sample_batch(2)).unwrap();
    let outcome = log.read(-1, 1024);
    assert!(matches!(outcome, Err(LogError::OffsetTooLow { .. })));
}
1299
/// Reading exactly at the log end succeeds with no batches.
#[test]
fn read_at_log_end_returns_empty() {
    let (_tmp, mut log) = test_log();
    log.append(&mut sample_batch(2)).unwrap();
    let end = log.log_end_offset();
    let out = log.read(end, 1024).unwrap();
    assert!(out.batches.is_empty());
}
1309
/// Truncating at a batch boundary moves the log end back to that offset.
#[test]
fn truncate_to_drops_later_records() {
    let (_tmp, mut log) = test_log();
    for size in [3, 2] {
        log.append(&mut sample_batch(size)).unwrap();
    }
    assert!(log.log_end_offset() == 5);
    log.truncate_to(3).unwrap();
    assert!(log.log_end_offset() == 3);
}
1323
/// Truncating beyond the log end changes nothing.
#[test]
fn truncate_to_log_end_is_noop() {
    let (_tmp, mut log) = test_log();
    log.append(&mut sample_batch(2)).unwrap();
    let end_before = log.log_end_offset();
    log.truncate_to(end_before + 100).unwrap();
    let end_after = log.log_end_offset();
    assert!(end_after == end_before);
}
1334
/// Garbage bytes appended after the last complete batch are ignored when the
/// log is reopened.
#[test]
fn open_recovers_partial_trailing_batch() {
    use std::io::Write as _;

    let tmp = tempdir().unwrap();
    {
        let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();
        log.append(&mut sample_batch(3)).unwrap();
        log.append(&mut sample_batch(2)).unwrap();
    }

    // Simulate a torn write: ten junk bytes after the valid data.
    let log_file = tmp.path().join("00000000000000000000.log");
    let mut handle = std::fs::OpenOptions::new()
        .append(true)
        .open(&log_file)
        .unwrap();
    handle.write_all(&[0xAB; 10]).unwrap();
    handle.sync_data().unwrap();
    drop(handle);

    let reopened = Log::open(tmp.path(), LogConfig::default()).unwrap();
    assert!(reopened.log_end_offset() == 5);
}
1357
/// With the default configuration a retention tick leaves the log end
/// unchanged.
#[test]
fn tick_with_no_retention_is_noop() {
    let (_tmp, mut log) = test_log();
    for size in [2, 3] {
        log.append(&mut sample_batch(size)).unwrap();
    }
    let end_before = log.log_end_offset();
    log.tick(SystemTime::now()).unwrap();
    assert!(log.log_end_offset() == end_before);
}
1370
/// Even with aggressive time and size retention, a tick far in the future
/// keeps the data of a log that has a single segment.
#[test]
fn tick_never_deletes_only_segment() {
    use std::time::Duration;

    let tmp = tempdir().unwrap();
    let aggressive = LogConfig {
        retention_ms: Some(Duration::from_secs(1)),
        retention_bytes: Some(0),
        ..LogConfig::default()
    };
    let mut log = Log::open(tmp.path(), aggressive).unwrap();
    log.append(&mut sample_batch(2)).unwrap();

    let thirty_days_later = SystemTime::now() + Duration::from_hours(30 * 24);
    log.tick(thirty_days_later).unwrap();
    assert!(log.log_end_offset() == 2);
}
1388
/// With a 200-byte segment limit, five appends leave at least two `.log`
/// files in the directory.
#[test]
fn segment_rolls_when_bytes_exceeded() {
    let tmp = tempdir().unwrap();
    let small_segments = LogConfig {
        segment_bytes: 200,
        ..LogConfig::default()
    };
    let mut log = Log::open(tmp.path(), small_segments).unwrap();
    for _round in 0..5 {
        log.append(&mut sample_batch(2)).unwrap();
    }

    let log_file_count = std::fs::read_dir(tmp.path())
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("log"))
        .count();
    assert!(
        log_file_count >= 2,
        "expected segment roll; got {} .log files",
        log_file_count
    );
}
1413
1414 fn transactional_batch(pid: i64, epoch: i16, values: &[&str]) -> RecordBatch {
1418 let last_offset_delta = i32::try_from(values.len()).unwrap() - 1;
1419 let mut records = Vec::new();
1420 for (i, v) in values.iter().enumerate() {
1421 records.push(Record {
1422 offset_delta: i32::try_from(i).unwrap(),
1423 value: Some(Bytes::from(v.to_string())),
1424 ..Default::default()
1425 });
1426 }
1427 RecordBatch {
1428 base_offset: 0, last_offset_delta,
1430 producer_id: pid,
1431 producer_epoch: epoch,
1432 attributes: Attributes::default().with_transactional(true),
1433 records,
1434 ..RecordBatch::default()
1435 }
1436 }
1437
1438 fn control_key(marker_type: i16) -> Bytes {
1440 let mut buf = [0u8; 4];
1441 buf[0..2].copy_from_slice(&0i16.to_be_bytes()); buf[2..4].copy_from_slice(&marker_type.to_be_bytes());
1443 Bytes::from(buf.to_vec())
1444 }
1445
1446 fn commit_marker(pid: i64, epoch: i16) -> RecordBatch {
1449 RecordBatch {
1450 base_offset: 0,
1451 last_offset_delta: 0,
1452 producer_id: pid,
1453 producer_epoch: epoch,
1454 attributes: Attributes::default()
1455 .with_transactional(true)
1456 .with_control(true),
1457 records: vec![Record {
1458 offset_delta: 0,
1459 key: Some(control_key(1 )),
1460 ..Default::default()
1461 }],
1462 ..RecordBatch::default()
1463 }
1464 }
1465
1466 fn abort_marker(pid: i64, epoch: i16) -> RecordBatch {
1469 RecordBatch {
1470 base_offset: 0,
1471 last_offset_delta: 0,
1472 producer_id: pid,
1473 producer_epoch: epoch,
1474 attributes: Attributes::default()
1475 .with_transactional(true)
1476 .with_control(true),
1477 records: vec![Record {
1478 offset_delta: 0,
1479 key: Some(control_key(0 )),
1480 ..Default::default()
1481 }],
1482 ..RecordBatch::default()
1483 }
1484 }
1485
/// The LSO follows the log end offset for plain batches, stays put while a
/// transactional batch is uncommitted, and catches up once the commit marker
/// is appended.
#[test]
fn transactional_batch_holds_lso() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();

    // A plain batch: LSO and LEO agree afterwards.
    let mut plain = sample_batch(1);
    log.append(&mut plain).unwrap();
    assert!(log.lso() == log.log_end_offset());

    // An open transaction must pin the LSO at its previous value.
    let mut open_txn = transactional_batch(1000, 0, &["a", "b"]);
    let lso_before_txn = log.lso();
    log.append(&mut open_txn).unwrap();
    assert!(
        log.lso() == lso_before_txn,
        "LSO must not advance while txn in flight"
    );

    // Committing releases the LSO up to the log end.
    let mut commit = commit_marker(1000, 0);
    log.append(&mut commit).unwrap();
    assert!(log.lso() == log.log_end_offset());
}
1511
/// Aborting a three-record transaction must leave exactly one entry in the
/// base-offset-0 `.txnindex` file, for producer 1000 with start offset 0 and
/// last offset 3.
#[test]
fn abort_marker_writes_txnindex_entry() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();

    let mut txn = transactional_batch(1000, 0, &["a", "b", "c"]);
    log.append(&mut txn).unwrap();
    let mut abort = abort_marker(1000, 0);
    log.append(&mut abort).unwrap();

    // Re-open the on-disk index independently of the log's own handle.
    let index_path = tmp.path().join("00000000000000000000.txnindex");
    let index = TxnIndex::open(index_path).unwrap();
    let aborted = index.entries();
    assert!(aborted.len() == 1);
    let entry = &aborted[0];
    assert!(entry.producer_id == 1000);
    assert!(entry.start_offset == 0);
    assert!(entry.last_offset == 3);
}
1532
/// With two producers holding open transactions, committing only the first
/// must not move the LSO; committing the second brings it to the log end.
#[test]
fn lso_held_by_remaining_producer_after_partial_commit() {
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();

    // Open a transaction for each of producers 1000 and 2000.
    let mut first_txn = transactional_batch(1000, 0, &["a", "b"]);
    log.append(&mut first_txn).unwrap();
    let mut second_txn = transactional_batch(2000, 0, &["c"]);
    log.append(&mut second_txn).unwrap();
    let lso_with_both_open = log.lso();

    let mut first_commit = commit_marker(1000, 0);
    log.append(&mut first_commit).unwrap();
    assert!(log.lso() == lso_with_both_open, "LSO held by producer 2000");

    let mut second_commit = commit_marker(2000, 0);
    log.append(&mut second_commit).unwrap();
    assert!(log.lso() == log.log_end_offset());
}
1556
1557 fn sample_batch_with_epoch(n: i32, epoch: i32) -> RecordBatch {
1558 let mut b = sample_batch(n);
1559 b.partition_leader_epoch = epoch;
1560 b
1561 }
1562
/// Appending a three-record batch at epoch 0 and then a batch at epoch 1
/// must leave two checkpoint entries: epoch 0 starting at offset 0 and
/// epoch 1 starting at offset 3.
#[test]
fn append_records_epoch_transition() {
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();

    let mut epoch_zero = sample_batch_with_epoch(3, 0);
    log.append(&mut epoch_zero).unwrap();
    let mut epoch_one = sample_batch_with_epoch(2, 1);
    log.append(&mut epoch_one).unwrap();

    let expected = [
        EpochEntry {
            epoch: 0,
            start_offset: 0,
        },
        EpochEntry {
            epoch: 1,
            start_offset: 3,
        },
    ];
    assert!(log.epoch_checkpoint().entries() == &expected);
}
1586
/// Truncating back to the offset where epoch 7 began must remove epoch 7
/// from the checkpoint, leaving epoch 1 as the latest epoch and ending at
/// the log end offset.
#[test]
fn truncate_to_drops_stale_epoch_checkpoint_entries() {
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();

    let mut older = sample_batch_with_epoch(3, 1);
    log.append(&mut older).unwrap();
    // The next append starts epoch 7 at the current log end.
    let truncation_point = log.log_end_offset();
    let mut newer = sample_batch_with_epoch(2, 7);
    log.append(&mut newer).unwrap();
    assert!(log.epoch_checkpoint().latest_epoch() == Some(7));

    log.truncate_to(truncation_point).unwrap();

    assert!(log.epoch_checkpoint().latest_epoch() == Some(1));
    let end = log.log_end_offset();
    let checkpoint = log.epoch_checkpoint();
    assert!(checkpoint.end_offset_for_epoch(7, end) == -1);
    assert!(checkpoint.end_offset_for_epoch(1, end) == end);
}
1608
/// After `set_config`, `config_snapshot` must report the replacement
/// configuration's `retention_ms` rather than the one the log was opened
/// with.
#[test]
fn set_config_swaps_active_config() {
    use std::time::Duration;

    let tmp = tempdir().expect("tempdir");
    let initial = LogConfig {
        retention_ms: Some(Duration::from_mins(1)),
        ..LogConfig::default()
    };
    let log = Log::open(tmp.path(), initial).expect("open");

    let replacement = LogConfig {
        retention_ms: Some(Duration::from_mins(2)),
        ..LogConfig::default()
    };
    log.set_config(replacement);

    assert!(log.config_snapshot().retention_ms == Some(Duration::from_mins(2)));
}
1626
/// Trimming a 30-batch log (200-byte segments) to offset 15 must return a
/// start no greater than 15, keep the log end offset unchanged, and leave a
/// non-negative log start offset.
#[test]
fn trim_to_offset_drops_old_segments() {
    let tmp = tempdir().expect("tempdir");
    let config = LogConfig {
        segment_bytes: 200,
        ..LogConfig::default()
    };
    let mut log = Log::open(tmp.path(), config).expect("open");
    for _ in 0..30 {
        let mut batch = sample_batch(1);
        log.append(&mut batch).expect("append");
    }

    let leo_before = log.log_end_offset();
    let trimmed_start = log.trim_to_offset(15).expect("trim");

    assert!(trimmed_start <= 15);
    assert!(log.log_end_offset() == leo_before);
    assert!(log.log_start_offset() >= 0);
}
1654
/// Trimming to an offset beyond the log end must return the log end offset.
#[test]
fn trim_to_offset_clamps_to_leo() {
    let tmp = tempdir().expect("tempdir");
    let mut log = Log::open(tmp.path(), LogConfig::default()).expect("open");
    for _ in 0..3 {
        let mut batch = sample_batch(1);
        log.append(&mut batch).expect("append");
    }

    let log_end = log.log_end_offset();
    let trimmed_start = log.trim_to_offset(999).expect("trim");
    assert!(trimmed_start == log_end);
}
1668
/// A negative trim target must be reported as an error.
#[test]
fn trim_to_offset_rejects_negative() {
    let tmp = tempdir().expect("tempdir");
    let mut log = Log::open(tmp.path(), LogConfig::default()).expect("open");
    let outcome = log.trim_to_offset(-5);
    assert!(outcome.is_err());
}
1675
/// Trimming to offset 0 on a freshly written log must succeed and return
/// the current log start offset.
#[test]
fn trim_to_offset_idempotent_at_or_below_log_start() {
    let tmp = tempdir().expect("tempdir");
    let mut log = Log::open(tmp.path(), LogConfig::default()).expect("open");
    for _ in 0..3 {
        let mut batch = sample_batch(1);
        log.append(&mut batch).expect("append");
    }

    let returned_start = log.trim_to_offset(0).expect("trim");
    assert!(returned_start == log.log_start_offset());
}
1688
1689 fn keyed_batch(base: i64, items: &[(i32, &[u8], &[u8])]) -> RecordBatch {
1690 let records: Vec<Record> = items
1691 .iter()
1692 .map(|(d, k, v)| Record {
1693 offset_delta: *d,
1694 key: Some(Bytes::copy_from_slice(k)),
1695 value: Some(Bytes::copy_from_slice(v)),
1696 ..Default::default()
1697 })
1698 .collect();
1699 let last_delta = items.iter().map(|(d, _, _)| *d).max().unwrap_or(0);
1700 RecordBatch {
1701 base_offset: base,
1702 last_offset_delta: last_delta,
1703 max_timestamp: 0,
1704 records,
1705 ..RecordBatch::default()
1706 }
1707 }
1708
/// Compacting a log that holds a single one-record batch must succeed and
/// leave the log end offset at 1.
#[test]
fn compact_no_op_when_only_one_segment() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            cleanup_policy: crate::CleanupPolicy::Compact,
            ..Default::default()
        },
    )
    .unwrap();

    let mut only_batch = keyed_batch(0, &[(0, b"k1", b"v1")]);
    log.append(&mut only_batch).unwrap();

    log.compact().unwrap();
    assert!(log.log_end_offset() == 1);
}
1723
/// After compaction the log end offset is unchanged, and a full read still
/// yields both the repeatedly written key `k1` and the key from the last
/// appended batch.
#[test]
fn compact_dedupes_sealed_segments_keeps_active_intact() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            cleanup_policy: crate::CleanupPolicy::Compact,
            segment_bytes: 256,
            ..Default::default()
        },
    )
    .unwrap();

    // Three successive values for the same key.
    for round in 0..3 {
        let value = format!("v{round}");
        let mut batch = keyed_batch(0, &[(0, b"k1", value.as_bytes())]);
        log.append(&mut batch).unwrap();
    }
    // One final batch with a distinct key.
    let mut tail = keyed_batch(0, &[(0, b"active-key", b"active-value")]);
    log.append(&mut tail).unwrap();

    let leo_before_compaction = log.log_end_offset();
    log.compact().unwrap();
    assert!(
        log.log_end_offset() == leo_before_compaction,
        "compaction must not change LEO"
    );

    // Gather every surviving record key across all returned batches.
    let out = log.read(0, 1024 * 1024).unwrap();
    let mut keys: Vec<&[u8]> = Vec::new();
    for batch in &out.batches {
        for record in &batch.records {
            keys.push(record.key.as_ref().unwrap().as_ref());
        }
    }
    assert!(keys.contains(&b"k1".as_ref()), "k1 must survive as newest");
    assert!(
        keys.contains(&b"active-key".as_ref()),
        "active segment record must survive"
    );
}
1768
/// `tierable_segments` must return one export fewer than the number of
/// `.log` files on disk, with existing log/index/time-index paths, ordered
/// by offset, and every export ending before the log end offset.
#[test]
fn tierable_segments_excludes_active_and_reports_paths() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            segment_bytes: 200,
            ..LogConfig::default()
        },
    )
    .unwrap();
    for _ in 0..10 {
        let mut batch = sample_batch(2);
        log.append(&mut batch).unwrap();
    }

    // All `.log` files but one are expected to be exported.
    let log_file_count = std::fs::read_dir(tmp.path())
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("log"))
        .count();
    let sealed_count = log_file_count - 1;
    let exports = log.tierable_segments();
    assert!(
        exports.len() == sealed_count,
        "one export per sealed segment"
    );

    let log_end = log.log_end_offset();
    let mut previous_last = -1;
    for export in &exports {
        assert!(
            export.log_path.exists(),
            "log file present: {:?}",
            export.log_path
        );
        assert!(export.offset_index_path.exists());
        assert!(export.time_index_path.exists());
        assert!(export.last_offset >= export.base_offset);
        assert!(
            export.base_offset > previous_last,
            "segments are offset-ordered"
        );
        previous_last = export.last_offset;
        assert!(
            export.last_offset < log_end,
            "sealed segments end before the log end"
        );
    }
}
1809
/// A log holding a single batch under the default config must report no
/// tierable segments.
#[test]
fn tierable_segments_empty_for_single_active_segment() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();
    let mut batch = sample_batch(3);
    log.append(&mut batch).unwrap();

    let exports = log.tierable_segments();
    assert!(exports.is_empty());
}
1819
/// Consecutive exports must be contiguous: each export's `last_offset + 1`
/// equals the next export's `base_offset`.
#[test]
fn tierable_segments_last_offset_matches_next_base() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            segment_bytes: 200,
            ..LogConfig::default()
        },
    )
    .unwrap();
    for _ in 0..8 {
        let mut batch = sample_batch(2);
        log.append(&mut batch).unwrap();
    }

    let exports = log.tierable_segments();
    // Pair each export with its successor.
    for (earlier, later) in exports.iter().zip(exports.iter().skip(1)) {
        assert!(earlier.last_offset + 1 == later.base_offset);
    }
}
1839
/// Every export must carry at least one leader-epoch entry, and each
/// entry's start offset must lie within the export's
/// `[base_offset, last_offset]` range.
#[test]
fn tierable_segments_carry_leader_epochs() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            segment_bytes: 200,
            ..LogConfig::default()
        },
    )
    .unwrap();

    // Four batches at epoch 0 followed by four at epoch 1.
    for epoch in [0, 1] {
        for _ in 0..4 {
            let mut batch = sample_batch_with_epoch(2, epoch);
            log.append(&mut batch).unwrap();
        }
    }

    let exports = log.tierable_segments();
    assert!(!exports.is_empty());
    for export in &exports {
        assert!(!export.leader_epochs.is_empty(), "export has leader epochs");
        for &(_, start) in &export.leader_epochs {
            assert!(start >= export.base_offset);
            assert!(start <= export.last_offset);
        }
    }
}
1869
/// `epochs_for_range` must clamp the first overlapping epoch's start to the
/// range start, include every epoch that begins inside the range, and
/// return nothing for an empty entry list.
#[test]
fn epochs_for_range_clamps_and_filters() {
    use crate::leader_epoch_checkpoint::EpochEntry;

    // Epochs 0, 1 and 2 start at offsets 0, 50 and 100.
    let entries = vec![
        EpochEntry {
            epoch: 0,
            start_offset: 0,
        },
        EpochEntry {
            epoch: 1,
            start_offset: 50,
        },
        EpochEntry {
            epoch: 2,
            start_offset: 100,
        },
    ];

    let inside_epoch_one = epochs_for_range(&entries, 60, 90);
    assert!(inside_epoch_one == vec![(1, 60)]);

    let straddling_boundary = epochs_for_range(&entries, 40, 60);
    assert!(straddling_boundary == vec![(0, 40), (1, 50)]);

    let whole_history = epochs_for_range(&entries, 0, 200);
    assert!(whole_history == vec![(0, 0), (1, 50), (2, 100)]);

    let no_entries = epochs_for_range(&[], 0, 100);
    assert!(no_entries.is_empty());
}
1896
/// Running `compact` twice in a row must succeed both times and report the
/// same log end offset after each run.
#[test]
fn compact_is_idempotent() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            cleanup_policy: crate::CleanupPolicy::Compact,
            segment_bytes: 256,
            ..Default::default()
        },
    )
    .unwrap();

    for round in 0..3 {
        let value = format!("v{round}");
        let mut batch = keyed_batch(0, &[(0, b"k1", value.as_bytes())]);
        log.append(&mut batch).unwrap();
    }
    let mut tail = keyed_batch(0, &[(0, b"active", b"x")]);
    log.append(&mut tail).unwrap();

    log.compact().unwrap();
    let leo_after_first = log.log_end_offset();
    log.compact().unwrap();
    let leo_after_second = log.log_end_offset();
    assert!(leo_after_first == leo_after_second);
}
1919
1920 #[allow(clippy::needless_pass_by_value)]
1925 fn rolled_log(dir: &std::path::Path, extra: LogConfig) -> Log {
1926 let mut log = Log::open(
1927 dir,
1928 LogConfig {
1929 segment_bytes: 200,
1930 ..extra
1931 },
1932 )
1933 .unwrap();
1934 for _ in 0..16 {
1935 let mut b = sample_batch(2);
1936 log.append(&mut b).unwrap();
1937 }
1938 log
1939 }
1940
/// After plain appends, `local_log_start_offset` and `log_start_offset`
/// must report the same value.
#[test]
fn local_log_start_offset_matches_log_start_offset() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();
    for _ in 0..3 {
        let mut batch = sample_batch(2);
        log.append(&mut batch).unwrap();
    }

    let local_start = log.local_log_start_offset();
    assert!(local_start == log.log_start_offset());
}
1951
/// Deleting through the offset just past the second export must remove
/// exactly the exports that end below that target, delete their `.log`,
/// index and time-index files, and leave the log end offset unchanged.
#[test]
fn delete_local_segments_through_drops_sealed_below_target() {
    let tmp = tempdir().unwrap();
    let mut log = rolled_log(tmp.path(), LogConfig::default());
    let exports = log.tierable_segments();
    assert!(
        exports.len() >= 3,
        "test needs multiple sealed segments; got {}",
        exports.len()
    );

    // Target: first offset after the second export.
    let target = exports[1].last_offset + 1;
    let mut expected_deleted: Vec<i64> = Vec::new();
    for export in exports.iter() {
        if export.last_offset < target {
            expected_deleted.push(export.base_offset);
        }
    }
    let leo_before = log.log_end_offset();

    let removed = log.delete_local_segments_through(target).unwrap();
    assert!(removed == expected_deleted.len());

    // None of the deleted bases may still be reported as tierable.
    let remaining_bases: Vec<i64> = log
        .tierable_segments()
        .iter()
        .map(|export| export.base_offset)
        .collect();
    for base in &expected_deleted {
        assert!(
            !remaining_bases.contains(base),
            "base {base} should be dropped"
        );
    }

    // Their files must be gone from disk as well.
    for base in &expected_deleted {
        let log_file = name::log_path(tmp.path(), *base);
        assert!(!log_file.exists());
        let index_file = name::index_path(tmp.path(), *base);
        assert!(!index_file.exists());
        let time_index_file = name::timeindex_path(tmp.path(), *base);
        assert!(!time_index_file.exists());
    }

    assert!(log.log_end_offset() == leo_before);
}
2000
/// A deletion target far beyond the log end must remove every tierable
/// segment while the `.log` file that follows the last export stays on disk
/// and the log end offset is unchanged.
#[test]
fn delete_local_segments_through_keeps_active_segment() {
    let tmp = tempdir().unwrap();
    let mut log = rolled_log(tmp.path(), LogConfig::default());
    let leo_before = log.log_end_offset();

    // The file named after the offset following the last export.
    let active_base = log.tierable_segments().last().unwrap().last_offset + 1;
    let active_log = tmp.path().join(format!("{active_base:020}.log"));
    assert!(active_log.exists());

    let far_beyond_end = leo_before + 1_000_000;
    let _ = log.delete_local_segments_through(far_beyond_end).unwrap();

    assert!(active_log.exists(), "active segment must survive");
    assert!(
        log.log_end_offset() == leo_before,
        "active segment untouched (LEO unchanged)"
    );
    assert!(log.tierable_segments().is_empty());
}
2025
/// After deleting through the offset just past the second export, both
/// `local_log_start_offset` and `log_start_offset` must equal that target.
#[test]
fn delete_local_segments_through_advances_local_start_pointer() {
    let dir = tempdir().unwrap();
    let mut log = rolled_log(dir.path(), LogConfig::default());
    let exports = log.tierable_segments();
    // Fail with a descriptive message instead of an index-out-of-bounds
    // panic if the setup did not roll enough segments.
    assert!(
        exports.len() >= 2,
        "test needs at least two sealed segments; got {}",
        exports.len()
    );
    let target = exports[1].last_offset + 1;
    log.delete_local_segments_through(target).unwrap();
    assert!(log.local_log_start_offset() == target);
    assert!(log.log_start_offset() == target);
}
2036
/// A target at the current log start, or one below it (floored at 0), must
/// remove nothing and leave both the log start offset and the number of
/// tierable segments unchanged.
#[test]
fn delete_local_segments_through_is_noop_at_or_below_current_start() {
    let tmp = tempdir().unwrap();
    let mut log = rolled_log(tmp.path(), LogConfig::default());
    let start_before = log.log_start_offset();
    let sealed_before = log.tierable_segments().len();

    let removed_at_start = log.delete_local_segments_through(start_before).unwrap();
    assert!(removed_at_start == 0);

    let below_start = (start_before - 1).max(0);
    let removed_below = log.delete_local_segments_through(below_start).unwrap();
    assert!(removed_below == 0);

    assert!(log.log_start_offset() == start_before);
    assert!(log.tierable_segments().len() == sealed_before);
}
2053
/// A negative target must be rejected with `LogError::InvalidArgument`.
#[test]
fn delete_local_segments_through_rejects_negative_target() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(tmp.path(), LogConfig::default()).unwrap();
    let outcome = log.delete_local_segments_through(-1);
    let err = outcome.unwrap_err();
    assert!(matches!(err, LogError::InvalidArgument(_)));
}
2061
/// With `remote_storage_enable: true`, a tick far in the future must not
/// remove any tierable segment; with it `false` and the same 1 ms
/// `retention_ms`, the same tick must remove all of them.
#[test]
fn tick_skips_retention_when_remote_storage_enable_is_true() {
    use std::time::Duration;
    // One year ahead: every segment is older than the 1 ms limit.
    let far_future = SystemTime::now() + Duration::from_hours(365 * 24);

    // Remote storage enabled: the tick must leave the segments alone.
    let dir_tiered = tempdir().unwrap();
    let mut tiered = rolled_log(
        dir_tiered.path(),
        LogConfig {
            remote_storage_enable: true,
            retention_ms: Some(Duration::from_millis(1)),
            ..LogConfig::default()
        },
    );
    let sealed_before = tiered.tierable_segments().len();
    assert!(sealed_before > 0, "test setup must roll multiple segments");
    tiered.tick(far_future).unwrap();
    assert!(
        tiered.tierable_segments().len() == sealed_before,
        "tiered topics' retention is the RemoteLogManager's job"
    );

    // Remote storage disabled: the same tick removes every tierable segment.
    let dir_plain = tempdir().unwrap();
    let mut plain = rolled_log(
        dir_plain.path(),
        LogConfig {
            remote_storage_enable: false,
            retention_ms: Some(Duration::from_millis(1)),
            ..LogConfig::default()
        },
    );
    assert!(!plain.tierable_segments().is_empty());
    plain.tick(far_future).unwrap();
    assert!(
        plain.tierable_segments().is_empty(),
        "standard retention deletes all sealed segments"
    );
}
2104
2105 fn ts_batch(ts: i64) -> RecordBatch {
2106 let mut b = RecordBatch {
2107 base_offset: 0, base_timestamp: ts,
2109 max_timestamp: ts,
2110 last_offset_delta: 0,
2111 ..RecordBatch::default()
2112 };
2113 b.records.push(Record {
2114 offset_delta: 0,
2115 timestamp_delta: 0,
2116 value: Some(Bytes::from("v")),
2117 ..Default::default()
2118 });
2119 b
2120 }
2121
/// With five single-record batches stamped 100..=500 (and `segment_bytes`
/// set to 1), `offset_for_timestamp` must return the first offset whose
/// timestamp is at or after the target, or `None` past the newest record.
#[test]
fn log_offset_for_timestamp_across_segments() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            segment_bytes: 1,
            ..LogConfig::default()
        },
    )
    .unwrap();

    // Each append must be assigned the next consecutive offset from 0.
    let mut expected_offset: i64 = 0;
    for ts in [100, 200, 300, 400, 500] {
        let mut batch = ts_batch(ts);
        let assigned = log.append(&mut batch).unwrap();
        assert!(assigned == expected_offset);
        expected_offset += 1;
    }

    // (target timestamp, expected `(offset, timestamp)` hit).
    let lookups = [
        (50, Some((0, 100))),
        (300, Some((2, 300))),
        (350, Some((3, 400))),
        (500, Some((4, 500))),
        (600, None),
    ];
    for (target, expected) in lookups {
        assert!(log.offset_for_timestamp(target) == expected);
    }

    log.close();
    drop(tmp);
}
2148
/// A timestamp lookup on a freshly opened, empty log must return `None`.
#[test]
fn log_offset_for_timestamp_empty_log_is_none() {
    let tmp = tempdir().unwrap();
    let log = Log::open(tmp.path(), LogConfig::default()).unwrap();
    let hit = log.offset_for_timestamp(0);
    assert!(hit == None);
    log.close();
    drop(tmp);
}
2157
/// With batches stamped 100, 300, 200 in that order, the offset of the
/// maximum timestamp must be 1 (the second batch).
#[test]
fn log_offset_of_max_timestamp_in_active() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            segment_bytes: 1,
            ..LogConfig::default()
        },
    )
    .unwrap();
    for ts in [100, 300, 200] {
        let mut batch = ts_batch(ts);
        log.append(&mut batch).unwrap();
    }

    let max_ts_offset = log.offset_of_max_timestamp();
    assert!(max_ts_offset == 1);
    log.close();
    drop(tmp);
}
2175
/// On an empty log, `offset_of_max_timestamp` must equal the log start
/// offset and `max_timestamp_offset_and_ts` must be `None`.
#[test]
fn log_offset_of_max_timestamp_empty_is_log_start() {
    let tmp = tempdir().unwrap();
    let log = Log::open(tmp.path(), LogConfig::default()).unwrap();

    let max_ts_offset = log.offset_of_max_timestamp();
    assert!(max_ts_offset == log.log_start_offset());
    let pair = log.max_timestamp_offset_and_ts();
    assert!(pair == None);

    log.close();
    drop(tmp);
}
2185
/// With batches stamped 100, 300, 200 in that order,
/// `max_timestamp_offset_and_ts` must return offset 1 paired with
/// timestamp 300.
#[test]
fn log_max_timestamp_offset_and_ts_returns_pair() {
    let tmp = tempdir().unwrap();
    let mut log = Log::open(
        tmp.path(),
        LogConfig {
            segment_bytes: 1,
            ..LogConfig::default()
        },
    )
    .unwrap();
    for ts in [100, 300, 200] {
        let mut batch = ts_batch(ts);
        log.append(&mut batch).unwrap();
    }

    let pair = log.max_timestamp_offset_and_ts();
    assert!(pair == Some((1, 300)));
    log.close();
    drop(tmp);
}
2203}