Skip to main content

graphrefly_structures/
reactive.rs

1//! Reactive data structures — Core-level integration (M5.A — D177/D179).
2//!
3//! Each structure wraps a backend trait behind a `parking_lot::Mutex`, owns a
4//! state `NodeId` registered with Core, and emits DIRTY->DATA snapshots on
5//! every mutation. Structures are standalone — no Graph dependency required.
6//!
7//! **Lock discipline (P1 fix):** Mutation methods hold the inner lock only
8//! during backend mutation and snapshot collection. The lock is dropped before
9//! `Core::emit()` is called, so subscribers are free to read (but not mutate)
10//! the structure during message delivery. The `EmitHandle` lives outside the
11//! inner `Mutex` and uses `AtomicU64` for thread-safe version counting.
12
13use std::collections::HashMap;
14use std::hash::Hash;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use parking_lot::Mutex;
19
20use graphrefly_core::{
21    monotonic_ns, wall_clock_ns, Core, CoreMailbox, HandleId, Message, NodeId, SubscriptionId,
22};
23
24use crate::backend::{
25    HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
26    VecListBackend, VecLogBackend,
27};
28use crate::changeset::{
29    BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
30};
31
32// ---------------------------------------------------------------------------
33// Intern function type — user-supplied value->handle conversion
34// ---------------------------------------------------------------------------
35
/// User-supplied interning callback: consumes a snapshot of type `S` (for
/// example a `Vec<T>`) and returns the `HandleId` that is handed to Core for
/// emission.
///
/// The alias is an `Arc<dyn Fn .. + Send + Sync>`, so one callback can be
/// shared between several emitters by cloning the `Arc`. The binding owns the
/// value registry; the closure is expected to capture the binding (e.g. an
/// `Arc<Binding>`) and call `binding.intern(snapshot)`.
pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
40
41// ---------------------------------------------------------------------------
42// EmitHandle — outside Mutex, thread-safe emission
43// ---------------------------------------------------------------------------
44
45/// Emission handle stored outside the inner Mutex. Uses `AtomicU64` for the
46/// version counter so no lock is needed during the emit.
47///
48/// D246 rule 6: structure mutation is owner-synchronous. The handle no
49/// longer holds the `Arc<CoreMailbox>`; the owner threads its `&Core`
50/// into each mutation method and `emit` calls `core.emit(..)` directly.
51struct EmitHandle<S> {
52    node_id: NodeId,
53    intern: InternFn<S>,
54    version: AtomicU64,
55}
56
57impl<S> EmitHandle<S> {
58    /// Bump version, intern snapshot, emit synchronously owner-side.
59    /// Returns the new version.
60    fn emit(&self, core: &Core, snapshot: S) -> Version {
61        let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
62        let handle = (self.intern)(snapshot);
63        core.emit(self.node_id, handle);
64        Version::Counter(ver)
65    }
66}
67
68/// In-wave sink emitter. Captured into long-lived `core.subscribe(..)`
69/// sink closures (`view`/`scan`) that fire IN-WAVE — re-entering Core
70/// synchronously from inside a wave is unsound, so it posts
71/// `MailboxOp::Emit` (deferred, drained owner-side / at the embedder
72/// pump). This is genuine deferred sink re-entry, correct under D246
73/// rule 6 (same mechanism `attach`/`attach_storage` use).
74struct SinkEmitter<S> {
75    mailbox: Arc<CoreMailbox>,
76    node_id: NodeId,
77    intern: InternFn<S>,
78    version: AtomicU64,
79}
80
81impl<S> SinkEmitter<S> {
82    /// Bump version, intern snapshot, post the deferred emit. Returns
83    /// the new version.
84    fn emit(&self, snapshot: S) -> Version {
85        let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
86        let handle = (self.intern)(snapshot);
87        // Core-gone (`false`) ⇒ the op is dropped unrun — identical to
88        // the pre-S2b `WeakCore::upgrade()` → `None` no-op (the
89        // structure is being torn down; the binding owns intern/release
90        // so there is no handle leak the old path didn't also have).
91        let _ = self.mailbox.post_emit(self.node_id, handle);
92        Version::Counter(ver)
93    }
94}
95
96// ---------------------------------------------------------------------------
97// IndexOutOfBounds error
98// ---------------------------------------------------------------------------
99
/// Error value signalling that the index given to a list `insert` or
/// `insert_many` lies outside the valid range.
///
/// It is a unit struct: it carries no payload and renders as the fixed text
/// `index out of bounds`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexOutOfBounds;

impl std::fmt::Display for IndexOutOfBounds {
    /// Writes the fixed message; width/alignment flags are not applied.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "index out of bounds")
    }
}

