packed_spatial_index 0.18.1

Packed static spatial index (Hilbert R-tree) for 2D/3D AABBs — SIMD range, kNN, raycast, and spatial-join queries, with zero-copy and streaming serialization.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
use std::sync::Arc;

use crate::persistence::{
    CHUNK_ENTRY_LEN, CHUNK_FLAG_CRITICAL, FORMAT_VERSION, LoadError, PYLD_DESC_LEN,
    PYLD_DESC_LEN_FIXED, SUPERBLOCK_LEN, TAG_PYLD, TAG_TREE, TREE_DESC_LEN, derive_level_bounds,
    expected_tree_shape, parse_pyld_chunk, parse_tree_chunk, read_u32_at, read_u64_at,
    read_u64_le_unchecked,
};

use super::StreamError;
use super::directory::{StreamCoreParts, directory_start};
use super::limits::{Budget, COALESCE_GAP_BYTES, StreamLimits, directory_node_budget};
use super::payload::{
    PayloadSection, emit_run_payloads, emit_run_payloads_fixed, payload_blob_span, payload_run_end,
    payload_run_end_fixed,
};
use super::planner::{apply_gather_run, expand_frontier, plan_gather};
use super::readers::RangeReader;

/// Dimension-independent streaming state: validated header counts, section
/// offsets, the parsed level bounds, and the cached upper-level directory.
///
/// Both the 2D and (future) 3D streaming indexes wrap one of these; only box
/// parsing and query traversal differ between dimensions.
pub(crate) struct StreamCore<R> {
    /// Source every positioned read is issued against.
    pub(crate) reader: R,
    /// Node size recorded in the `TREE` descriptor; together with `num_items`
    /// it determines the tree shape and is used when expanding a frontier to
    /// the next level down.
    pub(crate) node_size: usize,
    /// Item count recorded in the `TREE` descriptor. Every insertion id read
    /// from a leaf must be strictly below this value.
    pub(crate) num_items: usize,
    /// Total number of node positions across all levels. Traversal starts at
    /// position `num_nodes - 1`.
    pub(crate) num_nodes: usize,
    /// Number of tree levels. Traversal starts at level `level_count - 1` and
    /// finishes at level 0, the leaf level.
    pub(crate) level_count: usize,
    /// Exclusive end offset of each level, in node positions (`level_bounds[i]`).
    pub(crate) level_bounds: Vec<usize>,
    /// Box record size in bytes.
    pub(crate) record: usize,
    /// Byte stride from one node's box to the next: `record` for the SoA layout,
    /// `record + 8` for the interleaved layout (box immediately followed by its
    /// index). The box of node `n` is always its first `record` bytes.
    pub(crate) box_stride: usize,
    /// Whether the node section is interleaved (box + index per node). When set,
    /// a node's index is read from its own record, so no separate index gather is
    /// issued — one coalesced read per level instead of two.
    pub(crate) interleaved: bool,
    /// Byte offset of the box / node section.
    pub(crate) box0: u64,
    /// Byte offset of the separate index section (SoA layout; unused interleaved).
    pub(crate) idx0: u64,
    /// First node position covered by the cached directory.
    pub(crate) dir_node_start: usize,
    /// Cached box (or node, when interleaved) bytes for positions
    /// `[dir_node_start, num_nodes)`, strided by `box_stride`. `Arc` so a
    /// directory split off with `into_parts` reattaches by a refcount bump, not
    /// a copy (cheap reuse across queries; no growth of sticky wasm memory).
    pub(crate) dir_boxes: Arc<[u8]>,
    /// Cached index bytes for the same positions (SoA layout only; empty when
    /// interleaved, where indices live inside `dir_boxes`).
    pub(crate) dir_indices: Arc<[u8]>,
    /// Optional payload section. `None` when the index carries no payload.
    pub(crate) payload: Option<PayloadSection>,
    /// Per-query cost limits applied to every query (default: unbounded).
    pub(crate) limits: StreamLimits,
}

impl<R> StreamCore<R> {
    /// Reports whether a payload section is attached to this index.
    ///
    /// This only inspects a field — no I/O happens — so it needs no reader
    /// bound and works for both sync and async readers.
    pub(crate) fn has_payload(&self) -> bool {
        let section = &self.payload;
        section.is_some()
    }

