Skip to main content

graphrefly_structures/
reactive.rs

1//! Reactive data structures — Core-level integration (M5.A — D177/D179).
2//!
3//! Each structure wraps a backend trait behind a `parking_lot::Mutex`, owns a
4//! state `NodeId` registered with Core, and emits DIRTY->DATA snapshots on
5//! every mutation. Structures are standalone — no Graph dependency required.
6//!
7//! **Lock discipline (P1 fix):** Mutation methods hold the inner lock only
8//! during backend mutation and snapshot collection. The lock is dropped before
9//! `Core::emit()` is called, so subscribers are free to read (but not mutate)
10//! the structure during message delivery. The `EmitHandle` lives outside the
11//! inner `Mutex` and uses `AtomicU64` for thread-safe version counting.
12
13use std::collections::HashMap;
14use std::hash::Hash;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use parking_lot::Mutex;
19
20use graphrefly_core::{
21    monotonic_ns, wall_clock_ns, Core, HandleId, Message, NodeId, Subscription, WeakCore,
22};
23
24use crate::backend::{
25    HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
26    VecListBackend, VecLogBackend,
27};
28use crate::changeset::{
29    BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
30};
31
32// ---------------------------------------------------------------------------
33// Intern function type — user-supplied value->handle conversion
34// ---------------------------------------------------------------------------
35
36/// User-supplied function that converts a snapshot value `Vec<T>` (or similar)
37/// into a `HandleId` for Core emission. The binding owns the value registry;
38/// this closure captures `Arc<Binding>` and calls `binding.intern(snapshot)`.
39pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
40
41// ---------------------------------------------------------------------------
42// EmitHandle — outside Mutex, thread-safe emission
43// ---------------------------------------------------------------------------
44
45/// Emission handle stored outside the inner Mutex. Uses `AtomicU64` for the
46/// version counter so no lock is needed during `Core::emit()`.
47struct EmitHandle<S> {
48    core: WeakCore,
49    node_id: NodeId,
50    intern: InternFn<S>,
51    version: AtomicU64,
52}
53
54impl<S> EmitHandle<S> {
55    /// Bump version, intern snapshot, emit via Core. Returns the new version.
56    fn emit(&self, snapshot: S) -> Version {
57        let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
58        let handle = (self.intern)(snapshot);
59        if let Some(core) = self.core.upgrade() {
60            core.emit(self.node_id, handle);
61        }
62        Version::Counter(ver)
63    }
64}
65
66// ---------------------------------------------------------------------------
67// IndexOutOfBounds error
68// ---------------------------------------------------------------------------
69
70/// Error returned when a list `insert` or `insert_many` index is out of bounds.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub struct IndexOutOfBounds;
73
74impl std::fmt::Display for IndexOutOfBounds {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.write_str("index out of bounds")
77    }
78}
79
80impl std::error::Error for IndexOutOfBounds {}
81
82// ---------------------------------------------------------------------------
83// ReactiveLog
84// ---------------------------------------------------------------------------
85
86/// Reactive append-only log with optional ring-buffer cap.
87///
88/// Emits `Vec<T>` snapshots via a Core state node on every mutation.
89/// Optionally records `BaseChange<LogChange<T>>` deltas to a companion
90/// mutation log.
91pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
92    inner: Arc<Mutex<LogInner<T>>>,
93    emitter: EmitHandle<Vec<T>>,
94    /// The Core state node backing this structure.
95    pub node_id: NodeId,
96}
97
98struct LogInner<T: Clone + Send + Sync + 'static> {
99    backend: Box<dyn LogBackend<T>>,
100    mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
101    structure_name: String,
102}
103
104impl<T: Clone + Send + Sync + 'static> LogInner<T> {
105    fn record(&mut self, change: LogChange<T>, version: Version) {
106        if let Some(log) = &mut self.mutation_log {
107            log.push(BaseChange {
108                structure: self.structure_name.clone(),
109                version,
110                t_ns: wall_clock_ns(),
111                seq: None,
112                lifecycle: Lifecycle::Data,
113                change,
114            });
115        }
116    }
117}
118
119/// Options for constructing a [`ReactiveLog`].
120pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
121    pub name: String,
122    pub max_size: Option<usize>,
123    pub backend: Option<Box<dyn LogBackend<T>>>,
124    pub mutation_log: bool,
125}
126
127impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
128    fn default() -> Self {
129        Self {
130            name: "reactiveLog".into(),
131            max_size: None,
132            backend: None,
133            mutation_log: false,
134        }
135    }
136}
137
138impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
139    /// Create a new reactive log registered with the given Core.
140    ///
141    /// `intern` converts a `Vec<T>` snapshot to a `HandleId` for Core emission.
142    #[must_use]
143    pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
144        let node_id = core
145            .register_state(HandleId::new(0), false)
146            .expect("register_state for ReactiveLog");
147        let backend: Box<dyn LogBackend<T>> = opts
148            .backend
149            .unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
150        let mutation_log = if opts.mutation_log {
151            Some(Vec::new())
152        } else {
153            None
154        };
155        let inner = LogInner {
156            backend,
157            mutation_log,
158            structure_name: opts.name,
159        };
160        Self {
161            inner: Arc::new(Mutex::new(inner)),
162            emitter: EmitHandle {
163                core: core.weak_handle(),
164                node_id,
165                intern,
166                version: AtomicU64::new(0),
167            },
168            node_id,
169        }
170    }
171
172    #[must_use]
173    pub fn size(&self) -> usize {
174        self.inner.lock().backend.size()
175    }
176
177    #[must_use]
178    pub fn at(&self, index: i64) -> Option<T> {
179        self.inner.lock().backend.at(index)
180    }
181
182    pub fn append(&self, value: T) {
183        let (snapshot, change) = {
184            let mut inner = self.inner.lock();
185            let change = inner.mutation_log.is_some().then(|| LogChange::Append {
186                value: value.clone(),
187            });
188            inner.backend.append(value);
189            (inner.backend.to_vec(), change)
190        };
191        let version = self.emitter.emit(snapshot);
192        if let Some(change) = change {
193            self.inner.lock().record(change, version);
194        }
195    }
196
197    pub fn append_many(&self, values: Vec<T>) {
198        if values.is_empty() {
199            return;
200        }
201        let (snapshot, change) = {
202            let mut inner = self.inner.lock();
203            let change = inner.mutation_log.is_some().then(|| LogChange::AppendMany {
204                values: values.clone(),
205            });
206            inner.backend.append_many(values);
207            (inner.backend.to_vec(), change)
208        };
209        let version = self.emitter.emit(snapshot);
210        if let Some(change) = change {
211            self.inner.lock().record(change, version);
212        }
213    }
214
215    pub fn clear(&self) {
216        let (snapshot, count) = {
217            let mut inner = self.inner.lock();
218            let count = inner.backend.clear();
219            if count == 0 {
220                return;
221            }
222            (inner.backend.to_vec(), count)
223        };
224        let version = self.emitter.emit(snapshot);
225        self.inner
226            .lock()
227            .record(LogChange::Clear { count }, version);
228    }
229
230    pub fn trim_head(&self, n: usize) {
231        if n == 0 {
232            return;
233        }
234        let (snapshot, actual) = {
235            let mut inner = self.inner.lock();
236            let actual = inner.backend.trim_head(n);
237            if actual == 0 {
238                return;
239            }
240            (inner.backend.to_vec(), actual)
241        };
242        let version = self.emitter.emit(snapshot);
243        self.inner
244            .lock()
245            .record(LogChange::TrimHead { n: actual }, version);
246    }
247
248    #[must_use]
249    pub fn to_vec(&self) -> Vec<T> {
250        self.inner.lock().backend.to_vec()
251    }
252
253    /// Snapshot the mutation log (if enabled). Returns `None` if mutation log
254    /// was not enabled at construction.
255    #[must_use]
256    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
257        self.inner.lock().mutation_log.clone()
258    }
259}
260
261// ---------------------------------------------------------------------------
262// ReactiveLog — views, scan, attach, attachStorage (M5.B)
263// ---------------------------------------------------------------------------
264
265/// View specification for [`ReactiveLog::view`].
266pub enum ViewSpec {
267    /// Last `n` entries.
268    Tail { n: usize },
269    /// Range `[start..stop)`. `stop` defaults to log length.
270    Slice { start: usize, stop: Option<usize> },
271    /// Entries from cursor position onward. The cursor node provides a `usize`
272    /// position via `read_cursor(handle) -> usize`.
273    FromCursor {
274        cursor_node: NodeId,
275        read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
276    },
277}
278
279/// Handle to a log view. Dropping disposes the view subscription.
280pub struct LogView {
281    /// The state node backing this view. Subscribe to receive view snapshots.
282    pub node_id: NodeId,
283    _subscriptions: Vec<Subscription>,
284}
285
286/// Handle to a log scan. Dropping disposes the scan subscription.
287pub struct ScanHandle {
288    /// The state node backing this scan. Subscribe to receive accumulator snapshots.
289    pub node_id: NodeId,
290    _subscription: Subscription,
291}
292
293/// Trait for append-log storage sinks used by [`ReactiveLog::attach_storage`].
294///
295/// Implementors receive batches of entries and optionally support pre-loading.
296/// Methods return `Result` — errors are swallowed by `attach_storage` (matches
297/// TS try/catch semantics) so a failing tier doesn't break the reactive graph.
298pub trait AppendLogSink<T>: Send + Sync {
299    /// Append entries to persistent storage.
300    fn append_entries(&self, entries: &[T])
301        -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
302    /// Load previously stored entries (for pre-loading on startup).
303    /// Returns an empty vec if no entries are stored.
304    fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
305}
306
307/// Handle to an attach-storage subscription. Dropping disposes the subscription.
308pub struct AttachStorageHandle {
309    _subscription: Subscription,
310}
311
312impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
313    /// Create a reactive view of this log. Returns a [`LogView`] whose
314    /// `node_id` emits `Vec<T>` snapshots on every log mutation.
315    ///
316    /// `intern` converts the view `Vec<T>` into a `HandleId` for Core emission.
317    #[allow(clippy::too_many_lines)]
318    pub fn view(&self, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
319        let core = self.emitter.core.upgrade().expect("Core dropped");
320        let view_node = core
321            .register_state(HandleId::new(0), false)
322            .expect("register_state for LogView");
323        let view_emitter = Arc::new(EmitHandle {
324            core: self.emitter.core.clone(),
325            node_id: view_node,
326            intern,
327            version: AtomicU64::new(0),
328        });
329
330        let inner = Arc::clone(&self.inner);
331        let mut subscriptions = Vec::new();
332
333        match spec {
334            ViewSpec::Tail { n } => {
335                let inner_c = Arc::clone(&inner);
336                let emitter_c = Arc::clone(&view_emitter);
337                let sub = core.subscribe(
338                    self.node_id,
339                    Arc::new(move |msgs| {
340                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
341                            let guard = inner_c.lock();
342                            let data = guard.backend.to_vec();
343                            let start = data.len().saturating_sub(n);
344                            let view = data[start..].to_vec();
345                            drop(guard);
346                            emitter_c.emit(view);
347                        }
348                    }),
349                );
350                subscriptions.push(sub);
351            }
352            ViewSpec::Slice { start, stop } => {
353                let inner_c = Arc::clone(&inner);
354                let emitter_c = Arc::clone(&view_emitter);
355                let sub = core.subscribe(
356                    self.node_id,
357                    Arc::new(move |msgs| {
358                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
359                            let guard = inner_c.lock();
360                            let data = guard.backend.to_vec();
361                            let end = stop.unwrap_or(data.len()).min(data.len());
362                            let s = start.min(end);
363                            let view = data[s..end].to_vec();
364                            drop(guard);
365                            emitter_c.emit(view);
366                        }
367                    }),
368                );
369                subscriptions.push(sub);
370            }
371            ViewSpec::FromCursor {
372                cursor_node,
373                read_cursor,
374            } => {
375                let cursor_pos = Arc::new(Mutex::new(0usize));
376
377                // Cursor subscription: update position and recompute.
378                let cursor_pos_c = Arc::clone(&cursor_pos);
379                let inner_c = Arc::clone(&inner);
380                let emitter_c = Arc::clone(&view_emitter);
381                let read_cursor_c = Arc::clone(&read_cursor);
382                let sub_cursor = core.subscribe(
383                    cursor_node,
384                    Arc::new(move |msgs| {
385                        for m in msgs {
386                            if let Message::Data(h) = m {
387                                let pos = read_cursor_c(*h);
388                                *cursor_pos_c.lock() = pos;
389                                let guard = inner_c.lock();
390                                let data = guard.backend.to_vec();
391                                let s = pos.min(data.len());
392                                let view = data[s..].to_vec();
393                                drop(guard);
394                                emitter_c.emit(view);
395                            }
396                        }
397                    }),
398                );
399                subscriptions.push(sub_cursor);
400
401                // Log subscription: recompute with current cursor.
402                let cursor_pos_c2 = Arc::clone(&cursor_pos);
403                let inner_c2 = Arc::clone(&inner);
404                let emitter_c2 = view_emitter;
405                let sub_log = core.subscribe(
406                    self.node_id,
407                    Arc::new(move |msgs| {
408                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
409                            let pos = *cursor_pos_c2.lock();
410                            let guard = inner_c2.lock();
411                            let data = guard.backend.to_vec();
412                            let s = pos.min(data.len());
413                            let view = data[s..].to_vec();
414                            drop(guard);
415                            emitter_c2.emit(view);
416                        }
417                    }),
418                );
419                subscriptions.push(sub_log);
420            }
421        }
422
423        LogView {
424            node_id: view_node,
425            _subscriptions: subscriptions,
426        }
427    }
428
429    /// Incremental running aggregate over the log.
430    ///
431    /// Returns a [`ScanHandle`] whose `node_id` emits the current accumulator
432    /// on every log mutation. Appends are O(1) (only new entries processed);
433    /// `trimHead`/`clear` trigger a full rescan.
434    pub fn scan<TAcc: Clone + Send + Sync + 'static>(
435        &self,
436        initial: TAcc,
437        step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
438        intern: InternFn<TAcc>,
439    ) -> ScanHandle {
440        struct ScanState<T, TAcc> {
441            acc: TAcc,
442            processed: usize,
443            initial: TAcc,
444            step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
445        }
446
447        let core = self.emitter.core.upgrade().expect("Core dropped");
448        let scan_node = core
449            .register_state(HandleId::new(0), false)
450            .expect("register_state for Scan");
451
452        let state = Arc::new(Mutex::new(ScanState {
453            acc: initial.clone(),
454            processed: 0,
455            initial,
456            step,
457        }));
458        let inner = Arc::clone(&self.inner);
459        let scan_emitter = Arc::new(EmitHandle {
460            core: self.emitter.core.clone(),
461            node_id: scan_node,
462            intern,
463            version: AtomicU64::new(0),
464        });
465
466        let sub = core.subscribe(
467            self.node_id,
468            Arc::new(move |msgs| {
469                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
470                    let mut ss = state.lock();
471                    let guard = inner.lock();
472                    let data = guard.backend.to_vec();
473                    drop(guard);
474
475                    if data.len() < ss.processed {
476                        // Length decreased (clear/trim) — full rescan.
477                        ss.acc = ss.initial.clone();
478                        ss.processed = 0;
479                    }
480                    for item in &data[ss.processed..] {
481                        ss.acc = (ss.step)(&ss.acc, item);
482                    }
483                    ss.processed = data.len();
484                    let acc = ss.acc.clone();
485                    drop(ss);
486                    scan_emitter.emit(acc);
487                }
488            }),
489        );
490
491        ScanHandle {
492            node_id: scan_node,
493            _subscription: sub,
494        }
495    }
496
497    /// Subscribe to an upstream node and append every DATA value into this log.
498    ///
499    /// `read_value` converts the upstream `HandleId` to a `T` for appending.
500    /// Returns a `Subscription` — dropping it stops the attachment.
501    pub fn attach(
502        &self,
503        upstream: NodeId,
504        read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
505    ) -> Subscription {
506        let core = self.emitter.core.upgrade().expect("Core dropped");
507        let inner = Arc::clone(&self.inner);
508        let weak_core = self.emitter.core.clone();
509        let node_id = self.node_id;
510        let intern = Arc::clone(&self.emitter.intern);
511
512        core.subscribe(
513            upstream,
514            Arc::new(move |msgs| {
515                for m in msgs {
516                    if let Message::Data(h) = m {
517                        let value = read_value(*h);
518                        let snapshot = {
519                            let mut guard = inner.lock();
520                            guard.backend.append(value);
521                            guard.backend.to_vec()
522                        };
523                        let handle = (intern)(snapshot);
524                        if let Some(c) = weak_core.upgrade() {
525                            c.emit(node_id, handle);
526                        }
527                    }
528                }
529            }),
530        )
531    }
532
533    /// Wire append-log storage sinks to this log.
534    ///
535    /// If `preload` is true, attempts `load_entries()` on the first sink that
536    /// returns data and appends to this log before subscribing. After
537    /// subscription, deltas are forwarded to ALL sinks. On trim/clear, the
538    /// full snapshot is re-shipped. Sink errors are swallowed (logged via
539    /// `eprintln!`) so a failing tier doesn't break the reactive graph.
540    pub fn attach_storage(
541        &self,
542        sinks: Vec<Arc<dyn AppendLogSink<T>>>,
543        preload: bool,
544    ) -> AttachStorageHandle {
545        if sinks.is_empty() {
546            let core = self.emitter.core.upgrade().expect("Core dropped");
547            let sub = core.subscribe(self.node_id, Arc::new(|_| {}));
548            return AttachStorageHandle { _subscription: sub };
549        }
550
551        if preload {
552            for sink in &sinks {
553                if let Ok(entries) = sink.load_entries() {
554                    if !entries.is_empty() {
555                        self.append_many(entries);
556                        break;
557                    }
558                }
559            }
560        }
561
562        let current_size = self.size();
563        // All sinks start delivered at current_size (post-preload snapshot).
564        // The subscriber only sees future emissions, so no double-write.
565        let delivered: Vec<Arc<Mutex<usize>>> = sinks
566            .iter()
567            .map(|_| Arc::new(Mutex::new(current_size)))
568            .collect();
569
570        let core = self.emitter.core.upgrade().expect("Core dropped");
571        let inner = Arc::clone(&self.inner);
572        let sinks_arc = sinks;
573        let delivered_arc = delivered;
574
575        let sub = core.subscribe(
576            self.node_id,
577            Arc::new(move |msgs| {
578                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
579                    let guard = inner.lock();
580                    let data = guard.backend.to_vec();
581                    drop(guard);
582
583                    for (i, sink) in sinks_arc.iter().enumerate() {
584                        let mut del = delivered_arc[i].lock();
585                        let result = match data.len().cmp(&*del) {
586                            std::cmp::Ordering::Greater => sink.append_entries(&data[*del..]),
587                            std::cmp::Ordering::Less => sink.append_entries(&data),
588                            std::cmp::Ordering::Equal => continue,
589                        };
590                        match result {
591                            Ok(()) => *del = data.len(),
592                            Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
593                        }
594                    }
595                }
596            }),
597        );
598
599        AttachStorageHandle { _subscription: sub }
600    }
601}
602
603// ---------------------------------------------------------------------------
604// ReactiveList
605// ---------------------------------------------------------------------------
606
607/// Reactive ordered list.
608///
609/// Emits `Vec<T>` snapshots via a Core state node on every mutation.
610pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
611    inner: Mutex<ListInner<T>>,
612    emitter: EmitHandle<Vec<T>>,
613    pub node_id: NodeId,
614}
615
616struct ListInner<T: Clone + Send + Sync + 'static> {
617    backend: Box<dyn ListBackend<T>>,
618    mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
619    structure_name: String,
620}
621
622impl<T: Clone + Send + Sync + 'static> ListInner<T> {
623    fn record(&mut self, change: ListChange<T>, version: Version) {
624        if let Some(log) = &mut self.mutation_log {
625            log.push(BaseChange {
626                structure: self.structure_name.clone(),
627                version,
628                t_ns: wall_clock_ns(),
629                seq: None,
630                lifecycle: Lifecycle::Data,
631                change,
632            });
633        }
634    }
635}
636
637/// Options for constructing a [`ReactiveList`].
638pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
639    pub name: String,
640    pub backend: Option<Box<dyn ListBackend<T>>>,
641    pub mutation_log: bool,
642}
643
644impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
645    fn default() -> Self {
646        Self {
647            name: "reactiveList".into(),
648            backend: None,
649            mutation_log: false,
650        }
651    }
652}
653
654impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
655    #[must_use]
656    pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
657        let node_id = core
658            .register_state(HandleId::new(0), false)
659            .expect("register_state for ReactiveList");
660        let backend: Box<dyn ListBackend<T>> = opts
661            .backend
662            .unwrap_or_else(|| Box::new(VecListBackend::new()));
663        let mutation_log = if opts.mutation_log {
664            Some(Vec::new())
665        } else {
666            None
667        };
668        let inner = ListInner {
669            backend,
670            mutation_log,
671            structure_name: opts.name,
672        };
673        Self {
674            inner: Mutex::new(inner),
675            emitter: EmitHandle {
676                core: core.weak_handle(),
677                node_id,
678                intern,
679                version: AtomicU64::new(0),
680            },
681            node_id,
682        }
683    }
684
685    #[must_use]
686    pub fn size(&self) -> usize {
687        self.inner.lock().backend.size()
688    }
689
690    #[must_use]
691    pub fn at(&self, index: i64) -> Option<T> {
692        self.inner.lock().backend.at(index)
693    }
694
695    pub fn append(&self, value: T) {
696        let (snapshot, change) = {
697            let mut inner = self.inner.lock();
698            let change = inner.mutation_log.is_some().then(|| ListChange::Append {
699                value: value.clone(),
700            });
701            inner.backend.append(value);
702            (inner.backend.to_vec(), change)
703        };
704        let version = self.emitter.emit(snapshot);
705        if let Some(change) = change {
706            self.inner.lock().record(change, version);
707        }
708    }
709
710    pub fn append_many(&self, values: Vec<T>) {
711        if values.is_empty() {
712            return;
713        }
714        let (snapshot, change) = {
715            let mut inner = self.inner.lock();
716            let change = inner
717                .mutation_log
718                .is_some()
719                .then(|| ListChange::AppendMany {
720                    values: values.clone(),
721                });
722            inner.backend.append_many(values);
723            (inner.backend.to_vec(), change)
724        };
725        let version = self.emitter.emit(snapshot);
726        if let Some(change) = change {
727            self.inner.lock().record(change, version);
728        }
729    }
730
731    /// Insert `value` at `index`. Returns `Err(IndexOutOfBounds)` if
732    /// `index > self.size()`.
733    pub fn insert(&self, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
734        let (snapshot, change) = {
735            let mut inner = self.inner.lock();
736            if index > inner.backend.size() {
737                return Err(IndexOutOfBounds);
738            }
739            let change = inner.mutation_log.is_some().then(|| ListChange::Insert {
740                index,
741                value: value.clone(),
742            });
743            inner.backend.insert(index, value);
744            (inner.backend.to_vec(), change)
745        };
746        let version = self.emitter.emit(snapshot);
747        if let Some(change) = change {
748            self.inner.lock().record(change, version);
749        }
750        Ok(())
751    }
752
753    /// Insert `values` starting at `index`. Returns `Err(IndexOutOfBounds)` if
754    /// `index > self.size()`.
755    pub fn insert_many(&self, index: usize, values: Vec<T>) -> Result<(), IndexOutOfBounds> {
756        if values.is_empty() {
757            return Ok(());
758        }
759        let (snapshot, change) = {
760            let mut inner = self.inner.lock();
761            if index > inner.backend.size() {
762                return Err(IndexOutOfBounds);
763            }
764            let change = inner
765                .mutation_log
766                .is_some()
767                .then(|| ListChange::InsertMany {
768                    index,
769                    values: values.clone(),
770                });
771            inner.backend.insert_many(index, values);
772            (inner.backend.to_vec(), change)
773        };
774        let version = self.emitter.emit(snapshot);
775        if let Some(change) = change {
776            self.inner.lock().record(change, version);
777        }
778        Ok(())
779    }
780
781    /// Remove and return element at `index`. Negative indices count from end.
782    /// Returns `None` if index is out of range.
783    pub fn pop(&self, index: i64) -> Option<T> {
784        let (value, snapshot, change) = {
785            let mut inner = self.inner.lock();
786            let value = inner.backend.pop(index)?;
787            let change = inner.mutation_log.is_some().then(|| ListChange::Pop {
788                index,
789                value: value.clone(),
790            });
791            let snapshot = inner.backend.to_vec();
792            (value, snapshot, change)
793        };
794        let version = self.emitter.emit(snapshot);
795        if let Some(change) = change {
796            self.inner.lock().record(change, version);
797        }
798        Some(value)
799    }
800
801    pub fn clear(&self) {
802        let (snapshot, count) = {
803            let mut inner = self.inner.lock();
804            let count = inner.backend.clear();
805            if count == 0 {
806                return;
807            }
808            (inner.backend.to_vec(), count)
809        };
810        let version = self.emitter.emit(snapshot);
811        self.inner
812            .lock()
813            .record(ListChange::Clear { count }, version);
814    }
815
816    #[must_use]
817    pub fn to_vec(&self) -> Vec<T> {
818        self.inner.lock().backend.to_vec()
819    }
820
821    #[must_use]
822    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
823        self.inner.lock().mutation_log.clone()
824    }
825}
826
827// ---------------------------------------------------------------------------
828// ReactiveMap
829// ---------------------------------------------------------------------------
830
831/// Reactive key-value map.
832///
833/// Emits `Vec<(K, V)>` snapshots via a Core state node on every mutation.
834/// (Vec of pairs rather than HashMap for serialization stability.)
835pub struct ReactiveMap<K, V>
836where
837    K: Clone + Eq + Hash + Send + Sync + 'static,
838    V: Clone + Send + Sync + 'static,
839{
840    inner: Mutex<MapInner<K, V>>,
841    emitter: EmitHandle<Vec<(K, V)>>,
842    pub node_id: NodeId,
843}
844
845/// Score-based retention policy for [`ReactiveMap`].
846///
847/// After every mutation, entries are scored and those below `archive_threshold`
848/// (or exceeding `max_size` by ascending score) are archived (deleted).
849/// Mutually exclusive with top-level `max_size` (LRU).
850pub struct RetentionPolicy<K, V>
851where
852    K: Clone + Eq + Hash + Send + Sync + 'static,
853    V: Clone + Send + Sync + 'static,
854{
855    /// Score function — higher is kept.
856    pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
857    /// Entries scoring below this threshold are archived.
858    pub archive_threshold: Option<f64>,
859    /// Maximum number of live entries. Lowest-scored entries archived first.
860    pub max_size: Option<usize>,
861    /// Called for each archived entry.
862    pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
863}
864
865struct MapInner<K, V>
866where
867    K: Clone + Eq + Hash + Send + Sync + 'static,
868    V: Clone + Send + Sync + 'static,
869{
870    backend: Box<dyn MapBackend<K, V>>,
871    mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
872    structure_name: String,
873    /// Per-key expiry timestamps (monotonic ns). Present only when TTL is enabled.
874    ttl_expiry: HashMap<K, u64>,
875    /// Default TTL in nanoseconds. None = no TTL.
876    default_ttl_ns: Option<u64>,
877    /// LRU access order (most recently used at end). Present only when LRU is enabled.
878    lru_order: Vec<K>,
879    /// Maximum entries for LRU eviction. None = no LRU cap.
880    lru_max_size: Option<usize>,
881    /// Score-based retention policy.
882    retention: Option<RetentionPolicy<K, V>>,
883}
884
885impl<K, V> MapInner<K, V>
886where
887    K: Clone + Eq + Hash + Send + Sync + 'static,
888    V: Clone + Send + Sync + 'static,
889{
890    fn record(&mut self, change: MapChange<K, V>, version: Version) {
891        if let Some(log) = &mut self.mutation_log {
892            log.push(BaseChange {
893                structure: self.structure_name.clone(),
894                version,
895                t_ns: wall_clock_ns(),
896                seq: None,
897                lifecycle: Lifecycle::Data,
898                change,
899            });
900        }
901    }
902
903    /// Prune expired keys. Returns deleted (key, previous_value) pairs.
904    fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
905        if self.ttl_expiry.is_empty() {
906            return vec![];
907        }
908        let now = monotonic_ns();
909        let expired_keys: Vec<K> = self
910            .ttl_expiry
911            .iter()
912            .filter(|(_, &exp)| now >= exp)
913            .map(|(k, _)| k.clone())
914            .collect();
915        let mut expired = Vec::new();
916        for k in expired_keys {
917            if let Some(prev) = self.backend.get(&k) {
918                self.backend.delete(&k);
919                self.ttl_expiry.remove(&k);
920                self.lru_remove(&k);
921                expired.push((k, prev));
922            }
923        }
924        expired
925    }
926
927    /// Apply retention policy. Returns archived keys with scores.
928    fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
929        // Extract retention config values to avoid borrow conflict.
930        let (score_fn, archive_threshold, max_size) = match &self.retention {
931            Some(r) => (Arc::clone(&r.score), r.archive_threshold, r.max_size),
932            None => return vec![],
933        };
934        let entries = self.backend.to_vec();
935        if entries.is_empty() {
936            return vec![];
937        }
938        let mut scored: Vec<(K, V, f64)> = entries
939            .into_iter()
940            .map(|(k, v)| {
941                let s = (score_fn)(&k, &v);
942                (k, v, s)
943            })
944            .collect();
945        scored.sort_by(|a, b| a.2.total_cmp(&b.2));
946
947        let mut archived = Vec::new();
948        if let Some(threshold) = archive_threshold {
949            while let Some(entry) = scored.first() {
950                if entry.2 < threshold {
951                    let (k, v, s) = scored.remove(0);
952                    self.backend.delete(&k);
953                    self.ttl_expiry.remove(&k);
954                    self.lru_remove(&k);
955                    archived.push((k, v, s));
956                } else {
957                    break;
958                }
959            }
960        }
961        if let Some(max) = max_size {
962            while scored.len() > max {
963                let (k, v, s) = scored.remove(0);
964                self.backend.delete(&k);
965                self.ttl_expiry.remove(&k);
966                self.lru_remove(&k);
967                archived.push((k, v, s));
968            }
969        }
970        archived
971    }
972
973    /// Touch key in LRU order (move to end). Does NOT emit.
974    fn lru_touch(&mut self, key: &K) {
975        if self.lru_max_size.is_none() {
976            return;
977        }
978        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
979            self.lru_order.remove(pos);
980            self.lru_order.push(key.clone());
981        }
982    }
983
984    /// Remove key from LRU order.
985    fn lru_remove(&mut self, key: &K) {
986        if self.lru_max_size.is_none() {
987            return;
988        }
989        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
990            self.lru_order.remove(pos);
991        }
992    }
993
994    /// Evict LRU entries exceeding max_size. Returns evicted (key, previous_value) pairs.
995    fn lru_evict(&mut self) -> Vec<(K, V)> {
996        let Some(max) = self.lru_max_size else {
997            return vec![];
998        };
999        let mut evicted = Vec::new();
1000        while self.backend.size() > max && !self.lru_order.is_empty() {
1001            let victim = self.lru_order.remove(0);
1002            if let Some(prev) = self.backend.get(&victim) {
1003                self.backend.delete(&victim);
1004                self.ttl_expiry.remove(&victim);
1005                evicted.push((victim, prev));
1006            }
1007        }
1008        evicted
1009    }
1010
1011    /// Record TTL for a key. Per-call `ttl` (seconds) overrides `default_ttl_ns`.
1012    fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
1013        let ttl_ns = match ttl {
1014            Some(secs) => Some((secs * 1_000_000_000.0) as u64),
1015            None => self.default_ttl_ns,
1016        };
1017        if let Some(ns) = ttl_ns {
1018            self.ttl_expiry.insert(key.clone(), monotonic_ns() + ns);
1019        }
1020    }
1021}
1022
1023/// Options for constructing a [`ReactiveMap`].
1024pub struct ReactiveMapOptions<K, V>
1025where
1026    K: Clone + Eq + Hash + Send + Sync + 'static,
1027    V: Clone + Send + Sync + 'static,
1028{
1029    pub name: String,
1030    pub backend: Option<Box<dyn MapBackend<K, V>>>,
1031    pub mutation_log: bool,
1032    /// Default TTL in seconds. Applied to all `set`/`set_many` calls.
1033    /// Must be > 0 if set.
1034    pub default_ttl: Option<f64>,
1035    /// LRU cap. Mutually exclusive with `retention`.
1036    pub max_size: Option<usize>,
1037    /// Score-based retention policy. Mutually exclusive with `max_size`.
1038    pub retention: Option<RetentionPolicy<K, V>>,
1039}
1040
1041impl<K, V> Default for ReactiveMapOptions<K, V>
1042where
1043    K: Clone + Eq + Hash + Send + Sync + 'static,
1044    V: Clone + Send + Sync + 'static,
1045{
1046    fn default() -> Self {
1047        Self {
1048            name: "reactiveMap".into(),
1049            backend: None,
1050            mutation_log: false,
1051            default_ttl: None,
1052            max_size: None,
1053            retention: None,
1054        }
1055    }
1056}
1057
1058/// Configuration validation error for [`ReactiveMap`].
1059#[derive(Debug, Clone, PartialEq, Eq)]
1060pub struct MapConfigError(pub String);
1061
1062impl std::fmt::Display for MapConfigError {
1063    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1064        f.write_str(&self.0)
1065    }
1066}
1067
1068impl std::error::Error for MapConfigError {}
1069
1070impl<K, V> ReactiveMap<K, V>
1071where
1072    K: Clone + Eq + Hash + Send + Sync + 'static,
1073    V: Clone + Send + Sync + 'static,
1074{
1075    /// Create a new reactive map.
1076    ///
1077    /// # Errors
1078    /// Returns `MapConfigError` if both `max_size` (LRU) and `retention` are set.
1079    pub fn new(
1080        core: &Core,
1081        intern: InternFn<Vec<(K, V)>>,
1082        opts: ReactiveMapOptions<K, V>,
1083    ) -> Result<Self, MapConfigError> {
1084        if opts.max_size.is_some() && opts.retention.is_some() {
1085            return Err(MapConfigError(
1086                "max_size (LRU) and retention are mutually exclusive".into(),
1087            ));
1088        }
1089        if let Some(ref r) = opts.retention {
1090            if r.archive_threshold.is_none() && r.max_size.is_none() {
1091                return Err(MapConfigError(
1092                    "retention requires at least one of archive_threshold or max_size".into(),
1093                ));
1094            }
1095        }
1096        if let Some(ttl) = opts.default_ttl {
1097            if ttl <= 0.0 {
1098                return Err(MapConfigError("default_ttl must be > 0".into()));
1099            }
1100        }
1101        let node_id = core
1102            .register_state(HandleId::new(0), false)
1103            .expect("register_state for ReactiveMap");
1104        let backend: Box<dyn MapBackend<K, V>> = opts
1105            .backend
1106            .unwrap_or_else(|| Box::new(HashMapBackend::new()));
1107        let mutation_log = if opts.mutation_log {
1108            Some(Vec::new())
1109        } else {
1110            None
1111        };
1112        let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
1113        let inner = MapInner {
1114            backend,
1115            mutation_log,
1116            structure_name: opts.name,
1117            ttl_expiry: HashMap::new(),
1118            default_ttl_ns,
1119            lru_order: Vec::new(),
1120            lru_max_size: opts.max_size,
1121            retention: opts.retention,
1122        };
1123        Ok(Self {
1124            inner: Mutex::new(inner),
1125            emitter: EmitHandle {
1126                core: core.weak_handle(),
1127                node_id,
1128                intern,
1129                version: AtomicU64::new(0),
1130            },
1131            node_id,
1132        })
1133    }
1134
1135    #[must_use]
1136    pub fn size(&self) -> usize {
1137        self.inner.lock().backend.size()
1138    }
1139
1140    /// Check if key exists. Expired keys are pruned (observable side-effect).
1141    /// LRU touch: live key is marked as most-recently-used (no emission).
1142    pub fn has(&self, key: &K) -> bool {
1143        let (has, expired) = {
1144            let mut inner = self.inner.lock();
1145            let mut target_expired = false;
1146            // Check TTL expiry first.
1147            if inner.default_ttl_ns.is_some() {
1148                if let Some(&exp) = inner.ttl_expiry.get(key) {
1149                    if monotonic_ns() >= exp {
1150                        target_expired = true;
1151                    }
1152                }
1153            }
1154            // Prune all expired keys (including the target if expired).
1155            let mut expired = inner.prune_expired_inner();
1156            if target_expired && !expired.iter().any(|(k, _)| k == key) {
1157                // Target was expired but prune_expired_inner didn't catch it
1158                // (race with monotonic_ns). Delete it explicitly.
1159                if let Some(prev) = inner.backend.get(key) {
1160                    inner.backend.delete(key);
1161                    inner.ttl_expiry.remove(key);
1162                    inner.lru_remove(key);
1163                    expired.push((key.clone(), prev));
1164                }
1165            }
1166            let has = if target_expired {
1167                false
1168            } else {
1169                let h = inner.backend.has(key);
1170                if h {
1171                    inner.lru_touch(key);
1172                }
1173                h
1174            };
1175            (has, expired)
1176        };
1177        if !expired.is_empty() {
1178            let snapshot = self.inner.lock().backend.to_vec();
1179            let version = self.emitter.emit(snapshot);
1180            let mut inner = self.inner.lock();
1181            for (k, prev) in expired {
1182                inner.record(
1183                    MapChange::Delete {
1184                        key: k,
1185                        previous: prev,
1186                        reason: DeleteReason::Expired,
1187                    },
1188                    version.clone(),
1189                );
1190            }
1191        }
1192        has
1193    }
1194
1195    /// Get value by key. Expired keys return `None` (observable side-effect).
1196    /// LRU touch: live key is marked as most-recently-used (no emission).
1197    pub fn get(&self, key: &K) -> Option<V> {
1198        let (value, expired) = {
1199            let mut inner = self.inner.lock();
1200            let mut target_expired = false;
1201            if inner.default_ttl_ns.is_some() {
1202                if let Some(&exp) = inner.ttl_expiry.get(key) {
1203                    if monotonic_ns() >= exp {
1204                        target_expired = true;
1205                    }
1206                }
1207            }
1208            // Prune all expired keys (including the target if expired).
1209            let mut expired = inner.prune_expired_inner();
1210            if target_expired && !expired.iter().any(|(k, _)| k == key) {
1211                // Target was expired but prune_expired_inner didn't catch it.
1212                if let Some(prev) = inner.backend.get(key) {
1213                    inner.backend.delete(key);
1214                    inner.ttl_expiry.remove(key);
1215                    inner.lru_remove(key);
1216                    expired.push((key.clone(), prev));
1217                }
1218            }
1219            let value = if target_expired {
1220                None
1221            } else {
1222                let v = inner.backend.get(key);
1223                if v.is_some() {
1224                    inner.lru_touch(key);
1225                }
1226                v
1227            };
1228            (value, expired)
1229        };
1230        if !expired.is_empty() {
1231            let snapshot = self.inner.lock().backend.to_vec();
1232            let version = self.emitter.emit(snapshot);
1233            let mut inner = self.inner.lock();
1234            for (k, prev) in expired {
1235                inner.record(
1236                    MapChange::Delete {
1237                        key: k,
1238                        previous: prev,
1239                        reason: DeleteReason::Expired,
1240                    },
1241                    version.clone(),
1242                );
1243            }
1244        }
1245        value
1246    }
1247
1248    pub fn set(&self, key: K, value: V) {
1249        self.set_with_ttl(key, value, None);
1250    }
1251
1252    /// Set a key with an optional per-call TTL override (seconds).
1253    ///
1254    /// # Panics
1255    /// Panics if `ttl` is `Some` with a non-positive or non-finite value.
1256    pub fn set_with_ttl(&self, key: K, value: V, ttl: Option<f64>) {
1257        if let Some(t) = ttl {
1258            assert!(
1259                t > 0.0 && t.is_finite(),
1260                "per-call ttl must be positive and finite"
1261            );
1262        }
1263        let (snapshot, change, eviction_changes) = {
1264            let mut inner = self.inner.lock();
1265            let expired = inner.prune_expired_inner();
1266            let change = inner.mutation_log.is_some().then(|| MapChange::Set {
1267                key: key.clone(),
1268                value: value.clone(),
1269            });
1270            inner.set_ttl_with(&key, ttl);
1271            inner.lru_remove(&key);
1272            if inner.lru_max_size.is_some() {
1273                inner.lru_order.push(key.clone());
1274            }
1275            inner.backend.set(key, value);
1276            let evicted = inner.lru_evict();
1277            let archived = inner.apply_retention_inner();
1278            let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1279            for (k, prev) in expired {
1280                eviction_changes.push((k, prev, DeleteReason::Expired));
1281            }
1282            for (k, prev) in evicted {
1283                eviction_changes.push((k, prev, DeleteReason::LruEvict));
1284            }
1285            for (k, v, s) in &archived {
1286                if let Some(on_archive) =
1287                    &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1288                {
1289                    on_archive(k, v, *s);
1290                }
1291                eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1292            }
1293            (inner.backend.to_vec(), change, eviction_changes)
1294        };
1295        let version = self.emitter.emit(snapshot);
1296        if change.is_some() || !eviction_changes.is_empty() {
1297            let mut inner = self.inner.lock();
1298            if let Some(change) = change {
1299                inner.record(change, version.clone());
1300            }
1301            for (k, prev, reason) in eviction_changes {
1302                inner.record(
1303                    MapChange::Delete {
1304                        key: k,
1305                        previous: prev,
1306                        reason,
1307                    },
1308                    version.clone(),
1309                );
1310            }
1311        }
1312    }
1313
1314    pub fn set_many(&self, entries: Vec<(K, V)>) {
1315        self.set_many_with_ttl(entries, None);
1316    }
1317
1318    /// Batch set with optional per-call TTL override (seconds).
1319    ///
1320    /// # Panics
1321    /// Panics if `ttl` is `Some` with a non-positive or non-finite value.
1322    pub fn set_many_with_ttl(&self, entries: Vec<(K, V)>, ttl: Option<f64>) {
1323        if let Some(t) = ttl {
1324            assert!(
1325                t > 0.0 && t.is_finite(),
1326                "per-call ttl must be positive and finite"
1327            );
1328        }
1329        if entries.is_empty() {
1330            return;
1331        }
1332        let (snapshot, changes, eviction_changes) = {
1333            let mut inner = self.inner.lock();
1334            let expired = inner.prune_expired_inner();
1335            let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1336                entries
1337                    .iter()
1338                    .map(|(k, v)| MapChange::Set {
1339                        key: k.clone(),
1340                        value: v.clone(),
1341                    })
1342                    .collect()
1343            });
1344            for (k, _) in &entries {
1345                inner.set_ttl_with(k, ttl);
1346                inner.lru_remove(k);
1347                if inner.lru_max_size.is_some() {
1348                    inner.lru_order.push(k.clone());
1349                }
1350            }
1351            inner.backend.set_many(entries);
1352            let evicted = inner.lru_evict();
1353            let archived = inner.apply_retention_inner();
1354            let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1355            for (k, prev) in expired {
1356                eviction_changes.push((k, prev, DeleteReason::Expired));
1357            }
1358            for (k, prev) in evicted {
1359                eviction_changes.push((k, prev, DeleteReason::LruEvict));
1360            }
1361            for (k, v, s) in &archived {
1362                if let Some(on_archive) =
1363                    &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1364                {
1365                    on_archive(k, v, *s);
1366                }
1367                eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1368            }
1369            (inner.backend.to_vec(), changes, eviction_changes)
1370        };
1371        let version = self.emitter.emit(snapshot);
1372        if changes.is_some() || !eviction_changes.is_empty() {
1373            let mut inner = self.inner.lock();
1374            if let Some(changes) = changes {
1375                for change in changes {
1376                    inner.record(change, version.clone());
1377                }
1378            }
1379            for (k, prev, reason) in eviction_changes {
1380                inner.record(
1381                    MapChange::Delete {
1382                        key: k,
1383                        previous: prev,
1384                        reason,
1385                    },
1386                    version.clone(),
1387                );
1388            }
1389        }
1390    }
1391
1392    pub fn delete(&self, key: &K) {
1393        let (snapshot, previous) = {
1394            let mut inner = self.inner.lock();
1395            let previous = inner.backend.get(key);
1396            if !inner.backend.delete(key) {
1397                return;
1398            }
1399            inner.ttl_expiry.remove(key);
1400            inner.lru_remove(key);
1401            (inner.backend.to_vec(), previous)
1402        };
1403        let version = self.emitter.emit(snapshot);
1404        if let Some(prev) = previous {
1405            self.inner.lock().record(
1406                MapChange::Delete {
1407                    key: key.clone(),
1408                    previous: prev,
1409                    reason: DeleteReason::Explicit,
1410                },
1411                version,
1412            );
1413        }
1414    }
1415
1416    pub fn delete_many(&self, keys: &[K]) {
1417        let (snapshot, actually_deleted) = {
1418            let mut inner = self.inner.lock();
1419            let actually_deleted: Vec<(K, V)> = keys
1420                .iter()
1421                .filter_map(|k| inner.backend.get(k).map(|v| (k.clone(), v)))
1422                .collect();
1423            let removed = inner.backend.delete_many(keys);
1424            if removed == 0 {
1425                return;
1426            }
1427            for k in keys {
1428                inner.ttl_expiry.remove(k);
1429                inner.lru_remove(k);
1430            }
1431            (inner.backend.to_vec(), actually_deleted)
1432        };
1433        let version = self.emitter.emit(snapshot);
1434        if !actually_deleted.is_empty() {
1435            let mut inner = self.inner.lock();
1436            for (k, prev) in actually_deleted {
1437                inner.record(
1438                    MapChange::Delete {
1439                        key: k,
1440                        previous: prev,
1441                        reason: DeleteReason::Explicit,
1442                    },
1443                    version.clone(),
1444                );
1445            }
1446        }
1447    }
1448
1449    pub fn clear(&self) {
1450        let (snapshot, count) = {
1451            let mut inner = self.inner.lock();
1452            let count = inner.backend.clear();
1453            if count == 0 {
1454                return;
1455            }
1456            inner.ttl_expiry.clear();
1457            inner.lru_order.clear();
1458            (inner.backend.to_vec(), count)
1459        };
1460        let version = self.emitter.emit(snapshot);
1461        self.inner
1462            .lock()
1463            .record(MapChange::Clear { count }, version);
1464    }
1465
1466    /// Explicitly prune all expired keys. Returns the number of keys removed.
1467    pub fn prune_expired(&self) -> usize {
1468        let expired = {
1469            let mut inner = self.inner.lock();
1470            inner.prune_expired_inner()
1471        };
1472        if expired.is_empty() {
1473            return 0;
1474        }
1475        let count = expired.len();
1476        let snapshot = self.inner.lock().backend.to_vec();
1477        let version = self.emitter.emit(snapshot);
1478        let mut inner = self.inner.lock();
1479        for (k, prev) in expired {
1480            inner.record(
1481                MapChange::Delete {
1482                    key: k,
1483                    previous: prev,
1484                    reason: DeleteReason::Expired,
1485                },
1486                version.clone(),
1487            );
1488        }
1489        count
1490    }
1491
1492    #[must_use]
1493    pub fn to_vec(&self) -> Vec<(K, V)> {
1494        self.inner.lock().backend.to_vec()
1495    }
1496
1497    #[must_use]
1498    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
1499        self.inner.lock().mutation_log.clone()
1500    }
1501}
1502
1503// ---------------------------------------------------------------------------
1504// ReactiveIndex
1505// ---------------------------------------------------------------------------
1506
1507/// Reactive sorted index with primary key lookup and secondary sort order.
1508///
1509/// Emits `Vec<IndexRow<K, V>>` snapshots via a Core state node on every mutation.
1510pub struct ReactiveIndex<K, V>
1511where
1512    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1513    V: Clone + Send + Sync + 'static,
1514{
1515    inner: Mutex<IndexInner<K, V>>,
1516    emitter: EmitHandle<Vec<IndexRow<K, V>>>,
1517    pub node_id: NodeId,
1518}
1519
1520/// User-supplied equality function for upsert idempotency.
1521pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
1522
1523struct IndexInner<K, V>
1524where
1525    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1526    V: Clone + Send + Sync + 'static,
1527{
1528    backend: Box<dyn IndexBackend<K, V>>,
1529    mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
1530    structure_name: String,
1531    /// Factory-level equals for upsert idempotency. When set, `upsert` on an
1532    /// existing key compares the existing row with the proposed row; if
1533    /// `equals(existing, proposed)` returns `true`, the upsert is a no-op
1534    /// (no version bump, no emission).
1535    equals: Option<IndexEqualsFn<K, V>>,
1536}
1537
1538impl<K, V> IndexInner<K, V>
1539where
1540    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1541    V: Clone + Send + Sync + 'static,
1542{
1543    fn record(&mut self, change: IndexChange<K, V>, version: Version) {
1544        if let Some(log) = &mut self.mutation_log {
1545            log.push(BaseChange {
1546                structure: self.structure_name.clone(),
1547                version,
1548                t_ns: wall_clock_ns(),
1549                seq: None,
1550                lifecycle: Lifecycle::Data,
1551                change,
1552            });
1553        }
1554    }
1555}
1556
1557/// Per-call upsert options for [`ReactiveIndex::upsert`].
1558pub struct UpsertOptions<K, V>
1559where
1560    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1561    V: Clone + Send + Sync + 'static,
1562{
1563    /// Per-call equals override. Takes precedence over the factory-level equals.
1564    pub equals: Option<IndexEqualsFn<K, V>>,
1565}
1566
1567impl<K, V> Default for UpsertOptions<K, V>
1568where
1569    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1570    V: Clone + Send + Sync + 'static,
1571{
1572    fn default() -> Self {
1573        Self { equals: None }
1574    }
1575}
1576
1577/// Options for constructing a [`ReactiveIndex`].
1578pub struct ReactiveIndexOptions<K, V>
1579where
1580    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1581    V: Clone + Send + Sync + 'static,
1582{
1583    pub name: String,
1584    pub backend: Option<Box<dyn IndexBackend<K, V>>>,
1585    pub mutation_log: bool,
1586    /// Factory-level equals for upsert idempotency.
1587    pub equals: Option<IndexEqualsFn<K, V>>,
1588}
1589
1590impl<K, V> Default for ReactiveIndexOptions<K, V>
1591where
1592    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1593    V: Clone + Send + Sync + 'static,
1594{
1595    fn default() -> Self {
1596        Self {
1597            name: "reactiveIndex".into(),
1598            backend: None,
1599            mutation_log: false,
1600            equals: None,
1601        }
1602    }
1603}
1604
1605impl<K, V> ReactiveIndex<K, V>
1606where
1607    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1608    V: Clone + Send + Sync + 'static,
1609{
1610    #[must_use]
1611    pub fn new(
1612        core: &Core,
1613        intern: InternFn<Vec<IndexRow<K, V>>>,
1614        opts: ReactiveIndexOptions<K, V>,
1615    ) -> Self {
1616        let node_id = core
1617            .register_state(HandleId::new(0), false)
1618            .expect("register_state for ReactiveIndex");
1619        let backend: Box<dyn IndexBackend<K, V>> = opts
1620            .backend
1621            .unwrap_or_else(|| Box::new(VecIndexBackend::new()));
1622        let mutation_log = if opts.mutation_log {
1623            Some(Vec::new())
1624        } else {
1625            None
1626        };
1627        let inner = IndexInner {
1628            backend,
1629            mutation_log,
1630            structure_name: opts.name,
1631            equals: opts.equals,
1632        };
1633        Self {
1634            inner: Mutex::new(inner),
1635            emitter: EmitHandle {
1636                core: core.weak_handle(),
1637                node_id,
1638                intern,
1639                version: AtomicU64::new(0),
1640            },
1641            node_id,
1642        }
1643    }
1644
1645    #[must_use]
1646    pub fn size(&self) -> usize {
1647        self.inner.lock().backend.size()
1648    }
1649
1650    #[must_use]
1651    pub fn has(&self, primary: &K) -> bool {
1652        self.inner.lock().backend.has(primary)
1653    }
1654
1655    #[must_use]
1656    pub fn get(&self, primary: &K) -> Option<V> {
1657        self.inner.lock().backend.get(primary)
1658    }
1659
1660    /// Insert or update a row. Returns `true` if inserted (new primary key),
1661    /// `false` if updated or skipped by equals idempotency.
1662    pub fn upsert(&self, primary: K, secondary: String, value: V) -> bool {
1663        self.upsert_with(primary, secondary, value, &UpsertOptions::default())
1664    }
1665
1666    /// Insert or update a row with per-call options.
1667    ///
1668    /// If `opts.equals` (or the factory-level equals) returns `true` for the
1669    /// existing row vs the proposed row, the upsert is a no-op (no emission).
1670    pub fn upsert_with(
1671        &self,
1672        primary: K,
1673        secondary: String,
1674        value: V,
1675        opts: &UpsertOptions<K, V>,
1676    ) -> bool {
1677        let (is_new, snapshot, change) = {
1678            let mut inner = self.inner.lock();
1679            // Check equals idempotency: per-call overrides factory-level.
1680            let eq_fn = opts.equals.as_ref().or(inner.equals.as_ref());
1681            if let Some(eq) = eq_fn {
1682                if let Some(existing_row) = inner.backend.get_row(&primary) {
1683                    let proposed = IndexRow {
1684                        primary: primary.clone(),
1685                        secondary: secondary.clone(),
1686                        value: value.clone(),
1687                    };
1688                    if eq(&existing_row, &proposed) {
1689                        return false;
1690                    }
1691                }
1692            }
1693            let change = inner.mutation_log.is_some().then(|| IndexChange::Upsert {
1694                primary: primary.clone(),
1695                secondary: secondary.clone(),
1696                value: value.clone(),
1697            });
1698            let is_new = inner.backend.upsert(primary, secondary, value);
1699            (is_new, inner.backend.to_ordered(), change)
1700        };
1701        let version = self.emitter.emit(snapshot);
1702        if let Some(change) = change {
1703            self.inner.lock().record(change, version);
1704        }
1705        is_new
1706    }
1707
1708    /// Batch upsert. Rows matching the factory-level equals are skipped.
1709    /// If ALL rows are skipped, no emission occurs.
1710    pub fn upsert_many(&self, rows: Vec<(K, String, V)>) {
1711        if rows.is_empty() {
1712            return;
1713        }
1714        let (snapshot, changes) = {
1715            let mut inner = self.inner.lock();
1716            // Filter rows through equals idempotency.
1717            let effective_rows: Vec<(K, String, V)> = if let Some(eq) = &inner.equals {
1718                rows.into_iter()
1719                    .filter(|(pk, sec, val)| {
1720                        if let Some(existing) = inner.backend.get_row(pk) {
1721                            let proposed = IndexRow {
1722                                primary: pk.clone(),
1723                                secondary: sec.clone(),
1724                                value: val.clone(),
1725                            };
1726                            !eq(&existing, &proposed)
1727                        } else {
1728                            true
1729                        }
1730                    })
1731                    .collect()
1732            } else {
1733                rows
1734            };
1735            if effective_rows.is_empty() {
1736                return;
1737            }
1738            let changes: Option<Vec<IndexChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1739                effective_rows
1740                    .iter()
1741                    .map(|(k, s, v)| IndexChange::Upsert {
1742                        primary: k.clone(),
1743                        secondary: s.clone(),
1744                        value: v.clone(),
1745                    })
1746                    .collect()
1747            });
1748            inner.backend.upsert_many(effective_rows);
1749            (inner.backend.to_ordered(), changes)
1750        };
1751        let version = self.emitter.emit(snapshot);
1752        if let Some(changes) = changes {
1753            let mut inner = self.inner.lock();
1754            for change in changes {
1755                inner.record(change, version.clone());
1756            }
1757        }
1758    }
1759
1760    pub fn delete(&self, primary: &K) {
1761        let snapshot = {
1762            let mut inner = self.inner.lock();
1763            if !inner.backend.delete(primary) {
1764                return;
1765            }
1766            inner.backend.to_ordered()
1767        };
1768        let version = self.emitter.emit(snapshot);
1769        self.inner.lock().record(
1770            IndexChange::Delete {
1771                primary: primary.clone(),
1772            },
1773            version,
1774        );
1775    }
1776
1777    pub fn delete_many(&self, primaries: &[K]) {
1778        let (snapshot, actually_deleted) = {
1779            let mut inner = self.inner.lock();
1780            // Pre-filter to keys that actually exist (P4 fix).
1781            let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
1782                primaries
1783                    .iter()
1784                    .filter(|k| inner.backend.has(k))
1785                    .cloned()
1786                    .collect()
1787            } else {
1788                vec![]
1789            };
1790            let removed = inner.backend.delete_many(primaries);
1791            if removed == 0 {
1792                return;
1793            }
1794            (inner.backend.to_ordered(), actually_deleted)
1795        };
1796        let version = self.emitter.emit(snapshot);
1797        if !actually_deleted.is_empty() {
1798            self.inner.lock().record(
1799                IndexChange::DeleteMany {
1800                    primaries: actually_deleted,
1801                },
1802                version,
1803            );
1804        }
1805    }
1806
1807    pub fn clear(&self) {
1808        let (snapshot, count) = {
1809            let mut inner = self.inner.lock();
1810            let count = inner.backend.clear();
1811            if count == 0 {
1812                return;
1813            }
1814            (inner.backend.to_ordered(), count)
1815        };
1816        let version = self.emitter.emit(snapshot);
1817        self.inner
1818            .lock()
1819            .record(IndexChange::Clear { count }, version);
1820    }
1821
1822    #[must_use]
1823    pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
1824        self.inner.lock().backend.to_ordered()
1825    }
1826
1827    #[must_use]
1828    pub fn to_primary_map(&self) -> Vec<(K, V)> {
1829        self.inner.lock().backend.to_primary_map()
1830    }
1831
1832    /// Values of all rows whose **primary key** sorts within `[start, end)`
1833    /// (inclusive start, exclusive end), in **ascending primary-key order**
1834    /// (D205). `start >= end` or no matches → empty vec. The `K: Ord` bound
1835    /// is method-scoped so it does not constrain `ReactiveIndex<K, V>` itself.
1836    #[must_use]
1837    pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
1838    where
1839        K: Ord,
1840    {
1841        let mut rows: Vec<(K, V)> = self
1842            .inner
1843            .lock()
1844            .backend
1845            .to_primary_map()
1846            .into_iter()
1847            .filter(|(k, _)| k >= start && k < end)
1848            .collect();
1849        rows.sort_by(|a, b| a.0.cmp(&b.0));
1850        rows.into_iter().map(|(_, v)| v).collect()
1851    }
1852
1853    #[must_use]
1854    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
1855        self.inner.lock().mutation_log.clone()
1856    }
1857}