use crate::error::ReadError;
use crate::google::storage::v2::ReadRange as ProtoRange;
/// Shorthand for a `Result` whose error type is [`ReadError`].
type ReadResult<T> = std::result::Result<T, ReadError>;
/// A read range described by a starting offset and an optional length.
///
/// Values built through [`NormalizedRange::new`] and
/// [`NormalizedRange::with_length`] hold a non-negative offset and, when a
/// length is set, a non-negative length.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NormalizedRange {
    /// Offset at which the range starts.
    offset: i64,
    /// Length of the range, or `None` when no length is set.
    length: Option<i64>,
}
impl NormalizedRange {
    /// Creates a range starting at `offset` with no length set.
    ///
    /// # Errors
    ///
    /// Returns [`ReadError::InvalidBidiStreamingReadResponse`] if `offset` is
    /// negative.
    pub fn new(offset: i64) -> ReadResult<Self> {
        if offset < 0 {
            return Err(ReadError::InvalidBidiStreamingReadResponse(
                format!("invalid offset ({offset})").into(),
            ));
        }
        Ok(Self {
            offset,
            length: None,
        })
    }

    /// Returns the range with its length set to `length`.
    ///
    /// # Errors
    ///
    /// Returns [`ReadError::InvalidBidiStreamingReadResponse`] if `length` is
    /// negative.
    pub fn with_length(mut self, length: i64) -> ReadResult<Self> {
        if length < 0 {
            return Err(ReadError::InvalidBidiStreamingReadResponse(
                format!("invalid length ({length})").into(),
            ));
        }
        self.length = Some(length);
        Ok(self)
    }

    /// Builds a range from the `read_offset` and `read_length` of `response`.
    ///
    /// A `read_length` of zero produces a range with no length set.
    ///
    /// # Errors
    ///
    /// Returns [`ReadError::InvalidBidiStreamingReadResponse`] if either value
    /// is negative; the offset is validated first.
    pub fn from_proto(response: ProtoRange) -> ReadResult<Self> {
        let range = Self::new(response.read_offset)?;
        match response.read_length {
            0 => Ok(range),
            length => range.with_length(length),
        }
    }

    /// Returns the length of the range, or `None` when no length is set.
    pub fn length(&self) -> Option<i64> {
        self.length
    }

    /// Converts the range into its proto form, tagged with `id`.
    ///
    /// A range with no length set is encoded with a `read_length` of zero.
    pub fn as_proto(&self, id: i64) -> ProtoRange {
        ProtoRange {
            read_id: id,
            read_offset: self.offset,
            read_length: self.length.unwrap_or_default(),
        }
    }

    /// Advances the range past the bytes described by `response`.
    ///
    /// The offset moves forward by the response length and, when a length is
    /// set, the length shrinks by the same amount. A response with a zero
    /// length leaves the range unchanged.
    ///
    /// # Errors
    ///
    /// - [`ReadError::InvalidBidiStreamingReadResponse`] if the response has a
    ///   negative offset or length, or if advancing the offset would overflow
    ///   an `i64`.
    /// - The error built by `ReadError::bidi_out_of_order` if the response does
    ///   not start at the current offset.
    /// - [`ReadError::LongRead`] if the response is longer than the length
    ///   still expected.
    ///
    /// The range is left unmodified whenever an error is returned.
    pub fn update(&mut self, response: ProtoRange) -> ReadResult<()> {
        let update = Self::from_proto(response)?;
        if update.offset != self.offset {
            return Err(ReadError::bidi_out_of_order(self.offset, update.offset));
        }
        // `from_proto` maps a zero length to `None`, so zero means "no data".
        let received = update.length.unwrap_or_default();
        let remaining = match self.length {
            Some(expected) if received > expected => {
                // Both values were validated as non-negative, the casts are lossless.
                return Err(ReadError::LongRead {
                    got: received as u64,
                    expected: expected as u64,
                });
            }
            Some(expected) => Some(expected - received),
            None => None,
        };
        // The values come from a response: reject an overflowing sum instead of
        // panicking (debug builds) or wrapping to a negative offset (release).
        let start = update.offset;
        let next_offset = start.checked_add(received).ok_or_else(|| {
            ReadError::InvalidBidiStreamingReadResponse(
                format!("invalid range, offset ({start}) + length ({received}) overflows").into(),
            )
        })?;
        // Commit only after every check passed so errors leave `self` intact.
        self.length = remaining;
        self.offset = next_offset;
        Ok(())
    }

