1use std::collections::HashMap;
14use std::hash::Hash;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use parking_lot::Mutex;
19
20use graphrefly_core::{
21 monotonic_ns, wall_clock_ns, Core, CoreMailbox, HandleId, Message, NodeId, SubscriptionId,
22};
23
24use crate::backend::{
25 HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
26 VecListBackend, VecLogBackend,
27};
28use crate::changeset::{
29 BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
30};
31
32pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
40
41struct EmitHandle<S> {
52 node_id: NodeId,
53 intern: InternFn<S>,
54 version: AtomicU64,
55}
56
57impl<S> EmitHandle<S> {
58 fn emit(&self, core: &Core, snapshot: S) -> Version {
61 let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
62 let handle = (self.intern)(snapshot);
63 core.emit(self.node_id, handle);
64 Version::Counter(ver)
65 }
66}
67
68struct SinkEmitter<S> {
75 mailbox: Arc<CoreMailbox>,
76 node_id: NodeId,
77 intern: InternFn<S>,
78 version: AtomicU64,
79}
80
81impl<S> SinkEmitter<S> {
82 fn emit(&self, snapshot: S) -> Version {
85 let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
86 let handle = (self.intern)(snapshot);
87 let _ = self.mailbox.post_emit(self.node_id, handle);
92 Version::Counter(ver)
93 }
94}
95
/// Error returned when a positional insert targets an index past the end of the list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexOutOfBounds;

impl std::fmt::Display for IndexOutOfBounds {
    /// Writes the fixed message `index out of bounds`; formatter width/fill are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "index out of bounds")
    }
}

