use anyhow::Context;
use bytes::{BufMut, Bytes, BytesMut};
use crate::errors::EncodingError;
pub(crate) use self::{
auth::Auth, begin::Begin, call::Call, commit::Commit, delete::Delete, eval::Eval,
execute::Execute, id::Id, insert::Insert, ping::Ping, prepare::Prepare, replace::Replace,
rollback::Rollback, select::Select, update::Update, upsert::Upsert,
};
use std::io::Write;
use super::consts::{keys, RequestType};
mod auth;
mod begin;
mod call;
mod commit;
mod delete;
mod eval;
mod execute;
mod id;
mod insert;
mod ping;
mod prepare;
mod replace;
mod rollback;
mod select;
mod update;
mod upsert;
/// Protocol version number exposed by this module.
// NOTE(review): not referenced in this part of the file; presumably the
// version announced to the server during connection setup — confirm at the use site.
pub const PROTOCOL_VERSION: u8 = 3;
/// Initial capacity, in bytes, of the buffer into which `EncodedRequest::new`
/// encodes a request body.
const DEFAULT_ENCODE_BUFFER_SIZE: usize = 128;
// NOTE(review): not referenced in this part of the file; presumably consumed by
// the request submodules declared above — confirm before changing or removing.
const INDEX_BASE_VALUE: u32 = 0;
/// A request body that reports its request type and can serialize itself.
///
/// `EncodedRequest::new` calls both items: `encode` to produce the body bytes
/// and `request_type` to fill in the request header.
pub trait Request {
/// Returns the [`RequestType`] for this kind of request.
///
/// The `Self: Sized` bound excludes this associated function from trait
/// objects, so the trait itself remains usable as `dyn Request`.
fn request_type() -> RequestType
where
Self: Sized;
/// Writes the serialized request body into `buf`.
///
/// # Errors
///
/// Returns an [`EncodingError`] when the body cannot be encoded or written.
// NOTE(review): the body is presumably MessagePack, since `EncodedRequest::encode`
// appends it directly after a MessagePack header — confirm against implementors.
fn encode(&self, buf: &mut dyn Write) -> Result<(), EncodingError>;
}
/// A request whose body has already been serialized, paired with the header
/// fields that are written in front of it.
#[doc(hidden)]
pub struct EncodedRequest {
/// Request type; written into the header under `keys::REQUEST_TYPE`.
pub(crate) request_type: RequestType,
/// Value written into the header under `keys::SYNC`. Initialized to `0` by
/// `new` and adjustable afterwards through `sync_mut`.
pub(crate) sync: u32,
/// Optional value written under `keys::SCHEMA_VERSION`; the header entry is
/// omitted when this is `None` (which is what `new` sets).
pub(crate) schema_version: Option<u32>,
/// Optional value written under `keys::STREAM_ID`; the header entry is
/// omitted when this is `None`.
pub(crate) stream_id: Option<u32>,
/// Pre-encoded request body, written verbatim right after the header.
pub(crate) encoded_body: Bytes,
}
impl EncodedRequest {
    /// Serializes `body` eagerly and bundles the resulting bytes with the
    /// header fields of the request.
    ///
    /// The body is encoded into a fresh buffer whose initial capacity is
    /// `DEFAULT_ENCODE_BUFFER_SIZE` bytes. `sync` starts at `0` and
    /// `schema_version` at `None`; `stream_id` is stored as passed.
    ///
    /// # Errors
    ///
    /// Propagates the [`EncodingError`] returned by `body.encode`.
    pub fn new<Body: Request>(body: Body, stream_id: Option<u32>) -> Result<Self, EncodingError> {
        let mut body_writer = BytesMut::with_capacity(DEFAULT_ENCODE_BUFFER_SIZE).writer();
        body.encode(&mut body_writer)?;

        let request_type = Body::request_type();
        let encoded_body = body_writer.into_inner().freeze();
        Ok(Self {
            request_type,
            sync: 0,
            schema_version: None,
            stream_id,
            encoded_body,
        })
    }

    /// Writes the request header followed by the pre-encoded body into `buf`.
    ///
    /// The header is a MessagePack map that always holds the request type and
    /// the sync value, plus one entry each for the schema version and the
    /// stream id when they are set.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if writing any header element fails, or
    /// `EncodingError::MessagePack` if the body cannot be written to `buf`.
    pub fn encode(&self, mut buf: impl Write) -> Result<(), EncodingError> {
        use rmp::encode::{write_map_len, write_pfix, write_u32, write_u8};

        // Two mandatory entries (request type, sync) plus whichever optional
        // ones are present.
        let header_entries = 2
            + u32::from(self.schema_version.is_some())
            + u32::from(self.stream_id.is_some());
        write_map_len(&mut buf, header_entries)?;

        write_pfix(&mut buf, keys::REQUEST_TYPE)?;
        write_u8(&mut buf, self.request_type as u8)?;
        write_pfix(&mut buf, keys::SYNC)?;
        write_u32(&mut buf, self.sync)?;

        // Optional entries, in the order they appear on the wire.
        let optional_entries = [
            (keys::SCHEMA_VERSION, self.schema_version),
            (keys::STREAM_ID, self.stream_id),
        ];
        for (key, maybe_value) in optional_entries {
            if let Some(value) = maybe_value {
                write_pfix(&mut buf, key)?;
                write_u32(&mut buf, value)?;
            }
        }

        let body_written = buf
            .write_all(&self.encoded_body)
            .context("Failed to write encoded body to buffer");
        body_written.map_err(EncodingError::MessagePack)
    }

    /// Gives mutable access to the `sync` value so it can be set after
    /// construction.
    pub(crate) fn sync_mut(&mut self) -> &mut u32 {
        &mut self.sync
    }
}