use std::collections::HashMap;
use crate::record::obligation::{
ObligationAbortReason, ObligationKind, ObligationRecord, ObligationState,
};
use crate::types::symbol::{ObjectId, SymbolId};
use crate::types::{ObligationId, RegionId, TaskId, Time};
/// Newtype wrapper around a `u64` that identifies an epoch.
///
/// Equality, ordering and hashing are all derived, so they follow the
/// wrapped integer: `EpochId(1) < EpochId(2)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EpochId(pub u64);
/// A pair of epoch bounds that can be attached to a `SymbolObligation`.
///
/// `SymbolObligation::is_epoch_valid` accepts an epoch `e` when
/// `start <= e && e <= end`, i.e. it treats both bounds as inclusive.
/// Nothing in this type enforces `start <= end`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EpochWindow {
/// Lower bound of the window.
pub start: EpochId,
/// Upper bound of the window.
pub end: EpochId,
}
/// Symbol-specific payload carried by a `SymbolObligation`.
///
/// `SymbolObligationTracker` indexes the `SymbolTransmit` and `SymbolAck`
/// variants by their `symbol_id`, and the remaining three variants by their
/// `object_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SymbolObligationKind {
/// Payload built by `SymbolObligation::transmit`.
SymbolTransmit {
/// Symbol the obligation refers to.
symbol_id: SymbolId,
/// Region named as the destination when the obligation was created.
destination: RegionId,
},
/// Payload built by `SymbolObligation::ack`.
SymbolAck {
/// Symbol the obligation refers to.
symbol_id: SymbolId,
/// Region named as the source when the obligation was created.
source: RegionId,
},
/// Payload built by `SymbolObligation::decoding`.
DecodingInProgress {
/// Object the obligation refers to.
object_id: ObjectId,
/// Progress counter; starts at 0 and is only changed through
/// `SymbolObligation::update_decoding_progress`.
symbols_received: u32,
/// Upper bound that `update_decoding_progress` enforces on
/// `symbols_received`.
symbols_needed: u32,
},
/// NOTE(review): no constructor in this file builds this variant; it is
/// presumably created elsewhere — confirm before relying on it.
EncodingSession {
/// Object the obligation refers to.
object_id: ObjectId,
/// Counter of encoded symbols; never updated by code in this file.
symbols_encoded: u32,
},
/// Payload built by `SymbolObligation::lease`.
SymbolLease {
/// Object the obligation refers to.
object_id: ObjectId,
/// Lease expiry; `SymbolObligation::lease` also stores this value as the
/// obligation's deadline.
lease_expires: Time,
},
}
/// Reasons why `SymbolObligation::update_decoding_progress` rejects an update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodingProgressUpdateError {
    /// The obligation's kind is not `SymbolObligationKind::DecodingInProgress`.
    NotDecodingObligation,
    /// The reported count is larger than the number of symbols needed.
    SymbolsReceivedExceedsNeeded {
        /// Count that was reported.
        received: u32,
        /// Number of symbols the obligation needs.
        needed: u32,
    },
    /// The reported count is smaller than the count already recorded.
    SymbolsReceivedRegressed {
        /// Count recorded before the rejected update.
        previous: u32,
        /// Count that was reported.
        attempted: u32,
    },
}

impl std::fmt::Display for DecodingProgressUpdateError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The enum is `Copy`, so matching on the value lets the arms bind the
        // counters directly instead of through references.
        match *self {
            Self::NotDecodingObligation => {
                f.write_str("decoding progress can only be updated for decoding obligations")
            }
            Self::SymbolsReceivedExceedsNeeded { received, needed } => {
                write!(
                    f,
                    "symbols_received ({received}) exceeds symbols_needed ({needed})"
                )
            }
            Self::SymbolsReceivedRegressed {
                previous,
                attempted,
            } => {
                write!(
                    f,
                    "symbols_received regressed from {previous} to {attempted}"
                )
            }
        }
    }
}

