Skip to main content

moire_runtime/
api.rs

1use moire_types::{
2    CutAck, CutId, Edge, Entity, Event, PullChangesResponse, Scope, SeqNo, StreamCursor,
3};
4
5use super::db::{runtime_db, runtime_stream_id};
6
7pub trait SnapshotSink {
8    fn entity(&mut self, entity: &Entity);
9    fn scope(&mut self, _scope: &Scope) {}
10    fn edge(&mut self, edge: &Edge);
11    fn event(&mut self, event: &Event);
12}
13
14pub fn write_snapshot_to<S>(sink: &mut S)
15where
16    S: SnapshotSink,
17{
18    let Ok(db) = runtime_db().lock() else {
19        return;
20    };
21    for entity in db.entities.values() {
22        sink.entity(entity);
23    }
24    for scope in db.scopes.values() {
25        sink.scope(scope);
26    }
27    for edge in db.edges.values() {
28        sink.edge(edge);
29    }
30    for event in &db.events {
31        sink.event(event);
32    }
33}
34
35pub fn pull_changes_since(from_seq_no: SeqNo, max_changes: u32) -> PullChangesResponse {
36    let stream_id = runtime_stream_id();
37    let Ok(db) = runtime_db().lock() else {
38        return PullChangesResponse {
39            stream_id,
40            from_seq_no,
41            next_seq_no: from_seq_no,
42            changes: Vec::new(),
43            truncated: false,
44            compacted_before_seq_no: None,
45        };
46    };
47    db.pull_changes_since(from_seq_no, max_changes)
48}
49
50pub fn current_cursor() -> StreamCursor {
51    let stream_id = runtime_stream_id();
52    let Ok(db) = runtime_db().lock() else {
53        return StreamCursor {
54            stream_id,
55            next_seq_no: SeqNo::ZERO,
56        };
57    };
58    db.current_cursor()
59}
60
61pub fn ack_cut(cut_id: CutId) -> CutAck {
62    CutAck {
63        cut_id,
64        cursor: current_cursor(),
65    }
66}