use bytes::{Buf, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend;
use postgres_protocol::message::frontend::CopyData;
use std::io;
use tokio_util::codec::{Decoder, Encoder};

/// A message sent from the client to the server.
pub enum FrontendMessage {
    /// Bytes that are already encoded and are copied to the output as-is.
    Raw(Bytes),
    /// A CopyData message that is serialized at encode time.
    CopyData(CopyData<Box<dyn Buf + Send>>),
}

/// A message (or batch of messages) received from the server.
pub enum BackendMessage {
    /// A batch of complete messages.
    Normal {
        messages: BackendMessages,
        /// True if the batch ends with a ReadyForQuery message.
        request_complete: bool,
    },
    /// A single notice, notification, or parameter status message.
    Async(backend::Message),
}

/// A buffer of complete backend messages, parsed lazily on iteration.
pub struct BackendMessages(BytesMut);

impl BackendMessages {
    pub fn empty() -> BackendMessages {
        BackendMessages(BytesMut::new())
    }
}

impl FallibleIterator for BackendMessages {
    type Item = backend::Message;
    type Error = io::Error;

    fn next(&mut self) -> io::Result<Option<backend::Message>> {
        backend::Message::parse(&mut self.0)
    }
}

pub struct PostgresCodec;

impl Encoder<FrontendMessage> for PostgresCodec {
    type Error = io::Error;

    fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
        match item {
            FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
            FrontendMessage::CopyData(data) => data.write(dst),
        }

        Ok(())
    }
}

impl Decoder for PostgresCodec {
    type Item = BackendMessage;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
        // Number of bytes at the front of `src` covered by complete messages.
        let mut idx = 0;
        let mut request_complete = false;

        while let Some(header) = backend::Header::parse(&src[idx..])? {
            // The length field does not count the one-byte tag, so add it back.
            let len = header.len() as usize + 1;
            if src[idx..].len() < len {
                // The message body has not fully arrived yet.
                break;
            }

            match header.tag() {
                backend::NOTICE_RESPONSE_TAG
                | backend::NOTIFICATION_RESPONSE_TAG
                | backend::PARAMETER_STATUS_TAG => {
                    if idx == 0 {
                        // The length check above guarantees a complete message,
                        // so `parse` returns `Some` here.
                        let message = backend::Message::parse(src)?.unwrap();
                        return Ok(Some(BackendMessage::Async(message)));
                    } else {
                        // Return the batch collected so far; this message is
                        // handled on the next call to `decode`.
                        break;
                    }
                }
                _ => {}
            }

            idx += len;

            if header.tag() == backend::READY_FOR_QUERY_TAG {
                request_complete = true;
                break;
            }
        }

        if idx == 0 {
            Ok(None)
        } else {
            Ok(Some(BackendMessage::Normal {
                messages: BackendMessages(src.split_to(idx)),
                request_complete,
            }))
        }
    }
}