Skip to main content

flowlog_runtime/
io.rs

//! I/O and partition helpers used by the generated engine code.
//!
//! - [`partition`] — split an owned `Vec` into per-worker partitions for the
//!   library-mode batch engine's ingest path.
//! - [`byte_range_reader`] — split a CSV file across timely workers so each
//!   reads its own byte slice (binary mode).
//! - [`shard_int`] / [`shard_str`] / [`shard_spur`] — pick the owning
//!   worker for a tuple based on its first column (binary mode).
//!
//! # Byte-range reader example
//!
//! ```ignore
//! if let Some((mut reader, budget)) = byte_range_reader(path, index, peers) {
//!     let mut buf = Vec::new();
//!     let mut consumed = 0u64;
//!     // Keep reading whole lines while under budget; the last line read may
//!     // run past the budget and still belongs to this worker.
//!     while consumed < budget {
//!         buf.clear();
//!         let n = reader.read_until(b'\n', &mut buf).unwrap_or(0);
//!         if n == 0 { break; }
//!         consumed += n as u64;
//!         // parse &buf …
//!     }
//! }
//! ```
25
26use std::fs::File;
27use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
28use std::path::Path;
29
// =========================================================================
// Per-worker partitioning
// =========================================================================
33
/// Split `v` into `n` balanced, order-preserving owned partitions.
///
/// Used by the generated library-mode engine to hand each timely worker its
/// own slice by value — no `Arc` sharing, no per-tuple clone, each tuple
/// moves directly into the worker's `InputSession`.
///
/// Exactly `n.max(1)` partitions are produced. Concatenating them yields `v`
/// in its original order, and partition lengths differ by at most one: the
/// first `v.len() % n` partitions each hold one extra element. When
/// `v.len() < n`, the trailing partitions are empty.
pub fn partition<T>(v: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let n = n.max(1);
    let base = v.len() / n;
    // Spread the remainder one element at a time over the leading partitions
    // instead of piling it onto the last one, so no worker receives more than
    // one tuple above its fair share.
    let extra = v.len() % n;
    let mut items = v.into_iter();
    (0..n)
        .map(|i| {
            let len = base + usize::from(i < extra);
            items.by_ref().take(len).collect()
        })
        .collect()
}
54
// =========================================================================
// Byte-range file reader
// =========================================================================
58
/// Open a byte-range slice of `path` for worker `index` out of `peers`.
///
/// The file is cut into `peers` contiguous byte ranges of
/// `file_size / peers` bytes each, with the last range also taking the
/// remainder. A line is owned by the worker whose range contains the line's
/// first byte, so every line is read by exactly one worker.
///
/// Returns `Some((reader, bytes_to_read))` on success. The reader is
/// positioned at the first line that starts inside this worker's range: for
/// a non-zero range start, the tail of a line that began in the previous
/// range is skipped. The caller should keep reading whole lines while the
/// bytes consumed so far are below `bytes_to_read`; the final line read may
/// extend past that budget and must still be read in full. A budget of `0`
/// means the worker owns no lines.
///
/// Returns `None` (with a message on stderr) when `peers == 0`,
/// `index >= peers`, or an I/O error occurs while opening, sizing, seeking
/// or aligning the reader.
pub fn byte_range_reader(
    path: &Path,
    index: usize,
    peers: usize,
) -> Option<(BufReader<File>, u64)> {
    // Reject impossible worker coordinates up front: `peers == 0` would
    // divide by zero below, and `index >= peers` describes a range that may
    // overlap the last worker's and re-read lines it already owns.
    if peers == 0 || index >= peers {
        eprintln!(
            "[flowlog-runtime::io] invalid worker {index} of {peers} for {}",
            path.display()
        );
        return None;
    }

    let mut file = File::open(path)
        .inspect_err(|e| log_io_failure("open", path, e))
        .ok()?;

    let file_size = file
        .metadata()
        .inspect_err(|e| log_io_failure("stat", path, e))
        .ok()?
        .len();

    // `chunk * peers <= file_size`, so none of these products can overflow.
    let peers = peers as u64;
    let index = index as u64;
    let chunk = file_size / peers;
    let start = chunk * index;
    let end = if index == peers - 1 {
        file_size
    } else {
        chunk * (index + 1)
    };

    // Nothing to read for this worker.
    if start >= end {
        return Some((BufReader::new(file), 0));
    }

    // Any worker whose range begins at byte 0 reads from the start with no
    // alignment skip — there's no previous byte to peek at. Worker 0 always
    // hits this; others hit it when `chunk == 0` (peers > file_size), which
    // puts the whole file on the last worker.
    if start == 0 {
        return Some((BufReader::new(file), end));
    }

    // Non-zero start: seek to `start - 1` and peek the byte just before our
    // range. If it's a newline we're on a line boundary; otherwise the bytes
    // at `start` continue a line owned by the previous worker.
    if let Err(e) = file.seek(SeekFrom::Start(start - 1)) {
        log_io_failure("seek", path, &e);
        return None;
    }

    let mut reader = BufReader::new(file);
    let mut peek = [0u8; 1];
    if let Err(e) = reader.read_exact(&mut peek) {
        log_io_failure("read", path, &e);
        return None;
    }

    let budget = end - start;
    if peek[0] == b'\n' {
        // Exactly on a line boundary: the line at `start` is ours.
        return Some((reader, budget));
    }

    // Mid-line: skip the rest of the previous worker's line. If this fails
    // the reader may be left mid-line, and handing it out would make the
    // caller parse a partial record, so report failure instead.
    let mut discard = Vec::new();
    match reader.read_until(b'\n', &mut discard) {
        Ok(skipped) => Some((reader, budget.saturating_sub(skipped as u64))),
        Err(e) => {
            log_io_failure("read", path, &e);
            None
        }
    }
}

/// Report a failed I/O `action` on `path` to stderr with the module prefix.
fn log_io_failure(action: &str, path: &Path, err: &std::io::Error) {
    eprintln!(
        "[flowlog-runtime::io] failed to {action} {}: {err}",
        path.display()
    );
}
137
// =========================================================================
// First-column sharding
// =========================================================================
141
/// Decide whether worker `index` owns a tuple whose first column is the
/// integer `first`, out of `peers` workers.
///
/// The owner is `first` modulo `peers` using the Euclidean remainder, so
/// negative values still map into `0..peers`. Panics if `peers` is zero.
#[inline]
pub fn shard_int(first: i64, peers: usize, index: usize) -> bool {
    let owner = first.rem_euclid(peers as i64);
    index == owner as usize
}
149
/// Decide whether worker `index` owns a tuple whose first column is the
/// string `first`, out of `peers` workers.
///
/// The string's UTF-8 bytes are hashed with 32-bit FNV-1a and the hash is
/// reduced modulo `peers` to pick the owner. Panics if `peers` is zero.
#[inline]
pub fn shard_str(first: &str, peers: usize, index: usize) -> bool {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let digest = first.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });
    index == (digest as usize) % peers
}
162
163/// Shard an interned-string first column ([`lasso::Spur`]) across `peers`.
164#[inline]
165pub fn shard_spur(first: lasso::Spur, peers: usize, index: usize) -> bool {
166    (first.into_inner().get() as usize) % peers == index
167}