// edgesentry_rs/buffer/mod.rs
//! Offline buffer / store-and-forward for resilience (CLS-09).
//!
//! When an edge device loses connectivity it can push signed [`AuditRecord`]s into an
//! [`OfflineBuffer`].  When the link recovers the caller calls [`OfflineBuffer::flush`] which
//! replays the buffered records through an [`IngestService`] in the original order and reports
//! how many were accepted.
//!
//! # Pluggable storage
//! [`BufferStore`] is a synchronous trait.  The default implementation is
//! [`InMemoryBufferStore`] (volatile, useful for tests and embedded environments with no
//! persistent storage).  An optional SQLite-backed store is available behind the
//! `buffer-sqlite` feature flag ([`sqlite::SqliteBufferStore`]).
//!
//! # Flush semantics
//! Records are replayed oldest-first.  A record that fails with
//! [`IngestServiceError::Verify`] carrying [`IngestError::Duplicate`] is treated as
//! "already delivered" and counted as accepted.  Any other error stops the flush at that
//! point and the remaining records stay in the buffer.
use thiserror::Error;

use crate::ingest::{
    AuditLedger, IngestError, IngestService, IngestServiceError, OperationLogStore, RawDataStore,
};
use crate::record::AuditRecord;
25
26/// A single entry in the offline buffer.
27#[derive(Debug, Clone)]
28pub struct BufferedEntry {
29    pub record: AuditRecord,
30    pub raw_payload: Vec<u8>,
31}
32
33/// Pluggable persistent / volatile storage for [`OfflineBuffer`].
34pub trait BufferStore {
35    type Error: std::error::Error + Send + Sync + 'static;
36
37    /// Append an entry to the back of the buffer.
38    fn push(&mut self, entry: BufferedEntry) -> Result<(), Self::Error>;
39
40    /// Return all entries in insertion order.
41    fn entries(&self) -> Result<Vec<BufferedEntry>, Self::Error>;
42
43    /// Remove the oldest `n` entries.  If `n >= len` the buffer is emptied.
44    fn drop_oldest(&mut self, n: usize) -> Result<(), Self::Error>;
45
46    /// Remove every entry.
47    fn clear(&mut self) -> Result<(), Self::Error>;
48
49    /// Return the current number of buffered entries.
50    fn len(&self) -> Result<usize, Self::Error>;
51
52    /// Return `true` when there are no buffered entries.
53    fn is_empty(&self) -> Result<bool, Self::Error> {
54        Ok(self.len()? == 0)
55    }
56}
57
/// Outcome summary returned by [`OfflineBuffer::flush`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlushReport {
    /// Records the service accepted — including ones it already knew as duplicates.
    pub accepted: usize,
    /// Records still sitting in the buffer once the flush finished.
    pub remaining: usize,
}
66
67/// Error returned by [`OfflineBuffer::flush`].
68#[derive(Debug, Error)]
69pub enum FlushError<SE>
70where
71    SE: std::error::Error + Send + Sync + 'static,
72{
73    #[error("buffer store error: {0}")]
74    Store(SE),
75    #[error("ingest service error: {0}")]
76    Ingest(IngestServiceError),
77}
78
79/// A store-and-forward buffer that accumulates signed [`AuditRecord`]s during connectivity loss
80/// and replays them in order when the link recovers.
81pub struct OfflineBuffer<S: BufferStore> {
82    store: S,
83}
84
85impl<S: BufferStore> OfflineBuffer<S> {
86    /// Create a new buffer backed by `store`.
87    pub fn new(store: S) -> Self {
88        Self { store }
89    }
90
91    /// Push a signed record into the buffer.
92    pub fn push(&mut self, record: AuditRecord, raw_payload: Vec<u8>) -> Result<(), S::Error> {
93        self.store.push(BufferedEntry { record, raw_payload })
94    }
95
96    /// Return the number of records currently buffered.
97    pub fn len(&self) -> Result<usize, S::Error> {
98        self.store.len()
99    }
100
101    /// Return `true` when the buffer is empty.
102    pub fn is_empty(&self) -> Result<bool, S::Error> {
103        self.store.is_empty()
104    }
105
106    /// Replay buffered records through `service` and return a [`FlushReport`].
107    ///
108    /// Records are submitted oldest-first.  A record that is accepted **or** already a
109    /// duplicate is removed from the buffer.  The first non-duplicate error stops the
110    /// replay; any remaining records are preserved for the next flush attempt.
111    pub fn flush<R, L, O>(
112        &mut self,
113        service: &mut IngestService<R, L, O>,
114    ) -> Result<FlushReport, FlushError<S::Error>>
115    where
116        R: RawDataStore,
117        L: AuditLedger,
118        O: OperationLogStore,
119    {
120        let entries = self.store.entries().map_err(FlushError::Store)?;
121        let total = entries.len();
122        let mut accepted = 0usize;
123
124        for entry in &entries {
125            match service.ingest(entry.record.clone(), &entry.raw_payload, None) {
126                Ok(()) => {
127                    accepted += 1;
128                }
129                Err(IngestServiceError::Verify(IngestError::Duplicate { .. })) => {
130                    // Already delivered — count it and move on.
131                    accepted += 1;
132                }
133                Err(other) => {
134                    // Unrecoverable error for this record; stop here.
135                    self.store.drop_oldest(accepted).map_err(FlushError::Store)?;
136                    return Err(FlushError::Ingest(other));
137                }
138            }
139        }
140
141        self.store.drop_oldest(accepted).map_err(FlushError::Store)?;
142        let remaining = total.saturating_sub(accepted);
143        Ok(FlushReport { accepted, remaining })
144    }
145}
146
147// ---------------------------------------------------------------------------
148// InMemoryBufferStore
149// ---------------------------------------------------------------------------
150
151/// Volatile in-memory implementation of [`BufferStore`].  Entries are lost on drop.
152#[derive(Default)]
153pub struct InMemoryBufferStore {
154    entries: Vec<BufferedEntry>,
155}
156
157impl InMemoryBufferStore {
158    pub fn new() -> Self {
159        Self::default()
160    }
161}
162
163#[derive(Debug, Error)]
164#[error("in-memory buffer store error: {message}")]
165pub struct InMemoryBufferStoreError {
166    message: String,
167}
168
169impl BufferStore for InMemoryBufferStore {
170    type Error = InMemoryBufferStoreError;
171
172    fn push(&mut self, entry: BufferedEntry) -> Result<(), Self::Error> {
173        self.entries.push(entry);
174        Ok(())
175    }
176
177    fn entries(&self) -> Result<Vec<BufferedEntry>, Self::Error> {
178        Ok(self.entries.clone())
179    }
180
181    fn drop_oldest(&mut self, n: usize) -> Result<(), Self::Error> {
182        let drain = n.min(self.entries.len());
183        self.entries.drain(..drain);
184        Ok(())
185    }
186
187    fn clear(&mut self) -> Result<(), Self::Error> {
188        self.entries.clear();
189        Ok(())
190    }
191
192    fn len(&self) -> Result<usize, Self::Error> {
193        Ok(self.entries.len())
194    }
195}
196
197// ---------------------------------------------------------------------------
198// SqliteBufferStore (optional feature)
199// ---------------------------------------------------------------------------
200
#[cfg(feature = "buffer-sqlite")]
pub mod sqlite {
    //! SQLite-backed offline buffer storage (enabled by the `buffer-sqlite` feature).

