use std::sync::Arc;
use fluvio_types::PartitionId;
use tracing::debug;
use async_trait::async_trait;
use fluvio_types::SpuId;
use crate::store::*;
use crate::core::*;
use super::*;
pub type SharedPartitionStore<C> = Arc<PartitionLocalStore<C>>;
pub type PartitionMetadata<C> = MetadataStoreObject<PartitionSpec, C>;
pub type PartitionLocalStore<C> = LocalStore<PartitionSpec, C>;
pub type DefaultPartitionMd = PartitionMetadata<String>;
pub type DefaultPartitionStore = PartitionLocalStore<u32>;
pub trait PartitionMd<C: MetadataItem> {
fn with_replicas(key: ReplicaKey, replicas: Vec<SpuId>) -> Self;
fn quick<S: Into<String>>(partition: ((S, PartitionId), Vec<SpuId>)) -> Self;
}
impl<C: MetadataItem> PartitionMd<C> for PartitionMetadata<C> {
fn with_replicas(key: ReplicaKey, replicas: Vec<SpuId>) -> Self {
let spec: PartitionSpec = replicas.into();
Self::new(key, spec, PartitionStatus::default())
}
fn quick<S: Into<String>>(partition: ((S, PartitionId), Vec<SpuId>)) -> Self {
let (replica_key, replicas) = partition;
Self::with_replicas(replica_key.into(), replicas)
}
}
#[async_trait]
pub trait PartitionLocalStorePolicy<C>
where
C: MetadataItem,
{
async fn names(&self) -> Vec<ReplicaKey>;
async fn topic_partitions(&self, topic: &str) -> Vec<PartitionMetadata<C>>;
async fn partition_spec_for_spu(&self, target_spu: SpuId) -> Vec<(ReplicaKey, PartitionSpec)>;
async fn count_topic_partitions(&self, topic: &str) -> i32;
async fn topic_partitions_list(&self, topic: &str) -> Vec<ReplicaKey>;
async fn table_fmt(&self) -> String;
async fn replica_for_spu(&self, target_spu: SpuId) -> Vec<Replica>;
async fn leaders(&self) -> Vec<ReplicaLeader>;
fn bulk_load<S: Into<String>>(partitions: Vec<((S, PartitionId), Vec<SpuId>)>) -> Self;
}
#[async_trait]
impl<C> PartitionLocalStorePolicy<C> for PartitionLocalStore<C>
where
C: MetadataItem + Send + Sync,
{
async fn names(&self) -> Vec<ReplicaKey> {
self.read().await.keys().cloned().collect()
}
async fn topic_partitions(&self, topic: &str) -> Vec<PartitionMetadata<C>> {
let mut res: Vec<PartitionMetadata<C>> = Vec::default();
for (name, partition) in self.read().await.iter() {
if name.topic == topic {
res.push(partition.inner().clone());
}
}
res
}
async fn partition_spec_for_spu(&self, target_spu: SpuId) -> Vec<(ReplicaKey, PartitionSpec)> {
let mut res = vec![];
for (name, partition) in self.read().await.iter() {
if partition.spec.replicas.contains(&target_spu) {
res.push((name.clone(), partition.spec.clone()));
}
}
res
}
async fn count_topic_partitions(&self, topic: &str) -> i32 {
let mut count: i32 = 0;
for (name, _) in self.read().await.iter() {
if name.topic == topic {
count += 1;
}
}
count
}
async fn topic_partitions_list(&self, topic: &str) -> Vec<ReplicaKey> {
self.read()
.await
.keys()
.filter_map(|name| {
if name.topic == topic {
Some(name.clone())
} else {
None
}
})
.collect()
}
async fn table_fmt(&self) -> String {
let mut table = String::new();
let partition_hdr = format!(
"{n:<18} {l:<6} {r}\n",
n = "PARTITION",
l = "LEADER",
r = "LIVE-REPLICAS",
);
table.push_str(&partition_hdr);
for (name, partition) in self.read().await.iter() {
let mut leader = String::from("-");
let mut _lrs = String::from("[]");
if partition.spec.leader >= 0 {
leader = format!("{}", partition.spec.leader);
}
let row = format!("{n:<18} {l:<6} \n", n = name.to_string(), l = leader,);
table.push_str(&row);
}
table
}
async fn replica_for_spu(&self, target_spu: SpuId) -> Vec<Replica> {
let msgs: Vec<Replica> = self
.partition_spec_for_spu(target_spu)
.await
.into_iter()
.map(|(replica_key, partition_spec)| {
Replica::new(replica_key, partition_spec.leader, partition_spec.replicas)
})
.collect();
debug!(
"{} computing replica msg for spu y: {}, msg: {}",
self,
target_spu,
msgs.len()
);
msgs
}
async fn leaders(&self) -> Vec<ReplicaLeader> {
self.read()
.await
.iter()
.map(|(key, value)| ReplicaLeader {
id: key.clone(),
leader: value.spec.leader,
})
.collect()
}
fn bulk_load<S: Into<String>>(partitions: Vec<((S, PartitionId), Vec<SpuId>)>) -> Self {
let elements = partitions
.into_iter()
.map(|(replica_key, replicas)| PartitionMetadata::quick((replica_key, replicas)))
.collect();
Self::bulk_new(elements)
}
}
#[cfg(test)]
pub mod test {
use super::*;
#[fluvio_future::test]
async fn test_partitions_to_replica_msgs() {
let partitions = DefaultPartitionStore::bulk_load(vec![(("topic1", 0), vec![10, 11, 12])]);
let replica_msg = partitions.replica_for_spu(10).await;
assert_eq!(replica_msg.len(), 1);
}
}