1use async_trait::async_trait;
115use bytes::{Bytes, TryGetError};
116
117use console::Emoji;
118#[cfg(any(feature = "libsql", feature = "postgres"))]
119use deadpool::managed::PoolError;
120#[cfg(feature = "dynostore")]
121use dynostore::DynoStore;
122
123use glob::{GlobError, PatternError};
124
125use indicatif::{ProgressBar, ProgressStyle};
126#[cfg(feature = "dynostore")]
127use object_store::memory::InMemory;
128
129#[cfg(feature = "dynostore")]
130use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
131
132use opentelemetry::{
133 InstrumentationScope, KeyValue, global,
134 metrics::{Counter, Meter},
135};
136use opentelemetry_semantic_conventions::SCHEMA_URL;
137
138#[cfg(feature = "postgres")]
139use pg::Postgres;
140
141use governor::InsufficientCapacity;
142use nisshi_sans_io::{
143 Body, ConfigResource, ErrorCode, IsolationLevel, ListOffset, NULL_TOPIC_ID, ScramMechanism,
144 add_partitions_to_txn_request::{
145 AddPartitionsToTxnRequest, AddPartitionsToTxnTopic, AddPartitionsToTxnTransaction,
146 },
147 add_partitions_to_txn_response::{AddPartitionsToTxnResult, AddPartitionsToTxnTopicResult},
148 consumer_group_describe_response,
149 create_topics_request::CreatableTopic,
150 delete_groups_response::DeletableGroupResult,
151 delete_records_request::DeleteRecordsTopic,
152 delete_records_response::DeleteRecordsTopicResult,
153 delete_topics_request::DeleteTopicState,
154 describe_cluster_response::DescribeClusterBroker,
155 describe_configs_response::DescribeConfigsResult,
156 describe_groups_response,
157 describe_topic_partitions_request::{Cursor, TopicRequest},
158 describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
159 fetch_request::FetchTopic,
160 incremental_alter_configs_request::AlterConfigsResource,
161 incremental_alter_configs_response::AlterConfigsResourceResponse,
162 join_group_response::JoinGroupResponseMember,
163 list_groups_response::ListedGroup,
164 metadata_request::MetadataRequestTopic,
165 metadata_response::{MetadataResponseBroker, MetadataResponseTopic},
166 offset_commit_request::OffsetCommitRequestPartition,
167 record::deflated,
168 to_system_time, to_timestamp,
169 txn_offset_commit_request::TxnOffsetCommitRequestTopic,
170 txn_offset_commit_response::TxnOffsetCommitResponseTopic,
171};
172use nisshi_schema::{Registry, lake::House};
173use regex::Regex;
174use serde::{Deserialize, Serialize};
175#[cfg(any(feature = "libsql", feature = "postgres"))]
176use std::error;
177use std::{
178 array::TryFromSliceError,
179 collections::BTreeMap,
180 ffi::OsString,
181 fmt::{self, Debug, Display, Formatter},
182 fs::DirEntry,
183 io,
184 marker::PhantomData,
185 num::{ParseIntError, TryFromIntError},
186 path::PathBuf,
187 result,
188 str::FromStr,
189 sync::{Arc, LazyLock, PoisonError},
190 time::{Duration, SystemTime, SystemTimeError},
191};
192use tokio::sync::AcquireError;
193use tokio_util::sync::CancellationToken;
194use tracing::{debug, instrument};
195use tracing_subscriber::filter::ParseError;
196use url::Url;
197use uuid::Uuid;
198
199#[cfg(feature = "dynostore")]
200use tracing::warn;
201
202#[cfg(feature = "dynostore")]
203mod dynostore;
204
205#[cfg(feature = "postgres")]
206mod pg;
207
208#[cfg(feature = "dynostore")]
209mod batch;
210mod latency;
211
212mod null;
213
214#[cfg(feature = "libsql")]
215mod proxy;
216
217mod service;
218
219pub use latency::LatencyIntroducingStorage;
220
221pub use service::{
222 AlterUserScramCredentialsService, ChannelRequestLayer, ChannelRequestService,
223 ConsumerGroupDescribeService, CreateAclsService, CreateTopicsService, DeleteGroupsService,
224 DeleteRecordsService, DeleteTopicsService, DescribeAclsService, DescribeClusterService,
225 DescribeConfigsService, DescribeGroupsService, DescribeTopicPartitionsService,
226 DescribeUserScramCredentialsService, FetchService, FindCoordinatorService,
227 GetTelemetrySubscriptionsService, IncrementalAlterConfigsService, InitProducerIdService,
228 ListGroupsService, ListOffsetsService, ListPartitionReassignmentsService, MetadataService,
229 ProduceService, Request, RequestChannelService, RequestLayer, RequestReceiver, RequestSender,
230 RequestService, RequestStorageService, Response, TxnAddOffsetsService, TxnAddPartitionService,
231 TxnEndService, TxnOffsetCommitService, bounded_channel,
232};
233
234#[cfg(feature = "slatedb")]
235pub mod slate;
236
237#[cfg(any(feature = "libsql", feature = "postgres", feature = "turso"))]
238pub(crate) mod sql;
239
240#[cfg(feature = "libsql")]
241mod lite;
242
243#[cfg(feature = "dynostore")]
244mod gcs;
245
246#[cfg(feature = "dynostore")]
247mod os;
248
249#[cfg(feature = "turso")]
250mod limbo;
251
/// Every failure this crate reports.
///
/// The enum derives `Clone`; several wrapped errors are held behind an
/// [`Arc`] (presumably because the wrapped type is not itself `Clone` —
/// confirm before unwrapping). `Display` is implemented by hand elsewhere in
/// this file and prints the `Debug` representation.
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
    /// A `tokio::sync::AcquireError`.
    Acquire(Arc<AcquireError>),

    /// A protocol [`ErrorCode`]; also produced when converting a
    /// `nisshi_schema::Error::Api`.
    Api(ErrorCode),

    /// A `chrono` parse error.
    ChronoParse(#[from] chrono::ParseError),

    /// A `deadpool` managed pool build error.
    #[cfg(any(feature = "postgres", feature = "libsql"))]
    DeadPoolBuild(#[from] deadpool::managed::BuildError),

    /// Carries the [`Bytes`] involved.
    /// NOTE(review): presumably the payload that failed to decode — confirm.
    Decode(Bytes),

    /// Names a cargo feature together with an explanatory message.
    FeatureNotEnabled {
        feature: String,
        message: String,
    },

    /// A `glob` iteration error.
    Glob(Arc<GlobError>),
    /// A `governor` `InsufficientCapacity` error.
    InsufficientCapacity(#[from] InsufficientCapacity),
    /// A standard I/O error.
    Io(Arc<io::Error>),

    /// Carries an offset together with a base offset.
    LessThanBaseOffset {
        offset: i64,
        base_offset: i64,
    },
    /// Carries an offset together with an optional last offset.
    LessThanLastOffset {
        offset: i64,
        last_offset: Option<i64>,
    },

    /// A `libsql` error.
    #[cfg(feature = "libsql")]
    LibSql(Arc<libsql::Error>),

    /// Carries a time together with an optional maximum time.
    LessThanMaxTime {
        time: i64,
        max_time: Option<i64>,
    },
    /// Carries a time together with an optional minimum time.
    LessThanMinTime {
        time: i64,
        min_time: Option<i64>,
    },
    /// A free-form error message.
    Message(String),
    /// Carries the index (`nth`) of an entry.
    NoSuchEntry {
        nth: u32,
    },
    /// Carries an offset.
    NoSuchOffset(i64),
    /// An [`OsString`]; used in this file when a file name cannot be
    /// converted into a `String`.
    OsString(OsString),

    /// An `object_store` error.
    #[cfg(any(feature = "dynostore", feature = "slatedb"))]
    ObjectStore(Arc<object_store::Error>),

    /// A `tracing_subscriber` filter parse error.
    ParseFilter(Arc<ParseError>),
    /// A `glob` pattern error.
    Pattern(Arc<PatternError>),
    /// An integer parse error.
    ParseInt(#[from] ParseIntError),
    PhantomCached(),
    /// Produced from any `PoisonError`; the poisoned guard is discarded.
    Poison,

    /// A type-erased connection pool error (built from `deadpool`'s
    /// `PoolError`).
    #[cfg(any(feature = "libsql", feature = "postgres"))]
    Pool(Arc<Box<dyn error::Error + Send + Sync>>),

    /// A `postcard` error.
    #[cfg(feature = "slatedb")]
    Postcard(#[from] postcard::Error),

    /// A `regex` error.
    Regex(#[from] regex::Error),

    /// A `nisshi_sans_io` error.
    SansIo(#[from] nisshi_sans_io::Error),

    /// A `nisshi_schema` error other than its `Api` variant.
    Schema(Arc<nisshi_schema::Error>),

    /// A `rustls` error.
    Rustls(#[from] rustls::Error),

    /// Carries the [`Topition`] concerned.
    SegmentEmpty(Topition),

    /// Carries the [`Topition`] concerned and an optional offset.
    SegmentMissing {
        topition: Topition,
        offset: Option<i64>,
    },

    /// A `serde_json` error.
    SerdeJson(Arc<serde_json::Error>),

    /// A `slatedb` error.
    #[cfg(feature = "slatedb")]
    Slate(Arc<slatedb::Error>),

    /// A [`SystemTimeError`].
    SystemTime(#[from] SystemTimeError),

    /// A `tokio_postgres` error.
    #[cfg(feature = "postgres")]
    TokioPostgres(Arc<tokio_postgres::error::Error>),
    /// An integer conversion error.
    TryFromInt(#[from] TryFromIntError),
    /// A slice-to-array conversion error.
    TryFromSlice(#[from] TryFromSliceError),

    /// A `bytes` `TryGetError`.
    TryGet(Arc<TryGetError>),

    /// A `turso` error.
    #[cfg(feature = "turso")]
    Turso(Arc<turso::Error>),

    /// Carries a boxed protocol [`Body`].
    UnexpectedBody(Box<Body>),

    /// Carries a boxed service [`Response`].
    UnexpectedServiceResponse(Box<Response>),

    /// Carries a `turso::Value`.
    #[cfg(feature = "turso")]
    UnexpectedValue(turso::Value),

    /// Carries a cache key.
    UnknownCacheKey(String),

    /// Carries a storage [`Url`].
    UnsupportedStorageUrl(Url),
    /// An `AddPartitionsToTxnRequest` whose field combination matches
    /// neither the version 0-3 nor the version 4+ layout.
    UnexpectedAddPartitionsToTxnRequest(Box<AddPartitionsToTxnRequest>),
    /// A URL parse error.
    Url(#[from] url::ParseError),
    /// The text that could not be parsed into a [`TxnState`].
    UnknownTxnState(String),

    /// A `uuid` error.
    Uuid(#[from] uuid::Error),

    UnableToSend,
    OneshotRecv,
}
368
369impl Display for Error {
370 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
371 write!(f, "{self:?}")
372 }
373}
374
375impl From<TryGetError> for Error {
376 fn from(value: TryGetError) -> Self {
377 Self::TryGet(Arc::new(value))
378 }
379}
380
381impl<T> From<PoisonError<T>> for Error {
382 fn from(_value: PoisonError<T>) -> Self {
383 Self::Poison
384 }
385}
386
387impl From<AcquireError> for Error {
388 fn from(value: AcquireError) -> Self {
389 Self::Acquire(Arc::new(value))
390 }
391}
392
/// Type-erases a `deadpool` `PoolError` and stores it in [`Error::Pool`].
#[cfg(any(feature = "libsql", feature = "postgres"))]
impl<E> From<PoolError<E>> for Error
where
    E: error::Error + Send + Sync + 'static,
{
    fn from(source: PoolError<E>) -> Self {
        // Erase the backend specific `E` so a single variant serves every pool.
        let erased: Box<dyn error::Error + Send + Sync> = Box::new(source);
        Error::Pool(Arc::new(erased))
    }
}
402
/// Wraps a `libsql` error in [`Error::LibSql`].
#[cfg(feature = "libsql")]
impl From<libsql::Error> for Error {
    fn from(source: libsql::Error) -> Self {
        let shared = Arc::new(source);
        Error::LibSql(shared)
    }
}
409
/// Wraps a `slatedb` error in [`Error::Slate`].
#[cfg(feature = "slatedb")]
impl From<slatedb::Error> for Error {
    fn from(source: slatedb::Error) -> Self {
        let shared = Arc::new(source);
        Error::Slate(shared)
    }
}
416
/// Wraps a `turso` error in [`Error::Turso`].
#[cfg(feature = "turso")]
impl From<turso::Error> for Error {
    fn from(source: turso::Error) -> Self {
        let shared = Arc::new(source);
        Error::Turso(shared)
    }
}
423
424impl From<GlobError> for Error {
425 fn from(value: GlobError) -> Self {
426 Self::Glob(Arc::new(value))
427 }
428}
429
430impl From<io::Error> for Error {
431 fn from(value: io::Error) -> Self {
432 Self::Io(Arc::new(value))
433 }
434}
435
/// Stores an already shared `object_store` error in [`Error::ObjectStore`].
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl From<Arc<object_store::Error>> for Error {
    fn from(shared: Arc<object_store::Error>) -> Self {
        Error::ObjectStore(shared)
    }
}
442
/// Shares an `object_store` error behind an `Arc`, then delegates to the
/// `Arc` based conversion.
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl From<object_store::Error> for Error {
    fn from(source: object_store::Error) -> Self {
        let shared = Arc::new(source);
        shared.into()
    }
}
449
450impl From<ParseError> for Error {
451 fn from(value: ParseError) -> Self {
452 Self::ParseFilter(Arc::new(value))
453 }
454}
455
456impl From<PatternError> for Error {
457 fn from(value: PatternError) -> Self {
458 Self::Pattern(Arc::new(value))
459 }
460}
461
462impl From<serde_json::Error> for Error {
463 fn from(value: serde_json::Error) -> Self {
464 Self::from(Arc::new(value))
465 }
466}
467
468impl From<Arc<serde_json::Error>> for Error {
469 fn from(value: Arc<serde_json::Error>) -> Self {
470 Self::SerdeJson(value)
471 }
472}
473
/// Shares a `tokio_postgres` error behind an `Arc`, then delegates to the
/// `Arc` based conversion.
#[cfg(feature = "postgres")]
impl From<tokio_postgres::error::Error> for Error {
    fn from(source: tokio_postgres::error::Error) -> Self {
        let shared = Arc::new(source);
        shared.into()
    }
}
480
/// Stores an already shared `tokio_postgres` error in
/// [`Error::TokioPostgres`].
#[cfg(feature = "postgres")]
impl From<Arc<tokio_postgres::error::Error>> for Error {
    fn from(shared: Arc<tokio_postgres::error::Error>) -> Self {
        Error::TokioPostgres(shared)
    }
}
487
488impl From<nisshi_schema::Error> for Error {
489 fn from(value: nisshi_schema::Error) -> Self {
490 if let nisshi_schema::Error::Api(error_code) = value {
491 Self::Api(error_code)
492 } else {
493 Self::Schema(Arc::new(value))
494 }
495 }
496}
497
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
499
/// A topic name paired with a partition index.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Topition {
    /// Name of the topic.
    topic: String,
    /// Index of the partition within the topic.
    partition: i32,
}
508
509impl Topition {
510 pub fn new(topic: impl Into<String>, partition: i32) -> Self {
511 let topic = topic.into();
512 Self { topic, partition }
513 }
514
515 pub fn topic(&self) -> &str {
516 &self.topic
517 }
518
519 pub fn partition(&self) -> i32 {
520 self.partition
521 }
522}
523
524impl From<Cursor> for Topition {
525 fn from(value: Cursor) -> Self {
526 Self {
527 topic: value.topic_name,
528 partition: value.partition_index,
529 }
530 }
531}
532
533impl TryFrom<&DirEntry> for Topition {
534 type Error = Error;
535
536 fn try_from(value: &DirEntry) -> result::Result<Self, Self::Error> {
537 Regex::new(r"^(?<topic>.+)-(?<partition>\d{10})$")
538 .map_err(Into::into)
539 .and_then(|re| {
540 value
541 .file_name()
542 .into_string()
543 .map_err(Error::OsString)
544 .and_then(|ref file_name| {
545 re.captures(file_name)
546 .ok_or(Error::Message(format!("no captures for {file_name}")))
547 .and_then(|ref captures| {
548 let topic = captures
549 .name("topic")
550 .ok_or(Error::Message(format!("missing topic for {file_name}")))
551 .map(|s| s.as_str().to_owned())?;
552
553 let partition = captures
554 .name("partition")
555 .ok_or(Error::Message(format!(
556 "missing partition for: {file_name}"
557 )))
558 .map(|s| s.as_str())
559 .and_then(|s| str::parse(s).map_err(Into::into))?;
560
561 Ok(Self { topic, partition })
562 })
563 })
564 })
565 }
566}
567
568impl FromStr for Topition {
569 type Err = Error;
570
571 fn from_str(s: &str) -> result::Result<Self, Self::Err> {
572 i32::from_str(&s[s.len() - 10..])
573 .map(|partition| {
574 let topic = String::from(&s[..s.len() - 11]);
575
576 Self { topic, partition }
577 })
578 .map_err(Into::into)
579 }
580}
581
582impl From<&Topition> for PathBuf {
583 fn from(value: &Topition) -> Self {
584 let topic = value.topic.as_str();
585 let partition = value.partition;
586 PathBuf::from(format!("{topic}-{partition:0>10}"))
587 }
588}
589
/// A [`Topition`] paired with an offset.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TopitionOffset {
    /// Topic and partition the offset belongs to.
    topition: Topition,
    /// Offset within that topic partition.
    offset: i64,
}
598
599impl TopitionOffset {
600 pub fn new(topition: Topition, offset: i64) -> Self {
601 Self { topition, offset }
602 }
603
604 pub fn topition(&self) -> &Topition {
605 &self.topition
606 }
607
608 pub fn offset(&self) -> i64 {
609 self.offset
610 }
611}
612
613impl From<&TopitionOffset> for PathBuf {
614 fn from(value: &TopitionOffset) -> Self {
615 let offset = value.offset;
616 PathBuf::from(value.topition()).join(format!("{offset:0>20}"))
617 }
618}
619
/// Alias of the protocol level [`ListOffset`] type.
pub type ListOffsetRequest = ListOffset;
621
/// Result of a list-offset lookup: an error code with an optional timestamp
/// and offset.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ListOffsetResponse {
    /// Outcome of the lookup; `ErrorCode::None` by default.
    pub error_code: ErrorCode,
    /// Timestamp associated with the offset, when there is one.
    pub timestamp: Option<SystemTime>,
    /// The offset, when there is one.
    pub offset: Option<i64>,
}
628
629impl Default for ListOffsetResponse {
630 fn default() -> Self {
631 Self {
632 error_code: ErrorCode::None,
633 timestamp: None,
634 offset: None,
635 }
636 }
637}
638
639impl ListOffsetResponse {
640 pub fn offset(&self) -> Option<i64> {
641 self.offset
642 }
643
644 pub fn timestamp(&self) -> Result<Option<i64>> {
645 self.timestamp.map_or(Ok(None), |system_time| {
646 to_timestamp(&system_time).map(Some).map_err(Into::into)
647 })
648 }
649
650 pub fn error_code(&self) -> ErrorCode {
651 self.error_code
652 }
653}
654
/// An offset to commit, with optional leader epoch, timestamp and metadata.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct OffsetCommitRequest {
    /// Offset being committed.
    offset: i64,
    /// Leader epoch supplied with the commit, if any.
    leader_epoch: Option<i32>,
    /// Commit timestamp, if any.
    timestamp: Option<SystemTime>,
    /// Metadata supplied with the commit, if any.
    metadata: Option<String>,
}
665
666impl OffsetCommitRequest {
667 pub fn offset(self, offset: i64) -> Self {
668 Self { offset, ..self }
669 }
670}
671
672impl TryFrom<&OffsetCommitRequestPartition> for OffsetCommitRequest {
673 type Error = Error;
674
675 fn try_from(value: &OffsetCommitRequestPartition) -> Result<Self, Self::Error> {
676 value
677 .commit_timestamp
678 .map_or(Ok(None), |commit_timestamp| {
679 to_system_time(commit_timestamp)
680 .map(Some)
681 .map_err(Into::into)
682 })
683 .map(|timestamp| Self {
684 offset: value.committed_offset,
685 leader_epoch: value.committed_leader_epoch,
686 timestamp,
687 metadata: value.committed_metadata.clone(),
688 })
689 }
690}
691
/// Identifies a topic either by name or by UUID.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TopicId {
    /// Topic identified by its name.
    Name(String),
    /// Topic identified by its UUID.
    Id(Uuid),
}
700
701impl FromStr for TopicId {
702 type Err = Error;
703
704 fn from_str(s: &str) -> result::Result<Self, Self::Err> {
705 Ok(Self::Name(s.into()))
706 }
707}
708
709impl From<&str> for TopicId {
710 fn from(value: &str) -> Self {
711 Self::Name(value.to_owned())
712 }
713}
714
715impl From<String> for TopicId {
716 fn from(value: String) -> Self {
717 Self::Name(value)
718 }
719}
720
721impl From<Uuid> for TopicId {
722 fn from(value: Uuid) -> Self {
723 Self::Id(value)
724 }
725}
726
727impl From<[u8; 16]> for TopicId {
728 fn from(value: [u8; 16]) -> Self {
729 Self::Id(Uuid::from_bytes(value))
730 }
731}
732
733impl From<&TopicId> for [u8; 16] {
734 fn from(value: &TopicId) -> Self {
735 match value {
736 TopicId::Id(id) => id.into_bytes(),
737 TopicId::Name(_) => NULL_TOPIC_ID,
738 }
739 }
740}
741
742impl From<&FetchTopic> for TopicId {
743 fn from(value: &FetchTopic) -> Self {
744 if let Some(ref name) = value.topic {
745 Self::Name(name.into())
746 } else if let Some(ref id) = value.topic_id {
747 Self::Id(Uuid::from_bytes(*id))
748 } else {
749 panic!("neither name nor uuid")
750 }
751 }
752}
753
754impl From<&MetadataRequestTopic> for TopicId {
755 fn from(value: &MetadataRequestTopic) -> Self {
756 if let Some(ref name) = value.name {
757 Self::Name(name.into())
758 } else if let Some(ref id) = value.topic_id {
759 Self::Id(Uuid::from_bytes(*id))
760 } else {
761 panic!("neither name nor uuid")
762 }
763 }
764}
765
766impl From<DeleteTopicState> for TopicId {
767 fn from(value: DeleteTopicState) -> Self {
768 match value {
769 DeleteTopicState {
770 name: Some(name),
771 topic_id,
772 ..
773 } if topic_id == NULL_TOPIC_ID => name.into(),
774
775 DeleteTopicState { topic_id, .. } => topic_id.into(),
776 }
777 }
778}
779
780impl From<&TopicRequest> for TopicId {
781 fn from(value: &TopicRequest) -> Self {
782 value.name.to_owned().into()
783 }
784}
785
786impl From<&Topition> for TopicId {
787 fn from(value: &Topition) -> Self {
788 value.topic.to_owned().into()
789 }
790}
791
/// Parameters passed to `Storage::register_broker`.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BrokerRegistrationRequest {
    /// Numeric id of the broker.
    pub broker_id: i32,
    /// Id of the cluster.
    pub cluster_id: String,
    /// UUID of this broker incarnation.
    pub incarnation_id: Uuid,
    /// Rack of the broker, if any.
    pub rack: Option<String>,
}
802
/// Cluster metadata: optional cluster and controller ids plus the broker and
/// topic lists.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct MetadataResponse {
    /// Cluster, if known.
    cluster: Option<String>,
    /// Controller, if known.
    controller: Option<i32>,
    /// Brokers in the response.
    brokers: Vec<MetadataResponseBroker>,
    /// Topics in the response.
    topics: Vec<MetadataResponseTopic>,
}
810
811impl MetadataResponse {
812 pub fn cluster(&self) -> Option<&str> {
813 self.cluster.as_deref()
814 }
815
816 pub fn controller(&self) -> Option<i32> {
817 self.controller
818 }
819
820 pub fn brokers(&self) -> &[MetadataResponseBroker] {
821 self.brokers.as_ref()
822 }
823
824 pub fn topics(&self) -> &[MetadataResponseTopic] {
825 self.topics.as_ref()
826 }
827}
828
/// Three `i64` positions returned by `Storage::offset_stage` for a topition.
///
/// NOTE(review): the field names suggest the last stable offset, the high
/// watermark and the log start offset of a partition — confirm against the
/// storage implementations.
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct OffsetStage {
    last_stable: i64,
    high_watermark: i64,
    log_start: i64,
}
840
841impl OffsetStage {
842 pub fn last_stable(&self) -> i64 {
843 self.last_stable
844 }
845
846 pub fn high_watermark(&self) -> i64 {
847 self.high_watermark
848 }
849
850 pub fn log_start(&self) -> i64 {
851 self.log_start
852 }
853}
854
/// A group member: its join response together with an optional last contact
/// time.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct GroupMember {
    /// Join-group response member entry for this member.
    pub join_response: JoinGroupResponseMember,
    /// Time of last contact, if any.
    pub last_contact: Option<SystemTime>,
}
861
/// State of a group: still forming (details optional) or formed (details
/// settled, with assignments).
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum GroupState {
    /// Protocol type, protocol name and leader may each still be unknown.
    Forming {
        protocol_type: Option<String>,
        protocol_name: Option<String>,
        leader: Option<String>,
    },

    /// Protocol type, protocol name and leader are all known.
    Formed {
        protocol_type: String,
        protocol_name: String,
        leader: String,
        /// Assignment bytes keyed by a string.
        /// NOTE(review): presumably keyed by member id — confirm.
        assignments: BTreeMap<String, Bytes>,
    },
}
880
881impl GroupState {
882 pub fn protocol_type(&self) -> Option<String> {
883 match self {
884 Self::Forming { protocol_type, .. } => protocol_type.clone(),
885 Self::Formed { protocol_type, .. } => Some(protocol_type.clone()),
886 }
887 }
888
889 pub fn protocol_name(&self) -> Option<String> {
890 match self {
891 Self::Forming { protocol_name, .. } => protocol_name.clone(),
892 Self::Formed { protocol_name, .. } => Some(protocol_name.clone()),
893 }
894 }
895
896 pub fn leader(&self) -> Option<String> {
897 match self {
898 Self::Forming { leader, .. } => leader.clone(),
899 Self::Formed { leader, .. } => Some(leader.clone()),
900 }
901 }
902
903 pub fn assignments(&self) -> BTreeMap<String, Bytes> {
904 match self {
905 Self::Forming { .. } => BTreeMap::new(),
906 Self::Formed { assignments, .. } => assignments.clone(),
907 }
908 }
909}
910
911impl Default for GroupState {
912 fn default() -> Self {
913 Self::Forming {
914 protocol_type: None,
915 protocol_name: Some("".into()),
916 leader: None,
917 }
918 }
919}
920
/// Named consumer group states; `Display` prints the variant name verbatim.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ConsumerGroupState {
    Unknown,
    PreparingRebalance,
    CompletingRebalance,
    Stable,
    Dead,
    Empty,
    Assigning,
    Reconciling,
}
935
936impl Display for ConsumerGroupState {
937 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
938 match self {
939 Self::Unknown => f.write_str("Unknown"),
940 Self::PreparingRebalance => f.write_str("PreparingRebalance"),
941 Self::CompletingRebalance => f.write_str("CompletingRebalance"),
942 Self::Stable => f.write_str("Stable"),
943 Self::Dead => f.write_str("Dead"),
944 Self::Empty => f.write_str("Empty"),
945 Self::Assigning => f.write_str("Assigning"),
946 Self::Reconciling => f.write_str("Reconciling"),
947 }
948 }
949}
950
/// Stored detail of a group: timeouts, members, generation and state.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct GroupDetail {
    /// Session timeout in milliseconds; 45 000 by default.
    pub session_timeout_ms: i32,
    /// Rebalance timeout in milliseconds, if any.
    pub rebalance_timeout_ms: Option<i32>,
    /// Members keyed by a string; the keys are used as member ids when
    /// building a describe-groups response in this file.
    pub members: BTreeMap<String, GroupMember>,
    /// Generation id of the group.
    pub generation_id: i32,
    /// `Some(false)` by default.
    pub skip_assignment: Option<bool>,
    /// Set to the current time by `Default`.
    pub inception: SystemTime,
    /// Forming or formed state of the group.
    pub state: GroupState,
}
964
965impl Default for GroupDetail {
966 fn default() -> Self {
967 Self {
968 session_timeout_ms: 45_000,
969 rebalance_timeout_ms: Default::default(),
970 members: Default::default(),
971 generation_id: Default::default(),
972 skip_assignment: Some(false),
973 inception: SystemTime::now(),
974 state: GroupState::default(),
975 }
976 }
977}
978
979impl From<&GroupDetail> for ConsumerGroupState {
980 fn from(value: &GroupDetail) -> Self {
981 match value {
982 GroupDetail { members, .. } if members.is_empty() => Self::Empty,
983
984 GroupDetail {
985 state: GroupState::Forming { leader: None, .. },
986 ..
987 } => Self::Assigning,
988
989 GroupDetail {
990 state: GroupState::Formed { .. },
991 ..
992 } => Self::Stable,
993
994 _ => {
995 debug!(unknown = ?value);
996 Self::Unknown
997 }
998 }
999 }
1000}
1001
1002impl From<&GroupDetail> for consumer_group_describe_response::DescribedGroup {
1003 fn from(value: &GroupDetail) -> Self {
1004 let assignor_name = match value.state {
1005 GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
1006 GroupState::Formed { ref leader, .. } => leader.clone(),
1007 };
1008
1009 let group_state = ConsumerGroupState::from(value).to_string();
1010
1011 Self::default()
1012 .error_code(ErrorCode::None.into())
1013 .error_message(Some(ErrorCode::None.to_string()))
1014 .group_id(Default::default())
1015 .group_state(group_state)
1016 .group_epoch(-1)
1017 .assignment_epoch(-1)
1018 .assignor_name(assignor_name)
1019 .members(Some([].into()))
1020 .authorized_operations(-1)
1021 }
1022}
1023
/// Outcome of a group detail lookup: an error code or the detail found.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum GroupDetailResponse {
    /// The lookup produced this error code.
    ErrorCode(ErrorCode),
    /// The lookup produced this detail.
    Found(GroupDetail),
}
1029
/// A [`GroupDetailResponse`] tagged with the name of the group it is for.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct NamedGroupDetail {
    /// Group name; used as the group id when building describe responses.
    name: String,
    /// Lookup outcome for that group.
    response: GroupDetailResponse,
}
1039
1040impl NamedGroupDetail {
1041 pub fn error_code(name: String, error_code: ErrorCode) -> Self {
1042 Self {
1043 name,
1044 response: GroupDetailResponse::ErrorCode(error_code),
1045 }
1046 }
1047
1048 pub fn found(name: String, found: GroupDetail) -> Self {
1049 Self {
1050 name,
1051 response: GroupDetailResponse::Found(found),
1052 }
1053 }
1054}
1055
1056impl From<&NamedGroupDetail> for consumer_group_describe_response::DescribedGroup {
1057 fn from(value: &NamedGroupDetail) -> Self {
1058 match value {
1059 NamedGroupDetail {
1060 name,
1061 response: GroupDetailResponse::Found(group_detail),
1062 } => {
1063 let assignor_name = match group_detail.state {
1064 GroupState::Forming { ref leader, .. } => leader.clone().unwrap_or_default(),
1065 GroupState::Formed { ref leader, .. } => leader.clone(),
1066 };
1067
1068 let group_state = ConsumerGroupState::from(group_detail).to_string();
1069
1070 Self::default()
1071 .error_code(ErrorCode::None.into())
1072 .error_message(Some(ErrorCode::None.to_string()))
1073 .group_id(name.into())
1074 .group_state(group_state)
1075 .group_epoch(-1)
1076 .assignment_epoch(-1)
1077 .assignor_name(assignor_name)
1078 .members(Some([].into()))
1079 .authorized_operations(-1)
1080 }
1081
1082 NamedGroupDetail {
1083 name,
1084 response: GroupDetailResponse::ErrorCode(error_code),
1085 } => Self::default()
1086 .error_code((*error_code).into())
1087 .error_message(Some(error_code.to_string()))
1088 .group_id(name.into())
1089 .group_state("Unknown".into())
1090 .group_epoch(-1)
1091 .assignment_epoch(-1)
1092 .assignor_name("".into())
1093 .members(Some([].into()))
1094 .authorized_operations(-1),
1095 }
1096 }
1097}
1098
1099impl From<&NamedGroupDetail> for describe_groups_response::DescribedGroup {
1100 fn from(value: &NamedGroupDetail) -> Self {
1101 match value {
1102 NamedGroupDetail {
1103 name,
1104 response: GroupDetailResponse::Found(group_detail),
1105 } => {
1106 let group_state = ConsumerGroupState::from(group_detail).to_string();
1107
1108 let members = group_detail
1109 .members
1110 .keys()
1111 .map(|member_id| {
1112 describe_groups_response::DescribedGroupMember::default()
1113 .member_id(member_id.into())
1114 .group_instance_id(None)
1115 .client_id("".into())
1116 .client_host("".into())
1117 .member_metadata(Bytes::new())
1118 .member_assignment(Bytes::new())
1119 })
1120 .collect::<Vec<_>>();
1121
1122 Self::default()
1123 .error_code(ErrorCode::None.into())
1124 .group_id(name.clone())
1125 .group_state(group_state)
1126 .protocol_type(group_detail.state.protocol_type().unwrap_or_default())
1127 .protocol_data("".into())
1128 .members(Some(members))
1129 .authorized_operations(Some(-1))
1130 }
1131
1132 NamedGroupDetail {
1133 name,
1134 response: GroupDetailResponse::ErrorCode(error_code),
1135 } => Self::default()
1136 .error_code((*error_code).into())
1137 .group_id(name.clone())
1138 .group_state("Unknown".into())
1139 .protocol_type("".into())
1140 .protocol_data("".into())
1141 .members(Some(vec![]))
1142 .authorized_operations(Some(-1)),
1143 }
1144 }
1145}
1146
/// An error code and topic identifier with optional per-partition detail.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TopitionDetail {
    /// Error code for the topic.
    error: ErrorCode,
    /// Topic, by name or id.
    topic: TopicId,
    /// Detail for each partition, if any.
    partitions: Option<Vec<PartitionDetail>>,
}
1154
/// An error code paired with a partition index.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct PartitionDetail {
    /// Error code for the partition.
    error: ErrorCode,
    /// Index of the partition.
    partition_index: i32,
}
1161
/// An optional entity tag and optional version string.
///
/// NOTE(review): presumably mirrors an object store's e-tag/version pair
/// used for conditional writes — confirm against the storage engines.
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Version {
    e_tag: Option<String>,
    version: Option<String>,
}
1168
1169impl From<&Uuid> for Version {
1170 fn from(value: &Uuid) -> Self {
1171 Self {
1172 e_tag: Some(value.to_string()),
1173 version: None,
1174 }
1175 }
1176}
1177
/// A producer id and epoch together with an error code.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct ProducerIdResponse {
    /// Error code; `ErrorCode::None` by default.
    pub error: ErrorCode,
    /// Producer id; `1` by default.
    pub id: i64,
    /// Producer epoch; `0` by default.
    pub epoch: i16,
}
1185
1186impl Default for ProducerIdResponse {
1187 fn default() -> Self {
1188 Self {
1189 error: ErrorCode::None,
1190 id: 1,
1191 epoch: 0,
1192 }
1193 }
1194}
1195
/// An add-partitions-to-transaction request in one of its two layouts.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TxnAddPartitionsRequest {
    /// Layout built from the `v_3_and_below_*` request fields: a single
    /// transaction with its producer and topics.
    VersionZeroToThree {
        transaction_id: String,
        producer_id: i64,
        producer_epoch: i16,
        topics: Vec<AddPartitionsToTxnTopic>,
    },

    /// Layout built from the request's `transactions` field.
    VersionFourPlus {
        transactions: Vec<AddPartitionsToTxnTransaction>,
    },
}
1211
/// Classifies a protocol request by which of its fields are populated.
///
/// # Errors
///
/// Returns [`Error::UnexpectedAddPartitionsToTxnRequest`], carrying the
/// original request, when the populated fields match neither layout.
impl TryFrom<AddPartitionsToTxnRequest> for TxnAddPartitionsRequest {
    type Error = Error;

