use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::{
Actor, ActorFutureExt, Addr, ArbiterHandle, AsyncContext, Context, Handler, Message, WrapFuture,
};
use tracing::{debug, info, warn};
use crate::handlers::state_delta::{handle_state_delta, StateDeltaContext, StateDeltaMessage};
/// Mailbox capacity intended for the state-delta actor.
///
/// Not referenced in this section; presumably callers pass it as the `capacity` argument of
/// `start_state_delta_actor` — TODO confirm.
pub const STATE_DELTA_CHANNEL_CAPACITY: usize = 2_048;
/// Upper bound on a single `handle_state_delta` call; once exceeded the delta is abandoned
/// and counted in the actor's timeout counter.
const STATE_DELTA_PROCESSING_TIMEOUT: Duration = Duration::new(60, 0);
/// Period between two counter-summary log events emitted by the actor.
const SUMMARY_INTERVAL: Duration = Duration::new(60, 0);
/// RAII guard that counts one in-flight job for as long as it is alive.
///
/// Constructing the guard increments the shared counter and dropping it decrements it, so the
/// count stays balanced even when the future that owns the guard is cancelled or dropped early.
///
/// `Relaxed` ordering is used because the counter is only read for diagnostic logging here and
/// does not order any other memory accesses.
#[must_use = "dropping the guard immediately releases the in-flight slot it just took"]
struct InFlightGuard {
    counter: Arc<AtomicU64>,
}

impl InFlightGuard {
    /// Increments `counter` and returns a guard that decrements it again on drop.
    fn new(counter: Arc<AtomicU64>) -> Self {
        let _prev = counter.fetch_add(1, Ordering::Relaxed);
        Self { counter }
    }
}

impl Drop for InFlightGuard {
    /// Releases the slot taken in [`InFlightGuard::new`].
    fn drop(&mut self) {
        let _prev = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
/// A single state delta queued for the [`StateDeltaActor`].
///
/// Delivered through the actor's mailbox (bounded via `set_mailbox_capacity` at startup). The
/// handler replies with `()`; processing results show up only in the actor's counters and logs.
#[derive(Message)]
#[rtype(result = "()")]
pub struct StateDeltaJob {
    /// Passed unchanged to `handle_state_delta`.
    pub(crate) context: StateDeltaContext,
    /// The delta to apply; its `context_id` and `delta_id` are also copied out for logging.
    pub(crate) message: StateDeltaMessage,
}
/// Cloneable handle for submitting [`StateDeltaJob`]s to the state-delta actor.
///
/// Every clone shares the same `dropped_total` counter. `start_state_delta_actor` also hands
/// that counter to the actor, which reports it in its periodic summary.
#[derive(Clone, Debug)]
pub struct StateDeltaSender {
    /// Address of the running actor.
    addr: Addr<StateDeltaActor>,
    /// Number of jobs rejected because the actor's mailbox was full.
    dropped_total: Arc<AtomicU64>,
}
/// Reason a [`StateDeltaJob`] could not be enqueued by `StateDeltaSender::try_send`.
///
/// In both cases the job is discarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateDeltaSendError {
    /// The actor's mailbox is at capacity; the rejection is counted as a drop.
    Full,
    /// The actor is no longer accepting messages.
    Closed,
}

impl std::fmt::Display for StateDeltaSendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let reason = match self {
            Self::Full => "state delta actor mailbox is full",
            Self::Closed => "state delta actor mailbox is closed",
        };
        f.write_str(reason)
    }
}

impl std::error::Error for StateDeltaSendError {}
impl StateDeltaSender {
    /// Enqueues `job` on the actor's mailbox without waiting for room.
    ///
    /// # Errors
    ///
    /// * [`StateDeltaSendError::Full`] — the mailbox is at capacity. The job is discarded and
    ///   the shared `dropped_total` counter is incremented.
    /// * [`StateDeltaSendError::Closed`] — the actor no longer accepts messages. The job is
    ///   discarded and no counter is touched.
    pub fn try_send(&self, job: StateDeltaJob) -> Result<(), StateDeltaSendError> {
        self.addr.try_send(job).map_err(|rejected| match rejected {
            actix::dev::SendError::Full(_) => {
                let _prev = self.dropped_total.fetch_add(1, Ordering::Relaxed);
                StateDeltaSendError::Full
            }
            actix::dev::SendError::Closed(_) => StateDeltaSendError::Closed,
        })
    }
}
/// Actor that runs `handle_state_delta` for each queued [`StateDeltaJob`] and keeps counters
/// about the outcomes.
///
/// The counters are `Arc`-wrapped so spawned worker futures can update them after `handle`
/// returns.
pub struct StateDeltaActor {
    /// Jobs whose worker future has not finished yet (maintained by `InFlightGuard`).
    in_flight: Arc<AtomicU64>,
    /// Jobs whose `handle_state_delta` call returned `Ok`.
    processed_total: Arc<AtomicU64>,
    /// Jobs whose `handle_state_delta` call returned `Err`.
    error_total: Arc<AtomicU64>,
    /// Jobs abandoned after `STATE_DELTA_PROCESSING_TIMEOUT`.
    timeout_total: Arc<AtomicU64>,
    /// Shared with `StateDeltaSender`; incremented there when the mailbox is full.
    dropped_total: Arc<AtomicU64>,
}
impl StateDeltaActor {
    /// Builds an actor whose own counters all start at zero.
    ///
    /// `dropped_total` is adopted as-is, so the sender that increments it and the actor that
    /// reports it observe the same value.
    fn new(dropped_total: Arc<AtomicU64>) -> Self {
        let zeroed = || Arc::new(AtomicU64::new(0));
        Self {
            in_flight: zeroed(),
            processed_total: zeroed(),
            error_total: zeroed(),
            timeout_total: zeroed(),
            dropped_total,
        }
    }

