iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

/// Registers a bitmap index called `index_name` over `field_path` in the
/// manifest, then persists the updated manifest.
///
/// # Errors
/// Propagates any error from registering the index or from persisting the
/// manifest. If registration fails, persistence is not attempted.
pub fn create_bitmap_index(
    handle: &mut StorageHandle,
    index_name: &str,
    field_path: &str,
) -> Result<()> {
    let manifest = &mut handle.manifest;
    manifest.register_bitmap_index(index_name, field_path)?;
    manifest.persist()?;
    Ok(())
}

/// Returns the bitmap index descriptors currently recorded in the manifest.
pub fn list_bitmap_indexes(handle: &StorageHandle) -> Vec<manifest::BitmapIndexDescriptor> {
    let manifest = &handle.manifest;
    manifest.bitmap_indexes()
}

/// Adds `node_id` to the posting list of `value_key` in bitmap index `index_name`.
///
/// Delegates to the shared `apply_bitmap_posting` mutation path and discards
/// its success value.
///
/// # Errors
/// Propagates any error returned by `apply_bitmap_posting`.
pub fn bitmap_add_posting(
    handle: &mut StorageHandle,
    index_name: &str,
    value_key: &str,
    node_id: u64,
) -> Result<()> {
    // NOTE(review): `true` presumably selects insertion (vs. removal) in
    // `apply_bitmap_posting`; confirm against that helper.
    let insert = true;
    apply_bitmap_posting(handle, index_name, value_key, node_id, insert)?;
    Ok(())
}

/// Returns the postings the bitmap store reports for `value_key` under
/// bitmap index `index_name`.
pub fn bitmap_postings(handle: &StorageHandle, index_name: &str, value_key: &str) -> Vec<u64> {
    let store = &handle.bitmap_store;
    store.postings(index_name, value_key)
}

/// Returns the postings for `value_key` in bitmap index `index_name` restricted
/// to `range_start..range_end_exclusive`, optionally capped by `limit`.
///
/// This is a thin pass-through: range and limit semantics are whatever the
/// bitmap store's `postings_in_range_limit` implements.
pub fn bitmap_postings_in_range_limit(
    handle: &StorageHandle,
    index_name: &str,
    value_key: &str,
    range_start: u64,
    range_end_exclusive: u64,
    limit: Option<usize>,
) -> Vec<u64> {
    let store = &handle.bitmap_store;
    store.postings_in_range_limit(index_name, value_key, range_start, range_end_exclusive, limit)
}

/// Returns the postings for `value_key` in bitmap index `index_name` within
/// `range_start..range_end_exclusive` (as interpreted by the bitmap store)
/// that are owned by `request`.
///
/// `limit`, when `Some(n)`, caps the number of *owned* postings returned at `n`.
/// The ownership filter is applied before the cap. The underlying range query
/// is issued without a limit, so postings owned by other requests cannot use
/// up the budget and hide owned postings that exist further along the range.
/// `Some(0)` yields an empty result without querying the store.
///
/// Order is preserved from the store's result.
pub fn bitmap_postings_in_range_limit_for_request(
    handle: &StorageHandle,
    index_name: &str,
    value_key: &str,
    range_start: u64,
    range_end_exclusive: u64,
    limit: Option<usize>,
    request: &ThreadCoreRequest,
) -> Vec<u64> {
    let cap = limit.unwrap_or(usize::MAX);
    if cap == 0 {
        return Vec::new();
    }
    // NOTE(review): fetching the whole range trades extra work for correct
    // results; if ranges can be very large, consider paging with a growing
    // inner limit until `cap` owned postings are found or the range is exhausted.
    bitmap_postings_in_range_limit(
        handle,
        index_name,
        value_key,
        range_start,
        range_end_exclusive,
        None,
    )
    .into_iter()
    .filter(|&node_id| request_owns_node(request, node_id))
    .take(cap)
    .collect()
}

