datafusion_datasource_json/boundary_stream.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Streaming boundary-aligned wrapper for newline-delimited JSON range reads.
19//!
20//! [`AlignedBoundaryStream`] wraps a raw byte stream and lazily aligns to
21//! record (newline) boundaries, avoiding the need for separate `get_opts`
22//! calls to locate boundary positions.
23
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::{Context, Poll};
27
28use bytes::Bytes;
29use futures::stream::{BoxStream, Stream};
30use futures::{StreamExt, TryFutureExt};
31use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
32
/// Number of bytes beyond `raw_end` that the initial bounded fetch includes,
/// and the size of every follow-up fetch.
///
/// When the terminator is not inside that window, `ScanningLastTerminator`
/// keeps issuing GETs of this size (clamped to the file size) until the
/// terminator shows up or EOF is reached.
pub const END_SCAN_LOOKAHEAD: u64 = 16 << 10; // 16 KiB (16 * 2^10 bytes)
37
/// Phase of the boundary alignment state machine driven by
/// `AlignedBoundaryStream::poll_next`.
///
/// A stream starts in `ScanningFirstTerminator` when `raw_start != 0`, in
/// `FetchingChunks` when `raw_start == 0`, and directly in `Done` when the
/// requested range is empty.
#[derive(Debug)]
enum Phase {
    /// Scanning for the first terminator to align the start boundary.
    /// Everything up to and including that terminator is discarded.
    ScanningFirstTerminator,
    /// Passing through aligned data while tracking the byte position, until
    /// the end boundary is reached or crossed.
    FetchingChunks,
    /// Past the end boundary, scanning for the terminator that closes the
    /// last record. May issue additional bounded GETs.
    ScanningLastTerminator,
    /// Stream is exhausted; every further poll yields `None`.
    Done,
}
50
/// A stream wrapper that lazily aligns byte boundaries to record terminators
/// (the `terminator` byte, e.g. `b'\n'`).
///
/// Given a raw byte stream starting from `fetch_start` (which is `start - 1`
/// for non-zero starts, or `0`), this stream:
///
/// 1. Skips bytes until the first terminator is found (start alignment)
/// 2. Passes through data until the `end` boundary is reached
/// 3. Continues past `end` to find the closing terminator (end alignment)
///
/// When the initial byte stream is exhausted during step 3 and the file has
/// not been fully read, `ScanningLastTerminator` issues additional bounded
/// `get_opts` calls (`END_SCAN_LOOKAHEAD` bytes each) until the terminator is
/// found or EOF is reached.
pub struct AlignedBoundaryStream {
    /// Byte stream currently being drained. Replaced by a fresh ranged
    /// stream each time `ScanningLastTerminator` issues an overflow GET.
    inner: BoxStream<'static, object_store::Result<Bytes>>,
    /// Byte value that terminates a record.
    terminator: u8,
    /// Effective end boundary. Set to `u64::MAX` when `end >= file_size`
    /// (last partition), so `FetchingChunks` never transitions to
    /// `ScanningLastTerminator` and simply streams until EOF is reached.
    end: u64,
    /// Total bytes received from `inner` so far, accumulated across the
    /// initial stream and any overflow streams that replaced it.
    /// `fetch_start + bytes_consumed` is the absolute file position.
    bytes_consumed: u64,
    /// Absolute file offset at which the initial fetch began. It is NOT
    /// updated when `inner` is replaced by an overflow GET; `abs_pos()`
    /// relies on it staying fixed while `bytes_consumed` keeps growing.
    fetch_start: u64,
    /// Current state of the alignment state machine.
    phase: Phase,
    /// Remainder bytes from `ScanningFirstTerminator` that still need
    /// end-boundary processing. Consumed by `FetchingChunks` before polling
    /// `inner`.
    pending: Option<Bytes>,
    /// Store used to issue overflow GETs.
    store: Arc<dyn ObjectStore>,
    /// Object path used to issue overflow GETs.
    location: object_store::path::Path,
    /// Total file size; overflow stops when `abs_pos() >= file_size`.
    file_size: u64,
}
85
86/// Fetch a bounded byte range from `store` and return it as a stream
87async fn get_stream(
88 store: Arc<dyn ObjectStore>,
89 location: object_store::path::Path,
90 range: std::ops::Range<u64>,
91) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
92 let opts = GetOptions {
93 range: Some(GetRange::Bounded(range.clone())),
94 ..Default::default()
95 };
96 let result = store.get_opts(&location, opts).await?;
97
98 #[cfg(not(target_arch = "wasm32"))]
99 if let GetResultPayload::File(mut file, _path) = result.payload {
100 use std::io::{Read, Seek, SeekFrom};
101 const CHUNK_SIZE: u64 = 8 * 1024;
102
103 file.seek(SeekFrom::Start(range.start)).map_err(|e| {
104 object_store::Error::Generic {
105 store: "local",
106 source: Box::new(e),
107 }
108 })?;
109
110 return Ok(futures::stream::try_unfold(
111 (file, range.end - range.start),
112 move |(mut file, remaining)| async move {
113 if remaining == 0 {
114 return Ok(None);
115 }
116 let to_read = remaining.min(CHUNK_SIZE);
117 let cap = usize::try_from(to_read).map_err(|e| {
118 object_store::Error::Generic {
119 store: "local",
120 source: Box::new(e),
121 }
122 })?;
123
124 let mut buf = Vec::with_capacity(cap);
125 let read =
126 (&mut file)
127 .take(to_read)
128 .read_to_end(&mut buf)
129 .map_err(|e| object_store::Error::Generic {
130 store: "local",
131 source: Box::new(e),
132 })?;
133 Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
134 },
135 )
136 .boxed());
137 }
138
139 Ok(result.into_stream())
140}
141
142impl AlignedBoundaryStream {
143 /// Open a ranged byte stream from `store` and return a ready-to-poll
144 /// `AlignedBoundaryStream`.
145 ///
146 /// Issues a single bounded `get_opts` call covering
147 /// `[fetch_start, raw_end + END_SCAN_LOOKAHEAD)`. If the terminating
148 /// newline is not found within that window, `ScanningLastTerminator`
149 /// automatically issues additional `END_SCAN_LOOKAHEAD`-sized GETs
150 /// via `store` until the newline is found or EOF is reached.
151 pub async fn new(
152 store: Arc<dyn ObjectStore>,
153 location: object_store::path::Path,
154 raw_start: u64,
155 raw_end: u64,
156 file_size: u64,
157 terminator: u8,
158 ) -> object_store::Result<Self> {
159 if raw_start >= raw_end || raw_start >= file_size {
160 return Ok(Self {
161 inner: futures::stream::empty().boxed(),
162 terminator,
163 end: 0,
164 bytes_consumed: 0,
165 fetch_start: 0,
166 phase: Phase::Done,
167 pending: None,
168 store,
169 location,
170 file_size,
171 });
172 }
173
174 let (fetch_start, phase) = if raw_start == 0 {
175 (0, Phase::FetchingChunks)
176 } else {
177 (raw_start - 1, Phase::ScanningFirstTerminator)
178 };
179
180 let initial_fetch_end = raw_end.saturating_add(END_SCAN_LOOKAHEAD).min(file_size);
181
182 let inner = get_stream(
183 Arc::clone(&store),
184 location.clone(),
185 fetch_start..initial_fetch_end,
186 )
187 .await?;
188
189 // Last partition reads until EOF is reached — no end-boundary scanning needed.
190 let end = if raw_end >= file_size {
191 u64::MAX
192 } else {
193 raw_end
194 };
195
196 Ok(Self {
197 inner,
198 terminator,
199 end,
200 bytes_consumed: 0,
201 fetch_start,
202 phase,
203 pending: None,
204 store,
205 location,
206 file_size,
207 })
208 }
209
210 /// Current absolute position in the file.
211 fn abs_pos(&self) -> u64 {
212 self.fetch_start + self.bytes_consumed
213 }
214}
215
/// Drives the alignment state machine: each poll advances through
/// `ScanningFirstTerminator` → `FetchingChunks` → `ScanningLastTerminator`
/// → `Done`, yielding only bytes that belong to complete records of this
/// partition.
impl Stream for AlignedBoundaryStream {
    type Item = object_store::Result<Bytes>;

