use std::sync::Arc;
use crabka_schema_serde::cache::{CacheConfig, SchemaCache};
use crabka_schema_serde::{RegistryClient, set_default_registry};
use crate::dsl::StreamsBuilder;
use crate::error::StreamsClientError;
use crate::runtime::KafkaStreams;
use crate::runtime::eos::ProcessingGuarantee;
use crate::store::StoreBackend;
use crate::topology::BuiltTopology;
/// Application-level handle bundling the settings needed to build a
/// [`KafkaStreams`] instance together with a shared [`SchemaCache`].
///
/// Values are created through the builder generated for `StreamsApp::new`
/// and consumed by `run` / `run_built`.
pub struct StreamsApp {
/// Forwarded to `KafkaStreams::builder().bootstrap(..)` in `run_built`.
/// NOTE(review): presumably the Kafka bootstrap server list — confirm.
bootstrap: String,
/// Application identifier; used when building the topology in `run` and
/// forwarded to the `KafkaStreams` builder in `run_built`.
application_id: String,
/// Schema cache created in `new`; it is prewarmed in `run_built` before
/// the `KafkaStreams` instance is built.
cache: Arc<SchemaCache>,
/// Store backend selection, forwarded unchanged to the `KafkaStreams` builder.
store_backend: StoreBackend,
/// Processing guarantee, forwarded unchanged to the `KafkaStreams` builder.
processing_guarantee: ProcessingGuarantee,
/// Forwarded to `KafkaStreams::builder().cache_max_bytes(..)`; the builder
/// for `new` defaults this to `10_485_760`.
cache_max_bytes: i64,
}
#[bon::bon]
impl StreamsApp {
#[allow(clippy::must_use_candidate)]
#[builder(start_fn = builder, finish_fn = build)]
pub fn new(
#[builder(into)] bootstrap: String,
#[builder(into)] application_id: String,
#[builder(into)]
schema_registry: String,
cache_config: Option<CacheConfig>,
#[builder(default)] store_backend: StoreBackend,
#[builder(default)] processing_guarantee: ProcessingGuarantee,
#[builder(default = 10_485_760)]
cache_max_bytes: i64,
) -> Self {
let cache = SchemaCache::new(
RegistryClient::new(schema_registry),
cache_config.unwrap_or_default(),
);
set_default_registry(cache.clone());
Self {
bootstrap,
application_id,
cache,
store_backend,
processing_guarantee,
cache_max_bytes,
}
}
}
impl StreamsApp {
    /// Returns a new, empty [`StreamsBuilder`] (via `StreamsBuilder::new()`).
    ///
    /// The builder does not read any state from this app.
    #[must_use]
    pub fn streams_builder(&self) -> StreamsBuilder {
        StreamsBuilder::new()
    }

    /// Returns the application id this app was constructed with.
    #[must_use]
    pub fn application_id(&self) -> &str {
        self.application_id.as_str()
    }

    /// Builds the topology from `builder` using this app's application id,
    /// then delegates to [`StreamsApp::run_built`].
    ///
    /// # Errors
    ///
    /// Propagates the error from `builder.build(..)` (converted by `?`) and
    /// any error returned by [`StreamsApp::run_built`].
    pub async fn run(self, builder: StreamsBuilder) -> Result<KafkaStreams, StreamsClientError> {
        let topology = builder.build(&self.application_id)?;
        self.run_built(topology).await
    }

    /// Prewarms the schema cache, then builds a [`KafkaStreams`] from the
    /// stored settings and the supplied, already-built `topology`.
    ///
    /// # Errors
    ///
    /// Returns [`StreamsClientError::Runtime`] carrying the stringified error
    /// when the cache prewarm fails; otherwise returns whatever the
    /// `KafkaStreams` builder's `build()` yields.
    pub async fn run_built(
        self,
        topology: BuiltTopology,
    ) -> Result<KafkaStreams, StreamsClientError> {
        // Bail out before touching the runtime if the cache cannot be warmed.
        if let Err(prewarm_err) = self.cache.prewarm().await {
            return Err(StreamsClientError::Runtime(prewarm_err.to_string()));
        }

        let configured = KafkaStreams::builder()
            .bootstrap(self.bootstrap)
            .application_id(self.application_id)
            .topology(topology)
            .store_backend(self.store_backend)
            .processing_guarantee(self.processing_guarantee)
            .cache_max_bytes(self.cache_max_bytes);

        configured.build().await
    }
}