    /// Emits one `info` event carrying a snapshot of every counter.
    fn log_summary(&self) {
        let read = |counter: &Arc<AtomicU64>| counter.load(Ordering::Relaxed);
        info!(
            processed_total = read(&self.processed_total),
            error_total = read(&self.error_total),
            timeout_total = read(&self.timeout_total),
            dropped_total = read(&self.dropped_total),
            in_flight = read(&self.in_flight),
            "StateDelta actor summary"
        );
    }
}
impl Actor for StateDeltaActor {
    type Context = Context<Self>;

    /// Logs startup and schedules `log_summary` to run every `SUMMARY_INTERVAL`.
    fn started(&mut self, ctx: &mut Self::Context) {
        info!("StateDelta actor started on dedicated Arbiter");
        // The returned handle is never cancelled, so the timer lives as long as the context.
        let _summary_timer = ctx.run_interval(SUMMARY_INTERVAL, |this, _| this.log_summary());
    }

    /// Emits a final counter snapshot, then records the shutdown.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        self.log_summary();
        info!("StateDelta actor stopped");
    }
}
impl Handler<StateDeltaJob> for StateDeltaActor {
    type Result = ();

    /// Schedules one state delta for processing on this actor's context.
    ///
    /// The in-flight counter is incremented synchronously, before this method returns. The
    /// spawned future then performs these steps:
    ///
    /// 1. Runs `handle_state_delta`, bounded by `STATE_DELTA_PROCESSING_TIMEOUT`. On expiry,
    ///    `tokio::time::timeout` drops the inner future.
    /// 2. Bumps exactly one of the processed / error / timeout counters.
    /// 3. Releases the in-flight slot.
    /// 4. Logs the outcome.
    ///
    /// NOTE(review): `ctx.spawn` does not pause the mailbox (unlike `ctx.wait`). Jobs therefore
    /// run concurrently, and the mailbox capacity does not bound the number of spawned futures.
    /// Confirm this is intended.
    fn handle(&mut self, job: StateDeltaJob, ctx: &mut Self::Context) {
        let StateDeltaJob { context, message } = job;
        // Copied out before `message` moves into the worker so the outcome can be logged.
        let (context_id, delta_id) = (message.context_id, message.delta_id);

        let in_flight_guard = InFlightGuard::new(Arc::clone(&self.in_flight));
        let processed_total = Arc::clone(&self.processed_total);
        let error_total = Arc::clone(&self.error_total);
        let timeout_total = Arc::clone(&self.timeout_total);

        let work = async move {
            // Owned by the future, so the slot is released however the future ends.
            let _guard = in_flight_guard;
            let started = Instant::now();
            let outcome = tokio::time::timeout(
                STATE_DELTA_PROCESSING_TIMEOUT,
                handle_state_delta(context, message),
            )
            .await;
            let tally = match &outcome {
                Ok(Ok(())) => &processed_total,
                Ok(Err(_)) => &error_total,
                Err(_elapsed) => &timeout_total,
            };
            let _prev = tally.fetch_add(1, Ordering::Relaxed);
            (outcome, started)
        };

        let logged = work
            .into_actor(self)
            .map(move |(outcome, started), _act, _ctx| {
                match outcome {
                    Ok(Ok(())) => {
                        debug!(
                            %context_id,
                            ?delta_id,
                            elapsed_ms = started.elapsed().as_millis(),
                            "StateDelta worker completed"
                        );
                    }
                    Ok(Err(err)) => {
                        warn!(?err, %context_id, ?delta_id, "Failed to handle state delta");
                    }
                    Err(_elapsed) => {
                        warn!(
                            %context_id,
                            ?delta_id,
                            timeout_secs = STATE_DELTA_PROCESSING_TIMEOUT.as_secs(),
                            elapsed_ms = started.elapsed().as_millis(),
                            "StateDelta worker exceeded processing budget — dropping delta, sync recovery will retry (#2199)"
                        );
                    }
                }
            });
        let _spawn_handle = ctx.spawn(logged);
    }
}
/// Starts a [`StateDeltaActor`] on `arbiter` with its mailbox limited to `capacity` messages.
///
/// Returns a [`StateDeltaSender`] for submitting jobs. The sender and the actor share one
/// drop counter: the sender increments it, and the actor reports it in its summaries.
pub fn start_state_delta_actor(arbiter: &ArbiterHandle, capacity: usize) -> StateDeltaSender {
    let dropped_total = Arc::new(AtomicU64::new(0));
    let addr = {
        let shared_drops = Arc::clone(&dropped_total);
        StateDeltaActor::start_in_arbiter(arbiter, move |ctx| {
            ctx.set_mailbox_capacity(capacity);
            StateDeltaActor::new(shared_drops)
        })
    };
    StateDeltaSender { addr, dropped_total }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly started actor reports no drops, and cloning the sender shares the drop
    /// counter instead of copying it.
    #[actix::test]
    async fn sender_clones_and_starts_with_zero_drops() {
        let arbiter = actix::Arbiter::new();
        let sender = start_state_delta_actor(&arbiter.handle(), 8);
        assert_eq!(sender.dropped_total.load(Ordering::Relaxed), 0);
        let clone = sender.clone();
        // Drops from every producer must aggregate into the single counter the actor reports.
        assert!(Arc::ptr_eq(&sender.dropped_total, &clone.dropped_total));
        let _stopped = arbiter.stop();
    }
}