1use std::collections::HashMap;
14use std::hash::Hash;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use parking_lot::Mutex;
19
20use graphrefly_core::{
21 monotonic_ns, wall_clock_ns, Core, HandleId, Message, NodeId, Subscription, WeakCore,
22};
23
24use crate::backend::{
25 HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
26 VecListBackend, VecLogBackend,
27};
28use crate::changeset::{
29 BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
30};
31
/// Shared, thread-safe callback that converts a snapshot of type `S` into the
/// [`HandleId`] handed to the core when that snapshot is emitted.
pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
40
/// Everything needed to publish snapshots of type `S` on a single node.
struct EmitHandle<S> {
    /// Weak reference to the core; upgraded on every emit and skipped when
    /// the upgrade fails.
    core: WeakCore,
    /// Node on which snapshots are emitted.
    node_id: NodeId,
    /// Converts a snapshot into the handle passed to the core.
    intern: InternFn<S>,
    /// Counter incremented once per `emit` call; the post-increment value is
    /// reported as the emission's version.
    version: AtomicU64,
}
53
54impl<S> EmitHandle<S> {
55 fn emit(&self, snapshot: S) -> Version {
57 let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
58 let handle = (self.intern)(snapshot);
59 if let Some(core) = self.core.upgrade() {
60 core.emit(self.node_id, handle);
61 }
62 Version::Counter(ver)
63 }
64}
65
/// Error returned when a positional operation targets an index past the end
/// of the collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexOutOfBounds;

impl std::fmt::Display for IndexOutOfBounds {
    /// Writes the fixed human-readable description of the error.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(out, "index out of bounds")
    }
}

