1use super::allocator::BlockStateTracker;
2use super::reader::ColReaderInfo;
3use super::{ReadConsistency, Walrus};
4use crate::wal::block::{Block, Entry, Metadata};
5use crate::wal::config::{MAX_BATCH_ENTRIES, PREFIX_META_SIZE, checksum64, debug_print};
6use std::io;
7use std::sync::{Arc, RwLock};
8
9use rkyv::{AlignedVec, Deserialize};
10
11#[cfg(target_os = "linux")]
12use crate::wal::config::USE_FD_BACKEND;
13#[cfg(target_os = "linux")]
14use std::sync::atomic::Ordering;
15
16#[cfg(target_os = "linux")]
17use io_uring;
18
19#[cfg(target_os = "linux")]
20use std::os::unix::io::AsRawFd;
21
22impl Walrus {
23 pub fn read_next(&self, col_name: &str, checkpoint: bool) -> io::Result<Option<Entry>> {
24 const TAIL_FLAG: u64 = 1u64 << 63;
25 let info_arc = if let Some(arc) = {
26 let map = self.reader.data.read().map_err(|_| {
27 io::Error::new(io::ErrorKind::Other, "reader map read lock poisoned")
28 })?;
29 map.get(col_name).cloned()
30 } {
31 arc
32 } else {
33 let mut map = self.reader.data.write().map_err(|_| {
34 io::Error::new(io::ErrorKind::Other, "reader map write lock poisoned")
35 })?;
36 map.entry(col_name.to_string())
37 .or_insert_with(|| {
38 Arc::new(RwLock::new(ColReaderInfo {
39 chain: Vec::new(),
40 cur_block_idx: 0,
41 cur_block_offset: 0,
42 reads_since_persist: 0,
43 tail_block_id: 0,
44 tail_offset: 0,
45 hydrated_from_index: false,
46 }))
47 })
48 .clone()
49 };
50 let mut info = info_arc
51 .write()
52 .map_err(|_| io::Error::new(io::ErrorKind::Other, "col info write lock poisoned"))?;
53 debug_print!(
54 "[reader] read_next start: col={}, chain_len={}, idx={}, offset={}",
55 col_name,
56 info.chain.len(),
57 info.cur_block_idx,
58 info.cur_block_offset
59 );
60
61 let mut persisted_tail: Option<(u64 , u64 )> = None;
63 if !info.hydrated_from_index {
64 if let Ok(idx_guard) = self.read_offset_index.read() {
65 if let Some(pos) = idx_guard.get(col_name) {
66 if (pos.cur_block_idx & TAIL_FLAG) != 0 {
67 let tail_block_id = pos.cur_block_idx & (!TAIL_FLAG);
68 persisted_tail = Some((tail_block_id, pos.cur_block_offset));
69 info.cur_block_idx = info.chain.len();
71 info.cur_block_offset = 0;
72 } else {
73 let mut ib = pos.cur_block_idx as usize;
74 if ib > info.chain.len() {
75 ib = info.chain.len();
76 }
77 info.cur_block_idx = ib;
78 if ib < info.chain.len() {
79 let used = info.chain[ib].used;
80 info.cur_block_offset = pos.cur_block_offset.min(used);
81 } else {
82 info.cur_block_offset = 0;
83 }
84 }
85 info.hydrated_from_index = true;
86 } else {
87 info.hydrated_from_index = true;
89 }
90 }
91 }
92
93 if let Some((tail_block_id, tail_off)) = persisted_tail {
95 if !info.chain.is_empty() {
96 if let Some((idx, block)) = info
97 .chain
98 .iter()
99 .enumerate()
100 .find(|(_, b)| b.id == tail_block_id)
101 {
102 let used = block.used;
103 info.cur_block_idx = idx;
104 info.cur_block_offset = tail_off.min(used);
105 } else {
106 info.cur_block_idx = 0;
107 info.cur_block_offset = 0;
108 }
109 }
110 persisted_tail = None;
111 }
112
113 drop(info);
115
116 loop {
117 let mut info = info_arc.write().map_err(|_| {
119 io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
120 })?;
121 if info.cur_block_idx < info.chain.len() {
123 let idx = info.cur_block_idx;
124 let off = info.cur_block_offset;
125 let block = info.chain[idx].clone();
126
127 if off >= block.used {
128 debug_print!(
129 "[reader] read_next: advance block col={}, block_id={}, offset={}, used={}",
130 col_name,
131 block.id,
132 off,
133 block.used
134 );
135 BlockStateTracker::set_checkpointed_true(block.id as usize);
136 info.cur_block_idx += 1;
137 info.cur_block_offset = 0;
138 continue;
139 }
140
141 match block.read(off) {
142 Ok((entry, consumed)) => {
143 let new_off = off + consumed as u64;
145 let mut maybe_persist = None;
146 if checkpoint {
147 info.cur_block_offset = new_off;
148 maybe_persist = if self.should_persist(&mut info, false) {
149 Some((info.cur_block_idx as u64, new_off))
150 } else {
151 None
152 };
153 }
154
155 drop(info);
157 if checkpoint {
158 if let Some((idx_val, off_val)) = maybe_persist {
159 if let Ok(mut idx_guard) = self.read_offset_index.write() {
160 let _ = idx_guard.set(col_name.to_string(), idx_val, off_val);
161 }
162 }
163 }
164
165 debug_print!(
166 "[reader] read_next: OK col={}, block_id={}, consumed={}, new_offset={}",
167 col_name,
168 block.id,
169 consumed,
170 new_off
171 );
172 return Ok(Some(entry));
173 }
174 Err(_) => {
175 debug_print!(
176 "[reader] read_next: read error col={}, block_id={}, offset={}",
177 col_name,
178 block.id,
179 off
180 );
181 return Ok(None);
182 }
183 }
184 }
185
186 let tail_snapshot = (info.tail_block_id, info.tail_offset);
188 drop(info);
189
190 let writer_arc = {
191 let map = self.writers.read().map_err(|_| {
192 io::Error::new(io::ErrorKind::Other, "writers read lock poisoned")
193 })?;
194 match map.get(col_name) {
195 Some(w) => w.clone(),
196 None => return Ok(None),
197 }
198 };
199 let (active_block, written) = writer_arc.snapshot_block()?;
200
201 let mut info = info_arc.write().map_err(|_| {
204 io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
205 })?;
206 if let Some((tail_block_id, tail_off)) = persisted_tail {
207 if tail_block_id != active_block.id {
208 if let Some((idx, _)) = info
209 .chain
210 .iter()
211 .enumerate()
212 .find(|(_, b)| b.id == tail_block_id)
213 {
214 info.cur_block_idx = idx;
215 info.cur_block_offset = tail_off.min(info.chain[idx].used);
216 if checkpoint {
217 if self.should_persist(&mut info, true) {
218 if let Ok(mut idx_guard) = self.read_offset_index.write() {
219 let _ = idx_guard.set(
220 col_name.to_string(),
221 info.cur_block_idx as u64,
222 info.cur_block_offset,
223 );
224 }
225 }
226 }
227 persisted_tail = None; drop(info);
229 continue;
230 } else {
231 persisted_tail = Some((active_block.id, 0));
233 if checkpoint {
234 if self.should_persist(&mut info, true) {
235 if let Ok(mut idx_guard) = self.read_offset_index.write() {
236 let _ = idx_guard.set(
237 col_name.to_string(),
238 active_block.id | TAIL_FLAG,
239 0,
240 );
241 }
242 }
243 }
244 }
245 }
246 } else {
247 persisted_tail = Some((active_block.id, 0));
249 if checkpoint {
250 if self.should_persist(&mut info, true) {
251 if let Ok(mut idx_guard) = self.read_offset_index.write() {
252 let _ =
253 idx_guard.set(col_name.to_string(), active_block.id | TAIL_FLAG, 0);
254 }
255 }
256 }
257 }
258 drop(info);
259
260 let (tail_block_id, mut tail_off) = match persisted_tail {
262 Some(v) => v,
263 None => return Ok(None),
264 };
265 if tail_block_id == active_block.id {
266 let (snap_id, snap_off) = tail_snapshot;
267 if snap_id == active_block.id {
268 tail_off = tail_off.max(snap_off);
269 }
270 } else {
271 }
273 if tail_block_id != active_block.id {
275 continue;
277 }
278
279 if tail_off < written {
280 match active_block.read(tail_off) {
281 Ok((entry, consumed)) => {
282 let new_off = tail_off + consumed as u64;
283 let mut info = info_arc.write().map_err(|_| {
285 io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
286 })?;
287 let mut maybe_persist = None;
288 if checkpoint {
289 info.tail_block_id = active_block.id;
290 info.tail_offset = new_off;
291 maybe_persist = if self.should_persist(&mut info, false) {
292 Some((tail_block_id | TAIL_FLAG, new_off))
293 } else {
294 None
295 };
296 }
297 drop(info);
298 if checkpoint {
299 if let Some((idx_val, off_val)) = maybe_persist {
300 if let Ok(mut idx_guard) = self.read_offset_index.write() {
301 let _ = idx_guard.set(col_name.to_string(), idx_val, off_val);
302 }
303 }
304 }
305
306 debug_print!(
307 "[reader] read_next: tail OK col={}, block_id={}, consumed={}, new_tail_off={}",
308 col_name,
309 active_block.id,
310 consumed,
311 new_off
312 );
313 return Ok(Some(entry));
314 }
315 Err(_) => {
316 debug_print!(
317 "[reader] read_next: tail read error col={}, block_id={}, offset={}",
318 col_name,
319 active_block.id,
320 tail_off
321 );
322 return Ok(None);
323 }
324 }
325 } else {
326 debug_print!(
327 "[reader] read_next: tail caught up col={}, block_id={}, off={}, written={}",
328 col_name,
329 active_block.id,
330 tail_off,
331 written
332 );
333 return Ok(None);
334 }
335 }
336 }
337
338 fn should_persist(&self, info: &mut ColReaderInfo, force: bool) -> bool {
339 match self.read_consistency {
340 ReadConsistency::StrictlyAtOnce => true,
341 ReadConsistency::AtLeastOnce { persist_every } => {
342 let every = persist_every.max(1);
343 if force {
344 info.reads_since_persist = 0;
345 return true;
346 }
347 let next = info.reads_since_persist.saturating_add(1);
348 if next >= every {
349 info.reads_since_persist = 0;
350 true
351 } else {
352 info.reads_since_persist = next;
353 false
354 }
355 }
356 }
357 }
358
359 pub fn batch_read_for_topic(
360 &self,
361 col_name: &str,
362 max_bytes: usize,
363 checkpoint: bool,
364 ) -> io::Result<Vec<Entry>> {
365 struct ReadPlan {
367 blk: Block,
368 start: u64,
369 end: u64,
370 is_tail: bool,
371 chain_idx: Option<usize>,
372 }
373
374 const TAIL_FLAG: u64 = 1u64 << 63;
375
376 let writer_snapshot: Option<(Block, u64)> = {
378 let map = self
379 .writers
380 .read()
381 .map_err(|_| io::Error::new(io::ErrorKind::Other, "writers read lock poisoned"))?;
382 match map.get(col_name).cloned() {
383 Some(w) => match w.snapshot_block() {
384 Ok(snapshot) => Some(snapshot),
385 Err(_) => None,
386 },
387 None => None,
388 }
389 };
390
391 let info_arc = if let Some(arc) = {
393 let map = self.reader.data.read().map_err(|_| {
394 io::Error::new(io::ErrorKind::Other, "reader map read lock poisoned")
395 })?;
396 map.get(col_name).cloned()
397 } {
398 arc
399 } else {
400 let mut map = self.reader.data.write().map_err(|_| {
401 io::Error::new(io::ErrorKind::Other, "reader map write lock poisoned")
402 })?;
403 map.entry(col_name.to_string())
404 .or_insert_with(|| {
405 Arc::new(RwLock::new(ColReaderInfo {
406 chain: Vec::new(),
407 cur_block_idx: 0,
408 cur_block_offset: 0,
409 reads_since_persist: 0,
410 tail_block_id: 0,
411 tail_offset: 0,
412 hydrated_from_index: false,
413 }))
414 })
415 .clone()
416 };
417
418 let mut info = info_arc
419 .write()
420 .map_err(|_| io::Error::new(io::ErrorKind::Other, "col info write lock poisoned"))?;
421
422 let mut persisted_tail_for_fold: Option<(u64 , u64 )> = None;
424 if !info.hydrated_from_index {
425 if let Ok(idx_guard) = self.read_offset_index.read() {
426 if let Some(pos) = idx_guard.get(col_name) {
427 if (pos.cur_block_idx & TAIL_FLAG) != 0 {
428 let tail_block_id = pos.cur_block_idx & (!TAIL_FLAG);
429 info.tail_block_id = tail_block_id;
430 info.tail_offset = pos.cur_block_offset;
431 info.cur_block_idx = info.chain.len();
432 info.cur_block_offset = 0;
433 persisted_tail_for_fold = Some((tail_block_id, pos.cur_block_offset));
434 } else {
435 let mut ib = pos.cur_block_idx as usize;
436 if ib > info.chain.len() {
437 ib = info.chain.len();
438 }
439 info.cur_block_idx = ib;
440 if ib < info.chain.len() {
441 let used = info.chain[ib].used;
442 info.cur_block_offset = pos.cur_block_offset.min(used);
443 } else {
444 info.cur_block_offset = 0;
445 }
446 }
447
448 info.hydrated_from_index = true;
449 } else {
450 info.hydrated_from_index = true;
451 }
452 }
453 }
454
455 if let Some((tail_block_id, tail_off)) = persisted_tail_for_fold {
457 if let Some(idx) = info
458 .chain
459 .iter()
460 .enumerate()
461 .find(|(_, b)| b.id == tail_block_id)
462 .map(|(idx, _)| idx)
463 {
464 let used = info.chain[idx].used;
465 info.cur_block_idx = idx;
466 info.cur_block_offset = tail_off.min(used);
467 }
468 }
469
470 let mut plan: Vec<ReadPlan> = Vec::new();
472 let mut planned_bytes: usize = 0;
473 let chain_len_at_plan = info.chain.len();
474 let mut cur_idx = info.cur_block_idx;
475 let mut cur_off = info.cur_block_offset;
476
477 while cur_idx < info.chain.len() && planned_bytes < max_bytes {
478 let block = info.chain[cur_idx].clone();
479 if cur_off >= block.used {
480 BlockStateTracker::set_checkpointed_true(block.id as usize);
481 cur_idx += 1;
482 cur_off = 0;
483 continue;
484 }
485
486 let end = block.used.min(cur_off + (max_bytes - planned_bytes) as u64);
487 if end > cur_off {
488 plan.push(ReadPlan {
489 blk: block.clone(),
490 start: cur_off,
491 end,
492 is_tail: false,
493 chain_idx: Some(cur_idx),
494 });
495 planned_bytes += (end - cur_off) as usize;
496 }
497 cur_idx += 1;
498 cur_off = 0;
499 }
500
501 if cur_idx >= chain_len_at_plan {
503 if let Some((active_block, written)) = writer_snapshot.clone() {
504 let tail_start = if info.tail_block_id == active_block.id {
506 info.tail_offset
507 } else {
508 0
509 };
510 if tail_start < written {
511 let end = written; plan.push(ReadPlan {
513 blk: active_block.clone(),
514 start: tail_start,
515 end,
516 is_tail: true,
517 chain_idx: None,
518 });
519 }
520 }
521 }
522
523 if plan.is_empty() {
524 return Ok(Vec::new());
525 }
526
527 let hold_lock_during_io = matches!(self.read_consistency, ReadConsistency::StrictlyAtOnce);
529 let mut info_opt = Some(info);
531 if !hold_lock_during_io {
532 drop(info_opt.take().unwrap());
534 }
535
536 #[cfg(target_os = "linux")]
538 let buffers = if USE_FD_BACKEND.load(Ordering::Relaxed) {
539 let ring_size = (plan.len() + 64).min(4096) as u32;
541 let mut ring = io_uring::IoUring::new(ring_size).map_err(|e| {
542 io::Error::new(io::ErrorKind::Other, format!("io_uring init failed: {}", e))
543 })?;
544
545 let mut temp_buffers: Vec<Vec<u8>> = vec![Vec::new(); plan.len()];
546 let mut expected_sizes: Vec<usize> = vec![0; plan.len()];
547
548 for (plan_idx, read_plan) in plan.iter().enumerate() {
549 let size = (read_plan.end - read_plan.start) as usize;
550 expected_sizes[plan_idx] = size;
551 let mut buffer = vec![0u8; size];
552 let file_offset = read_plan.blk.offset + read_plan.start;
553
554 let fd = if let Some(fd_backend) = read_plan.blk.mmap.storage().as_fd() {
555 io_uring::types::Fd(fd_backend.file().as_raw_fd())
556 } else {
557 return Err(io::Error::new(
558 io::ErrorKind::Unsupported,
559 "batch reads require FD backend when io_uring is enabled",
560 ));
561 };
562
563 let read_op = io_uring::opcode::Read::new(fd, buffer.as_mut_ptr(), size as u32)
564 .offset(file_offset)
565 .build()
566 .user_data(plan_idx as u64);
567
568 temp_buffers[plan_idx] = buffer;
569
570 unsafe {
571 ring.submission().push(&read_op).map_err(|e| {
572 io::Error::new(io::ErrorKind::Other, format!("io_uring push failed: {}", e))
573 })?;
574 }
575 }
576
577 ring.submit_and_wait(plan.len())?;
579
580 for _ in 0..plan.len() {
582 if let Some(cqe) = ring.completion().next() {
583 let plan_idx = cqe.user_data() as usize;
584 let got = cqe.result();
585 if got < 0 {
586 return Err(io::Error::new(
587 io::ErrorKind::Other,
588 format!("io_uring read failed: {}", got),
589 ));
590 }
591 if (got as usize) != expected_sizes[plan_idx] {
592 return Err(io::Error::new(
593 io::ErrorKind::UnexpectedEof,
594 format!(
595 "short read: got {} bytes, expected {}",
596 got, expected_sizes[plan_idx]
597 ),
598 ));
599 }
600 }
601 }
602
603 temp_buffers
604 } else {
605 plan.iter()
606 .map(|read_plan| {
607 let size = (read_plan.end - read_plan.start) as usize;
608 let mut buffer = vec![0u8; size];
609 let file_offset = (read_plan.blk.offset + read_plan.start) as usize;
610 read_plan.blk.mmap.read(file_offset, &mut buffer);
611 buffer
612 })
613 .collect()
614 };
615
616 #[cfg(not(target_os = "linux"))]
617 let buffers: Vec<Vec<u8>> = plan
618 .iter()
619 .map(|read_plan| {
620 let size = (read_plan.end - read_plan.start) as usize;
621 let mut buffer = vec![0u8; size];
622 let file_offset = (read_plan.blk.offset + read_plan.start) as usize;
623 read_plan.blk.mmap.read(file_offset, &mut buffer);
624 buffer
625 })
626 .collect();
627
628 let mut entries = Vec::new();
630 let mut total_data_bytes = 0usize;
631 let mut final_block_idx = 0usize;
632 let mut final_block_offset = 0u64;
633 let mut final_tail_block_id = 0u64;
634 let mut final_tail_offset = 0u64;
635 let mut entries_parsed = 0u32;
636 let mut saw_tail = false;
637
638 for (plan_idx, read_plan) in plan.iter().enumerate() {
639 if entries.len() >= MAX_BATCH_ENTRIES {
640 break;
641 }
642 let buffer = &buffers[plan_idx];
643 let mut buf_offset = 0usize;
644
645 while buf_offset < buffer.len() {
646 if entries.len() >= MAX_BATCH_ENTRIES {
647 break;
648 }
649 if buf_offset + PREFIX_META_SIZE > buffer.len() {
651 break; }
653
654 let meta_len =
655 (buffer[buf_offset] as usize) | ((buffer[buf_offset + 1] as usize) << 8);
656
657 if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
658 break;
660 }
661
662 let mut aligned = AlignedVec::with_capacity(meta_len);
664 aligned.extend_from_slice(&buffer[buf_offset + 2..buf_offset + 2 + meta_len]);
665
666 let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
667 let meta: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
668 Ok(m) => m,
669 Err(_) => break, };
671
672 let data_size = meta.read_size;
673 let entry_consumed = PREFIX_META_SIZE + data_size;
674
675 if buf_offset + entry_consumed > buffer.len() {
677 break; }
679
680 let next_total = total_data_bytes
682 .checked_add(data_size)
683 .unwrap_or(usize::MAX);
684 if next_total > max_bytes && !entries.is_empty() {
685 break;
686 }
687
688 let data_start = buf_offset + PREFIX_META_SIZE;
690 let data_end = data_start + data_size;
691 let data_slice = &buffer[data_start..data_end];
692
693 if checksum64(data_slice) != meta.checksum {
695 return Err(io::Error::new(
696 io::ErrorKind::InvalidData,
697 "checksum mismatch in batch read",
698 ));
699 }
700
701 entries.push(Entry {
703 data: data_slice.to_vec(),
704 });
705 total_data_bytes = next_total;
706 entries_parsed += 1;
707
708 let in_block_offset = read_plan.start + buf_offset as u64 + entry_consumed as u64;
710
711 if read_plan.is_tail {
712 saw_tail = true;
713 final_tail_block_id = read_plan.blk.id;
714 final_tail_offset = in_block_offset;
715 } else if let Some(idx) = read_plan.chain_idx {
716 final_block_idx = idx;
717 final_block_offset = in_block_offset;
718 }
719
720 buf_offset += entry_consumed;
721 }
722 }
723
724 if entries_parsed > 0 {
726 enum PersistTarget {
727 Tail { blk_id: u64, off: u64 },
728 Sealed { idx: u64, off: u64 },
729 None,
730 }
731 let mut target = PersistTarget::None;
732
733 if hold_lock_during_io {
734 let mut info = info_opt.take().expect("column lock should be held");
736 if checkpoint {
737 if saw_tail {
738 info.cur_block_idx = chain_len_at_plan;
739 info.cur_block_offset = 0;
740 info.tail_block_id = final_tail_block_id;
741 info.tail_offset = final_tail_offset;
742 target = PersistTarget::Tail {
743 blk_id: final_tail_block_id,
744 off: final_tail_offset,
745 };
746 } else {
747 info.cur_block_idx = final_block_idx;
748 info.cur_block_offset = final_block_offset;
749 target = PersistTarget::Sealed {
750 idx: final_block_idx as u64,
751 off: final_block_offset,
752 };
753 }
754 }
755 drop(info);
756 } else {
757 let mut info2 = info_arc.write().map_err(|_| {
759 io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
760 })?;
761 if checkpoint {
762 if saw_tail {
763 info2.cur_block_idx = chain_len_at_plan;
764 info2.cur_block_offset = 0;
765 info2.tail_block_id = final_tail_block_id;
766 info2.tail_offset = final_tail_offset;
767 if let ReadConsistency::AtLeastOnce { persist_every } =
768 self.read_consistency
769 {
770 let room = persist_every
772 .saturating_sub(1)
773 .saturating_sub(info2.reads_since_persist);
774 let add = entries_parsed.min(room);
775 info2.reads_since_persist =
776 info2.reads_since_persist.saturating_add(add);
777 }
779 } else {
780 info2.cur_block_idx = final_block_idx;
781 info2.cur_block_offset = final_block_offset;
782 if let ReadConsistency::AtLeastOnce { persist_every } =
783 self.read_consistency
784 {
785 let room = persist_every
786 .saturating_sub(1)
787 .saturating_sub(info2.reads_since_persist);
788 let add = entries_parsed.min(room);
789 info2.reads_since_persist =
790 info2.reads_since_persist.saturating_add(add);
791 }
792 }
793 }
794 drop(info2);
795 }
796
797 if checkpoint {
798 match target {
799 PersistTarget::Tail { blk_id, off } => {
800 if let Ok(mut idx_guard) = self.read_offset_index.write() {
801 let _ = idx_guard.set(col_name.to_string(), blk_id | TAIL_FLAG, off);
802 }
803 }
804 PersistTarget::Sealed { idx, off } => {
805 if let Ok(mut idx_guard) = self.read_offset_index.write() {
806 let _ = idx_guard.set(col_name.to_string(), idx, off);
807 }
808 }
809 PersistTarget::None => {}
810 }
811 }
812 }
813
814 Ok(entries)
815 }
816}