    fn try_from(value: AddPartitionsToTxnRequest) -> result::Result<Self, Self::Error> {
        match value {
            // Versions 0-3: all four `v_3_and_below_*` fields are present
            // and `transactions` is absent.
            AddPartitionsToTxnRequest {
                transactions: None,
                v_3_and_below_transactional_id: Some(transactional_id),
                v_3_and_below_producer_id: Some(producer_id),
                v_3_and_below_producer_epoch: Some(producer_epoch),
                v_3_and_below_topics: Some(topics),
                ..
            } => Ok(Self::VersionZeroToThree {
                transaction_id: transactional_id,
                producer_id,
                producer_epoch,
                topics,
            }),

            // Version 4+: only `transactions` is present.
            AddPartitionsToTxnRequest {
                transactions: Some(transactions),
                v_3_and_below_transactional_id: None,
                v_3_and_below_producer_id: None,
                v_3_and_below_producer_epoch: None,
                v_3_and_below_topics: None,
                ..
            } => Ok(Self::VersionFourPlus { transactions }),

            // Any other combination is handed back to the caller unchanged.
            unexpected => Err(Error::UnexpectedAddPartitionsToTxnRequest(Box::new(
                unexpected,
            ))),
        }
    }
}
1246
/// An add-partitions-to-transaction result in one of its two layouts.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TxnAddPartitionsResponse {
    /// Per-topic results.
    VersionZeroToThree(Vec<AddPartitionsToTxnTopicResult>),
    /// Per-transaction results.
    VersionFourPlus(Vec<AddPartitionsToTxnResult>),
}
1255
1256impl TxnAddPartitionsResponse {
1257 pub fn zero_to_three(&self) -> &[AddPartitionsToTxnTopicResult] {
1258 match self {
1259 Self::VersionZeroToThree(result) => result.as_slice(),
1260 Self::VersionFourPlus(_) => &[][..],
1261 }
1262 }
1263
1264 pub fn four_plus(&self) -> &[AddPartitionsToTxnResult] {
1265 match self {
1266 Self::VersionZeroToThree(_) => &[][..],
1267 Self::VersionFourPlus(result) => result.as_slice(),
1268 }
1269 }
1270}
1271
/// A transactional offset commit: transaction, group and producer
/// identification, optional group membership fields, and the topics to
/// commit.
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct TxnOffsetCommitRequest {
    /// Id of the transaction.
    pub transaction_id: String,
    /// Id of the group.
    pub group_id: String,
    /// Id of the producer.
    pub producer_id: i64,
    /// Epoch of the producer.
    pub producer_epoch: i16,
    /// Generation id, if any.
    pub generation_id: Option<i32>,
    /// Member id, if any.
    pub member_id: Option<String>,
    /// Group instance id, if any.
    pub group_instance_id: Option<String>,
    /// Topics carried by the request.
    pub topics: Vec<TxnOffsetCommitRequestTopic>,
}
1284
/// State of a transaction.
///
/// Converts to and from the upper-case strings `BEGIN`, `PREPARE_COMMIT`,
/// `PREPARE_ABORT`, `COMMITTED` and `ABORTED`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TxnState {
    Begin,
    PrepareCommit,
    PrepareAbort,
    Committed,
    Aborted,
}
1294
1295impl TxnState {
1296 pub fn is_prepared(&self) -> bool {
1297 match self {
1298 Self::PrepareAbort | Self::PrepareCommit => true,
1299 _otherwise => false,
1300 }
1301 }
1302}
1303
1304impl FromStr for TxnState {
1305 type Err = Error;
1306
1307 fn from_str(s: &str) -> Result<Self, Self::Err> {
1308 match s {
1309 "ABORTED" => Ok(TxnState::Aborted),
1310 "BEGIN" => Ok(TxnState::Begin),
1311 "COMMITTED" => Ok(TxnState::Committed),
1312 "PREPARE_ABORT" => Ok(TxnState::PrepareAbort),
1313 "PREPARE_COMMIT" => Ok(TxnState::PrepareCommit),
1314 otherwise => Err(Error::UnknownTxnState(otherwise.to_owned())),
1315 }
1316 }
1317}
1318
1319impl TryFrom<String> for TxnState {
1320 type Error = Error;
1321
1322 fn try_from(value: String) -> Result<Self, Self::Error> {
1323 Self::from_str(&value)
1324 }
1325}
1326
1327impl From<TxnState> for String {
1328 fn from(value: TxnState) -> Self {
1329 match value {
1330 TxnState::Begin => "BEGIN".into(),
1331 TxnState::PrepareCommit => "PREPARE_COMMIT".into(),
1332 TxnState::PrepareAbort => "PREPARE_ABORT".into(),
1333 TxnState::Committed => "COMMITTED".into(),
1334 TxnState::Aborted => "ABORTED".into(),
1335 }
1336 }
1337}
1338
/// Asynchronous interface implemented by every storage engine.
///
/// The trait is used as a trait object (`dyn Storage`), and this file also
/// implements it for `Arc<T>`, `Box<T>` and `StorageContainer` so that handles
/// and wrappers can stand in for an engine. Implementations must be `Debug`,
/// `Send`, `Sync` and `'static`.
///
/// NOTE(review): the per-method notes below are derived from the signatures;
/// precise semantics are defined by the individual engines.
#[async_trait]
pub trait Storage: Debug + Send + Sync + 'static {
    /// Register a broker using the supplied registration request.
    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()>;

    /// Create `topic`, returning a [`Uuid`] (presumably the topic id — confirm).
    /// `validate_only` presumably requests validation without creating the topic.
    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid>;

    /// Apply an incremental configuration change to `resource` and return the
    /// response for that resource.
    async fn incremental_alter_resource(
        &self,
        resource: AlterConfigsResource,
    ) -> Result<AlterConfigsResourceResponse>;

    /// Delete records as described by `topics`, returning per-topic results.
    async fn delete_records(
        &self,
        topics: &[DeleteRecordsTopic],
    ) -> Result<Vec<DeleteRecordsTopicResult>>;

    /// Delete the topic identified by `topic`; the outcome is reported as an
    /// [`ErrorCode`] inside `Ok`.
    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode>;

    /// Return the brokers known to this storage.
    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>>;

    /// Write `batch` to `topition`, optionally on behalf of the transaction
    /// named by `transaction_id`. Returns an `i64` (presumably the offset
    /// assigned to the batch — confirm against an engine).
    async fn produce(
        &self,
        transaction_id: Option<&str>,
        topition: &Topition,
        batch: deflated::Batch,
    ) -> Result<i64>;

    /// Read batches from `topition` starting at `offset`.
    ///
    /// `min_bytes`, `max_bytes`, `isolation` and `max_wait` are handed to the
    /// engine unchanged; how strictly each is honoured is engine-defined.
    async fn fetch(
        &self,
        topition: &'_ Topition,
        offset: i64,
        min_bytes: u32,
        max_bytes: u32,
        isolation: IsolationLevel,
        max_wait: Duration,
    ) -> Result<Vec<deflated::Batch>>;

    /// Return the [`OffsetStage`] of `topition`.
    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage>;

    /// Resolve each `(Topition, ListOffset)` request under `isolation_level`,
    /// returning a response paired with its topition.
    async fn list_offsets(
        &self,
        isolation_level: IsolationLevel,
        offsets: &[(Topition, ListOffset)],
    ) -> Result<Vec<(Topition, ListOffsetResponse)>>;

    /// Commit `offsets` for the group `group_id`, returning an [`ErrorCode`]
    /// per topition.
    ///
    /// NOTE(review): `retention_time_ms` is typed as a [`Duration`] despite
    /// the `_ms` suffix.
    async fn offset_commit(
        &self,
        group_id: &str,
        retention_time_ms: Option<Duration>,
        offsets: &[(Topition, OffsetCommitRequest)],
    ) -> Result<Vec<(Topition, ErrorCode)>>;

    /// Fetch offsets for `topics`, optionally scoped to `group_id`, keyed by
    /// topition. The meaning of `require_stable` is engine-defined.
    async fn offset_fetch(
        &self,
        group_id: Option<&str>,
        topics: &[Topition],
        require_stable: Option<bool>,
    ) -> Result<BTreeMap<Topition, i64>>;

    /// Return a map from topition to `i64` for `group_id` (presumably every
    /// topition with a committed offset and that offset — confirm).
    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>>;

    /// Return metadata for `topics`; `None` presumably selects all topics.
    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse>;

    /// Insert or replace the SCRAM `credential` of `user` for `mechanism`.
    async fn upsert_user_scram_credential(
        &self,
        user: &str,
        mechanism: ScramMechanism,
        credential: ScramCredential,
    ) -> Result<()>;

    /// Delete the SCRAM credential of `user` for `mechanism`.
    async fn delete_user_scram_credential(
        &self,
        user: &str,
        mechanism: ScramMechanism,
    ) -> Result<()>;

    /// Look up the SCRAM credential of `user` for `mechanism`; `Ok(None)`
    /// presumably means no credential is stored.
    async fn user_scram_credential(
        &self,
        user: &str,
        mechanism: ScramMechanism,
    ) -> Result<Option<ScramCredential>>;

    /// Describe the configuration of the resource `name` of kind `resource`,
    /// optionally restricted to `keys`.
    async fn describe_config(
        &self,
        name: &str,
        resource: ConfigResource,
        keys: Option<&[String]>,
    ) -> Result<DescribeConfigsResult>;

    /// List groups, optionally filtered by `states_filter`.
    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>>;

    /// Delete the groups named in `group_ids`, returning a result per group.
    async fn delete_groups(
        &self,
        group_ids: Option<&[String]>,
    ) -> Result<Vec<DeletableGroupResult>>;

    /// Describe the groups named in `group_ids`.
    async fn describe_groups(
        &self,
        group_ids: Option<&[String]>,
        include_authorized_operations: bool,
    ) -> Result<Vec<NamedGroupDetail>>;

    /// Describe the partitions of `topics`, limited by `partition_limit`;
    /// `cursor` presumably resumes a previous listing — confirm.
    async fn describe_topic_partitions(
        &self,
        topics: Option<&[TopicId]>,
        partition_limit: i32,
        cursor: Option<Topition>,
    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>>;

    /// Store `detail` for `group_id`, returning a [`Version`] on success.
    ///
    /// Failures are reported as [`UpdateError`], which has an `Outdated`
    /// variant carrying the current detail and version; `version` is
    /// presumably the version the caller last observed — confirm.
    async fn update_group(
        &self,
        group_id: &str,
        detail: GroupDetail,
        version: Option<Version>,
    ) -> Result<Version, UpdateError<GroupDetail>>;

    /// Initialise a producer, optionally transactional, returning its
    /// [`ProducerIdResponse`].
    async fn init_producer(
        &self,
        transaction_id: Option<&str>,
        transaction_timeout_ms: i32,
        producer_id: Option<i64>,
        producer_epoch: Option<i16>,
    ) -> Result<ProducerIdResponse>;

    /// Add the offsets of `group_id` to the transaction `transaction_id` of
    /// the given producer; the outcome is reported as an [`ErrorCode`].
    async fn txn_add_offsets(
        &self,
        transaction_id: &str,
        producer_id: i64,
        producer_epoch: i16,
        group_id: &str,
    ) -> Result<ErrorCode>;

    /// Add partitions to a transaction as described by `partitions`.
    async fn txn_add_partitions(
        &self,
        partitions: TxnAddPartitionsRequest,
    ) -> Result<TxnAddPartitionsResponse>;

    /// Commit offsets as part of a transaction, returning per-topic results.
    async fn txn_offset_commit(
        &self,
        offsets: TxnOffsetCommitRequest,
    ) -> Result<Vec<TxnOffsetCommitResponseTopic>>;

    /// End the transaction `transaction_id` of the given producer;
    /// `committed` presumably selects commit (`true`) or abort (`false`).
    async fn txn_end(
        &self,
        transaction_id: &str,
        producer_id: i64,
        producer_epoch: i16,
        committed: bool,
    ) -> Result<ErrorCode>;

    /// Maintenance hook taking the current time. The default implementation
    /// ignores `_now` and returns `Ok(())`.
    async fn maintain(&self, _now: SystemTime) -> Result<()> {
        Ok(())
    }

    /// Return the cluster id of this storage.
    async fn cluster_id(&self) -> Result<String>;

    /// Return the node id of this storage.
    async fn node(&self) -> Result<i32>;

    /// Return the advertised listener URL of this storage.
    async fn advertised_listener(&self) -> Result<Url>;

    /// Check the storage; `Builder::build` awaits this immediately after
    /// constructing an engine and fails if it returns an error.
    async fn ping(&self) -> Result<()>;
}
1530
// Compile-time check that `Storage` can be used as a trait object: this
// signature only compiles while `&dyn Storage` is a valid type. The empty body
// and underscore-prefixed name suggest it is not meant to be called.
fn _assert_trait_object(_s: &dyn Storage) {}
1534
1535#[async_trait]
1536impl<T> Storage for Arc<T>
1537where
1538 T: Storage + ?Sized,
1539{
1540 async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
1541 self.as_ref().register_broker(broker_registration).await
1542 }
1543
1544 async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
1545 self.as_ref().create_topic(topic, validate_only).await
1546 }
1547
1548 async fn incremental_alter_resource(
1549 &self,
1550 resource: AlterConfigsResource,
1551 ) -> Result<AlterConfigsResourceResponse> {
1552 self.as_ref().incremental_alter_resource(resource).await
1553 }
1554
1555 async fn delete_records(
1556 &self,
1557 topics: &[DeleteRecordsTopic],
1558 ) -> Result<Vec<DeleteRecordsTopicResult>> {
1559 self.as_ref().delete_records(topics).await
1560 }
1561
1562 async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
1563 self.as_ref().delete_topic(topic).await
1564 }
1565
1566 async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
1567 self.as_ref().brokers().await
1568 }
1569
1570 async fn produce(
1571 &self,
1572 transaction_id: Option<&str>,
1573 topition: &Topition,
1574 batch: deflated::Batch,
1575 ) -> Result<i64> {
1576 self.as_ref().produce(transaction_id, topition, batch).await
1577 }
1578
1579 async fn fetch(
1580 &self,
1581 topition: &'_ Topition,
1582 offset: i64,
1583 min_bytes: u32,
1584 max_bytes: u32,
1585 isolation: IsolationLevel,
1586 max_wait: Duration,
1587 ) -> Result<Vec<deflated::Batch>> {
1588 self.as_ref()
1589 .fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
1590 .await
1591 }
1592
1593 async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
1594 self.as_ref().offset_stage(topition).await
1595 }
1596
1597 async fn list_offsets(
1598 &self,
1599 isolation_level: IsolationLevel,
1600 offsets: &[(Topition, ListOffset)],
1601 ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
1602 self.as_ref().list_offsets(isolation_level, offsets).await
1603 }
1604
1605 async fn offset_commit(
1606 &self,
1607 group_id: &str,
1608 retention_time_ms: Option<Duration>,
1609 offsets: &[(Topition, OffsetCommitRequest)],
1610 ) -> Result<Vec<(Topition, ErrorCode)>> {
1611 self.as_ref()
1612 .offset_commit(group_id, retention_time_ms, offsets)
1613 .await
1614 }
1615
1616 async fn offset_fetch(
1617 &self,
1618 group_id: Option<&str>,
1619 topics: &[Topition],
1620 require_stable: Option<bool>,
1621 ) -> Result<BTreeMap<Topition, i64>> {
1622 self.as_ref()
1623 .offset_fetch(group_id, topics, require_stable)
1624 .await
1625 }
1626
1627 async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
1628 self.as_ref().committed_offset_topitions(group_id).await
1629 }
1630
1631 async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
1632 self.as_ref().metadata(topics).await
1633 }
1634
1635 async fn upsert_user_scram_credential(
1636 &self,
1637 user: &str,
1638 mechanism: ScramMechanism,
1639 credential: ScramCredential,
1640 ) -> Result<()> {
1641 self.as_ref()
1642 .upsert_user_scram_credential(user, mechanism, credential)
1643 .await
1644 }
1645
1646 async fn delete_user_scram_credential(
1647 &self,
1648 user: &str,
1649 mechanism: ScramMechanism,
1650 ) -> Result<()> {
1651 self.as_ref()
1652 .delete_user_scram_credential(user, mechanism)
1653 .await
1654 }
1655
1656 async fn user_scram_credential(
1657 &self,
1658 user: &str,
1659 mechanism: ScramMechanism,
1660 ) -> Result<Option<ScramCredential>> {
1661 self.as_ref().user_scram_credential(user, mechanism).await
1662 }
1663
1664 async fn describe_config(
1665 &self,
1666 name: &str,
1667 resource: ConfigResource,
1668 keys: Option<&[String]>,
1669 ) -> Result<DescribeConfigsResult> {
1670 self.as_ref().describe_config(name, resource, keys).await
1671 }
1672
1673 async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
1674 self.as_ref().list_groups(states_filter).await
1675 }
1676
1677 async fn delete_groups(
1678 &self,
1679 group_ids: Option<&[String]>,
1680 ) -> Result<Vec<DeletableGroupResult>> {
1681 self.as_ref().delete_groups(group_ids).await
1682 }
1683
1684 async fn describe_groups(
1685 &self,
1686 group_ids: Option<&[String]>,
1687 include_authorized_operations: bool,
1688 ) -> Result<Vec<NamedGroupDetail>> {
1689 self.as_ref()
1690 .describe_groups(group_ids, include_authorized_operations)
1691 .await
1692 }
1693
1694 async fn describe_topic_partitions(
1695 &self,
1696 topics: Option<&[TopicId]>,
1697 partition_limit: i32,
1698 cursor: Option<Topition>,
1699 ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
1700 self.as_ref()
1701 .describe_topic_partitions(topics, partition_limit, cursor)
1702 .await
1703 }
1704
1705 async fn update_group(
1706 &self,
1707 group_id: &str,
1708 detail: GroupDetail,
1709 version: Option<Version>,
1710 ) -> Result<Version, UpdateError<GroupDetail>> {
1711 self.as_ref().update_group(group_id, detail, version).await
1712 }
1713
1714 async fn init_producer(
1715 &self,
1716 transaction_id: Option<&str>,
1717 transaction_timeout_ms: i32,
1718 producer_id: Option<i64>,
1719 producer_epoch: Option<i16>,
1720 ) -> Result<ProducerIdResponse> {
1721 self.as_ref()
1722 .init_producer(
1723 transaction_id,
1724 transaction_timeout_ms,
1725 producer_id,
1726 producer_epoch,
1727 )
1728 .await
1729 }
1730
1731 async fn txn_add_offsets(
1732 &self,
1733 transaction_id: &str,
1734 producer_id: i64,
1735 producer_epoch: i16,
1736 group_id: &str,
1737 ) -> Result<ErrorCode> {
1738 self.as_ref()
1739 .txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
1740 .await
1741 }
1742
1743 async fn txn_add_partitions(
1744 &self,
1745 partitions: TxnAddPartitionsRequest,
1746 ) -> Result<TxnAddPartitionsResponse> {
1747 self.as_ref().txn_add_partitions(partitions).await
1748 }
1749
1750 async fn txn_offset_commit(
1751 &self,
1752 offsets: TxnOffsetCommitRequest,
1753 ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
1754 self.as_ref().txn_offset_commit(offsets).await
1755 }
1756
1757 async fn txn_end(
1758 &self,
1759 transaction_id: &str,
1760 producer_id: i64,
1761 producer_epoch: i16,
1762 committed: bool,
1763 ) -> Result<ErrorCode> {
1764 self.as_ref()
1765 .txn_end(transaction_id, producer_id, producer_epoch, committed)
1766 .await
1767 }
1768
1769 async fn maintain(&self, now: SystemTime) -> Result<()> {
1770 self.as_ref().maintain(now).await
1771 }
1772
1773 async fn cluster_id(&self) -> Result<String> {
1774 self.as_ref().cluster_id().await
1775 }
1776
1777 async fn node(&self) -> Result<i32> {
1778 self.as_ref().node().await
1779 }
1780
1781 async fn advertised_listener(&self) -> Result<Url> {
1782 self.as_ref().advertised_listener().await
1783 }
1784
1785 async fn ping(&self) -> Result<()> {
1786 self.as_ref().ping().await
1787 }
1788}
1789
1790#[async_trait]
1791impl<T> Storage for Box<T>
1792where
1793 T: Storage + ?Sized,
1794{
1795 async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
1796 self.as_ref().register_broker(broker_registration).await
1797 }
1798
1799 async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
1800 self.as_ref().create_topic(topic, validate_only).await
1801 }
1802
1803 async fn incremental_alter_resource(
1804 &self,
1805 resource: AlterConfigsResource,
1806 ) -> Result<AlterConfigsResourceResponse> {
1807 self.as_ref().incremental_alter_resource(resource).await
1808 }
1809
1810 async fn delete_records(
1811 &self,
1812 topics: &[DeleteRecordsTopic],
1813 ) -> Result<Vec<DeleteRecordsTopicResult>> {
1814 self.as_ref().delete_records(topics).await
1815 }
1816
1817 async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
1818 self.as_ref().delete_topic(topic).await
1819 }
1820
1821 async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
1822 self.as_ref().brokers().await
1823 }
1824
1825 async fn produce(
1826 &self,
1827 transaction_id: Option<&str>,
1828 topition: &Topition,
1829 batch: deflated::Batch,
1830 ) -> Result<i64> {
1831 self.as_ref().produce(transaction_id, topition, batch).await
1832 }
1833
1834 async fn fetch(
1835 &self,
1836 topition: &'_ Topition,
1837 offset: i64,
1838 min_bytes: u32,
1839 max_bytes: u32,
1840 isolation: IsolationLevel,
1841 max_wait: Duration,
1842 ) -> Result<Vec<deflated::Batch>> {
1843 self.as_ref()
1844 .fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
1845 .await
1846 }
1847
1848 async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
1849 self.as_ref().offset_stage(topition).await
1850 }
1851
1852 async fn list_offsets(
1853 &self,
1854 isolation_level: IsolationLevel,
1855 offsets: &[(Topition, ListOffset)],
1856 ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
1857 self.as_ref().list_offsets(isolation_level, offsets).await
1858 }
1859
1860 async fn offset_commit(
1861 &self,
1862 group_id: &str,
1863 retention_time_ms: Option<Duration>,
1864 offsets: &[(Topition, OffsetCommitRequest)],
1865 ) -> Result<Vec<(Topition, ErrorCode)>> {
1866 self.as_ref()
1867 .offset_commit(group_id, retention_time_ms, offsets)
1868 .await
1869 }
1870
1871 async fn offset_fetch(
1872 &self,
1873 group_id: Option<&str>,
1874 topics: &[Topition],
1875 require_stable: Option<bool>,
1876 ) -> Result<BTreeMap<Topition, i64>> {
1877 self.as_ref()
1878 .offset_fetch(group_id, topics, require_stable)
1879 .await
1880 }
1881
1882 async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
1883 self.as_ref().committed_offset_topitions(group_id).await
1884 }
1885
1886 async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
1887 self.as_ref().metadata(topics).await
1888 }
1889
1890 async fn upsert_user_scram_credential(
1891 &self,
1892 user: &str,
1893 mechanism: ScramMechanism,
1894 credential: ScramCredential,
1895 ) -> Result<()> {
1896 self.as_ref()
1897 .upsert_user_scram_credential(user, mechanism, credential)
1898 .await
1899 }
1900
1901 async fn delete_user_scram_credential(
1902 &self,
1903 user: &str,
1904 mechanism: ScramMechanism,
1905 ) -> Result<()> {
1906 self.as_ref()
1907 .delete_user_scram_credential(user, mechanism)
1908 .await
1909 }
1910
1911 async fn user_scram_credential(
1912 &self,
1913 user: &str,
1914 mechanism: ScramMechanism,
1915 ) -> Result<Option<ScramCredential>> {
1916 self.as_ref().user_scram_credential(user, mechanism).await
1917 }
1918
1919 async fn describe_config(
1920 &self,
1921 name: &str,
1922 resource: ConfigResource,
1923 keys: Option<&[String]>,
1924 ) -> Result<DescribeConfigsResult> {
1925 self.as_ref().describe_config(name, resource, keys).await
1926 }
1927
1928 async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
1929 self.as_ref().list_groups(states_filter).await
1930 }
1931
1932 async fn delete_groups(
1933 &self,
1934 group_ids: Option<&[String]>,
1935 ) -> Result<Vec<DeletableGroupResult>> {
1936 self.as_ref().delete_groups(group_ids).await
1937 }
1938
1939 async fn describe_groups(
1940 &self,
1941 group_ids: Option<&[String]>,
1942 include_authorized_operations: bool,
1943 ) -> Result<Vec<NamedGroupDetail>> {
1944 self.as_ref()
1945 .describe_groups(group_ids, include_authorized_operations)
1946 .await
1947 }
1948
1949 async fn describe_topic_partitions(
1950 &self,
1951 topics: Option<&[TopicId]>,
1952 partition_limit: i32,
1953 cursor: Option<Topition>,
1954 ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
1955 self.as_ref()
1956 .describe_topic_partitions(topics, partition_limit, cursor)
1957 .await
1958 }
1959
1960 async fn update_group(
1961 &self,
1962 group_id: &str,
1963 detail: GroupDetail,
1964 version: Option<Version>,
1965 ) -> Result<Version, UpdateError<GroupDetail>> {
1966 self.as_ref().update_group(group_id, detail, version).await
1967 }
1968
1969 async fn init_producer(
1970 &self,
1971 transaction_id: Option<&str>,
1972 transaction_timeout_ms: i32,
1973 producer_id: Option<i64>,
1974 producer_epoch: Option<i16>,
1975 ) -> Result<ProducerIdResponse> {
1976 self.as_ref()
1977 .init_producer(
1978 transaction_id,
1979 transaction_timeout_ms,
1980 producer_id,
1981 producer_epoch,
1982 )
1983 .await
1984 }
1985
1986 async fn txn_add_offsets(
1987 &self,
1988 transaction_id: &str,
1989 producer_id: i64,
1990 producer_epoch: i16,
1991 group_id: &str,
1992 ) -> Result<ErrorCode> {
1993 self.as_ref()
1994 .txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
1995 .await
1996 }
1997
1998 async fn txn_add_partitions(
1999 &self,
2000 partitions: TxnAddPartitionsRequest,
2001 ) -> Result<TxnAddPartitionsResponse> {
2002 self.as_ref().txn_add_partitions(partitions).await
2003 }
2004
2005 async fn txn_offset_commit(
2006 &self,
2007 offsets: TxnOffsetCommitRequest,
2008 ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
2009 self.as_ref().txn_offset_commit(offsets).await
2010 }
2011
2012 async fn txn_end(
2013 &self,
2014 transaction_id: &str,
2015 producer_id: i64,
2016 producer_epoch: i16,
2017 committed: bool,
2018 ) -> Result<ErrorCode> {
2019 self.as_ref()
2020 .txn_end(transaction_id, producer_id, producer_epoch, committed)
2021 .await
2022 }
2023
2024 async fn maintain(&self, now: SystemTime) -> Result<()> {
2025 self.as_ref().maintain(now).await
2026 }
2027
2028 async fn cluster_id(&self) -> Result<String> {
2029 self.as_ref().cluster_id().await
2030 }
2031
2032 async fn node(&self) -> Result<i32> {
2033 self.as_ref().node().await
2034 }
2035
2036 async fn advertised_listener(&self) -> Result<Url> {
2037 self.as_ref().advertised_listener().await
2038 }
2039
2040 async fn ping(&self) -> Result<()> {
2041 self.as_ref().ping().await
2042 }
2043}
2044
/// Shorthand for the [`Storage`] trait object type.
pub type DynStorage = dyn Storage;
/// A reference-counted, shareable [`DynStorage`].
pub type ArcDynStorage = Arc<DynStorage>;
2047
/// Error returned by versioned updates such as `Storage::update_group`,
/// generic over the type `T` of the value being updated.
#[derive(Clone, Debug, thiserror::Error)]
pub enum UpdateError<T> {
    /// A general storage [`Error`]; also the target of the `From`
    /// conversions for engine-specific error types in this file.
    Error(#[from] Error),

    /// NOTE(review): presumably raised when a stored object lacks the ETag
    /// needed for a conditional update — confirm against the engines.
    MissingEtag,

    /// The update was rejected as outdated; carries the current value and
    /// its version so the caller can inspect them.
    Outdated { current: Box<T>, version: Version },

    /// A JSON (de)serialisation failure, held in an [`Arc`] (this enum
    /// derives `Clone`).
    SerdeJson(Arc<serde_json::Error>),

    /// A UUID error.
    Uuid(#[from] uuid::Error),
}
2061
#[cfg(feature = "libsql")]
impl<T> From<libsql::Error> for UpdateError<T> {
    /// Converts the libsql error into the crate [`Error`] and wraps it in
    /// [`UpdateError::Error`].
    fn from(value: libsql::Error) -> Self {
        Self::Error(value.into())
    }
}
2068
#[cfg(feature = "turso")]
impl<T> From<turso::Error> for UpdateError<T> {
    /// Converts the turso error into the crate [`Error`] and wraps it in
    /// [`UpdateError::Error`].
    fn from(value: turso::Error) -> Self {
        Self::Error(value.into())
    }
}
2075
#[cfg(any(feature = "dynostore", feature = "slatedb"))]
impl<T> From<object_store::Error> for UpdateError<T> {
    /// Converts the object store error into the crate [`Error`] and wraps it
    /// in [`UpdateError::Error`].
    fn from(value: object_store::Error) -> Self {
        Self::Error(value.into())
    }
}
2082
2083impl<T> From<serde_json::Error> for UpdateError<T> {
2084 fn from(value: serde_json::Error) -> Self {
2085 Self::SerdeJson(Arc::new(value))
2086 }
2087}
2088
#[cfg(feature = "postgres")]
impl<T> From<tokio_postgres::error::Error> for UpdateError<T> {
    /// Converts the PostgreSQL client error into the crate [`Error`] and
    /// wraps it in [`UpdateError::Error`].
    fn from(value: tokio_postgres::error::Error) -> Self {
        Self::Error(value.into())
    }
}
2095
/// One of the storage engines compiled into this build.
///
/// `Null` is always available; every other variant exists only when its cargo
/// feature is enabled. The enum implements [`Storage`] by dispatching to the
/// wrapped engine.
#[derive(Clone)]
// With no optional engine compiled in only `Null` remains, which would
// otherwise trigger the `missing_copy_implementations` lint.
#[cfg_attr(
    not(any(
        feature = "dynostore",
        feature = "libsql",
        feature = "postgres",
        feature = "slatedb",
        feature = "turso"
    )),
    allow(missing_copy_implementations)
)]
pub enum StorageContainer {
    /// The null engine (always compiled in).
    Null(null::Engine),