impl std::error::Error for IndexOutOfBounds {}
81
/// Reactive append-oriented log: each mutation emits the full contents as a
/// `Vec<T>` snapshot on `node_id`.
pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
    /// Backend plus optional mutation log; shared (via `Arc`) with the
    /// subscription callbacks this type creates.
    inner: Arc<Mutex<LogInner<T>>>,
    /// Publishes snapshots on `node_id` and assigns versions.
    emitter: EmitHandle<Vec<T>>,
    /// Node on which snapshots of the log are emitted.
    pub node_id: NodeId,
}
97
/// Lock-protected state of a `ReactiveLog`.
struct LogInner<T: Clone + Send + Sync + 'static> {
    /// Storage for the log entries.
    backend: Box<dyn LogBackend<T>>,
    /// Recorded changes; `None` when mutation logging is disabled.
    mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
    /// Name copied into the `structure` field of every recorded change.
    structure_name: String,
}
103
104impl<T: Clone + Send + Sync + 'static> LogInner<T> {
105 fn record(&mut self, change: LogChange<T>, version: Version) {
106 if let Some(log) = &mut self.mutation_log {
107 log.push(BaseChange {
108 structure: self.structure_name.clone(),
109 version,
110 t_ns: wall_clock_ns(),
111 seq: None,
112 lifecycle: Lifecycle::Data,
113 change,
114 });
115 }
116 }
117}
118
/// Construction options for `ReactiveLog::new`.
pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
    /// Name stored in the `structure` field of recorded changes.
    pub name: String,
    /// Passed to `VecLogBackend::new` when no custom `backend` is supplied;
    /// not consulted otherwise.
    pub max_size: Option<usize>,
    /// Custom storage backend; `None` selects `VecLogBackend`.
    pub backend: Option<Box<dyn LogBackend<T>>>,
    /// Whether mutations are recorded in a mutation log.
    pub mutation_log: bool,
}
126
127impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
128 fn default() -> Self {
129 Self {
130 name: "reactiveLog".into(),
131 max_size: None,
132 backend: None,
133 mutation_log: false,
134 }
135 }
136}
137
138impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
139 #[must_use]
143 pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
144 let node_id = core
145 .register_state(HandleId::new(0), false)
146 .expect("register_state for ReactiveLog");
147 let backend: Box<dyn LogBackend<T>> = opts
148 .backend
149 .unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
150 let mutation_log = if opts.mutation_log {
151 Some(Vec::new())
152 } else {
153 None
154 };
155 let inner = LogInner {
156 backend,
157 mutation_log,
158 structure_name: opts.name,
159 };
160 Self {
161 inner: Arc::new(Mutex::new(inner)),
162 emitter: EmitHandle {
163 core: core.weak_handle(),
164 node_id,
165 intern,
166 version: AtomicU64::new(0),
167 },
168 node_id,
169 }
170 }
171
172 #[must_use]
173 pub fn size(&self) -> usize {
174 self.inner.lock().backend.size()
175 }
176
177 #[must_use]
178 pub fn at(&self, index: i64) -> Option<T> {
179 self.inner.lock().backend.at(index)
180 }
181
182 pub fn append(&self, value: T) {
183 let (snapshot, change) = {
184 let mut inner = self.inner.lock();
185 let change = inner.mutation_log.is_some().then(|| LogChange::Append {
186 value: value.clone(),
187 });
188 inner.backend.append(value);
189 (inner.backend.to_vec(), change)
190 };
191 let version = self.emitter.emit(snapshot);
192 if let Some(change) = change {
193 self.inner.lock().record(change, version);
194 }
195 }
196
197 pub fn append_many(&self, values: Vec<T>) {
198 if values.is_empty() {
199 return;
200 }
201 let (snapshot, change) = {
202 let mut inner = self.inner.lock();
203 let change = inner.mutation_log.is_some().then(|| LogChange::AppendMany {
204 values: values.clone(),
205 });
206 inner.backend.append_many(values);
207 (inner.backend.to_vec(), change)
208 };
209 let version = self.emitter.emit(snapshot);
210 if let Some(change) = change {
211 self.inner.lock().record(change, version);
212 }
213 }
214
215 pub fn clear(&self) {
216 let (snapshot, count) = {
217 let mut inner = self.inner.lock();
218 let count = inner.backend.clear();
219 if count == 0 {
220 return;
221 }
222 (inner.backend.to_vec(), count)
223 };
224 let version = self.emitter.emit(snapshot);
225 self.inner
226 .lock()
227 .record(LogChange::Clear { count }, version);
228 }
229
230 pub fn trim_head(&self, n: usize) {
231 if n == 0 {
232 return;
233 }
234 let (snapshot, actual) = {
235 let mut inner = self.inner.lock();
236 let actual = inner.backend.trim_head(n);
237 if actual == 0 {
238 return;
239 }
240 (inner.backend.to_vec(), actual)
241 };
242 let version = self.emitter.emit(snapshot);
243 self.inner
244 .lock()
245 .record(LogChange::TrimHead { n: actual }, version);
246 }
247
248 #[must_use]
249 pub fn to_vec(&self) -> Vec<T> {
250 self.inner.lock().backend.to_vec()
251 }
252
253 #[must_use]
256 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
257 self.inner.lock().mutation_log.clone()
258 }
259}
260
/// Selects which part of a `ReactiveLog` a view exposes.
pub enum ViewSpec {
    /// The last `n` entries (all entries when the log holds fewer).
    Tail { n: usize },
    /// Entries in `start..stop`, with both bounds clamped to the log length;
    /// `stop: None` means "to the end".
    Slice { start: usize, stop: Option<usize> },
    /// Entries from a cursor position (clamped to the log length) to the end.
    FromCursor {
        /// Node whose data messages carry the cursor position.
        cursor_node: NodeId,
        /// Decodes a handle emitted by `cursor_node` into a position.
        read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
    },
}
278
/// Result of `ReactiveLog::view`: the node that carries the view plus the
/// subscriptions that feed it.
pub struct LogView {
    /// Node on which the view's snapshots are emitted.
    pub node_id: NodeId,
    /// Held only so the subscriptions live as long as this value.
    _subscriptions: Vec<Subscription>,
}
285
/// Result of `ReactiveLog::scan`: the node that carries the accumulator plus
/// the subscription that feeds it.
pub struct ScanHandle {
    /// Node on which accumulator values are emitted.
    pub node_id: NodeId,
    /// Held only so the subscription lives as long as this value.
    _subscription: Subscription,
}
292
/// Destination for log entries, used by `ReactiveLog::attach_storage`.
pub trait AppendLogSink<T>: Send + Sync {
    /// Receives a batch of entries to store.
    ///
    /// # Errors
    ///
    /// Returns an error when the entries could not be stored.
    fn append_entries(&self, entries: &[T])
        -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    /// Returns previously stored entries (used for preloading).
    ///
    /// # Errors
    ///
    /// Returns an error when the entries could not be loaded.
    fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
}
306
/// Result of `ReactiveLog::attach_storage`.
pub struct AttachStorageHandle {
    /// Held only so the subscription lives as long as this value.
    _subscription: Subscription,
}
311
312impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
    /// Creates a derived node that re-emits the part of the log selected by
    /// `spec` whenever the log node delivers a data message (and, for
    /// `FromCursor`, whenever the cursor node does).
    ///
    /// The returned `LogView` owns the subscriptions that feed the view.
    ///
    /// # Panics
    ///
    /// Panics if the core has been dropped or if registering the view node
    /// fails.
    // The three match arms share a shape but capture different state, so the
    // function is long by design.
    #[allow(clippy::too_many_lines)]
    pub fn view(&self, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
        let core = self.emitter.core.upgrade().expect("Core dropped");
        let view_node = core
            .register_state(HandleId::new(0), false)
            .expect("register_state for LogView");
        // The view has its own emitter (and version counter) on its own node.
        let view_emitter = Arc::new(EmitHandle {
            core: self.emitter.core.clone(),
            node_id: view_node,
            intern,
            version: AtomicU64::new(0),
        });

        let inner = Arc::clone(&self.inner);
        let mut subscriptions = Vec::new();

        match spec {
            ViewSpec::Tail { n } => {
                let inner_c = Arc::clone(&inner);
                let emitter_c = Arc::clone(&view_emitter);
                let sub = core.subscribe(
                    self.node_id,
                    Arc::new(move |msgs| {
                        // One emission per batch, no matter how many data
                        // messages the batch holds.
                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                            let guard = inner_c.lock();
                            let data = guard.backend.to_vec();
                            let start = data.len().saturating_sub(n);
                            let view = data[start..].to_vec();
                            // Release the log lock before emitting.
                            drop(guard);
                            emitter_c.emit(view);
                        }
                    }),
                );
                subscriptions.push(sub);
            }
            ViewSpec::Slice { start, stop } => {
                let inner_c = Arc::clone(&inner);
                let emitter_c = Arc::clone(&view_emitter);
                let sub = core.subscribe(
                    self.node_id,
                    Arc::new(move |msgs| {
                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                            let guard = inner_c.lock();
                            let data = guard.backend.to_vec();
                            // Clamp both bounds so the slice never panics.
                            let end = stop.unwrap_or(data.len()).min(data.len());
                            let s = start.min(end);
                            let view = data[s..end].to_vec();
                            drop(guard);
                            emitter_c.emit(view);
                        }
                    }),
                );
                subscriptions.push(sub);
            }
            ViewSpec::FromCursor {
                cursor_node,
                read_cursor,
            } => {
                // Last cursor position seen; starts at 0 and is shared by the
                // two subscriptions below.
                let cursor_pos = Arc::new(Mutex::new(0usize));

                let cursor_pos_c = Arc::clone(&cursor_pos);
                let inner_c = Arc::clone(&inner);
                let emitter_c = Arc::clone(&view_emitter);
                let read_cursor_c = Arc::clone(&read_cursor);
                // Cursor updates: unlike the log subscriptions, this emits
                // once per data message in the batch.
                let sub_cursor = core.subscribe(
                    cursor_node,
                    Arc::new(move |msgs| {
                        for m in msgs {
                            if let Message::Data(h) = m {
                                let pos = read_cursor_c(*h);
                                *cursor_pos_c.lock() = pos;
                                let guard = inner_c.lock();
                                let data = guard.backend.to_vec();
                                let s = pos.min(data.len());
                                let view = data[s..].to_vec();
                                drop(guard);
                                emitter_c.emit(view);
                            }
                        }
                    }),
                );
                subscriptions.push(sub_cursor);

                let cursor_pos_c2 = Arc::clone(&cursor_pos);
                let inner_c2 = Arc::clone(&inner);
                let emitter_c2 = view_emitter;
                // Log updates: re-slice from the last known cursor position.
                let sub_log = core.subscribe(
                    self.node_id,
                    Arc::new(move |msgs| {
                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                            let pos = *cursor_pos_c2.lock();
                            let guard = inner_c2.lock();
                            let data = guard.backend.to_vec();
                            let s = pos.min(data.len());
                            let view = data[s..].to_vec();
                            drop(guard);
                            emitter_c2.emit(view);
                        }
                    }),
                );
                subscriptions.push(sub_log);
            }
        }

        LogView {
            node_id: view_node,
            _subscriptions: subscriptions,
        }
    }
