1use std::collections::HashMap;
29use std::convert::{TryFrom, TryInto};
30
31use bytes::{Buf, BufMut, Bytes};
32use snafu::{ensure, OptionExt};
33use uuid::Uuid;
34
35use crate::common::Capabilities;
36pub use crate::common::{Cardinality, RawTypedesc, State};
37use crate::descriptors::Typedesc;
38use crate::encoding::{Annotations, Decode, Encode, Input, KeyValues, Output};
39use crate::errors::{self, DecodeError, EncodeError};
40use crate::features::ProtocolVersion;
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[non_exhaustive]
44pub enum ServerMessage {
45 Authentication(Authentication),
46 CommandComplete0(CommandComplete0),
47 CommandComplete1(CommandComplete1),
48 CommandDataDescription0(CommandDataDescription0), CommandDataDescription1(CommandDataDescription1), StateDataDescription(StateDataDescription),
51 Data(Data),
52 DumpHeader(RawPacket),
55 DumpBlock(RawPacket),
56 ErrorResponse(ErrorResponse),
57 LogMessage(LogMessage),
58 ParameterStatus(ParameterStatus),
59 ReadyForCommand(ReadyForCommand),
60 RestoreReady(RestoreReady),
61 ServerHandshake(ServerHandshake),
62 ServerKeyData(ServerKeyData),
63 UnknownMessage(u8, Bytes),
64 PrepareComplete(PrepareComplete),
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct ReadyForCommand {
69 pub headers: KeyValues,
70 pub transaction_state: TransactionState,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum Authentication {
75 Ok,
76 Sasl { methods: Vec<String> },
77 SaslContinue { data: Bytes },
78 SaslFinal { data: Bytes },
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum ErrorSeverity {
83 Error,
84 Fatal,
85 Panic,
86 Unknown(u8),
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq)]
90pub enum MessageSeverity {
91 Debug,
92 Info,
93 Notice,
94 Warning,
95 Unknown(u8),
96}
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub enum TransactionState {
100 NotInTransaction = 0x49,
102
103 InTransaction = 0x54,
105
106 InFailedTransaction = 0x45,
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct ErrorResponse {
113 pub severity: ErrorSeverity,
114 pub code: u32,
115 pub message: String,
116 pub attributes: KeyValues,
117}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct LogMessage {
121 pub severity: MessageSeverity,
122 pub code: u32,
123 pub text: String,
124 pub attributes: KeyValues,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct ServerHandshake {
129 pub major_ver: u16,
130 pub minor_ver: u16,
131 pub extensions: HashMap<String, KeyValues>,
132}
133
134#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct ServerKeyData {
136 pub data: [u8; 32],
137}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
140pub struct ParameterStatus {
141 pub proto: ProtocolVersion,
142 pub name: Bytes,
143 pub value: Bytes,
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct CommandComplete0 {
148 pub headers: KeyValues,
149 pub status_data: Bytes,
150}
151
152#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct CommandComplete1 {
154 pub annotations: Annotations,
155 pub capabilities: Capabilities,
156 pub status_data: Bytes,
157 pub state: Option<State>,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct PrepareComplete {
162 pub headers: KeyValues,
163 pub cardinality: Cardinality,
164 pub input_typedesc_id: Uuid,
165 pub output_typedesc_id: Uuid,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct ParseComplete {
170 pub headers: KeyValues,
171 pub cardinality: Cardinality,
172 pub input_typedesc_id: Uuid,
173 pub output_typedesc_id: Uuid,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct CommandDataDescription0 {
178 pub headers: KeyValues,
179 pub result_cardinality: Cardinality,
180 pub input: RawTypedesc,
181 pub output: RawTypedesc,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq)]
185pub struct CommandDataDescription1 {
186 pub annotations: Annotations,
187 pub capabilities: Capabilities,
188 pub result_cardinality: Cardinality,
189 pub input: RawTypedesc,
190 pub output: RawTypedesc,
191}
192
193#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct StateDataDescription {
195 pub typedesc: RawTypedesc,
196}
197
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub struct Data {
200 pub data: Vec<Bytes>,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct RestoreReady {
205 pub headers: KeyValues,
206 pub jobs: u16,
207}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct RawPacket {
211 pub data: Bytes,
212}
213
214fn encode<T: Encode>(buf: &mut Output, code: u8, msg: &T) -> Result<(), EncodeError> {
215 buf.reserve(5);
216 buf.put_u8(code);
217 let base = buf.len();
218 buf.put_slice(&[0; 4]);
219
220 msg.encode(buf)?;
221
222 let size = u32::try_from(buf.len() - base)
223 .ok()
224 .context(errors::MessageTooLong)?;
225 buf[base..base + 4].copy_from_slice(&size.to_be_bytes()[..]);
226 Ok(())
227}
228
229impl CommandDataDescription0 {
230 pub fn output(&self) -> Result<Typedesc, DecodeError> {
231 self.output.decode()
232 }
233 pub fn input(&self) -> Result<Typedesc, DecodeError> {
234 self.input.decode()
235 }
236}
237
238impl CommandDataDescription1 {
239 pub fn output(&self) -> Result<Typedesc, DecodeError> {
240 self.output.decode()
241 }
242 pub fn input(&self) -> Result<Typedesc, DecodeError> {
243 self.input.decode()
244 }
245}
246
247impl From<CommandDataDescription0> for CommandDataDescription1 {
248 fn from(value: CommandDataDescription0) -> Self {
249 Self {
250 annotations: HashMap::new(),
251 capabilities: decode_capabilities0(&value.headers).unwrap_or(Capabilities::ALL),
252 result_cardinality: value.result_cardinality,
253 input: value.input,
254 output: value.output,
255 }
256 }
257}
258
259impl StateDataDescription {
260 pub fn parse(self) -> Result<Typedesc, DecodeError> {
261 self.typedesc.decode()
262 }
263}
264
265impl ParameterStatus {
266 pub fn parse_system_config(self) -> Result<(Typedesc, Bytes), DecodeError> {
267 let cur = &mut Input::new(self.proto.clone(), self.value);
268 let typedesc_data = Bytes::decode(cur)?;
269 let data = Bytes::decode(cur)?;
270
271 let typedesc_buf = &mut Input::new(self.proto, typedesc_data);
272 let typedesc_id = Uuid::decode(typedesc_buf)?;
273 let typedesc = Typedesc::decode_with_id(typedesc_id, typedesc_buf)?;
274 Ok((typedesc, data))
275 }
276}
277
278impl ServerMessage {
279 pub fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
280 use ServerMessage::*;
281 match self {
282 ServerHandshake(h) => encode(buf, 0x76, h),
283 ErrorResponse(h) => encode(buf, 0x45, h),
284 LogMessage(h) => encode(buf, 0x4c, h),
285 Authentication(h) => encode(buf, 0x52, h),
286 ReadyForCommand(h) => encode(buf, 0x5a, h),
287 ServerKeyData(h) => encode(buf, 0x4b, h),
288 ParameterStatus(h) => encode(buf, 0x53, h),
289 CommandComplete0(h) => encode(buf, 0x43, h),
290 CommandComplete1(h) => encode(buf, 0x43, h),
291 PrepareComplete(h) => encode(buf, 0x31, h),
292 CommandDataDescription0(h) => encode(buf, 0x54, h),
293 CommandDataDescription1(h) => encode(buf, 0x54, h),
294 StateDataDescription(h) => encode(buf, 0x73, h),
295 Data(h) => encode(buf, 0x44, h),
296 RestoreReady(h) => encode(buf, 0x2b, h),
297 DumpHeader(h) => encode(buf, 0x40, h),
298 DumpBlock(h) => encode(buf, 0x3d, h),
299
300 UnknownMessage(_, _) => errors::UnknownMessageCantBeEncoded.fail()?,
301 }
302 }
303 pub fn decode(buf: &mut Input) -> Result<ServerMessage, DecodeError> {
309 use self::ServerMessage as M;
310 let data = &mut buf.slice(5..);
311 let result = match buf[0] {
312 0x76 => ServerHandshake::decode(data).map(M::ServerHandshake)?,
313 0x45 => ErrorResponse::decode(data).map(M::ErrorResponse)?,
314 0x4c => LogMessage::decode(data).map(M::LogMessage)?,
315 0x52 => Authentication::decode(data).map(M::Authentication)?,
316 0x5a => ReadyForCommand::decode(data).map(M::ReadyForCommand)?,
317 0x4b => ServerKeyData::decode(data).map(M::ServerKeyData)?,
318 0x53 => ParameterStatus::decode(data).map(M::ParameterStatus)?,
319 0x43 => {
320 if buf.proto().is_1() {
321 CommandComplete1::decode(data).map(M::CommandComplete1)?
322 } else {
323 CommandComplete0::decode(data).map(M::CommandComplete0)?
324 }
325 }
326 0x31 => PrepareComplete::decode(data).map(M::PrepareComplete)?,
327 0x44 => Data::decode(data).map(M::Data)?,
328 0x2b => RestoreReady::decode(data).map(M::RestoreReady)?,
329 0x40 => RawPacket::decode(data).map(M::DumpHeader)?,
330 0x3d => RawPacket::decode(data).map(M::DumpBlock)?,
331 0x54 => {
332 if buf.proto().is_1() {
333 CommandDataDescription1::decode(data).map(M::CommandDataDescription1)?
334 } else {
335 CommandDataDescription0::decode(data).map(M::CommandDataDescription0)?
336 }
337 }
338 0x73 => StateDataDescription::decode(data).map(M::StateDataDescription)?,
339 code => M::UnknownMessage(code, data.copy_to_bytes(data.remaining())),
340 };
341 ensure!(data.remaining() == 0, errors::ExtraData);
342 Ok(result)
343 }
344}
345
346impl Encode for ServerHandshake {
347 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
348 buf.reserve(6);
349 buf.put_u16(self.major_ver);
350 buf.put_u16(self.minor_ver);
351 buf.put_u16(
352 u16::try_from(self.extensions.len())
353 .ok()
354 .context(errors::TooManyExtensions)?,
355 );
356 for (name, headers) in &self.extensions {
357 name.encode(buf)?;
358 buf.reserve(2);
359 buf.put_u16(
360 u16::try_from(headers.len())
361 .ok()
362 .context(errors::TooManyHeaders)?,
363 );
364 for (&name, value) in headers {
365 buf.reserve(2);
366 buf.put_u16(name);
367 value.encode(buf)?;
368 }
369 }
370 Ok(())
371 }
372}
373
374impl Decode for ServerHandshake {
375 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
376 ensure!(buf.remaining() >= 6, errors::Underflow);
377 let major_ver = buf.get_u16();
378 let minor_ver = buf.get_u16();
379 let num_ext = buf.get_u16();
380 let mut extensions = HashMap::new();
381 for _ in 0..num_ext {
382 let name = String::decode(buf)?;
383 ensure!(buf.remaining() >= 2, errors::Underflow);
384 let num_headers = buf.get_u16();
385 let mut headers = HashMap::new();
386 for _ in 0..num_headers {
387 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
388 }
389 extensions.insert(name, headers);
390 }
391 Ok(ServerHandshake {
392 major_ver,
393 minor_ver,
394 extensions,
395 })
396 }
397}
398
399impl Encode for ErrorResponse {
400 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
401 buf.reserve(11);
402 buf.put_u8(self.severity.to_u8());
403 buf.put_u32(self.code);
404 self.message.encode(buf)?;
405 buf.reserve(2);
406 buf.put_u16(
407 u16::try_from(self.attributes.len())
408 .ok()
409 .context(errors::TooManyHeaders)?,
410 );
411 for (&name, value) in &self.attributes {
412 buf.reserve(2);
413 buf.put_u16(name);
414 value.encode(buf)?;
415 }
416 Ok(())
417 }
418}
419
420impl Decode for ErrorResponse {
421 fn decode(buf: &mut Input) -> Result<ErrorResponse, DecodeError> {
422 ensure!(buf.remaining() >= 11, errors::Underflow);
423 let severity = ErrorSeverity::from_u8(buf.get_u8());
424 let code = buf.get_u32();
425 let message = String::decode(buf)?;
426 ensure!(buf.remaining() >= 2, errors::Underflow);
427 let num_attributes = buf.get_u16();
428 let mut attributes = HashMap::new();
429 for _ in 0..num_attributes {
430 ensure!(buf.remaining() >= 4, errors::Underflow);
431 attributes.insert(buf.get_u16(), Bytes::decode(buf)?);
432 }
433 Ok(ErrorResponse {
434 severity,
435 code,
436 message,
437 attributes,
438 })
439 }
440}
441
442impl Encode for LogMessage {
443 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
444 buf.reserve(11);
445 buf.put_u8(self.severity.to_u8());
446 buf.put_u32(self.code);
447 self.text.encode(buf)?;
448 buf.reserve(2);
449 buf.put_u16(
450 u16::try_from(self.attributes.len())
451 .ok()
452 .context(errors::TooManyHeaders)?,
453 );
454 for (&name, value) in &self.attributes {
455 buf.reserve(2);
456 buf.put_u16(name);
457 value.encode(buf)?;
458 }
459 Ok(())
460 }
461}
462
463impl Decode for LogMessage {
464 fn decode(buf: &mut Input) -> Result<LogMessage, DecodeError> {
465 ensure!(buf.remaining() >= 11, errors::Underflow);
466 let severity = MessageSeverity::from_u8(buf.get_u8());
467 let code = buf.get_u32();
468 let text = String::decode(buf)?;
469 ensure!(buf.remaining() >= 2, errors::Underflow);
470 let num_attributes = buf.get_u16();
471 let mut attributes = HashMap::new();
472 for _ in 0..num_attributes {
473 ensure!(buf.remaining() >= 4, errors::Underflow);
474 attributes.insert(buf.get_u16(), Bytes::decode(buf)?);
475 }
476 Ok(LogMessage {
477 severity,
478 code,
479 text,
480 attributes,
481 })
482 }
483}
484
485impl Encode for Authentication {
486 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
487 use Authentication as A;
488 buf.reserve(1);
489 match self {
490 A::Ok => buf.put_u32(0),
491 A::Sasl { methods } => {
492 buf.put_u32(0x0A);
493 buf.reserve(4);
494 buf.put_u32(
495 methods
496 .len()
497 .try_into()
498 .ok()
499 .context(errors::TooManyMethods)?,
500 );
501 for meth in methods {
502 meth.encode(buf)?;
503 }
504 }
505 A::SaslContinue { data } => {
506 buf.put_u32(0x0B);
507 data.encode(buf)?;
508 }
509 A::SaslFinal { data } => {
510 buf.put_u32(0x0C);
511 data.encode(buf)?;
512 }
513 }
514 Ok(())
515 }
516}
517
518impl Decode for Authentication {
519 fn decode(buf: &mut Input) -> Result<Authentication, DecodeError> {
520 ensure!(buf.remaining() >= 4, errors::Underflow);
521 match buf.get_u32() {
522 0x00 => Ok(Authentication::Ok),
523 0x0A => {
524 ensure!(buf.remaining() >= 4, errors::Underflow);
525 let num_methods = buf.get_u32() as usize;
526 let mut methods = Vec::with_capacity(num_methods);
527 for _ in 0..num_methods {
528 methods.push(String::decode(buf)?);
529 }
530 Ok(Authentication::Sasl { methods })
531 }
532 0x0B => {
533 let data = Bytes::decode(buf)?;
534 Ok(Authentication::SaslContinue { data })
535 }
536 0x0C => {
537 let data = Bytes::decode(buf)?;
538 Ok(Authentication::SaslFinal { data })
539 }
540 c => errors::AuthStatusInvalid { auth_status: c }.fail()?,
541 }
542 }
543}
544
545impl Encode for ReadyForCommand {
546 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
547 buf.reserve(3);
548 buf.put_u16(
549 u16::try_from(self.headers.len())
550 .ok()
551 .context(errors::TooManyHeaders)?,
552 );
553 for (&name, value) in &self.headers {
554 buf.reserve(2);
555 buf.put_u16(name);
556 value.encode(buf)?;
557 }
558 buf.reserve(1);
559 buf.put_u8(self.transaction_state as u8);
560 Ok(())
561 }
562}
563impl Decode for ReadyForCommand {
564 fn decode(buf: &mut Input) -> Result<ReadyForCommand, DecodeError> {
565 use TransactionState::*;
566 ensure!(buf.remaining() >= 3, errors::Underflow);
567 let mut headers = HashMap::new();
568 let num_headers = buf.get_u16();
569 for _ in 0..num_headers {
570 ensure!(buf.remaining() >= 4, errors::Underflow);
571 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
572 }
573 ensure!(buf.remaining() >= 1, errors::Underflow);
574 let transaction_state = match buf.get_u8() {
575 0x49 => NotInTransaction,
576 0x54 => InTransaction,
577 0x45 => InFailedTransaction,
578 s => errors::InvalidTransactionState {
579 transaction_state: s,
580 }
581 .fail()?,
582 };
583 Ok(ReadyForCommand {
584 headers,
585 transaction_state,
586 })
587 }
588}
589
590impl ErrorSeverity {
591 pub fn from_u8(code: u8) -> ErrorSeverity {
592 use ErrorSeverity::*;
593 match code {
594 120 => Error,
595 200 => Fatal,
596 255 => Panic,
597 _ => Unknown(code),
598 }
599 }
600 pub fn to_u8(&self) -> u8 {
601 use ErrorSeverity::*;
602 match *self {
603 Error => 120,
604 Fatal => 200,
605 Panic => 255,
606 Unknown(code) => code,
607 }
608 }
609}
610
611impl MessageSeverity {
612 fn from_u8(code: u8) -> MessageSeverity {
613 use MessageSeverity::*;
614 match code {
615 20 => Debug,
616 40 => Info,
617 60 => Notice,
618 80 => Warning,
619 _ => Unknown(code),
620 }
621 }
622 fn to_u8(self) -> u8 {
623 use MessageSeverity::*;
624 match self {
625 Debug => 20,
626 Info => 40,
627 Notice => 60,
628 Warning => 80,
629 Unknown(code) => code,
630 }
631 }
632}
633
634impl Encode for ServerKeyData {
635 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
636 buf.extend(&self.data[..]);
637 Ok(())
638 }
639}
640impl Decode for ServerKeyData {
641 fn decode(buf: &mut Input) -> Result<ServerKeyData, DecodeError> {
642 ensure!(buf.remaining() >= 32, errors::Underflow);
643 let mut data = [0u8; 32];
644 buf.copy_to_slice(&mut data[..]);
645 Ok(ServerKeyData { data })
646 }
647}
648
649impl Encode for ParameterStatus {
650 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
651 self.name.encode(buf)?;
652 self.value.encode(buf)?;
653 Ok(())
654 }
655}
656impl Decode for ParameterStatus {
657 fn decode(buf: &mut Input) -> Result<ParameterStatus, DecodeError> {
658 let name = Bytes::decode(buf)?;
659 let value = Bytes::decode(buf)?;
660 Ok(ParameterStatus {
661 proto: buf.proto().clone(),
662 name,
663 value,
664 })
665 }
666}
667
668impl Encode for CommandComplete0 {
669 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
670 buf.reserve(6);
671 buf.put_u16(
672 u16::try_from(self.headers.len())
673 .ok()
674 .context(errors::TooManyHeaders)?,
675 );
676 for (&name, value) in &self.headers {
677 buf.reserve(2);
678 buf.put_u16(name);
679 value.encode(buf)?;
680 }
681 self.status_data.encode(buf)?;
682 Ok(())
683 }
684}
685
686impl Decode for CommandComplete0 {
687 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
688 ensure!(buf.remaining() >= 6, errors::Underflow);
689 let num_headers = buf.get_u16();
690 let mut headers = HashMap::new();
691 for _ in 0..num_headers {
692 ensure!(buf.remaining() >= 4, errors::Underflow);
693 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
694 }
695 let status_data = Bytes::decode(buf)?;
696 Ok(CommandComplete0 {
697 status_data,
698 headers,
699 })
700 }
701}
702
703impl Encode for CommandComplete1 {
704 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
705 buf.reserve(26);
706 buf.put_u16(
707 u16::try_from(self.annotations.len())
708 .ok()
709 .context(errors::TooManyHeaders)?,
710 );
711 for (name, value) in &self.annotations {
712 name.encode(buf)?;
713 value.encode(buf)?;
714 }
715 buf.put_u64(self.capabilities.bits());
716 self.status_data.encode(buf)?;
717 if let Some(state) = &self.state {
718 state.typedesc_id.encode(buf)?;
719 state.data.encode(buf)?;
720 } else {
721 Uuid::from_u128(0).encode(buf)?;
722 Bytes::new().encode(buf)?;
723 }
724 Ok(())
725 }
726}
727
728impl Decode for CommandComplete1 {
729 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
730 ensure!(buf.remaining() >= 26, errors::Underflow);
731 let num_annotations = buf.get_u16();
732 let mut annotations = HashMap::new();
733 for _ in 0..num_annotations {
734 annotations.insert(String::decode(buf)?, String::decode(buf)?);
735 }
736 let capabilities = Capabilities::from_bits_retain(buf.get_u64());
737 let status_data = Bytes::decode(buf)?;
738 let typedesc_id = Uuid::decode(buf)?;
739 let state_data = Bytes::decode(buf)?;
740 let state = if typedesc_id == Uuid::from_u128(0) {
741 None
742 } else {
743 Some(State {
744 typedesc_id,
745 data: state_data,
746 })
747 };
748 Ok(CommandComplete1 {
749 annotations,
750 capabilities,
751 status_data,
752 state,
753 })
754 }
755}
756
757impl Encode for PrepareComplete {
758 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
759 buf.reserve(35);
760 buf.put_u16(
761 u16::try_from(self.headers.len())
762 .ok()
763 .context(errors::TooManyHeaders)?,
764 );
765 for (&name, value) in &self.headers {
766 buf.reserve(2);
767 buf.put_u16(name);
768 value.encode(buf)?;
769 }
770 buf.reserve(33);
771 buf.put_u8(self.cardinality as u8);
772 self.input_typedesc_id.encode(buf)?;
773 self.output_typedesc_id.encode(buf)?;
774 Ok(())
775 }
776}
777
778impl Decode for PrepareComplete {
779 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
780 ensure!(buf.remaining() >= 35, errors::Underflow);
781 let num_headers = buf.get_u16();
782 let mut headers = HashMap::new();
783 for _ in 0..num_headers {
784 ensure!(buf.remaining() >= 4, errors::Underflow);
785 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
786 }
787 ensure!(buf.remaining() >= 33, errors::Underflow);
788 let cardinality = TryFrom::try_from(buf.get_u8())?;
789 let input_typedesc_id = Uuid::decode(buf)?;
790 let output_typedesc_id = Uuid::decode(buf)?;
791 Ok(PrepareComplete {
792 headers,
793 cardinality,
794 input_typedesc_id,
795 output_typedesc_id,
796 })
797 }
798}
799
800impl Encode for CommandDataDescription0 {
801 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
802 debug_assert!(!buf.proto().is_1());
803 buf.reserve(43);
804 buf.put_u16(
805 u16::try_from(self.headers.len())
806 .ok()
807 .context(errors::TooManyHeaders)?,
808 );
809 for (&name, value) in &self.headers {
810 buf.reserve(2);
811 buf.put_u16(name);
812 value.encode(buf)?;
813 }
814 buf.reserve(41);
815 buf.put_u8(self.result_cardinality as u8);
816 self.input.id.encode(buf)?;
817 self.input.data.encode(buf)?;
818 self.output.id.encode(buf)?;
819 self.output.data.encode(buf)?;
820 Ok(())
821 }
822}
823
824impl Decode for CommandDataDescription0 {
825 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
826 ensure!(buf.remaining() >= 43, errors::Underflow);
827 let num_headers = buf.get_u16();
828 let mut headers = HashMap::new();
829 for _ in 0..num_headers {
830 ensure!(buf.remaining() >= 4, errors::Underflow);
831 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
832 }
833 ensure!(buf.remaining() >= 41, errors::Underflow);
834 let result_cardinality = TryFrom::try_from(buf.get_u8())?;
835 let input = RawTypedesc {
836 proto: buf.proto().clone(),
837 id: Uuid::decode(buf)?,
838 data: Bytes::decode(buf)?,
839 };
840 let output = RawTypedesc {
841 proto: buf.proto().clone(),
842 id: Uuid::decode(buf)?,
843 data: Bytes::decode(buf)?,
844 };
845
846 Ok(CommandDataDescription0 {
847 headers,
848 result_cardinality,
849 input,
850 output,
851 })
852 }
853}
854
855impl Encode for CommandDataDescription1 {
856 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
857 debug_assert!(buf.proto().is_1());
858 buf.reserve(51);
859 buf.put_u16(
860 u16::try_from(self.annotations.len())
861 .ok()
862 .context(errors::TooManyHeaders)?,
863 );
864 for (name, value) in &self.annotations {
865 buf.reserve(4);
866 name.encode(buf)?;
867 value.encode(buf)?;
868 }
869 buf.reserve(49);
870 buf.put_u64(self.capabilities.bits());
871 buf.put_u8(self.result_cardinality as u8);
872 self.input.id.encode(buf)?;
873 self.input.data.encode(buf)?;
874 self.output.id.encode(buf)?;
875 self.output.data.encode(buf)?;
876 Ok(())
877 }
878}
879
880impl Decode for CommandDataDescription1 {
881 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
882 ensure!(buf.remaining() >= 51, errors::Underflow);
883 let num_annotations = buf.get_u16();
884 let mut annotations = HashMap::new();
885 for _ in 0..num_annotations {
886 ensure!(buf.remaining() >= 4, errors::Underflow);
887 annotations.insert(String::decode(buf)?, String::decode(buf)?);
888 }
889 ensure!(buf.remaining() >= 49, errors::Underflow);
890 let capabilities = Capabilities::from_bits_retain(buf.get_u64());
891 let result_cardinality = TryFrom::try_from(buf.get_u8())?;
892 let input = RawTypedesc {
893 proto: buf.proto().clone(),
894 id: Uuid::decode(buf)?,
895 data: Bytes::decode(buf)?,
896 };
897 let output = RawTypedesc {
898 proto: buf.proto().clone(),
899 id: Uuid::decode(buf)?,
900 data: Bytes::decode(buf)?,
901 };
902
903 Ok(CommandDataDescription1 {
904 annotations,
905 capabilities,
906 result_cardinality,
907 input,
908 output,
909 })
910 }
911}
912
913impl Encode for StateDataDescription {
914 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
915 debug_assert!(buf.proto().is_1());
916 self.typedesc.id.encode(buf)?;
917 self.typedesc.data.encode(buf)?;
918 Ok(())
919 }
920}
921
922impl Decode for StateDataDescription {
923 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
924 let typedesc = RawTypedesc {
925 proto: buf.proto().clone(),
926 id: Uuid::decode(buf)?,
927 data: Bytes::decode(buf)?,
928 };
929
930 Ok(StateDataDescription { typedesc })
931 }
932}
933
934impl Encode for Data {
935 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
936 buf.reserve(2);
937 buf.put_u16(
938 u16::try_from(self.data.len())
939 .ok()
940 .context(errors::TooManyHeaders)?,
941 );
942 for chunk in &self.data {
943 chunk.encode(buf)?;
944 }
945 Ok(())
946 }
947}
948
949impl Decode for Data {
950 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
951 ensure!(buf.remaining() >= 2, errors::Underflow);
952 let num_chunks = buf.get_u16() as usize;
953 let mut data = Vec::with_capacity(num_chunks);
954 for _ in 0..num_chunks {
955 data.push(Bytes::decode(buf)?);
956 }
957 Ok(Data { data })
958 }
959}
960
961impl Encode for RestoreReady {
962 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
963 buf.reserve(4);
964 buf.put_u16(
965 u16::try_from(self.headers.len())
966 .ok()
967 .context(errors::TooManyHeaders)?,
968 );
969 for (&name, value) in &self.headers {
970 buf.reserve(2);
971 buf.put_u16(name);
972 value.encode(buf)?;
973 }
974 buf.reserve(2);
975 buf.put_u16(self.jobs);
976 Ok(())
977 }
978}
979
980impl Decode for RestoreReady {
981 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
982 ensure!(buf.remaining() >= 4, errors::Underflow);
983 let num_headers = buf.get_u16();
984 let mut headers = HashMap::new();
985 for _ in 0..num_headers {
986 ensure!(buf.remaining() >= 4, errors::Underflow);
987 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
988 }
989 ensure!(buf.remaining() >= 2, errors::Underflow);
990 let jobs = buf.get_u16();
991 Ok(RestoreReady { jobs, headers })
992 }
993}
994
995impl Encode for RawPacket {
996 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
997 buf.extend(&self.data);
998 Ok(())
999 }
1000}
1001
1002impl Decode for RawPacket {
1003 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
1004 Ok(RawPacket {
1005 data: buf.copy_to_bytes(buf.remaining()),
1006 })
1007 }
1008}
1009
1010impl PrepareComplete {
1011 pub fn get_capabilities(&self) -> Option<Capabilities> {
1012 decode_capabilities0(&self.headers)
1013 }
1014}
1015
1016fn decode_capabilities0(headers: &KeyValues) -> Option<Capabilities> {
1017 headers.get(&0x1001).and_then(|bytes| {
1018 if bytes.len() == 8 {
1019 let mut array = [0u8; 8];
1020 array.copy_from_slice(bytes);
1021 Some(Capabilities::from_bits_retain(u64::from_be_bytes(array)))
1022 } else {
1023 None
1024 }
1025 })
1026}