Skip to main content

auths_id/keri/
incremental.rs

//! Incremental KEL validation engine.
//!
//! This module provides O(k) incremental validation where k is the number of
//! new events since the last cached validation, instead of O(n) full replay.
//!
//! ## Algorithm
//!
//! 1. Read the current tip and load cached state (includes last_commit_oid)
//! 2. If the cached SAID equals the tip SAID, return the cached state (cache hit)
//! 3. Verify cache integrity (SAID matches event in cached commit)
//! 4. Walk from tip backwards to find the cached commit, verifying linear
//!    history (exactly 1 parent per commit) along the way
//! 5. Reverse the delta to get oldest-to-newest order
//! 6. If k > MAX_INCREMENTAL_EVENTS, skip to full replay
//! 7. Validate and apply each new event to the cached state
//! 8. Write updated cache with new tip position
//!
//! If the cache is missing, cannot be trusted, or is too far behind the tip,
//! the caller is told to fall back to full replay. Problems in the KEL itself
//! (merge commits, broken chain, sequence gaps) are returned as hard errors
//! and do not fall back.
//!
//! ## Invariants
//!
//! - KEL must be strictly linear (no merge commits)
//! - Cache is only trusted if SAID in cache matches SAID in cached commit
//! - The number of events applied incrementally is bounded by
//!   MAX_INCREMENTAL_EVENTS
24
25use chrono::{DateTime, Utc};
26
27use super::cache;
28use super::event::Event;
29use super::kel::{GitKel, KelError};
30use super::state::KeyState;
31use super::types::Said;
32use crate::domain::EventHash;
33
/// Upper bound on the number of events (k) the incremental path will apply.
///
/// When the delta between the cached commit and the tip is longer than this,
/// incremental validation is skipped in favour of a full replay.
pub const MAX_INCREMENTAL_EVENTS: usize = 10_000;
37
38/// Result of incremental validation attempt.
39#[derive(Debug)]
40pub enum IncrementalResult {
41    /// Cache hit - state matches current tip exactly
42    CacheHit(KeyState),
43    /// Incremental success - validated k new events
44    IncrementalSuccess {
45        state: KeyState,
46        events_validated: usize,
47    },
48    /// Cache miss - needs full replay (cache missing, corrupt, or not in ancestry)
49    NeedsFullReplay(ReplayReason),
50}
51
/// Why the incremental path gave up and a full replay is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplayReason {
    /// No cached state could be loaded for the DID.
    NoCacheFile,
    /// The cache was loaded but its contents are unusable
    /// (e.g. the stored commit OID does not parse as a hash).
    CacheCorrupt,
    /// The commit OID recorded in the cache does not exist in the repository.
    CachedCommitMissing,
    /// The cached commit was not found while walking back from the tip.
    CachedCommitNotInAncestry,
    /// The SAID recorded in the cache differs from the SAID of the event
    /// stored in the cached commit (the cache lies).
    CacheSaidMismatch,
    /// More than `MAX_INCREMENTAL_EVENTS` events separate the cache from the tip.
    TooManyEvents,
}
68
69/// Errors specific to incremental validation.
70/// These are hard errors that indicate KEL corruption, not cache problems.
71#[derive(Debug, thiserror::Error)]
72#[non_exhaustive]
73pub enum IncrementalError {
74    #[error("KEL error: {0}")]
75    Kel(#[from] KelError),
76
77    #[error("Chain continuity error: expected previous SAID {expected}, got {actual}")]
78    ChainContinuity { expected: Said, actual: Said },
79
80    #[error("Sequence error: expected {expected}, got {actual}")]
81    SequenceError { expected: u64, actual: u64 },
82
83    #[error("Malformed sequence number: {raw:?}")]
84    MalformedSequence { raw: String },
85
86    #[error("Invalid event type in KEL: {0}")]
87    InvalidEventType(String),
88
89    #[error("KEL history is non-linear: commit {commit} has {parent_count} parents (expected 1)")]
90    NonLinearHistory { commit: String, parent_count: usize },
91
92    #[error("KEL history is corrupted: commit {commit} has no parent but is not inception")]
93    MissingParent { commit: String },
94}
95
96impl auths_core::error::AuthsErrorInfo for IncrementalError {
97    fn error_code(&self) -> &'static str {
98        match self {
99            Self::Kel(_) => "AUTHS-E4951",
100            Self::ChainContinuity { .. } => "AUTHS-E4952",
101            Self::SequenceError { .. } => "AUTHS-E4953",
102            Self::MalformedSequence { .. } => "AUTHS-E4954",
103            Self::InvalidEventType(_) => "AUTHS-E4955",
104            Self::NonLinearHistory { .. } => "AUTHS-E4956",
105            Self::MissingParent { .. } => "AUTHS-E4957",
106        }
107    }
108
109    fn suggestion(&self) -> Option<&'static str> {
110        match self {
111            Self::Kel(_) => None,
112            Self::ChainContinuity { .. } => {
113                Some("The KEL chain is broken; clear the cache and retry")
114            }
115            Self::SequenceError { .. } => {
116                Some("The KEL has sequence gaps; re-sync from a trusted source")
117            }
118            Self::MalformedSequence { .. } => None,
119            Self::InvalidEventType(_) => None,
120            Self::NonLinearHistory { .. } => {
121                Some("The KEL has merge commits, indicating tampering")
122            }
123            Self::MissingParent { .. } => Some("The KEL commit history is corrupted"),
124        }
125    }
126}
127
128/// Attempt incremental validation from cached state to current tip.
129///
130/// Returns `IncrementalResult::NeedsFullReplay` if:
131/// - No cache exists
132/// - Cache version mismatch or corrupt
133/// - Cached commit doesn't exist in repo
134/// - Cached commit is not in the ancestry of tip
135/// - Cache SAID doesn't match SAID in cached commit
136/// - Too many events since cache (k > MAX_INCREMENTAL_EVENTS)
137///
138/// Returns `IncrementalResult::CacheHit` if cache matches tip exactly.
139///
140/// Returns `IncrementalResult::IncrementalSuccess` if we successfully validated
141/// from cached position to tip.
142///
143/// Returns `Err(IncrementalError)` for hard errors (KEL corruption like merge
144/// commits, broken chain). These should NOT fall back - they indicate real problems.
145pub fn try_incremental_validation<'a>(
146    kel: &GitKel<'a>,
147    did: &str,
148    now: DateTime<Utc>,
149) -> Result<IncrementalResult, IncrementalError> {
150    // Get current tip
151    let tip_hash = kel.tip_commit_hash()?;
152    let tip_event = kel.read_event_from_commit_hash(tip_hash)?;
153    let tip_said = tip_event.said();
154
155    // Try to load cached state
156    let cached = match cache::try_load_cached_state_full(kel.workdir(), did) {
157        Some(c) => c,
158        None => {
159            log::debug!("KEL cache miss for {}: no cache file", did);
160            return Ok(IncrementalResult::NeedsFullReplay(
161                ReplayReason::NoCacheFile,
162            ));
163        }
164    };
165
166    // Check if cache matches tip exactly (cache hit)
167    if cached.validated_against_tip_said == *tip_said {
168        log::debug!("KEL cache hit for {}", did);
169        return Ok(IncrementalResult::CacheHit(cached.state));
170    }
171
172    // Parse cached commit hash
173    let cached_hash = match GitKel::parse_hash(cached.last_commit_oid.as_str()) {
174        Ok(h) => h,
175        Err(_) => {
176            log::debug!("KEL cache corrupt for {}: invalid commit hash", did);
177            return Ok(IncrementalResult::NeedsFullReplay(
178                ReplayReason::CacheCorrupt,
179            ));
180        }
181    };
182
183    // Verify cached commit exists
184    if !kel.commit_exists(cached_hash) {
185        log::debug!(
186            "KEL cache miss for {}: cached commit {} doesn't exist",
187            did,
188            cached_hash
189        );
190        return Ok(IncrementalResult::NeedsFullReplay(
191            ReplayReason::CachedCommitMissing,
192        ));
193    }
194
195    // CACHE TRUST RULE: Verify cache SAID matches SAID in cached commit
196    let cached_event = kel.read_event_from_commit_hash(cached_hash)?;
197    if *cached_event.said() != cached.validated_against_tip_said {
198        log::warn!(
199            "KEL cache SAID mismatch for {}: cache says {} but commit has {}",
200            did,
201            cached.validated_against_tip_said,
202            cached_event.said()
203        );
204        return Ok(IncrementalResult::NeedsFullReplay(
205            ReplayReason::CacheSaidMismatch,
206        ));
207    }
208
209    // Build the delta: walk from tip backwards to cached commit
210    // This also enforces linear history (exactly 1 parent per commit)
211    let delta = match build_delta_with_linearity_check(kel, tip_hash, cached_hash)? {
212        Some(d) => d,
213        None => {
214            log::debug!("KEL cache miss for {}: cached commit not in ancestry", did);
215            return Ok(IncrementalResult::NeedsFullReplay(
216                ReplayReason::CachedCommitNotInAncestry,
217            ));
218        }
219    };
220
221    // Check if k exceeds threshold
222    if delta.len() > MAX_INCREMENTAL_EVENTS {
223        log::info!(
224            "KEL incremental skip for {}: {} events exceeds threshold {}",
225            did,
226            delta.len(),
227            MAX_INCREMENTAL_EVENTS
228        );
229        return Ok(IncrementalResult::NeedsFullReplay(
230            ReplayReason::TooManyEvents,
231        ));
232    }
233
234    log::debug!(
235        "KEL incremental validation for {}: {} new events",
236        did,
237        delta.len()
238    );
239
240    // Apply events incrementally
241    let mut state = cached.state;
242    let events_validated = delta.len();
243
244    for hash in delta {
245        let event = kel.read_event_from_commit_hash(hash)?;
246        apply_event_to_state(&mut state, &event)?;
247    }
248
249    // Verify final state matches tip
250    if state.last_event_said != *tip_said {
251        return Err(IncrementalError::ChainContinuity {
252            expected: tip_said.clone(),
253            actual: state.last_event_said.clone(),
254        });
255    }
256
257    // Write updated cache
258    let _ = cache::write_kel_cache(
259        kel.workdir(),
260        did,
261        &state,
262        tip_said.as_str(),
263        &tip_hash.to_hex(),
264        now,
265    );
266
267    Ok(IncrementalResult::IncrementalSuccess {
268        state,
269        events_validated,
270    })
271}
272
273/// Build the list of commits from cached position (exclusive) to tip (inclusive).
274///
275/// Also enforces that the KEL is strictly linear (exactly 1 parent per commit,
276/// except inception which has 0).
277///
278/// Returns `None` if cached_hash is not in the ancestry of tip_hash.
279/// Returns `Some(vec![])` if cached_hash == tip_hash (nothing to do).
280/// Returns `Some(vec![...])` with commits in oldest-to-newest order.
281///
282/// Returns `Err(IncrementalError::NonLinearHistory)` if any commit has >1 parent.
283fn build_delta_with_linearity_check(
284    kel: &GitKel<'_>,
285    tip_hash: EventHash,
286    cached_hash: EventHash,
287) -> Result<Option<Vec<EventHash>>, IncrementalError> {
288    if tip_hash == cached_hash {
289        return Ok(Some(Vec::new()));
290    }
291
292    let mut delta = Vec::new();
293    let mut current = tip_hash;
294
295    loop {
296        delta.push(current);
297
298        // Get parent count and enforce linearity
299        let parent_count = kel.parent_count(current)?;
300
301        if parent_count > 1 {
302            // Merge commit detected - this is a hard error
303            return Err(IncrementalError::NonLinearHistory {
304                commit: current.to_hex(),
305                parent_count,
306            });
307        }
308
309        if parent_count == 0 {
310            // Reached inception without finding cached commit
311            // This means cache is invalid (not in ancestry)
312            return Ok(None);
313        }
314
315        // INVARIANT: parent_count==1 verified above
316        #[allow(clippy::expect_used)]
317        let parent = kel.parent_hash(current)?.expect("parent_count was 1");
318
319        if parent == cached_hash {
320            // Found the cached commit - we're done
321            break;
322        }
323        current = parent;
324    }
325
326    // Reverse to get oldest-to-newest order
327    delta.reverse();
328    Ok(Some(delta))
329}
330
331/// Apply a single event to the key state.
332///
333/// This is a pure function that validates chain continuity and sequence,
334/// then updates the state accordingly.
335fn apply_event_to_state(state: &mut KeyState, event: &Event) -> Result<(), IncrementalError> {
336    let expected_sequence = state.sequence + 1;
337    let actual_sequence = event.sequence().value();
338
339    // Verify sequence increments by 1
340    if actual_sequence != expected_sequence {
341        return Err(IncrementalError::SequenceError {
342            expected: expected_sequence,
343            actual: actual_sequence,
344        });
345    }
346
347    // Verify chain continuity (previous SAID matches our current tip)
348    if let Some(prev_said) = event.previous() {
349        if *prev_said != state.last_event_said {
350            return Err(IncrementalError::ChainContinuity {
351                expected: state.last_event_said.clone(),
352                actual: prev_said.clone(),
353            });
354        }
355    } else {
356        // Only inception events have no previous, and we shouldn't see those
357        // in incremental validation (cache always starts after inception)
358        return Err(IncrementalError::InvalidEventType(
359            "Unexpected inception event in incremental validation".to_string(),
360        ));
361    }
362
363    // Apply the event
364    match event {
365        Event::Rot(rot) => {
366            let threshold =
367                rot.kt
368                    .parse::<u64>()
369                    .map_err(|_| IncrementalError::MalformedSequence {
370                        raw: rot.kt.clone(),
371                    })?;
372            let next_threshold =
373                rot.nt
374                    .parse::<u64>()
375                    .map_err(|_| IncrementalError::MalformedSequence {
376                        raw: rot.nt.clone(),
377                    })?;
378
379            state.apply_rotation(
380                rot.k.clone(),
381                rot.n.clone(),
382                threshold,
383                next_threshold,
384                actual_sequence,
385                rot.d.clone(),
386            );
387        }
388        Event::Ixn(ixn) => {
389            state.apply_interaction(actual_sequence, ixn.d.clone());
390        }
391        Event::Icp(_) => {
392            return Err(IncrementalError::InvalidEventType(
393                "Inception event after KEL start".to_string(),
394            ));
395        }
396    }
397
398    Ok(())
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::keri::Prefix;

    // Integration tests for incremental validation are in kel.rs
    // since they need access to a full GitKel setup

    /// Smoke test: every `IncrementalResult` variant can be constructed.
    #[test]
    fn test_incremental_result_variants() {
        let prefix = Prefix::new_unchecked("ETest".to_string());
        let key_list = vec!["DKey".to_string()];
        let next_list = vec!["ENext".to_string()];
        let said = Said::new_unchecked("ESaid".to_string());
        let state = KeyState::from_inception(prefix, key_list, next_list, 1, 1, said);

        let _hit = IncrementalResult::CacheHit(state.clone());
        let _success = IncrementalResult::IncrementalSuccess {
            state,
            events_validated: 5,
        };
        let _miss = IncrementalResult::NeedsFullReplay(ReplayReason::NoCacheFile);
    }

    /// Spot-checks that distinct `ReplayReason` variants compare unequal.
    #[test]
    fn test_replay_reasons() {
        let distinct_pairs = [
            (ReplayReason::NoCacheFile, ReplayReason::CacheCorrupt),
            (
                ReplayReason::CachedCommitMissing,
                ReplayReason::CacheSaidMismatch,
            ),
            (
                ReplayReason::TooManyEvents,
                ReplayReason::CachedCommitNotInAncestry,
            ),
        ];

        for (left, right) in distinct_pairs {
            assert_ne!(left, right);
        }
    }
}