use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use actix::{Actor, Addr, Handler, SyncArbiter, SyncContext};
use actix_storage::dev::actor::{
ExpiryRequest, ExpiryResponse, ExpiryStoreRequest, ExpiryStoreResponse, StoreRequest,
StoreResponse,
};
use crate::SledConfig;
mod delay;
mod flags;
mod inner;
mod utils;
#[cfg(test)]
mod tests;
pub use self::flags::ExpiryFlags;
use self::inner::SledActorInner;
pub use utils::{decode, decode_mut, encode};
/// Actix actor that serves store and expiry requests by delegating to a
/// [`SledActorInner`].
///
/// The type is `Clone`; because `stopped` is an `Arc`, every clone shares the
/// same stop flag.
#[derive(Clone)]
pub struct SledActor {
/// Backend that every message handler forwards its request to.
inner: SledActorInner,
/// When `true`, the background expiry thread is spawned as the actor starts.
perform_deletion: bool,
/// When `true` (and `perform_deletion` is also `true`), `inner.scan_db()` is
/// called as the actor starts.
scan_db_on_start: bool,
/// Stop flag polled by the expiry thread; set once the actor has stopped.
#[doc(hidden)]
stopped: Arc<AtomicBool>,
}
impl SledActor {
#[must_use = "Actor should be started by calling start method"]
pub fn perform_deletion(mut self, to: bool) -> Self {
self.perform_deletion = to;
self
}
#[must_use = "Actor should be started by calling start method"]
pub fn scan_db_on_start(mut self, to: bool) -> Self {
self.scan_db_on_start = to;
self
}
#[must_use = "Actor should be started by calling start method"]
pub fn from_db(db: sled::Db) -> Self {
Self {
inner: SledActorInner::from_db(db),
perform_deletion: false,
scan_db_on_start: false,
stopped: Arc::new(AtomicBool::new(false)),
}
}
pub fn start(self, threads_num: usize) -> Addr<Self> {
SyncArbiter::start(threads_num, move || self.clone())
}
fn spawn_expiry_thread(&self) {
let mut inner = self.inner.clone();
let stopped = self.stopped.clone();
std::thread::spawn(move || loop {
inner.try_delete_expired_item_for(Duration::from_secs(1));
if stopped.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
});
}
}
impl Actor for SledActor {
    type Context = SyncContext<Self>;

    /// Lifecycle hook run when this actor instance starts.
    ///
    /// Nothing happens unless `perform_deletion` is enabled. When it is, the
    /// database is first scanned if `scan_db_on_start` is also set, and then the
    /// background expiry thread is spawned.
    ///
    /// NOTE(review): `start` hands the arbiter a factory that clones the actor,
    /// so this hook presumably runs once per arbiter thread, spawning one expiry
    /// thread per instance — confirm that this is intended.
    fn started(&mut self, _: &mut Self::Context) {
        if !self.perform_deletion {
            return;
        }

        if self.scan_db_on_start {
            self.inner.scan_db();
        }
        self.spawn_expiry_thread();
    }

    /// Lifecycle hook run when this actor instance stops; raises the shared
    /// stop flag so the expiry thread leaves its loop.
    fn stopped(&mut self, _: &mut Self::Context) {
        // The flag lives in an `Arc` shared by every clone of the actor. A
        // `compare_exchange(false, true)` retry loop can never succeed once
        // another clone has already set the flag, so it would spin forever.
        // An unconditional store is idempotent and always terminates.
        self.stopped
            .store(true, std::sync::atomic::Ordering::SeqCst);
    }
}
/// Serves basic key/value requests.
///
/// Each `StoreRequest` variant is forwarded to the same-named method on the
/// inner backend and the outcome is wrapped in the matching `StoreResponse`
/// variant.
impl Handler<StoreRequest> for SledActor {
    type Result = StoreResponse;

    fn handle(&mut self, request: StoreRequest, _ctx: &mut Self::Context) -> Self::Result {
        let store = &mut self.inner;
        match request {
            StoreRequest::Get(scope, key) => {
                let outcome = store.get(scope, key);
                StoreResponse::Get(outcome)
            }
            StoreRequest::Set(scope, key, value) => {
                let outcome = store.set(scope, key, value);
                StoreResponse::Set(outcome)
            }
            StoreRequest::Contains(scope, key) => {
                let outcome = store.contains(scope, key);
                StoreResponse::Contains(outcome)
            }
            StoreRequest::Delete(scope, key) => {
                let outcome = store.delete(scope, key);
                StoreResponse::Delete(outcome)
            }
        }
    }
}
/// Serves expiry-management requests.
///
/// Each `ExpiryRequest` variant is forwarded to the corresponding method on the
/// inner backend (`set_expiry`, `get_expiry`, `extend_expiry`, `persist`) and
/// the outcome is wrapped in the matching `ExpiryResponse` variant.
impl Handler<ExpiryRequest> for SledActor {
    type Result = ExpiryResponse;

    fn handle(&mut self, request: ExpiryRequest, _ctx: &mut Self::Context) -> Self::Result {
        let store = &mut self.inner;
        match request {
            ExpiryRequest::Get(scope, key) => {
                let outcome = store.get_expiry(scope, key);
                ExpiryResponse::Get(outcome)
            }
            ExpiryRequest::Set(scope, key, expires_in) => {
                let outcome = store.set_expiry(scope, key, expires_in);
                ExpiryResponse::Set(outcome)
            }
            ExpiryRequest::Extend(scope, key, duration) => {
                let outcome = store.extend_expiry(scope, key, duration);
                ExpiryResponse::Extend(outcome)
            }
            ExpiryRequest::Persist(scope, key) => {
                let outcome = store.persist(scope, key);
                ExpiryResponse::Persist(outcome)
            }
        }
    }
}
/// Serves combined value-plus-expiry requests.
///
/// `SetExpiring` and `GetExpiring` are forwarded to `set_expiring` and
/// `get_expiring` on the inner backend; the outcome is wrapped in the matching
/// `ExpiryStoreResponse` variant.
impl Handler<ExpiryStoreRequest> for SledActor {
    type Result = ExpiryStoreResponse;

    fn handle(&mut self, request: ExpiryStoreRequest, _ctx: &mut Self::Context) -> Self::Result {
        let store = &mut self.inner;
        match request {
            ExpiryStoreRequest::GetExpiring(scope, key) => {
                let outcome = store.get_expiring(scope, key);
                ExpiryStoreResponse::GetExpiring(outcome)
            }
            ExpiryStoreRequest::SetExpiring(scope, key, value, duration) => {
                let outcome = store.set_expiring(scope, key, value, duration);
                ExpiryStoreResponse::SetExpiring(outcome)
            }
        }
    }
}
/// Extension trait for turning a value into a [`SledActor`].
pub trait ToActorExt {
/// Builds a [`SledActor`] from `self`.
///
/// # Errors
///
/// Returns a `sled::Error` when the actor cannot be constructed.
#[must_use = "Actor should be started by calling start method"]
fn to_actor(&self) -> Result<SledActor, sled::Error>;
}
impl ToActorExt for SledConfig {
    /// Opens the database described by this configuration and wraps it in a
    /// [`SledActor`] via [`SledActor::from_db`].
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `open` unchanged.
    fn to_actor(&self) -> Result<SledActor, sled::Error> {
        self.open().map(SledActor::from_db)
    }
}