use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::stream::{BoxStream, Stream};
use futures::{StreamExt, TryFutureExt};
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
/// Number of bytes (16 KiB) requested beyond the raw end offset so the record
/// straddling that offset can usually be completed from the initial request;
/// when it cannot, follow-up requests fetch windows of this same size.
pub const END_SCAN_LOOKAHEAD: u64 = 1 << 14;
/// Position of an `AlignedBoundaryStream` within its alignment state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    /// Discarding fetched bytes (starting at `raw_start - 1`) up to and
    /// including the first terminator; only entered when `raw_start > 0`.
    ScanningFirstTerminator,
    /// Forwarding chunks until the raw end offset is reached or crossed.
    FetchingChunks,
    /// Past the raw end offset: forwarding bytes until the terminator that
    /// closes the final record has been emitted, fetching more data if needed.
    ScanningLastTerminator,
    /// Finished: the stream yields `None` from now on.
    Done,
}
/// Streams the `terminator`-delimited records of an object whose first byte
/// falls inside a raw byte range, so that adjacent ranges split a file at
/// record boundaries without overlap or gaps.
///
/// Construct it with [`AlignedBoundaryStream::new`]; it implements [`Stream`]
/// and yields `object_store::Result<Bytes>` chunks.
#[must_use = "streams do nothing unless polled"]
pub struct AlignedBoundaryStream {
    /// Body of the current ranged GET; replaced when the end scan needs more data.
    inner: BoxStream<'static, object_store::Result<Bytes>>,
    /// Record delimiter byte (the tests use `b'\n'`).
    terminator: u8,
    /// Exclusive raw end offset, or `u64::MAX` when the range reaches end of file.
    end: u64,
    /// Total number of bytes pulled from all inner streams so far.
    bytes_consumed: u64,
    /// Absolute offset of the first fetched byte (`raw_start - 1`, or 0).
    fetch_start: u64,
    /// Current state of the alignment state machine.
    phase: Phase,
    /// Leftover of the chunk that held the first terminator; emitted before
    /// `inner` is polled again.
    pending: Option<Bytes>,
    /// Store used to issue follow-up lookahead requests.
    store: Arc<dyn ObjectStore>,
    /// Object being read.
    location: object_store::path::Path,
    /// Object size as supplied to `new`; bounds lookahead refetches.
    file_size: u64,
}
async fn get_stream(
store: Arc<dyn ObjectStore>,
location: object_store::path::Path,
range: std::ops::Range<u64>,
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
let opts = GetOptions {
range: Some(GetRange::Bounded(range.clone())),
..Default::default()
};
let result = store.get_opts(&location, opts).await?;
#[cfg(not(target_arch = "wasm32"))]
if let GetResultPayload::File(mut file, _path) = result.payload {
use std::io::{Read, Seek, SeekFrom};
const CHUNK_SIZE: u64 = 8 * 1024;
file.seek(SeekFrom::Start(range.start)).map_err(|e| {
object_store::Error::Generic {
store: "local",
source: Box::new(e),
}
})?;
return Ok(futures::stream::try_unfold(
(file, range.end - range.start),
move |(mut file, remaining)| async move {
if remaining == 0 {
return Ok(None);
}
let to_read = remaining.min(CHUNK_SIZE);
let cap = usize::try_from(to_read).map_err(|e| {
object_store::Error::Generic {
store: "local",
source: Box::new(e),
}
})?;
let mut buf = Vec::with_capacity(cap);
let read =
(&mut file)
.take(to_read)
.read_to_end(&mut buf)
.map_err(|e| object_store::Error::Generic {
store: "local",
source: Box::new(e),
})?;
Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
},
)
.boxed());
}
Ok(result.into_stream())
}
impl AlignedBoundaryStream {
    /// Creates a stream over the `terminator`-delimited records of `location`
    /// whose first byte lies in `raw_start..raw_end`.
    ///
    /// Output begins at the first record boundary at or after `raw_start`
    /// (offset 0, or the byte right after a `terminator`). It runs through the
    /// first `terminator` found at or after offset `raw_end - 1`. When
    /// `raw_end >= file_size`, everything up to the end of the data is emitted,
    /// so a final record without a trailing terminator is kept.
    ///
    /// An empty or out-of-bounds request (`raw_start >= raw_end` or
    /// `raw_start >= file_size`) returns an already-finished stream without
    /// contacting the store.
    ///
    /// # Errors
    ///
    /// Returns any error raised while issuing the initial ranged GET.
    pub async fn new(
        store: Arc<dyn ObjectStore>,
        location: object_store::path::Path,
        raw_start: u64,
        raw_end: u64,
        file_size: u64,
        terminator: u8,
    ) -> object_store::Result<Self> {
        let nothing_to_read = raw_end <= raw_start || file_size <= raw_start;
        if nothing_to_read {
            return Ok(Self::exhausted(store, location, file_size, terminator));
        }

        // Begin one byte early: a terminator at `raw_start - 1` proves that
        // `raw_start` is itself a record boundary.
        let (fetch_start, phase) = match raw_start.checked_sub(1) {
            Some(before) => (before, Phase::ScanningFirstTerminator),
            None => (0, Phase::FetchingChunks),
        };

        // The last record normally runs past `raw_end`, so prefetch a
        // lookahead window (never beyond the object) in the same request.
        let fetch_end = raw_end.saturating_add(END_SCAN_LOOKAHEAD).min(file_size);
        let inner = get_stream(Arc::clone(&store), location.clone(), fetch_start..fetch_end).await?;

        // A range that reaches EOF has no right-hand neighbour, so it is never cut short.
        let end = if raw_end < file_size { raw_end } else { u64::MAX };

        Ok(Self {
            inner,
            terminator,
            end,
            bytes_consumed: 0,
            fetch_start,
            phase,
            pending: None,
            store,
            location,
            file_size,
        })
    }

