use super::*;
#[test]
fn execute_returns_rows_from_storage() {
let base = temp_dir("runtime_execute_rows");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::put_full_node(&mut handle, 1, 1, &[2, 3]).unwrap();
storage_api::put_full_node(&mut handle, 2, 1, &[4]).unwrap();
storage_api::flush(&mut handle).unwrap();
let typed = validate(&parse("MATCH (n) RETURN n LIMIT 10").unwrap(), &Catalog).unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 10,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert!(stream.rows.iter().any(|r| r.node_id == 1));
assert!(stream.rows.iter().any(|r| r.node_id == 2));
}
#[test]
fn execute_stops_scan_when_limit_satisfied_without_predicate() {
let base = temp_dir("runtime_execute_limit_short_circuit");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=1000 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let typed = validate(&parse("MATCH (n) RETURN n LIMIT 10").unwrap(), &Catalog).unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 1001,
morsel_size: 64,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert_eq!(stream.rows.len(), 10);
assert!(stream.scanned_nodes <= 12);
}
#[test]
fn execute_applies_vector_threshold_filter() {
let base = temp_dir("runtime_execute_filter");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=10 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
let values = if node_id <= 3 { [1.0, 0.0] } else { [0.0, 1.0] };
let delta = storage_api::encode_delta(
node_id,
2,
&storage_api::encode_vector_payload_f32(
1,
storage_api::VectorMetric::Cosine,
&values,
false,
),
);
storage_api::put_vector_delta(&mut handle, &delta).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let query = "MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > 0.95 RETURN n LIMIT 50";
let typed = validate(&parse(query).unwrap(), &Catalog).unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 20,
morsel_size: 5,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert!(!stream.rows.is_empty());
assert!(stream
.rows
.iter()
.all(|row| row.score.unwrap_or(0.0) > 0.95));
}
#[test]
fn execute_prefers_scan_fallback_for_small_limit_unique_space_cosine_queries() {
let base = temp_dir("runtime_execute_small_limit_scan_gate");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=512 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
let delta = storage_api::encode_delta(
node_id,
2,
&storage_api::encode_vector_payload_f32(
1,
storage_api::VectorMetric::Cosine,
&[1.0, 0.0],
false,
),
);
storage_api::put_vector_delta(&mut handle, &delta).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let typed = validate(
&parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > -1 RETURN n LIMIT 50")
.unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 600,
morsel_size: 256,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert_eq!(stream.rows.len(), 50);
assert!(stream.scanned_nodes > stream.rows.len() as u64);
assert!(stream.scanned_nodes <= 512);
}
#[test]
fn execute_allows_ann_for_large_limit_unique_space_cosine_queries() {
let base = temp_dir("runtime_execute_large_limit_ann_gate");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=512 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
let delta = storage_api::encode_delta(
node_id,
2,
&storage_api::encode_vector_payload_f32(
1,
storage_api::VectorMetric::Cosine,
&[1.0, 0.0],
false,
),
);
storage_api::put_vector_delta(&mut handle, &delta).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let typed = validate(
&parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > -1 RETURN n LIMIT 300")
.unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 600,
morsel_size: 128,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert!(!stream.rows.is_empty());
assert_eq!(stream.scanned_nodes, stream.rows.len() as u64);
assert_eq!(stream.morsels_processed, 1);
}
#[test]
fn execute_uses_bitmap_candidates_to_prune_scan() {
let base = temp_dir("runtime_execute_bitmap_scan");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::create_bitmap_index(&mut handle, "idx_country", "n.country").unwrap();
for node_id in 1..=10 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
storage_api::bitmap_add_posting(&mut handle, "idx_country", "US", 3).unwrap();
storage_api::bitmap_add_posting(&mut handle, "idx_country", "US", 7).unwrap();
storage_api::flush(&mut handle).unwrap();
let typed = validate(
&parse("MATCH (n) WHERE bitmap.contains(idx_country, US) = 1 RETURN n LIMIT 10").unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 20,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![3, 7]);
assert_eq!(stream.scanned_nodes, 2);
}
#[test]
fn execute_with_request_scans_only_owned_shard_nodes() {
let base = temp_dir("runtime_execute_with_request_scope");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=12 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let typed = validate(&parse("MATCH (n) RETURN n LIMIT 50").unwrap(), &Catalog).unwrap();
let plan = explain(&typed).unwrap();
let request = storage_api::ThreadCoreRequest {
core_id: 1,
shard_count: 4,
};
let stream = execute_with_request(
&plan,
&ExecuteParams {
scan_start: 1,
scan_end_exclusive: 13,
morsel_size: 8,
parallel_workers: 1,
},
&mut handle,
&request,
)
.unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1, 5, 9]);
assert_eq!(stream.scanned_nodes, 3);
}
#[test]
fn bitmap_query_matches_expected_rows_and_reduces_scan_work() {
let base = temp_dir("runtime_execute_bitmap_regression");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::create_bitmap_index(&mut handle, "idx_country", "n.country").unwrap();
for node_id in 1..=200 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
for node_id in [7_u64, 23, 88, 144, 199] {
storage_api::bitmap_add_posting(&mut handle, "idx_country", "US", node_id).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let bitmap_typed = validate(
&parse("MATCH (n) WHERE bitmap.contains(idx_country, US) = 1 RETURN n LIMIT 500").unwrap(),
&Catalog,
)
.unwrap();
let bitmap_plan = explain(&bitmap_typed).unwrap();
let bitmap_stream = execute(
&bitmap_plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 512,
morsel_size: 16,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
let scan_typed = validate(&parse("MATCH (n) RETURN n LIMIT 500").unwrap(), &Catalog).unwrap();
let scan_plan = explain(&scan_typed).unwrap();
let scan_stream = execute(
&scan_plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 512,
morsel_size: 16,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
let mut bitmap_ids: Vec<u64> = bitmap_stream.rows.iter().map(|row| row.node_id).collect();
bitmap_ids.sort_unstable();
let expected_ids = vec![7_u64, 23, 88, 144, 199];
assert_eq!(bitmap_ids, expected_ids);
let mut scan_expected: Vec<u64> = scan_stream
.rows
.iter()
.map(|row| row.node_id)
.filter(|id| expected_ids.contains(id))
.collect();
scan_expected.sort_unstable();
assert_eq!(scan_expected, expected_ids);
assert!(bitmap_stream.scanned_nodes < scan_stream.scanned_nodes);
assert_eq!(bitmap_stream.scanned_nodes, expected_ids.len() as u64);
}
#[test]
fn execute_uses_real_embedding_cosine_scores_when_available() {
let base = temp_dir("runtime_execute_real_scores");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
storage_api::put_full_node(&mut handle, 2, 1, &[3]).unwrap();
let v1 = storage_api::encode_delta(1, 2, &encode_vector_payload(&[1.0, 0.0]));
let v2 = storage_api::encode_delta(2, 2, &encode_vector_payload(&[0.0, 1.0]));
storage_api::put_vector_delta(&mut handle, &v1).unwrap();
storage_api::put_vector_delta(&mut handle, &v2).unwrap();
let typed = validate(
&parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > 0.9 RETURN n LIMIT 10")
.unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 10,
morsel_size: 8,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1]);
assert!(stream.rows[0].score.unwrap_or(0.0) > 0.99);
}
#[test]
fn execute_supports_legacy_raw_f32_vectors_after_reopen() {
let base = temp_dir("runtime_execute_legacy_vectors");
{
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
let legacy = storage_api::encode_delta(
1,
2,
&storage_api::encode_legacy_vector_payload_f32(&[1.0, 0.0]),
);
storage_api::put_vector_delta(&mut handle, &legacy).unwrap();
storage_api::flush(&mut handle).unwrap();
}
let mut reopened = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let typed = validate(
&parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > 0.9 RETURN n LIMIT 10")
.unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(&plan, &ExecuteParams::default(), &mut reopened).unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1]);
}
#[test]
fn execute_uses_ann_for_unique_matching_space_when_multiple_spaces_exist() {
let base = temp_dir("runtime_execute_multi_space_ann");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
storage_api::put_full_node(&mut handle, 2, 1, &[3]).unwrap();
storage_api::put_full_node(&mut handle, 10, 1, &[11]).unwrap();
let v1 = storage_api::encode_delta(
1,
2,
&storage_api::encode_vector_payload_f32(
1,
storage_api::VectorMetric::Cosine,
&[1.0, 0.0],
false,
),
);
let v2 = storage_api::encode_delta(
2,
2,
&storage_api::encode_vector_payload_f32(
1,
storage_api::VectorMetric::Cosine,
&[0.0, 1.0],
false,
),
);
let v3 = storage_api::encode_delta(
10,
2,
&storage_api::encode_vector_payload_f32(
2,
storage_api::VectorMetric::Cosine,
&[1.0, 0.0, 0.0],
false,
),
);
storage_api::put_vector_delta(&mut handle, &v1).unwrap();
storage_api::put_vector_delta(&mut handle, &v2).unwrap();
storage_api::put_vector_delta(&mut handle, &v3).unwrap();
storage_api::flush(&mut handle).unwrap();
let typed = validate(
&parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > 0.9 RETURN n LIMIT 10")
.unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(&plan, &ExecuteParams::default(), &mut handle).unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1]);
assert!(stream.scanned_nodes <= 2);
}
#[test]
fn execute_supports_quantized_vectors_after_reopen() {
let base = temp_dir("runtime_execute_quantized_vectors");
{
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
let quantized = storage_api::encode_delta(
1,
2,
&storage_api::encode_vector_payload_quantized_i8(
3,
storage_api::VectorMetric::Cosine,
&[1.0, 0.0],
false,
)
.unwrap(),
);
storage_api::put_vector_delta(&mut handle, &quantized).unwrap();
storage_api::flush(&mut handle).unwrap();
}
let mut reopened = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let typed = validate(
&parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > 0.9 RETURN n LIMIT 10")
.unwrap(),
&Catalog,
)
.unwrap();
let plan = explain(&typed).unwrap();
let stream = execute(&plan, &ExecuteParams::default(), &mut reopened).unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1]);
}