//! Represents a memory backed RESP response.
//!
//! We use an internal buffer here so that we can build the complete response in one go without
//! blocking and then push the whole thing into the network with a single sys-call.
//!
//! Therefore we pre-allocate a buffer of 8k and grow this if needed. Note that so far, no
//! intermediate write will happen and the whole response is buffered in memory. As we expect
//! responses to be small, this is a good approach - however, when trying to send gigabytes,
//! this isn't probably the way to go.
//!
//! # Example
//!
//! ```
//! # use std::error::Error;
//! # use jupiter::response::Response;
//! # use jupiter::response::OutputError;
//!
//! # fn main() -> Result<(), OutputError> {
//! let mut response = Response::new();
//! response.ok()?;
//! assert_eq!(response.complete_string()?, "+OK\r\n");
//!
//! # Ok(())
//! # }
//! ```
//!
use std::error::Error;
use std::fmt::{Display, Formatter, Write};
use anyhow::anyhow;
use bytes::BytesMut;
/// Enumerates the possible errors when creating a response.
#[derive(Debug)]
pub enum OutputError {
/// Represents any IO or formatting error while generating the response.
IoError(std::fmt::Error),
/// Represents a protocol error which most probably indicates an invalid nesting
/// (e.g. when providing too few or too many result entries for an array..).
ProtocolError(anyhow::Error),
}
impl From<std::fmt::Error> for OutputError {
fn from(err: std::fmt::Error) -> OutputError {
OutputError::IoError(err)
}
}
impl From<anyhow::Error> for OutputError {
fn from(err: anyhow::Error) -> OutputError {
OutputError::ProtocolError(err)
}
}
impl Display for OutputError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
OutputError::IoError(e) => write!(f, "IO error: {:?}", e),
OutputError::ProtocolError(e) => write!(f, "Protocol error: {:?}", e),
}
}
}
impl Error for OutputError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match *self {
OutputError::IoError(ref e) => Some(e),
OutputError::ProtocolError(_) => None,
}
}
}
/// Represents the result type for all output operations.
///
/// The operations itself don't generate a result but might emit an **OutputError**.
pub type OutputResult = std::result::Result<(), OutputError>;
/// Represents a RESP response being built.
///
/// This is the buffer in which the response is stored along with a control structure which monitors
/// the nesting of the result and ensures that only valid response will be generated.
#[derive(Default)]
pub struct Response {
buffer: BytesMut,
nesting: Vec<i32>,
}
/// Represents a separator used when outputting management data.
pub static SEPARATOR: &str =
"---------------------------------------------------------------------------------------------------\n";
impl Response {
/// Creates a new response.
///
/// Creates a new response instance which internally allocates a buffer of 8 kB. Note that
/// this expects a single response element. If several elements are to be output, an
/// [Response::array](Response::array) has to be used.
pub fn new() -> Self {
Response {
buffer: BytesMut::with_capacity(8192),
nesting: vec![1],
}
}
fn check_nesting(&mut self) -> OutputResult {
let current_nesting = match self.nesting.last_mut() {
Some(level) => level,
None => {
return Err(OutputError::ProtocolError(anyhow!(
"Invalid result nesting!"
)));
}
};
*current_nesting -= 1;
match *current_nesting {
nesting if nesting > 0 => Ok(()),
nesting if nesting == 0 => {
let _ = self.nesting.pop();
Ok(())
}
_ => Err(OutputError::ProtocolError(anyhow!(
"Invalid result nesting!"
))),
}
}
#[inline]
fn reserve(&mut self, required_length: usize) {
let len = self.buffer.len();
let rem = self.buffer.capacity() - len;
if rem < required_length {
self.reserve_inner(required_length);
}
}
fn reserve_inner(&mut self, required_length: usize) {
let required_blocks = (required_length / 8192) + 1;
self.buffer.reserve(required_blocks * 8192);
}
/// Completes the request and returns the serialized response as bytes buffer.
///
/// Note that this returns an error if the internal nesting is invalid (e.g. an array
/// is missing elements).
///
/// As this consumes **self**, this is the final operation to be performed on a response.
pub fn complete(mut self) -> Result<BytesMut, OutputError> {
if !self.nesting.is_empty() {
return Err(OutputError::ProtocolError(anyhow!(
"Invalid result nesting!"
)));
}
self.nesting.push(1);
Ok(self.buffer)
}
/// Provides a helper method which directly transforms the response into its string
/// representation.
///
/// This is only intended to be used in test environments to verify if a generated response as
/// the expected size and shape.
///
/// Note that this does not support responses which contain non UTF-8 data (which are generally
/// supported by RESP).
pub fn complete_string(self) -> Result<String, OutputError> {
let buffer = self.complete()?;
match std::str::from_utf8(&buffer[..]) {
Ok(str) => Ok(str.to_owned()),
Err(_) => Err(OutputError::ProtocolError(anyhow::anyhow!(
"Non UTF-8 data found"
))),
}
}
/// Starts an array with the given number of items.
///
/// Note that this call has to be followed by the exact number of outer output calls in order
/// to generate a valid response.
///
/// # Example
///
/// Properly building an array:
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.array(2)?;
/// response.simple("Hello")?;
/// response.simple("World")?;
///
/// assert_eq!(response.complete_string()?, "*2\r\n+Hello\r\n+World\r\n");
/// # Ok(())
/// # }
/// ```
///
/// This will panic as the array is missing elements
/// ```should_panic
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.array(3)?;
/// response.simple("Hello")?;
///
/// response.complete_string().unwrap();
/// # Ok(())
/// # }
/// ```
pub fn array(&mut self, items: i32) -> OutputResult {
self.check_nesting()?;
if items > 0 {
self.nesting.push(items);
}
self.reserve(16);
self.buffer.write_char('*')?;
write!(self.buffer, "{}\r\n", items)?;
Ok(())
}
/// Emits "OK" as simple string.
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.ok()?;
/// assert_eq!(response.complete_string()?, "+OK\r\n");
/// # Ok(())
/// # }
/// ```
pub fn ok(&mut self) -> OutputResult {
self.check_nesting()?;
self.reserve(5);
self.buffer.write_str("+OK\r\n")?;
Ok(())
}
/// Emits "0" as number.
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.zero()?;
/// assert_eq!(response.complete_string()?, ":0\r\n");
/// # Ok(())
/// # }
/// ```
pub fn zero(&mut self) -> OutputResult {
self.check_nesting()?;
self.reserve(4);
self.buffer.write_str(":0\r\n")?;
Ok(())
}
/// Emits "1" as number.
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.one()?;
/// assert_eq!(response.complete_string()?, ":1\r\n");
/// # Ok(())
/// # }
/// ```
pub fn one(&mut self) -> OutputResult {
self.check_nesting()?;
self.reserve(4);
self.buffer.write_str(":1\r\n")?;
Ok(())
}
/// Emits the given number.
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.number(42)?;
/// assert_eq!(response.complete_string()?, ":42\r\n");
/// # Ok(())
/// # }
/// ```
pub fn number(&mut self, number: i64) -> OutputResult {
if number == 0 {
self.zero()
} else if number == 1 {
self.one()
} else {
self.check_nesting()?;
self.reserve(32);
self.buffer.write_char(':')?;
write!(self.buffer, "{}\r\n", number)?;
Ok(())
}
}
/// Emits "1" if the given value is **true** or "0" otherwise.
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.boolean(true)?;
/// assert_eq!(response.complete_string()?, ":1\r\n");
/// # Ok(())
/// # }
/// ```
pub fn boolean(&mut self, boolean: bool) -> OutputResult {
self.number(i64::from(boolean))
}
/// Emits the given string as **simple string**.
///
/// A simple string is encoded as "+STRING_VALUE". This requires that the given string
/// is valid UTF-8 and that it does not contain any line breaks (CR or LF). This isn't enforced
/// by this method. When in doubt, use [Response::bulk](Response::bulk).
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.simple("Hello World")?;
/// assert_eq!(response.complete_string()?, "+Hello World\r\n");
/// # Ok(())
/// # }
/// ```
pub fn simple(&mut self, string: impl AsRef<str>) -> OutputResult {
if string.as_ref().is_empty() {
self.empty_string()
} else {
self.check_nesting()?;
self.reserve(3 + string.as_ref().len());
self.buffer.write_char('+')?;
self.buffer.write_str(string.as_ref())?;
self.buffer.write_str("\r\n")?;
Ok(())
}
}
/// Emits an empty string.
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.empty_string()?;
/// assert_eq!(response.complete_string()?, "+\r\n");
/// # Ok(())
/// # }
/// ```
pub fn empty_string(&mut self) -> OutputResult {
self.check_nesting()?;
self.reserve(3);
self.buffer.write_str("+\r\n")?;
Ok(())
}
/// Emits the given string as bulk data.
///
/// RESP doesn't provide any requirements for bulk string. These can therefore contain line
/// breaks (CR and or LF) as well as non UTF-8 data (which isn't supported here).
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.bulk("Hello\nWorld")?;
/// assert_eq!(response.complete_string()?, "$11\r\nHello\nWorld\r\n");
/// # Ok(())
/// # }
/// ```
pub fn bulk(&mut self, string: impl AsRef<str>) -> OutputResult {
self.check_nesting()?;
self.reserve(3 + 16 + string.as_ref().len());
self.buffer.write_char('$')?;
write!(self.buffer, "{}\r\n", string.as_ref().len())?;
self.buffer.write_str(string.as_ref())?;
self.buffer.write_str("\r\n")?;
Ok(())
}
/// Emits an error message.
///
/// Errors are encoded as "-ERROR MESSAGE". Therefore, just like simple strings, these must not
/// contain line breaks (CR or LF) or non UTF-8 data.
///
/// This method will automatically transform CR and LF to " " so that we do not double fail
/// (crash when reporting an error).
///
/// # Example
///
/// ```
/// # use jupiter::response::{OutputResult, Response};
/// # fn main() -> OutputResult {
/// let mut response = Response::new();
/// response.error("Good bye,\ncruel World")?;
/// assert_eq!(response.complete_string()?, "-Good bye, cruel World\r\n");
/// # Ok(())
/// # }
/// ```
pub fn error(&mut self, string: impl AsRef<str>) -> OutputResult {
self.check_nesting()?;
self.reserve(3 + string.as_ref().len());
self.buffer.write_char('-')?;
self.buffer.write_str(
string
.as_ref()
.to_owned()
.replace(['\r', '\n'], " ")
.as_str(),
)?;
self.buffer.write_str("\r\n")?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::request::Request;
use crate::response::Response;
#[test]
fn an_array_of_bulk_strings_can_be_read_by_request() {
let mut response = Response::new();
response.array(2).unwrap();
response.bulk("Hello").unwrap();
response.bulk("World").unwrap();
let mut buffer = response.complete().unwrap();
let request = Request::parse(&mut buffer).unwrap().unwrap();
assert_eq!(request.command(), "Hello");
assert_eq!(request.parameter_count(), 1);
assert_eq!(request.str_parameter(0).unwrap(), "World");
}
#[test]
fn errors_are_sanitized() {
let mut response = Response::new();
response.error("Error\nProblem").unwrap();
assert_eq!(response.complete_string().unwrap(), "-Error Problem\r\n");
}
#[test]
fn incorrect_nesting_is_detected() {
{
let mut response = Response::new();
response.array(2).unwrap();
response.ok().unwrap();
assert_eq!(response.complete().is_err(), true);
}
{
let mut response = Response::new();
response.ok().unwrap();
assert_eq!(response.ok().is_err(), true);
}
{
let mut response = Response::new();
response.array(1).unwrap();
response.ok().unwrap();
assert_eq!(response.ok().is_err(), true);
}
}
#[test]
fn dynamic_buffer_allocation_works() {
let many_x = "X".repeat(16_000);
let many_y = "Y".repeat(16_000);
let mut response = Response::new();
response.array(2).unwrap();
response.simple(many_x.as_str()).unwrap();
response.bulk(many_y.as_str()).unwrap();
assert_eq!(
response.complete_string().unwrap(),
format!("*2\r\n+{}\r\n$16000\r\n{}\r\n", many_x, many_y)
);
}
}