#![allow(clippy::expect_used, clippy::panic)]
use super::*;
use std::sync::Arc;
use std::time::Duration;
use krafka::consumer::Consumer;
use osproxy_core::{ClusterId, Epoch, IndexName, Target};
use osproxy_engine::WriteQueue;
use osproxy_kafka_krafka::KrafkaProducer;
use osproxy_sink::{DocOp, WriteBatch, WriteOp};
use prost::Message;
use testcontainers_modules::kafka::apache::{Kafka, KAFKA_PORT};
use testcontainers_modules::testcontainers::runners::AsyncRunner;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "needs a Docker daemon"]
async fn op_envelope_round_trips_through_a_real_broker() {
let node = Kafka::default().start().await.expect("start kafka");
let port = node
.get_host_port_ipv4(KAFKA_PORT)
.await
.expect("mapped kafka port");
let brokers = vec![format!("127.0.0.1:{port}")];
let topic = "osproxy.fanout.test";
ensure_topic(&brokers, topic).await;
let producer = KrafkaProducer::connect(brokers.clone(), "osproxy-fanout-test", None)
.await
.expect("connect producer");
let queue = KafkaWriteQueue::new(Arc::new(producer), topic.to_owned(), BodyEncoding::Cbor);
let write = QueuedWrite {
op_id: "op-1".to_owned(),
partition_key: "acme".to_owned(),
batch: WriteBatch::single(WriteOp::new(
Target::new(ClusterId::from("eu-1"), IndexName::from("shared")),
DocOp::Index {
id: Some("acme:7".to_owned()),
routing: Some("acme".to_owned()),
body: bytes::Bytes::from_static(br#"{"_tenant":"acme","id":7,"msg":"hi"}"#),
},
Epoch::new(4),
)),
};
queue.enqueue(write).await.expect("enqueue acked");
let record = read_first(&brokers, topic).await;
assert_eq!(record.key.as_deref(), Some(b"acme".as_ref())); let env = OpEnvelope::decode(record.value.expect("payload").as_ref()).expect("decode");
assert_eq!(env.op_id, "op-1");
assert_eq!(env.partition, "acme");
assert_eq!(env.cluster, "eu-1");
assert_eq!(env.index, "shared");
assert_eq!(env.epoch, 4);
assert_eq!(env.op_type, OpType::Index as i32);
assert_eq!(env.id, "acme:7");
assert_eq!(env.content_type, "application/cbor");
let body: serde_json::Value =
ciborium::from_reader(env.body.as_slice()).expect("decode cbor body");
assert_eq!(
body,
serde_json::json!({"_tenant":"acme","id":7,"msg":"hi"})
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "needs a Docker daemon"]
async fn multi_op_partition_round_trips_in_order() {
let node = Kafka::default().start().await.expect("start kafka");
let port = node
.get_host_port_ipv4(KAFKA_PORT)
.await
.expect("mapped kafka port");
let brokers = vec![format!("127.0.0.1:{port}")];
let topic = "osproxy.fanout.multi";
ensure_topic(&brokers, topic).await;
let producer = KrafkaProducer::connect(brokers.clone(), "osproxy-fanout-multi", None)
.await
.expect("connect producer");
let queue = KafkaWriteQueue::new(Arc::new(producer), topic.to_owned(), BodyEncoding::Cbor);
let index = single(
"batch:0",
DocOp::Index {
id: Some("acme:7".to_owned()),
routing: Some("acme".to_owned()),
body: bytes::Bytes::from_static(br#"{"_tenant":"acme","id":7}"#),
},
);
let delete = single(
"batch:1",
DocOp::Delete {
id: "acme:8".to_owned(),
routing: Some("acme".to_owned()),
},
);
queue.enqueue(index).await.expect("enqueue index acked");
queue.enqueue(delete).await.expect("enqueue delete acked");
let records = read_n(&brokers, topic, 2).await;
for r in &records {
assert_eq!(r.key.as_deref(), Some(b"acme".as_ref())); }
let first = OpEnvelope::decode(records[0].value.as_deref().expect("payload")).expect("decode");
assert_eq!(first.op_id, "batch:0");
assert_eq!(first.op_type, OpType::Index as i32);
assert_eq!(first.id, "acme:7");
assert_eq!(first.content_type, "application/cbor");
let second = OpEnvelope::decode(records[1].value.as_deref().expect("payload")).expect("decode");
assert_eq!(second.op_id, "batch:1");
assert_eq!(second.op_type, OpType::Delete as i32);
assert_eq!(second.id, "acme:8");
assert!(second.body.is_empty(), "a delete carries no body");
assert_eq!(second.content_type, "");
}
fn single(op_id: &str, doc: DocOp) -> QueuedWrite {
QueuedWrite {
op_id: op_id.to_owned(),
partition_key: "acme".to_owned(),
batch: WriteBatch::single(WriteOp::new(
Target::new(ClusterId::from("eu-1"), IndexName::from("shared")),
doc,
Epoch::new(4),
)),
}
}
async fn ensure_topic(brokers: &[String], topic: &str) {
use krafka::admin::{AdminClient, NewTopic};
let admin = AdminClient::builder()
.bootstrap_servers(brokers.join(","))
.build()
.await
.expect("admin client");
for _ in 0..20 {
let spec = NewTopic::new(topic, 1, 1).expect("topic spec");
if admin
.create_topics(vec![spec], Duration::from_secs(10), false)
.await
.is_ok()
{
return;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
panic!("topic could not be created within the warmup window");
}
async fn read_first(brokers: &[String], topic: &str) -> krafka::consumer::ConsumerRecord {
let consumer = Consumer::builder()
.bootstrap_servers(brokers.join(","))
.build()
.await
.expect("build consumer");
consumer.assign(topic, vec![0]).await.expect("assign");
consumer
.seek_to_beginning(topic, 0)
.await
.expect("seek to beginning");
read_n(brokers, topic, 1).await.pop().expect("a record")
}
async fn read_n(
brokers: &[String],
topic: &str,
n: usize,
) -> Vec<krafka::consumer::ConsumerRecord> {
let consumer = Consumer::builder()
.bootstrap_servers(brokers.join(","))
.build()
.await
.expect("build consumer");
consumer.assign(topic, vec![0]).await.expect("assign");
consumer
.seek_to_beginning(topic, 0)
.await
.expect("seek to beginning");
let mut out = Vec::with_capacity(n);
while out.len() < n {
let record = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
.await
.expect("recv did not time out")
.expect("a record");
out.push(record);
}
out
}