Skip to main content

scouter_dataframe/parquet/control/
engine.rs

1use crate::error::TraceEngineError;
2use crate::parquet::tracing::traits::arrow_schema_to_delta;
3use crate::parquet::utils::register_cloud_logstore_factories;
4use crate::storage::ObjectStore;
5use arrow::array::*;
6use arrow::datatypes::*;
7use arrow_array::RecordBatch;
8use chrono::{DateTime, Duration, Utc};
9use datafusion::logical_expr::{col, lit};
10use datafusion::prelude::SessionContext;
11use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
12use scouter_settings::ObjectStorageSettings;
13use std::sync::Arc;
14use tokio::sync::RwLock as AsyncRwLock;
15use tracing::{debug, info, warn};
16use url::Url;
17
/// Name of the Delta table that stores coordination state; it lives under the
/// storage root alongside the trace data tables.
const CONTROL_TABLE_NAME: &str = "_scouter_control";

/// Stale lock threshold: if a task has been "processing" for longer than this,
/// it is considered abandoned and can be reclaimed by another pod.
const STALE_LOCK_MINUTES: i64 = 30;

/// Status values for task records in the control table.
mod status {
    // No pod is working on the task; it may be claimed once it is due.
    pub const IDLE: &str = "idle";
    // A pod currently holds the task lock (subject to STALE_LOCK_MINUTES).
    pub const PROCESSING: &str = "processing";
}
29
/// A task record in the control table.
///
/// One row exists per task type; the row is rewritten on every claim/release.
#[derive(Debug, Clone)]
pub struct TaskRecord {
    /// Unique task identifier (e.g. "optimize", "vacuum").
    pub task_name: String,
    /// One of the `status` module constants ("idle" / "processing").
    pub status: String,
    /// Identifier of the pod that last wrote this record.
    pub pod_id: String,
    /// When the task was last claimed; used for stale-lock detection.
    pub claimed_at: DateTime<Utc>,
    /// When the task last finished; `None` while processing or never run.
    pub completed_at: Option<DateTime<Utc>>,
    /// Earliest time at which the task becomes due again.
    pub next_run_at: DateTime<Utc>,
}
40
41/// Arrow schema for the control table.
42///
43/// Simple, flat schema — no dictionary encoding or bloom filters needed.
44/// This table will only ever have a handful of rows (one per task type).
45fn control_schema() -> Schema {
46    Schema::new(vec![
47        Field::new("task_name", DataType::Utf8, false),
48        Field::new("status", DataType::Utf8, false),
49        Field::new("pod_id", DataType::Utf8, false),
50        Field::new(
51            "claimed_at",
52            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
53            false,
54        ),
55        Field::new(
56            "completed_at",
57            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
58            true,
59        ),
60        Field::new(
61            "next_run_at",
62            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
63            false,
64        ),
65    ])
66}
67
68fn build_task_batch(
69    schema: &SchemaRef,
70    record: &TaskRecord,
71) -> Result<RecordBatch, TraceEngineError> {
72    let task_name = StringArray::from(vec![record.task_name.as_str()]);
73    let status = StringArray::from(vec![record.status.as_str()]);
74    let pod_id = StringArray::from(vec![record.pod_id.as_str()]);
75    let claimed_at = TimestampMicrosecondArray::from(vec![record.claimed_at.timestamp_micros()])
76        .with_timezone("UTC");
77    let completed_at = if let Some(ts) = record.completed_at {
78        TimestampMicrosecondArray::from(vec![Some(ts.timestamp_micros())]).with_timezone("UTC")
79    } else {
80        TimestampMicrosecondArray::from(vec![None::<i64>]).with_timezone("UTC")
81    };
82    let next_run_at = TimestampMicrosecondArray::from(vec![record.next_run_at.timestamp_micros()])
83        .with_timezone("UTC");
84
85    RecordBatch::try_new(
86        schema.clone(),
87        vec![
88            Arc::new(task_name),
89            Arc::new(status),
90            Arc::new(pod_id),
91            Arc::new(claimed_at),
92            Arc::new(completed_at),
93            Arc::new(next_run_at),
94        ],
95    )
96    .map_err(Into::into)
97}
98
/// Get a stable pod identifier for distributed locking.
///
/// Checks `HOSTNAME` first (set by default in K8s), then `POD_NAME`, and
/// finally falls back to `local-{pid}` for local development. Note that
/// because `HOSTNAME` is checked first, `POD_NAME` only takes effect when
/// `HOSTNAME` is unset.
pub fn get_pod_id() -> String {
    for key in ["HOSTNAME", "POD_NAME"] {
        if let Ok(id) = std::env::var(key) {
            return id;
        }
    }
    format!("local-{}", std::process::id())
}
108
/// Distributed coordination engine backed by a Delta Lake table.
///
/// Uses Delta Lake's optimistic concurrency control (OCC) to implement
/// distributed task locking across multiple K8s pods. The transaction log
/// serializes concurrent claims — if two pods race to claim the same task,
/// one commit succeeds and the other gets a `TransactionError`.
///
/// The control table lives at `{storage_uri}/_scouter_control/` alongside
/// the trace data tables.
pub struct ControlTableEngine {
    /// Arrow schema for the control table (see `control_schema()`).
    schema: SchemaRef,
    #[allow(dead_code)] // Used for future vacuum/maintenance operations
    object_store: ObjectStore,
    /// The Delta table, guarded by an async RwLock so claim/release sequences
    /// (refresh → read → write) run without interleaving within this process.
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// DataFusion session used to query the control table via SQL/DataFrame.
    ctx: Arc<SessionContext>,
    /// This pod's identity, written into records it claims.
    pod_id: String,
}
126
impl ControlTableEngine {
    /// Create or load the control table.
    ///
    /// The `pod_id` identifies this instance for distributed locking. In K8s,
    /// pass the pod hostname (`std::env::var("HOSTNAME")`).
    ///
    /// DataFusion registration is best-effort here: if the provider is not
    /// available (e.g. a brand-new table), registration is deferred until the
    /// first write re-registers the table.
    pub async fn new(
        storage_settings: &ObjectStorageSettings,
        pod_id: String,
    ) -> Result<Self, TraceEngineError> {
        let object_store = ObjectStore::new(storage_settings)?;
        let schema = Arc::new(control_schema());
        let table = build_or_create_control_table(&object_store, schema.clone()).await?;
        let ctx = object_store.get_session()?;

        if let Ok(provider) = table.table_provider().await {
            ctx.register_table(CONTROL_TABLE_NAME, provider)?;
        } else {
            info!("Empty control table at init — deferring registration until first write");
        }

        Ok(Self {
            schema,
            object_store,
            table: Arc::new(AsyncRwLock::new(table)),
            ctx: Arc::new(ctx),
            pod_id,
        })
    }

