use crate::store::Store;
use serde::{Deserialize, Serialize};
/// Schema version stamped into the `schema_version` field of every
/// [`SubscriberFrontierReportBody`] built by this module.
pub const SUBSCRIBER_FRONTIER_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte digest type used for the hash of a report body.
pub type SubscriberFrontierHash = [u8; 32];
/// Kind of subscriber source named in a frontier request.
///
/// The source selects which store frontier the subscriber's consumed
/// sequence is compared against when an observation is built.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SubscriberFrontierSource {
/// Compared against the store frontier's `emitted_hlc.global_sequence`.
LossyPush,
/// Compared against the store frontier's `visible_hlc.global_sequence`.
CursorBacked,
}
/// Caller-reported delivery state of a subscriber.
///
/// The value is copied verbatim into the report body; some variants also
/// produce a finding, as noted on each variant. Lag is derived from the
/// sequence numbers, not from this state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SubscriberDeliveryState {
/// Produces no delivery finding.
Active,
/// Produces no delivery finding by itself.
Lagging,
/// Produces a `DeliveryDropped` finding.
Dropped,
/// Produces a `DeliveryDisconnected` finding.
Disconnected,
/// Produces a `DeliveryStateUnknown` finding.
Unknown,
}
/// Caller-reported precision with which event loss is known.
///
/// `Ord` is derived, so variants compare in declaration order; reordering
/// them changes how findings that embed this value are ordered.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum LossPrecision {
/// Produces a `LossObserved` finding and, additionally, one
/// `ExactDroppedRange` finding per entry of the request's
/// `exact_dropped_ranges`.
ExactRange,
/// Produces a `LossObserved` finding carrying this precision.
LossAfterFrontier,
/// Produces a `LossObserved` finding carrying this precision.
SubscriberDropped,
/// Produces no `LossObserved` finding when an observation is built.
Unknown,
}
/// Caller-supplied description of a subscriber, used as the input to
/// `Store::subscriber_frontier_observation`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriberFrontierRequest {
/// Selects which store frontier counts as "available" for this subscriber.
pub source: SubscriberFrontierSource,
/// Sequence the subscriber is reported to have consumed up to. `None`
/// yields a `ConsumedFrontierUnknown` finding and no lag value.
pub consumed_frontier_sequence: Option<u64>,
/// Reported delivery state; copied into the report body.
pub delivery_state: SubscriberDeliveryState,
/// Reported loss precision; copied into the report body.
pub loss_precision: LossPrecision,
/// `(start_sequence, end_sequence)` pairs. Only read when `loss_precision`
/// is `LossPrecision::ExactRange`. Both constructors leave this empty, so
/// callers set it directly on the struct.
// NOTE(review): whether `end_sequence` is inclusive, and whether
// `start <= end` is required, is not established in this file — confirm.
pub exact_dropped_ranges: Vec<(u64, u64)>,
}
impl SubscriberFrontierRequest {
#[must_use]
pub fn lossy_push(
consumed_frontier_sequence: Option<u64>,
delivery_state: SubscriberDeliveryState,
loss_precision: LossPrecision,
) -> Self {
Self {
source: SubscriberFrontierSource::LossyPush,
consumed_frontier_sequence,
delivery_state,
loss_precision,
exact_dropped_ranges: Vec::new(),
}
}
#[must_use]
pub fn cursor_backed(
consumed_frontier_sequence: Option<u64>,
delivery_state: SubscriberDeliveryState,
loss_precision: LossPrecision,
) -> Self {
Self {
source: SubscriberFrontierSource::CursorBacked,
consumed_frontier_sequence,
delivery_state,
loss_precision,
exact_dropped_ranges: Vec::new(),
}
}
}
/// A single observation recorded in a subscriber frontier report.
///
/// `Ord` is derived, so findings compare by variant declaration order first
/// and then by field values; reordering variants or fields changes that
/// ordering.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SubscriberFrontierFinding {
/// The request carried no consumed frontier sequence.
ConsumedFrontierUnknown,
/// The request's delivery state was `SubscriberDeliveryState::Unknown`.
DeliveryStateUnknown,
/// The consumed sequence trails the available sequence.
LagObserved {
/// Available minus consumed sequence; only reported when greater than zero.
lag_events: u64,
},
/// The consumed sequence is strictly greater than the available sequence.
ConsumedFrontierAheadOfAvailable {
/// Consumed sequence taken from the request.
consumed_sequence: u64,
/// Available sequence read from the store frontier.
available_sequence: u64,
},
/// The request's delivery state was `SubscriberDeliveryState::Dropped`.
DeliveryDropped,
/// The request's delivery state was `SubscriberDeliveryState::Disconnected`.
DeliveryDisconnected,
/// The request carried a loss precision other than `LossPrecision::Unknown`
/// when built by the store observation.
LossObserved {
/// Loss precision copied from the request.
precision: LossPrecision,
},
/// One entry of the request's `exact_dropped_ranges`.
ExactDroppedRange {
/// First element of the `(start, end)` pair.
start_sequence: u64,
/// Second element of the `(start, end)` pair.
end_sequence: u64,
},
}
/// The part of a subscriber frontier report from which the report's
/// `body_hash` is computed.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriberFrontierReportBody {
/// Set to `SUBSCRIBER_FRONTIER_REPORT_SCHEMA_VERSION` when built by the store.
pub schema_version: u16,
/// Source copied from the request.
pub source: SubscriberFrontierSource,
/// Consumed sequence copied from the request.
pub consumed_frontier_sequence: Option<u64>,
/// Store frontier sequence selected by `source`.
pub available_frontier_sequence: u64,
/// Available minus consumed sequence, saturating at zero; `None` when the
/// consumed sequence is unknown.
pub lag_events: Option<u64>,
/// Delivery state copied from the request.
pub delivery_state: SubscriberDeliveryState,
/// Loss precision copied from the request.
pub loss_precision: LossPrecision,
/// Findings derived from the request and the store frontier, passed through
/// `crate::evidence::sort_findings` before being stored here.
pub findings: Vec<SubscriberFrontierFinding>,
}
/// A report body together with its hash and optional envelope metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriberFrontierEvidenceReport {
/// The hashed report content.
pub body: SubscriberFrontierReportBody,
/// Hash computed from `body` alone; the remaining fields are not hashed
/// by the store observation.
pub body_hash: SubscriberFrontierHash,
/// Left as `None` by the store observation.
// NOTE(review): presumably filled in by a caller that stamps the report — confirm.
pub generated_at_unix_ms: Option<u64>,
/// Left as `None` by the store observation.
pub batpak_version: Option<String>,
/// Left empty by the store observation.
pub diagnostics: Vec<String>,
}
/// Errors that can occur while building a subscriber frontier report.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SubscriberFrontierReportError {
    /// Encoding the report body for hashing failed.
    BodyEncoding {
        /// Failure description supplied by the encoder.
        message: String,
    },
}

