1use crate::database::Database;
48use crate::database_entry::DatabaseEntry;
49use crate::error::Result;
50use crate::join_config::JoinConfig;
51use crate::operation_status::OperationStatus;
52use crate::secondary_cursor::SecondaryCursor;
53
54pub struct JoinCursor<'a> {
63 primary_db: &'a Database,
65 cursors: Vec<SecondaryCursor<'a>>,
67 config: JoinConfig,
68 candidates: std::collections::VecDeque<Vec<u8>>,
70 exhausted: bool,
72}
73
74impl<'a> JoinCursor<'a> {
75 pub(crate) fn new(
80 primary_db: &'a Database,
81 mut cursors: Vec<SecondaryCursor<'a>>,
82 config: Option<JoinConfig>,
83 ) -> Result<Self> {
84 let config = config.unwrap_or_default();
85
86 if !config.no_sort && cursors.len() > 1 {
87 let estimates: Vec<u64> =
89 cursors.iter_mut().map(|c| c.count_estimate()).collect();
90 let mut indexed: Vec<(usize, u64)> =
92 estimates.iter().copied().enumerate().collect();
93 indexed.sort_by_key(|&(_, est)| est);
94 let order: Vec<usize> =
95 indexed.into_iter().map(|(i, _)| i).collect();
96 let mut sorted = Vec::with_capacity(cursors.len());
97 let mut slots: Vec<Option<SecondaryCursor<'a>>> =
98 cursors.into_iter().map(Some).collect();
99 for idx in order {
100 sorted.push(slots[idx].take().unwrap());
101 }
102 cursors = sorted;
103 }
104
105 let mut candidates = std::collections::VecDeque::new();
109 if let Some(first) = cursors.first_mut()
110 && let Some(pk) = first.get_current_primary_key_only()?
111 {
112 candidates.push_back(pk);
113 while first.get_next_dup()? == OperationStatus::Success {
118 if let Some(pk_extra) = first.get_current_primary_key_only()? {
119 candidates.push_back(pk_extra);
120 }
121 }
122 }
123
124 let exhausted = candidates.is_empty();
125 Ok(Self { primary_db, cursors, config, candidates, exhausted })
126 }
127
128 pub fn get_next(
133 &mut self,
134 key: &mut DatabaseEntry,
135 data: &mut DatabaseEntry,
136 ) -> Result<OperationStatus> {
137 loop {
138 let candidate = match self.next_matching_candidate()? {
139 Some(c) => c,
140 None => return Ok(OperationStatus::NotFound),
141 };
142
143 let pri_key_entry = DatabaseEntry::from_bytes(&candidate);
145 let status = self.primary_db.get(None, &pri_key_entry, data)?;
146 if status != OperationStatus::Success {
147 continue;
149 }
150 key.set_data(&candidate);
151 return Ok(OperationStatus::Success);
152 }
153 }
154
155 pub fn get_next_key(
161 &mut self,
162 key: &mut DatabaseEntry,
163 ) -> Result<OperationStatus> {
164 match self.next_matching_candidate()? {
165 None => Ok(OperationStatus::NotFound),
166 Some(candidate) => {
167 key.set_data(&candidate);
168 Ok(OperationStatus::Success)
169 }
170 }
171 }
172
173 pub fn close(self) {
175 }
177
178 pub fn get_database(&self) -> &Database {
180 self.primary_db
181 }
182
183 pub fn get_config(&self) -> JoinConfig {
185 self.config.clone()
186 }
187
188 fn next_matching_candidate(&mut self) -> Result<Option<Vec<u8>>> {
201 if self.exhausted {
202 return Ok(None);
203 }
204
205 loop {
206 if self.candidates.is_empty() {
208 match self.cursors[0].get_next_dup()? {
209 OperationStatus::Success => {
210 if let Some(pk) =
211 self.cursors[0].get_current_primary_key_only()?
212 {
213 self.candidates.push_back(pk);
214 }
215 }
216 _ => {
217 self.exhausted = true;
218 return Ok(None);
219 }
220 }
221 }
222
223 let candidate = match self.candidates.pop_front() {
224 Some(c) => c,
225 None => {
226 self.exhausted = true;
227 return Ok(None);
228 }
229 };
230
231 let mut all_match = true;
233 for cursor in &mut self.cursors[1..] {
234 if !cursor.has_candidate_primary_key(&candidate)? {
235 all_match = false;
236 break;
237 }
238 }
239
240 if all_match {
241 return Ok(Some(candidate));
242 }
243 }
245 }
246}
247
248impl Drop for JoinCursor<'_> {
249 fn drop(&mut self) {
250 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use crate::database::Database;
258 use crate::database_config::DatabaseConfig;
259 use crate::environment::Environment;
260 use crate::environment_config::EnvironmentConfig;
261 use crate::secondary_config::{SecondaryConfig, SecondaryKeyCreator};
262 use crate::secondary_database::SecondaryDatabase;
263 use noxu_sync::Mutex;
264 use std::sync::Arc;
265 use tempfile::TempDir;
266
267 struct FirstByteCreator;
273 impl SecondaryKeyCreator for FirstByteCreator {
274 fn create_secondary_key(
275 &self,
276 _db: &Database,
277 _key: &DatabaseEntry,
278 data: &DatabaseEntry,
279 result: &mut DatabaseEntry,
280 ) -> bool {
281 if let Some(d) = data.get_data()
282 && !d.is_empty()
283 {
284 result.set_data(&d[..1]);
285 true
286 } else {
287 false
288 }
289 }
290 }
291
292 struct LastByteCreator;
294 impl SecondaryKeyCreator for LastByteCreator {
295 fn create_secondary_key(
296 &self,
297 _db: &Database,
298 _key: &DatabaseEntry,
299 data: &DatabaseEntry,
300 result: &mut DatabaseEntry,
301 ) -> bool {
302 if let Some(d) = data.get_data()
303 && !d.is_empty()
304 {
305 result.set_data(&d[d.len() - 1..]);
306 true
307 } else {
308 false
309 }
310 }
311 }
312
313 struct Fixture {
318 _tmp: TempDir,
319 _env: Environment,
320 primary: Arc<Mutex<Database>>,
321 sec1: SecondaryDatabase,
322 sec2: SecondaryDatabase,
323 }
324
325 impl Fixture {
326 fn new() -> Self {
327 let tmp = TempDir::new().unwrap();
328 let env_cfg = EnvironmentConfig::new(tmp.path().to_path_buf())
329 .with_allow_create(true)
330 .with_transactional(true);
331 let env = Environment::open(env_cfg).unwrap();
332
333 let db_cfg = DatabaseConfig::new().with_allow_create(true);
334 let pri_db = env.open_database(None, "primary", &db_cfg).unwrap();
335 let primary = Arc::new(Mutex::new(pri_db));
336
337 let sec_db_cfg = DatabaseConfig::new()
339 .with_allow_create(true)
340 .with_sorted_duplicates(true);
341 let sec1_store =
342 env.open_database(None, "sec1", &sec_db_cfg).unwrap();
343 let sec1 = SecondaryDatabase::open(
344 Arc::clone(&primary),
345 sec1_store,
346 SecondaryConfig::new()
347 .with_allow_create(true)
348 .with_key_creator(Box::new(FirstByteCreator)),
349 )
350 .unwrap();
351
352 let sec2_store =
353 env.open_database(None, "sec2", &sec_db_cfg).unwrap();
354 let sec2 = SecondaryDatabase::open(
355 Arc::clone(&primary),
356 sec2_store,
357 SecondaryConfig::new()
358 .with_allow_create(true)
359 .with_key_creator(Box::new(LastByteCreator)),
360 )
361 .unwrap();
362
363 Fixture { _tmp: tmp, _env: env, primary, sec1, sec2 }
364 }
365
366 fn insert(&self, pk: &[u8], val: &[u8]) {
367 let k = DatabaseEntry::from_bytes(pk);
368 let v = DatabaseEntry::from_bytes(val);
369 self.primary.lock().put(None, &k, &v).unwrap();
370 self.sec1.update_secondary(None, &k, None, Some(&v)).unwrap();
371 self.sec2.update_secondary(None, &k, None, Some(&v)).unwrap();
372 }
373 }
374
375 #[test]
399 #[ignore = "requires v1.6 sorted-dup secondaries; see Decision 1B / audit F7"]
400 fn test_join_intersection_finds_single_match() {
401 let fix = Fixture::new();
402 fix.insert(b"pk2", b"AC");
405 fix.insert(b"pk3", b"XB");
406 fix.insert(b"pk1", b"AB");
407
408 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
409 {
410 let mut p_key = DatabaseEntry::new();
411 let mut data = DatabaseEntry::new();
412 let s = cursor1
413 .get_search_key(
414 &DatabaseEntry::from_bytes(b"A"),
415 &mut p_key,
416 &mut data,
417 )
418 .unwrap();
419 assert_eq!(s, OperationStatus::Success);
420 }
421
422 let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
423 {
424 let mut p_key = DatabaseEntry::new();
425 let mut data = DatabaseEntry::new();
426 let s = cursor2
427 .get_search_key(
428 &DatabaseEntry::from_bytes(b"B"),
429 &mut p_key,
430 &mut data,
431 )
432 .unwrap();
433 assert_eq!(s, OperationStatus::Success);
434 }
435
436 let pri_guard = fix.primary.lock();
437 let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
438
439 let mut key = DatabaseEntry::new();
440 let mut data = DatabaseEntry::new();
441 let status = join.get_next(&mut key, &mut data).unwrap();
442 assert_eq!(status, OperationStatus::Success);
443 assert_eq!(key.get_data().unwrap(), b"pk1");
444 assert_eq!(data.get_data().unwrap(), b"AB");
445
446 let status2 = join.get_next(&mut key, &mut data).unwrap();
448 assert_eq!(status2, OperationStatus::NotFound);
449 }
450
451 #[test]
453 fn test_join_empty_cursor_returns_not_found() {
454 let fix = Fixture::new();
455
456 let cursor1 = fix.sec1.open_cursor(None, None).unwrap();
457 let cursor2 = fix.sec2.open_cursor(None, None).unwrap();
458
459 let pri_guard = fix.primary.lock();
461 let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
462
463 let mut key = DatabaseEntry::new();
464 let mut data = DatabaseEntry::new();
465 let status = join.get_next(&mut key, &mut data).unwrap();
466 assert_eq!(status, OperationStatus::NotFound);
467 }
468
469 #[test]
471 fn test_join_get_next_key_only() {
472 let fix = Fixture::new();
473 fix.insert(b"mypk", b"AB");
474
475 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
476 {
477 let mut p_key = DatabaseEntry::new();
478 let mut data = DatabaseEntry::new();
479 cursor1
480 .get_search_key(
481 &DatabaseEntry::from_bytes(b"A"),
482 &mut p_key,
483 &mut data,
484 )
485 .unwrap();
486 }
487 let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
488 {
489 let mut p_key = DatabaseEntry::new();
490 let mut data = DatabaseEntry::new();
491 cursor2
492 .get_search_key(
493 &DatabaseEntry::from_bytes(b"B"),
494 &mut p_key,
495 &mut data,
496 )
497 .unwrap();
498 }
499
500 let pri_guard = fix.primary.lock();
501 let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
502
503 let mut key = DatabaseEntry::new();
504 let status = join.get_next_key(&mut key).unwrap();
505 assert_eq!(status, OperationStatus::Success);
506 assert_eq!(key.get_data().unwrap(), b"mypk");
507 }
508
509 #[test]
511 fn test_join_config_no_sort() {
512 let fix = Fixture::new();
513 fix.insert(b"pk1", b"AB");
514
515 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
516 {
517 let mut p_key = DatabaseEntry::new();
518 let mut data = DatabaseEntry::new();
519 cursor1
520 .get_search_key(
521 &DatabaseEntry::from_bytes(b"A"),
522 &mut p_key,
523 &mut data,
524 )
525 .unwrap();
526 }
527 let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
528 {
529 let mut p_key = DatabaseEntry::new();
530 let mut data = DatabaseEntry::new();
531 cursor2
532 .get_search_key(
533 &DatabaseEntry::from_bytes(b"B"),
534 &mut p_key,
535 &mut data,
536 )
537 .unwrap();
538 }
539
540 let config = JoinConfig::new().with_no_sort(true);
541 let pri_guard = fix.primary.lock();
542 let mut join =
543 pri_guard.join(vec![cursor1, cursor2], Some(config)).unwrap();
544 assert!(join.get_config().no_sort);
545
546 let mut key = DatabaseEntry::new();
547 let mut data = DatabaseEntry::new();
548 let status = join.get_next(&mut key, &mut data).unwrap();
549 assert_eq!(status, OperationStatus::Success);
550 assert_eq!(key.get_data().unwrap(), b"pk1");
551 }
552
553 #[test]
555 fn test_join_no_intersection() {
556 let fix = Fixture::new();
557 fix.insert(b"pk1", b"AA");
561 fix.insert(b"pk2", b"BB");
562
563 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
564 {
565 let mut p_key = DatabaseEntry::new();
566 let mut data = DatabaseEntry::new();
567 cursor1
568 .get_search_key(
569 &DatabaseEntry::from_bytes(b"A"),
570 &mut p_key,
571 &mut data,
572 )
573 .unwrap();
574 }
575 let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
576 {
577 let mut p_key = DatabaseEntry::new();
578 let mut data = DatabaseEntry::new();
579 cursor2
580 .get_search_key(
581 &DatabaseEntry::from_bytes(b"B"),
582 &mut p_key,
583 &mut data,
584 )
585 .unwrap();
586 }
587
588 let pri_guard = fix.primary.lock();
589 let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
590
591 let mut key = DatabaseEntry::new();
592 let mut data = DatabaseEntry::new();
593 let status = join.get_next(&mut key, &mut data).unwrap();
594 assert_eq!(status, OperationStatus::NotFound);
595 }
596
597 #[test]
599 fn test_join_single_cursor() {
600 let fix = Fixture::new();
601 fix.insert(b"pk1", b"AB");
602
603 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
604 {
605 let mut p_key = DatabaseEntry::new();
606 let mut data = DatabaseEntry::new();
607 cursor1
608 .get_search_key(
609 &DatabaseEntry::from_bytes(b"A"),
610 &mut p_key,
611 &mut data,
612 )
613 .unwrap();
614 }
615
616 let pri_guard = fix.primary.lock();
617 let mut join = pri_guard.join(vec![cursor1], None).unwrap();
618
619 let mut key = DatabaseEntry::new();
620 let mut data = DatabaseEntry::new();
621 let status = join.get_next(&mut key, &mut data).unwrap();
622 assert_eq!(status, OperationStatus::Success);
623 assert_eq!(key.get_data().unwrap(), b"pk1");
624 }
625
626 #[test]
628 fn test_join_get_database() {
629 let fix = Fixture::new();
630 fix.insert(b"pk1", b"AB");
631
632 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
633 {
634 let mut p_key = DatabaseEntry::new();
635 let mut data = DatabaseEntry::new();
636 cursor1
637 .get_search_key(
638 &DatabaseEntry::from_bytes(b"A"),
639 &mut p_key,
640 &mut data,
641 )
642 .unwrap();
643 }
644
645 let pri_guard = fix.primary.lock();
646 let join = pri_guard.join(vec![cursor1], None).unwrap();
647 assert_eq!(join.get_database().get_database_name(), "primary");
648 }
649
650 #[test]
652 fn test_join_close() {
653 let fix = Fixture::new();
654 fix.insert(b"pk1", b"AB");
655
656 let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
657 {
658 let mut p_key = DatabaseEntry::new();
659 let mut data = DatabaseEntry::new();
660 cursor1
661 .get_search_key(
662 &DatabaseEntry::from_bytes(b"A"),
663 &mut p_key,
664 &mut data,
665 )
666 .unwrap();
667 }
668
669 let pri_guard = fix.primary.lock();
670 let join = pri_guard.join(vec![cursor1], None).unwrap();
671 join.close(); }
673}