1use std::fs::{File, OpenOptions};
5use std::io::{Seek, SeekFrom};
6use std::path::{Path, PathBuf};
7
8use bytes::Bytes;
9use crabka_protocol::records::{HEADER_LEN, RecordBatch, RecordBatchHeader};
10use zerocopy::FromBytes;
11
12use crate::error::LogError;
13use crate::index::{OffsetIndex, TimeIndex};
14use crate::name;
15
16fn read_full_at(file: &File, mut offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
22 let mut total = 0;
23 while total < buf.len() {
24 match read_at(file, offset, &mut buf[total..]) {
25 Ok(0) => break, Ok(n) => {
27 total += n;
28 offset += n as u64;
29 }
30 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
31 Err(e) => return Err(e),
32 }
33 }
34 Ok(total)
35}
36
/// Positional read on Unix: reads into `buf` starting at byte `offset` of
/// `file` and returns the number of bytes read.
#[cfg(unix)]
fn read_at(file: &File, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
    use std::os::unix::fs::FileExt as _;

    file.read_at(buf, offset)
}

/// Positional read on Windows: reads into `buf` starting at byte `offset` of
/// `file` and returns the number of bytes read.
#[cfg(windows)]
fn read_at(file: &File, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
    use std::os::windows::fs::FileExt as _;

    file.seek_read(buf, offset)
}
46
/// One log segment: the log file plus its offset index and time index, together
/// with the in-memory bookkeeping that the methods on this type keep up to date.
#[derive(Debug)]
pub struct Segment {
    /// Directory that holds the segment's files.
    // NOTE(review): `dir` is read by `dir()`, `txn_index_path()` and
    // `leader_epoch_checkpoint_path()` in this file, so this allow looks
    // unnecessary — confirm before removing.
    #[allow(dead_code)]
    dir: PathBuf,
    /// Offset the segment's file names are derived from; relative offsets stored
    /// in the indexes are measured from this value.
    base_offset: i64,
    /// Handle to the log file, opened for both reading and writing.
    log_file: File,
    /// Number of log bytes this segment treats as present; reads never go past it.
    log_size: u64,
    /// Index that `append` feeds with (relative offset, byte position) pairs.
    offset_index: OffsetIndex,
    /// Index that `append` feeds with (max timestamp, relative offset) pairs.
    time_index: TimeIndex,
    /// When `true`, `append` refuses to write.
    sealed: bool,
    /// Largest batch `max_timestamp` tracked so far; `i64::MIN` when none.
    max_timestamp: i64,
    /// Last offset tracked for the segment; `base_offset - 1` when none.
    last_offset: i64,
}
73
/// Result of [`Segment::read_raw`]: a run of whole record batches returned as
/// the raw bytes read from the log file, plus the offset range they cover.
#[derive(Debug, Clone)]
pub struct RawSegmentRead {
    /// Base offset of the first batch contained in `bytes`.
    pub start_offset: i64,
    /// Last offset of the final batch contained in `bytes`.
    pub last_offset: i64,
    /// The batch bytes themselves, exactly as read from the log file.
    pub bytes: Bytes,
}
84
85impl RawSegmentRead {
86 fn empty() -> Self {
87 Self {
88 start_offset: 0,
89 last_offset: -1,
90 bytes: Bytes::new(),
91 }
92 }
93
94 #[must_use]
96 pub fn is_empty(&self) -> bool {
97 self.bytes.is_empty()
98 }
99}
100
101impl Segment {
102 pub fn create(dir: &Path, base_offset: i64) -> Result<Self, LogError> {
105 let log_path = name::log_path(dir, base_offset);
106 let log_file = OpenOptions::new()
107 .read(true)
108 .write(true)
109 .create_new(true)
110 .open(&log_path)?;
111 let offset_index = OffsetIndex::open(&name::index_path(dir, base_offset))?;
112 let time_index = TimeIndex::open(&name::timeindex_path(dir, base_offset))?;
113 Ok(Self {
114 dir: dir.to_path_buf(),
115 base_offset,
116 log_file,
117 log_size: 0,
118 offset_index,
119 time_index,
120 sealed: false,
121 max_timestamp: i64::MIN,
122 last_offset: base_offset - 1,
123 })
124 }
125
126 pub fn open_active(dir: &Path, base_offset: i64, validate: bool) -> Result<Self, LogError> {
131 let mut seg = Self::open(dir, base_offset)?;
132 if validate {
133 seg.recover_active_tail()?;
134 }
135 Ok(seg)
136 }
137
138 fn recover_active_tail(&mut self) -> Result<(), LogError> {
139 let scan_start = self
140 .offset_index
141 .last_entry()
142 .map_or(0u64, |(_, pos)| u64::from(pos));
143 if scan_start >= self.log_size {
144 return Ok(());
145 }
146
147 let mut buf = Vec::new();
148 let to_read = usize::try_from(self.log_size - scan_start).unwrap_or(usize::MAX);
149 self.read_log_range(scan_start, &mut buf, to_read)?;
150
151 let mut cur: &[u8] = &buf;
152 let mut consumed: u64 = 0;
153 let mut last_offset = self.last_offset;
154 let mut max_ts = self.max_timestamp;
155 while !cur.is_empty() {
156 let before = cur.len();
157 let Ok(batch) = RecordBatch::decode(&mut cur) else {
158 break;
159 };
160 consumed += (before - cur.len()) as u64;
161 last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
162 if batch.max_timestamp > max_ts {
163 max_ts = batch.max_timestamp;
164 }
165 }
166
167 let valid_end = scan_start + consumed;
168 if valid_end < self.log_size {
169 self.log_file.set_len(valid_end)?;
170 self.log_size = valid_end;
171 }
172 self.last_offset = last_offset;
173 self.max_timestamp = max_ts;
174 Ok(())
175 }
176
177 pub fn open(dir: &Path, base_offset: i64) -> Result<Self, LogError> {
183 let log_path = name::log_path(dir, base_offset);
184 let log_file = OpenOptions::new().read(true).write(true).open(&log_path)?;
185 let log_size = log_file.metadata()?.len();
186 let offset_index = OffsetIndex::open(&name::index_path(dir, base_offset))?;
187 let time_index = TimeIndex::open(&name::timeindex_path(dir, base_offset))?;
188 Ok(Self {
189 dir: dir.to_path_buf(),
190 base_offset,
191 log_file,
192 log_size,
193 offset_index,
194 time_index,
195 sealed: false,
196 max_timestamp: i64::MIN,
197 last_offset: base_offset - 1,
198 })
199 }
200
/// Returns the base offset this segment was created or opened with.
#[must_use]
pub fn base_offset(&self) -> i64 {
    self.base_offset
}
206
207 #[must_use]
209 pub fn txn_index_path(&self) -> std::path::PathBuf {
210 crate::name::txnindex_path(&self.dir, self.base_offset)
211 }
212
213 #[must_use]
217 pub fn leader_epoch_checkpoint_path(&self) -> std::path::PathBuf {
218 crate::name::leader_epoch_checkpoint_path(&self.dir)
219 }
220
/// Returns the last offset tracked for this segment.
///
/// This is `base_offset - 1` until a batch has been appended or recovered.
#[must_use]
pub fn last_offset(&self) -> i64 {
    self.last_offset
}
227
/// Returns the tracked size of the log file in bytes.
#[must_use]
pub fn size_bytes(&self) -> u64 {
    self.log_size
}
233
/// Returns the largest batch timestamp tracked for this segment, or
/// `i64::MIN` when none has been recorded yet.
#[must_use]
pub fn max_timestamp(&self) -> i64 {
    self.max_timestamp
}
240
241 #[must_use]
248 pub fn offset_for_timestamp(&self, target_ts: i64) -> Option<(i64, i64)> {
249 let floor_rel = self.time_index.lookup(target_ts);
250 let scan_from = self.base_offset + i64::from(floor_rel);
251 self.scan_from_floor(scan_from, |ts| ts >= target_ts)
252 }
253
254 #[must_use]
260 pub fn offset_of_max_timestamp(&self) -> Option<(i64, i64)> {
261 if self.max_timestamp == i64::MIN {
262 return None;
263 }
264 let floor_rel = self.time_index.lookup(self.max_timestamp);
265 let scan_from = self.base_offset + i64::from(floor_rel);
266 self.scan_from_floor(scan_from, |ts| ts == self.max_timestamp)
271 }
272
273 fn scan_from_floor(&self, floor_offset: i64, pred: impl Fn(i64) -> bool) -> Option<(i64, i64)> {
284 const SCAN_WINDOW_BYTES: usize = 64 * 1024;
288 self.scan_from_floor_windowed(floor_offset, SCAN_WINDOW_BYTES, pred)
289 }
290
291 fn scan_from_floor_windowed(
303 &self,
304 floor_offset: i64,
305 window_bytes: usize,
306 pred: impl Fn(i64) -> bool,
307 ) -> Option<(i64, i64)> {
308 let mut cursor = floor_offset;
309 let mut window = window_bytes.max(1);
310 loop {
311 if cursor > self.last_offset {
312 return None;
313 }
314 let batches = self.read(cursor, window).ok()?;
315 if batches.is_empty() {
316 window = window.saturating_mul(2);
320 continue;
321 }
322 for batch in &batches {
323 for rec in &batch.records {
324 let ts = batch.base_timestamp + rec.timestamp_delta;
325 if pred(ts) {
326 return Some((batch.base_offset + i64::from(rec.offset_delta), ts));
327 }
328 }
329 }
330 let last = batches.last().expect("non-empty checked above");
334 let last_read = last.base_offset + i64::from(last.last_offset_delta);
335 cursor = last_read + 1;
336 }
337 }
338
/// Returns `true` if the segment has been sealed, in which case `append`
/// rejects further writes.
#[must_use]
pub fn is_sealed(&self) -> bool {
    self.sealed
}
345
346 pub fn read(&self, offset: i64, max_bytes: usize) -> Result<Vec<RecordBatch>, LogError> {
350 if offset > self.last_offset {
351 return Ok(vec![]);
352 }
353 let target_rel = u32::try_from((offset - self.base_offset).max(0))
354 .map_err(|_| LogError::BadSegmentName("target offset out of range".into()))?;
355 let start_pos = u64::from(self.offset_index.lookup(target_rel));
356
357 let initial_cap = max_bytes.min(4 * 1024 * 1024);
358 let mut buf: Vec<u8> = Vec::with_capacity(initial_cap);
359 self.read_log_range(start_pos, &mut buf, max_bytes)?;
360
361 let mut out: Vec<RecordBatch> = Vec::new();
362 let mut total: usize = 0;
363 let mut cursor: &[u8] = &buf;
364 while !cursor.is_empty() {
365 let before = cursor.len();
366 let Ok(batch) = RecordBatch::decode(&mut cursor) else {
367 break; };
369 let consumed = before - cursor.len();
370 let batch_last = batch.base_offset + i64::from(batch.last_offset_delta);
371 if batch_last >= offset {
372 out.push(batch);
373 total += consumed;
374 if total >= max_bytes {
375 break;
376 }
377 }
378 }
379 Ok(out)
380 }
381
382 pub fn read_raw(
388 &self,
389 fetch_offset: i64,
390 limit_offset: i64,
391 max_bytes: usize,
392 ) -> Result<RawSegmentRead, LogError> {
393 if fetch_offset > self.last_offset || fetch_offset >= limit_offset {
394 return Ok(RawSegmentRead::empty());
395 }
396 let target_rel = u32::try_from((fetch_offset - self.base_offset).max(0))
397 .map_err(|_| LogError::Corrupt("read_raw target offset out of range".into()))?;
398 let start_pos = u64::from(self.offset_index.lookup(target_rel));
399
400 let first_read = max_bytes.max(HEADER_LEN);
401 let mut buf: Vec<u8> = Vec::with_capacity(first_read.min(4 * 1024 * 1024));
402 self.read_log_range(start_pos, &mut buf, first_read)?;
403
404 let mut pos = 0usize;
405 let mut range_start: Option<usize> = None;
406 let mut range_end = 0usize;
407 let mut start_offset = fetch_offset;
408 let mut last_offset = fetch_offset - 1;
409
410 loop {
411 if pos + HEADER_LEN > buf.len() {
412 break;
413 }
414 let hdr = RecordBatchHeader::ref_from_bytes(&buf[pos..pos + HEADER_LEN])
415 .map_err(|_| LogError::Corrupt("record batch header".into()))?;
416 let base = hdr.base_offset.get();
417 let batch_len = usize::try_from(hdr.batch_length.get().max(0)).unwrap_or(0);
418 let total = 12 + batch_len;
419 let batch_last = base + i64::from(hdr.last_offset_delta.get());
420
421 if batch_last < fetch_offset {
422 pos += total;
423 continue;
424 }
425 if base >= limit_offset {
426 break;
427 }
428 if pos + total > buf.len() {
429 if range_start.is_none() {
430 let mut one: Vec<u8> = Vec::with_capacity(total);
431 self.read_log_range(start_pos + pos as u64, &mut one, total)?;
432 if one.len() < total {
433 break;
434 }
435 return Ok(RawSegmentRead {
436 start_offset: base,
437 last_offset: batch_last,
438 bytes: Bytes::from(one),
439 });
440 }
441 break;
442 }
443
444 if range_start.is_none() {
445 range_start = Some(pos);
446 start_offset = base;
447 }
448 range_end = pos + total;
449 last_offset = batch_last;
450 pos += total;
451
452 if range_end - range_start.expect("set above") >= max_bytes {
453 break;
454 }
455 }
456
457 match range_start {
458 Some(s) => Ok(RawSegmentRead {
459 start_offset,
460 last_offset,
461 bytes: Bytes::from(buf).slice(s..range_end),
462 }),
463 None => Ok(RawSegmentRead::empty()),
464 }
465 }
466
467 fn read_log_range(
468 &self,
469 start_pos: u64,
470 buf: &mut Vec<u8>,
471 max_bytes: usize,
472 ) -> Result<(), LogError> {
473 let available = self.log_size.saturating_sub(start_pos);
474 let to_read = available.min(u64::try_from(max_bytes).unwrap_or(u64::MAX));
475 let to_read = usize::try_from(to_read).unwrap_or(usize::MAX);
476 let base = buf.len();
477 buf.resize(base + to_read, 0);
478 let n = read_full_at(&self.log_file, start_pos, &mut buf[base..])?;
479 buf.truncate(base + n);
480 Ok(())
481 }
482
483 pub fn append(
491 &mut self,
492 batch: &RecordBatch,
493 index_interval_bytes: u32,
494 ) -> Result<u64, LogError> {
495 use std::io::Write;
496
497 if self.sealed {
498 return Err(LogError::Io(std::io::Error::other("segment is sealed")));
499 }
500
501 let mut buf = bytes::BytesMut::with_capacity(batch.encoded_len());
502 batch.encode(&mut buf)?;
503 let bytes = buf.freeze();
504
505 let position = self.log_size;
506 self.log_file.seek(SeekFrom::End(0))?;
507 self.log_file.write_all(&bytes)?;
508 self.log_size += bytes.len() as u64;
509
510 let last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
511 self.last_offset = last_offset;
512 if batch.max_timestamp > self.max_timestamp {
513 self.max_timestamp = batch.max_timestamp;
514 }
515
516 let should_index = match self.offset_index.last_entry() {
517 None => true,
518 Some((_, last_pos)) => {
519 position.saturating_sub(u64::from(last_pos)) >= u64::from(index_interval_bytes)
520 }
521 };
522 if should_index {
523 let rel = u32::try_from(batch.base_offset - self.base_offset)
524 .map_err(|_| LogError::BadSegmentName("offset overflow in segment".into()))?;
525 let pos_u32 = u32::try_from(position)
526 .map_err(|_| LogError::BadSegmentName("position overflow in segment".into()))?;
527 self.offset_index.append(rel, pos_u32)?;
528 self.time_index.append(self.max_timestamp, rel)?;
529 }
530
531 Ok(position)
532 }
533
/// Marks the segment as sealed; subsequent `append` calls return an error.
pub fn seal(&mut self) {
    self.sealed = true;
}
538
/// Returns the directory this segment was created or opened in.
#[must_use]
pub fn dir(&self) -> &Path {
    &self.dir
}
548
/// Syncs the log file's data with `sync_data`, then flushes the offset index
/// and the time index, in that order.
///
/// # Errors
/// Returns the first error encountered; later steps are not attempted.
pub fn flush(&mut self) -> Result<(), LogError> {
    self.log_file.sync_data()?;
    self.offset_index.flush()?;
    self.time_index.flush()?;
    Ok(())
}
556
557 pub fn truncate_to_relative(&mut self, rel: u32) -> Result<(), LogError> {
560 let read_limit = self
565 .offset_index
566 .position_at_or_after(rel)
567 .map_or(self.log_size, u64::from);
568 let mut buf = Vec::new();
569 let to_read = usize::try_from(read_limit).unwrap_or(usize::MAX);
570 self.read_log_range(0, &mut buf, to_read)?;
571
572 let target_abs = self.base_offset + i64::from(rel);
573 let mut cur: &[u8] = &buf;
574 let mut pos: u64 = 0;
575 let mut last_kept_offset = self.base_offset - 1;
576 let mut last_kept_ts = i64::MIN;
577 while !cur.is_empty() {
578 let before = cur.len();
579 let Ok(batch) = RecordBatch::decode(&mut cur) else {
580 break;
581 };
582 let batch_last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
583 if batch_last_offset >= target_abs {
584 break;
585 }
586 pos += (before - cur.len()) as u64;
587 last_kept_offset = batch_last_offset;
588 if batch.max_timestamp > last_kept_ts {
589 last_kept_ts = batch.max_timestamp;
590 }
591 }
592
593 self.log_file.set_len(pos)?;
594 self.log_size = pos;
595 self.last_offset = last_kept_offset;
596 self.max_timestamp = last_kept_ts;
597
598 let pos_u32 =
599 u32::try_from(pos).map_err(|_| LogError::BadSegmentName("position overflow".into()))?;
600 self.offset_index.truncate_by_position(pos_u32)?;
601 self.time_index.truncate_by_relative_offset(rel)?;
602 self.sealed = false;
603 Ok(())
604 }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Bytes;
    use crabka_protocol::records::{Record, RecordBatch};
    use tempfile::tempdir;

    /// Builds a batch of `n` records starting at `base_offset`. Record `i` has
    /// offset delta `i` and timestamp delta `i`, so its timestamp is
    /// `ts_base + i`; keys and values are `k{i}` / `v{i}`.
    fn sample_batch(base_offset: i64, n: i32, ts_base: i64) -> RecordBatch {
        let mut b = RecordBatch {
            base_offset,
            base_timestamp: ts_base,
            max_timestamp: ts_base + i64::from(n - 1),
            last_offset_delta: n - 1,
            ..RecordBatch::default()
        };
        for i in 0..n {
            b.records.push(Record {
                offset_delta: i,
                timestamp_delta: i64::from(i),
                key: Some(Bytes::from(format!("k{i}"))),
                value: Some(Bytes::from(format!("v{i}"))),
                ..Default::default()
            });
        }
        b
    }

    /// Timestamp lookup returns the first record at or after the target, and
    /// `None` when the target is later than every record.
    #[test]
    fn offset_for_timestamp_finds_first_ge() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        // Offsets 0..=2 carry timestamps 100..=102.
        seg.append(&sample_batch(0, 3, 100), 0).unwrap();
        // Offsets 3..=4 carry timestamps 200..=201.
        seg.append(&sample_batch(3, 2, 200), 0).unwrap();
        assert!(seg.offset_for_timestamp(100) == Some((0, 100)));
        assert!(seg.offset_for_timestamp(101) == Some((1, 101)));
        // 150 falls in the gap between batches: the next record is offset 3.
        assert!(seg.offset_for_timestamp(150) == Some((3, 200)));
        assert!(seg.offset_for_timestamp(201) == Some((4, 201)));
        assert!(seg.offset_for_timestamp(202) == None);
        drop(dir);
    }

    /// Starting from a one-byte window, the scan still reaches a match in the
    /// last of many batches, and reports `None` when nothing matches.
    #[test]
    fn scan_from_floor_finds_match_beyond_first_window() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        let n = 50i64;
        // One single-record batch per offset, timestamp 1_000 + offset.
        for off in 0..n {
            let mut b = RecordBatch {
                base_offset: off,
                base_timestamp: 1_000 + off,
                max_timestamp: 1_000 + off,
                last_offset_delta: 0,
                ..RecordBatch::default()
            };
            b.records.push(Record {
                offset_delta: 0,
                timestamp_delta: 0,
                value: Some(Bytes::from(format!("v{off}"))),
                ..Default::default()
            });
            seg.append(&b, 0).unwrap();
        }
        // Only the very last record satisfies the predicate.
        let target = 1_000 + (n - 1);
        let got = seg.scan_from_floor_windowed(0, 1, |ts| ts >= target);
        assert!(got == Some((n - 1, target)));
        // No record is this late, so the scan must run off the end.
        let none = seg.scan_from_floor_windowed(0, 1, |ts| ts > 10_000);
        assert!(none == None);
        drop(dir);
    }

    /// The max-timestamp lookup finds the record holding the segment maximum,
    /// and yields `None` for a segment with no data.
    #[test]
    fn offset_of_max_timestamp_earliest_on_tie() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 3, 100), 0).unwrap();
        // The maximum timestamp (201) belongs to offset 4.
        seg.append(&sample_batch(3, 2, 200), 0).unwrap();
        assert!(seg.offset_of_max_timestamp() == Some((4, 201)));

        // A freshly created segment has no timestamp to look up.
        let dir2 = tempdir().unwrap();
        let empty = Segment::create(dir2.path(), 0).unwrap();
        assert!(empty.offset_of_max_timestamp() == None);
        drop(dir);
        drop(dir2);
    }

    /// When several records share the maximum timestamp, the earliest offset
    /// is reported.
    #[test]
    fn offset_of_max_timestamp_tie_picks_earliest() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        // Three records, all with timestamp 500.
        let mut b = RecordBatch {
            base_offset: 0,
            base_timestamp: 500,
            max_timestamp: 500,
            last_offset_delta: 2,
            ..RecordBatch::default()
        };
        for i in 0..3 {
            b.records.push(Record {
                offset_delta: i,
                timestamp_delta: 0,
                value: Some(Bytes::from("v")),
                ..Default::default()
            });
        }
        seg.append(&b, 0).unwrap();
        assert!(seg.offset_of_max_timestamp() == Some((0, 500)));
        drop(dir);
    }

    /// Two appended batches are both returned by a read from offset 0, with
    /// their record counts intact.
    #[test]
    fn append_then_read_back() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        let b1 = sample_batch(0, 3, 1_000_000);
        let b2 = sample_batch(3, 2, 2_000_000);
        seg.append(&b1, 4096).unwrap();
        seg.append(&b2, 4096).unwrap();
        assert!(seg.last_offset() == 4);
        let read = seg.read(0, usize::MAX).unwrap();
        assert!(read.len() == 2);
        assert!(read[0].records.len() == 3);
        assert!(read[1].records.len() == 2);
    }

    /// Reading from an offset inside the second batch skips the first batch.
    #[test]
    fn read_at_higher_offset_skips_earlier_batches() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 3, 1_000_000), 4096).unwrap();
        seg.append(&sample_batch(3, 2, 2_000_000), 4096).unwrap();
        let read = seg.read(4, usize::MAX).unwrap();
        assert!(read.len() == 1);
        // The whole containing batch is returned, so it starts at offset 3.
        assert!(read[0].base_offset == 3);
    }

    /// Appending to a sealed segment fails with an I/O error.
    #[test]
    fn append_to_sealed_segment_errors() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.seal();
        assert!(seg.is_sealed());
        let err = seg.append(&sample_batch(0, 1, 0), 4096).unwrap_err();
        assert!(matches!(err, LogError::Io(_)));
    }

    /// Reading from an offset beyond the last one yields no batches.
    #[test]
    fn read_past_last_offset_returns_empty() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 2, 1_000), 4096).unwrap();
        let read = seg.read(100, usize::MAX).unwrap();
        assert!(read.is_empty());
    }

    /// Flushing a segment with data succeeds.
    #[test]
    fn flush_succeeds() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 1, 42), 4096).unwrap();
        seg.flush().unwrap();
    }

    /// Creates an empty segment at base offset 0 in a fresh temp directory.
    /// The directory handle is returned so it outlives the segment.
    fn test_segment() -> (tempfile::TempDir, Segment) {
        let dir = tempdir().unwrap();
        let seg = Segment::create(dir.path(), 0).unwrap();
        (dir, seg)
    }

    /// Builds a single-record batch at offset `off` with timestamp 1_000 and
    /// value `v{off}`.
    fn test_batch_at(off: i64) -> RecordBatch {
        let mut b = RecordBatch {
            base_offset: off,
            base_timestamp: 1_000,
            max_timestamp: 1_000,
            last_offset_delta: 0,
            ..RecordBatch::default()
        };
        b.records.push(Record {
            offset_delta: 0,
            timestamp_delta: 0,
            value: Some(Bytes::from(format!("v{off}"))),
            ..Default::default()
        });
        b
    }

    /// A raw read over three batches returns exactly the bytes that encoding
    /// those batches produces, and they decode back into three batches.
    #[test]
    fn read_raw_is_byte_exact_and_multi_batch() {
        let (dir, mut seg) = test_segment();
        // Encode each batch a second time to build the expected byte stream.
        let mut wire = bytes::BytesMut::new();
        for off in 0..3i64 {
            let b = test_batch_at(off);
            seg.append(&b, 0).unwrap();
            b.encode(&mut wire).unwrap();
        }
        let wire = wire.freeze();
        let r = seg.read_raw(0, 3, 10 * 1024 * 1024).unwrap();
        assert!(r.start_offset == 0);
        assert!(r.last_offset == 2);
        assert!(
            &r.bytes[..] == &wire[..],
            "raw bytes must equal the on-disk concatenation"
        );
        let mut cur: &[u8] = &r.bytes;
        let mut n = 0;
        while !cur.is_empty() {
            crabka_protocol::records::RecordBatch::decode(&mut cur).unwrap();
            n += 1;
        }
        assert!(n == 3);
        drop(dir);
    }

    /// A raw read stops before the batch whose base offset equals the limit.
    #[test]
    fn read_raw_clamps_at_limit_offset() {
        let (dir, mut seg) = test_segment();
        for off in 0..3i64 {
            seg.append(&test_batch_at(off), 0).unwrap();
        }
        let r = seg.read_raw(0, 2, 10 * 1024 * 1024).unwrap();
        assert!(r.last_offset == 1);
        drop(dir);
    }

    /// Even with a one-byte budget, a raw read returns one whole batch.
    #[test]
    fn read_raw_returns_at_least_one_batch_over_budget() {
        let (dir, mut seg) = test_segment();
        seg.append(&test_batch_at(0), 0).unwrap();
        let r = seg.read_raw(0, 1, 1).unwrap();
        assert!(r.start_offset == 0);
        assert!(r.last_offset == 0);
        assert!(!r.bytes.is_empty());
        drop(dir);
    }
}
859}