impl std::fmt::Display for SubscriberFrontierReportError {
    /// Renders a one-line description that embeds the variant's message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyEncoding { message: detail } => f.write_fmt(format_args!(
                "subscriber frontier report body encoding failed: {detail}"
            )),
        }
    }
}

impl std::error::Error for SubscriberFrontierReportError {}
impl<State: crate::store::StoreState> Store<State> {
pub fn subscriber_frontier_observation(
&self,
request: &SubscriberFrontierRequest,
) -> Result<SubscriberFrontierEvidenceReport, SubscriberFrontierReportError> {
let frontier = self.frontier();
let available_frontier_sequence = match request.source {
SubscriberFrontierSource::LossyPush => frontier.emitted_hlc.global_sequence,
SubscriberFrontierSource::CursorBacked => frontier.visible_hlc.global_sequence,
};
let lag_events = request
.consumed_frontier_sequence
.map(|consumed| available_frontier_sequence.saturating_sub(consumed));
let mut findings = Vec::new();
if request.consumed_frontier_sequence.is_none() {
findings.push(SubscriberFrontierFinding::ConsumedFrontierUnknown);
}
if request.delivery_state == SubscriberDeliveryState::Unknown {
findings.push(SubscriberFrontierFinding::DeliveryStateUnknown);
}
if let Some(lag) = lag_events {
if lag > 0 {
findings.push(SubscriberFrontierFinding::LagObserved { lag_events: lag });
}
}
if let Some(consumed_sequence) = request.consumed_frontier_sequence {
if consumed_sequence > available_frontier_sequence {
findings.push(
SubscriberFrontierFinding::ConsumedFrontierAheadOfAvailable {
consumed_sequence,
available_sequence: available_frontier_sequence,
},
);
}
}
match request.delivery_state {
SubscriberDeliveryState::Dropped => {
findings.push(SubscriberFrontierFinding::DeliveryDropped)
}
SubscriberDeliveryState::Disconnected => {
findings.push(SubscriberFrontierFinding::DeliveryDisconnected)
}
SubscriberDeliveryState::Active
| SubscriberDeliveryState::Lagging
| SubscriberDeliveryState::Unknown => {}
}
if request.loss_precision != LossPrecision::Unknown {
findings.push(SubscriberFrontierFinding::LossObserved {
precision: request.loss_precision,
});
}
if request.loss_precision == LossPrecision::ExactRange {
let mut ranges = request.exact_dropped_ranges.clone();
ranges.sort_unstable();
for (start_sequence, end_sequence) in ranges {
findings.push(SubscriberFrontierFinding::ExactDroppedRange {
start_sequence,
end_sequence,
});
}
}
crate::evidence::sort_findings(&mut findings);
let body = SubscriberFrontierReportBody {
schema_version: SUBSCRIBER_FRONTIER_REPORT_SCHEMA_VERSION,
source: request.source,
consumed_frontier_sequence: request.consumed_frontier_sequence,
available_frontier_sequence,
lag_events,
delivery_state: request.delivery_state,
loss_precision: request.loss_precision,
findings,
};
let body_hash = report_body_hash(&body)?;
Ok(SubscriberFrontierEvidenceReport {
body,
body_hash,
generated_at_unix_ms: None,
batpak_version: None,
diagnostics: Vec::new(),
})
}
}
/// Hashes a report body via `crate::evidence::report_body_hash`, wrapping
/// any failure message in [`SubscriberFrontierReportError::BodyEncoding`].
fn report_body_hash(
    body: &SubscriberFrontierReportBody,
) -> Result<SubscriberFrontierHash, SubscriberFrontierReportError> {
    let into_encoding_error =
        |message: String| SubscriberFrontierReportError::BodyEncoding { message };
    crate::evidence::report_body_hash(body, into_encoding_error)
}
#[cfg(test)]
mod lag_boundary_tests {
    //! Boundary tests for the `lag > 0` and `consumed > available` comparisons
    //! in `Store::subscriber_frontier_observation`.

