use crate::coordinate::{KindFilter, Region};
use crate::store::index::IndexEntry;
use crate::store::{Freshness, HlcPoint, Store};
use serde::{Deserialize, Serialize};
/// Schema version stamped into every [`ReadWalkReportBody::schema_version`].
///
/// The value is part of the body handed to the body hasher, so any change to it
/// (or to the body layout it versions) presumably changes `body_hash` — confirm
/// against `crate::evidence::report_body_hash`.
pub const READ_WALK_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte digest type used both for a report's `body_hash` and for each proof
/// ref's `event_hash`. The hashing algorithm is not visible in this module.
pub type ReadWalkHash = [u8; 32];
/// Input to [`Store::query_with_read_walk_evidence`].
#[derive(Clone, Debug)]
pub struct ReadWalkRequest {
    /// Region whose matching events are read.
    pub region: Region,
    /// Maximum number of hits kept; `None` keeps every match.
    pub limit: Option<usize>,
    /// When `true`, the report lists a proof reference for every returned entry.
    pub include_proof_refs: bool,
    /// Freshness the caller asked for; copied into the report body.
    pub freshness_intent: Freshness,
}
impl ReadWalkRequest {
    /// Creates a request that reads all of `region`.
    ///
    /// The request has no limit, no proof references and
    /// `Freshness::Consistent` freshness.
    #[must_use]
    pub fn full(region: Region) -> Self {
        let freshness_intent = Freshness::Consistent;
        Self {
            freshness_intent,
            include_proof_refs: false,
            limit: None,
            region,
        }
    }
}
/// One active selector of the queried [`Region`], recorded in
/// [`ReadWalkReportBody::source_refs`].
///
/// `source_refs_from_region` emits at most one ref per selector and sorts the
/// list with the derived `Ord`, so variant and field order define that sort
/// order. The refs are also part of the hashed body; reordering variants or
/// fields presumably changes `body_hash` as well — confirm before touching.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkSourceRef {
    /// Entity-prefix selector (from `Region::entity_prefix`).
    EntityPrefix {
        prefix: String,
    },
    /// Scope selector (from `Region::scope_value`).
    Scope {
        scope: String,
    },
    /// Exact event-kind filter (`KindFilter::Exact`), split into category and type id.
    FactExact {
        category: u8,
        type_id: u16,
    },
    /// Category-only event-kind filter (`KindFilter::Category`).
    FactCategory {
        category: u8,
    },
    /// Clock-range selector; bounds are copied from the range's `start()`/`end()`.
    /// NOTE(review): inclusivity of `end_clock` follows the region's clock range
    /// type, which is not visible here — confirm.
    ClockRange {
        start_clock: u32,
        end_clock: u32,
    },
}
/// How the read was replayed.
///
/// `Store::query_with_read_walk_evidence` always records `Current`, which is
/// currently the only variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkReplayMode {
    Current,
}
/// Serializable mirror of the caller's [`Freshness`] request, produced by
/// `map_freshness_intent` and stored in the report body.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkFreshnessIntent {
    /// Mirrors `Freshness::Consistent`.
    Consistent,
    /// Mirrors `Freshness::MaybeStale`; `max_stale_ms` is copied unchanged.
    MaybeStale {
        max_stale_ms: u64,
    },
}
/// Which frontier a [`ReadWalkInputFrontier`] describes.
///
/// Only `Visible` exists. It is derived from the index's visibility snapshot.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkFrontierKind {
    Visible,
}
/// HLC position of the index snapshot the read observed.
///
/// When the snapshot exposes nothing (visible upper bound of 0), this holds
/// `HlcPoint::ORIGIN`'s coordinates. Otherwise it holds the HLC point of the
/// last visible global sequence (upper bound - 1).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ReadWalkInputFrontier {
    pub kind: ReadWalkFrontierKind,
    /// Wall-clock component of the HLC point (milliseconds, per the field name).
    pub wall_ms: u64,
    /// Global sequence component of the HLC point.
    pub global_sequence: u64,
}
/// How many hits the request's limit removed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkDroppedCount {
    /// The match count exceeded the limit and this many hits were truncated (> 0).
    Known(u64),
    /// No limit was set, or the match count did not exceed it.
    NotApplicable,
}
/// Reference to one returned index entry.
///
/// It holds the entry's event id, its global sequence, and its event hash
/// (copied from the entry's `hash_chain.event_hash`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ReadWalkProofRef {
    pub event_id: u128,
    pub global_sequence: u64,
    pub event_hash: ReadWalkHash,
}
/// Proof references attached to a report body.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadWalkProofRefs {
    /// `include_proof_refs` was set: one ref per returned entry, in return order.
    Known(Vec<ReadWalkProofRef>),
    /// Proof refs were not requested.
    NotApplicable,
}
/// A notable condition observed while serving the read.
///
/// The body's findings list is sorted with `crate::evidence::sort_findings`,
/// so variant order presumably affects the recorded order — confirm there.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ReadWalkFinding {
    /// No HLC point could be resolved for the last visible global sequence;
    /// the body's `input_frontier` is `None`.
    InputFrontierUnknown,
    /// The request's limit truncated `dropped_count` hits.
    LimitedResults {
        dropped_count: u64,
    },
    /// A selected hit could not be upgraded to an index entry and was left out
    /// of the returned results.
    MissingBackingEntry {
        event_id: u128,
    },
}
/// The part of a read-walk report that is hashed into `body_hash`.
///
/// NOTE(review): field order is presumably part of the encoded bytes — do not
/// reorder without bumping `READ_WALK_REPORT_SCHEMA_VERSION`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadWalkReportBody {
    /// Set to `READ_WALK_REPORT_SCHEMA_VERSION` by the query.
    pub schema_version: u16,
    /// Sorted list of the queried region's active selectors.
    pub source_refs: Vec<ReadWalkSourceRef>,
    pub replay_mode: ReadWalkReplayMode,
    /// The caller's freshness request.
    pub freshness_intent: ReadWalkFreshnessIntent,
    /// `None` when the frontier could not be resolved; an
    /// `InputFrontierUnknown` finding is recorded in that case.
    pub input_frontier: Option<ReadWalkInputFrontier>,
    /// The request's limit, if any.
    pub requested_limit: Option<u64>,
    /// Hits that matched the region before the limit was applied.
    pub matched_count: u64,
    /// Entries actually returned: after the limit, and without hits whose
    /// backing entry was missing.
    pub returned_count: u64,
    pub dropped_limited_count: ReadWalkDroppedCount,
    pub proof_refs: ReadWalkProofRefs,
    /// Findings, sorted via `crate::evidence::sort_findings`.
    pub findings: Vec<ReadWalkFinding>,
}
/// A hashed report body plus metadata that the hash does not cover.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadWalkEvidenceReport {
    pub body: ReadWalkReportBody,
    /// Hash of `body` alone, computed by `crate::evidence::report_body_hash`.
    pub body_hash: ReadWalkHash,
    /// Outside the hashed body; the query leaves this `None`.
    pub generated_at_unix_ms: Option<u64>,
    /// Outside the hashed body; the query leaves this `None`.
    pub batpak_version: Option<String>,
    /// Outside the hashed body; the query leaves this empty.
    pub diagnostics: Vec<String>,
}
/// Errors that can occur while assembling a read-walk evidence report.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReadWalkReportError {
    /// The report body could not be encoded for hashing.
    ///
    /// `message` is the text supplied by the shared evidence hasher.
    BodyEncoding {
        message: String,
    },
}
impl std::fmt::Display for ReadWalkReportError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyEncoding { message } => {
                f.write_str("read walk report body encoding failed: ")?;
                f.write_str(message)
            }
        }
    }
}
impl std::error::Error for ReadWalkReportError {}
impl<State: crate::store::StoreState> Store<State> {
pub fn query_with_read_walk_evidence(
&self,
request: &ReadWalkRequest,
) -> Result<(Vec<IndexEntry>, ReadWalkEvidenceReport), ReadWalkReportError> {
let (hits, visibility) = self.index.query_hits_with_snapshot(&request.region);
let visible_upper_bound = visibility.visible_upper_bound();
let matched_count = hits.len() as u64;
let requested_limit = request.limit.map(|value| value as u64);
let mut selected_hits = hits;
let dropped_limited_count = if let Some(limit) = request.limit {
if selected_hits.len() > limit {
let dropped = (selected_hits.len() - limit) as u64;
selected_hits.truncate(limit);
ReadWalkDroppedCount::Known(dropped)
} else {
ReadWalkDroppedCount::NotApplicable
}
} else {
ReadWalkDroppedCount::NotApplicable
};
let mut findings = Vec::new();
if let ReadWalkDroppedCount::Known(dropped_count) = dropped_limited_count {
findings.push(ReadWalkFinding::LimitedResults { dropped_count });
}
let mut entries = Vec::with_capacity(selected_hits.len());
for hit in &selected_hits {
match self.index.upgrade_hit_with_visibility(*hit, &visibility) {
Some(entry) => entries.push(entry),
None => findings.push(ReadWalkFinding::MissingBackingEntry {
event_id: hit.event_id,
}),
}
}
let proof_refs = if request.include_proof_refs {
ReadWalkProofRefs::Known(
entries
.iter()
.map(|entry| ReadWalkProofRef {
event_id: entry.event_id,
global_sequence: entry.global_sequence,
event_hash: entry.hash_chain.event_hash,
})
.collect(),
)
} else {
ReadWalkProofRefs::NotApplicable
};
let observed_visible_sequence = visible_upper_bound.saturating_sub(1);
let input_frontier = if visible_upper_bound == 0 {
Some(ReadWalkInputFrontier {
kind: ReadWalkFrontierKind::Visible,
wall_ms: HlcPoint::ORIGIN.wall_ms,
global_sequence: HlcPoint::ORIGIN.global_sequence,
})
} else {
self.index
.hlc_for_global_sequence(observed_visible_sequence)
.map(|point| ReadWalkInputFrontier {
kind: ReadWalkFrontierKind::Visible,
wall_ms: point.wall_ms,
global_sequence: point.global_sequence,
})
};
if input_frontier.is_none() {
findings.push(ReadWalkFinding::InputFrontierUnknown);
}
crate::evidence::sort_findings(&mut findings);
let body = ReadWalkReportBody {
schema_version: READ_WALK_REPORT_SCHEMA_VERSION,
source_refs: source_refs_from_region(&request.region),
replay_mode: ReadWalkReplayMode::Current,
freshness_intent: map_freshness_intent(&request.freshness_intent),
input_frontier,
requested_limit,
matched_count,
returned_count: entries.len() as u64,
dropped_limited_count,
proof_refs,
findings,
};
let body_hash = report_body_hash(&body)?;
let report = ReadWalkEvidenceReport {
body,
body_hash,
generated_at_unix_ms: None,
batpak_version: None,
diagnostics: Vec::new(),
};
Ok((entries, report))
}
}
/// Lists the active selectors of `region` as sorted [`ReadWalkSourceRef`]s.
///
/// Each present selector contributes exactly one ref. A `KindFilter::Any`
/// fact filter contributes nothing. The list is sorted with the derived `Ord`,
/// so the result does not depend on the order selectors are inspected here.
fn source_refs_from_region(region: &Region) -> Vec<ReadWalkSourceRef> {
    let mut selectors: Vec<ReadWalkSourceRef> = Vec::new();
    selectors.extend(region.entity_prefix().into_iter().map(|entity_prefix| {
        ReadWalkSourceRef::EntityPrefix {
            prefix: entity_prefix.to_owned(),
        }
    }));
    selectors.extend(region.scope_value().into_iter().map(|scope_value| {
        ReadWalkSourceRef::Scope {
            scope: scope_value.to_owned(),
        }
    }));
    selectors.extend(region.fact().into_iter().filter_map(|filter| match filter {
        KindFilter::Exact(kind) => Some(ReadWalkSourceRef::FactExact {
            category: kind.category(),
            type_id: kind.type_id(),
        }),
        KindFilter::Category(category) => Some(ReadWalkSourceRef::FactCategory {
            category: *category,
        }),
        KindFilter::Any => None,
    }));
    selectors.extend(region.clock_range().into_iter().map(|clock| {
        ReadWalkSourceRef::ClockRange {
            start_clock: clock.start(),
            end_clock: clock.end(),
        }
    }));
    selectors.sort();
    selectors
}
/// Converts the caller's store-level [`Freshness`] into the serializable
/// [`ReadWalkFreshnessIntent`] recorded in the report body.
fn map_freshness_intent(freshness: &Freshness) -> ReadWalkFreshnessIntent {
    match *freshness {
        Freshness::MaybeStale { max_stale_ms } => {
            ReadWalkFreshnessIntent::MaybeStale { max_stale_ms }
        }
        Freshness::Consistent => ReadWalkFreshnessIntent::Consistent,
    }
}
/// Hashes `body` with the shared evidence hasher.
///
/// Any encoding failure is mapped to [`ReadWalkReportError::BodyEncoding`].
fn report_body_hash(body: &ReadWalkReportBody) -> Result<ReadWalkHash, ReadWalkReportError> {
    let encoding_failure = |message| ReadWalkReportError::BodyEncoding { message };
    crate::evidence::report_body_hash(body, encoding_failure)
}
#[cfg(test)]
mod tests {
    use super::{
        source_refs_from_region, ReadWalkDroppedCount, ReadWalkFinding, ReadWalkProofRefs,
        ReadWalkRequest, ReadWalkSourceRef,
    };
    use crate::coordinate::{ClockRange, Coordinate, EventCategory, Region};
    use crate::event::EventKind;
    use crate::store::{Store, StoreConfig};

