mod common;
use common::pgwire_harness::TestServer;
async fn create_nums(server: &TestServer) {
server
.exec(
"CREATE COLLECTION nums TYPE DOCUMENT STRICT (\
id STRING PRIMARY KEY,\
n INT\
)",
)
.await
.unwrap();
server
.exec(
"INSERT INTO nums (id, n) VALUES \
('a',1),('b',2),('c',3),('d',4),('e',5)",
)
.await
.unwrap();
}
async fn create_tied(server: &TestServer) {
server
.exec(
"CREATE COLLECTION tied TYPE DOCUMENT STRICT (\
id STRING PRIMARY KEY,\
grp INT,\
n INT\
)",
)
.await
.unwrap();
server
.exec(
"INSERT INTO tied (id, grp, n) VALUES \
('a',1,10),('b',1,10),('c',2,20),('d',3,30),('e',3,30)",
)
.await
.unwrap();
}
fn parse_f64(s: &str) -> f64 {
s.parse().unwrap_or(f64::NAN)
}
fn col(rows: &[Vec<String>], row: usize, col: usize) -> f64 {
parse_f64(rows[row].get(col).map(String::as_str).unwrap_or("NaN"))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn rows_1_preceding_1_following_sum() {
let server = TestServer::start().await;
create_nums(&server).await;
let rows = server
.query_rows(
"SELECT id, SUM(n) OVER (ORDER BY n ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS s \
FROM nums ORDER BY n",
)
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
let expected = [3.0, 6.0, 9.0, 12.0, 9.0];
for (i, &want) in expected.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - want).abs() < 1e-9,
"row {i}: got {got}, want {want}; rows={rows:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn rows_unbounded_preceding_to_current_running_sum() {
let server = TestServer::start().await;
create_nums(&server).await;
let rows = server
.query_rows(
"SELECT id, SUM(n) OVER \
(ORDER BY n ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s \
FROM nums ORDER BY n",
)
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
let expected = [1.0, 3.0, 6.0, 10.0, 15.0];
for (i, &want) in expected.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - want).abs() < 1e-9,
"row {i}: got {got}, want {want}; rows={rows:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn rows_current_to_unbounded_following_reverse_sum() {
let server = TestServer::start().await;
create_nums(&server).await;
let rows = server
.query_rows(
"SELECT id, SUM(n) OVER \
(ORDER BY n ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS s \
FROM nums ORDER BY n",
)
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
let expected = [15.0, 14.0, 12.0, 9.0, 5.0];
for (i, &want) in expected.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - want).abs() < 1e-9,
"row {i}: got {got}, want {want}; rows={rows:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn range_unbounded_preceding_current_row_peer_aware() {
let server = TestServer::start().await;
create_tied(&server).await;
let rows = server
.query_rows(
"SELECT id, SUM(n) OVER \
(ORDER BY n RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s \
FROM tied ORDER BY n, id",
)
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
let expected = [20.0, 20.0, 40.0, 100.0, 100.0];
for (i, &want) in expected.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - want).abs() < 1e-9,
"row {i}: got {got}, want {want}; rows={rows:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn groups_1_preceding_1_following_sum() {
let server = TestServer::start().await;
create_tied(&server).await;
let rows = server
.query_rows(
"SELECT id, SUM(n) OVER \
(ORDER BY grp GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS s \
FROM tied ORDER BY grp, id",
)
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
let expected = [40.0, 40.0, 100.0, 80.0, 80.0];
for (i, &want) in expected.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - want).abs() < 1e-9,
"row {i}: got {got}, want {want}; rows={rows:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn default_frame_with_order_by_is_range_unbounded_to_current() {
let server = TestServer::start().await;
create_nums(&server).await;
let rows = server
.query_rows("SELECT id, SUM(n) OVER (ORDER BY n) AS s FROM nums ORDER BY n")
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
let expected = [1.0, 3.0, 6.0, 10.0, 15.0];
for (i, &want) in expected.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - want).abs() < 1e-9,
"row {i}: got {got}, want {want}; rows={rows:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn default_frame_without_order_by_is_whole_partition() {
let server = TestServer::start().await;
create_nums(&server).await;
let rows = server
.query_rows("SELECT id, SUM(n) OVER () AS s FROM nums ORDER BY n")
.await
.unwrap();
assert_eq!(rows.len(), 5, "rows: {rows:?}");
for (i, row) in rows.iter().enumerate() {
let got = col(&rows, i, 1);
assert!(
(got - 15.0).abs() < 1e-9,
"row {i}: got {got}, want 15.0; row={row:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn groups_without_order_by_is_plan_time_error() {
let server = TestServer::start().await;
create_nums(&server).await;
let result = server
.query_rows(
"SELECT id, SUM(n) OVER \
(GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s \
FROM nums ORDER BY n",
)
.await;
assert!(
result.is_err(),
"GROUPS without ORDER BY must be rejected at plan time, got: {result:?}"
);
let msg = result.unwrap_err().to_lowercase();
assert!(
msg.contains("groups") || msg.contains("order") || msg.contains("invalid"),
"error should mention GROUPS or ORDER BY: {msg}"
);
}