Skip to main content

selium_atlas_protocol/
lib.rs

1//! Flatbuffers protocol helpers for the Atlas control plane.
2
3use flatbuffers::{FlatBufferBuilder, InvalidFlatbuffer};
4use thiserror::Error;
5
6/// Generated Flatbuffers bindings for the Atlas protocol.
7#[allow(missing_docs)]
8#[allow(warnings)]
9#[rustfmt::skip]
10pub mod fbs;
11/// URI parsing and Flatbuffers helpers.
12pub mod uri;
13
14use crate::fbs::atlas::{protocol as fb, uri as fb_uri};
15use crate::uri::Uri;
16
/// Identifier type stored against a URI in the Atlas (a plain `u64`).
pub type AtlasId = u64;

/// Flatbuffers file identifier that `encode_message` writes into every finished buffer.
const ATLAS_IDENTIFIER: &str = "ATLS";
21
/// Atlas protocol message envelope.
///
/// Every variant carries a `request_id`; the request variants additionally carry a
/// `reply_channel`. Values are converted to and from Flatbuffers bytes by `encode_message`
/// and `decode_message`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Message {
    /// Retrieve a single Atlas entry by URI.
    GetRequest {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// URI for the lookup.
        uri: Uri,
        /// Shared handle for the response channel.
        reply_channel: u64,
    },
    /// Insert or update an Atlas entry.
    InsertRequest {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// URI to insert.
        uri: Uri,
        /// Value to store against the URI.
        id: AtlasId,
        /// Shared handle for the response channel.
        reply_channel: u64,
    },
    /// Remove an Atlas entry by URI.
    RemoveRequest {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// URI for the removal.
        uri: Uri,
        /// Shared handle for the response channel.
        reply_channel: u64,
    },
    /// Lookup Atlas entries matching the supplied URI pattern.
    LookupRequest {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// Pattern used to match stored URIs.
        // NOTE(review): the pattern syntax is not defined in this file; it is carried as an
        // opaque string here.
        pattern: String,
        /// Shared handle for the response channel.
        reply_channel: u64,
    },
    /// Response to a get request.
    ResponseGet {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// Stored id, if found.
        // NOTE(review): presumably only meaningful when `found` is `true`; the value used
        // when nothing was found is decided by the Atlas service — confirm there.
        id: AtlasId,
        /// Whether the entry existed.
        found: bool,
    },
    /// Response to a remove request.
    ResponseRemove {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// Removed id, if found.
        // NOTE(review): presumably only meaningful when `found` is `true` — confirm against
        // the Atlas service.
        id: AtlasId,
        /// Whether the entry existed.
        found: bool,
    },
    /// Response to a lookup request.
    ResponseLookup {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// Matching Atlas identifiers.
        ids: Vec<AtlasId>,
    },
    /// Empty response acknowledging a request.
    ResponseOk {
        /// Correlation identifier supplied by the client.
        request_id: u64,
    },
    /// Error response for a request.
    ResponseError {
        /// Correlation identifier supplied by the client.
        request_id: u64,
        /// Error message supplied by the Atlas service.
        message: String,
    },
}
101
/// Errors produced while encoding or decoding Atlas messages.
///
/// In this file only `decode_message` constructs these values; `encode_message` shares the
/// error type in its signature.
#[derive(Debug, Error)]
pub enum ProtocolError {
    /// Flatbuffers payload failed to verify.
    ///
    /// The wrapped error is rendered with its `Debug` representation.
    #[error("invalid flatbuffer: {0:?}")]
    InvalidFlatbuffer(InvalidFlatbuffer),
    /// Message payload was not present.
    ///
    /// Also returned by `decode_message` when a field it needs inside the payload (a URI,
    /// a pattern or an error message) is absent.
    #[error("atlas message missing payload")]
    MissingPayload,
    /// Message payload type is unsupported.
    ///
    /// Returned when the payload type tag matches none of the variants `decode_message` handles.
    #[error("unknown atlas payload type")]
    UnknownPayload,
    /// Atlas message identifier did not match.
    ///
    /// Returned when the buffer does not carry the expected Atlas file identifier.
    #[error("invalid atlas message identifier")]
    InvalidIdentifier,
}
118
119impl From<InvalidFlatbuffer> for ProtocolError {
120    fn from(value: InvalidFlatbuffer) -> Self {
121        ProtocolError::InvalidFlatbuffer(value)
122    }
123}
124
125impl Message {
126    /// Return the request identifier associated with this message.
127    pub fn request_id(&self) -> u64 {
128        match self {
129            Message::GetRequest { request_id, .. }
130            | Message::InsertRequest { request_id, .. }
131            | Message::RemoveRequest { request_id, .. }
132            | Message::LookupRequest { request_id, .. }
133            | Message::ResponseGet { request_id, .. }
134            | Message::ResponseRemove { request_id, .. }
135            | Message::ResponseLookup { request_id, .. }
136            | Message::ResponseOk { request_id }
137            | Message::ResponseError { request_id, .. } => *request_id,
138        }
139    }
140}
141
142/// Encode an Atlas message to Flatbuffers bytes.
143pub fn encode_message(message: &Message) -> Result<Vec<u8>, ProtocolError> {
144    let mut builder = FlatBufferBuilder::new();
145    let (request_id, payload_type, payload) = match message {
146        Message::GetRequest {
147            request_id,
148            uri,
149            reply_channel,
150        } => {
151            let uri = encode_uri(&mut builder, uri);
152            let payload = fb::GetRequest::create(
153                &mut builder,
154                &fb::GetRequestArgs {
155                    uri: Some(uri),
156                    reply_channel: *reply_channel,
157                },
158            );
159            (
160                *request_id,
161                fb::AtlasPayload::GetRequest,
162                Some(payload.as_union_value()),
163            )
164        }
165        Message::InsertRequest {
166            request_id,
167            uri,
168            id,
169            reply_channel,
170        } => {
171            let uri = encode_uri(&mut builder, uri);
172            let payload = fb::InsertRequest::create(
173                &mut builder,
174                &fb::InsertRequestArgs {
175                    uri: Some(uri),
176                    id: *id,
177                    reply_channel: *reply_channel,
178                },
179            );
180            (
181                *request_id,
182                fb::AtlasPayload::InsertRequest,
183                Some(payload.as_union_value()),
184            )
185        }
186        Message::RemoveRequest {
187            request_id,
188            uri,
189            reply_channel,
190        } => {
191            let uri = encode_uri(&mut builder, uri);
192            let payload = fb::RemoveRequest::create(
193                &mut builder,
194                &fb::RemoveRequestArgs {
195                    uri: Some(uri),
196                    reply_channel: *reply_channel,
197                },
198            );
199            (
200                *request_id,
201                fb::AtlasPayload::RemoveRequest,
202                Some(payload.as_union_value()),
203            )
204        }
205        Message::LookupRequest {
206            request_id,
207            pattern,
208            reply_channel,
209        } => {
210            let pattern = builder.create_string(pattern);
211            let payload = fb::LookupRequest::create(
212                &mut builder,
213                &fb::LookupRequestArgs {
214                    pattern: Some(pattern),
215                    reply_channel: *reply_channel,
216                },
217            );
218            (
219                *request_id,
220                fb::AtlasPayload::LookupRequest,
221                Some(payload.as_union_value()),
222            )
223        }
224        Message::ResponseGet {
225            request_id,
226            id,
227            found,
228        } => {
229            let payload = fb::GetResponse::create(
230                &mut builder,
231                &fb::GetResponseArgs {
232                    id: *id,
233                    found: *found,
234                },
235            );
236            (
237                *request_id,
238                fb::AtlasPayload::GetResponse,
239                Some(payload.as_union_value()),
240            )
241        }
242        Message::ResponseRemove {
243            request_id,
244            id,
245            found,
246        } => {
247            let payload = fb::RemoveResponse::create(
248                &mut builder,
249                &fb::RemoveResponseArgs {
250                    id: *id,
251                    found: *found,
252                },
253            );
254            (
255                *request_id,
256                fb::AtlasPayload::RemoveResponse,
257                Some(payload.as_union_value()),
258            )
259        }
260        Message::ResponseLookup { request_id, ids } => {
261            let ids = builder.create_vector(ids);
262            let payload = fb::LookupResponse::create(
263                &mut builder,
264                &fb::LookupResponseArgs { ids: Some(ids) },
265            );
266            (
267                *request_id,
268                fb::AtlasPayload::LookupResponse,
269                Some(payload.as_union_value()),
270            )
271        }
272        Message::ResponseOk { request_id } => {
273            let payload = fb::OkResponse::create(&mut builder, &fb::OkResponseArgs {});
274            (
275                *request_id,
276                fb::AtlasPayload::OkResponse,
277                Some(payload.as_union_value()),
278            )
279        }
280        Message::ResponseError {
281            request_id,
282            message,
283        } => {
284            let message = builder.create_string(message);
285            let payload = fb::ErrorResponse::create(
286                &mut builder,
287                &fb::ErrorResponseArgs {
288                    message: Some(message),
289                },
290            );
291            (
292                *request_id,
293                fb::AtlasPayload::ErrorResponse,
294                Some(payload.as_union_value()),
295            )
296        }
297    };
298
299    let message = fb::AtlasMessage::create(
300        &mut builder,
301        &fb::AtlasMessageArgs {
302            request_id,
303            payload_type,
304            payload,
305        },
306    );
307    builder.finish(message, Some(ATLAS_IDENTIFIER));
308    Ok(builder.finished_data().to_vec())
309}
310
311/// Decode Flatbuffers bytes into an Atlas message.
312pub fn decode_message(bytes: &[u8]) -> Result<Message, ProtocolError> {
313    if !fb::atlas_message_buffer_has_identifier(bytes) {
314        return Err(ProtocolError::InvalidIdentifier);
315    }
316    let message = flatbuffers::root::<fb::AtlasMessage>(bytes)?;
317
318    match message.payload_type() {
319        fb::AtlasPayload::GetRequest => {
320            let req = message
321                .payload_as_get_request()
322                .ok_or(ProtocolError::MissingPayload)?;
323            let uri = decode_uri(req.uri().ok_or(ProtocolError::MissingPayload)?);
324            Ok(Message::GetRequest {
325                request_id: message.request_id(),
326                uri,
327                reply_channel: req.reply_channel(),
328            })
329        }
330        fb::AtlasPayload::InsertRequest => {
331            let req = message
332                .payload_as_insert_request()
333                .ok_or(ProtocolError::MissingPayload)?;
334            let uri = decode_uri(req.uri().ok_or(ProtocolError::MissingPayload)?);
335            Ok(Message::InsertRequest {
336                request_id: message.request_id(),
337                uri,
338                id: req.id(),
339                reply_channel: req.reply_channel(),
340            })
341        }
342        fb::AtlasPayload::RemoveRequest => {
343            let req = message
344                .payload_as_remove_request()
345                .ok_or(ProtocolError::MissingPayload)?;
346            let uri = decode_uri(req.uri().ok_or(ProtocolError::MissingPayload)?);
347            Ok(Message::RemoveRequest {
348                request_id: message.request_id(),
349                uri,
350                reply_channel: req.reply_channel(),
351            })
352        }
353        fb::AtlasPayload::LookupRequest => {
354            let req = message
355                .payload_as_lookup_request()
356                .ok_or(ProtocolError::MissingPayload)?;
357            let pattern = req
358                .pattern()
359                .ok_or(ProtocolError::MissingPayload)?
360                .to_string();
361            Ok(Message::LookupRequest {
362                request_id: message.request_id(),
363                pattern,
364                reply_channel: req.reply_channel(),
365            })
366        }
367        fb::AtlasPayload::GetResponse => {
368            let resp = message
369                .payload_as_get_response()
370                .ok_or(ProtocolError::MissingPayload)?;
371            Ok(Message::ResponseGet {
372                request_id: message.request_id(),
373                id: resp.id(),
374                found: resp.found(),
375            })
376        }
377        fb::AtlasPayload::RemoveResponse => {
378            let resp = message
379                .payload_as_remove_response()
380                .ok_or(ProtocolError::MissingPayload)?;
381            Ok(Message::ResponseRemove {
382                request_id: message.request_id(),
383                id: resp.id(),
384                found: resp.found(),
385            })
386        }
387        fb::AtlasPayload::LookupResponse => {
388            let resp = message
389                .payload_as_lookup_response()
390                .ok_or(ProtocolError::MissingPayload)?;
391            Ok(Message::ResponseLookup {
392                request_id: message.request_id(),
393                ids: decode_ids(resp.ids()),
394            })
395        }
396        fb::AtlasPayload::OkResponse => Ok(Message::ResponseOk {
397            request_id: message.request_id(),
398        }),
399        fb::AtlasPayload::ErrorResponse => {
400            let resp = message
401                .payload_as_error_response()
402                .ok_or(ProtocolError::MissingPayload)?;
403            let error_message = resp
404                .message()
405                .ok_or(ProtocolError::MissingPayload)?
406                .to_string();
407            Ok(Message::ResponseError {
408                request_id: message.request_id(),
409                message: error_message,
410            })
411        }
412        _ => Err(ProtocolError::UnknownPayload),
413    }
414}
415
416fn encode_uri<'bldr>(
417    builder: &mut FlatBufferBuilder<'bldr>,
418    uri: &Uri,
419) -> flatbuffers::WIPOffset<fb_uri::Uri<'bldr>> {
420    uri.write_flatbuffer(builder)
421}
422
423fn decode_uri(table: fb_uri::Uri<'_>) -> Uri {
424    Uri::from_flatbuffer_table(table)
425}
426
427fn decode_ids(ids: Option<flatbuffers::Vector<'_, AtlasId>>) -> Vec<AtlasId> {
428    let mut out = Vec::new();
429    if let Some(vec) = ids {
430        out.extend(vec.iter());
431    }
432    out
433}