gel_protocol/
server_message.rs

1/*!
2The [ServerMessage] enum and related types. Gel website documentation on messages [here](https://www.edgedb.com/docs/reference/protocol/messages).
3
4```rust,ignore
5pub enum ServerMessage {
6    ServerHandshake(ServerHandshake),
7    UnknownMessage(u8, Bytes),
8    LogMessage(LogMessage),
9    ErrorResponse(ErrorResponse),
10    Authentication(Authentication),
11    ReadyForCommand(ReadyForCommand),
12    ServerKeyData(ServerKeyData),
13    ParameterStatus(ParameterStatus),
14    CommandComplete0(CommandComplete0),
15    CommandComplete1(CommandComplete1),
16    PrepareComplete(PrepareComplete),
17    CommandDataDescription0(CommandDataDescription0), // protocol < 1.0
18    CommandDataDescription1(CommandDataDescription1), // protocol >= 1.0
19    StateDataDescription(StateDataDescription),
20    Data(Data),
21    RestoreReady(RestoreReady),
22    DumpHeader(RawPacket),
23    DumpBlock(RawPacket),
24}
25```
26*/
27
28use std::collections::HashMap;
29use std::convert::{TryFrom, TryInto};
30
31use bytes::{Buf, BufMut, Bytes};
32use snafu::{ensure, OptionExt};
33use uuid::Uuid;
34
35use crate::common::Capabilities;
36pub use crate::common::{Cardinality, RawTypedesc, State};
37use crate::descriptors::Typedesc;
38use crate::encoding::{Annotations, Decode, Encode, Input, KeyValues, Output};
39use crate::errors::{self, DecodeError, EncodeError};
40use crate::features::ProtocolVersion;
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[non_exhaustive]
44pub enum ServerMessage {
45    Authentication(Authentication),
46    CommandComplete0(CommandComplete0),
47    CommandComplete1(CommandComplete1),
48    CommandDataDescription0(CommandDataDescription0), // protocol < 1.0
49    CommandDataDescription1(CommandDataDescription1), // protocol >= 1.0
50    StateDataDescription(StateDataDescription),
51    Data(Data),
52    // Don't decode Dump packets here as we only need to process them as
53    // whole
54    DumpHeader(RawPacket),
55    DumpBlock(RawPacket),
56    ErrorResponse(ErrorResponse),
57    LogMessage(LogMessage),
58    ParameterStatus(ParameterStatus),
59    ReadyForCommand(ReadyForCommand),
60    RestoreReady(RestoreReady),
61    ServerHandshake(ServerHandshake),
62    ServerKeyData(ServerKeyData),
63    UnknownMessage(u8, Bytes),
64    PrepareComplete(PrepareComplete),
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct ReadyForCommand {
69    pub headers: KeyValues,
70    pub transaction_state: TransactionState,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum Authentication {
75    Ok,
76    Sasl { methods: Vec<String> },
77    SaslContinue { data: Bytes },
78    SaslFinal { data: Bytes },
79}
80
/// Severity byte of an [`ErrorResponse`].
///
/// The numeric mapping lives in `ErrorSeverity::from_u8` / `to_u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorSeverity {
    Error,
    Fatal,
    Panic,
    /// Any severity byte that has no named variant; the raw byte is kept.
    Unknown(u8),
}
88
/// Severity byte of a [`LogMessage`].
///
/// The numeric mapping lives in `MessageSeverity::from_u8` / `to_u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageSeverity {
    Debug,
    Info,
    Notice,
    Warning,
    /// Any severity byte that has no named variant; the raw byte is kept.
    Unknown(u8),
}
97
/// Transaction state carried by [`ReadyForCommand`].
///
/// Each discriminant is the byte written to / read from the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// Not in a transaction block.
    NotInTransaction = 0x49,

    /// In a transaction block.
    InTransaction = 0x54,

    /// In a failed transaction block
    /// (commands will be rejected until the block is ended).
    InFailedTransaction = 0x45,
}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct ErrorResponse {
113    pub severity: ErrorSeverity,
114    pub code: u32,
115    pub message: String,
116    pub attributes: KeyValues,
117}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct LogMessage {
121    pub severity: MessageSeverity,
122    pub code: u32,
123    pub text: String,
124    pub attributes: KeyValues,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct ServerHandshake {
129    pub major_ver: u16,
130    pub minor_ver: u16,
131    pub extensions: HashMap<String, KeyValues>,
132}
133
/// Payload of the `ServerKeyData` message: exactly 32 raw bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerKeyData {
    /// The 32 key bytes as they appear on the wire.
    pub data: [u8; 32],
}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
140pub struct ParameterStatus {
141    pub proto: ProtocolVersion,
142    pub name: Bytes,
143    pub value: Bytes,
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct CommandComplete0 {
148    pub headers: KeyValues,
149    pub status_data: Bytes,
150}
151
152#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct CommandComplete1 {
154    pub annotations: Annotations,
155    pub capabilities: Capabilities,
156    pub status_data: Bytes,
157    pub state: Option<State>,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct PrepareComplete {
162    pub headers: KeyValues,
163    pub cardinality: Cardinality,
164    pub input_typedesc_id: Uuid,
165    pub output_typedesc_id: Uuid,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct ParseComplete {
170    pub headers: KeyValues,
171    pub cardinality: Cardinality,
172    pub input_typedesc_id: Uuid,
173    pub output_typedesc_id: Uuid,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct CommandDataDescription0 {
178    pub headers: KeyValues,
179    pub result_cardinality: Cardinality,
180    pub input: RawTypedesc,
181    pub output: RawTypedesc,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq)]
185pub struct CommandDataDescription1 {
186    pub annotations: Annotations,
187    pub capabilities: Capabilities,
188    pub result_cardinality: Cardinality,
189    pub input: RawTypedesc,
190    pub output: RawTypedesc,
191}
192
193#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct StateDataDescription {
195    pub typedesc: RawTypedesc,
196}
197
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub struct Data {
200    pub data: Vec<Bytes>,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct RestoreReady {
205    pub headers: KeyValues,
206    pub jobs: u16,
207}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct RawPacket {
211    pub data: Bytes,
212}
213
214fn encode<T: Encode>(buf: &mut Output, code: u8, msg: &T) -> Result<(), EncodeError> {
215    buf.reserve(5);
216    buf.put_u8(code);
217    let base = buf.len();
218    buf.put_slice(&[0; 4]);
219
220    msg.encode(buf)?;
221
222    let size = u32::try_from(buf.len() - base)
223        .ok()
224        .context(errors::MessageTooLong)?;
225    buf[base..base + 4].copy_from_slice(&size.to_be_bytes()[..]);
226    Ok(())
227}
228
229impl CommandDataDescription0 {
230    pub fn output(&self) -> Result<Typedesc, DecodeError> {
231        self.output.decode()
232    }
233    pub fn input(&self) -> Result<Typedesc, DecodeError> {
234        self.input.decode()
235    }
236}
237
238impl CommandDataDescription1 {
239    pub fn output(&self) -> Result<Typedesc, DecodeError> {
240        self.output.decode()
241    }
242    pub fn input(&self) -> Result<Typedesc, DecodeError> {
243        self.input.decode()
244    }
245}
246
247impl From<CommandDataDescription0> for CommandDataDescription1 {
248    fn from(value: CommandDataDescription0) -> Self {
249        Self {
250            annotations: HashMap::new(),
251            capabilities: decode_capabilities0(&value.headers).unwrap_or(Capabilities::ALL),
252            result_cardinality: value.result_cardinality,
253            input: value.input,
254            output: value.output,
255        }
256    }
257}
258
259impl StateDataDescription {
260    pub fn parse(self) -> Result<Typedesc, DecodeError> {
261        self.typedesc.decode()
262    }
263}
264
265impl ParameterStatus {
266    pub fn parse_system_config(self) -> Result<(Typedesc, Bytes), DecodeError> {
267        let cur = &mut Input::new(self.proto.clone(), self.value);
268        let typedesc_data = Bytes::decode(cur)?;
269        let data = Bytes::decode(cur)?;
270
271        let typedesc_buf = &mut Input::new(self.proto, typedesc_data);
272        let typedesc_id = Uuid::decode(typedesc_buf)?;
273        let typedesc = Typedesc::decode_with_id(typedesc_id, typedesc_buf)?;
274        Ok((typedesc, data))
275    }
276}
277
278impl ServerMessage {
279    pub fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
280        use ServerMessage::*;
281        match self {
282            ServerHandshake(h) => encode(buf, 0x76, h),
283            ErrorResponse(h) => encode(buf, 0x45, h),
284            LogMessage(h) => encode(buf, 0x4c, h),
285            Authentication(h) => encode(buf, 0x52, h),
286            ReadyForCommand(h) => encode(buf, 0x5a, h),
287            ServerKeyData(h) => encode(buf, 0x4b, h),
288            ParameterStatus(h) => encode(buf, 0x53, h),
289            CommandComplete0(h) => encode(buf, 0x43, h),
290            CommandComplete1(h) => encode(buf, 0x43, h),
291            PrepareComplete(h) => encode(buf, 0x31, h),
292            CommandDataDescription0(h) => encode(buf, 0x54, h),
293            CommandDataDescription1(h) => encode(buf, 0x54, h),
294            StateDataDescription(h) => encode(buf, 0x73, h),
295            Data(h) => encode(buf, 0x44, h),
296            RestoreReady(h) => encode(buf, 0x2b, h),
297            DumpHeader(h) => encode(buf, 0x40, h),
298            DumpBlock(h) => encode(buf, 0x3d, h),
299
300            UnknownMessage(_, _) => errors::UnknownMessageCantBeEncoded.fail()?,
301        }
302    }
303    /// Decode exactly one frame from the buffer.
304    ///
305    /// This expects a full frame to already be in the buffer. It can return
306    /// an arbitrary error or be silent if a message is only partially present
307    /// in the buffer or if extra data is present.
308    pub fn decode(buf: &mut Input) -> Result<ServerMessage, DecodeError> {
309        use self::ServerMessage as M;
310        let data = &mut buf.slice(5..);
311        let result = match buf[0] {
312            0x76 => ServerHandshake::decode(data).map(M::ServerHandshake)?,
313            0x45 => ErrorResponse::decode(data).map(M::ErrorResponse)?,
314            0x4c => LogMessage::decode(data).map(M::LogMessage)?,
315            0x52 => Authentication::decode(data).map(M::Authentication)?,
316            0x5a => ReadyForCommand::decode(data).map(M::ReadyForCommand)?,
317            0x4b => ServerKeyData::decode(data).map(M::ServerKeyData)?,
318            0x53 => ParameterStatus::decode(data).map(M::ParameterStatus)?,
319            0x43 => {
320                if buf.proto().is_1() {
321                    CommandComplete1::decode(data).map(M::CommandComplete1)?
322                } else {
323                    CommandComplete0::decode(data).map(M::CommandComplete0)?
324                }
325            }
326            0x31 => PrepareComplete::decode(data).map(M::PrepareComplete)?,
327            0x44 => Data::decode(data).map(M::Data)?,
328            0x2b => RestoreReady::decode(data).map(M::RestoreReady)?,
329            0x40 => RawPacket::decode(data).map(M::DumpHeader)?,
330            0x3d => RawPacket::decode(data).map(M::DumpBlock)?,
331            0x54 => {
332                if buf.proto().is_1() {
333                    CommandDataDescription1::decode(data).map(M::CommandDataDescription1)?
334                } else {
335                    CommandDataDescription0::decode(data).map(M::CommandDataDescription0)?
336                }
337            }
338            0x73 => StateDataDescription::decode(data).map(M::StateDataDescription)?,
339            code => M::UnknownMessage(code, data.copy_to_bytes(data.remaining())),
340        };
341        ensure!(data.remaining() == 0, errors::ExtraData);
342        Ok(result)
343    }
344}
345
346impl Encode for ServerHandshake {
347    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
348        buf.reserve(6);
349        buf.put_u16(self.major_ver);
350        buf.put_u16(self.minor_ver);
351        buf.put_u16(
352            u16::try_from(self.extensions.len())
353                .ok()
354                .context(errors::TooManyExtensions)?,
355        );
356        for (name, headers) in &self.extensions {
357            name.encode(buf)?;
358            buf.reserve(2);
359            buf.put_u16(
360                u16::try_from(headers.len())
361                    .ok()
362                    .context(errors::TooManyHeaders)?,
363            );
364            for (&name, value) in headers {
365                buf.reserve(2);
366                buf.put_u16(name);
367                value.encode(buf)?;
368            }
369        }
370        Ok(())
371    }
372}
373
374impl Decode for ServerHandshake {
375    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
376        ensure!(buf.remaining() >= 6, errors::Underflow);
377        let major_ver = buf.get_u16();
378        let minor_ver = buf.get_u16();
379        let num_ext = buf.get_u16();
380        let mut extensions = HashMap::new();
381        for _ in 0..num_ext {
382            let name = String::decode(buf)?;
383            ensure!(buf.remaining() >= 2, errors::Underflow);
384            let num_headers = buf.get_u16();
385            let mut headers = HashMap::new();
386            for _ in 0..num_headers {
387                headers.insert(buf.get_u16(), Bytes::decode(buf)?);
388            }
389            extensions.insert(name, headers);
390        }
391        Ok(ServerHandshake {
392            major_ver,
393            minor_ver,
394            extensions,
395        })
396    }
397}
398
399impl Encode for ErrorResponse {
400    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
401        buf.reserve(11);
402        buf.put_u8(self.severity.to_u8());
403        buf.put_u32(self.code);
404        self.message.encode(buf)?;
405        buf.reserve(2);
406        buf.put_u16(
407            u16::try_from(self.attributes.len())
408                .ok()
409                .context(errors::TooManyHeaders)?,
410        );
411        for (&name, value) in &self.attributes {
412            buf.reserve(2);
413            buf.put_u16(name);
414            value.encode(buf)?;
415        }
416        Ok(())
417    }
418}
419
420impl Decode for ErrorResponse {
421    fn decode(buf: &mut Input) -> Result<ErrorResponse, DecodeError> {
422        ensure!(buf.remaining() >= 11, errors::Underflow);
423        let severity = ErrorSeverity::from_u8(buf.get_u8());
424        let code = buf.get_u32();
425        let message = String::decode(buf)?;
426        ensure!(buf.remaining() >= 2, errors::Underflow);
427        let num_attributes = buf.get_u16();
428        let mut attributes = HashMap::new();
429        for _ in 0..num_attributes {
430            ensure!(buf.remaining() >= 4, errors::Underflow);
431            attributes.insert(buf.get_u16(), Bytes::decode(buf)?);
432        }
433        Ok(ErrorResponse {
434            severity,
435            code,
436            message,
437            attributes,
438        })
439    }
440}
441
442impl Encode for LogMessage {
443    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
444        buf.reserve(11);
445        buf.put_u8(self.severity.to_u8());
446        buf.put_u32(self.code);
447        self.text.encode(buf)?;
448        buf.reserve(2);
449        buf.put_u16(
450            u16::try_from(self.attributes.len())
451                .ok()
452                .context(errors::TooManyHeaders)?,
453        );
454        for (&name, value) in &self.attributes {
455            buf.reserve(2);
456            buf.put_u16(name);
457            value.encode(buf)?;
458        }
459        Ok(())
460    }
461}
462
463impl Decode for LogMessage {
464    fn decode(buf: &mut Input) -> Result<LogMessage, DecodeError> {
465        ensure!(buf.remaining() >= 11, errors::Underflow);
466        let severity = MessageSeverity::from_u8(buf.get_u8());
467        let code = buf.get_u32();
468        let text = String::decode(buf)?;
469        ensure!(buf.remaining() >= 2, errors::Underflow);
470        let num_attributes = buf.get_u16();
471        let mut attributes = HashMap::new();
472        for _ in 0..num_attributes {
473            ensure!(buf.remaining() >= 4, errors::Underflow);
474            attributes.insert(buf.get_u16(), Bytes::decode(buf)?);
475        }
476        Ok(LogMessage {
477            severity,
478            code,
479            text,
480            attributes,
481        })
482    }
483}
484
485impl Encode for Authentication {
486    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
487        use Authentication as A;
488        buf.reserve(1);
489        match self {
490            A::Ok => buf.put_u32(0),
491            A::Sasl { methods } => {
492                buf.put_u32(0x0A);
493                buf.reserve(4);
494                buf.put_u32(
495                    methods
496                        .len()
497                        .try_into()
498                        .ok()
499                        .context(errors::TooManyMethods)?,
500                );
501                for meth in methods {
502                    meth.encode(buf)?;
503                }
504            }
505            A::SaslContinue { data } => {
506                buf.put_u32(0x0B);
507                data.encode(buf)?;
508            }
509            A::SaslFinal { data } => {
510                buf.put_u32(0x0C);
511                data.encode(buf)?;
512            }
513        }
514        Ok(())
515    }
516}
517
518impl Decode for Authentication {
519    fn decode(buf: &mut Input) -> Result<Authentication, DecodeError> {
520        ensure!(buf.remaining() >= 4, errors::Underflow);
521        match buf.get_u32() {
522            0x00 => Ok(Authentication::Ok),
523            0x0A => {
524                ensure!(buf.remaining() >= 4, errors::Underflow);
525                let num_methods = buf.get_u32() as usize;
526                let mut methods = Vec::with_capacity(num_methods);
527                for _ in 0..num_methods {
528                    methods.push(String::decode(buf)?);
529                }
530                Ok(Authentication::Sasl { methods })
531            }
532            0x0B => {
533                let data = Bytes::decode(buf)?;
534                Ok(Authentication::SaslContinue { data })
535            }
536            0x0C => {
537                let data = Bytes::decode(buf)?;
538                Ok(Authentication::SaslFinal { data })
539            }
540            c => errors::AuthStatusInvalid { auth_status: c }.fail()?,
541        }
542    }
543}
544
545impl Encode for ReadyForCommand {
546    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
547        buf.reserve(3);
548        buf.put_u16(
549            u16::try_from(self.headers.len())
550                .ok()
551                .context(errors::TooManyHeaders)?,
552        );
553        for (&name, value) in &self.headers {
554            buf.reserve(2);
555            buf.put_u16(name);
556            value.encode(buf)?;
557        }
558        buf.reserve(1);
559        buf.put_u8(self.transaction_state as u8);
560        Ok(())
561    }
562}
563impl Decode for ReadyForCommand {
564    fn decode(buf: &mut Input) -> Result<ReadyForCommand, DecodeError> {
565        use TransactionState::*;
566        ensure!(buf.remaining() >= 3, errors::Underflow);
567        let mut headers = HashMap::new();
568        let num_headers = buf.get_u16();
569        for _ in 0..num_headers {
570            ensure!(buf.remaining() >= 4, errors::Underflow);
571            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
572        }
573        ensure!(buf.remaining() >= 1, errors::Underflow);
574        let transaction_state = match buf.get_u8() {
575            0x49 => NotInTransaction,
576            0x54 => InTransaction,
577            0x45 => InFailedTransaction,
578            s => errors::InvalidTransactionState {
579                transaction_state: s,
580            }
581            .fail()?,
582        };
583        Ok(ReadyForCommand {
584            headers,
585            transaction_state,
586        })
587    }
588}
589
590impl ErrorSeverity {
591    pub fn from_u8(code: u8) -> ErrorSeverity {
592        use ErrorSeverity::*;
593        match code {
594            120 => Error,
595            200 => Fatal,
596            255 => Panic,
597            _ => Unknown(code),
598        }
599    }
600    pub fn to_u8(&self) -> u8 {
601        use ErrorSeverity::*;
602        match *self {
603            Error => 120,
604            Fatal => 200,
605            Panic => 255,
606            Unknown(code) => code,
607        }
608    }
609}
610
611impl MessageSeverity {
612    fn from_u8(code: u8) -> MessageSeverity {
613        use MessageSeverity::*;
614        match code {
615            20 => Debug,
616            40 => Info,
617            60 => Notice,
618            80 => Warning,
619            _ => Unknown(code),
620        }
621    }
622    fn to_u8(self) -> u8 {
623        use MessageSeverity::*;
624        match self {
625            Debug => 20,
626            Info => 40,
627            Notice => 60,
628            Warning => 80,
629            Unknown(code) => code,
630        }
631    }
632}
633
634impl Encode for ServerKeyData {
635    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
636        buf.extend(&self.data[..]);
637        Ok(())
638    }
639}
640impl Decode for ServerKeyData {
641    fn decode(buf: &mut Input) -> Result<ServerKeyData, DecodeError> {
642        ensure!(buf.remaining() >= 32, errors::Underflow);
643        let mut data = [0u8; 32];
644        buf.copy_to_slice(&mut data[..]);
645        Ok(ServerKeyData { data })
646    }
647}
648
649impl Encode for ParameterStatus {
650    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
651        self.name.encode(buf)?;
652        self.value.encode(buf)?;
653        Ok(())
654    }
655}
656impl Decode for ParameterStatus {
657    fn decode(buf: &mut Input) -> Result<ParameterStatus, DecodeError> {
658        let name = Bytes::decode(buf)?;
659        let value = Bytes::decode(buf)?;
660        Ok(ParameterStatus {
661            proto: buf.proto().clone(),
662            name,
663            value,
664        })
665    }
666}
667
668impl Encode for CommandComplete0 {
669    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
670        buf.reserve(6);
671        buf.put_u16(
672            u16::try_from(self.headers.len())
673                .ok()
674                .context(errors::TooManyHeaders)?,
675        );
676        for (&name, value) in &self.headers {
677            buf.reserve(2);
678            buf.put_u16(name);
679            value.encode(buf)?;
680        }
681        self.status_data.encode(buf)?;
682        Ok(())
683    }
684}
685
686impl Decode for CommandComplete0 {
687    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
688        ensure!(buf.remaining() >= 6, errors::Underflow);
689        let num_headers = buf.get_u16();
690        let mut headers = HashMap::new();
691        for _ in 0..num_headers {
692            ensure!(buf.remaining() >= 4, errors::Underflow);
693            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
694        }
695        let status_data = Bytes::decode(buf)?;
696        Ok(CommandComplete0 {
697            status_data,
698            headers,
699        })
700    }
701}
702
703impl Encode for CommandComplete1 {
704    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
705        buf.reserve(26);
706        buf.put_u16(
707            u16::try_from(self.annotations.len())
708                .ok()
709                .context(errors::TooManyHeaders)?,
710        );
711        for (name, value) in &self.annotations {
712            name.encode(buf)?;
713            value.encode(buf)?;
714        }
715        buf.put_u64(self.capabilities.bits());
716        self.status_data.encode(buf)?;
717        if let Some(state) = &self.state {
718            state.typedesc_id.encode(buf)?;
719            state.data.encode(buf)?;
720        } else {
721            Uuid::from_u128(0).encode(buf)?;
722            Bytes::new().encode(buf)?;
723        }
724        Ok(())
725    }
726}
727
728impl Decode for CommandComplete1 {
729    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
730        ensure!(buf.remaining() >= 26, errors::Underflow);
731        let num_annotations = buf.get_u16();
732        let mut annotations = HashMap::new();
733        for _ in 0..num_annotations {
734            annotations.insert(String::decode(buf)?, String::decode(buf)?);
735        }
736        let capabilities = Capabilities::from_bits_retain(buf.get_u64());
737        let status_data = Bytes::decode(buf)?;
738        let typedesc_id = Uuid::decode(buf)?;
739        let state_data = Bytes::decode(buf)?;
740        let state = if typedesc_id == Uuid::from_u128(0) {
741            None
742        } else {
743            Some(State {
744                typedesc_id,
745                data: state_data,
746            })
747        };
748        Ok(CommandComplete1 {
749            annotations,
750            capabilities,
751            status_data,
752            state,
753        })
754    }
755}
756
757impl Encode for PrepareComplete {
758    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
759        buf.reserve(35);
760        buf.put_u16(
761            u16::try_from(self.headers.len())
762                .ok()
763                .context(errors::TooManyHeaders)?,
764        );
765        for (&name, value) in &self.headers {
766            buf.reserve(2);
767            buf.put_u16(name);
768            value.encode(buf)?;
769        }
770        buf.reserve(33);
771        buf.put_u8(self.cardinality as u8);
772        self.input_typedesc_id.encode(buf)?;
773        self.output_typedesc_id.encode(buf)?;
774        Ok(())
775    }
776}
777
778impl Decode for PrepareComplete {
779    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
780        ensure!(buf.remaining() >= 35, errors::Underflow);
781        let num_headers = buf.get_u16();
782        let mut headers = HashMap::new();
783        for _ in 0..num_headers {
784            ensure!(buf.remaining() >= 4, errors::Underflow);
785            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
786        }
787        ensure!(buf.remaining() >= 33, errors::Underflow);
788        let cardinality = TryFrom::try_from(buf.get_u8())?;
789        let input_typedesc_id = Uuid::decode(buf)?;
790        let output_typedesc_id = Uuid::decode(buf)?;
791        Ok(PrepareComplete {
792            headers,
793            cardinality,
794            input_typedesc_id,
795            output_typedesc_id,
796        })
797    }
798}
799
800impl Encode for CommandDataDescription0 {
801    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
802        debug_assert!(!buf.proto().is_1());
803        buf.reserve(43);
804        buf.put_u16(
805            u16::try_from(self.headers.len())
806                .ok()
807                .context(errors::TooManyHeaders)?,
808        );
809        for (&name, value) in &self.headers {
810            buf.reserve(2);
811            buf.put_u16(name);
812            value.encode(buf)?;
813        }
814        buf.reserve(41);
815        buf.put_u8(self.result_cardinality as u8);
816        self.input.id.encode(buf)?;
817        self.input.data.encode(buf)?;
818        self.output.id.encode(buf)?;
819        self.output.data.encode(buf)?;
820        Ok(())
821    }
822}
823
824impl Decode for CommandDataDescription0 {
825    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
826        ensure!(buf.remaining() >= 43, errors::Underflow);
827        let num_headers = buf.get_u16();
828        let mut headers = HashMap::new();
829        for _ in 0..num_headers {
830            ensure!(buf.remaining() >= 4, errors::Underflow);
831            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
832        }
833        ensure!(buf.remaining() >= 41, errors::Underflow);
834        let result_cardinality = TryFrom::try_from(buf.get_u8())?;
835        let input = RawTypedesc {
836            proto: buf.proto().clone(),
837            id: Uuid::decode(buf)?,
838            data: Bytes::decode(buf)?,
839        };
840        let output = RawTypedesc {
841            proto: buf.proto().clone(),
842            id: Uuid::decode(buf)?,
843            data: Bytes::decode(buf)?,
844        };
845
846        Ok(CommandDataDescription0 {
847            headers,
848            result_cardinality,
849            input,
850            output,
851        })
852    }
853}
854
855impl Encode for CommandDataDescription1 {
856    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
857        debug_assert!(buf.proto().is_1());
858        buf.reserve(51);
859        buf.put_u16(
860            u16::try_from(self.annotations.len())
861                .ok()
862                .context(errors::TooManyHeaders)?,
863        );
864        for (name, value) in &self.annotations {
865            buf.reserve(4);
866            name.encode(buf)?;
867            value.encode(buf)?;
868        }
869        buf.reserve(49);
870        buf.put_u64(self.capabilities.bits());
871        buf.put_u8(self.result_cardinality as u8);
872        self.input.id.encode(buf)?;
873        self.input.data.encode(buf)?;
874        self.output.id.encode(buf)?;
875        self.output.data.encode(buf)?;
876        Ok(())
877    }
878}
879
880impl Decode for CommandDataDescription1 {
881    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
882        ensure!(buf.remaining() >= 51, errors::Underflow);
883        let num_annotations = buf.get_u16();
884        let mut annotations = HashMap::new();
885        for _ in 0..num_annotations {
886            ensure!(buf.remaining() >= 4, errors::Underflow);
887            annotations.insert(String::decode(buf)?, String::decode(buf)?);
888        }
889        ensure!(buf.remaining() >= 49, errors::Underflow);
890        let capabilities = Capabilities::from_bits_retain(buf.get_u64());
891        let result_cardinality = TryFrom::try_from(buf.get_u8())?;
892        let input = RawTypedesc {
893            proto: buf.proto().clone(),
894            id: Uuid::decode(buf)?,
895            data: Bytes::decode(buf)?,
896        };
897        let output = RawTypedesc {
898            proto: buf.proto().clone(),
899            id: Uuid::decode(buf)?,
900            data: Bytes::decode(buf)?,
901        };
902
903        Ok(CommandDataDescription1 {
904            annotations,
905            capabilities,
906            result_cardinality,
907            input,
908            output,
909        })
910    }
911}
912
913impl Encode for StateDataDescription {
914    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
915        debug_assert!(buf.proto().is_1());
916        self.typedesc.id.encode(buf)?;
917        self.typedesc.data.encode(buf)?;
918        Ok(())
919    }
920}
921
922impl Decode for StateDataDescription {
923    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
924        let typedesc = RawTypedesc {
925            proto: buf.proto().clone(),
926            id: Uuid::decode(buf)?,
927            data: Bytes::decode(buf)?,
928        };
929
930        Ok(StateDataDescription { typedesc })
931    }
932}
933
934impl Encode for Data {
935    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
936        buf.reserve(2);
937        buf.put_u16(
938            u16::try_from(self.data.len())
939                .ok()
940                .context(errors::TooManyHeaders)?,
941        );
942        for chunk in &self.data {
943            chunk.encode(buf)?;
944        }
945        Ok(())
946    }
947}
948
949impl Decode for Data {
950    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
951        ensure!(buf.remaining() >= 2, errors::Underflow);
952        let num_chunks = buf.get_u16() as usize;
953        let mut data = Vec::with_capacity(num_chunks);
954        for _ in 0..num_chunks {
955            data.push(Bytes::decode(buf)?);
956        }
957        Ok(Data { data })
958    }
959}
960
961impl Encode for RestoreReady {
962    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
963        buf.reserve(4);
964        buf.put_u16(
965            u16::try_from(self.headers.len())
966                .ok()
967                .context(errors::TooManyHeaders)?,
968        );
969        for (&name, value) in &self.headers {
970            buf.reserve(2);
971            buf.put_u16(name);
972            value.encode(buf)?;
973        }
974        buf.reserve(2);
975        buf.put_u16(self.jobs);
976        Ok(())
977    }
978}
979
980impl Decode for RestoreReady {
981    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
982        ensure!(buf.remaining() >= 4, errors::Underflow);
983        let num_headers = buf.get_u16();
984        let mut headers = HashMap::new();
985        for _ in 0..num_headers {
986            ensure!(buf.remaining() >= 4, errors::Underflow);
987            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
988        }
989        ensure!(buf.remaining() >= 2, errors::Underflow);
990        let jobs = buf.get_u16();
991        Ok(RestoreReady { jobs, headers })
992    }
993}
994
995impl Encode for RawPacket {
996    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
997        buf.extend(&self.data);
998        Ok(())
999    }
1000}
1001
1002impl Decode for RawPacket {
1003    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
1004        Ok(RawPacket {
1005            data: buf.copy_to_bytes(buf.remaining()),
1006        })
1007    }
1008}
1009
1010impl PrepareComplete {
1011    pub fn get_capabilities(&self) -> Option<Capabilities> {
1012        decode_capabilities0(&self.headers)
1013    }
1014}
1015
1016fn decode_capabilities0(headers: &KeyValues) -> Option<Capabilities> {
1017    headers.get(&0x1001).and_then(|bytes| {
1018        if bytes.len() == 8 {
1019            let mut array = [0u8; 8];
1020            array.copy_from_slice(bytes);
1021            Some(Capabilities::from_bits_retain(u64::from_be_bytes(array)))
1022        } else {
1023            None
1024        }
1025    })
1026}