use thiserror::Error;

use crate::ingest::{
    AuditLedger, IngestError, IngestService, IngestServiceError, OperationLogStore, RawDataStore,
};
use crate::record::AuditRecord;

/// A single queued item awaiting ingestion: the signed audit record paired
/// with the raw payload bytes it refers to.
#[derive(Debug, Clone)]
pub struct BufferedEntry {
    // Parsed audit record to replay against the ingest service.
    pub record: AuditRecord,
    // Raw payload bytes submitted alongside the record on ingest.
    pub raw_payload: Vec<u8>,
}
32
/// Storage abstraction backing [`OfflineBuffer`].
///
/// Implementations keep entries in FIFO (insertion) order; `drop_oldest`
/// removes from the front, so successfully flushed entries are discarded
/// oldest-first.
pub trait BufferStore {
    /// Error type surfaced by the backing storage.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Appends an entry at the back of the queue.
    fn push(&mut self, entry: BufferedEntry) -> Result<(), Self::Error>;

    /// Returns every buffered entry in insertion (oldest-first) order.
    fn entries(&self) -> Result<Vec<BufferedEntry>, Self::Error>;

    /// Removes the `n` oldest entries (or all of them, if fewer than `n` are buffered).
    fn drop_oldest(&mut self, n: usize) -> Result<(), Self::Error>;

    /// Removes every buffered entry.
    fn clear(&mut self) -> Result<(), Self::Error>;

    /// Number of currently buffered entries.
    fn len(&self) -> Result<usize, Self::Error>;

    /// True when nothing is buffered; default implementation derives from `len`.
    fn is_empty(&self) -> Result<bool, Self::Error> {
        Ok(self.len()? == 0)
    }
}
57
/// Outcome of [`OfflineBuffer::flush`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlushReport {
    // Entries the ingest service accepted; duplicates count as accepted.
    pub accepted: usize,
    // Entries still left in the buffer after the flush attempt.
    pub remaining: usize,
}
66
67#[derive(Debug, Error)]
69pub enum FlushError<SE>
70where
71 SE: std::error::Error + Send + Sync + 'static,
72{
73 #[error("buffer store error: {0}")]
74 Store(SE),
75 #[error("ingest service error: {0}")]
76 Ingest(IngestServiceError),
77}
78
/// FIFO buffer for audit records captured while the ingest service is
/// unavailable; queued entries are replayed via [`OfflineBuffer::flush`].
pub struct OfflineBuffer<S: BufferStore> {
    store: S,
}
84
85impl<S: BufferStore> OfflineBuffer<S> {
86 pub fn new(store: S) -> Self {
88 Self { store }
89 }
90
91 pub fn push(&mut self, record: AuditRecord, raw_payload: Vec<u8>) -> Result<(), S::Error> {
93 self.store.push(BufferedEntry { record, raw_payload })
94 }
95
96 pub fn len(&self) -> Result<usize, S::Error> {
98 self.store.len()
99 }
100
101 pub fn is_empty(&self) -> Result<bool, S::Error> {
103 self.store.is_empty()
104 }
105
106 pub fn flush<R, L, O>(
112 &mut self,
113 service: &mut IngestService<R, L, O>,
114 ) -> Result<FlushReport, FlushError<S::Error>>
115 where
116 R: RawDataStore,
117 L: AuditLedger,
118 O: OperationLogStore,
119 {
120 let entries = self.store.entries().map_err(FlushError::Store)?;
121 let total = entries.len();
122 let mut accepted = 0usize;
123
124 for entry in &entries {
125 match service.ingest(entry.record.clone(), &entry.raw_payload, None) {
126 Ok(()) => {
127 accepted += 1;
128 }
129 Err(IngestServiceError::Verify(IngestError::Duplicate { .. })) => {
130 accepted += 1;
132 }
133 Err(other) => {
134 self.store.drop_oldest(accepted).map_err(FlushError::Store)?;
136 return Err(FlushError::Ingest(other));
137 }
138 }
139 }
140
141 self.store.drop_oldest(accepted).map_err(FlushError::Store)?;
142 let remaining = total.saturating_sub(accepted);
143 Ok(FlushReport { accepted, remaining })
144 }
145}
146
/// Volatile, process-local [`BufferStore`] backed by a `Vec`.
/// Contents are lost when the process exits.
#[derive(Default)]
pub struct InMemoryBufferStore {
    entries: Vec<BufferedEntry>,
}
156
157impl InMemoryBufferStore {
158 pub fn new() -> Self {
159 Self::default()
160 }
161}
162
/// Error type for [`InMemoryBufferStore`].
///
/// NOTE(review): none of the in-memory operations visible here ever construct
/// this error — it exists to satisfy the `BufferStore::Error` associated type.
#[derive(Debug, Error)]
#[error("in-memory buffer store error: {message}")]
pub struct InMemoryBufferStoreError {
    message: String,
}
168
169impl BufferStore for InMemoryBufferStore {
170 type Error = InMemoryBufferStoreError;
171
172 fn push(&mut self, entry: BufferedEntry) -> Result<(), Self::Error> {
173 self.entries.push(entry);
174 Ok(())
175 }
176
177 fn entries(&self) -> Result<Vec<BufferedEntry>, Self::Error> {
178 Ok(self.entries.clone())
179 }
180
181 fn drop_oldest(&mut self, n: usize) -> Result<(), Self::Error> {
182 let drain = n.min(self.entries.len());
183 self.entries.drain(..drain);
184 Ok(())
185 }
186
187 fn clear(&mut self) -> Result<(), Self::Error> {
188 self.entries.clear();
189 Ok(())
190 }
191
192 fn len(&self) -> Result<usize, Self::Error> {
193 Ok(self.entries.len())
194 }
195}
196
#[cfg(feature = "buffer-sqlite")]
pub mod sqlite {
    //! SQLite-backed [`BufferStore`] so buffered records survive process
    //! restarts. Records are stored as JSON text; payloads are hex-encoded.

