pub mod diff;
pub mod poller;
pub mod storage;
pub mod types;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context as _, Result, bail};
use chrono::Utc;
use tokio::sync::{RwLock, broadcast};
use tracing::info;
pub use types::{
AddOptions, DiffKind, NotifyOn, Watch, WatchEvent, WatchId, WatchOptions, WatchSnapshot,
};
/// Polling interval, in seconds, substituted by `WatchManager::add` when the
/// caller passes `interval_secs == 0`.
const DEFAULT_INTERVAL_SECS: u64 = 3600;
/// Capacity passed to `broadcast::channel` when the `WatchEvent` channel is
/// created.
const BROADCAST_CAPACITY: usize = 64;
/// Tick period of `WatchManager::poll_loop`; due watches are polled once per
/// tick.
const POLL_LOOP_INTERVAL: Duration = Duration::from_secs(60);
/// Owns the set of registered watches, their on-disk storage locations and
/// the broadcast channel used to announce watch events.
pub struct WatchManager {
/// Directory handed to the `storage` module for loading, saving and
/// deleting watch records.
storage_dir: PathBuf,
/// Directory for snapshot bodies; set to `storage_dir/snapshots` by
/// `with_storage_dir`.
snapshot_dir: PathBuf,
/// In-memory table of watches keyed by id, behind an async `RwLock`.
watches: Arc<RwLock<HashMap<WatchId, Watch>>>,
/// Sending half of the broadcast channel; `subscribe` creates receivers
/// from it.
event_tx: broadcast::Sender<WatchEvent>,
}
impl WatchManager {
/// Creates a manager rooted at `<base>/nab/watches`.
///
/// `<base>` is the platform's local data directory when `dirs` can report
/// one, otherwise the home directory, otherwise the current directory
/// (`"."`).
///
/// # Errors
///
/// Propagates any error returned by [`WatchManager::with_storage_dir`].
pub fn new_default() -> Result<Self> {
    let root = match dirs::data_local_dir().or_else(dirs::home_dir) {
        Some(dir) => dir,
        None => PathBuf::from("."),
    };
    let storage_dir = root.join("nab").join("watches");
    Self::with_storage_dir(storage_dir)
}
/// Creates a manager that keeps its watch records in `storage_dir` and its
/// snapshot bodies in `storage_dir/snapshots`, then loads any watches that
/// are already on disk.
///
/// # Errors
///
/// The current implementation always returns `Ok`; the `Result` return type
/// is kept for callers.
pub fn with_storage_dir(storage_dir: PathBuf) -> Result<Self> {
    // The initial receiver is dropped right away; consumers call `subscribe`.
    let (event_tx, _) = broadcast::channel(BROADCAST_CAPACITY);
    let snapshot_dir = storage_dir.join("snapshots");
    let watches = Arc::new(RwLock::new(HashMap::new()));

    let this = Self {
        storage_dir,
        snapshot_dir,
        watches,
        event_tx,
    };
    this.load_from_disk();
    Ok(this)
}
/// Populates the in-memory table from the watch records found in
/// `storage_dir`. Does nothing when the directory does not exist.
///
/// # Panics
///
/// Panics if the write lock cannot be taken without waiting. It is called
/// from `with_storage_dir` on a freshly built manager, before the value is
/// returned to the caller.
fn load_from_disk(&self) {
    if self.storage_dir.exists() {
        let loaded = storage::load_all_watches(&self.storage_dir);
        let mut guard = self.watches.try_write().expect("no contention at init");
        guard.extend(loaded.into_iter().map(|watch| (watch.id.clone(), watch)));
        info!("Loaded {} watches from disk", guard.len());
    }
}
pub async fn add(&self, url: &str, mut opts: AddOptions) -> Result<WatchId> {
url::Url::parse(url).with_context(|| format!("invalid URL: {url}"))?;
if opts.interval_secs == 0 {
opts.interval_secs = DEFAULT_INTERVAL_SECS;
}
let id = generate_id(url, opts.selector.as_deref());
info!(%id, %url, "Adding watch");
let (etag, last_modified, snapshot) =
initial_fetch(url, opts.selector.as_deref(), &opts.options).await?;
let now = Utc::now();
let watch = Watch {
id: id.clone(),
url: url.to_owned(),
selector: opts.selector.clone(),
interval_secs: opts.interval_secs,
created_at: now,
last_check_at: Some(now),
last_change_at: Some(now),
last_etag: etag,
last_last_modified: last_modified,
snapshots: snapshot.into_iter().collect(),
consecutive_errors: 0,
options: opts.options,
};
if let Some(snap) = watch.snapshots.first() {
let content =
load_initial_content(url, watch.selector.as_deref(), &watch.options).await;
if let Ok(c) = content {
let _ = storage::save_snapshot_body(&self.snapshot_dir, &snap.sha256, c.as_bytes());
}
}
storage::save_watch(&self.storage_dir, &watch).context("persist new watch")?;
{
let mut table = self.watches.write().await;
table.insert(id.clone(), watch);
}
let _ = self.event_tx.send(WatchEvent::Added(id.clone()));
Ok(id)
}
/// Removes the watch `id` from memory and from disk, garbage-collects
/// snapshot bodies against the hashes still referenced by the remaining
/// watches, and broadcasts `WatchEvent::Removed`.
///
/// # Errors
///
/// Returns an error when no watch with this id is registered, or when the
/// watch file cannot be deleted. In the latter case the watch has already
/// been dropped from the in-memory table.
pub async fn remove(&self, id: &WatchId) -> Result<()> {
    // The write guard is a temporary and is released at the end of this statement.
    let removed = self.watches.write().await.remove(id);
    let watch = match removed {
        Some(found) => found,
        None => return Err(anyhow::anyhow!("watch '{id}' not found")),
    };

    storage::delete_watch(&self.storage_dir, id).context("delete watch file")?;

    let live_hashes = self.all_snapshot_hashes().await;
    storage::gc_snapshots(&self.snapshot_dir, &live_hashes);

    // A send error only means there are no subscribers right now.
    let _ = self.event_tx.send(WatchEvent::Removed(id.clone()));
    info!(%id, url = %watch.url, "Watch removed");
    Ok(())
}
/// Returns a cloned copy of every registered watch, in the table's
/// (unspecified) iteration order.
pub async fn list(&self) -> Vec<Watch> {
    let table = self.watches.read().await;
    table.values().cloned().collect()
}
/// Returns a cloned copy of the watch registered under `id`, or `None` when
/// there is no such watch.
pub async fn get(&self, id: &WatchId) -> Option<Watch> {
    let table = self.watches.read().await;
    table.get(id).cloned()
}
/// Creates a new receiver on the manager's `WatchEvent` broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<WatchEvent> {
    let sender = &self.event_tx;
    sender.subscribe()
}
/// Renders the watch `id` as a Markdown document: a header with the id, URL,
/// last-checked / last-changed timestamps, interval and (when set) the
/// selector, followed by a `---` rule and the stored body of the watch's
/// first snapshot (decoded as lossy UTF-8).
///
/// Returns `None` when the watch does not exist, has no snapshots, or the
/// snapshot body cannot be loaded.
pub async fn render_resource(&self, id: &WatchId) -> Option<String> {
    const TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H:%M:%SZ";

    let watch = self.get(id).await?;
    let snapshot = watch.snapshots.first()?;
    let raw = storage::load_snapshot_body(&self.snapshot_dir, &snapshot.sha256)?;
    let text = String::from_utf8_lossy(&raw).into_owned();

    let checked = match watch.last_check_at {
        Some(at) => at.format(TIMESTAMP_FORMAT).to_string(),
        None => String::from("never"),
    };
    let changed = match watch.last_change_at {
        Some(at) => at.format(TIMESTAMP_FORMAT).to_string(),
        None => String::from("never"),
    };
    // The selector line is omitted entirely when no selector is configured.
    let selector_line = match watch.selector.as_deref() {
        Some(sel) => format!("**Selector**: `{sel}`\n"),
        None => String::new(),
    };

    Some(format!(
        "# Watch: {}\n\n**URL**: {}\n**Last checked**: {}\n**Last changed**: {}\n**Interval**: {}s\n{}\n---\n\n{}",
        watch.id, watch.url, checked, changed, watch.interval_secs, selector_line, text,
    ))
}
/// Runs forever, calling [`WatchManager::poll_due`] once per
/// `POLL_LOOP_INTERVAL` tick. Ticks that are missed are skipped rather than
/// replayed.
pub async fn poll_loop(self: Arc<Self>) {
    use tokio::time::{MissedTickBehavior, interval};

    let mut timer = interval(POLL_LOOP_INTERVAL);
    timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    loop {
        timer.tick().await;
        self.poll_due().await;
    }
}
/// Polls every watch whose `is_due()` returns true and applies the results.
///
/// Due watches are cloned out under a read lock, polled with no lock held,
/// and then written back under a write lock. A result is applied, and its
/// event broadcast, only if the watch is still registered at that point, so
/// a watch removed while the poll was in flight is not brought back.
pub async fn poll_due(&self) {
    let due: Vec<Watch> = {
        let table = self.watches.read().await;
        table.values().filter(|w| w.is_due()).cloned().collect()
    };
    if due.is_empty() {
        return;
    }
    info!("Polling {} due watches", due.len());

    // No lock is held here, so `add` / `remove` can run concurrently.
    // NOTE(review): `poll_batch` receives `storage_dir`, so it may persist
    // watches itself; confirm it cannot re-create the file of a watch that
    // was removed during the poll.
    let results = poller::poll_batch(due, &self.storage_dir, &self.snapshot_dir).await;

    let mut table = self.watches.write().await;
    for result in results {
        // Update in place only. An unconditional insert would resurrect a
        // watch that `remove` deleted while the poll was running.
        if let Some(slot) = table.get_mut(&result.id) {
            *slot = result.updated_watch;
            // A send error only means there are no subscribers right now.
            let _ = self.event_tx.send(result.event);
        }
    }
}
/// Collects the `sha256` of every snapshot of every registered watch into a
/// set.
async fn all_snapshot_hashes(&self) -> HashSet<String> {
    let table = self.watches.read().await;
    let mut hashes = HashSet::new();
    for watch in table.values() {
        hashes.extend(watch.snapshots.iter().map(|snap| snap.sha256.clone()));
    }
    hashes
}
}
/// Derives a watch id: the first four bytes, hex-encoded (eight characters),
/// of the SHA-256 over `url`, then a NUL separator plus the selector when
/// one is given, then the current UTC time in nanoseconds (little-endian).
///
/// The timestamp makes repeated calls for the same URL yield different ids.
/// It falls back to `0` when the time is not representable in nanoseconds.
fn generate_id(url: &str, selector: Option<&str>) -> WatchId {
    use sha2::{Digest, Sha256};

    let nanos = Utc::now().timestamp_nanos_opt().unwrap_or(0);

    // Assemble the whole pre-image first and hash it in a single pass.
    let mut preimage = Vec::new();
    preimage.extend_from_slice(url.as_bytes());
    if let Some(sel) = selector {
        preimage.push(0);
        preimage.extend_from_slice(sel.as_bytes());
    }
    preimage.extend_from_slice(&nanos.to_le_bytes());

    let digest = Sha256::digest(preimage.as_slice());
    hex::encode(&digest[..4])
}
/// Performs the first GET for a new watch.
///
/// Returns the response's `ETag` and `Last-Modified` header values (when
/// present and readable as strings) together with a snapshot describing the
/// extracted content. The snapshot's `sha256` is computed over the extracted
/// content, while `size` is the length of the raw response body.
///
/// # Errors
///
/// Fails when the HTTP client cannot be built, the request fails, the
/// response status is not a success, or the body cannot be read.
async fn initial_fetch(
    url: &str,
    selector: Option<&str>,
    options: &WatchOptions,
) -> Result<(Option<String>, Option<String>, Option<WatchSnapshot>)> {
    let http = reqwest::Client::builder()
        .user_agent(poller::WATCH_USER_AGENT)
        .timeout(Duration::from_secs(30))
        .build()?;

    let response = http.get(url).send().await.context("initial fetch failed")?;
    let status = response.status();
    if !status.is_success() {
        bail!("initial fetch returned HTTP {}", status);
    }

    // Read the validators before the response is consumed by `bytes()`.
    let (etag, last_modified) = {
        let headers = response.headers();
        let text_of = |name: reqwest::header::HeaderName| -> Option<String> {
            let value = headers.get(name)?;
            value.to_str().ok().map(str::to_owned)
        };
        (
            text_of(reqwest::header::ETAG),
            text_of(reqwest::header::LAST_MODIFIED),
        )
    };

    let raw = response.bytes().await.context("read initial body")?;
    let decoded = String::from_utf8_lossy(&raw);
    let content = diff::extract_content(&decoded, selector, &options.diff_kind);

    let snapshot = WatchSnapshot {
        sha256: diff::sha256_hex(content.as_bytes()),
        captured_at: Utc::now(),
        size: raw.len(),
    };
    Ok((etag, last_modified, Some(snapshot)))
}
async fn load_initial_content(
url: &str,
selector: Option<&str>,
options: &WatchOptions,
) -> Result<String> {
let client = reqwest::Client::builder()
.user_agent(poller::WATCH_USER_AGENT)
.timeout(Duration::from_secs(30))
.build()?;
let body = client.get(url).send().await?.bytes().await?;
Ok(diff::extract_content(
&String::from_utf8_lossy(&body),
selector,
&options.diff_kind,
))
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager backed by a fresh temporary directory. The `TempDir`
    /// is returned so the directory outlives the test body.
    fn manager_with_tmp() -> (Arc<WatchManager>, TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let manager = WatchManager::with_storage_dir(tmp.path().to_owned()).unwrap();
        (Arc::new(manager), tmp)
    }

    /// A watch that has never been checked and holds no snapshots.
    fn unchecked_watch(id: &str, url: &str) -> Watch {
        Watch {
            id: id.into(),
            url: url.into(),
            selector: None,
            interval_secs: 3600,
            created_at: Utc::now(),
            last_check_at: None,
            last_change_at: None,
            last_etag: None,
            last_last_modified: None,
            snapshots: vec![],
            consecutive_errors: 0,
            options: WatchOptions::default(),
        }
    }

    /// Registers `watch` without any network access: inserts it into the
    /// in-memory table, then persists it to the manager's storage directory.
    async fn register(mgr: &WatchManager, watch: &Watch) {
        mgr.watches
            .write()
            .await
            .insert(watch.id.clone(), watch.clone());
        storage::save_watch(&mgr.storage_dir, watch).unwrap();
    }

    #[test]
    fn generate_id_is_8_hex_chars() {
        let id = generate_id("https://example.com", None);
        assert_eq!(id.len(), 8);
        assert!(id.bytes().all(|b| b.is_ascii_hexdigit()), "got: {id}");
    }

    #[test]
    fn add_returns_unique_ids_for_same_url() {
        let first = generate_id("https://example.com", None);
        // Let the clock advance so the timestamp component differs.
        std::thread::sleep(std::time::Duration::from_millis(1));
        let second = generate_id("https://example.com", None);
        assert_ne!(first, second);
    }

    #[tokio::test]
    async fn list_after_add_contains_watch() {
        let (mgr, _dir) = manager_with_tmp();
        let watch = unchecked_watch("test0001", "https://example.com");
        register(&mgr, &watch).await;

        let listed = mgr.list().await;
        assert!(listed.iter().any(|w| w.id == "test0001"));
    }

    #[tokio::test]
    async fn remove_drops_from_list() {
        let (mgr, _dir) = manager_with_tmp();
        let watch = unchecked_watch("rmtest01", "https://remove-me.com");
        register(&mgr, &watch).await;

        mgr.remove(&watch.id).await.unwrap();

        let listed = mgr.list().await;
        assert!(listed.iter().all(|w| w.id != "rmtest01"));
    }

    #[tokio::test]
    async fn snapshot_dedup_shares_file() {
        let (mgr, _dir) = manager_with_tmp();
        let body = b"shared content";
        let sha = diff::sha256_hex(body);

        // Saving the same body twice under the same hash must leave one file.
        for _ in 0..2 {
            storage::save_snapshot_body(&mgr.snapshot_dir, &sha, body).unwrap();
        }

        let files = std::fs::read_dir(&mgr.snapshot_dir).unwrap().count();
        assert_eq!(files, 1);
    }

    #[tokio::test]
    async fn render_resource_returns_none_for_unknown_id() {
        let (mgr, _dir) = manager_with_tmp();
        let rendered = mgr.render_resource(&"unknown1".into()).await;
        assert!(rendered.is_none());
    }
}