use super::*;
/// Registers a bitmap index called `index_name` over `field_path` in the
/// handle's manifest and then persists the manifest.
///
/// # Errors
///
/// Propagates any error returned by `register_bitmap_index` or by `persist`.
/// Nothing here undoes the registration call if the subsequent `persist`
/// fails.
pub fn create_bitmap_index(
    handle: &mut StorageHandle,
    index_name: &str,
    field_path: &str,
) -> Result<()> {
    let manifest_ref = &mut handle.manifest;
    manifest_ref.register_bitmap_index(index_name, field_path)?;
    manifest_ref.persist()?;
    Ok(())
}
/// Returns the bitmap index descriptors reported by the handle's manifest.
pub fn list_bitmap_indexes(handle: &StorageHandle) -> Vec<manifest::BitmapIndexDescriptor> {
    let manifest_ref = &handle.manifest;
    manifest_ref.bitmap_indexes()
}
/// Adds `node_id` as a posting under `value_key` in the bitmap index
/// `index_name`.
///
/// This is a thin wrapper over `apply_bitmap_posting`, called with its final
/// flag set to `true` (presumably "add" rather than "remove" — confirm
/// against that helper). Whatever success value the helper yields is
/// discarded.
///
/// # Errors
///
/// Propagates any error returned by `apply_bitmap_posting`.
pub fn bitmap_add_posting(
    handle: &mut StorageHandle,
    index_name: &str,
    value_key: &str,
    node_id: u64,
) -> Result<()> {
    let outcome = apply_bitmap_posting(handle, index_name, value_key, node_id, true);
    outcome?;
    Ok(())
}
/// Returns the node ids the bitmap store reports for `value_key` in the
/// index `index_name`.
pub fn bitmap_postings(handle: &StorageHandle, index_name: &str, value_key: &str) -> Vec<u64> {
    let store = &handle.bitmap_store;
    store.postings(index_name, value_key)
}
/// Returns the postings for `value_key` in `index_name`, restricted by the
/// bitmap store to the range `range_start..range_end_exclusive` and to the
/// optional `limit`.
///
/// All arguments are forwarded unchanged to the store's
/// `postings_in_range_limit`; range and limit semantics are defined there.
pub fn bitmap_postings_in_range_limit(
    handle: &StorageHandle,
    index_name: &str,
    value_key: &str,
    range_start: u64,
    range_end_exclusive: u64,
    limit: Option<usize>,
) -> Vec<u64> {
    let store = &handle.bitmap_store;
    store.postings_in_range_limit(index_name, value_key, range_start, range_end_exclusive, limit)
}
/// Returns the postings from [`bitmap_postings_in_range_limit`] that
/// `request` owns, as decided by `request_owns_node`, in the order the range
/// query produced them.
///
/// `limit` is applied twice: it is forwarded to the underlying range query,
/// and it also caps the number of owned postings returned here. Because the
/// range query is limited *before* ownership filtering, the result may hold
/// fewer than `limit` ids even when more owned postings exist in the range.
///
/// A `limit` of `Some(0)` always yields an empty vector. When `limit` is
/// `None`, no additional cap is applied by this function.
pub fn bitmap_postings_in_range_limit_for_request(
    handle: &StorageHandle,
    index_name: &str,
    value_key: &str,
    range_start: u64,
    range_end_exclusive: u64,
    limit: Option<usize>,
    request: &ThreadCoreRequest,
) -> Vec<u64> {
    // `take` enforces the cap before pulling the next element, so a zero
    // limit never lets a posting through and ownership checks stop as soon
    // as the cap is reached.
    let cap = limit.unwrap_or(usize::MAX);
    bitmap_postings_in_range_limit(
        handle,
        index_name,
        value_key,
        range_start,
        range_end_exclusive,
        limit,
    )
    .into_iter()
    .filter(|&node_id| request_owns_node(request, node_id))
    .take(cap)
    .collect()
}
/// Builds the logical view of `node_id` from the memtable and the L0 runs.
///
/// A cached node is returned immediately. Otherwise entries are gathered from
/// the memtable first and then from each run in `handle.l0_runs`, walked in
/// reverse of their stored order. From those entries:
///
/// * `full` is the `FullNode` entry with the highest version; on a version
///   tie the entry gathered first is kept.
/// * `deltas` are all non-`FullNode` entries whose version is at least the
///   chosen full's version (or every such entry when there is no full), in
///   gathering order.
///
/// The value lengths of the chosen full and of every kept delta are added to
/// `handle.metrics.logical_bytes_read`, and the assembled node is stored in
/// the logical-node cache. When no entries exist at all, an empty node is
/// returned without touching the metrics or the cache.
///
/// # Errors
///
/// Propagates any error from reading a run.
pub fn get_logical_node(handle: &mut StorageHandle, node_id: u64) -> Result<LogicalNode> {
    if let Some(hit) = cache_get_logical_node(handle, node_id) {
        return Ok(hit);
    }

    let mut candidates: Vec<sstable::Entry> = handle.memtable.entries_for_key(node_id);
    // The paths are copied out first because reading a run needs `&mut handle`.
    let reversed_runs: Vec<std::path::PathBuf> = handle.l0_runs.iter().rev().cloned().collect();
    for run_path in &reversed_runs {
        candidates.extend(read_entries_for_key_from_run(handle, run_path, node_id)?);
    }

    if candidates.is_empty() {
        return Ok(LogicalNode {
            node_id,
            full: None,
            deltas: Vec::new(),
        });
    }

    // Highest-version full entry; the incumbent wins ties, so the entry
    // gathered first is kept.
    let full = candidates
        .iter()
        .filter(|entry| entry.kind == sstable::EntryKind::FullNode)
        .fold(None::<&sstable::Entry>, |best, candidate| match best {
            Some(current) if current.version >= candidate.version => Some(current),
            _ => Some(candidate),
        })
        .cloned();

    let min_version = full.as_ref().map_or(0, |entry| entry.version);
    let deltas: Vec<sstable::Entry> = candidates
        .into_iter()
        .filter(|entry| {
            entry.kind != sstable::EntryKind::FullNode && entry.version >= min_version
        })
        .collect();

    let bytes_read: u64 = full
        .iter()
        .chain(deltas.iter())
        .map(|entry| entry.value.len() as u64)
        .sum();
    handle.metrics.logical_bytes_read += bytes_read;

    let node = LogicalNode {
        node_id,
        full,
        deltas,
    };
    cache_put_logical_node(handle, node_id, node.clone());
    Ok(node)
}
/// Request-scoped variant of [`get_logical_node`].
///
/// When `request_owns_node` reports that `request` owns `node_id`, the lookup
/// is delegated to [`get_logical_node`]. Otherwise an empty node (no full
/// entry, no deltas) is returned without reading storage.
///
/// # Errors
///
/// Propagates any error from [`get_logical_node`].
pub fn get_logical_node_for_request(
    handle: &mut StorageHandle,
    node_id: u64,
    request: &ThreadCoreRequest,
) -> Result<LogicalNode> {
    if request_owns_node(request, node_id) {
        get_logical_node(handle, node_id)
    } else {
        Ok(LogicalNode {
            node_id,
            full: None,
            deltas: Vec::new(),
        })
    }
}
/// Summarises the stored rows for `node_id` without assembling a
/// `LogicalNode`.
///
/// If the logical-node cache already holds the node, the summary is derived
/// from it via `node_row_summary_from_logical`. Otherwise entries are gathered
/// from the memtable and then from each run in `handle.l0_runs` (reverse of
/// stored order), and:
///
/// * `has_full` reports whether any `FullNode` entry was found;
/// * `adjacency_degree` is the value length of the highest-version full entry
///   divided by 8 (first gathered wins on a version tie), or 0 without one;
/// * `delta_count` counts non-`FullNode` entries whose version is at least
///   that full's version (all of them when there is no full).
///
/// The value lengths of the chosen full and of every counted delta are added
/// to `handle.metrics.logical_bytes_read`. This path does not populate the
/// logical-node cache.
///
/// Returns `Ok(None)` when no entries exist for `node_id`.
///
/// # Errors
///
/// Propagates any error from reading a run.
pub fn get_node_row_summary(
    handle: &mut StorageHandle,
    node_id: u64,
) -> Result<Option<NodeRowSummary>> {
    if let Some(hit) = cache_get_logical_node(handle, node_id) {
        return Ok(Some(node_row_summary_from_logical(&hit)));
    }

    let mut candidates: Vec<sstable::Entry> = Vec::new();
    candidates.extend(handle.memtable.entries_for_key(node_id));
    // The paths are copied out first because reading a run needs `&mut handle`.
    let reversed_runs: Vec<std::path::PathBuf> = handle.l0_runs.iter().rev().cloned().collect();
    for run_path in &reversed_runs {
        candidates.extend(read_entries_for_key_from_run(handle, run_path, node_id)?);
    }
    if candidates.is_empty() {
        return Ok(None);
    }

    // Highest-version full entry; the incumbent wins ties.
    let newest_full = candidates
        .iter()
        .filter(|entry| entry.kind == sstable::EntryKind::FullNode)
        .fold(None::<&sstable::Entry>, |best, candidate| match best {
            Some(current) if current.version >= candidate.version => Some(current),
            _ => Some(candidate),
        });
    let has_full = newest_full.is_some();
    let (min_version, adjacency_degree, full_bytes) = match newest_full {
        Some(entry) => (entry.version, entry.value.len() / 8, entry.value.len()),
        None => (0_u64, 0_usize, 0_usize),
    };

    let (delta_count, delta_bytes) = candidates
        .iter()
        .filter(|entry| {
            entry.kind != sstable::EntryKind::FullNode && entry.version >= min_version
        })
        .fold((0_usize, 0_u64), |(count, bytes), entry| {
            (count + 1, bytes + entry.value.len() as u64)
        });

    handle.metrics.logical_bytes_read += full_bytes as u64 + delta_bytes;

    Ok(Some(NodeRowSummary {
        has_full,
        delta_count,
        adjacency_degree,
    }))
}
/// Request-scoped variant of [`get_node_row_summary`].
///
/// Returns `Ok(None)` without reading storage when `request_owns_node`
/// reports that `request` does not own `node_id`; otherwise delegates to
/// [`get_node_row_summary`].
///
/// # Errors
///
/// Propagates any error from [`get_node_row_summary`].
pub fn get_node_row_summary_for_request(
    handle: &mut StorageHandle,
    node_id: u64,
    request: &ThreadCoreRequest,
) -> Result<Option<NodeRowSummary>> {
    if request_owns_node(request, node_id) {
        get_node_row_summary(handle, node_id)
    } else {
        Ok(None)
    }
}
/// Returns the entries stored under `key` in the SSTable at `path`.
///
/// A table already present in `handle.sstable_cache` is queried directly.
/// On a cache miss the table is read through the handle's reactor, its
/// on-disk length (when obtainable) is added to
/// `handle.metrics.sstable_bytes_read`, and the table is inserted into the
/// cache after the lookup.
///
/// # Errors
///
/// Propagates any error from `sstable::read_sstable_with_reactor`.
fn read_entries_for_key_from_run(
    handle: &mut StorageHandle,
    path: &std::path::Path,
    key: u64,
) -> Result<Vec<sstable::Entry>> {
    let cached_entries = handle
        .sstable_cache
        .get(path)
        .map(|table| sstable::read_entries_for_key_in_table(table, key));
    if let Some(entries) = cached_entries {
        return Ok(entries);
    }

    let loaded = sstable::read_sstable_with_reactor(path, handle.reactor.as_ref())?;
    // Best effort: a failed length lookup simply leaves the counter untouched.
    if let Ok(file_len) = handle.reactor.metadata_len(path) {
        handle.metrics.sstable_bytes_read += file_len;
    }

    // Query before inserting, since the insert moves the table into the cache.
    let matches = sstable::read_entries_for_key_in_table(&loaded, key);
    handle.sstable_cache.insert(path.to_path_buf(), loaded);
    Ok(matches)
}