scouter_dataframe/parquet/bifrost/
registry.rs1use crate::error::DatasetEngineError;
2use crate::parquet::tracing::traits::arrow_schema_to_delta;
3use crate::parquet::utils::register_cloud_logstore_factories;
4use crate::storage::ObjectStore;
5use arrow::array::*;
6use arrow::datatypes::*;
7use arrow_array::RecordBatch;
8use chrono::Utc;
9use dashmap::DashMap;
10use datafusion::prelude::*;
11use deltalake::protocol::SaveMode;
12use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
13use scouter_types::dataset::{DatasetNamespace, DatasetRegistration, DatasetStatus};
14use std::sync::Arc;
15use tokio::sync::RwLock as AsyncRwLock;
16use tracing::{debug, info, warn};
17use url::Url;
18
/// Name of the Delta table holding dataset registrations; the leading
/// underscore keeps it visually distinct from user-created dataset tables.
pub(crate) const REGISTRY_TABLE_NAME: &str = "_scouter_dataset_registry";
20
21fn registry_schema() -> Schema {
22 Schema::new(vec![
23 Field::new("fqn", DataType::Utf8, false),
24 Field::new("catalog", DataType::Utf8, false),
25 Field::new("schema_name", DataType::Utf8, false),
26 Field::new("table_name", DataType::Utf8, false),
27 Field::new("fingerprint", DataType::Utf8, false),
28 Field::new("arrow_schema_json", DataType::Utf8, false),
29 Field::new("json_schema", DataType::Utf8, false),
30 Field::new("partition_columns", DataType::Utf8, false),
31 Field::new(
32 "created_at",
33 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
34 false,
35 ),
36 Field::new(
37 "updated_at",
38 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
39 false,
40 ),
41 Field::new("status", DataType::Utf8, false),
42 ])
43}
44
45fn build_registry_url(object_store: &ObjectStore) -> Result<Url, DatasetEngineError> {
46 let mut base = object_store.get_base_url()?;
47 let mut path = base.path().to_string();
48 if !path.ends_with('/') {
49 path.push('/');
50 }
51 path.push_str("datasets/");
52 path.push_str(REGISTRY_TABLE_NAME);
53 base.set_path(&path);
54 Ok(base)
55}
56
/// Loads the dataset-registry Delta table, creating it if it does not exist.
///
/// Any load error is treated as "table not found" and triggers creation —
/// NOTE(review): this also masks genuine load failures (auth, corruption);
/// confirm that falling through to create is the intended behavior.
async fn build_or_create_registry(
    object_store: &ObjectStore,
) -> Result<DeltaTable, DatasetEngineError> {
    // Register cloud log-store factories before any DeltaTableBuilder URL
    // resolution happens.
    register_cloud_logstore_factories();
    let table_url = build_registry_url(object_store)?;

    // For local filesystem URLs the directory must exist before deltalake
    // can write its transaction log into it.
    if table_url.scheme() == "file" {
        if let Ok(path) = table_url.to_file_path() {
            if !path.exists() {
                std::fs::create_dir_all(&path)?;
            }
        }
    }

    let store = object_store.as_dyn_object_store();
    match DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store.clone(), table_url.clone())
        .load()
        .await
    {
        Ok(table) => {
            info!("Loaded existing dataset registry");
            Ok(table)
        }
        Err(_) => {
            info!("Creating new dataset registry");
            // Translate the Arrow schema into Delta column definitions.
            let schema = registry_schema();
            let delta_fields = arrow_schema_to_delta(&schema);

            let table = DeltaTableBuilder::from_url(table_url.clone())?
                .with_storage_backend(store, table_url)
                .build()?;

            // Checkpoint every 5 commits to keep log replay cheap for this
            // small but frequently refreshed table.
            let table = table
                .create()
                .with_table_name(REGISTRY_TABLE_NAME)
                .with_columns(delta_fields)
                .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
                .await?;

            Ok(table)
        }
    }
}
103
104fn build_registration_batch(
105 schema: &SchemaRef,
106 reg: &DatasetRegistration,
107) -> Result<RecordBatch, DatasetEngineError> {
108 let now = Utc::now().timestamp_micros();
109 let partition_cols_json = serde_json::to_string(®.partition_columns).map_err(|e| {
110 DatasetEngineError::SerializationError(format!(
111 "Failed to serialize partition_columns: {}",
112 e
113 ))
114 })?;
115
116 let batch = RecordBatch::try_new(
117 schema.clone(),
118 vec![
119 Arc::new(StringArray::from(vec![reg.namespace.fqn()])),
120 Arc::new(StringArray::from(vec![reg.namespace.catalog.as_str()])),
121 Arc::new(StringArray::from(vec![reg.namespace.schema_name.as_str()])),
122 Arc::new(StringArray::from(vec![reg.namespace.table.as_str()])),
123 Arc::new(StringArray::from(vec![reg.fingerprint.as_str()])),
124 Arc::new(StringArray::from(vec![reg.arrow_schema_json.as_str()])),
125 Arc::new(StringArray::from(vec![reg.json_schema.as_str()])),
126 Arc::new(StringArray::from(vec![partition_cols_json.as_str()])),
127 Arc::new(TimestampMicrosecondArray::from(vec![now]).with_timezone("UTC")),
128 Arc::new(TimestampMicrosecondArray::from(vec![now]).with_timezone("UTC")),
129 Arc::new(StringArray::from(vec![reg.status.to_string().as_str()])),
130 ],
131 )?;
132
133 Ok(batch)
134}
135
/// In-process handle to the dataset registry: a Delta table plus an
/// in-memory cache of registrations keyed by fully-qualified name (fqn).
pub struct DatasetRegistry {
    // Underlying Delta table; async RwLock so refreshes and registers
    // serialize on version updates while reads stay concurrent.
    table: Arc<AsyncRwLock<DeltaTable>>,
    // DataFusion session used to query the registry table via SQL.
    ctx: Arc<SessionContext>,
    // Retained for the registry's lifetime; not read after construction.
    _object_store: ObjectStore,
    // Arrow schema matching registry_schema(), reused for write batches.
    schema: SchemaRef,
    // fqn -> registration; rebuilt from the table by populate_cache().
    cache: DashMap<String, DatasetRegistration>,
}
147
/// Outcome of `DatasetRegistry::register`.
#[derive(Debug, Clone, PartialEq)]
pub enum RegistrationResult {
    /// A new registration row was appended to the registry table.
    Created,
    /// A registration with the same fqn and identical fingerprint exists.
    AlreadyExists,
}
153
impl DatasetRegistry {
    /// Opens (or creates) the registry table, wires it into the object
    /// store's DataFusion session, and warms the in-memory cache.
    pub async fn new(object_store: &ObjectStore) -> Result<Self, DatasetEngineError> {
        let delta_table = build_or_create_registry(object_store).await?;
        let ctx = object_store.get_session()?;
        let schema = Arc::new(registry_schema());

        delta_table.update_datafusion_session(&ctx.state())?;

        // A freshly created table may not yield a provider yet; that is
        // tolerated here because populate_cache() re-registers on each load.
        match delta_table.table_provider().await {
            Ok(provider) => {
                ctx.register_table(REGISTRY_TABLE_NAME, provider)?;
                info!(
                    "Registry table registered (version: {:?})",
                    delta_table.version()
                );
            }
            Err(e) => {
                info!(
                    "Registry table provider unavailable (likely new/empty): {}",
                    e
                );
            }
        }

        let registry = Self {
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx: Arc::new(ctx),
            _object_store: object_store.clone(),
            schema,
            cache: DashMap::new(),
        };

        registry.load_all().await?;

        Ok(registry)
    }