    use std::convert::TryFrom;

    use super::{BufferedEntry, BufferStore};
    use crate::record::AuditRecord;
    use thiserror::Error;

    /// Error type for [`SqliteBufferStore`].
    #[derive(Debug, Error)]
    pub enum SqliteBufferStoreError {
        /// Any failure reported by the underlying SQLite driver.
        #[error("sqlite error: {0}")]
        Sqlite(#[from] rusqlite::Error),
        /// JSON / hex (de)serialization failure.
        ///
        /// NOTE(review): `push` also maps *serialization* failures into this variant;
        /// the name is kept as-is for backward compatibility with existing matches.
        #[error("deserialize error: {0}")]
        Deserialize(String),
    }

    /// SQLite-backed implementation of [`BufferStore`].
    ///
    /// Records are stored as JSON (payloads hex-encoded) in a `buffer` table so they
    /// survive process restarts.  The `AUTOINCREMENT` row id preserves insertion order.
    pub struct SqliteBufferStore {
        conn: rusqlite::Connection,
    }

    impl SqliteBufferStore {
        /// Open (or create) the SQLite database at `path`.
        pub fn open(path: &str) -> Result<Self, SqliteBufferStoreError> {
            Self::with_connection(rusqlite::Connection::open(path)?)
        }

        /// Open an in-memory SQLite database (useful for tests).
        pub fn open_in_memory() -> Result<Self, SqliteBufferStoreError> {
            Self::with_connection(rusqlite::Connection::open_in_memory()?)
        }