    /// Try to claim a task for exclusive execution.
    ///
    /// Returns `true` if this pod successfully claimed the task.
    /// Returns `false` if:
    /// - Another pod is already processing (and the lock is not stale)
    /// - The task is not yet due (`next_run_at` is in the future)
    /// - A concurrent pod won the Delta OCC race
    ///
    /// This is the distributed equivalent of `SELECT ... FOR UPDATE SKIP LOCKED`.
    pub async fn try_claim_task(&self, task_name: &str) -> Result<bool, TraceEngineError> {
        // Write lock held for the whole refresh → read → write sequence so
        // claims from this process are serialized.
        let mut table_guard = self.table.write().await;

        // Refresh from storage to see commits from other pods
        if let Err(e) = table_guard.update_incremental(None).await {
            debug!("Control table update skipped (new table): {}", e);
        }

        // Re-register so DataFusion sees latest state
        let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
        if let Ok(provider) = table_guard.table_provider().await {
            self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
        }

        // Read current task state
        let current = self
            .read_task(&table_guard_to_ctx(&self.ctx), task_name)
            .await?;

        let now = Utc::now();

        match current {
            Some(record) => {
                // Check if another pod is actively processing
                if record.status == status::PROCESSING {
                    let stale_threshold = now - Duration::minutes(STALE_LOCK_MINUTES);
                    if record.claimed_at > stale_threshold {
                        debug!(
                            "Task '{}' is being processed by pod '{}' (not stale), skipping",
                            task_name, record.pod_id
                        );
                        return Ok(false);
                    }
                    // NOTE(review): a stale lock still falls through to the
                    // due-check below — a stale task with a future next_run_at
                    // is NOT reclaimed despite this log line. Confirm intended.
                    warn!(
                        "Task '{}' claimed by pod '{}' is stale (claimed_at: {}), reclaiming",
                        task_name, record.pod_id, record.claimed_at
                    );
                }

                // Check if task is due
                if now < record.next_run_at {
                    debug!(
                        "Task '{}' not due until {}, skipping",
                        task_name, record.next_run_at
                    );
                    return Ok(false);
                }

                // Claim the task by overwriting the entire table content for this task.
                // Delta OCC ensures only one pod wins.
                let claimed = TaskRecord {
                    task_name: task_name.to_string(),
                    status: status::PROCESSING.to_string(),
                    pod_id: self.pod_id.clone(),
                    claimed_at: now,
                    completed_at: None,
                    next_run_at: record.next_run_at,
                };

                // NOTE(review): OCC conflicts are detected by substring-matching
                // the error text — fragile; confirm the deltalake error Display
                // is guaranteed to contain "Transaction" across versions.
                match self.write_task_update(&mut table_guard, &claimed).await {
                    Ok(()) => {
                        info!("Successfully claimed task '{}'", task_name);
                        Ok(true)
                    }
                    Err(TraceEngineError::DataTableError(ref e))
                        if e.to_string().contains("Transaction") =>
                    {
                        info!("Lost OCC race for task '{}' to another pod", task_name);
                        Ok(false)
                    }
                    Err(e) => Err(e),
                }
            }
            None => {
                // Task doesn't exist yet — create it in "processing" state.
                // The first pod to reach here wins via OCC.
                let claimed = TaskRecord {
                    task_name: task_name.to_string(),
                    status: status::PROCESSING.to_string(),
                    pod_id: self.pod_id.clone(),
                    claimed_at: now,
                    completed_at: None,
                    next_run_at: now, // Will be updated on release
                };

                match self.write_task_update(&mut table_guard, &claimed).await {
                    Ok(()) => {
                        info!("Created and claimed new task '{}'", task_name);
                        Ok(true)
                    }
                    Err(TraceEngineError::DataTableError(ref e))
                        if e.to_string().contains("Transaction") =>
                    {
                        info!("Lost OCC race for new task '{}' to another pod", task_name);
                        Ok(false)
                    }
                    Err(e) => Err(e),
                }
            }
        }
    }

