1use crate::UBL_DOMAIN_SIGN;
16use ubl_types::{Cid32, PublicKeyBytes, SignatureBytes};
17use once_cell::sync::Lazy;
18use parking_lot::Mutex;
19use serde::{Deserialize, Serialize};
20use std::fs::{File, OpenOptions};
21use std::io::{BufRead, BufReader, BufWriter, Write};
22use std::path::{Path, PathBuf};
23use std::time::{Duration, SystemTime};
24use thiserror::Error;
25use time::{format_description::well_known::Rfc3339, OffsetDateTime};
26
27#[derive(Debug, Error)]
33pub enum LedgerError {
34 #[error("io error: {0}")]
36 Io(#[from] std::io::Error),
37 #[error("serde json: {0}")]
39 Serde(#[from] serde_json::Error),
40 #[error("canon error: {0}")]
42 Canon(String),
43 #[error("cid mismatch")]
45 CidMismatch,
46 #[error("signature missing")]
48 SigMissing,
49 #[error("signature invalid")]
51 SigInvalid,
52 #[error("writer lock held by another process")]
54 LockHeld,
55}
56
57#[derive(Debug, Clone)]
63pub struct AppendResult {
64 pub path: String,
66 pub line_no: u64,
68 pub cid: Cid32,
70}
71
72#[derive(Debug, Clone, Default)]
74pub enum RotatePolicy {
75 BySizeBytes(u64),
77 #[default]
79 Hourly,
80 None,
82}
83
84#[derive(Debug, Clone)]
86pub enum FsyncPolicy {
87 EveryNLines(u32),
89 IntervalMs(u64),
91 Manual,
93}
94
95impl Default for FsyncPolicy {
96 fn default() -> Self {
97 FsyncPolicy::EveryNLines(100)
98 }
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
109pub struct LedgerEntry {
110 pub ts: String,
112 pub cid: Cid32,
114 #[serde(with = "serde_bytes")]
116 pub intent: Vec<u8>,
117 #[serde(skip_serializing_if = "Option::is_none")]
119 pub actor: Option<String>,
120 #[serde(with = "serde_bytes", default, skip_serializing_if = "Vec::is_empty")]
122 pub extra: Vec<u8>,
123 #[serde(skip_serializing_if = "Option::is_none")]
125 pub pubkey: Option<PublicKeyBytes>,
126 #[serde(skip_serializing_if = "Option::is_none")]
128 pub signature: Option<SignatureBytes>,
129}
130
131impl LedgerEntry {
132 pub fn unsigned(
138 intent_value: &serde_json::Value,
139 actor: Option<String>,
140 extra: &[u8],
141 ) -> Result<Self, LedgerError> {
142 let canon =
143 json_atomic::canonize(intent_value).map_err(|e| LedgerError::Canon(format!("{e:?}")))?;
144 let cid = ubl_crypto::blake3_cid(&canon);
145 Ok(Self {
146 ts: OffsetDateTime::now_utc()
147 .format(&Rfc3339)
148 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into()),
149 cid,
150 intent: canon,
151 actor,
152 extra: extra.to_vec(),
153 pubkey: None,
154 signature: None,
155 })
156 }
157
158 #[cfg(feature = "signing")]
160 #[must_use]
161 pub fn sign(mut self, sk: &ubl_crypto::SecretKey) -> Self {
162 let pk = ubl_crypto::derive_public_bytes(&sk.0);
163 let msg = sign_message(&self.cid);
164 let sig = ubl_crypto::sign_bytes(&msg, &sk.0);
165 self.pubkey = Some(pk);
166 self.signature = Some(sig);
167 self
168 }
169
170 pub fn verify(&self) -> Result<(), LedgerError> {
176 let cid_check = ubl_crypto::blake3_cid(&self.intent);
178 if cid_check != self.cid {
179 return Err(LedgerError::CidMismatch);
180 }
181
182 match (&self.pubkey, &self.signature) {
184 (Some(pk), Some(sig)) => {
185 #[cfg(feature = "signing")]
186 {
187 let msg = sign_message(&self.cid);
188 if !ubl_crypto::verify_bytes(&msg, pk, sig) {
189 return Err(LedgerError::SigInvalid);
190 }
191 }
192 #[cfg(not(feature = "signing"))]
193 {
194 let _ = (pk, sig);
195 return Err(LedgerError::SigInvalid);
196 }
197 }
198 (None, None) => { }
199 _ => return Err(LedgerError::SigMissing),
200 }
201 Ok(())
202 }
203}
204
205#[cfg(feature = "signing")]
207fn sign_message(cid: &Cid32) -> Vec<u8> {
208 let mut m = Vec::with_capacity(UBL_DOMAIN_SIGN.len() + 32);
209 m.extend_from_slice(UBL_DOMAIN_SIGN);
210 m.extend_from_slice(&cid.0);
211 m
212}
213
214#[derive(Debug)]
220struct WriterLock {
221 lock_path: PathBuf,
222}
223
224impl WriterLock {
225 fn acquire(dir: &Path) -> Result<Self, LedgerError> {
227 let lock_path = dir.join(".ubl-writer.lock");
228
229 #[cfg(unix)]
231 {
232 use std::os::unix::fs::OpenOptionsExt;
233 match OpenOptions::new()
234 .write(true)
235 .create_new(true)
236 .mode(0o600)
237 .open(&lock_path)
238 {
239 Ok(_) => Ok(Self { lock_path }),
240 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
241 Err(LedgerError::LockHeld)
242 }
243 Err(e) => Err(LedgerError::Io(e)),
244 }
245 }
246
247 #[cfg(not(unix))]
248 {
249 match OpenOptions::new()
250 .write(true)
251 .create_new(true)
252 .open(&lock_path)
253 {
254 Ok(_) => Ok(Self { lock_path }),
255 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
256 Err(LedgerError::LockHeld)
257 }
258 Err(e) => Err(LedgerError::Io(e)),
259 }
260 }
261 }
262}
263
264impl Drop for WriterLock {
265 fn drop(&mut self) {
266 let _ = std::fs::remove_file(&self.lock_path);
267 }
268}
269
270pub struct LedgerWriter {
276 file: BufWriter<File>,
277 current_path: PathBuf,
278 base_dir: PathBuf,
279 line_no: u64,
280 last_sync_line: u64,
281 opened_at: OffsetDateTime,
282 rotate: RotatePolicy,
283 fsync: FsyncPolicy,
284 _lock: WriterLock,
285}
286
287static LAST_FSYNC: Lazy<Mutex<SystemTime>> = Lazy::new(|| Mutex::new(SystemTime::now()));
289
290impl LedgerWriter {
291 pub fn open_with<P: AsRef<Path>>(
297 path: P,
298 rotate: RotatePolicy,
299 fsync: FsyncPolicy,
300 ) -> Result<Self, LedgerError> {
301 let path = path.as_ref().to_path_buf();
302 let base_dir = path
303 .parent()
304 .unwrap_or_else(|| Path::new("."))
305 .to_path_buf();
306 std::fs::create_dir_all(&base_dir)?;
307 let _lock = WriterLock::acquire(&base_dir)?;
308
309 let file = OpenOptions::new().create(true).append(true).open(&path)?;
310 let line_no = 0u64; Ok(Self {
313 file: BufWriter::new(file),
314 current_path: path,
315 base_dir,
316 line_no,
317 last_sync_line: 0,
318 opened_at: OffsetDateTime::now_utc(),
319 rotate,
320 fsync,
321 _lock,
322 })
323 }
324
325 pub fn open_append<P: AsRef<Path>>(path: P) -> Result<Self, LedgerError> {
331 Self::open_with(
332 path,
333 RotatePolicy::Hourly,
334 FsyncPolicy::EveryNLines(100),
335 )
336 }
337
338 pub fn append(&mut self, entry: &LedgerEntry) -> Result<AppendResult, LedgerError> {
344 self.maybe_rotate()?;
346
347 let v = serde_json::to_value(entry)?;
349 let canon = json_atomic::canonize(&v).map_err(|e| LedgerError::Canon(format!("{e:?}")))?;
350 self.file.write_all(&canon)?;
351 self.file.write_all(b"\n")?;
352 self.line_no += 1;
353
354 self.maybe_fsync()?;
356
357 #[cfg(feature = "metrics")]
358 metrics::counter!("ubl_entries_appended_total").increment(1);
359
360 Ok(AppendResult {
361 path: self.current_path.to_string_lossy().into_owned(),
362 line_no: self.line_no,
363 cid: entry.cid,
364 })
365 }
366
367 pub fn fsync(&mut self) -> Result<(), LedgerError> {
373 self.file.flush()?;
374 self.file.get_ref().sync_data()?;
375 self.last_sync_line = self.line_no;
376 Ok(())
377 }
378
379 #[must_use]
381 pub fn current_path(&self) -> &Path {
382 &self.current_path
383 }
384
385 #[must_use]
387 pub fn line_no(&self) -> u64 {
388 self.line_no
389 }
390
391 fn maybe_fsync(&mut self) -> Result<(), LedgerError> {
392 match &self.fsync {
393 FsyncPolicy::EveryNLines(n) => {
394 if (self.line_no - self.last_sync_line) >= u64::from(*n) {
395 self.fsync()?;
396 }
397 }
398 FsyncPolicy::IntervalMs(ms) => {
399 let mut guard = LAST_FSYNC.lock();
400 if guard.elapsed().unwrap_or(Duration::ZERO) >= Duration::from_millis(*ms) {
401 self.fsync()?;
402 *guard = SystemTime::now();
403 }
404 }
405 FsyncPolicy::Manual => {}
406 }
407 Ok(())
408 }
409
410 fn maybe_rotate(&mut self) -> Result<(), LedgerError> {
411 let need = match &self.rotate {
412 RotatePolicy::None => false,
413 RotatePolicy::BySizeBytes(max) => {
414 let size = self.file.get_ref().metadata()?.len();
415 size >= *max
416 }
417 RotatePolicy::Hourly => {
418 let now = OffsetDateTime::now_utc();
419 now.hour() != self.opened_at.hour() || now.date() != self.opened_at.date()
420 }
421 };
422
423 if !need {
424 return Ok(());
425 }
426
427 self.file.flush()?;
429 self.file.get_ref().sync_data()?;
430
431 let now = OffsetDateTime::now_utc();
433 let dir = self.base_dir.join(format!("{}", now.date()));
434 std::fs::create_dir_all(&dir)?;
435 let next = dir.join(format!("{:02}.ndjson", now.hour()));
436
437 let f = OpenOptions::new().create(true).append(true).open(&next)?;
438 self.file = BufWriter::new(f);
439 self.current_path = next;
440 self.line_no = 0;
441 self.last_sync_line = 0;
442 self.opened_at = now;
443
444 #[cfg(feature = "metrics")]
445 metrics::counter!("ubl_writer_rotate_total").increment(1);
446
447 Ok(())
448 }
449}
450
451pub struct SimpleLedgerWriter {
457 file: File,
458}
459
460impl SimpleLedgerWriter {
461 pub fn open_append<P: AsRef<Path>>(path: P) -> Result<Self, LedgerError> {
467 let file = OpenOptions::new().create(true).append(true).open(path)?;
468 Ok(Self { file })
469 }
470
471 pub fn append(&mut self, entry: &LedgerEntry) -> Result<(), LedgerError> {
477 let v = serde_json::to_value(entry)?;
478 let canon = json_atomic::canonize(&v).map_err(|e| LedgerError::Canon(format!("{e:?}")))?;
479 self.file.write_all(&canon)?;
480 self.file.write_all(b"\n")?;
481 Ok(())
482 }
483
484 pub fn sync(&mut self) -> Result<(), LedgerError> {
490 self.file.sync_all()?;
491 Ok(())
492 }
493}
494
495pub struct SimpleLedgerReader<R: BufRead> {
501 inner: R,
502}
503
504impl<R: BufRead> SimpleLedgerReader<R> {
505 #[must_use]
507 pub fn new(inner: R) -> Self {
508 Self { inner }
509 }
510}
511
512impl SimpleLedgerReader<BufReader<File>> {
513 pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, LedgerError> {
519 let f = File::open(path)?;
520 Ok(Self {
521 inner: BufReader::new(f),
522 })
523 }
524}
525
526impl<R: BufRead> SimpleLedgerReader<R> {
527 pub fn iter(self) -> SimpleLedgerIter<R> {
529 SimpleLedgerIter {
530 inner: self.inner,
531 buf: String::new(),
532 }
533 }
534}
535
536pub struct SimpleLedgerIter<R: BufRead> {
538 inner: R,
539 buf: String,
540}
541
542impl<R: BufRead> Iterator for SimpleLedgerIter<R> {
543 type Item = Result<LedgerEntry, LedgerError>;
544
545 fn next(&mut self) -> Option<Self::Item> {
546 self.buf.clear();
547 match self.inner.read_line(&mut self.buf) {
548 Ok(0) => None, Ok(_) => {
550 let entry: LedgerEntry = match serde_json::from_str(&self.buf) {
552 Ok(e) => e,
553 Err(e) => return Some(Err(LedgerError::Serde(e))),
554 };
555 Some(entry.verify().map(|()| entry))
556 }
557 Err(e) => Some(Err(LedgerError::Io(e))),
558 }
559 }
560}
561
562#[cfg(test)]
563mod tests {
564 use super::*;
565 use serde_json::json;
566 use tempfile::{tempdir, NamedTempFile};
567
568 #[test]
569 fn unsigned_roundtrip() -> Result<(), LedgerError> {
570 let intent = json!({"intent":"Grant","to":"alice","amount":3});
571 let e = LedgerEntry::unsigned(&intent, Some("tester".into()), b"")?;
572 e.verify()?;
573
574 let tmp = NamedTempFile::new().unwrap();
575 {
576 let mut w = SimpleLedgerWriter::open_append(tmp.path())?;
577 w.append(&e)?;
578 w.sync()?;
579 }
580 let r = SimpleLedgerReader::from_path(tmp.path())?
581 .iter()
582 .next()
583 .unwrap()?;
584 assert_eq!(e.cid, r.cid);
585 assert_eq!(e.intent, r.intent);
586 Ok(())
587 }
588
589 #[test]
590 fn cid_mismatch_detected() -> Result<(), LedgerError> {
591 let intent = json!({"test":"value"});
592 let mut e = LedgerEntry::unsigned(&intent, None, b"")?;
593 e.cid.0[0] ^= 0xFF;
594 assert!(matches!(e.verify(), Err(LedgerError::CidMismatch)));
595 Ok(())
596 }
597
598 #[cfg(feature = "signing")]
599 #[test]
600 fn signed_roundtrip() -> Result<(), LedgerError> {
601 use ubl_crypto::Keypair;
602
603 let kp = Keypair::generate();
604 let intent = json!({"intent":"Freeze","id":"X"});
605 let e = LedgerEntry::unsigned(&intent, None, b"")?.sign(&kp.sk);
606 e.verify()?;
607
608 let tmp = NamedTempFile::new().unwrap();
609 {
610 let mut w = SimpleLedgerWriter::open_append(tmp.path())?;
611 w.append(&e)?;
612 w.sync()?;
613 }
614 let r = SimpleLedgerReader::from_path(tmp.path())?
615 .iter()
616 .next()
617 .unwrap()?;
618 assert_eq!(e.cid, r.cid);
619 assert!(r.pubkey.is_some());
620 assert!(r.signature.is_some());
621 Ok(())
622 }
623
624 #[test]
625 fn multiple_entries() -> Result<(), LedgerError> {
626 let tmp = NamedTempFile::new().unwrap();
627 {
628 let mut w = SimpleLedgerWriter::open_append(tmp.path())?;
629 for i in 0..5 {
630 let intent = json!({"seq": i});
631 let e = LedgerEntry::unsigned(&intent, None, b"")?;
632 w.append(&e)?;
633 }
634 w.sync()?;
635 }
636 let count = SimpleLedgerReader::from_path(tmp.path())?.iter().count();
637 assert_eq!(count, 5);
638 Ok(())
639 }
640
641 #[test]
642 fn production_writer_append_result() -> Result<(), LedgerError> {
643 let dir = tempdir().unwrap();
644 let path = dir.path().join("ledger.ndjson");
645 let mut w = LedgerWriter::open_with(path, RotatePolicy::None, FsyncPolicy::Manual)?;
646
647 let intent = json!({"action":"test"});
648 let e = LedgerEntry::unsigned(&intent, None, b"")?;
649 let res = w.append(&e)?;
650
651 assert_eq!(res.line_no, 1);
652 assert_eq!(res.cid, e.cid);
653 w.fsync()?;
654 Ok(())
655 }
656
657 #[test]
658 fn writer_lock_prevents_double_open() -> Result<(), LedgerError> {
659 let dir = tempdir().unwrap();
660 let path = dir.path().join("ledger.ndjson");
661
662 let _w1 = LedgerWriter::open_with(&path, RotatePolicy::None, FsyncPolicy::Manual)?;
663 let w2 = LedgerWriter::open_with(&path, RotatePolicy::None, FsyncPolicy::Manual);
664
665 assert!(matches!(w2, Err(LedgerError::LockHeld)));
666 Ok(())
667 }
668}
669