use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use anyhow::{bail, Context, Result};
use reqwest::Url;
/// Upper bound, in bytes, on how much of a fetched response body is kept for
/// snapshot comparison; passed to `read_body_limited` by `cmd_watch`.
const MAX_WATCH_BYTES: usize = 800_000;
/// Smallest polling interval, in seconds, that `parse_interval` accepts.
/// Shorter intervals are rejected to avoid excessive requests.
const MIN_INTERVAL_SECS: u64 = 10;
/// Parses a human-friendly polling interval into a number of seconds.
///
/// The input is trimmed and lower-cased first, then interpreted as one of:
///
/// * `<n>h` — hours
/// * `<n>m` — minutes
/// * `<n>s` — seconds
/// * `<n>`  — bare number, taken as seconds
///
/// # Errors
///
/// Returns an error when the numeric part is not a valid `u64`, when the
/// value converted to seconds does not fit in a `u64`, or when the result is
/// below `MIN_INTERVAL_SECS`.
pub fn parse_interval(s: &str) -> Result<u64> {
    let s = s.trim().to_lowercase();

    // Split the input into its numeric part, the number of seconds per unit,
    // and the message to report when the numeric part does not parse.
    let (digits, unit_secs, parse_error): (&str, u64, &'static str) =
        if let Some(hours) = s.strip_suffix('h') {
            (hours, 3600, "Invalid hours value")
        } else if let Some(mins) = s.strip_suffix('m') {
            (mins, 60, "Invalid minutes value")
        } else if let Some(sec_str) = s.strip_suffix('s') {
            (sec_str, 1, "Invalid seconds value")
        } else {
            (
                s.as_str(),
                1,
                "Invalid interval. Use formats like 1h, 30m, or 60s",
            )
        };

    let count: u64 = digits.parse().with_context(|| parse_error)?;

    // Checked multiplication: an unchecked `count * 3600` would panic in debug
    // builds and silently wrap in release builds for very large inputs, and a
    // wrapped value could even slip past the minimum check below.
    let secs = count
        .checked_mul(unit_secs)
        .with_context(|| format!("Interval too large: {}", s))?;

    if secs < MIN_INTERVAL_SECS {
        bail!(
            "Interval too small ({}s). Minimum is {}s to avoid excessive requests.",
            secs,
            MIN_INTERVAL_SECS
        );
    }
    Ok(secs)
}
/// Produces a 64-bit digest of `url` using the standard library's
/// `DefaultHasher`.
///
/// The same input always yields the same value within one build of the
/// program.
// NOTE(review): the std docs do not guarantee `DefaultHasher`'s algorithm
// across Rust releases — confirm that is acceptable wherever this digest is
// persisted (e.g. in file names).
fn url_hash(url: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(url, &mut state);
    Hasher::finish(&state)
}
/// Returns the location of the snapshot file for `url`.
///
/// The file lives in the `watch` directory under the zeptoclaw config
/// directory and is named after the lowercase-hex `url_hash` of the URL with
/// a `.txt` extension.
fn snapshot_path(url: &str) -> PathBuf {
    let file_name = format!("{:x}.txt", url_hash(url));
    let watch_dir = zeptoclaw::config::Config::dir().join("watch");
    watch_dir.join(file_name)
}
/// Checks that `url` is acceptable as a watch target and returns it parsed.
///
/// The checks run in order:
/// 1. the string must parse as a URL;
/// 2. the scheme must be `http` or `https`;
/// 3. the host must not be rejected by `zeptoclaw::tools::is_blocked_host`;
/// 4. `zeptoclaw::tools::resolve_and_check_host` must succeed.
///
/// # Errors
///
/// Returns an error describing the first check that fails.
async fn validate_watch_url(url: &str) -> Result<Url> {
    let target = Url::parse(url).with_context(|| format!("Invalid URL: {}", url))?;

    let scheme = target.scheme();
    if scheme != "http" && scheme != "https" {
        bail!("Only http/https URLs are allowed, got: {}", scheme);
    }

    if zeptoclaw::tools::is_blocked_host(&target) {
        bail!("Blocked URL host (local or private network): {}", url);
    }

    // Only the success/failure of the resolution check matters here; any
    // value it yields on success is discarded.
    if let Err(e) = zeptoclaw::tools::resolve_and_check_host(&target).await {
        bail!("SSRF check failed for {}: {}", url, e);
    }

    Ok(target)
}
/// Reads at most `max_bytes` of the response body and returns it as text.
///
/// The body is consumed chunk by chunk and reading stops as soon as
/// `max_bytes` have been collected, so an oversized response is never held in
/// memory in full. Bytes that are not valid UTF-8 (including a multi-byte
/// sequence cut by the size limit) are replaced with U+FFFD.
///
/// # Errors
///
/// Returns an error if reading the body fails. A failed read is reported
/// rather than being treated as an empty body, so callers cannot mistake a
/// transport failure for a change in content.
async fn read_body_limited(mut resp: reqwest::Response, max_bytes: usize) -> Result<String> {
    let mut buf: Vec<u8> = Vec::new();

    while let Some(chunk) = resp
        .chunk()
        .await
        .context("Failed to read response body")?
    {
        // `buf.len()` never exceeds `max_bytes`, so this cannot underflow.
        let remaining = max_bytes - buf.len();
        if chunk.len() >= remaining {
            // This chunk reaches the cap: keep only what still fits and stop.
            buf.extend_from_slice(&chunk[..remaining]);
            break;
        }
        buf.extend_from_slice(&chunk);
    }

    Ok(String::from_utf8_lossy(&buf).into_owned())
}
/// Polls `url` at a fixed interval and reports when its body changes.
///
/// * `url` — the page to watch; validated with `validate_watch_url` before
///   the first request.
/// * `interval` — polling interval in the format accepted by
///   `parse_interval` (e.g. `1h`, `30m`, `60s`).
/// * `notify` — optional channel name. Channel delivery is not implemented;
///   the name is only echoed and changes are printed to stdout.
///
/// Each fetched body (capped at `MAX_WATCH_BYTES`) is compared with the
/// snapshot file for the URL. An absent or empty snapshot is treated as "no
/// baseline yet" and is simply written; a differing body overwrites the
/// snapshot and is reported as a change.
///
/// The polling loop has no exit condition of its own: network-level problems
/// (request failure, non-success status, blocked redirect target, body read
/// failure) are logged to stderr and retried on the next tick.
///
/// # Errors
///
/// Returns an error if the interval or URL is invalid, if the watch
/// directory or HTTP client cannot be created, or if writing the snapshot
/// file fails.
pub(crate) async fn cmd_watch(url: String, interval: String, notify: Option<String>) -> Result<()> {
    let interval_secs = parse_interval(&interval)?;
    validate_watch_url(&url).await?;

    println!("Watching: {}", url);
    println!("Interval: {} ({}s)", interval, interval_secs);
    match &notify {
        Some(channel) => {
            println!("Notify via: {}", channel);
            eprintln!("Warning: Channel notification is not yet implemented. Changes will be printed to stdout.");
        }
        None => println!("Notify: stdout only"),
    }
    println!();
    println!("Press Ctrl+C to stop.");
    println!();

    let watch_dir = zeptoclaw::config::Config::dir().join("watch");
    std::fs::create_dir_all(&watch_dir)
        .with_context(|| format!("Failed to create watch directory: {:?}", watch_dir))?;
    let snap_path = snapshot_path(&url);

    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(30))
        .build()?;

    // Wall-clock prefix shared by every per-poll log line.
    let stamp = || chrono::Local::now().format("%H:%M");

    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
    loop {
        ticker.tick().await;

        let resp = match client.get(&url).send().await {
            Ok(resp) => resp,
            Err(e) => {
                eprintln!("[{}] Fetch error: {}", stamp(), e);
                continue;
            }
        };

        // The response URL may differ from the requested one, so the final
        // URL is re-checked before its body is read.
        // NOTE(review): this check only runs once a response has been
        // received, i.e. after the request was already sent — confirm whether
        // redirects should instead be restricted on the client.
        if zeptoclaw::tools::is_blocked_host(resp.url()) {
            eprintln!(
                "[{}] Blocked: redirect to local/private host {}",
                stamp(),
                resp.url()
            );
            continue;
        }

        let status = resp.status();
        if !status.is_success() {
            eprintln!("[{}] HTTP {} for {}", stamp(), status, url);
            continue;
        }

        // A failed body read is a transient network problem like any other
        // fetch failure: log it and try again on the next tick instead of
        // terminating the watcher.
        let body = match read_body_limited(resp, MAX_WATCH_BYTES).await {
            Ok(body) => body,
            Err(e) => {
                eprintln!("[{}] Fetch error: {:#}", stamp(), e);
                continue;
            }
        };

        // A missing or unreadable snapshot reads as empty, which is handled
        // below as "no baseline yet".
        let previous = std::fs::read_to_string(&snap_path).unwrap_or_default();

        if previous.is_empty() {
            std::fs::write(&snap_path, &body)?;
            println!("[{}] Baseline saved ({} bytes)", stamp(), body.len());
        } else if body != previous {
            std::fs::write(&snap_path, &body)?;
            println!(
                "[{}] Change detected! (was {} bytes, now {} bytes)",
                stamp(),
                previous.len(),
                body.len()
            );
            let message = format!(
                "URL changed: {}\nPrevious: {} bytes -> New: {} bytes",
                url,
                previous.len(),
                body.len()
            );
            match &notify {
                Some(channel) => println!("  Notification ({}): {}", channel, message),
                None => println!("  {}", message),
            }
        } else {
            eprintln!("[{}] No change", stamp());
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every `(input, seconds)` pair parses to the given value.
    fn assert_parses(cases: &[(&str, u64)]) {
        for &(input, expected) in cases {
            assert_eq!(parse_interval(input).unwrap(), expected);
        }
    }

    /// Asserts that every input is rejected by `parse_interval`.
    fn assert_rejected(inputs: &[&str]) {
        for &input in inputs {
            assert!(parse_interval(input).is_err());
        }
    }

    #[test]
    fn test_parse_interval_hours() {
        assert_parses(&[("1h", 3600), ("2h", 7200)]);
    }

    #[test]
    fn test_parse_interval_minutes() {
        assert_parses(&[("30m", 1800), ("15m", 900)]);
    }

    #[test]
    fn test_parse_interval_seconds() {
        assert_parses(&[("60s", 60), ("120s", 120)]);
    }

    #[test]
    fn test_parse_interval_bare_number() {
        assert_parses(&[("3600", 3600)]);
    }

    #[test]
    fn test_parse_interval_invalid() {
        assert_rejected(&["abc", ""]);
    }

    #[test]
    fn test_parse_interval_zero_rejected() {
        assert_rejected(&["0s", "0m", "0h"]);
    }

    #[test]
    fn test_parse_interval_below_minimum_rejected() {
        assert_rejected(&["5s", "9s"]);
        // The minimum itself is accepted.
        assert!(parse_interval("10s").is_ok());
    }

    #[test]
    fn test_url_hash_deterministic() {
        let first = url_hash("https://example.com");
        let second = url_hash("https://example.com");
        assert_eq!(first, second);
    }

    #[test]
    fn test_url_hash_different_urls() {
        let example = url_hash("https://example.com");
        let other = url_hash("https://other.com");
        assert_ne!(example, other);
    }
}