use crate::database::Database;
use crate::database_entry::DatabaseEntry;
use crate::error::Result;
use crate::join_config::JoinConfig;
use crate::operation_status::OperationStatus;
use crate::secondary_cursor::SecondaryCursor;
pub struct JoinCursor<'a> {
primary_db: &'a Database,
cursors: Vec<SecondaryCursor<'a>>,
config: JoinConfig,
candidates: std::collections::VecDeque<Vec<u8>>,
exhausted: bool,
}
impl<'a> JoinCursor<'a> {
pub(crate) fn new(
primary_db: &'a Database,
mut cursors: Vec<SecondaryCursor<'a>>,
config: Option<JoinConfig>,
) -> Result<Self> {
let config = config.unwrap_or_default();
if !config.no_sort && cursors.len() > 1 {
let estimates: Vec<u64> =
cursors.iter_mut().map(|c| c.count_estimate()).collect();
let mut indexed: Vec<(usize, u64)> =
estimates.iter().copied().enumerate().collect();
indexed.sort_by_key(|&(_, est)| est);
let order: Vec<usize> =
indexed.into_iter().map(|(i, _)| i).collect();
let mut sorted = Vec::with_capacity(cursors.len());
let mut slots: Vec<Option<SecondaryCursor<'a>>> =
cursors.into_iter().map(Some).collect();
for idx in order {
sorted.push(slots[idx].take().unwrap());
}
cursors = sorted;
}
let mut candidates = std::collections::VecDeque::new();
if let Some(first) = cursors.first_mut()
&& let Some(pk) = first.get_current_primary_key_only()?
{
candidates.push_back(pk);
while first.get_next_dup()? == OperationStatus::Success {
if let Some(pk_extra) = first.get_current_primary_key_only()? {
candidates.push_back(pk_extra);
}
}
}
let exhausted = candidates.is_empty();
Ok(Self { primary_db, cursors, config, candidates, exhausted })
}
pub fn get_next(
&mut self,
key: &mut DatabaseEntry,
data: &mut DatabaseEntry,
) -> Result<OperationStatus> {
loop {
let candidate = match self.next_matching_candidate()? {
Some(c) => c,
None => return Ok(OperationStatus::NotFound),
};
let pri_key_entry = DatabaseEntry::from_bytes(&candidate);
let status = self.primary_db.get(None, &pri_key_entry, data)?;
if status != OperationStatus::Success {
continue;
}
key.set_data(&candidate);
return Ok(OperationStatus::Success);
}
}
pub fn get_next_key(
&mut self,
key: &mut DatabaseEntry,
) -> Result<OperationStatus> {
match self.next_matching_candidate()? {
None => Ok(OperationStatus::NotFound),
Some(candidate) => {
key.set_data(&candidate);
Ok(OperationStatus::Success)
}
}
}
pub fn close(self) {
}
pub fn get_database(&self) -> &Database {
self.primary_db
}
pub fn get_config(&self) -> JoinConfig {
self.config.clone()
}
fn next_matching_candidate(&mut self) -> Result<Option<Vec<u8>>> {
if self.exhausted {
return Ok(None);
}
loop {
if self.candidates.is_empty() {
match self.cursors[0].get_next_dup()? {
OperationStatus::Success => {
if let Some(pk) =
self.cursors[0].get_current_primary_key_only()?
{
self.candidates.push_back(pk);
}
}
_ => {
self.exhausted = true;
return Ok(None);
}
}
}
let candidate = match self.candidates.pop_front() {
Some(c) => c,
None => {
self.exhausted = true;
return Ok(None);
}
};
let mut all_match = true;
for cursor in &mut self.cursors[1..] {
if !cursor.has_candidate_primary_key(&candidate)? {
all_match = false;
break;
}
}
if all_match {
return Ok(Some(candidate));
}
}
}
}
impl Drop for JoinCursor<'_> {
fn drop(&mut self) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database::Database;
use crate::database_config::DatabaseConfig;
use crate::environment::Environment;
use crate::environment_config::EnvironmentConfig;
use crate::secondary_config::{SecondaryConfig, SecondaryKeyCreator};
use crate::secondary_database::SecondaryDatabase;
use noxu_sync::Mutex;
use std::sync::Arc;
use tempfile::TempDir;
struct FirstByteCreator;
impl SecondaryKeyCreator for FirstByteCreator {
fn create_secondary_key(
&self,
_db: &Database,
_key: &DatabaseEntry,
data: &DatabaseEntry,
result: &mut DatabaseEntry,
) -> bool {
if let Some(d) = data.get_data()
&& !d.is_empty()
{
result.set_data(&d[..1]);
true
} else {
false
}
}
}
struct LastByteCreator;
impl SecondaryKeyCreator for LastByteCreator {
fn create_secondary_key(
&self,
_db: &Database,
_key: &DatabaseEntry,
data: &DatabaseEntry,
result: &mut DatabaseEntry,
) -> bool {
if let Some(d) = data.get_data()
&& !d.is_empty()
{
result.set_data(&d[d.len() - 1..]);
true
} else {
false
}
}
}
struct Fixture {
_tmp: TempDir,
_env: Environment,
primary: Arc<Mutex<Database>>,
sec1: SecondaryDatabase,
sec2: SecondaryDatabase,
}
impl Fixture {
fn new() -> Self {
let tmp = TempDir::new().unwrap();
let env_cfg = EnvironmentConfig::new(tmp.path().to_path_buf())
.with_allow_create(true)
.with_transactional(true);
let env = Environment::open(env_cfg).unwrap();
let db_cfg = DatabaseConfig::new().with_allow_create(true);
let pri_db = env.open_database(None, "primary", &db_cfg).unwrap();
let primary = Arc::new(Mutex::new(pri_db));
let sec_db_cfg = DatabaseConfig::new()
.with_allow_create(true)
.with_sorted_duplicates(true);
let sec1_store =
env.open_database(None, "sec1", &sec_db_cfg).unwrap();
let sec1 = SecondaryDatabase::open(
Arc::clone(&primary),
sec1_store,
SecondaryConfig::new()
.with_allow_create(true)
.with_key_creator(Box::new(FirstByteCreator)),
)
.unwrap();
let sec2_store =
env.open_database(None, "sec2", &sec_db_cfg).unwrap();
let sec2 = SecondaryDatabase::open(
Arc::clone(&primary),
sec2_store,
SecondaryConfig::new()
.with_allow_create(true)
.with_key_creator(Box::new(LastByteCreator)),
)
.unwrap();
Fixture { _tmp: tmp, _env: env, primary, sec1, sec2 }
}
fn insert(&self, pk: &[u8], val: &[u8]) {
let k = DatabaseEntry::from_bytes(pk);
let v = DatabaseEntry::from_bytes(val);
self.primary.lock().put(None, &k, &v).unwrap();
self.sec1.update_secondary(None, &k, None, Some(&v)).unwrap();
self.sec2.update_secondary(None, &k, None, Some(&v)).unwrap();
}
}
#[test]
#[ignore = "requires v1.6 sorted-dup secondaries; see Decision 1B / audit F7"]
fn test_join_intersection_finds_single_match() {
let fix = Fixture::new();
fix.insert(b"pk2", b"AC");
fix.insert(b"pk3", b"XB");
fix.insert(b"pk1", b"AB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let s = cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
assert_eq!(s, OperationStatus::Success);
}
let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let s = cursor2
.get_search_key(
&DatabaseEntry::from_bytes(b"B"),
&mut p_key,
&mut data,
)
.unwrap();
assert_eq!(s, OperationStatus::Success);
}
let pri_guard = fix.primary.lock();
let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
let mut key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = join.get_next(&mut key, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(key.get_data().unwrap(), b"pk1");
assert_eq!(data.get_data().unwrap(), b"AB");
let status2 = join.get_next(&mut key, &mut data).unwrap();
assert_eq!(status2, OperationStatus::NotFound);
}
#[test]
fn test_join_empty_cursor_returns_not_found() {
let fix = Fixture::new();
let cursor1 = fix.sec1.open_cursor(None, None).unwrap();
let cursor2 = fix.sec2.open_cursor(None, None).unwrap();
let pri_guard = fix.primary.lock();
let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
let mut key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = join.get_next(&mut key, &mut data).unwrap();
assert_eq!(status, OperationStatus::NotFound);
}
#[test]
fn test_join_get_next_key_only() {
let fix = Fixture::new();
fix.insert(b"mypk", b"AB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
}
let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor2
.get_search_key(
&DatabaseEntry::from_bytes(b"B"),
&mut p_key,
&mut data,
)
.unwrap();
}
let pri_guard = fix.primary.lock();
let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
let mut key = DatabaseEntry::new();
let status = join.get_next_key(&mut key).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(key.get_data().unwrap(), b"mypk");
}
#[test]
fn test_join_config_no_sort() {
let fix = Fixture::new();
fix.insert(b"pk1", b"AB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
}
let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor2
.get_search_key(
&DatabaseEntry::from_bytes(b"B"),
&mut p_key,
&mut data,
)
.unwrap();
}
let config = JoinConfig::new().with_no_sort(true);
let pri_guard = fix.primary.lock();
let mut join =
pri_guard.join(vec![cursor1, cursor2], Some(config)).unwrap();
assert!(join.get_config().no_sort);
let mut key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = join.get_next(&mut key, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(key.get_data().unwrap(), b"pk1");
}
#[test]
fn test_join_no_intersection() {
let fix = Fixture::new();
fix.insert(b"pk1", b"AA");
fix.insert(b"pk2", b"BB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
}
let mut cursor2 = fix.sec2.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor2
.get_search_key(
&DatabaseEntry::from_bytes(b"B"),
&mut p_key,
&mut data,
)
.unwrap();
}
let pri_guard = fix.primary.lock();
let mut join = pri_guard.join(vec![cursor1, cursor2], None).unwrap();
let mut key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = join.get_next(&mut key, &mut data).unwrap();
assert_eq!(status, OperationStatus::NotFound);
}
#[test]
fn test_join_single_cursor() {
let fix = Fixture::new();
fix.insert(b"pk1", b"AB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
}
let pri_guard = fix.primary.lock();
let mut join = pri_guard.join(vec![cursor1], None).unwrap();
let mut key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = join.get_next(&mut key, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(key.get_data().unwrap(), b"pk1");
}
#[test]
fn test_join_get_database() {
let fix = Fixture::new();
fix.insert(b"pk1", b"AB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
}
let pri_guard = fix.primary.lock();
let join = pri_guard.join(vec![cursor1], None).unwrap();
assert_eq!(join.get_database().get_database_name(), "primary");
}
#[test]
fn test_join_close() {
let fix = Fixture::new();
fix.insert(b"pk1", b"AB");
let mut cursor1 = fix.sec1.open_cursor(None, None).unwrap();
{
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
cursor1
.get_search_key(
&DatabaseEntry::from_bytes(b"A"),
&mut p_key,
&mut data,
)
.unwrap();
}
let pri_guard = fix.primary.lock();
let join = pri_guard.join(vec![cursor1], None).unwrap();
join.close(); }
}