1use dashmap::DashMap;
7use std::sync::Arc;
8
9use crate::error::{DbxError, DbxResult};
10
11#[derive(Debug, Default)]
15pub struct ViewRegistry {
16 views: DashMap<String, String>,
18}
19
20impl ViewRegistry {
21 pub fn new() -> Self {
23 Self::default()
24 }
25
26 pub fn create(&self, name: &str, sql: &str) -> DbxResult<()> {
28 self.views.insert(name.to_lowercase(), sql.to_string());
29 Ok(())
30 }
31
32 pub fn drop(&self, name: &str) -> DbxResult<()> {
34 self.views
35 .remove(&name.to_lowercase())
36 .map(|_| ())
37 .ok_or_else(|| DbxError::InvalidArguments(format!("뷰 '{}' 를 찾을 수 없음", name)))
38 }
39
40 pub fn exists(&self, name: &str) -> bool {
42 self.views.contains_key(&name.to_lowercase())
43 }
44
45 pub fn expand(&self, sql: &str) -> String {
50 let mut result = sql.to_string();
51 for entry in self.views.iter() {
52 let name = entry.key();
53 let view_sql = entry.value();
54
55 let pattern = format!("from {}", name);
57 let replacement = format!("FROM ({}) AS {}", view_sql, name);
58
59 let lower = result.to_lowercase();
61 if let Some(pos) = lower.find(&pattern) {
62 result = format!(
63 "{}{}{}",
64 &result[..pos],
65 replacement,
66 &result[pos + pattern.len()..]
67 );
68 }
69 }
70 result
71 }
72
73 pub fn list_views(&self) -> Vec<String> {
75 self.views.iter().map(|e| e.key().clone()).collect()
76 }
77}
78
79pub type SharedViewRegistry = Arc<ViewRegistry>;
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85
86 #[test]
87 fn test_create_and_exists() {
88 let reg = ViewRegistry::new();
89 assert!(!reg.exists("active_users"));
90 reg.create(
91 "active_users",
92 "SELECT id, name FROM users WHERE active = true",
93 )
94 .unwrap();
95 assert!(reg.exists("active_users"));
96 }
97
98 #[test]
99 fn test_create_case_insensitive() {
100 let reg = ViewRegistry::new();
101 reg.create("MyView", "SELECT 1").unwrap();
102 assert!(reg.exists("myview"));
103 assert!(reg.exists("MyView")); assert!(reg.exists("MYVIEW"));
105 }
106
107 #[test]
108 fn test_drop_view() {
109 let reg = ViewRegistry::new();
110 reg.create("v", "SELECT 1 AS x").unwrap();
111 assert!(reg.exists("v"));
112 reg.drop("v").unwrap();
113 assert!(!reg.exists("v"));
114 }
115
116 #[test]
117 fn test_drop_nonexistent_fails() {
118 let reg = ViewRegistry::new();
119 assert!(reg.drop("nonexistent").is_err());
120 }
121
122 #[test]
123 fn test_expand_replaces_from_clause() {
124 let reg = ViewRegistry::new();
125 reg.create(
126 "active_users",
127 "SELECT id, name FROM users WHERE active = true",
128 )
129 .unwrap();
130
131 let sql = "SELECT * FROM active_users";
132 let expanded = reg.expand(sql);
133 assert!(
134 expanded.contains("(SELECT id, name FROM users WHERE active = true)"),
135 "서브쿼리가 삽입되어야 함: {}",
136 expanded
137 );
138 assert!(
139 expanded.contains("AS active_users"),
140 "별칭이 지정되어야 함: {}",
141 expanded
142 );
143 }
144
145 #[test]
146 fn test_expand_no_match() {
147 let reg = ViewRegistry::new();
148 reg.create("v", "SELECT 1").unwrap();
149 let sql = "SELECT * FROM users"; let expanded = reg.expand(sql);
151 assert_eq!(expanded, sql, "치환 없이 원래 SQL 유지");
152 }
153
154 #[test]
155 fn test_list_views() {
156 let reg = ViewRegistry::new();
157 reg.create("v1", "SELECT 1").unwrap();
158 reg.create("v2", "SELECT 2").unwrap();
159 let mut views = reg.list_views();
160 views.sort();
161 assert_eq!(views, vec!["v1", "v2"]);
162 }
163}
164
165use arrow::record_batch::RecordBatch;
170use std::collections::HashSet;
171use std::sync::{Condvar, Mutex, RwLock};
172use std::time::{Duration, Instant};
173
174fn extract_source_tables(sql: &str) -> Vec<String> {
179 let upper = sql.to_uppercase();
180 let tokens: Vec<&str> = upper.split_whitespace().collect();
181 let original_tokens: Vec<&str> = sql.split_whitespace().collect();
182 let mut tables = Vec::new();
183
184 for (i, token) in tokens.iter().enumerate() {
185 if (*token == "FROM" || *token == "JOIN") && i + 1 < tokens.len() {
186 let table_name = original_tokens[i + 1]
187 .trim_matches(|c: char| c == '(' || c == ')' || c == ',' || c == ';')
188 .to_lowercase();
189 if !table_name.is_empty() && table_name != "select" && table_name != "(" {
190 tables.push(table_name);
191 }
192 }
193 }
194 tables.sort();
195 tables.dedup();
196 tables
197}
198
199struct MatViewEntry {
201 sql: String,
203 cache: Option<Vec<RecordBatch>>,
205 refreshed_at: Option<Instant>,
207 refresh_interval_secs: Option<u64>,
209 source_tables: Vec<String>,
211}
212
213pub struct MatViewNotifier {
218 dirty: Mutex<HashSet<String>>,
220 condvar: Condvar,
222}
223
224impl MatViewNotifier {
225 fn new() -> Self {
226 Self {
227 dirty: Mutex::new(HashSet::new()),
228 condvar: Condvar::new(),
229 }
230 }
231
232 fn mark_dirty(&self, mv_name: &str) {
234 let mut dirty = self.dirty.lock().unwrap();
235 dirty.insert(mv_name.to_string());
236 self.condvar.notify_one();
237 }
238
239 pub fn wait_and_take(&self) -> HashSet<String> {
241 let mut dirty = self.dirty.lock().unwrap();
242 while dirty.is_empty() {
243 dirty = self.condvar.wait(dirty).unwrap();
244 }
245 dirty.drain().collect()
246 }
247
248 pub fn take(&self) -> HashSet<String> {
250 let mut dirty = self.dirty.lock().unwrap();
251 dirty.drain().collect()
252 }
253}
254
255pub struct MaterializedViewRegistry {
263 views: DashMap<String, RwLock<MatViewEntry>>,
264 notifier: MatViewNotifier,
266 min_refresh_interval_ms: std::sync::atomic::AtomicU64,
268}
269
270impl Default for MaterializedViewRegistry {
271 fn default() -> Self {
272 Self {
273 views: DashMap::new(),
274 notifier: MatViewNotifier::new(),
275 min_refresh_interval_ms: std::sync::atomic::AtomicU64::new(1000),
276 }
277 }
278}
279
280impl MaterializedViewRegistry {
281 pub fn new() -> Self {
282 Self::default()
283 }
284
285 pub fn set_min_refresh_interval_ms(&self, ms: u64) {
296 self.min_refresh_interval_ms
297 .store(ms, std::sync::atomic::Ordering::Relaxed);
298 }
299
300 pub fn min_refresh_interval_ms(&self) -> u64 {
302 self.min_refresh_interval_ms
303 .load(std::sync::atomic::Ordering::Relaxed)
304 }
305
306 pub fn min_refresh_interval(&self) -> Duration {
308 Duration::from_millis(self.min_refresh_interval_ms())
309 }
310
311 pub fn create(
315 &self,
316 name: &str,
317 sql: &str,
318 refresh_interval_secs: Option<u64>,
319 ) -> DbxResult<()> {
320 let source_tables = extract_source_tables(sql);
321 self.views.insert(
322 name.to_lowercase(),
323 RwLock::new(MatViewEntry {
324 sql: sql.to_string(),
325 cache: None,
326 refreshed_at: None,
327 refresh_interval_secs,
328 source_tables,
329 }),
330 );
331 Ok(())
332 }
333
334 pub fn set_cache(&self, name: &str, batches: Vec<RecordBatch>) -> DbxResult<()> {
336 let entry = self.views.get(&name.to_lowercase()).ok_or_else(|| {
337 DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
338 })?;
339 let mut e = entry.write().unwrap();
340 e.cache = Some(batches);
341 e.refreshed_at = Some(Instant::now());
342 Ok(())
343 }
344
345 pub fn is_fresh(&self, name: &str) -> bool {
351 let entry = match self.views.get(&name.to_lowercase()) {
352 Some(e) => e,
353 None => return false,
354 };
355 let e = entry.read().unwrap();
356 match (e.refreshed_at, e.refresh_interval_secs) {
357 (None, _) => false,
358 (Some(_), None) => true,
359 (Some(t), Some(secs)) => t.elapsed().as_secs() < secs,
360 }
361 }
362
363 pub fn get_cache(&self, name: &str) -> Option<Vec<RecordBatch>> {
365 let entry = self.views.get(&name.to_lowercase())?;
366 entry.read().unwrap().cache.clone()
367 }
368
369 pub fn get_sql(&self, name: &str) -> Option<String> {
371 Some(
372 self.views
373 .get(&name.to_lowercase())?
374 .read()
375 .unwrap()
376 .sql
377 .clone(),
378 )
379 }
380
381 pub fn list(&self) -> Vec<String> {
383 self.views.iter().map(|e| e.key().clone()).collect()
384 }
385
386 pub fn remove(&self, name: &str) -> DbxResult<()> {
388 self.views
389 .remove(&name.to_lowercase())
390 .map(|_| ())
391 .ok_or_else(|| {
392 DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
393 })
394 }
395
396 pub fn notify_change(&self, table: &str) {
407 if self.views.is_empty() {
408 return; }
410 let table_lower = table.to_lowercase();
411 let base_table = if let Some(idx) = table_lower.find("__shard_") {
413 &table_lower[..idx]
414 } else {
415 &table_lower
416 };
417 for entry in self.views.iter() {
418 let mv_name = entry.key();
419 let e = entry.value().read().unwrap();
420 if e.source_tables.iter().any(|t| t == base_table) {
421 drop(e); self.notifier.mark_dirty(mv_name);
423 }
424 }
425 }
426
427 pub fn wait_and_take_dirty(&self) -> HashSet<String> {
429 self.notifier.wait_and_take()
430 }
431
432 pub fn take_dirty(&self) -> HashSet<String> {
434 self.notifier.take()
435 }
436
437 pub fn stale_views(&self) -> Vec<String> {
439 self.views
440 .iter()
441 .filter(|e| {
442 let entry = e.value().read().unwrap();
443 match (entry.refreshed_at, entry.refresh_interval_secs) {
444 (None, _) => true,
445 (Some(_), None) => false,
446 (Some(t), Some(secs)) => t.elapsed().as_secs() >= secs,
447 }
448 })
449 .map(|e| e.key().clone())
450 .collect()
451 }
452}
453
454pub type SharedMaterializedViewRegistry = Arc<MaterializedViewRegistry>;
456
457#[cfg(test)]
458mod matview_tests {
459 use super::*;
460 use arrow::array::Int64Array;
461 use arrow::datatypes::{DataType, Field, Schema};
462
463 fn make_batch(n: i64) -> RecordBatch {
464 let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
465 RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![n]))]).unwrap()
466 }
467
468 #[test]
469 fn test_materialized_view_cache() {
470 let reg = MaterializedViewRegistry::new();
471 reg.create("mv_users", "SELECT id FROM users", None)
472 .unwrap();
473 assert!(!reg.is_fresh("mv_users")); reg.set_cache("mv_users", vec![make_batch(1), make_batch(2)])
476 .unwrap();
477 assert!(reg.is_fresh("mv_users")); let cached = reg.get_cache("mv_users").unwrap();
480 assert_eq!(cached.len(), 2);
481 }
482
483 #[test]
484 fn test_matview_drop() {
485 let reg = MaterializedViewRegistry::new();
486 reg.create("mv_test", "SELECT 1", None).unwrap();
487 assert!(reg.get_sql("mv_test").is_some());
488 reg.remove("mv_test").unwrap();
489 assert!(reg.get_sql("mv_test").is_none());
490 }
491
492 #[test]
493 fn test_matview_drop_nonexistent_fails() {
494 let reg = MaterializedViewRegistry::new();
495 assert!(reg.remove("nonexistent").is_err());
496 }
497
498 #[test]
499 fn test_matview_with_interval() {
500 let reg = MaterializedViewRegistry::new();
501 reg.create("mv_sales", "SELECT * FROM sales", Some(300))
503 .unwrap();
504 assert!(!reg.is_fresh("mv_sales")); reg.set_cache("mv_sales", vec![make_batch(42)]).unwrap();
507 assert!(reg.is_fresh("mv_sales")); let cached = reg.get_cache("mv_sales").unwrap();
510 assert_eq!(cached[0].num_rows(), 1);
511 }
512
513 #[test]
514 fn test_matview_list() {
515 let reg = MaterializedViewRegistry::new();
516 reg.create("mv_a", "SELECT 1", None).unwrap();
517 reg.create("mv_b", "SELECT 2", None).unwrap();
518 let mut names = reg.list();
519 names.sort();
520 assert_eq!(names, vec!["mv_a", "mv_b"]);
521 }
522
523 #[test]
524 fn test_extract_source_tables() {
525 assert_eq!(
526 extract_source_tables("SELECT id, name FROM users WHERE active = true"),
527 vec!["users"]
528 );
529 assert_eq!(
530 extract_source_tables("SELECT * FROM orders JOIN users ON orders.uid = users.id"),
531 vec!["orders", "users"]
532 );
533 assert_eq!(
534 extract_source_tables("SELECT AVG(price) FROM products"),
535 vec!["products"]
536 );
537 assert_eq!(
539 extract_source_tables("SELECT * FROM t1 JOIN t1 ON t1.a = t1.b"),
540 vec!["t1"]
541 );
542 }
543
544 #[test]
545 fn test_notify_change_marks_dirty() {
546 let reg = MaterializedViewRegistry::new();
547 reg.create("mv_users", "SELECT id FROM users", None)
548 .unwrap();
549 reg.create("mv_orders", "SELECT * FROM orders", None)
550 .unwrap();
551
552 reg.notify_change("users");
554 let dirty = reg.take_dirty();
555 assert!(dirty.contains("mv_users"));
556 assert!(!dirty.contains("mv_orders"));
557
558 reg.notify_change("orders");
560 let dirty = reg.take_dirty();
561 assert!(dirty.contains("mv_orders"));
562 assert!(!dirty.contains("mv_users"));
563 }
564
565 #[test]
566 fn test_notify_change_shard_table() {
567 let reg = MaterializedViewRegistry::new();
568 reg.create("mv_users", "SELECT id FROM users", None)
569 .unwrap();
570
571 reg.notify_change("users__shard_0");
573 let dirty = reg.take_dirty();
574 assert!(dirty.contains("mv_users"));
575 }
576
577 #[test]
578 fn test_configurable_min_refresh_interval() {
579 let reg = MaterializedViewRegistry::new();
580 assert_eq!(reg.min_refresh_interval_ms(), 1000); reg.set_min_refresh_interval_ms(500);
583 assert_eq!(reg.min_refresh_interval_ms(), 500);
584 assert_eq!(reg.min_refresh_interval(), Duration::from_millis(500));
585 }
586
587 #[test]
588 fn test_notify_no_views_is_noop() {
589 let reg = MaterializedViewRegistry::new();
590 reg.notify_change("some_table");
592 let dirty = reg.take_dirty();
593 assert!(dirty.is_empty());
594 }
595}