1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
//! Traits and types related to the distribution of data.
//!
//! These traits and types are in support of a flexible approach to data distribution and organization,
//! in which we might like to more explicitly manage how certain types are handled. Although the term
//! "hashing" is used throughout, it is a misnomer; these traits relate to extracting reasonably distributed
//! integers from the types, and hashing happens to be evocative of this.
//!
//! Differential dataflow operators need to co-locate data that are equivalent so that they may have
//! the differences consolidated, and eventually cancelled. The chose approach is to extract an integer
//! from the keys of the data, ensuring that elements with the same key arrive at the same worker, where
//! the consolidation can occur.
//!
//! The intent is that types should be able to indicate how this integer is determined, so that general
//! data types can use a generic hash function, where as more specialized types such as uniformly
//! distributed integers can perhaps do something simpler (like report their own value).

use std::hash::Hasher;
use std::ops::Deref;

use abomonation::Abomonation;

use timely_sort::Unsigned;

/// Types with a `hashed` method, producing an unsigned output of some type.
///
/// The output type may vary from a `u8` up to a `u64`, allowing types with simple keys
/// to communicate this through their size. Certain algorithms, for example radix sorting,
/// can take advantage of the smaller size.
pub trait Hashable {
    /// The type of the output value.
    type Output: Unsigned+Copy;
    /// A well-distributed integer derived from the data.
    fn hashed(&self) -> Self::Output;
}

impl<T: ::std::hash::Hash> Hashable for T {
    type Output = u64;
    fn hashed(&self) -> u64 {
        let mut h: ::fnv::FnvHasher = Default::default();
        self.hash(&mut h);
        h.finish()
    }
}

/// A marker trait for types whose `Ord` implementation orders first by `hashed()`.
///
/// Types implementing this trait *must* implement `Ord` and satisfy the property that two values
/// with different hashes have the same order as their hashes. This trait allows implementations
/// that sort by hash value to rely on the `Ord` implementation of the type.
pub trait HashOrdered : Ord+Hashable { }
impl<T: Ord+Hashable> HashOrdered for OrdWrapper<T> { }
impl<T: Ord+Hashable> HashOrdered for HashableWrapper<T> { }
impl<T: Unsigned+Copy> HashOrdered for UnsignedWrapper<T> { }

// It would be great to use the macros for these, but I couldn't figure out how to get it
// to work with constraints (i.e. `Hashable`) on the generic parameters.
impl<T: Ord+Hashable+Abomonation> Abomonation for OrdWrapper<T> {
    #[inline] unsafe fn entomb<W: ::std::io::Write>(&self, write: &mut W) -> ::std::io::Result<()> {
        self.item.entomb(write)
    }
    #[inline] unsafe fn exhume<'a,'b>(&'a mut self, mut bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        let temp = bytes;
        bytes = self.item.exhume(temp)?;
        Some(bytes)
    }
}

// It would be great to use the macros for these, but I couldn't figure out how to get it
// to work with constraints (i.e. `Hashable`) on the generic parameters.
impl<T: Hashable+Abomonation> Abomonation for HashableWrapper<T> {

    #[inline] unsafe fn entomb<W: ::std::io::Write>(&self, write: &mut W) -> ::std::io::Result<()> {
        self.item.entomb(write)
    }
    #[inline] unsafe fn exhume<'a,'b>(&'a mut self, mut bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        let temp = bytes;
        bytes = self.item.exhume(temp)?;
        Some(bytes)
    }
}

impl<T: Unsigned+Copy+Hashable+Abomonation> Abomonation for UnsignedWrapper<T> {

    #[inline] unsafe fn entomb<W: ::std::io::Write>(&self, write: &mut W) -> ::std::io::Result<()> {
        self.item.entomb(write)
    }
    #[inline] unsafe fn exhume<'a,'b>(&'a mut self, mut bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        let temp = bytes;
        bytes = self.item.exhume(temp)?;
        Some(bytes)
    }
}


/// A wrapper around hashable types that ensures an implementation of `Ord` that compares
/// hash values first.
#[derive(Clone, Eq, PartialEq, Debug, Default)]
pub struct OrdWrapper<T: Ord+Hashable> {
    /// The item, so you can grab it.
    pub item: T
}

impl<T: Ord+Hashable> PartialOrd for OrdWrapper<T> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
        (self.item.hashed(), &self.item).partial_cmp(&(other.item.hashed(), &other.item))
    }
}
impl<T: Ord+Hashable> Ord for OrdWrapper<T> {
    #[inline]
    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
        (self.item.hashed(), &self.item).cmp(&(other.item.hashed(), &other.item))
    }
}

impl<T: Ord+Hashable> Hashable for OrdWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> T::Output { self.item.hashed() }
}

impl<T: Ord+Hashable> Deref for OrdWrapper<T> {
    type Target = T;
    fn deref(&self) -> &T { &self.item }
}


/// Wrapper to stash hash value with the actual value.
#[derive(Clone, Default, Ord, PartialOrd, Eq, PartialEq, Debug, Copy)]
pub struct HashableWrapper<T: Hashable> {
    hash: T::Output,
    /// The item, for reference.
    pub item: T,
}

impl<T: Hashable> Hashable for HashableWrapper<T> {
    type Output = T::Output;
    #[inline]
    fn hashed(&self) -> T::Output { self.hash }
}

impl<T: Hashable> Deref for HashableWrapper<T> {
    type Target = T;
    #[inline]
    fn deref(&self) -> &T { &self.item }
}

impl<T: Hashable> From<T> for HashableWrapper<T> {
    #[inline]
    fn from(item: T) -> HashableWrapper<T> {
        HashableWrapper {
            hash: item.hashed(),
            item,
        }
    }
}

/// A wrapper around an unsigned integer, providing `hashed` as the value itself.
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Default, Debug, Copy)]
pub struct UnsignedWrapper<T: Unsigned+Copy> {
    /// The item.
    pub item: T,
}

impl<T: Unsigned+Copy> Hashable for UnsignedWrapper<T> {
    type Output = T;
    #[inline]
    fn hashed(&self) -> Self::Output { self.item }
}

impl<T: Unsigned+Copy> Deref for UnsignedWrapper<T> {
    type Target = T;
    #[inline]
    fn deref(&self) -> &T { &self.item }
}

impl<T: Unsigned+Copy> From<T> for UnsignedWrapper<T> {
    #[inline]
    fn from(item: T) -> Self { UnsignedWrapper { item } }
}