opengauss_protocol/message/
backend.rs

1#![allow(missing_docs)]
2
3use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
4use bytes::{Bytes, BytesMut};
5use fallible_iterator::FallibleIterator;
6use memchr::memchr;
7use std::cmp;
8use std::io::{self, Read};
9use std::ops::Range;
10use std::str;
11
12use crate::Oid;
13
// Tag bytes of backend messages: the first byte of every message, which
// `Message::parse` matches on to decide how the body is decoded.
// The grouping below is by name/purpose only; the values are unchanged.

// Completion acknowledgements.
pub const PARSE_COMPLETE_TAG: u8 = b'1';
pub const BIND_COMPLETE_TAG: u8 = b'2';
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
pub const COMMAND_COMPLETE_TAG: u8 = b'C';

// Copy sub-protocol.
pub const COPY_DONE_TAG: u8 = b'c';
pub const COPY_DATA_TAG: u8 = b'd';
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';

// Rows and descriptions.
pub const DATA_ROW_TAG: u8 = b'D';
pub const NO_DATA_TAG: u8 = b'n';
pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
pub const ROW_DESCRIPTION_TAG: u8 = b'T';

// Errors, notices and notifications.
pub const NOTIFICATION_RESPONSE_TAG: u8 = b'A';
pub const ERROR_RESPONSE_TAG: u8 = b'E';
pub const NOTICE_RESPONSE_TAG: u8 = b'N';

// Session state.
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
pub const AUTHENTICATION_TAG: u8 = b'R';
pub const PARAMETER_STATUS_TAG: u8 = b'S';
pub const READY_FOR_QUERY_TAG: u8 = b'Z';

