use assert2::assert;
use std::path::Path;
use std::process::{Command, Stdio};
use crabka_log::{Log, LogConfig};
use tempfile::tempdir;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::kafka::{KAFKA_PORT, Kafka};
/// Name of the topic that the test creates and produces to inside the Kafka container.
const TOPIC: &str = "crabka-log-itest";
/// Runs `docker exec <container_id> <args...>` and returns its captured output.
///
/// Both stdout and stderr of the `docker` process are captured rather than
/// inherited, so they can be embedded in the failure message.
///
/// # Panics
///
/// Panics if the `docker` binary cannot be spawned, or if the command exits
/// with a non-success status (the message includes the captured stdout/stderr).
fn docker_exec(container_id: &str, args: &[&str]) -> std::process::Output {
    let output = Command::new("docker")
        .arg("exec")
        .arg(container_id)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .output()
        .expect("spawn docker exec");

    assert!(
        output.status.success(),
        "docker exec {args:?} failed: stdout={}, stderr={}",
        String::from_utf8_lossy(&output.stdout),
        String::from_utf8_lossy(&output.stderr),
    );

    output
}
/// Runs `docker exec -i <container_id> <args...>`, feeding `stdin` to the
/// command's standard input and then closing it so the command sees EOF.
///
/// The payload is written from a scoped helper thread while the calling thread
/// drains the child's stdout/stderr. Writing everything up front on the same
/// thread can deadlock once the child blocks on a full output pipe and stops
/// reading its input.
///
/// # Panics
///
/// Panics if the `docker` binary cannot be spawned, if waiting on it fails,
/// if writing the payload fails, or if the command exits with a non-success
/// status. Write and exit-status failures include the captured stdout/stderr
/// so the underlying docker error is visible.
fn docker_exec_stdin(container_id: &str, args: &[&str], stdin: &[u8]) {
    use std::io::Write;

    let mut child = Command::new("docker")
        .arg("exec")
        .arg("-i")
        .arg(container_id)
        .args(args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("spawn docker exec");

    // Take ownership of the pipe: dropping it is what signals EOF to the child.
    let mut pipe = child.stdin.take().expect("stdin piped");

    let (write_result, wait_result) = std::thread::scope(|scope| {
        // `pipe` is moved into the writer and dropped when the closure returns,
        // closing the child's stdin whether or not the write succeeded.
        let writer = scope.spawn(move || pipe.write_all(stdin));
        // Drain stdout/stderr concurrently with the write.
        let waited = child.wait_with_output();
        let written = writer.join().expect("stdin writer thread panicked");
        (written, waited)
    });

    let out = wait_result.expect("wait docker exec");

    // Report a failed write together with whatever docker printed, instead of
    // a bare I/O error that hides the real cause.
    if let Err(err) = write_result {
        panic!(
            "write stdin: {err:?}; docker exec -i {args:?}: stdout={}, stderr={}",
            String::from_utf8_lossy(&out.stdout),
            String::from_utf8_lossy(&out.stderr),
        );
    }

    assert!(
        out.status.success(),
        "docker exec -i {args:?} failed: stdout={}, stderr={}",
        String::from_utf8_lossy(&out.stdout),
        String::from_utf8_lossy(&out.stderr),
    );
}
/// Copies `src` out of the container to `dst` on the host by running
/// `docker cp <container_id>:<src> <dst>`.
///
/// # Panics
///
/// Panics if the `docker` binary cannot be spawned, or if the copy exits with
/// a non-success status (the message includes the captured stdout/stderr).
fn docker_cp(container_id: &str, src: &str, dst: &Path) {
    // `docker cp` addresses container paths as `<container>:<path>`.
    let container_source = format!("{container_id}:{src}");

    let mut copy = Command::new("docker");
    copy.arg("cp").arg(container_source).arg(dst);
    let result = copy.output().expect("spawn docker cp");

    assert!(
        result.status.success(),
        "docker cp {src} -> {} failed: stdout={}, stderr={}",
        dst.display(),
        String::from_utf8_lossy(&result.stdout),
        String::from_utf8_lossy(&result.stderr),
    );
}
/// Checks that `Log` can open and read a partition directory that was written
/// by a Kafka broker running in a container.
///
/// The test starts a Kafka container, creates [`TOPIC`] with one partition,
/// produces three keyed records through `kafka-console-producer`, copies the
/// partition directory to a host temp dir with `docker cp`, and then reads it
/// back with `Log::open` / `Log::read`.
///
/// Ignored by default because it needs a working Docker installation.
#[tokio::test]
#[ignore = "requires Docker"]
async fn read_jvm_produced_log_dir() {
    // --- Start the broker -------------------------------------------------
    let kafka = Kafka::default()
        .start()
        .await
        .expect("start kafka container");
    let container_id = kafka.id().to_string();

    // The mapped host port is never used below, but the lookup itself must
    // still succeed for the test to proceed.
    let _host_port = kafka
        .get_host_port_ipv4(KAFKA_PORT)
        .await
        .expect("get host port");

    // All Kafka CLI tools are run via `docker exec`, i.e. inside the
    // container, so they use this container-local address.
    let bootstrap = "localhost:9092";

    // --- Create the topic -------------------------------------------------
    let create_topic_args = [
        "kafka-topics",
        "--create",
        "--if-not-exists",
        "--topic",
        TOPIC,
        "--partitions",
        "1",
        "--replication-factor",
        "1",
        "--bootstrap-server",
        bootstrap,
    ];
    docker_exec(&container_id, &create_topic_args);

    // --- Produce three `key:value` records --------------------------------
    let produce_args = [
        "kafka-console-producer",
        "--bootstrap-server",
        bootstrap,
        "--topic",
        TOPIC,
        "--property",
        "parse.key=true",
        "--property",
        "key.separator=:",
    ];
    let stdin = b"k1:v1\nk2:v2\nk3:v3\n";
    docker_exec_stdin(&container_id, &produce_args, stdin);

    // --- Copy the partition directory to the host -------------------------
    // NOTE(review): presumably the broker image keeps its data under
    // /var/lib/kafka/data — confirm against the image in use.
    let partition_dir = format!("/var/lib/kafka/data/{TOPIC}-0");
    // Listing the directory makes the test fail early if it does not exist.
    docker_exec(&container_id, &["ls", "-la", partition_dir.as_str()]);

    // `host_tmp` must stay alive until the end of the test: it owns the
    // directory that `Log` reads from.
    let host_tmp = tempdir().expect("tempdir");
    let host_target = host_tmp.path().join(format!("{TOPIC}-0"));
    docker_cp(&container_id, &partition_dir, host_tmp.path());
    assert!(
        host_target.exists(),
        "expected docker cp to produce {}",
        host_target.display()
    );

    // --- Read the copied log back -----------------------------------------
    let log = Log::open(&host_target, LogConfig::default()).expect("open log");
    let out = log
        .read(log.log_start_offset(), usize::MAX)
        .expect("read log");
    assert!(
        !out.batches.is_empty(),
        "expected at least one batch in JVM-produced log; got 0"
    );

    // Count records across every batch that was returned.
    let total_records: usize = out
        .batches
        .iter()
        .fold(0, |count, batch| count + batch.records.len());
    assert!(
        total_records >= 3,
        "expected at least 3 records (produced 3); got {total_records}"
    );
}