428
429 pub fn scan<TAcc: Clone + Send + Sync + 'static>(
435 &self,
436 initial: TAcc,
437 step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
438 intern: InternFn<TAcc>,
439 ) -> ScanHandle {
440 struct ScanState<T, TAcc> {
441 acc: TAcc,
442 processed: usize,
443 initial: TAcc,
444 step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
445 }
446
447 let core = self.emitter.core.upgrade().expect("Core dropped");
448 let scan_node = core
449 .register_state(HandleId::new(0), false)
450 .expect("register_state for Scan");
451
452 let state = Arc::new(Mutex::new(ScanState {
453 acc: initial.clone(),
454 processed: 0,
455 initial,
456 step,
457 }));
458 let inner = Arc::clone(&self.inner);
459 let scan_emitter = Arc::new(EmitHandle {
460 core: self.emitter.core.clone(),
461 node_id: scan_node,
462 intern,
463 version: AtomicU64::new(0),
464 });
465
466 let sub = core.subscribe(
467 self.node_id,
468 Arc::new(move |msgs| {
469 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
470 let mut ss = state.lock();
471 let guard = inner.lock();
472 let data = guard.backend.to_vec();
473 drop(guard);
474
475 if data.len() < ss.processed {
476 ss.acc = ss.initial.clone();
478 ss.processed = 0;
479 }
480 for item in &data[ss.processed..] {
481 ss.acc = (ss.step)(&ss.acc, item);
482 }
483 ss.processed = data.len();
484 let acc = ss.acc.clone();
485 drop(ss);
486 scan_emitter.emit(acc);
487 }
488 }),
489 );
490
491 ScanHandle {
492 node_id: scan_node,
493 _subscription: sub,
494 }
495 }
496
497 pub fn attach(
502 &self,
503 upstream: NodeId,
504 read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
505 ) -> Subscription {
506 let core = self.emitter.core.upgrade().expect("Core dropped");
507 let inner = Arc::clone(&self.inner);
508 let weak_core = self.emitter.core.clone();
509 let node_id = self.node_id;
510 let intern = Arc::clone(&self.emitter.intern);
511
512 core.subscribe(
513 upstream,
514 Arc::new(move |msgs| {
515 for m in msgs {
516 if let Message::Data(h) = m {
517 let value = read_value(*h);
518 let snapshot = {
519 let mut guard = inner.lock();
520 guard.backend.append(value);
521 guard.backend.to_vec()
522 };
523 let handle = (intern)(snapshot);
524 if let Some(c) = weak_core.upgrade() {
525 c.emit(node_id, handle);
526 }
527 }
528 }
529 }),
530 )
531 }
532
    /// Mirrors the log into `sinks`: whenever the log node delivers a data
    /// message, each sink is sent the entries it has not yet received.
    ///
    /// When `preload` is true, the sinks are tried in order and the first one
    /// whose `load_entries` succeeds with a non-empty result is appended to
    /// the log; load errors are ignored. Entries already in the log when the
    /// subscription is set up (including preloaded ones) count as delivered
    /// to every sink.
    ///
    /// With no sinks, a no-op subscription is still created and returned.
    ///
    /// # Panics
    ///
    /// Panics if the core has been dropped.
    pub fn attach_storage(
        &self,
        sinks: Vec<Arc<dyn AppendLogSink<T>>>,
        preload: bool,
    ) -> AttachStorageHandle {
        if sinks.is_empty() {
            let core = self.emitter.core.upgrade().expect("Core dropped");
            let sub = core.subscribe(self.node_id, Arc::new(|_| {}));
            return AttachStorageHandle { _subscription: sub };
        }

        if preload {
            for sink in &sinks {
                if let Ok(entries) = sink.load_entries() {
                    if !entries.is_empty() {
                        // Goes through the normal append path, so it emits
                        // (and records, when mutation logging is enabled).
                        self.append_many(entries);
                        break;
                    }
                }
            }
        }

        // Per-sink count of entries already delivered.
        let current_size = self.size();
        let delivered: Vec<Arc<Mutex<usize>>> = sinks
            .iter()
            .map(|_| Arc::new(Mutex::new(current_size)))
            .collect();

        let core = self.emitter.core.upgrade().expect("Core dropped");
        let inner = Arc::clone(&self.inner);
        let sinks_arc = sinks;
        let delivered_arc = delivered;

        let sub = core.subscribe(
            self.node_id,
            Arc::new(move |msgs| {
                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
                    let guard = inner.lock();
                    let data = guard.backend.to_vec();
                    drop(guard);

                    for (i, sink) in sinks_arc.iter().enumerate() {
                        let mut del = delivered_arc[i].lock();
                        let result = match data.len().cmp(&*del) {
                            // Log grew: send only the new tail.
                            std::cmp::Ordering::Greater => sink.append_entries(&data[*del..]),
                            // Log shrank: the whole current contents are
                            // appended again.
                            // NOTE(review): the sink is never told to drop
                            // what it already holds — confirm intended.
                            std::cmp::Ordering::Less => sink.append_entries(&data),
                            // Same length is treated as "nothing new".
                            // NOTE(review): presumably misses changes that
                            // keep the length constant — verify against
                            // size-bounded backends.
                            std::cmp::Ordering::Equal => continue,
                        };
                        match result {
                            Ok(()) => *del = data.len(),
                            // On failure the delivered count is left as is,
                            // so the same range is retried on the next
                            // data message.
                            Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
                        }
                    }
                }
            }),
        );

        AttachStorageHandle { _subscription: sub }
    }