    use super::{BufferedEntry, BufferStore};
    use crate::record::AuditRecord;
    use thiserror::Error;

    /// Errors produced by [`SqliteBufferStore`].
    #[derive(Debug, Error)]
    pub enum SqliteBufferStoreError {
        /// Any failure reported by the underlying SQLite connection.
        #[error("sqlite error: {0}")]
        Sqlite(#[from] rusqlite::Error),
        /// A stored row could not be converted back into a `BufferedEntry`
        /// (also used for serialization failures on insert).
        #[error("deserialize error: {0}")]
        Deserialize(String),
    }

    /// Table schema shared by every connection. The `AUTOINCREMENT` id
    /// preserves insertion order, which `entries`/`drop_oldest` rely on.
    /// (Previously duplicated verbatim in `open` and `open_in_memory`.)
    const SCHEMA: &str = "CREATE TABLE IF NOT EXISTS buffer (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        record_json TEXT NOT NULL,
        payload_hex TEXT NOT NULL
    );";

    /// Durable buffer store persisted in a SQLite database.
    pub struct SqliteBufferStore {
        conn: rusqlite::Connection,
    }

    impl SqliteBufferStore {
        /// Opens (or creates) the database file at `path` and ensures the schema.
        pub fn open(path: &str) -> Result<Self, SqliteBufferStoreError> {
            Self::with_connection(rusqlite::Connection::open(path)?)
        }

        /// Opens a transient in-memory database (useful for tests).
        pub fn open_in_memory() -> Result<Self, SqliteBufferStoreError> {
            Self::with_connection(rusqlite::Connection::open_in_memory()?)
        }

        /// Applies the shared schema to a freshly opened connection.
        fn with_connection(conn: rusqlite::Connection) -> Result<Self, SqliteBufferStoreError> {
            conn.execute_batch(SCHEMA)?;
            Ok(Self { conn })
        }
    }