/// Assembles the logical view of `node_id` from the memtable and the L0 runs.
///
/// The logical-node cache is consulted first. A hit is returned as-is without
/// touching metrics. On a miss, entries are gathered from the memtable and then
/// from each path in `handle.l0_runs`, visited from last to first.
///
/// The resulting node contains:
/// - `full`: the `FullNode` entry with the highest version. On a version tie,
///   the entry visited first is kept.
/// - `deltas`: every non-`FullNode` entry whose version is at least that full
///   entry's version (all of them when no full entry exists), in visit order.
///
/// When no entries exist at all, an empty node is returned and nothing is cached.
/// Otherwise `metrics.logical_bytes_read` grows by the value sizes of `full` and
/// all `deltas`, and the node is stored in the cache.
///
/// # Errors
/// Propagates any error from reading an L0 run.
pub fn get_logical_node(handle: &mut StorageHandle, node_id: u64) -> Result<LogicalNode> {
    if let Some(cached) = cache_get_logical_node(handle, node_id) {
        return Ok(cached);
    }

    let mut entries: Vec<sstable::Entry> = handle.memtable.entries_for_key(node_id);
    // Snapshot the run list first: reading a run needs `&mut handle`.
    // NOTE(review): reversing presumably visits the newest run first; confirm
    // the ordering of `l0_runs`.
    let reversed_runs: Vec<std::path::PathBuf> = handle.l0_runs.iter().rev().cloned().collect();
    for run_path in &reversed_runs {
        let run_entries = read_entries_for_key_from_run(handle, run_path, node_id)?;
        entries.extend(run_entries);
    }

    if entries.is_empty() {
        return Ok(LogicalNode {
            node_id,
            full: None,
            deltas: Vec::new(),
        });
    }

    // Highest-versioned FullNode; `>=` keeps the earlier-visited entry on a tie.
    let full: Option<sstable::Entry> = entries
        .iter()
        .filter(|entry| entry.kind == sstable::EntryKind::FullNode)
        .fold(None, |best: Option<&sstable::Entry>, candidate| match best {
            Some(current) if current.version >= candidate.version => Some(current),
            _ => Some(candidate),
        })
        .cloned();

    let floor = full.as_ref().map_or(0, |entry| entry.version);
    let deltas: Vec<sstable::Entry> = entries
        .into_iter()
        .filter(|entry| entry.kind != sstable::EntryKind::FullNode && entry.version >= floor)
        .collect();

    let bytes_read: u64 = full
        .iter()
        .chain(deltas.iter())
        .map(|entry| entry.value.len() as u64)
        .sum();
    handle.metrics.logical_bytes_read += bytes_read;

    let logical = LogicalNode {
        node_id,
        full,
        deltas,
    };
    cache_put_logical_node(handle, node_id, logical.clone());
    Ok(logical)
}

/// Like `get_logical_node`, but scoped to the nodes owned by `request`.
///
/// A node not owned by `request` is reported as an empty `LogicalNode` (no full
/// entry, no deltas) rather than as an error. The storage layer is not consulted
/// in that case.
///
/// # Errors
/// Propagates any error from `get_logical_node` for owned nodes.
pub fn get_logical_node_for_request(
    handle: &mut StorageHandle,
    node_id: u64,
    request: &ThreadCoreRequest,
) -> Result<LogicalNode> {
    if request_owns_node(request, node_id) {
        get_logical_node(handle, node_id)
    } else {
        Ok(LogicalNode {
            node_id,
            full: None,
            deltas: Vec::new(),
        })
    }
}

