use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use thiserror::Error;
use tokio::signal;
use tokio_stream::StreamExt;
use ddns_a::config::ValidatedConfig;
use ddns_a::monitor::{
ChangeKind, DebouncePolicy, HybridMonitor, IpChange, PollingMonitor, diff,
filter_by_change_kind, filter_by_version,
};
use ddns_a::network::filter::{FilterChain, FilteredFetcher};
use ddns_a::network::platform::PlatformFetcher;
use ddns_a::network::{AdapterSnapshot, AddressFetcher, IpVersion};
use ddns_a::state::{FileStateStore, LoadResult, StateStore};
use ddns_a::webhook::{HttpWebhook, ReqwestClient, WebhookSender};
/// Concrete fetcher type used throughout this file: a `PlatformFetcher`
/// wrapped in a `FilteredFetcher` parameterised with a `FilterChain`.
type AppFetcher = FilteredFetcher<PlatformFetcher, FilterChain>;
#[cfg(windows)]
use ddns_a::monitor::platform::PlatformListener;
// Unit tests for this module live in the separate file `run_tests.rs`
// and are only compiled when `cfg(test)` is active.
#[cfg(test)]
#[path = "run_tests.rs"]
mod tests;
/// Errors returned by the run functions in this file.
#[derive(Debug, Error)]
pub enum RunError {
/// `PlatformListener::new()` failed while setting up the hybrid loop.
#[error("Failed to create API listener: {0}")]
ApiListenerCreation(#[source] ddns_a::monitor::ApiError),
/// The monitor stream yielded `None` (ended) while a run loop was still active.
#[error("Monitor stream terminated unexpectedly")]
StreamTerminated,
/// The adapter fetch performed during startup change detection failed.
#[error("Failed to fetch initial network state: {0}")]
InitialFetch(#[source] ddns_a::network::FetchError),
/// Saving the fetched snapshot to the state store failed during startup
/// change detection.
#[error("Failed to save state: {0}")]
StateSave(#[source] ddns_a::state::StateError),
}
/// Subset of `ValidatedConfig` that the run loops need after the config's
/// other parts (webhook settings, filter) have been consumed.
struct RuntimeOptions {
// Passed to `filter_by_version` to select which changes are reported.
ip_version: IpVersion,
// Passed to `filter_by_change_kind` to select which changes are reported.
change_kind: ChangeKind,
// Polling interval handed to the polling/hybrid monitor.
poll_interval: Duration,
// When true, `execute` runs the polling loop instead of the hybrid loop.
poll_only: bool,
// When true, `handle_changes` logs changes but does not call the webhook.
dry_run: bool,
// Optional path used to build the `FileStateStore`; `None` disables persistence.
state_file: Option<PathBuf>,
}
impl From<&ValidatedConfig> for RuntimeOptions {
fn from(config: &ValidatedConfig) -> Self {
Self {
ip_version: config.ip_version,
change_kind: config.change_kind,
poll_interval: config.poll_interval,
poll_only: config.poll_only,
dry_run: config.dry_run,
state_file: config.state_file.clone(),
}
}
}
/// Entry point of the run phase.
///
/// Builds the webhook and the filtered fetcher from `config`, optionally runs
/// startup change detection when a state file is configured, and then hands
/// control to either the polling loop or the hybrid loop.
///
/// # Errors
///
/// Propagates any [`RunError`] from startup change detection or from the
/// selected run loop.
#[cfg(not(tarpaulin_include))]
pub async fn execute(config: ValidatedConfig) -> Result<(), RunError> {
    let options = RuntimeOptions::from(&config);
    let webhook = create_webhook(&config);
    // Takes `config.filter` by value, so the borrows of `config` above come first.
    let fetcher = FilteredFetcher::new(PlatformFetcher::default(), config.filter);

    if options.dry_run {
        tracing::info!("Dry-run mode enabled - webhook requests will be logged but not sent");
    }

    let state_store = options.state_file.as_ref().map(FileStateStore::new);
    if let Some(store) = state_store.as_ref() {
        tracing::info!("State persistence enabled: {}", store.path().display());
        startup_change_detection(store, &fetcher, &webhook, &options).await?;
    }

    let interval_secs = options.poll_interval.as_secs();

    if options.poll_only {
        tracing::info!("Polling-only mode enabled (interval: {}s)", interval_secs);
        return run_polling_loop(fetcher, webhook, options, state_store).await;
    }

    tracing::info!(
        "Hybrid mode enabled (API events + polling every {}s)",
        interval_secs
    );
    run_hybrid_loop(fetcher, webhook, options, state_store).await
}
/// Compares the current adapter state with the persisted state at startup.
///
/// Fetches the current snapshot, reports any changes relative to the stored
/// state through [`handle_changes`], and then saves the current snapshot to
/// `store`.
///
/// # Errors
///
/// * [`RunError::InitialFetch`] if the fetcher fails.
/// * [`RunError::StateSave`] if saving the snapshot fails (the failure is
///   also logged before being returned).
#[cfg(not(tarpaulin_include))]
async fn startup_change_detection<W: WebhookSender>(
    store: &FileStateStore,
    fetcher: &AppFetcher,
    webhook: &W,
    options: &RuntimeOptions,
) -> Result<(), RunError> {
    let current = fetcher.fetch().map_err(RunError::InitialFetch)?;

    // Borrow the freshly fetched snapshot; the same data is saved below.
    let startup_changes = detect_startup_changes(store, &current, options);
    if startup_changes.is_empty() {
        tracing::debug!("No IP changes detected since last run");
    } else {
        tracing::info!(
            "Detected {} change(s) since last run",
            startup_changes.len()
        );
        handle_changes(&startup_changes, webhook, options.dry_run).await;
    }

    // Save regardless of whether changes were found.
    if let Err(e) = store.save(&current).await {
        tracing::error!("Failed to save state: {e}");
        return Err(RunError::StateSave(e));
    }
    Ok(())
}
/// Computes the changes between the stored state and `current`, stamping
/// them with the current system time.
///
/// Thin wrapper over [`detect_startup_changes_with_timestamp`].
fn detect_startup_changes(
    store: &impl StateStore,
    current: &[AdapterSnapshot],
    options: &RuntimeOptions,
) -> Vec<IpChange> {
    let now = SystemTime::now();
    detect_startup_changes_with_timestamp(store, current, options, now)
}
/// Computes the filtered changes between the stored state and `current`,
/// using an explicit `timestamp` for the diff.
///
/// Returns an empty vector when no stored state exists or when the stored
/// state is reported as corrupted; both cases are logged.
fn detect_startup_changes_with_timestamp(
    store: &impl StateStore,
    current: &[AdapterSnapshot],
    options: &RuntimeOptions,
    timestamp: SystemTime,
) -> Vec<IpChange> {
    // Resolve the previous snapshot first; bail out early when there is none.
    let previous = match store.load() {
        LoadResult::Loaded(saved) => saved,
        LoadResult::NotFound => {
            tracing::info!("No previous state found, starting fresh");
            return Vec::new();
        }
        LoadResult::Corrupted { reason } => {
            tracing::warn!("State file corrupted ({reason}), will overwrite on next save");
            return Vec::new();
        }
    };

    // Diff, then apply the IP-version filter followed by the change-kind filter.
    let by_version = filter_by_version(diff(&previous, current, timestamp), options.ip_version);
    filter_by_change_kind(by_version, options.change_kind)
}
/// Builds the HTTP webhook from the config's URL, method, headers and retry
/// policy, applying the body template only when one is configured.
fn create_webhook(config: &ValidatedConfig) -> HttpWebhook<ReqwestClient> {
    let client = ReqwestClient::new();
    let base = HttpWebhook::new(client, config.url.clone())
        .with_method(config.method.clone())
        .with_headers(config.headers.clone())
        .with_retry_policy(config.retry_policy.clone());

    match &config.body_template {
        Some(template) => base.with_body_template(template),
        None => base,
    }
}
#[cfg(not(tarpaulin_include))]
async fn run_polling_loop<W: WebhookSender>(
fetcher: AppFetcher,
webhook: W,
options: RuntimeOptions,
state_store: Option<FileStateStore>,
) -> Result<(), RunError> {
let monitor = PollingMonitor::new(fetcher, options.poll_interval)
.with_debounce(DebouncePolicy::default());
let mut stream = monitor.into_stream();
let shutdown = shutdown_signal();
tokio::pin!(shutdown);
loop {
tokio::select! {
biased;
() = &mut shutdown => {
tracing::info!("Shutdown signal received, stopping...");
return Ok(());
}
changes = stream.next() => {
match changes {
Some(changes) => {
let filtered = filter_by_version(changes, options.ip_version);
let filtered = filter_by_change_kind(filtered, options.change_kind);
if !filtered.is_empty() {
save_state_if_configured(state_store.as_ref(), stream.current_snapshot()).await;
handle_changes(&filtered, &webhook, options.dry_run).await;
}
}
None => {
return Err(RunError::StreamTerminated);
}
}
}
}
}
}
/// Saves `snapshot` to `store` when both are present.
///
/// A save failure is logged and otherwise ignored; nothing is returned to
/// the caller.
async fn save_state_if_configured(
    store: Option<&FileStateStore>,
    snapshot: Option<&[AdapterSnapshot]>,
) {
    // Nothing to do unless there is both a store and a snapshot.
    let (Some(store), Some(snapshot)) = (store, snapshot) else {
        return;
    };

    if let Err(e) = store.save(snapshot).await {
        tracing::error!("Failed to save state: {e}");
    }
}
/// Runs the hybrid monitor (platform API listener plus polling) until a
/// shutdown signal arrives or the monitor stream ends.
///
/// The first time the stream reports that it is polling-only, a warning is
/// logged once. Batches are filtered, persisted and dispatched the same way
/// as in the polling loop.
///
/// # Errors
///
/// * [`RunError::ApiListenerCreation`] if the platform listener cannot be created.
/// * [`RunError::StreamTerminated`] if the stream yields `None`.
#[cfg(not(tarpaulin_include))]
#[cfg(windows)]
async fn run_hybrid_loop<W: WebhookSender>(
    fetcher: AppFetcher,
    webhook: W,
    options: RuntimeOptions,
    state_store: Option<FileStateStore>,
) -> Result<(), RunError> {
    let listener = PlatformListener::new().map_err(RunError::ApiListenerCreation)?;
    let mut stream = HybridMonitor::new(fetcher, listener, options.poll_interval)
        .with_debounce(DebouncePolicy::default())
        .into_stream();

    let shutdown = shutdown_signal();
    tokio::pin!(shutdown);

    // Ensures the degradation warning is emitted at most once.
    let mut degradation_reported = false;

    loop {
        // `biased` keeps the branch order fixed, so shutdown is checked first.
        let next = tokio::select! {
            biased;
            () = &mut shutdown => {
                tracing::info!("Shutdown signal received, stopping...");
                return Ok(());
            }
            item = stream.next() => item,
        };

        if !degradation_reported && stream.is_polling_only() {
            tracing::warn!("API listener failed, degraded to polling-only mode");
            degradation_reported = true;
        }

        let Some(raw_changes) = next else {
            return Err(RunError::StreamTerminated);
        };

        let relevant = filter_by_change_kind(
            filter_by_version(raw_changes, options.ip_version),
            options.change_kind,
        );
        if relevant.is_empty() {
            continue;
        }

        save_state_if_configured(state_store.as_ref(), stream.current_snapshot()).await;
        handle_changes(&relevant, &webhook, options.dry_run).await;
    }
}
/// Non-Windows variant of the hybrid loop.
///
/// Logs a warning that the API listener is unsupported on this platform and
/// delegates to `run_polling_loop` with the same arguments, returning its
/// result unchanged.
#[cfg(not(tarpaulin_include))]
#[cfg(not(windows))]
async fn run_hybrid_loop<W: WebhookSender>(
fetcher: AppFetcher,
webhook: W,
options: RuntimeOptions,
state_store: Option<FileStateStore>,
) -> Result<(), RunError> {
tracing::warn!("API listener not supported on this platform, using polling-only mode");
run_polling_loop(fetcher, webhook, options, state_store).await
}
/// Logs every change and, unless `dry_run` is set, sends all of them to the
/// webhook in a single call.
///
/// A webhook failure is logged and not propagated.
async fn handle_changes<W: WebhookSender>(changes: &[IpChange], webhook: &W, dry_run: bool) {
    // One log line per change: "+" for additions, "-" otherwise.
    for change in changes {
        let action = if change.is_added() { "+" } else { "-" };
        let address = &change.address;
        let adapter = &change.adapter;
        tracing::info!("{action} {address} on {adapter}");
    }

    let count = changes.len();

    if dry_run {
        tracing::debug!("Dry-run: skipping webhook for {} change(s)", count);
        return;
    }

    if let Err(e) = webhook.send(changes).await {
        tracing::error!("Webhook failed: {e}");
    } else {
        tracing::debug!("Webhook sent successfully for {} change(s)", count);
    }
}
/// Completes when the process receives a shutdown request.
///
/// Waits for Ctrl+C on every platform and additionally for SIGTERM on Unix;
/// whichever arrives first completes the future.
///
/// # Panics
///
/// Panics if waiting for Ctrl+C returns an error, or (Unix only) if the
/// SIGTERM signal stream cannot be created.
#[cfg(not(tarpaulin_include))]
async fn shutdown_signal() {
// Future that resolves once Ctrl+C has been received.
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};
// Unix: future that resolves when the SIGTERM stream yields.
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler")
.recv()
.await;
};
// Elsewhere: a future that never resolves, so only Ctrl+C can complete the select.
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
// Return as soon as either future completes.
tokio::select! {
() = ctrl_c => {}
() = terminate => {}
}
}