impl std::error::Error for IndexOutOfBounds {}
111
112// ---------------------------------------------------------------------------
113// ReactiveLog
114// ---------------------------------------------------------------------------
115
/// Reactive append-only log with optional ring-buffer cap.
///
/// Emits `Vec<T>` snapshots via a Core state node on every mutation.
/// Optionally records `BaseChange<LogChange<T>>` deltas to a companion
/// mutation log.
///
/// Mutation methods take the owner's `&Core`; the structure itself stores no
/// Core reference.
pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
    /// Backend plus optional mutation log, guarded by a mutex. It is
    /// `Arc`-shared so the sink closures built by `view`, `scan`, `attach`
    /// and `attach_storage` can keep reading (or appending to) the log.
    inner: Arc<Mutex<LogInner<T>>>,
    /// Owner-side emitter; lives outside `inner` so emitting needs no lock.
    emitter: EmitHandle<Vec<T>>,
    /// The Core state node backing this structure.
    pub node_id: NodeId,
}
127
128struct LogInner<T: Clone + Send + Sync + 'static> {
129    backend: Box<dyn LogBackend<T>>,
130    mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
131    structure_name: String,
132}
133
134impl<T: Clone + Send + Sync + 'static> LogInner<T> {
135    fn record(&mut self, change: LogChange<T>, version: Version) {
136        if let Some(log) = &mut self.mutation_log {
137            log.push(BaseChange {
138                structure: self.structure_name.clone(),
139                version,
140                t_ns: wall_clock_ns(),
141                seq: None,
142                lifecycle: Lifecycle::Data,
143                change,
144            });
145        }
146    }
147}
148
149/// Options for constructing a [`ReactiveLog`].
150pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
151    pub name: String,
152    pub max_size: Option<usize>,
153    pub backend: Option<Box<dyn LogBackend<T>>>,
154    pub mutation_log: bool,
155}
156
157impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
158    fn default() -> Self {
159        Self {
160            name: "reactiveLog".into(),
161            max_size: None,
162            backend: None,
163            mutation_log: false,
164        }
165    }
166}
167
168impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
169    /// Create a new reactive log registered with the given Core.
170    ///
171    /// `intern` converts a `Vec<T>` snapshot to a `HandleId` for Core emission.
172    #[must_use]
173    pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
174        let node_id = core
175            .register_state(HandleId::new(0), false)
176            .expect("register_state for ReactiveLog");
177        let backend: Box<dyn LogBackend<T>> = opts
178            .backend
179            .unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
180        let mutation_log = if opts.mutation_log {
181            Some(Vec::new())
182        } else {
183            None
184        };
185        let inner = LogInner {
186            backend,
187            mutation_log,
188            structure_name: opts.name,
189        };
190        Self {
191            inner: Arc::new(Mutex::new(inner)),
192            emitter: EmitHandle {
193                node_id,
194                intern,
195                version: AtomicU64::new(0),
196            },
197            node_id,
198        }
199    }
200
201    #[must_use]
202    pub fn size(&self) -> usize {
203        self.inner.lock().backend.size()
204    }
205
206    #[must_use]
207    pub fn at(&self, index: i64) -> Option<T> {
208        self.inner.lock().backend.at(index)
209    }
210
211    pub fn append(&self, core: &Core, value: T) {
212        let (snapshot, change) = {
213            let mut inner = self.inner.lock();
214            let change = inner.mutation_log.is_some().then(|| LogChange::Append {
215                value: value.clone(),
216            });
217            inner.backend.append(value);
218            (inner.backend.to_vec(), change)
219        };
220        let version = self.emitter.emit(core, snapshot);
221        if let Some(change) = change {
222            self.inner.lock().record(change, version);
223        }
224    }
225
226    pub fn append_many(&self, core: &Core, values: Vec<T>) {
227        if values.is_empty() {
228            return;
229        }
230        let (snapshot, change) = {
231            let mut inner = self.inner.lock();
232            let change = inner.mutation_log.is_some().then(|| LogChange::AppendMany {
233                values: values.clone(),
234            });
235            inner.backend.append_many(values);
236            (inner.backend.to_vec(), change)
237        };
238        let version = self.emitter.emit(core, snapshot);
239        if let Some(change) = change {
240            self.inner.lock().record(change, version);
241        }
242    }
243
244    pub fn clear(&self, core: &Core) {
245        let (snapshot, count) = {
246            let mut inner = self.inner.lock();
247            let count = inner.backend.clear();
248            if count == 0 {
249                return;
250            }
251            (inner.backend.to_vec(), count)
252        };
253        let version = self.emitter.emit(core, snapshot);
254        self.inner
255            .lock()
256            .record(LogChange::Clear { count }, version);
257    }
258
259    pub fn trim_head(&self, core: &Core, n: usize) {
260        if n == 0 {
261            return;
262        }
263        let (snapshot, actual) = {
264            let mut inner = self.inner.lock();
265            let actual = inner.backend.trim_head(n);
266            if actual == 0 {
267                return;
268            }
269            (inner.backend.to_vec(), actual)
270        };
271        let version = self.emitter.emit(core, snapshot);
272        self.inner
273            .lock()
274            .record(LogChange::TrimHead { n: actual }, version);
275    }
276
277    #[must_use]
278    pub fn to_vec(&self) -> Vec<T> {
279        self.inner.lock().backend.to_vec()
280    }
281
282    /// Snapshot the mutation log (if enabled). Returns `None` if mutation log
283    /// was not enabled at construction.
284    #[must_use]
285    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
286        self.inner.lock().mutation_log.clone()
287    }
288}
289
290// ---------------------------------------------------------------------------
291// ReactiveLog — views, scan, attach, attachStorage (M5.B)
292// ---------------------------------------------------------------------------
293
/// View specification for [`ReactiveLog::view`]: selects which part of the
/// log snapshot a view emits.
pub enum ViewSpec {
    /// Last `n` entries (the whole log when it holds fewer than `n`).
    Tail { n: usize },
    /// Range `[start..stop)`. `stop` defaults to log length. Both bounds are
    /// clamped to the current log length when the view is computed.
    Slice { start: usize, stop: Option<usize> },
    /// Entries from cursor position onward. The cursor node provides a `usize`
    /// position via `read_cursor(handle) -> usize`.
    FromCursor {
        /// Node whose DATA messages carry the cursor position.
        cursor_node: NodeId,
        /// Decodes a cursor `HandleId` into the position; positions past the
        /// end of the log are clamped and yield an empty view.
        read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
    },
}
307
308/// D246 rule 3: NO RAII unsubscribe below the binding. A subscription
309/// bundle that tracks its `(NodeId, SubscriptionId)` subs and is torn
310/// down by the **owner-invoked, synchronous** [`ReactiveSub::detach`] —
311/// not `Drop`. Carries no `&Core` (the `'c` lifetime is gone); the
312/// owner passes its `&Core` into `detach`.
313pub struct ReactiveSub {
314    subs: Vec<(NodeId, SubscriptionId)>,
315}
316
317impl ReactiveSub {
318    /// Synchronously unsubscribe every tracked sub. Owner-invoked
319    /// (D246 r3). Idempotent: subs are drained, so a second call is a
320    /// no-op.
321    pub fn detach(&mut self, core: &Core) {
322        for (node_id, sub_id) in self.subs.drain(..) {
323            core.unsubscribe(node_id, sub_id);
324        }
325    }
326}
327
328/// Handle to a log view. Call [`LogView::detach`] to dispose the view
329/// subscription (no RAII — D246 r3).
330pub struct LogView {
331    /// The state node backing this view. Subscribe to receive view snapshots.
332    pub node_id: NodeId,
333    sub: ReactiveSub,
334}
335
336impl LogView {
337    /// Synchronously dispose the view subscription. Owner-invoked.
338    pub fn detach(&mut self, core: &Core) {
339        self.sub.detach(core);
340    }
341}
342
343/// Handle to a log scan. Call [`ScanHandle::detach`] to dispose the
344/// scan subscription (no RAII — D246 r3).
345pub struct ScanHandle {
346    /// The state node backing this scan. Subscribe to receive accumulator snapshots.
347    pub node_id: NodeId,
348    sub: ReactiveSub,
349}
350
351impl ScanHandle {
352    /// Synchronously dispose the scan subscription. Owner-invoked.
353    pub fn detach(&mut self, core: &Core) {
354        self.sub.detach(core);
355    }
356}
357
/// Trait for append-log storage sinks used by [`ReactiveLog::attach_storage`].
///
/// Implementors receive batches of entries and optionally support pre-loading.
/// Methods return `Result` — errors are swallowed by `attach_storage` (matches
/// TS try/catch semantics) so a failing tier doesn't break the reactive graph.
///
/// Implementations must be `Send + Sync`: `attach_storage` calls them from a
/// Core subscription closure.
pub trait AppendLogSink<T>: Send + Sync {
    /// Append entries to persistent storage.
    ///
    /// `attach_storage` passes either the entries added since the last
    /// successful delivery to this sink or, after the log shrank, the full
    /// current snapshot.
    fn append_entries(&self, entries: &[T])
        -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    /// Load previously stored entries (for pre-loading on startup).
    /// Returns an empty vec if no entries are stored.
    fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
}
371
372/// Handle to an attach-storage subscription. Call
373/// [`AttachStorageHandle::detach`] to dispose it (no RAII — D246 r3).
374pub struct AttachStorageHandle {
375    sub: ReactiveSub,
376}
377
378impl AttachStorageHandle {
379    /// Synchronously dispose the attach-storage subscription.
380    /// Owner-invoked.
381    pub fn detach(&mut self, core: &Core) {
382        self.sub.detach(core);
383    }
384}
385
/// **D270 — `AttachOptions { skip_cached_replay }` (memo:Re P2 parity).**
/// Mirrors TS `ReactiveLogBundle.attach(upstream, opts?)`: when
/// `skip_cached_replay = true` AND the upstream has a cached value
/// (`Core::cache_of(upstream)` non-sentinel), drop the FIRST DATA-
/// bearing batch the attach sink receives — i.e. the subscribe-
/// handshake replay. Subsequent live emissions still land. A cold
/// upstream's first live emit is NOT dropped (the `cache_of` check
/// gates the suppression). The flag has no effect when the upstream's
/// cache is sentinel (no replay to skip).
#[derive(Debug, Clone, Copy, Default)]
pub struct AttachOptions {
    /// Drop the first DATA-bearing batch if the upstream already had a cached
    /// value when the attachment was created. Defaults to `false`.
    pub skip_cached_replay: bool,
}
399
400impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
401    /// Create a reactive view of this log. Returns a [`LogView`] whose
402    /// `node_id` emits `Vec<T>` snapshots on every log mutation.
403    ///
404    /// `intern` converts the view `Vec<T>` into a `HandleId` for Core emission.
405    /// β/D231/D246: takes the owner's `&Core` (no stored Core under the
406    /// actor model). The returned [`LogView`] carries no Core borrow;
407    /// tear it down via [`LogView::detach`] (D246 r3 — no RAII).
408    #[allow(clippy::too_many_lines)]
409    pub fn view(&self, core: &Core, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
410        let view_node = core
411            .register_state(HandleId::new(0), false)
412            .expect("register_state for LogView");
413        // In-wave sink emitter: the `core.subscribe(..)` closures below
414        // fire IN-WAVE, so emission must be deferred (D246 r6).
415        let view_emitter = Arc::new(SinkEmitter {
416            mailbox: core.mailbox(),
417            node_id: view_node,
418            intern,
419            version: AtomicU64::new(0),
420        });
421
422        let inner = Arc::clone(&self.inner);
423        let mut subscriptions: Vec<(NodeId, SubscriptionId)> = Vec::new();
424
425        match spec {
426            ViewSpec::Tail { n } => {
427                let inner_c = Arc::clone(&inner);
428                let emitter_c = Arc::clone(&view_emitter);
429                let sub = core.subscribe(
430                    self.node_id,
431                    Arc::new(move |msgs| {
432                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
433                            let guard = inner_c.lock();
434                            let data = guard.backend.to_vec();
435                            let start = data.len().saturating_sub(n);
436                            let view = data[start..].to_vec();
437                            drop(guard);
438                            emitter_c.emit(view);
439                        }
440                    }),
441                );
442                subscriptions.push((self.node_id, sub));
443            }
444            ViewSpec::Slice { start, stop } => {
445                let inner_c = Arc::clone(&inner);
446                let emitter_c = Arc::clone(&view_emitter);
447                let sub = core.subscribe(
448                    self.node_id,
449                    Arc::new(move |msgs| {
450                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
451                            let guard = inner_c.lock();
452                            let data = guard.backend.to_vec();
453                            let end = stop.unwrap_or(data.len()).min(data.len());
454                            let s = start.min(end);
455                            let view = data[s..end].to_vec();
456                            drop(guard);
457                            emitter_c.emit(view);
458                        }
459                    }),
460                );
461                subscriptions.push((self.node_id, sub));
462            }
463            ViewSpec::FromCursor {
464                cursor_node,
465                read_cursor,
466            } => {
467                let cursor_pos = Arc::new(Mutex::new(0usize));
468
469                // Cursor subscription: update position and recompute.
470                let cursor_pos_c = Arc::clone(&cursor_pos);
471                let inner_c = Arc::clone(&inner);
472                let emitter_c = Arc::clone(&view_emitter);
473                let read_cursor_c = Arc::clone(&read_cursor);
474                let sub_cursor = core.subscribe(
475                    cursor_node,
476                    Arc::new(move |msgs| {
477                        for m in msgs {
478                            if let Message::Data(h) = m {
479                                let pos = read_cursor_c(*h);
480                                *cursor_pos_c.lock() = pos;
481                                let guard = inner_c.lock();
482                                let data = guard.backend.to_vec();
483                                let s = pos.min(data.len());
484                                let view = data[s..].to_vec();
485                                drop(guard);
486                                emitter_c.emit(view);
487                            }
488                        }
489                    }),
490                );
491                subscriptions.push((cursor_node, sub_cursor));
492
493                // Log subscription: recompute with current cursor.
494                let cursor_pos_c2 = Arc::clone(&cursor_pos);
495                let inner_c2 = Arc::clone(&inner);
496                let emitter_c2 = view_emitter;
497                let sub_log = core.subscribe(
498                    self.node_id,
499                    Arc::new(move |msgs| {
500                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
501                            let pos = *cursor_pos_c2.lock();
502                            let guard = inner_c2.lock();
503                            let data = guard.backend.to_vec();
504                            let s = pos.min(data.len());
505                            let view = data[s..].to_vec();
506                            drop(guard);
507                            emitter_c2.emit(view);
508                        }
509                    }),
510                );
511                subscriptions.push((self.node_id, sub_log));
512            }
513        }
514
515        LogView {
516            node_id: view_node,
517            sub: ReactiveSub {
518                subs: subscriptions,
519            },
520        }
521    }
522
523    /// Incremental running aggregate over the log.
524    ///
525    /// Returns a [`ScanHandle`] whose `node_id` emits the current accumulator
526    /// on every log mutation. Appends are O(1) (only new entries processed);
527    /// `trimHead`/`clear` trigger a full rescan.
528    pub fn scan<TAcc: Clone + Send + Sync + 'static>(
529        &self,
530        core: &Core,
531        initial: TAcc,
532        step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
533        intern: InternFn<TAcc>,
534    ) -> ScanHandle {
535        struct ScanState<T, TAcc> {
536            acc: TAcc,
537            processed: usize,
538            initial: TAcc,
539            step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
540        }
541
542        let scan_node = core
543            .register_state(HandleId::new(0), false)
544            .expect("register_state for Scan");
545
546        let state = Arc::new(Mutex::new(ScanState {
547            acc: initial.clone(),
548            processed: 0,
549            initial,
550            step,
551        }));
552        let inner = Arc::clone(&self.inner);
553        // In-wave sink emitter (D246 r6): the `core.subscribe(..)`
554        // closure below fires IN-WAVE, so emission is deferred.
555        let scan_emitter = Arc::new(SinkEmitter {
556            mailbox: core.mailbox(),
557            node_id: scan_node,
558            intern,
559            version: AtomicU64::new(0),
560        });
561
562        let sub = core.subscribe(
563            self.node_id,
564            Arc::new(move |msgs| {
565                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
566                    let mut ss = state.lock();
567                    let guard = inner.lock();
568                    let data = guard.backend.to_vec();
569                    drop(guard);
570
571                    if data.len() < ss.processed {
572                        // Length decreased (clear/trim) — full rescan.
573                        ss.acc = ss.initial.clone();
574                        ss.processed = 0;
575                    }
576                    for item in &data[ss.processed..] {
577                        ss.acc = (ss.step)(&ss.acc, item);
578                    }
579                    ss.processed = data.len();
580                    let acc = ss.acc.clone();
581                    drop(ss);
582                    scan_emitter.emit(acc);
583                }
584            }),
585        );
586
587        ScanHandle {
588            node_id: scan_node,
589            sub: ReactiveSub {
590                subs: vec![(self.node_id, sub)],
591            },
592        }
593    }
594
595    /// Subscribe to an upstream node and append every DATA value into this log.
596    ///
597    /// `read_value` converts the upstream `HandleId` to a `T` for appending.
598    /// Returns a [`ReactiveSub`] — call [`ReactiveSub::detach`] to stop
599    /// the attachment (D246 r3 — no RAII).
600    /// β/D231: takes the owner's `&Core`.
601    pub fn attach(
602        &self,
603        core: &Core,
604        upstream: NodeId,
605        read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
606    ) -> ReactiveSub {
607        self.attach_with_options(core, upstream, read_value, AttachOptions::default())
608    }
609
610    /// D270 — variant of [`Self::attach`] accepting [`AttachOptions`].
611    /// Use when you need `skip_cached_replay` or future attach knobs;
612    /// the no-option `attach` delegates here with `AttachOptions::default()`
613    /// for back-compat callers.
614    pub fn attach_with_options(
615        &self,
616        core: &Core,
617        upstream: NodeId,
618        read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
619        opts: AttachOptions,
620    ) -> ReactiveSub {
621        let inner = Arc::clone(&self.inner);
622        // β/D232-AMEND: the long-lived attach sink re-enters Core to
623        // emit; it captures the `Send + Sync` `Arc<CoreMailbox>` and
624        // posts `MailboxOp::Emit` (drained owner-side in-wave / at the
625        // pump) — it can no longer hold a relocatable `WeakCore`.
626        let mailbox = core.mailbox();
627        let node_id = self.node_id;
628        let intern = Arc::clone(&self.emitter.intern);
629
630        // D270: gate the "skip first DATA-bearing batch" decision on
631        // the upstream having a cached value at attach time. The flag
632        // is one-shot — once a DATA batch arrives and is dropped, the
633        // sink delivers normally for all subsequent batches.
634        let suppress = if opts.skip_cached_replay {
635            let cache = core.cache_of(upstream);
636            cache != graphrefly_core::NO_HANDLE
637        } else {
638            false
639        };
640        let skip_state = Arc::new(parking_lot::Mutex::new(suppress));
641
642        let sub = core.subscribe(
643            upstream,
644            Arc::new(move |msgs| {
645                // D270: check-and-clear the skip flag if THIS batch
646                // carries DATA. Locks-then-clears in one atomic step
647                // so the next batch (live emission) is not skipped.
648                let should_skip = {
649                    let mut s = skip_state.lock();
650                    let has_data = msgs.iter().any(|m| matches!(m, Message::Data(_)));
651                    let do_skip = *s && has_data;
652                    if do_skip {
653                        *s = false;
654                    }
655                    do_skip
656                };
657                if should_skip {
658                    return;
659                }
660                for m in msgs {
661                    if let Message::Data(h) = m {
662                        let value = read_value(*h);
663                        let snapshot = {
664                            let mut guard = inner.lock();
665                            guard.backend.append(value);
666                            guard.backend.to_vec()
667                        };
668                        let handle = (intern)(snapshot);
669                        // Core-gone ⇒ drop-unrun, same as the pre-S2b
670                        // `weak_core.upgrade()` → `None` no-op.
671                        let _ = mailbox.post_emit(node_id, handle);
672                    }
673                }
674            }),
675        );
676        ReactiveSub {
677            subs: vec![(upstream, sub)],
678        }
679    }
680
681    /// Wire append-log storage sinks to this log.
682    ///
683    /// If `preload` is true, attempts `load_entries()` on the first sink that
684    /// returns data and appends to this log before subscribing. After
685    /// subscription, deltas are forwarded to ALL sinks. On trim/clear, the
686    /// full snapshot is re-shipped. Sink errors are swallowed (logged via
687    /// `eprintln!`) so a failing tier doesn't break the reactive graph.
688    pub fn attach_storage(
689        &self,
690        core: &Core,
691        sinks: Vec<Arc<dyn AppendLogSink<T>>>,
692        preload: bool,
693    ) -> AttachStorageHandle {
694        if sinks.is_empty() {
695            let sub = core.subscribe(self.node_id, Arc::new(|_| {}));
696            return AttachStorageHandle {
697                sub: ReactiveSub {
698                    subs: vec![(self.node_id, sub)],
699                },
700            };
701        }
702
703        if preload {
704            for sink in &sinks {
705                if let Ok(entries) = sink.load_entries() {
706                    if !entries.is_empty() {
707                        self.append_many(core, entries);
708                        break;
709                    }
710                }
711            }
712        }
713
714        let current_size = self.size();
715        // All sinks start delivered at current_size (post-preload snapshot).
716        // The subscriber only sees future emissions, so no double-write.
717        let delivered: Vec<Arc<Mutex<usize>>> = sinks
718            .iter()
719            .map(|_| Arc::new(Mutex::new(current_size)))
720            .collect();
721
722        let inner = Arc::clone(&self.inner);
723        let sinks_arc = sinks;
724        let delivered_arc = delivered;
725
726        let sub = core.subscribe(
727            self.node_id,
728            Arc::new(move |msgs| {
729                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
730                    let guard = inner.lock();
731                    let data = guard.backend.to_vec();
732                    drop(guard);
733
734                    for (i, sink) in sinks_arc.iter().enumerate() {
735                        let mut del = delivered_arc[i].lock();
736                        let result = match data.len().cmp(&*del) {
737                            std::cmp::Ordering::Greater => sink.append_entries(&data[*del..]),
738                            std::cmp::Ordering::Less => sink.append_entries(&data),
739                            std::cmp::Ordering::Equal => continue,
740                        };
741                        match result {
742                            Ok(()) => *del = data.len(),
743                            Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
744                        }
745                    }
746                }
747            }),
748        );
749
750        AttachStorageHandle {
751            sub: ReactiveSub {
752                subs: vec![(self.node_id, sub)],
753            },
754        }
755    }
756}
757
758// ---------------------------------------------------------------------------
759// ReactiveList
760// ---------------------------------------------------------------------------
761
/// Reactive ordered list.
///
/// Emits `Vec<T>` snapshots via a Core state node on every mutation.
pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
    /// Backend plus optional mutation log, guarded by a mutex.
    inner: Mutex<ListInner<T>>,
    /// Owner-side emitter; lives outside `inner` so emitting needs no lock.
    emitter: EmitHandle<Vec<T>>,
    /// The Core state node backing this structure.
    pub node_id: NodeId,
}
770
771struct ListInner<T: Clone + Send + Sync + 'static> {
772    backend: Box<dyn ListBackend<T>>,
773    mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
774    structure_name: String,
775}
776
777impl<T: Clone + Send + Sync + 'static> ListInner<T> {
778    fn record(&mut self, change: ListChange<T>, version: Version) {
779        if let Some(log) = &mut self.mutation_log {
780            log.push(BaseChange {
781                structure: self.structure_name.clone(),
782                version,
783                t_ns: wall_clock_ns(),
784                seq: None,
785                lifecycle: Lifecycle::Data,
786                change,
787            });
788        }
789    }
790}
791
/// Options for constructing a [`ReactiveList`].
pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
    /// Structure name; stamped onto every recorded mutation-log entry.
    pub name: String,
    /// Custom storage backend. `None` selects a `VecListBackend`.
    pub backend: Option<Box<dyn ListBackend<T>>>,
    /// When `true`, every mutation is recorded in an in-memory mutation log.
    pub mutation_log: bool,
}
798
799impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
800    fn default() -> Self {
801        Self {
802            name: "reactiveList".into(),
803            backend: None,
804            mutation_log: false,
805        }
806    }
807}
808
809impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
810    #[must_use]
811    pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
812        let node_id = core
813            .register_state(HandleId::new(0), false)
814            .expect("register_state for ReactiveList");
815        let backend: Box<dyn ListBackend<T>> = opts
816            .backend
817            .unwrap_or_else(|| Box::new(VecListBackend::new()));
818        let mutation_log = if opts.mutation_log {
819            Some(Vec::new())
820        } else {
821            None
822        };
823        let inner = ListInner {
824            backend,
825            mutation_log,
826            structure_name: opts.name,
827        };
828        Self {
829            inner: Mutex::new(inner),
830            emitter: EmitHandle {
831                node_id,
832                intern,
833                version: AtomicU64::new(0),
834            },
835            node_id,
836        }
837    }
838
839    #[must_use]
840    pub fn size(&self) -> usize {
841        self.inner.lock().backend.size()
842    }
843
844    #[must_use]
845    pub fn at(&self, index: i64) -> Option<T> {
846        self.inner.lock().backend.at(index)
847    }
848
849    pub fn append(&self, core: &Core, value: T) {
850        let (snapshot, change) = {
851            let mut inner = self.inner.lock();
852            let change = inner.mutation_log.is_some().then(|| ListChange::Append {
853                value: value.clone(),
854            });
855            inner.backend.append(value);
856            (inner.backend.to_vec(), change)
857        };
858        let version = self.emitter.emit(core, snapshot);
859        if let Some(change) = change {
860            self.inner.lock().record(change, version);
861        }
862    }
863
864    pub fn append_many(&self, core: &Core, values: Vec<T>) {
865        if values.is_empty() {
866            return;
867        }
868        let (snapshot, change) = {
869            let mut inner = self.inner.lock();
870            let change = inner
871                .mutation_log
872                .is_some()
873                .then(|| ListChange::AppendMany {
874                    values: values.clone(),
875                });
876            inner.backend.append_many(values);
877            (inner.backend.to_vec(), change)
878        };
879        let version = self.emitter.emit(core, snapshot);
880        if let Some(change) = change {
881            self.inner.lock().record(change, version);
882        }
883    }
884
885    /// Insert `value` at `index`. Returns `Err(IndexOutOfBounds)` if
886    /// `index > self.size()`.
887    pub fn insert(&self, core: &Core, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
888        let (snapshot, change) = {
889            let mut inner = self.inner.lock();
890            if index > inner.backend.size() {
891                return Err(IndexOutOfBounds);
892            }
893            let change = inner.mutation_log.is_some().then(|| ListChange::Insert {
894                index,
895                value: value.clone(),
896            });
897            inner.backend.insert(index, value);
898            (inner.backend.to_vec(), change)
899        };
900        let version = self.emitter.emit(core, snapshot);
901        if let Some(change) = change {
902            self.inner.lock().record(change, version);
903        }
904        Ok(())
905    }
906
907    /// Insert `values` starting at `index`. Returns `Err(IndexOutOfBounds)` if
908    /// `index > self.size()`.
909    pub fn insert_many(
910        &self,
911        core: &Core,
912        index: usize,
913        values: Vec<T>,
914    ) -> Result<(), IndexOutOfBounds> {
915        if values.is_empty() {
916            return Ok(());
917        }
918        let (snapshot, change) = {
919            let mut inner = self.inner.lock();
920            if index > inner.backend.size() {
921                return Err(IndexOutOfBounds);
922            }
923            let change = inner
924                .mutation_log
925                .is_some()
926                .then(|| ListChange::InsertMany {
927                    index,
928                    values: values.clone(),
929                });
930            inner.backend.insert_many(index, values);
931            (inner.backend.to_vec(), change)
932        };
933        let version = self.emitter.emit(core, snapshot);
934        if let Some(change) = change {
935            self.inner.lock().record(change, version);
936        }
937        Ok(())
938    }
939
940    /// Remove and return element at `index`. Negative indices count from end.
941    /// Returns `None` if index is out of range.
942    pub fn pop(&self, core: &Core, index: i64) -> Option<T> {
943        let (value, snapshot, change) = {
944            let mut inner = self.inner.lock();
945            let value = inner.backend.pop(index)?;
946            let change = inner.mutation_log.is_some().then(|| ListChange::Pop {
947                index,
948                value: value.clone(),
949            });
950            let snapshot = inner.backend.to_vec();
951            (value, snapshot, change)
952        };
953        let version = self.emitter.emit(core, snapshot);
954        if let Some(change) = change {
955            self.inner.lock().record(change, version);
956        }
957        Some(value)
958    }
959
960    pub fn clear(&self, core: &Core) {
961        let (snapshot, count) = {
962            let mut inner = self.inner.lock();
963            let count = inner.backend.clear();
964            if count == 0 {
965                return;
966            }
967            (inner.backend.to_vec(), count)
968        };
969        let version = self.emitter.emit(core, snapshot);
970        self.inner
971            .lock()
972            .record(ListChange::Clear { count }, version);
973    }
974
975    #[must_use]
976    pub fn to_vec(&self) -> Vec<T> {
977        self.inner.lock().backend.to_vec()
978    }
979
980    #[must_use]
981    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
982        self.inner.lock().mutation_log.clone()
983    }
984}
985
986// ---------------------------------------------------------------------------
987// ReactiveMap
988// ---------------------------------------------------------------------------
989
/// Reactive key-value map.
///
/// Emits `Vec<(K, V)>` snapshots via a Core state node on every mutation.
/// (Vec of pairs rather than HashMap for serialization stability.)
pub struct ReactiveMap<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Backend plus TTL / LRU / retention bookkeeping, guarded by a mutex.
    inner: Mutex<MapInner<K, V>>,
    /// Emission handle; stored next to (not inside) `inner`, so emitting does
    /// not require holding the inner lock.
    emitter: EmitHandle<Vec<(K, V)>>,
    /// Id of the Core state node registered for this map by [`ReactiveMap::new`].
    pub node_id: NodeId,
}
1003
/// Score-based retention policy for [`ReactiveMap`].
///
/// After every `set` / `set_many` mutation, entries are scored and those below
/// `archive_threshold` (or exceeding `max_size` by ascending score) are
/// archived (deleted).
/// Mutually exclusive with top-level `max_size` (LRU).
pub struct RetentionPolicy<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Score function — higher is kept.
    pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
    /// Entries scoring below this threshold are archived.
    pub archive_threshold: Option<f64>,
    /// Maximum number of live entries. Lowest-scored entries archived first.
    pub max_size: Option<usize>,
    /// Called for each archived entry with its key, value and score.
    pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
}
1023
/// Lock-protected state of a [`ReactiveMap`].
struct MapInner<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Storage backend that holds the live entries.
    backend: Box<dyn MapBackend<K, V>>,
    /// Recorded changes; `None` when mutation logging is disabled.
    mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
    /// Name copied into the `structure` field of every recorded change.
    structure_name: String,
    /// Per-key expiry timestamps (monotonic ns). Empty while no key has a TTL.
    ttl_expiry: HashMap<K, u64>,
    /// Default TTL in nanoseconds. None = no TTL.
    default_ttl_ns: Option<u64>,
    /// LRU access order (most recently used at end). Only maintained while
    /// `lru_max_size` is set; stays empty otherwise.
    lru_order: Vec<K>,
    /// Maximum entries for LRU eviction. None = no LRU cap.
    lru_max_size: Option<usize>,
    /// Score-based retention policy.
    retention: Option<RetentionPolicy<K, V>>,
}
1043
1044impl<K, V> MapInner<K, V>
1045where
1046    K: Clone + Eq + Hash + Send + Sync + 'static,
1047    V: Clone + Send + Sync + 'static,
1048{
1049    fn record(&mut self, change: MapChange<K, V>, version: Version) {
1050        if let Some(log) = &mut self.mutation_log {
1051            log.push(BaseChange {
1052                structure: self.structure_name.clone(),
1053                version,
1054                t_ns: wall_clock_ns(),
1055                seq: None,
1056                lifecycle: Lifecycle::Data,
1057                change,
1058            });
1059        }
1060    }
1061
1062    /// Prune expired keys. Returns deleted (key, previous_value) pairs.
1063    fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
1064        if self.ttl_expiry.is_empty() {
1065            return vec![];
1066        }
1067        let now = monotonic_ns();
1068        let expired_keys: Vec<K> = self
1069            .ttl_expiry
1070            .iter()
1071            .filter(|(_, &exp)| now >= exp)
1072            .map(|(k, _)| k.clone())
1073            .collect();
1074        let mut expired = Vec::new();
1075        for k in expired_keys {
1076            if let Some(prev) = self.backend.get(&k) {
1077                self.backend.delete(&k);
1078                self.ttl_expiry.remove(&k);
1079                self.lru_remove(&k);
1080                expired.push((k, prev));
1081            }
1082        }
1083        expired
1084    }
1085
1086    /// Apply retention policy. Returns archived keys with scores.
1087    fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
1088        // Extract retention config values to avoid borrow conflict.
1089        let (score_fn, archive_threshold, max_size) = match &self.retention {
1090            Some(r) => (Arc::clone(&r.score), r.archive_threshold, r.max_size),
1091            None => return vec![],
1092        };
1093        let entries = self.backend.to_vec();
1094        if entries.is_empty() {
1095            return vec![];
1096        }
1097        let mut scored: Vec<(K, V, f64)> = entries
1098            .into_iter()
1099            .map(|(k, v)| {
1100                let s = (score_fn)(&k, &v);
1101                (k, v, s)
1102            })
1103            .collect();
1104        scored.sort_by(|a, b| a.2.total_cmp(&b.2));
1105
1106        let mut archived = Vec::new();
1107        if let Some(threshold) = archive_threshold {
1108            while let Some(entry) = scored.first() {
1109                if entry.2 < threshold {
1110                    let (k, v, s) = scored.remove(0);
1111                    self.backend.delete(&k);
1112                    self.ttl_expiry.remove(&k);
1113                    self.lru_remove(&k);
1114                    archived.push((k, v, s));
1115                } else {
1116                    break;
1117                }
1118            }
1119        }
1120        if let Some(max) = max_size {
1121            while scored.len() > max {
1122                let (k, v, s) = scored.remove(0);
1123                self.backend.delete(&k);
1124                self.ttl_expiry.remove(&k);
1125                self.lru_remove(&k);
1126                archived.push((k, v, s));
1127            }
1128        }
1129        archived
1130    }
1131
1132    /// Touch key in LRU order (move to end). Does NOT emit.
1133    fn lru_touch(&mut self, key: &K) {
1134        if self.lru_max_size.is_none() {
1135            return;
1136        }
1137        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1138            self.lru_order.remove(pos);
1139            self.lru_order.push(key.clone());
1140        }
1141    }
1142
1143    /// Remove key from LRU order.
1144    fn lru_remove(&mut self, key: &K) {
1145        if self.lru_max_size.is_none() {
1146            return;
1147        }
1148        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1149            self.lru_order.remove(pos);
1150        }
1151    }
1152
1153    /// Evict LRU entries exceeding max_size. Returns evicted (key, previous_value) pairs.
1154    fn lru_evict(&mut self) -> Vec<(K, V)> {
1155        let Some(max) = self.lru_max_size else {
1156            return vec![];
1157        };
1158        let mut evicted = Vec::new();
1159        while self.backend.size() > max && !self.lru_order.is_empty() {
1160            let victim = self.lru_order.remove(0);
1161            if let Some(prev) = self.backend.get(&victim) {
1162                self.backend.delete(&victim);
1163                self.ttl_expiry.remove(&victim);
1164                evicted.push((victim, prev));
1165            }
1166        }
1167        evicted
1168    }
1169
1170    /// Record TTL for a key. Per-call `ttl` (seconds) overrides `default_ttl_ns`.
1171    fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
1172        let ttl_ns = match ttl {
1173            Some(secs) => Some((secs * 1_000_000_000.0) as u64),
1174            None => self.default_ttl_ns,
1175        };
1176        if let Some(ns) = ttl_ns {
1177            self.ttl_expiry.insert(key.clone(), monotonic_ns() + ns);
1178        }
1179    }
1180}
1181
/// Options for constructing a [`ReactiveMap`].
pub struct ReactiveMapOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Structure name; stamped onto every recorded mutation-log entry.
    pub name: String,
    /// Custom storage backend. `None` selects a `HashMapBackend`.
    pub backend: Option<Box<dyn MapBackend<K, V>>>,
    /// When `true`, every mutation is recorded in an in-memory mutation log.
    pub mutation_log: bool,
    /// Default TTL in seconds. Applied to all `set`/`set_many` calls.
    /// Must be > 0 if set.
    pub default_ttl: Option<f64>,
    /// LRU cap. Mutually exclusive with `retention`.
    pub max_size: Option<usize>,
    /// Score-based retention policy. Mutually exclusive with `max_size`.
    pub retention: Option<RetentionPolicy<K, V>>,
}
1199
1200impl<K, V> Default for ReactiveMapOptions<K, V>
1201where
1202    K: Clone + Eq + Hash + Send + Sync + 'static,
1203    V: Clone + Send + Sync + 'static,
1204{
1205    fn default() -> Self {
1206        Self {
1207            name: "reactiveMap".into(),
1208            backend: None,
1209            mutation_log: false,
1210            default_ttl: None,
1211            max_size: None,
1212            retention: None,
1213        }
1214    }
1215}
1216
/// Configuration validation error for [`ReactiveMap`].
///
/// Wraps the human-readable reason; `Display` prints it verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MapConfigError(pub String);

