1use async_trait::async_trait;
115use bytes::{Bytes, TryGetError};
116
117use console::Emoji;
118#[cfg(any(feature = "libsql", feature = "postgres"))]
119use deadpool::managed::PoolError;
120#[cfg(feature = "dynostore")]
121use dynostore::DynoStore;
122
123use glob::{GlobError, PatternError};
124
125use indicatif::{ProgressBar, ProgressStyle};
126#[cfg(feature = "dynostore")]
127use object_store::memory::InMemory;
128
129#[cfg(feature = "dynostore")]
130use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
131
132use opentelemetry::{
133 InstrumentationScope, KeyValue, global,
134 metrics::{Counter, Meter},
135};
136use opentelemetry_semantic_conventions::SCHEMA_URL;
137
138#[cfg(feature = "postgres")]
139use pg::Postgres;
140
141use regex::Regex;
142use serde::{Deserialize, Serialize};
143#[cfg(any(feature = "libsql", feature = "postgres"))]
144use std::error;
145use std::{
146 array::TryFromSliceError,
147 collections::BTreeMap,
148 ffi::OsString,
149 fmt::{self, Debug, Display, Formatter},
150 fs::DirEntry,
151 io,
152 marker::PhantomData,
153 num::{ParseIntError, TryFromIntError},
154 path::PathBuf,
155 result,
156 str::FromStr,
157 sync::{Arc, LazyLock, PoisonError},
158 time::{Duration, SystemTime, SystemTimeError},
159};
160use tansu_sans_io::{
161 Body, ConfigResource, ErrorCode, IsolationLevel, ListOffset, NULL_TOPIC_ID, ScramMechanism,
162 add_partitions_to_txn_request::{
163 AddPartitionsToTxnRequest, AddPartitionsToTxnTopic, AddPartitionsToTxnTransaction,
164 },
165 add_partitions_to_txn_response::{AddPartitionsToTxnResult, AddPartitionsToTxnTopicResult},
166 consumer_group_describe_response,
167 create_topics_request::CreatableTopic,
168 delete_groups_response::DeletableGroupResult,
169 delete_records_request::DeleteRecordsTopic,
170 delete_records_response::DeleteRecordsTopicResult,
171 delete_topics_request::DeleteTopicState,
172 describe_cluster_response::DescribeClusterBroker,
173 describe_configs_response::DescribeConfigsResult,
174 describe_groups_response,
175 describe_topic_partitions_request::{Cursor, TopicRequest},
176 describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
177 fetch_request::FetchTopic,
178 incremental_alter_configs_request::AlterConfigsResource,
179 incremental_alter_configs_response::AlterConfigsResourceResponse,
180 join_group_response::JoinGroupResponseMember,
181 list_groups_response::ListedGroup,
182 metadata_request::MetadataRequestTopic,
183 metadata_response::{MetadataResponseBroker, MetadataResponseTopic},
184 offset_commit_request::OffsetCommitRequestPartition,
185 record::deflated,
186 to_system_time, to_timestamp,
187 txn_offset_commit_request::TxnOffsetCommitRequestTopic,
188 txn_offset_commit_response::TxnOffsetCommitResponseTopic,
189};
190use tansu_schema::{Registry, lake::House};
191use tokio_util::sync::CancellationToken;
192use tracing::{debug, instrument};
193use tracing_subscriber::filter::ParseError;
194use url::Url;
195use uuid::Uuid;
196
197#[cfg(feature = "dynostore")]
198mod dynostore;
199
200mod null;
201
202#[cfg(feature = "postgres")]
203mod pg;
204
205mod proxy;
206mod service;
207
208pub use service::{
209 AlterUserScramCredentialsService, ChannelRequestLayer, ChannelRequestService,
210 ConsumerGroupDescribeService, CreateAclsService, CreateTopicsService, DeleteGroupsService,
211 DeleteRecordsService, DeleteTopicsService, DescribeAclsService, DescribeClusterService,
212 DescribeConfigsService, DescribeGroupsService, DescribeTopicPartitionsService,
213 DescribeUserScramCredentialsService, FetchService, FindCoordinatorService,
214 GetTelemetrySubscriptionsService, IncrementalAlterConfigsService, InitProducerIdService,
215 ListGroupsService, ListOffsetsService, ListPartitionReassignmentsService, MetadataService,
216 ProduceService, Request, RequestChannelService, RequestLayer, RequestReceiver, RequestSender,
217 RequestService, RequestStorageService, Response, TxnAddOffsetsService, TxnAddPartitionService,
218 TxnOffsetCommitService, bounded_channel,
219};
220
221#[cfg(feature = "slatedb")]
222pub mod slate;
223
224#[cfg(any(feature = "libsql", feature = "postgres", feature = "turso"))]
225pub(crate) mod sql;
226
227#[cfg(feature = "libsql")]
228mod lite;
229
230#[cfg(feature = "dynostore")]
231mod os;
232
233#[cfg(feature = "turso")]
234mod limbo;
235
236#[derive(Clone, Debug, thiserror::Error)]
238pub enum Error {
239 Api(ErrorCode),
240
241 ChronoParse(#[from] chrono::ParseError),
242
243 #[cfg(any(feature = "postgres", feature = "libsql"))]
244 DeadPoolBuild(#[from] deadpool::managed::BuildError),
245
246 Decode(Bytes),
247
248 FeatureNotEnabled {
249 feature: String,
250 message: String,
251 },
252
253 Glob(Arc<GlobError>),
254 Io(Arc<io::Error>),
255 LessThanBaseOffset {
256 offset: i64,
257 base_offset: i64,
258 },
259 LessThanLastOffset {
260 offset: i64,
261 last_offset: Option<i64>,
262 },
263
264 #[cfg(feature = "libsql")]
265 LibSql(Arc<libsql::Error>),
266
267 LessThanMaxTime {
268 time: i64,
269 max_time: Option<i64>,
270 },
271 LessThanMinTime {
272 time: i64,
273 min_time: Option<i64>,
274 },
275 Message(String),
276 NoSuchEntry {
277 nth: u32,
278 },
279 NoSuchOffset(i64),
280 OsString(OsString),
281
282 #[cfg(any(feature = "dynostore", feature = "slatedb"))]
283 ObjectStore(Arc<object_store::Error>),
284
285 ParseFilter(Arc<ParseError>),
286 Pattern(Arc<PatternError>),
287 ParseInt(#[from] ParseIntError),
288 PhantomCached(),
289 Poison,
290
291 #[cfg(any(feature = "libsql", feature = "postgres"))]
292 Pool(Arc<Box<dyn error::Error + Send + Sync>>),
293
294 #[cfg(feature = "slatedb")]
295 Postcard(#[from] postcard::Error),
296
297 Regex(#[from] regex::Error),
298
299 SansIo(#[from] tansu_sans_io::Error),
300
301 Schema(Arc<tansu_schema::Error>),
302
303 Rustls(#[from] rustls::Error),
304
305 SegmentEmpty(Topition),
306
307 SegmentMissing {
308 topition: Topition,
309 offset: Option<i64>,
310 },
311
312 SerdeJson(Arc<serde_json::Error>),
313
314 #[cfg(feature = "slatedb")]
315 Slate(Arc<slatedb::Error>),
316
317 SystemTime(#[from] SystemTimeError),
318
319 #[cfg(feature = "postgres")]
320 TokioPostgres(Arc<tokio_postgres::error::Error>),
321 TryFromInt(#[from] TryFromIntError),
322 TryFromSlice(#[from] TryFromSliceError),
323
324 TryGet(Arc<TryGetError>),
325
326 #[cfg(feature = "turso")]
327 Turso(Arc<turso::Error>),
328
329 UnexpectedBody(Box<Body>),
330
331 UnexpectedServiceResponse(Box<Response>),
332
333 #[cfg(feature = "turso")]
334 UnexpectedValue(turso::Value),
335
336 UnknownCacheKey(String),
337
338 UnsupportedStorageUrl(Url),
339 UnexpectedAddPartitionsToTxnRequest(Box<AddPartitionsToTxnRequest>),
340 Url(#[from] url::ParseError),
341 UnknownTxnState(String),
342
343 Uuid(#[from] uuid::Error),
344
345 UnableToSend,
346 OneshotRecv,
347}
348
349impl Display for Error {
350 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
351 write!(f, "{self:?}")
352 }
353}
354
355impl From<TryGetError> for Error {
356 fn from(value: TryGetError) -> Self {
357 Self::TryGet(Arc::new(value))
358 }
359}
360
361impl<T> From<PoisonError<T>> for Error {
362 fn from(_value: PoisonError<T>) -> Self {
363 Self::Poison
364 }
365}
366
367#[cfg(any(feature = "libsql", feature = "postgres"))]
368impl<E> From<PoolError<E>> for Error
369where
370 E: error::Error + Send + Sync + 'static,
371{
372 fn from(value: PoolError<E>) -> Self {
373 Self::Pool(Arc::new(Box::new(value)))
374 }
375}
376
377#[cfg(feature = "libsql")]
378impl From<libsql::Error> for Error {
379 fn from(value: libsql::Error) -> Self {
380 Self::LibSql(Arc::new(value))
381 }
382}
383
384#[cfg(feature = "slatedb")]
385impl From<slatedb::Error> for Error {
386 fn from(value: slatedb::Error) -> Self {
387 Self::Slate(Arc::new(value))
388 }
389}
390
391#[cfg(feature = "turso")]
392impl From<turso::Error> for Error {
393 fn from(value: turso::Error) -> Self {
394 Self::Turso(Arc::new(value))
395 }
396}
397
398impl From<GlobError> for Error {
399 fn from(value: GlobError) -> Self {
400 Self::Glob(Arc::new(value))
401 }
402}
403
404impl From<io::Error> for Error {
405 fn from(value: io::Error) -> Self {
406 Self::Io(Arc::new(value))
407 }
408}
409
410#[cfg(any(feature = "dynostore", feature = "slatedb"))]
411impl From<Arc<object_store::Error>> for Error {
412 fn from(value: Arc<object_store::Error>) -> Self {
413 Self::ObjectStore(value)
414 }
415}
416
417#[cfg(any(feature = "dynostore", feature = "slatedb"))]
418impl From<object_store::Error> for Error {
419 fn from(value: object_store::Error) -> Self {
420 Self::from(Arc::new(value))
421 }
422}
423
424impl From<ParseError> for Error {
425 fn from(value: ParseError) -> Self {
426 Self::ParseFilter(Arc::new(value))
427 }
428}
429
430impl From<PatternError> for Error {
431 fn from(value: PatternError) -> Self {
432 Self::Pattern(Arc::new(value))
433 }
434}
435
436impl From<serde_json::Error> for Error {
437 fn from(value: serde_json::Error) -> Self {
438 Self::from(Arc::new(value))
439 }
440}
441
442impl From<Arc<serde_json::Error>> for Error {
443 fn from(value: Arc<serde_json::Error>) -> Self {
444 Self::SerdeJson(value)
445 }
446}
447
448#[cfg(feature = "postgres")]
449impl From<tokio_postgres::error::Error> for Error {
450 fn from(value: tokio_postgres::error::Error) -> Self {
451 Self::from(Arc::new(value))
452 }
453}
454
455#[cfg(feature = "postgres")]
456impl From<Arc<tokio_postgres::error::Error>> for Error {
457 fn from(value: Arc<tokio_postgres::error::Error>) -> Self {
458 Self::TokioPostgres(value)
459 }
460}
461
462impl From<tansu_schema::Error> for Error {
463 fn from(value: tansu_schema::Error) -> Self {
464 if let tansu_schema::Error::Api(error_code) = value {
465 Self::Api(error_code)
466 } else {
467 Self::Schema(Arc::new(value))
468 }
469 }
470}
471
472pub type Result<T, E = Error> = result::Result<T, E>;
473
474#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
478pub struct Topition {
479 topic: String,
480 partition: i32,
481}
482
483impl Topition {
484 pub fn new(topic: impl Into<String>, partition: i32) -> Self {
485 let topic = topic.into();
486 Self { topic, partition }
487 }
488
489 pub fn topic(&self) -> &str {
490 &self.topic
491 }
492
493 pub fn partition(&self) -> i32 {
494 self.partition
495 }
496}
497
498impl From<Cursor> for Topition {
499 fn from(value: Cursor) -> Self {
500 Self {
501 topic: value.topic_name,
502 partition: value.partition_index,
503 }
504 }
505}
506
507impl TryFrom<&DirEntry> for Topition {
508 type Error = Error;
509
510 fn try_from(value: &DirEntry) -> result::Result<Self, Self::Error> {
511 Regex::new(r"^(?<topic>.+)-(?<partition>\d{10})$")
512 .map_err(Into::into)
513 .and_then(|re| {
514 value
515 .file_name()
516 .into_string()
517 .map_err(Error::OsString)
518 .and_then(|ref file_name| {
519 re.captures(file_name)
520 .ok_or(Error::Message(format!("no captures for {file_name}")))
521 .and_then(|ref captures| {
522 let topic = captures
523 .name("topic")
524 .ok_or(Error::Message(format!("missing topic for {file_name}")))
525 .map(|s| s.as_str().to_owned())?;
526
527 let partition = captures
528 .name("partition")
529 .ok_or(Error::Message(format!(
530 "missing partition for: {file_name}"
531 )))
532 .map(|s| s.as_str())
533 .and_then(|s| str::parse(s).map_err(Into::into))?;
534
535 Ok(Self { topic, partition })
536 })
537 })
538 })
539 }
540}
541
542impl FromStr for Topition {
543 type Err = Error;
544
545 fn from_str(s: &str) -> result::Result<Self, Self::Err> {
546 i32::from_str(&s[s.len() - 10..])
547 .map(|partition| {
548 let topic = String::from(&s[..s.len() - 11]);
549
550 Self { topic, partition }
551 })
552 .map_err(Into::into)
553 }
554}
555
556impl From<&Topition> for PathBuf {
557 fn from(value: &Topition) -> Self {
558 let topic = value.topic.as_str();
559 let partition = value.partition;
560 PathBuf::from(format!("{topic}-{partition:0>10}"))
561 }
562}
563
564#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
568pub struct TopitionOffset {
569 topition: Topition,
570 offset: i64,
571}
572
573impl TopitionOffset {
574 pub fn new(topition: Topition, offset: i64) -> Self {
575 Self { topition, offset }
576 }
577
578 pub fn topition(&self) -> &Topition {
579 &self.topition
580 }
581
582 pub fn offset(&self) -> i64 {
583 self.offset
584 }
585}
586
587impl From<&TopitionOffset> for PathBuf {
588 fn from(value: &TopitionOffset) -> Self {
589 let offset = value.offset;
590 PathBuf::from(value.topition()).join(format!("{offset:0>20}"))
591 }
592}
593
594pub type ListOffsetRequest = ListOffset;
595
596#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
597pub struct ListOffsetResponse {
598 pub error_code: ErrorCode,
599 pub timestamp: Option<SystemTime>,
600 pub offset: Option<i64>,
601}
602
603impl Default for ListOffsetResponse {
604 fn default() -> Self {
605 Self {
606 error_code: ErrorCode::None,
607 timestamp: None,
608 offset: None,
609 }
610 }
611}
612
613impl ListOffsetResponse {
614 pub fn offset(&self) -> Option<i64> {
615 self.offset
616 }
617
618 pub fn timestamp(&self) -> Result<Option<i64>> {
619 self.timestamp.map_or(Ok(None), |system_time| {
620 to_timestamp(&system_time).map(Some).map_err(Into::into)
621 })
622 }
623
624 pub fn error_code(&self) -> ErrorCode {
625 self.error_code
626 }
627}
628
629#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
633pub struct OffsetCommitRequest {
634 offset: i64,
635 leader_epoch: Option<i32>,
636 timestamp: Option<SystemTime>,
637 metadata: Option<String>,
638}
639
640impl OffsetCommitRequest {
641 pub fn offset(self, offset: i64) -> Self {
642 Self { offset, ..self }
643 }
644}
645
646impl TryFrom<&OffsetCommitRequestPartition> for OffsetCommitRequest {
647 type Error = Error;
648
649 fn try_from(value: &OffsetCommitRequestPartition) -> Result<Self, Self::Error> {
650 value
651 .commit_timestamp
652 .map_or(Ok(None), |commit_timestamp| {
653 to_system_time(commit_timestamp)
654 .map(Some)
655 .map_err(Into::into)
656 })
657 .map(|timestamp| Self {
658 offset: value.committed_offset,
659 leader_epoch: value.committed_leader_epoch,
660 timestamp,
661 metadata: value.committed_metadata.clone(),
662 })
663 }
664}
665
666#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
670pub enum TopicId {
671 Name(String),
672 Id(Uuid),
673}
674
675impl FromStr for TopicId {
676 type Err = Error;
677
678 fn from_str(s: &str) -> result::Result<Self, Self::Err> {
679 Ok(Self::Name(s.into()))
680 }
681}
682
683impl From<&str> for TopicId {
684 fn from(value: &str) -> Self {
685 Self::Name(value.to_owned())
686 }
687}
688
689impl From<String> for TopicId {
690 fn from(value: String) -> Self {
691 Self::Name(value)
692 }
693}
694
695impl From<Uuid> for TopicId {
696 fn from(value: Uuid) -> Self {
697 Self::Id(value)
698 }
699}
700
701impl From<[u8; 16]> for TopicId {
702 fn from(value: [u8; 16]) -> Self {
703 Self::Id(Uuid::from_bytes(value))
704 }
705}
706
707impl From<&TopicId> for [u8; 16] {
708 fn from(value: &TopicId) -> Self {
709 match value {
710 TopicId::Id(id) => id.into_bytes(),
711 TopicId::Name(_) => NULL_TOPIC_ID,
712 }
713 }
714}
715
716impl From<&FetchTopic> for TopicId {
717 fn from(value: &FetchTopic) -> Self {
718 if let Some(ref name) = value.topic {
719 Self::Name(name.into())
720 } else if let Some(ref id) = value.topic_id {
721 Self::Id(Uuid::from_bytes(*id))
722 } else {
723 panic!("neither name nor uuid")
724 }
725 }
726}
727
728impl From<&MetadataRequestTopic> for TopicId {
729 fn from(value: &MetadataRequestTopic) -> Self {
730 if let Some(ref name) = value.name {
731 Self::Name(name.into())
732 } else if let Some(ref id) = value.topic_id {
733 Self::Id(Uuid::from_bytes(*id))
734 } else {
735 panic!("neither name nor uuid")
736 }
737 }
738}
739
740impl From<DeleteTopicState> for TopicId {
741 fn from(value: DeleteTopicState) -> Self {
742 match value {
743 DeleteTopicState {
744 name: Some(name),
745 topic_id,
746 ..
747 } if topic_id == NULL_TOPIC_ID => name.into(),
748
749 DeleteTopicState { topic_id, .. } => topic_id.into(),
750 }
751 }
752}
753
754impl From<&TopicRequest> for TopicId {
755 fn from(value: &TopicRequest) -> Self {
756 value.name.to_owned().into()
757 }
758}
759
760impl From<&Topition> for TopicId {
761 fn from(value: &Topition) -> Self {
762 value.topic.to_owned().into()
763 }
764}
765
766#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
770pub struct BrokerRegistrationRequest {
771 pub broker_id: i32,
772 pub cluster_id: String,
773 pub incarnation_id: Uuid,
774 pub rack: Option<String>,
775}
776
777#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
778pub struct MetadataResponse {
779 cluster: Option<String>,
780 controller: Option<i32>,
781 brokers: Vec<MetadataResponseBroker>,
782 topics: Vec<MetadataResponseTopic>,
783}
784
785impl MetadataResponse {
786 pub fn cluster(&self) -> Option<&str> {
787 self.cluster.as_deref()
788 }
789
790 pub fn controller(&self) -> Option<i32> {
791 self.controller
792 }
793
794 pub fn brokers(&self) -> &[MetadataResponseBroker] {
795 self.brokers.as_ref()
796 }
797
798 pub fn topics(&self) -> &[MetadataResponseTopic] {
799 self.topics.as_ref()
800 }
801}
802
803#[derive(
807 Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
808)]
809pub struct OffsetStage {
810 last_stable: i64,
811 high_watermark: i64,
812 log_start: i64,
813}
814
815impl OffsetStage {
816 pub fn last_stable(&self) -> i64 {
817 self.last_stable
818 }
819
820 pub fn high_watermark(&self) -> i64 {
821 self.high_watermark
822 }
823
824 pub fn log_start(&self) -> i64 {
825 self.log_start
826 }
827}
828
829#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
831pub struct GroupMember {
832 pub join_response: JoinGroupResponseMember,
833 pub last_contact: Option<SystemTime>,
834}
835
836#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
840pub enum GroupState {
841 Forming {
842 protocol_type: Option<String>,
843 protocol_name: Option<String>,
844 leader: Option<String>,
845 },
846
847 Formed {
848 protocol_type: String,
849 protocol_name: String,
850 leader: String,
851 assignments: BTreeMap<String, Bytes>,
852 },
853}
854
855impl GroupState {
856 pub fn protocol_type(&self) -> Option<String> {
857 match self {
858 Self::Forming { protocol_type, .. } => protocol_type.clone(),
859 Self::Formed { protocol_type, .. } => Some(protocol_type.clone()),
860 }
861 }
862
863 pub fn protocol_name(&self) -> Option<String> {
864 match self {
865 Self::Forming { protocol_name, .. } => protocol_name.clone(),
866 Self::Formed { protocol_name, .. } => Some(protocol_name.clone()),
867 }
868 }
869
870 pub fn leader(&self) -> Option<String> {
871 match self {
872 Self::Forming { leader, .. } => leader.clone(),
873 Self::Formed { leader, .. } => Some(leader.clone()),
874 }
875 }
876
877 pub fn assignments(&self) -> BTreeMap<String, Bytes> {
878 match self {
879 Self::Forming { .. } => BTreeMap::new(),
880 Self::Formed { assignments, .. } => assignments.clone(),
881 }
882 }
883}
884
885impl Default for GroupState {
886 fn default() -> Self {
887 Self::Forming {
888 protocol_type: None,
889 protocol_name: Some("".into()),
890 leader: None,
891 }
892 }
893}
894
895#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
899pub enum ConsumerGroupState {
900 Unknown,
901 PreparingRebalance,
902 CompletingRebalance,
903 Stable,
904 Dead,
905 Empty,
906 Assigning,
907 Reconciling,
908}
909
910impl Display for ConsumerGroupState {
911 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
912 match self {
913 Self::Unknown => f.write_str("Unknown"),
914 Self::PreparingRebalance => f.write_str("PreparingRebalance"),
915 Self::CompletingRebalance => f.write_str("CompletingRebalance"),
916 Self::Stable => f.write_str("Stable"),
917 Self::Dead => f.write_str("Dead"),
918 Self::Empty => f.write_str("Empty"),
919 Self::Assigning => f.write_str("Assigning"),
920 Self::Reconciling => f.write_str("Reconciling"),
921 }
922 }
923}
924
925#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
929pub struct GroupDetail {
930 pub session_timeout_ms: i32,
931 pub rebalance_timeout_ms: Option<i32>,
932 pub members: BTreeMap<String, GroupMember>,
933 pub generation_id: i32,
934 pub skip_assignment: Option<bool>,
935 pub inception: SystemTime,
936 pub state: GroupState,
937}
938
939impl Default for GroupDetail {
940 fn default() -> Self {
941 Self {
942 session_timeout_ms: 45_000,
943 rebalance_timeout_ms: None,
944 members: BTreeMap::new(),
945 generation_id: -1,
946 skip_assignment: Some(false),
947 inception: SystemTime::now(),
948 state: GroupState::default(),
949 }
950 }
951}
952
953impl From<&GroupDetail> for ConsumerGroupState {
954 fn from(value: &GroupDetail) -> Self {
955 match value {
956 GroupDetail { members, .. } if members.is_empty() => Self::Empty,
957
958 GroupDetail {
959 state: GroupState::Forming { leader: None, .. },
960 ..
961 } => Self::Assigning,
962
963 GroupDetail {
964 state: GroupState::Formed { .. },
965 ..
966 } => Self::Stable,
967
968 _ => {
969 debug!(unknown = ?value);
970 Self::Unknown
971 }
972 }
973 }
974}
975
976impl From<&GroupDetail> for consumer_group_describe_response::DescribedGroup {
977 fn from(value: &GroupDetail) -> Self {
978 let assignor_name = match value.state {
979 GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
980 GroupState::Formed { ref leader, .. } => leader.clone(),
981 };
982
983 let group_state = ConsumerGroupState::from(value).to_string();
984
985 Self::default()
986 .error_code(ErrorCode::None.into())
987 .error_message(Some(ErrorCode::None.to_string()))
988 .group_id(Default::default())
989 .group_state(group_state)
990 .group_epoch(-1)
991 .assignment_epoch(-1)
992 .assignor_name(assignor_name)
993 .members(Some([].into()))
994 .authorized_operations(-1)
995 }
996}
997
998#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
999pub enum GroupDetailResponse {
1000 ErrorCode(ErrorCode),
1001 Found(GroupDetail),
1002}
1003
1004#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1009pub struct NamedGroupDetail {
1010 name: String,
1011 response: GroupDetailResponse,
1012}
1013
1014impl NamedGroupDetail {
1015 pub fn error_code(name: String, error_code: ErrorCode) -> Self {
1016 Self {
1017 name,
1018 response: GroupDetailResponse::ErrorCode(error_code),
1019 }
1020 }
1021
1022 pub fn found(name: String, found: GroupDetail) -> Self {
1023 Self {
1024 name,
1025 response: GroupDetailResponse::Found(found),
1026 }
1027 }
1028}
1029
1030impl From<&NamedGroupDetail> for consumer_group_describe_response::DescribedGroup {
1031 fn from(value: &NamedGroupDetail) -> Self {
1032 match value {
1033 NamedGroupDetail {
1034 name,
1035 response: GroupDetailResponse::Found(group_detail),
1036 } => {
1037 let assignor_name = match group_detail.state {
1038 GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
1039 GroupState::Formed { ref leader, .. } => leader.clone(),
1040 };
1041
1042 let group_state = ConsumerGroupState::from(group_detail).to_string();
1043
1044 Self::default()
1045 .error_code(ErrorCode::None.into())
1046 .error_message(Some(ErrorCode::None.to_string()))
1047 .group_id(name.into())
1048 .group_state(group_state)
1049 .group_epoch(-1)
1050 .assignment_epoch(-1)
1051 .assignor_name(assignor_name)
1052 .members(Some([].into()))
1053 .authorized_operations(-1)
1054 }
1055
1056 NamedGroupDetail {
1057 name,
1058 response: GroupDetailResponse::ErrorCode(error_code),
1059 } => Self::default()
1060 .error_code((*error_code).into())
1061 .error_message(Some(error_code.to_string()))
1062 .group_id(name.into())
1063 .group_state("Unknown".into())
1064 .group_epoch(-1)
1065 .assignment_epoch(-1)
1066 .assignor_name("".into())
1067 .members(Some([].into()))
1068 .authorized_operations(-1),
1069 }
1070 }
1071}
1072
1073impl From<&NamedGroupDetail> for describe_groups_response::DescribedGroup {
1074 fn from(value: &NamedGroupDetail) -> Self {
1075 match value {
1076 NamedGroupDetail {
1077 name,
1078 response: GroupDetailResponse::Found(group_detail),
1079 } => {
1080 let group_state = ConsumerGroupState::from(group_detail).to_string();
1081
1082 let members = group_detail
1083 .members
1084 .keys()
1085 .map(|member_id| {
1086 describe_groups_response::DescribedGroupMember::default()
1087 .member_id(member_id.into())
1088 .group_instance_id(None)
1089 .client_id("".into())
1090 .client_host("".into())
1091 .member_metadata(Bytes::new())
1092 .member_assignment(Bytes::new())
1093 })
1094 .collect::<Vec<_>>();
1095
1096 Self::default()
1097 .error_code(ErrorCode::None.into())
1098 .group_id(name.clone())
1099 .group_state(group_state)
1100 .protocol_type(group_detail.state.protocol_type().unwrap_or_default())
1101 .protocol_data("".into())
1102 .members(Some(members))
1103 .authorized_operations(Some(-1))
1104 }
1105
1106 NamedGroupDetail {
1107 name,
1108 response: GroupDetailResponse::ErrorCode(error_code),
1109 } => Self::default()
1110 .error_code((*error_code).into())
1111 .group_id(name.clone())
1112 .group_state("Unknown".into())
1113 .protocol_type("".into())
1114 .protocol_data("".into())
1115 .members(Some(vec![]))
1116 .authorized_operations(Some(-1)),
1117 }
1118 }
1119}
1120
1121#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1123pub struct TopitionDetail {
1124 error: ErrorCode,
1125 topic: TopicId,
1126 partitions: Option<Vec<PartitionDetail>>,
1127}
1128
1129#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1131pub struct PartitionDetail {
1132 error: ErrorCode,
1133 partition_index: i32,
1134}
1135
1136#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1138pub struct Version {
1139 e_tag: Option<String>,
1140 version: Option<String>,
1141}
1142
1143impl From<&Uuid> for Version {
1144 fn from(value: &Uuid) -> Self {
1145 Self {
1146 e_tag: Some(value.to_string()),
1147 version: None,
1148 }
1149 }
1150}
1151
1152#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1154pub struct ProducerIdResponse {
1155 pub error: ErrorCode,
1156 pub id: i64,
1157 pub epoch: i16,
1158}
1159
1160impl Default for ProducerIdResponse {
1161 fn default() -> Self {
1162 Self {
1163 error: ErrorCode::None,
1164 id: 1,
1165 epoch: 0,
1166 }
1167 }
1168}
1169
1170#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1173pub enum TxnAddPartitionsRequest {
1174 VersionZeroToThree {
1175 transaction_id: String,
1176 producer_id: i64,
1177 producer_epoch: i16,
1178 topics: Vec<AddPartitionsToTxnTopic>,
1179 },
1180
1181 VersionFourPlus {
1182 transactions: Vec<AddPartitionsToTxnTransaction>,
1183 },
1184}
1185
1186impl TryFrom<AddPartitionsToTxnRequest> for TxnAddPartitionsRequest {
1187 type Error = Error;
1188
1189 fn try_from(value: AddPartitionsToTxnRequest) -> result::Result<Self, Self::Error> {
1190 match value {
1191 AddPartitionsToTxnRequest {
1192 transactions: None,
1193 v_3_and_below_transactional_id: Some(transactional_id),
1194 v_3_and_below_producer_id: Some(producer_id),
1195 v_3_and_below_producer_epoch: Some(producer_epoch),
1196 v_3_and_below_topics: Some(topics),
1197 ..
1198 } => Ok(Self::VersionZeroToThree {
1199 transaction_id: transactional_id,
1200 producer_id,
1201 producer_epoch,
1202 topics,
1203 }),
1204
1205 AddPartitionsToTxnRequest {
1206 transactions: Some(transactions),
1207 v_3_and_below_transactional_id: None,
1208 v_3_and_below_producer_id: None,
1209 v_3_and_below_producer_epoch: None,
1210 v_3_and_below_topics: None,
1211 ..
1212 } => Ok(Self::VersionFourPlus { transactions }),
1213
1214 unexpected => Err(Error::UnexpectedAddPartitionsToTxnRequest(Box::new(
1215 unexpected,
1216 ))),
1217 }
1218 }
1219}
1220
1221#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1225pub enum TxnAddPartitionsResponse {
1226 VersionZeroToThree(Vec<AddPartitionsToTxnTopicResult>),
1227 VersionFourPlus(Vec<AddPartitionsToTxnResult>),
1228}
1229
1230impl TxnAddPartitionsResponse {
1231 pub fn zero_to_three(&self) -> &[AddPartitionsToTxnTopicResult] {
1232 match self {
1233 Self::VersionZeroToThree(result) => result.as_slice(),
1234 Self::VersionFourPlus(_) => &[][..],
1235 }
1236 }
1237
1238 pub fn four_plus(&self) -> &[AddPartitionsToTxnResult] {
1239 match self {
1240 Self::VersionZeroToThree(_) => &[][..],
1241 Self::VersionFourPlus(result) => result.as_slice(),
1242 }
1243 }
1244}
1245
1246#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1248pub struct TxnOffsetCommitRequest {
1249 pub transaction_id: String,
1250 pub group_id: String,
1251 pub producer_id: i64,
1252 pub producer_epoch: i16,
1253 pub generation_id: Option<i32>,
1254 pub member_id: Option<String>,
1255 pub group_instance_id: Option<String>,
1256 pub topics: Vec<TxnOffsetCommitRequestTopic>,
1257}
1258
1259#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1261pub enum TxnState {
1262 Begin,
1263 PrepareCommit,
1264 PrepareAbort,
1265 Committed,
1266 Aborted,
1267}
1268
1269impl TxnState {
1270 pub fn is_prepared(&self) -> bool {
1271 match self {
1272 Self::PrepareAbort | Self::PrepareCommit => true,
1273 _otherwise => false,
1274 }
1275 }
1276}
1277
1278impl FromStr for TxnState {
1279 type Err = Error;
1280
1281 fn from_str(s: &str) -> Result<Self, Self::Err> {
1282 match s {
1283 "ABORTED" => Ok(TxnState::Aborted),
1284 "BEGIN" => Ok(TxnState::Begin),
1285 "COMMITTED" => Ok(TxnState::Committed),
1286 "PREPARE_ABORT" => Ok(TxnState::PrepareAbort),
1287 "PREPARE_COMMIT" => Ok(TxnState::PrepareCommit),
1288 otherwise => Err(Error::UnknownTxnState(otherwise.to_owned())),
1289 }
1290 }
1291}
1292
1293impl TryFrom<String> for TxnState {
1294 type Error = Error;
1295
1296 fn try_from(value: String) -> Result<Self, Self::Error> {
1297 Self::from_str(&value)
1298 }
1299}
1300
1301impl From<TxnState> for String {
1302 fn from(value: TxnState) -> Self {
1303 match value {
1304 TxnState::Begin => "BEGIN".into(),
1305 TxnState::PrepareCommit => "PREPARE_COMMIT".into(),
1306 TxnState::PrepareAbort => "PREPARE_ABORT".into(),
1307 TxnState::Committed => "COMMITTED".into(),
1308 TxnState::Aborted => "ABORTED".into(),
1309 }
1310 }
1311}
1312
1313#[async_trait]
1317pub trait Storage: Clone + Debug + Send + Sync + 'static {
1318 async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()>;
1320
1321 async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid>;
1323
1324 async fn incremental_alter_resource(
1326 &self,
1327 resource: AlterConfigsResource,
1328 ) -> Result<AlterConfigsResourceResponse>;
1329
1330 async fn delete_records(
1332 &self,
1333 topics: &[DeleteRecordsTopic],
1334 ) -> Result<Vec<DeleteRecordsTopicResult>>;
1335
1336 async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode>;
1338
1339 async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>>;
1341
1342 async fn produce(
1344 &self,
1345 transaction_id: Option<&str>,
1346 topition: &Topition,
1347 batch: deflated::Batch,
1348 ) -> Result<i64>;
1349
1350 async fn fetch(
1352 &self,
1353 topition: &'_ Topition,
1354 offset: i64,
1355 min_bytes: u32,
1356 max_bytes: u32,
1357 isolation: IsolationLevel,
1358 ) -> Result<Vec<deflated::Batch>>;
1359
1360 async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage>;
1362
1363 async fn list_offsets(
1365 &self,
1366 isolation_level: IsolationLevel,
1367 offsets: &[(Topition, ListOffset)],
1368 ) -> Result<Vec<(Topition, ListOffsetResponse)>>;
1369
1370 async fn offset_commit(
1372 &self,
1373 group_id: &str,
1374 retention_time_ms: Option<Duration>,
1375 offsets: &[(Topition, OffsetCommitRequest)],
1376 ) -> Result<Vec<(Topition, ErrorCode)>>;
1377
1378 async fn offset_fetch(
1380 &self,
1381 group_id: Option<&str>,
1382 topics: &[Topition],
1383 require_stable: Option<bool>,
1384 ) -> Result<BTreeMap<Topition, i64>>;
1385
1386 async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>>;
1388
1389 async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse>;
1391
1392 async fn upsert_user_scram_credential(
1393 &self,
1394 user: &str,
1395 mechanism: ScramMechanism,
1396 credential: ScramCredential,
1397 ) -> Result<()>;
1398
1399 async fn delete_user_scram_credential(
1400 &self,
1401 user: &str,
1402 mechanism: ScramMechanism,
1403 ) -> Result<()>;
1404
1405 async fn user_scram_credential(
1406 &self,
1407 user: &str,
1408 mechanism: ScramMechanism,
1409 ) -> Result<Option<ScramCredential>>;
1410
1411 async fn describe_config(
1413 &self,
1414 name: &str,
1415 resource: ConfigResource,
1416 keys: Option<&[String]>,
1417 ) -> Result<DescribeConfigsResult>;
1418
1419 async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>>;
1421
1422 async fn delete_groups(
1424 &self,
1425 group_ids: Option<&[String]>,
1426 ) -> Result<Vec<DeletableGroupResult>>;
1427
1428 async fn describe_groups(
1430 &self,
1431 group_ids: Option<&[String]>,
1432 include_authorized_operations: bool,
1433 ) -> Result<Vec<NamedGroupDetail>>;
1434
1435 async fn describe_topic_partitions(
1437 &self,
1438 topics: Option<&[TopicId]>,
1439 partition_limit: i32,
1440 cursor: Option<Topition>,
1441 ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>>;
1442
1443 async fn update_group(
1445 &self,
1446 group_id: &str,
1447 detail: GroupDetail,
1448 version: Option<Version>,
1449 ) -> Result<Version, UpdateError<GroupDetail>>;
1450
1451 async fn init_producer(
1453 &self,
1454 transaction_id: Option<&str>,
1455 transaction_timeout_ms: i32,
1456 producer_id: Option<i64>,
1457 producer_epoch: Option<i16>,
1458 ) -> Result<ProducerIdResponse>;
1459
1460 async fn txn_add_offsets(
1462 &self,
1463 transaction_id: &str,
1464 producer_id: i64,
1465 producer_epoch: i16,
1466 group_id: &str,
1467 ) -> Result<ErrorCode>;
1468
1469 async fn txn_add_partitions(
1471 &self,
1472 partitions: TxnAddPartitionsRequest,
1473 ) -> Result<TxnAddPartitionsResponse>;
1474
1475 async fn txn_offset_commit(
1477 &self,
1478 offsets: TxnOffsetCommitRequest,
1479 ) -> Result<Vec<TxnOffsetCommitResponseTopic>>;
1480
1481 async fn txn_end(
1483 &self,
1484 transaction_id: &str,
1485 producer_id: i64,
1486 producer_epoch: i16,
1487 committed: bool,
1488 ) -> Result<ErrorCode>;
1489
1490 async fn maintain(&self, _now: SystemTime) -> Result<()> {
1492 Ok(())
1493 }
1494
1495 async fn cluster_id(&self) -> Result<String>;
1496
1497 async fn node(&self) -> Result<i32>;
1498
1499 async fn advertised_listener(&self) -> Result<Url>;
1500
1501 async fn ping(&self) -> Result<()>;
1502}
1503
1504#[derive(Clone, Debug, thiserror::Error)]
1506pub enum UpdateError<T> {
1507 Error(#[from] Error),
1508
1509 MissingEtag,
1510
1511 Outdated { current: Box<T>, version: Version },
1512
1513 SerdeJson(Arc<serde_json::Error>),
1514
1515 Uuid(#[from] uuid::Error),
1516}
1517
1518#[cfg(feature = "libsql")]
1519impl<T> From<libsql::Error> for UpdateError<T> {
1520 fn from(value: libsql::Error) -> Self {
1521 Self::Error(Error::from(value))
1522 }
1523}
1524
1525#[cfg(feature = "turso")]
1526impl<T> From<turso::Error> for UpdateError<T> {
1527 fn from(value: turso::Error) -> Self {
1528 Self::Error(Error::from(value))
1529 }
1530}
1531
1532#[cfg(any(feature = "dynostore", feature = "slatedb"))]
1533impl<T> From<object_store::Error> for UpdateError<T> {
1534 fn from(value: object_store::Error) -> Self {
1535 Self::Error(Error::from(value))
1536 }
1537}
1538
1539impl<T> From<serde_json::Error> for UpdateError<T> {
1540 fn from(value: serde_json::Error) -> Self {
1541 Self::SerdeJson(Arc::new(value))
1542 }
1543}
1544
1545#[cfg(feature = "postgres")]
1546impl<T> From<tokio_postgres::error::Error> for UpdateError<T> {
1547 fn from(value: tokio_postgres::error::Error) -> Self {
1548 Self::Error(Error::from(value))
1549 }
1550}
1551
1552#[derive(Clone)]
1554#[cfg_attr(
1555 not(any(
1556 feature = "dynostore",
1557 feature = "libsql",
1558 feature = "postgres",
1559 feature = "slatedb",
1560 feature = "turso"
1561 )),
1562 allow(missing_copy_implementations)
1563)]
1564pub enum StorageContainer {
1565 Null(null::Engine),
1566
1567 #[cfg(feature = "postgres")]
1568 Postgres(Postgres),
1569
1570 #[cfg(feature = "dynostore")]
1571 DynoStore(DynoStore),
1572
1573 #[cfg(feature = "libsql")]
1574 Lite(lite::Engine),
1575
1576 #[cfg(feature = "slatedb")]
1577 Slate(slate::Engine),
1578
1579 #[cfg(feature = "turso")]
1580 Turso(limbo::Engine),
1581}
1582
1583impl Debug for StorageContainer {
1584 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1585 match self {
1586 Self::Null(_) => f.debug_tuple(stringify!(StorageContainer::Null)).finish(),
1587
1588 #[cfg(feature = "postgres")]
1589 Self::Postgres(_) => f
1590 .debug_tuple(stringify!(StorageContainer::Postgres))
1591 .finish(),
1592
1593 #[cfg(feature = "dynostore")]
1594 Self::DynoStore(_) => f
1595 .debug_tuple(stringify!(StorageContainer::DynoStore))
1596 .finish(),
1597
1598 #[cfg(feature = "libsql")]
1599 Self::Lite(_) => f.debug_tuple(stringify!(StorageContainer::Lite)).finish(),
1600
1601 #[cfg(feature = "slatedb")]
1602 Self::Slate(_) => f.debug_tuple(stringify!(StorageContainer::Slate)).finish(),
1603
1604 #[cfg(feature = "turso")]
1605 Self::Turso(_) => f.debug_tuple(stringify!(StorageContainer::Turso)).finish(),
1606 }
1607 }
1608}
1609
1610impl StorageContainer {
1611 pub fn builder() -> PhantomBuilder {
1612 PhantomBuilder::default()
1613 }
1614}
1615
1616#[derive(Clone, Debug, Default)]
1618pub struct Builder<N, C, A, S> {
1619 node_id: N,
1620 cluster_id: C,
1621 advertised_listener: A,
1622 storage: S,
1623 schema_registry: Option<Registry>,
1624 lake_house: Option<House>,
1625 silent: bool,
1626
1627 cancellation: CancellationToken,
1628}
1629
1630type PhantomBuilder =
1631 Builder<PhantomData<i32>, PhantomData<String>, PhantomData<Url>, PhantomData<Url>>;
1632
1633impl<N, C, A, S> Builder<N, C, A, S> {
1634 pub fn node_id(self, node_id: i32) -> Builder<i32, C, A, S> {
1635 Builder {
1636 node_id,
1637 cluster_id: self.cluster_id,
1638 advertised_listener: self.advertised_listener,
1639 storage: self.storage,
1640 schema_registry: self.schema_registry,
1641 lake_house: self.lake_house,
1642 silent: self.silent,
1643 cancellation: self.cancellation,
1644 }
1645 }
1646
1647 pub fn cluster_id(self, cluster_id: impl Into<String>) -> Builder<N, String, A, S> {
1648 Builder {
1649 node_id: self.node_id,
1650 cluster_id: cluster_id.into(),
1651 advertised_listener: self.advertised_listener,
1652 storage: self.storage,
1653 schema_registry: self.schema_registry,
1654 lake_house: self.lake_house,
1655 silent: self.silent,
1656 cancellation: self.cancellation,
1657 }
1658 }
1659
1660 pub fn advertised_listener(self, advertised_listener: impl Into<Url>) -> Builder<N, C, Url, S> {
1661 Builder {
1662 node_id: self.node_id,
1663 cluster_id: self.cluster_id,
1664 advertised_listener: advertised_listener.into(),
1665 storage: self.storage,
1666 schema_registry: self.schema_registry,
1667 lake_house: self.lake_house,
1668 silent: self.silent,
1669 cancellation: self.cancellation,
1670 }
1671 }
1672
1673 pub fn storage(self, storage: Url) -> Builder<N, C, A, Url> {
1674 debug!(%storage);
1675
1676 Builder {
1677 node_id: self.node_id,
1678 cluster_id: self.cluster_id,
1679 advertised_listener: self.advertised_listener,
1680 storage,
1681 schema_registry: self.schema_registry,
1682 lake_house: self.lake_house,
1683 silent: self.silent,
1684 cancellation: self.cancellation,
1685 }
1686 }
1687
1688 pub fn schema_registry(self, schema_registry: Option<Registry>) -> Self {
1689 _ = schema_registry
1690 .as_ref()
1691 .inspect(|schema_registry| debug!(?schema_registry));
1692
1693 Self {
1694 schema_registry,
1695 ..self
1696 }
1697 }
1698
1699 pub fn lake_house(self, lake_house: Option<House>) -> Self {
1700 _ = lake_house
1701 .as_ref()
1702 .inspect(|lake_house| debug!(?lake_house));
1703
1704 Self { lake_house, ..self }
1705 }
1706
1707 pub fn cancellation(self, cancellation: CancellationToken) -> Self {
1708 Self {
1709 cancellation,
1710 ..self
1711 }
1712 }
1713
1714 pub fn silent(self, silent: bool) -> Self {
1715 Self { silent, ..self }
1716 }
1717}
1718
1719impl Builder<i32, String, Url, Url> {
1720 pub async fn build(self) -> Result<StorageContainer> {
1721 let storage = match self.storage.scheme() {
1722 #[cfg(feature = "postgres")]
1723 "postgres" | "postgresql" => Postgres::builder(self.storage.to_string().as_str())
1724 .map(|builder| builder.cluster(self.cluster_id.as_str()))
1725 .map(|builder| builder.node(self.node_id))
1726 .map(|builder| builder.advertised_listener(self.advertised_listener.clone()))
1727 .map(|builder| builder.schemas(self.schema_registry))
1728 .map(|builder| builder.lake(self.lake_house.clone()))
1729 .map(|builder| builder.build())
1730 .map(StorageContainer::Postgres),
1731
1732 #[cfg(not(feature = "postgres"))]
1733 "postgres" | "postgresql" => Err(Error::FeatureNotEnabled {
1734 feature: "postgres".into(),
1735 message: self.storage.to_string(),
1736 }),
1737
1738 #[cfg(feature = "dynostore")]
1739 "s3" => {
1740 let bucket_name = self.storage.host_str().unwrap_or("tansu");
1741
1742 AmazonS3Builder::from_env()
1743 .with_bucket_name(bucket_name)
1744 .with_conditional_put(S3ConditionalPut::ETagMatch)
1745 .build()
1746 .map(|object_store| {
1747 DynoStore::new(self.cluster_id.as_str(), self.node_id, object_store)
1748 .advertised_listener(self.advertised_listener.clone())
1749 .schemas(self.schema_registry)
1750 .lake(self.lake_house.clone())
1751 })
1752 .map(StorageContainer::DynoStore)
1753 .map_err(Into::into)
1754 }
1755
1756 #[cfg(feature = "dynostore")]
1757 "memory" => Ok(StorageContainer::DynoStore(
1758 DynoStore::new(self.cluster_id.as_str(), self.node_id, InMemory::new())
1759 .advertised_listener(self.advertised_listener.clone())
1760 .schemas(self.schema_registry)
1761 .lake(self.lake_house.clone()),
1762 )),
1763
1764 #[cfg(not(feature = "dynostore"))]
1765 "s3" | "memory" => Err(Error::FeatureNotEnabled {
1766 feature: "dynostore".into(),
1767 message: self.storage.to_string(),
1768 }),
1769
1770 #[cfg(feature = "libsql")]
1771 "sqlite" => lite::Engine::builder()
1772 .storage(self.storage.clone())
1773 .node(self.node_id)
1774 .cluster(self.cluster_id.clone())
1775 .advertised_listener(self.advertised_listener.clone())
1776 .schemas(self.schema_registry)
1777 .lake(self.lake_house.clone())
1778 .cancellation(self.cancellation.clone())
1779 .build()
1780 .await
1781 .map(StorageContainer::Lite),
1782
1783 #[cfg(not(feature = "libsql"))]
1784 "sqlite" => Err(Error::FeatureNotEnabled {
1785 feature: "libsql".into(),
1786 message: self.storage.to_string(),
1787 }),
1788
1789 #[cfg(feature = "slatedb")]
1790 "slatedb" => {
1791 use slatedb::Db;
1792 use slatedb::object_store::{
1793 ObjectStore as SlateObjectStore,
1794 aws::{
1795 AmazonS3Builder as SlateS3Builder,
1796 S3ConditionalPut as SlateS3ConditionalPut,
1797 },
1798 memory::InMemory as SlateInMemory,
1799 };
1800
1801 let host = self.storage.host_str().unwrap_or("tansu");
1802 let db_path = format!("tansu-{}.slatedb", self.cluster_id);
1803
1804 let object_store: Arc<dyn SlateObjectStore> = if host == "memory" {
1806 Arc::new(SlateInMemory::new())
1807 } else {
1808 SlateS3Builder::from_env()
1810 .with_bucket_name(host)
1811 .with_conditional_put(SlateS3ConditionalPut::ETagMatch)
1812 .build()
1813 .map(Arc::new)
1814 .map_err(|e| Error::Message(e.to_string()))?
1815 };
1816
1817 Db::open(db_path, object_store)
1818 .await
1819 .map(Arc::new)
1820 .map(|db| {
1821 slate::Engine::builder()
1822 .cluster(self.cluster_id.clone())
1823 .node(self.node_id)
1824 .advertised_listener(self.advertised_listener.clone())
1825 .db(db)
1826 .schemas(self.schema_registry)
1827 .lake(self.lake_house)
1828 .build()
1829 })
1830 .map(StorageContainer::Slate)
1831 .map_err(Into::into)
1832 }
1833
1834 #[cfg(not(feature = "slatedb"))]
1835 "slatedb" => Err(Error::FeatureNotEnabled {
1836 feature: "slatedb".into(),
1837 message: self.storage.to_string(),
1838 }),
1839
1840 #[cfg(feature = "turso")]
1841 "turso" => limbo::Engine::builder()
1842 .storage(self.storage.clone())
1843 .node(self.node_id)
1844 .cluster(self.cluster_id.clone())
1845 .advertised_listener(self.advertised_listener.clone())
1846 .schemas(self.schema_registry)
1847 .lake(self.lake_house.clone())
1848 .build()
1849 .await
1850 .map(StorageContainer::Turso),
1851
1852 #[cfg(not(feature = "turso"))]
1853 "turso" => Err(Error::FeatureNotEnabled {
1854 feature: "turso".into(),
1855 message: self.storage.to_string(),
1856 }),
1857
1858 "null" => Ok(StorageContainer::Null(null::Engine::new(
1859 self.cluster_id.clone(),
1860 self.node_id,
1861 self.advertised_listener.clone(),
1862 ))),
1863
1864 #[cfg(not(any(
1865 feature = "dynostore",
1866 feature = "libsql",
1867 feature = "postgres",
1868 feature = "slatedb",
1869 feature = "turso"
1870 )))]
1871 _storage => Ok(StorageContainer::Null(null::Engine::new(
1872 self.cluster_id.clone(),
1873 self.node_id,
1874 self.advertised_listener.clone(),
1875 ))),
1876
1877 #[cfg(any(
1878 feature = "dynostore",
1879 feature = "libsql",
1880 feature = "postgres",
1881 feature = "slatedb",
1882 feature = "turso"
1883 ))]
1884 _unsupported => Err(Error::UnsupportedStorageUrl(self.storage.clone())),
1885 }?;
1886
1887 let pb = if self.silent {
1888 None
1889 } else {
1890 let pb = ProgressBar::new(1);
1891 pb.set_style(
1892 ProgressStyle::with_template("[{elapsed}] {bar:40.cyan/blue} {msg}")
1893 .unwrap()
1894 .progress_chars("##-"),
1895 );
1896
1897 pb.set_message("connecting to storage");
1898
1899 Some(pb)
1900 };
1901
1902 storage.ping().await?;
1903
1904 if let Some(pb) = pb {
1905 pb.inc(1);
1906 pb.finish_with_message(format!("{} connected to storage", Emoji("✅", ""),));
1907 }
1908
1909 Ok(storage)
1910 }
1911}
1912
1913pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
1914 global::meter_with_scope(
1915 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
1916 .with_version(env!("CARGO_PKG_VERSION"))
1917 .with_schema_url(SCHEMA_URL)
1918 .build(),
1919 )
1920});
1921
1922static STORAGE_CONTAINER_REQUESTS: LazyLock<Counter<u64>> = LazyLock::new(|| {
1923 METER
1924 .u64_counter("tansu_storage_container_requests")
1925 .with_description("tansu storage container requests")
1926 .build()
1927});
1928
1929static STORAGE_CONTAINER_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
1930 METER
1931 .u64_counter("tansu_storage_container_errors")
1932 .with_description("tansu storage container errors")
1933 .build()
1934});
1935
1936#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
1937pub struct ScramCredential {
1938 pub salt: Bytes,
1939 pub iterations: i32,
1940 pub stored_key: Bytes,
1941 pub server_key: Bytes,
1942}
1943
1944#[async_trait]
1945impl Storage for StorageContainer {
1946 #[instrument(skip_all)]
1947 async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
1948 let attributes = [KeyValue::new("method", "register_broker")];
1949
1950 match self {
1951 #[cfg(feature = "dynostore")]
1952 Self::DynoStore(engine) => engine.register_broker(broker_registration),
1953
1954 #[cfg(feature = "libsql")]
1955 Self::Lite(engine) => engine.register_broker(broker_registration),
1956
1957 Self::Null(engine) => engine.register_broker(broker_registration),
1958
1959 #[cfg(feature = "postgres")]
1960 Self::Postgres(engine) => engine.register_broker(broker_registration),
1961
1962 #[cfg(feature = "slatedb")]
1963 Self::Slate(engine) => engine.register_broker(broker_registration),
1964
1965 #[cfg(feature = "turso")]
1966 Self::Turso(engine) => engine.register_broker(broker_registration),
1967 }
1968 .await
1969 .inspect(|_| {
1970 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
1971 })
1972 .inspect_err(|_| {
1973 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
1974 })
1975 }
1976
1977 #[instrument(skip_all)]
1978 async fn incremental_alter_resource(
1979 &self,
1980 resource: AlterConfigsResource,
1981 ) -> Result<AlterConfigsResourceResponse> {
1982 let attributes = [KeyValue::new("method", "incremental_alter_resource")];
1983
1984 match self {
1985 #[cfg(feature = "dynostore")]
1986 Self::DynoStore(engine) => engine.incremental_alter_resource(resource),
1987
1988 #[cfg(feature = "libsql")]
1989 Self::Lite(engine) => engine.incremental_alter_resource(resource),
1990
1991 Self::Null(engine) => engine.incremental_alter_resource(resource),
1992
1993 #[cfg(feature = "postgres")]
1994 Self::Postgres(engine) => engine.incremental_alter_resource(resource),
1995
1996 #[cfg(feature = "slatedb")]
1997 Self::Slate(engine) => engine.incremental_alter_resource(resource),
1998
1999 #[cfg(feature = "turso")]
2000 Self::Turso(engine) => engine.incremental_alter_resource(resource),
2001 }
2002 .await
2003 .inspect(|_| {
2004 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2005 })
2006 .inspect_err(|_| {
2007 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2008 })
2009 }
2010
2011 #[instrument(skip_all)]
2012 async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
2013 let attributes = [KeyValue::new("method", "create_topic")];
2014
2015 match self {
2016 #[cfg(feature = "dynostore")]
2017 Self::DynoStore(engine) => engine.create_topic(topic, validate_only),
2018
2019 #[cfg(feature = "libsql")]
2020 Self::Lite(engine) => engine.create_topic(topic, validate_only),
2021
2022 Self::Null(engine) => engine.create_topic(topic, validate_only),
2023
2024 #[cfg(feature = "postgres")]
2025 Self::Postgres(engine) => engine.create_topic(topic, validate_only),
2026
2027 #[cfg(feature = "turso")]
2028 Self::Turso(engine) => engine.create_topic(topic, validate_only),
2029
2030 #[cfg(feature = "slatedb")]
2031 Self::Slate(engine) => engine.create_topic(topic, validate_only),
2032 }
2033 .await
2034 .inspect(|_| {
2035 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2036 })
2037 .inspect_err(|_| {
2038 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2039 })
2040 }
2041
2042 #[instrument(skip_all)]
2043 async fn delete_records(
2044 &self,
2045 topics: &[DeleteRecordsTopic],
2046 ) -> Result<Vec<DeleteRecordsTopicResult>> {
2047 let attributes = [KeyValue::new("method", "delete_records")];
2048
2049 match self {
2050 #[cfg(feature = "dynostore")]
2051 Self::DynoStore(engine) => engine.delete_records(topics),
2052
2053 #[cfg(feature = "libsql")]
2054 Self::Lite(engine) => engine.delete_records(topics),
2055
2056 Self::Null(engine) => engine.delete_records(topics),
2057
2058 #[cfg(feature = "postgres")]
2059 Self::Postgres(engine) => engine.delete_records(topics),
2060
2061 #[cfg(feature = "slatedb")]
2062 Self::Slate(engine) => engine.delete_records(topics),
2063
2064 #[cfg(feature = "turso")]
2065 Self::Turso(engine) => engine.delete_records(topics),
2066 }
2067 .await
2068 .inspect(|_| {
2069 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2070 })
2071 .inspect_err(|_| {
2072 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2073 })
2074 }
2075
2076 #[instrument(skip_all)]
2077 async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
2078 let attributes = [KeyValue::new("method", "delete_topic")];
2079
2080 match self {
2081 #[cfg(feature = "dynostore")]
2082 Self::DynoStore(engine) => engine.delete_topic(topic),
2083
2084 #[cfg(feature = "libsql")]
2085 Self::Lite(engine) => engine.delete_topic(topic),
2086
2087 Self::Null(engine) => engine.delete_topic(topic),
2088
2089 #[cfg(feature = "postgres")]
2090 Self::Postgres(engine) => engine.delete_topic(topic),
2091
2092 #[cfg(feature = "slatedb")]
2093 Self::Slate(engine) => engine.delete_topic(topic),
2094
2095 #[cfg(feature = "turso")]
2096 Self::Turso(engine) => engine.delete_topic(topic),
2097 }
2098 .await
2099 .inspect(|_| {
2100 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2101 })
2102 .inspect_err(|_| {
2103 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2104 })
2105 }
2106
2107 #[instrument(skip_all)]
2108 async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
2109 let attributes = [KeyValue::new("method", "brokers")];
2110
2111 match self {
2112 #[cfg(feature = "dynostore")]
2113 Self::DynoStore(engine) => engine.brokers(),
2114
2115 #[cfg(feature = "libsql")]
2116 Self::Lite(engine) => engine.brokers(),
2117
2118 Self::Null(engine) => engine.brokers(),
2119
2120 #[cfg(feature = "postgres")]
2121 Self::Postgres(engine) => engine.brokers(),
2122
2123 #[cfg(feature = "slatedb")]
2124 Self::Slate(engine) => engine.brokers(),
2125
2126 #[cfg(feature = "turso")]
2127 Self::Turso(engine) => engine.brokers(),
2128 }
2129 .await
2130 .inspect(|_| {
2131 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2132 })
2133 .inspect_err(|_| {
2134 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2135 })
2136 }
2137
2138 #[instrument(skip_all)]
2139 async fn produce(
2140 &self,
2141 transaction_id: Option<&str>,
2142 topition: &Topition,
2143 batch: deflated::Batch,
2144 ) -> Result<i64> {
2145 let attributes = [KeyValue::new("method", "produce")];
2146
2147 match self {
2148 #[cfg(feature = "dynostore")]
2149 Self::DynoStore(engine) => engine.produce(transaction_id, topition, batch),
2150
2151 #[cfg(feature = "libsql")]
2152 Self::Lite(engine) => engine.produce(transaction_id, topition, batch),
2153
2154 Self::Null(engine) => engine.produce(transaction_id, topition, batch),
2155
2156 #[cfg(feature = "postgres")]
2157 Self::Postgres(engine) => engine.produce(transaction_id, topition, batch),
2158
2159 #[cfg(feature = "slatedb")]
2160 Self::Slate(engine) => engine.produce(transaction_id, topition, batch),
2161
2162 #[cfg(feature = "turso")]
2163 Self::Turso(engine) => engine.produce(transaction_id, topition, batch),
2164 }
2165 .await
2166 .inspect(|_| {
2167 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2168 })
2169 .inspect_err(|_| {
2170 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2171 })
2172 }
2173
2174 #[instrument(skip_all)]
2175 async fn fetch(
2176 &self,
2177 topition: &'_ Topition,
2178 offset: i64,
2179 min_bytes: u32,
2180 max_bytes: u32,
2181 isolation: IsolationLevel,
2182 ) -> Result<Vec<deflated::Batch>> {
2183 let attributes = [KeyValue::new("method", "fetch")];
2184
2185 match self {
2186 #[cfg(feature = "dynostore")]
2187 Self::DynoStore(engine) => {
2188 engine.fetch(topition, offset, min_bytes, max_bytes, isolation)
2189 }
2190
2191 #[cfg(feature = "libsql")]
2192 Self::Lite(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2193
2194 Self::Null(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2195
2196 #[cfg(feature = "postgres")]
2197 Self::Postgres(engine) => {
2198 engine.fetch(topition, offset, min_bytes, max_bytes, isolation)
2199 }
2200
2201 #[cfg(feature = "slatedb")]
2202 Self::Slate(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2203
2204 #[cfg(feature = "turso")]
2205 Self::Turso(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2206 }
2207 .await
2208 .inspect(|_| {
2209 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2210 })
2211 .inspect_err(|_| {
2212 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2213 })
2214 }
2215
2216 #[instrument(skip_all)]
2217 async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
2218 let attributes = [KeyValue::new("method", "offset_stage")];
2219
2220 match self {
2221 #[cfg(feature = "dynostore")]
2222 Self::DynoStore(engine) => engine.offset_stage(topition),
2223
2224 #[cfg(feature = "libsql")]
2225 Self::Lite(engine) => engine.offset_stage(topition),
2226
2227 Self::Null(engine) => engine.offset_stage(topition),
2228
2229 #[cfg(feature = "postgres")]
2230 Self::Postgres(engine) => engine.offset_stage(topition),
2231
2232 #[cfg(feature = "slatedb")]
2233 Self::Slate(engine) => engine.offset_stage(topition),
2234
2235 #[cfg(feature = "turso")]
2236 Self::Turso(engine) => engine.offset_stage(topition),
2237 }
2238 .await
2239 .inspect(|_| {
2240 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2241 })
2242 .inspect_err(|_| {
2243 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2244 })
2245 }
2246
2247 #[instrument(skip_all)]
2248 async fn list_offsets(
2249 &self,
2250 isolation_level: IsolationLevel,
2251 offsets: &[(Topition, ListOffset)],
2252 ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
2253 let attributes = [KeyValue::new("method", "list_offsets")];
2254
2255 match self {
2256 #[cfg(feature = "dynostore")]
2257 Self::DynoStore(engine) => engine.list_offsets(isolation_level, offsets),
2258
2259 #[cfg(feature = "libsql")]
2260 Self::Lite(engine) => engine.list_offsets(isolation_level, offsets),
2261
2262 Self::Null(engine) => engine.list_offsets(isolation_level, offsets),
2263
2264 #[cfg(feature = "postgres")]
2265 Self::Postgres(engine) => engine.list_offsets(isolation_level, offsets),
2266
2267 #[cfg(feature = "slatedb")]
2268 Self::Slate(engine) => engine.list_offsets(isolation_level, offsets),
2269
2270 #[cfg(feature = "turso")]
2271 Self::Turso(engine) => engine.list_offsets(isolation_level, offsets),
2272 }
2273 .await
2274 .inspect(|_| {
2275 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2276 })
2277 .inspect_err(|_| {
2278 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2279 })
2280 }
2281
2282 #[instrument(skip_all)]
2283 async fn offset_commit(
2284 &self,
2285 group_id: &str,
2286 retention_time_ms: Option<Duration>,
2287 offsets: &[(Topition, OffsetCommitRequest)],
2288 ) -> Result<Vec<(Topition, ErrorCode)>> {
2289 let attributes = [KeyValue::new("method", "offset_commit")];
2290
2291 match self {
2292 #[cfg(feature = "dynostore")]
2293 Self::DynoStore(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2294
2295 #[cfg(feature = "libsql")]
2296 Self::Lite(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2297
2298 Self::Null(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2299
2300 #[cfg(feature = "postgres")]
2301 Self::Postgres(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2302
2303 #[cfg(feature = "slatedb")]
2304 Self::Slate(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2305
2306 #[cfg(feature = "turso")]
2307 Self::Turso(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2308 }
2309 .await
2310 .inspect(|_| {
2311 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2312 })
2313 .inspect_err(|_| {
2314 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2315 })
2316 }
2317
2318 #[instrument(skip_all)]
2319 async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
2320 let attributes = [KeyValue::new("method", "committed_offset_topitions")];
2321
2322 match self {
2323 #[cfg(feature = "dynostore")]
2324 Self::DynoStore(engine) => engine.committed_offset_topitions(group_id),
2325
2326 #[cfg(feature = "libsql")]
2327 Self::Lite(engine) => engine.committed_offset_topitions(group_id),
2328
2329 Self::Null(engine) => engine.committed_offset_topitions(group_id),
2330
2331 #[cfg(feature = "postgres")]
2332 Self::Postgres(engine) => engine.committed_offset_topitions(group_id),
2333
2334 #[cfg(feature = "slatedb")]
2335 Self::Slate(engine) => engine.committed_offset_topitions(group_id),
2336
2337 #[cfg(feature = "turso")]
2338 Self::Turso(engine) => engine.committed_offset_topitions(group_id),
2339 }
2340 .await
2341 .inspect(|_| {
2342 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2343 })
2344 .inspect_err(|_| {
2345 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2346 })
2347 }
2348
2349 #[instrument(skip_all)]
2350 async fn offset_fetch(
2351 &self,
2352 group_id: Option<&str>,
2353 topics: &[Topition],
2354 require_stable: Option<bool>,
2355 ) -> Result<BTreeMap<Topition, i64>> {
2356 let attributes = [KeyValue::new("method", "offset_fetch")];
2357
2358 match self {
2359 #[cfg(feature = "dynostore")]
2360 Self::DynoStore(engine) => engine.offset_fetch(group_id, topics, require_stable),
2361
2362 #[cfg(feature = "libsql")]
2363 Self::Lite(engine) => engine.offset_fetch(group_id, topics, require_stable),
2364
2365 Self::Null(engine) => engine.offset_fetch(group_id, topics, require_stable),
2366
2367 #[cfg(feature = "postgres")]
2368 Self::Postgres(engine) => engine.offset_fetch(group_id, topics, require_stable),
2369
2370 #[cfg(feature = "slatedb")]
2371 Self::Slate(engine) => engine.offset_fetch(group_id, topics, require_stable),
2372
2373 #[cfg(feature = "turso")]
2374 Self::Turso(engine) => engine.offset_fetch(group_id, topics, require_stable),
2375 }
2376 .await
2377 .inspect(|_| {
2378 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2379 })
2380 .inspect_err(|_| {
2381 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2382 })
2383 }
2384
2385 #[instrument(skip_all)]
2386 async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
2387 let attributes = [KeyValue::new("method", "metadata")];
2388
2389 match self {
2390 #[cfg(feature = "dynostore")]
2391 Self::DynoStore(engine) => engine.metadata(topics),
2392
2393 #[cfg(feature = "libsql")]
2394 Self::Lite(engine) => engine.metadata(topics),
2395
2396 Self::Null(engine) => engine.metadata(topics),
2397
2398 #[cfg(feature = "postgres")]
2399 Self::Postgres(engine) => engine.metadata(topics),
2400
2401 #[cfg(feature = "slatedb")]
2402 Self::Slate(engine) => engine.metadata(topics),
2403
2404 #[cfg(feature = "turso")]
2405 Self::Turso(engine) => engine.metadata(topics),
2406 }
2407 .await
2408 .inspect(|_| {
2409 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2410 })
2411 .inspect_err(|_| {
2412 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2413 })
2414 }
2415
2416 #[instrument(skip_all)]
2417 async fn describe_config(
2418 &self,
2419 name: &str,
2420 resource: ConfigResource,
2421 keys: Option<&[String]>,
2422 ) -> Result<DescribeConfigsResult> {
2423 let attributes = [KeyValue::new("method", "describe_config")];
2424
2425 match self {
2426 #[cfg(feature = "dynostore")]
2427 Self::DynoStore(engine) => engine.describe_config(name, resource, keys),
2428
2429 #[cfg(feature = "libsql")]
2430 Self::Lite(engine) => engine.describe_config(name, resource, keys),
2431
2432 Self::Null(engine) => engine.describe_config(name, resource, keys),
2433
2434 #[cfg(feature = "postgres")]
2435 Self::Postgres(engine) => engine.describe_config(name, resource, keys),
2436
2437 #[cfg(feature = "slatedb")]
2438 Self::Slate(engine) => engine.describe_config(name, resource, keys),
2439
2440 #[cfg(feature = "turso")]
2441 Self::Turso(engine) => engine.describe_config(name, resource, keys),
2442 }
2443 .await
2444 .inspect(|_| {
2445 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2446 })
2447 .inspect_err(|_| {
2448 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2449 })
2450 }
2451
2452 #[instrument(skip_all)]
2453 async fn describe_topic_partitions(
2454 &self,
2455 topics: Option<&[TopicId]>,
2456 partition_limit: i32,
2457 cursor: Option<Topition>,
2458 ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
2459 let attributes = [KeyValue::new("method", "describe_topic_partitions")];
2460
2461 match self {
2462 #[cfg(feature = "dynostore")]
2463 Self::DynoStore(engine) => {
2464 engine.describe_topic_partitions(topics, partition_limit, cursor)
2465 }
2466
2467 #[cfg(feature = "libsql")]
2468 Self::Lite(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
2469
2470 Self::Null(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
2471
2472 #[cfg(feature = "postgres")]
2473 Self::Postgres(engine) => {
2474 engine.describe_topic_partitions(topics, partition_limit, cursor)
2475 }
2476
2477 #[cfg(feature = "slatedb")]
2478 Self::Slate(engine) => {
2479 engine.describe_topic_partitions(topics, partition_limit, cursor)
2480 }
2481
2482 #[cfg(feature = "turso")]
2483 Self::Turso(engine) => {
2484 engine.describe_topic_partitions(topics, partition_limit, cursor)
2485 }
2486 }
2487 .await
2488 .inspect(|_| {
2489 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2490 })
2491 .inspect_err(|_| {
2492 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2493 })
2494 }
2495
2496 #[instrument(skip_all)]
2497 async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
2498 let attributes = [KeyValue::new("method", "list_groups")];
2499
2500 match self {
2501 #[cfg(feature = "dynostore")]
2502 Self::DynoStore(engine) => engine.list_groups(states_filter),
2503
2504 #[cfg(feature = "libsql")]
2505 Self::Lite(engine) => engine.list_groups(states_filter),
2506
2507 Self::Null(engine) => engine.list_groups(states_filter),
2508
2509 #[cfg(feature = "postgres")]
2510 Self::Postgres(engine) => engine.list_groups(states_filter),
2511
2512 #[cfg(feature = "slatedb")]
2513 Self::Slate(engine) => engine.list_groups(states_filter),
2514
2515 #[cfg(feature = "turso")]
2516 Self::Turso(engine) => engine.list_groups(states_filter),
2517 }
2518 .await
2519 .inspect(|_| {
2520 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2521 })
2522 .inspect_err(|_| {
2523 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2524 })
2525 }
2526
2527 #[instrument(skip_all)]
2528 async fn delete_groups(
2529 &self,
2530 group_ids: Option<&[String]>,
2531 ) -> Result<Vec<DeletableGroupResult>> {
2532 let attributes = [KeyValue::new("method", "delete_groups")];
2533
2534 match self {
2535 #[cfg(feature = "dynostore")]
2536 Self::DynoStore(engine) => engine.delete_groups(group_ids),
2537
2538 #[cfg(feature = "libsql")]
2539 Self::Lite(engine) => engine.delete_groups(group_ids),
2540
2541 Self::Null(engine) => engine.delete_groups(group_ids),
2542
2543 #[cfg(feature = "postgres")]
2544 Self::Postgres(engine) => engine.delete_groups(group_ids),
2545
2546 #[cfg(feature = "slatedb")]
2547 Self::Slate(engine) => engine.delete_groups(group_ids),
2548
2549 #[cfg(feature = "turso")]
2550 Self::Turso(engine) => engine.delete_groups(group_ids),
2551 }
2552 .await
2553 .inspect(|_| {
2554 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2555 })
2556 .inspect_err(|_| {
2557 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2558 })
2559 }
2560
2561 #[instrument(skip_all)]
2562 async fn describe_groups(
2563 &self,
2564 group_ids: Option<&[String]>,
2565 include_authorized_operations: bool,
2566 ) -> Result<Vec<NamedGroupDetail>> {
2567 let attributes = [KeyValue::new("method", "describe_groups")];
2568
2569 match self {
2570 #[cfg(feature = "dynostore")]
2571 Self::DynoStore(engine) => {
2572 engine.describe_groups(group_ids, include_authorized_operations)
2573 }
2574
2575 #[cfg(feature = "libsql")]
2576 Self::Lite(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2577
2578 Self::Null(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2579
2580 #[cfg(feature = "postgres")]
2581 Self::Postgres(engine) => {
2582 engine.describe_groups(group_ids, include_authorized_operations)
2583 }
2584
2585 #[cfg(feature = "slatedb")]
2586 Self::Slate(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2587
2588 #[cfg(feature = "turso")]
2589 Self::Turso(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2590 }
2591 .await
2592 .inspect(|_| {
2593 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2594 })
2595 .inspect_err(|_| {
2596 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2597 })
2598 }
2599
2600 #[instrument(skip_all)]
2601 async fn update_group(
2602 &self,
2603 group_id: &str,
2604 detail: GroupDetail,
2605 version: Option<Version>,
2606 ) -> Result<Version, UpdateError<GroupDetail>> {
2607 let attributes = [KeyValue::new("method", "update_group")];
2608
2609 match self {
2610 #[cfg(feature = "dynostore")]
2611 Self::DynoStore(engine) => engine.update_group(group_id, detail, version),
2612
2613 #[cfg(feature = "libsql")]
2614 Self::Lite(engine) => engine.update_group(group_id, detail, version),
2615
2616 Self::Null(engine) => engine.update_group(group_id, detail, version),
2617
2618 #[cfg(feature = "postgres")]
2619 Self::Postgres(engine) => engine.update_group(group_id, detail, version),
2620
2621 #[cfg(feature = "slatedb")]
2622 Self::Slate(engine) => engine.update_group(group_id, detail, version),
2623
2624 #[cfg(feature = "turso")]
2625 Self::Turso(engine) => engine.update_group(group_id, detail, version),
2626 }
2627 .await
2628 .inspect(|_| {
2629 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2630 })
2631 .inspect_err(|_| {
2632 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2633 })
2634 }
2635
2636 #[instrument(skip_all)]
2637 async fn init_producer(
2638 &self,
2639 transaction_id: Option<&str>,
2640 transaction_timeout_ms: i32,
2641 producer_id: Option<i64>,
2642 producer_epoch: Option<i16>,
2643 ) -> Result<ProducerIdResponse> {
2644 let attributes = [KeyValue::new("method", "init_producer")];
2645
2646 match self {
2647 #[cfg(feature = "dynostore")]
2648 Self::DynoStore(engine) => engine.init_producer(
2649 transaction_id,
2650 transaction_timeout_ms,
2651 producer_id,
2652 producer_epoch,
2653 ),
2654
2655 #[cfg(feature = "libsql")]
2656 Self::Lite(engine) => engine.init_producer(
2657 transaction_id,
2658 transaction_timeout_ms,
2659 producer_id,
2660 producer_epoch,
2661 ),
2662
2663 Self::Null(engine) => engine.init_producer(
2664 transaction_id,
2665 transaction_timeout_ms,
2666 producer_id,
2667 producer_epoch,
2668 ),
2669
2670 #[cfg(feature = "postgres")]
2671 Self::Postgres(engine) => engine.init_producer(
2672 transaction_id,
2673 transaction_timeout_ms,
2674 producer_id,
2675 producer_epoch,
2676 ),
2677
2678 #[cfg(feature = "slatedb")]
2679 Self::Slate(engine) => engine.init_producer(
2680 transaction_id,
2681 transaction_timeout_ms,
2682 producer_id,
2683 producer_epoch,
2684 ),
2685
2686 #[cfg(feature = "turso")]
2687 Self::Turso(engine) => engine.init_producer(
2688 transaction_id,
2689 transaction_timeout_ms,
2690 producer_id,
2691 producer_epoch,
2692 ),
2693 }
2694 .await
2695 .inspect(|_| {
2696 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2697 })
2698 .inspect_err(|_| {
2699 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2700 })
2701 }
2702
2703 #[instrument(skip_all)]
2704 async fn txn_add_offsets(
2705 &self,
2706 transaction_id: &str,
2707 producer_id: i64,
2708 producer_epoch: i16,
2709 group_id: &str,
2710 ) -> Result<ErrorCode> {
2711 let attributes = [KeyValue::new("method", "txn_add_offsets")];
2712
2713 match self {
2714 #[cfg(feature = "dynostore")]
2715 Self::DynoStore(engine) => {
2716 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2717 }
2718
2719 #[cfg(feature = "libsql")]
2720 Self::Lite(engine) => {
2721 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2722 }
2723
2724 Self::Null(engine) => {
2725 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2726 }
2727
2728 #[cfg(feature = "postgres")]
2729 Self::Postgres(engine) => {
2730 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2731 }
2732
2733 #[cfg(feature = "slatedb")]
2734 Self::Slate(engine) => {
2735 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2736 }
2737
2738 #[cfg(feature = "turso")]
2739 Self::Turso(engine) => {
2740 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2741 }
2742 }
2743 .await
2744 .inspect(|_| {
2745 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2746 })
2747 .inspect_err(|_| {
2748 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2749 })
2750 }
2751
2752 #[instrument(skip_all)]
2753 async fn txn_add_partitions(
2754 &self,
2755 partitions: TxnAddPartitionsRequest,
2756 ) -> Result<TxnAddPartitionsResponse> {
2757 let attributes = [KeyValue::new("method", "txn_add_partitions")];
2758
2759 match self {
2760 #[cfg(feature = "dynostore")]
2761 Self::DynoStore(engine) => engine.txn_add_partitions(partitions),
2762
2763 #[cfg(feature = "libsql")]
2764 Self::Lite(engine) => engine.txn_add_partitions(partitions),
2765
2766 Self::Null(engine) => engine.txn_add_partitions(partitions),
2767
2768 #[cfg(feature = "postgres")]
2769 Self::Postgres(engine) => engine.txn_add_partitions(partitions),
2770
2771 #[cfg(feature = "slatedb")]
2772 Self::Slate(engine) => engine.txn_add_partitions(partitions),
2773
2774 #[cfg(feature = "turso")]
2775 Self::Turso(engine) => engine.txn_add_partitions(partitions),
2776 }
2777 .await
2778 .inspect(|_| {
2779 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2780 })
2781 .inspect_err(|_| {
2782 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2783 })
2784 }
2785
2786 #[instrument(skip_all)]
2787 async fn txn_offset_commit(
2788 &self,
2789 offsets: TxnOffsetCommitRequest,
2790 ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
2791 let attributes = [KeyValue::new("method", "txn_offset_commit")];
2792
2793 match self {
2794 #[cfg(feature = "dynostore")]
2795 Self::DynoStore(engine) => engine.txn_offset_commit(offsets),
2796
2797 #[cfg(feature = "libsql")]
2798 Self::Lite(engine) => engine.txn_offset_commit(offsets),
2799
2800 Self::Null(engine) => engine.txn_offset_commit(offsets),
2801
2802 #[cfg(feature = "postgres")]
2803 Self::Postgres(engine) => engine.txn_offset_commit(offsets),
2804
2805 #[cfg(feature = "slatedb")]
2806 Self::Slate(engine) => engine.txn_offset_commit(offsets),
2807
2808 #[cfg(feature = "turso")]
2809 Self::Turso(engine) => engine.txn_offset_commit(offsets),
2810 }
2811 .await
2812 .inspect(|_| {
2813 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2814 })
2815 .inspect_err(|_| {
2816 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2817 })
2818 }
2819
2820 #[instrument(skip_all)]
2821 async fn txn_end(
2822 &self,
2823 transaction_id: &str,
2824 producer_id: i64,
2825 producer_epoch: i16,
2826 committed: bool,
2827 ) -> Result<ErrorCode> {
2828 let attributes = [KeyValue::new("method", "txn_end")];
2829
2830 match self {
2831 #[cfg(feature = "dynostore")]
2832 Self::DynoStore(engine) => {
2833 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2834 }
2835
2836 #[cfg(feature = "libsql")]
2837 Self::Lite(engine) => {
2838 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2839 }
2840
2841 Self::Null(engine) => {
2842 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2843 }
2844
2845 #[cfg(feature = "postgres")]
2846 Self::Postgres(engine) => {
2847 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2848 }
2849
2850 #[cfg(feature = "slatedb")]
2851 Self::Slate(engine) => {
2852 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2853 }
2854
2855 #[cfg(feature = "turso")]
2856 Self::Turso(engine) => {
2857 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2858 }
2859 }
2860 .await
2861 .inspect(|_| {
2862 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2863 })
2864 .inspect_err(|_| {
2865 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2866 })
2867 }
2868
2869 #[instrument(skip_all)]
2870 async fn maintain(&self, now: SystemTime) -> Result<()> {
2871 let attributes = [KeyValue::new("method", "maintain")];
2872
2873 match self {
2874 #[cfg(feature = "dynostore")]
2875 Self::DynoStore(engine) => engine.maintain(now),
2876
2877 #[cfg(feature = "libsql")]
2878 Self::Lite(engine) => engine.maintain(now),
2879
2880 Self::Null(engine) => engine.maintain(now),
2881
2882 #[cfg(feature = "postgres")]
2883 Self::Postgres(engine) => engine.maintain(now),
2884
2885 #[cfg(feature = "slatedb")]
2886 Self::Slate(engine) => engine.maintain(now),
2887
2888 #[cfg(feature = "turso")]
2889 Self::Turso(engine) => engine.maintain(now),
2890 }
2891 .await
2892 .inspect(|maintain| {
2893 debug!(?maintain);
2894 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2895 })
2896 .inspect_err(|err| {
2897 debug!(?err);
2898 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2899 })
2900 }
2901
2902 #[instrument(skip_all)]
2903 async fn cluster_id(&self) -> Result<String> {
2904 match self {
2905 #[cfg(feature = "dynostore")]
2906 Self::DynoStore(engine) => engine.cluster_id().await,
2907
2908 #[cfg(feature = "libsql")]
2909 Self::Lite(engine) => engine.cluster_id().await,
2910
2911 Self::Null(engine) => engine.cluster_id().await,
2912
2913 #[cfg(feature = "postgres")]
2914 Self::Postgres(engine) => engine.cluster_id().await,
2915
2916 #[cfg(feature = "slatedb")]
2917 Self::Slate(engine) => engine.cluster_id().await,
2918
2919 #[cfg(feature = "turso")]
2920 Self::Turso(engine) => engine.cluster_id().await,
2921 }
2922 }
2923
2924 #[instrument(skip_all)]
2925 async fn node(&self) -> Result<i32> {
2926 match self {
2927 #[cfg(feature = "dynostore")]
2928 Self::DynoStore(engine) => engine.node().await,
2929
2930 #[cfg(feature = "libsql")]
2931 Self::Lite(engine) => engine.node().await,
2932
2933 Self::Null(engine) => engine.node().await,
2934
2935 #[cfg(feature = "postgres")]
2936 Self::Postgres(engine) => engine.node().await,
2937
2938 #[cfg(feature = "slatedb")]
2939 Self::Slate(engine) => engine.node().await,
2940
2941 #[cfg(feature = "turso")]
2942 Self::Turso(engine) => engine.node().await,
2943 }
2944 }
2945
2946 #[instrument(skip_all)]
2947 async fn advertised_listener(&self) -> Result<Url> {
2948 match self {
2949 #[cfg(feature = "dynostore")]
2950 Self::DynoStore(engine) => engine.advertised_listener().await,
2951
2952 #[cfg(feature = "libsql")]
2953 Self::Lite(engine) => engine.advertised_listener().await,
2954
2955 Self::Null(engine) => engine.advertised_listener().await,
2956
2957 #[cfg(feature = "postgres")]
2958 Self::Postgres(engine) => engine.advertised_listener().await,
2959
2960 #[cfg(feature = "slatedb")]
2961 Self::Slate(engine) => engine.advertised_listener().await,
2962
2963 #[cfg(feature = "turso")]
2964 Self::Turso(engine) => engine.advertised_listener().await,
2965 }
2966 }
2967
2968 async fn delete_user_scram_credential(
2969 &self,
2970 user: &str,
2971 mechanism: ScramMechanism,
2972 ) -> Result<()> {
2973 match self {
2974 #[cfg(feature = "dynostore")]
2975 Self::DynoStore(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2976
2977 #[cfg(feature = "libsql")]
2978 Self::Lite(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2979
2980 Self::Null(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2981
2982 #[cfg(feature = "postgres")]
2983 Self::Postgres(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2984
2985 #[cfg(feature = "slatedb")]
2986 Self::Slate(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2987
2988 #[cfg(feature = "turso")]
2989 Self::Turso(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2990 }
2991 }
2992
2993 async fn upsert_user_scram_credential(
2994 &self,
2995 user: &str,
2996 mechanism: ScramMechanism,
2997 credential: ScramCredential,
2998 ) -> Result<()> {
2999 match self {
3000 #[cfg(feature = "dynostore")]
3001 Self::DynoStore(engine) => {
3002 engine
3003 .upsert_user_scram_credential(user, mechanism, credential)
3004 .await
3005 }
3006
3007 #[cfg(feature = "libsql")]
3008 Self::Lite(engine) => {
3009 engine
3010 .upsert_user_scram_credential(user, mechanism, credential)
3011 .await
3012 }
3013
3014 Self::Null(engine) => {
3015 engine
3016 .upsert_user_scram_credential(user, mechanism, credential)
3017 .await
3018 }
3019
3020 #[cfg(feature = "postgres")]
3021 Self::Postgres(engine) => {
3022 engine
3023 .upsert_user_scram_credential(user, mechanism, credential)
3024 .await
3025 }
3026
3027 #[cfg(feature = "slatedb")]
3028 Self::Slate(engine) => {
3029 engine
3030 .upsert_user_scram_credential(user, mechanism, credential)
3031 .await
3032 }
3033
3034 #[cfg(feature = "turso")]
3035 Self::Turso(engine) => {
3036 engine
3037 .upsert_user_scram_credential(user, mechanism, credential)
3038 .await
3039 }
3040 }
3041 }
3042
3043 async fn user_scram_credential(
3044 &self,
3045 user: &str,
3046 mechanism: ScramMechanism,
3047 ) -> Result<Option<ScramCredential>> {
3048 match self {
3049 #[cfg(feature = "dynostore")]
3050 Self::DynoStore(engine) => engine.user_scram_credential(user, mechanism).await,
3051
3052 #[cfg(feature = "libsql")]
3053 Self::Lite(engine) => engine.user_scram_credential(user, mechanism).await,
3054
3055 Self::Null(engine) => engine.user_scram_credential(user, mechanism).await,
3056
3057 #[cfg(feature = "postgres")]
3058 Self::Postgres(engine) => engine.user_scram_credential(user, mechanism).await,
3059
3060 #[cfg(feature = "slatedb")]
3061 Self::Slate(engine) => engine.user_scram_credential(user, mechanism).await,
3062
3063 #[cfg(feature = "turso")]
3064 Self::Turso(engine) => engine.user_scram_credential(user, mechanism).await,
3065 }
3066 }
3067
3068 #[instrument(skip_all)]
3069 async fn ping(&self) -> Result<()> {
3070 let attributes = [KeyValue::new("method", "ping")];
3071
3072 match self {
3073 #[cfg(feature = "dynostore")]
3074 Self::DynoStore(engine) => engine.ping(),
3075
3076 #[cfg(feature = "libsql")]
3077 Self::Lite(engine) => engine.ping(),
3078
3079 Self::Null(engine) => engine.ping(),
3080
3081 #[cfg(feature = "postgres")]
3082 Self::Postgres(engine) => engine.ping(),
3083
3084 #[cfg(feature = "turso")]
3085 Self::Turso(engine) => engine.ping(),
3086
3087 #[cfg(feature = "slatedb")]
3088 Self::Slate(engine) => engine.ping(),
3089 }
3090 .await
3091 .inspect(|_| {
3092 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3093 })
3094 .inspect_err(|_| {
3095 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3096 })
3097 }
3098}
3099
3100#[cfg(test)]
3101mod tests {
3102 use super::*;
3103
3104 #[test]
3105 fn topition_from_str() -> Result<()> {
3106 let topition = Topition::from_str("qwerty-2147483647")?;
3107 assert_eq!("qwerty", topition.topic());
3108 assert_eq!(i32::MAX, topition.partition());
3109 Ok(())
3110 }
3111
3112 #[test]
3113 fn topic_with_dashes_in_name() -> Result<()> {
3114 let topition = Topition::from_str("test-topic-0000000-eFC79C8-2147483647")?;
3115 assert_eq!("test-topic-0000000-eFC79C8", topition.topic());
3116 assert_eq!(i32::MAX, topition.partition());
3117 Ok(())
3118 }
3119}