use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use gen_fsm::{Action, EventType, FsmDriver, FsmHandler, Transition};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use dynvec::SearchResult;
use crate::cluster::apl::{walk_n_successors, ClusterState};
use crate::embed::events::PeerId;
pub const DEFAULT_PER_PEER_DEADLINE_MS: u64 = 5_000;
#[derive(Clone, Debug, PartialEq)]
pub struct PeerHits {
pub peer: String,
pub hits: Vec<SearchResult>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
pub table: String,
pub vector: Vec<f32>,
pub k: usize,
pub ef: Option<usize>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct SearchResponse {
pub hits: Vec<SearchResult>,
pub peers_consulted: usize,
}
pub type PeerProbe =
Arc<dyn Fn(&str, SearchRequest) -> Result<Vec<SearchResult>, String> + Send + Sync + 'static>;
#[derive(Debug)]
pub enum Event {
Fanout,
Gather,
PeerHits(PeerHits),
GatherComplete,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
Init,
Fanout,
Gather,
Merge,
}
pub struct Coordinator {
request: SearchRequest,
peers: Vec<String>,
probe: PeerProbe,
hits: HashMap<String, Vec<SearchResult>>,
response: Arc<Mutex<Option<SearchResponse>>>,
deadline: Duration,
}
impl Coordinator {
#[must_use]
pub fn new(
request: SearchRequest,
peers: Vec<String>,
probe: PeerProbe,
deadline: Duration,
) -> (Self, Arc<Mutex<Option<SearchResponse>>>) {
let response = Arc::new(Mutex::new(None));
let coord = Self {
request,
peers,
probe,
hits: HashMap::new(),
response: Arc::clone(&response),
deadline,
};
(coord, response)
}
}
impl FsmHandler for Coordinator {
type State = State;
type Event = Event;
type Reply = ();
type Stop = String;
fn initial(&self) -> Self::State {
State::Init
}
fn handle(
&mut self,
state: Self::State,
_event_type: EventType,
event: Self::Event,
) -> Transition<Self> {
match (state, event) {
(State::Init, Event::Fanout) => {
Transition::Next(State::Fanout, vec![Action::post_internal(Event::Gather)])
}
(State::Fanout, Event::Gather) => {
let mut completion: Vec<Action<Self>> = Vec::new();
for peer in self.peers.clone() {
let res = (self.probe)(&peer, self.request.clone());
match res {
Ok(hits) => {
completion.push(Action::post_internal(Event::PeerHits(PeerHits {
peer,
hits,
})));
}
Err(err) => {
tracing::warn!(peer=%peer, error=%err, "peer probe failed");
completion.push(Action::post_internal(Event::PeerHits(PeerHits {
peer,
hits: Vec::new(),
})));
}
}
}
completion.push(Action::set_state_timeout(self.deadline));
if completion.is_empty() {
Transition::Next(
State::Merge,
vec![Action::post_internal(Event::GatherComplete)],
)
} else {
Transition::Next(State::Gather, completion)
}
}
(State::Gather, Event::PeerHits(reply)) => {
self.hits.insert(reply.peer, reply.hits);
if self.hits.len() >= self.peers.len() {
Transition::Next(
State::Merge,
vec![Action::post_internal(Event::GatherComplete)],
)
} else {
Transition::Keep(vec![])
}
}
(State::Merge, Event::GatherComplete) => {
let merged = merge_hits(&self.hits, self.request.k);
let response = SearchResponse {
hits: merged,
peers_consulted: self.hits.values().filter(|h| !h.is_empty()).count(),
};
*self.response.lock() = Some(response);
Transition::Stop("complete".to_string())
}
(_, _) => Transition::Keep(vec![]),
}
}
fn on_timeout(&mut self, state: Self::State, _kind: gen_fsm::TimeoutKind) -> Transition<Self> {
match state {
State::Gather => Transition::Next(
State::Merge,
vec![Action::post_internal(Event::GatherComplete)],
),
_ => Transition::Keep(vec![]),
}
}
}
#[must_use]
pub fn merge_hits<S: std::hash::BuildHasher>(
per_peer: &HashMap<String, Vec<SearchResult>, S>,
k: usize,
) -> Vec<SearchResult> {
let mut all: Vec<SearchResult> = per_peer.values().flatten().cloned().collect();
all.sort_by(|a, b| {
a.score
.partial_cmp(&b.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
let mut seen: HashMap<u64, f32> = HashMap::new();
let mut deduped: Vec<SearchResult> = Vec::with_capacity(all.len());
for r in all {
let entry = seen.entry(r.id).or_insert(r.score);
if r.score <= *entry {
*entry = r.score;
deduped.push(r);
}
}
deduped.sort_by(|a, b| {
a.score
.partial_cmp(&b.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
let mut final_seen: std::collections::HashSet<u64> = std::collections::HashSet::new();
let mut out: Vec<SearchResult> = Vec::with_capacity(k);
for r in deduped {
if final_seen.insert(r.id) {
out.push(r);
if out.len() >= k {
break;
}
}
}
out
}
pub async fn run(
request: SearchRequest,
peers: Vec<String>,
probe: PeerProbe,
deadline: Duration,
) -> Result<SearchResponse, gen_fsm::DriverError> {
let (coord, response) = Coordinator::new(request, peers, probe, deadline);
let driver = gen_fsm::FsmDriver::start(coord);
driver.cast_checked(Event::Fanout).await?;
let _stop = driver.join().await?;
let final_resp = response.lock().clone().unwrap_or(SearchResponse {
hits: Vec::new(),
peers_consulted: 0,
});
Ok(final_resp)
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SerializedQuery {
Knn {
vector_field: String,
vector_bytes: Vec<u8>,
ef: Option<u32>,
},
Text {
field: String,
query: Vec<u8>,
},
Regex {
field: String,
pattern: String,
max_errors: u16,
},
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct HitWithScore {
pub doc_id: Vec<u8>,
pub score: f32,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PeerReply {
pub hits: Vec<HitWithScore>,
pub timed_out: bool,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastRequest {
pub table: String,
pub query: SerializedQuery,
pub top_k: u32,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BroadcastResponse {
pub hits: Vec<HitWithScore>,
pub peers_consulted: usize,
pub peers_timed_out: usize,
pub partial: bool,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MergeOrder {
ScoreAscending,
DocIdAscending,
}
pub type AsyncPeerProbe = Arc<
dyn Fn(
PeerId,
BroadcastRequest,
)
-> Pin<Box<dyn Future<Output = Result<Vec<HitWithScore>, String>> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>;
#[must_use]
pub fn select_primary_peers(cluster: &ClusterState) -> Vec<PeerId> {
let len = cluster.ring().len();
if len == 0 {
return Vec::new();
}
walk_n_successors(cluster, 0, len)
.into_iter()
.filter(|(_, pid)| cluster.is_alive(*pid))
.map(|(_, pid)| pid)
.collect()
}
#[must_use]
pub const fn default_per_peer_deadline() -> Duration {
Duration::from_millis(DEFAULT_PER_PEER_DEADLINE_MS)
}
#[must_use]
pub fn merge_hits_ranked(
per_peer: &[PeerReply],
top_k: u32,
order: MergeOrder,
) -> Vec<HitWithScore> {
let cap = usize::try_from(top_k).unwrap_or(usize::MAX);
if cap == 0 {
return Vec::new();
}
let mut all: Vec<HitWithScore> = per_peer
.iter()
.flat_map(|reply| reply.hits.iter().cloned())
.collect();
sort_hits(&mut all, order);
let mut seen: HashSet<Vec<u8>> = HashSet::with_capacity(all.len().min(cap));
let mut out: Vec<HitWithScore> = Vec::with_capacity(cap);
for hit in all {
if seen.insert(hit.doc_id.clone()) {
out.push(hit);
if out.len() >= cap {
break;
}
}
}
out
}
fn sort_hits(hits: &mut [HitWithScore], order: MergeOrder) {
match order {
MergeOrder::ScoreAscending => {
hits.sort_by(|a, b| {
a.score
.partial_cmp(&b.score)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| a.doc_id.cmp(&b.doc_id))
});
}
MergeOrder::DocIdAscending => {
hits.sort_by(|a, b| a.doc_id.cmp(&b.doc_id));
}
}
}
#[derive(Debug)]
pub enum BroadcastEvent {
PeerReplied(PeerReply),
AllReceived,
MergeDone,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BroadcastState {
Init,
Gathering,
Merging,
}
pub struct BroadcastCoordinator {
request: BroadcastRequest,
expected_peers: usize,
replies: Vec<PeerReply>,
order: MergeOrder,
response: Arc<Mutex<Option<BroadcastResponse>>>,
overall_deadline: Duration,
}
impl BroadcastCoordinator {
#[must_use]
pub fn new(
request: BroadcastRequest,
expected_peers: usize,
order: MergeOrder,
overall_deadline: Duration,
) -> (Self, Arc<Mutex<Option<BroadcastResponse>>>) {
let response = Arc::new(Mutex::new(None));
let coord = Self {
request,
expected_peers,
replies: Vec::with_capacity(expected_peers),
order,
response: Arc::clone(&response),
overall_deadline,
};
(coord, response)
}
fn finalise(&self) -> BroadcastResponse {
let timed_out = self.replies.iter().filter(|r| r.timed_out).count();
let consulted = self.replies.len();
let merged = merge_hits_ranked(&self.replies, self.request.top_k, self.order);
BroadcastResponse {
hits: merged,
peers_consulted: consulted,
peers_timed_out: timed_out,
partial: timed_out > 0 || consulted < self.expected_peers,
}
}
}
impl FsmHandler for BroadcastCoordinator {
type State = BroadcastState;
type Event = BroadcastEvent;
type Reply = ();
type Stop = String;
fn initial(&self) -> Self::State {
BroadcastState::Init
}
fn handle(
&mut self,
state: Self::State,
_event_type: EventType,
event: Self::Event,
) -> Transition<Self> {
match (state, event) {
(BroadcastState::Init | BroadcastState::Gathering, BroadcastEvent::PeerReplied(r)) => {
self.replies.push(r);
if self.replies.len() >= self.expected_peers {
Transition::Next(
BroadcastState::Merging,
vec![
Action::cancel_state_timeout(),
Action::post_internal(BroadcastEvent::AllReceived),
],
)
} else if state == BroadcastState::Init {
Transition::Next(
BroadcastState::Gathering,
vec![Action::set_state_timeout(self.overall_deadline)],
)
} else {
Transition::Keep(vec![])
}
}
(BroadcastState::Merging, BroadcastEvent::AllReceived | BroadcastEvent::MergeDone) => {
let resp = self.finalise();
*self.response.lock() = Some(resp);
Transition::Stop("broadcast complete".to_string())
}
_ => Transition::Keep(vec![]),
}
}
fn on_timeout(&mut self, state: Self::State, _kind: gen_fsm::TimeoutKind) -> Transition<Self> {
if matches!(state, BroadcastState::Gathering | BroadcastState::Init) {
while self.replies.len() < self.expected_peers {
self.replies.push(PeerReply {
hits: Vec::new(),
timed_out: true,
});
}
Transition::Next(
BroadcastState::Merging,
vec![Action::post_internal(BroadcastEvent::AllReceived)],
)
} else {
Transition::Keep(vec![])
}
}
}
pub async fn broadcast(
request: BroadcastRequest,
peers: Vec<PeerId>,
probe: AsyncPeerProbe,
per_peer_deadline: Duration,
order: MergeOrder,
) -> Result<BroadcastResponse, gen_fsm::DriverError> {
if peers.is_empty() {
return Ok(BroadcastResponse {
hits: Vec::new(),
peers_consulted: 0,
peers_timed_out: 0,
partial: true,
});
}
let overall = per_peer_deadline
.saturating_mul(2)
.saturating_add(Duration::from_secs(1));
let n = peers.len();
let (handler, response) = BroadcastCoordinator::new(request.clone(), n, order, overall);
let driver: FsmDriver<BroadcastCoordinator> = FsmDriver::start(handler);
let (reply_tx, mut reply_rx) = mpsc::channel::<PeerReply>(n);
for peer in peers {
let probe = Arc::clone(&probe);
let req = request.clone();
let tx = reply_tx.clone();
tokio::spawn(async move {
let fut = probe(peer, req);
let reply = match tokio::time::timeout(per_peer_deadline, fut).await {
Ok(Ok(hits)) => PeerReply {
hits,
timed_out: false,
},
Ok(Err(err)) => {
tracing::warn!(peer=peer, error=%err, "FT.SEARCH peer probe failed");
PeerReply {
hits: Vec::new(),
timed_out: false,
}
}
Err(_) => {
tracing::warn!(
peer = peer,
"FT.SEARCH peer probe timed out (per-peer deadline elapsed)"
);
PeerReply {
hits: Vec::new(),
timed_out: true,
}
}
};
let _ = tx.send(reply).await;
});
}
drop(reply_tx);
let driver_for_pump = driver.clone();
let pump = tokio::spawn(async move {
while let Some(reply) = reply_rx.recv().await {
if driver_for_pump
.cast_checked(BroadcastEvent::PeerReplied(reply))
.await
.is_err()
{
break;
}
}
});
let _ = driver.join().await?;
let _ = pump.await;
let final_resp = response
.lock()
.clone()
.unwrap_or_else(|| BroadcastResponse {
hits: Vec::new(),
peers_consulted: 0,
peers_timed_out: n,
partial: true,
});
Ok(final_resp)
}
#[cfg(test)]
mod tests {
use super::*;
use dynvec::SearchResult;
fn req() -> SearchRequest {
SearchRequest {
table: "t".to_string(),
vector: vec![0.0; 4],
k: 3,
ef: None,
}
}
#[tokio::test]
async fn merges_hits_from_multiple_peers() {
let hits_p1 = vec![
SearchResult { id: 1, score: 0.1 },
SearchResult { id: 2, score: 0.5 },
];
let hits_p2 = vec![
SearchResult { id: 3, score: 0.2 },
SearchResult { id: 4, score: 0.6 },
];
let probe: PeerProbe = Arc::new(move |peer, _r| match peer {
"p1" => Ok(hits_p1.clone()),
"p2" => Ok(hits_p2.clone()),
_ => Err("unknown peer".to_string()),
});
let resp = run(
req(),
vec!["p1".to_string(), "p2".to_string()],
probe,
Duration::from_secs(1),
)
.await
.unwrap();
assert_eq!(resp.peers_consulted, 2);
assert_eq!(resp.hits.len(), 3);
assert_eq!(resp.hits[0].id, 1);
assert_eq!(resp.hits[1].id, 3);
assert_eq!(resp.hits[2].id, 2);
}
#[tokio::test]
async fn missing_peers_are_tolerated() {
let probe: PeerProbe = Arc::new(|peer, _r| match peer {
"good" => Ok(vec![SearchResult { id: 1, score: 0.1 }]),
_ => Err("dead".to_string()),
});
let resp = run(
req(),
vec!["good".to_string(), "bad".to_string()],
probe,
Duration::from_secs(1),
)
.await
.unwrap();
assert_eq!(resp.peers_consulted, 1);
assert_eq!(resp.hits.len(), 1);
assert_eq!(resp.hits[0].id, 1);
}
#[tokio::test]
async fn duplicate_ids_collapsed() {
let probe: PeerProbe = Arc::new(|peer, _r| match peer {
"p1" => Ok(vec![SearchResult { id: 1, score: 0.10 }]),
"p2" => Ok(vec![SearchResult { id: 1, score: 0.05 }]),
_ => Err("unknown".to_string()),
});
let resp = run(
SearchRequest {
table: "t".to_string(),
vector: vec![],
k: 2,
ef: None,
},
vec!["p1".to_string(), "p2".to_string()],
probe,
Duration::from_secs(1),
)
.await
.unwrap();
assert_eq!(resp.hits.len(), 1);
assert!((resp.hits[0].score - 0.05).abs() < 1e-6);
}
use std::collections::HashSet;
use crate::cluster::apl::{ClusterState, RingPoint};
fn knn_request(top_k: u32) -> BroadcastRequest {
BroadcastRequest {
table: "idx".into(),
query: SerializedQuery::Knn {
vector_field: "v".into(),
vector_bytes: vec![0u8; 16],
ef: None,
},
top_k,
}
}
fn fixed_probe(per_peer: HashMap<PeerId, Vec<HitWithScore>>) -> AsyncPeerProbe {
Arc::new(move |peer, _req| {
let hits = per_peer.get(&peer).cloned().unwrap_or_default();
Box::pin(async move { Ok(hits) })
})
}
#[tokio::test]
async fn merge_score_ascending_picks_smallest_scores() {
let p0 = PeerReply {
hits: vec![
HitWithScore {
doc_id: b"a".to_vec(),
score: 0.1,
},
HitWithScore {
doc_id: b"b".to_vec(),
score: 0.5,
},
],
timed_out: false,
};
let p1 = PeerReply {
hits: vec![
HitWithScore {
doc_id: b"c".to_vec(),
score: 0.05,
},
HitWithScore {
doc_id: b"d".to_vec(),
score: 0.6,
},
],
timed_out: false,
};
let merged = merge_hits_ranked(&[p0, p1], 3, MergeOrder::ScoreAscending);
assert_eq!(merged.len(), 3);
assert_eq!(merged[0].doc_id, b"c");
assert_eq!(merged[1].doc_id, b"a");
assert_eq!(merged[2].doc_id, b"b");
}
#[tokio::test]
async fn merge_doc_id_ascending_orders_by_key() {
let p0 = PeerReply {
hits: vec![
HitWithScore {
doc_id: b"key:9".to_vec(),
score: 0.0,
},
HitWithScore {
doc_id: b"key:1".to_vec(),
score: 0.0,
},
],
timed_out: false,
};
let p1 = PeerReply {
hits: vec![HitWithScore {
doc_id: b"key:5".to_vec(),
score: 0.0,
}],
timed_out: false,
};
let merged = merge_hits_ranked(&[p0, p1], 5, MergeOrder::DocIdAscending);
assert_eq!(
merged.iter().map(|h| h.doc_id.clone()).collect::<Vec<_>>(),
vec![b"key:1".to_vec(), b"key:5".to_vec(), b"key:9".to_vec()],
);
}
#[tokio::test]
async fn merge_dedups_doc_ids_in_score_order() {
let p0 = PeerReply {
hits: vec![HitWithScore {
doc_id: b"a".to_vec(),
score: 0.10,
}],
timed_out: false,
};
let p1 = PeerReply {
hits: vec![HitWithScore {
doc_id: b"a".to_vec(),
score: 0.05,
}],
timed_out: false,
};
let merged = merge_hits_ranked(&[p0, p1], 5, MergeOrder::ScoreAscending);
assert_eq!(merged.len(), 1);
assert!((merged[0].score - 0.05).abs() < 1e-6);
}
#[tokio::test]
async fn merge_top_k_zero_returns_empty() {
let p = PeerReply {
hits: vec![HitWithScore {
doc_id: b"a".to_vec(),
score: 0.1,
}],
timed_out: false,
};
assert!(merge_hits_ranked(&[p], 0, MergeOrder::ScoreAscending).is_empty());
}
#[tokio::test]
async fn broadcast_with_no_peers_returns_partial_empty() {
let probe: AsyncPeerProbe = Arc::new(|_peer, _req| Box::pin(async { Ok(Vec::new()) }));
let resp = broadcast(
knn_request(5),
Vec::new(),
probe,
Duration::from_millis(50),
MergeOrder::ScoreAscending,
)
.await
.unwrap();
assert!(resp.hits.is_empty());
assert_eq!(resp.peers_consulted, 0);
assert!(resp.partial);
}
#[tokio::test]
async fn broadcast_one_peer_returns_local_top_k() {
let mut per_peer: HashMap<PeerId, Vec<HitWithScore>> = HashMap::new();
per_peer.insert(
7,
vec![
HitWithScore {
doc_id: b"a".to_vec(),
score: 0.10,
},
HitWithScore {
doc_id: b"b".to_vec(),
score: 0.30,
},
],
);
let resp = broadcast(
knn_request(2),
vec![7],
fixed_probe(per_peer),
Duration::from_millis(200),
MergeOrder::ScoreAscending,
)
.await
.unwrap();
assert_eq!(resp.peers_consulted, 1);
assert_eq!(resp.peers_timed_out, 0);
assert!(!resp.partial);
assert_eq!(resp.hits.len(), 2);
assert_eq!(resp.hits[0].doc_id, b"a");
}
#[tokio::test]
async fn broadcast_two_peers_merges() {
let mut per_peer: HashMap<PeerId, Vec<HitWithScore>> = HashMap::new();
per_peer.insert(
1,
vec![
HitWithScore {
doc_id: b"a".to_vec(),
score: 0.10,
},
HitWithScore {
doc_id: b"b".to_vec(),
score: 0.40,
},
],
);
per_peer.insert(
2,
vec![
HitWithScore {
doc_id: b"c".to_vec(),
score: 0.05,
},
HitWithScore {
doc_id: b"d".to_vec(),
score: 0.50,
},
],
);
let resp = broadcast(
knn_request(3),
vec![1, 2],
fixed_probe(per_peer),
Duration::from_millis(200),
MergeOrder::ScoreAscending,
)
.await
.unwrap();
assert_eq!(resp.peers_consulted, 2);
assert_eq!(resp.hits.len(), 3);
assert_eq!(resp.hits[0].doc_id, b"c");
assert_eq!(resp.hits[1].doc_id, b"a");
assert_eq!(resp.hits[2].doc_id, b"b");
}
#[tokio::test]
async fn broadcast_one_peer_timeout_marks_partial() {
let probe: AsyncPeerProbe = Arc::new(move |peer, _req| {
Box::pin(async move {
if peer == 9 {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok(Vec::new())
} else {
Ok(vec![HitWithScore {
doc_id: b"x".to_vec(),
score: 0.10,
}])
}
})
});
let resp = broadcast(
knn_request(3),
vec![1, 9],
probe,
Duration::from_millis(50),
MergeOrder::ScoreAscending,
)
.await
.unwrap();
assert_eq!(resp.peers_consulted, 2);
assert_eq!(resp.peers_timed_out, 1);
assert!(resp.partial);
assert_eq!(resp.hits.len(), 1);
assert_eq!(resp.hits[0].doc_id, b"x");
}
#[tokio::test]
async fn broadcast_all_peers_timeout_returns_empty_partial() {
let probe: AsyncPeerProbe = Arc::new(|_peer, _req| {
Box::pin(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok(Vec::new())
})
});
let resp = broadcast(
knn_request(3),
vec![1, 2, 3],
probe,
Duration::from_millis(40),
MergeOrder::ScoreAscending,
)
.await
.unwrap();
assert_eq!(resp.peers_consulted, 3);
assert_eq!(resp.peers_timed_out, 3);
assert!(resp.partial);
assert!(resp.hits.is_empty());
}
#[tokio::test]
async fn select_primary_peers_returns_one_per_distinct_alive_peer() {
let cs = ClusterState::new(
vec![
RingPoint::new(100, 0),
RingPoint::new(200, 1),
RingPoint::new(300, 2),
],
[0u32, 1, 2].into_iter().collect::<HashSet<_>>(),
);
let mut peers = select_primary_peers(&cs);
peers.sort_unstable();
assert_eq!(peers, vec![0, 1, 2]);
}
#[tokio::test]
async fn select_primary_peers_filters_dead_peers() {
let cs = ClusterState::new(
vec![
RingPoint::new(100, 0),
RingPoint::new(200, 1),
RingPoint::new(300, 2),
],
[0u32, 2].into_iter().collect::<HashSet<_>>(),
);
let mut peers = select_primary_peers(&cs);
peers.sort_unstable();
assert_eq!(peers, vec![0, 2]);
}
#[tokio::test]
async fn select_primary_peers_dedups_multi_vnode_peers() {
let cs = ClusterState::new(
vec![
RingPoint::new(100, 0),
RingPoint::new(200, 0),
RingPoint::new(300, 1),
],
[0u32, 1].into_iter().collect::<HashSet<_>>(),
);
let mut peers = select_primary_peers(&cs);
peers.sort_unstable();
assert_eq!(peers, vec![0, 1]);
}
}