Skip to main content

dbx_core/sql/
view.rs

1//! SQL Views — CREATE VIEW / DROP VIEW 지원
2//!
3//! 뷰는 이름 → SQL 문자열 매핑을 DashMap으로 저장합니다.
4//! SQL 실행 전 FROM 절에서 뷰 이름 발견 시 서브쿼리로 인라인 치환.
5
6use dashmap::DashMap;
7use std::sync::Arc;
8
9use crate::error::{DbxError, DbxResult};
10
11/// SQL 뷰 레지스트리
12///
13/// 뷰 이름 → SQL 정의 문자열을 스레드 안전하게 관리합니다.
14#[derive(Debug, Default)]
15pub struct ViewRegistry {
16    /// view_name (소문자) → SQL 텍스트
17    views: DashMap<String, String>,
18}
19
20impl ViewRegistry {
21    /// 새 ViewRegistry 생성
22    pub fn new() -> Self {
23        Self::default()
24    }
25
26    /// 뷰 생성 (이미 존재하면 덮어씀)
27    pub fn create(&self, name: &str, sql: &str) -> DbxResult<()> {
28        self.views.insert(name.to_lowercase(), sql.to_string());
29        Ok(())
30    }
31
32    /// 뷰 삭제. 없으면 Err 반환.
33    pub fn drop(&self, name: &str) -> DbxResult<()> {
34        self.views
35            .remove(&name.to_lowercase())
36            .map(|_| ())
37            .ok_or_else(|| DbxError::InvalidArguments(format!("뷰 '{}' 를 찾을 수 없음", name)))
38    }
39
40    /// 뷰 존재 여부 확인
41    pub fn exists(&self, name: &str) -> bool {
42        self.views.contains_key(&name.to_lowercase())
43    }
44
45    /// SQL의 FROM 절에서 뷰 이름을 서브쿼리로 치환
46    ///
47    /// 예: `SELECT * FROM active_users`
48    ///   → `SELECT * FROM (SELECT id, name FROM users WHERE active = true) AS active_users`
49    pub fn expand(&self, sql: &str) -> String {
50        if self.views.is_empty() {
51            return sql.to_string();
52        }
53
54        let mut result = String::with_capacity(sql.len() * 2);
55        let mut tokens = Vec::new();
56        let mut current_token = String::new();
57        let mut start_idx = 0;
58        let mut in_quotes = false;
59        let mut quote_char = ' ';
60
61        // Simple tokenizer that handles quotes and basic delimiters
62        for (i, c) in sql.char_indices() {
63            if in_quotes {
64                current_token.push(c);
65                if c == quote_char {
66                    in_quotes = false;
67                    tokens.push((start_idx, current_token.clone()));
68                    current_token.clear();
69                }
70                continue;
71            }
72
73            if c == '\'' || c == '"' {
74                if !current_token.is_empty() {
75                    tokens.push((start_idx, current_token.clone()));
76                    current_token.clear();
77                }
78                in_quotes = true;
79                quote_char = c;
80                start_idx = i;
81                current_token.push(c);
82                continue;
83            }
84
85            if c.is_whitespace() || c == '(' || c == ')' || c == ',' || c == ';' {
86                if !current_token.is_empty() {
87                    tokens.push((start_idx, current_token.clone()));
88                    current_token.clear();
89                }
90                tokens.push((i, c.to_string()));
91            } else {
92                if current_token.is_empty() {
93                    start_idx = i;
94                }
95                current_token.push(c);
96            }
97        }
98        if !current_token.is_empty() {
99            tokens.push((start_idx, current_token));
100        }
101
102        let mut last_pos = 0;
103        let mut i = 0;
104        while i < tokens.len() {
105            let (pos, ref token) = tokens[i];
106            let upper_token = token.to_uppercase();
107
108            if (upper_token == "FROM" || upper_token == "JOIN") && i + 1 < tokens.len() {
109                // Find next non-whitespace token which could be the view name
110                let mut j = i + 1;
111                while j < tokens.len() && tokens[j].1.chars().all(|c| c.is_whitespace()) {
112                    j += 1;
113                }
114
115                if j < tokens.len() {
116                    let (name_pos, ref name_token) = tokens[j];
117                    let name_lower = name_token.to_lowercase();
118
119                    if let Some(entry) = self.views.get(&name_lower) {
120                        let view_sql = entry.value();
121                        // Copy everything from last_pos to the start of the FROM/JOIN keyword
122                        result.push_str(&sql[last_pos..pos]);
123                        // Add the expanded view
124                        result.push_str(&upper_token);
125                        result.push_str(" (");
126                        result.push_str(view_sql);
127                        result.push_str(") AS ");
128                        result.push_str(&name_lower);
129
130                        // Advance last_pos to the end of the view name token
131                        last_pos = name_pos + name_token.len();
132                        i = j + 1;
133                        continue;
134                    }
135                }
136            }
137            i += 1;
138        }
139
140        result.push_str(&sql[last_pos..]);
141        result
142    }
143
144    /// 등록된 뷰 목록 반환
145    pub fn list_views(&self) -> Vec<String> {
146        self.views.iter().map(|e| e.key().clone()).collect()
147    }
148}
149
150/// Arc로 래핑된 공유 ViewRegistry 타입 별칭
151pub type SharedViewRegistry = Arc<ViewRegistry>;
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156
157    #[test]
158    fn test_create_and_exists() {
159        let reg = ViewRegistry::new();
160        assert!(!reg.exists("active_users"));
161        reg.create(
162            "active_users",
163            "SELECT id, name FROM users WHERE active = true",
164        )
165        .unwrap();
166        assert!(reg.exists("active_users"));
167    }
168
169    #[test]
170    fn test_expand_multiple_views() {
171        let reg = ViewRegistry::new();
172        reg.create("v1", "SELECT 1").unwrap();
173        reg.create("v2", "SELECT 2").unwrap();
174        let sql = "SELECT * FROM v1 JOIN v2 ON v1.x = v2.x";
175        let expanded = reg.expand(sql);
176        assert!(expanded.contains("FROM (SELECT 1) AS v1"));
177        assert!(expanded.contains("JOIN (SELECT 2) AS v2"));
178    }
179
180    #[test]
181    fn test_expand_word_boundaries() {
182        let reg = ViewRegistry::new();
183        reg.create("v", "SELECT 1").unwrap();
184        let sql = "SELECT * FROM v, (SELECT * FROM v) AS sub";
185        let expanded = reg.expand(sql);
186        let count = expanded.matches("(SELECT 1)").count();
187        assert_eq!(count, 2);
188    }
189
190    #[test]
191    fn test_expand_not_a_keyword() {
192        let reg = ViewRegistry::new();
193        reg.create("v", "SELECT 1").unwrap();
194        let sql = "SELECT field_from_v FROM v";
195        let expanded = reg.expand(sql);
196        assert!(expanded.contains("field_from_v"));
197        assert!(expanded.contains("FROM (SELECT 1) AS v"));
198    }
199
200    #[test]
201    fn test_expand_with_non_ascii() {
202        let reg = ViewRegistry::new();
203        reg.create("v", "SELECT 'İ'").unwrap();
204        let sql = "SELECT * FROM v";
205        let expanded = reg.expand(sql);
206        assert!(expanded.contains("(SELECT 'İ') AS v"));
207    }
208
209    #[test]
210    fn test_expand_with_quotes() {
211        let reg = ViewRegistry::new();
212        reg.create("v", "SELECT 1").unwrap();
213        let sql = "SELECT 'FROM v' FROM v";
214        let expanded = reg.expand(sql);
215        assert!(expanded.contains("'FROM v'"));
216        let count = expanded.matches("(SELECT 1)").count();
217        assert_eq!(count, 1);
218    }
219
220    #[test]
221    fn test_expand_with_whitespace() {
222        let reg = ViewRegistry::new();
223        reg.create("v", "SELECT 1").unwrap();
224        let sql = "SELECT * FROM\n v";
225        let expanded = reg.expand(sql);
226        assert!(expanded.contains("FROM (SELECT 1) AS v"));
227    }
228
229    #[test]
230    fn test_create_case_insensitive() {
231        let reg = ViewRegistry::new();
232        reg.create("MyView", "SELECT 1").unwrap();
233        assert!(reg.exists("myview"));
234        assert!(reg.exists("MyView")); // 대소문자 무시
235        assert!(reg.exists("MYVIEW"));
236    }
237
238    #[test]
239    fn test_drop_view() {
240        let reg = ViewRegistry::new();
241        reg.create("v", "SELECT 1 AS x").unwrap();
242        assert!(reg.exists("v"));
243        reg.drop("v").unwrap();
244        assert!(!reg.exists("v"));
245    }
246
247    #[test]
248    fn test_drop_nonexistent_fails() {
249        let reg = ViewRegistry::new();
250        assert!(reg.drop("nonexistent").is_err());
251    }
252
253    #[test]
254    fn test_expand_replaces_from_clause() {
255        let reg = ViewRegistry::new();
256        reg.create(
257            "active_users",
258            "SELECT id, name FROM users WHERE active = true",
259        )
260        .unwrap();
261
262        let sql = "SELECT * FROM active_users";
263        let expanded = reg.expand(sql);
264        assert!(
265            expanded.contains("(SELECT id, name FROM users WHERE active = true)"),
266            "서브쿼리가 삽입되어야 함: {}",
267            expanded
268        );
269        assert!(
270            expanded.contains("AS active_users"),
271            "별칭이 지정되어야 함: {}",
272            expanded
273        );
274    }
275
276    #[test]
277    fn test_expand_no_match() {
278        let reg = ViewRegistry::new();
279        reg.create("v", "SELECT 1").unwrap();
280        let sql = "SELECT * FROM users"; // 'v' 가 아닌 테이블
281        let expanded = reg.expand(sql);
282        assert_eq!(expanded, sql, "치환 없이 원래 SQL 유지");
283    }
284
285    #[test]
286    fn test_list_views() {
287        let reg = ViewRegistry::new();
288        reg.create("v1", "SELECT 1").unwrap();
289        reg.create("v2", "SELECT 2").unwrap();
290        let mut views = reg.list_views();
291        views.sort();
292        assert_eq!(views, vec!["v1", "v2"]);
293    }
294}
295
296// ════════════════════════════════════════════
297// Materialized View Registry (Event-Driven)
298// ════════════════════════════════════════════
299
300use arrow::record_batch::RecordBatch;
301use std::collections::HashSet;
302use std::sync::{Condvar, Mutex, RwLock};
303use std::time::{Duration, Instant};
304
305/// SQL 문에서 FROM / JOIN 절의 테이블명을 간단히 추출합니다.
306///
307/// 완전한 SQL 파서 대신, `FROM <table>` 및 `JOIN <table>` 패턴을 대소문자 무시로 추출합니다.
308/// 서브쿼리 내 테이블은 포함하지 않지만, 대부분의 MV 사용 사례에서 충분합니다.
309fn extract_source_tables(sql: &str) -> Vec<String> {
310    let upper = sql.to_uppercase();
311    let tokens: Vec<&str> = upper.split_whitespace().collect();
312    let original_tokens: Vec<&str> = sql.split_whitespace().collect();
313    let mut tables = Vec::new();
314
315    for (i, token) in tokens.iter().enumerate() {
316        if (*token == "FROM" || *token == "JOIN") && i + 1 < tokens.len() {
317            let table_name = original_tokens[i + 1]
318                .trim_matches(|c: char| c == '(' || c == ')' || c == ',' || c == ';')
319                .to_lowercase();
320            if !table_name.is_empty() && table_name != "select" && table_name != "(" {
321                tables.push(table_name);
322            }
323        }
324    }
325    tables.sort();
326    tables.dedup();
327    tables
328}
329
330/// 구체화된 뷰 항목 (내부 저장 단위)
331struct MatViewEntry {
332    /// 뷰를 정의하는 SQL 쿼리
333    sql: String,
334    /// 사전 계산된 쿼리 결과 캐시 (최초에는 None — stale 상태)
335    cache: Option<Vec<RecordBatch>>,
336    /// 마지막 갱신 시각
337    refreshed_at: Option<Instant>,
338    /// 자동 갱신 주기 (초 단위). None이면 이벤트 기반 갱신만.
339    refresh_interval_secs: Option<u64>,
340    /// MV SQL이 참조하는 소스 테이블 목록 (FROM/JOIN 절에서 추출)
341    source_tables: Vec<String>,
342}
343
344/// 이벤트 기반 Materialized View 알림 채널
345///
346/// DML 발생 시 `notify_change(table)` → dirty set에 추가 → Condvar 깨움
347/// 백그라운드 스레드가 즉시 깨어나 debounce 후 갱신 실행
348pub struct MatViewNotifier {
349    /// 갱신 대기 중인 MV 이름 세트
350    dirty: Mutex<HashSet<String>>,
351    /// 블로킹 대기용 Condvar
352    condvar: Condvar,
353}
354
355impl MatViewNotifier {
356    fn new() -> Self {
357        Self {
358            dirty: Mutex::new(HashSet::new()),
359            condvar: Condvar::new(),
360        }
361    }
362
363    /// dirty set에 MV 이름 추가 + Condvar 깨움
364    fn mark_dirty(&self, mv_name: &str) {
365        let mut dirty = self.dirty.lock().unwrap();
366        dirty.insert(mv_name.to_string());
367        self.condvar.notify_one();
368    }
369
370    /// dirty set이 비어있으면 블로킹 대기, 채워지면 drain하여 반환
371    pub fn wait_and_take(&self) -> HashSet<String> {
372        let mut dirty = self.dirty.lock().unwrap();
373        while dirty.is_empty() {
374            dirty = self.condvar.wait(dirty).unwrap();
375        }
376        dirty.drain().collect()
377    }
378
379    /// non-blocking: 현재 dirty set을 drain하여 반환 (비어있으면 빈 세트)
380    pub fn take(&self) -> HashSet<String> {
381        let mut dirty = self.dirty.lock().unwrap();
382        dirty.drain().collect()
383    }
384}
385
386/// 구체화된 뷰 레지스트리 (이벤트 기반 갱신)
387///
388/// CREATE MATERIALIZED VIEW / REFRESH / DROP 명령어를 처리합니다.
389/// 각 뷰는 등록된 SQL과 사전 계산된 Arrow RecordBatch 캐시를 가집니다.
390///
391/// DML 발생 시 `notify_change(table)` 호출로 해당 테이블을 참조하는 MV를 즉시 갱신합니다.
392/// `min_refresh_interval_ms`로 갱신 폭풍을 방지합니다 (debounce).
393pub struct MaterializedViewRegistry {
394    views: DashMap<String, RwLock<MatViewEntry>>,
395    /// 이벤트 알림 채널
396    notifier: MatViewNotifier,
397    /// 최소 갱신 주기 (밀리초 단위, 기본 1000ms). debounce용.
398    min_refresh_interval_ms: std::sync::atomic::AtomicU64,
399}
400
401impl Default for MaterializedViewRegistry {
402    fn default() -> Self {
403        Self {
404            views: DashMap::new(),
405            notifier: MatViewNotifier::new(),
406            min_refresh_interval_ms: std::sync::atomic::AtomicU64::new(1000),
407        }
408    }
409}
410
411impl MaterializedViewRegistry {
412    pub fn new() -> Self {
413        Self::default()
414    }
415
416    /// 최소 갱신 주기를 밀리초 단위로 설정합니다.
417    ///
418    /// 이 값은 DML 이벤트 발생 후 실제 MV 갱신까지의 최소 대기 시간입니다.
419    /// 빈번한 쓰기 시 갱신 폭풍을 방지하는 debounce 역할을 합니다.
420    ///
421    /// # 예시
422    ///
423    /// ```rust,ignore
424    /// registry.set_min_refresh_interval_ms(500); // 0.5초 debounce
425    /// ```
426    pub fn set_min_refresh_interval_ms(&self, ms: u64) {
427        self.min_refresh_interval_ms
428            .store(ms, std::sync::atomic::Ordering::Relaxed);
429    }
430
431    /// 현재 설정된 최소 갱신 주기 (밀리초)
432    pub fn min_refresh_interval_ms(&self) -> u64 {
433        self.min_refresh_interval_ms
434            .load(std::sync::atomic::Ordering::Relaxed)
435    }
436
437    /// 최소 갱신 주기를 Duration으로 반환
438    pub fn min_refresh_interval(&self) -> Duration {
439        Duration::from_millis(self.min_refresh_interval_ms())
440    }
441
442    /// 구체화된 뷰 등록 (최초에는 stale — cache 없음)
443    ///
444    /// SQL에서 FROM/JOIN 절을 파싱하여 source_tables를 자동 추출합니다.
445    pub fn create(
446        &self,
447        name: &str,
448        sql: &str,
449        refresh_interval_secs: Option<u64>,
450    ) -> DbxResult<()> {
451        let source_tables = extract_source_tables(sql);
452        self.views.insert(
453            name.to_lowercase(),
454            RwLock::new(MatViewEntry {
455                sql: sql.to_string(),
456                cache: None,
457                refreshed_at: None,
458                refresh_interval_secs,
459                source_tables,
460            }),
461        );
462        Ok(())
463    }
464
465    /// 캐시 저장 (REFRESH MATERIALIZED VIEW 호출 후)
466    pub fn set_cache(&self, name: &str, batches: Vec<RecordBatch>) -> DbxResult<()> {
467        let entry = self.views.get(&name.to_lowercase()).ok_or_else(|| {
468            DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
469        })?;
470        let mut e = entry.write().unwrap();
471        e.cache = Some(batches);
472        e.refreshed_at = Some(Instant::now());
473        Ok(())
474    }
475
476    /// 캐시가 유효한지 확인
477    ///
478    /// - 한 번도 갱신되지 않았으면 false (stale)
479    /// - 수동 갱신 전용(interval 없음): 갱신된 적 있으면 true
480    /// - interval이 있으면: 마지막 갱신으로부터 interval 초 미만이면 true
481    pub fn is_fresh(&self, name: &str) -> bool {
482        let entry = match self.views.get(&name.to_lowercase()) {
483            Some(e) => e,
484            None => return false,
485        };
486        let e = entry.read().unwrap();
487        match (e.refreshed_at, e.refresh_interval_secs) {
488            (None, _) => false,
489            (Some(_), None) => true,
490            (Some(t), Some(secs)) => t.elapsed().as_secs() < secs,
491        }
492    }
493
494    /// 캐시 읽기 (SELECT 캐시 히트 시 사용)
495    pub fn get_cache(&self, name: &str) -> Option<Vec<RecordBatch>> {
496        let entry = self.views.get(&name.to_lowercase())?;
497        entry.read().unwrap().cache.clone()
498    }
499
500    /// 뷰 SQL 읽기 (REFRESH 시 재실행 대상)
501    pub fn get_sql(&self, name: &str) -> Option<String> {
502        Some(
503            self.views
504                .get(&name.to_lowercase())?
505                .read()
506                .unwrap()
507                .sql
508                .clone(),
509        )
510    }
511
512    /// 등록된 구체화된 뷰 이름 목록
513    pub fn list(&self) -> Vec<String> {
514        self.views.iter().map(|e| e.key().clone()).collect()
515    }
516
517    /// 구체화된 뷰 삭제 (remove: Rust의 Drop 트레이트와 이름 충돌 방지)
518    pub fn remove(&self, name: &str) -> DbxResult<()> {
519        self.views
520            .remove(&name.to_lowercase())
521            .map(|_| ())
522            .ok_or_else(|| {
523                DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
524            })
525    }
526
527    // ════════════════════════════════════════════
528    // Event-Driven Notification API
529    // ════════════════════════════════════════════
530
531    /// DML(INSERT/UPDATE/DELETE) 발생 시 호출.
532    ///
533    /// 변경된 테이블을 참조하는 모든 MV를 dirty로 마킹하고
534    /// 백그라운드 갱신 스레드를 즉시 깨웁니다.
535    ///
536    /// 이 메서드는 멱등적이며, 빈번하게 호출해도 안전합니다.
537    pub fn notify_change(&self, table: &str) {
538        if self.views.is_empty() {
539            return; // MV 없으면 즉시 반환 (zero overhead)
540        }
541        let table_lower = table.to_lowercase();
542        // shard 접미사 제거 (예: "users__shard_0" → "users")
543        let base_table = if let Some(idx) = table_lower.find("__shard_") {
544            &table_lower[..idx]
545        } else {
546            &table_lower
547        };
548        for entry in self.views.iter() {
549            let mv_name = entry.key();
550            let e = entry.value().read().unwrap();
551            if e.source_tables.iter().any(|t| t == base_table) {
552                drop(e); // lock 해제 후 알림
553                self.notifier.mark_dirty(mv_name);
554            }
555        }
556    }
557
558    /// 백그라운드 스레드용: dirty MV가 생길 때까지 블로킹 대기 후 drain
559    pub fn wait_and_take_dirty(&self) -> HashSet<String> {
560        self.notifier.wait_and_take()
561    }
562
563    /// 백그라운드 스레드용: 현재 dirty set을 non-blocking으로 drain
564    pub fn take_dirty(&self) -> HashSet<String> {
565        self.notifier.take()
566    }
567
568    /// 등록된 MV 중 stale(갱신 필요)인 것들의 이름 반환 (폴링 호환)
569    pub fn stale_views(&self) -> Vec<String> {
570        self.views
571            .iter()
572            .filter(|e| {
573                let entry = e.value().read().unwrap();
574                match (entry.refreshed_at, entry.refresh_interval_secs) {
575                    (None, _) => true,
576                    (Some(_), None) => false,
577                    (Some(t), Some(secs)) => t.elapsed().as_secs() >= secs,
578                }
579            })
580            .map(|e| e.key().clone())
581            .collect()
582    }
583}
584
585/// Arc로 래핑된 공유 MaterializedViewRegistry 타입 별칭
586pub type SharedMaterializedViewRegistry = Arc<MaterializedViewRegistry>;
587
588#[cfg(test)]
589mod matview_tests {
590    use super::*;
591    use arrow::array::Int64Array;
592    use arrow::datatypes::{DataType, Field, Schema};
593
594    fn make_batch(n: i64) -> RecordBatch {
595        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
596        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![n]))]).unwrap()
597    }
598
599    #[test]
600    fn test_materialized_view_cache() {
601        let reg = MaterializedViewRegistry::new();
602        reg.create("mv_users", "SELECT id FROM users", None)
603            .unwrap();
604        assert!(!reg.is_fresh("mv_users")); // 생성 직후 stale
605
606        reg.set_cache("mv_users", vec![make_batch(1), make_batch(2)])
607            .unwrap();
608        assert!(reg.is_fresh("mv_users")); // 갱신 후 fresh
609
610        let cached = reg.get_cache("mv_users").unwrap();
611        assert_eq!(cached.len(), 2);
612    }
613
614    #[test]
615    fn test_matview_drop() {
616        let reg = MaterializedViewRegistry::new();
617        reg.create("mv_test", "SELECT 1", None).unwrap();
618        assert!(reg.get_sql("mv_test").is_some());
619        reg.remove("mv_test").unwrap();
620        assert!(reg.get_sql("mv_test").is_none());
621    }
622
623    #[test]
624    fn test_matview_drop_nonexistent_fails() {
625        let reg = MaterializedViewRegistry::new();
626        assert!(reg.remove("nonexistent").is_err());
627    }
628
629    #[test]
630    fn test_matview_with_interval() {
631        let reg = MaterializedViewRegistry::new();
632        // 300초 갱신 주기
633        reg.create("mv_sales", "SELECT * FROM sales", Some(300))
634            .unwrap();
635        assert!(!reg.is_fresh("mv_sales")); // 아직 갱신 안 됨
636
637        reg.set_cache("mv_sales", vec![make_batch(42)]).unwrap();
638        assert!(reg.is_fresh("mv_sales")); // 방금 갱신 — 300초 미만이므로 fresh
639
640        let cached = reg.get_cache("mv_sales").unwrap();
641        assert_eq!(cached[0].num_rows(), 1);
642    }
643
644    #[test]
645    fn test_matview_list() {
646        let reg = MaterializedViewRegistry::new();
647        reg.create("mv_a", "SELECT 1", None).unwrap();
648        reg.create("mv_b", "SELECT 2", None).unwrap();
649        let mut names = reg.list();
650        names.sort();
651        assert_eq!(names, vec!["mv_a", "mv_b"]);
652    }
653
654    #[test]
655    fn test_extract_source_tables() {
656        assert_eq!(
657            extract_source_tables("SELECT id, name FROM users WHERE active = true"),
658            vec!["users"]
659        );
660        assert_eq!(
661            extract_source_tables("SELECT * FROM orders JOIN users ON orders.uid = users.id"),
662            vec!["orders", "users"]
663        );
664        assert_eq!(
665            extract_source_tables("SELECT AVG(price) FROM products"),
666            vec!["products"]
667        );
668        // duplicate elimination
669        assert_eq!(
670            extract_source_tables("SELECT * FROM t1 JOIN t1 ON t1.a = t1.b"),
671            vec!["t1"]
672        );
673    }
674
675    #[test]
676    fn test_notify_change_marks_dirty() {
677        let reg = MaterializedViewRegistry::new();
678        reg.create("mv_users", "SELECT id FROM users", None)
679            .unwrap();
680        reg.create("mv_orders", "SELECT * FROM orders", None)
681            .unwrap();
682
683        // users 변경 → mv_users만 dirty
684        reg.notify_change("users");
685        let dirty = reg.take_dirty();
686        assert!(dirty.contains("mv_users"));
687        assert!(!dirty.contains("mv_orders"));
688
689        // orders 변경 → mv_orders만 dirty
690        reg.notify_change("orders");
691        let dirty = reg.take_dirty();
692        assert!(dirty.contains("mv_orders"));
693        assert!(!dirty.contains("mv_users"));
694    }
695
696    #[test]
697    fn test_notify_change_shard_table() {
698        let reg = MaterializedViewRegistry::new();
699        reg.create("mv_users", "SELECT id FROM users", None)
700            .unwrap();
701
702        // shard 테이블 변경도 인식
703        reg.notify_change("users__shard_0");
704        let dirty = reg.take_dirty();
705        assert!(dirty.contains("mv_users"));
706    }
707
708    #[test]
709    fn test_configurable_min_refresh_interval() {
710        let reg = MaterializedViewRegistry::new();
711        assert_eq!(reg.min_refresh_interval_ms(), 1000); // 기본 1초
712
713        reg.set_min_refresh_interval_ms(500);
714        assert_eq!(reg.min_refresh_interval_ms(), 500);
715        assert_eq!(reg.min_refresh_interval(), Duration::from_millis(500));
716    }
717
718    #[test]
719    fn test_notify_no_views_is_noop() {
720        let reg = MaterializedViewRegistry::new();
721        // MV 없을 때 호출해도 패닉 없음
722        reg.notify_change("some_table");
723        let dirty = reg.take_dirty();
724        assert!(dirty.is_empty());
725    }
726}