walrus_rust/wal/runtime/walrus_read.rs

use super::allocator::BlockStateTracker;
use super::reader::ColReaderInfo;
use super::{ReadConsistency, Walrus};
use crate::wal::block::{Block, Entry, Metadata};
use crate::wal::config::{MAX_BATCH_ENTRIES, PREFIX_META_SIZE, checksum64, debug_print};
use std::io;
use std::sync::{Arc, RwLock};

use rkyv::{AlignedVec, Deserialize};

#[cfg(target_os = "linux")]
use crate::wal::config::USE_FD_BACKEND;
#[cfg(target_os = "linux")]
use std::sync::atomic::Ordering;

#[cfg(target_os = "linux")]
use io_uring;

#[cfg(target_os = "linux")]
use std::os::unix::io::AsRawFd;

impl Walrus {
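    /// Reads the next entry for `col_name`, walking the column's sealed block
    /// chain first and then the writer's active (tail) block.
    ///
    /// When `checkpoint` is true, read progress is tracked in memory and,
    /// depending on the configured `ReadConsistency`, persisted to the
    /// read-offset index. Persisted tail positions set the high bit
    /// (`TAIL_FLAG`) on the block field so they can be told apart from
    /// sealed-chain indices when hydrating.
    ///
    /// Returns `Ok(None)` when the column has no writer, the tail is caught
    /// up with the writer, or an entry cannot be decoded at the current
    /// offset.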
    pub fn read_next(&self, col_name: &str, checkpoint: bool) -> io::Result<Option<Entry>> {
        const TAIL_FLAG: u64 = 1u64 << 63;
        let info_arc = if let Some(arc) = {
            let map = self.reader.data.read().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "reader map read lock poisoned")
            })?;
            map.get(col_name).cloned()
        } {
            arc
        } else {
            let mut map = self.reader.data.write().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "reader map write lock poisoned")
            })?;
            map.entry(col_name.to_string())
                .or_insert_with(|| {
                    Arc::new(RwLock::new(ColReaderInfo {
                        chain: Vec::new(),
                        cur_block_idx: 0,
                        cur_block_offset: 0,
                        reads_since_persist: 0,
                        tail_block_id: 0,
                        tail_offset: 0,
                        hydrated_from_index: false,
                    }))
                })
                .clone()
        };
        let mut info = info_arc
            .write()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "col info write lock poisoned"))?;
        debug_print!(
            "[reader] read_next start: col={}, chain_len={}, idx={}, offset={}",
            col_name,
            info.chain.len(),
            info.cur_block_idx,
            info.cur_block_offset
        );

        // Load persisted position (supports tail sentinel)
        let mut persisted_tail: Option<(u64 /*block_id*/, u64 /*offset*/)> = None;
        if !info.hydrated_from_index {
            if let Ok(idx_guard) = self.read_offset_index.read() {
                if let Some(pos) = idx_guard.get(col_name) {
                    if (pos.cur_block_idx & TAIL_FLAG) != 0 {
                        let tail_block_id = pos.cur_block_idx & (!TAIL_FLAG);
                        persisted_tail = Some((tail_block_id, pos.cur_block_offset));
                        // sealed state is considered caught up
                        info.cur_block_idx = info.chain.len();
                        info.cur_block_offset = 0;
                    } else {
                        let mut ib = pos.cur_block_idx as usize;
                        if ib > info.chain.len() {
                            ib = info.chain.len();
                        }
                        info.cur_block_idx = ib;
                        if ib < info.chain.len() {
                            let used = info.chain[ib].used;
                            info.cur_block_offset = pos.cur_block_offset.min(used);
                        } else {
                            info.cur_block_offset = 0;
                        }
                    }
                    info.hydrated_from_index = true;
                } else {
                    // No persisted state present; mark hydrated to avoid re-checking every call
                    info.hydrated_from_index = true;
                }
            }
        }

        // If we have a persisted tail and some sealed blocks were recovered, fold into the last block
        if let Some((tail_block_id, tail_off)) = persisted_tail {
            if !info.chain.is_empty() {
                if let Some((idx, block)) = info
                    .chain
                    .iter()
                    .enumerate()
                    .find(|(_, b)| b.id == tail_block_id)
                {
                    let used = block.used;
                    info.cur_block_idx = idx;
                    info.cur_block_offset = tail_off.min(used);
                } else {
                    info.cur_block_idx = 0;
                    info.cur_block_offset = 0;
                }
            }
            persisted_tail = None;
        }

        // Important: release the per-column lock; we'll reacquire each iteration
        drop(info);

        loop {
            // Reacquire column lock at the start of each iteration
            let mut info = info_arc.write().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
            })?;
            // Sealed chain path
            if info.cur_block_idx < info.chain.len() {
                let idx = info.cur_block_idx;
                let off = info.cur_block_offset;
                let block = info.chain[idx].clone();

                if off >= block.used {
                    debug_print!(
                        "[reader] read_next: advance block col={}, block_id={}, offset={}, used={}",
                        col_name,
                        block.id,
                        off,
                        block.used
                    );
                    BlockStateTracker::set_checkpointed_true(block.id as usize);
                    info.cur_block_idx += 1;
                    info.cur_block_offset = 0;
                    continue;
                }

                match block.read(off) {
                    Ok((entry, consumed)) => {
                        // Compute new offset and decide whether to commit progress
                        let new_off = off + consumed as u64;
                        let mut maybe_persist = None;
                        if checkpoint {
                            info.cur_block_offset = new_off;
                            maybe_persist = if self.should_persist(&mut info, false) {
                                Some((info.cur_block_idx as u64, new_off))
                            } else {
                                None
                            };
                        }

                        // Drop the column lock before touching the index to avoid lock inversion
                        drop(info);
                        if checkpoint {
                            if let Some((idx_val, off_val)) = maybe_persist {
                                if let Ok(mut idx_guard) = self.read_offset_index.write() {
                                    let _ = idx_guard.set(col_name.to_string(), idx_val, off_val);
                                }
                            }
                        }

                        debug_print!(
                            "[reader] read_next: OK col={}, block_id={}, consumed={}, new_offset={}",
                            col_name,
                            block.id,
                            consumed,
                            new_off
                        );
                        return Ok(Some(entry));
                    }
                    Err(_) => {
                        debug_print!(
                            "[reader] read_next: read error col={}, block_id={}, offset={}",
                            col_name,
                            block.id,
                            off
                        );
                        return Ok(None);
                    }
                }
            }

            // Tail path
            let tail_snapshot = (info.tail_block_id, info.tail_offset);
            drop(info);

            let writer_arc = {
                let map = self.writers.read().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "writers read lock poisoned")
                })?;
                match map.get(col_name) {
                    Some(w) => w.clone(),
                    None => return Ok(None),
                }
            };
            let (active_block, written) = writer_arc.snapshot_block()?;

            // If persisted tail points to a different block and that block is now sealed in chain, fold it
            // Reacquire column lock for folding/rebasing decisions
            let mut info = info_arc.write().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
            })?;
            if let Some((tail_block_id, tail_off)) = persisted_tail {
                if tail_block_id != active_block.id {
                    if let Some((idx, _)) = info
                        .chain
                        .iter()
                        .enumerate()
                        .find(|(_, b)| b.id == tail_block_id)
                    {
                        info.cur_block_idx = idx;
                        info.cur_block_offset = tail_off.min(info.chain[idx].used);
                        if checkpoint {
                            if self.should_persist(&mut info, true) {
                                if let Ok(mut idx_guard) = self.read_offset_index.write() {
                                    let _ = idx_guard.set(
                                        col_name.to_string(),
                                        info.cur_block_idx as u64,
                                        info.cur_block_offset,
                                    );
                                }
                            }
                        }
                        persisted_tail = None; // sealed now
                        drop(info);
                        continue;
                    } else {
                        // rebase tail to current active block at 0
                        persisted_tail = Some((active_block.id, 0));
                        if checkpoint {
                            if self.should_persist(&mut info, true) {
                                if let Ok(mut idx_guard) = self.read_offset_index.write() {
                                    let _ = idx_guard.set(
                                        col_name.to_string(),
                                        active_block.id | TAIL_FLAG,
                                        0,
                                    );
                                }
                            }
                        }
                    }
                }
            } else {
                // No persisted tail; init at current active block start
                persisted_tail = Some((active_block.id, 0));
                if checkpoint {
                    if self.should_persist(&mut info, true) {
                        if let Ok(mut idx_guard) = self.read_offset_index.write() {
                            let _ =
                                idx_guard.set(col_name.to_string(), active_block.id | TAIL_FLAG, 0);
                        }
                    }
                }
            }
            drop(info);

            // Choose the best known tail offset: prefer in-memory snapshot for current active block
            let (tail_block_id, mut tail_off) = match persisted_tail {
                Some(v) => v,
                None => return Ok(None),
            };
            if tail_block_id == active_block.id {
                let (snap_id, snap_off) = tail_snapshot;
                if snap_id == active_block.id {
                    tail_off = tail_off.max(snap_off);
                }
            } else {
                // If writer rotated and persisted tail points elsewhere, loop above will fold/rebase
            }
            // If writer rotated after we set persisted_tail, loop to fold/rebase
            if tail_block_id != active_block.id {
                // Loop to next iteration; `info` will be reacquired at loop top
                continue;
            }

            if tail_off < written {
                match active_block.read(tail_off) {
                    Ok((entry, consumed)) => {
                        let new_off = tail_off + consumed as u64;
                        // Reacquire column lock to update in-memory progress, then decide persistence
                        let mut info = info_arc.write().map_err(|_| {
                            io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
                        })?;
                        let mut maybe_persist = None;
                        if checkpoint {
                            info.tail_block_id = active_block.id;
                            info.tail_offset = new_off;
                            maybe_persist = if self.should_persist(&mut info, false) {
                                Some((tail_block_id | TAIL_FLAG, new_off))
                            } else {
                                None
                            };
                        }
                        drop(info);
                        if checkpoint {
                            if let Some((idx_val, off_val)) = maybe_persist {
                                if let Ok(mut idx_guard) = self.read_offset_index.write() {
                                    let _ = idx_guard.set(col_name.to_string(), idx_val, off_val);
                                }
                            }
                        }

                        debug_print!(
                            "[reader] read_next: tail OK col={}, block_id={}, consumed={}, new_tail_off={}",
                            col_name,
                            active_block.id,
                            consumed,
                            new_off
                        );
                        return Ok(Some(entry));
                    }
                    Err(_) => {
                        debug_print!(
                            "[reader] read_next: tail read error col={}, block_id={}, offset={}",
                            col_name,
                            active_block.id,
                            tail_off
                        );
                        return Ok(None);
                    }
                }
            } else {
                debug_print!(
                    "[reader] read_next: tail caught up col={}, block_id={}, off={}, written={}",
                    col_name,
                    active_block.id,
                    tail_off,
                    written
                );
                return Ok(None);
            }
        }
    }

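    /// Decides whether the current read position should be persisted to the
    /// read-offset index, based on the configured `ReadConsistency`:
    /// `StrictlyAtOnce` persists on every read, while `AtLeastOnce` persists
    /// once every `persist_every` reads (tracked in `reads_since_persist`),
    /// or immediately when `force` is set.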
    fn should_persist(&self, info: &mut ColReaderInfo, force: bool) -> bool {
        match self.read_consistency {
            ReadConsistency::StrictlyAtOnce => true,
            ReadConsistency::AtLeastOnce { persist_every } => {
                let every = persist_every.max(1);
                if force {
                    info.reads_since_persist = 0;
                    return true;
                }
                let next = info.reads_since_persist.saturating_add(1);
                if next >= every {
                    info.reads_since_persist = 0;
                    true
                } else {
                    info.reads_since_persist = next;
                    false
                }
            }
        }
    }

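    /// Reads a batch of entries for `col_name`, bounded by `max_bytes` of
    /// payload and by `MAX_BATCH_ENTRIES`.
    ///
    /// The read is planned over sealed blocks first and then the active tail
    /// block, executed via io_uring when the FD backend is enabled on Linux
    /// (falling back to mmap reads otherwise), and parsed from the raw
    /// buffers. When `checkpoint` is true, progress is committed according
    /// to the configured `ReadConsistency`.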
    pub fn batch_read_for_topic(
        &self,
        col_name: &str,
        max_bytes: usize,
        checkpoint: bool,
    ) -> io::Result<Vec<Entry>> {
        // Helper struct for read planning
        struct ReadPlan {
            blk: Block,
            start: u64,
            end: u64,
            is_tail: bool,
            chain_idx: Option<usize>,
        }

        const TAIL_FLAG: u64 = 1u64 << 63;

        // Pre-snapshot active writer state to avoid lock-order inversion later
        let writer_snapshot: Option<(Block, u64)> = {
            let map = self
                .writers
                .read()
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "writers read lock poisoned"))?;
            match map.get(col_name).cloned() {
                Some(w) => match w.snapshot_block() {
                    Ok(snapshot) => Some(snapshot),
                    Err(_) => None,
                },
                None => None,
            }
        };

        // 1) Get or create reader info
        let info_arc = if let Some(arc) = {
            let map = self.reader.data.read().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "reader map read lock poisoned")
            })?;
            map.get(col_name).cloned()
        } {
            arc
        } else {
            let mut map = self.reader.data.write().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "reader map write lock poisoned")
            })?;
            map.entry(col_name.to_string())
                .or_insert_with(|| {
                    Arc::new(RwLock::new(ColReaderInfo {
                        chain: Vec::new(),
                        cur_block_idx: 0,
                        cur_block_offset: 0,
                        reads_since_persist: 0,
                        tail_block_id: 0,
                        tail_offset: 0,
                        hydrated_from_index: false,
                    }))
                })
                .clone()
        };

        let mut info = info_arc
            .write()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "col info write lock poisoned"))?;

        // Hydrate from index if needed
        let mut persisted_tail_for_fold: Option<(u64 /*block_id*/, u64 /*offset*/)> = None;
        if !info.hydrated_from_index {
            if let Ok(idx_guard) = self.read_offset_index.read() {
                if let Some(pos) = idx_guard.get(col_name) {
                    if (pos.cur_block_idx & TAIL_FLAG) != 0 {
                        let tail_block_id = pos.cur_block_idx & (!TAIL_FLAG);
                        info.tail_block_id = tail_block_id;
                        info.tail_offset = pos.cur_block_offset;
                        info.cur_block_idx = info.chain.len();
                        info.cur_block_offset = 0;
                        persisted_tail_for_fold = Some((tail_block_id, pos.cur_block_offset));
                    } else {
                        let mut ib = pos.cur_block_idx as usize;
                        if ib > info.chain.len() {
                            ib = info.chain.len();
                        }
                        info.cur_block_idx = ib;
                        if ib < info.chain.len() {
                            let used = info.chain[ib].used;
                            info.cur_block_offset = pos.cur_block_offset.min(used);
                        } else {
                            info.cur_block_offset = 0;
                        }
                    }

                    info.hydrated_from_index = true;
                } else {
                    info.hydrated_from_index = true;
                }
            }
        }

        // Fold persisted tail into sealed blocks if possible
        if let Some((tail_block_id, tail_off)) = persisted_tail_for_fold {
            if let Some(idx) = info
                .chain
                .iter()
                .enumerate()
                .find(|(_, b)| b.id == tail_block_id)
                .map(|(idx, _)| idx)
            {
                let used = info.chain[idx].used;
                info.cur_block_idx = idx;
                info.cur_block_offset = tail_off.min(used);
            }
        }

        // 2) Build read plan up to byte and entry limits
        let mut plan: Vec<ReadPlan> = Vec::new();
        let mut planned_bytes: usize = 0;
        let chain_len_at_plan = info.chain.len();
        let mut cur_idx = info.cur_block_idx;
        let mut cur_off = info.cur_block_offset;

        while cur_idx < info.chain.len() && planned_bytes < max_bytes {
            let block = info.chain[cur_idx].clone();
            if cur_off >= block.used {
                BlockStateTracker::set_checkpointed_true(block.id as usize);
                cur_idx += 1;
                cur_off = 0;
                continue;
            }

            let end = block.used.min(cur_off + (max_bytes - planned_bytes) as u64);
            if end > cur_off {
                plan.push(ReadPlan {
                    blk: block.clone(),
                    start: cur_off,
                    end,
                    is_tail: false,
                    chain_idx: Some(cur_idx),
                });
                planned_bytes += (end - cur_off) as usize;
            }
            cur_idx += 1;
            cur_off = 0;
        }

        // Plan tail if we're at the end of sealed chain
        if cur_idx >= chain_len_at_plan {
            if let Some((active_block, written)) = writer_snapshot.clone() {
                // Use in-memory tail progress if available for this block
                let tail_start = if info.tail_block_id == active_block.id {
                    info.tail_offset
                } else {
                    0
                };
                if tail_start < written {
                    let end = written; // read up to current writer offset
                    plan.push(ReadPlan {
                        blk: active_block.clone(),
                        start: tail_start,
                        end,
                        is_tail: true,
                        chain_idx: None,
                    });
                }
            }
        }

        if plan.is_empty() {
            return Ok(Vec::new());
        }

        // Hold lock across IO/parse for StrictlyAtOnce to avoid duplicate consumption
        let hold_lock_during_io = matches!(self.read_consistency, ReadConsistency::StrictlyAtOnce);
        // Manage the guard explicitly to satisfy the borrow checker
        let mut info_opt = Some(info);
        if !hold_lock_during_io {
            // Release lock for AtLeastOnce before IO
            drop(info_opt.take().unwrap());
        }

        // 3) Read ranges via io_uring (FD backend) or mmap
        #[cfg(target_os = "linux")]
        let buffers = if USE_FD_BACKEND.load(Ordering::Relaxed) {
            // io_uring path
            let ring_size = (plan.len() + 64).min(4096) as u32;
            let mut ring = io_uring::IoUring::new(ring_size).map_err(|e| {
                io::Error::new(io::ErrorKind::Other, format!("io_uring init failed: {}", e))
            })?;

            let mut temp_buffers: Vec<Vec<u8>> = vec![Vec::new(); plan.len()];
            let mut expected_sizes: Vec<usize> = vec![0; plan.len()];

            for (plan_idx, read_plan) in plan.iter().enumerate() {
                let size = (read_plan.end - read_plan.start) as usize;
                expected_sizes[plan_idx] = size;
                let mut buffer = vec![0u8; size];
                let file_offset = read_plan.blk.offset + read_plan.start;

                let fd = if let Some(fd_backend) = read_plan.blk.mmap.storage().as_fd() {
                    io_uring::types::Fd(fd_backend.file().as_raw_fd())
                } else {
                    return Err(io::Error::new(
                        io::ErrorKind::Unsupported,
                        "batch reads require FD backend when io_uring is enabled",
                    ));
                };

                let read_op = io_uring::opcode::Read::new(fd, buffer.as_mut_ptr(), size as u32)
                    .offset(file_offset)
                    .build()
                    .user_data(plan_idx as u64);

                temp_buffers[plan_idx] = buffer;

                unsafe {
                    ring.submission().push(&read_op).map_err(|e| {
                        io::Error::new(io::ErrorKind::Other, format!("io_uring push failed: {}", e))
                    })?;
                }
            }

            // Submit and wait for all reads
            ring.submit_and_wait(plan.len())?;

            // Process completions and validate read lengths
            for _ in 0..plan.len() {
                if let Some(cqe) = ring.completion().next() {
                    let plan_idx = cqe.user_data() as usize;
                    let got = cqe.result();
                    if got < 0 {
                        return Err(io::Error::new(
                            io::ErrorKind::Other,
                            format!("io_uring read failed: {}", got),
                        ));
                    }
                    if (got as usize) != expected_sizes[plan_idx] {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            format!(
                                "short read: got {} bytes, expected {}",
                                got, expected_sizes[plan_idx]
                            ),
                        ));
                    }
                }
            }

            temp_buffers
        } else {
            plan.iter()
                .map(|read_plan| {
                    let size = (read_plan.end - read_plan.start) as usize;
                    let mut buffer = vec![0u8; size];
                    let file_offset = (read_plan.blk.offset + read_plan.start) as usize;
                    read_plan.blk.mmap.read(file_offset, &mut buffer);
                    buffer
                })
                .collect()
        };

        #[cfg(not(target_os = "linux"))]
        let buffers: Vec<Vec<u8>> = plan
            .iter()
            .map(|read_plan| {
                let size = (read_plan.end - read_plan.start) as usize;
                let mut buffer = vec![0u8; size];
                let file_offset = (read_plan.blk.offset + read_plan.start) as usize;
                read_plan.blk.mmap.read(file_offset, &mut buffer);
                buffer
            })
            .collect();

        // 4) Parse entries from buffers in plan order
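        // Each entry is a PREFIX_META_SIZE-byte header followed by its
        // payload: the first two header bytes hold the little-endian length
        // of the rkyv-serialized `Metadata`, which records the payload size
        // (`read_size`) and its checksum. Parsing stops at a zeroed or
        // oversized length field or an incomplete entry; a checksum mismatch
        // is reported as an error.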
        let mut entries = Vec::new();
        let mut total_data_bytes = 0usize;
        let mut final_block_idx = 0usize;
        let mut final_block_offset = 0u64;
        let mut final_tail_block_id = 0u64;
        let mut final_tail_offset = 0u64;
        let mut entries_parsed = 0u32;
        let mut saw_tail = false;

        for (plan_idx, read_plan) in plan.iter().enumerate() {
            if entries.len() >= MAX_BATCH_ENTRIES {
                break;
            }
            let buffer = &buffers[plan_idx];
            let mut buf_offset = 0usize;

            while buf_offset < buffer.len() {
                if entries.len() >= MAX_BATCH_ENTRIES {
                    break;
                }
                // Try to read metadata header
                if buf_offset + PREFIX_META_SIZE > buffer.len() {
                    break; // Not enough data for header
                }

                let meta_len =
                    (buffer[buf_offset] as usize) | ((buffer[buf_offset + 1] as usize) << 8);

                if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
                    // Invalid or zeroed header - stop parsing this block
                    break;
                }

                // Deserialize metadata
                let mut aligned = AlignedVec::with_capacity(meta_len);
                aligned.extend_from_slice(&buffer[buf_offset + 2..buf_offset + 2 + meta_len]);

                let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
                let meta: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
                    Ok(m) => m,
                    Err(_) => break, // Parse error - stop
                };

                let data_size = meta.read_size;
                let entry_consumed = PREFIX_META_SIZE + data_size;

                // Check if we have enough buffer space for the data
                if buf_offset + entry_consumed > buffer.len() {
                    break; // Incomplete entry
                }

                // Enforce byte budget on payload bytes, but always allow at least one entry.
                let next_total = total_data_bytes
                    .checked_add(data_size)
                    .unwrap_or(usize::MAX);
                if next_total > max_bytes && !entries.is_empty() {
                    break;
                }

                // Extract and verify data
                let data_start = buf_offset + PREFIX_META_SIZE;
                let data_end = data_start + data_size;
                let data_slice = &buffer[data_start..data_end];

                // Verify checksum
                if checksum64(data_slice) != meta.checksum {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "checksum mismatch in batch read",
                    ));
                }

                // Add to results
                entries.push(Entry {
                    data: data_slice.to_vec(),
                });
                total_data_bytes = next_total;
                entries_parsed += 1;

                // Update position tracking
                let in_block_offset = read_plan.start + buf_offset as u64 + entry_consumed as u64;

                if read_plan.is_tail {
                    saw_tail = true;
                    final_tail_block_id = read_plan.blk.id;
                    final_tail_offset = in_block_offset;
                } else if let Some(idx) = read_plan.chain_idx {
                    final_block_idx = idx;
                    final_block_offset = in_block_offset;
                }

                buf_offset += entry_consumed;
            }
        }

        // 5) Commit progress (optional)
        if entries_parsed > 0 {
            enum PersistTarget {
                Tail { blk_id: u64, off: u64 },
                Sealed { idx: u64, off: u64 },
                None,
            }
            let mut target = PersistTarget::None;

            if hold_lock_during_io {
                // We still hold the original write guard here
                let mut info = info_opt.take().expect("column lock should be held");
                if checkpoint {
                    if saw_tail {
                        info.cur_block_idx = chain_len_at_plan;
                        info.cur_block_offset = 0;
                        info.tail_block_id = final_tail_block_id;
                        info.tail_offset = final_tail_offset;
                        target = PersistTarget::Tail {
                            blk_id: final_tail_block_id,
                            off: final_tail_offset,
                        };
                    } else {
                        info.cur_block_idx = final_block_idx;
                        info.cur_block_offset = final_block_offset;
                        target = PersistTarget::Sealed {
                            idx: final_block_idx as u64,
                            off: final_block_offset,
                        };
                    }
                }
                drop(info);
            } else {
                // Reacquire to update
                let mut info2 = info_arc.write().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
                })?;
                if checkpoint {
                    if saw_tail {
                        info2.cur_block_idx = chain_len_at_plan;
                        info2.cur_block_offset = 0;
                        info2.tail_block_id = final_tail_block_id;
                        info2.tail_offset = final_tail_offset;
                        if let ReadConsistency::AtLeastOnce { persist_every } =
                            self.read_consistency
                        {
                            // Clamp contribution so a single call can't reach the threshold
                            let room = persist_every
                                .saturating_sub(1)
                                .saturating_sub(info2.reads_since_persist);
                            let add = entries_parsed.min(room);
                            info2.reads_since_persist =
                                info2.reads_since_persist.saturating_add(add);
                            // target remains None here to avoid persisting to end in one batch
                        }
                    } else {
                        info2.cur_block_idx = final_block_idx;
                        info2.cur_block_offset = final_block_offset;
                        if let ReadConsistency::AtLeastOnce { persist_every } =
                            self.read_consistency
                        {
                            let room = persist_every
                                .saturating_sub(1)
                                .saturating_sub(info2.reads_since_persist);
                            let add = entries_parsed.min(room);
                            info2.reads_since_persist =
                                info2.reads_since_persist.saturating_add(add);
                        }
                    }
                }
                drop(info2);
            }

            if checkpoint {
                match target {
                    PersistTarget::Tail { blk_id, off } => {
                        if let Ok(mut idx_guard) = self.read_offset_index.write() {
                            let _ = idx_guard.set(col_name.to_string(), blk_id | TAIL_FLAG, off);
                        }
                    }
                    PersistTarget::Sealed { idx, off } => {
                        if let Ok(mut idx_guard) = self.read_offset_index.write() {
                            let _ = idx_guard.set(col_name.to_string(), idx, off);
                        }
                    }
                    PersistTarget::None => {}
                }
            }
        }

        Ok(entries)
    }
}