// dbx_core/engine/ddl_api.rs
1//! DDL API implementation - Schema management convenience methods
2
3use crate::engine::Database;
4use crate::error::DbxResult;
5use arrow::datatypes::{DataType, Schema};
6use std::sync::Arc;
7
8impl Database {
9    /// Create a new table with the given Arrow schema
10    ///
11    /// This is a convenience wrapper around `execute_sql("CREATE TABLE ...")`.
12    /// It automatically converts the Arrow schema to SQL DDL.
13    ///
14    /// # Example
15    ///
16    /// ```rust
17    /// use dbx_core::Database;
18    /// use arrow::datatypes::{DataType, Field, Schema};
19    ///
20    /// # fn main() -> dbx_core::DbxResult<()> {
21    /// let db = Database::open_in_memory()?;
22    ///
23    /// let schema = Schema::new(vec![
24    ///     Field::new("id", DataType::Int64, false),
25    ///     Field::new("name", DataType::Utf8, true),
26    ///     Field::new("age", DataType::Int32, true),
27    /// ]);
28    ///
29    /// db.create_table("users", schema)?;
30    /// assert!(db.table_exists("users"));
31    /// # Ok(())
32    /// # }
33    /// ```
34    pub fn create_table(&self, name: &str, schema: Schema) -> DbxResult<()> {
35        let schema_arc = Arc::new(schema);
36
37        // Generate CREATE TABLE SQL from Arrow Schema
38        let sql = self.generate_create_table_sql(name, &schema_arc);
39
40        // Execute SQL FIRST (this will check if table exists)
41        self.execute_sql(&sql)?;
42
43        // THEN store schema (after SQL succeeds)
44        self.table_schemas
45            .write()
46            .unwrap()
47            .insert(name.to_string(), Arc::clone(&schema_arc));
48
49        // Initialize empty table data
50        self.tables
51            .write()
52            .unwrap()
53            .insert(name.to_string(), vec![]);
54
55        // Initialize row counter
56        self.row_counters
57            .insert(name.to_string(), std::sync::atomic::AtomicUsize::new(0));
58
59        Ok(())
60    }
61
62    /// Drop a table
63    ///
64    /// # Example
65    ///
66    /// ```rust
67    /// use dbx_core::Database;
68    /// use arrow::datatypes::{DataType, Field, Schema};
69    ///
70    /// # fn main() -> dbx_core::DbxResult<()> {
71    /// let db = Database::open_in_memory()?;
72    ///
73    /// let schema = Schema::new(vec![
74    ///     Field::new("id", DataType::Int64, false),
75    /// ]);
76    ///
77    /// db.create_table("temp", schema)?;
78    /// db.drop_table("temp")?;
79    /// assert!(!db.table_exists("temp"));
80    /// # Ok(())
81    /// # }
82    /// ```
83    pub fn drop_table(&self, name: &str) -> DbxResult<()> {
84        self.execute_sql(&format!("DROP TABLE {}", name))?;
85        self.table_schemas.write().unwrap().remove(name);
86        Ok(())
87    }
88
89    /// Check if a table exists
90    ///
91    /// # Example
92    ///
93    /// ```rust
94    /// use dbx_core::Database;
95    /// use arrow::datatypes::{DataType, Field, Schema};
96    ///
97    /// # fn main() -> dbx_core::DbxResult<()> {
98    /// let db = Database::open_in_memory()?;
99    ///
100    /// assert!(!db.table_exists("users"));
101    ///
102    /// let schema = Schema::new(vec![
103    ///     Field::new("id", DataType::Int64, false),
104    /// ]);
105    ///
106    /// db.create_table("users", schema)?;
107    /// assert!(db.table_exists("users"));
108    /// # Ok(())
109    /// # }
110    /// ```
111    pub fn table_exists(&self, name: &str) -> bool {
112        self.table_schemas.read().unwrap().contains_key(name)
113    }
114
115    /// Get the schema of a table
116    ///
117    /// # Example
118    ///
119    /// ```rust
120    /// use dbx_core::Database;
121    /// use arrow::datatypes::{DataType, Field, Schema};
122    ///
123    /// # fn main() -> dbx_core::DbxResult<()> {
124    /// let db = Database::open_in_memory()?;
125    ///
126    /// let schema = Schema::new(vec![
127    ///     Field::new("id", DataType::Int64, false),
128    ///     Field::new("name", DataType::Utf8, true),
129    /// ]);
130    ///
131    /// db.create_table("users", schema.clone())?;
132    /// let retrieved_schema = db.get_table_schema("users")?;
133    /// assert_eq!(retrieved_schema.fields().len(), 2);
134    /// # Ok(())
135    /// # }
136    /// ```
137    pub fn get_table_schema(&self, name: &str) -> DbxResult<Schema> {
138        self.table_schemas
139            .read()
140            .unwrap()
141            .get(name)
142            .map(|s| (**s).clone())
143            .ok_or_else(|| crate::DbxError::Schema(format!("Table '{}' not found", name)))
144    }
145
146    /// List all tables
147    pub fn list_tables(&self) -> Vec<String> {
148        self.table_schemas.read().unwrap().keys().cloned().collect()
149    }
150
151    /// Helper: Generate CREATE TABLE SQL from Arrow Schema
152    fn generate_create_table_sql(&self, name: &str, schema: &Schema) -> String {
153        let columns: Vec<String> = schema
154            .fields()
155            .iter()
156            .map(|field| {
157                let sql_type = match field.data_type() {
158                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => "INT",
159                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
160                        "INT"
161                    }
162                    DataType::Float32 | DataType::Float64 => "FLOAT",
163                    DataType::Utf8 | DataType::LargeUtf8 => "TEXT",
164                    DataType::Boolean => "BOOLEAN",
165                    DataType::Binary | DataType::LargeBinary => "BLOB",
166                    DataType::Date32 | DataType::Date64 => "DATE",
167                    DataType::Timestamp(_, _) => "TIMESTAMP",
168                    _ => "TEXT", // Default to TEXT for unsupported types
169                };
170                format!("{} {}", field.name(), sql_type)
171            })
172            .collect();
173
174        format!("CREATE TABLE {} ({})", name, columns.join(", "))
175    }
176
177    /// Create a SQL index on table columns
178    ///
179    /// This is a convenience wrapper around `execute_sql("CREATE INDEX ...")`.
180    /// For Hash Index (O(1) lookup), use `create_index(table, column)` instead.
181    ///
182    /// # Example
183    ///
184    /// ```rust
185    /// use dbx_core::Database;
186    /// use arrow::datatypes::{DataType, Field, Schema};
187    ///
188    /// # fn main() -> dbx_core::DbxResult<()> {
189    /// let db = Database::open_in_memory()?;
190    ///
191    /// let schema = Schema::new(vec![
192    ///     Field::new("id", DataType::Int64, false),
193    ///     Field::new("email", DataType::Utf8, true),
194    /// ]);
195    ///
196    /// db.create_table("users", schema)?;
197    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
198    /// assert!(db.sql_index_exists("idx_email"));
199    /// # Ok(())
200    /// # }
201    /// ```
202    pub fn create_sql_index(
203        &self,
204        table: &str,
205        index_name: &str,
206        columns: Vec<String>,
207    ) -> DbxResult<()> {
208        // Generate CREATE INDEX SQL
209        let columns_str = columns.join(", ");
210        let sql = format!("CREATE INDEX {} ON {} ({})", index_name, table, columns_str);
211
212        // Execute SQL
213        self.execute_sql(&sql)?;
214        Ok(())
215    }
216
217    /// Drop a SQL index
218    ///
219    /// This is a convenience wrapper around `execute_sql("DROP INDEX ...")`.
220    /// For Hash Index, use `drop_index(table, column)` instead.
221    ///
222    /// Note: The index must have been created with `create_sql_index` to be tracked properly.
223    ///
224    /// # Example
225    ///
226    /// ```rust
227    /// use dbx_core::Database;
228    /// use arrow::datatypes::{DataType, Field, Schema};
229    ///
230    /// # fn main() -> dbx_core::DbxResult<()> {
231    /// let db = Database::open_in_memory()?;
232    ///
233    /// let schema = Schema::new(vec![
234    ///     Field::new("id", DataType::Int64, false),
235    ///     Field::new("email", DataType::Utf8, true),
236    /// ]);
237    ///
238    /// db.create_table("users", schema)?;
239    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
240    /// db.drop_sql_index("users", "idx_email")?;
241    /// assert!(!db.sql_index_exists("idx_email"));
242    /// # Ok(())
243    /// # }
244    /// ```
245    pub fn drop_sql_index(&self, table: &str, index_name: &str) -> DbxResult<()> {
246        // Use table.index_name format for DROP INDEX
247        let sql = format!("DROP INDEX {}.{}", table, index_name);
248        self.execute_sql(&sql)?;
249        Ok(())
250    }
251
252    /// Check if a SQL index exists
253    ///
254    /// For Hash Index, use `has_index(table, column)` instead.
255    ///
256    /// # Example
257    ///
258    /// ```rust
259    /// use dbx_core::Database;
260    /// use arrow::datatypes::{DataType, Field, Schema};
261    ///
262    /// # fn main() -> dbx_core::DbxResult<()> {
263    /// let db = Database::open_in_memory()?;
264    ///
265    /// let schema = Schema::new(vec![
266    ///     Field::new("id", DataType::Int64, false),
267    ///     Field::new("email", DataType::Utf8, true),
268    /// ]);
269    ///
270    /// db.create_table("users", schema)?;
271    /// assert!(!db.sql_index_exists("idx_email"));
272    ///
273    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
274    /// assert!(db.sql_index_exists("idx_email"));
275    /// # Ok(())
276    /// # }
277    /// ```
278    pub fn sql_index_exists(&self, index_name: &str) -> bool {
279        self.index_registry.read().unwrap().contains_key(index_name)
280    }
281
282    /// List all SQL indexes for a table
283    ///
284    /// # Example
285    ///
286    /// ```rust
287    /// use dbx_core::Database;
288    /// use arrow::datatypes::{DataType, Field, Schema};
289    ///
290    /// # fn main() -> dbx_core::DbxResult<()> {
291    /// let db = Database::open_in_memory()?;
292    ///
293    /// let schema = Schema::new(vec![
294    ///     Field::new("id", DataType::Int64, false),
295    ///     Field::new("email", DataType::Utf8, true),
296    ///     Field::new("name", DataType::Utf8, true),
297    /// ]);
298    ///
299    /// db.create_table("users", schema)?;
300    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
301    /// db.create_sql_index("users", "idx_name", vec!["name".to_string()])?;
302    ///
303    /// let indexes = db.list_sql_indexes("users");
304    /// assert!(indexes.contains(&"idx_email".to_string()));
305    /// assert!(indexes.contains(&"idx_name".to_string()));
306    /// # Ok(())
307    /// # }
308    /// ```
309    pub fn list_sql_indexes(&self, table: &str) -> Vec<String> {
310        self.index_registry
311            .read()
312            .unwrap()
313            .iter()
314            .filter_map(|(index_name, (tbl, _col))| {
315                if tbl == table {
316                    Some(index_name.clone())
317                } else {
318                    None
319                }
320            })
321            .collect()
322    }
323
324    /// Add a column to an existing table
325    ///
326    /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... ADD COLUMN ...")`.
327    ///
328    /// # Example
329    ///
330    /// ```rust
331    /// use dbx_core::Database;
332    /// use arrow::datatypes::{DataType, Field, Schema};
333    ///
334    /// # fn main() -> dbx_core::DbxResult<()> {
335    /// let db = Database::open_in_memory()?;
336    ///
337    /// let schema = Schema::new(vec![
338    ///     Field::new("id", DataType::Int64, false),
339    ///     Field::new("name", DataType::Utf8, true),
340    /// ]);
341    ///
342    /// db.create_table("users", schema)?;
343    /// db.add_column("users", "email", "TEXT")?;
344    ///
345    /// let updated_schema = db.get_table_schema("users")?;
346    /// assert_eq!(updated_schema.fields().len(), 3);
347    /// # Ok(())
348    /// # }
349    /// ```
350    pub fn add_column(&self, table: &str, column_name: &str, data_type: &str) -> DbxResult<()> {
351        let sql = format!(
352            "ALTER TABLE {} ADD COLUMN {} {}",
353            table, column_name, data_type
354        );
355        self.execute_sql(&sql)?;
356        Ok(())
357    }
358
359    /// Drop a column from an existing table
360    ///
361    /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... DROP COLUMN ...")`.
362    ///
363    /// # Example
364    ///
365    /// ```rust
366    /// use dbx_core::Database;
367    /// use arrow::datatypes::{DataType, Field, Schema};
368    ///
369    /// # fn main() -> dbx_core::DbxResult<()> {
370    /// let db = Database::open_in_memory()?;
371    ///
372    /// let schema = Schema::new(vec![
373    ///     Field::new("id", DataType::Int64, false),
374    ///     Field::new("name", DataType::Utf8, true),
375    ///     Field::new("email", DataType::Utf8, true),
376    /// ]);
377    ///
378    /// db.create_table("users", schema)?;
379    /// db.drop_column("users", "email")?;
380    ///
381    /// let updated_schema = db.get_table_schema("users")?;
382    /// assert_eq!(updated_schema.fields().len(), 2);
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub fn drop_column(&self, table: &str, column_name: &str) -> DbxResult<()> {
387        let sql = format!("ALTER TABLE {} DROP COLUMN {}", table, column_name);
388        self.execute_sql(&sql)?;
389        Ok(())
390    }
391
392    /// Rename a column in an existing table
393    ///
394    /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... RENAME COLUMN ...")`.
395    ///
396    /// # Example
397    ///
398    /// ```rust
399    /// use dbx_core::Database;
400    /// use arrow::datatypes::{DataType, Field, Schema};
401    ///
402    /// # fn main() -> dbx_core::DbxResult<()> {
403    /// let db = Database::open_in_memory()?;
404    ///
405    /// let schema = Schema::new(vec![
406    ///     Field::new("id", DataType::Int64, false),
407    ///     Field::new("user_name", DataType::Utf8, true),
408    /// ]);
409    ///
410    /// db.create_table("users", schema)?;
411    /// db.rename_column("users", "user_name", "name")?;
412    ///
413    /// let updated_schema = db.get_table_schema("users")?;
414    /// assert_eq!(updated_schema.field(1).name(), "name");
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub fn rename_column(&self, table: &str, old_name: &str, new_name: &str) -> DbxResult<()> {
419        let sql = format!(
420            "ALTER TABLE {} RENAME COLUMN {} TO {}",
421            table, old_name, new_name
422        );
423        self.execute_sql(&sql)?;
424        Ok(())
425    }
426
    // ════════════════════════════════════════════
    // Phase 3: Partitioning API
    // ════════════════════════════════════════════
