tansu_cat/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu Cat
16//!
17//! Fetch or Produce (with validation when backed by a schema) messages to a topic
18
19use std::{fmt, io, result, sync::Arc};
20
21use consume::Consume;
22use produce::Produce;
23use tansu_sans_io::ErrorCode;
24use tokio_util::codec::LinesCodecError;
25
26mod consume;
27mod produce;
28
29pub type Result<T, E = Error> = result::Result<T, E>;
30
31#[derive(thiserror::Error, Debug)]
32pub enum Error {
33    Api(ErrorCode),
34    Io(Arc<io::Error>),
35    LinesCodec(#[from] LinesCodecError),
36    Protocol(#[from] tansu_sans_io::Error),
37    Schema(#[from] tansu_schema::Error),
38    SerdeJson(#[from] serde_json::Error),
39}
40
41impl From<io::Error> for Error {
42    fn from(value: io::Error) -> Self {
43        Self::Io(Arc::new(value))
44    }
45}
46
47impl fmt::Display for Error {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        write!(f, "{self:?}")
50    }
51}
52
53#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
54pub enum Cat {
55    Consume(Box<consume::Configuration>),
56    Produce(Box<produce::Configuration>),
57}
58
59impl Cat {
60    pub fn produce() -> produce::PhantomBuilder {
61        Builder::produce()
62    }
63
64    pub fn consume() -> consume::PhantomBuilder {
65        Builder::consume()
66    }
67
68    pub async fn main(self) -> Result<ErrorCode> {
69        match self {
70            Self::Produce(configuration) => Produce::try_from(*configuration)?.main().await,
71            Self::Consume(configuration) => Consume::try_from(*configuration)?.main().await,
72        }
73    }
74}
75
76#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
77pub struct Builder;
78
79impl Builder {
80    pub fn produce() -> produce::PhantomBuilder {
81        produce::Builder::default()
82    }
83
84    pub fn consume() -> consume::PhantomBuilder {
85        consume::Builder::default()
86    }
87}