    /// A result set exactly at the limit must not be reported as truncated.
    #[test]
    fn read_walk_at_exact_limit_reports_no_dropped_results() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Store::open(StoreConfig::new(dir.path())).expect("open");
        let coord = Coordinate::new("entity:rw-limit", "scope:rw").expect("coord");
        let kind = EventKind::custom(0xF, 0x51);
        for n in 0..2 {
            let _ = store
                .append(&coord, kind, &serde_json::json!({ "n": n }))
                .expect("append");
        }
        let mut request = ReadWalkRequest::full(Region::entity("entity:rw-limit"));
        request.limit = Some(2);
        let (entries, report) = store
            .query_with_read_walk_evidence(&request)
            .expect("evidence");
        assert_eq!(
            entries.len(),
            2,
            "premise: exactly two entries match at the limit"
        );
        assert!(
            matches!(
                report.body.dropped_limited_count,
                ReadWalkDroppedCount::NotApplicable
            ),
            "a result set exactly at the limit drops nothing (kills `> -> >=`), got {:?}",
            report.body.dropped_limited_count
        );
        assert!(
            !report
                .body
                .findings
                .iter()
                .any(|f| matches!(f, ReadWalkFinding::LimitedResults { .. })),
            "no LimitedResults finding when nothing was dropped"
        );
        store.close().expect("close");
    }

    /// A limit below the match count truncates the results and records how many
    /// hits were dropped. Proof refs then cover only the returned entries.
    #[test]
    fn read_walk_below_match_count_reports_dropped_results() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Store::open(StoreConfig::new(dir.path())).expect("open");
        let coord = Coordinate::new("entity:rw-drop", "scope:rw").expect("coord");
        let kind = EventKind::custom(0xF, 0x51);
        for n in 0..3 {
            let _ = store
                .append(&coord, kind, &serde_json::json!({ "n": n }))
                .expect("append");
        }
        let mut request = ReadWalkRequest::full(Region::entity("entity:rw-drop"));
        request.limit = Some(1);
        request.include_proof_refs = true;
        let (entries, report) = store
            .query_with_read_walk_evidence(&request)
            .expect("evidence");
        assert_eq!(entries.len(), 1, "a limit of one keeps one entry");
        assert_eq!(
            report.body.matched_count, 3,
            "matched_count counts hits before the limit"
        );
        assert_eq!(report.body.returned_count, 1);
        assert_eq!(report.body.requested_limit, Some(1));
        assert_eq!(
            report.body.dropped_limited_count,
            ReadWalkDroppedCount::Known(2),
            "two of three hits were truncated"
        );
        assert!(
            report
                .body
                .findings
                .contains(&ReadWalkFinding::LimitedResults { dropped_count: 2 }),
            "truncation must raise a LimitedResults finding, got {:?}",
            report.body.findings
        );
        match &report.body.proof_refs {
            ReadWalkProofRefs::Known(refs) => {
                assert_eq!(refs.len(), 1, "one proof ref per returned entry");
                assert_eq!(refs[0].event_id, entries[0].event_id);
            }
            ReadWalkProofRefs::NotApplicable => panic!("proof refs were requested"),
        }
        store.close().expect("close");
    }

    /// Every active selector of a region must show up in the source refs.
    #[test]
    fn source_refs_capture_every_active_region_selector() {
        let region = Region::entity("entity:rw")
            .with_fact_category(EventCategory::new(7).expect("valid category"))
            .with_clock_range(ClockRange::new(3, 9).expect("valid range"));
        let refs = source_refs_from_region(&region);
        let mut failures: Vec<String> = Vec::new();
        if !refs.iter().any(
            |r| matches!(r, ReadWalkSourceRef::EntityPrefix { prefix } if prefix == "entity:rw"),
        ) {
            failures.push("missing EntityPrefix ref".into());
        }
        if !refs
            .iter()
            .any(|r| matches!(r, ReadWalkSourceRef::FactCategory { category } if *category == 7))
        {
            failures.push("missing FactCategory ref".into());
        }
        if !refs.iter().any(|r| {
            matches!(
                r,
                ReadWalkSourceRef::ClockRange { start_clock, end_clock }
                    if *start_clock == 3 && *end_clock == 9
            )
        }) {
            failures.push("missing ClockRange ref".into());
        }
        assert!(
            failures.is_empty(),
            "source_refs_from_region must capture every active selector (the `-> vec![]` \
             mutant returns none): {failures:?}"
        );
    }
}