import argparse
import json
import os
import re
import signal
import subprocess
import sys
import time
from pathlib import Path
import requests
STATE_FILE = Path(__file__).parent / ".monitor_state.json"
ENV_FILE = Path(__file__).resolve().parent.parent.parent / ".env.local"
HN_API = "https://hacker-news.firebaseio.com/v0"
REDDIT_TOKEN_URL = "https://www.reddit.com/api/v1/access_token"
USER_AGENT = "rgx-launch-monitor/0.1 (by /u/rgx_dev)"
def load_state() -> dict:
if STATE_FILE.exists():
with open(STATE_FILE) as f:
return json.load(f)
return {}
def save_state(state: dict) -> None:
tmp = STATE_FILE.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(state, f, indent=2)
tmp.rename(STATE_FILE)
def notify(title: str, message: str) -> None:
safe_title = title.replace('"', '\\"')
safe_msg = message.replace('"', '\\"')
script = (
f'display notification "{safe_msg}" with title "{safe_title}" sound name "Glass"'
)
try:
subprocess.run(
["osascript", "-e", script],
capture_output=True,
timeout=5,
)
except Exception:
pass
def load_env() -> dict[str, str]:
env: dict[str, str] = {}
if not ENV_FILE.exists():
return env
with open(ENV_FILE) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, _, value = line.partition("=")
env[key.strip()] = value.strip()
return env
def hn_fetch_item(item_id: int) -> dict | None:
try:
r = requests.get(f"{HN_API}/item/{item_id}.json", timeout=10)
r.raise_for_status()
return r.json()
except Exception as e:
print(f" [HN] Error fetching item {item_id}: {e}")
return None
def hn_collect_comments(root_id: int) -> list[dict]:
comments: list[dict] = []
queue = [root_id]
while queue:
item_id = queue.pop(0)
item = hn_fetch_item(item_id)
if item is None:
continue
if item.get("type") == "comment" and not item.get("deleted") and not item.get("dead"):
comments.append(item)
for kid in item.get("kids", []):
queue.append(kid)
return comments
def poll_hn(hn_id: int, state: dict, silent: bool) -> int:
key = f"hn_{hn_id}"
seen: set[int] = set(state.get(key, []))
print(f" [HN] Fetching comments for item {hn_id}...")
comments = hn_collect_comments(hn_id)
new_count = 0
for c in comments:
cid = c["id"]
if cid not in seen:
seen.add(cid)
new_count += 1
if not silent:
author = c.get("by", "anonymous")
text = c.get("text", "")[:120]
text = re.sub(r"<[^>]+>", "", text)
notify(
f"HN Comment by {author}",
text or "(empty comment)",
)
print(f" [HN] New comment by {author}: {text[:80]}...")
state[key] = list(seen)
return new_count
def reddit_get_token(env: dict[str, str]) -> str | None:
client_id = env.get("REDDIT_CLIENT_ID", "")
client_secret = env.get("REDDIT_CLIENT_SECRET", "")
username = env.get("REDDIT_USERNAME", "")
password = env.get("REDDIT_PASSWORD", "")
if not all([client_id, client_secret, username, password]):
print(" [Reddit] Missing credentials in .env.local, skipping Reddit.")
return None
try:
r = requests.post(
REDDIT_TOKEN_URL,
auth=(client_id, client_secret),
data={
"grant_type": "password",
"username": username,
"password": password,
},
headers={"User-Agent": USER_AGENT},
timeout=10,
)
r.raise_for_status()
return r.json().get("access_token")
except Exception as e:
print(f" [Reddit] Auth error: {e}")
return None
def reddit_extract_post_path(url: str) -> str | None:
m = re.search(r"r/(\w+)/comments/(\w+)", url)
if not m:
return None
subreddit, post_id = m.group(1), m.group(2)
return f"/r/{subreddit}/comments/{post_id}"
def reddit_flatten_comments(node: dict, out: list[dict]) -> None:
if node.get("kind") == "t1":
data = node.get("data", {})
out.append(data)
replies = data.get("replies")
if isinstance(replies, dict):
for child in replies.get("data", {}).get("children", []):
reddit_flatten_comments(child, out)
elif node.get("kind") == "Listing":
for child in node.get("data", {}).get("children", []):
reddit_flatten_comments(child, out)
def poll_reddit(reddit_url: str, token: str, state: dict, silent: bool) -> int:
post_path = reddit_extract_post_path(reddit_url)
if not post_path:
print(f" [Reddit] Could not parse URL: {reddit_url}")
return 0
key = f"reddit_{post_path}"
seen: set[str] = set(state.get(key, []))
print(f" [Reddit] Fetching comments for {post_path}...")
try:
r = requests.get(
f"https://oauth.reddit.com{post_path}.json?limit=500&depth=10",
headers={
"Authorization": f"Bearer {token}",
"User-Agent": USER_AGENT,
},
timeout=15,
)
r.raise_for_status()
data = r.json()
except Exception as e:
print(f" [Reddit] Fetch error: {e}")
return 0
comments: list[dict] = []
if isinstance(data, list) and len(data) > 1:
reddit_flatten_comments(data[1], comments)
new_count = 0
for c in comments:
cid = c.get("id", "")
if not cid or cid in seen:
continue
seen.add(cid)
new_count += 1
if not silent:
author = c.get("author", "[deleted]")
body = (c.get("body") or "")[:120]
notify(
f"Reddit Comment by {author}",
body or "(empty comment)",
)
print(f" [Reddit] New comment by {author}: {body[:80]}...")
state[key] = list(seen)
return new_count
_shutdown = False
def _handle_sigint(sig, frame):
global _shutdown
_shutdown = True
def interruptible_sleep(seconds: int) -> None:
for _ in range(seconds):
if _shutdown:
break
time.sleep(1)
def main() -> None:
parser = argparse.ArgumentParser(
description="Monitor HN/Reddit posts for new comments and send macOS notifications."
)
parser.add_argument("--hn-id", type=int, help="Hacker News item ID to monitor")
parser.add_argument("--reddit-url", type=str, help="Reddit post URL to monitor")
parser.add_argument(
"--interval", type=int, default=60, help="Poll interval in seconds (default: 60)"
)
args = parser.parse_args()
if not args.hn_id and not args.reddit_url:
parser.error("At least one of --hn-id or --reddit-url is required.")
signal.signal(signal.SIGINT, _handle_sigint)
state = load_state()
env = load_env()
hn_key = f"hn_{args.hn_id}" if args.hn_id else None
reddit_key = None
if args.reddit_url:
post_path = reddit_extract_post_path(args.reddit_url)
if post_path:
reddit_key = f"reddit_{post_path}"
hn_first_run = hn_key and hn_key not in state
reddit_first_run = reddit_key and reddit_key not in state
reddit_token = None
if args.reddit_url:
reddit_token = reddit_get_token(env)
if not reddit_token:
if not args.hn_id:
print("Error: Reddit auth failed and no HN ID provided. Nothing to monitor.")
sys.exit(1)
print(" [Reddit] Will skip Reddit polling (no valid token).")
if hn_first_run or reddit_first_run:
print("First run detected — indexing existing comments silently...")
if hn_first_run and args.hn_id:
count = poll_hn(args.hn_id, state, silent=True)
print(f" [HN] Indexed {count} existing comments.")
if reddit_first_run and args.reddit_url and reddit_token:
count = poll_reddit(args.reddit_url, reddit_token, state, silent=True)
print(f" [Reddit] Indexed {count} existing comments.")
save_state(state)
print("Indexing complete. Future runs will notify on new comments.\n")
print(f"Monitoring started (interval: {args.interval}s). Press Ctrl+C to stop.\n")
poll_count = 0
while not _shutdown:
poll_count += 1
timestamp = time.strftime("%H:%M:%S")
print(f"[{timestamp}] Poll #{poll_count}")
hn_new = 0
reddit_new = 0
if args.hn_id:
try:
hn_new = poll_hn(args.hn_id, state, silent=False)
except Exception as e:
print(f" [HN] Unexpected error: {e}")
if args.reddit_url and reddit_token:
try:
reddit_new = poll_reddit(args.reddit_url, reddit_token, state, silent=False)
except Exception as e:
print(f" [Reddit] Unexpected error: {e}")
total_new = hn_new + reddit_new
if total_new == 0:
print(" No new comments.\n")
else:
print(f" {total_new} new comment(s) found.\n")
save_state(state)
interruptible_sleep(args.interval)
print("\nShutting down — saving state...")
save_state(state)
print("State saved. Goodbye!")
if __name__ == "__main__":
main()