dbx_core/engine/lifecycle.rs

use crate::engine::Database;
use crate::error::DbxResult;
use crate::storage::parquet_io::ParquetWriter;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::info;

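/// Background worker that periodically migrates expired hot-tier rows into
/// on-disk Parquet (ROS) segments, driven by each table's lifecycle policy.
///
/// A minimal usage sketch, assuming an existing `Arc<Database>` handle and
/// illustrative interval values:
///
/// ```ignore
/// let worker = LifecycleWorker::new(db, Duration::from_secs(60), Duration::from_millis(1));
/// worker.start();
/// ```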
pub struct LifecycleWorker {
    db: Arc<Database>,
    interval: Duration,
    throttle_sleep: Duration,
}

impl LifecycleWorker {
    pub fn new(db: Arc<Database>, interval: Duration, throttle_sleep: Duration) -> Self {
        Self {
            db,
            interval,
            throttle_sleep,
        }
    }

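    /// Spawns the background thread. The `lifecycle_running` flag guards
    /// against starting more than one worker per `Database`; the thread exits
    /// once `lifecycle_stop_flag` is set.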
    pub fn start(self) {
        if self.db.lifecycle_running.swap(true, Ordering::SeqCst) {
            return;
        }

        let db_ref = Arc::clone(&self.db);
        thread::spawn(move || {
            loop {
                if db_ref.lifecycle_stop_flag.load(Ordering::SeqCst) {
                    break;
                }

                thread::sleep(self.interval);

                // Errors are deliberately discarded so that one failed cycle
                // does not terminate the background worker.
                if let Err(e) = self.run_tiering_cycle() {
                    let _ = e;
                }
            }
            db_ref.lifecycle_running.store(false, Ordering::SeqCst);
        });
    }

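    /// Runs one tiering pass: snapshots the table schemas and applies the
    /// `dbx_table_policy` stored in each table's schema metadata, if present.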
    fn run_tiering_cycle(&self) -> DbxResult<()> {
        let schemas = { self.db.table_schemas.read().unwrap().clone() };

        for (table_name, schema) in schemas {
            if let Some(policy_json) = schema.metadata().get("dbx_table_policy")
                && let Ok(policy) = crate::engine::policy::TablePolicy::from_json(policy_json)
            {
                self.apply_policy(&table_name, &policy)?;
            }
        }
        Ok(())
    }

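    /// Scans a table's hot rows, writes the expired batches to a new Parquet
    /// segment in the ROS directory, registers the partition in the metadata
    /// registry, and finally deletes the migrated keys from the hot tier.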
    fn apply_policy(
        &self,
        table_name: &str,
        policy: &crate::engine::policy::TablePolicy,
    ) -> DbxResult<()> {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        info!(
            "[LifecycleWorker] Evaluating policy for table '{}'",
            table_name
        );

        let rows = self.db.scan(table_name).unwrap_or_default();
        if rows.is_empty() {
            return Ok(());
        }

        let hot_ttl = policy.hot_ttl_days.unwrap_or(0) as u64;
        let mut expired_keys = Vec::new();
        let mut expired_batches = Vec::new();

        self.filter_by_timestamp(
            table_name,
            &rows,
            hot_ttl,
            now,
            &mut expired_keys,
            &mut expired_batches,
        )?;

        if expired_batches.is_empty() {
            return Ok(());
        }

        info!(
            "[LifecycleWorker] {} expired batches found. Rendering Parquet segments...",
            expired_batches.len()
        );

        // Write the expired batches as a new Parquet segment in the ROS directory.
        let ros_dir = self.db.storage_manager.ros_dir()?;
        let partition_file = ros_dir.join(format!("{}_{}.parquet", table_name, now));

        let partition_id = format!("{}_{}", table_name, now);
        ParquetWriter::write_batches(&partition_file, &expired_batches)?;
        info!(
            "[LifecycleWorker] Rendered ROS segment: {:?}",
            partition_file
        );

        // Register the new segment with the metadata registry so reads can find it.
        let row_count = expired_batches.iter().map(|b| b.num_rows()).sum();
        self.db.metadata_registry.update_partition(
            table_name,
            partition_id,
            crate::storage::metadata::PartitionMeta {
                min_key: expired_keys.first().cloned().unwrap_or_default(),
                max_key: expired_keys.last().cloned().unwrap_or_default(),
                file_path: partition_file.to_string_lossy().to_string(),
                row_count,
                tier: crate::storage::metadata::StorageTier::DiskROS,
                node_addr: None,
            },
        );

        // Remove the migrated rows from the hot tier, optionally throttling
        // deletes to limit write pressure.
        for key in expired_keys {
            let _ = self.db.delete(table_name, &key);
            if self.throttle_sleep > Duration::ZERO {
                thread::sleep(self.throttle_sleep);
            }
        }

        Ok(())
    }

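    /// Decodes each row value into a `RecordBatch` (Arrow IPC first, JSON
    /// fallback) and collects the keys and batches to migrate. The TTL and
    /// timestamp parameters are currently unused, so every decodable row is
    /// treated as expired.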
    fn filter_by_timestamp(
        &self,
        _table_name: &str,
        rows: &[(Vec<u8>, Vec<u8>)],
        _hot_ttl_days: u64,
        _now: u64,
        out_keys: &mut Vec<Vec<u8>>,
        out_batches: &mut Vec<RecordBatch>,
    ) -> DbxResult<()> {
        for (k, v) in rows {
            // Try Arrow IPC first, then fall back to a JSON-encoded record.
            if let Ok(batch) = crate::storage::arrow_ipc::read_ipc_batch(v) {
                out_keys.push(k.clone());
                out_batches.push(batch);
            } else if let Ok(batch) = crate::sql::interface::json_record_to_batch(v) {
                out_keys.push(k.clone());
                out_batches.push(batch);
            }
        }
        Ok(())
    }
}