use crate::error::Error;
use crate::event::Event;
use crate::filters::apply_filter_fast;
use crate::list_props::{ListProps, Order, StartAfter};
use indexmap::IndexMap;
use std::fmt::Debug;
use std::sync::mpsc::Sender;
use std::time::{Duration, SystemTime};
use valu3::traits::ToValueBehavior;
use valu3::value::Value;
#[cfg(feature = "persist")]
use std::path::Path;
#[cfg(feature = "persist")]
use std::sync::mpsc::channel;
pub type Key = String;
#[inline(always)]
fn current_time_millis() -> u64 {
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}
#[derive(Clone, Debug)]
pub struct CacheItem {
pub value: Value,
pub created_at: u64,
pub ttl_millis: Option<u64>,
}
impl CacheItem {
#[inline]
pub fn new(value: Value) -> Self {
Self {
value,
created_at: current_time_millis(),
ttl_millis: None,
}
}
#[inline]
pub fn with_ttl(value: Value, ttl: Duration) -> Self {
Self {
value,
created_at: current_time_millis(),
ttl_millis: Some(ttl.as_millis() as u64),
}
}
#[inline(always)]
pub fn is_expired(&self) -> bool {
if let Some(ttl) = self.ttl_millis {
(current_time_millis() - self.created_at) > ttl
} else {
false
}
}
#[inline]
pub fn ttl(&self) -> Option<Duration> {
self.ttl_millis.map(Duration::from_millis)
}
#[inline]
pub fn created_at_time(&self) -> SystemTime {
std::time::UNIX_EPOCH + Duration::from_millis(self.created_at)
}
}
impl PartialEq for CacheItem {
fn eq(&self, other: &Self) -> bool {
self.value == other.value && self.ttl_millis == other.ttl_millis
}
}
#[derive(Clone, Debug)]
pub struct Cache {
map: IndexMap<Key, CacheItem>,
capacity: usize,
default_ttl: Option<Duration>,
sender: Option<Sender<Event>>,
#[cfg(feature = "persist")]
persist_path: Option<std::path::PathBuf>,
_phantom: std::marker::PhantomData<Value>,
}
impl PartialEq for Cache {
fn eq(&self, other: &Self) -> bool {
self.map == other.map
&& self.capacity == other.capacity
&& self.default_ttl == other.default_ttl
}
}
impl Cache {
pub fn new(capacity: usize) -> Self {
Self {
map: IndexMap::with_capacity(capacity),
capacity,
default_ttl: None,
sender: None,
#[cfg(feature = "persist")]
persist_path: None,
_phantom: std::marker::PhantomData,
}
}
pub fn with_sender(capacity: usize, sender: Sender<Event>) -> Self {
Self {
map: IndexMap::with_capacity(capacity),
capacity,
default_ttl: None,
sender: Some(sender),
#[cfg(feature = "persist")]
persist_path: None,
_phantom: std::marker::PhantomData,
}
}
pub fn with_default_ttl(capacity: usize, default_ttl: Duration) -> Self {
Self {
map: IndexMap::with_capacity(capacity),
capacity,
default_ttl: Some(default_ttl),
sender: None,
#[cfg(feature = "persist")]
persist_path: None,
_phantom: std::marker::PhantomData,
}
}
pub fn with_sender_and_ttl(
capacity: usize,
sender: Sender<Event>,
default_ttl: Duration,
) -> Self {
Self {
map: IndexMap::with_capacity(capacity),
capacity,
default_ttl: Some(default_ttl),
sender: Some(sender),
#[cfg(feature = "persist")]
persist_path: None,
_phantom: std::marker::PhantomData,
}
}
#[cfg(feature = "persist")]
pub fn with_persist<P: AsRef<Path>>(
path: P,
capacity: usize,
) -> Result<Self, Box<dyn std::error::Error>> {
use crate::sqlite_store::{ensure_db_file, items_from_db, spawn_writer, PersistentEvent};
let path = path.as_ref().to_path_buf();
ensure_db_file(&path)?;
let (event_tx, event_rx) = channel();
let (persist_tx, persist_rx) = channel();
spawn_writer(path.clone(), persist_rx);
let mut cache = Self::with_sender(capacity, event_tx);
cache.persist_path = Some(path.clone());
std::thread::spawn(move || {
while let Ok(event) = event_rx.recv() {
let persistent_event = PersistentEvent::new(event.clone());
if persist_tx.send(persistent_event).is_err() {
break;
}
}
});
let mut items = items_from_db(&path)?;
items.sort_by(|a, b| a.0.cmp(&b.0));
for (key, item) in items {
if cache.map.len() < capacity {
cache.map.insert(key, item);
}
}
Ok(cache)
}
#[cfg(feature = "persist")]
pub fn with_persist_and_sender<P: AsRef<Path>>(
path: P,
capacity: usize,
external_sender: Sender<Event>,
) -> Result<Self, Box<dyn std::error::Error>> {
use crate::sqlite_store::{ensure_db_file, items_from_db, spawn_writer, PersistentEvent};
let path = path.as_ref().to_path_buf();
ensure_db_file(&path)?;
let (event_tx, event_rx) = channel();
let (persist_tx, persist_rx) = channel();
spawn_writer(path.clone(), persist_rx);
let mut cache = Self::with_sender(capacity, event_tx);
cache.persist_path = Some(path.clone());
std::thread::spawn(move || {
while let Ok(event) = event_rx.recv() {
let _ = external_sender.send(event.clone());
let persistent_event = PersistentEvent::new(event);
if persist_tx.send(persistent_event).is_err() {
break;
}
}
});
let mut items = items_from_db(&path)?;
items.sort_by(|a, b| a.0.cmp(&b.0));
for (key, item) in items {
if cache.map.len() < capacity {
cache.map.insert(key, item);
}
}
Ok(cache)
}
#[cfg(feature = "persist")]
pub fn with_persist_and_ttl<P: AsRef<Path>>(
path: P,
capacity: usize,
default_ttl: Duration,
) -> Result<Self, Box<dyn std::error::Error>> {
use crate::sqlite_store::{ensure_db_file, items_from_db, spawn_writer, PersistentEvent};
let path = path.as_ref().to_path_buf();
ensure_db_file(&path)?;
let (event_tx, event_rx) = channel();
let (persist_tx, persist_rx) = channel();
spawn_writer(path.clone(), persist_rx);
let mut cache = Self::with_sender_and_ttl(capacity, event_tx, default_ttl);
cache.persist_path = Some(path.clone());
std::thread::spawn(move || {
while let Ok(event) = event_rx.recv() {
let persistent_event = PersistentEvent::new(event.clone());
if persist_tx.send(persistent_event).is_err() {
break;
}
}
});
let mut items = items_from_db(&path)?;
items.sort_by(|a, b| a.0.cmp(&b.0));
for (key, item) in items {
if !item.is_expired() && cache.map.len() < capacity {
cache.map.insert(key, item);
}
}
Ok(cache)
}
#[cfg(feature = "persist")]
pub fn with_persist_and_sender_and_ttl<P: AsRef<Path>>(
path: P,
capacity: usize,
external_sender: Sender<Event>,
default_ttl: Duration,
) -> Result<Self, Box<dyn std::error::Error>> {
use crate::sqlite_store::{ensure_db_file, items_from_db, spawn_writer, PersistentEvent};
let path = path.as_ref().to_path_buf();
ensure_db_file(&path)?;
let (event_tx, event_rx) = channel();
let (persist_tx, persist_rx) = channel();
spawn_writer(path.clone(), persist_rx);
let mut cache = Self::with_sender_and_ttl(capacity, event_tx, default_ttl);
cache.persist_path = Some(path.clone());
std::thread::spawn(move || {
while let Ok(event) = event_rx.recv() {
let _ = external_sender.send(event.clone());
let persistent_event = PersistentEvent::new(event);
if persist_tx.send(persistent_event).is_err() {
break;
}
}
});
let mut items = items_from_db(&path)?;
items.sort_by(|a, b| a.0.cmp(&b.0));
for (key, item) in items {
if !item.is_expired() && cache.map.len() < capacity {
cache.map.insert(key, item);
}
}
Ok(cache)
}
#[inline]
pub fn set_event(&mut self, sender: Sender<Event>) {
self.sender = Some(sender);
}
#[inline]
pub fn remove_event(&mut self) {
self.sender = None;
}
#[inline]
fn send_insert(&self, key: Key, value: Value) {
if let Some(sender) = &self.sender {
let event = Event::insert(key, value);
sender.send(event).unwrap();
}
}
#[inline]
fn send_remove(&self, key: Key, value: Value) {
if let Some(sender) = &self.sender {
let event = Event::remove(key, value);
sender.send(event).unwrap();
}
}
#[inline]
fn send_clear(&self) {
if let Some(sender) = &self.sender {
let event = Event::clear();
sender.send(event).unwrap();
}
}
pub fn insert<T, V>(&mut self, key: T, value: V)
where
T: Into<String>,
V: ToValueBehavior,
{
let key = key.into();
let item = if let Some(default_ttl) = self.default_ttl {
CacheItem::with_ttl(value.to_value(), default_ttl)
} else {
CacheItem::new(value.to_value())
};
if let Some(existing_item) = self.map.get(&key) {
if existing_item.value == item.value {
return;
}
}
if self.map.len() >= self.capacity && !self.map.contains_key(&key) {
if let Some((first_key, first_item)) = self.map.shift_remove_index(0) {
self.send_remove(first_key, first_item.value);
}
}
self.map.insert(key.clone(), item.clone());
self.send_insert(key, item.value);
}
pub fn insert_with_ttl<T, V>(&mut self, key: T, value: V, ttl: Duration)
where
T: Into<String> + Clone + AsRef<str>,
V: ToValueBehavior,
{
let key = key.into();
let item = CacheItem::with_ttl(value.to_value(), ttl);
if let Some(existing_item) = self.map.get(&key) {
if existing_item.value == item.value {
return;
}
}
if self.map.len() >= self.capacity && !self.map.contains_key(&key) {
if let Some((first_key, first_item)) = self.map.shift_remove_index(0) {
self.send_remove(first_key, first_item.value);
}
}
self.map.insert(key.clone(), item.clone());
self.send_insert(key.clone(), item.value.clone());
#[cfg(feature = "persist")]
if let Some(persist_path) = &self.persist_path {
if let Some(ttl_millis) = item.ttl_millis {
let _ = crate::sqlite_store::persist_item_with_ttl(
persist_path,
&key,
&item.value,
ttl_millis / 1000,
);
}
}
}
#[inline]
pub fn get(&mut self, key: &str) -> Option<&Value> {
let is_expired = match self.map.get(key) {
Some(item) => {
if let Some(ttl) = item.ttl_millis {
(current_time_millis() - item.created_at) > ttl
} else {
false
}
}
None => return None,
};
if is_expired {
if let Some(expired_item) = self.map.swap_remove(key) {
self.send_remove(key.to_string(), expired_item.value);
}
None
} else {
self.map.get(key).map(|item| &item.value)
}
}
#[inline(always)]
pub fn get_list(&self) -> Vec<&Key> {
self.map.keys().collect()
}
pub fn get_map(&self) -> IndexMap<Key, &Value> {
self.map
.iter()
.filter(|(_, item)| !item.is_expired())
.map(|(key, item)| (key.clone(), &item.value))
.collect()
}
pub fn get_mut(&mut self, key: &str) -> Option<&mut Value> {
let should_remove = self.map.get(key).map_or(false, |item| item.is_expired());
if should_remove {
self.remove(key).ok();
None
} else {
self.map.get_mut(key).map(|item| &mut item.value)
}
}
#[inline(always)]
pub fn capacity(&self) -> usize {
self.capacity
}
#[inline]
pub fn set_capacity(&mut self, capacity: usize) {
self.capacity = capacity;
}
pub fn remove(&mut self, key: &str) -> Result<(), Error> {
if let Some(item) = self.map.swap_remove(key) {
self.send_remove(key.to_string(), item.value);
Ok(())
} else {
Err(Error::KeyNotFound)
}
}
pub fn clear(&mut self) {
self.map.clear();
self.send_clear();
}
#[inline(always)]
pub fn len(&self) -> usize {
self.map.len()
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
pub fn contains_key(&mut self, key: &str) -> bool {
match self.map.get(key) {
Some(item) if item.is_expired() => {
self.remove(key).ok();
false
}
Some(_) => true,
None => false,
}
}
pub fn cleanup_expired(&mut self) -> usize {
let current_time = current_time_millis();
let mut expired_keys = Vec::with_capacity(self.map.len() / 4);
for (key, item) in &self.map {
if let Some(ttl) = item.ttl_millis {
if (current_time - item.created_at) > ttl {
expired_keys.push(key.clone());
}
}
}
let removed_count = expired_keys.len();
for key in expired_keys {
if let Some(item) = self.map.swap_remove(&key) {
self.send_remove(key, item.value);
}
}
removed_count
}
#[inline]
pub fn set_default_ttl(&mut self, ttl: Option<Duration>) {
self.default_ttl = ttl;
}
#[inline(always)]
pub fn get_default_ttl(&self) -> Option<Duration> {
self.default_ttl
}
pub fn list<T>(&mut self, props: T) -> Result<Vec<(Key, &Value)>, Error>
where
T: Into<ListProps>,
{
let props = props.into();
self.cleanup_expired();
let mut keys: Vec<String> = self.map.keys().cloned().collect();
keys.sort();
match props.order {
Order::Asc => self.resolve_order(keys.iter(), props),
Order::Desc => self.resolve_order(keys.iter().rev(), props),
}
}
fn resolve_order<'a, I>(
&self,
mut list_iter: I,
props: ListProps,
) -> Result<Vec<(Key, &Value)>, Error>
where
I: Iterator<Item = &'a String>,
{
if let StartAfter::Key(ref key) = props.start_after_key {
list_iter
.find(|k| k == &key)
.ok_or(Error::SortKeyNotFound)?;
}
let mut list = Vec::new();
if props.limit == 0 {
return Ok(list);
}
let mut count = 0;
for k in list_iter {
if let Some(item) = self.map.get(k) {
if item.is_expired() {
continue;
}
let filtered = if apply_filter_fast(k, &props.filter) {
Some((k.clone(), &item.value))
} else {
None
};
if let Some(item) = filtered {
list.push(item);
count += 1;
if count >= props.limit {
break;
}
}
}
}
Ok(list)
}
}