crabka-client-streams 0.3.6

KIP-1071 Kafka Streams rebalance-protocol client for Apache Kafka in Rust
Documentation
//! [`StreamsApp`] — one component that owns the schema-registry lifecycle and
//! the managed [`KafkaStreams`] runtime, so applications don't hand-wire the
//! cache, `set_default_registry`, pre-warm, and `KafkaStreams::builder()`.
//!
//! Build it **first** — that installs the process default registry — then build a
//! topology against it and `run` it. The registry must be installed before the
//! topology is constructed, because the default Avro/Protobuf/JSON serdes read it
//! when the DSL (or [`Topology`](crate::Topology)) builds them.
//!
//! ```no_run
//! use apache_avro::AvroSchema;
//! use crabka_client_streams::{DefaultSerde, SchemaSerde, StreamsApp};
//! use crabka_schema_serde::format::avro::AvroSerde;
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
//! struct Order {
//!     id: String,
//!     total: f64,
//! }
//! impl DefaultSerde for Order {
//!     type Serde = SchemaSerde<Order, AvroSerde<Order>>;
//! }
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let app = StreamsApp::builder()
//!     .bootstrap("127.0.0.1:9092")
//!     .application_id("orders")
//!     .schema_registry("http://127.0.0.1:8081")
//!     .build();
//!
//! let topology = app.streams_builder();
//! topology
//!     .stream::<String, Order>(["orders"])
//!     .map_values(|o: &Order| Order {
//!         id: o.id.clone(),
//!         total: o.total * 2.0,
//!     })
//!     .to("orders-doubled");
//!
//! let mut streams = app.run(topology).await?;
//! streams.close().await?;
//! # Ok(()) }
//! ```

use std::sync::Arc;

use crabka_schema_serde::cache::{CacheConfig, SchemaCache};
use crabka_schema_serde::{RegistryClient, set_default_registry};

use crate::dsl::StreamsBuilder;
use crate::error::StreamsClientError;
use crate::runtime::KafkaStreams;
use crate::runtime::eos::ProcessingGuarantee;
use crate::store::StoreBackend;
use crate::topology::BuiltTopology;

/// Owns the schema-registry lifecycle (cache + process default + pre-warm) and
/// the [`KafkaStreams`] wiring for a streams application. Construct via
/// [`StreamsApp::builder`].
///
/// The value is consumed by [`run`](Self::run) / [`run_built`](Self::run_built),
/// which forward the stored settings to `KafkaStreams::builder()`.
pub struct StreamsApp {
    /// Bootstrap address handed to the `KafkaStreams` builder's `bootstrap`.
    bootstrap: String,
    /// Application id: used to build the DSL topology in `run` and forwarded
    /// to the `KafkaStreams` builder; exposed read-only via `application_id()`.
    application_id: String,
    /// Schema cache created in `new`, where a clone of this handle is also
    /// installed as the process default registry; pre-warmed in `run_built`.
    cache: Arc<SchemaCache>,
    /// State-store backend, forwarded unchanged to the `KafkaStreams` builder.
    store_backend: StoreBackend,
    /// Processing guarantee, forwarded unchanged to the `KafkaStreams` builder.
    processing_guarantee: ProcessingGuarantee,
    /// Record-cache budget forwarded to the `KafkaStreams` builder's
    /// `cache_max_bytes`; the builder default in `new` is 10 MiB.
    cache_max_bytes: i64,
}

#[bon::bon]
impl StreamsApp {
    /// Configure the app and **install the process default schema registry**.
    ///
    /// Installs the registry as a side effect of `build`, so it is ready before
    /// the topology (and its default serdes) are constructed.
    // `build()` installs the process default registry as a side effect, so it's
    // not a pure value-returning constructor.
    #[allow(clippy::must_use_candidate)]
    #[builder(start_fn = builder, finish_fn = build)]
    pub fn new(
        #[builder(into)] bootstrap: String,
        #[builder(into)] application_id: String,
        /// Schema Registry base URL, e.g. `http://localhost:8081`.
        #[builder(into)]
        schema_registry: String,
        /// Registry behavior (auto-register vs lookup); defaults to auto-register.
        cache_config: Option<CacheConfig>,
        #[builder(default)] store_backend: StoreBackend,
        #[builder(default)] processing_guarantee: ProcessingGuarantee,
        /// Record-cache budget (JVM `statestore.cache.max.bytes`); `0` disables
        /// caching. Defaults to 10 MiB, matching the JVM default.
        #[builder(default = 10_485_760)]
        cache_max_bytes: i64,
    ) -> Self {
        let cache = SchemaCache::new(
            RegistryClient::new(schema_registry),
            cache_config.unwrap_or_default(),
        );
        set_default_registry(cache.clone());
        Self {
            bootstrap,
            application_id,
            cache,
            store_backend,
            processing_guarantee,
            cache_max_bytes,
        }
    }
}

impl StreamsApp {
    /// Returns a new, empty DSL builder. The schema registry was already
    /// installed when this app was built.
    #[must_use]
    pub fn streams_builder(&self) -> StreamsBuilder {
        StreamsBuilder::new()
    }

    /// The application id this app was configured with. Hand it to
    /// [`Topology::build`] when wiring a Processor-API topology manually and
    /// starting it through [`run_built`](Self::run_built).
    ///
    /// [`Topology::build`]: crate::Topology::build
    #[must_use]
    pub fn application_id(&self) -> &str {
        self.application_id.as_str()
    }

    /// Build the DSL topology under this app's application id, then pre-warm
    /// and start it via [`run_built`](Self::run_built).
    ///
    /// # Errors
    ///
    /// Propagates a failure from `StreamsBuilder::build`, and any error
    /// returned by [`run_built`](Self::run_built).
    pub async fn run(self, builder: StreamsBuilder) -> Result<KafkaStreams, StreamsClientError> {
        // Bind the built topology first: it borrows `self.application_id`,
        // which must end before `self` is moved into `run_built`.
        let lowered = builder.build(&self.application_id)?;
        self.run_built(lowered).await
    }

    /// Same as [`run`](Self::run), except the topology has already been built
    /// (Processor API).
    ///
    /// # Errors
    ///
    /// A schema-cache pre-warm failure is returned as
    /// [`StreamsClientError::Runtime`] carrying the error's string form;
    /// otherwise returns whatever the `KafkaStreams` builder's `build` yields.
    pub async fn run_built(
        self,
        topology: BuiltTopology,
    ) -> Result<KafkaStreams, StreamsClientError> {
        // Pre-warm before the runtime is built; bail out on failure.
        if let Err(prewarm_err) = self.cache.prewarm().await {
            return Err(StreamsClientError::Runtime(prewarm_err.to_string()));
        }

        // Forward every stored setting to the managed runtime.
        let startup = KafkaStreams::builder()
            .bootstrap(self.bootstrap)
            .application_id(self.application_id)
            .topology(topology)
            .store_backend(self.store_backend)
            .processing_guarantee(self.processing_guarantee)
            .cache_max_bytes(self.cache_max_bytes)
            .build();
        startup.await
    }
}