    /// Byte gap below which neighbouring records are merged into a single read.
    ///
    /// Uses the caller's [`StreamLimits::coalesce_gap_bytes`] when one was
    /// supplied and falls back to the built-in `COALESCE_GAP_BYTES` otherwise.
    pub(crate) fn coalesce_gap(&self) -> u64 {
        let configured = self.limits.coalesce_gap_bytes;
        configured.unwrap_or(COALESCE_GAP_BYTES)
    }

    /// Split off the reader, keeping the reusable directory. No I/O.
    pub(crate) fn into_parts(self) -> (StreamCoreParts, R) {
        let parts = StreamCoreParts {
            node_size: self.node_size,
            num_items: self.num_items,
            num_nodes: self.num_nodes,
            level_count: self.level_count,
            level_bounds: self.level_bounds,
            record: self.record,
            box_stride: self.box_stride,
            interleaved: self.interleaved,
            box0: self.box0,
            idx0: self.idx0,
            dir_node_start: self.dir_node_start,
            dir_boxes: self.dir_boxes,
            dir_indices: self.dir_indices,
            payload: self.payload,
        };
        (parts, self.reader)
    }

    /// Reattach a reader to a previously split directory. No I/O.
    pub(crate) fn from_parts(parts: StreamCoreParts, reader: R, limits: StreamLimits) -> Self {
        StreamCore {
            reader,
            node_size: parts.node_size,
            num_items: parts.num_items,
            num_nodes: parts.num_nodes,
            level_count: parts.level_count,
            level_bounds: parts.level_bounds,
            record: parts.record,
            box_stride: parts.box_stride,
            interleaved: parts.interleaved,
            box0: parts.box0,
            idx0: parts.idx0,
            dir_node_start: parts.dir_node_start,
            dir_boxes: parts.dir_boxes,
            dir_indices: parts.dir_indices,
            payload: parts.payload,
            limits,
        }
    }
}