430
431    /// 파티셔닝 규칙을 생성합니다.
432    ///
433    /// 이후 해당 `table`로 들어오는 INSERT는 키 값에 따라
434    /// `route_key()`가 반환하는 내부 sub-table로 라우팅됩니다.
435    pub fn create_partition(&self, map: crate::storage::partition::PartitionMap) -> DbxResult<()> {
436        let table_name = map.table.clone();
437        self.partition_maps.write().unwrap().insert(table_name, map);
438        Ok(())
439    }
440
441    /// 자동 확장(Auto-Expand)을 지원하는 범위 파티션을 생성합니다 (Phase 3.4).
442    ///
443    /// 설정된 범위를 초과하는 키 값이 인입되면 `interval` 크기만큼
444    /// 새로운 파티션 구획을 자동 생성하며 지속적으로 확장됩니다.
445    pub fn create_auto_range_partition(
446        &self,
447        table: &str,
448        column: &str,
449        initial_low: i64,
450        interval: i64,
451        max_partitions: usize,
452    ) -> DbxResult<()> {
453        use crate::storage::partition::{PartitionMap, PartitionType};
454        let map = PartitionMap {
455            table: table.to_string(),
456            partition_type: PartitionType::Range {
457                column: column.to_string(),
458                bounds: vec![(initial_low, initial_low + interval)],
459                auto_expand_interval: Some((interval, max_partitions)),
460            },
461            num_partitions: 1, // 초기 파티션 크기
462        };
463
464        self.partition_maps
465            .write()
466            .unwrap()
467            .insert(table.to_string(), map);
468        Ok(())
469    }
470
471    /// 파티셔닝 규칙을 제거합니다.
472    pub fn drop_partition(&self, table: &str) -> DbxResult<()> {
473        self.partition_maps.write().unwrap().remove(table);
474        Ok(())
475    }
476
    // ════════════════════════════════════════════
    // Phase 3 Synergy: PartitionStats API
    // ════════════════════════════════════════════
