use crate::coordinate::{KindFilter, Region};
use crate::store::{Freshness, HlcPoint, IndexEntry, Store};
use serde::{Deserialize, Serialize};
/// Schema version written into the `schema_version` field of every
/// [`ReadWalkReportBody`] built by this module.
pub const READ_WALK_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte digest used for report body hashes and for the event hashes
/// carried by proof refs.
pub type ReadWalkHash = [u8; 32];
/// Parameters for [`Store::query_with_read_walk_evidence`].
#[derive(Clone, Debug)]
pub struct ReadWalkRequest {
/// Region of the index to query.
pub region: Region,
/// Maximum number of hits to keep; `None` keeps every match.
pub limit: Option<usize>,
/// When `true`, the report lists one proof ref per returned entry.
pub include_proof_refs: bool,
/// Freshness the caller intends. The query in this file only copies it into
/// the report body; it does not otherwise consult it.
pub freshness_intent: Freshness,
}
impl ReadWalkRequest {
    /// Builds a request that walks all of `region`.
    ///
    /// The request has no result limit, asks for no proof refs, and declares
    /// a [`Freshness::Consistent`] intent.
    #[must_use]
    pub fn full(region: Region) -> Self {
        let limit = None;
        let include_proof_refs = false;
        let freshness_intent = Freshness::Consistent;
        Self {
            region,
            limit,
            include_proof_refs,
            freshness_intent,
        }
    }
}
/// One constraint of the queried region, as recorded in a report body.
///
/// `source_refs_from_region` sorts these with the derived `Ord`, so the
/// declaration order of variants and fields determines their order in a report.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkSourceRef {
/// The region was constrained by an entity prefix.
EntityPrefix {
/// Owned copy of the region's entity prefix.
prefix: String,
},
/// The region was constrained by a scope.
Scope {
/// Owned copy of the region's scope value.
scope: String,
},
/// The region's fact filter was `KindFilter::Exact`.
FactExact {
/// Category reported by the exact kind.
category: u8,
/// Type id reported by the exact kind.
type_id: u16,
},
/// The region's fact filter was `KindFilter::Category`.
FactCategory {
/// The filtered category.
category: u8,
},
/// The region was constrained by a clock range.
ClockRange {
/// First element of the region's clock range pair.
start_clock: u32,
/// Second element of the region's clock range pair.
end_clock: u32,
},
}
/// Replay mode recorded in a report body.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkReplayMode {
/// The only mode defined here; `Store::query_with_read_walk_evidence`
/// always records it.
Current,
}
/// Serializable mirror of the request's `Freshness`, as recorded in a report body.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkFreshnessIntent {
/// Mapped from `Freshness::Consistent`.
Consistent,
/// Mapped from `Freshness::MaybeStale`.
MaybeStale {
/// Copied unchanged from the request's `max_stale_ms`.
max_stale_ms: u64,
},
}
/// Kind of frontier a [`ReadWalkInputFrontier`] describes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkFrontierKind {
/// Frontier derived from the visible upper bound observed by the query.
Visible,
}
/// Point in the store's history that a read observed, as recorded in a report body.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ReadWalkInputFrontier {
/// Which kind of frontier this is.
pub kind: ReadWalkFrontierKind,
/// `wall_ms` of the HLC point the frontier was taken from.
pub wall_ms: u64,
/// `global_sequence` of the HLC point the frontier was taken from.
pub global_sequence: u64,
}
/// How many matched hits were cut off by the request's limit.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkDroppedCount {
/// The limit truncated the result; the payload is the number of hits removed.
Known(u64),
/// Nothing was truncated: either no limit was requested or every match fit
/// within it.
NotApplicable,
}
/// Reference to one returned index entry, identifying the event and its hash.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ReadWalkProofRef {
/// `event_id` of the returned entry.
pub event_id: u128,
/// `global_sequence` of the returned entry.
pub global_sequence: u64,
/// `event_hash` taken from the returned entry's hash chain.
pub event_hash: ReadWalkHash,
}
/// Proof refs section of a report body.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadWalkProofRefs {
/// Proof refs were requested; one per returned entry, in returned order.
Known(Vec<ReadWalkProofRef>),
/// The request did not ask for proof refs.
NotApplicable,
}
/// Notable condition observed while answering a read walk query.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkFinding {
/// No HLC point could be resolved for the observed visible sequence, so the
/// report's `input_frontier` is `None`.
InputFrontierUnknown,
/// The request's limit truncated the result.
LimitedResults {
/// Number of matched hits removed by the limit.
dropped_count: u64,
},
/// A hit kept by the query could not be upgraded to an index entry and is
/// therefore absent from the returned entries.
MissingBackingEntry {
/// `event_id` of the hit that could not be upgraded.
event_id: u128,
},
}
/// The part of a read walk report that is hashed into
/// [`ReadWalkEvidenceReport::body_hash`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadWalkReportBody {
/// Report schema version; the query sets it to
/// [`READ_WALK_REPORT_SCHEMA_VERSION`].
pub schema_version: u16,
/// Sorted constraints derived from the queried region.
pub source_refs: Vec<ReadWalkSourceRef>,
/// Replay mode of the read.
pub replay_mode: ReadWalkReplayMode,
/// Freshness intent copied from the request.
pub freshness_intent: ReadWalkFreshnessIntent,
/// Frontier the read observed; `None` when it could not be resolved, in
/// which case `findings` contains [`ReadWalkFinding::InputFrontierUnknown`].
pub input_frontier: Option<ReadWalkInputFrontier>,
/// The request's limit, widened to `u64`.
pub requested_limit: Option<u64>,
/// Number of hits that matched the region before any limit was applied.
pub matched_count: u64,
/// Number of entries actually returned to the caller.
pub returned_count: u64,
/// Number of hits removed by the limit, if any were.
pub dropped_limited_count: ReadWalkDroppedCount,
/// Proof refs for the returned entries, when requested.
pub proof_refs: ReadWalkProofRefs,
/// Findings collected during the query, ordered by
/// `crate::evidence::sort_findings`.
pub findings: Vec<ReadWalkFinding>,
}
/// A read walk report: the hashed body plus fields that are not part of the hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadWalkEvidenceReport {
/// The hashed report content.
pub body: ReadWalkReportBody,
/// Hash computed from `body` alone.
pub body_hash: ReadWalkHash,
/// Left as `None` by [`Store::query_with_read_walk_evidence`].
/// NOTE(review): presumably filled in later by callers — confirm.
pub generated_at_unix_ms: Option<u64>,
/// Left as `None` by [`Store::query_with_read_walk_evidence`].
pub batpak_version: Option<String>,
/// Free-form diagnostics; empty when produced by
/// [`Store::query_with_read_walk_evidence`].
pub diagnostics: Vec<String>,
}
/// Failure raised while producing a read walk evidence report.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReadWalkReportError {
    /// The report body could not be encoded for hashing.
    BodyEncoding {
        /// Description of the encoding failure.
        message: String,
    },
}

