Skip to main content

tansu_storage/
lib.rs

1// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15//! Tansu Storage Abstraction
16//!
17//! [`StorageContainer`] provides an abstraction over [`Storage`] and can
18//! be configured to use memory, [S3](https://en.wikipedia.org/wiki/Amazon_S3),
19//! [PostgreSQL](https://postgresql.org/),
20//! [libSQL](https://github.com/tursodatabase/libsql) and
21//! [Turso](https://github.com/tursodatabase/turso) (alpha: currently feature locked).
22//!
23//! ## Memory
24//!
25//! ```
26//! # use tansu_storage::{Error, StorageContainer};
27//! # use url::Url;
28//! # #[tokio::main]
29//! # async fn main() -> Result<(), Error> {
30//! let storage = StorageContainer::builder()
31//!     .cluster_id("tansu")
32//!     .node_id(111)
33//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
34//!     .storage(Url::parse("memory://tansu/")?)
35//!     .build()
36//!     .await?;
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! ## S3
42//!
43//! ```no_run
44//! # use tansu_storage::{Error, StorageContainer};
45//! # use url::Url;
46//! # #[tokio::main]
47//! # async fn main() -> Result<(), Error> {
48//! let storage = StorageContainer::builder()
49//!     .cluster_id("tansu")
50//!     .node_id(111)
51//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
52//!     .storage(Url::parse("s3://tansu/")?)
53//!     .build()
54//!     .await?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! ## PostgreSQL
60//!
61//! ```no_run
62//! # use tansu_storage::{Error, StorageContainer};
63//! # use url::Url;
64//! # #[tokio::main]
65//! # async fn main() -> Result<(), Error> {
66//! let storage = StorageContainer::builder()
67//!     .cluster_id("tansu")
68//!     .node_id(111)
69//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
70//!     .storage(Url::parse("postgres://postgres:postgres@localhost")?)
71//!     .build()
72//!     .await?;
73//! # Ok(())
74//! # }
75//! ```
76//!
77//! ## libSQL (SQLite)
78//!
79//! ```no_run
80//! # use tansu_storage::{Error, StorageContainer};
81//! # use url::Url;
82//! # #[tokio::main]
83//! # async fn main() -> Result<(), Error> {
84//! let storage = StorageContainer::builder()
85//!     .cluster_id("tansu")
86//!     .node_id(111)
87//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
88//!     .storage(Url::parse("sqlite://tansu.db")?)
89//!     .build()
90//!     .await?;
91//! # Ok(())
92//! # }
93//! ```
94//!
95//! ## Turso
96//!
97//! ```no_run
98//! # use tansu_storage::{Error, StorageContainer};
99//! # use url::Url;
100//! # #[tokio::main]
101//! # async fn main() -> Result<(), Error> {
102//! let storage = StorageContainer::builder()
103//!     .cluster_id("tansu")
104//!     .node_id(111)
105//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
106//!     .storage(Url::parse("turso://tansu.db")?)
107//!     .build()
108//!     .await?;
109//! # Ok(())
110//! # }
111//! ```
112//!
113
114use async_trait::async_trait;
115use bytes::{Bytes, TryGetError};
116
117use console::Emoji;
118#[cfg(any(feature = "libsql", feature = "postgres"))]
119use deadpool::managed::PoolError;
120#[cfg(feature = "dynostore")]
121use dynostore::DynoStore;
122
123use glob::{GlobError, PatternError};
124
125use indicatif::{ProgressBar, ProgressStyle};
126#[cfg(feature = "dynostore")]
127use object_store::memory::InMemory;
128
129#[cfg(feature = "dynostore")]
130use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
131
132use opentelemetry::{
133    InstrumentationScope, KeyValue, global,
134    metrics::{Counter, Meter},
135};
136use opentelemetry_semantic_conventions::SCHEMA_URL;
137
138#[cfg(feature = "postgres")]
139use pg::Postgres;
140
141use regex::Regex;
142use serde::{Deserialize, Serialize};
143#[cfg(any(feature = "libsql", feature = "postgres"))]
144use std::error;
145use std::{
146    array::TryFromSliceError,
147    collections::BTreeMap,
148    ffi::OsString,
149    fmt::{self, Debug, Display, Formatter},
150    fs::DirEntry,
151    io,
152    marker::PhantomData,
153    num::{ParseIntError, TryFromIntError},
154    path::PathBuf,
155    result,
156    str::FromStr,
157    sync::{Arc, LazyLock, PoisonError},
158    time::{Duration, SystemTime, SystemTimeError},
159};
160use tansu_sans_io::{
161    Body, ConfigResource, ErrorCode, IsolationLevel, ListOffset, NULL_TOPIC_ID, ScramMechanism,
162    add_partitions_to_txn_request::{
163        AddPartitionsToTxnRequest, AddPartitionsToTxnTopic, AddPartitionsToTxnTransaction,
164    },
165    add_partitions_to_txn_response::{AddPartitionsToTxnResult, AddPartitionsToTxnTopicResult},
166    consumer_group_describe_response,
167    create_topics_request::CreatableTopic,
168    delete_groups_response::DeletableGroupResult,
169    delete_records_request::DeleteRecordsTopic,
170    delete_records_response::DeleteRecordsTopicResult,
171    delete_topics_request::DeleteTopicState,
172    describe_cluster_response::DescribeClusterBroker,
173    describe_configs_response::DescribeConfigsResult,
174    describe_groups_response,
175    describe_topic_partitions_request::{Cursor, TopicRequest},
176    describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
177    fetch_request::FetchTopic,
178    incremental_alter_configs_request::AlterConfigsResource,
179    incremental_alter_configs_response::AlterConfigsResourceResponse,
180    join_group_response::JoinGroupResponseMember,
181    list_groups_response::ListedGroup,
182    metadata_request::MetadataRequestTopic,
183    metadata_response::{MetadataResponseBroker, MetadataResponseTopic},
184    offset_commit_request::OffsetCommitRequestPartition,
185    record::deflated,
186    to_system_time, to_timestamp,
187    txn_offset_commit_request::TxnOffsetCommitRequestTopic,
188    txn_offset_commit_response::TxnOffsetCommitResponseTopic,
189};
190use tansu_schema::{Registry, lake::House};
191use tokio_util::sync::CancellationToken;
192use tracing::{debug, instrument};
193use tracing_subscriber::filter::ParseError;
194use url::Url;
195use uuid::Uuid;
196
197#[cfg(feature = "dynostore")]
198mod dynostore;
199
200mod null;
201
202#[cfg(feature = "postgres")]
203mod pg;
204
205mod proxy;
206mod service;
207
208pub use service::{
209    AlterUserScramCredentialsService, ChannelRequestLayer, ChannelRequestService,
210    ConsumerGroupDescribeService, CreateAclsService, CreateTopicsService, DeleteGroupsService,
211    DeleteRecordsService, DeleteTopicsService, DescribeAclsService, DescribeClusterService,
212    DescribeConfigsService, DescribeGroupsService, DescribeTopicPartitionsService,
213    DescribeUserScramCredentialsService, FetchService, FindCoordinatorService,
214    GetTelemetrySubscriptionsService, IncrementalAlterConfigsService, InitProducerIdService,
215    ListGroupsService, ListOffsetsService, ListPartitionReassignmentsService, MetadataService,
216    ProduceService, Request, RequestChannelService, RequestLayer, RequestReceiver, RequestSender,
217    RequestService, RequestStorageService, Response, TxnAddOffsetsService, TxnAddPartitionService,
218    TxnOffsetCommitService, bounded_channel,
219};
220
221#[cfg(feature = "slatedb")]
222pub mod slate;
223
224#[cfg(any(feature = "libsql", feature = "postgres", feature = "turso"))]
225pub(crate) mod sql;
226
227#[cfg(feature = "libsql")]
228mod lite;
229
230#[cfg(feature = "dynostore")]
231mod os;
232
233#[cfg(feature = "turso")]
234mod limbo;
235
236/// Storage Errors
/// Storage Errors
///
/// A cloneable error covering every configurable backend; foreign error
/// types that are not `Clone` are wrapped in an [`Arc`].  No variant
/// carries an `#[error(...)]` message, so `thiserror` only derives
/// `std::error::Error` (plus the `#[from]` conversions) and `Display`
/// comes from the manual impl below, which mirrors `Debug`.
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
    /// A Kafka protocol level error, reported by its error code.
    Api(ErrorCode),

    ChronoParse(#[from] chrono::ParseError),

    #[cfg(any(feature = "postgres", feature = "libsql"))]
    DeadPoolBuild(#[from] deadpool::managed::BuildError),

    /// Raw bytes that could not be decoded.
    Decode(Bytes),

    /// A storage URL requested a backend whose cargo feature is disabled.
    FeatureNotEnabled {
        feature: String,
        message: String,
    },

    Glob(Arc<GlobError>),
    Io(Arc<io::Error>),
    /// Produced offset is before the batch base offset.
    LessThanBaseOffset {
        offset: i64,
        base_offset: i64,
    },
    /// Produced offset is before the last known offset.
    LessThanLastOffset {
        offset: i64,
        last_offset: Option<i64>,
    },

    #[cfg(feature = "libsql")]
    LibSql(Arc<libsql::Error>),

    LessThanMaxTime {
        time: i64,
        max_time: Option<i64>,
    },
    LessThanMinTime {
        time: i64,
        min_time: Option<i64>,
    },
    /// Free-form error message.
    Message(String),
    NoSuchEntry {
        nth: u32,
    },
    NoSuchOffset(i64),
    /// A file name that was not valid UTF-8.
    OsString(OsString),

    #[cfg(any(feature = "dynostore", feature = "slatedb"))]
    ObjectStore(Arc<object_store::Error>),

    ParseFilter(Arc<ParseError>),
    Pattern(Arc<PatternError>),
    ParseInt(#[from] ParseIntError),
    PhantomCached(),
    /// A poisoned lock; the guard itself cannot be carried here.
    Poison,

    #[cfg(any(feature = "libsql", feature = "postgres"))]
    Pool(Arc<Box<dyn error::Error + Send + Sync>>),

    #[cfg(feature = "slatedb")]
    Postcard(#[from] postcard::Error),

    Regex(#[from] regex::Error),

    SansIo(#[from] tansu_sans_io::Error),

    Schema(Arc<tansu_schema::Error>),

    Rustls(#[from] rustls::Error),

    SegmentEmpty(Topition),

    SegmentMissing {
        topition: Topition,
        offset: Option<i64>,
    },

    SerdeJson(Arc<serde_json::Error>),

    #[cfg(feature = "slatedb")]
    Slate(Arc<slatedb::Error>),

    SystemTime(#[from] SystemTimeError),

    #[cfg(feature = "postgres")]
    TokioPostgres(Arc<tokio_postgres::error::Error>),
    TryFromInt(#[from] TryFromIntError),
    TryFromSlice(#[from] TryFromSliceError),

    TryGet(Arc<TryGetError>),

    #[cfg(feature = "turso")]
    Turso(Arc<turso::Error>),

    /// A protocol body that was not expected in this context.
    UnexpectedBody(Box<Body>),

    UnexpectedServiceResponse(Box<Response>),

    #[cfg(feature = "turso")]
    UnexpectedValue(turso::Value),

    UnknownCacheKey(String),

    /// The storage URL scheme matched no known backend.
    UnsupportedStorageUrl(Url),
    UnexpectedAddPartitionsToTxnRequest(Box<AddPartitionsToTxnRequest>),
    Url(#[from] url::ParseError),
    UnknownTxnState(String),

    Uuid(#[from] uuid::Error),

    /// A channel send failed (receiver dropped).
    UnableToSend,
    /// A oneshot receive failed (sender dropped).
    OneshotRecv,
}
348
349impl Display for Error {
350    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
351        write!(f, "{self:?}")
352    }
353}
354
// `TryGetError` is not `Clone`; wrap in `Arc` to keep `Error: Clone`.
impl From<TryGetError> for Error {
    fn from(value: TryGetError) -> Self {
        Self::TryGet(Arc::new(value))
    }
}

// A poison error carries the (non-storable) guard, so collapse to a marker.
impl<T> From<PoisonError<T>> for Error {
    fn from(_value: PoisonError<T>) -> Self {
        Self::Poison
    }
}

// Pool errors are generic over the backend error; erase to a boxed trait object.
#[cfg(any(feature = "libsql", feature = "postgres"))]
impl<E> From<PoolError<E>> for Error
where
    E: error::Error + Send + Sync + 'static,
{
    fn from(value: PoolError<E>) -> Self {
        Self::Pool(Arc::new(Box::new(value)))
    }
}

#[cfg(feature = "libsql")]
impl From<libsql::Error> for Error {
    fn from(value: libsql::Error) -> Self {
        Self::LibSql(Arc::new(value))
    }
}

#[cfg(feature = "slatedb")]
impl From<slatedb::Error> for Error {
    fn from(value: slatedb::Error) -> Self {
        Self::Slate(Arc::new(value))
    }
}

#[cfg(feature = "turso")]
impl From<turso::Error> for Error {
    fn from(value: turso::Error) -> Self {
        Self::Turso(Arc::new(value))
    }
}

impl From<GlobError> for Error {
    fn from(value: GlobError) -> Self {
        Self::Glob(Arc::new(value))
    }
}

impl From<io::Error> for Error {
    fn from(value: io::Error) -> Self {
        Self::Io(Arc::new(value))
    }
}

// Already-shared errors are stored as-is; the owned variant below wraps first.
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl From<Arc<object_store::Error>> for Error {
    fn from(value: Arc<object_store::Error>) -> Self {
        Self::ObjectStore(value)
    }
}

#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl From<object_store::Error> for Error {
    fn from(value: object_store::Error) -> Self {
        Self::from(Arc::new(value))
    }
}

impl From<ParseError> for Error {
    fn from(value: ParseError) -> Self {
        Self::ParseFilter(Arc::new(value))
    }
}

impl From<PatternError> for Error {
    fn from(value: PatternError) -> Self {
        Self::Pattern(Arc::new(value))
    }
}

// Owned serde_json errors funnel through the `Arc` conversion below.
impl From<serde_json::Error> for Error {
    fn from(value: serde_json::Error) -> Self {
        Self::from(Arc::new(value))
    }
}

impl From<Arc<serde_json::Error>> for Error {
    fn from(value: Arc<serde_json::Error>) -> Self {
        Self::SerdeJson(value)
    }
}

#[cfg(feature = "postgres")]
impl From<tokio_postgres::error::Error> for Error {
    fn from(value: tokio_postgres::error::Error) -> Self {
        Self::from(Arc::new(value))
    }
}

#[cfg(feature = "postgres")]
impl From<Arc<tokio_postgres::error::Error>> for Error {
    fn from(value: Arc<tokio_postgres::error::Error>) -> Self {
        Self::TokioPostgres(value)
    }
}

// Unwrap schema-level API errors into the local `Api` variant so callers
// can match on the error code uniformly; anything else stays wrapped.
impl From<tansu_schema::Error> for Error {
    fn from(value: tansu_schema::Error) -> Self {
        if let tansu_schema::Error::Api(error_code) = value {
            Self::Api(error_code)
        } else {
            Self::Schema(Arc::new(value))
        }
    }
}
471
472pub type Result<T, E = Error> = result::Result<T, E>;
473
/// Topic Partition (topition)
///
/// A topic partition pair.
// NOTE: field order matters — the derived `Ord`/`PartialOrd` sort by
// topic first, then partition.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Topition {
    // topic name
    topic: String,
    // partition index within the topic
    partition: i32,
}
482
483impl Topition {
484    pub fn new(topic: impl Into<String>, partition: i32) -> Self {
485        let topic = topic.into();
486        Self { topic, partition }
487    }
488
489    pub fn topic(&self) -> &str {
490        &self.topic
491    }
492
493    pub fn partition(&self) -> i32 {
494        self.partition
495    }
496}
497
498impl From<Cursor> for Topition {
499    fn from(value: Cursor) -> Self {
500        Self {
501            topic: value.topic_name,
502            partition: value.partition_index,
503        }
504    }
505}
506
507impl TryFrom<&DirEntry> for Topition {
508    type Error = Error;
509
510    fn try_from(value: &DirEntry) -> result::Result<Self, Self::Error> {
511        Regex::new(r"^(?<topic>.+)-(?<partition>\d{10})$")
512            .map_err(Into::into)
513            .and_then(|re| {
514                value
515                    .file_name()
516                    .into_string()
517                    .map_err(Error::OsString)
518                    .and_then(|ref file_name| {
519                        re.captures(file_name)
520                            .ok_or(Error::Message(format!("no captures for {file_name}")))
521                            .and_then(|ref captures| {
522                                let topic = captures
523                                    .name("topic")
524                                    .ok_or(Error::Message(format!("missing topic for {file_name}")))
525                                    .map(|s| s.as_str().to_owned())?;
526
527                                let partition = captures
528                                    .name("partition")
529                                    .ok_or(Error::Message(format!(
530                                        "missing partition for: {file_name}"
531                                    )))
532                                    .map(|s| s.as_str())
533                                    .and_then(|s| str::parse(s).map_err(Into::into))?;
534
535                                Ok(Self { topic, partition })
536                            })
537                    })
538            })
539    }
540}
541
542impl FromStr for Topition {
543    type Err = Error;
544
545    fn from_str(s: &str) -> result::Result<Self, Self::Err> {
546        i32::from_str(&s[s.len() - 10..])
547            .map(|partition| {
548                let topic = String::from(&s[..s.len() - 11]);
549
550                Self { topic, partition }
551            })
552            .map_err(Into::into)
553    }
554}
555
556impl From<&Topition> for PathBuf {
557    fn from(value: &Topition) -> Self {
558        let topic = value.topic.as_str();
559        let partition = value.partition;
560        PathBuf::from(format!("{topic}-{partition:0>10}"))
561    }
562}
563
/// Topic Partition Offset
///
/// A topic partition with an offset.
// NOTE: field order matters — the derived `Ord` sorts by topition first.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TopitionOffset {
    // the topic/partition pair
    topition: Topition,
    // the offset within that partition
    offset: i64,
}
572
573impl TopitionOffset {
574    pub fn new(topition: Topition, offset: i64) -> Self {
575        Self { topition, offset }
576    }
577
578    pub fn topition(&self) -> &Topition {
579        &self.topition
580    }
581
582    pub fn offset(&self) -> i64 {
583        self.offset
584    }
585}
586
587impl From<&TopitionOffset> for PathBuf {
588    fn from(value: &TopitionOffset) -> Self {
589        let offset = value.offset;
590        PathBuf::from(value.topition()).join(format!("{offset:0>20}"))
591    }
592}
593
594pub type ListOffsetRequest = ListOffset;
595
/// Per-partition result of a list-offsets query.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ListOffsetResponse {
    // error code for this partition (None on success)
    pub error_code: ErrorCode,
    // timestamp of the located record, when available
    pub timestamp: Option<SystemTime>,
    // the located offset, when available
    pub offset: Option<i64>,
}
602
603impl Default for ListOffsetResponse {
604    fn default() -> Self {
605        Self {
606            error_code: ErrorCode::None,
607            timestamp: None,
608            offset: None,
609        }
610    }
611}
612
613impl ListOffsetResponse {
614    pub fn offset(&self) -> Option<i64> {
615        self.offset
616    }
617
618    pub fn timestamp(&self) -> Result<Option<i64>> {
619        self.timestamp.map_or(Ok(None), |system_time| {
620            to_timestamp(&system_time).map(Some).map_err(Into::into)
621        })
622    }
623
624    pub fn error_code(&self) -> ErrorCode {
625        self.error_code
626    }
627}
628
/// Offset Commit Request
///
/// A structure representing an [`OffsetCommitRequestPartition`].
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct OffsetCommitRequest {
    // the offset being committed
    offset: i64,
    // the leader epoch of the committed offset, when known
    leader_epoch: Option<i32>,
    // the commit timestamp, when provided by the request
    timestamp: Option<SystemTime>,
    // opaque client-supplied metadata
    metadata: Option<String>,
}
639
640impl OffsetCommitRequest {
641    pub fn offset(self, offset: i64) -> Self {
642        Self { offset, ..self }
643    }
644}
645
646impl TryFrom<&OffsetCommitRequestPartition> for OffsetCommitRequest {
647    type Error = Error;
648
649    fn try_from(value: &OffsetCommitRequestPartition) -> Result<Self, Self::Error> {
650        value
651            .commit_timestamp
652            .map_or(Ok(None), |commit_timestamp| {
653                to_system_time(commit_timestamp)
654                    .map(Some)
655                    .map_err(Into::into)
656            })
657            .map(|timestamp| Self {
658                offset: value.committed_offset,
659                leader_epoch: value.committed_leader_epoch,
660                timestamp,
661                metadata: value.committed_metadata.clone(),
662            })
663    }
664}
665
/// Topic Id
///
/// An enumeration of either the name or UUID of a topic.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TopicId {
    /// The topic's human readable name.
    Name(String),
    /// The topic's UUID.
    Id(Uuid),
}
674
675impl FromStr for TopicId {
676    type Err = Error;
677
678    fn from_str(s: &str) -> result::Result<Self, Self::Err> {
679        Ok(Self::Name(s.into()))
680    }
681}
682
683impl From<&str> for TopicId {
684    fn from(value: &str) -> Self {
685        Self::Name(value.to_owned())
686    }
687}
688
689impl From<String> for TopicId {
690    fn from(value: String) -> Self {
691        Self::Name(value)
692    }
693}
694
695impl From<Uuid> for TopicId {
696    fn from(value: Uuid) -> Self {
697        Self::Id(value)
698    }
699}
700
701impl From<[u8; 16]> for TopicId {
702    fn from(value: [u8; 16]) -> Self {
703        Self::Id(Uuid::from_bytes(value))
704    }
705}
706
707impl From<&TopicId> for [u8; 16] {
708    fn from(value: &TopicId) -> Self {
709        match value {
710            TopicId::Id(id) => id.into_bytes(),
711            TopicId::Name(_) => NULL_TOPIC_ID,
712        }
713    }
714}
715
716impl From<&FetchTopic> for TopicId {
717    fn from(value: &FetchTopic) -> Self {
718        if let Some(ref name) = value.topic {
719            Self::Name(name.into())
720        } else if let Some(ref id) = value.topic_id {
721            Self::Id(Uuid::from_bytes(*id))
722        } else {
723            panic!("neither name nor uuid")
724        }
725    }
726}
727
728impl From<&MetadataRequestTopic> for TopicId {
729    fn from(value: &MetadataRequestTopic) -> Self {
730        if let Some(ref name) = value.name {
731            Self::Name(name.into())
732        } else if let Some(ref id) = value.topic_id {
733            Self::Id(Uuid::from_bytes(*id))
734        } else {
735            panic!("neither name nor uuid")
736        }
737    }
738}
739
740impl From<DeleteTopicState> for TopicId {
741    fn from(value: DeleteTopicState) -> Self {
742        match value {
743            DeleteTopicState {
744                name: Some(name),
745                topic_id,
746                ..
747            } if topic_id == NULL_TOPIC_ID => name.into(),
748
749            DeleteTopicState { topic_id, .. } => topic_id.into(),
750        }
751    }
752}
753
754impl From<&TopicRequest> for TopicId {
755    fn from(value: &TopicRequest) -> Self {
756        value.name.to_owned().into()
757    }
758}
759
760impl From<&Topition> for TopicId {
761    fn from(value: &Topition) -> Self {
762        value.topic.to_owned().into()
763    }
764}
765
/// Broker Registration Request
///
/// A broker will register with storage using this structure.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BrokerRegistrationRequest {
    /// The node id of the registering broker.
    pub broker_id: i32,
    /// The cluster the broker belongs to.
    pub cluster_id: String,
    /// Unique id for this broker incarnation (changes on restart).
    pub incarnation_id: Uuid,
    /// Optional rack identifier for rack-aware placement.
    pub rack: Option<String>,
}
776
/// Cluster metadata: cluster/controller identity plus broker and topic lists.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct MetadataResponse {
    // cluster identifier, when known
    cluster: Option<String>,
    // node id of the controller, when known
    controller: Option<i32>,
    // brokers currently registered
    brokers: Vec<MetadataResponseBroker>,
    // topics known to the cluster
    topics: Vec<MetadataResponseTopic>,
}

impl MetadataResponse {
    /// The cluster identifier, when known.
    pub fn cluster(&self) -> Option<&str> {
        self.cluster.as_deref()
    }

    /// The controller node id, when known.
    pub fn controller(&self) -> Option<i32> {
        self.controller
    }

    /// The registered brokers.
    pub fn brokers(&self) -> &[MetadataResponseBroker] {
        self.brokers.as_ref()
    }

    /// The known topics.
    pub fn topics(&self) -> &[MetadataResponseTopic] {
        self.topics.as_ref()
    }
}
802
/// Offset Stage
///
/// An offset stage structure represents the `last_stable`, `high_watermark` and `log_start` offsets.
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct OffsetStage {
    // highest offset with no open transactions below it
    last_stable: i64,
    // first offset not yet replicated/committed
    high_watermark: i64,
    // earliest offset still retained in the log
    log_start: i64,
}

impl OffsetStage {
    /// The last stable offset.
    pub fn last_stable(&self) -> i64 {
        self.last_stable
    }

    /// The high watermark.
    pub fn high_watermark(&self) -> i64 {
        self.high_watermark
    }

    /// The log start offset.
    pub fn log_start(&self) -> i64 {
        self.log_start
    }
}
828
/// Group Member
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct GroupMember {
    /// The join-group response issued to this member.
    pub join_response: JoinGroupResponseMember,
    /// When this member last contacted the coordinator, if ever.
    pub last_contact: Option<SystemTime>,
}

/// Group State
///
/// A group is either in the process of [`GroupState::Forming`] or has [`GroupState::Formed`].
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum GroupState {
    /// Still negotiating; protocol and leader may not be decided yet.
    Forming {
        protocol_type: Option<String>,
        protocol_name: Option<String>,
        leader: Option<String>,
    },

    /// Negotiation complete; protocol, leader and member assignments are fixed.
    Formed {
        protocol_type: String,
        protocol_name: String,
        leader: String,
        // member id -> opaque assignment bytes
        assignments: BTreeMap<String, Bytes>,
    },
}
854
855impl GroupState {
856    pub fn protocol_type(&self) -> Option<String> {
857        match self {
858            Self::Forming { protocol_type, .. } => protocol_type.clone(),
859            Self::Formed { protocol_type, .. } => Some(protocol_type.clone()),
860        }
861    }
862
863    pub fn protocol_name(&self) -> Option<String> {
864        match self {
865            Self::Forming { protocol_name, .. } => protocol_name.clone(),
866            Self::Formed { protocol_name, .. } => Some(protocol_name.clone()),
867        }
868    }
869
870    pub fn leader(&self) -> Option<String> {
871        match self {
872            Self::Forming { leader, .. } => leader.clone(),
873            Self::Formed { leader, .. } => Some(leader.clone()),
874        }
875    }
876
877    pub fn assignments(&self) -> BTreeMap<String, Bytes> {
878        match self {
879            Self::Forming { .. } => BTreeMap::new(),
880            Self::Formed { assignments, .. } => assignments.clone(),
881        }
882    }
883}
884
885impl Default for GroupState {
886    fn default() -> Self {
887        Self::Forming {
888            protocol_type: None,
889            protocol_name: Some("".into()),
890            leader: None,
891        }
892    }
893}
894
/// Consumer Group State
///
/// A helper type for conversion into [`consumer_group_describe_response::DescribedGroup`].
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ConsumerGroupState {
    Unknown,
    PreparingRebalance,
    CompletingRebalance,
    Stable,
    Dead,
    Empty,
    Assigning,
    Reconciling,
}
909
910impl Display for ConsumerGroupState {
911    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
912        match self {
913            Self::Unknown => f.write_str("Unknown"),
914            Self::PreparingRebalance => f.write_str("PreparingRebalance"),
915            Self::CompletingRebalance => f.write_str("CompletingRebalance"),
916            Self::Stable => f.write_str("Stable"),
917            Self::Dead => f.write_str("Dead"),
918            Self::Empty => f.write_str("Empty"),
919            Self::Assigning => f.write_str("Assigning"),
920            Self::Reconciling => f.write_str("Reconciling"),
921        }
922    }
923}
924
/// Group Detail
///
/// A helper type that can be easily converted into [`consumer_group_describe_response::DescribedGroup`].
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct GroupDetail {
    /// Session timeout negotiated for the group, in milliseconds.
    pub session_timeout_ms: i32,
    /// Optional rebalance timeout, in milliseconds.
    pub rebalance_timeout_ms: Option<i32>,
    /// Members keyed by member id.
    pub members: BTreeMap<String, GroupMember>,
    /// Current generation; -1 before the first generation.
    pub generation_id: i32,
    /// Whether assignment can be skipped for this group.
    pub skip_assignment: Option<bool>,
    /// When the group was first created.
    pub inception: SystemTime,
    /// Forming/formed state of the group.
    pub state: GroupState,
}
938
939impl Default for GroupDetail {
940    fn default() -> Self {
941        Self {
942            session_timeout_ms: 45_000,
943            rebalance_timeout_ms: None,
944            members: BTreeMap::new(),
945            generation_id: -1,
946            skip_assignment: Some(false),
947            inception: SystemTime::now(),
948            state: GroupState::default(),
949        }
950    }
951}
952
953impl From<&GroupDetail> for ConsumerGroupState {
954    fn from(value: &GroupDetail) -> Self {
955        match value {
956            GroupDetail { members, .. } if members.is_empty() => Self::Empty,
957
958            GroupDetail {
959                state: GroupState::Forming { leader: None, .. },
960                ..
961            } => Self::Assigning,
962
963            GroupDetail {
964                state: GroupState::Formed { .. },
965                ..
966            } => Self::Stable,
967
968            _ => {
969                debug!(unknown = ?value);
970                Self::Unknown
971            }
972        }
973    }
974}
975
976impl From<&GroupDetail> for consumer_group_describe_response::DescribedGroup {
977    fn from(value: &GroupDetail) -> Self {
978        let assignor_name = match value.state {
979            GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
980            GroupState::Formed { ref leader, .. } => leader.clone(),
981        };
982
983        let group_state = ConsumerGroupState::from(value).to_string();
984
985        Self::default()
986            .error_code(ErrorCode::None.into())
987            .error_message(Some(ErrorCode::None.to_string()))
988            .group_id(Default::default())
989            .group_state(group_state)
990            .group_epoch(-1)
991            .assignment_epoch(-1)
992            .assignor_name(assignor_name)
993            .members(Some([].into()))
994            .authorized_operations(-1)
995    }
996}
997
/// Either a group's detail or the error code explaining its absence.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum GroupDetailResponse {
    /// The lookup failed with this error code.
    ErrorCode(ErrorCode),
    /// The lookup succeeded.
    Found(GroupDetail),
}

/// NamedGroupDetail
///
/// A helper type that can be easily converted into [`consumer_group_describe_response::DescribedGroup`]
/// or [`describe_groups_response::DescribedGroup`].
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct NamedGroupDetail {
    // the group id
    name: String,
    // the outcome of looking the group up
    response: GroupDetailResponse,
}
1013
1014impl NamedGroupDetail {
1015    pub fn error_code(name: String, error_code: ErrorCode) -> Self {
1016        Self {
1017            name,
1018            response: GroupDetailResponse::ErrorCode(error_code),
1019        }
1020    }
1021
1022    pub fn found(name: String, found: GroupDetail) -> Self {
1023        Self {
1024            name,
1025            response: GroupDetailResponse::Found(found),
1026        }
1027    }
1028}
1029
1030impl From<&NamedGroupDetail> for consumer_group_describe_response::DescribedGroup {
1031    fn from(value: &NamedGroupDetail) -> Self {
1032        match value {
1033            NamedGroupDetail {
1034                name,
1035                response: GroupDetailResponse::Found(group_detail),
1036            } => {
1037                let assignor_name = match group_detail.state {
1038                    GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
1039                    GroupState::Formed { ref leader, .. } => leader.clone(),
1040                };
1041
1042                let group_state = ConsumerGroupState::from(group_detail).to_string();
1043
1044                Self::default()
1045                    .error_code(ErrorCode::None.into())
1046                    .error_message(Some(ErrorCode::None.to_string()))
1047                    .group_id(name.into())
1048                    .group_state(group_state)
1049                    .group_epoch(-1)
1050                    .assignment_epoch(-1)
1051                    .assignor_name(assignor_name)
1052                    .members(Some([].into()))
1053                    .authorized_operations(-1)
1054            }
1055
1056            NamedGroupDetail {
1057                name,
1058                response: GroupDetailResponse::ErrorCode(error_code),
1059            } => Self::default()
1060                .error_code((*error_code).into())
1061                .error_message(Some(error_code.to_string()))
1062                .group_id(name.into())
1063                .group_state("Unknown".into())
1064                .group_epoch(-1)
1065                .assignment_epoch(-1)
1066                .assignor_name("".into())
1067                .members(Some([].into()))
1068                .authorized_operations(-1),
1069        }
1070    }
1071}
1072
1073impl From<&NamedGroupDetail> for describe_groups_response::DescribedGroup {
1074    fn from(value: &NamedGroupDetail) -> Self {
1075        match value {
1076            NamedGroupDetail {
1077                name,
1078                response: GroupDetailResponse::Found(group_detail),
1079            } => {
1080                let group_state = ConsumerGroupState::from(group_detail).to_string();
1081
1082                let members = group_detail
1083                    .members
1084                    .keys()
1085                    .map(|member_id| {
1086                        describe_groups_response::DescribedGroupMember::default()
1087                            .member_id(member_id.into())
1088                            .group_instance_id(None)
1089                            .client_id("".into())
1090                            .client_host("".into())
1091                            .member_metadata(Bytes::new())
1092                            .member_assignment(Bytes::new())
1093                    })
1094                    .collect::<Vec<_>>();
1095
1096                Self::default()
1097                    .error_code(ErrorCode::None.into())
1098                    .group_id(name.clone())
1099                    .group_state(group_state)
1100                    .protocol_type(group_detail.state.protocol_type().unwrap_or_default())
1101                    .protocol_data("".into())
1102                    .members(Some(members))
1103                    .authorized_operations(Some(-1))
1104            }
1105
1106            NamedGroupDetail {
1107                name,
1108                response: GroupDetailResponse::ErrorCode(error_code),
1109            } => Self::default()
1110                .error_code((*error_code).into())
1111                .group_id(name.clone())
1112                .group_state("Unknown".into())
1113                .protocol_type("".into())
1114                .protocol_data("".into())
1115                .members(Some(vec![]))
1116                .authorized_operations(Some(-1)),
1117        }
1118    }
1119}
1120
/// Topition (topic partition) Detail
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TopitionDetail {
    // Error for the topic as a whole.
    error: ErrorCode,
    // The topic being described.
    topic: TopicId,
    // Per-partition detail, when available.
    partitions: Option<Vec<PartitionDetail>>,
}
1128
/// Partition Detail
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct PartitionDetail {
    // Error for this partition.
    error: ErrorCode,
    // The partition this detail describes.
    partition_index: i32,
}
1135
/// Version representing an `e_tag` and `version` used in conditional writes to an object store.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Version {
    // Entity tag of the stored object, when known.
    e_tag: Option<String>,
    // Object store version identifier, when known.
    version: Option<String>,
}
1142
1143impl From<&Uuid> for Version {
1144    fn from(value: &Uuid) -> Self {
1145        Self {
1146            e_tag: Some(value.to_string()),
1147            version: None,
1148        }
1149    }
1150}
1151
/// Producer Id Response
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct ProducerIdResponse {
    // Outcome of the producer id request.
    pub error: ErrorCode,
    // Assigned producer id.
    pub id: i64,
    // Assigned producer epoch.
    pub epoch: i16,
}
1159
impl Default for ProducerIdResponse {
    /// A successful response with producer id `1` and epoch `0`.
    fn default() -> Self {
        Self {
            error: ErrorCode::None,
            // NOTE(review): defaults to id 1 (not a -1 "unknown" sentinel) —
            // confirm callers rely on this value.
            id: 1,
            epoch: 0,
        }
    }
}
1169
/// For protocol versions 0..=3 using [`AddPartitionsToTxnTopic`],
/// thereafter using [`AddPartitionsToTxnTransaction`].
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TxnAddPartitionsRequest {
    // Versions 0..=3: a single transaction described by flattened fields.
    VersionZeroToThree {
        transaction_id: String,
        producer_id: i64,
        producer_epoch: i16,
        topics: Vec<AddPartitionsToTxnTopic>,
    },

    // Version 4+: a batch of transactions.
    VersionFourPlus {
        transactions: Vec<AddPartitionsToTxnTransaction>,
    },
}
1185
1186impl TryFrom<AddPartitionsToTxnRequest> for TxnAddPartitionsRequest {
1187    type Error = Error;
1188
1189    fn try_from(value: AddPartitionsToTxnRequest) -> result::Result<Self, Self::Error> {
1190        match value {
1191            AddPartitionsToTxnRequest {
1192                transactions: None,
1193                v_3_and_below_transactional_id: Some(transactional_id),
1194                v_3_and_below_producer_id: Some(producer_id),
1195                v_3_and_below_producer_epoch: Some(producer_epoch),
1196                v_3_and_below_topics: Some(topics),
1197                ..
1198            } => Ok(Self::VersionZeroToThree {
1199                transaction_id: transactional_id,
1200                producer_id,
1201                producer_epoch,
1202                topics,
1203            }),
1204
1205            AddPartitionsToTxnRequest {
1206                transactions: Some(transactions),
1207                v_3_and_below_transactional_id: None,
1208                v_3_and_below_producer_id: None,
1209                v_3_and_below_producer_epoch: None,
1210                v_3_and_below_topics: None,
1211                ..
1212            } => Ok(Self::VersionFourPlus { transactions }),
1213
1214            unexpected => Err(Error::UnexpectedAddPartitionsToTxnRequest(Box::new(
1215                unexpected,
1216            ))),
1217        }
1218    }
1219}
1220
/// Transaction Add Partitions Response
///
/// For protocol versions 0..=3 using `AddPartitionsToTxnTopic`, thereafter using `AddPartitionsToTxnTransaction`.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TxnAddPartitionsResponse {
    VersionZeroToThree(Vec<AddPartitionsToTxnTopicResult>),
    VersionFourPlus(Vec<AddPartitionsToTxnResult>),
}
1229
1230impl TxnAddPartitionsResponse {
1231    pub fn zero_to_three(&self) -> &[AddPartitionsToTxnTopicResult] {
1232        match self {
1233            Self::VersionZeroToThree(result) => result.as_slice(),
1234            Self::VersionFourPlus(_) => &[][..],
1235        }
1236    }
1237
1238    pub fn four_plus(&self) -> &[AddPartitionsToTxnResult] {
1239        match self {
1240            Self::VersionZeroToThree(_) => &[][..],
1241            Self::VersionFourPlus(result) => result.as_slice(),
1242        }
1243    }
1244}
1245
/// Transaction Offset Commit Request
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TxnOffsetCommitRequest {
    pub transaction_id: String,
    pub group_id: String,
    pub producer_id: i64,
    pub producer_epoch: i16,
    // Consumer group generation, when supplied by the client.
    pub generation_id: Option<i32>,
    pub member_id: Option<String>,
    pub group_instance_id: Option<String>,
    // Topics and partitions whose offsets are being committed.
    pub topics: Vec<TxnOffsetCommitRequestTopic>,
}
1258
/// Transaction State
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TxnState {
    Begin,
    PrepareCommit,
    PrepareAbort,
    Committed,
    Aborted,
}
1268
1269impl TxnState {
1270    pub fn is_prepared(&self) -> bool {
1271        match self {
1272            Self::PrepareAbort | Self::PrepareCommit => true,
1273            _otherwise => false,
1274        }
1275    }
1276}
1277
1278impl FromStr for TxnState {
1279    type Err = Error;
1280
1281    fn from_str(s: &str) -> Result<Self, Self::Err> {
1282        match s {
1283            "ABORTED" => Ok(TxnState::Aborted),
1284            "BEGIN" => Ok(TxnState::Begin),
1285            "COMMITTED" => Ok(TxnState::Committed),
1286            "PREPARE_ABORT" => Ok(TxnState::PrepareAbort),
1287            "PREPARE_COMMIT" => Ok(TxnState::PrepareCommit),
1288            otherwise => Err(Error::UnknownTxnState(otherwise.to_owned())),
1289        }
1290    }
1291}
1292
1293impl TryFrom<String> for TxnState {
1294    type Error = Error;
1295
1296    fn try_from(value: String) -> Result<Self, Self::Error> {
1297        Self::from_str(&value)
1298    }
1299}
1300
1301impl From<TxnState> for String {
1302    fn from(value: TxnState) -> Self {
1303        match value {
1304            TxnState::Begin => "BEGIN".into(),
1305            TxnState::PrepareCommit => "PREPARE_COMMIT".into(),
1306            TxnState::PrepareAbort => "PREPARE_ABORT".into(),
1307            TxnState::Committed => "COMMITTED".into(),
1308            TxnState::Aborted => "ABORTED".into(),
1309        }
1310    }
1311}
1312
/// Storage
///
/// The Core storage abstraction. All storage engines implement this type.
#[async_trait]
pub trait Storage: Clone + Debug + Send + Sync + 'static {
    /// On startup a broker will register with storage.
    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()>;

    /// Create a topic on this storage.
    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid>;

    /// Incrementally alter a resource on this storage.
    async fn incremental_alter_resource(
        &self,
        resource: AlterConfigsResource,
    ) -> Result<AlterConfigsResourceResponse>;

    /// Delete records on this storage.
    async fn delete_records(
        &self,
        topics: &[DeleteRecordsTopic],
    ) -> Result<Vec<DeleteRecordsTopicResult>>;

    /// Delete a topic from this storage.
    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode>;

    /// Query the brokers registered with this storage.
    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>>;

    /// Produce a deflated batch to this storage.
    async fn produce(
        &self,
        transaction_id: Option<&str>,
        topition: &Topition,
        batch: deflated::Batch,
    ) -> Result<i64>;

    /// Fetch deflated batches from storage.
    async fn fetch(
        &self,
        topition: &'_ Topition,
        offset: i64,
        min_bytes: u32,
        max_bytes: u32,
        isolation: IsolationLevel,
    ) -> Result<Vec<deflated::Batch>>;

    /// Query the offset stage for a topic partition.
    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage>;

    /// Query the offsets for one or more topic partitions.
    async fn list_offsets(
        &self,
        isolation_level: IsolationLevel,
        offsets: &[(Topition, ListOffset)],
    ) -> Result<Vec<(Topition, ListOffsetResponse)>>;

    /// Commit offsets for one or more topic partitions in a consumer group.
    async fn offset_commit(
        &self,
        group_id: &str,
        retention_time_ms: Option<Duration>,
        offsets: &[(Topition, OffsetCommitRequest)],
    ) -> Result<Vec<(Topition, ErrorCode)>>;

    /// Fetch committed offsets for one or more topic partitions in a consumer group.
    async fn offset_fetch(
        &self,
        group_id: Option<&str>,
        topics: &[Topition],
        require_stable: Option<bool>,
    ) -> Result<BTreeMap<Topition, i64>>;

    /// Fetch all committed offsets in a consumer group.
    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>>;

    /// Query broker and topic metadata.
    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse>;

    /// Create or replace the SCRAM credential for `user` under `mechanism`.
    async fn upsert_user_scram_credential(
        &self,
        user: &str,
        mechanism: ScramMechanism,
        credential: ScramCredential,
    ) -> Result<()>;

    /// Remove the SCRAM credential for `user` under `mechanism`.
    async fn delete_user_scram_credential(
        &self,
        user: &str,
        mechanism: ScramMechanism,
    ) -> Result<()>;

    /// Fetch the SCRAM credential for `user` under `mechanism`, if any.
    async fn user_scram_credential(
        &self,
        user: &str,
        mechanism: ScramMechanism,
    ) -> Result<Option<ScramCredential>>;

    /// Query the configuration of a resource in this storage.
    async fn describe_config(
        &self,
        name: &str,
        resource: ConfigResource,
        keys: Option<&[String]>,
    ) -> Result<DescribeConfigsResult>;

    /// Query available groups optionally with a state filter.
    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>>;

    /// Delete one or more groups from storage.
    async fn delete_groups(
        &self,
        group_ids: Option<&[String]>,
    ) -> Result<Vec<DeletableGroupResult>>;

    /// Describe the groups found in this storage.
    async fn describe_groups(
        &self,
        group_ids: Option<&[String]>,
        include_authorized_operations: bool,
    ) -> Result<Vec<NamedGroupDetail>>;

    /// Describe the topic partitions found in this storage.
    async fn describe_topic_partitions(
        &self,
        topics: Option<&[TopicId]>,
        partition_limit: i32,
        cursor: Option<Topition>,
    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>>;

    /// Conditionally update the state of a group in this storage.
    async fn update_group(
        &self,
        group_id: &str,
        detail: GroupDetail,
        version: Option<Version>,
    ) -> Result<Version, UpdateError<GroupDetail>>;

    /// Initialise a transactional or idempotent producer in this storage.
    async fn init_producer(
        &self,
        transaction_id: Option<&str>,
        transaction_timeout_ms: i32,
        producer_id: Option<i64>,
        producer_epoch: Option<i16>,
    ) -> Result<ProducerIdResponse>;

    /// Add offsets to a transaction for a producer.
    async fn txn_add_offsets(
        &self,
        transaction_id: &str,
        producer_id: i64,
        producer_epoch: i16,
        group_id: &str,
    ) -> Result<ErrorCode>;

    /// Add partitions to a transaction.
    async fn txn_add_partitions(
        &self,
        partitions: TxnAddPartitionsRequest,
    ) -> Result<TxnAddPartitionsResponse>;

    /// Commit an offset within a transaction.
    async fn txn_offset_commit(
        &self,
        offsets: TxnOffsetCommitRequest,
    ) -> Result<Vec<TxnOffsetCommitResponseTopic>>;

    /// Commit or abort a running transaction.
    async fn txn_end(
        &self,
        transaction_id: &str,
        producer_id: i64,
        producer_epoch: i16,
        committed: bool,
    ) -> Result<ErrorCode>;

    /// Run periodic maintenance on this storage.
    ///
    /// The default implementation is a no-op; engines override as needed.
    async fn maintain(&self, _now: SystemTime) -> Result<()> {
        Ok(())
    }

    /// The cluster id this storage serves.
    async fn cluster_id(&self) -> Result<String>;

    /// The node (broker) id of this storage.
    async fn node(&self) -> Result<i32>;

    /// The advertised listener URL of this broker.
    async fn advertised_listener(&self) -> Result<Url>;

    /// Check that the storage is reachable.
    async fn ping(&self) -> Result<()>;
}
1503
1504/// Conditional Update Errors
1505#[derive(Clone, Debug, thiserror::Error)]
1506pub enum UpdateError<T> {
1507    Error(#[from] Error),
1508
1509    MissingEtag,
1510
1511    Outdated { current: Box<T>, version: Version },
1512
1513    SerdeJson(Arc<serde_json::Error>),
1514
1515    Uuid(#[from] uuid::Error),
1516}
1517
#[cfg(feature = "libsql")]
impl<T> From<libsql::Error> for UpdateError<T> {
    /// Wrap a libSQL error as an underlying storage [`Error`].
    fn from(value: libsql::Error) -> Self {
        UpdateError::Error(value.into())
    }
}
1524
#[cfg(feature = "turso")]
impl<T> From<turso::Error> for UpdateError<T> {
    /// Wrap a Turso error as an underlying storage [`Error`].
    fn from(value: turso::Error) -> Self {
        UpdateError::Error(value.into())
    }
}
1531
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl<T> From<object_store::Error> for UpdateError<T> {
    /// Wrap an object store error as an underlying storage [`Error`].
    fn from(value: object_store::Error) -> Self {
        UpdateError::Error(value.into())
    }
}
1538
1539impl<T> From<serde_json::Error> for UpdateError<T> {
1540    fn from(value: serde_json::Error) -> Self {
1541        Self::SerdeJson(Arc::new(value))
1542    }
1543}
1544
#[cfg(feature = "postgres")]
impl<T> From<tokio_postgres::error::Error> for UpdateError<T> {
    /// Wrap a PostgreSQL error as an underlying storage [`Error`].
    fn from(value: tokio_postgres::error::Error) -> Self {
        UpdateError::Error(value.into())
    }
}
1551
/// Storage Container
///
/// One variant per compiled-in storage engine; [`Storage`] calls are
/// dispatched to the selected engine.
#[derive(Clone)]
#[cfg_attr(
    not(any(
        feature = "dynostore",
        feature = "libsql",
        feature = "postgres",
        feature = "slatedb",
        feature = "turso"
    )),
    allow(missing_copy_implementations)
)]
pub enum StorageContainer {
    // Always available fallback engine; see [`null::Engine`].
    Null(null::Engine),

