use std::num::NonZeroUsize;
use std::sync::Arc;
use super::super::NodeAddress;
use super::super::slot_range_map::SlotRangeMap;
/// A single shard of the cluster: the slot ranges it covers, its primary node
/// and its replica nodes.
///
/// The slot ranges and the replica list live behind `Arc`, so cloning a
/// `Shard` shares those lists instead of copying them.
#[derive(Debug, Clone)]
pub struct Shard {
    /// `(start, end)` slot pairs covered by this shard.
    slot_ranges: Arc<[(u16, u16)]>,
    /// Address of the shard's primary node.
    primary: NodeAddress,
    /// Addresses of the shard's replica nodes; no validation is applied, so
    /// the list may be empty.
    replicas: Arc<[NodeAddress]>,
}

impl Shard {
    /// Builds a shard from its slot ranges, primary address and replica
    /// addresses.
    ///
    /// `slot_ranges` and `replicas` accept anything convertible into the
    /// corresponding `Arc` slice (for example a `Vec`). The inputs are stored
    /// as given; nothing is sorted, merged or validated here.
    pub fn new(
        slot_ranges: impl Into<Arc<[(u16, u16)]>>,
        primary: NodeAddress,
        replicas: impl Into<Arc<[NodeAddress]>>,
    ) -> Self {
        let slot_ranges = slot_ranges.into();
        let replicas = replicas.into();
        Self {
            slot_ranges,
            primary,
            replicas,
        }
    }

    /// Returns the `(start, end)` slot pairs this shard was created with.
    pub fn slot_ranges(&self) -> &[(u16, u16)] {
        &*self.slot_ranges
    }

    /// Returns the address of the primary node.
    pub fn primary(&self) -> &NodeAddress {
        &self.primary
    }

    /// Returns the replica addresses (possibly an empty slice).
    pub fn replicas(&self) -> &[NodeAddress] {
        &*self.replicas
    }
}
/// A view of the cluster that maps slots to the shard registered for them.
///
/// Each shard is wrapped in one `Arc` and that same `Arc` is stored for every
/// slot range the shard declares.
#[derive(Debug, Clone)]
pub struct ClusterTopology {
    /// Slot-range lookup structure whose values are shared shard handles.
    slots: SlotRangeMap<Arc<Shard>>,
}

impl ClusterTopology {
    /// Builds a topology from a list of shards.
    ///
    /// Every `(start, end)` pair of every shard is inserted into the slot map,
    /// in the order given. A shard that declares no slot ranges is therefore
    /// never inserted and will not be reachable through this topology.
    /// NOTE(review): how overlapping ranges are resolved depends on
    /// `SlotRangeMap::insert`, which is not visible here.
    pub fn from_shards(shards: Vec<Shard>) -> Self {
        let mut slots: SlotRangeMap<Arc<Shard>> = SlotRangeMap::new();
        for owner in shards.into_iter().map(Arc::new) {
            owner.slot_ranges().iter().for_each(|&(start, end)| {
                slots.insert(start, end, Arc::clone(&owner));
            });
        }
        Self { slots }
    }

    /// Looks up the shard registered for `slot`, or `None` when the slot map
    /// has no entry for it.
    pub fn shard_for_slot(&self, slot: u16) -> Option<&Shard> {
        let owner = self.slots.get(slot)?;
        Some(owner.as_ref())
    }

    /// Iterates over the distinct shards held by this topology.
    ///
    /// A shard with several slot ranges is stored once per range, so the
    /// values are de-duplicated by `Arc` pointer identity; each shard is
    /// yielded the first time it is encountered in `SlotRangeMap::values()`
    /// order.
    pub fn shards(&self) -> impl Iterator<Item = &Shard> {
        let mut visited = std::collections::HashSet::new();
        self.slots.values().filter_map(move |owner| {
            // `insert` returns `true` only for a pointer not seen before.
            visited
                .insert(Arc::as_ptr(owner))
                .then(|| owner.as_ref())
        })
    }
}
/// A borrowed, non-empty list of replica addresses.
///
/// [`Replicas::new`] refuses an empty slice, which is what lets the accessors
/// below return a value unconditionally.
#[derive(Debug)]
pub struct Replicas<'a> {
    /// The underlying slice; non-empty when built through [`Replicas::new`].
    inner: &'a [NodeAddress],
}

impl<'a> Replicas<'a> {
    /// Wraps `slice`, returning `None` when it contains no addresses.
    pub fn new(slice: &'a [NodeAddress]) -> Option<Self> {
        match slice {
            [] => None,
            _ => Some(Self { inner: slice }),
        }
    }

    /// Number of replicas, which is never zero.
    pub fn len(&self) -> NonZeroUsize {
        let count = self.inner.len();
        NonZeroUsize::new(count).expect("Replicas is non-empty")
    }

