// streex 0.1.12
//
// Kafka store runner
// Documentation
use protolib::{Input, Key};
use rdkafka::{
    ClientConfig,
    producer::{FutureProducer, FutureRecord},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Comma-separated `host:port` list passed as the Kafka client's
/// `bootstrap.servers` setting when the producer is created.
pub const KAFKA_SERVERS: &str = "localhost:9001,localhost:9002,localhost:9003";

/// Load generator: publishes synthetic `Key`/`Input` records to Kafka.
///
/// Spawns one tokio task per `tenant` in `0..100`. Each task iterates `item`
/// over `0..20` and, for every topic in `["test-input"]`, sends 2000 records
/// (`hash` in `0..2000`), awaiting the result of each send before issuing the
/// next one. A `-- {key}` line is printed after every successful send.
///
/// # Panics
///
/// Panics if a producer cannot be created, if any send returns an error, or
/// if a spawned task panicked (surfaced when its join handle is awaited).
#[tokio::main]
async fn main() {
    let mut handles = vec![];
    for tenant in 0..100 {
        let h = tokio::spawn(async move {
            // One producer per task, reused for every record the task sends.
            // The configuration never changes, so rebuilding the client on
            // each `item` iteration only added connection setup/teardown.
            let producer: FutureProducer = ClientConfig::new()
                .set("bootstrap.servers", KAFKA_SERVERS)
                .create()
                .expect("failed to create Kafka producer");
            for n in 0..20 {
                for topic in ["test-input"] {
                    let tot = 2000;
                    for i in 0..tot {
                        let k = Key {
                            version: 1,
                            tenant,
                            item: n,
                            hash: i,
                        };
                        let v = Input {
                            something_happened: "pinged".to_string(),
                            another_thing_maybe_interesting: "maybe".to_string(),
                            this_maybe_not_that_interesting: "nah".to_string(),
                        };
                        producer
                            .send(
                                FutureRecord::to(topic)
                                    .key(&Into::<Vec<u8>>::into(k))
                                    .payload(&Into::<Vec<u8>>::into(v))
                                    .timestamp(now()),
                                // Zero queue timeout: do not wait for room in
                                // the producer queue; a full queue is an error.
                                Duration::from_secs(0),
                            )
                            .await
                            .expect("failed to send record to Kafka");
                        println!("-- {k}");
                    }
                }
            }
        });
        handles.push(h);
    }
    // Wait for every tenant task; a panic inside a task is re-raised here.
    for h in handles {
        h.await.expect("producer task panicked");
    }
}

/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reads earlier than the Unix epoch, or if the
/// millisecond count does not fit in an `i64`.
fn now() -> i64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let millis = since_epoch.as_millis();
    i64::try_from(millis).unwrap()
}