Skip to main content

dbx_core/sql/
view.rs

1//! SQL Views — CREATE VIEW / DROP VIEW 지원
2//!
3//! 뷰는 이름 → SQL 문자열 매핑을 DashMap으로 저장합니다.
4//! SQL 실행 전 FROM 절에서 뷰 이름 발견 시 서브쿼리로 인라인 치환.
5
6use dashmap::DashMap;
7use std::sync::Arc;
8
9use crate::error::{DbxError, DbxResult};
10
11/// SQL 뷰 레지스트리
12///
13/// 뷰 이름 → SQL 정의 문자열을 스레드 안전하게 관리합니다.
14#[derive(Debug, Default)]
15pub struct ViewRegistry {
16    /// view_name (소문자) → SQL 텍스트
17    views: DashMap<String, String>,
18}
19
20impl ViewRegistry {
21    /// 새 ViewRegistry 생성
22    pub fn new() -> Self {
23        Self::default()
24    }
25
26    /// 뷰 생성 (이미 존재하면 덮어씀)
27    pub fn create(&self, name: &str, sql: &str) -> DbxResult<()> {
28        self.views.insert(name.to_lowercase(), sql.to_string());
29        Ok(())
30    }
31
32    /// 뷰 삭제. 없으면 Err 반환.
33    pub fn drop(&self, name: &str) -> DbxResult<()> {
34        self.views
35            .remove(&name.to_lowercase())
36            .map(|_| ())
37            .ok_or_else(|| DbxError::InvalidArguments(format!("뷰 '{}' 를 찾을 수 없음", name)))
38    }
39
40    /// 뷰 존재 여부 확인
41    pub fn exists(&self, name: &str) -> bool {
42        self.views.contains_key(&name.to_lowercase())
43    }
44
45    /// SQL의 FROM 절에서 뷰 이름을 서브쿼리로 치환
46    ///
47    /// 예: `SELECT * FROM active_users`
48    ///   → `SELECT * FROM (SELECT id, name FROM users WHERE active = true) AS active_users`
49    pub fn expand(&self, sql: &str) -> String {
50        let mut result = sql.to_string();
51        for entry in self.views.iter() {
52            let name = entry.key();
53            let view_sql = entry.value();
54
55            // "FROM <name>" 패턴 치환 (대소문자 무시)
56            let pattern = format!("from {}", name);
57            let replacement = format!("FROM ({}) AS {}", view_sql, name);
58
59            // 대소문자 유지하면서 치환
60            let lower = result.to_lowercase();
61            if let Some(pos) = lower.find(&pattern) {
62                result = format!(
63                    "{}{}{}",
64                    &result[..pos],
65                    replacement,
66                    &result[pos + pattern.len()..]
67                );
68            }
69        }
70        result
71    }
72
73    /// 등록된 뷰 목록 반환
74    pub fn list_views(&self) -> Vec<String> {
75        self.views.iter().map(|e| e.key().clone()).collect()
76    }
77}
78
79/// Arc로 래핑된 공유 ViewRegistry 타입 별칭
80pub type SharedViewRegistry = Arc<ViewRegistry>;
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85
86    #[test]
87    fn test_create_and_exists() {
88        let reg = ViewRegistry::new();
89        assert!(!reg.exists("active_users"));
90        reg.create(
91            "active_users",
92            "SELECT id, name FROM users WHERE active = true",
93        )
94        .unwrap();
95        assert!(reg.exists("active_users"));
96    }
97
98    #[test]
99    fn test_create_case_insensitive() {
100        let reg = ViewRegistry::new();
101        reg.create("MyView", "SELECT 1").unwrap();
102        assert!(reg.exists("myview"));
103        assert!(reg.exists("MyView")); // 대소문자 무시
104        assert!(reg.exists("MYVIEW"));
105    }
106
107    #[test]
108    fn test_drop_view() {
109        let reg = ViewRegistry::new();
110        reg.create("v", "SELECT 1 AS x").unwrap();
111        assert!(reg.exists("v"));
112        reg.drop("v").unwrap();
113        assert!(!reg.exists("v"));
114    }
115
116    #[test]
117    fn test_drop_nonexistent_fails() {
118        let reg = ViewRegistry::new();
119        assert!(reg.drop("nonexistent").is_err());
120    }
121
122    #[test]
123    fn test_expand_replaces_from_clause() {
124        let reg = ViewRegistry::new();
125        reg.create(
126            "active_users",
127            "SELECT id, name FROM users WHERE active = true",
128        )
129        .unwrap();
130
131        let sql = "SELECT * FROM active_users";
132        let expanded = reg.expand(sql);
133        assert!(
134            expanded.contains("(SELECT id, name FROM users WHERE active = true)"),
135            "서브쿼리가 삽입되어야 함: {}",
136            expanded
137        );
138        assert!(
139            expanded.contains("AS active_users"),
140            "별칭이 지정되어야 함: {}",
141            expanded
142        );
143    }
144
145    #[test]
146    fn test_expand_no_match() {
147        let reg = ViewRegistry::new();
148        reg.create("v", "SELECT 1").unwrap();
149        let sql = "SELECT * FROM users"; // 'v' 가 아닌 테이블
150        let expanded = reg.expand(sql);
151        assert_eq!(expanded, sql, "치환 없이 원래 SQL 유지");
152    }
153
154    #[test]
155    fn test_list_views() {
156        let reg = ViewRegistry::new();
157        reg.create("v1", "SELECT 1").unwrap();
158        reg.create("v2", "SELECT 2").unwrap();
159        let mut views = reg.list_views();
160        views.sort();
161        assert_eq!(views, vec!["v1", "v2"]);
162    }
163}
164
165// ════════════════════════════════════════════
166// Materialized View Registry (Event-Driven)
167// ════════════════════════════════════════════
168
169use arrow::record_batch::RecordBatch;
170use std::collections::HashSet;
171use std::sync::{Condvar, Mutex, RwLock};
172use std::time::{Duration, Instant};
173
174/// SQL 문에서 FROM / JOIN 절의 테이블명을 간단히 추출합니다.
175///
176/// 완전한 SQL 파서 대신, `FROM <table>` 및 `JOIN <table>` 패턴을 대소문자 무시로 추출합니다.
177/// 서브쿼리 내 테이블은 포함하지 않지만, 대부분의 MV 사용 사례에서 충분합니다.
178fn extract_source_tables(sql: &str) -> Vec<String> {
179    let upper = sql.to_uppercase();
180    let tokens: Vec<&str> = upper.split_whitespace().collect();
181    let original_tokens: Vec<&str> = sql.split_whitespace().collect();
182    let mut tables = Vec::new();
183
184    for (i, token) in tokens.iter().enumerate() {
185        if (*token == "FROM" || *token == "JOIN") && i + 1 < tokens.len() {
186            let table_name = original_tokens[i + 1]
187                .trim_matches(|c: char| c == '(' || c == ')' || c == ',' || c == ';')
188                .to_lowercase();
189            if !table_name.is_empty() && table_name != "select" && table_name != "(" {
190                tables.push(table_name);
191            }
192        }
193    }
194    tables.sort();
195    tables.dedup();
196    tables
197}
198
199/// 구체화된 뷰 항목 (내부 저장 단위)
200struct MatViewEntry {
201    /// 뷰를 정의하는 SQL 쿼리
202    sql: String,
203    /// 사전 계산된 쿼리 결과 캐시 (최초에는 None — stale 상태)
204    cache: Option<Vec<RecordBatch>>,
205    /// 마지막 갱신 시각
206    refreshed_at: Option<Instant>,
207    /// 자동 갱신 주기 (초 단위). None이면 이벤트 기반 갱신만.
208    refresh_interval_secs: Option<u64>,
209    /// MV SQL이 참조하는 소스 테이블 목록 (FROM/JOIN 절에서 추출)
210    source_tables: Vec<String>,
211}
212
213/// 이벤트 기반 Materialized View 알림 채널
214///
215/// DML 발생 시 `notify_change(table)` → dirty set에 추가 → Condvar 깨움
216/// 백그라운드 스레드가 즉시 깨어나 debounce 후 갱신 실행
217pub struct MatViewNotifier {
218    /// 갱신 대기 중인 MV 이름 세트
219    dirty: Mutex<HashSet<String>>,
220    /// 블로킹 대기용 Condvar
221    condvar: Condvar,
222}
223
224impl MatViewNotifier {
225    fn new() -> Self {
226        Self {
227            dirty: Mutex::new(HashSet::new()),
228            condvar: Condvar::new(),
229        }
230    }
231
232    /// dirty set에 MV 이름 추가 + Condvar 깨움
233    fn mark_dirty(&self, mv_name: &str) {
234        let mut dirty = self.dirty.lock().unwrap();
235        dirty.insert(mv_name.to_string());
236        self.condvar.notify_one();
237    }
238
239    /// dirty set이 비어있으면 블로킹 대기, 채워지면 drain하여 반환
240    pub fn wait_and_take(&self) -> HashSet<String> {
241        let mut dirty = self.dirty.lock().unwrap();
242        while dirty.is_empty() {
243            dirty = self.condvar.wait(dirty).unwrap();
244        }
245        dirty.drain().collect()
246    }
247
248    /// non-blocking: 현재 dirty set을 drain하여 반환 (비어있으면 빈 세트)
249    pub fn take(&self) -> HashSet<String> {
250        let mut dirty = self.dirty.lock().unwrap();
251        dirty.drain().collect()
252    }
253}
254
255/// 구체화된 뷰 레지스트리 (이벤트 기반 갱신)
256///
257/// CREATE MATERIALIZED VIEW / REFRESH / DROP 명령어를 처리합니다.
258/// 각 뷰는 등록된 SQL과 사전 계산된 Arrow RecordBatch 캐시를 가집니다.
259///
260/// DML 발생 시 `notify_change(table)` 호출로 해당 테이블을 참조하는 MV를 즉시 갱신합니다.
261/// `min_refresh_interval_ms`로 갱신 폭풍을 방지합니다 (debounce).
262pub struct MaterializedViewRegistry {
263    views: DashMap<String, RwLock<MatViewEntry>>,
264    /// 이벤트 알림 채널
265    notifier: MatViewNotifier,
266    /// 최소 갱신 주기 (밀리초 단위, 기본 1000ms). debounce용.
267    min_refresh_interval_ms: std::sync::atomic::AtomicU64,
268}
269
270impl Default for MaterializedViewRegistry {
271    fn default() -> Self {
272        Self {
273            views: DashMap::new(),
274            notifier: MatViewNotifier::new(),
275            min_refresh_interval_ms: std::sync::atomic::AtomicU64::new(1000),
276        }
277    }
278}
279
280impl MaterializedViewRegistry {
281    pub fn new() -> Self {
282        Self::default()
283    }
284
285    /// 최소 갱신 주기를 밀리초 단위로 설정합니다.
286    ///
287    /// 이 값은 DML 이벤트 발생 후 실제 MV 갱신까지의 최소 대기 시간입니다.
288    /// 빈번한 쓰기 시 갱신 폭풍을 방지하는 debounce 역할을 합니다.
289    ///
290    /// # 예시
291    ///
292    /// ```rust,ignore
293    /// registry.set_min_refresh_interval_ms(500); // 0.5초 debounce
294    /// ```
295    pub fn set_min_refresh_interval_ms(&self, ms: u64) {
296        self.min_refresh_interval_ms
297            .store(ms, std::sync::atomic::Ordering::Relaxed);
298    }
299
300    /// 현재 설정된 최소 갱신 주기 (밀리초)
301    pub fn min_refresh_interval_ms(&self) -> u64 {
302        self.min_refresh_interval_ms
303            .load(std::sync::atomic::Ordering::Relaxed)
304    }
305
306    /// 최소 갱신 주기를 Duration으로 반환
307    pub fn min_refresh_interval(&self) -> Duration {
308        Duration::from_millis(self.min_refresh_interval_ms())
309    }
310
311    /// 구체화된 뷰 등록 (최초에는 stale — cache 없음)
312    ///
313    /// SQL에서 FROM/JOIN 절을 파싱하여 source_tables를 자동 추출합니다.
314    pub fn create(
315        &self,
316        name: &str,
317        sql: &str,
318        refresh_interval_secs: Option<u64>,
319    ) -> DbxResult<()> {
320        let source_tables = extract_source_tables(sql);
321        self.views.insert(
322            name.to_lowercase(),
323            RwLock::new(MatViewEntry {
324                sql: sql.to_string(),
325                cache: None,
326                refreshed_at: None,
327                refresh_interval_secs,
328                source_tables,
329            }),
330        );
331        Ok(())
332    }
333
334    /// 캐시 저장 (REFRESH MATERIALIZED VIEW 호출 후)
335    pub fn set_cache(&self, name: &str, batches: Vec<RecordBatch>) -> DbxResult<()> {
336        let entry = self.views.get(&name.to_lowercase()).ok_or_else(|| {
337            DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
338        })?;
339        let mut e = entry.write().unwrap();
340        e.cache = Some(batches);
341        e.refreshed_at = Some(Instant::now());
342        Ok(())
343    }
344
345    /// 캐시가 유효한지 확인
346    ///
347    /// - 한 번도 갱신되지 않았으면 false (stale)
348    /// - 수동 갱신 전용(interval 없음): 갱신된 적 있으면 true
349    /// - interval이 있으면: 마지막 갱신으로부터 interval 초 미만이면 true
350    pub fn is_fresh(&self, name: &str) -> bool {
351        let entry = match self.views.get(&name.to_lowercase()) {
352            Some(e) => e,
353            None => return false,
354        };
355        let e = entry.read().unwrap();
356        match (e.refreshed_at, e.refresh_interval_secs) {
357            (None, _) => false,
358            (Some(_), None) => true,
359            (Some(t), Some(secs)) => t.elapsed().as_secs() < secs,
360        }
361    }
362
363    /// 캐시 읽기 (SELECT 캐시 히트 시 사용)
364    pub fn get_cache(&self, name: &str) -> Option<Vec<RecordBatch>> {
365        let entry = self.views.get(&name.to_lowercase())?;
366        entry.read().unwrap().cache.clone()
367    }
368
369    /// 뷰 SQL 읽기 (REFRESH 시 재실행 대상)
370    pub fn get_sql(&self, name: &str) -> Option<String> {
371        Some(
372            self.views
373                .get(&name.to_lowercase())?
374                .read()
375                .unwrap()
376                .sql
377                .clone(),
378        )
379    }
380
381    /// 등록된 구체화된 뷰 이름 목록
382    pub fn list(&self) -> Vec<String> {
383        self.views.iter().map(|e| e.key().clone()).collect()
384    }
385
386    /// 구체화된 뷰 삭제 (remove: Rust의 Drop 트레이트와 이름 충돌 방지)
387    pub fn remove(&self, name: &str) -> DbxResult<()> {
388        self.views
389            .remove(&name.to_lowercase())
390            .map(|_| ())
391            .ok_or_else(|| {
392                DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
393            })
394    }
395
396    // ════════════════════════════════════════════
397    // Event-Driven Notification API
398    // ════════════════════════════════════════════
399
400    /// DML(INSERT/UPDATE/DELETE) 발생 시 호출.
401    ///
402    /// 변경된 테이블을 참조하는 모든 MV를 dirty로 마킹하고
403    /// 백그라운드 갱신 스레드를 즉시 깨웁니다.
404    ///
405    /// 이 메서드는 멱등적이며, 빈번하게 호출해도 안전합니다.
406    pub fn notify_change(&self, table: &str) {
407        if self.views.is_empty() {
408            return; // MV 없으면 즉시 반환 (zero overhead)
409        }
410        let table_lower = table.to_lowercase();
411        // shard 접미사 제거 (예: "users__shard_0" → "users")
412        let base_table = if let Some(idx) = table_lower.find("__shard_") {
413            &table_lower[..idx]
414        } else {
415            &table_lower
416        };
417        for entry in self.views.iter() {
418            let mv_name = entry.key();
419            let e = entry.value().read().unwrap();
420            if e.source_tables.iter().any(|t| t == base_table) {
421                drop(e); // lock 해제 후 알림
422                self.notifier.mark_dirty(mv_name);
423            }
424        }
425    }
426
427    /// 백그라운드 스레드용: dirty MV가 생길 때까지 블로킹 대기 후 drain
428    pub fn wait_and_take_dirty(&self) -> HashSet<String> {
429        self.notifier.wait_and_take()
430    }
431
432    /// 백그라운드 스레드용: 현재 dirty set을 non-blocking으로 drain
433    pub fn take_dirty(&self) -> HashSet<String> {
434        self.notifier.take()
435    }
436
437    /// 등록된 MV 중 stale(갱신 필요)인 것들의 이름 반환 (폴링 호환)
438    pub fn stale_views(&self) -> Vec<String> {
439        self.views
440            .iter()
441            .filter(|e| {
442                let entry = e.value().read().unwrap();
443                match (entry.refreshed_at, entry.refresh_interval_secs) {
444                    (None, _) => true,
445                    (Some(_), None) => false,
446                    (Some(t), Some(secs)) => t.elapsed().as_secs() >= secs,
447                }
448            })
449            .map(|e| e.key().clone())
450            .collect()
451    }
452}
453
454/// Arc로 래핑된 공유 MaterializedViewRegistry 타입 별칭
455pub type SharedMaterializedViewRegistry = Arc<MaterializedViewRegistry>;
456
457#[cfg(test)]
458mod matview_tests {
459    use super::*;
460    use arrow::array::Int64Array;
461    use arrow::datatypes::{DataType, Field, Schema};
462
463    fn make_batch(n: i64) -> RecordBatch {
464        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
465        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![n]))]).unwrap()
466    }
467
468    #[test]
469    fn test_materialized_view_cache() {
470        let reg = MaterializedViewRegistry::new();
471        reg.create("mv_users", "SELECT id FROM users", None)
472            .unwrap();
473        assert!(!reg.is_fresh("mv_users")); // 생성 직후 stale
474
475        reg.set_cache("mv_users", vec![make_batch(1), make_batch(2)])
476            .unwrap();
477        assert!(reg.is_fresh("mv_users")); // 갱신 후 fresh
478
479        let cached = reg.get_cache("mv_users").unwrap();
480        assert_eq!(cached.len(), 2);
481    }
482
483    #[test]
484    fn test_matview_drop() {
485        let reg = MaterializedViewRegistry::new();
486        reg.create("mv_test", "SELECT 1", None).unwrap();
487        assert!(reg.get_sql("mv_test").is_some());
488        reg.remove("mv_test").unwrap();
489        assert!(reg.get_sql("mv_test").is_none());
490    }
491
492    #[test]
493    fn test_matview_drop_nonexistent_fails() {
494        let reg = MaterializedViewRegistry::new();
495        assert!(reg.remove("nonexistent").is_err());
496    }
497
498    #[test]
499    fn test_matview_with_interval() {
500        let reg = MaterializedViewRegistry::new();
501        // 300초 갱신 주기
502        reg.create("mv_sales", "SELECT * FROM sales", Some(300))
503            .unwrap();
504        assert!(!reg.is_fresh("mv_sales")); // 아직 갱신 안 됨
505
506        reg.set_cache("mv_sales", vec![make_batch(42)]).unwrap();
507        assert!(reg.is_fresh("mv_sales")); // 방금 갱신 — 300초 미만이므로 fresh
508
509        let cached = reg.get_cache("mv_sales").unwrap();
510        assert_eq!(cached[0].num_rows(), 1);
511    }
512
513    #[test]
514    fn test_matview_list() {
515        let reg = MaterializedViewRegistry::new();
516        reg.create("mv_a", "SELECT 1", None).unwrap();
517        reg.create("mv_b", "SELECT 2", None).unwrap();
518        let mut names = reg.list();
519        names.sort();
520        assert_eq!(names, vec!["mv_a", "mv_b"]);
521    }
522
523    #[test]
524    fn test_extract_source_tables() {
525        assert_eq!(
526            extract_source_tables("SELECT id, name FROM users WHERE active = true"),
527            vec!["users"]
528        );
529        assert_eq!(
530            extract_source_tables("SELECT * FROM orders JOIN users ON orders.uid = users.id"),
531            vec!["orders", "users"]
532        );
533        assert_eq!(
534            extract_source_tables("SELECT AVG(price) FROM products"),
535            vec!["products"]
536        );
537        // duplicate elimination
538        assert_eq!(
539            extract_source_tables("SELECT * FROM t1 JOIN t1 ON t1.a = t1.b"),
540            vec!["t1"]
541        );
542    }
543
544    #[test]
545    fn test_notify_change_marks_dirty() {
546        let reg = MaterializedViewRegistry::new();
547        reg.create("mv_users", "SELECT id FROM users", None)
548            .unwrap();
549        reg.create("mv_orders", "SELECT * FROM orders", None)
550            .unwrap();
551
552        // users 변경 → mv_users만 dirty
553        reg.notify_change("users");
554        let dirty = reg.take_dirty();
555        assert!(dirty.contains("mv_users"));
556        assert!(!dirty.contains("mv_orders"));
557
558        // orders 변경 → mv_orders만 dirty
559        reg.notify_change("orders");
560        let dirty = reg.take_dirty();
561        assert!(dirty.contains("mv_orders"));
562        assert!(!dirty.contains("mv_users"));
563    }
564
565    #[test]
566    fn test_notify_change_shard_table() {
567        let reg = MaterializedViewRegistry::new();
568        reg.create("mv_users", "SELECT id FROM users", None)
569            .unwrap();
570
571        // shard 테이블 변경도 인식
572        reg.notify_change("users__shard_0");
573        let dirty = reg.take_dirty();
574        assert!(dirty.contains("mv_users"));
575    }
576
577    #[test]
578    fn test_configurable_min_refresh_interval() {
579        let reg = MaterializedViewRegistry::new();
580        assert_eq!(reg.min_refresh_interval_ms(), 1000); // 기본 1초
581
582        reg.set_min_refresh_interval_ms(500);
583        assert_eq!(reg.min_refresh_interval_ms(), 500);
584        assert_eq!(reg.min_refresh_interval(), Duration::from_millis(500));
585    }
586
587    #[test]
588    fn test_notify_no_views_is_noop() {
589        let reg = MaterializedViewRegistry::new();
590        // MV 없을 때 호출해도 패닉 없음
591        reg.notify_change("some_table");
592        let dirty = reg.take_dirty();
593        assert!(dirty.is_empty());
594    }
595}