use std::time::Duration;
use crate::error::Error;
const MAX_ATTEMPTS: usize = 5;
const BACKOFFS: [Duration; MAX_ATTEMPTS - 1] = [
Duration::from_millis(50),
Duration::from_millis(100),
Duration::from_millis(200),
Duration::from_millis(400),
];
pub async fn retry_on_schema_change<F, Fut, T>(mut op: F) -> Result<T, Error>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, Error>>,
{
let mut last_err: Option<Error> = None;
for attempt in 0..MAX_ATTEMPTS {
match op().await {
Ok(value) => return Ok(value),
Err(Error::RetryableSchemaChanged { descriptor }) => {
tracing::debug!(
attempt,
descriptor = %descriptor,
"pgwire: retrying plan after schema change"
);
last_err = Some(Error::RetryableSchemaChanged { descriptor });
if let Some(backoff) = BACKOFFS.get(attempt) {
tokio::time::sleep(*backoff).await;
}
}
Err(other) => return Err(other),
}
}
Err(last_err.unwrap_or_else(|| Error::PlanError {
detail: "retry_on_schema_change: no attempts recorded".into(),
}))
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[tokio::test]
async fn first_attempt_success() {
let calls = AtomicUsize::new(0);
let result: Result<i32, Error> = retry_on_schema_change(|| {
let c = calls.fetch_add(1, Ordering::SeqCst);
async move { Ok(c as i32) }
})
.await;
assert_eq!(result.unwrap(), 0);
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn retries_on_schema_change_then_succeeds() {
let calls = AtomicUsize::new(0);
let result: Result<&str, Error> = retry_on_schema_change(|| {
let n = calls.fetch_add(1, Ordering::SeqCst);
async move {
if n < 2 {
Err(Error::RetryableSchemaChanged {
descriptor: format!("attempt {n}"),
})
} else {
Ok("done")
}
}
})
.await;
assert_eq!(result.unwrap(), "done");
assert_eq!(calls.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn surfaces_error_after_budget_exhausted() {
let calls = AtomicUsize::new(0);
let result: Result<(), Error> = retry_on_schema_change(|| {
calls.fetch_add(1, Ordering::SeqCst);
async move {
Err(Error::RetryableSchemaChanged {
descriptor: "orders".into(),
})
}
})
.await;
assert!(matches!(result, Err(Error::RetryableSchemaChanged { .. })));
assert_eq!(calls.load(Ordering::SeqCst), MAX_ATTEMPTS);
}
#[tokio::test]
async fn non_retryable_error_surfaces_immediately() {
let calls = AtomicUsize::new(0);
let result: Result<(), Error> = retry_on_schema_change(|| {
calls.fetch_add(1, Ordering::SeqCst);
async move {
Err(Error::PlanError {
detail: "syntax error".into(),
})
}
})
.await;
assert!(matches!(result, Err(Error::PlanError { .. })));
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
}