use crate::rpc::RpcError;
use async_trait::async_trait;
#[cfg(not(feature = "rt"))]
use parking_lot::Mutex as SyncMutex;
#[cfg(feature = "rt")]
use parking_lot_rt::Mutex as SyncMutex;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
pub struct Payload {
u: uuid::Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
n: Option<usize>,
}
impl From<Uuid> for Payload {
#[inline]
fn from(u: Uuid) -> Self {
Self { u, n: None }
}
}
impl Payload {
#[inline]
pub fn uuid(&self) -> &Uuid {
&self.u
}
#[inline]
pub fn bulk_number(&self) -> usize {
self.n.unwrap_or(1)
}
#[inline]
pub fn set_bulk_number(&mut self, n: usize) {
self.n = Some(n);
}
#[inline]
pub fn clear_bulk_number(&mut self) {
self.n = None;
}
}
pub struct Map {
data: Arc<RwLock<BTreeMap<uuid::Uuid, Box<dyn Cursor + Send + Sync>>>>,
cleaner: Option<JoinHandle<()>>,
}
impl Map {
pub fn new(cleaner_interval: Duration) -> Self {
let mut map = Self {
data: <_>::default(),
cleaner: <_>::default(),
};
let cleaner = map.spawn_cleaner(cleaner_interval);
map.cleaner.replace(cleaner);
map
}
pub async fn add<C>(&self, c: C) -> Uuid
where
C: Cursor + Send + Sync + 'static,
{
let u = Uuid::new_v4();
self.data.write().await.insert(u, Box::new(c));
u
}
pub async fn remove<C>(&self, u: &Uuid) {
self.data.write().await.remove(u);
}
pub async fn next(&self, cursor_id: &Uuid) -> Result<Option<Vec<u8>>, RpcError> {
if let Some(cursor) = self.data.read().await.get(cursor_id) {
cursor.meta().touch();
Ok(cursor.next().await?)
} else {
Err(RpcError::not_found(None))
}
}
pub async fn next_bulk(
&self,
cursor_id: &Uuid,
count: usize,
) -> Result<Option<Vec<u8>>, RpcError> {
if let Some(cursor) = self.data.read().await.get(cursor_id) {
cursor.meta().touch();
Ok(Some(cursor.next_bulk(count).await?))
} else {
Err(RpcError::not_found(None))
}
}
fn spawn_cleaner(&self, interval: Duration) -> JoinHandle<()> {
let cursors = self.data.clone();
let mut int = tokio::time::interval(interval);
tokio::spawn(async move {
loop {
int.tick().await;
cursors.write().await.retain(|_, v| v.meta().is_alive());
}
})
}
}
impl Drop for Map {
fn drop(&mut self) {
if let Some(cleaner) = self.cleaner.take() {
cleaner.abort();
}
}
}
#[async_trait]
pub trait Cursor {
async fn next(&self) -> Result<Option<Vec<u8>>, RpcError>;
async fn next_bulk(&self, count: usize) -> Result<Vec<u8>, RpcError>;
fn meta(&self) -> &Meta;
}
pub struct Meta {
finished: atomic::AtomicBool,
expires: SyncMutex<Instant>,
ttl: Duration,
}
impl Meta {
#[inline]
pub fn new(ttl: Duration) -> Self {
Self {
expires: SyncMutex::new(Instant::now() + ttl),
finished: <_>::default(),
ttl,
}
}
#[inline]
pub fn is_finished(&self) -> bool {
self.finished.load(atomic::Ordering::SeqCst)
}
#[inline]
pub fn is_expired(&self) -> bool {
*self.expires.lock() < Instant::now()
}
#[inline]
pub fn mark_finished(&self) {
self.finished.store(true, atomic::Ordering::SeqCst);
}
#[inline]
pub fn is_alive(&self) -> bool {
!self.is_finished() && !self.is_expired()
}
#[inline]
fn touch(&self) {
let mut expires = self.expires.lock();
*expires = Instant::now() + self.ttl;
}
}