Skip to main content

krishiv_sql/
incremental_view.rs

1#![forbid(unsafe_code)]
2
3//! `CREATE INCREMENTAL VIEW` and `DECLARE RECURSIVE VIEW` SQL extensions.
4//!
5//! Supported DDL:
6//!
7//! ```sql
8//! -- Non-recursive incremental view (IVM)
9//! CREATE INCREMENTAL VIEW revenue AS
10//!   SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id
11//!   LATENESS event_ts INTERVAL '5' MINUTE;
12//!
13//! -- Materialized variant (keeps a full snapshot in memory)
14//! CREATE MATERIALIZED INCREMENTAL VIEW revenue AS ...;
15//!
16//! -- Recursive view (fixed-point iteration, auto-DISTINCT)
17//! DECLARE RECURSIVE VIEW reachable AS
18//!   SELECT dst FROM edges WHERE src = 0
19//!   UNION ALL
20//!   SELECT e.dst FROM edges e JOIN reachable r ON e.src = r.dst;
21//!
22//! -- Force a re-step (no-op for streaming; useful in batch/test mode)
23//! REFRESH INCREMENTAL VIEW revenue;
24//!
25//! -- Remove view and its cached Trace state
26//! DROP INCREMENTAL VIEW revenue;
27//! ```
28
29use std::collections::HashMap;
30use std::sync::RwLock;
31
32use crate::{SqlError, SqlResult};
33
34// ── LATENESS spec ─────────────────────────────────────────────────────────────
35
/// A single `LATENESS <column> INTERVAL '<n>' <unit>` annotation attached to
/// an incremental view.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LatenessAnnotation {
    /// Column token written after the `LATENESS` keyword.
    pub column: String,
    /// Allowed lateness for that column, expressed in milliseconds.
    pub lateness_ms: u64,
}
42
43// ── Parsed DDL statement ───────────────────────────────────────────────────────
44
45/// Parsed incremental-view DDL statement.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum IncrementalViewStatement {
48    Create {
49        name: String,
50        body_sql: String,
51        is_materialized: bool,
52        lateness: Vec<LatenessAnnotation>,
53    },
54    DeclareRecursive {
55        name: String,
56        body_sql: String,
57    },
58    Refresh {
59        name: String,
60    },
61    Drop {
62        name: String,
63    },
64}
65
66// ── Registry ──────────────────────────────────────────────────────────────────
67
68/// Metadata stored for one registered incremental view.
69#[derive(Debug, Clone)]
70pub struct IncrementalViewEntry {
71    pub body_sql: String,
72    pub is_materialized: bool,
73    pub is_recursive: bool,
74    pub lateness: Vec<LatenessAnnotation>,
75}
76
77/// Registry of active incremental views (SQL metadata layer).
78///
79/// This is the SQL-layer registry — it stores the DDL metadata for each view.
80/// The `krishiv-api` layer bridges this to the `krishiv-delta` incremental
81/// operator pipeline.
82#[derive(Debug, Default)]
83pub struct IncrementalViewRegistry {
84    views: RwLock<HashMap<String, IncrementalViewEntry>>,
85}
86
87impl IncrementalViewRegistry {
88    pub fn new() -> Self {
89        Self::default()
90    }
91
92    pub fn register(&self, name: impl Into<String>, entry: IncrementalViewEntry) -> SqlResult<()> {
93        let mut views = self.views.write().map_err(|_| SqlError::DataFusion {
94            message: "incremental view registry lock poisoned".into(),
95        })?;
96        views.insert(name.into(), entry);
97        Ok(())
98    }
99
100    pub fn remove(&self, name: &str) -> SqlResult<bool> {
101        let mut views = self.views.write().map_err(|_| SqlError::DataFusion {
102            message: "incremental view registry lock poisoned".into(),
103        })?;
104        Ok(views.remove(name).is_some())
105    }
106
107    pub fn get(&self, name: &str) -> SqlResult<Option<IncrementalViewEntry>> {
108        let views = self.views.read().map_err(|_| SqlError::DataFusion {
109            message: "incremental view registry lock poisoned".into(),
110        })?;
111        Ok(views.get(name).cloned())
112    }
113
114    pub fn contains(&self, name: &str) -> bool {
115        self.views
116            .read()
117            .map(|v| v.contains_key(name))
118            .unwrap_or(false)
119    }
120
121    pub fn view_names(&self) -> SqlResult<Vec<String>> {
122        let views = self.views.read().map_err(|_| SqlError::DataFusion {
123            message: "incremental view registry lock poisoned".into(),
124        })?;
125        Ok(views.keys().cloned().collect())
126    }
127}
128
129// ── Parser ────────────────────────────────────────────────────────────────────
130
131/// Parse incremental-view DDL statements from a SQL string.
132///
133/// Returns `Ok(None)` if the statement is not an incremental-view DDL.
134pub fn parse_incremental_view_statement(sql: &str) -> SqlResult<Option<IncrementalViewStatement>> {
135    let trimmed = sql.trim().trim_end_matches(';');
136    let upper = trimmed.to_uppercase();
137
138    // CREATE [MATERIALIZED] INCREMENTAL VIEW <name> AS <body>
139    // [LATENESS <col> INTERVAL '<n>' <unit>]
140    let is_materialized = upper.starts_with("CREATE MATERIALIZED INCREMENTAL VIEW ");
141    if is_materialized || upper.starts_with("CREATE INCREMENTAL VIEW ") {
142        let prefix = if is_materialized {
143            "CREATE MATERIALIZED INCREMENTAL VIEW "
144        } else {
145            "CREATE INCREMENTAL VIEW "
146        };
147        let rest = trimmed
148            .get(prefix.len()..)
149            .ok_or_else(|| SqlError::Unsupported {
150                feature: "CREATE INCREMENTAL VIEW".into(),
151            })?;
152        let (name, body_with_lateness) = split_name_and_body(rest)?;
153        let (body_sql, lateness) = split_body_and_lateness(&body_with_lateness)?;
154        return Ok(Some(IncrementalViewStatement::Create {
155            name,
156            body_sql,
157            is_materialized,
158            lateness,
159        }));
160    }
161
162    // DECLARE RECURSIVE VIEW <name> AS <body>
163    if upper.starts_with("DECLARE RECURSIVE VIEW ") {
164        let rest = trimmed
165            .get("DECLARE RECURSIVE VIEW ".len()..)
166            .ok_or_else(|| SqlError::Unsupported {
167                feature: "DECLARE RECURSIVE VIEW".into(),
168            })?;
169        let (name, body_sql) = split_name_and_body(rest)?;
170        let (body_sql, _lateness) = split_body_and_lateness(&body_sql)?;
171        return Ok(Some(IncrementalViewStatement::DeclareRecursive {
172            name,
173            body_sql,
174        }));
175    }
176
177    // REFRESH INCREMENTAL VIEW <name>
178    if upper.starts_with("REFRESH INCREMENTAL VIEW ") {
179        let name = trimmed
180            .get("REFRESH INCREMENTAL VIEW ".len()..)
181            .ok_or_else(|| SqlError::Unsupported {
182                feature: "REFRESH INCREMENTAL VIEW".into(),
183            })?
184            .trim()
185            .to_string();
186        if name.is_empty() {
187            return Err(SqlError::EmptyTableName);
188        }
189        return Ok(Some(IncrementalViewStatement::Refresh { name }));
190    }
191
192    // DROP INCREMENTAL VIEW <name>
193    if upper.starts_with("DROP INCREMENTAL VIEW ") {
194        let name = trimmed
195            .get("DROP INCREMENTAL VIEW ".len()..)
196            .ok_or_else(|| SqlError::Unsupported {
197                feature: "DROP INCREMENTAL VIEW".into(),
198            })?
199            .trim()
200            .to_string();
201        if name.is_empty() {
202            return Err(SqlError::EmptyTableName);
203        }
204        return Ok(Some(IncrementalViewStatement::Drop { name }));
205    }
206
207    Ok(None)
208}
209
210/// Apply a parsed incremental-view DDL statement to the registry.
211///
212/// Returns `Some(name)` if the statement was an incremental-view DDL (so the
213/// caller knows to return an empty DDL result rather than forwarding to
214/// DataFusion), or `None` if the SQL was not an incremental-view DDL.
215pub fn execute_incremental_view_ddl(
216    registry: &IncrementalViewRegistry,
217    sql: &str,
218) -> SqlResult<Option<String>> {
219    let Some(stmt) = parse_incremental_view_statement(sql)? else {
220        return Ok(None);
221    };
222
223    match stmt {
224        IncrementalViewStatement::Create {
225            ref name,
226            ref body_sql,
227            is_materialized,
228            ref lateness,
229        } => {
230            registry.register(
231                name.clone(),
232                IncrementalViewEntry {
233                    body_sql: body_sql.clone(),
234                    is_materialized,
235                    is_recursive: false,
236                    lateness: lateness.clone(),
237                },
238            )?;
239            Ok(Some(name.clone()))
240        }
241
242        IncrementalViewStatement::DeclareRecursive {
243            ref name,
244            ref body_sql,
245        } => {
246            registry.register(
247                name.clone(),
248                IncrementalViewEntry {
249                    body_sql: body_sql.clone(),
250                    is_materialized: false,
251                    is_recursive: true,
252                    lateness: vec![],
253                },
254            )?;
255            Ok(Some(name.clone()))
256        }
257
258        IncrementalViewStatement::Refresh { ref name } => {
259            if !registry.contains(name) {
260                return Err(SqlError::Unsupported {
261                    feature: format!("REFRESH INCREMENTAL VIEW: view '{name}' is not registered"),
262                });
263            }
264            Ok(Some(name.clone()))
265        }
266
267        IncrementalViewStatement::Drop { ref name } => {
268            registry.remove(name)?;
269            Ok(Some(name.clone()))
270        }
271    }
272}
273
274// ── Internal helpers ──────────────────────────────────────────────────────────
275
276/// Split `<name> AS <body>` into `(name, body)`.
277fn split_name_and_body(rest: &str) -> SqlResult<(String, String)> {
278    let upper = rest.to_uppercase();
279    let as_pos = upper.find(" AS ").ok_or_else(|| SqlError::Unsupported {
280        feature: "CREATE INCREMENTAL VIEW / DECLARE RECURSIVE VIEW requires AS <query>".into(),
281    })?;
282    let name = rest[..as_pos].trim().to_string();
283    let body = rest[as_pos + 4..].trim().to_string();
284    if name.is_empty() {
285        return Err(SqlError::EmptyTableName);
286    }
287    if body.is_empty() {
288        return Err(SqlError::EmptyQuery);
289    }
290    Ok((name, body))
291}
292
293/// Split the view body from trailing `LATENESS` annotations.
294///
295/// Grammar: `<body_sql> LATENESS <col> INTERVAL '<n>' <unit> [, ...]`
296/// where unit is SECOND | MINUTE | HOUR | DAY.
297///
298/// If no LATENESS clause is found, returns `(body, vec![])`.
299fn split_body_and_lateness(
300    body_with_lateness: &str,
301) -> SqlResult<(String, Vec<LatenessAnnotation>)> {
302    let upper = body_with_lateness.to_uppercase();
303
304    // Find the LAST occurrence of LATENESS (it follows the body SQL).
305    // We look for the keyword followed by a valid column name and INTERVAL.
306    let Some(lat_pos) = find_lateness_clause_start(&upper) else {
307        return Ok((body_with_lateness.trim().to_string(), vec![]));
308    };
309
310    let body_sql = body_with_lateness[..lat_pos].trim().to_string();
311    let lateness_str = &body_with_lateness[lat_pos..];
312    let lateness = parse_lateness_clauses(lateness_str)?;
313    Ok((body_sql, lateness))
314}
315
/// Find the byte offset of the first top-level `LATENESS` keyword in `upper`.
///
/// `upper` must already be upper-cased; the keyword is compared verbatim.
/// A match is only reported when the keyword
///
/// * is outside parentheses,
/// * is outside single-quoted literals and double-quoted identifiers, and
/// * is a whole word: the neighbouring bytes must not be identifier
///   characters (ASCII letters, digits, `_`, or any non-ASCII byte), so
///   `MAX_LATENESS` and `LATENESS_MS` are not mistaken for the keyword.
///
/// Returns `None` when no such keyword exists.
fn find_lateness_clause_start(upper: &str) -> Option<usize> {
    const KEYWORD: &[u8] = b"LATENESS";

    /// Bytes that may be part of an SQL identifier.
    fn is_ident_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_' || !b.is_ascii()
    }

    let bytes = upper.as_bytes();
    let mut depth = 0usize;
    // `Some(q)` while inside a literal/identifier opened by quote byte `q`.
    let mut open_quote: Option<u8> = None;

    for (i, &b) in bytes.iter().enumerate() {
        if let Some(q) = open_quote {
            // A doubled quote (`''`) closes and immediately reopens, which
            // keeps the scanner inside the literal as intended.
            if b == q {
                open_quote = None;
            }
            continue;
        }
        match b {
            b'\'' | b'"' => open_quote = Some(b),
            b'(' => depth += 1,
            // Saturate so stray `)` cannot underflow the depth counter.
            b')' => depth = depth.saturating_sub(1),
            b'L' if depth == 0 && bytes.get(i..).is_some_and(|s| s.starts_with(KEYWORD)) => {
                let before_ok = i
                    .checked_sub(1)
                    .and_then(|p| bytes.get(p))
                    .map_or(true, |&prev| !is_ident_byte(prev));
                let after_ok = bytes
                    .get(i + KEYWORD.len())
                    .map_or(true, |&next| !is_ident_byte(next));
                if before_ok && after_ok {
                    return Some(i);
                }
            }
            _ => {}
        }
    }
    None
}
353
354/// Parse one or more `LATENESS <col> INTERVAL '<n>' <unit>` clauses.
355fn parse_lateness_clauses(lateness_str: &str) -> SqlResult<Vec<LatenessAnnotation>> {
356    // Tokenize: split on LATENESS keyword (handling multiple)
357    let upper = lateness_str.to_uppercase();
358    let mut result = Vec::new();
359    let mut remaining = lateness_str.trim();
360
361    loop {
362        let upper_rem = remaining.to_uppercase();
363        let stripped = if upper_rem.starts_with("LATENESS ") {
364            &remaining["LATENESS ".len()..]
365        } else if upper_rem.starts_with(", LATENESS ") {
366            &remaining[", LATENESS ".len()..]
367        } else {
368            break;
369        };
370
371        // Parse: <col> INTERVAL '<n>' <unit>
372        let tokens: Vec<&str> = stripped.splitn(5, char::is_whitespace).collect();
373        if tokens.len() < 4 {
374            break;
375        }
376        let col = tokens
377            .first()
378            .copied()
379            .unwrap_or("")
380            .trim_matches(',')
381            .to_string();
382        // tokens[1] should be INTERVAL (case-insensitive)
383        let interval_str = tokens.get(2).copied().unwrap_or("").trim_matches('\'');
384        let unit_str = tokens.get(3).copied().unwrap_or("").trim_matches(',');
385        let n: u64 = interval_str.parse().map_err(|_| SqlError::Unsupported {
386            feature: format!("LATENESS INTERVAL value '{interval_str}' is not a valid integer"),
387        })?;
388        let ms = match unit_str.to_uppercase().as_str() {
389            "SECOND" | "SECONDS" => n * 1000,
390            "MINUTE" | "MINUTES" => n * 60_000,
391            "HOUR" | "HOURS" => n * 3_600_000,
392            "DAY" | "DAYS" => n * 86_400_000,
393            "MILLISECOND" | "MILLISECONDS" | "MS" => n,
394            _ => {
395                return Err(SqlError::Unsupported {
396                    feature: format!(
397                        "LATENESS interval unit '{unit_str}' is not supported \
398                         (expected SECOND, MINUTE, HOUR, DAY, or MILLISECOND)"
399                    ),
400                });
401            }
402        };
403
404        result.push(LatenessAnnotation {
405            column: col,
406            lateness_ms: ms,
407        });
408
409        // Advance past this clause
410        let consumed_upper: String = upper_rem
411            .chars()
412            .take("LATENESS ".len() + stripped.len() - stripped.trim_start().len())
413            .collect();
414        let _ = consumed_upper; // advance is approximate; find next LATENESS
415        // Find next "LATENESS" or ", LATENESS" in remaining
416        let next = remaining[1..].to_uppercase().find("LATENESS");
417        match next {
418            Some(pos) => {
419                remaining = &remaining[1 + pos..];
420            }
421            None => break,
422        }
423    }
424
425    let _ = upper; // suppress unused warning
426    Ok(result)
427}
428
429// ── Tests ──────────────────────────────────────────────────────────────────────
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` and unwraps it down to the statement; panics when parsing
    /// fails or the SQL is not incremental-view DDL.
    fn parse_ddl(sql: &str) -> IncrementalViewStatement {
        parse_incremental_view_statement(sql).unwrap().unwrap()
    }

    /// Registry entry for a plain (non-materialized, non-recursive) view.
    fn plain_entry(body_sql: &str) -> IncrementalViewEntry {
        IncrementalViewEntry {
            body_sql: body_sql.into(),
            is_materialized: false,
            is_recursive: false,
            lateness: vec![],
        }
    }

    #[test]
    fn parse_create_incremental_view() {
        match parse_ddl("CREATE INCREMENTAL VIEW revenue AS SELECT SUM(amount) FROM orders") {
            IncrementalViewStatement::Create {
                name,
                is_materialized,
                ..
            } => {
                assert_eq!(name, "revenue");
                assert!(!is_materialized);
            }
            other => panic!("expected Create, got {other:?}"),
        }
    }

    #[test]
    fn parse_create_materialized_incremental_view() {
        match parse_ddl("CREATE MATERIALIZED INCREMENTAL VIEW snap AS SELECT * FROM t") {
            IncrementalViewStatement::Create {
                is_materialized, ..
            } => assert!(is_materialized),
            other => panic!("expected Create, got {other:?}"),
        }
    }

    #[test]
    fn parse_declare_recursive_view() {
        match parse_ddl("DECLARE RECURSIVE VIEW reach AS SELECT dst FROM edges WHERE src = 0") {
            IncrementalViewStatement::DeclareRecursive { name, .. } => assert_eq!(name, "reach"),
            other => panic!("expected DeclareRecursive, got {other:?}"),
        }
    }

    #[test]
    fn parse_refresh_incremental_view() {
        match parse_ddl("REFRESH INCREMENTAL VIEW revenue") {
            IncrementalViewStatement::Refresh { name } => assert_eq!(name, "revenue"),
            other => panic!("expected Refresh, got {other:?}"),
        }
    }

    #[test]
    fn parse_drop_incremental_view() {
        // The trailing semicolon must be ignored.
        match parse_ddl("DROP INCREMENTAL VIEW revenue;") {
            IncrementalViewStatement::Drop { name } => assert_eq!(name, "revenue"),
            other => panic!("expected Drop, got {other:?}"),
        }
    }

    #[test]
    fn non_incremental_sql_returns_none() {
        let parsed = parse_incremental_view_statement("SELECT 1").unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn parse_create_with_lateness() {
        let sql =
            "CREATE INCREMENTAL VIEW ev AS SELECT * FROM s LATENESS event_ts INTERVAL '5' MINUTE";
        let IncrementalViewStatement::Create { lateness, .. } = parse_ddl(sql) else {
            panic!("expected Create");
        };
        assert_eq!(lateness.len(), 1);
        assert_eq!(lateness[0].column, "event_ts");
        assert_eq!(lateness[0].lateness_ms, 5 * 60_000);
    }

    #[test]
    fn registry_register_and_get() {
        let registry = IncrementalViewRegistry::new();
        registry.register("v1", plain_entry("SELECT 1")).unwrap();

        assert!(registry.contains("v1"));
        let stored = registry.get("v1").unwrap().unwrap();
        assert_eq!(stored.body_sql, "SELECT 1");
    }

    #[test]
    fn execute_ddl_create_and_drop() {
        let registry = IncrementalViewRegistry::new();

        let created =
            execute_incremental_view_ddl(&registry, "CREATE INCREMENTAL VIEW v AS SELECT 1")
                .unwrap();
        assert_eq!(created.as_deref(), Some("v"));
        assert!(registry.contains("v"));

        execute_incremental_view_ddl(&registry, "DROP INCREMENTAL VIEW v").unwrap();
        assert!(!registry.contains("v"));
    }

    #[test]
    fn execute_ddl_refresh_missing_returns_error() {
        let registry = IncrementalViewRegistry::new();
        let outcome =
            execute_incremental_view_ddl(&registry, "REFRESH INCREMENTAL VIEW nonexistent");
        assert!(outcome.is_err());
    }
}
540    fn execute_ddl_refresh_missing_returns_error() {
541        let reg = IncrementalViewRegistry::new();
542        let err = execute_incremental_view_ddl(&reg, "REFRESH INCREMENTAL VIEW nonexistent");
543        assert!(err.is_err());
544    }
545}