    /// PostgreSQL engine (`postgres` feature).
    #[cfg(feature = "postgres")]
    Postgres(Postgres),

    /// Object store engine (`dynostore` feature).
    #[cfg(feature = "dynostore")]
    DynoStore(DynoStore),

    /// `lite` engine (`libsql` feature).
    #[cfg(feature = "libsql")]
    Lite(lite::Engine),

    /// `slate` engine (`slatedb` feature).
    #[cfg(feature = "slatedb")]
    Slate(slate::Engine),

    /// `limbo` engine (`turso` feature).
    #[cfg(feature = "turso")]
    Turso(limbo::Engine),
}
2126
2127impl Debug for StorageContainer {
2128 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
2129 match self {
2130 Self::Null(_) => f.debug_tuple(stringify!(StorageContainer::Null)).finish(),
2131
2132 #[cfg(feature = "postgres")]
2133 Self::Postgres(_) => f
2134 .debug_tuple(stringify!(StorageContainer::Postgres))
2135 .finish(),
2136
2137 #[cfg(feature = "dynostore")]
2138 Self::DynoStore(_) => f
2139 .debug_tuple(stringify!(StorageContainer::DynoStore))
2140 .finish(),
2141
2142 #[cfg(feature = "libsql")]
2143 Self::Lite(_) => f.debug_tuple(stringify!(StorageContainer::Lite)).finish(),
2144
2145 #[cfg(feature = "slatedb")]
2146 Self::Slate(_) => f.debug_tuple(stringify!(StorageContainer::Slate)).finish(),
2147
2148 #[cfg(feature = "turso")]
2149 Self::Turso(_) => f.debug_tuple(stringify!(StorageContainer::Turso)).finish(),
2150 }
2151 }
2152}
2153
2154impl StorageContainer {
2155 pub fn builder() -> PhantomBuilder {
2156 PhantomBuilder::default()
2157 }
2158}
2159
/// Type-state builder for a storage engine.
///
/// The type parameters record which required settings have been supplied:
/// each starts as a `PhantomData` placeholder (see `PhantomBuilder`) and is
/// replaced by the real type when the matching setter is called. `build` is
/// only available on `Builder<i32, String, Url, Url>`.
#[derive(Clone, Debug, Default)]
pub struct Builder<N, C, A, S> {
    // Node id (`i32` once set).
    node_id: N,
    // Cluster id (`String` once set).
    cluster_id: C,
    // Advertised listener (`Url` once set).
    advertised_listener: A,
    // Storage URL (`Url` once set); its scheme selects the engine in `build`.
    storage: S,
    // Optional schema registry handed to the engine.
    schema_registry: Option<Registry>,
    // Optional lake house handed to the engine.
    lake_house: Option<House>,
    // When true, `build` does not show a progress bar.
    silent: bool,

