1use std::collections::HashMap;
14use std::hash::Hash;
15use std::rc::Rc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20
21use graphrefly_core::{
22 monotonic_ns, wall_clock_ns, Core, CoreMailbox, HandleId, Message, NodeId, SubscriptionId,
23};
24
25use crate::backend::{
26 HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
27 VecListBackend, VecLogBackend,
28};
29use crate::changeset::{
30 BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
31};
32
33pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
41
42struct EmitHandle<S> {
53 node_id: NodeId,
54 intern: InternFn<S>,
55 version: AtomicU64,
56}
57
58impl<S> EmitHandle<S> {
59 fn emit(&self, core: &Core, snapshot: S) -> Version {
62 let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
63 let handle = (self.intern)(snapshot);
64 core.emit(self.node_id, handle);
65 Version::Counter(ver)
66 }
67}
68
69struct SinkEmitter<S> {
76 mailbox: Arc<CoreMailbox>,
77 node_id: NodeId,
78 intern: InternFn<S>,
79 version: AtomicU64,
80}
81
82impl<S> SinkEmitter<S> {
83 fn emit(&self, snapshot: S) -> Version {
86 let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
87 let handle = (self.intern)(snapshot);
88 let _ = self.mailbox.post_emit(self.node_id, handle);
93 Version::Counter(ver)
94 }
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub struct IndexOutOfBounds;
104
105impl std::fmt::Display for IndexOutOfBounds {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.write_str("index out of bounds")
108 }
109}
110
111impl std::error::Error for IndexOutOfBounds {}
112
113pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
123 inner: Arc<Mutex<LogInner<T>>>,
124 emitter: EmitHandle<Vec<T>>,
125 pub node_id: NodeId,
127}
128
129struct LogInner<T: Clone + Send + Sync + 'static> {
130 backend: Box<dyn LogBackend<T>>,
131 mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
132 structure_name: String,
133}
134
135impl<T: Clone + Send + Sync + 'static> LogInner<T> {
136 fn record(&mut self, change: LogChange<T>, version: Version) {
137 if let Some(log) = &mut self.mutation_log {
138 log.push(BaseChange {
139 structure: self.structure_name.clone(),
140 version,
141 t_ns: wall_clock_ns(),
142 seq: None,
143 lifecycle: Lifecycle::Data,
144 change,
145 });
146 }
147 }
148}
149
150pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
152 pub name: String,
153 pub max_size: Option<usize>,
154 pub backend: Option<Box<dyn LogBackend<T>>>,
155 pub mutation_log: bool,
156}
157
158impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
159 fn default() -> Self {
160 Self {
161 name: "reactiveLog".into(),
162 max_size: None,
163 backend: None,
164 mutation_log: false,
165 }
166 }
167}
168
169impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
170 #[must_use]
174 pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
175 let node_id = core
176 .register_state(HandleId::new(0), false)
177 .expect("register_state for ReactiveLog");
178 let backend: Box<dyn LogBackend<T>> = opts
179 .backend
180 .unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
181 let mutation_log = if opts.mutation_log {
182 Some(Vec::new())
183 } else {
184 None
185 };
186 let inner = LogInner {
187 backend,
188 mutation_log,
189 structure_name: opts.name,
190 };
191 Self {
192 inner: Arc::new(Mutex::new(inner)),
193 emitter: EmitHandle {
194 node_id,
195 intern,
196 version: AtomicU64::new(0),
197 },
198 node_id,
199 }
200 }
201
202 #[must_use]
203 pub fn size(&self) -> usize {
204 self.inner.lock().backend.size()
205 }
206
207 #[must_use]
208 pub fn at(&self, index: i64) -> Option<T> {
209 self.inner.lock().backend.at(index)
210 }
211
212 pub fn append(&self, core: &Core, value: T) {
213 let (snapshot, change) = {
214 let mut inner = self.inner.lock();
215 let change = inner.mutation_log.is_some().then(|| LogChange::Append {
216 value: value.clone(),
217 });
218 inner.backend.append(value);
219 (inner.backend.to_vec(), change)
220 };
221 let version = self.emitter.emit(core, snapshot);
222 if let Some(change) = change {
223 self.inner.lock().record(change, version);
224 }
225 }
226
227 pub fn append_many(&self, core: &Core, values: Vec<T>) {
228 if values.is_empty() {
229 return;
230 }
231 let (snapshot, change) = {
232 let mut inner = self.inner.lock();
233 let change = inner.mutation_log.is_some().then(|| LogChange::AppendMany {
234 values: values.clone(),
235 });
236 inner.backend.append_many(values);
237 (inner.backend.to_vec(), change)
238 };
239 let version = self.emitter.emit(core, snapshot);
240 if let Some(change) = change {
241 self.inner.lock().record(change, version);
242 }
243 }
244
245 pub fn clear(&self, core: &Core) {
246 let (snapshot, count) = {
247 let mut inner = self.inner.lock();
248 let count = inner.backend.clear();
249 if count == 0 {
250 return;
251 }
252 (inner.backend.to_vec(), count)
253 };
254 let version = self.emitter.emit(core, snapshot);
255 self.inner
256 .lock()
257 .record(LogChange::Clear { count }, version);
258 }
259
260 pub fn trim_head(&self, core: &Core, n: usize) {
261 if n == 0 {
262 return;
263 }
264 let (snapshot, actual) = {
265 let mut inner = self.inner.lock();
266 let actual = inner.backend.trim_head(n);
267 if actual == 0 {
268 return;
269 }
270 (inner.backend.to_vec(), actual)
271 };
272 let version = self.emitter.emit(core, snapshot);
273 self.inner
274 .lock()
275 .record(LogChange::TrimHead { n: actual }, version);
276 }
277
278 #[must_use]
279 pub fn to_vec(&self) -> Vec<T> {
280 self.inner.lock().backend.to_vec()
281 }
282
283 #[must_use]
286 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
287 self.inner.lock().mutation_log.clone()
288 }
289}
290
291pub enum ViewSpec {
297 Tail { n: usize },
299 Slice { start: usize, stop: Option<usize> },
301 FromCursor {
304 cursor_node: NodeId,
305 read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
306 },
307}
308
309pub struct ReactiveSub {
315 subs: Vec<(NodeId, SubscriptionId)>,
316}
317
318impl ReactiveSub {
319 pub fn detach(&mut self, core: &Core) {
323 for (node_id, sub_id) in self.subs.drain(..) {
324 core.unsubscribe(node_id, sub_id);
325 }
326 }
327}
328
329pub struct LogView {
332 pub node_id: NodeId,
334 sub: ReactiveSub,
335}
336
337impl LogView {
338 pub fn detach(&mut self, core: &Core) {
340 self.sub.detach(core);
341 }
342}
343
344pub struct ScanHandle {
347 pub node_id: NodeId,
349 sub: ReactiveSub,
350}
351
352impl ScanHandle {
353 pub fn detach(&mut self, core: &Core) {
355 self.sub.detach(core);
356 }
357}
358
359#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
367pub enum AppendLogMode {
368 #[default]
370 Append,
371 Overwrite,
376}
377
378#[derive(Debug)]
381pub enum AttachStorageError {
382 OverwriteSinkRejected { sink_index: usize },
385}
386
387impl std::fmt::Display for AttachStorageError {
388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389 match self {
390 Self::OverwriteSinkRejected { sink_index } => write!(
391 f,
392 "attach_storage: sink[{sink_index}] declares AppendLogMode::Overwrite — \
393 delta-shipping into an overwrite sink truncates; use an Append-mode sink",
394 ),
395 }
396 }
397}
398
399impl std::error::Error for AttachStorageError {}
400
401pub trait AppendLogSink<T>: Send + Sync {
409 fn append_entries(&self, entries: &[T])
411 -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
412 fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
415 fn mode(&self) -> AppendLogMode {
419 AppendLogMode::Append
420 }
421 fn flush(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
427 Ok(())
428 }
429}
430
431pub struct AttachStorageHandle<T> {
440 sub: ReactiveSub,
441 sinks: Vec<Arc<dyn AppendLogSink<T>>>,
442}
443
444impl<T> AttachStorageHandle<T> {
445 pub fn detach(&mut self, core: &Core) {
448 self.sub.detach(core);
449 }
450 #[must_use = "ignoring AttachStorageHandle::flush_all's Result swallows write errors"]
466 pub fn flush_all(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
467 for (i, sink) in self.sinks.iter().enumerate() {
468 if let Err(e) = sink.flush() {
469 return Err(format!("sink[{i}] flush: {e}").into());
470 }
471 }
472 Ok(())
473 }
474}
475
476#[derive(Debug, Clone, Copy, Default)]
486pub struct AttachOptions {
487 pub skip_cached_replay: bool,
488}
489
490impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
491 #[allow(clippy::too_many_lines)]
499 pub fn view(&self, core: &Core, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
500 let view_node = core
501 .register_state(HandleId::new(0), false)
502 .expect("register_state for LogView");
503 let view_emitter = Arc::new(SinkEmitter {
506 mailbox: core.mailbox(),
507 node_id: view_node,
508 intern,
509 version: AtomicU64::new(0),
510 });
511
512 let inner = Arc::clone(&self.inner);
513 let mut subscriptions: Vec<(NodeId, SubscriptionId)> = Vec::new();
514
515 match spec {
516 ViewSpec::Tail { n } => {
517 let inner_c = Arc::clone(&inner);
518 let emitter_c = Arc::clone(&view_emitter);
519 let sub = core.subscribe(
520 self.node_id,
521 Rc::new(move |msgs| {
522 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
523 let guard = inner_c.lock();
524 let data = guard.backend.to_vec();
525 let start = data.len().saturating_sub(n);
526 let view = data[start..].to_vec();
527 drop(guard);
528 emitter_c.emit(view);
529 }
530 }),
531 );
532 subscriptions.push((self.node_id, sub));
533 }
534 ViewSpec::Slice { start, stop } => {
535 let inner_c = Arc::clone(&inner);
536 let emitter_c = Arc::clone(&view_emitter);
537 let sub = core.subscribe(
538 self.node_id,
539 Rc::new(move |msgs| {
540 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
541 let guard = inner_c.lock();
542 let data = guard.backend.to_vec();
543 let end = stop.unwrap_or(data.len()).min(data.len());
544 let s = start.min(end);
545 let view = data[s..end].to_vec();
546 drop(guard);
547 emitter_c.emit(view);
548 }
549 }),
550 );
551 subscriptions.push((self.node_id, sub));
552 }
553 ViewSpec::FromCursor {
554 cursor_node,
555 read_cursor,
556 } => {
557 let cursor_pos = Arc::new(Mutex::new(0usize));
558
559 let cursor_pos_c = Arc::clone(&cursor_pos);
561 let inner_c = Arc::clone(&inner);
562 let emitter_c = Arc::clone(&view_emitter);
563 let read_cursor_c = Arc::clone(&read_cursor);
564 let sub_cursor = core.subscribe(
565 cursor_node,
566 Rc::new(move |msgs| {
567 for m in msgs {
568 if let Message::Data(h) = m {
569 let pos = read_cursor_c(*h);
570 *cursor_pos_c.lock() = pos;
571 let guard = inner_c.lock();
572 let data = guard.backend.to_vec();
573 let s = pos.min(data.len());
574 let view = data[s..].to_vec();
575 drop(guard);
576 emitter_c.emit(view);
577 }
578 }
579 }),
580 );
581 subscriptions.push((cursor_node, sub_cursor));
582
583 let cursor_pos_c2 = Arc::clone(&cursor_pos);
585 let inner_c2 = Arc::clone(&inner);
586 let emitter_c2 = view_emitter;
587 let sub_log = core.subscribe(
588 self.node_id,
589 Rc::new(move |msgs| {
590 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
591 let pos = *cursor_pos_c2.lock();
592 let guard = inner_c2.lock();
593 let data = guard.backend.to_vec();
594 let s = pos.min(data.len());
595 let view = data[s..].to_vec();
596 drop(guard);
597 emitter_c2.emit(view);
598 }
599 }),
600 );
601 subscriptions.push((self.node_id, sub_log));
602 }
603 }
604
605 LogView {
606 node_id: view_node,
607 sub: ReactiveSub {
608 subs: subscriptions,
609 },
610 }
611 }
612
613 pub fn scan<TAcc: Clone + Send + Sync + 'static>(
619 &self,
620 core: &Core,
621 initial: TAcc,
622 step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
623 intern: InternFn<TAcc>,
624 ) -> ScanHandle {
625 struct ScanState<T, TAcc> {
626 acc: TAcc,
627 processed: usize,
628 initial: TAcc,
629 step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
630 }
631
632 let scan_node = core
633 .register_state(HandleId::new(0), false)
634 .expect("register_state for Scan");
635
636 let state = Arc::new(Mutex::new(ScanState {
637 acc: initial.clone(),
638 processed: 0,
639 initial,
640 step,
641 }));
642 let inner = Arc::clone(&self.inner);
643 let scan_emitter = Arc::new(SinkEmitter {
646 mailbox: core.mailbox(),
647 node_id: scan_node,
648 intern,
649 version: AtomicU64::new(0),
650 });
651
652 let sub = core.subscribe(
653 self.node_id,
654 Rc::new(move |msgs| {
655 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
656 let mut ss = state.lock();
657 let guard = inner.lock();
658 let data = guard.backend.to_vec();
659 drop(guard);
660
661 if data.len() < ss.processed {
662 ss.acc = ss.initial.clone();
664 ss.processed = 0;
665 }
666 for item in &data[ss.processed..] {
667 ss.acc = (ss.step)(&ss.acc, item);
668 }
669 ss.processed = data.len();
670 let acc = ss.acc.clone();
671 drop(ss);
672 scan_emitter.emit(acc);
673 }
674 }),
675 );
676
677 ScanHandle {
678 node_id: scan_node,
679 sub: ReactiveSub {
680 subs: vec![(self.node_id, sub)],
681 },
682 }
683 }
684
685 pub fn attach(
692 &self,
693 core: &Core,
694 upstream: NodeId,
695 read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
696 ) -> ReactiveSub {
697 self.attach_with_options(core, upstream, read_value, AttachOptions::default())
698 }
699
700 pub fn attach_with_options(
705 &self,
706 core: &Core,
707 upstream: NodeId,
708 read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
709 opts: AttachOptions,
710 ) -> ReactiveSub {
711 let inner = Arc::clone(&self.inner);
712 let mailbox = core.mailbox();
717 let node_id = self.node_id;
718 let intern = Arc::clone(&self.emitter.intern);
719
720 let suppress = if opts.skip_cached_replay {
725 let cache = core.cache_of(upstream);
726 cache != graphrefly_core::NO_HANDLE
727 } else {
728 false
729 };
730 let skip_state = Arc::new(parking_lot::Mutex::new(suppress));
731
732 let sub = core.subscribe(
733 upstream,
734 Rc::new(move |msgs| {
735 let should_skip = {
739 let mut s = skip_state.lock();
740 let has_data = msgs.iter().any(|m| matches!(m, Message::Data(_)));
741 let do_skip = *s && has_data;
742 if do_skip {
743 *s = false;
744 }
745 do_skip
746 };
747 if should_skip {
748 return;
749 }
750 for m in msgs {
751 if let Message::Data(h) = m {
752 let value = read_value(*h);
753 let snapshot = {
754 let mut guard = inner.lock();
755 guard.backend.append(value);
756 guard.backend.to_vec()
757 };
758 let handle = (intern)(snapshot);
759 let _ = mailbox.post_emit(node_id, handle);
762 }
763 }
764 }),
765 );
766 ReactiveSub {
767 subs: vec![(upstream, sub)],
768 }
769 }
770
771 pub fn attach_storage(
779 &self,
780 core: &Core,
781 sinks: Vec<Arc<dyn AppendLogSink<T>>>,
782 preload: bool,
783 ) -> Result<AttachStorageHandle<T>, AttachStorageError> {
784 for (sink_index, sink) in sinks.iter().enumerate() {
790 if sink.mode() == AppendLogMode::Overwrite {
791 return Err(AttachStorageError::OverwriteSinkRejected { sink_index });
792 }
793 }
794
795 if sinks.is_empty() {
796 let sub = core.subscribe(self.node_id, Rc::new(|_| {}));
797 return Ok(AttachStorageHandle {
798 sub: ReactiveSub {
799 subs: vec![(self.node_id, sub)],
800 },
801 sinks,
802 });
803 }
804
805 if preload {
806 for sink in &sinks {
807 if let Ok(entries) = sink.load_entries() {
808 if !entries.is_empty() {
809 self.append_many(core, entries);
810 break;
811 }
812 }
813 }
814 }
815
816 let current_size = self.size();
817 let delivered: Vec<Arc<Mutex<usize>>> = sinks
820 .iter()
821 .map(|_| Arc::new(Mutex::new(current_size)))
822 .collect();
823
824 let inner = Arc::clone(&self.inner);
825 let sinks_for_sink = sinks.clone();
826 let delivered_arc = delivered;
827
828 let sub = core.subscribe(
829 self.node_id,
830 Rc::new(move |msgs| {
831 if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
832 let guard = inner.lock();
833 let data = guard.backend.to_vec();
834 drop(guard);
835
836 for (i, sink) in sinks_for_sink.iter().enumerate() {
837 let mut del = delivered_arc[i].lock();
838 let result = match data.len().cmp(&*del) {
839 std::cmp::Ordering::Greater => sink.append_entries(&data[*del..]),
840 std::cmp::Ordering::Less => sink.append_entries(&data),
841 std::cmp::Ordering::Equal => continue,
842 };
843 match result {
844 Ok(()) => *del = data.len(),
845 Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
846 }
847 }
848 }
849 }),
850 );
851
852 Ok(AttachStorageHandle {
853 sub: ReactiveSub {
854 subs: vec![(self.node_id, sub)],
855 },
856 sinks,
857 })
858 }
859}
860
861pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
869 inner: Mutex<ListInner<T>>,
870 emitter: EmitHandle<Vec<T>>,
871 pub node_id: NodeId,
872}
873
874struct ListInner<T: Clone + Send + Sync + 'static> {
875 backend: Box<dyn ListBackend<T>>,
876 mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
877 structure_name: String,
878}
879
880impl<T: Clone + Send + Sync + 'static> ListInner<T> {
881 fn record(&mut self, change: ListChange<T>, version: Version) {
882 if let Some(log) = &mut self.mutation_log {
883 log.push(BaseChange {
884 structure: self.structure_name.clone(),
885 version,
886 t_ns: wall_clock_ns(),
887 seq: None,
888 lifecycle: Lifecycle::Data,
889 change,
890 });
891 }
892 }
893}
894
895pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
897 pub name: String,
898 pub backend: Option<Box<dyn ListBackend<T>>>,
899 pub mutation_log: bool,
900}
901
902impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
903 fn default() -> Self {
904 Self {
905 name: "reactiveList".into(),
906 backend: None,
907 mutation_log: false,
908 }
909 }
910}
911
912impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
913 #[must_use]
914 pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
915 let node_id = core
916 .register_state(HandleId::new(0), false)
917 .expect("register_state for ReactiveList");
918 let backend: Box<dyn ListBackend<T>> = opts
919 .backend
920 .unwrap_or_else(|| Box::new(VecListBackend::new()));
921 let mutation_log = if opts.mutation_log {
922 Some(Vec::new())
923 } else {
924 None
925 };
926 let inner = ListInner {
927 backend,
928 mutation_log,
929 structure_name: opts.name,
930 };
931 Self {
932 inner: Mutex::new(inner),
933 emitter: EmitHandle {
934 node_id,
935 intern,
936 version: AtomicU64::new(0),
937 },
938 node_id,
939 }
940 }
941
942 #[must_use]
943 pub fn size(&self) -> usize {
944 self.inner.lock().backend.size()
945 }
946
947 #[must_use]
948 pub fn at(&self, index: i64) -> Option<T> {
949 self.inner.lock().backend.at(index)
950 }
951
952 pub fn append(&self, core: &Core, value: T) {
953 let (snapshot, change) = {
954 let mut inner = self.inner.lock();
955 let change = inner.mutation_log.is_some().then(|| ListChange::Append {
956 value: value.clone(),
957 });
958 inner.backend.append(value);
959 (inner.backend.to_vec(), change)
960 };
961 let version = self.emitter.emit(core, snapshot);
962 if let Some(change) = change {
963 self.inner.lock().record(change, version);
964 }
965 }
966
967 pub fn append_many(&self, core: &Core, values: Vec<T>) {
968 if values.is_empty() {
969 return;
970 }
971 let (snapshot, change) = {
972 let mut inner = self.inner.lock();
973 let change = inner
974 .mutation_log
975 .is_some()
976 .then(|| ListChange::AppendMany {
977 values: values.clone(),
978 });
979 inner.backend.append_many(values);
980 (inner.backend.to_vec(), change)
981 };
982 let version = self.emitter.emit(core, snapshot);
983 if let Some(change) = change {
984 self.inner.lock().record(change, version);
985 }
986 }
987
988 pub fn insert(&self, core: &Core, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
991 let (snapshot, change) = {
992 let mut inner = self.inner.lock();
993 if index > inner.backend.size() {
994 return Err(IndexOutOfBounds);
995 }
996 let change = inner.mutation_log.is_some().then(|| ListChange::Insert {
997 index,
998 value: value.clone(),
999 });
1000 inner.backend.insert(index, value);
1001 (inner.backend.to_vec(), change)
1002 };
1003 let version = self.emitter.emit(core, snapshot);
1004 if let Some(change) = change {
1005 self.inner.lock().record(change, version);
1006 }
1007 Ok(())
1008 }
1009
1010 pub fn insert_many(
1013 &self,
1014 core: &Core,
1015 index: usize,
1016 values: Vec<T>,
1017 ) -> Result<(), IndexOutOfBounds> {
1018 if values.is_empty() {
1019 return Ok(());
1020 }
1021 let (snapshot, change) = {
1022 let mut inner = self.inner.lock();
1023 if index > inner.backend.size() {
1024 return Err(IndexOutOfBounds);
1025 }
1026 let change = inner
1027 .mutation_log
1028 .is_some()
1029 .then(|| ListChange::InsertMany {
1030 index,
1031 values: values.clone(),
1032 });
1033 inner.backend.insert_many(index, values);
1034 (inner.backend.to_vec(), change)
1035 };
1036 let version = self.emitter.emit(core, snapshot);
1037 if let Some(change) = change {
1038 self.inner.lock().record(change, version);
1039 }
1040 Ok(())
1041 }
1042
1043 pub fn pop(&self, core: &Core, index: i64) -> Option<T> {
1046 let (value, snapshot, change) = {
1047 let mut inner = self.inner.lock();
1048 let value = inner.backend.pop(index)?;
1049 let change = inner.mutation_log.is_some().then(|| ListChange::Pop {
1050 index,
1051 value: value.clone(),
1052 });
1053 let snapshot = inner.backend.to_vec();
1054 (value, snapshot, change)
1055 };
1056 let version = self.emitter.emit(core, snapshot);
1057 if let Some(change) = change {
1058 self.inner.lock().record(change, version);
1059 }
1060 Some(value)
1061 }
1062
1063 pub fn clear(&self, core: &Core) {
1064 let (snapshot, count) = {
1065 let mut inner = self.inner.lock();
1066 let count = inner.backend.clear();
1067 if count == 0 {
1068 return;
1069 }
1070 (inner.backend.to_vec(), count)
1071 };
1072 let version = self.emitter.emit(core, snapshot);
1073 self.inner
1074 .lock()
1075 .record(ListChange::Clear { count }, version);
1076 }
1077
1078 #[must_use]
1079 pub fn to_vec(&self) -> Vec<T> {
1080 self.inner.lock().backend.to_vec()
1081 }
1082
1083 #[must_use]
1084 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
1085 self.inner.lock().mutation_log.clone()
1086 }
1087}
1088
1089pub struct ReactiveMap<K, V>
1098where
1099 K: Clone + Eq + Hash + Send + Sync + 'static,
1100 V: Clone + Send + Sync + 'static,
1101{
1102 inner: Mutex<MapInner<K, V>>,
1103 emitter: EmitHandle<Vec<(K, V)>>,
1104 pub node_id: NodeId,
1105}
1106
1107pub struct RetentionPolicy<K, V>
1113where
1114 K: Clone + Eq + Hash + Send + Sync + 'static,
1115 V: Clone + Send + Sync + 'static,
1116{
1117 pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
1119 pub archive_threshold: Option<f64>,
1121 pub max_size: Option<usize>,
1123 pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
1125}
1126
1127struct MapInner<K, V>
1128where
1129 K: Clone + Eq + Hash + Send + Sync + 'static,
1130 V: Clone + Send + Sync + 'static,
1131{
1132 backend: Box<dyn MapBackend<K, V>>,
1133 mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
1134 structure_name: String,
1135 ttl_expiry: HashMap<K, u64>,
1137 default_ttl_ns: Option<u64>,
1139 lru_order: Vec<K>,
1141 lru_max_size: Option<usize>,
1143 retention: Option<RetentionPolicy<K, V>>,
1145}
1146
1147impl<K, V> MapInner<K, V>
1148where
1149 K: Clone + Eq + Hash + Send + Sync + 'static,
1150 V: Clone + Send + Sync + 'static,
1151{
1152 fn record(&mut self, change: MapChange<K, V>, version: Version) {
1153 if let Some(log) = &mut self.mutation_log {
1154 log.push(BaseChange {
1155 structure: self.structure_name.clone(),
1156 version,
1157 t_ns: wall_clock_ns(),
1158 seq: None,
1159 lifecycle: Lifecycle::Data,
1160 change,
1161 });
1162 }
1163 }
1164
1165 fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
1167 if self.ttl_expiry.is_empty() {
1168 return vec![];
1169 }
1170 let now = monotonic_ns();
1171 let expired_keys: Vec<K> = self
1172 .ttl_expiry
1173 .iter()
1174 .filter(|(_, &exp)| now >= exp)
1175 .map(|(k, _)| k.clone())
1176 .collect();
1177 let mut expired = Vec::new();
1178 for k in expired_keys {
1179 if let Some(prev) = self.backend.get(&k) {
1180 self.backend.delete(&k);
1181 self.ttl_expiry.remove(&k);
1182 self.lru_remove(&k);
1183 expired.push((k, prev));
1184 }
1185 }
1186 expired
1187 }
1188
1189 fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
1191 let (score_fn, archive_threshold, max_size) = match &self.retention {
1193 Some(r) => (Arc::clone(&r.score), r.archive_threshold, r.max_size),
1194 None => return vec![],
1195 };
1196 let entries = self.backend.to_vec();
1197 if entries.is_empty() {
1198 return vec![];
1199 }
1200 let mut scored: Vec<(K, V, f64)> = entries
1201 .into_iter()
1202 .map(|(k, v)| {
1203 let s = (score_fn)(&k, &v);
1204 (k, v, s)
1205 })
1206 .collect();
1207 scored.sort_by(|a, b| a.2.total_cmp(&b.2));
1208
1209 let mut archived = Vec::new();
1210 if let Some(threshold) = archive_threshold {
1211 while let Some(entry) = scored.first() {
1212 if entry.2 < threshold {
1213 let (k, v, s) = scored.remove(0);
1214 self.backend.delete(&k);
1215 self.ttl_expiry.remove(&k);
1216 self.lru_remove(&k);
1217 archived.push((k, v, s));
1218 } else {
1219 break;
1220 }
1221 }
1222 }
1223 if let Some(max) = max_size {
1224 while scored.len() > max {
1225 let (k, v, s) = scored.remove(0);
1226 self.backend.delete(&k);
1227 self.ttl_expiry.remove(&k);
1228 self.lru_remove(&k);
1229 archived.push((k, v, s));
1230 }
1231 }
1232 archived
1233 }
1234
1235 fn lru_touch(&mut self, key: &K) {
1237 if self.lru_max_size.is_none() {
1238 return;
1239 }
1240 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1241 self.lru_order.remove(pos);
1242 self.lru_order.push(key.clone());
1243 }
1244 }
1245
1246 fn lru_remove(&mut self, key: &K) {
1248 if self.lru_max_size.is_none() {
1249 return;
1250 }
1251 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1252 self.lru_order.remove(pos);
1253 }
1254 }
1255
1256 fn lru_evict(&mut self) -> Vec<(K, V)> {
1258 let Some(max) = self.lru_max_size else {
1259 return vec![];
1260 };
1261 let mut evicted = Vec::new();
1262 while self.backend.size() > max && !self.lru_order.is_empty() {
1263 let victim = self.lru_order.remove(0);
1264 if let Some(prev) = self.backend.get(&victim) {
1265 self.backend.delete(&victim);
1266 self.ttl_expiry.remove(&victim);
1267 evicted.push((victim, prev));
1268 }
1269 }
1270 evicted
1271 }
1272
1273 fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
1275 let ttl_ns = match ttl {
1276 Some(secs) => Some((secs * 1_000_000_000.0) as u64),
1277 None => self.default_ttl_ns,
1278 };
1279 if let Some(ns) = ttl_ns {
1280 self.ttl_expiry.insert(key.clone(), monotonic_ns() + ns);
1281 }
1282 }
1283}
1284
1285pub struct ReactiveMapOptions<K, V>
1287where
1288 K: Clone + Eq + Hash + Send + Sync + 'static,
1289 V: Clone + Send + Sync + 'static,
1290{
1291 pub name: String,
1292 pub backend: Option<Box<dyn MapBackend<K, V>>>,
1293 pub mutation_log: bool,
1294 pub default_ttl: Option<f64>,
1297 pub max_size: Option<usize>,
1299 pub retention: Option<RetentionPolicy<K, V>>,
1301}
1302
1303impl<K, V> Default for ReactiveMapOptions<K, V>
1304where
1305 K: Clone + Eq + Hash + Send + Sync + 'static,
1306 V: Clone + Send + Sync + 'static,
1307{
1308 fn default() -> Self {
1309 Self {
1310 name: "reactiveMap".into(),
1311 backend: None,
1312 mutation_log: false,
1313 default_ttl: None,
1314 max_size: None,
1315 retention: None,
1316 }
1317 }
1318}
1319
1320#[derive(Debug, Clone, PartialEq, Eq)]
1322pub struct MapConfigError(pub String);
1323
1324impl std::fmt::Display for MapConfigError {
1325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1326 f.write_str(&self.0)
1327 }
1328}
1329
1330impl std::error::Error for MapConfigError {}
1331
1332impl<K, V> ReactiveMap<K, V>
1333where
1334 K: Clone + Eq + Hash + Send + Sync + 'static,
1335 V: Clone + Send + Sync + 'static,
1336{
1337 pub fn new(
1342 core: &Core,
1343 intern: InternFn<Vec<(K, V)>>,
1344 opts: ReactiveMapOptions<K, V>,
1345 ) -> Result<Self, MapConfigError> {
1346 if opts.max_size.is_some() && opts.retention.is_some() {
1347 return Err(MapConfigError(
1348 "max_size (LRU) and retention are mutually exclusive".into(),
1349 ));
1350 }
1351 if let Some(ref r) = opts.retention {
1352 if r.archive_threshold.is_none() && r.max_size.is_none() {
1353 return Err(MapConfigError(
1354 "retention requires at least one of archive_threshold or max_size".into(),
1355 ));
1356 }
1357 }
1358 if let Some(ttl) = opts.default_ttl {
1359 if ttl <= 0.0 {
1360 return Err(MapConfigError("default_ttl must be > 0".into()));
1361 }
1362 }
1363 let node_id = core
1364 .register_state(HandleId::new(0), false)
1365 .expect("register_state for ReactiveMap");
1366 let backend: Box<dyn MapBackend<K, V>> = opts
1367 .backend
1368 .unwrap_or_else(|| Box::new(HashMapBackend::new()));
1369 let mutation_log = if opts.mutation_log {
1370 Some(Vec::new())
1371 } else {
1372 None
1373 };
1374 let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
1375 let inner = MapInner {
1376 backend,
1377 mutation_log,
1378 structure_name: opts.name,
1379 ttl_expiry: HashMap::new(),
1380 default_ttl_ns,
1381 lru_order: Vec::new(),
1382 lru_max_size: opts.max_size,
1383 retention: opts.retention,
1384 };
1385 Ok(Self {
1386 inner: Mutex::new(inner),
1387 emitter: EmitHandle {
1388 node_id,
1389 intern,
1390 version: AtomicU64::new(0),
1391 },
1392 node_id,
1393 })
1394 }
1395
1396 #[must_use]
1397 pub fn size(&self) -> usize {
1398 self.inner.lock().backend.size()
1399 }
1400
1401 pub fn has(&self, core: &Core, key: &K) -> bool {
1404 let (has, expired) = {
1405 let mut inner = self.inner.lock();
1406 let mut target_expired = false;
1407 if inner.default_ttl_ns.is_some() {
1409 if let Some(&exp) = inner.ttl_expiry.get(key) {
1410 if monotonic_ns() >= exp {
1411 target_expired = true;
1412 }
1413 }
1414 }
1415 let mut expired = inner.prune_expired_inner();
1417 if target_expired && !expired.iter().any(|(k, _)| k == key) {
1418 if let Some(prev) = inner.backend.get(key) {
1421 inner.backend.delete(key);
1422 inner.ttl_expiry.remove(key);
1423 inner.lru_remove(key);
1424 expired.push((key.clone(), prev));
1425 }
1426 }
1427 let has = if target_expired {
1428 false
1429 } else {
1430 let h = inner.backend.has(key);
1431 if h {
1432 inner.lru_touch(key);
1433 }
1434 h
1435 };
1436 (has, expired)
1437 };
1438 if !expired.is_empty() {
1439 let snapshot = self.inner.lock().backend.to_vec();
1440 let version = self.emitter.emit(core, snapshot);
1441 let mut inner = self.inner.lock();
1442 for (k, prev) in expired {
1443 inner.record(
1444 MapChange::Delete {
1445 key: k,
1446 previous: prev,
1447 reason: DeleteReason::Expired,
1448 },
1449 version.clone(),
1450 );
1451 }
1452 }
1453 has
1454 }
1455
1456 pub fn get(&self, core: &Core, key: &K) -> Option<V> {
1459 let (value, expired) = {
1460 let mut inner = self.inner.lock();
1461 let mut target_expired = false;
1462 if inner.default_ttl_ns.is_some() {
1463 if let Some(&exp) = inner.ttl_expiry.get(key) {
1464 if monotonic_ns() >= exp {
1465 target_expired = true;
1466 }
1467 }
1468 }
1469 let mut expired = inner.prune_expired_inner();
1471 if target_expired && !expired.iter().any(|(k, _)| k == key) {
1472 if let Some(prev) = inner.backend.get(key) {
1474 inner.backend.delete(key);
1475 inner.ttl_expiry.remove(key);
1476 inner.lru_remove(key);
1477 expired.push((key.clone(), prev));
1478 }
1479 }
1480 let value = if target_expired {
1481 None
1482 } else {
1483 let v = inner.backend.get(key);
1484 if v.is_some() {
1485 inner.lru_touch(key);
1486 }
1487 v
1488 };
1489 (value, expired)
1490 };
1491 if !expired.is_empty() {
1492 let snapshot = self.inner.lock().backend.to_vec();
1493 let version = self.emitter.emit(core, snapshot);
1494 let mut inner = self.inner.lock();
1495 for (k, prev) in expired {
1496 inner.record(
1497 MapChange::Delete {
1498 key: k,
1499 previous: prev,
1500 reason: DeleteReason::Expired,
1501 },
1502 version.clone(),
1503 );
1504 }
1505 }
1506 value
1507 }
1508
1509 pub fn set(&self, core: &Core, key: K, value: V) {
1510 self.set_with_ttl(core, key, value, None);
1511 }
1512
1513 pub fn set_with_ttl(&self, core: &Core, key: K, value: V, ttl: Option<f64>) {
1518 if let Some(t) = ttl {
1519 assert!(
1520 t > 0.0 && t.is_finite(),
1521 "per-call ttl must be positive and finite"
1522 );
1523 }
1524 let (snapshot, change, eviction_changes) = {
1525 let mut inner = self.inner.lock();
1526 let expired = inner.prune_expired_inner();
1527 let change = inner.mutation_log.is_some().then(|| MapChange::Set {
1528 key: key.clone(),
1529 value: value.clone(),
1530 });
1531 inner.set_ttl_with(&key, ttl);
1532 inner.lru_remove(&key);
1533 if inner.lru_max_size.is_some() {
1534 inner.lru_order.push(key.clone());
1535 }
1536 inner.backend.set(key, value);
1537 let evicted = inner.lru_evict();
1538 let archived = inner.apply_retention_inner();
1539 let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1540 for (k, prev) in expired {
1541 eviction_changes.push((k, prev, DeleteReason::Expired));
1542 }
1543 for (k, prev) in evicted {
1544 eviction_changes.push((k, prev, DeleteReason::LruEvict));
1545 }
1546 for (k, v, s) in &archived {
1547 if let Some(on_archive) =
1548 &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1549 {
1550 on_archive(k, v, *s);
1551 }
1552 eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1553 }
1554 (inner.backend.to_vec(), change, eviction_changes)
1555 };
1556 let version = self.emitter.emit(core, snapshot);
1557 if change.is_some() || !eviction_changes.is_empty() {
1558 let mut inner = self.inner.lock();
1559 if let Some(change) = change {
1560 inner.record(change, version.clone());
1561 }
1562 for (k, prev, reason) in eviction_changes {
1563 inner.record(
1564 MapChange::Delete {
1565 key: k,
1566 previous: prev,
1567 reason,
1568 },
1569 version.clone(),
1570 );
1571 }
1572 }
1573 }
1574
1575 pub fn set_many(&self, core: &Core, entries: Vec<(K, V)>) {
1576 self.set_many_with_ttl(core, entries, None);
1577 }
1578
1579 pub fn set_many_with_ttl(&self, core: &Core, entries: Vec<(K, V)>, ttl: Option<f64>) {
1584 if let Some(t) = ttl {
1585 assert!(
1586 t > 0.0 && t.is_finite(),
1587 "per-call ttl must be positive and finite"
1588 );
1589 }
1590 if entries.is_empty() {
1591 return;
1592 }
1593 let (snapshot, changes, eviction_changes) = {
1594 let mut inner = self.inner.lock();
1595 let expired = inner.prune_expired_inner();
1596 let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1597 entries
1598 .iter()
1599 .map(|(k, v)| MapChange::Set {
1600 key: k.clone(),
1601 value: v.clone(),
1602 })
1603 .collect()
1604 });
1605 for (k, _) in &entries {
1606 inner.set_ttl_with(k, ttl);
1607 inner.lru_remove(k);
1608 if inner.lru_max_size.is_some() {
1609 inner.lru_order.push(k.clone());
1610 }
1611 }
1612 inner.backend.set_many(entries);
1613 let evicted = inner.lru_evict();
1614 let archived = inner.apply_retention_inner();
1615 let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1616 for (k, prev) in expired {
1617 eviction_changes.push((k, prev, DeleteReason::Expired));
1618 }
1619 for (k, prev) in evicted {
1620 eviction_changes.push((k, prev, DeleteReason::LruEvict));
1621 }
1622 for (k, v, s) in &archived {
1623 if let Some(on_archive) =
1624 &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1625 {
1626 on_archive(k, v, *s);
1627 }
1628 eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1629 }
1630 (inner.backend.to_vec(), changes, eviction_changes)
1631 };
1632 let version = self.emitter.emit(core, snapshot);
1633 if changes.is_some() || !eviction_changes.is_empty() {
1634 let mut inner = self.inner.lock();
1635 if let Some(changes) = changes {
1636 for change in changes {
1637 inner.record(change, version.clone());
1638 }
1639 }
1640 for (k, prev, reason) in eviction_changes {
1641 inner.record(
1642 MapChange::Delete {
1643 key: k,
1644 previous: prev,
1645 reason,
1646 },
1647 version.clone(),
1648 );
1649 }
1650 }
1651 }
1652
1653 pub fn delete(&self, core: &Core, key: &K) {
1654 let (snapshot, previous) = {
1655 let mut inner = self.inner.lock();
1656 let previous = inner.backend.get(key);
1657 if !inner.backend.delete(key) {
1658 return;
1659 }
1660 inner.ttl_expiry.remove(key);
1661 inner.lru_remove(key);
1662 (inner.backend.to_vec(), previous)
1663 };
1664 let version = self.emitter.emit(core, snapshot);
1665 if let Some(prev) = previous {
1666 self.inner.lock().record(
1667 MapChange::Delete {
1668 key: key.clone(),
1669 previous: prev,
1670 reason: DeleteReason::Explicit,
1671 },
1672 version,
1673 );
1674 }
1675 }
1676
1677 pub fn delete_many(&self, core: &Core, keys: &[K]) {
1678 let (snapshot, actually_deleted) = {
1679 let mut inner = self.inner.lock();
1680 let actually_deleted: Vec<(K, V)> = keys
1681 .iter()
1682 .filter_map(|k| inner.backend.get(k).map(|v| (k.clone(), v)))
1683 .collect();
1684 let removed = inner.backend.delete_many(keys);
1685 if removed == 0 {
1686 return;
1687 }
1688 for k in keys {
1689 inner.ttl_expiry.remove(k);
1690 inner.lru_remove(k);
1691 }
1692 (inner.backend.to_vec(), actually_deleted)
1693 };
1694 let version = self.emitter.emit(core, snapshot);
1695 if !actually_deleted.is_empty() {
1696 let mut inner = self.inner.lock();
1697 for (k, prev) in actually_deleted {
1698 inner.record(
1699 MapChange::Delete {
1700 key: k,
1701 previous: prev,
1702 reason: DeleteReason::Explicit,
1703 },
1704 version.clone(),
1705 );
1706 }
1707 }
1708 }
1709
1710 pub fn clear(&self, core: &Core) {
1711 let (snapshot, count) = {
1712 let mut inner = self.inner.lock();
1713 let count = inner.backend.clear();
1714 if count == 0 {
1715 return;
1716 }
1717 inner.ttl_expiry.clear();
1718 inner.lru_order.clear();
1719 (inner.backend.to_vec(), count)
1720 };
1721 let version = self.emitter.emit(core, snapshot);
1722 self.inner
1723 .lock()
1724 .record(MapChange::Clear { count }, version);
1725 }
1726
1727 pub fn prune_expired(&self, core: &Core) -> usize {
1729 let expired = {
1730 let mut inner = self.inner.lock();
1731 inner.prune_expired_inner()
1732 };
1733 if expired.is_empty() {
1734 return 0;
1735 }
1736 let count = expired.len();
1737 let snapshot = self.inner.lock().backend.to_vec();
1738 let version = self.emitter.emit(core, snapshot);
1739 let mut inner = self.inner.lock();
1740 for (k, prev) in expired {
1741 inner.record(
1742 MapChange::Delete {
1743 key: k,
1744 previous: prev,
1745 reason: DeleteReason::Expired,
1746 },
1747 version.clone(),
1748 );
1749 }
1750 count
1751 }
1752
1753 #[must_use]
1754 pub fn to_vec(&self) -> Vec<(K, V)> {
1755 self.inner.lock().backend.to_vec()
1756 }
1757
1758 #[must_use]
1759 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
1760 self.inner.lock().mutation_log.clone()
1761 }
1762}
1763
1764pub struct ReactiveIndex<K, V>
1772where
1773 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1774 V: Clone + Send + Sync + 'static,
1775{
1776 inner: Mutex<IndexInner<K, V>>,
1777 emitter: EmitHandle<Vec<IndexRow<K, V>>>,
1778 pub node_id: NodeId,
1779}
1780
1781pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
1783
1784struct IndexInner<K, V>
1785where
1786 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1787 V: Clone + Send + Sync + 'static,
1788{
1789 backend: Box<dyn IndexBackend<K, V>>,
1790 mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
1791 structure_name: String,
1792 equals: Option<IndexEqualsFn<K, V>>,
1797}
1798
1799impl<K, V> IndexInner<K, V>
1800where
1801 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1802 V: Clone + Send + Sync + 'static,
1803{
1804 fn record(&mut self, change: IndexChange<K, V>, version: Version) {
1805 if let Some(log) = &mut self.mutation_log {
1806 log.push(BaseChange {
1807 structure: self.structure_name.clone(),
1808 version,
1809 t_ns: wall_clock_ns(),
1810 seq: None,
1811 lifecycle: Lifecycle::Data,
1812 change,
1813 });
1814 }
1815 }
1816}
1817
1818pub struct UpsertOptions<K, V>
1820where
1821 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1822 V: Clone + Send + Sync + 'static,
1823{
1824 pub equals: Option<IndexEqualsFn<K, V>>,
1826}
1827
1828impl<K, V> Default for UpsertOptions<K, V>
1829where
1830 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1831 V: Clone + Send + Sync + 'static,
1832{
1833 fn default() -> Self {
1834 Self { equals: None }
1835 }
1836}
1837
1838pub struct ReactiveIndexOptions<K, V>
1840where
1841 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1842 V: Clone + Send + Sync + 'static,
1843{
1844 pub name: String,
1845 pub backend: Option<Box<dyn IndexBackend<K, V>>>,
1846 pub mutation_log: bool,
1847 pub equals: Option<IndexEqualsFn<K, V>>,
1849}
1850
1851impl<K, V> Default for ReactiveIndexOptions<K, V>
1852where
1853 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1854 V: Clone + Send + Sync + 'static,
1855{
1856 fn default() -> Self {
1857 Self {
1858 name: "reactiveIndex".into(),
1859 backend: None,
1860 mutation_log: false,
1861 equals: None,
1862 }
1863 }
1864}
1865
1866impl<K, V> ReactiveIndex<K, V>
1867where
1868 K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1869 V: Clone + Send + Sync + 'static,
1870{
1871 #[must_use]
1872 pub fn new(
1873 core: &Core,
1874 intern: InternFn<Vec<IndexRow<K, V>>>,
1875 opts: ReactiveIndexOptions<K, V>,
1876 ) -> Self {
1877 let node_id = core
1878 .register_state(HandleId::new(0), false)
1879 .expect("register_state for ReactiveIndex");
1880 let backend: Box<dyn IndexBackend<K, V>> = opts
1881 .backend
1882 .unwrap_or_else(|| Box::new(VecIndexBackend::new()));
1883 let mutation_log = if opts.mutation_log {
1884 Some(Vec::new())
1885 } else {
1886 None
1887 };
1888 let inner = IndexInner {
1889 backend,
1890 mutation_log,
1891 structure_name: opts.name,
1892 equals: opts.equals,
1893 };
1894 Self {
1895 inner: Mutex::new(inner),
1896 emitter: EmitHandle {
1897 node_id,
1898 intern,
1899 version: AtomicU64::new(0),
1900 },
1901 node_id,
1902 }
1903 }
1904
1905 #[must_use]
1906 pub fn size(&self) -> usize {
1907 self.inner.lock().backend.size()
1908 }
1909
1910 #[must_use]
1911 pub fn has(&self, primary: &K) -> bool {
1912 self.inner.lock().backend.has(primary)
1913 }
1914
1915 #[must_use]
1916 pub fn get(&self, primary: &K) -> Option<V> {
1917 self.inner.lock().backend.get(primary)
1918 }
1919
1920 pub fn upsert(&self, core: &Core, primary: K, secondary: String, value: V) -> bool {
1923 self.upsert_with(core, primary, secondary, value, &UpsertOptions::default())
1924 }
1925
1926 pub fn upsert_with(
1931 &self,
1932 core: &Core,
1933 primary: K,
1934 secondary: String,
1935 value: V,
1936 opts: &UpsertOptions<K, V>,
1937 ) -> bool {
1938 let (is_new, snapshot, change) = {
1939 let mut inner = self.inner.lock();
1940 let eq_fn = opts.equals.as_ref().or(inner.equals.as_ref());
1942 if let Some(eq) = eq_fn {
1943 if let Some(existing_row) = inner.backend.get_row(&primary) {
1944 let proposed = IndexRow {
1945 primary: primary.clone(),
1946 secondary: secondary.clone(),
1947 value: value.clone(),
1948 };
1949 if eq(&existing_row, &proposed) {
1950 return false;
1951 }
1952 }
1953 }
1954 let change = inner.mutation_log.is_some().then(|| IndexChange::Upsert {
1955 primary: primary.clone(),
1956 secondary: secondary.clone(),
1957 value: value.clone(),
1958 });
1959 let is_new = inner.backend.upsert(primary, secondary, value);
1960 (is_new, inner.backend.to_ordered(), change)
1961 };
1962 let version = self.emitter.emit(core, snapshot);
1963 if let Some(change) = change {
1964 self.inner.lock().record(change, version);
1965 }
1966 is_new
1967 }
1968
1969 pub fn upsert_many(&self, core: &Core, rows: Vec<(K, String, V)>) {
1972 if rows.is_empty() {
1973 return;
1974 }
1975 let (snapshot, changes) = {
1976 let mut inner = self.inner.lock();
1977 let effective_rows: Vec<(K, String, V)> = if let Some(eq) = &inner.equals {
1979 rows.into_iter()
1980 .filter(|(pk, sec, val)| {
1981 if let Some(existing) = inner.backend.get_row(pk) {
1982 let proposed = IndexRow {
1983 primary: pk.clone(),
1984 secondary: sec.clone(),
1985 value: val.clone(),
1986 };
1987 !eq(&existing, &proposed)
1988 } else {
1989 true
1990 }
1991 })
1992 .collect()
1993 } else {
1994 rows
1995 };
1996 if effective_rows.is_empty() {
1997 return;
1998 }
1999 let changes: Option<Vec<IndexChange<K, V>>> = inner.mutation_log.is_some().then(|| {
2000 effective_rows
2001 .iter()
2002 .map(|(k, s, v)| IndexChange::Upsert {
2003 primary: k.clone(),
2004 secondary: s.clone(),
2005 value: v.clone(),
2006 })
2007 .collect()
2008 });
2009 inner.backend.upsert_many(effective_rows);
2010 (inner.backend.to_ordered(), changes)
2011 };
2012 let version = self.emitter.emit(core, snapshot);
2013 if let Some(changes) = changes {
2014 let mut inner = self.inner.lock();
2015 for change in changes {
2016 inner.record(change, version.clone());
2017 }
2018 }
2019 }
2020
2021 pub fn delete(&self, core: &Core, primary: &K) {
2022 let snapshot = {
2023 let mut inner = self.inner.lock();
2024 if !inner.backend.delete(primary) {
2025 return;
2026 }
2027 inner.backend.to_ordered()
2028 };
2029 let version = self.emitter.emit(core, snapshot);
2030 self.inner.lock().record(
2031 IndexChange::Delete {
2032 primary: primary.clone(),
2033 },
2034 version,
2035 );
2036 }
2037
2038 pub fn delete_many(&self, core: &Core, primaries: &[K]) {
2039 let (snapshot, actually_deleted) = {
2040 let mut inner = self.inner.lock();
2041 let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
2043 primaries
2044 .iter()
2045 .filter(|k| inner.backend.has(k))
2046 .cloned()
2047 .collect()
2048 } else {
2049 vec![]
2050 };
2051 let removed = inner.backend.delete_many(primaries);
2052 if removed == 0 {
2053 return;
2054 }
2055 (inner.backend.to_ordered(), actually_deleted)
2056 };
2057 let version = self.emitter.emit(core, snapshot);
2058 if !actually_deleted.is_empty() {
2059 self.inner.lock().record(
2060 IndexChange::DeleteMany {
2061 primaries: actually_deleted,
2062 },
2063 version,
2064 );
2065 }
2066 }
2067
2068 pub fn clear(&self, core: &Core) {
2069 let (snapshot, count) = {
2070 let mut inner = self.inner.lock();
2071 let count = inner.backend.clear();
2072 if count == 0 {
2073 return;
2074 }
2075 (inner.backend.to_ordered(), count)
2076 };
2077 let version = self.emitter.emit(core, snapshot);
2078 self.inner
2079 .lock()
2080 .record(IndexChange::Clear { count }, version);
2081 }
2082
2083 #[must_use]
2084 pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
2085 self.inner.lock().backend.to_ordered()
2086 }
2087
2088 #[must_use]
2089 pub fn to_primary_map(&self) -> Vec<(K, V)> {
2090 self.inner.lock().backend.to_primary_map()
2091 }
2092
2093 #[must_use]
2098 pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
2099 where
2100 K: Ord,
2101 {
2102 let mut rows: Vec<(K, V)> = self
2103 .inner
2104 .lock()
2105 .backend
2106 .to_primary_map()
2107 .into_iter()
2108 .filter(|(k, _)| k >= start && k < end)
2109 .collect();
2110 rows.sort_by(|a, b| a.0.cmp(&b.0));
2111 rows.into_iter().map(|(_, v)| v).collect()
2112 }
2113
2114 #[must_use]
2115 pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
2116 self.inner.lock().mutation_log.clone()
2117 }
2118}