// Query flow markers.
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
pub const PORTAL_SUSPENDED_TAG: u8 = b's';
35
36#[derive(Debug, Copy, Clone)]
37pub struct Header {
38    tag: u8,
39    len: i32,
40}
41
42#[allow(clippy::len_without_is_empty)]
43impl Header {
44    #[inline]
45    pub fn parse(buf: &[u8]) -> io::Result<Option<Header>> {
46        if buf.len() < 5 {
47            return Ok(None);
48        }
49
50        let tag = buf[0];
51        let len = BigEndian::read_i32(&buf[1..]);
52
53        if len < 4 {
54            return Err(io::Error::new(
55                io::ErrorKind::InvalidData,
56                "invalid message length: header length < 4",
57            ));
58        }
59
60        Ok(Some(Header { tag, len }))
61    }
62
63    #[inline]
64    pub fn tag(self) -> u8 {
65        self.tag
66    }
67
68    #[inline]
69    pub fn len(self) -> i32 {
70        self.len
71    }
72}
73
74/// An enum representing Postgres backend messages.
75#[non_exhaustive]
76pub enum Message {
77    AuthenticationCleartextPassword,
78    AuthenticationGss,
79    AuthenticationKerberosV5,
80    AuthenticationMd5Password(AuthenticationMd5PasswordBody),
81    AuthenticationSha256Password(AuthenticationSha256PasswordBody),
82    AuthenticationOk,
83    AuthenticationScmCredential,
84    AuthenticationSspi,
85    AuthenticationGssContinue(AuthenticationGssContinueBody),
86    AuthenticationSasl(AuthenticationSaslBody),
87    AuthenticationSaslContinue(AuthenticationSaslContinueBody),
88    AuthenticationSaslFinal(AuthenticationSaslFinalBody),
89    BackendKeyData(BackendKeyDataBody),
90    BindComplete,
91    CloseComplete,
92    CommandComplete(CommandCompleteBody),
93    CopyData(CopyDataBody),
94    CopyDone,
95    CopyInResponse(CopyInResponseBody),
96    CopyOutResponse(CopyOutResponseBody),
97    DataRow(DataRowBody),
98    EmptyQueryResponse,
99    ErrorResponse(ErrorResponseBody),
100    NoData,
101    NoticeResponse(NoticeResponseBody),
102    NotificationResponse(NotificationResponseBody),
103    ParameterDescription(ParameterDescriptionBody),
104    ParameterStatus(ParameterStatusBody),
105    ParseComplete,
106    PortalSuspended,
107    ReadyForQuery(ReadyForQueryBody),
108    RowDescription(RowDescriptionBody),
109}
110
111impl Message {
112    #[inline]
113    pub fn parse(buf: &mut BytesMut) -> io::Result<Option<Message>> {
114        if buf.len() < 5 {
115            let to_read = 5 - buf.len();
116            buf.reserve(to_read);
117            return Ok(None);
118        }
119
120        let tag = buf[0];
121        let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
122
123        if len < 4 {
124            return Err(io::Error::new(
125                io::ErrorKind::InvalidInput,
126                "invalid message length: parsing u32",
127            ));
128        }
129
130        let total_len = len as usize + 1;
131        if buf.len() < total_len {
132            let to_read = total_len - buf.len();
133            buf.reserve(to_read);
134            return Ok(None);
135        }
136
137        let mut buf = Buffer {
138            bytes: buf.split_to(total_len).freeze(),
139            idx: 5,
140        };
141
142        let message = match tag {
143            PARSE_COMPLETE_TAG => Message::ParseComplete,
144            BIND_COMPLETE_TAG => Message::BindComplete,
145            CLOSE_COMPLETE_TAG => Message::CloseComplete,
146            NOTIFICATION_RESPONSE_TAG => {
147                let process_id = buf.read_i32::<BigEndian>()?;
148                let channel = buf.read_cstr()?;
149                let message = buf.read_cstr()?;
150                Message::NotificationResponse(NotificationResponseBody {
151                    process_id,
152                    channel,
153                    message,
154                })
155            }
156            COPY_DONE_TAG => Message::CopyDone,
157            COMMAND_COMPLETE_TAG => {
158                let tag = buf.read_cstr()?;
159                Message::CommandComplete(CommandCompleteBody { tag })
160            }
161            COPY_DATA_TAG => {
162                let storage = buf.read_all();
163                Message::CopyData(CopyDataBody { storage })
164            }
165            DATA_ROW_TAG => {
166                let len = buf.read_u16::<BigEndian>()?;
167                let storage = buf.read_all();
168                Message::DataRow(DataRowBody { storage, len })
169            }
170            ERROR_RESPONSE_TAG => {
171                let storage = buf.read_all();
172                Message::ErrorResponse(ErrorResponseBody { storage })
173            }
174            COPY_IN_RESPONSE_TAG => {
175                let format = buf.read_u8()?;
176                let len = buf.read_u16::<BigEndian>()?;
177                let storage = buf.read_all();
178                Message::CopyInResponse(CopyInResponseBody {
179                    format,
180                    len,
181                    storage,
182                })
183            }
184            COPY_OUT_RESPONSE_TAG => {
185                let format = buf.read_u8()?;
186                let len = buf.read_u16::<BigEndian>()?;
187                let storage = buf.read_all();
188                Message::CopyOutResponse(CopyOutResponseBody {
189                    format,
190                    len,
191                    storage,
192                })
193            }
194            EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
195            BACKEND_KEY_DATA_TAG => {
196                let process_id = buf.read_i32::<BigEndian>()?;
197                let secret_key = buf.read_i32::<BigEndian>()?;
198                Message::BackendKeyData(BackendKeyDataBody {
199                    process_id,
200                    secret_key,
201                })
202            }
203            NO_DATA_TAG => Message::NoData,
204            NOTICE_RESPONSE_TAG => {
205                let storage = buf.read_all();
206                Message::NoticeResponse(NoticeResponseBody { storage })
207            }
208            AUTHENTICATION_TAG => match buf.read_i32::<BigEndian>()? {
209                0 => Message::AuthenticationOk,
210                2 => Message::AuthenticationKerberosV5,
211                3 => Message::AuthenticationCleartextPassword,
212                5 => {
213                    let mut salt = [0; 4];
214                    buf.read_exact(&mut salt)?;
215                    Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody { salt })
216                }
217                6 => Message::AuthenticationScmCredential,
218                7 => Message::AuthenticationGss,
219                8 => {
220                    let storage = buf.read_all();
221                    Message::AuthenticationGssContinue(AuthenticationGssContinueBody(storage))
222                }
223                9 => Message::AuthenticationSspi,
224                10 => match buf.read_i32::<BigEndian>()? {
225                    0 | 2 => {
226                        let mut random64code = [0; 64];
227                        buf.read_exact(&mut random64code)?;
228                        let mut token = [0; 8];
229                        buf.read_exact(&mut token)?;
230                        let mut server_iteration = [0; 4];
231                        buf.read_exact(&mut server_iteration)?;
232                        Message::AuthenticationSha256Password(AuthenticationSha256PasswordBody {
233                            random64code,
234                            token,
235                            server_iteration,
236                        })
237                    }
238                    1 => {
239                        let mut salt = [0; 4];
240                        buf.read_exact(&mut salt)?;
241                        Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody { salt })
242                    }
243                    tag => {
244                        println!("unknown sha256 authentication tag `{}`", tag);
245                        return Err(io::Error::new(
246                            io::ErrorKind::InvalidInput,
247                            format!("unknown sha256 authentication tag `{}`", tag),
248                        ));
249                    }
250                },
251                11 => {
252                    let storage = buf.read_all();
253                    Message::AuthenticationSaslContinue(AuthenticationSaslContinueBody(storage))
254                }
255                12 => {
256                    let storage = buf.read_all();
257                    Message::AuthenticationSaslFinal(AuthenticationSaslFinalBody(storage))
258                }
259                tag => {
260                    return Err(io::Error::new(
261                        io::ErrorKind::InvalidInput,
262                        format!("unknown authentication tag `{}`", tag),
263                    ));
264                }
265            },
266            PORTAL_SUSPENDED_TAG => Message::PortalSuspended,
267            PARAMETER_STATUS_TAG => {
268                let name = buf.read_cstr()?;
269                let value = buf.read_cstr()?;
270                Message::ParameterStatus(ParameterStatusBody { name, value })
271            }
272            PARAMETER_DESCRIPTION_TAG => {
273                let len = buf.read_u16::<BigEndian>()?;
274                let storage = buf.read_all();
275                Message::ParameterDescription(ParameterDescriptionBody { storage, len })
276            }
277            ROW_DESCRIPTION_TAG => {
278                let len = buf.read_u16::<BigEndian>()?;
279                let storage = buf.read_all();
280                Message::RowDescription(RowDescriptionBody { storage, len })
281            }
282            READY_FOR_QUERY_TAG => {
283                let status = buf.read_u8()?;
284                Message::ReadyForQuery(ReadyForQueryBody { status })
285            }
286            tag => {
287                return Err(io::Error::new(
288                    io::ErrorKind::InvalidInput,
289                    format!("unknown message tag `{}`", tag),
290                ));
291            }
292        };
293
294        if !buf.is_empty() {
295            return Err(io::Error::new(
296                io::ErrorKind::InvalidInput,
297                "invalid message length: expected buffer to be empty",
298            ));
299        }
300
301        Ok(Some(message))
302    }
303}
304
305struct Buffer {
306    bytes: Bytes,
307    idx: usize,
308}
309
310impl Buffer {
311    #[inline]
312    fn slice(&self) -> &[u8] {
313        &self.bytes[self.idx..]
314    }
315
316    #[inline]
317    fn is_empty(&self) -> bool {
318        self.slice().is_empty()
319    }
320
321    #[inline]
322    fn read_cstr(&mut self) -> io::Result<Bytes> {
323        match memchr(0, self.slice()) {
324            Some(pos) => {
325                let start = self.idx;
326                let end = start + pos;
327                let cstr = self.bytes.slice(start..end);
328                self.idx = end + 1;
329                Ok(cstr)
330            }
331            None => Err(io::Error::new(
332                io::ErrorKind::UnexpectedEof,
333                "unexpected EOF",
334            )),
335        }
336    }
337
338    #[inline]
339    fn read_all(&mut self) -> Bytes {
340        let buf = self.bytes.slice(self.idx..);
341        self.idx = self.bytes.len();
342        buf
343    }
344}
345
346impl Read for Buffer {
347    #[inline]
348    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
349        let len = {
350            let slice = self.slice();
351            let len = cmp::min(slice.len(), buf.len());
352            buf[..len].copy_from_slice(&slice[..len]);
353            len
354        };
355        self.idx += len;
356        Ok(len)
357    }
358}
359
/// Body of an MD5 password authentication request.
pub struct AuthenticationMd5PasswordBody {
    /// The four salt bytes read from the message.
    salt: [u8; 4],
}

