#![forbid(unsafe_code)]
mod builder;
pub mod error;
mod hash;
mod interval;
mod migration;
mod node;
mod replication;
mod sharding;
pub use {
builder::KeyspaceBuilder,
error::*,
hash::DefaultHasher,
interval::{Interval, KeyRange},
migration::MigrationPlan,
node::{KeyspaceNode, NodeRef},
replication::{DefaultReplicationStrategy, ReplicationStrategy},
};
use {
node::Nodes,
sharding::{ShardIdx, Shards},
std::{
hash::{BuildHasher, BuildHasherDefault, Hash},
sync::Arc,
},
};
/// Position of a key within the keyspace, represented as a 64-bit unsigned integer.
///
/// NOTE(review): presumably this is the value obtained by hashing a key
/// (`BuildHasher::hash_one` yields a `u64`) — confirm against the sharding module.
pub type KeyPosition = u64;
/// A set of nodes together with the shard layout derived from them.
///
/// Type parameters:
/// - `N`: node type; must implement [`KeyspaceNode`].
/// - `R`: replication strategy handed to `Shards::new` whenever the shard
///   layout is (re)built; defaults to [`DefaultReplicationStrategy`].
/// - `RF`: const parameter forwarded to `Shards<N, RF>`; defaults to `3`.
///   NOTE(review): presumably the replication factor — confirm.
/// - `H`: hasher builder used to hash keys; defaults to
///   `BuildHasherDefault<DefaultHasher>`.
pub struct Keyspace<
N,
R = DefaultReplicationStrategy,
const RF: usize = 3,
H = BuildHasherDefault<DefaultHasher>,
> where
N: KeyspaceNode,
R: ReplicationStrategy<N>,
H: BuildHasher,
{
/// Nodes currently registered in the keyspace, held behind an `Arc`.
nodes: Arc<Nodes<N>>,
/// Shard layout computed from `nodes` by `Shards::new`.
shards: Shards<N, RF>,
/// Strategy cloned into `Shards::new` each time the shards are rebuilt.
replication_strategy: R,
/// Hashes keys so that `replicas` can locate the shard a key belongs to.
build_hasher: H,
/// Starts at `0`; incremented each time a migration plan is successfully produced.
version: u64,
}
impl<N, R, const RF: usize, H> Keyspace<N, R, RF, H>
where
    N: KeyspaceNode,
    R: ReplicationStrategy<N>,
    H: BuildHasher,
{
    /// Builds a keyspace from an initial set of nodes.
    ///
    /// `build_hasher` is used to hash keys, and `replication_strategy` is
    /// cloned into `Shards::new` to compute the initial shard layout. The
    /// resulting keyspace starts at version `0`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Shards::new` when the shard layout
    /// cannot be built from `init_nodes`.
    fn with_build_hasher<I: IntoIterator<Item = N>>(
        build_hasher: H,
        init_nodes: I,
        replication_strategy: R,
    ) -> KeyspaceResult<Self> {
        let nodes = Nodes::from_iter(init_nodes);
        let shards = Shards::new(&nodes, replication_strategy.clone())?;
        Ok(Self {
            nodes: Arc::new(nodes),
            shards,
            replication_strategy,
            build_hasher,
            version: 0,
        })
    }

    /// Adds `node` to the keyspace and recomputes the shard layout.
    ///
    /// Returns the migration plan describing the move from the previous
    /// layout to the new one.
    ///
    /// # Errors
    ///
    /// Returns an error if the new shard layout or the migration plan cannot
    /// be built; in that case the shard layout and version are left unchanged.
    ///
    /// NOTE(review): the node set is modified before the layout is rebuilt,
    /// so on error the node stays inserted — confirm whether callers need a
    /// rollback.
    pub fn add_node(&mut self, node: N) -> KeyspaceResult<MigrationPlan<N>> {
        self.nodes.insert(node);
        self.migration_plan()
    }

    /// Removes the node identified by `node_id` and recomputes the shard layout.
    ///
    /// Returns the migration plan describing the move from the previous
    /// layout to the new one.
    ///
    /// # Errors
    ///
    /// Returns an error if the new shard layout or the migration plan cannot
    /// be built; in that case the shard layout and version are left unchanged.
    ///
    /// NOTE(review): the node set is modified before the layout is rebuilt,
    /// so on error the node stays removed — confirm whether callers need a
    /// rollback.
    pub fn remove_node(&mut self, node_id: &N::Id) -> KeyspaceResult<MigrationPlan<N>> {
        self.nodes.remove(node_id);
        self.migration_plan()
    }

    /// Returns the nodes in the replica set of the shard that `key` hashes to.
    pub fn replicas<K: Hash>(&self, key: &K) -> impl Iterator<Item = NodeRef<N>> {
        let key_position = self.build_hasher.hash_one(key);
        let shard_idx = ShardIdx::from_position(key_position);
        let replica_set = self.shards.replica_set(shard_idx);
        replica_set.iter().cloned()
    }

    /// Returns the current version of the keyspace.
    pub fn version(&self) -> u64 {
        self.version
    }

    /// Iterates over every `(key range, node)` pair of the keyspace.
    ///
    /// Each shard's key range is yielded once per node in its replica set.
    pub fn iter(&self) -> impl Iterator<Item = (KeyRange, NodeRef<N>)> {
        self.shards.iter().flat_map(|shard| {
            let key_range = shard.key_range();
            // NOTE(review): the intermediate `Vec` is kept from the original;
            // it presumably keeps the yielded items from borrowing `shard`
            // past this closure — confirm before removing.
            shard
                .replica_set()
                .iter()
                .map(|node| (key_range, node.clone()))
                .collect::<Vec<_>>()
        })
    }

    /// Iterates over the key ranges whose replica set contains the node
    /// identified by `node_id`.
    pub fn iter_node(&self, node_id: &N::Id) -> impl Iterator<Item = KeyRange> {
        self.iter()
            .filter_map(move |(key_range, node)| (node.id() == node_id).then_some(key_range))
    }

    /// Rebuilds the shard layout from the current node set and returns the
    /// plan for migrating from the previous layout to the new one.
    ///
    /// The new layout and the bumped version are committed together, and only
    /// after both the layout and the plan were built successfully. On error
    /// `self.shards` and `self.version` keep their previous values, so the
    /// version always identifies the layout it was produced for.
    fn migration_plan(&mut self) -> KeyspaceResult<MigrationPlan<N>> {
        let new_shards = Shards::new(&self.nodes, self.replication_strategy.clone())?;
        let new_version = self.version + 1;
        let plan = MigrationPlan::new(new_version, &self.shards, &new_shards)?;

        // Commit point: nothing below can fail.
        self.shards = new_shards;
        self.version = new_version;
        Ok(plan)
    }
}