// flowlog_runtime/io.rs
//! I/O and partition helpers used by the generated engine code.
//!
//! - [`partition`] — split an owned `Vec` into per-worker slices for the
//!   library-mode batch engine's ingest path.
//! - [`byte_range_reader`] — split a CSV file across timely workers so each
//!   reads its own byte slice (binary mode).
//! - [`shard_int`] / [`shard_str`] / [`shard_spur`] — pick the owning
//!   worker for a tuple based on its first column (binary mode).
//!
//! # Byte-range reader example
//!
//! ```ignore
//! if let Some((mut reader, budget)) = byte_range_reader(path, index, peers) {
//!     let mut buf = Vec::new();
//!     let mut consumed = 0u64;
//!     while consumed < budget {
//!         buf.clear();
//!         let n = reader.read_until(b'\n', &mut buf).unwrap_or(0);
//!         if n == 0 { break; }
//!         consumed += n as u64;
//!         // parse &buf …
//!     }
//! }
//! ```

26use std::fs::File;
27use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
28use std::path::Path;
29
30// =========================================================================
31// Per-worker partitioning
32// =========================================================================
33
/// Split `v` into `n` roughly-equal owned partitions, in order.
///
/// The generated library-mode engine uses this to hand each timely worker
/// its own slice by value — no `Arc` sharing, no per-tuple clone; every
/// tuple moves straight into that worker's `InputSession`.
///
/// Always yields `n.max(1)` partitions. When `v.len() < n` some of them
/// are empty, and any remainder of the division lands in the final
/// partition.
pub fn partition<T>(v: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let parts = n.max(1);
    let per_part = v.len() / parts;
    let mut rest = v.into_iter();
    let mut out = Vec::with_capacity(parts);
    for i in 0..parts {
        // Each partition takes `per_part` items; the last one drains
        // whatever is left so no tuple is dropped.
        let count = if i + 1 == parts { rest.len() } else { per_part };
        out.push(rest.by_ref().take(count).collect());
    }
    out
}
55
56// =========================================================================
57// Byte-range file reader
58// =========================================================================
59
/// Open a byte-range slice of `path` for worker `index` out of `peers`.
///
/// Returns `Some((reader, bytes_to_read))` on success. The reader is
/// pre-seeked to the start of the worker's range (aligned to the next
/// line boundary for non-zero workers). The caller should read up to
/// `bytes_to_read` bytes, stopping at the first complete line beyond
/// that budget.
///
/// `peers` is clamped to at least 1 — consistent with [`partition`] —
/// so a caller passing `peers == 0` gets the whole file instead of a
/// divide-by-zero / underflow panic.
///
/// Returns `None` on I/O error (logged to stderr).
pub fn byte_range_reader(
    path: &Path,
    index: usize,
    peers: usize,
) -> Option<(BufReader<File>, u64)> {
    // Guard against a zero worker count: `file_size / peers` and
    // `peers - 1` below would otherwise panic.
    let peers = peers.max(1);

    let mut file = match File::open(path) {
        Ok(f) => f,
        Err(e) => {
            eprintln!("[flowlog-runtime::io] failed to open {}: {e}", path.display());
            return None;
        }
    };

    let file_size = match file.metadata() {
        Ok(m) => m.len(),
        Err(e) => {
            eprintln!("[flowlog-runtime::io] failed to stat {}: {e}", path.display());
            return None;
        }
    };

    let chunk = file_size / peers as u64;
    let start = chunk * index as u64;
    // The last worker absorbs the remainder of the division.
    let end = if index == peers - 1 {
        file_size
    } else {
        chunk * (index + 1) as u64
    };

    // Nothing to read for this worker (empty file, or chunk == 0 for a
    // non-last worker).
    if start >= end {
        return Some((BufReader::new(file), 0));
    }

    // Any worker whose range begins at byte 0 reads from the start with no
    // alignment skip — there's no previous byte to peek at. Worker 0 always
    // hits this; others hit it when `chunk == 0` (peers > file_size), which
    // puts the whole file on the last worker.
    if start == 0 {
        return Some((BufReader::new(file), end));
    }

    // Non-zero start: seek to `start - 1` and peek the byte just before our
    // range. If it's a newline we're on a line boundary; otherwise skip the
    // rest of the partial line (the previous worker finishes it).
    if file.seek(SeekFrom::Start(start - 1)).is_err() {
        return Some((BufReader::new(file), 0));
    }

    let mut reader = BufReader::new(file);
    let mut peek = [0u8; 1];
    if reader.read_exact(&mut peek).is_err() {
        return Some((reader, 0));
    }

    if peek[0] == b'\n' {
        // Exactly on a line boundary; the reader now sits at `start`.
        return Some((reader, end - start));
    }

    // Mid-line: skip the rest of this partial line. The skipped bytes belong
    // to the previous worker's final line, so subtract them from the budget
    // (saturating in case the entire range was one partial line).
    let mut discard = Vec::new();
    let skipped = reader.read_until(b'\n', &mut discard).unwrap_or(0);
    Some((reader, (end - start).saturating_sub(skipped as u64)))
}
134
135// =========================================================================
136// First-column sharding
137// =========================================================================
138
/// Shard an integer-typed first column across `peers` workers.
///
/// Returns `true` if worker `index` should own this tuple. `rem_euclid`
/// keeps the owner non-negative even for negative keys.
#[inline]
pub fn shard_int(first: i64, peers: usize, index: usize) -> bool {
    let owner = first.rem_euclid(peers as i64) as usize;
    owner == index
}
146
/// Shard a string-typed first column across `peers` workers.
///
/// Uses a 32-bit FNV-1a hash to distribute strings uniformly.
#[inline]
pub fn shard_str(first: &str, peers: usize, index: usize) -> bool {
    // FNV-1a: offset basis 0x811c9dc5, prime 0x01000193.
    let hash = first
        .bytes()
        .fold(0x811c9dc5u32, |h, b| (h ^ b as u32).wrapping_mul(0x01000193));
    (hash as usize) % peers == index
}
159
160/// Shard an interned-string first column ([`lasso::Spur`]) across `peers`.
161#[inline]
162pub fn shard_spur(first: lasso::Spur, peers: usize, index: usize) -> bool {
163    (first.into_inner().get() as usize) % peers == index
164}