tansu-storage 0.6.0

Storage abstraction for PostgreSQL, libSQL and ObjectStore (S3 or memory)
Documentation
// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Type definitions for SlateDB storage engine
//!
//! # Key Design for LSM-tree
//!
//! All keys follow a consistent pattern optimized for LSM-tree storage:
//!
//! ```text
//! {type_prefix}/{hierarchy...}/{leaf_id}
//! ```
//!
//! ## Key Prefixes (sorted by access pattern)
//!
//! | Prefix | Description | Key Structure |
//! |--------|-------------|---------------|
//! | `b/` | Batch data | `b/{topic_uuid}/{partition:be32}/{offset:be64}` |
//! | `c/` | Consumer group commits | `c/{group}/{topic}/{partition:be32}` |
//! | `g/` | Group state | `g/{group_id}` |
//! | `w/` | Watermarks | `w/{topic_uuid}/{partition:be32}` |
//!
//! ## Design Principles
//!
//! 1. **Prefix-first**: Type prefix comes first for efficient filtering
//! 2. **Big-endian integers**: Preserves numeric ordering in lexicographic sort
//! 3. **Fixed-width encoding**: Ensures consistent key ordering
//! 4. **Hierarchical structure**: Enables efficient prefix scans
//!
//! ## LSM-tree Considerations
//!
//! - Keys with same prefix are stored together → better compaction
//! - Bloom filters can efficiently skip unrelated key types
//! - Range scans for a partition only touch relevant SSTable blocks

use std::{collections::BTreeMap, time::SystemTime};

use serde::{Deserialize, Serialize};
use tansu_sans_io::create_topics_request::CreatableTopic;
use uuid::Uuid;

use crate::{GroupDetail, TxnState, Version};

// Type aliases
pub(super) type Group = String;
pub(super) type Offset = i64;
pub(super) type Partition = i32;
pub(super) type ProducerEpoch = i16;
pub(super) type ProducerId = i64;
pub(super) type Sequence = i32;
pub(super) type Topic = String;

// Collection types
pub(super) type Topics = BTreeMap<Topic, TopicMetadata>;
pub(super) type Producers = BTreeMap<ProducerId, ProducerDetail>;
pub(super) type Brokers = BTreeMap<i32, BrokerInfo>;
pub(super) type Transactions = BTreeMap<String, Txn>;

/// Transaction produce offset range
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
pub(super) struct TxnProduceOffset {
    pub offset_start: Offset,
    pub offset_end: Offset,
}

/// Transaction commit offset
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TxnCommitOffset {
    pub committed_offset: Offset,
    pub leader_epoch: Option<i32>,
    pub metadata: Option<String>,
}

/// Topic metadata
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TopicMetadata {
    pub id: Uuid,
    pub topic: CreatableTopic,
}

/// Watermark for a topic partition
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct Watermark {
    pub low: Option<i64>,
    pub high: Option<i64>,
    pub timestamps: Option<BTreeMap<i64, i64>>,
}

/// Key for watermark storage: `w/{topic_uuid}/{partition:be32}`
///
/// Watermarks are accessed per-partition, so we use topic+partition as the key.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct WatermarkKey {
    /// Type prefix for LSM-tree grouping
    pub prefix: char,
    /// Topic UUID (16 bytes, fixed)
    pub topic: Uuid,
    /// Partition number (big-endian for correct ordering)
    #[serde(with = "postcard::fixint::be")]
    pub partition: Partition,
}

impl Default for WatermarkKey {
    fn default() -> Self {
        Self {
            prefix: 'w',
            topic: Uuid::nil(),
            partition: 0,
        }
    }
}

impl WatermarkKey {
    pub(super) fn new(topic: Uuid, partition: Partition) -> Self {
        Self {
            prefix: 'w',
            topic,
            partition,
        }
    }
}

/// Group detail with version
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct GroupDetailVersion {
    pub detail: GroupDetail,
    pub version: Version,
}

impl GroupDetailVersion {
    pub(super) fn detail(self, detail: GroupDetail) -> Self {
        Self { detail, ..self }
    }

    pub(super) fn version(self, version: Version) -> Self {
        Self { version, ..self }
    }
}

/// Producer detail with sequence tracking
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct ProducerDetail {
    pub sequences: BTreeMap<ProducerEpoch, BTreeMap<String, BTreeMap<i32, Sequence>>>,
}

/// Transaction identifier
#[allow(dead_code)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TxnId {
    pub transaction: String,
    pub producer_id: ProducerId,
    pub producer_epoch: ProducerEpoch,
    pub state: TxnState,
}

/// Transaction state
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct Txn {
    pub producer: ProducerId,
    pub epochs: BTreeMap<ProducerEpoch, TxnDetail>,
}

/// Transaction detail
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct TxnDetail {
    pub transaction_timeout_ms: i32,
    pub started_at: Option<SystemTime>,
    pub state: Option<TxnState>,
    pub produces: BTreeMap<Topic, BTreeMap<Partition, Option<TxnProduceOffset>>>,
    pub offsets: BTreeMap<Group, BTreeMap<Topic, BTreeMap<Partition, TxnCommitOffset>>>,
}