    /// Poll for the next aligned chunk.
    ///
    /// Any error from the underlying stream is yielded once and moves the
    /// state machine to `Done`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Loop so that phase transitions and discarded chunks are handled
        // within the same poll instead of returning to the caller.
        loop {
            match this.phase {
                Phase::Done => return Poll::Ready(None),

                Phase::ScanningFirstTerminator => {
                    // Find the first terminator and skip everything up to
                    // and including it. Store any remainder in `pending`
                    // so `FetchingChunks` can apply end-boundary logic to it.
                    match this.inner.poll_next_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(None) => {
                            // Fetched window ended without a terminator:
                            // no record starts inside this partition.
                            this.phase = Phase::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Ready(Some(Err(e))) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(Some(Ok(chunk))) => {
                            // The whole chunk counts as consumed, including
                            // any remainder parked in `pending` below.
                            this.bytes_consumed += chunk.len() as u64;
                            match chunk.iter().position(|&b| b == this.terminator) {
                                Some(pos) => {
                                    let remainder = chunk.slice((pos + 1)..);
                                    // The aligned start position is where
                                    // data begins after the terminator.
                                    let aligned_start =
                                        this.abs_pos() - remainder.len() as u64;
                                    if aligned_start >= this.end {
                                        // Start alignment landed at or past
                                        // the end boundary — no complete
                                        // lines in this partition's range.
                                        this.phase = Phase::Done;
                                        return Poll::Ready(None);
                                    }
                                    if !remainder.is_empty() {
                                        this.pending = Some(remainder);
                                    }
                                    this.phase = Phase::FetchingChunks;
                                    continue;
                                }
                                // No terminator in this chunk: discard it
                                // and poll for the next one.
                                None => continue,
                            }
                        }
                    }
                }

                Phase::FetchingChunks => {
                    // Get the next chunk: pending remainder or inner stream.
                    // A pending remainder is already included in
                    // `bytes_consumed`, so it is not counted again.
                    let chunk = if let Some(pending) = this.pending.take() {
                        pending
                    } else {
                        match this.inner.poll_next_unpin(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(None) => {
                                this.phase = Phase::Done;
                                return Poll::Ready(None);
                            }
                            Poll::Ready(Some(Err(e))) => {
                                this.phase = Phase::Done;
                                return Poll::Ready(Some(Err(e)));
                            }
                            Poll::Ready(Some(Ok(chunk))) => {
                                this.bytes_consumed += chunk.len() as u64;
                                chunk
                            }
                        }
                    };

                    // Absolute file position just past the end of `chunk`.
                    let pos_after = this.abs_pos();

                    // When end == u64::MAX (last partition), this is always
                    // true and we stream straight through until EOF is reached.
                    if pos_after < this.end {
                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    if pos_after == this.end {
                        // Chunk ends exactly at the boundary.
                        if chunk.last() == Some(&this.terminator) {
                            this.phase = Phase::Done;
                        } else {
                            // No terminator at boundary; any following data
                            // is past end, so switch to end-scanning.
                            this.phase = Phase::ScanningLastTerminator;
                        }
                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    // Chunk crosses the end boundary (`pos_after > this.end`).
                    // Find the first terminator at or after file position
                    // `this.end - 1` and yield everything up to and
                    // including it.
                    //
                    // `pos_before` is the absolute file position of chunk[0].
                    // `chunk_in_range_len` is how many bytes of this chunk
                    // fall within [pos_before, this.end), so chunk[0..
                    // chunk_in_range_len] is the in-range portion.
                    // `search_from` is the chunk index of the last in-range
                    // byte (file position this.end - 1).
                    //
                    // Example A: "line1\nline2\nline3\n" (18 bytes), end=8,
                    // one large chunk arriving with pos_after=18:
                    //   pos_before         = 18 - 18 = 0
                    //   chunk_in_range_len = 8 - 0   = 8
                    //   search_from        = 7  (chunk[7] is file pos 7)
                    //   chunk[7]='i', chunk[11]='\n' → rel=4
                    //   yield chunk[..7+4+1] = chunk[..12] = "line1\nline2\n"
                    //
                    // Example B: same data, 3-byte chunks, end=8.
                    // "lin"(pos 0-2) and "e1\n"(pos 3-5) yielded already.
                    // Now chunk="lin" arrives with pos_after=9:
                    //   pos_before         = 9 - 3 = 6
                    //   chunk_in_range_len = 8 - 6 = 2
                    //   search_from        = 1  (chunk[1] is file pos 7)
                    //   chunk[1]='i', no '\n' in chunk[1..]
                    //     → ScanningLastTerminator
                    let pos_before = pos_after - chunk.len() as u64;
                    let chunk_in_range_len = (this.end - pos_before) as usize;
                    let search_from = chunk_in_range_len - 1;
                    if let Some(rel) = chunk[search_from..]
                        .iter()
                        .position(|&b| b == this.terminator)
                    {
                        this.phase = Phase::Done;
                        return Poll::Ready(Some(Ok(
                            chunk.slice(..search_from + rel + 1)
                        )));
                    }

                    // No terminator found; yield the whole chunk and keep
                    // scanning in `ScanningLastTerminator`.
                    this.phase = Phase::ScanningLastTerminator;
                    return Poll::Ready(Some(Ok(chunk)));
                }

                Phase::ScanningLastTerminator => {
                    match this.inner.poll_next_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(None) => {
                            // Inner exhausted. Issue the next overflow GET if
                            // the file has not been fully read yet.
                            let pos = this.abs_pos();
                            if pos < this.file_size {
                                let fetch_end = pos
                                    .saturating_add(END_SCAN_LOOKAHEAD)
                                    .min(this.file_size);
                                let store = Arc::clone(&this.store);
                                let location = this.location.clone();
                                // `fetch_start` is intentionally left as is:
                                // `bytes_consumed` keeps accumulating, so
                                // `abs_pos()` stays correct for the new stream.
                                this.inner = get_stream(store, location, pos..fetch_end)
                                    .try_flatten_stream()
                                    .boxed();
                                continue;
                            }
                            // Reached EOF without a closing terminator.
                            this.phase = Phase::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Ready(Some(Err(e))) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(Some(Ok(chunk))) => {
                            this.bytes_consumed += chunk.len() as u64;
                            if let Some(pos) =
                                chunk.iter().position(|&b| b == this.terminator)
                            {
                                // Closing terminator found: yield through it
                                // and finish.
                                this.phase = Phase::Done;
                                return Poll::Ready(Some(Ok(chunk.slice(..pos + 1))));
                            }
                            // No terminator yet; yield and keep scanning.
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    }
                }
            }
        }
    }
}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
    use futures::TryStreamExt;

    /// Three 6-byte lines (18 bytes); newlines sit at offsets 5, 11 and 17.
    const THREE_LINES: &[u8] = b"line1\nline2\nline3\n";

    /// Drain `stream` and glue every yielded chunk into a single buffer.
    async fn collect_stream(stream: AlignedBoundaryStream) -> Vec<u8> {
        let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
        chunks.concat()
    }

    /// Open the partition `[start, end)` with a `'\n'` terminator and return
    /// all bytes the aligned stream yields for it.
    async fn read_partition(
        store: &Arc<dyn ObjectStore>,
        path: &object_store::path::Path,
        start: u64,
        end: u64,
        file_size: u64,
    ) -> Vec<u8> {
        let stream = AlignedBoundaryStream::new(
            Arc::clone(store),
            path.clone(),
            start,
            end,
            file_size,
            b'\n',
        )
        .await
        .unwrap();
        collect_stream(stream).await
    }

    #[tokio::test]
    async fn test_start_at_zero_no_end_scan() {
        // Range begins at 0 and `end` exceeds the file size, so the whole
        // file is passed through.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let got = read_partition(&store, &path, 0, 100, 18).await;
            assert_eq!(got, THREE_LINES, "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_aligned_on_newline() {
        // start=6 gives fetch_start=5, and offset 5 holds '\n'. That leading
        // newline is skipped and "line2\nline3\n" is yielded.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let got = read_partition(&store, &path, 6, 100, 18).await;
            assert_eq!(got, b"line2\nline3\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_mid_line() {
        // start=3 gives fetch_start=2; the bytes from offset 2 are
        // "ne1\nline2\nline3\n". "ne1\n" is skipped, the rest is yielded.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let got = read_partition(&store, &path, 3, 100, 18).await;
            assert_eq!(got, b"line2\nline3\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_end_boundary_mid_line() {
        // start=0, end=8: the end falls inside "line2", so the stream reads
        // on past `end` up to the newline at offset 11.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let got = read_partition(&store, &path, 0, 8, 18).await;
            assert_eq!(got, b"line1\nline2\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_end_at_eof() {
        // end >= file_size: no end scanning, everything is passed through.
        const DATA: &[u8] = b"line1\nline2\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let got = read_partition(&store, &path, 0, 12, 12).await;
            assert_eq!(got, DATA, "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_no_newline_in_range() {
        // start=2 gives fetch_start=1; the bytes from offset 1 are "bcdef",
        // which contain no newline, so there is no complete line to yield.
        const DATA: &[u8] = b"abcdef";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let got = read_partition(&store, &path, 2, 6, 6).await;
            assert!(got.is_empty(), "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_and_end_alignment() {
        // Four 6-byte lines (24 bytes); newlines at offsets 5, 11, 17, 23.
        // start=3, end=14: fetch_start=2, start alignment skips "ne1\n";
        // end=14 is inside "line3", so scanning continues to its '\n'.
        const DATA: &[u8] = b"line1\nline2\nline3\nline4\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let got = read_partition(&store, &path, 3, 14, 24).await;
            assert_eq!(got, b"line2\nline3\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_end_scan_across_chunks() {
        // The end boundary precedes a newline, so the closing newline has to
        // be located by scanning past `end` in later chunks.
        // start=0, end=7 (inside "line2"), file_size=18.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let got = read_partition(&store, &path, 0, 7, 18).await;
            assert_eq!(got, b"line1\nline2\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_empty_range() {
        // start >= end can never contain a complete line, whatever the data.
        const DATA: &[u8] = b"line1\nline2\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;

            // start > end with a non-zero start.
            let got = read_partition(&store, &path, 10, 5, 20).await;
            assert!(got.is_empty(), "start>end chunk_size={cs}");

            // start == end == 0.
            let got = read_partition(&store, &path, 0, 0, 12).await;
            assert!(got.is_empty(), "start==end==0 chunk_size={cs}");

            // start == end with a non-zero value.
            let got = read_partition(&store, &path, 6, 6, 12).await;
            assert!(got.is_empty(), "start==end==6 chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_align_across_chunks() {
        // The newline needed for start alignment can arrive in any chunk.
        // start=1 gives fetch_start=0; alignment skips "abcdef\n" and
        // "line2\n" is yielded.
        const DATA: &[u8] = b"abcdef\nline2\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let got = read_partition(&store, &path, 1, 100, 13).await;
            assert_eq!(got, b"line2\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_end_aligned_on_newline() {
        // end=6 sits right after the newline at offset 5: the line is
        // already complete, so only "line1\n" is yielded.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let got = read_partition(&store, &path, 0, 6, 18).await;
            assert_eq!(got, b"line1\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_adjacent_partitions_no_overlap() {
        // Three adjacent, line-aligned partitions:
        //   [0, 6)   fetch_start=0
        //   [6, 12)  fetch_start=5
        //   [12, 18) fetch_start=11
        // Each yields exactly one line and together they rebuild the file.
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(THREE_LINES, cs).await;
            let r1 = read_partition(&store, &path, 0, 6, 18).await;
            let r2 = read_partition(&store, &path, 6, 12, 18).await;
            let r3 = read_partition(&store, &path, 12, 18, 18).await;

            assert_eq!(r1, b"line1\n", "p1 chunk_size={cs}");
            assert_eq!(r2, b"line2\n", "p2 chunk_size={cs}");
            assert_eq!(r3, b"line3\n", "p3 chunk_size={cs}");

            let combined = [r1, r2, r3].concat();
            assert_eq!(combined, THREE_LINES, "combined chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_align_past_end_returns_empty() {
        // The first aligned start lands at or beyond the end boundary.
        // Data is 14 bytes with newlines at offsets 10 and 13.
        // Partition [3, 6): fetch_start=2, first '\n' is at offset 10, so the
        // aligned start is 11 >= end=6 and nothing is yielded.
        const DATA: &[u8] = b"abcdefghij\nkl\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let got = read_partition(&store, &path, 3, 6, 14).await;
            assert!(got.is_empty(), "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_unaligned_partitions_no_overlap() {
        // Partitions that do not fall on line boundaries.
        // Data is 12 bytes with newlines at offsets 3, 7 and 11.
        // Partitions: [0, 5), [5, 10), [10, 12).
        const DATA: &[u8] = b"aaa\nbbb\nccc\n";

        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;

            // [0, 5): no start alignment; end=5 is inside "bbb", so the
            // stream scans on to the '\n' at offset 7.
            let r1 = read_partition(&store, &path, 0, 5, 12).await;

            // [5, 10): fetch_start=4, bytes from offset 4 are "bbb\nccc\n".
            // The '\n' at relative position 3 gives aligned start 8 ("ccc\n").
            // end=10 is inside "ccc", so the stream scans to '\n' at 11.
            let r2 = read_partition(&store, &path, 5, 10, 12).await;

            // [10, 12): fetch_start=9, bytes from offset 9 are "cc\n".
            // The '\n' at relative position 2 gives aligned start 12, and
            // end=12 equals file_size so the effective end is u64::MAX.
            // Nothing follows the '\n'; FetchingChunks polls the exhausted
            // inner stream and finishes.
            let r3 = read_partition(&store, &path, 10, 12, 12).await;

            assert_eq!(r1, b"aaa\nbbb\n", "p1 chunk_size={cs}");
            assert_eq!(r2, b"ccc\n", "p2 chunk_size={cs}");
            assert!(r3.is_empty(), "p3 chunk_size={cs}");

            let combined = [r1, r2, r3].concat();
            assert_eq!(combined, DATA, "combined chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_no_trailing_newline() {
        // Last partition of a file that does not end with a newline.
        // end >= file_size makes the effective end u64::MAX, so
        // FetchingChunks streams until EOF and yields the final incomplete
        // line unchanged.
        const DATA: &[u8] = b"line1\nline2"; // 11 bytes, no trailing '\n'
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;

            // A single partition covering the whole file.
            let whole = read_partition(&store, &path, 0, 11, 11).await;
            assert_eq!(whole, DATA, "chunk_size={cs}");

            // Last partition starting mid-file: start=6, fetch_start=5, the
            // bytes from offset 5 are "\nline2". ScanningFirstTerminator
            // consumes the '\n' and the remainder "line2" is yielded as is.
            let tail = read_partition(&store, &path, 6, 11, 11).await;
            assert_eq!(tail, b"line2", "tail chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_overflow_fetch() {
        // The first line is longer than 2 * END_SCAN_LOOKAHEAD, so the
        // initial bounded fetch [fetch_start, raw_end + END_SCAN_LOOKAHEAD)
        // stops short of its newline and ScanningLastTerminator has to issue
        // overflow GETs to reach it.
        //
        // Partition [0, 1): raw_end=1, initial fetch ends at 1+16384=16385.
        // The newline is at byte 32768 > 16385, so an overflow GET is needed.
        // Partition [1, file_size): start=1 lands inside line 1;
        // ScanningFirstTerminator skips to byte 32769, after which
        // "line2\nline3\n" is yielded.
        let mut long_line = vec![b'A'; 2 * END_SCAN_LOOKAHEAD as usize];
        long_line.push(b'\n');
        let rest: &[u8] = b"line2\nline3\n";
        let data = [long_line.as_slice(), rest].concat();
        let file_size = data.len() as u64;

        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&data, cs).await;

            let r1 = read_partition(&store, &path, 0, 1, file_size).await;
            let r2 = read_partition(&store, &path, 1, file_size, file_size).await;

            assert_eq!(r1, long_line, "p1 chunk_size={cs}");
            assert_eq!(r2, rest, "p2 chunk_size={cs}");

            let combined = [r1, r2].concat();
            assert_eq!(combined, data, "combined chunk_size={cs}");
        }
    }
}