use std::time::Duration;
use serde::Deserialize;
/// Configuration shape that the tests in this file deserialize from `app.toml`.
#[derive(Debug, PartialEq, Eq, Deserialize)]
struct AppConfig {
    /// Value of the `host` key.
    host: String,
    /// Value of the `port` key.
    port: u16,
}
/// Rewrites a watched config file and expects the handle returned by
/// `subscribe()` to yield the new host and port within three seconds.
#[tokio::test]
async fn publishes_valid_updates() {
    const INITIAL: &str = "host = \"127.0.0.1\"\nport = 8080\n";
    const UPDATED: &str = "host = \"0.0.0.0\"\nport = 9090\n";

    // The temp directory handle stays bound for the whole test body.
    let tmp = tempfile::tempdir().expect("tempdir");
    let config_path = tmp.path().join("app.toml");
    std::fs::write(&config_path, INITIAL).expect("write initial");

    let watched = zonfig::watch::<AppConfig>(&config_path)
        .await
        .expect("watch");
    let mut subscriber = watched.subscribe();

    // NOTE(review): presumably lets the watcher settle before the rewrite — confirm.
    let settle = Duration::from_millis(200);
    tokio::time::sleep(settle).await;
    std::fs::write(&config_path, UPDATED).expect("write update");

    let deadline = Duration::from_secs(3);
    let outcome = tokio::time::timeout(deadline, subscriber.changed()).await;
    let latest = outcome.expect("update timeout").expect("changed");

    assert_eq!(latest.host, "0.0.0.0");
    assert_eq!(latest.port, 9090);
}
/// Registers an `on_change` hook that forwards the config's port over a
/// channel, rewrites the file, and expects port 9090 to arrive within three
/// seconds.
#[tokio::test]
async fn calls_change_hooks_after_valid_updates() {
    const INITIAL: &str = "host = \"127.0.0.1\"\nport = 8080\n";
    const UPDATED: &str = "host = \"0.0.0.0\"\nport = 9090\n";

    // The temp directory handle stays bound for the whole test body.
    let tmp = tempfile::tempdir().expect("tempdir");
    let config_path = tmp.path().join("app.toml");
    std::fs::write(&config_path, INITIAL).expect("write initial");

    let watched = zonfig::watch::<AppConfig>(&config_path)
        .await
        .expect("watch");

    // The hook reports each observed port back to the test through this channel.
    let (port_tx, mut port_rx) = tokio::sync::mpsc::unbounded_channel();
    watched.on_change(move |config| {
        let _ = port_tx.send(config.port);
    });

    // NOTE(review): presumably lets the watcher settle before the rewrite — confirm.
    let settle = Duration::from_millis(200);
    tokio::time::sleep(settle).await;
    std::fs::write(&config_path, UPDATED).expect("write update");

    let deadline = Duration::from_secs(3);
    let outcome = tokio::time::timeout(deadline, port_rx.recv()).await;
    let observed_port = outcome.expect("hook timeout").expect("hook event");

    assert_eq!(observed_port, 9090);
}
/// Watches with a 350 ms cooldown, writes two updates 80 ms apart, and expects
/// the change hook to report only the first update's port (9090), with no
/// further hook event during the following 500 ms.
#[tokio::test]
async fn cooldown_suppresses_burst_updates() {
    const INITIAL: &str = "host = \"127.0.0.1\"\nport = 8080\n";
    const FIRST_UPDATE: &str = "host = \"0.0.0.0\"\nport = 9090\n";
    const SECOND_UPDATE: &str = "host = \"0.0.0.0\"\nport = 9091\n";

    // The temp directory handle stays bound for the whole test body.
    let tmp = tempfile::tempdir().expect("tempdir");
    let config_path = tmp.path().join("app.toml");
    std::fs::write(&config_path, INITIAL).expect("write initial");

    let options = zonfig::WatchOptions::default().with_cooldown(Duration::from_millis(350));
    let watched = zonfig::watch_with_options::<AppConfig>(&config_path, options)
        .await
        .expect("watch");

    // The hook reports each observed port back to the test through this channel.
    let (port_tx, mut port_rx) = tokio::sync::mpsc::unbounded_channel();
    watched.on_change(move |config| {
        let _ = port_tx.send(config.port);
    });

    // NOTE(review): presumably lets the watcher settle before the rewrites — confirm.
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Two writes spaced 80 ms apart, i.e. closer together than the 350 ms cooldown.
    std::fs::write(&config_path, FIRST_UPDATE).expect("write first update");
    tokio::time::sleep(Duration::from_millis(80)).await;
    std::fs::write(&config_path, SECOND_UPDATE).expect("write second update");

    let first_event = tokio::time::timeout(Duration::from_secs(3), port_rx.recv()).await;
    let observed_port = first_event.expect("hook timeout").expect("hook event");
    assert_eq!(observed_port, 9090);

    // A timeout (Err) here means no second hook event arrived in the window.
    let second_event = tokio::time::timeout(Duration::from_millis(500), port_rx.recv()).await;
    assert!(
        second_event.is_err(),
        "burst updates should trigger one reload"
    );
}
/// Overwrites the watched file with a non-numeric `port`, expects a
/// `zonfig::Error::Parse` on the `errors()` stream within three seconds, and
/// checks that `get()` still reports the original port 8080.
#[tokio::test]
async fn keeps_last_good_value_after_invalid_update() {
    const INITIAL: &str = "host = \"127.0.0.1\"\nport = 8080\n";
    const INVALID: &str = "host = \"broken\"\nport = \"not a number\"\n";

    // The temp directory handle stays bound for the whole test body.
    let tmp = tempfile::tempdir().expect("tempdir");
    let config_path = tmp.path().join("app.toml");
    std::fs::write(&config_path, INITIAL).expect("write initial");

    let watched = zonfig::watch::<AppConfig>(&config_path)
        .await
        .expect("watch");
    let mut error_events = watched.errors();

    // NOTE(review): presumably lets the watcher settle before the rewrite — confirm.
    let settle = Duration::from_millis(200);
    tokio::time::sleep(settle).await;
    std::fs::write(&config_path, INVALID).expect("write invalid");

    let deadline = Duration::from_secs(3);
    let outcome = tokio::time::timeout(deadline, error_events.recv()).await;
    let error = outcome.expect("error timeout").expect("error event");

    assert!(matches!(&*error, zonfig::Error::Parse { .. }));
    assert_eq!(watched.get().port, 8080);
}
/// Checks that `with` reads port 8080 before the file is rewritten and 9090
/// once a clone of the handle has reported a change.
#[tokio::test]
async fn with_reads_latest_value() {
    const INITIAL: &str = "host = \"127.0.0.1\"\nport = 8080\n";
    const UPDATED: &str = "host = \"0.0.0.0\"\nport = 9090\n";

    // The temp directory handle stays bound for the whole test body.
    let tmp = tempfile::tempdir().expect("tempdir");
    let config_path = tmp.path().join("app.toml");
    std::fs::write(&config_path, INITIAL).expect("write initial");

    let watched = zonfig::watch::<AppConfig>(&config_path)
        .await
        .expect("watch");
    // The clone is only used to wait for the change notification below.
    let mut change_waiter = watched.clone();

    let port_before = watched.with(|config| config.port);
    assert_eq!(port_before, 8080);

    // NOTE(review): presumably lets the watcher settle before the rewrite — confirm.
    let settle = Duration::from_millis(200);
    tokio::time::sleep(settle).await;
    std::fs::write(&config_path, UPDATED).expect("write update");

    let deadline = Duration::from_secs(3);
    let outcome = tokio::time::timeout(deadline, change_waiter.changed()).await;
    outcome.expect("update timeout").expect("changed");

    let port_after = watched.with(|config| config.port);
    assert_eq!(port_after, 9090);
}
#[tokio::test]
async fn calls_error_hooks_after_invalid_updates() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("app.toml");
std::fs::write(&path, "host = \"127.0.0.1\"\nport = 8080\n").expect("write initial");
let watched = zonfig::watch::<AppConfig>(&path).await.expect("watch");
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
watched.on_error(move |error| {
let _ = tx.send(matches!(&*error, zonfig::Error::Parse { .. }));
});
tokio::time::sleep(Duration::from_millis(200)).await;
std::fs::write(&path, "host = \"broken\"\nport = \"not a number\"\n").expect("write invalid");
let is_parse_error = tokio::time::timeout(Duration::from_secs(3), rx.recv())
.await
.expect("hook timeout")
.expect("hook event");
assert!(is_parse_error);
assert_eq!(watched.get().port, 8080);
}