use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use common::peer::{SyncJob, SyncProvider};
/// Configuration for [`QueuedSyncProvider`].
#[derive(Debug, Clone)]
pub struct QueuedSyncConfig {
    /// Maximum number of jobs the queue will buffer; `None` selects an
    /// unbounded channel (see `QueuedSyncProvider::new`).
    pub max_queue_size: Option<usize>,
}
impl Default for QueuedSyncConfig {
fn default() -> Self {
Self {
max_queue_size: Some(1000),
}
}
}
/// A sync provider that enqueues jobs onto a channel for deferred execution
/// by a background worker, rather than running them inline.
///
/// Cloning is cheap: clones share the same underlying channel sender.
#[derive(Debug, Clone)]
pub struct QueuedSyncProvider {
    // Sending half of the job channel; the receiving half is handed out as a
    // `JobReceiver` by `QueuedSyncProvider::new`.
    tx: flume::Sender<SyncJob>,
}
impl QueuedSyncProvider {
    /// Creates a provider together with the receiving half of its job queue.
    ///
    /// The channel is bounded when `config.max_queue_size` is `Some(n)` and
    /// unbounded otherwise; the choice is logged at info level.
    pub fn new(config: QueuedSyncConfig) -> (Self, JobReceiver) {
        let (tx, rx) = if let Some(size) = config.max_queue_size {
            tracing::info!("Creating bounded job queue with size {}", size);
            flume::bounded(size)
        } else {
            tracing::info!("Creating unbounded job queue");
            flume::unbounded()
        };
        (Self { tx }, JobReceiver { rx })
    }
}
#[async_trait]
impl<L> SyncProvider<L> for QueuedSyncProvider
where
    L: common::bucket_log::BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    /// Hands `job` off to the background worker without blocking.
    ///
    /// Returns an error when the queue is full (bounded channel at capacity)
    /// or when the worker's receiving half has been dropped.
    async fn execute(&self, _peer: &common::peer::Peer<L>, job: SyncJob) -> Result<()> {
        tracing::debug!("Queueing job for background execution: {:?}", job);
        match self.tx.try_send(job) {
            Ok(()) => Ok(()),
            Err(flume::TrySendError::Full(_)) => {
                Err(anyhow::anyhow!("job queue is full - worker may be overloaded"))
            }
            Err(flume::TrySendError::Disconnected(_)) => {
                Err(anyhow::anyhow!("job worker has been stopped"))
            }
        }
    }
}
/// Receiving half of the job queue created by `QueuedSyncProvider::new`.
#[derive(Debug)]
pub struct JobReceiver {
    rx: flume::Receiver<SyncJob>,
}
impl JobReceiver {
    /// Consumes the receiver, returning an async `Stream` of queued jobs
    /// (the shape `run_worker` consumes).
    pub fn into_async(self) -> flume::r#async::RecvStream<'static, SyncJob> {
        self.rx.into_stream()
    }
}
/// Maximum number of ping jobs allowed to run concurrently in the worker.
const MAX_CONCURRENT_PINGS: usize = 10;
/// Interval between periodic ping-scheduler runs (300 s = 5 minutes).
const PERIODIC_PING_INTERVAL_SECS: u64 = 300;
/// Drives the background job worker for `peer` until the job queue closes.
///
/// Jobs are pulled from `job_stream`: `PingPeer` jobs are spawned onto
/// separate tasks (bounded by `MAX_CONCURRENT_PINGS`), while every other job
/// runs inline so jobs execute in arrival order. A periodic timer also
/// schedules a best-effort ping sweep across all syncable buckets.
pub async fn run_worker<L>(
    peer: common::peer::Peer<L>,
    mut job_stream: flume::r#async::RecvStream<'static, SyncJob>,
) where
    L: common::bucket_log::BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    use common::peer::sync::{execute_job, ping_peer};
    use futures::StreamExt;
    use tokio::time::{interval, Duration};

    tracing::info!("Starting background job worker for peer {}", peer.id());

    let mut ping_interval = interval(Duration::from_secs(PERIODIC_PING_INTERVAL_SECS));
    // A tokio interval's first tick completes immediately; consume it so the
    // first periodic sweep happens a full interval from startup.
    ping_interval.tick().await;

    // Caps the number of ping jobs running at once.
    let ping_semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_PINGS));
    // True while a periodic-ping batch is still in flight, so overlapping
    // batches are skipped rather than stacked up.
    let pings_in_flight = Arc::new(AtomicBool::new(false));

    loop {
        tokio::select! {
            // BUG FIX: the original bound this branch as `Some(job) = ...` and
            // relied on select!'s `else` arm for shutdown. The interval branch
            // can never be disabled, so `else` was unreachable and the worker
            // would spin on ticks forever after the queue closed. Matching the
            // Option explicitly makes shutdown reachable.
            maybe_job = job_stream.next() => {
                let Some(job) = maybe_job else {
                    tracing::info!("Job queue closed, shutting down worker");
                    break;
                };
                match job {
                    SyncJob::PingPeer(ping_job) => {
                        // Pings run off the main loop so a slow peer cannot
                        // stall other jobs; the semaphore bounds concurrency.
                        let peer = peer.clone();
                        let semaphore = ping_semaphore.clone();
                        tokio::spawn(async move {
                            // acquire() only errors if the semaphore is closed,
                            // which never happens here; skip the ping if it does
                            // rather than running unthrottled.
                            let Ok(_permit) = semaphore.acquire().await else {
                                return;
                            };
                            if let Err(e) = ping_peer::execute(&peer, ping_job).await {
                                tracing::error!("Ping job failed: {}", e);
                            }
                        });
                    }
                    job => {
                        // All other job kinds execute inline, preserving order.
                        if let Err(e) = execute_job(&peer, job).await {
                            tracing::error!("Job execution failed: {}", e);
                        }
                    }
                }
            }
            _ = ping_interval.tick() => {
                if pings_in_flight.load(Ordering::Relaxed) {
                    tracing::debug!("Skipping periodic pings — previous batch still running");
                    continue;
                }
                tracing::info!("Running periodic ping scheduler");
                let peer = peer.clone();
                let flag = pings_in_flight.clone();
                flag.store(true, Ordering::Relaxed);
                tokio::spawn(async move {
                    schedule_periodic_pings(&peer).await;
                    // NOTE(review): if schedule_periodic_pings panics, the flag
                    // stays set and periodic pings stop — acceptable here since
                    // it only logs and awaits; revisit if it grows panic paths.
                    flag.store(false, Ordering::Relaxed);
                });
            }
        }
    }

    tracing::info!("Background job worker shutting down for peer {}", peer.id());
}
/// Performs one best-effort ping sweep: pings peers for every syncable
/// bucket known to `peer`, logging (not propagating) any failures.
async fn schedule_periodic_pings<L>(peer: &common::peer::Peer<L>)
where
    L: common::bucket_log::BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    let listing = peer.logs().list_syncable_buckets().await;
    let bucket_ids = match listing {
        Err(e) => {
            tracing::error!("Failed to list buckets for periodic pings: {}", e);
            return;
        }
        Ok(ids) => ids,
    };

    tracing::debug!("Scheduling periodic pings for {} buckets", bucket_ids.len());

    for bucket_id in bucket_ids {
        // Failures are per-bucket and non-fatal; log and move on.
        let outcome = peer.ping(bucket_id).await;
        if let Err(e) = outcome {
            tracing::warn!("Failed to ping peers for bucket {}: {}", bucket_id, e);
        }
    }
}