use std::{collections::BTreeMap, sync::LazyLock};
use chrono::{TimeZone, Utc};
use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
use rskafka::{
client::partition::Compression,
record::{Record, RecordAndOffset},
};
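/// Skip the surrounding test unless Java interop testing is enabled.
///
/// Loads a `.env` file (if present) via `dotenvy` and returns early from the
/// test function when the `TEST_JAVA_INTEROPT` environment variable is not set.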
#[macro_export]
macro_rules! maybe_skip_java_interopt {
() => {{
use std::env;
dotenvy::dotenv().ok();
if env::var("TEST_JAVA_INTEROPT").is_err() {
return;
}
}};
}
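/// Produce `records` via the official Apache Kafka Java client, driven through
/// j4rs.
///
/// Each entry in `records` is `(topic, partition, record)`. The records are
/// sent with a Java `KafkaProducer`, flushed in one go, and the broker-assigned
/// offsets are returned in the same order as the input.
///
/// A minimal usage sketch (broker address and topic name are placeholders;
/// requires a reachable broker plus a local JVM and Maven access):
///
/// ```ignore
/// let record = Record {
///     key: Some(b"k".to_vec()),
///     value: Some(b"v".to_vec()),
///     headers: BTreeMap::new(),
///     timestamp: Utc::now(),
/// };
/// let offsets = produce(
///     &["localhost:9093".to_string()],
///     vec![("my_topic".to_string(), 0, record)],
///     Compression::NoCompression,
/// )
/// .await;
/// assert_eq!(offsets.len(), 1);
/// ```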
pub async fn produce(
connection: &[String],
records: Vec<(String, i32, Record)>,
compression: Compression,
) -> Vec<i64> {
let jvm = setup_jvm();
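    // Map rskafka's compression codec onto the Java client's `compression.type`
    // configuration value.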
let compression = match compression {
Compression::NoCompression => "none",
#[cfg(feature = "compression-gzip")]
Compression::Gzip => "gzip",
#[cfg(feature = "compression-lz4")]
Compression::Lz4 => "lz4",
#[cfg(feature = "compression-snappy")]
Compression::Snappy => "snappy",
#[cfg(feature = "compression-zstd")]
Compression::Zstd => "zstd",
};
let props = create_properties(
&jvm,
&[
("bootstrap.servers", &connection.join(",")),
(
"key.serializer",
"org.apache.kafka.common.serialization.StringSerializer",
),
(
"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer",
),
("linger.ms", "100000"),
("compression.type", compression),
],
);
let producer = jvm
.create_instance(
"org.apache.kafka.clients.producer.KafkaProducer",
&[InvocationArg::from(props)],
)
.expect("creating KafkaProducer");
let mut futures = vec![];
for (topic_name, partition_index, record) in records {
let ts = record.timestamp.timestamp_millis();
let k = String::from_utf8(record.key.unwrap()).unwrap();
let v = String::from_utf8(record.value.unwrap()).unwrap();
let headers = jvm
.create_instance(
"org.apache.kafka.common.header.internals.RecordHeaders",
InvocationArg::empty(),
)
.expect("creating KafkaProducer");
        for (header_key, header_value) in record.headers {
            jvm.invoke(
                &headers,
                "add",
                &[
                    InvocationArg::try_from(header_key).expect("header key arg"),
                    InvocationArg::from(to_java_bytes(&jvm, &header_value)),
                ],
            )
            .expect("add header");
        }
let jvm_record = jvm
.create_instance(
"org.apache.kafka.clients.producer.ProducerRecord",
&[
InvocationArg::try_from(topic_name).expect("topic arg"),
InvocationArg::try_from(partition_index).expect("partition arg"),
InvocationArg::try_from(ts).expect("ts arg"),
InvocationArg::try_from(k).expect("key arg"),
InvocationArg::try_from(v).expect("value arg"),
InvocationArg::from(headers),
],
)
.expect("creating KafkaProducer");
let fut = jvm
.invoke(&producer, "send", &[InvocationArg::from(jvm_record)])
.expect("flush");
futures.push(fut);
}
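    // Nothing is on the wire yet because of the large `linger.ms`; `flush`
    // pushes all batched records out so the send futures can be resolved below.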
jvm.invoke(&producer, "flush", InvocationArg::empty())
.expect("flush");
let mut offsets = vec![];
for fut in futures {
let record_metadata = jvm
.invoke(&fut, "get", InvocationArg::empty())
.expect("polling future");
let record_metadata = jvm
.cast(
&record_metadata,
"org.apache.kafka.clients.producer.RecordMetadata",
)
.expect("cast to RecordMetadata");
let offset = jvm
.invoke(&record_metadata, "offset", InvocationArg::empty())
.expect("getting offset");
let offset: i64 = jvm.to_rust(offset).expect("converting offset to Rust");
offsets.push(offset);
}
jvm.invoke(&producer, "close", InvocationArg::empty())
.expect("close");
offsets
}
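/// Consume records from a single topic partition via the official Apache Kafka
/// Java client, driven through j4rs.
///
/// The consumer is assigned the partition directly (no group subscription),
/// starts at the earliest offset, and polls until at least `n` records have
/// been collected; the final poll may push the result past `n`.
///
/// A minimal usage sketch (broker address and topic name are placeholders):
///
/// ```ignore
/// let read = consume(&["localhost:9093".to_string()], "my_topic", 0, 2).await;
/// assert!(read.len() >= 2);
/// ```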
pub async fn consume(
connection: &[String],
topic_name: &str,
partition_index: i32,
n: usize,
) -> Vec<RecordAndOffset> {
let jvm = setup_jvm();
let props = create_properties(
&jvm,
&[
("bootstrap.servers", &connection.join(",")),
(
"key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer",
),
(
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer",
),
("auto.offset.reset", "earliest"),
("enable.auto.commit", "false"),
],
);
let consumer = jvm
.create_instance(
"org.apache.kafka.clients.consumer.KafkaConsumer",
&[InvocationArg::from(props)],
)
.expect("creating KafkaConsumer");
let topic_partition = jvm
.create_instance(
"org.apache.kafka.common.TopicPartition",
&[
InvocationArg::try_from(topic_name).expect("topic arg"),
InvocationArg::try_from(partition_index)
.expect("partition arg")
.into_primitive()
.expect("partition arg to int"),
],
)
.expect("creating TopicPartition");
let partitions = jvm
.java_list(
"org.apache.kafka.common.TopicPartition",
vec![Ok(InvocationArg::from(topic_partition))],
)
.expect("creating partitions array");
jvm.invoke(&consumer, "assign", &[InvocationArg::from(partitions)])
.expect("assign");
let mut results = vec![];
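    // Poll until at least `n` records have been collected; a single poll may
    // return anywhere from zero records up to a full batch.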
while results.len() < n {
let consumer_records = jvm
.invoke(
&consumer,
"poll",
&[InvocationArg::try_from(1_000i64)
.expect("timeout arg")
.into_primitive()
.expect("timeout into primitive")],
)
.expect("poll");
let it = jvm
.invoke(&consumer_records, "iterator", InvocationArg::empty())
.expect("iterator");
for consumer_record in JavaIterator::new(&jvm, it) {
let consumer_record = jvm
.cast(
&consumer_record,
"org.apache.kafka.clients.consumer.ConsumerRecord",
)
.expect("cast to ConsumerRecord");
let key = jvm
.invoke(&consumer_record, "key", InvocationArg::empty())
.expect("key");
let key: String = jvm.to_rust(key).expect("key to Rust");
let offset = jvm
.invoke(&consumer_record, "offset", InvocationArg::empty())
.expect("offset");
let offset: i64 = jvm.to_rust(offset).expect("offset to Rust");
let timestamp = jvm
.invoke(&consumer_record, "timestamp", InvocationArg::empty())
.expect("timestamp");
let timestamp: i64 = jvm.to_rust(timestamp).expect("timestamp to Rust");
let value = jvm
.invoke(&consumer_record, "value", InvocationArg::empty())
.expect("value");
let value: String = jvm.to_rust(value).expect("value to Rust");
let headers = jvm
.invoke(&consumer_record, "headers", InvocationArg::empty())
.expect("headers");
let headers = jvm
.invoke(&headers, "toArray", InvocationArg::empty())
.expect("toArray");
let headers = jvm
.invoke_static(
"java.util.Arrays",
"asList",
&[InvocationArg::from(headers)],
)
.expect("headers asList");
let headers_it = jvm
.invoke(&headers, "iterator", InvocationArg::empty())
.expect("iterator");
let mut headers = BTreeMap::new();
for header in JavaIterator::new(&jvm, headers_it) {
let header = jvm
.cast(&header, "org.apache.kafka.common.header.Header")
.expect("cast to Header");
let key = jvm
.invoke(&header, "key", InvocationArg::empty())
.expect("key");
let key: String = jvm.to_rust(key).expect("key to Rust");
let value = jvm
.invoke(&header, "value", InvocationArg::empty())
.expect("value");
let value = from_java_bytes(&jvm, value);
headers.insert(key, value);
}
let record = Record {
key: Some(key.as_bytes().to_vec()),
value: Some(value.as_bytes().to_vec()),
headers,
timestamp: Utc.timestamp_millis_opt(timestamp).unwrap(),
};
let record_and_offset = RecordAndOffset { record, offset };
results.push(record_and_offset);
}
}
jvm.invoke(&consumer, "close", InvocationArg::empty())
.expect("close");
results
}
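/// One-time JVM bootstrap: deploys the Kafka Java client and its helper
/// artifacts (compression codecs, `commons-lang3`, slf4j) exactly once per
/// process.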
static JVM_SETUP: LazyLock<()> = LazyLock::new(|| {
let jvm_installation = JvmBuilder::new().build().expect("setup JVM");
for artifact_name in [
"org.apache.kafka:kafka-clients:3.5.0",
"org.apache.commons:commons-lang3:3.12.0",
"org.lz4:lz4-java:1.8.0",
"org.xerial.snappy:snappy-java:1.1.10.2",
"com.github.luben:zstd-jni:1.5.5-5",
"org.slf4j:slf4j-api:2.0.7",
] {
let artifact = MavenArtifact::from(artifact_name);
jvm_installation
.deploy_artifact(&artifact)
.unwrap_or_else(|_| panic!("Artifact deployment failed ({artifact_name})"));
}
});
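/// Ensure the Maven artifacts have been deployed, then create a [`Jvm`] handle
/// for the caller.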
fn setup_jvm() -> Jvm {
LazyLock::force(&JVM_SETUP);
    JvmBuilder::new().build().expect("setup JVM")
}
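/// Build a `java.util.Properties` instance from string key/value pairs.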
fn create_properties(jvm: &Jvm, properties: &[(&str, &str)]) -> Instance {
let props = jvm
.create_instance("java.util.Properties", InvocationArg::empty())
.expect("creating Properties");
for (k, v) in properties {
jvm.invoke(
&props,
"put",
&[
InvocationArg::try_from(*k).expect("convert str to java"),
InvocationArg::try_from(*v).expect("convert str to java"),
],
)
.expect("put property");
}
props
}
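/// Convert a Rust byte slice into a Java `byte[]`.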
fn to_java_bytes(jvm: &Jvm, bytes: &[u8]) -> Instance {
let mut args = vec![];
for b in bytes {
args.push(
InvocationArg::try_from(*b as i8)
.expect("byte arg")
.into_primitive()
.expect("to byte primitive"),
);
}
jvm.create_java_array("byte", &args).expect("create array")
}
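/// Convert a Java `byte[]` back into a Rust `Vec<u8>`.
///
/// The primitive array is boxed via `ArrayUtils.toObject` and wrapped in a
/// `List` so it can be drained through a plain Java iterator.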
fn from_java_bytes(jvm: &Jvm, bytes: Instance) -> Vec<u8> {
let bytes = jvm
.invoke_static(
"org.apache.commons.lang3.ArrayUtils",
"toObject",
&[InvocationArg::from(bytes)],
)
.expect("toObject");
let bytes = jvm
.invoke_static("java.util.Arrays", "asList", &[InvocationArg::from(bytes)])
.expect("bytes asList");
let it = jvm
.invoke(&bytes, "iterator", InvocationArg::empty())
.expect("iterator");
let mut res = vec![];
for byte in JavaIterator::new(jvm, it) {
let byte: i8 = jvm.to_rust(byte).expect("byte to Rust");
res.push(byte as u8);
}
res
}
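/// Rust-side wrapper that exposes a `java.util.Iterator` as a Rust [`Iterator`]
/// over j4rs [`Instance`]s.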
struct JavaIterator<'a> {
jvm: &'a Jvm,
it: Instance,
}
impl<'a> JavaIterator<'a> {
fn new(jvm: &'a Jvm, it: Instance) -> Self {
Self { jvm, it }
}
}
impl Iterator for JavaIterator<'_> {
type Item = Instance;
fn next(&mut self) -> Option<Self::Item> {
let has_next = self
.jvm
.invoke(&self.it, "hasNext", InvocationArg::empty())
.expect("hasNext");
let has_next: bool = self.jvm.to_rust(has_next).expect("hasNext to Rust");
if has_next {
Some(
self.jvm
.invoke(&self.it, "next", InvocationArg::empty())
.expect("next"),
)
} else {
None
}
}
}