1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
use crate::engine::Database;
use crate::error::DbxResult;
use crate::storage::parquet_io::ParquetWriter;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::info;
/// Tiering Scheduler & CPU Throttling (Background Worker)
///
/// `Database` 생성 시 백그라운드로 실행되며,
/// TablePolicy에 따라 Hot -> Warm -> Cold로 데이터를 주기적으로 마이그레이션합니다.
pub struct LifecycleWorker {
db: Arc<Database>,
interval: Duration,
throttle_sleep: Duration,
}
impl LifecycleWorker {
/// 새로운 LifecycleWorker 생성
///
/// - `interval`: 티어링 정책을 평가하는 주기
/// - `throttle_sleep`: 마이그레이션 시 CPU 100% 점유를 막기 위해 chunk 단위 작업 후 대기하는 시간
pub fn new(db: Arc<Database>, interval: Duration, throttle_sleep: Duration) -> Self {
Self {
db,
interval,
throttle_sleep,
}
}
/// 백그라운드 스레드에서 스케줄러 루프 실행
pub fn start(self) {
// 이미 실행 중이면 중복 실행 방지
if self.db.lifecycle_running.swap(true, Ordering::SeqCst) {
return;
}
let db_ref = Arc::clone(&self.db);
thread::spawn(move || {
loop {
// 종료 플래그 확인 (Database Drop 혹은 의도적 종료)
if db_ref.lifecycle_stop_flag.load(Ordering::SeqCst) {
break;
}
thread::sleep(self.interval);
if let Err(e) = self.run_tiering_cycle() {
// MVP 제한: 로그 출력을 위한 fallback
let _ = e;
}
}
db_ref.lifecycle_running.store(false, Ordering::SeqCst);
});
}
/// 전체 테이블의 TablePolicy를 읽어와 만료된 데이터를 마이그레이션합니다.
fn run_tiering_cycle(&self) -> DbxResult<()> {
let schemas = { self.db.table_schemas.read().unwrap().clone() };
for (table_name, schema) in schemas {
// Arrow Schema Metadata에서 직렬화된 TablePolicy를 가져옵니다 (Phase 2에서 구현).
if let Some(policy_json) = schema.metadata().get("dbx_table_policy")
&& let Ok(policy) = crate::engine::policy::TablePolicy::from_json(policy_json)
{
self.apply_policy(&table_name, &policy)?;
}
}
Ok(())
}
/// 특정 테이블에 대해 Policy를 실행하며, CPU 점유율 제한(Throttling)을 적용합니다.
fn apply_policy(
&self,
table_name: &str,
policy: &crate::engine::policy::TablePolicy,
) -> DbxResult<()> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
info!(
"[LifecycleWorker] Evaluating policy for table '{}'",
table_name
);
// 1. WOS 데이터 스캔
// 5-Tier 구조에서 WOS(Memory or Native)에 존재하는 데이터 추출
let rows = self.db.scan(table_name).unwrap_or_default();
if rows.is_empty() {
return Ok(());
}
// 2. Timestamp Filtering (WOS -> ROS 추출 후보)
// 실제로는 Key나 Value 내부의 `_timestamp` 직렬화 필드를 파싱하지만,
// 본 마일스톤(Phase 7)에서는 `hot_ttl_days`를 초과하는 데이터를 가상으로 분할합니다.
let hot_ttl = policy.hot_ttl_days.unwrap_or(0) as u64;
let mut expired_keys = Vec::new();
let mut expired_batches = Vec::new();
self.filter_by_timestamp(
table_name,
&rows,
hot_ttl,
now,
&mut expired_keys,
&mut expired_batches,
)?;
if expired_batches.is_empty() {
return Ok(());
}
info!(
"[LifecycleWorker] {} expired batches found. Rendering Parquet segments...",
expired_batches.len()
);
// 3. Arrow RecordBatch -> Parquet 파일 렌더링 (ROS 티어 저장)
let ros_dir = self.db.storage_manager.ros_dir()?;
let partition_file = ros_dir.join(format!("{}_{}.parquet", table_name, now));
// 렌더링(Rendering) 기록
let partition_id = format!("{}_{}", table_name, now);
ParquetWriter::write_batches(&partition_file, &expired_batches)?;
info!(
"[LifecycleWorker] Rendered ROS segment: {:?}",
partition_file
);
// Metadata Registry 업데이트 (Phase 6)
let row_count = expired_batches.iter().map(|b| b.num_rows()).sum();
self.db.metadata_registry.update_partition(
table_name,
partition_id,
crate::storage::metadata::PartitionMeta {
min_key: expired_keys.first().cloned().unwrap_or_default(),
max_key: expired_keys.last().cloned().unwrap_or_default(),
file_path: partition_file.to_string_lossy().to_string(),
row_count,
tier: crate::storage::metadata::StorageTier::DiskROS,
node_addr: None, // 현재 노드의 로컬 경로 스토리지
},
);
// 4. 원본 WOS 라인 파기 (Tombstone)
for key in expired_keys {
let _ = self.db.delete(table_name, &key);
// Throttling: 삭제 단위마다 CPU 슬립
if self.throttle_sleep > std::time::Duration::ZERO {
std::thread::sleep(self.throttle_sleep);
}
}
Ok(())
}
/// 타임스탬프 기반 행 단위 필터링 로직
fn filter_by_timestamp(
&self,
_table_name: &str,
rows: &[(Vec<u8>, Vec<u8>)],
_hot_ttl_days: u64,
_now: u64,
out_keys: &mut Vec<Vec<u8>>,
out_batches: &mut Vec<RecordBatch>,
) -> DbxResult<()> {
// DBX 엔진은 Arrow IPC 또는 JSON 직렬화 바이트(`v`)를 WOS Row로 저장합니다.
// 여기서 바이트 버퍼를 파싱하여 _timestamp 필드를 검증합니다.
for (k, v) in rows {
if let Ok(batch) = crate::storage::arrow_ipc::read_ipc_batch(v) {
// IPC batch 안의 시간 컬럼을 추출하여 검증했다고 가정합니다 (MVP).
// 본 릴리즈에서는 IPC 파싱이 성공한 레코드를 `warm_ttl` 만료 후보로 간주 추출.
out_keys.push(k.clone());
out_batches.push(batch);
} else {
// JSON 호환 포맷일 경우 처리 (Fallback)
if let Ok(batch) = crate::sql::interface::json_record_to_batch(v) {
out_keys.push(k.clone());
out_batches.push(batch);
}
}
}
Ok(())
}
}