//! `scouter_dataframe/parquet/bifrost/registry.rs`
//!
//! Persistent dataset registry backed by a Delta Lake table, with a DashMap
//! hot cache for O(1) fingerprint lookups on the write path.

1use crate::error::DatasetEngineError;
2use crate::parquet::tracing::traits::arrow_schema_to_delta;
3use crate::parquet::utils::register_cloud_logstore_factories;
4use crate::storage::ObjectStore;
5use arrow::array::*;
6use arrow::datatypes::*;
7use arrow_array::RecordBatch;
8use chrono::Utc;
9use dashmap::DashMap;
10use datafusion::prelude::*;
11use deltalake::protocol::SaveMode;
12use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
13use scouter_types::dataset::{DatasetNamespace, DatasetRegistration, DatasetStatus};
14use std::sync::Arc;
15use tokio::sync::RwLock as AsyncRwLock;
16use tracing::{debug, info, warn};
17use url::Url;
18
19pub(crate) const REGISTRY_TABLE_NAME: &str = "_scouter_dataset_registry";
20
21fn registry_schema() -> Schema {
22    Schema::new(vec![
23        Field::new("fqn", DataType::Utf8, false),
24        Field::new("catalog", DataType::Utf8, false),
25        Field::new("schema_name", DataType::Utf8, false),
26        Field::new("table_name", DataType::Utf8, false),
27        Field::new("fingerprint", DataType::Utf8, false),
28        Field::new("arrow_schema_json", DataType::Utf8, false),
29        Field::new("json_schema", DataType::Utf8, false),
30        Field::new("partition_columns", DataType::Utf8, false),
31        Field::new(
32            "created_at",
33            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
34            false,
35        ),
36        Field::new(
37            "updated_at",
38            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
39            false,
40        ),
41        Field::new("status", DataType::Utf8, false),
42    ])
43}
44
45fn build_registry_url(object_store: &ObjectStore) -> Result<Url, DatasetEngineError> {
46    let mut base = object_store.get_base_url()?;
47    let mut path = base.path().to_string();
48    if !path.ends_with('/') {
49        path.push('/');
50    }
51    path.push_str("datasets/");
52    path.push_str(REGISTRY_TABLE_NAME);
53    base.set_path(&path);
54    Ok(base)
55}
56
57async fn build_or_create_registry(
58    object_store: &ObjectStore,
59) -> Result<DeltaTable, DatasetEngineError> {
60    register_cloud_logstore_factories();
61    let table_url = build_registry_url(object_store)?;
62
63    // For local filesystem, create dir if needed
64    if table_url.scheme() == "file" {
65        if let Ok(path) = table_url.to_file_path() {
66            if !path.exists() {
67                std::fs::create_dir_all(&path)?;
68            }
69        }
70    }
71
72    // Try to load existing table first
73    let store = object_store.as_dyn_object_store();
74    match DeltaTableBuilder::from_url(table_url.clone())?
75        .with_storage_backend(store.clone(), table_url.clone())
76        .load()
77        .await
78    {
79        Ok(table) => {
80            info!("Loaded existing dataset registry");
81            Ok(table)
82        }
83        Err(_) => {
84            info!("Creating new dataset registry");
85            let schema = registry_schema();
86            let delta_fields = arrow_schema_to_delta(&schema);
87
88            let table = DeltaTableBuilder::from_url(table_url.clone())?
89                .with_storage_backend(store, table_url)
90                .build()?;
91
92            let table = table
93                .create()
94                .with_table_name(REGISTRY_TABLE_NAME)
95                .with_columns(delta_fields)
96                .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
97                .await?;
98
99            Ok(table)
100        }
101    }
102}
103
104fn build_registration_batch(
105    schema: &SchemaRef,
106    reg: &DatasetRegistration,
107) -> Result<RecordBatch, DatasetEngineError> {
108    let now = Utc::now().timestamp_micros();
109    let partition_cols_json = serde_json::to_string(&reg.partition_columns).map_err(|e| {
110        DatasetEngineError::SerializationError(format!(
111            "Failed to serialize partition_columns: {}",
112            e
113        ))
114    })?;
115
116    let batch = RecordBatch::try_new(
117        schema.clone(),
118        vec![
119            Arc::new(StringArray::from(vec![reg.namespace.fqn()])),
120            Arc::new(StringArray::from(vec![reg.namespace.catalog.as_str()])),
121            Arc::new(StringArray::from(vec![reg.namespace.schema_name.as_str()])),
122            Arc::new(StringArray::from(vec![reg.namespace.table.as_str()])),
123            Arc::new(StringArray::from(vec![reg.fingerprint.as_str()])),
124            Arc::new(StringArray::from(vec![reg.arrow_schema_json.as_str()])),
125            Arc::new(StringArray::from(vec![reg.json_schema.as_str()])),
126            Arc::new(StringArray::from(vec![partition_cols_json.as_str()])),
127            Arc::new(TimestampMicrosecondArray::from(vec![now]).with_timezone("UTC")),
128            Arc::new(TimestampMicrosecondArray::from(vec![now]).with_timezone("UTC")),
129            Arc::new(StringArray::from(vec![reg.status.to_string().as_str()])),
130        ],
131    )?;
132
133    Ok(batch)
134}
135
136/// Persistent schema registry backed by a Delta Lake table.
137///
138/// Stores all dataset registrations with a DashMap hot cache for O(1)
139/// lookups on the write path (fingerprint validation).
pub struct DatasetRegistry {
    /// Backing Delta table, guarded for concurrent refresh/append.
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// DataFusion session used to query the registry table by name.
    ctx: Arc<SessionContext>,
    /// Held for the registry's lifetime; not read after construction.
    _object_store: ObjectStore,
    /// Canonical Arrow schema (see `registry_schema()`), used when writing.
    cache: DashMap<String, DatasetRegistration>,
    schema: SchemaRef,
}
147
/// Outcome of an idempotent [`DatasetRegistry::register`] call.
#[derive(Debug, Clone, PartialEq)]
pub enum RegistrationResult {
    /// No prior registration existed; a new row was appended.
    Created,
    /// Registration already present with a matching fingerprint.
    AlreadyExists,
}
153
154impl DatasetRegistry {
    /// Open (or create) the registry Delta table, wire it into the object
    /// store's DataFusion session, and warm the cache with all existing
    /// registrations.
    ///
    /// # Errors
    /// Fails if the table cannot be built/created, the session cannot be
    /// obtained, or the initial cache load fails.
    pub async fn new(object_store: &ObjectStore) -> Result<Self, DatasetEngineError> {
        let delta_table = build_or_create_registry(object_store).await?;
        let ctx = object_store.get_session()?;
        let schema = Arc::new(registry_schema());

        // Register object store bindings first — the provider registration
        // below relies on the session knowing about this table's storage.
        delta_table.update_datafusion_session(&ctx.state())?;

        match delta_table.table_provider().await {
            Ok(provider) => {
                ctx.register_table(REGISTRY_TABLE_NAME, provider)?;
                info!(
                    "Registry table registered (version: {:?})",
                    delta_table.version()
                );
            }
            Err(e) => {
                // A freshly-created table may not yet expose a provider; this
                // is expected, and load_all() below will simply find no rows.
                info!(
                    "Registry table provider unavailable (likely new/empty): {}",
                    e
                );
            }
        }

        let registry = Self {
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx: Arc::new(ctx),
            _object_store: object_store.clone(),
            schema,
            cache: DashMap::new(),
        };

        // Warm the cache so fingerprint checks are O(1) from the start.
        registry.load_all().await?;

        Ok(registry)
    }
191
192    /// Load all registrations from the Delta table into the cache.
193    /// Performs an incremental update first, then repopulates.
194    pub async fn load_all(&self) -> Result<(), DatasetEngineError> {
195        {
196            let mut table_guard = self.table.write().await;
197            let _ = table_guard.update_incremental(None).await;
198        }
199        self.populate_cache().await
200    }
201
    /// Repopulate the cache from the current table state.
    /// Assumes the Delta table is already refreshed.
    ///
    /// Reads every row via a DataFusion `SELECT *`, decodes each column, and
    /// rebuilds the FQN → registration map. Rows with invalid namespaces are
    /// skipped; corrupt partition-column JSON defaults to an empty list.
    async fn populate_cache(&self) -> Result<(), DatasetEngineError> {
        // Clear stale entries — Delta table is the source of truth
        self.cache.clear();

        // Re-register the table provider with DataFusion
        {
            let table_guard = self.table.read().await;
            // Best-effort: refresh storage bindings before re-registering.
            let _ = table_guard.update_datafusion_session(&self.ctx.state());
            let _ = self.ctx.deregister_table(REGISTRY_TABLE_NAME);
            match table_guard.table_provider().await {
                Ok(provider) => {
                    self.ctx.register_table(REGISTRY_TABLE_NAME, provider)?;
                }
                Err(_) => {
                    // Empty or new table — no data to load
                    return Ok(());
                }
            }
        } // read lock released before running the query below

        let df = match self
            .ctx
            .sql(&format!("SELECT * FROM {}", REGISTRY_TABLE_NAME))
            .await
        {
            Ok(df) => df,
            Err(e) => {
                // Treated as benign: a brand-new table has nothing to load.
                info!("Registry query failed (likely empty table): {}", e);
                return Ok(());
            }
        };

        let batches = df.collect().await?;

        for batch in &batches {
            // session config has schema_force_view_types=true → Utf8 reads back as Utf8View
            let fqn_col = batch
                .column_by_name("fqn")
                .and_then(|c| c.as_string_view_opt());
            let catalog_col = batch
                .column_by_name("catalog")
                .and_then(|c| c.as_string_view_opt());
            let schema_name_col = batch
                .column_by_name("schema_name")
                .and_then(|c| c.as_string_view_opt());
            let table_name_col = batch
                .column_by_name("table_name")
                .and_then(|c| c.as_string_view_opt());
            let fingerprint_col = batch
                .column_by_name("fingerprint")
                .and_then(|c| c.as_string_view_opt());
            let arrow_schema_col = batch
                .column_by_name("arrow_schema_json")
                .and_then(|c| c.as_string_view_opt());
            let json_schema_col = batch
                .column_by_name("json_schema")
                .and_then(|c| c.as_string_view_opt());
            let partition_col = batch
                .column_by_name("partition_columns")
                .and_then(|c| c.as_string_view_opt());
            let status_col = batch
                .column_by_name("status")
                .and_then(|c| c.as_string_view_opt());
            // Timestamps are not view types; plain downcast is sufficient.
            let created_at_col = batch
                .column_by_name("created_at")
                .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>());
            let updated_at_col = batch
                .column_by_name("updated_at")
                .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>());

            // The string columns are mandatory; timestamps/status are handled
            // leniently per-row below (they default when absent or invalid).
            let (
                Some(fqn_col),
                Some(catalog_col),
                Some(schema_name_col),
                Some(table_name_col),
                Some(fingerprint_col),
                Some(arrow_schema_col),
                Some(json_schema_col),
                Some(partition_col),
            ) = (
                fqn_col,
                catalog_col,
                schema_name_col,
                table_name_col,
                fingerprint_col,
                arrow_schema_col,
                json_schema_col,
                partition_col,
            )
            else {
                warn!("Registry batch missing expected columns — skipping");
                continue;
            };

            for i in 0..batch.num_rows() {
                let fqn = fqn_col.value(i).to_string();
                let namespace = match DatasetNamespace::new(
                    catalog_col.value(i),
                    schema_name_col.value(i),
                    table_name_col.value(i),
                ) {
                    Ok(ns) => ns,
                    Err(e) => {
                        // Skip the row rather than abort the whole reload.
                        warn!("Invalid namespace in registry row {}: {}", i, e);
                        continue;
                    }
                };

                let partition_columns: Vec<String> = match serde_json::from_str(
                    partition_col.value(i),
                ) {
                    Ok(v) => v,
                    Err(e) => {
                        warn!(
                                "Corrupt partition_columns JSON for {}: '{}' — defaulting to empty. Error: {}",
                                fqn, partition_col.value(i), e
                            );
                        vec![]
                    }
                };

                // Missing/invalid timestamps fall back to "now".
                let created_at = created_at_col
                    .and_then(|col| chrono::DateTime::from_timestamp_micros(col.value(i)))
                    .unwrap_or_else(Utc::now);

                let updated_at = updated_at_col
                    .and_then(|col| chrono::DateTime::from_timestamp_micros(col.value(i)))
                    .unwrap_or_else(Utc::now);

                // Unparseable status degrades to Active rather than dropping
                // the row.
                let status = status_col
                    .and_then(|col| col.value(i).parse::<DatasetStatus>().ok())
                    .unwrap_or(DatasetStatus::Active);

                let reg = DatasetRegistration {
                    namespace,
                    fingerprint: scouter_types::dataset::DatasetFingerprint(
                        fingerprint_col.value(i).to_string(),
                    ),
                    arrow_schema_json: arrow_schema_col.value(i).to_string(),
                    json_schema: json_schema_col.value(i).to_string(),
                    partition_columns,
                    created_at,
                    updated_at,
                    status,
                };

                self.cache.insert(fqn, reg);
            }
        }

        info!("Loaded {} registrations from registry", self.cache.len());
        Ok(())
    }