    /// The first replica in the list.
    pub fn first(&self) -> &'a NodeAddress {
        // Index 0 exists because the slice is non-empty.
        &self.inner[0]
    }

    /// The replica at position `idx`, or `None` when `idx` is out of range.
    pub fn get(&self, idx: usize) -> Option<&'a NodeAddress> {
        self.inner.get(idx)
    }

    /// Picks one replica using the generator returned by `rand::rng()`.
    pub fn choose_random(&self) -> &'a NodeAddress {
        use rand::seq::IndexedRandom;

        let mut rng = rand::rng();
        self.inner.choose(&mut rng).expect("non-empty")
    }

    /// Iterates over all replicas in slice order.
    pub fn iter(&self) -> impl Iterator<Item = &'a NodeAddress> {
        self.inner.iter()
    }
}
/// Candidates for a read on `slot` where both the primary and the replicas
/// are offered.
#[derive(Debug)]
pub struct AnyNodeCandidates<'a> {
    /// Slot the read targets.
    slot: u16,
    /// Primary node offered as a candidate.
    primary: &'a NodeAddress,
    /// Replica nodes offered as candidates.
    replicas: Replicas<'a>,
}

impl<'a> AnyNodeCandidates<'a> {
    /// Slot the read targets.
    pub fn slot(&self) -> u16 {
        self.slot
    }

    /// Address of the primary candidate.
    pub fn primary(&self) -> &'a NodeAddress {
        self.primary
    }

    /// The replica candidates.
    pub fn replicas(&self) -> &Replicas<'a> {
        &self.replicas
    }
}
/// Candidates for a read on `slot` where only replicas are offered; no
/// primary address is carried.
#[derive(Debug)]
pub struct ReplicasOnlyCandidates<'a> {
    /// Slot the read targets.
    slot: u16,
    /// Replica nodes offered as candidates.
    replicas: Replicas<'a>,
}

impl<'a> ReplicasOnlyCandidates<'a> {
    /// Slot the read targets.
    pub fn slot(&self) -> u16 {
        self.slot
    }

    /// The replica candidates.
    pub fn replicas(&self) -> &Replicas<'a> {
        &self.replicas
    }
}
/// The set of nodes a read may be routed to, as handed to
/// `ReadRoutingStrategy::route_read`.
#[derive(Debug)]
pub enum ReadCandidates<'a> {
    /// The primary and the replicas are all offered.
    AnyNode(AnyNodeCandidates<'a>),
    /// Only replicas are offered.
    ReplicasOnly(ReplicasOnlyCandidates<'a>),
}

impl<'a> ReadCandidates<'a> {
    /// Slot the read targets, whichever variant is held.
    pub fn slot(&self) -> u16 {
        match self {
            Self::AnyNode(any) => any.slot(),
            Self::ReplicasOnly(only) => only.slot(),
        }
    }

    /// Crate-internal constructor for the [`ReadCandidates::AnyNode`] variant.
    pub(crate) fn any_node(slot: u16, primary: &'a NodeAddress, replicas: Replicas<'a>) -> Self {
        let candidates = AnyNodeCandidates {
            slot,
            primary,
            replicas,
        };
        Self::AnyNode(candidates)
    }

    /// Crate-internal constructor for the [`ReadCandidates::ReplicasOnly`]
    /// variant.
    pub(crate) fn replicas_only(slot: u16, replicas: Replicas<'a>) -> Self {
        let candidates = ReplicasOnlyCandidates { slot, replicas };
        Self::ReplicasOnly(candidates)
    }
}
/// Pluggable policy that decides which node a read is sent to.
///
/// Implementations must be `Send + Sync`; both methods take `&self`, so any
/// state an implementation mutates needs interior mutability.
pub trait ReadRoutingStrategy: Send + Sync {
/// Receives a `ClusterTopology` by value. The default implementation
/// ignores it and does nothing.
/// NOTE(review): presumably invoked whenever the client learns a new
/// topology — the call site is not visible here; confirm.
fn on_topology_changed(&self, _topology: ClusterTopology) {}
/// Selects the node that should serve a read, given the `candidates`.
///
/// The returned reference carries the candidates' lifetime `'a`.
/// NOTE(review): presumably it should be one of the addresses offered by
/// `candidates` — confirm against the caller.
fn route_read<'a>(&self, candidates: &ReadCandidates<'a>) -> &'a NodeAddress;
}
/// Produces boxed `ReadRoutingStrategy` instances.
///
/// Implementations must be `Send + Sync`.
pub trait ReadRoutingStrategyFactory: Send + Sync {
/// Creates a strategy and returns it as a boxed trait object.
fn create_strategy(&self) -> Box<dyn ReadRoutingStrategy>;
}
/// Blanket implementation: every strategy type that is `Default` and
/// `'static` acts as its own factory.
///
/// `create_strategy` does not read `self`; each call boxes a fresh
/// `T::default()`.
impl<T> ReadRoutingStrategyFactory for T
where
    T: ReadRoutingStrategy + Default + 'static,
{
    fn create_strategy(&self) -> Box<dyn ReadRoutingStrategy> {
        let strategy = T::default();
        Box::new(strategy)
    }
}