1use dashmap::DashMap;
7use std::sync::Arc;
8
9use crate::error::{DbxError, DbxResult};
10
11#[derive(Debug, Default)]
15pub struct ViewRegistry {
16 views: DashMap<String, String>,
18}
19
20impl ViewRegistry {
21 pub fn new() -> Self {
23 Self::default()
24 }
25
26 pub fn create(&self, name: &str, sql: &str) -> DbxResult<()> {
28 self.views.insert(name.to_lowercase(), sql.to_string());
29 Ok(())
30 }
31
32 pub fn drop(&self, name: &str) -> DbxResult<()> {
34 self.views
35 .remove(&name.to_lowercase())
36 .map(|_| ())
37 .ok_or_else(|| DbxError::InvalidArguments(format!("뷰 '{}' 를 찾을 수 없음", name)))
38 }
39
40 pub fn exists(&self, name: &str) -> bool {
42 self.views.contains_key(&name.to_lowercase())
43 }
44
45 pub fn expand(&self, sql: &str) -> String {
50 if self.views.is_empty() {
51 return sql.to_string();
52 }
53
54 let mut result = String::with_capacity(sql.len() * 2);
55 let mut tokens = Vec::new();
56 let mut current_token = String::new();
57 let mut start_idx = 0;
58 let mut in_quotes = false;
59 let mut quote_char = ' ';
60
61 for (i, c) in sql.char_indices() {
63 if in_quotes {
64 current_token.push(c);
65 if c == quote_char {
66 in_quotes = false;
67 tokens.push((start_idx, current_token.clone()));
68 current_token.clear();
69 }
70 continue;
71 }
72
73 if c == '\'' || c == '"' {
74 if !current_token.is_empty() {
75 tokens.push((start_idx, current_token.clone()));
76 current_token.clear();
77 }
78 in_quotes = true;
79 quote_char = c;
80 start_idx = i;
81 current_token.push(c);
82 continue;
83 }
84
85 if c.is_whitespace() || c == '(' || c == ')' || c == ',' || c == ';' {
86 if !current_token.is_empty() {
87 tokens.push((start_idx, current_token.clone()));
88 current_token.clear();
89 }
90 tokens.push((i, c.to_string()));
91 } else {
92 if current_token.is_empty() {
93 start_idx = i;
94 }
95 current_token.push(c);
96 }
97 }
98 if !current_token.is_empty() {
99 tokens.push((start_idx, current_token));
100 }
101
102 let mut last_pos = 0;
103 let mut i = 0;
104 while i < tokens.len() {
105 let (pos, ref token) = tokens[i];
106 let upper_token = token.to_uppercase();
107
108 if (upper_token == "FROM" || upper_token == "JOIN") && i + 1 < tokens.len() {
109 let mut j = i + 1;
111 while j < tokens.len() && tokens[j].1.chars().all(|c| c.is_whitespace()) {
112 j += 1;
113 }
114
115 if j < tokens.len() {
116 let (name_pos, ref name_token) = tokens[j];
117 let name_lower = name_token.to_lowercase();
118
119 if let Some(entry) = self.views.get(&name_lower) {
120 let view_sql = entry.value();
121 result.push_str(&sql[last_pos..pos]);
123 result.push_str(&upper_token);
125 result.push_str(" (");
126 result.push_str(view_sql);
127 result.push_str(") AS ");
128 result.push_str(&name_lower);
129
130 last_pos = name_pos + name_token.len();
132 i = j + 1;
133 continue;
134 }
135 }
136 }
137 i += 1;
138 }
139
140 result.push_str(&sql[last_pos..]);
141 result
142 }
143
144 pub fn list_views(&self) -> Vec<String> {
146 self.views.iter().map(|e| e.key().clone()).collect()
147 }
148}
149
/// Reference-counted handle for sharing one [`ViewRegistry`] between owners.
pub type SharedViewRegistry = Arc<ViewRegistry>;
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a registry pre-populated with the given `(name, sql)` view definitions.
    fn registry_with(defs: &[(&str, &str)]) -> ViewRegistry {
        let reg = ViewRegistry::new();
        for &(name, sql) in defs {
            reg.create(name, sql).unwrap();
        }
        reg
    }

    #[test]
    fn test_create_and_exists() {
        let reg = ViewRegistry::new();
        assert!(!reg.exists("active_users"));
        reg.create("active_users", "SELECT id, name FROM users WHERE active = true")
            .unwrap();
        assert!(reg.exists("active_users"));
    }

    #[test]
    fn test_expand_multiple_views() {
        let reg = registry_with(&[("v1", "SELECT 1"), ("v2", "SELECT 2")]);
        let expanded = reg.expand("SELECT * FROM v1 JOIN v2 ON v1.x = v2.x");
        assert!(expanded.contains("FROM (SELECT 1) AS v1"));
        assert!(expanded.contains("JOIN (SELECT 2) AS v2"));
    }

    #[test]
    fn test_expand_word_boundaries() {
        let reg = registry_with(&[("v", "SELECT 1")]);
        let expanded = reg.expand("SELECT * FROM v, (SELECT * FROM v) AS sub");
        assert_eq!(expanded.matches("(SELECT 1)").count(), 2);
    }

    #[test]
    fn test_expand_not_a_keyword() {
        let reg = registry_with(&[("v", "SELECT 1")]);
        let expanded = reg.expand("SELECT field_from_v FROM v");
        assert!(expanded.contains("field_from_v"));
        assert!(expanded.contains("FROM (SELECT 1) AS v"));
    }

    #[test]
    fn test_expand_with_non_ascii() {
        let reg = registry_with(&[("v", "SELECT 'İ'")]);
        let expanded = reg.expand("SELECT * FROM v");
        assert!(expanded.contains("(SELECT 'İ') AS v"));
    }

    #[test]
    fn test_expand_with_quotes() {
        let reg = registry_with(&[("v", "SELECT 1")]);
        let expanded = reg.expand("SELECT 'FROM v' FROM v");
        assert!(expanded.contains("'FROM v'"));
        assert_eq!(expanded.matches("(SELECT 1)").count(), 1);
    }

    #[test]
    fn test_expand_with_whitespace() {
        let reg = registry_with(&[("v", "SELECT 1")]);
        let expanded = reg.expand("SELECT * FROM\n v");
        assert!(expanded.contains("FROM (SELECT 1) AS v"));
    }

    #[test]
    fn test_create_case_insensitive() {
        let reg = registry_with(&[("MyView", "SELECT 1")]);
        for spelling in ["myview", "MyView", "MYVIEW"] {
            assert!(reg.exists(spelling));
        }
    }

    #[test]
    fn test_drop_view() {
        let reg = registry_with(&[("v", "SELECT 1 AS x")]);
        assert!(reg.exists("v"));
        reg.drop("v").unwrap();
        assert!(!reg.exists("v"));
    }

    #[test]
    fn test_drop_nonexistent_fails() {
        let reg = ViewRegistry::new();
        assert!(reg.drop("nonexistent").is_err());
    }

    #[test]
    fn test_expand_replaces_from_clause() {
        let reg = registry_with(&[(
            "active_users",
            "SELECT id, name FROM users WHERE active = true",
        )]);
        let expanded = reg.expand("SELECT * FROM active_users");
        assert!(
            expanded.contains("(SELECT id, name FROM users WHERE active = true)"),
            "서브쿼리가 삽입되어야 함: {}",
            expanded
        );
        assert!(
            expanded.contains("AS active_users"),
            "별칭이 지정되어야 함: {}",
            expanded
        );
    }

    #[test]
    fn test_expand_no_match() {
        let reg = registry_with(&[("v", "SELECT 1")]);
        let sql = "SELECT * FROM users";
        let expanded = reg.expand(sql);
        assert_eq!(expanded, sql, "치환 없이 원래 SQL 유지");
    }

    #[test]
    fn test_list_views() {
        let reg = registry_with(&[("v1", "SELECT 1"), ("v2", "SELECT 2")]);
        let mut views = reg.list_views();
        views.sort();
        assert_eq!(views, vec!["v1", "v2"]);
    }
}
295
296use arrow::record_batch::RecordBatch;
301use std::collections::HashSet;
302use std::sync::{Condvar, Mutex, RwLock};
303use std::time::{Duration, Instant};
304
/// Extracts the names of the tables a SQL query reads from.
///
/// Collects every table reference that follows `FROM` or `JOIN`, including:
/// - each entry of a comma-separated `FROM a, b` list, with or without spaces or
///   aliases;
/// - tables inside parenthesised subqueries.
///
/// Names are lower-cased, and double-quoted identifiers lose their quotes. The result
/// is sorted and de-duplicated.
///
/// This is a lexical scan, not a SQL parser. Single-quoted string literals and `--` /
/// `/* */` comments are skipped, so keywords inside them are ignored. Other words that
/// happen to follow `FROM` (e.g. the operand of `EXTRACT(YEAR FROM col)`) may still be
/// reported. Over-reporting only triggers an extra refresh notification, which is
/// preferable to missing a real source table.
fn extract_source_tables(sql: &str) -> Vec<String> {
    /// A lexical unit relevant to locating table references.
    enum Tok<'a> {
        /// Unquoted word: keyword, identifier, qualified name, number, operator run.
        Word(&'a str),
        /// Double-quoted identifier, quotes removed.
        Quoted(&'a str),
        /// Single-quoted string literal; never a table name.
        Literal,
        /// One of `(`, `)`, `,`, `;`.
        Punct(char),
    }

    /// Words that start the next clause after a table reference, so they are never an
    /// implicit alias.
    const CLAUSE_KEYWORDS: &[&str] = &[
        "WHERE", "GROUP", "ORDER", "HAVING", "LIMIT", "OFFSET", "FETCH", "UNION",
        "INTERSECT", "EXCEPT", "MINUS", "WINDOW", "QUALIFY", "ON", "USING", "JOIN",
        "INNER", "LEFT", "RIGHT", "FULL", "OUTER", "CROSS", "NATURAL", "ANTI", "SEMI",
        "ASOF", "POSITIONAL", "LATERAL", "TABLESAMPLE", "PIVOT", "UNPIVOT", "FOR",
        "RETURNING", "SELECT", "FROM", "INTO", "VALUES", "WITH",
    ];

    /// ASCII case-insensitive membership test against `CLAUSE_KEYWORDS`.
    fn is_clause_keyword(word: &str) -> bool {
        CLAUSE_KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw))
    }

    /// A bare word directly after `FROM`/`JOIN` names a table unless it is the start of a
    /// subquery (`SELECT`) or a number.
    fn is_table_name(word: &str) -> bool {
        !word.eq_ignore_ascii_case("SELECT") && !word.starts_with(|c: char| c.is_ascii_digit())
    }

    /// Lexes `sql`, dropping whitespace and comments.
    fn tokenize(sql: &str) -> Vec<Tok<'_>> {
        let mut tokens = Vec::new();
        let mut word_start: Option<usize> = None;
        let mut pos = 0;
        while let Some(c) = sql[pos..].chars().next() {
            let rest = &sql[pos..];
            // `Some((token, byte_len))` for anything that terminates a bare word; `token`
            // is `None` for whitespace and comments, which are discarded.
            let delimited = if c == '\'' || c == '"' {
                // An unterminated quote runs to end of input.
                let (inner_end, len) = match rest[1..].find(c) {
                    Some(p) => (p + 1, p + 2),
                    None => (rest.len(), rest.len()),
                };
                let tok = if c == '"' {
                    Tok::Quoted(&rest[1..inner_end])
                } else {
                    Tok::Literal
                };
                Some((Some(tok), len))
            } else if rest.starts_with("--") {
                Some((None, rest.find('\n').unwrap_or(rest.len())))
            } else if rest.starts_with("/*") {
                Some((None, rest[2..].find("*/").map_or(rest.len(), |p| p + 4)))
            } else if c.is_whitespace() {
                Some((None, c.len_utf8()))
            } else if matches!(c, '(' | ')' | ',' | ';') {
                Some((Some(Tok::Punct(c)), 1))
            } else {
                None
            };
            match delimited {
                Some((tok, len)) => {
                    if let Some(start) = word_start.take() {
                        tokens.push(Tok::Word(&sql[start..pos]));
                    }
                    tokens.extend(tok);
                    pos += len;
                }
                None => {
                    if word_start.is_none() {
                        word_start = Some(pos);
                    }
                    pos += c.len_utf8();
                }
            }
        }
        if let Some(start) = word_start {
            tokens.push(Tok::Word(&sql[start..]));
        }
        tokens
    }

    /// Given `tokens[open]` == `(`, returns the index just past its matching `)`, or
    /// `tokens.len()` when it is never closed.
    fn skip_parens(tokens: &[Tok<'_>], open: usize) -> usize {
        let mut depth = 0usize;
        for (k, tok) in tokens.iter().enumerate().skip(open) {
            match tok {
                Tok::Punct('(') => depth += 1,
                Tok::Punct(')') => {
                    depth = depth.saturating_sub(1);
                    if depth == 0 {
                        return k + 1;
                    }
                }
                _ => {}
            }
        }
        tokens.len()
    }

    /// Skips an optional table alias starting at `k` and returns the index after it.
    /// Accepted forms are `AS x`, `x` or `"x"`, each optionally followed by a
    /// parenthesised column list.
    fn skip_alias(tokens: &[Tok<'_>], mut k: usize) -> usize {
        let explicit =
            matches!(tokens.get(k), Some(Tok::Word(w)) if w.eq_ignore_ascii_case("AS"));
        if explicit {
            k += 1;
        }
        match tokens.get(k) {
            Some(Tok::Quoted(_)) => k += 1,
            Some(Tok::Word(w)) if explicit || !is_clause_keyword(w) => k += 1,
            _ => return k,
        }
        if matches!(tokens.get(k), Some(Tok::Punct('('))) {
            k = skip_parens(tokens, k);
        }
        k
    }

    let tokens = tokenize(sql);
    let mut tables: Vec<String> = Vec::new();
    for (i, tok) in tokens.iter().enumerate() {
        let is_from = match tok {
            Tok::Word(w) if w.eq_ignore_ascii_case("FROM") => true,
            Tok::Word(w) if w.eq_ignore_ascii_case("JOIN") => false,
            _ => continue,
        };
        let mut k = i + 1;
        loop {
            match tokens.get(k) {
                Some(Tok::Word(w)) if is_table_name(w) => {
                    tables.push(w.to_lowercase());
                    k += 1;
                    // Skip the argument list of a table function such as `read_csv('a.csv')`.
                    if matches!(tokens.get(k), Some(Tok::Punct('('))) {
                        k = skip_parens(&tokens, k);
                    }
                }
                Some(Tok::Quoted(name)) if !name.is_empty() => {
                    tables.push(name.to_lowercase());
                    k += 1;
                }
                // Derived table: its inner FROM/JOIN is visited by the outer loop.
                Some(Tok::Punct('(')) => k = skip_parens(&tokens, k),
                _ => break,
            }
            // Only a FROM clause can list further tables after a comma.
            if !is_from {
                break;
            }
            k = skip_alias(&tokens, k);
            match tokens.get(k) {
                Some(Tok::Punct(',')) => k += 1,
                _ => break,
            }
        }
    }
    tables.sort();
    tables.dedup();
    tables
}
329
330struct MatViewEntry {
332 sql: String,
334 cache: Option<Vec<RecordBatch>>,
336 refreshed_at: Option<Instant>,
338 refresh_interval_secs: Option<u64>,
340 source_tables: Vec<String>,
342}
343
/// Hand-off point between writers that invalidate materialized views and the thread(s)
/// that refresh them.
///
/// Names are kept in a set, so repeated notifications for the same view coalesce until
/// a consumer drains them. A consumer can block (`wait_and_take`) or poll (`take`).
pub struct MatViewNotifier {
    /// Names of views flagged since the last drain.
    dirty: Mutex<HashSet<String>>,
    /// Signalled each time a name is added to `dirty`.
    condvar: Condvar,
}

impl MatViewNotifier {
    /// Creates a notifier with nothing pending.
    fn new() -> Self {
        MatViewNotifier {
            condvar: Condvar::new(),
            dirty: Mutex::new(HashSet::new()),
        }
    }

    /// Flags `mv_name` as needing a refresh and wakes one blocked consumer.
    fn mark_dirty(&self, mv_name: &str) {
        let mut pending = self.dirty.lock().unwrap();
        pending.insert(mv_name.to_owned());
        // Notify while still holding the lock, matching the insertion above.
        self.condvar.notify_one();
    }

    /// Blocks until at least one view is flagged, then removes and returns every
    /// flagged name.
    pub fn wait_and_take(&self) -> HashSet<String> {
        let guard = self.dirty.lock().unwrap();
        // `wait_while` re-checks the predicate after every (possibly spurious) wake-up.
        let mut pending = self
            .condvar
            .wait_while(guard, |names| names.is_empty())
            .unwrap();
        pending.drain().collect()
    }

    /// Removes and returns every flagged name without blocking; may be empty.
    pub fn take(&self) -> HashSet<String> {
        self.dirty.lock().unwrap().drain().collect()
    }
}
385
386pub struct MaterializedViewRegistry {
394 views: DashMap<String, RwLock<MatViewEntry>>,
395 notifier: MatViewNotifier,
397 min_refresh_interval_ms: std::sync::atomic::AtomicU64,
399}
400
401impl Default for MaterializedViewRegistry {
402 fn default() -> Self {
403 Self {
404 views: DashMap::new(),
405 notifier: MatViewNotifier::new(),
406 min_refresh_interval_ms: std::sync::atomic::AtomicU64::new(1000),
407 }
408 }
409}
410
411impl MaterializedViewRegistry {
412 pub fn new() -> Self {
413 Self::default()
414 }
415
416 pub fn set_min_refresh_interval_ms(&self, ms: u64) {
427 self.min_refresh_interval_ms
428 .store(ms, std::sync::atomic::Ordering::Relaxed);
429 }
430
431 pub fn min_refresh_interval_ms(&self) -> u64 {
433 self.min_refresh_interval_ms
434 .load(std::sync::atomic::Ordering::Relaxed)
435 }
436
437 pub fn min_refresh_interval(&self) -> Duration {
439 Duration::from_millis(self.min_refresh_interval_ms())
440 }
441
442 pub fn create(
446 &self,
447 name: &str,
448 sql: &str,
449 refresh_interval_secs: Option<u64>,
450 ) -> DbxResult<()> {
451 let source_tables = extract_source_tables(sql);
452 self.views.insert(
453 name.to_lowercase(),
454 RwLock::new(MatViewEntry {
455 sql: sql.to_string(),
456 cache: None,
457 refreshed_at: None,
458 refresh_interval_secs,
459 source_tables,
460 }),
461 );
462 Ok(())
463 }
464
465 pub fn set_cache(&self, name: &str, batches: Vec<RecordBatch>) -> DbxResult<()> {
467 let entry = self.views.get(&name.to_lowercase()).ok_or_else(|| {
468 DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
469 })?;
470 let mut e = entry.write().unwrap();
471 e.cache = Some(batches);
472 e.refreshed_at = Some(Instant::now());
473 Ok(())
474 }
475
476 pub fn is_fresh(&self, name: &str) -> bool {
482 let entry = match self.views.get(&name.to_lowercase()) {
483 Some(e) => e,
484 None => return false,
485 };
486 let e = entry.read().unwrap();
487 match (e.refreshed_at, e.refresh_interval_secs) {
488 (None, _) => false,
489 (Some(_), None) => true,
490 (Some(t), Some(secs)) => t.elapsed().as_secs() < secs,
491 }
492 }
493
494 pub fn get_cache(&self, name: &str) -> Option<Vec<RecordBatch>> {
496 let entry = self.views.get(&name.to_lowercase())?;
497 entry.read().unwrap().cache.clone()
498 }
499
500 pub fn get_sql(&self, name: &str) -> Option<String> {
502 Some(
503 self.views
504 .get(&name.to_lowercase())?
505 .read()
506 .unwrap()
507 .sql
508 .clone(),
509 )
510 }
511
512 pub fn list(&self) -> Vec<String> {
514 self.views.iter().map(|e| e.key().clone()).collect()
515 }
516
517 pub fn remove(&self, name: &str) -> DbxResult<()> {
519 self.views
520 .remove(&name.to_lowercase())
521 .map(|_| ())
522 .ok_or_else(|| {
523 DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
524 })
525 }
526
527 pub fn notify_change(&self, table: &str) {
538 if self.views.is_empty() {
539 return; }
541 let table_lower = table.to_lowercase();
542 let base_table = if let Some(idx) = table_lower.find("__shard_") {
544 &table_lower[..idx]
545 } else {
546 &table_lower
547 };
548 for entry in self.views.iter() {
549 let mv_name = entry.key();
550 let e = entry.value().read().unwrap();
551 if e.source_tables.iter().any(|t| t == base_table) {
552 drop(e); self.notifier.mark_dirty(mv_name);
554 }
555 }
556 }
557
558 pub fn wait_and_take_dirty(&self) -> HashSet<String> {
560 self.notifier.wait_and_take()
561 }
562
563 pub fn take_dirty(&self) -> HashSet<String> {
565 self.notifier.take()
566 }
567
568 pub fn stale_views(&self) -> Vec<String> {
570 self.views
571 .iter()
572 .filter(|e| {
573 let entry = e.value().read().unwrap();
574 match (entry.refreshed_at, entry.refresh_interval_secs) {
575 (None, _) => true,
576 (Some(_), None) => false,
577 (Some(t), Some(secs)) => t.elapsed().as_secs() >= secs,
578 }
579 })
580 .map(|e| e.key().clone())
581 .collect()
582 }
583}
584
/// Reference-counted handle for sharing one [`MaterializedViewRegistry`] between owners.
pub type SharedMaterializedViewRegistry = Arc<MaterializedViewRegistry>;
587
#[cfg(test)]
mod matview_tests {
    use super::*;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Builds a one-row batch with a single non-nullable `x: Int64` column holding `n`.
    fn make_batch(n: i64) -> RecordBatch {
        let field = Field::new("x", DataType::Int64, false);
        let schema = Arc::new(Schema::new(vec![field]));
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![n]))]).unwrap()
    }

    #[test]
    fn test_materialized_view_cache() {
        let reg = MaterializedViewRegistry::new();
        reg.create("mv_users", "SELECT id FROM users", None).unwrap();
        assert!(!reg.is_fresh("mv_users"));

        reg.set_cache("mv_users", vec![make_batch(1), make_batch(2)])
            .unwrap();
        assert!(reg.is_fresh("mv_users"));
        assert_eq!(reg.get_cache("mv_users").unwrap().len(), 2);
    }

    #[test]
    fn test_matview_drop() {
        let reg = MaterializedViewRegistry::new();
        reg.create("mv_test", "SELECT 1", None).unwrap();
        assert!(reg.get_sql("mv_test").is_some());
        reg.remove("mv_test").unwrap();
        assert!(reg.get_sql("mv_test").is_none());
    }

    #[test]
    fn test_matview_drop_nonexistent_fails() {
        let reg = MaterializedViewRegistry::new();
        assert!(reg.remove("nonexistent").is_err());
    }

    #[test]
    fn test_matview_with_interval() {
        let reg = MaterializedViewRegistry::new();
        reg.create("mv_sales", "SELECT * FROM sales", Some(300))
            .unwrap();
        assert!(!reg.is_fresh("mv_sales"));

        reg.set_cache("mv_sales", vec![make_batch(42)]).unwrap();
        assert!(reg.is_fresh("mv_sales"));
        let cached = reg.get_cache("mv_sales").unwrap();
        assert_eq!(cached[0].num_rows(), 1);
    }

    #[test]
    fn test_matview_list() {
        let reg = MaterializedViewRegistry::new();
        for (name, sql) in [("mv_a", "SELECT 1"), ("mv_b", "SELECT 2")] {
            reg.create(name, sql, None).unwrap();
        }
        let mut names = reg.list();
        names.sort();
        assert_eq!(names, vec!["mv_a", "mv_b"]);
    }

    #[test]
    fn test_extract_source_tables() {
        let cases: [(&str, Vec<&str>); 4] = [
            ("SELECT id, name FROM users WHERE active = true", vec!["users"]),
            (
                "SELECT * FROM orders JOIN users ON orders.uid = users.id",
                vec!["orders", "users"],
            ),
            ("SELECT AVG(price) FROM products", vec!["products"]),
            ("SELECT * FROM t1 JOIN t1 ON t1.a = t1.b", vec!["t1"]),
        ];
        for (sql, expected) in cases {
            assert_eq!(extract_source_tables(sql), expected);
        }
    }

    #[test]
    fn test_notify_change_marks_dirty() {
        let reg = MaterializedViewRegistry::new();
        reg.create("mv_users", "SELECT id FROM users", None).unwrap();
        reg.create("mv_orders", "SELECT * FROM orders", None).unwrap();

        reg.notify_change("users");
        let dirty = reg.take_dirty();
        assert!(dirty.contains("mv_users"));
        assert!(!dirty.contains("mv_orders"));

        reg.notify_change("orders");
        let dirty = reg.take_dirty();
        assert!(dirty.contains("mv_orders"));
        assert!(!dirty.contains("mv_users"));
    }

    #[test]
    fn test_notify_change_shard_table() {
        let reg = MaterializedViewRegistry::new();
        reg.create("mv_users", "SELECT id FROM users", None).unwrap();

        reg.notify_change("users__shard_0");
        assert!(reg.take_dirty().contains("mv_users"));
    }

    #[test]
    fn test_configurable_min_refresh_interval() {
        let reg = MaterializedViewRegistry::new();
        assert_eq!(reg.min_refresh_interval_ms(), 1000);

        reg.set_min_refresh_interval_ms(500);
        assert_eq!(reg.min_refresh_interval_ms(), 500);
        assert_eq!(reg.min_refresh_interval(), Duration::from_millis(500));
    }

    #[test]
    fn test_notify_no_views_is_noop() {
        let reg = MaterializedViewRegistry::new();
        reg.notify_change("some_table");
        assert!(reg.take_dirty().is_empty());
    }
}