use crate::error::AntibotError;
use crate::types::{Solution, SolutionSource};
use dashmap::DashMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Notify;
/// Strategy used to derive the key under which concurrent solves are coalesced.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CoalesceKey {
    /// Coalesce on the domain extracted from the request URL.
    Domain,
    /// Coalesce on the full URL string.
    Url,
}
/// Outcome a leader publishes for the callers waiting on it: the solution on
/// success, or the leader error rendered with `to_string()` on failure. Both
/// sides are wrapped in `Arc`, so cloning the result only bumps a refcount.
type SharedResult = Result<Arc<Solution>, Arc<String>>;
/// Bookkeeping for one solve that is currently running on behalf of a key.
struct InflightSolve {
/// Waiting callers await `notified()` on this; the leader calls
/// `notify_waiters()` once it has finished.
notify: Arc<Notify>,
/// Starts as `None`; the leader stores its outcome here before notifying.
result: Arc<parking_lot_mutex::Mutex<Option<SharedResult>>>,
}
/// Deduplicates concurrent solves that share a key: one caller runs the solver
/// and the others wait for its published result.
///
/// The in-flight map sits behind an `Arc`, so clones share the same map.
#[derive(Clone)]
pub(crate) struct SolveCoalescer {
/// Solves currently in progress, indexed by coalescing key.
inflight: Arc<DashMap<String, InflightSolve>>,
/// Selects how `key_for` turns a URL into a coalescing key.
key_strategy: CoalesceKey,
}
impl SolveCoalescer {
pub fn new(key_strategy: CoalesceKey) -> Self {
Self {
inflight: Arc::new(DashMap::new()),
key_strategy,
}
}
pub fn key_for(&self, url: &str) -> Option<String> {
match self.key_strategy {
CoalesceKey::Domain => crate::session_cache::extract_domain(url),
CoalesceKey::Url => Some(url.to_string()),
}
}
pub async fn solve_or_wait<F, Fut>(
&self,
key: String,
solver: F,
) -> Result<Solution, AntibotError>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<Solution, AntibotError>>,
{
if let Some(existing) = self.inflight.get(&key) {
let notify = existing.notify.clone();
let result_holder = existing.result.clone();
drop(existing);
notify.notified().await;
let snapshot = result_holder.lock().clone();
return match snapshot {
Some(Ok(arc_sol)) => Ok(stamp_cached((*arc_sol).clone())),
Some(Err(arc_msg)) => Err(AntibotError::CoalescedFailure((*arc_msg).clone())),
None => Err(AntibotError::CoalescedFailure(
"leader vanished without producing a result".to_string(),
)),
};
}
let notify = Arc::new(Notify::new());
let result_holder = Arc::new(parking_lot_mutex::Mutex::new(None));
let entry = self.inflight.entry(key.clone()).or_insert_with(|| InflightSolve {
notify: notify.clone(),
result: result_holder.clone(),
});
let we_are_leader = Arc::ptr_eq(&entry.notify, ¬ify);
let actual_notify = entry.notify.clone();
let actual_result = entry.result.clone();
drop(entry);
if !we_are_leader {
actual_notify.notified().await;
let snapshot = actual_result.lock().clone();
return match snapshot {
Some(Ok(arc_sol)) => Ok(stamp_cached((*arc_sol).clone())),
Some(Err(arc_msg)) => Err(AntibotError::CoalescedFailure((*arc_msg).clone())),
None => Err(AntibotError::CoalescedFailure(
"leader vanished without producing a result".to_string(),
)),
};
}
let outcome = solver().await;
let shared: SharedResult = match &outcome {
Ok(sol) => Ok(Arc::new(sol.clone())),
Err(e) => Err(Arc::new(e.to_string())),
};
*result_holder.lock() = Some(shared);
self.inflight.remove(&key);
notify.notify_waiters();
outcome
}
}
/// Marks `sol` as served from a shared result rather than solved by this caller.
///
/// The `source` field is overwritten with `SolutionSource::Cached`, whose `age`
/// is the time elapsed since `sol.solved_at`. If that elapsed time cannot be
/// computed, the default (zero) age is used.
fn stamp_cached(mut sol: Solution) -> Solution {
    sol.source = SolutionSource::Cached {
        age: sol.solved_at.elapsed().unwrap_or_default(),
    };
    sol
}
/// Minimal mutex wrapper over `std::sync::Mutex` whose `lock` never fails.
mod parking_lot_mutex {
    use std::sync::{Mutex as StdMutex, MutexGuard};

    /// Mutex that ignores poisoning, so `lock` hands back a guard directly.
    pub struct Mutex<T>(StdMutex<T>);

    impl<T> Mutex<T> {
        /// Wraps `value` in a new, unlocked mutex.
        pub fn new(value: T) -> Self {
            Mutex(StdMutex::new(value))
        }

        /// Blocks until the lock is acquired and returns its guard.
        ///
        /// If a previous holder panicked, the poison flag is disregarded and
        /// the guard to the inner value is returned anyway.
        pub fn lock(&self) -> MutexGuard<'_, T> {
            match self.0.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            }
        }
    }
}