1use crate::error::TraceEngineError;
2use crate::parquet::tracing::traits::arrow_schema_to_delta;
3use crate::parquet::utils::register_cloud_logstore_factories;
4use crate::storage::ObjectStore;
5use arrow::array::*;
6use arrow::datatypes::*;
7use arrow_array::RecordBatch;
8use chrono::{DateTime, Duration, Utc};
9use datafusion::logical_expr::{col, lit};
10use datafusion::prelude::SessionContext;
11use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
12use scouter_settings::ObjectStorageSettings;
13use std::sync::Arc;
14use tokio::sync::RwLock as AsyncRwLock;
15use tracing::{debug, info, warn};
16use url::Url;
17
/// Name under which the control table is stored in object storage and
/// registered with the DataFusion session.
const CONTROL_TABLE_NAME: &str = "_scouter_control";

/// A `processing` claim older than this many minutes is treated as abandoned
/// (e.g. the owning pod died mid-run) and may be reclaimed by another pod.
const STALE_LOCK_MINUTES: i64 = 30;

/// Allowed values of the `status` column in the control table.
mod status {
    pub const IDLE: &str = "idle";
    pub const PROCESSING: &str = "processing";
}
29
/// One row of the control table: the scheduling and ownership state of a
/// named maintenance task.
#[derive(Debug, Clone)]
pub struct TaskRecord {
    /// Unique task identifier (e.g. "optimize", "vacuum").
    pub task_name: String,
    /// Current state; one of the constants in the `status` module.
    pub status: String,
    /// Pod that last wrote this record (claimed or released the task).
    pub pod_id: String,
    /// When the record was last written (set to "now" on both claim and release).
    pub claimed_at: DateTime<Utc>,
    /// When the task last completed; `None` while it is being processed.
    pub completed_at: Option<DateTime<Utc>>,
    /// Earliest time the task becomes due again.
    pub next_run_at: DateTime<Utc>,
}
40
41fn control_schema() -> Schema {
46 Schema::new(vec![
47 Field::new("task_name", DataType::Utf8, false),
48 Field::new("status", DataType::Utf8, false),
49 Field::new("pod_id", DataType::Utf8, false),
50 Field::new(
51 "claimed_at",
52 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
53 false,
54 ),
55 Field::new(
56 "completed_at",
57 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
58 true,
59 ),
60 Field::new(
61 "next_run_at",
62 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
63 false,
64 ),
65 ])
66}
67
68fn build_task_batch(
69 schema: &SchemaRef,
70 record: &TaskRecord,
71) -> Result<RecordBatch, TraceEngineError> {
72 let task_name = StringArray::from(vec![record.task_name.as_str()]);
73 let status = StringArray::from(vec![record.status.as_str()]);
74 let pod_id = StringArray::from(vec![record.pod_id.as_str()]);
75 let claimed_at = TimestampMicrosecondArray::from(vec![record.claimed_at.timestamp_micros()])
76 .with_timezone("UTC");
77 let completed_at = if let Some(ts) = record.completed_at {
78 TimestampMicrosecondArray::from(vec![Some(ts.timestamp_micros())]).with_timezone("UTC")
79 } else {
80 TimestampMicrosecondArray::from(vec![None::<i64>]).with_timezone("UTC")
81 };
82 let next_run_at = TimestampMicrosecondArray::from(vec![record.next_run_at.timestamp_micros()])
83 .with_timezone("UTC");
84
85 RecordBatch::try_new(
86 schema.clone(),
87 vec![
88 Arc::new(task_name),
89 Arc::new(status),
90 Arc::new(pod_id),
91 Arc::new(claimed_at),
92 Arc::new(completed_at),
93 Arc::new(next_run_at),
94 ],
95 )
96 .map_err(Into::into)
97}
98
/// Returns an identifier for the current pod/process.
///
/// Prefers the `HOSTNAME` environment variable (set by Kubernetes), then
/// `POD_NAME`, and finally falls back to a process-id-based local name for
/// non-cluster runs.
pub fn get_pod_id() -> String {
    for var in ["HOSTNAME", "POD_NAME"] {
        if let Ok(value) = std::env::var(var) {
            return value;
        }
    }
    format!("local-{}", std::process::id())
}
108
/// Coordinates scheduled maintenance tasks across multiple pods using a Delta
/// table as a shared control plane with optimistic concurrency.
pub struct ControlTableEngine {
    /// Arrow schema of the control table (see `control_schema`).
    schema: SchemaRef,
    /// Retained for the engine's lifetime; not read directly after init.
    #[allow(dead_code)] object_store: ObjectStore,
    /// The Delta control table; the async RwLock serializes in-process writers.
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// DataFusion session the control table is registered with for queries.
    ctx: Arc<SessionContext>,
    /// Identity of this pod, written into claimed records.
    pod_id: String,
}
126
127impl ControlTableEngine {
128 pub async fn new(
133 storage_settings: &ObjectStorageSettings,
134 pod_id: String,
135 ) -> Result<Self, TraceEngineError> {
136 let object_store = ObjectStore::new(storage_settings)?;
137 let schema = Arc::new(control_schema());
138 let table = build_or_create_control_table(&object_store, schema.clone()).await?;
139 let ctx = object_store.get_session()?;
140
141 if let Ok(provider) = table.table_provider().await {
142 ctx.register_table(CONTROL_TABLE_NAME, provider)?;
143 } else {
144 info!("Empty control table at init — deferring registration until first write");
145 }
146
147 Ok(Self {
148 schema,
149 object_store,
150 table: Arc::new(AsyncRwLock::new(table)),
151 ctx: Arc::new(ctx),
152 pod_id,
153 })
154 }
155
156 pub async fn try_claim_task(&self, task_name: &str) -> Result<bool, TraceEngineError> {
166 let mut table_guard = self.table.write().await;
167
168 if let Err(e) = table_guard.update_incremental(None).await {
170 debug!("Control table update skipped (new table): {}", e);
171 }
172
173 let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
175 if let Ok(provider) = table_guard.table_provider().await {
176 self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
177 }
178
179 let current = self
181 .read_task(&table_guard_to_ctx(&self.ctx), task_name)
182 .await?;
183
184 let now = Utc::now();
185
186 match current {
187 Some(record) => {
188 if record.status == status::PROCESSING {
190 let stale_threshold = now - Duration::minutes(STALE_LOCK_MINUTES);
191 if record.claimed_at > stale_threshold {
192 debug!(
193 "Task '{}' is being processed by pod '{}' (not stale), skipping",
194 task_name, record.pod_id
195 );
196 return Ok(false);
197 }
198 warn!(
199 "Task '{}' claimed by pod '{}' is stale (claimed_at: {}), reclaiming",
200 task_name, record.pod_id, record.claimed_at
201 );
202 }
203
204 if now < record.next_run_at {
206 debug!(
207 "Task '{}' not due until {}, skipping",
208 task_name, record.next_run_at
209 );
210 return Ok(false);
211 }
212
213 let claimed = TaskRecord {
216 task_name: task_name.to_string(),
217 status: status::PROCESSING.to_string(),
218 pod_id: self.pod_id.clone(),
219 claimed_at: now,
220 completed_at: None,
221 next_run_at: record.next_run_at,
222 };
223
224 match self.write_task_update(&mut table_guard, &claimed).await {
225 Ok(()) => {
226 info!("Successfully claimed task '{}'", task_name);
227 Ok(true)
228 }
229 Err(TraceEngineError::DataTableError(ref e))
230 if e.to_string().contains("Transaction") =>
231 {
232 info!("Lost OCC race for task '{}' to another pod", task_name);
233 Ok(false)
234 }
235 Err(e) => Err(e),
236 }
237 }
238 None => {
239 let claimed = TaskRecord {
242 task_name: task_name.to_string(),
243 status: status::PROCESSING.to_string(),
244 pod_id: self.pod_id.clone(),
245 claimed_at: now,
246 completed_at: None,
247 next_run_at: now, };
249
250 match self.write_task_update(&mut table_guard, &claimed).await {
251 Ok(()) => {
252 info!("Created and claimed new task '{}'", task_name);
253 Ok(true)
254 }
255 Err(TraceEngineError::DataTableError(ref e))
256 if e.to_string().contains("Transaction") =>
257 {
258 info!("Lost OCC race for new task '{}' to another pod", task_name);
259 Ok(false)
260 }
261 Err(e) => Err(e),
262 }
263 }
264 }
265 }
266
267 pub async fn release_task(
271 &self,
272 task_name: &str,
273 next_run_interval: Duration,
274 ) -> Result<(), TraceEngineError> {
275 let mut table_guard = self.table.write().await;
276 let now = Utc::now();
277
278 let released = TaskRecord {
279 task_name: task_name.to_string(),
280 status: status::IDLE.to_string(),
281 pod_id: self.pod_id.clone(),
282 claimed_at: now,
283 completed_at: Some(now),
284 next_run_at: now + next_run_interval,
285 };
286
287 self.write_task_update(&mut table_guard, &released).await?;
288
289 info!(
290 "Released task '{}', next run at {}",
291 task_name, released.next_run_at
292 );
293 Ok(())
294 }
295
296 pub async fn release_task_on_failure(&self, task_name: &str) -> Result<(), TraceEngineError> {
299 let mut table_guard = self.table.write().await;
300
301 if let Err(e) = table_guard.update_incremental(None).await {
303 debug!("Control table update skipped: {}", e);
304 }
305
306 let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
307 if let Ok(provider) = table_guard.table_provider().await {
308 self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
309 }
310
311 let current = self
312 .read_task(&table_guard_to_ctx(&self.ctx), task_name)
313 .await?;
314
315 let now = Utc::now();
316 let next_run = current.map(|r| r.next_run_at).unwrap_or(now);
317
318 let released = TaskRecord {
319 task_name: task_name.to_string(),
320 status: status::IDLE.to_string(),
321 pod_id: self.pod_id.clone(),
322 claimed_at: now,
323 completed_at: Some(now),
324 next_run_at: next_run,
325 };
326
327 self.write_task_update(&mut table_guard, &released).await?;
328
329 warn!(
330 "Released task '{}' after failure, next_run_at unchanged: {}",
331 task_name, next_run
332 );
333 Ok(())
334 }
335
336 pub async fn is_task_due(&self, task_name: &str) -> Result<bool, TraceEngineError> {
338 let mut table_guard = self.table.write().await;
339
340 if let Err(e) = table_guard.update_incremental(None).await {
341 debug!("Control table update skipped: {}", e);
342 }
343
344 let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
345 if let Ok(provider) = table_guard.table_provider().await {
346 self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
347 }
348
349 let current = self
350 .read_task(&table_guard_to_ctx(&self.ctx), task_name)
351 .await?;
352
353 let now = Utc::now();
354 match current {
355 Some(record) => {
356 if record.status == status::PROCESSING {
357 let stale_threshold = now - Duration::minutes(STALE_LOCK_MINUTES);
358 Ok(record.claimed_at <= stale_threshold)
360 } else {
361 Ok(now >= record.next_run_at)
362 }
363 }
364 None => Ok(true),
366 }
367 }
368
369 async fn read_task(
371 &self,
372 ctx: &SessionContext,
373 task_name: &str,
374 ) -> Result<Option<TaskRecord>, TraceEngineError> {
375 let table_exists = ctx.table_exist(CONTROL_TABLE_NAME)?;
376 if !table_exists {
377 return Ok(None);
378 }
379
380 let df = ctx
381 .table(CONTROL_TABLE_NAME)
382 .await
383 .map_err(TraceEngineError::DatafusionError)?;
384
385 let df = df
386 .filter(col("task_name").eq(lit(task_name)))
387 .map_err(TraceEngineError::DatafusionError)?;
388
389 let batches = df
390 .collect()
391 .await
392 .map_err(TraceEngineError::DatafusionError)?;
393
394 for batch in &batches {
398 if batch.num_rows() == 0 {
399 continue;
400 }
401
402 let get_string = |col_name: &str| -> String {
403 let col = batch.column_by_name(col_name).unwrap();
404 let casted =
405 arrow::compute::cast(col, &DataType::Utf8).expect("cast to Utf8 failed");
406 let arr = casted.as_any().downcast_ref::<StringArray>().unwrap();
407 arr.value(0).to_string()
408 };
409
410 let get_timestamp = |col_name: &str| -> Option<DateTime<Utc>> {
411 let col = batch.column_by_name(col_name).unwrap();
412 if col.is_null(0) {
413 return None;
414 }
415 let arr = col
416 .as_any()
417 .downcast_ref::<TimestampMicrosecondArray>()
418 .unwrap();
419 DateTime::from_timestamp_micros(arr.value(0))
420 };
421
422 let task_name_val = get_string("task_name");
423 let status_val = get_string("status");
424 let pod_id_val = get_string("pod_id");
425 let claimed_at = get_timestamp("claimed_at").unwrap_or_else(Utc::now);
426 let completed_at = get_timestamp("completed_at");
427 let next_run_at = get_timestamp("next_run_at").unwrap_or_else(Utc::now);
428
429 return Ok(Some(TaskRecord {
430 task_name: task_name_val,
431 status: status_val,
432 pod_id: pod_id_val,
433 claimed_at,
434 completed_at,
435 next_run_at,
436 }));
437 }
438
439 Ok(None)
440 }
441
442 async fn write_task_update(
448 &self,
449 table_guard: &mut DeltaTable,
450 record: &TaskRecord,
451 ) -> Result<(), TraceEngineError> {
452 let batch = build_task_batch(&self.schema, record)?;
453
454 let predicate = format!("task_name = '{}'", record.task_name);
457 let delete_result = table_guard.clone().delete().with_predicate(predicate).await;
458
459 match delete_result {
460 Ok((updated_table, _metrics)) => {
461 let updated_table = updated_table
463 .write(vec![batch])
464 .with_save_mode(deltalake::protocol::SaveMode::Append)
465 .await?;
466
467 let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
468 if let Ok(provider) = updated_table.table_provider().await {
469 self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
470 }
471 *table_guard = updated_table;
472 }
473 Err(_) => {
474 let updated_table = table_guard
476 .clone()
477 .write(vec![batch])
478 .with_save_mode(deltalake::protocol::SaveMode::Append)
479 .await?;
480
481 let _ = self.ctx.deregister_table(CONTROL_TABLE_NAME);
482 if let Ok(provider) = updated_table.table_provider().await {
483 self.ctx.register_table(CONTROL_TABLE_NAME, provider)?;
484 }
485 *table_guard = updated_table;
486 }
487 }
488
489 Ok(())
490 }
491}
492
493fn table_guard_to_ctx(ctx: &Arc<SessionContext>) -> SessionContext {
495 ctx.as_ref().clone()
496}
497
498async fn build_or_create_control_table(
500 object_store: &ObjectStore,
501 schema: SchemaRef,
502) -> Result<DeltaTable, TraceEngineError> {
503 register_cloud_logstore_factories();
506
507 let base_url = object_store.get_base_url()?;
508 let control_url = append_path_to_url(&base_url, CONTROL_TABLE_NAME)?;
509
510 info!(
511 "Loading control table [{}://.../{} ]",
512 control_url.scheme(),
513 control_url
514 .path_segments()
515 .and_then(|mut s| s.next_back())
516 .unwrap_or(CONTROL_TABLE_NAME)
517 );
518
519 let store = object_store.as_dyn_object_store();
520
521 let is_delta_table = if control_url.scheme() == "file" {
522 if let Ok(path) = control_url.to_file_path() {
523 if !path.exists() {
524 info!("Creating directory for control table: {:?}", path);
525 std::fs::create_dir_all(&path)?;
526 }
527 path.join("_delta_log").exists()
528 } else {
529 false
530 }
531 } else {
532 match DeltaTableBuilder::from_url(control_url.clone()) {
533 Ok(builder) => builder
534 .with_storage_backend(store.clone(), control_url.clone())
535 .load()
536 .await
537 .is_ok(),
538 Err(_) => false,
539 }
540 };
541
542 if is_delta_table {
543 info!(
544 "Loaded existing control table [{}://.../{} ]",
545 control_url.scheme(),
546 control_url
547 .path_segments()
548 .and_then(|mut s| s.next_back())
549 .unwrap_or(CONTROL_TABLE_NAME)
550 );
551 let table = DeltaTableBuilder::from_url(control_url.clone())?
552 .with_storage_backend(store, control_url)
553 .load()
554 .await?;
555 Ok(table)
556 } else {
557 info!("Creating new control table");
558 let table = DeltaTableBuilder::from_url(control_url.clone())?
559 .with_storage_backend(store, control_url)
560 .build()?;
561
562 let delta_fields = arrow_schema_to_delta(&schema);
563
564 table
565 .create()
566 .with_table_name(CONTROL_TABLE_NAME)
567 .with_columns(delta_fields)
568 .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
569 .await
570 .map_err(Into::into)
571 }
572}
573
574fn append_path_to_url(base: &Url, segment: &str) -> Result<Url, TraceEngineError> {
576 let mut url = base.clone();
577 if !url.path().ends_with('/') {
579 url.set_path(&format!("{}/", url.path()));
580 }
581 url = url.join(segment)?;
582 Ok(url)
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use scouter_settings::ObjectStorageSettings;
    use std::sync::{Mutex, MutexGuard};

    /// Every test uses the same default storage root and `cleanup()` removes
    /// it wholesale. Cargo runs tests in parallel by default, so without
    /// serialization one test can delete the control table out from under
    /// another. This lock forces the tests to run one at a time.
    static TEST_GUARD: Mutex<()> = Mutex::new(());

    /// Acquires the serialization lock, recovering from poisoning — a panic
    /// in an earlier test does not invalidate anything because each test
    /// recreates the storage directory from scratch.
    fn serialize_tests() -> MutexGuard<'static, ()> {
        TEST_GUARD.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Removes the local storage root so each test starts from a clean slate.
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    #[tokio::test]
    async fn test_control_table_init() -> Result<(), TraceEngineError> {
        let _guard = serialize_tests();
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        let due = engine.is_task_due("optimize").await?;
        assert!(due, "New task should be due (never run before)");

        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_claim_and_release() -> Result<(), TraceEngineError> {
        let _guard = serialize_tests();
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        let claimed = engine.try_claim_task("optimize").await?;
        assert!(claimed, "First claim should succeed");

        let claimed_again = engine.try_claim_task("optimize").await?;
        assert!(
            !claimed_again,
            "Second claim should fail (already processing)"
        );

        engine.release_task("optimize", Duration::hours(1)).await?;

        let due = engine.is_task_due("optimize").await?;
        assert!(!due, "Task should not be due yet");

        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_claim_release_then_due() -> Result<(), TraceEngineError> {
        let _guard = serialize_tests();
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        let claimed = engine.try_claim_task("vacuum").await?;
        assert!(claimed);

        engine.release_task("vacuum", Duration::seconds(0)).await?;

        let due = engine.is_task_due("vacuum").await?;
        assert!(due, "Task should be due after 0-second interval");

        let claimed = engine.try_claim_task("vacuum").await?;
        assert!(claimed, "Task should be claimable after release");

        engine.release_task_on_failure("vacuum").await?;

        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_multiple_tasks() -> Result<(), TraceEngineError> {
        let _guard = serialize_tests();
        cleanup();

        let settings = ObjectStorageSettings::default();
        let engine = ControlTableEngine::new(&settings, "pod-1".to_string()).await?;

        let claimed_opt = engine.try_claim_task("optimize").await?;
        let claimed_vac = engine.try_claim_task("vacuum").await?;
        assert!(claimed_opt, "Optimize claim should succeed");
        assert!(claimed_vac, "Vacuum claim should succeed");

        engine.release_task("optimize", Duration::hours(24)).await?;
        engine.release_task("vacuum", Duration::hours(168)).await?;

        cleanup();
        Ok(())
    }
}