use bytes::Bytes;
use lnc_client::{ClientConfig, LanceClient};
use std::time::{Duration, Instant};
/// Returns the address of the server under test.
///
/// Reads `LANCE_TEST_ADDR`; when the variable cannot be read, falls back to
/// `127.0.0.1:1992`.
fn get_test_addr() -> String {
    match std::env::var("LANCE_TEST_ADDR") {
        Ok(addr) => addr,
        Err(_) => "127.0.0.1:1992".to_string(),
    }
}
/// Builds the `ClientConfig` shared by the single-node tests.
///
/// The address comes from `get_test_addr()`, TLS is disabled, and the timeouts
/// are fixed: 5 s connect, 10 s read, 5 s write, 10 s keepalive interval.
fn test_config() -> ClientConfig {
    let addr = get_test_addr();
    let five_secs = Duration::from_secs(5);
    let ten_secs = Duration::from_secs(10);
    ClientConfig {
        addr,
        connect_timeout: five_secs,
        read_timeout: ten_secs,
        write_timeout: five_secs,
        keepalive_interval: ten_secs,
        tls: None,
    }
}
/// Reports whether the tests are running against a multi-node cluster.
///
/// Cluster mode is assumed when `LANCE_NODE2_ADDR` is set and names a
/// different address than the server under test. The test address is resolved
/// with the same fallback that `get_test_addr()` uses (`127.0.0.1:1992`), so a
/// second node that points at the default address is not mistaken for a
/// separate cluster member when `LANCE_TEST_ADDR` is unset.
fn is_cluster_mode() -> bool {
    // Keep this fallback in sync with `get_test_addr()`.
    let test_addr =
        std::env::var("LANCE_TEST_ADDR").unwrap_or_else(|_| "127.0.0.1:1992".to_string());
    // A single lookup of the node-2 variable; unset (or unreadable) means single-node.
    matches!(std::env::var("LANCE_NODE2_ADDR"), Ok(node2) if node2 != test_addr)
}
/// Pauses for 100 ms when `is_cluster_mode()` reports a cluster; otherwise
/// returns immediately.
async fn wait_for_replication() {
    if !is_cluster_mode() {
        return;
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
}
/// Smoke test: connect with the shared test configuration, then close the connection.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_connect_and_close() {
    println!("Testing connection to {:?}", get_test_addr());

    let connect_result = LanceClient::connect(test_config()).await;
    assert!(
        connect_result.is_ok(),
        "Failed to connect: {:?}",
        connect_result.err()
    );
    let connection = connect_result.unwrap();
    println!("Connected successfully: {:?}", connection);

    let close_result = connection.close().await;
    assert!(
        close_result.is_ok(),
        "Failed to close: {:?}",
        close_result.err()
    );
    println!("Connection closed successfully");
}
/// Connects through `LanceClient::connect_to` using a plain address string and closes again.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_connect_with_string_addr() {
    let target = get_test_addr();
    println!("Testing connection to {}", target);

    let connect_result = LanceClient::connect_to(&target).await;
    assert!(
        connect_result.is_ok(),
        "Failed to connect: {:?}",
        connect_result.err()
    );

    let connection = connect_result.unwrap();
    connection.close().await.unwrap();
}
/// Measures ten ping round trips (after one warm-up ping) and requires the
/// average to stay below 100 ms.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_ping_latency() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    // Warm-up round trip; its outcome is deliberately discarded.
    let _ = conn.ping().await;

    let mut samples = Vec::new();
    for _ in 0..10 {
        samples.push(conn.ping().await.unwrap());
    }

    let sample_count = samples.len() as u32;
    let total: Duration = samples.iter().sum();
    let avg_latency: Duration = total / sample_count;
    let fastest = samples.iter().min().unwrap();
    let slowest = samples.iter().max().unwrap();

    println!("Ping latency (10 samples):");
    println!(" Min: {:?}", fastest);
    println!(" Max: {:?}", slowest);
    println!(" Avg: {:?}", avg_latency);

    let ceiling = Duration::from_millis(100);
    assert!(
        avg_latency < ceiling,
        "Average ping latency too high: {:?}",
        avg_latency
    );

    conn.close().await.unwrap();
}
/// Creates a fresh topic and ingests one small payload synchronously.
///
/// NOTE(review): the topic is not deleted afterwards, unlike the
/// topic-management tests in this file.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_single_ingest_sync() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("ingest_sync");
    let created = conn.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;

    let message = Bytes::from_static(b"Hello, LANCE!");
    let ingest_result = conn.send_ingest_sync(message, topic_id).await;
    assert!(
        ingest_result.is_ok(),
        "Ingest failed: {:?}",
        ingest_result.err()
    );
    println!("Ingested batch_id: {}", ingest_result.unwrap());

    conn.close().await.unwrap();
}
/// Sends 100 messages one at a time, checking that the returned batch ids
/// count up from 1, and prints the achieved message rate.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_sequential_ingests() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("sequential");
    let created = conn.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;

    let count = 100;
    let timer = Instant::now();
    for seq in 1..=count {
        let body = format!("sequential message {}", seq);
        let batch_id = conn
            .send_ingest_sync(Bytes::from(body), topic_id)
            .await
            .unwrap();
        // The n-th synchronous ingest on the new topic is expected to return id n.
        assert_eq!(batch_id, seq as u64);
    }
    let elapsed = timer.elapsed();

    let msgs_per_sec = count as f64 / elapsed.as_secs_f64();
    println!("Sequential ingests:");
    println!(" Count: {}", count);
    println!(" Time: {:?}", elapsed);
    println!(" Rate: {:.2} msgs/sec", msgs_per_sec);

    conn.close().await.unwrap();
}
/// Pipelines 1000 one-KiB ingests without waiting, then collects the acks and
/// checks they arrive as ids 1..=1000 in order. Prints rate and throughput.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_pipelined_ingests() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("pipelined");
    let created = conn.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;

    let count = 1000;
    let payload_size = 1024;

    // Phase 1: fire off every request back to back.
    let timer = Instant::now();
    for _ in 0..count {
        let body = vec![0xAB; payload_size];
        conn.send_ingest(Bytes::from(body), topic_id).await.unwrap();
    }
    let send_elapsed = timer.elapsed();

    // Phase 2: drain the acknowledgements, which must come back in send order.
    for expected in 1..=count {
        let acked_id = conn.recv_ack().await.unwrap();
        assert_eq!(acked_id, expected as u64);
    }
    let total_elapsed = timer.elapsed();

    let total_bytes = count * payload_size;
    let total_secs = total_elapsed.as_secs_f64();
    let throughput_mbps = (total_bytes as f64 / 1024.0 / 1024.0) / total_secs;

    println!("Pipelined ingests:");
    println!(" Count: {}", count);
    println!(" Payload size: {} bytes", payload_size);
    println!(" Send time: {:?}", send_elapsed);
    println!(" Total time: {:?}", total_elapsed);
    println!(
        " Rate: {:.2} msgs/sec",
        count as f64 / total_elapsed.as_secs_f64()
    );
    println!(" Throughput: {:.2} MB/s", throughput_mbps);

    conn.close().await.unwrap();
}
/// Ingests a single 1 MiB payload synchronously and prints the time and throughput.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_large_payload() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("large_payload");
    let created = conn.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;

    let payload_size = 1024 * 1024;
    let body = Bytes::from(vec![0xCD; payload_size]);

    // Only the ingest call itself is timed.
    let timer = Instant::now();
    let ingest_result = conn.send_ingest_sync(body, topic_id).await;
    let elapsed = timer.elapsed();

    assert!(
        ingest_result.is_ok(),
        "Large payload ingest failed: {:?}",
        ingest_result.err()
    );

    let size_mb = payload_size as f64 / 1024.0 / 1024.0;
    let throughput_mbps = size_mb / elapsed.as_secs_f64();

    println!("Large payload ingest:");
    println!(
        " Size: {} bytes ({} MB)",
        payload_size,
        payload_size / 1024 / 1024
    );
    println!(" Time: {:?}", elapsed);
    println!(" Throughput: {:.2} MB/s", throughput_mbps);

    conn.close().await.unwrap();
}
/// Time-boxed throughput benchmark: for five seconds, sends 10 KiB payloads in
/// bursts of up to 100, opportunistically collecting acks between bursts, then
/// waits for every outstanding ack and prints the totals.
///
/// The test makes no assertions about the measured rate; it only fails if a
/// send or ack returns an error.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_throughput_benchmark() {
let mut client = LanceClient::connect(test_config()).await.unwrap();
let topic = client
.create_topic(&unique_topic_name("throughput"))
.await
.unwrap();
let topic_id = topic.id;
let duration_secs = 5;
// One payload buffer is built up front; each send passes a clone of it.
let payload_size = 10 * 1024; let payload = Bytes::from(vec![0xEF; payload_size]);
println!(
"Running throughput benchmark for {} seconds...",
duration_secs
);
println!("Payload size: {} bytes", payload_size);
let start = Instant::now();
let deadline = start + Duration::from_secs(duration_secs);
let mut sent_count = 0u64;
let mut acked_count = 0u64;
while Instant::now() < deadline {
// Send a burst of at most 100 messages, stopping early once the deadline passes.
for _ in 0..100 {
if Instant::now() >= deadline {
break;
}
client.send_ingest(payload.clone(), topic_id).await.unwrap();
sent_count += 1;
}
// Collect whatever acks arrive within 1 ms each; on the first timeout go back to sending.
// NOTE(review): a timed-out `recv_ack` future is dropped here; this assumes
// `recv_ack` can be cancelled mid-await without losing data — TODO confirm.
while acked_count < sent_count {
match tokio::time::timeout(Duration::from_millis(1), client.recv_ack()).await {
Ok(Ok(_)) => acked_count += 1,
Ok(Err(e)) => panic!("Ack error: {:?}", e),
Err(_) => break, }
}
}
// After the deadline, wait (without a timeout) for every remaining ack.
while acked_count < sent_count {
client.recv_ack().await.unwrap();
acked_count += 1;
}
// Elapsed time includes the final ack drain above.
let elapsed = start.elapsed();
let total_bytes = sent_count * payload_size as u64;
let throughput_mbps = (total_bytes as f64 / 1024.0 / 1024.0) / elapsed.as_secs_f64();
let msgs_per_sec = sent_count as f64 / elapsed.as_secs_f64();
println!("\nBenchmark Results:");
println!(" Duration: {:?}", elapsed);
println!(" Messages sent: {}", sent_count);
println!(" Messages acked: {}", acked_count);
println!(
" Total data: {:.2} MB",
total_bytes as f64 / 1024.0 / 1024.0
);
println!(" Throughput: {:.2} MB/s", throughput_mbps);
println!(" Message rate: {:.2} msgs/sec", msgs_per_sec);
client.close().await.unwrap();
}
/// Produces a topic name of the form `<prefix>_<micros>`, where `<micros>` is
/// the current wall-clock time in microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn unique_topic_name(prefix: &str) -> String {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let micros = since_epoch.as_micros();
    format!("{}_{}", prefix, micros)
}
/// Creates a topic, checks that it shows up in `list_topics`, deletes it, and
/// checks that it no longer shows up.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_create_and_list_topics() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("create_list_test");
    let created = conn.create_topic(&topic_name).await.unwrap();
    wait_for_replication().await;

    println!("Created topic: id={}, name={}", created.id, created.name);
    assert_eq!(created.name, topic_name);
    assert!(created.id > 0);

    // The new topic must be listed.
    let listed = conn.list_topics().await.unwrap();
    let is_listed = listed.iter().any(|entry| entry.id == created.id);
    assert!(is_listed, "Created topic should appear in list");
    println!(
        "Topic {} found in list of {} topics",
        created.id,
        listed.len()
    );

    // After deletion it must be gone from the listing.
    conn.delete_topic(created.id).await.unwrap();
    wait_for_replication().await;

    let listed_after = conn.list_topics().await.unwrap();
    let still_listed = listed_after.iter().any(|entry| entry.id == created.id);
    assert!(!still_listed, "Deleted topic should not be in list");

    conn.close().await.unwrap();
}
/// Creates a topic and fetches it back by id, comparing id and name.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_get_topic_by_id() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("get_by_id_test");
    let created = conn.create_topic(&topic_name).await.unwrap();
    wait_for_replication().await;

    let looked_up = conn.get_topic(created.id).await.unwrap();
    assert_eq!(looked_up.id, created.id);
    assert_eq!(looked_up.name, topic_name);
    println!(
        "Fetched topic: id={}, name={}",
        looked_up.id, looked_up.name
    );

    // Clean up the topic created for this test.
    conn.delete_topic(created.id).await.unwrap();
    conn.close().await.unwrap();
}
/// Creates a topic, deletes it, and expects a later `get_topic` to fail.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_delete_topic() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("delete_test");
    let created = conn.create_topic(&topic_name).await.unwrap();
    println!("Created topic for deletion: id={}", created.id);
    wait_for_replication().await;

    conn.delete_topic(created.id).await.unwrap();
    println!("Deleted topic id={}", created.id);
    wait_for_replication().await;

    // Looking up the deleted id must now produce an error.
    let lookup = conn.get_topic(created.id).await;
    assert!(lookup.is_err(), "Topic should not exist after deletion");
    println!("Expected error after deletion: {:?}", lookup.err());

    conn.close().await.unwrap();
}
/// Walks a topic through its lifecycle: create, ingest ten batches, confirm it
/// still exists, delete, and confirm the lookup then fails.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_topic_lifecycle_with_data() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    // Step 1: create.
    let topic_name = unique_topic_name("lifecycle_test");
    let topic = conn.create_topic(&topic_name).await.unwrap();
    println!("1. Created topic: id={}, name={}", topic.id, topic.name);
    wait_for_replication().await;

    // Step 2: ingest, tallying the payload bytes for the summary line.
    let batch_count = 10;
    let mut total_bytes = 0usize;
    for i in 0..batch_count {
        let text = format!("lifecycle test message {} for topic {}", i, topic.id);
        let payload = Bytes::from(text);
        total_bytes += payload.len();
        conn.send_ingest_to_topic_sync(topic.id, payload, 1, None)
            .await
            .unwrap();
    }
    println!(
        "2. Ingested {} batches ({} bytes) to topic {}",
        batch_count, total_bytes, topic.id
    );

    // Step 3: the topic must still be retrievable.
    let looked_up = conn.get_topic(topic.id).await.unwrap();
    assert_eq!(looked_up.id, topic.id);
    println!("3. Topic still exists: id={}", looked_up.id);

    // Step 4: delete.
    conn.delete_topic(topic.id).await.unwrap();
    println!("4. Deleted topic id={}", topic.id);
    wait_for_replication().await;

    // Step 5: the lookup must now fail.
    let after_delete = conn.get_topic(topic.id).await;
    assert!(after_delete.is_err());
    println!("5. Topic confirmed deleted");

    conn.close().await.unwrap();
}
/// Creates three topics and ingests 100 messages into each, alternating between
/// the topics round-robin over one connection, then deletes the topics.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_multiple_topics_concurrent_ingest() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_count = 3;
    let mut topics = Vec::new();
    for n in 0..topic_count {
        let prefix = format!("concurrent_{}", n);
        let name = unique_topic_name(&prefix);
        topics.push(conn.create_topic(&name).await.unwrap());
    }
    println!("Created {} topics", topics.len());

    let messages_per_topic = 100;
    let total_msgs = messages_per_topic * topic_count;

    let timer = Instant::now();
    for msg_idx in 0..total_msgs {
        // Round-robin: message k goes to topic k mod topic_count.
        let target = &topics[msg_idx % topic_count];
        let topic_id = target.id;
        let text = format!("msg {} to topic {}", msg_idx, topic_id);
        conn.send_ingest_to_topic_sync(topic_id, Bytes::from(text), 1, None)
            .await
            .unwrap();
    }
    let elapsed = timer.elapsed();

    println!(
        "Ingested {} messages to {} topics in {:?}",
        total_msgs, topic_count, elapsed
    );
    println!(
        "Rate: {:.2} msgs/sec",
        total_msgs as f64 / elapsed.as_secs_f64()
    );

    for created in &topics {
        conn.delete_topic(created.id).await.unwrap();
    }
    println!("Cleaned up {} topics", topics.len());

    conn.close().await.unwrap();
}
/// Connecting to `127.0.0.1:19999` with a one-second connect timeout must fail.
///
/// This test is not `#[ignore]`d, so it runs without a server; it presumes
/// nothing is listening on that local port.
#[tokio::test]
async fn test_connect_to_invalid_address() {
    let unreachable = ClientConfig {
        addr: "127.0.0.1:19999".to_string(),
        connect_timeout: Duration::from_secs(1),
        ..Default::default()
    };

    let attempt = LanceClient::connect(unreachable).await;
    assert!(attempt.is_err(), "Should fail to connect to invalid address");
    println!("Expected error: {:?}", attempt.err());
}
/// Fetching topic id 999999 (expected not to exist) must return an error.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_get_nonexistent_topic() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let lookup = conn.get_topic(999999).await;
    assert!(lookup.is_err(), "Should fail to get nonexistent topic");
    println!("Expected error: {:?}", lookup.err());

    conn.close().await.unwrap();
}
/// Deleting topic id 999999 (expected not to exist) must return an error.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_delete_nonexistent_topic() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let outcome = conn.delete_topic(999999).await;
    assert!(outcome.is_err(), "Should fail to delete nonexistent topic");
    println!("Expected error: {:?}", outcome.err());

    conn.close().await.unwrap();
}
/// Three connections subscribe to one topic under distinct consumer ids, a
/// fourth connection produces 20 messages, each subscriber commits a different
/// offset, and finally everything is unsubscribed, deleted and closed.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_multiple_subscribers_single_topic() {
    let mut first = LanceClient::connect(test_config()).await.unwrap();
    let mut second = LanceClient::connect(test_config()).await.unwrap();
    let mut third = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("multi_subscriber");
    let shared_topic = first.create_topic(&topic_name).await.unwrap();
    let topic_id = shared_topic.id;
    println!("Created shared topic: id={}, name={}", topic_id, topic_name);
    wait_for_replication().await;

    let consumer_a = 1001u64;
    let consumer_b = 1002u64;
    let consumer_c = 1003u64;
    let max_batch_bytes = 64 * 1024u32;

    // Each connection subscribes from offset 0 under its own consumer id.
    let outcome_a = first
        .subscribe(topic_id, 0, max_batch_bytes, consumer_a)
        .await;
    assert!(
        outcome_a.is_ok(),
        "Client 1 subscribe failed: {:?}",
        outcome_a.err()
    );
    let subscription_a = outcome_a.unwrap();
    println!(
        "Subscriber 1 (consumer_id={}) subscribed at offset {}",
        consumer_a, subscription_a.start_offset
    );

    let outcome_b = second
        .subscribe(topic_id, 0, max_batch_bytes, consumer_b)
        .await;
    assert!(
        outcome_b.is_ok(),
        "Client 2 subscribe failed: {:?}",
        outcome_b.err()
    );
    let subscription_b = outcome_b.unwrap();
    println!(
        "Subscriber 2 (consumer_id={}) subscribed at offset {}",
        consumer_b, subscription_b.start_offset
    );

    let outcome_c = third
        .subscribe(topic_id, 0, max_batch_bytes, consumer_c)
        .await;
    assert!(
        outcome_c.is_ok(),
        "Client 3 subscribe failed: {:?}",
        outcome_c.err()
    );
    let subscription_c = outcome_c.unwrap();
    println!(
        "Subscriber 3 (consumer_id={}) subscribed at offset {}",
        consumer_c, subscription_c.start_offset
    );

    // A separate connection acts as the producer.
    let mut producer = LanceClient::connect(test_config()).await.unwrap();
    let message_count = 20;
    for n in 0..message_count {
        let text = format!("multi-sub message {}", n);
        producer
            .send_ingest_to_topic_sync(topic_id, Bytes::from(text), 1, None)
            .await
            .unwrap();
    }
    println!("Ingested {} messages to topic {}", message_count, topic_id);

    // Every subscriber commits its own, different offset.
    first
        .commit_offset(topic_id, consumer_a, 5)
        .await
        .unwrap();
    println!("Subscriber 1 committed offset 5");
    second
        .commit_offset(topic_id, consumer_b, 10)
        .await
        .unwrap();
    println!("Subscriber 2 committed offset 10");
    third
        .commit_offset(topic_id, consumer_c, 15)
        .await
        .unwrap();
    println!("Subscriber 3 committed offset 15");

    // Teardown: unsubscribe, delete the topic, close all four connections.
    first.unsubscribe(topic_id, consumer_a).await.unwrap();
    second.unsubscribe(topic_id, consumer_b).await.unwrap();
    third.unsubscribe(topic_id, consumer_c).await.unwrap();
    println!("All subscribers unsubscribed");

    producer.delete_topic(topic_id).await.unwrap();
    println!("Deleted topic {}", topic_id);

    first.close().await.unwrap();
    second.close().await.unwrap();
    third.close().await.unwrap();
    producer.close().await.unwrap();
}
/// A consumer subscribes, commits offset 5, disconnects, and then re-subscribes
/// at offset 5 over a new connection under the same consumer id.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_subscriber_resume_after_disconnect() {
    let mut original = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("resume_test");
    let created = original.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;
    let consumer_id = 2001u64;
    wait_for_replication().await;

    let max_batch_bytes = 64 * 1024u32;
    original
        .subscribe(topic_id, 0, max_batch_bytes, consumer_id)
        .await
        .unwrap();
    println!("1. Subscribed to topic {}", topic_id);

    for n in 0..10 {
        let text = format!("resume test message {}", n);
        original
            .send_ingest_to_topic_sync(topic_id, Bytes::from(text), 1, None)
            .await
            .unwrap();
    }
    println!("2. Ingested 10 messages");

    original
        .commit_offset(topic_id, consumer_id, 5)
        .await
        .unwrap();
    println!("3. Committed offset 5");

    // Drop the first session entirely.
    original.unsubscribe(topic_id, consumer_id).await.unwrap();
    original.close().await.unwrap();
    println!("4. Disconnected");

    // A brand-new connection subscribes again, starting at offset 5.
    let mut resumed = LanceClient::connect(test_config()).await.unwrap();
    let resubscribe = resumed
        .subscribe(topic_id, 5, max_batch_bytes, consumer_id)
        .await;
    assert!(
        resubscribe.is_ok(),
        "Re-subscribe failed: {:?}",
        resubscribe.err()
    );
    let subscription = resubscribe.unwrap();
    println!(
        "5. Re-subscribed, resumed at offset {}",
        subscription.start_offset
    );

    resumed.delete_topic(topic_id).await.unwrap();
    resumed.close().await.unwrap();
    println!("6. Cleanup complete");
}
/// Checks that a `Consumer` backed by a `LockFileOffsetStore` persists its
/// committed offset to disk and that a new consumer built on the same store
/// directory starts from that offset.
///
/// Flow: ingest 20 messages, create a consumer, seek to offset 10 and commit,
/// drop it, reopen the store to read the stored offset directly, then build a
/// second consumer and compare its `current_offset()`.
///
/// NOTE(review): unlike the other tests in this file, `#[ignore]` carries no
/// reason string here.
#[tokio::test]
#[ignore] async fn test_consumer_offset_store_persistence() {
use lnc_client::{Consumer, ConsumerConfig, LockFileOffsetStore, OffsetStore};
use std::sync::Arc;
// Offsets are written under a temporary directory that lives for the whole test.
let temp_dir = tempfile::tempdir().unwrap();
let offset_dir = temp_dir.path();
let mut client1 = LanceClient::connect(test_config()).await.unwrap();
let topic_name = unique_topic_name("offset_store_test");
let topic = client1.create_topic(&topic_name).await.unwrap();
let topic_id = topic.id;
let consumer_id = 3001u64;
let consumer_name = "test-consumer";
wait_for_replication().await;
for i in 0..20 {
let payload = Bytes::from(format!("offset store message {}", i));
client1
.send_ingest_to_topic_sync(topic_id, payload, 1, None)
.await
.unwrap();
}
println!("1. Ingested 20 messages to topic {}", topic_id);
// First consumer: scoped so the consumer (and its store handle) is dropped at the closing brace.
{
let client2 = LanceClient::connect(test_config()).await.unwrap();
let offset_store = LockFileOffsetStore::open(offset_dir, consumer_name).unwrap();
let config = ConsumerConfig::new(topic_id);
let mut consumer = Consumer::with_offset_store(
client2,
&get_test_addr(),
config,
consumer_id,
Arc::new(offset_store),
)
.unwrap();
// NOTE(review): the value returned by `poll()` is dropped without being awaited
// or inspected; if `poll` is an async fn this does not actually run it — TODO confirm intent.
drop(consumer.poll());
println!("2. Consumer created and polled");
consumer.seek_to_offset(10).await.unwrap();
consumer.commit().await.unwrap();
println!("3. Committed offset at position 10");
}
println!("4. Consumer disconnected");
// Reopen the store on its own and read back the offset that was committed above.
{
let offset_store = LockFileOffsetStore::open(offset_dir, consumer_name).unwrap();
let stored = offset_store.load(topic_id, consumer_id).unwrap();
assert_eq!(stored, Some(10), "Offset should be persisted to file");
println!("5. Verified persisted offset = 10");
}
// Second consumer on the same store directory: it should start at the stored offset.
{
let client3 = LanceClient::connect(test_config()).await.unwrap();
let offset_store = LockFileOffsetStore::open(offset_dir, consumer_name).unwrap();
let config = ConsumerConfig::new(topic_id);
let consumer = Consumer::with_offset_store(
client3,
&get_test_addr(),
config,
consumer_id, Arc::new(offset_store),
)
.unwrap();
assert_eq!(
consumer.current_offset(),
10,
"Consumer should resume from stored offset"
);
println!(
"6. New consumer resumed at offset {}",
consumer.current_offset()
);
}
client1.delete_topic(topic_id).await.unwrap();
client1.close().await.unwrap();
println!("7. Cleanup complete");
}
/// Looks up the address of cluster node `node` from the environment variable
/// `LANCE_NODE<node>_ADDR`, returning `None` when it cannot be read.
fn get_node_addr(node: u8) -> Option<String> {
    match std::env::var(format!("LANCE_NODE{}_ADDR", node)) {
        Ok(addr) => Some(addr),
        Err(_) => None,
    }
}
/// Builds a `ClientConfig` for cluster node `node`, or `None` when
/// `get_node_addr(node)` finds no address for it.
///
/// Timeouts are 5 s connect, 10 s read, 5 s write and a 10 s keepalive
/// interval; TLS is disabled.
fn cluster_config(node: u8) -> Option<ClientConfig> {
    let addr = get_node_addr(node)?;
    let five_secs = Duration::from_secs(5);
    let ten_secs = Duration::from_secs(10);
    Some(ClientConfig {
        addr,
        connect_timeout: five_secs,
        read_timeout: ten_secs,
        write_timeout: five_secs,
        keepalive_interval: ten_secs,
        tls: None,
    })
}
/// Writes ten messages through node 1, waits 500 ms, then reports whether the
/// topic is visible through node 2.
///
/// Visibility on node 2 is only printed, not asserted; the test fails only if
/// one of the unwrapped client calls errors.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_write_to_leader_read_from_follower() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");

    let mut node1 = LanceClient::connect(node1_config).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config).await.unwrap();
    println!("Connected to node 1 and node 2");

    let topic_name = unique_topic_name("cluster_test");
    let created = node1.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} on node 1", topic_id);

    let message_count = 10;
    for n in 0..message_count {
        let text = format!("cluster message {}", n);
        node1
            .send_ingest_to_topic_sync(topic_id, Bytes::from(text), 1, None)
            .await
            .unwrap();
    }
    println!("Ingested {} messages to node 1", message_count);

    // Fixed pause before looking at node 2.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let lookup = node2.get_topic(topic_id).await;
    if lookup.is_err() {
        println!(
            "Topic {} not yet replicated to node 2: {:?}",
            topic_id,
            lookup.err()
        );
    } else {
        println!("Topic {} found on node 2 (replicated)", topic_id);
    }

    let node2_topics = node2.list_topics().await.unwrap();
    println!("Node 2 has {} topics", node2_topics.len());

    node1.delete_topic(topic_id).await.unwrap();
    node1.close().await.unwrap();
    node2.close().await.unwrap();
    println!("Cleanup complete");
}
/// Creates a topic through node 1, waits one second, and lists topics on all
/// three nodes.
///
/// Only visibility on node 1 is asserted; visibility on nodes 2 and 3 is printed.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_all_nodes_see_topics() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");
    let node3_config = cluster_config(3).expect("LANCE_NODE3_ADDR not set");

    let mut node1 = LanceClient::connect(node1_config).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config).await.unwrap();
    let mut node3 = LanceClient::connect(node3_config).await.unwrap();
    println!("Connected to all 3 nodes");

    let topic_name = unique_topic_name("cluster_visibility");
    let created = node1.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} on node 1", topic_id);

    tokio::time::sleep(Duration::from_secs(1)).await;

    let listing1 = node1.list_topics().await.unwrap();
    let listing2 = node2.list_topics().await.unwrap();
    let listing3 = node3.list_topics().await.unwrap();
    println!(
        "Topic counts: node1={}, node2={}, node3={}",
        listing1.len(),
        listing2.len(),
        listing3.len()
    );

    let visible_on_1 = listing1.iter().any(|entry| entry.id == topic_id);
    let visible_on_2 = listing2.iter().any(|entry| entry.id == topic_id);
    let visible_on_3 = listing3.iter().any(|entry| entry.id == topic_id);
    println!(
        "Topic {} visible: node1={}, node2={}, node3={}",
        topic_id, visible_on_1, visible_on_2, visible_on_3
    );
    assert!(visible_on_1, "Topic should be visible on node 1");

    node1.delete_topic(topic_id).await.unwrap();
    node1.close().await.unwrap();
    node2.close().await.unwrap();
    node3.close().await.unwrap();
}
/// Creates and then deletes a topic through node 1, printing whether node 2
/// lists it before and after the delete (with a one-second pause each time).
///
/// The observations are printed only; nothing about node 2's view is asserted.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_topic_delete_replication() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");

    let mut node1 = LanceClient::connect(node1_config).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config).await.unwrap();
    println!("Connected to node 1 and node 2");

    let topic_name = unique_topic_name("delete_replication");
    let created = node1.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} on node 1", topic_id);

    tokio::time::sleep(Duration::from_secs(1)).await;
    let listing_before = node2.list_topics().await.unwrap();
    let exists_on_node2 = listing_before.iter().any(|entry| entry.id == topic_id);
    println!(
        "Topic {} exists on node 2 before delete: {}",
        topic_id, exists_on_node2
    );

    node1.delete_topic(topic_id).await.unwrap();
    println!("Deleted topic {} on node 1", topic_id);

    tokio::time::sleep(Duration::from_secs(1)).await;
    let listing_after = node2.list_topics().await.unwrap();
    let still_exists = listing_after.iter().any(|entry| entry.id == topic_id);
    println!(
        "Topic {} exists on node 2 after delete: {}",
        topic_id, still_exists
    );

    node1.close().await.unwrap();
    node2.close().await.unwrap();
}
/// Attempts to create a topic through node 2 and accepts either outcome:
/// success, or an error whose debug text mentions `NOT_LEADER` or `redirect`.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_write_routing_from_follower() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");

    let mut node1 = LanceClient::connect(node1_config).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config).await.unwrap();
    println!("Connected to node 1 and node 2");

    let topic_name = unique_topic_name("write_routing");
    let created = node1.create_topic(&topic_name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} on node 1", topic_id);

    let topic2_name = unique_topic_name("follower_create");
    match node2.create_topic(&topic2_name).await {
        Ok(via_node2) => {
            println!(
                "Topic {} created via node 2 (redirected to leader)",
                via_node2.id
            );
            // The extra topic is removed through node 1.
            node1.delete_topic(via_node2.id).await.unwrap();
        }
        Err(failure) => {
            let err_str = format!("{:?}", failure);
            println!("Create from follower returned: {}", err_str);
            let is_routing_error =
                err_str.contains("NOT_LEADER") || err_str.contains("redirect");
            assert!(
                is_routing_error,
                "Expected NOT_LEADER error, got: {}",
                err_str
            );
        }
    }

    node1.delete_topic(topic_id).await.unwrap();
    node1.close().await.unwrap();
    node2.close().await.unwrap();
}
/// Creates one topic per node (nodes 1 and 2), then runs two spawned tasks in
/// parallel, each opening its own connection and ingesting 50 messages into
/// its node's topic. Prints the combined rate and cleans up.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_concurrent_writes_multiple_nodes() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");

    // Control connections used for topic creation and cleanup.
    let mut node1 = LanceClient::connect(node1_config.clone()).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config.clone()).await.unwrap();

    let topic1_name = unique_topic_name("concurrent_node1");
    let topic2_name = unique_topic_name("concurrent_node2");
    let topic1 = node1.create_topic(&topic1_name).await.unwrap();
    let topic2 = node2.create_topic(&topic2_name).await.unwrap();
    println!(
        "Created topic {} on node 1, topic {} on node 2",
        topic1.id, topic2.id
    );

    let timer = Instant::now();
    let messages_per_client = 50;

    // Writer task for node 1, with its own dedicated connection.
    let node1_writer = {
        let topic_id = topic1.id;
        let config = node1_config.clone();
        tokio::spawn(async move {
            let mut writer = LanceClient::connect(config).await.unwrap();
            for n in 0..messages_per_client {
                let text = format!("node1 msg {}", n);
                writer
                    .send_ingest_to_topic_sync(topic_id, Bytes::from(text), 1, None)
                    .await
                    .unwrap();
            }
            writer.close().await.unwrap();
        })
    };

    // Writer task for node 2, with its own dedicated connection.
    let node2_writer = {
        let topic_id = topic2.id;
        let config = node2_config.clone();
        tokio::spawn(async move {
            let mut writer = LanceClient::connect(config).await.unwrap();
            for n in 0..messages_per_client {
                let text = format!("node2 msg {}", n);
                writer
                    .send_ingest_to_topic_sync(topic_id, Bytes::from(text), 1, None)
                    .await
                    .unwrap();
            }
            writer.close().await.unwrap();
        })
    };

    // Unwrapping the join results re-raises a panic from either task.
    node1_writer.await.unwrap();
    node2_writer.await.unwrap();
    let elapsed = timer.elapsed();

    let total_messages = messages_per_client * 2;
    println!(
        "Concurrent writes complete: {} messages in {:?}",
        total_messages, elapsed
    );
    println!(
        "Rate: {:.2} msgs/sec",
        total_messages as f64 / elapsed.as_secs_f64()
    );

    node1.delete_topic(topic1.id).await.unwrap();
    node2.delete_topic(topic2.id).await.unwrap();
    node1.close().await.unwrap();
    node2.close().await.unwrap();
}
/// Sets a retention policy (3600 s, 100 MiB) on a new topic and checks that
/// the topic can still be fetched afterwards.
///
/// The fetched topic info is printed; the retention values themselves are not
/// compared.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_set_retention_policy() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("retention_set");
    let topic = conn.create_topic(&topic_name).await.unwrap();
    println!("Created topic {} with id {}", topic_name, topic.id);

    let max_age_secs = 3600;
    let max_bytes = 100 * 1024 * 1024;
    let set_outcome = conn.set_retention(topic.id, max_age_secs, max_bytes).await;
    assert!(
        set_outcome.is_ok(),
        "Failed to set retention: {:?}",
        set_outcome.err()
    );
    println!(
        "Set retention policy: max_age={}s, max_bytes={}",
        max_age_secs, max_bytes
    );

    let lookup = conn.get_topic(topic.id).await;
    assert!(lookup.is_ok(), "Failed to get topic: {:?}", lookup.err());
    let details = lookup.unwrap();
    println!("Topic info after retention set: {:?}", details);

    conn.delete_topic(topic.id).await.unwrap();
    conn.close().await.unwrap();
}
/// Creates a topic with a retention policy (7200 s, 50 MiB) in one call and
/// checks that the topic can be fetched.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_create_topic_with_retention() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("retention_create");
    let max_age_secs = 7200;
    let max_bytes = 50 * 1024 * 1024;

    let creation = conn
        .create_topic_with_retention(&topic_name, max_age_secs, max_bytes)
        .await;
    assert!(
        creation.is_ok(),
        "Failed to create topic with retention: {:?}",
        creation.err()
    );
    let topic = creation.unwrap();
    println!(
        "Created topic {} with id {} and retention policy",
        topic_name, topic.id
    );

    let lookup = conn.get_topic(topic.id).await;
    assert!(lookup.is_ok(), "Failed to get topic: {:?}", lookup.err());

    conn.delete_topic(topic.id).await.unwrap();
    conn.close().await.unwrap();
}
/// Sets a retention policy on a topic and then overwrites it with different
/// values; both calls must succeed.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_retention_policy_update() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let topic_name = unique_topic_name("retention_update");
    let topic = conn.create_topic(&topic_name).await.unwrap();

    let initial = conn.set_retention(topic.id, 3600, 100_000_000).await;
    assert!(initial.is_ok());
    println!("Initial retention set");

    let updated = conn.set_retention(topic.id, 1800, 50_000_000).await;
    assert!(
        updated.is_ok(),
        "Failed to update retention: {:?}",
        updated.err()
    );
    println!("Retention updated successfully");

    conn.delete_topic(topic.id).await.unwrap();
    conn.close().await.unwrap();
}
/// Setting retention on topic id 99999 (expected not to exist) must return an error.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_retention_on_nonexistent_topic() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let outcome = conn.set_retention(99999, 3600, 100_000_000).await;
    assert!(outcome.is_err(), "Expected error for nonexistent topic");
    println!(
        "Correctly rejected retention on nonexistent topic: {:?}",
        outcome.err()
    );

    conn.close().await.unwrap();
}
/// Sets retention on a topic through node 1 and, after
/// `wait_for_replication()`, requires the topic to be fetchable through node 2.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_retention_replication() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");

    let mut node1 = LanceClient::connect(node1_config).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config).await.unwrap();

    let topic_name = unique_topic_name("retention_cluster");
    let topic = node1.create_topic(&topic_name).await.unwrap();
    println!("Created topic {} on node 1", topic.id);

    let set_outcome = node1.set_retention(topic.id, 3600, 100_000_000).await;
    assert!(
        set_outcome.is_ok(),
        "Failed to set retention: {:?}",
        set_outcome.err()
    );
    println!("Set retention on node 1");

    wait_for_replication().await;

    let lookup = node2.get_topic(topic.id).await;
    assert!(
        lookup.is_ok(),
        "Topic should be visible on node 2: {:?}",
        lookup.err()
    );
    println!("Topic with retention visible on node 2");

    node1.delete_topic(topic.id).await.unwrap();
    node1.close().await.unwrap();
    node2.close().await.unwrap();
}
/// Fetches the cluster status from the server under test and prints its
/// node id, leader flag and node count. No values are asserted.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_get_cluster_status() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let report = conn.get_cluster_status().await.unwrap();
    println!("Cluster status: {:?}", report);
    println!("Node ID: {}", report.node_id);
    println!("Is Leader: {}", report.is_leader);
    println!("Node Count: {}", report.node_count);

    conn.close().await.unwrap();
}
/// Queries the cluster status on all three nodes and requires that exactly one
/// of them reports itself as leader.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_status_from_multiple_nodes() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");
    let node3_config = cluster_config(3).expect("LANCE_NODE3_ADDR not set");

    let mut node1 = LanceClient::connect(node1_config).await.unwrap();
    let mut node2 = LanceClient::connect(node2_config).await.unwrap();
    let mut node3 = LanceClient::connect(node3_config).await.unwrap();

    let report1 = node1.get_cluster_status().await.unwrap();
    let report2 = node2.get_cluster_status().await.unwrap();
    let report3 = node3.get_cluster_status().await.unwrap();

    println!("Node 1: is_leader={}", report1.is_leader);
    println!("Node 2: is_leader={}", report2.is_leader);
    println!("Node 3: is_leader={}", report3.is_leader);

    // Count how many of the three leader flags are set.
    let leader_flags = [report1.is_leader, report2.is_leader, report3.is_leader];
    let leader_count = leader_flags.iter().filter(|flag| **flag).count();
    assert_eq!(leader_count, 1, "Exactly one node should be leader");

    node1.close().await.unwrap();
    node2.close().await.unwrap();
    node3.close().await.unwrap();
}
/// Creates a topic and performs a single synchronous write before any failover.
///
/// The write goes through node 1 when it reports itself as leader and through
/// node 2 otherwise (node 3 is not consulted). No failover is triggered in this
/// test; only the pre-failover write is asserted.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_cluster_failover_write_continuity() {
    let node1_config = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let node2_config = cluster_config(2).expect("LANCE_NODE2_ADDR not set");
    // The configs are not used after connecting, so they are moved, not cloned.
    let mut client1 = LanceClient::connect(node1_config).await.unwrap();
    let mut client2 = LanceClient::connect(node2_config).await.unwrap();

    let topic_name = unique_topic_name("failover_test");
    let topic = client1.create_topic(&topic_name).await.unwrap();
    println!("Created topic {} for failover test", topic.id);
    wait_for_replication().await;

    let status1 = client1.get_cluster_status().await.unwrap();
    let status2 = client2.get_cluster_status().await.unwrap();
    println!("Node 1 is_leader: {}", status1.is_leader);
    println!("Node 2 is_leader: {}", status2.is_leader);

    // Pick the connection once so the write call is not duplicated per branch
    // and the payload can be moved without cloning.
    let writer = if status1.is_leader {
        &mut client1
    } else {
        &mut client2
    };
    let test_data = bytes::Bytes::from_static(b"pre-failover data");
    let write_result = writer
        .send_ingest_to_topic_sync(topic.id, test_data, 1, None)
        .await;
    // Include the error so a failing run shows why the write was rejected.
    assert!(
        write_result.is_ok(),
        "Pre-failover write should succeed: {:?}",
        write_result.err()
    );
    println!("Pre-failover write succeeded");

    client1.delete_topic(topic.id).await.unwrap();
    client1.close().await.unwrap();
    client2.close().await.unwrap();
}
/// Sends one synchronous write through a node that does not report itself as
/// leader and logs the outcome.
///
/// The write result is only printed, never asserted, so the test passes
/// whether or not the write is accepted.
#[tokio::test]
#[ignore = "requires 3-node LANCE cluster"]
async fn test_write_forwarding_from_follower() {
    let cfg_a = cluster_config(1).expect("LANCE_NODE1_ADDR not set");
    let cfg_b = cluster_config(2).expect("LANCE_NODE2_ADDR not set");
    let cfg_c = cluster_config(3).expect("LANCE_NODE3_ADDR not set");
    let mut client1 = LanceClient::connect(cfg_a).await.unwrap();
    let mut client2 = LanceClient::connect(cfg_b).await.unwrap();
    let mut client3 = LanceClient::connect(cfg_c).await.unwrap();

    let name = unique_topic_name("forward_test");
    let created = client1.create_topic(&name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} for write forwarding test", topic_id);
    wait_for_replication().await;

    let first = client1.get_cluster_status().await.unwrap();
    let second = client2.get_cluster_status().await.unwrap();
    // Node 3's status is fetched (and must succeed) but its value is not inspected.
    let _third = client3.get_cluster_status().await.unwrap();

    // First non-leader among nodes 1 and 2; node 3 is used when both claim leadership.
    let follower = match (first.is_leader, second.is_leader) {
        (false, _) => &mut client1,
        (true, false) => &mut client2,
        (true, true) => &mut client3,
    };

    let payload = bytes::Bytes::from_static(b"forwarded write data");
    let outcome = follower
        .send_ingest_to_topic_sync(topic_id, payload, 1, None)
        .await;
    if let Err(e) = &outcome {
        println!("Write forward result: {:?}", e);
    } else {
        println!("Write forwarded successfully");
    }

    client1.delete_topic(topic_id).await.unwrap();
    client1.close().await.unwrap();
    client2.close().await.unwrap();
    client3.close().await.unwrap();
}
/// Writes ten single-record batches to a fresh topic and then fetches the
/// topic info.
///
/// No server restart is performed by this test itself; it only asserts that
/// each write succeeds and that the topic can still be fetched afterwards.
#[tokio::test]
#[ignore = "requires running LANCE server with WAL enabled"]
async fn test_wal_replay_after_restart() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let name = unique_topic_name("wal_test");
    let created = conn.create_topic(&name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} for WAL test", topic_id);

    for batch in 0..10 {
        let payload = bytes::Bytes::from(format!("wal_test_batch_{}", batch));
        let outcome = conn
            .send_ingest_to_topic_sync(topic_id, payload, 1, None)
            .await;
        assert!(outcome.is_ok(), "Write {} should succeed", batch);
    }
    println!("Wrote 10 batches to topic");

    let details = conn.get_topic(topic_id).await.unwrap();
    println!("Topic info after writes: {:?}", details);

    conn.delete_topic(topic_id).await.unwrap();
    conn.close().await.unwrap();
}
/// Issues 1000 synchronous 256-byte writes and tallies successes versus
/// backpressure rejections.
///
/// A failure counts as backpressure when its `Debug` rendering contains
/// "backpressure" or "Backpressure"; other failures are only printed. The test
/// asserts that at least one write succeeded.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_backpressure_under_load() {
    let mut conn = LanceClient::connect(test_config()).await.unwrap();

    let name = unique_topic_name("backpressure_test");
    let created = conn.create_topic(&name).await.unwrap();
    let topic_id = created.id;
    println!("Created topic {} for backpressure test", topic_id);

    let mut success_count = 0u32;
    let mut backpressure_count = 0u32;
    let payload = bytes::Bytes::from(vec![0u8; 256]);

    for attempt in 0..1000 {
        let outcome = conn
            .send_ingest_to_topic_sync(topic_id, payload.clone(), 1, None)
            .await;
        let failure = match outcome {
            Ok(_) => {
                success_count += 1;
                continue;
            },
            Err(failure) => failure,
        };

        // Classify the failure by its Debug text.
        let rendered = format!("{:?}", failure);
        if rendered.contains("backpressure") || rendered.contains("Backpressure") {
            backpressure_count += 1;
        } else {
            println!("Write {} failed: {:?}", attempt, failure);
        }
    }

    println!(
        "Backpressure test: {} succeeded, {} backpressured",
        success_count, backpressure_count
    );
    assert!(success_count > 0, "At least some writes should succeed");

    conn.delete_topic(topic_id).await.unwrap();
    conn.close().await.unwrap();
}
/// Creates a topic, writes one record and cleans up over a connection built
/// from `test_config()`.
///
/// A connection failure is only logged, so the test passes when the connect
/// fails.
/// NOTE(review): `test_config()` in this file sets `tls: None` — confirm
/// whether this test is meant to build a TLS-enabled config instead.
#[tokio::test]
#[ignore = "requires LANCE server with TLS enabled and certificates"]
async fn test_tls_encrypted_connection() {
    let config = test_config();
    let mut client = match LanceClient::connect(config).await {
        Ok(client) => client,
        Err(e) => {
            println!(
                "TLS connection failed (expected if server not TLS-enabled): {:?}",
                e
            );
            return;
        },
    };

    let name = unique_topic_name("tls_test");
    let created = client.create_topic(&name).await;
    assert!(
        created.is_ok(),
        "Should create topic over TLS: {:?}",
        created.err()
    );
    let topic = created.unwrap();
    println!("Created topic {} over TLS connection", topic.id);

    let payload = bytes::Bytes::from_static(b"TLS encrypted data");
    let written = client
        .send_ingest_to_topic_sync(topic.id, payload, 1, None)
        .await;
    assert!(
        written.is_ok(),
        "Should write over TLS: {:?}",
        written.err()
    );
    println!("Wrote data over TLS connection");

    client.delete_topic(topic.id).await.unwrap();
    client.close().await.unwrap();
    println!("TLS connection test passed");
}
/// Connects using `test_config()` and, if the connection succeeds, asserts
/// that a ping succeeds. A connection failure is only logged.
///
/// NOTE(review): `test_config()` in this file sets `tls: None`, so no client
/// certificate is configured here — confirm the intended setup.
#[tokio::test]
#[ignore = "requires LANCE server with mTLS enabled"]
async fn test_mtls_client_certificate() {
    let config = test_config();
    let mut client = match LanceClient::connect(config).await {
        Ok(client) => client,
        Err(e) => {
            println!("mTLS connection failed: {:?}", e);
            return;
        },
    };

    println!("mTLS connection established");
    let pong = client.ping().await;
    assert!(pong.is_ok(), "Ping should succeed over mTLS");
    println!("Ping latency: {:?}", pong.unwrap());
    client.close().await.unwrap();
}
/// Builds a `ClientConfig` with `with_tls(TlsClientConfig::new())`, asserts the
/// config reports TLS as enabled, and pings the server if the connection
/// succeeds. A connection failure is only logged.
#[tokio::test]
#[ignore = "requires LANCE server with TLS enabled"]
async fn test_tls_with_client_config_integration() {
    use lnc_client::TlsClientConfig;

    let config = ClientConfig::new(get_test_addr()).with_tls(TlsClientConfig::new());
    assert!(config.is_tls_enabled(), "TLS should be enabled in config");

    let mut client = match LanceClient::connect(config).await {
        Ok(client) => client,
        Err(e) => {
            println!("TLS connection via ClientConfig failed: {:?}", e);
            return;
        },
    };

    let pong = client.ping().await;
    assert!(pong.is_ok(), "Ping over TLS should succeed");
    println!("TLS via ClientConfig integration: ping {:?}", pong.unwrap());
    client.close().await.unwrap();
}
/// Asserts that connecting fails when the TLS config points at the CA path
/// `/nonexistent/ca.pem`.
#[tokio::test]
#[ignore = "requires LANCE server with TLS enabled"]
async fn test_tls_certificate_validation() {
    use lnc_client::TlsClientConfig;

    let server_addr = get_test_addr();
    let bad_tls = TlsClientConfig::new().with_ca_cert("/nonexistent/ca.pem");
    let config = ClientConfig::new(server_addr).with_tls(bad_tls);

    let attempt = LanceClient::connect(config).await;
    assert!(attempt.is_err(), "Connection with invalid CA should fail");
    println!("Certificate validation correctly rejected invalid CA");
}
/// Connects using `test_config()` and, when the cluster status can be fetched,
/// asserts the reported node count is at least one.
///
/// Both a failed connection and a failed status query are only logged.
/// NOTE(review): `test_config()` in this file sets `tls: None` — confirm
/// whether a TLS-enabled config was intended here.
#[tokio::test]
#[ignore = "requires LANCE cluster with TLS enabled"]
async fn test_tls_cluster_communication() {
    let config = test_config();
    let mut client = match LanceClient::connect(config).await {
        Ok(client) => client,
        Err(e) => {
            println!("Cluster TLS connection failed: {:?}", e);
            return;
        },
    };

    match client.get_cluster_status().await {
        Ok(cluster) => {
            println!(
                "Cluster status over TLS: {} nodes, leader: {:?}",
                cluster.node_count, cluster.leader_id
            );
            assert!(cluster.node_count >= 1, "Should have at least one node");
        },
        Err(e) => {
            println!("Cluster query failed (may be single-node): {:?}", e);
        },
    }

    client.close().await.unwrap();
}
/// Builds a pool (max 5 connections, min idle 1), pings through one pooled
/// connection and asserts the pool stats report at least one created connection.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_connection_pool_basic() {
    use lnc_client::{ConnectionPool, ConnectionPoolConfig};

    let server_addr = get_test_addr();
    let pool_cfg = ConnectionPoolConfig::new()
        .with_max_connections(5)
        .with_min_idle(1);
    let pool = ConnectionPool::new(&server_addr, pool_cfg).await.unwrap();

    let mut pooled = pool.get().await.unwrap();
    let rtt = pooled.ping().await.unwrap();
    println!("Pool connection ping latency: {:?}", rtt);

    // Stats are read while the connection is still checked out.
    let stats = pool.stats();
    assert!(
        stats.connections_created >= 1,
        "Should have created at least one connection"
    );
    println!("Pool stats: {:?}", stats);

    // Release the connection before closing the pool.
    drop(pooled);
    pool.close().await;
}
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_connection_pool_concurrent_access() {
use lnc_client::{ConnectionPool, ConnectionPoolConfig};
use std::sync::Arc;
let addr = get_test_addr();
let config = ConnectionPoolConfig::new()
.with_max_connections(3)
.with_acquire_timeout(Duration::from_secs(5));
let pool = Arc::new(ConnectionPool::new(&addr, config).await.unwrap());
let mut handles = vec![];
for i in 0..5 {
let pool = pool.clone();
handles.push(tokio::spawn(async move {
let mut conn = pool.get().await.unwrap();
let latency = conn.ping().await.unwrap();
println!("Task {} ping latency: {:?}", i, latency);
}));
}
for handle in handles {
handle.await.unwrap();
}
let stats = pool.stats();
println!("Concurrent access stats: {:?}", stats);
pool.close().await;
}
/// Connects a `ReconnectingClient`, pings through its inner client, and checks
/// that it reports the original address and zero reconnect attempts.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_reconnecting_client_basic() {
    use lnc_client::ReconnectingClient;

    let server_addr = get_test_addr();
    let mut reconnecting = ReconnectingClient::connect(&server_addr).await.unwrap();

    let rtt = reconnecting
        .client()
        .await
        .unwrap()
        .ping()
        .await
        .unwrap();
    println!("ReconnectingClient ping latency: {:?}", rtt);

    assert_eq!(reconnecting.original_addr(), &server_addr);
    assert_eq!(reconnecting.reconnect_attempts(), 0);
}
/// Sets a leader address on a `ReconnectingClient` and checks that
/// `leader_addr()` returns it.
///
/// No actual failover is exercised; only the setter/getter round trip with
/// the fixed address 127.0.0.1:1993 is asserted.
#[tokio::test]
#[ignore = "requires LANCE cluster for failover testing"]
async fn test_reconnecting_client_leader_failover() {
    use lnc_client::ReconnectingClient;
    use std::net::SocketAddr;

    let server_addr = get_test_addr();
    let connected = ReconnectingClient::connect(&server_addr).await.unwrap();
    let mut reconnecting = connected.with_max_attempts(3).with_follow_leader(true);

    let promoted = "127.0.0.1:1993".parse::<SocketAddr>().unwrap();
    reconnecting.set_leader_addr(promoted);
    assert_eq!(reconnecting.leader_addr(), Some(promoted));
    println!("Leader address updated to {:?}", reconnecting.leader_addr());
}
/// Asserts that connecting to 127.0.0.1:59999 fails.
///
/// This test is not `#[ignore]`d, so it runs without a LANCE server. It assumes
/// nothing is listening on local port 59999; the 100 ms connect timeout keeps
/// the failure fast.
#[tokio::test]
async fn test_connection_to_invalid_address() {
    let config = ClientConfig {
        // `addr` is a `String`, so build it directly rather than going through
        // an infallible `parse().unwrap()`.
        addr: "127.0.0.1:59999".to_string(),
        connect_timeout: Duration::from_millis(100),
        read_timeout: Duration::from_secs(1),
        write_timeout: Duration::from_secs(1),
        keepalive_interval: Duration::from_secs(10),
        tls: None,
    };
    let result = LanceClient::connect(config).await;
    assert!(result.is_err(), "Should fail to connect to invalid address");
    println!("Connection to invalid address correctly failed");
}
/// Sends ten records through a `Producer` and checks every acknowledgement
/// plus the producer's aggregate metrics.
///
/// Each ack must carry a positive `batch_id` and the topic id written to;
/// afterwards the metrics must show at least ten records and non-zero bytes sent.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_producer_connect_and_send() {
    use lnc_client::{Producer, ProducerConfig};

    let producer_cfg = ProducerConfig::new()
        .with_batch_size(1024)
        .with_linger_ms(10);
    let server_addr = get_test_addr();
    let producer = Producer::connect(&server_addr, producer_cfg).await.unwrap();

    // A separate plain client manages the topic lifecycle.
    let mut admin = LanceClient::connect(test_config()).await.unwrap();
    let name = unique_topic_name("producer_test");
    let topic_id = admin.create_topic(&name).await.unwrap().id;

    for seq in 0..10 {
        let payload = format!("message-{}", seq);
        let ack = producer.send(topic_id, payload.as_bytes()).await.unwrap();
        assert!(ack.batch_id > 0);
        assert_eq!(ack.topic_id, topic_id);
    }

    let totals = producer.metrics();
    assert!(totals.records_sent >= 10);
    assert!(totals.bytes_sent > 0);

    producer.close().await.unwrap();
    admin.delete_topic(topic_id).await.unwrap();
    admin.close().await.unwrap();
}
/// Queues five records with `send_async`, asserts the five calls together
/// finish in under 100 ms, then flushes and checks at least one batch was sent.
///
/// The producer is configured with a 16 KiB batch size and a 1000 ms linger.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_producer_batching_and_flush() {
    use lnc_client::{Producer, ProducerConfig};

    let producer_cfg = ProducerConfig::new()
        .with_batch_size(16 * 1024)
        .with_linger_ms(1000);
    let server_addr = get_test_addr();
    let producer = Producer::connect(&server_addr, producer_cfg).await.unwrap();

    // A separate plain client manages the topic lifecycle.
    let mut admin = LanceClient::connect(test_config()).await.unwrap();
    let name = unique_topic_name("producer_batch_test");
    let topic_id = admin.create_topic(&name).await.unwrap().id;

    let enqueue_started = Instant::now();
    for seq in 0..5 {
        let payload = format!("batch-message-{}", seq);
        producer
            .send_async(topic_id, payload.as_bytes())
            .await
            .unwrap();
    }
    let enqueue_elapsed = enqueue_started.elapsed();
    assert!(enqueue_elapsed < Duration::from_millis(100));

    producer.flush().await.unwrap();
    let totals = producer.metrics();
    assert!(totals.batches_sent >= 1);

    producer.close().await.unwrap();
    admin.delete_topic(topic_id).await.unwrap();
    admin.close().await.unwrap();
}
/// Checks that producer metrics start at zero and reflect twenty 100-byte
/// records after they are sent.
///
/// After sending, `records_sent` must equal the record count exactly,
/// `bytes_sent` must be at least count × size, at least one batch must have
/// been sent, and `errors` must remain zero.
#[tokio::test]
#[ignore = "requires running LANCE server"]
async fn test_producer_metrics_tracking() {
    use lnc_client::{Producer, ProducerConfig};

    let producer_cfg = ProducerConfig::new()
        .with_batch_size(1024)
        .with_linger_ms(5);
    let server_addr = get_test_addr();
    let producer = Producer::connect(&server_addr, producer_cfg).await.unwrap();

    // A separate plain client manages the topic lifecycle.
    let mut admin = LanceClient::connect(test_config()).await.unwrap();
    let name = unique_topic_name("producer_metrics_test");
    let topic_id = admin.create_topic(&name).await.unwrap().id;

    let baseline = producer.metrics();
    assert_eq!(baseline.records_sent, 0);
    assert_eq!(baseline.bytes_sent, 0);
    assert_eq!(baseline.errors, 0);

    let record_count: u64 = 20;
    let record_size: u64 = 100;
    // Each record is its index left-padded with zeros to `record_size` characters.
    let pad_width = record_size as usize;
    for seq in 0..record_count {
        let payload = format!("{:0>width$}", seq, width = pad_width);
        producer.send(topic_id, payload.as_bytes()).await.unwrap();
    }

    let totals = producer.metrics();
    assert_eq!(totals.records_sent, record_count);
    assert!(totals.bytes_sent >= record_count * record_size);
    assert!(totals.batches_sent >= 1);
    assert_eq!(totals.errors, 0);

    producer.close().await.unwrap();
    admin.delete_topic(topic_id).await.unwrap();
    admin.close().await.unwrap();
}