1use std::collections::{HashMap, HashSet};
14use std::time::{Duration, Instant};
15
16use anyhow::{Context, Result};
17use serde::{Deserialize, Serialize};
18
19use crate::term::{bold, cyan, dim, fmt_duration_ms, green, red, yellow};
20
21const DEFAULT_RELAY: &str = "https://relay.ramcharan.shop";
26
27#[derive(Debug, Serialize, Deserialize, Clone)]
32pub struct AccountStatus {
33 pub name: String,
34 #[serde(default)]
35 pub available: bool,
36 #[serde(default)]
37 pub disabled: bool,
38 #[serde(default)]
39 pub auth_failed: bool,
40 #[serde(default)]
41 pub cooldown_until_ms: u64,
42}
43
44#[derive(Debug, Serialize, Deserialize)]
45struct RemoteSnapshot {
46 accounts: Vec<AccountStatus>,
47 ts_ms: u64,
48}
49
50#[derive(Debug, Clone)]
55struct Snap {
56 available: bool,
57 auth_failed: bool,
58 cooling: bool,
60 disabled: bool,
61}
62
63impl Snap {
64 fn from_status(acc: &AccountStatus, now_ms: u64) -> Self {
65 Self {
66 available: acc.available,
67 auth_failed: acc.auth_failed,
68 cooling: acc.cooldown_until_ms > now_ms,
69 disabled: acc.disabled,
70 }
71 }
72}
73
74const POLL_INTERVAL: Duration = Duration::from_secs(10);
79const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5 * 60);
81const LONG_COOLDOWN_MS: u64 = 5 * 60_000;
83const ALL_OFFLINE_NOTIFY_COOLDOWN: Duration = Duration::from_secs(3_600);
85
86pub async fn run_remote(code: Option<String>, relay_url: Option<String>, local_url: String) -> Result<()> {
91 let relay = relay_url.unwrap_or_else(|| DEFAULT_RELAY.to_string());
92 match code {
93 None => run_host(relay, local_url).await,
94 Some(code) => run_client(code, relay).await,
95 }
96}
97
98async fn run_host(relay_url: String, local_url: String) -> Result<()> {
103 let code = crate::sync::generate_remote_code();
104
105 let client = reqwest::Client::builder()
106 .timeout(Duration::from_secs(10))
107 .pool_max_idle_per_host(0)
108 .build()?;
109
110 println!();
111 println!(" {} {} {}", bold("◆"), bold("shunt"), dim("remote host"));
112 println!();
113 println!(" {} {}", dim("code"), cyan(&code));
114 println!(" {} on another device run:", dim("·"));
115 println!(" {} {}", dim("·"), bold(&format!("shunt remote {code}")));
116 println!();
117 println!(" {} watching local accounts — Ctrl-C to stop", dim("·"));
118 println!();
119
120 let mut last_push: Option<Instant> = None;
121 let mut last_accounts_json: Option<String> = None;
122
123 loop {
124 match fetch_local_status(&client, &local_url).await {
125 Ok(accounts) => {
126 let accounts_json = serde_json::to_string(&accounts).unwrap_or_default();
127 let changed = last_accounts_json.as_deref() != Some(accounts_json.as_str());
128 let heartbeat_due = last_push
129 .map(|t| t.elapsed() >= HEARTBEAT_INTERVAL)
130 .unwrap_or(true);
131
132 if changed || heartbeat_due {
133 let snapshot = RemoteSnapshot { accounts, ts_ms: now_ms() };
134 let data = serde_json::to_vec(&snapshot)?;
135 let payload = crate::sync::encrypt_bytes(&data, &code)?;
136
137 let push_url = format!("{relay_url}/watch/{code}");
138 match client
139 .put(&push_url)
140 .json(&serde_json::json!({ "payload": payload }))
141 .send()
142 .await
143 {
144 Ok(r) if r.status().is_success() => {
145 last_push = Some(Instant::now());
146 last_accounts_json = Some(accounts_json);
147 }
148 Ok(r) => {
149 let status = r.status();
150 eprintln!(" {} relay push failed: {status}", red("✗"));
151 }
152 Err(e) => eprintln!(" {} relay unreachable: {e}", red("✗")),
153 }
154 }
155 }
156 Err(e) => eprintln!(" {} local shunt unreachable: {e}", red("✗")),
157 }
158
159 tokio::time::sleep(POLL_INTERVAL).await;
160 }
161}
162
163async fn run_client(code: String, relay_url: String) -> Result<()> {
168 crate::sync::validate_remote_code(&code)
169 .context("invalid remote code")?;
170
171 let client = reqwest::Client::builder()
172 .timeout(Duration::from_secs(10))
173 .pool_max_idle_per_host(0)
174 .build()?;
175
176 println!();
177 println!(" {} {} {}", bold("◆"), bold("shunt"), dim("remote client"));
178 println!(" {} {}", dim("·"), cyan(&relay_url));
179 println!(" {} connecting…", dim("·"));
180 println!();
181
182 let mut prev: HashMap<String, Snap> = HashMap::new();
183 let mut first_poll = true;
184 let mut was_session_missing = false;
185
186 let mut notified_cooldown: HashMap<String, u64> = HashMap::new();
188 let mut notified_auth_failed: HashSet<String> = HashSet::new();
189 let mut last_all_offline: Option<Instant> = None;
190 let mut was_all_offline = false;
191
192 loop {
193 let poll_url = format!("{relay_url}/watch/{code}");
194 match client.get(&poll_url).send().await {
195 Ok(resp) if resp.status() == reqwest::StatusCode::NOT_FOUND => {
196 if !was_session_missing {
197 println!(" {} session not found — waiting for host…", yellow("⏸"));
198 was_session_missing = true;
199 }
200 }
201 Ok(resp) if resp.status().is_success() => {
202 if was_session_missing {
203 println!(" {} host connected", green("✓"));
204 was_session_missing = false;
205 }
206
207 let body: serde_json::Value = match resp.json().await {
208 Ok(v) => v,
209 Err(e) => { eprintln!(" {} bad relay response: {e}", red("✗")); continue; }
210 };
211
212 let payload = match body["payload"].as_str() {
213 Some(p) => p.to_owned(),
214 None => { eprintln!(" {} relay response missing payload", red("✗")); continue; }
215 };
216
217 let data = match crate::sync::decrypt_bytes(&payload, &code) {
218 Ok(d) => d,
219 Err(e) => { eprintln!(" {} decryption failed: {e}", red("✗")); continue; }
220 };
221
222 let snapshot: RemoteSnapshot = match serde_json::from_slice(&data) {
223 Ok(s) => s,
224 Err(e) => { eprintln!(" {} snapshot parse error: {e}", red("✗")); continue; }
225 };
226
227 let now = now_ms();
228
229 if first_poll {
230 print_initial_state(&snapshot.accounts, now);
231 for acc in &snapshot.accounts {
232 prev.insert(acc.name.clone(), Snap::from_status(acc, now));
233 }
234 first_poll = false;
235 } else {
236 diff_and_notify(
237 &snapshot.accounts,
238 &prev,
239 now,
240 &mut notified_cooldown,
241 &mut notified_auth_failed,
242 &mut last_all_offline,
243 &mut was_all_offline,
244 );
245 prev.clear();
246 for acc in &snapshot.accounts {
247 prev.insert(acc.name.clone(), Snap::from_status(acc, now));
248 }
249 }
250 }
251 Ok(resp) => {
252 eprintln!(" {} relay error: {}", red("✗"), resp.status());
253 }
254 Err(e) => {
255 eprintln!(" {} cannot reach relay: {e}", red("✗"));
256 }
257 }
258
259 tokio::time::sleep(POLL_INTERVAL).await;
260 }
261}
262
263fn diff_and_notify(
268 accounts: &[AccountStatus],
269 prev: &HashMap<String, Snap>,
270 now_ms: u64,
271 notified_cooldown: &mut HashMap<String, u64>,
272 notified_auth_failed: &mut HashSet<String>,
273 last_all_offline: &mut Option<Instant>,
274 was_all_offline: &mut bool,
275) {
276 let all_unavailable = accounts.iter().all(|a| !a.available);
277
278 for acc in accounts {
279 let Some(p) = prev.get(&acc.name) else { continue };
280
281 if acc.auth_failed && !p.auth_failed && !notified_auth_failed.contains(&acc.name) {
283 let msg = format!(
284 "Account '{}' needs re-authorization. Run `shunt add-account`.",
285 acc.name
286 );
287 println!(" {} [{}] reauth required", red("✗"), yellow(&acc.name));
288 crate::notify::notify("shunt: Reauth Required", &msg, "Basso");
289 notified_auth_failed.insert(acc.name.clone());
290 }
291 if !acc.auth_failed {
292 notified_auth_failed.remove(&acc.name);
293 }
294
295 let curr_cooling = acc.cooldown_until_ms > now_ms;
297 if curr_cooling && !p.cooling {
298 let remaining_ms = acc.cooldown_until_ms - now_ms;
299 let last_cdl = notified_cooldown.get(&acc.name).copied().unwrap_or(0);
300 if remaining_ms >= LONG_COOLDOWN_MS && acc.cooldown_until_ms != last_cdl {
301 let mins = remaining_ms / 60_000;
302 let msg = format!("Account '{}' hit quota limit — cooling {}m.", acc.name, mins);
303 println!(
304 " {} [{}] rate limited — cooling {}",
305 yellow("⏸"), yellow(&acc.name), yellow(&fmt_duration_ms(remaining_ms)),
306 );
307 crate::notify::notify("shunt: Rate Limited", &msg, "Ping");
308 notified_cooldown.insert(acc.name.clone(), acc.cooldown_until_ms);
309 }
310 }
311
312 if p.cooling && acc.available && !acc.auth_failed {
314 println!(" {} [{}] back online", green("✓"), green(&acc.name));
315 crate::notify::notify(
316 "shunt: Account Resumed",
317 &format!("Account '{}' is back online.", acc.name),
318 "Glass",
319 );
320 notified_cooldown.remove(&acc.name);
321 }
322
323 if (p.auth_failed || p.disabled) && acc.available {
325 println!(" {} [{}] recovered", green("✓"), green(&acc.name));
326 crate::notify::notify(
327 "shunt: Account Recovered",
328 &format!("Account '{}' is back online.", acc.name),
329 "Glass",
330 );
331 }
332 }
333
334 if all_unavailable && !*was_all_offline {
336 let should_notify = last_all_offline
337 .map(|t| t.elapsed() >= ALL_OFFLINE_NOTIFY_COOLDOWN)
338 .unwrap_or(true);
339 if should_notify {
340 println!(" {} all accounts are offline", red("✗"));
341 crate::notify::notify(
342 "shunt: All Accounts Offline",
343 "All accounts are offline or on cooldown.",
344 "Basso",
345 );
346 *last_all_offline = Some(Instant::now());
347 }
348 }
349 if *was_all_offline && !all_unavailable {
350 println!(" {} accounts back online", green("✓"));
351 }
352 *was_all_offline = all_unavailable;
353}
354
355fn print_initial_state(accounts: &[AccountStatus], now_ms: u64) {
360 println!(" {} {} account(s)", green("✓"), accounts.len());
361 for acc in accounts {
362 let (sym, label) = if acc.auth_failed || acc.disabled {
363 (red("✗"), red(&acc.name))
364 } else if acc.cooldown_until_ms > now_ms {
365 let rem = fmt_duration_ms(acc.cooldown_until_ms - now_ms);
366 (yellow("⏸"), yellow(&format!("{} cooling {}", acc.name, rem)))
367 } else {
368 (green("✓"), green(&acc.name))
369 };
370 println!(" {} {}", sym, label);
371 }
372 println!();
373}
374
375async fn fetch_local_status(
380 client: &reqwest::Client,
381 local_url: &str,
382) -> Result<Vec<AccountStatus>> {
383 let url = format!("{}/status", local_url.trim_end_matches('/'));
384 let resp = client.get(&url).send().await?;
385 let body: serde_json::Value = resp.json().await?;
386 let accounts = serde_json::from_value(body["accounts"].clone())
387 .context("failed to parse accounts from /status")?;
388 Ok(accounts)
389}
390
391fn now_ms() -> u64 {
396 std::time::SystemTime::now()
397 .duration_since(std::time::UNIX_EPOCH)
398 .unwrap_or_default()
399 .as_millis() as u64
400}