impl AuthenticationMd5PasswordBody {
    /// Returns a copy of the four-byte salt.
    #[inline]
    pub fn salt(&self) -> [u8; 4] {
        let Self { salt } = self;
        *salt
    }
}
370
/// Body of a SHA-256 password authentication request.
pub struct AuthenticationSha256PasswordBody {
    /// 64 bytes read first from the message body.
    random64code: [u8; 64],
    /// 8 bytes read after `random64code`.
    token: [u8; 8],
    /// Raw big-endian bytes of the iteration count.
    server_iteration: [u8; 4],
}

impl AuthenticationSha256PasswordBody {
    /// Builds a body from its three raw parts.
    #[inline]
    pub fn new(
        random64code: [u8; 64],
        token: [u8; 8],
        server_iteration: [u8; 4],
    ) -> AuthenticationSha256PasswordBody {
        Self {
            random64code,
            token,
            server_iteration,
        }
    }

    /// Returns a copy of the 64-byte random code.
    #[inline]
    pub fn random64code(&self) -> [u8; 64] {
        self.random64code
    }

    /// Returns a copy of the 8-byte token.
    #[inline]
    pub fn token(&self) -> [u8; 8] {
        self.token
    }

    /// Returns the iteration count, decoding the stored bytes as a
    /// big-endian `u32`.
    #[inline]
    pub fn server_iteration(&self) -> u32 {
        u32::from_be_bytes(self.server_iteration)
    }
}
409
410pub struct AuthenticationGssContinueBody(Bytes);
411
412impl AuthenticationGssContinueBody {
413    #[inline]
414    pub fn data(&self) -> &[u8] {
415        &self.0
416    }
417}
418
419pub struct AuthenticationSaslBody(Bytes);
420
421impl AuthenticationSaslBody {
422    #[inline]
423    pub fn mechanisms(&self) -> SaslMechanisms<'_> {
424        SaslMechanisms(&self.0)
425    }
426}
427
428pub struct SaslMechanisms<'a>(&'a [u8]);
429
430impl<'a> FallibleIterator for SaslMechanisms<'a> {
431    type Item = &'a str;
432    type Error = io::Error;
433
434    #[inline]
435    fn next(&mut self) -> io::Result<Option<&'a str>> {
436        let value_end = find_null(self.0, 0)?;
437        if value_end == 0 {
438            if self.0.len() != 1 {
439                return Err(io::Error::new(
440                    io::ErrorKind::InvalidData,
441                    "invalid message length: expected to be at end of iterator for sasl",
442                ));
443            }
444            Ok(None)
445        } else {
446            let value = get_str(&self.0[..value_end])?;
447            self.0 = &self.0[value_end + 1..];
448            Ok(Some(value))
449        }
450    }
451}
452
453pub struct AuthenticationSaslContinueBody(Bytes);
454
455impl AuthenticationSaslContinueBody {
456    #[inline]
457    pub fn data(&self) -> &[u8] {
458        &self.0
459    }
460}
461
462pub struct AuthenticationSaslFinalBody(Bytes);
463
464impl AuthenticationSaslFinalBody {
465    #[inline]
466    pub fn data(&self) -> &[u8] {
467        &self.0
468    }
469}
470
/// Body of a backend-key-data message: two `i32` values read in order.
pub struct BackendKeyDataBody {
    /// First `i32` of the body.
    process_id: i32,
    /// Second `i32` of the body.
    secret_key: i32,
}