601}
602
/// Reactive list with positional insert and pop: each mutation emits the
/// full contents as a `Vec<T>` snapshot on `node_id`.
pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
    /// Backend plus optional mutation log.
    inner: Mutex<ListInner<T>>,
    /// Publishes snapshots on `node_id` and assigns versions.
    emitter: EmitHandle<Vec<T>>,
    /// Node on which snapshots of the list are emitted.
    pub node_id: NodeId,
}
615
/// Lock-protected state of a `ReactiveList`.
struct ListInner<T: Clone + Send + Sync + 'static> {
    /// Storage for the list elements.
    backend: Box<dyn ListBackend<T>>,
    /// Recorded changes; `None` when mutation logging is disabled.
    mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
    /// Name copied into the `structure` field of every recorded change.
    structure_name: String,
}
621
622impl<T: Clone + Send + Sync + 'static> ListInner<T> {
623 fn record(&mut self, change: ListChange<T>, version: Version) {
624 if let Some(log) = &mut self.mutation_log {
625 log.push(BaseChange {
626 structure: self.structure_name.clone(),
627 version,
628 t_ns: wall_clock_ns(),
629 seq: None,
630 lifecycle: Lifecycle::Data,
631 change,
632 });
633 }
634 }
635}
636
/// Construction options for `ReactiveList::new`.
pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
    /// Name stored in the `structure` field of recorded changes.
    pub name: String,
    /// Custom storage backend; `None` selects `VecListBackend`.
    pub backend: Option<Box<dyn ListBackend<T>>>,
    /// Whether mutations are recorded in a mutation log.
    pub mutation_log: bool,
}
643
644impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
645 fn default() -> Self {
646 Self {
647 name: "reactiveList".into(),
648 backend: None,
649 mutation_log: false,
650 }
651 }
652}
653
654impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
655 #[must_use]
656 pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
657 let node_id = core
658 .register_state(HandleId::new(0), false)
659 .expect("register_state for ReactiveList");
660 let backend: Box<dyn ListBackend<T>> = opts
661 .backend
662 .unwrap_or_else(|| Box::new(VecListBackend::new()));
663 let mutation_log = if opts.mutation_log {
664 Some(Vec::new())
665 } else {
666 None
667 };
668 let inner = ListInner {
669 backend,
670 mutation_log,
671 structure_name: opts.name,
672 };
673 Self {
674 inner: Mutex::new(inner),
675 emitter: EmitHandle {
676 core: core.weak_handle(),
677 node_id,
678 intern,
679 version: AtomicU64::new(0),
680 },
681 node_id,
682 }
683 }
684
685 #[must_use]
686 pub fn size(&self) -> usize {
687 self.inner.lock().backend.size()
688 }
689
690 #[must_use]
691 pub fn at(&self, index: i64) -> Option<T> {
692 self.inner.lock().backend.at(index)
693 }
694
695 pub fn append(&self, value: T) {
696 let (snapshot, change) = {
697 let mut inner = self.inner.lock();
698 let change = inner.mutation_log.is_some().then(|| ListChange::Append {
699 value: value.clone(),
700 });
701 inner.backend.append(value);
702 (inner.backend.to_vec(), change)
703 };
704 let version = self.emitter.emit(snapshot);
705 if let Some(change) = change {
706 self.inner.lock().record(change, version);
707 }
708 }
709
710 pub fn append_many(&self, values: Vec<T>) {
711 if values.is_empty() {
712 return;
713 }
714 let (snapshot, change) = {
715 let mut inner = self.inner.lock();
716 let change = inner
717 .mutation_log
718 .is_some()
719 .then(|| ListChange::AppendMany {
720 values: values.clone(),
721 });
722 inner.backend.append_many(values);
723 (inner.backend.to_vec(), change)
724 };
725 let version = self.emitter.emit(snapshot);
726 if let Some(change) = change {
727 self.inner.lock().record(change, version);
728 }
729 }
730
731 pub fn insert(&self, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
734 let (snapshot, change) = {
735 let mut inner = self.inner.lock();
736 if index > inner.backend.size() {
737 return Err(IndexOutOfBounds);
738 }
739 let change = inner.mutation_log.is_some().then(|| ListChange::Insert {
740 index,
741 value: value.clone(),
742 });
743 inner.backend.insert(index, value);
744 (inner.backend.to_vec(), change)
745 };
746 let version = self.emitter.emit(snapshot);
747 if let Some(change) = change {
748 self.inner.lock().record(change, version);
749 }
750 Ok(())
751 }
752
753 pub fn insert_many(&self, index: usize, values: Vec<T>) -> Result<(), IndexOutOfBounds> {
756 if values.is_empty() {
757 return Ok(());
758 }
759 let (snapshot, change) = {
760 let mut inner = self.inner.lock();
761 if index > inner.backend.size() {
762 return Err(IndexOutOfBounds);
763 }
764 let change = inner
765 .mutation_log
766 .is_some()
767 .then(|| ListChange::InsertMany {
768 index,
769 values: values.clone(),
770 });
771 inner.backend.insert_many(index, values);
772 (inner.backend.to_vec(), change)
773 };
774 let version = self.emitter.emit(snapshot);
775 if let Some(change) = change {
776 self.inner.lock().record(change, version);
777 }
778 Ok(())
779 }
780
781 pub fn pop(&self, index: i64) -> Option<T> {
784 let (value, snapshot, change) = {
785 let mut inner = self.inner.lock();
786 let value = inner.backend.pop(index)?;
787 let change = inner.mutation_log.is_some().then(|| ListChange::Pop {
788 index,
789 value: value.clone(),
790 });
791 let snapshot = inner.backend.to_vec();
792 (value, snapshot, change)
793 };
794 let version = self.emitter.emit(snapshot);
795 if let Some(change) = change {
796 self.inner.lock().record(change, version);
797 }
798 Some(value)
799 }
800
801 pub fn clear(&self) {
802 let (snapshot, count) = {
803 let mut inner = self.inner.lock();
804 let count = inner.backend.clear();
805 if count == 0 {
806 return;
807 }
808 (inner.backend.to_vec(), count)
809 };
810 let version = self.emitter.emit(snapshot);
811 self.inner
812 .lock()
813 .record(ListChange::Clear { count }, version);
814 }
815
816 #[must_use]
817 pub fn to_vec(&self) -> Vec<T> {
818 self.inner.lock().backend.to_vec()
819 }
820
821 #[must_use]
822 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
823 self.inner.lock().mutation_log.clone()
824 }
825}
826
/// Reactive key/value map: each mutation emits the full contents as a
/// `Vec<(K, V)>` snapshot on `node_id`. Supports per-key TTLs and either LRU
/// eviction or score-based retention.
pub struct ReactiveMap<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Backend, mutation log and TTL/LRU/retention bookkeeping.
    inner: Mutex<MapInner<K, V>>,
    /// Publishes snapshots on `node_id` and assigns versions.
    emitter: EmitHandle<Vec<(K, V)>>,
    /// Node on which snapshots of the map are emitted.
    pub node_id: NodeId,
}
844
/// Score-based retention: after each write, the lowest-scored entries are
/// removed ("archived") according to the configured limits.
pub struct RetentionPolicy<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Computes the score of an entry; lower scores are archived first.
    pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
    /// Entries scoring strictly below this value are archived.
    pub archive_threshold: Option<f64>,
    /// Maximum number of entries kept after archiving.
    pub max_size: Option<usize>,
    /// Called for each archived entry with its key, value and score.
    pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
}
864
/// Lock-protected state of a `ReactiveMap`.
struct MapInner<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Storage for the entries.
    backend: Box<dyn MapBackend<K, V>>,
    /// Recorded changes; `None` when mutation logging is disabled.
    mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
    /// Name copied into the `structure` field of every recorded change.
    structure_name: String,
    /// Expiry deadline per key, compared against `monotonic_ns()`.
    ttl_expiry: HashMap<K, u64>,
    /// TTL in nanoseconds applied when a write supplies none.
    default_ttl_ns: Option<u64>,
    /// Keys ordered from least to most recently used; only maintained when
    /// `lru_max_size` is set.
    lru_order: Vec<K>,
    /// Entry count above which the least recently used keys are evicted.
    lru_max_size: Option<usize>,
    /// Score-based retention policy, if any.
    retention: Option<RetentionPolicy<K, V>>,
}
884
885impl<K, V> MapInner<K, V>
886where
887 K: Clone + Eq + Hash + Send + Sync + 'static,
888 V: Clone + Send + Sync + 'static,
889{
890 fn record(&mut self, change: MapChange<K, V>, version: Version) {
891 if let Some(log) = &mut self.mutation_log {
892 log.push(BaseChange {
893 structure: self.structure_name.clone(),
894 version,
895 t_ns: wall_clock_ns(),
896 seq: None,
897 lifecycle: Lifecycle::Data,
898 change,
899 });
900 }
901 }
902
903 fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
905 if self.ttl_expiry.is_empty() {
906 return vec![];
907 }
908 let now = monotonic_ns();
909 let expired_keys: Vec<K> = self
910 .ttl_expiry
911 .iter()
912 .filter(|(_, &exp)| now >= exp)
913 .map(|(k, _)| k.clone())
914 .collect();
915 let mut expired = Vec::new();
916 for k in expired_keys {
917 if let Some(prev) = self.backend.get(&k) {
918 self.backend.delete(&k);
919 self.ttl_expiry.remove(&k);
920 self.lru_remove(&k);
921 expired.push((k, prev));
922 }
923 }
924 expired
925 }
926
927 fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
929 let (score_fn, archive_threshold, max_size) = match &self.retention {
931 Some(r) => (Arc::clone(&r.score), r.archive_threshold, r.max_size),
932 None => return vec![],
933 };
934 let entries = self.backend.to_vec();
935 if entries.is_empty() {
936 return vec![];
937 }
938 let mut scored: Vec<(K, V, f64)> = entries
939 .into_iter()
940 .map(|(k, v)| {
941 let s = (score_fn)(&k, &v);
942 (k, v, s)
943 })
944 .collect();
945 scored.sort_by(|a, b| a.2.total_cmp(&b.2));
946
947 let mut archived = Vec::new();
948 if let Some(threshold) = archive_threshold {
949 while let Some(entry) = scored.first() {
950 if entry.2 < threshold {
951 let (k, v, s) = scored.remove(0);
952 self.backend.delete(&k);
953 self.ttl_expiry.remove(&k);
954 self.lru_remove(&k);
955 archived.push((k, v, s));
956 } else {
957 break;
958 }
959 }
960 }
961 if let Some(max) = max_size {
962 while scored.len() > max {
963 let (k, v, s) = scored.remove(0);
964 self.backend.delete(&k);
965 self.ttl_expiry.remove(&k);
966 self.lru_remove(&k);
967 archived.push((k, v, s));
968 }
969 }
970 archived
971 }
972
973 fn lru_touch(&mut self, key: &K) {
975 if self.lru_max_size.is_none() {
976 return;
977 }
978 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
979 self.lru_order.remove(pos);
980 self.lru_order.push(key.clone());
981 }
982 }
983
984 fn lru_remove(&mut self, key: &K) {
986 if self.lru_max_size.is_none() {
987 return;
988 }
989 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
990 self.lru_order.remove(pos);
991 }
992 }
993
994 fn lru_evict(&mut self) -> Vec<(K, V)> {
996 let Some(max) = self.lru_max_size else {
997 return vec![];
998 };
999 let mut evicted = Vec::new();
1000 while self.backend.size() > max && !self.lru_order.is_empty() {
1001 let victim = self.lru_order.remove(0);
1002 if let Some(prev) = self.backend.get(&victim) {
1003 self.backend.delete(&victim);
1004 self.ttl_expiry.remove(&victim);
1005 evicted.push((victim, prev));
1006 }
1007 }
1008 evicted
1009 }
1010
1011 fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
1013 let ttl_ns = match ttl {
1014 Some(secs) => Some((secs * 1_000_000_000.0) as u64),
1015 None => self.default_ttl_ns,
1016 };
1017 if let Some(ns) = ttl_ns {
1018 self.ttl_expiry.insert(key.clone(), monotonic_ns() + ns);
1019 }
1020 }
1021}
1022
/// Construction options for `ReactiveMap::new`.
pub struct ReactiveMapOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Name stored in the `structure` field of recorded changes.
    pub name: String,
    /// Custom storage backend; `None` selects `HashMapBackend`.
    pub backend: Option<Box<dyn MapBackend<K, V>>>,
    /// Whether mutations are recorded in a mutation log.
    pub mutation_log: bool,
    /// Default time-to-live in seconds for writes that supply none.
    pub default_ttl: Option<f64>,
    /// LRU capacity; mutually exclusive with `retention`.
    pub max_size: Option<usize>,
    /// Score-based retention policy; mutually exclusive with `max_size`.
    pub retention: Option<RetentionPolicy<K, V>>,
}
1040
1041impl<K, V> Default for ReactiveMapOptions<K, V>
1042where
1043 K: Clone + Eq + Hash + Send + Sync + 'static,
1044 V: Clone + Send + Sync + 'static,
1045{
1046 fn default() -> Self {
1047 Self {
1048 name: "reactiveMap".into(),
1049 backend: None,
1050 mutation_log: false,
1051 default_ttl: None,
1052 max_size: None,
1053 retention: None,
1054 }
1055 }
1056}
1057
/// Error returned by `ReactiveMap::new` for an invalid option combination;
/// wraps the human-readable reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MapConfigError(pub String);

