Skip to main content

dbx_core/storage/
metadata.rs

1use arrow::datatypes::Schema;
2use dashmap::DashMap;
3use std::sync::Arc;
4
5#[derive(Debug, Clone, PartialEq, Eq, Default)]
6pub enum StorageTier {
7    #[default]
8    MemoryWOS,
9    DiskROS,
10    ColdEC,
11}
12
13#[derive(Debug, Clone)]
14pub struct PartitionMeta {
15    pub min_key: Vec<u8>,
16    pub max_key: Vec<u8>,
17    pub file_path: String,
18    pub row_count: usize,
19    pub tier: StorageTier,
20    /// 해당 파티션을 물리적으로 보유한 클러스터 워커 주소 (e.g. "192.168.0.2:15690")
21    pub node_addr: Option<String>,
22}
23
24#[derive(Debug, Clone)]
25pub struct TableMeta {
26    pub schema: Arc<Schema>,
27    pub partitions: Arc<DashMap<String, PartitionMeta>>,
28}
29
30impl TableMeta {
31    pub fn new(schema: Arc<Schema>) -> Self {
32        Self {
33            schema,
34            partitions: Arc::new(DashMap::new()),
35        }
36    }
37}
38
39/// 글로벌 파티션 및 스토리지 계층 상태 캐시 시스템.
40/// LifecycleWorker 가 동기화하며 Optimizer와 분산 스케줄러가 구독합니다.
41#[derive(Clone)]
42pub struct MetadataRegistry {
43    pub tables: Arc<DashMap<String, Arc<TableMeta>>>,
44}
45
46impl Default for MetadataRegistry {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl MetadataRegistry {
53    pub fn new() -> Self {
54        Self {
55            tables: Arc::new(DashMap::new()),
56        }
57    }
58
59    pub fn get_table(&self, table_name: &str) -> Option<Arc<TableMeta>> {
60        self.tables.get(table_name).map(|r| Arc::clone(r.value()))
61    }
62
63    pub fn register_table(&self, table_name: String, schema: Arc<Schema>) {
64        self.tables
65            .insert(table_name, Arc::new(TableMeta::new(schema)));
66    }
67
68    /// 특정 파티션의 위치와 계층 상태를 업데이트합니다. LifecycleWorker 컴팩션 이후 호출됩니다.
69    pub fn update_partition(&self, table_name: &str, partition_id: String, meta: PartitionMeta) {
70        if let Some(table) = self.get_table(table_name) {
71            table.partitions.insert(partition_id, meta);
72        }
73    }
74
75    /// 해당 파티션을 스캔 범위에서 완전 제거합니다. (삭제나 Time Travel, 만료 등으로 파기될 때)
76    pub fn drop_partition(&self, table_name: &str, partition_id: &str) {
77        if let Some(table) = self.get_table(table_name) {
78            table.partitions.remove(partition_id);
79        }
80    }
81}