480
481    /// 파티션 통계를 갱신합니다.
482    ///
483    /// # Example
484    /// ```rust
485    /// use dbx_core::Database;
486    /// use dbx_core::storage::partition::PartitionStats;
487    ///
488    /// # fn main() -> dbx_core::DbxResult<()> {
489    /// let db = Database::open_in_memory()?;
490    /// db.update_partition_stats("orders", "orders__p_part_0", PartitionStats {
491    ///     row_count: 1000, min_value: 0, max_value: 999,
492    ///     null_count: 0, distinct_count: 1000,
493    /// })?;
494    /// # Ok(())
495    /// # }
496    /// ```
497    pub fn update_partition_stats(
498        &self,
499        table: &str,
500        partition_name: &str,
501        stats: crate::storage::partition::PartitionStats,
502    ) -> DbxResult<()> {
503        let key = format!("{}__{}", table, partition_name);
504        self.partition_stats.insert(key, stats);
505        Ok(())
506    }
507
508    /// 특정 파티션의 통계를 조회합니다.
509    pub fn get_partition_stats(
510        &self,
511        table: &str,
512        partition_name: &str,
513    ) -> DbxResult<crate::storage::partition::PartitionStats> {
514        let key = format!("{}__{}", table, partition_name);
515        self.partition_stats
516            .get(&key)
517            .map(|r| r.clone())
518            .ok_or_else(|| crate::error::DbxError::InvalidOperation {
519                message: format!("No stats for partition '{}'", partition_name),
520                context: format!("Call update_partition_stats first for table '{}'", table),
521            })
522    }
523
524    /// 테이블의 모든 파티션 통계를 반환합니다.
525    pub fn all_partition_stats(
526        &self,
527        table: &str,
528    ) -> DbxResult<std::collections::HashMap<String, crate::storage::partition::PartitionStats>>
529    {
530        let prefix = format!("{}__", table);
531        let result = self
532            .partition_stats
533            .iter()
534            .filter(|r| r.key().starts_with(&prefix))
535            .map(|r| (r.key().clone(), r.value().clone()))
536            .collect();
537        Ok(result)
538    }
539
    // ════════════════════════════════════════════
    // Phase 3 Synergy: Per-Partition Compression API
    // ════════════════════════════════════════════
