Skip to main content

noxu_db/
join_cursor.rs

1//! Join cursor for multi-secondary-index intersection queries.
2//!
3//! Implements `JoinCursor`.  A join cursor is obtained by calling
4//! [`Database::join`] with an array of
5//! [`SecondaryCursor`] objects,
6//! each pre-positioned at the desired secondary key value.
7//!
8//! # Algorithm
9//!
//! The join algorithm is a faithful port of the reference implementation's
//! natural-join algorithm:
11//!
//! 1. Iterate through all candidate primary keys from cursor C(0) — these
//!    are the "duplicate" records sharing C(0)'s secondary key.
14//!    In Noxu's current one-to-one secondary model there is at most one
15//!    candidate per secondary key position.
16//!
17//! 2. For each candidate primary key, probe cursors C(1) through C(n) to
18//!    confirm the candidate also appears in each of their secondary keys.
19//!    The probe does **not** read the primary database.
20//!
21//! 3. If all probes succeed, optionally read the primary record and
22//!    return the key (and data if requested).
23//!
//! 4. Cursor order matters: cursors are sorted by ascending duplicate count
//!    unless `JoinConfig::no_sort` is set.
26//!
27//! # Example
28//!
29//! ```ignore
30//! let mut cur1 = sec_db1.open_cursor(None, None)?;
31//! let mut cur2 = sec_db2.open_cursor(None, None)?;
32//!
33//! // Position each cursor at its desired secondary key.
34//! cur1.get_search_key(&sec_key1, &mut p_key, &mut data)?;
35//! cur2.get_search_key(&sec_key2, &mut p_key, &mut data)?;
36//!
37//! let mut join = primary_db.join(vec![cur1, cur2], None)?;
38//!
39//! let mut pri_key = DatabaseEntry::new();
40//! let mut pri_data = DatabaseEntry::new();
41//! while join.get_next(&mut pri_key, &mut pri_data)? == OperationStatus::Success {
42//!     // use pri_key, pri_data
43//! }
44//! join.close();
45//! ```
46
47use crate::database::Database;
48use crate::database_entry::DatabaseEntry;
49use crate::error::Result;
50use crate::join_config::JoinConfig;
51use crate::operation_status::OperationStatus;
52use crate::secondary_cursor::SecondaryCursor;
53
/// A cursor that returns records satisfying all secondary-key constraints.
///
/// Obtained via [`Database::join`][crate::database::Database::join].
///
/// The join cursor takes ownership of the [`SecondaryCursor`] objects it is
/// given: the caller's original cursor variables are moved in and are no
/// longer accessible.  The internal cursors are released when
/// [`close`][JoinCursor::close] is called or the join cursor is dropped.
pub struct JoinCursor<'a> {
    /// Primary database (for final record retrieval).
    primary_db: &'a Database,
    /// The secondary cursors taking part in the join, moved in by the caller
    /// and optionally reordered by the constructor.  Index 0 supplies the
    /// candidate primary keys; the remaining cursors are used as probes.
    cursors: Vec<SecondaryCursor<'a>>,
    /// Join configuration supplied at construction, or the default.
    config: JoinConfig,
    /// Pending candidate primary keys collected from `cursors[0]`.
    candidates: std::collections::VecDeque<Vec<u8>>,
    /// `true` once there are no more candidates to process.
    exhausted: bool,
}
73
74impl<'a> JoinCursor<'a> {
75    /// Creates a new `JoinCursor`.
76    ///
77    /// Sorts `cursors` by ascending `count_estimate()` unless
78    /// `config.no_sort` is `true`, mirroring 's optimisation.
79    pub(crate) fn new(
80        primary_db: &'a Database,
81        mut cursors: Vec<SecondaryCursor<'a>>,
82        config: Option<JoinConfig>,
83    ) -> Result<Self> {
84        let config = config.unwrap_or_default();
85
86        if !config.no_sort && cursors.len() > 1 {
87            // Collect estimates first (avoids repeated mutable borrows).
88            let estimates: Vec<u64> =
89                cursors.iter_mut().map(|c| c.count_estimate()).collect();
90            // Stable sort by estimate ascending (smallest first = fewest candidates).
91            let mut indexed: Vec<(usize, u64)> =
92                estimates.iter().copied().enumerate().collect();
93            indexed.sort_by_key(|&(_, est)| est);
94            let order: Vec<usize> =
95                indexed.into_iter().map(|(i, _)| i).collect();
96            let mut sorted = Vec::with_capacity(cursors.len());
97            let mut slots: Vec<Option<SecondaryCursor<'a>>> =
98                cursors.into_iter().map(Some).collect();
99            for idx in order {
100                sorted.push(slots[idx].take().unwrap());
101            }
102            cursors = sorted;
103        }
104
105        // Collect the initial set of candidate primary keys from cursor[0].
106        //  these are all "duplicate" records with the same secondary key.
107        // In Noxu's current one-to-one secondary model there is at most one.
108        let mut candidates = std::collections::VecDeque::new();
109        if let Some(first) = cursors.first_mut()
110            && let Some(pk) = first.get_current_primary_key_only()?
111        {
112            candidates.push_back(pk);
113            // Collect all duplicates at this secondary key position.
114            // For non-dup secondaries this loop runs at most once; for
115            // sorted-dup secondaries it drains all entries sharing the
116            // same secondary key value ( JoinCursor.getNext() pattern).
117            while first.get_next_dup()? == OperationStatus::Success {
118                if let Some(pk_extra) = first.get_current_primary_key_only()? {
119                    candidates.push_back(pk_extra);
120                }
121            }
122        }
123
124        let exhausted = candidates.is_empty();
125        Ok(Self { primary_db, cursors, config, candidates, exhausted })
126    }
127
128    /// Returns the next primary key **and** primary record data from the join.
129    ///
130    /// Returns `OperationStatus::Success` with `key` and `data` filled in,
131    /// or `OperationStatus::NotFound` when there are no more matching records.
132    pub fn get_next(
133        &mut self,
134        key: &mut DatabaseEntry,
135        data: &mut DatabaseEntry,
136    ) -> Result<OperationStatus> {
137        loop {
138            let candidate = match self.next_matching_candidate()? {
139                Some(c) => c,
140                None => return Ok(OperationStatus::NotFound),
141            };
142
143            // Fetch primary record.
144            let pri_key_entry = DatabaseEntry::from_bytes(&candidate);
145            let status = self.primary_db.get(None, &pri_key_entry, data)?;
146            if status != OperationStatus::Success {
147                // Primary was concurrently deleted (read-uncommitted path); skip.
148                continue;
149            }
150            key.set_data(&candidate);
151            return Ok(OperationStatus::Success);
152        }
153    }
154
155    /// Returns the next primary key **only** — does not read primary data.
156    ///
157    /// Equivalent to 's `JoinCursor.getNext(key, lockMode)` single-arg
158    /// overload.  Useful when only the key is needed and avoiding a primary
159    /// read is desirable.
160    pub fn get_next_key(
161        &mut self,
162        key: &mut DatabaseEntry,
163    ) -> Result<OperationStatus> {
164        match self.next_matching_candidate()? {
165            None => Ok(OperationStatus::NotFound),
166            Some(candidate) => {
167                key.set_data(&candidate);
168                Ok(OperationStatus::Success)
169            }
170        }
171    }
172
173    /// Closes the join cursor, releasing all internal secondary cursors.
174    pub fn close(self) {
175        // Dropping self drops cursors via SecondaryCursor::drop.
176    }
177
178    /// Returns a reference to the primary database associated with this cursor.
179    pub fn get_database(&self) -> &Database {
180        self.primary_db
181    }
182
183    /// Returns a clone of this cursor's configuration.
184    pub fn get_config(&self) -> JoinConfig {
185        self.config.clone()
186    }
187
188    // ------------------------------------------------------------------
189    // Internal join algorithm
190    // ------------------------------------------------------------------
191
192    /// Advances the join state and returns the next candidate primary key
193    /// that satisfies all secondary-cursor probes, or `None` when exhausted.
194    ///
195    /// On each call:
196    /// 1. Pop a candidate from the deque.  If empty, try to advance
197    ///    cursor[0] to its next duplicate.
198    /// 2. Probe cursor[1..n] — if any probe fails, loop to next candidate.
199    /// 3. Return the matching candidate bytes.
200    fn next_matching_candidate(&mut self) -> Result<Option<Vec<u8>>> {
201        if self.exhausted {
202            return Ok(None);
203        }
204
205        loop {
206            // --- Refill candidates from cursor[0]'s next duplicate ---
207            if self.candidates.is_empty() {
208                match self.cursors[0].get_next_dup()? {
209                    OperationStatus::Success => {
210                        if let Some(pk) =
211                            self.cursors[0].get_current_primary_key_only()?
212                        {
213                            self.candidates.push_back(pk);
214                        }
215                    }
216                    _ => {
217                        self.exhausted = true;
218                        return Ok(None);
219                    }
220                }
221            }
222
223            let candidate = match self.candidates.pop_front() {
224                Some(c) => c,
225                None => {
226                    self.exhausted = true;
227                    return Ok(None);
228                }
229            };
230
231            // --- Probe cursors[1..n] ---
232            let mut all_match = true;
233            for cursor in &mut self.cursors[1..] {
234                if !cursor.has_candidate_primary_key(&candidate)? {
235                    all_match = false;
236                    break;
237                }
238            }
239
240            if all_match {
241                return Ok(Some(candidate));
242            }
243            // Probe failed — advance cursor[0] to next duplicate on next iter.
244        }
245    }
246}
247
impl Drop for JoinCursor<'_> {
    /// Performs no explicit cleanup.
    fn drop(&mut self) {
        // Intentionally empty: the fields, including the secondary cursors
        // in `self.cursors`, are dropped automatically after this returns.
    }
}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257    use crate::database::Database;
258    use crate::database_config::DatabaseConfig;
259    use crate::environment::Environment;
260    use crate::environment_config::EnvironmentConfig;
261    use crate::secondary_config::{SecondaryConfig, SecondaryKeyCreator};
262    use crate::secondary_database::SecondaryDatabase;
263    use noxu_sync::Mutex;
264    use std::sync::Arc;
265    use tempfile::TempDir;
266
267    // ------------------------------------------------------------------
268    // Helper key creators
269    // ------------------------------------------------------------------
270
271    /// Extracts the first byte of the data as the secondary key.
272    struct FirstByteCreator;
273    impl SecondaryKeyCreator for FirstByteCreator {
274        fn create_secondary_key(
275            &self,
276            _db: &Database,
277            _key: &DatabaseEntry,
278            data: &DatabaseEntry,
279            result: &mut DatabaseEntry,
280        ) -> bool {
281            if let Some(d) = data.get_data()
282                && !d.is_empty()
283            {
284                result.set_data(&d[..1]);
285                true
286            } else {
287                false
288            }
289        }
290    }
291
292    /// Extracts the last byte of the data as the secondary key.
293    struct LastByteCreator;
294    impl SecondaryKeyCreator for LastByteCreator {
295        fn create_secondary_key(
296            &self,
297            _db: &Database,
298            _key: &DatabaseEntry,
299            data: &DatabaseEntry,
300            result: &mut DatabaseEntry,
301        ) -> bool {
302            if let Some(d) = data.get_data()
303                && !d.is_empty()
304            {
305                result.set_data(&d[d.len() - 1..]);
306                true
307            } else {
308                false
309            }
310        }
311    }
312
313    // ------------------------------------------------------------------
314    // Fixture
315    // ------------------------------------------------------------------
316
    /// Shared test fixture: one primary database plus two secondary indexes
    /// over it, all living in a temporary directory.
    struct Fixture {
        /// Temporary directory backing the environment; never read, only
        /// kept alive for as long as the fixture exists.
        _tmp: TempDir,
        /// Environment the databases were opened from; never read, only
        /// kept alive for as long as the fixture exists.
        _env: Environment,
        /// Primary database, shared with both secondary indexes.
        primary: Arc<Mutex<Database>>,
        /// Secondary index keyed by the first byte of the record data.
        sec1: SecondaryDatabase,
        /// Secondary index keyed by the last byte of the record data.
        sec2: SecondaryDatabase,
    }