    use super::{
        LossPrecision, SubscriberDeliveryState, SubscriberFrontierFinding,
        SubscriberFrontierRequest,
    };
    use crate::coordinate::Coordinate;
    use crate::event::EventKind;
    use crate::store::{Store, StoreConfig};

    /// Opens a store in a fresh temporary directory. The directory guard is
    /// returned alongside the store so the caller keeps it alive.
    fn open_store() -> (tempfile::TempDir, Store) {
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let config = StoreConfig::new(temp_dir.path());
        let store = Store::open(config).expect("open store");
        (temp_dir, store)
    }

    /// Cursor-backed request for an active subscriber with unknown loss precision.
    fn cursor_backed(consumed: Option<u64>) -> SubscriberFrontierRequest {
        let delivery_state = SubscriberDeliveryState::Active;
        let loss_precision = LossPrecision::Unknown;
        SubscriberFrontierRequest::cursor_backed(consumed, delivery_state, loss_precision)
    }

    /// True when any finding is a `LagObserved`, whatever its lag value.
    fn has_lag_observed(findings: &[SubscriberFrontierFinding]) -> bool {
        findings
            .iter()
            .any(|finding| matches!(finding, SubscriberFrontierFinding::LagObserved { .. }))
    }

    /// True when any finding is a `ConsumedFrontierAheadOfAvailable`.
    fn has_ahead_of_available(findings: &[SubscriberFrontierFinding]) -> bool {
        findings.iter().any(|finding| {
            matches!(
                finding,
                SubscriberFrontierFinding::ConsumedFrontierAheadOfAvailable { .. }
            )
        })
    }

