// flowlog_runtime/io.rs
//! I/O and partition helpers used by the generated engine code.
//!
//! - [`partition`] — split an owned `Vec` into per-worker slices for the
//!   library-mode batch engine's ingest path.
//! - [`byte_range_reader`] — split a CSV file across timely workers so each
//!   reads its own byte slice (binary mode).
//! - [`shard_int`] / [`shard_str`] / [`shard_spur`] — pick the owning
//!   worker for a tuple based on its first column (binary mode).
//!
//! # Byte-range reader example
//!
//! ```ignore
//! if let Some((mut reader, budget)) = byte_range_reader(path, index, peers) {
//!     let mut buf = Vec::new();
//!     let mut consumed = 0u64;
//!     while consumed < budget {
//!         buf.clear();
//!         let n = reader.read_until(b'\n', &mut buf).unwrap_or(0);
//!         if n == 0 { break; }
//!         consumed += n as u64;
//!         // parse &buf …
//!     }
//! }
//! ```

26use std::fs::File;
27use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
28use std::path::Path;
29
// =========================================================================
// Per-worker partitioning
// =========================================================================

/// Split `v` into `n` roughly-equal owned partitions, preserving order.
///
/// The generated library-mode engine uses this to hand each timely worker
/// its own slice by value — no `Arc` sharing, no per-tuple clone; every
/// tuple moves straight into the worker's `InputSession`.
///
/// Always yields `n.max(1)` partitions. When `v.len() < n` some partitions
/// are empty, and any remainder from the division goes entirely to the
/// last partition.
pub fn partition<T>(v: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let parts = n.max(1);
    // Every partition except the last holds exactly `base` tuples.
    let base = v.len() / parts;
    let mut out: Vec<Vec<T>> = Vec::with_capacity(parts);
    let mut rest = v;
    for _ in 1..parts {
        // `split_off` keeps the first `base` elements in `rest` and returns
        // the tail; swap so `rest` shrinks toward the final partition.
        let tail = rest.split_off(base);
        out.push(std::mem::replace(&mut rest, tail));
    }
    // The last partition takes whatever remains (base + remainder).
    out.push(rest);
    out
}
55
// =========================================================================
// Byte-range file reader
// =========================================================================

/// Open a byte-range slice of `path` for worker `index` out of `peers`.
///
/// Returns `Some((reader, bytes_to_read))` on success. The reader is
/// pre-seeked to the start of the worker's range (aligned to the next
/// line boundary for non-zero workers). The caller should read up to
/// `bytes_to_read` bytes, stopping at the first complete line beyond
/// that budget.
///
/// `peers` is clamped to at least 1: a (buggy) zero-peer call degrades to
/// "this worker reads the whole file" instead of panicking on a division
/// by zero.
///
/// Returns `None` on I/O error (logged to stderr).
pub fn byte_range_reader(
    path: &Path,
    index: usize,
    peers: usize,
) -> Option<(BufReader<File>, u64)> {
    // Guard: `peers == 0` would divide by zero in the chunk computation
    // below and underflow in `peers - 1`.
    let peers = peers.max(1);

    let mut file = match File::open(path) {
        Ok(f) => f,
        Err(e) => {
            eprintln!(
                "[flowlog-runtime::io] failed to open {}: {e}",
                path.display()
            );
            return None;
        }
    };

    let file_size = match file.metadata() {
        Ok(m) => m.len(),
        Err(e) => {
            eprintln!(
                "[flowlog-runtime::io] failed to stat {}: {e}",
                path.display()
            );
            return None;
        }
    };

    // Nominal (unaligned) byte range for this worker; the last worker
    // absorbs the remainder of the division.
    let chunk = file_size / peers as u64;
    let start = chunk * index as u64;
    let end = if index == peers - 1 {
        file_size
    } else {
        chunk * (index + 1) as u64
    };

    // Nothing to read for this worker.
    if start >= end {
        return Some((BufReader::new(file), 0));
    }

    // Any worker whose range begins at byte 0 reads from the start with no
    // alignment skip — there's no previous byte to peek at. Worker 0 always
    // hits this; others hit it when `chunk == 0` (peers > file_size), which
    // puts the whole file on the last worker.
    if start == 0 {
        return Some((BufReader::new(file), end));
    }

    // Non-zero start: seek to `start - 1` and peek the byte just before our
    // range. If it's a newline we're on a line boundary; otherwise skip the
    // rest of the partial line (the previous worker finishes it by reading
    // past its own budget).
    if file.seek(SeekFrom::Start(start - 1)).is_err() {
        return Some((BufReader::new(file), 0));
    }

    let mut reader = BufReader::new(file);
    let mut peek = [0u8; 1];
    if reader.read_exact(&mut peek).is_err() {
        return Some((reader, 0));
    }

    if peek[0] == b'\n' {
        // Exactly on a line boundary.
        return Some((reader, end - start));
    }

    // Mid-line: skip the rest of this partial line and charge the skipped
    // bytes against this worker's budget (saturating — the partial line may
    // span the entire range).
    let mut discard = Vec::new();
    let skipped = reader.read_until(b'\n', &mut discard).unwrap_or(0);
    Some((reader, (end - start).saturating_sub(skipped as u64)))
}
140
// =========================================================================
// First-column sharding
// =========================================================================

/// Shard an integer-typed first column across `peers` workers.
///
/// The owner is the Euclidean remainder of the key modulo `peers`, which
/// is always non-negative, so negative keys also land on a valid worker.
///
/// Returns `true` if worker `index` should own this tuple.
#[inline]
pub fn shard_int(first: i64, peers: usize, index: usize) -> bool {
    let modulus = peers as i64;
    let owner = first.rem_euclid(modulus) as usize;
    owner == index
}
152
/// Shard a string-typed first column across `peers` workers.
///
/// Hashes the string's bytes with 32-bit FNV-1a so keys spread uniformly,
/// then picks the owner by modulo.
#[inline]
pub fn shard_str(first: &str, peers: usize, index: usize) -> bool {
    const FNV_OFFSET: u32 = 0x811c9dc5;
    const FNV_PRIME: u32 = 0x01000193;
    let hash = first
        .bytes()
        .fold(FNV_OFFSET, |h, b| (h ^ u32::from(b)).wrapping_mul(FNV_PRIME));
    hash as usize % peers == index
}
165
166/// Shard an interned-string first column ([`lasso::Spur`]) across `peers`.
167#[inline]
168pub fn shard_spur(first: lasso::Spur, peers: usize, index: usize) -> bool {
169    (first.into_inner().get() as usize) % peers == index
170}