1use std::collections::HashMap;
29use std::convert::TryFrom;
30
31use bytes::{Buf, BufMut, Bytes};
32use uuid::Uuid;
33
34use crate::common::Capabilities;
35pub use crate::common::{Cardinality, RawTypedesc, State};
36use crate::descriptors::Typedesc;
37use crate::encoding::{Annotations, Decode, Encode, Input, KeyValues, Output};
38use crate::errors::{self, DecodeError, EncodeError, MessageTooLong};
39use crate::features::ProtocolVersion;
40use crate::new_protocol::{
41 self, prelude::EncoderForExt, AnnotationBuilder, ProtocolExtensionBuilder,
42 ServerHandshakeBuilder,
43};
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46#[non_exhaustive]
47pub enum ServerMessage {
48 Authentication(Authentication),
49 CommandComplete1(CommandComplete1),
50 CommandDataDescription1(CommandDataDescription1),
51 StateDataDescription(StateDataDescription),
52 Data(Data),
53 DumpHeader(RawPacket),
56 DumpBlock(RawPacket),
57 ErrorResponse(ErrorResponse),
58 LogMessage(LogMessage),
59 ParameterStatus(ParameterStatus),
60 ReadyForCommand(ReadyForCommand),
61 RestoreReady(RestoreReady),
62 ServerHandshake(ServerHandshake),
63 ServerKeyData(ServerKeyData),
64 UnknownMessage(u8, Bytes),
65}
66
67pub use crate::new_protocol::TransactionState;
68
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct ReadyForCommand {
71 pub annotations: Annotations,
72 pub transaction_state: TransactionState,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub enum Authentication {
77 Ok,
78 Sasl { methods: Vec<String> },
79 SaslContinue { data: Bytes },
80 SaslFinal { data: Bytes },
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84pub enum ErrorSeverity {
85 Error,
86 Fatal,
87 Panic,
88 Unknown(u8),
89}
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92pub enum MessageSeverity {
93 Debug,
94 Info,
95 Notice,
96 Warning,
97 Unknown(u8),
98}
99
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct ErrorResponse {
102 pub severity: ErrorSeverity,
103 pub code: u32,
104 pub message: String,
105 pub attributes: KeyValues,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct LogMessage {
110 pub severity: MessageSeverity,
111 pub code: u32,
112 pub text: String,
113 pub annotations: Annotations,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct ServerHandshake {
118 pub major_ver: u16,
119 pub minor_ver: u16,
120 pub extensions: HashMap<String, Annotations>,
121}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct ServerKeyData {
125 pub data: [u8; 32],
126}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct ParameterStatus {
130 pub proto: ProtocolVersion,
131 pub name: Bytes,
132 pub value: Bytes,
133}
134
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub struct CommandComplete1 {
137 pub annotations: Annotations,
138 pub capabilities: Capabilities,
139 pub status: String,
140 pub state: Option<State>,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct ParseComplete {
145 pub headers: KeyValues,
146 pub cardinality: Cardinality,
147 pub input_typedesc_id: Uuid,
148 pub output_typedesc_id: Uuid,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub struct CommandDataDescription1 {
153 pub annotations: Annotations,
154 pub capabilities: Capabilities,
155 pub result_cardinality: Cardinality,
156 pub input: RawTypedesc,
157 pub output: RawTypedesc,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct StateDataDescription {
162 pub typedesc: RawTypedesc,
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct Data {
167 pub data: Vec<Bytes>,
168}
169
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct RestoreReady {
172 pub headers: KeyValues,
173 pub jobs: u16,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct RawPacket {
178 pub data: Bytes,
179}
180
181fn encode<T: Encode>(buf: &mut Output, msg: &T) -> Result<(), EncodeError> {
182 msg.encode(buf)?;
183 Ok(())
184}
185
186impl CommandDataDescription1 {
187 pub fn output(&self) -> Result<Typedesc, DecodeError> {
188 self.output.decode()
189 }
190 pub fn input(&self) -> Result<Typedesc, DecodeError> {
191 self.input.decode()
192 }
193}
194
195fn encode_output<T: 'static>(
197 buf: &mut Output,
198 builder: impl new_protocol::prelude::EncoderFor<T>,
199) -> Result<(), EncodeError> {
200 let len = builder.measure();
201 buf.reserve(len);
202 let len = builder
203 .encode_buffer_uninit(buf.uninit())
204 .map_err(|_| MessageTooLong.build())?
205 .len();
206 unsafe { buf.advance_mut(len) };
207 Ok(())
208}
209
210impl StateDataDescription {
211 pub fn parse(self) -> Result<Typedesc, DecodeError> {
212 self.typedesc.decode()
213 }
214}
215
216impl ParameterStatus {
217 pub fn parse_system_config(self) -> Result<(Typedesc, Bytes), DecodeError> {
218 let cur = &mut Input::new(self.proto.clone(), self.value);
219 let typedesc_data = Bytes::decode(cur)?;
220 let data = Bytes::decode(cur)?;
221
222 let typedesc_buf = &mut Input::new(self.proto, typedesc_data);
223 let typedesc_id = Uuid::decode(typedesc_buf)?;
224 let typedesc = Typedesc::decode_with_id(typedesc_id, typedesc_buf)?;
225 Ok((typedesc, data))
226 }
227}
228
229impl ServerMessage {
230 pub fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
231 use ServerMessage::*;
232 match self {
233 ServerHandshake(h) => encode(buf, h),
234 ErrorResponse(h) => encode(buf, h),
235 LogMessage(h) => encode(buf, h),
236 Authentication(h) => encode(buf, h),
237 ReadyForCommand(h) => encode(buf, h),
238 ServerKeyData(h) => encode(buf, h),
239 ParameterStatus(h) => encode(buf, h),
240 CommandComplete1(h) => encode(buf, h),
241 CommandDataDescription1(h) => encode(buf, h),
242 StateDataDescription(h) => encode(buf, h),
243 Data(h) => encode(buf, h),
244 RestoreReady(h) => encode(buf, h),
245 DumpHeader(h) => encode(buf, &(b'@', h)),
246 DumpBlock(h) => encode(buf, &(b'=', h)),
247
248 UnknownMessage(_, _) => errors::UnknownMessageCantBeEncoded.fail()?,
249 }
250 }
251 pub fn decode(buf: &mut Input) -> Result<ServerMessage, DecodeError> {
257 use self::ServerMessage as M;
258 let message = new_protocol::Message::new(buf)?;
259 let mut next = buf.slice(..message.mlen() + 1);
260 buf.advance(message.mlen() + 1);
261 let buf = &mut next;
262
263 let result = match buf[0] {
264 0x76 => ServerHandshake::decode(buf).map(M::ServerHandshake)?,
265 0x45 => ErrorResponse::decode(buf).map(M::ErrorResponse)?,
266 0x4c => LogMessage::decode(buf).map(M::LogMessage)?,
267 0x52 => Authentication::decode(buf).map(M::Authentication)?,
268 0x5a => ReadyForCommand::decode(buf).map(M::ReadyForCommand)?,
269 0x4b => ServerKeyData::decode(buf).map(M::ServerKeyData)?,
270 0x53 => ParameterStatus::decode(buf).map(M::ParameterStatus)?,
271 0x43 => CommandComplete1::decode(buf).map(M::CommandComplete1)?,
272 0x44 => Data::decode(buf).map(M::Data)?,
273 0x2b => RestoreReady::decode(buf).map(M::RestoreReady)?,
274 0x40 => RawPacket::decode(buf).map(M::DumpHeader)?,
275 0x3d => RawPacket::decode(buf).map(M::DumpBlock)?,
276 0x54 => CommandDataDescription1::decode(buf).map(M::CommandDataDescription1)?,
277 0x73 => StateDataDescription::decode(buf).map(M::StateDataDescription)?,
278 code => M::UnknownMessage(code, buf.copy_to_bytes(buf.remaining())),
279 };
280 Ok(result)
281 }
282}
283
284impl Encode for ServerHandshake {
285 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
286 let extensions = || {
287 self.extensions.iter().map(|(name, headers)| {
288 let annotations = move || {
289 headers
290 .iter()
291 .map(|(name, value)| AnnotationBuilder { name, value })
292 };
293 ProtocolExtensionBuilder { name, annotations }
294 })
295 };
296 let builder = ServerHandshakeBuilder {
297 major_ver: self.major_ver,
298 minor_ver: self.minor_ver,
299 extensions,
300 };
301
302 encode_output(buf, builder)?;
303 Ok(())
304 }
305}
306
307impl Decode for ServerHandshake {
308 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
309 let message = new_protocol::ServerHandshake::new(buf)?;
310 let mut extensions = HashMap::new();
311 for ext in message.extensions() {
312 let mut headers = HashMap::new();
313 for ann in ext.annotations() {
314 headers.insert(
315 ann.name().to_string_lossy().to_string(),
316 ann.value().to_string_lossy().to_string(),
317 );
318 }
319 extensions.insert(ext.name().to_string_lossy().to_string(), headers);
320 }
321
322 let decoded = ServerHandshake {
323 major_ver: message.major_ver(),
324 minor_ver: message.minor_ver(),
325 extensions,
326 };
327 buf.advance(message.as_ref().len());
328 Ok(decoded)
329 }
330}
331
332impl Encode for ErrorResponse {
333 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
334 let builder = new_protocol::ErrorResponseBuilder {
335 severity: self.severity.to_u8(),
336 error_code: self.code,
337 message: &self.message,
338 attributes: || {
339 self.attributes
340 .iter()
341 .map(|(name, value)| new_protocol::KeyValueBuilder {
342 code: *name,
343 value: value.as_ref(),
344 })
345 },
346 };
347 encode_output(buf, builder)?;
348
349 Ok(())
350 }
351}
352
353impl Decode for ErrorResponse {
354 fn decode(buf: &mut Input) -> Result<ErrorResponse, DecodeError> {
355 let message = new_protocol::ErrorResponse::new(buf)?;
356 let mut attributes = HashMap::new();
357 for attr in message.attributes() {
358 attributes.insert(attr.code(), attr.value().into_slice().to_vec().into());
359 }
360
361 let decoded = ErrorResponse {
362 severity: ErrorSeverity::from_u8(message.severity()),
363 code: message.error_code() as u32,
364 message: message.message().to_string_lossy().to_string(),
365 attributes,
366 };
367 buf.advance(message.as_ref().len());
368 Ok(decoded)
369 }
370}
371
372impl Encode for LogMessage {
373 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
374 let builder = new_protocol::LogMessageBuilder {
375 severity: self.severity.to_u8(),
376 code: self.code as i32,
377 text: &self.text,
378 annotations: || {
379 self.annotations
380 .iter()
381 .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
382 },
383 };
384 encode_output(buf, builder)?;
385
386 Ok(())
387 }
388}
389
390impl Decode for LogMessage {
391 fn decode(buf: &mut Input) -> Result<LogMessage, DecodeError> {
392 let message = new_protocol::LogMessage::new(buf)?;
393 let mut annotations = HashMap::new();
394 for ann in message.annotations() {
395 annotations.insert(
396 ann.name().to_string_lossy().to_string(),
397 ann.value().to_string_lossy().to_string(),
398 );
399 }
400
401 let decoded = LogMessage {
402 severity: MessageSeverity::from_u8(message.severity()),
403 code: message.code() as u32,
404 text: message.text().to_string_lossy().to_string(),
405 annotations,
406 };
407 buf.advance(message.as_ref().len());
408 Ok(decoded)
409 }
410}
411
412impl Encode for Authentication {
413 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
414 use Authentication as A;
415 match self {
416 A::Ok => encode_output(buf, new_protocol::AuthenticationOkBuilder {})?,
417 A::Sasl { methods } => {
418 let builder = new_protocol::AuthenticationRequiredSASLMessageBuilder {
419 methods: methods.as_slice(),
420 };
421 encode_output(buf, builder)?;
422 }
423 A::SaslContinue { data } => {
424 let builder = new_protocol::AuthenticationSASLContinueBuilder {
425 sasl_data: data.as_ref(),
426 };
427 encode_output(buf, builder)?;
428 }
429 A::SaslFinal { data } => {
430 let builder = new_protocol::AuthenticationSASLFinalBuilder {
431 sasl_data: data.as_ref(),
432 };
433 encode_output(buf, builder)?;
434 }
435 }
436 Ok(())
437 }
438}
439
440impl Decode for Authentication {
441 fn decode(buf: &mut Input) -> Result<Authentication, DecodeError> {
442 let auth = new_protocol::Authentication::new(buf)?;
443 match auth.auth_status() {
444 0x0 => Ok(Authentication::Ok),
445 0x0A => {
446 let auth = new_protocol::AuthenticationRequiredSASLMessage::new(buf)?;
447 let mut methods = Vec::new();
448 for method in auth.methods() {
449 methods.push(method.to_string_lossy().to_string());
450 }
451 Ok(Authentication::Sasl { methods })
452 }
453 0x0B => {
454 let auth = new_protocol::AuthenticationSASLContinue::new(buf)?;
455 Ok(Authentication::SaslContinue {
456 data: auth.sasl_data().into_slice().to_owned().into(),
457 })
458 }
459 0x0C => {
460 let auth = new_protocol::AuthenticationSASLFinal::new(buf)?;
461 Ok(Authentication::SaslFinal {
462 data: auth.sasl_data().into_slice().to_owned().into(),
463 })
464 }
465 _ => errors::AuthStatusInvalid {
466 auth_status: buf[0],
467 }
468 .fail()?,
469 }
470 }
471}
472
473impl Encode for ReadyForCommand {
474 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
475 let builder = new_protocol::ReadyForCommandBuilder {
476 transaction_state: self.transaction_state,
477 annotations: || {
478 self.annotations
479 .iter()
480 .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
481 },
482 };
483 buf.reserve(builder.measure());
484 let len = builder
485 .encode_buffer_uninit(buf.uninit())
486 .map_err(|_| MessageTooLong.build())?
487 .len();
488 unsafe { buf.advance_mut(len) };
489
490 Ok(())
491 }
492}
493impl Decode for ReadyForCommand {
494 fn decode(buf: &mut Input) -> Result<ReadyForCommand, DecodeError> {
495 let message = new_protocol::ReadyForCommand::new(buf)?;
496 let mut annotations = HashMap::new();
497 for ann in message.annotations() {
498 annotations.insert(
499 ann.name().to_string_lossy().to_string(),
500 ann.value().to_string_lossy().to_string(),
501 );
502 }
503
504 let decoded = ReadyForCommand {
505 annotations,
506 transaction_state: message.transaction_state(),
507 };
508 buf.advance(message.as_ref().len());
509 Ok(decoded)
510 }
511}
512
513impl ErrorSeverity {
514 pub fn from_u8(code: u8) -> ErrorSeverity {
515 use ErrorSeverity::*;
516 match code {
517 120 => Error,
518 200 => Fatal,
519 255 => Panic,
520 _ => Unknown(code),
521 }
522 }
523 pub fn to_u8(&self) -> u8 {
524 use ErrorSeverity::*;
525 match *self {
526 Error => 120,
527 Fatal => 200,
528 Panic => 255,
529 Unknown(code) => code,
530 }
531 }
532}
533
534impl MessageSeverity {
535 fn from_u8(code: u8) -> MessageSeverity {
536 use MessageSeverity::*;
537 match code {
538 20 => Debug,
539 40 => Info,
540 60 => Notice,
541 80 => Warning,
542 _ => Unknown(code),
543 }
544 }
545 fn to_u8(self) -> u8 {
546 use MessageSeverity::*;
547 match self {
548 Debug => 20,
549 Info => 40,
550 Notice => 60,
551 Warning => 80,
552 Unknown(code) => code,
553 }
554 }
555}
556
557impl Encode for ServerKeyData {
558 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
559 let builder = new_protocol::ServerKeyDataBuilder { data: self.data };
560 encode_output(buf, builder)?;
561 Ok(())
562 }
563}
564impl Decode for ServerKeyData {
565 fn decode(buf: &mut Input) -> Result<ServerKeyData, DecodeError> {
566 let message = new_protocol::ServerKeyData::new(buf)?;
567 let decoded = ServerKeyData {
568 data: message.data(),
569 };
570 buf.advance(message.as_ref().len());
571 Ok(decoded)
572 }
573}
574
575impl Encode for ParameterStatus {
576 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
577 let builder = new_protocol::ParameterStatusBuilder {
578 name: self.name.as_ref(),
579 value: self.value.as_ref(),
580 };
581 encode_output(buf, builder)?;
582 Ok(())
583 }
584}
585impl Decode for ParameterStatus {
586 fn decode(buf: &mut Input) -> Result<ParameterStatus, DecodeError> {
587 let message = new_protocol::ParameterStatus::new(buf)?;
588 let decoded = ParameterStatus {
589 proto: buf.proto().clone(),
590 name: message.name().into_slice().to_owned().into(),
591 value: message.value().into_slice().to_owned().into(),
592 };
593 buf.advance(message.as_ref().len());
594 Ok(decoded)
595 }
596}
597
598impl Encode for CommandComplete1 {
599 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
600 let builder = new_protocol::CommandCompleteBuilder {
601 annotations: || {
602 self.annotations
603 .iter()
604 .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
605 },
606 capabilities: self.capabilities.bits(),
607 status: &self.status,
608 state_data: self
609 .state
610 .as_ref()
611 .map(|state| state.data.as_ref())
612 .unwrap_or_default(),
613 state_typedesc_id: self
614 .state
615 .as_ref()
616 .map(|state| state.typedesc_id)
617 .unwrap_or_default(),
618 };
619 encode_output(buf, builder)?;
620
621 Ok(())
622 }
623}
624
625impl Decode for CommandComplete1 {
626 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
627 let message = new_protocol::CommandComplete::new(buf)?;
628 let mut annotations = HashMap::new();
629 for ann in message.annotations() {
630 annotations.insert(
631 ann.name().to_string_lossy().to_string(),
632 ann.value().to_string_lossy().to_string(),
633 );
634 }
635
636 let decoded = CommandComplete1 {
637 annotations,
638 capabilities: Capabilities::from_bits_retain(message.capabilities()),
639 status: message.status().to_string_lossy().to_string(),
640 state: if message.state_typedesc_id() == Uuid::from_u128(0) {
641 None
642 } else {
643 Some(State {
644 typedesc_id: message.state_typedesc_id(),
645 data: message.state_data().into_slice().to_owned().into(),
646 })
647 },
648 };
649 buf.advance(message.as_ref().len());
650 Ok(decoded)
651 }
652}
653
654impl Encode for CommandDataDescription1 {
655 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
656 let builder = new_protocol::CommandDataDescriptionBuilder {
657 annotations: || {
658 self.annotations
659 .iter()
660 .map(|(name, value)| new_protocol::AnnotationBuilder { name, value })
661 },
662 capabilities: self.capabilities.bits(),
663 result_cardinality: self.result_cardinality as u8,
664 input_typedesc_id: self.input.id,
665 input_typedesc: self.input.data.as_ref(),
666 output_typedesc_id: self.output.id,
667 output_typedesc: self.output.data.as_ref(),
668 };
669 encode_output(buf, builder)?;
670 Ok(())
671 }
672}
673
674impl Decode for CommandDataDescription1 {
675 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
676 let message = new_protocol::CommandDataDescription::new(buf)?;
677 let mut annotations = HashMap::new();
678 for ann in message.annotations() {
679 annotations.insert(
680 ann.name().to_string_lossy().to_string(),
681 ann.value().to_string_lossy().to_string(),
682 );
683 }
684
685 let decoded = CommandDataDescription1 {
686 annotations,
687 capabilities: Capabilities::from_bits_retain(message.capabilities()),
688 result_cardinality: TryFrom::try_from(message.result_cardinality())?,
689 input: RawTypedesc {
690 proto: buf.proto().clone(),
691 id: message.input_typedesc_id(),
692 data: message.input_typedesc().into_slice().to_owned().into(),
693 },
694 output: RawTypedesc {
695 proto: buf.proto().clone(),
696 id: message.output_typedesc_id(),
697 data: message.output_typedesc().into_slice().to_owned().into(),
698 },
699 };
700 buf.advance(message.as_ref().len());
701 Ok(decoded)
702 }
703}
704
705impl Encode for StateDataDescription {
706 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
707 let builder = new_protocol::StateDataDescriptionBuilder {
708 typedesc_id: self.typedesc.id,
709 typedesc: self.typedesc.data.as_ref(),
710 };
711 encode_output(buf, builder)?;
712 Ok(())
713 }
714}
715
716impl Decode for StateDataDescription {
717 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
718 let message = new_protocol::StateDataDescription::new(buf)?;
719 let decoded = StateDataDescription {
720 typedesc: RawTypedesc {
721 proto: buf.proto().clone(),
722 id: message.typedesc_id(),
723 data: message.typedesc().into_slice().to_owned().into(),
724 },
725 };
726 buf.advance(message.as_ref().len());
727 Ok(decoded)
728 }
729}
730
731impl Encode for Data {
732 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
733 let builder = new_protocol::DataBuilder {
734 data: || {
735 self.data
736 .iter()
737 .map(|chunk| new_protocol::DataElementBuilder {
738 data: chunk.as_ref(),
739 })
740 },
741 };
742 encode_output(buf, builder)?;
743 Ok(())
744 }
745}
746
747impl Decode for Data {
748 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
749 let message = new_protocol::Data::new(buf)?;
750 let mut data = Vec::new();
751 for element in message.data() {
752 data.push(element.data().into_slice().to_owned().into());
753 }
754
755 let decoded = Data { data };
756 buf.advance(message.as_ref().len());
757 Ok(decoded)
758 }
759}
760
761impl Encode for RestoreReady {
762 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
763 let builder = new_protocol::RestoreReadyBuilder {
764 headers: || {
765 self.headers
766 .iter()
767 .map(|(name, value)| new_protocol::KeyValueBuilder {
768 code: *name,
769 value: value.as_ref(),
770 })
771 },
772 jobs: self.jobs,
773 };
774 encode_output(buf, builder)?;
775 Ok(())
776 }
777}
778
779impl Decode for RestoreReady {
780 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
781 let message = new_protocol::RestoreReady::new(buf)?;
782 let mut headers = HashMap::new();
783 for header in message.headers() {
784 headers.insert(header.code(), header.value().into_slice().to_vec().into());
785 }
786
787 let decoded = RestoreReady {
788 headers,
789 jobs: message.jobs() as u16,
790 };
791 buf.advance(message.as_ref().len());
792 Ok(decoded)
793 }
794}
795
796impl Encode for (u8, &'_ RawPacket) {
797 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
798 buf.put_u8(self.0);
799 buf.put_u32((self.1.data.len() + 4) as u32);
800 buf.extend(&self.1.data);
801 Ok(())
802 }
803}
804
805impl Decode for RawPacket {
806 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
807 buf.advance(5);
809 let data = buf.copy_to_bytes(buf.remaining());
810 Ok(RawPacket { data })
811 }
812}