use crate::store::Store;
use serde::{Deserialize, Serialize};
pub const SUBSCRIBER_FRONTIER_REPORT_SCHEMA_VERSION: u16 = 1;
pub type SubscriberFrontierHash = [u8; 32];
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SubscriberFrontierSource {
LossyPush,
CursorBacked,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SubscriberDeliveryState {
Active,
Lagging,
Dropped,
Disconnected,
Unknown,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum LossPrecision {
ExactRange,
LossAfterFrontier,
SubscriberDropped,
Unknown,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriberFrontierRequest {
pub source: SubscriberFrontierSource,
pub consumed_frontier_sequence: Option<u64>,
pub delivery_state: SubscriberDeliveryState,
pub loss_precision: LossPrecision,
pub exact_dropped_ranges: Vec<(u64, u64)>,
}
impl SubscriberFrontierRequest {
#[must_use]
pub fn lossy_push(
consumed_frontier_sequence: Option<u64>,
delivery_state: SubscriberDeliveryState,
loss_precision: LossPrecision,
) -> Self {
Self {
source: SubscriberFrontierSource::LossyPush,
consumed_frontier_sequence,
delivery_state,
loss_precision,
exact_dropped_ranges: Vec::new(),
}
}
#[must_use]
pub fn cursor_backed(
consumed_frontier_sequence: Option<u64>,
delivery_state: SubscriberDeliveryState,
loss_precision: LossPrecision,
) -> Self {
Self {
source: SubscriberFrontierSource::CursorBacked,
consumed_frontier_sequence,
delivery_state,
loss_precision,
exact_dropped_ranges: Vec::new(),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SubscriberFrontierFinding {
ConsumedFrontierUnknown,
DeliveryStateUnknown,
LagObserved {
lag_events: u64,
},
ConsumedFrontierAheadOfAvailable {
consumed_sequence: u64,
available_sequence: u64,
},
DeliveryDropped,
DeliveryDisconnected,
LossObserved {
precision: LossPrecision,
},
ExactDroppedRange {
start_sequence: u64,
end_sequence: u64,
},
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriberFrontierReportBody {
pub schema_version: u16,
pub source: SubscriberFrontierSource,
pub consumed_frontier_sequence: Option<u64>,
pub available_frontier_sequence: u64,
pub lag_events: Option<u64>,
pub delivery_state: SubscriberDeliveryState,
pub loss_precision: LossPrecision,
pub findings: Vec<SubscriberFrontierFinding>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriberFrontierEvidenceReport {
pub body: SubscriberFrontierReportBody,
pub body_hash: SubscriberFrontierHash,
pub generated_at_unix_ms: Option<u64>,
pub batpak_version: Option<String>,
pub diagnostics: Vec<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SubscriberFrontierReportError {
BodyEncoding {
message: String,
},
}
impl std::fmt::Display for SubscriberFrontierReportError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::BodyEncoding { message } => {
write!(
f,
"subscriber frontier report body encoding failed: {message}"
)
}
}
}
}
impl std::error::Error for SubscriberFrontierReportError {}
impl<State> Store<State> {
pub fn subscriber_frontier_observation(
&self,
request: &SubscriberFrontierRequest,
) -> Result<SubscriberFrontierEvidenceReport, SubscriberFrontierReportError> {
let frontier = self.frontier();
let available_frontier_sequence = match request.source {
SubscriberFrontierSource::LossyPush => frontier.emitted_hlc.global_sequence,
SubscriberFrontierSource::CursorBacked => frontier.current_visible_hlc.global_sequence,
};
let lag_events = request
.consumed_frontier_sequence
.map(|consumed| available_frontier_sequence.saturating_sub(consumed));
let mut findings = Vec::new();
if request.consumed_frontier_sequence.is_none() {
findings.push(SubscriberFrontierFinding::ConsumedFrontierUnknown);
}
if request.delivery_state == SubscriberDeliveryState::Unknown {
findings.push(SubscriberFrontierFinding::DeliveryStateUnknown);
}
if let Some(lag) = lag_events {
if lag > 0 {
findings.push(SubscriberFrontierFinding::LagObserved { lag_events: lag });
}
}
if let Some(consumed_sequence) = request.consumed_frontier_sequence {
if consumed_sequence > available_frontier_sequence {
findings.push(
SubscriberFrontierFinding::ConsumedFrontierAheadOfAvailable {
consumed_sequence,
available_sequence: available_frontier_sequence,
},
);
}
}
match request.delivery_state {
SubscriberDeliveryState::Dropped => {
findings.push(SubscriberFrontierFinding::DeliveryDropped)
}
SubscriberDeliveryState::Disconnected => {
findings.push(SubscriberFrontierFinding::DeliveryDisconnected)
}
SubscriberDeliveryState::Active
| SubscriberDeliveryState::Lagging
| SubscriberDeliveryState::Unknown => {}
}
if request.loss_precision != LossPrecision::Unknown {
findings.push(SubscriberFrontierFinding::LossObserved {
precision: request.loss_precision,
});
}
if request.loss_precision == LossPrecision::ExactRange {
let mut ranges = request.exact_dropped_ranges.clone();
ranges.sort_unstable();
for (start_sequence, end_sequence) in ranges {
findings.push(SubscriberFrontierFinding::ExactDroppedRange {
start_sequence,
end_sequence,
});
}
}
crate::evidence::sort_findings(&mut findings);
let body = SubscriberFrontierReportBody {
schema_version: SUBSCRIBER_FRONTIER_REPORT_SCHEMA_VERSION,
source: request.source,
consumed_frontier_sequence: request.consumed_frontier_sequence,
available_frontier_sequence,
lag_events,
delivery_state: request.delivery_state,
loss_precision: request.loss_precision,
findings,
};
let body_hash = report_body_hash(&body)?;
Ok(SubscriberFrontierEvidenceReport {
body,
body_hash,
generated_at_unix_ms: None,
batpak_version: None,
diagnostics: Vec::new(),
})
}
}
fn report_body_hash(
body: &SubscriberFrontierReportBody,
) -> Result<SubscriberFrontierHash, SubscriberFrontierReportError> {
crate::evidence::report_body_hash(body, |message| {
SubscriberFrontierReportError::BodyEncoding { message }
})
}
#[cfg(test)]
mod tests {
use super::{LossPrecision, SubscriberFrontierFinding};
#[test]
fn subscriber_frontier_findings_are_sorted_structurally() {
let mut findings = vec![
SubscriberFrontierFinding::ExactDroppedRange {
start_sequence: 20,
end_sequence: 30,
},
SubscriberFrontierFinding::ExactDroppedRange {
start_sequence: 10,
end_sequence: 15,
},
SubscriberFrontierFinding::LossObserved {
precision: LossPrecision::Unknown,
},
];
crate::evidence::sort_findings(&mut findings);
assert_eq!(
findings,
vec![
SubscriberFrontierFinding::LossObserved {
precision: LossPrecision::Unknown,
},
SubscriberFrontierFinding::ExactDroppedRange {
start_sequence: 10,
end_sequence: 15,
},
SubscriberFrontierFinding::ExactDroppedRange {
start_sequence: 20,
end_sequence: 30,
},
],
"PROPERTY: subscriber frontier findings must be sorted in deterministic structural order"
);
}
}