extern crate alloc;

use alloc::collections::BTreeMap;
use alloc::sync::Arc;

#[cfg(feature = "std")]
use std::sync::Mutex;

use zerodds_cdr::KEY_HASH_LEN;

use crate::instance_handle::{InstanceHandle, InstanceHandleAllocator};
use crate::sample_info::InstanceStateKind;
use crate::time::Time;

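/// Fixed-size key hash that identifies a keyed instance (`KEY_HASH_LEN` bytes).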
pub type KeyHash = [u8; KEY_HASH_LEN];

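/// State tracked per instance: lifecycle kind, generation counts, transition
/// timestamps, exclusive-ownership bookkeeping, and reader-view/cache counters.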
#[derive(Debug, Clone)]
pub struct InstanceState {
    /// Handle assigned to this instance.
    pub handle: InstanceHandle,
    /// Current lifecycle state (alive, disposed, or no writers).
    pub kind: InstanceStateKind,
    /// How often the instance became alive again after being disposed.
    pub disposed_generation_count: i32,
    /// How often the instance became alive again after losing all writers.
    pub no_writers_generation_count: i32,
    /// Number of writers currently registered for the instance.
    pub writer_count: u32,
    /// Timestamp of the most recent sample, register, dispose, or unregister.
    pub last_sample_timestamp: Option<Time>,
    /// Timestamp of the last sample actually delivered to the reader.
    pub last_delivered_ts: Option<Time>,
    /// Timestamp recorded when the instance was last disposed.
    pub disposed_at: Option<Time>,
    /// Timestamp recorded when the instance last lost all writers.
    pub no_writers_at: Option<Time>,
    /// Owning writer (GUID, strength) under exclusive ownership, if any.
    pub current_owner: Option<([u8; 16], i32)>,
    /// Serialized key fields of the instance.
    pub key_holder: alloc::vec::Vec<u8>,
    /// Whether the reader has not yet seen this instance.
    pub reader_view_new: bool,
    /// Number of samples for this instance currently held in the cache.
    pub samples_in_cache: u32,
}

impl InstanceState {
    fn fresh(handle: InstanceHandle, key_holder: alloc::vec::Vec<u8>) -> Self {
        Self {
            handle,
            kind: InstanceStateKind::Alive,
            disposed_generation_count: 0,
            no_writers_generation_count: 0,
            writer_count: 0,
            last_sample_timestamp: None,
            last_delivered_ts: None,
            disposed_at: None,
            no_writers_at: None,
            current_owner: None,
            key_holder,
            reader_view_new: true,
            samples_in_cache: 0,
        }
    }
}

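/// Thread-safe registry of instances keyed by their key hash. Handles are
/// assigned by a shared [`InstanceHandleAllocator`]; cloning a tracker shares
/// the same underlying state.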
#[derive(Debug)]
pub struct InstanceTracker {
    inner: Arc<Mutex<TrackerInner>>,
    allocator: Arc<InstanceHandleAllocator>,
}

#[derive(Debug, Default)]
struct TrackerInner {
    by_keyhash: BTreeMap<KeyHash, InstanceState>,
    handle_to_keyhash: BTreeMap<InstanceHandle, KeyHash>,
}

impl Default for InstanceTracker {
    fn default() -> Self {
        Self::new()
    }
}

impl InstanceTracker {
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(TrackerInner::default())),
            allocator: Arc::new(InstanceHandleAllocator::new()),
        }
    }

    #[must_use]
    pub fn with_allocator(allocator: Arc<InstanceHandleAllocator>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(TrackerInner::default())),
            allocator,
        }
    }

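    /// Registers a writer for the instance identified by `keyhash`, creating the
    /// instance if it is unknown. A not-alive instance is brought back to life and
    /// the matching generation count is bumped; the writer count is incremented.
    /// Returns the instance handle.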
    pub fn register(
        &self,
        keyhash: KeyHash,
        key_holder: alloc::vec::Vec<u8>,
        timestamp: Option<Time>,
    ) -> InstanceHandle {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let entry = g.by_keyhash.entry(keyhash).or_insert_with(|| {
            let h = self.allocator.allocate();
            InstanceState::fresh(h, key_holder.clone())
        });
        match entry.kind {
            InstanceStateKind::NotAliveDisposed => {
                entry.disposed_generation_count = entry.disposed_generation_count.saturating_add(1);
                entry.kind = InstanceStateKind::Alive;
            }
            InstanceStateKind::NotAliveNoWriters => {
                entry.no_writers_generation_count =
                    entry.no_writers_generation_count.saturating_add(1);
                entry.kind = InstanceStateKind::Alive;
            }
            InstanceStateKind::Alive => {}
        }
        entry.writer_count = entry.writer_count.saturating_add(1);
        if let Some(ts) = timestamp {
            entry.last_sample_timestamp = Some(ts);
        }
        let handle = entry.handle;
        g.handle_to_keyhash.insert(handle, keyhash);
        handle
    }

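    /// TIME_BASED_FILTER check: returns `true` when the sample lies at least
    /// `min_separation_nanos` after the last delivered sample of this instance.
    /// Zero separation, unknown instances, first deliveries, and samples older
    /// than the last delivery always pass.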
    #[must_use]
    pub fn should_deliver_under_time_based_filter(
        &self,
        keyhash: &KeyHash,
        sample_ts: Time,
        min_separation_nanos: u128,
    ) -> bool {
        if min_separation_nanos == 0 {
            return true;
        }
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let Some(s) = g.by_keyhash.get(keyhash) else {
            return true;
        };
        let Some(last) = s.last_delivered_ts else {
            return true;
        };
        let last_nanos = u128::from(u64::try_from(last.sec).unwrap_or(0)) * 1_000_000_000
            + u128::from(last.nanosec);
        let sample_nanos = u128::from(u64::try_from(sample_ts.sec).unwrap_or(0)) * 1_000_000_000
            + u128::from(sample_ts.nanosec);
        if sample_nanos < last_nanos {
            return true;
        }
        sample_nanos - last_nanos >= min_separation_nanos
    }

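    /// DESTINATION_ORDER check (BY_SOURCE_TIMESTAMP): when enabled, a sample is
    /// delivered only if its source timestamp is strictly newer than the last
    /// delivered timestamp for this instance; unknown instances and first
    /// deliveries always pass.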
    #[must_use]
    pub fn should_deliver_under_destination_order(
        &self,
        keyhash: &KeyHash,
        source_ts: Time,
        by_source_timestamp: bool,
    ) -> bool {
        if !by_source_timestamp {
            return true;
        }
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let Some(s) = g.by_keyhash.get(keyhash) else {
            return true;
        };
        let Some(last) = s.last_delivered_ts else {
            return true;
        };
        let last_nanos = u128::from(u64::try_from(last.sec).unwrap_or(0)) * 1_000_000_000
            + u128::from(last.nanosec);
        let src_nanos = u128::from(u64::try_from(source_ts.sec).unwrap_or(0)) * 1_000_000_000
            + u128::from(source_ts.nanosec);
        src_nanos > last_nanos
    }

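    /// Records that a sample for `keyhash` was delivered at `sample_ts`, which
    /// becomes the reference point for the filter and ordering checks above.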
    pub fn record_delivery(&self, keyhash: &KeyHash, sample_ts: Time) {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        if let Some(s) = g.by_keyhash.get_mut(keyhash) {
            s.last_delivered_ts = Some(sample_ts);
        }
    }

    #[must_use]
    pub fn lookup(&self, keyhash: &KeyHash) -> Option<InstanceHandle> {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        g.by_keyhash.get(keyhash).map(|s| s.handle)
    }

    #[must_use]
    pub fn get_by_handle(&self, handle: InstanceHandle) -> Option<InstanceState> {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let kh = g.handle_to_keyhash.get(&handle)?;
        g.by_keyhash.get(kh).cloned()
    }

    #[must_use]
    pub fn get_by_keyhash(&self, keyhash: &KeyHash) -> Option<InstanceState> {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        g.by_keyhash.get(keyhash).cloned()
    }

    #[must_use]
    pub fn get_key_holder(&self, handle: InstanceHandle) -> Option<alloc::vec::Vec<u8>> {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let kh = g.handle_to_keyhash.get(&handle)?;
        g.by_keyhash.get(kh).map(|s| s.key_holder.clone())
    }

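    /// Marks the instance behind `handle` as not-alive-disposed and records the
    /// dispose timestamp if one is given. Returns `false` for unknown handles.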
    pub fn dispose(&self, handle: InstanceHandle, timestamp: Option<Time>) -> bool {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let Some(kh) = g.handle_to_keyhash.get(&handle).copied() else {
            return false;
        };
        if let Some(s) = g.by_keyhash.get_mut(&kh) {
            s.kind = InstanceStateKind::NotAliveDisposed;
            if let Some(ts) = timestamp {
                s.last_sample_timestamp = Some(ts);
                s.disposed_at = Some(ts);
            }
            return true;
        }
        false
    }

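    /// Removes one writer from the instance behind `handle`. When the last writer
    /// disappears and the instance is not disposed, it transitions to
    /// not-alive-no-writers. Returns `false` for unknown handles.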
    pub fn unregister(&self, handle: InstanceHandle, timestamp: Option<Time>) -> bool {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let Some(kh) = g.handle_to_keyhash.get(&handle).copied() else {
            return false;
        };
        if let Some(s) = g.by_keyhash.get_mut(&kh) {
            s.writer_count = s.writer_count.saturating_sub(1);
            if s.writer_count == 0 && !matches!(s.kind, InstanceStateKind::NotAliveDisposed) {
                s.kind = InstanceStateKind::NotAliveNoWriters;
                if let Some(ts) = timestamp {
                    s.no_writers_at = Some(ts);
                }
            }
            if let Some(ts) = timestamp {
                s.last_sample_timestamp = Some(ts);
            }
            return true;
        }
        false
    }

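    /// Exclusive-ownership arbitration: the strongest writer owns the instance,
    /// ties are broken in favour of the larger GUID. Samples from the current
    /// owner are always accepted, a stronger (or tie-winning) writer takes over,
    /// and samples for unknown instances are accepted without claiming ownership.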
    pub fn should_accept_sample_under_exclusive_ownership(
        &self,
        keyhash: &KeyHash,
        writer_guid: [u8; 16],
        writer_strength: i32,
    ) -> bool {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let Some(s) = g.by_keyhash.get_mut(keyhash) else {
            return true;
        };
        match s.current_owner {
            None => {
                s.current_owner = Some((writer_guid, writer_strength));
                true
            }
            Some((cur_guid, cur_str)) => {
                if writer_strength > cur_str
                    || (writer_strength == cur_str && writer_guid > cur_guid)
                {
                    s.current_owner = Some((writer_guid, writer_strength));
                    true
                } else {
                    writer_strength == cur_str && writer_guid == cur_guid
                }
            }
        }
    }

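    /// Clears exclusive ownership on every instance currently owned by
    /// `writer_guid`; returns the number of instances released.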
    pub fn clear_owner_for_writer(&self, writer_guid: [u8; 16]) -> usize {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let mut cleared = 0;
        for s in g.by_keyhash.values_mut() {
            if let Some((g_, _)) = s.current_owner {
                if g_ == writer_guid {
                    s.current_owner = None;
                    cleared += 1;
                }
            }
        }
        cleared
    }

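    /// Like `clear_owner_for_writer`, but matches owners by the first 12 GUID
    /// bytes (the RTPS GUID prefix), so all writers of one participant are
    /// released at once; returns the number of instances released.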
    pub fn clear_owner_for_writer_prefix(&self, prefix: [u8; 12]) -> usize {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let mut cleared = 0;
        for s in g.by_keyhash.values_mut() {
            if let Some((g_, _)) = s.current_owner {
                if g_[..12] == prefix {
                    s.current_owner = None;
                    cleared += 1;
                }
            }
        }
        cleared
    }

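    /// Autopurge pass: removes disposed instances whose dispose timestamp is at
    /// least `autopurge_disposed_delay_nanos` before `now`, and no-writers
    /// instances older than `autopurge_nowriter_delay_nanos`. A delay of
    /// `u128::MAX` disables purging for that state. Returns the purge count.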
    pub fn autopurge(
        &self,
        now: Time,
        autopurge_disposed_delay_nanos: u128,
        autopurge_nowriter_delay_nanos: u128,
    ) -> usize {
        let now_nanos = u128::from(u64::try_from(now.sec).unwrap_or(0)) * 1_000_000_000
            + u128::from(now.nanosec);
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let mut to_purge: alloc::vec::Vec<KeyHash> = alloc::vec::Vec::new();
        for (kh, s) in g.by_keyhash.iter() {
            let purge = match s.kind {
                InstanceStateKind::NotAliveDisposed
                    if autopurge_disposed_delay_nanos != u128::MAX =>
                {
                    s.disposed_at.is_some_and(|t| {
                        let t_nanos = u128::from(u64::try_from(t.sec).unwrap_or(0)) * 1_000_000_000
                            + u128::from(t.nanosec);
                        now_nanos.saturating_sub(t_nanos) >= autopurge_disposed_delay_nanos
                    })
                }
                InstanceStateKind::NotAliveNoWriters
                    if autopurge_nowriter_delay_nanos != u128::MAX =>
                {
                    s.no_writers_at.is_some_and(|t| {
                        let t_nanos = u128::from(u64::try_from(t.sec).unwrap_or(0)) * 1_000_000_000
                            + u128::from(t.nanosec);
                        now_nanos.saturating_sub(t_nanos) >= autopurge_nowriter_delay_nanos
                    })
                }
                _ => false,
            };
            if purge {
                to_purge.push(*kh);
            }
        }
        let count = to_purge.len();
        for kh in to_purge {
            if let Some(s) = g.by_keyhash.remove(&kh) {
                g.handle_to_keyhash.remove(&s.handle);
            }
        }
        count
    }

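    /// Records an incoming data sample for `keyhash`: creates the instance on
    /// first contact, revives a not-alive instance (bumping the matching
    /// generation count and flagging the view as new), updates the last sample
    /// timestamp and the cache counter, and returns the handle together with
    /// whether the instance was newly created.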
    pub fn observe_sample(
        &self,
        keyhash: KeyHash,
        key_holder: alloc::vec::Vec<u8>,
        timestamp: Option<Time>,
    ) -> (InstanceHandle, bool) {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        let mut was_new = false;
        let entry = g.by_keyhash.entry(keyhash).or_insert_with(|| {
            was_new = true;
            let h = self.allocator.allocate();
            InstanceState::fresh(h, key_holder.clone())
        });
        if matches!(
            entry.kind,
            InstanceStateKind::NotAliveDisposed | InstanceStateKind::NotAliveNoWriters
        ) {
            match entry.kind {
                InstanceStateKind::NotAliveDisposed => {
                    entry.disposed_generation_count =
                        entry.disposed_generation_count.saturating_add(1);
                }
                InstanceStateKind::NotAliveNoWriters => {
                    entry.no_writers_generation_count =
                        entry.no_writers_generation_count.saturating_add(1);
                }
                InstanceStateKind::Alive => {}
            }
            entry.kind = InstanceStateKind::Alive;
            entry.reader_view_new = true;
        }
        if let Some(ts) = timestamp {
            entry.last_sample_timestamp = Some(ts);
        }
        entry.samples_in_cache = entry.samples_in_cache.saturating_add(1);
        let handle = entry.handle;
        g.handle_to_keyhash.insert(handle, keyhash);
        (handle, was_new)
    }

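    /// Clears the "new" view flag once the reader has observed the instance.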
    pub fn mark_view_seen(&self, handle: InstanceHandle) {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        if let Some(kh) = g.handle_to_keyhash.get(&handle).copied() {
            if let Some(s) = g.by_keyhash.get_mut(&kh) {
                s.reader_view_new = false;
            }
        }
    }

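    /// Subtracts `n` from the instance's cached-sample counter, saturating at
    /// zero, when samples are removed from the reader cache.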
    pub fn drain_samples(&self, handle: InstanceHandle, n: u32) {
        let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        if let Some(kh) = g.handle_to_keyhash.get(&handle).copied() {
            if let Some(s) = g.by_keyhash.get_mut(&kh) {
                s.samples_in_cache = s.samples_in_cache.saturating_sub(n);
            }
        }
    }

    #[must_use]
    pub fn ordered_handles(&self) -> alloc::vec::Vec<InstanceHandle> {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        g.by_keyhash.values().map(|s| s.handle).collect()
    }

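    /// Returns the handle that follows `previous` in key-hash order, the first
    /// handle when `previous` is nil, and `None` when the end is reached or
    /// `previous` is unknown.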
    #[must_use]
    pub fn next_handle_after(&self, previous: InstanceHandle) -> Option<InstanceHandle> {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        if previous.is_nil() {
            return g.by_keyhash.values().next().map(|s| s.handle);
        }
        let prev_kh = g.handle_to_keyhash.get(&previous).copied()?;
        let range: (core::ops::Bound<KeyHash>, core::ops::Bound<KeyHash>) = (
            core::ops::Bound::Excluded(prev_kh),
            core::ops::Bound::Unbounded,
        );
        g.by_keyhash.range(range).next().map(|(_, s)| s.handle)
    }

    #[must_use]
    pub fn len(&self) -> usize {
        let g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        g.by_keyhash.len()
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl Clone for InstanceTracker {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            allocator: Arc::clone(&self.allocator),
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    fn kh(byte: u8) -> KeyHash {
        let mut k = [0u8; KEY_HASH_LEN];
        k[0] = byte;
        k
    }

    #[test]
    fn register_assigns_stable_handle() {
        let t = InstanceTracker::new();
        let h1 = t.register(kh(1), alloc::vec![1], None);
        let h2 = t.register(kh(1), alloc::vec![1], None);
        assert_eq!(h1, h2);
        assert!(!h1.is_nil());
    }

    #[test]
    fn lookup_returns_handle_for_known_key() {
        let t = InstanceTracker::new();
        let h = t.register(kh(2), alloc::vec![2], None);
        assert_eq!(t.lookup(&kh(2)), Some(h));
        assert_eq!(t.lookup(&kh(99)), None);
    }

    #[test]
    fn dispose_transitions_to_disposed() {
        let t = InstanceTracker::new();
        let h = t.register(kh(3), alloc::vec![3], None);
        assert_eq!(t.get_by_handle(h).unwrap().kind, InstanceStateKind::Alive);
        assert!(t.dispose(h, None));
        assert_eq!(
            t.get_by_handle(h).unwrap().kind,
            InstanceStateKind::NotAliveDisposed
        );
    }

    #[test]
    fn unregister_decrements_writer_count() {
        let t = InstanceTracker::new();
        let h = t.register(kh(4), alloc::vec![4], None);
        let _ = t.register(kh(4), alloc::vec![4], None);
        assert_eq!(t.get_by_handle(h).unwrap().writer_count, 2);
        assert!(t.unregister(h, None));
        assert_eq!(t.get_by_handle(h).unwrap().kind, InstanceStateKind::Alive);
        assert!(t.unregister(h, None));
        assert_eq!(
            t.get_by_handle(h).unwrap().kind,
            InstanceStateKind::NotAliveNoWriters
        );
    }

    #[test]
    fn re_register_after_dispose_bumps_disposed_generation() {
        let t = InstanceTracker::new();
        let h = t.register(kh(5), alloc::vec![5], None);
        t.dispose(h, None);
        let _ = t.register(kh(5), alloc::vec![5], None);
        let s = t.get_by_handle(h).unwrap();
        assert_eq!(s.kind, InstanceStateKind::Alive);
        assert_eq!(s.disposed_generation_count, 1);
    }

    #[test]
    fn observe_sample_creates_new_instance_on_first_call() {
        let t = InstanceTracker::new();
        let (h, was_new) = t.observe_sample(kh(6), alloc::vec![6], None);
        assert!(was_new);
        assert!(t.get_by_handle(h).unwrap().reader_view_new);
        let (h2, was_new2) = t.observe_sample(kh(6), alloc::vec![6], None);
        assert_eq!(h, h2);
        assert!(!was_new2);
    }

    #[test]
    fn ordered_handles_iterates_in_keyhash_order() {
        let t = InstanceTracker::new();
        let h_b = t.register(kh(2), alloc::vec![2], None);
        let h_a = t.register(kh(1), alloc::vec![1], None);
        let h_c = t.register(kh(3), alloc::vec![3], None);
        assert_eq!(t.ordered_handles(), alloc::vec![h_a, h_b, h_c]);
    }

    #[test]
    fn next_handle_after_walks_in_order() {
        let t = InstanceTracker::new();
        let h_a = t.register(kh(1), alloc::vec![1], None);
        let h_b = t.register(kh(2), alloc::vec![2], None);
        let h_c = t.register(kh(3), alloc::vec![3], None);
        assert_eq!(t.next_handle_after(crate::HANDLE_NIL), Some(h_a));
        assert_eq!(t.next_handle_after(h_a), Some(h_b));
        assert_eq!(t.next_handle_after(h_b), Some(h_c));
        assert_eq!(t.next_handle_after(h_c), None);
    }

    #[test]
    fn get_key_holder_returns_stored_bytes() {
        let t = InstanceTracker::new();
        let h = t.register(kh(7), alloc::vec![1, 2, 3], None);
        assert_eq!(t.get_key_holder(h), Some(alloc::vec![1u8, 2, 3]));
    }

    #[test]
    fn mark_view_seen_clears_new_flag() {
        let t = InstanceTracker::new();
        let (h, _) = t.observe_sample(kh(8), alloc::vec![8], None);
        assert!(t.get_by_handle(h).unwrap().reader_view_new);
        t.mark_view_seen(h);
        assert!(!t.get_by_handle(h).unwrap().reader_view_new);
    }

    #[test]
    fn observe_after_dispose_bumps_disposed_generation() {
        let t = InstanceTracker::new();
        let (h, _) = t.observe_sample(kh(9), alloc::vec![9], None);
        t.dispose(h, None);
        let (_, _) = t.observe_sample(kh(9), alloc::vec![9], None);
        assert_eq!(t.get_by_handle(h).unwrap().disposed_generation_count, 1);
    }

    #[test]
    fn drain_samples_decrements_count() {
        let t = InstanceTracker::new();
        let (h, _) = t.observe_sample(kh(10), alloc::vec![10], None);
        let (_, _) = t.observe_sample(kh(10), alloc::vec![10], None);
        assert_eq!(t.get_by_handle(h).unwrap().samples_in_cache, 2);
        t.drain_samples(h, 2);
        assert_eq!(t.get_by_handle(h).unwrap().samples_in_cache, 0);
    }

    #[test]
    fn time_based_filter_first_sample_passes() {
        let t = InstanceTracker::new();
        let _ = t.observe_sample(kh(20), alloc::vec![20], Some(Time::new(1, 0)));
        let pass = t.should_deliver_under_time_based_filter(
            &kh(20),
            Time::new(1, 0),
            100_000_000,
        );
        assert!(pass);
    }

    #[test]
    fn time_based_filter_too_close_drops() {
        let t = InstanceTracker::new();
        let _ = t.observe_sample(kh(20), alloc::vec![20], None);
        t.record_delivery(&kh(20), Time::new(1, 0));
        let pass = t.should_deliver_under_time_based_filter(
            &kh(20),
            Time::new(1, 50_000_000),
            100_000_000,
        );
        assert!(!pass, "50ms < 100ms separation → drop");
    }

    #[test]
    fn time_based_filter_far_enough_passes() {
        let t = InstanceTracker::new();
        let _ = t.observe_sample(kh(20), alloc::vec![20], None);
        t.record_delivery(&kh(20), Time::new(1, 0));
        let pass = t.should_deliver_under_time_based_filter(
            &kh(20),
            Time::new(1, 150_000_000),
            100_000_000,
        );
        assert!(pass, "150ms > 100ms separation → deliver");
    }

    #[test]
    fn time_based_filter_zero_separation_always_passes() {
        let t = InstanceTracker::new();
        let _ = t.observe_sample(kh(20), alloc::vec![20], None);
        t.record_delivery(&kh(20), Time::new(1, 0));
        let pass = t.should_deliver_under_time_based_filter(&kh(20), Time::new(1, 0), 0);
        assert!(pass, "min_separation=0 → no filtering");
    }

    #[test]
    fn time_based_filter_per_instance_isolation() {
        let t = InstanceTracker::new();
        let _ = t.observe_sample(kh(1), alloc::vec![1], None);
        let _ = t.observe_sample(kh(2), alloc::vec![2], None);
        t.record_delivery(&kh(1), Time::new(5, 0));
        let pass =
            t.should_deliver_under_time_based_filter(&kh(2), Time::new(5, 10_000_000), 100_000_000);
        assert!(pass);
    }

    #[test]
    fn time_based_filter_unknown_instance_passes() {
        let t = InstanceTracker::new();
        let pass = t.should_deliver_under_time_based_filter(&kh(99), Time::new(1, 0), 100_000_000);
        assert!(pass, "unknown instance → pass");
    }

    #[test]
    fn autopurge_disposed_after_delay() {
        let t = InstanceTracker::new();
        let h = t.register(kh(30), alloc::vec![30], None);
        t.dispose(h, Some(Time::new(10, 0)));
        let purged = t.autopurge(Time::new(15, 0), 3_000_000_000, u128::MAX);
        assert_eq!(purged, 1);
        assert!(t.lookup(&kh(30)).is_none());
    }

    #[test]
    fn autopurge_disposed_before_delay_keeps_instance() {
        let t = InstanceTracker::new();
        let h = t.register(kh(31), alloc::vec![31], None);
        t.dispose(h, Some(Time::new(10, 0)));
        let purged = t.autopurge(Time::new(11, 0), 5_000_000_000, u128::MAX);
        assert_eq!(purged, 0);
        assert!(t.lookup(&kh(31)).is_some());
    }

    #[test]
    fn autopurge_no_writers_after_delay() {
        let t = InstanceTracker::new();
        let h = t.register(kh(32), alloc::vec![32], None);
        t.unregister(h, Some(Time::new(20, 0)));
        let purged = t.autopurge(Time::new(25, 0), u128::MAX, 3_000_000_000);
        assert_eq!(purged, 1);
        assert!(t.lookup(&kh(32)).is_none());
    }

    #[test]
    fn autopurge_alive_instance_never_purged() {
        let t = InstanceTracker::new();
        let _h = t.register(kh(33), alloc::vec![33], None);
        let purged = t.autopurge(Time::new(1000, 0), 0, 0);
        assert_eq!(purged, 0);
        assert!(t.lookup(&kh(33)).is_some());
    }

    #[test]
    fn autopurge_infinity_delay_never_purges() {
        let t = InstanceTracker::new();
        let h = t.register(kh(34), alloc::vec![34], None);
        t.dispose(h, Some(Time::new(10, 0)));
        let purged = t.autopurge(Time::new(99999, 0), u128::MAX, u128::MAX);
        assert_eq!(purged, 0);
    }

    fn guid(byte: u8) -> [u8; 16] {
        [byte; 16]
    }

    #[test]
    fn exclusive_first_writer_wins() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(40), alloc::vec![40], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(40), guid(1), 10));
        let s = t.get_by_keyhash(&kh(40)).unwrap();
        assert_eq!(s.current_owner, Some((guid(1), 10)));
    }

    #[test]
    fn exclusive_higher_strength_wins() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(41), alloc::vec![41], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(41), guid(1), 10));
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(41), guid(2), 20));
        let s = t.get_by_keyhash(&kh(41)).unwrap();
        assert_eq!(s.current_owner, Some((guid(2), 20)));
    }

    #[test]
    fn exclusive_lower_strength_rejected() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(42), alloc::vec![42], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(42), guid(2), 20));
        assert!(!t.should_accept_sample_under_exclusive_ownership(&kh(42), guid(1), 5));
        let s = t.get_by_keyhash(&kh(42)).unwrap();
        assert_eq!(s.current_owner, Some((guid(2), 20)));
    }

    #[test]
    fn exclusive_tie_break_by_higher_guid() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(43), alloc::vec![43], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(43), guid(1), 10));
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(43), guid(2), 10));
    }

    #[test]
    fn exclusive_tie_break_lower_guid_rejected() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(44), alloc::vec![44], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(44), guid(2), 10));
        assert!(!t.should_accept_sample_under_exclusive_ownership(&kh(44), guid(1), 10));
    }

    #[test]
    fn exclusive_same_writer_always_accepted() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(45), alloc::vec![45], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(45), guid(7), 10));
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(45), guid(7), 10));
    }

    #[test]
    fn clear_owner_for_writer_resets_owner() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(50), alloc::vec![50], None);
        let _ = t.register(kh(51), alloc::vec![51], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(50), guid(9), 100));
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(51), guid(9), 100));
        let cleared = t.clear_owner_for_writer(guid(9));
        assert_eq!(cleared, 2);
        let s50 = t.get_by_keyhash(&kh(50)).unwrap();
        let s51 = t.get_by_keyhash(&kh(51)).unwrap();
        assert!(s50.current_owner.is_none());
        assert!(s51.current_owner.is_none());
    }

    #[test]
    fn failover_after_clear_accepts_weaker_writer() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(52), alloc::vec![52], None);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(52), guid(9), 100));
        assert!(!t.should_accept_sample_under_exclusive_ownership(&kh(52), guid(1), 10));
        t.clear_owner_for_writer(guid(9));
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(52), guid(1), 10));
    }

    #[test]
    fn clear_owner_for_writer_prefix_matches_first_12_bytes() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(60), alloc::vec![60], None);
        let mut full_a = [9u8; 16];
        full_a[..12].fill(1);
        let mut full_b = [9u8; 16];
        full_b[..12].fill(2);
        assert!(t.should_accept_sample_under_exclusive_ownership(&kh(60), full_a, 50));
        assert_eq!(t.clear_owner_for_writer_prefix([2u8; 12]), 0);
        let s = t.get_by_keyhash(&kh(60)).unwrap();
        assert!(s.current_owner.is_some());
        assert_eq!(t.clear_owner_for_writer_prefix([1u8; 12]), 1);
        let s2 = t.get_by_keyhash(&kh(60)).unwrap();
        assert!(s2.current_owner.is_none());
        let _ = full_b;
    }

    #[test]
    fn clear_owner_for_writer_prefix_multi_instance() {
        let t = InstanceTracker::new();
        let _ = t.register(kh(70), alloc::vec![70], None);
        let _ = t.register(kh(71), alloc::vec![71], None);
        let _ = t.register(kh(72), alloc::vec![72], None);
        let mut g = [0u8; 16];
        g[..12].fill(7);
        for k in [kh(70), kh(71), kh(72)] {
            assert!(t.should_accept_sample_under_exclusive_ownership(&k, g, 1));
        }
        let cleared = t.clear_owner_for_writer_prefix([7u8; 12]);
        assert_eq!(cleared, 3);
    }
}