impl std::fmt::Display for MapConfigError {
    /// Write the wrapped message as-is (formatter width/fill flags are ignored).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        f.write_str(reason)
    }
}

impl std::error::Error for MapConfigError {}
1228
1229impl<K, V> ReactiveMap<K, V>
1230where
1231    K: Clone + Eq + Hash + Send + Sync + 'static,
1232    V: Clone + Send + Sync + 'static,
1233{
1234    /// Create a new reactive map.
1235    ///
1236    /// # Errors
1237    /// Returns `MapConfigError` if both `max_size` (LRU) and `retention` are set.
1238    pub fn new(
1239        core: &Core,
1240        intern: InternFn<Vec<(K, V)>>,
1241        opts: ReactiveMapOptions<K, V>,
1242    ) -> Result<Self, MapConfigError> {
1243        if opts.max_size.is_some() && opts.retention.is_some() {
1244            return Err(MapConfigError(
1245                "max_size (LRU) and retention are mutually exclusive".into(),
1246            ));
1247        }
1248        if let Some(ref r) = opts.retention {
1249            if r.archive_threshold.is_none() && r.max_size.is_none() {
1250                return Err(MapConfigError(
1251                    "retention requires at least one of archive_threshold or max_size".into(),
1252                ));
1253            }
1254        }
1255        if let Some(ttl) = opts.default_ttl {
1256            if ttl <= 0.0 {
1257                return Err(MapConfigError("default_ttl must be > 0".into()));
1258            }
1259        }
1260        let node_id = core
1261            .register_state(HandleId::new(0), false)
1262            .expect("register_state for ReactiveMap");
1263        let backend: Box<dyn MapBackend<K, V>> = opts
1264            .backend
1265            .unwrap_or_else(|| Box::new(HashMapBackend::new()));
1266        let mutation_log = if opts.mutation_log {
1267            Some(Vec::new())
1268        } else {
1269            None
1270        };
1271        let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
1272        let inner = MapInner {
1273            backend,
1274            mutation_log,
1275            structure_name: opts.name,
1276            ttl_expiry: HashMap::new(),
1277            default_ttl_ns,
1278            lru_order: Vec::new(),
1279            lru_max_size: opts.max_size,
1280            retention: opts.retention,
1281        };
1282        Ok(Self {
1283            inner: Mutex::new(inner),
1284            emitter: EmitHandle {
1285                node_id,
1286                intern,
1287                version: AtomicU64::new(0),
1288            },
1289            node_id,
1290        })
1291    }
1292
1293    #[must_use]
1294    pub fn size(&self) -> usize {
1295        self.inner.lock().backend.size()
1296    }
1297
1298    /// Check if key exists. Expired keys are pruned (observable side-effect).
1299    /// LRU touch: live key is marked as most-recently-used (no emission).
1300    pub fn has(&self, core: &Core, key: &K) -> bool {
1301        let (has, expired) = {
1302            let mut inner = self.inner.lock();
1303            let mut target_expired = false;
1304            // Check TTL expiry first.
1305            if inner.default_ttl_ns.is_some() {
1306                if let Some(&exp) = inner.ttl_expiry.get(key) {
1307                    if monotonic_ns() >= exp {
1308                        target_expired = true;
1309                    }
1310                }
1311            }
1312            // Prune all expired keys (including the target if expired).
1313            let mut expired = inner.prune_expired_inner();
1314            if target_expired && !expired.iter().any(|(k, _)| k == key) {
1315                // Target was expired but prune_expired_inner didn't catch it
1316                // (race with monotonic_ns). Delete it explicitly.
1317                if let Some(prev) = inner.backend.get(key) {
1318                    inner.backend.delete(key);
1319                    inner.ttl_expiry.remove(key);
1320                    inner.lru_remove(key);
1321                    expired.push((key.clone(), prev));
1322                }
1323            }
1324            let has = if target_expired {
1325                false
1326            } else {
1327                let h = inner.backend.has(key);
1328                if h {
1329                    inner.lru_touch(key);
1330                }
1331                h
1332            };
1333            (has, expired)
1334        };
1335        if !expired.is_empty() {
1336            let snapshot = self.inner.lock().backend.to_vec();
1337            let version = self.emitter.emit(core, snapshot);
1338            let mut inner = self.inner.lock();
1339            for (k, prev) in expired {
1340                inner.record(
1341                    MapChange::Delete {
1342                        key: k,
1343                        previous: prev,
1344                        reason: DeleteReason::Expired,
1345                    },
1346                    version.clone(),
1347                );
1348            }
1349        }
1350        has
1351    }
1352
1353    /// Get value by key. Expired keys return `None` (observable side-effect).
1354    /// LRU touch: live key is marked as most-recently-used (no emission).
1355    pub fn get(&self, core: &Core, key: &K) -> Option<V> {
1356        let (value, expired) = {
1357            let mut inner = self.inner.lock();
1358            let mut target_expired = false;
1359            if inner.default_ttl_ns.is_some() {
1360                if let Some(&exp) = inner.ttl_expiry.get(key) {
1361                    if monotonic_ns() >= exp {
1362                        target_expired = true;
1363                    }
1364                }
1365            }
1366            // Prune all expired keys (including the target if expired).
1367            let mut expired = inner.prune_expired_inner();
1368            if target_expired && !expired.iter().any(|(k, _)| k == key) {
1369                // Target was expired but prune_expired_inner didn't catch it.
1370                if let Some(prev) = inner.backend.get(key) {
1371                    inner.backend.delete(key);
1372                    inner.ttl_expiry.remove(key);
1373                    inner.lru_remove(key);
1374                    expired.push((key.clone(), prev));
1375                }
1376            }
1377            let value = if target_expired {
1378                None
1379            } else {
1380                let v = inner.backend.get(key);
1381                if v.is_some() {
1382                    inner.lru_touch(key);
1383                }
1384                v
1385            };
1386            (value, expired)
1387        };
1388        if !expired.is_empty() {
1389            let snapshot = self.inner.lock().backend.to_vec();
1390            let version = self.emitter.emit(core, snapshot);
1391            let mut inner = self.inner.lock();
1392            for (k, prev) in expired {
1393                inner.record(
1394                    MapChange::Delete {
1395                        key: k,
1396                        previous: prev,
1397                        reason: DeleteReason::Expired,
1398                    },
1399                    version.clone(),
1400                );
1401            }
1402        }
1403        value
1404    }
1405
1406    pub fn set(&self, core: &Core, key: K, value: V) {
1407        self.set_with_ttl(core, key, value, None);
1408    }
1409
1410    /// Set a key with an optional per-call TTL override (seconds).
1411    ///
1412    /// # Panics
1413    /// Panics if `ttl` is `Some` with a non-positive or non-finite value.
1414    pub fn set_with_ttl(&self, core: &Core, key: K, value: V, ttl: Option<f64>) {
1415        if let Some(t) = ttl {
1416            assert!(
1417                t > 0.0 && t.is_finite(),
1418                "per-call ttl must be positive and finite"
1419            );
1420        }
1421        let (snapshot, change, eviction_changes) = {
1422            let mut inner = self.inner.lock();
1423            let expired = inner.prune_expired_inner();
1424            let change = inner.mutation_log.is_some().then(|| MapChange::Set {
1425                key: key.clone(),
1426                value: value.clone(),
1427            });
1428            inner.set_ttl_with(&key, ttl);
1429            inner.lru_remove(&key);
1430            if inner.lru_max_size.is_some() {
1431                inner.lru_order.push(key.clone());
1432            }
1433            inner.backend.set(key, value);
1434            let evicted = inner.lru_evict();
1435            let archived = inner.apply_retention_inner();
1436            let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1437            for (k, prev) in expired {
1438                eviction_changes.push((k, prev, DeleteReason::Expired));
1439            }
1440            for (k, prev) in evicted {
1441                eviction_changes.push((k, prev, DeleteReason::LruEvict));
1442            }
1443            for (k, v, s) in &archived {
1444                if let Some(on_archive) =
1445                    &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1446                {
1447                    on_archive(k, v, *s);
1448                }
1449                eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1450            }
1451            (inner.backend.to_vec(), change, eviction_changes)
1452        };
1453        let version = self.emitter.emit(core, snapshot);
1454        if change.is_some() || !eviction_changes.is_empty() {
1455            let mut inner = self.inner.lock();
1456            if let Some(change) = change {
1457                inner.record(change, version.clone());
1458            }
1459            for (k, prev, reason) in eviction_changes {
1460                inner.record(
1461                    MapChange::Delete {
1462                        key: k,
1463                        previous: prev,
1464                        reason,
1465                    },
1466                    version.clone(),
1467                );
1468            }
1469        }
1470    }
1471
1472    pub fn set_many(&self, core: &Core, entries: Vec<(K, V)>) {
1473        self.set_many_with_ttl(core, entries, None);
1474    }
1475
1476    /// Batch set with optional per-call TTL override (seconds).
1477    ///
1478    /// # Panics
1479    /// Panics if `ttl` is `Some` with a non-positive or non-finite value.
1480    pub fn set_many_with_ttl(&self, core: &Core, entries: Vec<(K, V)>, ttl: Option<f64>) {
1481        if let Some(t) = ttl {
1482            assert!(
1483                t > 0.0 && t.is_finite(),
1484                "per-call ttl must be positive and finite"
1485            );
1486        }
1487        if entries.is_empty() {
1488            return;
1489        }
1490        let (snapshot, changes, eviction_changes) = {
1491            let mut inner = self.inner.lock();
1492            let expired = inner.prune_expired_inner();
1493            let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1494                entries
1495                    .iter()
1496                    .map(|(k, v)| MapChange::Set {
1497                        key: k.clone(),
1498                        value: v.clone(),
1499                    })
1500                    .collect()
1501            });
1502            for (k, _) in &entries {
1503                inner.set_ttl_with(k, ttl);
1504                inner.lru_remove(k);
1505                if inner.lru_max_size.is_some() {
1506                    inner.lru_order.push(k.clone());
1507                }
1508            }
1509            inner.backend.set_many(entries);
1510            let evicted = inner.lru_evict();
1511            let archived = inner.apply_retention_inner();
1512            let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1513            for (k, prev) in expired {
1514                eviction_changes.push((k, prev, DeleteReason::Expired));
1515            }
1516            for (k, prev) in evicted {
1517                eviction_changes.push((k, prev, DeleteReason::LruEvict));
1518            }
1519            for (k, v, s) in &archived {
1520                if let Some(on_archive) =
1521                    &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1522                {
1523                    on_archive(k, v, *s);
1524                }
1525                eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1526            }
1527            (inner.backend.to_vec(), changes, eviction_changes)
1528        };
1529        let version = self.emitter.emit(core, snapshot);
1530        if changes.is_some() || !eviction_changes.is_empty() {
1531            let mut inner = self.inner.lock();
1532            if let Some(changes) = changes {
1533                for change in changes {
1534                    inner.record(change, version.clone());
1535                }
1536            }
1537            for (k, prev, reason) in eviction_changes {
1538                inner.record(
1539                    MapChange::Delete {
1540                        key: k,
1541                        previous: prev,
1542                        reason,
1543                    },
1544                    version.clone(),
1545                );
1546            }
1547        }
1548    }
1549
1550    pub fn delete(&self, core: &Core, key: &K) {
1551        let (snapshot, previous) = {
1552            let mut inner = self.inner.lock();
1553            let previous = inner.backend.get(key);
1554            if !inner.backend.delete(key) {
1555                return;
1556            }
1557            inner.ttl_expiry.remove(key);
1558            inner.lru_remove(key);
1559            (inner.backend.to_vec(), previous)
1560        };
1561        let version = self.emitter.emit(core, snapshot);
1562        if let Some(prev) = previous {
1563            self.inner.lock().record(
1564                MapChange::Delete {
1565                    key: key.clone(),
1566                    previous: prev,
1567                    reason: DeleteReason::Explicit,
1568                },
1569                version,
1570            );
1571        }
1572    }
1573
1574    pub fn delete_many(&self, core: &Core, keys: &[K]) {
1575        let (snapshot, actually_deleted) = {
1576            let mut inner = self.inner.lock();
1577            let actually_deleted: Vec<(K, V)> = keys
1578                .iter()
1579                .filter_map(|k| inner.backend.get(k).map(|v| (k.clone(), v)))
1580                .collect();
1581            let removed = inner.backend.delete_many(keys);
1582            if removed == 0 {
1583                return;
1584            }
1585            for k in keys {
1586                inner.ttl_expiry.remove(k);
1587                inner.lru_remove(k);
1588            }
1589            (inner.backend.to_vec(), actually_deleted)
1590        };
1591        let version = self.emitter.emit(core, snapshot);
1592        if !actually_deleted.is_empty() {
1593            let mut inner = self.inner.lock();
1594            for (k, prev) in actually_deleted {
1595                inner.record(
1596                    MapChange::Delete {
1597                        key: k,
1598                        previous: prev,
1599                        reason: DeleteReason::Explicit,
1600                    },
1601                    version.clone(),
1602                );
1603            }
1604        }
1605    }
1606
1607    pub fn clear(&self, core: &Core) {
1608        let (snapshot, count) = {
1609            let mut inner = self.inner.lock();
1610            let count = inner.backend.clear();
1611            if count == 0 {
1612                return;
1613            }
1614            inner.ttl_expiry.clear();
1615            inner.lru_order.clear();
1616            (inner.backend.to_vec(), count)
1617        };
1618        let version = self.emitter.emit(core, snapshot);
1619        self.inner
1620            .lock()
1621            .record(MapChange::Clear { count }, version);
1622    }
1623
1624    /// Explicitly prune all expired keys. Returns the number of keys removed.
1625    pub fn prune_expired(&self, core: &Core) -> usize {
1626        let expired = {
1627            let mut inner = self.inner.lock();
1628            inner.prune_expired_inner()
1629        };
1630        if expired.is_empty() {
1631            return 0;
1632        }
1633        let count = expired.len();
1634        let snapshot = self.inner.lock().backend.to_vec();
1635        let version = self.emitter.emit(core, snapshot);
1636        let mut inner = self.inner.lock();
1637        for (k, prev) in expired {
1638            inner.record(
1639                MapChange::Delete {
1640                    key: k,
1641                    previous: prev,
1642                    reason: DeleteReason::Expired,
1643                },
1644                version.clone(),
1645            );
1646        }
1647        count
1648    }
1649
1650    #[must_use]
1651    pub fn to_vec(&self) -> Vec<(K, V)> {
1652        self.inner.lock().backend.to_vec()
1653    }
1654
1655    #[must_use]
1656    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
1657        self.inner.lock().mutation_log.clone()
1658    }
1659}
1660
1661// ---------------------------------------------------------------------------
1662// ReactiveIndex
1663// ---------------------------------------------------------------------------
1664
/// Reactive sorted index with primary key lookup and secondary sort order.
///
/// Emits `Vec<IndexRow<K, V>>` snapshots via a Core state node on every mutation.
pub struct ReactiveIndex<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Mutex-guarded mutable state: backend, optional mutation log,
    /// structure name, and the factory-level equals function.
    inner: Mutex<IndexInner<K, V>>,
    /// Emission handle for `Vec<IndexRow<K, V>>` snapshots; stored outside
    /// `inner`, so emitting does not go through the mutex.
    emitter: EmitHandle<Vec<IndexRow<K, V>>>,
    /// Id of the Core state node registered for this index by `new`.
    pub node_id: NodeId,
}
1677
/// User-supplied equality function for upsert idempotency.
///
/// Called with the existing row first and the proposed row second; returning
/// `true` makes the upsert a no-op.
pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
1680
/// Mutable state of a [`ReactiveIndex`], kept behind its mutex.
struct IndexInner<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Storage backend holding the rows.
    backend: Box<dyn IndexBackend<K, V>>,
    /// Recorded changes; `None` when mutation logging is disabled, in which
    /// case `record` does nothing.
    mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
    /// Name copied into the `structure` field of every recorded `BaseChange`.
    structure_name: String,
    /// Factory-level equals for upsert idempotency. When set, `upsert` on an
    /// existing key compares the existing row with the proposed row; if
    /// `equals(existing, proposed)` returns `true`, the upsert is a no-op
    /// (no version bump, no emission).
    equals: Option<IndexEqualsFn<K, V>>,
}
1695
1696impl<K, V> IndexInner<K, V>
1697where
1698    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1699    V: Clone + Send + Sync + 'static,
1700{
1701    fn record(&mut self, change: IndexChange<K, V>, version: Version) {
1702        if let Some(log) = &mut self.mutation_log {
1703            log.push(BaseChange {
1704                structure: self.structure_name.clone(),
1705                version,
1706                t_ns: wall_clock_ns(),
1707                seq: None,
1708                lifecycle: Lifecycle::Data,
1709                change,
1710            });
1711        }
1712    }
1713}
1714
/// Per-call upsert options for [`ReactiveIndex::upsert`].
pub struct UpsertOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Per-call equals override. Takes precedence over the factory-level equals.
    /// When `None`, the factory-level equals (if any) is used instead.
    pub equals: Option<IndexEqualsFn<K, V>>,
}
1724
1725impl<K, V> Default for UpsertOptions<K, V>
1726where
1727    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1728    V: Clone + Send + Sync + 'static,
1729{
1730    fn default() -> Self {
1731        Self { equals: None }
1732    }
1733}
1734
/// Options for constructing a [`ReactiveIndex`].
pub struct ReactiveIndexOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Structure name; stamped onto every recorded mutation-log entry.
    pub name: String,
    /// Custom storage backend. When `None`, a `VecIndexBackend` is created.
    pub backend: Option<Box<dyn IndexBackend<K, V>>>,
    /// Whether the index keeps a mutation log of its changes.
    pub mutation_log: bool,
    /// Factory-level equals for upsert idempotency.
    pub equals: Option<IndexEqualsFn<K, V>>,
}
1747
1748impl<K, V> Default for ReactiveIndexOptions<K, V>
1749where
1750    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1751    V: Clone + Send + Sync + 'static,
1752{
1753    fn default() -> Self {
1754        Self {
1755            name: "reactiveIndex".into(),
1756            backend: None,
1757            mutation_log: false,
1758            equals: None,
1759        }
1760    }
1761}
1762
1763impl<K, V> ReactiveIndex<K, V>
1764where
1765    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1766    V: Clone + Send + Sync + 'static,
1767{
1768    #[must_use]
1769    pub fn new(
1770        core: &Core,
1771        intern: InternFn<Vec<IndexRow<K, V>>>,
1772        opts: ReactiveIndexOptions<K, V>,
1773    ) -> Self {
1774        let node_id = core
1775            .register_state(HandleId::new(0), false)
1776            .expect("register_state for ReactiveIndex");
1777        let backend: Box<dyn IndexBackend<K, V>> = opts
1778            .backend
1779            .unwrap_or_else(|| Box::new(VecIndexBackend::new()));
1780        let mutation_log = if opts.mutation_log {
1781            Some(Vec::new())
1782        } else {
1783            None
1784        };
1785        let inner = IndexInner {
1786            backend,
1787            mutation_log,
1788            structure_name: opts.name,
1789            equals: opts.equals,
1790        };
1791        Self {
1792            inner: Mutex::new(inner),
1793            emitter: EmitHandle {
1794                node_id,
1795                intern,
1796                version: AtomicU64::new(0),
1797            },
1798            node_id,
1799        }
1800    }
1801
1802    #[must_use]
1803    pub fn size(&self) -> usize {
1804        self.inner.lock().backend.size()
1805    }
1806
1807    #[must_use]
1808    pub fn has(&self, primary: &K) -> bool {
1809        self.inner.lock().backend.has(primary)
1810    }
1811
1812    #[must_use]
1813    pub fn get(&self, primary: &K) -> Option<V> {
1814        self.inner.lock().backend.get(primary)
1815    }
1816
1817    /// Insert or update a row. Returns `true` if inserted (new primary key),
1818    /// `false` if updated or skipped by equals idempotency.
1819    pub fn upsert(&self, core: &Core, primary: K, secondary: String, value: V) -> bool {
1820        self.upsert_with(core, primary, secondary, value, &UpsertOptions::default())
1821    }
1822
1823    /// Insert or update a row with per-call options.
1824    ///
1825    /// If `opts.equals` (or the factory-level equals) returns `true` for the
1826    /// existing row vs the proposed row, the upsert is a no-op (no emission).
1827    pub fn upsert_with(
1828        &self,
1829        core: &Core,
1830        primary: K,
1831        secondary: String,
1832        value: V,
1833        opts: &UpsertOptions<K, V>,
1834    ) -> bool {
1835        let (is_new, snapshot, change) = {
1836            let mut inner = self.inner.lock();
1837            // Check equals idempotency: per-call overrides factory-level.
1838            let eq_fn = opts.equals.as_ref().or(inner.equals.as_ref());
1839            if let Some(eq) = eq_fn {
1840                if let Some(existing_row) = inner.backend.get_row(&primary) {
1841                    let proposed = IndexRow {
1842                        primary: primary.clone(),
1843                        secondary: secondary.clone(),
1844                        value: value.clone(),
1845                    };
1846                    if eq(&existing_row, &proposed) {
1847                        return false;
1848                    }
1849                }
1850            }
1851            let change = inner.mutation_log.is_some().then(|| IndexChange::Upsert {
1852                primary: primary.clone(),
1853                secondary: secondary.clone(),
1854                value: value.clone(),
1855            });
1856            let is_new = inner.backend.upsert(primary, secondary, value);
1857            (is_new, inner.backend.to_ordered(), change)
1858        };
1859        let version = self.emitter.emit(core, snapshot);
1860        if let Some(change) = change {
1861            self.inner.lock().record(change, version);
1862        }
1863        is_new
1864    }
1865
1866    /// Batch upsert. Rows matching the factory-level equals are skipped.
1867    /// If ALL rows are skipped, no emission occurs.
1868    pub fn upsert_many(&self, core: &Core, rows: Vec<(K, String, V)>) {
1869        if rows.is_empty() {
1870            return;
1871        }
1872        let (snapshot, changes) = {
1873            let mut inner = self.inner.lock();
1874            // Filter rows through equals idempotency.
1875            let effective_rows: Vec<(K, String, V)> = if let Some(eq) = &inner.equals {
1876                rows.into_iter()
1877                    .filter(|(pk, sec, val)| {
1878                        if let Some(existing) = inner.backend.get_row(pk) {
1879                            let proposed = IndexRow {
1880                                primary: pk.clone(),
1881                                secondary: sec.clone(),
1882                                value: val.clone(),
1883                            };
1884                            !eq(&existing, &proposed)
1885                        } else {
1886                            true
1887                        }
1888                    })
1889                    .collect()
1890            } else {
1891                rows
1892            };
1893            if effective_rows.is_empty() {
1894                return;
1895            }
1896            let changes: Option<Vec<IndexChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1897                effective_rows
1898                    .iter()
1899                    .map(|(k, s, v)| IndexChange::Upsert {
1900                        primary: k.clone(),
1901                        secondary: s.clone(),
1902                        value: v.clone(),
1903                    })
1904                    .collect()
1905            });
1906            inner.backend.upsert_many(effective_rows);
1907            (inner.backend.to_ordered(), changes)
1908        };
1909        let version = self.emitter.emit(core, snapshot);
1910        if let Some(changes) = changes {
1911            let mut inner = self.inner.lock();
1912            for change in changes {
1913                inner.record(change, version.clone());
1914            }
1915        }
1916    }
1917
1918    pub fn delete(&self, core: &Core, primary: &K) {
1919        let snapshot = {
1920            let mut inner = self.inner.lock();
1921            if !inner.backend.delete(primary) {
1922                return;
1923            }
1924            inner.backend.to_ordered()
1925        };
1926        let version = self.emitter.emit(core, snapshot);
1927        self.inner.lock().record(
1928            IndexChange::Delete {
1929                primary: primary.clone(),
1930            },
1931            version,
1932        );
1933    }
1934
1935    pub fn delete_many(&self, core: &Core, primaries: &[K]) {
1936        let (snapshot, actually_deleted) = {
1937            let mut inner = self.inner.lock();
1938            // Pre-filter to keys that actually exist (P4 fix).
1939            let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
1940                primaries
1941                    .iter()
1942                    .filter(|k| inner.backend.has(k))
1943                    .cloned()
1944                    .collect()
1945            } else {
1946                vec![]
1947            };
1948            let removed = inner.backend.delete_many(primaries);
1949            if removed == 0 {
1950                return;
1951            }
1952            (inner.backend.to_ordered(), actually_deleted)
1953        };
1954        let version = self.emitter.emit(core, snapshot);
1955        if !actually_deleted.is_empty() {
1956            self.inner.lock().record(
1957                IndexChange::DeleteMany {
1958                    primaries: actually_deleted,
1959                },
1960                version,
1961            );
1962        }
1963    }
1964
1965    pub fn clear(&self, core: &Core) {
1966        let (snapshot, count) = {
1967            let mut inner = self.inner.lock();
1968            let count = inner.backend.clear();
1969            if count == 0 {
1970                return;
1971            }
1972            (inner.backend.to_ordered(), count)
1973        };
1974        let version = self.emitter.emit(core, snapshot);
1975        self.inner
1976            .lock()
1977            .record(IndexChange::Clear { count }, version);
1978    }
1979
1980    #[must_use]
1981    pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
1982        self.inner.lock().backend.to_ordered()
1983    }
1984
1985    #[must_use]
1986    pub fn to_primary_map(&self) -> Vec<(K, V)> {
1987        self.inner.lock().backend.to_primary_map()
1988    }
1989
1990    /// Values of all rows whose **primary key** sorts within `[start, end)`
1991    /// (inclusive start, exclusive end), in **ascending primary-key order**
1992    /// (D205). `start >= end` or no matches → empty vec. The `K: Ord` bound
1993    /// is method-scoped so it does not constrain `ReactiveIndex<K, V>` itself.
1994    #[must_use]
1995    pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
1996    where
1997        K: Ord,
1998    {
1999        let mut rows: Vec<(K, V)> = self
2000            .inner
2001            .lock()
2002            .backend
2003            .to_primary_map()
2004            .into_iter()
2005            .filter(|(k, _)| k >= start && k < end)
2006            .collect();
2007        rows.sort_by(|a, b| a.0.cmp(&b.0));
2008        rows.into_iter().map(|(_, v)| v).collect()
2009    }
2010
2011    #[must_use]
2012    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
2013        self.inner.lock().mutation_log.clone()
2014    }
2015}