543
544    /// 파티션별 압축 설정을 지정합니다.
545    ///
546    /// # Example
547    /// ```rust
548    /// use dbx_core::Database;
549    /// use dbx_core::storage::compression::CompressionConfig;
550    ///
551    /// # fn main() -> dbx_core::DbxResult<()> {
552    /// let db = Database::open_in_memory()?;
553    /// // 오래된 파티션 → 고압축
554    /// db.set_partition_compression("orders", "orders__p_part_0", CompressionConfig::zstd_level(9))?;
555    /// # Ok(())
556    /// # }
557    /// ```
558    pub fn set_partition_compression(
559        &self,
560        table: &str,
561        partition_name: &str,
562        config: crate::storage::compression::CompressionConfig,
563    ) -> DbxResult<()> {
564        let key = format!("{}__{}", table, partition_name);
565        self.partition_compression.insert(key, config);
566        Ok(())
567    }
568
569    /// 파티션 압축 설정 조회 (미설정 시 기본값 Snappy 반환).
570    pub fn get_partition_compression(
571        &self,
572        table: &str,
573        partition_name: &str,
574    ) -> DbxResult<crate::storage::compression::CompressionConfig> {
575        let key = format!("{}__{}", table, partition_name);
576        Ok(self
577            .partition_compression
578            .get(&key)
579            .map(|r| *r)
580            .unwrap_or_default())
581    }
582
    // ════════════════════════════════════════════
    // Phase 3 Synergy: PartitionLifecycle API
    // ════════════════════════════════════════════
