Skip to main content

pgwire/messages/
extendedquery.rs

1use bytes::{Buf, BufMut, Bytes};
2
3use super::{DecodeContext, Message, codec};
4use crate::error::PgWireResult;
5
6/// Request from frontend to parse a prepared query string
7#[non_exhaustive]
8#[derive(PartialEq, Eq, Debug, new)]
9pub struct Parse {
10    pub name: Option<String>,
11    pub query: String,
12    pub type_oids: Vec<u32>,
13}
14
15pub const MESSAGE_TYPE_BYTE_PARSE: u8 = b'P';
16
17impl Message for Parse {
18    #[inline]
19    fn message_type() -> Option<u8> {
20        Some(MESSAGE_TYPE_BYTE_PARSE)
21    }
22
23    #[inline]
24    fn max_message_length() -> usize {
25        super::LARGE_PACKET_SIZE_LIMIT
26    }
27
28    fn message_length(&self) -> usize {
29        4 + codec::option_string_len(&self.name) // name
30            + (1 + self.query.len()) // query
31            + 2 + (4 * self.type_oids.len()) // type oids
32    }
33
34    fn encode_body(&self, buf: &mut bytes::BytesMut) -> PgWireResult<()> {
35        codec::put_option_cstring(buf, &self.name);
36        codec::put_cstring(buf, &self.query);
37
38        buf.put_u16(self.type_oids.len() as u16);
39        for oid in &self.type_oids {
40            buf.put_u32(*oid);
41        }
42
43        Ok(())
44    }
45
46    fn decode_body(
47        buf: &mut bytes::BytesMut,
48        _: usize,
49        _ctx: &DecodeContext,
50    ) -> PgWireResult<Self> {
51        let name = codec::get_cstring(buf);
52        let query = codec::get_cstring(buf).unwrap_or_else(|| "".to_owned());
53        let type_oid_count = buf.get_u16();
54
55        let mut type_oids = Vec::with_capacity(type_oid_count as usize);
56        for _ in 0..type_oid_count {
57            type_oids.push(buf.get_u32());
58        }
59
60        Ok(Parse {
61            name,
62            query,
63            type_oids,
64        })
65    }
66}
67
68/// Response for Parse command, sent from backend to frontend
69#[non_exhaustive]
70#[derive(PartialEq, Eq, Debug, new)]
71pub struct ParseComplete;
72
73pub const MESSAGE_TYPE_BYTE_PARSE_COMPLETE: u8 = b'1';
74
75impl Message for ParseComplete {
76    #[inline]
77    fn message_type() -> Option<u8> {
78        Some(MESSAGE_TYPE_BYTE_PARSE_COMPLETE)
79    }
80
81    #[inline]
82    fn max_message_length() -> usize {
83        super::SMALL_BACKEND_PACKET_SIZE_LIMIT
84    }
85
86    #[inline]
87    fn message_length(&self) -> usize {
88        4
89    }
90
91    #[inline]
92    fn encode_body(&self, _buf: &mut bytes::BytesMut) -> PgWireResult<()> {
93        Ok(())
94    }
95
96    #[inline]
97    fn decode_body(
98        _buf: &mut bytes::BytesMut,
99        _: usize,
100        _ctx: &DecodeContext,
101    ) -> PgWireResult<Self> {
102        Ok(ParseComplete)
103    }
104}
105
106/// Closing the prepared statement or portal
107#[non_exhaustive]
108#[derive(PartialEq, Eq, Debug, new)]
109pub struct Close {
110    pub target_type: u8,
111    pub name: Option<String>,
112}
113
114pub const TARGET_TYPE_BYTE_STATEMENT: u8 = b'S';
115pub const TARGET_TYPE_BYTE_PORTAL: u8 = b'P';
116
117pub const MESSAGE_TYPE_BYTE_CLOSE: u8 = b'C';
118
119impl Message for Close {
120    #[inline]
121    fn message_type() -> Option<u8> {
122        Some(MESSAGE_TYPE_BYTE_CLOSE)
123    }
124
125    fn message_length(&self) -> usize {
126        4 + 1 + codec::option_string_len(&self.name)
127    }
128
129    fn encode_body(&self, buf: &mut bytes::BytesMut) -> PgWireResult<()> {
130        buf.put_u8(self.target_type);
131        codec::put_option_cstring(buf, &self.name);
132        Ok(())
133    }
134
135    fn decode_body(
136        buf: &mut bytes::BytesMut,
137        _: usize,
138        _ctx: &DecodeContext,
139    ) -> PgWireResult<Self> {
140        let target_type = buf.get_u8();
141        let name = codec::get_cstring(buf);
142
143        Ok(Close { target_type, name })
144    }
145}
146
147/// Response for Close command, sent from backend to frontend
148#[non_exhaustive]
149#[derive(PartialEq, Eq, Debug, new)]
150pub struct CloseComplete;
151
152pub const MESSAGE_TYPE_BYTE_CLOSE_COMPLETE: u8 = b'3';
153
154impl Message for CloseComplete {
155    #[inline]
156    fn message_type() -> Option<u8> {
157        Some(MESSAGE_TYPE_BYTE_CLOSE_COMPLETE)
158    }
159
160    #[inline]
161    fn max_message_length() -> usize {
162        super::SMALL_BACKEND_PACKET_SIZE_LIMIT
163    }
164
165    #[inline]
166    fn message_length(&self) -> usize {
167        4
168    }
169
170    #[inline]
171    fn encode_body(&self, _buf: &mut bytes::BytesMut) -> PgWireResult<()> {
172        Ok(())
173    }
174
175    #[inline]
176    fn decode_body(
177        _buf: &mut bytes::BytesMut,
178        _: usize,
179        _ctx: &DecodeContext,
180    ) -> PgWireResult<Self> {
181        Ok(CloseComplete)
182    }
183}
184
185/// Bind command, for executing prepared statement
186#[non_exhaustive]
187#[derive(PartialEq, Eq, Debug, new)]
188pub struct Bind {
189    pub portal_name: Option<String>,
190    pub statement_name: Option<String>,
191    pub parameter_format_codes: Vec<i16>,
192    // None for Null data, TODO: consider wrapping this together with DataRow in
193    // data.rs
194    pub parameters: Vec<Option<Bytes>>,
195
196    pub result_column_format_codes: Vec<i16>,
197}
198
199pub const MESSAGE_TYPE_BYTE_BIND: u8 = b'B';
200
201impl Message for Bind {
202    #[inline]
203    fn message_type() -> Option<u8> {
204        Some(MESSAGE_TYPE_BYTE_BIND)
205    }
206
207    #[inline]
208    fn max_message_length() -> usize {
209        super::LARGE_PACKET_SIZE_LIMIT
210    }
211
212    fn message_length(&self) -> usize {
213        4 + codec::option_string_len(&self.portal_name) + codec::option_string_len(&self.statement_name)
214            + 2 // parameter_format_code len
215            + (2 * self.parameter_format_codes.len()) // parameter_format_codes
216            + 2 // parameters len
217            + self.parameters.iter().map(|p| 4 + p.as_ref().map(|data| data.len()).unwrap_or(0)).sum::<usize>() // parameters
218            + 2 // result_format_code len
219            + (2 * self.result_column_format_codes.len()) // result_format_codes
220    }
221
222    fn encode_body(&self, buf: &mut bytes::BytesMut) -> PgWireResult<()> {
223        codec::put_option_cstring(buf, &self.portal_name);
224        codec::put_option_cstring(buf, &self.statement_name);
225
226        buf.put_u16(self.parameter_format_codes.len() as u16);
227        for c in &self.parameter_format_codes {
228            buf.put_i16(*c);
229        }
230
231        buf.put_u16(self.parameters.len() as u16);
232        for v in &self.parameters {
233            if let Some(v) = v {
234                buf.put_i32(v.len() as i32);
235                buf.put_slice(v.as_ref());
236            } else {
237                buf.put_i32(-1);
238            }
239        }
240
241        buf.put_i16(self.result_column_format_codes.len() as i16);
242        for c in &self.result_column_format_codes {
243            buf.put_i16(*c);
244        }
245
246        Ok(())
247    }
248
249    fn decode_body(
250        buf: &mut bytes::BytesMut,
251        _: usize,
252        _ctx: &DecodeContext,
253    ) -> PgWireResult<Self> {
254        let portal_name = codec::get_cstring(buf);
255        let statement_name = codec::get_cstring(buf);
256
257        let parameter_format_code_len = buf.get_u16();
258        let mut parameter_format_codes = Vec::with_capacity(parameter_format_code_len as usize);
259
260        for _ in 0..parameter_format_code_len {
261            parameter_format_codes.push(buf.get_i16());
262        }
263
264        let parameter_len = buf.get_u16();
265        let mut parameters = Vec::with_capacity(parameter_len as usize);
266        for _ in 0..parameter_len {
267            let data_len = buf.get_i32();
268
269            if data_len >= 0 {
270                parameters.push(Some(buf.split_to(data_len as usize).freeze()));
271            } else {
272                parameters.push(None);
273            }
274        }
275
276        let result_column_format_code_len = buf.get_i16();
277        let mut result_column_format_codes =
278            Vec::with_capacity(result_column_format_code_len as usize);
279        for _ in 0..result_column_format_code_len {
280            result_column_format_codes.push(buf.get_i16());
281        }
282
283        Ok(Bind {
284            portal_name,
285            statement_name,
286
287            parameter_format_codes,
288            parameters,
289
290            result_column_format_codes,
291        })
292    }
293}
294
295/// Success response for `Bind`
296#[non_exhaustive]
297#[derive(PartialEq, Eq, Debug, new)]
298pub struct BindComplete;
299
300pub const MESSAGE_TYPE_BYTE_BIND_COMPLETE: u8 = b'2';
301
302impl Message for BindComplete {
303    #[inline]
304    fn message_type() -> Option<u8> {
305        Some(MESSAGE_TYPE_BYTE_BIND_COMPLETE)
306    }
307
308    #[inline]
309    fn max_message_length() -> usize {
310        super::SMALL_BACKEND_PACKET_SIZE_LIMIT
311    }
312
313    #[inline]
314    fn message_length(&self) -> usize {
315        4
316    }
317
318    #[inline]
319    fn encode_body(&self, _buf: &mut bytes::BytesMut) -> PgWireResult<()> {
320        Ok(())
321    }
322
323    #[inline]
324    fn decode_body(
325        _buf: &mut bytes::BytesMut,
326        _: usize,
327        _ctx: &DecodeContext,
328    ) -> PgWireResult<Self> {
329        Ok(BindComplete)
330    }
331}
332
333/// Describe command fron frontend to backend. For getting information of
334/// particular portal or statement
335#[non_exhaustive]
336#[derive(PartialEq, Eq, Debug, new)]
337pub struct Describe {
338    pub target_type: u8,
339    pub name: Option<String>,
340}
341
342pub const MESSAGE_TYPE_BYTE_DESCRIBE: u8 = b'D';
343
344impl Message for Describe {
345    #[inline]
346    fn message_type() -> Option<u8> {
347        Some(MESSAGE_TYPE_BYTE_DESCRIBE)
348    }
349
350    fn message_length(&self) -> usize {
351        4 + 1 + codec::option_string_len(&self.name)
352    }
353
354    fn encode_body(&self, buf: &mut bytes::BytesMut) -> PgWireResult<()> {
355        buf.put_u8(self.target_type);
356        codec::put_option_cstring(buf, &self.name);
357        Ok(())
358    }
359
360    fn decode_body(
361        buf: &mut bytes::BytesMut,
362        _: usize,
363        _ctx: &DecodeContext,
364    ) -> PgWireResult<Self> {
365        let target_type = buf.get_u8();
366        let name = codec::get_cstring(buf);
367
368        Ok(Describe { target_type, name })
369    }
370}
371
372/// Execute portal by its name
373#[non_exhaustive]
374#[derive(PartialEq, Eq, Debug, new)]
375pub struct Execute {
376    pub name: Option<String>,
377    pub max_rows: i32,
378}
379
380pub const MESSAGE_TYPE_BYTE_EXECUTE: u8 = b'E';
381
382impl Message for Execute {
383    #[inline]
384    fn message_type() -> Option<u8> {
385        Some(MESSAGE_TYPE_BYTE_EXECUTE)
386    }
387
388    fn message_length(&self) -> usize {
389        4 + codec::option_string_len(&self.name) + 4
390    }
391
392    fn encode_body(&self, buf: &mut bytes::BytesMut) -> PgWireResult<()> {
393        codec::put_option_cstring(buf, &self.name);
394        buf.put_i32(self.max_rows);
395        Ok(())
396    }
397
398    fn decode_body(
399        buf: &mut bytes::BytesMut,
400        _: usize,
401        _ctx: &DecodeContext,
402    ) -> PgWireResult<Self> {
403        let name = codec::get_cstring(buf);
404        let max_rows = buf.get_i32();
405
406        Ok(Execute { name, max_rows })
407    }
408}
409
410#[non_exhaustive]
411#[derive(PartialEq, Eq, Debug, new)]
412pub struct Flush;
413
414pub const MESSAGE_TYPE_BYTE_FLUSH: u8 = b'H';
415
416impl Message for Flush {
417    #[inline]
418    fn message_type() -> Option<u8> {
419        Some(MESSAGE_TYPE_BYTE_FLUSH)
420    }
421
422    #[inline]
423    fn message_length(&self) -> usize {
424        4
425    }
426
427    fn encode_body(&self, _buf: &mut bytes::BytesMut) -> PgWireResult<()> {
428        Ok(())
429    }
430
431    fn decode_body(
432        _buf: &mut bytes::BytesMut,
433        _: usize,
434        _ctx: &DecodeContext,
435    ) -> PgWireResult<Self> {
436        Ok(Flush)
437    }
438}
439
440/// Execute portal by its name
441#[non_exhaustive]
442#[derive(PartialEq, Eq, Debug, new)]
443pub struct Sync;
444
445pub const MESSAGE_TYPE_BYTE_SYNC: u8 = b'S';
446
447impl Message for Sync {
448    #[inline]
449    fn message_type() -> Option<u8> {
450        Some(MESSAGE_TYPE_BYTE_SYNC)
451    }
452
453    #[inline]
454    fn message_length(&self) -> usize {
455        4
456    }
457
458    fn encode_body(&self, _buf: &mut bytes::BytesMut) -> PgWireResult<()> {
459        Ok(())
460    }
461
462    fn decode_body(
463        _buf: &mut bytes::BytesMut,
464        _: usize,
465        _ctx: &DecodeContext,
466    ) -> PgWireResult<Self> {
467        Ok(Sync)
468    }
469}
470
471#[non_exhaustive]
472#[derive(PartialEq, Eq, Debug, new)]
473pub struct PortalSuspended;
474
475pub const MESSAGE_TYPE_BYTE_PORTAL_SUSPENDED: u8 = b's';
476
477impl Message for PortalSuspended {
478    #[inline]
479    fn message_type() -> Option<u8> {
480        Some(MESSAGE_TYPE_BYTE_PORTAL_SUSPENDED)
481    }
482
483    #[inline]
484    fn max_message_length() -> usize {
485        super::SMALL_BACKEND_PACKET_SIZE_LIMIT
486    }
487
488    #[inline]
489    fn message_length(&self) -> usize {
490        4
491    }
492
493    fn encode_body(&self, _buf: &mut bytes::BytesMut) -> PgWireResult<()> {
494        Ok(())
495    }
496
497    fn decode_body(
498        _buf: &mut bytes::BytesMut,
499        _: usize,
500        _ctx: &DecodeContext,
501    ) -> PgWireResult<Self> {
502        Ok(PortalSuspended)
503    }
504}