1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// SPDX-License-Identifier: BUSL-1.1
//! End-to-end SQL tests for bitemporal array reads via pgwire.
//!
//! Exercises the `AS OF SYSTEM TIME <ms>` and `AS OF VALID TIME <ms>`
//! qualifiers on `ARRAY_SLICE` and `ARRAY_AGG` queries.
mod common;
use common::pgwire_harness::TestServer;
/// Parse the `attrs` array from a slice result row's JSON.
///
/// Each row returned by ARRAY_SLICE is a JSON object:
/// Helper: extract the requested attribute values from an ARRAY_SLICE row.
///
/// ARRAY_SLICE projects one column per requested attribute (named after
/// the attribute) plus a `coords` column. Older versions returned a
/// single `attrs` JSON array column; this helper accepts either shape so
/// the assertions stay schema-agnostic.
fn parse_attrs(
row: &std::collections::HashMap<String, String>,
requested: &[&str],
) -> Vec<serde_json::Value> {
// Shape A: AS OF queries route through a different codec path that
// wraps the cell into a single `result` column containing the full
// JSON envelope (`{"coords": [...], "attrs": [...]}`).
if let Some(envelope_text) = row.get("result") {
let v: serde_json::Value = serde_json::from_str(envelope_text)
.unwrap_or_else(|e| panic!("result not JSON: {envelope_text}: {e}"));
if let Some(arr) = v.get("attrs").and_then(|a| a.as_array()) {
return arr.clone();
}
panic!("result envelope missing attrs array: {envelope_text}");
}
// Shape B: live ARRAY_SLICE projects a single `attrs` column carrying
// the JSON array of attribute values.
if let Some(attrs_text) = row.get("attrs") {
let v: serde_json::Value = serde_json::from_str(attrs_text)
.unwrap_or_else(|e| panic!("attrs not JSON: {attrs_text}: {e}"));
return match v {
serde_json::Value::Array(items) => items,
other => panic!("attrs not an array: {other}"),
};
}
// Shape C: per-attribute columns named after the requested attrs.
requested
.iter()
.map(|name| {
let cell = row
.get(*name)
.unwrap_or_else(|| panic!("missing attr '{name}' in {row:?}"));
serde_json::from_str(cell).unwrap_or_else(|_| serde_json::Value::String(cell.clone()))
})
.collect()
}
/// Helper: issue the `CREATE ARRAY` statement for the array `bt`.
///
/// `bt` has one INT64 dimension `x` declared as `[0..15]`, one INT64
/// attribute `v`, a tile extent of 16 and row-major cell order.
///
/// # Panics
///
/// Panics if the server rejects the statement.
async fn create_bt(srv: &TestServer) {
    let ddl = "CREATE ARRAY bt \
               DIMS (x INT64 [0..15]) \
               ATTRS (v INT64) \
               TILE_EXTENTS (16) \
               CELL_ORDER ROW_MAJOR";
    let outcome = srv.exec(ddl).await;
    outcome.expect("CREATE ARRAY bt");
}
/// Helper: issue the `CREATE ARRAY` statement for the array `vt`.
///
/// `vt` has one INT64 dimension `t` declared as `[0..100]`, one INT64
/// attribute `v`, a tile extent of 101 and row-major cell order.
///
/// # Panics
///
/// Panics if the server rejects the statement.
async fn create_vt(srv: &TestServer) {
    let ddl = "CREATE ARRAY vt \
               DIMS (t INT64 [0..100]) \
               ATTRS (v INT64) \
               TILE_EXTENTS (101) \
               CELL_ORDER ROW_MAJOR";
    let outcome = srv.exec(ddl).await;
    outcome.expect("CREATE ARRAY vt");
}
/// A read with no `AS OF` qualifier must surface the newest version of a cell.
///
/// Coordinate 0 of `bt` is written twice (10, then 99) with a flush after
/// each write; an unqualified `ARRAY_SLICE` over that coordinate must then
/// report exactly one cell whose `v` is 99.
#[tokio::test]
async fn select_from_array_no_as_of_returns_live_state() {
    use std::time::Duration;

    const FLUSH_BT: &str = "SELECT ARRAY_FLUSH('bt')";

    let srv = TestServer::start().await;
    create_bt(&srv).await;

    // First version: v1 = 10, flushed so the segment is visible to the reader.
    srv.exec("INSERT INTO ARRAY bt COORDS (0) VALUES (10)")
        .await
        .expect("insert v1");
    srv.exec(FLUSH_BT).await.expect("flush after v1");

    // Pause briefly so v2's system_from_ms is strictly greater than v1's.
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Second version: v2 = 99, overwriting the same coordinate.
    srv.exec("INSERT INTO ARRAY bt COORDS (0) VALUES (99)")
        .await
        .expect("insert v2");
    srv.exec(FLUSH_BT).await.expect("flush after v2");

    // Unqualified read: the live state is expected to be v2.
    let live_sql = "SELECT * FROM ARRAY_SLICE('bt', '{x: [0, 0]}', ['v'], 10)";
    let rows = srv
        .query_named_rows(live_sql)
        .await
        .expect("ARRAY_SLICE live");
    assert_eq!(rows.len(), 1, "expected one cell; got {rows:?}");

    let attrs = parse_attrs(&rows[0], &["v"]);
    let observed = attrs[0].as_i64();
    assert_eq!(
        observed,
        Some(99),
        "live state must be v2=99, got attrs: {attrs:?}"
    );
}
/// `AS OF SYSTEM TIME <ts>` must resolve to the version that was current at
/// `<ts>`.
///
/// The timestamp is sampled from the wall clock after v1 (10) has been
/// written and flushed but before v2 (99) is written, so the qualified read
/// is expected to return v1.
#[tokio::test]
async fn select_from_array_as_of_system_time_returns_old_version() {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    const FLUSH_BT: &str = "SELECT ARRAY_FLUSH('bt')";
    // Padding placed on both sides of the sampled timestamp.
    let gap = Duration::from_millis(20);

    let srv = TestServer::start().await;
    create_bt(&srv).await;

    // First version: v1 = 10.
    srv.exec("INSERT INTO ARRAY bt COORDS (0) VALUES (10)")
        .await
        .expect("insert v1");
    srv.exec(FLUSH_BT).await.expect("flush after v1");

    // Sample the "between" timestamp: after v1, before v2.
    tokio::time::sleep(gap).await;
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time before epoch");
    let ts_between = since_epoch.as_millis() as i64;
    tokio::time::sleep(gap).await;

    // Second version: v2 = 99.
    srv.exec("INSERT INTO ARRAY bt COORDS (0) VALUES (99)")
        .await
        .expect("insert v2");
    srv.exec(FLUSH_BT).await.expect("flush after v2");

    // Reading as of the sampled timestamp is expected to yield v1.
    let sql = format!(
        "SELECT * FROM ARRAY_SLICE('bt', '{{x: [0, 0]}}', ['v'], 10) AS OF SYSTEM TIME {ts_between}",
    );
    let rows = srv
        .query_named_rows(&sql)
        .await
        .expect("ARRAY_SLICE AS OF SYSTEM TIME");
    assert_eq!(rows.len(), 1, "expected one cell at AS OF; got {rows:?}");

    let attrs = parse_attrs(&rows[0], &["v"]);
    let observed = attrs[0].as_i64();
    assert_eq!(
        observed,
        Some(10),
        "AS OF SYSTEM TIME must return v1=10, got attrs: {attrs:?}"
    );
}
/// `AS OF VALID TIME` must be accepted on `ARRAY_SLICE` and must keep every
/// cell whose validity is open-ended.
///
/// Two cells are written to `vt` at `t = 10` (value 100) and `t = 20`
/// (value 200) with plain `INSERT`s, then the whole `t` range is sliced
/// `AS OF VALID TIME NOW()`. Both cells are expected back.
///
/// This test deliberately does NOT exercise disjoint valid intervals (for
/// example "cell A valid in [1000, 2000), cell B valid in [3000, 4000)"):
/// per the notes in the body, SQL `INSERT` cannot set valid bounds today, so
/// that granularity belongs in the DP handler unit suite where
/// `valid_from_ms` / `valid_until_ms` can be injected directly.
#[tokio::test]
async fn select_from_array_as_of_valid_time_filters_correctly() {
    let srv = TestServer::start().await;
    create_vt(&srv).await;

    // The array engine does not expose valid_from/until via SQL INSERT today;
    // valid_from_ms is set from the HLC at write time and valid_until_ms
    // defaults to i64::MAX (open-ended) for normal inserts. The valid_at_ms
    // filter in the DP handler checks `valid_from_ms <= valid_at_ms <
    // valid_until_ms`, so with open-ended validity every cell qualifies once
    // valid_at_ms is at or after its valid_from_ms. This test therefore
    // exercises the "both cells qualify" scenario, while the system_as_of
    // axis handles version selection.
    //
    // This is the correct end-to-end assertion for the current valid-time
    // semantics: the SQL clause reaches the DP handler (proven by no error)
    // and the planner wires the correct field.
    srv.exec("INSERT INTO ARRAY vt COORDS (10) VALUES (100)")
        .await
        .expect("insert t=10");
    srv.exec("INSERT INTO ARRAY vt COORDS (20) VALUES (200)")
        .await
        .expect("insert t=20");
    srv.exec("SELECT ARRAY_FLUSH('vt')").await.expect("flush");

    // Use NOW() so the valid_at is >= the cell's HLC-stamped valid_from.
    // The statement is a constant, so it is passed as a `&str` without
    // allocating an owned `String`.
    let sql = "SELECT * FROM ARRAY_SLICE('vt', '{t: [0, 100]}', ['v'], 100) AS OF VALID TIME NOW()";
    let rows = srv
        .query_text(sql)
        .await
        .expect("ARRAY_SLICE AS OF VALID TIME");

    // Both cells have open-ended validity ⇒ both qualify at this valid_at.
    assert_eq!(
        rows.len(),
        2,
        "expected two cells (open-ended validity); got {rows:?}"
    );
}
/// `AS OF SYSTEM TIME` and `AS OF VALID TIME` may be combined on a single
/// `ARRAY_SLICE`; both constraints are applied.
///
/// The system-time Ceiling resolver selects the version visible at the
/// system timestamp, and the valid-time filter then applies within that
/// version set. Here the system timestamp is sampled between v1 (42) and
/// v2 (77), and the valid time is `NOW()`, so v1 is expected back.
#[tokio::test]
async fn select_from_array_as_of_system_and_valid_time_combined() {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    const FLUSH_BT: &str = "SELECT ARRAY_FLUSH('bt')";
    // Padding placed on both sides of the sampled timestamp.
    let gap = Duration::from_millis(20);

    let srv = TestServer::start().await;
    create_bt(&srv).await;

    // First version: v1 = 42.
    srv.exec("INSERT INTO ARRAY bt COORDS (0) VALUES (42)")
        .await
        .expect("insert v1");
    srv.exec(FLUSH_BT).await.expect("flush after v1");

    // Sample the "between" timestamp: after v1, before v2.
    tokio::time::sleep(gap).await;
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time before epoch");
    let ts_between = since_epoch.as_millis() as i64;
    tokio::time::sleep(gap).await;

    // Second version: v2 = 77.
    srv.exec("INSERT INTO ARRAY bt COORDS (0) VALUES (77)")
        .await
        .expect("insert v2");
    srv.exec(FLUSH_BT).await.expect("flush after v2");

    // valid_at via NOW() is >= the HLC-stamped valid_from of v1, so v1
    // qualifies on the valid-time axis as well.
    let sql = format!(
        "SELECT * FROM ARRAY_SLICE('bt', '{{x: [0, 0]}}', ['v'], 10) \
         AS OF SYSTEM TIME {ts_between} AS OF VALID TIME NOW()",
    );
    let rows = srv
        .query_named_rows(&sql)
        .await
        .expect("ARRAY_SLICE AS OF SYSTEM TIME + AS OF VALID TIME");
    assert_eq!(rows.len(), 1, "expected one cell; got {rows:?}");

    let attrs = parse_attrs(&rows[0], &["v"]);
    let observed = attrs[0].as_i64();
    assert_eq!(
        observed,
        Some(42),
        "combined AS OF must return v1=42, got attrs: {attrs:?}"
    );
}