1use moire_types::{
2 CutAck, CutId, Edge, Entity, Event, PullChangesResponse, Scope, SeqNo, StreamCursor,
3};
4
5use super::db::{runtime_db, runtime_stream_id};
6
7pub trait SnapshotSink {
8 fn entity(&mut self, entity: &Entity);
9 fn scope(&mut self, _scope: &Scope) {}
10 fn edge(&mut self, edge: &Edge);
11 fn event(&mut self, event: &Event);
12}
13
14pub fn write_snapshot_to<S>(sink: &mut S)
15where
16 S: SnapshotSink,
17{
18 let Ok(db) = runtime_db().lock() else {
19 return;
20 };
21 for entity in db.entities.values() {
22 sink.entity(entity);
23 }
24 for scope in db.scopes.values() {
25 sink.scope(scope);
26 }
27 for edge in db.edges.values() {
28 sink.edge(edge);
29 }
30 for event in &db.events {
31 sink.event(event);
32 }
33}
34
35pub fn pull_changes_since(from_seq_no: SeqNo, max_changes: u32) -> PullChangesResponse {
36 let stream_id = runtime_stream_id();
37 let Ok(db) = runtime_db().lock() else {
38 return PullChangesResponse {
39 stream_id,
40 from_seq_no,
41 next_seq_no: from_seq_no,
42 changes: Vec::new(),
43 truncated: false,
44 compacted_before_seq_no: None,
45 };
46 };
47 db.pull_changes_since(from_seq_no, max_changes)
48}
49
50pub fn current_cursor() -> StreamCursor {
51 let stream_id = runtime_stream_id();
52 let Ok(db) = runtime_db().lock() else {
53 return StreamCursor {
54 stream_id,
55 next_seq_no: SeqNo::ZERO,
56 };
57 };
58 db.current_cursor()
59}
60
61pub fn ack_cut(cut_id: CutId) -> CutAck {
62 CutAck {
63 cut_id,
64 cursor: current_cursor(),
65 }
66}