1use std::{
16 collections::HashMap,
17 env::vars,
18 fmt, io,
19 net::AddrParseError,
20 num::TryFromIntError,
21 result,
22 str::{FromStr, Utf8Error},
23 string::FromUtf8Error,
24 sync::{Arc, LazyLock, PoisonError},
25 time::Duration,
26};
27
28use glob::PatternError;
29use jsonschema::ValidationError;
30use opentelemetry::{InstrumentationScope, global, metrics::Meter};
31use opentelemetry_otlp::ExporterBuildError;
32use opentelemetry_semantic_conventions::SCHEMA_URL;
33use regex::{Regex, Replacer};
34use tansu_sans_io::ErrorCode;
35use thiserror::Error;
36use tokio::{sync::broadcast::error::SendError, task::JoinError};
37use tracing_subscriber::filter::ParseError;
38use url::Url;
39
40pub mod broker;
41pub mod coordinator;
42pub mod otel;
43pub mod service;
44
/// The kind of cancellation being requested.
///
/// Each kind converts into a [`Duration`] through the `From<CancelKind> for Duration` impl in
/// this file: zero for [`CancelKind::Interrupt`], five seconds for [`CancelKind::Terminate`].
// NOTE(review): the names suggest SIGINT / SIGTERM handling — confirm against the callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CancelKind {
    /// Converts into a zero-length duration.
    Interrupt,

    /// Converts into a five second duration.
    Terminate,
}
50
51impl From<CancelKind> for Duration {
52 fn from(cancellation: CancelKind) -> Self {
53 Duration::from_millis(match cancellation {
54 CancelKind::Interrupt => 0,
55 CancelKind::Terminate => 5_000,
56 })
57 }
58}
59
/// Fixed node identifier exported by this crate.
// NOTE(review): presumably the Kafka node/broker id reported by this process — confirm
// against the call sites.
pub const NODE_ID: i32 = 111;
61
62pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
63 global::meter_with_scope(
64 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
65 .with_version(env!("CARGO_PKG_VERSION"))
66 .with_schema_url(SCHEMA_URL)
67 .build(),
68 )
69});
70
71#[derive(Clone, Debug, Error)]
72pub enum Error {
73 AddrParse(#[from] AddrParseError),
74 Api(ErrorCode),
75 Auth(#[from] tansu_auth::Error),
76 Custom(String),
77 DuplicateApiService(i16),
78 EmptyCoordinatorWrapper,
79 EmptyJoinGroupRequestProtocol,
80 ExpectedJoinGroupRequestProtocol(&'static str),
81 ExporterBuild(Arc<ExporterBuildError>),
82
83 Hyper(Arc<hyper::http::Error>),
84 Io(Arc<io::Error>),
85 Join(Arc<JoinError>),
86 Json(Arc<serde_json::Error>),
87 KafkaProtocol(#[from] tansu_sans_io::Error),
88
89 #[cfg(feature = "libsql")]
90 LibSql(Arc<libsql::Error>),
91
92 Message(String),
93 Model(#[from] tansu_model::Error),
94
95 #[cfg(feature = "dynostore")]
96 ObjectStore(Arc<object_store::Error>),
97
98 ParseFilter(Arc<ParseError>),
99 ParseInt(#[from] std::num::ParseIntError),
100 Pattern(Arc<PatternError>),
101 Poison,
102
103 #[cfg(feature = "postgres")]
104 Pool(Arc<deadpool_postgres::PoolError>),
105
106 SchemaRegistry(Arc<tansu_schema::Error>),
107 Service(#[from] tansu_service::Error),
108 Storage(#[from] tansu_storage::Error),
109 StringUtf8(#[from] FromUtf8Error),
110 Regex(#[from] regex::Error),
111
112 #[cfg(feature = "postgres")]
113 TokioPostgres(Arc<tokio_postgres::error::Error>),
114 TryFromInt(#[from] TryFromIntError),
115
116 #[cfg(feature = "turso")]
117 Turso(Arc<turso::Error>),
118
119 UnsupportedApiService(i16),
120 UnsupportedStorageUrl(Url),
121 UnsupportedTracingFormat(String),
122 Url(#[from] url::ParseError),
123 Utf8(#[from] Utf8Error),
124 Uuid(#[from] uuid::Error),
125 SchemaValidation,
126 Send(Arc<SendError<CancelKind>>),
127}
128
/// Shares a [`libsql::Error`] behind an [`Arc`], then converts it through the
/// `From<Arc<libsql::Error>>` impl (yielding [`Error::LibSql`]).
#[cfg(feature = "libsql")]
impl From<libsql::Error> for Error {
    fn from(value: libsql::Error) -> Self {
        Arc::new(value).into()
    }
}
135
/// Stores an already shared [`libsql::Error`] as [`Error::LibSql`].
#[cfg(feature = "libsql")]
impl From<Arc<libsql::Error>> for Error {
    fn from(value: Arc<libsql::Error>) -> Self {
        Error::LibSql(value)
    }
}
142
/// Shares a [`turso::Error`] behind an [`Arc`], then converts it through the
/// `From<Arc<turso::Error>>` impl (yielding [`Error::Turso`]).
#[cfg(feature = "turso")]
impl From<turso::Error> for Error {
    fn from(value: turso::Error) -> Self {
        Arc::new(value).into()
    }
}
149
/// Stores an already shared [`turso::Error`] as [`Error::Turso`].
#[cfg(feature = "turso")]
impl From<Arc<turso::Error>> for Error {
    fn from(value: Arc<turso::Error>) -> Self {
        Error::Turso(value)
    }
}
156
157impl From<PatternError> for Error {
158 fn from(value: PatternError) -> Self {
159 Self::Pattern(Arc::new(value))
160 }
161}
162
163impl From<ExporterBuildError> for Error {
164 fn from(value: ExporterBuildError) -> Self {
165 Self::ExporterBuild(Arc::new(value))
166 }
167}
168
169impl From<SendError<CancelKind>> for Error {
170 fn from(value: SendError<CancelKind>) -> Self {
171 Self::Send(Arc::new(value))
172 }
173}
174
/// Shares a [`tokio_postgres::error::Error`] behind an [`Arc`], then converts it through the
/// `From<Arc<tokio_postgres::error::Error>>` impl (yielding [`Error::TokioPostgres`]).
#[cfg(feature = "postgres")]
impl From<tokio_postgres::error::Error> for Error {
    fn from(value: tokio_postgres::error::Error) -> Self {
        Arc::new(value).into()
    }
}
181
/// Stores an already shared [`tokio_postgres::error::Error`] as [`Error::TokioPostgres`].
#[cfg(feature = "postgres")]
impl From<Arc<tokio_postgres::error::Error>> for Error {
    fn from(value: Arc<tokio_postgres::error::Error>) -> Self {
        Error::TokioPostgres(value)
    }
}
188
189impl From<hyper::http::Error> for Error {
190 fn from(value: hyper::http::Error) -> Self {
191 Self::Hyper(Arc::new(value))
192 }
193}
194
195impl From<JoinError> for Error {
196 fn from(value: JoinError) -> Self {
197 Self::Join(Arc::new(value))
198 }
199}
200
201impl From<serde_json::Error> for Error {
202 fn from(value: serde_json::Error) -> Self {
203 Self::from(Arc::new(value))
204 }
205}
206
207impl From<Arc<serde_json::Error>> for Error {
208 fn from(value: Arc<serde_json::Error>) -> Self {
209 Self::Json(value)
210 }
211}
212
/// Shares an [`object_store::Error`] behind an [`Arc`], then converts it through the
/// `From<Arc<object_store::Error>>` impl (yielding [`Error::ObjectStore`]).
#[cfg(feature = "dynostore")]
impl From<object_store::Error> for Error {
    fn from(value: object_store::Error) -> Self {
        Arc::new(value).into()
    }
}
219
/// Stores an already shared [`object_store::Error`] as [`Error::ObjectStore`].
#[cfg(feature = "dynostore")]
impl From<Arc<object_store::Error>> for Error {
    fn from(value: Arc<object_store::Error>) -> Self {
        Error::ObjectStore(value)
    }
}
226
227impl From<ParseError> for Error {
228 fn from(value: ParseError) -> Self {
229 Self::ParseFilter(Arc::new(value))
230 }
231}
232
/// Shares a [`deadpool_postgres::PoolError`] behind an [`Arc`], then converts it through the
/// `From<Arc<deadpool_postgres::PoolError>>` impl (yielding [`Error::Pool`]).
#[cfg(feature = "postgres")]
impl From<deadpool_postgres::PoolError> for Error {
    fn from(value: deadpool_postgres::PoolError) -> Self {
        Arc::new(value).into()
    }
}
239
/// Stores an already shared [`deadpool_postgres::PoolError`] as [`Error::Pool`].
#[cfg(feature = "postgres")]
impl From<Arc<deadpool_postgres::PoolError>> for Error {
    fn from(value: Arc<deadpool_postgres::PoolError>) -> Self {
        Error::Pool(value)
    }
}
246
247impl From<tansu_schema::Error> for Error {
248 fn from(value: tansu_schema::Error) -> Self {
249 Self::SchemaRegistry(Arc::new(value))
250 }
251}
252
253impl From<io::Error> for Error {
254 fn from(value: io::Error) -> Self {
255 Self::Io(Arc::new(value))
256 }
257}
258
259impl<T> From<PoisonError<T>> for Error {
260 fn from(_value: PoisonError<T>) -> Self {
261 Self::Poison
262 }
263}
264
265impl From<ValidationError<'_>> for Error {
266 fn from(_value: ValidationError<'_>) -> Self {
267 Self::SchemaValidation
268 }
269}
270
271impl fmt::Display for Error {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 write!(f, "{self:?}")
274 }
275}
276
/// Result alias whose error type defaults to [`Error`]; a different error type can still be
/// named explicitly as the second parameter.
pub type Result<T, E = Error> = result::Result<T, E>;
278
/// Tracing format selector, parsed from the strings `"text"` and `"json"` by the `FromStr`
/// impl in this file.
#[derive(Clone, Copy, Debug)]
pub enum TracingFormat {
    /// Selected by `"text"`.
    Text,

    /// Selected by `"json"`.
    Json,
}
284
285impl FromStr for TracingFormat {
286 type Err = Error;
287
288 fn from_str(s: &str) -> Result<Self, Self::Err> {
289 match s {
290 "text" => Ok(Self::Text),
291 "json" => Ok(Self::Json),
292 otherwise => Err(Error::UnsupportedTracingFormat(otherwise.to_owned())),
293 }
294 }
295}
296
/// Variable name → value map used to substitute `${NAME}` placeholders (see `VarRep::replace`).
#[derive(Debug, Clone)]
pub struct VarRep(HashMap<String, String>);
299
300impl From<HashMap<String, String>> for VarRep {
301 fn from(value: HashMap<String, String>) -> Self {
302 Self(value)
303 }
304}
305
306impl VarRep {
307 fn replace(&self, haystack: &str) -> Result<String> {
308 Regex::new(r"\$\{(?<var>[^\}]+)\}")
309 .map(|re| re.replace(haystack, self).into_owned())
310 .map_err(Into::into)
311 }
312}
313
314impl Replacer for &VarRep {
315 fn replace_append(&mut self, caps: ®ex::Captures<'_>, dst: &mut String) {
316 if let Some(variable) = caps.name("var")
317 && let Some(value) = self.0.get(variable.as_str())
318 {
319 dst.push_str(value);
320 }
321 }
322}
323
/// Wrapper around a `T`; its `FromStr` impl in this file expands `${NAME}` placeholders from the
/// process environment before parsing the `T`.
#[derive(Debug, Clone)]
pub struct EnvVarExp<T>(T);
326
327impl<T> EnvVarExp<T> {
328 pub fn into_inner(self) -> T {
329 self.0
330 }
331}
332
333impl<T> FromStr for EnvVarExp<T>
334where
335 T: FromStr,
336 Error: From<<T as FromStr>::Err>,
337{
338 type Err = Error;
339
340 fn from_str(s: &str) -> Result<Self, Self::Err> {
341 VarRep::from(vars().collect::<HashMap<_, _>>())
342 .replace(s)
343 .and_then(|s| T::from_str(&s).map_err(Into::into))
344 .map(|t| Self(t))
345 }
346}