586
587    /// 테이블의 파티션 수명 주기 정책을 설정합니다.
588    ///
589    /// # Example
590    /// ```rust
591    /// use dbx_core::Database;
592    /// use dbx_core::storage::partition::PartitionLifecycle;
593    ///
594    /// # fn main() -> dbx_core::DbxResult<()> {
595    /// let db = Database::open_in_memory()?;
596    /// db.enable_auto_archive("logs", PartitionLifecycle {
597    ///     archive_after_days: 90,
598    ///     delete_after_days: 365,
599    /// })?;
600    /// # Ok(())
601    /// # }
602    /// ```
603    pub fn enable_auto_archive(
604        &self,
605        table: &str,
606        lifecycle: crate::storage::partition::PartitionLifecycle,
607    ) -> DbxResult<()> {
608        self.partition_lifecycle
609            .insert(table.to_string(), lifecycle);
610
611        // ── 완전 자동화: 백그라운드 Lifecycle 스케줄러 구동 ──────────────
612        // compare_exchange: false→true 성공 시만 스레드 기동 (중복 방지)
613        use std::sync::atomic::Ordering;
614        if self.lifecycle_stop_flag.load(Ordering::Relaxed) {
615            // 이전에 stop된 경우 플래그 리셋
616            self.lifecycle_stop_flag.store(false, Ordering::SeqCst);
617        }
618
619        if self
620            .lifecycle_running
621            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
622            .is_ok()
623        {
624            let stop = Arc::clone(&self.lifecycle_stop_flag);
625            let running = Arc::clone(&self.lifecycle_running);
626            let lifecycle_map = Arc::clone(&self.partition_lifecycle);
627            let creation_times = Arc::clone(&self.partition_creation_times);
628            let compression_map = Arc::clone(&self.partition_compression);
629            let tier_map = Arc::clone(&self.partition_tier_hints);
630            let stats_map = Arc::clone(&self.partition_stats);
631
632            std::thread::Builder::new()
633                .name("dbx-lifecycle-scheduler".into())
634                .spawn(move || {
635                    use std::sync::atomic::Ordering;
636                    let interval = std::time::Duration::from_secs(3600); // 1시간마다
637                    loop {
638                        std::thread::sleep(interval);
639                        if stop.load(Ordering::Relaxed) {
640                            running.store(false, Ordering::SeqCst);
641                            break;
642                        }
643                        let now = std::time::SystemTime::now()
644                            .duration_since(std::time::UNIX_EPOCH)
645                            .unwrap()
646                            .as_secs();
647
648                        let tables: Vec<String> =
649                            lifecycle_map.iter().map(|r| r.key().clone()).collect();
650
651                        for table in tables {
652                            if stop.load(Ordering::Relaxed) {
653                                break;
654                            }
655                            let lc = match lifecycle_map.get(&table) {
656                                Some(l) => l.clone(),
657                                None => continue,
658                            };
659                            let prefix = format!("{}__", table);
660                            let candidates: Vec<(String, u64)> = creation_times
661                                .iter()
662                                .filter(|r| r.key().starts_with(&prefix))
663                                .map(|r| (r.key().clone(), *r.value()))
664                                .collect();
665
666                            for (sub_table, created_at) in candidates {
667                                let age_secs = now.saturating_sub(created_at);
668                                let delete_threshold = lc.delete_after_days as u64 * 86400;
669                                let archive_threshold = lc.archive_after_days as u64 * 86400;
670
671                                if age_secs >= delete_threshold {
672                                    stats_map.remove(&sub_table);
673                                    compression_map.remove(&sub_table);
674                                    tier_map.remove(&sub_table);
675                                    creation_times.remove(&sub_table);
676                                } else if age_secs >= archive_threshold {
677                                    compression_map.insert(
678                                        sub_table.clone(),
679                                        crate::storage::compression::CompressionConfig::zstd_level(
680                                            9,
681                                        ),
682                                    );
683                                    tier_map.insert(
684                                        sub_table.clone(),
685                                        crate::storage::partition::PartitionTierHint::Cold,
686                                    );
687                                }
688                            }
689                        }
690                    }
691                })
692                .map_err(|e| crate::error::DbxError::InvalidOperation {
693                    message: format!("Failed to spawn lifecycle scheduler: {}", e),
694                    context: "enable_auto_archive".to_string(),
695                })?;
696        } // end if CAS
697
698        Ok(())
699    }
700
701    /// 테이블의 파티션 수명 주기 정책을 조회합니다.
702    pub fn get_partition_lifecycle(
703        &self,
704        table: &str,
705    ) -> DbxResult<crate::storage::partition::PartitionLifecycle> {
706        self.partition_lifecycle
707            .get(table)
708            .map(|r| r.clone())
709            .ok_or_else(|| crate::error::DbxError::InvalidOperation {
710                message: format!("No lifecycle policy for table '{}'", table),
711                context: "Call enable_auto_archive first".to_string(),
712            })
713    }
714
715    /// 파티션이 아카이브 시점이 되었는지 확인합니다.
716    ///
717    /// `partition_created_at`: UNIX timestamp (초 단위)
718    pub fn partition_needs_archive(
719        &self,
720        table: &str,
721        partition_created_at: u64,
722    ) -> DbxResult<bool> {
723        let lc = self.get_partition_lifecycle(table)?;
724        let now = std::time::SystemTime::now()
725            .duration_since(std::time::UNIX_EPOCH)
726            .unwrap()
727            .as_secs();
728        let threshold_secs = lc.archive_after_days as u64 * 24 * 3600;
729        Ok(now.saturating_sub(partition_created_at) >= threshold_secs)
730    }
731
732    /// 파티션이 삭제 시점이 되었는지 확인합니다.
733    ///
734    /// `partition_created_at`: UNIX timestamp (초 단위)
735    pub fn partition_needs_delete(
736        &self,
737        table: &str,
738        partition_created_at: u64,
739    ) -> DbxResult<bool> {
740        let lc = self.get_partition_lifecycle(table)?;
741        let now = std::time::SystemTime::now()
742            .duration_since(std::time::UNIX_EPOCH)
743            .unwrap()
744            .as_secs();
745        let threshold_secs = lc.delete_after_days as u64 * 24 * 3600;
746        Ok(now.saturating_sub(partition_created_at) >= threshold_secs)
747    }
748
749    /// 테이블의 파티션 수명 주기 정책을 실제로 실행합니다.
750    ///
751    /// `partition_creation_times`에 기록된 타임스탬프를 기반으로:
752    /// - 아카이브 시점이 된 파티션 → ZSTD 레벨 9 압축 자동 적용
753    /// - 삭제 시점이 된 파티션 → sub-table 드롭
754    ///
755    /// # Returns
756    /// `(archived_count, deleted_count)` — 처리된 파티션 수
757    ///
758    /// # Example
759    /// ```rust
760    /// use dbx_core::Database;
761    /// use dbx_core::storage::partition::PartitionLifecycle;
762    ///
763    /// # fn main() -> dbx_core::DbxResult<()> {
764    /// let db = Database::open_in_memory()?;
765    /// db.enable_auto_archive("logs", PartitionLifecycle {
766    ///     archive_after_days: 90,
767    ///     delete_after_days: 365,
768    /// })?;
769    /// let (archived, deleted) = db.run_partition_lifecycle("logs")?;
770    /// # Ok(())
771    /// # }
772    /// ```
773    pub fn run_partition_lifecycle(&self, table: &str) -> DbxResult<(usize, usize)> {
774        let _lc = self.get_partition_lifecycle(table)?;
775        let prefix = format!("{}__", table);
776
777        let mut candidates: Vec<(String, u64)> = self
778            .partition_creation_times
779            .iter()
780            .filter(|r| r.key().starts_with(&prefix))
781            .map(|r| (r.key().clone(), *r.value()))
782            .collect();
783
784        // delete 우선: 삭제 대상은 archive도 하지 않음
785        candidates.sort_by_key(|(_, ts)| *ts); // 오래된 것 먼저
786
787        let mut archived = 0usize;
788        let mut deleted = 0usize;
789
790        for (sub_table, created_at) in candidates {
791            if self.partition_needs_delete(table, created_at)? {
792                // 실제 sub-table 드롭
793                let drop_sql = format!("DROP TABLE IF EXISTS \"{}\"", sub_table);
794                let _ = self.execute_sql(&drop_sql);
795                // 관련 메타데이터 정리
796                self.partition_stats.remove(&sub_table);
797                self.partition_compression.remove(&sub_table);
798                self.partition_tier_hints.remove(&sub_table);
799                self.partition_creation_times.remove(&sub_table);
800                deleted += 1;
801            } else if self.partition_needs_archive(table, created_at)? {
802                // ZSTD 레벨 9 고압축 자동 적용
803                // ⚠️ sub_table은 이미 "table__p_part_N" 형식이므로
804                //    set_partition_compression(table, sub_table) 사용 시 이중 prefix 발생.
805                //    DashMap에 직접 삽입하여 올바른 키("table__p_part_N")를 사용.
806                self.partition_compression.insert(
807                    sub_table.clone(),
808                    crate::storage::compression::CompressionConfig::zstd_level(9),
809                );
810                self.partition_tier_hints.insert(
811                    sub_table.clone(),
812                    crate::storage::partition::PartitionTierHint::Cold,
813                );
814                archived += 1;
815            }
816        }
817
818        Ok((archived, deleted))
819    }
820
821    /// 라이프사이클 정책이 있는 모든 테이블에 대해 일괄 실행합니다.
822    ///
823    /// # Returns
824    /// 전체 `(archived_count, deleted_count)`
825    pub fn run_all_partition_lifecycles(&self) -> DbxResult<(usize, usize)> {
826        let tables: Vec<String> = self
827            .partition_lifecycle
828            .iter()
829            .map(|r| r.key().clone())
830            .collect();
831
832        let mut total_archived = 0usize;
833        let mut total_deleted = 0usize;
834
835        for table in tables {
836            let (a, d) = self.run_partition_lifecycle(&table)?;
837            total_archived += a;
838            total_deleted += d;
839        }
840
841        Ok((total_archived, total_deleted))
842    }
843
844    /// 파티션의 생성 시각을 조회합니다 (INSERT 시 자동 기록).
845    pub fn get_partition_creation_time(&self, partition_name: &str) -> Option<u64> {
846        self.partition_creation_times
847            .get(partition_name)
848            .map(|r| *r)
849    }
850
    // ════════════════════════════════════════════
    // Phase 3 Synergy: PartitionTierHint API
    // ════════════════════════════════════════════
