1use std::collections::HashMap;
29use std::convert::TryFrom;
30use std::sync::Arc;
31
32use bytes::{Buf, BufMut, Bytes};
33use snafu::{ensure, OptionExt};
34use uuid::Uuid;
35
36pub use crate::common::CompilationOptions;
37pub use crate::common::DumpFlags;
38pub use crate::common::{Capabilities, Cardinality, CompilationFlags};
39pub use crate::common::{RawTypedesc, State};
40use crate::encoding::{encode, Decode, Encode, Input, Output};
41use crate::encoding::{Annotations, KeyValues};
42use crate::errors::{self, DecodeError, EncodeError};
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45#[non_exhaustive]
46pub enum ClientMessage {
47 AuthenticationSaslInitialResponse(SaslInitialResponse),
48 AuthenticationSaslResponse(SaslResponse),
49 ClientHandshake(ClientHandshake),
50 Dump2(Dump2),
51 Dump3(Dump3),
52 Parse(Parse), ExecuteScript(ExecuteScript),
54 Execute0(Execute0),
55 Execute1(Execute1),
56 Restore(Restore),
57 RestoreBlock(RestoreBlock),
58 RestoreEof,
59 Sync,
60 Terminate,
61 Prepare(Prepare), DescribeStatement(DescribeStatement),
63 OptimisticExecute(OptimisticExecute),
64 UnknownMessage(u8, Bytes),
65 Flush,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct SaslInitialResponse {
70 pub method: String,
71 pub data: Bytes,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct SaslResponse {
76 pub data: Bytes,
77}
78
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct ClientHandshake {
81 pub major_ver: u16,
82 pub minor_ver: u16,
83 pub params: HashMap<String, String>,
84 pub extensions: HashMap<String, KeyValues>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct ExecuteScript {
89 pub headers: KeyValues,
90 pub script_text: String,
91}
92
93#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct Prepare {
95 pub headers: KeyValues,
96 pub io_format: IoFormat,
97 pub expected_cardinality: Cardinality,
98 pub statement_name: Bytes,
99 pub command_text: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Parse {
104 pub annotations: Option<Arc<Annotations>>,
105 pub allowed_capabilities: Capabilities,
106 pub compilation_flags: CompilationFlags,
107 pub implicit_limit: Option<u64>,
108 pub output_format: IoFormat,
109 pub expected_cardinality: Cardinality,
110 pub command_text: String,
111 pub state: State,
112 pub input_language: InputLanguage,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct DescribeStatement {
117 pub headers: KeyValues,
118 pub aspect: DescribeAspect,
119 pub statement_name: Bytes,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct Execute0 {
124 pub headers: KeyValues,
125 pub statement_name: Bytes,
126 pub arguments: Bytes,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct Execute1 {
131 pub annotations: Option<Arc<Annotations>>,
132 pub allowed_capabilities: Capabilities,
133 pub compilation_flags: CompilationFlags,
134 pub implicit_limit: Option<u64>,
135 pub output_format: IoFormat,
136 pub expected_cardinality: Cardinality,
137 pub command_text: String,
138 pub state: State,
139 pub input_typedesc_id: Uuid,
140 pub output_typedesc_id: Uuid,
141 pub arguments: Bytes,
142 pub input_language: InputLanguage,
143}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct OptimisticExecute {
147 pub headers: KeyValues,
148 pub io_format: IoFormat,
149 pub expected_cardinality: Cardinality,
150 pub command_text: String,
151 pub input_typedesc_id: Uuid,
152 pub output_typedesc_id: Uuid,
153 pub arguments: Bytes,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct Dump2 {
158 pub headers: KeyValues,
159}
160
161#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct Dump3 {
163 pub annotations: Option<Arc<Annotations>>,
164 pub flags: DumpFlags,
165}
166
167#[derive(Debug, Clone, PartialEq, Eq)]
168pub struct Restore {
169 pub headers: KeyValues,
170 pub jobs: u16,
171 pub data: Bytes,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
175pub struct RestoreBlock {
176 pub data: Bytes,
177}
178
179#[derive(Debug, Copy, Clone, PartialEq, Eq)]
180pub enum DescribeAspect {
181 DataDescription = 0x54,
182}
183
184#[derive(Debug, Copy, Clone, PartialEq, Eq)]
185pub enum InputLanguage {
186 EdgeQL = 0x45,
187 SQL = 0x53,
188}
189
190#[derive(Debug, Copy, Clone, PartialEq, Eq)]
191pub enum IoFormat {
192 Binary = 0x62,
193 Json = 0x6a,
194 JsonElements = 0x4a,
195 None = 0x6e,
196}
197
198struct Empty;
199impl ClientMessage {
200 pub fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
201 use ClientMessage::*;
202 match self {
203 ClientHandshake(h) => encode(buf, 0x56, h),
204 AuthenticationSaslInitialResponse(h) => encode(buf, 0x70, h),
205 AuthenticationSaslResponse(h) => encode(buf, 0x72, h),
206 ExecuteScript(h) => encode(buf, 0x51, h),
207 Prepare(h) => encode(buf, 0x50, h),
208 Parse(h) => encode(buf, 0x50, h),
209 DescribeStatement(h) => encode(buf, 0x44, h),
210 Execute0(h) => encode(buf, 0x45, h),
211 OptimisticExecute(h) => encode(buf, 0x4f, h),
212 Execute1(h) => encode(buf, 0x4f, h),
213 Dump2(h) => encode(buf, 0x3e, h),
214 Dump3(h) => encode(buf, 0x3e, h),
215 Restore(h) => encode(buf, 0x3c, h),
216 RestoreBlock(h) => encode(buf, 0x3d, h),
217 RestoreEof => encode(buf, 0x2e, &Empty),
218 Sync => encode(buf, 0x53, &Empty),
219 Flush => encode(buf, 0x48, &Empty),
220 Terminate => encode(buf, 0x58, &Empty),
221
222 UnknownMessage(_, _) => errors::UnknownMessageCantBeEncoded.fail()?,
223 }
224 }
225 pub fn decode(buf: &mut Input) -> Result<ClientMessage, DecodeError> {
231 use self::ClientMessage as M;
232 let mut data = buf.slice(5..);
233 let result = match buf[0] {
234 0x56 => ClientHandshake::decode(&mut data).map(M::ClientHandshake)?,
235 0x70 => {
236 SaslInitialResponse::decode(&mut data).map(M::AuthenticationSaslInitialResponse)?
237 }
238 0x72 => SaslResponse::decode(&mut data).map(M::AuthenticationSaslResponse)?,
239 0x51 => ExecuteScript::decode(&mut data).map(M::ExecuteScript)?,
240 0x50 => {
241 if buf.proto().is_1() {
242 Parse::decode(&mut data).map(M::Parse)?
243 } else {
244 Prepare::decode(&mut data).map(M::Prepare)?
245 }
246 }
247 0x45 => Execute0::decode(&mut data).map(M::Execute0)?,
248 0x4f => {
249 if buf.proto().is_1() {
250 Execute1::decode(&mut data).map(M::Execute1)?
251 } else {
252 OptimisticExecute::decode(&mut data).map(M::OptimisticExecute)?
253 }
254 }
255 0x3e => {
256 if buf.proto().is_3() {
257 Dump3::decode(&mut data).map(M::Dump3)?
258 } else {
259 Dump2::decode(&mut data).map(M::Dump2)?
260 }
261 }
262 0x3c => Restore::decode(&mut data).map(M::Restore)?,
263 0x3d => RestoreBlock::decode(&mut data).map(M::RestoreBlock)?,
264 0x2e => M::RestoreEof,
265 0x53 => M::Sync,
266 0x48 => M::Flush,
267 0x58 => M::Terminate,
268 0x44 => DescribeStatement::decode(&mut data).map(M::DescribeStatement)?,
269 code => M::UnknownMessage(code, data.copy_to_bytes(data.remaining())),
270 };
271 ensure!(data.remaining() == 0, errors::ExtraData);
272 Ok(result)
273 }
274}
275
276impl Encode for Empty {
277 fn encode(&self, _buf: &mut Output) -> Result<(), EncodeError> {
278 Ok(())
279 }
280}
281
282impl Encode for ClientHandshake {
283 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
284 buf.reserve(8);
285 buf.put_u16(self.major_ver);
286 buf.put_u16(self.minor_ver);
287 buf.put_u16(
288 u16::try_from(self.params.len())
289 .ok()
290 .context(errors::TooManyParams)?,
291 );
292 for (k, v) in &self.params {
293 k.encode(buf)?;
294 v.encode(buf)?;
295 }
296 buf.reserve(2);
297 buf.put_u16(
298 u16::try_from(self.extensions.len())
299 .ok()
300 .context(errors::TooManyExtensions)?,
301 );
302 for (name, headers) in &self.extensions {
303 name.encode(buf)?;
304 buf.reserve(2);
305 buf.put_u16(
306 u16::try_from(headers.len())
307 .ok()
308 .context(errors::TooManyHeaders)?,
309 );
310 for (&name, value) in headers {
311 buf.reserve(2);
312 buf.put_u16(name);
313 value.encode(buf)?;
314 }
315 }
316 Ok(())
317 }
318}
319
320impl Decode for ClientHandshake {
321 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
322 ensure!(buf.remaining() >= 8, errors::Underflow);
323 let major_ver = buf.get_u16();
324 let minor_ver = buf.get_u16();
325 let num_params = buf.get_u16();
326 let mut params = HashMap::new();
327 for _ in 0..num_params {
328 params.insert(String::decode(buf)?, String::decode(buf)?);
329 }
330
331 ensure!(buf.remaining() >= 2, errors::Underflow);
332 let num_ext = buf.get_u16();
333 let mut extensions = HashMap::new();
334 for _ in 0..num_ext {
335 let name = String::decode(buf)?;
336 ensure!(buf.remaining() >= 2, errors::Underflow);
337 let num_headers = buf.get_u16();
338 let mut headers = HashMap::new();
339 for _ in 0..num_headers {
340 ensure!(buf.remaining() >= 4, errors::Underflow);
341 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
342 }
343 extensions.insert(name, headers);
344 }
345 Ok(ClientHandshake {
346 major_ver,
347 minor_ver,
348 params,
349 extensions,
350 })
351 }
352}
353
354impl Encode for SaslInitialResponse {
355 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
356 self.method.encode(buf)?;
357 self.data.encode(buf)?;
358 Ok(())
359 }
360}
361
362impl Decode for SaslInitialResponse {
363 fn decode(buf: &mut Input) -> Result<SaslInitialResponse, DecodeError> {
364 let method = String::decode(buf)?;
365 let data = Bytes::decode(buf)?;
366 Ok(SaslInitialResponse { method, data })
367 }
368}
369
370impl Encode for SaslResponse {
371 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
372 self.data.encode(buf)?;
373 Ok(())
374 }
375}
376
377impl Decode for SaslResponse {
378 fn decode(buf: &mut Input) -> Result<SaslResponse, DecodeError> {
379 let data = Bytes::decode(buf)?;
380 Ok(SaslResponse { data })
381 }
382}
383
384impl Encode for ExecuteScript {
385 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
386 buf.reserve(6);
387 buf.put_u16(
388 u16::try_from(self.headers.len())
389 .ok()
390 .context(errors::TooManyHeaders)?,
391 );
392 for (&name, value) in &self.headers {
393 buf.reserve(2);
394 buf.put_u16(name);
395 value.encode(buf)?;
396 }
397 self.script_text.encode(buf)?;
398 Ok(())
399 }
400}
401
402impl Decode for ExecuteScript {
403 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
404 ensure!(buf.remaining() >= 6, errors::Underflow);
405 let num_headers = buf.get_u16();
406 let mut headers = HashMap::new();
407 for _ in 0..num_headers {
408 ensure!(buf.remaining() >= 4, errors::Underflow);
409 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
410 }
411 let script_text = String::decode(buf)?;
412 Ok(ExecuteScript {
413 script_text,
414 headers,
415 })
416 }
417}
418
419impl Encode for Prepare {
420 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
421 debug_assert!(!buf.proto().is_1());
422 buf.reserve(12);
423 buf.put_u16(
424 u16::try_from(self.headers.len())
425 .ok()
426 .context(errors::TooManyHeaders)?,
427 );
428 for (&name, value) in &self.headers {
429 buf.reserve(2);
430 buf.put_u16(name);
431 value.encode(buf)?;
432 }
433 buf.reserve(10);
434 buf.put_u8(self.io_format as u8);
435 buf.put_u8(self.expected_cardinality as u8);
436 self.statement_name.encode(buf)?;
437 self.command_text.encode(buf)?;
438 Ok(())
439 }
440}
441
442impl Decode for Prepare {
443 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
444 ensure!(buf.remaining() >= 12, errors::Underflow);
445 let num_headers = buf.get_u16();
446 let mut headers = HashMap::new();
447 for _ in 0..num_headers {
448 ensure!(buf.remaining() >= 4, errors::Underflow);
449 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
450 }
451 ensure!(buf.remaining() >= 8, errors::Underflow);
452 let io_format = match buf.get_u8() {
453 0x62 => IoFormat::Binary,
454 0x6a => IoFormat::Json,
455 0x4a => IoFormat::JsonElements,
456 c => errors::InvalidIoFormat { io_format: c }.fail()?,
457 };
458 let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
459 let statement_name = Bytes::decode(buf)?;
460 let command_text = String::decode(buf)?;
461 Ok(Prepare {
462 headers,
463 io_format,
464 expected_cardinality,
465 statement_name,
466 command_text,
467 })
468 }
469}
470
471impl Encode for DescribeStatement {
472 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
473 buf.reserve(7);
474 buf.put_u16(
475 u16::try_from(self.headers.len())
476 .ok()
477 .context(errors::TooManyHeaders)?,
478 );
479 buf.reserve(5);
480 buf.put_u8(self.aspect as u8);
481 self.statement_name.encode(buf)?;
482 Ok(())
483 }
484}
485
486impl Decode for DescribeStatement {
487 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
488 ensure!(buf.remaining() >= 12, errors::Underflow);
489 let num_headers = buf.get_u16();
490 let mut headers = HashMap::new();
491 for _ in 0..num_headers {
492 ensure!(buf.remaining() >= 4, errors::Underflow);
493 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
494 }
495 ensure!(buf.remaining() >= 8, errors::Underflow);
496 let aspect = match buf.get_u8() {
497 0x54 => DescribeAspect::DataDescription,
498 c => errors::InvalidAspect { aspect: c }.fail()?,
499 };
500 let statement_name = Bytes::decode(buf)?;
501 Ok(DescribeStatement {
502 headers,
503 aspect,
504 statement_name,
505 })
506 }
507}
508
509impl Encode for Execute0 {
510 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
511 debug_assert!(!buf.proto().is_1());
512 buf.reserve(10);
513 buf.put_u16(
514 u16::try_from(self.headers.len())
515 .ok()
516 .context(errors::TooManyHeaders)?,
517 );
518 for (&name, value) in &self.headers {
519 buf.reserve(2);
520 buf.put_u16(name);
521 value.encode(buf)?;
522 }
523 self.statement_name.encode(buf)?;
524 self.arguments.encode(buf)?;
525 Ok(())
526 }
527}
528
529impl Decode for Execute0 {
530 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
531 ensure!(buf.remaining() >= 12, errors::Underflow);
532 let num_headers = buf.get_u16();
533 let mut headers = HashMap::new();
534 for _ in 0..num_headers {
535 ensure!(buf.remaining() >= 4, errors::Underflow);
536 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
537 }
538 let statement_name = Bytes::decode(buf)?;
539 let arguments = Bytes::decode(buf)?;
540 Ok(Execute0 {
541 headers,
542 statement_name,
543 arguments,
544 })
545 }
546}
547
548impl OptimisticExecute {
549 pub fn new(
550 flags: &CompilationOptions,
551 query: &str,
552 arguments: impl Into<Bytes>,
553 input_typedesc_id: Uuid,
554 output_typedesc_id: Uuid,
555 ) -> OptimisticExecute {
556 let mut headers = KeyValues::new();
557 if let Some(limit) = flags.implicit_limit {
558 headers.insert(0xFF01, Bytes::from(limit.to_string()));
559 }
560 if flags.implicit_typenames {
561 headers.insert(0xFF02, "true".into());
562 }
563 if flags.implicit_typeids {
564 headers.insert(0xFF03, "true".into());
565 }
566 let caps = flags.allow_capabilities.bits().to_be_bytes();
567 headers.insert(0xFF04, caps[..].to_vec().into());
568 if flags.explicit_objectids {
569 headers.insert(0xFF03, "true".into());
570 }
571 OptimisticExecute {
572 headers,
573 io_format: flags.io_format,
574 expected_cardinality: flags.expected_cardinality,
575 command_text: query.into(),
576 input_typedesc_id,
577 output_typedesc_id,
578 arguments: arguments.into(),
579 }
580 }
581}
582
583impl Encode for OptimisticExecute {
584 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
585 buf.reserve(2 + 1 + 1 + 4 + 16 + 16 + 4);
586 buf.put_u16(
587 u16::try_from(self.headers.len())
588 .ok()
589 .context(errors::TooManyHeaders)?,
590 );
591 for (&name, value) in &self.headers {
592 buf.reserve(2);
593 buf.put_u16(name);
594 value.encode(buf)?;
595 }
596 buf.reserve(1 + 1 + 4 + 16 + 16 + 4);
597 buf.put_u8(self.io_format as u8);
598 buf.put_u8(self.expected_cardinality as u8);
599 self.command_text.encode(buf)?;
600 self.input_typedesc_id.encode(buf)?;
601 self.output_typedesc_id.encode(buf)?;
602 self.arguments.encode(buf)?;
603 Ok(())
604 }
605}
606
607impl Decode for OptimisticExecute {
608 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
609 ensure!(buf.remaining() >= 12, errors::Underflow);
610 let num_headers = buf.get_u16();
611 let mut headers = HashMap::new();
612 for _ in 0..num_headers {
613 ensure!(buf.remaining() >= 4, errors::Underflow);
614 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
615 }
616 let io_format = match buf.get_u8() {
617 0x62 => IoFormat::Binary,
618 0x6a => IoFormat::Json,
619 0x4a => IoFormat::JsonElements,
620 c => errors::InvalidIoFormat { io_format: c }.fail()?,
621 };
622 let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
623 let command_text = String::decode(buf)?;
624 let input_typedesc_id = Uuid::decode(buf)?;
625 let output_typedesc_id = Uuid::decode(buf)?;
626 let arguments = Bytes::decode(buf)?;
627 Ok(OptimisticExecute {
628 headers,
629 io_format,
630 expected_cardinality,
631 command_text,
632 input_typedesc_id,
633 output_typedesc_id,
634 arguments,
635 })
636 }
637}
638
639impl Encode for Execute1 {
640 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
641 buf.reserve(2 + 3 * 8 + 1 + 1 + 4 + 16 + 4 + 16 + 16 + 4);
642 if let Some(annotations) = self.annotations.as_deref() {
643 buf.put_u16(
644 u16::try_from(annotations.len())
645 .ok()
646 .context(errors::TooManyHeaders)?,
647 );
648 for (name, value) in annotations {
649 buf.reserve(4);
650 name.encode(buf)?;
651 value.encode(buf)?;
652 }
653 } else {
654 buf.put_u16(0);
655 }
656 buf.reserve(3 * 8 + 1 + 1 + 4 + 16 + 4 + 16 + 16 + 4);
657 buf.put_u64(self.allowed_capabilities.bits());
658 buf.put_u64(self.compilation_flags.bits());
659 buf.put_u64(self.implicit_limit.unwrap_or(0));
660 if buf.proto().is_multilingual() {
661 buf.put_u8(self.input_language as u8);
662 }
663 buf.put_u8(self.output_format as u8);
664 buf.put_u8(self.expected_cardinality as u8);
665 self.command_text.encode(buf)?;
666 self.state.typedesc_id.encode(buf)?;
667 self.state.data.encode(buf)?;
668 self.input_typedesc_id.encode(buf)?;
669 self.output_typedesc_id.encode(buf)?;
670 self.arguments.encode(buf)?;
671 Ok(())
672 }
673}
674
675impl Decode for Execute1 {
676 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
677 ensure!(
678 buf.remaining() >= 2 + 3 * 8 + 2 + 4 + 16 + 4 + 16 + 16 + 4,
679 errors::Underflow
680 );
681 let num_annotations = buf.get_u16();
682 let annotations = if num_annotations == 0 {
683 None
684 } else {
685 let mut annotations = HashMap::new();
686 for _ in 0..num_annotations {
687 ensure!(buf.remaining() >= 4, errors::Underflow);
688 annotations.insert(String::decode(buf)?, String::decode(buf)?);
689 }
690 Some(Arc::new(annotations))
691 };
692 ensure!(
693 buf.remaining() >= 3 * 8 + 2 + 4 + 16 + 4 + 16 + 16 + 4,
694 errors::Underflow
695 );
696 let allowed_capabilities = decode_capabilities(buf.get_u64())?;
697 let compilation_flags = decode_compilation_flags(buf.get_u64())?;
698 let implicit_limit = match buf.get_u64() {
699 0 => None,
700 val => Some(val),
701 };
702 let input_language = if buf.proto().is_multilingual() {
703 TryFrom::try_from(buf.get_u8())?
704 } else {
705 InputLanguage::EdgeQL
706 };
707 let output_format = match buf.get_u8() {
708 0x62 => IoFormat::Binary,
709 0x6a => IoFormat::Json,
710 0x4a => IoFormat::JsonElements,
711 c => errors::InvalidIoFormat { io_format: c }.fail()?,
712 };
713 let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
714 let command_text = String::decode(buf)?;
715 let state = State {
716 typedesc_id: Uuid::decode(buf)?,
717 data: Bytes::decode(buf)?,
718 };
719 let input_typedesc_id = Uuid::decode(buf)?;
720 let output_typedesc_id = Uuid::decode(buf)?;
721 let arguments = Bytes::decode(buf)?;
722 Ok(Execute1 {
723 annotations,
724 allowed_capabilities,
725 compilation_flags,
726 implicit_limit,
727 output_format,
728 expected_cardinality,
729 command_text,
730 state,
731 input_typedesc_id,
732 output_typedesc_id,
733 arguments,
734 input_language,
735 })
736 }
737}
738
739impl Encode for Dump2 {
740 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
741 buf.reserve(10);
742 buf.put_u16(
743 u16::try_from(self.headers.len())
744 .ok()
745 .context(errors::TooManyHeaders)?,
746 );
747 for (&name, value) in &self.headers {
748 buf.reserve(2);
749 buf.put_u16(name);
750 value.encode(buf)?;
751 }
752 Ok(())
753 }
754}
755
756impl Decode for Dump2 {
757 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
758 ensure!(buf.remaining() >= 12, errors::Underflow);
759 let num_headers = buf.get_u16();
760 let mut headers = HashMap::new();
761 for _ in 0..num_headers {
762 ensure!(buf.remaining() >= 4, errors::Underflow);
763 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
764 }
765 Ok(Dump2 { headers })
766 }
767}
768
769impl Encode for Dump3 {
770 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
771 buf.reserve(2 + 8);
772 if let Some(annotations) = self.annotations.as_deref() {
773 buf.put_u16(
774 u16::try_from(annotations.len())
775 .ok()
776 .context(errors::TooManyHeaders)?,
777 );
778 for (name, value) in annotations {
779 buf.reserve(4);
780 name.encode(buf)?;
781 value.encode(buf)?;
782 }
783 } else {
784 buf.put_u16(0);
785 }
786 buf.put_u64(self.flags.bits());
787 Ok(())
788 }
789}
790
791impl Decode for Dump3 {
792 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
793 ensure!(buf.remaining() >= 2, errors::Underflow);
794 let num_headers = buf.get_u16();
795 let annotations = if num_headers == 0 {
796 None
797 } else {
798 let mut annotations = HashMap::new();
799 for _ in 0..num_headers {
800 ensure!(buf.remaining() >= 8, errors::Underflow);
801 annotations.insert(String::decode(buf)?, String::decode(buf)?);
802 }
803 Some(Arc::new(annotations))
804 };
805 ensure!(buf.remaining() >= 8, errors::Underflow);
806 let flags = decode_dump_flags(buf.get_u64())?;
807 Ok(Dump3 { annotations, flags })
808 }
809}
810
811impl Encode for Restore {
812 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
813 buf.reserve(4 + self.data.len());
814 buf.put_u16(
815 u16::try_from(self.headers.len())
816 .ok()
817 .context(errors::TooManyHeaders)?,
818 );
819 for (&name, value) in &self.headers {
820 buf.reserve(2);
821 buf.put_u16(name);
822 value.encode(buf)?;
823 }
824 buf.put_u16(self.jobs);
825 buf.extend(&self.data);
826 Ok(())
827 }
828}
829
830impl Decode for Restore {
831 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
832 ensure!(buf.remaining() >= 4, errors::Underflow);
833
834 let num_headers = buf.get_u16();
835 let mut headers = HashMap::new();
836 for _ in 0..num_headers {
837 ensure!(buf.remaining() >= 4, errors::Underflow);
838 headers.insert(buf.get_u16(), Bytes::decode(buf)?);
839 }
840
841 let jobs = buf.get_u16();
842
843 let data = buf.copy_to_bytes(buf.remaining());
844 Ok(Restore {
845 jobs,
846 headers,
847 data,
848 })
849 }
850}
851
852impl Encode for RestoreBlock {
853 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
854 buf.extend(&self.data);
855 Ok(())
856 }
857}
858
859impl Decode for RestoreBlock {
860 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
861 let data = buf.copy_to_bytes(buf.remaining());
862 Ok(RestoreBlock { data })
863 }
864}
865
866impl Parse {
867 pub fn new(
868 opts: &CompilationOptions,
869 query: &str,
870 state: State,
871 annotations: Option<Arc<Annotations>>,
872 ) -> Parse {
873 Parse {
874 annotations,
875 allowed_capabilities: opts.allow_capabilities,
876 compilation_flags: opts.flags(),
877 implicit_limit: opts.implicit_limit,
878 output_format: opts.io_format,
879 expected_cardinality: opts.expected_cardinality,
880 command_text: query.into(),
881 state,
882 input_language: opts.input_language,
883 }
884 }
885}
886
887impl Prepare {
888 pub fn new(flags: &CompilationOptions, query: &str) -> Prepare {
889 let mut headers = KeyValues::new();
890 if let Some(limit) = flags.implicit_limit {
891 headers.insert(0xFF01, Bytes::from(limit.to_string()));
892 }
893 if flags.implicit_typenames {
894 headers.insert(0xFF02, "true".into());
895 }
896 if flags.implicit_typeids {
897 headers.insert(0xFF03, "true".into());
898 }
899 let caps = flags.allow_capabilities.bits().to_be_bytes();
900 headers.insert(0xFF04, caps[..].to_vec().into());
901 if flags.explicit_objectids {
902 headers.insert(0xFF03, "true".into());
903 }
904 Prepare {
905 headers,
906 io_format: flags.io_format,
907 expected_cardinality: flags.expected_cardinality,
908 statement_name: Bytes::from(""),
909 command_text: query.into(),
910 }
911 }
912}
913
914fn decode_capabilities(val: u64) -> Result<Capabilities, DecodeError> {
915 Capabilities::from_bits(val)
916 .ok_or_else(|| errors::InvalidCapabilities { capabilities: val }.build())
917}
918
919fn decode_compilation_flags(val: u64) -> Result<CompilationFlags, DecodeError> {
920 CompilationFlags::from_bits(val).ok_or_else(|| {
921 errors::InvalidCompilationFlags {
922 compilation_flags: val,
923 }
924 .build()
925 })
926}
927
928fn decode_dump_flags(val: u64) -> Result<DumpFlags, DecodeError> {
929 DumpFlags::from_bits(val).ok_or_else(|| errors::InvalidDumpFlags { dump_flags: val }.build())
930}
931
932impl Decode for Parse {
933 fn decode(buf: &mut Input) -> Result<Self, DecodeError> {
934 ensure!(buf.remaining() >= 52, errors::Underflow);
935 let num_headers = buf.get_u16();
936 let annotations = if num_headers == 0 {
937 None
938 } else {
939 let mut annotations = HashMap::new();
940 for _ in 0..num_headers {
941 ensure!(buf.remaining() >= 8, errors::Underflow);
942 annotations.insert(String::decode(buf)?, String::decode(buf)?);
943 }
944 Some(Arc::new(annotations))
945 };
946 ensure!(buf.remaining() >= 50, errors::Underflow);
947 let allowed_capabilities = decode_capabilities(buf.get_u64())?;
948 let compilation_flags = decode_compilation_flags(buf.get_u64())?;
949 let implicit_limit = match buf.get_u64() {
950 0 => None,
951 val => Some(val),
952 };
953 let input_language = if buf.proto().is_multilingual() {
954 TryFrom::try_from(buf.get_u8())?
955 } else {
956 InputLanguage::EdgeQL
957 };
958 let output_format = match buf.get_u8() {
959 0x62 => IoFormat::Binary,
960 0x6a => IoFormat::Json,
961 0x4a => IoFormat::JsonElements,
962 c => errors::InvalidIoFormat { io_format: c }.fail()?,
963 };
964 let expected_cardinality = TryFrom::try_from(buf.get_u8())?;
965 let command_text = String::decode(buf)?;
966 let state = State {
967 typedesc_id: Uuid::decode(buf)?,
968 data: Bytes::decode(buf)?,
969 };
970 Ok(Parse {
971 annotations,
972 allowed_capabilities,
973 compilation_flags,
974 implicit_limit,
975 output_format,
976 expected_cardinality,
977 command_text,
978 state,
979 input_language,
980 })
981 }
982}
983
984impl Encode for Parse {
985 fn encode(&self, buf: &mut Output) -> Result<(), EncodeError> {
986 debug_assert!(buf.proto().is_1());
987 buf.reserve(52);
988 if let Some(annotations) = self.annotations.as_deref() {
989 buf.put_u16(
990 u16::try_from(annotations.len())
991 .ok()
992 .context(errors::TooManyHeaders)?,
993 );
994 for (name, value) in annotations {
995 buf.reserve(8);
996 name.encode(buf)?;
997 value.encode(buf)?;
998 }
999 } else {
1000 buf.put_u16(0);
1001 }
1002 buf.reserve(50);
1003 buf.put_u64(self.allowed_capabilities.bits());
1004 buf.put_u64(self.compilation_flags.bits());
1005 buf.put_u64(self.implicit_limit.unwrap_or(0));
1006 if buf.proto().is_multilingual() {
1007 buf.put_u8(self.input_language as u8);
1008 }
1009 buf.put_u8(self.output_format as u8);
1010 buf.put_u8(self.expected_cardinality as u8);
1011 self.command_text.encode(buf)?;
1012 self.state.typedesc_id.encode(buf)?;
1013 self.state.data.encode(buf)?;
1014 Ok(())
1015 }
1016}