1use flatbuffers::{FlatBufferBuilder, InvalidFlatbuffer};
4use thiserror::Error;
5
6#[allow(missing_docs)]
8#[allow(warnings)]
9#[rustfmt::skip]
10pub mod fbs;
11pub mod uri;
13
14use crate::fbs::atlas::{protocol as fb, uri as fb_uri};
15use crate::uri::Uri;
16
17pub type AtlasId = u64;
19
20const ATLAS_IDENTIFIER: &str = "ATLS";
21
22#[derive(Clone, Debug, PartialEq, Eq)]
24pub enum Message {
25 GetRequest {
27 request_id: u64,
29 uri: Uri,
31 reply_channel: u64,
33 },
34 InsertRequest {
36 request_id: u64,
38 uri: Uri,
40 id: AtlasId,
42 reply_channel: u64,
44 },
45 RemoveRequest {
47 request_id: u64,
49 uri: Uri,
51 reply_channel: u64,
53 },
54 LookupRequest {
56 request_id: u64,
58 pattern: String,
60 reply_channel: u64,
62 },
63 ResponseGet {
65 request_id: u64,
67 id: AtlasId,
69 found: bool,
71 },
72 ResponseRemove {
74 request_id: u64,
76 id: AtlasId,
78 found: bool,
80 },
81 ResponseLookup {
83 request_id: u64,
85 ids: Vec<AtlasId>,
87 },
88 ResponseOk {
90 request_id: u64,
92 },
93 ResponseError {
95 request_id: u64,
97 message: String,
99 },
100}
101
102#[derive(Debug, Error)]
104pub enum ProtocolError {
105 #[error("invalid flatbuffer: {0:?}")]
107 InvalidFlatbuffer(InvalidFlatbuffer),
108 #[error("atlas message missing payload")]
110 MissingPayload,
111 #[error("unknown atlas payload type")]
113 UnknownPayload,
114 #[error("invalid atlas message identifier")]
116 InvalidIdentifier,
117}
118
119impl From<InvalidFlatbuffer> for ProtocolError {
120 fn from(value: InvalidFlatbuffer) -> Self {
121 ProtocolError::InvalidFlatbuffer(value)
122 }
123}
124
125impl Message {
126 pub fn request_id(&self) -> u64 {
128 match self {
129 Message::GetRequest { request_id, .. }
130 | Message::InsertRequest { request_id, .. }
131 | Message::RemoveRequest { request_id, .. }
132 | Message::LookupRequest { request_id, .. }
133 | Message::ResponseGet { request_id, .. }
134 | Message::ResponseRemove { request_id, .. }
135 | Message::ResponseLookup { request_id, .. }
136 | Message::ResponseOk { request_id }
137 | Message::ResponseError { request_id, .. } => *request_id,
138 }
139 }
140}
141
142pub fn encode_message(message: &Message) -> Result<Vec<u8>, ProtocolError> {
144 let mut builder = FlatBufferBuilder::new();
145 let (request_id, payload_type, payload) = match message {
146 Message::GetRequest {
147 request_id,
148 uri,
149 reply_channel,
150 } => {
151 let uri = encode_uri(&mut builder, uri);
152 let payload = fb::GetRequest::create(
153 &mut builder,
154 &fb::GetRequestArgs {
155 uri: Some(uri),
156 reply_channel: *reply_channel,
157 },
158 );
159 (
160 *request_id,
161 fb::AtlasPayload::GetRequest,
162 Some(payload.as_union_value()),
163 )
164 }
165 Message::InsertRequest {
166 request_id,
167 uri,
168 id,
169 reply_channel,
170 } => {
171 let uri = encode_uri(&mut builder, uri);
172 let payload = fb::InsertRequest::create(
173 &mut builder,
174 &fb::InsertRequestArgs {
175 uri: Some(uri),
176 id: *id,
177 reply_channel: *reply_channel,
178 },
179 );
180 (
181 *request_id,
182 fb::AtlasPayload::InsertRequest,
183 Some(payload.as_union_value()),
184 )
185 }
186 Message::RemoveRequest {
187 request_id,
188 uri,
189 reply_channel,
190 } => {
191 let uri = encode_uri(&mut builder, uri);
192 let payload = fb::RemoveRequest::create(
193 &mut builder,
194 &fb::RemoveRequestArgs {
195 uri: Some(uri),
196 reply_channel: *reply_channel,
197 },
198 );
199 (
200 *request_id,
201 fb::AtlasPayload::RemoveRequest,
202 Some(payload.as_union_value()),
203 )
204 }
205 Message::LookupRequest {
206 request_id,
207 pattern,
208 reply_channel,
209 } => {
210 let pattern = builder.create_string(pattern);
211 let payload = fb::LookupRequest::create(
212 &mut builder,
213 &fb::LookupRequestArgs {
214 pattern: Some(pattern),
215 reply_channel: *reply_channel,
216 },
217 );
218 (
219 *request_id,
220 fb::AtlasPayload::LookupRequest,
221 Some(payload.as_union_value()),
222 )
223 }
224 Message::ResponseGet {
225 request_id,
226 id,
227 found,
228 } => {
229 let payload = fb::GetResponse::create(
230 &mut builder,
231 &fb::GetResponseArgs {
232 id: *id,
233 found: *found,
234 },
235 );
236 (
237 *request_id,
238 fb::AtlasPayload::GetResponse,
239 Some(payload.as_union_value()),
240 )
241 }
242 Message::ResponseRemove {
243 request_id,
244 id,
245 found,
246 } => {
247 let payload = fb::RemoveResponse::create(
248 &mut builder,
249 &fb::RemoveResponseArgs {
250 id: *id,
251 found: *found,
252 },
253 );
254 (
255 *request_id,
256 fb::AtlasPayload::RemoveResponse,
257 Some(payload.as_union_value()),
258 )
259 }
260 Message::ResponseLookup { request_id, ids } => {
261 let ids = builder.create_vector(ids);
262 let payload = fb::LookupResponse::create(
263 &mut builder,
264 &fb::LookupResponseArgs { ids: Some(ids) },
265 );
266 (
267 *request_id,
268 fb::AtlasPayload::LookupResponse,
269 Some(payload.as_union_value()),
270 )
271 }
272 Message::ResponseOk { request_id } => {
273 let payload = fb::OkResponse::create(&mut builder, &fb::OkResponseArgs {});
274 (
275 *request_id,
276 fb::AtlasPayload::OkResponse,
277 Some(payload.as_union_value()),
278 )
279 }
280 Message::ResponseError {
281 request_id,
282 message,
283 } => {
284 let message = builder.create_string(message);
285 let payload = fb::ErrorResponse::create(
286 &mut builder,
287 &fb::ErrorResponseArgs {
288 message: Some(message),
289 },
290 );
291 (
292 *request_id,
293 fb::AtlasPayload::ErrorResponse,
294 Some(payload.as_union_value()),
295 )
296 }
297 };
298
299 let message = fb::AtlasMessage::create(
300 &mut builder,
301 &fb::AtlasMessageArgs {
302 request_id,
303 payload_type,
304 payload,
305 },
306 );
307 builder.finish(message, Some(ATLAS_IDENTIFIER));
308 Ok(builder.finished_data().to_vec())
309}
310
311pub fn decode_message(bytes: &[u8]) -> Result<Message, ProtocolError> {
313 if !fb::atlas_message_buffer_has_identifier(bytes) {
314 return Err(ProtocolError::InvalidIdentifier);
315 }
316 let message = flatbuffers::root::<fb::AtlasMessage>(bytes)?;
317
318 match message.payload_type() {
319 fb::AtlasPayload::GetRequest => {
320 let req = message
321 .payload_as_get_request()
322 .ok_or(ProtocolError::MissingPayload)?;
323 let uri = decode_uri(req.uri().ok_or(ProtocolError::MissingPayload)?);
324 Ok(Message::GetRequest {
325 request_id: message.request_id(),
326 uri,
327 reply_channel: req.reply_channel(),
328 })
329 }
330 fb::AtlasPayload::InsertRequest => {
331 let req = message
332 .payload_as_insert_request()
333 .ok_or(ProtocolError::MissingPayload)?;
334 let uri = decode_uri(req.uri().ok_or(ProtocolError::MissingPayload)?);
335 Ok(Message::InsertRequest {
336 request_id: message.request_id(),
337 uri,
338 id: req.id(),
339 reply_channel: req.reply_channel(),
340 })
341 }
342 fb::AtlasPayload::RemoveRequest => {
343 let req = message
344 .payload_as_remove_request()
345 .ok_or(ProtocolError::MissingPayload)?;
346 let uri = decode_uri(req.uri().ok_or(ProtocolError::MissingPayload)?);
347 Ok(Message::RemoveRequest {
348 request_id: message.request_id(),
349 uri,
350 reply_channel: req.reply_channel(),
351 })
352 }
353 fb::AtlasPayload::LookupRequest => {
354 let req = message
355 .payload_as_lookup_request()
356 .ok_or(ProtocolError::MissingPayload)?;
357 let pattern = req
358 .pattern()
359 .ok_or(ProtocolError::MissingPayload)?
360 .to_string();
361 Ok(Message::LookupRequest {
362 request_id: message.request_id(),
363 pattern,
364 reply_channel: req.reply_channel(),
365 })
366 }
367 fb::AtlasPayload::GetResponse => {
368 let resp = message
369 .payload_as_get_response()
370 .ok_or(ProtocolError::MissingPayload)?;
371 Ok(Message::ResponseGet {
372 request_id: message.request_id(),
373 id: resp.id(),
374 found: resp.found(),
375 })
376 }
377 fb::AtlasPayload::RemoveResponse => {
378 let resp = message
379 .payload_as_remove_response()
380 .ok_or(ProtocolError::MissingPayload)?;
381 Ok(Message::ResponseRemove {
382 request_id: message.request_id(),
383 id: resp.id(),
384 found: resp.found(),
385 })
386 }
387 fb::AtlasPayload::LookupResponse => {
388 let resp = message
389 .payload_as_lookup_response()
390 .ok_or(ProtocolError::MissingPayload)?;
391 Ok(Message::ResponseLookup {
392 request_id: message.request_id(),
393 ids: decode_ids(resp.ids()),
394 })
395 }
396 fb::AtlasPayload::OkResponse => Ok(Message::ResponseOk {
397 request_id: message.request_id(),
398 }),
399 fb::AtlasPayload::ErrorResponse => {
400 let resp = message
401 .payload_as_error_response()
402 .ok_or(ProtocolError::MissingPayload)?;
403 let error_message = resp
404 .message()
405 .ok_or(ProtocolError::MissingPayload)?
406 .to_string();
407 Ok(Message::ResponseError {
408 request_id: message.request_id(),
409 message: error_message,
410 })
411 }
412 _ => Err(ProtocolError::UnknownPayload),
413 }
414}
415
416fn encode_uri<'bldr>(
417 builder: &mut FlatBufferBuilder<'bldr>,
418 uri: &Uri,
419) -> flatbuffers::WIPOffset<fb_uri::Uri<'bldr>> {
420 uri.write_flatbuffer(builder)
421}
422
423fn decode_uri(table: fb_uri::Uri<'_>) -> Uri {
424 Uri::from_flatbuffer_table(table)
425}
426
427fn decode_ids(ids: Option<flatbuffers::Vector<'_, AtlasId>>) -> Vec<AtlasId> {
428 let mut out = Vec::new();
429 if let Some(vec) = ids {
430 out.extend(vec.iter());
431 }
432 out
433}