Skip to main content

graphrefly_structures/
reactive.rs

1//! Reactive data structures — Core-level integration (M5.A — D177/D179).
2//!
3//! Each structure wraps a backend trait behind a `parking_lot::Mutex`, owns a
4//! state `NodeId` registered with Core, and emits DIRTY->DATA snapshots on
5//! every mutation. Structures are standalone — no Graph dependency required.
6//!
7//! **Lock discipline (P1 fix):** Mutation methods hold the inner lock only
8//! during backend mutation and snapshot collection. The lock is dropped before
9//! `Core::emit()` is called, so subscribers are free to read (but not mutate)
10//! the structure during message delivery. The `EmitHandle` lives outside the
11//! inner `Mutex` and uses `AtomicU64` for thread-safe version counting.
12
13use std::collections::HashMap;
14use std::hash::Hash;
15use std::rc::Rc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20
21use graphrefly_core::{
22    monotonic_ns, wall_clock_ns, Core, CoreMailbox, HandleId, Message, NodeId, SubscriptionId,
23};
24
25use crate::backend::{
26    HashMapBackend, IndexBackend, IndexRow, ListBackend, LogBackend, MapBackend, VecIndexBackend,
27    VecListBackend, VecLogBackend,
28};
29use crate::changeset::{
30    BaseChange, DeleteReason, IndexChange, Lifecycle, ListChange, LogChange, MapChange, Version,
31};
32
33// ---------------------------------------------------------------------------
34// Intern function type — user-supplied value->handle conversion
35// ---------------------------------------------------------------------------
36
/// User-supplied function that converts a snapshot value of type `S`
/// (`Vec<T>` for a log or view, the accumulator type for a scan) into a
/// `HandleId` for Core emission. The binding owns the value registry;
/// this closure captures `Arc<Binding>` and calls `binding.intern(snapshot)`.
40pub type InternFn<S> = Arc<dyn Fn(S) -> HandleId + Send + Sync>;
41
42// ---------------------------------------------------------------------------
43// EmitHandle — outside Mutex, thread-safe emission
44// ---------------------------------------------------------------------------
45
46/// Emission handle stored outside the inner Mutex. Uses `AtomicU64` for the
47/// version counter so no lock is needed during the emit.
48///
49/// D246 rule 6: structure mutation is owner-synchronous. The handle no
50/// longer holds the `Arc<CoreMailbox>`; the owner threads its `&Core`
51/// into each mutation method and `emit` calls `core.emit(..)` directly.
52struct EmitHandle<S> {
53    node_id: NodeId,
54    intern: InternFn<S>,
55    version: AtomicU64,
56}
57
58impl<S> EmitHandle<S> {
59    /// Bump version, intern snapshot, emit synchronously owner-side.
60    /// Returns the new version.
61    fn emit(&self, core: &Core, snapshot: S) -> Version {
62        let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
63        let handle = (self.intern)(snapshot);
64        core.emit(self.node_id, handle);
65        Version::Counter(ver)
66    }
67}
68
69/// In-wave sink emitter. Captured into long-lived `core.subscribe(..)`
70/// sink closures (`view`/`scan`) that fire IN-WAVE — re-entering Core
71/// synchronously from inside a wave is unsound, so it posts
72/// `MailboxOp::Emit` (deferred, drained owner-side / at the embedder
73/// pump). This is genuine deferred sink re-entry, correct under D246
74/// rule 6 (same mechanism `attach`/`attach_storage` use).
75struct SinkEmitter<S> {
76    mailbox: Arc<CoreMailbox>,
77    node_id: NodeId,
78    intern: InternFn<S>,
79    version: AtomicU64,
80}
81
82impl<S> SinkEmitter<S> {
83    /// Bump version, intern snapshot, post the deferred emit. Returns
84    /// the new version.
85    fn emit(&self, snapshot: S) -> Version {
86        let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
87        let handle = (self.intern)(snapshot);
88        // Core-gone (`false`) ⇒ the op is dropped unrun — identical to
89        // the pre-S2b `WeakCore::upgrade()` → `None` no-op (the
90        // structure is being torn down; the binding owns intern/release
91        // so there is no handle leak the old path didn't also have).
92        let _ = self.mailbox.post_emit(self.node_id, handle);
93        Version::Counter(ver)
94    }
95}
96
97// ---------------------------------------------------------------------------
98// IndexOutOfBounds error
99// ---------------------------------------------------------------------------
100
/// Error value signalling that the index given to a list `insert` or
/// `insert_many` lies outside the valid range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexOutOfBounds;

impl std::fmt::Display for IndexOutOfBounds {
    /// Writes the fixed, human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "index out of bounds")
    }
}

// No underlying cause: the default `source()` (`None`) is sufficient.
impl std::error::Error for IndexOutOfBounds {}
112
113// ---------------------------------------------------------------------------
114// ReactiveLog
115// ---------------------------------------------------------------------------
116
117/// Reactive append-only log with optional ring-buffer cap.
118///
119/// Emits `Vec<T>` snapshots via a Core state node on every mutation.
120/// Optionally records `BaseChange<LogChange<T>>` deltas to a companion
121/// mutation log.
122pub struct ReactiveLog<T: Clone + Send + Sync + 'static> {
123    inner: Arc<Mutex<LogInner<T>>>,
124    emitter: EmitHandle<Vec<T>>,
125    /// The Core state node backing this structure.
126    pub node_id: NodeId,
127}
128
129struct LogInner<T: Clone + Send + Sync + 'static> {
130    backend: Box<dyn LogBackend<T>>,
131    mutation_log: Option<Vec<BaseChange<LogChange<T>>>>,
132    structure_name: String,
133}
134
135impl<T: Clone + Send + Sync + 'static> LogInner<T> {
136    fn record(&mut self, change: LogChange<T>, version: Version) {
137        if let Some(log) = &mut self.mutation_log {
138            log.push(BaseChange {
139                structure: self.structure_name.clone(),
140                version,
141                t_ns: wall_clock_ns(),
142                seq: None,
143                lifecycle: Lifecycle::Data,
144                change,
145            });
146        }
147    }
148}
149
150/// Options for constructing a [`ReactiveLog`].
151pub struct ReactiveLogOptions<T: Clone + Send + Sync + 'static> {
152    pub name: String,
153    pub max_size: Option<usize>,
154    pub backend: Option<Box<dyn LogBackend<T>>>,
155    pub mutation_log: bool,
156}
157
158impl<T: Clone + Send + Sync + 'static> Default for ReactiveLogOptions<T> {
159    fn default() -> Self {
160        Self {
161            name: "reactiveLog".into(),
162            max_size: None,
163            backend: None,
164            mutation_log: false,
165        }
166    }
167}
168
169impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
170    /// Create a new reactive log registered with the given Core.
171    ///
172    /// `intern` converts a `Vec<T>` snapshot to a `HandleId` for Core emission.
173    #[must_use]
174    pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveLogOptions<T>) -> Self {
175        let node_id = core
176            .register_state(HandleId::new(0), false)
177            .expect("register_state for ReactiveLog");
178        let backend: Box<dyn LogBackend<T>> = opts
179            .backend
180            .unwrap_or_else(|| Box::new(VecLogBackend::new(opts.max_size)));
181        let mutation_log = if opts.mutation_log {
182            Some(Vec::new())
183        } else {
184            None
185        };
186        let inner = LogInner {
187            backend,
188            mutation_log,
189            structure_name: opts.name,
190        };
191        Self {
192            inner: Arc::new(Mutex::new(inner)),
193            emitter: EmitHandle {
194                node_id,
195                intern,
196                version: AtomicU64::new(0),
197            },
198            node_id,
199        }
200    }
201
202    #[must_use]
203    pub fn size(&self) -> usize {
204        self.inner.lock().backend.size()
205    }
206
207    #[must_use]
208    pub fn at(&self, index: i64) -> Option<T> {
209        self.inner.lock().backend.at(index)
210    }
211
212    pub fn append(&self, core: &Core, value: T) {
213        let (snapshot, change) = {
214            let mut inner = self.inner.lock();
215            let change = inner.mutation_log.is_some().then(|| LogChange::Append {
216                value: value.clone(),
217            });
218            inner.backend.append(value);
219            (inner.backend.to_vec(), change)
220        };
221        let version = self.emitter.emit(core, snapshot);
222        if let Some(change) = change {
223            self.inner.lock().record(change, version);
224        }
225    }
226
227    pub fn append_many(&self, core: &Core, values: Vec<T>) {
228        if values.is_empty() {
229            return;
230        }
231        let (snapshot, change) = {
232            let mut inner = self.inner.lock();
233            let change = inner.mutation_log.is_some().then(|| LogChange::AppendMany {
234                values: values.clone(),
235            });
236            inner.backend.append_many(values);
237            (inner.backend.to_vec(), change)
238        };
239        let version = self.emitter.emit(core, snapshot);
240        if let Some(change) = change {
241            self.inner.lock().record(change, version);
242        }
243    }
244
245    pub fn clear(&self, core: &Core) {
246        let (snapshot, count) = {
247            let mut inner = self.inner.lock();
248            let count = inner.backend.clear();
249            if count == 0 {
250                return;
251            }
252            (inner.backend.to_vec(), count)
253        };
254        let version = self.emitter.emit(core, snapshot);
255        self.inner
256            .lock()
257            .record(LogChange::Clear { count }, version);
258    }
259
260    pub fn trim_head(&self, core: &Core, n: usize) {
261        if n == 0 {
262            return;
263        }
264        let (snapshot, actual) = {
265            let mut inner = self.inner.lock();
266            let actual = inner.backend.trim_head(n);
267            if actual == 0 {
268                return;
269            }
270            (inner.backend.to_vec(), actual)
271        };
272        let version = self.emitter.emit(core, snapshot);
273        self.inner
274            .lock()
275            .record(LogChange::TrimHead { n: actual }, version);
276    }
277
278    #[must_use]
279    pub fn to_vec(&self) -> Vec<T> {
280        self.inner.lock().backend.to_vec()
281    }
282
283    /// Snapshot the mutation log (if enabled). Returns `None` if mutation log
284    /// was not enabled at construction.
285    #[must_use]
286    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<LogChange<T>>>> {
287        self.inner.lock().mutation_log.clone()
288    }
289}
290
291// ---------------------------------------------------------------------------
292// ReactiveLog — views, scan, attach, attachStorage (M5.B)
293// ---------------------------------------------------------------------------
294
295/// View specification for [`ReactiveLog::view`].
296pub enum ViewSpec {
297    /// Last `n` entries.
298    Tail { n: usize },
299    /// Range `[start..stop)`. `stop` defaults to log length.
300    Slice { start: usize, stop: Option<usize> },
301    /// Entries from cursor position onward. The cursor node provides a `usize`
302    /// position via `read_cursor(handle) -> usize`.
303    FromCursor {
304        cursor_node: NodeId,
305        read_cursor: Arc<dyn Fn(HandleId) -> usize + Send + Sync>,
306    },
307}
308
309/// D246 rule 3: NO RAII unsubscribe below the binding. A subscription
310/// bundle that tracks its `(NodeId, SubscriptionId)` subs and is torn
311/// down by the **owner-invoked, synchronous** [`ReactiveSub::detach`] —
312/// not `Drop`. Carries no `&Core` (the `'c` lifetime is gone); the
313/// owner passes its `&Core` into `detach`.
314pub struct ReactiveSub {
315    subs: Vec<(NodeId, SubscriptionId)>,
316}
317
318impl ReactiveSub {
319    /// Synchronously unsubscribe every tracked sub. Owner-invoked
320    /// (D246 r3). Idempotent: subs are drained, so a second call is a
321    /// no-op.
322    pub fn detach(&mut self, core: &Core) {
323        for (node_id, sub_id) in self.subs.drain(..) {
324            core.unsubscribe(node_id, sub_id);
325        }
326    }
327}
328
329/// Handle to a log view. Call [`LogView::detach`] to dispose the view
330/// subscription (no RAII — D246 r3).
331pub struct LogView {
332    /// The state node backing this view. Subscribe to receive view snapshots.
333    pub node_id: NodeId,
334    sub: ReactiveSub,
335}
336
337impl LogView {
338    /// Synchronously dispose the view subscription. Owner-invoked.
339    pub fn detach(&mut self, core: &Core) {
340        self.sub.detach(core);
341    }
342}
343
344/// Handle to a log scan. Call [`ScanHandle::detach`] to dispose the
345/// scan subscription (no RAII — D246 r3).
346pub struct ScanHandle {
347    /// The state node backing this scan. Subscribe to receive accumulator snapshots.
348    pub node_id: NodeId,
349    sub: ReactiveSub,
350}
351
352impl ScanHandle {
353    /// Synchronously dispose the scan subscription. Owner-invoked.
354    pub fn detach(&mut self, core: &Core) {
355        self.sub.detach(core);
356    }
357}
358
/// **Local mirror of `graphrefly_storage::AppendLogMode`** (next batch
/// 2026-05-21).
///
/// `graphrefly-storage` already depends on `graphrefly-structures` (for
/// `BaseChange<T>`), so the enum cannot be re-exported from there without
/// a dependency cycle. The two declarations are intentionally identical
/// and pre-1.0 stable; should a third mode ever appear, both must change
/// together (`Append` / `Overwrite` are the locked D269 vocabulary).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendLogMode {
    /// Read existing bucket, merge new entries, write back. M4.B default.
    Append,
    /// Replace bucket contents with the current batch (no read-merge).
    /// `ReactiveLog::attach_storage` rejects sinks declaring this mode —
    /// delta-shipping into an overwrite sink silently truncates the log
    /// to the last batch (memo:Re P1 hazard class).
    Overwrite,
}