    impl BufferStore for SqliteBufferStore {
        type Error = SqliteBufferStoreError;

        fn push(&mut self, entry: BufferedEntry) -> Result<(), Self::Error> {
            let record_json = serde_json::to_string(&entry.record)
                .map_err(|e| SqliteBufferStoreError::Deserialize(e.to_string()))?;
            let payload_hex = hex::encode(&entry.raw_payload);
            self.conn.execute(
                "INSERT INTO buffer (record_json, payload_hex) VALUES (?1, ?2)",
                rusqlite::params![record_json, payload_hex],
            )?;
            Ok(())
        }

        fn entries(&self) -> Result<Vec<BufferedEntry>, Self::Error> {
            // `ORDER BY id ASC` restores insertion (oldest-first) order.
            let mut stmt = self
                .conn
                .prepare("SELECT record_json, payload_hex FROM buffer ORDER BY id ASC")?;
            let rows = stmt.query_map([], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })?;

            let mut entries = Vec::new();
            for row in rows {
                let (record_json, payload_hex) = row?;
                let record: AuditRecord = serde_json::from_str(&record_json)
                    .map_err(|e| SqliteBufferStoreError::Deserialize(e.to_string()))?;
                let raw_payload = hex::decode(&payload_hex)
                    .map_err(|e| SqliteBufferStoreError::Deserialize(e.to_string()))?;
                entries.push(BufferedEntry { record, raw_payload });
            }
            Ok(entries)
        }