        /// Shared constructor tail: ensure the `buffer` schema exists on `conn`.
        /// (This SQL was previously duplicated verbatim in both `open` constructors.)
        fn with_connection(conn: rusqlite::Connection) -> Result<Self, SqliteBufferStoreError> {
            conn.execute_batch(
                "CREATE TABLE IF NOT EXISTS buffer (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    record_json TEXT    NOT NULL,
                    payload_hex TEXT    NOT NULL
                );",
            )?;
            Ok(Self { conn })
        }
    }

    impl BufferStore for SqliteBufferStore {
        type Error = SqliteBufferStoreError;

        /// Serialize the record as JSON and the payload as hex, then insert one row.
        fn push(&mut self, entry: BufferedEntry) -> Result<(), Self::Error> {
            let record_json = serde_json::to_string(&entry.record)
                .map_err(|e| SqliteBufferStoreError::Deserialize(e.to_string()))?;
            let payload_hex = hex::encode(&entry.raw_payload);
            self.conn.execute(
                "INSERT INTO buffer (record_json, payload_hex) VALUES (?1, ?2)",
                rusqlite::params![record_json, payload_hex],
            )?;
            Ok(())
        }

        /// Load and decode every row in insertion (`id`) order.
        fn entries(&self) -> Result<Vec<BufferedEntry>, Self::Error> {
            let mut stmt = self
                .conn
                .prepare("SELECT record_json, payload_hex FROM buffer ORDER BY id ASC")?;
            let rows = stmt.query_map([], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })?;

            let mut entries = Vec::new();
            for row in rows {
                let (record_json, payload_hex) = row?;
                let record: AuditRecord = serde_json::from_str(&record_json)
                    .map_err(|e| SqliteBufferStoreError::Deserialize(e.to_string()))?;
                let raw_payload = hex::decode(&payload_hex)
                    .map_err(|e| SqliteBufferStoreError::Deserialize(e.to_string()))?;
                entries.push(BufferedEntry { record, raw_payload });
            }
            Ok(entries)
        }

        /// Delete the `n` lowest row ids (the oldest entries).
        fn drop_oldest(&mut self, n: usize) -> Result<(), Self::Error> {
            if n == 0 {
                return Ok(());
            }
            // `n as i64` could wrap negative for absurdly large `n` on 64-bit targets;
            // saturate instead so the LIMIT is always non-negative.
            let limit = i64::try_from(n).unwrap_or(i64::MAX);
            self.conn.execute(
                "DELETE FROM buffer WHERE id IN (
                    SELECT id FROM buffer ORDER BY id ASC LIMIT ?1
                )",
                rusqlite::params![limit],
            )?;
            Ok(())
        }

        /// Delete every row.
        fn clear(&mut self) -> Result<(), Self::Error> {
            self.conn.execute("DELETE FROM buffer", [])?;
            Ok(())
        }

        /// Count the buffered rows.
        fn len(&self) -> Result<usize, Self::Error> {
            let count: i64 =
                self.conn.query_row("SELECT COUNT(*) FROM buffer", [], |r| r.get(0))?;
            Ok(count as usize)
        }
    }
}
309
310// ---------------------------------------------------------------------------
311// Unit tests
312// ---------------------------------------------------------------------------
313
#[cfg(test)]
mod tests {
    //! Unit tests for the offline buffer, its in-memory store, and (feature-gated)
    //! the SQLite store.
    use super::*;
    use crate::{
        build_lift_inspection_demo_records_with_payloads,
        ingest::{
            InMemoryAuditLedger, InMemoryOperationLog, InMemoryRawDataStore,
            IngestService, IntegrityPolicyGate,
        },
        parse_fixed_hex,
        record::AuditRecord,
    };
    use ed25519_dalek::SigningKey;

    /// Deterministic Ed25519 signing key (hex) shared by every fixture below.
    const PRIV_HEX: &str = "0101010101010101010101010101010101010101010101010101010101010101";

