1use chrono::{DateTime, Utc};
26
27use super::cache;
28use super::event::Event;
29use super::kel::{GitKel, KelError};
30use super::state::KeyState;
31use super::types::Said;
32use crate::domain::EventHash;
33
34pub const MAX_INCREMENTAL_EVENTS: usize = 10_000;
37
38#[derive(Debug)]
40pub enum IncrementalResult {
41 CacheHit(KeyState),
43 IncrementalSuccess {
45 state: KeyState,
46 events_validated: usize,
47 },
48 NeedsFullReplay(ReplayReason),
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum ReplayReason {
55 NoCacheFile,
57 CacheCorrupt,
59 CachedCommitMissing,
61 CachedCommitNotInAncestry,
63 CacheSaidMismatch,
65 TooManyEvents,
67}
68
69#[derive(Debug, thiserror::Error)]
72#[non_exhaustive]
73pub enum IncrementalError {
74 #[error("KEL error: {0}")]
75 Kel(#[from] KelError),
76
77 #[error("Chain continuity error: expected previous SAID {expected}, got {actual}")]
78 ChainContinuity { expected: Said, actual: Said },
79
80 #[error("Sequence error: expected {expected}, got {actual}")]
81 SequenceError { expected: u64, actual: u64 },
82
83 #[error("Malformed sequence number: {raw:?}")]
84 MalformedSequence { raw: String },
85
86 #[error("Invalid event type in KEL: {0}")]
87 InvalidEventType(String),
88
89 #[error("KEL history is non-linear: commit {commit} has {parent_count} parents (expected 1)")]
90 NonLinearHistory { commit: String, parent_count: usize },
91
92 #[error("KEL history is corrupted: commit {commit} has no parent but is not inception")]
93 MissingParent { commit: String },
94}
95
96impl auths_core::error::AuthsErrorInfo for IncrementalError {
97 fn error_code(&self) -> &'static str {
98 match self {
99 Self::Kel(_) => "AUTHS-E4951",
100 Self::ChainContinuity { .. } => "AUTHS-E4952",
101 Self::SequenceError { .. } => "AUTHS-E4953",
102 Self::MalformedSequence { .. } => "AUTHS-E4954",
103 Self::InvalidEventType(_) => "AUTHS-E4955",
104 Self::NonLinearHistory { .. } => "AUTHS-E4956",
105 Self::MissingParent { .. } => "AUTHS-E4957",
106 }
107 }
108
109 fn suggestion(&self) -> Option<&'static str> {
110 match self {
111 Self::Kel(_) => None,
112 Self::ChainContinuity { .. } => {
113 Some("The KEL chain is broken; clear the cache and retry")
114 }
115 Self::SequenceError { .. } => {
116 Some("The KEL has sequence gaps; re-sync from a trusted source")
117 }
118 Self::MalformedSequence { .. } => None,
119 Self::InvalidEventType(_) => None,
120 Self::NonLinearHistory { .. } => {
121 Some("The KEL has merge commits, indicating tampering")
122 }
123 Self::MissingParent { .. } => Some("The KEL commit history is corrupted"),
124 }
125 }
126}
127
128pub fn try_incremental_validation<'a>(
146 kel: &GitKel<'a>,
147 did: &str,
148 now: DateTime<Utc>,
149) -> Result<IncrementalResult, IncrementalError> {
150 let tip_hash = kel.tip_commit_hash()?;
152 let tip_event = kel.read_event_from_commit_hash(tip_hash)?;
153 let tip_said = tip_event.said();
154
155 let cached = match cache::try_load_cached_state_full(kel.workdir(), did) {
157 Some(c) => c,
158 None => {
159 log::debug!("KEL cache miss for {}: no cache file", did);
160 return Ok(IncrementalResult::NeedsFullReplay(
161 ReplayReason::NoCacheFile,
162 ));
163 }
164 };
165
166 if cached.validated_against_tip_said == *tip_said {
168 log::debug!("KEL cache hit for {}", did);
169 return Ok(IncrementalResult::CacheHit(cached.state));
170 }
171
172 let cached_hash = match GitKel::parse_hash(cached.last_commit_oid.as_str()) {
174 Ok(h) => h,
175 Err(_) => {
176 log::debug!("KEL cache corrupt for {}: invalid commit hash", did);
177 return Ok(IncrementalResult::NeedsFullReplay(
178 ReplayReason::CacheCorrupt,
179 ));
180 }
181 };
182
183 if !kel.commit_exists(cached_hash) {
185 log::debug!(
186 "KEL cache miss for {}: cached commit {} doesn't exist",
187 did,
188 cached_hash
189 );
190 return Ok(IncrementalResult::NeedsFullReplay(
191 ReplayReason::CachedCommitMissing,
192 ));
193 }
194
195 let cached_event = kel.read_event_from_commit_hash(cached_hash)?;
197 if *cached_event.said() != cached.validated_against_tip_said {
198 log::warn!(
199 "KEL cache SAID mismatch for {}: cache says {} but commit has {}",
200 did,
201 cached.validated_against_tip_said,
202 cached_event.said()
203 );
204 return Ok(IncrementalResult::NeedsFullReplay(
205 ReplayReason::CacheSaidMismatch,
206 ));
207 }
208
209 let delta = match build_delta_with_linearity_check(kel, tip_hash, cached_hash)? {
212 Some(d) => d,
213 None => {
214 log::debug!("KEL cache miss for {}: cached commit not in ancestry", did);
215 return Ok(IncrementalResult::NeedsFullReplay(
216 ReplayReason::CachedCommitNotInAncestry,
217 ));
218 }
219 };
220
221 if delta.len() > MAX_INCREMENTAL_EVENTS {
223 log::info!(
224 "KEL incremental skip for {}: {} events exceeds threshold {}",
225 did,
226 delta.len(),
227 MAX_INCREMENTAL_EVENTS
228 );
229 return Ok(IncrementalResult::NeedsFullReplay(
230 ReplayReason::TooManyEvents,
231 ));
232 }
233
234 log::debug!(
235 "KEL incremental validation for {}: {} new events",
236 did,
237 delta.len()
238 );
239
240 let mut state = cached.state;
242 let events_validated = delta.len();
243
244 for hash in delta {
245 let event = kel.read_event_from_commit_hash(hash)?;
246 apply_event_to_state(&mut state, &event)?;
247 }
248
249 if state.last_event_said != *tip_said {
251 return Err(IncrementalError::ChainContinuity {
252 expected: tip_said.clone(),
253 actual: state.last_event_said.clone(),
254 });
255 }
256
257 let _ = cache::write_kel_cache(
259 kel.workdir(),
260 did,
261 &state,
262 tip_said.as_str(),
263 &tip_hash.to_hex(),
264 now,
265 );
266
267 Ok(IncrementalResult::IncrementalSuccess {
268 state,
269 events_validated,
270 })
271}
272
273fn build_delta_with_linearity_check(
284 kel: &GitKel<'_>,
285 tip_hash: EventHash,
286 cached_hash: EventHash,
287) -> Result<Option<Vec<EventHash>>, IncrementalError> {
288 if tip_hash == cached_hash {
289 return Ok(Some(Vec::new()));
290 }
291
292 let mut delta = Vec::new();
293 let mut current = tip_hash;
294
295 loop {
296 delta.push(current);
297
298 let parent_count = kel.parent_count(current)?;
300
301 if parent_count > 1 {
302 return Err(IncrementalError::NonLinearHistory {
304 commit: current.to_hex(),
305 parent_count,
306 });
307 }
308
309 if parent_count == 0 {
310 return Ok(None);
313 }
314
315 #[allow(clippy::expect_used)]
317 let parent = kel.parent_hash(current)?.expect("parent_count was 1");
318
319 if parent == cached_hash {
320 break;
322 }
323 current = parent;
324 }
325
326 delta.reverse();
328 Ok(Some(delta))
329}
330
331fn apply_event_to_state(state: &mut KeyState, event: &Event) -> Result<(), IncrementalError> {
336 let expected_sequence = state.sequence + 1;
337 let actual_sequence = event.sequence().value();
338
339 if actual_sequence != expected_sequence {
341 return Err(IncrementalError::SequenceError {
342 expected: expected_sequence,
343 actual: actual_sequence,
344 });
345 }
346
347 if let Some(prev_said) = event.previous() {
349 if *prev_said != state.last_event_said {
350 return Err(IncrementalError::ChainContinuity {
351 expected: state.last_event_said.clone(),
352 actual: prev_said.clone(),
353 });
354 }
355 } else {
356 return Err(IncrementalError::InvalidEventType(
359 "Unexpected inception event in incremental validation".to_string(),
360 ));
361 }
362
363 match event {
365 Event::Rot(rot) => {
366 let threshold =
367 rot.kt
368 .parse::<u64>()
369 .map_err(|_| IncrementalError::MalformedSequence {
370 raw: rot.kt.clone(),
371 })?;
372 let next_threshold =
373 rot.nt
374 .parse::<u64>()
375 .map_err(|_| IncrementalError::MalformedSequence {
376 raw: rot.nt.clone(),
377 })?;
378
379 state.apply_rotation(
380 rot.k.clone(),
381 rot.n.clone(),
382 threshold,
383 next_threshold,
384 actual_sequence,
385 rot.d.clone(),
386 );
387 }
388 Event::Ixn(ixn) => {
389 state.apply_interaction(actual_sequence, ixn.d.clone());
390 }
391 Event::Icp(_) => {
392 return Err(IncrementalError::InvalidEventType(
393 "Inception event after KEL start".to_string(),
394 ));
395 }
396 }
397
398 Ok(())
399}
400
401#[cfg(test)]
402mod tests {
403 use super::*;
404 use crate::keri::Prefix;
405
406 #[test]
410 fn test_incremental_result_variants() {
411 let state = KeyState::from_inception(
413 Prefix::new_unchecked("ETest".to_string()),
414 vec!["DKey".to_string()],
415 vec!["ENext".to_string()],
416 1,
417 1,
418 Said::new_unchecked("ESaid".to_string()),
419 );
420
421 let _hit = IncrementalResult::CacheHit(state.clone());
422 let _success = IncrementalResult::IncrementalSuccess {
423 state,
424 events_validated: 5,
425 };
426 let _miss = IncrementalResult::NeedsFullReplay(ReplayReason::NoCacheFile);
427 }
428
429 #[test]
430 fn test_replay_reasons() {
431 assert_ne!(ReplayReason::NoCacheFile, ReplayReason::CacheCorrupt);
433 assert_ne!(
434 ReplayReason::CachedCommitMissing,
435 ReplayReason::CacheSaidMismatch
436 );
437 assert_ne!(
438 ReplayReason::TooManyEvents,
439 ReplayReason::CachedCommitNotInAncestry
440 );
441 }
442}