impl std::error::Error for DecodingProgressUpdateError {}
#[derive(Debug)]
pub struct SymbolObligation {
inner: ObligationRecord,
kind: SymbolObligationKind,
valid_epoch: Option<EpochWindow>,
deadline: Option<Time>,
}
impl SymbolObligation {
#[must_use]
#[allow(clippy::too_many_arguments)]
pub fn transmit(
id: ObligationId,
holder: TaskId,
region: RegionId,
symbol_id: SymbolId,
destination: RegionId,
deadline: Option<Time>,
epoch_window: Option<EpochWindow>,
now: Time,
) -> Self {
Self {
inner: ObligationRecord::new(id, ObligationKind::IoOp, holder, region, now),
kind: SymbolObligationKind::SymbolTransmit {
symbol_id,
destination,
},
valid_epoch: epoch_window,
deadline,
}
}
#[must_use]
pub fn ack(
id: ObligationId,
holder: TaskId,
region: RegionId,
symbol_id: SymbolId,
source: RegionId,
now: Time,
) -> Self {
Self {
inner: ObligationRecord::new(id, ObligationKind::Ack, holder, region, now),
kind: SymbolObligationKind::SymbolAck { symbol_id, source },
valid_epoch: None,
deadline: None,
}
}
#[must_use]
pub fn decoding(
id: ObligationId,
holder: TaskId,
region: RegionId,
object_id: ObjectId,
symbols_needed: u32,
epoch_window: EpochWindow,
now: Time,
) -> Self {
Self {
inner: ObligationRecord::new(id, ObligationKind::IoOp, holder, region, now),
kind: SymbolObligationKind::DecodingInProgress {
object_id,
symbols_received: 0,
symbols_needed,
},
valid_epoch: Some(epoch_window),
deadline: None,
}
}
#[must_use]
pub fn lease(
id: ObligationId,
holder: TaskId,
region: RegionId,
object_id: ObjectId,
lease_expires: Time,
now: Time,
) -> Self {
Self {
inner: ObligationRecord::new(id, ObligationKind::Lease, holder, region, now),
kind: SymbolObligationKind::SymbolLease {
object_id,
lease_expires,
},
valid_epoch: None,
deadline: Some(lease_expires),
}
}
#[must_use]
#[inline]
pub fn is_pending(&self) -> bool {
self.inner.is_pending()
}
#[must_use]
pub fn is_epoch_valid(&self, current_epoch: EpochId) -> bool {
self.valid_epoch
.is_none_or(|window| current_epoch >= window.start && current_epoch <= window.end)
}
#[must_use]
pub fn is_expired(&self, now: Time) -> bool {
self.deadline.is_some_and(|deadline| now > deadline)
}
pub fn commit(&mut self, now: Time) {
self.inner.commit(now);
}
pub fn abort(&mut self, now: Time) {
self.inner.abort(now, ObligationAbortReason::Explicit);
}
pub fn mark_leaked(&mut self, now: Time) {
self.inner.mark_leaked(now);
}
pub fn update_decoding_progress(
&mut self,
symbols_received: u32,
) -> Result<(), DecodingProgressUpdateError> {
if let SymbolObligationKind::DecodingInProgress {
symbols_received: ref mut count,
symbols_needed,
..
} = self.kind
{
if symbols_received > symbols_needed {
return Err(DecodingProgressUpdateError::SymbolsReceivedExceedsNeeded {
received: symbols_received,
needed: symbols_needed,
});
}
if symbols_received < *count {
return Err(DecodingProgressUpdateError::SymbolsReceivedRegressed {
previous: *count,
attempted: symbols_received,
});
}
*count = symbols_received;
Ok(())
} else {
Err(DecodingProgressUpdateError::NotDecodingObligation)
}
}
#[must_use]
#[inline]
pub fn symbol_kind(&self) -> &SymbolObligationKind {
&self.kind
}
#[must_use]
#[inline]
pub fn state(&self) -> ObligationState {
self.inner.state
}
#[must_use]
#[inline]
pub fn id(&self) -> ObligationId {
self.inner.id
}
}
/// Registry of the symbol obligations that belong to one region.
///
/// Obligations are stored by id; two secondary indexes map a symbol id or an
/// object id to the ids of the obligations that mention it.
#[derive(Debug)]
pub struct SymbolObligationTracker {
    /// Primary storage, keyed by obligation id.
    obligations: HashMap<ObligationId, SymbolObligation>,
    /// Ids of `SymbolTransmit` / `SymbolAck` obligations, grouped by symbol.
    by_symbol: HashMap<SymbolId, Vec<ObligationId>>,
    /// Ids of decoding / encoding / lease obligations, grouped by object.
    by_object: HashMap<ObjectId, Vec<ObligationId>>,
    /// Region every registered obligation must belong to.
    region_id: RegionId,
}

