use super::*;
use crate::nodes::{NodeOperators, StreamOperators, constant};
use crate::{RunFor, RunMode, burst};
use fluvio::consumer::ConsumerConfigExt;
use fluvio::metadata::customspu::CustomSpuSpec;
use fluvio::metadata::topic::TopicSpec;
use fluvio::{Fluvio, FluvioAdmin, FluvioClusterConfig, Offset};
use fluvio_controlplane_metadata::spu::{EncryptionEnum, Endpoint, IngressPort};
use futures::StreamExt;
use testcontainers::{
GenericImage, ImageExt, core::ExecCommand, core::WaitFor, runners::SyncRunner,
};
const FLUVIO_SC_PORT: u16 = 9003;
const FLUVIO_SC_PRIVATE_PORT: u16 = 9004;
const FLUVIO_SPU_PORT: u16 = 9010;
const FLUVIO_SPU_PRIVATE_PORT: u16 = 9011;
const FLUVIO_IMAGE: &str = "infinyon/fluvio";
const FLUVIO_TAG: &str = "0.18.1";
fn start_fluvio() -> anyhow::Result<(impl Drop, String)> {
let container = GenericImage::new(FLUVIO_IMAGE, FLUVIO_TAG)
.with_wait_for(WaitFor::message_on_stdout(
"Streaming Controller started successfully",
))
.with_cmd(vec![
"/bin/sh",
"-c",
"/fluvio-run sc --local /tmp/fluvio & wait",
])
.with_host_config_modifier(|hc| {
hc.network_mode = Some("host".to_string());
})
.start()?;
let endpoint = format!("127.0.0.1:{FLUVIO_SC_PORT}");
wait_for_port(&endpoint, std::time::Duration::from_secs(10))?;
register_spu(&endpoint)?;
let spu_cmd = format!(
"/fluvio-run spu \
--id 5001 \
--public-server 127.0.0.1:{FLUVIO_SPU_PORT} \
--private-server 127.0.0.1:{FLUVIO_SPU_PRIVATE_PORT} \
--sc-addr 127.0.0.1:{FLUVIO_SC_PRIVATE_PORT} \
--log-base-dir /tmp/fluvio &"
);
container.exec(ExecCommand::new(["/bin/sh", "-c", &spu_cmd]))?;
std::thread::sleep(std::time::Duration::from_secs(3));
Ok((container, endpoint))
}
fn wait_for_port(endpoint: &str, timeout: std::time::Duration) -> anyhow::Result<()> {
let addr: std::net::SocketAddr = endpoint
.parse()
.map_err(|e| anyhow::anyhow!("invalid endpoint {endpoint}: {e}"))?;
let deadline = std::time::Instant::now() + timeout;
loop {
match std::net::TcpStream::connect_timeout(&addr, std::time::Duration::from_millis(200)) {
Ok(_) => return Ok(()),
Err(_) if std::time::Instant::now() < deadline => {
std::thread::sleep(std::time::Duration::from_millis(100));
}
Err(e) => {
return Err(anyhow::anyhow!(
"fluvio SC never accepted on {endpoint}: {e}"
));
}
}
}
}
fn register_spu(endpoint: &str) -> anyhow::Result<()> {
const SPU_ID: i32 = 5001;
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let config = FluvioClusterConfig::new(endpoint);
let admin = FluvioAdmin::connect_with_config(&config)
.await
.map_err(|e| anyhow::anyhow!("fluvio admin connect failed: {e}"))?;
let spec = CustomSpuSpec {
id: SPU_ID,
public_endpoint: IngressPort::from_port_host(FLUVIO_SPU_PORT, "127.0.0.1".to_string()),
private_endpoint: Endpoint {
port: FLUVIO_SPU_PRIVATE_PORT,
host: "127.0.0.1".to_string(),
encryption: EncryptionEnum::PLAINTEXT,
},
rack: None,
public_endpoint_local: None,
};
admin
.create::<CustomSpuSpec>("spu-5001".to_string(), false, spec)
.await
.map_err(|e| anyhow::anyhow!("SPU register failed: {e:?}"))?;
Ok(())
})
}
fn create_topic(endpoint: &str, topic: &str) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let config = FluvioClusterConfig::new(endpoint);
let admin = FluvioAdmin::connect_with_config(&config)
.await
.map_err(|e| anyhow::anyhow!("fluvio admin connect failed: {e}"))?;
admin
.create::<TopicSpec>(
topic.to_string(),
false,
TopicSpec::new_computed(1, 1, None),
)
.await
.map_err(|e| anyhow::anyhow!("topic create failed: {e}"))?;
Ok(())
})
}
fn seed_records(endpoint: &str, topic: &str, records: &[(&str, &[u8])]) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let config = FluvioClusterConfig::new(endpoint);
let client = Fluvio::connect_with_config(&config)
.await
.map_err(|e| anyhow::anyhow!("fluvio connect failed: {e}"))?;
let producer = client
.topic_producer(topic)
.await
.map_err(|e| anyhow::anyhow!("producer create failed: {e}"))?;
for (key, value) in records {
producer
.send(*key, *value)
.await
.map_err(|e| anyhow::anyhow!("send failed: {e}"))?;
}
producer
.flush()
.await
.map_err(|e| anyhow::anyhow!("flush failed: {e}"))?;
Ok(())
})
}
fn read_records(
endpoint: &str,
topic: &str,
limit: usize,
) -> anyhow::Result<Vec<(Option<Vec<u8>>, Vec<u8>)>> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let config = FluvioClusterConfig::new(endpoint);
let client = Fluvio::connect_with_config(&config)
.await
.map_err(|e| anyhow::anyhow!("fluvio connect failed: {e}"))?;
let consumer_config = ConsumerConfigExt::builder()
.topic(topic)
.partition(0u32)
.offset_start(Offset::beginning())
.build()
.map_err(|e| anyhow::anyhow!("consumer config failed: {e}"))?;
let mut stream = client
.consumer_with_config(consumer_config)
.await
.map_err(|e| anyhow::anyhow!("consumer create failed: {e}"))?;
let mut results = Vec::new();
while results.len() < limit {
match stream.next().await {
Some(Ok(record)) => {
results.push((record.key().map(|k| k.to_vec()), record.value().to_vec()));
}
Some(Err(e)) => {
return Err(anyhow::anyhow!("record error: {e:?}"));
}
None => break,
}
}
Ok(results)
})
}
#[test]
fn test_connection_refused() {
let conn = FluvioConnection::new("127.0.0.1:59999");
let result = fluvio_sub(conn, "any-topic", 0, None)
.collapse()
.collect()
.run(RunMode::RealTime, RunFor::Cycles(1));
assert!(result.is_err(), "expected connection error");
}
#[test]
fn test_sub_from_beginning() -> anyhow::Result<()> {
let (_container, endpoint) = start_fluvio()?;
let topic = "sub-from-beginning";
create_topic(&endpoint, topic)?;
seed_records(&endpoint, topic, &[("k1", b"hello"), ("k2", b"world")])?;
let conn = FluvioConnection::new(&endpoint);
let collected = fluvio_sub(conn, topic, 0, None).collect();
collected.run(RunMode::RealTime, RunFor::Cycles(2))?;
let bursts = collected.peek_value();
let events: Vec<FluvioEvent> = bursts
.into_iter()
.flat_map(|b| b.value.into_iter())
.collect();
assert_eq!(events.len(), 2);
assert_eq!(events[0].value, b"hello");
assert_eq!(events[1].value, b"world");
Ok(())
}
#[test]
fn test_pub_round_trip() -> anyhow::Result<()> {
let (_container, endpoint) = start_fluvio()?;
let topic = "pub-round-trip";
create_topic(&endpoint, topic)?;
let conn = FluvioConnection::new(&endpoint);
let source = constant(burst![
FluvioRecord::with_key("greeting", b"hello".to_vec()),
FluvioRecord::new(b"world".to_vec()),
]);
fluvio_pub(conn, topic, &source).run(RunMode::RealTime, RunFor::Cycles(1))?;
let records = read_records(&endpoint, topic, 2)?;
assert_eq!(records.len(), 2);
assert_eq!(records[0].1, b"hello");
assert_eq!(records[1].1, b"world");
Ok(())
}
#[test]
fn test_sub_live_stream() -> anyhow::Result<()> {
let (_container, endpoint) = start_fluvio()?;
let topic = "sub-live-stream";
create_topic(&endpoint, topic)?;
let endpoint_clone = endpoint.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
seed_records(&endpoint_clone, topic, &[("live", b"event")]).unwrap();
});
let conn = FluvioConnection::new(&endpoint);
let collected = fluvio_sub(conn, topic, 0, None).collapse().collect();
collected.run(RunMode::RealTime, RunFor::Cycles(1))?;
handle.join().unwrap();
let events = collected.peek_value();
assert_eq!(events.len(), 1);
assert_eq!(events[0].value.value, b"event");
Ok(())
}
#[test]
fn test_pub_keyless_record() -> anyhow::Result<()> {
let (_container, endpoint) = start_fluvio()?;
let topic = "keyless-records";
create_topic(&endpoint, topic)?;
let conn = FluvioConnection::new(&endpoint);
let source = constant(burst![FluvioRecord::new(b"no-key".to_vec())]);
fluvio_pub(conn, topic, &source).run(RunMode::RealTime, RunFor::Cycles(1))?;
let records = read_records(&endpoint, topic, 1)?;
assert_eq!(records.len(), 1);
assert_eq!(records[0].0, None, "key should be None for keyless record");
assert_eq!(records[0].1, b"no-key");
Ok(())
}
#[test]
fn test_pub_keyed_record() -> anyhow::Result<()> {
let (_container, endpoint) = start_fluvio()?;
let topic = "keyed-records";
create_topic(&endpoint, topic)?;
let conn = FluvioConnection::new(&endpoint);
let source = constant(burst![FluvioRecord::with_key(
"my-key",
b"my-value".to_vec()
)]);
fluvio_pub(conn, topic, &source).run(RunMode::RealTime, RunFor::Cycles(1))?;
let records = read_records(&endpoint, topic, 1)?;
assert_eq!(records.len(), 1);
assert_eq!(records[0].0.as_deref(), Some(b"my-key".as_ref()));
assert_eq!(records[0].1, b"my-value");
Ok(())
}
#[test]
fn test_sub_from_absolute_offset() -> anyhow::Result<()> {
let (_container, endpoint) = start_fluvio()?;
let topic = "sub-abs-offset";
create_topic(&endpoint, topic)?;
seed_records(
&endpoint,
topic,
&[("k0", b"first"), ("k1", b"second"), ("k2", b"third")],
)?;
let conn = FluvioConnection::new(&endpoint);
let collected = fluvio_sub(conn, topic, 0, Some(1)).collect();
collected.run(RunMode::RealTime, RunFor::Cycles(2))?;
let events: Vec<FluvioEvent> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value.into_iter())
.collect();
assert_eq!(events.len(), 2);
assert_eq!(events[0].value, b"second");
assert_eq!(events[1].value, b"third");
Ok(())
}