    #[cfg(feature = "postgres")]
    Postgres(Postgres),

    // Object-store backed engine (S3 or in-memory).
    #[cfg(feature = "dynostore")]
    DynoStore(DynoStore),

    #[cfg(feature = "libsql")]
    Lite(lite::Engine),

    #[cfg(feature = "slatedb")]
    Slate(slate::Engine),

    #[cfg(feature = "turso")]
    Turso(limbo::Engine),
}
1582
1583impl Debug for StorageContainer {
1584    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1585        match self {
1586            Self::Null(_) => f.debug_tuple(stringify!(StorageContainer::Null)).finish(),
1587
1588            #[cfg(feature = "postgres")]
1589            Self::Postgres(_) => f
1590                .debug_tuple(stringify!(StorageContainer::Postgres))
1591                .finish(),
1592
1593            #[cfg(feature = "dynostore")]
1594            Self::DynoStore(_) => f
1595                .debug_tuple(stringify!(StorageContainer::DynoStore))
1596                .finish(),
1597
1598            #[cfg(feature = "libsql")]
1599            Self::Lite(_) => f.debug_tuple(stringify!(StorageContainer::Lite)).finish(),
1600
1601            #[cfg(feature = "slatedb")]
1602            Self::Slate(_) => f.debug_tuple(stringify!(StorageContainer::Slate)).finish(),
1603
1604            #[cfg(feature = "turso")]
1605            Self::Turso(_) => f.debug_tuple(stringify!(StorageContainer::Turso)).finish(),
1606        }
1607    }
1608}
1609
impl StorageContainer {
    /// Start building a [`StorageContainer`]; see [`Builder`].
    pub fn builder() -> PhantomBuilder {
        PhantomBuilder::default()
    }
}
1615
/// A [`StorageContainer`] builder
///
/// Typestate builder: `N`, `C`, `A` and `S` are [`PhantomData`] until the
/// corresponding required setter has been called; [`Builder::build`] is only
/// available once all four are set.
#[derive(Clone, Debug, Default)]
pub struct Builder<N, C, A, S> {
    node_id: N,
    cluster_id: C,
    advertised_listener: A,
    storage: S,
    // Optional schema registry forwarded to the engine.
    schema_registry: Option<Registry>,
    // Optional lake house forwarded to the engine.
    lake_house: Option<House>,
    // Suppress the connection progress bar when true.
    silent: bool,