    #[test]
    fn positive_lag_emits_lag_observed_with_the_exact_event_delta() {
        let (_dir, store) = open_store();
        let coord = Coordinate::new("entity:lag-positive", "scope:test").expect("coord");
        let payload = serde_json::json!({ "n": 0 });
        let _receipt = store
            .append(&coord, EventKind::custom(0xF, 0x40), &payload)
            .expect("append advances the visible frontier");

        let request = cursor_backed(Some(0));
        let report = store
            .subscriber_frontier_observation(&request)
            .expect("observation");
        let available = report.body.available_frontier_sequence;
        let findings = &report.body.findings;

        assert!(
            available >= 1,
            "sanity: an acked append advanced the available (visible) frontier, got {available}"
        );
        assert_eq!(
            report.body.lag_events,
            Some(available),
            "consumed 0 against available {available} is a lag of {available} events"
        );

        // Derived equality makes an exact-value membership test equivalent to
        // matching the variant and comparing its field.
        let expected_lag = SubscriberFrontierFinding::LagObserved {
            lag_events: available,
        };
        assert!(
            findings.contains(&expected_lag),
            "PROPERTY: a positive lag emits LagObserved{{{available}}}; the `<`/`==` \
             mutants of `lag > 0` drop it, got {:?}",
            findings
        );
        assert!(
            !has_ahead_of_available(findings),
            "a trailing consumed frontier is not ahead of available"
        );
    }

    #[test]
    fn zero_lag_emits_neither_lag_observed_nor_ahead_of_available() {
        let (_dir, store) = open_store();

        // Probe with an unknown consumed position just to learn the current
        // available sequence.
        let probe = store
            .subscriber_frontier_observation(&cursor_backed(None))
            .expect("probe observation");
        let available = probe.body.available_frontier_sequence;

        let request = cursor_backed(Some(available));
        let report = store
            .subscriber_frontier_observation(&request)
            .expect("observation");
        let findings = &report.body.findings;

        assert_eq!(
            report.body.lag_events,
            Some(0),
            "consuming exactly up to available is a zero lag"
        );
        assert!(
            !has_lag_observed(findings),
            "PROPERTY: a zero lag emits NO LagObserved; the `>=`/`==` mutants of \
             `lag > 0` fabricate one, got {:?}",
            findings
        );
        assert!(
            !has_ahead_of_available(findings),
            "PROPERTY: consumed == available is NOT ahead of available; the `>=` mutant \
             of `consumed > available` fabricates an ahead-of-available finding, got {:?}",
            findings
        );
    }
}
#[cfg(test)]
mod tests {
    //! Ordering test for subscriber frontier findings.

    use super::{LossPrecision, SubscriberFrontierFinding};

    /// Shorthand for an `ExactDroppedRange` finding.
    fn dropped_range(start_sequence: u64, end_sequence: u64) -> SubscriberFrontierFinding {
        SubscriberFrontierFinding::ExactDroppedRange {
            start_sequence,
            end_sequence,
        }
    }

    #[test]
    fn subscriber_frontier_findings_are_sorted_structurally() {
        let loss_unknown = SubscriberFrontierFinding::LossObserved {
            precision: LossPrecision::Unknown,
        };

        // Deliberately out of order: later range first, loss finding last.
        let mut findings = vec![
            dropped_range(20, 30),
            dropped_range(10, 15),
            loss_unknown.clone(),
        ];
        crate::evidence::sort_findings(&mut findings);

        let expected = vec![loss_unknown, dropped_range(10, 15), dropped_range(20, 30)];
        assert_eq!(
            findings, expected,
            "PROPERTY: subscriber frontier findings must be sorted in deterministic structural order"
        );
    }
}