use super::*;
/// Fanout execution over two shards must return the rows of both shards merged
/// into one result, ordered by ascending node id.
///
/// Nodes 1..=12 are written to whichever of the two shard handles owns them,
/// both handles are flushed, and an unfiltered `MATCH` is executed over the
/// scan range `[1, 13)`.
#[test]
fn execute_fanout_merges_rows_across_shards() {
    let root = temp_dir("runtime_fanout_merge");
    // Both shard handles are opened with identical storage settings.
    let storage_config = || storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let lanes = storage_api::ThreadCoreLaneConfig::default();
    let request0 = storage_api::ThreadCoreRequest {
        core_id: 0,
        shard_count: 2,
    };
    let request1 = storage_api::ThreadCoreRequest {
        core_id: 1,
        shard_count: 2,
    };
    let mut handle0 =
        storage_api::open_store_for_request(storage_config(), &request0, &lanes).unwrap();
    let mut handle1 =
        storage_api::open_store_for_request(storage_config(), &request1, &lanes).unwrap();

    // Route every node to the shard that owns it; anything shard 0 does not
    // own goes to shard 1.
    for node_id in 1..=12 {
        let owner = if storage_api::request_owns_node(&request0, node_id) {
            &mut handle0
        } else {
            &mut handle1
        };
        storage_api::put_full_node(owner, node_id, 1, &[node_id + 1]).unwrap();
    }
    storage_api::flush(&mut handle0).unwrap();
    storage_api::flush(&mut handle1).unwrap();

    let parsed = parse("MATCH (n) RETURN n LIMIT 20").unwrap();
    let typed = validate(&parsed, &Catalog).unwrap();
    let plan = explain(&typed).unwrap();
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 13,
        morsel_size: 8,
        parallel_workers: 1,
    };

    let mut shards = vec![
        FanoutShardExecution {
            request: request0.clone(),
            handle: &mut handle0,
        },
        FanoutShardExecution {
            request: request1.clone(),
            handle: &mut handle1,
        },
    ];
    let out = execute_fanout(&plan, &params, &mut shards).unwrap();

    // Every written node must appear exactly once, in ascending id order.
    let expected: Vec<u64> = (1..=12).collect();
    let ids: Vec<u64> = out.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, expected);
}
/// The merged fanout result must not depend on the order in which the shards
/// are handed to `execute_fanout`.
///
/// Nodes 1..=20 are written, each with a two-component vector delta, to the
/// shard handle that owns them. The same vector-filtered plan is then executed
/// twice, once with the shards listed as `[0, 1]` and once as `[1, 0]`, and the
/// resulting node-id sequences are required to be identical.
#[test]
fn execute_fanout_merge_is_deterministic_across_shard_order() {
    let base = temp_dir("runtime_fanout_order");
    let lanes = storage_api::ThreadCoreLaneConfig::default();
    let request0 = storage_api::ThreadCoreRequest {
        core_id: 0,
        shard_count: 2,
    };
    let request1 = storage_api::ThreadCoreRequest {
        core_id: 1,
        shard_count: 2,
    };
    // Both shard handles are opened with identical storage settings.
    let storage_config = || storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle0 =
        storage_api::open_store_for_request(storage_config(), &request0, &lanes).unwrap();
    let mut handle1 =
        storage_api::open_store_for_request(storage_config(), &request1, &lanes).unwrap();

    for node_id in 1..=20 {
        // Anything shard 0 does not own is written to shard 1.
        let owner = if storage_api::request_owns_node(&request0, node_id) {
            &mut handle0
        } else {
            &mut handle1
        };
        storage_api::put_full_node(&mut *owner, node_id, 1, &[node_id + 1]).unwrap();
        let delta = storage_api::encode_delta(
            node_id,
            2,
            &encode_vector_payload(&[(node_id as f32) / 20.0, 1.0]),
        );
        storage_api::put_vector_delta(&mut *owner, &delta).unwrap();
    }
    storage_api::flush(&mut handle0).unwrap();
    storage_api::flush(&mut handle1).unwrap();

    let typed = validate(
        &parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:1) > 0.4 RETURN n LIMIT 8")
            .unwrap(),
        &Catalog,
    )
    .unwrap();
    let plan = explain(&typed).unwrap();
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 21,
        morsel_size: 8,
        parallel_workers: 1,
    };

    // First run: shards listed in core-id order. The block scope ends the
    // mutable borrows of both handles before the second run re-borrows them.
    let ids_a = {
        let mut shards = vec![
            FanoutShardExecution {
                request: request0.clone(),
                handle: &mut handle0,
            },
            FanoutShardExecution {
                request: request1.clone(),
                handle: &mut handle1,
            },
        ];
        let out = execute_fanout(&plan, &params, &mut shards).unwrap();
        out.rows
            .into_iter()
            .map(|row| row.node_id)
            .collect::<Vec<u64>>()
    };

    // Second run: same plan and parameters, shards listed in reverse order.
    let ids_b = {
        let mut shards = vec![
            FanoutShardExecution {
                request: request1,
                handle: &mut handle1,
            },
            FanoutShardExecution {
                request: request0,
                handle: &mut handle0,
            },
        ];
        let out = execute_fanout(&plan, &params, &mut shards).unwrap();
        out.rows
            .into_iter()
            .map(|row| row.node_id)
            .collect::<Vec<u64>>()
    };

    assert_eq!(ids_a, ids_b);
}