854
855    /// 파티션의 스토리지 티어 힌트를 설정합니다.
856    ///
857    /// # Example
858    /// ```rust
859    /// use dbx_core::Database;
860    /// use dbx_core::storage::partition::PartitionTierHint;
861    ///
862    /// # fn main() -> dbx_core::DbxResult<()> {
863    /// let db = Database::open_in_memory()?;
864    /// db.set_partition_tier("orders", "orders__p_part_0", PartitionTierHint::Hot)?;
865    /// db.set_partition_tier("orders", "orders__p_part_1", PartitionTierHint::Cold)?;
866    /// # Ok(())
867    /// # }
868    /// ```
869    pub fn set_partition_tier(
870        &self,
871        table: &str,
872        partition_name: &str,
873        hint: crate::storage::partition::PartitionTierHint,
874    ) -> DbxResult<()> {
875        let key = format!("{}__{}", table, partition_name);
876        self.partition_tier_hints.insert(key, hint);
877        Ok(())
878    }
879
880    /// 파티션 티어 힌트를 조회합니다 (미설정 시 Hot 반환).
881    pub fn get_partition_tier(
882        &self,
883        table: &str,
884        partition_name: &str,
885    ) -> DbxResult<crate::storage::partition::PartitionTierHint> {
886        let key = format!("{}__{}", table, partition_name);
887        Ok(self
888            .partition_tier_hints
889            .get(&key)
890            .map(|r| *r)
891            .unwrap_or_default())
892    }
893
894    /// 특정 티어에 속하는 파티션 목록을 반환합니다.
895    pub fn list_partitions_by_tier(
896        &self,
897        table: &str,
898        hint: crate::storage::partition::PartitionTierHint,
899    ) -> DbxResult<Vec<String>> {
900        let prefix = format!("{}__", table);
901        let result = self
902            .partition_tier_hints
903            .iter()
904            .filter(|r| r.key().starts_with(&prefix) && *r.value() == hint)
905            .map(|r| r.key().clone())
906            .collect();
907        Ok(result)
908    }
909
    /// Create a named view defined by `sql` (Phase 5.1).
    ///
    /// Delegates to the view registry; any registry error is propagated
    /// unchanged to the caller.
    pub fn create_view(&self, name: &str, sql: &str) -> DbxResult<()> {
        self.view_registry.create(name, sql)
    }
914
    /// Drop a view by name (Phase 5.1).
    ///
    /// Delegates to the view registry; any registry error is propagated
    /// unchanged to the caller.
    pub fn drop_view(&self, name: &str) -> DbxResult<()> {
        self.view_registry.drop(name)
    }
919}