scouter_dataframe/parquet/bifrost/engine.rs

use crate::error::DatasetEngineError;
use crate::parquet::bifrost::catalog::DatasetCatalogProvider;
use crate::parquet::tracing::traits::arrow_schema_to_delta;
use crate::parquet::utils::register_cloud_logstore_factories;
use crate::storage::ObjectStore;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow_array::RecordBatch;
use datafusion::prelude::SessionContext;
use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
use deltalake::datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use deltalake::datafusion::parquet::schema::types::ColumnPath;
use deltalake::operations::optimize::OptimizeType;
use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
use scouter_types::dataset::schema::{
    SCOUTER_BATCH_ID, SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE,
};
use scouter_types::dataset::DatasetNamespace;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, instrument};
use url::Url;

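/// Floor for vacuum retention: `vacuum_table` clamps caller-supplied values
/// so at least one hour of history is always retained.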
const MIN_VACUUM_RETENTION_HOURS: u64 = 1;

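/// Commands handled by the dataset engine actor spawned by
/// `DatasetEngine::start_actor`. Mutating commands carry a oneshot sender so
/// callers can await the outcome.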
pub enum TableCommand {
    Write {
        batches: Vec<RecordBatch>,
        respond_to: oneshot::Sender<Result<(), DatasetEngineError>>,
    },
    Optimize {
        respond_to: oneshot::Sender<Result<(), DatasetEngineError>>,
    },
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), DatasetEngineError>>,
    },
    Shutdown,
}

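/// Resolves the table's full URL by appending the namespace's storage path
/// to the object store's base URL.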
fn build_table_url(
    object_store: &ObjectStore,
    namespace: &DatasetNamespace,
) -> Result<Url, DatasetEngineError> {
    let mut base = object_store.get_base_url()?;
    let mut path = base.path().to_string();
    if !path.ends_with('/') {
        path.push('/');
    }
    path.push_str(&namespace.storage_path());
    base.set_path(&path);
    Ok(base)
}

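/// Loads the Delta table for `namespace` if one exists at the resolved URL;
/// otherwise creates it with the given schema, partition columns, a
/// checkpoint interval of 5 commits, and data-skipping statistics columns.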
#[instrument(skip_all, fields(namespace = %namespace.fqn()))]
async fn build_or_create_table(
    object_store: &ObjectStore,
    schema: &Schema,
    namespace: &DatasetNamespace,
    partition_columns: &[String],
) -> Result<DeltaTable, DatasetEngineError> {
    register_cloud_logstore_factories();
    let table_url = build_table_url(object_store, namespace)?;
    info!(
        "Attempting to load dataset table [{}://.../{}]",
        table_url.scheme(),
        namespace.fqn()
    );

    // Local filesystem tables need their directory to exist before the
    // Delta log can be read or written.
    if table_url.scheme() == "file" {
        if let Ok(path) = table_url.to_file_path() {
            if !path.exists() {
                info!("Creating directory for local table: {:?}", path);
                std::fs::create_dir_all(&path)?;
            }
        }
    }

    // Try to load an existing table first; fall through to creation if no
    // Delta log is present yet.
    let store = object_store.as_dyn_object_store();
    let load_result = DeltaTableBuilder::from_url(table_url.clone())
        .map(|builder| builder.with_storage_backend(store, table_url.clone()));

    if let Ok(builder) = load_result {
        if let Ok(table) = builder.load().await {
            info!("Loaded existing dataset table [{}]", namespace.fqn());
            return Ok(table);
        }
    }

    info!("Creating new dataset table [{}]", namespace.fqn());
    // The first store handle was moved into the load attempt above, so take
    // a fresh one for the create path.
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;

    let delta_fields = arrow_schema_to_delta(schema);

    let data_skipping_cols = build_data_skipping_columns(partition_columns);

    let table = table
        .create()
        .with_table_name(namespace.fqn())
        .with_columns(delta_fields)
        .with_partition_columns(partition_columns.to_vec())
        .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some(&data_skipping_cols),
        )
        .await?;

    Ok(table)
}

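/// Comma-separated column list for the `DataSkippingStatsColumns` table
/// property: the created-at and partition-date columns plus any other
/// partition columns, deduplicated.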
fn build_data_skipping_columns(partition_columns: &[String]) -> String {
    let mut cols = vec![
        SCOUTER_CREATED_AT.to_string(),
        SCOUTER_PARTITION_DATE.to_string(),
    ];
    for col in partition_columns {
        if !cols.contains(col) {
            cols.push(col.clone());
        }
    }
    cols.join(",")
}

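/// Parquet writer properties shared by writes and optimize passes: ZSTD(3)
/// compression, 32k-row groups, delta-binary-packed encoding and page-level
/// statistics for the created-at column, and bloom filters (1% FPP, 10k NDV)
/// on the batch-id column and other string-typed `*_id`/`*_key` columns.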
pub fn build_writer_props(schema: &Schema) -> WriterProperties {
    let mut builder = WriterProperties::builder()
        .set_max_row_group_size(32_768)
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .set_column_encoding(
            ColumnPath::new(vec![SCOUTER_CREATED_AT.to_string()]),
            Encoding::DELTA_BINARY_PACKED,
        )
        .set_column_bloom_filter_enabled(ColumnPath::new(vec![SCOUTER_BATCH_ID.to_string()]), true)
        .set_column_bloom_filter_fpp(ColumnPath::new(vec![SCOUTER_BATCH_ID.to_string()]), 0.01)
        .set_column_bloom_filter_ndv(ColumnPath::new(vec![SCOUTER_BATCH_ID.to_string()]), 10_000)
        .set_column_statistics_enabled(
            ColumnPath::new(vec![SCOUTER_CREATED_AT.to_string()]),
            EnabledStatistics::Page,
        );

    // Enable bloom filters on likely point-lookup columns: string-typed
    // `*_id` / `*_key` fields (the batch-id column is already covered above).
    for field in schema.fields() {
        let name = field.name();
        if (name.ends_with("_id") || name.ends_with("_key"))
            && matches!(
                field.data_type(),
                DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8
            )
            && name != SCOUTER_BATCH_ID
        {
            builder = builder
                .set_column_bloom_filter_enabled(ColumnPath::new(vec![name.clone()]), true)
                .set_column_bloom_filter_fpp(ColumnPath::new(vec![name.clone()]), 0.01)
                .set_column_bloom_filter_ndv(ColumnPath::new(vec![name.clone()]), 10_000);
        }
    }

    builder.build()
}

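/// Per-table engine owning the Delta table handle, its Arrow schema, a
/// private DataFusion session for writes, and the shared catalog provider
/// queried by readers. All mutation is serialized through the actor loop in
/// `start_actor`.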
pub struct DatasetEngine {
    schema: SchemaRef,
    _object_store: ObjectStore,
    table: Arc<AsyncRwLock<DeltaTable>>,
    write_ctx: Arc<SessionContext>,
    namespace: DatasetNamespace,
    partition_columns: Vec<String>,
    catalog_provider: Arc<DatasetCatalogProvider>,
}

impl DatasetEngine {
    pub async fn new(
        object_store: &ObjectStore,
        schema: SchemaRef,
        namespace: DatasetNamespace,
        partition_columns: Vec<String>,
        catalog_provider: Arc<DatasetCatalogProvider>,
    ) -> Result<Self, DatasetEngineError> {
        let delta_table =
            build_or_create_table(object_store, &schema, &namespace, &partition_columns).await?;
        let write_ctx = object_store.get_session()?;

        // Register the provider with the private write context and the shared
        // catalog. A freshly created (empty) table may not expose a provider
        // yet; in that case registration is deferred to the first write.
        let write_table_name = Self::write_table_name(&namespace);
        if let Ok(provider) = delta_table.table_provider().await {
            write_ctx.register_table(&write_table_name, Arc::clone(&provider))?;
            catalog_provider.swap_table(&namespace, provider);
        } else {
            info!(
                "Empty table at init; deferring registration until first write [{}]",
                namespace.fqn()
            );
        }

        Ok(Self {
            schema,
            _object_store: object_store.clone(),
            table: Arc::new(AsyncRwLock::new(delta_table)),
            write_ctx: Arc::new(write_ctx),
            namespace,
            partition_columns,
            catalog_provider,
        })
    }

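    /// Name under which the table is registered in the private write
    /// session; the `_write_` prefix keeps it distinct from catalog-facing
    /// table names.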
    fn write_table_name(namespace: &DatasetNamespace) -> String {
        format!(
            "_write_{}_{}_{}",
            namespace.catalog, namespace.schema_name, namespace.table
        )
    }

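    /// Appends `batches` to the Delta table under the write lock, then swaps
    /// the updated table version into the write context and the shared
    /// catalog so readers see the committed rows.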
    async fn write_batches(&self, batches: Vec<RecordBatch>) -> Result<(), DatasetEngineError> {
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        info!(
            "Engine writing {} batches ({} rows) to [{}]",
            batches.len(),
            total_rows,
            self.namespace.fqn()
        );

        let mut table_guard = self.table.write().await;

        // `write` consumes the table, so operate on a clone and swap the
        // updated table back into the guard afterwards.
        let current_table = table_guard.clone();

        let updated_table = current_table
            .write(batches)
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_writer_properties(build_writer_props(&self.schema))
            .with_partition_columns(self.partition_columns.clone())
            .await?;

        // Re-register the provider so both the private write context and the
        // shared catalog point at the new table version.
        let new_provider = updated_table.table_provider().await?;

        let write_name = Self::write_table_name(&self.namespace);
        let _ = self.write_ctx.deregister_table(&write_name);
        self.write_ctx
            .register_table(&write_name, Arc::clone(&new_provider))?;
        updated_table.update_datafusion_session(&self.write_ctx.state())?;

        self.catalog_provider
            .swap_table(&self.namespace, new_provider);

        *table_guard = updated_table;

        debug!(
            "Successfully wrote {} rows to [{}]",
            total_rows,
            self.namespace.fqn()
        );
        Ok(())
    }

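    /// Compacts the table into ~128 MiB files with Z-order clustering on the
    /// created-at column plus at most one non-date partition column.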
    async fn optimize_table(&self) -> Result<(), DatasetEngineError> {
        info!("Optimizing dataset table [{}]", self.namespace.fqn());
        let mut table_guard = self.table.write().await;

        let current_table = table_guard.clone();

        // Z-order on the created-at column plus at most one extra partition
        // column; the date partition already defines the physical layout.
        let mut z_order_cols = vec![SCOUTER_CREATED_AT.to_string()];
        for col in &self.partition_columns {
            if col != SCOUTER_PARTITION_DATE {
                z_order_cols.push(col.clone());
                break;
            }
        }

        let (updated_table, _metrics) = current_table
            .optimize()
            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
            .with_type(OptimizeType::ZOrder(z_order_cols))
            .with_writer_properties(build_writer_props(&self.schema))
            .await?;

        let write_name = Self::write_table_name(&self.namespace);
        let _ = self.write_ctx.deregister_table(&write_name);
        self.write_ctx
            .register_table(&write_name, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.write_ctx.state())?;

        let provider = updated_table.table_provider().await?;
        self.catalog_provider.swap_table(&self.namespace, provider);

        *table_guard = updated_table;

        info!("Optimization complete for [{}]", self.namespace.fqn());
        Ok(())
    }

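    /// Deletes unreferenced files older than `retention_hours` (clamped to
    /// [`MIN_VACUUM_RETENTION_HOURS`]); the clamp stands in for delta-rs's
    /// default retention enforcement, which is disabled here.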
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), DatasetEngineError> {
        let retention_hours = retention_hours.max(MIN_VACUUM_RETENTION_HOURS);
        info!(
            "Vacuuming dataset table [{}] (retention: {}h)",
            self.namespace.fqn(),
            retention_hours
        );
        let mut table_guard = self.table.write().await;

        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        let write_name = Self::write_table_name(&self.namespace);
        let _ = self.write_ctx.deregister_table(&write_name);
        self.write_ctx
            .register_table(&write_name, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.write_ctx.state())?;

        let provider = updated_table.table_provider().await?;
        self.catalog_provider.swap_table(&self.namespace, provider);

        *table_guard = updated_table;

        info!(
            "Vacuum complete for [{}] (retention: {}h)",
            self.namespace.fqn(),
            retention_hours
        );
        Ok(())
    }

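    /// Incrementally syncs the in-memory table with the Delta log. When a
    /// newer version is found (e.g. committed by another writer), the
    /// DataFusion registrations and catalog provider are refreshed; update
    /// errors are logged at debug level and swallowed so the periodic tick
    /// never kills the actor.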
    async fn refresh_table(&self) -> Result<(), DatasetEngineError> {
        let mut table_guard = self.table.write().await;
        let current_version = table_guard.version();
        let mut refreshed = table_guard.clone();

        match refreshed.update_incremental(None).await {
            Ok(_) => {
                if refreshed.version() > current_version {
                    debug!(
                        "Refreshed [{}]: v{:?} → v{:?}",
                        self.namespace.fqn(),
                        current_version,
                        refreshed.version()
                    );

                    if let Ok(new_provider) = refreshed.table_provider().await {
                        let write_name = Self::write_table_name(&self.namespace);
                        let _ = self.write_ctx.deregister_table(&write_name);
                        self.write_ctx
                            .register_table(&write_name, Arc::clone(&new_provider))?;
                        refreshed.update_datafusion_session(&self.write_ctx.state())?;
                        self.catalog_provider
                            .swap_table(&self.namespace, new_provider);
                        *table_guard = refreshed;
                    }
                }
            }
            Err(e) => {
                debug!("Refresh skipped for [{}]: {}", self.namespace.fqn(), e);
            }
        }

        Ok(())
    }

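    /// Consumes the engine and spawns its actor loop on a dedicated Tokio
    /// task, returning the command channel and the task's `JoinHandle`. The
    /// loop serializes all table mutations and refreshes the table every
    /// `refresh_interval_secs` seconds (minimum 1s).
    ///
    /// A minimal caller-side sketch; `engine` and `batches` are assumed to
    /// already be in scope, so this is not a compiled doctest:
    ///
    /// ```ignore
    /// let (tx, handle) = engine.start_actor(30);
    ///
    /// // Write a set of batches and wait for the commit to be acknowledged.
    /// let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
    /// tx.send(TableCommand::Write { batches, respond_to: resp_tx }).await?;
    /// resp_rx.await??;
    ///
    /// // Ask the actor to stop, then wait for the task to finish.
    /// tx.send(TableCommand::Shutdown).await?;
    /// handle.await?;
    /// ```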
    #[instrument(skip_all, name = "dataset_engine_actor", fields(fqn = %self.namespace.fqn()))]
    pub fn start_actor(
        self,
        refresh_interval_secs: u64,
    ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel::<TableCommand>(50);

        let handle = tokio::spawn(async move {
            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
            // The first tick of a tokio interval completes immediately;
            // consume it so the loop doesn't refresh before doing any work.
            refresh_ticker.tick().await;

            loop {
                tokio::select! {
                    maybe_cmd = rx.recv() => {
                        let Some(cmd) = maybe_cmd else {
                            // All senders dropped without an explicit Shutdown;
                            // stop rather than tick forever.
                            info!("Command channel closed for [{}]", self.namespace.fqn());
                            break;
                        };
                        match cmd {
                            TableCommand::Write { batches, respond_to } => {
                                let result = self.write_batches(batches).await;
                                if let Err(ref e) = result {
                                    error!("Write failed for [{}]: {}", self.namespace.fqn(), e);
                                }
                                let _ = respond_to.send(result);
                            }
                            TableCommand::Optimize { respond_to } => {
                                let _ = respond_to.send(self.optimize_table().await);
                                // Optimize leaves superseded files behind; clean
                                // them up best-effort after acking the caller.
                                if let Err(e) = self.vacuum_table(MIN_VACUUM_RETENTION_HOURS).await {
                                    error!("Post-optimize vacuum failed for [{}]: {}", self.namespace.fqn(), e);
                                }
                            }
                            TableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            TableCommand::Shutdown => {
                                info!("Shutting down dataset engine [{}]", self.namespace.fqn());
                                break;
                            }
                        }
                    }
                    _ = refresh_ticker.tick() => {
                        if let Err(e) = self.refresh_table().await {
                            error!("Table refresh failed for [{}]: {}", self.namespace.fqn(), e);
                        }
                    }
                }
            }
        });

        (tx, handle)
    }
}