use kvarn::prelude::*;
use kvarn_testing::prelude::*;
#[tokio::test]
async fn basic() {
let server = ServerBuilder::default()
.with_extensions(|ext| {
ext.add_prepare_single(
"/slow-response",
prepare!(_, _, _, _, {
tokio::time::sleep(Duration::from_millis(100)).await;
FatResponse::no_cache(Response::new(Bytes::from_static(b"Finally here!")))
}),
)
})
.run()
.await;
internals::timeout(Duration::from_millis(1000), async move {
let shutdown = server.get_shutdown_manager();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
shutdown.shutdown();
});
let response = server.get("/slow-response").send().await.unwrap();
assert_eq!(response.text().await.unwrap(), "Finally here!");
})
.await
.unwrap();
}
#[tokio::test]
async fn handover() {
let socket_path = "handover_test.sock";
let server = ServerBuilder::default()
.http()
.enable_handover(socket_path)
.run()
.await;
let url = server.url("/");
let cert = server.cert().cloned();
let failed = Arc::new(threading::atomic::AtomicBool::new(false));
let running = Arc::new(threading::atomic::AtomicBool::new(true));
let running_send = Arc::clone(&running);
let f = failed.clone();
tokio::spawn(async move {
loop {
if !running_send.load(threading::Ordering::Acquire) {
break;
}
let mut client = reqwest::Client::builder();
if let Some(cert) = &cert {
let cert = reqwest::Certificate::from_der(cert).unwrap();
client = client.add_root_certificate(cert);
};
let client = client.build().unwrap();
let request = client.request(reqwest::Method::GET, url.clone());
let failed = f.clone();
tokio::spawn(async move {
let response = match request.send().await {
Err(err) => {
let error_text = format!("{err:?}");
if error_text.contains("ConnectionReset") {
return;
}
failed.store(true, threading::Ordering::SeqCst);
println!("{:?}", std::time::SystemTime::now());
panic!("{err}");
}
Ok(r) => r,
};
if let Err(err) = response.text().await {
failed.store(true, threading::Ordering::SeqCst);
println!("{:?}", std::time::SystemTime::now());
panic!("{err}");
}
});
if !running_send.load(threading::Ordering::Relaxed) {
return;
}
tokio::time::sleep(Duration::from_micros(100)).await;
}
});
tokio::time::sleep(Duration::from_millis(100)).await;
let server = ServerBuilder::default()
.http()
.handover_from(&server)
.run()
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
running.store(false, threading::Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(10)).await;
drop(server);
tokio::time::sleep(Duration::from_millis(10)).await;
if failed.load(threading::Ordering::SeqCst) {
panic!("Requests were rejected!");
}
assert!(!Path::new(socket_path).exists());
}