mod common;
use common::setup_provider;
use xlog_core::{ScalarType, Schema};
use xlog_cuda::JoinType;
fn make_schema(cols: &[(&str, ScalarType)]) -> Schema {
Schema::new(cols.iter().map(|(n, t)| (n.to_string(), *t)).collect())
}
#[test]
fn test_hash_join_v2_single_key() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left_key: Vec<u32> = vec![1, 2, 3];
let left_payload: Vec<u32> = vec![10, 20, 30];
let right_key: Vec<u32> = vec![2, 3, 4];
let right_value: Vec<u32> = vec![200, 300, 400];
let left_schema = make_schema(&[("key", ScalarType::U32), ("payload", ScalarType::U32)]);
let right_schema = make_schema(&[("key", ScalarType::U32), ("value", ScalarType::U32)]);
let left = provider
.create_buffer_from_u32_columns(&[&left_key, &left_payload], left_schema)
.unwrap();
let right = provider
.create_buffer_from_u32_columns(&[&right_key, &right_value], right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left, &right, &[0], &[0], JoinType::Inner)
.unwrap();
let rows = provider.download_column::<u32>(&result, 0).unwrap();
assert_eq!(rows.len(), 2);
assert_eq!(result.arity(), 4);
}
#[test]
fn test_semi_join() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 3, 4];
let right: Vec<u32> = vec![2, 4];
let left_schema = make_schema(&[("val", ScalarType::U32)]);
let right_schema = make_schema(&[("val", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::Semi)
.unwrap();
let vals = provider.download_column::<u32>(&result, 0).unwrap();
assert_eq!(vals.len(), 2);
assert!(vals.contains(&2));
assert!(vals.contains(&4));
}
#[test]
fn test_anti_join() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 3, 4];
let right: Vec<u32> = vec![2, 4];
let left_schema = make_schema(&[("val", ScalarType::U32)]);
let right_schema = make_schema(&[("val", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::Anti)
.unwrap();
let vals = provider.download_column::<u32>(&result, 0).unwrap();
assert_eq!(vals.len(), 2);
assert!(vals.contains(&1));
assert!(vals.contains(&3));
}
#[test]
fn test_join_no_matches() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 3];
let right: Vec<u32> = vec![4, 5, 6];
let left_schema = make_schema(&[("val", ScalarType::U32)]);
let right_schema = make_schema(&[("val", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::Inner)
.unwrap();
let vals = provider.download_column::<u32>(&result, 0).unwrap();
assert!(vals.is_empty());
}
#[test]
fn test_inner_join_with_duplicates() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 2];
let right: Vec<u32> = vec![2, 2];
let left_schema = make_schema(&[("key", ScalarType::U32)]);
let right_schema = make_schema(&[("key", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::Inner)
.unwrap();
let vals = provider.download_column::<u32>(&result, 0).unwrap();
assert_eq!(vals.len(), 4);
}
#[test]
fn test_semi_join_preserves_left_columns() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left_key: Vec<u32> = vec![1, 2, 3];
let left_payload: Vec<u32> = vec![10, 20, 30];
let right_key: Vec<u32> = vec![2, 3, 4];
let left_schema = make_schema(&[("key", ScalarType::U32), ("payload", ScalarType::U32)]);
let right_schema = make_schema(&[("key", ScalarType::U32)]);
let left = provider
.create_buffer_from_u32_columns(&[&left_key, &left_payload], left_schema)
.unwrap();
let right = provider
.create_buffer_from_slice::<u32>(&right_key, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left, &right, &[0], &[0], JoinType::Semi)
.unwrap();
assert_eq!(result.arity(), 2);
let keys = provider.download_column::<u32>(&result, 0).unwrap();
let payloads = provider.download_column::<u32>(&result, 1).unwrap();
assert_eq!(keys.len(), 2);
assert!(keys.contains(&2));
assert!(keys.contains(&3));
for i in 0..keys.len() {
if keys[i] == 2 {
assert_eq!(payloads[i], 20);
} else if keys[i] == 3 {
assert_eq!(payloads[i], 30);
}
}
}
#[test]
fn test_left_outer_join() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 3];
let right: Vec<u32> = vec![2];
let left_schema = make_schema(&[("lval", ScalarType::U32)]);
let right_schema = make_schema(&[("rval", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::LeftOuter)
.unwrap();
assert_eq!(result.arity(), 2);
let left_vals = provider.download_column::<u32>(&result, 0).unwrap();
let right_vals = provider.download_column::<u32>(&result, 1).unwrap();
assert_eq!(left_vals.len(), 3);
assert!(left_vals.contains(&1));
assert!(left_vals.contains(&2));
assert!(left_vals.contains(&3));
for i in 0..left_vals.len() {
if left_vals[i] == 2 {
assert_eq!(right_vals[i], 2);
} else {
assert_eq!(right_vals[i], 0);
}
}
}
#[test]
fn test_left_outer_all_unmatched() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 3];
let right: Vec<u32> = vec![4, 5];
let left_schema = make_schema(&[("lval", ScalarType::U32)]);
let right_schema = make_schema(&[("rval", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::LeftOuter)
.unwrap();
let right_vals = provider.download_column::<u32>(&result, 1).unwrap();
assert_eq!(right_vals.len(), 3);
assert!(right_vals.iter().all(|&v| v == 0));
}
#[test]
fn test_multi_column_join() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left_a: Vec<u32> = vec![1, 1, 2];
let left_b: Vec<u32> = vec![10, 20, 10];
let left_p: Vec<u32> = vec![100, 200, 300];
let right_x: Vec<u32> = vec![1, 2, 1];
let right_y: Vec<u32> = vec![10, 10, 20];
let right_v: Vec<u32> = vec![1000, 2000, 3000];
let left_schema = make_schema(&[
("a", ScalarType::U32),
("b", ScalarType::U32),
("payload", ScalarType::U32),
]);
let right_schema = make_schema(&[
("x", ScalarType::U32),
("y", ScalarType::U32),
("value", ScalarType::U32),
]);
let left = provider
.create_buffer_from_u32_columns(&[&left_a, &left_b, &left_p], left_schema)
.unwrap();
let right = provider
.create_buffer_from_u32_columns(&[&right_x, &right_y, &right_v], right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left, &right, &[0, 1], &[0, 1], JoinType::Inner)
.unwrap();
let rows = provider.download_column::<u32>(&result, 0).unwrap();
assert_eq!(rows.len(), 3, "Should have 3 matches for multi-column join");
}
#[test]
fn test_left_outer_empty_right() {
let Some(provider) = setup_provider() else {
eprintln!("Skipping: no CUDA device");
return;
};
let left: Vec<u32> = vec![1, 2, 3];
let right: Vec<u32> = vec![];
let left_schema = make_schema(&[("lval", ScalarType::U32)]);
let right_schema = make_schema(&[("rval", ScalarType::U32)]);
let left_buf = provider
.create_buffer_from_slice::<u32>(&left, left_schema)
.unwrap();
let right_buf = provider
.create_buffer_from_slice::<u32>(&right, right_schema)
.unwrap();
let result = provider
.hash_join_v2(&left_buf, &right_buf, &[0], &[0], JoinType::LeftOuter)
.unwrap();
assert_eq!(result.arity(), 2);
let right_vals = provider.download_column::<u32>(&result, 1).unwrap();
assert_eq!(right_vals.len(), 3);
assert!(right_vals.iter().all(|&v| v == 0));
}