use std::{cmp::Ordering, fmt, sync::Arc};
use bytes::Bytes;
use serde::Deserialize;
use slatedb::Db;
use tansu_sans_io::{ErrorCode, de::BatchDecoder, record::deflated::Batch};
use tansu_schema::{Registry, lake::House};
use tracing::debug;
use url::Url;
use crate::{Error, Result, TopicId};
use super::types::{TopicMetadata, Topics, Transactions};
/// Broker storage engine: persists cluster metadata and record batches in
/// a slatedb key-value store, with optional schema validation and lake
/// house integration.
///
/// Cloning is cheap: the database handle is behind an [`Arc`].
#[derive(Clone)]
pub struct Engine {
// Cluster identifier this engine belongs to.
pub(super) cluster: String,
// Numeric id of this broker node within the cluster.
pub(super) node: i32,
// URL clients are told to connect to.
pub(super) advertised_listener: Url,
// Shared handle to the backing slatedb store.
pub(super) db: Arc<Db>,
// Optional schema registry; `None` disables schema validation.
pub(super) schemas: Option<Registry>,
// Optional lake house sink; `None` disables lake integration.
pub(super) lake: Option<House>,
}
impl fmt::Debug for Engine {
    /// Formats a compact debug view showing only `cluster` and `node`.
    ///
    /// The remaining fields (`advertised_listener`, `db`, `schemas`,
    /// `lake`) are intentionally omitted; `finish_non_exhaustive` renders
    /// a trailing `..` so the output does not pretend to be complete.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct(stringify!(Engine))
            .field("cluster", &self.cluster)
            .field("node", &self.node)
            .finish_non_exhaustive()
    }
}
/// Step-by-step constructor for [`Engine`].
///
/// `cluster`, `node`, `advertised_listener` and `db` are required;
/// `schemas` and `lake` are optional. Obtain one via [`Engine::builder`]
/// and finish with [`Builder::build`] (panics on missing required fields)
/// or [`Builder::try_build`] (returns `None` instead).
#[derive(Clone, Default)]
pub struct Builder {
// Required: cluster identifier.
cluster: Option<String>,
// Required: broker node id.
node: Option<i32>,
// Required: URL advertised to clients.
advertised_listener: Option<Url>,
// Required: backing slatedb handle.
db: Option<Arc<Db>>,
// Optional: schema registry.
schemas: Option<Registry>,
// Optional: lake house sink.
lake: Option<House>,
}
impl fmt::Debug for Builder {
    /// Formats the builder, redacting the non-`Debug` handles (`db`,
    /// `schemas`, `lake`) to type-name placeholders while showing whether
    /// each has been set.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Replace opaque handles with a marker string so presence/absence
        // is still visible as Some(..)/None.
        let db = self.db.as_ref().map(|_| "Arc<Db>");
        let schemas = self.schemas.as_ref().map(|_| "Registry");
        let lake = self.lake.as_ref().map(|_| "House");

        f.debug_struct("Builder")
            .field("cluster", &self.cluster)
            .field("node", &self.node)
            .field("advertised_listener", &self.advertised_listener)
            .field("db", &db)
            .field("schemas", &schemas)
            .field("lake", &lake)
            .finish()
    }
}
impl Builder {
    /// Sets the required cluster identifier.
    pub fn cluster(self, cluster: impl Into<String>) -> Self {
        Self {
            cluster: Some(cluster.into()),
            ..self
        }
    }

    /// Sets the required broker node id.
    pub fn node(self, node: i32) -> Self {
        Self {
            node: Some(node),
            ..self
        }
    }

    /// Sets the required advertised listener URL.
    pub fn advertised_listener(self, advertised_listener: Url) -> Self {
        Self {
            advertised_listener: Some(advertised_listener),
            ..self
        }
    }

    /// Sets the required database handle.
    pub fn db(self, db: Arc<Db>) -> Self {
        Self { db: Some(db), ..self }
    }

    /// Sets or clears the optional schema registry.
    pub fn schemas(self, schemas: Option<Registry>) -> Self {
        Self { schemas, ..self }
    }

    /// Sets or clears the optional lake house sink.
    pub fn lake(self, lake: Option<House>) -> Self {
        Self { lake, ..self }
    }