    /// Pulls the latest table state and rebuilds the cache from scratch.
    pub async fn load_all(&self) -> Result<(), DatasetEngineError> {
        {
            let mut table_guard = self.table.write().await;
            // Best effort: an incremental-update failure (e.g. brand-new
            // table) must not prevent caching what we already have.
            let _ = table_guard.update_incremental(None).await;
        }
        self.populate_cache().await
    }

    /// Clears the cache and reloads every registry row into it via SQL.
    async fn populate_cache(&self) -> Result<(), DatasetEngineError> {
        self.cache.clear();

        {
            let table_guard = self.table.read().await;
            let _ = table_guard.update_datafusion_session(&self.ctx.state());
            // Deregister/re-register so the query below sees the current
            // table version rather than a stale provider.
            let _ = self.ctx.deregister_table(REGISTRY_TABLE_NAME);
            match table_guard.table_provider().await {
                Ok(provider) => {
                    self.ctx.register_table(REGISTRY_TABLE_NAME, provider)?;
                }
                Err(_) => {
                    // No provider (new/empty table) — nothing to cache.
                    return Ok(());
                }
            }
        }

        let df = match self
            .ctx
            .sql(&format!("SELECT * FROM {}", REGISTRY_TABLE_NAME))
            .await
        {
            Ok(df) => df,
            Err(e) => {
                info!("Registry query failed (likely empty table): {}", e);
                return Ok(());
            }
        };

        let batches = df.collect().await?;

        for batch in &batches {
            // String columns are read via as_string_view_opt() — presumably
            // DataFusion returns them as Utf8View here; timestamp columns
            // are downcast to TimestampMicrosecondArray directly.
            let fqn_col = batch
                .column_by_name("fqn")
                .and_then(|c| c.as_string_view_opt());
            let catalog_col = batch
                .column_by_name("catalog")
                .and_then(|c| c.as_string_view_opt());
            let schema_name_col = batch
                .column_by_name("schema_name")
                .and_then(|c| c.as_string_view_opt());
            let table_name_col = batch
                .column_by_name("table_name")
                .and_then(|c| c.as_string_view_opt());
            let fingerprint_col = batch
                .column_by_name("fingerprint")
                .and_then(|c| c.as_string_view_opt());
            let arrow_schema_col = batch
                .column_by_name("arrow_schema_json")
                .and_then(|c| c.as_string_view_opt());
            let json_schema_col = batch
                .column_by_name("json_schema")
                .and_then(|c| c.as_string_view_opt());
            let partition_col = batch
                .column_by_name("partition_columns")
                .and_then(|c| c.as_string_view_opt());
            let status_col = batch
                .column_by_name("status")
                .and_then(|c| c.as_string_view_opt());
            let created_at_col = batch
                .column_by_name("created_at")
                .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>());
            let updated_at_col = batch
                .column_by_name("updated_at")
                .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>());

