// scouter_dataframe/parquet/bifrost/engine.rs

use crate::error::DatasetEngineError;
use crate::parquet::bifrost::catalog::DatasetCatalogProvider;
use crate::parquet::tracing::traits::arrow_schema_to_delta;
use crate::parquet::utils::register_cloud_logstore_factories;
use crate::storage::ObjectStore;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow_array::RecordBatch;
use datafusion::prelude::SessionContext;
use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
use deltalake::datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use deltalake::datafusion::parquet::schema::types::ColumnPath;
use deltalake::operations::optimize::OptimizeType;
use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
use scouter_types::dataset::schema::{
    SCOUTER_BATCH_ID, SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE,
};
use scouter_types::dataset::DatasetNamespace;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, RwLock as AsyncRwLock};
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, instrument};
use url::Url;

const MIN_VACUUM_RETENTION_HOURS: u64 = 1;

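/// Commands accepted by the [`DatasetEngine`] actor loop. Every variant
/// except `Shutdown` carries a oneshot sender so the caller can await the
/// outcome of the operation.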
pub enum TableCommand {
    Write {
        batches: Vec<RecordBatch>,
        respond_to: oneshot::Sender<Result<(), DatasetEngineError>>,
    },
    Optimize {
        respond_to: oneshot::Sender<Result<(), DatasetEngineError>>,
    },
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), DatasetEngineError>>,
    },
    Shutdown,
}

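/// Join the namespace's storage path onto the object store's base URL,
/// normalizing the trailing slash so the two segments concatenate cleanly.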
fn build_table_url(
    object_store: &ObjectStore,
    namespace: &DatasetNamespace,
) -> Result<Url, DatasetEngineError> {
    let mut base = object_store.get_base_url()?;
    let mut path = base.path().to_string();
    if !path.ends_with('/') {
        path.push('/');
    }
    path.push_str(&namespace.storage_path());
    base.set_path(&path);
    Ok(base)
}

/// Attempt to load an existing Delta table or create a new one.
#[instrument(skip_all, fields(namespace = %namespace.fqn()))]
async fn build_or_create_table(
    object_store: &ObjectStore,
    schema: &Schema,
    namespace: &DatasetNamespace,
    partition_columns: &[String],
) -> Result<DeltaTable, DatasetEngineError> {
    register_cloud_logstore_factories();
    let table_url = build_table_url(object_store, namespace)?;
    info!(
        "Attempting to load dataset table [{}://.../{}]",
        table_url.scheme(),
        namespace.fqn()
    );

    // For the local filesystem, ensure the directory exists
    if table_url.scheme() == "file" {
        if let Ok(path) = table_url.to_file_path() {
            if !path.exists() {
                info!("Creating directory for local table: {:?}", path);
                std::fs::create_dir_all(&path)?;
            }
        }
    }

    // Try a single load attempt
    let store = object_store.as_dyn_object_store();
    let load_result = DeltaTableBuilder::from_url(table_url.clone())
        .map(|builder| builder.with_storage_backend(store, table_url.clone()));

    if let Ok(builder) = load_result {
        if let Ok(table) = builder.load().await {
            info!("Loaded existing dataset table [{}]", namespace.fqn());
            return Ok(table);
        }
    }

    // Table doesn't exist yet — create it
    info!("Creating new dataset table [{}]", namespace.fqn());
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;

    let delta_fields = arrow_schema_to_delta(schema);

    let data_skipping_cols = build_data_skipping_columns(partition_columns);

    let table = table
        .create()
        .with_table_name(namespace.fqn())
        .with_columns(delta_fields)
        .with_partition_columns(partition_columns.to_vec())
        .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some(&data_skipping_cols),
        )
        .await?;

    Ok(table)
}

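/// Build the comma-separated column list for the `DataSkippingStatsColumns`
/// table property: the system timestamp and partition-date columns plus any
/// user partition columns.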
fn build_data_skipping_columns(partition_columns: &[String]) -> String {
    let mut cols = vec![
        SCOUTER_CREATED_AT.to_string(),
        SCOUTER_PARTITION_DATE.to_string(),
    ];
    for col in partition_columns {
        if !cols.contains(col) {
            cols.push(col.clone());
        }
    }
    cols.join(",")
}

/// Build Parquet writer properties for a dataset with a dynamic schema.
///
/// System columns get hardcoded optimizations. User columns ending in `_id`
/// or `_key` (Utf8/Utf8View/LargeUtf8) get bloom filters automatically.
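///
/// A minimal usage sketch (the `user_id` and `payload` columns here are
/// hypothetical examples, not part of the schema constants):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
///     Field::new("user_id", DataType::Utf8, false), // `_id` suffix: gets a bloom filter
///     Field::new("payload", DataType::Utf8, true),  // no suffix match: no bloom filter
/// ]);
/// let props = build_writer_props(&schema);
/// ```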
pub fn build_writer_props(schema: &Schema) -> WriterProperties {
    let mut builder = WriterProperties::builder()
        .set_max_row_group_size(32_768)
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .set_column_encoding(
            ColumnPath::new(vec![SCOUTER_CREATED_AT.to_string()]),
            Encoding::DELTA_BINARY_PACKED,
        )
        .set_column_bloom_filter_enabled(ColumnPath::new(vec![SCOUTER_BATCH_ID.to_string()]), true)
        .set_column_bloom_filter_fpp(ColumnPath::new(vec![SCOUTER_BATCH_ID.to_string()]), 0.01)
        .set_column_bloom_filter_ndv(ColumnPath::new(vec![SCOUTER_BATCH_ID.to_string()]), 10_000)
        .set_column_statistics_enabled(
            ColumnPath::new(vec![SCOUTER_CREATED_AT.to_string()]),
            EnabledStatistics::Page,
        );

    for field in schema.fields() {
        let name = field.name();
        if (name.ends_with("_id") || name.ends_with("_key"))
            && matches!(
                field.data_type(),
                DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8
            )
            && name != SCOUTER_BATCH_ID
        {
            builder = builder
                .set_column_bloom_filter_enabled(ColumnPath::new(vec![name.clone()]), true)
                .set_column_bloom_filter_fpp(ColumnPath::new(vec![name.clone()]), 0.01)
                .set_column_bloom_filter_ndv(ColumnPath::new(vec![name.clone()]), 10_000);
        }
    }

    builder.build()
}

/// Per-table dataset engine actor.
///
/// Owns a single `DeltaTable` and serializes all writes through an mpsc channel
/// (single-writer invariant). Follows the `TraceSpanDBEngine` actor pattern.
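///
/// A construction/startup sketch (assumes an `ObjectStore`, schema, namespace,
/// partition columns, and catalog provider are already in hand):
///
/// ```ignore
/// let engine = DatasetEngine::new(&store, schema, namespace, partitions, catalog).await?;
/// let (tx, handle) = engine.start_actor(30); // background refresh every 30s
/// ```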
pub struct DatasetEngine {
    schema: SchemaRef,
    _object_store: ObjectStore,
    table: Arc<AsyncRwLock<DeltaTable>>,
    write_ctx: Arc<SessionContext>,
    namespace: DatasetNamespace,
    partition_columns: Vec<String>,
    catalog_provider: Arc<DatasetCatalogProvider>,
}

impl DatasetEngine {
    pub async fn new(
        object_store: &ObjectStore,
        schema: SchemaRef,
        namespace: DatasetNamespace,
        partition_columns: Vec<String>,
        catalog_provider: Arc<DatasetCatalogProvider>,
    ) -> Result<Self, DatasetEngineError> {
        let delta_table =
            build_or_create_table(object_store, &schema, &namespace, &partition_columns).await?;
        let write_ctx = object_store.get_session()?;

        // Compute the provider once and register it in both the private write
        // context (used only for the deregister/register cycle after writes;
        // no catalog resolution needed) and the shared catalog (query access).
        // An empty table has no provider yet, so registration is deferred
        // until the first write.
        let write_table_name = Self::write_table_name(&namespace);
        match delta_table.table_provider().await {
            Ok(provider) => {
                write_ctx.register_table(&write_table_name, Arc::clone(&provider))?;
                catalog_provider.swap_table(&namespace, provider);
            }
            Err(_) => info!(
                "Empty table at init; deferring registration until first write [{}]",
                namespace.fqn()
            ),
        }

        Ok(Self {
            schema,
            _object_store: object_store.clone(),
            table: Arc::new(AsyncRwLock::new(delta_table)),
            write_ctx: Arc::new(write_ctx),
            namespace,
            partition_columns,
            catalog_provider,
        })
    }

    /// Simple table name for the private write_ctx (avoids catalog resolution).
    fn write_table_name(namespace: &DatasetNamespace) -> String {
        format!(
            "_write_{}_{}_{}",
            namespace.catalog, namespace.schema_name, namespace.table
        )
    }

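    /// Append `batches` to the Delta table, then swap the refreshed
    /// `TableProvider` into both the private write context and the shared
    /// catalog so readers see the new version.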
    async fn write_batches(&self, batches: Vec<RecordBatch>) -> Result<(), DatasetEngineError> {
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        info!(
            "Engine writing {} batches ({} rows) to [{}]",
            batches.len(),
            total_rows,
            self.namespace.fqn()
        );

        let mut table_guard = self.table.write().await;

        // Clone before mutation — preserves original state on error
        let current_table = table_guard.clone();

        let updated_table = current_table
            .write(batches)
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_writer_properties(build_writer_props(&self.schema))
            .with_partition_columns(self.partition_columns.clone())
            .await?;

        // Compute the new provider once — reused for both write_ctx and catalog swap.
        // This avoids a second async call and ensures the write_ctx update cannot
        // leave the context in a torn state if table_provider() fails.
        let new_provider = updated_table.table_provider().await?;

        // Update private write context
        let write_name = Self::write_table_name(&self.namespace);
        let _ = self.write_ctx.deregister_table(&write_name);
        self.write_ctx
            .register_table(&write_name, Arc::clone(&new_provider))?;
        updated_table.update_datafusion_session(&self.write_ctx.state())?;

        // Update shared catalog — atomic TableProvider swap
        self.catalog_provider
            .swap_table(&self.namespace, new_provider);

        *table_guard = updated_table;

        debug!(
            "Successfully wrote {} rows to [{}]",
            total_rows,
            self.namespace.fqn()
        );
        Ok(())
    }

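    /// Compact small files toward a 128 MiB target and Z-order by the
    /// creation timestamp plus the first user partition column, then refresh
    /// both session contexts with the optimized table.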
    async fn optimize_table(&self) -> Result<(), DatasetEngineError> {
        info!("Optimizing dataset table [{}]", self.namespace.fqn());
        let mut table_guard = self.table.write().await;

        let current_table = table_guard.clone();

        let mut z_order_cols = vec![SCOUTER_CREATED_AT.to_string()];
        // Add the first user partition column (if any beyond scouter_partition_date)
        for col in &self.partition_columns {
            if col != SCOUTER_PARTITION_DATE {
                z_order_cols.push(col.clone());
                break;
            }
        }

        let (updated_table, _metrics) = current_table
            .optimize()
            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
            .with_type(OptimizeType::ZOrder(z_order_cols))
            .with_writer_properties(build_writer_props(&self.schema))
            .await?;

        // Compute the new provider once, mirroring write_batches
        let new_provider = updated_table.table_provider().await?;

        let write_name = Self::write_table_name(&self.namespace);
        let _ = self.write_ctx.deregister_table(&write_name);
        self.write_ctx
            .register_table(&write_name, Arc::clone(&new_provider))?;
        updated_table.update_datafusion_session(&self.write_ctx.state())?;

        self.catalog_provider
            .swap_table(&self.namespace, new_provider);

        *table_guard = updated_table;

        info!("Optimization complete for [{}]", self.namespace.fqn());
        Ok(())
    }

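    /// Delete unreferenced files older than the retention window (clamped to
    /// at least `MIN_VACUUM_RETENTION_HOURS`), then refresh both session
    /// contexts.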
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), DatasetEngineError> {
        let retention_hours = retention_hours.max(MIN_VACUUM_RETENTION_HOURS);
        info!(
            "Vacuuming dataset table [{}] (retention: {}h)",
            self.namespace.fqn(),
            retention_hours
        );
        let mut table_guard = self.table.write().await;

        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        // Compute the new provider once, mirroring write_batches
        let new_provider = updated_table.table_provider().await?;

        let write_name = Self::write_table_name(&self.namespace);
        let _ = self.write_ctx.deregister_table(&write_name);
        self.write_ctx
            .register_table(&write_name, Arc::clone(&new_provider))?;
        updated_table.update_datafusion_session(&self.write_ctx.state())?;

        self.catalog_provider
            .swap_table(&self.namespace, new_provider);

        *table_guard = updated_table;

        info!(
            "Vacuum complete for [{}] (retention: {}h)",
            self.namespace.fqn(),
            retention_hours
        );
        Ok(())
    }

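    /// Incrementally sync the table with its transaction log; on a version
    /// bump, swap the refreshed provider into both session contexts.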
    async fn refresh_table(&self) -> Result<(), DatasetEngineError> {
        let mut table_guard = self.table.write().await;
        let current_version = table_guard.version();
        let mut refreshed = table_guard.clone();

        match refreshed.update_incremental(None).await {
            Ok(_) => {
                if refreshed.version() > current_version {
                    debug!(
                        "Refreshed [{}]: v{:?} → v{:?}",
                        self.namespace.fqn(),
                        current_version,
                        refreshed.version()
                    );

                    // Compute provider first — only deregister/re-register if it succeeds,
                    // so the write_ctx is never left in a torn (no table registered) state.
                    if let Ok(new_provider) = refreshed.table_provider().await {
                        let write_name = Self::write_table_name(&self.namespace);
                        let _ = self.write_ctx.deregister_table(&write_name);
                        self.write_ctx
                            .register_table(&write_name, Arc::clone(&new_provider))?;
                        refreshed.update_datafusion_session(&self.write_ctx.state())?;
                        self.catalog_provider
                            .swap_table(&self.namespace, new_provider);
                        *table_guard = refreshed;
                    }
                }
            }
            Err(e) => {
                debug!("Refresh skipped for [{}]: {}", self.namespace.fqn(), e);
            }
        }

        Ok(())
    }

    /// Start the actor loop. Returns the command channel sender and join handle.
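    ///
    /// A usage sketch for callers (assumes `batches: Vec<RecordBatch>` is
    /// already built; error handling is elided):
    ///
    /// ```ignore
    /// let (tx, handle) = engine.start_actor(60);
    ///
    /// // Submit a write and await its result through the oneshot channel.
    /// let (respond_to, rx) = tokio::sync::oneshot::channel();
    /// tx.send(TableCommand::Write { batches, respond_to }).await?;
    /// rx.await??;
    ///
    /// // Stop the actor and wait for the loop to exit.
    /// tx.send(TableCommand::Shutdown).await?;
    /// handle.await?;
    /// ```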
    #[instrument(skip_all, name = "dataset_engine_actor", fields(fqn = %self.namespace.fqn()))]
    pub fn start_actor(
        self,
        refresh_interval_secs: u64,
    ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel::<TableCommand>(50);

        let handle = tokio::spawn(async move {
            // Clamp to 1s minimum — tokio::time::interval panics on Duration::ZERO.
            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
            refresh_ticker.tick().await; // skip immediate

            loop {
                tokio::select! {
                    maybe_cmd = rx.recv() => {
                        // A closed channel means every sender was dropped; no further
                        // commands can arrive, so treat it like an explicit shutdown
                        // instead of spinning on refresh ticks forever.
                        let Some(cmd) = maybe_cmd else {
                            info!("Command channel closed; shutting down dataset engine [{}]", self.namespace.fqn());
                            break;
                        };
                        match cmd {
                            TableCommand::Write { batches, respond_to } => {
                                let result = self.write_batches(batches).await;
                                if let Err(ref e) = result {
                                    error!("Write failed for [{}]: {}", self.namespace.fqn(), e);
                                }
                                let _ = respond_to.send(result);
                            }
                            TableCommand::Optimize { respond_to } => {
                                let _ = respond_to.send(self.optimize_table().await);
                                if let Err(e) = self.vacuum_table(MIN_VACUUM_RETENTION_HOURS).await {
                                    error!("Post-optimize vacuum failed for [{}]: {}", self.namespace.fqn(), e);
                                }
                            }
                            TableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            TableCommand::Shutdown => {
                                info!("Shutting down dataset engine [{}]", self.namespace.fqn());
                                break;
                            }
                        }
                    }
                    _ = refresh_ticker.tick() => {
                        if let Err(e) = self.refresh_table().await {
                            error!("Table refresh failed for [{}]: {}", self.namespace.fqn(), e);
                        }
                    }
                }
            }
        });

        (tx, handle)
    }
}