Skip to main content

datafusion_datasource_json/
boundary_stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Streaming boundary-aligned wrapper for newline-delimited JSON range reads.
19//!
20//! [`AlignedBoundaryStream`] wraps a raw byte stream and lazily aligns to
21//! record (newline) boundaries, avoiding the need for separate `get_opts`
22//! calls to locate boundary positions.
23
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::{Context, Poll};
27
28use bytes::Bytes;
29use futures::stream::{BoxStream, Stream};
30use futures::{StreamExt, TryFutureExt};
31use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
32
/// Size of the read window that extends past `raw_end`.
///
/// The initial bounded request reads this many bytes beyond `raw_end` so the
/// newline closing the partition's last record can usually be found without
/// a second request. If it is not inside that window, `ScanningLastTerminator`
/// keeps issuing follow-up requests of this same size until the newline is
/// found or EOF is reached.
pub const END_SCAN_LOOKAHEAD: u64 = 1 << 14; // 16 KiB
37
/// Current step of the [`AlignedBoundaryStream`] boundary-alignment state
/// machine.
///
/// Transitions only move forward through the variants as listed (phases may
/// be skipped), and every path ends in `Done`.
#[derive(Debug)]
enum Phase {
    /// Discarding bytes until the first terminator, which marks where this
    /// partition's first whole record begins.
    ScanningFirstTerminator,
    /// Forwarding aligned bytes while comparing the read position against
    /// the end boundary.
    FetchingChunks,
    /// The end boundary was passed mid-record; forwarding bytes until that
    /// record's terminator shows up.
    ScanningLastTerminator,
    /// Nothing further will be yielded.
    Done,
}
50
51/// A stream wrapper that lazily aligns byte boundaries to newline characters.
52///
53/// Given a raw byte stream starting from `fetch_start` (which is `start - 1`
54/// for non-zero starts, or `0`), this stream:
55///
56/// 1. Skips bytes until the first newline is found (start alignment)
57/// 2. Passes through data until the `end` boundary is reached
58/// 3. Continues past `end` to find the terminating newline (end alignment)
59///
60/// When the initial byte stream is exhausted during step 3 and the file has
61/// not been fully read, `ScanningLastTerminator` issues additional bounded
62/// `get_opts` calls (`END_SCAN_LOOKAHEAD` bytes each) until the newline is
63/// found or EOF is reached.
64pub struct AlignedBoundaryStream {
65    inner: BoxStream<'static, object_store::Result<Bytes>>,
66    terminator: u8,
67    /// Effective end boundary. Set to `u64::MAX` when `end >= file_size`
68    /// (last partition), so `FetchingChunks` never transitions to
69    /// `ScanningLastTerminator` and simply streams until EOF is reached.
70    end: u64,
71    /// Cumulative bytes consumed from `inner` (relative to `fetch_start`).
72    bytes_consumed: u64,
73    /// The offset where the current `inner` stream begins.
74    fetch_start: u64,
75    phase: Phase,
76    /// Remainder bytes from `ScanningFirstTerminator` that still need
77    /// end-boundary processing. Consumed by `FetchingChunks` before polling
78    /// `inner`.
79    pending: Option<Bytes>,
80    store: Arc<dyn ObjectStore>,
81    location: object_store::path::Path,
82    /// Total file size; overflow stops when `abs_pos() >= file_size`.
83    file_size: u64,
84}
85
86/// Fetch a bounded byte range from `store` and return it as a stream
87async fn get_stream(
88    store: Arc<dyn ObjectStore>,
89    location: object_store::path::Path,
90    range: std::ops::Range<u64>,
91) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
92    let opts = GetOptions {
93        range: Some(GetRange::Bounded(range.clone())),
94        ..Default::default()
95    };
96    let result = store.get_opts(&location, opts).await?;
97
98    #[cfg(not(target_arch = "wasm32"))]
99    if let GetResultPayload::File(mut file, _path) = result.payload {
100        use std::io::{Read, Seek, SeekFrom};
101        const CHUNK_SIZE: u64 = 8 * 1024;
102
103        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
104            object_store::Error::Generic {
105                store: "local",
106                source: Box::new(e),
107            }
108        })?;
109
110        return Ok(futures::stream::try_unfold(
111            (file, range.end - range.start),
112            move |(mut file, remaining)| async move {
113                if remaining == 0 {
114                    return Ok(None);
115                }
116                let to_read = remaining.min(CHUNK_SIZE);
117                let cap = usize::try_from(to_read).map_err(|e| {
118                    object_store::Error::Generic {
119                        store: "local",
120                        source: Box::new(e),
121                    }
122                })?;
123
124                let mut buf = Vec::with_capacity(cap);
125                let read =
126                    (&mut file)
127                        .take(to_read)
128                        .read_to_end(&mut buf)
129                        .map_err(|e| object_store::Error::Generic {
130                            store: "local",
131                            source: Box::new(e),
132                        })?;
133                Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
134            },
135        )
136        .boxed());
137    }
138
139    Ok(result.into_stream())
140}
141
142impl AlignedBoundaryStream {
143    /// Open a ranged byte stream from `store` and return a ready-to-poll
144    /// `AlignedBoundaryStream`.
145    ///
146    /// Issues a single bounded `get_opts` call covering
147    /// `[fetch_start, raw_end + END_SCAN_LOOKAHEAD)`.  If the terminating
148    /// newline is not found within that window, `ScanningLastTerminator`
149    /// automatically issues additional `END_SCAN_LOOKAHEAD`-sized GETs
150    /// via `store` until the newline is found or EOF is reached.
151    pub async fn new(
152        store: Arc<dyn ObjectStore>,
153        location: object_store::path::Path,
154        raw_start: u64,
155        raw_end: u64,
156        file_size: u64,
157        terminator: u8,
158    ) -> object_store::Result<Self> {
159        if raw_start >= raw_end || raw_start >= file_size {
160            return Ok(Self {
161                inner: futures::stream::empty().boxed(),
162                terminator,
163                end: 0,
164                bytes_consumed: 0,
165                fetch_start: 0,
166                phase: Phase::Done,
167                pending: None,
168                store,
169                location,
170                file_size,
171            });
172        }
173
174        let (fetch_start, phase) = if raw_start == 0 {
175            (0, Phase::FetchingChunks)
176        } else {
177            (raw_start - 1, Phase::ScanningFirstTerminator)
178        };
179
180        let initial_fetch_end = raw_end.saturating_add(END_SCAN_LOOKAHEAD).min(file_size);
181
182        let inner = get_stream(
183            Arc::clone(&store),
184            location.clone(),
185            fetch_start..initial_fetch_end,
186        )
187        .await?;
188
189        // Last partition reads until EOF is reached — no end-boundary scanning needed.
190        let end = if raw_end >= file_size {
191            u64::MAX
192        } else {
193            raw_end
194        };
195
196        Ok(Self {
197            inner,
198            terminator,
199            end,
200            bytes_consumed: 0,
201            fetch_start,
202            phase,
203            pending: None,
204            store,
205            location,
206            file_size,
207        })
208    }
209
210    /// Current absolute position in the file.
211    fn abs_pos(&self) -> u64 {
212        self.fetch_start + self.bytes_consumed
213    }
214}
215
impl Stream for AlignedBoundaryStream {
    type Item = object_store::Result<Bytes>;

