Skip to main content

nisshi_storage/
lib.rs

1// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15//! Nisshi Storage Abstraction
16//!
17//! [`StorageContainer`] provides an abstraction over [`Storage`] and can
18//! be configured to use memory, [S3](https://en.wikipedia.org/wiki/Amazon_S3),
19//! [PostgreSQL](https://postgresql.org/),
20//! [libSQL](https://github.com/tursodatabase/libsql) and
21//! [Turso](https://github.com/tursodatabase/turso) (alpha: currently feature locked).
22//!
23//! ## Memory
24//!
25//! ```
26//! # use nisshi_storage::{Error, StorageContainer};
27//! # use url::Url;
28//! # #[tokio::main]
29//! # async fn main() -> Result<(), Error> {
30//! let storage = StorageContainer::builder()
31//!     .cluster_id("nisshi")
32//!     .node_id(111)
33//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
34//!     .storage(Url::parse("memory://nisshi/")?)
35//!     .build()
36//!     .await?;
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! ## S3
42//!
43//! ```no_run
44//! # use nisshi_storage::{Error, StorageContainer};
45//! # use url::Url;
46//! # #[tokio::main]
47//! # async fn main() -> Result<(), Error> {
48//! let storage = StorageContainer::builder()
49//!     .cluster_id("nisshi")
50//!     .node_id(111)
51//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
52//!     .storage(Url::parse("s3://nisshi/")?)
53//!     .build()
54//!     .await?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! ## PostgreSQL
60//!
61//! ```no_run
62//! # use nisshi_storage::{Error, StorageContainer};
63//! # use url::Url;
64//! # #[tokio::main]
65//! # async fn main() -> Result<(), Error> {
66//! let storage = StorageContainer::builder()
67//!     .cluster_id("nisshi")
68//!     .node_id(111)
69//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
70//!     .storage(Url::parse("postgres://postgres:postgres@localhost")?)
71//!     .build()
72//!     .await?;
73//! # Ok(())
74//! # }
75//! ```
76//!
77//! ## libSQL (SQLite)
78//!
79//! ```no_run
80//! # use nisshi_storage::{Error, StorageContainer};
81//! # use url::Url;
82//! # #[tokio::main]
83//! # async fn main() -> Result<(), Error> {
84//! let storage = StorageContainer::builder()
85//!     .cluster_id("nisshi")
86//!     .node_id(111)
87//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
88//!     .storage(Url::parse("sqlite://nisshi.db")?)
89//!     .build()
90//!     .await?;
91//! # Ok(())
92//! # }
93//! ```
94//!
95//! ## Turso
96//!
97//! ```no_run
98//! # use nisshi_storage::{Error, StorageContainer};
99//! # use url::Url;
100//! # #[tokio::main]
101//! # async fn main() -> Result<(), Error> {
102//! let storage = StorageContainer::builder()
103//!     .cluster_id("nisshi")
104//!     .node_id(111)
105//!     .advertised_listener(Url::parse("tcp://localhost:9092")?)
106//!     .storage(Url::parse("turso://nisshi.db")?)
107//!     .build()
108//!     .await?;
109//! # Ok(())
110//! # }
111//! ```
112//!
113
114use async_trait::async_trait;
115use bytes::{Bytes, TryGetError};
116
117use console::Emoji;
118#[cfg(any(feature = "libsql", feature = "postgres"))]
119use deadpool::managed::PoolError;
120#[cfg(feature = "dynostore")]
121use dynostore::DynoStore;
122
123use glob::{GlobError, PatternError};
124
125use indicatif::{ProgressBar, ProgressStyle};
126#[cfg(feature = "dynostore")]
127use object_store::memory::InMemory;
128
129#[cfg(feature = "dynostore")]
130use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
131
132use opentelemetry::{
133    InstrumentationScope, KeyValue, global,
134    metrics::{Counter, Meter},
135};
136use opentelemetry_semantic_conventions::SCHEMA_URL;
137
138#[cfg(feature = "postgres")]
139use pg::Postgres;
140
141use governor::InsufficientCapacity;
142use nisshi_sans_io::{
143    Body, ConfigResource, ErrorCode, IsolationLevel, ListOffset, NULL_TOPIC_ID, ScramMechanism,
144    add_partitions_to_txn_request::{
145        AddPartitionsToTxnRequest, AddPartitionsToTxnTopic, AddPartitionsToTxnTransaction,
146    },
147    add_partitions_to_txn_response::{AddPartitionsToTxnResult, AddPartitionsToTxnTopicResult},
148    consumer_group_describe_response,
149    create_topics_request::CreatableTopic,
150    delete_groups_response::DeletableGroupResult,
151    delete_records_request::DeleteRecordsTopic,
152    delete_records_response::DeleteRecordsTopicResult,
153    delete_topics_request::DeleteTopicState,
154    describe_cluster_response::DescribeClusterBroker,
155    describe_configs_response::DescribeConfigsResult,
156    describe_groups_response,
157    describe_topic_partitions_request::{Cursor, TopicRequest},
158    describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
159    fetch_request::FetchTopic,
160    incremental_alter_configs_request::AlterConfigsResource,
161    incremental_alter_configs_response::AlterConfigsResourceResponse,
162    join_group_response::JoinGroupResponseMember,
163    list_groups_response::ListedGroup,
164    metadata_request::MetadataRequestTopic,
165    metadata_response::{MetadataResponseBroker, MetadataResponseTopic},
166    offset_commit_request::OffsetCommitRequestPartition,
167    record::deflated,
168    to_system_time, to_timestamp,
169    txn_offset_commit_request::TxnOffsetCommitRequestTopic,
170    txn_offset_commit_response::TxnOffsetCommitResponseTopic,
171};
172use nisshi_schema::{Registry, lake::House};
173use regex::Regex;
174use serde::{Deserialize, Serialize};
175#[cfg(any(feature = "libsql", feature = "postgres"))]
176use std::error;
177use std::{
178    array::TryFromSliceError,
179    collections::BTreeMap,
180    ffi::OsString,
181    fmt::{self, Debug, Display, Formatter},
182    fs::DirEntry,
183    io,
184    marker::PhantomData,
185    num::{ParseIntError, TryFromIntError},
186    path::PathBuf,
187    result,
188    str::FromStr,
189    sync::{Arc, LazyLock, PoisonError},
190    time::{Duration, SystemTime, SystemTimeError},
191};
192use tokio::sync::AcquireError;
193use tokio_util::sync::CancellationToken;
194use tracing::{debug, instrument};
195use tracing_subscriber::filter::ParseError;
196use url::Url;
197use uuid::Uuid;
198
199#[cfg(feature = "dynostore")]
200use tracing::warn;
201
202#[cfg(feature = "dynostore")]
203mod dynostore;
204
205#[cfg(feature = "postgres")]
206mod pg;
207
208#[cfg(feature = "dynostore")]
209mod batch;
210mod latency;
211
212mod null;
213
214#[cfg(feature = "libsql")]
215mod proxy;
216
217mod service;
218
219pub use latency::LatencyIntroducingStorage;
220
221pub use service::{
222    AlterUserScramCredentialsService, ChannelRequestLayer, ChannelRequestService,
223    ConsumerGroupDescribeService, CreateAclsService, CreateTopicsService, DeleteGroupsService,
224    DeleteRecordsService, DeleteTopicsService, DescribeAclsService, DescribeClusterService,
225    DescribeConfigsService, DescribeGroupsService, DescribeTopicPartitionsService,
226    DescribeUserScramCredentialsService, FetchService, FindCoordinatorService,
227    GetTelemetrySubscriptionsService, IncrementalAlterConfigsService, InitProducerIdService,
228    ListGroupsService, ListOffsetsService, ListPartitionReassignmentsService, MetadataService,
229    ProduceService, Request, RequestChannelService, RequestLayer, RequestReceiver, RequestSender,
230    RequestService, RequestStorageService, Response, TxnAddOffsetsService, TxnAddPartitionService,
231    TxnEndService, TxnOffsetCommitService, bounded_channel,
232};
233
234#[cfg(feature = "slatedb")]
235pub mod slate;
236
237#[cfg(any(feature = "libsql", feature = "postgres", feature = "turso"))]
238pub(crate) mod sql;
239
240#[cfg(feature = "libsql")]
241mod lite;
242
243#[cfg(feature = "dynostore")]
244mod gcs;
245
246#[cfg(feature = "dynostore")]
247mod os;
248
249#[cfg(feature = "turso")]
250mod limbo;
251
/// Storage Errors
///
/// The error type used by this crate. A number of the wrapped errors are held
/// in an [`Arc`] — presumably so that this enum can derive [`Clone`] even when
/// the wrapped type does not implement it (TODO confirm).
///
/// [`Display`] is implemented by hand for this type and renders the
/// [`Debug`] representation.
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
    /// A shared `tokio::sync::AcquireError`.
    Acquire(Arc<AcquireError>),

    /// An [`ErrorCode`] from `nisshi_sans_io`.
    Api(ErrorCode),

    /// A `chrono::ParseError`.
    ChronoParse(#[from] chrono::ParseError),

    /// A `deadpool` pool build error.
    #[cfg(any(feature = "postgres", feature = "libsql"))]
    DeadPoolBuild(#[from] deadpool::managed::BuildError),

    /// Carries [`Bytes`] — NOTE(review): presumably the bytes that could not be decoded.
    Decode(Bytes),

    /// Names a `feature` together with an explanatory `message` —
    /// NOTE(review): presumably a Cargo feature that was not enabled at build time.
    FeatureNotEnabled {
        feature: String,
        message: String,
    },

    /// A shared `glob::GlobError`.
    Glob(Arc<GlobError>),
    /// A `governor::InsufficientCapacity` error.
    InsufficientCapacity(#[from] InsufficientCapacity),
    /// A shared [`io::Error`].
    Io(Arc<io::Error>),

    /// Carries an `offset` and the `base_offset` it relates to.
    LessThanBaseOffset {
        offset: i64,
        base_offset: i64,
    },
    /// Carries an `offset` and the (optional) `last_offset` it relates to.
    LessThanLastOffset {
        offset: i64,
        last_offset: Option<i64>,
    },

    /// A shared `libsql::Error`.
    #[cfg(feature = "libsql")]
    LibSql(Arc<libsql::Error>),

    /// Carries a `time` and the (optional) `max_time` it relates to.
    LessThanMaxTime {
        time: i64,
        max_time: Option<i64>,
    },
    /// Carries a `time` and the (optional) `min_time` it relates to.
    LessThanMinTime {
        time: i64,
        min_time: Option<i64>,
    },
    /// A free form error message.
    Message(String),
    /// Carries the ordinal (`nth`) of an entry — presumably one that does not exist.
    NoSuchEntry {
        nth: u32,
    },
    /// Carries an offset — presumably one that does not exist.
    NoSuchOffset(i64),
    /// An [`OsString`]; produced in this file when a file name is not valid Unicode.
    OsString(OsString),

    /// A shared `object_store::Error`.
    #[cfg(any(feature = "dynostore", feature = "slatedb"))]
    ObjectStore(Arc<object_store::Error>),

    /// A shared `tracing_subscriber::filter::ParseError`.
    ParseFilter(Arc<ParseError>),
    /// A shared `glob::PatternError`.
    Pattern(Arc<PatternError>),
    /// A [`ParseIntError`].
    ParseInt(#[from] ParseIntError),
    PhantomCached(),
    /// Produced from any [`PoisonError`]; the poisoned guard is discarded.
    Poison,

    /// A shared, boxed error; produced in this file from a `deadpool` `PoolError`.
    #[cfg(any(feature = "libsql", feature = "postgres"))]
    Pool(Arc<Box<dyn error::Error + Send + Sync>>),

    /// A `postcard::Error`.
    #[cfg(feature = "slatedb")]
    Postcard(#[from] postcard::Error),

    /// A `regex::Error`.
    Regex(#[from] regex::Error),

    /// A `nisshi_sans_io::Error`.
    SansIo(#[from] nisshi_sans_io::Error),

    /// A shared `nisshi_schema::Error` (other than its `Api` variant, which
    /// is converted into [`Error::Api`]).
    Schema(Arc<nisshi_schema::Error>),

    /// A `rustls::Error`.
    Rustls(#[from] rustls::Error),

    /// Carries the [`Topition`] concerned.
    SegmentEmpty(Topition),

    /// Carries the [`Topition`] and optional `offset` concerned.
    SegmentMissing {
        topition: Topition,
        offset: Option<i64>,
    },

    /// A shared `serde_json::Error`.
    SerdeJson(Arc<serde_json::Error>),

    /// A shared `slatedb::Error`.
    #[cfg(feature = "slatedb")]
    Slate(Arc<slatedb::Error>),

    /// A [`SystemTimeError`].
    SystemTime(#[from] SystemTimeError),

    /// A shared `tokio_postgres` error.
    #[cfg(feature = "postgres")]
    TokioPostgres(Arc<tokio_postgres::error::Error>),
    /// A [`TryFromIntError`].
    TryFromInt(#[from] TryFromIntError),
    /// A [`TryFromSliceError`].
    TryFromSlice(#[from] TryFromSliceError),

    /// A shared `bytes::TryGetError`.
    TryGet(Arc<TryGetError>),

    /// A shared `turso::Error`.
    #[cfg(feature = "turso")]
    Turso(Arc<turso::Error>),

    /// Carries a boxed [`Body`] — presumably one that was not expected.
    UnexpectedBody(Box<Body>),

    /// Carries a boxed [`Response`] — presumably one that was not expected.
    UnexpectedServiceResponse(Box<Response>),

    /// Carries a `turso::Value` — presumably one that was not expected.
    #[cfg(feature = "turso")]
    UnexpectedValue(turso::Value),

    /// Carries a cache key — presumably one that is not known.
    UnknownCacheKey(String),

    /// Carries a storage [`Url`] — presumably one with an unsupported scheme.
    UnsupportedStorageUrl(Url),
    /// Carries a boxed [`AddPartitionsToTxnRequest`] — presumably one that was not expected.
    UnexpectedAddPartitionsToTxnRequest(Box<AddPartitionsToTxnRequest>),
    /// A `url::ParseError`.
    Url(#[from] url::ParseError),
    /// Carries the name of a transaction state — presumably one that is not known.
    UnknownTxnState(String),

    /// A `uuid::Error`.
    Uuid(#[from] uuid::Error),

    UnableToSend,
    OneshotRecv,
}
368
369impl Display for Error {
370    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
371        write!(f, "{self:?}")
372    }
373}
374
375impl From<TryGetError> for Error {
376    fn from(value: TryGetError) -> Self {
377        Self::TryGet(Arc::new(value))
378    }
379}
380
381impl<T> From<PoisonError<T>> for Error {
382    fn from(_value: PoisonError<T>) -> Self {
383        Self::Poison
384    }
385}
386
387impl From<AcquireError> for Error {
388    fn from(value: AcquireError) -> Self {
389        Self::Acquire(Arc::new(value))
390    }
391}
392
/// Type erases a pool error into a boxed trait object, shared behind an
/// [`Arc`] as [`Error::Pool`].
#[cfg(any(feature = "libsql", feature = "postgres"))]
impl<E> From<PoolError<E>> for Error
where
    E: error::Error + Send + Sync + 'static,
{
    fn from(pool_error: PoolError<E>) -> Self {
        let boxed: Box<dyn error::Error + Send + Sync> = Box::new(pool_error);
        Error::Pool(Arc::new(boxed))
    }
}
402
/// Shares a `libsql::Error` behind an [`Arc`] as [`Error::LibSql`].
#[cfg(feature = "libsql")]
impl From<libsql::Error> for Error {
    fn from(error: libsql::Error) -> Self {
        let shared = Arc::new(error);
        Error::LibSql(shared)
    }
}
409
/// Shares a `slatedb::Error` behind an [`Arc`] as [`Error::Slate`].
#[cfg(feature = "slatedb")]
impl From<slatedb::Error> for Error {
    fn from(error: slatedb::Error) -> Self {
        let shared = Arc::new(error);
        Error::Slate(shared)
    }
}
416
/// Shares a `turso::Error` behind an [`Arc`] as [`Error::Turso`].
#[cfg(feature = "turso")]
impl From<turso::Error> for Error {
    fn from(error: turso::Error) -> Self {
        let shared = Arc::new(error);
        Error::Turso(shared)
    }
}
423
424impl From<GlobError> for Error {
425    fn from(value: GlobError) -> Self {
426        Self::Glob(Arc::new(value))
427    }
428}
429
430impl From<io::Error> for Error {
431    fn from(value: io::Error) -> Self {
432        Self::Io(Arc::new(value))
433    }
434}
435
/// Wraps an already shared `object_store::Error` as [`Error::ObjectStore`].
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl From<Arc<object_store::Error>> for Error {
    fn from(shared: Arc<object_store::Error>) -> Self {
        Error::ObjectStore(shared)
    }
}
442
/// Shares an `object_store::Error` behind an [`Arc`], then converts the
/// shared error into an [`Error`].
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl From<object_store::Error> for Error {
    fn from(error: object_store::Error) -> Self {
        Arc::new(error).into()
    }
}
449
450impl From<ParseError> for Error {
451    fn from(value: ParseError) -> Self {
452        Self::ParseFilter(Arc::new(value))
453    }
454}
455
456impl From<PatternError> for Error {
457    fn from(value: PatternError) -> Self {
458        Self::Pattern(Arc::new(value))
459    }
460}
461
462impl From<serde_json::Error> for Error {
463    fn from(value: serde_json::Error) -> Self {
464        Self::from(Arc::new(value))
465    }
466}
467
468impl From<Arc<serde_json::Error>> for Error {
469    fn from(value: Arc<serde_json::Error>) -> Self {
470        Self::SerdeJson(value)
471    }
472}
473
/// Shares a `tokio_postgres` error behind an [`Arc`], then converts the
/// shared error into an [`Error`].
#[cfg(feature = "postgres")]
impl From<tokio_postgres::error::Error> for Error {
    fn from(error: tokio_postgres::error::Error) -> Self {
        Arc::new(error).into()
    }
}
480
/// Wraps an already shared `tokio_postgres` error as [`Error::TokioPostgres`].
#[cfg(feature = "postgres")]
impl From<Arc<tokio_postgres::error::Error>> for Error {
    fn from(shared: Arc<tokio_postgres::error::Error>) -> Self {
        Error::TokioPostgres(shared)
    }
}
487
488impl From<nisshi_schema::Error> for Error {
489    fn from(value: nisshi_schema::Error) -> Self {
490        if let nisshi_schema::Error::Api(error_code) = value {
491            Self::Api(error_code)
492        } else {
493            Self::Schema(Arc::new(value))
494        }
495    }
496}
497
/// Result type of this crate, with the error type defaulting to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
499
/// Topic Partition (topition)
///
/// A topic partition pair.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Topition {
    // Name of the topic.
    topic: String,
    // Index of the partition within the topic.
    partition: i32,
}
508
509impl Topition {
510    pub fn new(topic: impl Into<String>, partition: i32) -> Self {
511        let topic = topic.into();
512        Self { topic, partition }
513    }
514
515    pub fn topic(&self) -> &str {
516        &self.topic
517    }
518
519    pub fn partition(&self) -> i32 {
520        self.partition
521    }
522}
523
524impl From<Cursor> for Topition {
525    fn from(value: Cursor) -> Self {
526        Self {
527            topic: value.topic_name,
528            partition: value.partition_index,
529        }
530    }
531}
532
533impl TryFrom<&DirEntry> for Topition {
534    type Error = Error;
535
536    fn try_from(value: &DirEntry) -> result::Result<Self, Self::Error> {
537        Regex::new(r"^(?<topic>.+)-(?<partition>\d{10})$")
538            .map_err(Into::into)
539            .and_then(|re| {
540                value
541                    .file_name()
542                    .into_string()
543                    .map_err(Error::OsString)
544                    .and_then(|ref file_name| {
545                        re.captures(file_name)
546                            .ok_or(Error::Message(format!("no captures for {file_name}")))
547                            .and_then(|ref captures| {
548                                let topic = captures
549                                    .name("topic")
550                                    .ok_or(Error::Message(format!("missing topic for {file_name}")))
551                                    .map(|s| s.as_str().to_owned())?;
552
553                                let partition = captures
554                                    .name("partition")
555                                    .ok_or(Error::Message(format!(
556                                        "missing partition for: {file_name}"
557                                    )))
558                                    .map(|s| s.as_str())
559                                    .and_then(|s| str::parse(s).map_err(Into::into))?;
560
561                                Ok(Self { topic, partition })
562                            })
563                    })
564            })
565    }
566}
567
568impl FromStr for Topition {
569    type Err = Error;
570
571    fn from_str(s: &str) -> result::Result<Self, Self::Err> {
572        i32::from_str(&s[s.len() - 10..])
573            .map(|partition| {
574                let topic = String::from(&s[..s.len() - 11]);
575
576                Self { topic, partition }
577            })
578            .map_err(Into::into)
579    }
580}
581
582impl From<&Topition> for PathBuf {
583    fn from(value: &Topition) -> Self {
584        let topic = value.topic.as_str();
585        let partition = value.partition;
586        PathBuf::from(format!("{topic}-{partition:0>10}"))
587    }
588}
589
/// Topic Partition Offset
///
/// A topic partition with an offset.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TopitionOffset {
    // The topic partition that the offset belongs to.
    topition: Topition,
    // The offset within the topic partition.
    offset: i64,
}
598
599impl TopitionOffset {
600    pub fn new(topition: Topition, offset: i64) -> Self {
601        Self { topition, offset }
602    }
603
604    pub fn topition(&self) -> &Topition {
605        &self.topition
606    }
607
608    pub fn offset(&self) -> i64 {
609        self.offset
610    }
611}
612
613impl From<&TopitionOffset> for PathBuf {
614    fn from(value: &TopitionOffset) -> Self {
615        let offset = value.offset;
616        PathBuf::from(value.topition()).join(format!("{offset:0>20}"))
617    }
618}
619
/// Alias of [`ListOffset`] (from `nisshi_sans_io`).
pub type ListOffsetRequest = ListOffset;
621
/// List Offset Response
///
/// An error code with an optional timestamp and offset. The default value
/// has [`ErrorCode::None`] and neither a timestamp nor an offset.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ListOffsetResponse {
    /// The error code of this response.
    pub error_code: ErrorCode,
    /// The timestamp, when there is one.
    pub timestamp: Option<SystemTime>,
    /// The offset, when there is one.
    pub offset: Option<i64>,
}
628
629impl Default for ListOffsetResponse {
630    fn default() -> Self {
631        Self {
632            error_code: ErrorCode::None,
633            timestamp: None,
634            offset: None,
635        }
636    }
637}
638
639impl ListOffsetResponse {
640    pub fn offset(&self) -> Option<i64> {
641        self.offset
642    }
643
644    pub fn timestamp(&self) -> Result<Option<i64>> {
645        self.timestamp.map_or(Ok(None), |system_time| {
646            to_timestamp(&system_time).map(Some).map_err(Into::into)
647        })
648    }
649
650    pub fn error_code(&self) -> ErrorCode {
651        self.error_code
652    }
653}
654
/// Offset Commit Request
///
/// A structure representing an [`OffsetCommitRequestPartition`] from `nisshi_sans_io`.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct OffsetCommitRequest {
    // The offset being committed.
    offset: i64,
    // Taken from `committed_leader_epoch` when converting from a request partition.
    leader_epoch: Option<i32>,
    // Taken from `commit_timestamp` when converting from a request partition.
    timestamp: Option<SystemTime>,
    // Taken from `committed_metadata` when converting from a request partition.
    metadata: Option<String>,
}
665
666impl OffsetCommitRequest {
667    pub fn offset(self, offset: i64) -> Self {
668        Self { offset, ..self }
669    }
670}
671
672impl TryFrom<&OffsetCommitRequestPartition> for OffsetCommitRequest {
673    type Error = Error;
674
675    fn try_from(value: &OffsetCommitRequestPartition) -> Result<Self, Self::Error> {
676        value
677            .commit_timestamp
678            .map_or(Ok(None), |commit_timestamp| {
679                to_system_time(commit_timestamp)
680                    .map(Some)
681                    .map_err(Into::into)
682            })
683            .map(|timestamp| Self {
684                offset: value.committed_offset,
685                leader_epoch: value.committed_leader_epoch,
686                timestamp,
687                metadata: value.committed_metadata.clone(),
688            })
689    }
690}
691
/// Topic Id
///
/// An enumeration of either the name or UUID of a topic.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TopicId {
    /// A topic identified by its name.
    Name(String),
    /// A topic identified by its UUID.
    Id(Uuid),
}
700
701impl FromStr for TopicId {
702    type Err = Error;
703
704    fn from_str(s: &str) -> result::Result<Self, Self::Err> {
705        Ok(Self::Name(s.into()))
706    }
707}
708
709impl From<&str> for TopicId {
710    fn from(value: &str) -> Self {
711        Self::Name(value.to_owned())
712    }
713}
714
715impl From<String> for TopicId {
716    fn from(value: String) -> Self {
717        Self::Name(value)
718    }
719}
720
721impl From<Uuid> for TopicId {
722    fn from(value: Uuid) -> Self {
723        Self::Id(value)
724    }
725}
726
727impl From<[u8; 16]> for TopicId {
728    fn from(value: [u8; 16]) -> Self {
729        Self::Id(Uuid::from_bytes(value))
730    }
731}
732
733impl From<&TopicId> for [u8; 16] {
734    fn from(value: &TopicId) -> Self {
735        match value {
736            TopicId::Id(id) => id.into_bytes(),
737            TopicId::Name(_) => NULL_TOPIC_ID,
738        }
739    }
740}
741
742impl From<&FetchTopic> for TopicId {
743    fn from(value: &FetchTopic) -> Self {
744        if let Some(ref name) = value.topic {
745            Self::Name(name.into())
746        } else if let Some(ref id) = value.topic_id {
747            Self::Id(Uuid::from_bytes(*id))
748        } else {
749            panic!("neither name nor uuid")
750        }
751    }
752}
753
754impl From<&MetadataRequestTopic> for TopicId {
755    fn from(value: &MetadataRequestTopic) -> Self {
756        if let Some(ref name) = value.name {
757            Self::Name(name.into())
758        } else if let Some(ref id) = value.topic_id {
759            Self::Id(Uuid::from_bytes(*id))
760        } else {
761            panic!("neither name nor uuid")
762        }
763    }
764}
765
766impl From<DeleteTopicState> for TopicId {
767    fn from(value: DeleteTopicState) -> Self {
768        match value {
769            DeleteTopicState {
770                name: Some(name),
771                topic_id,
772                ..
773            } if topic_id == NULL_TOPIC_ID => name.into(),
774
775            DeleteTopicState { topic_id, .. } => topic_id.into(),
776        }
777    }
778}
779
780impl From<&TopicRequest> for TopicId {
781    fn from(value: &TopicRequest) -> Self {
782        value.name.to_owned().into()
783    }
784}
785
786impl From<&Topition> for TopicId {
787    fn from(value: &Topition) -> Self {
788        value.topic.to_owned().into()
789    }
790}
791
/// Broker Registration Request
///
/// A broker will register with storage using this structure.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BrokerRegistrationRequest {
    /// Identifier of the registering broker.
    pub broker_id: i32,
    /// Identifier of the cluster.
    pub cluster_id: String,
    /// UUID of this incarnation — NOTE(review): presumably unique per broker
    /// start; confirm against the caller.
    pub incarnation_id: Uuid,
    /// The rack of the broker, when there is one.
    pub rack: Option<String>,
}
802
/// Metadata Response
///
/// The cluster, controller, brokers and topics, each available through an
/// accessor of the same name.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct MetadataResponse {
    // The cluster, when there is one.
    cluster: Option<String>,
    // The controller, when there is one.
    controller: Option<i32>,
    // The brokers of this response.
    brokers: Vec<MetadataResponseBroker>,
    // The topics of this response.
    topics: Vec<MetadataResponseTopic>,
}
810
811impl MetadataResponse {
812    pub fn cluster(&self) -> Option<&str> {
813        self.cluster.as_deref()
814    }
815
816    pub fn controller(&self) -> Option<i32> {
817        self.controller
818    }
819
820    pub fn brokers(&self) -> &[MetadataResponseBroker] {
821        self.brokers.as_ref()
822    }
823
824    pub fn topics(&self) -> &[MetadataResponseTopic] {
825        self.topics.as_ref()
826    }
827}
828
/// Offset Stage
///
/// An offset stage structure represents the `last_stable`, `high_watermark` and `log_start` offsets.
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct OffsetStage {
    // The last stable offset.
    last_stable: i64,
    // The high watermark offset.
    high_watermark: i64,
    // The log start offset.
    log_start: i64,
}
840
841impl OffsetStage {
842    pub fn last_stable(&self) -> i64 {
843        self.last_stable
844    }
845
846    pub fn high_watermark(&self) -> i64 {
847        self.high_watermark
848    }
849
850    pub fn log_start(&self) -> i64 {
851        self.log_start
852    }
853}
854
/// Group Member
///
/// A member's join response together with the time it was last in contact.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct GroupMember {
    /// The join group response member held for this member.
    pub join_response: JoinGroupResponseMember,
    /// When this member was last in contact, if known.
    pub last_contact: Option<SystemTime>,
}
861
/// Group State
///
/// A group is either in the process of [`GroupState::Forming`] or has [`GroupState::Formed`].
///
/// While forming, the protocol type, protocol name and leader are all
/// optional; once formed they are all present, together with the assignments.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum GroupState {
    /// The group is still forming: protocol and leader may not be known yet.
    Forming {
        protocol_type: Option<String>,
        protocol_name: Option<String>,
        leader: Option<String>,
    },

    /// The group has formed.
    Formed {
        protocol_type: String,
        protocol_name: String,
        leader: String,
        // Assignment bytes by string key — presumably the member id; verify
        // against the caller.
        assignments: BTreeMap<String, Bytes>,
    },
}
880
881impl GroupState {
882    pub fn protocol_type(&self) -> Option<String> {
883        match self {
884            Self::Forming { protocol_type, .. } => protocol_type.clone(),
885            Self::Formed { protocol_type, .. } => Some(protocol_type.clone()),
886        }
887    }
888
889    pub fn protocol_name(&self) -> Option<String> {
890        match self {
891            Self::Forming { protocol_name, .. } => protocol_name.clone(),
892            Self::Formed { protocol_name, .. } => Some(protocol_name.clone()),
893        }
894    }
895
896    pub fn leader(&self) -> Option<String> {
897        match self {
898            Self::Forming { leader, .. } => leader.clone(),
899            Self::Formed { leader, .. } => Some(leader.clone()),
900        }
901    }
902
903    pub fn assignments(&self) -> BTreeMap<String, Bytes> {
904        match self {
905            Self::Forming { .. } => BTreeMap::new(),
906            Self::Formed { assignments, .. } => assignments.clone(),
907        }
908    }
909}
910
911impl Default for GroupState {
912    fn default() -> Self {
913        Self::Forming {
914            protocol_type: None,
915            protocol_name: Some("".into()),
916            leader: None,
917        }
918    }
919}
920
/// Consumer Group State
///
/// A helper type for conversion into [`consumer_group_describe_response::DescribedGroup`].
///
/// The [`Display`] implementation writes the variant name.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ConsumerGroupState {
    Unknown,
    PreparingRebalance,
    CompletingRebalance,
    Stable,
    Dead,
    Empty,
    Assigning,
    Reconciling,
}
935
936impl Display for ConsumerGroupState {
937    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
938        match self {
939            Self::Unknown => f.write_str("Unknown"),
940            Self::PreparingRebalance => f.write_str("PreparingRebalance"),
941            Self::CompletingRebalance => f.write_str("CompletingRebalance"),
942            Self::Stable => f.write_str("Stable"),
943            Self::Dead => f.write_str("Dead"),
944            Self::Empty => f.write_str("Empty"),
945            Self::Assigning => f.write_str("Assigning"),
946            Self::Reconciling => f.write_str("Reconciling"),
947        }
948    }
949}
950
/// Group Detail
///
/// A helper type that can be easily converted into [`consumer_group_describe_response::DescribedGroup`].
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct GroupDetail {
    /// Session timeout in milliseconds (45,000 by default).
    pub session_timeout_ms: i32,
    /// Rebalance timeout in milliseconds, when there is one.
    pub rebalance_timeout_ms: Option<i32>,
    /// The group members by string key — presumably the member id; verify
    /// against the caller.
    pub members: BTreeMap<String, GroupMember>,
    /// The generation of the group.
    pub generation_id: i32,
    /// Defaults to `Some(false)`.
    pub skip_assignment: Option<bool>,
    /// Defaults to the system time at which the value was created.
    pub inception: SystemTime,
    /// Whether the group is forming or has formed.
    pub state: GroupState,
}
964
965impl Default for GroupDetail {
966    fn default() -> Self {
967        Self {
968            session_timeout_ms: 45_000,
969            rebalance_timeout_ms: Default::default(),
970            members: Default::default(),
971            generation_id: Default::default(),
972            skip_assignment: Some(false),
973            inception: SystemTime::now(),
974            state: GroupState::default(),
975        }
976    }
977}
978
979impl From<&GroupDetail> for ConsumerGroupState {
980    fn from(value: &GroupDetail) -> Self {
981        match value {
982            GroupDetail { members, .. } if members.is_empty() => Self::Empty,
983
984            GroupDetail {
985                state: GroupState::Forming { leader: None, .. },
986                ..
987            } => Self::Assigning,
988
989            GroupDetail {
990                state: GroupState::Formed { .. },
991                ..
992            } => Self::Stable,
993
994            _ => {
995                debug!(unknown = ?value);
996                Self::Unknown
997            }
998        }
999    }
1000}
1001
1002impl From<&GroupDetail> for consumer_group_describe_response::DescribedGroup {
1003    fn from(value: &GroupDetail) -> Self {
1004        let assignor_name = match value.state {
1005            GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
1006            GroupState::Formed { ref leader, .. } => leader.clone(),
1007        };
1008
1009        let group_state = ConsumerGroupState::from(value).to_string();
1010
1011        Self::default()
1012            .error_code(ErrorCode::None.into())
1013            .error_message(Some(ErrorCode::None.to_string()))
1014            .group_id(Default::default())
1015            .group_state(group_state)
1016            .group_epoch(-1)
1017            .assignment_epoch(-1)
1018            .assignor_name(assignor_name)
1019            .members(Some([].into()))
1020            .authorized_operations(-1)
1021    }
1022}
1023
/// Group Detail Response
///
/// Either an error code, or the detail of a group that was found.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum GroupDetailResponse {
    /// An error code in place of the group detail.
    ErrorCode(ErrorCode),
    /// The detail of the group.
    Found(GroupDetail),
}
1029
/// NamedGroupDetail
///
/// A helper type that can be easily converted into [`consumer_group_describe_response::DescribedGroup`]
/// or [`describe_groups_response::DescribedGroup`].
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct NamedGroupDetail {
    // Name of the group; used as the group id when converting into a described group.
    name: String,
    // Either the detail that was found for the group, or an error code.
    response: GroupDetailResponse,
}
1039
1040impl NamedGroupDetail {
1041    pub fn error_code(name: String, error_code: ErrorCode) -> Self {
1042        Self {
1043            name,
1044            response: GroupDetailResponse::ErrorCode(error_code),
1045        }
1046    }
1047
1048    pub fn found(name: String, found: GroupDetail) -> Self {
1049        Self {
1050            name,
1051            response: GroupDetailResponse::Found(found),
1052        }
1053    }
1054}
1055
1056impl From<&NamedGroupDetail> for consumer_group_describe_response::DescribedGroup {
1057    fn from(value: &NamedGroupDetail) -> Self {
1058        match value {
1059            NamedGroupDetail {
1060                name,
1061                response: GroupDetailResponse::Found(group_detail),
1062            } => {
1063                let assignor_name = match group_detail.state {
1064                    GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
1065                    GroupState::Formed { ref leader, .. } => leader.clone(),
1066                };
1067
1068                let group_state = ConsumerGroupState::from(group_detail).to_string();
1069
1070                Self::default()
1071                    .error_code(ErrorCode::None.into())
1072                    .error_message(Some(ErrorCode::None.to_string()))
1073                    .group_id(name.into())
1074                    .group_state(group_state)
1075                    .group_epoch(-1)
1076                    .assignment_epoch(-1)
1077                    .assignor_name(assignor_name)
1078                    .members(Some([].into()))
1079                    .authorized_operations(-1)
1080            }
1081
1082            NamedGroupDetail {
1083                name,
1084                response: GroupDetailResponse::ErrorCode(error_code),
1085            } => Self::default()
1086                .error_code((*error_code).into())
1087                .error_message(Some(error_code.to_string()))
1088                .group_id(name.into())
1089                .group_state("Unknown".into())
1090                .group_epoch(-1)
1091                .assignment_epoch(-1)
1092                .assignor_name("".into())
1093                .members(Some([].into()))
1094                .authorized_operations(-1),
1095        }
1096    }
1097}
1098
1099impl From<&NamedGroupDetail> for describe_groups_response::DescribedGroup {
1100    fn from(value: &NamedGroupDetail) -> Self {
1101        match value {
1102            NamedGroupDetail {
1103                name,
1104                response: GroupDetailResponse::Found(group_detail),
1105            } => {
1106                let group_state = ConsumerGroupState::from(group_detail).to_string();
1107
1108                let members = group_detail
1109                    .members
1110                    .keys()
1111                    .map(|member_id| {
1112                        describe_groups_response::DescribedGroupMember::default()
1113                            .member_id(member_id.into())
1114                            .group_instance_id(None)
1115                            .client_id("".into())
1116                            .client_host("".into())
1117                            .member_metadata(Bytes::new())
1118                            .member_assignment(Bytes::new())
1119                    })
1120                    .collect::<Vec<_>>();
1121
1122                Self::default()
1123                    .error_code(ErrorCode::None.into())
1124                    .group_id(name.clone())
1125                    .group_state(group_state)
1126                    .protocol_type(group_detail.state.protocol_type().unwrap_or_default())
1127                    .protocol_data("".into())
1128                    .members(Some(members))
1129                    .authorized_operations(Some(-1))
1130            }
1131
1132            NamedGroupDetail {
1133                name,
1134                response: GroupDetailResponse::ErrorCode(error_code),
1135            } => Self::default()
1136                .error_code((*error_code).into())
1137                .group_id(name.clone())
1138                .group_state("Unknown".into())
1139                .protocol_type("".into())
1140                .protocol_data("".into())
1141                .members(Some(vec![]))
1142                .authorized_operations(Some(-1)),
1143        }
1144    }
1145}
1146
1147/// Topition (topic partition) Detail
1148#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1149pub struct TopitionDetail {
1150    error: ErrorCode,
1151    topic: TopicId,
1152    partitions: Option<Vec<PartitionDetail>>,
1153}
1154
1155/// Partition Detail
1156#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1157pub struct PartitionDetail {
1158    error: ErrorCode,
1159    partition_index: i32,
1160}
1161
1162/// Version representing an `e_tag` and `version` used in conditional writes to an object store.
1163#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1164pub struct Version {
1165    e_tag: Option<String>,
1166    version: Option<String>,
1167}
1168
1169impl From<&Uuid> for Version {
1170    fn from(value: &Uuid) -> Self {
1171        Self {
1172            e_tag: Some(value.to_string()),
1173            version: None,
1174        }
1175    }
1176}
1177
1178/// Producer Id Response
1179#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1180pub struct ProducerIdResponse {
1181    pub error: ErrorCode,
1182    pub id: i64,
1183    pub epoch: i16,
1184}
1185
1186impl Default for ProducerIdResponse {
1187    fn default() -> Self {
1188        Self {
1189            error: ErrorCode::None,
1190            id: 1,
1191            epoch: 0,
1192        }
1193    }
1194}
1195
1196/// For protocol versions 0..=3 using [`AddPartitionsToTxnTopic`],
1197/// thereafter using [`AddPartitionsToTxnTransaction`].
1198#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1199pub enum TxnAddPartitionsRequest {
1200    VersionZeroToThree {
1201        transaction_id: String,
1202        producer_id: i64,
1203        producer_epoch: i16,
1204        topics: Vec<AddPartitionsToTxnTopic>,
1205    },
1206
1207    VersionFourPlus {
1208        transactions: Vec<AddPartitionsToTxnTransaction>,
1209    },
1210}
1211
1212impl TryFrom<AddPartitionsToTxnRequest> for TxnAddPartitionsRequest {
1213    type Error = Error;
1214
1215    fn try_from(value: AddPartitionsToTxnRequest) -> result::Result<Self, Self::Error> {
1216        match value {
1217            AddPartitionsToTxnRequest {
1218                transactions: None,
1219                v_3_and_below_transactional_id: Some(transactional_id),
1220                v_3_and_below_producer_id: Some(producer_id),
1221                v_3_and_below_producer_epoch: Some(producer_epoch),
1222                v_3_and_below_topics: Some(topics),
1223                ..
1224            } => Ok(Self::VersionZeroToThree {
1225                transaction_id: transactional_id,
1226                producer_id,
1227                producer_epoch,
1228                topics,
1229            }),
1230
1231            AddPartitionsToTxnRequest {
1232                transactions: Some(transactions),
1233                v_3_and_below_transactional_id: None,
1234                v_3_and_below_producer_id: None,
1235                v_3_and_below_producer_epoch: None,
1236                v_3_and_below_topics: None,
1237                ..
1238            } => Ok(Self::VersionFourPlus { transactions }),
1239
1240            unexpected => Err(Error::UnexpectedAddPartitionsToTxnRequest(Box::new(
1241                unexpected,
1242            ))),
1243        }
1244    }
1245}
1246
1247/// Transaction Add Partitions Response
1248///
1249/// For protocol versions 0..=3 using `AddPartitionsToTxnTopic`, thereafter using `AddPartitionsToTxnTransaction`.
1250#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1251pub enum TxnAddPartitionsResponse {
1252    VersionZeroToThree(Vec<AddPartitionsToTxnTopicResult>),
1253    VersionFourPlus(Vec<AddPartitionsToTxnResult>),
1254}
1255
1256impl TxnAddPartitionsResponse {
1257    pub fn zero_to_three(&self) -> &[AddPartitionsToTxnTopicResult] {
1258        match self {
1259            Self::VersionZeroToThree(result) => result.as_slice(),
1260            Self::VersionFourPlus(_) => &[][..],
1261        }
1262    }
1263
1264    pub fn four_plus(&self) -> &[AddPartitionsToTxnResult] {
1265        match self {
1266            Self::VersionZeroToThree(_) => &[][..],
1267            Self::VersionFourPlus(result) => result.as_slice(),
1268        }
1269    }
1270}
1271
1272/// Transaction Offset Commit Request
1273#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1274pub struct TxnOffsetCommitRequest {
1275    pub transaction_id: String,
1276    pub group_id: String,
1277    pub producer_id: i64,
1278    pub producer_epoch: i16,
1279    pub generation_id: Option<i32>,
1280    pub member_id: Option<String>,
1281    pub group_instance_id: Option<String>,
1282    pub topics: Vec<TxnOffsetCommitRequestTopic>,
1283}
1284
1285/// Transaction State
1286#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
1287pub enum TxnState {
1288    Begin,
1289    PrepareCommit,
1290    PrepareAbort,
1291    Committed,
1292    Aborted,
1293}
1294
1295impl TxnState {
1296    pub fn is_prepared(&self) -> bool {
1297        match self {
1298            Self::PrepareAbort | Self::PrepareCommit => true,
1299            _otherwise => false,
1300        }
1301    }
1302}
1303
1304impl FromStr for TxnState {
1305    type Err = Error;
1306
1307    fn from_str(s: &str) -> Result<Self, Self::Err> {
1308        match s {
1309            "ABORTED" => Ok(TxnState::Aborted),
1310            "BEGIN" => Ok(TxnState::Begin),
1311            "COMMITTED" => Ok(TxnState::Committed),
1312            "PREPARE_ABORT" => Ok(TxnState::PrepareAbort),
1313            "PREPARE_COMMIT" => Ok(TxnState::PrepareCommit),
1314            otherwise => Err(Error::UnknownTxnState(otherwise.to_owned())),
1315        }
1316    }
1317}
1318
1319impl TryFrom<String> for TxnState {
1320    type Error = Error;
1321
1322    fn try_from(value: String) -> Result<Self, Self::Error> {
1323        Self::from_str(&value)
1324    }
1325}
1326
1327impl From<TxnState> for String {
1328    fn from(value: TxnState) -> Self {
1329        match value {
1330            TxnState::Begin => "BEGIN".into(),
1331            TxnState::PrepareCommit => "PREPARE_COMMIT".into(),
1332            TxnState::PrepareAbort => "PREPARE_ABORT".into(),
1333            TxnState::Committed => "COMMITTED".into(),
1334            TxnState::Aborted => "ABORTED".into(),
1335        }
1336    }
1337}
1338
1339/// Storage
1340///
1341/// The Core storage abstraction. All storage engines implement this type.
1342#[async_trait]
1343pub trait Storage: Debug + Send + Sync + 'static {
1344    /// On startup a broker will register with storage.
1345    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()>;
1346
1347    /// Create a topic on this storage.
1348    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid>;
1349
1350    /// Incrementally alter a resource on this storage.
1351    async fn incremental_alter_resource(
1352        &self,
1353        resource: AlterConfigsResource,
1354    ) -> Result<AlterConfigsResourceResponse>;
1355
1356    /// Delete records on this storage.
1357    async fn delete_records(
1358        &self,
1359        topics: &[DeleteRecordsTopic],
1360    ) -> Result<Vec<DeleteRecordsTopicResult>>;
1361
1362    /// Delete a topic from this storage.
1363    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode>;
1364
1365    /// Query the brokers registered with this storage.
1366    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>>;
1367
1368    /// Produce a deflated batch to this storage.
1369    async fn produce(
1370        &self,
1371        transaction_id: Option<&str>,
1372        topition: &Topition,
1373        batch: deflated::Batch,
1374    ) -> Result<i64>;
1375
1376    /// Fetch deflated batches from storage.
1377    async fn fetch(
1378        &self,
1379        topition: &'_ Topition,
1380        offset: i64,
1381        min_bytes: u32,
1382        max_bytes: u32,
1383        isolation: IsolationLevel,
1384        max_wait: Duration,
1385    ) -> Result<Vec<deflated::Batch>>;
1386
1387    /// Query the offset stage for a topic partition.
1388    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage>;
1389
1390    /// Query the offsets for one or more topic partitions.
1391    async fn list_offsets(
1392        &self,
1393        isolation_level: IsolationLevel,
1394        offsets: &[(Topition, ListOffset)],
1395    ) -> Result<Vec<(Topition, ListOffsetResponse)>>;
1396
1397    /// Commit offsets for one or more topic partitions in a consumer group.
1398    async fn offset_commit(
1399        &self,
1400        group_id: &str,
1401        retention_time_ms: Option<Duration>,
1402        offsets: &[(Topition, OffsetCommitRequest)],
1403    ) -> Result<Vec<(Topition, ErrorCode)>>;
1404
1405    /// Fetch committed offsets for one or more topic partitions in a consumer group.
1406    async fn offset_fetch(
1407        &self,
1408        group_id: Option<&str>,
1409        topics: &[Topition],
1410        require_stable: Option<bool>,
1411    ) -> Result<BTreeMap<Topition, i64>>;
1412
1413    /// Fetch all committed offsets in a consumer group.
1414    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>>;
1415
1416    /// Query broker and topic metadata.
1417    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse>;
1418
1419    async fn upsert_user_scram_credential(
1420        &self,
1421        user: &str,
1422        mechanism: ScramMechanism,
1423        credential: ScramCredential,
1424    ) -> Result<()>;
1425
1426    async fn delete_user_scram_credential(
1427        &self,
1428        user: &str,
1429        mechanism: ScramMechanism,
1430    ) -> Result<()>;
1431
1432    async fn user_scram_credential(
1433        &self,
1434        user: &str,
1435        mechanism: ScramMechanism,
1436    ) -> Result<Option<ScramCredential>>;
1437
1438    /// Query the configuration of a resource in this storage.
1439    async fn describe_config(
1440        &self,
1441        name: &str,
1442        resource: ConfigResource,
1443        keys: Option<&[String]>,
1444    ) -> Result<DescribeConfigsResult>;
1445
1446    /// Query available groups optionally with a state filter.
1447    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>>;
1448
1449    /// Delete one or more groups from storage.
1450    async fn delete_groups(
1451        &self,
1452        group_ids: Option<&[String]>,
1453    ) -> Result<Vec<DeletableGroupResult>>;
1454
1455    /// Describe the groups found in this storage.
1456    async fn describe_groups(
1457        &self,
1458        group_ids: Option<&[String]>,
1459        include_authorized_operations: bool,
1460    ) -> Result<Vec<NamedGroupDetail>>;
1461
1462    /// Describe the topic partitions found in this storage.
1463    async fn describe_topic_partitions(
1464        &self,
1465        topics: Option<&[TopicId]>,
1466        partition_limit: i32,
1467        cursor: Option<Topition>,
1468    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>>;
1469
1470    /// Conditionally update the state of a group in this storage.
1471    async fn update_group(
1472        &self,
1473        group_id: &str,
1474        detail: GroupDetail,
1475        version: Option<Version>,
1476    ) -> Result<Version, UpdateError<GroupDetail>>;
1477
1478    /// Initialise a transactional or idempotent producer in this storage.
1479    async fn init_producer(
1480        &self,
1481        transaction_id: Option<&str>,
1482        transaction_timeout_ms: i32,
1483        producer_id: Option<i64>,
1484        producer_epoch: Option<i16>,
1485    ) -> Result<ProducerIdResponse>;
1486
1487    /// Add offsets to a transaction for a producer.
1488    async fn txn_add_offsets(
1489        &self,
1490        transaction_id: &str,
1491        producer_id: i64,
1492        producer_epoch: i16,
1493        group_id: &str,
1494    ) -> Result<ErrorCode>;
1495
1496    /// Add partitions to a transaction.
1497    async fn txn_add_partitions(
1498        &self,
1499        partitions: TxnAddPartitionsRequest,
1500    ) -> Result<TxnAddPartitionsResponse>;
1501
1502    /// Commit an offset within a transaction.
1503    async fn txn_offset_commit(
1504        &self,
1505        offsets: TxnOffsetCommitRequest,
1506    ) -> Result<Vec<TxnOffsetCommitResponseTopic>>;
1507
1508    /// Commit or abort a running transaction.
1509    async fn txn_end(
1510        &self,
1511        transaction_id: &str,
1512        producer_id: i64,
1513        producer_epoch: i16,
1514        committed: bool,
1515    ) -> Result<ErrorCode>;
1516
1517    /// Run periodic maintenance on this storage.
1518    async fn maintain(&self, _now: SystemTime) -> Result<()> {
1519        Ok(())
1520    }
1521
1522    async fn cluster_id(&self) -> Result<String>;
1523
1524    async fn node(&self) -> Result<i32>;
1525
1526    async fn advertised_listener(&self) -> Result<Url>;
1527
1528    async fn ping(&self) -> Result<()>;
1529}
1530
1531// The existence of this function makes the compiler catch if the Storage
1532// trait is "object-safe" or not.
1533fn _assert_trait_object(_s: &dyn Storage) {}
1534
1535#[async_trait]
1536impl<T> Storage for Arc<T>
1537where
1538    T: Storage + ?Sized,
1539{
1540    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
1541        self.as_ref().register_broker(broker_registration).await
1542    }
1543
1544    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
1545        self.as_ref().create_topic(topic, validate_only).await
1546    }
1547
1548    async fn incremental_alter_resource(
1549        &self,
1550        resource: AlterConfigsResource,
1551    ) -> Result<AlterConfigsResourceResponse> {
1552        self.as_ref().incremental_alter_resource(resource).await
1553    }
1554
1555    async fn delete_records(
1556        &self,
1557        topics: &[DeleteRecordsTopic],
1558    ) -> Result<Vec<DeleteRecordsTopicResult>> {
1559        self.as_ref().delete_records(topics).await
1560    }
1561
1562    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
1563        self.as_ref().delete_topic(topic).await
1564    }
1565
1566    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
1567        self.as_ref().brokers().await
1568    }
1569
1570    async fn produce(
1571        &self,
1572        transaction_id: Option<&str>,
1573        topition: &Topition,
1574        batch: deflated::Batch,
1575    ) -> Result<i64> {
1576        self.as_ref().produce(transaction_id, topition, batch).await
1577    }
1578
1579    async fn fetch(
1580        &self,
1581        topition: &'_ Topition,
1582        offset: i64,
1583        min_bytes: u32,
1584        max_bytes: u32,
1585        isolation: IsolationLevel,
1586        max_wait: Duration,
1587    ) -> Result<Vec<deflated::Batch>> {
1588        self.as_ref()
1589            .fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
1590            .await
1591    }
1592
1593    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
1594        self.as_ref().offset_stage(topition).await
1595    }
1596
1597    async fn list_offsets(
1598        &self,
1599        isolation_level: IsolationLevel,
1600        offsets: &[(Topition, ListOffset)],
1601    ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
1602        self.as_ref().list_offsets(isolation_level, offsets).await
1603    }
1604
1605    async fn offset_commit(
1606        &self,
1607        group_id: &str,
1608        retention_time_ms: Option<Duration>,
1609        offsets: &[(Topition, OffsetCommitRequest)],
1610    ) -> Result<Vec<(Topition, ErrorCode)>> {
1611        self.as_ref()
1612            .offset_commit(group_id, retention_time_ms, offsets)
1613            .await
1614    }
1615
1616    async fn offset_fetch(
1617        &self,
1618        group_id: Option<&str>,
1619        topics: &[Topition],
1620        require_stable: Option<bool>,
1621    ) -> Result<BTreeMap<Topition, i64>> {
1622        self.as_ref()
1623            .offset_fetch(group_id, topics, require_stable)
1624            .await
1625    }
1626
1627    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
1628        self.as_ref().committed_offset_topitions(group_id).await
1629    }
1630
1631    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
1632        self.as_ref().metadata(topics).await
1633    }
1634
1635    async fn upsert_user_scram_credential(
1636        &self,
1637        user: &str,
1638        mechanism: ScramMechanism,
1639        credential: ScramCredential,
1640    ) -> Result<()> {
1641        self.as_ref()
1642            .upsert_user_scram_credential(user, mechanism, credential)
1643            .await
1644    }
1645
1646    async fn delete_user_scram_credential(
1647        &self,
1648        user: &str,
1649        mechanism: ScramMechanism,
1650    ) -> Result<()> {
1651        self.as_ref()
1652            .delete_user_scram_credential(user, mechanism)
1653            .await
1654    }
1655
1656    async fn user_scram_credential(
1657        &self,
1658        user: &str,
1659        mechanism: ScramMechanism,
1660    ) -> Result<Option<ScramCredential>> {
1661        self.as_ref().user_scram_credential(user, mechanism).await
1662    }
1663
1664    async fn describe_config(
1665        &self,
1666        name: &str,
1667        resource: ConfigResource,
1668        keys: Option<&[String]>,
1669    ) -> Result<DescribeConfigsResult> {
1670        self.as_ref().describe_config(name, resource, keys).await
1671    }
1672
1673    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
1674        self.as_ref().list_groups(states_filter).await
1675    }
1676
1677    async fn delete_groups(
1678        &self,
1679        group_ids: Option<&[String]>,
1680    ) -> Result<Vec<DeletableGroupResult>> {
1681        self.as_ref().delete_groups(group_ids).await
1682    }
1683
1684    async fn describe_groups(
1685        &self,
1686        group_ids: Option<&[String]>,
1687        include_authorized_operations: bool,
1688    ) -> Result<Vec<NamedGroupDetail>> {
1689        self.as_ref()
1690            .describe_groups(group_ids, include_authorized_operations)
1691            .await
1692    }
1693
1694    async fn describe_topic_partitions(
1695        &self,
1696        topics: Option<&[TopicId]>,
1697        partition_limit: i32,
1698        cursor: Option<Topition>,
1699    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
1700        self.as_ref()
1701            .describe_topic_partitions(topics, partition_limit, cursor)
1702            .await
1703    }
1704
1705    async fn update_group(
1706        &self,
1707        group_id: &str,
1708        detail: GroupDetail,
1709        version: Option<Version>,
1710    ) -> Result<Version, UpdateError<GroupDetail>> {
1711        self.as_ref().update_group(group_id, detail, version).await
1712    }
1713
1714    async fn init_producer(
1715        &self,
1716        transaction_id: Option<&str>,
1717        transaction_timeout_ms: i32,
1718        producer_id: Option<i64>,
1719        producer_epoch: Option<i16>,
1720    ) -> Result<ProducerIdResponse> {
1721        self.as_ref()
1722            .init_producer(
1723                transaction_id,
1724                transaction_timeout_ms,
1725                producer_id,
1726                producer_epoch,
1727            )
1728            .await
1729    }
1730
1731    async fn txn_add_offsets(
1732        &self,
1733        transaction_id: &str,
1734        producer_id: i64,
1735        producer_epoch: i16,
1736        group_id: &str,
1737    ) -> Result<ErrorCode> {
1738        self.as_ref()
1739            .txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
1740            .await
1741    }
1742
1743    async fn txn_add_partitions(
1744        &self,
1745        partitions: TxnAddPartitionsRequest,
1746    ) -> Result<TxnAddPartitionsResponse> {
1747        self.as_ref().txn_add_partitions(partitions).await
1748    }
1749
1750    async fn txn_offset_commit(
1751        &self,
1752        offsets: TxnOffsetCommitRequest,
1753    ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
1754        self.as_ref().txn_offset_commit(offsets).await
1755    }
1756
1757    async fn txn_end(
1758        &self,
1759        transaction_id: &str,
1760        producer_id: i64,
1761        producer_epoch: i16,
1762        committed: bool,
1763    ) -> Result<ErrorCode> {
1764        self.as_ref()
1765            .txn_end(transaction_id, producer_id, producer_epoch, committed)
1766            .await
1767    }
1768
1769    async fn maintain(&self, now: SystemTime) -> Result<()> {
1770        self.as_ref().maintain(now).await
1771    }
1772
1773    async fn cluster_id(&self) -> Result<String> {
1774        self.as_ref().cluster_id().await
1775    }
1776
1777    async fn node(&self) -> Result<i32> {
1778        self.as_ref().node().await
1779    }
1780
1781    async fn advertised_listener(&self) -> Result<Url> {
1782        self.as_ref().advertised_listener().await
1783    }
1784
1785    async fn ping(&self) -> Result<()> {
1786        self.as_ref().ping().await
1787    }
1788}
1789
1790#[async_trait]
1791impl<T> Storage for Box<T>
1792where
1793    T: Storage + ?Sized,
1794{
1795    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
1796        self.as_ref().register_broker(broker_registration).await
1797    }
1798
1799    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
1800        self.as_ref().create_topic(topic, validate_only).await
1801    }
1802
1803    async fn incremental_alter_resource(
1804        &self,
1805        resource: AlterConfigsResource,
1806    ) -> Result<AlterConfigsResourceResponse> {
1807        self.as_ref().incremental_alter_resource(resource).await
1808    }
1809
1810    async fn delete_records(
1811        &self,
1812        topics: &[DeleteRecordsTopic],
1813    ) -> Result<Vec<DeleteRecordsTopicResult>> {
1814        self.as_ref().delete_records(topics).await
1815    }
1816
1817    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
1818        self.as_ref().delete_topic(topic).await
1819    }
1820
1821    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
1822        self.as_ref().brokers().await
1823    }
1824
1825    async fn produce(
1826        &self,
1827        transaction_id: Option<&str>,
1828        topition: &Topition,
1829        batch: deflated::Batch,
1830    ) -> Result<i64> {
1831        self.as_ref().produce(transaction_id, topition, batch).await
1832    }
1833
1834    async fn fetch(
1835        &self,
1836        topition: &'_ Topition,
1837        offset: i64,
1838        min_bytes: u32,
1839        max_bytes: u32,
1840        isolation: IsolationLevel,
1841        max_wait: Duration,
1842    ) -> Result<Vec<deflated::Batch>> {
1843        self.as_ref()
1844            .fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
1845            .await
1846    }
1847
1848    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
1849        self.as_ref().offset_stage(topition).await
1850    }
1851
1852    async fn list_offsets(
1853        &self,
1854        isolation_level: IsolationLevel,
1855        offsets: &[(Topition, ListOffset)],
1856    ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
1857        self.as_ref().list_offsets(isolation_level, offsets).await
1858    }
1859
1860    async fn offset_commit(
1861        &self,
1862        group_id: &str,
1863        retention_time_ms: Option<Duration>,
1864        offsets: &[(Topition, OffsetCommitRequest)],
1865    ) -> Result<Vec<(Topition, ErrorCode)>> {
1866        self.as_ref()
1867            .offset_commit(group_id, retention_time_ms, offsets)
1868            .await
1869    }
1870
1871    async fn offset_fetch(
1872        &self,
1873        group_id: Option<&str>,
1874        topics: &[Topition],
1875        require_stable: Option<bool>,
1876    ) -> Result<BTreeMap<Topition, i64>> {
1877        self.as_ref()
1878            .offset_fetch(group_id, topics, require_stable)
1879            .await
1880    }
1881
1882    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
1883        self.as_ref().committed_offset_topitions(group_id).await
1884    }
1885
1886    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
1887        self.as_ref().metadata(topics).await
1888    }
1889
1890    async fn upsert_user_scram_credential(
1891        &self,
1892        user: &str,
1893        mechanism: ScramMechanism,
1894        credential: ScramCredential,
1895    ) -> Result<()> {
1896        self.as_ref()
1897            .upsert_user_scram_credential(user, mechanism, credential)
1898            .await
1899    }
1900
1901    async fn delete_user_scram_credential(
1902        &self,
1903        user: &str,
1904        mechanism: ScramMechanism,
1905    ) -> Result<()> {
1906        self.as_ref()
1907            .delete_user_scram_credential(user, mechanism)
1908            .await
1909    }
1910
1911    async fn user_scram_credential(
1912        &self,
1913        user: &str,
1914        mechanism: ScramMechanism,
1915    ) -> Result<Option<ScramCredential>> {
1916        self.as_ref().user_scram_credential(user, mechanism).await
1917    }
1918
1919    async fn describe_config(
1920        &self,
1921        name: &str,
1922        resource: ConfigResource,
1923        keys: Option<&[String]>,
1924    ) -> Result<DescribeConfigsResult> {
1925        self.as_ref().describe_config(name, resource, keys).await
1926    }
1927
1928    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
1929        self.as_ref().list_groups(states_filter).await
1930    }
1931
1932    async fn delete_groups(
1933        &self,
1934        group_ids: Option<&[String]>,
1935    ) -> Result<Vec<DeletableGroupResult>> {
1936        self.as_ref().delete_groups(group_ids).await
1937    }
1938
1939    async fn describe_groups(
1940        &self,
1941        group_ids: Option<&[String]>,
1942        include_authorized_operations: bool,
1943    ) -> Result<Vec<NamedGroupDetail>> {
1944        self.as_ref()
1945            .describe_groups(group_ids, include_authorized_operations)
1946            .await
1947    }
1948
1949    async fn describe_topic_partitions(
1950        &self,
1951        topics: Option<&[TopicId]>,
1952        partition_limit: i32,
1953        cursor: Option<Topition>,
1954    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
1955        self.as_ref()
1956            .describe_topic_partitions(topics, partition_limit, cursor)
1957            .await
1958    }
1959
1960    async fn update_group(
1961        &self,
1962        group_id: &str,
1963        detail: GroupDetail,
1964        version: Option<Version>,
1965    ) -> Result<Version, UpdateError<GroupDetail>> {
1966        self.as_ref().update_group(group_id, detail, version).await
1967    }
1968
1969    async fn init_producer(
1970        &self,
1971        transaction_id: Option<&str>,
1972        transaction_timeout_ms: i32,
1973        producer_id: Option<i64>,
1974        producer_epoch: Option<i16>,
1975    ) -> Result<ProducerIdResponse> {
1976        self.as_ref()
1977            .init_producer(
1978                transaction_id,
1979                transaction_timeout_ms,
1980                producer_id,
1981                producer_epoch,
1982            )
1983            .await
1984    }
1985
1986    async fn txn_add_offsets(
1987        &self,
1988        transaction_id: &str,
1989        producer_id: i64,
1990        producer_epoch: i16,
1991        group_id: &str,
1992    ) -> Result<ErrorCode> {
1993        self.as_ref()
1994            .txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
1995            .await
1996    }
1997
1998    async fn txn_add_partitions(
1999        &self,
2000        partitions: TxnAddPartitionsRequest,
2001    ) -> Result<TxnAddPartitionsResponse> {
2002        self.as_ref().txn_add_partitions(partitions).await
2003    }
2004
2005    async fn txn_offset_commit(
2006        &self,
2007        offsets: TxnOffsetCommitRequest,
2008    ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
2009        self.as_ref().txn_offset_commit(offsets).await
2010    }
2011
2012    async fn txn_end(
2013        &self,
2014        transaction_id: &str,
2015        producer_id: i64,
2016        producer_epoch: i16,
2017        committed: bool,
2018    ) -> Result<ErrorCode> {
2019        self.as_ref()
2020            .txn_end(transaction_id, producer_id, producer_epoch, committed)
2021            .await
2022    }
2023
2024    async fn maintain(&self, now: SystemTime) -> Result<()> {
2025        self.as_ref().maintain(now).await
2026    }
2027
2028    async fn cluster_id(&self) -> Result<String> {
2029        self.as_ref().cluster_id().await
2030    }
2031
2032    async fn node(&self) -> Result<i32> {
2033        self.as_ref().node().await
2034    }
2035
2036    async fn advertised_listener(&self) -> Result<Url> {
2037        self.as_ref().advertised_listener().await
2038    }
2039
2040    async fn ping(&self) -> Result<()> {
2041        self.as_ref().ping().await
2042    }
2043}
2044
2045pub type DynStorage = dyn Storage;
2046pub type ArcDynStorage = Arc<DynStorage>;
2047
2048/// Conditional Update Errors
2049#[derive(Clone, Debug, thiserror::Error)]
2050pub enum UpdateError<T> {
2051    Error(#[from] Error),
2052
2053    MissingEtag,
2054
2055    Outdated { current: Box<T>, version: Version },
2056
2057    SerdeJson(Arc<serde_json::Error>),
2058
2059    Uuid(#[from] uuid::Error),
2060}
2061
/// Wraps a libSQL error as [`UpdateError::Error`] via the crate [`Error`].
#[cfg(feature = "libsql")]
impl<T> From<libsql::Error> for UpdateError<T> {
    fn from(err: libsql::Error) -> Self {
        UpdateError::Error(err.into())
    }
}
2068
/// Wraps a Turso error as [`UpdateError::Error`] via the crate [`Error`].
#[cfg(feature = "turso")]
impl<T> From<turso::Error> for UpdateError<T> {
    fn from(err: turso::Error) -> Self {
        UpdateError::Error(err.into())
    }
}
2075
/// Wraps an object store error as [`UpdateError::Error`] via the crate
/// [`Error`].
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl<T> From<object_store::Error> for UpdateError<T> {
    fn from(err: object_store::Error) -> Self {
        UpdateError::Error(err.into())
    }
}
2082
2083impl<T> From<serde_json::Error> for UpdateError<T> {
2084    fn from(value: serde_json::Error) -> Self {
2085        Self::SerdeJson(Arc::new(value))
2086    }
2087}
2088
/// Wraps a PostgreSQL client error as [`UpdateError::Error`] via the crate
/// [`Error`].
#[cfg(feature = "postgres")]
impl<T> From<tokio_postgres::error::Error> for UpdateError<T> {
    fn from(err: tokio_postgres::error::Error) -> Self {
        UpdateError::Error(err.into())
    }
}
2095
2096/// Storage Container
2097#[derive(Clone)]
2098#[cfg_attr(
2099    not(any(
2100        feature = "dynostore",
2101        feature = "libsql",
2102        feature = "postgres",
2103        feature = "slatedb",
2104        feature = "turso"
2105    )),
2106    allow(missing_copy_implementations)
2107)]
2108pub enum StorageContainer {
2109    Null(null::Engine),
2110
2111    #[cfg(feature = "postgres")]
2112    Postgres(Postgres),
2113
2114    #[cfg(feature = "dynostore")]
2115    DynoStore(DynoStore),
2116
2117    #[cfg(feature = "libsql")]
2118    Lite(lite::Engine),
2119
2120    #[cfg(feature = "slatedb")]
2121    Slate(slate::Engine),
2122
2123    #[cfg(feature = "turso")]
2124    Turso(limbo::Engine),
2125}
2126
2127impl Debug for StorageContainer {
2128    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
2129        match self {
2130            Self::Null(_) => f.debug_tuple(stringify!(StorageContainer::Null)).finish(),
2131
2132            #[cfg(feature = "postgres")]
2133            Self::Postgres(_) => f
2134                .debug_tuple(stringify!(StorageContainer::Postgres))
2135                .finish(),
2136
2137            #[cfg(feature = "dynostore")]
2138            Self::DynoStore(_) => f
2139                .debug_tuple(stringify!(StorageContainer::DynoStore))
2140                .finish(),
2141
2142            #[cfg(feature = "libsql")]
2143            Self::Lite(_) => f.debug_tuple(stringify!(StorageContainer::Lite)).finish(),
2144
2145            #[cfg(feature = "slatedb")]
2146            Self::Slate(_) => f.debug_tuple(stringify!(StorageContainer::Slate)).finish(),
2147
2148            #[cfg(feature = "turso")]
2149            Self::Turso(_) => f.debug_tuple(stringify!(StorageContainer::Turso)).finish(),
2150        }
2151    }
2152}
2153
2154impl StorageContainer {
2155    pub fn builder() -> PhantomBuilder {
2156        PhantomBuilder::default()
2157    }
2158}
2159
2160/// A [`StorageContainer`] builder
2161#[derive(Clone, Debug, Default)]
2162pub struct Builder<N, C, A, S> {
2163    node_id: N,
2164    cluster_id: C,
2165    advertised_listener: A,
2166    storage: S,
2167    schema_registry: Option<Registry>,
2168    lake_house: Option<House>,
2169    silent: bool,
2170
2171    cancellation: CancellationToken,
2172}
2173
2174type PhantomBuilder =
2175    Builder<PhantomData<i32>, PhantomData<String>, PhantomData<Url>, PhantomData<Url>>;
2176
2177impl<N, C, A, S> Builder<N, C, A, S> {
2178    pub fn node_id(self, node_id: i32) -> Builder<i32, C, A, S> {
2179        Builder {
2180            node_id,
2181            cluster_id: self.cluster_id,
2182            advertised_listener: self.advertised_listener,
2183            storage: self.storage,
2184            schema_registry: self.schema_registry,
2185            lake_house: self.lake_house,
2186            silent: self.silent,
2187            cancellation: self.cancellation,
2188        }
2189    }
2190
2191    pub fn cluster_id(self, cluster_id: impl Into<String>) -> Builder<N, String, A, S> {
2192        Builder {
2193            node_id: self.node_id,
2194            cluster_id: cluster_id.into(),
2195            advertised_listener: self.advertised_listener,
2196            storage: self.storage,
2197            schema_registry: self.schema_registry,
2198            lake_house: self.lake_house,
2199            silent: self.silent,
2200            cancellation: self.cancellation,
2201        }
2202    }
2203
2204    pub fn advertised_listener(self, advertised_listener: impl Into<Url>) -> Builder<N, C, Url, S> {
2205        Builder {
2206            node_id: self.node_id,
2207            cluster_id: self.cluster_id,
2208            advertised_listener: advertised_listener.into(),
2209            storage: self.storage,
2210            schema_registry: self.schema_registry,
2211            lake_house: self.lake_house,
2212            silent: self.silent,
2213            cancellation: self.cancellation,
2214        }
2215    }
2216
2217    pub fn storage(self, storage: Url) -> Builder<N, C, A, Url> {
2218        debug!(%storage);
2219
2220        Builder {
2221            node_id: self.node_id,
2222            cluster_id: self.cluster_id,
2223            advertised_listener: self.advertised_listener,
2224            storage,
2225            schema_registry: self.schema_registry,
2226            lake_house: self.lake_house,
2227            silent: self.silent,
2228            cancellation: self.cancellation,
2229        }
2230    }
2231
2232    pub fn schema_registry(self, schema_registry: Option<Registry>) -> Self {
2233        _ = schema_registry
2234            .as_ref()
2235            .inspect(|schema_registry| debug!(?schema_registry));
2236
2237        Self {
2238            schema_registry,
2239            ..self
2240        }
2241    }
2242
2243    pub fn lake_house(self, lake_house: Option<House>) -> Self {
2244        _ = lake_house
2245            .as_ref()
2246            .inspect(|lake_house| debug!(?lake_house));
2247
2248        Self { lake_house, ..self }
2249    }
2250
2251    pub fn cancellation(self, cancellation: CancellationToken) -> Self {
2252        Self {
2253            cancellation,
2254            ..self
2255        }
2256    }
2257
2258    pub fn silent(self, silent: bool) -> Self {
2259        Self { silent, ..self }
2260    }
2261}
2262
2263impl Builder<i32, String, Url, Url> {
2264    pub async fn build(self) -> Result<Arc<Box<dyn Storage>>> {
2265        let storage = match self.storage.scheme() {
2266            #[cfg(feature = "postgres")]
2267            "postgres" | "postgresql" => Postgres::builder(self.storage.to_string().as_str())
2268                .map(|builder| builder.cluster(self.cluster_id.as_str()))
2269                .map(|builder| builder.node(self.node_id))
2270                .map(|builder| builder.advertised_listener(self.advertised_listener.clone()))
2271                .map(|builder| builder.schemas(self.schema_registry))
2272                .map(|builder| builder.lake(self.lake_house.clone()))
2273                .map(|builder| builder.build())
2274                .map(|storage| Box::new(storage) as Box<dyn Storage>)
2275                .map(Arc::new),
2276
2277            #[cfg(not(feature = "postgres"))]
2278            "postgres" | "postgresql" => Err(Error::FeatureNotEnabled {
2279                feature: "postgres".into(),
2280                message: self.storage.to_string(),
2281            }),
2282
2283            #[cfg(feature = "dynostore")]
2284            "s3" => {
2285                use crate::batch::ProduceRequestBatcher;
2286
2287                let bucket_name = self.storage.host_str().unwrap_or("nisshi");
2288
2289                let minimum_size = self.storage.query_pairs().find_map(|(k, v)| {
2290                    if k == "batch_min_size" {
2291                        human_units::Size::from_str(v.as_ref())
2292                            .map(|size| size.0)
2293                            .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2294                            .ok()
2295                            .and_then(|size| usize::try_from(size).ok())
2296                    } else {
2297                        None
2298                    }
2299                });
2300
2301                let maximum_delay = self.storage.query_pairs().find_map(|(k, v)| {
2302                    if k == "batch_max_delay" {
2303                        human_units::Duration::from_str(v.as_ref())
2304                            .map(|duration| duration.0)
2305                            .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2306                            .ok()
2307                    } else {
2308                        None
2309                    }
2310                });
2311
2312                debug!(?minimum_size, ?maximum_delay);
2313
2314                AmazonS3Builder::from_env()
2315                    .with_bucket_name(bucket_name)
2316                    .with_conditional_put(S3ConditionalPut::ETagMatch)
2317                    .build()
2318                    .map(|object_store| {
2319                        DynoStore::new(self.cluster_id.as_str(), self.node_id, object_store)
2320                            .advertised_listener(self.advertised_listener.clone())
2321                            .schemas(self.schema_registry)
2322                            .lake(self.lake_house.clone())
2323                    })
2324                    .map(|storage| {
2325                        ProduceRequestBatcher::new(storage)
2326                            .with_minimum_size(minimum_size)
2327                            .with_maximum_delay(maximum_delay)
2328                    })
2329                    .map(|storage| Box::new(storage) as Box<dyn Storage>)
2330                    .map(Arc::new)
2331                    .map_err(Into::into)
2332            }
2333
2334            #[cfg(feature = "dynostore")]
2335            "gs" => {
2336                use std::num::NonZeroU32;
2337
2338                use object_store::gcp::GoogleCloudStorageBuilder;
2339
2340                use crate::{batch::ProduceRequestBatcher, gcs::limit::PutRateLimiter};
2341
2342                let bucket_name = self.storage.host_str().unwrap_or("nisshi");
2343
2344                let minimum_size = self.storage.query_pairs().find_map(|(k, v)| {
2345                    if k == "batch_min_size" {
2346                        human_units::Size::from_str(v.as_ref())
2347                            .map(|size| size.0)
2348                            .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2349                            .ok()
2350                            .and_then(|size| usize::try_from(size).ok())
2351                    } else {
2352                        None
2353                    }
2354                });
2355
2356                let maximum_delay = self.storage.query_pairs().find_map(|(k, v)| {
2357                    if k == "batch_max_delay" {
2358                        human_units::Duration::from_str(v.as_ref())
2359                            .map(|duration| duration.0)
2360                            .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2361                            .ok()
2362                    } else {
2363                        None
2364                    }
2365                });
2366
2367                GoogleCloudStorageBuilder::from_env()
2368                    .with_bucket_name(bucket_name)
2369                    .build()
2370                    .map(|object_store| {
2371                        PutRateLimiter::new(object_store, Duration::from_mins(5))
2372                            .with_rate_per_second(NonZeroU32::new(1))
2373                            .with_jitter(Some(Duration::from_millis(50)))
2374                    })
2375                    .map(|object_store| {
2376                        DynoStore::new(self.cluster_id.as_str(), self.node_id, object_store)
2377                            .advertised_listener(self.advertised_listener.clone())
2378                            .schemas(self.schema_registry)
2379                            .lake(self.lake_house.clone())
2380                    })
2381                    .map(|storage| {
2382                        ProduceRequestBatcher::new(storage)
2383                            .with_minimum_size(minimum_size)
2384                            .with_maximum_delay(maximum_delay)
2385                    })
2386                    .map(|storage| Box::new(storage) as Box<dyn Storage>)
2387                    .map(Arc::new)
2388                    .map_err(Into::into)
2389            }
2390
2391            #[cfg(feature = "dynostore")]
2392            "memory" => Ok(
2393                DynoStore::new(self.cluster_id.as_str(), self.node_id, InMemory::new())
2394                    .advertised_listener(self.advertised_listener.clone())
2395                    .schemas(self.schema_registry)
2396                    .lake(self.lake_house.clone()),
2397            )
2398            .map(|storage| Box::new(storage) as Box<dyn Storage>)
2399            .map(Arc::new),
2400
2401            #[cfg(not(feature = "dynostore"))]
2402            "s3" | "memory" => Err(Error::FeatureNotEnabled {
2403                feature: "dynostore".into(),
2404                message: self.storage.to_string(),
2405            }),
2406
2407            #[cfg(feature = "libsql")]
2408            "sqlite" => {
2409                lite::Engine::builder()
2410                    .storage(self.storage.clone())
2411                    .node(self.node_id)
2412                    .cluster(self.cluster_id.clone())
2413                    .advertised_listener(self.advertised_listener.clone())
2414                    .schemas(self.schema_registry)
2415                    .lake(self.lake_house.clone())
2416                    .cancellation(self.cancellation.clone())
2417                    .build()
2418                    .await
2419            }
2420
2421            #[cfg(not(feature = "libsql"))]
2422            "sqlite" => Err(Error::FeatureNotEnabled {
2423                feature: "libsql".into(),
2424                message: self.storage.to_string(),
2425            }),
2426
2427            #[cfg(feature = "slatedb")]
2428            "slatedb" => {
2429                use slatedb::Db;
2430                use slatedb::object_store::{
2431                    ObjectStore as SlateObjectStore,
2432                    aws::{
2433                        AmazonS3Builder as SlateS3Builder,
2434                        S3ConditionalPut as SlateS3ConditionalPut,
2435                    },
2436                    memory::InMemory as SlateInMemory,
2437                };
2438
2439                let host = self.storage.host_str().unwrap_or("nisshi");
2440                let db_path = format!("nisshi-{}.slatedb", self.cluster_id);
2441
2442                // Support memory backend for testing: slatedb://memory
2443                let object_store: Arc<dyn SlateObjectStore> = if host == "memory" {
2444                    Arc::new(SlateInMemory::new())
2445                } else {
2446                    // Use S3 backend with host as bucket name
2447                    SlateS3Builder::from_env()
2448                        .with_bucket_name(host)
2449                        .with_conditional_put(SlateS3ConditionalPut::ETagMatch)
2450                        .build()
2451                        .map(Arc::new)
2452                        .map_err(|e| Error::Message(e.to_string()))?
2453                };
2454
2455                Db::open(db_path, object_store)
2456                    .await
2457                    .map(Arc::new)
2458                    .map(|db| {
2459                        slate::Engine::builder()
2460                            .cluster(self.cluster_id.clone())
2461                            .node(self.node_id)
2462                            .advertised_listener(self.advertised_listener.clone())
2463                            .db(db)
2464                            .schemas(self.schema_registry)
2465                            .lake(self.lake_house)
2466                            .build()
2467                    })
2468                    .map(|storage| Box::new(storage) as Box<dyn Storage>)
2469                    .map(Arc::new)
2470                    .map_err(Into::into)
2471            }
2472
2473            #[cfg(not(feature = "slatedb"))]
2474            "slatedb" => Err(Error::FeatureNotEnabled {
2475                feature: "slatedb".into(),
2476                message: self.storage.to_string(),
2477            }),
2478
2479            #[cfg(feature = "turso")]
2480            "turso" => limbo::Engine::builder()
2481                .storage(self.storage.clone())
2482                .node(self.node_id)
2483                .cluster(self.cluster_id.clone())
2484                .advertised_listener(self.advertised_listener.clone())
2485                .schemas(self.schema_registry)
2486                .lake(self.lake_house.clone())
2487                .build()
2488                .await
2489                .map(|storage| Box::new(storage) as Box<dyn Storage>)
2490                .map(Arc::new),
2491
2492            #[cfg(not(feature = "turso"))]
2493            "turso" => Err(Error::FeatureNotEnabled {
2494                feature: "turso".into(),
2495                message: self.storage.to_string(),
2496            }),
2497
2498            "null" => Ok(null::Engine::new(
2499                self.cluster_id.clone(),
2500                self.node_id,
2501                self.advertised_listener.clone(),
2502            ))
2503            .map(|storage| Box::new(storage) as Box<dyn Storage>)
2504            .map(Arc::new),
2505
2506            #[cfg(not(any(
2507                feature = "dynostore",
2508                feature = "libsql",
2509                feature = "postgres",
2510                feature = "slatedb",
2511                feature = "turso"
2512            )))]
2513            _storage => Ok(null::Engine::new(
2514                self.cluster_id.clone(),
2515                self.node_id,
2516                self.advertised_listener.clone(),
2517            ))
2518            .map(|storage| Box::new(storage) as Box<dyn Storage>)
2519            .map(Arc::new),
2520
2521            #[cfg(any(
2522                feature = "dynostore",
2523                feature = "libsql",
2524                feature = "postgres",
2525                feature = "slatedb",
2526                feature = "turso"
2527            ))]
2528            _unsupported => Err(Error::UnsupportedStorageUrl(self.storage.clone())),
2529        }?;
2530
2531        let pb = if self.silent {
2532            None
2533        } else {
2534            let pb = ProgressBar::new(1);
2535            pb.set_style(
2536                ProgressStyle::with_template("[{elapsed}] {bar:40.cyan/blue} {msg}")
2537                    .unwrap()
2538                    .progress_chars("##-"),
2539            );
2540
2541            pb.set_message("connecting to storage");
2542
2543            Some(pb)
2544        };
2545
2546        storage.ping().await?;
2547
2548        if let Some(pb) = pb {
2549            pb.inc(1);
2550            pb.finish_with_message(format!("{} connected to storage", Emoji("✅", ""),));
2551        }
2552
2553        Ok(storage)
2554    }
2555}
2556
2557pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
2558    global::meter_with_scope(
2559        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
2560            .with_version(env!("CARGO_PKG_VERSION"))
2561            .with_schema_url(SCHEMA_URL)
2562            .build(),
2563    )
2564});
2565
2566static STORAGE_CONTAINER_REQUESTS: LazyLock<Counter<u64>> = LazyLock::new(|| {
2567    METER
2568        .u64_counter("nisshi_storage_container_requests")
2569        .with_description("nisshi storage container requests")
2570        .build()
2571});
2572
2573static STORAGE_CONTAINER_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
2574    METER
2575        .u64_counter("nisshi_storage_container_errors")
2576        .with_description("nisshi storage container errors")
2577        .build()
2578});
2579
2580#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
2581pub struct ScramCredential {
2582    pub salt: Bytes,
2583    pub iterations: i32,
2584    pub stored_key: Bytes,
2585    pub server_key: Bytes,
2586}
2587
2588#[async_trait]
2589impl Storage for StorageContainer {
2590    #[instrument(skip_all)]
2591    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
2592        let attributes = [KeyValue::new("method", "register_broker")];
2593
2594        match self {
2595            #[cfg(feature = "dynostore")]
2596            Self::DynoStore(engine) => engine.register_broker(broker_registration),
2597
2598            #[cfg(feature = "libsql")]
2599            Self::Lite(engine) => engine.register_broker(broker_registration),
2600
2601            Self::Null(engine) => engine.register_broker(broker_registration),
2602
2603            #[cfg(feature = "postgres")]
2604            Self::Postgres(engine) => engine.register_broker(broker_registration),
2605
2606            #[cfg(feature = "slatedb")]
2607            Self::Slate(engine) => engine.register_broker(broker_registration),
2608
2609            #[cfg(feature = "turso")]
2610            Self::Turso(engine) => engine.register_broker(broker_registration),
2611        }
2612        .await
2613        .inspect(|_| {
2614            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2615        })
2616        .inspect_err(|_| {
2617            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2618        })
2619    }
2620
2621    #[instrument(skip_all)]
2622    async fn incremental_alter_resource(
2623        &self,
2624        resource: AlterConfigsResource,
2625    ) -> Result<AlterConfigsResourceResponse> {
2626        let attributes = [KeyValue::new("method", "incremental_alter_resource")];
2627
2628        match self {
2629            #[cfg(feature = "dynostore")]
2630            Self::DynoStore(engine) => engine.incremental_alter_resource(resource),
2631
2632            #[cfg(feature = "libsql")]
2633            Self::Lite(engine) => engine.incremental_alter_resource(resource),
2634
2635            Self::Null(engine) => engine.incremental_alter_resource(resource),
2636
2637            #[cfg(feature = "postgres")]
2638            Self::Postgres(engine) => engine.incremental_alter_resource(resource),
2639
2640            #[cfg(feature = "slatedb")]
2641            Self::Slate(engine) => engine.incremental_alter_resource(resource),
2642
2643            #[cfg(feature = "turso")]
2644            Self::Turso(engine) => engine.incremental_alter_resource(resource),
2645        }
2646        .await
2647        .inspect(|_| {
2648            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2649        })
2650        .inspect_err(|_| {
2651            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2652        })
2653    }
2654
2655    #[instrument(skip_all)]
2656    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
2657        let attributes = [KeyValue::new("method", "create_topic")];
2658
2659        match self {
2660            #[cfg(feature = "dynostore")]
2661            Self::DynoStore(engine) => engine.create_topic(topic, validate_only),
2662
2663            #[cfg(feature = "libsql")]
2664            Self::Lite(engine) => engine.create_topic(topic, validate_only),
2665
2666            Self::Null(engine) => engine.create_topic(topic, validate_only),
2667
2668            #[cfg(feature = "postgres")]
2669            Self::Postgres(engine) => engine.create_topic(topic, validate_only),
2670
2671            #[cfg(feature = "turso")]
2672            Self::Turso(engine) => engine.create_topic(topic, validate_only),
2673
2674            #[cfg(feature = "slatedb")]
2675            Self::Slate(engine) => engine.create_topic(topic, validate_only),
2676        }
2677        .await
2678        .inspect(|_| {
2679            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2680        })
2681        .inspect_err(|_| {
2682            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2683        })
2684    }
2685
2686    #[instrument(skip_all)]
2687    async fn delete_records(
2688        &self,
2689        topics: &[DeleteRecordsTopic],
2690    ) -> Result<Vec<DeleteRecordsTopicResult>> {
2691        let attributes = [KeyValue::new("method", "delete_records")];
2692
2693        match self {
2694            #[cfg(feature = "dynostore")]
2695            Self::DynoStore(engine) => engine.delete_records(topics),
2696
2697            #[cfg(feature = "libsql")]
2698            Self::Lite(engine) => engine.delete_records(topics),
2699
2700            Self::Null(engine) => engine.delete_records(topics),
2701
2702            #[cfg(feature = "postgres")]
2703            Self::Postgres(engine) => engine.delete_records(topics),
2704
2705            #[cfg(feature = "slatedb")]
2706            Self::Slate(engine) => engine.delete_records(topics),
2707
2708            #[cfg(feature = "turso")]
2709            Self::Turso(engine) => engine.delete_records(topics),
2710        }
2711        .await
2712        .inspect(|_| {
2713            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2714        })
2715        .inspect_err(|_| {
2716            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2717        })
2718    }
2719
2720    #[instrument(skip_all)]
2721    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
2722        let attributes = [KeyValue::new("method", "delete_topic")];
2723
2724        match self {
2725            #[cfg(feature = "dynostore")]
2726            Self::DynoStore(engine) => engine.delete_topic(topic),
2727
2728            #[cfg(feature = "libsql")]
2729            Self::Lite(engine) => engine.delete_topic(topic),
2730
2731            Self::Null(engine) => engine.delete_topic(topic),
2732
2733            #[cfg(feature = "postgres")]
2734            Self::Postgres(engine) => engine.delete_topic(topic),
2735
2736            #[cfg(feature = "slatedb")]
2737            Self::Slate(engine) => engine.delete_topic(topic),
2738
2739            #[cfg(feature = "turso")]
2740            Self::Turso(engine) => engine.delete_topic(topic),
2741        }
2742        .await
2743        .inspect(|_| {
2744            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2745        })
2746        .inspect_err(|_| {
2747            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2748        })
2749    }
2750
2751    #[instrument(skip_all)]
2752    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
2753        let attributes = [KeyValue::new("method", "brokers")];
2754
2755        match self {
2756            #[cfg(feature = "dynostore")]
2757            Self::DynoStore(engine) => engine.brokers(),
2758
2759            #[cfg(feature = "libsql")]
2760            Self::Lite(engine) => engine.brokers(),
2761
2762            Self::Null(engine) => engine.brokers(),
2763
2764            #[cfg(feature = "postgres")]
2765            Self::Postgres(engine) => engine.brokers(),
2766
2767            #[cfg(feature = "slatedb")]
2768            Self::Slate(engine) => engine.brokers(),
2769
2770            #[cfg(feature = "turso")]
2771            Self::Turso(engine) => engine.brokers(),
2772        }
2773        .await
2774        .inspect(|_| {
2775            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2776        })
2777        .inspect_err(|_| {
2778            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2779        })
2780    }
2781
2782    #[instrument(skip_all)]
2783    async fn produce(
2784        &self,
2785        transaction_id: Option<&str>,
2786        topition: &Topition,
2787        batch: deflated::Batch,
2788    ) -> Result<i64> {
2789        let attributes = [KeyValue::new("method", "produce")];
2790
2791        match self {
2792            #[cfg(feature = "dynostore")]
2793            Self::DynoStore(engine) => engine.produce(transaction_id, topition, batch),
2794
2795            #[cfg(feature = "libsql")]
2796            Self::Lite(engine) => engine.produce(transaction_id, topition, batch),
2797
2798            Self::Null(engine) => engine.produce(transaction_id, topition, batch),
2799
2800            #[cfg(feature = "postgres")]
2801            Self::Postgres(engine) => engine.produce(transaction_id, topition, batch),
2802
2803            #[cfg(feature = "slatedb")]
2804            Self::Slate(engine) => engine.produce(transaction_id, topition, batch),
2805
2806            #[cfg(feature = "turso")]
2807            Self::Turso(engine) => engine.produce(transaction_id, topition, batch),
2808        }
2809        .await
2810        .inspect(|_| {
2811            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2812        })
2813        .inspect_err(|_| {
2814            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2815        })
2816    }
2817
2818    #[instrument(skip_all)]
2819    async fn fetch(
2820        &self,
2821        topition: &'_ Topition,
2822        offset: i64,
2823        min_bytes: u32,
2824        max_bytes: u32,
2825        isolation: IsolationLevel,
2826        max_wait: Duration,
2827    ) -> Result<Vec<deflated::Batch>> {
2828        let attributes = [KeyValue::new("method", "fetch")];
2829
2830        match self {
2831            #[cfg(feature = "dynostore")]
2832            Self::DynoStore(engine) => {
2833                engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2834            }
2835
2836            #[cfg(feature = "libsql")]
2837            Self::Lite(engine) => {
2838                engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2839            }
2840
2841            Self::Null(engine) => {
2842                engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2843            }
2844
2845            #[cfg(feature = "postgres")]
2846            Self::Postgres(engine) => {
2847                engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2848            }
2849
2850            #[cfg(feature = "slatedb")]
2851            Self::Slate(engine) => {
2852                engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2853            }
2854
2855            #[cfg(feature = "turso")]
2856            Self::Turso(engine) => {
2857                engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2858            }
2859        }
2860        .await
2861        .inspect(|_| {
2862            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2863        })
2864        .inspect_err(|_| {
2865            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2866        })
2867    }
2868
2869    #[instrument(skip_all)]
2870    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
2871        let attributes = [KeyValue::new("method", "offset_stage")];
2872
2873        match self {
2874            #[cfg(feature = "dynostore")]
2875            Self::DynoStore(engine) => engine.offset_stage(topition),
2876
2877            #[cfg(feature = "libsql")]
2878            Self::Lite(engine) => engine.offset_stage(topition),
2879
2880            Self::Null(engine) => engine.offset_stage(topition),
2881
2882            #[cfg(feature = "postgres")]
2883            Self::Postgres(engine) => engine.offset_stage(topition),
2884
2885            #[cfg(feature = "slatedb")]
2886            Self::Slate(engine) => engine.offset_stage(topition),
2887
2888            #[cfg(feature = "turso")]
2889            Self::Turso(engine) => engine.offset_stage(topition),
2890        }
2891        .await
2892        .inspect(|_| {
2893            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2894        })
2895        .inspect_err(|_| {
2896            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2897        })
2898    }
2899
2900    #[instrument(skip_all)]
2901    async fn list_offsets(
2902        &self,
2903        isolation_level: IsolationLevel,
2904        offsets: &[(Topition, ListOffset)],
2905    ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
2906        let attributes = [KeyValue::new("method", "list_offsets")];
2907
2908        match self {
2909            #[cfg(feature = "dynostore")]
2910            Self::DynoStore(engine) => engine.list_offsets(isolation_level, offsets),
2911
2912            #[cfg(feature = "libsql")]
2913            Self::Lite(engine) => engine.list_offsets(isolation_level, offsets),
2914
2915            Self::Null(engine) => engine.list_offsets(isolation_level, offsets),
2916
2917            #[cfg(feature = "postgres")]
2918            Self::Postgres(engine) => engine.list_offsets(isolation_level, offsets),
2919
2920            #[cfg(feature = "slatedb")]
2921            Self::Slate(engine) => engine.list_offsets(isolation_level, offsets),
2922
2923            #[cfg(feature = "turso")]
2924            Self::Turso(engine) => engine.list_offsets(isolation_level, offsets),
2925        }
2926        .await
2927        .inspect(|_| {
2928            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2929        })
2930        .inspect_err(|_| {
2931            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2932        })
2933    }
2934
2935    #[instrument(skip_all)]
2936    async fn offset_commit(
2937        &self,
2938        group_id: &str,
2939        retention_time_ms: Option<Duration>,
2940        offsets: &[(Topition, OffsetCommitRequest)],
2941    ) -> Result<Vec<(Topition, ErrorCode)>> {
2942        let attributes = [KeyValue::new("method", "offset_commit")];
2943
2944        match self {
2945            #[cfg(feature = "dynostore")]
2946            Self::DynoStore(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2947
2948            #[cfg(feature = "libsql")]
2949            Self::Lite(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2950
2951            Self::Null(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2952
2953            #[cfg(feature = "postgres")]
2954            Self::Postgres(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2955
2956            #[cfg(feature = "slatedb")]
2957            Self::Slate(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2958
2959            #[cfg(feature = "turso")]
2960            Self::Turso(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2961        }
2962        .await
2963        .inspect(|_| {
2964            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2965        })
2966        .inspect_err(|_| {
2967            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2968        })
2969    }
2970
2971    #[instrument(skip_all)]
2972    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
2973        let attributes = [KeyValue::new("method", "committed_offset_topitions")];
2974
2975        match self {
2976            #[cfg(feature = "dynostore")]
2977            Self::DynoStore(engine) => engine.committed_offset_topitions(group_id),
2978
2979            #[cfg(feature = "libsql")]
2980            Self::Lite(engine) => engine.committed_offset_topitions(group_id),
2981
2982            Self::Null(engine) => engine.committed_offset_topitions(group_id),
2983
2984            #[cfg(feature = "postgres")]
2985            Self::Postgres(engine) => engine.committed_offset_topitions(group_id),
2986
2987            #[cfg(feature = "slatedb")]
2988            Self::Slate(engine) => engine.committed_offset_topitions(group_id),
2989
2990            #[cfg(feature = "turso")]
2991            Self::Turso(engine) => engine.committed_offset_topitions(group_id),
2992        }
2993        .await
2994        .inspect(|_| {
2995            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2996        })
2997        .inspect_err(|_| {
2998            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2999        })
3000    }
3001
3002    #[instrument(skip_all)]
3003    async fn offset_fetch(
3004        &self,
3005        group_id: Option<&str>,
3006        topics: &[Topition],
3007        require_stable: Option<bool>,
3008    ) -> Result<BTreeMap<Topition, i64>> {
3009        let attributes = [KeyValue::new("method", "offset_fetch")];
3010
3011        match self {
3012            #[cfg(feature = "dynostore")]
3013            Self::DynoStore(engine) => engine.offset_fetch(group_id, topics, require_stable),
3014
3015            #[cfg(feature = "libsql")]
3016            Self::Lite(engine) => engine.offset_fetch(group_id, topics, require_stable),
3017
3018            Self::Null(engine) => engine.offset_fetch(group_id, topics, require_stable),
3019
3020            #[cfg(feature = "postgres")]
3021            Self::Postgres(engine) => engine.offset_fetch(group_id, topics, require_stable),
3022
3023            #[cfg(feature = "slatedb")]
3024            Self::Slate(engine) => engine.offset_fetch(group_id, topics, require_stable),
3025
3026            #[cfg(feature = "turso")]
3027            Self::Turso(engine) => engine.offset_fetch(group_id, topics, require_stable),
3028        }
3029        .await
3030        .inspect(|_| {
3031            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3032        })
3033        .inspect_err(|_| {
3034            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3035        })
3036    }
3037
3038    #[instrument(skip_all)]
3039    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
3040        let attributes = [KeyValue::new("method", "metadata")];
3041
3042        match self {
3043            #[cfg(feature = "dynostore")]
3044            Self::DynoStore(engine) => engine.metadata(topics),
3045
3046            #[cfg(feature = "libsql")]
3047            Self::Lite(engine) => engine.metadata(topics),
3048
3049            Self::Null(engine) => engine.metadata(topics),
3050
3051            #[cfg(feature = "postgres")]
3052            Self::Postgres(engine) => engine.metadata(topics),
3053
3054            #[cfg(feature = "slatedb")]
3055            Self::Slate(engine) => engine.metadata(topics),
3056
3057            #[cfg(feature = "turso")]
3058            Self::Turso(engine) => engine.metadata(topics),
3059        }
3060        .await
3061        .inspect(|_| {
3062            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3063        })
3064        .inspect_err(|_| {
3065            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3066        })
3067    }
3068
3069    #[instrument(skip_all)]
3070    async fn describe_config(
3071        &self,
3072        name: &str,
3073        resource: ConfigResource,
3074        keys: Option<&[String]>,
3075    ) -> Result<DescribeConfigsResult> {
3076        let attributes = [KeyValue::new("method", "describe_config")];
3077
3078        match self {
3079            #[cfg(feature = "dynostore")]
3080            Self::DynoStore(engine) => engine.describe_config(name, resource, keys),
3081
3082            #[cfg(feature = "libsql")]
3083            Self::Lite(engine) => engine.describe_config(name, resource, keys),
3084
3085            Self::Null(engine) => engine.describe_config(name, resource, keys),
3086
3087            #[cfg(feature = "postgres")]
3088            Self::Postgres(engine) => engine.describe_config(name, resource, keys),
3089
3090            #[cfg(feature = "slatedb")]
3091            Self::Slate(engine) => engine.describe_config(name, resource, keys),
3092
3093            #[cfg(feature = "turso")]
3094            Self::Turso(engine) => engine.describe_config(name, resource, keys),
3095        }
3096        .await
3097        .inspect(|_| {
3098            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3099        })
3100        .inspect_err(|_| {
3101            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3102        })
3103    }
3104
3105    #[instrument(skip_all)]
3106    async fn describe_topic_partitions(
3107        &self,
3108        topics: Option<&[TopicId]>,
3109        partition_limit: i32,
3110        cursor: Option<Topition>,
3111    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
3112        let attributes = [KeyValue::new("method", "describe_topic_partitions")];
3113
3114        match self {
3115            #[cfg(feature = "dynostore")]
3116            Self::DynoStore(engine) => {
3117                engine.describe_topic_partitions(topics, partition_limit, cursor)
3118            }
3119
3120            #[cfg(feature = "libsql")]
3121            Self::Lite(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
3122
3123            Self::Null(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
3124
3125            #[cfg(feature = "postgres")]
3126            Self::Postgres(engine) => {
3127                engine.describe_topic_partitions(topics, partition_limit, cursor)
3128            }
3129
3130            #[cfg(feature = "slatedb")]
3131            Self::Slate(engine) => {
3132                engine.describe_topic_partitions(topics, partition_limit, cursor)
3133            }
3134
3135            #[cfg(feature = "turso")]
3136            Self::Turso(engine) => {
3137                engine.describe_topic_partitions(topics, partition_limit, cursor)
3138            }
3139        }
3140        .await
3141        .inspect(|_| {
3142            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3143        })
3144        .inspect_err(|_| {
3145            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3146        })
3147    }
3148
3149    #[instrument(skip_all)]
3150    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
3151        let attributes = [KeyValue::new("method", "list_groups")];
3152
3153        match self {
3154            #[cfg(feature = "dynostore")]
3155            Self::DynoStore(engine) => engine.list_groups(states_filter),
3156
3157            #[cfg(feature = "libsql")]
3158            Self::Lite(engine) => engine.list_groups(states_filter),
3159
3160            Self::Null(engine) => engine.list_groups(states_filter),
3161
3162            #[cfg(feature = "postgres")]
3163            Self::Postgres(engine) => engine.list_groups(states_filter),
3164
3165            #[cfg(feature = "slatedb")]
3166            Self::Slate(engine) => engine.list_groups(states_filter),
3167
3168            #[cfg(feature = "turso")]
3169            Self::Turso(engine) => engine.list_groups(states_filter),
3170        }
3171        .await
3172        .inspect(|_| {
3173            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3174        })
3175        .inspect_err(|_| {
3176            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3177        })
3178    }
3179
3180    #[instrument(skip_all)]
3181    async fn delete_groups(
3182        &self,
3183        group_ids: Option<&[String]>,
3184    ) -> Result<Vec<DeletableGroupResult>> {
3185        let attributes = [KeyValue::new("method", "delete_groups")];
3186
3187        match self {
3188            #[cfg(feature = "dynostore")]
3189            Self::DynoStore(engine) => engine.delete_groups(group_ids),
3190
3191            #[cfg(feature = "libsql")]
3192            Self::Lite(engine) => engine.delete_groups(group_ids),
3193
3194            Self::Null(engine) => engine.delete_groups(group_ids),
3195
3196            #[cfg(feature = "postgres")]
3197            Self::Postgres(engine) => engine.delete_groups(group_ids),
3198
3199            #[cfg(feature = "slatedb")]
3200            Self::Slate(engine) => engine.delete_groups(group_ids),
3201
3202            #[cfg(feature = "turso")]
3203            Self::Turso(engine) => engine.delete_groups(group_ids),
3204        }
3205        .await
3206        .inspect(|_| {
3207            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3208        })
3209        .inspect_err(|_| {
3210            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3211        })
3212    }
3213
3214    #[instrument(skip_all)]
3215    async fn describe_groups(
3216        &self,
3217        group_ids: Option<&[String]>,
3218        include_authorized_operations: bool,
3219    ) -> Result<Vec<NamedGroupDetail>> {
3220        let attributes = [KeyValue::new("method", "describe_groups")];
3221
3222        match self {
3223            #[cfg(feature = "dynostore")]
3224            Self::DynoStore(engine) => {
3225                engine.describe_groups(group_ids, include_authorized_operations)
3226            }
3227
3228            #[cfg(feature = "libsql")]
3229            Self::Lite(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3230
3231            Self::Null(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3232
3233            #[cfg(feature = "postgres")]
3234            Self::Postgres(engine) => {
3235                engine.describe_groups(group_ids, include_authorized_operations)
3236            }
3237
3238            #[cfg(feature = "slatedb")]
3239            Self::Slate(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3240
3241            #[cfg(feature = "turso")]
3242            Self::Turso(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3243        }
3244        .await
3245        .inspect(|_| {
3246            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3247        })
3248        .inspect_err(|_| {
3249            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3250        })
3251    }
3252
3253    #[instrument(skip_all)]
3254    async fn update_group(
3255        &self,
3256        group_id: &str,
3257        detail: GroupDetail,
3258        version: Option<Version>,
3259    ) -> Result<Version, UpdateError<GroupDetail>> {
3260        let attributes = [KeyValue::new("method", "update_group")];
3261
3262        match self {
3263            #[cfg(feature = "dynostore")]
3264            Self::DynoStore(engine) => engine.update_group(group_id, detail, version),
3265
3266            #[cfg(feature = "libsql")]
3267            Self::Lite(engine) => engine.update_group(group_id, detail, version),
3268
3269            Self::Null(engine) => engine.update_group(group_id, detail, version),
3270
3271            #[cfg(feature = "postgres")]
3272            Self::Postgres(engine) => engine.update_group(group_id, detail, version),
3273
3274            #[cfg(feature = "slatedb")]
3275            Self::Slate(engine) => engine.update_group(group_id, detail, version),
3276
3277            #[cfg(feature = "turso")]
3278            Self::Turso(engine) => engine.update_group(group_id, detail, version),
3279        }
3280        .await
3281        .inspect(|_| {
3282            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3283        })
3284        .inspect_err(|_| {
3285            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3286        })
3287    }
3288
3289    #[instrument(skip_all)]
3290    async fn init_producer(
3291        &self,
3292        transaction_id: Option<&str>,
3293        transaction_timeout_ms: i32,
3294        producer_id: Option<i64>,
3295        producer_epoch: Option<i16>,
3296    ) -> Result<ProducerIdResponse> {
3297        let attributes = [KeyValue::new("method", "init_producer")];
3298
3299        match self {
3300            #[cfg(feature = "dynostore")]
3301            Self::DynoStore(engine) => engine.init_producer(
3302                transaction_id,
3303                transaction_timeout_ms,
3304                producer_id,
3305                producer_epoch,
3306            ),
3307
3308            #[cfg(feature = "libsql")]
3309            Self::Lite(engine) => engine.init_producer(
3310                transaction_id,
3311                transaction_timeout_ms,
3312                producer_id,
3313                producer_epoch,
3314            ),
3315
3316            Self::Null(engine) => engine.init_producer(
3317                transaction_id,
3318                transaction_timeout_ms,
3319                producer_id,
3320                producer_epoch,
3321            ),
3322
3323            #[cfg(feature = "postgres")]
3324            Self::Postgres(engine) => engine.init_producer(
3325                transaction_id,
3326                transaction_timeout_ms,
3327                producer_id,
3328                producer_epoch,
3329            ),
3330
3331            #[cfg(feature = "slatedb")]
3332            Self::Slate(engine) => engine.init_producer(
3333                transaction_id,
3334                transaction_timeout_ms,
3335                producer_id,
3336                producer_epoch,
3337            ),
3338
3339            #[cfg(feature = "turso")]
3340            Self::Turso(engine) => engine.init_producer(
3341                transaction_id,
3342                transaction_timeout_ms,
3343                producer_id,
3344                producer_epoch,
3345            ),
3346        }
3347        .await
3348        .inspect(|_| {
3349            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3350        })
3351        .inspect_err(|_| {
3352            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3353        })
3354    }
3355
3356    #[instrument(skip_all)]
3357    async fn txn_add_offsets(
3358        &self,
3359        transaction_id: &str,
3360        producer_id: i64,
3361        producer_epoch: i16,
3362        group_id: &str,
3363    ) -> Result<ErrorCode> {
3364        let attributes = [KeyValue::new("method", "txn_add_offsets")];
3365
3366        match self {
3367            #[cfg(feature = "dynostore")]
3368            Self::DynoStore(engine) => {
3369                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3370            }
3371
3372            #[cfg(feature = "libsql")]
3373            Self::Lite(engine) => {
3374                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3375            }
3376
3377            Self::Null(engine) => {
3378                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3379            }
3380
3381            #[cfg(feature = "postgres")]
3382            Self::Postgres(engine) => {
3383                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3384            }
3385
3386            #[cfg(feature = "slatedb")]
3387            Self::Slate(engine) => {
3388                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3389            }
3390
3391            #[cfg(feature = "turso")]
3392            Self::Turso(engine) => {
3393                engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3394            }
3395        }
3396        .await
3397        .inspect(|_| {
3398            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3399        })
3400        .inspect_err(|_| {
3401            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3402        })
3403    }
3404
3405    #[instrument(skip_all)]
3406    async fn txn_add_partitions(
3407        &self,
3408        partitions: TxnAddPartitionsRequest,
3409    ) -> Result<TxnAddPartitionsResponse> {
3410        let attributes = [KeyValue::new("method", "txn_add_partitions")];
3411
3412        match self {
3413            #[cfg(feature = "dynostore")]
3414            Self::DynoStore(engine) => engine.txn_add_partitions(partitions),
3415
3416            #[cfg(feature = "libsql")]
3417            Self::Lite(engine) => engine.txn_add_partitions(partitions),
3418
3419            Self::Null(engine) => engine.txn_add_partitions(partitions),
3420
3421            #[cfg(feature = "postgres")]
3422            Self::Postgres(engine) => engine.txn_add_partitions(partitions),
3423
3424            #[cfg(feature = "slatedb")]
3425            Self::Slate(engine) => engine.txn_add_partitions(partitions),
3426
3427            #[cfg(feature = "turso")]
3428            Self::Turso(engine) => engine.txn_add_partitions(partitions),
3429        }
3430        .await
3431        .inspect(|_| {
3432            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3433        })
3434        .inspect_err(|_| {
3435            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3436        })
3437    }
3438
3439    #[instrument(skip_all)]
3440    async fn txn_offset_commit(
3441        &self,
3442        offsets: TxnOffsetCommitRequest,
3443    ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
3444        let attributes = [KeyValue::new("method", "txn_offset_commit")];
3445
3446        match self {
3447            #[cfg(feature = "dynostore")]
3448            Self::DynoStore(engine) => engine.txn_offset_commit(offsets),
3449
3450            #[cfg(feature = "libsql")]
3451            Self::Lite(engine) => engine.txn_offset_commit(offsets),
3452
3453            Self::Null(engine) => engine.txn_offset_commit(offsets),
3454
3455            #[cfg(feature = "postgres")]
3456            Self::Postgres(engine) => engine.txn_offset_commit(offsets),
3457
3458            #[cfg(feature = "slatedb")]
3459            Self::Slate(engine) => engine.txn_offset_commit(offsets),
3460
3461            #[cfg(feature = "turso")]
3462            Self::Turso(engine) => engine.txn_offset_commit(offsets),
3463        }
3464        .await
3465        .inspect(|_| {
3466            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3467        })
3468        .inspect_err(|_| {
3469            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3470        })
3471    }
3472
3473    #[instrument(skip_all)]
3474    async fn txn_end(
3475        &self,
3476        transaction_id: &str,
3477        producer_id: i64,
3478        producer_epoch: i16,
3479        committed: bool,
3480    ) -> Result<ErrorCode> {
3481        let attributes = [KeyValue::new("method", "txn_end")];
3482
3483        match self {
3484            #[cfg(feature = "dynostore")]
3485            Self::DynoStore(engine) => {
3486                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3487            }
3488
3489            #[cfg(feature = "libsql")]
3490            Self::Lite(engine) => {
3491                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3492            }
3493
3494            Self::Null(engine) => {
3495                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3496            }
3497
3498            #[cfg(feature = "postgres")]
3499            Self::Postgres(engine) => {
3500                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3501            }
3502
3503            #[cfg(feature = "slatedb")]
3504            Self::Slate(engine) => {
3505                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3506            }
3507
3508            #[cfg(feature = "turso")]
3509            Self::Turso(engine) => {
3510                engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3511            }
3512        }
3513        .await
3514        .inspect(|_| {
3515            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3516        })
3517        .inspect_err(|_| {
3518            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3519        })
3520    }
3521
3522    #[instrument(skip_all)]
3523    async fn maintain(&self, now: SystemTime) -> Result<()> {
3524        let attributes = [KeyValue::new("method", "maintain")];
3525
3526        match self {
3527            #[cfg(feature = "dynostore")]
3528            Self::DynoStore(engine) => engine.maintain(now),
3529
3530            #[cfg(feature = "libsql")]
3531            Self::Lite(engine) => engine.maintain(now),
3532
3533            Self::Null(engine) => engine.maintain(now),
3534
3535            #[cfg(feature = "postgres")]
3536            Self::Postgres(engine) => engine.maintain(now),
3537
3538            #[cfg(feature = "slatedb")]
3539            Self::Slate(engine) => engine.maintain(now),
3540
3541            #[cfg(feature = "turso")]
3542            Self::Turso(engine) => engine.maintain(now),
3543        }
3544        .await
3545        .inspect(|maintain| {
3546            debug!(?maintain);
3547            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3548        })
3549        .inspect_err(|err| {
3550            debug!(?err);
3551            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3552        })
3553    }
3554
3555    #[instrument(skip_all)]
3556    async fn cluster_id(&self) -> Result<String> {
3557        match self {
3558            #[cfg(feature = "dynostore")]
3559            Self::DynoStore(engine) => engine.cluster_id().await,
3560
3561            #[cfg(feature = "libsql")]
3562            Self::Lite(engine) => engine.cluster_id().await,
3563
3564            Self::Null(engine) => engine.cluster_id().await,
3565
3566            #[cfg(feature = "postgres")]
3567            Self::Postgres(engine) => engine.cluster_id().await,
3568
3569            #[cfg(feature = "slatedb")]
3570            Self::Slate(engine) => engine.cluster_id().await,
3571
3572            #[cfg(feature = "turso")]
3573            Self::Turso(engine) => engine.cluster_id().await,
3574        }
3575    }
3576
3577    #[instrument(skip_all)]
3578    async fn node(&self) -> Result<i32> {
3579        match self {
3580            #[cfg(feature = "dynostore")]
3581            Self::DynoStore(engine) => engine.node().await,
3582
3583            #[cfg(feature = "libsql")]
3584            Self::Lite(engine) => engine.node().await,
3585
3586            Self::Null(engine) => engine.node().await,
3587
3588            #[cfg(feature = "postgres")]
3589            Self::Postgres(engine) => engine.node().await,
3590
3591            #[cfg(feature = "slatedb")]
3592            Self::Slate(engine) => engine.node().await,
3593
3594            #[cfg(feature = "turso")]
3595            Self::Turso(engine) => engine.node().await,
3596        }
3597    }
3598
3599    #[instrument(skip_all)]
3600    async fn advertised_listener(&self) -> Result<Url> {
3601        match self {
3602            #[cfg(feature = "dynostore")]
3603            Self::DynoStore(engine) => engine.advertised_listener().await,
3604
3605            #[cfg(feature = "libsql")]
3606            Self::Lite(engine) => engine.advertised_listener().await,
3607
3608            Self::Null(engine) => engine.advertised_listener().await,
3609
3610            #[cfg(feature = "postgres")]
3611            Self::Postgres(engine) => engine.advertised_listener().await,
3612
3613            #[cfg(feature = "slatedb")]
3614            Self::Slate(engine) => engine.advertised_listener().await,
3615
3616            #[cfg(feature = "turso")]
3617            Self::Turso(engine) => engine.advertised_listener().await,
3618        }
3619    }
3620
3621    async fn delete_user_scram_credential(
3622        &self,
3623        user: &str,
3624        mechanism: ScramMechanism,
3625    ) -> Result<()> {
3626        match self {
3627            #[cfg(feature = "dynostore")]
3628            Self::DynoStore(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3629
3630            #[cfg(feature = "libsql")]
3631            Self::Lite(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3632
3633            Self::Null(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3634
3635            #[cfg(feature = "postgres")]
3636            Self::Postgres(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3637
3638            #[cfg(feature = "slatedb")]
3639            Self::Slate(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3640
3641            #[cfg(feature = "turso")]
3642            Self::Turso(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3643        }
3644    }
3645
3646    async fn upsert_user_scram_credential(
3647        &self,
3648        user: &str,
3649        mechanism: ScramMechanism,
3650        credential: ScramCredential,
3651    ) -> Result<()> {
3652        match self {
3653            #[cfg(feature = "dynostore")]
3654            Self::DynoStore(engine) => {
3655                engine
3656                    .upsert_user_scram_credential(user, mechanism, credential)
3657                    .await
3658            }
3659
3660            #[cfg(feature = "libsql")]
3661            Self::Lite(engine) => {
3662                engine
3663                    .upsert_user_scram_credential(user, mechanism, credential)
3664                    .await
3665            }
3666
3667            Self::Null(engine) => {
3668                engine
3669                    .upsert_user_scram_credential(user, mechanism, credential)
3670                    .await
3671            }
3672
3673            #[cfg(feature = "postgres")]
3674            Self::Postgres(engine) => {
3675                engine
3676                    .upsert_user_scram_credential(user, mechanism, credential)
3677                    .await
3678            }
3679
3680            #[cfg(feature = "slatedb")]
3681            Self::Slate(engine) => {
3682                engine
3683                    .upsert_user_scram_credential(user, mechanism, credential)
3684                    .await
3685            }
3686
3687            #[cfg(feature = "turso")]
3688            Self::Turso(engine) => {
3689                engine
3690                    .upsert_user_scram_credential(user, mechanism, credential)
3691                    .await
3692            }
3693        }
3694    }
3695
3696    async fn user_scram_credential(
3697        &self,
3698        user: &str,
3699        mechanism: ScramMechanism,
3700    ) -> Result<Option<ScramCredential>> {
3701        match self {
3702            #[cfg(feature = "dynostore")]
3703            Self::DynoStore(engine) => engine.user_scram_credential(user, mechanism).await,
3704
3705            #[cfg(feature = "libsql")]
3706            Self::Lite(engine) => engine.user_scram_credential(user, mechanism).await,
3707
3708            Self::Null(engine) => engine.user_scram_credential(user, mechanism).await,
3709
3710            #[cfg(feature = "postgres")]
3711            Self::Postgres(engine) => engine.user_scram_credential(user, mechanism).await,
3712
3713            #[cfg(feature = "slatedb")]
3714            Self::Slate(engine) => engine.user_scram_credential(user, mechanism).await,
3715
3716            #[cfg(feature = "turso")]
3717            Self::Turso(engine) => engine.user_scram_credential(user, mechanism).await,
3718        }
3719    }
3720
3721    #[instrument(skip_all)]
3722    async fn ping(&self) -> Result<()> {
3723        let attributes = [KeyValue::new("method", "ping")];
3724
3725        match self {
3726            #[cfg(feature = "dynostore")]
3727            Self::DynoStore(engine) => engine.ping(),
3728
3729            #[cfg(feature = "libsql")]
3730            Self::Lite(engine) => engine.ping(),
3731
3732            Self::Null(engine) => engine.ping(),
3733
3734            #[cfg(feature = "postgres")]
3735            Self::Postgres(engine) => engine.ping(),
3736
3737            #[cfg(feature = "turso")]
3738            Self::Turso(engine) => engine.ping(),
3739
3740            #[cfg(feature = "slatedb")]
3741            Self::Slate(engine) => engine.ping(),
3742        }
3743        .await
3744        .inspect(|_| {
3745            STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3746        })
3747        .inspect_err(|_| {
3748            STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3749        })
3750    }
3751}
3752
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a simple `<topic>-<partition>` string and checks both parts.
    #[test]
    fn topition_from_str() -> Result<()> {
        let parsed = Topition::from_str("qwerty-2147483647")?;

        assert_eq!("qwerty", parsed.topic());
        assert_eq!(i32::MAX, parsed.partition());

        Ok(())
    }

    /// Checks that when the topic name itself contains dashes, only the
    /// final dash-separated segment is taken as the partition.
    #[test]
    fn topic_with_dashes_in_name() -> Result<()> {
        let parsed = Topition::from_str("test-topic-0000000-eFC79C8-2147483647")?;

        assert_eq!("test-topic-0000000-eFC79C8", parsed.topic());
        assert_eq!(i32::MAX, parsed.partition());

        Ok(())
    }
}