use std::{collections::BTreeMap, time::SystemTime};
use serde::{Deserialize, Serialize};
use tansu_sans_io::create_topics_request::CreatableTopic;
use uuid::Uuid;
use crate::{GroupDetail, TxnState, Version};
/// Group key as a string; the outermost key of `TxnDetail::offsets`.
pub(super) type Group = String;
/// Offset value; used by `TxnProduceOffset`, `TxnCommitOffset` and `BatchKey`.
pub(super) type Offset = i64;
/// Partition number; used as a key component in the `*Key` structs of this module.
pub(super) type Partition = i32;
/// Producer epoch; keys `Txn::epochs` and `ProducerDetail::sequences`.
pub(super) type ProducerEpoch = i16;
/// Producer identifier; keys the `Producers` map and is stored in `Txn::producer`.
pub(super) type ProducerId = i64;
/// Sequence number; the innermost value of `ProducerDetail::sequences`.
pub(super) type Sequence = i32;
/// Topic key as a string (presumably the topic name — confirm against callers).
pub(super) type Topic = String;
/// `TopicMetadata` indexed by `Topic`.
pub(super) type Topics = BTreeMap<Topic, TopicMetadata>;
/// `ProducerDetail` indexed by `ProducerId`.
pub(super) type Producers = BTreeMap<ProducerId, ProducerDetail>;
/// `BrokerInfo` indexed by an `i32` key (presumably the broker id — confirm against callers).
pub(super) type Brokers = BTreeMap<i32, BrokerInfo>;
/// `Txn` indexed by a `String` key (presumably the transactional id — confirm against callers).
pub(super) type Transactions = BTreeMap<String, Txn>;
/// A start/end pair of offsets, stored per topic partition in `TxnDetail::produces`.
///
/// NOTE(review): whether `offset_end` is inclusive is not visible in this file —
/// confirm at the use sites.
#[derive(
Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
pub(super) struct TxnProduceOffset {
/// Start of the offset range.
pub offset_start: Offset,
/// End of the offset range.
pub offset_end: Offset,
}
/// An offset commit held inside a transaction; the innermost value of
/// `TxnDetail::offsets` (group → topic → partition).
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TxnCommitOffset {
/// The offset being committed.
pub committed_offset: Offset,
/// Optional leader epoch accompanying the commit.
pub leader_epoch: Option<i32>,
/// Optional free-form metadata string accompanying the commit.
pub metadata: Option<String>,
}
/// A topic's UUID paired with the `CreatableTopic` describing it; the value
/// type of the `Topics` map.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TopicMetadata {
/// UUID identifying the topic.
pub id: Uuid,
/// The `CreatableTopic` (from `tansu_sans_io::create_topics_request`) for this topic.
pub topic: CreatableTopic,
}
/// Optional low/high watermarks plus an optional timestamp map.
///
/// Every field is optional, so the derived `Default` is a watermark with
/// nothing recorded.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct Watermark {
/// Low watermark, if known.
pub low: Option<i64>,
/// High watermark, if known.
pub high: Option<i64>,
/// Optional `i64` → `i64` map.
/// NOTE(review): presumably timestamp → offset; the direction is not visible
/// in this file — confirm at the use sites.
pub timestamps: Option<BTreeMap<i64, i64>>,
}
/// Key made of a prefix character, a topic UUID and a partition.
///
/// Both the `Default` impl and `WatermarkKey::new` set `prefix` to `'w'`.
/// Field order is part of the serialized form and of the derived ordering, so
/// do not reorder fields.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct WatermarkKey {
/// Tag character; `'w'` for this key type (the other `*Key` structs in this
/// module use different characters).
pub prefix: char,
/// UUID of the topic.
pub topic: Uuid,
/// Partition, (de)serialized through `postcard::fixint::be` instead of the
/// default integer encoding.
/// NOTE(review): presumably so encoded keys sort bytewise by partition — confirm.
#[serde(with = "postcard::fixint::be")]
pub partition: Partition,
}
impl Default for WatermarkKey {
    /// Returns the `'w'`-prefixed key for the nil topic UUID and partition `0`.
    fn default() -> Self {
        // `new` supplies the prefix, so the literal lives in one place.
        Self::new(Uuid::nil(), 0)
    }
}
impl WatermarkKey {
pub(super) fn new(topic: Uuid, partition: Partition) -> Self {
Self {
prefix: 'w',
topic,
partition,
}
}
}
/// A `GroupDetail` paired with a `Version`.
///
/// The `detail` and `version` methods on this type replace one field at a
/// time and return the updated value.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct GroupDetailVersion {
/// The group detail.
pub detail: GroupDetail,
/// The version stored alongside `detail`.
pub version: Version,
}
impl GroupDetailVersion {
    /// Consumes `self` and returns it with `detail` replaced; the version is
    /// carried over unchanged.
    pub(super) fn detail(self, detail: GroupDetail) -> Self {
        let Self { version, .. } = self;
        Self { detail, version }
    }

