use std::time::Duration;
use rand::seq::SliceRandom;
use tokio_util::sync::CancellationToken;
use crate::backfill::{
BackfillError,
checkpoint::{Checkpoint, NoopCheckpoint},
};
/// Options for constructing a [`BackfillEngine`] via [`BackfillEngine::new`].
///
/// Only `sync_host` must be supplied; every `Option` field left as `None` is
/// resolved to a default by [`BackfillEngine::new`].
#[derive(Default)]
pub struct BackfillConfig {
    /// Base URL of the sync host (the tests use `https://bsky.network`).
    pub sync_host: String,
    /// Requested worker count; `None` selects the engine default.
    pub workers: Option<usize>,
    /// Requested batch size; `None` selects the engine default.
    pub batch_size: Option<usize>,
    /// Checkpoint store used to load/save the cursor; `None` falls back to
    /// `NoopCheckpoint`.
    pub checkpoint: Option<Box<dyn Checkpoint>>,
}

// `Debug` cannot be derived because `dyn Checkpoint` is not required to be
// `Debug`; the checkpoint is rendered as a placeholder so the rest of the
// config can still be logged.
impl std::fmt::Debug for BackfillConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BackfillConfig")
            .field("sync_host", &self.sync_host)
            .field("workers", &self.workers)
            .field("batch_size", &self.batch_size)
            .field(
                "checkpoint",
                &self.checkpoint.as_ref().map(|_| "<dyn Checkpoint>"),
            )
            .finish()
    }
}
/// Summary returned by [`BackfillEngine::run`].
///
/// Derives `Debug`/`PartialEq` so callers can log and compare results, and
/// `Default` for an all-zero value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BackfillStats {
    /// Number of repositories successfully downloaded.
    pub repos_downloaded: u64,
    /// Number of repositories whose download failed.
    pub repos_failed: u64,
    /// Wall-clock time spent inside `run`.
    pub elapsed: Duration,
}
/// Backfill driver with every configuration option already resolved.
///
/// Construct it with [`BackfillEngine::new`] from a [`BackfillConfig`].
pub struct BackfillEngine {
    /// Sync host copied verbatim from the config.
    // Not read anywhere in this module yet; kept for the download logic
    // that `run` does not implement so far.
    #[allow(dead_code)]
    sync_host: String,

    /// Resolved worker count.
    // Outside of the unit tests nothing reads this yet.
    #[allow(dead_code)]
    workers: usize,

    /// Resolved batch size.
    // Outside of the unit tests nothing reads this yet.
    #[allow(dead_code)]
    batch_size: usize,

    /// Store the cursor is loaded from.
    checkpoint: Box<dyn Checkpoint>,
}
impl BackfillEngine {
    /// Worker count used when `BackfillConfig::workers` is `None` or `Some(0)`.
    const DEFAULT_WORKERS: usize = 50;
    /// Batch size used when `BackfillConfig::batch_size` is `None` or `Some(0)`.
    const DEFAULT_BATCH_SIZE: usize = 100_000;

    /// Creates an engine from `config`, resolving unset options to defaults.
    ///
    /// - `workers` falls back to 50 and `batch_size` to 100 000 when unset.
    ///   An explicit `0` is treated as unset as well, since a zero worker
    ///   count or batch size is not a usable setting.
    /// - A missing `checkpoint` is replaced with [`NoopCheckpoint`].
    pub fn new(config: BackfillConfig) -> Self {
        BackfillEngine {
            sync_host: config.sync_host,
            workers: config
                .workers
                .filter(|&n| n > 0)
                .unwrap_or(Self::DEFAULT_WORKERS),
            batch_size: config
                .batch_size
                .filter(|&n| n > 0)
                .unwrap_or(Self::DEFAULT_BATCH_SIZE),
            checkpoint: config
                .checkpoint
                .unwrap_or_else(|| Box::new(NoopCheckpoint)),
        }
    }

    /// Runs the backfill until `cancel` is triggered, then reports the result.
    ///
    /// Current scaffold behaviour: the checkpoint cursor is loaded, then the
    /// call waits for cancellation without downloading anything. Both counters
    /// in the returned [`BackfillStats`] are therefore `0`, and only `elapsed`
    /// is measured.
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) any error returned by [`Checkpoint::load`].
    pub async fn run(&self, cancel: CancellationToken) -> Result<BackfillStats, BackfillError> {
        let start = tokio::time::Instant::now();
        // Not consumed yet; this is the point a real run would resume from.
        let _cursor = self.checkpoint.load().await?;
        cancel.cancelled().await;
        // Deliberately no `save` on shutdown. No work was done, so there is no
        // newer cursor. Writing a placeholder such as `""` would replace the
        // cursor loaded above, and the next run would lose its resume point.
        Ok(BackfillStats {
            repos_downloaded: 0,
            repos_failed: 0,
            elapsed: start.elapsed(),
        })
    }
}
pub fn shuffle_batch<T>(batch: &mut [T]) {
let mut rng = rand::rng();
batch.shuffle(&mut rng);
}
#[cfg(test)]
// Tests signal failure by panicking, so the crate's no-panic lints are
// relaxed for this module only.
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::unreachable
)]
mod tests {
    use super::*;

    const SYNC_HOST: &str = "https://bsky.network";

    /// Config pointing at `SYNC_HOST` with every optional field left unset.
    fn base_config() -> BackfillConfig {
        BackfillConfig {
            sync_host: SYNC_HOST.into(),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn engine_respects_cancellation() {
        let engine = BackfillEngine::new(base_config());
        let cancel = CancellationToken::new();
        let trigger = cancel.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            trigger.cancel();
        });
        // Bound the wait so a regression that ignores cancellation fails the
        // test instead of hanging the whole test run.
        let stats = tokio::time::timeout(Duration::from_secs(5), engine.run(cancel))
            .await
            .expect("run did not return within 5s of cancellation")
            .unwrap();
        assert!(stats.elapsed < Duration::from_secs(5));
    }

    #[test]
    fn shuffle_batch_preserves_elements() {
        let original: Vec<u32> = (0..100).collect();
        let mut batch = original.clone();
        shuffle_batch(&mut batch);
        batch.sort_unstable();
        assert_eq!(batch, original);
    }

    #[test]
    fn engine_resolves_defaults() {
        let engine = BackfillEngine::new(base_config());
        assert_eq!(engine.workers, 50);
        assert_eq!(engine.batch_size, 100_000);
    }

    #[test]
    fn engine_overrides() {
        let engine = BackfillEngine::new(BackfillConfig {
            workers: Some(10),
            batch_size: Some(500),
            ..base_config()
        });
        assert_eq!(engine.workers, 10);
        assert_eq!(engine.batch_size, 500);
    }
}