use crate::progress::{Job, JobStatus, JobType, ProgressStore};
use log::{error, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(feature = "ts-bindings")]
use ts_rs::TS;
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before 1970-01-01 — a broken
/// host-clock invariant, not a recoverable condition.
#[inline]
fn current_timestamp_secs() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("System time before Unix epoch");
    since_epoch.as_secs()
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Used where sub-second uniqueness matters (e.g. hash-input salting).
///
/// # Panics
/// Panics if the system clock reports a time before 1970-01-01.
#[inline]
fn current_timestamp_nanos() -> u128 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("System time before Unix epoch");
    since_epoch.as_nanos()
}
/// Lifecycle state of a single backfill run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[cfg_attr(feature = "ts-bindings", derive(TS))]
#[cfg_attr(
    feature = "ts-bindings",
    ts(
        export,
        export_to = "bindings/src/datafold_node/static-react/src/types/generated.ts"
    )
)]
pub enum BackfillStatus {
    /// Mutations are still being produced/applied; counters are advancing.
    InProgress,
    /// All expected mutations finished (or completion was forced).
    Completed,
    /// The run was aborted, e.g. due to an excessive failure rate.
    Failed,
}
/// Snapshot of a single backfill run: identity, lifecycle status, timing,
/// and mutation counters. Serialized into `Job::metadata` for persistence
/// and round-tripped back via `BackfillInfo::from_job`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "ts-bindings", derive(TS))]
#[cfg_attr(
    feature = "ts-bindings",
    ts(
        export,
        export_to = "bindings/src/datafold_node/static-react/src/types/generated.ts"
    )
)]
pub struct BackfillInfo {
    /// Unique id for this run (see `generate_backfill_hash`).
    pub backfill_hash: String,
    /// Transform this backfill belongs to.
    pub transform_id: String,
    /// Schema the transform reads from.
    pub schema_name: String,
    /// Current lifecycle state.
    pub status: BackfillStatus,
    /// Unix seconds when the run started.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub start_time: u64,
    /// Unix seconds when the run ended; `None` while still in progress.
    /// `#[ts(type = "...")]` replaces the entire generated type, so the
    /// override must spell out the nullability `Option` would otherwise add.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number | null"))]
    pub end_time: Option<u64>,
    /// Failure description when `status == Failed`.
    pub error: Option<String>,
    /// Records produced by the transform (set alongside mutations_expected).
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub records_produced: u64,
    /// Total mutations the run is expected to apply.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub mutations_expected: u64,
    /// Mutations applied successfully so far.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub mutations_completed: u64,
    /// Mutations that failed so far.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub mutations_failed: u64,
}
/// Aggregate counters summarizing all tracked backfills.
///
/// NOTE(review): no constructor/aggregation for this type is visible in this
/// file — presumably it is populated elsewhere; confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "ts-bindings", derive(TS))]
#[cfg_attr(
    feature = "ts-bindings",
    ts(
        export,
        export_to = "bindings/src/datafold_node/static-react/src/types/generated.ts"
    )
)]
pub struct BackfillStatistics {
    /// Number of tracked backfills in any state.
    pub total_backfills: usize,
    /// Backfills currently in progress.
    pub active_backfills: usize,
    /// Backfills that finished successfully.
    pub completed_backfills: usize,
    /// Backfills that were aborted.
    pub failed_backfills: usize,
    /// Sum of `mutations_expected` across all backfills.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub total_mutations_expected: u64,
    /// Sum of `mutations_completed` across all backfills.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub total_mutations_completed: u64,
    /// Sum of `mutations_failed` across all backfills.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub total_mutations_failed: u64,
    /// Sum of `records_produced` across all backfills.
    #[cfg_attr(feature = "ts-bindings", ts(type = "number"))]
    pub total_records_produced: u64,
}
impl BackfillInfo {
    /// Derives a unique backfill id from the transform, schema, and the
    /// current nanosecond timestamp (so repeated runs get distinct ids).
    fn generate_backfill_hash(transform_id: &str, schema_name: &str) -> String {
        let timestamp = current_timestamp_nanos();
        let input = format!("{}:{}:{}", transform_id, schema_name, timestamp);
        let hash = seahash::hash(input.as_bytes());
        format!("backfill_{:016x}", hash)
    }

    /// Creates a fresh in-progress record with all counters zeroed and
    /// `start_time` set to now.
    pub fn new_with_hash(backfill_hash: String, transform_id: String, schema_name: String) -> Self {
        Self {
            backfill_hash,
            transform_id,
            schema_name,
            status: BackfillStatus::InProgress,
            start_time: current_timestamp_secs(),
            end_time: None,
            error: None,
            records_produced: 0,
            mutations_expected: 0,
            mutations_completed: 0,
            mutations_failed: 0,
        }
    }

    /// Transitions to `Failed`, recording the error and the end time.
    pub fn mark_failed(&mut self, error: String) {
        self.status = BackfillStatus::Failed;
        self.error = Some(error);
        self.end_time = Some(current_timestamp_secs());
    }