impl std::error::Error for IndexOutOfBounds {}
111
112pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
122 inner: Arc<Mutex<LogInner<T>>>,
123 emitter: EmitHandle<Vec<T>>,
124 pub node_id: NodeId,
126}
127
128struct LogInner<T: Clone + Send + Sync + 'static> {
129 backend: Box<dyn LogBackend<T>>,
130 mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
131 structure_name: String,
132}
133
134impl<T: Clone + Send + Sync + 'static> LogInner<T> {
135 fn record(&mut self, change: LogChange<T>, version: Version) {
136 if let Some(log) = &mut self.mutation_log {
137 log.push(BaseChange {
138 structure: self.structure_name.clone(),
139 version,
140 t_ns: wall_clock_ns(),
141 seq: None,
142 lifecycle: Lifecycle::Data,
143 change,
144 });
145 }
146 }
147}
148
149pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
151 pub name: String,
152 pub max_size: Option<usize>,
153 pub backend: Option<Box<dyn LogBackend<T>>>,
154 pub mutation_log: bool,
155}
156
157impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
158 fn default() -> Self {
159 Self {
160 name: "reactiveLog".into(),
161 max_size: None,
162 backend: None,
163 mutation_log: false,
164 }
165 }
166}
167
168impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
169 #[must_use]
173 pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
174 let node_id = core
175 .register_state(HandleId::new(0), false)
176 .expect("register_state for ReactiveLog");
177 let backend: Box<dyn LogBackend<T>> = opts
178 .backend
179 .unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
180 let mutation_log = if opts.mutation_log {
181 Some(Vec::new())
182 } else {
183 None
184 };
185 let inner = LogInner {
186 backend,
187 mutation_log,
188 structure_name: opts.name,
189 };
190 Self {
191 inner: Arc::new(Mutex::new(inner)),
192 emitter: EmitHandle {
193 node_id,
194 intern,
195 version: AtomicU64::new(0),
196 },
197 node_id,
198 }
199 }
200
201 #[must_use]
202 pub fn size(&self) -> usize {
203 self.inner.lock().backend.size()
204 }
205
206 #[must_use]
207 pub fn at(&self, index: i64) -> Option<T> {
208 self.inner.lock().backend.at(index)
209 }
210
211 pub fn append(&self, core: &Core, value: T) {
212 let (snapshot, change) = {
213 let mut inner = self.inner.lock();
214 let change = inner.mutation_log.is_some().then(|| LogChange::Append {
215 value: value.clone(),
216 });
217 inner.backend.append(value);
218 (inner.backend.to_vec(), change)
219 };
220 let version = self.emitter.emit(core, snapshot);
221 if let Some(change) = change {
222 self.inner.lock().record(change, version);
223 }
224 }
225
226 pub fn append_many(&self, core: &Core, values: Vec<T>) {
227 if values.is_empty() {
228 return;
229 }
230 let (snapshot, change) = {
231 let mut inner = self.inner.lock();
232 let change = inner.mutation_log.is_some().then(|| LogChange::AppendMany {
233 values: values.clone(),
234 });
235 inner.backend.append_many(values);
236 (inner.backend.to_vec(), change)
237 };
238 let version = self.emitter.emit(core, snapshot);
239 if let Some(change) = change {
240 self.inner.lock().record(change, version);
241 }
242 }
243
244 pub fn clear(&self, core: &Core) {
245 let (snapshot, count) = {
246 let mut inner = self.inner.lock();
247 let count = inner.backend.clear();
248 if count == 0 {
249 return;
250 }
251 (inner.backend.to_vec(), count)
252 };
253 let version = self.emitter.emit(core, snapshot);
254 self.inner
255 .lock()
256 .record(LogChange::Clear { count }, version);
257 }
258
259 pub fn trim_head(&self, core: &Core, n: usize) {
260 if n == 0 {
261 return;
262 }
263 let (snapshot, actual) = {
264 let mut inner = self.inner.lock();
265 let actual = inner.backend.trim_head(n);
266 if actual == 0 {
267 return;
268 }
269 (inner.backend.to_vec(), actual)
270 };
271 let version = self.emitter.emit(core, snapshot);
272 self.inner
273 .lock()
274 .record(LogChange::TrimHead { n: actual }, version);
275 }
276
277 #[must_use]
278 pub fn to_vec(&self) -> Vec<T> {
279 self.inner.lock().backend.to_vec()
280 }
281
282 #[must_use]
285 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
286 self.inner.lock().mutation_log.clone()
287 }
288}
289
290pub enum ViewSpec {
296 Tail { n: usize },
298 Slice { start: usize, stop: Option<usize> },
300 FromCursor {
303 cursor_node: NodeId,
304 read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
305 },
306}
307
308pub struct ReactiveSub {
314 subs: Vec<(NodeId, SubscriptionId)>,
315}
316
317impl ReactiveSub {
318 pub fn detach(&mut self, core: &Core) {
322 for (node_id, sub_id) in self.subs.drain(..) {
323 core.unsubscribe(node_id, sub_id);
324 }
325 }
326}
327
328pub struct LogView {
331 pub node_id: NodeId,
333 sub: ReactiveSub,
334}
335
336impl LogView {
337 pub fn detach(&mut self, core: &Core) {
339 self.sub.detach(core);
340 }
341}
342
343pub struct ScanHandle {
346 pub node_id: NodeId,
348 sub: ReactiveSub,
349}
350
351impl ScanHandle {
352 pub fn detach(&mut self, core: &Core) {
354 self.sub.detach(core);
355 }
356}
357
/// Storage target that accepts batches of log entries and can return stored entries.
pub trait AppendLogSink<T>: Send + Sync {
    /// Stores `entries`.
    ///
    /// # Errors
    ///
    /// Returns the sink's own error when the batch could not be stored.
    fn append_entries(&self, entries: &[T])
        -> Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// Returns the entries held by the sink.
    ///
    /// # Errors
    ///
    /// Returns the sink's own error when the entries could not be read.
    fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
}
371
372pub struct AttachStorageHandle {
375 sub: ReactiveSub,
376}
377
378impl AttachStorageHandle {
379 pub fn detach(&mut self, core: &Core) {
382 self.sub.detach(core);
383 }
384}
385
/// Options for `ReactiveLog::attach_with_options`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AttachOptions {
    /// When `true` and the upstream node already has a cached handle at attach time,
    /// the first delivered batch that contains data is ignored. Defaults to `false`.
    pub skip_cached_replay: bool,
}
399
400impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
401 #[allow(clippy::too_many_lines)]
409 pub fn view(&self, core: &Core, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
410 let view_node = core
411 .register_state(HandleId::new(0), false)
412 .expect("register_state for LogView");
413 let view_emitter = Arc::new(SinkEmitter {
416 mailbox: core.mailbox(),
417 node_id: view_node,
418 intern,
419 version: AtomicU64::new(0),
420 });
421
422 let inner = Arc::clone(&self.inner);
423 let mut subscriptions: Vec<(NodeId, SubscriptionId)> = Vec::new();
424
425 match spec {
426 ViewSpec::Tail { n } => {
427 let inner_c = Arc::clone(&inner);
428 let emitter_c = Arc::clone(&view_emitter);
429 let sub = core.subscribe(
430 self.node_id,
431 Arc::new(move |msgs| {
432 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
433 let guard = inner_c.lock();
434 let data = guard.backend.to_vec();
435 let start = data.len().saturating_sub(n);
436 let view = data[start..].to_vec();
437 drop(guard);
438 emitter_c.emit(view);
439 }
440 }),
441 );
442 subscriptions.push((self.node_id, sub));
443 }
444 ViewSpec::Slice { start, stop } => {
445 let inner_c = Arc::clone(&inner);
446 let emitter_c = Arc::clone(&view_emitter);
447 let sub = core.subscribe(
448 self.node_id,
449 Arc::new(move |msgs| {
450 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
451 let guard = inner_c.lock();
452 let data = guard.backend.to_vec();
453 let end = stop.unwrap_or(data.len()).min(data.len());
454 let s = start.min(end);
455 let view = data[s..end].to_vec();
456 drop(guard);
457 emitter_c.emit(view);
458 }
459 }),
460 );
461 subscriptions.push((self.node_id, sub));
462 }
463 ViewSpec::FromCursor {
464 cursor_node,
465 read_cursor,
466 } => {
467 let cursor_pos = Arc::new(Mutex::new(0usize));
468
469 let cursor_pos_c = Arc::clone(&cursor_pos);
471 let inner_c = Arc::clone(&inner);
472 let emitter_c = Arc::clone(&view_emitter);
473 let read_cursor_c = Arc::clone(&read_cursor);
474 let sub_cursor = core.subscribe(
475 cursor_node,
476 Arc::new(move |msgs| {
477 for m in msgs {
478 if let Message::Data(h) = m {
479 let pos = read_cursor_c(*h);
480 *cursor_pos_c.lock() = pos;
481 let guard = inner_c.lock();
482 let data = guard.backend.to_vec();
483 let s = pos.min(data.len());
484 let view = data[s..].to_vec();
485 drop(guard);
486 emitter_c.emit(view);
487 }
488 }
489 }),
490 );
491 subscriptions.push((cursor_node, sub_cursor));
492
493 let cursor_pos_c2 = Arc::clone(&cursor_pos);
495 let inner_c2 = Arc::clone(&inner);
496 let emitter_c2 = view_emitter;
497 let sub_log = core.subscribe(
498 self.node_id,
499 Arc::new(move |msgs| {
500 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
501 let pos = *cursor_pos_c2.lock();
502 let guard = inner_c2.lock();
503 let data = guard.backend.to_vec();
504 let s = pos.min(data.len());
505 let view = data[s..].to_vec();
506 drop(guard);
507 emitter_c2.emit(view);
508 }
509 }),
510 );
511 subscriptions.push((self.node_id, sub_log));
512 }
513 }
514
515 LogView {
516 node_id: view_node,
517 sub: ReactiveSub {
518 subs: subscriptions,
519 },
520 }
521 }
522
523 pub fn scan<TAcc: Clone + Send + Sync + 'static>(
529 &self,
530 core: &Core,
531 initial: TAcc,
532 step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
533 intern: InternFn<TAcc>,
534 ) -> ScanHandle {
535 struct ScanState<T, TAcc> {
536 acc: TAcc,
537 processed: usize,
538 initial: TAcc,
539 step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
540 }
541
542 let scan_node = core
543 .register_state(HandleId::new(0), false)
544 .expect("register_state for Scan");
545
546 let state = Arc::new(Mutex::new(ScanState {
547 acc: initial.clone(),
548 processed: 0,
549 initial,
550 step,
551 }));
552 let inner = Arc::clone(&self.inner);
553 let scan_emitter = Arc::new(SinkEmitter {
556 mailbox: core.mailbox(),
557 node_id: scan_node,
558 intern,
559 version: AtomicU64::new(0),
560 });
561
562 let sub = core.subscribe(
563 self.node_id,
564 Arc::new(move |msgs| {
565 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
566 let mut ss = state.lock();
567 let guard = inner.lock();
568 let data = guard.backend.to_vec();
569 drop(guard);
570
571 if data.len() < ss.processed {
572 ss.acc = ss.initial.clone();
574 ss.processed = 0;
575 }
576 for item in &data[ss.processed..] {
577 ss.acc = (ss.step)(&ss.acc, item);
578 }
579 ss.processed = data.len();
580 let acc = ss.acc.clone();
581 drop(ss);
582 scan_emitter.emit(acc);
583 }
584 }),
585 );
586
587 ScanHandle {
588 node_id: scan_node,
589 sub: ReactiveSub {
590 subs: vec![(self.node_id, sub)],
591 },
592 }
593 }
594
595 pub fn attach(
602 &self,
603 core: &Core,
604 upstream: NodeId,
605 read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
606 ) -> ReactiveSub {
607 self.attach_with_options(core, upstream, read_value, AttachOptions::default())
608 }
609
610 pub fn attach_with_options(
615 &self,
616 core: &Core,
617 upstream: NodeId,
618 read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
619 opts: AttachOptions,
620 ) -> ReactiveSub {
621 let inner = Arc::clone(&self.inner);
622 let mailbox = core.mailbox();
627 let node_id = self.node_id;
628 let intern = Arc::clone(&self.emitter.intern);
629
630 let suppress = if opts.skip_cached_replay {
635 let cache = core.cache_of(upstream);
636 cache != graphrefly_core::NO_HANDLE
637 } else {
638 false
639 };
640 let skip_state = Arc::new(parking_lot::Mutex::new(suppress));
641
642 let sub = core.subscribe(
643 upstream,
644 Arc::new(move |msgs| {
645 let should_skip = {
649 let mut s = skip_state.lock();
650 let has_data = msgs.iter().any(|m| matches!(m, Message::Data(_)));
651 let do_skip = *s && has_data;
652 if do_skip {
653 *s = false;
654 }
655 do_skip
656 };
657 if should_skip {
658 return;
659 }
660 for m in msgs {
661 if let Message::Data(h) = m {
662 let value = read_value(*h);
663 let snapshot = {
664 let mut guard = inner.lock();
665 guard.backend.append(value);
666 guard.backend.to_vec()
667 };
668 let handle = (intern)(snapshot);
669 let _ = mailbox.post_emit(node_id, handle);
672 }
673 }
674 }),
675 );
676 ReactiveSub {
677 subs: vec![(upstream, sub)],
678 }
679 }
680
681 pub fn attach_storage(
689 &self,
690 core: &Core,
691 sinks: Vec<Arc<dyn AppendLogSink<T>>>,
692 preload: bool,
693 ) -> AttachStorageHandle {
694 if sinks.is_empty() {
695 let sub = core.subscribe(self.node_id, Arc::new(|_| {}));
696 return AttachStorageHandle {
697 sub: ReactiveSub {
698 subs: vec![(self.node_id, sub)],
699 },
700 };
701 }
702
703 if preload {
704 for sink in &sinks {
705 if let Ok(entries) = sink.load_entries() {
706 if !entries.is_empty() {
707 self.append_many(core, entries);
708 break;
709 }
710 }
711 }
712 }
713
714 let current_size = self.size();
715 let delivered: Vec<Arc<Mutex<usize>>> = sinks
718 .iter()
719 .map(|_| Arc::new(Mutex::new(current_size)))
720 .collect();
721
722 let inner = Arc::clone(&self.inner);
723 let sinks_arc = sinks;
724 let delivered_arc = delivered;
725
726 let sub = core.subscribe(
727 self.node_id,
728 Arc::new(move |msgs| {
729 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
730 let guard = inner.lock();
731 let data = guard.backend.to_vec();
732 drop(guard);
733
734 for (i, sink) in sinks_arc.iter().enumerate() {
735 let mut del = delivered_arc[i].lock();
736 let result = match data.len().cmp(&*del) {
737 std::cmp::Ordering::Greater => sink.append_entries(&data[*del..]),
738 std::cmp::Ordering::Less => sink.append_entries(&data),
739 std::cmp::Ordering::Equal => continue,
740 };
741 match result {
742 Ok(()) => *del = data.len(),
743 Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
744 }
745 }
746 }
747 }),
748 );
749
750 AttachStorageHandle {
751 sub: ReactiveSub {
752 subs: vec![(self.node_id, sub)],
753 },
754 }
755 }
756}
757
758pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
766 inner: Mutex<ListInner<T>>,
767 emitter: EmitHandle<Vec<T>>,
768 pub node_id: NodeId,
769}
770
771struct ListInner<T: Clone + Send + Sync + 'static> {
772 backend: Box<dyn ListBackend<T>>,
773 mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
774 structure_name: String,
775}
776
777impl<T: Clone + Send + Sync + 'static> ListInner<T> {
778 fn record(&mut self, change: ListChange<T>, version: Version) {
779 if let Some(log) = &mut self.mutation_log {
780 log.push(BaseChange {
781 structure: self.structure_name.clone(),
782 version,
783 t_ns: wall_clock_ns(),
784 seq: None,
785 lifecycle: Lifecycle::Data,
786 change,
787 });
788 }
789 }
790}
791
792pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
794 pub name: String,
795 pub backend: Option<Box<dyn ListBackend<T>>>,
796 pub mutation_log: bool,
797}
798
799impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
800 fn default() -> Self {
801 Self {
802 name: "reactiveList".into(),
803 backend: None,
804 mutation_log: false,
805 }
806 }
807}
808
809impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
810 #[must_use]
811 pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
812 let node_id = core
813 .register_state(HandleId::new(0), false)
814 .expect("register_state for ReactiveList");
815 let backend: Box<dyn ListBackend<T>> = opts
816 .backend
817 .unwrap_or_else(|| Box::new(VecListBackend::new()));
818 let mutation_log = if opts.mutation_log {
819 Some(Vec::new())
820 } else {
821 None
822 };
823 let inner = ListInner {
824 backend,
825 mutation_log,
826 structure_name: opts.name,
827 };
828 Self {
829 inner: Mutex::new(inner),
830 emitter: EmitHandle {
831 node_id,
832 intern,
833 version: AtomicU64::new(0),
834 },
835 node_id,
836 }
837 }
838
839 #[must_use]
840 pub fn size(&self) -> usize {
841 self.inner.lock().backend.size()
842 }
843
844 #[must_use]
845 pub fn at(&self, index: i64) -> Option<T> {
846 self.inner.lock().backend.at(index)
847 }
848
849 pub fn append(&self, core: &Core, value: T) {
850 let (snapshot, change) = {
851 let mut inner = self.inner.lock();
852 let change = inner.mutation_log.is_some().then(|| ListChange::Append {
853 value: value.clone(),
854 });
855 inner.backend.append(value);
856 (inner.backend.to_vec(), change)
857 };
858 let version = self.emitter.emit(core, snapshot);
859 if let Some(change) = change {
860 self.inner.lock().record(change, version);
861 }
862 }
863
864 pub fn append_many(&self, core: &Core, values: Vec<T>) {
865 if values.is_empty() {
866 return;
867 }
868 let (snapshot, change) = {
869 let mut inner = self.inner.lock();
870 let change = inner
871 .mutation_log
872 .is_some()
873 .then(|| ListChange::AppendMany {
874 values: values.clone(),
875 });
876 inner.backend.append_many(values);
877 (inner.backend.to_vec(), change)
878 };
879 let version = self.emitter.emit(core, snapshot);
880 if let Some(change) = change {
881 self.inner.lock().record(change, version);
882 }
883 }
884
885 pub fn insert(&self, core: &Core, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
888 let (snapshot, change) = {
889 let mut inner = self.inner.lock();
890 if index > inner.backend.size() {
891 return Err(IndexOutOfBounds);
892 }
893 let change = inner.mutation_log.is_some().then(|| ListChange::Insert {
894 index,
895 value: value.clone(),
896 });
897 inner.backend.insert(index, value);
898 (inner.backend.to_vec(), change)
899 };
900 let version = self.emitter.emit(core, snapshot);
901 if let Some(change) = change {
902 self.inner.lock().record(change, version);
903 }
904 Ok(())
905 }
906
907 pub fn insert_many(
910 &self,
911 core: &Core,
912 index: usize,
913 values: Vec<T>,
914 ) -> Result<(), IndexOutOfBounds> {
915 if values.is_empty() {
916 return Ok(());
917 }
918 let (snapshot, change) = {
919 let mut inner = self.inner.lock();
920 if index > inner.backend.size() {
921 return Err(IndexOutOfBounds);
922 }
923 let change = inner
924 .mutation_log
925 .is_some()
926 .then(|| ListChange::InsertMany {
927 index,
928 values: values.clone(),
929 });
930 inner.backend.insert_many(index, values);
931 (inner.backend.to_vec(), change)
932 };
933 let version = self.emitter.emit(core, snapshot);
934 if let Some(change) = change {
935 self.inner.lock().record(change, version);
936 }
937 Ok(())
938 }
939
940 pub fn pop(&self, core: &Core, index: i64) -> Option<T> {
943 let (value, snapshot, change) = {
944 let mut inner = self.inner.lock();
945 let value = inner.backend.pop(index)?;
946 let change = inner.mutation_log.is_some().then(|| ListChange::Pop {
947 index,
948 value: value.clone(),
949 });
950 let snapshot = inner.backend.to_vec();
951 (value, snapshot, change)
952 };
953 let version = self.emitter.emit(core, snapshot);
954 if let Some(change) = change {
955 self.inner.lock().record(change, version);
956 }
957 Some(value)
958 }
959
960 pub fn clear(&self, core: &Core) {
961 let (snapshot, count) = {
962 let mut inner = self.inner.lock();
963 let count = inner.backend.clear();
964 if count == 0 {
965 return;
966 }
967 (inner.backend.to_vec(), count)
968 };
969 let version = self.emitter.emit(core, snapshot);
970 self.inner
971 .lock()
972 .record(ListChange::Clear { count }, version);
973 }
974
975 #[must_use]
976 pub fn to_vec(&self) -> Vec<T> {
977 self.inner.lock().backend.to_vec()
978 }
979
980 #[must_use]
981 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
982 self.inner.lock().mutation_log.clone()
983 }
984}
985
986pub struct ReactiveMap<K, V>
995where
996 K: Clone + Eq + Hash + Send + Sync + 'static,
997 V: Clone + Send + Sync + 'static,
998{
999 inner: Mutex<MapInner<K, V>>,
1000 emitter: EmitHandle<Vec<(K, V)>>,
1001 pub node_id: NodeId,
1002}
1003
/// Score-based retention rules applied to a [`ReactiveMap`] after writes.
pub struct RetentionPolicy<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Scores an entry; lower-scored entries are archived first.
    pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
    /// Entries scoring strictly below this value are archived.
    pub archive_threshold: Option<f64>,
    /// After the threshold pass, the lowest-scored entries are archived until at most
    /// this many remain.
    pub max_size: Option<usize>,
    /// Called with key, value, and score for each archived entry.
    pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
}
1023
1024struct MapInner<K, V>
1025where
1026 K: Clone + Eq + Hash + Send + Sync + 'static,
1027 V: Clone + Send + Sync + 'static,
1028{
1029 backend: Box<dyn MapBackend<K, V>>,
1030 mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
1031 structure_name: String,
1032 ttl_expiry: HashMap<K, u64>,
1034 default_ttl_ns: Option<u64>,
1036 lru_order: Vec<K>,
1038 lru_max_size: Option<usize>,
1040 retention: Option<RetentionPolicy<K, V>>,
1042}
1043
1044impl<K, V> MapInner<K, V>
1045where
1046 K: Clone + Eq + Hash + Send + Sync + 'static,
1047 V: Clone + Send + Sync + 'static,
1048{
1049 fn record(&mut self, change: MapChange<K, V>, version: Version) {
1050 if let Some(log) = &mut self.mutation_log {
1051 log.push(BaseChange {
1052 structure: self.structure_name.clone(),
1053 version,
1054 t_ns: wall_clock_ns(),
1055 seq: None,
1056 lifecycle: Lifecycle::Data,
1057 change,
1058 });
1059 }
1060 }
1061
1062 fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
1064 if self.ttl_expiry.is_empty() {
1065 return vec![];
1066 }
1067 let now = monotonic_ns();
1068 let expired_keys: Vec<K> = self
1069 .ttl_expiry
1070 .iter()
1071 .filter(|(_, &exp)| now >= exp)
1072 .map(|(k, _)| k.clone())
1073 .collect();
1074 let mut expired = Vec::new();
1075 for k in expired_keys {
1076 if let Some(prev) = self.backend.get(&k) {
1077 self.backend.delete(&k);
1078 self.ttl_expiry.remove(&k);
1079 self.lru_remove(&k);
1080 expired.push((k, prev));
1081 }
1082 }
1083 expired
1084 }
1085
1086 fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
1088 let (score_fn, archive_threshold, max_size) = match &self.retention {
1090 Some(r) => (Arc::clone(&r.score), r.archive_threshold, r.max_size),
1091 None => return vec![],
1092 };
1093 let entries = self.backend.to_vec();
1094 if entries.is_empty() {
1095 return vec![];
1096 }
1097 let mut scored: Vec<(K, V, f64)> = entries
1098 .into_iter()
1099 .map(|(k, v)| {
1100 let s = (score_fn)(&k, &v);
1101 (k, v, s)
1102 })
1103 .collect();
1104 scored.sort_by(|a, b| a.2.total_cmp(&b.2));
1105
1106 let mut archived = Vec::new();
1107 if let Some(threshold) = archive_threshold {
1108 while let Some(entry) = scored.first() {
1109 if entry.2 < threshold {
1110 let (k, v, s) = scored.remove(0);
1111 self.backend.delete(&k);
1112 self.ttl_expiry.remove(&k);
1113 self.lru_remove(&k);
1114 archived.push((k, v, s));
1115 } else {
1116 break;
1117 }
1118 }
1119 }
1120 if let Some(max) = max_size {
1121 while scored.len() > max {
1122 let (k, v, s) = scored.remove(0);
1123 self.backend.delete(&k);
1124 self.ttl_expiry.remove(&k);
1125 self.lru_remove(&k);
1126 archived.push((k, v, s));
1127 }
1128 }
1129 archived
1130 }
1131
1132 fn lru_touch(&mut self, key: &K) {
1134 if self.lru_max_size.is_none() {
1135 return;
1136 }
1137 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1138 self.lru_order.remove(pos);
1139 self.lru_order.push(key.clone());
1140 }
1141 }
1142
1143 fn lru_remove(&mut self, key: &K) {
1145 if self.lru_max_size.is_none() {
1146 return;
1147 }
1148 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1149 self.lru_order.remove(pos);
1150 }
1151 }
1152
1153 fn lru_evict(&mut self) -> Vec<(K, V)> {
1155 let Some(max) = self.lru_max_size else {
1156 return vec![];
1157 };
1158 let mut evicted = Vec::new();
1159 while self.backend.size() > max && !self.lru_order.is_empty() {
1160 let victim = self.lru_order.remove(0);
1161 if let Some(prev) = self.backend.get(&victim) {
1162 self.backend.delete(&victim);
1163 self.ttl_expiry.remove(&victim);
1164 evicted.push((victim, prev));
1165 }
1166 }
1167 evicted
1168 }
1169
1170 fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
1172 let ttl_ns = match ttl {
1173 Some(secs) => Some((secs * 1_000_000_000.0) as u64),
1174 None => self.default_ttl_ns,
1175 };
1176 if let Some(ns) = ttl_ns {
1177 self.ttl_expiry.insert(key.clone(), monotonic_ns() + ns);
1178 }
1179 }
1180}
1181
1182pub struct ReactiveMapOptions<K, V>
1184where
1185 K: Clone + Eq + Hash + Send + Sync + 'static,
1186 V: Clone + Send + Sync + 'static,
1187{
1188 pub name: String,
1189 pub backend: Option<Box<dyn MapBackend<K, V>>>,
1190 pub mutation_log: bool,
1191 pub default_ttl: Option<f64>,
1194 pub max_size: Option<usize>,
1196 pub retention: Option<RetentionPolicy<K, V>>,
1198}
1199
1200impl<K, V> Default for ReactiveMapOptions<K, V>
1201where
1202 K: Clone + Eq + Hash + Send + Sync + 'static,
1203 V: Clone + Send + Sync + 'static,
1204{
1205 fn default() -> Self {
1206 Self {
1207 name: "reactiveMap".into(),
1208 backend: None,
1209 mutation_log: false,
1210 default_ttl: None,
1211 max_size: None,
1212 retention: None,
1213 }
1214 }
1215}
1216
/// Error describing an invalid [`ReactiveMap`] configuration; the payload is the
/// human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MapConfigError(pub String);

