ubl_ledger/
ledger.rs

//! Minimal NDJSON ledger with canonical bytes, CID verification, and optional signing.
//!
//! This module provides a small, deterministic ledger API built on:
//! - `json_atomic::canonize` for canonical bytes
//! - `BLAKE3(canonical_bytes)` as the CID
//! - optional Ed25519 signing with domain `UBL:LEDGER:v1`
//!
//! ## Production features
//!
//! - **Rotation**: by size or hourly
//! - **Fsync**: configurable durability policy
//! - **Lock**: single writer per directory (advisory lock file)
//! - **Metrics**: optional, via the `metrics` feature
14
15use crate::UBL_DOMAIN_SIGN;
16use ubl_types::{Cid32, PublicKeyBytes, SignatureBytes};
17use once_cell::sync::Lazy;
18use parking_lot::Mutex;
19use serde::{Deserialize, Serialize};
20use std::fs::{File, OpenOptions};
21use std::io::{BufRead, BufReader, BufWriter, Write};
22use std::path::{Path, PathBuf};
23use std::time::{Duration, SystemTime};
24use thiserror::Error;
25use time::{format_description::well_known::Rfc3339, OffsetDateTime};
26
27// ══════════════════════════════════════════════════════════════════════════════
28// Errors
29// ══════════════════════════════════════════════════════════════════════════════
30
31/// Errors from ledger operations.
32#[derive(Debug, Error)]
33pub enum LedgerError {
34    /// IO error.
35    #[error("io error: {0}")]
36    Io(#[from] std::io::Error),
37    /// JSON serialization error.
38    #[error("serde json: {0}")]
39    Serde(#[from] serde_json::Error),
40    /// Canonicalization error.
41    #[error("canon error: {0}")]
42    Canon(String),
43    /// CID mismatch (content integrity).
44    #[error("cid mismatch")]
45    CidMismatch,
46    /// Signature missing (partial signature state).
47    #[error("signature missing")]
48    SigMissing,
49    /// Signature verification failed.
50    #[error("signature invalid")]
51    SigInvalid,
52    /// Writer lock already held.
53    #[error("writer lock held by another process")]
54    LockHeld,
55}
56
57// ══════════════════════════════════════════════════════════════════════════════
58// Types
59// ══════════════════════════════════════════════════════════════════════════════
60
61/// Result of append for idempotency/offset tracking.
62#[derive(Debug, Clone)]
63pub struct AppendResult {
64    /// Path of the file written to.
65    pub path: String,
66    /// Line number (1-based).
67    pub line_no: u64,
68    /// CID of the appended entry.
69    pub cid: Cid32,
70}
71
/// How a [`LedgerWriter`] decides to move on to a new NDJSON file.
///
/// The default is [`RotatePolicy::Hourly`].
#[derive(Debug, Clone, Default)]
pub enum RotatePolicy {
    /// Switch files once the current one has reached this many bytes.
    BySizeBytes(u64),
    /// Switch files whenever the UTC hour changes; new files are laid out as
    /// `YYYY-MM-DD/HH.ndjson` under the writer's base directory.
    #[default]
    Hourly,
    /// Keep appending to a single file.
    None,
}
83
/// Durability policy: when a [`LedgerWriter`] forces buffered lines to disk.
///
/// The default is `EveryNLines(100)`.
#[derive(Debug, Clone)]
pub enum FsyncPolicy {
    /// Sync once this many lines have been appended since the last sync.
    EveryNLines(u32),
    /// On append, sync if at least this many milliseconds have passed since the
    /// last timed sync.
    IntervalMs(u64),
    /// No policy-driven sync; call [`LedgerWriter::fsync`] explicitly.
    Manual,
}

impl Default for FsyncPolicy {
    /// One sync per 100 appended lines.
    fn default() -> Self {
        const DEFAULT_LINES_PER_SYNC: u32 = 100;
        Self::EveryNLines(DEFAULT_LINES_PER_SYNC)
    }
}
100
101// ══════════════════════════════════════════════════════════════════════════════
102// LedgerEntry
103// ══════════════════════════════════════════════════════════════════════════════
104
105/// Minimal ledger entry (NDJSON line).
106///
107/// Contains canonical bytes of the intent, their CID, and optional signature.
108#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
109pub struct LedgerEntry {
110    /// ISO-8601 timestamp (UTC).
111    pub ts: String,
112    /// CID = BLAKE3(canonical intent bytes).
113    pub cid: Cid32,
114    /// Canonical intent bytes (from `json_atomic::canonize`).
115    #[serde(with = "serde_bytes")]
116    pub intent: Vec<u8>,
117    /// Logical actor (optional).
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub actor: Option<String>,
120    /// Extra blob (forward-compatible).
121    #[serde(with = "serde_bytes", default, skip_serializing_if = "Vec::is_empty")]
122    pub extra: Vec<u8>,
123    /// Public key (if signed).
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub pubkey: Option<PublicKeyBytes>,
126    /// Signature (if signed).
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub signature: Option<SignatureBytes>,
129}
130
131impl LedgerEntry {
132    /// Creates an unsigned entry. CID = BLAKE3(canonical bytes).
133    ///
134    /// # Errors
135    ///
136    /// Returns error if canonicalization fails.
137    pub fn unsigned(
138        intent_value: &serde_json::Value,
139        actor: Option<String>,
140        extra: &[u8],
141    ) -> Result<Self, LedgerError> {
142        let canon =
143            json_atomic::canonize(intent_value).map_err(|e| LedgerError::Canon(format!("{e:?}")))?;
144        let cid = ubl_crypto::blake3_cid(&canon);
145        Ok(Self {
146            ts: OffsetDateTime::now_utc()
147                .format(&Rfc3339)
148                .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into()),
149            cid,
150            intent: canon,
151            actor,
152            extra: extra.to_vec(),
153            pubkey: None,
154            signature: None,
155        })
156    }
157
158    /// Signs the entry with domain `UBL:LEDGER:v1` + CID.
159    #[cfg(feature = "signing")]
160    #[must_use]
161    pub fn sign(mut self, sk: &ubl_crypto::SecretKey) -> Self {
162        let pk = ubl_crypto::derive_public_bytes(&sk.0);
163        let msg = sign_message(&self.cid);
164        let sig = ubl_crypto::sign_bytes(&msg, &sk.0);
165        self.pubkey = Some(pk);
166        self.signature = Some(sig);
167        self
168    }
169
170    /// Verifies: (1) CID == BLAKE3(intent); (2) signature if present.
171    ///
172    /// # Errors
173    ///
174    /// Returns error if CID mismatch or signature invalid.
175    pub fn verify(&self) -> Result<(), LedgerError> {
176        // 1) CID check
177        let cid_check = ubl_crypto::blake3_cid(&self.intent);
178        if cid_check != self.cid {
179            return Err(LedgerError::CidMismatch);
180        }
181
182        // 2) Signature check (optional)
183        match (&self.pubkey, &self.signature) {
184            (Some(pk), Some(sig)) => {
185                #[cfg(feature = "signing")]
186                {
187                    let msg = sign_message(&self.cid);
188                    if !ubl_crypto::verify_bytes(&msg, pk, sig) {
189                        return Err(LedgerError::SigInvalid);
190                    }
191                }
192                #[cfg(not(feature = "signing"))]
193                {
194                    let _ = (pk, sig);
195                    return Err(LedgerError::SigInvalid);
196                }
197            }
198            (None, None) => { /* ok: unsigned entry */ }
199            _ => return Err(LedgerError::SigMissing),
200        }
201        Ok(())
202    }
203}
204
/// Returns the exact bytes that are signed and verified for an entry: the
/// `UBL_DOMAIN_SIGN` prefix followed by the raw CID bytes.
#[cfg(feature = "signing")]
fn sign_message(cid: &Cid32) -> Vec<u8> {
    let mut message = Vec::with_capacity(UBL_DOMAIN_SIGN.len() + cid.0.len());
    message.extend_from_slice(UBL_DOMAIN_SIGN);
    message.extend(cid.0.iter().copied());
    message
}
213
214// ══════════════════════════════════════════════════════════════════════════════
215// Writer Lock (single writer per directory)
216// ══════════════════════════════════════════════════════════════════════════════
217
218/// Guard for exclusive writer access.
219#[derive(Debug)]
220struct WriterLock {
221    lock_path: PathBuf,
222}
223
224impl WriterLock {
225    /// Acquires exclusive lock via lock file.
226    fn acquire(dir: &Path) -> Result<Self, LedgerError> {
227        let lock_path = dir.join(".ubl-writer.lock");
228
229        // Try to create exclusively
230        #[cfg(unix)]
231        {
232            use std::os::unix::fs::OpenOptionsExt;
233            match OpenOptions::new()
234                .write(true)
235                .create_new(true)
236                .mode(0o600)
237                .open(&lock_path)
238            {
239                Ok(_) => Ok(Self { lock_path }),
240                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
241                    Err(LedgerError::LockHeld)
242                }
243                Err(e) => Err(LedgerError::Io(e)),
244            }
245        }
246
247        #[cfg(not(unix))]
248        {
249            match OpenOptions::new()
250                .write(true)
251                .create_new(true)
252                .open(&lock_path)
253            {
254                Ok(_) => Ok(Self { lock_path }),
255                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
256                    Err(LedgerError::LockHeld)
257                }
258                Err(e) => Err(LedgerError::Io(e)),
259            }
260        }
261    }
262}
263
264impl Drop for WriterLock {
265    fn drop(&mut self) {
266        let _ = std::fs::remove_file(&self.lock_path);
267    }
268}
269
270// ══════════════════════════════════════════════════════════════════════════════
271// LedgerWriter (production)
272// ══════════════════════════════════════════════════════════════════════════════
273
274/// Production NDJSON writer with rotation, fsync, and locking.
275pub struct LedgerWriter {
276    file: BufWriter<File>,
277    current_path: PathBuf,
278    base_dir: PathBuf,
279    line_no: u64,
280    last_sync_line: u64,
281    opened_at: OffsetDateTime,
282    rotate: RotatePolicy,
283    fsync: FsyncPolicy,
284    _lock: WriterLock,
285}
286
287/// Global timestamp for interval-based fsync.
288static LAST_FSYNC: Lazy<Mutex<SystemTime>> = Lazy::new(|| Mutex::new(SystemTime::now()));
289
290impl LedgerWriter {
291    /// Opens with custom rotation and fsync policies.
292    ///
293    /// # Errors
294    ///
295    /// Returns error if lock cannot be acquired or file cannot be opened.
296    pub fn open_with<P: AsRef<Path>>(
297        path: P,
298        rotate: RotatePolicy,
299        fsync: FsyncPolicy,
300    ) -> Result<Self, LedgerError> {
301        let path = path.as_ref().to_path_buf();
302        let base_dir = path
303            .parent()
304            .unwrap_or_else(|| Path::new("."))
305            .to_path_buf();
306        std::fs::create_dir_all(&base_dir)?;
307        let _lock = WriterLock::acquire(&base_dir)?;
308
309        let file = OpenOptions::new().create(true).append(true).open(&path)?;
310        let line_no = 0u64; // Could count lines on open if needed
311
312        Ok(Self {
313            file: BufWriter::new(file),
314            current_path: path,
315            base_dir,
316            line_no,
317            last_sync_line: 0,
318            opened_at: OffsetDateTime::now_utc(),
319            rotate,
320            fsync,
321            _lock,
322        })
323    }
324
325    /// Opens with default policies (Hourly rotation, fsync every 100 lines).
326    ///
327    /// # Errors
328    ///
329    /// Returns error if lock cannot be acquired or file cannot be opened.
330    pub fn open_append<P: AsRef<Path>>(path: P) -> Result<Self, LedgerError> {
331        Self::open_with(
332            path,
333            RotatePolicy::Hourly,
334            FsyncPolicy::EveryNLines(100),
335        )
336    }
337
338    /// Appends an entry with automatic rotation and fsync.
339    ///
340    /// # Errors
341    ///
342    /// Returns error if write fails.
343    pub fn append(&mut self, entry: &LedgerEntry) -> Result<AppendResult, LedgerError> {
344        // 1) Check rotation
345        self.maybe_rotate()?;
346
347        // 2) Canonical serialization
348        let v = serde_json::to_value(entry)?;
349        let canon = json_atomic::canonize(&v).map_err(|e| LedgerError::Canon(format!("{e:?}")))?;
350        self.file.write_all(&canon)?;
351        self.file.write_all(b"\n")?;
352        self.line_no += 1;
353
354        // 3) Fsync per policy
355        self.maybe_fsync()?;
356
357        #[cfg(feature = "metrics")]
358        metrics::counter!("ubl_entries_appended_total").increment(1);
359
360        Ok(AppendResult {
361            path: self.current_path.to_string_lossy().into_owned(),
362            line_no: self.line_no,
363            cid: entry.cid,
364        })
365    }
366
367    /// Forces fsync to disk.
368    ///
369    /// # Errors
370    ///
371    /// Returns error if sync fails.
372    pub fn fsync(&mut self) -> Result<(), LedgerError> {
373        self.file.flush()?;
374        self.file.get_ref().sync_data()?;
375        self.last_sync_line = self.line_no;
376        Ok(())
377    }
378
379    /// Returns current file path.
380    #[must_use]
381    pub fn current_path(&self) -> &Path {
382        &self.current_path
383    }
384
385    /// Returns current line number.
386    #[must_use]
387    pub fn line_no(&self) -> u64 {
388        self.line_no
389    }
390
391    fn maybe_fsync(&mut self) -> Result<(), LedgerError> {
392        match &self.fsync {
393            FsyncPolicy::EveryNLines(n) => {
394                if (self.line_no - self.last_sync_line) >= u64::from(*n) {
395                    self.fsync()?;
396                }
397            }
398            FsyncPolicy::IntervalMs(ms) => {
399                let mut guard = LAST_FSYNC.lock();
400                if guard.elapsed().unwrap_or(Duration::ZERO) >= Duration::from_millis(*ms) {
401                    self.fsync()?;
402                    *guard = SystemTime::now();
403                }
404            }
405            FsyncPolicy::Manual => {}
406        }
407        Ok(())
408    }
409
410    fn maybe_rotate(&mut self) -> Result<(), LedgerError> {
411        let need = match &self.rotate {
412            RotatePolicy::None => false,
413            RotatePolicy::BySizeBytes(max) => {
414                let size = self.file.get_ref().metadata()?.len();
415                size >= *max
416            }
417            RotatePolicy::Hourly => {
418                let now = OffsetDateTime::now_utc();
419                now.hour() != self.opened_at.hour() || now.date() != self.opened_at.date()
420            }
421        };
422
423        if !need {
424            return Ok(());
425        }
426
427        // Flush and sync current file
428        self.file.flush()?;
429        self.file.get_ref().sync_data()?;
430
431        // Open new file with path YYYY-MM-DD/HH.ndjson
432        let now = OffsetDateTime::now_utc();
433        let dir = self.base_dir.join(format!("{}", now.date()));
434        std::fs::create_dir_all(&dir)?;
435        let next = dir.join(format!("{:02}.ndjson", now.hour()));
436
437        let f = OpenOptions::new().create(true).append(true).open(&next)?;
438        self.file = BufWriter::new(f);
439        self.current_path = next;
440        self.line_no = 0;
441        self.last_sync_line = 0;
442        self.opened_at = now;
443
444        #[cfg(feature = "metrics")]
445        metrics::counter!("ubl_writer_rotate_total").increment(1);
446
447        Ok(())
448    }
449}
450
451// ══════════════════════════════════════════════════════════════════════════════
452// SimpleLedgerWriter (basic, no rotation/lock)
453// ══════════════════════════════════════════════════════════════════════════════
454
455/// Basic append-only NDJSON writer (no rotation, no lock).
456pub struct SimpleLedgerWriter {
457    file: File,
458}
459
460impl SimpleLedgerWriter {
461    /// Opens (or creates) file for append.
462    ///
463    /// # Errors
464    ///
465    /// Returns error if file cannot be opened.
466    pub fn open_append<P: AsRef<Path>>(path: P) -> Result<Self, LedgerError> {
467        let file = OpenOptions::new().create(true).append(true).open(path)?;
468        Ok(Self { file })
469    }
470
471    /// Appends an entry as a canonical NDJSON line.
472    ///
473    /// # Errors
474    ///
475    /// Returns error if write fails.
476    pub fn append(&mut self, entry: &LedgerEntry) -> Result<(), LedgerError> {
477        let v = serde_json::to_value(entry)?;
478        let canon = json_atomic::canonize(&v).map_err(|e| LedgerError::Canon(format!("{e:?}")))?;
479        self.file.write_all(&canon)?;
480        self.file.write_all(b"\n")?;
481        Ok(())
482    }
483
484    /// Flushes and syncs to disk.
485    ///
486    /// # Errors
487    ///
488    /// Returns error if sync fails.
489    pub fn sync(&mut self) -> Result<(), LedgerError> {
490        self.file.sync_all()?;
491        Ok(())
492    }
493}
494
495// ══════════════════════════════════════════════════════════════════════════════
496// Reader
497// ══════════════════════════════════════════════════════════════════════════════
498
499/// NDJSON reader with verification.
500pub struct SimpleLedgerReader<R: BufRead> {
501    inner: R,
502}
503
504impl<R: BufRead> SimpleLedgerReader<R> {
505    /// Creates a new reader.
506    #[must_use]
507    pub fn new(inner: R) -> Self {
508        Self { inner }
509    }
510}
511
512impl SimpleLedgerReader<BufReader<File>> {
513    /// Opens a file for reading.
514    ///
515    /// # Errors
516    ///
517    /// Returns error if file cannot be opened.
518    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, LedgerError> {
519        let f = File::open(path)?;
520        Ok(Self {
521            inner: BufReader::new(f),
522        })
523    }
524}
525
526impl<R: BufRead> SimpleLedgerReader<R> {
527    /// Returns an iterator over verified entries.
528    pub fn iter(self) -> SimpleLedgerIter<R> {
529        SimpleLedgerIter {
530            inner: self.inner,
531            buf: String::new(),
532        }
533    }
534}
535
536/// Iterator over ledger entries with verification.
537pub struct SimpleLedgerIter<R: BufRead> {
538    inner: R,
539    buf: String,
540}
541
542impl<R: BufRead> Iterator for SimpleLedgerIter<R> {
543    type Item = Result<LedgerEntry, LedgerError>;
544
545    fn next(&mut self) -> Option<Self::Item> {
546        self.buf.clear();
547        match self.inner.read_line(&mut self.buf) {
548            Ok(0) => None, // EOF
549            Ok(_) => {
550                // Parse → verify → yield
551                let entry: LedgerEntry = match serde_json::from_str(&self.buf) {
552                    Ok(e) => e,
553                    Err(e) => return Some(Err(LedgerError::Serde(e))),
554                };
555                Some(entry.verify().map(|()| entry))
556            }
557            Err(e) => Some(Err(LedgerError::Io(e))),
558        }
559    }
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::{tempdir, NamedTempFile};

    /// Appends `entries` to `path` with a `SimpleLedgerWriter`, then syncs.
    fn write_all_simple(path: &Path, entries: &[LedgerEntry]) -> Result<(), LedgerError> {
        let mut writer = SimpleLedgerWriter::open_append(path)?;
        for entry in entries {
            writer.append(entry)?;
        }
        writer.sync()
    }

    /// Reads back the first (verified) entry of the ledger at `path`.
    fn read_first(path: &Path) -> Result<LedgerEntry, LedgerError> {
        SimpleLedgerReader::from_path(path)?
            .iter()
            .next()
            .expect("ledger file should contain at least one line")
    }

    #[test]
    fn unsigned_roundtrip() -> Result<(), LedgerError> {
        let intent = json!({"intent":"Grant","to":"alice","amount":3});
        let original = LedgerEntry::unsigned(&intent, Some("tester".into()), b"")?;
        original.verify()?;

        let tmp = NamedTempFile::new()?;
        write_all_simple(tmp.path(), std::slice::from_ref(&original))?;

        let decoded = read_first(tmp.path())?;
        assert_eq!(decoded.cid, original.cid);
        assert_eq!(decoded.intent, original.intent);
        Ok(())
    }

    #[test]
    fn cid_mismatch_detected() -> Result<(), LedgerError> {
        let mut tampered = LedgerEntry::unsigned(&json!({"test":"value"}), None, b"")?;
        tampered.cid.0[0] ^= 0xFF;

        let outcome = tampered.verify();
        assert!(
            matches!(outcome, Err(LedgerError::CidMismatch)),
            "unexpected verify outcome: {outcome:?}"
        );
        Ok(())
    }

    #[cfg(feature = "signing")]
    #[test]
    fn signed_roundtrip() -> Result<(), LedgerError> {
        use ubl_crypto::Keypair;

        let keypair = Keypair::generate();
        let intent = json!({"intent":"Freeze","id":"X"});
        let signed = LedgerEntry::unsigned(&intent, None, b"")?.sign(&keypair.sk);
        signed.verify()?;

        let tmp = NamedTempFile::new()?;
        write_all_simple(tmp.path(), std::slice::from_ref(&signed))?;

        let decoded = read_first(tmp.path())?;
        assert_eq!(decoded.cid, signed.cid);
        assert!(decoded.pubkey.is_some());
        assert!(decoded.signature.is_some());
        Ok(())
    }

    #[test]
    fn multiple_entries() -> Result<(), LedgerError> {
        let entries = (0..5)
            .map(|seq| LedgerEntry::unsigned(&json!({ "seq": seq }), None, b""))
            .collect::<Result<Vec<_>, _>>()?;

        let tmp = NamedTempFile::new()?;
        write_all_simple(tmp.path(), &entries)?;

        let read_back = SimpleLedgerReader::from_path(tmp.path())?.iter().count();
        assert_eq!(read_back, 5);
        Ok(())
    }

    #[test]
    fn production_writer_append_result() -> Result<(), LedgerError> {
        let dir = tempdir()?;
        let mut writer = LedgerWriter::open_with(
            dir.path().join("ledger.ndjson"),
            RotatePolicy::None,
            FsyncPolicy::Manual,
        )?;

        let entry = LedgerEntry::unsigned(&json!({"action":"test"}), None, b"")?;
        let appended = writer.append(&entry)?;

        assert_eq!(appended.line_no, 1);
        assert_eq!(appended.cid, entry.cid);
        writer.fsync()
    }

    #[test]
    fn writer_lock_prevents_double_open() -> Result<(), LedgerError> {
        let dir = tempdir()?;
        let path = dir.path().join("ledger.ndjson");

        let _holder = LedgerWriter::open_with(&path, RotatePolicy::None, FsyncPolicy::Manual)?;
        let contender = LedgerWriter::open_with(&path, RotatePolicy::None, FsyncPolicy::Manual);

        assert!(matches!(contender, Err(LedgerError::LockHeld)));
        Ok(())
    }
}
669