    /// Transitions to `Completed`, recording the end time.
    pub fn mark_completed(&mut self) {
        self.status = BackfillStatus::Completed;
        self.end_time = Some(current_timestamp_secs());
    }

    /// Elapsed wall-clock seconds; uses "now" while still running.
    /// Saturates to 0 if the clock moved backwards past `start_time`.
    pub fn duration_seconds(&self) -> u64 {
        let end = self.end_time.unwrap_or_else(current_timestamp_secs);
        end.saturating_sub(self.start_time)
    }

    /// Projects this record into a generic progress-store `Job`, embedding
    /// the full record as JSON metadata so `from_job` can round-trip it.
    pub fn to_job(&self, user_id: Option<String>) -> Job {
        let mut job = Job::new(self.backfill_hash.clone(), JobType::Backfill);
        job.status = self.status.clone().into();
        if let Some(uid) = user_id {
            job.user_id = Some(uid);
        }
        job.error = self.error.clone();
        // Clamp so over-delivery (completed > expected) never reports >100%.
        job.progress_percentage = if self.mutations_expected > 0 {
            let pct = (self.mutations_completed as f64 / self.mutations_expected as f64) * 100.0;
            pct.min(100.0) as u8
        } else {
            0
        };
        if let Some(end) = self.end_time {
            job.completed_at = Some(end);
        }
        job.metadata = serde_json::to_value(self).unwrap_or(Value::Null);
        job
    }

