Skip to main content

shunt/
remote.rs

1/// `shunt remote` — relay-based remote account event watcher.
2///
3/// **Host mode** (`shunt remote`):
4///   - Generates a one-time watch code (`RM-…`)
5///   - Polls the local shunt `/status` every 10 s
6///   - Encrypts the snapshot and pushes it to the relay
7///   - Prints the code so the user can enter it on another device
8///
9/// **Client mode** (`shunt remote RM-…`):
10///   - Polls the relay for the latest encrypted snapshot
11///   - Decrypts and diffs against the previous poll
12///   - Fires local system notifications on account events
13use std::collections::{HashMap, HashSet};
14use std::time::{Duration, Instant};
15
16use anyhow::{Context, Result};
17use serde::{Deserialize, Serialize};
18
19use crate::term::{bold, cyan, dim, fmt_duration_ms, green, red, yellow};
20
21// ---------------------------------------------------------------------------
22// Relay URL default
23// ---------------------------------------------------------------------------
24
25const DEFAULT_RELAY: &str = "https://relay.ramcharan.shop";
26
27// ---------------------------------------------------------------------------
28// Snapshot types (serialized + encrypted over the relay)
29// ---------------------------------------------------------------------------
30
31#[derive(Debug, Serialize, Deserialize, Clone)]
32pub struct AccountStatus {
33    pub name: String,
34    #[serde(default)]
35    pub available: bool,
36    #[serde(default)]
37    pub disabled: bool,
38    #[serde(default)]
39    pub auth_failed: bool,
40    #[serde(default)]
41    pub cooldown_until_ms: u64,
42}
43
44#[derive(Debug, Serialize, Deserialize)]
45struct RemoteSnapshot {
46    accounts: Vec<AccountStatus>,
47    ts_ms: u64,
48}
49
50// ---------------------------------------------------------------------------
51// State snapshot for diffing (client side)
52// ---------------------------------------------------------------------------
53
54#[derive(Debug, Clone)]
55struct Snap {
56    available: bool,
57    auth_failed: bool,
58    /// true if `cooldown_until_ms > now_ms` at snapshot time
59    cooling: bool,
60    disabled: bool,
61}
62
63impl Snap {
64    fn from_status(acc: &AccountStatus, now_ms: u64) -> Self {
65        Self {
66            available: acc.available,
67            auth_failed: acc.auth_failed,
68            cooling: acc.cooldown_until_ms > now_ms,
69            disabled: acc.disabled,
70        }
71    }
72}
73
74// ---------------------------------------------------------------------------
75// Thresholds
76// ---------------------------------------------------------------------------
77
78const POLL_INTERVAL: Duration = Duration::from_secs(10);
79/// How often the host pushes to the relay even if nothing changed (keeps session alive).
80const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5 * 60);
81/// Cooldowns shorter than this are transient — skip notification.
82const LONG_COOLDOWN_MS: u64 = 5 * 60_000;
83/// Minimum gap between "all accounts offline" notifications.
84const ALL_OFFLINE_NOTIFY_COOLDOWN: Duration = Duration::from_secs(3_600);
85
86// ---------------------------------------------------------------------------
87// Entry point — dispatches host vs client
88// ---------------------------------------------------------------------------
89
90pub async fn run_remote(code: Option<String>, relay_url: Option<String>, local_url: String) -> Result<()> {
91    let relay = relay_url.unwrap_or_else(|| DEFAULT_RELAY.to_string());
92    match code {
93        None        => run_host(relay, local_url).await,
94        Some(code)  => run_client(code, relay).await,
95    }
96}
97
98// ---------------------------------------------------------------------------
99// Host mode
100// ---------------------------------------------------------------------------
101
102async fn run_host(relay_url: String, local_url: String) -> Result<()> {
103    let code = crate::sync::generate_remote_code();
104
105    let client = reqwest::Client::builder()
106        .timeout(Duration::from_secs(10))
107        .pool_max_idle_per_host(0)
108        .build()?;
109
110    println!();
111    println!("  {}  {}  {}", bold("◆"), bold("shunt"), dim("remote  host"));
112    println!();
113    println!("  {}  {}", dim("code"), cyan(&code));
114    println!("  {}  on another device run:", dim("·"));
115    println!("  {}  {}", dim("·"), bold(&format!("shunt remote {code}")));
116    println!();
117    println!("  {}  watching local accounts — Ctrl-C to stop", dim("·"));
118    println!();
119
120    let mut last_push: Option<Instant> = None;
121    let mut last_accounts_json: Option<String> = None;
122
123    loop {
124        match fetch_local_status(&client, &local_url).await {
125            Ok(accounts) => {
126                let accounts_json = serde_json::to_string(&accounts).unwrap_or_default();
127                let changed = last_accounts_json.as_deref() != Some(accounts_json.as_str());
128                let heartbeat_due = last_push
129                    .map(|t| t.elapsed() >= HEARTBEAT_INTERVAL)
130                    .unwrap_or(true);
131
132                if changed || heartbeat_due {
133                    let snapshot = RemoteSnapshot { accounts, ts_ms: now_ms() };
134                    let data = serde_json::to_vec(&snapshot)?;
135                    let payload = crate::sync::encrypt_bytes(&data, &code)?;
136
137                    let push_url = format!("{relay_url}/watch/{code}");
138                    match client
139                        .put(&push_url)
140                        .json(&serde_json::json!({ "payload": payload }))
141                        .send()
142                        .await
143                    {
144                        Ok(r) if r.status().is_success() => {
145                            last_push = Some(Instant::now());
146                            last_accounts_json = Some(accounts_json);
147                        }
148                        Ok(r) => {
149                            let status = r.status();
150                            eprintln!("  {}  relay push failed: {status}", red("✗"));
151                        }
152                        Err(e) => eprintln!("  {}  relay unreachable: {e}", red("✗")),
153                    }
154                }
155            }
156            Err(e) => eprintln!("  {}  local shunt unreachable: {e}", red("✗")),
157        }
158
159        tokio::time::sleep(POLL_INTERVAL).await;
160    }
161}
162
163// ---------------------------------------------------------------------------
164// Client mode
165// ---------------------------------------------------------------------------
166
167async fn run_client(code: String, relay_url: String) -> Result<()> {
168    crate::sync::validate_remote_code(&code)
169        .context("invalid remote code")?;
170
171    let client = reqwest::Client::builder()
172        .timeout(Duration::from_secs(10))
173        .pool_max_idle_per_host(0)
174        .build()?;
175
176    println!();
177    println!("  {}  {}  {}", bold("◆"), bold("shunt"), dim("remote  client"));
178    println!("  {}  {}", dim("·"), cyan(&relay_url));
179    println!("  {}  connecting…", dim("·"));
180    println!();
181
182    let mut prev: HashMap<String, Snap> = HashMap::new();
183    let mut first_poll = true;
184    let mut was_session_missing = false;
185
186    // Throttle state
187    let mut notified_cooldown: HashMap<String, u64> = HashMap::new();
188    let mut notified_auth_failed: HashSet<String> = HashSet::new();
189    let mut last_all_offline: Option<Instant> = None;
190    let mut was_all_offline = false;
191
192    loop {
193        let poll_url = format!("{relay_url}/watch/{code}");
194        match client.get(&poll_url).send().await {
195            Ok(resp) if resp.status() == reqwest::StatusCode::NOT_FOUND => {
196                if !was_session_missing {
197                    println!("  {}  session not found — waiting for host…", yellow("⏸"));
198                    was_session_missing = true;
199                }
200            }
201            Ok(resp) if resp.status().is_success() => {
202                if was_session_missing {
203                    println!("  {}  host connected", green("✓"));
204                    was_session_missing = false;
205                }
206
207                let body: serde_json::Value = match resp.json().await {
208                    Ok(v) => v,
209                    Err(e) => { eprintln!("  {}  bad relay response: {e}", red("✗")); continue; }
210                };
211
212                let payload = match body["payload"].as_str() {
213                    Some(p) => p.to_owned(),
214                    None => { eprintln!("  {}  relay response missing payload", red("✗")); continue; }
215                };
216
217                let data = match crate::sync::decrypt_bytes(&payload, &code) {
218                    Ok(d) => d,
219                    Err(e) => { eprintln!("  {}  decryption failed: {e}", red("✗")); continue; }
220                };
221
222                let snapshot: RemoteSnapshot = match serde_json::from_slice(&data) {
223                    Ok(s) => s,
224                    Err(e) => { eprintln!("  {}  snapshot parse error: {e}", red("✗")); continue; }
225                };
226
227                let now = now_ms();
228
229                if first_poll {
230                    print_initial_state(&snapshot.accounts, now);
231                    for acc in &snapshot.accounts {
232                        prev.insert(acc.name.clone(), Snap::from_status(acc, now));
233                    }
234                    first_poll = false;
235                } else {
236                    diff_and_notify(
237                        &snapshot.accounts,
238                        &prev,
239                        now,
240                        &mut notified_cooldown,
241                        &mut notified_auth_failed,
242                        &mut last_all_offline,
243                        &mut was_all_offline,
244                    );
245                    prev.clear();
246                    for acc in &snapshot.accounts {
247                        prev.insert(acc.name.clone(), Snap::from_status(acc, now));
248                    }
249                }
250            }
251            Ok(resp) => {
252                eprintln!("  {}  relay error: {}", red("✗"), resp.status());
253            }
254            Err(e) => {
255                eprintln!("  {}  cannot reach relay: {e}", red("✗"));
256            }
257        }
258
259        tokio::time::sleep(POLL_INTERVAL).await;
260    }
261}
262
263// ---------------------------------------------------------------------------
264// State diffing + notification dispatch
265// ---------------------------------------------------------------------------
266
267fn diff_and_notify(
268    accounts: &[AccountStatus],
269    prev: &HashMap<String, Snap>,
270    now_ms: u64,
271    notified_cooldown: &mut HashMap<String, u64>,
272    notified_auth_failed: &mut HashSet<String>,
273    last_all_offline: &mut Option<Instant>,
274    was_all_offline: &mut bool,
275) {
276    let all_unavailable = accounts.iter().all(|a| !a.available);
277
278    for acc in accounts {
279        let Some(p) = prev.get(&acc.name) else { continue };
280
281        // ── Reauth required (newly auth_failed) ─────────────────────────────
282        if acc.auth_failed && !p.auth_failed && !notified_auth_failed.contains(&acc.name) {
283            let msg = format!(
284                "Account '{}' needs re-authorization. Run `shunt add-account`.",
285                acc.name
286            );
287            println!("  {}  [{}]  reauth required", red("✗"), yellow(&acc.name));
288            crate::notify::notify("shunt: Reauth Required", &msg, "Basso");
289            notified_auth_failed.insert(acc.name.clone());
290        }
291        if !acc.auth_failed {
292            notified_auth_failed.remove(&acc.name);
293        }
294
295        // ── Entered cooldown (newly, long enough to matter) ──────────────────
296        let curr_cooling = acc.cooldown_until_ms > now_ms;
297        if curr_cooling && !p.cooling {
298            let remaining_ms = acc.cooldown_until_ms - now_ms;
299            let last_cdl = notified_cooldown.get(&acc.name).copied().unwrap_or(0);
300            if remaining_ms >= LONG_COOLDOWN_MS && acc.cooldown_until_ms != last_cdl {
301                let mins = remaining_ms / 60_000;
302                let msg = format!("Account '{}' hit quota limit — cooling {}m.", acc.name, mins);
303                println!(
304                    "  {}  [{}]  rate limited — cooling {}",
305                    yellow("⏸"), yellow(&acc.name), yellow(&fmt_duration_ms(remaining_ms)),
306                );
307                crate::notify::notify("shunt: Rate Limited", &msg, "Ping");
308                notified_cooldown.insert(acc.name.clone(), acc.cooldown_until_ms);
309            }
310        }
311
312        // ── Resumed from cooldown ────────────────────────────────────────────
313        if p.cooling && acc.available && !acc.auth_failed {
314            println!("  {}  [{}]  back online", green("✓"), green(&acc.name));
315            crate::notify::notify(
316                "shunt: Account Resumed",
317                &format!("Account '{}' is back online.", acc.name),
318                "Glass",
319            );
320            notified_cooldown.remove(&acc.name);
321        }
322
323        // ── Recovered from auth_failed / disabled ────────────────────────────
324        if (p.auth_failed || p.disabled) && acc.available {
325            println!("  {}  [{}]  recovered", green("✓"), green(&acc.name));
326            crate::notify::notify(
327                "shunt: Account Recovered",
328                &format!("Account '{}' is back online.", acc.name),
329                "Glass",
330            );
331        }
332    }
333
334    // ── All accounts offline ─────────────────────────────────────────────────
335    if all_unavailable && !*was_all_offline {
336        let should_notify = last_all_offline
337            .map(|t| t.elapsed() >= ALL_OFFLINE_NOTIFY_COOLDOWN)
338            .unwrap_or(true);
339        if should_notify {
340            println!("  {}  all accounts are offline", red("✗"));
341            crate::notify::notify(
342                "shunt: All Accounts Offline",
343                "All accounts are offline or on cooldown.",
344                "Basso",
345            );
346            *last_all_offline = Some(Instant::now());
347        }
348    }
349    if *was_all_offline && !all_unavailable {
350        println!("  {}  accounts back online", green("✓"));
351    }
352    *was_all_offline = all_unavailable;
353}
354
355// ---------------------------------------------------------------------------
356// Display helpers
357// ---------------------------------------------------------------------------
358
359fn print_initial_state(accounts: &[AccountStatus], now_ms: u64) {
360    println!("  {}  {} account(s)", green("✓"), accounts.len());
361    for acc in accounts {
362        let (sym, label) = if acc.auth_failed || acc.disabled {
363            (red("✗"), red(&acc.name))
364        } else if acc.cooldown_until_ms > now_ms {
365            let rem = fmt_duration_ms(acc.cooldown_until_ms - now_ms);
366            (yellow("⏸"), yellow(&format!("{}  cooling {}", acc.name, rem)))
367        } else {
368            (green("✓"), green(&acc.name))
369        };
370        println!("    {}  {}", sym, label);
371    }
372    println!();
373}
374
375// ---------------------------------------------------------------------------
376// HTTP helpers
377// ---------------------------------------------------------------------------
378
379async fn fetch_local_status(
380    client: &reqwest::Client,
381    local_url: &str,
382) -> Result<Vec<AccountStatus>> {
383    let url = format!("{}/status", local_url.trim_end_matches('/'));
384    let resp = client.get(&url).send().await?;
385    let body: serde_json::Value = resp.json().await?;
386    let accounts = serde_json::from_value(body["accounts"].clone())
387        .context("failed to parse accounts from /status")?;
388    Ok(accounts)
389}
390
391// ---------------------------------------------------------------------------
392// Time
393// ---------------------------------------------------------------------------
394
395fn now_ms() -> u64 {
396    std::time::SystemTime::now()
397        .duration_since(std::time::UNIX_EPOCH)
398        .unwrap_or_default()
399        .as_millis() as u64
400}