/// Computes a lightweight summary of `node_id` without materialising a `LogicalNode`.
///
/// On a logical-node cache hit, the summary is derived from the cached node and
/// metrics are untouched. On a miss, entries are gathered from the memtable and
/// from each path in `handle.l0_runs` (visited last to first). This path does
/// not populate the cache.
///
/// Returns `Ok(None)` when no entries exist for the node. Otherwise:
/// - `has_full`: whether any `FullNode` entry exists.
/// - `adjacency_degree`: the byte length of the highest-versioned `FullNode`
///   value divided by 8 (0 without a full entry). On a version tie, the entry
///   visited first is kept.
/// - `delta_count`: the number of non-`FullNode` entries whose version is at
///   least that full entry's version (all of them when no full entry exists).
///
/// `metrics.logical_bytes_read` grows by the selected full value size plus the
/// value sizes of the counted deltas.
///
/// # Errors
/// Propagates any error from reading an L0 run.
pub fn get_node_row_summary(
    handle: &mut StorageHandle,
    node_id: u64,
) -> Result<Option<NodeRowSummary>> {
    if let Some(cached) = cache_get_logical_node(handle, node_id) {
        return Ok(Some(node_row_summary_from_logical(&cached)));
    }

    let mut entries: Vec<sstable::Entry> = Vec::new();
    entries.extend(handle.memtable.entries_for_key(node_id));
    // Snapshot the run list first: reading a run needs `&mut handle`.
    let reversed_runs: Vec<std::path::PathBuf> = handle.l0_runs.iter().rev().cloned().collect();
    for run_path in &reversed_runs {
        entries.extend(read_entries_for_key_from_run(handle, run_path, node_id)?);
    }

    if entries.is_empty() {
        return Ok(None);
    }

    // Highest-versioned FullNode; `>=` keeps the earlier-visited entry on a tie.
    let newest_full = entries
        .iter()
        .filter(|entry| entry.kind == sstable::EntryKind::FullNode)
        .fold(None, |best: Option<&sstable::Entry>, candidate| match best {
            Some(current) if current.version >= candidate.version => Some(current),
            _ => Some(candidate),
        });

    // NOTE(review): `/ 8` assumes the full value is a packed array of 8-byte
    // neighbour ids; confirm against the FullNode encoding.
    let (has_full, adjacency_degree, floor) = match newest_full {
        Some(full) => {
            let full_bytes = full.value.len();
            handle.metrics.logical_bytes_read += full_bytes as u64;
            (true, full_bytes / 8, full.version)
        }
        None => (false, 0_usize, 0_u64),
    };

    let (delta_count, delta_bytes) = entries
        .iter()
        .filter(|entry| entry.kind != sstable::EntryKind::FullNode && entry.version >= floor)
        .fold((0_usize, 0_u64), |(count, bytes), entry| {
            (count + 1, bytes + entry.value.len() as u64)
        });
    handle.metrics.logical_bytes_read += delta_bytes;

    Ok(Some(NodeRowSummary {
        has_full,
        delta_count,
        adjacency_degree,
    }))
}

/// Like `get_node_row_summary`, but scoped to the nodes owned by `request`.
///
/// A node not owned by `request` yields `Ok(None)` without consulting storage.
///
/// # Errors
/// Propagates any error from `get_node_row_summary` for owned nodes.
pub fn get_node_row_summary_for_request(
    handle: &mut StorageHandle,
    node_id: u64,
    request: &ThreadCoreRequest,
) -> Result<Option<NodeRowSummary>> {
    if request_owns_node(request, node_id) {
        get_node_row_summary(handle, node_id)
    } else {
        Ok(None)
    }
}

/// Returns the entries for `key` stored in the SSTable run at `path`.
///
/// A table already in `handle.sstable_cache` is queried directly. Otherwise the
/// table is loaded through the reactor and `metrics.sstable_bytes_read` grows by
/// the file length the reactor reports. The loaded table is then queried and
/// inserted into the cache for later lookups.
///
/// # Errors
/// Propagates any error from loading the SSTable. A failure to read the file
/// length only skips the metric update.
fn read_entries_for_key_from_run(
    handle: &mut StorageHandle,
    path: &std::path::Path,
    key: u64,
) -> Result<Vec<sstable::Entry>> {
    if let Some(cached_table) = handle.sstable_cache.get(path) {
        return Ok(sstable::read_entries_for_key_in_table(cached_table, key));
    }

    let loaded_table = sstable::read_sstable_with_reactor(path, handle.reactor.as_ref())?;
    // Best-effort accounting: an unreadable length contributes nothing.
    handle.metrics.sstable_bytes_read += handle.reactor.metadata_len(path).unwrap_or(0);
    let matching = sstable::read_entries_for_key_in_table(&loaded_table, key);
    handle.sstable_cache.insert(path.to_path_buf(), loaded_table);
    Ok(matching)
}