use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct PendingEntry {
pub id: String,
pub page: String,
pub r: u32,
pub message: String,
pub secret: bool,
pub token: String,
pub group: Option<String>,
pub created_at: Instant,
}
#[derive(Debug, Clone)]
pub enum FulfillState {
Pending,
Fulfilled(String),
Cancelled,
}
#[derive(Default)]
pub struct PendingQueue {
inner: Mutex<HashMap<String, (PendingEntry, FulfillState)>>,
cv: Condvar,
}
impl PendingQueue {
#[must_use]
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
#[must_use]
pub fn enqueue_and_wait(&self, entry: PendingEntry, timeout: Duration) -> Option<String> {
let id = entry.id.clone();
{
let mut guard = self.inner.lock().unwrap();
guard.insert(id.clone(), (entry, FulfillState::Pending));
}
let deadline = Instant::now() + timeout;
let mut guard = self.inner.lock().unwrap();
loop {
let remaining = match deadline.checked_duration_since(Instant::now()) {
Some(r) if !r.is_zero() => r,
_ => {
guard.remove(&id);
return None;
}
};
let (g, _) = self.cv.wait_timeout(guard, remaining).unwrap();
guard = g;
let (_, state) = guard.get(&id)?;
match state.clone() {
FulfillState::Pending => {}
FulfillState::Fulfilled(v) => {
guard.remove(&id);
return Some(v);
}
FulfillState::Cancelled => {
guard.remove(&id);
return None;
}
}
}
}
#[must_use]
pub fn list(&self) -> Vec<PendingEntry> {
let guard = self.inner.lock().unwrap();
guard
.values()
.filter(|(_, s)| matches!(s, FulfillState::Pending))
.map(|(e, _)| e.clone())
.collect()
}
pub fn fulfill(&self, id: &str, value: String) -> bool {
let mut guard = self.inner.lock().unwrap();
if let Some((_, state)) = guard.get_mut(id) {
if matches!(state, FulfillState::Pending) {
*state = FulfillState::Fulfilled(value);
self.cv.notify_all();
return true;
}
}
false
}
pub fn cancel(&self, id: &str) -> bool {
let mut guard = self.inner.lock().unwrap();
if let Some((_, state)) = guard.get_mut(id) {
if matches!(state, FulfillState::Pending) {
*state = FulfillState::Cancelled;
self.cv.notify_all();
return true;
}
}
false
}
#[must_use]
pub fn peek(&self, id: &str) -> Option<PendingEntry> {
let guard = self.inner.lock().unwrap();
guard
.get(id)
.filter(|(_, s)| matches!(s, FulfillState::Pending))
.map(|(e, _)| e.clone())
}
}
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::SystemTime;
static COUNTER: AtomicU64 = AtomicU64::new(0);
#[must_use]
pub fn new_id() -> String {
let nanos = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0_u128, |d| d.as_nanos());
let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
let n = nanos as u64;
let combined = n.wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ counter;
format!("p_{combined:016x}")
}