gel_protocol/
client_message.rs

/*!
([Website reference](https://www.edgedb.com/docs/reference/protocol/messages)) The [ClientMessage] enum and related types.

```rust,ignore
pub enum ClientMessage {
    AuthenticationSaslInitialResponse(SaslInitialResponse),
    AuthenticationSaslResponse(SaslResponse),
    ClientHandshake(ClientHandshake),
    Dump2(Dump2),
    Dump3(Dump3),
    Parse(Parse),
    ExecuteScript(ExecuteScript),
    Execute0(Execute0),
    Execute1(Execute1),
    Restore(Restore),
    RestoreBlock(RestoreBlock),
    RestoreEof,
    Sync,
    Terminate,
    Prepare(Prepare),
    DescribeStatement(DescribeStatement),
    OptimisticExecute(OptimisticExecute),
    UnknownMessage(u8, Bytes),
    Flush,
}
```
*/
27
28use std::collections::HashMap;
29use std::convert::TryFrom;
30use std::sync::Arc;
31
32use bytes::{Buf, BufMut, Bytes};
33use snafu::{ensure, OptionExt};
34use uuid::Uuid;
35
36pub use crate::common::CompilationOptions;
37pub use crate::common::DumpFlags;
38pub use crate::common::{Capabilities, Cardinality, CompilationFlags};
39pub use crate::common::{RawTypedesc, State};
40use crate::encoding::{encode, Decode, Encode, Input, Output};
41use crate::encoding::{Annotations, KeyValues};
42use crate::errors::{self, DecodeError, EncodeError};
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45#[non_exhaustive]
46pub enum ClientMessage {
47    AuthenticationSaslInitialResponse(SaslInitialResponse),
48    AuthenticationSaslResponse(SaslResponse),
49    ClientHandshake(ClientHandshake),
50    Dump2(Dump2),
51    Dump3(Dump3),
52    Parse(Parse), // protocol > 1.0
53    ExecuteScript(ExecuteScript),
54    Execute0(Execute0),
55    Execute1(Execute1),
56    Restore(Restore),
57    RestoreBlock(RestoreBlock),
58    RestoreEof,
59    Sync,
60    Terminate,
61    Prepare(Prepare), // protocol < 1.0
62    DescribeStatement(DescribeStatement),
63    OptimisticExecute(OptimisticExecute),
64    UnknownMessage(u8, Bytes),
65    Flush,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct SaslInitialResponse {
70    pub method: String,
71    pub data: Bytes,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct SaslResponse {
76    pub data: Bytes,
77}
78
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct ClientHandshake {
81    pub major_ver: u16,
82    pub minor_ver: u16,
83    pub params: HashMap<String, String>,
84    pub extensions: HashMap<String, KeyValues>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct ExecuteScript {
89    pub headers: KeyValues,
90    pub script_text: String,
91}
92
93#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct Prepare {
95    pub headers: KeyValues,
96    pub io_format: IoFormat,
97    pub expected_cardinality: Cardinality,
98    pub statement_name: Bytes,
99    pub command_text: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Parse {
104    pub annotations: Option<Arc<Annotations>>,
105    pub allowed_capabilities: Capabilities,
106    pub compilation_flags: CompilationFlags,
107    pub implicit_limit: Option<u64>,
108    pub output_format: IoFormat,
109    pub expected_cardinality: Cardinality,
110    pub command_text: String,
111    pub state: State,
112    pub input_language: InputLanguage,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct DescribeStatement {
117    pub headers: KeyValues,
118    pub aspect: DescribeAspect,
119    pub statement_name: Bytes,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct Execute0 {
124    pub headers: KeyValues,
125    pub statement_name: Bytes,
126    pub arguments: Bytes,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct Execute1 {
131    pub annotations: Option<Arc<Annotations>>,
132    pub allowed_capabilities: Capabilities,
133    pub compilation_flags: CompilationFlags,
134    pub implicit_limit: Option<u64>,
135    pub output_format: IoFormat,
136    pub expected_cardinality: Cardinality,
137    pub command_text: String,
138    pub state: State,
139    pub input_typedesc_id: Uuid,
140    pub output_typedesc_id: Uuid,
141    pub arguments: Bytes,
142    pub input_language: InputLanguage,
143}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct OptimisticExecute {
147    pub headers: KeyValues,
148    pub io_format: IoFormat,
149    pub expected_cardinality: Cardinality,
150    pub command_text: String,
151    pub input_typedesc_id: Uuid,
152    pub output_typedesc_id: Uuid,
153    pub arguments: Bytes,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct Dump2 {
158    pub headers: KeyValues,
159}
160
161#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct Dump3 {
163    pub annotations: Option<Arc<Annotations>>,
164    pub flags: DumpFlags,
165}
166
167#[derive(Debug, Clone, PartialEq, Eq)]
168pub struct Restore {
169    pub headers: KeyValues,
170    pub jobs: u16,
171    pub data: Bytes,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
175pub struct RestoreBlock {
176    pub data: Bytes,
177}
178
/// Aspect selector carried by [`DescribeStatement`]; the discriminant is the
/// byte written on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DescribeAspect {
    /// Wire byte `0x54` (ASCII `T`).
    DataDescription = 0x54,
}
183
/// Language of the command text; the discriminant is the byte written on the
/// wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InputLanguage {
    /// Wire byte `0x45` (ASCII `E`).
    EdgeQL = 0x45,
    /// Wire byte `0x53` (ASCII `S`).
    SQL = 0x53,
}
189
/// Format requested for query results; the discriminant is the byte written
/// on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IoFormat {
    /// Wire byte `0x62` (ASCII `b`).
    Binary = 0x62,
    /// Wire byte `0x6a` (ASCII `j`).
    Json = 0x6a,
    /// Wire byte `0x4a` (ASCII `J`).
    JsonElements = 0x4a,
    /// Wire byte `0x6e` (ASCII `n`).
    None = 0x6e,
}
197
198struct Empty;
199impl ClientMessage {
200    pub fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
201        use ClientMessage::*;
202        match self {
203            ClientHandshake(h) => encode(buf, 0x56, h),
204            AuthenticationSaslInitialResponse(h) => encode(buf, 0x70, h),
205            AuthenticationSaslResponse(h) => encode(buf, 0x72, h),
206            ExecuteScript(h) => encode(buf, 0x51, h),
207            Prepare(h) => encode(buf, 0x50, h),
208            Parse(h) => encode(buf, 0x50, h),
209            DescribeStatement(h) => encode(buf, 0x44, h),
210            Execute0(h) => encode(buf, 0x45, h),
211            OptimisticExecute(h) => encode(buf, 0x4f, h),
212            Execute1(h) => encode(buf, 0x4f, h),
213            Dump2(h) => encode(buf, 0x3e, h),
214            Dump3(h) => encode(buf, 0x3e, h),
215            Restore(h) => encode(buf, 0x3c, h),
216            RestoreBlock(h) => encode(buf, 0x3d, h),
217            RestoreEof => encode(buf, 0x2e, &Empty),
218            Sync => encode(buf, 0x53, &Empty),
219            Flush => encode(buf, 0x48, &Empty),
220            Terminate => encode(buf, 0x58, &Empty),
221
222            UnknownMessage(_, _) => errors::UnknownMessageCantBeEncoded.fail()?,
223        }
224    }
225    /// Decode exactly one frame from the buffer.
226    ///
227    /// This expects a full frame to already be in the buffer. It can return
228    /// an arbitrary error or be silent if a message is only partially present
229    /// in the buffer or if extra data is present.
230    pub fn decode(buf: &mut Input) -> Result<ClientMessage, DecodeError> {
231        use self::ClientMessage as M;
232        let mut data = buf.slice(5..);
233        let result = match buf[0] {
234            0x56 => ClientHandshake::decode(&mut data).map(M::ClientHandshake)?,
235            0x70 => {
236                SaslInitialResponse::decode(&mut data).map(M::AuthenticationSaslInitialResponse)?
237            }
238            0x72 => SaslResponse::decode(&mut data).map(M::AuthenticationSaslResponse)?,
239            0x51 => ExecuteScript::decode(&mut data).map(M::ExecuteScript)?,
240            0x50 => {
241                if buf.proto().is_1() {
242                    Parse::decode(&mut data).map(M::Parse)?
243                } else {
244                    Prepare::decode(&mut data).map(M::Prepare)?
245                }
246            }
247            0x45 => Execute0::decode(&mut data).map(M::Execute0)?,
248            0x4f => {
249                if buf.proto().is_1() {
250                    Execute1::decode(&mut data).map(M::Execute1)?
251                } else {
252                    OptimisticExecute::decode(&mut data).map(M::OptimisticExecute)?
253                }
254            }
255            0x3e => {
256                if buf.proto().is_3() {
257                    Dump3::decode(&mut data).map(M::Dump3)?
258                } else {
259                    Dump2::decode(&mut data).map(M::Dump2)?
260                }
261            }
262            0x3c => Restore::decode(&mut data).map(M::Restore)?,
263            0x3d => RestoreBlock::decode(&mut data).map(M::RestoreBlock)?,
264            0x2e => M::RestoreEof,
265            0x53 => M::Sync,
266            0x48 => M::Flush,
267            0x58 => M::Terminate,
268            0x44 => DescribeStatement::decode(&mut data).map(M::DescribeStatement)?,
269            code => M::UnknownMessage(code, data.copy_to_bytes(data.remaining())),
270        };
271        ensure!(data.remaining() == 0, errors::ExtraData);
272        Ok(result)
273    }
274}
275
276impl Encode for Empty {
277    fn encode(&self, _buf: &mut Output) -> Result<(), EncodeError> {
278        Ok(())
279    }
280}
281
282impl Encode for ClientHandshake {
283    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
284        buf.reserve(8);
285        buf.put_u16(self.major_ver);
286        buf.put_u16(self.minor_ver);
287        buf.put_u16(
288            u16::try_from(self.params.len())
289                .ok()
290                .context(errors::TooManyParams)?,
291        );
292        for (k, v) in &self.params {
293            k.encode(buf)?;
294            v.encode(buf)?;
295        }
296        buf.reserve(2);
297        buf.put_u16(
298            u16::try_from(self.extensions.len())
299                .ok()
300                .context(errors::TooManyExtensions)?,
301        );
302        for (name, headers) in &self.extensions {
303            name.encode(buf)?;
304            buf.reserve(2);
305            buf.put_u16(
306                u16::try_from(headers.len())
307                    .ok()
308                    .context(errors::TooManyHeaders)?,
309            );
310            for (&name, value) in headers {
311                buf.reserve(2);
312                buf.put_u16(name);
313                value.encode(buf)?;
314            }
315        }
316        Ok(())
317    }
318}
319
320impl Decode for ClientHandshake {
321    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
322        ensure!(buf.remaining() >= 8, errors::Underflow);
323        let major_ver = buf.get_u16();
324        let minor_ver = buf.get_u16();
325        let num_params = buf.get_u16();
326        let mut params = HashMap::new();
327        for _ in 0..num_params {
328            params.insert(String::decode(buf)?, String::decode(buf)?);
329        }
330
331        ensure!(buf.remaining() >= 2, errors::Underflow);
332        let num_ext = buf.get_u16();
333        let mut extensions = HashMap::new();
334        for _ in 0..num_ext {
335            let name = String::decode(buf)?;
336            ensure!(buf.remaining() >= 2, errors::Underflow);
337            let num_headers = buf.get_u16();
338            let mut headers = HashMap::new();
339            for _ in 0..num_headers {
340                ensure!(buf.remaining() >= 4, errors::Underflow);
341                headers.insert(buf.get_u16(), Bytes::decode(buf)?);
342            }
343            extensions.insert(name, headers);
344        }
345        Ok(ClientHandshake {
346            major_ver,
347            minor_ver,
348            params,
349            extensions,
350        })
351    }
352}
353
354impl Encode for SaslInitialResponse {
355    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
356        self.method.encode(buf)?;
357        self.data.encode(buf)?;
358        Ok(())
359    }
360}
361
362impl Decode for SaslInitialResponse {
363    fn decode(buf: &mut Input) -> Result<SaslInitialResponse, DecodeError> {
364        let method = String::decode(buf)?;
365        let data = Bytes::decode(buf)?;
366        Ok(SaslInitialResponse { method, data })
367    }
368}
369
370impl Encode for SaslResponse {
371    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
372        self.data.encode(buf)?;
373        Ok(())
374    }
375}
376
377impl Decode for SaslResponse {
378    fn decode(buf: &mut Input) -> Result<SaslResponse, DecodeError> {
379        let data = Bytes::decode(buf)?;
380        Ok(SaslResponse { data })
381    }
382}
383
384impl Encode for ExecuteScript {
385    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
386        buf.reserve(6);
387        buf.put_u16(
388            u16::try_from(self.headers.len())
389                .ok()
390                .context(errors::TooManyHeaders)?,
391        );
392        for (&name, value) in &self.headers {
393            buf.reserve(2);
394            buf.put_u16(name);
395            value.encode(buf)?;
396        }
397        self.script_text.encode(buf)?;
398        Ok(())
399    }
400}
401
402impl Decode for ExecuteScript {
403    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
404        ensure!(buf.remaining() >= 6, errors::Underflow);
405        let num_headers = buf.get_u16();
406        let mut headers = HashMap::new();
407        for _ in 0..num_headers {
408            ensure!(buf.remaining() >= 4, errors::Underflow);
409            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
410        }
411        let script_text = String::decode(buf)?;
412        Ok(ExecuteScript {
413            script_text,
414            headers,
415        })
416    }
417}
418
419impl Encode for Prepare {
420    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
421        debug_assert!(!buf.proto().is_1());
422        buf.reserve(12);
423        buf.put_u16(
424            u16::try_from(self.headers.len())
425                .ok()
426                .context(errors::TooManyHeaders)?,
427        );
428        for (&name, value) in &self.headers {
429            buf.reserve(2);
430            buf.put_u16(name);
431            value.encode(buf)?;
432        }
433        buf.reserve(10);
434        buf.put_u8(self.io_format as u8);
435        buf.put_u8(self.expected_cardinality as u8);
436        self.statement_name.encode(buf)?;
437        self.command_text.encode(buf)?;
438        Ok(())
439    }
440}
441
442impl Decode for Prepare {
443    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
444        ensure!(buf.remaining() >= 12, errors::Underflow);
445        let num_headers = buf.get_u16();
446        let mut headers = HashMap::new();
447        for _ in 0..num_headers {
448            ensure!(buf.remaining() >= 4, errors::Underflow);
449            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
450        }
451        ensure!(buf.remaining() >= 8, errors::Underflow);
452        let io_format = match buf.get_u8() {
453            0x62 => IoFormat::Binary,
454            0x6a => IoFormat::Json,
455            0x4a => IoFormat::JsonElements,
456            c => errors::InvalidIoFormat { io_format: c }.fail()?,
457        };
458        let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
459        let statement_name = Bytes::decode(buf)?;
460        let command_text = String::decode(buf)?;
461        Ok(Prepare {
462            headers,
463            io_format,
464            expected_cardinality,
465            statement_name,
466            command_text,
467        })
468    }
469}
470
471impl Encode for DescribeStatement {
472    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
473        buf.reserve(7);
474        buf.put_u16(
475            u16::try_from(self.headers.len())
476                .ok()
477                .context(errors::TooManyHeaders)?,
478        );
479        buf.reserve(5);
480        buf.put_u8(self.aspect as u8);
481        self.statement_name.encode(buf)?;
482        Ok(())
483    }
484}
485
486impl Decode for DescribeStatement {
487    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
488        ensure!(buf.remaining() >= 12, errors::Underflow);
489        let num_headers = buf.get_u16();
490        let mut headers = HashMap::new();
491        for _ in 0..num_headers {
492            ensure!(buf.remaining() >= 4, errors::Underflow);
493            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
494        }
495        ensure!(buf.remaining() >= 8, errors::Underflow);
496        let aspect = match buf.get_u8() {
497            0x54 => DescribeAspect::DataDescription,
498            c => errors::InvalidAspect { aspect: c }.fail()?,
499        };
500        let statement_name = Bytes::decode(buf)?;
501        Ok(DescribeStatement {
502            headers,
503            aspect,
504            statement_name,
505        })
506    }
507}
508
509impl Encode for Execute0 {
510    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
511        debug_assert!(!buf.proto().is_1());
512        buf.reserve(10);
513        buf.put_u16(
514            u16::try_from(self.headers.len())
515                .ok()
516                .context(errors::TooManyHeaders)?,
517        );
518        for (&name, value) in &self.headers {
519            buf.reserve(2);
520            buf.put_u16(name);
521            value.encode(buf)?;
522        }
523        self.statement_name.encode(buf)?;
524        self.arguments.encode(buf)?;
525        Ok(())
526    }
527}
528
529impl Decode for Execute0 {
530    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
531        ensure!(buf.remaining() >= 12, errors::Underflow);
532        let num_headers = buf.get_u16();
533        let mut headers = HashMap::new();
534        for _ in 0..num_headers {
535            ensure!(buf.remaining() >= 4, errors::Underflow);
536            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
537        }
538        let statement_name = Bytes::decode(buf)?;
539        let arguments = Bytes::decode(buf)?;
540        Ok(Execute0 {
541            headers,
542            statement_name,
543            arguments,
544        })
545    }
546}
547
548impl OptimisticExecute {
549    pub fn new(
550        flags: &CompilationOptions,
551        query: &str,
552        arguments: impl Into<Bytes>,
553        input_typedesc_id: Uuid,
554        output_typedesc_id: Uuid,
555    ) -> OptimisticExecute {
556        let mut headers = KeyValues::new();
557        if let Some(limit) = flags.implicit_limit {
558            headers.insert(0xFF01, Bytes::from(limit.to_string()));
559        }
560        if flags.implicit_typenames {
561            headers.insert(0xFF02, "true".into());
562        }
563        if flags.implicit_typeids {
564            headers.insert(0xFF03, "true".into());
565        }
566        let caps = flags.allow_capabilities.bits().to_be_bytes();
567        headers.insert(0xFF04, caps[..].to_vec().into());
568        if flags.explicit_objectids {
569            headers.insert(0xFF03, "true".into());
570        }
571        OptimisticExecute {
572            headers,
573            io_format: flags.io_format,
574            expected_cardinality: flags.expected_cardinality,
575            command_text: query.into(),
576            input_typedesc_id,
577            output_typedesc_id,
578            arguments: arguments.into(),
579        }
580    }
581}
582
583impl Encode for OptimisticExecute {
584    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
585        buf.reserve(2 + 1 + 1 + 4 + 16 + 16 + 4);
586        buf.put_u16(
587            u16::try_from(self.headers.len())
588                .ok()
589                .context(errors::TooManyHeaders)?,
590        );
591        for (&name, value) in &self.headers {
592            buf.reserve(2);
593            buf.put_u16(name);
594            value.encode(buf)?;
595        }
596        buf.reserve(1 + 1 + 4 + 16 + 16 + 4);
597        buf.put_u8(self.io_format as u8);
598        buf.put_u8(self.expected_cardinality as u8);
599        self.command_text.encode(buf)?;
600        self.input_typedesc_id.encode(buf)?;
601        self.output_typedesc_id.encode(buf)?;
602        self.arguments.encode(buf)?;
603        Ok(())
604    }
605}
606
607impl Decode for OptimisticExecute {
608    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
609        ensure!(buf.remaining() >= 12, errors::Underflow);
610        let num_headers = buf.get_u16();
611        let mut headers = HashMap::new();
612        for _ in 0..num_headers {
613            ensure!(buf.remaining() >= 4, errors::Underflow);
614            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
615        }
616        let io_format = match buf.get_u8() {
617            0x62 => IoFormat::Binary,
618            0x6a => IoFormat::Json,
619            0x4a => IoFormat::JsonElements,
620            c => errors::InvalidIoFormat { io_format: c }.fail()?,
621        };
622        let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
623        let command_text = String::decode(buf)?;
624        let input_typedesc_id = Uuid::decode(buf)?;
625        let output_typedesc_id = Uuid::decode(buf)?;
626        let arguments = Bytes::decode(buf)?;
627        Ok(OptimisticExecute {
628            headers,
629            io_format,
630            expected_cardinality,
631            command_text,
632            input_typedesc_id,
633            output_typedesc_id,
634            arguments,
635        })
636    }
637}
638
639impl Encode for Execute1 {
640    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
641        buf.reserve(2 + 3 * 8 + 1 + 1 + 4 + 16 + 4 + 16 + 16 + 4);
642        if let Some(annotations) = self.annotations.as_deref() {
643            buf.put_u16(
644                u16::try_from(annotations.len())
645                    .ok()
646                    .context(errors::TooManyHeaders)?,
647            );
648            for (name, value) in annotations {
649                buf.reserve(4);
650                name.encode(buf)?;
651                value.encode(buf)?;
652            }
653        } else {
654            buf.put_u16(0);
655        }
656        buf.reserve(3 * 8 + 1 + 1 + 4 + 16 + 4 + 16 + 16 + 4);
657        buf.put_u64(self.allowed_capabilities.bits());
658        buf.put_u64(self.compilation_flags.bits());
659        buf.put_u64(self.implicit_limit.unwrap_or(0));
660        if buf.proto().is_multilingual() {
661            buf.put_u8(self.input_language as u8);
662        }
663        buf.put_u8(self.output_format as u8);
664        buf.put_u8(self.expected_cardinality as u8);
665        self.command_text.encode(buf)?;
666        self.state.typedesc_id.encode(buf)?;
667        self.state.data.encode(buf)?;
668        self.input_typedesc_id.encode(buf)?;
669        self.output_typedesc_id.encode(buf)?;
670        self.arguments.encode(buf)?;
671        Ok(())
672    }
673}
674
675impl Decode for Execute1 {
676    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
677        ensure!(
678            buf.remaining() >= 2 + 3 * 8 + 2 + 4 + 16 + 4 + 16 + 16 + 4,
679            errors::Underflow
680        );
681        let num_annotations = buf.get_u16();
682        let annotations = if num_annotations == 0 {
683            None
684        } else {
685            let mut annotations = HashMap::new();
686            for _ in 0..num_annotations {
687                ensure!(buf.remaining() >= 4, errors::Underflow);
688                annotations.insert(String::decode(buf)?, String::decode(buf)?);
689            }
690            Some(Arc::new(annotations))
691        };
692        ensure!(
693            buf.remaining() >= 3 * 8 + 2 + 4 + 16 + 4 + 16 + 16 + 4,
694            errors::Underflow
695        );
696        let allowed_capabilities = decode_capabilities(buf.get_u64())?;
697        let compilation_flags = decode_compilation_flags(buf.get_u64())?;
698        let implicit_limit = match buf.get_u64() {
699            0 => None,
700            val => Some(val),
701        };
702        let input_language = if buf.proto().is_multilingual() {
703            TryFrom::try_from(buf.get_u8())?
704        } else {
705            InputLanguage::EdgeQL
706        };
707        let output_format = match buf.get_u8() {
708            0x62 => IoFormat::Binary,
709            0x6a => IoFormat::Json,
710            0x4a => IoFormat::JsonElements,
711            c => errors::InvalidIoFormat { io_format: c }.fail()?,
712        };
713        let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
714        let command_text = String::decode(buf)?;
715        let state = State {
716            typedesc_id: Uuid::decode(buf)?,
717            data: Bytes::decode(buf)?,
718        };
719        let input_typedesc_id = Uuid::decode(buf)?;
720        let output_typedesc_id = Uuid::decode(buf)?;
721        let arguments = Bytes::decode(buf)?;
722        Ok(Execute1 {
723            annotations,
724            allowed_capabilities,
725            compilation_flags,
726            implicit_limit,
727            output_format,
728            expected_cardinality,
729            command_text,
730            state,
731            input_typedesc_id,
732            output_typedesc_id,
733            arguments,
734            input_language,
735        })
736    }
737}
738
739impl Encode for Dump2 {
740    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
741        buf.reserve(10);
742        buf.put_u16(
743            u16::try_from(self.headers.len())
744                .ok()
745                .context(errors::TooManyHeaders)?,
746        );
747        for (&name, value) in &self.headers {
748            buf.reserve(2);
749            buf.put_u16(name);
750            value.encode(buf)?;
751        }
752        Ok(())
753    }
754}
755
756impl Decode for Dump2 {
757    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
758        ensure!(buf.remaining() >= 12, errors::Underflow);
759        let num_headers = buf.get_u16();
760        let mut headers = HashMap::new();
761        for _ in 0..num_headers {
762            ensure!(buf.remaining() >= 4, errors::Underflow);
763            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
764        }
765        Ok(Dump2 { headers })
766    }
767}
768
769impl Encode for Dump3 {
770    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
771        buf.reserve(2 + 8);
772        if let Some(annotations) = self.annotations.as_deref() {
773            buf.put_u16(
774                u16::try_from(annotations.len())
775                    .ok()
776                    .context(errors::TooManyHeaders)?,
777            );
778            for (name, value) in annotations {
779                buf.reserve(4);
780                name.encode(buf)?;
781                value.encode(buf)?;
782            }
783        } else {
784            buf.put_u16(0);
785        }
786        buf.put_u64(self.flags.bits());
787        Ok(())
788    }
789}
790
791impl Decode for Dump3 {
792    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
793        ensure!(buf.remaining() >= 2, errors::Underflow);
794        let num_headers = buf.get_u16();
795        let annotations = if num_headers == 0 {
796            None
797        } else {
798            let mut annotations = HashMap::new();
799            for _ in 0..num_headers {
800                ensure!(buf.remaining() >= 8, errors::Underflow);
801                annotations.insert(String::decode(buf)?, String::decode(buf)?);
802            }
803            Some(Arc::new(annotations))
804        };
805        ensure!(buf.remaining() >= 8, errors::Underflow);
806        let flags = decode_dump_flags(buf.get_u64())?;
807        Ok(Dump3 { annotations, flags })
808    }
809}
810
811impl Encode for Restore {
812    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
813        buf.reserve(4 + self.data.len());
814        buf.put_u16(
815            u16::try_from(self.headers.len())
816                .ok()
817                .context(errors::TooManyHeaders)?,
818        );
819        for (&name, value) in &self.headers {
820            buf.reserve(2);
821            buf.put_u16(name);
822            value.encode(buf)?;
823        }
824        buf.put_u16(self.jobs);
825        buf.extend(&self.data);
826        Ok(())
827    }
828}
829
830impl Decode for Restore {
831    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
832        ensure!(buf.remaining() >= 4, errors::Underflow);
833
834        let num_headers = buf.get_u16();
835        let mut headers = HashMap::new();
836        for _ in 0..num_headers {
837            ensure!(buf.remaining() >= 4, errors::Underflow);
838            headers.insert(buf.get_u16(), Bytes::decode(buf)?);
839        }
840
841        let jobs = buf.get_u16();
842
843        let data = buf.copy_to_bytes(buf.remaining());
844        Ok(Restore {
845            jobs,
846            headers,
847            data,
848        })
849    }
850}
851
852impl Encode for RestoreBlock {
853    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
854        buf.extend(&self.data);
855        Ok(())
856    }
857}
858
859impl Decode for RestoreBlock {
860    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
861        let data = buf.copy_to_bytes(buf.remaining());
862        Ok(RestoreBlock { data })
863    }
864}
865
866impl Parse {
867    pub fn new(
868        opts: &CompilationOptions,
869        query: &str,
870        state: State,
871        annotations: Option<Arc<Annotations>>,
872    ) -> Parse {
873        Parse {
874            annotations,
875            allowed_capabilities: opts.allow_capabilities,
876            compilation_flags: opts.flags(),
877            implicit_limit: opts.implicit_limit,
878            output_format: opts.io_format,
879            expected_cardinality: opts.expected_cardinality,
880            command_text: query.into(),
881            state,
882            input_language: opts.input_language,
883        }
884    }
885}
886
887impl Prepare {
888    pub fn new(flags: &CompilationOptions, query: &str) -> Prepare {
889        let mut headers = KeyValues::new();
890        if let Some(limit) = flags.implicit_limit {
891            headers.insert(0xFF01, Bytes::from(limit.to_string()));
892        }
893        if flags.implicit_typenames {
894            headers.insert(0xFF02, "true".into());
895        }
896        if flags.implicit_typeids {
897            headers.insert(0xFF03, "true".into());
898        }
899        let caps = flags.allow_capabilities.bits().to_be_bytes();
900        headers.insert(0xFF04, caps[..].to_vec().into());
901        if flags.explicit_objectids {
902            headers.insert(0xFF03, "true".into());
903        }
904        Prepare {
905            headers,
906            io_format: flags.io_format,
907            expected_cardinality: flags.expected_cardinality,
908            statement_name: Bytes::from(""),
909            command_text: query.into(),
910        }
911    }
912}
913
914fn decode_capabilities(val: u64) -> Result<Capabilities, DecodeError> {
915    Capabilities::from_bits(val)
916        .ok_or_else(|| errors::InvalidCapabilities { capabilities: val }.build())
917}
918
919fn decode_compilation_flags(val: u64) -> Result<CompilationFlags, DecodeError> {
920    CompilationFlags::from_bits(val).ok_or_else(|| {
921        errors::InvalidCompilationFlags {
922            compilation_flags: val,
923        }
924        .build()
925    })
926}
927
928fn decode_dump_flags(val: u64) -> Result<DumpFlags, DecodeError> {
929    DumpFlags::from_bits(val).ok_or_else(|| errors::InvalidDumpFlags { dump_flags: val }.build())
930}
931
932impl Decode for Parse {
933    fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
934        ensure!(buf.remaining() >= 52, errors::Underflow);
935        let num_headers = buf.get_u16();
936        let annotations = if num_headers == 0 {
937            None
938        } else {
939            let mut annotations = HashMap::new();
940            for _ in 0..num_headers {
941                ensure!(buf.remaining() >= 8, errors::Underflow);
942                annotations.insert(String::decode(buf)?, String::decode(buf)?);
943            }
944            Some(Arc::new(annotations))
945        };
946        ensure!(buf.remaining() >= 50, errors::Underflow);
947        let allowed_capabilities = decode_capabilities(buf.get_u64())?;
948        let compilation_flags = decode_compilation_flags(buf.get_u64())?;
949        let implicit_limit = match buf.get_u64() {
950            0 => None,
951            val => Some(val),
952        };
953        let input_language = if buf.proto().is_multilingual() {
954            TryFrom::try_from(buf.get_u8())?
955        } else {
956            InputLanguage::EdgeQL
957        };
958        let output_format = match buf.get_u8() {
959            0x62 => IoFormat::Binary,
960            0x6a => IoFormat::Json,
961            0x4a => IoFormat::JsonElements,
962            c => errors::InvalidIoFormat { io_format: c }.fail()?,
963        };
964        let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
965        let command_text = String::decode(buf)?;
966        let state = State {
967            typedesc_id: Uuid::decode(buf)?,
968            data: Bytes::decode(buf)?,
969        };
970        Ok(Parse {
971            annotations,
972            allowed_capabilities,
973            compilation_flags,
974            implicit_limit,
975            output_format,
976            expected_cardinality,
977            command_text,
978            state,
979            input_language,
980        })
981    }
982}
983
984impl Encode for Parse {
985    fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
986        debug_assert!(buf.proto().is_1());
987        buf.reserve(52);
988        if let Some(annotations) = self.annotations.as_deref() {
989            buf.put_u16(
990                u16::try_from(annotations.len())
991                    .ok()
992                    .context(errors::TooManyHeaders)?,
993            );
994            for (name, value) in annotations {
995                buf.reserve(8);
996                name.encode(buf)?;
997                value.encode(buf)?;
998            }
999        } else {
1000            buf.put_u16(0);
1001        }
1002        buf.reserve(50);
1003        buf.put_u64(self.allowed_capabilities.bits());
1004        buf.put_u64(self.compilation_flags.bits());
1005        buf.put_u64(self.implicit_limit.unwrap_or(0));
1006        if buf.proto().is_multilingual() {
1007            buf.put_u8(self.input_language as u8);
1008        }
1009        buf.put_u8(self.output_format as u8);
1010        buf.put_u8(self.expected_cardinality as u8);
1011        self.command_text.encode(buf)?;
1012        self.state.typedesc_id.encode(buf)?;
1013        self.state.data.encode(buf)?;
1014        Ok(())
1015    }
1016}