        fn drop_oldest(&mut self, n: usize) -> Result<(), Self::Error> {
            if n == 0 {
                return Ok(());
            }
            // Lowest ids are the oldest rows. SQLite takes signed 64-bit
            // parameters; `n as i64` cannot truncate for realistic buffer sizes.
            self.conn.execute(
                "DELETE FROM buffer WHERE id IN (
                    SELECT id FROM buffer ORDER BY id ASC LIMIT ?1
                )",
                rusqlite::params![n as i64],
            )?;
            Ok(())
        }

        fn clear(&mut self) -> Result<(), Self::Error> {
            self.conn.execute("DELETE FROM buffer", [])?;
            Ok(())
        }

        fn len(&self) -> Result<usize, Self::Error> {
            let count: i64 =
                self.conn.query_row("SELECT COUNT(*) FROM buffer", [], |r| r.get(0))?;
            Ok(count as usize)
        }
    }
}
309
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        build_lift_inspection_demo_records_with_payloads,
        ingest::{
            InMemoryAuditLedger, InMemoryOperationLog, InMemoryRawDataStore,
            IngestService, IntegrityPolicyGate,
        },
        parse_fixed_hex,
        record::AuditRecord,
    };
    use ed25519_dalek::SigningKey;

    /// Deterministic test signing key (hex-encoded 32-byte seed).
    const PRIV_HEX: &str = "0101010101010101010101010101010101010101010101010101010101010101";

    /// Builds an ingest service that trusts only the `lift-01` device key.
    fn make_service() -> IngestService<InMemoryRawDataStore, InMemoryAuditLedger, InMemoryOperationLog> {
        let key_bytes = parse_fixed_hex::<32>(PRIV_HEX).unwrap();
        let signing_key = SigningKey::from_bytes(&key_bytes);
        let verifying_key = signing_key.verifying_key();

        let mut policy = IntegrityPolicyGate::new();
        policy.register_device("lift-01", verifying_key);
        IngestService::new(
            policy,
            InMemoryRawDataStore::default(),
            InMemoryAuditLedger::default(),
            InMemoryOperationLog::default(),
        )
    }

    /// Demo records signed by the `lift-01` test key.
    /// NOTE: the demo builder currently yields three records — asserts below
    /// use `pairs.len()` rather than hard-coding that count.
    fn demo_entries() -> Vec<(AuditRecord, Vec<u8>)> {
        build_lift_inspection_demo_records_with_payloads(
            "lift-01",
            PRIV_HEX,
            1_700_000_000_000,
            "s3://bucket/lift-01",
        )
        .unwrap()
    }

    #[test]
    fn push_increases_len() {
        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        assert_eq!(buf.len().unwrap(), 0);

        let pairs = demo_entries();
        buf.push(pairs[0].0.clone(), pairs[0].1.clone()).unwrap();
        assert_eq!(buf.len().unwrap(), 1);
        buf.push(pairs[1].0.clone(), pairs[1].1.clone()).unwrap();
        assert_eq!(buf.len().unwrap(), 2);
    }

    #[test]
    fn flush_all_accepted_empties_buffer() {
        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        let pairs = demo_entries();
        for (r, p) in &pairs {
            buf.push(r.clone(), p.clone()).unwrap();
        }

        let mut svc = make_service();
        let report = buf.flush(&mut svc).unwrap();

        assert_eq!(report.accepted, pairs.len());
        assert_eq!(report.remaining, 0);
        assert_eq!(buf.len().unwrap(), 0);
    }

    #[test]
    fn flush_duplicate_records_counted_as_accepted() {
        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        let pairs = demo_entries();
        for (r, p) in &pairs {
            buf.push(r.clone(), p.clone()).unwrap();
        }

        let mut svc = make_service();
        let _ = buf.flush(&mut svc).unwrap();

        // Re-queue and re-flush the same records: the service reports them as
        // duplicates, which flush must still count as accepted.
        for (r, p) in &pairs {
            buf.push(r.clone(), p.clone()).unwrap();
        }
        let report = buf.flush(&mut svc).unwrap();
        assert_eq!(report.accepted, pairs.len(), "duplicates should be counted as accepted");
        assert_eq!(report.remaining, 0);
    }

    #[test]
    fn flush_stops_on_unknown_device_error() {
        let pairs = build_lift_inspection_demo_records_with_payloads(
            "unknown-device",
            PRIV_HEX,
            1_700_000_000_000,
            "s3://bucket/unknown",
        )
        .unwrap();

        let mut buf: OfflineBuffer<InMemoryBufferStore> =
            OfflineBuffer::new(InMemoryBufferStore::new());
        buf.push(pairs[0].0.clone(), pairs[0].1.clone()).unwrap();

        let mut svc = make_service();
        let result = buf.flush(&mut svc);
        assert!(result.is_err(), "flush must fail when device is unknown");
        // Nothing was accepted, so the buffered entry must be retained.
        assert_eq!(buf.len().unwrap(), 1);
    }

    #[test]
    fn in_memory_store_drop_oldest_removes_entries() {
        let mut store = InMemoryBufferStore::new();
        let pairs = demo_entries();
        for (r, p) in &pairs {
            store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
        }
        assert_eq!(store.len().unwrap(), pairs.len());
        store.drop_oldest(2).unwrap();
        assert_eq!(store.len().unwrap(), pairs.len() - 2);
        let remaining = store.entries().unwrap();
        assert_eq!(remaining[0].record.sequence, pairs[2].0.sequence);
    }

    #[test]
    fn in_memory_store_clear_empties() {
        let mut store = InMemoryBufferStore::new();
        let pairs = demo_entries();
        for (r, p) in &pairs {
            store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
        }
        store.clear().unwrap();
        assert_eq!(store.len().unwrap(), 0);
    }

    #[cfg(feature = "buffer-sqlite")]
    mod sqlite_tests {
        use super::*;
        use crate::buffer::sqlite::SqliteBufferStore;
        use std::{fs, path::PathBuf};

        /// Per-process temp database path so parallel test runs don't collide.
        fn tmp_db_path(name: &str) -> PathBuf {
            let mut p = std::env::temp_dir();
            p.push(format!("edgesentry_buf_{}_{name}.db", std::process::id()));
            p
        }

        /// Deletes the temp database file when dropped, even on panic.
        struct TmpFile(PathBuf);
        impl Drop for TmpFile {
            fn drop(&mut self) {
                let _ = fs::remove_file(&self.0);
            }
        }

        #[test]
        fn sqlite_store_roundtrip() {
            let mut store = SqliteBufferStore::open_in_memory().unwrap();
            let pairs = demo_entries();
            for (r, p) in &pairs {
                store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
            }
            assert_eq!(store.len().unwrap(), pairs.len());
            let entries = store.entries().unwrap();
            assert_eq!(entries[0].record.sequence, 1);
            assert_eq!(entries[2].record.sequence, 3);
        }

        #[test]
        fn sqlite_flush_works() {
            let store = SqliteBufferStore::open_in_memory().unwrap();
            let mut buf = OfflineBuffer::new(store);
            let pairs = demo_entries();
            for (r, p) in &pairs {
                buf.push(r.clone(), p.clone()).unwrap();
            }
            let mut svc = make_service();
            let report = buf.flush(&mut svc).unwrap();
            assert_eq!(report.accepted, pairs.len());
            assert_eq!(report.remaining, 0);
        }

        #[test]
        fn sqlite_records_survive_handle_drop_and_reopen() {
            let path = tmp_db_path("reopen");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            // First handle: write everything, then drop the connection.
            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                for (r, p) in &pairs {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
                assert_eq!(store.len().unwrap(), pairs.len());
            }

            // Second handle: everything must still be there, byte for byte.
            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                assert_eq!(store.len().unwrap(), pairs.len(), "all records must survive handle drop");

                let entries = store.entries().unwrap();
                assert_eq!(entries.len(), pairs.len());
                for (i, (original, entry)) in pairs.iter().zip(entries.iter()).enumerate() {
                    assert_eq!(
                        entry.record.sequence, original.0.sequence,
                        "sequence mismatch at index {i}"
                    );
                    assert_eq!(
                        entry.record.device_id, original.0.device_id,
                        "device_id mismatch at index {i}"
                    );
                    assert_eq!(
                        entry.raw_payload, original.1,
                        "raw_payload mismatch at index {i}"
                    );
                }
            }
        }

        #[test]
        fn sqlite_drop_oldest_is_durable() {
            let path = tmp_db_path("drop_oldest");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                for (r, p) in &pairs {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
                store.drop_oldest(2).unwrap();
                assert_eq!(store.len().unwrap(), 1);
            }

            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                assert_eq!(store.len().unwrap(), 1, "only the youngest record must remain");
                let entries = store.entries().unwrap();
                assert_eq!(entries[0].record.sequence, pairs[2].0.sequence);
            }
        }

        #[test]
        fn sqlite_clear_is_durable() {
            let path = tmp_db_path("clear");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                for (r, p) in &pairs {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
                store.clear().unwrap();
                assert_eq!(store.len().unwrap(), 0);
            }

            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                assert_eq!(store.len().unwrap(), 0, "clear must persist across re-open");
            }
        }

        #[test]
        fn sqlite_insertion_order_preserved_across_reopen() {
            let path = tmp_db_path("order");
            let _guard = TmpFile(path.clone());
            let path_str = path.to_str().unwrap();

            let pairs = demo_entries();

            {
                let mut store = SqliteBufferStore::open(path_str).unwrap();
                // Push in reverse to prove insertion order (not sequence order)
                // is what `entries` returns.
                for (r, p) in pairs.iter().rev() {
                    store.push(BufferedEntry { record: r.clone(), raw_payload: p.clone() }).unwrap();
                }
            }

            {
                let store = SqliteBufferStore::open(path_str).unwrap();
                let entries = store.entries().unwrap();
                assert_eq!(entries.len(), pairs.len());
                assert_eq!(entries[0].record.sequence, pairs[2].0.sequence);
                assert_eq!(entries[1].record.sequence, pairs[1].0.sequence);
                assert_eq!(entries[2].record.sequence, pairs[0].0.sequence);
            }
        }
    }
}