impl<R: RangeReader> StreamCore<R> {
    /// Open and validate a chunk-container index from `reader`: check the
    /// superblock, read the directory, locate the `TREE` (and optional `PYLD`)
    /// chunk, derive the tree shape, and prefetch the upper-level directory.
    pub(crate) fn open(
        reader: R,
        dimensions: usize,
        coord_bytes: usize,
        limits: StreamLimits,
    ) -> Result<Self, StreamError> {
        // One leading read covers the superblock (magic + version + chunk_count).
        let mut head = [0u8; SUPERBLOCK_LEN];
        reader.read_exact_at(0, &mut head)?;
        if &head[..8] != b"PSINDEX\0" {
            return Err(StreamError::Format(LoadError::BadMagic));
        }
        if u64::from_le_bytes(head[8..16].try_into().unwrap()) != FORMAT_VERSION {
            return Err(StreamError::Format(LoadError::UnsupportedVersion));
        }
        let chunk_count = read_u32_at(&head, 16)? as usize;
        let dir_len = chunk_count
            .checked_mul(CHUNK_ENTRY_LEN)
            .ok_or(LoadError::IntegerOverflow)?;
        let mut dir = vec![0u8; dir_len];
        reader.read_exact_at(SUPERBLOCK_LEN as u64, &mut dir)?;

        let file_len = reader.len();
        let mut max_end = SUPERBLOCK_LEN as u64 + dir_len as u64;
        let mut tree: Option<(u64, u64)> = None;
        let mut pyld: Option<(u64, u64)> = None;
        for i in 0..chunk_count {
            let base = i * CHUNK_ENTRY_LEN;
            let mut tag = [0u8; 4];
            tag.copy_from_slice(&dir[base..base + 4]);
            let flags = read_u32_at(&dir, base + 4)?;
            let offset = read_u64_at(&dir, base + 8)?;
            let len = read_u64_at(&dir, base + 16)?;
            let end = offset.checked_add(len).ok_or(LoadError::IntegerOverflow)?;
            if file_len.is_some_and(|fl| end > fl) {
                return Err(StreamError::Format(LoadError::InvalidTree));
            }
            max_end = max_end.max(end);
            if tag == TAG_TREE {
                tree = Some((offset, len));
            } else if tag == TAG_PYLD {
                pyld = Some((offset, len));
            } else if flags & CHUNK_FLAG_CRITICAL != 0 {
                return Err(StreamError::Format(LoadError::UnsupportedVersion));
            }
        }

        // TREE descriptor.
        // Reject a file longer than the last chunk plus its alignment pad — a
        // stray trailing byte the directory does not account for.
        let aligned_end = (max_end + 7) & !7;
        if let Some(fl) = file_len
            && fl > aligned_end
        {
            return Err(StreamError::Format(LoadError::LengthMismatch {
                expected: max_end as usize,
                actual: fl as usize,
            }));
        }
        let (toff, tlen) = tree.ok_or(LoadError::InvalidTree)?;
        if tlen < TREE_DESC_LEN as u64 {
            return Err(StreamError::Format(LoadError::Truncated));
        }
        let mut desc = [0u8; TREE_DESC_LEN];
        reader.read_exact_at(toff, &mut desc)?;
        let (td, _) = parse_tree_chunk(&desc)?;
        if td.dimensions != dimensions || td.coord_bytes != coord_bytes {
            return Err(StreamError::Format(LoadError::UnsupportedVersion));
        }
        let (num_nodes, level_count) = expected_tree_shape(td.num_items, td.node_size)?;
        let record = dimensions
            .checked_mul(2 * coord_bytes)
            .ok_or(LoadError::IntegerOverflow)?;
        let box_stride = if td.interleaved { record + 8 } else { record };
        let box0 = toff + td.desc_len as u64;
        let node_len = num_nodes
            .checked_mul(box_stride + if td.interleaved { 0 } else { 8 })
            .ok_or(LoadError::IntegerOverflow)?;
        if tlen != td.desc_len as u64 + node_len as u64 {
            return Err(StreamError::Format(LoadError::InvalidTree));
        }
        let idx0 = if td.interleaved {
            box0
        } else {
            box0 + (num_nodes * record) as u64
        };
        let level_bounds = derive_level_bounds(td.num_items, td.node_size, level_count);

        // Optional payload chunk.
        let payload = match pyld {
            Some((poff, plen)) => {
                if plen < PYLD_DESC_LEN as u64 {
                    return Err(StreamError::Format(LoadError::Truncated));
                }
                let dn = (PYLD_DESC_LEN_FIXED as u64).min(plen) as usize;
                let mut pd = [0u8; PYLD_DESC_LEN_FIXED];
                reader.read_exact_at(poff, &mut pd[..dn])?;
                let (pdesc, _) = parse_pyld_chunk(&pd[..dn])?;
                let body0 = poff + pdesc.desc_len as u64;
                if pdesc.record_stride != 0 {
                    let stride = pdesc.record_stride as u64;
                    let blob_total = (td.num_items as u64)
                        .checked_mul(stride)
                        .ok_or(StreamError::Format(LoadError::IntegerOverflow))?;
                    let need = pdesc.desc_len as u64 + blob_total;
                    if plen != need {
                        return Err(StreamError::Format(LoadError::InvalidTree));
                    }
                    Some(PayloadSection {
                        offsets_start: 0,
                        blobs_start: body0,
                        blob_total,
                        stride,
                    })
                } else {
                    let offsets_start = body0;
                    let last_at = offsets_start + (td.num_items as u64) * 8;
                    let mut last = [0u8; 8];
                    reader.read_exact_at(last_at, &mut last)?;
                    let blob_total = u64::from_le_bytes(last);
                    let blobs_start = offsets_start + (td.num_items as u64 + 1) * 8;
                    let need = pdesc.desc_len as u64 + (td.num_items as u64 + 1) * 8 + blob_total;
                    if plen != need {
                        return Err(StreamError::Format(LoadError::InvalidTree));
                    }
                    Some(PayloadSection {
                        offsets_start,
                        blobs_start,
                        blob_total,
                        stride: 0,
                    })
                }
            }
            None => None,
        };

        // Directory: cache the upper levels (a contiguous suffix of the node
        // section) up to the byte budget.
        let budget = directory_node_budget(&limits, box_stride, td.interleaved);
        let dir_node_start = directory_start(&level_bounds, level_count, budget);
        let cached_nodes = num_nodes - dir_node_start;

        let mut dir_boxes = vec![0u8; cached_nodes * box_stride];
        if !dir_boxes.is_empty() {
            let offset = box0 + (dir_node_start * box_stride) as u64;
            reader.read_exact_at(offset, &mut dir_boxes)?;
        }
        // The interleaved layout carries indices inside the node records, so the
        // separate index cache is read only for the SoA layout.
        let mut dir_indices = if td.interleaved {
            Vec::new()
        } else {
            vec![0u8; cached_nodes * 8]
        };
        if !dir_indices.is_empty() {
            let offset = idx0 + (dir_node_start * 8) as u64;
            reader.read_exact_at(offset, &mut dir_indices)?;
        }
        let dir_boxes: Arc<[u8]> = dir_boxes.into();
        let dir_indices: Arc<[u8]> = dir_indices.into();

        Ok(StreamCore {
            reader,
            node_size: td.node_size,
            num_items: td.num_items,
            num_nodes,
            level_count,
            level_bounds,
            record,
            box_stride,
            interleaved: td.interleaved,
            box0,
            idx0,
            dir_node_start,
            dir_boxes,
            dir_indices,
            payload,
            limits,
        })
    }

