// flowlog_runtime/io.rs
1//! I/O and partition helpers used by the generated engine code.
2//!
3//! - [`partition`] — split an owned `Vec` into per-worker slices for the
4//! library-mode batch engine's ingest path.
5//! - [`byte_range_reader`] — split a CSV file across timely workers so each
6//! reads its own byte slice (binary mode).
7//! - [`shard_int`] / [`shard_str`] / [`shard_spur`] — pick the owning
8//! worker for a tuple based on its first column (binary mode).
9//!
10//! # Byte-range reader example
11//!
12//! ```ignore
//! if let Some((mut reader, budget)) = byte_range_reader(path, index, peers) {
14//! let mut buf = Vec::new();
15//! let mut consumed = 0u64;
16//! while consumed < budget {
17//! buf.clear();
18//! let n = reader.read_until(b'\n', &mut buf).unwrap_or(0);
19//! if n == 0 { break; }
20//! consumed += n as u64;
21//! // parse &buf …
22//! }
23//! }
24//! ```
25
26use std::fs::File;
27use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
28use std::path::Path;
29
30// =========================================================================
31// Per-worker partitioning
32// =========================================================================
33
/// Split `v` into `n` roughly-equal owned partitions, in order.
///
/// Used by the generated library-mode engine to hand each timely worker its
/// own slice by value — no `Arc` sharing, no per-tuple clone, each tuple
/// moves directly into the worker's `InputSession`.
///
/// `n.max(1)` partitions are produced; if `v.len() < n` some partitions
/// are empty. The last partition absorbs any remainder when the division
/// doesn't come out evenly.
pub fn partition<T>(v: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let parts = n.max(1);
    let per_part = v.len() / parts;
    let mut remaining = v.into_iter();
    let mut out = Vec::with_capacity(parts);
    for slot in 0..parts {
        if slot + 1 == parts {
            // Final slot drains everything left, absorbing the remainder.
            out.push(remaining.by_ref().collect());
        } else {
            out.push(remaining.by_ref().take(per_part).collect());
        }
    }
    out
}
55
56// =========================================================================
57// Byte-range file reader
58// =========================================================================
59
/// Open a byte-range slice of `path` for worker `index` out of `peers`.
///
/// Returns `Some((reader, bytes_to_read))` on success. The reader is
/// pre-seeked to the start of the worker's range (aligned to the next
/// line boundary for non-zero workers). The caller should read up to
/// `bytes_to_read` bytes, stopping at the first complete line beyond
/// that budget.
///
/// `peers` is clamped to at least 1, so `peers == 0` yields the whole
/// file for worker 0 instead of a divide-by-zero panic.
///
/// Returns `None` on I/O error (logged to stderr).
pub fn byte_range_reader(
    path: &Path,
    index: usize,
    peers: usize,
) -> Option<(BufReader<File>, u64)> {
    // Guard: `file_size / peers` would panic and `peers - 1` would
    // underflow on a zero worker count.
    let peers = peers.max(1);

    let mut file = match File::open(path) {
        Ok(f) => f,
        Err(e) => {
            eprintln!(
                "[flowlog-runtime::io] failed to open {}: {e}",
                path.display()
            );
            return None;
        }
    };

    let file_size = match file.metadata() {
        Ok(m) => m.len(),
        Err(e) => {
            eprintln!(
                "[flowlog-runtime::io] failed to stat {}: {e}",
                path.display()
            );
            return None;
        }
    };

    // Even byte split; the last worker's range runs to EOF, absorbing
    // the division remainder.
    let chunk = file_size / peers as u64;
    let start = chunk * index as u64;
    let end = if index == peers - 1 {
        file_size
    } else {
        chunk * (index + 1) as u64
    };

    // Nothing to read for this worker (empty file or zero-width chunk).
    if start >= end {
        return Some((BufReader::new(file), 0));
    }

    // Any worker whose range begins at byte 0 reads from the start with no
    // alignment skip — there's no previous byte to peek at. Worker 0 always
    // hits this; others hit it when `chunk == 0` (peers > file_size), which
    // puts the whole file on the last worker.
    if start == 0 {
        return Some((BufReader::new(file), end));
    }

    // Non-zero start: seek to `start - 1` and peek the byte just before our
    // range. If it's a newline we're on a line boundary; otherwise skip the
    // rest of the partial line (the previous worker finishes it by reading
    // past its own budget).
    if file.seek(SeekFrom::Start(start - 1)).is_err() {
        // Can't position the reader; return a zero budget so this worker
        // reads nothing rather than double-reading another worker's range.
        return Some((BufReader::new(file), 0));
    }

    let mut reader = BufReader::new(file);
    let mut peek = [0u8; 1];
    if reader.read_exact(&mut peek).is_err() {
        return Some((reader, 0));
    }

    if peek[0] == b'\n' {
        // Exactly on a line boundary: the full range is ours.
        return Some((reader, end - start));
    }

    // Mid-line: skip the rest of this partial line and shrink the budget
    // by the bytes skipped (saturating in case the line runs past `end`).
    let mut discard = Vec::new();
    let skipped = reader.read_until(b'\n', &mut discard).unwrap_or(0);
    Some((reader, (end - start).saturating_sub(skipped as u64)))
}
140
141// =========================================================================
142// First-column sharding
143// =========================================================================
144
/// Shard an integer-typed first column across `peers` workers.
///
/// Returns `true` if worker `index` should own this tuple. `rem_euclid`
/// keeps the owner non-negative even for negative column values.
#[inline]
pub fn shard_int(first: i64, peers: usize, index: usize) -> bool {
    let owner = first.rem_euclid(peers as i64);
    owner as usize == index
}
152
/// Shard a string-typed first column across `peers` workers.
///
/// Uses a 32-bit FNV-1a hash (offset basis 0x811c9dc5, prime 0x01000193)
/// to distribute strings uniformly.
#[inline]
pub fn shard_str(first: &str, peers: usize, index: usize) -> bool {
    let hash = first
        .bytes()
        .fold(0x811c_9dc5_u32, |h, b| (h ^ u32::from(b)).wrapping_mul(0x0100_0193));
    hash as usize % peers == index
}
165
166/// Shard an interned-string first column ([`lasso::Spur`]) across `peers`.
167#[inline]
168pub fn shard_spur(first: lasso::Spur, peers: usize, index: usize) -> bool {
169 (first.into_inner().get() as usize) % peers == index
170}