            // The string columns are mandatory; timestamps and status are
            // optional below and fall back to defaults per-row.
            let (
                Some(fqn_col),
                Some(catalog_col),
                Some(schema_name_col),
                Some(table_name_col),
                Some(fingerprint_col),
                Some(arrow_schema_col),
                Some(json_schema_col),
                Some(partition_col),
            ) = (
                fqn_col,
                catalog_col,
                schema_name_col,
                table_name_col,
                fingerprint_col,
                arrow_schema_col,
                json_schema_col,
                partition_col,
            )
            else {
                warn!("Registry batch missing expected columns — skipping");
                continue;
            };

            for i in 0..batch.num_rows() {
                let fqn = fqn_col.value(i).to_string();
                // A malformed namespace invalidates only this row, not the batch.
                let namespace = match DatasetNamespace::new(
                    catalog_col.value(i),
                    schema_name_col.value(i),
                    table_name_col.value(i),
                ) {
                    Ok(ns) => ns,
                    Err(e) => {
                        warn!("Invalid namespace in registry row {}: {}", i, e);
                        continue;
                    }
                };

                // Corrupt partition JSON degrades to "no partitions" rather
                // than dropping the registration.
                let partition_columns: Vec<String> = match serde_json::from_str(
                    partition_col.value(i),
                ) {
                    Ok(v) => v,
                    Err(e) => {
                        warn!(
                            "Corrupt partition_columns JSON for {}: '{}' — defaulting to empty. Error: {}",
                            fqn, partition_col.value(i), e
                        );
                        vec![]
                    }
                };

                // Missing/invalid timestamps fall back to "now".
                let created_at = created_at_col
                    .and_then(|col| chrono::DateTime::from_timestamp_micros(col.value(i)))
                    .unwrap_or_else(Utc::now);

                let updated_at = updated_at_col
                    .and_then(|col| chrono::DateTime::from_timestamp_micros(col.value(i)))
                    .unwrap_or_else(Utc::now);

                // Unparseable status defaults to Active.
                let status = status_col
                    .and_then(|col| col.value(i).parse::<DatasetStatus>().ok())
                    .unwrap_or(DatasetStatus::Active);

                let reg = DatasetRegistration {
                    namespace,
                    fingerprint: scouter_types::dataset::DatasetFingerprint(
                        fingerprint_col.value(i).to_string(),
                    ),
                    arrow_schema_json: arrow_schema_col.value(i).to_string(),
                    json_schema: json_schema_col.value(i).to_string(),
                    partition_columns,
                    created_at,
                    updated_at,
                    status,
                };

                self.cache.insert(fqn, reg);
            }
        }

        info!("Loaded {} registrations from registry", self.cache.len());
        Ok(())
    }

    /// Registers a dataset by appending a row to the registry table.
    ///
    /// Returns `AlreadyExists` when the fqn is cached with an identical
    /// fingerprint; errors with `FingerprintMismatch` when the cached
    /// fingerprint differs.
    pub async fn register(
        &self,
        registration: &DatasetRegistration,
    ) -> Result<RegistrationResult, DatasetEngineError> {
        let fqn = registration.namespace.fqn();

        if let Some(existing) = self.cache.get(&fqn) {
            if existing.fingerprint.as_str() == registration.fingerprint.as_str() {
                return Ok(RegistrationResult::AlreadyExists);
            } else {
                warn!(
                    table = %fqn,
                    "Fingerprint mismatch: expected={}, actual={}",
                    existing.fingerprint.as_str(),
                    registration.fingerprint.as_str()
                );
                return Err(DatasetEngineError::FingerprintMismatch {
                    table: fqn,
                    expected: existing.fingerprint.as_str().to_string(),
                    actual: registration.fingerprint.as_str().to_string(),
                });
            }
        }

        let batch = build_registration_batch(&self.schema, registration)?;
        // Hold the write lock across append + swap so concurrent registers
        // cannot race on the table version.
        let mut table_guard = self.table.write().await;

        let updated_table = table_guard
            .clone()
            .write(vec![batch])
            .with_save_mode(SaveMode::Append)
            .await?;

        // Refresh the DataFusion registration to reflect the new version.
        let _ = self.ctx.deregister_table(REGISTRY_TABLE_NAME);
        if let Ok(provider) = updated_table.table_provider().await {
            self.ctx.register_table(REGISTRY_TABLE_NAME, provider)?;
        }
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        self.cache.insert(fqn, registration.clone());

        Ok(RegistrationResult::Created)
    }

    /// Looks up a registration by fully-qualified name (cache only; no I/O).
    pub fn get(&self, fqn: &str) -> Option<DatasetRegistration> {
        self.cache.get(fqn).map(|r| r.clone())
    }

    /// Convenience wrapper: lookup by namespace via its fqn.
    pub fn get_by_namespace(&self, namespace: &DatasetNamespace) -> Option<DatasetRegistration> {
        self.get(&namespace.fqn())
    }

    /// Returns clones of all cached registrations whose status is Active.
    pub fn list_active(&self) -> Vec<DatasetRegistration> {
        self.cache
            .iter()
            .filter(|e| matches!(e.value().status, DatasetStatus::Active))
            .map(|e| e.value().clone())
            .collect()
    }

    /// Incrementally updates the table and repopulates the cache; a failed
    /// incremental update is logged and skipped rather than propagated.
    pub async fn refresh(&self) -> Result<(), DatasetEngineError> {
        {
            let mut table_guard = self.table.write().await;
            match table_guard.update_incremental(None).await {
                Ok(_) => {}
                Err(e) => {
                    debug!("Registry refresh skipped: {}", e);
                    return Ok(());
                }
            }
        }
        self.populate_cache().await
    }
}