    /// Look up the cached box bytes for node `position`.
    ///
    /// Returns `None` when `position` lies outside `[dir_node_start, num_nodes)`
    /// or when the cache slice does not contain the requested span. The box
    /// occupies the first `record` bytes of the node's `box_stride`-byte slot
    /// (interleaved nodes carry their index in the trailing 8 bytes).
    pub(crate) fn cached_box_bytes(&self, position: usize) -> Option<&[u8]> {
        let cached = self.dir_node_start..self.num_nodes;
        if !cached.contains(&position) {
            return None;
        }
        let slot = (position - self.dir_node_start) * self.box_stride;
        self.dir_boxes.get(slot..slot + self.record)
    }

    /// Fill `out` with the `stride`-byte records for `positions` (sorted),
    /// taken from the section that begins at byte `section0`.
    ///
    /// [`plan_gather`] decides which coalesced runs still have to be read and
    /// [`apply_gather_run`] scatters each run into `out` (both are shared with
    /// the async path). This method only performs the reads: every run is
    /// charged against `budget` first, then read into `scratch`, which is
    /// reused from run to run.
    #[allow(clippy::too_many_arguments)]
    fn gather(
        &self,
        positions: &[usize],
        section0: u64,
        stride: usize,
        cache: &[u8],
        out: &mut Vec<u8>,
        scratch: &mut Vec<u8>,
        budget: &mut Budget,
    ) -> Result<(), StreamError> {
        let gap = self.coalesce_gap();
        let pending = plan_gather(
            positions,
            section0,
            stride,
            self.dir_node_start,
            cache,
            out,
            gap,
        );

        for run in &pending {
            // Charge before touching the buffer so an over-budget run is
            // refused without resizing or reading anything.
            budget.charge_read(run.len)?;

            scratch.clear();
            scratch.resize(run.len, 0);
            self.reader.read_exact_at(run.offset, scratch)?;

            apply_gather_run(out, run, scratch, stride);
        }
        Ok(())
    }