    cancellation: CancellationToken,
}

/// A [`Builder`] with none of the required fields set yet.
type PhantomBuilder =
    Builder<PhantomData<i32>, PhantomData<String>, PhantomData<Url>, PhantomData<Url>>;
1632
impl<N, C, A, S> Builder<N, C, A, S> {
    /// Set the node (broker) id, advancing the `N` type state.
    pub fn node_id(self, node_id: i32) -> Builder<i32, C, A, S> {
        Builder {
            node_id,
            cluster_id: self.cluster_id,
            advertised_listener: self.advertised_listener,
            storage: self.storage,
            schema_registry: self.schema_registry,
            lake_house: self.lake_house,
            silent: self.silent,
            cancellation: self.cancellation,
        }
    }

    /// Set the cluster id, advancing the `C` type state.
    pub fn cluster_id(self, cluster_id: impl Into<String>) -> Builder<N, String, A, S> {
        Builder {
            node_id: self.node_id,
            cluster_id: cluster_id.into(),
            advertised_listener: self.advertised_listener,
            storage: self.storage,
            schema_registry: self.schema_registry,
            lake_house: self.lake_house,
            silent: self.silent,
            cancellation: self.cancellation,
        }
    }

    /// Set the advertised listener URL, advancing the `A` type state.
    pub fn advertised_listener(self, advertised_listener: impl Into<Url>) -> Builder<N, C, Url, S> {
        Builder {
            node_id: self.node_id,
            cluster_id: self.cluster_id,
            advertised_listener: advertised_listener.into(),
            storage: self.storage,
            schema_registry: self.schema_registry,
            lake_house: self.lake_house,
            silent: self.silent,
            cancellation: self.cancellation,
        }
    }