    /// Build a fully in-memory ingest service with device "lift-01" registered under
    /// the verifying key derived from `PRIV_HEX`.
    fn make_service() -> IngestService<InMemoryRawDataStore, InMemoryAuditLedger, InMemoryOperationLog> {
        let key_bytes = parse_fixed_hex::<32>(PRIV_HEX).unwrap();
        let signing_key = SigningKey::from_bytes(&key_bytes);
        let verifying_key = signing_key.verifying_key();

        let mut policy = IntegrityPolicyGate::new();
        policy.register_device("lift-01", verifying_key);
        IngestService::new(
            policy,
            InMemoryRawDataStore::default(),
            InMemoryAuditLedger::default(),
            InMemoryOperationLog::default(),
        )
    }

    /// Demo (record, payload) pairs signed for device "lift-01".
    // NOTE(review): several tests below assume this returns exactly 3 pairs with
    // sequences 1..=3 — confirm against the demo builder if the fixture changes.
    fn demo_entries() -> Vec<(AuditRecord, Vec<u8>)> {
        build_lift_inspection_demo_records_with_payloads(
            "lift-01",
            PRIV_HEX,
            1_700_000_000_000,
            "s3://bucket/lift-01",
        )
        .unwrap()
    }

    #[test]
    fn push_increases_len() {
        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        assert_eq!(buf.len().unwrap(), 0);

        let pairs = demo_entries();
        buf.push(pairs[0].0.clone(), pairs[0].1.clone()).unwrap();
        assert_eq!(buf.len().unwrap(), 1);
        buf.push(pairs[1].0.clone(), pairs[1].1.clone()).unwrap();
        assert_eq!(buf.len().unwrap(), 2);
    }

    #[test]
    fn flush_all_accepted_empties_buffer() {
        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        let pairs = demo_entries();
        for (r, p) in &pairs {
            buf.push(r.clone(), p.clone()).unwrap();
        }

        let mut svc = make_service();
        let report = buf.flush(&mut svc).unwrap();

        // Every record was signed by a registered device, so all must be accepted.
        assert_eq!(report.accepted, pairs.len());
        assert_eq!(report.remaining, 0);
        assert_eq!(buf.len().unwrap(), 0);
    }

    #[test]
    fn flush_duplicate_records_counted_as_accepted() {
        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        let pairs = demo_entries();
        for (r, p) in &pairs {
            buf.push(r.clone(), p.clone()).unwrap();
        }

        let mut svc = make_service();
        // First flush delivers all records.
        let _ = buf.flush(&mut svc).unwrap();

        // Re-push the same records — they are now duplicates.
        for (r, p) in &pairs {
            buf.push(r.clone(), p.clone()).unwrap();
        }
        let report = buf.flush(&mut svc).unwrap();
        assert_eq!(report.accepted, pairs.len(), "duplicates should be counted as accepted");
        assert_eq!(report.remaining, 0);
    }

    #[test]
    fn flush_stops_on_unknown_device_error() {
        // Build a record for a device that is NOT registered in the service.
        let pairs = build_lift_inspection_demo_records_with_payloads(
            "unknown-device",
            PRIV_HEX,
            1_700_000_000_000,
            "s3://bucket/unknown",
        )
        .unwrap();

        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        buf.push(pairs[0].0.clone(), pairs[0].1.clone()).unwrap();

        let mut svc = make_service(); // "unknown-device" is not registered
        let result = buf.flush(&mut svc);
        assert!(result.is_err(), "flush must fail when device is unknown");
        // The record should still be in the buffer.
        assert_eq!(buf.len().unwrap(), 1);
    }

    #[test]
    fn in_memory_store_drop_oldest_removes_entries() {
        let mut store = InMemoryBufferStore::new();
        let pairs = demo_entries();
        for (r, p) in &pairs {
            store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
        }
        assert_eq!(store.len().unwrap(), 3);
        store.drop_oldest(2).unwrap();
        assert_eq!(store.len().unwrap(), 1);
        // Only the youngest (last-pushed) record may remain.
        let remaining = store.entries().unwrap();
        assert_eq!(remaining[0].record.sequence, pairs[2].0.sequence);
    }

    #[test]
    fn in_memory_store_clear_empties() {
        let mut store = InMemoryBufferStore::new();
        let pairs = demo_entries();
        for (r, p) in &pairs {
            store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
        }
        store.clear().unwrap();
        assert_eq!(store.len().unwrap(), 0);
    }

    #[cfg(feature = "buffer-sqlite")]
    mod sqlite_tests {
        use super::*;
        use crate::buffer::sqlite::SqliteBufferStore;
        use std::{fs, path::PathBuf};