    /// Walk the tree from the root towards the leaves, one level per loop
    /// iteration, and invoke `leaf` once — at level 0 — with the surviving leaf
    /// positions (sorted) and their gathered index bytes (the insertion ids, in
    /// the same order). Box intersection is delegated to `overlaps`, which keeps
    /// this routine independent of dimension and payload.
    ///
    /// Per level: the frontier's boxes are gathered (from the cache or by
    /// coalesced reads) and tested; nodes that pass expand to their child
    /// groups, and a node that fails prunes its entire subtree. The walk ends
    /// early with `Ok(())` as soon as a level has no survivors, and immediately
    /// when the index holds no items.
    fn traverse<O, L>(&self, overlaps: O, mut leaf: L) -> Result<(), StreamError>
    where
        O: Fn(&[u8]) -> bool,
        L: FnMut(&[usize], &[u8], &mut Budget) -> Result<(), StreamError>,
    {
        if self.num_items == 0 {
            return Ok(());
        }

        // Loop-invariant layout parameters.
        let record = self.record;
        let stride = self.box_stride;

        let mut budget = Budget::new(self.limits);
        let mut frontier = vec![self.num_nodes - 1];
        let mut level = self.level_count - 1;
        let mut box_bytes = Vec::new();
        let mut index_bytes = Vec::new();
        let mut scratch = Vec::new();
        let mut hits: Vec<usize> = Vec::new();

        loop {
            // Fetch one `stride`-byte record per frontier node (interleaved:
            // box followed by index; SoA: box only).
            self.gather(
                &frontier,
                self.box0,
                stride,
                &self.dir_boxes,
                &mut box_bytes,
                &mut scratch,
                &mut budget,
            )?;

            hits.clear();
            index_bytes.clear();
            for (slot, &position) in frontier.iter().enumerate() {
                let start = slot * stride;
                let box_end = start + record;
                if !overlaps(&box_bytes[start..box_end]) {
                    continue;
                }
                hits.push(position);
                // Interleaved records already hold the index right after the
                // box, which saves a second gather.
                if self.interleaved {
                    index_bytes.extend_from_slice(&box_bytes[box_end..box_end + 8]);
                }
            }
            if hits.is_empty() {
                return Ok(());
            }

            // SoA layout: indices live in their own section and are gathered
            // only for the nodes that passed the test.
            if !self.interleaved {
                self.gather(
                    &hits,
                    self.idx0,
                    8,
                    &self.dir_indices,
                    &mut index_bytes,
                    &mut scratch,
                    &mut budget,
                )?;
            }

            if level == 0 {
                // `hits` are sorted leaf positions; `index_bytes` their ids.
                return leaf(&hits, &index_bytes, &mut budget);
            }

            frontier = expand_frontier(
                &self.level_bounds,
                self.node_size,
                level,
                hits.len(),
                &index_bytes,
            )?;
            level -= 1;
        }
    }

    /// Call `visit` with the insertion id of every leaf whose box satisfies
    /// `overlaps`.
    ///
    /// Each id is validated against `num_items` (an out-of-range id yields
    /// `LoadError::InvalidTree`) and charged to the query budget before it is
    /// handed to `visit`.
    pub(crate) fn visit_ids<O, F>(&self, overlaps: O, mut visit: F) -> Result<(), StreamError>
    where
        O: Fn(&[u8]) -> bool,
        F: FnMut(usize),
    {
        self.traverse(overlaps, |survivors, indices, budget| {
            let leaf_count = survivors.len();
            for slot in 0..leaf_count {
                match read_index(indices, slot)? {
                    id if id < self.num_items => {
                        budget.charge_item()?;
                        visit(id);
                    }
                    _ => return Err(StreamError::Format(LoadError::InvalidTree)),
                }
            }
            Ok(())
        })
    }

    /// Call `emit` with `(insertion id, payload blob)` for every leaf whose box
    /// satisfies `overlaps`.
    ///
    /// The payload section is streamed in leaf order during the leaf pass so
    /// the offset table and blobs are read in coalesced runs. A `stride` of 0
    /// selects the variable-width path; any other value selects the fixed-width
    /// path. Fails with `StreamError::NoPayload` when the index has no payload
    /// section.
    pub(crate) fn visit_payloads<O, F>(&self, overlaps: O, mut emit: F) -> Result<(), StreamError>
    where
        O: Fn(&[u8]) -> bool,
        F: FnMut(usize, &[u8]),
    {
        let Some(section) = self.payload.as_ref() else {
            return Err(StreamError::NoPayload);
        };

        // Scratch buffers reused across every run of the leaf pass.
        let mut offsets_scratch = Vec::new();
        let mut blobs_scratch = Vec::new();

        self.traverse(overlaps, |survivors, indices, budget| {
            if section.stride == 0 {
                self.gather_payloads(
                    section,
                    survivors,
                    indices,
                    &mut offsets_scratch,
                    &mut blobs_scratch,
                    budget,
                    &mut emit,
                )
            } else {
                self.gather_payloads_fixed(
                    section,
                    survivors,
                    indices,
                    &mut blobs_scratch,
                    budget,
                    &mut emit,
                )
            }
        })
    }