impl SymbolObligationTracker {
    /// Panics unless `obligation` belongs to this tracker's region and its id
    /// is not registered yet.
    fn assert_registration_valid(&self, obligation: &SymbolObligation) {
        assert_eq!(
            obligation.inner.region, self.region_id,
            "symbol obligation tracker region mismatch"
        );
        let already_known = self.obligations.contains_key(&obligation.id());
        assert!(!already_known, "duplicate symbol obligation id registered");
    }

    /// Appends `id` to the secondary index that matches `kind`.
    fn index_obligation_id(&mut self, id: ObligationId, kind: &SymbolObligationKind) {
        // Pick the bucket first, then push once; new buckets start with room
        // for two ids.
        let bucket = match *kind {
            SymbolObligationKind::SymbolTransmit { symbol_id, .. }
            | SymbolObligationKind::SymbolAck { symbol_id, .. } => self
                .by_symbol
                .entry(symbol_id)
                .or_insert_with(|| Vec::with_capacity(2)),
            SymbolObligationKind::DecodingInProgress { object_id, .. }
            | SymbolObligationKind::EncodingSession { object_id, .. }
            | SymbolObligationKind::SymbolLease { object_id, .. } => self
                .by_object
                .entry(object_id)
                .or_insert_with(|| Vec::with_capacity(2)),
        };
        bucket.push(id);
    }

    /// Removes `id` from the bucket stored under `key`, dropping the bucket
    /// altogether once it becomes empty. Unknown keys are ignored.
    fn unlink_from_index<K>(index: &mut HashMap<K, Vec<ObligationId>>, key: &K, id: ObligationId)
    where
        K: std::hash::Hash + Eq,
    {
        if let Some(bucket) = index.get_mut(key) {
            bucket.retain(|entry| *entry != id);
            if bucket.is_empty() {
                index.remove(key);
            }
        }
    }

    /// Removes `id` from the secondary index that matches `kind`.
    fn remove_indexed_obligation_id(&mut self, id: ObligationId, kind: &SymbolObligationKind) {
        match kind {
            SymbolObligationKind::SymbolTransmit { symbol_id, .. }
            | SymbolObligationKind::SymbolAck { symbol_id, .. } => {
                Self::unlink_from_index(&mut self.by_symbol, symbol_id, id);
            }
            SymbolObligationKind::DecodingInProgress { object_id, .. }
            | SymbolObligationKind::EncodingSession { object_id, .. }
            | SymbolObligationKind::SymbolLease { object_id, .. } => {
                Self::unlink_from_index(&mut self.by_object, object_id, id);
            }
        }
    }

    /// Takes the obligation out of primary storage and out of its index.
    /// Returns `None` when `id` is not registered.
    fn remove_obligation(&mut self, id: ObligationId) -> Option<SymbolObligation> {
        let removed = self.obligations.remove(&id)?;
        self.remove_indexed_obligation_id(id, &removed.kind);
        Some(removed)
    }

