use protolib::{Input, Key};
use rdkafka::{
ClientConfig,
producer::{FutureProducer, FutureRecord},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Comma-separated `host:port` list handed to the Kafka client as its
/// `bootstrap.servers` setting.
pub const KAFKA_SERVERS: &str = "localhost:9001,localhost:9002,localhost:9003";
/// Load generator that publishes test records to Kafka for 100 tenants
/// concurrently.
///
/// One Tokio task is spawned per tenant. Each task walks items `0..20` and,
/// for every item, hashes `0..2000`, sending one record per (item, hash) pair
/// to the `test-input` topic. Every delivery is awaited before the next record
/// is sent, and the key is printed once the send has succeeded.
///
/// # Panics
///
/// Panics if a producer cannot be created, if any send fails, or if any
/// spawned task panics.
#[tokio::main]
async fn main() {
    const TENANTS: usize = 100;

    let mut handles = Vec::with_capacity(TENANTS);
    for tenant in 0..100 {
        let h = tokio::spawn(async move {
            // Build the producer once per task. Creating it inside the item
            // loop meant setting up and tearing down a full Kafka client for
            // every item, although the configuration never changes.
            let producer: FutureProducer = ClientConfig::new()
                .set("bootstrap.servers", KAFKA_SERVERS)
                .create()
                .expect("failed to create Kafka producer");

            for n in 0..20 {
                for topic in ["test-input"] {
                    let tot = 2000;
                    for i in 0..tot {
                        let k = Key {
                            version: 1,
                            tenant,
                            item: n,
                            hash: i,
                        };
                        let v = Input {
                            something_happened: "pinged".to_string(),
                            another_thing_maybe_interesting: "maybe".to_string(),
                            this_maybe_not_that_interesting: "nah".to_string(),
                        };
                        let key_bytes = Into::<Vec<u8>>::into(k);
                        let payload_bytes = Into::<Vec<u8>>::into(v);
                        producer
                            .send(
                                FutureRecord::to(topic)
                                    .key(&key_bytes)
                                    .payload(&payload_bytes)
                                    .timestamp(now()),
                                // NOTE(review): a zero queue timeout presumably makes the
                                // send fail at once if the producer queue is full instead
                                // of waiting for room; confirm against the rdkafka docs.
                                Duration::from_secs(0),
                            )
                            .await
                            .expect("failed to deliver record to Kafka");
                        println!("-- {k}");
                    }
                }
            }
        });
        handles.push(h);
    }

    // Wait for every tenant task; a panic inside a task surfaces here.
    for h in handles {
        h.await.expect("producer task panicked");
    }
}
/// Returns the current wall-clock time as whole milliseconds since the Unix
/// epoch.
///
/// # Panics
///
/// Panics if the system clock reads earlier than the Unix epoch, or if the
/// millisecond count does not fit in an `i64`.
fn now() -> i64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let millis: u128 = since_epoch.as_millis();
    i64::try_from(millis).unwrap()
}