use std::collections::BTreeMap;
use std::time::Duration;
use tokio::time::sleep;
use nodedb_cluster::calvin::sequencer::inbox::Inbox;
use nodedb_cluster::calvin::types::TxClass;
use nodedb_types::TenantId;
use super::circuit_breaker::{CircuitBreaker, CircuitState};
use super::config::OllpConfig;
use super::error::OllpError;
use super::metrics::{
CIRCUIT_CLOSED, CIRCUIT_HALF_OPEN, CIRCUIT_OPEN, OUTCOME_CIRCUIT_OPEN, OUTCOME_RETRIED,
OUTCOME_SUCCEEDED, OllpMetrics,
};
use super::rate_bucket::RateBucket;
use nodedb_cluster::error::CalvinError;
/// Numeric code identifying the "OLLP retry required" condition.
///
/// NOTE(review): the code visible in this file never reads this constant; it is presumably
/// matched by callers that decide when to invoke `OllpOrchestrator::on_retry_required` —
/// confirm against those call sites.
pub const OLLP_RETRY_REQUIRED_CODE: u16 = 0xCAAD;
/// Circuit breakers keyed by predicate class, guarded by a std mutex.
type BreakerMap = std::sync::Mutex<BTreeMap<u64, CircuitBreaker>>;

/// Rate buckets keyed by the tenant id's `u64` form, guarded by a std mutex.
type BudgetMap = std::sync::Mutex<BTreeMap<u64, RateBucket>>;

/// Orchestrates OLLP transaction submission.
///
/// Bundles the configuration, the metrics sink, and two lazily populated maps: one circuit
/// breaker per predicate class and one rate bucket per tenant.
pub struct OllpOrchestrator {
    /// Tunables for the circuit breakers, tenant budgets, backoff and retry limit.
    config: OllpConfig,
    /// Metrics recorded by the orchestrator and rendered by `render_prometheus`.
    metrics: OllpMetrics,
    /// One circuit breaker per predicate class, created on first use.
    circuit_breakers: BreakerMap,
    /// One rate bucket per tenant, created on first use.
    tenant_budgets: BudgetMap,
}
impl OllpOrchestrator {
pub fn new(config: OllpConfig) -> Self {
Self {
config,
metrics: OllpMetrics::default(),
circuit_breakers: std::sync::Mutex::new(BTreeMap::new()),
tenant_budgets: std::sync::Mutex::new(BTreeMap::new()),
}
}
pub async fn submit_with_retry(
&self,
inbox: &Inbox,
predicate_class: u64,
tenant_id: TenantId,
tx_builder: impl Fn() -> Result<TxClass, CalvinError>,
) -> Result<u64, OllpError> {
let circuit_state = {
let mut breakers = self
.circuit_breakers
.lock()
.unwrap_or_else(|p| p.into_inner());
let cb = breakers.entry(predicate_class).or_insert_with(|| {
CircuitBreaker::new(
self.config.circuit_capacity,
self.config.circuit_window,
self.config.circuit_threshold_pct,
self.config.circuit_open_duration,
self.config.circuit_close_successes,
)
});
let state = cb.state();
let state_code = match state {
CircuitState::Closed => CIRCUIT_CLOSED,
CircuitState::HalfOpen => CIRCUIT_HALF_OPEN,
CircuitState::Open => CIRCUIT_OPEN,
};
self.metrics
.record_circuit_state(predicate_class, state_code);
state
};
if circuit_state == CircuitState::Open {
self.metrics
.record_retry_outcome(predicate_class, OUTCOME_CIRCUIT_OPEN);
return Err(OllpError::CircuitOpen { predicate_class });
}
let tx_class = tx_builder().map_err(|e| match e {
CalvinError::Sequencer(s) => OllpError::Sequencer(s),
_other => OllpError::Sequencer(
nodedb_cluster::calvin::sequencer::error::SequencerError::Unavailable,
),
})?;
{
let mut budgets = self
.tenant_budgets
.lock()
.unwrap_or_else(|p| p.into_inner());
budgets.entry(tenant_id.as_u64()).or_insert_with(|| {
RateBucket::new(
self.config.tenant_budget_per_minute,
Duration::from_secs(60),
)
});
}
match inbox.submit(tx_class) {
Ok(inbox_seq) => {
{
let mut breakers = self
.circuit_breakers
.lock()
.unwrap_or_else(|p| p.into_inner());
if let Some(cb) = breakers.get_mut(&predicate_class) {
cb.record_success();
}
}
self.metrics
.record_retry_outcome(predicate_class, OUTCOME_SUCCEEDED);
Ok(inbox_seq)
}
Err(e) => Err(OllpError::Sequencer(e)),
}
}
pub fn ollp_max_retries(&self) -> u8 {
self.config.ollp_max_retries.min(u8::MAX as u32) as u8
}
pub fn render_prometheus(&self) -> String {
self.metrics.render_prometheus()
}
pub async fn on_retry_required(&self, predicate_class: u64, retry_count: u32) {
{
let mut breakers = self
.circuit_breakers
.lock()
.unwrap_or_else(|p| p.into_inner());
let cb = breakers.entry(predicate_class).or_insert_with(|| {
CircuitBreaker::new(
self.config.circuit_capacity,
self.config.circuit_window,
self.config.circuit_threshold_pct,
self.config.circuit_open_duration,
self.config.circuit_close_successes,
)
});
cb.record_retry();
}
let backoff = {
let shift = retry_count.min(31);
let factor = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
let multiplied = self.config.backoff_initial.saturating_mul(factor);
multiplied.min(self.config.backoff_max)
};
let backoff_ms = backoff.as_millis() as u64;
self.metrics.record_backoff_ms(predicate_class, backoff_ms);
self.metrics
.record_retry_outcome(predicate_class, OUTCOME_RETRIED);
sleep(backoff).await;
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Configuration shared by the backoff tests: 10 ms initial delay, 5 s ceiling.
    fn backoff_config() -> OllpConfig {
        OllpConfig {
            backoff_initial: Duration::from_millis(10),
            backoff_max: Duration::from_secs(5),
            ..OllpConfig::default()
        }
    }

    /// Evaluates the backoff formula locally for a given shift.
    ///
    /// These tests check the formula itself; they do not call `on_retry_required`.
    fn backoff_for_shift(config: &OllpConfig, shift: u32) -> Duration {
        let factor = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
        config
            .backoff_initial
            .saturating_mul(factor)
            .min(config.backoff_max)
    }

    #[test]
    fn adaptive_backoff_doubles_per_retry() {
        let config = backoff_config();
        let cases = [(0u32, 10u64), (1, 20), (2, 40)];
        for (shift, expected_ms) in cases {
            assert_eq!(
                backoff_for_shift(&config, shift),
                Duration::from_millis(expected_ms)
            );
        }
    }

    #[test]
    fn adaptive_backoff_caps_at_5s() {
        let config = backoff_config();
        // 10 ms * 2^20 is far above the ceiling, so the cap must apply.
        assert_eq!(backoff_for_shift(&config, 20), Duration::from_secs(5));
    }

    #[test]
    fn tenant_budget_exceeded_at_1000_per_min() {
        let mut bucket = RateBucket::new(1000, Duration::from_secs(60));
        // The first 1000 calls stay within budget.
        for call in 1..=1000 {
            assert!(
                !bucket.record_and_check(),
                "call {} should still be within budget",
                call
            );
        }
        // Call 1001 is the first one reported as over budget.
        assert!(bucket.record_and_check());
    }
}