    /// Shared body of the sweep methods.
    ///
    /// Collects the ids of all pending obligations for which `selected`
    /// returns `true`, removes each of them from the tracker, applies `finish`
    /// to the removed obligation and returns the collected ids (in the map's
    /// iteration order).
    fn sweep_pending(
        &mut self,
        selected: impl Fn(&SymbolObligation) -> bool,
        mut finish: impl FnMut(&mut SymbolObligation),
    ) -> Vec<ObligationId> {
        // Ids are collected up front because the map cannot be mutated while
        // it is being iterated.
        let swept: Vec<ObligationId> = self
            .obligations
            .iter()
            .filter_map(|(id, ob)| {
                if ob.is_pending() && selected(ob) {
                    Some(*id)
                } else {
                    None
                }
            })
            .collect();
        for &id in &swept {
            if let Some(mut ob) = self.remove_obligation(id) {
                finish(&mut ob);
            }
        }
        swept
    }

    /// Creates an empty tracker for `region_id`, pre-sizing each map for 16
    /// entries.
    #[must_use]
    pub fn new(region_id: RegionId) -> Self {
        const INITIAL_CAPACITY: usize = 16;
        Self {
            obligations: HashMap::with_capacity(INITIAL_CAPACITY),
            by_symbol: HashMap::with_capacity(INITIAL_CAPACITY),
            by_object: HashMap::with_capacity(INITIAL_CAPACITY),
            region_id,
        }
    }

    /// Returns the region this tracker was created for.
    #[must_use]
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    /// Stores `obligation`, indexes it, and returns its id.
    ///
    /// # Panics
    ///
    /// Panics if the obligation's region differs from the tracker's region,
    /// or if an obligation with the same id is already registered. Both
    /// checks run before anything is modified.
    pub fn register(&mut self, obligation: SymbolObligation) -> ObligationId {
        self.assert_registration_valid(&obligation);
        let id = obligation.id();
        self.index_obligation_id(id, &obligation.kind);
        self.obligations.insert(id, obligation);
        id
    }

    /// Removes the obligation `id` from the tracker and hands it back.
    ///
    /// If it is still pending it is committed (`commit == true`) or aborted
    /// (`commit == false`) at `now` first; an obligation that is no longer
    /// pending is returned unchanged. Returns `None` for an unknown id.
    pub fn resolve(
        &mut self,
        id: ObligationId,
        commit: bool,
        now: Time,
    ) -> Option<SymbolObligation> {
        let mut obligation = self.remove_obligation(id)?;
        if obligation.is_pending() {
            if commit {
                obligation.commit(now);
            } else {
                obligation.abort(now);
            }
        }
        Some(obligation)
    }

    /// Iterates over the stored obligations that are still pending.
    pub fn pending(&self) -> impl Iterator<Item = &SymbolObligation> {
        self.obligations.values().filter(|ob| ob.is_pending())
    }

    /// Returns the pending obligations indexed under `symbol_id`, in
    /// registration order; empty when the symbol is unknown.
    #[must_use]
    pub fn by_symbol(&self, symbol_id: SymbolId) -> Vec<&SymbolObligation> {
        let Some(ids) = self.by_symbol.get(&symbol_id) else {
            return Vec::new();
        };
        ids.iter()
            .filter_map(|id| self.obligations.get(id))
            .filter(|ob| ob.is_pending())
            .collect()
    }