    /// Builds a stream that is already finished and yields nothing.
    fn exhausted(
        store: Arc<dyn ObjectStore>,
        location: object_store::path::Path,
        file_size: u64,
        terminator: u8,
    ) -> Self {
        Self {
            inner: futures::stream::empty().boxed(),
            terminator,
            end: 0,
            bytes_consumed: 0,
            fetch_start: 0,
            phase: Phase::Done,
            pending: None,
            store,
            location,
            file_size,
        }
    }

    /// Absolute object offset just past the last byte pulled from any inner stream.
    fn abs_pos(&self) -> u64 {
        self.bytes_consumed + self.fetch_start
    }
}
/// Yields the aligned range as a sequence of `Bytes` chunks.
///
/// Driven by `Phase`:
/// - `ScanningFirstTerminator` drops bytes up to and including the first terminator.
/// - `FetchingChunks` forwards chunks until the raw end offset is reached.
/// - `ScanningLastTerminator` keeps forwarding until the record straddling that
///   offset is closed. If the current fetch runs out first, it refetches
///   `END_SCAN_LOOKAHEAD`-sized windows.
/// - `Done` yields `None`.
///
/// An error from the underlying stream is returned once and then ends the stream.
impl Stream for AlignedBoundaryStream {
    type Item = object_store::Result<Bytes>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All fields are `Unpin` (boxed stream, `Bytes`, `Arc`, ...), so
        // `get_mut` is available and no pin projection is needed.
        let this = self.get_mut();
        // Loop so that phase transitions (and lookahead refetches) are acted on
        // within the same poll instead of returning without a registered waker.
        loop {
            match this.phase {
                Phase::Done => return Poll::Ready(None),
                Phase::ScanningFirstTerminator => {
                    match this.inner.poll_next_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        // Data ran out before any terminator: no record starts in range.
                        Poll::Ready(None) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Ready(Some(Err(e))) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(Some(Ok(chunk))) => {
                            this.bytes_consumed += chunk.len() as u64;
                            match chunk.iter().position(|&b| b == this.terminator) {
                                Some(pos) => {
                                    // Bytes after the terminator open the first aligned record.
                                    let remainder = chunk.slice((pos + 1)..);
                                    let aligned_start =
                                        this.abs_pos() - remainder.len() as u64;
                                    // The first record starts at or past the raw end, so it
                                    // belongs to the next range; this range is empty.
                                    if aligned_start >= this.end {
                                        this.phase = Phase::Done;
                                        return Poll::Ready(None);
                                    }
                                    if !remainder.is_empty() {
                                        this.pending = Some(remainder);
                                    }
                                    this.phase = Phase::FetchingChunks;
                                    continue;
                                }
                                // The whole chunk precedes the first boundary: drop it.
                                None => continue,
                            }
                        }
                    }
                }
                Phase::FetchingChunks => {
                    // Emit the leftover from the first-terminator scan before
                    // pulling new data. Its bytes were already counted in
                    // `bytes_consumed`.
                    let chunk = if let Some(pending) = this.pending.take() {
                        pending
                    } else {
                        match this.inner.poll_next_unpin(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(None) => {
                                this.phase = Phase::Done;
                                return Poll::Ready(None);
                            }
                            Poll::Ready(Some(Err(e))) => {
                                this.phase = Phase::Done;
                                return Poll::Ready(Some(Err(e)));
                            }
                            Poll::Ready(Some(Ok(chunk))) => {
                                this.bytes_consumed += chunk.len() as u64;
                                chunk
                            }
                        }
                    };
                    // Absolute offset just past the end of `chunk`.
                    let pos_after = this.abs_pos();
                    if pos_after < this.end {
                        return Poll::Ready(Some(Ok(chunk)));
                    }
                    // The chunk ends exactly at the raw end. Stop if it closes a
                    // record; otherwise the straddling record continues in later data.
                    if pos_after == this.end {
                        if chunk.last() == Some(&this.terminator) {
                            this.phase = Phase::Done;
                        } else {
                            this.phase = Phase::ScanningLastTerminator;
                        }
                        return Poll::Ready(Some(Ok(chunk)));
                    }
                    // The chunk crosses the raw end. Chunks in this phase start
                    // before `end`, so `chunk_in_range_len >= 1`. Searching from
                    // the last in-range byte (offset `end - 1`) finds the
                    // terminator that closes the final record.
                    let pos_before = pos_after - chunk.len() as u64;
                    let chunk_in_range_len = (this.end - pos_before) as usize;
                    let search_from = chunk_in_range_len - 1;
                    if let Some(rel) = chunk[search_from..]
                        .iter()
                        .position(|&b| b == this.terminator)
                    {
                        this.phase = Phase::Done;
                        return Poll::Ready(Some(Ok(
                            chunk.slice(..search_from + rel + 1)
                        )));
                    }
                    // No terminator in the rest of this chunk: keep scanning later data.
                    this.phase = Phase::ScanningLastTerminator;
                    return Poll::Ready(Some(Ok(chunk)));
                }
                Phase::ScanningLastTerminator => {
                    match this.inner.poll_next_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(None) => {
                            // The current fetch ended mid-record. If the object
                            // has more bytes, fetch another lookahead window and
                            // keep scanning; otherwise the record ends at EOF.
                            let pos = this.abs_pos();
                            if pos < this.file_size {
                                let fetch_end = pos
                                    .saturating_add(END_SCAN_LOOKAHEAD)
                                    .min(this.file_size);
                                let store = Arc::clone(&this.store);
                                let location = this.location.clone();
                                this.inner = get_stream(store, location, pos..fetch_end)
                                    .try_flatten_stream()
                                    .boxed();
                                continue;
                            }
                            this.phase = Phase::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Ready(Some(Err(e))) => {
                            this.phase = Phase::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(Some(Ok(chunk))) => {
                            this.bytes_consumed += chunk.len() as u64;
                            // Emit up to and including the closing terminator, then stop.
                            if let Some(pos) =
                                chunk.iter().position(|&b| b == this.terminator)
                            {
                                this.phase = Phase::Done;
                                return Poll::Ready(Some(Ok(chunk.slice(..pos + 1))));
                            }
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    //! Behavioural tests for `AlignedBoundaryStream`.
    //!
    //! Every case runs once per entry of `CHUNK_SIZES`. Each entry is passed
    //! to `make_chunked_store` and is presumably the size of the chunks the
    //! test store yields. This exercises boundary handling at many chunk
    //! alignments.
    use super::*;
    use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
    use futures::TryStreamExt;

    /// Drains `stream`, panicking on the first error, and concatenates its chunks.
    async fn collect_stream(stream: AlignedBoundaryStream) -> Vec<u8> {
        stream.try_collect::<Vec<Bytes>>().await.unwrap().concat()
    }

    #[tokio::test]
    async fn test_start_at_zero_no_end_scan() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 0, 100, 18, b'\n')
                .await
                .unwrap();
            assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_aligned_on_newline() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 6, 100, 18, b'\n')
                .await
                .unwrap();
            assert_eq!(
                collect_stream(s).await,
                b"line2\nline3\n",
                "chunk_size={cs}"
            );
        }
    }

    #[tokio::test]
    async fn test_start_mid_line() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 3, 100, 18, b'\n')
                .await
                .unwrap();
            assert_eq!(
                collect_stream(s).await,
                b"line2\nline3\n",
                "chunk_size={cs}"
            );
        }
    }

    #[tokio::test]
    async fn test_end_boundary_mid_line() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 0, 8, 18, b'\n')
                .await
                .unwrap();
            assert_eq!(
                collect_stream(s).await,
                b"line1\nline2\n",
                "chunk_size={cs}"
            );
        }
    }

    #[tokio::test]
    async fn test_end_at_eof() {
        static DATA: &[u8] = b"line1\nline2\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 0, 12, 12, b'\n')
                .await
                .unwrap();
            assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_no_newline_in_range() {
        static DATA: &[u8] = b"abcdef";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 2, 6, 6, b'\n')
                .await
                .unwrap();
            assert!(collect_stream(s).await.is_empty(), "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_and_end_alignment() {
        static DATA: &[u8] = b"line1\nline2\nline3\nline4\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 3, 14, 24, b'\n')
                .await
                .unwrap();
            assert_eq!(
                collect_stream(s).await,
                b"line2\nline3\n",
                "chunk_size={cs}"
            );
        }
    }

    #[tokio::test]
    async fn test_end_scan_across_chunks() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 0, 7, 18, b'\n')
                .await
                .unwrap();
            assert_eq!(
                collect_stream(s).await,
                b"line1\nline2\n",
                "chunk_size={cs}"
            );
        }
    }

    #[tokio::test]
    async fn test_empty_range() {
        static DATA: &[u8] = b"line1\nline2\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(
                Arc::clone(&store),
                path.clone(),
                10,
                5,
                20,
                b'\n',
            )
            .await
            .unwrap();
            assert!(
                collect_stream(s).await.is_empty(),
                "start>end chunk_size={cs}"
            );
            let s = AlignedBoundaryStream::new(
                Arc::clone(&store),
                path.clone(),
                0,
                0,
                12,
                b'\n',
            )
            .await
            .unwrap();
            assert!(
                collect_stream(s).await.is_empty(),
                "start==end==0 chunk_size={cs}"
            );
            let s = AlignedBoundaryStream::new(
                Arc::clone(&store),
                path.clone(),
                6,
                6,
                12,
                b'\n',
            )
            .await
            .unwrap();
            assert!(
                collect_stream(s).await.is_empty(),
                "start==end==6 chunk_size={cs}"
            );
        }
    }

    #[tokio::test]
    async fn test_start_align_across_chunks() {
        static DATA: &[u8] = b"abcdef\nline2\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 1, 100, 13, b'\n')
                .await
                .unwrap();
            assert_eq!(collect_stream(s).await, b"line2\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_end_aligned_on_newline() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 0, 6, 18, b'\n')
                .await
                .unwrap();
            assert_eq!(collect_stream(s).await, b"line1\n", "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_adjacent_partitions_no_overlap() {
        static DATA: &[u8] = b"line1\nline2\nline3\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let r1 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    0,
                    6,
                    18,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            let r2 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    6,
                    12,
                    18,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            let r3 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    12,
                    18,
                    18,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            assert_eq!(r1, b"line1\n", "p1 chunk_size={cs}");
            assert_eq!(r2, b"line2\n", "p2 chunk_size={cs}");
            assert_eq!(r3, b"line3\n", "p3 chunk_size={cs}");
            let mut combined = r1;
            combined.extend(r2);
            combined.extend(r3);
            assert_eq!(combined, DATA, "combined chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_start_align_past_end_returns_empty() {
        static DATA: &[u8] = b"abcdefghij\nkl\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(store, path, 3, 6, 14, b'\n')
                .await
                .unwrap();
            assert!(collect_stream(s).await.is_empty(), "chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_unaligned_partitions_no_overlap() {
        static DATA: &[u8] = b"aaa\nbbb\nccc\n";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let r1 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    0,
                    5,
                    12,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            let r2 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    5,
                    10,
                    12,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            let r3 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    10,
                    12,
                    12,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            assert_eq!(r1, b"aaa\nbbb\n", "p1 chunk_size={cs}");
            assert_eq!(r2, b"ccc\n", "p2 chunk_size={cs}");
            assert!(r3.is_empty(), "p3 chunk_size={cs}");
            let mut combined = r1;
            combined.extend(r2);
            combined.extend(r3);
            assert_eq!(combined, DATA, "combined chunk_size={cs}");
        }
    }

    #[tokio::test]
    async fn test_no_trailing_newline() {
        static DATA: &[u8] = b"line1\nline2";
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            let s = AlignedBoundaryStream::new(
                Arc::clone(&store),
                path.clone(),
                0,
                11,
                11,
                b'\n',
            )
            .await
            .unwrap();
            assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}");
            let s = AlignedBoundaryStream::new(
                Arc::clone(&store),
                path.clone(),
                6,
                11,
                11,
                b'\n',
            )
            .await
            .unwrap();
            assert_eq!(collect_stream(s).await, b"line2", "tail chunk_size={cs}");
        }
    }

    /// A record longer than `END_SCAN_LOOKAHEAD` forces the end scan to issue
    /// follow-up range requests.
    #[tokio::test]
    async fn test_overflow_fetch() {
        let long_line: Vec<u8> =
            std::iter::repeat_n(b'A', 2 * END_SCAN_LOOKAHEAD as usize)
                .chain(std::iter::once(b'\n'))
                .collect();
        let rest = b"line2\nline3\n";
        let mut data = long_line.clone();
        data.extend_from_slice(rest);
        let file_size = data.len() as u64;
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&data, cs).await;
            let r1 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    0,
                    1,
                    file_size,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            let r2 = collect_stream(
                AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    path.clone(),
                    1,
                    file_size,
                    file_size,
                    b'\n',
                )
                .await
                .unwrap(),
            )
            .await;
            assert_eq!(r1, long_line, "p1 chunk_size={cs}");
            assert_eq!(r2, rest.as_slice(), "p2 chunk_size={cs}");
            let mut combined = r1;
            combined.extend(r2);
            assert_eq!(combined, data, "combined chunk_size={cs}");
        }
    }

    /// Splits a small file into three adjacent partitions at every pair of
    /// split points and checks that they always reassemble the original bytes.
    /// The data includes an empty record and lacks a trailing terminator.
    #[tokio::test]
    async fn test_every_split_reassembles() {
        static DATA: &[u8] = b"ab\n\ncd\ne";
        let file_size = DATA.len() as u64;
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(DATA, cs).await;
            for a in 0..=file_size {
                for b in a..=file_size {
                    let mut combined = Vec::new();
                    for (start, end) in [(0, a), (a, b), (b, file_size)] {
                        let s = AlignedBoundaryStream::new(
                            Arc::clone(&store),
                            path.clone(),
                            start,
                            end,
                            file_size,
                            b'\n',
                        )
                        .await
                        .unwrap();
                        combined.extend(collect_stream(s).await);
                    }
                    assert_eq!(combined, DATA, "a={a} b={b} chunk_size={cs}");
                }
            }
        }
    }
}