impl BackendKeyDataBody {
    /// Returns the process id carried by the message.
    #[inline]
    pub fn process_id(&self) -> i32 {
        let Self { process_id, .. } = *self;
        process_id
    }

    /// Returns the secret key carried by the message.
    #[inline]
    pub fn secret_key(&self) -> i32 {
        let Self { secret_key, .. } = *self;
        secret_key
    }
}
487
488pub struct CommandCompleteBody {
489    tag: Bytes,
490}
491
492impl CommandCompleteBody {
493    #[inline]
494    pub fn tag(&self) -> io::Result<&str> {
495        get_str(&self.tag)
496    }
497}
498
499pub struct CopyDataBody {
500    storage: Bytes,
501}
502
503impl CopyDataBody {
504    #[inline]
505    pub fn data(&self) -> &[u8] {
506        &self.storage
507    }
508
509    #[inline]
510    pub fn into_bytes(self) -> Bytes {
511        self.storage
512    }
513}
514
515pub struct CopyInResponseBody {
516    format: u8,
517    len: u16,
518    storage: Bytes,
519}
520
521impl CopyInResponseBody {
522    #[inline]
523    pub fn format(&self) -> u8 {
524        self.format
525    }
526
527    #[inline]
528    pub fn column_formats(&self) -> ColumnFormats<'_> {
529        ColumnFormats {
530            remaining: self.len,
531            buf: &self.storage,
532        }
533    }
534}
535
536pub struct ColumnFormats<'a> {
537    buf: &'a [u8],
538    remaining: u16,
539}
540
541impl<'a> FallibleIterator for ColumnFormats<'a> {
542    type Item = u16;
543    type Error = io::Error;
544
545    #[inline]
546    fn next(&mut self) -> io::Result<Option<u16>> {
547        if self.remaining == 0 {
548            if self.buf.is_empty() {
549                return Ok(None);
550            } else {
551                return Err(io::Error::new(
552                    io::ErrorKind::InvalidInput,
553                    "invalid message length: wrong column formats",
554                ));
555            }
556        }
557
558        self.remaining -= 1;
559        self.buf.read_u16::<BigEndian>().map(Some)
560    }
561
562    #[inline]
563    fn size_hint(&self) -> (usize, Option<usize>) {
564        let len = self.remaining as usize;
565        (len, Some(len))
566    }
567}
568
569pub struct CopyOutResponseBody {
570    format: u8,
571    len: u16,
572    storage: Bytes,
573}
574
575impl CopyOutResponseBody {
576    #[inline]
577    pub fn format(&self) -> u8 {
578        self.format
579    }
580
581    #[inline]
582    pub fn column_formats(&self) -> ColumnFormats<'_> {
583        ColumnFormats {
584            remaining: self.len,
585            buf: &self.storage,
586        }
587    }
588}
589
590pub struct DataRowBody {
591    storage: Bytes,
592    len: u16,
593}
594
595impl DataRowBody {
596    #[inline]
597    pub fn ranges(&self) -> DataRowRanges<'_> {
598        DataRowRanges {
599            buf: &self.storage,
600            len: self.storage.len(),
601            remaining: self.len,
602        }
603    }
604
605    #[inline]
606    pub fn buffer(&self) -> &[u8] {
607        &self.storage
608    }
609}
610
611pub struct DataRowRanges<'a> {
612    buf: &'a [u8],
613    len: usize,
614    remaining: u16,
615}
616
617impl<'a> FallibleIterator for DataRowRanges<'a> {
618    type Item = Option<Range<usize>>;
619    type Error = io::Error;
620
621    #[inline]
622    fn next(&mut self) -> io::Result<Option<Option<Range<usize>>>> {
623        if self.remaining == 0 {
624            if self.buf.is_empty() {
625                return Ok(None);
626            } else {
627                return Err(io::Error::new(
628                    io::ErrorKind::InvalidInput,
629                    "invalid message length: datarowrange is not empty",
630                ));
631            }
632        }
633
634        self.remaining -= 1;
635        let len = self.buf.read_i32::<BigEndian>()?;
636        if len < 0 {
637            Ok(Some(None))
638        } else {
639            let len = len as usize;
640            if self.buf.len() < len {
641                return Err(io::Error::new(
642                    io::ErrorKind::UnexpectedEof,
643                    "unexpected EOF",
644                ));
645            }
646            let base = self.len - self.buf.len();
647            self.buf = &self.buf[len as usize..];
648            Ok(Some(Some(base..base + len)))
649        }
650    }
651
652    #[inline]
653    fn size_hint(&self) -> (usize, Option<usize>) {
654        let len = self.remaining as usize;
655        (len, Some(len))
656    }
657}
658
659pub struct ErrorResponseBody {
660    storage: Bytes,
661}
662
663impl ErrorResponseBody {
664    #[inline]
665    pub fn fields(&self) -> ErrorFields<'_> {
666        ErrorFields { buf: &self.storage }
667    }
668}
669
670pub struct ErrorFields<'a> {
671    buf: &'a [u8],
672}
673
674impl<'a> FallibleIterator for ErrorFields<'a> {
675    type Item = ErrorField<'a>;
676    type Error = io::Error;
677
678    #[inline]
679    fn next(&mut self) -> io::Result<Option<ErrorField<'a>>> {
680        let type_ = self.buf.read_u8()?;
681        if type_ == 0 {
682            if self.buf.is_empty() {
683                return Ok(None);
684            } else {
685                return Err(io::Error::new(
686                    io::ErrorKind::InvalidInput,
687                    "invalid message length: error fields is not drained",
688                ));
689            }
690        }
691
692        let value_end = find_null(self.buf, 0)?;
693        let value = get_str(&self.buf[..value_end])?;
694        self.buf = &self.buf[value_end + 1..];
695
696        Ok(Some(ErrorField { type_, value }))
697    }
698}
699
/// A single field of an error or notice response: a type byte and its
/// string value, borrowed from the message body.
pub struct ErrorField<'a> {
    type_: u8,
    value: &'a str,
}

