use super::{Event, SourceId};
use crate::tagged::{EventKey, StreamId};
use derive_more::{Display, From, Into};
use num_traits::Bounded;
use serde::{
de::{Error, Visitor},
Deserialize, Deserializer, Serialize,
};
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
fmt::{self, Debug},
iter::FromIterator,
ops::{Add, AddAssign, BitAnd, BitAndAssign, BitOr, BitOrAssign, Sub, SubAssign},
};
/// Largest integer exactly representable in an IEEE-754 double (2^53 - 1),
/// i.e. JavaScript's `Number.MAX_SAFE_INTEGER`; offsets are capped here so
/// they survive JSON round-trips through JS consumers without losing precision.
const MAX_SAFE_INT: i64 = 9_007_199_254_740_991;
/// An event offset extended downwards with a "minimum" marker (`-1`) that
/// denotes the absence of any event; valid values are `-1..=MAX_SAFE_INT`.
/// Serialized through the `i64_from_minus_one` helper so that `-1`
/// round-trips as the string `"min"`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord, From, Into, Display)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
pub struct OffsetOrMin(#[serde(with = "i64_from_minus_one")] i64);
/// (De)serialization of the inner `i64` of `OffsetOrMin`: accepts the strings
/// `"min"`/`"max"`, stringified integers, and plain numbers in the range
/// `-1..=MAX_SAFE_INT`; serializes `-1` back as `"min"` and everything else
/// as a plain number.
mod i64_from_minus_one {
use super::*;
use serde::Serializer;
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<i64, D::Error> {
// Enforce the valid range -1..=MAX_SAFE_INT on an already-parsed number.
fn range<E: Error>(r: Result<i64, E>) -> Result<i64, E> {
r.and_then(|i| {
if i < -1 {
Err(E::custom(format!("number {} is below -1", i)))
} else if i > MAX_SAFE_INT {
Err(E::custom(format!("number {} is too large", i)))
} else {
Ok(i)
}
})
}
// Visitor accepting all the encodings listed in the module docs.
struct X;
impl<'de> Visitor<'de> for X {
type Value = i64;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("string 'min'/'max' or integer")
}
fn visit_str<E: Error>(self, v: &str) -> Result<Self::Value, E> {
match v {
"min" => Ok(-1),
"max" => Ok(MAX_SAFE_INT),
// any other string must parse as an in-range integer
v => range(v.parse::<i64>().map_err(Error::custom)),
}
}
fn visit_f64<E: Error>(self, v: f64) -> Result<Self::Value, E> {
// Numbers that do not fit an i64/u64 arrive here as floats; only
// accept values that are exactly representable integers.
let i = v as i64;
#[allow(clippy::float_cmp)]
if i as f64 == v {
range(Ok(i))
} else {
Err(E::custom("not an integer"))
}
}
fn visit_u64<E: Error>(self, v: u64) -> Result<Self::Value, E> {
// reject values that would wrap when cast to i64
if v <= i64::max_value() as u64 {
range(Ok(v as i64))
} else {
Err(E::custom("number too large"))
}
}
fn visit_i64<E: Error>(self, v: i64) -> Result<Self::Value, E> {
range(Ok(v))
}
}
d.deserialize_any(X)
}
pub fn serialize<S: Serializer>(t: &i64, s: S) -> Result<S::Ok, S::Error> {
// By invariant the only negative value is -1, written as "min";
// MAX is deliberately written as a plain number, not "max".
if *t < 0 {
"min".serialize(s)
} else {
t.serialize(s)
}
}
}
impl OffsetOrMin {
    /// Offset zero, the position of the first event in a stream.
    pub const ZERO: OffsetOrMin = OffsetOrMin(0);
    /// The greatest representable offset (fits losslessly into an f64).
    pub const MAX: OffsetOrMin = OffsetOrMin(MAX_SAFE_INT);
    /// The marker below all real offsets, meaning "no event present".
    pub const MIN: OffsetOrMin = OffsetOrMin(-1);
    /// Construct an arbitrary small offset, intended for tests.
    pub fn mk_test(o: u32) -> Self {
        Self(i64::from(o))
    }
    /// The next offset up, as a proper `Offset`.
    ///
    /// # Panics
    /// When called on `OffsetOrMin::MAX`.
    pub fn succ(&self) -> Offset {
        if *self == Self::MAX {
            panic!("cannot increment OffsetOrMin({})", self)
        }
        Offset(self.0 + 1)
    }
    /// The previous offset, or `None` when already at `MIN`.
    pub fn pred(&self) -> Option<Self> {
        if *self > Self::MIN {
            Some(Self(self.0 - 1))
        } else {
            None
        }
    }
}
impl Default for OffsetOrMin {
fn default() -> Self {
Self::MIN
}
}
impl From<Offset> for OffsetOrMin {
fn from(o: Offset) -> Self {
Self(o.0)
}
}
impl PartialEq<Offset> for OffsetOrMin {
fn eq(&self, other: &Offset) -> bool {
OffsetOrMin::from(*other) == *self
}
}
impl PartialOrd<Offset> for OffsetOrMin {
fn partial_cmp(&self, other: &Offset) -> Option<Ordering> {
self.partial_cmp(&OffsetOrMin::from(*other))
}
}
impl Sub for OffsetOrMin {
type Output = i64;
fn sub(self, rhs: Self) -> Self::Output {
self.0 - rhs.0
}
}
impl Add<u32> for OffsetOrMin {
    type Output = Self;
    /// Advance by `rhs` events.
    ///
    /// # Panics
    /// When the result would exceed `OffsetOrMin::MAX`.
    fn add(self, rhs: u32) -> Self {
        let delta = i64::from(rhs);
        if Self::MAX.0 - self.0 < delta {
            panic!("cannot add {} to OffsetOrMin({})", rhs, self)
        }
        Self(self.0 + delta)
    }
}
impl Bounded for OffsetOrMin {
    fn min_value() -> Self {
        Self::MIN
    }
    fn max_value() -> Self {
        Self::MAX
    }
}
/// A zero-based event offset within a single stream, guaranteed to lie in
/// `0..=MAX_SAFE_INT` so it survives JSON round-trips through JavaScript.
/// Deserialization goes through `offset_i64` to enforce that range.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord, Display)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
pub struct Offset(#[serde(deserialize_with = "offset_i64")] i64);
impl From<Offset> for u64 {
    fn from(value: Offset) -> Self {
        // the inner value is non-negative by invariant, so the cast is lossless
        value.0 as u64
    }
}
impl TryFrom<u64> for Offset {
    type Error = &'static str;
    /// Fallible conversion: rejects values above `MAX_SAFE_INT`.
    fn try_from(value: u64) -> Result<Self, Self::Error> {
        if value <= MAX_SAFE_INT as u64 {
            Ok(Offset(value as i64))
        } else {
            Err("number too large")
        }
    }
}
/// Check that `o` lies in the valid `Offset` range `0..=MAX_SAFE_INT`.
fn validate_offset(o: i64) -> Result<Offset, &'static str> {
    match o {
        _ if o < 0 => Err("negative number"),
        _ if o > MAX_SAFE_INT => Err("number too large"),
        _ => Ok(Offset(o)),
    }
}
/// Serde helper: deserialize an `i64` while enforcing the `Offset` range.
fn offset_i64<'de, D: Deserializer<'de>>(d: D) -> Result<i64, D::Error> {
    let raw = i64::deserialize(d)?;
    match validate_offset(raw) {
        Ok(o) => Ok(o.0),
        Err(msg) => Err(D::Error::custom(msg)),
    }
}
impl Offset {
    /// The offset of the first event in a stream.
    pub const ZERO: Offset = Offset(0);
    /// The greatest representable offset (fits losslessly into an f64).
    pub const MAX: Offset = Offset(MAX_SAFE_INT);
    /// Construct an arbitrary small offset, intended for tests.
    pub fn mk_test(o: u32) -> Self {
        Self(i64::from(o))
    }
    /// Convert from `OffsetOrMin`, returning `None` for the `MIN` marker.
    pub fn from_offset_or_min(o: OffsetOrMin) -> Option<Self> {
        if o.0 >= 0 {
            Some(Self(o.0))
        } else {
            None
        }
    }
    /// The next offset.
    ///
    /// # Panics
    /// When called on `Offset::MAX`.
    pub fn succ(&self) -> Self {
        if *self == Self::MAX {
            panic!("cannot increment Offset({})", self)
        }
        Self(self.0 + 1)
    }
    /// The previous offset, or `None` when already at `ZERO`.
    pub fn pred(&self) -> Option<Self> {
        if *self > Self::ZERO {
            Some(Self(self.0 - 1))
        } else {
            None
        }
    }
    /// The previous offset in the `OffsetOrMin` domain;
    /// `Offset::ZERO.pred_or_min()` yields `OffsetOrMin::MIN`.
    pub fn pred_or_min(&self) -> OffsetOrMin {
        OffsetOrMin(self.0 - 1)
    }
}
impl Default for Offset {
fn default() -> Self {
Self::ZERO
}
}
impl PartialEq<OffsetOrMin> for Offset {
fn eq(&self, other: &OffsetOrMin) -> bool {
OffsetOrMin::from(*self) == *other
}
}
impl PartialOrd<OffsetOrMin> for Offset {
fn partial_cmp(&self, other: &OffsetOrMin) -> Option<Ordering> {
OffsetOrMin::from(*self).partial_cmp(other)
}
}
impl Sub for Offset {
type Output = i64;
fn sub(self, rhs: Self) -> Self::Output {
self.0 - rhs.0
}
}
impl Add<u32> for Offset {
    type Output = Self;
    /// Advance by `rhs` events.
    ///
    /// # Panics
    /// When the result would exceed `Offset::MAX`.
    fn add(self, rhs: u32) -> Self {
        let delta = i64::from(rhs);
        if Self::MAX.0 - self.0 < delta {
            panic!("cannot add {} to Offset({})", rhs, self)
        }
        Self(self.0 + delta)
    }
}
impl Bounded for Offset {
    fn min_value() -> Self {
        Self::ZERO
    }
    fn max_value() -> Self {
        Self::MAX
    }
}
#[cfg(feature = "sqlite")]
mod sqlite {
use super::*;
use rusqlite::{
types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
ToSql,
};
impl FromSql for Offset {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
value
.as_i64()
.and_then(|o| validate_offset(o).map_err(|_| FromSqlError::OutOfRange(o)))
}
}
impl ToSql for Offset {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(ToSqlOutput::from(self.0))
}
}
}
#[cfg(feature = "postgresql")]
mod postgresql {
use super::*;
use bytes::BytesMut;
use postgres_types::{FromSql, IsNull, ToSql, Type};
impl<'a> FromSql<'a> for Offset {
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
i64::from_sql(ty, raw).and_then(|o| validate_offset(o).map_err(|e| e.into()))
}
fn accepts(ty: &Type) -> bool {
<i64 as FromSql>::accepts(ty)
}
}
impl ToSql for Offset {
fn accepts(ty: &Type) -> bool
where
Self: Sized,
{
<i64 as ToSql>::accepts(ty)
}
fn to_sql_checked(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
self.0.to_sql_checked(ty, out)
}
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
self.0.to_sql(ty, out)
}
}
}
/// A multi-dimensional cursor: for each stream it records the highest offset
/// considered contained; absent streams contain nothing. Deserialization goes
/// through `BTreeMap<StreamId, OffsetOrMin>` (see the `from` attribute) so
/// that `"min"` entries are accepted and then dropped.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(from = "BTreeMap<StreamId, OffsetOrMin>")]
pub struct OffsetMap(BTreeMap<StreamId, Offset>);
impl OffsetMap {
    /// The empty `OffsetMap`: contains no events from any stream.
    pub fn empty() -> Self {
        Default::default()
    }
    /// True if no stream has any contained events.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
    /// Check whether the given event is contained in this map.
    ///
    /// A stream absent from the map contains no events at all: its effective
    /// offset is `OffsetOrMin::MIN`, which is below every event offset,
    /// including zero.
    pub fn contains<T>(&self, event: &Event<T>) -> bool {
        // Compare in the OffsetOrMin domain. The previous implementation
        // defaulted absent streams to Offset::ZERO, which wrongly reported
        // offset-0 events of absent streams as contained (inconsistent with
        // `offset()` and with `size()` of the empty map being 0).
        self.offset(event.stream.source) >= event.offset
    }
    /// True if this map has an entry for the given source.
    pub fn contains_source(&self, source: &SourceId) -> bool {
        self.0.contains_key(&source.into())
    }
    /// True if this map has an entry for the given stream.
    pub fn contains_stream(&self, stream: &StreamId) -> bool {
        self.0.contains_key(stream)
    }
    /// The highest contained offset for the given stream, or
    /// `OffsetOrMin::MIN` when the stream is absent (nothing contained).
    pub fn offset(&self, stream: impl Into<StreamId>) -> OffsetOrMin {
        self.get(stream).map(|o| o.into()).unwrap_or(OffsetOrMin::MIN)
    }
    /// The highest contained offset for the given stream, if present.
    pub fn get(&self, stream: impl Into<StreamId>) -> Option<Offset> {
        self.0.get(&stream.into()).copied()
    }
    /// Total number of events contained in this map.
    pub fn size(&self) -> u64 {
        self - &OffsetMap::empty()
    }
    /// Merge `other` into `self`, keeping the maximum offset per stream.
    pub fn union_with(&mut self, other: &OffsetMap) {
        for (k, v) in &other.0 {
            self.0.entry(*k).and_modify(|me| *me = (*me).max(*v)).or_insert(*v);
        }
    }
    /// The union of two maps: an event is contained iff it is in either input.
    pub fn union(&self, other: &OffsetMap) -> OffsetMap {
        let mut copy = self.clone();
        copy.union_with(other);
        copy
    }
    /// Restrict `self` to the intersection with `other`: keep the per-stream
    /// minimum and drop streams that end up with no contained event.
    #[allow(clippy::needless_collect)]
    pub fn intersection_with(&mut self, other: &OffsetMap) {
        // collect keys up front because the loop mutates the map
        let keys = self.0.keys().cloned().collect::<Vec<_>>();
        for key in keys.into_iter() {
            let offset = other.offset(key).min(self.offset(key));
            if let Some(offset) = Offset::from_offset_or_min(offset) {
                self.0.insert(key, offset);
            } else {
                // `other` does not contain this stream at all
                self.0.remove(&key);
            }
        }
    }
    /// The intersection of two maps: an event is contained iff it is in both.
    pub fn intersection(&self, other: &OffsetMap) -> OffsetMap {
        let left = self.0.keys().collect::<BTreeSet<_>>();
        let right = other.0.keys().collect::<BTreeSet<_>>();
        let keys = left.intersection(&right);
        Self(
            keys.map(|key| {
                (
                    **key,
                    // both lookups succeed since the key is in both maps
                    self.0
                        .get(key)
                        .copied()
                        .unwrap_or_default()
                        .min(other.0.get(key).copied().unwrap_or_default()),
                )
            })
            .collect(),
        )
    }
    /// Extract the underlying map.
    pub fn into_inner(self) -> BTreeMap<StreamId, Offset> {
        self.0
    }
    /// Iterate over the sources referenced by this map, skipping stream ids
    /// that do not correspond to a source.
    #[allow(clippy::needless_lifetimes)]
    pub fn sources<'a>(&'a self) -> impl Iterator<Item = SourceId> + 'a {
        self.0.keys().filter_map(|stream| stream.to_source_id().ok())
    }
    /// Iterate over the streams referenced by this map.
    #[allow(clippy::needless_lifetimes)]
    pub fn streams<'a>(&'a self) -> impl Iterator<Item = StreamId> + 'a {
        self.0.keys().copied()
    }
    /// Iterate over `(source, offset)` pairs, skipping non-source streams.
    #[allow(clippy::needless_lifetimes)]
    pub fn source_iter<'a>(&'a self) -> impl Iterator<Item = (SourceId, Offset)> + 'a {
        self.0
            .iter()
            .filter_map(|(k, v)| k.to_source_id().ok().map(|k| (k, *v)))
    }
    /// Iterate over `(stream, offset)` pairs.
    #[allow(clippy::needless_lifetimes)]
    pub fn stream_iter<'a>(&'a self) -> impl Iterator<Item = (StreamId, Offset)> + 'a {
        self.0.iter().map(|(k, v)| (*k, *v))
    }
    /// Raise the offset for `stream` to `offset` if that is an increase,
    /// returning the previous effective offset; `None` means nothing changed.
    pub fn update(&mut self, stream: impl Into<StreamId>, offset: Offset) -> Option<OffsetOrMin> {
        let stream = stream.into();
        let previous = self.offset(stream);
        if offset > previous {
            self.0.insert(stream, offset);
            Some(previous)
        } else {
            None
        }
    }
}
/// Partial order on offset maps: `a <= b` iff every event contained in `a` is
/// also contained in `b`. Maps that each contain events the other lacks are
/// incomparable (`partial_cmp` returns `None`).
impl PartialOrd for OffsetMap {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        let lhs = self;
        // which per-stream orderings have been observed so far
        let mut lt = false;
        let mut eq = false;
        let mut gt = false;
        // Record the ordering of one stream's offsets; returns true as soon
        // as both Less and Greater were seen, i.e. the maps are incomparable.
        let mut cross = |a: &OffsetOrMin, b: &OffsetOrMin| -> bool {
            match Ord::cmp(a, b) {
                Ordering::Less => lt = true,
                Ordering::Equal => eq = true,
                Ordering::Greater => gt = true,
            }
            lt && gt
        };
        // compare along all streams of lhs (absent rhs entries count as MIN) …
        for (k, a) in &lhs.0 {
            let b = &rhs.offset(*k);
            if cross(&OffsetOrMin::from(*a), b) {
                return None;
            }
        }
        // … and along all streams of rhs, catching streams absent from lhs
        for (k, b) in &rhs.0 {
            let a = &lhs.offset(*k);
            if cross(a, &OffsetOrMin::from(*b)) {
                return None;
            }
        }
        if lt {
            Some(Ordering::Less)
        } else if gt {
            Some(Ordering::Greater)
        } else {
            Some(Ordering::Equal)
        }
    }
}
impl AsRef<BTreeMap<StreamId, Offset>> for OffsetMap {
fn as_ref(&self) -> &BTreeMap<StreamId, Offset> {
&self.0
}
}
impl Default for OffsetMap {
fn default() -> Self {
OffsetMap(BTreeMap::new())
}
}
impl From<BTreeMap<SourceId, Offset>> for OffsetMap {
fn from(map: BTreeMap<SourceId, Offset>) -> Self {
map.into_iter().collect()
}
}
impl From<BTreeMap<SourceId, OffsetOrMin>> for OffsetMap {
    fn from(map: BTreeMap<SourceId, OffsetOrMin>) -> Self {
        // streams at `MIN` contain nothing and are dropped
        let entries = map
            .into_iter()
            .filter_map(|(s, o)| Offset::from_offset_or_min(o).map(|off| (s, off)));
        entries.collect()
    }
}
impl FromIterator<(SourceId, Offset)> for OffsetMap {
fn from_iter<T: IntoIterator<Item = (SourceId, Offset)>>(iter: T) -> Self {
Self(iter.into_iter().map(|(s, o)| (s.into(), o)).collect())
}
}
impl From<BTreeMap<StreamId, Offset>> for OffsetMap {
fn from(map: BTreeMap<StreamId, Offset>) -> Self {
Self(map)
}
}
impl From<BTreeMap<StreamId, OffsetOrMin>> for OffsetMap {
    /// Entry point for deserialization (see `#[serde(from = …)]` on the
    /// struct); streams at `MIN` contain nothing and are dropped.
    fn from(map: BTreeMap<StreamId, OffsetOrMin>) -> Self {
        let entries = map
            .into_iter()
            .filter_map(|(s, o)| Offset::from_offset_or_min(o).map(|off| (s, off)));
        entries.collect()
    }
}
impl FromIterator<(StreamId, Offset)> for OffsetMap {
fn from_iter<T: IntoIterator<Item = (StreamId, Offset)>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}
impl<T> AddAssign<&Event<T>> for OffsetMap {
    /// Add the event (and thereby all earlier events of its stream) to the map.
    fn add_assign(&mut self, other: &Event<T>) {
        let entry = self.0.entry(other.stream.source.into()).or_default();
        // only ever raise the stored offset
        *entry = (*entry).max(other.offset);
    }
}
impl AddAssign<&EventKey> for OffsetMap {
    /// Add the event (and thereby all earlier events of its stream) to the map.
    fn add_assign(&mut self, other: &EventKey) {
        let entry = self.0.entry(other.stream).or_default();
        // only ever raise the stored offset
        *entry = (*entry).max(other.offset);
    }
}
impl<T> SubAssign<&Event<T>> for OffsetMap {
fn sub_assign(&mut self, other: &Event<T>) {
let off = self.0.entry(other.stream.source.into()).or_default();
if *off >= other.offset {
if let Some(o) = other.offset.pred() {
*off = o;
} else {
self.0.remove(&other.stream.source.into());
}
}
}
}
impl SubAssign<&EventKey> for OffsetMap {
    /// Remove the given event (and all later events of its stream) from the map.
    fn sub_assign(&mut self, other: &EventKey) {
        // Only touch streams that are actually present. The previous
        // implementation used `entry(..).or_default()`, which inserted
        // `Offset::ZERO` for an absent stream and left it behind whenever
        // `other.offset > 0` — thereby *adding* offset 0 of that stream to
        // the map instead of leaving it unchanged.
        if let Some(off) = self.0.get(&other.stream).copied() {
            if off >= other.offset {
                if let Some(o) = other.offset.pred() {
                    *self.0.get_mut(&other.stream).expect("key checked above") = o;
                } else {
                    // removing offset 0 leaves the stream without events
                    self.0.remove(&other.stream);
                }
            }
        }
    }
}
impl Sub<OffsetMap> for OffsetMap {
    type Output = u64;
    /// Count of events contained in `self` but not in `other`.
    fn sub(self, other: Self) -> u64 {
        Sub::sub(&self, &other)
    }
}
impl Sub<&OffsetMap> for &OffsetMap {
type Output = u64;
fn sub(self, other: &OffsetMap) -> u64 {
let mut ret = 0;
for (k, a) in &self.0 {
let a: OffsetOrMin = (*a).into();
let b = other.offset(*k);
if a > b {
ret += (a - b) as u64;
}
}
ret
}
}
impl BitAnd for OffsetMap {
type Output = OffsetMap;
fn bitand(self, rhs: Self) -> Self::Output {
self.intersection(&rhs)
}
}
impl BitAnd for &OffsetMap {
type Output = OffsetMap;
fn bitand(self, rhs: Self) -> Self::Output {
self.intersection(rhs)
}
}
impl BitAndAssign for OffsetMap {
    /// `&=` replaces `self` with the intersection.
    fn bitand_assign(&mut self, rhs: Self) {
        let result = self.intersection(&rhs);
        *self = result;
    }
}
impl BitOr for OffsetMap {
    type Output = OffsetMap;
    /// `|` is set union on the contained events.
    fn bitor(mut self, rhs: Self) -> Self::Output {
        // reuse the left-hand allocation
        self.union_with(&rhs);
        self
    }
}
impl BitOr for &OffsetMap {
    type Output = OffsetMap;
    /// `|` is set union on the contained events.
    fn bitor(self, rhs: Self) -> Self::Output {
        let mut out = self.clone();
        out.union_with(rhs);
        out
    }
}
impl BitOrAssign for OffsetMap {
    /// `|=` merges `rhs` into `self` in place.
    fn bitor_assign(&mut self, rhs: Self) {
        self.union_with(&rhs);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
event::{LamportTimestamp, Payload, StreamInfo, TimeStamp},
fish_name, semantics, source_id,
};
use std::str::FromStr;
// Helper: a dummy event on the given source at the given offset.
fn mk_event(source: &str, offset: u32) -> Event<Payload> {
Event {
lamport: LamportTimestamp::new(1),
stream: StreamInfo {
semantics: semantics!("dummy"),
name: fish_name!("dummy"),
source: SourceId::from_str(source).unwrap(),
},
offset: Offset::mk_test(offset),
timestamp: TimeStamp::now(),
payload: Payload::default(),
}
}
// Size arithmetic (Sub), containment, and the partial order of OffsetMap.
#[test]
#[allow(clippy::eq_op)]
pub fn must_calculate_offset_map() {
let ev1 = &mk_event("a", 1);
let ev2 = &mk_event("b", 2);
let ev3 = &mk_event("c", 1);
let empty = &OffsetMap::default();
let mut map1 = empty.clone();
map1 += ev1;
let mut map2 = map1.clone();
map2 += ev2;
let mut map3 = map1.clone();
map3 += ev3;
// offset N means N+1 events contained on that stream (0..=N)
assert_eq!(&map2 - &map2, 0);
assert_eq!(&map2 - &map1, 3);
assert_eq!(&map2 - empty, 5);
assert!(map2.contains(ev1));
assert!(map1.contains(ev1));
assert!(map2.contains(ev2));
assert!(!map1.contains(ev2));
assert!(map1 > *empty);
assert!(map1 <= map1);
assert!(map1 >= map1);
assert!(map3 > map1);
assert!(map2 > map1);
// map2 and map3 each contain events the other lacks: incomparable
assert!(map2.partial_cmp(&map3).is_none());
// subtraction counts only events in lhs but not rhs, never negative
assert_eq!(map1 - map2, 0);
}
// Union and intersection, both via methods and via the | / & operators.
#[test]
pub fn must_set_op() {
let left = OffsetMap::from(
[
(source_id!("a"), Offset::mk_test(1)),
(source_id!("b"), Offset::mk_test(2)),
(source_id!("c"), Offset::mk_test(3)),
(source_id!("d"), Offset::mk_test(4)),
]
.iter()
.copied()
.collect::<BTreeMap<_, _>>(),
);
let right = OffsetMap::from(
[
(source_id!("b"), Offset::mk_test(4)),
(source_id!("c"), Offset::mk_test(3)),
(source_id!("d"), Offset::mk_test(2)),
(source_id!("e"), Offset::mk_test(1)),
]
.iter()
.copied()
.collect::<BTreeMap<_, _>>(),
);
// union: all streams, per-stream maximum
let union = OffsetMap::from(
[
(source_id!("a"), Offset::mk_test(1)),
(source_id!("b"), Offset::mk_test(4)),
(source_id!("c"), Offset::mk_test(3)),
(source_id!("d"), Offset::mk_test(4)),
(source_id!("e"), Offset::mk_test(1)),
]
.iter()
.copied()
.collect::<BTreeMap<_, _>>(),
);
// intersection: shared streams only, per-stream minimum
let intersection = OffsetMap::from(
[
(source_id!("b"), Offset::mk_test(2)),
(source_id!("c"), Offset::mk_test(3)),
(source_id!("d"), Offset::mk_test(2)),
]
.iter()
.copied()
.collect::<BTreeMap<_, _>>(),
);
assert_eq!(left.union(&right), union);
assert_eq!(left.intersection(&right), intersection);
assert_eq!(&left | &right, union);
assert_eq!(left & right, intersection);
}
// Display renders the plain inner number for both offset types.
#[test]
fn must_to_string() {
assert_eq!(OffsetOrMin(12).to_string(), "12");
assert_eq!(Offset::mk_test(3).to_string(), "3");
}
// Helper: value serializes to exactly this JSON text.
fn ser<T: Serialize>(v: T, expected: &str) {
assert_eq!(serde_json::to_string(&v).unwrap(), expected);
}
// Helper: JSON text deserializes to exactly this value.
fn de<'de, T: Deserialize<'de> + Debug + PartialEq>(from: &'de str, expected: T) {
assert_eq!(serde_json::from_str::<T>(from).unwrap(), expected);
}
// Helper: deserialization fails with a message containing `msg`.
fn err<'de, T: Deserialize<'de> + Debug>(from: &'de str, msg: &str) {
let s = serde_json::from_str::<T>(from).unwrap_err().to_string();
assert!(s.contains(msg), "{} did not contain {}", s, msg);
}
// Offset accepts plain numbers in 0..=MAX_SAFE_INT only.
#[test]
fn must_serde_offset() {
ser(Offset::ZERO, "0");
ser(Offset::mk_test(1), "1");
ser(Offset::MAX, "9007199254740991");
de("0", Offset::ZERO);
de("1", Offset::mk_test(1));
de("9007199254740991", Offset::MAX);
err::<Offset>("-1", "negative");
err::<Offset>("-42", "negative");
err::<Offset>("90071992547409911", "too large");
}
// OffsetOrMin additionally accepts "min"/"max" and stringified numbers;
// only MIN serializes as a string.
#[test]
fn must_serde_offset_or_min() {
ser(OffsetOrMin::MIN, "\"min\"");
ser(OffsetOrMin::ZERO, "0");
ser(OffsetOrMin::mk_test(42), "42");
ser(OffsetOrMin::MAX, "9007199254740991");
de("\"min\"", OffsetOrMin::MIN);
de("-1", OffsetOrMin::MIN);
de("\"-1\"", OffsetOrMin::MIN);
de("0", OffsetOrMin::ZERO);
de("1", OffsetOrMin::mk_test(1));
de("9007199254740991", OffsetOrMin::MAX);
de("\"9007199254740991\"", OffsetOrMin::MAX);
de("\"max\"", OffsetOrMin::MAX);
err::<OffsetOrMin>("-2", "below -1");
// too small for i64 ⇒ parsed as float ⇒ rejected as non-integer
err::<OffsetOrMin>("-20000000000000000000", "not an integer");
err::<OffsetOrMin>("3.4", "not an integer");
err::<OffsetOrMin>("90071992547409911", "too large");
err::<OffsetOrMin>("\"-2\"", "below -1");
err::<OffsetOrMin>("\"-20000000000000000000\"", "number too small");
err::<OffsetOrMin>("\"3.4\"", "invalid digit");
err::<OffsetOrMin>("\"90071992547409911\"", "too large");
}
// OffsetMap round-trips as a JSON object; "min" (-1) entries are dropped
// on input, out-of-range values are rejected.
#[test]
fn must_serde_offset_map() {
let mut map = OffsetMap::empty();
ser(map.clone(), "{}");
map.update(source_id!("a"), Offset::mk_test(12));
ser(map.clone(), "{\"a\":12}");
de("{}", OffsetMap::empty());
de("{\"a\":12}", map.clone());
de("{\"a\":12,\"b\":-1}", map);
err::<OffsetMap>("{\"a\":12,\"b\":-11}", "below -1");
}
}