    /// Set the storage URL, advancing the `S` type state; the URL scheme
    /// selects the engine in [`Builder::build`].
    pub fn storage(self, storage: Url) -> Builder<N, C, A, Url> {
        debug!(%storage);

        Builder {
            node_id: self.node_id,
            cluster_id: self.cluster_id,
            advertised_listener: self.advertised_listener,
            storage,
            schema_registry: self.schema_registry,
            lake_house: self.lake_house,
            silent: self.silent,
            cancellation: self.cancellation,
        }
    }

    /// Set an optional schema registry (logged when present).
    pub fn schema_registry(self, schema_registry: Option<Registry>) -> Self {
        _ = schema_registry
            .as_ref()
            .inspect(|schema_registry| debug!(?schema_registry));

        Self {
            schema_registry,
            ..self
        }
    }

    /// Set an optional lake house (logged when present).
    pub fn lake_house(self, lake_house: Option<House>) -> Self {
        _ = lake_house
            .as_ref()
            .inspect(|lake_house| debug!(?lake_house));

        Self { lake_house, ..self }
    }

    /// Set the cancellation token passed on to engines that support it.
    pub fn cancellation(self, cancellation: CancellationToken) -> Self {
        Self {
            cancellation,
            ..self
        }
    }

    /// Suppress the connection progress bar when `silent` is true.
    pub fn silent(self, silent: bool) -> Self {
        Self { silent, ..self }
    }
}
1718
impl Builder<i32, String, Url, Url> {
    /// Construct the [`StorageContainer`] selected by the storage URL scheme,
    /// then ping it once to confirm connectivity.
    ///
    /// Supported schemes: `postgres`/`postgresql`, `s3`, `memory`, `sqlite`,
    /// `slatedb`, `turso` and `null` — each engine gated on its cargo feature.
    ///
    /// # Errors
    ///
    /// [`Error::FeatureNotEnabled`] when the scheme names an engine whose
    /// feature is not compiled in, [`Error::UnsupportedStorageUrl`] for an
    /// unknown scheme (when at least one engine feature is enabled), or any
    /// error from the engine builder or the initial ping.
    pub async fn build(self) -> Result<StorageContainer> {
        let storage = match self.storage.scheme() {
            #[cfg(feature = "postgres")]
            "postgres" | "postgresql" => Postgres::builder(self.storage.to_string().as_str())
                .map(|builder| builder.cluster(self.cluster_id.as_str()))
                .map(|builder| builder.node(self.node_id))
                .map(|builder| builder.advertised_listener(self.advertised_listener.clone()))
                .map(|builder| builder.schemas(self.schema_registry))
                .map(|builder| builder.lake(self.lake_house.clone()))
                .map(|builder| builder.build())
                .map(StorageContainer::Postgres),

            #[cfg(not(feature = "postgres"))]
            "postgres" | "postgresql" => Err(Error::FeatureNotEnabled {
                feature: "postgres".into(),
                message: self.storage.to_string(),
            }),

            #[cfg(feature = "dynostore")]
            "s3" => {
                // The URL host is the bucket name, e.g. s3://tansu/.
                let bucket_name = self.storage.host_str().unwrap_or("tansu");

                // Remaining S3 configuration (credentials, region, endpoint)
                // comes from the environment.
                AmazonS3Builder::from_env()
                    .with_bucket_name(bucket_name)
                    .with_conditional_put(S3ConditionalPut::ETagMatch)
                    .build()
                    .map(|object_store| {
                        DynoStore::new(self.cluster_id.as_str(), self.node_id, object_store)
                            .advertised_listener(self.advertised_listener.clone())
                            .schemas(self.schema_registry)
                            .lake(self.lake_house.clone())
                    })
                    .map(StorageContainer::DynoStore)
                    .map_err(Into::into)
            }

            // In-memory object store, primarily for tests and examples.
            #[cfg(feature = "dynostore")]
            "memory" => Ok(StorageContainer::DynoStore(
                DynoStore::new(self.cluster_id.as_str(), self.node_id, InMemory::new())
                    .advertised_listener(self.advertised_listener.clone())
                    .schemas(self.schema_registry)
                    .lake(self.lake_house.clone()),
            )),

            #[cfg(not(feature = "dynostore"))]
            "s3" | "memory" => Err(Error::FeatureNotEnabled {
                feature: "dynostore".into(),
                message: self.storage.to_string(),
            }),

            #[cfg(feature = "libsql")]
            "sqlite" => lite::Engine::builder()
                .storage(self.storage.clone())
                .node(self.node_id)
                .cluster(self.cluster_id.clone())
                .advertised_listener(self.advertised_listener.clone())
                .schemas(self.schema_registry)
                .lake(self.lake_house.clone())
                .cancellation(self.cancellation.clone())
                .build()
                .await
                .map(StorageContainer::Lite),

            #[cfg(not(feature = "libsql"))]
            "sqlite" => Err(Error::FeatureNotEnabled {
                feature: "libsql".into(),
                message: self.storage.to_string(),
            }),

            #[cfg(feature = "slatedb")]
            "slatedb" => {
                use slatedb::Db;
                use slatedb::object_store::{
                    ObjectStore as SlateObjectStore,
                    aws::{
                        AmazonS3Builder as SlateS3Builder,
                        S3ConditionalPut as SlateS3ConditionalPut,
                    },
                    memory::InMemory as SlateInMemory,
                };

                let host = self.storage.host_str().unwrap_or("tansu");
                let db_path = format!("tansu-{}.slatedb", self.cluster_id);

                // Support memory backend for testing: slatedb://memory
                let object_store: Arc<dyn SlateObjectStore> = if host == "memory" {
                    Arc::new(SlateInMemory::new())
                } else {
                    // Use S3 backend with host as bucket name
                    SlateS3Builder::from_env()
                        .with_bucket_name(host)
                        .with_conditional_put(SlateS3ConditionalPut::ETagMatch)
                        .build()
                        .map(Arc::new)
                        .map_err(|e| Error::Message(e.to_string()))?
                };

                Db::open(db_path, object_store)
                    .await
                    .map(Arc::new)
                    .map(|db| {
                        slate::Engine::builder()
                            .cluster(self.cluster_id.clone())
                            .node(self.node_id)
                            .advertised_listener(self.advertised_listener.clone())
                            .db(db)
                            .schemas(self.schema_registry)
                            .lake(self.lake_house)
                            .build()
                    })
                    .map(StorageContainer::Slate)
                    .map_err(Into::into)
            }

            #[cfg(not(feature = "slatedb"))]
            "slatedb" => Err(Error::FeatureNotEnabled {
                feature: "slatedb".into(),
                message: self.storage.to_string(),
            }),

            #[cfg(feature = "turso")]
            "turso" => limbo::Engine::builder()
                .storage(self.storage.clone())
                .node(self.node_id)
                .cluster(self.cluster_id.clone())
                .advertised_listener(self.advertised_listener.clone())
                .schemas(self.schema_registry)
                .lake(self.lake_house.clone())
                .build()
                .await
                .map(StorageContainer::Turso),

            #[cfg(not(feature = "turso"))]
            "turso" => Err(Error::FeatureNotEnabled {
                feature: "turso".into(),
                message: self.storage.to_string(),
            }),

            "null" => Ok(StorageContainer::Null(null::Engine::new(
                self.cluster_id.clone(),
                self.node_id,
                self.advertised_listener.clone(),
            ))),

            // With no engine features compiled in, any other scheme falls
            // back to the null engine rather than erroring.
            #[cfg(not(any(
                feature = "dynostore",
                feature = "libsql",
                feature = "postgres",
                feature = "slatedb",
                feature = "turso"
            )))]
            _storage => Ok(StorageContainer::Null(null::Engine::new(
                self.cluster_id.clone(),
                self.node_id,
                self.advertised_listener.clone(),
            ))),

            #[cfg(any(
                feature = "dynostore",
                feature = "libsql",
                feature = "postgres",
                feature = "slatedb",
                feature = "turso"
            ))]
            _unsupported => Err(Error::UnsupportedStorageUrl(self.storage.clone())),
        }?;

        // Optionally show a progress bar while the first connection is made.
        let pb = if self.silent {
            None
        } else {
            let pb = ProgressBar::new(1);
            pb.set_style(
                ProgressStyle::with_template("[{elapsed}] {bar:40.cyan/blue} {msg}")
                    .unwrap()
                    .progress_chars("##-"),
            );

            pb.set_message("connecting to storage");

            Some(pb)
        };

        // Fail fast if the storage is unreachable.
        storage.ping().await?;

        if let Some(pb) = pb {
            pb.inc(1);
            pb.finish_with_message(format!("{} connected to storage", Emoji("✅", ""),));
        }

        Ok(storage)
    }
}
1912
// OpenTelemetry meter scoped to this crate's name, version and schema URL.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});
1921
// Counter of successful storage container calls, labelled by `method`.
static STORAGE_CONTAINER_REQUESTS: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("tansu_storage_container_requests")
        .with_description("tansu storage container requests")
        .build()
});
1928
// Counter of failed storage container calls, labelled by `method`.
static STORAGE_CONTAINER_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("tansu_storage_container_errors")
        .with_description("tansu storage container errors")
        .build()
});
1935
/// Stored SCRAM credential material for a user.
///
/// Field names follow RFC 5802 SCRAM terminology (salt, iteration count, and
/// the derived stored/server keys). NOTE(review): the exact hash/derivation
/// scheme is determined by the code that populates this struct — confirm there.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ScramCredential {
    // Salt fed into the password key derivation.
    pub salt: Bytes,
    // Iteration count used by the key derivation.
    pub iterations: i32,
    // Derived key used to verify client proofs.
    pub stored_key: Bytes,
    // Derived key used to produce the server signature.
    pub server_key: Bytes,
}
1943
1944#[async_trait]
1945impl Storage for StorageContainer {
1946    #[instrument(skip_all)]
1947    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
1948        let attributes = [KeyValue::new("method", "register_broker")];
1949
1950        match self {
1951            #[cfg(feature = "dynostore")]
1952            Self::DynoStore(engine) => engine.register_broker(broker_registration),
1953
1954            #[cfg(feature = "libsql")]
1955            Self::Lite(engine) => engine.register_broker(broker_registration),
1956
1957            Self::Null(engine) => engine.register_broker(broker_registration),
1958
1959            #[cfg(feature = "postgres")]
1960            Self::Postgres(engine) => engine.register_broker(broker_registration),
1961
1962            #[cfg(feature = "slatedb")]
1963            Self::Slate(engine) => engine.register_broker(broker_registration),
1964
1965            #[cfg(feature = "turso")]
1966            Self::Turso(engine) => engine.register_broker(broker_registration),
1967        }
1968        .await
1969        .inspect(|_| {
1970            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
1971        })
1972        .inspect_err(|_| {
1973            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
1974        })
1975    }
1976
1977    #[instrument(skip_all)]
1978    async fn incremental_alter_resource(
1979        &self,
1980        resource: AlterConfigsResource,
1981    ) -> Result<AlterConfigsResourceResponse> {
1982        let attributes = [KeyValue::new("method", "incremental_alter_resource")];
1983
1984        match self {
1985            #[cfg(feature = "dynostore")]
1986            Self::DynoStore(engine) => engine.incremental_alter_resource(resource),
1987
1988            #[cfg(feature = "libsql")]
1989            Self::Lite(engine) => engine.incremental_alter_resource(resource),
1990
1991            Self::Null(engine) => engine.incremental_alter_resource(resource),
1992
1993            #[cfg(feature = "postgres")]
1994            Self::Postgres(engine) => engine.incremental_alter_resource(resource),
1995
1996            #[cfg(feature = "slatedb")]
1997            Self::Slate(engine) => engine.incremental_alter_resource(resource),
1998
1999            #[cfg(feature = "turso")]
2000            Self::Turso(engine) => engine.incremental_alter_resource(resource),
2001        }
2002        .await
2003        .inspect(|_| {
2004            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2005        })
2006        .inspect_err(|_| {
2007            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2008        })
2009    }
2010
2011    #[instrument(skip_all)]
2012    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
2013        let attributes = [KeyValue::new("method", "create_topic")];
2014
2015        match self {
2016            #[cfg(feature = "dynostore")]
2017            Self::DynoStore(engine) => engine.create_topic(topic, validate_only),
2018
2019            #[cfg(feature = "libsql")]
2020            Self::Lite(engine) => engine.create_topic(topic, validate_only),
2021
2022            Self::Null(engine) => engine.create_topic(topic, validate_only),
2023
2024            #[cfg(feature = "postgres")]
2025            Self::Postgres(engine) => engine.create_topic(topic, validate_only),
2026
2027            #[cfg(feature = "turso")]
2028            Self::Turso(engine) => engine.create_topic(topic, validate_only),
2029
2030            #[cfg(feature = "slatedb")]
2031            Self::Slate(engine) => engine.create_topic(topic, validate_only),
2032        }
2033        .await
2034        .inspect(|_| {
2035            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2036        })
2037        .inspect_err(|_| {
2038            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2039        })
2040    }
2041
2042    #[instrument(skip_all)]
2043    async fn delete_records(
2044        &self,
2045        topics: &[DeleteRecordsTopic],
2046    ) -> Result<Vec<DeleteRecordsTopicResult>> {
2047        let attributes = [KeyValue::new("method", "delete_records")];
2048
2049        match self {
2050            #[cfg(feature = "dynostore")]
2051            Self::DynoStore(engine) => engine.delete_records(topics),
2052
2053            #[cfg(feature = "libsql")]
2054            Self::Lite(engine) => engine.delete_records(topics),
2055
2056            Self::Null(engine) => engine.delete_records(topics),
2057
2058            #[cfg(feature = "postgres")]
2059            Self::Postgres(engine) => engine.delete_records(topics),
2060
2061            #[cfg(feature = "slatedb")]
2062            Self::Slate(engine) => engine.delete_records(topics),
2063
2064            #[cfg(feature = "turso")]
2065            Self::Turso(engine) => engine.delete_records(topics),
2066        }
2067        .await
2068        .inspect(|_| {
2069            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2070        })
2071        .inspect_err(|_| {
2072            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2073        })
2074    }
2075
2076    #[instrument(skip_all)]
2077    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
2078        let attributes = [KeyValue::new("method", "delete_topic")];
2079
2080        match self {
2081            #[cfg(feature = "dynostore")]
2082            Self::DynoStore(engine) => engine.delete_topic(topic),
2083
2084            #[cfg(feature = "libsql")]
2085            Self::Lite(engine) => engine.delete_topic(topic),
2086
2087            Self::Null(engine) => engine.delete_topic(topic),
2088
2089            #[cfg(feature = "postgres")]
2090            Self::Postgres(engine) => engine.delete_topic(topic),
2091
2092            #[cfg(feature = "slatedb")]
2093            Self::Slate(engine) => engine.delete_topic(topic),
2094
2095            #[cfg(feature = "turso")]
2096            Self::Turso(engine) => engine.delete_topic(topic),
2097        }
2098        .await
2099        .inspect(|_| {
2100            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2101        })
2102        .inspect_err(|_| {
2103            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2104        })
2105    }
2106
2107    #[instrument(skip_all)]
2108    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
2109        let attributes = [KeyValue::new("method", "brokers")];
2110
2111        match self {
2112            #[cfg(feature = "dynostore")]
2113            Self::DynoStore(engine) => engine.brokers(),
2114
2115            #[cfg(feature = "libsql")]
2116            Self::Lite(engine) => engine.brokers(),
2117
2118            Self::Null(engine) => engine.brokers(),
2119
2120            #[cfg(feature = "postgres")]
2121            Self::Postgres(engine) => engine.brokers(),
2122
2123            #[cfg(feature = "slatedb")]
2124            Self::Slate(engine) => engine.brokers(),
2125
2126            #[cfg(feature = "turso")]
2127            Self::Turso(engine) => engine.brokers(),
2128        }
2129        .await
2130        .inspect(|_| {
2131            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2132        })
2133        .inspect_err(|_| {
2134            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2135        })
2136    }
2137
2138    #[instrument(skip_all)]
2139    async fn produce(
2140        &self,
2141        transaction_id: Option<&str>,
2142        topition: &Topition,
2143        batch: deflated::Batch,
2144    ) -> Result<i64> {
2145        let attributes = [KeyValue::new("method", "produce")];
2146
2147        match self {
2148            #[cfg(feature = "dynostore")]
2149            Self::DynoStore(engine) => engine.produce(transaction_id, topition, batch),
2150
2151            #[cfg(feature = "libsql")]
2152            Self::Lite(engine) => engine.produce(transaction_id, topition, batch),
2153
2154            Self::Null(engine) => engine.produce(transaction_id, topition, batch),
2155
2156            #[cfg(feature = "postgres")]
2157            Self::Postgres(engine) => engine.produce(transaction_id, topition, batch),
2158
2159            #[cfg(feature = "slatedb")]
2160            Self::Slate(engine) => engine.produce(transaction_id, topition, batch),
2161
2162            #[cfg(feature = "turso")]
2163            Self::Turso(engine) => engine.produce(transaction_id, topition, batch),
2164        }
2165        .await
2166        .inspect(|_| {
2167            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2168        })
2169        .inspect_err(|_| {
2170            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2171        })
2172    }
2173
2174    #[instrument(skip_all)]
2175    async fn fetch(
2176        &self,
2177        topition: &'_ Topition,
2178        offset: i64,
2179        min_bytes: u32,
2180        max_bytes: u32,
2181        isolation: IsolationLevel,
2182    ) -> Result<Vec<deflated::Batch>> {
2183        let attributes = [KeyValue::new("method", "fetch")];
2184
2185        match self {
2186            #[cfg(feature = "dynostore")]
2187            Self::DynoStore(engine) => {
2188                engine.fetch(topition, offset, min_bytes, max_bytes, isolation)
2189            }
2190
2191            #[cfg(feature = "libsql")]
2192            Self::Lite(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2193
2194            Self::Null(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2195
2196            #[cfg(feature = "postgres")]
2197            Self::Postgres(engine) => {
2198                engine.fetch(topition, offset, min_bytes, max_bytes, isolation)
2199            }
2200
2201            #[cfg(feature = "slatedb")]
2202            Self::Slate(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2203
2204            #[cfg(feature = "turso")]
2205            Self::Turso(engine) => engine.fetch(topition, offset, min_bytes, max_bytes, isolation),
2206        }
2207        .await
2208        .inspect(|_| {
2209            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2210        })
2211        .inspect_err(|_| {
2212            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2213        })
2214    }
2215
2216    #[instrument(skip_all)]
2217    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
2218        let attributes = [KeyValue::new("method", "offset_stage")];
2219
2220        match self {
2221            #[cfg(feature = "dynostore")]
2222            Self::DynoStore(engine) => engine.offset_stage(topition),
2223
2224            #[cfg(feature = "libsql")]
2225            Self::Lite(engine) => engine.offset_stage(topition),
2226
2227            Self::Null(engine) => engine.offset_stage(topition),
2228
2229            #[cfg(feature = "postgres")]
2230            Self::Postgres(engine) => engine.offset_stage(topition),
2231
2232            #[cfg(feature = "slatedb")]
2233            Self::Slate(engine) => engine.offset_stage(topition),
2234
2235            #[cfg(feature = "turso")]
2236            Self::Turso(engine) => engine.offset_stage(topition),
2237        }
2238        .await
2239        .inspect(|_| {
2240            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2241        })
2242        .inspect_err(|_| {
2243            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2244        })
2245    }
2246
2247    #[instrument(skip_all)]
2248    async fn list_offsets(
2249        &self,
2250        isolation_level: IsolationLevel,
2251        offsets: &[(Topition, ListOffset)],
2252    ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
2253        let attributes = [KeyValue::new("method", "list_offsets")];
2254
2255        match self {
2256            #[cfg(feature = "dynostore")]
2257            Self::DynoStore(engine) => engine.list_offsets(isolation_level, offsets),
2258
2259            #[cfg(feature = "libsql")]
2260            Self::Lite(engine) => engine.list_offsets(isolation_level, offsets),
2261
2262            Self::Null(engine) => engine.list_offsets(isolation_level, offsets),
2263
2264            #[cfg(feature = "postgres")]
2265            Self::Postgres(engine) => engine.list_offsets(isolation_level, offsets),
2266
2267            #[cfg(feature = "slatedb")]
2268            Self::Slate(engine) => engine.list_offsets(isolation_level, offsets),
2269
2270            #[cfg(feature = "turso")]
2271            Self::Turso(engine) => engine.list_offsets(isolation_level, offsets),
2272        }
2273        .await
2274        .inspect(|_| {
2275            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2276        })
2277        .inspect_err(|_| {
2278            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2279        })
2280    }
2281
2282    #[instrument(skip_all)]
2283    async fn offset_commit(
2284        &self,
2285        group_id: &str,
2286        retention_time_ms: Option<Duration>,
2287        offsets: &[(Topition, OffsetCommitRequest)],
2288    ) -> Result<Vec<(Topition, ErrorCode)>> {
2289        let attributes = [KeyValue::new("method", "offset_commit")];
2290
2291        match self {
2292            #[cfg(feature = "dynostore")]
2293            Self::DynoStore(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2294
2295            #[cfg(feature = "libsql")]
2296            Self::Lite(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2297
2298            Self::Null(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2299
2300            #[cfg(feature = "postgres")]
2301            Self::Postgres(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2302
2303            #[cfg(feature = "slatedb")]
2304            Self::Slate(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2305
2306            #[cfg(feature = "turso")]
2307            Self::Turso(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2308        }
2309        .await
2310        .inspect(|_| {
2311            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2312        })
2313        .inspect_err(|_| {
2314            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2315        })
2316    }
2317
2318    #[instrument(skip_all)]
2319    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
2320        let attributes = [KeyValue::new("method", "committed_offset_topitions")];
2321
2322        match self {
2323            #[cfg(feature = "dynostore")]
2324            Self::DynoStore(engine) => engine.committed_offset_topitions(group_id),
2325
2326            #[cfg(feature = "libsql")]
2327            Self::Lite(engine) => engine.committed_offset_topitions(group_id),
2328
2329            Self::Null(engine) => engine.committed_offset_topitions(group_id),
2330
2331            #[cfg(feature = "postgres")]
2332            Self::Postgres(engine) => engine.committed_offset_topitions(group_id),
2333
2334            #[cfg(feature = "slatedb")]
2335            Self::Slate(engine) => engine.committed_offset_topitions(group_id),
2336
2337            #[cfg(feature = "turso")]
2338            Self::Turso(engine) => engine.committed_offset_topitions(group_id),
2339        }
2340        .await
2341        .inspect(|_| {
2342            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2343        })
2344        .inspect_err(|_| {
2345            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2346        })
2347    }
2348
2349    #[instrument(skip_all)]
2350    async fn offset_fetch(
2351        &self,
2352        group_id: Option<&str>,
2353        topics: &[Topition],
2354        require_stable: Option<bool>,
2355    ) -> Result<BTreeMap<Topition, i64>> {
2356        let attributes = [KeyValue::new("method", "offset_fetch")];
2357
2358        match self {
2359            #[cfg(feature = "dynostore")]
2360            Self::DynoStore(engine) => engine.offset_fetch(group_id, topics, require_stable),
2361
2362            #[cfg(feature = "libsql")]
2363            Self::Lite(engine) => engine.offset_fetch(group_id, topics, require_stable),
2364
2365            Self::Null(engine) => engine.offset_fetch(group_id, topics, require_stable),
2366
2367            #[cfg(feature = "postgres")]
2368            Self::Postgres(engine) => engine.offset_fetch(group_id, topics, require_stable),
2369
2370            #[cfg(feature = "slatedb")]
2371            Self::Slate(engine) => engine.offset_fetch(group_id, topics, require_stable),
2372
2373            #[cfg(feature = "turso")]
2374            Self::Turso(engine) => engine.offset_fetch(group_id, topics, require_stable),
2375        }
2376        .await
2377        .inspect(|_| {
2378            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2379        })
2380        .inspect_err(|_| {
2381            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2382        })
2383    }
2384
2385    #[instrument(skip_all)]
2386    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
2387        let attributes = [KeyValue::new("method", "metadata")];
2388
2389        match self {
2390            #[cfg(feature = "dynostore")]
2391            Self::DynoStore(engine) => engine.metadata(topics),
2392
2393            #[cfg(feature = "libsql")]
2394            Self::Lite(engine) => engine.metadata(topics),
2395
2396            Self::Null(engine) => engine.metadata(topics),
2397
2398            #[cfg(feature = "postgres")]
2399            Self::Postgres(engine) => engine.metadata(topics),
2400
2401            #[cfg(feature = "slatedb")]
2402            Self::Slate(engine) => engine.metadata(topics),
2403
2404            #[cfg(feature = "turso")]
2405            Self::Turso(engine) => engine.metadata(topics),
2406        }
2407        .await
2408        .inspect(|_| {
2409            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2410        })
2411        .inspect_err(|_| {
2412            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2413        })
2414    }
2415
2416    #[instrument(skip_all)]
2417    async fn describe_config(
2418        &self,
2419        name: &str,
2420        resource: ConfigResource,
2421        keys: Option<&[String]>,
2422    ) -> Result<DescribeConfigsResult> {
2423        let attributes = [KeyValue::new("method", "describe_config")];
2424
2425        match self {
2426            #[cfg(feature = "dynostore")]
2427            Self::DynoStore(engine) => engine.describe_config(name, resource, keys),
2428
2429            #[cfg(feature = "libsql")]
2430            Self::Lite(engine) => engine.describe_config(name, resource, keys),
2431
2432            Self::Null(engine) => engine.describe_config(name, resource, keys),
2433
2434            #[cfg(feature = "postgres")]
2435            Self::Postgres(engine) => engine.describe_config(name, resource, keys),
2436
2437            #[cfg(feature = "slatedb")]
2438            Self::Slate(engine) => engine.describe_config(name, resource, keys),
2439
2440            #[cfg(feature = "turso")]
2441            Self::Turso(engine) => engine.describe_config(name, resource, keys),
2442        }
2443        .await
2444        .inspect(|_| {
2445            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2446        })
2447        .inspect_err(|_| {
2448            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2449        })
2450    }
2451
2452    #[instrument(skip_all)]
2453    async fn describe_topic_partitions(
2454        &self,
2455        topics: Option<&[TopicId]>,
2456        partition_limit: i32,
2457        cursor: Option<Topition>,
2458    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
2459        let attributes = [KeyValue::new("method", "describe_topic_partitions")];
2460
2461        match self {
2462            #[cfg(feature = "dynostore")]
2463            Self::DynoStore(engine) => {
2464                engine.describe_topic_partitions(topics, partition_limit, cursor)
2465            }
2466
2467            #[cfg(feature = "libsql")]
2468            Self::Lite(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
2469
2470            Self::Null(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
2471
2472            #[cfg(feature = "postgres")]
2473            Self::Postgres(engine) => {
2474                engine.describe_topic_partitions(topics, partition_limit, cursor)
2475            }
2476
2477            #[cfg(feature = "slatedb")]
2478            Self::Slate(engine) => {
2479                engine.describe_topic_partitions(topics, partition_limit, cursor)
2480            }
2481
2482            #[cfg(feature = "turso")]
2483            Self::Turso(engine) => {
2484                engine.describe_topic_partitions(topics, partition_limit, cursor)
2485            }
2486        }
2487        .await
2488        .inspect(|_| {
2489            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2490        })
2491        .inspect_err(|_| {
2492            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2493        })
2494    }
2495
2496    #[instrument(skip_all)]
2497    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
2498        let attributes = [KeyValue::new("method", "list_groups")];
2499
2500        match self {
2501            #[cfg(feature = "dynostore")]
2502            Self::DynoStore(engine) => engine.list_groups(states_filter),
2503
2504            #[cfg(feature = "libsql")]
2505            Self::Lite(engine) => engine.list_groups(states_filter),
2506
2507            Self::Null(engine) => engine.list_groups(states_filter),
2508
2509            #[cfg(feature = "postgres")]
2510            Self::Postgres(engine) => engine.list_groups(states_filter),
2511
2512            #[cfg(feature = "slatedb")]
2513            Self::Slate(engine) => engine.list_groups(states_filter),
2514
2515            #[cfg(feature = "turso")]
2516            Self::Turso(engine) => engine.list_groups(states_filter),
2517        }
2518        .await
2519        .inspect(|_| {
2520            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2521        })
2522        .inspect_err(|_| {
2523            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2524        })
2525    }
2526
2527    #[instrument(skip_all)]
2528    async fn delete_groups(
2529        &self,
2530        group_ids: Option<&[String]>,
2531    ) -> Result<Vec<DeletableGroupResult>> {
2532        let attributes = [KeyValue::new("method", "delete_groups")];
2533
2534        match self {
2535            #[cfg(feature = "dynostore")]
2536            Self::DynoStore(engine) => engine.delete_groups(group_ids),
2537
2538            #[cfg(feature = "libsql")]
2539            Self::Lite(engine) => engine.delete_groups(group_ids),
2540
2541            Self::Null(engine) => engine.delete_groups(group_ids),
2542
2543            #[cfg(feature = "postgres")]
2544            Self::Postgres(engine) => engine.delete_groups(group_ids),
2545
2546            #[cfg(feature = "slatedb")]
2547            Self::Slate(engine) => engine.delete_groups(group_ids),
2548
2549            #[cfg(feature = "turso")]
2550            Self::Turso(engine) => engine.delete_groups(group_ids),
2551        }
2552        .await
2553        .inspect(|_| {
2554            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2555        })
2556        .inspect_err(|_| {
2557            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2558        })
2559    }
2560
2561    #[instrument(skip_all)]
2562    async fn describe_groups(
2563        &self,
2564        group_ids: Option<&[String]>,
2565        include_authorized_operations: bool,
2566    ) -> Result<Vec<NamedGroupDetail>> {
2567        let attributes = [KeyValue::new("method", "describe_groups")];
2568
2569        match self {
2570            #[cfg(feature = "dynostore")]
2571            Self::DynoStore(engine) => {
2572                engine.describe_groups(group_ids, include_authorized_operations)
2573            }
2574
2575            #[cfg(feature = "libsql")]
2576            Self::Lite(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2577
2578            Self::Null(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2579
2580            #[cfg(feature = "postgres")]
2581            Self::Postgres(engine) => {
2582                engine.describe_groups(group_ids, include_authorized_operations)
2583            }
2584
2585            #[cfg(feature = "slatedb")]
2586            Self::Slate(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2587
2588            #[cfg(feature = "turso")]
2589            Self::Turso(engine) => engine.describe_groups(group_ids, include_authorized_operations),
2590        }
2591        .await
2592        .inspect(|_| {
2593            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2594        })
2595        .inspect_err(|_| {
2596            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2597        })
2598    }
2599
2600    #[instrument(skip_all)]
2601    async fn update_group(
2602        &self,
2603        group_id: &str,
2604        detail: GroupDetail,
2605        version: Option<Version>,
2606    ) -> Result<Version, UpdateError<GroupDetail>> {
2607        let attributes = [KeyValue::new("method", "update_group")];
2608
2609        match self {
2610            #[cfg(feature = "dynostore")]
2611            Self::DynoStore(engine) => engine.update_group(group_id, detail, version),
2612
2613            #[cfg(feature = "libsql")]
2614            Self::Lite(engine) => engine.update_group(group_id, detail, version),
2615
2616            Self::Null(engine) => engine.update_group(group_id, detail, version),
2617
2618            #[cfg(feature = "postgres")]
2619            Self::Postgres(engine) => engine.update_group(group_id, detail, version),
2620
2621            #[cfg(feature = "slatedb")]
2622            Self::Slate(engine) => engine.update_group(group_id, detail, version),
2623
2624            #[cfg(feature = "turso")]
2625            Self::Turso(engine) => engine.update_group(group_id, detail, version),
2626        }
2627        .await
2628        .inspect(|_| {
2629            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2630        })
2631        .inspect_err(|_| {
2632            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2633        })
2634    }
2635
2636    #[instrument(skip_all)]
2637    async fn init_producer(
2638        &self,
2639        transaction_id: Option<&str>,
2640        transaction_timeout_ms: i32,
2641        producer_id: Option<i64>,
2642        producer_epoch: Option<i16>,
2643    ) -> Result<ProducerIdResponse> {
2644        let attributes = [KeyValue::new("method", "init_producer")];
2645
2646        match self {
2647            #[cfg(feature = "dynostore")]
2648            Self::DynoStore(engine) => engine.init_producer(
2649                transaction_id,
2650                transaction_timeout_ms,
2651                producer_id,
2652                producer_epoch,
2653            ),
2654
2655            #[cfg(feature = "libsql")]
2656            Self::Lite(engine) => engine.init_producer(
2657                transaction_id,
2658                transaction_timeout_ms,
2659                producer_id,
2660                producer_epoch,
2661            ),
2662
2663            Self::Null(engine) => engine.init_producer(
2664                transaction_id,
2665                transaction_timeout_ms,
2666                producer_id,
2667                producer_epoch,
2668            ),
2669
2670            #[cfg(feature = "postgres")]
2671            Self::Postgres(engine) => engine.init_producer(
2672                transaction_id,
2673                transaction_timeout_ms,
2674                producer_id,
2675                producer_epoch,
2676            ),
2677
2678            #[cfg(feature = "slatedb")]
2679            Self::Slate(engine) => engine.init_producer(
2680                transaction_id,
2681                transaction_timeout_ms,
2682                producer_id,
2683                producer_epoch,
2684            ),
2685
2686            #[cfg(feature = "turso")]
2687            Self::Turso(engine) => engine.init_producer(
2688                transaction_id,
2689                transaction_timeout_ms,
2690                producer_id,
2691                producer_epoch,
2692            ),
2693        }
2694        .await
2695        .inspect(|_| {
2696            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2697        })
2698        .inspect_err(|_| {
2699            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2700        })
2701    }
2702
2703    #[instrument(skip_all)]
2704    async fn txn_add_offsets(
2705        &self,
2706        transaction_id: &str,
2707        producer_id: i64,
2708        producer_epoch: i16,
2709        group_id: &str,
2710    ) -> Result<ErrorCode> {
2711        let attributes = [KeyValue::new("method", "txn_add_offsets")];
2712
2713        match self {
2714            #[cfg(feature = "dynostore")]
2715            Self::DynoStore(engine) => {
2716                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2717            }
2718
2719            #[cfg(feature = "libsql")]
2720            Self::Lite(engine) => {
2721                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2722            }
2723
2724            Self::Null(engine) => {
2725                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2726            }
2727
2728            #[cfg(feature = "postgres")]
2729            Self::Postgres(engine) => {
2730                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2731            }
2732
2733            #[cfg(feature = "slatedb")]
2734            Self::Slate(engine) => {
2735                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2736            }
2737
2738            #[cfg(feature = "turso")]
2739            Self::Turso(engine) => {
2740                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
2741            }
2742        }
2743        .await
2744        .inspect(|_| {
2745            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2746        })
2747        .inspect_err(|_| {
2748            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2749        })
2750    }
2751
2752    #[instrument(skip_all)]
2753    async fn txn_add_partitions(
2754        &self,
2755        partitions: TxnAddPartitionsRequest,
2756    ) -> Result<TxnAddPartitionsResponse> {
2757        let attributes = [KeyValue::new("method", "txn_add_partitions")];
2758
2759        match self {
2760            #[cfg(feature = "dynostore")]
2761            Self::DynoStore(engine) => engine.txn_add_partitions(partitions),
2762
2763            #[cfg(feature = "libsql")]
2764            Self::Lite(engine) => engine.txn_add_partitions(partitions),
2765
2766            Self::Null(engine) => engine.txn_add_partitions(partitions),
2767
2768            #[cfg(feature = "postgres")]
2769            Self::Postgres(engine) => engine.txn_add_partitions(partitions),
2770
2771            #[cfg(feature = "slatedb")]
2772            Self::Slate(engine) => engine.txn_add_partitions(partitions),
2773
2774            #[cfg(feature = "turso")]
2775            Self::Turso(engine) => engine.txn_add_partitions(partitions),
2776        }
2777        .await
2778        .inspect(|_| {
2779            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2780        })
2781        .inspect_err(|_| {
2782            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2783        })
2784    }
2785
2786    #[instrument(skip_all)]
2787    async fn txn_offset_commit(
2788        &self,
2789        offsets: TxnOffsetCommitRequest,
2790    ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
2791        let attributes = [KeyValue::new("method", "txn_offset_commit")];
2792
2793        match self {
2794            #[cfg(feature = "dynostore")]
2795            Self::DynoStore(engine) => engine.txn_offset_commit(offsets),
2796
2797            #[cfg(feature = "libsql")]
2798            Self::Lite(engine) => engine.txn_offset_commit(offsets),
2799
2800            Self::Null(engine) => engine.txn_offset_commit(offsets),
2801
2802            #[cfg(feature = "postgres")]
2803            Self::Postgres(engine) => engine.txn_offset_commit(offsets),
2804
2805            #[cfg(feature = "slatedb")]
2806            Self::Slate(engine) => engine.txn_offset_commit(offsets),
2807
2808            #[cfg(feature = "turso")]
2809            Self::Turso(engine) => engine.txn_offset_commit(offsets),
2810        }
2811        .await
2812        .inspect(|_| {
2813            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2814        })
2815        .inspect_err(|_| {
2816            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2817        })
2818    }
2819
2820    #[instrument(skip_all)]
2821    async fn txn_end(
2822        &self,
2823        transaction_id: &str,
2824        producer_id: i64,
2825        producer_epoch: i16,
2826        committed: bool,
2827    ) -> Result<ErrorCode> {
2828        let attributes = [KeyValue::new("method", "txn_end")];
2829
2830        match self {
2831            #[cfg(feature = "dynostore")]
2832            Self::DynoStore(engine) => {
2833                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2834            }
2835
2836            #[cfg(feature = "libsql")]
2837            Self::Lite(engine) => {
2838                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2839            }
2840
2841            Self::Null(engine) => {
2842                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2843            }
2844
2845            #[cfg(feature = "postgres")]
2846            Self::Postgres(engine) => {
2847                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2848            }
2849
2850            #[cfg(feature = "slatedb")]
2851            Self::Slate(engine) => {
2852                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2853            }
2854
2855            #[cfg(feature = "turso")]
2856            Self::Turso(engine) => {
2857                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
2858            }
2859        }
2860        .await
2861        .inspect(|_| {
2862            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2863        })
2864        .inspect_err(|_| {
2865            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2866        })
2867    }
2868
2869    #[instrument(skip_all)]
2870    async fn maintain(&self, now: SystemTime) -> Result<()> {
2871        let attributes = [KeyValue::new("method", "maintain")];
2872
2873        match self {
2874            #[cfg(feature = "dynostore")]
2875            Self::DynoStore(engine) => engine.maintain(now),
2876
2877            #[cfg(feature = "libsql")]
2878            Self::Lite(engine) => engine.maintain(now),
2879
2880            Self::Null(engine) => engine.maintain(now),
2881
2882            #[cfg(feature = "postgres")]
2883            Self::Postgres(engine) => engine.maintain(now),
2884
2885            #[cfg(feature = "slatedb")]
2886            Self::Slate(engine) => engine.maintain(now),
2887
2888            #[cfg(feature = "turso")]
2889            Self::Turso(engine) => engine.maintain(now),
2890        }
2891        .await
2892        .inspect(|maintain| {
2893            debug!(?maintain);
2894            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2895        })
2896        .inspect_err(|err| {
2897            debug!(?err);
2898            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2899        })
2900    }
2901
2902    #[instrument(skip_all)]
2903    async fn cluster_id(&self) -> Result<String> {
2904        match self {
2905            #[cfg(feature = "dynostore")]
2906            Self::DynoStore(engine) => engine.cluster_id().await,
2907
2908            #[cfg(feature = "libsql")]
2909            Self::Lite(engine) => engine.cluster_id().await,
2910
2911            Self::Null(engine) => engine.cluster_id().await,
2912
2913            #[cfg(feature = "postgres")]
2914            Self::Postgres(engine) => engine.cluster_id().await,
2915
2916            #[cfg(feature = "slatedb")]
2917            Self::Slate(engine) => engine.cluster_id().await,
2918
2919            #[cfg(feature = "turso")]
2920            Self::Turso(engine) => engine.cluster_id().await,
2921        }
2922    }
2923
2924    #[instrument(skip_all)]
2925    async fn node(&self) -> Result<i32> {
2926        match self {
2927            #[cfg(feature = "dynostore")]
2928            Self::DynoStore(engine) => engine.node().await,
2929
2930            #[cfg(feature = "libsql")]
2931            Self::Lite(engine) => engine.node().await,
2932
2933            Self::Null(engine) => engine.node().await,
2934
2935            #[cfg(feature = "postgres")]
2936            Self::Postgres(engine) => engine.node().await,
2937
2938            #[cfg(feature = "slatedb")]
2939            Self::Slate(engine) => engine.node().await,
2940
2941            #[cfg(feature = "turso")]
2942            Self::Turso(engine) => engine.node().await,
2943        }
2944    }
2945
2946    #[instrument(skip_all)]
2947    async fn advertised_listener(&self) -> Result<Url> {
2948        match self {
2949            #[cfg(feature = "dynostore")]
2950            Self::DynoStore(engine) => engine.advertised_listener().await,
2951
2952            #[cfg(feature = "libsql")]
2953            Self::Lite(engine) => engine.advertised_listener().await,
2954
2955            Self::Null(engine) => engine.advertised_listener().await,
2956
2957            #[cfg(feature = "postgres")]
2958            Self::Postgres(engine) => engine.advertised_listener().await,
2959
2960            #[cfg(feature = "slatedb")]
2961            Self::Slate(engine) => engine.advertised_listener().await,
2962
2963            #[cfg(feature = "turso")]
2964            Self::Turso(engine) => engine.advertised_listener().await,
2965        }
2966    }
2967
2968    async fn delete_user_scram_credential(
2969        &self,
2970        user: &str,
2971        mechanism: ScramMechanism,
2972    ) -> Result<()> {
2973        match self {
2974            #[cfg(feature = "dynostore")]
2975            Self::DynoStore(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2976
2977            #[cfg(feature = "libsql")]
2978            Self::Lite(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2979
2980            Self::Null(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2981
2982            #[cfg(feature = "postgres")]
2983            Self::Postgres(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2984
2985            #[cfg(feature = "slatedb")]
2986            Self::Slate(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2987
2988            #[cfg(feature = "turso")]
2989            Self::Turso(engine) => engine.delete_user_scram_credential(user, mechanism).await,
2990        }
2991    }
2992
2993    async fn upsert_user_scram_credential(
2994        &self,
2995        user: &str,
2996        mechanism: ScramMechanism,
2997        credential: ScramCredential,
2998    ) -> Result<()> {
2999        match self {
3000            #[cfg(feature = "dynostore")]
3001            Self::DynoStore(engine) => {
3002                engine
3003                    .upsert_user_scram_credential(user, mechanism, credential)
3004                    .await
3005            }
3006
3007            #[cfg(feature = "libsql")]
3008            Self::Lite(engine) => {
3009                engine
3010                    .upsert_user_scram_credential(user, mechanism, credential)
3011                    .await
3012            }
3013
3014            Self::Null(engine) => {
3015                engine
3016                    .upsert_user_scram_credential(user, mechanism, credential)
3017                    .await
3018            }
3019
3020            #[cfg(feature = "postgres")]
3021            Self::Postgres(engine) => {
3022                engine
3023                    .upsert_user_scram_credential(user, mechanism, credential)
3024                    .await
3025            }
3026
3027            #[cfg(feature = "slatedb")]
3028            Self::Slate(engine) => {
3029                engine
3030                    .upsert_user_scram_credential(user, mechanism, credential)
3031                    .await
3032            }
3033
3034            #[cfg(feature = "turso")]
3035            Self::Turso(engine) => {
3036                engine
3037                    .upsert_user_scram_credential(user, mechanism, credential)
3038                    .await
3039            }
3040        }
3041    }
3042
3043    async fn user_scram_credential(
3044        &self,
3045        user: &str,
3046        mechanism: ScramMechanism,
3047    ) -> Result<Option<ScramCredential>> {
3048        match self {
3049            #[cfg(feature = "dynostore")]
3050            Self::DynoStore(engine) => engine.user_scram_credential(user, mechanism).await,
3051
3052            #[cfg(feature = "libsql")]
3053            Self::Lite(engine) => engine.user_scram_credential(user, mechanism).await,
3054
3055            Self::Null(engine) => engine.user_scram_credential(user, mechanism).await,
3056
3057            #[cfg(feature = "postgres")]
3058            Self::Postgres(engine) => engine.user_scram_credential(user, mechanism).await,
3059
3060            #[cfg(feature = "slatedb")]
3061            Self::Slate(engine) => engine.user_scram_credential(user, mechanism).await,
3062
3063            #[cfg(feature = "turso")]
3064            Self::Turso(engine) => engine.user_scram_credential(user, mechanism).await,
3065        }
3066    }
3067
3068    #[instrument(skip_all)]
3069    async fn ping(&self) -> Result<()> {
3070        let attributes = [KeyValue::new("method", "ping")];
3071
3072        match self {
3073            #[cfg(feature = "dynostore")]
3074            Self::DynoStore(engine) => engine.ping(),
3075
3076            #[cfg(feature = "libsql")]
3077            Self::Lite(engine) => engine.ping(),
3078
3079            Self::Null(engine) => engine.ping(),
3080
3081            #[cfg(feature = "postgres")]
3082            Self::Postgres(engine) => engine.ping(),
3083
3084            #[cfg(feature = "turso")]
3085            Self::Turso(engine) => engine.ping(),
3086
3087            #[cfg(feature = "slatedb")]
3088            Self::Slate(engine) => engine.ping(),
3089        }
3090        .await
3091        .inspect(|_| {
3092            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3093        })
3094        .inspect_err(|_| {
3095            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3096        })
3097    }
3098}
3099
#[cfg(test)]
mod tests {
    use super::*;

    // Parses "<topic>-<partition>"; the partition is the numeric suffix
    // after the final dash.
    #[test]
    fn topition_from_str() -> Result<()> {
        let parsed = Topition::from_str("qwerty-2147483647")?;
        assert_eq!(parsed.topic(), "qwerty");
        assert_eq!(parsed.partition(), i32::MAX);
        Ok(())
    }

    // Dashes inside the topic name itself must not confuse the split:
    // only the last dash separates topic from partition.
    #[test]
    fn topic_with_dashes_in_name() -> Result<()> {
        let parsed = Topition::from_str("test-topic-0000000-eFC79C8-2147483647")?;
        assert_eq!(parsed.topic(), "test-topic-0000000-eFC79C8");
        assert_eq!(parsed.partition(), i32::MAX);
        Ok(())
    }
}