Skip to main content

zerodds_dcps/
instance_tracker.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `InstanceTracker` — die zentrale Buchhaltung fuer keyed Topic-
4//! Instanzen, sowohl auf der Writer- als auch auf der Reader-Seite.
5//!
6//! Spec-Referenz: OMG DDS-DCPS 1.4 §2.2.2.4.2.5 (`register_instance`),
7//! §2.2.2.4.2.7 (`unregister_instance`), §2.2.2.4.2.10 (`dispose`),
8//! §2.2.2.4.2.13 (`get_key_value`), §2.2.2.4.2.14 (`lookup_instance`),
9//! §2.2.2.5.1 (`InstanceStateKind`).
10//!
11//! # Datenmodell
12//!
13//! Wir indizieren Instanzen ueber den **16-Byte-KeyHash** (XTypes 1.3
14//! §7.6.8). Pro Instanz tragen wir:
15//! * den vergebenen [`InstanceHandle`],
16//! * den Lifecycle-Zustand ([`InstanceStateKind`]),
17//! * die Generation-Counters (`disposed`, `no_writers`),
18//! * die Anzahl noch registrierter Writer (Reader-seitig),
19//! * den letzten beobachteten Sample-Timestamp.
20//!
21//! KeyHash → Handle ist eine 1:1-Map fuer die Lebenszeit des Trackers
22//! (Handles werden nicht recycled, auch wenn die Instanz disposed +
23//! purged ist; Spec laesst Recycling explizit zu, wir vermeiden es zur
24//! Test-Stabilitaet).
25
26extern crate alloc;
27
28use alloc::collections::BTreeMap;
29use alloc::sync::Arc;
30
31#[cfg(feature = "std")]
32use std::sync::Mutex;
33
34use zerodds_cdr::KEY_HASH_LEN;
35
36use crate::instance_handle::{InstanceHandle, InstanceHandleAllocator};
37use crate::sample_info::InstanceStateKind;
38use crate::time::Time;
39
40/// 16-Byte-KeyHash als Index.
41pub type KeyHash = [u8; KEY_HASH_LEN];
42
43/// Pro-Instanz-Buchhaltung.
44#[derive(Debug, Clone)]
45pub struct InstanceState {
46    /// Lokaler Handle (stabil ueber den Tracker-Lebenszyklus).
47    pub handle: InstanceHandle,
48    /// Aktueller Lifecycle-Zustand.
49    pub kind: InstanceStateKind,
50    /// `NOT_ALIVE_DISPOSED → ALIVE`-Transitions seit erstem Sample.
51    pub disposed_generation_count: i32,
52    /// `NOT_ALIVE_NO_WRITERS → ALIVE`-Transitions seit erstem Sample.
53    pub no_writers_generation_count: i32,
54    /// Anzahl Writer, die diese Instanz aktuell als registriert
55    /// fuehren. Reader-seitig: jeder eingehende Sample erhoeht den
56    /// Counter (falls neuer Writer); jeder unregister-Marker
57    /// dekrementiert. Writer-seitig: 0 oder 1 (eine Writer-Sicht).
58    pub writer_count: u32,
59    /// Wall-Clock-Zeit des zuletzt verarbeiteten Samples (oder None).
60    pub last_sample_timestamp: Option<Time>,
61    /// Reader-side: source_timestamp des letzten ans User-API
62    /// gelieferten Samples dieser Instanz. Spec §2.2.3.12
63    /// TIME_BASED_FILTER: ein neues Sample wird gefiltert wenn
64    /// `t - last_delivered_ts < minimum_separation`.
65    pub last_delivered_ts: Option<Time>,
66    /// Reader-side: Wall-Clock-Zeit, zu der die Instanz in
67    /// `NOT_ALIVE_DISPOSED` uebergegangen ist. Spec §2.2.3.22
68    /// `autopurge_disposed_samples_delay`: Samples werden nach
69    /// Ablauf des Delays purged.
70    pub disposed_at: Option<Time>,
71    /// Reader-side: Wall-Clock-Zeit, zu der die Instanz in
72    /// `NOT_ALIVE_NO_WRITERS` uebergegangen ist. Spec §2.2.3.22
73    /// `autopurge_no_writer_samples_delay`.
74    pub no_writers_at: Option<Time>,
75    /// Reader-side OWNERSHIP=EXCLUSIVE (Spec §2.2.3.10): aktueller
76    /// Eigentuemer-Writer als (GuidLike, Strength). Bei Tie-Breaker
77    /// gewinnt der lexikographisch hoehere `GuidLike`. Bei Liveliness-
78    /// Loss: explizit per `clear_owner` reset.
79    pub current_owner: Option<([u8; 16], i32)>,
80    /// Gespeicherter Key-Holder (Bytes), so kann `get_key_value` ohne
81    /// erneutes Decoden den Key zurueckspielen. Das ist der **PLAIN_CDR2-
82    /// BE**-Stream, also exakt der Input von `compute_key_hash`.
83    pub key_holder: alloc::vec::Vec<u8>,
84    /// View-State auf Reader-Seite. Auf Writer-Seite unbenutzt.
85    pub reader_view_new: bool,
86    /// Anzahl bisheriger Reader-Samples in dieser Instanz (sample_rank-
87    /// Hilfswert fuer den naechsten Read).
88    pub samples_in_cache: u32,
89}
90
91impl InstanceState {
92    fn fresh(handle: InstanceHandle, key_holder: alloc::vec::Vec<u8>) -> Self {
93        Self {
94            handle,
95            kind: InstanceStateKind::Alive,
96            disposed_generation_count: 0,
97            no_writers_generation_count: 0,
98            writer_count: 0,
99            last_sample_timestamp: None,
100            last_delivered_ts: None,
101            disposed_at: None,
102            no_writers_at: None,
103            current_owner: None,
104            key_holder,
105            reader_view_new: true,
106            samples_in_cache: 0,
107        }
108    }
109}
110
111/// Thread-safer Tracker — wird sowohl im DataWriter als auch im
112/// DataReader instanziiert.
113#[derive(Debug)]
114pub struct InstanceTracker {
115    inner: Arc<Mutex<TrackerInner>>,
116    allocator: Arc<InstanceHandleAllocator>,
117}
118
119#[derive(Debug, Default)]
120struct TrackerInner {
121    by_keyhash: BTreeMap<KeyHash, InstanceState>,
122    handle_to_keyhash: BTreeMap<InstanceHandle, KeyHash>,
123}
124
125impl Default for InstanceTracker {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131impl InstanceTracker {
132    /// Neuer Tracker mit eigenem [`InstanceHandleAllocator`].
133    #[must_use]
134    pub fn new() -> Self {
135        Self {
136            inner: Arc::new(Mutex::new(TrackerInner::default())),
137            allocator: Arc::new(InstanceHandleAllocator::new()),
138        }
139    }
140
141    /// Tracker mit shared Allocator (z.B. wenn Writer und Reader im
142    /// selben Participant ihre Handles aus demselben Pool ziehen).
143    #[must_use]
144    pub fn with_allocator(allocator: Arc<InstanceHandleAllocator>) -> Self {
145        Self {
146            inner: Arc::new(Mutex::new(TrackerInner::default())),
147            allocator,
148        }
149    }
150
151    /// Registriert die Instanz, wenn sie noch nicht bekannt ist; sonst
152    /// reaktiviert sie nur (Spec §2.2.2.4.2.5).
153    ///
154    /// Liefert immer den (stabilen) [`InstanceHandle`] zurueck.
155    pub fn register(
156        &self,
157        keyhash: KeyHash,
158        key_holder: alloc::vec::Vec<u8>,
159        timestamp: Option<Time>,
160    ) -> InstanceHandle {
161        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
162        let entry = g.by_keyhash.entry(keyhash).or_insert_with(|| {
163            let h = self.allocator.allocate();
164            InstanceState::fresh(h, key_holder.clone())
165        });
166        // Reaktivierung von NOT_ALIVE → ALIVE produziert einen
167        // Generation-Bump (Spec §2.2.2.5.1.7/8).
168        match entry.kind {
169            InstanceStateKind::NotAliveDisposed => {
170                entry.disposed_generation_count = entry.disposed_generation_count.saturating_add(1);
171                entry.kind = InstanceStateKind::Alive;
172            }
173            InstanceStateKind::NotAliveNoWriters => {
174                entry.no_writers_generation_count =
175                    entry.no_writers_generation_count.saturating_add(1);
176                entry.kind = InstanceStateKind::Alive;
177            }
178            InstanceStateKind::Alive => {}
179        }
180        entry.writer_count = entry.writer_count.saturating_add(1);
181        if let Some(ts) = timestamp {
182            entry.last_sample_timestamp = Some(ts);
183        }
184        let handle = entry.handle;
185        g.handle_to_keyhash.insert(handle, keyhash);
186        handle
187    }
188
189    /// Spec §2.2.3.12 TIME_BASED_FILTER — entscheidet, ob ein
190    /// Sample mit `sample_ts` ans User-API geliefert werden darf.
191    /// `false` wenn `t - last_delivered_ts < min_separation` (drop);
192    /// `true` sonst (deliver).
193    ///
194    /// Bei unbekannter Instanz oder erstem Sample (kein
195    /// `last_delivered_ts`) ist immer `true`.
196    #[must_use]
197    pub fn should_deliver_under_time_based_filter(
198        &self,
199        keyhash: &KeyHash,
200        sample_ts: Time,
201        min_separation_nanos: u128,
202    ) -> bool {
203        if min_separation_nanos == 0 {
204            return true;
205        }
206        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
207        let Some(s) = g.by_keyhash.get(keyhash) else {
208            return true;
209        };
210        let Some(last) = s.last_delivered_ts else {
211            return true;
212        };
213        // Nanos-Differenz; bei Sample-im-Vergangenheit (clock skew) —
214        // niemals filtern, der Caller hat ein Reorder-Problem aber
215        // nicht ein Filter-Problem.
216        let last_nanos = u128::from(u64::try_from(last.sec).unwrap_or(0)) * 1_000_000_000
217            + u128::from(last.nanosec);
218        let sample_nanos = u128::from(u64::try_from(sample_ts.sec).unwrap_or(0)) * 1_000_000_000
219            + u128::from(sample_ts.nanosec);
220        if sample_nanos < last_nanos {
221            return true;
222        }
223        sample_nanos - last_nanos >= min_separation_nanos
224    }
225
226    /// Spec §2.2.3.18 DESTINATION_ORDER — entscheidet, ob ein Sample
227    /// mit `source_ts` ans User-API geliefert werden darf.
228    /// `BY_RECEPTION_TIMESTAMP`: immer `true`. `BY_SOURCE_TIMESTAMP`:
229    /// nur wenn `source_ts` strikt groesser als `last_delivered_ts`
230    /// dieser Instanz ist (Tie-Breaker bei gleichem Timestamp via
231    /// Writer-GUID kommt im typisierten Pfad).
232    #[must_use]
233    pub fn should_deliver_under_destination_order(
234        &self,
235        keyhash: &KeyHash,
236        source_ts: Time,
237        by_source_timestamp: bool,
238    ) -> bool {
239        if !by_source_timestamp {
240            return true;
241        }
242        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
243        let Some(s) = g.by_keyhash.get(keyhash) else {
244            return true;
245        };
246        let Some(last) = s.last_delivered_ts else {
247            return true;
248        };
249        let last_nanos = u128::from(u64::try_from(last.sec).unwrap_or(0)) * 1_000_000_000
250            + u128::from(last.nanosec);
251        let src_nanos = u128::from(u64::try_from(source_ts.sec).unwrap_or(0)) * 1_000_000_000
252            + u128::from(source_ts.nanosec);
253        src_nanos > last_nanos
254    }
255
256    /// Markiert dass ein Sample mit `sample_ts` ans User-API geliefert
257    /// wurde (Spec §2.2.3.12 — fuer naechste Filter-Entscheidung).
258    pub fn record_delivery(&self, keyhash: &KeyHash, sample_ts: Time) {
259        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
260        if let Some(s) = g.by_keyhash.get_mut(keyhash) {
261            s.last_delivered_ts = Some(sample_ts);
262        }
263    }
264
265    /// Lookup ohne Mutation (Spec §2.2.2.4.2.14 `lookup_instance`).
266    #[must_use]
267    pub fn lookup(&self, keyhash: &KeyHash) -> Option<InstanceHandle> {
268        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
269        g.by_keyhash.get(keyhash).map(|s| s.handle)
270    }
271
272    /// Liefert eine Kopie des State-Snapshots fuer einen Handle.
273    #[must_use]
274    pub fn get_by_handle(&self, handle: InstanceHandle) -> Option<InstanceState> {
275        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
276        let kh = g.handle_to_keyhash.get(&handle)?;
277        g.by_keyhash.get(kh).cloned()
278    }
279
280    /// Liefert eine Kopie des State-Snapshots fuer einen KeyHash.
281    #[must_use]
282    pub fn get_by_keyhash(&self, keyhash: &KeyHash) -> Option<InstanceState> {
283        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
284        g.by_keyhash.get(keyhash).cloned()
285    }
286
287    /// Liefert den Key-Holder-Bytes-Stream fuer einen Handle (Spec
288    /// §2.2.2.4.2.13 `get_key_value`).
289    #[must_use]
290    pub fn get_key_holder(&self, handle: InstanceHandle) -> Option<alloc::vec::Vec<u8>> {
291        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
292        let kh = g.handle_to_keyhash.get(&handle)?;
293        g.by_keyhash.get(kh).map(|s| s.key_holder.clone())
294    }
295
296    /// Markiert die Instanz als `NOT_ALIVE_DISPOSED` (Spec
297    /// §2.2.2.4.2.10 `dispose`).
298    ///
299    /// Liefert `false` wenn die Instanz nicht bekannt ist.
300    pub fn dispose(&self, handle: InstanceHandle, timestamp: Option<Time>) -> bool {
301        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
302        let Some(kh) = g.handle_to_keyhash.get(&handle).copied() else {
303            return false;
304        };
305        if let Some(s) = g.by_keyhash.get_mut(&kh) {
306            s.kind = InstanceStateKind::NotAliveDisposed;
307            if let Some(ts) = timestamp {
308                s.last_sample_timestamp = Some(ts);
309                s.disposed_at = Some(ts);
310            }
311            return true;
312        }
313        false
314    }
315
316    /// Dekrementiert den Writer-Counter; faellt er auf 0, geht die
317    /// Instanz nach `NOT_ALIVE_NO_WRITERS` (Spec §2.2.2.4.2.7).
318    pub fn unregister(&self, handle: InstanceHandle, timestamp: Option<Time>) -> bool {
319        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
320        let Some(kh) = g.handle_to_keyhash.get(&handle).copied() else {
321            return false;
322        };
323        if let Some(s) = g.by_keyhash.get_mut(&kh) {
324            s.writer_count = s.writer_count.saturating_sub(1);
325            if s.writer_count == 0 && !matches!(s.kind, InstanceStateKind::NotAliveDisposed) {
326                s.kind = InstanceStateKind::NotAliveNoWriters;
327                if let Some(ts) = timestamp {
328                    s.no_writers_at = Some(ts);
329                }
330            }
331            if let Some(ts) = timestamp {
332                s.last_sample_timestamp = Some(ts);
333            }
334            return true;
335        }
336        false
337    }
338
339    /// Spec §2.2.3.10 OWNERSHIP=EXCLUSIVE Strength-Selection.
340    /// Liefert `true` wenn ein Sample vom Writer mit
341    /// `(writer_guid, writer_strength)` fuer die Instanz `keyhash`
342    /// akzeptiert werden soll.
343    ///
344    /// Algorithmus (Spec §2.2.3.10):
345    /// - Kein aktueller Owner → akzeptiere + setze als Owner.
346    /// - Strength > current → akzeptiere + ersetze Owner.
347    /// - Strength == current und guid > current_guid → akzeptiere
348    ///   (Spec-Tie-Breaker via lexikographisch hoehere Guid).
349    /// - Strength < current → reject.
350    /// - Strength == current und guid < current → reject.
351    /// - Strength == current und guid == current → akzeptiere
352    ///   (selber Writer).
353    pub fn should_accept_sample_under_exclusive_ownership(
354        &self,
355        keyhash: &KeyHash,
356        writer_guid: [u8; 16],
357        writer_strength: i32,
358    ) -> bool {
359        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
360        let Some(s) = g.by_keyhash.get_mut(keyhash) else {
361            return true; // unbekannte Instance → akzeptiere
362        };
363        match s.current_owner {
364            None => {
365                s.current_owner = Some((writer_guid, writer_strength));
366                true
367            }
368            Some((cur_guid, cur_str)) => {
369                if writer_strength > cur_str
370                    || (writer_strength == cur_str && writer_guid > cur_guid)
371                {
372                    s.current_owner = Some((writer_guid, writer_strength));
373                    true
374                } else {
375                    writer_strength == cur_str && writer_guid == cur_guid
376                }
377            }
378        }
379    }
380
381    /// Spec §2.2.3.23 — bei Liveliness-Loss eines Writers: clear-Owner
382    /// fuer alle Instanzen, deren Owner dieser Writer war. Naechster
383    /// Sample triggert Failover-Selection.
384    pub fn clear_owner_for_writer(&self, writer_guid: [u8; 16]) -> usize {
385        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
386        let mut cleared = 0;
387        for s in g.by_keyhash.values_mut() {
388            if let Some((g_, _)) = s.current_owner {
389                if g_ == writer_guid {
390                    s.current_owner = None;
391                    cleared += 1;
392                }
393            }
394        }
395        cleared
396    }
397
398    /// Wie [`Self::clear_owner_for_writer`], aber Match nach den ersten
399    /// 12 Bytes der GUID (GuidPrefix). Erlaubt Failover wenn nur die
400    /// Participant-Identitaet (z.B. via SPDP-Lease-Expiry) bekannt ist.
401    pub fn clear_owner_for_writer_prefix(&self, prefix: [u8; 12]) -> usize {
402        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
403        let mut cleared = 0;
404        for s in g.by_keyhash.values_mut() {
405            if let Some((g_, _)) = s.current_owner {
406                if g_[..12] == prefix {
407                    s.current_owner = None;
408                    cleared += 1;
409                }
410            }
411        }
412        cleared
413    }
414
415    /// Spec §2.2.3.22 READER_DATA_LIFECYCLE — purgt Instanzen, deren
416    /// Disposed/NoWriter-Marker laenger als der jeweilige Delay her ist.
417    /// `now` ist die Caller-side Wall-Clock; die Delays sind in Nanoseconds.
418    /// Liefert die Anzahl entfernter Instanzen.
419    ///
420    /// Lazy-Purge: wird vom Read-Pfad (oder einem Background-Tick)
421    /// gerufen. Spec laesst die Strategie offen — wir loeschen die
422    /// betroffene Instanz vollstaendig, sodass nachfolgende read/take
423    /// sie nicht mehr sehen.
424    pub fn autopurge(
425        &self,
426        now: Time,
427        autopurge_disposed_delay_nanos: u128,
428        autopurge_nowriter_delay_nanos: u128,
429    ) -> usize {
430        let now_nanos = u128::from(u64::try_from(now.sec).unwrap_or(0)) * 1_000_000_000
431            + u128::from(now.nanosec);
432        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
433        let mut to_purge: alloc::vec::Vec<KeyHash> = alloc::vec::Vec::new();
434        for (kh, s) in g.by_keyhash.iter() {
435            let purge = match s.kind {
436                InstanceStateKind::NotAliveDisposed
437                    if autopurge_disposed_delay_nanos != u128::MAX =>
438                {
439                    s.disposed_at.is_some_and(|t| {
440                        let t_nanos = u128::from(u64::try_from(t.sec).unwrap_or(0)) * 1_000_000_000
441                            + u128::from(t.nanosec);
442                        now_nanos.saturating_sub(t_nanos) >= autopurge_disposed_delay_nanos
443                    })
444                }
445                InstanceStateKind::NotAliveNoWriters
446                    if autopurge_nowriter_delay_nanos != u128::MAX =>
447                {
448                    s.no_writers_at.is_some_and(|t| {
449                        let t_nanos = u128::from(u64::try_from(t.sec).unwrap_or(0)) * 1_000_000_000
450                            + u128::from(t.nanosec);
451                        now_nanos.saturating_sub(t_nanos) >= autopurge_nowriter_delay_nanos
452                    })
453                }
454                _ => false,
455            };
456            if purge {
457                to_purge.push(*kh);
458            }
459        }
460        let count = to_purge.len();
461        for kh in to_purge {
462            if let Some(s) = g.by_keyhash.remove(&kh) {
463                g.handle_to_keyhash.remove(&s.handle);
464            }
465        }
466        count
467    }
468
469    /// Reader-seitiger Hook: Markiert, dass ein Sample fuer diese
470    /// Instanz angekommen ist. Liefert `(handle, war_neu)`, wo
471    /// `war_neu == true` heisst dass diese Instanz vorher unbekannt war
472    /// oder die Reader-View frisch zurueckgesetzt wurde (`view_state =
473    /// NEW`).
474    pub fn observe_sample(
475        &self,
476        keyhash: KeyHash,
477        key_holder: alloc::vec::Vec<u8>,
478        timestamp: Option<Time>,
479    ) -> (InstanceHandle, bool) {
480        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
481        let mut was_new = false;
482        let entry = g.by_keyhash.entry(keyhash).or_insert_with(|| {
483            was_new = true;
484            let h = self.allocator.allocate();
485            InstanceState::fresh(h, key_holder.clone())
486        });
487        // Reaktivierung Reader-Sicht
488        if matches!(
489            entry.kind,
490            InstanceStateKind::NotAliveDisposed | InstanceStateKind::NotAliveNoWriters
491        ) {
492            // Generation-Bumps wie auf Writer-Seite — das matched die
493            // Spec, weil Reader das aus dem Sample-Stream herleiten.
494            match entry.kind {
495                InstanceStateKind::NotAliveDisposed => {
496                    entry.disposed_generation_count =
497                        entry.disposed_generation_count.saturating_add(1);
498                }
499                InstanceStateKind::NotAliveNoWriters => {
500                    entry.no_writers_generation_count =
501                        entry.no_writers_generation_count.saturating_add(1);
502                }
503                InstanceStateKind::Alive => {}
504            }
505            entry.kind = InstanceStateKind::Alive;
506            entry.reader_view_new = true;
507        }
508        if let Some(ts) = timestamp {
509            entry.last_sample_timestamp = Some(ts);
510        }
511        entry.samples_in_cache = entry.samples_in_cache.saturating_add(1);
512        let handle = entry.handle;
513        g.handle_to_keyhash.insert(handle, keyhash);
514        (handle, was_new)
515    }
516
517    /// Reader-seitig: nach erstem `read`/`take` einer Instanz wird ihr
518    /// View-State auf `NOT_NEW` gesetzt.
519    pub fn mark_view_seen(&self, handle: InstanceHandle) {
520        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
521        if let Some(kh) = g.handle_to_keyhash.get(&handle).copied() {
522            if let Some(s) = g.by_keyhash.get_mut(&kh) {
523                s.reader_view_new = false;
524            }
525        }
526    }
527
528    /// Reader-seitig: nach `take` reduziert sich `samples_in_cache`
529    /// um `n`.
530    pub fn drain_samples(&self, handle: InstanceHandle, n: u32) {
531        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
532        if let Some(kh) = g.handle_to_keyhash.get(&handle).copied() {
533            if let Some(s) = g.by_keyhash.get_mut(&kh) {
534                s.samples_in_cache = s.samples_in_cache.saturating_sub(n);
535            }
536        }
537    }
538
539    /// Listet alle Instanz-Handles in stabiler Reihenfolge (BTreeMap-
540    /// Order ueber KeyHash). Spec §2.2.2.5.3.28 `read_next_instance`.
541    #[must_use]
542    pub fn ordered_handles(&self) -> alloc::vec::Vec<InstanceHandle> {
543        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
544        g.by_keyhash.values().map(|s| s.handle).collect()
545    }
546
547    /// Liefert das **erste** Handle, dessen Sortier-Ordnung strikt
548    /// hinter `previous_handle` liegt (oder das erste ueberhaupt, wenn
549    /// `previous == HANDLE_NIL`). Spec §2.2.2.5.3.28.
550    #[must_use]
551    pub fn next_handle_after(&self, previous: InstanceHandle) -> Option<InstanceHandle> {
552        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
553        if previous.is_nil() {
554            return g.by_keyhash.values().next().map(|s| s.handle);
555        }
556        let prev_kh = g.handle_to_keyhash.get(&previous).copied()?;
557        let range: (core::ops::Bound<KeyHash>, core::ops::Bound<KeyHash>) = (
558            core::ops::Bound::Excluded(prev_kh),
559            core::ops::Bound::Unbounded,
560        );
561        g.by_keyhash.range(range).next().map(|(_, s)| s.handle)
562    }
563
564    /// Anzahl getrackter Instanzen.
565    #[must_use]
566    pub fn len(&self) -> usize {
567        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
568        g.by_keyhash.len()
569    }
570
571    /// `true` wenn keine Instanzen getrackt sind.
572    #[must_use]
573    pub fn is_empty(&self) -> bool {
574        self.len() == 0
575    }
576}
577
578impl Clone for InstanceTracker {
579    fn clone(&self) -> Self {
580        Self {
581            inner: Arc::clone(&self.inner),
582            allocator: Arc::clone(&self.allocator),
583        }
584    }
585}
586
587#[cfg(test)]
588#[allow(clippy::expect_used, clippy::unwrap_used)]
589mod tests {
590    use super::*;
591
592    fn kh(byte: u8) -> KeyHash {
593        let mut k = [0u8; KEY_HASH_LEN];
594        k[0] = byte;
595        k
596    }
597
598    #[test]
599    fn register_assigns_stable_handle() {
600        let t = InstanceTracker::new();
601        let h1 = t.register(kh(1), alloc::vec![1], None);
602        let h2 = t.register(kh(1), alloc::vec![1], None);
603        assert_eq!(h1, h2);
604        assert!(!h1.is_nil());
605    }
606
607    #[test]
608    fn lookup_returns_handle_for_known_key() {
609        let t = InstanceTracker::new();
610        let h = t.register(kh(2), alloc::vec![2], None);
611        assert_eq!(t.lookup(&kh(2)), Some(h));
612        assert_eq!(t.lookup(&kh(99)), None);
613    }
614
615    #[test]
616    fn dispose_transitions_to_disposed() {
617        let t = InstanceTracker::new();
618        let h = t.register(kh(3), alloc::vec![3], None);
619        assert_eq!(t.get_by_handle(h).unwrap().kind, InstanceStateKind::Alive);
620        assert!(t.dispose(h, None));
621        assert_eq!(
622            t.get_by_handle(h).unwrap().kind,
623            InstanceStateKind::NotAliveDisposed
624        );
625    }
626
627    #[test]
628    fn unregister_decrements_writer_count() {
629        let t = InstanceTracker::new();
630        let h = t.register(kh(4), alloc::vec![4], None);
631        // Zwei Writer haben sich registriert.
632        let _ = t.register(kh(4), alloc::vec![4], None);
633        assert_eq!(t.get_by_handle(h).unwrap().writer_count, 2);
634        assert!(t.unregister(h, None));
635        assert_eq!(t.get_by_handle(h).unwrap().kind, InstanceStateKind::Alive);
636        assert!(t.unregister(h, None));
637        // jetzt 0 Writer → no_writers
638        assert_eq!(
639            t.get_by_handle(h).unwrap().kind,
640            InstanceStateKind::NotAliveNoWriters
641        );
642    }
643
644    #[test]
645    fn re_register_after_dispose_bumps_disposed_generation() {
646        let t = InstanceTracker::new();
647        let h = t.register(kh(5), alloc::vec![5], None);
648        t.dispose(h, None);
649        let _ = t.register(kh(5), alloc::vec![5], None);
650        let s = t.get_by_handle(h).unwrap();
651        assert_eq!(s.kind, InstanceStateKind::Alive);
652        assert_eq!(s.disposed_generation_count, 1);
653    }
654
655    #[test]
656    fn observe_sample_creates_new_instance_on_first_call() {
657        let t = InstanceTracker::new();
658        let (h, was_new) = t.observe_sample(kh(6), alloc::vec![6], None);
659        assert!(was_new);
660        assert!(t.get_by_handle(h).unwrap().reader_view_new);
661        let (h2, was_new2) = t.observe_sample(kh(6), alloc::vec![6], None);
662        assert_eq!(h, h2);
663        assert!(!was_new2);
664    }
665
666    #[test]
667    fn ordered_handles_iterates_in_keyhash_order() {
668        let t = InstanceTracker::new();
669        let h_b = t.register(kh(2), alloc::vec![2], None);
670        let h_a = t.register(kh(1), alloc::vec![1], None);
671        let h_c = t.register(kh(3), alloc::vec![3], None);
672        assert_eq!(t.ordered_handles(), alloc::vec![h_a, h_b, h_c]);
673    }
674
675    #[test]
676    fn next_handle_after_walks_in_order() {
677        let t = InstanceTracker::new();
678        let h_a = t.register(kh(1), alloc::vec![1], None);
679        let h_b = t.register(kh(2), alloc::vec![2], None);
680        let h_c = t.register(kh(3), alloc::vec![3], None);
681        assert_eq!(t.next_handle_after(crate::HANDLE_NIL), Some(h_a));
682        assert_eq!(t.next_handle_after(h_a), Some(h_b));
683        assert_eq!(t.next_handle_after(h_b), Some(h_c));
684        assert_eq!(t.next_handle_after(h_c), None);
685    }
686
687    #[test]
688    fn get_key_holder_returns_stored_bytes() {
689        let t = InstanceTracker::new();
690        let h = t.register(kh(7), alloc::vec![1, 2, 3], None);
691        assert_eq!(t.get_key_holder(h), Some(alloc::vec![1u8, 2, 3]));
692    }
693
694    #[test]
695    fn mark_view_seen_clears_new_flag() {
696        let t = InstanceTracker::new();
697        let (h, _) = t.observe_sample(kh(8), alloc::vec![8], None);
698        assert!(t.get_by_handle(h).unwrap().reader_view_new);
699        t.mark_view_seen(h);
700        assert!(!t.get_by_handle(h).unwrap().reader_view_new);
701    }
702
703    #[test]
704    fn observe_after_dispose_bumps_disposed_generation() {
705        let t = InstanceTracker::new();
706        let (h, _) = t.observe_sample(kh(9), alloc::vec![9], None);
707        t.dispose(h, None);
708        let (_, _) = t.observe_sample(kh(9), alloc::vec![9], None);
709        assert_eq!(t.get_by_handle(h).unwrap().disposed_generation_count, 1);
710    }
711
712    #[test]
713    fn drain_samples_decrements_count() {
714        let t = InstanceTracker::new();
715        let (h, _) = t.observe_sample(kh(10), alloc::vec![10], None);
716        let (_, _) = t.observe_sample(kh(10), alloc::vec![10], None);
717        assert_eq!(t.get_by_handle(h).unwrap().samples_in_cache, 2);
718        t.drain_samples(h, 2);
719        assert_eq!(t.get_by_handle(h).unwrap().samples_in_cache, 0);
720    }
721
722    // ---- §2.2.3.12 TIME_BASED_FILTER Reader-Drop ----
723
724    #[test]
725    fn time_based_filter_first_sample_passes() {
726        let t = InstanceTracker::new();
727        // Instance neu — keine last_delivered_ts, also liefern.
728        let _ = t.observe_sample(kh(20), alloc::vec![20], Some(Time::new(1, 0)));
729        let pass = t.should_deliver_under_time_based_filter(
730            &kh(20),
731            Time::new(1, 0),
732            100_000_000, // 100ms
733        );
734        assert!(pass);
735    }
736
737    #[test]
738    fn time_based_filter_too_close_drops() {
739        let t = InstanceTracker::new();
740        let _ = t.observe_sample(kh(20), alloc::vec![20], None);
741        t.record_delivery(&kh(20), Time::new(1, 0));
742        // 50ms später — drop bei min_separation=100ms.
743        let pass = t.should_deliver_under_time_based_filter(
744            &kh(20),
745            Time::new(1, 50_000_000),
746            100_000_000,
747        );
748        assert!(!pass, "50ms < 100ms separation → drop");
749    }
750
751    #[test]
752    fn time_based_filter_far_enough_passes() {
753        let t = InstanceTracker::new();
754        let _ = t.observe_sample(kh(20), alloc::vec![20], None);
755        t.record_delivery(&kh(20), Time::new(1, 0));
756        // 150ms später — pass.
757        let pass = t.should_deliver_under_time_based_filter(
758            &kh(20),
759            Time::new(1, 150_000_000),
760            100_000_000,
761        );
762        assert!(pass, "150ms > 100ms separation → deliver");
763    }
764
765    #[test]
766    fn time_based_filter_zero_separation_always_passes() {
767        let t = InstanceTracker::new();
768        let _ = t.observe_sample(kh(20), alloc::vec![20], None);
769        t.record_delivery(&kh(20), Time::new(1, 0));
770        let pass = t.should_deliver_under_time_based_filter(&kh(20), Time::new(1, 0), 0);
771        assert!(pass, "min_separation=0 → kein Filter");
772    }
773
774    #[test]
775    fn time_based_filter_per_instance_isolation() {
776        // Filter ist per-Instanz; Instance A's last_delivered beeinflusst
777        // Instance B nicht.
778        let t = InstanceTracker::new();
779        let _ = t.observe_sample(kh(1), alloc::vec![1], None);
780        let _ = t.observe_sample(kh(2), alloc::vec![2], None);
781        t.record_delivery(&kh(1), Time::new(5, 0));
782        // Instance 2 hat noch keine delivery → pass.
783        let pass =
784            t.should_deliver_under_time_based_filter(&kh(2), Time::new(5, 10_000_000), 100_000_000);
785        assert!(pass);
786    }
787
788    #[test]
789    fn time_based_filter_unknown_instance_passes() {
790        let t = InstanceTracker::new();
791        let pass = t.should_deliver_under_time_based_filter(&kh(99), Time::new(1, 0), 100_000_000);
792        assert!(pass, "unbekannte Instanz → pass");
793    }
794
795    // ---- §2.2.3.22 READER_DATA_LIFECYCLE autopurge ----
796
797    #[test]
798    fn autopurge_disposed_after_delay() {
799        let t = InstanceTracker::new();
800        let h = t.register(kh(30), alloc::vec![30], None);
801        // Dispose mit ts=t1.
802        t.dispose(h, Some(Time::new(10, 0)));
803        // Now=t1+5s, delay=3s → purge.
804        let purged = t.autopurge(Time::new(15, 0), 3_000_000_000, u128::MAX);
805        assert_eq!(purged, 1);
806        // Nach purge: lookup liefert None.
807        assert!(t.lookup(&kh(30)).is_none());
808    }
809
810    #[test]
811    fn autopurge_disposed_before_delay_keeps_instance() {
812        let t = InstanceTracker::new();
813        let h = t.register(kh(31), alloc::vec![31], None);
814        t.dispose(h, Some(Time::new(10, 0)));
815        // Now=t1+1s, delay=5s → keep.
816        let purged = t.autopurge(Time::new(11, 0), 5_000_000_000, u128::MAX);
817        assert_eq!(purged, 0);
818        assert!(t.lookup(&kh(31)).is_some());
819    }
820
821    #[test]
822    fn autopurge_no_writers_after_delay() {
823        let t = InstanceTracker::new();
824        let h = t.register(kh(32), alloc::vec![32], None);
825        // unregister setzt writer_count=0 → NotAliveNoWriters mit timestamp.
826        t.unregister(h, Some(Time::new(20, 0)));
827        let purged = t.autopurge(Time::new(25, 0), u128::MAX, 3_000_000_000);
828        assert_eq!(purged, 1);
829        assert!(t.lookup(&kh(32)).is_none());
830    }
831
832    #[test]
833    fn autopurge_alive_instance_never_purged() {
834        let t = InstanceTracker::new();
835        let _h = t.register(kh(33), alloc::vec![33], None);
836        // ALIVE → purge ignoriert.
837        let purged = t.autopurge(Time::new(1000, 0), 0, 0);
838        assert_eq!(purged, 0);
839        assert!(t.lookup(&kh(33)).is_some());
840    }
841
842    #[test]
843    fn autopurge_infinity_delay_never_purges() {
844        // u128::MAX als Delay = INFINITE = nie purgen (Spec-Default).
845        let t = InstanceTracker::new();
846        let h = t.register(kh(34), alloc::vec![34], None);
847        t.dispose(h, Some(Time::new(10, 0)));
848        let purged = t.autopurge(Time::new(99999, 0), u128::MAX, u128::MAX);
849        assert_eq!(purged, 0);
850    }
851
852    // ---- §2.2.3.10 OWNERSHIP=EXCLUSIVE Strength-Selection ----
853
854    fn guid(byte: u8) -> [u8; 16] {
855        [byte; 16]
856    }
857
858    #[test]
859    fn exclusive_first_writer_wins() {
860        let t = InstanceTracker::new();
861        let _ = t.register(kh(40), alloc::vec![40], None);
862        // No owner → first writer accepted, becomes owner.
863        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(40), guid(1), 10));
864        let s = t.get_by_keyhash(&kh(40)).unwrap();
865        assert_eq!(s.current_owner, Some((guid(1), 10)));
866    }
867
868    #[test]
869    fn exclusive_higher_strength_wins() {
870        let t = InstanceTracker::new();
871        let _ = t.register(kh(41), alloc::vec![41], None);
872        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(41), guid(1), 10));
873        // Stronger writer arrives → replaces owner.
874        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(41), guid(2), 20));
875        let s = t.get_by_keyhash(&kh(41)).unwrap();
876        assert_eq!(s.current_owner, Some((guid(2), 20)));
877    }
878
879    #[test]
880    fn exclusive_lower_strength_rejected() {
881        let t = InstanceTracker::new();
882        let _ = t.register(kh(42), alloc::vec![42], None);
883        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(42), guid(2), 20));
884        // Weaker writer rejected.
885        assert!(!t.should_accept_sample_under_exclusive_ownership(&kh(42), guid(1), 5));
886        let s = t.get_by_keyhash(&kh(42)).unwrap();
887        assert_eq!(s.current_owner, Some((guid(2), 20)));
888    }
889
890    #[test]
891    fn exclusive_tie_break_by_higher_guid() {
892        let t = InstanceTracker::new();
893        let _ = t.register(kh(43), alloc::vec![43], None);
894        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(43), guid(1), 10));
895        // Same strength, higher guid → wins.
896        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(43), guid(2), 10));
897    }
898
899    #[test]
900    fn exclusive_tie_break_lower_guid_rejected() {
901        let t = InstanceTracker::new();
902        let _ = t.register(kh(44), alloc::vec![44], None);
903        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(44), guid(2), 10));
904        // Same strength, lower guid → reject.
905        assert!(!t.should_accept_sample_under_exclusive_ownership(&kh(44), guid(1), 10));
906    }
907
908    #[test]
909    fn exclusive_same_writer_always_accepted() {
910        let t = InstanceTracker::new();
911        let _ = t.register(kh(45), alloc::vec![45], None);
912        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(45), guid(7), 10));
913        // Same writer, same strength → always accepted.
914        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(45), guid(7), 10));
915    }
916
917    // ---- §2.2.3.23 Liveliness-driven Failover ----
918
919    #[test]
920    fn clear_owner_for_writer_resets_owner() {
921        let t = InstanceTracker::new();
922        let _ = t.register(kh(50), alloc::vec![50], None);
923        let _ = t.register(kh(51), alloc::vec![51], None);
924        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(50), guid(9), 100));
925        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(51), guid(9), 100));
926        // Liveliness lost on writer guid(9) → clear from both instances.
927        let cleared = t.clear_owner_for_writer(guid(9));
928        assert_eq!(cleared, 2);
929        let s50 = t.get_by_keyhash(&kh(50)).unwrap();
930        let s51 = t.get_by_keyhash(&kh(51)).unwrap();
931        assert!(s50.current_owner.is_none());
932        assert!(s51.current_owner.is_none());
933    }
934
935    #[test]
936    fn failover_after_clear_accepts_weaker_writer() {
937        let t = InstanceTracker::new();
938        let _ = t.register(kh(52), alloc::vec![52], None);
939        // Strong writer becomes owner.
940        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(52), guid(9), 100));
941        // Weaker writer normally rejected.
942        assert!(!t.should_accept_sample_under_exclusive_ownership(&kh(52), guid(1), 10));
943        // Clear (Liveliness-Loss) → weaker writer kann uebernehmen.
944        t.clear_owner_for_writer(guid(9));
945        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(52), guid(1), 10));
946    }
947
948    #[test]
949    fn clear_owner_for_writer_prefix_matches_first_12_bytes() {
950        let t = InstanceTracker::new();
951        let _ = t.register(kh(60), alloc::vec![60], None);
952        // GUID = prefix [1;12] + entityId [9,9,9,9]
953        let mut full_a = [9u8; 16];
954        full_a[..12].fill(1);
955        let mut full_b = [9u8; 16];
956        full_b[..12].fill(2);
957        // owner with prefix=1
958        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(60), full_a, 50));
959        // Clear by mismatching prefix → no clear.
960        assert_eq!(t.clear_owner_for_writer_prefix([2u8; 12]), 0);
961        let s = t.get_by_keyhash(&kh(60)).unwrap();
962        assert!(s.current_owner.is_some());
963        // Clear by matching prefix → cleared.
964        assert_eq!(t.clear_owner_for_writer_prefix([1u8; 12]), 1);
965        let s2 = t.get_by_keyhash(&kh(60)).unwrap();
966        assert!(s2.current_owner.is_none());
967        // Now full_b can take ownership (prefix=2, weaker).
968        let _ = full_b;
969    }
970
971    #[test]
972    fn clear_owner_for_writer_prefix_multi_instance() {
973        let t = InstanceTracker::new();
974        let _ = t.register(kh(70), alloc::vec![70], None);
975        let _ = t.register(kh(71), alloc::vec![71], None);
976        let _ = t.register(kh(72), alloc::vec![72], None);
977        let mut g = [0u8; 16];
978        g[..12].fill(7);
979        // Owner = same prefix on 3 instances.
980        for k in [kh(70), kh(71), kh(72)] {
981            assert!(t.should_accept_sample_under_exclusive_ownership(&k, g, 1));
982        }
983        // One participant disappears (lease lost) → all 3 cleared.
984        let cleared = t.clear_owner_for_writer_prefix([7u8; 12]);
985        assert_eq!(cleared, 3);
986    }
987}