use std::time::Duration;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use tracing::{info, Level};
use rustainers::compose::images::Redpanda;
use rustainers::runner::Runner;
mod common;
pub use self::common::*;
/// Example entry point: starts a single-node Redpanda compose stack and
/// produces one Kafka message against it.
///
/// # Errors
///
/// Propagates any failure from creating the runner, building the image,
/// starting the containers, or from [`do_something_with_kafka`].
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // `init_tracing` is not defined in this file; presumably it comes from
    // the glob re-export of the `common` module.
    init_tracing(Level::DEBUG);

    let container_runner = Runner::auto()?;
    // The image is built first, then handed straight to the runner.
    let containers = container_runner
        .compose_start(Redpanda::build_single().await?)
        .await?;
    info!("Now I can use {containers}");

    // The helper's result is the program's result.
    do_something_with_kafka(&containers).await
}
/// Sends a single `"plop"` message to the `"plop"` topic of the given
/// Redpanda instance and logs the delivery result.
///
/// The broker address and the schema registry endpoint are both queried from
/// `image` and logged; only the broker address is used to configure the
/// producer.
///
/// # Errors
///
/// Returns an error if either endpoint cannot be resolved, if the producer
/// cannot be created from the configuration, or if the message is not
/// delivered.
async fn do_something_with_kafka(image: &Redpanda) -> anyhow::Result<()> {
    let destination_topic = "plop";

    let broker_address = image.broker_address().await?;
    info!("Using kafka broker: {broker_address}");

    let schema_registry_url = image.schema_registry_endpoint().await?;
    info!("Using schema registry: {schema_registry_url}");

    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", broker_address)
        .set("message.timeout.ms", "5000");
    info!("Config {config:#?}");

    let kafka_producer: FutureProducer = config.create()?;

    // The record carries the unit value as its key.
    let outgoing = FutureRecord::to(destination_topic)
        .key(&())
        .payload("plop");

    // A zero timeout is passed to `send`, exactly as `Duration::from_secs(0)`.
    let sent = match kafka_producer.send(outgoing, Duration::ZERO).await {
        Ok(delivery) => delivery,
        // On failure the error is paired with the message; only the error is
        // propagated.
        Err((err, _undelivered)) => return Err(err.into()),
    };
    info!("✉️ {sent:?}");

    Ok(())
}