impl std::fmt::Display for MapConfigError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let MapConfigError(message) = self;
        out.write_str(message.as_str())
    }
}

impl std::error::Error for MapConfigError {}
1069
1070impl<K, V> ReactiveMap<K, V>
1071where
1072 K: Clone + Eq + Hash + Send + Sync + 'static,
1073 V: Clone + Send + Sync + 'static,
1074{
1075 pub fn new(
1080 core: &Core,
1081 intern: InternFn<Vec<(K, V)>>,
1082 opts: ReactiveMapOptions<K, V>,
1083 ) -> Result<Self, MapConfigError> {
1084 if opts.max_size.is_some() && opts.retention.is_some() {
1085 return Err(MapConfigError(
1086 "max_size (LRU) and retention are mutually exclusive".into(),
1087 ));
1088 }
1089 if let Some(ref r) = opts.retention {
1090 if r.archive_threshold.is_none() && r.max_size.is_none() {
1091 return Err(MapConfigError(
1092 "retention requires at least one of archive_threshold or max_size".into(),
1093 ));
1094 }
1095 }
1096 if let Some(ttl) = opts.default_ttl {
1097 if ttl <= 0.0 {
1098 return Err(MapConfigError("default_ttl must be > 0".into()));
1099 }
1100 }
1101 let node_id = core
1102 .register_state(HandleId::new(0), false)
1103 .expect("register_state for ReactiveMap");
1104 let backend: Box<dyn MapBackend<K, V>> = opts
1105 .backend
1106 .unwrap_or_else(|| Box::new(HashMapBackend::new()));
1107 let mutation_log = if opts.mutation_log {
1108 Some(Vec::new())
1109 } else {
1110 None
1111 };
1112 let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
1113 let inner = MapInner {
1114 backend,
1115 mutation_log,
1116 structure_name: opts.name,
1117 ttl_expiry: HashMap::new(),
1118 default_ttl_ns,
1119 lru_order: Vec::new(),
1120 lru_max_size: opts.max_size,
1121 retention: opts.retention,
1122 };
1123 Ok(Self {
1124 inner: Mutex::new(inner),
1125 emitter: EmitHandle {
1126 core: core.weak_handle(),
1127 node_id,
1128 intern,
1129 version: AtomicU64::new(0),
1130 },
1131 node_id,
1132 })
1133 }
1134
1135 #[must_use]
1136 pub fn size(&self) -> usize {
1137 self.inner.lock().backend.size()
1138 }
1139
1140 pub fn has(&self, key: &K) -> bool {
1143 let (has, expired) = {
1144 let mut inner = self.inner.lock();
1145 let mut target_expired = false;
1146 if inner.default_ttl_ns.is_some() {
1148 if let Some(&exp) = inner.ttl_expiry.get(key) {
1149 if monotonic_ns() >= exp {
1150 target_expired = true;
1151 }
1152 }
1153 }
1154 let mut expired = inner.prune_expired_inner();
1156 if target_expired && !expired.iter().any(|(k, _)| k == key) {
1157 if let Some(prev) = inner.backend.get(key) {
1160 inner.backend.delete(key);
1161 inner.ttl_expiry.remove(key);
1162 inner.lru_remove(key);
1163 expired.push((key.clone(), prev));
1164 }
1165 }
1166 let has = if target_expired {
1167 false
1168 } else {
1169 let h = inner.backend.has(key);
1170 if h {
1171 inner.lru_touch(key);
1172 }
1173 h
1174 };
1175 (has, expired)
1176 };
1177 if !expired.is_empty() {
1178 let snapshot = self.inner.lock().backend.to_vec();
1179 let version = self.emitter.emit(snapshot);
1180 let mut inner = self.inner.lock();
1181 for (k, prev) in expired {
1182 inner.record(
1183 MapChange::Delete {
1184 key: k,
1185 previous: prev,
1186 reason: DeleteReason::Expired,
1187 },
1188 version.clone(),
1189 );
1190 }
1191 }
1192 has
1193 }
1194
1195 pub fn get(&self, key: &K) -> Option<V> {
1198 let (value, expired) = {
1199 let mut inner = self.inner.lock();
1200 let mut target_expired = false;
1201 if inner.default_ttl_ns.is_some() {
1202 if let Some(&exp) = inner.ttl_expiry.get(key) {
1203 if monotonic_ns() >= exp {
1204 target_expired = true;
1205 }
1206 }
1207 }
1208 let mut expired = inner.prune_expired_inner();
1210 if target_expired && !expired.iter().any(|(k, _)| k == key) {
1211 if let Some(prev) = inner.backend.get(key) {
1213 inner.backend.delete(key);
1214 inner.ttl_expiry.remove(key);
1215 inner.lru_remove(key);
1216 expired.push((key.clone(), prev));
1217 }
1218 }
1219 let value = if target_expired {
1220 None
1221 } else {
1222 let v = inner.backend.get(key);
1223 if v.is_some() {
1224 inner.lru_touch(key);
1225 }
1226 v
1227 };
1228 (value, expired)
1229 };
1230 if !expired.is_empty() {
1231 let snapshot = self.inner.lock().backend.to_vec();
1232 let version = self.emitter.emit(snapshot);
1233 let mut inner = self.inner.lock();
1234 for (k, prev) in expired {
1235 inner.record(
1236 MapChange::Delete {
1237 key: k,
1238 previous: prev,
1239 reason: DeleteReason::Expired,
1240 },
1241 version.clone(),
1242 );
1243 }
1244 }
1245 value
1246 }
1247
1248 pub fn set(&self, key: K, value: V) {
1249 self.set_with_ttl(key, value, None);
1250 }
1251
1252 pub fn set_with_ttl(&self, key: K, value: V, ttl: Option<f64>) {
1257 if let Some(t) = ttl {
1258 assert!(
1259 t > 0.0 && t.is_finite(),
1260 "per-call ttl must be positive and finite"
1261 );
1262 }
1263 let (snapshot, change, eviction_changes) = {
1264 let mut inner = self.inner.lock();
1265 let expired = inner.prune_expired_inner();
1266 let change = inner.mutation_log.is_some().then(|| MapChange::Set {
1267 key: key.clone(),
1268 value: value.clone(),
1269 });
1270 inner.set_ttl_with(&key, ttl);
1271 inner.lru_remove(&key);
1272 if inner.lru_max_size.is_some() {
1273 inner.lru_order.push(key.clone());
1274 }
1275 inner.backend.set(key, value);
1276 let evicted = inner.lru_evict();
1277 let archived = inner.apply_retention_inner();
1278 let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1279 for (k, prev) in expired {
1280 eviction_changes.push((k, prev, DeleteReason::Expired));
1281 }
1282 for (k, prev) in evicted {
1283 eviction_changes.push((k, prev, DeleteReason::LruEvict));
1284 }
1285 for (k, v, s) in &archived {
1286 if let Some(on_archive) =
1287 &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1288 {
1289 on_archive(k, v, *s);
1290 }
1291 eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1292 }
1293 (inner.backend.to_vec(), change, eviction_changes)
1294 };
1295 let version = self.emitter.emit(snapshot);
1296 if change.is_some() || !eviction_changes.is_empty() {
1297 let mut inner = self.inner.lock();
1298 if let Some(change) = change {
1299 inner.record(change, version.clone());
1300 }
1301 for (k, prev, reason) in eviction_changes {
1302 inner.record(
1303 MapChange::Delete {
1304 key: k,
1305 previous: prev,
1306 reason,
1307 },
1308 version.clone(),
1309 );
1310 }
1311 }
1312 }
1313
1314 pub fn set_many(&self, entries: Vec<(K, V)>) {
1315 self.set_many_with_ttl(entries, None);
1316 }
1317
1318 pub fn set_many_with_ttl(&self, entries: Vec<(K, V)>, ttl: Option<f64>) {
1323 if let Some(t) = ttl {
1324 assert!(
1325 t > 0.0 && t.is_finite(),
1326 "per-call ttl must be positive and finite"
1327 );
1328 }
1329 if entries.is_empty() {
1330 return;
1331 }
1332 let (snapshot, changes, eviction_changes) = {
1333 let mut inner = self.inner.lock();
1334 let expired = inner.prune_expired_inner();
1335 let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1336 entries
1337 .iter()
1338 .map(|(k, v)| MapChange::Set {
1339 key: k.clone(),
1340 value: v.clone(),
1341 })
1342 .collect()
1343 });
1344 for (k, _) in &entries {
1345 inner.set_ttl_with(k, ttl);
1346 inner.lru_remove(k);
1347 if inner.lru_max_size.is_some() {
1348 inner.lru_order.push(k.clone());
1349 }
1350 }
1351 inner.backend.set_many(entries);
1352 let evicted = inner.lru_evict();
1353 let archived = inner.apply_retention_inner();
1354 let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1355 for (k, prev) in expired {
1356 eviction_changes.push((k, prev, DeleteReason::Expired));
1357 }
1358 for (k, prev) in evicted {
1359 eviction_changes.push((k, prev, DeleteReason::LruEvict));
1360 }
1361 for (k, v, s) in &archived {
1362 if let Some(on_archive) =
1363 &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1364 {
1365 on_archive(k, v, *s);
1366 }
1367 eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1368 }
1369 (inner.backend.to_vec(), changes, eviction_changes)
1370 };
1371 let version = self.emitter.emit(snapshot);
1372 if changes.is_some() || !eviction_changes.is_empty() {
1373 let mut inner = self.inner.lock();
1374 if let Some(changes) = changes {
1375 for change in changes {
1376 inner.record(change, version.clone());
1377 }
1378 }
1379 for (k, prev, reason) in eviction_changes {
1380 inner.record(
1381 MapChange::Delete {
1382 key: k,
1383 previous: prev,
1384 reason,
1385 },
1386 version.clone(),
1387 );
1388 }
1389 }
1390 }
1391
1392 pub fn delete(&self, key: &K) {
1393 let (snapshot, previous) = {
1394 let mut inner = self.inner.lock();
1395 let previous = inner.backend.get(key);
1396 if !inner.backend.delete(key) {
1397 return;
1398 }
1399 inner.ttl_expiry.remove(key);
1400 inner.lru_remove(key);
1401 (inner.backend.to_vec(), previous)
1402 };
1403 let version = self.emitter.emit(snapshot);
1404 if let Some(prev) = previous {
1405 self.inner.lock().record(
1406 MapChange::Delete {
1407 key: key.clone(),
1408 previous: prev,
1409 reason: DeleteReason::Explicit,
1410 },
1411 version,
1412 );
1413 }
1414 }
1415
1416 pub fn delete_many(&self, keys: &[K]) {
1417 let (snapshot, actually_deleted) = {
1418 let mut inner = self.inner.lock();
1419 let actually_deleted: Vec<(K, V)> = keys
1420 .iter()
1421 .filter_map(|k| inner.backend.get(k).map(|v| (k.clone(), v)))
1422 .collect();
1423 let removed = inner.backend.delete_many(keys);
1424 if removed == 0 {
1425 return;
1426 }
1427 for k in keys {
1428 inner.ttl_expiry.remove(k);
1429 inner.lru_remove(k);
1430 }
1431 (inner.backend.to_vec(), actually_deleted)
1432 };
1433 let version = self.emitter.emit(snapshot);
1434 if !actually_deleted.is_empty() {
1435 let mut inner = self.inner.lock();
1436 for (k, prev) in actually_deleted {
1437 inner.record(
1438 MapChange::Delete {
1439 key: k,
1440 previous: prev,
1441 reason: DeleteReason::Explicit,
1442 },
1443 version.clone(),
1444 );
1445 }
1446 }
1447 }
1448
1449 pub fn clear(&self) {
1450 let (snapshot, count) = {
1451 let mut inner = self.inner.lock();
1452 let count = inner.backend.clear();
1453 if count == 0 {
1454 return;
1455 }
1456 inner.ttl_expiry.clear();
1457 inner.lru_order.clear();
1458 (inner.backend.to_vec(), count)
1459 };
1460 let version = self.emitter.emit(snapshot);
1461 self.inner
1462 .lock()
1463 .record(MapChange::Clear { count }, version);
1464 }
1465
1466 pub fn prune_expired(&self) -> usize {
1468 let expired = {
1469 let mut inner = self.inner.lock();
1470 inner.prune_expired_inner()
1471 };
1472 if expired.is_empty() {
1473 return 0;
1474 }
1475 let count = expired.len();
1476 let snapshot = self.inner.lock().backend.to_vec();
1477 let version = self.emitter.emit(snapshot);
1478 let mut inner = self.inner.lock();
1479 for (k, prev) in expired {
1480 inner.record(
1481 MapChange::Delete {
1482 key: k,
1483 previous: prev,
1484 reason: DeleteReason::Expired,
1485 },
1486 version.clone(),
1487 );
1488 }
1489 count
1490 }
1491
1492 #[must_use]
1493 pub fn to_vec(&self) -> Vec<(K, V)> {
1494 self.inner.lock().backend.to_vec()
1495 }
1496
1497 #[must_use]
1498 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
1499 self.inner.lock().mutation_log.clone()
1500 }
1501}
1502
/// Reactive index of rows, each made of a primary key `K`, a `String` secondary key and a
/// value `V`.
///
/// Mutating methods emit the backend's ordered rows (`Vec<IndexRow<K, V>>`) through
/// `emitter` after each effective change.
pub struct ReactiveIndex<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Mutex-guarded state: backend, optional mutation log, name and default equality.
    inner: Mutex<IndexInner<K, V>>,
    /// Interns row snapshots and emits them on the node identified by `node_id`.
    emitter: EmitHandle<Vec<IndexRow<K, V>>>,
    /// Id of the state node registered for this index in `new`.
    pub node_id: NodeId,
}
1519
/// Shared predicate deciding whether two index rows count as equal.
///
/// The upsert paths call it as `(existing_row, proposed_row)` and skip the write when it
/// returns `true`.
pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
1522
1523struct IndexInner<K, V>
1524where
1525 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1526 V: Clone + Send + Sync + 'static,
1527{
1528 backend: Box<dyn IndexBackend<K, V>>,
1529 mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
1530 structure_name: String,
1531 equals: Option<IndexEqualsFn<K, V>>,
1536}
1537
1538impl<K, V> IndexInner<K, V>
1539where
1540 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1541 V: Clone + Send + Sync + 'static,
1542{
1543 fn record(&mut self, change: IndexChange<K, V>, version: Version) {
1544 if let Some(log) = &mut self.mutation_log {
1545 log.push(BaseChange {
1546 structure: self.structure_name.clone(),
1547 version,
1548 t_ns: wall_clock_ns(),
1549 seq: None,
1550 lifecycle: Lifecycle::Data,
1551 change,
1552 });
1553 }
1554 }
1555}
1556
1557pub struct UpsertOptions<K, V>
1559where
1560 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1561 V: Clone + Send + Sync + 'static,
1562{
1563 pub equals: Option<IndexEqualsFn<K, V>>,
1565}
1566
1567impl<K, V> Default for UpsertOptions<K, V>
1568where
1569 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1570 V: Clone + Send + Sync + 'static,
1571{
1572 fn default() -> Self {
1573 Self { equals: None }
1574 }
1575}
1576
1577pub struct ReactiveIndexOptions<K, V>
1579where
1580 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1581 V: Clone + Send + Sync + 'static,
1582{
1583 pub name: String,
1584 pub backend: Option<Box<dyn IndexBackend<K, V>>>,
1585 pub mutation_log: bool,
1586 pub equals: Option<IndexEqualsFn<K, V>>,
1588}
1589
1590impl<K, V> Default for ReactiveIndexOptions<K, V>
1591where
1592 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1593 V: Clone + Send + Sync + 'static,
1594{
1595 fn default() -> Self {
1596 Self {
1597 name: "reactiveIndex".into(),
1598 backend: None,
1599 mutation_log: false,
1600 equals: None,
1601 }
1602 }
1603}
1604
1605impl<K, V> ReactiveIndex<K, V>
1606where
1607 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1608 V: Clone + Send + Sync + 'static,
1609{
1610 #[must_use]
1611 pub fn new(
1612 core: &Core,
1613 intern: InternFn<Vec<IndexRow<K, V>>>,
1614 opts: ReactiveIndexOptions<K, V>,
1615 ) -> Self {
1616 let node_id = core
1617 .register_state(HandleId::new(0), false)
1618 .expect("register_state for ReactiveIndex");
1619 let backend: Box<dyn IndexBackend<K, V>> = opts
1620 .backend
1621 .unwrap_or_else(|| Box::new(VecIndexBackend::new()));
1622 let mutation_log = if opts.mutation_log {
1623 Some(Vec::new())
1624 } else {
1625 None
1626 };
1627 let inner = IndexInner {
1628 backend,
1629 mutation_log,
1630 structure_name: opts.name,
1631 equals: opts.equals,
1632 };
1633 Self {
1634 inner: Mutex::new(inner),
1635 emitter: EmitHandle {
1636 core: core.weak_handle(),
1637 node_id,
1638 intern,
1639 version: AtomicU64::new(0),
1640 },
1641 node_id,
1642 }
1643 }
1644
1645 #[must_use]
1646 pub fn size(&self) -> usize {
1647 self.inner.lock().backend.size()
1648 }
1649
1650 #[must_use]
1651 pub fn has(&self, primary: &K) -> bool {
1652 self.inner.lock().backend.has(primary)
1653 }
1654
1655 #[must_use]
1656 pub fn get(&self, primary: &K) -> Option<V> {
1657 self.inner.lock().backend.get(primary)
1658 }
1659
1660 pub fn upsert(&self, primary: K, secondary: String, value: V) -> bool {
1663 self.upsert_with(primary, secondary, value, &UpsertOptions::default())
1664 }
1665
1666 pub fn upsert_with(
1671 &self,
1672 primary: K,
1673 secondary: String,
1674 value: V,
1675 opts: &UpsertOptions<K, V>,
1676 ) -> bool {
1677 let (is_new, snapshot, change) = {
1678 let mut inner = self.inner.lock();
1679 let eq_fn = opts.equals.as_ref().or(inner.equals.as_ref());
1681 if let Some(eq) = eq_fn {
1682 if let Some(existing_row) = inner.backend.get_row(&primary) {
1683 let proposed = IndexRow {
1684 primary: primary.clone(),
1685 secondary: secondary.clone(),
1686 value: value.clone(),
1687 };
1688 if eq(&existing_row, &proposed) {
1689 return false;
1690 }
1691 }
1692 }
1693 let change = inner.mutation_log.is_some().then(|| IndexChange::Upsert {
1694 primary: primary.clone(),
1695 secondary: secondary.clone(),
1696 value: value.clone(),
1697 });
1698 let is_new = inner.backend.upsert(primary, secondary, value);
1699 (is_new, inner.backend.to_ordered(), change)
1700 };
1701 let version = self.emitter.emit(snapshot);
1702 if let Some(change) = change {
1703 self.inner.lock().record(change, version);
1704 }
1705 is_new
1706 }
1707
1708 pub fn upsert_many(&self, rows: Vec<(K, String, V)>) {
1711 if rows.is_empty() {
1712 return;
1713 }
1714 let (snapshot, changes) = {
1715 let mut inner = self.inner.lock();
1716 let effective_rows: Vec<(K, String, V)> = if let Some(eq) = &inner.equals {
1718 rows.into_iter()
1719 .filter(|(pk, sec, val)| {
1720 if let Some(existing) = inner.backend.get_row(pk) {
1721 let proposed = IndexRow {
1722 primary: pk.clone(),
1723 secondary: sec.clone(),
1724 value: val.clone(),
1725 };
1726 !eq(&existing, &proposed)
1727 } else {
1728 true
1729 }
1730 })
1731 .collect()
1732 } else {
1733 rows
1734 };
1735 if effective_rows.is_empty() {
1736 return;
1737 }
1738 let changes: Option<Vec<IndexChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1739 effective_rows
1740 .iter()
1741 .map(|(k, s, v)| IndexChange::Upsert {
1742 primary: k.clone(),
1743 secondary: s.clone(),
1744 value: v.clone(),
1745 })
1746 .collect()
1747 });
1748 inner.backend.upsert_many(effective_rows);
1749 (inner.backend.to_ordered(), changes)
1750 };
1751 let version = self.emitter.emit(snapshot);
1752 if let Some(changes) = changes {
1753 let mut inner = self.inner.lock();
1754 for change in changes {
1755 inner.record(change, version.clone());
1756 }
1757 }
1758 }
1759
1760 pub fn delete(&self, primary: &K) {
1761 let snapshot = {
1762 let mut inner = self.inner.lock();
1763 if !inner.backend.delete(primary) {
1764 return;
1765 }
1766 inner.backend.to_ordered()
1767 };
1768 let version = self.emitter.emit(snapshot);
1769 self.inner.lock().record(
1770 IndexChange::Delete {
1771 primary: primary.clone(),
1772 },
1773 version,
1774 );
1775 }
1776
1777 pub fn delete_many(&self, primaries: &[K]) {
1778 let (snapshot, actually_deleted) = {
1779 let mut inner = self.inner.lock();
1780 let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
1782 primaries
1783 .iter()
1784 .filter(|k| inner.backend.has(k))
1785 .cloned()
1786 .collect()
1787 } else {
1788 vec![]
1789 };
1790 let removed = inner.backend.delete_many(primaries);
1791 if removed == 0 {
1792 return;
1793 }
1794 (inner.backend.to_ordered(), actually_deleted)
1795 };
1796 let version = self.emitter.emit(snapshot);
1797 if !actually_deleted.is_empty() {
1798 self.inner.lock().record(
1799 IndexChange::DeleteMany {
1800 primaries: actually_deleted,
1801 },
1802 version,
1803 );
1804 }
1805 }
1806
1807 pub fn clear(&self) {
1808 let (snapshot, count) = {
1809 let mut inner = self.inner.lock();
1810 let count = inner.backend.clear();
1811 if count == 0 {
1812 return;
1813 }
1814 (inner.backend.to_ordered(), count)
1815 };
1816 let version = self.emitter.emit(snapshot);
1817 self.inner
1818 .lock()
1819 .record(IndexChange::Clear { count }, version);
1820 }
1821
1822 #[must_use]
1823 pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
1824 self.inner.lock().backend.to_ordered()
1825 }
1826
1827 #[must_use]
1828 pub fn to_primary_map(&self) -> Vec<(K, V)> {
1829 self.inner.lock().backend.to_primary_map()
1830 }
1831
1832 #[must_use]
1837 pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
1838 where
1839 K: Ord,
1840 {
1841 let mut rows: Vec<(K, V)> = self
1842 .inner
1843 .lock()
1844 .backend
1845 .to_primary_map()
1846 .into_iter()
1847 .filter(|(k, _)| k >= start && k < end)
1848 .collect();
1849 rows.sort_by(|a, b| a.0.cmp(&b.0));
1850 rows.into_iter().map(|(_, v)| v).collect()
1851 }
1852
1853 #[must_use]
1854 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
1855 self.inner.lock().mutation_log.clone()
1856 }
1857}