    /// Counts the stored obligations that are still pending.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.pending().count()
    }

    /// Treats every still-pending obligation as leaked: removes it from the
    /// tracker, marks it leaked at `now`, and returns the affected ids.
    pub fn check_leaks(&mut self, now: Time) -> Vec<ObligationId> {
        self.sweep_pending(|_| true, |ob| ob.mark_leaked(now))
    }

    /// Removes and aborts (at `now`) every pending obligation for which
    /// `is_epoch_valid(current_epoch)` is `false`, returning the affected ids.
    ///
    /// NOTE(review): `is_epoch_valid` is also `false` for epochs *before* the
    /// window starts, so obligations whose window has not begun yet are
    /// aborted as well — confirm this is intended.
    pub fn abort_expired_epoch(&mut self, current_epoch: EpochId, now: Time) -> Vec<ObligationId> {
        self.sweep_pending(
            |ob| !ob.is_epoch_valid(current_epoch),
            |ob| ob.abort(now),
        )
    }

    /// Removes and aborts (at `now`) every pending obligation whose deadline
    /// has passed according to `is_expired(now)`, returning the affected ids.
    pub fn abort_expired_deadlines(&mut self, now: Time) -> Vec<ObligationId> {
        self.sweep_pending(|ob| ob.is_expired(now), |ob| ob.abort(now))
    }
}
/// Scope guard for one obligation registered in a [`SymbolObligationTracker`].
///
/// Call [`commit`](Self::commit) or [`abort`](Self::abort) to resolve the
/// obligation explicitly. If the guard is dropped without either call, the
/// obligation is aborted from `Drop`.
///
/// NOTE(review): `Drop` has no access to a clock, so that implicit abort is
/// stamped with `Time::ZERO`.
#[derive(Debug)]
#[must_use = "dropping the guard without resolving it aborts the obligation"]
pub struct ObligationGuard<'a> {
    /// Tracker that owns the guarded obligation.
    tracker: &'a mut SymbolObligationTracker,
    /// Id of the guarded obligation.
    id: ObligationId,
    /// Set once `commit` or `abort` has run, so `Drop` does nothing further.
    resolved: bool,
}

impl<'a> ObligationGuard<'a> {
    /// Creates a guard for the obligation `id` held by `tracker`.
    ///
    /// The id is not checked here; resolving an unknown id is a no-op because
    /// `SymbolObligationTracker::resolve` returns `None` for it.
    pub fn new(tracker: &'a mut SymbolObligationTracker, id: ObligationId) -> Self {
        Self {
            tracker,
            id,
            resolved: false,
        }
    }

    /// Resolves the obligation as committed at `now` and consumes the guard.
    pub fn commit(mut self, now: Time) {
        self.tracker.resolve(self.id, true, now);
        self.resolved = true;
    }

    /// Resolves the obligation as aborted at `now` and consumes the guard.
    pub fn abort(mut self, now: Time) {
        self.tracker.resolve(self.id, false, now);
        self.resolved = true;
    }
}