/// Key for batch storage: `b/{topic_uuid}/{partition:be32}/{offset:be64}`
///
/// Batches are the most frequently accessed data. The key structure enables:
/// - Efficient sequential reads by offset (big-endian preserves order)
/// - Prefix scan for all batches in a partition
/// - Bloom filter can quickly skip non-batch keys
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct BatchKey {
    /// Type prefix 'b' for batch
    pub prefix: char,
    /// Topic UUID (16 bytes, fixed)
    pub topic: Uuid,
    /// Partition number (big-endian for correct ordering)
    #[serde(with = "postcard::fixint::be")]
    pub partition: Partition,
    /// Offset within partition (big-endian for correct ordering)
    #[serde(with = "postcard::fixint::be")]
    pub offset: Offset,
}

impl Default for BatchKey {
    fn default() -> Self {
        Self {
            prefix: 'b',
            topic: Uuid::nil(),
            partition: 0,
            offset: 0,
        }
    }
}

impl BatchKey {
    pub(super) fn new(topic: Uuid, partition: Partition, offset: Offset) -> Self {
        Self {
            prefix: 'b',
            topic,
            partition,
            offset,
        }
    }

    /// Create a key for range scan starting from this offset
    pub(super) fn scan_from(topic: Uuid, partition: Partition, offset: Offset) -> Self {
        Self::new(topic, partition, offset)
    }
}

/// Prefix key for scanning all batches in a topic partition: `b/{topic_uuid}/{partition}`
///
/// This is a separate struct from `BatchKey` because we need to check if a scanned key
/// still belongs to the same topic/partition before decoding the batch data.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct BatchKeyPrefix {
    /// Type prefix 'b' for batch
    pub prefix: char,
    /// Topic UUID (16 bytes, fixed)
    pub topic: Uuid,
    /// Partition number (big-endian for correct ordering)
    #[serde(with = "postcard::fixint::be")]
    pub partition: Partition,
}

impl BatchKeyPrefix {
    pub(super) fn new(topic: Uuid, partition: Partition) -> Self {
        Self {
            prefix: 'b',
            topic,
            partition,
        }
    }
}

/// Key for storing committed offsets: `c/{group}/{topic}/{partition:be32}`
///
/// Consumer group offsets are accessed by group, then by topic-partition.
/// This enables efficient "get all offsets for a group" scans.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct OffsetCommitKey {
    /// Type prefix 'c' for commit
    pub prefix: char,
    /// Consumer group ID
    pub group: String,
    /// Topic name
    pub topic: String,
    /// Partition number (big-endian for correct ordering)
    #[serde(with = "postcard::fixint::be")]
    pub partition: Partition,
}

impl Default for OffsetCommitKey {
    fn default() -> Self {
        Self {
            prefix: 'c',
            group: String::new(),
            topic: String::new(),
            partition: 0,
        }
    }
}

impl OffsetCommitKey {
    pub(super) fn new(
        group: impl Into<String>,
        topic: impl Into<String>,
        partition: Partition,
    ) -> Self {
        Self {
            prefix: 'c',
            group: group.into(),
            topic: topic.into(),
            partition,
        }
    }
}

/// Prefix key for scanning all offsets in a consumer group: `c/{group}`
///
/// This is a separate struct from `OffsetCommitKey` because postcard serialization
/// includes length prefixes for strings, so we can't use an `OffsetCommitKey` with
/// empty topic as a scan prefix - it would include the empty string's length marker.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct OffsetCommitKeyPrefix {
    /// Type prefix 'c' for commit
    pub prefix: char,
    /// Consumer group ID
    pub group: String,
}

impl OffsetCommitKeyPrefix {
    pub(super) fn new(group: impl Into<String>) -> Self {
        Self {
            prefix: 'c',
            group: group.into(),
        }
    }
}

/// Value stored for offset commits
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct OffsetCommitValue {
    pub offset: i64,
    pub leader_epoch: Option<i32>,
    pub metadata: Option<String>,
}

/// Key for storing group state: `g/{group_id}`
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct GroupKey {
    /// Type prefix 'g' for group
    pub prefix: char,
    /// Group ID
    pub group_id: String,
}

impl Default for GroupKey {
    fn default() -> Self {
        Self {
            prefix: 'g',
            group_id: String::new(),
        }
    }
}

impl GroupKey {
    pub(super) fn new(group_id: impl Into<String>) -> Self {
        Self {
            prefix: 'g',
            group_id: group_id.into(),
        }
    }
}

/// Prefix key for scanning all groups: `g`
///
/// This is a separate struct from `GroupKey` because postcard serialization
/// includes length prefixes for strings, so we can't use a `GroupKey` with
/// empty group_id as a scan prefix.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct GroupKeyPrefix {
    /// Type prefix 'g' for group
    pub prefix: char,
}

impl GroupKeyPrefix {
    pub(super) fn new() -> Self {
        Self { prefix: 'g' }
    }
}

/// Stored broker information
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub(super) struct BrokerInfo {
    pub broker_id: i32,
    pub host: String,
    pub port: i32,
    pub rack: Option<String>,
}