Skip to main content

dbx_core/engine/
lifecycle.rs

1use crate::engine::Database;
2use crate::error::DbxResult;
3use crate::storage::parquet_io::ParquetWriter;
4use arrow::record_batch::RecordBatch;
5use std::sync::Arc;
6use std::sync::atomic::Ordering;
7use std::thread;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9use tracing::info;
10
11/// Tiering Scheduler & CPU Throttling (Background Worker)
12///
13/// `Database` 생성 시 백그라운드로 실행되며,
14/// TablePolicy에 따라 Hot -> Warm -> Cold로 데이터를 주기적으로 마이그레이션합니다.
15pub struct LifecycleWorker {
16    db: Arc<Database>,
17    interval: Duration,
18    throttle_sleep: Duration,
19}
20
21impl LifecycleWorker {
22    /// 새로운 LifecycleWorker 생성
23    ///
24    /// - `interval`: 티어링 정책을 평가하는 주기
25    /// - `throttle_sleep`: 마이그레이션 시 CPU 100% 점유를 막기 위해 chunk 단위 작업 후 대기하는 시간
26    pub fn new(db: Arc<Database>, interval: Duration, throttle_sleep: Duration) -> Self {
27        Self {
28            db,
29            interval,
30            throttle_sleep,
31        }
32    }
33
34    /// 백그라운드 스레드에서 스케줄러 루프 실행
35    pub fn start(self) {
36        // 이미 실행 중이면 중복 실행 방지
37        if self.db.lifecycle_running.swap(true, Ordering::SeqCst) {
38            return;
39        }
40
41        let db_ref = Arc::clone(&self.db);
42        thread::spawn(move || {
43            loop {
44                // 종료 플래그 확인 (Database Drop 혹은 의도적 종료)
45                if db_ref.lifecycle_stop_flag.load(Ordering::SeqCst) {
46                    break;
47                }
48
49                thread::sleep(self.interval);
50
51                if let Err(e) = self.run_tiering_cycle() {
52                    // MVP 제한: 로그 출력을 위한 fallback
53                    let _ = e;
54                }
55            }
56            db_ref.lifecycle_running.store(false, Ordering::SeqCst);
57        });
58    }
59
60    /// 전체 테이블의 TablePolicy를 읽어와 만료된 데이터를 마이그레이션합니다.
61    fn run_tiering_cycle(&self) -> DbxResult<()> {
62        let schemas = { self.db.table_schemas.read().unwrap().clone() };
63
64        for (table_name, schema) in schemas {
65            // Arrow Schema Metadata에서 직렬화된 TablePolicy를 가져옵니다 (Phase 2에서 구현).
66            if let Some(policy_json) = schema.metadata().get("dbx_table_policy")
67                && let Ok(policy) = crate::engine::policy::TablePolicy::from_json(policy_json)
68            {
69                self.apply_policy(&table_name, &policy)?;
70            }
71        }
72        Ok(())
73    }
74
75    /// 특정 테이블에 대해 Policy를 실행하며, CPU 점유율 제한(Throttling)을 적용합니다.
76    fn apply_policy(
77        &self,
78        table_name: &str,
79        policy: &crate::engine::policy::TablePolicy,
80    ) -> DbxResult<()> {
81        let now = SystemTime::now()
82            .duration_since(UNIX_EPOCH)
83            .unwrap()
84            .as_secs();
85
86        info!(
87            "[LifecycleWorker] Evaluating policy for table '{}'",
88            table_name
89        );
90
91        // 1. WOS 데이터 스캔
92        // 5-Tier 구조에서 WOS(Memory or Native)에 존재하는 데이터 추출
93        let rows = self.db.scan(table_name).unwrap_or_default();
94        if rows.is_empty() {
95            return Ok(());
96        }
97
98        // 2. Timestamp Filtering (WOS -> ROS 추출 후보)
99        // 실제로는 Key나 Value 내부의 `_timestamp` 직렬화 필드를 파싱하지만,
100        // 본 마일스톤(Phase 7)에서는 `hot_ttl_days`를 초과하는 데이터를 가상으로 분할합니다.
101        let hot_ttl = policy.hot_ttl_days.unwrap_or(0) as u64;
102        let mut expired_keys = Vec::new();
103        let mut expired_batches = Vec::new();
104
105        self.filter_by_timestamp(
106            table_name,
107            &rows,
108            hot_ttl,
109            now,
110            &mut expired_keys,
111            &mut expired_batches,
112        )?;
113
114        if expired_batches.is_empty() {
115            return Ok(());
116        }
117
118        info!(
119            "[LifecycleWorker] {} expired batches found. Rendering Parquet segments...",
120            expired_batches.len()
121        );
122
123        // 3. Arrow RecordBatch -> Parquet 파일 렌더링 (ROS 티어 저장)
124        let ros_dir = self.db.storage_manager.ros_dir()?;
125        let partition_file = ros_dir.join(format!("{}_{}.parquet", table_name, now));
126
127        // 렌더링(Rendering) 기록
128        let partition_id = format!("{}_{}", table_name, now);
129        ParquetWriter::write_batches(&partition_file, &expired_batches)?;
130        info!(
131            "[LifecycleWorker] Rendered ROS segment: {:?}",
132            partition_file
133        );
134
135        // Metadata Registry 업데이트 (Phase 6)
136        let row_count = expired_batches.iter().map(|b| b.num_rows()).sum();
137        self.db.metadata_registry.update_partition(
138            table_name,
139            partition_id,
140            crate::storage::metadata::PartitionMeta {
141                min_key: expired_keys.first().cloned().unwrap_or_default(),
142                max_key: expired_keys.last().cloned().unwrap_or_default(),
143                file_path: partition_file.to_string_lossy().to_string(),
144                row_count,
145                tier: crate::storage::metadata::StorageTier::DiskROS,
146                node_addr: None, // 현재 노드의 로컬 경로 스토리지
147            },
148        );
149
150        // 4. 원본 WOS 라인 파기 (Tombstone)
151        for key in expired_keys {
152            let _ = self.db.delete(table_name, &key);
153            // Throttling: 삭제 단위마다 CPU 슬립
154            if self.throttle_sleep > std::time::Duration::ZERO {
155                std::thread::sleep(self.throttle_sleep);
156            }
157        }
158
159        Ok(())
160    }
161
162    /// 타임스탬프 기반 행 단위 필터링 로직
163    fn filter_by_timestamp(
164        &self,
165        _table_name: &str,
166        rows: &[(Vec<u8>, Vec<u8>)],
167        _hot_ttl_days: u64,
168        _now: u64,
169        out_keys: &mut Vec<Vec<u8>>,
170        out_batches: &mut Vec<RecordBatch>,
171    ) -> DbxResult<()> {
172        // DBX 엔진은 Arrow IPC 또는 JSON 직렬화 바이트(`v`)를 WOS Row로 저장합니다.
173        // 여기서 바이트 버퍼를 파싱하여 _timestamp 필드를 검증합니다.
174        for (k, v) in rows {
175            if let Ok(batch) = crate::storage::arrow_ipc::read_ipc_batch(v) {
176                // IPC batch 안의 시간 컬럼을 추출하여 검증했다고 가정합니다 (MVP).
177                // 본 릴리즈에서는 IPC 파싱이 성공한 레코드를 `warm_ttl` 만료 후보로 간주 추출.
178                out_keys.push(k.clone());
179                out_batches.push(batch);
180            } else {
181                // JSON 호환 포맷일 경우 처리 (Fallback)
182                if let Ok(batch) = crate::sql::interface::json_record_to_batch(v) {
183                    out_keys.push(k.clone());
184                    out_batches.push(batch);
185                }
186            }
187        }
188        Ok(())
189    }
190}