impl Drop for ObligationGuard<'_> {
    /// Aborts the obligation if neither `commit` nor `abort` was called.
    fn drop(&mut self) {
        if !self.resolved {
            // No timestamp is available here; see the type-level note.
            self.tracker.resolve(self.id, false, Time::ZERO);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::ArenaIndex;

    /// Returns an obligation id, a task id and a region id, each built from
    /// arena index `(0, 0)`.
    fn test_ids() -> (ObligationId, TaskId, RegionId) {
        let obligation = ObligationId::from_arena(ArenaIndex::new(0, 0));
        let task = TaskId::from_arena(ArenaIndex::new(0, 0));
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        (obligation, task, region)
    }

    // ---- SymbolObligation lifecycle -------------------------------------

    #[test]
    fn test_transmit_obligation_lifecycle_commit() {
        let (oid, tid, rid) = test_ids();
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let mut obligation = SymbolObligation::transmit(
            oid,
            tid,
            rid,
            symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );

        assert!(obligation.is_pending());
        obligation.commit(Time::from_millis(100));
        assert!(!obligation.is_pending());
        assert_eq!(obligation.state(), ObligationState::Committed);
    }

    #[test]
    fn test_transmit_obligation_lifecycle_abort() {
        let (oid, tid, rid) = test_ids();
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let mut obligation = SymbolObligation::transmit(
            oid,
            tid,
            rid,
            symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );

        obligation.abort(Time::from_millis(100));
        assert_eq!(obligation.state(), ObligationState::Aborted);
    }

    // ---- Epoch window and deadline predicates ---------------------------

    #[test]
    fn test_epoch_window_validity() {
        let (oid, tid, rid) = test_ids();
        let object = ObjectId::new_for_test(1);
        let window = EpochWindow {
            start: EpochId(10),
            end: EpochId(20),
        };
        let obligation =
            SymbolObligation::decoding(oid, tid, rid, object, 10, window, Time::ZERO);

        // Outside below, both inclusive bounds, the middle, outside above.
        assert!(!obligation.is_epoch_valid(EpochId(5)));
        assert!(obligation.is_epoch_valid(EpochId(10)));
        assert!(obligation.is_epoch_valid(EpochId(15)));
        assert!(obligation.is_epoch_valid(EpochId(20)));
        assert!(!obligation.is_epoch_valid(EpochId(25)));
    }

    #[test]
    fn test_deadline_expiry() {
        let (oid, tid, rid) = test_ids();
        let object = ObjectId::new_for_test(1);
        let expires = Time::from_millis(1000);
        let obligation = SymbolObligation::lease(oid, tid, rid, object, expires, Time::ZERO);

        // The deadline itself is not yet expired; only later instants are.
        assert!(!obligation.is_expired(Time::from_millis(500)));
        assert!(!obligation.is_expired(Time::from_millis(1000)));
        assert!(obligation.is_expired(Time::from_millis(1001)));
    }

    // ---- Tracker registration -------------------------------------------

    #[test]
    fn test_tracker_registration() {
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let mut tracker = SymbolObligationTracker::new(region);
        let (oid, tid, _) = test_ids();
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let obligation = SymbolObligation::transmit(
            oid,
            tid,
            region,
            symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );

        let registered = tracker.register(obligation);
        assert_eq!(tracker.pending_count(), 1);

        let matches = tracker.by_symbol(symbol);
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].id(), registered);
    }

    #[test]
    fn test_register_same_id_panics_and_preserves_original_obligation() {
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let mut tracker = SymbolObligationTracker::new(region);
        let (oid, tid, _) = test_ids();
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let first_symbol = SymbolId::new_for_test(11, 0, 0);
        let second_symbol = SymbolId::new_for_test(12, 0, 0);

        let first = SymbolObligation::transmit(
            oid,
            tid,
            region,
            first_symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );
        tracker.register(first);
        assert_eq!(tracker.by_symbol(first_symbol).len(), 1);

        // Same obligation id, different symbol: registration must panic.
        let second = SymbolObligation::transmit(
            oid,
            tid,
            region,
            second_symbol,
            destination,
            None,
            None,
            Time::from_nanos(1),
        );
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            tracker.register(second);
        }));
        assert!(outcome.is_err());

        // The first registration is untouched and the second left no trace.
        let surviving = tracker.by_symbol(first_symbol);
        assert_eq!(surviving.len(), 1);
        assert_eq!(surviving[0].id(), oid);
        assert!(tracker.by_symbol(second_symbol).is_empty());
        assert_eq!(tracker.pending_count(), 1);
    }

    #[test]
    fn test_register_cross_region_obligation_panics() {
        let tracker_region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let other_region = RegionId::from_arena(ArenaIndex::new(9, 0));
        let mut tracker = SymbolObligationTracker::new(tracker_region);
        let (oid, tid, _) = test_ids();
        let symbol = SymbolId::new_for_test(77, 0, 0);
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let foreign = SymbolObligation::transmit(
            oid,
            tid,
            other_region,
            symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );

        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            tracker.register(foreign);
        }));
        assert!(outcome.is_err());
        assert_eq!(tracker.pending_count(), 0);
        assert!(tracker.by_symbol(symbol).is_empty());
    }

    // ---- Tracker resolution and sweeps ----------------------------------

    #[test]
    fn test_tracker_resolve_commit() {
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let mut tracker = SymbolObligationTracker::new(region);
        let (oid, tid, _) = test_ids();
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let obligation = SymbolObligation::transmit(
            oid,
            tid,
            region,
            symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );
        let registered = tracker.register(obligation);

        let resolved = tracker.resolve(registered, true, Time::from_millis(100));
        assert!(resolved.is_some());
        assert_eq!(resolved.unwrap().state(), ObligationState::Committed);
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn test_leak_detection() {
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let mut tracker = SymbolObligationTracker::new(region);
        let (first_id, tid, _) = test_ids();
        let second_id = ObligationId::from_arena(ArenaIndex::new(1, 0));
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let peer = RegionId::from_arena(ArenaIndex::new(1, 0));

        let transmit = SymbolObligation::transmit(
            first_id,
            tid,
            region,
            symbol,
            peer,
            None,
            None,
            Time::ZERO,
        );
        let ack = SymbolObligation::ack(second_id, tid, region, symbol, peer, Time::ZERO);
        tracker.register(transmit);
        let ack_id = tracker.register(ack);

        // Only the ack is resolved, so the transmit is the single leak.
        tracker.resolve(ack_id, true, Time::from_millis(100));
        let leaked = tracker.check_leaks(Time::from_millis(200));
        assert_eq!(leaked.len(), 1);
        assert_eq!(tracker.pending_count(), 0);
        assert!(tracker.by_symbol(symbol).is_empty());
    }

    #[test]
    fn test_abort_expired_epoch() {
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let mut tracker = SymbolObligationTracker::new(region);
        let (oid, tid, _) = test_ids();
        let object = ObjectId::new_for_test(1);
        let window = EpochWindow {
            start: EpochId(10),
            end: EpochId(20),
        };
        let obligation =
            SymbolObligation::decoding(oid, tid, region, object, 10, window, Time::ZERO);
        tracker.register(obligation);

        // Inside the window nothing is aborted.
        let aborted = tracker.abort_expired_epoch(EpochId(15), Time::from_millis(100));
        assert_eq!(aborted.len(), 0);

        // Past the window the obligation is aborted and fully removed.
        let aborted = tracker.abort_expired_epoch(EpochId(25), Time::from_millis(200));
        assert_eq!(aborted.len(), 1);
        assert_eq!(tracker.pending_count(), 0);
        assert!(tracker.obligations.is_empty());
        assert!(tracker.by_object.is_empty());
    }

    #[test]
    fn test_abort_expired_deadlines() {
        let region = RegionId::from_arena(ArenaIndex::new(0, 0));
        let mut tracker = SymbolObligationTracker::new(region);
        let (oid, tid, _) = test_ids();
        let object = ObjectId::new_for_test(1);
        let expires = Time::from_millis(1000);
        let obligation = SymbolObligation::lease(oid, tid, region, object, expires, Time::ZERO);
        tracker.register(obligation);

        // Before the deadline nothing is aborted.
        let aborted = tracker.abort_expired_deadlines(Time::from_millis(500));
        assert_eq!(aborted.len(), 0);

        // After the deadline the obligation is aborted and fully removed.
        let aborted = tracker.abort_expired_deadlines(Time::from_millis(1500));
        assert_eq!(aborted.len(), 1);
        assert_eq!(tracker.pending_count(), 0);
        assert!(tracker.obligations.is_empty());
        assert!(tracker.by_object.is_empty());
    }

    // ---- Decoding progress updates --------------------------------------

    #[test]
    fn test_decoding_progress_update() {
        let (oid, tid, rid) = test_ids();
        let object = ObjectId::new_for_test(1);
        let window = EpochWindow {
            start: EpochId(1),
            end: EpochId(100),
        };
        let mut obligation =
            SymbolObligation::decoding(oid, tid, rid, object, 10, window, Time::ZERO);

        if let SymbolObligationKind::DecodingInProgress {
            symbols_received, ..
        } = *obligation.symbol_kind()
        {
            assert_eq!(symbols_received, 0);
        }

        assert!(obligation.update_decoding_progress(5).is_ok());

        if let SymbolObligationKind::DecodingInProgress {
            symbols_received, ..
        } = *obligation.symbol_kind()
        {
            assert_eq!(symbols_received, 5);
        }
    }

    #[test]
    fn test_decoding_progress_update_rejects_non_decoding_obligation() {
        let (oid, tid, rid) = test_ids();
        let symbol = SymbolId::new_for_test(42, 0, 0);
        let mut obligation = SymbolObligation::ack(oid, tid, rid, symbol, rid, Time::ZERO);

        let outcome = obligation.update_decoding_progress(1);
        assert_eq!(
            outcome,
            Err(DecodingProgressUpdateError::NotDecodingObligation)
        );
    }

    #[test]
    fn test_decoding_progress_update_rejects_received_above_needed() {
        let (oid, tid, rid) = test_ids();
        let object = ObjectId::new_for_test(7);
        let window = EpochWindow {
            start: EpochId(1),
            end: EpochId(2),
        };
        let mut obligation =
            SymbolObligation::decoding(oid, tid, rid, object, 3, window, Time::ZERO);

        let outcome = obligation.update_decoding_progress(4);
        assert_eq!(
            outcome,
            Err(DecodingProgressUpdateError::SymbolsReceivedExceedsNeeded {
                received: 4,
                needed: 3,
            })
        );

        // A rejected update must leave the stored count alone.
        if let SymbolObligationKind::DecodingInProgress {
            symbols_received, ..
        } = *obligation.symbol_kind()
        {
            assert_eq!(symbols_received, 0);
        }
    }

    #[test]
    fn test_decoding_progress_update_rejects_regression() {
        let (oid, tid, rid) = test_ids();
        let object = ObjectId::new_for_test(8);
        let window = EpochWindow {
            start: EpochId(1),
            end: EpochId(2),
        };
        let mut obligation =
            SymbolObligation::decoding(oid, tid, rid, object, 6, window, Time::ZERO);
        assert!(obligation.update_decoding_progress(4).is_ok());

        let outcome = obligation.update_decoding_progress(2);
        assert_eq!(
            outcome,
            Err(DecodingProgressUpdateError::SymbolsReceivedRegressed {
                previous: 4,
                attempted: 2,
            })
        );

        // A rejected update must leave the stored count alone.
        if let SymbolObligationKind::DecodingInProgress {
            symbols_received, ..
        } = *obligation.symbol_kind()
        {
            assert_eq!(symbols_received, 4);
        }
    }

    // ---- Miscellaneous --------------------------------------------------

    #[test]
    #[should_panic(expected = "obligation already resolved")]
    fn test_double_commit_panics() {
        let (oid, tid, rid) = test_ids();
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let destination = RegionId::from_arena(ArenaIndex::new(1, 0));
        let mut obligation = SymbolObligation::transmit(
            oid,
            tid,
            rid,
            symbol,
            destination,
            None,
            None,
            Time::ZERO,
        );

        obligation.commit(Time::from_millis(100));
        // The second commit is the call expected to panic.
        obligation.commit(Time::from_millis(200));
    }

    #[test]
    fn test_no_epoch_constraint_always_valid() {
        let (oid, tid, rid) = test_ids();
        let symbol = SymbolId::new_for_test(1, 0, 0);
        let obligation = SymbolObligation::ack(oid, tid, rid, symbol, rid, Time::ZERO);

        assert!(obligation.is_epoch_valid(EpochId(0)));
        assert!(obligation.is_epoch_valid(EpochId(u64::MAX)));
    }

    #[test]
    fn epoch_id_debug_clone_copy_eq_ord_hash() {
        use std::collections::HashSet;

        let a = EpochId(42);
        let b = a;
        let c = a;
        assert_eq!(a, b);
        assert_eq!(a, c);
        assert_ne!(a, EpochId(99));
        assert!(a < EpochId(100));

        let dbg = format!("{a:?}");
        assert!(dbg.contains("EpochId"));

        let mut seen = HashSet::new();
        seen.insert(a);
        assert!(seen.contains(&b));
    }

    #[test]
    fn epoch_window_debug_clone_copy_eq() {
        let a = EpochWindow {
            start: EpochId(10),
            end: EpochId(20),
        };
        let b = a;
        let c = a;
        assert_eq!(a, b);
        assert_eq!(a, c);
        assert_ne!(
            a,
            EpochWindow {
                start: EpochId(0),
                end: EpochId(5)
            }
        );

        let dbg = format!("{a:?}");
        assert!(dbg.contains("EpochWindow"));
    }
}