impl std::fmt::Display for ReadWalkReportError {
    /// Writes a fixed prefix followed by the failure message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyEncoding { message } => {
                f.write_str("read walk report body encoding failed: ")?;
                f.write_str(message)
            }
        }
    }
}

impl std::error::Error for ReadWalkReportError {}
impl<State> Store<State> {
    /// Queries the index for `request.region` and returns the resulting entries
    /// together with an evidence report describing the read.
    ///
    /// Hits beyond `request.limit` are discarded before being upgraded to
    /// entries. A kept hit that cannot be upgraded is omitted from the returned
    /// entries and reported as [`ReadWalkFinding::MissingBackingEntry`] instead
    /// of failing the call. The report's `generated_at_unix_ms`,
    /// `batpak_version` and `diagnostics` are left unset/empty.
    ///
    /// # Errors
    ///
    /// Returns [`ReadWalkReportError::BodyEncoding`] when the report body
    /// cannot be hashed.
    pub fn query_with_read_walk_evidence(
        &self,
        request: &ReadWalkRequest,
    ) -> Result<(Vec<IndexEntry>, ReadWalkEvidenceReport), ReadWalkReportError> {
        let (mut kept_hits, visible_upper_bound) = self
            .index
            .query_hits_with_visible_upper_bound(&request.region);
        // Counted before the limit is applied.
        let matched_count = kept_hits.len() as u64;
        let requested_limit = request.limit.map(|limit| limit as u64);

        let mut findings = Vec::new();

        // Apply the limit, remembering how many hits it removed.
        let dropped_limited_count = match request.limit {
            Some(limit) if kept_hits.len() > limit => {
                let dropped_count = (kept_hits.len() - limit) as u64;
                kept_hits.truncate(limit);
                findings.push(ReadWalkFinding::LimitedResults { dropped_count });
                ReadWalkDroppedCount::Known(dropped_count)
            }
            Some(_) | None => ReadWalkDroppedCount::NotApplicable,
        };

        // Upgrade each kept hit against the same visible upper bound the
        // query observed; hits without a backing entry become findings.
        let mut entries = Vec::with_capacity(kept_hits.len());
        for hit in &kept_hits {
            let upgraded = self
                .index
                .upgrade_hit_with_visible_upper_bound(*hit, visible_upper_bound);
            if let Some(entry) = upgraded {
                entries.push(entry);
            } else {
                findings.push(ReadWalkFinding::MissingBackingEntry {
                    event_id: hit.event_id,
                });
            }
        }

        let proof_refs = if request.include_proof_refs {
            let mut refs = Vec::with_capacity(entries.len());
            for entry in &entries {
                refs.push(ReadWalkProofRef {
                    event_id: entry.event_id,
                    global_sequence: entry.global_sequence,
                    event_hash: entry.hash_chain.event_hash,
                });
            }
            ReadWalkProofRefs::Known(refs)
        } else {
            ReadWalkProofRefs::NotApplicable
        };

        let visible_frontier = |wall_ms: u64, global_sequence: u64| ReadWalkInputFrontier {
            kind: ReadWalkFrontierKind::Visible,
            wall_ms,
            global_sequence,
        };
        let input_frontier = if visible_upper_bound == 0 {
            // Nothing is visible yet: report the origin point.
            Some(visible_frontier(
                HlcPoint::ORIGIN.wall_ms,
                HlcPoint::ORIGIN.global_sequence,
            ))
        } else {
            // The last visible sequence sits just below the upper bound.
            let observed_visible_sequence = visible_upper_bound.saturating_sub(1);
            self.index
                .hlc_for_global_sequence(observed_visible_sequence)
                .map(|point| visible_frontier(point.wall_ms, point.global_sequence))
        };
        if input_frontier.is_none() {
            findings.push(ReadWalkFinding::InputFrontierUnknown);
        }

        crate::evidence::sort_findings(&mut findings);

        let source_refs = source_refs_from_region(&request.region);
        let freshness_intent = map_freshness_intent(&request.freshness_intent);
        let returned_count = entries.len() as u64;
        let body = ReadWalkReportBody {
            schema_version: READ_WALK_REPORT_SCHEMA_VERSION,
            source_refs,
            replay_mode: ReadWalkReplayMode::Current,
            freshness_intent,
            input_frontier,
            requested_limit,
            matched_count,
            returned_count,
            dropped_limited_count,
            proof_refs,
            findings,
        };
        let body_hash = report_body_hash(&body)?;

        Ok((
            entries,
            ReadWalkEvidenceReport {
                body,
                body_hash,
                generated_at_unix_ms: None,
                batpak_version: None,
                diagnostics: Vec::new(),
            },
        ))
    }
}
/// Translates the constraints set on `region` into a sorted list of source refs.
///
/// Each constraint present on the region (entity prefix, scope, fact filter,
/// clock range) contributes at most one ref; a `KindFilter::Any` fact filter
/// contributes none.
fn source_refs_from_region(region: &Region) -> Vec<ReadWalkSourceRef> {
    // At most one ref per constraint checked below.
    let mut sources = Vec::with_capacity(4);

    if let Some(prefix) = region.entity_prefix() {
        let prefix = prefix.to_owned();
        sources.push(ReadWalkSourceRef::EntityPrefix { prefix });
    }

    if let Some(scope) = region.scope_value() {
        let scope = scope.to_owned();
        sources.push(ReadWalkSourceRef::Scope { scope });
    }

    match region.fact() {
        Some(KindFilter::Exact(kind)) => {
            let (category, type_id) = (kind.category(), kind.type_id());
            sources.push(ReadWalkSourceRef::FactExact { category, type_id });
        }
        Some(KindFilter::Category(category)) => {
            sources.push(ReadWalkSourceRef::FactCategory {
                category: *category,
            });
        }
        // An unconstrained fact filter is not a source constraint.
        Some(KindFilter::Any) | None => {}
    }

    if let Some((start_clock, end_clock)) = region.clock_range() {
        sources.push(ReadWalkSourceRef::ClockRange {
            start_clock,
            end_clock,
        });
    }

    // The derived `Ord` gives the refs a stable order regardless of how the
    // region was built.
    sources.sort();
    sources
}
/// Converts the store's `Freshness` into its report counterpart, carrying
/// `max_stale_ms` over unchanged.
fn map_freshness_intent(freshness: &Freshness) -> ReadWalkFreshnessIntent {
    match *freshness {
        Freshness::Consistent => ReadWalkFreshnessIntent::Consistent,
        Freshness::MaybeStale { max_stale_ms } => {
            ReadWalkFreshnessIntent::MaybeStale { max_stale_ms }
        }
    }
}
/// Hashes `body` through `crate::evidence::report_body_hash`, wrapping any
/// failure message it reports in [`ReadWalkReportError::BodyEncoding`].
fn report_body_hash(body: &ReadWalkReportBody) -> Result<ReadWalkHash, ReadWalkReportError> {
    let into_report_error = |message| ReadWalkReportError::BodyEncoding { message };
    crate::evidence::report_body_hash(body, into_report_error)
}