    /// Stream the blobs for `leaf_positions` (sorted leaf ranks) and their
    /// `indices` (insertion ids, same order), coalescing the leaf-ordered offset
    /// table and blob region into runs. Emits `(id, blob)` per leaf.
    #[allow(clippy::too_many_arguments)]
    fn gather_payloads<F>(
        &self,
        section: &PayloadSection,
        leaf_positions: &[usize],
        indices: &[u8],
        off_buf: &mut Vec<u8>,
        blob_buf: &mut Vec<u8>,
        budget: &mut Budget,
        emit: &mut F,
    ) -> Result<(), StreamError>
    where
        F: FnMut(usize, &[u8]),
    {
        let mut j = 0;
        while j < leaf_positions.len() {
            let k = payload_run_end(leaf_positions, j, self.coalesce_gap());
            let lo = leaf_positions[j];
            let hi = leaf_positions[k];

            off_buf.clear();
            off_buf.resize((hi + 2 - lo) * 8, 0);
            budget.charge_read(off_buf.len())?;
            self.reader
                .read_exact_at(section.offsets_start + (lo * 8) as u64, off_buf)?;
            let (blob_lo, blob_hi) = payload_blob_span(off_buf, lo, hi, section.blob_total)?;

            blob_buf.clear();
            blob_buf.resize((blob_hi - blob_lo) as usize, 0);
            if !blob_buf.is_empty() {
                budget.charge_read(blob_buf.len())?;
                self.reader
                    .read_exact_at(section.blobs_start + blob_lo, blob_buf)?;
            }

            emit_run_payloads(
                leaf_positions,
                indices,
                j,
                k,
                lo,
                off_buf,
                blob_lo,
                blob_hi,
                blob_buf,
                self.num_items,
                budget,
                emit,
            )?;
            j = k + 1;
        }
        Ok(())
    }

    /// Fixed-width payload variant of [`gather_payloads`](Self::gather_payloads):
    /// no offset table, so each coalesced run is one contiguous blob read whose
    /// byte span is pure arithmetic (`lo * stride`).
    fn gather_payloads_fixed<F>(
        &self,
        section: &PayloadSection,
        leaf_positions: &[usize],
        indices: &[u8],
        blob_buf: &mut Vec<u8>,
        budget: &mut Budget,
        emit: &mut F,
    ) -> Result<(), StreamError>
    where
        F: FnMut(usize, &[u8]),
    {
        let stride = section.stride as usize;
        let mut j = 0;
        while j < leaf_positions.len() {
            let k = payload_run_end_fixed(leaf_positions, j, stride, self.coalesce_gap());
            let lo = leaf_positions[j];
            let hi = leaf_positions[k];
            let span = (hi + 1 - lo) * stride;

            blob_buf.clear();
            blob_buf.resize(span, 0);
            budget.charge_read(span)?;
            self.reader
                .read_exact_at(section.blobs_start + (lo * stride) as u64, blob_buf)?;

            emit_run_payloads_fixed(
                leaf_positions,
                indices,
                j,
                k,
                lo,
                stride,
                blob_buf,
                self.num_items,
                budget,
                emit,
            )?;
            j = k + 1;
        }
        Ok(())
    }
}

/// Decode entry `i` of a gathered index buffer, where each entry is a
/// little-endian `u64` occupying 8 bytes.
///
/// Fails with `LoadError::IntegerOverflow` when the stored value does not fit
/// in `usize` on the current target.
pub(crate) fn read_index(bytes: &[u8], i: usize) -> Result<usize, StreamError> {
    let raw = read_u64_le_unchecked(bytes, i * 8);
    match usize::try_from(raw) {
        Ok(id) => Ok(id),
        Err(_) => Err(StreamError::Format(LoadError::IntegerOverflow)),
    }
}