    // Cancellation token; `build` passes it to the `sqlite` engine builder.
    cancellation: CancellationToken,
}
2173
// A `Builder` in its initial state: every required setting is still a
// `PhantomData` placeholder for the type it will eventually hold.
type PhantomBuilder =
    Builder<PhantomData<i32>, PhantomData<String>, PhantomData<Url>, PhantomData<Url>>;
2176
2177impl<N, C, A, S> Builder<N, C, A, S> {
2178 pub fn node_id(self, node_id: i32) -> Builder<i32, C, A, S> {
2179 Builder {
2180 node_id,
2181 cluster_id: self.cluster_id,
2182 advertised_listener: self.advertised_listener,
2183 storage: self.storage,
2184 schema_registry: self.schema_registry,
2185 lake_house: self.lake_house,
2186 silent: self.silent,
2187 cancellation: self.cancellation,
2188 }
2189 }
2190
2191 pub fn cluster_id(self, cluster_id: impl Into<String>) -> Builder<N, String, A, S> {
2192 Builder {
2193 node_id: self.node_id,
2194 cluster_id: cluster_id.into(),
2195 advertised_listener: self.advertised_listener,
2196 storage: self.storage,
2197 schema_registry: self.schema_registry,
2198 lake_house: self.lake_house,
2199 silent: self.silent,
2200 cancellation: self.cancellation,
2201 }
2202 }
2203
2204 pub fn advertised_listener(self, advertised_listener: impl Into<Url>) -> Builder<N, C, Url, S> {
2205 Builder {
2206 node_id: self.node_id,
2207 cluster_id: self.cluster_id,
2208 advertised_listener: advertised_listener.into(),
2209 storage: self.storage,
2210 schema_registry: self.schema_registry,
2211 lake_house: self.lake_house,
2212 silent: self.silent,
2213 cancellation: self.cancellation,
2214 }
2215 }
2216
2217 pub fn storage(self, storage: Url) -> Builder<N, C, A, Url> {
2218 debug!(%storage);
2219
2220 Builder {
2221 node_id: self.node_id,
2222 cluster_id: self.cluster_id,
2223 advertised_listener: self.advertised_listener,
2224 storage,
2225 schema_registry: self.schema_registry,
2226 lake_house: self.lake_house,
2227 silent: self.silent,
2228 cancellation: self.cancellation,
2229 }
2230 }
2231
2232 pub fn schema_registry(self, schema_registry: Option<Registry>) -> Self {
2233 _ = schema_registry
2234 .as_ref()
2235 .inspect(|schema_registry| debug!(?schema_registry));
2236
2237 Self {
2238 schema_registry,
2239 ..self
2240 }
2241 }
2242
2243 pub fn lake_house(self, lake_house: Option<House>) -> Self {
2244 _ = lake_house
2245 .as_ref()
2246 .inspect(|lake_house| debug!(?lake_house));
2247
2248 Self { lake_house, ..self }
2249 }
2250
2251 pub fn cancellation(self, cancellation: CancellationToken) -> Self {
2252 Self {
2253 cancellation,
2254 ..self
2255 }
2256 }
2257
2258 pub fn silent(self, silent: bool) -> Self {
2259 Self { silent, ..self }
2260 }
2261}
2262
2263impl Builder<i32, String, Url, Url> {
2264 pub async fn build(self) -> Result<Arc<Box<dyn Storage>>> {
2265 let storage = match self.storage.scheme() {
2266 #[cfg(feature = "postgres")]
2267 "postgres" | "postgresql" => Postgres::builder(self.storage.to_string().as_str())
2268 .map(|builder| builder.cluster(self.cluster_id.as_str()))
2269 .map(|builder| builder.node(self.node_id))
2270 .map(|builder| builder.advertised_listener(self.advertised_listener.clone()))
2271 .map(|builder| builder.schemas(self.schema_registry))
2272 .map(|builder| builder.lake(self.lake_house.clone()))
2273 .map(|builder| builder.build())
2274 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2275 .map(Arc::new),
2276
2277 #[cfg(not(feature = "postgres"))]
2278 "postgres" | "postgresql" => Err(Error::FeatureNotEnabled {
2279 feature: "postgres".into(),
2280 message: self.storage.to_string(),
2281 }),
2282
2283 #[cfg(feature = "dynostore")]
2284 "s3" => {
2285 use crate::batch::ProduceRequestBatcher;
2286
2287 let bucket_name = self.storage.host_str().unwrap_or("nisshi");
2288
2289 let minimum_size = self.storage.query_pairs().find_map(|(k, v)| {
2290 if k == "batch_min_size" {
2291 human_units::Size::from_str(v.as_ref())
2292 .map(|size| size.0)
2293 .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2294 .ok()
2295 .and_then(|size| usize::try_from(size).ok())
2296 } else {
2297 None
2298 }
2299 });
2300
2301 let maximum_delay = self.storage.query_pairs().find_map(|(k, v)| {
2302 if k == "batch_max_delay" {
2303 human_units::Duration::from_str(v.as_ref())
2304 .map(|duration| duration.0)
2305 .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2306 .ok()
2307 } else {
2308 None
2309 }
2310 });
2311
2312 debug!(?minimum_size, ?maximum_delay);
2313
2314 AmazonS3Builder::from_env()
2315 .with_bucket_name(bucket_name)
2316 .with_conditional_put(S3ConditionalPut::ETagMatch)
2317 .build()
2318 .map(|object_store| {
2319 DynoStore::new(self.cluster_id.as_str(), self.node_id, object_store)
2320 .advertised_listener(self.advertised_listener.clone())
2321 .schemas(self.schema_registry)
2322 .lake(self.lake_house.clone())
2323 })
2324 .map(|storage| {
2325 ProduceRequestBatcher::new(storage)
2326 .with_minimum_size(minimum_size)
2327 .with_maximum_delay(maximum_delay)
2328 })
2329 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2330 .map(Arc::new)
2331 .map_err(Into::into)
2332 }
2333
2334 #[cfg(feature = "dynostore")]
2335 "gs" => {
2336 use std::num::NonZeroU32;
2337
2338 use object_store::gcp::GoogleCloudStorageBuilder;
2339
2340 use crate::{batch::ProduceRequestBatcher, gcs::limit::PutRateLimiter};
2341
2342 let bucket_name = self.storage.host_str().unwrap_or("nisshi");
2343
2344 let minimum_size = self.storage.query_pairs().find_map(|(k, v)| {
2345 if k == "batch_min_size" {
2346 human_units::Size::from_str(v.as_ref())
2347 .map(|size| size.0)
2348 .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2349 .ok()
2350 .and_then(|size| usize::try_from(size).ok())
2351 } else {
2352 None
2353 }
2354 });
2355
2356 let maximum_delay = self.storage.query_pairs().find_map(|(k, v)| {
2357 if k == "batch_max_delay" {
2358 human_units::Duration::from_str(v.as_ref())
2359 .map(|duration| duration.0)
2360 .inspect_err(|err| warn!(storage = %self.storage, v = v.as_ref(), ?err))
2361 .ok()
2362 } else {
2363 None
2364 }
2365 });
2366
2367 GoogleCloudStorageBuilder::from_env()
2368 .with_bucket_name(bucket_name)
2369 .build()
2370 .map(|object_store| {
2371 PutRateLimiter::new(object_store, Duration::from_mins(5))
2372 .with_rate_per_second(NonZeroU32::new(1))
2373 .with_jitter(Some(Duration::from_millis(50)))
2374 })
2375 .map(|object_store| {
2376 DynoStore::new(self.cluster_id.as_str(), self.node_id, object_store)
2377 .advertised_listener(self.advertised_listener.clone())
2378 .schemas(self.schema_registry)
2379 .lake(self.lake_house.clone())
2380 })
2381 .map(|storage| {
2382 ProduceRequestBatcher::new(storage)
2383 .with_minimum_size(minimum_size)
2384 .with_maximum_delay(maximum_delay)
2385 })
2386 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2387 .map(Arc::new)
2388 .map_err(Into::into)
2389 }
2390
2391 #[cfg(feature = "dynostore")]
2392 "memory" => Ok(
2393 DynoStore::new(self.cluster_id.as_str(), self.node_id, InMemory::new())
2394 .advertised_listener(self.advertised_listener.clone())
2395 .schemas(self.schema_registry)
2396 .lake(self.lake_house.clone()),
2397 )
2398 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2399 .map(Arc::new),
2400
2401 #[cfg(not(feature = "dynostore"))]
2402 "s3" | "memory" => Err(Error::FeatureNotEnabled {
2403 feature: "dynostore".into(),
2404 message: self.storage.to_string(),
2405 }),
2406
2407 #[cfg(feature = "libsql")]
2408 "sqlite" => {
2409 lite::Engine::builder()
2410 .storage(self.storage.clone())
2411 .node(self.node_id)
2412 .cluster(self.cluster_id.clone())
2413 .advertised_listener(self.advertised_listener.clone())
2414 .schemas(self.schema_registry)
2415 .lake(self.lake_house.clone())
2416 .cancellation(self.cancellation.clone())
2417 .build()
2418 .await
2419 }
2420
2421 #[cfg(not(feature = "libsql"))]
2422 "sqlite" => Err(Error::FeatureNotEnabled {
2423 feature: "libsql".into(),
2424 message: self.storage.to_string(),
2425 }),
2426
2427 #[cfg(feature = "slatedb")]
2428 "slatedb" => {
2429 use slatedb::Db;
2430 use slatedb::object_store::{
2431 ObjectStore as SlateObjectStore,
2432 aws::{
2433 AmazonS3Builder as SlateS3Builder,
2434 S3ConditionalPut as SlateS3ConditionalPut,
2435 },
2436 memory::InMemory as SlateInMemory,
2437 };
2438
2439 let host = self.storage.host_str().unwrap_or("nisshi");
2440 let db_path = format!("nisshi-{}.slatedb", self.cluster_id);
2441
2442 let object_store: Arc<dyn SlateObjectStore> = if host == "memory" {
2444 Arc::new(SlateInMemory::new())
2445 } else {
2446 SlateS3Builder::from_env()
2448 .with_bucket_name(host)
2449 .with_conditional_put(SlateS3ConditionalPut::ETagMatch)
2450 .build()
2451 .map(Arc::new)
2452 .map_err(|e| Error::Message(e.to_string()))?
2453 };
2454
2455 Db::open(db_path, object_store)
2456 .await
2457 .map(Arc::new)
2458 .map(|db| {
2459 slate::Engine::builder()
2460 .cluster(self.cluster_id.clone())
2461 .node(self.node_id)
2462 .advertised_listener(self.advertised_listener.clone())
2463 .db(db)
2464 .schemas(self.schema_registry)
2465 .lake(self.lake_house)
2466 .build()
2467 })
2468 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2469 .map(Arc::new)
2470 .map_err(Into::into)
2471 }
2472
2473 #[cfg(not(feature = "slatedb"))]
2474 "slatedb" => Err(Error::FeatureNotEnabled {
2475 feature: "slatedb".into(),
2476 message: self.storage.to_string(),
2477 }),
2478
2479 #[cfg(feature = "turso")]
2480 "turso" => limbo::Engine::builder()
2481 .storage(self.storage.clone())
2482 .node(self.node_id)
2483 .cluster(self.cluster_id.clone())
2484 .advertised_listener(self.advertised_listener.clone())
2485 .schemas(self.schema_registry)
2486 .lake(self.lake_house.clone())
2487 .build()
2488 .await
2489 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2490 .map(Arc::new),
2491
2492 #[cfg(not(feature = "turso"))]
2493 "turso" => Err(Error::FeatureNotEnabled {
2494 feature: "turso".into(),
2495 message: self.storage.to_string(),
2496 }),
2497
2498 "null" => Ok(null::Engine::new(
2499 self.cluster_id.clone(),
2500 self.node_id,
2501 self.advertised_listener.clone(),
2502 ))
2503 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2504 .map(Arc::new),
2505
2506 #[cfg(not(any(
2507 feature = "dynostore",
2508 feature = "libsql",
2509 feature = "postgres",
2510 feature = "slatedb",
2511 feature = "turso"
2512 )))]
2513 _storage => Ok(null::Engine::new(
2514 self.cluster_id.clone(),
2515 self.node_id,
2516 self.advertised_listener.clone(),
2517 ))
2518 .map(|storage| Box::new(storage) as Box<dyn Storage>)
2519 .map(Arc::new),
2520
2521 #[cfg(any(
2522 feature = "dynostore",
2523 feature = "libsql",
2524 feature = "postgres",
2525 feature = "slatedb",
2526 feature = "turso"
2527 ))]
2528 _unsupported => Err(Error::UnsupportedStorageUrl(self.storage.clone())),
2529 }?;
2530
2531 let pb = if self.silent {
2532 None
2533 } else {
2534 let pb = ProgressBar::new(1);
2535 pb.set_style(
2536 ProgressStyle::with_template("[{elapsed}] {bar:40.cyan/blue} {msg}")
2537 .unwrap()
2538 .progress_chars("##-"),
2539 );
2540
2541 pb.set_message("connecting to storage");
2542
2543 Some(pb)
2544 };
2545
2546 storage.ping().await?;
2547
2548 if let Some(pb) = pb {
2549 pb.inc(1);
2550 pb.finish_with_message(format!("{} connected to storage", Emoji("✅", ""),));
2551 }
2552
2553 Ok(storage)
2554 }
2555}
2556
2557pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
2558 global::meter_with_scope(
2559 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
2560 .with_version(env!("CARGO_PKG_VERSION"))
2561 .with_schema_url(SCHEMA_URL)
2562 .build(),
2563 )
2564});
2565
2566static STORAGE_CONTAINER_REQUESTS: LazyLock<Counter<u64>> = LazyLock::new(|| {
2567 METER
2568 .u64_counter("nisshi_storage_container_requests")
2569 .with_description("nisshi storage container requests")
2570 .build()
2571});
2572
2573static STORAGE_CONTAINER_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
2574 METER
2575 .u64_counter("nisshi_storage_container_errors")
2576 .with_description("nisshi storage container errors")
2577 .build()
2578});
2579
2580#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
2581pub struct ScramCredential {
2582 pub salt: Bytes,
2583 pub iterations: i32,
2584 pub stored_key: Bytes,
2585 pub server_key: Bytes,
2586}
2587
2588#[async_trait]
2589impl Storage for StorageContainer {
2590 #[instrument(skip_all)]
2591 async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
2592 let attributes = [KeyValue::new("method", "register_broker")];
2593
2594 match self {
2595 #[cfg(feature = "dynostore")]
2596 Self::DynoStore(engine) => engine.register_broker(broker_registration),
2597
2598 #[cfg(feature = "libsql")]
2599 Self::Lite(engine) => engine.register_broker(broker_registration),
2600
2601 Self::Null(engine) => engine.register_broker(broker_registration),
2602
2603 #[cfg(feature = "postgres")]
2604 Self::Postgres(engine) => engine.register_broker(broker_registration),
2605
2606 #[cfg(feature = "slatedb")]
2607 Self::Slate(engine) => engine.register_broker(broker_registration),
2608
2609 #[cfg(feature = "turso")]
2610 Self::Turso(engine) => engine.register_broker(broker_registration),
2611 }
2612 .await
2613 .inspect(|_| {
2614 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2615 })
2616 .inspect_err(|_| {
2617 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2618 })
2619 }
2620
2621 #[instrument(skip_all)]
2622 async fn incremental_alter_resource(
2623 &self,
2624 resource: AlterConfigsResource,
2625 ) -> Result<AlterConfigsResourceResponse> {
2626 let attributes = [KeyValue::new("method", "incremental_alter_resource")];
2627
2628 match self {
2629 #[cfg(feature = "dynostore")]
2630 Self::DynoStore(engine) => engine.incremental_alter_resource(resource),
2631
2632 #[cfg(feature = "libsql")]
2633 Self::Lite(engine) => engine.incremental_alter_resource(resource),
2634
2635 Self::Null(engine) => engine.incremental_alter_resource(resource),
2636
2637 #[cfg(feature = "postgres")]
2638 Self::Postgres(engine) => engine.incremental_alter_resource(resource),
2639
2640 #[cfg(feature = "slatedb")]
2641 Self::Slate(engine) => engine.incremental_alter_resource(resource),
2642
2643 #[cfg(feature = "turso")]
2644 Self::Turso(engine) => engine.incremental_alter_resource(resource),
2645 }
2646 .await
2647 .inspect(|_| {
2648 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2649 })
2650 .inspect_err(|_| {
2651 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2652 })
2653 }
2654
2655 #[instrument(skip_all)]
2656 async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
2657 let attributes = [KeyValue::new("method", "create_topic")];
2658
2659 match self {
2660 #[cfg(feature = "dynostore")]
2661 Self::DynoStore(engine) => engine.create_topic(topic, validate_only),
2662
2663 #[cfg(feature = "libsql")]
2664 Self::Lite(engine) => engine.create_topic(topic, validate_only),
2665
2666 Self::Null(engine) => engine.create_topic(topic, validate_only),
2667
2668 #[cfg(feature = "postgres")]
2669 Self::Postgres(engine) => engine.create_topic(topic, validate_only),
2670
2671 #[cfg(feature = "turso")]
2672 Self::Turso(engine) => engine.create_topic(topic, validate_only),
2673
2674 #[cfg(feature = "slatedb")]
2675 Self::Slate(engine) => engine.create_topic(topic, validate_only),
2676 }
2677 .await
2678 .inspect(|_| {
2679 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2680 })
2681 .inspect_err(|_| {
2682 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2683 })
2684 }
2685
2686 #[instrument(skip_all)]
2687 async fn delete_records(
2688 &self,
2689 topics: &[DeleteRecordsTopic],
2690 ) -> Result<Vec<DeleteRecordsTopicResult>> {
2691 let attributes = [KeyValue::new("method", "delete_records")];
2692
2693 match self {
2694 #[cfg(feature = "dynostore")]
2695 Self::DynoStore(engine) => engine.delete_records(topics),
2696
2697 #[cfg(feature = "libsql")]
2698 Self::Lite(engine) => engine.delete_records(topics),
2699
2700 Self::Null(engine) => engine.delete_records(topics),
2701
2702 #[cfg(feature = "postgres")]
2703 Self::Postgres(engine) => engine.delete_records(topics),
2704
2705 #[cfg(feature = "slatedb")]
2706 Self::Slate(engine) => engine.delete_records(topics),
2707
2708 #[cfg(feature = "turso")]
2709 Self::Turso(engine) => engine.delete_records(topics),
2710 }
2711 .await
2712 .inspect(|_| {
2713 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2714 })
2715 .inspect_err(|_| {
2716 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2717 })
2718 }
2719
2720 #[instrument(skip_all)]
2721 async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
2722 let attributes = [KeyValue::new("method", "delete_topic")];
2723
2724 match self {
2725 #[cfg(feature = "dynostore")]
2726 Self::DynoStore(engine) => engine.delete_topic(topic),
2727
2728 #[cfg(feature = "libsql")]
2729 Self::Lite(engine) => engine.delete_topic(topic),
2730
2731 Self::Null(engine) => engine.delete_topic(topic),
2732
2733 #[cfg(feature = "postgres")]
2734 Self::Postgres(engine) => engine.delete_topic(topic),
2735
2736 #[cfg(feature = "slatedb")]
2737 Self::Slate(engine) => engine.delete_topic(topic),
2738
2739 #[cfg(feature = "turso")]
2740 Self::Turso(engine) => engine.delete_topic(topic),
2741 }
2742 .await
2743 .inspect(|_| {
2744 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2745 })
2746 .inspect_err(|_| {
2747 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2748 })
2749 }
2750
2751 #[instrument(skip_all)]
2752 async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
2753 let attributes = [KeyValue::new("method", "brokers")];
2754
2755 match self {
2756 #[cfg(feature = "dynostore")]
2757 Self::DynoStore(engine) => engine.brokers(),
2758
2759 #[cfg(feature = "libsql")]
2760 Self::Lite(engine) => engine.brokers(),
2761
2762 Self::Null(engine) => engine.brokers(),
2763
2764 #[cfg(feature = "postgres")]
2765 Self::Postgres(engine) => engine.brokers(),
2766
2767 #[cfg(feature = "slatedb")]
2768 Self::Slate(engine) => engine.brokers(),
2769
2770 #[cfg(feature = "turso")]
2771 Self::Turso(engine) => engine.brokers(),
2772 }
2773 .await
2774 .inspect(|_| {
2775 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2776 })
2777 .inspect_err(|_| {
2778 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2779 })
2780 }
2781
2782 #[instrument(skip_all)]
2783 async fn produce(
2784 &self,
2785 transaction_id: Option<&str>,
2786 topition: &Topition,
2787 batch: deflated::Batch,
2788 ) -> Result<i64> {
2789 let attributes = [KeyValue::new("method", "produce")];
2790
2791 match self {
2792 #[cfg(feature = "dynostore")]
2793 Self::DynoStore(engine) => engine.produce(transaction_id, topition, batch),
2794
2795 #[cfg(feature = "libsql")]
2796 Self::Lite(engine) => engine.produce(transaction_id, topition, batch),
2797
2798 Self::Null(engine) => engine.produce(transaction_id, topition, batch),
2799
2800 #[cfg(feature = "postgres")]
2801 Self::Postgres(engine) => engine.produce(transaction_id, topition, batch),
2802
2803 #[cfg(feature = "slatedb")]
2804 Self::Slate(engine) => engine.produce(transaction_id, topition, batch),
2805
2806 #[cfg(feature = "turso")]
2807 Self::Turso(engine) => engine.produce(transaction_id, topition, batch),
2808 }
2809 .await
2810 .inspect(|_| {
2811 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2812 })
2813 .inspect_err(|_| {
2814 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2815 })
2816 }
2817
2818 #[instrument(skip_all)]
2819 async fn fetch(
2820 &self,
2821 topition: &'_ Topition,
2822 offset: i64,
2823 min_bytes: u32,
2824 max_bytes: u32,
2825 isolation: IsolationLevel,
2826 max_wait: Duration,
2827 ) -> Result<Vec<deflated::Batch>> {
2828 let attributes = [KeyValue::new("method", "fetch")];
2829
2830 match self {
2831 #[cfg(feature = "dynostore")]
2832 Self::DynoStore(engine) => {
2833 engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2834 }
2835
2836 #[cfg(feature = "libsql")]
2837 Self::Lite(engine) => {
2838 engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2839 }
2840
2841 Self::Null(engine) => {
2842 engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2843 }
2844
2845 #[cfg(feature = "postgres")]
2846 Self::Postgres(engine) => {
2847 engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2848 }
2849
2850 #[cfg(feature = "slatedb")]
2851 Self::Slate(engine) => {
2852 engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2853 }
2854
2855 #[cfg(feature = "turso")]
2856 Self::Turso(engine) => {
2857 engine.fetch(topition, offset, min_bytes, max_bytes, isolation, max_wait)
2858 }
2859 }
2860 .await
2861 .inspect(|_| {
2862 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2863 })
2864 .inspect_err(|_| {
2865 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2866 })
2867 }
2868
2869 #[instrument(skip_all)]
2870 async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
2871 let attributes = [KeyValue::new("method", "offset_stage")];
2872
2873 match self {
2874 #[cfg(feature = "dynostore")]
2875 Self::DynoStore(engine) => engine.offset_stage(topition),
2876
2877 #[cfg(feature = "libsql")]
2878 Self::Lite(engine) => engine.offset_stage(topition),
2879
2880 Self::Null(engine) => engine.offset_stage(topition),
2881
2882 #[cfg(feature = "postgres")]
2883 Self::Postgres(engine) => engine.offset_stage(topition),
2884
2885 #[cfg(feature = "slatedb")]
2886 Self::Slate(engine) => engine.offset_stage(topition),
2887
2888 #[cfg(feature = "turso")]
2889 Self::Turso(engine) => engine.offset_stage(topition),
2890 }
2891 .await
2892 .inspect(|_| {
2893 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2894 })
2895 .inspect_err(|_| {
2896 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2897 })
2898 }
2899
2900 #[instrument(skip_all)]
2901 async fn list_offsets(
2902 &self,
2903 isolation_level: IsolationLevel,
2904 offsets: &[(Topition, ListOffset)],
2905 ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
2906 let attributes = [KeyValue::new("method", "list_offsets")];
2907
2908 match self {
2909 #[cfg(feature = "dynostore")]
2910 Self::DynoStore(engine) => engine.list_offsets(isolation_level, offsets),
2911
2912 #[cfg(feature = "libsql")]
2913 Self::Lite(engine) => engine.list_offsets(isolation_level, offsets),
2914
2915 Self::Null(engine) => engine.list_offsets(isolation_level, offsets),
2916
2917 #[cfg(feature = "postgres")]
2918 Self::Postgres(engine) => engine.list_offsets(isolation_level, offsets),
2919
2920 #[cfg(feature = "slatedb")]
2921 Self::Slate(engine) => engine.list_offsets(isolation_level, offsets),
2922
2923 #[cfg(feature = "turso")]
2924 Self::Turso(engine) => engine.list_offsets(isolation_level, offsets),
2925 }
2926 .await
2927 .inspect(|_| {
2928 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2929 })
2930 .inspect_err(|_| {
2931 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2932 })
2933 }
2934
2935 #[instrument(skip_all)]
2936 async fn offset_commit(
2937 &self,
2938 group_id: &str,
2939 retention_time_ms: Option<Duration>,
2940 offsets: &[(Topition, OffsetCommitRequest)],
2941 ) -> Result<Vec<(Topition, ErrorCode)>> {
2942 let attributes = [KeyValue::new("method", "offset_commit")];
2943
2944 match self {
2945 #[cfg(feature = "dynostore")]
2946 Self::DynoStore(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2947
2948 #[cfg(feature = "libsql")]
2949 Self::Lite(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2950
2951 Self::Null(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2952
2953 #[cfg(feature = "postgres")]
2954 Self::Postgres(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2955
2956 #[cfg(feature = "slatedb")]
2957 Self::Slate(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2958
2959 #[cfg(feature = "turso")]
2960 Self::Turso(engine) => engine.offset_commit(group_id, retention_time_ms, offsets),
2961 }
2962 .await
2963 .inspect(|_| {
2964 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2965 })
2966 .inspect_err(|_| {
2967 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2968 })
2969 }
2970
2971 #[instrument(skip_all)]
2972 async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
2973 let attributes = [KeyValue::new("method", "committed_offset_topitions")];
2974
2975 match self {
2976 #[cfg(feature = "dynostore")]
2977 Self::DynoStore(engine) => engine.committed_offset_topitions(group_id),
2978
2979 #[cfg(feature = "libsql")]
2980 Self::Lite(engine) => engine.committed_offset_topitions(group_id),
2981
2982 Self::Null(engine) => engine.committed_offset_topitions(group_id),
2983
2984 #[cfg(feature = "postgres")]
2985 Self::Postgres(engine) => engine.committed_offset_topitions(group_id),
2986
2987 #[cfg(feature = "slatedb")]
2988 Self::Slate(engine) => engine.committed_offset_topitions(group_id),
2989
2990 #[cfg(feature = "turso")]
2991 Self::Turso(engine) => engine.committed_offset_topitions(group_id),
2992 }
2993 .await
2994 .inspect(|_| {
2995 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
2996 })
2997 .inspect_err(|_| {
2998 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
2999 })
3000 }
3001
3002 #[instrument(skip_all)]
3003 async fn offset_fetch(
3004 &self,
3005 group_id: Option<&str>,
3006 topics: &[Topition],
3007 require_stable: Option<bool>,
3008 ) -> Result<BTreeMap<Topition, i64>> {
3009 let attributes = [KeyValue::new("method", "offset_fetch")];
3010
3011 match self {
3012 #[cfg(feature = "dynostore")]
3013 Self::DynoStore(engine) => engine.offset_fetch(group_id, topics, require_stable),
3014
3015 #[cfg(feature = "libsql")]
3016 Self::Lite(engine) => engine.offset_fetch(group_id, topics, require_stable),
3017
3018 Self::Null(engine) => engine.offset_fetch(group_id, topics, require_stable),
3019
3020 #[cfg(feature = "postgres")]
3021 Self::Postgres(engine) => engine.offset_fetch(group_id, topics, require_stable),
3022
3023 #[cfg(feature = "slatedb")]
3024 Self::Slate(engine) => engine.offset_fetch(group_id, topics, require_stable),
3025
3026 #[cfg(feature = "turso")]
3027 Self::Turso(engine) => engine.offset_fetch(group_id, topics, require_stable),
3028 }
3029 .await
3030 .inspect(|_| {
3031 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3032 })
3033 .inspect_err(|_| {
3034 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3035 })
3036 }
3037
3038 #[instrument(skip_all)]
3039 async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
3040 let attributes = [KeyValue::new("method", "metadata")];
3041
3042 match self {
3043 #[cfg(feature = "dynostore")]
3044 Self::DynoStore(engine) => engine.metadata(topics),
3045
3046 #[cfg(feature = "libsql")]
3047 Self::Lite(engine) => engine.metadata(topics),
3048
3049 Self::Null(engine) => engine.metadata(topics),
3050
3051 #[cfg(feature = "postgres")]
3052 Self::Postgres(engine) => engine.metadata(topics),
3053
3054 #[cfg(feature = "slatedb")]
3055 Self::Slate(engine) => engine.metadata(topics),
3056
3057 #[cfg(feature = "turso")]
3058 Self::Turso(engine) => engine.metadata(topics),
3059 }
3060 .await
3061 .inspect(|_| {
3062 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3063 })
3064 .inspect_err(|_| {
3065 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3066 })
3067 }
3068
3069 #[instrument(skip_all)]
3070 async fn describe_config(
3071 &self,
3072 name: &str,
3073 resource: ConfigResource,
3074 keys: Option<&[String]>,
3075 ) -> Result<DescribeConfigsResult> {
3076 let attributes = [KeyValue::new("method", "describe_config")];
3077
3078 match self {
3079 #[cfg(feature = "dynostore")]
3080 Self::DynoStore(engine) => engine.describe_config(name, resource, keys),
3081
3082 #[cfg(feature = "libsql")]
3083 Self::Lite(engine) => engine.describe_config(name, resource, keys),
3084
3085 Self::Null(engine) => engine.describe_config(name, resource, keys),
3086
3087 #[cfg(feature = "postgres")]
3088 Self::Postgres(engine) => engine.describe_config(name, resource, keys),
3089
3090 #[cfg(feature = "slatedb")]
3091 Self::Slate(engine) => engine.describe_config(name, resource, keys),
3092
3093 #[cfg(feature = "turso")]
3094 Self::Turso(engine) => engine.describe_config(name, resource, keys),
3095 }
3096 .await
3097 .inspect(|_| {
3098 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3099 })
3100 .inspect_err(|_| {
3101 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3102 })
3103 }
3104
3105 #[instrument(skip_all)]
3106 async fn describe_topic_partitions(
3107 &self,
3108 topics: Option<&[TopicId]>,
3109 partition_limit: i32,
3110 cursor: Option<Topition>,
3111 ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
3112 let attributes = [KeyValue::new("method", "describe_topic_partitions")];
3113
3114 match self {
3115 #[cfg(feature = "dynostore")]
3116 Self::DynoStore(engine) => {
3117 engine.describe_topic_partitions(topics, partition_limit, cursor)
3118 }
3119
3120 #[cfg(feature = "libsql")]
3121 Self::Lite(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
3122
3123 Self::Null(engine) => engine.describe_topic_partitions(topics, partition_limit, cursor),
3124
3125 #[cfg(feature = "postgres")]
3126 Self::Postgres(engine) => {
3127 engine.describe_topic_partitions(topics, partition_limit, cursor)
3128 }
3129
3130 #[cfg(feature = "slatedb")]
3131 Self::Slate(engine) => {
3132 engine.describe_topic_partitions(topics, partition_limit, cursor)
3133 }
3134
3135 #[cfg(feature = "turso")]
3136 Self::Turso(engine) => {
3137 engine.describe_topic_partitions(topics, partition_limit, cursor)
3138 }
3139 }
3140 .await
3141 .inspect(|_| {
3142 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3143 })
3144 .inspect_err(|_| {
3145 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3146 })
3147 }
3148
3149 #[instrument(skip_all)]
3150 async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
3151 let attributes = [KeyValue::new("method", "list_groups")];
3152
3153 match self {
3154 #[cfg(feature = "dynostore")]
3155 Self::DynoStore(engine) => engine.list_groups(states_filter),
3156
3157 #[cfg(feature = "libsql")]
3158 Self::Lite(engine) => engine.list_groups(states_filter),
3159
3160 Self::Null(engine) => engine.list_groups(states_filter),
3161
3162 #[cfg(feature = "postgres")]
3163 Self::Postgres(engine) => engine.list_groups(states_filter),
3164
3165 #[cfg(feature = "slatedb")]
3166 Self::Slate(engine) => engine.list_groups(states_filter),
3167
3168 #[cfg(feature = "turso")]
3169 Self::Turso(engine) => engine.list_groups(states_filter),
3170 }
3171 .await
3172 .inspect(|_| {
3173 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3174 })
3175 .inspect_err(|_| {
3176 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3177 })
3178 }
3179
3180 #[instrument(skip_all)]
3181 async fn delete_groups(
3182 &self,
3183 group_ids: Option<&[String]>,
3184 ) -> Result<Vec<DeletableGroupResult>> {
3185 let attributes = [KeyValue::new("method", "delete_groups")];
3186
3187 match self {
3188 #[cfg(feature = "dynostore")]
3189 Self::DynoStore(engine) => engine.delete_groups(group_ids),
3190
3191 #[cfg(feature = "libsql")]
3192 Self::Lite(engine) => engine.delete_groups(group_ids),
3193
3194 Self::Null(engine) => engine.delete_groups(group_ids),
3195
3196 #[cfg(feature = "postgres")]
3197 Self::Postgres(engine) => engine.delete_groups(group_ids),
3198
3199 #[cfg(feature = "slatedb")]
3200 Self::Slate(engine) => engine.delete_groups(group_ids),
3201
3202 #[cfg(feature = "turso")]
3203 Self::Turso(engine) => engine.delete_groups(group_ids),
3204 }
3205 .await
3206 .inspect(|_| {
3207 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3208 })
3209 .inspect_err(|_| {
3210 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3211 })
3212 }
3213
3214 #[instrument(skip_all)]
3215 async fn describe_groups(
3216 &self,
3217 group_ids: Option<&[String]>,
3218 include_authorized_operations: bool,
3219 ) -> Result<Vec<NamedGroupDetail>> {
3220 let attributes = [KeyValue::new("method", "describe_groups")];
3221
3222 match self {
3223 #[cfg(feature = "dynostore")]
3224 Self::DynoStore(engine) => {
3225 engine.describe_groups(group_ids, include_authorized_operations)
3226 }
3227
3228 #[cfg(feature = "libsql")]
3229 Self::Lite(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3230
3231 Self::Null(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3232
3233 #[cfg(feature = "postgres")]
3234 Self::Postgres(engine) => {
3235 engine.describe_groups(group_ids, include_authorized_operations)
3236 }
3237
3238 #[cfg(feature = "slatedb")]
3239 Self::Slate(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3240
3241 #[cfg(feature = "turso")]
3242 Self::Turso(engine) => engine.describe_groups(group_ids, include_authorized_operations),
3243 }
3244 .await
3245 .inspect(|_| {
3246 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3247 })
3248 .inspect_err(|_| {
3249 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3250 })
3251 }
3252
3253 #[instrument(skip_all)]
3254 async fn update_group(
3255 &self,
3256 group_id: &str,
3257 detail: GroupDetail,
3258 version: Option<Version>,
3259 ) -> Result<Version, UpdateError<GroupDetail>> {
3260 let attributes = [KeyValue::new("method", "update_group")];
3261
3262 match self {
3263 #[cfg(feature = "dynostore")]
3264 Self::DynoStore(engine) => engine.update_group(group_id, detail, version),
3265
3266 #[cfg(feature = "libsql")]
3267 Self::Lite(engine) => engine.update_group(group_id, detail, version),
3268
3269 Self::Null(engine) => engine.update_group(group_id, detail, version),
3270
3271 #[cfg(feature = "postgres")]
3272 Self::Postgres(engine) => engine.update_group(group_id, detail, version),
3273
3274 #[cfg(feature = "slatedb")]
3275 Self::Slate(engine) => engine.update_group(group_id, detail, version),
3276
3277 #[cfg(feature = "turso")]
3278 Self::Turso(engine) => engine.update_group(group_id, detail, version),
3279 }
3280 .await
3281 .inspect(|_| {
3282 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3283 })
3284 .inspect_err(|_| {
3285 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3286 })
3287 }
3288
3289 #[instrument(skip_all)]
3290 async fn init_producer(
3291 &self,
3292 transaction_id: Option<&str>,
3293 transaction_timeout_ms: i32,
3294 producer_id: Option<i64>,
3295 producer_epoch: Option<i16>,
3296 ) -> Result<ProducerIdResponse> {
3297 let attributes = [KeyValue::new("method", "init_producer")];
3298
3299 match self {
3300 #[cfg(feature = "dynostore")]
3301 Self::DynoStore(engine) => engine.init_producer(
3302 transaction_id,
3303 transaction_timeout_ms,
3304 producer_id,
3305 producer_epoch,
3306 ),
3307
3308 #[cfg(feature = "libsql")]
3309 Self::Lite(engine) => engine.init_producer(
3310 transaction_id,
3311 transaction_timeout_ms,
3312 producer_id,
3313 producer_epoch,
3314 ),
3315
3316 Self::Null(engine) => engine.init_producer(
3317 transaction_id,
3318 transaction_timeout_ms,
3319 producer_id,
3320 producer_epoch,
3321 ),
3322
3323 #[cfg(feature = "postgres")]
3324 Self::Postgres(engine) => engine.init_producer(
3325 transaction_id,
3326 transaction_timeout_ms,
3327 producer_id,
3328 producer_epoch,
3329 ),
3330
3331 #[cfg(feature = "slatedb")]
3332 Self::Slate(engine) => engine.init_producer(
3333 transaction_id,
3334 transaction_timeout_ms,
3335 producer_id,
3336 producer_epoch,
3337 ),
3338
3339 #[cfg(feature = "turso")]
3340 Self::Turso(engine) => engine.init_producer(
3341 transaction_id,
3342 transaction_timeout_ms,
3343 producer_id,
3344 producer_epoch,
3345 ),
3346 }
3347 .await
3348 .inspect(|_| {
3349 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3350 })
3351 .inspect_err(|_| {
3352 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3353 })
3354 }
3355
3356 #[instrument(skip_all)]
3357 async fn txn_add_offsets(
3358 &self,
3359 transaction_id: &str,
3360 producer_id: i64,
3361 producer_epoch: i16,
3362 group_id: &str,
3363 ) -> Result<ErrorCode> {
3364 let attributes = [KeyValue::new("method", "txn_add_offsets")];
3365
3366 match self {
3367 #[cfg(feature = "dynostore")]
3368 Self::DynoStore(engine) => {
3369 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3370 }
3371
3372 #[cfg(feature = "libsql")]
3373 Self::Lite(engine) => {
3374 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3375 }
3376
3377 Self::Null(engine) => {
3378 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3379 }
3380
3381 #[cfg(feature = "postgres")]
3382 Self::Postgres(engine) => {
3383 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3384 }
3385
3386 #[cfg(feature = "slatedb")]
3387 Self::Slate(engine) => {
3388 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3389 }
3390
3391 #[cfg(feature = "turso")]
3392 Self::Turso(engine) => {
3393 engine.txn_add_offsets(transaction_id, producer_id, producer_epoch, group_id)
3394 }
3395 }
3396 .await
3397 .inspect(|_| {
3398 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3399 })
3400 .inspect_err(|_| {
3401 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3402 })
3403 }
3404
3405 #[instrument(skip_all)]
3406 async fn txn_add_partitions(
3407 &self,
3408 partitions: TxnAddPartitionsRequest,
3409 ) -> Result<TxnAddPartitionsResponse> {
3410 let attributes = [KeyValue::new("method", "txn_add_partitions")];
3411
3412 match self {
3413 #[cfg(feature = "dynostore")]
3414 Self::DynoStore(engine) => engine.txn_add_partitions(partitions),
3415
3416 #[cfg(feature = "libsql")]
3417 Self::Lite(engine) => engine.txn_add_partitions(partitions),
3418
3419 Self::Null(engine) => engine.txn_add_partitions(partitions),
3420
3421 #[cfg(feature = "postgres")]
3422 Self::Postgres(engine) => engine.txn_add_partitions(partitions),
3423
3424 #[cfg(feature = "slatedb")]
3425 Self::Slate(engine) => engine.txn_add_partitions(partitions),
3426
3427 #[cfg(feature = "turso")]
3428 Self::Turso(engine) => engine.txn_add_partitions(partitions),
3429 }
3430 .await
3431 .inspect(|_| {
3432 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3433 })
3434 .inspect_err(|_| {
3435 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3436 })
3437 }
3438
3439 #[instrument(skip_all)]
3440 async fn txn_offset_commit(
3441 &self,
3442 offsets: TxnOffsetCommitRequest,
3443 ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
3444 let attributes = [KeyValue::new("method", "txn_offset_commit")];
3445
3446 match self {
3447 #[cfg(feature = "dynostore")]
3448 Self::DynoStore(engine) => engine.txn_offset_commit(offsets),
3449
3450 #[cfg(feature = "libsql")]
3451 Self::Lite(engine) => engine.txn_offset_commit(offsets),
3452
3453 Self::Null(engine) => engine.txn_offset_commit(offsets),
3454
3455 #[cfg(feature = "postgres")]
3456 Self::Postgres(engine) => engine.txn_offset_commit(offsets),
3457
3458 #[cfg(feature = "slatedb")]
3459 Self::Slate(engine) => engine.txn_offset_commit(offsets),
3460
3461 #[cfg(feature = "turso")]
3462 Self::Turso(engine) => engine.txn_offset_commit(offsets),
3463 }
3464 .await
3465 .inspect(|_| {
3466 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3467 })
3468 .inspect_err(|_| {
3469 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3470 })
3471 }
3472
3473 #[instrument(skip_all)]
3474 async fn txn_end(
3475 &self,
3476 transaction_id: &str,
3477 producer_id: i64,
3478 producer_epoch: i16,
3479 committed: bool,
3480 ) -> Result<ErrorCode> {
3481 let attributes = [KeyValue::new("method", "txn_end")];
3482
3483 match self {
3484 #[cfg(feature = "dynostore")]
3485 Self::DynoStore(engine) => {
3486 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3487 }
3488
3489 #[cfg(feature = "libsql")]
3490 Self::Lite(engine) => {
3491 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3492 }
3493
3494 Self::Null(engine) => {
3495 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3496 }
3497
3498 #[cfg(feature = "postgres")]
3499 Self::Postgres(engine) => {
3500 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3501 }
3502
3503 #[cfg(feature = "slatedb")]
3504 Self::Slate(engine) => {
3505 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3506 }
3507
3508 #[cfg(feature = "turso")]
3509 Self::Turso(engine) => {
3510 engine.txn_end(transaction_id, producer_id, producer_epoch, committed)
3511 }
3512 }
3513 .await
3514 .inspect(|_| {
3515 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3516 })
3517 .inspect_err(|_| {
3518 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3519 })
3520 }
3521
3522 #[instrument(skip_all)]
3523 async fn maintain(&self, now: SystemTime) -> Result<()> {
3524 let attributes = [KeyValue::new("method", "maintain")];
3525
3526 match self {
3527 #[cfg(feature = "dynostore")]
3528 Self::DynoStore(engine) => engine.maintain(now),
3529
3530 #[cfg(feature = "libsql")]
3531 Self::Lite(engine) => engine.maintain(now),
3532
3533 Self::Null(engine) => engine.maintain(now),
3534
3535 #[cfg(feature = "postgres")]
3536 Self::Postgres(engine) => engine.maintain(now),
3537
3538 #[cfg(feature = "slatedb")]
3539 Self::Slate(engine) => engine.maintain(now),
3540
3541 #[cfg(feature = "turso")]
3542 Self::Turso(engine) => engine.maintain(now),
3543 }
3544 .await
3545 .inspect(|maintain| {
3546 debug!(?maintain);
3547 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3548 })
3549 .inspect_err(|err| {
3550 debug!(?err);
3551 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3552 })
3553 }
3554
3555 #[instrument(skip_all)]
3556 async fn cluster_id(&self) -> Result<String> {
3557 match self {
3558 #[cfg(feature = "dynostore")]
3559 Self::DynoStore(engine) => engine.cluster_id().await,
3560
3561 #[cfg(feature = "libsql")]
3562 Self::Lite(engine) => engine.cluster_id().await,
3563
3564 Self::Null(engine) => engine.cluster_id().await,
3565
3566 #[cfg(feature = "postgres")]
3567 Self::Postgres(engine) => engine.cluster_id().await,
3568
3569 #[cfg(feature = "slatedb")]
3570 Self::Slate(engine) => engine.cluster_id().await,
3571
3572 #[cfg(feature = "turso")]
3573 Self::Turso(engine) => engine.cluster_id().await,
3574 }
3575 }
3576
3577 #[instrument(skip_all)]
3578 async fn node(&self) -> Result<i32> {
3579 match self {
3580 #[cfg(feature = "dynostore")]
3581 Self::DynoStore(engine) => engine.node().await,
3582
3583 #[cfg(feature = "libsql")]
3584 Self::Lite(engine) => engine.node().await,
3585
3586 Self::Null(engine) => engine.node().await,
3587
3588 #[cfg(feature = "postgres")]
3589 Self::Postgres(engine) => engine.node().await,
3590
3591 #[cfg(feature = "slatedb")]
3592 Self::Slate(engine) => engine.node().await,
3593
3594 #[cfg(feature = "turso")]
3595 Self::Turso(engine) => engine.node().await,
3596 }
3597 }
3598
3599 #[instrument(skip_all)]
3600 async fn advertised_listener(&self) -> Result<Url> {
3601 match self {
3602 #[cfg(feature = "dynostore")]
3603 Self::DynoStore(engine) => engine.advertised_listener().await,
3604
3605 #[cfg(feature = "libsql")]
3606 Self::Lite(engine) => engine.advertised_listener().await,
3607
3608 Self::Null(engine) => engine.advertised_listener().await,
3609
3610 #[cfg(feature = "postgres")]
3611 Self::Postgres(engine) => engine.advertised_listener().await,
3612
3613 #[cfg(feature = "slatedb")]
3614 Self::Slate(engine) => engine.advertised_listener().await,
3615
3616 #[cfg(feature = "turso")]
3617 Self::Turso(engine) => engine.advertised_listener().await,
3618 }
3619 }
3620
3621 async fn delete_user_scram_credential(
3622 &self,
3623 user: &str,
3624 mechanism: ScramMechanism,
3625 ) -> Result<()> {
3626 match self {
3627 #[cfg(feature = "dynostore")]
3628 Self::DynoStore(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3629
3630 #[cfg(feature = "libsql")]
3631 Self::Lite(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3632
3633 Self::Null(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3634
3635 #[cfg(feature = "postgres")]
3636 Self::Postgres(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3637
3638 #[cfg(feature = "slatedb")]
3639 Self::Slate(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3640
3641 #[cfg(feature = "turso")]
3642 Self::Turso(engine) => engine.delete_user_scram_credential(user, mechanism).await,
3643 }
3644 }
3645
3646 async fn upsert_user_scram_credential(
3647 &self,
3648 user: &str,
3649 mechanism: ScramMechanism,
3650 credential: ScramCredential,
3651 ) -> Result<()> {
3652 match self {
3653 #[cfg(feature = "dynostore")]
3654 Self::DynoStore(engine) => {
3655 engine
3656 .upsert_user_scram_credential(user, mechanism, credential)
3657 .await
3658 }
3659
3660 #[cfg(feature = "libsql")]
3661 Self::Lite(engine) => {
3662 engine
3663 .upsert_user_scram_credential(user, mechanism, credential)
3664 .await
3665 }
3666
3667 Self::Null(engine) => {
3668 engine
3669 .upsert_user_scram_credential(user, mechanism, credential)
3670 .await
3671 }
3672
3673 #[cfg(feature = "postgres")]
3674 Self::Postgres(engine) => {
3675 engine
3676 .upsert_user_scram_credential(user, mechanism, credential)
3677 .await
3678 }
3679
3680 #[cfg(feature = "slatedb")]
3681 Self::Slate(engine) => {
3682 engine
3683 .upsert_user_scram_credential(user, mechanism, credential)
3684 .await
3685 }
3686
3687 #[cfg(feature = "turso")]
3688 Self::Turso(engine) => {
3689 engine
3690 .upsert_user_scram_credential(user, mechanism, credential)
3691 .await
3692 }
3693 }
3694 }
3695
3696 async fn user_scram_credential(
3697 &self,
3698 user: &str,
3699 mechanism: ScramMechanism,
3700 ) -> Result<Option<ScramCredential>> {
3701 match self {
3702 #[cfg(feature = "dynostore")]
3703 Self::DynoStore(engine) => engine.user_scram_credential(user, mechanism).await,
3704
3705 #[cfg(feature = "libsql")]
3706 Self::Lite(engine) => engine.user_scram_credential(user, mechanism).await,
3707
3708 Self::Null(engine) => engine.user_scram_credential(user, mechanism).await,
3709
3710 #[cfg(feature = "postgres")]
3711 Self::Postgres(engine) => engine.user_scram_credential(user, mechanism).await,
3712
3713 #[cfg(feature = "slatedb")]
3714 Self::Slate(engine) => engine.user_scram_credential(user, mechanism).await,
3715
3716 #[cfg(feature = "turso")]
3717 Self::Turso(engine) => engine.user_scram_credential(user, mechanism).await,
3718 }
3719 }
3720
3721 #[instrument(skip_all)]
3722 async fn ping(&self) -> Result<()> {
3723 let attributes = [KeyValue::new("method", "ping")];
3724
3725 match self {
3726 #[cfg(feature = "dynostore")]
3727 Self::DynoStore(engine) => engine.ping(),
3728
3729 #[cfg(feature = "libsql")]
3730 Self::Lite(engine) => engine.ping(),
3731
3732 Self::Null(engine) => engine.ping(),
3733
3734 #[cfg(feature = "postgres")]
3735 Self::Postgres(engine) => engine.ping(),
3736
3737 #[cfg(feature = "turso")]
3738 Self::Turso(engine) => engine.ping(),
3739
3740 #[cfg(feature = "slatedb")]
3741 Self::Slate(engine) => engine.ping(),
3742 }
3743 .await
3744 .inspect(|_| {
3745 STORAGE_CONTAINER_REQUESTS.add(1, &attributes);
3746 })
3747 .inspect_err(|_| {
3748 STORAGE_CONTAINER_ERRORS.add(1, &attributes);
3749 })
3750 }
3751}
3752
#[cfg(test)]
mod tests {
    use super::*;

    /// Parsing `"qwerty-2147483647"` yields topic `"qwerty"` and a partition
    /// equal to `i32::MAX`.
    #[test]
    fn topition_from_str() -> Result<()> {
        let parsed = Topition::from_str("qwerty-2147483647")?;

        assert_eq!("qwerty", parsed.topic());
        assert_eq!(i32::MAX, parsed.partition());

        Ok(())
    }

    /// A topic name that itself contains dashes is kept whole: only the
    /// trailing `-2147483647` is taken as the partition.
    #[test]
    fn topic_with_dashes_in_name() -> Result<()> {
        let parsed = Topition::from_str("test-topic-0000000-eFC79C8-2147483647")?;

        assert_eq!("test-topic-0000000-eFC79C8", parsed.topic());
        assert_eq!(i32::MAX, parsed.partition());

        Ok(())
    }
}