use std::{
collections::{BTreeMap, BTreeSet, btree_map::Entry},
fmt::{Debug, Display},
str::FromStr,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{
StreamExt,
stream::{BoxStream, TryStreamExt},
};
use metadata::Cache;
use object_store::{
Attribute, AttributeValue, Attributes, CopyOptions, DynObjectStore, GetOptions, GetResult,
ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreExt, PutMode,
PutMultipartOptions, PutOptions, PutPayload, PutResult, UpdateVersion, path::Path,
};
use opentelemetry::{
KeyValue,
metrics::{Counter, Histogram},
};
use opticon::OptiCon;
use rand::{prelude::*, rng};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use tansu_sans_io::{
BatchAttribute, ConfigResource, ConfigSource, ConfigType, ControlBatch, EndTransactionMarker,
ErrorCode, IsolationLevel, ListOffset, NULL_TOPIC_ID, OpType, ScramMechanism,
add_partitions_to_txn_response::{
AddPartitionsToTxnPartitionResult, AddPartitionsToTxnTopicResult,
},
create_topics_request::{CreatableTopic, CreatableTopicConfig},
delete_groups_response::DeletableGroupResult,
delete_records_request::DeleteRecordsTopic,
delete_records_response::DeleteRecordsTopicResult,
describe_cluster_response::DescribeClusterBroker,
describe_configs_response::{DescribeConfigsResourceResult, DescribeConfigsResult},
describe_topic_partitions_response::{
DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic,
},
incremental_alter_configs_request::{AlterConfigsResource, AlterableConfig},
incremental_alter_configs_response::AlterConfigsResourceResponse,
list_groups_response::ListedGroup,
metadata_response::{MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic},
record::{Record, deflated, inflated},
txn_offset_commit_response::{TxnOffsetCommitResponsePartition, TxnOffsetCommitResponseTopic},
};
use tansu_schema::{
Registry,
lake::{House, LakeHouse as _},
};
use tracing::{debug, error, instrument, warn};
use url::Url;
use uuid::Uuid;
mod metadata;
mod opticon;
use crate::{
BrokerRegistrationRequest, Error, GroupDetail, ListOffsetResponse, METER, MetadataResponse,
NamedGroupDetail, OffsetCommitRequest, OffsetStage, ProducerIdResponse, Result,
ScramCredential, Storage, TopicId, Topition, TxnAddPartitionsRequest, TxnAddPartitionsResponse,
TxnOffsetCommitRequest, TxnState, UpdateError, Version,
};
const APPLICATION_JSON: &str = "application/json";
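/// An object store backed implementation of [`Storage`].
///
/// Cluster metadata lives in a single optimistically updated JSON
/// document ([`Meta`]), per partition offsets in [`Watermark`]
/// documents, and record batches as individual objects. The underlying
/// store is wrapped in [`Metron`] for request metrics and a short
/// lived [`Cache`].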
#[derive(Clone, Debug)]
pub struct DynoStore {
cluster: String,
node: i32,
advertised_listener: Url,
schemas: Option<Registry>,
lake: Option<House>,
watermarks: Arc<Mutex<BTreeMap<Topition, OptiCon<Watermark>>>>,
meta: OptiCon<Meta>,
object_store: Arc<DynObjectStore>,
}
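// aliases for the nested metadata maps used throughout this module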
type Group = String;
type Offset = i64;
type Partition = i32;
type ProducerEpoch = i16;
type ProducerId = i64;
type Sequence = i32;
type Topic = String;
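/// Cluster wide metadata held in a single JSON document at
/// `clusters/{cluster}/meta.json` and updated with optimistic
/// concurrency via [`OptiCon`].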
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct Meta {
producers: BTreeMap<ProducerId, ProducerDetail>,
topics: BTreeMap<Topic, TopicMetadata>,
transactions: BTreeMap<String, Txn>,
}
impl OptiCon<Meta> {
fn new(cluster: &str) -> Self {
Self::path(format!("clusters/{cluster}/meta.json"))
}
}
impl Meta {
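    /// Offset ranges produced by a transaction, keyed by topition,
    /// having verified that `producer_id` owns `transaction_id` and
    /// that `producer_epoch` is known.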
fn produced(
&self,
transaction_id: &str,
producer_id: ProducerId,
producer_epoch: ProducerEpoch,
) -> Result<BTreeMap<Topition, TxnProduceOffset>> {
let Some(txn) = self.transactions.get(transaction_id) else {
return Err(Error::Api(ErrorCode::TransactionalIdNotFound));
};
if txn.producer != producer_id {
return Err(Error::Api(ErrorCode::UnknownProducerId));
}
let Some(txn_detail) = txn.epochs.get(&producer_epoch) else {
return Err(Error::Api(ErrorCode::ProducerFenced));
};
let mut produced = BTreeMap::new();
for (topic, partitions) in txn_detail.produces.iter() {
for (partition, offset_range) in partitions.iter() {
let Some(offset_range) = offset_range else {
continue;
};
let tp = Topition::new(topic.to_owned(), *partition);
assert_eq!(None, produced.insert(tp, *offset_range));
}
}
Ok(produced)
}
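    /// Other transactions whose produced offset ranges overlap those
    /// of the given transaction; their states determine whether
    /// outcomes can be resolved when this transaction ends.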
fn overlapping_transactions(
&self,
transaction_id: &str,
producer_id: ProducerId,
producer_epoch: ProducerEpoch,
) -> Result<Vec<TxnId>> {
let candidates = self.produced(transaction_id, producer_id, producer_epoch)?;
let mut overlapping = Vec::new();
'candidates: for (candidate_id, txn) in self.transactions.iter() {
for (epoch, txn_detail) in txn.epochs.iter() {
if transaction_id == candidate_id
&& producer_id == txn.producer
&& producer_epoch == *epoch
{
continue;
}
let Some(state) = txn_detail.state else {
continue;
};
for (topic, partitions) in txn_detail.produces.iter() {
for (partition, offset_range) in partitions.iter() {
let Some(offset_range) = offset_range else {
continue;
};
let tp = Topition::new(topic.to_owned(), *partition);
if let Some(candidate) = candidates.get(&tp)
&& offset_range.offset_start < candidate.offset_end
{
overlapping.push(TxnId {
transaction: candidate_id.to_owned(),
producer_id: txn.producer,
producer_epoch: *epoch,
state,
});
continue 'candidates;
}
}
}
}
}
Ok(overlapping)
}
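    /// Applies incremental configuration changes to a topic: `Set`
    /// inserts or replaces a value and `Delete` removes it, while
    /// `Append` and `Subtract` remain unimplemented.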
fn alter_topic(&mut self, topic: &str, changes: &[AlterableConfig]) -> Result<()> {
if let Some(metadata) = self.topics.get_mut(topic) {
let mut configuration = metadata
.topic
.configs
.as_deref()
.unwrap_or_default()
.iter()
.fold(BTreeMap::new(), |mut acc, item| {
_ = acc.insert(item.name.as_str(), item.value.as_deref());
acc
});
for change in changes {
match OpType::try_from(change.config_operation)? {
OpType::Set => {
_ = configuration.insert(change.name.as_str(), change.value.as_deref());
}
OpType::Delete => {
_ = configuration.remove(change.name.as_str());
}
OpType::Append => todo!(),
OpType::Subtract => todo!(),
}
}
_ = metadata
.topic
.configs
.replace(
configuration
.into_iter()
.fold(Vec::new(), |mut acc, (key, value)| {
acc.push(
CreatableTopicConfig::default()
.name(key.to_owned())
.value(value.map(|value| value.to_owned())),
);
acc
}),
);
}
Ok(())
}
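    /// Earliest produced offset per topition across transactions that
    /// are still in flight (neither committed nor aborted), used to
    /// derive the last stable offset.
    fn stable_offsets(&self) -> BTreeMap<Topition, Offset> {
        self.transactions
            .values()
            .flat_map(|txn| {
                debug!(?txn);

                txn.epochs
                    .values()
                    .filter(|detail| {
                        detail.state.is_some_and(|state| {
                            state != TxnState::Committed && state != TxnState::Aborted
                        })
                    })
                    .map(BTreeMap::<Topition, Offset>::from)
                    .collect::<Vec<_>>()
            })
            .reduce(|mut acc, e| {
                debug!(?acc, ?e);

                for (topition, offset_start) in e.iter() {
                    _ = acc
                        .entry(topition.to_owned())
                        .and_modify(|existing_offset_start| {
                            if *existing_offset_start > *offset_start {
                                *existing_offset_start = *offset_start
                            }
                        })
                        .or_insert(*offset_start);
                }

                acc
            })
            .unwrap_or_default()
    }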
}
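/// Idempotent producer state: the next expected sequence number, per
/// epoch, topic and partition.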
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct ProducerDetail {
sequences: BTreeMap<ProducerEpoch, BTreeMap<String, BTreeMap<i32, Sequence>>>,
}
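/// A transaction identified by name, producer id and epoch, with its
/// last known state.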
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct TxnId {
transaction: String,
producer_id: ProducerId,
producer_epoch: ProducerEpoch,
state: TxnState,
}
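/// A transactional producer and its per epoch detail.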
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct Txn {
producer: ProducerId,
epochs: BTreeMap<ProducerEpoch, TxnDetail>,
}
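/// State of a transaction at a single producer epoch: the offset
/// ranges it has produced and the consumer offsets it intends to
/// commit.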
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct TxnDetail {
transaction_timeout_ms: i32,
started_at: Option<SystemTime>,
state: Option<TxnState>,
produces: BTreeMap<Topic, BTreeMap<Partition, Option<TxnProduceOffset>>>,
offsets: BTreeMap<Group, BTreeMap<Topic, BTreeMap<Partition, TxnCommitOffset>>>,
}
impl From<&TxnDetail> for BTreeMap<Topition, Offset> {
fn from(value: &TxnDetail) -> Self {
let mut result = BTreeMap::new();
for (topic, partitions) in value.produces.iter() {
for (partition, offset_range) in partitions.iter() {
let Some(offset_range) = offset_range else {
continue;
};
let tp = Topition::new(topic.to_owned(), *partition);
assert_eq!(None, result.insert(tp, offset_range.offset_start));
}
}
result
}
}
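/// The first and last offsets produced to a partition within a
/// transaction.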
#[derive(
Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
struct TxnProduceOffset {
offset_start: Offset,
offset_end: Offset,
}
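/// A consumer offset committed as part of a transaction.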
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct TxnCommitOffset {
committed_offset: Offset,
leader_epoch: Option<i32>,
metadata: Option<String>,
}
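/// A topic's unique identifier together with its creation request.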
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct TopicMetadata {
id: Uuid,
topic: CreatableTopic,
}
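/// Per partition offsets, stored as a JSON document: `low` is the log
/// start and `high` the next offset to be assigned.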
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
struct Watermark {
low: Option<i64>,
high: Option<i64>,
timestamps: Option<BTreeMap<i64, i64>>,
}
impl OptiCon<Watermark> {
fn new(cluster: &str, topition: &Topition) -> Self {
Self::path(format!(
"clusters/{}/topics/{}/partitions/{:0>10}/watermark.json",
cluster, topition.topic, topition.partition,
))
}
}
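/// Object attributes marking a stored payload as `application/json`.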
fn json_content_type() -> Attributes {
let mut attributes = Attributes::new();
_ = attributes.insert(
Attribute::ContentType,
AttributeValue::from(APPLICATION_JSON),
);
attributes
}
impl DynoStore {
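    /// Creates a store for `cluster`, identified as broker `node`,
    /// wrapping `object_store` in request metrics and a five second
    /// metadata cache.
    ///
    /// A minimal sketch, assuming the in-memory `object_store` backend
    /// (the cluster name, node id and listener URL are illustrative):
    ///
    /// ```ignore
    /// use object_store::memory::InMemory;
    /// use url::Url;
    ///
    /// let storage = DynoStore::new("tansu", 111, InMemory::new())
    ///     .advertised_listener(Url::parse("tcp://localhost:9092/")?);
    /// ```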
pub fn new(cluster: &str, node: i32, object_store: impl ObjectStore) -> Self {
Self {
cluster: cluster.into(),
node,
advertised_listener: Url::parse("tcp://127.0.0.1/").unwrap(),
schemas: None,
lake: None,
watermarks: Arc::new(Mutex::new(BTreeMap::new())),
meta: OptiCon::<Meta>::new(cluster),
object_store: Arc::new(Cache::new(
Metron::new(object_store, cluster),
Duration::from_millis(5_000),
)),
}
}
pub fn advertised_listener(self, advertised_listener: Url) -> Self {
Self {
advertised_listener,
..self
}
}
pub fn schemas(self, schemas: Option<Registry>) -> Self {
Self { schemas, ..self }
}
pub fn lake(self, lake: Option<House>) -> Self {
Self { lake, ..self }
}
async fn topic_metadata(&self, topic: &TopicId) -> Result<Option<TopicMetadata>> {
debug!(?topic);
self.meta
.with(&self.object_store, |meta| match topic {
TopicId::Name(name) => Ok(meta.topics.get(name).cloned()),
TopicId::Id(id) => {
for (_, metadata) in meta.topics.iter() {
if &metadata.id == id {
return Ok(Some(metadata.clone()));
}
}
Ok(None)
}
})
.await
}
fn encode(&self, deflated: deflated::Batch) -> Result<PutPayload> {
Ok(PutPayload::from(Bytes::from(deflated)))
}
fn decode(&self, encoded: Bytes) -> Result<deflated::Batch> {
debug!(encoded = ?&encoded[..]);
deflated::Batch::try_from(encoded)
.inspect_err(|err| debug!(?err))
.map_err(Into::into)
}
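    /// Fetches a JSON document from `location`, returning the decoded
    /// value together with its [`Version`] for later conditional
    /// updates.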
async fn get<V>(&self, location: &Path) -> Result<(V, Version)>
where
V: DeserializeOwned,
{
let get_result = self.object_store.get(location).await?;
let meta = get_result.meta.clone();
let payload = get_result
.bytes()
.await
.map_err(Into::into)
.and_then(|encoded| serde_json::from_reader(&encoded[..]).map_err(Error::from))?;
Ok((payload, meta.into()))
}
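    /// Writes a JSON document to `location`, creating it when no
    /// `update_version` is given, otherwise updating conditionally. A
    /// precondition or already exists failure is reported as
    /// [`UpdateError::Outdated`], carrying the current value and its
    /// version.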
async fn put<V>(
&self,
location: &Path,
value: V,
attributes: Attributes,
update_version: Option<UpdateVersion>,
) -> Result<PutResult, UpdateError<V>>
where
V: PartialEq + Serialize + DeserializeOwned + Debug,
{
debug!(%location, ?attributes, ?update_version, ?value);
let options = PutOptions {
mode: update_version.map_or(PutMode::Create, PutMode::Update),
attributes,
..Default::default()
};
let payload = serde_json::to_vec(&value)
.map(Bytes::from)
.map(PutPayload::from)?;
match self
.object_store
.put_opts(location, payload, options)
.await
.inspect_err(|error| debug!(%location, ?error))
{
Ok(put_result) => Ok(put_result),
Err(object_store::Error::Precondition { .. })
| Err(object_store::Error::AlreadyExists { .. }) => {
let (current, version) = self
.get(location)
.await
.inspect_err(|error| error!(%location, ?error))?;
debug!(%location, ?value, ?current);
Err(UpdateError::Outdated {
current: Box::new(current),
version,
})
}
Err(otherwise) => Err(otherwise.into()),
}
}
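    /// Builds a response for every topic and partition in `offsets`
    /// using the same `error_code`.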
fn txn_offset_commit_response_error(
offsets: &TxnOffsetCommitRequest,
error_code: ErrorCode,
) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
let mut responses = vec![];
for topic in &offsets.topics {
let mut partition_responses = vec![];
if let Some(partitions) = topic.partitions.as_deref() {
for partition in partitions {
partition_responses.push(
TxnOffsetCommitResponsePartition::default()
.partition_index(partition.partition_index)
.error_code(error_code.into()),
);
}
}
responses.push(
TxnOffsetCommitResponseTopic::default()
.name(topic.name.to_string())
.partitions(Some(partition_responses)),
);
}
Ok(responses)
}
}
#[async_trait]
impl Storage for DynoStore {
async fn register_broker(&self, _broker_registration: BrokerRegistrationRequest) -> Result<()> {
Ok(())
}
async fn incremental_alter_resource(
&self,
resource: AlterConfigsResource,
) -> Result<AlterConfigsResourceResponse> {
        match ConfigResource::from(resource.resource_type) {
            ConfigResource::Topic => self
                .meta
                .with_mut(&self.object_store, |meta| {
                    meta.alter_topic(
                        resource.resource_name.as_str(),
                        resource.configs.as_deref().unwrap_or_default(),
                    )
                })
                .await
                .map(|()| {
                    AlterConfigsResourceResponse::default()
                        .error_code(ErrorCode::None.into())
                        .error_message(Some("".into()))
                        .resource_type(resource.resource_type)
                        .resource_name(resource.resource_name)
                }),

            // group, client metric, broker, broker logger and unknown
            // resources are acknowledged without modification
            ConfigResource::Group
            | ConfigResource::ClientMetric
            | ConfigResource::BrokerLogger
            | ConfigResource::Broker
            | ConfigResource::Unknown => Ok(AlterConfigsResourceResponse::default()
                .error_code(ErrorCode::None.into())
                .error_message(Some("".into()))
                .resource_type(resource.resource_type)
                .resource_name(resource.resource_name)),
        }
}
#[instrument(skip_all, fields(topic = %topic.name))]
async fn create_topic(&self, topic: CreatableTopic, _validate_only: bool) -> Result<Uuid> {
match self
.meta
.with_mut(&self.object_store, |meta| {
if meta.topics.contains_key(topic.name.as_str()) {
return Err(Error::Api(ErrorCode::TopicAlreadyExists));
}
let id = Uuid::now_v7();
debug!(%id);
let td = TopicMetadata {
id,
topic: topic.clone(),
};
assert_eq!(None, meta.topics.insert(topic.name.clone(), td));
Ok(id)
})
.await
{
Ok(id) => {
for partition in 0..topic.num_partitions {
let topition = Topition::new(topic.name.as_str(), partition);
                    let watermark = self.watermarks.lock().map(|mut locked| {
                        locked
                            .entry(topition.to_owned())
                            .or_insert_with(|| {
                                OptiCon::<Watermark>::new(self.cluster.as_str(), &topition)
                            })
                            .to_owned()
                    })?;
watermark
.with_mut(&self.object_store, |watermark| {
_ = watermark.high.take();
_ = watermark.low.take();
Ok(())
})
.await?;
}
Ok(id)
}
error @ Err(_) => error,
}
}
async fn delete_records(
&self,
_topics: &[DeleteRecordsTopic],
) -> Result<Vec<DeleteRecordsTopicResult>> {
todo!()
}
async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
if let Some(metadata) = self.topic_metadata(topic).await? {
self.meta
.with_mut(&self.object_store, |meta| {
_ = meta.topics.remove(metadata.topic.name.as_str());
Ok(())
})
.await?;
let prefix = Path::from(format!(
"clusters/{}/topics/{}/",
self.cluster, metadata.topic.name,
));
let locations = self
.object_store
.list(Some(&prefix))
.map_ok(|m| m.location)
.boxed();
_ = self
.object_store
.delete_stream(locations)
.try_collect::<Vec<Path>>()
.await?;
let prefix = Path::from(format!("clusters/{}/groups/consumers/", self.cluster));
let topic_name = metadata.topic.name.clone();
let prefix_clone = prefix.clone();
let locations = self
.object_store
.list(Some(&prefix))
.filter_map(move |m| {
let prefix = prefix_clone.clone();
let topic_name = topic_name.clone();
async move {
                    m.ok().and_then(|m| {
debug!(?m.location);
m.location.prefix_match(&prefix).and_then(|mut i| {
_ = i.next();
let sub = Path::from_iter(i);
debug!(?sub);
if sub.prefix_matches(&Path::from(format!(
"offsets/{}/partitions/",
topic_name
))) {
Some(Ok(m.location.clone()))
} else {
None
}
})
})
}
})
.boxed();
_ = self
.object_store
.delete_stream(locations)
.try_collect::<Vec<Path>>()
.await?;
Ok(ErrorCode::None)
} else {
Ok(ErrorCode::UnknownTopicOrPartition)
}
}
async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
let broker_id = self.node;
let host = self
.advertised_listener
.host_str()
.unwrap_or("0.0.0.0")
.into();
let port = self.advertised_listener.port().unwrap_or(9092).into();
let rack = None;
Ok(vec![
DescribeClusterBroker::default()
.broker_id(broker_id)
.host(host)
.port(port)
.rack(rack),
])
}
async fn produce(
&self,
transaction_id: Option<&str>,
topition: &Topition,
deflated: deflated::Batch,
) -> Result<i64> {
let config = self
.describe_config(topition.topic(), ConfigResource::Topic, None)
.await
.inspect_err(|err| debug!(?err))?;
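        // with a lake house configured and "tansu.lake.sink" enabled
        // for this topic, the batch is validated and written to the
        // lake only: no batch object or producer state is stored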
if self.lake.is_some()
&& config
.configs
.as_ref()
.map(|configs| {
configs
.iter()
.inspect(|config| debug!(?config))
.any(|config| {
config.name.as_str() == "tansu.lake.sink"
&& config
.value
.as_deref()
.and_then(|value| bool::from_str(value).ok())
.unwrap_or(false)
})
})
.unwrap_or(false)
{
let watermark = self.watermarks.lock().map(|mut locked| {
locked
.entry(topition.to_owned())
.or_insert_with(|| OptiCon::<Watermark>::new(self.cluster.as_str(), topition))
.to_owned()
})?;
let offset = watermark
.with_mut(&self.object_store, |watermark| {
debug!(?watermark);
                    let offset = watermark.high.unwrap_or_default();
                    watermark.high = Some(offset + deflated.last_offset_delta as i64 + 1);
watermark.timestamps = None;
debug!(?watermark);
Ok(offset)
})
.await
.inspect(|offset| debug!(offset, transaction_id, ?topition))
.inspect_err(|err| error!(?err, transaction_id, ?topition))?;
if let Some(ref registry) = self.schemas {
let batch_attribute = BatchAttribute::try_from(deflated.attributes)
.inspect(|batch_attribute| debug!(?batch_attribute))
.inspect_err(|err| debug!(?err))?;
if !batch_attribute.control {
let inflated = inflated::Batch::try_from(&deflated)
.inspect(|inflated| debug!(?inflated))
.inspect_err(|err| debug!(?err))?;
registry
.validate(topition.topic(), &inflated)
.await
.inspect(|validation| debug!(?validation))
.inspect_err(|err| debug!(?err))?;
if let Some(ref lake) = self.lake {
lake.store(
topition.topic(),
topition.partition(),
offset,
&inflated,
config,
)
.await
.inspect(|store| debug!(?store))
.inspect_err(|err| debug!(?err))?;
}
}
}
Ok(offset)
} else {
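            // otherwise enforce idempotent producer sequences, validate
            // against any schema, record transactional offset ranges,
            // and store the batch as an object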
if deflated.is_idempotent() {
self.meta
.with_mut(&self.object_store, |meta| {
let Some(pd) = meta.producers.get_mut(&deflated.producer_id) else {
debug!(producer_id = deflated.producer_id, ?meta.producers);
return Err(Error::Api(ErrorCode::UnknownProducerId));
};
                        let Some(mut current) = pd.sequences.last_entry() else {
                            debug!(producer_id = deflated.producer_id, sequences = ?pd.sequences);
                            return Err(Error::Api(ErrorCode::UnknownServerError));
                        };
if current.key() != &deflated.producer_epoch {
debug!(current = ?current.key(), producer_epoch = deflated.producer_epoch);
return Err(Error::Api(ErrorCode::ProducerFenced));
}
let sequences = current.get_mut();
debug!(?sequences);
match sequences
.entry(topition.topic.clone())
.or_default()
.entry(topition.partition)
.or_default()
{
sequence if *sequence < deflated.base_sequence => {
debug!(?sequence, base_sequence = deflated.base_sequence);
Err(Error::Api(ErrorCode::OutOfOrderSequenceNumber))
}
sequence if *sequence > deflated.base_sequence => {
debug!(?sequence, base_sequence = deflated.base_sequence);
Err(Error::Api(ErrorCode::DuplicateSequenceNumber))
}
sequence => {
debug!(?sequence, delta = deflated.last_offset_delta + 1);
*sequence += deflated.last_offset_delta + 1;
Ok(())
}
}
})
.await
.inspect(|outcome| debug!(transaction_id, ?topition, ?outcome))
.inspect_err(|err| error!(?err, transaction_id, ?topition))?;
}
if let Some(ref registry) = self.schemas {
let batch_attribute = BatchAttribute::try_from(deflated.attributes)
.inspect_err(|err| debug!(?err))?;
if !batch_attribute.control {
let inflated =
inflated::Batch::try_from(&deflated).inspect_err(|err| debug!(?err))?;
registry
.validate(topition.topic(), &inflated)
.await
.inspect_err(|err| debug!(?err))?;
}
}
let watermark = self.watermarks.lock().map(|mut locked| {
locked
.entry(topition.to_owned())
.or_insert_with(|| OptiCon::<Watermark>::new(self.cluster.as_str(), topition))
.to_owned()
})?;
let offset = watermark
.with_mut(&self.object_store, |watermark| {
debug!(?watermark);
                    let offset = watermark.high.unwrap_or_default();
                    watermark.high = Some(offset + deflated.last_offset_delta as i64 + 1);
watermark.timestamps = None;
debug!(?watermark);
Ok(offset)
})
.await
.inspect(|offset| debug!(offset, transaction_id, ?topition))
.inspect_err(|err| error!(?err, transaction_id, ?topition))?;
let attributes =
BatchAttribute::try_from(deflated.attributes).inspect_err(|err| debug!(?err))?;
if !attributes.control
&& let Some(ref lake) = self.lake
{
let inflated =
inflated::Batch::try_from(&deflated).inspect_err(|err| debug!(?err))?;
lake.store(
topition.topic(),
topition.partition(),
offset,
&inflated,
config,
)
.await
.inspect(|store| debug!(?store))
.inspect_err(|err| debug!(?err))?;
}
if let Some(transaction_id) = transaction_id
&& attributes.transaction
{
self.meta
.with_mut(&self.object_store, |meta| {
if let Some(transaction) = meta.transactions.get_mut(transaction_id) {
debug!(?transaction);
if let Some(txn_detail) =
transaction.epochs.get_mut(&deflated.producer_epoch)
{
debug!(?txn_detail);
let offset_end = offset + deflated.last_offset_delta as i64;
_ = txn_detail
.produces
.entry(topition.topic.clone())
.or_default()
.entry(topition.partition)
.and_modify(|entry| {
let range = entry.get_or_insert(TxnProduceOffset {
offset_start: offset,
offset_end,
});
if offset_end > range.offset_end {
range.offset_end = offset_end;
}
})
.or_insert(Some(TxnProduceOffset {
offset_start: offset,
offset_end,
}));
}
}
Ok(())
})
.await
.inspect(|outcome| debug!(?outcome, transaction_id, ?topition))
.inspect_err(|err| error!(?err, transaction_id, ?topition))?;
}
let location = Path::from(format!(
"clusters/{}/topics/{}/partitions/{:0>10}/records/{:0>20}.batch",
self.cluster, topition.topic, topition.partition, offset,
));
let payload = self.encode(deflated).inspect_err(|err| debug!(?err))?;
_ = self
.object_store
.put_opts(
&location,
payload,
PutOptions {
mode: PutMode::Create,
attributes: Attributes::new(),
..Default::default()
},
)
.await
.inspect(|outcome| debug!(?outcome, transaction_id, ?topition))
.inspect_err(|error| error!(?error, transaction_id, ?topition))?;
Ok(offset)
}
}
async fn fetch(
&self,
topition: &'_ Topition,
offset: i64,
min_bytes: u32,
max_bytes: u32,
isolation_level: IsolationLevel,
) -> Result<Vec<deflated::Batch>> {
let high_watermark = self.offset_stage(topition).await.map(|offset_stage| {
if isolation_level == IsolationLevel::ReadCommitted {
offset_stage.last_stable
} else {
offset_stage.high_watermark
}
})?;
debug!(high_watermark);
let mut offsets = BTreeSet::new();
if offset < high_watermark {
let location = Path::from(format!(
"clusters/{}/topics/{}/partitions/{:0>10}/records/",
self.cluster, topition.topic, topition.partition
));
let mut list_stream = self.object_store.list(Some(&location));
while let Some(meta) = list_stream
.next()
.await
.inspect(|meta| debug!(?meta))
.transpose()
.inspect_err(|error| error!(?error, ?topition, ?offset, ?min_bytes, ?max_bytes))
.map_err(|_| Error::Api(ErrorCode::UnknownServerError))?
{
let Some(offset) = meta.location.parts().next_back() else {
continue;
};
let offset = i64::from_str(&offset.as_ref()[0..20])?;
debug!(offset);
if offset < high_watermark {
_ = offsets.insert(offset);
}
}
}
let mut batches = vec![];
let mut bytes = max_bytes as u64;
for offset in offsets.split_off(&offset) {
debug!(?offset);
let location = Path::from(format!(
"clusters/{}/topics/{}/partitions/{:0>10}/records/{:0>20}.batch",
self.cluster, topition.topic, topition.partition, offset,
));
let get_result = self
.object_store
.get(&location)
.await
.inspect_err(|error| error!(?error, ?topition, ?offset, ?min_bytes, ?max_bytes))
.map_err(|_| Error::Api(ErrorCode::UnknownServerError))?;
let size = get_result.meta.size;
let mut batch = get_result
.bytes()
.await
.inspect_err(|error| error!(?error, %location))
.map_err(|_| Error::Api(ErrorCode::UnknownServerError))
.and_then(|encoded| self.decode(encoded))?;
batch.base_offset = offset;
batches.push(batch);
if size > bytes {
break;
} else {
bytes = bytes.saturating_sub(size);
}
}
Ok(batches)
}
async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
        let stable = self
            .meta
            .with(&self.object_store, |meta| Ok(meta.stable_offsets()))
            .await?;
debug!(?stable);
        let watermark = self.watermarks.lock().map(|mut locked| {
            locked
                .entry(topition.to_owned())
                .or_insert_with(|| OptiCon::<Watermark>::new(self.cluster.as_str(), topition))
                .to_owned()
        })?;
watermark
.with(&self.object_store, |watermark| {
debug!(?watermark);
let high_watermark = watermark.high.unwrap_or(0);
let log_start = watermark.low.unwrap_or(0);
let last_stable = stable.get(topition).copied().unwrap_or(high_watermark);
Ok(OffsetStage {
last_stable,
high_watermark,
log_start,
})
})
.await
}
async fn list_offsets(
&self,
isolation_level: IsolationLevel,
offsets: &[(Topition, ListOffset)],
) -> Result<Vec<(Topition, ListOffsetResponse)>> {
        let stable = if isolation_level == IsolationLevel::ReadCommitted {
            self.meta
                .with(&self.object_store, |meta| Ok(meta.stable_offsets()))
                .await?
        } else {
            BTreeMap::new()
        };
let mut responses = vec![];
for (topition, offset_request) in offsets {
let location = Path::from(format!(
"clusters/{}/topics/{}/partitions/{:0>10}/records",
self.cluster, topition.topic, topition.partition,
));
let mut list_stream = self.object_store.list(Some(&location));
let mut candidate: Option<ObjectMeta> = None;
while let Some(meta) = list_stream
.next()
.await
.inspect(|meta| debug!(?meta))
.transpose()
.inspect_err(|error| error!(?error))
.map_err(|_| Error::Api(ErrorCode::UnknownServerError))?
{
if let Some(last) = stable.get(topition)
&& offset_request == &ListOffset::Latest
{
                    let Some(meta_offset) = meta
                        .location
                        .parts()
                        .next_back()
                        .and_then(|offset| i64::from_str(&offset.as_ref()[0..20]).ok())
                    else {
                        continue;
                    };

                    // keep the earliest batch at or after the last stable
                    // offset, treating a missing candidate as always
                    // replaceable
                    if meta_offset >= *last
                        && candidate
                            .as_ref()
                            .and_then(|found| found.location.parts().next_back())
                            .and_then(|offset| i64::from_str(&offset.as_ref()[0..20]).ok())
                            .is_none_or(|found_offset| found_offset > meta_offset)
                    {
                        _ = candidate.replace(meta);
                    }
} else {
match offset_request {
ListOffset::Earliest
if candidate
.as_ref()
.is_none_or(|found| found.last_modified > meta.last_modified) =>
{
_ = candidate.replace(meta);
}
ListOffset::Latest
if candidate
.as_ref()
.is_none_or(|found| meta.last_modified > found.last_modified) =>
{
_ = candidate.replace(meta);
}
ListOffset::Timestamp(system_time)
if SystemTime::from(meta.last_modified) > *system_time
&& candidate.as_ref().is_none_or(|found| {
found.last_modified > meta.last_modified
}) =>
{
_ = candidate.replace(meta);
}
_ => continue,
}
}
}
debug!(?candidate);
if let Some(ref found) = candidate {
let Some(offset) = found.location.parts().next_back() else {
continue;
};
let offset = i64::from_str(&offset.as_ref()[0..20])?;
debug!(offset);
responses.push((
topition.to_owned(),
ListOffsetResponse {
error_code: ErrorCode::None,
offset: Some(match offset_request {
ListOffset::Latest => offset + 1,
_ => offset,
}),
timestamp: Some(found.last_modified.into()),
},
))
} else {
responses.push((
topition.to_owned(),
ListOffsetResponse {
error_code: ErrorCode::None,
offset: Some(0),
..Default::default()
},
))
}
}
Ok(responses)
}
async fn offset_commit(
&self,
group_id: &str,
_retention_time_ms: Option<Duration>,
offsets: &[(Topition, OffsetCommitRequest)],
) -> Result<Vec<(Topition, ErrorCode)>> {
let mut responses = vec![];
for (topition, offset_commit) in offsets {
if self
.topic_metadata(&TopicId::from(topition))
.await?
.is_some()
{
let location = Path::from(format!(
"clusters/{}/groups/consumers/{}/offsets/{}/partitions/{:0>10}.json",
self.cluster, group_id, topition.topic, topition.partition,
));
let payload = serde_json::to_vec(&offset_commit)
.map(Bytes::from)
.map(PutPayload::from)?;
let options = PutOptions {
mode: PutMode::Overwrite,
attributes: json_content_type(),
..Default::default()
};
let error_code = self
.object_store
.put_opts(&location, payload, options)
.await
.inspect_err(|err| error!(?err))
.inspect(|outcome| debug!(?outcome))
.map_or(ErrorCode::UnknownServerError, |_| ErrorCode::None);
responses.push((topition.to_owned(), error_code));
} else {
responses.push((topition.to_owned(), ErrorCode::UnknownTopicOrPartition));
}
}
Ok(responses)
}
async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
let mut topitions = vec![];
{
let location = Path::from(format!(
"clusters/{}/groups/consumers/{}/offsets/",
self.cluster, group_id,
));
let mut list_stream = self.object_store.list(Some(&location));
while let Some(meta) = list_stream
.next()
.await
.inspect(|meta| debug!(?meta))
.transpose()
.inspect_err(|error| error!(?error))
.map_err(|_| Error::Api(ErrorCode::UnknownServerError))?
{
debug!(?meta);
let Some(topic): Option<String> = meta
.location
.parts()
.nth(6)
.inspect(|topic| debug!(?topic))
.map(|topic| topic.as_ref().into())
else {
continue;
};
let Some(partition) = meta
.location
.parts()
.nth(8)
.inspect(|partition| debug!(?partition))
.map(|partition| i32::from_str(&partition.as_ref()[0..10]))
.transpose()?
else {
continue;
};
debug!(topic, partition);
topitions.push(Topition::new(topic, partition));
}
}
self.offset_fetch(Some(group_id), topitions.as_ref(), Some(false))
.await
}
async fn offset_fetch(
&self,
group_id: Option<&str>,
topics: &[Topition],
_require_stable: Option<bool>,
) -> Result<BTreeMap<Topition, i64>> {
let mut responses = BTreeMap::new();
if let Some(group_id) = group_id {
for topition in topics {
let location = Path::from(format!(
"clusters/{}/groups/consumers/{}/offsets/{}/partitions/{:0>10}.json",
self.cluster, group_id, topition.topic, topition.partition,
));
let offset = match self.object_store.get(&location).await {
Ok(get_result) => get_result
.bytes()
.await
.map_err(Error::from)
.and_then(|encoded| {
serde_json::from_slice::<OffsetCommitRequest>(&encoded[..])
.map_err(Error::from)
})
.map(|commit| commit.offset)
.inspect_err(|error| error!(?error, ?group_id, ?topition))
.map_err(|_| Error::Api(ErrorCode::UnknownServerError)),
Err(object_store::Error::NotFound { .. }) => Ok(-1),
Err(error) => {
error!(?error, ?group_id, ?topition);
Err(Error::Api(ErrorCode::UnknownServerError))
}
}?;
_ = responses.insert(topition.to_owned(), offset);
}
}
Ok(responses)
}
async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
let brokers = vec![
MetadataResponseBroker::default()
.node_id(self.node)
.host(
self.advertised_listener
.host_str()
.unwrap_or("0.0.0.0")
.into(),
)
.port(self.advertised_listener.port().unwrap_or(9092).into())
.rack(None),
];
let responses = match topics {
Some(topics) if !topics.is_empty() => {
let mut responses = vec![];
for topic in topics {
let response = match self
.topic_metadata(topic)
.await
.inspect_err(|error| error!(?error))
{
Ok(Some(topic_metadata)) => {
let name = Some(topic_metadata.topic.name.to_owned());
let error_code = ErrorCode::None.into();
let topic_id = Some(topic_metadata.id.into_bytes());
let is_internal = Some(false);
let partitions = topic_metadata.topic.num_partitions;
let replication_factor = topic_metadata.topic.replication_factor;
debug!(
?error_code,
?topic_id,
?name,
?is_internal,
?partitions,
?replication_factor
);
let mut rng = rng();
let mut broker_ids: Vec<_> =
brokers.iter().map(|broker| broker.node_id).collect();
broker_ids.shuffle(&mut rng);
let mut brokers = broker_ids.into_iter().cycle();
let partitions = Some(
(0..partitions)
.map(|partition_index| {
let leader_id = brokers.next().expect("cycling");
let replica_nodes = Some(
(0..replication_factor)
.map(|_replica| brokers.next().expect("cycling"))
.collect(),
);
let isr_nodes = replica_nodes.clone();
MetadataResponsePartition::default()
.error_code(error_code)
.partition_index(partition_index)
.leader_id(leader_id)
.leader_epoch(Some(0))
.replica_nodes(replica_nodes)
.isr_nodes(isr_nodes)
.offline_replicas(Some([].into()))
})
.collect(),
);
MetadataResponseTopic::default()
.error_code(error_code)
.name(name)
.topic_id(topic_id)
.is_internal(is_internal)
.partitions(partitions)
                            .topic_authorized_operations(Some(i32::MIN))
}
Ok(None) => MetadataResponseTopic::default()
.error_code(ErrorCode::UnknownTopicOrPartition.into())
.name(match topic {
TopicId::Name(name) => Some(name.into()),
TopicId::Id(_) => None,
})
.topic_id(Some(match topic {
TopicId::Name(_) => NULL_TOPIC_ID,
TopicId::Id(id) => id.into_bytes(),
}))
.is_internal(Some(false))
.partitions(Some([].into()))
                        .topic_authorized_operations(Some(i32::MIN)),
Err(_) => MetadataResponseTopic::default()
.error_code(ErrorCode::UnknownServerError.into())
.name(match topic {
TopicId::Name(name) => Some(name.into()),
TopicId::Id(_) => Some("".into()),
})
.topic_id(Some(match topic {
TopicId::Name(_) => NULL_TOPIC_ID,
TopicId::Id(id) => id.into_bytes(),
}))
.is_internal(Some(false))
.partitions(Some([].into()))
                        .topic_authorized_operations(Some(i32::MIN)),
};
responses.push(response);
}
responses
}
_ => {
self.meta
.with(&self.object_store, |meta| {
let mut responses = vec![];
for (name, topic_metadata) in meta.topics.iter() {
debug!(?name, ?topic_metadata);
let name = Some(name.to_owned());
let error_code = ErrorCode::None.into();
let topic_id = Some(topic_metadata.id.into_bytes());
let is_internal = Some(false);
let partitions = topic_metadata.topic.num_partitions;
let replication_factor = topic_metadata.topic.replication_factor;
debug!(
?error_code,
?topic_id,
?name,
?is_internal,
?partitions,
?replication_factor
);
let mut rng = rng();
let mut broker_ids: Vec<_> =
brokers.iter().map(|broker| broker.node_id).collect();
broker_ids.shuffle(&mut rng);
let mut brokers = broker_ids.into_iter().cycle();
let partitions = Some(
(0..partitions)
.map(|partition_index| {
let leader_id = brokers.next().expect("cycling");
let replica_nodes = Some(
(0..replication_factor)
.map(|_replica| brokers.next().expect("cycling"))
.collect(),
);
let isr_nodes = replica_nodes.clone();
MetadataResponsePartition::default()
.error_code(error_code)
.partition_index(partition_index)
.leader_id(leader_id)
.leader_epoch(Some(0))
.replica_nodes(replica_nodes)
.isr_nodes(isr_nodes)
.offline_replicas(Some([].into()))
})
.collect(),
);
responses.push(
MetadataResponseTopic::default()
.error_code(error_code)
.name(name)
.topic_id(topic_id)
.is_internal(is_internal)
.partitions(partitions)
                                .topic_authorized_operations(Some(i32::MIN)),
);
}
Ok(responses)
})
.await?
}
};
Ok(MetadataResponse {
cluster: Some(self.cluster.clone()),
controller: Some(self.node),
brokers,
topics: responses,
})
}
async fn describe_config(
&self,
name: &str,
resource: ConfigResource,
_keys: Option<&[String]>,
) -> Result<DescribeConfigsResult> {
match resource {
ConfigResource::Topic => match self.topic_metadata(&TopicId::Name(name.into())).await {
Ok(Some(topic_metadata)) => {
let error_code = ErrorCode::None;
Ok(DescribeConfigsResult::default()
.error_code(error_code.into())
.error_message(Some(error_code.to_string()))
.resource_type(i8::from(resource))
.resource_name(name.into())
.configs(topic_metadata.topic.configs.map(|configs| {
configs
.iter()
.map(|config| {
DescribeConfigsResourceResult::default()
.name(config.name.clone())
.value(config.value.clone())
.read_only(false)
.is_default(None)
.config_source(Some(ConfigSource::DefaultConfig.into()))
.is_sensitive(false)
.synonyms(Some([].into()))
.config_type(Some(ConfigType::String.into()))
.documentation(Some("".into()))
})
.collect()
})))
}
Ok(None) => Ok(DescribeConfigsResult::default()
.error_code(ErrorCode::None.into())
.error_message(Some(ErrorCode::None.to_string()))
.resource_type(i8::from(resource))
.resource_name(name.into())
.configs(Some(vec![]))),
Err(_) => todo!(),
},
_ => Ok(DescribeConfigsResult::default()
.error_code(ErrorCode::None.into())
.error_message(Some(ErrorCode::None.to_string()))
.resource_type(i8::from(resource))
.resource_name(name.into())
.configs(Some(vec![]))),
}
}
async fn describe_topic_partitions(
&self,
topics: Option<&[TopicId]>,
partition_limit: i32,
cursor: Option<Topition>,
) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
let _ = (partition_limit, cursor);
let mut responses =
Vec::with_capacity(topics.map(|topics| topics.len()).unwrap_or_default());
for topic in topics.unwrap_or_default() {
match self
.topic_metadata(topic)
.await
.inspect_err(|error| error!(?error))
{
Ok(Some(topic_metadata)) => responses.push(
DescribeTopicPartitionsResponseTopic::default()
.error_code(ErrorCode::None.into())
.name(Some(topic_metadata.topic.name))
.topic_id(topic_metadata.id.into_bytes())
.is_internal(false)
.partitions(Some(
(0..topic_metadata.topic.num_partitions)
.map(|partition_index| {
DescribeTopicPartitionsResponsePartition::default()
.error_code(ErrorCode::None.into())
.partition_index(partition_index)
.leader_id(self.node)
.leader_epoch(0)
.replica_nodes(Some(vec![
self.node;
topic_metadata.topic.replication_factor
as usize
]))
.isr_nodes(Some(vec![
self.node;
topic_metadata.topic.replication_factor
as usize
]))
.eligible_leader_replicas(Some(vec![]))
.last_known_elr(Some(vec![]))
.offline_replicas(Some(vec![]))
})
.collect(),
))
                        .topic_authorized_operations(i32::MIN),
),
Ok(None) => responses.push(
DescribeTopicPartitionsResponseTopic::default()
.error_code(ErrorCode::UnknownTopicOrPartition.into())
.name(match topic {
TopicId::Name(name) => Some(name.into()),
TopicId::Id(_) => None,
})
.topic_id(match topic {
TopicId::Name(_) => NULL_TOPIC_ID,
TopicId::Id(id) => id.into_bytes(),
})
.is_internal(false)
.partitions(Some([].into()))
                        .topic_authorized_operations(i32::MIN),
),
Err(_) => responses.push(
DescribeTopicPartitionsResponseTopic::default()
.error_code(ErrorCode::UnknownServerError.into())
.name(match topic {
TopicId::Name(name) => Some(name.into()),
TopicId::Id(_) => None,
})
.topic_id(match topic {
TopicId::Name(_) => NULL_TOPIC_ID,
TopicId::Id(id) => id.into_bytes(),
})
.is_internal(false)
.partitions(Some([].into()))
                        .topic_authorized_operations(i32::MIN),
),
}
}
Ok(responses)
}
async fn list_groups(&self, _states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
let location = Path::from(format!("clusters/{}/groups/consumers/", self.cluster,));
let list_result = self
.object_store
.list_with_delimiter(Some(&location))
.await
.inspect(|list_result| debug!(?list_result))
.inspect_err(|error| error!(?error, cluster = self.cluster))?;
let mut listed_groups = vec![];
for prefix in list_result.common_prefixes {
if let Some(group_id) = prefix.parts().next_back() {
listed_groups.push(
ListedGroup::default()
.group_id(group_id.as_ref().into())
.protocol_type("consumer".into())
.group_state(Some("Unknown".into()))
.group_type(Some("classic".into())),
);
}
}
Ok(listed_groups)
}
async fn delete_groups(
&self,
group_ids: Option<&[String]>,
) -> Result<Vec<DeletableGroupResult>> {
let mut results = vec![];
if let Some(group_ids) = group_ids {
for group_id in group_ids {
let location = Path::from(format!(
"clusters/{}/groups/consumers/{}.json",
self.cluster, group_id,
));
let had_group_state = self
.object_store
.delete(&location)
.await
.inspect(|outcome| debug!(group_id, ?outcome))
.inspect_err(|err| error!(group_id, ?err))
.is_ok();
debug!(group_id, had_group_state);
let prefix = Path::from(format!(
"clusters/{}/groups/consumers/{}",
self.cluster, group_id,
));
let locations = self
.object_store
.list(Some(&prefix))
.map_ok(|m| m.location)
.boxed();
let deleted_committed_offsets = self
.object_store
.delete_stream(locations)
.try_collect::<Vec<Path>>()
.await?;
debug!(group_id, ?deleted_committed_offsets);
results.push(
DeletableGroupResult::default()
.group_id(group_id.into())
.error_code(
if had_group_state || !deleted_committed_offsets.is_empty() {
ErrorCode::None
} else {
ErrorCode::GroupIdNotFound
}
.into(),
),
);
}
}
Ok(results)
}
async fn describe_groups(
&self,
group_ids: Option<&[String]>,
_include_authorized_operations: bool,
) -> Result<Vec<NamedGroupDetail>> {
let mut results = vec![];
if let Some(group_ids) = group_ids {
for group_id in group_ids {
let location = Path::from(format!(
"clusters/{}/groups/consumers/{}.json",
self.cluster, group_id,
));
match self
.get::<GroupDetail>(&location)
.await
.inspect(|o| debug!(?o, group_id))
.inspect_err(|err| error!(?err, group_id))
{
Ok((group_detail, _)) => {
results.push(NamedGroupDetail::found(group_id.into(), group_detail));
}
                    // any object store error, including not found, is
                    // treated as a group without stored state
                    Err(Error::ObjectStore(_)) => {
                        results.push(NamedGroupDetail::found(
                            group_id.into(),
                            GroupDetail::default(),
                        ));
                    }
Err(_) => {
results.push(NamedGroupDetail::error_code(
group_id.into(),
ErrorCode::UnknownServerError,
));
}
}
}
}
Ok(results)
}
async fn update_group(
&self,
group_id: &str,
detail: GroupDetail,
version: Option<Version>,
) -> Result<Version, UpdateError<GroupDetail>> {
let location = Path::from(format!(
"clusters/{}/groups/consumers/{}.json",
self.cluster, group_id,
));
self.put(
&location,
detail,
json_content_type(),
version.map(Into::into),
)
.await
.map(Into::into)
}
async fn init_producer(
&self,
transaction_id: Option<&str>,
transaction_timeout_ms: i32,
producer_id: Option<i64>,
producer_epoch: Option<i16>,
) -> Result<ProducerIdResponse> {
#[derive(Clone, Debug)]
enum InitProducer {
Completed(ProducerIdResponse),
NeedToRollback {
producer_id: i64,
producer_epoch: i16,
},
}
if let Some(transaction_id) = transaction_id {
match self
.meta
.with_mut(&self.object_store, |meta| {
debug!(?meta);
match (producer_id, producer_epoch) {
(Some(-1), Some(-1)) => {
match meta.transactions.entry(transaction_id.to_string()) {
Entry::Vacant(vacant) => {
let id = meta
.producers
.last_key_value()
.map_or(1.into(), |(k, _v)| k + 1);
let mut pd = ProducerDetail::default();
assert_eq!(None, pd.sequences.insert(0, BTreeMap::new()));
assert_eq!(None, meta.producers.insert(id, pd));
let mut epochs = BTreeMap::new();
assert_eq!(
None,
epochs.insert(
0,
TxnDetail {
transaction_timeout_ms,
..Default::default()
},
)
);
_ = vacant.insert(Txn {
producer: id,
epochs,
});
Ok(InitProducer::Completed(ProducerIdResponse {
id,
epoch: 0,
error: ErrorCode::None,
}))
}
Entry::Occupied(mut occupied) => {
if let Some((current_epoch, txn_detail)) =
occupied.get().epochs.last_key_value()
{
if txn_detail.state == Some(TxnState::Begin) {
Ok(InitProducer::NeedToRollback {
producer_id: occupied.get().producer,
producer_epoch: *current_epoch,
})
} else {
let id = occupied.get().producer;
let epoch = current_epoch + 1;
_ = meta.producers.entry(id).and_modify(|pd| {
assert_eq!(
None,
pd.sequences.insert(epoch, BTreeMap::new())
);
});
assert_eq!(
None,
occupied.get_mut().epochs.insert(
epoch,
TxnDetail {
transaction_timeout_ms,
..Default::default()
}
)
);
Ok(InitProducer::Completed(ProducerIdResponse {
id,
epoch,
error: ErrorCode::None,
}))
}
} else {
todo!()
}
}
}
}
(producer, epoch) => {
error!(?producer, ?epoch);
Ok(InitProducer::Completed(ProducerIdResponse {
id: -1,
epoch: -1,
error: ErrorCode::UnknownServerError,
}))
}
}
})
.await?
{
InitProducer::Completed(completed) => Ok(completed),
InitProducer::NeedToRollback {
producer_id: rollback_producer_id,
producer_epoch: rollback_producer_epoch,
} => {
let error_code = self
.txn_end(
transaction_id,
rollback_producer_id,
rollback_producer_epoch,
false,
)
.await?;
debug!(?rollback_producer_id, ?rollback_producer_epoch, ?error_code);
if error_code == ErrorCode::None {
return self
.init_producer(
Some(transaction_id),
transaction_timeout_ms,
producer_id,
producer_epoch,
)
.await;
} else {
Ok(ProducerIdResponse {
id: -1,
epoch: -1,
error: ErrorCode::UnknownServerError,
})
}
}
}
} else {
self.meta
.with_mut(&self.object_store, |meta| {
debug!(?meta);
match (producer_id, producer_epoch) {
(Some(-1), Some(-1)) => {
let producer = meta
.producers
.last_key_value()
.map_or(1.into(), |(k, _v)| k + 1);
let epoch = 0;
let mut pd = ProducerDetail::default();
assert_eq!(None, pd.sequences.insert(epoch, BTreeMap::new()));
debug!(?producer, ?pd);
assert_eq!(None, meta.producers.insert(producer, pd));
Ok(ProducerIdResponse {
id: producer,
epoch,
..Default::default()
})
}
(producer, epoch) => {
error!(?producer, ?epoch);
Ok(ProducerIdResponse {
id: -1,
epoch: -1,
error: ErrorCode::UnknownServerError,
})
}
}
})
.await
}
}
async fn txn_add_offsets(
&self,
_transaction_id: &str,
_producer_id: i64,
_producer_epoch: i16,
_group_id: &str,
) -> Result<ErrorCode> {
Ok(ErrorCode::None)
}
async fn txn_add_partitions(
&self,
partitions: TxnAddPartitionsRequest,
) -> Result<TxnAddPartitionsResponse> {
match partitions {
TxnAddPartitionsRequest::VersionZeroToThree {
transaction_id,
producer_id,
producer_epoch,
ref topics,
} => {
                self.meta
                    .with_mut(&self.object_store, |meta| {
                        // builds a result for every topic and partition in
                        // the request with the supplied error code
                        let results_with = |error_code: ErrorCode| {
                            topics
                                .iter()
                                .map(|topic| {
                                    AddPartitionsToTxnTopicResult::default()
                                        .name(topic.name.clone())
                                        .results_by_partition(Some(
                                            topic
                                                .partitions
                                                .as_deref()
                                                .unwrap_or(&[])
                                                .iter()
                                                .map(|partition_index| {
                                                    AddPartitionsToTxnPartitionResult::default()
                                                        .partition_index(*partition_index)
                                                        .partition_error_code(error_code.into())
                                                })
                                                .collect(),
                                        ))
                                })
                                .collect::<Vec<_>>()
                        };

                        let Some(transaction) = meta.transactions.get_mut(&transaction_id) else {
                            return Ok(TxnAddPartitionsResponse::VersionZeroToThree(results_with(
                                ErrorCode::TransactionalIdNotFound,
                            )));
                        };

                        if transaction.producer != producer_id {
                            return Ok(TxnAddPartitionsResponse::VersionZeroToThree(results_with(
                                ErrorCode::UnknownProducerId,
                            )));
                        }

                        let Some(mut current_epoch) = transaction.epochs.last_entry() else {
                            return Ok(TxnAddPartitionsResponse::VersionZeroToThree(results_with(
                                ErrorCode::ProducerFenced,
                            )));
                        };

                        if &producer_epoch != current_epoch.key() {
                            return Ok(TxnAddPartitionsResponse::VersionZeroToThree(results_with(
                                ErrorCode::ProducerFenced,
                            )));
                        }

                        let txn_detail = current_epoch.get_mut();

                        let mut results = vec![];

                        for topic in topics {
                            let mut results_by_partition = vec![];

                            for partition_index in topic.partitions.as_deref().unwrap_or(&[]) {
                                _ = txn_detail
                                    .produces
                                    .entry(topic.name.clone())
                                    .or_default()
                                    .entry(*partition_index)
                                    .or_default();

                                results_by_partition.push(
                                    AddPartitionsToTxnPartitionResult::default()
                                        .partition_index(*partition_index)
                                        .partition_error_code(ErrorCode::None.into()),
                                );
                            }

                            results.push(
                                AddPartitionsToTxnTopicResult::default()
                                    .name(topic.name.clone())
                                    .results_by_partition(Some(results_by_partition)),
                            )
                        }

                        txn_detail.started_at = Some(SystemTime::now());
                        txn_detail.state = Some(TxnState::Begin);

                        Ok(TxnAddPartitionsResponse::VersionZeroToThree(results))
                    })
                    .await
}
TxnAddPartitionsRequest::VersionFourPlus { .. } => {
todo!()
}
}
}
async fn txn_offset_commit(
&self,
offsets: TxnOffsetCommitRequest,
) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
self.meta
.with_mut(&self.object_store, |meta| {
let Some(transaction) = meta.transactions.get_mut(&offsets.transaction_id) else {
return Self::txn_offset_commit_response_error(
&offsets,
ErrorCode::TransactionalIdNotFound,
);
};
if transaction.producer != offsets.producer_id {
return Self::txn_offset_commit_response_error(
&offsets,
ErrorCode::UnknownProducerId,
);
}
let Some(mut current_epoch) = transaction.epochs.last_entry() else {
return Self::txn_offset_commit_response_error(
&offsets,
ErrorCode::ProducerFenced,
);
};
if &offsets.producer_epoch != current_epoch.key() {
return Self::txn_offset_commit_response_error(
&offsets,
ErrorCode::ProducerFenced,
);
}
let txn_detail = current_epoch.get_mut();
let mut responses = vec![];
for topic in &offsets.topics {
let mut partition_responses = vec![];
if let Some(partitions) = topic.partitions.as_deref() {
for partition in partitions {
_ = txn_detail
.offsets
.entry(offsets.group_id.clone())
.or_default()
.entry(topic.name.clone())
.or_default()
.insert(
partition.partition_index,
TxnCommitOffset {
committed_offset: partition.committed_offset,
leader_epoch: partition.committed_leader_epoch,
metadata: partition.committed_metadata.clone(),
},
);
partition_responses.push(
TxnOffsetCommitResponsePartition::default()
.partition_index(partition.partition_index)
.error_code(ErrorCode::None.into()),
);
}
}
responses.push(
TxnOffsetCommitResponseTopic::default()
.name(topic.name.to_string())
.partitions(Some(partition_responses)),
);
}
Ok(responses)
})
.await
}
async fn txn_end(
&self,
transaction_id: &str,
producer_id: i64,
producer_epoch: i16,
committed: bool,
) -> Result<ErrorCode> {
let produced = self
.meta
.with_mut(&self.object_store, |meta| {
debug!(transactions = ?meta.transactions);
let Some(transaction) = meta.transactions.get_mut(transaction_id) else {
return Err(Error::Api(ErrorCode::TransactionalIdNotFound));
};
if transaction.producer != producer_id {
return Err(Error::Api(ErrorCode::UnknownProducerId));
}
let Some(mut current_epoch) = transaction.epochs.last_entry() else {
return Err(Error::Api(ErrorCode::ProducerFenced));
};
if &producer_epoch != current_epoch.key() {
return Err(Error::Api(ErrorCode::ProducerFenced));
}
let txn_detail = current_epoch.get_mut();
let mut produced = vec![];
if txn_detail.state == Some(TxnState::Begin) {
assert_eq!(
Some(TxnState::Begin),
txn_detail.state.replace(if committed {
TxnState::PrepareCommit
} else {
TxnState::PrepareAbort
})
);
for (topic, partitions) in &txn_detail.produces {
for (partition, offset_range) in partitions {
debug!(?topic, partition, ?offset_range);
if offset_range.is_some() {
produced.push(Topition::new(topic.to_owned(), *partition));
}
}
}
}
Ok(produced)
})
.await
.inspect(|produced| debug!(?produced))
.inspect_err(|err| error!(?err))?;
for topition in produced {
debug!(?topition);
let control_batch: Bytes = if committed {
ControlBatch::default().commit().try_into()?
} else {
ControlBatch::default().abort().try_into()?
};
let end_transaction_marker: Bytes = EndTransactionMarker::default().try_into()?;
let batch = inflated::Batch::builder()
.record(
Record::builder()
.key(control_batch.into())
.value(end_transaction_marker.into()),
)
.attributes(
BatchAttribute::default()
.control(true)
.transaction(true)
.into(),
)
.producer_id(producer_id)
.producer_epoch(producer_epoch)
.base_sequence(-1)
.build()
.and_then(TryInto::try_into)
.inspect(|deflated| debug!(?deflated))?;
_ = self
.produce(Some(transaction_id), &topition, batch)
.await
.inspect(|offset| {
debug!(
offset,
?topition,
producer_id,
producer_epoch,
transaction_id,
committed,
)
})
.inspect_err(|err| {
error!(
?err,
?topition,
producer_id,
producer_epoch,
transaction_id,
committed,
)
})?;
}
let offsets_to_commit = self
.meta
.with_mut(&self.object_store, |meta| {
debug!(transactions = ?meta.transactions);
let Some(transaction) = meta.transactions.get_mut(transaction_id) else {
return Err(Error::Api(ErrorCode::TransactionalIdNotFound));
};
if transaction.producer != producer_id {
return Err(Error::Api(ErrorCode::UnknownProducerId));
}
let Some(current_epoch) = transaction.epochs.last_entry() else {
return Err(Error::Api(ErrorCode::ProducerFenced));
};
if &producer_epoch != current_epoch.key() {
return Err(Error::Api(ErrorCode::ProducerFenced));
}
let mut overlaps =
meta.overlapping_transactions(transaction_id, producer_id, producer_epoch)?;
debug!(?overlaps);
let mut offsets_to_commit: BTreeMap<
Group,
BTreeMap<Topic, BTreeMap<Partition, TxnCommitOffset>>,
> = BTreeMap::new();
if overlaps.iter().all(|txn_id| txn_id.state.is_prepared()) {
let txn_ids = {
overlaps.push(TxnId {
transaction: transaction_id.into(),
producer_id,
producer_epoch,
state: if committed {
TxnState::PrepareCommit
} else {
TxnState::PrepareAbort
},
});
overlaps
};
for txn_id in txn_ids {
debug!(?txn_id);
if let Some(txn) = meta.transactions.get_mut(txn_id.transaction.as_str())
&& let Some(txn_detail) = txn.epochs.get_mut(&txn_id.producer_epoch)
{
debug!(?txn_detail);
match txn_detail.state {
None | Some(TxnState::PrepareCommit) => {
_ = txn_detail.state.replace(TxnState::Committed);
}
Some(TxnState::PrepareAbort) => {
_ = txn_detail.state.replace(TxnState::Aborted);
}
otherwise => {
warn!(
transaction = txn_id.transaction,
producer = txn_id.producer_id,
epoch = txn_id.producer_epoch,
?otherwise,
);
continue;
}
}
if txn_id.state == TxnState::PrepareCommit {
for (group, topics) in txn_detail.offsets.iter() {
for (topic, partitions) in topics.iter() {
for (partition, committed_offset) in partitions {
_ = offsets_to_commit
.entry(group.to_owned())
.or_default()
.entry(topic.to_owned())
.or_default()
.insert(*partition, committed_offset.to_owned());
}
}
}
}
txn_detail.produces.clear();
txn_detail.offsets.clear();
_ = txn_detail.started_at.take();
}
}
}
Ok(offsets_to_commit)
})
.await
.inspect(|outcome| debug!(?outcome))
.inspect_err(|err| error!(?err))?;
debug!(?offsets_to_commit);
for (group, topics) in offsets_to_commit.iter() {
let mut offsets = vec![];
for (topic, partitions) in topics.iter() {
for (partition, txn_co) in partitions {
let tp = Topition::new(topic.to_owned(), *partition);
let ocr = OffsetCommitRequest {
offset: txn_co.committed_offset,
leader_epoch: txn_co.leader_epoch,
timestamp: None,
metadata: txn_co.metadata.clone(),
};
offsets.push((tp, ocr));
}
}
_ = self.offset_commit(group, None, &offsets[..]).await?;
}
Ok(ErrorCode::None)
}
async fn maintain(&self, _now: SystemTime) -> Result<()> {
if let Some(ref lake) = self.lake {
return lake
.maintain()
.await
.inspect(|maintain| debug!(?maintain))
.inspect_err(|err| debug!(?err))
.map_err(Into::into);
}
Ok(())
}
async fn cluster_id(&self) -> Result<String> {
Ok(self.cluster.clone())
}
async fn node(&self) -> Result<i32> {
Ok(self.node)
}
async fn advertised_listener(&self) -> Result<Url> {
Ok(self.advertised_listener.clone())
}
#[instrument(skip_all)]
async fn delete_user_scram_credential(
&self,
_user: &str,
_mechanism: ScramMechanism,
) -> Result<()> {
Ok(())
}
async fn upsert_user_scram_credential(
&self,
_user: &str,
_mechanism: ScramMechanism,
_credential: ScramCredential,
) -> Result<()> {
Ok(())
}
async fn user_scram_credential(
&self,
_user: &str,
_mechanism: ScramMechanism,
) -> Result<Option<ScramCredential>> {
Ok(None)
}
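    /// Exercises the object store by starting a list from the root;
    /// the outcome is currently discarded.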
#[instrument(skip_all)]
async fn ping(&self) -> Result<()> {
let _ = self.object_store.list(Some(&Path::from("/"))).next().await;
Ok(())
}
}
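/// A stable label for an object store error, used as the `reason`
/// attribute on error metrics.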
fn object_store_error_name(error: &object_store::Error) -> &'static str {
match error {
object_store::Error::Precondition { .. } => "pre_condition",
object_store::Error::AlreadyExists { .. } => "already_exists",
object_store::Error::NotModified { .. } => "not_modified",
object_store::Error::NotFound { .. } => "not_found",
otherwise => {
debug!(?otherwise);
"otherwise"
}
}
}
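/// An [`ObjectStore`] decorator recording request durations and error
/// counts as OpenTelemetry metrics, labelled by method and cluster.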
#[derive(Debug, Clone)]
struct Metron<O> {
request_duration: Histogram<u64>,
request_error: Counter<u64>,
cluster: String,
object_store: O,
}
impl<O> Display for Metron<O> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Metron").finish()
}
}
impl<O> Metron<O>
where
O: ObjectStore,
{
fn new(object_store: O, cluster: &str) -> Self {
Self {
cluster: cluster.into(),
request_duration: METER
.u64_histogram("tansu_object_store_request_duration")
.with_unit("ms")
.with_description("The object store request latencies in milliseconds")
.build(),
request_error: METER
.u64_counter("tansu_object_store_request_error")
.with_description("The object store request errors")
.build(),
object_store,
}
}
}
#[async_trait]
impl<O> ObjectStore for Metron<O>
where
O: ObjectStore,
{
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult, object_store::Error> {
debug!(%location, ?opts);
let execute_start = SystemTime::now();
let mut attributes = vec![
KeyValue::new("method", "put_opts"),
KeyValue::new("cluster", self.cluster.clone()),
];
self.object_store
.put_opts(location, payload, opts.clone())
.await
.inspect(|put_result| {
debug!(%location, etag = ?put_result.e_tag, version = ?put_result.version);
self.request_duration.record(
execute_start
.elapsed()
.map_or(0, |duration| duration.as_millis() as u64),
attributes.as_ref(),
)
})
.inspect_err(|err| {
debug!(%location, opts = ?opts, err = ?err);
let mut additional = vec![KeyValue::new("reason", object_store_error_name(err))];
additional.append(&mut attributes);
self.request_error.add(1, &additional[..]);
})
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>, object_store::Error> {
debug!(%location, ?opts);
let execute_start = SystemTime::now();
let mut attributes = vec![
KeyValue::new("method", "put_multipart_opts"),
KeyValue::new("cluster", self.cluster.clone()),
];
self.object_store
.put_multipart_opts(location, opts)
.await
.inspect(|_put_result| {
self.request_duration.record(
execute_start
.elapsed()
.map_or(0, |duration| duration.as_millis() as u64),
attributes.as_ref(),
)
})
.inspect_err(|err| {
let mut additional = vec![KeyValue::new("reason", object_store_error_name(err))];
additional.append(&mut attributes);
self.request_error.add(1, &additional[..]);
})
}
#[instrument(skip_all, fields(%location), ret)]
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> Result<GetResult, object_store::Error> {
debug!(?options);
let execute_start = SystemTime::now();
let mut attributes = vec![
KeyValue::new("method", "get_opts"),
KeyValue::new("cluster", self.cluster.clone()),
];
self.object_store
.get_opts(location, options.clone())
.await
.inspect(|get_result| {
debug!(meta = ?get_result.meta);
self.request_duration.record(
execute_start
.elapsed()
.map_or(0, |duration| duration.as_millis() as u64),
attributes.as_ref(),
)
})
.inspect_err(|err| {
debug!(?err);
let mut additional = vec![KeyValue::new("reason", object_store_error_name(err))];
additional.append(&mut attributes);
self.request_error.add(1, &additional[..]);
})
}
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path, object_store::Error>>,
) -> BoxStream<'static, Result<Path, object_store::Error>> {
debug!("delete_stream");
let cluster = self.cluster.clone();
let request_duration = self.request_duration.clone();
let request_error = self.request_error.clone();
let inner = self.object_store.delete_stream(locations);
Box::pin(inner.inspect(move |result| {
let attributes = vec![
KeyValue::new("method", "delete_stream"),
KeyValue::new("cluster", cluster.clone()),
];
if let Err(err) = result {
let mut additional = vec![KeyValue::new("reason", object_store_error_name(err))];
additional.extend(attributes);
request_error.add(1, &additional[..]);
} else {
request_duration.record(0, attributes.as_ref());
}
}))
}
fn list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta, object_store::Error>> {
debug!(?prefix);
self.object_store.list(prefix)
}
async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> Result<ListResult, object_store::Error> {
debug!(?prefix);
let execute_start = SystemTime::now();
let mut attributes = vec![
KeyValue::new("method", "list_with_delimiter"),
KeyValue::new("cluster", self.cluster.clone()),
];
if let Some(prefix) = prefix {
attributes.push(KeyValue::new("prefix", prefix.to_string()));
}
self.object_store
.list_with_delimiter(prefix)
.await
.inspect(|_list_result| {
debug!(?prefix);
self.request_duration.record(
execute_start
.elapsed()
.map_or(0, |duration| duration.as_millis() as u64),
attributes.as_ref(),
)
})
.inspect_err(|err| {
debug!(?prefix, err = ?err);
let mut additional = vec![KeyValue::new("reason", object_store_error_name(err))];
additional.append(&mut attributes);
self.request_error.add(1, &additional[..]);
})
}
async fn copy_opts(
&self,
from: &Path,
to: &Path,
opts: CopyOptions,
) -> Result<(), object_store::Error> {
debug!(%from, %to, ?opts);
let execute_start = SystemTime::now();
let mut attributes = vec![
KeyValue::new("method", "copy_opts"),
KeyValue::new("cluster", self.cluster.clone()),
];
self.object_store
.copy_opts(from, to, opts)
.await
.inspect(|_| {
debug!(%from, %to);
self.request_duration.record(
execute_start
.elapsed()
.map_or(0, |duration| duration.as_millis() as u64),
attributes.as_ref(),
)
})
.inspect_err(|err| {
debug!(%from, %to, err = ?err);
let mut additional = vec![KeyValue::new("reason", object_store_error_name(err))];
additional.append(&mut attributes);
self.request_error.add(1, &additional[..]);
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn range_check() {
let map = BTreeMap::from([(3, "a"), (5, "b"), (8, "c")]);
assert_eq!(Some((&3, &"a")), map.range(2..).next());
assert_eq!(Some((&5, &"b")), map.range(4..).next());
assert_eq!(None, map.range(9..).next());
}
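    // a sketch of `Meta::produced` error handling: an unknown
    // transactional id maps to `ErrorCode::TransactionalIdNotFound`
    #[test]
    fn produced_unknown_transaction() {
        let meta = Meta::default();

        assert!(matches!(
            meta.produced("unknown", 54321, 0),
            Err(Error::Api(ErrorCode::TransactionalIdNotFound))
        ));
    }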
#[test]
fn schema_change() -> Result<()> {
#[derive(
Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
struct X0 {
low: Option<i64>,
high: Option<i64>,
}
let low = Some(6);
let high = Some(66);
let x0 = X0 { low, high };
let encoded = serde_json::to_string(&x0)?;
#[derive(
Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
struct X1 {
low: Option<i64>,
high: Option<i64>,
timestamps: Option<BTreeMap<i64, i64>>,
}
let x1: X1 = serde_json::from_str(&encoded[..])?;
assert_eq!(low, x1.low);
assert_eq!(high, x1.high);
assert!(x1.timestamps.is_none());
Ok(())
}
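    // a sketch of the `TxnDetail` to offset map conversion used when
    // calculating stable offsets: only partitions with a produced
    // offset range contribute, keyed at the range start
    #[test]
    fn txn_detail_produced_offsets() {
        let mut detail = TxnDetail::default();

        _ = detail.produces.entry("abc".into()).or_default().insert(
            0,
            Some(TxnProduceOffset {
                offset_start: 5,
                offset_end: 9,
            }),
        );

        _ = detail
            .produces
            .entry("abc".into())
            .or_default()
            .insert(1, None);

        let offsets = BTreeMap::<Topition, Offset>::from(&detail);

        assert_eq!(1, offsets.len());
        assert_eq!(Some(&5), offsets.get(&Topition::new("abc", 0)));
    }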
}