    /// Consumes `self` and returns it with `version` replaced; the detail is
    /// carried over unchanged.
    pub(super) fn version(self, version: Version) -> Self {
        let Self { detail, .. } = self;
        Self { detail, version }
    }
}
/// Sequence numbers tracked for one producer; the value type of the
/// `Producers` map.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct ProducerDetail {
/// Three-level map: producer epoch → `String` key → `i32` key → sequence.
/// NOTE(review): the inner keys are presumably topic name and partition —
/// confirm against the code that populates this map.
pub sequences: BTreeMap<ProducerEpoch, BTreeMap<String, BTreeMap<i32, Sequence>>>,
}
/// A transaction name together with its producer id, producer epoch and state.
// `dead_code` is allowed here: nothing in the visible part of this file
// constructs or reads this type. NOTE(review): confirm it is still needed.
#[allow(dead_code)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TxnId {
/// Transaction name (presumably the transactional id — confirm).
pub transaction: String,
/// Id of the producer.
pub producer_id: ProducerId,
/// Epoch of the producer.
pub producer_epoch: ProducerEpoch,
/// State of the transaction.
pub state: TxnState,
}
/// A producer id and its per-epoch transaction details; the value type of the
/// `Transactions` map.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct Txn {
/// Id of the producer.
pub producer: ProducerId,
/// Transaction detail for each producer epoch.
pub epochs: BTreeMap<ProducerEpoch, TxnDetail>,
}
/// Detail of a transaction for a single producer epoch (see `Txn::epochs`).
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TxnDetail {
/// Transaction timeout in milliseconds.
pub transaction_timeout_ms: i32,
/// Start time, if one has been recorded.
pub started_at: Option<SystemTime>,
/// Transaction state, if one has been recorded.
pub state: Option<TxnState>,
/// Topic → partition → optional produced offset range.
/// NOTE(review): a `None` entry presumably marks a partition that was added
/// before anything was produced to it — confirm at the use sites.
pub produces: BTreeMap<Topic, BTreeMap<Partition, Option<TxnProduceOffset>>>,
/// Group → topic → partition → offset commit held in the transaction.
pub offsets: BTreeMap<Group, BTreeMap<Topic, BTreeMap<Partition, TxnCommitOffset>>>,
}
/// Key made of a prefix character, a topic UUID, a partition and an offset.
///
/// Both the `Default` impl and `BatchKey::new` set `prefix` to `'b'`.
/// Field order is part of the serialized form and of the derived ordering, so
/// do not reorder fields.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct BatchKey {
/// Tag character; `'b'` for this key type.
pub prefix: char,
/// UUID of the topic.
pub topic: Uuid,
/// Partition, (de)serialized through `postcard::fixint::be`.
#[serde(with = "postcard::fixint::be")]
pub partition: Partition,
/// Offset, (de)serialized through `postcard::fixint::be`.
/// NOTE(review): the fixed-width big-endian form is presumably chosen so that
/// encoded keys sort bytewise by partition and then offset — confirm.
#[serde(with = "postcard::fixint::be")]
pub offset: Offset,
}
impl Default for BatchKey {
    /// Returns the `'b'`-prefixed key for the nil topic UUID, partition `0`
    /// and offset `0`.
    fn default() -> Self {
        // `new` supplies the prefix, so the literal lives in one place.
        Self::new(Uuid::nil(), 0, 0)
    }
}
impl BatchKey {
pub(super) fn new(topic: Uuid, partition: Partition, offset: Offset) -> Self {
Self {
prefix: 'b',
topic,
partition,
offset,
}
}
pub(super) fn scan_from(topic: Uuid, partition: Partition, offset: Offset) -> Self {
Self::new(topic, partition, offset)
}
}
/// The leading fields of `BatchKey` (prefix, topic, partition) without the
/// offset.
///
/// NOTE(review): presumably serialized to obtain a key prefix covering every
/// batch of one topic partition — confirm against the storage engine.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct BatchKeyPrefix {
/// Tag character; `BatchKeyPrefix::new` sets it to `'b'`, matching `BatchKey`.
pub prefix: char,
/// UUID of the topic.
pub topic: Uuid,
/// Partition, (de)serialized through `postcard::fixint::be`, as in `BatchKey`.
#[serde(with = "postcard::fixint::be")]
pub partition: Partition,
}
impl BatchKeyPrefix {
pub(super) fn new(topic: Uuid, partition: Partition) -> Self {
Self {
prefix: 'b',
topic,
partition,
}
}
}
/// Key made of a prefix character, a group, a topic and a partition.
///
/// Both the `Default` impl and `OffsetCommitKey::new` set `prefix` to `'c'`.
/// Field order is part of the serialized form and of the derived ordering, so
/// do not reorder fields.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct OffsetCommitKey {
/// Tag character; `'c'` for this key type.
pub prefix: char,
/// Group the commit belongs to.
pub group: String,
/// Topic the commit refers to, as a string (unlike `BatchKey`, which holds a UUID).
pub topic: String,
/// Partition, (de)serialized through `postcard::fixint::be`.
#[serde(with = "postcard::fixint::be")]
pub partition: Partition,
}
impl Default for OffsetCommitKey {
    /// Returns the `'c'`-prefixed key with an empty group, an empty topic and
    /// partition `0`.
    fn default() -> Self {
        // `new` supplies the prefix, so the literal lives in one place.
        Self::new(String::new(), String::new(), 0)
    }
}
impl OffsetCommitKey {
pub(super) fn new(
group: impl Into<String>,
topic: impl Into<String>,
partition: Partition,
) -> Self {
Self {
prefix: 'c',
group: group.into(),
topic: topic.into(),
partition,
}
}
}
/// The leading fields of `OffsetCommitKey` (prefix, group) without the topic
/// and partition.
///
/// NOTE(review): presumably serialized to obtain a key prefix covering every
/// offset commit of one group — confirm against the storage engine.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct OffsetCommitKeyPrefix {
/// Tag character; `OffsetCommitKeyPrefix::new` sets it to `'c'`, matching `OffsetCommitKey`.
pub prefix: char,
/// Group whose commits the prefix refers to.
pub group: String,
}
impl OffsetCommitKeyPrefix {
pub(super) fn new(group: impl Into<String>) -> Self {
Self {
prefix: 'c',
group: group.into(),
}
}
}
/// A committed offset with optional leader epoch and metadata.
///
/// NOTE(review): presumably the value stored under an `OffsetCommitKey` —
/// the pairing is not visible in this file; confirm against the storage engine.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct OffsetCommitValue {
/// The committed offset.
pub offset: i64,
/// Optional leader epoch accompanying the commit.
pub leader_epoch: Option<i32>,
/// Optional free-form metadata string accompanying the commit.
pub metadata: Option<String>,
}
/// Key made of a prefix character and a group id.
///
/// Both the `Default` impl and `GroupKey::new` set `prefix` to `'g'`.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct GroupKey {
/// Tag character; `'g'` for this key type.
pub prefix: char,
/// Id of the group.
pub group_id: String,
}
impl Default for GroupKey {
    /// Returns the `'g'`-prefixed key with an empty group id.
    fn default() -> Self {
        // `new` supplies the prefix, so the literal lives in one place.
        Self::new(String::new())
    }
}
impl GroupKey {
pub(super) fn new(group_id: impl Into<String>) -> Self {
Self {
prefix: 'g',
group_id: group_id.into(),
}
}
}
/// The leading field of `GroupKey` (the prefix character) on its own.
///
/// NOTE(review): presumably serialized to obtain a key prefix covering every
/// group entry — confirm against the storage engine.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct GroupKeyPrefix {
/// Tag character; `GroupKeyPrefix::new` sets it to `'g'`, matching `GroupKey`.
pub prefix: char,
}
impl GroupKeyPrefix {
pub(super) fn new() -> Self {
Self { prefix: 'g' }
}
}
/// Details of one broker; the value type of the `Brokers` map.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct BrokerInfo {
/// Id of the broker.
pub broker_id: i32,
/// Host of the broker.
pub host: String,
/// Port of the broker.
pub port: i32,
/// Optional rack of the broker.
pub rack: Option<String>,
}