    /// Builds the [`Engine`].
    ///
    /// # Panics
    ///
    /// Panics if any required field (`cluster`, `node`,
    /// `advertised_listener`, `db`) has not been set.
    pub fn build(self) -> Engine {
        Engine {
            cluster: self.cluster.expect("cluster is required"),
            node: self.node.expect("node is required"),
            advertised_listener: self
                .advertised_listener
                .expect("advertised_listener is required"),
            db: self.db.expect("db is required"),
            schemas: self.schemas,
            lake: self.lake,
        }
    }

    /// Non-panicking variant of [`Builder::build`]: yields `None` when any
    /// required field is missing.
    pub fn try_build(self) -> Option<Engine> {
        let Self {
            cluster,
            node,
            advertised_listener,
            db,
            schemas,
            lake,
        } = self;

        Some(Engine {
            cluster: cluster?,
            node: node?,
            advertised_listener: advertised_listener?,
            db: db?,
            schemas,
            lake,
        })
    }
}
impl Engine {
pub(super) const BROKERS: &[u8] = b"brokers.pc.bin";
pub(super) const PRODUCERS: &[u8] = b"producers.pc.bin";
pub(super) const TOPICS: &[u8] = b"topics.pc.bin";
pub(super) const TRANSACTIONS: &[u8] = b"transactions.pc.bin";
pub fn new(cluster: &str, node: i32, advertised_listener: Url, db: Arc<Db>) -> Self {
Self {
cluster: cluster.to_string(),
node,
advertised_listener,
db,
schemas: None,
lake: None,
}
}
pub fn builder() -> Builder {
Builder::default()
}
pub(super) fn decode(&self, encoded: Bytes) -> Result<Batch> {
let decoder = BatchDecoder::new(encoded);
Batch::deserialize(decoder).map_err(Into::into)
}
pub(super) async fn load_metadata<T>(
&self,
tx: &slatedb::DbTransaction,
key: &[u8],
) -> Result<T>
where
T: serde::de::DeserializeOwned + Default,
{
tx.get(key)
.await
.map_err(Error::from)
.and_then(|bytes: Option<Bytes>| {
bytes.map_or(Ok(T::default()), |encoded| {
postcard::from_bytes(&encoded[..]).map_err(Into::into)
})
})
}
pub(super) fn save_metadata<T>(
&self,
tx: &slatedb::DbTransaction,
key: &[u8],
value: &T,
) -> Result<()>
where
T: serde::Serialize,
{
postcard::to_stdvec(value)
.map_err(Error::from)
.and_then(|encoded| tx.put(key, encoded).map_err(Into::into))
}
pub(super) async fn get_topics(&self) -> Result<Topics> {
self.db
.get(Self::TOPICS)
.await
.map_err(Error::from)
.and_then(|topics| {
topics.map_or(Ok(Topics::default()), |encoded| {
postcard::from_bytes(&encoded[..]).map_err(Into::into)
})
})
}
pub(super) async fn get_transactions(&self) -> Result<Transactions> {
self.db
.get(Self::TRANSACTIONS)
.await
.map_err(Error::from)
.and_then(|txns| {
txns.map_or(Ok(Transactions::default()), |encoded| {
postcard::from_bytes(&encoded[..]).map_err(Into::into)
})
})
}
pub(super) async fn topic_metadata(&self, topic_id: &TopicId) -> Result<Option<TopicMetadata>> {
let topics = self.get_topics().await?;
match topic_id {
TopicId::Name(name) => Ok(topics.get(name.as_str()).cloned()),
TopicId::Id(id) => Ok(topics.values().find(|tm| tm.id == *id).cloned()),
}
}
pub(super) fn idempotent_sequence_check(
producer_epoch: &i16,
sequence: &i32,
deflated: &Batch,
) -> Result<i32> {
match producer_epoch.cmp(&deflated.producer_epoch) {
Ordering::Equal => match sequence.cmp(&deflated.base_sequence) {
Ordering::Equal => Ok(deflated.last_offset_delta + 1),
Ordering::Greater => {
debug!(
expected_sequence = ?sequence,
received_sequence = ?deflated.base_sequence,
"Duplicate sequence detected"
);
Err(Error::Api(ErrorCode::DuplicateSequenceNumber))
}
Ordering::Less => {
debug!(
expected_sequence = ?sequence,
received_sequence = ?deflated.base_sequence,
"Out of order sequence detected"
);
Err(Error::Api(ErrorCode::OutOfOrderSequenceNumber))
}
},
Ordering::Greater => Err(Error::Api(ErrorCode::ProducerFenced)),
Ordering::Less => Err(Error::Api(ErrorCode::InvalidProducerEpoch)),
}
}
}