Skip to main content

sparrowdb_execution/
join_spill.rs

1//! Spill-to-disk hash-join for ASP-Join on large intermediate hash tables.
2//!
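//! When the estimated number of `(mid, fof)` pairs for a source node exceeds
//! the configured spill threshold (default `SPILL_THRESHOLD`), this operator
//! partitions the pairs by `mid_slot % num_partitions`, writes every partition
//! to its own temp file, and then processes one partition at a time.  Only one
//! partition's hash map is resident at any moment, and it holds only the pairs
//! whose `mid` maps to that partition; the deduplicated result set is still
//! accumulated in memory across all partitions.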
8//!
9//! The public API is intentionally compatible with [`AspJoin::two_hop`]:
10//! `SpillingHashJoin::two_hop(src_slot)` returns the same deduplicated, sorted
11//! set of friend-of-friend slots.
12//!
13//! SPA-114
14
15use std::collections::{HashMap, HashSet};
16use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
17
18use sparrowdb_common::{Error, Result};
19use sparrowdb_storage::csr::CsrForward;
20use tempfile::NamedTempFile;
21
/// Default spill threshold: the largest number of intermediate `(mid, fof)`
/// pairs that is still joined purely in memory.  Larger inputs take the
/// partitioned spill path.
pub const SPILL_THRESHOLD: usize = 500_000;

/// Default number of temp-file partitions used once the join spills.
const NUM_PARTITIONS: usize = 16;
27
28// ---------------------------------------------------------------------------
29// SpillingHashJoin
30// ---------------------------------------------------------------------------
31
32/// ASP-Join with spill-to-disk support for large intermediate hash tables.
33///
34/// When the hash map built from a source node's 2-hop neighbourhood exceeds
35/// [`SPILL_THRESHOLD`] entries, the operator partitions intermediate data into
36/// temp files and processes them one partition at a time.
37pub struct SpillingHashJoin<'a> {
38    csr: &'a CsrForward,
39    spill_threshold: usize,
40    num_partitions: usize,
41}
42
43impl<'a> SpillingHashJoin<'a> {
44    /// Create with default thresholds.
45    pub fn new(csr: &'a CsrForward) -> Self {
46        SpillingHashJoin {
47            csr,
48            spill_threshold: SPILL_THRESHOLD,
49            num_partitions: NUM_PARTITIONS,
50        }
51    }
52
53    /// Create with explicit thresholds (useful for testing spill behaviour).
54    pub fn with_thresholds(
55        csr: &'a CsrForward,
56        spill_threshold: usize,
57        num_partitions: usize,
58    ) -> Self {
59        SpillingHashJoin {
60            csr,
61            spill_threshold,
62            num_partitions: num_partitions.max(1), // clamp to avoid divide-by-zero
63        }
64    }
65
66    /// Compute 2-hop friends-of-friends for `src_slot`.
67    ///
68    /// Returns a deduplicated, sorted set of fof slots, identical in
69    /// semantics to [`AspJoin::two_hop`].
70    pub fn two_hop(&self, src_slot: u64) -> Result<Vec<u64>> {
71        let direct = self.csr.neighbors(src_slot);
72        if direct.is_empty() {
73            return Ok(vec![]);
74        }
75
76        // Fast path: if the total fof entry count is below threshold, use a
77        // plain in-memory hash map — no spill overhead.
78        let total_fof_estimate: usize = direct
79            .iter()
80            .map(|&mid| self.csr.neighbors(mid).len())
81            .sum();
82
83        if total_fof_estimate <= self.spill_threshold {
84            return self.two_hop_in_memory(direct);
85        }
86
87        // Slow path: partition-based spill.
88        self.two_hop_spilling(direct)
89    }
90
91    // ── In-memory path ────────────────────────────────────────────────────
92
93    fn two_hop_in_memory(&self, direct: &[u64]) -> Result<Vec<u64>> {
94        let mut hash: HashMap<u64, Vec<u64>> = HashMap::new();
95        for &mid in direct {
96            let fof_list = self.csr.neighbors(mid);
97            if !fof_list.is_empty() {
98                hash.entry(mid).or_default().extend_from_slice(fof_list);
99            }
100        }
101
102        let mut fof_set: HashSet<u64> = HashSet::new();
103        for &mid in direct {
104            if let Some(fof_list) = hash.get(&mid) {
105                fof_set.extend(fof_list.iter().copied());
106            }
107        }
108
109        let mut result: Vec<u64> = fof_set.into_iter().collect();
110        result.sort_unstable();
111        Ok(result)
112    }
113
114    // ── Spilling path ─────────────────────────────────────────────────────
115
116    fn two_hop_spilling(&self, direct: &[u64]) -> Result<Vec<u64>> {
117        let np = self.num_partitions;
118
119        // Phase 1: distribute (mid, fof) pairs into per-partition temp files.
120        let mut part_files: Vec<NamedTempFile> = (0..np)
121            .map(|_| NamedTempFile::new().map_err(Error::Io))
122            .collect::<Result<_>>()?;
123
124        {
125            let mut writers: Vec<BufWriter<&mut std::fs::File>> = part_files
126                .iter_mut()
127                .map(|f| BufWriter::new(f.as_file_mut()))
128                .collect();
129
130            for &mid in direct {
131                let fof_list = self.csr.neighbors(mid);
132                if fof_list.is_empty() {
133                    continue;
134                }
135                let p = (mid as usize) % np;
136                for &fof in fof_list {
137                    write_u64_pair(&mut writers[p], mid, fof)?;
138                }
139            }
140
141            for w in &mut writers {
142                w.flush().map_err(Error::Io)?;
143            }
144        }
145
146        // Phase 2: process each partition independently.
147        let mut fof_set: HashSet<u64> = HashSet::new();
148
149        for file in &mut part_files {
150            file.as_file_mut()
151                .seek(SeekFrom::Start(0))
152                .map_err(Error::Io)?;
153            let mut reader = BufReader::new(file.as_file_mut());
154
155            let mut hash: HashMap<u64, Vec<u64>> = HashMap::new();
156            while let Some((mid, fof)) = read_u64_pair(&mut reader)? {
157                hash.entry(mid).or_default().push(fof);
158            }
159
160            for fof_list in hash.values() {
161                fof_set.extend(fof_list.iter().copied());
162            }
163        }
164
165        let mut result: Vec<u64> = fof_set.into_iter().collect();
166        result.sort_unstable();
167        Ok(result)
168    }
169}
170
171// ---------------------------------------------------------------------------
172// Serialisation helpers for (u64, u64) pairs
173// ---------------------------------------------------------------------------
174
175fn write_u64_pair<W: Write>(w: &mut W, a: u64, b: u64) -> Result<()> {
176    w.write_all(&a.to_le_bytes()).map_err(Error::Io)?;
177    w.write_all(&b.to_le_bytes()).map_err(Error::Io)?;
178    Ok(())
179}
180
181fn read_u64_pair<R: Read>(r: &mut R) -> Result<Option<(u64, u64)>> {
182    let mut buf = [0u8; 8];
183    match r.read_exact(&mut buf) {
184        Ok(()) => {}
185        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
186        Err(e) => return Err(Error::Io(e)),
187    }
188    let a = u64::from_le_bytes(buf);
189    r.read_exact(&mut buf).map_err(Error::Io)?;
190    let b = u64::from_le_bytes(buf);
191    Ok(Some((a, b)))
192}
193
194// ---------------------------------------------------------------------------
195// Tests
196// ---------------------------------------------------------------------------
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::join::AspJoin;

    /// Same five-node social graph as the join.rs tests.
    ///
    /// Slots: Alice=0, Bob=1, Carol=2, Dave=3, Eve=4.
    /// Edges: Alice->Bob, Alice->Carol, Bob->Dave, Carol->Dave, Carol->Eve.
    fn social_graph() -> CsrForward {
        let edges = vec![(0u64, 1u64), (0, 2), (1, 3), (2, 3), (2, 4)];
        CsrForward::build(5u64, &edges)
    }

    /// With default thresholds the small social graph stays on the in-memory
    /// path; its results must equal those of the baseline `AspJoin`.
    #[test]
    fn join_spill_small_graph() {
        let graph = social_graph();
        let reference = AspJoin::new(&graph);
        let candidate = SpillingHashJoin::new(&graph);

        // Alice's (0) two-hop paths lead to Dave (3) and Eve (4).
        let want_alice = reference.two_hop(0).unwrap();
        let have_alice = candidate.two_hop(0).unwrap();
        assert_eq!(have_alice, want_alice, "Alice fof mismatch");

        // Bob (1) only reaches Dave (3), who has no outgoing edges, so Bob
        // has no two-hop paths at all.
        let want_bob = reference.two_hop(1).unwrap();
        let have_bob = candidate.two_hop(1).unwrap();
        assert_eq!(have_bob, want_bob, "Bob fof mismatch");
    }

    /// Ring graph `i -> (i + 1) mod N`: every node has exactly one direct
    /// neighbour and one two-hop path.  A threshold of 1 with 4 partitions is
    /// used to drive the spilling implementation, which is checked against
    /// the baseline for every source node.
    #[test]
    fn join_spill_large_graph() {
        const N: u64 = 10_000;

        // Ring edges: 0->1, 1->2, ..., N-1->0.
        let ring: Vec<(u64, u64)> = (0..N).map(|i| (i, (i + 1) % N)).collect();
        let graph = CsrForward::build(N, &ring);

        let reference = AspJoin::new(&graph);
        let candidate = SpillingHashJoin::with_thresholds(&graph, 1, 4);

        for src in 0..N {
            let want = reference.two_hop(src).unwrap();
            let have = candidate.two_hop(src).unwrap();
            assert_eq!(have, want, "ring fof mismatch for src={src}");
        }
    }

    /// A source node without outgoing edges yields an empty result.
    #[test]
    fn join_spill_no_edges() {
        let graph = CsrForward::build(3u64, &[(1u64, 2u64)]);
        let candidate = SpillingHashJoin::new(&graph);
        assert!(candidate.two_hop(0).unwrap().is_empty());
    }

    /// `num_partitions = 0` must not panic: the constructor clamps it to 1.
    #[test]
    fn join_spill_zero_partitions_does_not_panic() {
        let graph = CsrForward::build(3u64, &[(0u64, 1u64), (1u64, 2u64)]);
        // threshold = 0 forces the spill path for the single (1, 2) pair.
        let candidate = SpillingHashJoin::with_thresholds(&graph, 0, 0);
        let fof = candidate.two_hop(0).unwrap();
        assert_eq!(fof, vec![2]);
    }
}