gel_protocol/
server_message.rs

/*!
The [ServerMessage] enum and related types. See the Gel website documentation on messages [here](https://www.edgedb.com/docs/reference/protocol/messages).

```rust,ignore
pub enum ServerMessage {
    Authentication(Authentication),
    CommandComplete1(CommandComplete1),
    CommandDataDescription1(CommandDataDescription1),
    StateDataDescription(StateDataDescription),
    Data(Data),
    DumpHeader(RawPacket),
    DumpBlock(RawPacket),
    ErrorResponse(ErrorResponse),
    LogMessage(LogMessage),
    ParameterStatus(ParameterStatus),
    ReadyForCommand(ReadyForCommand),
    RestoreReady(RestoreReady),
    ServerHandshake(ServerHandshake),
    ServerKeyData(ServerKeyData),
    UnknownMessage(u8, Bytes),
}
```
*/
27
28use std::collections::HashMap;
29use std::convert::TryFrom;
30
31use bytes::{Buf, BufMut, Bytes};
32use uuid::Uuid;
33
34use crate::common::Capabilities;
35pub use crate::common::{Cardinality, RawTypedesc, State};
36use crate::descriptors::Typedesc;
37use crate::encoding::{Annotations, Decode, Encode, Input, KeyValues, Output};
38use crate::errors::{self, DecodeError, EncodeError, MessageTooLong};
39use crate::features::ProtocolVersion;
40use crate::new_protocol::{
41    self, prelude::EncoderForExt, AnnotationBuilder, ProtocolExtensionBuilder,
42    ServerHandshakeBuilder,
43};
44
/// A single message sent by the server.
///
/// [`ServerMessage::decode`] picks the variant from the first byte of the
/// frame; a byte it does not recognize produces
/// [`ServerMessage::UnknownMessage`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ServerMessage {
    Authentication(Authentication),
    CommandComplete1(CommandComplete1),
    CommandDataDescription1(CommandDataDescription1),
    StateDataDescription(StateDataDescription),
    Data(Data),
    // Don't decode Dump packets here as we only need to process them as
    // whole
    DumpHeader(RawPacket),
    DumpBlock(RawPacket),
    ErrorResponse(ErrorResponse),
    LogMessage(LogMessage),
    ParameterStatus(ParameterStatus),
    ReadyForCommand(ReadyForCommand),
    RestoreReady(RestoreReady),
    ServerHandshake(ServerHandshake),
    ServerKeyData(ServerKeyData),
    /// A frame with an unrecognized type byte: the byte itself and the bytes
    /// of the frame. This variant cannot be encoded.
    UnknownMessage(u8, Bytes),
}
66
67pub use crate::new_protocol::TransactionState;
68
/// Server message carrying the current transaction state together with a
/// set of annotations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyForCommand {
    /// Name/value annotations attached to the message.
    pub annotations: Annotations,
    /// Transaction state reported by the server.
    pub transaction_state: TransactionState,
}
74
/// Authentication message; the variant is selected by the auth status value
/// carried in the message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Authentication {
    /// Auth status `0x0`.
    Ok,
    /// Auth status `0x0A`: lists the SASL method names sent by the server.
    Sasl { methods: Vec<String> },
    /// Auth status `0x0B`: opaque SASL payload.
    SaslContinue { data: Bytes },
    /// Auth status `0x0C`: opaque SASL payload.
    SaslFinal { data: Bytes },
}
82
/// Severity of an [`ErrorResponse`], stored on the wire as a single byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorSeverity {
    /// Wire value 120.
    Error,
    /// Wire value 200.
    Fatal,
    /// Wire value 255.
    Panic,
    /// Any other wire value, preserved as-is.
    Unknown(u8),
}
90
/// Severity of a [`LogMessage`], stored on the wire as a single byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageSeverity {
    /// Wire value 20.
    Debug,
    /// Wire value 40.
    Info,
    /// Wire value 60.
    Notice,
    /// Wire value 80.
    Warning,
    /// Any other wire value, preserved as-is.
    Unknown(u8),
}
99
/// Error reported by the server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorResponse {
    /// Severity byte, mapped through [`ErrorSeverity::from_u8`] when decoding.
    pub severity: ErrorSeverity,
    /// Numeric error code.
    pub code: u32,
    /// Error text; decoded lossily, so it is always valid UTF-8.
    pub message: String,
    /// Additional attributes keyed by numeric code.
    pub attributes: KeyValues,
}
107
/// Log message sent by the server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogMessage {
    /// Severity byte, mapped to [`MessageSeverity`] when decoding.
    pub severity: MessageSeverity,
    /// Numeric message code.
    pub code: u32,
    /// Message text; decoded lossily, so it is always valid UTF-8.
    pub text: String,
    /// Name/value annotations attached to the message.
    pub annotations: Annotations,
}
115
/// Handshake message carrying a protocol version and a set of protocol
/// extensions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerHandshake {
    /// Major protocol version.
    pub major_ver: u16,
    /// Minor protocol version.
    pub minor_ver: u16,
    /// Protocol extensions, keyed by extension name; each maps to that
    /// extension's annotations.
    pub extensions: HashMap<String, Annotations>,
}
122
/// Server key data: a fixed-size block of 32 bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerKeyData {
    /// The raw key bytes.
    pub data: [u8; 32],
}
127
/// A named server parameter with its raw value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParameterStatus {
    /// Protocol version of the input this message was decoded from; reused
    /// when the value is parsed further. Not written when encoding.
    pub proto: ProtocolVersion,
    /// Parameter name as raw bytes.
    pub name: Bytes,
    /// Parameter value as raw bytes.
    pub value: Bytes,
}
134
/// Message reporting that a command has completed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandComplete1 {
    /// Name/value annotations attached to the message.
    pub annotations: Annotations,
    /// Capability bits reported for the command.
    pub capabilities: Capabilities,
    /// Status string of the command.
    pub status: String,
    /// State attached to the message. Decoding yields `None` when the state
    /// type descriptor id on the wire is all zeroes.
    pub state: Option<State>,
}
142
/// Parse-completion data: headers, a cardinality, and input/output type
/// descriptor ids.
///
/// NOTE(review): nothing in this part of the file encodes or decodes this
/// type — confirm whether it is still used elsewhere.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseComplete {
    /// Headers keyed by numeric code.
    pub headers: KeyValues,
    /// Cardinality associated with the parsed command.
    pub cardinality: Cardinality,
    /// Id of the input type descriptor.
    pub input_typedesc_id: Uuid,
    /// Id of the output type descriptor.
    pub output_typedesc_id: Uuid,
}
150
/// Description of a command's input and output data.
///
/// The type descriptors are kept in raw form; use
/// [`CommandDataDescription1::input`] and
/// [`CommandDataDescription1::output`] to decode them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandDataDescription1 {
    /// Name/value annotations attached to the message.
    pub annotations: Annotations,
    /// Capability bits reported for the command.
    pub capabilities: Capabilities,
    /// Cardinality of the result.
    pub result_cardinality: Cardinality,
    /// Raw input type descriptor.
    pub input: RawTypedesc,
    /// Raw output type descriptor.
    pub output: RawTypedesc,
}
159
/// Message carrying a raw type descriptor for state data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateDataDescription {
    /// Raw type descriptor; decode it with [`StateDataDescription::parse`].
    pub typedesc: RawTypedesc,
}
164
/// Data message made up of a sequence of byte chunks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Data {
    /// The data elements of the message, in wire order.
    pub data: Vec<Bytes>,
}
169
/// Restore-ready message carrying headers and a job count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoreReady {
    /// Headers keyed by numeric code.
    pub headers: KeyValues,
    /// Job count reported by the server.
    pub jobs: u16,
}
175
/// An undecoded packet body, used for dump header and dump block messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawPacket {
    /// The frame contents after the 5-byte header (type byte plus 32-bit
    /// length).
    pub data: Bytes,
}
180
181fn encode<T: Encode>(buf: &mut Output, msg: &T) -> Result<(), EncodeError> {
182    msg.encode(buf)?;
183    Ok(())
184}
185
186impl CommandDataDescription1 {
187    pub fn output(&self) -> Result<Typedesc, DecodeError> {
188        self.output.decode()
189    }
190    pub fn input(&self) -> Result<Typedesc, DecodeError> {
191        self.input.decode()
192    }
193}
194
195/// Bridges the old and new world.
196fn encode_output<T: 'static>(
197    buf: &mut Output,
198    builder: impl new_protocol::prelude::EncoderFor<T>,
199) -> Result<(), EncodeError> {
200    let len = builder.measure();
201    buf.reserve(len);
202    let len = builder
203        .encode_buffer_uninit(buf.uninit())
204        .map_err(|_| MessageTooLong.build())?
205        .len();
206    unsafe { buf.advance_mut(len) };
207    Ok(())
208}
209
210impl StateDataDescription {
211    pub fn parse(self) -> Result<Typedesc, DecodeError> {
212        self.typedesc.decode()
213    }
214}
215
216impl ParameterStatus {
217    pub fn parse_system_config(self) -> Result<(Typedesc, Bytes), DecodeError> {
218        let cur = &mut Input::new(self.proto.clone(), self.value);
219        let typedesc_data = Bytes::decode(cur)?;
220        let data = Bytes::decode(cur)?;
221
222        let typedesc_buf = &mut Input::new(self.proto, typedesc_data);
223        let typedesc_id = Uuid::decode(typedesc_buf)?;
224        let typedesc = Typedesc::decode_with_id(typedesc_id, typedesc_buf)?;
225        Ok((typedesc, data))
226    }
227}
228
229impl ServerMessage {
230    pub fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
231        use ServerMessage::*;
232        match self {
233            ServerHandshake(h) => encode(buf, h),
234            ErrorResponse(h) => encode(buf, h),
235            LogMessage(h) => encode(buf, h),
236            Authentication(h) => encode(buf, h),
237            ReadyForCommand(h) => encode(buf, h),
238            ServerKeyData(h) => encode(buf, h),
239            ParameterStatus(h) => encode(buf, h),
240            CommandComplete1(h) => encode(buf, h),
241            CommandDataDescription1(h) => encode(buf, h),
242            StateDataDescription(h) => encode(buf, h),
243            Data(h) => encode(buf, h),
244            RestoreReady(h) => encode(buf, h),
245            DumpHeader(h) => encode(buf, &(b'@', h)),
246            DumpBlock(h) => encode(buf, &(b'=', h)),
247
248            UnknownMessage(_, _) => errors::UnknownMessageCantBeEncoded.fail()?,
249        }
250    }
251    /// Decode exactly one frame from the buffer.
252    ///
253    /// This expects a full frame to already be in the buffer. It can return
254    /// an arbitrary error or be silent if a message is only partially present
255    /// in the buffer or if extra data is present.
256    pub fn decode(buf: &mut Input) -> Result<ServerMessage, DecodeError> {
257        use self::ServerMessage as M;
258        let message = new_protocol::Message::new(buf)?;
259        let mut next = buf.slice(..message.mlen() + 1);
260        buf.advance(message.mlen() + 1);
261        let buf = &mut next;
262
263        let result = match buf[0] {
264            0x76 => ServerHandshake::decode(buf).map(M::ServerHandshake)?,
265            0x45 => ErrorResponse::decode(buf).map(M::ErrorResponse)?,
266            0x4c => LogMessage::decode(buf).map(M::LogMessage)?,
267            0x52 => Authentication::decode(buf).map(M::Authentication)?,
268            0x5a => ReadyForCommand::decode(buf).map(M::ReadyForCommand)?,
269            0x4b => ServerKeyData::decode(buf).map(M::ServerKeyData)?,
270            0x53 => ParameterStatus::decode(buf).map(M::ParameterStatus)?,
271            0x43 => CommandComplete1::decode(buf).map(M::CommandComplete1)?,
272            0x44 => Data::decode(buf).map(M::Data)?,
273            0x2b => RestoreReady::decode(buf).map(M::RestoreReady)?,
274            0x40 => RawPacket::decode(buf).map(M::DumpHeader)?,
275            0x3d => RawPacket::decode(buf).map(M::DumpBlock)?,
276            0x54 => CommandDataDescription1::decode(buf).map(M::CommandDataDescription1)?,
277            0x73 => StateDataDescription::decode(buf).map(M::StateDataDescription)?,
278            code => M::UnknownMessage(code, buf.copy_to_bytes(buf.remaining())),
279        };
280        Ok(result)
281    }
282}
283
284impl Encode for ServerHandshake {
285    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
286        let extensions = || {
287            self.extensions.iter().map(|(name, headers)| {
288                let annotations = move || {
289                    headers
290                        .iter()
291                        .map(|(name, value)| AnnotationBuilder { name, value })
292                };
293                ProtocolExtensionBuilder { name, annotations }
294            })
295        };
296        let builder = ServerHandshakeBuilder {
297            major_ver: self.major_ver,
298            minor_ver: self.minor_ver,
299            extensions,
300        };
301
302        encode_output(buf, builder)?;
303        Ok(())
304    }
305}
306
307impl Decode for ServerHandshake {
308    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
309        let message = new_protocol::ServerHandshake::new(buf)?;
310        let mut extensions = HashMap::new();
311        for ext in message.extensions() {
312            let mut headers = HashMap::new();
313            for ann in ext.annotations() {
314                headers.insert(
315                    ann.name().to_string_lossy().to_string(),
316                    ann.value().to_string_lossy().to_string(),
317                );
318            }
319            extensions.insert(ext.name().to_string_lossy().to_string(), headers);
320        }
321
322        let decoded = ServerHandshake {
323            major_ver: message.major_ver(),
324            minor_ver: message.minor_ver(),
325            extensions,
326        };
327        buf.advance(message.as_ref().len());
328        Ok(decoded)
329    }
330}
331
332impl Encode for ErrorResponse {
333    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
334        let builder = new_protocol::ErrorResponseBuilder {
335            severity: self.severity.to_u8(),
336            error_code: self.code,
337            message: &self.message,
338            attributes: || {
339                self.attributes
340                    .iter()
341                    .map(|(name, value)| new_protocol::KeyValueBuilder {
342                        code: *name,
343                        value: value.as_ref(),
344                    })
345            },
346        };
347        encode_output(buf, builder)?;
348
349        Ok(())
350    }
351}
352
353impl Decode for ErrorResponse {
354    fn decode(buf: &mut Input) -> Result<ErrorResponse, DecodeError> {
355        let message = new_protocol::ErrorResponse::new(buf)?;
356        let mut attributes = HashMap::new();
357        for attr in message.attributes() {
358            attributes.insert(attr.code(), attr.value().into_slice().to_vec().into());
359        }
360
361        let decoded = ErrorResponse {
362            severity: ErrorSeverity::from_u8(message.severity()),
363            code: message.error_code() as u32,
364            message: message.message().to_string_lossy().to_string(),
365            attributes,
366        };
367        buf.advance(message.as_ref().len());
368        Ok(decoded)
369    }
370}
371
372impl Encode for LogMessage {
373    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
374        let builder = new_protocol::LogMessageBuilder {
375            severity: self.severity.to_u8(),
376            code: self.code as i32,
377            text: &self.text,
378            annotations: || {
379                self.annotations
380                    .iter()
381                    .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
382            },
383        };
384        encode_output(buf, builder)?;
385
386        Ok(())
387    }
388}
389
390impl Decode for LogMessage {
391    fn decode(buf: &mut Input) -> Result<LogMessage, DecodeError> {
392        let message = new_protocol::LogMessage::new(buf)?;
393        let mut annotations = HashMap::new();
394        for ann in message.annotations() {
395            annotations.insert(
396                ann.name().to_string_lossy().to_string(),
397                ann.value().to_string_lossy().to_string(),
398            );
399        }
400
401        let decoded = LogMessage {
402            severity: MessageSeverity::from_u8(message.severity()),
403            code: message.code() as u32,
404            text: message.text().to_string_lossy().to_string(),
405            annotations,
406        };
407        buf.advance(message.as_ref().len());
408        Ok(decoded)
409    }
410}
411
412impl Encode for Authentication {
413    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
414        use Authentication as A;
415        match self {
416            A::Ok => encode_output(buf, new_protocol::AuthenticationOkBuilder {})?,
417            A::Sasl { methods } => {
418                let builder = new_protocol::AuthenticationRequiredSASLMessageBuilder {
419                    methods: methods.as_slice(),
420                };
421                encode_output(buf, builder)?;
422            }
423            A::SaslContinue { data } => {
424                let builder = new_protocol::AuthenticationSASLContinueBuilder {
425                    sasl_data: data.as_ref(),
426                };
427                encode_output(buf, builder)?;
428            }
429            A::SaslFinal { data } => {
430                let builder = new_protocol::AuthenticationSASLFinalBuilder {
431                    sasl_data: data.as_ref(),
432                };
433                encode_output(buf, builder)?;
434            }
435        }
436        Ok(())
437    }
438}
439
440impl Decode for Authentication {
441    fn decode(buf: &mut Input) -> Result<Authentication, DecodeError> {
442        let auth = new_protocol::Authentication::new(buf)?;
443        match auth.auth_status() {
444            0x0 => Ok(Authentication::Ok),
445            0x0A => {
446                let auth = new_protocol::AuthenticationRequiredSASLMessage::new(buf)?;
447                let mut methods = Vec::new();
448                for method in auth.methods() {
449                    methods.push(method.to_string_lossy().to_string());
450                }
451                Ok(Authentication::Sasl { methods })
452            }
453            0x0B => {
454                let auth = new_protocol::AuthenticationSASLContinue::new(buf)?;
455                Ok(Authentication::SaslContinue {
456                    data: auth.sasl_data().into_slice().to_owned().into(),
457                })
458            }
459            0x0C => {
460                let auth = new_protocol::AuthenticationSASLFinal::new(buf)?;
461                Ok(Authentication::SaslFinal {
462                    data: auth.sasl_data().into_slice().to_owned().into(),
463                })
464            }
465            _ => errors::AuthStatusInvalid {
466                auth_status: buf[0],
467            }
468            .fail()?,
469        }
470    }
471}
472
473impl Encode for ReadyForCommand {
474    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
475        let builder = new_protocol::ReadyForCommandBuilder {
476            transaction_state: self.transaction_state,
477            annotations: || {
478                self.annotations
479                    .iter()
480                    .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
481            },
482        };
483        buf.reserve(builder.measure());
484        let len = builder
485            .encode_buffer_uninit(buf.uninit())
486            .map_err(|_| MessageTooLong.build())?
487            .len();
488        unsafe { buf.advance_mut(len) };
489
490        Ok(())
491    }
492}
493impl Decode for ReadyForCommand {
494    fn decode(buf: &mut Input) -> Result<ReadyForCommand, DecodeError> {
495        let message = new_protocol::ReadyForCommand::new(buf)?;
496        let mut annotations = HashMap::new();
497        for ann in message.annotations() {
498            annotations.insert(
499                ann.name().to_string_lossy().to_string(),
500                ann.value().to_string_lossy().to_string(),
501            );
502        }
503
504        let decoded = ReadyForCommand {
505            annotations,
506            transaction_state: message.transaction_state(),
507        };
508        buf.advance(message.as_ref().len());
509        Ok(decoded)
510    }
511}
512
513impl ErrorSeverity {
514    pub fn from_u8(code: u8) -> ErrorSeverity {
515        use ErrorSeverity::*;
516        match code {
517            120 => Error,
518            200 => Fatal,
519            255 => Panic,
520            _ => Unknown(code),
521        }
522    }
523    pub fn to_u8(&self) -> u8 {
524        use ErrorSeverity::*;
525        match *self {
526            Error => 120,
527            Fatal => 200,
528            Panic => 255,
529            Unknown(code) => code,
530        }
531    }
532}
533
534impl MessageSeverity {
535    fn from_u8(code: u8) -> MessageSeverity {
536        use MessageSeverity::*;
537        match code {
538            20 => Debug,
539            40 => Info,
540            60 => Notice,
541            80 => Warning,
542            _ => Unknown(code),
543        }
544    }
545    fn to_u8(self) -> u8 {
546        use MessageSeverity::*;
547        match self {
548            Debug => 20,
549            Info => 40,
550            Notice => 60,
551            Warning => 80,
552            Unknown(code) => code,
553        }
554    }
555}
556
557impl Encode for ServerKeyData {
558    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
559        let builder = new_protocol::ServerKeyDataBuilder { data: self.data };
560        encode_output(buf, builder)?;
561        Ok(())
562    }
563}
564impl Decode for ServerKeyData {
565    fn decode(buf: &mut Input) -> Result<ServerKeyData, DecodeError> {
566        let message = new_protocol::ServerKeyData::new(buf)?;
567        let decoded = ServerKeyData {
568            data: message.data(),
569        };
570        buf.advance(message.as_ref().len());
571        Ok(decoded)
572    }
573}
574
575impl Encode for ParameterStatus {
576    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
577        let builder = new_protocol::ParameterStatusBuilder {
578            name: self.name.as_ref(),
579            value: self.value.as_ref(),
580        };
581        encode_output(buf, builder)?;
582        Ok(())
583    }
584}
585impl Decode for ParameterStatus {
586    fn decode(buf: &mut Input) -> Result<ParameterStatus, DecodeError> {
587        let message = new_protocol::ParameterStatus::new(buf)?;
588        let decoded = ParameterStatus {
589            proto: buf.proto().clone(),
590            name: message.name().into_slice().to_owned().into(),
591            value: message.value().into_slice().to_owned().into(),
592        };
593        buf.advance(message.as_ref().len());
594        Ok(decoded)
595    }
596}
597
598impl Encode for CommandComplete1 {
599    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
600        let builder = new_protocol::CommandCompleteBuilder {
601            annotations: || {
602                self.annotations
603                    .iter()
604                    .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
605            },
606            capabilities: self.capabilities.bits(),
607            status: &self.status,
608            state_data: self
609                .state
610                .as_ref()
611                .map(|state| state.data.as_ref())
612                .unwrap_or_default(),
613            state_typedesc_id: self
614                .state
615                .as_ref()
616                .map(|state| state.typedesc_id)
617                .unwrap_or_default(),
618        };
619        encode_output(buf, builder)?;
620
621        Ok(())
622    }
623}
624
625impl Decode for CommandComplete1 {
626    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
627        let message = new_protocol::CommandComplete::new(buf)?;
628        let mut annotations = HashMap::new();
629        for ann in message.annotations() {
630            annotations.insert(
631                ann.name().to_string_lossy().to_string(),
632                ann.value().to_string_lossy().to_string(),
633            );
634        }
635
636        let decoded = CommandComplete1 {
637            annotations,
638            capabilities: Capabilities::from_bits_retain(message.capabilities()),
639            status: message.status().to_string_lossy().to_string(),
640            state: if message.state_typedesc_id() == Uuid::from_u128(0) {
641                None
642            } else {
643                Some(State {
644                    typedesc_id: message.state_typedesc_id(),
645                    data: message.state_data().into_slice().to_owned().into(),
646                })
647            },
648        };
649        buf.advance(message.as_ref().len());
650        Ok(decoded)
651    }
652}
653
654impl Encode for CommandDataDescription1 {
655    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
656        let builder = new_protocol::CommandDataDescriptionBuilder {
657            annotations: || {
658                self.annotations
659                    .iter()
660                    .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
661            },
662            capabilities: self.capabilities.bits(),
663            result_cardinality: self.result_cardinality as u8,
664            input_typedesc_id: self.input.id,
665            input_typedesc: self.input.data.as_ref(),
666            output_typedesc_id: self.output.id,
667            output_typedesc: self.output.data.as_ref(),
668        };
669        encode_output(buf, builder)?;
670        Ok(())
671    }
672}
673
674impl Decode for CommandDataDescription1 {
675    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
676        let message = new_protocol::CommandDataDescription::new(buf)?;
677        let mut annotations = HashMap::new();
678        for ann in message.annotations() {
679            annotations.insert(
680                ann.name().to_string_lossy().to_string(),
681                ann.value().to_string_lossy().to_string(),
682            );
683        }
684
685        let decoded = CommandDataDescription1 {
686            annotations,
687            capabilities: Capabilities::from_bits_retain(message.capabilities()),
688            result_cardinality: TryFrom::try_from(message.result_cardinality())?,
689            input: RawTypedesc {
690                proto: buf.proto().clone(),
691                id: message.input_typedesc_id(),
692                data: message.input_typedesc().into_slice().to_owned().into(),
693            },
694            output: RawTypedesc {
695                proto: buf.proto().clone(),
696                id: message.output_typedesc_id(),
697                data: message.output_typedesc().into_slice().to_owned().into(),
698            },
699        };
700        buf.advance(message.as_ref().len());
701        Ok(decoded)
702    }
703}
704
705impl Encode for StateDataDescription {
706    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
707        let builder = new_protocol::StateDataDescriptionBuilder {
708            typedesc_id: self.typedesc.id,
709            typedesc: self.typedesc.data.as_ref(),
710        };
711        encode_output(buf, builder)?;
712        Ok(())
713    }
714}
715
716impl Decode for StateDataDescription {
717    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
718        let message = new_protocol::StateDataDescription::new(buf)?;
719        let decoded = StateDataDescription {
720            typedesc: RawTypedesc {
721                proto: buf.proto().clone(),
722                id: message.typedesc_id(),
723                data: message.typedesc().into_slice().to_owned().into(),
724            },
725        };
726        buf.advance(message.as_ref().len());
727        Ok(decoded)
728    }
729}
730
731impl Encode for Data {
732    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
733        let builder = new_protocol::DataBuilder {
734            data: || {
735                self.data
736                    .iter()
737                    .map(|chunk| new_protocol::DataElementBuilder {
738                        data: chunk.as_ref(),
739                    })
740            },
741        };
742        encode_output(buf, builder)?;
743        Ok(())
744    }
745}
746
747impl Decode for Data {
748    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
749        let message = new_protocol::Data::new(buf)?;
750        let mut data = Vec::new();
751        for element in message.data() {
752            data.push(element.data().into_slice().to_owned().into());
753        }
754
755        let decoded = Data { data };
756        buf.advance(message.as_ref().len());
757        Ok(decoded)
758    }
759}
760
761impl Encode for RestoreReady {
762    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
763        let builder = new_protocol::RestoreReadyBuilder {
764            headers: || {
765                self.headers
766                    .iter()
767                    .map(|(name, value)| new_protocol::KeyValueBuilder {
768                        code: *name,
769                        value: value.as_ref(),
770                    })
771            },
772            jobs: self.jobs,
773        };
774        encode_output(buf, builder)?;
775        Ok(())
776    }
777}
778
779impl Decode for RestoreReady {
780    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
781        let message = new_protocol::RestoreReady::new(buf)?;
782        let mut headers = HashMap::new();
783        for header in message.headers() {
784            headers.insert(header.code(), header.value().into_slice().to_vec().into());
785        }
786
787        let decoded = RestoreReady {
788            headers,
789            jobs: message.jobs() as u16,
790        };
791        buf.advance(message.as_ref().len());
792        Ok(decoded)
793    }
794}
795
796impl Encode for (u8, &'_ RawPacket) {
797    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
798        buf.put_u8(self.0);
799        buf.put_u32((self.1.data.len() + 4) as u32);
800        buf.extend(&self.1.data);
801        Ok(())
802    }
803}
804
805impl Decode for RawPacket {
806    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
807        // Skip the message type
808        buf.advance(5);
809        let data = buf.copy_to_bytes(buf.remaining());
810        Ok(RawPacket { data })
811    }
812}