1#![forbid(unsafe_code)]
2
3use std::collections::HashMap;
30use std::sync::RwLock;
31
32use crate::{SqlError, SqlResult};
33
/// A lateness bound for one column of an incremental view.
///
/// One annotation is produced for each `LATENESS <column> INTERVAL '<n>' <unit>`
/// clause that follows the view body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LatenessAnnotation {
    /// Column the bound applies to, exactly as written in the statement.
    pub column: String,
    /// The declared interval, converted to milliseconds.
    pub lateness_ms: u64,
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum IncrementalViewStatement {
48 Create {
49 name: String,
50 body_sql: String,
51 is_materialized: bool,
52 lateness: Vec<LatenessAnnotation>,
53 },
54 DeclareRecursive {
55 name: String,
56 body_sql: String,
57 },
58 Refresh {
59 name: String,
60 },
61 Drop {
62 name: String,
63 },
64}
65
66#[derive(Debug, Clone)]
70pub struct IncrementalViewEntry {
71 pub body_sql: String,
72 pub is_materialized: bool,
73 pub is_recursive: bool,
74 pub lateness: Vec<LatenessAnnotation>,
75}
76
77#[derive(Debug, Default)]
83pub struct IncrementalViewRegistry {
84 views: RwLock<HashMap<String, IncrementalViewEntry>>,
85}
86
87impl IncrementalViewRegistry {
88 pub fn new() -> Self {
89 Self::default()
90 }
91
92 pub fn register(&self, name: impl Into<String>, entry: IncrementalViewEntry) -> SqlResult<()> {
93 let mut views = self.views.write().map_err(|_| SqlError::DataFusion {
94 message: "incremental view registry lock poisoned".into(),
95 })?;
96 views.insert(name.into(), entry);
97 Ok(())
98 }
99
100 pub fn remove(&self, name: &str) -> SqlResult<bool> {
101 let mut views = self.views.write().map_err(|_| SqlError::DataFusion {
102 message: "incremental view registry lock poisoned".into(),
103 })?;
104 Ok(views.remove(name).is_some())
105 }
106
107 pub fn get(&self, name: &str) -> SqlResult<Option<IncrementalViewEntry>> {
108 let views = self.views.read().map_err(|_| SqlError::DataFusion {
109 message: "incremental view registry lock poisoned".into(),
110 })?;
111 Ok(views.get(name).cloned())
112 }
113
114 pub fn contains(&self, name: &str) -> bool {
115 self.views
116 .read()
117 .map(|v| v.contains_key(name))
118 .unwrap_or(false)
119 }
120
121 pub fn view_names(&self) -> SqlResult<Vec<String>> {
122 let views = self.views.read().map_err(|_| SqlError::DataFusion {
123 message: "incremental view registry lock poisoned".into(),
124 })?;
125 Ok(views.keys().cloned().collect())
126 }
127}
128
129pub fn parse_incremental_view_statement(sql: &str) -> SqlResult<Option<IncrementalViewStatement>> {
135 let trimmed = sql.trim().trim_end_matches(';');
136 let upper = trimmed.to_uppercase();
137
138 let is_materialized = upper.starts_with("CREATE MATERIALIZED INCREMENTAL VIEW ");
141 if is_materialized || upper.starts_with("CREATE INCREMENTAL VIEW ") {
142 let prefix = if is_materialized {
143 "CREATE MATERIALIZED INCREMENTAL VIEW "
144 } else {
145 "CREATE INCREMENTAL VIEW "
146 };
147 let rest = trimmed
148 .get(prefix.len()..)
149 .ok_or_else(|| SqlError::Unsupported {
150 feature: "CREATE INCREMENTAL VIEW".into(),
151 })?;
152 let (name, body_with_lateness) = split_name_and_body(rest)?;
153 let (body_sql, lateness) = split_body_and_lateness(&body_with_lateness)?;
154 return Ok(Some(IncrementalViewStatement::Create {
155 name,
156 body_sql,
157 is_materialized,
158 lateness,
159 }));
160 }
161
162 if upper.starts_with("DECLARE RECURSIVE VIEW ") {
164 let rest = trimmed
165 .get("DECLARE RECURSIVE VIEW ".len()..)
166 .ok_or_else(|| SqlError::Unsupported {
167 feature: "DECLARE RECURSIVE VIEW".into(),
168 })?;
169 let (name, body_sql) = split_name_and_body(rest)?;
170 let (body_sql, _lateness) = split_body_and_lateness(&body_sql)?;
171 return Ok(Some(IncrementalViewStatement::DeclareRecursive {
172 name,
173 body_sql,
174 }));
175 }
176
177 if upper.starts_with("REFRESH INCREMENTAL VIEW ") {
179 let name = trimmed
180 .get("REFRESH INCREMENTAL VIEW ".len()..)
181 .ok_or_else(|| SqlError::Unsupported {
182 feature: "REFRESH INCREMENTAL VIEW".into(),
183 })?
184 .trim()
185 .to_string();
186 if name.is_empty() {
187 return Err(SqlError::EmptyTableName);
188 }
189 return Ok(Some(IncrementalViewStatement::Refresh { name }));
190 }
191
192 if upper.starts_with("DROP INCREMENTAL VIEW ") {
194 let name = trimmed
195 .get("DROP INCREMENTAL VIEW ".len()..)
196 .ok_or_else(|| SqlError::Unsupported {
197 feature: "DROP INCREMENTAL VIEW".into(),
198 })?
199 .trim()
200 .to_string();
201 if name.is_empty() {
202 return Err(SqlError::EmptyTableName);
203 }
204 return Ok(Some(IncrementalViewStatement::Drop { name }));
205 }
206
207 Ok(None)
208}
209
210pub fn execute_incremental_view_ddl(
216 registry: &IncrementalViewRegistry,
217 sql: &str,
218) -> SqlResult<Option<String>> {
219 let Some(stmt) = parse_incremental_view_statement(sql)? else {
220 return Ok(None);
221 };
222
223 match stmt {
224 IncrementalViewStatement::Create {
225 ref name,
226 ref body_sql,
227 is_materialized,
228 ref lateness,
229 } => {
230 registry.register(
231 name.clone(),
232 IncrementalViewEntry {
233 body_sql: body_sql.clone(),
234 is_materialized,
235 is_recursive: false,
236 lateness: lateness.clone(),
237 },
238 )?;
239 Ok(Some(name.clone()))
240 }
241
242 IncrementalViewStatement::DeclareRecursive {
243 ref name,
244 ref body_sql,
245 } => {
246 registry.register(
247 name.clone(),
248 IncrementalViewEntry {
249 body_sql: body_sql.clone(),
250 is_materialized: false,
251 is_recursive: true,
252 lateness: vec![],
253 },
254 )?;
255 Ok(Some(name.clone()))
256 }
257
258 IncrementalViewStatement::Refresh { ref name } => {
259 if !registry.contains(name) {
260 return Err(SqlError::Unsupported {
261 feature: format!("REFRESH INCREMENTAL VIEW: view '{name}' is not registered"),
262 });
263 }
264 Ok(Some(name.clone()))
265 }
266
267 IncrementalViewStatement::Drop { ref name } => {
268 registry.remove(name)?;
269 Ok(Some(name.clone()))
270 }
271 }
272}
273
274fn split_name_and_body(rest: &str) -> SqlResult<(String, String)> {
278 let upper = rest.to_uppercase();
279 let as_pos = upper.find(" AS ").ok_or_else(|| SqlError::Unsupported {
280 feature: "CREATE INCREMENTAL VIEW / DECLARE RECURSIVE VIEW requires AS <query>".into(),
281 })?;
282 let name = rest[..as_pos].trim().to_string();
283 let body = rest[as_pos + 4..].trim().to_string();
284 if name.is_empty() {
285 return Err(SqlError::EmptyTableName);
286 }
287 if body.is_empty() {
288 return Err(SqlError::EmptyQuery);
289 }
290 Ok((name, body))
291}
292
293fn split_body_and_lateness(
300 body_with_lateness: &str,
301) -> SqlResult<(String, Vec<LatenessAnnotation>)> {
302 let upper = body_with_lateness.to_uppercase();
303
304 let Some(lat_pos) = find_lateness_clause_start(&upper) else {
307 return Ok((body_with_lateness.trim().to_string(), vec![]));
308 };
309
310 let body_sql = body_with_lateness[..lat_pos].trim().to_string();
311 let lateness_str = &body_with_lateness[lat_pos..];
312 let lateness = parse_lateness_clauses(lateness_str)?;
313 Ok((body_sql, lateness))
314}
315
/// Returns the byte offset of the first top-level `LATENESS` keyword in `upper`.
///
/// `upper` must already be upper-cased; the keyword is compared byte-for-byte.
/// An occurrence only counts when all of the following hold:
///
/// * it is outside parentheses (nesting depth zero),
/// * it is outside single- and double-quoted text,
/// * it is a whole word: not adjacent to an ASCII letter, digit or `_`, so
///   identifiers such as `EVENT_LATENESS` or `LATENESS_MS` do not match.
///
/// Returns `None` when there is no such occurrence.
fn find_lateness_clause_start(upper: &str) -> Option<usize> {
    const KEYWORD: &[u8] = b"LATENESS";

    // Bytes that may appear inside an unquoted SQL identifier.
    fn is_ident_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_'
    }

    let bytes = upper.as_bytes();
    let mut depth = 0usize;
    // The quote character that opened the quoted run we are inside, if any.
    let mut open_quote: Option<u8> = None;

    for (i, &b) in bytes.iter().enumerate() {
        if let Some(quote) = open_quote {
            // A doubled quote (`''`) closes and immediately reopens the run,
            // which leaves us inside the literal, as intended.
            if b == quote {
                open_quote = None;
            }
            continue;
        }
        match b {
            b'\'' | b'"' => open_quote = Some(b),
            b'(' => depth += 1,
            // Saturate so an unbalanced ')' cannot underflow the counter.
            b')' => depth = depth.saturating_sub(1),
            b'L' if depth == 0 && bytes[i..].starts_with(KEYWORD) => {
                let preceded_by_ident = i
                    .checked_sub(1)
                    .and_then(|prev| bytes.get(prev))
                    .is_some_and(|&prev| is_ident_byte(prev));
                let followed_by_ident = bytes
                    .get(i + KEYWORD.len())
                    .is_some_and(|&next| is_ident_byte(next));
                if !preceded_by_ident && !followed_by_ident {
                    return Some(i);
                }
            }
            _ => {}
        }
    }
    None
}
353
354fn parse_lateness_clauses(lateness_str: &str) -> SqlResult<Vec<LatenessAnnotation>> {
356 let upper = lateness_str.to_uppercase();
358 let mut result = Vec::new();
359 let mut remaining = lateness_str.trim();
360
361 loop {
362 let upper_rem = remaining.to_uppercase();
363 let stripped = if upper_rem.starts_with("LATENESS ") {
364 &remaining["LATENESS ".len()..]
365 } else if upper_rem.starts_with(", LATENESS ") {
366 &remaining[", LATENESS ".len()..]
367 } else {
368 break;
369 };
370
371 let tokens: Vec<&str> = stripped.splitn(5, char::is_whitespace).collect();
373 if tokens.len() < 4 {
374 break;
375 }
376 let col = tokens
377 .first()
378 .copied()
379 .unwrap_or("")
380 .trim_matches(',')
381 .to_string();
382 let interval_str = tokens.get(2).copied().unwrap_or("").trim_matches('\'');
384 let unit_str = tokens.get(3).copied().unwrap_or("").trim_matches(',');
385 let n: u64 = interval_str.parse().map_err(|_| SqlError::Unsupported {
386 feature: format!("LATENESS INTERVAL value '{interval_str}' is not a valid integer"),
387 })?;
388 let ms = match unit_str.to_uppercase().as_str() {
389 "SECOND" | "SECONDS" => n * 1000,
390 "MINUTE" | "MINUTES" => n * 60_000,
391 "HOUR" | "HOURS" => n * 3_600_000,
392 "DAY" | "DAYS" => n * 86_400_000,
393 "MILLISECOND" | "MILLISECONDS" | "MS" => n,
394 _ => {
395 return Err(SqlError::Unsupported {
396 feature: format!(
397 "LATENESS interval unit '{unit_str}' is not supported \
398 (expected SECOND, MINUTE, HOUR, DAY, or MILLISECOND)"
399 ),
400 });
401 }
402 };
403
404 result.push(LatenessAnnotation {
405 column: col,
406 lateness_ms: ms,
407 });
408
409 let consumed_upper: String = upper_rem
411 .chars()
412 .take("LATENESS ".len() + stripped.len() - stripped.trim_start().len())
413 .collect();
414 let _ = consumed_upper; let next = remaining[1..].to_uppercase().find("LATENESS");
417 match next {
418 Some(pos) => {
419 remaining = &remaining[1 + pos..];
420 }
421 None => break,
422 }
423 }
424
425 let _ = upper; Ok(result)
427}
428
/// Unit tests for statement parsing, the registry, and DDL execution.
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `CREATE INCREMENTAL VIEW` yields a non-materialized `Create`.
    #[test]
    fn parse_create_incremental_view() {
        let sql = "CREATE INCREMENTAL VIEW revenue AS SELECT SUM(amount) FROM orders";
        let stmt = parse_incremental_view_statement(sql).unwrap().unwrap();
        assert!(matches!(
            stmt,
            IncrementalViewStatement::Create { ref name, is_materialized: false, .. }
                if name == "revenue"
        ));
    }

    /// The `MATERIALIZED` keyword sets `is_materialized`.
    #[test]
    fn parse_create_materialized_incremental_view() {
        let sql = "CREATE MATERIALIZED INCREMENTAL VIEW snap AS SELECT * FROM t";
        let stmt = parse_incremental_view_statement(sql).unwrap().unwrap();
        assert!(matches!(
            stmt,
            IncrementalViewStatement::Create {
                is_materialized: true,
                ..
            }
        ));
    }

    /// `DECLARE RECURSIVE VIEW` yields `DeclareRecursive` with the view name.
    #[test]
    fn parse_declare_recursive_view() {
        let sql = "DECLARE RECURSIVE VIEW reach AS SELECT dst FROM edges WHERE src = 0";
        let stmt = parse_incremental_view_statement(sql).unwrap().unwrap();
        assert!(matches!(
            stmt,
            IncrementalViewStatement::DeclareRecursive { ref name, .. } if name == "reach"
        ));
    }

    /// `REFRESH INCREMENTAL VIEW` yields `Refresh` with the view name.
    #[test]
    fn parse_refresh_incremental_view() {
        let sql = "REFRESH INCREMENTAL VIEW revenue";
        let stmt = parse_incremental_view_statement(sql).unwrap().unwrap();
        assert!(matches!(
            stmt,
            IncrementalViewStatement::Refresh { ref name } if name == "revenue"
        ));
    }

    /// A trailing semicolon is not part of the dropped view's name.
    #[test]
    fn parse_drop_incremental_view() {
        let sql = "DROP INCREMENTAL VIEW revenue;";
        let stmt = parse_incremental_view_statement(sql).unwrap().unwrap();
        assert!(matches!(
            stmt,
            IncrementalViewStatement::Drop { ref name } if name == "revenue"
        ));
    }

    /// Ordinary SQL is not claimed by this parser.
    #[test]
    fn non_incremental_sql_returns_none() {
        let sql = "SELECT 1";
        assert!(parse_incremental_view_statement(sql).unwrap().is_none());
    }

    /// A trailing `LATENESS` clause is parsed into a millisecond bound.
    #[test]
    fn parse_create_with_lateness() {
        let sql =
            "CREATE INCREMENTAL VIEW ev AS SELECT * FROM s LATENESS event_ts INTERVAL '5' MINUTE";
        let stmt = parse_incremental_view_statement(sql).unwrap().unwrap();
        if let IncrementalViewStatement::Create { lateness, .. } = stmt {
            assert_eq!(lateness.len(), 1);
            assert_eq!(lateness[0].column, "event_ts");
            assert_eq!(lateness[0].lateness_ms, 5 * 60_000);
        } else {
            panic!("expected Create");
        }
    }

    /// A registered entry is visible through both `contains` and `get`.
    #[test]
    fn registry_register_and_get() {
        let reg = IncrementalViewRegistry::new();
        reg.register(
            "v1",
            IncrementalViewEntry {
                body_sql: "SELECT 1".into(),
                is_materialized: false,
                is_recursive: false,
                lateness: vec![],
            },
        )
        .unwrap();
        assert!(reg.contains("v1"));
        let entry = reg.get("v1").unwrap().unwrap();
        assert_eq!(entry.body_sql, "SELECT 1");
    }

    /// `CREATE` registers the view and `DROP` removes it again.
    #[test]
    fn execute_ddl_create_and_drop() {
        let reg = IncrementalViewRegistry::new();
        let name =
            execute_incremental_view_ddl(&reg, "CREATE INCREMENTAL VIEW v AS SELECT 1").unwrap();
        assert_eq!(name.as_deref(), Some("v"));
        assert!(reg.contains("v"));

        execute_incremental_view_ddl(&reg, "DROP INCREMENTAL VIEW v").unwrap();
        assert!(!reg.contains("v"));
    }

    /// Refreshing a view that was never registered is an error.
    #[test]
    fn execute_ddl_refresh_missing_returns_error() {
        let reg = IncrementalViewRegistry::new();
        let err = execute_incremental_view_ddl(&reg, "REFRESH INCREMENTAL VIEW nonexistent");
        assert!(err.is_err());
    }
}