Skip to main content

tansu_broker/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    env::vars,
18    fmt, io,
19    net::AddrParseError,
20    num::TryFromIntError,
21    result,
22    str::{FromStr, Utf8Error},
23    string::FromUtf8Error,
24    sync::{Arc, LazyLock, PoisonError},
25    time::Duration,
26};
27
28use glob::PatternError;
29use jsonschema::ValidationError;
30use opentelemetry::{InstrumentationScope, global, metrics::Meter};
31use opentelemetry_otlp::ExporterBuildError;
32use opentelemetry_semantic_conventions::SCHEMA_URL;
33use regex::{Regex, Replacer};
34use tansu_sans_io::ErrorCode;
35use thiserror::Error;
36use tokio::{sync::broadcast::error::SendError, task::JoinError};
37use tracing_subscriber::filter::ParseError;
38use url::Url;
39
40pub mod broker;
41pub mod coordinator;
42pub mod otel;
43pub mod service;
44
/// The kind of cancellation being requested.
///
/// Ordering is derived from declaration order, so `Interrupt` sorts before
/// `Terminate`. Each kind converts into a `Duration` through the `From`
/// impl in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CancelKind {
    /// Converts into a zero-length `Duration`.
    Interrupt,
    /// Converts into a `Duration` of 5 000 milliseconds.
    Terminate,
}
50
51impl From<CancelKind> for Duration {
52    fn from(cancellation: CancelKind) -> Self {
53        Duration::from_millis(match cancellation {
54            CancelKind::Interrupt => 0,
55            CancelKind::Terminate => 5_000,
56        })
57    }
58}
59
/// Node identifier constant exported by this crate.
///
/// NOTE(review): presumably the id the broker reports for itself — confirm
/// against callers.
pub const NODE_ID: i32 = 111;
61
62pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
63    global::meter_with_scope(
64        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
65            .with_version(env!("CARGO_PKG_VERSION"))
66            .with_schema_url(SCHEMA_URL)
67            .build(),
68    )
69});
70
71#[derive(Clone, Debug, Error)]
72pub enum Error {
73    AddrParse(#[from] AddrParseError),
74    Api(ErrorCode),
75    Auth(#[from] tansu_auth::Error),
76    Custom(String),
77    DuplicateApiService(i16),
78    EmptyCoordinatorWrapper,
79    EmptyJoinGroupRequestProtocol,
80    ExpectedJoinGroupRequestProtocol(&'static str),
81    ExporterBuild(Arc<ExporterBuildError>),
82
83    Hyper(Arc<hyper::http::Error>),
84    Io(Arc<io::Error>),
85    Join(Arc<JoinError>),
86    Json(Arc<serde_json::Error>),
87    KafkaProtocol(#[from] tansu_sans_io::Error),
88
89    #[cfg(feature = "libsql")]
90    LibSql(Arc<libsql::Error>),
91
92    Message(String),
93    Model(#[from] tansu_model::Error),
94
95    #[cfg(feature = "dynostore")]
96    ObjectStore(Arc<object_store::Error>),
97
98    ParseFilter(Arc<ParseError>),
99    ParseInt(#[from] std::num::ParseIntError),
100    Pattern(Arc<PatternError>),
101    Poison,
102
103    #[cfg(feature = "postgres")]
104    Pool(Arc<deadpool_postgres::PoolError>),
105
106    SchemaRegistry(Arc<tansu_schema::Error>),
107    Service(#[from] tansu_service::Error),
108    Storage(#[from] tansu_storage::Error),
109    StringUtf8(#[from] FromUtf8Error),
110    Regex(#[from] regex::Error),
111
112    #[cfg(feature = "postgres")]
113    TokioPostgres(Arc<tokio_postgres::error::Error>),
114    TryFromInt(#[from] TryFromIntError),
115
116    #[cfg(feature = "turso")]
117    Turso(Arc<turso::Error>),
118
119    UnsupportedApiService(i16),
120    UnsupportedStorageUrl(Url),
121    UnsupportedTracingFormat(String),
122    Url(#[from] url::ParseError),
123    Utf8(#[from] Utf8Error),
124    Uuid(#[from] uuid::Error),
125    SchemaValidation,
126    Send(Arc<SendError<CancelKind>>),
127}
128
/// Puts the `libsql` error behind an [`Arc`] and defers to the `Arc` conversion.
#[cfg(feature = "libsql")]
impl From<libsql::Error> for Error {
    fn from(err: libsql::Error) -> Self {
        Arc::new(err).into()
    }
}
135
/// Stores an already shared `libsql` error in [`Error::LibSql`].
#[cfg(feature = "libsql")]
impl From<Arc<libsql::Error>> for Error {
    fn from(shared: Arc<libsql::Error>) -> Self {
        Error::LibSql(shared)
    }
}
142
/// Puts the `turso` error behind an [`Arc`] and defers to the `Arc` conversion.
#[cfg(feature = "turso")]
impl From<turso::Error> for Error {
    fn from(err: turso::Error) -> Self {
        Arc::new(err).into()
    }
}
149
/// Stores an already shared `turso` error in [`Error::Turso`].
#[cfg(feature = "turso")]
impl From<Arc<turso::Error>> for Error {
    fn from(shared: Arc<turso::Error>) -> Self {
        Error::Turso(shared)
    }
}
156
157impl From<PatternError> for Error {
158    fn from(value: PatternError) -> Self {
159        Self::Pattern(Arc::new(value))
160    }
161}
162
163impl From<ExporterBuildError> for Error {
164    fn from(value: ExporterBuildError) -> Self {
165        Self::ExporterBuild(Arc::new(value))
166    }
167}
168
169impl From<SendError<CancelKind>> for Error {
170    fn from(value: SendError<CancelKind>) -> Self {
171        Self::Send(Arc::new(value))
172    }
173}
174
/// Puts the `tokio_postgres` error behind an [`Arc`] and defers to the `Arc` conversion.
#[cfg(feature = "postgres")]
impl From<tokio_postgres::error::Error> for Error {
    fn from(err: tokio_postgres::error::Error) -> Self {
        Arc::new(err).into()
    }
}
181
/// Stores an already shared `tokio_postgres` error in [`Error::TokioPostgres`].
#[cfg(feature = "postgres")]
impl From<Arc<tokio_postgres::error::Error>> for Error {
    fn from(shared: Arc<tokio_postgres::error::Error>) -> Self {
        Error::TokioPostgres(shared)
    }
}
188
189impl From<hyper::http::Error> for Error {
190    fn from(value: hyper::http::Error) -> Self {
191        Self::Hyper(Arc::new(value))
192    }
193}
194
195impl From<JoinError> for Error {
196    fn from(value: JoinError) -> Self {
197        Self::Join(Arc::new(value))
198    }
199}
200
201impl From<serde_json::Error> for Error {
202    fn from(value: serde_json::Error) -> Self {
203        Self::from(Arc::new(value))
204    }
205}
206
207impl From<Arc<serde_json::Error>> for Error {
208    fn from(value: Arc<serde_json::Error>) -> Self {
209        Self::Json(value)
210    }
211}
212
/// Puts the `object_store` error behind an [`Arc`] and defers to the `Arc` conversion.
#[cfg(feature = "dynostore")]
impl From<object_store::Error> for Error {
    fn from(err: object_store::Error) -> Self {
        Arc::new(err).into()
    }
}
219
/// Stores an already shared `object_store` error in [`Error::ObjectStore`].
#[cfg(feature = "dynostore")]
impl From<Arc<object_store::Error>> for Error {
    fn from(shared: Arc<object_store::Error>) -> Self {
        Error::ObjectStore(shared)
    }
}
226
227impl From<ParseError> for Error {
228    fn from(value: ParseError) -> Self {
229        Self::ParseFilter(Arc::new(value))
230    }
231}
232
/// Puts the pool error behind an [`Arc`] and defers to the `Arc` conversion.
#[cfg(feature = "postgres")]
impl From<deadpool_postgres::PoolError> for Error {
    fn from(err: deadpool_postgres::PoolError) -> Self {
        Arc::new(err).into()
    }
}
239
/// Stores an already shared pool error in [`Error::Pool`].
#[cfg(feature = "postgres")]
impl From<Arc<deadpool_postgres::PoolError>> for Error {
    fn from(shared: Arc<deadpool_postgres::PoolError>) -> Self {
        Error::Pool(shared)
    }
}
246
247impl From<tansu_schema::Error> for Error {
248    fn from(value: tansu_schema::Error) -> Self {
249        Self::SchemaRegistry(Arc::new(value))
250    }
251}
252
253impl From<io::Error> for Error {
254    fn from(value: io::Error) -> Self {
255        Self::Io(Arc::new(value))
256    }
257}
258
259impl<T> From<PoisonError<T>> for Error {
260    fn from(_value: PoisonError<T>) -> Self {
261        Self::Poison
262    }
263}
264
265impl From<ValidationError<'_>> for Error {
266    fn from(_value: ValidationError<'_>) -> Self {
267        Self::SchemaValidation
268    }
269}
270
271impl fmt::Display for Error {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        write!(f, "{self:?}")
274    }
275}
276
/// Result alias for this crate; the error type defaults to [`Error`] but may
/// be overridden by naming a second type parameter.
pub type Result<T, E = Error> = result::Result<T, E>;
278
/// Tracing output format selector.
///
/// The `FromStr` impl in this file parses it from the lowercase names
/// `"text"` and `"json"`.
#[derive(Clone, Copy, Debug)]
pub enum TracingFormat {
    /// Parsed from `"text"`.
    Text,
    /// Parsed from `"json"`.
    Json,
}
284
285impl FromStr for TracingFormat {
286    type Err = Error;
287
288    fn from_str(s: &str) -> Result<Self, Self::Err> {
289        match s {
290            "text" => Ok(Self::Text),
291            "json" => Ok(Self::Json),
292            otherwise => Err(Error::UnsupportedTracingFormat(otherwise.to_owned())),
293        }
294    }
295}
296
/// Variable name to value table used to expand `${NAME}` placeholders.
#[derive(Debug, Clone)]
pub struct VarRep(HashMap<String, String>);
299
300impl From<HashMap<String, String>> for VarRep {
301    fn from(value: HashMap<String, String>) -> Self {
302        Self(value)
303    }
304}
305
306impl VarRep {
307    fn replace(&self, haystack: &str) -> Result<String> {
308        Regex::new(r"\$\{(?<var>[^\}]+)\}")
309            .map(|re| re.replace(haystack, self).into_owned())
310            .map_err(Into::into)
311    }
312}
313
314impl Replacer for &VarRep {
315    fn replace_append(&mut self, caps: &regex::Captures<'_>, dst: &mut String) {
316        if let Some(variable) = caps.name("var")
317            && let Some(value) = self.0.get(variable.as_str())
318        {
319            dst.push_str(value);
320        }
321    }
322}
323
/// Wrapper around a `T` that was parsed from a string after environment
/// variable expansion (see the `FromStr` impl in this file).
#[derive(Debug, Clone)]
pub struct EnvVarExp<T>(T);
326
327impl<T> EnvVarExp<T> {
328    pub fn into_inner(self) -> T {
329        self.0
330    }
331}
332
333impl<T> FromStr for EnvVarExp<T>
334where
335    T: FromStr,
336    Error: From<<T as FromStr>::Err>,
337{
338    type Err = Error;
339
340    fn from_str(s: &str) -> Result<Self, Self::Err> {
341        VarRep::from(vars().collect::<HashMap<_, _>>())
342            .replace(s)
343            .and_then(|s| T::from_str(&s).map_err(Into::into))
344            .map(|t| Self(t))
345    }
346}