    /// Inverse of `to_job`: recovers the record from a backfill job's
    /// metadata. Returns `None` for non-backfill jobs or unparsable metadata.
    pub fn from_job(job: &Job) -> Option<Self> {
        if job.job_type != JobType::Backfill {
            return None;
        }
        serde_json::from_value(job.metadata.clone()).ok()
    }
}
/// Maps the backfill-specific lifecycle onto the generic job status used by
/// the progress store (`InProgress` surfaces as a running job).
impl From<BackfillStatus> for JobStatus {
    fn from(status: BackfillStatus) -> Self {
        match status {
            BackfillStatus::InProgress => JobStatus::Running,
            BackfillStatus::Completed => JobStatus::Completed,
            BackfillStatus::Failed => JobStatus::Failed,
        }
    }
}
/// In-memory registry of backfill runs with optional write-through
/// persistence to a `ProgressStore`.
///
/// NOTE(review): each field is individually wrapped in `Arc`; if the tracker
/// itself is always shared behind an `Arc`, the inner `Arc`s may be
/// redundant — confirm against callers before simplifying.
pub struct BackfillTracker {
    // backfill_hash -> current run state.
    backfills: Arc<Mutex<HashMap<String, BackfillInfo>>>,
    // Reverse index: transform_id -> backfill_hash.
    transform_to_hash: Arc<Mutex<HashMap<String, String>>>,
    // `None` keeps tracking purely in memory (no persistence).
    progress_store: Option<Arc<dyn ProgressStore>>,
}
impl BackfillTracker {
pub fn new(progress_store: Option<Arc<dyn ProgressStore>>) -> Self {
Self {
backfills: Arc::new(Mutex::new(HashMap::new())),
transform_to_hash: Arc::new(Mutex::new(HashMap::new())),
progress_store,
}
}
pub async fn load_from_store(&self, user_id: Option<String>) {
if let Some(store) = &self.progress_store {
match store.list_by_user(user_id.as_deref().unwrap_or("global")).await {
Ok(jobs) => {
let mut backfills = self.backfills.lock().unwrap();
let mut transform_to_hash = self.transform_to_hash.lock().unwrap();
for job in jobs {
if let Some(info) = BackfillInfo::from_job(&job) {
backfills.insert(info.backfill_hash.clone(), info.clone());
transform_to_hash.insert(info.transform_id.clone(), info.backfill_hash.clone());
}
}
}
Err(e) => {
warn!("Failed to load backfills from store: {}", e);
}
}
}
}
pub async fn start_backfill_with_hash(
&self,
backfill_hash: String,
transform_id: String,
schema_name: String,
) {
let info =
BackfillInfo::new_with_hash(backfill_hash.clone(), transform_id.clone(), schema_name);
{
self.backfills
.lock()
.unwrap()
.insert(backfill_hash.clone(), info.clone());
self.transform_to_hash
.lock()
.unwrap()
.insert(transform_id, backfill_hash);
}
if let Some(store) = &self.progress_store {
let job = info.to_job(None); if let Err(e) = store.save(&job).await {
error!("Failed to persist backfill start: {}", e);
}
}
}
pub fn generate_hash(transform_id: &str, schema_name: &str) -> String {
BackfillInfo::generate_backfill_hash(transform_id, schema_name)
}
pub async fn set_mutations_expected(&self, backfill_hash: &str, count: u64) {
let mut info_clone = None;
{
let mut backfills = self.backfills.lock().unwrap();
if let Some(info) = backfills.get_mut(backfill_hash) {
let was_in_progress = info.status == BackfillStatus::InProgress;
info.mutations_expected = count;
info.records_produced = count;
if count == 0 && was_in_progress {
info.status = BackfillStatus::Completed;
info.end_time = Some(current_timestamp_secs());
}
info_clone = Some(info.clone());
} else {
warn!(
"Attempted to set_mutations_expected for non-existent backfill: {}",
backfill_hash
);
}
}
if let Some(info) = info_clone {
if let Some(store) = &self.progress_store {
let job = info.to_job(None);
if let Err(e) = store.save(&job).await {
error!("Failed to persist backfill update: {}", e);
}
}
}
}
pub async fn increment_mutation_completed(&self, backfill_hash: &str) -> bool {
let mut is_completed = false;
let mut info_clone = None;
{
let mut backfills = self.backfills.lock().unwrap();
if let Some(info) = backfills.get_mut(backfill_hash) {
info.mutations_completed += 1;
if info.mutations_completed >= info.mutations_expected
&& info.mutations_expected > 0
&& info.status == BackfillStatus::InProgress
{
info.status = BackfillStatus::Completed;
info.end_time = Some(current_timestamp_secs());
is_completed = true;
info_clone = Some(info.clone());
} else if info.mutations_completed % 100 == 0 {
info_clone = Some(info.clone());
}
}
}
if let Some(info) = info_clone {
if let Some(store) = &self.progress_store {
let job = info.to_job(None);
if let Err(e) = store.save(&job).await {
warn!("Failed to persist backfill progress: {}", e);
}
}
}
is_completed
}
pub async fn increment_mutation_failed(&self, backfill_hash: &str, error_msg: String) {
let mut info_clone = None;
{
let mut backfills = self.backfills.lock().unwrap();
if let Some(info) = backfills.get_mut(backfill_hash) {
info.mutations_failed += 1;
let total_processed = info.mutations_completed + info.mutations_failed;
let failure_rate = if total_processed > 0 {
info.mutations_failed as f64 / total_processed as f64
} else {
0.0
};
if failure_rate > 0.1 && total_processed > 10 {
info.status = BackfillStatus::Failed;
info.error = Some(format!(
"Backfill failed: {} mutations failed ({:.1}% failure rate). Last error: {}",
info.mutations_failed,
failure_rate * 100.0,
error_msg
));
info.end_time = Some(current_timestamp_secs());
info_clone = Some(info.clone());
}
}
}
if let Some(info) = info_clone {
if let Some(store) = &self.progress_store {
let job = info.to_job(None);
if let Err(e) = store.save(&job).await {
error!("Failed to persist backfill failure: {}", e);
}
}
}
}
pub async fn fail_backfill(&self, transform_id: &str, error_msg: String) {
let mut info_clone = None;
if let Some(hash) = self.transform_to_hash.lock().unwrap().get(transform_id) {
if let Some(info) = self.backfills.lock().unwrap().get_mut(hash) {
info.mark_failed(error_msg);
info_clone = Some(info.clone());
}
}
if let Some(info) = info_clone {
if let Some(store) = &self.progress_store {
let job = info.to_job(None);
if let Err(e) = store.save(&job).await {
error!("Failed to persist backfill failure: {}", e);
}
}
}
}
pub fn get_backfill(&self, transform_id: &str) -> Option<BackfillInfo> {
self.transform_to_hash
.lock()
.unwrap()
.get(transform_id)
.and_then(|hash| self.backfills.lock().unwrap().get(hash).cloned())
}
pub fn get_backfill_by_hash(&self, backfill_hash: &str) -> Option<BackfillInfo> {
self.backfills.lock().unwrap().get(backfill_hash).cloned()
}
pub async fn force_complete(&self, backfill_hash: &str) {
let mut info_clone = None;
{
let mut backfills = self.backfills.lock().unwrap();
if let Some(info) = backfills.get_mut(backfill_hash) {
if info.status == BackfillStatus::InProgress {
info.mark_completed();
info_clone = Some(info.clone());
}
}
}
if let Some(info) = info_clone {
if let Some(store) = &self.progress_store {
let job = info.to_job(None);
if let Err(e) = store.save(&job).await {
error!("Failed to persist backfill completion: {}", e);
}
}
}
}
pub fn get_all_backfills(&self) -> Vec<BackfillInfo> {
self.backfills.lock().unwrap().values().cloned().collect()
}
pub fn get_active_backfills(&self) -> Vec<BackfillInfo> {
self.backfills
.lock()
.unwrap()
.values()
.filter(|info| info.status == BackfillStatus::InProgress)
.cloned()
.collect()
}
pub fn cleanup_old_backfills(&self, keep_count: usize) {
let mut backfills = self.backfills.lock().unwrap();
let mut completed: Vec<_> = backfills
.iter()
.filter(|(_, info)| info.status == BackfillStatus::Completed)
.map(|(id, info)| (id.clone(), info.start_time))
.collect();
completed.sort_by_key(|(_, time)| *time);
if completed.len() > keep_count {
let to_remove = &completed[..completed.len() - keep_count];
for (id, _) in to_remove {
backfills.remove(id);
}
}
}
}
impl Default for BackfillTracker {
    /// An in-memory-only tracker (no persistent store).
    fn default() -> Self {
        Self::new(None)
    }
}