impl std::fmt::Display for MapConfigError {
    /// Writes the message verbatim; formatter width/fill are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

impl std::error::Error for MapConfigError {}
1228
1229impl<K, V> ReactiveMap<K, V>
1230where
1231 K: Clone + Eq + Hash + Send + Sync + 'static,
1232 V: Clone + Send + Sync + 'static,
1233{
1234 pub fn new(
1239 core: &Core,
1240 intern: InternFn<Vec<(K, V)>>,
1241 opts: ReactiveMapOptions<K, V>,
1242 ) -> Result<Self, MapConfigError> {
1243 if opts.max_size.is_some() && opts.retention.is_some() {
1244 return Err(MapConfigError(
1245 "max_size (LRU) and retention are mutually exclusive".into(),
1246 ));
1247 }
1248 if let Some(ref r) = opts.retention {
1249 if r.archive_threshold.is_none() && r.max_size.is_none() {
1250 return Err(MapConfigError(
1251 "retention requires at least one of archive_threshold or max_size".into(),
1252 ));
1253 }
1254 }
1255 if let Some(ttl) = opts.default_ttl {
1256 if ttl <= 0.0 {
1257 return Err(MapConfigError("default_ttl must be > 0".into()));
1258 }
1259 }
1260 let node_id = core
1261 .register_state(HandleId::new(0), false)
1262 .expect("register_state for ReactiveMap");
1263 let backend: Box<dyn MapBackend<K, V>> = opts
1264 .backend
1265 .unwrap_or_else(|| Box::new(HashMapBackend::new()));
1266 let mutation_log = if opts.mutation_log {
1267 Some(Vec::new())
1268 } else {
1269 None
1270 };
1271 let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
1272 let inner = MapInner {
1273 backend,
1274 mutation_log,
1275 structure_name: opts.name,
1276 ttl_expiry: HashMap::new(),
1277 default_ttl_ns,
1278 lru_order: Vec::new(),
1279 lru_max_size: opts.max_size,
1280 retention: opts.retention,
1281 };
1282 Ok(Self {
1283 inner: Mutex::new(inner),
1284 emitter: EmitHandle {
1285 node_id,
1286 intern,
1287 version: AtomicU64::new(0),
1288 },
1289 node_id,
1290 })
1291 }
1292
1293 #[must_use]
1294 pub fn size(&self) -> usize {
1295 self.inner.lock().backend.size()
1296 }
1297
1298 pub fn has(&self, core: &Core, key: &K) -> bool {
1301 let (has, expired) = {
1302 let mut inner = self.inner.lock();
1303 let mut target_expired = false;
1304 if inner.default_ttl_ns.is_some() {
1306 if let Some(&exp) = inner.ttl_expiry.get(key) {
1307 if monotonic_ns() >= exp {
1308 target_expired = true;
1309 }
1310 }
1311 }
1312 let mut expired = inner.prune_expired_inner();
1314 if target_expired && !expired.iter().any(|(k, _)| k == key) {
1315 if let Some(prev) = inner.backend.get(key) {
1318 inner.backend.delete(key);
1319 inner.ttl_expiry.remove(key);
1320 inner.lru_remove(key);
1321 expired.push((key.clone(), prev));
1322 }
1323 }
1324 let has = if target_expired {
1325 false
1326 } else {
1327 let h = inner.backend.has(key);
1328 if h {
1329 inner.lru_touch(key);
1330 }
1331 h
1332 };
1333 (has, expired)
1334 };
1335 if !expired.is_empty() {
1336 let snapshot = self.inner.lock().backend.to_vec();
1337 let version = self.emitter.emit(core, snapshot);
1338 let mut inner = self.inner.lock();
1339 for (k, prev) in expired {
1340 inner.record(
1341 MapChange::Delete {
1342 key: k,
1343 previous: prev,
1344 reason: DeleteReason::Expired,
1345 },
1346 version.clone(),
1347 );
1348 }
1349 }
1350 has
1351 }
1352
1353 pub fn get(&self, core: &Core, key: &K) -> Option<V> {
1356 let (value, expired) = {
1357 let mut inner = self.inner.lock();
1358 let mut target_expired = false;
1359 if inner.default_ttl_ns.is_some() {
1360 if let Some(&exp) = inner.ttl_expiry.get(key) {
1361 if monotonic_ns() >= exp {
1362 target_expired = true;
1363 }
1364 }
1365 }
1366 let mut expired = inner.prune_expired_inner();
1368 if target_expired && !expired.iter().any(|(k, _)| k == key) {
1369 if let Some(prev) = inner.backend.get(key) {
1371 inner.backend.delete(key);
1372 inner.ttl_expiry.remove(key);
1373 inner.lru_remove(key);
1374 expired.push((key.clone(), prev));
1375 }
1376 }
1377 let value = if target_expired {
1378 None
1379 } else {
1380 let v = inner.backend.get(key);
1381 if v.is_some() {
1382 inner.lru_touch(key);
1383 }
1384 v
1385 };
1386 (value, expired)
1387 };
1388 if !expired.is_empty() {
1389 let snapshot = self.inner.lock().backend.to_vec();
1390 let version = self.emitter.emit(core, snapshot);
1391 let mut inner = self.inner.lock();
1392 for (k, prev) in expired {
1393 inner.record(
1394 MapChange::Delete {
1395 key: k,
1396 previous: prev,
1397 reason: DeleteReason::Expired,
1398 },
1399 version.clone(),
1400 );
1401 }
1402 }
1403 value
1404 }
1405
1406 pub fn set(&self, core: &Core, key: K, value: V) {
1407 self.set_with_ttl(core, key, value, None);
1408 }
1409
1410 pub fn set_with_ttl(&self, core: &Core, key: K, value: V, ttl: Option<f64>) {
1415 if let Some(t) = ttl {
1416 assert!(
1417 t > 0.0 && t.is_finite(),
1418 "per-call ttl must be positive and finite"
1419 );
1420 }
1421 let (snapshot, change, eviction_changes) = {
1422 let mut inner = self.inner.lock();
1423 let expired = inner.prune_expired_inner();
1424 let change = inner.mutation_log.is_some().then(|| MapChange::Set {
1425 key: key.clone(),
1426 value: value.clone(),
1427 });
1428 inner.set_ttl_with(&key, ttl);
1429 inner.lru_remove(&key);
1430 if inner.lru_max_size.is_some() {
1431 inner.lru_order.push(key.clone());
1432 }
1433 inner.backend.set(key, value);
1434 let evicted = inner.lru_evict();
1435 let archived = inner.apply_retention_inner();
1436 let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1437 for (k, prev) in expired {
1438 eviction_changes.push((k, prev, DeleteReason::Expired));
1439 }
1440 for (k, prev) in evicted {
1441 eviction_changes.push((k, prev, DeleteReason::LruEvict));
1442 }
1443 for (k, v, s) in &archived {
1444 if let Some(on_archive) =
1445 &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1446 {
1447 on_archive(k, v, *s);
1448 }
1449 eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1450 }
1451 (inner.backend.to_vec(), change, eviction_changes)
1452 };
1453 let version = self.emitter.emit(core, snapshot);
1454 if change.is_some() || !eviction_changes.is_empty() {
1455 let mut inner = self.inner.lock();
1456 if let Some(change) = change {
1457 inner.record(change, version.clone());
1458 }
1459 for (k, prev, reason) in eviction_changes {
1460 inner.record(
1461 MapChange::Delete {
1462 key: k,
1463 previous: prev,
1464 reason,
1465 },
1466 version.clone(),
1467 );
1468 }
1469 }
1470 }
1471
1472 pub fn set_many(&self, core: &Core, entries: Vec<(K, V)>) {
1473 self.set_many_with_ttl(core, entries, None);
1474 }
1475
1476 pub fn set_many_with_ttl(&self, core: &Core, entries: Vec<(K, V)>, ttl: Option<f64>) {
1481 if let Some(t) = ttl {
1482 assert!(
1483 t > 0.0 && t.is_finite(),
1484 "per-call ttl must be positive and finite"
1485 );
1486 }
1487 if entries.is_empty() {
1488 return;
1489 }
1490 let (snapshot, changes, eviction_changes) = {
1491 let mut inner = self.inner.lock();
1492 let expired = inner.prune_expired_inner();
1493 let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1494 entries
1495 .iter()
1496 .map(|(k, v)| MapChange::Set {
1497 key: k.clone(),
1498 value: v.clone(),
1499 })
1500 .collect()
1501 });
1502 for (k, _) in &entries {
1503 inner.set_ttl_with(k, ttl);
1504 inner.lru_remove(k);
1505 if inner.lru_max_size.is_some() {
1506 inner.lru_order.push(k.clone());
1507 }
1508 }
1509 inner.backend.set_many(entries);
1510 let evicted = inner.lru_evict();
1511 let archived = inner.apply_retention_inner();
1512 let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1513 for (k, prev) in expired {
1514 eviction_changes.push((k, prev, DeleteReason::Expired));
1515 }
1516 for (k, prev) in evicted {
1517 eviction_changes.push((k, prev, DeleteReason::LruEvict));
1518 }
1519 for (k, v, s) in &archived {
1520 if let Some(on_archive) =
1521 &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1522 {
1523 on_archive(k, v, *s);
1524 }
1525 eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1526 }
1527 (inner.backend.to_vec(), changes, eviction_changes)
1528 };
1529 let version = self.emitter.emit(core, snapshot);
1530 if changes.is_some() || !eviction_changes.is_empty() {
1531 let mut inner = self.inner.lock();
1532 if let Some(changes) = changes {
1533 for change in changes {
1534 inner.record(change, version.clone());
1535 }
1536 }
1537 for (k, prev, reason) in eviction_changes {
1538 inner.record(
1539 MapChange::Delete {
1540 key: k,
1541 previous: prev,
1542 reason,
1543 },
1544 version.clone(),
1545 );
1546 }
1547 }
1548 }
1549
1550 pub fn delete(&self, core: &Core, key: &K) {
1551 let (snapshot, previous) = {
1552 let mut inner = self.inner.lock();
1553 let previous = inner.backend.get(key);
1554 if !inner.backend.delete(key) {
1555 return;
1556 }
1557 inner.ttl_expiry.remove(key);
1558 inner.lru_remove(key);
1559 (inner.backend.to_vec(), previous)
1560 };
1561 let version = self.emitter.emit(core, snapshot);
1562 if let Some(prev) = previous {
1563 self.inner.lock().record(
1564 MapChange::Delete {
1565 key: key.clone(),
1566 previous: prev,
1567 reason: DeleteReason::Explicit,
1568 },
1569 version,
1570 );
1571 }
1572 }
1573
1574 pub fn delete_many(&self, core: &Core, keys: &[K]) {
1575 let (snapshot, actually_deleted) = {
1576 let mut inner = self.inner.lock();
1577 let actually_deleted: Vec<(K, V)> = keys
1578 .iter()
1579 .filter_map(|k| inner.backend.get(k).map(|v| (k.clone(), v)))
1580 .collect();
1581 let removed = inner.backend.delete_many(keys);
1582 if removed == 0 {
1583 return;
1584 }
1585 for k in keys {
1586 inner.ttl_expiry.remove(k);
1587 inner.lru_remove(k);
1588 }
1589 (inner.backend.to_vec(), actually_deleted)
1590 };
1591 let version = self.emitter.emit(core, snapshot);
1592 if !actually_deleted.is_empty() {
1593 let mut inner = self.inner.lock();
1594 for (k, prev) in actually_deleted {
1595 inner.record(
1596 MapChange::Delete {
1597 key: k,
1598 previous: prev,
1599 reason: DeleteReason::Explicit,
1600 },
1601 version.clone(),
1602 );
1603 }
1604 }
1605 }
1606
1607 pub fn clear(&self, core: &Core) {
1608 let (snapshot, count) = {
1609 let mut inner = self.inner.lock();
1610 let count = inner.backend.clear();
1611 if count == 0 {
1612 return;
1613 }
1614 inner.ttl_expiry.clear();
1615 inner.lru_order.clear();
1616 (inner.backend.to_vec(), count)
1617 };
1618 let version = self.emitter.emit(core, snapshot);
1619 self.inner
1620 .lock()
1621 .record(MapChange::Clear { count }, version);
1622 }
1623
1624 pub fn prune_expired(&self, core: &Core) -> usize {
1626 let expired = {
1627 let mut inner = self.inner.lock();
1628 inner.prune_expired_inner()
1629 };
1630 if expired.is_empty() {
1631 return 0;
1632 }
1633 let count = expired.len();
1634 let snapshot = self.inner.lock().backend.to_vec();
1635 let version = self.emitter.emit(core, snapshot);
1636 let mut inner = self.inner.lock();
1637 for (k, prev) in expired {
1638 inner.record(
1639 MapChange::Delete {
1640 key: k,
1641 previous: prev,
1642 reason: DeleteReason::Expired,
1643 },
1644 version.clone(),
1645 );
1646 }
1647 count
1648 }
1649
1650 #[must_use]
1651 pub fn to_vec(&self) -> Vec<(K, V)> {
1652 self.inner.lock().backend.to_vec()
1653 }
1654
1655 #[must_use]
1656 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
1657 self.inner.lock().mutation_log.clone()
1658 }
1659}
1660
/// Reactive index whose rows are addressed by a primary key of type `K` and
/// carry a `String` secondary key plus a value of type `V`.
///
/// Mutating methods emit the backend's ordered rows on `node_id`.
pub struct ReactiveIndex<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Backend, optional mutation log and configuration, guarded by one mutex.
    inner: Mutex<IndexInner<K, V>>,
    /// Interns ordered-row snapshots and emits them through the core.
    emitter: EmitHandle<Vec<IndexRow<K, V>>>,
    /// Core node on which this index's snapshots are emitted.
    pub node_id: NodeId,
}
1677
/// Shared, thread-safe predicate over two [`IndexRow`]s.
///
/// The upsert paths call it as `(existing row, proposed row)` and skip the
/// write when it returns `true`.
pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
1680
/// Mutex-protected state of a `ReactiveIndex`.
struct IndexInner<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Storage for the index rows.
    backend: Box<dyn IndexBackend<K, V>>,
    /// Recorded changes; `None` means recording is disabled.
    mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
    /// Copied into the `structure` field of every recorded change.
    structure_name: String,
    /// Index-level equality predicate used by upserts to skip unchanged rows.
    equals: Option<IndexEqualsFn<K, V>>,
}
1695
1696impl<K, V> IndexInner<K, V>
1697where
1698 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1699 V: Clone + Send + Sync + 'static,
1700{
1701 fn record(&mut self, change: IndexChange<K, V>, version: Version) {
1702 if let Some(log) = &mut self.mutation_log {
1703 log.push(BaseChange {
1704 structure: self.structure_name.clone(),
1705 version,
1706 t_ns: wall_clock_ns(),
1707 seq: None,
1708 lifecycle: Lifecycle::Data,
1709 change,
1710 });
1711 }
1712 }
1713}
1714
/// Per-call options for `ReactiveIndex::upsert_with`.
pub struct UpsertOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Equality predicate for this call only; when `None`, the index-level
    /// predicate (if any) is used instead.
    pub equals: Option<IndexEqualsFn<K, V>>,
}
1724
1725impl<K, V> Default for UpsertOptions<K, V>
1726where
1727 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1728 V: Clone + Send + Sync + 'static,
1729{
1730 fn default() -> Self {
1731 Self { equals: None }
1732 }
1733}
1734
/// Construction options for `ReactiveIndex::new`.
pub struct ReactiveIndexOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Structure name stamped on every recorded change.
    pub name: String,
    /// Storage backend; `None` selects a fresh `VecIndexBackend`.
    pub backend: Option<Box<dyn IndexBackend<K, V>>>,
    /// Whether to keep a mutation log.
    pub mutation_log: bool,
    /// Index-level equality predicate used by upserts to skip unchanged rows.
    pub equals: Option<IndexEqualsFn<K, V>>,
}
1747
1748impl<K, V> Default for ReactiveIndexOptions<K, V>
1749where
1750 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1751 V: Clone + Send + Sync + 'static,
1752{
1753 fn default() -> Self {
1754 Self {
1755 name: "reactiveIndex".into(),
1756 backend: None,
1757 mutation_log: false,
1758 equals: None,
1759 }
1760 }
1761}
1762
1763impl<K, V> ReactiveIndex<K, V>
1764where
1765 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1766 V: Clone + Send + Sync + 'static,
1767{
1768 #[must_use]
1769 pub fn new(
1770 core: &Core,
1771 intern: InternFn<Vec<IndexRow<K, V>>>,
1772 opts: ReactiveIndexOptions<K, V>,
1773 ) -> Self {
1774 let node_id = core
1775 .register_state(HandleId::new(0), false)
1776 .expect("register_state for ReactiveIndex");
1777 let backend: Box<dyn IndexBackend<K, V>> = opts
1778 .backend
1779 .unwrap_or_else(|| Box::new(VecIndexBackend::new()));
1780 let mutation_log = if opts.mutation_log {
1781 Some(Vec::new())
1782 } else {
1783 None
1784 };
1785 let inner = IndexInner {
1786 backend,
1787 mutation_log,
1788 structure_name: opts.name,
1789 equals: opts.equals,
1790 };
1791 Self {
1792 inner: Mutex::new(inner),
1793 emitter: EmitHandle {
1794 node_id,
1795 intern,
1796 version: AtomicU64::new(0),
1797 },
1798 node_id,
1799 }
1800 }
1801
1802 #[must_use]
1803 pub fn size(&self) -> usize {
1804 self.inner.lock().backend.size()
1805 }
1806
1807 #[must_use]
1808 pub fn has(&self, primary: &K) -> bool {
1809 self.inner.lock().backend.has(primary)
1810 }
1811
1812 #[must_use]
1813 pub fn get(&self, primary: &K) -> Option<V> {
1814 self.inner.lock().backend.get(primary)
1815 }
1816
1817 pub fn upsert(&self, core: &Core, primary: K, secondary: String, value: V) -> bool {
1820 self.upsert_with(core, primary, secondary, value, &UpsertOptions::default())
1821 }
1822
1823 pub fn upsert_with(
1828 &self,
1829 core: &Core,
1830 primary: K,
1831 secondary: String,
1832 value: V,
1833 opts: &UpsertOptions<K, V>,
1834 ) -> bool {
1835 let (is_new, snapshot, change) = {
1836 let mut inner = self.inner.lock();
1837 let eq_fn = opts.equals.as_ref().or(inner.equals.as_ref());
1839 if let Some(eq) = eq_fn {
1840 if let Some(existing_row) = inner.backend.get_row(&primary) {
1841 let proposed = IndexRow {
1842 primary: primary.clone(),
1843 secondary: secondary.clone(),
1844 value: value.clone(),
1845 };
1846 if eq(&existing_row, &proposed) {
1847 return false;
1848 }
1849 }
1850 }
1851 let change = inner.mutation_log.is_some().then(|| IndexChange::Upsert {
1852 primary: primary.clone(),
1853 secondary: secondary.clone(),
1854 value: value.clone(),
1855 });
1856 let is_new = inner.backend.upsert(primary, secondary, value);
1857 (is_new, inner.backend.to_ordered(), change)
1858 };
1859 let version = self.emitter.emit(core, snapshot);
1860 if let Some(change) = change {
1861 self.inner.lock().record(change, version);
1862 }
1863 is_new
1864 }
1865
1866 pub fn upsert_many(&self, core: &Core, rows: Vec<(K, String, V)>) {
1869 if rows.is_empty() {
1870 return;
1871 }
1872 let (snapshot, changes) = {
1873 let mut inner = self.inner.lock();
1874 let effective_rows: Vec<(K, String, V)> = if let Some(eq) = &inner.equals {
1876 rows.into_iter()
1877 .filter(|(pk, sec, val)| {
1878 if let Some(existing) = inner.backend.get_row(pk) {
1879 let proposed = IndexRow {
1880 primary: pk.clone(),
1881 secondary: sec.clone(),
1882 value: val.clone(),
1883 };
1884 !eq(&existing, &proposed)
1885 } else {
1886 true
1887 }
1888 })
1889 .collect()
1890 } else {
1891 rows
1892 };
1893 if effective_rows.is_empty() {
1894 return;
1895 }
1896 let changes: Option<Vec<IndexChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1897 effective_rows
1898 .iter()
1899 .map(|(k, s, v)| IndexChange::Upsert {
1900 primary: k.clone(),
1901 secondary: s.clone(),
1902 value: v.clone(),
1903 })
1904 .collect()
1905 });
1906 inner.backend.upsert_many(effective_rows);
1907 (inner.backend.to_ordered(), changes)
1908 };
1909 let version = self.emitter.emit(core, snapshot);
1910 if let Some(changes) = changes {
1911 let mut inner = self.inner.lock();
1912 for change in changes {
1913 inner.record(change, version.clone());
1914 }
1915 }
1916 }
1917
1918 pub fn delete(&self, core: &Core, primary: &K) {
1919 let snapshot = {
1920 let mut inner = self.inner.lock();
1921 if !inner.backend.delete(primary) {
1922 return;
1923 }
1924 inner.backend.to_ordered()
1925 };
1926 let version = self.emitter.emit(core, snapshot);
1927 self.inner.lock().record(
1928 IndexChange::Delete {
1929 primary: primary.clone(),
1930 },
1931 version,
1932 );
1933 }
1934
1935 pub fn delete_many(&self, core: &Core, primaries: &[K]) {
1936 let (snapshot, actually_deleted) = {
1937 let mut inner = self.inner.lock();
1938 let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
1940 primaries
1941 .iter()
1942 .filter(|k| inner.backend.has(k))
1943 .cloned()
1944 .collect()
1945 } else {
1946 vec![]
1947 };
1948 let removed = inner.backend.delete_many(primaries);
1949 if removed == 0 {
1950 return;
1951 }
1952 (inner.backend.to_ordered(), actually_deleted)
1953 };
1954 let version = self.emitter.emit(core, snapshot);
1955 if !actually_deleted.is_empty() {
1956 self.inner.lock().record(
1957 IndexChange::DeleteMany {
1958 primaries: actually_deleted,
1959 },
1960 version,
1961 );
1962 }
1963 }
1964
1965 pub fn clear(&self, core: &Core) {
1966 let (snapshot, count) = {
1967 let mut inner = self.inner.lock();
1968 let count = inner.backend.clear();
1969 if count == 0 {
1970 return;
1971 }
1972 (inner.backend.to_ordered(), count)
1973 };
1974 let version = self.emitter.emit(core, snapshot);
1975 self.inner
1976 .lock()
1977 .record(IndexChange::Clear { count }, version);
1978 }
1979
1980 #[must_use]
1981 pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
1982 self.inner.lock().backend.to_ordered()
1983 }
1984
1985 #[must_use]
1986 pub fn to_primary_map(&self) -> Vec<(K, V)> {
1987 self.inner.lock().backend.to_primary_map()
1988 }
1989
1990 #[must_use]
1995 pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
1996 where
1997 K: Ord,
1998 {
1999 let mut rows: Vec<(K, V)> = self
2000 .inner
2001 .lock()
2002 .backend
2003 .to_primary_map()
2004 .into_iter()
2005 .filter(|(k, _)| k >= start && k < end)
2006 .collect();
2007 rows.sort_by(|a, b| a.0.cmp(&b.0));
2008 rows.into_iter().map(|(_, v)| v).collect()
2009 }
2010
2011 #[must_use]
2012 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
2013 self.inner.lock().mutation_log.clone()
2014 }
2015}