impl<'a> ErrorField<'a> {
    /// Returns the byte identifying the kind of field.
    #[inline]
    pub fn type_(&self) -> u8 {
        self.type_
    }

    /// Returns the field's value.
    ///
    /// The string borrows from the underlying message body (`'a`), not from
    /// this `ErrorField`, so it may be kept after the field is dropped. This
    /// matches `Field::name`.
    #[inline]
    pub fn value(&self) -> &'a str {
        self.value
    }
}
716
717pub struct NoticeResponseBody {
718    storage: Bytes,
719}
720
721impl NoticeResponseBody {
722    #[inline]
723    pub fn fields(&self) -> ErrorFields<'_> {
724        ErrorFields { buf: &self.storage }
725    }
726}
727
728pub struct NotificationResponseBody {
729    process_id: i32,
730    channel: Bytes,
731    message: Bytes,
732}
733
734impl NotificationResponseBody {
735    #[inline]
736    pub fn process_id(&self) -> i32 {
737        self.process_id
738    }
739
740    #[inline]
741    pub fn channel(&self) -> io::Result<&str> {
742        get_str(&self.channel)
743    }
744
745    #[inline]
746    pub fn message(&self) -> io::Result<&str> {
747        get_str(&self.message)
748    }
749}
750
751pub struct ParameterDescriptionBody {
752    storage: Bytes,
753    len: u16,
754}
755
756impl ParameterDescriptionBody {
757    #[inline]
758    pub fn parameters(&self) -> Parameters<'_> {
759        Parameters {
760            buf: &self.storage,
761            remaining: self.len,
762        }
763    }
764}
765
766pub struct Parameters<'a> {
767    buf: &'a [u8],
768    remaining: u16,
769}
770
771impl<'a> FallibleIterator for Parameters<'a> {
772    type Item = Oid;
773    type Error = io::Error;
774
775    #[inline]
776    fn next(&mut self) -> io::Result<Option<Oid>> {
777        if self.remaining == 0 {
778            if self.buf.is_empty() {
779                return Ok(None);
780            } else {
781                return Err(io::Error::new(
782                    io::ErrorKind::InvalidInput,
783                    "invalid message length: parameters is not drained",
784                ));
785            }
786        }
787
788        self.remaining -= 1;
789        self.buf.read_u32::<BigEndian>().map(Some)
790    }
791
792    #[inline]
793    fn size_hint(&self) -> (usize, Option<usize>) {
794        let len = self.remaining as usize;
795        (len, Some(len))
796    }
797}
798
799pub struct ParameterStatusBody {
800    name: Bytes,
801    value: Bytes,
802}
803
804impl ParameterStatusBody {
805    #[inline]
806    pub fn name(&self) -> io::Result<&str> {
807        get_str(&self.name)
808    }
809
810    #[inline]
811    pub fn value(&self) -> io::Result<&str> {
812        get_str(&self.value)
813    }
814}
815
/// Body of a ready-for-query message.
pub struct ReadyForQueryBody {
    /// The single status byte of the body.
    status: u8,
}