impl Default for AppendLogMode {
    /// The default mode is [`AppendLogMode::Append`].
    fn default() -> Self {
        Self::Append
    }
}
377
/// **Failure modes of [`ReactiveLog::attach_storage`]** (next batch
/// 2026-05-21 — D269 part-2 parity with the TS `attachStorage` throw).
#[derive(Debug)]
pub enum AttachStorageError {
    /// The sink at `sink_index` declared `AppendLogMode::Overwrite`.
    /// Delta-shipping into an overwrite sink silently truncates the log
    /// to the last batch, so such sinks are refused.
    OverwriteSinkRejected { sink_index: usize },
}

impl std::fmt::Display for AttachStorageError {
    /// Renders a diagnostic that names the offending sink by index.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::OverwriteSinkRejected { sink_index } => f.write_fmt(format_args!(
                "attach_storage: sink[{sink_index}] declares AppendLogMode::Overwrite — \
                 delta-shipping into an overwrite sink truncates; use an Append-mode sink",
            )),
        }
    }
}

// No wrapped cause: the default `source()` (`None`) is sufficient.
impl std::error::Error for AttachStorageError {}
400
401/// Trait for append-log storage sinks used by [`ReactiveLog::attach_storage`].
402///
403/// Implementors receive batches of entries and optionally support pre-loading.
404/// `append_entries` / `load_entries` / `flush` errors are swallowed by
405/// `attach_storage`'s in-wave delivery sink (matches TS try/catch semantics) so
406/// a failing tier doesn't break the reactive graph. `mode` is consulted at
407/// attach time only.
408pub trait AppendLogSink<T>: Send + Sync {
409    /// Append entries to persistent storage.
410    fn append_entries(&self, entries: &[T])
411        -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
412    /// Load previously stored entries (for pre-loading on startup).
413    /// Returns an empty vec if no entries are stored.
414    fn load_entries(&self) -> Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>;
415    /// **D269 part-2 parity** — sink's persistence mode. `attach_storage`
416    /// rejects `Overwrite` sinks at attach time. Default `Append` so
417    /// existing impls work unchanged.
418    fn mode(&self) -> AppendLogMode {
419        AppendLogMode::Append
420    }
421    /// **F9 parity (Option B per D171 precedent)** — flush any buffered
422    /// writes synchronously. Drives [`AttachStorageHandle::flush_all`],
423    /// which callers wire from a reactive timer subgraph
424    /// (`graphrefly_operators::temporal::interval`). Default no-op so
425    /// non-buffering sinks need no implementation.
426    fn flush(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
427        Ok(())
428    }
429}
430
431/// Handle to an attach-storage subscription. Call
432/// [`AttachStorageHandle::detach`] to dispose it (no RAII — D246 r3).
433///
434/// **Generic over `T`** (next batch 2026-05-21) — the handle retains
435/// shared references to the sinks so [`AttachStorageHandle::flush_all`]
436/// can drive their `flush()` methods. Callers wire
437/// `interval(debounce_ms) → handle.flush_all()` (Option B per D171
438/// precedent — mirrors `graphrefly_storage::StorageHandle::flush_all`).
439pub struct AttachStorageHandle<T> {
440    sub: ReactiveSub,
441    sinks: Vec<Arc<dyn AppendLogSink<T>>>,
442}
443
444impl<T> AttachStorageHandle<T> {
445    /// Synchronously dispose the attach-storage subscription.
446    /// Owner-invoked.
447    pub fn detach(&mut self, core: &Core) {
448        self.sub.detach(core);
449    }
450    /// **F9 parity** — drive every sink's `flush()` synchronously.
451    /// Returns the first error encountered (with a `sink[N] flush:`
452    /// prefix for diagnostics); subsequent sinks are NOT attempted
453    /// after a failure (matches `graphrefly_storage::StorageHandle::flush_all`).
454    /// Callers wire this from a reactive timer subgraph; the handle
455    /// owns no internal timer (no tokio in `graphrefly-structures`).
456    ///
457    /// **Caller obligation (D271 QA EC-2):** callers MUST NOT hold a
458    /// lock that any sink's `flush()` impl might try to acquire.
459    /// `flush_all` calls each sink synchronously and holds no internal
460    /// lock during the call, so a sink that re-enters a caller-held
461    /// lock would deadlock. Real-world sinks (`graphrefly-storage`'s
462    /// `AppendLogStorage::flush` is the canonical example) take only
463    /// their own internal locks — wiring `flush_all` from a reactive
464    /// timer subgraph is therefore deadlock-free by construction.
465    #[must_use = "ignoring AttachStorageHandle::flush_all's Result swallows write errors"]
466    pub fn flush_all(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
467        for (i, sink) in self.sinks.iter().enumerate() {
468            if let Err(e) = sink.flush() {
469                return Err(format!("sink[{i}] flush: {e}").into());
470            }
471        }
472        Ok(())
473    }
474}
475
/// **D270 — options for `attach` (memo:Re P2 parity).**
///
/// Mirrors TS `ReactiveLogBundle.attach(upstream, opts?)`.
///
/// With `skip_cached_replay = true` AND an upstream that already holds a
/// cached value (`Core::cache_of(upstream)` is not the sentinel), the
/// FIRST DATA-bearing batch delivered to the attach sink — the
/// subscribe-handshake replay — is dropped. Later live emissions still
/// land. A cold upstream's first live emit is NOT dropped, because the
/// `cache_of` check gates the suppression; with a sentinel cache the flag
/// has no effect (there is no replay to skip).
#[derive(Debug, Clone, Copy, Default)]
pub struct AttachOptions {
    /// Drop the subscribe-handshake replay batch; off by default.
    pub skip_cached_replay: bool,
}
489
490impl<T: Clone + Send + Sync + 'static> ReactiveLog<T> {
491    /// Create a reactive view of this log. Returns a [`LogView`] whose
492    /// `node_id` emits `Vec<T>` snapshots on every log mutation.
493    ///
494    /// `intern` converts the view `Vec<T>` into a `HandleId` for Core emission.
495    /// β/D231/D246: takes the owner's `&Core` (no stored Core under the
496    /// actor model). The returned [`LogView`] carries no Core borrow;
497    /// tear it down via [`LogView::detach`] (D246 r3 — no RAII).
498    #[allow(clippy::too_many_lines)]
499    pub fn view(&self, core: &Core, spec: ViewSpec, intern: InternFn<Vec<T>>) -> LogView {
500        let view_node = core
501            .register_state(HandleId::new(0), false)
502            .expect("register_state for LogView");
503        // In-wave sink emitter: the `core.subscribe(..)` closures below
504        // fire IN-WAVE, so emission must be deferred (D246 r6).
505        let view_emitter = Arc::new(SinkEmitter {
506            mailbox: core.mailbox(),
507            node_id: view_node,
508            intern,
509            version: AtomicU64::new(0),
510        });
511
512        let inner = Arc::clone(&self.inner);
513        let mut subscriptions: Vec<(NodeId, SubscriptionId)> = Vec::new();
514
515        match spec {
516            ViewSpec::Tail { n } => {
517                let inner_c = Arc::clone(&inner);
518                let emitter_c = Arc::clone(&view_emitter);
519                let sub = core.subscribe(
520                    self.node_id,
521                    Rc::new(move |msgs| {
522                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
523                            let guard = inner_c.lock();
524                            let data = guard.backend.to_vec();
525                            let start = data.len().saturating_sub(n);
526                            let view = data[start..].to_vec();
527                            drop(guard);
528                            emitter_c.emit(view);
529                        }
530                    }),
531                );
532                subscriptions.push((self.node_id, sub));
533            }
534            ViewSpec::Slice { start, stop } => {
535                let inner_c = Arc::clone(&inner);
536                let emitter_c = Arc::clone(&view_emitter);
537                let sub = core.subscribe(
538                    self.node_id,
539                    Rc::new(move |msgs| {
540                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
541                            let guard = inner_c.lock();
542                            let data = guard.backend.to_vec();
543                            let end = stop.unwrap_or(data.len()).min(data.len());
544                            let s = start.min(end);
545                            let view = data[s..end].to_vec();
546                            drop(guard);
547                            emitter_c.emit(view);
548                        }
549                    }),
550                );
551                subscriptions.push((self.node_id, sub));
552            }
553            ViewSpec::FromCursor {
554                cursor_node,
555                read_cursor,
556            } => {
557                let cursor_pos = Arc::new(Mutex::new(0usize));
558
559                // Cursor subscription: update position and recompute.
560                let cursor_pos_c = Arc::clone(&cursor_pos);
561                let inner_c = Arc::clone(&inner);
562                let emitter_c = Arc::clone(&view_emitter);
563                let read_cursor_c = Arc::clone(&read_cursor);
564                let sub_cursor = core.subscribe(
565                    cursor_node,
566                    Rc::new(move |msgs| {
567                        for m in msgs {
568                            if let Message::Data(h) = m {
569                                let pos = read_cursor_c(*h);
570                                *cursor_pos_c.lock() = pos;
571                                let guard = inner_c.lock();
572                                let data = guard.backend.to_vec();
573                                let s = pos.min(data.len());
574                                let view = data[s..].to_vec();
575                                drop(guard);
576                                emitter_c.emit(view);
577                            }
578                        }
579                    }),
580                );
581                subscriptions.push((cursor_node, sub_cursor));
582
583                // Log subscription: recompute with current cursor.
584                let cursor_pos_c2 = Arc::clone(&cursor_pos);
585                let inner_c2 = Arc::clone(&inner);
586                let emitter_c2 = view_emitter;
587                let sub_log = core.subscribe(
588                    self.node_id,
589                    Rc::new(move |msgs| {
590                        if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
591                            let pos = *cursor_pos_c2.lock();
592                            let guard = inner_c2.lock();
593                            let data = guard.backend.to_vec();
594                            let s = pos.min(data.len());
595                            let view = data[s..].to_vec();
596                            drop(guard);
597                            emitter_c2.emit(view);
598                        }
599                    }),
600                );
601                subscriptions.push((self.node_id, sub_log));
602            }
603        }
604
605        LogView {
606            node_id: view_node,
607            sub: ReactiveSub {
608                subs: subscriptions,
609            },
610        }
611    }
612
613    /// Incremental running aggregate over the log.
614    ///
615    /// Returns a [`ScanHandle`] whose `node_id` emits the current accumulator
616    /// on every log mutation. Appends are O(1) (only new entries processed);
617    /// `trimHead`/`clear` trigger a full rescan.
618    pub fn scan<TAcc: Clone + Send + Sync + 'static>(
619        &self,
620        core: &Core,
621        initial: TAcc,
622        step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
623        intern: InternFn<TAcc>,
624    ) -> ScanHandle {
625        struct ScanState<T, TAcc> {
626            acc: TAcc,
627            processed: usize,
628            initial: TAcc,
629            step: Arc<dyn Fn(&TAcc, &T) -> TAcc + Send + Sync>,
630        }
631
632        let scan_node = core
633            .register_state(HandleId::new(0), false)
634            .expect("register_state for Scan");
635
636        let state = Arc::new(Mutex::new(ScanState {
637            acc: initial.clone(),
638            processed: 0,
639            initial,
640            step,
641        }));
642        let inner = Arc::clone(&self.inner);
643        // In-wave sink emitter (D246 r6): the `core.subscribe(..)`
644        // closure below fires IN-WAVE, so emission is deferred.
645        let scan_emitter = Arc::new(SinkEmitter {
646            mailbox: core.mailbox(),
647            node_id: scan_node,
648            intern,
649            version: AtomicU64::new(0),
650        });
651
652        let sub = core.subscribe(
653            self.node_id,
654            Rc::new(move |msgs| {
655                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
656                    let mut ss = state.lock();
657                    let guard = inner.lock();
658                    let data = guard.backend.to_vec();
659                    drop(guard);
660
661                    if data.len() < ss.processed {
662                        // Length decreased (clear/trim) — full rescan.
663                        ss.acc = ss.initial.clone();
664                        ss.processed = 0;
665                    }
666                    for item in &data[ss.processed..] {
667                        ss.acc = (ss.step)(&ss.acc, item);
668                    }
669                    ss.processed = data.len();
670                    let acc = ss.acc.clone();
671                    drop(ss);
672                    scan_emitter.emit(acc);
673                }
674            }),
675        );
676
677        ScanHandle {
678            node_id: scan_node,
679            sub: ReactiveSub {
680                subs: vec![(self.node_id, sub)],
681            },
682        }
683    }
684
685    /// Subscribe to an upstream node and append every DATA value into this log.
686    ///
687    /// `read_value` converts the upstream `HandleId` to a `T` for appending.
688    /// Returns a [`ReactiveSub`] — call [`ReactiveSub::detach`] to stop
689    /// the attachment (D246 r3 — no RAII).
690    /// β/D231: takes the owner's `&Core`.
691    pub fn attach(
692        &self,
693        core: &Core,
694        upstream: NodeId,
695        read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
696    ) -> ReactiveSub {
697        self.attach_with_options(core, upstream, read_value, AttachOptions::default())
698    }
699
700    /// D270 — variant of [`Self::attach`] accepting [`AttachOptions`].
701    /// Use when you need `skip_cached_replay` or future attach knobs;
702    /// the no-option `attach` delegates here with `AttachOptions::default()`
703    /// for back-compat callers.
704    pub fn attach_with_options(
705        &self,
706        core: &Core,
707        upstream: NodeId,
708        read_value: Arc<dyn Fn(HandleId) -> T + Send + Sync>,
709        opts: AttachOptions,
710    ) -> ReactiveSub {
711        let inner = Arc::clone(&self.inner);
712        // β/D232-AMEND: the long-lived attach sink re-enters Core to
713        // emit; it captures the `Send + Sync` `Arc<CoreMailbox>` and
714        // posts `MailboxOp::Emit` (drained owner-side in-wave / at the
715        // pump) — it can no longer hold a relocatable `WeakCore`.
716        let mailbox = core.mailbox();
717        let node_id = self.node_id;
718        let intern = Arc::clone(&self.emitter.intern);
719
720        // D270: gate the "skip first DATA-bearing batch" decision on
721        // the upstream having a cached value at attach time. The flag
722        // is one-shot — once a DATA batch arrives and is dropped, the
723        // sink delivers normally for all subsequent batches.
724        let suppress = if opts.skip_cached_replay {
725            let cache = core.cache_of(upstream);
726            cache != graphrefly_core::NO_HANDLE
727        } else {
728            false
729        };
730        let skip_state = Arc::new(parking_lot::Mutex::new(suppress));
731
732        let sub = core.subscribe(
733            upstream,
734            Rc::new(move |msgs| {
735                // D270: check-and-clear the skip flag if THIS batch
736                // carries DATA. Locks-then-clears in one atomic step
737                // so the next batch (live emission) is not skipped.
738                let should_skip = {
739                    let mut s = skip_state.lock();
740                    let has_data = msgs.iter().any(|m| matches!(m, Message::Data(_)));
741                    let do_skip = *s && has_data;
742                    if do_skip {
743                        *s = false;
744                    }
745                    do_skip
746                };
747                if should_skip {
748                    return;
749                }
750                for m in msgs {
751                    if let Message::Data(h) = m {
752                        let value = read_value(*h);
753                        let snapshot = {
754                            let mut guard = inner.lock();
755                            guard.backend.append(value);
756                            guard.backend.to_vec()
757                        };
758                        let handle = (intern)(snapshot);
759                        // Core-gone ⇒ drop-unrun, same as the pre-S2b
760                        // `weak_core.upgrade()` → `None` no-op.
761                        let _ = mailbox.post_emit(node_id, handle);
762                    }
763                }
764            }),
765        );
766        ReactiveSub {
767            subs: vec![(upstream, sub)],
768        }
769    }
770
771    /// Wire append-log storage sinks to this log.
772    ///
773    /// If `preload` is true, attempts `load_entries()` on the first sink that
774    /// returns data and appends to this log before subscribing. After
775    /// subscription, deltas are forwarded to ALL sinks. On trim/clear, the
776    /// full snapshot is re-shipped. Sink errors are swallowed (logged via
777    /// `eprintln!`) so a failing tier doesn't break the reactive graph.
778    pub fn attach_storage(
779        &self,
780        core: &Core,
781        sinks: Vec<Arc<dyn AppendLogSink<T>>>,
782        preload: bool,
783    ) -> Result<AttachStorageHandle<T>, AttachStorageError> {
784        // **D269 part-2 parity (next batch 2026-05-21)** — reject
785        // `Overwrite` sinks loud (TS `attachStorage` throws on the
786        // same shape). Delta-shipping into overwrite silently
787        // truncates the log to the last batch — the memo:Re P1
788        // hazard class.
789        for (sink_index, sink) in sinks.iter().enumerate() {
790            if sink.mode() == AppendLogMode::Overwrite {
791                return Err(AttachStorageError::OverwriteSinkRejected { sink_index });
792            }
793        }
794
795        if sinks.is_empty() {
796            let sub = core.subscribe(self.node_id, Rc::new(|_| {}));
797            return Ok(AttachStorageHandle {
798                sub: ReactiveSub {
799                    subs: vec![(self.node_id, sub)],
800                },
801                sinks,
802            });
803        }
804
805        if preload {
806            for sink in &sinks {
807                if let Ok(entries) = sink.load_entries() {
808                    if !entries.is_empty() {
809                        self.append_many(core, entries);
810                        break;
811                    }
812                }
813            }
814        }
815
816        let current_size = self.size();
817        // All sinks start delivered at current_size (post-preload snapshot).
818        // The subscriber only sees future emissions, so no double-write.
819        let delivered: Vec<Arc<Mutex<usize>>> = sinks
820            .iter()
821            .map(|_| Arc::new(Mutex::new(current_size)))
822            .collect();
823
824        let inner = Arc::clone(&self.inner);
825        let sinks_for_sink = sinks.clone();
826        let delivered_arc = delivered;
827
828        let sub = core.subscribe(
829            self.node_id,
830            Rc::new(move |msgs| {
831                if msgs.iter().any(|m| matches!(m, Message::Data(_))) {
832                    let guard = inner.lock();
833                    let data = guard.backend.to_vec();
834                    drop(guard);
835
836                    for (i, sink) in sinks_for_sink.iter().enumerate() {
837                        let mut del = delivered_arc[i].lock();
838                        let result = match data.len().cmp(&*del) {
839                            std::cmp::Ordering::Greater => sink.append_entries(&data[*del..]),
840                            std::cmp::Ordering::Less => sink.append_entries(&data),
841                            std::cmp::Ordering::Equal => continue,
842                        };
843                        match result {
844                            Ok(()) => *del = data.len(),
845                            Err(e) => eprintln!("attach_storage sink[{i}] error: {e}"),
846                        }
847                    }
848                }
849            }),
850        );
851
852        Ok(AttachStorageHandle {
853            sub: ReactiveSub {
854                subs: vec![(self.node_id, sub)],
855            },
856            sinks,
857        })
858    }
859}
860
861// ---------------------------------------------------------------------------
862// ReactiveList
863// ---------------------------------------------------------------------------
864
865/// Reactive ordered list.
866///
867/// Emits `Vec<T>` snapshots via a Core state node on every mutation.
868pub struct ReactiveList<T: Clone + Send + Sync + 'static> {
869    inner: Mutex<ListInner<T>>,
870    emitter: EmitHandle<Vec<T>>,
871    pub node_id: NodeId,
872}
873
874struct ListInner<T: Clone + Send + Sync + 'static> {
875    backend: Box<dyn ListBackend<T>>,
876    mutation_log: Option<Vec<BaseChange<ListChange<T>>>>,
877    structure_name: String,
878}
879
880impl<T: Clone + Send + Sync + 'static> ListInner<T> {
881    fn record(&mut self, change: ListChange<T>, version: Version) {
882        if let Some(log) = &mut self.mutation_log {
883            log.push(BaseChange {
884                structure: self.structure_name.clone(),
885                version,
886                t_ns: wall_clock_ns(),
887                seq: None,
888                lifecycle: Lifecycle::Data,
889                change,
890            });
891        }
892    }
893}
894
895/// Options for constructing a [`ReactiveList`].
896pub struct ReactiveListOptions<T: Clone + Send + Sync + 'static> {
897    pub name: String,
898    pub backend: Option<Box<dyn ListBackend<T>>>,
899    pub mutation_log: bool,
900}
901
902impl<T: Clone + Send + Sync + 'static> Default for ReactiveListOptions<T> {
903    fn default() -> Self {
904        Self {
905            name: "reactiveList".into(),
906            backend: None,
907            mutation_log: false,
908        }
909    }
910}
911
912impl<T: Clone + Send + Sync + 'static> ReactiveList<T> {
913    #[must_use]
914    pub fn new(core: &Core, intern: InternFn<Vec<T>>, opts: ReactiveListOptions<T>) -> Self {
915        let node_id = core
916            .register_state(HandleId::new(0), false)
917            .expect("register_state for ReactiveList");
918        let backend: Box<dyn ListBackend<T>> = opts
919            .backend
920            .unwrap_or_else(|| Box::new(VecListBackend::new()));
921        let mutation_log = if opts.mutation_log {
922            Some(Vec::new())
923        } else {
924            None
925        };
926        let inner = ListInner {
927            backend,
928            mutation_log,
929            structure_name: opts.name,
930        };
931        Self {
932            inner: Mutex::new(inner),
933            emitter: EmitHandle {
934                node_id,
935                intern,
936                version: AtomicU64::new(0),
937            },
938            node_id,
939        }
940    }
941
942    #[must_use]
943    pub fn size(&self) -> usize {
944        self.inner.lock().backend.size()
945    }
946
947    #[must_use]
948    pub fn at(&self, index: i64) -> Option<T> {
949        self.inner.lock().backend.at(index)
950    }
951
952    pub fn append(&self, core: &Core, value: T) {
953        let (snapshot, change) = {
954            let mut inner = self.inner.lock();
955            let change = inner.mutation_log.is_some().then(|| ListChange::Append {
956                value: value.clone(),
957            });
958            inner.backend.append(value);
959            (inner.backend.to_vec(), change)
960        };
961        let version = self.emitter.emit(core, snapshot);
962        if let Some(change) = change {
963            self.inner.lock().record(change, version);
964        }
965    }
966
967    pub fn append_many(&self, core: &Core, values: Vec<T>) {
968        if values.is_empty() {
969            return;
970        }
971        let (snapshot, change) = {
972            let mut inner = self.inner.lock();
973            let change = inner
974                .mutation_log
975                .is_some()
976                .then(|| ListChange::AppendMany {
977                    values: values.clone(),
978                });
979            inner.backend.append_many(values);
980            (inner.backend.to_vec(), change)
981        };
982        let version = self.emitter.emit(core, snapshot);
983        if let Some(change) = change {
984            self.inner.lock().record(change, version);
985        }
986    }
987
988    /// Insert `value` at `index`. Returns `Err(IndexOutOfBounds)` if
989    /// `index > self.size()`.
990    pub fn insert(&self, core: &Core, index: usize, value: T) -> Result<(), IndexOutOfBounds> {
991        let (snapshot, change) = {
992            let mut inner = self.inner.lock();
993            if index > inner.backend.size() {
994                return Err(IndexOutOfBounds);
995            }
996            let change = inner.mutation_log.is_some().then(|| ListChange::Insert {
997                index,
998                value: value.clone(),
999            });
1000            inner.backend.insert(index, value);
1001            (inner.backend.to_vec(), change)
1002        };
1003        let version = self.emitter.emit(core, snapshot);
1004        if let Some(change) = change {
1005            self.inner.lock().record(change, version);
1006        }
1007        Ok(())
1008    }
1009
1010    /// Insert `values` starting at `index`. Returns `Err(IndexOutOfBounds)` if
1011    /// `index > self.size()`.
1012    pub fn insert_many(
1013        &self,
1014        core: &Core,
1015        index: usize,
1016        values: Vec<T>,
1017    ) -> Result<(), IndexOutOfBounds> {
1018        if values.is_empty() {
1019            return Ok(());
1020        }
1021        let (snapshot, change) = {
1022            let mut inner = self.inner.lock();
1023            if index > inner.backend.size() {
1024                return Err(IndexOutOfBounds);
1025            }
1026            let change = inner
1027                .mutation_log
1028                .is_some()
1029                .then(|| ListChange::InsertMany {
1030                    index,
1031                    values: values.clone(),
1032                });
1033            inner.backend.insert_many(index, values);
1034            (inner.backend.to_vec(), change)
1035        };
1036        let version = self.emitter.emit(core, snapshot);
1037        if let Some(change) = change {
1038            self.inner.lock().record(change, version);
1039        }
1040        Ok(())
1041    }
1042
1043    /// Remove and return element at `index`. Negative indices count from end.
1044    /// Returns `None` if index is out of range.
1045    pub fn pop(&self, core: &Core, index: i64) -> Option<T> {
1046        let (value, snapshot, change) = {
1047            let mut inner = self.inner.lock();
1048            let value = inner.backend.pop(index)?;
1049            let change = inner.mutation_log.is_some().then(|| ListChange::Pop {
1050                index,
1051                value: value.clone(),
1052            });
1053            let snapshot = inner.backend.to_vec();
1054            (value, snapshot, change)
1055        };
1056        let version = self.emitter.emit(core, snapshot);
1057        if let Some(change) = change {
1058            self.inner.lock().record(change, version);
1059        }
1060        Some(value)
1061    }
1062
1063    pub fn clear(&self, core: &Core) {
1064        let (snapshot, count) = {
1065            let mut inner = self.inner.lock();
1066            let count = inner.backend.clear();
1067            if count == 0 {
1068                return;
1069            }
1070            (inner.backend.to_vec(), count)
1071        };
1072        let version = self.emitter.emit(core, snapshot);
1073        self.inner
1074            .lock()
1075            .record(ListChange::Clear { count }, version);
1076    }
1077
1078    #[must_use]
1079    pub fn to_vec(&self) -> Vec<T> {
1080        self.inner.lock().backend.to_vec()
1081    }
1082
1083    #[must_use]
1084    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<ListChange<T>>>> {
1085        self.inner.lock().mutation_log.clone()
1086    }
1087}
1088
1089// ---------------------------------------------------------------------------
1090// ReactiveMap
1091// ---------------------------------------------------------------------------
1092
/// Reactive key-value map.
///
/// Emits `Vec<(K, V)>` snapshots via a Core state node on every mutation.
/// (Vec of pairs rather than HashMap for serialization stability.)
pub struct ReactiveMap<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Backend, mutation log and TTL / LRU / retention bookkeeping, all
    /// guarded by a single mutex.
    inner: Mutex<MapInner<K, V>>,
    /// Emission handle for `node_id`; kept outside `inner` so emitting does
    /// not require the lock.
    emitter: EmitHandle<Vec<(K, V)>>,
    /// Core state node on which snapshots are emitted.
    pub node_id: NodeId,
}
1106
/// Score-based retention policy for [`ReactiveMap`].
///
/// After every mutation, entries are scored and those below `archive_threshold`
/// (or exceeding `max_size` by ascending score) are archived (deleted).
/// Mutually exclusive with top-level `max_size` (LRU).
///
/// At least one of `archive_threshold` / `max_size` must be set; the map
/// constructor rejects a policy with neither.
pub struct RetentionPolicy<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Score function — higher is kept.
    pub score: Arc<dyn Fn(&K, &V) -> f64 + Send + Sync>,
    /// Entries scoring below this threshold are archived.
    pub archive_threshold: Option<f64>,
    /// Maximum number of live entries. Lowest-scored entries archived first.
    pub max_size: Option<usize>,
    /// Called for each archived entry with its key, value and score.
    pub on_archive: Option<Arc<dyn Fn(&K, &V, f64) + Send + Sync>>,
}
1126
/// Lock-protected state of a [`ReactiveMap`]: the storage backend plus the
/// bookkeeping for mutation logging, TTL expiry, LRU eviction and
/// score-based retention.
struct MapInner<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Storage implementation holding the live entries.
    backend: Box<dyn MapBackend<K, V>>,
    /// Recorded changes; `None` when mutation logging is disabled.
    mutation_log: Option<Vec<BaseChange<MapChange<K, V>>>>,
    /// Name copied into the `structure` field of every logged change.
    structure_name: String,
    /// Per-key expiry timestamps (monotonic ns). Present only when TTL is enabled.
    ttl_expiry: HashMap<K, u64>,
    /// Default TTL in nanoseconds. None = no TTL.
    default_ttl_ns: Option<u64>,
    /// LRU access order (most recently used at end). Present only when LRU is enabled.
    lru_order: Vec<K>,
    /// Maximum entries for LRU eviction. None = no LRU cap.
    lru_max_size: Option<usize>,
    /// Score-based retention policy.
    retention: Option<RetentionPolicy<K, V>>,
}
1146
1147impl<K, V> MapInner<K, V>
1148where
1149    K: Clone + Eq + Hash + Send + Sync + 'static,
1150    V: Clone + Send + Sync + 'static,
1151{
1152    fn record(&mut self, change: MapChange<K, V>, version: Version) {
1153        if let Some(log) = &mut self.mutation_log {
1154            log.push(BaseChange {
1155                structure: self.structure_name.clone(),
1156                version,
1157                t_ns: wall_clock_ns(),
1158                seq: None,
1159                lifecycle: Lifecycle::Data,
1160                change,
1161            });
1162        }
1163    }
1164
1165    /// Prune expired keys. Returns deleted (key, previous_value) pairs.
1166    fn prune_expired_inner(&mut self) -> Vec<(K, V)> {
1167        if self.ttl_expiry.is_empty() {
1168            return vec![];
1169        }
1170        let now = monotonic_ns();
1171        let expired_keys: Vec<K> = self
1172            .ttl_expiry
1173            .iter()
1174            .filter(|(_, &exp)| now >= exp)
1175            .map(|(k, _)| k.clone())
1176            .collect();
1177        let mut expired = Vec::new();
1178        for k in expired_keys {
1179            if let Some(prev) = self.backend.get(&k) {
1180                self.backend.delete(&k);
1181                self.ttl_expiry.remove(&k);
1182                self.lru_remove(&k);
1183                expired.push((k, prev));
1184            }
1185        }
1186        expired
1187    }
1188
1189    /// Apply retention policy. Returns archived keys with scores.
1190    fn apply_retention_inner(&mut self) -> Vec<(K, V, f64)> {
1191        // Extract retention config values to avoid borrow conflict.
1192        let (score_fn, archive_threshold, max_size) = match &self.retention {
1193            Some(r) => (Arc::clone(&r.score), r.archive_threshold, r.max_size),
1194            None => return vec![],
1195        };
1196        let entries = self.backend.to_vec();
1197        if entries.is_empty() {
1198            return vec![];
1199        }
1200        let mut scored: Vec<(K, V, f64)> = entries
1201            .into_iter()
1202            .map(|(k, v)| {
1203                let s = (score_fn)(&k, &v);
1204                (k, v, s)
1205            })
1206            .collect();
1207        scored.sort_by(|a, b| a.2.total_cmp(&b.2));
1208
1209        let mut archived = Vec::new();
1210        if let Some(threshold) = archive_threshold {
1211            while let Some(entry) = scored.first() {
1212                if entry.2 < threshold {
1213                    let (k, v, s) = scored.remove(0);
1214                    self.backend.delete(&k);
1215                    self.ttl_expiry.remove(&k);
1216                    self.lru_remove(&k);
1217                    archived.push((k, v, s));
1218                } else {
1219                    break;
1220                }
1221            }
1222        }
1223        if let Some(max) = max_size {
1224            while scored.len() > max {
1225                let (k, v, s) = scored.remove(0);
1226                self.backend.delete(&k);
1227                self.ttl_expiry.remove(&k);
1228                self.lru_remove(&k);
1229                archived.push((k, v, s));
1230            }
1231        }
1232        archived
1233    }
1234
1235    /// Touch key in LRU order (move to end). Does NOT emit.
1236    fn lru_touch(&mut self, key: &K) {
1237        if self.lru_max_size.is_none() {
1238            return;
1239        }
1240        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1241            self.lru_order.remove(pos);
1242            self.lru_order.push(key.clone());
1243        }
1244    }
1245
1246    /// Remove key from LRU order.
1247    fn lru_remove(&mut self, key: &K) {
1248        if self.lru_max_size.is_none() {
1249            return;
1250        }
1251        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
1252            self.lru_order.remove(pos);
1253        }
1254    }
1255
1256    /// Evict LRU entries exceeding max_size. Returns evicted (key, previous_value) pairs.
1257    fn lru_evict(&mut self) -> Vec<(K, V)> {
1258        let Some(max) = self.lru_max_size else {
1259            return vec![];
1260        };
1261        let mut evicted = Vec::new();
1262        while self.backend.size() > max && !self.lru_order.is_empty() {
1263            let victim = self.lru_order.remove(0);
1264            if let Some(prev) = self.backend.get(&victim) {
1265                self.backend.delete(&victim);
1266                self.ttl_expiry.remove(&victim);
1267                evicted.push((victim, prev));
1268            }
1269        }
1270        evicted
1271    }
1272
1273    /// Record TTL for a key. Per-call `ttl` (seconds) overrides `default_ttl_ns`.
1274    fn set_ttl_with(&mut self, key: &K, ttl: Option<f64>) {
1275        let ttl_ns = match ttl {
1276            Some(secs) => Some((secs * 1_000_000_000.0) as u64),
1277            None => self.default_ttl_ns,
1278        };
1279        if let Some(ns) = ttl_ns {
1280            self.ttl_expiry.insert(key.clone(), monotonic_ns() + ns);
1281        }
1282    }
1283}
1284
/// Options for constructing a [`ReactiveMap`].
pub struct ReactiveMapOptions<K, V>
where
    K: Clone + Eq + Hash + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Structure name stamped on every mutation-log entry.
    pub name: String,
    /// Custom storage backend; `None` selects `HashMapBackend`.
    pub backend: Option<Box<dyn MapBackend<K, V>>>,
    /// Whether mutations are recorded in an in-memory change log.
    pub mutation_log: bool,
    /// Default TTL in seconds. Applied to all `set`/`set_many` calls.
    /// Must be > 0 if set.
    pub default_ttl: Option<f64>,
    /// LRU cap. Mutually exclusive with `retention`.
    pub max_size: Option<usize>,
    /// Score-based retention policy. Mutually exclusive with `max_size`.
    pub retention: Option<RetentionPolicy<K, V>>,
}
1302
1303impl<K, V> Default for ReactiveMapOptions<K, V>
1304where
1305    K: Clone + Eq + Hash + Send + Sync + 'static,
1306    V: Clone + Send + Sync + 'static,
1307{
1308    fn default() -> Self {
1309        Self {
1310            name: "reactiveMap".into(),
1311            backend: None,
1312            mutation_log: false,
1313            default_ttl: None,
1314            max_size: None,
1315            retention: None,
1316        }
1317    }
1318}
1319
/// Configuration validation error for [`ReactiveMap`].
///
/// Wraps the human-readable reason; the same text is what `Display` prints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MapConfigError(pub String);
1323
1324impl std::fmt::Display for MapConfigError {
1325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1326        f.write_str(&self.0)
1327    }
1328}
1329
// Marker impl with all-default methods: lets `MapConfigError` be used wherever
// a `std::error::Error` is expected. It relies on the `Debug` derive and the
// `Display` impl above.
impl std::error::Error for MapConfigError {}
1331
1332impl<K, V> ReactiveMap<K, V>
1333where
1334    K: Clone + Eq + Hash + Send + Sync + 'static,
1335    V: Clone + Send + Sync + 'static,
1336{
1337    /// Create a new reactive map.
1338    ///
1339    /// # Errors
1340    /// Returns `MapConfigError` if both `max_size` (LRU) and `retention` are set.
1341    pub fn new(
1342        core: &Core,
1343        intern: InternFn<Vec<(K, V)>>,
1344        opts: ReactiveMapOptions<K, V>,
1345    ) -> Result<Self, MapConfigError> {
1346        if opts.max_size.is_some() && opts.retention.is_some() {
1347            return Err(MapConfigError(
1348                "max_size (LRU) and retention are mutually exclusive".into(),
1349            ));
1350        }
1351        if let Some(ref r) = opts.retention {
1352            if r.archive_threshold.is_none() && r.max_size.is_none() {
1353                return Err(MapConfigError(
1354                    "retention requires at least one of archive_threshold or max_size".into(),
1355                ));
1356            }
1357        }
1358        if let Some(ttl) = opts.default_ttl {
1359            if ttl <= 0.0 {
1360                return Err(MapConfigError("default_ttl must be > 0".into()));
1361            }
1362        }
1363        let node_id = core
1364            .register_state(HandleId::new(0), false)
1365            .expect("register_state for ReactiveMap");
1366        let backend: Box<dyn MapBackend<K, V>> = opts
1367            .backend
1368            .unwrap_or_else(|| Box::new(HashMapBackend::new()));
1369        let mutation_log = if opts.mutation_log {
1370            Some(Vec::new())
1371        } else {
1372            None
1373        };
1374        let default_ttl_ns = opts.default_ttl.map(|secs| (secs * 1_000_000_000.0) as u64);
1375        let inner = MapInner {
1376            backend,
1377            mutation_log,
1378            structure_name: opts.name,
1379            ttl_expiry: HashMap::new(),
1380            default_ttl_ns,
1381            lru_order: Vec::new(),
1382            lru_max_size: opts.max_size,
1383            retention: opts.retention,
1384        };
1385        Ok(Self {
1386            inner: Mutex::new(inner),
1387            emitter: EmitHandle {
1388                node_id,
1389                intern,
1390                version: AtomicU64::new(0),
1391            },
1392            node_id,
1393        })
1394    }
1395
1396    #[must_use]
1397    pub fn size(&self) -> usize {
1398        self.inner.lock().backend.size()
1399    }
1400
1401    /// Check if key exists. Expired keys are pruned (observable side-effect).
1402    /// LRU touch: live key is marked as most-recently-used (no emission).
1403    pub fn has(&self, core: &Core, key: &K) -> bool {
1404        let (has, expired) = {
1405            let mut inner = self.inner.lock();
1406            let mut target_expired = false;
1407            // Check TTL expiry first.
1408            if inner.default_ttl_ns.is_some() {
1409                if let Some(&exp) = inner.ttl_expiry.get(key) {
1410                    if monotonic_ns() >= exp {
1411                        target_expired = true;
1412                    }
1413                }
1414            }
1415            // Prune all expired keys (including the target if expired).
1416            let mut expired = inner.prune_expired_inner();
1417            if target_expired && !expired.iter().any(|(k, _)| k == key) {
1418                // Target was expired but prune_expired_inner didn't catch it
1419                // (race with monotonic_ns). Delete it explicitly.
1420                if let Some(prev) = inner.backend.get(key) {
1421                    inner.backend.delete(key);
1422                    inner.ttl_expiry.remove(key);
1423                    inner.lru_remove(key);
1424                    expired.push((key.clone(), prev));
1425                }
1426            }
1427            let has = if target_expired {
1428                false
1429            } else {
1430                let h = inner.backend.has(key);
1431                if h {
1432                    inner.lru_touch(key);
1433                }
1434                h
1435            };
1436            (has, expired)
1437        };
1438        if !expired.is_empty() {
1439            let snapshot = self.inner.lock().backend.to_vec();
1440            let version = self.emitter.emit(core, snapshot);
1441            let mut inner = self.inner.lock();
1442            for (k, prev) in expired {
1443                inner.record(
1444                    MapChange::Delete {
1445                        key: k,
1446                        previous: prev,
1447                        reason: DeleteReason::Expired,
1448                    },
1449                    version.clone(),
1450                );
1451            }
1452        }
1453        has
1454    }
1455
1456    /// Get value by key. Expired keys return `None` (observable side-effect).
1457    /// LRU touch: live key is marked as most-recently-used (no emission).
1458    pub fn get(&self, core: &Core, key: &K) -> Option<V> {
1459        let (value, expired) = {
1460            let mut inner = self.inner.lock();
1461            let mut target_expired = false;
1462            if inner.default_ttl_ns.is_some() {
1463                if let Some(&exp) = inner.ttl_expiry.get(key) {
1464                    if monotonic_ns() >= exp {
1465                        target_expired = true;
1466                    }
1467                }
1468            }
1469            // Prune all expired keys (including the target if expired).
1470            let mut expired = inner.prune_expired_inner();
1471            if target_expired && !expired.iter().any(|(k, _)| k == key) {
1472                // Target was expired but prune_expired_inner didn't catch it.
1473                if let Some(prev) = inner.backend.get(key) {
1474                    inner.backend.delete(key);
1475                    inner.ttl_expiry.remove(key);
1476                    inner.lru_remove(key);
1477                    expired.push((key.clone(), prev));
1478                }
1479            }
1480            let value = if target_expired {
1481                None
1482            } else {
1483                let v = inner.backend.get(key);
1484                if v.is_some() {
1485                    inner.lru_touch(key);
1486                }
1487                v
1488            };
1489            (value, expired)
1490        };
1491        if !expired.is_empty() {
1492            let snapshot = self.inner.lock().backend.to_vec();
1493            let version = self.emitter.emit(core, snapshot);
1494            let mut inner = self.inner.lock();
1495            for (k, prev) in expired {
1496                inner.record(
1497                    MapChange::Delete {
1498                        key: k,
1499                        previous: prev,
1500                        reason: DeleteReason::Expired,
1501                    },
1502                    version.clone(),
1503                );
1504            }
1505        }
1506        value
1507    }
1508
1509    pub fn set(&self, core: &Core, key: K, value: V) {
1510        self.set_with_ttl(core, key, value, None);
1511    }
1512
1513    /// Set a key with an optional per-call TTL override (seconds).
1514    ///
1515    /// # Panics
1516    /// Panics if `ttl` is `Some` with a non-positive or non-finite value.
1517    pub fn set_with_ttl(&self, core: &Core, key: K, value: V, ttl: Option<f64>) {
1518        if let Some(t) = ttl {
1519            assert!(
1520                t > 0.0 && t.is_finite(),
1521                "per-call ttl must be positive and finite"
1522            );
1523        }
1524        let (snapshot, change, eviction_changes) = {
1525            let mut inner = self.inner.lock();
1526            let expired = inner.prune_expired_inner();
1527            let change = inner.mutation_log.is_some().then(|| MapChange::Set {
1528                key: key.clone(),
1529                value: value.clone(),
1530            });
1531            inner.set_ttl_with(&key, ttl);
1532            inner.lru_remove(&key);
1533            if inner.lru_max_size.is_some() {
1534                inner.lru_order.push(key.clone());
1535            }
1536            inner.backend.set(key, value);
1537            let evicted = inner.lru_evict();
1538            let archived = inner.apply_retention_inner();
1539            let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1540            for (k, prev) in expired {
1541                eviction_changes.push((k, prev, DeleteReason::Expired));
1542            }
1543            for (k, prev) in evicted {
1544                eviction_changes.push((k, prev, DeleteReason::LruEvict));
1545            }
1546            for (k, v, s) in &archived {
1547                if let Some(on_archive) =
1548                    &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1549                {
1550                    on_archive(k, v, *s);
1551                }
1552                eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1553            }
1554            (inner.backend.to_vec(), change, eviction_changes)
1555        };
1556        let version = self.emitter.emit(core, snapshot);
1557        if change.is_some() || !eviction_changes.is_empty() {
1558            let mut inner = self.inner.lock();
1559            if let Some(change) = change {
1560                inner.record(change, version.clone());
1561            }
1562            for (k, prev, reason) in eviction_changes {
1563                inner.record(
1564                    MapChange::Delete {
1565                        key: k,
1566                        previous: prev,
1567                        reason,
1568                    },
1569                    version.clone(),
1570                );
1571            }
1572        }
1573    }
1574
1575    pub fn set_many(&self, core: &Core, entries: Vec<(K, V)>) {
1576        self.set_many_with_ttl(core, entries, None);
1577    }
1578
1579    /// Batch set with optional per-call TTL override (seconds).
1580    ///
1581    /// # Panics
1582    /// Panics if `ttl` is `Some` with a non-positive or non-finite value.
1583    pub fn set_many_with_ttl(&self, core: &Core, entries: Vec<(K, V)>, ttl: Option<f64>) {
1584        if let Some(t) = ttl {
1585            assert!(
1586                t > 0.0 && t.is_finite(),
1587                "per-call ttl must be positive and finite"
1588            );
1589        }
1590        if entries.is_empty() {
1591            return;
1592        }
1593        let (snapshot, changes, eviction_changes) = {
1594            let mut inner = self.inner.lock();
1595            let expired = inner.prune_expired_inner();
1596            let changes: Option<Vec<MapChange<K, V>>> = inner.mutation_log.is_some().then(|| {
1597                entries
1598                    .iter()
1599                    .map(|(k, v)| MapChange::Set {
1600                        key: k.clone(),
1601                        value: v.clone(),
1602                    })
1603                    .collect()
1604            });
1605            for (k, _) in &entries {
1606                inner.set_ttl_with(k, ttl);
1607                inner.lru_remove(k);
1608                if inner.lru_max_size.is_some() {
1609                    inner.lru_order.push(k.clone());
1610                }
1611            }
1612            inner.backend.set_many(entries);
1613            let evicted = inner.lru_evict();
1614            let archived = inner.apply_retention_inner();
1615            let mut eviction_changes: Vec<(K, V, DeleteReason)> = Vec::new();
1616            for (k, prev) in expired {
1617                eviction_changes.push((k, prev, DeleteReason::Expired));
1618            }
1619            for (k, prev) in evicted {
1620                eviction_changes.push((k, prev, DeleteReason::LruEvict));
1621            }
1622            for (k, v, s) in &archived {
1623                if let Some(on_archive) =
1624                    &inner.retention.as_ref().and_then(|r| r.on_archive.clone())
1625                {
1626                    on_archive(k, v, *s);
1627                }
1628                eviction_changes.push((k.clone(), v.clone(), DeleteReason::Archived));
1629            }
1630            (inner.backend.to_vec(), changes, eviction_changes)
1631        };
1632        let version = self.emitter.emit(core, snapshot);
1633        if changes.is_some() || !eviction_changes.is_empty() {
1634            let mut inner = self.inner.lock();
1635            if let Some(changes) = changes {
1636                for change in changes {
1637                    inner.record(change, version.clone());
1638                }
1639            }
1640            for (k, prev, reason) in eviction_changes {
1641                inner.record(
1642                    MapChange::Delete {
1643                        key: k,
1644                        previous: prev,
1645                        reason,
1646                    },
1647                    version.clone(),
1648                );
1649            }
1650        }
1651    }
1652
1653    pub fn delete(&self, core: &Core, key: &K) {
1654        let (snapshot, previous) = {
1655            let mut inner = self.inner.lock();
1656            let previous = inner.backend.get(key);
1657            if !inner.backend.delete(key) {
1658                return;
1659            }
1660            inner.ttl_expiry.remove(key);
1661            inner.lru_remove(key);
1662            (inner.backend.to_vec(), previous)
1663        };
1664        let version = self.emitter.emit(core, snapshot);
1665        if let Some(prev) = previous {
1666            self.inner.lock().record(
1667                MapChange::Delete {
1668                    key: key.clone(),
1669                    previous: prev,
1670                    reason: DeleteReason::Explicit,
1671                },
1672                version,
1673            );
1674        }
1675    }
1676
1677    pub fn delete_many(&self, core: &Core, keys: &[K]) {
1678        let (snapshot, actually_deleted) = {
1679            let mut inner = self.inner.lock();
1680            let actually_deleted: Vec<(K, V)> = keys
1681                .iter()
1682                .filter_map(|k| inner.backend.get(k).map(|v| (k.clone(), v)))
1683                .collect();
1684            let removed = inner.backend.delete_many(keys);
1685            if removed == 0 {
1686                return;
1687            }
1688            for k in keys {
1689                inner.ttl_expiry.remove(k);
1690                inner.lru_remove(k);
1691            }
1692            (inner.backend.to_vec(), actually_deleted)
1693        };
1694        let version = self.emitter.emit(core, snapshot);
1695        if !actually_deleted.is_empty() {
1696            let mut inner = self.inner.lock();
1697            for (k, prev) in actually_deleted {
1698                inner.record(
1699                    MapChange::Delete {
1700                        key: k,
1701                        previous: prev,
1702                        reason: DeleteReason::Explicit,
1703                    },
1704                    version.clone(),
1705                );
1706            }
1707        }
1708    }
1709
1710    pub fn clear(&self, core: &Core) {
1711        let (snapshot, count) = {
1712            let mut inner = self.inner.lock();
1713            let count = inner.backend.clear();
1714            if count == 0 {
1715                return;
1716            }
1717            inner.ttl_expiry.clear();
1718            inner.lru_order.clear();
1719            (inner.backend.to_vec(), count)
1720        };
1721        let version = self.emitter.emit(core, snapshot);
1722        self.inner
1723            .lock()
1724            .record(MapChange::Clear { count }, version);
1725    }
1726
1727    /// Explicitly prune all expired keys. Returns the number of keys removed.
1728    pub fn prune_expired(&self, core: &Core) -> usize {
1729        let expired = {
1730            let mut inner = self.inner.lock();
1731            inner.prune_expired_inner()
1732        };
1733        if expired.is_empty() {
1734            return 0;
1735        }
1736        let count = expired.len();
1737        let snapshot = self.inner.lock().backend.to_vec();
1738        let version = self.emitter.emit(core, snapshot);
1739        let mut inner = self.inner.lock();
1740        for (k, prev) in expired {
1741            inner.record(
1742                MapChange::Delete {
1743                    key: k,
1744                    previous: prev,
1745                    reason: DeleteReason::Expired,
1746                },
1747                version.clone(),
1748            );
1749        }
1750        count
1751    }
1752
1753    #[must_use]
1754    pub fn to_vec(&self) -> Vec<(K, V)> {
1755        self.inner.lock().backend.to_vec()
1756    }
1757
1758    #[must_use]
1759    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<MapChange<K, V>>>> {
1760        self.inner.lock().mutation_log.clone()
1761    }
1762}
1763
1764// ---------------------------------------------------------------------------
1765// ReactiveIndex
1766// ---------------------------------------------------------------------------
1767
1768/// Reactive sorted index with primary key lookup and secondary sort order.
1769///
1770/// Emits `Vec<IndexRow<K, V>>` snapshots via a Core state node on every mutation.
1771pub struct ReactiveIndex<K, V>
1772where
1773    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1774    V: Clone + Send + Sync + 'static,
1775{
1776    inner: Mutex<IndexInner<K, V>>,
1777    emitter: EmitHandle<Vec<IndexRow<K, V>>>,
1778    pub node_id: NodeId,
1779}
1780
1781/// User-supplied equality function for upsert idempotency.
1782pub type IndexEqualsFn<K, V> = Arc<dyn Fn(&IndexRow<K, V>, &IndexRow<K, V>) -> bool + Send + Sync>;
1783
1784struct IndexInner<K, V>
1785where
1786    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1787    V: Clone + Send + Sync + 'static,
1788{
1789    backend: Box<dyn IndexBackend<K, V>>,
1790    mutation_log: Option<Vec<BaseChange<IndexChange<K, V>>>>,
1791    structure_name: String,
1792    /// Factory-level equals for upsert idempotency. When set, `upsert` on an
1793    /// existing key compares the existing row with the proposed row; if
1794    /// `equals(existing, proposed)` returns `true`, the upsert is a no-op
1795    /// (no version bump, no emission).
1796    equals: Option<IndexEqualsFn<K, V>>,
1797}
1798
1799impl<K, V> IndexInner<K, V>
1800where
1801    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1802    V: Clone + Send + Sync + 'static,
1803{
1804    fn record(&mut self, change: IndexChange<K, V>, version: Version) {
1805        if let Some(log) = &mut self.mutation_log {
1806            log.push(BaseChange {
1807                structure: self.structure_name.clone(),
1808                version,
1809                t_ns: wall_clock_ns(),
1810                seq: None,
1811                lifecycle: Lifecycle::Data,
1812                change,
1813            });
1814        }
1815    }
1816}
1817
1818/// Per-call upsert options for [`ReactiveIndex::upsert`].
1819pub struct UpsertOptions<K, V>
1820where
1821    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1822    V: Clone + Send + Sync + 'static,
1823{
1824    /// Per-call equals override. Takes precedence over the factory-level equals.
1825    pub equals: Option<IndexEqualsFn<K, V>>,
1826}
1827
1828impl<K, V> Default for UpsertOptions<K, V>
1829where
1830    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1831    V: Clone + Send + Sync + 'static,
1832{
1833    fn default() -> Self {
1834        Self { equals: None }
1835    }
1836}
1837
1838/// Options for constructing a [`ReactiveIndex`].
1839pub struct ReactiveIndexOptions<K, V>
1840where
1841    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1842    V: Clone + Send + Sync + 'static,
1843{
1844    pub name: String,
1845    pub backend: Option<Box<dyn IndexBackend<K, V>>>,
1846    pub mutation_log: bool,
1847    /// Factory-level equals for upsert idempotency.
1848    pub equals: Option<IndexEqualsFn<K, V>>,
1849}
1850
1851impl<K, V> Default for ReactiveIndexOptions<K, V>
1852where
1853    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1854    V: Clone + Send + Sync + 'static,
1855{
1856    fn default() -> Self {
1857        Self {
1858            name: "reactiveIndex".into(),
1859            backend: None,
1860            mutation_log: false,
1861            equals: None,
1862        }
1863    }
1864}
1865
1866impl<K, V> ReactiveIndex<K, V>
1867where
1868    K: Clone + Eq + Hash + Send + Sync + ToString + 'static,
1869    V: Clone + Send + Sync + 'static,
1870{
1871    #[must_use]
1872    pub fn new(
1873        core: &Core,
1874        intern: InternFn<Vec<IndexRow<K, V>>>,
1875        opts: ReactiveIndexOptions<K, V>,
1876    ) -> Self {
1877        let node_id = core
1878            .register_state(HandleId::new(0), false)
1879            .expect("register_state for ReactiveIndex");
1880        let backend: Box<dyn IndexBackend<K, V>> = opts
1881            .backend
1882            .unwrap_or_else(|| Box::new(VecIndexBackend::new()));
1883        let mutation_log = if opts.mutation_log {
1884            Some(Vec::new())
1885        } else {
1886            None
1887        };
1888        let inner = IndexInner {
1889            backend,
1890            mutation_log,
1891            structure_name: opts.name,
1892            equals: opts.equals,
1893        };
1894        Self {
1895            inner: Mutex::new(inner),
1896            emitter: EmitHandle {
1897                node_id,
1898                intern,
1899                version: AtomicU64::new(0),
1900            },
1901            node_id,
1902        }
1903    }
1904
1905    #[must_use]
1906    pub fn size(&self) -> usize {
1907        self.inner.lock().backend.size()
1908    }
1909
1910    #[must_use]
1911    pub fn has(&self, primary: &K) -> bool {
1912        self.inner.lock().backend.has(primary)
1913    }
1914
1915    #[must_use]
1916    pub fn get(&self, primary: &K) -> Option<V> {
1917        self.inner.lock().backend.get(primary)
1918    }
1919
1920    /// Insert or update a row. Returns `true` if inserted (new primary key),
1921    /// `false` if updated or skipped by equals idempotency.
1922    pub fn upsert(&self, core: &Core, primary: K, secondary: String, value: V) -> bool {
1923        self.upsert_with(core, primary, secondary, value, &UpsertOptions::default())
1924    }
1925
1926    /// Insert or update a row with per-call options.
1927    ///
1928    /// If `opts.equals` (or the factory-level equals) returns `true` for the
1929    /// existing row vs the proposed row, the upsert is a no-op (no emission).
1930    pub fn upsert_with(
1931        &self,
1932        core: &Core,
1933        primary: K,
1934        secondary: String,
1935        value: V,
1936        opts: &UpsertOptions<K, V>,
1937    ) -> bool {
1938        let (is_new, snapshot, change) = {
1939            let mut inner = self.inner.lock();
1940            // Check equals idempotency: per-call overrides factory-level.
1941            let eq_fn = opts.equals.as_ref().or(inner.equals.as_ref());
1942            if let Some(eq) = eq_fn {
1943                if let Some(existing_row) = inner.backend.get_row(&primary) {
1944                    let proposed = IndexRow {
1945                        primary: primary.clone(),
1946                        secondary: secondary.clone(),
1947                        value: value.clone(),
1948                    };
1949                    if eq(&existing_row, &proposed) {
1950                        return false;
1951                    }
1952                }
1953            }
1954            let change = inner.mutation_log.is_some().then(|| IndexChange::Upsert {
1955                primary: primary.clone(),
1956                secondary: secondary.clone(),
1957                value: value.clone(),
1958            });
1959            let is_new = inner.backend.upsert(primary, secondary, value);
1960            (is_new, inner.backend.to_ordered(), change)
1961        };
1962        let version = self.emitter.emit(core, snapshot);
1963        if let Some(change) = change {
1964            self.inner.lock().record(change, version);
1965        }
1966        is_new
1967    }
1968
1969    /// Batch upsert. Rows matching the factory-level equals are skipped.
1970    /// If ALL rows are skipped, no emission occurs.
1971    pub fn upsert_many(&self, core: &Core, rows: Vec<(K, String, V)>) {
1972        if rows.is_empty() {
1973            return;
1974        }
1975        let (snapshot, changes) = {
1976            let mut inner = self.inner.lock();
1977            // Filter rows through equals idempotency.
1978            let effective_rows: Vec<(K, String, V)> = if let Some(eq) = &inner.equals {
1979                rows.into_iter()
1980                    .filter(|(pk, sec, val)| {
1981                        if let Some(existing) = inner.backend.get_row(pk) {
1982                            let proposed = IndexRow {
1983                                primary: pk.clone(),
1984                                secondary: sec.clone(),
1985                                value: val.clone(),
1986                            };
1987                            !eq(&existing, &proposed)
1988                        } else {
1989                            true
1990                        }
1991                    })
1992                    .collect()
1993            } else {
1994                rows
1995            };
1996            if effective_rows.is_empty() {
1997                return;
1998            }
1999            let changes: Option<Vec<IndexChange<K, V>>> = inner.mutation_log.is_some().then(|| {
2000                effective_rows
2001                    .iter()
2002                    .map(|(k, s, v)| IndexChange::Upsert {
2003                        primary: k.clone(),
2004                        secondary: s.clone(),
2005                        value: v.clone(),
2006                    })
2007                    .collect()
2008            });
2009            inner.backend.upsert_many(effective_rows);
2010            (inner.backend.to_ordered(), changes)
2011        };
2012        let version = self.emitter.emit(core, snapshot);
2013        if let Some(changes) = changes {
2014            let mut inner = self.inner.lock();
2015            for change in changes {
2016                inner.record(change, version.clone());
2017            }
2018        }
2019    }
2020
2021    pub fn delete(&self, core: &Core, primary: &K) {
2022        let snapshot = {
2023            let mut inner = self.inner.lock();
2024            if !inner.backend.delete(primary) {
2025                return;
2026            }
2027            inner.backend.to_ordered()
2028        };
2029        let version = self.emitter.emit(core, snapshot);
2030        self.inner.lock().record(
2031            IndexChange::Delete {
2032                primary: primary.clone(),
2033            },
2034            version,
2035        );
2036    }
2037
2038    pub fn delete_many(&self, core: &Core, primaries: &[K]) {
2039        let (snapshot, actually_deleted) = {
2040            let mut inner = self.inner.lock();
2041            // Pre-filter to keys that actually exist (P4 fix).
2042            let actually_deleted: Vec<K> = if inner.mutation_log.is_some() {
2043                primaries
2044                    .iter()
2045                    .filter(|k| inner.backend.has(k))
2046                    .cloned()
2047                    .collect()
2048            } else {
2049                vec![]
2050            };
2051            let removed = inner.backend.delete_many(primaries);
2052            if removed == 0 {
2053                return;
2054            }
2055            (inner.backend.to_ordered(), actually_deleted)
2056        };
2057        let version = self.emitter.emit(core, snapshot);
2058        if !actually_deleted.is_empty() {
2059            self.inner.lock().record(
2060                IndexChange::DeleteMany {
2061                    primaries: actually_deleted,
2062                },
2063                version,
2064            );
2065        }
2066    }
2067
2068    pub fn clear(&self, core: &Core) {
2069        let (snapshot, count) = {
2070            let mut inner = self.inner.lock();
2071            let count = inner.backend.clear();
2072            if count == 0 {
2073                return;
2074            }
2075            (inner.backend.to_ordered(), count)
2076        };
2077        let version = self.emitter.emit(core, snapshot);
2078        self.inner
2079            .lock()
2080            .record(IndexChange::Clear { count }, version);
2081    }
2082
2083    #[must_use]
2084    pub fn to_ordered(&self) -> Vec<IndexRow<K, V>> {
2085        self.inner.lock().backend.to_ordered()
2086    }
2087
2088    #[must_use]
2089    pub fn to_primary_map(&self) -> Vec<(K, V)> {
2090        self.inner.lock().backend.to_primary_map()
2091    }
2092
2093    /// Values of all rows whose **primary key** sorts within `[start, end)`
2094    /// (inclusive start, exclusive end), in **ascending primary-key order**
2095    /// (D205). `start >= end` or no matches → empty vec. The `K: Ord` bound
2096    /// is method-scoped so it does not constrain `ReactiveIndex<K, V>` itself.
2097    #[must_use]
2098    pub fn range_by_primary(&self, start: &K, end: &K) -> Vec<V>
2099    where
2100        K: Ord,
2101    {
2102        let mut rows: Vec<(K, V)> = self
2103            .inner
2104            .lock()
2105            .backend
2106            .to_primary_map()
2107            .into_iter()
2108            .filter(|(k, _)| k >= start && k < end)
2109            .collect();
2110        rows.sort_by(|a, b| a.0.cmp(&b.0));
2111        rows.into_iter().map(|(_, v)| v).collect()
2112    }
2113
2114    #[must_use]
2115    pub fn mutation_log_snapshot(&self) -> Option<Vec<BaseChange<IndexChange<K, V>>>> {
2116        self.inner.lock().mutation_log.clone()
2117    }
2118}