modality_ingest_protocol/
protocol.rs

1use minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
2use modality_api::{AttrVal, TimelineId};
3
4pub use modality_api::protocol::{TAG_LOGICAL_TIME, TAG_NS, TAG_TIMELINE_ID};
5
6#[cfg_attr(feature = "client", derive(Decode))]
7#[derive(Debug)]
8pub enum IngestResponse {
9    #[n(1)]
10    AuthResponse {
11        #[n(0)]
12        ok: bool,
13
14        #[n(1)]
15        message: Option<String>,
16    },
17
18    #[n(2)]
19    UnauthenticatedResponse {},
20
21    #[n(101)]
22    IngestStatusResponse {
23        #[n(0)]
24        current_timeline: Option<TimelineId>,
25
26        #[n(1)]
27        events_received: u64,
28
29        #[n(2)]
30        events_written: u64,
31
32        #[n(3)]
33        events_pending: u64,
34    },
35}
36
37#[cfg_attr(feature = "client", derive(Encode))]
38#[derive(Debug)]
39pub enum IngestMessage {
40    #[n(0)]
41    AuthRequest {
42        #[n(0)]
43        token: Vec<u8>,
44    },
45
46    #[n(100)]
47    IngestStatusRequest {},
48
49    #[n(102)]
50    /// An advisory message, asking the server to immediately write any pending events to disk.
51    Flush {},
52
53    #[n(110)]
54    DeclareAttrKey {
55        #[n(0)]
56        name: String,
57
58        #[n(1)]
59        wire_id: InternedAttrKey,
60    },
61
62    #[n(112)]
63    OpenTimeline {
64        #[n(0)]
65        id: TimelineId,
66    },
67
68    #[n(113)]
69    TimelineMetadata {
70        #[n(0)]
71        attrs: PackedAttrKvs<InternedAttrKey>,
72    },
73
74    #[n(114)]
75    Event {
76        #[n(0)]
77        be_ordering: Vec<u8>,
78
79        #[n(1)]
80        attrs: PackedAttrKvs<InternedAttrKey>,
81    },
82}
83
/// Numeric stand-in for an `AttrKey`, used once the key has been declared on a connection.
///
/// A transparent newtype over `u32`; convert in either direction with `From`/`Into`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[repr(transparent)]
pub struct InternedAttrKey(pub(crate) u32);

/// Wraps a raw `u32` as an interned key; no validation is performed.
impl From<u32> for InternedAttrKey {
    fn from(raw: u32) -> Self {
        Self(raw)
    }
}

/// Recovers the raw `u32` held by an interned key.
impl From<InternedAttrKey> for u32 {
    fn from(key: InternedAttrKey) -> Self {
        let InternedAttrKey(raw) = key;
        raw
    }
}
100
101impl Encode for InternedAttrKey {
102    fn encode<W: encode::Write>(&self, e: &mut Encoder<W>) -> Result<(), encode::Error<W::Error>> {
103        e.u32(self.0)?;
104        Ok(())
105    }
106}
107
108impl<'b> Decode<'b> for InternedAttrKey {
109    fn decode(d: &mut Decoder<'b>) -> Result<Self, decode::Error> {
110        let i = d.u32()?;
111        Ok(i.into())
112    }
113}
114
115/// A way to bundle together attr kvs for transport purposes.  The 'u32' is meant to represent an
116/// attr key, where the name->number mapping is defined elsewhere in the protocol.
117///
118/// These are encoded in cbor in a 'reasonably compact' way: an array of alternating u32 key and
119/// AttrVals.
120#[derive(Debug)]
121pub struct PackedAttrKvs<K: Into<u32> + Copy + std::fmt::Debug>(pub Vec<(K, AttrVal)>);
122
123impl<K: Into<u32> + Copy + std::fmt::Debug> Encode for PackedAttrKvs<K> {
124    fn encode<W: encode::Write>(&self, e: &mut Encoder<W>) -> Result<(), encode::Error<W::Error>> {
125        e.array((self.0.len() * 2) as u64)?;
126        for (k, v) in self.0.iter() {
127            e.u32((*k).into())?;
128            v.encode(e)?;
129        }
130
131        Ok(())
132    }
133}