        /// Build a per-process temp-file path so parallel test runs don't collide.
        fn tmp_db_path(name: &str) -> PathBuf {
            let mut p = std::env::temp_dir();
            p.push(format!("edgesentry_buf_{}_{name}.db", std::process::id()));
            p
        }

        /// RAII guard: best-effort deletion of the temp database file on drop.
        struct TmpFile(PathBuf);
        impl Drop for TmpFile {
            fn drop(&mut self) {
                let _ = fs::remove_file(&self.0);
            }
        }

        #[test]
        fn sqlite_store_roundtrip() {
            let mut store = SqliteBufferStore::open_in_memory().unwrap();
            let pairs = demo_entries();
            for (r, p) in &pairs {
                store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
            }
            assert_eq!(store.len().unwrap(), 3);
            let entries = store.entries().unwrap();
            assert_eq!(entries[0].record.sequence, 1);
            assert_eq!(entries[2].record.sequence, 3);
        }

        #[test]
        fn sqlite_flush_works() {
            let store = SqliteBufferStore::open_in_memory().unwrap();
            let mut buf = OfflineBuffer::new(store);
            let pairs = demo_entries();
            for (r, p) in &pairs {
                buf.push(r.clone(), p.clone()).unwrap();
            }
            let mut svc = make_service();
            let report = buf.flush(&mut svc).unwrap();
            assert_eq!(report.accepted, 3);
            assert_eq!(report.remaining, 0);
        }

        /// Records written to a file-backed store survive handle drop and re-open.
        #[test]
        fn sqlite_records_survive_handle_drop_and_reopen() {
            let path = tmp_db_path("reopen");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            // Write phase: push 3 records then drop the handle.
            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                for (r, p) in &pairs {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
                assert_eq!(store.len().unwrap(), 3);
                // `store` is dropped here, closing the SQLite connection.
            }

            // Read phase: re-open from the same file and verify all records are present.
            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                assert_eq!(store.len().unwrap(), 3, "all records must survive handle drop");

                let entries = store.entries().unwrap();
                assert_eq!(entries.len(), 3);
                for (i, (original, entry)) in pairs.iter().zip(entries.iter()).enumerate() {
                    assert_eq!(
                        entry.record.sequence, original.0.sequence,
                        "sequence mismatch at index {i}"
                    );
                    assert_eq!(
                        entry.record.device_id, original.0.device_id,
                        "device_id mismatch at index {i}"
                    );
                    assert_eq!(
                        entry.raw_payload, original.1,
                        "raw_payload mismatch at index {i}"
                    );
                }
            }
        }

        /// `drop_oldest` removes rows durably — they are gone after re-open.
        #[test]
        fn sqlite_drop_oldest_is_durable() {
            let path = tmp_db_path("drop_oldest");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            // Write 3 records then drop the 2 oldest.
            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                for (r, p) in &pairs {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
                store.drop_oldest(2).unwrap();
                assert_eq!(store.len().unwrap(), 1);
            }

            // Re-open: only 1 record should remain, and it must be the last one.
            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                assert_eq!(store.len().unwrap(), 1, "only the youngest record must remain");
                let entries = store.entries().unwrap();
                assert_eq!(entries[0].record.sequence, pairs[2].0.sequence);
            }
        }

        /// `clear` removes all rows durably — the table is empty after re-open.
        #[test]
        fn sqlite_clear_is_durable() {
            let path = tmp_db_path("clear");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            // Write 3 records then clear.
            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                for (r, p) in &pairs {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
                store.clear().unwrap();
                assert_eq!(store.len().unwrap(), 0);
            }

            // Re-open: buffer must still be empty.
            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                assert_eq!(store.len().unwrap(), 0, "clear must persist across re-open");
            }
        }

        /// Records are replayed in insertion order (oldest-first) after re-open.
        #[test]
        fn sqlite_insertion_order_preserved_across_reopen() {
            let path = tmp_db_path("order");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                // Push in reverse order to confirm ordering is by insertion, not by sequence.
                for (r, p) in pairs.iter().rev() {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
            }

            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                let entries = store.entries().unwrap();
                assert_eq!(entries.len(), 3);
                // Insertion was 3, 2, 1 — entries must come back in that same order.
                assert_eq!(entries[0].record.sequence, pairs[2].0.sequence);
                assert_eq!(entries[1].record.sequence, pairs[1].0.sequence);
                assert_eq!(entries[2].record.sequence, pairs[0].0.sequence);
            }
        }
    }
}