    /// Advance the alignment state machine until it has a chunk to yield,
    /// must wait on `inner`, or is finished.
    ///
    /// Concatenated, the yielded chunks are the partition's aligned bytes. An
    /// error from `inner` is yielded once and moves the stream to
    /// `Phase::Done`, so every later poll returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` requires `Self: Unpin`. Every field is `Unpin`; `inner`
        // is a `BoxStream`, i.e. already boxed and pinned.
        let this = self.get_mut();

        // Each iteration handles the current phase. An arm either returns,
        // or changes state and `continue`s when it has nothing to yield yet.
        loop {
            match this.phase {
                Phase::Done => return Poll::Ready(None),

                Phase::ScanningFirstTerminator => {
                    // Find the first terminator and skip everything up to
                    // and including it. Store any remainder in `pending`
                    // so `FetchingChunks` can apply end-boundary logic to it.
                    match this.inner.poll_next_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(None) => {
                            // No terminator in the fetched window, which
                            // reaches past `end` (or to EOF). So no record
                            // starts inside this partition.
                            this.phase = Phase::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Ready(Some(Err(e))) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(Some(Ok(chunk))) => {
                            // Count the whole chunk now. Any remainder parked
                            // in `pending` is therefore already included in
                            // `abs_pos()`.
                            this.bytes_consumed += chunk.len() as u64;
                            match chunk.iter().position(|&b| b == this.terminator) {
                                Some(pos) => {
                                    let remainder = chunk.slice((pos + 1)..);
                                    // The aligned start position is where
                                    // data begins after the newline.
                                    let aligned_start =
                                        this.abs_pos() - remainder.len() as u64;
                                    if aligned_start >= this.end {
                                        // Start alignment landed at or past
                                        // the end boundary — no complete
                                        // lines in this partition's range.
                                        this.phase = Phase::Done;
                                        return Poll::Ready(None);
                                    }
                                    if !remainder.is_empty() {
                                        this.pending = Some(remainder);
                                    }
                                    this.phase = Phase::FetchingChunks;
                                    continue;
                                }
                                // Whole chunk precedes the first terminator
                                // and is discarded; poll `inner` again.
                                None => continue,
                            }
                        }
                    }
                }

                Phase::FetchingChunks => {
                    // Get the next chunk: pending remainder or inner stream.
                    let chunk = if let Some(pending) = this.pending.take() {
                        pending
                    } else {
                        match this.inner.poll_next_unpin(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(None) => {
                                // Source exhausted before passing `end`. For
                                // the last partition (`end == u64::MAX`) this
                                // is simply EOF.
                                this.phase = Phase::Done;
                                return Poll::Ready(None);
                            }
                            Poll::Ready(Some(Err(e))) => {
                                this.phase = Phase::Done;
                                return Poll::Ready(Some(Err(e)));
                            }
                            Poll::Ready(Some(Ok(chunk))) => {
                                this.bytes_consumed += chunk.len() as u64;
                                chunk
                            }
                        }
                    };

                    // Absolute offset just past the last byte of `chunk`. A
                    // pending remainder is the tail of a chunk that was
                    // already counted, so this holds for it as well.
                    let pos_after = this.abs_pos();

                    // When end == u64::MAX (last partition), this is always
                    // true and we stream straight through until EOF is reached.
                    if pos_after < this.end {
                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    if pos_after == this.end {
                        // Chunk ends exactly at the boundary.
                        if chunk.last() == Some(&this.terminator) {
                            this.phase = Phase::Done;
                        } else {
                            // No terminator at boundary; any following data
                            // is past end, so switch to end-scanning.
                            this.phase = Phase::ScanningLastTerminator;
                        }
                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    // Chunk crosses the end boundary (`pos_after > this.end`).
                    // Find the first terminator at or after file position
                    // `this.end - 1` and yield everything up to and
                    // including it.
                    //
                    // `pos_before` is the absolute file position of chunk[0].
                    // `chunk_in_range_len` is how many bytes of this chunk
                    // fall within [pos_before, this.end), so chunk[0..
                    // chunk_in_range_len] is the in-range portion.
                    // `search_from` is the chunk index of the last in-range
                    // byte (file position this.end - 1).
                    //
                    // `pos_before < this.end` always holds here. Every earlier
                    // chunk ended before `end` (otherwise the phase would
                    // have changed), and a pending remainder starts at an
                    // `aligned_start` already checked to be `< end`. Hence
                    // `chunk_in_range_len >= 1` and `search_from` does not
                    // underflow.
                    //
                    // Example A: "line1\nline2\nline3\n" (18 bytes), end=8,
                    // one large chunk arriving with pos_after=18:
                    //   pos_before         = 18 - 18 = 0
                    //   chunk_in_range_len =  8 -  0 = 8
                    //   search_from        = 7   (chunk[7] is file pos 7)
                    //   chunk[7]='i', chunk[11]='\n' → rel=4
                    //   yield chunk[..7+4+1] = chunk[..12] = "line1\nline2\n"
                    //
                    // Example B: same data, 3-byte chunks, end=8.
                    // "lin"(pos 0-2) and "e1\n"(pos 3-5) yielded already.
                    // Now chunk="lin" arrives with pos_after=9:
                    //   pos_before         = 9 - 3 = 6
                    //   chunk_in_range_len = 8 - 6 = 2
                    //   search_from        = 1   (chunk[1] is file pos 7)
                    //   chunk[1]='i', no '\n' in chunk[1..]
                    //   → ScanningLastTerminator
                    let pos_before = pos_after - chunk.len() as u64;
                    let chunk_in_range_len = (this.end - pos_before) as usize;
                    let search_from = chunk_in_range_len - 1;
                    if let Some(rel) = chunk[search_from..]
                        .iter()
                        .position(|&b| b == this.terminator)
                    {
                        this.phase = Phase::Done;
                        return Poll::Ready(Some(Ok(
                            chunk.slice(..search_from + rel + 1)
                        )));
                    }

                    // No terminator found: the whole chunk belongs to the
                    // record straddling `end`. Continue scanning in
                    // `ScanningLastTerminator`.
                    this.phase = Phase::ScanningLastTerminator;
                    return Poll::Ready(Some(Ok(chunk)));
                }

                Phase::ScanningLastTerminator => {
                    // Everything up to and including the next terminator
                    // belongs to the record that straddles `end`.
                    match this.inner.poll_next_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(None) => {
                            // Inner exhausted. Issue the next overflow GET if
                            // the file has not been fully read yet.
                            let pos = this.abs_pos();
                            if pos < this.file_size {
                                let fetch_end = pos
                                    .saturating_add(END_SCAN_LOOKAHEAD)
                                    .min(this.file_size);
                                let store = Arc::clone(&this.store);
                                let location = this.location.clone();
                                // The request is deferred: the future runs
                                // when the new `inner` is first polled, and
                                // any error surfaces as a stream item.
                                this.inner = get_stream(store, location, pos..fetch_end)
                                    .try_flatten_stream()
                                    .boxed();
                                continue;
                            }
                            // Reached EOF without a terminator; the final
                            // record is unterminated and has been yielded.
                            this.phase = Phase::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Ready(Some(Err(e))) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(Some(Ok(chunk))) => {
                            this.bytes_consumed += chunk.len() as u64;
                            if let Some(pos) =
                                chunk.iter().position(|&b| b == this.terminator)
                            {
                                this.phase = Phase::Done;
                                return Poll::Ready(Some(Ok(chunk.slice(..pos + 1))));
                            }
                            // No terminator yet; yield and keep scanning.
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    }
                }
            }
        }
    }
}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
402    use futures::TryStreamExt;
403
404    async fn collect_stream(stream: AlignedBoundaryStream) -> Vec<u8> {
405        stream.try_collect::<Vec<Bytes>>().await.unwrap().concat()
406    }
407
408    #[tokio::test]
409    async fn test_start_at_zero_no_end_scan() {
410        // start=0, end >= file_size → pass through everything
411        static DATA: &[u8] = b"line1\nline2\nline3\n";
412        for &cs in CHUNK_SIZES {
413            let (store, path) = make_chunked_store(DATA, cs).await;
414            let s = AlignedBoundaryStream::new(store, path, 0, 100, 18, b'\n')
415                .await
416                .unwrap();
417            assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}");
418        }
419    }
420
421    #[tokio::test]
422    async fn test_start_aligned_on_newline() {
423        // Data: "line1\nline2\nline3\n"
424        //        0    5 6   11 12  17
425        // start=6 → fetch_start=5. Byte at offset 5 is '\n'.
426        // Should skip the leading '\n' and yield "line2\nline3\n".
427        static DATA: &[u8] = b"line1\nline2\nline3\n";
428        for &cs in CHUNK_SIZES {
429            let (store, path) = make_chunked_store(DATA, cs).await;
430            let s = AlignedBoundaryStream::new(store, path, 6, 100, 18, b'\n')
431                .await
432                .unwrap();
433            assert_eq!(
434                collect_stream(s).await,
435                b"line2\nline3\n",
436                "chunk_size={cs}"
437            );
438        }
439    }
440
441    #[tokio::test]
442    async fn test_start_mid_line() {
443        // start=3, fetch_start=2. Bytes from offset 2: "ne1\nline2\nline3\n".
444        // Should skip "ne1\n" and yield "line2\nline3\n".
445        static DATA: &[u8] = b"line1\nline2\nline3\n";
446        for &cs in CHUNK_SIZES {
447            let (store, path) = make_chunked_store(DATA, cs).await;
448            let s = AlignedBoundaryStream::new(store, path, 3, 100, 18, b'\n')
449                .await
450                .unwrap();
451            assert_eq!(
452                collect_stream(s).await,
453                b"line2\nline3\n",
454                "chunk_size={cs}"
455            );
456        }
457    }
458
459    #[tokio::test]
460    async fn test_end_boundary_mid_line() {
461        // Data: "line1\nline2\nline3\n"
462        //        0    5 6   11 12  17
463        // start=0, end=8. End is mid "line2".
464        // Should yield "line1\nline2\n" (continue past end to find newline).
465        static DATA: &[u8] = b"line1\nline2\nline3\n";
466        for &cs in CHUNK_SIZES {
467            let (store, path) = make_chunked_store(DATA, cs).await;
468            let s = AlignedBoundaryStream::new(store, path, 0, 8, 18, b'\n')
469                .await
470                .unwrap();
471            assert_eq!(
472                collect_stream(s).await,
473                b"line1\nline2\n",
474                "chunk_size={cs}"
475            );
476        }
477    }
478
479    #[tokio::test]
480    async fn test_end_at_eof() {
481        // end >= file_size → no end scanning, pass through everything.
482        static DATA: &[u8] = b"line1\nline2\n";
483        for &cs in CHUNK_SIZES {
484            let (store, path) = make_chunked_store(DATA, cs).await;
485            let s = AlignedBoundaryStream::new(store, path, 0, 12, 12, b'\n')
486                .await
487                .unwrap();
488            assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}");
489        }
490    }
491
492    #[tokio::test]
493    async fn test_no_newline_in_range() {
494        // start=2, fetch_start=1. Bytes from offset 1: "bcdef" — no newline.
495        // No complete line → empty output.
496        static DATA: &[u8] = b"abcdef";
497        for &cs in CHUNK_SIZES {
498            let (store, path) = make_chunked_store(DATA, cs).await;
499            let s = AlignedBoundaryStream::new(store, path, 2, 6, 6, b'\n')
500                .await
501                .unwrap();
502            assert!(collect_stream(s).await.is_empty(), "chunk_size={cs}");
503        }
504    }
505
506    #[tokio::test]
507    async fn test_start_and_end_alignment() {
508        // Data: "line1\nline2\nline3\nline4\n"
509        //        0    5 6   11 12  17 18  23
510        // start=3, end=14, file_size=24
511        // fetch_start=2, bytes from offset 2: "ne1\nline2\nline3\nline4\n"
512        // Start aligns past "ne1\n"; end=14 is mid "line3", scan to '\n'.
513        // Expected: "line2\nline3\n"
514        static DATA: &[u8] = b"line1\nline2\nline3\nline4\n";
515        for &cs in CHUNK_SIZES {
516            let (store, path) = make_chunked_store(DATA, cs).await;
517            let s = AlignedBoundaryStream::new(store, path, 3, 14, 24, b'\n')
518                .await
519                .unwrap();
520            assert_eq!(
521                collect_stream(s).await,
522                b"line2\nline3\n",
523                "chunk_size={cs}"
524            );
525        }
526    }
527
528    #[tokio::test]
529    async fn test_end_scan_across_chunks() {
530        // end boundary falls before a newline; the terminating newline must be
531        // found by scanning past the end in subsequent chunks.
532        // Data: "line1\nline2\nline3\n" (18 bytes)
533        // start=0, end=7 (mid "line2"), file_size=18 → "line1\nline2\n"
534        static DATA: &[u8] = b"line1\nline2\nline3\n";
535        for &cs in CHUNK_SIZES {
536            let (store, path) = make_chunked_store(DATA, cs).await;
537            let s = AlignedBoundaryStream::new(store, path, 0, 7, 18, b'\n')
538                .await
539                .unwrap();
540            assert_eq!(
541                collect_stream(s).await,
542                b"line1\nline2\n",
543                "chunk_size={cs}"
544            );
545        }
546    }
547
548    #[tokio::test]
549    async fn test_empty_range() {
550        // start >= end — no complete line can exist, regardless of data.
551        static DATA: &[u8] = b"line1\nline2\n";
552        for &cs in CHUNK_SIZES {
553            let (store, path) = make_chunked_store(DATA, cs).await;
554
555            // start > end (non-zero start)
556            let s = AlignedBoundaryStream::new(
557                Arc::clone(&store),
558                path.clone(),
559                10,
560                5,
561                20,
562                b'\n',
563            )
564            .await
565            .unwrap();
566            assert!(
567                collect_stream(s).await.is_empty(),
568                "start>end chunk_size={cs}"
569            );
570
571            // start == end == 0 (zero start, previously unguarded)
572            let s = AlignedBoundaryStream::new(
573                Arc::clone(&store),
574                path.clone(),
575                0,
576                0,
577                12,
578                b'\n',
579            )
580            .await
581            .unwrap();
582            assert!(
583                collect_stream(s).await.is_empty(),
584                "start==end==0 chunk_size={cs}"
585            );
586
587            // start == end (non-zero)
588            let s = AlignedBoundaryStream::new(
589                Arc::clone(&store),
590                path.clone(),
591                6,
592                6,
593                12,
594                b'\n',
595            )
596            .await
597            .unwrap();
598            assert!(
599                collect_stream(s).await.is_empty(),
600                "start==end==6 chunk_size={cs}"
601            );
602        }
603    }
604
605    #[tokio::test]
606    async fn test_start_align_across_chunks() {
607        // The newline needed for start alignment may arrive in any chunk.
608        // fetch_start=0 (start=1). Data: "abcdef\nline2\n" (13 bytes)
609        // Start aligns past "abcdef\n", yielding "line2\n".
610        static DATA: &[u8] = b"abcdef\nline2\n";
611        for &cs in CHUNK_SIZES {
612            let (store, path) = make_chunked_store(DATA, cs).await;
613            let s = AlignedBoundaryStream::new(store, path, 1, 100, 13, b'\n')
614                .await
615                .unwrap();
616            assert_eq!(collect_stream(s).await, b"line2\n", "chunk_size={cs}");
617        }
618    }
619
620    #[tokio::test]
621    async fn test_end_aligned_on_newline() {
622        // end falls right on a newline — line is complete, no end-scan needed.
623        // Data: "line1\nline2\nline3\n"
624        //        0    5 6   11 12  17
625        // start=0, end=6 → byte 5 is '\n' → yield only "line1\n".
626        static DATA: &[u8] = b"line1\nline2\nline3\n";
627        for &cs in CHUNK_SIZES {
628            let (store, path) = make_chunked_store(DATA, cs).await;
629            let s = AlignedBoundaryStream::new(store, path, 0, 6, 18, b'\n')
630                .await
631                .unwrap();
632            assert_eq!(collect_stream(s).await, b"line1\n", "chunk_size={cs}");
633        }
634    }
635
636    #[tokio::test]
637    async fn test_adjacent_partitions_no_overlap() {
638        // Three adjacent partitions over "line1\nline2\nline3\n".
639        // Partition 1: [0, 6), fetch_start=0  → stream full file
640        // Partition 2: [6, 12), fetch_start=5 → stream from offset 5
641        // Partition 3: [12, 18), fetch_start=11 → stream from offset 11
642        static DATA: &[u8] = b"line1\nline2\nline3\n"; // 18 bytes
643
644        for &cs in CHUNK_SIZES {
645            let (store, path) = make_chunked_store(DATA, cs).await;
646            let r1 = collect_stream(
647                AlignedBoundaryStream::new(
648                    Arc::clone(&store),
649                    path.clone(),
650                    0,
651                    6,
652                    18,
653                    b'\n',
654                )
655                .await
656                .unwrap(),
657            )
658            .await;
659            let r2 = collect_stream(
660                AlignedBoundaryStream::new(
661                    Arc::clone(&store),
662                    path.clone(),
663                    6,
664                    12,
665                    18,
666                    b'\n',
667                )
668                .await
669                .unwrap(),
670            )
671            .await;
672            let r3 = collect_stream(
673                AlignedBoundaryStream::new(
674                    Arc::clone(&store),
675                    path.clone(),
676                    12,
677                    18,
678                    18,
679                    b'\n',
680                )
681                .await
682                .unwrap(),
683            )
684            .await;
685
686            assert_eq!(r1, b"line1\n", "p1 chunk_size={cs}");
687            assert_eq!(r2, b"line2\n", "p2 chunk_size={cs}");
688            assert_eq!(r3, b"line3\n", "p3 chunk_size={cs}");
689
690            let mut combined = r1;
691            combined.extend(r2);
692            combined.extend(r3);
693            assert_eq!(combined, DATA, "combined chunk_size={cs}");
694        }
695    }
696
697    #[tokio::test]
698    async fn test_start_align_past_end_returns_empty() {
699        // The first aligned start lands at or past the end boundary.
700        // Data: "abcdefghij\nkl\n" (14 bytes)
701        //        0         10 11 13
702        // Partition [3, 6): start=3, end=6, fetch_start=2
703        // Bytes from offset 2: "cdefghij\nkl\n". First '\n' at offset 10;
704        // aligned start = 11, which is >= end = 6 → empty.
705        static DATA: &[u8] = b"abcdefghij\nkl\n";
706        for &cs in CHUNK_SIZES {
707            let (store, path) = make_chunked_store(DATA, cs).await;
708            let s = AlignedBoundaryStream::new(store, path, 3, 6, 14, b'\n')
709                .await
710                .unwrap();
711            assert!(collect_stream(s).await.is_empty(), "chunk_size={cs}");
712        }
713    }
714
715    #[tokio::test]
716    async fn test_unaligned_partitions_no_overlap() {
717        // Partitions that don't fall on line boundaries.
718        // Data: "aaa\nbbb\nccc\n" (12 bytes)
719        //        0  3 4  7 8  11
720        // Partitions: [0, 5), [5, 10), [10, 12)
721        static DATA: &[u8] = b"aaa\nbbb\nccc\n"; // 12 bytes
722
723        for &cs in CHUNK_SIZES {
724            let (store, path) = make_chunked_store(DATA, cs).await;
725
726            // [0, 5): no start alignment; end=5 mid "bbb", scans to '\n' at 7.
727            let r1 = collect_stream(
728                AlignedBoundaryStream::new(
729                    Arc::clone(&store),
730                    path.clone(),
731                    0,
732                    5,
733                    12,
734                    b'\n',
735                )
736                .await
737                .unwrap(),
738            )
739            .await;
740
741            // [5, 10): fetch_start=4, bytes from offset 4: "bbb\nccc\n".
742            // '\n' at pos 3 → aligned start=8 ("ccc\n"). End=10 mid "ccc",
743            // scans to '\n' at 11 → yields "ccc\n".
744            let r2 = collect_stream(
745                AlignedBoundaryStream::new(
746                    Arc::clone(&store),
747                    path.clone(),
748                    5,
749                    10,
750                    12,
751                    b'\n',
752                )
753                .await
754                .unwrap(),
755            )
756            .await;
757
758            // [10, 12): fetch_start=9, bytes from offset 9: "cc\n".
759            // '\n' at pos 2 → aligned start=12. end=12==file_size → end=MAX.
760            // Remainder after '\n' is empty; Passthrough polls inner → Done.
761            let r3 = collect_stream(
762                AlignedBoundaryStream::new(
763                    Arc::clone(&store),
764                    path.clone(),
765                    10,
766                    12,
767                    12,
768                    b'\n',
769                )
770                .await
771                .unwrap(),
772            )
773            .await;
774
775            assert_eq!(r1, b"aaa\nbbb\n", "p1 chunk_size={cs}");
776            assert_eq!(r2, b"ccc\n", "p2 chunk_size={cs}");
777            assert!(r3.is_empty(), "p3 chunk_size={cs}");
778
779            let mut combined = r1;
780            combined.extend(r2);
781            combined.extend(r3);
782            assert_eq!(combined, DATA, "combined chunk_size={cs}");
783        }
784    }
785
786    #[tokio::test]
787    async fn test_no_trailing_newline() {
788        // Last partition of a file that does not end with a newline.
789        // end >= file_size → this.end = u64::MAX, so Passthrough streams straight
790        // until EOF is reached and yields the final incomplete line as-is.
791        static DATA: &[u8] = b"line1\nline2"; // 11 bytes, no trailing '\n'
792        for &cs in CHUNK_SIZES {
793            let (store, path) = make_chunked_store(DATA, cs).await;
794
795            // Single partition covering the whole file.
796            let s = AlignedBoundaryStream::new(
797                Arc::clone(&store),
798                path.clone(),
799                0,
800                11,
801                11,
802                b'\n',
803            )
804            .await
805            .unwrap();
806            assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}");
807
808            // Last partition starting mid-file (start=6, fetch_start=5).
809            // Bytes from offset 5: "\nline2".
810            // StartAlign consumes '\n', remainder "line2" is yielded as-is.
811            let s = AlignedBoundaryStream::new(
812                Arc::clone(&store),
813                path.clone(),
814                6,
815                11,
816                11,
817                b'\n',
818            )
819            .await
820            .unwrap();
821            assert_eq!(collect_stream(s).await, b"line2", "tail chunk_size={cs}");
822        }
823    }
824
825    #[tokio::test]
826    async fn test_overflow_fetch() {
827        // First line is longer than 2 * END_SCAN_LOOKAHEAD so the initial
828        // bounded fetch [fetch_start, raw_end + END_SCAN_LOOKAHEAD) does not
829        // reach its newline.  ScanningLastTerminator must issue overflow GETs
830        // to find it.
831        //
832        // Partition [0, 1): raw_end=1, initial_fetch_end=1+16384=16385.
833        // The newline is at byte 32768 > 16385 → one overflow GET required.
834        // Partition [1, file_size): start=1 lands mid line-1; ScanningFirstTerminator
835        // skips to byte 32769, then yields "line2\nline3\n".
836        let long_line: Vec<u8> =
837            std::iter::repeat_n(b'A', 2 * END_SCAN_LOOKAHEAD as usize)
838                .chain(std::iter::once(b'\n'))
839                .collect();
840        let rest = b"line2\nline3\n";
841        let mut data = long_line.clone();
842        data.extend_from_slice(rest);
843        let file_size = data.len() as u64;
844
845        for &cs in CHUNK_SIZES {
846            let (store, path) = make_chunked_store(&data, cs).await;
847
848            let r1 = collect_stream(
849                AlignedBoundaryStream::new(
850                    Arc::clone(&store),
851                    path.clone(),
852                    0,
853                    1,
854                    file_size,
855                    b'\n',
856                )
857                .await
858                .unwrap(),
859            )
860            .await;
861
862            let r2 = collect_stream(
863                AlignedBoundaryStream::new(
864                    Arc::clone(&store),
865                    path.clone(),
866                    1,
867                    file_size,
868                    file_size,
869                    b'\n',
870                )
871                .await
872                .unwrap(),
873            )
874            .await;
875
876            assert_eq!(r1, long_line, "p1 chunk_size={cs}");
877            assert_eq!(r2, rest.as_slice(), "p2 chunk_size={cs}");
878
879            let mut combined = r1;
880            combined.extend(r2);
881            assert_eq!(combined, data, "combined chunk_size={cs}");
882        }
883    }
884}