impl ReadyForQueryBody {
    /// Returns the status byte exactly as received.
    #[inline]
    pub fn status(&self) -> u8 {
        let Self { status } = *self;
        status
    }
}
826
827pub struct RowDescriptionBody {
828    storage: Bytes,
829    len: u16,
830}
831
832impl RowDescriptionBody {
833    #[inline]
834    pub fn fields(&self) -> Fields<'_> {
835        Fields {
836            buf: &self.storage,
837            remaining: self.len,
838        }
839    }
840}
841
842pub struct Fields<'a> {
843    buf: &'a [u8],
844    remaining: u16,
845}
846
847impl<'a> FallibleIterator for Fields<'a> {
848    type Item = Field<'a>;
849    type Error = io::Error;
850
851    #[inline]
852    fn next(&mut self) -> io::Result<Option<Field<'a>>> {
853        if self.remaining == 0 {
854            if self.buf.is_empty() {
855                return Ok(None);
856            } else {
857                return Err(io::Error::new(
858                    io::ErrorKind::InvalidInput,
859                    "invalid message length: field is not drained",
860                ));
861            }
862        }
863
864        self.remaining -= 1;
865        let name_end = find_null(self.buf, 0)?;
866        let name = get_str(&self.buf[..name_end])?;
867        self.buf = &self.buf[name_end + 1..];
868        let table_oid = self.buf.read_u32::<BigEndian>()?;
869        let column_id = self.buf.read_i16::<BigEndian>()?;
870        let type_oid = self.buf.read_u32::<BigEndian>()?;
871        let type_size = self.buf.read_i16::<BigEndian>()?;
872        let type_modifier = self.buf.read_i32::<BigEndian>()?;
873        let format = self.buf.read_i16::<BigEndian>()?;
874
875        Ok(Some(Field {
876            name,
877            table_oid,
878            column_id,
879            type_oid,
880            type_size,
881            type_modifier,
882            format,
883        }))
884    }
885}
886
887pub struct Field<'a> {
888    name: &'a str,
889    table_oid: Oid,
890    column_id: i16,
891    type_oid: Oid,
892    type_size: i16,
893    type_modifier: i32,
894    format: i16,
895}
896
897impl<'a> Field<'a> {
898    #[inline]
899    pub fn name(&self) -> &'a str {
900        self.name
901    }
902
903    #[inline]
904    pub fn table_oid(&self) -> Oid {
905        self.table_oid
906    }
907
908    #[inline]
909    pub fn column_id(&self) -> i16 {
910        self.column_id
911    }
912
913    #[inline]
914    pub fn type_oid(&self) -> Oid {
915        self.type_oid
916    }
917
918    #[inline]
919    pub fn type_size(&self) -> i16 {
920        self.type_size
921    }
922
923    #[inline]
924    pub fn type_modifier(&self) -> i32 {
925        self.type_modifier
926    }
927
928    #[inline]
929    pub fn format(&self) -> i16 {
930        self.format
931    }
932}
933
934#[inline]
935fn find_null(buf: &[u8], start: usize) -> io::Result<usize> {
936    match memchr(0, &buf[start..]) {
937        Some(pos) => Ok(pos + start),
938        None => Err(io::Error::new(
939            io::ErrorKind::UnexpectedEof,
940            "unexpected EOF",
941        )),
942    }
943}
944
/// Interprets `buf` as UTF-8 text, mapping a decoding failure to an
/// `InvalidInput` I/O error that wraps the original `Utf8Error`.
#[inline]
fn get_str(buf: &[u8]) -> io::Result<&str> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(utf8_err) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_err)),
    }
}
948}