357
358    /// Register a dataset. Idempotent:
359    /// - Not found → create → "created"
360    /// - Found + fingerprint match → "already_exists"
361    /// - Found + fingerprint mismatch → error
362    pub async fn register(
363        &self,
364        registration: &DatasetRegistration,
365    ) -> Result<RegistrationResult, DatasetEngineError> {
366        let fqn = registration.namespace.fqn();
367
368        // Check cache first
369        if let Some(existing) = self.cache.get(&fqn) {
370            if existing.fingerprint.as_str() == registration.fingerprint.as_str() {
371                return Ok(RegistrationResult::AlreadyExists);
372            } else {
373                warn!(
374                    table = %fqn,
375                    "Fingerprint mismatch: expected={}, actual={}",
376                    existing.fingerprint.as_str(),
377                    registration.fingerprint.as_str()
378                );
379                return Err(DatasetEngineError::FingerprintMismatch {
380                    table: fqn,
381                    expected: existing.fingerprint.as_str().to_string(),
382                    actual: registration.fingerprint.as_str().to_string(),
383                });
384            }
385        }
386
387        // Write to Delta table
388        let batch = build_registration_batch(&self.schema, registration)?;
389        let mut table_guard = self.table.write().await;
390
391        let updated_table = table_guard
392            .clone()
393            .write(vec![batch])
394            .with_save_mode(SaveMode::Append)
395            .await?;
396
397        let _ = self.ctx.deregister_table(REGISTRY_TABLE_NAME);
398        if let Ok(provider) = updated_table.table_provider().await {
399            self.ctx.register_table(REGISTRY_TABLE_NAME, provider)?;
400        }
401        updated_table.update_datafusion_session(&self.ctx.state())?;
402
403        *table_guard = updated_table;
404
405        // Update cache
406        self.cache.insert(fqn, registration.clone());
407
408        Ok(RegistrationResult::Created)
409    }
410
411    /// O(1) lookup by FQN from cache.
412    pub fn get(&self, fqn: &str) -> Option<DatasetRegistration> {
413        self.cache.get(fqn).map(|r| r.clone())
414    }
415
416    /// Get registration by namespace.
417    pub fn get_by_namespace(&self, namespace: &DatasetNamespace) -> Option<DatasetRegistration> {
418        self.get(&namespace.fqn())
419    }
420
421    /// List all active registrations from cache.
422    pub fn list_active(&self) -> Vec<DatasetRegistration> {
423        self.cache
424            .iter()
425            .filter(|e| matches!(e.value().status, DatasetStatus::Active))
426            .map(|e| e.value().clone())
427            .collect()
428    }
429
430    /// Refresh from Delta table to pick up registrations from other pods.
431    pub async fn refresh(&self) -> Result<(), DatasetEngineError> {
432        {
433            let mut table_guard = self.table.write().await;
434            match table_guard.update_incremental(None).await {
435                Ok(_) => {}
436                Err(e) => {
437                    debug!("Registry refresh skipped: {}", e);
438                    return Ok(());
439                }
440            }
441        }
442        self.populate_cache().await
443    }
444}