mod common;
use common::pgwire_harness::TestServer;
/// A refreshed materialized view must carry only the columns named in its
/// `SELECT` list (`id`, `a`) and none of the remaining source fields (`b`, `c`).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn refresh_applies_projection() {
    let server = TestServer::start().await;

    // Source collection, two documents, the projecting view, then one refresh.
    let setup = [
        "CREATE COLLECTION mv_proj_src",
        "INSERT INTO mv_proj_src { id: 'r1', a: 1, b: 10, c: 100 }",
        "INSERT INTO mv_proj_src { id: 'r2', a: 2, b: 20, c: 200 }",
        "CREATE MATERIALIZED VIEW mv_proj ON mv_proj_src AS SELECT id, a FROM mv_proj_src",
        "REFRESH MATERIALIZED VIEW mv_proj",
    ];
    for statement in setup {
        server.exec(statement).await.unwrap();
    }

    let rows = server
        .query_named_rows("SELECT * FROM mv_proj")
        .await
        .unwrap();
    assert_eq!(rows.len(), 2, "view must have both projected rows");

    for row in &rows {
        // Every projected column has to be present on the materialized row.
        let has_projected = ["id", "a"].iter().all(|column| row.contains_key(*column));
        assert!(
            has_projected,
            "projected columns id+a must be present in {row:?}"
        );

        // Columns outside the SELECT list must not be carried over.
        let leaked = ["b", "c"].iter().any(|column| row.contains_key(*column));
        assert!(
            !leaked,
            "unprojected columns must not leak into view row {row:?}"
        );
    }
}
/// A refreshed materialized view must contain only the source rows that
/// satisfy the view's `WHERE` predicate (`b > 5` keeps `r2`, drops `r1`).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn refresh_applies_where_filter() {
    let server = TestServer::start().await;

    // One row below the threshold, one above it, then the filtered view.
    let setup = [
        "CREATE COLLECTION mv_filt_src",
        "INSERT INTO mv_filt_src { id: 'r1', a: 1, b: 1 }",
        "INSERT INTO mv_filt_src { id: 'r2', a: 2, b: 99 }",
        "CREATE MATERIALIZED VIEW mv_filt ON mv_filt_src AS \
         SELECT id, a FROM mv_filt_src WHERE b > 5",
        "REFRESH MATERIALIZED VIEW mv_filt",
    ];
    for statement in setup {
        server.exec(statement).await.unwrap();
    }

    let view_rows = server
        .query_named_rows("SELECT * FROM mv_filt")
        .await
        .unwrap();
    assert_eq!(
        view_rows.len(),
        1,
        "only the row matching WHERE b > 5 may appear"
    );

    // The length check above guarantees index 0 exists.
    let surviving_id = view_rows[0].get("id").map(String::as_str);
    assert_eq!(surviving_id, Some("r2"), "surviving row must be r2");
}
/// A refreshed materialized view defined with `GROUP BY` must hold one row per
/// group, and each row must carry that group's aggregate value.
///
/// Source data: bucket `x` has two rows, bucket `y` has one, so the view is
/// expected to contain `x -> 2` and `y -> 1`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn refresh_applies_group_by_aggregate() {
    let server = TestServer::start().await;
    server.exec("CREATE COLLECTION mv_agg_src").await.unwrap();
    server
        .exec("INSERT INTO mv_agg_src { id: 'r1', bucket: 'x', n: 1 }")
        .await
        .unwrap();
    server
        .exec("INSERT INTO mv_agg_src { id: 'r2', bucket: 'x', n: 1 }")
        .await
        .unwrap();
    server
        .exec("INSERT INTO mv_agg_src { id: 'r3', bucket: 'y', n: 1 }")
        .await
        .unwrap();
    server
        .exec(
            "CREATE MATERIALIZED VIEW mv_agg ON mv_agg_src AS \
             SELECT bucket, COUNT(*) AS cnt FROM mv_agg_src GROUP BY bucket",
        )
        .await
        .unwrap();
    server
        .exec("REFRESH MATERIALIZED VIEW mv_agg")
        .await
        .unwrap();
    let rows = server
        .query_named_rows("SELECT * FROM mv_agg")
        .await
        .unwrap();
    assert_eq!(
        rows.len(),
        2,
        "GROUP BY must aggregate to one row per group, got {rows:?}"
    );

    // A bare row count cannot tell correct aggregation apart from two
    // arbitrary rows, so also pin the per-group counts.
    // NOTE(review): assumes `cnt` is returned as a decimal integer string,
    // matching how string columns are compared elsewhere in this file — confirm.
    let count_for = |bucket: &str| -> Option<i64> {
        rows.iter()
            .find(|row| row.get("bucket").map(String::as_str) == Some(bucket))
            .and_then(|row| row.get("cnt"))
            .and_then(|cnt| cnt.parse::<i64>().ok())
    };
    assert_eq!(
        count_for("x"),
        Some(2),
        "bucket x must aggregate its two source rows, got {rows:?}"
    );
    assert_eq!(
        count_for("y"),
        Some(1),
        "bucket y must aggregate its single source row, got {rows:?}"
    );
}
/// A refreshed materialized view defined over an inner `JOIN` must hold only
/// the matched left/right pairs, with columns taken from both sides.
///
/// `l1` references the existing right row `r1`; `l2` references `r2`, which is
/// never inserted, so only the `l1`/`vip` pair may be materialized.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn refresh_applies_join() {
    let server = TestServer::start().await;
    server.exec("CREATE COLLECTION mv_join_left").await.unwrap();
    server
        .exec("CREATE COLLECTION mv_join_right")
        .await
        .unwrap();
    server
        .exec("INSERT INTO mv_join_left { id: 'l1', rid: 'r1', name: 'alice' }")
        .await
        .unwrap();
    server
        .exec("INSERT INTO mv_join_left { id: 'l2', rid: 'r2', name: 'bob' }")
        .await
        .unwrap();
    server
        .exec("INSERT INTO mv_join_right { id: 'r1', tag: 'vip' }")
        .await
        .unwrap();
    server
        .exec(
            "CREATE MATERIALIZED VIEW mv_join ON mv_join_left AS \
             SELECT l.id AS id, r.tag AS tag \
             FROM mv_join_left l JOIN mv_join_right r ON l.rid = r.id",
        )
        .await
        .unwrap();
    server
        .exec("REFRESH MATERIALIZED VIEW mv_join")
        .await
        .unwrap();
    let rows = server
        .query_named_rows("SELECT * FROM mv_join")
        .await
        .unwrap();
    assert_eq!(
        rows.len(),
        1,
        "INNER JOIN view must materialize only the matched pair, got {rows:?}"
    );

    // The row count alone would also pass if the wrong left row were kept or
    // the right-side column were missing, so pin both sides of the pair.
    let matched = &rows[0];
    assert_eq!(
        matched.get("id").map(String::as_str),
        Some("l1"),
        "joined row must come from left row l1, got {rows:?}"
    );
    assert_eq!(
        matched.get("tag").map(String::as_str),
        Some("vip"),
        "joined row must carry the right-side tag, got {rows:?}"
    );
}
/// A second refresh must drop rows that stopped matching the view's `WHERE`
/// predicate after the source was updated (`r1` flips to `keep = false`).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn refresh_removes_rows_excluded_by_where() {
    let server = TestServer::start().await;

    // Phase 1: both source rows match the predicate and are materialized.
    let initial_materialization = [
        "CREATE COLLECTION mv_narrow_src",
        "INSERT INTO mv_narrow_src { id: 'r1', keep: true }",
        "INSERT INTO mv_narrow_src { id: 'r2', keep: true }",
        "CREATE MATERIALIZED VIEW mv_narrow ON mv_narrow_src AS \
         SELECT id FROM mv_narrow_src WHERE keep = true",
        "REFRESH MATERIALIZED VIEW mv_narrow",
    ];
    for statement in initial_materialization {
        server.exec(statement).await.unwrap();
    }

    // Phase 2: r1 no longer matches; refresh again so the view can catch up.
    let narrow_and_refresh = [
        "UPDATE mv_narrow_src SET keep = false WHERE id = 'r1'",
        "REFRESH MATERIALIZED VIEW mv_narrow",
    ];
    for statement in narrow_and_refresh {
        server.exec(statement).await.unwrap();
    }

    let view_rows = server
        .query_named_rows("SELECT * FROM mv_narrow")
        .await
        .unwrap();
    assert_eq!(view_rows.len(), 1, "WHERE-excluded row must be removed");

    // The length check above guarantees index 0 exists.
    let surviving_id = view_rows[0].get("id").map(String::as_str);
    assert_eq!(
        surviving_id,
        Some("r2"),
        "surviving row after narrowed WHERE must be r2"
    );
}