#![allow(clippy::assign_op_pattern)]
use fluvio_types::SpuId;
use fluvio_protocol::{Encoder, Decoder};
use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm, Deduplication};
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct PartitionSpec {
pub leader: SpuId,
pub replicas: Vec<SpuId>,
#[fluvio(min_version = 4)]
pub cleanup_policy: Option<CleanupPolicy>,
#[fluvio(min_version = 4)]
pub storage: Option<TopicStorageConfig>,
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 6)]
pub compression_type: CompressionAlgorithm,
#[fluvio(min_version = 12)]
pub deduplication: Option<Deduplication>,
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 13)]
pub system: bool,
}
impl PartitionSpec {
pub fn new(leader: SpuId, replicas: Vec<SpuId>) -> Self {
Self {
leader,
replicas,
..Default::default()
}
}
pub fn from_replicas(replicas: Vec<SpuId>, topic: &TopicSpec) -> Self {
let leader = if replicas.is_empty() { 0 } else { replicas[0] };
Self {
leader,
replicas,
cleanup_policy: topic.get_clean_policy().cloned(),
storage: topic.get_storage().cloned(),
compression_type: topic.get_compression_type().clone(),
deduplication: topic.get_deduplication().cloned(),
system: topic.is_system(),
}
}
pub fn has_spu(&self, spu: &SpuId) -> bool {
self.replicas.contains(spu)
}
pub fn followers(&self) -> Vec<SpuId> {
self.replicas
.iter()
.filter_map(|r| if r == &self.leader { None } else { Some(*r) })
.collect()
}
}
impl From<Vec<SpuId>> for PartitionSpec {
fn from(replicas: Vec<SpuId>) -> Self {
if !replicas.is_empty() {
Self::new(replicas[0], replicas)
} else {
Self::new(0, replicas)
}
}
}
#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
pub struct PartitionConfig {
pub retention_time_seconds: Option<u32>,
}