1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
//! Deterministic structural evidence over stable store-owned diagnostics facts.
//!
//! Canonical identity intentionally excludes raw filesystem paths, free-form
//! diagnostics strings, and per-process timestamps outside the structured
//! [`crate::store::cold_start::rebuild::OpenIndexReport`] snapshot (which is
//! already part of cold-start truth, not host noise).
//!
//! Bodies are **point-in-time** snapshots: comparing full bodies across
//! `close`/`open` is not a substrate contract because cold-start path,
//! frontier timing, and system event replay can legitimately differ while the
//! store remains consistent.
use crate::store::cold_start::rebuild::OpenIndexReport;
use crate::store::stats::{
FrontierView, PlatformEvidenceSummary, StoreDiagnostics, WriterPressure,
};
use crate::store::RestartPolicy;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Schema version for store resource evidence bodies.
///
/// [`store_resource_report_body_from_diagnostics`] stamps this value into
/// [`StoreResourceReportBody::schema_version`], so it is part of the hashed
/// body (and therefore of `body_hash`).
pub const STORE_RESOURCE_REPORT_SCHEMA_VERSION: u32 = 1;
/// 32-byte digest type used for report `body_hash` values and for the
/// data-directory identity hash.
pub type StoreResourceHash = [u8; 32];
/// Error returned when store resource evidence generation fails.
///
/// Produced by [`store_resource_report_body_hash`] and propagated by
/// [`store_resource_evidence_report_from_diagnostics`]. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum StoreResourceReportError {
/// Canonical body encoding failed.
BodyEncoding {
/// Human-readable encoding error, forwarded from
/// `crate::evidence::report_body_hash`.
message: String,
},
}
impl std::fmt::Display for StoreResourceReportError {
    /// Render the error as a single human-readable line that embeds the
    /// underlying encoder message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyEncoding { message: detail } => f.write_fmt(format_args!(
                "store resource report body encoding failed: {detail}"
            )),
        }
    }
}

