Skip to main content

tansu_cat/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu Cat
16//!
//! Fetch messages from a topic, or produce messages to a topic (with validation when backed by a schema)
18
19use std::{fmt, io, result, sync::Arc};
20
21use consume::Consume;
22use produce::Produce;
23use tansu_sans_io::ErrorCode;
24use tokio_util::codec::LinesCodecError;
25
26mod consume;
27mod produce;
28
29pub type Result<T, E = Error> = result::Result<T, E>;
30
31#[derive(thiserror::Error, Debug)]
32pub enum Error {
33    Api(ErrorCode),
34    Client(#[from] tansu_client::Error),
35    Io(Arc<io::Error>),
36    LinesCodec(#[from] LinesCodecError),
37    Protocol(#[from] tansu_sans_io::Error),
38    Schema(#[from] tansu_schema::Error),
39    SerdeJson(#[from] serde_json::Error),
40}
41
42impl From<io::Error> for Error {
43    fn from(value: io::Error) -> Self {
44        Self::Io(Arc::new(value))
45    }
46}
47
48impl fmt::Display for Error {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        write!(f, "{self:?}")
51    }
52}
53
54#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
55pub enum Cat {
56    Consume(Box<consume::Configuration>),
57    Produce(Box<produce::Configuration>),
58}
59
60impl Cat {
61    pub fn produce() -> produce::PhantomBuilder {
62        Builder::produce()
63    }
64
65    pub fn consume() -> consume::PhantomBuilder {
66        Builder::consume()
67    }
68
69    pub async fn main(self) -> Result<ErrorCode> {
70        match self {
71            Self::Produce(configuration) => Produce::try_from(*configuration)?.main().await,
72            Self::Consume(configuration) => Consume::try_from(*configuration)?.main().await,
73        }
74    }
75}
76
/// Unit type whose associated functions hand out the consume and produce
/// builders.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Builder;
79
80impl Builder {
81    pub fn produce() -> produce::PhantomBuilder {
82        produce::Builder::default()
83    }
84
85    pub fn consume() -> consume::PhantomBuilder {
86        consume::Builder::default()
87    }
88}