Skip to main content

perspt_sdk/
ledger.rs

1//! Event-sourced replay ledger and durable-execution obligations (PSP-8
2//! System 11).
3//!
4//! The ledger is an append-only, Merkle-chained event stream. Replay is
5//! deterministic over recorded observations: any nondeterministic observation a
6//! transition depends on is recorded *before* it is used, and the kernel refuses
7//! to commit a transition that references an observation lacking a ledger
8//! record. This structurally discharges the recording obligation at the SDK
9//! effect boundary rather than relying on convention.
10//!
11//! The six durable-execution obligations (PSP-8 Def 9.1):
12//!
13//! * R1 durable single-assignment outcomes ([`IdempotencyLog`]);
14//! * R2 recorded nondeterminism ([`Ledger::record_observation`]);
15//! * R3 deterministic transition ([`replay_accepted_trajectory`]);
16//! * R4 unforgeable capability transport ([`crate::capability`]);
17//! * R5 write-ahead external effects ([`ExternalEffectLog`]);
18//! * R6 ordered non-commuting durable turns ([`crate::scheduler`]).
19
20use std::collections::HashMap;
21
22use serde::{Deserialize, Serialize};
23use sha2::{Digest, Sha256};
24
25use crate::error::{Result, SdkError};
26
/// A ledgered event (PSP-8 System 11). Representative of the full event family;
/// `Custom` carries any additional structured event.
///
/// Serialized as an internally tagged object: the variant name, converted to
/// `snake_case`, is stored under the `"event"` key next to the variant's own
/// fields.
///
/// The JSON serialization of an event is part of the input to the ledger's
/// chained hash (see `chain_hash`), so renaming or reordering variants' fields
/// changes the hashes computed for records containing them.
// `Eq` is not derived: several variants carry an `f64` energy.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum LedgerEvent {
    ProposalObserved {
        proposal_id: String,
        actor: String,
    },
    AdmissibilityChecked {
        proposal_id: String,
        allowed: bool,
    },
    EffectApplied {
        proposal_id: String,
        // NOTE(review): presumably the same key space as `IdempotencyLog` /
        // `ExternalEffectLog` -- nothing in this file links them; confirm.
        idempotency_key: String,
    },
    EffectDenied {
        proposal_id: String,
        reason: String,
    },
    VerifierCompleted {
        node_id: String,
        generation: u32,
    },
    ResidualEmitted {
        residual_id: String,
        node_id: String,
    },
    EnergyScored {
        node_id: String,
        generation: u32,
        energy: f64,
    },
    GateDecisionRecorded {
        node_id: String,
        accepted: bool,
    },
    /// The only variant that `replay_accepted_trajectory` extracts; its three
    /// fields become one `(node_id, generation, energy)` trajectory entry.
    CandidateAccepted {
        node_id: String,
        generation: u32,
        energy: f64,
    },
    CandidateRejected {
        node_id: String,
        generation: u32,
    },
    GraphRevisionAccepted {
        revision_id: String,
        sequence: u32,
    },
    NodeGenerationRetired {
        node_id: String,
        generation: u32,
    },
    ResidualCertificateIssued {
        certificate_id: String,
        node_id: String,
    },
    RollbackApplied {
        target_event: String,
    },
    CapabilityGranted {
        capability_id: String,
        holder: String,
    },
    CapabilityRevoked {
        capability_id: String,
    },
    /// An observation of nondeterministic data, recorded before use (R2).
    ///
    /// `Ledger::append` indexes every appended event of this variant by
    /// `handle`; `Ledger::record_observation` sets `handle` equal to
    /// `content_hash`.
    ObservationRecorded {
        handle: String,
        content_hash: String,
    },
    /// Escape hatch for events without a dedicated variant: a free-form `kind`
    /// label plus an arbitrary JSON payload.
    Custom {
        kind: String,
        payload: serde_json::Value,
    },
}
106
/// One record in the Merkle-chained ledger.
///
/// Records are produced by `Ledger::append`; `Ledger::verify_chain` re-derives
/// `hash` from the other three fields to detect tampering.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct LedgerRecord {
    /// Zero-based position of the record in the ledger (`append` assigns the
    /// record count at the time of the append).
    pub sequence: u64,
    /// The event carried by this record.
    pub event: LedgerEvent,
    /// Hash of the previous record (`"GENESIS"` for the first).
    pub prev_hash: String,
    /// `sha256(prev_hash || sequence || canonical(event))`, hex-encoded, where
    /// `sequence` is hashed as its little-endian bytes and `canonical(event)`
    /// is the event's `serde_json` serialization.
    pub hash: String,
}
117
118/// Compute the chained hash for a record.
119fn chain_hash(prev_hash: &str, sequence: u64, event: &LedgerEvent) -> Result<String> {
120    let canonical = serde_json::to_vec(event)
121        .map_err(|e| SdkError::Domain(format!("event serialization failed: {e}")))?;
122    let mut hasher = Sha256::new();
123    hasher.update(prev_hash.as_bytes());
124    hasher.update(sequence.to_le_bytes());
125    hasher.update(&canonical);
126    Ok(hex(&hasher.finalize()))
127}
128
129/// Hash arbitrary content (for observations and state witnesses).
130pub fn content_hash(bytes: &[u8]) -> String {
131    let mut hasher = Sha256::new();
132    hasher.update(bytes);
133    hex(&hasher.finalize())
134}
135
/// Encode `bytes` as lowercase hexadecimal, two digits per byte.
///
/// Returns an empty string for empty input.
fn hex(bytes: &[u8]) -> String {
    // Nibble lookup table: avoids building a temporary `String` via `format!`
    // for every input byte.
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
143
144/// The append-only, Merkle-chained event ledger.
145#[derive(Debug, Clone, Default)]
146pub struct Ledger {
147    records: Vec<LedgerRecord>,
148    /// Recorded observation handles -> content hash (R2).
149    observations: HashMap<String, String>,
150}
151
152impl Ledger {
153    pub fn new() -> Self {
154        Self::default()
155    }
156
157    /// The current ledger head (Merkle root), or `"GENESIS"` when empty.
158    pub fn head(&self) -> String {
159        self.records
160            .last()
161            .map(|r| r.hash.clone())
162            .unwrap_or_else(|| "GENESIS".to_string())
163    }
164
165    pub fn len(&self) -> usize {
166        self.records.len()
167    }
168
169    pub fn is_empty(&self) -> bool {
170        self.records.is_empty()
171    }
172
173    pub fn records(&self) -> &[LedgerRecord] {
174        &self.records
175    }
176
177    /// Append an event, extending the Merkle chain. Returns the new head.
178    pub fn append(&mut self, event: LedgerEvent) -> Result<String> {
179        let sequence = self.records.len() as u64;
180        let prev_hash = self.head();
181        let hash = chain_hash(&prev_hash, sequence, &event)?;
182        // Track observation records as they are appended (R2).
183        if let LedgerEvent::ObservationRecorded {
184            handle,
185            content_hash,
186        } = &event
187        {
188            self.observations
189                .insert(handle.clone(), content_hash.clone());
190        }
191        self.records.push(LedgerRecord {
192            sequence,
193            event,
194            prev_hash,
195            hash: hash.clone(),
196        });
197        Ok(hash)
198    }
199
200    /// Record a nondeterministic observation before it is used (R2). Returns the
201    /// observation handle (content address).
202    pub fn record_observation(&mut self, content: &[u8]) -> Result<String> {
203        let content_hash = content_hash(content);
204        let handle = content_hash.clone();
205        self.append(LedgerEvent::ObservationRecorded {
206            handle: handle.clone(),
207            content_hash,
208        })?;
209        Ok(handle)
210    }
211
212    /// Whether an observation handle has a ledger record.
213    pub fn has_observation(&self, handle: &str) -> bool {
214        self.observations.contains_key(handle)
215    }
216
217    /// The kernel-refusal rule: refuse to commit a transition that references an
218    /// observation lacking a ledger record (PSP-8 System 11). Returns an error
219    /// naming the first unrecorded handle.
220    pub fn commit_transition(
221        &mut self,
222        event: LedgerEvent,
223        referenced_observations: &[String],
224    ) -> Result<String> {
225        for handle in referenced_observations {
226            if !self.has_observation(handle) {
227                return Err(SdkError::Domain(format!(
228                    "kernel-refusal: transition references unrecorded observation `{handle}`"
229                )));
230            }
231        }
232        self.append(event)
233    }
234
235    /// Verify the Merkle chain end to end (tamper detection).
236    pub fn verify_chain(&self) -> Result<()> {
237        let mut prev = "GENESIS".to_string();
238        for (i, rec) in self.records.iter().enumerate() {
239            if rec.sequence != i as u64 {
240                return Err(SdkError::Domain(format!("sequence gap at index {i}")));
241            }
242            if rec.prev_hash != prev {
243                return Err(SdkError::Domain(format!(
244                    "broken chain at sequence {}",
245                    rec.sequence
246                )));
247            }
248            let expected = chain_hash(&rec.prev_hash, rec.sequence, &rec.event)?;
249            if expected != rec.hash {
250                return Err(SdkError::Domain(format!(
251                    "hash mismatch at sequence {}",
252                    rec.sequence
253                )));
254            }
255            prev = rec.hash.clone();
256        }
257        Ok(())
258    }
259}
260
261/// Reconstruct the accepted trajectory deterministically from the recorded
262/// events (R3). Replay reads recorded observations rather than re-running
263/// nondeterministic sources.
264pub fn replay_accepted_trajectory(ledger: &Ledger) -> Vec<(String, u32, f64)> {
265    ledger
266        .records()
267        .iter()
268        .filter_map(|r| match &r.event {
269            LedgerEvent::CandidateAccepted {
270                node_id,
271                generation,
272                energy,
273            } => Some((node_id.clone(), *generation, *energy)),
274            _ => None,
275        })
276        .collect()
277}
278
279/// R1 durable single-assignment outcomes. Every proposal/commit outcome is
280/// written once and never reassigned; redelivery of the same idempotency key
281/// with equivalent content returns the prior result, and key reuse for
282/// different content is invalid.
283#[derive(Debug, Clone, Default)]
284pub struct IdempotencyLog {
285    entries: HashMap<String, (String, String)>, // key -> (content_hash, outcome)
286}
287
288impl IdempotencyLog {
289    pub fn new() -> Self {
290        Self::default()
291    }
292
293    /// Record an outcome under an idempotency key. On first write, stores and
294    /// returns the outcome. On redelivery with equivalent content, returns the
295    /// prior outcome. On key reuse with different content, returns an error.
296    pub fn record(&mut self, key: &str, content: &[u8], outcome: &str) -> Result<String> {
297        let ch = content_hash(content);
298        match self.entries.get(key) {
299            Some((existing_hash, existing_outcome)) => {
300                if existing_hash == &ch {
301                    Ok(existing_outcome.clone())
302                } else {
303                    Err(SdkError::Domain(format!(
304                        "idempotency key `{key}` reused for different content"
305                    )))
306                }
307            }
308            None => {
309                self.entries
310                    .insert(key.to_string(), (ch, outcome.to_string()));
311                Ok(outcome.to_string())
312            }
313        }
314    }
315}
316
/// R5 write-ahead external effects: each irreversible external effect is
/// bracketed by intent, result, and (where defined) compensation records under
/// a stable idempotency key.
///
/// This enum names the phase of one such record; `ExternalEffectLog` stores the
/// phases per idempotency key in the order they were recorded.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ExternalEffectPhase {
    /// Pushed by `ExternalEffectLog::intent`.
    Intent,
    /// Pushed by `ExternalEffectLog::result`, which refuses to record it unless
    /// an `Intent` is already present for the key.
    Result,
    /// Pushed by `ExternalEffectLog::compensation`.
    Compensation,
}
326
327#[derive(Debug, Clone, Default)]
328pub struct ExternalEffectLog {
329    phases: HashMap<String, Vec<ExternalEffectPhase>>, // idempotency_key -> phases
330}
331
332impl ExternalEffectLog {
333    pub fn new() -> Self {
334        Self::default()
335    }
336
337    /// Record intent before the external effect executes.
338    pub fn intent(&mut self, key: &str) {
339        self.phases
340            .entry(key.to_string())
341            .or_default()
342            .push(ExternalEffectPhase::Intent);
343    }
344
345    /// Record the result after the external effect executes.
346    pub fn result(&mut self, key: &str) -> Result<()> {
347        let phases = self.phases.get(key).cloned().unwrap_or_default();
348        if !phases.contains(&ExternalEffectPhase::Intent) {
349            return Err(SdkError::Domain(format!(
350                "R5 violation: result recorded for `{key}` without prior intent"
351            )));
352        }
353        self.phases
354            .get_mut(key)
355            .unwrap()
356            .push(ExternalEffectPhase::Result);
357        Ok(())
358    }
359
360    pub fn compensation(&mut self, key: &str) {
361        self.phases
362            .entry(key.to_string())
363            .or_default()
364            .push(ExternalEffectPhase::Compensation);
365    }
366
367    /// Whether an effect was properly bracketed (intent precedes result).
368    pub fn is_bracketed(&self, key: &str) -> bool {
369        match self.phases.get(key) {
370            Some(p) => {
371                let i = p.iter().position(|x| *x == ExternalEffectPhase::Intent);
372                let r = p.iter().position(|x| *x == ExternalEffectPhase::Result);
373                matches!((i, r), (Some(i), Some(r)) if i < r)
374            }
375            None => false,
376        }
377    }
378}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `CandidateAccepted` event.
    fn accepted(node_id: &str, generation: u32, energy: f64) -> LedgerEvent {
        LedgerEvent::CandidateAccepted {
            node_id: node_id.to_string(),
            generation,
            energy,
        }
    }

    /// A ledger holding two accepted candidates: `a` (energy 5.0) then `b`
    /// (energy 0.0), both at generation 0.
    fn two_accepted_candidates() -> Ledger {
        let mut ledger = Ledger::new();
        ledger.append(accepted("a", 0, 5.0)).unwrap();
        ledger.append(accepted("b", 0, 0.0)).unwrap();
        ledger
    }

    #[test]
    fn chain_is_verifiable() {
        let ledger = two_accepted_candidates();
        assert_eq!(ledger.len(), 2);
        assert!(ledger.verify_chain().is_ok());
    }

    #[test]
    fn tampering_breaks_the_chain() {
        let mut ledger = two_accepted_candidates();
        // Overwrite a recorded energy behind the ledger's back.
        if let LedgerEvent::CandidateAccepted { energy, .. } = &mut ledger.records[0].event {
            *energy = 999.0;
        }
        assert!(ledger.verify_chain().is_err());
    }

    #[test]
    fn replay_reconstructs_accepted_trajectory() {
        let mut ledger = Ledger::new();
        let events = [
            LedgerEvent::CandidateRejected {
                node_id: "a".to_string(),
                generation: 0,
            },
            accepted("a", 1, 8.0),
            accepted("b", 0, 0.0),
        ];
        for event in events {
            ledger.append(event).unwrap();
        }

        let expected = vec![("a".to_string(), 1, 8.0), ("b".to_string(), 0, 0.0)];
        assert_eq!(replay_accepted_trajectory(&ledger), expected);
    }

    #[test]
    fn kernel_refuses_unrecorded_observation() {
        let mut ledger = Ledger::new();
        let event = LedgerEvent::EffectApplied {
            proposal_id: "p1".to_string(),
            idempotency_key: "k1".to_string(),
        };

        // A handle that was never ledgered must cause a refusal.
        let unknown = ["never-recorded".to_string()];
        let refused = ledger.commit_transition(event.clone(), &unknown);
        assert!(refused.is_err());

        // Once the observation is on the ledger, the same commit goes through.
        let handle = ledger.record_observation(b"llm output bytes").unwrap();
        assert!(ledger.has_observation(&handle));
        assert!(ledger.commit_transition(event, &[handle]).is_ok());
    }

    #[test]
    fn idempotency_redelivery_returns_prior_outcome() {
        let mut log = IdempotencyLog::new();
        let first = log.record("k1", b"patch-content", "applied").unwrap();
        assert_eq!(first, "applied");

        // Same key, same content: the stored outcome wins over the new one.
        let again = log.record("k1", b"patch-content", "applied-again").unwrap();
        assert_eq!(again, "applied");
    }

    #[test]
    fn idempotency_key_reuse_for_different_content_is_invalid() {
        let mut log = IdempotencyLog::new();
        log.record("k1", b"content-a", "applied").unwrap();

        let reused = log.record("k1", b"content-b", "applied");
        assert!(reused.is_err());
    }

    #[test]
    fn external_effect_must_be_bracketed() {
        let mut log = ExternalEffectLog::new();

        // A result with no preceding intent violates R5.
        assert!(log.result("k1").is_err());

        log.intent("k1");
        assert!(log.result("k1").is_ok());
        assert!(log.is_bracketed("k1"));
    }
}