use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::Mutex;
use graphrefly_core::{
monotonic_ns, wall_clock_ns, Core, CoreMailbox, HandleId, Message, NodeId, SubscriptionId,
};
use crate::backend::{
HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
VecListBackend, VecLogBackend,
};
use crate::changeset::{
BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
};
/// Shared callback that converts a snapshot of type `S` into the [`HandleId`]
/// that is handed to the core (or its mailbox) when a structure emits.
pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
/// Emitter bound to a structure's own node: it interns each snapshot and
/// passes the resulting handle directly to the core.
struct EmitHandle<S> {
/// Node the snapshots are emitted on.
node_id: NodeId,
/// Converts a snapshot into the handle passed to the core.
intern: InternFn<S>,
/// Running count of emissions; incremented once per `emit` call.
version: AtomicU64,
}
impl<S> EmitHandle<S> {
    /// Interns `snapshot`, emits the resulting handle on this node through
    /// `core`, and returns the counter-based version of this emission.
    ///
    /// The counter is bumped before the snapshot is interned, so the version
    /// returned is the previous counter value plus one.
    fn emit(&self, core: &Core, snapshot: S) -> Version {
        let next = self.version.fetch_add(1, Ordering::Relaxed) + 1;
        let interned = (self.intern)(snapshot);
        core.emit(self.node_id, interned);
        Version::Counter(next)
    }
}
/// Emitter used from inside subscriber callbacks: instead of calling the core
/// directly it posts the emission to the core's mailbox.
struct SinkEmitter<S> {
/// Mailbox the emissions are posted to.
mailbox: Arc<CoreMailbox>,
/// Node the posted emissions target.
node_id: NodeId,
/// Converts a snapshot into the handle that is posted.
intern: InternFn<S>,
/// Running count of emissions; incremented once per `emit` call.
version: AtomicU64,
}
impl<S> SinkEmitter<S> {
    /// Interns `snapshot`, posts the resulting handle for this node to the
    /// mailbox, and returns the counter-based version of this emission.
    ///
    /// The outcome of the post is deliberately ignored (best effort).
    fn emit(&self, snapshot: S) -> Version {
        let next = self.version.fetch_add(1, Ordering::Relaxed) + 1;
        let interned = (self.intern)(snapshot);
        let _ = self.mailbox.post_emit(self.node_id, interned);
        Version::Counter(next)
    }
}
/// Error returned by list insertions when the requested index lies beyond the
/// current end of the list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexOutOfBounds;
impl std::fmt::Display for IndexOutOfBounds {
    /// Writes the fixed message `index out of bounds`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "index out of bounds")
    }
}
// Marker impl so `IndexOutOfBounds` can be used as a standard error value; all trait defaults are kept.
impl std::error::Error for IndexOutOfBounds {}
/// Append-oriented reactive log. Each mutating method emits a full `Vec<T>`
/// snapshot of the backend on `node_id`.
pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
/// Backend plus optional mutation log; shared with the subscriber closures
/// created by `view`, `scan`, `attach*` and `attach_storage`.
inner: Arc<Mutex<LogInner<T>>>,
/// Emits snapshots on this log's own node.
emitter: EmitHandle<Vec<T>>,
/// Node on which snapshots of this log are emitted.
pub node_id: NodeId,
}
/// Lock-protected state of a [`ReactiveLog`].
struct LogInner<T: Clone + Send + Sync + 'static> {
/// Storage for the log entries.
backend: Box<dyn LogBackend<T>>,
/// Recorded changes; `None` when mutation logging is disabled.
mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
/// Name copied into the `structure` field of every recorded change.
structure_name: String,
}
impl<T: Clone + Send + Sync + 'static> LogInner<T> {
    /// Appends `change` to the mutation log, stamped with `version` and the
    /// current wall-clock time. A no-op when mutation logging is disabled.
    fn record(&mut self, change: LogChange<T>, version: Version) {
        let Some(entries) = self.mutation_log.as_mut() else {
            return;
        };
        let structure = self.structure_name.clone();
        entries.push(BaseChange {
            structure,
            version,
            t_ns: wall_clock_ns(),
            seq: None,
            lifecycle: Lifecycle::Data,
            change,
        });
    }
}
/// Construction options for [`ReactiveLog`].
pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
/// Name written into the `structure` field of recorded changes.
pub name: String,
/// Passed to the default `VecLogBackend`; not consulted when `backend` is supplied.
pub max_size: Option<usize>,
/// Custom storage backend; when `None` a `VecLogBackend` is created.
pub backend: Option<Box<dyn LogBackend<T>>>,
/// Whether mutations are recorded in a mutation log.
pub mutation_log: bool,
}
impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
    /// Defaults: name `reactiveLog`, no size limit, default backend, no
    /// mutation log.
    fn default() -> Self {
        let name = String::from("reactiveLog");
        Self {
            name,
            backend: None,
            max_size: None,
            mutation_log: false,
        }
    }
}
impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
#[must_use]
pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
let node_id = core
.register_state(HandleId::new(0), false)
.expect("register_state for ReactiveLog");
let backend: Box<dyn LogBackend<T>> = opts
.backend
.unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
let mutation_log = if opts.mutation_log {
Some(Vec::new())
} else {
None
};
let inner = LogInner {
backend,
mutation_log,
structure_name: opts.name,
};
Self {
inner: Arc::new(Mutex::new(inner)),
emitter: EmitHandle {
node_id,
intern,
version: AtomicU64::new(0),
},
node_id,
}
}
/// Returns the number of entries reported by the backend.
#[must_use]
pub fn size(&self) -> usize {
    let guard = self.inner.lock();
    guard.backend.size()
}
/// Returns the entry at `index`, as resolved by the backend, or `None`.
///
/// NOTE(review): the index is signed, so negative values presumably count
/// from the end — confirm against the backend implementation.
#[must_use]
pub fn at(&self, index: i64) -> Option<T> {
    let guard = self.inner.lock();
    guard.backend.at(index)
}
/// Appends `value`, emits the new snapshot, and records an `Append` change
/// when mutation logging is enabled.
pub fn append(&self, core: &Core, value: T) {
    let mut guard = self.inner.lock();
    // Clone the value for the log only when a log exists.
    let pending = if guard.mutation_log.is_some() {
        Some(LogChange::Append {
            value: value.clone(),
        })
    } else {
        None
    };
    guard.backend.append(value);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
}
/// Appends all `values` in one step and emits a single snapshot.
///
/// An empty `values` is a no-op: nothing is emitted or recorded.
pub fn append_many(&self, core: &Core, values: Vec<T>) {
    if values.is_empty() {
        return;
    }
    let mut guard = self.inner.lock();
    let pending = if guard.mutation_log.is_some() {
        Some(LogChange::AppendMany {
            values: values.clone(),
        })
    } else {
        None
    };
    guard.backend.append_many(values);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
}
/// Removes every entry. Emits and records a `Clear` change only when the
/// backend reports that something was actually removed.
pub fn clear(&self, core: &Core) {
    let mut guard = self.inner.lock();
    let count = guard.backend.clear();
    if count == 0 {
        return;
    }
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    let mut guard = self.inner.lock();
    guard.record(LogChange::Clear { count }, version);
}
/// Asks the backend to drop up to `n` entries from the head of the log.
///
/// Nothing is emitted or recorded when `n` is zero or the backend reports
/// that no entries were removed. The recorded `TrimHead` change carries the
/// count the backend actually removed.
pub fn trim_head(&self, core: &Core, n: usize) {
    if n == 0 {
        return;
    }
    let mut guard = self.inner.lock();
    let removed = guard.backend.trim_head(n);
    if removed == 0 {
        return;
    }
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    let mut guard = self.inner.lock();
    guard.record(LogChange::TrimHead { n: removed }, version);
}
/// Returns a copy of the current entries as produced by the backend.
#[must_use]
pub fn to_vec(&self) -> Vec<T> {
    let guard = self.inner.lock();
    guard.backend.to_vec()
}
/// Returns a copy of the recorded changes, or `None` when mutation logging
/// is disabled.
#[must_use]
pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
    let guard = self.inner.lock();
    guard.mutation_log.clone()
}
}
/// Selects which part of a log a view exposes.
pub enum ViewSpec {
/// The last `n` entries (fewer when the log is shorter).
Tail { n: usize },
/// Entries in `start..stop`; `stop` defaults to the log length and both bounds are clamped to it.
Slice { start: usize, stop: Option<usize> },
/// Entries from a cursor position to the end of the log.
FromCursor {
/// Node whose data messages carry the cursor.
cursor_node: NodeId,
/// Decodes a handle emitted by `cursor_node` into a start position.
read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
},
}
/// A set of core subscriptions that can be released together with `detach`.
pub struct ReactiveSub {
/// `(node, subscription)` pairs still to be unsubscribed.
subs: Vec<(NodeId, SubscriptionId)>,
}
impl ReactiveSub {
    /// Unsubscribes every held subscription from `core` and empties the set,
    /// so a second call has nothing left to release.
    pub fn detach(&mut self, core: &Core) {
        self.subs.drain(..).for_each(|(node, subscription)| {
            core.unsubscribe(node, subscription);
        });
    }
}
/// Handle to a derived view of a log, as returned by `ReactiveLog::view`.
pub struct LogView {
/// Node on which the view's snapshots are emitted.
pub node_id: NodeId,
/// Subscriptions that keep the view updated.
sub: ReactiveSub,
}
impl LogView {
    /// Releases the subscriptions that feed this view.
    pub fn detach(&mut self, core: &Core) {
        let Self { sub, .. } = self;
        sub.detach(core);
    }
}
/// Handle to a running fold over a log, as returned by `ReactiveLog::scan`.
pub struct ScanHandle {
/// Node on which the accumulator is emitted.
pub node_id: NodeId,
/// Subscription that drives the fold.
sub: ReactiveSub,
}
impl ScanHandle {
    /// Releases the subscription that drives this scan.
    pub fn detach(&mut self, core: &Core) {
        let Self { sub, .. } = self;
        sub.detach(core);
    }
}
/// Storage sink that log entries can be written to and loaded back from.
pub trait AppendLogSink<T>: Send + Sync {
/// Writes `entries` to the sink.
fn append_entries(&self, entries: &[T])
-> Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Loads the entries currently held by the sink.
fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
}
/// Handle returned by `ReactiveLog::attach_storage`; detach it to stop forwarding to the sinks.
pub struct AttachStorageHandle {
/// Subscription on the log node that forwards entries to the sinks.
sub: ReactiveSub,
}
impl AttachStorageHandle {
    /// Releases the subscription that forwards entries to the sinks.
    pub fn detach(&mut self, core: &Core) {
        let Self { sub } = self;
        sub.detach(core);
    }
}
/// Options for `ReactiveLog::attach_with_options`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AttachOptions {
/// When `true` and the upstream node already has a cached value at attach
/// time, the first delivered batch that contains data is ignored.
pub skip_cached_replay: bool,
}
impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
/// Creates a derived view node that re-emits part of the log whenever the
/// log (and, for `FromCursor`, the cursor node) delivers a data message.
///
/// * `Tail { n }` — the last `n` entries.
/// * `Slice { start, stop }` — entries in `start..stop`, with `stop`
///   defaulting to the log length and both bounds clamped to it.
/// * `FromCursor` — entries from the most recently decoded cursor position
///   (initially 0, clamped to the log length) to the end.
///
/// View snapshots are posted through the core mailbox. Nothing is emitted at
/// creation time; the first emission happens on the first data message.
///
/// # Panics
///
/// Panics if `core.register_state` fails.
#[allow(clippy::too_many_lines)]
pub fn view(&self, core: &Core, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
    let view_node = core
        .register_state(HandleId::new(0), false)
        .expect("register_state for LogView");
    let emitter = Arc::new(SinkEmitter {
        mailbox: core.mailbox(),
        node_id: view_node,
        intern,
        version: AtomicU64::new(0),
    });
    let source = self.node_id;

    let subs: Vec<(NodeId, SubscriptionId)> = match spec {
        ViewSpec::Tail { n } => {
            let log = Arc::clone(&self.inner);
            let sub = core.subscribe(
                source,
                Arc::new(move |msgs| {
                    if !msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                        return;
                    }
                    let entries = log.lock().backend.to_vec();
                    let from = entries.len().saturating_sub(n);
                    emitter.emit(entries[from..].to_vec());
                }),
            );
            vec![(source, sub)]
        }
        ViewSpec::Slice { start, stop } => {
            let log = Arc::clone(&self.inner);
            let sub = core.subscribe(
                source,
                Arc::new(move |msgs| {
                    if !msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                        return;
                    }
                    let entries = log.lock().backend.to_vec();
                    let len = entries.len();
                    let upper = stop.map_or(len, |limit| limit.min(len));
                    let lower = start.min(upper);
                    emitter.emit(entries[lower..upper].to_vec());
                }),
            );
            vec![(source, sub)]
        }
        ViewSpec::FromCursor {
            cursor_node,
            read_cursor,
        } => {
            // Last cursor position seen; shared by both subscriptions.
            let position = Arc::new(Mutex::new(0usize));

            // Cursor moves: remember the position and re-emit from there.
            let cursor_sub = {
                let position = Arc::clone(&position);
                let log = Arc::clone(&self.inner);
                let emitter = Arc::clone(&emitter);
                core.subscribe(
                    cursor_node,
                    Arc::new(move |msgs| {
                        for m in msgs {
                            let Message::Data(h) = m else {
                                continue;
                            };
                            let pos = read_cursor(*h);
                            *position.lock() = pos;
                            let entries = log.lock().backend.to_vec();
                            let from = pos.min(entries.len());
                            emitter.emit(entries[from..].to_vec());
                        }
                    }),
                )
            };

            // Log changes: re-emit from the remembered position.
            let log = Arc::clone(&self.inner);
            let log_sub = core.subscribe(
                source,
                Arc::new(move |msgs| {
                    if !msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                        return;
                    }
                    let pos = *position.lock();
                    let entries = log.lock().backend.to_vec();
                    let from = pos.min(entries.len());
                    emitter.emit(entries[from..].to_vec());
                }),
            );
            vec![(cursor_node, cursor_sub), (source, log_sub)]
        }
    };

    LogView {
        node_id: view_node,
        sub: ReactiveSub { subs },
    }
}
/// Creates a node that carries a running fold of the log.
///
/// On each data message from the log, `step` is applied to the entries that
/// have not been folded yet and the accumulator is posted through the core
/// mailbox. If the log is shorter than the number of entries already
/// folded, the fold restarts from `initial` over the whole log.
///
/// NOTE(review): shrinkage is detected by length only, so a log that was
/// shortened and then grew back to at least its previous length between two
/// notifications is not re-folded.
///
/// # Panics
///
/// Panics if `core.register_state` fails.
pub fn scan<TAcc: Clone + Send + Sync + 'static>(
    &self,
    core: &Core,
    initial: TAcc,
    step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
    intern: InternFn<TAcc>,
) -> ScanHandle {
    /// Fold state shared with the subscriber closure.
    struct ScanState<T, TAcc> {
        acc: TAcc,
        processed: usize,
        initial: TAcc,
        step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
    }

    let scan_node = core
        .register_state(HandleId::new(0), false)
        .expect("register_state for Scan");
    let fold = Arc::new(Mutex::new(ScanState {
        acc: initial.clone(),
        processed: 0,
        initial,
        step,
    }));
    let log = Arc::clone(&self.inner);
    let out = Arc::new(SinkEmitter {
        mailbox: core.mailbox(),
        node_id: scan_node,
        intern,
        version: AtomicU64::new(0),
    });
    let source = self.node_id;

    let sub = core.subscribe(
        source,
        Arc::new(move |msgs| {
            if !msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                return;
            }
            // Fold state is locked first, then the log is copied out.
            let mut st = fold.lock();
            let entries = log.lock().backend.to_vec();
            if entries.len() < st.processed {
                st.acc = st.initial.clone();
                st.processed = 0;
            }
            for entry in &entries[st.processed..] {
                st.acc = (st.step)(&st.acc, entry);
            }
            st.processed = entries.len();
            let latest = st.acc.clone();
            // Release the fold state before posting.
            drop(st);
            out.emit(latest);
        }),
    );

    ScanHandle {
        node_id: scan_node,
        sub: ReactiveSub {
            subs: vec![(source, sub)],
        },
    }
}
/// Appends every value delivered by `upstream` to this log, using the
/// default [`AttachOptions`].
pub fn attach(
    &self,
    core: &Core,
    upstream: NodeId,
    read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
) -> ReactiveSub {
    let opts = AttachOptions::default();
    self.attach_with_options(core, upstream, read_value, opts)
}
/// Subscribes to `upstream` and appends each delivered data value (decoded
/// with `read_value`) to this log, posting a snapshot for every value
/// through the core mailbox.
///
/// With `opts.skip_cached_replay`, and only when `upstream` already has a
/// cached value at attach time, the first batch that contains data is
/// dropped in its entirety.
///
/// Values appended this way are not written to the mutation log and do not
/// advance this log's own emission counter.
pub fn attach_with_options(
    &self,
    core: &Core,
    upstream: NodeId,
    read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
    opts: AttachOptions,
) -> ReactiveSub {
    let log = Arc::clone(&self.inner);
    let mailbox = core.mailbox();
    let target = self.node_id;
    let intern = Arc::clone(&self.emitter.intern);
    // The cache is consulted only when skipping was requested.
    let replay_pending =
        opts.skip_cached_replay && core.cache_of(upstream) != graphrefly_core::NO_HANDLE;
    let skip_next = Arc::new(parking_lot::Mutex::new(replay_pending));

    let sub = core.subscribe(
        upstream,
        Arc::new(move |msgs| {
            let has_data = msgs.iter().any(|m| matches!(m, Message::Data(_)));
            {
                let mut pending = skip_next.lock();
                if *pending && has_data {
                    // One-shot: only the first data batch is skipped.
                    *pending = false;
                    return;
                }
            }
            for m in msgs {
                let Message::Data(h) = m else {
                    continue;
                };
                let value = read_value(*h);
                let snapshot = {
                    let mut guard = log.lock();
                    guard.backend.append(value);
                    guard.backend.to_vec()
                };
                let handle = (intern)(snapshot);
                let _ = mailbox.post_emit(target, handle);
            }
        }),
    );

    ReactiveSub {
        subs: vec![(upstream, sub)],
    }
}
/// Forwards log entries to `sinks` whenever the log delivers a data message.
///
/// * With no sinks, a no-op subscription is installed and returned.
/// * With `preload`, the sinks are asked in order for their stored entries;
///   the first non-empty successful load is appended to the log and the
///   remaining sinks are not consulted. Load errors are ignored.
/// * Each sink has its own delivered-count, initialised to the log size
///   after preloading. When the log is longer than that count, only the new
///   tail is sent; when it is shorter, the whole log is sent again. The
///   count advances only when the sink reports success; failures are
///   written to stderr.
pub fn attach_storage(
    &self,
    core: &Core,
    sinks: Vec<Arc<dyn AppendLogSink<T>>>,
    preload: bool,
) -> AttachStorageHandle {
    let source = self.node_id;
    if sinks.is_empty() {
        let sub = core.subscribe(source, Arc::new(|_| {}));
        return AttachStorageHandle {
            sub: ReactiveSub {
                subs: vec![(source, sub)],
            },
        };
    }

    if preload {
        let restored = sinks
            .iter()
            .filter_map(|sink| sink.load_entries().ok())
            .find(|entries| !entries.is_empty());
        if let Some(entries) = restored {
            self.append_many(core, entries);
        }
    }

    let baseline = self.size();
    // Pair every sink with the number of entries it has already received.
    let targets: Vec<(Arc<dyn AppendLogSink<T>>, Mutex<usize>)> = sinks
        .into_iter()
        .map(|sink| (sink, Mutex::new(baseline)))
        .collect();
    let log = Arc::clone(&self.inner);

    let sub = core.subscribe(
        source,
        Arc::new(move |msgs| {
            if !msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                return;
            }
            let entries = log.lock().backend.to_vec();
            for (i, (sink, cursor)) in targets.iter().enumerate() {
                let mut delivered = cursor.lock();
                let pending: &[T] = match entries.len().cmp(&*delivered) {
                    std::cmp::Ordering::Equal => continue,
                    std::cmp::Ordering::Greater => &entries[*delivered..],
                    std::cmp::Ordering::Less => &entries,
                };
                match sink.append_entries(pending) {
                    Ok(()) => *delivered = entries.len(),
                    Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
                }
            }
        }),
    );

    AttachStorageHandle {
        sub: ReactiveSub {
            subs: vec![(source, sub)],
        },
    }
}
}
/// Reactive list. Each mutating method emits a full `Vec<T>` snapshot of the
/// backend on `node_id`.
pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
/// Backend plus optional mutation log.
inner: Mutex<ListInner<T>>,
/// Emits snapshots on this list's own node.
emitter: EmitHandle<Vec<T>>,
/// Node on which snapshots of this list are emitted.
pub node_id: NodeId,
}
/// Lock-protected state of a [`ReactiveList`].
struct ListInner<T: Clone + Send + Sync + 'static> {
/// Storage for the list items.
backend: Box<dyn ListBackend<T>>,
/// Recorded changes; `None` when mutation logging is disabled.
mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
/// Name copied into the `structure` field of every recorded change.
structure_name: String,
}
impl<T: Clone + Send + Sync + 'static> ListInner<T> {
    /// Appends `change` to the mutation log, stamped with `version` and the
    /// current wall-clock time. A no-op when mutation logging is disabled.
    fn record(&mut self, change: ListChange<T>, version: Version) {
        let Some(entries) = self.mutation_log.as_mut() else {
            return;
        };
        let structure = self.structure_name.clone();
        entries.push(BaseChange {
            structure,
            version,
            t_ns: wall_clock_ns(),
            seq: None,
            lifecycle: Lifecycle::Data,
            change,
        });
    }
}
/// Construction options for [`ReactiveList`].
pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
/// Name written into the `structure` field of recorded changes.
pub name: String,
/// Custom storage backend; when `None` a `VecListBackend` is created.
pub backend: Option<Box<dyn ListBackend<T>>>,
/// Whether mutations are recorded in a mutation log.
pub mutation_log: bool,
}
impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
    /// Defaults: name `reactiveList`, default backend, no mutation log.
    fn default() -> Self {
        let name = String::from("reactiveList");
        Self {
            name,
            mutation_log: false,
            backend: None,
        }
    }
}
impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
#[must_use]
pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
let node_id = core
.register_state(HandleId::new(0), false)
.expect("register_state for ReactiveList");
let backend: Box<dyn ListBackend<T>> = opts
.backend
.unwrap_or_else(|| Box::new(VecListBackend::new()));
let mutation_log = if opts.mutation_log {
Some(Vec::new())
} else {
None
};
let inner = ListInner {
backend,
mutation_log,
structure_name: opts.name,
};
Self {
inner: Mutex::new(inner),
emitter: EmitHandle {
node_id,
intern,
version: AtomicU64::new(0),
},
node_id,
}
}
/// Returns the number of items reported by the backend.
#[must_use]
pub fn size(&self) -> usize {
    let guard = self.inner.lock();
    guard.backend.size()
}
/// Returns the item at `index`, as resolved by the backend, or `None`.
///
/// NOTE(review): the index is signed, so negative values presumably count
/// from the end — confirm against the backend implementation.
#[must_use]
pub fn at(&self, index: i64) -> Option<T> {
    let guard = self.inner.lock();
    guard.backend.at(index)
}
/// Appends `value`, emits the new snapshot, and records an `Append` change
/// when mutation logging is enabled.
pub fn append(&self, core: &Core, value: T) {
    let mut guard = self.inner.lock();
    // Clone the value for the log only when a log exists.
    let pending = if guard.mutation_log.is_some() {
        Some(ListChange::Append {
            value: value.clone(),
        })
    } else {
        None
    };
    guard.backend.append(value);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
}
/// Appends all `values` in one step and emits a single snapshot.
///
/// An empty `values` is a no-op: nothing is emitted or recorded.
pub fn append_many(&self, core: &Core, values: Vec<T>) {
    if values.is_empty() {
        return;
    }
    let mut guard = self.inner.lock();
    let pending = if guard.mutation_log.is_some() {
        Some(ListChange::AppendMany {
            values: values.clone(),
        })
    } else {
        None
    };
    guard.backend.append_many(values);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
}
/// Inserts `value` at `index` and emits the new snapshot.
///
/// # Errors
///
/// Returns [`IndexOutOfBounds`] when `index` is greater than the current
/// size (inserting at exactly the size is allowed). Nothing is emitted or
/// recorded in that case.
pub fn insert(&self, core: &Core, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
    let mut guard = self.inner.lock();
    if index > guard.backend.size() {
        return Err(IndexOutOfBounds);
    }
    let pending = if guard.mutation_log.is_some() {
        Some(ListChange::Insert {
            index,
            value: value.clone(),
        })
    } else {
        None
    };
    guard.backend.insert(index, value);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
    Ok(())
}
/// Inserts all `values` starting at `index` and emits a single snapshot.
///
/// An empty `values` returns `Ok(())` without checking `index` and without
/// emitting.
///
/// # Errors
///
/// Returns [`IndexOutOfBounds`] when `index` is greater than the current
/// size. Nothing is emitted or recorded in that case.
pub fn insert_many(
    &self,
    core: &Core,
    index: usize,
    values: Vec<T>,
) -> Result<(), IndexOutOfBounds> {
    if values.is_empty() {
        return Ok(());
    }
    let mut guard = self.inner.lock();
    if index > guard.backend.size() {
        return Err(IndexOutOfBounds);
    }
    let pending = if guard.mutation_log.is_some() {
        Some(ListChange::InsertMany {
            index,
            values: values.clone(),
        })
    } else {
        None
    };
    guard.backend.insert_many(index, values);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
    Ok(())
}
/// Removes and returns the item at `index`, as resolved by the backend.
///
/// Returns `None`, without emitting or recording, when the backend removes
/// nothing. The recorded `Pop` change carries the index exactly as passed
/// in.
pub fn pop(&self, core: &Core, index: i64) -> Option<T> {
    let mut guard = self.inner.lock();
    let removed = guard.backend.pop(index)?;
    let pending = if guard.mutation_log.is_some() {
        Some(ListChange::Pop {
            index,
            value: removed.clone(),
        })
    } else {
        None
    };
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending {
        self.inner.lock().record(change, version);
    }
    Some(removed)
}
/// Removes every item. Emits and records a `Clear` change only when the
/// backend reports that something was actually removed.
pub fn clear(&self, core: &Core) {
    let mut guard = self.inner.lock();
    let count = guard.backend.clear();
    if count == 0 {
        return;
    }
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    let mut guard = self.inner.lock();
    guard.record(ListChange::Clear { count }, version);
}
/// Returns a copy of the current items as produced by the backend.
#[must_use]
pub fn to_vec(&self) -> Vec<T> {
    let guard = self.inner.lock();
    guard.backend.to_vec()
}
/// Returns a copy of the recorded changes, or `None` when mutation logging
/// is disabled.
#[must_use]
pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
    let guard = self.inner.lock();
    guard.mutation_log.clone()
}
}
/// Reactive key/value map with optional TTL expiry, LRU eviction or
/// score-based retention. Each change emits a `Vec<(K, V)>` snapshot on
/// `node_id`.
pub struct ReactiveMap<K, V>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Backend, bookkeeping for TTL/LRU/retention, and the optional mutation log.
inner: Mutex<MapInner<K, V>>,
/// Emits snapshots on this map's own node.
emitter: EmitHandle<Vec<(K, V)>>,
/// Node on which snapshots of this map are emitted.
pub node_id: NodeId,
}
/// Score-based retention applied after every write to a [`ReactiveMap`].
pub struct RetentionPolicy<K, V>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Computes the score of an entry; lower-scored entries are archived first.
pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
/// Entries scoring strictly below this value are archived.
pub archive_threshold: Option<f64>,
/// Upper bound on the number of entries kept after the threshold pass.
pub max_size: Option<usize>,
/// Called with key, value and score for every archived entry.
pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
}
/// Lock-protected state of a [`ReactiveMap`].
struct MapInner<K, V>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Storage for the entries.
backend: Box<dyn MapBackend<K, V>>,
/// Recorded changes; `None` when mutation logging is disabled.
mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
/// Name copied into the `structure` field of every recorded change.
structure_name: String,
/// Expiry deadline per key, compared against `monotonic_ns()`.
ttl_expiry: HashMap<K, u64>,
/// TTL in nanoseconds applied to writes that carry no per-call TTL.
default_ttl_ns: Option<u64>,
/// Keys in eviction order: the front is evicted first, touched keys move to the back.
lru_order: Vec<K>,
/// Maximum entry count under LRU eviction; `None` disables LRU bookkeeping.
lru_max_size: Option<usize>,
/// Score-based retention policy, if any.
retention: Option<RetentionPolicy<K, V>>,
}
impl<K, V> MapInner<K, V>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Appends `change` to the mutation log, stamped with `version` and the
/// current wall-clock time. A no-op when mutation logging is disabled.
fn record(&mut self, change: MapChange<K, V>, version: Version) {
    let Some(entries) = self.mutation_log.as_mut() else {
        return;
    };
    let structure = self.structure_name.clone();
    entries.push(BaseChange {
        structure,
        version,
        t_ns: wall_clock_ns(),
        seq: None,
        lifecycle: Lifecycle::Data,
        change,
    });
}
/// Removes every entry whose deadline has passed and returns the removed
/// `(key, previous value)` pairs.
///
/// The deadline (and any LRU slot) of an expired key is dropped even when
/// the backend no longer holds that key. Otherwise such a stale deadline
/// would stay in `ttl_expiry` forever and be rescanned on every call. Only
/// keys that were actually present in the backend are reported.
fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
    if self.ttl_expiry.is_empty() {
        return Vec::new();
    }
    let now = monotonic_ns();
    let due: Vec<K> = self
        .ttl_expiry
        .iter()
        .filter(|(_, &deadline)| now >= deadline)
        .map(|(k, _)| k.clone())
        .collect();

    let mut expired = Vec::with_capacity(due.len());
    for key in due {
        // Always forget the bookkeeping, whether or not the backend still has the key.
        self.ttl_expiry.remove(&key);
        self.lru_remove(&key);
        if let Some(previous) = self.backend.get(&key) {
            self.backend.delete(&key);
            expired.push((key, previous));
        }
    }
    expired
}
/// Applies the retention policy and returns the archived
/// `(key, value, score)` triples in ascending score order.
///
/// Entries are scored and sorted ascending (`f64::total_cmp`). Two passes
/// then decide what is archived:
///
/// 1. the leading run of entries scoring strictly below `archive_threshold`;
/// 2. as many further lowest-scored entries as needed to bring the remaining
///    count down to `max_size`.
///
/// Returns an empty vector when no policy is configured or the map is empty.
/// The archived prefix is taken with a single `drain`, which avoids the
/// quadratic cost of repeatedly removing the first element.
fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
    let Some(policy) = self.retention.as_ref() else {
        return Vec::new();
    };
    let score_fn = Arc::clone(&policy.score);
    let archive_threshold = policy.archive_threshold;
    let max_size = policy.max_size;

    let entries = self.backend.to_vec();
    if entries.is_empty() {
        return Vec::new();
    }
    let mut scored: Vec<(K, V, f64)> = entries
        .into_iter()
        .map(|(k, v)| {
            let s = (score_fn)(&k, &v);
            (k, v, s)
        })
        .collect();
    scored.sort_by(|a, b| a.2.total_cmp(&b.2));

    // Pass 1: leading entries strictly below the threshold (stops at the first that is not).
    let below = archive_threshold.map_or(0, |threshold| {
        scored.iter().take_while(|entry| entry.2 < threshold).count()
    });
    // Pass 2: extra entries needed to get the remainder down to `max_size`.
    let overflow = max_size.map_or(0, |max| (scored.len() - below).saturating_sub(max));

    let archived: Vec<(K, V, f64)> = scored.drain(..below + overflow).collect();
    for (key, _, _) in &archived {
        self.backend.delete(key);
        self.ttl_expiry.remove(key);
        self.lru_remove(key);
    }
    archived
}
/// Marks `key` as most recently used by moving it to the back of the LRU
/// order. Does nothing when LRU is disabled or the key is not tracked.
fn lru_touch(&mut self, key: &K) {
    if self.lru_max_size.is_none() {
        return;
    }
    let Some(idx) = self.lru_order.iter().position(|k| k == key) else {
        return;
    };
    self.lru_order.remove(idx);
    self.lru_order.push(key.clone());
}
/// Drops the first occurrence of `key` from the LRU order. Does nothing
/// when LRU is disabled or the key is not tracked.
fn lru_remove(&mut self, key: &K) {
    if self.lru_max_size.is_none() {
        return;
    }
    let found = self.lru_order.iter().position(|k| k == key);
    if let Some(idx) = found {
        self.lru_order.remove(idx);
    }
}
/// Evicts least-recently-used entries until the backend holds at most
/// `lru_max_size` entries (or the LRU order runs out), returning the evicted
/// `(key, previous value)` pairs. Returns an empty vector when LRU is
/// disabled.
fn lru_evict(&mut self) -> Vec<(K, V)> {
    let mut evicted = Vec::new();
    if let Some(limit) = self.lru_max_size {
        while self.backend.size() > limit && !self.lru_order.is_empty() {
            let victim = self.lru_order.remove(0);
            // A tracked key the backend no longer holds is simply skipped.
            let Some(previous) = self.backend.get(&victim) else {
                continue;
            };
            self.backend.delete(&victim);
            self.ttl_expiry.remove(&victim);
            evicted.push((victim, previous));
        }
    }
    evicted
}
/// Sets the expiry deadline of `key` from `ttl` (seconds), falling back to
/// the map's default TTL when `ttl` is `None`.
///
/// The deadline is computed with a saturating add. A very large TTL
/// saturates the float-to-integer cast at `u64::MAX`, and a plain `+` would
/// then overflow (panic in debug builds, wrap to an already-expired deadline
/// in release builds). A saturated deadline is effectively "never expires".
///
/// NOTE(review): when neither a per-call nor a default TTL applies, a
/// deadline left behind by an earlier write to the same key is kept as-is —
/// confirm this is intended.
fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
    let ttl_ns = ttl
        .map(|secs| (secs * 1_000_000_000.0) as u64)
        .or(self.default_ttl_ns);
    if let Some(ns) = ttl_ns {
        let deadline = monotonic_ns().saturating_add(ns);
        self.ttl_expiry.insert(key.clone(), deadline);
    }
}
}
/// Construction options for [`ReactiveMap`].
pub struct ReactiveMapOptions<K, V>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Name written into the `structure` field of recorded changes.
pub name: String,
/// Custom storage backend; when `None` a `HashMapBackend` is created.
pub backend: Option<Box<dyn MapBackend<K, V>>>,
/// Whether mutations are recorded in a mutation log.
pub mutation_log: bool,
/// Default time-to-live in seconds for writes without a per-call TTL.
pub default_ttl: Option<f64>,
/// Maximum entry count enforced by LRU eviction; mutually exclusive with `retention`.
pub max_size: Option<usize>,
/// Score-based retention policy; mutually exclusive with `max_size`.
pub retention: Option<RetentionPolicy<K, V>>,
}
impl<K, V> Default for ReactiveMapOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Defaults: name `reactiveMap`, default backend, no mutation log, no
    /// TTL, no LRU limit and no retention policy.
    fn default() -> Self {
        let name = String::from("reactiveMap");
        Self {
            name,
            backend: None,
            retention: None,
            max_size: None,
            default_ttl: None,
            mutation_log: false,
        }
    }
}
/// Error returned by `ReactiveMap::new` for an invalid option combination;
/// the wrapped string is the human-readable reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MapConfigError(pub String);
impl std::fmt::Display for MapConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
// Marker impl so `MapConfigError` can be used as a standard error value; all trait defaults are kept.
impl std::error::Error for MapConfigError {}
impl<K, V> ReactiveMap<K, V>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Validates `opts`, registers a state node on `core`, and builds a map
/// around it.
///
/// # Errors
///
/// Returns [`MapConfigError`] when:
/// * both `max_size` (LRU) and `retention` are set;
/// * `retention` sets neither `archive_threshold` nor `max_size`;
/// * `default_ttl` is not a positive, finite number. NaN and infinity are
///   rejected here for consistency with the per-call TTL check; a NaN TTL
///   would otherwise convert to a zero-nanosecond TTL.
///
/// # Panics
///
/// Panics if `core.register_state` fails.
pub fn new(
    core: &Core,
    intern: InternFn<Vec<(K, V)>>,
    opts: ReactiveMapOptions<K, V>,
) -> Result<Self, MapConfigError> {
    if opts.max_size.is_some() && opts.retention.is_some() {
        return Err(MapConfigError(
            "max_size (LRU) and retention are mutually exclusive".into(),
        ));
    }
    if let Some(ref r) = opts.retention {
        if r.archive_threshold.is_none() && r.max_size.is_none() {
            return Err(MapConfigError(
                "retention requires at least one of archive_threshold or max_size".into(),
            ));
        }
    }
    if let Some(ttl) = opts.default_ttl {
        // NaN fails every ordered comparison, so it has to be tested explicitly.
        if ttl.is_nan() || ttl <= 0.0 {
            return Err(MapConfigError("default_ttl must be > 0".into()));
        }
        if ttl.is_infinite() {
            return Err(MapConfigError("default_ttl must be finite".into()));
        }
    }
    let node_id = core
        .register_state(HandleId::new(0), false)
        .expect("register_state for ReactiveMap");
    let backend: Box<dyn MapBackend<K, V>> = opts
        .backend
        .unwrap_or_else(|| Box::new(HashMapBackend::new()));
    let mutation_log = opts.mutation_log.then(Vec::new);
    // Seconds to nanoseconds; the value was validated as positive and finite above.
    let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
    let inner = MapInner {
        backend,
        mutation_log,
        structure_name: opts.name,
        ttl_expiry: HashMap::new(),
        default_ttl_ns,
        lru_order: Vec::new(),
        lru_max_size: opts.max_size,
        retention: opts.retention,
    };
    Ok(Self {
        inner: Mutex::new(inner),
        emitter: EmitHandle {
            node_id,
            intern,
            version: AtomicU64::new(0),
        },
        node_id,
    })
}
/// Returns the number of entries reported by the backend. Expired entries
/// are not pruned by this call.
#[must_use]
pub fn size(&self) -> usize {
    let guard = self.inner.lock();
    guard.backend.size()
}
/// Reports whether `key` is present and not expired.
///
/// Expired entries are pruned first. If anything was pruned, a fresh
/// snapshot is emitted and one `Delete`/`Expired` change per pruned entry is
/// recorded. A hit also marks the key as most recently used.
pub fn has(&self, core: &Core, key: &K) -> bool {
    let mut guard = self.inner.lock();
    // The up-front check for the requested key only applies when a default TTL is configured.
    let target_expired = match guard.ttl_expiry.get(key) {
        Some(&deadline) if guard.default_ttl_ns.is_some() => monotonic_ns() >= deadline,
        _ => false,
    };
    let mut expired = guard.prune_expired_inner();
    if target_expired && !expired.iter().any(|(k, _)| k == key) {
        // The general prune did not cover the key, so remove it here.
        if let Some(previous) = guard.backend.get(key) {
            guard.backend.delete(key);
            guard.ttl_expiry.remove(key);
            guard.lru_remove(key);
            expired.push((key.clone(), previous));
        }
    }
    let present = !target_expired && guard.backend.has(key);
    if present {
        guard.lru_touch(key);
    }
    // Release the lock before emitting.
    drop(guard);

    if !expired.is_empty() {
        let snapshot = self.inner.lock().backend.to_vec();
        let version = self.emitter.emit(core, snapshot);
        let mut guard = self.inner.lock();
        for (k, previous) in expired {
            let change = MapChange::Delete {
                key: k,
                previous,
                reason: DeleteReason::Expired,
            };
            guard.record(change, version.clone());
        }
    }
    present
}
/// Returns the value stored under `key`, or `None` when it is absent or
/// expired.
///
/// Expired entries are pruned first. If anything was pruned, a fresh
/// snapshot is emitted and one `Delete`/`Expired` change per pruned entry is
/// recorded. A hit also marks the key as most recently used.
pub fn get(&self, core: &Core, key: &K) -> Option<V> {
    let mut guard = self.inner.lock();
    // The up-front check for the requested key only applies when a default TTL is configured.
    let target_expired = match guard.ttl_expiry.get(key) {
        Some(&deadline) if guard.default_ttl_ns.is_some() => monotonic_ns() >= deadline,
        _ => false,
    };
    let mut expired = guard.prune_expired_inner();
    if target_expired && !expired.iter().any(|(k, _)| k == key) {
        // The general prune did not cover the key, so remove it here.
        if let Some(previous) = guard.backend.get(key) {
            guard.backend.delete(key);
            guard.ttl_expiry.remove(key);
            guard.lru_remove(key);
            expired.push((key.clone(), previous));
        }
    }
    let value = if target_expired {
        None
    } else {
        guard.backend.get(key)
    };
    if value.is_some() {
        guard.lru_touch(key);
    }
    // Release the lock before emitting.
    drop(guard);

    if !expired.is_empty() {
        let snapshot = self.inner.lock().backend.to_vec();
        let version = self.emitter.emit(core, snapshot);
        let mut guard = self.inner.lock();
        for (k, previous) in expired {
            let change = MapChange::Delete {
                key: k,
                previous,
                reason: DeleteReason::Expired,
            };
            guard.record(change, version.clone());
        }
    }
    value
}
/// Stores `value` under `key` without a per-call TTL, so the map's default
/// TTL (if any) applies.
pub fn set(&self, core: &Core, key: K, value: V) {
    let no_ttl: Option<f64> = None;
    self.set_with_ttl(core, key, value, no_ttl);
}
/// Stores `value` under `key` with an optional per-call TTL in seconds.
///
/// Under the lock, in order: expired entries are pruned, the key's deadline
/// and LRU position are updated, the value is written, then LRU eviction
/// and retention run. One snapshot is emitted afterwards, and the `Set` is
/// recorded followed by one `Delete` per expired, evicted or archived entry.
///
/// The retention `on_archive` callback is invoked after the map lock has
/// been released and before the snapshot is emitted. The lock is not
/// re-entrant, so running user code while holding it would deadlock as soon
/// as the callback touched this map.
///
/// # Panics
///
/// Panics if `ttl` is `Some` and not a positive, finite number.
pub fn set_with_ttl(&self, core: &Core, key: K, value: V, ttl: Option<f64>) {
    if let Some(t) = ttl {
        assert!(
            t > 0.0 && t.is_finite(),
            "per-call ttl must be positive and finite"
        );
    }
    let (snapshot, change, mut eviction_changes, archived, on_archive) = {
        let mut inner = self.inner.lock();
        let expired = inner.prune_expired_inner();
        let change = inner.mutation_log.is_some().then(|| MapChange::Set {
            key: key.clone(),
            value: value.clone(),
        });
        inner.set_ttl_with(&key, ttl);
        // Re-insert the key at the most-recently-used end.
        inner.lru_remove(&key);
        if inner.lru_max_size.is_some() {
            inner.lru_order.push(key.clone());
        }
        inner.backend.set(key, value);
        let evicted = inner.lru_evict();
        let archived = inner.apply_retention_inner();
        // Clone the callback once so it can be called after the lock is released.
        let on_archive = inner.retention.as_ref().and_then(|r| r.on_archive.clone());

        let mut eviction_changes: Vec<(K, V, DeleteReason)> =
            Vec::with_capacity(expired.len() + evicted.len() + archived.len());
        eviction_changes.extend(
            expired
                .into_iter()
                .map(|(k, prev)| (k, prev, DeleteReason::Expired)),
        );
        eviction_changes.extend(
            evicted
                .into_iter()
                .map(|(k, prev)| (k, prev, DeleteReason::LruEvict)),
        );
        (
            inner.backend.to_vec(),
            change,
            eviction_changes,
            archived,
            on_archive,
        )
    };

    // User callbacks run without the map lock held.
    if let Some(callback) = &on_archive {
        for (k, v, score) in &archived {
            callback(k, v, *score);
        }
    }
    eviction_changes.extend(
        archived
            .into_iter()
            .map(|(k, v, _)| (k, v, DeleteReason::Archived)),
    );

    let version = self.emitter.emit(core, snapshot);
    if change.is_some() || !eviction_changes.is_empty() {
        let mut inner = self.inner.lock();
        if let Some(change) = change {
            inner.record(change, version.clone());
        }
        for (k, prev, reason) in eviction_changes {
            inner.record(
                MapChange::Delete {
                    key: k,
                    previous: prev,
                    reason,
                },
                version.clone(),
            );
        }
    }
}
/// Stores all `entries` without a per-call TTL, so the map's default TTL
/// (if any) applies to each of them.
pub fn set_many(&self, core: &Core, entries: Vec<(K, V)>) {
    let no_ttl: Option<f64> = None;
    self.set_many_with_ttl(core, entries, no_ttl);
}
/// Stores all `entries` in one step with an optional per-call TTL in
/// seconds, emitting a single snapshot.
///
/// An empty `entries` is a no-op (after the TTL has been validated).
/// Otherwise, under the lock: expired entries are pruned, each key's
/// deadline and LRU position are updated, the batch is written, then LRU
/// eviction and retention run. Afterwards one `Set` per entry is recorded,
/// followed by one `Delete` per expired, evicted or archived entry.
///
/// The retention `on_archive` callback is invoked after the map lock has
/// been released and before the snapshot is emitted. The lock is not
/// re-entrant, so running user code while holding it would deadlock as soon
/// as the callback touched this map.
///
/// # Panics
///
/// Panics if `ttl` is `Some` and not a positive, finite number.
pub fn set_many_with_ttl(&self, core: &Core, entries: Vec<(K, V)>, ttl: Option<f64>) {
    if let Some(t) = ttl {
        assert!(
            t > 0.0 && t.is_finite(),
            "per-call ttl must be positive and finite"
        );
    }
    if entries.is_empty() {
        return;
    }
    let (snapshot, changes, mut eviction_changes, archived, on_archive) = {
        let mut inner = self.inner.lock();
        let expired = inner.prune_expired_inner();
        let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
            entries
                .iter()
                .map(|(k, v)| MapChange::Set {
                    key: k.clone(),
                    value: v.clone(),
                })
                .collect()
        });
        for (k, _) in &entries {
            inner.set_ttl_with(k, ttl);
            // Re-insert the key at the most-recently-used end.
            inner.lru_remove(k);
            if inner.lru_max_size.is_some() {
                inner.lru_order.push(k.clone());
            }
        }
        inner.backend.set_many(entries);
        let evicted = inner.lru_evict();
        let archived = inner.apply_retention_inner();
        // Clone the callback once so it can be called after the lock is released.
        let on_archive = inner.retention.as_ref().and_then(|r| r.on_archive.clone());

        let mut eviction_changes: Vec<(K, V, DeleteReason)> =
            Vec::with_capacity(expired.len() + evicted.len() + archived.len());
        eviction_changes.extend(
            expired
                .into_iter()
                .map(|(k, prev)| (k, prev, DeleteReason::Expired)),
        );
        eviction_changes.extend(
            evicted
                .into_iter()
                .map(|(k, prev)| (k, prev, DeleteReason::LruEvict)),
        );
        (
            inner.backend.to_vec(),
            changes,
            eviction_changes,
            archived,
            on_archive,
        )
    };

    // User callbacks run without the map lock held.
    if let Some(callback) = &on_archive {
        for (k, v, score) in &archived {
            callback(k, v, *score);
        }
    }
    eviction_changes.extend(
        archived
            .into_iter()
            .map(|(k, v, _)| (k, v, DeleteReason::Archived)),
    );

    let version = self.emitter.emit(core, snapshot);
    if changes.is_some() || !eviction_changes.is_empty() {
        let mut inner = self.inner.lock();
        if let Some(changes) = changes {
            for change in changes {
                inner.record(change, version.clone());
            }
        }
        for (k, prev, reason) in eviction_changes {
            inner.record(
                MapChange::Delete {
                    key: k,
                    previous: prev,
                    reason,
                },
                version.clone(),
            );
        }
    }
}
/// Removes `key`. Nothing is emitted or recorded when the backend reports
/// that nothing was deleted.
///
/// On success the key's TTL and LRU bookkeeping are dropped, a snapshot is
/// emitted, and a `Delete`/`Explicit` change is recorded if a previous
/// value was read.
pub fn delete(&self, core: &Core, key: &K) {
    let mut guard = self.inner.lock();
    let previous = guard.backend.get(key);
    if !guard.backend.delete(key) {
        return;
    }
    guard.ttl_expiry.remove(key);
    guard.lru_remove(key);
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    let Some(previous) = previous else {
        return;
    };
    let change = MapChange::Delete {
        key: key.clone(),
        previous,
        reason: DeleteReason::Explicit,
    };
    self.inner.lock().record(change, version);
}
/// Removes all `keys` in one step and emits a single snapshot.
///
/// Nothing is emitted or recorded when the backend reports that no entry
/// was removed. Otherwise TTL and LRU bookkeeping is dropped for every
/// requested key, and one `Delete`/`Explicit` change is recorded for each
/// requested key that had a value before the deletion.
pub fn delete_many(&self, core: &Core, keys: &[K]) {
    let mut guard = self.inner.lock();
    // Capture the previous values before they disappear.
    let mut removed_entries: Vec<(K, V)> = Vec::new();
    for k in keys {
        if let Some(v) = guard.backend.get(k) {
            removed_entries.push((k.clone(), v));
        }
    }
    if guard.backend.delete_many(keys) == 0 {
        return;
    }
    for k in keys {
        guard.ttl_expiry.remove(k);
        guard.lru_remove(k);
    }
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if removed_entries.is_empty() {
        return;
    }
    let mut guard = self.inner.lock();
    for (k, previous) in removed_entries {
        let change = MapChange::Delete {
            key: k,
            previous,
            reason: DeleteReason::Explicit,
        };
        guard.record(change, version.clone());
    }
}
/// Removes every entry together with all TTL and LRU bookkeeping. Emits and
/// records a `Clear` change only when the backend reports that something
/// was actually removed.
pub fn clear(&self, core: &Core) {
    let mut guard = self.inner.lock();
    let count = guard.backend.clear();
    if count == 0 {
        return;
    }
    guard.ttl_expiry.clear();
    guard.lru_order.clear();
    let snapshot = guard.backend.to_vec();
    // Release the lock before emitting.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    let mut guard = self.inner.lock();
    guard.record(MapChange::Clear { count }, version);
}
/// Removes every expired entry and returns how many were removed.
///
/// When at least one entry expired, a snapshot is emitted and one
/// `Delete`/`Expired` change per entry is recorded.
pub fn prune_expired(&self, core: &Core) -> usize {
    let expired = self.inner.lock().prune_expired_inner();
    let count = expired.len();
    if count == 0 {
        return 0;
    }
    let snapshot = self.inner.lock().backend.to_vec();
    let version = self.emitter.emit(core, snapshot);
    let mut guard = self.inner.lock();
    for (k, previous) in expired {
        let change = MapChange::Delete {
            key: k,
            previous,
            reason: DeleteReason::Expired,
        };
        guard.record(change, version.clone());
    }
    count
}
/// Returns a copy of the current entries as produced by the backend.
/// Expired entries are not pruned by this call.
#[must_use]
pub fn to_vec(&self) -> Vec<(K, V)> {
    let guard = self.inner.lock();
    guard.backend.to_vec()
}
/// Returns a copy of the recorded changes, or `None` when mutation logging
/// is disabled.
#[must_use]
pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
    let guard = self.inner.lock();
    guard.mutation_log.clone()
}
}
/// Reactive index whose snapshots are `Vec<IndexRow<K, V>>` values emitted
/// on `node_id`.
pub struct ReactiveIndex<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
/// Backend, optional mutation log and optional row-equality callback.
inner: Mutex<IndexInner<K, V>>,
/// Emits snapshots on this index's own node.
emitter: EmitHandle<Vec<IndexRow<K, V>>>,
/// Node on which snapshots of this index are emitted.
pub node_id: NodeId,
}
/// Shared callback that compares two index rows and returns a `bool`.
pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
/// Lock-protected state of a [`ReactiveIndex`].
struct IndexInner<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
/// Storage for the index rows.
backend: Box<dyn IndexBackend<K, V>>,
/// Recorded changes; `None` when mutation logging is disabled.
mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
/// Name copied into the `structure` field of every recorded change.
structure_name: String,
/// Row-equality callback taken from the construction options.
/// NOTE(review): its use is outside the visible part of the file — presumably it suppresses no-op upserts; confirm.
equals: Option<IndexEqualsFn<K, V>>,
}
impl<K, V> IndexInner<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Appends `change` to the mutation log, stamped with `version` and the
    /// current wall-clock time. A no-op when mutation logging is disabled.
    fn record(&mut self, change: IndexChange<K, V>, version: Version) {
        let Some(entries) = self.mutation_log.as_mut() else {
            return;
        };
        let structure = self.structure_name.clone();
        entries.push(BaseChange {
            structure,
            version,
            t_ns: wall_clock_ns(),
            seq: None,
            lifecycle: Lifecycle::Data,
            change,
        });
    }
}
/// Per-call options for `ReactiveIndex::upsert_with`.
pub struct UpsertOptions<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
/// Comparator for this call only; when set, `upsert_with` prefers it over
/// the comparator configured on the index.
pub equals: Option<IndexEqualsFn<K, V>>,
}
impl<K, V> Default for UpsertOptions<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self { equals: None }
}
}
/// Construction options for `ReactiveIndex::new`.
pub struct ReactiveIndexOptions<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
/// Structure name stamped on every recorded change.
pub name: String,
/// Custom storage backend; `None` makes `new` fall back to `VecIndexBackend`.
pub backend: Option<Box<dyn IndexBackend<K, V>>>,
/// Whether `new` allocates a mutation log for the index.
pub mutation_log: bool,
/// Index-wide comparator used by the upsert methods to skip no-op writes.
pub equals: Option<IndexEqualsFn<K, V>>,
}
impl<K, V> Default for ReactiveIndexOptions<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self {
name: "reactiveIndex".into(),
backend: None,
mutation_log: false,
equals: None,
}
}
}
impl<K, V> ReactiveIndex<K, V>
where
K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
V: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new(
core: &Core,
intern: InternFn<Vec<IndexRow<K, V>>>,
opts: ReactiveIndexOptions<K, V>,
) -> Self {
let node_id = core
.register_state(HandleId::new(0), false)
.expect("register_state for ReactiveIndex");
let backend: Box<dyn IndexBackend<K, V>> = opts
.backend
.unwrap_or_else(|| Box::new(VecIndexBackend::new()));
let mutation_log = if opts.mutation_log {
Some(Vec::new())
} else {
None
};
let inner = IndexInner {
backend,
mutation_log,
structure_name: opts.name,
equals: opts.equals,
};
Self {
inner: Mutex::new(inner),
emitter: EmitHandle {
node_id,
intern,
version: AtomicU64::new(0),
},
node_id,
}
}
/// Returns the size reported by the backend.
#[must_use]
pub fn size(&self) -> usize {
    let guard = self.inner.lock();
    guard.backend.size()
}
/// Returns whether the backend reports an entry for `primary`.
#[must_use]
pub fn has(&self, primary: &K) -> bool {
    let guard = self.inner.lock();
    guard.backend.has(primary)
}
/// Returns the backend's value for `primary`, or `None` when the backend
/// has none.
#[must_use]
pub fn get(&self, primary: &K) -> Option<V> {
    let guard = self.inner.lock();
    guard.backend.get(primary)
}
/// Upserts a row using default `UpsertOptions` (no per-call comparator).
///
/// Forwards to `upsert_with` and returns its result.
pub fn upsert(&self, core: &Core, primary: K, secondary: String, value: V) -> bool {
    let default_opts = UpsertOptions::default();
    self.upsert_with(core, primary, secondary, value, &default_opts)
}
/// Upserts a row, emits the new ordered snapshot and, when logging is
/// enabled, records an `IndexChange::Upsert`.
///
/// The comparator from `opts` is preferred over the index-wide one. If a
/// comparator exists, a row is already stored under `primary`, and the
/// comparator reports the stored and proposed rows as equal, the call is a
/// no-op and returns `false`.
///
/// Otherwise returns whatever the backend's `upsert` returns (presumably
/// `true` when the key was newly inserted; confirm against the backend).
pub fn upsert_with(
    &self,
    core: &Core,
    primary: K,
    secondary: String,
    value: V,
    opts: &UpsertOptions<K, V>,
) -> bool {
    let mut guard = self.inner.lock();

    let comparator = opts.equals.as_ref().or(guard.equals.as_ref());
    let unchanged = comparator.map_or(false, |same| {
        guard.backend.get_row(&primary).map_or(false, |current| {
            let candidate = IndexRow {
                primary: primary.clone(),
                secondary: secondary.clone(),
                value: value.clone(),
            };
            same(&current, &candidate)
        })
    });
    if unchanged {
        return false;
    }

    // Clone the row for the log only when a log is actually kept.
    let pending_change = if guard.mutation_log.is_some() {
        Some(IndexChange::Upsert {
            primary: primary.clone(),
            secondary: secondary.clone(),
            value: value.clone(),
        })
    } else {
        None
    };
    let inserted = guard.backend.upsert(primary, secondary, value);
    let snapshot = guard.backend.to_ordered();
    // Emit with the lock released; it is re-taken only to write the log.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = pending_change {
        self.inner.lock().record(change, version);
    }
    inserted
}
/// Upserts a batch of `(primary, secondary, value)` rows with a single
/// emit, recording one `IndexChange::Upsert` per applied row when logging
/// is enabled.
///
/// When the index has a comparator, rows equal to the row currently stored
/// under the same primary key are dropped from the batch first. An empty
/// batch, before or after that filtering, is a no-op.
pub fn upsert_many(&self, core: &Core, rows: Vec<(K, String, V)>) {
    if rows.is_empty() {
        return;
    }
    let mut pending = rows;
    let mut guard = self.inner.lock();

    if let Some(same) = &guard.equals {
        // Keep only rows that are new or differ from what is stored.
        pending.retain(|(pk, sec, val)| match guard.backend.get_row(pk) {
            Some(current) => {
                let candidate = IndexRow {
                    primary: pk.clone(),
                    secondary: sec.clone(),
                    value: val.clone(),
                };
                !same(&current, &candidate)
            }
            None => true,
        });
    }
    if pending.is_empty() {
        return;
    }

    // Clone the rows for the log only when a log is actually kept.
    let pending_changes: Option<Vec<IndexChange<K, V>>> = if guard.mutation_log.is_some() {
        Some(
            pending
                .iter()
                .map(|(k, s, v)| IndexChange::Upsert {
                    primary: k.clone(),
                    secondary: s.clone(),
                    value: v.clone(),
                })
                .collect(),
        )
    } else {
        None
    };
    guard.backend.upsert_many(pending);
    let snapshot = guard.backend.to_ordered();
    // Emit with the lock released; it is re-taken only to write the log.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    if let Some(changes) = pending_changes {
        let mut guard = self.inner.lock();
        for change in changes {
            guard.record(change, version.clone());
        }
    }
}
/// Deletes the row stored under `primary`, emits the new ordered snapshot
/// and, when logging is enabled, records an `IndexChange::Delete`.
///
/// Does nothing when the backend reports that no row was deleted.
pub fn delete(&self, core: &Core, primary: &K) {
    let (snapshot, change) = {
        let mut inner = self.inner.lock();
        if !inner.backend.delete(primary) {
            return;
        }
        // Build the log entry only when a log is kept, as the upsert
        // methods do. This avoids a needless key clone and a second lock
        // acquisition when logging is disabled.
        let change = inner.mutation_log.is_some().then(|| IndexChange::Delete {
            primary: primary.clone(),
        });
        (inner.backend.to_ordered(), change)
    };
    // Emit with the lock released.
    let version = self.emitter.emit(core, snapshot);
    if let Some(change) = change {
        self.inner.lock().record(change, version);
    }
}
/// Deletes every row whose primary key appears in `primaries`, emits the
/// new ordered snapshot once and, when logging is enabled, records a
/// single `IndexChange::DeleteMany` listing the keys that were present.
///
/// Does nothing when the backend reports that zero rows were removed.
pub fn delete_many(&self, core: &Core, primaries: &[K]) {
    let (snapshot, actually_deleted) = {
        let mut inner = self.inner.lock();
        // The key list is only needed for the log, so skip it otherwise.
        let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
            // `has` is evaluated before anything is removed, so a key that
            // is repeated in `primaries` would pass the check every time.
            // Track the keys already taken so each one is logged once.
            let mut seen: std::collections::HashSet<&K> =
                std::collections::HashSet::with_capacity(primaries.len());
            primaries
                .iter()
                .filter(|k| inner.backend.has(k) && seen.insert(*k))
                .cloned()
                .collect()
        } else {
            Vec::new()
        };
        let removed = inner.backend.delete_many(primaries);
        if removed == 0 {
            return;
        }
        (inner.backend.to_ordered(), actually_deleted)
    };
    // Emit with the lock released.
    let version = self.emitter.emit(core, snapshot);
    if !actually_deleted.is_empty() {
        self.inner.lock().record(
            IndexChange::DeleteMany {
                primaries: actually_deleted,
            },
            version,
        );
    }
}
/// Empties the index, emits the post-clear ordered snapshot and records an
/// `IndexChange::Clear` carrying the number of removed rows.
///
/// Does nothing when the backend reports that zero rows were removed.
pub fn clear(&self, core: &Core) {
    let mut guard = self.inner.lock();
    let removed = guard.backend.clear();
    if removed == 0 {
        return;
    }
    let snapshot = guard.backend.to_ordered();
    // Emit with the lock released; it is re-taken only to write the log.
    drop(guard);

    let version = self.emitter.emit(core, snapshot);
    let mut guard = self.inner.lock();
    guard.record(IndexChange::Clear { count: removed }, version);
}
/// Returns the rows in the order produced by the backend's `to_ordered`.
#[must_use]
pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
    let guard = self.inner.lock();
    guard.backend.to_ordered()
}
/// Returns the backend's `(primary key, value)` pairs.
#[must_use]
pub fn to_primary_map(&self) -> Vec<(K, V)> {
    let guard = self.inner.lock();
    guard.backend.to_primary_map()
}
/// Returns the values whose primary key lies in the half-open range
/// `[start, end)`, sorted by ascending primary key.
///
/// The result is empty when no key falls in the range.
#[must_use]
pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
where
    K: Ord,
{
    // The lock stays held while the pairs are filtered.
    let guard = self.inner.lock();
    let mut in_range: Vec<(K, V)> = Vec::new();
    for (key, value) in guard.backend.to_primary_map() {
        if &key >= start && &key < end {
            in_range.push((key, value));
        }
    }
    drop(guard);

    in_range.sort_by(|(a, _), (b, _)| a.cmp(b));
    in_range.into_iter().map(|(_, value)| value).collect()
}
/// Returns a clone of the recorded mutation log, or `None` when logging
/// is disabled for this index.
#[must_use]
pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
    let guard = self.inner.lock();
    guard.mutation_log.clone()
}
}