use std::{
collections::BTreeMap,
fmt::Debug,
sync::{Arc, LazyLock},
time::{Duration, SystemTime},
};
use async_trait::async_trait;
use nisshi_sans_io::{
ConfigResource, ErrorCode, IsolationLevel, ListOffset, ScramMechanism,
create_topics_request::CreatableTopic, delete_groups_response::DeletableGroupResult,
delete_records_request::DeleteRecordsTopic, delete_records_response::DeleteRecordsTopicResult,
describe_cluster_response::DescribeClusterBroker,
describe_configs_response::DescribeConfigsResult,
describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
incremental_alter_configs_request::AlterConfigsResource,
incremental_alter_configs_response::AlterConfigsResourceResponse,
list_groups_response::ListedGroup, record::deflated,
txn_offset_commit_response::TxnOffsetCommitResponseTopic,
};
use opentelemetry::{KeyValue, metrics::Histogram};
use tokio::sync::Semaphore;
use tracing::instrument;
use url::Url;
use uuid::Uuid;
use crate::{
BrokerRegistrationRequest, GroupDetail, ListOffsetResponse, METER, MetadataResponse,
NamedGroupDetail, OffsetCommitRequest, OffsetStage, ProducerIdResponse, Result,
ScramCredential, Storage, TopicId, Topition, TxnAddPartitionsRequest, TxnAddPartitionsResponse,
TxnOffsetCommitRequest, UpdateError, Version,
};
static SEMAPHORE_ACQUIRE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
METER
.u64_histogram("nisshi_storage_proxy_semaphore_acquire_duration")
.with_boundaries(
[
0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0,
1000.0,
]
.into(),
)
.with_unit("ms")
.with_description("Storage proxy semaphore acquisition duration in ms")
.build()
});
#[derive(Clone, Debug)]
pub(crate) struct SemaphoreProxy<G> {
storage: G,
semaphore: Arc<Semaphore>,
}
impl<G> SemaphoreProxy<G> {
pub(crate) fn new(storage: G) -> Self {
Self {
storage,
semaphore: Arc::new(Semaphore::new(1)),
}
}
}
fn elapsed_millis(start: SystemTime) -> u64 {
start
.elapsed()
.map_or(0, |duration| duration.as_millis() as u64)
}
#[async_trait]
impl<G> Storage for SemaphoreProxy<G>
where
G: Storage + Clone,
{
async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "register_broker")],
)
})?;
self.storage.register_broker(broker_registration).await
}
async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "create_topic")],
)
})?;
self.storage.create_topic(topic, validate_only).await
}
async fn incremental_alter_resource(
&self,
resource: AlterConfigsResource,
) -> Result<AlterConfigsResourceResponse> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "incremental_alter_resource")],
)
})?;
self.storage.incremental_alter_resource(resource).await
}
async fn delete_records(
&self,
topics: &[DeleteRecordsTopic],
) -> Result<Vec<DeleteRecordsTopicResult>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "delete_records")],
)
})?;
self.storage.delete_records(topics).await
}
async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "delete_topic")],
)
})?;
self.storage.delete_topic(topic).await
}
async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "brokers")],
)
})?;
self.storage.brokers().await
}
#[instrument(skip_all, fields(transaction_id, topic = topition.topic, partition = topition.partition))]
async fn produce(
&self,
transaction_id: Option<&str>,
topition: &Topition,
deflated: deflated::Batch,
) -> Result<i64> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "produce")],
)
})?;
self.storage
.produce(transaction_id, topition, deflated)
.await
}
async fn fetch(
&self,
topition: &'_ Topition,
offset: i64,
min_bytes: u32,
max_bytes: u32,
isolation: IsolationLevel,
max_wait: Duration,
) -> Result<Vec<deflated::Batch>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "fetch")],
)
})?;
self.storage
.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
.await
}
async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "offset_stage")],
)
})?;
self.storage.offset_stage(topition).await
}
async fn list_offsets(
&self,
isolation_level: IsolationLevel,
offsets: &[(Topition, ListOffset)],
) -> Result<Vec<(Topition, ListOffsetResponse)>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "list_offsets")],
)
})?;
self.storage.list_offsets(isolation_level, offsets).await
}
async fn offset_commit(
&self,
group_id: &str,
retention_time_ms: Option<Duration>,
offsets: &[(Topition, OffsetCommitRequest)],
) -> Result<Vec<(Topition, ErrorCode)>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "offset_commit")],
)
})?;
self.storage
.offset_commit(group_id, retention_time_ms, offsets)
.await
}
async fn offset_fetch(
&self,
group_id: Option<&str>,
topics: &[Topition],
require_stable: Option<bool>,
) -> Result<BTreeMap<Topition, i64>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "offset_fetch")],
)
})?;
self.storage
.offset_fetch(group_id, topics, require_stable)
.await
}
async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "committed_offset_topitions")],
)
})?;
self.storage.committed_offset_topitions(group_id).await
}
async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "metadata")],
)
})?;
self.storage.metadata(topics).await
}
async fn upsert_user_scram_credential(
&self,
user: &str,
mechanism: ScramMechanism,
credential: ScramCredential,
) -> Result<()> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "upsert_user_scram_credential")],
)
})?;
self.storage
.upsert_user_scram_credential(user, mechanism, credential)
.await
}
async fn delete_user_scram_credential(
&self,
user: &str,
mechanism: ScramMechanism,
) -> Result<()> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "delete_user_scram_credential")],
)
})?;
self.storage
.delete_user_scram_credential(user, mechanism)
.await
}
async fn user_scram_credential(
&self,
user: &str,
mechanism: ScramMechanism,
) -> Result<Option<ScramCredential>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "user_scram_credential")],
)
})?;
self.storage.user_scram_credential(user, mechanism).await
}
async fn describe_config(
&self,
name: &str,
resource: ConfigResource,
keys: Option<&[String]>,
) -> Result<DescribeConfigsResult> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "describe_config")],
)
})?;
self.storage.describe_config(name, resource, keys).await
}
async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "list_groups")],
)
})?;
self.storage.list_groups(states_filter).await
}
async fn delete_groups(
&self,
group_ids: Option<&[String]>,
) -> Result<Vec<DeletableGroupResult>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "delete_groups")],
)
})?;
self.storage.delete_groups(group_ids).await
}
async fn describe_groups(
&self,
group_ids: Option<&[String]>,
include_authorized_operations: bool,
) -> Result<Vec<NamedGroupDetail>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "describe_groups")],
)
})?;
self.storage
.describe_groups(group_ids, include_authorized_operations)
.await
}
async fn describe_topic_partitions(
&self,
topics: Option<&[TopicId]>,
partition_limit: i32,
cursor: Option<Topition>,
) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "describe_topic_partitions")],
)
})?;
self.storage
.describe_topic_partitions(topics, partition_limit, cursor)
.await
}
async fn update_group(
&self,
group_id: &str,
detail: GroupDetail,
version: Option<Version>,
) -> Result<Version, UpdateError<GroupDetail>> {
let start = SystemTime::now();
let _permit = self
.semaphore
.acquire()
.await
.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "update_group")],
)
})
.map_err(|err| UpdateError::Error(err.into()))?;
self.storage.update_group(group_id, detail, version).await
}
async fn init_producer(
&self,
transaction_id: Option<&str>,
transaction_timeout_ms: i32,
producer_id: Option<i64>,
producer_epoch: Option<i16>,
) -> Result<ProducerIdResponse> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "init_producer")],
)
})?;
self.storage
.init_producer(
transaction_id,
transaction_timeout_ms,
producer_id,
producer_epoch,
)
.await
}
async fn txn_add_offsets(
&self,
transaction_id: &str,
producer_id: i64,
producer_epoch: i16,
group_id: &str,
) -> Result<ErrorCode> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "txn_add_offsets")],
)
})?;
self.storage
.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
.await
}
async fn txn_add_partitions(
&self,
partitions: TxnAddPartitionsRequest,
) -> Result<TxnAddPartitionsResponse> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "txn_add_partitions")],
)
})?;
self.storage.txn_add_partitions(partitions).await
}
async fn txn_offset_commit(
&self,
offsets: TxnOffsetCommitRequest,
) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "txn_offset_commit")],
)
})?;
self.storage.txn_offset_commit(offsets).await
}
async fn txn_end(
&self,
transaction_id: &str,
producer_id: i64,
producer_epoch: i16,
committed: bool,
) -> Result<ErrorCode> {
let start = SystemTime::now();
let _permit = self.semaphore.acquire().await.inspect(|_| {
SEMAPHORE_ACQUIRE_DURATION.record(
elapsed_millis(start),
&[KeyValue::new("operation", "txn_end")],
)
})?;
self.storage
.txn_end(transaction_id, producer_id, producer_epoch, committed)
.await
}
async fn maintain(&self, now: SystemTime) -> Result<()> {
self.storage.maintain(now).await
}
async fn cluster_id(&self) -> Result<String> {
self.storage.cluster_id().await
}
async fn node(&self) -> Result<i32> {
self.storage.node().await
}
async fn advertised_listener(&self) -> Result<Url> {
self.storage.advertised_listener().await
}
async fn ping(&self) -> Result<()> {
self.storage.ping().await
}
}