/// Marker impl: the error has no underlying `source`.
impl std::error::Error for StoreResourceReportError {}
/// Writer restart policy shape captured in resource evidence (no extra behavior).
///
/// A serde-serializable, data-only mirror of [`RestartPolicy`], built by
/// `restart_policy_shape`, which copies the variant and its fields verbatim.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum StoreResourceRestartPolicyShape {
/// At most one automatic restart after a writer panic.
Once,
/// Bounded restarts within a rolling millisecond window.
Bounded {
/// Maximum restarts permitted within the time window.
max_restarts: u32,
/// Rolling window length in milliseconds.
within_ms: u64,
},
}
/// Translate the live [`RestartPolicy`] into its serde-stable evidence mirror.
///
/// Purely structural: each variant maps to its namesake and every field is
/// copied verbatim.
fn restart_policy_shape(policy: &RestartPolicy) -> StoreResourceRestartPolicyShape {
    // Matching on the dereferenced place copies the `Copy` integer fields out
    // directly, so no explicit `*field` derefs are needed in the arms.
    match *policy {
        RestartPolicy::Once => StoreResourceRestartPolicyShape::Once,
        RestartPolicy::Bounded {
            max_restarts,
            within_ms,
        } => StoreResourceRestartPolicyShape::Bounded {
            max_restarts,
            within_ms,
        },
    }
}
/// Stable digest of the store data directory identity.
///
/// The path is first canonicalized through the platform filesystem layer so
/// that equivalent spellings of the same existing directory hash identically.
/// If canonicalization fails, the caller-supplied path is hashed as given.
/// Either way, only the digest leaves this function, never the path itself.
#[must_use]
pub fn store_data_dir_identity_hash(path: &Path) -> StoreResourceHash {
    // Own the canonical path in this frame so `identity_path` can borrow from it.
    let canonical = crate::store::platform::fs::canonicalize(path).ok();
    let identity_path: &Path = canonical
        .as_ref()
        .map_or(path, |resolved| resolved.as_path());

    let identity_bytes =
        crate::store::platform::path_identity::path_bytes_for_identity_digest(identity_path);
    crate::evidence::content_hash(&identity_bytes)
}
/// Frontier coordinates serialized without `HlcPoint` to keep the body fully serde-stable.
///
/// Built from [`FrontierView`] through its `From` impl: each HLC watermark is
/// flattened into a `*_wall_ms` / `*_global_sequence` pair.
///
/// NOTE(review): serde derive serializes fields in declaration order, so
/// reordering fields presumably changes the canonical encoding and `body_hash`.
/// Confirm before touching the layout.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreResourceFrontierBody {
/// Accepted watermark wall clock (ms).
pub accepted_wall_ms: u64,
/// Accepted watermark global sequence.
pub accepted_global_sequence: u64,
/// Written watermark wall clock (ms).
pub written_wall_ms: u64,
/// Written watermark global sequence.
pub written_global_sequence: u64,
/// Durable watermark wall clock (ms).
pub durable_wall_ms: u64,
/// Durable watermark global sequence.
pub durable_global_sequence: u64,
/// Visible watermark wall clock (ms).
pub visible_wall_ms: u64,
/// Visible watermark global sequence.
pub visible_global_sequence: u64,
/// Applied watermark wall clock (ms).
pub applied_wall_ms: u64,
/// Applied watermark global sequence.
pub applied_global_sequence: u64,
/// Emitted watermark wall clock (ms).
pub emitted_wall_ms: u64,
/// Emitted watermark global sequence.
pub emitted_global_sequence: u64,
/// Signed visible-minus-durable sequence gap at snapshot time.
pub visible_minus_durable_seq: i64,
/// Oldest undurable write age in milliseconds when known (`None` otherwise).
/// Presumably depends on host timing at snapshot time, so this value may differ
/// between otherwise identical snapshots. TODO confirm.
pub oldest_pending_write_age_ms: Option<u64>,
}
impl From<FrontierView> for StoreResourceFrontierBody {
fn from(f: FrontierView) -> Self {
Self {
accepted_wall_ms: f.accepted_hlc.wall_ms,
accepted_global_sequence: f.accepted_hlc.global_sequence,
written_wall_ms: f.written_hlc.wall_ms,
written_global_sequence: f.written_hlc.global_sequence,
durable_wall_ms: f.durable_hlc.wall_ms,
durable_global_sequence: f.durable_hlc.global_sequence,
visible_wall_ms: f.visible_hlc.wall_ms,
visible_global_sequence: f.visible_hlc.global_sequence,
applied_wall_ms: f.applied_hlc.wall_ms,
applied_global_sequence: f.applied_hlc.global_sequence,
emitted_wall_ms: f.emitted_hlc.wall_ms,
emitted_global_sequence: f.emitted_hlc.global_sequence,
visible_minus_durable_seq: f.visible_minus_durable_seq,
oldest_pending_write_age_ms: f.oldest_pending_write_age_ms,
}
}
}
/// Deterministic store resource / diagnostics snapshot body.
///
/// Built from live [`StoreDiagnostics`] by
/// [`store_resource_report_body_from_diagnostics`] and hashed by
/// [`store_resource_report_body_hash`]. This is a point-in-time snapshot (see
/// the module docs), not a value expected to be equal across `close`/`open`.
///
/// NOTE(review): serde derive serializes fields in declaration order, so
/// reordering fields presumably changes the canonical encoding and `body_hash`.
/// Confirm before touching the layout.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreResourceReportBody {
/// Schema version for this evidence shape
/// ([`STORE_RESOURCE_REPORT_SCHEMA_VERSION`] when built from diagnostics).
pub schema_version: u32,
/// Identity hash of the data directory (never the raw path). The path is
/// canonicalized first; if canonicalization fails, the supplied path bytes
/// are hashed instead. See [`store_data_dir_identity_hash`].
pub data_dir_identity_hash: StoreResourceHash,
/// Events currently indexed; saturates at `u64::MAX` if the source count
/// does not fit in `u64`.
pub event_count: u64,
/// Global sequence allocator.
pub global_sequence: u64,
/// Visibility sequence bound.
pub visible_sequence: u64,
/// Segment rotation bound from config.
pub segment_max_bytes: u64,
/// FD budget from config; saturates at `u64::MAX` if it does not fit in `u64`.
pub fd_budget: u64,
/// Writer restart policy shape from config.
pub restart_policy: StoreResourceRestartPolicyShape,
/// Writer mailbox pressure snapshot.
pub writer_pressure: WriterPressure,
/// Frontier coordinates at snapshot time.
pub frontier: StoreResourceFrontierBody,
/// Index topology label at snapshot time (the diagnostics value rendered with
/// `to_string()`).
pub index_topology: String,
/// Tile count from index overlay accounting; saturates at `u64::MAX` if it
/// does not fit in `u64`.
pub tile_count: u64,
/// Cold-start open report when present (structural cold-start truth only).
pub open_report: Option<OpenIndexReport>,
/// Platform evidence summary for the configured store path.
pub platform_evidence: PlatformEvidenceSummary,
}
/// Project live [`StoreDiagnostics`] into a deterministic [`StoreResourceReportBody`].
///
/// * The data directory is reduced to [`store_data_dir_identity_hash`], so the
///   raw path never enters the body.
/// * Count fields (`event_count`, `fd_budget`, `tile_count`) are widened to
///   `u64`. If a conversion fails, the value saturates at `u64::MAX` instead of
///   erroring.
/// * The restart policy and frontier are converted into their serde-stable mirrors.
/// * `index_topology` is captured as its `to_string()` rendering.
#[must_use]
pub fn store_resource_report_body_from_diagnostics(
    d: &StoreDiagnostics,
) -> StoreResourceReportBody {
    /// Widen `value` to `u64`, saturating at `u64::MAX` when it does not fit.
    fn saturating_u64<T>(value: T) -> u64
    where
        u64: TryFrom<T>,
    {
        u64::try_from(value).unwrap_or(u64::MAX)
    }

    let data_dir_identity_hash = store_data_dir_identity_hash(&d.data_dir);
    let restart_policy = restart_policy_shape(&d.restart_policy);
    let frontier = StoreResourceFrontierBody::from(d.frontier.clone());

    StoreResourceReportBody {
        schema_version: STORE_RESOURCE_REPORT_SCHEMA_VERSION,
        data_dir_identity_hash,
        event_count: saturating_u64(d.event_count),
        global_sequence: d.global_sequence,
        visible_sequence: d.visible_sequence,
        segment_max_bytes: d.segment_max_bytes,
        fd_budget: saturating_u64(d.fd_budget),
        restart_policy,
        writer_pressure: d.writer_pressure,
        frontier,
        index_topology: d.index_topology.to_string(),
        tile_count: saturating_u64(d.tile_count),
        open_report: d.open_report.clone(),
        platform_evidence: d.platform_evidence.clone(),
    }
}
/// Compute the canonical `body_hash` for a report body.
///
/// Delegates to the crate-wide evidence hasher. Any encoder failure message is
/// wrapped in [`StoreResourceReportError::BodyEncoding`].
///
/// # Errors
/// Canonical MessagePack encode failure.
pub fn store_resource_report_body_hash(
    body: &StoreResourceReportBody,
) -> Result<StoreResourceHash, StoreResourceReportError> {
    let encoding_error = |message: String| StoreResourceReportError::BodyEncoding { message };
    crate::evidence::report_body_hash(body, encoding_error)
}
/// Store resource evidence report envelope (metadata outside deterministic identity).
///
/// Only `body` contributes to `body_hash`; the remaining envelope fields are
/// free-form metadata. The fields are public and the type is `Deserialize`,
/// so nothing here enforces that `body_hash` matches `body`. Recompute it with
/// [`store_resource_report_body_hash`] when verifying a received report.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreResourceEvidenceReport {
/// Deterministic report body.
pub body: StoreResourceReportBody,
/// Canonical hash of `body`.
pub body_hash: StoreResourceHash,
/// Optional generation timestamp metadata outside deterministic identity
/// (`None` when built by [`store_resource_evidence_report_from_diagnostics`]).
pub generated_at_unix_ms: Option<u64>,
/// Optional producer version metadata outside deterministic identity
/// (`None` when built by [`store_resource_evidence_report_from_diagnostics`]).
pub batpak_version: Option<String>,
/// Optional diagnostics outside deterministic identity
/// (empty when built by [`store_resource_evidence_report_from_diagnostics`]).
pub diagnostics: Vec<String>,
}
/// Build a complete evidence report (body plus `body_hash`) from live diagnostics.
///
/// The envelope metadata (`generated_at_unix_ms`, `batpak_version`,
/// `diagnostics`) is left unset or empty for callers to fill in.
///
/// # Errors
/// Canonical body encoding failure while computing `body_hash`.
pub fn store_resource_evidence_report_from_diagnostics(
    d: &StoreDiagnostics,
) -> Result<StoreResourceEvidenceReport, StoreResourceReportError> {
    let body = store_resource_report_body_from_diagnostics(d);
    store_resource_report_body_hash(&body).map(|body_hash| StoreResourceEvidenceReport {
        body,
        body_hash,
        // Envelope metadata sits outside the deterministic identity.
        generated_at_unix_ms: None,
        batpak_version: None,
        diagnostics: vec![],
    })
}
// Unit tests for this module live in an out-of-line `tests` submodule file and
// are compiled only for test builds.
#[cfg(test)]
mod tests;