    /// Checks whether the range may legitimately receive no more data.
    ///
    /// # Errors
    ///
    /// Returns [`ReadError::ShortRead`] carrying the outstanding length when
    /// `end` is `true` and a positive length is still set. Every other
    /// combination succeeds.
    pub fn handle_empty(&self, end: bool) -> ReadResult<()> {
        match (end, self.length) {
            (true, Some(missing)) if missing > 0 => Err(ReadError::ShortRead(missing as u64)),
            _ => Ok(()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::super::tests::proto_range;
    use super::*;

    /// A range created with only an offset has no length and encodes a zero
    /// `read_length`.
    #[test]
    fn without_length() -> anyhow::Result<()> {
        let range = NormalizedRange::new(100)?;
        assert_eq!(
            range,
            NormalizedRange {
                offset: 100,
                length: None,
            }
        );
        assert!(range.length().is_none(), "{range:?}");

        let expected = ProtoRange {
            read_id: 123456,
            read_offset: 100,
            read_length: 0,
        };
        assert_eq!(range.as_proto(123456), expected);
        Ok(())
    }

    /// Negative offsets are rejected.
    #[test]
    fn bad_offset() {
        let result = NormalizedRange::new(-100);
        let rejected = matches!(
            result,
            Err(ReadError::InvalidBidiStreamingReadResponse(_))
        );
        assert!(rejected, "{result:?}");
    }

    /// A range with a length reports it and encodes it in the proto form.
    #[test]
    fn with_length() -> anyhow::Result<()> {
        let range = NormalizedRange::new(100)?.with_length(50)?;
        assert_eq!(
            range,
            NormalizedRange {
                offset: 100,
                length: Some(50),
            }
        );
        assert_eq!(range.length(), Some(50), "{range:?}");

        let expected = ProtoRange {
            read_id: 123456,
            read_offset: 100,
            read_length: 50,
        };
        assert_eq!(range.as_proto(123456), expected);
        Ok(())
    }

    /// Negative lengths are rejected.
    #[test]
    fn bad_length() -> anyhow::Result<()> {
        let base = NormalizedRange::new(100)?;
        let result = base.with_length(-50);
        let rejected = matches!(
            result,
            Err(ReadError::InvalidBidiStreamingReadResponse(_))
        );
        assert!(rejected, "{result:?}");
        Ok(())
    }

    /// Updates with an invalid or mismatched offset fail.
    #[test]
    fn update_errors() -> anyhow::Result<()> {
        let mut range = NormalizedRange::new(100)?.with_length(50)?;

        // A negative offset cannot be converted into a range at all.
        let negative = ProtoRange {
            read_offset: -50,
            ..ProtoRange::default()
        };
        let result = range.update(negative);
        assert!(result.is_err(), "{result:?}");

        // Offsets before and after the current position are both rejected.
        for offset in [50, 200] {
            let result = range.update(proto_range(offset, 0));
            assert!(
                matches!(
                    result,
                    Err(ReadError::InvalidBidiStreamingReadResponse(_))
                ),
                "{result:?}"
            );
        }
        Ok(())
    }

    /// Each update advances the offset and consumes part of the length.
    #[test]
    fn update_with_length() -> anyhow::Result<()> {
        let mut range = NormalizedRange::new(100)?.with_length(200)?;
        let steps = [
            (proto_range(100, 25), (125_i64, Some(175_i64))),
            (proto_range(125, 50), (175_i64, Some(125_i64))),
            (proto_range(175, 0), (175_i64, Some(125_i64))),
        ];
        for (response, want) in steps {
            range.update(response)?;
            assert_eq!((range.offset, range.length()), want);
        }
        Ok(())
    }

    /// Without a length only the offset advances.
    #[test]
    fn update_without_length() -> anyhow::Result<()> {
        let mut range = NormalizedRange::new(100)?;
        let steps = [
            (proto_range(100, 25), 125_i64),
            (proto_range(125, 50), 175_i64),
            (proto_range(175, 0), 175_i64),
        ];
        for (response, want_offset) in steps {
            range.update(response)?;
            assert_eq!((range.offset, range.length()), (want_offset, None));
        }
        Ok(())
    }

    /// A response longer than the remaining length is a `LongRead`.
    #[test]
    fn update_with_bad_length() -> anyhow::Result<()> {
        let mut range = NormalizedRange::new(100)?.with_length(200)?;
        let outcome = range.update(proto_range(100, 300));
        assert!(
            matches!(
                outcome,
                Err(ReadError::LongRead {
                    expected: 200_u64,
                    got: 300_u64,
                })
            ),
            "{outcome:?}"
        );
        Ok(())
    }

    /// `handle_empty` only fails at the end with a positive length outstanding.
    #[test]
    fn handle_empty() -> anyhow::Result<()> {
        let bounded = NormalizedRange::new(100)?.with_length(50)?;
        bounded.handle_empty(false)?;
        let outcome = bounded.handle_empty(true);
        assert!(
            matches!(outcome, Err(ReadError::ShortRead(_))),
            "{outcome:?}"
        );

        let unbounded = NormalizedRange::new(100)?;
        unbounded.handle_empty(false)?;
        unbounded.handle_empty(true)?;
        Ok(())
    }
}