#![cfg(all(feature = "client", feature = "server"))]
use std::time::Duration;
use ferro_lumberjack::client::ClientBuilder;
use ferro_lumberjack::server::Server;
async fn ephemeral() -> (String, ferro_lumberjack::server::Listener) {
let listener = Server::builder().bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
(addr.to_string(), listener)
}
#[tokio::test]
async fn e2e_single_window_uncompressed() {
let (addr, listener) = ephemeral().await;
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
conn.read_and_ack().await.unwrap()
});
let mut client = ClientBuilder::new()
.add_host(addr)
.compression_level(0)
.timeout(Duration::from_secs(5))
.connect()
.await
.unwrap();
let acked = client
.send_json(vec![
br#"{"a":1}"#.to_vec(),
br#"{"a":2}"#.to_vec(),
br#"{"a":3}"#.to_vec(),
])
.await
.unwrap();
assert_eq!(acked, 3);
let window = server.await.unwrap().expect("got window");
assert_eq!(window.events.len(), 3);
assert_eq!(window.last_seq, 3);
assert_eq!(window.events[2].payload, br#"{"a":3}"#);
}
#[tokio::test]
async fn e2e_single_window_compressed() {
let (addr, listener) = ephemeral().await;
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
conn.read_and_ack().await.unwrap()
});
let payloads: Vec<Vec<u8>> = (0..20)
.map(|_| br#"{"repeat":"AAAAAAAAAAAAAAAAAAAAAAAAAAAA"}"#.to_vec())
.collect();
let mut client = ClientBuilder::new()
.add_host(addr)
.compression_level(6)
.timeout(Duration::from_secs(5))
.connect()
.await
.unwrap();
let acked = client.send_json(payloads.clone()).await.unwrap();
assert_eq!(acked, 20);
let window = server.await.unwrap().expect("got window");
assert_eq!(window.events.len(), 20);
assert_eq!(window.last_seq, 20);
assert_eq!(window.events[0].payload, payloads[0]);
}
#[tokio::test]
async fn e2e_multiple_consecutive_windows() {
let (addr, listener) = ephemeral().await;
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
let mut got = Vec::new();
while let Some(window) = conn.read_window().await.unwrap() {
conn.send_ack(window.last_seq).await.unwrap();
got.push(window);
}
got
});
let mut client = ClientBuilder::new()
.add_host(addr)
.compression_level(0)
.timeout(Duration::from_secs(5))
.connect()
.await
.unwrap();
let acked1 = client
.send_json(vec![b"a".to_vec(), b"b".to_vec()])
.await
.unwrap();
let acked2 = client.send_json(vec![b"c".to_vec()]).await.unwrap();
let acked3 = client
.send_json(vec![b"d".to_vec(), b"e".to_vec(), b"f".to_vec()])
.await
.unwrap();
assert_eq!(acked1, 2);
assert_eq!(acked2, 1);
assert_eq!(acked3, 3);
drop(client);
let windows = server.await.unwrap();
assert_eq!(windows.len(), 3);
assert_eq!(windows[0].events.len(), 2);
assert_eq!(windows[1].events.len(), 1);
assert_eq!(windows[2].events.len(), 3);
assert_eq!(windows[0].events[0].seq, 1);
assert_eq!(windows[2].events[2].seq, 6);
}
#[tokio::test]
async fn e2e_host_failover_to_second_host() {
let dead = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let dead_addr = dead.local_addr().unwrap();
drop(dead);
let (live_addr, listener) = ephemeral().await;
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
conn.read_and_ack().await.unwrap()
});
let mut client = ClientBuilder::new()
.add_host(live_addr) .timeout(Duration::from_secs(2))
.compression_level(0)
.connect()
.await
.unwrap();
let _ = dead_addr;
let acked = client.send_json(vec![b"x".to_vec()]).await.unwrap();
assert_eq!(acked, 1);
let _ = server.await.unwrap().expect("window");
}
#[cfg(feature = "tls")]
#[tokio::test]
async fn e2e_tls_round_trip_with_self_signed() {
use ferro_lumberjack::tls::{ServerTlsConfig, TlsConfig};
let params = rcgen::CertificateParams::new(vec!["localhost".to_string()]).unwrap();
let kp = rcgen::KeyPair::generate().unwrap();
let cert = params.self_signed(&kp).unwrap();
let cert_pem = cert.pem();
let key_pem = kp.serialize_pem();
let server_tls = ServerTlsConfig::builder()
.cert_pem_bytes(cert_pem.as_bytes())
.unwrap()
.key_pem_bytes(key_pem.as_bytes())
.unwrap()
.build()
.unwrap();
let listener = Server::builder()
.tls(server_tls)
.bind("127.0.0.1:0")
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
conn.read_and_ack().await.unwrap()
});
let client_tls = TlsConfig::builder()
.add_ca_pem_bytes(cert_pem.as_bytes())
.unwrap()
.build()
.unwrap();
let host = format!("localhost:{}", addr.port());
let mut client = ClientBuilder::new()
.add_host(host)
.tls(client_tls)
.timeout(Duration::from_secs(10))
.compression_level(0)
.connect()
.await
.unwrap();
let acked = client
.send_json(vec![b"tls-event-1".to_vec(), b"tls-event-2".to_vec()])
.await
.unwrap();
assert_eq!(acked, 2);
let window = server.await.unwrap().expect("window");
assert_eq!(window.events.len(), 2);
assert_eq!(window.events[0].payload, b"tls-event-1");
assert_eq!(window.last_seq, 2);
}
#[cfg(feature = "tls")]
#[tokio::test]
async fn e2e_tls_with_dangerous_disable_verification() {
use ferro_lumberjack::tls::{ServerTlsConfig, TlsConfig};
let params = rcgen::CertificateParams::new(vec!["localhost".to_string()]).unwrap();
let kp = rcgen::KeyPair::generate().unwrap();
let cert = params.self_signed(&kp).unwrap();
let server_tls = ServerTlsConfig::builder()
.cert_pem_bytes(cert.pem().as_bytes())
.unwrap()
.key_pem_bytes(kp.serialize_pem().as_bytes())
.unwrap()
.build()
.unwrap();
let listener = Server::builder()
.tls(server_tls)
.bind("127.0.0.1:0")
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
conn.read_and_ack().await.unwrap()
});
let client_tls = TlsConfig::builder()
.dangerous_disable_verification()
.build()
.unwrap();
let host = format!("127.0.0.1:{}", addr.port());
let mut client = ClientBuilder::new()
.add_host(host)
.tls(client_tls)
.timeout(Duration::from_secs(10))
.compression_level(0)
.connect()
.await
.unwrap();
let acked = client.send_json(vec![b"insecure".to_vec()]).await.unwrap();
assert_eq!(acked, 1);
let _ = server.await.unwrap().expect("window");
}