    /// Release a task after successful completion, scheduling the next run.
    ///
    /// Sets status to "idle" and updates `next_run_at` based on the provided interval.
    ///
    /// NOTE(review): release does not verify that this pod still owns the
    /// claim (no pod_id check) — a pod whose stale lock was reclaimed would
    /// overwrite the new owner's record. Confirm acceptable.
    pub async fn release_task(
        &self,
        task_name: &str,
        next_run_interval: Duration,
    ) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let now = Utc::now();

        let released = TaskRecord {
            task_name: task_name.to_string(),
            status: status::IDLE.to_string(),
            pod_id: self.pod_id.clone(),
            claimed_at: now,
            completed_at: Some(now),
            next_run_at: now + next_run_interval,
        };

        self.write_task_update(&mut table_guard, &released).await?;

        info!(
            "Released task '{}', next run at {}",
            task_name, released.next_run_at
        );
        Ok(())
    }

    /// Release a task after a failure, keeping the original `next_run_at` so it
    /// can be retried immediately by any pod.
    pub async fn release_task_on_failure(&self, task_name: &str) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        // Refresh to get current next_run_at
        if let Err(e) = table_guard.update_incremental(None).await {
            debug!("Control table update skipped: {}", e);
        }

        let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
        if let Ok(provider) = table_guard.table_provider().await {
            self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
        }

        let current = self
            .read_task(&table_guard_to_ctx(&self.ctx), task_name)
            .await?;

        let now = Utc::now();
        // If the record vanished, fall back to "due now" so the task retries.
        let next_run = current.map(|r| r.next_run_at).unwrap_or(now);

        let released = TaskRecord {
            task_name: task_name.to_string(),
            status: status::IDLE.to_string(),
            pod_id: self.pod_id.clone(),
            claimed_at: now,
            completed_at: Some(now),
            next_run_at: next_run,
        };

        self.write_task_update(&mut table_guard, &released).await?;

        warn!(
            "Released task '{}' after failure, next_run_at unchanged: {}",
            task_name, next_run
        );
        Ok(())
    }

    /// Check if a task is due and not currently being processed.
    ///
    /// A "processing" task counts as due only when its lock is stale; an idle
    /// task is due once `next_run_at` has passed; an unknown task is always
    /// due (first run). This is advisory — only `try_claim_task` grants the
    /// lock.
    pub async fn is_task_due(&self, task_name: &str) -> Result<bool, TraceEngineError> {
        let mut table_guard = self.table.write().await;

        if let Err(e) = table_guard.update_incremental(None).await {
            debug!("Control table update skipped: {}", e);
        }

        let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
        if let Ok(provider) = table_guard.table_provider().await {
            self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
        }

        let current = self
            .read_task(&table_guard_to_ctx(&self.ctx), task_name)
            .await?;

        let now = Utc::now();
        match current {
            Some(record) => {
                if record.status == status::PROCESSING {
                    let stale_threshold = now - Duration::minutes(STALE_LOCK_MINUTES);
                    // Due only if the lock is stale
                    Ok(record.claimed_at <= stale_threshold)
                } else {
                    Ok(now >= record.next_run_at)
                }
            }
            // Never registered = due (first run)
            None => Ok(true),
        }
    }

    /// Read a single task record from the control table via DataFusion.
    ///
    /// Returns `Ok(None)` when the table is not registered yet or no row
    /// matches `task_name`. Callers hold the table write lock and pass a
    /// cloned `SessionContext` (see `table_guard_to_ctx`).
    async fn read_task(
        &self,
        ctx: &SessionContext,
        task_name: &str,
    ) -> Result<Option<TaskRecord>, TraceEngineError> {
        let table_exists = ctx.table_exist(CONTROL_TABLE_NAME)?;
        if !table_exists {
            return Ok(None);
        }

        let df = ctx
            .table(CONTROL_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        let df = df
            .filter(col("task_name").eq(lit(task_name)))
            .map_err(TraceEngineError::DatafusionError)?;

        let batches = df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Extract the first (and only) row if it exists.
        // DataFusion may return Utf8View instead of Utf8 for string columns,
        // so cast to Utf8 before downcast to StringArray.
        for batch in &batches {
            if batch.num_rows() == 0 {
                continue;
            }

            let get_string = |col_name: &str| -> String {
                let col = batch.column_by_name(col_name).unwrap();
                let casted =
                    arrow::compute::cast(col, &DataType::Utf8).expect("cast to Utf8 failed");
                let arr = casted.as_any().downcast_ref::<StringArray>().unwrap();
                arr.value(0).to_string()
            };

            // NOTE(review): assumes timestamp columns come back as microsecond
            // arrays (matching control_schema); a unit change would panic on
            // the downcast — confirm DataFusion preserves the unit here.
            let get_timestamp = |col_name: &str| -> Option<DateTime<Utc>> {
                let col = batch.column_by_name(col_name).unwrap();
                if col.is_null(0) {
                    return None;
                }
                let arr = col
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .unwrap();
                DateTime::from_timestamp_micros(arr.value(0))
            };

            let task_name_val = get_string("task_name");
            let status_val = get_string("status");
            let pod_id_val = get_string("pod_id");
            // Non-nullable columns default to "now" if conversion fails.
            let claimed_at = get_timestamp("claimed_at").unwrap_or_else(Utc::now);
            let completed_at = get_timestamp("completed_at");
            let next_run_at = get_timestamp("next_run_at").unwrap_or_else(Utc::now);

            return Ok(Some(TaskRecord {
                task_name: task_name_val,
                status: status_val,
                pod_id: pod_id_val,
                claimed_at,
                completed_at,
                next_run_at,
            }));
        }

        Ok(None)
    }

    /// Write a task update using Delta Lake MERGE-like semantics.
    ///
    /// Strategy: DELETE the existing row for this task_name, then APPEND the new row.
    /// Delta OCC ensures atomicity — if two pods race, one commit fails with
    /// `TransactionError` and the caller can retry or back off.
    ///
    /// NOTE(review): the delete and the append are two separate commits; a
    /// crash between them would leave the task row missing (treated as a
    /// brand-new task on next claim). Confirm this is acceptable.
    async fn write_task_update(
        &self,
        table_guard: &mut DeltaTable,
        record: &TaskRecord,
    ) -> Result<(), TraceEngineError> {
        let batch = build_task_batch(&self.schema, record)?;

        // First, delete the existing row for this task (if any).
        // On a brand-new table with no data, delete will fail — that's fine.
        let predicate = format!("task_name = '{}'", record.task_name);
        let delete_result = table_guard.clone().delete().with_predicate(predicate).await;

        match delete_result {
            Ok((updated_table, _metrics)) => {
                // Delete succeeded — now append the new row to the updated table
                let updated_table = updated_table
                    .write(vec![batch])
                    .with_save_mode(deltalake::protocol::SaveMode::Append)
                    .await?;

                // Re-register so subsequent reads see the new state.
                let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
                if let Ok(provider) = updated_table.table_provider().await {
                    self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
                }
                *table_guard = updated_table;
            }
            Err(_) => {
                // No existing data to delete (new table) — just append
                let updated_table = table_guard
                    .clone()
                    .write(vec![batch])
                    .with_save_mode(deltalake::protocol::SaveMode::Append)
                    .await?;

                // Re-register so subsequent reads see the new state.
                let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
                if let Ok(provider) = updated_table.table_provider().await {
                    self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
                }
                *table_guard = updated_table;
            }
        }

        Ok(())
    }
}
492
/// Helper used at `read_task` call sites: returns an owned clone of the shared
/// `SessionContext` (despite the name, it never touches the table guard).
/// NOTE(review): this assumes `SessionContext::clone` shares underlying
/// catalog state, so registrations on `self.ctx` are visible through the
/// clone — confirm against the DataFusion version in use.
fn table_guard_to_ctx(ctx: &Arc<SessionContext>) -> SessionContext {
    ctx.as_ref().clone()
}
497
/// Build or load the control table at `{base_url}/_scouter_control/`.
///
/// For `file://` URLs the directory is created on demand and existence of
/// `_delta_log` decides load-vs-create; for remote stores a probing load is
/// attempted instead.
async fn build_or_create_control_table(
    object_store: &ObjectStore,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Reuse the cloud logstore factories registered by the trace engine.
    // Safe to call repeatedly — existing entries are not overwritten.
    register_cloud_logstore_factories();

    let base_url = object_store.get_base_url()?;
    let control_url = append_path_to_url(&base_url, CONTROL_TABLE_NAME)?;

    // Log only scheme + last path segment to keep credentials/paths out of logs.
    info!(
        "Loading control table [{}://.../{} ]",
        control_url.scheme(),
        control_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(CONTROL_TABLE_NAME)
    );

    let store = object_store.as_dyn_object_store();

    let is_delta_table = if control_url.scheme() == "file" {
        if let Ok(path) = control_url.to_file_path() {
            if !path.exists() {
                info!("Creating directory for control table: {:?}", path);
                std::fs::create_dir_all(&path)?;
            }
            path.join("_delta_log").exists()
        } else {
            false
        }
    } else {
        // NOTE(review): this probing load means remote tables are loaded
        // twice (once here, once below) — consider reusing the probe result.
        match DeltaTableBuilder::from_url(control_url.clone()) {
            Ok(builder) => builder
                .with_storage_backend(store.clone(), control_url.clone())
                .load()
                .await
                .is_ok(),
            Err(_) => false,
        }
    };

    if is_delta_table {
        info!(
            "Loaded existing control table [{}://.../{} ]",
            control_url.scheme(),
            control_url
                .path_segments()
                .and_then(|mut s| s.next_back())
                .unwrap_or(CONTROL_TABLE_NAME)
        );
        let table = DeltaTableBuilder::from_url(control_url.clone())?
            .with_storage_backend(store, control_url)
            .load()
            .await?;
        Ok(table)
    } else {
        info!("Creating new control table");
        let table = DeltaTableBuilder::from_url(control_url.clone())?
            .with_storage_backend(store, control_url)
            .build()?;

        // Convert the Arrow schema to Delta column definitions.
        let delta_fields = arrow_schema_to_delta(&schema);

        // Frequent checkpoints (every 5 commits) keep log replay cheap for
        // this small, frequently-rewritten table.
        table
            .create()
            .with_table_name(CONTROL_TABLE_NAME)
            .with_columns(delta_fields)
            .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
            .await
            .map_err(Into::into)
    }
}
573
574/// Append a path segment to a URL, handling trailing slashes correctly.
575fn append_path_to_url(base: &Url, segment: &str) -> Result<Url, TraceEngineError> {
576    let mut url = base.clone();
577    // Ensure the base path ends with '/'
578    if !url.path().ends_with('/') {
579        url.set_path(&format!("{}/", url.path()));
580    }
581    url = url.join(segment)?;
582    Ok(url)
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use scouter_settings::ObjectStorageSettings;

    /// Remove the default local storage root so each test starts clean.
    ///
    /// NOTE(review): every test shares this same default path; if the harness
    /// runs the async tests concurrently they can race on the directory —
    /// confirm single-threaded execution or use per-test roots. Also, the
    /// trailing `cleanup()` in each test is skipped when an assert panics.
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// A freshly-initialized engine reports unknown tasks as due.
    #[tokio::test]
    async fn test_control_table_init() -> Result<(), TraceEngineError> {
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        // No tasks should exist yet
        let due = engine.is_task_due("optimize").await?;
        assert!(due, "New task should be due (never run before)");

        cleanup();
        Ok(())
    }

    /// Claim → re-claim (blocked) → release → not-due cycle for one task.
    #[tokio::test]
    async fn test_claim_and_release() -> Result<(), TraceEngineError> {
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        // Claim a new task
        let claimed = engine.try_claim_task("optimize").await?;
        assert!(claimed, "First claim should succeed");

        // Second claim from same engine should fail (task is processing)
        let claimed_again = engine.try_claim_task("optimize").await?;
        assert!(
            !claimed_again,
            "Second claim should fail (already processing)"
        );

        // Release with 1-hour interval
        engine.release_task("optimize", Duration::hours(1)).await?;

        // Task should not be due yet (next_run_at is 1 hour from now)
        let due = engine.is_task_due("optimize").await?;
        assert!(!due, "Task should not be due yet");

        cleanup();
        Ok(())
    }

    /// A zero-interval release makes the task immediately claimable again;
    /// failure release keeps next_run_at untouched.
    #[tokio::test]
    async fn test_claim_release_then_due() -> Result<(), TraceEngineError> {
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        // Claim and release with 0-second interval (immediately due again)
        let claimed = engine.try_claim_task("vacuum").await?;
        assert!(claimed);

        engine.release_task("vacuum", Duration::seconds(0)).await?;

        // Should be due now
        let due = engine.is_task_due("vacuum").await?;
        assert!(due, "Task should be due after 0-second interval");

        // Should be claimable again
        let claimed = engine.try_claim_task("vacuum").await?;
        assert!(claimed, "Task should be claimable after release");

        // Release on failure — next_run_at stays the same
        engine.release_task_on_failure("vacuum").await?;

        cleanup();
        Ok(())
    }

    /// Independent tasks can be claimed and released without interference.
    #[tokio::test]
    async fn test_multiple_tasks() -> Result<(), TraceEngineError> {
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        // Claim two different tasks
        let claimed_opt = engine.try_claim_task("optimize").await?;
        let claimed_vac = engine.try_claim_task("vacuum").await?;
        assert!(claimed_opt, "Optimize claim should succeed");
        assert!(claimed_vac, "Vacuum claim should succeed");

        // Release both
        engine.release_task("optimize", Duration::hours(24)).await?;
        engine.release_task("vacuum", Duration::hours(168)).await?;

        cleanup();
        Ok(())
    }
}