dbx_core/engine/ddl_api.rs
1//! DDL API implementation - Schema management convenience methods
2
3use crate::engine::Database;
4use crate::error::DbxResult;
5use arrow::datatypes::{DataType, Schema};
6use std::sync::Arc;
7
8impl Database {
9 /// Create a new table with the given Arrow schema
10 ///
11 /// This is a convenience wrapper around `execute_sql("CREATE TABLE ...")`.
12 /// It automatically converts the Arrow schema to SQL DDL.
13 ///
14 /// # Example
15 ///
16 /// ```rust
17 /// use dbx_core::Database;
18 /// use arrow::datatypes::{DataType, Field, Schema};
19 ///
20 /// # fn main() -> dbx_core::DbxResult<()> {
21 /// let db = Database::open_in_memory()?;
22 ///
23 /// let schema = Schema::new(vec![
24 /// Field::new("id", DataType::Int64, false),
25 /// Field::new("name", DataType::Utf8, true),
26 /// Field::new("age", DataType::Int32, true),
27 /// ]);
28 ///
29 /// db.create_table("users", schema)?;
30 /// assert!(db.table_exists("users"));
31 /// # Ok(())
32 /// # }
33 /// ```
34 pub fn create_table(&self, name: &str, schema: Schema) -> DbxResult<()> {
35 let schema_arc = Arc::new(schema);
36
37 // Generate CREATE TABLE SQL from Arrow Schema
38 let sql = self.generate_create_table_sql(name, &schema_arc);
39
40 // Execute SQL FIRST (this will check if table exists)
41 self.execute_sql(&sql)?;
42
43 // THEN store schema (after SQL succeeds)
44 self.table_schemas
45 .write()
46 .unwrap()
47 .insert(name.to_string(), Arc::clone(&schema_arc));
48
49 // Initialize empty table data
50 self.tables
51 .write()
52 .unwrap()
53 .insert(name.to_string(), vec![]);
54
55 // Initialize row counter
56 self.row_counters
57 .insert(name.to_string(), std::sync::atomic::AtomicUsize::new(0));
58
59 Ok(())
60 }
61
62 /// Drop a table
63 ///
64 /// # Example
65 ///
66 /// ```rust
67 /// use dbx_core::Database;
68 /// use arrow::datatypes::{DataType, Field, Schema};
69 ///
70 /// # fn main() -> dbx_core::DbxResult<()> {
71 /// let db = Database::open_in_memory()?;
72 ///
73 /// let schema = Schema::new(vec![
74 /// Field::new("id", DataType::Int64, false),
75 /// ]);
76 ///
77 /// db.create_table("temp", schema)?;
78 /// db.drop_table("temp")?;
79 /// assert!(!db.table_exists("temp"));
80 /// # Ok(())
81 /// # }
82 /// ```
83 pub fn drop_table(&self, name: &str) -> DbxResult<()> {
84 self.execute_sql(&format!("DROP TABLE {}", name))?;
85 self.table_schemas.write().unwrap().remove(name);
86 Ok(())
87 }
88
89 /// Check if a table exists
90 ///
91 /// # Example
92 ///
93 /// ```rust
94 /// use dbx_core::Database;
95 /// use arrow::datatypes::{DataType, Field, Schema};
96 ///
97 /// # fn main() -> dbx_core::DbxResult<()> {
98 /// let db = Database::open_in_memory()?;
99 ///
100 /// assert!(!db.table_exists("users"));
101 ///
102 /// let schema = Schema::new(vec![
103 /// Field::new("id", DataType::Int64, false),
104 /// ]);
105 ///
106 /// db.create_table("users", schema)?;
107 /// assert!(db.table_exists("users"));
108 /// # Ok(())
109 /// # }
110 /// ```
111 pub fn table_exists(&self, name: &str) -> bool {
112 self.table_schemas.read().unwrap().contains_key(name)
113 }
114
115 /// Get the schema of a table
116 ///
117 /// # Example
118 ///
119 /// ```rust
120 /// use dbx_core::Database;
121 /// use arrow::datatypes::{DataType, Field, Schema};
122 ///
123 /// # fn main() -> dbx_core::DbxResult<()> {
124 /// let db = Database::open_in_memory()?;
125 ///
126 /// let schema = Schema::new(vec![
127 /// Field::new("id", DataType::Int64, false),
128 /// Field::new("name", DataType::Utf8, true),
129 /// ]);
130 ///
131 /// db.create_table("users", schema.clone())?;
132 /// let retrieved_schema = db.get_table_schema("users")?;
133 /// assert_eq!(retrieved_schema.fields().len(), 2);
134 /// # Ok(())
135 /// # }
136 /// ```
137 pub fn get_table_schema(&self, name: &str) -> DbxResult<Schema> {
138 self.table_schemas
139 .read()
140 .unwrap()
141 .get(name)
142 .map(|s| (**s).clone())
143 .ok_or_else(|| crate::DbxError::Schema(format!("Table '{}' not found", name)))
144 }
145
146 /// List all tables
147 pub fn list_tables(&self) -> Vec<String> {
148 self.table_schemas.read().unwrap().keys().cloned().collect()
149 }
150
151 /// Helper: Generate CREATE TABLE SQL from Arrow Schema
152 fn generate_create_table_sql(&self, name: &str, schema: &Schema) -> String {
153 let columns: Vec<String> = schema
154 .fields()
155 .iter()
156 .map(|field| {
157 let sql_type = match field.data_type() {
158 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => "INT",
159 DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
160 "INT"
161 }
162 DataType::Float32 | DataType::Float64 => "FLOAT",
163 DataType::Utf8 | DataType::LargeUtf8 => "TEXT",
164 DataType::Boolean => "BOOLEAN",
165 DataType::Binary | DataType::LargeBinary => "BLOB",
166 DataType::Date32 | DataType::Date64 => "DATE",
167 DataType::Timestamp(_, _) => "TIMESTAMP",
168 _ => "TEXT", // Default to TEXT for unsupported types
169 };
170 format!("{} {}", field.name(), sql_type)
171 })
172 .collect();
173
174 format!("CREATE TABLE {} ({})", name, columns.join(", "))
175 }
176
177 /// Create a SQL index on table columns
178 ///
179 /// This is a convenience wrapper around `execute_sql("CREATE INDEX ...")`.
180 /// For Hash Index (O(1) lookup), use `create_index(table, column)` instead.
181 ///
182 /// # Example
183 ///
184 /// ```rust
185 /// use dbx_core::Database;
186 /// use arrow::datatypes::{DataType, Field, Schema};
187 ///
188 /// # fn main() -> dbx_core::DbxResult<()> {
189 /// let db = Database::open_in_memory()?;
190 ///
191 /// let schema = Schema::new(vec![
192 /// Field::new("id", DataType::Int64, false),
193 /// Field::new("email", DataType::Utf8, true),
194 /// ]);
195 ///
196 /// db.create_table("users", schema)?;
197 /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
198 /// assert!(db.sql_index_exists("idx_email"));
199 /// # Ok(())
200 /// # }
201 /// ```
202 pub fn create_sql_index(
203 &self,
204 table: &str,
205 index_name: &str,
206 columns: Vec<String>,
207 ) -> DbxResult<()> {
208 // Generate CREATE INDEX SQL
209 let columns_str = columns.join(", ");
210 let sql = format!("CREATE INDEX {} ON {} ({})", index_name, table, columns_str);
211
212 // Execute SQL
213 self.execute_sql(&sql)?;
214 Ok(())
215 }
216
217 /// Drop a SQL index
218 ///
219 /// This is a convenience wrapper around `execute_sql("DROP INDEX ...")`.
220 /// For Hash Index, use `drop_index(table, column)` instead.
221 ///
222 /// Note: The index must have been created with `create_sql_index` to be tracked properly.
223 ///
224 /// # Example
225 ///
226 /// ```rust
227 /// use dbx_core::Database;
228 /// use arrow::datatypes::{DataType, Field, Schema};
229 ///
230 /// # fn main() -> dbx_core::DbxResult<()> {
231 /// let db = Database::open_in_memory()?;
232 ///
233 /// let schema = Schema::new(vec![
234 /// Field::new("id", DataType::Int64, false),
235 /// Field::new("email", DataType::Utf8, true),
236 /// ]);
237 ///
238 /// db.create_table("users", schema)?;
239 /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
240 /// db.drop_sql_index("users", "idx_email")?;
241 /// assert!(!db.sql_index_exists("idx_email"));
242 /// # Ok(())
243 /// # }
244 /// ```
245 pub fn drop_sql_index(&self, table: &str, index_name: &str) -> DbxResult<()> {
246 // Use table.index_name format for DROP INDEX
247 let sql = format!("DROP INDEX {}.{}", table, index_name);
248 self.execute_sql(&sql)?;
249 Ok(())
250 }
251
252 /// Check if a SQL index exists
253 ///
254 /// For Hash Index, use `has_index(table, column)` instead.
255 ///
256 /// # Example
257 ///
258 /// ```rust
259 /// use dbx_core::Database;
260 /// use arrow::datatypes::{DataType, Field, Schema};
261 ///
262 /// # fn main() -> dbx_core::DbxResult<()> {
263 /// let db = Database::open_in_memory()?;
264 ///
265 /// let schema = Schema::new(vec![
266 /// Field::new("id", DataType::Int64, false),
267 /// Field::new("email", DataType::Utf8, true),
268 /// ]);
269 ///
270 /// db.create_table("users", schema)?;
271 /// assert!(!db.sql_index_exists("idx_email"));
272 ///
273 /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
274 /// assert!(db.sql_index_exists("idx_email"));
275 /// # Ok(())
276 /// # }
277 /// ```
278 pub fn sql_index_exists(&self, index_name: &str) -> bool {
279 self.index_registry.read().unwrap().contains_key(index_name)
280 }
281
282 /// List all SQL indexes for a table
283 ///
284 /// # Example
285 ///
286 /// ```rust
287 /// use dbx_core::Database;
288 /// use arrow::datatypes::{DataType, Field, Schema};
289 ///
290 /// # fn main() -> dbx_core::DbxResult<()> {
291 /// let db = Database::open_in_memory()?;
292 ///
293 /// let schema = Schema::new(vec![
294 /// Field::new("id", DataType::Int64, false),
295 /// Field::new("email", DataType::Utf8, true),
296 /// Field::new("name", DataType::Utf8, true),
297 /// ]);
298 ///
299 /// db.create_table("users", schema)?;
300 /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
301 /// db.create_sql_index("users", "idx_name", vec!["name".to_string()])?;
302 ///
303 /// let indexes = db.list_sql_indexes("users");
304 /// assert!(indexes.contains(&"idx_email".to_string()));
305 /// assert!(indexes.contains(&"idx_name".to_string()));
306 /// # Ok(())
307 /// # }
308 /// ```
309 pub fn list_sql_indexes(&self, table: &str) -> Vec<String> {
310 self.index_registry
311 .read()
312 .unwrap()
313 .iter()
314 .filter_map(|(index_name, (tbl, _col))| {
315 if tbl == table {
316 Some(index_name.clone())
317 } else {
318 None
319 }
320 })
321 .collect()
322 }
323
324 /// Add a column to an existing table
325 ///
326 /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... ADD COLUMN ...")`.
327 ///
328 /// # Example
329 ///
330 /// ```rust
331 /// use dbx_core::Database;
332 /// use arrow::datatypes::{DataType, Field, Schema};
333 ///
334 /// # fn main() -> dbx_core::DbxResult<()> {
335 /// let db = Database::open_in_memory()?;
336 ///
337 /// let schema = Schema::new(vec![
338 /// Field::new("id", DataType::Int64, false),
339 /// Field::new("name", DataType::Utf8, true),
340 /// ]);
341 ///
342 /// db.create_table("users", schema)?;
343 /// db.add_column("users", "email", "TEXT")?;
344 ///
345 /// let updated_schema = db.get_table_schema("users")?;
346 /// assert_eq!(updated_schema.fields().len(), 3);
347 /// # Ok(())
348 /// # }
349 /// ```
350 pub fn add_column(&self, table: &str, column_name: &str, data_type: &str) -> DbxResult<()> {
351 let sql = format!(
352 "ALTER TABLE {} ADD COLUMN {} {}",
353 table, column_name, data_type
354 );
355 self.execute_sql(&sql)?;
356 Ok(())
357 }
358
359 /// Drop a column from an existing table
360 ///
361 /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... DROP COLUMN ...")`.
362 ///
363 /// # Example
364 ///
365 /// ```rust
366 /// use dbx_core::Database;
367 /// use arrow::datatypes::{DataType, Field, Schema};
368 ///
369 /// # fn main() -> dbx_core::DbxResult<()> {
370 /// let db = Database::open_in_memory()?;
371 ///
372 /// let schema = Schema::new(vec![
373 /// Field::new("id", DataType::Int64, false),
374 /// Field::new("name", DataType::Utf8, true),
375 /// Field::new("email", DataType::Utf8, true),
376 /// ]);
377 ///
378 /// db.create_table("users", schema)?;
379 /// db.drop_column("users", "email")?;
380 ///
381 /// let updated_schema = db.get_table_schema("users")?;
382 /// assert_eq!(updated_schema.fields().len(), 2);
383 /// # Ok(())
384 /// # }
385 /// ```
386 pub fn drop_column(&self, table: &str, column_name: &str) -> DbxResult<()> {
387 let sql = format!("ALTER TABLE {} DROP COLUMN {}", table, column_name);
388 self.execute_sql(&sql)?;
389 Ok(())
390 }
391
392 /// Rename a column in an existing table
393 ///
394 /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... RENAME COLUMN ...")`.
395 ///
396 /// # Example
397 ///
398 /// ```rust
399 /// use dbx_core::Database;
400 /// use arrow::datatypes::{DataType, Field, Schema};
401 ///
402 /// # fn main() -> dbx_core::DbxResult<()> {
403 /// let db = Database::open_in_memory()?;
404 ///
405 /// let schema = Schema::new(vec![
406 /// Field::new("id", DataType::Int64, false),
407 /// Field::new("user_name", DataType::Utf8, true),
408 /// ]);
409 ///
410 /// db.create_table("users", schema)?;
411 /// db.rename_column("users", "user_name", "name")?;
412 ///
413 /// let updated_schema = db.get_table_schema("users")?;
414 /// assert_eq!(updated_schema.field(1).name(), "name");
415 /// # Ok(())
416 /// # }
417 /// ```
418 pub fn rename_column(&self, table: &str, old_name: &str, new_name: &str) -> DbxResult<()> {
419 let sql = format!(
420 "ALTER TABLE {} RENAME COLUMN {} TO {}",
421 table, old_name, new_name
422 );
423 self.execute_sql(&sql)?;
424 Ok(())
425 }
426
427 // ════════════════════════════════════════════
428 // Phase 3: 파티셔닝 API (Partitioning API)
429 // ════════════════════════════════════════════
430
431 /// 파티셔닝 규칙을 생성합니다.
432 ///
433 /// 이후 해당 `table`로 들어오는 INSERT는 키 값에 따라
434 /// `route_key()`가 반환하는 내부 sub-table로 라우팅됩니다.
435 pub fn create_partition(&self, map: crate::storage::partition::PartitionMap) -> DbxResult<()> {
436 let table_name = map.table.clone();
437 self.partition_maps.write().unwrap().insert(table_name, map);
438 Ok(())
439 }
440
441 /// 자동 확장(Auto-Expand)을 지원하는 범위 파티션을 생성합니다 (Phase 3.4).
442 ///
443 /// 설정된 범위를 초과하는 키 값이 인입되면 `interval` 크기만큼
444 /// 새로운 파티션 구획을 자동 생성하며 지속적으로 확장됩니다.
445 pub fn create_auto_range_partition(
446 &self,
447 table: &str,
448 column: &str,
449 initial_low: i64,
450 interval: i64,
451 max_partitions: usize,
452 ) -> DbxResult<()> {
453 use crate::storage::partition::{PartitionMap, PartitionType};
454 let map = PartitionMap {
455 table: table.to_string(),
456 partition_type: PartitionType::Range {
457 column: column.to_string(),
458 bounds: vec![(initial_low, initial_low + interval)],
459 auto_expand_interval: Some((interval, max_partitions)),
460 },
461 num_partitions: 1, // 초기 파티션 크기
462 };
463
464 self.partition_maps
465 .write()
466 .unwrap()
467 .insert(table.to_string(), map);
468 Ok(())
469 }
470
471 /// 파티셔닝 규칙을 제거합니다.
472 pub fn drop_partition(&self, table: &str) -> DbxResult<()> {
473 self.partition_maps.write().unwrap().remove(table);
474 Ok(())
475 }
476
477 // ════════════════════════════════════════════
478 // Phase 3 Synergy: PartitionStats API
479 // ════════════════════════════════════════════
480
481 /// 파티션 통계를 갱신합니다.
482 ///
483 /// # Example
484 /// ```rust
485 /// use dbx_core::Database;
486 /// use dbx_core::storage::partition::PartitionStats;
487 ///
488 /// # fn main() -> dbx_core::DbxResult<()> {
489 /// let db = Database::open_in_memory()?;
490 /// db.update_partition_stats("orders", "orders__p_part_0", PartitionStats {
491 /// row_count: 1000, min_value: 0, max_value: 999,
492 /// null_count: 0, distinct_count: 1000,
493 /// })?;
494 /// # Ok(())
495 /// # }
496 /// ```
497 pub fn update_partition_stats(
498 &self,
499 table: &str,
500 partition_name: &str,
501 stats: crate::storage::partition::PartitionStats,
502 ) -> DbxResult<()> {
503 let key = format!("{}__{}", table, partition_name);
504 self.partition_stats.insert(key, stats);
505 Ok(())
506 }
507
508 /// 특정 파티션의 통계를 조회합니다.
509 pub fn get_partition_stats(
510 &self,
511 table: &str,
512 partition_name: &str,
513 ) -> DbxResult<crate::storage::partition::PartitionStats> {
514 let key = format!("{}__{}", table, partition_name);
515 self.partition_stats
516 .get(&key)
517 .map(|r| r.clone())
518 .ok_or_else(|| crate::error::DbxError::InvalidOperation {
519 message: format!("No stats for partition '{}'", partition_name),
520 context: format!("Call update_partition_stats first for table '{}'", table),
521 })
522 }
523
524 /// 테이블의 모든 파티션 통계를 반환합니다.
525 pub fn all_partition_stats(
526 &self,
527 table: &str,
528 ) -> DbxResult<std::collections::HashMap<String, crate::storage::partition::PartitionStats>>
529 {
530 let prefix = format!("{}__", table);
531 let result = self
532 .partition_stats
533 .iter()
534 .filter(|r| r.key().starts_with(&prefix))
535 .map(|r| (r.key().clone(), r.value().clone()))
536 .collect();
537 Ok(result)
538 }
539
540 // ════════════════════════════════════════════
541 // Phase 3 Synergy: Per-Partition Compression API
542 // ════════════════════════════════════════════
543
544 /// 파티션별 압축 설정을 지정합니다.
545 ///
546 /// # Example
547 /// ```rust
548 /// use dbx_core::Database;
549 /// use dbx_core::storage::compression::CompressionConfig;
550 ///
551 /// # fn main() -> dbx_core::DbxResult<()> {
552 /// let db = Database::open_in_memory()?;
553 /// // 오래된 파티션 → 고압축
554 /// db.set_partition_compression("orders", "orders__p_part_0", CompressionConfig::zstd_level(9))?;
555 /// # Ok(())
556 /// # }
557 /// ```
558 pub fn set_partition_compression(
559 &self,
560 table: &str,
561 partition_name: &str,
562 config: crate::storage::compression::CompressionConfig,
563 ) -> DbxResult<()> {
564 let key = format!("{}__{}", table, partition_name);
565 self.partition_compression.insert(key, config);
566 Ok(())
567 }
568
569 /// 파티션 압축 설정 조회 (미설정 시 기본값 Snappy 반환).
570 pub fn get_partition_compression(
571 &self,
572 table: &str,
573 partition_name: &str,
574 ) -> DbxResult<crate::storage::compression::CompressionConfig> {
575 let key = format!("{}__{}", table, partition_name);
576 Ok(self
577 .partition_compression
578 .get(&key)
579 .map(|r| *r)
580 .unwrap_or_default())
581 }
582
583 // ════════════════════════════════════════════
584 // Phase 3 Synergy: PartitionLifecycle API
585 // ════════════════════════════════════════════
586
587 /// 테이블의 파티션 수명 주기 정책을 설정합니다.
588 ///
589 /// # Example
590 /// ```rust
591 /// use dbx_core::Database;
592 /// use dbx_core::storage::partition::PartitionLifecycle;
593 ///
594 /// # fn main() -> dbx_core::DbxResult<()> {
595 /// let db = Database::open_in_memory()?;
596 /// db.enable_auto_archive("logs", PartitionLifecycle {
597 /// archive_after_days: 90,
598 /// delete_after_days: 365,
599 /// })?;
600 /// # Ok(())
601 /// # }
602 /// ```
603 pub fn enable_auto_archive(
604 &self,
605 table: &str,
606 lifecycle: crate::storage::partition::PartitionLifecycle,
607 ) -> DbxResult<()> {
608 self.partition_lifecycle
609 .insert(table.to_string(), lifecycle);
610
611 // ── 완전 자동화: 백그라운드 Lifecycle 스케줄러 구동 ──────────────
612 // compare_exchange: false→true 성공 시만 스레드 기동 (중복 방지)
613 use std::sync::atomic::Ordering;
614 if self.lifecycle_stop_flag.load(Ordering::Relaxed) {
615 // 이전에 stop된 경우 플래그 리셋
616 self.lifecycle_stop_flag.store(false, Ordering::SeqCst);
617 }
618
619 if self
620 .lifecycle_running
621 .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
622 .is_ok()
623 {
624 let stop = Arc::clone(&self.lifecycle_stop_flag);
625 let running = Arc::clone(&self.lifecycle_running);
626 let lifecycle_map = Arc::clone(&self.partition_lifecycle);
627 let creation_times = Arc::clone(&self.partition_creation_times);
628 let compression_map = Arc::clone(&self.partition_compression);
629 let tier_map = Arc::clone(&self.partition_tier_hints);
630 let stats_map = Arc::clone(&self.partition_stats);
631
632 std::thread::Builder::new()
633 .name("dbx-lifecycle-scheduler".into())
634 .spawn(move || {
635 use std::sync::atomic::Ordering;
636 let interval = std::time::Duration::from_secs(3600); // 1시간마다
637 loop {
638 std::thread::sleep(interval);
639 if stop.load(Ordering::Relaxed) {
640 running.store(false, Ordering::SeqCst);
641 break;
642 }
643 let now = std::time::SystemTime::now()
644 .duration_since(std::time::UNIX_EPOCH)
645 .unwrap()
646 .as_secs();
647
648 let tables: Vec<String> =
649 lifecycle_map.iter().map(|r| r.key().clone()).collect();
650
651 for table in tables {
652 if stop.load(Ordering::Relaxed) {
653 break;
654 }
655 let lc = match lifecycle_map.get(&table) {
656 Some(l) => l.clone(),
657 None => continue,
658 };
659 let prefix = format!("{}__", table);
660 let candidates: Vec<(String, u64)> = creation_times
661 .iter()
662 .filter(|r| r.key().starts_with(&prefix))
663 .map(|r| (r.key().clone(), *r.value()))
664 .collect();
665
666 for (sub_table, created_at) in candidates {
667 let age_secs = now.saturating_sub(created_at);
668 let delete_threshold = lc.delete_after_days as u64 * 86400;
669 let archive_threshold = lc.archive_after_days as u64 * 86400;
670
671 if age_secs >= delete_threshold {
672 stats_map.remove(&sub_table);
673 compression_map.remove(&sub_table);
674 tier_map.remove(&sub_table);
675 creation_times.remove(&sub_table);
676 } else if age_secs >= archive_threshold {
677 compression_map.insert(
678 sub_table.clone(),
679 crate::storage::compression::CompressionConfig::zstd_level(
680 9,
681 ),
682 );
683 tier_map.insert(
684 sub_table.clone(),
685 crate::storage::partition::PartitionTierHint::Cold,
686 );
687 }
688 }
689 }
690 }
691 })
692 .map_err(|e| crate::error::DbxError::InvalidOperation {
693 message: format!("Failed to spawn lifecycle scheduler: {}", e),
694 context: "enable_auto_archive".to_string(),
695 })?;
696 } // end if CAS
697
698 Ok(())
699 }
700
701 /// 테이블의 파티션 수명 주기 정책을 조회합니다.
702 pub fn get_partition_lifecycle(
703 &self,
704 table: &str,
705 ) -> DbxResult<crate::storage::partition::PartitionLifecycle> {
706 self.partition_lifecycle
707 .get(table)
708 .map(|r| r.clone())
709 .ok_or_else(|| crate::error::DbxError::InvalidOperation {
710 message: format!("No lifecycle policy for table '{}'", table),
711 context: "Call enable_auto_archive first".to_string(),
712 })
713 }
714
715 /// 파티션이 아카이브 시점이 되었는지 확인합니다.
716 ///
717 /// `partition_created_at`: UNIX timestamp (초 단위)
718 pub fn partition_needs_archive(
719 &self,
720 table: &str,
721 partition_created_at: u64,
722 ) -> DbxResult<bool> {
723 let lc = self.get_partition_lifecycle(table)?;
724 let now = std::time::SystemTime::now()
725 .duration_since(std::time::UNIX_EPOCH)
726 .unwrap()
727 .as_secs();
728 let threshold_secs = lc.archive_after_days as u64 * 24 * 3600;
729 Ok(now.saturating_sub(partition_created_at) >= threshold_secs)
730 }
731
732 /// 파티션이 삭제 시점이 되었는지 확인합니다.
733 ///
734 /// `partition_created_at`: UNIX timestamp (초 단위)
735 pub fn partition_needs_delete(
736 &self,
737 table: &str,
738 partition_created_at: u64,
739 ) -> DbxResult<bool> {
740 let lc = self.get_partition_lifecycle(table)?;
741 let now = std::time::SystemTime::now()
742 .duration_since(std::time::UNIX_EPOCH)
743 .unwrap()
744 .as_secs();
745 let threshold_secs = lc.delete_after_days as u64 * 24 * 3600;
746 Ok(now.saturating_sub(partition_created_at) >= threshold_secs)
747 }
748
749 /// 테이블의 파티션 수명 주기 정책을 실제로 실행합니다.
750 ///
751 /// `partition_creation_times`에 기록된 타임스탬프를 기반으로:
752 /// - 아카이브 시점이 된 파티션 → ZSTD 레벨 9 압축 자동 적용
753 /// - 삭제 시점이 된 파티션 → sub-table 드롭
754 ///
755 /// # Returns
756 /// `(archived_count, deleted_count)` — 처리된 파티션 수
757 ///
758 /// # Example
759 /// ```rust
760 /// use dbx_core::Database;
761 /// use dbx_core::storage::partition::PartitionLifecycle;
762 ///
763 /// # fn main() -> dbx_core::DbxResult<()> {
764 /// let db = Database::open_in_memory()?;
765 /// db.enable_auto_archive("logs", PartitionLifecycle {
766 /// archive_after_days: 90,
767 /// delete_after_days: 365,
768 /// })?;
769 /// let (archived, deleted) = db.run_partition_lifecycle("logs")?;
770 /// # Ok(())
771 /// # }
772 /// ```
773 pub fn run_partition_lifecycle(&self, table: &str) -> DbxResult<(usize, usize)> {
774 let _lc = self.get_partition_lifecycle(table)?;
775 let prefix = format!("{}__", table);
776
777 let mut candidates: Vec<(String, u64)> = self
778 .partition_creation_times
779 .iter()
780 .filter(|r| r.key().starts_with(&prefix))
781 .map(|r| (r.key().clone(), *r.value()))
782 .collect();
783
784 // delete 우선: 삭제 대상은 archive도 하지 않음
785 candidates.sort_by_key(|(_, ts)| *ts); // 오래된 것 먼저
786
787 let mut archived = 0usize;
788 let mut deleted = 0usize;
789
790 for (sub_table, created_at) in candidates {
791 if self.partition_needs_delete(table, created_at)? {
792 // 실제 sub-table 드롭
793 let drop_sql = format!("DROP TABLE IF EXISTS \"{}\"", sub_table);
794 let _ = self.execute_sql(&drop_sql);
795 // 관련 메타데이터 정리
796 self.partition_stats.remove(&sub_table);
797 self.partition_compression.remove(&sub_table);
798 self.partition_tier_hints.remove(&sub_table);
799 self.partition_creation_times.remove(&sub_table);
800 deleted += 1;
801 } else if self.partition_needs_archive(table, created_at)? {
802 // ZSTD 레벨 9 고압축 자동 적용
803 // ⚠️ sub_table은 이미 "table__p_part_N" 형식이므로
804 // set_partition_compression(table, sub_table) 사용 시 이중 prefix 발생.
805 // DashMap에 직접 삽입하여 올바른 키("table__p_part_N")를 사용.
806 self.partition_compression.insert(
807 sub_table.clone(),
808 crate::storage::compression::CompressionConfig::zstd_level(9),
809 );
810 self.partition_tier_hints.insert(
811 sub_table.clone(),
812 crate::storage::partition::PartitionTierHint::Cold,
813 );
814 archived += 1;
815 }
816 }
817
818 Ok((archived, deleted))
819 }
820
821 /// 라이프사이클 정책이 있는 모든 테이블에 대해 일괄 실행합니다.
822 ///
823 /// # Returns
824 /// 전체 `(archived_count, deleted_count)`
825 pub fn run_all_partition_lifecycles(&self) -> DbxResult<(usize, usize)> {
826 let tables: Vec<String> = self
827 .partition_lifecycle
828 .iter()
829 .map(|r| r.key().clone())
830 .collect();
831
832 let mut total_archived = 0usize;
833 let mut total_deleted = 0usize;
834
835 for table in tables {
836 let (a, d) = self.run_partition_lifecycle(&table)?;
837 total_archived += a;
838 total_deleted += d;
839 }
840
841 Ok((total_archived, total_deleted))
842 }
843
844 /// 파티션의 생성 시각을 조회합니다 (INSERT 시 자동 기록).
845 pub fn get_partition_creation_time(&self, partition_name: &str) -> Option<u64> {
846 self.partition_creation_times
847 .get(partition_name)
848 .map(|r| *r)
849 }
850
851 // ════════════════════════════════════════════
852 // Phase 3 Synergy: PartitionTierHint API
853 // ════════════════════════════════════════════
854
855 /// 파티션의 스토리지 티어 힌트를 설정합니다.
856 ///
857 /// # Example
858 /// ```rust
859 /// use dbx_core::Database;
860 /// use dbx_core::storage::partition::PartitionTierHint;
861 ///
862 /// # fn main() -> dbx_core::DbxResult<()> {
863 /// let db = Database::open_in_memory()?;
864 /// db.set_partition_tier("orders", "orders__p_part_0", PartitionTierHint::Hot)?;
865 /// db.set_partition_tier("orders", "orders__p_part_1", PartitionTierHint::Cold)?;
866 /// # Ok(())
867 /// # }
868 /// ```
869 pub fn set_partition_tier(
870 &self,
871 table: &str,
872 partition_name: &str,
873 hint: crate::storage::partition::PartitionTierHint,
874 ) -> DbxResult<()> {
875 let key = format!("{}__{}", table, partition_name);
876 self.partition_tier_hints.insert(key, hint);
877 Ok(())
878 }
879
880 /// 파티션 티어 힌트를 조회합니다 (미설정 시 Hot 반환).
881 pub fn get_partition_tier(
882 &self,
883 table: &str,
884 partition_name: &str,
885 ) -> DbxResult<crate::storage::partition::PartitionTierHint> {
886 let key = format!("{}__{}", table, partition_name);
887 Ok(self
888 .partition_tier_hints
889 .get(&key)
890 .map(|r| *r)
891 .unwrap_or_default())
892 }
893
894 /// 특정 티어에 속하는 파티션 목록을 반환합니다.
895 pub fn list_partitions_by_tier(
896 &self,
897 table: &str,
898 hint: crate::storage::partition::PartitionTierHint,
899 ) -> DbxResult<Vec<String>> {
900 let prefix = format!("{}__", table);
901 let result = self
902 .partition_tier_hints
903 .iter()
904 .filter(|r| r.key().starts_with(&prefix) && *r.value() == hint)
905 .map(|r| r.key().clone())
906 .collect();
907 Ok(result)
908 }
909
910 /// 뷰를 생성합니다 (Phase 5.1).
911 pub fn create_view(&self, name: &str, sql: &str) -> DbxResult<()> {
912 self.view_registry.create(name, sql)
913 }
914
915 /// 뷰를 삭제합니다 (Phase 5.1).
916 pub fn drop_view(&self, name: &str) -> DbxResult<()> {
917 self.view_registry.drop(name)
918 }
919}