324
325    impl Fixture {
326        fn new() -> Self {
327            let tmp = TempDir::new().unwrap();
328            let env_cfg = EnvironmentConfig::new(tmp.path().to_path_buf())
329                .with_allow_create(true)
330                .with_transactional(true);
331            let env = Environment::open(env_cfg).unwrap();
332
333            let db_cfg = DatabaseConfig::new().with_allow_create(true);
334            let pri_db = env.open_database(None, "primary", &db_cfg).unwrap();
335            let primary = Arc::new(Mutex::new(pri_db));
336
337            // v1.6 sorted-dup secondaries: inner index DB needs dups.
338            let sec_db_cfg = DatabaseConfig::new()
339                .with_allow_create(true)
340                .with_sorted_duplicates(true);
341            let sec1_store =
342                env.open_database(None, "sec1", &sec_db_cfg).unwrap();
343            let sec1 = SecondaryDatabase::open(
344                Arc::clone(&primary),
345                sec1_store,
346                SecondaryConfig::new()
347                    .with_allow_create(true)
348                    .with_key_creator(Box::new(FirstByteCreator)),
349            )
350            .unwrap();
351
352            let sec2_store =
353                env.open_database(None, "sec2", &sec_db_cfg).unwrap();
354            let sec2 = SecondaryDatabase::open(
355                Arc::clone(&primary),
356                sec2_store,
357                SecondaryConfig::new()
358                    .with_allow_create(true)
359                    .with_key_creator(Box::new(LastByteCreator)),
360            )
361            .unwrap();
362
363            Fixture { _tmp: tmp, _env: env, primary, sec1, sec2 }
364        }
365
366        fn insert(&self, pk: &[u8], val: &[u8]) {
367            let k = DatabaseEntry::from_bytes(pk);
368            let v = DatabaseEntry::from_bytes(val);
369            self.primary.lock().put(None, &k, &v).unwrap();
370            self.sec1.update_secondary(None, &k, None, Some(&v)).unwrap();
371            self.sec2.update_secondary(None, &k, None, Some(&v)).unwrap();
372        }
373    }
374
375    // ------------------------------------------------------------------
376    // Tests
377    // ------------------------------------------------------------------
378
379    /// Two secondary cursors positioned at keys where only pk1 matches both.
380    ///
381    /// Data layout:
382    ///   pk1 → b"AB"  (first byte 'A', last byte 'B')
383    ///   pk2 → b"AC"  (first byte 'A', last byte 'C')
384    ///   pk3 → b"XB"  (first byte 'X', last byte 'B')
385    ///
386    /// sec1 (first byte) at 'A' → {pk1, pk2}  (in the one-to-one model: pk1 only)
387    /// sec2 (last byte)  at 'B' → {pk1, pk3}  (in the one-to-one model: pk1 only)
388    /// Intersection → {pk1}
389    ///
390    /// Decision 1B (`docs/src/internal/v1.5-decisions-2026-05.md`):
391    /// v1.5 secondaries are one-to-one, so the second primary that
392    /// resolves to the same secondary key now returns
393    /// `NoxuError::Unsupported` from `update_secondary` rather than
394    /// silently overwriting (audit finding C4).  This test exercises a
395    /// JoinCursor over distinct primaries that share secondary keys —
396    /// the v1.6 sorted-dup feature.  Re-enable when sorted-dup
397    /// secondaries land (audit finding F7).
398    #[test]
399    #[ignore = "requires v1.6 sorted-dup secondaries; see Decision 1B / audit F7"]
400    fn test_join_intersection_finds_single_match() {
401        let fix = Fixture::new();
402        // In the one-to-one model sec1['A'] stores the last inserted 'A' record.
403        // Insert pk1 last so it is the record held by sec1['A'] and sec2['B'].
404        fix.insert(b"pk2", b"AC");
405        fix.insert(b"pk3", b"XB");
406        fix.insert(b"pk1", b"AB");
407
408        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
409        {
410            let mut p_key = DatabaseEntry::new();
411            let mut data = DatabaseEntry::new();
412            let s = cursor1
413                .get_search_key(
414                    &DatabaseEntry::from_bytes(b"A"),
415                    &mut p_key,
416                    &mut data,
417                )
418                .unwrap();
419            assert_eq!(s, OperationStatus::Success);
420        }
421
422        let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
423        {
424            let mut p_key = DatabaseEntry::new();
425            let mut data = DatabaseEntry::new();
426            let s = cursor2
427                .get_search_key(
428                    &DatabaseEntry::from_bytes(b"B"),
429                    &mut p_key,
430                    &mut data,
431                )
432                .unwrap();
433            assert_eq!(s, OperationStatus::Success);
434        }
435
436        let pri_guard = fix.primary.lock();
437        let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
438
439        let mut key = DatabaseEntry::new();
440        let mut data = DatabaseEntry::new();
441        let status = join.get_next(&mut key, &mut data).unwrap();
442        assert_eq!(status, OperationStatus::Success);
443        assert_eq!(key.get_data().unwrap(), b"pk1");
444        assert_eq!(data.get_data().unwrap(), b"AB");
445
446        // No more results.
447        let status2 = join.get_next(&mut key, &mut data).unwrap();
448        assert_eq!(status2, OperationStatus::NotFound);
449    }
450
451    /// Join over an empty secondary cursor returns NotFound immediately.
452    #[test]
453    fn test_join_empty_cursor_returns_not_found() {
454        let fix = Fixture::new();
455
456        let cursor1 = fix.sec1.open_cursor(None, None).unwrap();
457        let cursor2 = fix.sec2.open_cursor(None, None).unwrap();
458
459        // Cursors not positioned (no records) → join returns NotFound.
460        let pri_guard = fix.primary.lock();
461        let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
462
463        let mut key = DatabaseEntry::new();
464        let mut data = DatabaseEntry::new();
465        let status = join.get_next(&mut key, &mut data).unwrap();
466        assert_eq!(status, OperationStatus::NotFound);
467    }
468
469    /// `get_next_key` returns only the primary key without primary data.
470    #[test]
471    fn test_join_get_next_key_only() {
472        let fix = Fixture::new();
473        fix.insert(b"mypk", b"AB");
474
475        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
476        {
477            let mut p_key = DatabaseEntry::new();
478            let mut data = DatabaseEntry::new();
479            cursor1
480                .get_search_key(
481                    &DatabaseEntry::from_bytes(b"A"),
482                    &mut p_key,
483                    &mut data,
484                )
485                .unwrap();
486        }
487        let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
488        {
489            let mut p_key = DatabaseEntry::new();
490            let mut data = DatabaseEntry::new();
491            cursor2
492                .get_search_key(
493                    &DatabaseEntry::from_bytes(b"B"),
494                    &mut p_key,
495                    &mut data,
496                )
497                .unwrap();
498        }
499
500        let pri_guard = fix.primary.lock();
501        let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
502
503        let mut key = DatabaseEntry::new();
504        let status = join.get_next_key(&mut key).unwrap();
505        assert_eq!(status, OperationStatus::Success);
506        assert_eq!(key.get_data().unwrap(), b"mypk");
507    }
508
509    /// `no_sort = true` preserves cursor order and still finds the match.
510    #[test]
511    fn test_join_config_no_sort() {
512        let fix = Fixture::new();
513        fix.insert(b"pk1", b"AB");
514
515        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
516        {
517            let mut p_key = DatabaseEntry::new();
518            let mut data = DatabaseEntry::new();
519            cursor1
520                .get_search_key(
521                    &DatabaseEntry::from_bytes(b"A"),
522                    &mut p_key,
523                    &mut data,
524                )
525                .unwrap();
526        }
527        let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
528        {
529            let mut p_key = DatabaseEntry::new();
530            let mut data = DatabaseEntry::new();
531            cursor2
532                .get_search_key(
533                    &DatabaseEntry::from_bytes(b"B"),
534                    &mut p_key,
535                    &mut data,
536                )
537                .unwrap();
538        }
539
540        let config = JoinConfig::new().with_no_sort(true);
541        let pri_guard = fix.primary.lock();
542        let mut join =
543            pri_guard.join(vec![cursor1, cursor2], Some(config)).unwrap();
544        assert!(join.get_config().no_sort);
545
546        let mut key = DatabaseEntry::new();
547        let mut data = DatabaseEntry::new();
548        let status = join.get_next(&mut key, &mut data).unwrap();
549        assert_eq!(status, OperationStatus::Success);
550        assert_eq!(key.get_data().unwrap(), b"pk1");
551    }
552
553    /// No match when secondary keys do not overlap.
554    #[test]
555    fn test_join_no_intersection() {
556        let fix = Fixture::new();
557        // pk1: first='A', last='A'
558        // pk2: first='B', last='B'
559        // sec1 at 'A' → pk1, sec2 at 'B' → pk2 — no intersection.
560        fix.insert(b"pk1", b"AA");
561        fix.insert(b"pk2", b"BB");
562
563        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
564        {
565            let mut p_key = DatabaseEntry::new();
566            let mut data = DatabaseEntry::new();
567            cursor1
568                .get_search_key(
569                    &DatabaseEntry::from_bytes(b"A"),
570                    &mut p_key,
571                    &mut data,
572                )
573                .unwrap();
574        }
575        let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
576        {
577            let mut p_key = DatabaseEntry::new();
578            let mut data = DatabaseEntry::new();
579            cursor2
580                .get_search_key(
581                    &DatabaseEntry::from_bytes(b"B"),
582                    &mut p_key,
583                    &mut data,
584                )
585                .unwrap();
586        }
587
588        let pri_guard = fix.primary.lock();
589        let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
590
591        let mut key = DatabaseEntry::new();
592        let mut data = DatabaseEntry::new();
593        let status = join.get_next(&mut key, &mut data).unwrap();
594        assert_eq!(status, OperationStatus::NotFound);
595    }
596
597    /// Single-cursor join acts as a filtered scan over one secondary.
598    #[test]
599    fn test_join_single_cursor() {
600        let fix = Fixture::new();
601        fix.insert(b"pk1", b"AB");
602
603        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
604        {
605            let mut p_key = DatabaseEntry::new();
606            let mut data = DatabaseEntry::new();
607            cursor1
608                .get_search_key(
609                    &DatabaseEntry::from_bytes(b"A"),
610                    &mut p_key,
611                    &mut data,
612                )
613                .unwrap();
614        }
615
616        let pri_guard = fix.primary.lock();
617        let mut join = pri_guard.join(vec![cursor1], None).unwrap();
618
619        let mut key = DatabaseEntry::new();
620        let mut data = DatabaseEntry::new();
621        let status = join.get_next(&mut key, &mut data).unwrap();
622        assert_eq!(status, OperationStatus::Success);
623        assert_eq!(key.get_data().unwrap(), b"pk1");
624    }
625
626    /// `get_database()` returns the primary database.
627    #[test]
628    fn test_join_get_database() {
629        let fix = Fixture::new();
630        fix.insert(b"pk1", b"AB");
631
632        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
633        {
634            let mut p_key = DatabaseEntry::new();
635            let mut data = DatabaseEntry::new();
636            cursor1
637                .get_search_key(
638                    &DatabaseEntry::from_bytes(b"A"),
639                    &mut p_key,
640                    &mut data,
641                )
642                .unwrap();
643        }
644
645        let pri_guard = fix.primary.lock();
646        let join = pri_guard.join(vec![cursor1], None).unwrap();
647        assert_eq!(join.get_database().get_database_name(), "primary");
648    }
649
650    /// `close()` releases the cursors without panicking.
651    #[test]
652    fn test_join_close() {
653        let fix = Fixture::new();
654        fix.insert(b"pk1", b"AB");
655
656        let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
657        {
658            let mut p_key = DatabaseEntry::new();
659            let mut data = DatabaseEntry::new();
660            cursor1
661                .get_search_key(
662                    &DatabaseEntry::from_bytes(b"A"),
663                    &mut p_key,
664                    &mut data,
665                )
666                .unwrap();
667        }
668
669        let pri_guard = fix.primary.lock();
670        let join = pri_guard.join(vec![cursor1], None).unwrap();
671        join.close(); // must not panic
672    }
673}