Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
5    PeersResponse,
6};
7use pulpo_common::session::Session;
8
9#[derive(Parser, Debug)]
10#[command(
11    name = "pulpo",
12    about = "Manage agent sessions across your machines",
13    version = env!("PULPO_VERSION"),
14    args_conflicts_with_subcommands = true
15)]
16pub struct Cli {
17    /// Target node (default: localhost)
18    #[arg(long, default_value = "localhost:7433")]
19    pub node: String,
20
21    /// Auth token (auto-discovered from local daemon if omitted)
22    #[arg(long)]
23    pub token: Option<String>,
24
25    #[command(subcommand)]
26    pub command: Option<Commands>,
27
28    /// Quick spawn: `pulpo <path>` spawns a session in that directory
29    #[arg(value_name = "PATH")]
30    pub path: Option<String>,
31}
32
33#[derive(Subcommand, Debug)]
34#[allow(clippy::large_enum_variant)]
35pub enum Commands {
36    /// Attach to a session's terminal
37    #[command(visible_alias = "a")]
38    Attach {
39        /// Session name or ID
40        name: String,
41    },
42
43    /// Send input to a session
44    #[command(visible_alias = "i", visible_alias = "send")]
45    Input {
46        /// Session name or ID
47        name: String,
48        /// Text to send (sends Enter if omitted)
49        text: Option<String>,
50    },
51
52    /// Spawn a new agent session
53    #[command(visible_alias = "s")]
54    Spawn {
55        /// Session name (auto-generated from workdir if omitted)
56        name: Option<String>,
57
58        /// Working directory (defaults to current directory)
59        #[arg(long)]
60        workdir: Option<String>,
61
62        /// Ink name (from config)
63        #[arg(long)]
64        ink: Option<String>,
65
66        /// Human-readable description of the task
67        #[arg(long)]
68        description: Option<String>,
69
70        /// Don't attach to the session after spawning
71        #[arg(short, long)]
72        detach: bool,
73
74        /// Idle threshold in seconds (0 = never idle)
75        #[arg(long)]
76        idle_threshold: Option<u32>,
77
78        /// Auto-select the least loaded node
79        #[arg(long)]
80        auto: bool,
81
82        /// Command to run (everything after --)
83        #[arg(last = true)]
84        command: Vec<String>,
85    },
86
87    /// List all sessions
88    #[command(visible_alias = "ls")]
89    List,
90
91    /// Show session logs/output
92    #[command(visible_alias = "l")]
93    Logs {
94        /// Session name or ID
95        name: String,
96
97        /// Number of lines to fetch
98        #[arg(long, default_value = "100")]
99        lines: usize,
100
101        /// Follow output (like `tail -f`)
102        #[arg(short, long)]
103        follow: bool,
104    },
105
106    /// Kill a session
107    #[command(visible_alias = "k")]
108    Kill {
109        /// Session name or ID
110        name: String,
111    },
112
113    /// Permanently remove a session from history
114    #[command(visible_alias = "rm")]
115    Delete {
116        /// Session name or ID
117        name: String,
118    },
119
120    /// Resume a lost session
121    #[command(visible_alias = "r")]
122    Resume {
123        /// Session name or ID
124        name: String,
125    },
126
127    /// List all known nodes
128    #[command(visible_alias = "n")]
129    Nodes,
130
131    /// Show intervention history for a session
132    #[command(visible_alias = "iv")]
133    Interventions {
134        /// Session name or ID
135        name: String,
136    },
137
138    /// Open the web dashboard in your browser
139    Ui,
140
141    /// Manage scheduled agent runs via crontab
142    #[command(visible_alias = "sched")]
143    Schedule {
144        #[command(subcommand)]
145        action: ScheduleAction,
146    },
147}
148
149#[derive(Subcommand, Debug)]
150pub enum ScheduleAction {
151    /// Install a cron schedule that spawns a session
152    Install {
153        /// Schedule name
154        name: String,
155        /// Cron expression (e.g. "0 3 * * *")
156        cron: String,
157        /// Working directory
158        #[arg(long)]
159        workdir: String,
160        /// Command to run (everything after --)
161        #[arg(last = true)]
162        command: Vec<String>,
163    },
164    /// List installed pulpo cron schedules
165    #[command(alias = "ls")]
166    List,
167    /// Remove a cron schedule
168    #[command(alias = "rm")]
169    Remove {
170        /// Schedule name
171        name: String,
172    },
173    /// Pause a cron schedule (comments out the line)
174    Pause {
175        /// Schedule name
176        name: String,
177    },
178    /// Resume a paused cron schedule (uncomments the line)
179    Resume {
180        /// Schedule name
181        name: String,
182    },
183}
184
185/// The marker emitted by the agent wrapper when the agent process exits.
186const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
187
188/// Resolve a path to an absolute path string.
189fn resolve_path(path: &str) -> String {
190    let p = std::path::Path::new(path);
191    if p.is_absolute() {
192        path.to_owned()
193    } else {
194        std::env::current_dir().map_or_else(
195            |_| path.to_owned(),
196            |cwd| cwd.join(p).to_string_lossy().into_owned(),
197        )
198    }
199}
200
201/// Derive a session name from a directory path (basename, kebab-cased).
202fn derive_session_name(path: &str) -> String {
203    let basename = std::path::Path::new(path)
204        .file_name()
205        .and_then(|n| n.to_str())
206        .unwrap_or("session");
207    // Convert to kebab-case: lowercase, replace non-alphanumeric with hyphens, collapse
208    let kebab: String = basename
209        .chars()
210        .map(|c| {
211            if c.is_ascii_alphanumeric() {
212                c.to_ascii_lowercase()
213            } else {
214                '-'
215            }
216        })
217        .collect();
218    // Collapse consecutive hyphens and trim leading/trailing hyphens
219    let mut result = String::new();
220    for c in kebab.chars() {
221        if c == '-' && result.ends_with('-') {
222            continue;
223        }
224        result.push(c);
225    }
226    let result = result.trim_matches('-').to_owned();
227    if result.is_empty() {
228        "session".to_owned()
229    } else {
230        result
231    }
232}
233
234/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
235async fn deduplicate_session_name(
236    client: &reqwest::Client,
237    base: &str,
238    name: &str,
239    token: Option<&str>,
240) -> String {
241    // Check if the name is already taken by fetching the session
242    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
243        .send()
244        .await;
245    match resp {
246        Ok(r) if r.status().is_success() => {
247            // Session exists — try suffixed names
248            for i in 2..=99 {
249                let candidate = format!("{name}-{i}");
250                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
251                    .send()
252                    .await;
253                match resp {
254                    Ok(r) if r.status().is_success() => {}
255                    _ => return candidate,
256                }
257            }
258            format!("{name}-100")
259        }
260        _ => name.to_owned(),
261    }
262}
263
264/// Format the base URL from the node address.
265pub fn base_url(node: &str) -> String {
266    format!("http://{node}")
267}
268
269/// Response shape for the output endpoint.
270#[derive(serde::Deserialize)]
271struct OutputResponse {
272    output: String,
273}
274
275/// Format a list of sessions as a table.
276fn format_sessions(sessions: &[Session]) -> String {
277    if sessions.is_empty() {
278        return "No sessions.".into();
279    }
280    let mut lines = vec![format!("{:<20} {:<12} {}", "NAME", "STATUS", "COMMAND")];
281    for s in sessions {
282        let cmd_display = if s.command.len() > 50 {
283            format!("{}...", &s.command[..47])
284        } else {
285            s.command.clone()
286        };
287        lines.push(format!("{:<20} {:<12} {}", s.name, s.status, cmd_display));
288    }
289    lines.join("\n")
290}
291
292/// Format the peers response as a table.
293fn format_nodes(resp: &PeersResponse) -> String {
294    let mut lines = vec![format!(
295        "{:<20} {:<25} {:<10} {}",
296        "NAME", "ADDRESS", "STATUS", "SESSIONS"
297    )];
298    lines.push(format!(
299        "{:<20} {:<25} {:<10} {}",
300        resp.local.name, "(local)", "online", "-"
301    ));
302    for p in &resp.peers {
303        let sessions = p
304            .session_count
305            .map_or_else(|| "-".into(), |c| c.to_string());
306        lines.push(format!(
307            "{:<20} {:<25} {:<10} {}",
308            p.name, p.address, p.status, sessions
309        ));
310    }
311    lines.join("\n")
312}
313
314/// Format intervention events as a table.
315fn format_interventions(events: &[InterventionEventResponse]) -> String {
316    if events.is_empty() {
317        return "No intervention events.".into();
318    }
319    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
320    for e in events {
321        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
322    }
323    lines.join("\n")
324}
325
326/// Build the command to open a URL in the default browser.
327#[cfg_attr(coverage, allow(dead_code))]
328fn build_open_command(url: &str) -> std::process::Command {
329    #[cfg(target_os = "macos")]
330    {
331        let mut cmd = std::process::Command::new("open");
332        cmd.arg(url);
333        cmd
334    }
335    #[cfg(target_os = "linux")]
336    {
337        let mut cmd = std::process::Command::new("xdg-open");
338        cmd.arg(url);
339        cmd
340    }
341    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
342    {
343        // Fallback: try xdg-open
344        let mut cmd = std::process::Command::new("xdg-open");
345        cmd.arg(url);
346        cmd
347    }
348}
349
350/// Open a URL in the default browser.
351#[cfg(not(coverage))]
352fn open_browser(url: &str) -> Result<()> {
353    build_open_command(url).status()?;
354    Ok(())
355}
356
357/// Stub for coverage builds — avoids opening a browser during tests.
358#[cfg(coverage)]
359fn open_browser(_url: &str) -> Result<()> {
360    Ok(())
361}
362
363/// Build the command to attach to a session's terminal.
364/// Takes the backend session ID (e.g. `my-session`) — the tmux session name.
365#[cfg_attr(coverage, allow(dead_code))]
366fn build_attach_command(backend_session_id: &str) -> std::process::Command {
367    let mut cmd = std::process::Command::new("tmux");
368    cmd.args(["attach-session", "-t", backend_session_id]);
369    cmd
370}
371
372/// Attach to a session's terminal.
373#[cfg(not(any(test, coverage)))]
374fn attach_session(backend_session_id: &str) -> Result<()> {
375    let status = build_attach_command(backend_session_id).status()?;
376    if !status.success() {
377        anyhow::bail!("attach failed with {status}");
378    }
379    Ok(())
380}
381
382/// Stub for test and coverage builds — avoids spawning real terminals during tests.
383#[cfg(any(test, coverage))]
384#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
385fn attach_session(_backend_session_id: &str) -> Result<()> {
386    Ok(())
387}
388
389/// Extract a clean error message from an API JSON response (or fall back to raw text).
390fn api_error(text: &str) -> anyhow::Error {
391    serde_json::from_str::<serde_json::Value>(text)
392        .ok()
393        .and_then(|v| v["error"].as_str().map(String::from))
394        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
395}
396
397/// Return the response body text, or a clean error if the response was non-success.
398async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
399    if resp.status().is_success() {
400        Ok(resp.text().await?)
401    } else {
402        let text = resp.text().await?;
403        Err(api_error(&text))
404    }
405}
406
407/// Map a reqwest error to a user-friendly message.
408fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
409    if err.is_connect() {
410        anyhow::anyhow!(
411            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
412        )
413    } else {
414        anyhow::anyhow!("Network error connecting to {node}: {err}")
415    }
416}
417
418/// Check if the node address points to localhost.
419fn is_localhost(node: &str) -> bool {
420    let host = node.split(':').next().unwrap_or(node);
421    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
422}
423
424/// Try to auto-discover the auth token from a local daemon.
425async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
426    let resp = client
427        .get(format!("{base}/api/v1/auth/token"))
428        .send()
429        .await
430        .ok()?;
431    let body: AuthTokenResponse = resp.json().await.ok()?;
432    if body.token.is_empty() {
433        None
434    } else {
435        Some(body.token)
436    }
437}
438
439/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
440async fn resolve_token(
441    client: &reqwest::Client,
442    base: &str,
443    node: &str,
444    explicit: Option<&str>,
445) -> Option<String> {
446    if let Some(t) = explicit {
447        return Some(t.to_owned());
448    }
449    if is_localhost(node) {
450        return discover_token(client, base).await;
451    }
452    None
453}
454
455/// Check if a node string needs resolution (no port specified).
456fn node_needs_resolution(node: &str) -> bool {
457    !node.contains(':')
458}
459
460/// Resolve a node reference to a `host:port` address.
461///
462/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
463/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
464/// online peer is found, return its address and optionally its configured auth token
465/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
466#[cfg(not(coverage))]
467async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
468    // Already has port — use as-is
469    if !node_needs_resolution(node) {
470        return (node.to_owned(), None);
471    }
472
473    // Try to resolve via local daemon's peer registry
474    let local_base = "http://localhost:7433";
475    let mut resolved_address: Option<String> = None;
476
477    if let Ok(resp) = client
478        .get(format!("{local_base}/api/v1/peers"))
479        .send()
480        .await
481        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
482    {
483        for peer in &peers_resp.peers {
484            if peer.name == node {
485                resolved_address = Some(peer.address.clone());
486                break;
487            }
488        }
489    }
490
491    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
492
493    // Try to get the peer's auth token from the config endpoint
494    let peer_token = if let Ok(resp) = client
495        .get(format!("{local_base}/api/v1/config"))
496        .send()
497        .await
498        && let Ok(config) = resp.json::<ConfigResponse>().await
499        && let Some(entry) = config.peers.get(node)
500    {
501        entry.token().map(String::from)
502    } else {
503        None
504    };
505
506    (address, peer_token)
507}
508
509/// Coverage stub — no real HTTP resolution during coverage builds.
510#[cfg(coverage)]
511async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
512    if node_needs_resolution(node) {
513        (format!("{node}:7433"), None)
514    } else {
515        (node.to_owned(), None)
516    }
517}
518
519/// Build an authenticated GET request.
520fn authed_get(
521    client: &reqwest::Client,
522    url: String,
523    token: Option<&str>,
524) -> reqwest::RequestBuilder {
525    let req = client.get(url);
526    if let Some(t) = token {
527        req.bearer_auth(t)
528    } else {
529        req
530    }
531}
532
533/// Build an authenticated POST request.
534fn authed_post(
535    client: &reqwest::Client,
536    url: String,
537    token: Option<&str>,
538) -> reqwest::RequestBuilder {
539    let req = client.post(url);
540    if let Some(t) = token {
541        req.bearer_auth(t)
542    } else {
543        req
544    }
545}
546
547/// Build an authenticated DELETE request.
548fn authed_delete(
549    client: &reqwest::Client,
550    url: String,
551    token: Option<&str>,
552) -> reqwest::RequestBuilder {
553    let req = client.delete(url);
554    if let Some(t) = token {
555        req.bearer_auth(t)
556    } else {
557        req
558    }
559}
560
561/// Fetch session output from the API.
562async fn fetch_output(
563    client: &reqwest::Client,
564    base: &str,
565    name: &str,
566    lines: usize,
567    token: Option<&str>,
568) -> Result<String> {
569    let resp = authed_get(
570        client,
571        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
572        token,
573    )
574    .send()
575    .await?;
576    let text = ok_or_api_error(resp).await?;
577    let output: OutputResponse = serde_json::from_str(&text)?;
578    Ok(output.output)
579}
580
581/// Fetch session status from the API.
582async fn fetch_session_status(
583    client: &reqwest::Client,
584    base: &str,
585    name: &str,
586    token: Option<&str>,
587) -> Result<String> {
588    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
589        .send()
590        .await?;
591    let text = ok_or_api_error(resp).await?;
592    let session: Session = serde_json::from_str(&text)?;
593    Ok(session.status.to_string())
594}
595
596/// Compute the new trailing lines that differ from the previous output.
597///
598/// The output endpoint returns the last N lines from the terminal pane. As new lines
599/// appear, old lines at the top scroll off. We find the overlap between the end
600/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
601/// trailing lines.
602fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
603    if prev.is_empty() {
604        return new;
605    }
606
607    let prev_lines: Vec<&str> = prev.lines().collect();
608    let new_lines: Vec<&str> = new.lines().collect();
609
610    if new_lines.is_empty() {
611        return "";
612    }
613
614    // prev is non-empty (early return above), so last() always succeeds
615    let last_prev = prev_lines[prev_lines.len() - 1];
616
617    // Find the last line of prev in new to determine the overlap boundary
618    for i in (0..new_lines.len()).rev() {
619        if new_lines[i] == last_prev {
620            // Verify contiguous overlap: check that lines before this match too
621            let overlap_len = prev_lines.len().min(i + 1);
622            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
623            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
624            if prev_tail == new_overlap {
625                if i + 1 < new_lines.len() {
626                    // Return the slice of `new` after the overlap
627                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
628                    return new.get(consumed.min(new.len())..).unwrap_or("");
629                }
630                return "";
631            }
632        }
633    }
634
635    // No overlap found — output changed completely, print it all
636    new
637}
638
639/// Follow logs by polling, printing only new output. Returns when the session ends.
640async fn follow_logs(
641    client: &reqwest::Client,
642    base: &str,
643    name: &str,
644    lines: usize,
645    token: Option<&str>,
646    writer: &mut (dyn std::io::Write + Send),
647) -> Result<()> {
648    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
649    write!(writer, "{prev_output}")?;
650
651    let mut unchanged_ticks: u32 = 0;
652
653    loop {
654        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
655
656        // Fetch latest output
657        let new_output = fetch_output(client, base, name, lines, token).await?;
658
659        let diff = diff_output(&prev_output, &new_output);
660        if diff.is_empty() {
661            unchanged_ticks += 1;
662        } else {
663            write!(writer, "{diff}")?;
664            unchanged_ticks = 0;
665        }
666
667        // Check for agent exit marker in output
668        if new_output.contains(AGENT_EXIT_MARKER) {
669            break;
670        }
671
672        prev_output = new_output;
673
674        // Only check session status when output has been unchanged for 3+ ticks
675        if unchanged_ticks >= 3 {
676            let status = fetch_session_status(client, base, name, token).await?;
677            let is_terminal = status == "ready" || status == "killed" || status == "lost";
678            if is_terminal {
679                break;
680            }
681        }
682    }
683    Ok(())
684}
685
686// --- Crontab wrapper ---
687
688#[cfg_attr(coverage, allow(dead_code))]
689const CRONTAB_TAG: &str = "#pulpo:";
690
691/// Read the current crontab. Returns empty string if no crontab exists.
692#[cfg(not(coverage))]
693fn read_crontab() -> Result<String> {
694    let output = std::process::Command::new("crontab").arg("-l").output()?;
695    if output.status.success() {
696        Ok(String::from_utf8_lossy(&output.stdout).to_string())
697    } else {
698        Ok(String::new())
699    }
700}
701
702/// Write the given content as the user's crontab.
703#[cfg(not(coverage))]
704fn write_crontab(content: &str) -> Result<()> {
705    use std::io::Write;
706    let mut child = std::process::Command::new("crontab")
707        .arg("-")
708        .stdin(std::process::Stdio::piped())
709        .spawn()?;
710    child
711        .stdin
712        .as_mut()
713        .unwrap()
714        .write_all(content.as_bytes())?;
715    let status = child.wait()?;
716    if !status.success() {
717        anyhow::bail!("crontab write failed");
718    }
719    Ok(())
720}
721
722/// Build the crontab line for a pulpo schedule.
723#[cfg_attr(coverage, allow(dead_code))]
724fn build_crontab_line(name: &str, cron: &str, workdir: &str, command: &str, node: &str) -> String {
725    format!(
726        "{cron} pulpo --node {node} spawn {name} --workdir {workdir} -- {command} {CRONTAB_TAG}{name}\n"
727    )
728}
729
730/// Install a cron schedule into a crontab string. Returns the updated crontab.
731#[cfg_attr(coverage, allow(dead_code))]
732fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
733    let tag = format!("{CRONTAB_TAG}{name}");
734    if crontab.contains(&tag) {
735        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
736    }
737    let mut result = crontab.to_owned();
738    result.push_str(line);
739    Ok(result)
740}
741
742/// Format pulpo crontab entries for display.
743#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_list(crontab: &str) -> String {
745    let entries: Vec<&str> = crontab
746        .lines()
747        .filter(|l| l.contains(CRONTAB_TAG))
748        .collect();
749    if entries.is_empty() {
750        return "No pulpo schedules.".into();
751    }
752    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
753    for entry in entries {
754        let paused = entry.starts_with('#');
755        let raw = entry.trim_start_matches('#').trim();
756        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
757        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
758        let cron_expr = if parts.len() >= 5 {
759            parts[..5].join(" ")
760        } else {
761            "?".into()
762        };
763        lines.push(format!(
764            "{:<20} {:<15} {}",
765            name,
766            cron_expr,
767            if paused { "yes" } else { "no" }
768        ));
769    }
770    lines.join("\n")
771}
772
773/// Remove a schedule from a crontab string. Returns the updated crontab.
774#[cfg_attr(coverage, allow(dead_code))]
775fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
776    use std::fmt::Write;
777    let tag = format!("{CRONTAB_TAG}{name}");
778    let filtered =
779        crontab
780            .lines()
781            .filter(|l| !l.contains(&tag))
782            .fold(String::new(), |mut acc, l| {
783                writeln!(acc, "{l}").unwrap();
784                acc
785            });
786    if filtered.len() == crontab.len() {
787        anyhow::bail!("schedule \"{name}\" not found");
788    }
789    Ok(filtered)
790}
791
792/// Pause (comment out) a schedule in a crontab string.
793#[cfg_attr(coverage, allow(dead_code))]
794fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
795    use std::fmt::Write;
796    let tag = format!("{CRONTAB_TAG}{name}");
797    let mut found = false;
798    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
799        if l.contains(&tag) && !l.starts_with('#') {
800            found = true;
801            writeln!(acc, "#{l}").unwrap();
802        } else {
803            writeln!(acc, "{l}").unwrap();
804        }
805        acc
806    });
807    if !found {
808        anyhow::bail!("schedule \"{name}\" not found or already paused");
809    }
810    Ok(updated)
811}
812
813/// Resume (uncomment) a schedule in a crontab string.
814#[cfg_attr(coverage, allow(dead_code))]
815fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
816    use std::fmt::Write;
817    let tag = format!("{CRONTAB_TAG}{name}");
818    let mut found = false;
819    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
820        if l.contains(&tag) && l.starts_with('#') {
821            found = true;
822            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
823        } else {
824            writeln!(acc, "{l}").unwrap();
825        }
826        acc
827    });
828    if !found {
829        anyhow::bail!("schedule \"{name}\" not found or not paused");
830    }
831    Ok(updated)
832}
833
834/// Execute a schedule subcommand using the crontab wrapper.
835#[cfg(not(coverage))]
836fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
837    match action {
838        ScheduleAction::Install {
839            name,
840            cron,
841            workdir,
842            command,
843        } => {
844            let crontab = read_crontab()?;
845            let joined_command = command.join(" ");
846            let line = build_crontab_line(name, cron, workdir, &joined_command, node);
847            let updated = crontab_install(&crontab, name, &line)?;
848            write_crontab(&updated)?;
849            Ok(format!("Installed schedule \"{name}\""))
850        }
851        ScheduleAction::List => {
852            let crontab = read_crontab()?;
853            Ok(crontab_list(&crontab))
854        }
855        ScheduleAction::Remove { name } => {
856            let crontab = read_crontab()?;
857            let updated = crontab_remove(&crontab, name)?;
858            write_crontab(&updated)?;
859            Ok(format!("Removed schedule \"{name}\""))
860        }
861        ScheduleAction::Pause { name } => {
862            let crontab = read_crontab()?;
863            let updated = crontab_pause(&crontab, name)?;
864            write_crontab(&updated)?;
865            Ok(format!("Paused schedule \"{name}\""))
866        }
867        ScheduleAction::Resume { name } => {
868            let crontab = read_crontab()?;
869            let updated = crontab_resume(&crontab, name)?;
870            write_crontab(&updated)?;
871            Ok(format!("Resumed schedule \"{name}\""))
872        }
873    }
874}
875
876/// Stub for coverage builds — crontab is real I/O.
877#[cfg(coverage)]
878fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
879    Ok(String::new())
880}
881
882/// Select the best node from the peer registry based on load.
883/// Returns the node address and name of the least loaded online peer.
884/// Scoring: lower memory usage + fewer active sessions = better.
885#[cfg(not(coverage))]
886async fn select_best_node(
887    client: &reqwest::Client,
888    base: &str,
889    token: Option<&str>,
890) -> Result<(String, String)> {
891    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
892        .send()
893        .await?;
894    let text = ok_or_api_error(resp).await?;
895    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
896
897    // Score: fewer active sessions is better, more memory is better
898    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
899
900    for peer in &peers_resp.peers {
901        if peer.status != pulpo_common::peer::PeerStatus::Online {
902            continue;
903        }
904        let sessions = peer.session_count.unwrap_or(0);
905        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
906        // Lower score = better (fewer sessions, more memory)
907        #[allow(clippy::cast_precision_loss)]
908        let score = sessions as f64 - (mem as f64 / 1024.0);
909        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
910            best = Some((peer.address.clone(), peer.name.clone(), score));
911        }
912    }
913
914    // Fall back to local if no online peers
915    match best {
916        Some((addr, name, _)) => Ok((addr, name)),
917        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
918    }
919}
920
921/// Coverage stub — auto-select always falls back to local.
922#[cfg(coverage)]
923#[allow(clippy::unnecessary_wraps)]
924async fn select_best_node(
925    _client: &reqwest::Client,
926    _base: &str,
927    _token: Option<&str>,
928) -> Result<(String, String)> {
929    Ok(("localhost:7433".into(), "local".into()))
930}
931
932/// Execute the given CLI command against the specified node.
933#[allow(clippy::too_many_lines)]
934pub async fn execute(cli: &Cli) -> Result<String> {
935    let client = reqwest::Client::new();
936    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
937    let url = base_url(&resolved_node);
938    let node = &resolved_node;
939    let token = resolve_token(&client, &url, node, cli.token.as_deref())
940        .await
941        .or(peer_token);
942
943    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
944    if cli.command.is_none() {
945        let path = cli.path.as_deref().unwrap_or(".");
946        let resolved_workdir = resolve_path(path);
947        let base_name = derive_session_name(&resolved_workdir);
948        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
949        let body = serde_json::json!({
950            "name": name,
951            "workdir": resolved_workdir,
952        });
953        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
954            .json(&body)
955            .send()
956            .await
957            .map_err(|e| friendly_error(&e, node))?;
958        let text = ok_or_api_error(resp).await?;
959        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
960        let msg = format!(
961            "Created session \"{}\" ({})",
962            resp.session.name, resp.session.id
963        );
964        let backend_id = resp
965            .session
966            .backend_session_id
967            .as_deref()
968            .unwrap_or(&resp.session.name);
969        eprintln!("{msg}");
970        attach_session(backend_id)?;
971        return Ok(format!("Detached from session \"{}\".", resp.session.name));
972    }
973
974    match cli.command.as_ref().unwrap() {
975        Commands::Attach { name } => {
976            // Fetch session to get status and backend_session_id
977            let resp = authed_get(
978                &client,
979                format!("{url}/api/v1/sessions/{name}"),
980                token.as_deref(),
981            )
982            .send()
983            .await
984            .map_err(|e| friendly_error(&e, node))?;
985            let text = ok_or_api_error(resp).await?;
986            let session: Session = serde_json::from_str(&text)?;
987            match session.status.to_string().as_str() {
988                "lost" => {
989                    anyhow::bail!(
990                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
991                    );
992                }
993                "killed" => {
994                    anyhow::bail!(
995                        "Session \"{name}\" is {} — cannot attach to a killed session.",
996                        session.status
997                    );
998                }
999                _ => {}
1000            }
1001            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1002            attach_session(&backend_id)?;
1003            Ok(format!("Detached from session {name}."))
1004        }
1005        Commands::Input { name, text } => {
1006            let input_text = text.as_deref().unwrap_or("\n");
1007            let body = serde_json::json!({ "text": input_text });
1008            let resp = authed_post(
1009                &client,
1010                format!("{url}/api/v1/sessions/{name}/input"),
1011                token.as_deref(),
1012            )
1013            .json(&body)
1014            .send()
1015            .await
1016            .map_err(|e| friendly_error(&e, node))?;
1017            ok_or_api_error(resp).await?;
1018            Ok(format!("Sent input to session {name}."))
1019        }
1020        Commands::List => {
1021            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1022                .send()
1023                .await
1024                .map_err(|e| friendly_error(&e, node))?;
1025            let text = ok_or_api_error(resp).await?;
1026            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1027            Ok(format_sessions(&sessions))
1028        }
1029        Commands::Nodes => {
1030            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1031                .send()
1032                .await
1033                .map_err(|e| friendly_error(&e, node))?;
1034            let text = ok_or_api_error(resp).await?;
1035            let resp: PeersResponse = serde_json::from_str(&text)?;
1036            Ok(format_nodes(&resp))
1037        }
1038        Commands::Spawn {
1039            workdir,
1040            name,
1041            ink,
1042            description,
1043            detach,
1044            idle_threshold,
1045            auto,
1046            command,
1047        } => {
1048            let cmd = if command.is_empty() {
1049                None
1050            } else {
1051                Some(command.join(" "))
1052            };
1053            // Resolve workdir: --workdir flag > current directory
1054            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1055                std::env::current_dir()
1056                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1057            });
1058            // Resolve name: explicit > derived from workdir (with dedup)
1059            let resolved_name = if let Some(n) = name {
1060                n.clone()
1061            } else {
1062                let base_name = derive_session_name(&resolved_workdir);
1063                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1064            };
1065            let mut body = serde_json::json!({
1066                "name": resolved_name,
1067                "workdir": resolved_workdir,
1068            });
1069            if let Some(c) = &cmd {
1070                body["command"] = serde_json::json!(c);
1071            }
1072            if let Some(i) = ink {
1073                body["ink"] = serde_json::json!(i);
1074            }
1075            if let Some(d) = description {
1076                body["description"] = serde_json::json!(d);
1077            }
1078            if let Some(t) = idle_threshold {
1079                body["idle_threshold_secs"] = serde_json::json!(t);
1080            }
1081            let spawn_url = if *auto {
1082                let (auto_addr, auto_name) =
1083                    select_best_node(&client, &url, token.as_deref()).await?;
1084                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1085                base_url(&auto_addr)
1086            } else {
1087                url.clone()
1088            };
1089            let resp = authed_post(
1090                &client,
1091                format!("{spawn_url}/api/v1/sessions"),
1092                token.as_deref(),
1093            )
1094            .json(&body)
1095            .send()
1096            .await
1097            .map_err(|e| friendly_error(&e, node))?;
1098            let text = ok_or_api_error(resp).await?;
1099            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1100            let msg = format!(
1101                "Created session \"{}\" ({})",
1102                resp.session.name, resp.session.id
1103            );
1104            if !detach {
1105                let backend_id = resp
1106                    .session
1107                    .backend_session_id
1108                    .as_deref()
1109                    .unwrap_or(&resp.session.name);
1110                eprintln!("{msg}");
1111                attach_session(backend_id)?;
1112                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1113            }
1114            Ok(msg)
1115        }
1116        Commands::Kill { name } => {
1117            let resp = authed_post(
1118                &client,
1119                format!("{url}/api/v1/sessions/{name}/kill"),
1120                token.as_deref(),
1121            )
1122            .send()
1123            .await
1124            .map_err(|e| friendly_error(&e, node))?;
1125            ok_or_api_error(resp).await?;
1126            Ok(format!("Session {name} killed."))
1127        }
1128        Commands::Delete { name } => {
1129            let resp = authed_delete(
1130                &client,
1131                format!("{url}/api/v1/sessions/{name}"),
1132                token.as_deref(),
1133            )
1134            .send()
1135            .await
1136            .map_err(|e| friendly_error(&e, node))?;
1137            ok_or_api_error(resp).await?;
1138            Ok(format!("Session {name} deleted."))
1139        }
1140        Commands::Logs {
1141            name,
1142            lines,
1143            follow,
1144        } => {
1145            if *follow {
1146                let mut stdout = std::io::stdout();
1147                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1148                    .await
1149                    .map_err(|e| {
1150                        // Unwrap reqwest errors to friendly messages
1151                        match e.downcast::<reqwest::Error>() {
1152                            Ok(re) => friendly_error(&re, node),
1153                            Err(other) => other,
1154                        }
1155                    })?;
1156                Ok(String::new())
1157            } else {
1158                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1159                    .await
1160                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1161                        Ok(re) => friendly_error(&re, node),
1162                        Err(other) => other,
1163                    })?;
1164                Ok(output)
1165            }
1166        }
1167        Commands::Interventions { name } => {
1168            let resp = authed_get(
1169                &client,
1170                format!("{url}/api/v1/sessions/{name}/interventions"),
1171                token.as_deref(),
1172            )
1173            .send()
1174            .await
1175            .map_err(|e| friendly_error(&e, node))?;
1176            let text = ok_or_api_error(resp).await?;
1177            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1178            Ok(format_interventions(&events))
1179        }
1180        Commands::Ui => {
1181            let dashboard = base_url(node);
1182            open_browser(&dashboard)?;
1183            Ok(format!("Opening {dashboard}"))
1184        }
1185        Commands::Resume { name } => {
1186            let resp = authed_post(
1187                &client,
1188                format!("{url}/api/v1/sessions/{name}/resume"),
1189                token.as_deref(),
1190            )
1191            .send()
1192            .await
1193            .map_err(|e| friendly_error(&e, node))?;
1194            let text = ok_or_api_error(resp).await?;
1195            let session: Session = serde_json::from_str(&text)?;
1196            let backend_id = session
1197                .backend_session_id
1198                .as_deref()
1199                .unwrap_or(&session.name);
1200            eprintln!("Resumed session \"{}\"", session.name);
1201            attach_session(backend_id)?;
1202            Ok(format!("Detached from session \"{}\".", session.name))
1203        }
1204        Commands::Schedule { action } => execute_schedule(action, node),
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211
1212    #[test]
1213    fn test_base_url() {
1214        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1215        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1216    }
1217
1218    #[test]
1219    fn test_cli_parse_list() {
1220        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1221        assert_eq!(cli.node, "localhost:7433");
1222        assert!(matches!(cli.command, Some(Commands::List)));
1223    }
1224
1225    #[test]
1226    fn test_cli_parse_nodes() {
1227        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1228        assert!(matches!(cli.command, Some(Commands::Nodes)));
1229    }
1230
1231    #[test]
1232    fn test_cli_parse_ui() {
1233        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1234        assert!(matches!(cli.command, Some(Commands::Ui)));
1235    }
1236
1237    #[test]
1238    fn test_cli_parse_ui_custom_node() {
1239        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1240        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1241        assert_eq!(cli.node, "mac-mini:7433");
1242        assert_eq!(cli.path.as_deref(), Some("ui"));
1243    }
1244
1245    #[test]
1246    fn test_build_open_command() {
1247        let cmd = build_open_command("http://localhost:7433");
1248        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1249        assert_eq!(args, vec!["http://localhost:7433"]);
1250        #[cfg(target_os = "macos")]
1251        assert_eq!(cmd.get_program(), "open");
1252        #[cfg(target_os = "linux")]
1253        assert_eq!(cmd.get_program(), "xdg-open");
1254    }
1255
1256    #[test]
1257    fn test_cli_parse_spawn() {
1258        let cli = Cli::try_parse_from([
1259            "pulpo",
1260            "spawn",
1261            "my-task",
1262            "--workdir",
1263            "/tmp/repo",
1264            "--",
1265            "claude",
1266            "-p",
1267            "Fix the bug",
1268        ])
1269        .unwrap();
1270        assert!(matches!(
1271            &cli.command,
1272            Some(Commands::Spawn { name, workdir, command, .. })
1273                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1274                && command == &["claude", "-p", "Fix the bug"]
1275        ));
1276    }
1277
1278    #[test]
1279    fn test_cli_parse_spawn_with_ink() {
1280        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1281        assert!(matches!(
1282            &cli.command,
1283            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1284        ));
1285    }
1286
1287    #[test]
1288    fn test_cli_parse_spawn_with_description() {
1289        let cli =
1290            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1291                .unwrap();
1292        assert!(matches!(
1293            &cli.command,
1294            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1295        ));
1296    }
1297
1298    #[test]
1299    fn test_cli_parse_spawn_name_positional() {
1300        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1301        assert!(matches!(
1302            &cli.command,
1303            Some(Commands::Spawn { name, command, .. })
1304                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1305        ));
1306    }
1307
1308    #[test]
1309    fn test_cli_parse_spawn_no_command() {
1310        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1311        assert!(matches!(
1312            &cli.command,
1313            Some(Commands::Spawn { command, .. }) if command.is_empty()
1314        ));
1315    }
1316
1317    #[test]
1318    fn test_cli_parse_spawn_idle_threshold() {
1319        let cli =
1320            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1321        assert!(matches!(
1322            &cli.command,
1323            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1324        ));
1325    }
1326
1327    #[test]
1328    fn test_cli_parse_spawn_idle_threshold_60() {
1329        let cli =
1330            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1331        assert!(matches!(
1332            &cli.command,
1333            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1334        ));
1335    }
1336
1337    #[test]
1338    fn test_cli_parse_spawn_detach() {
1339        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1340        assert!(matches!(
1341            &cli.command,
1342            Some(Commands::Spawn { detach, .. }) if *detach
1343        ));
1344    }
1345
1346    #[test]
1347    fn test_cli_parse_spawn_detach_short() {
1348        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1349        assert!(matches!(
1350            &cli.command,
1351            Some(Commands::Spawn { detach, .. }) if *detach
1352        ));
1353    }
1354
1355    #[test]
1356    fn test_cli_parse_spawn_detach_default() {
1357        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1358        assert!(matches!(
1359            &cli.command,
1360            Some(Commands::Spawn { detach, .. }) if !detach
1361        ));
1362    }
1363
1364    #[test]
1365    fn test_cli_parse_logs() {
1366        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1367        assert!(matches!(
1368            &cli.command,
1369            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1370        ));
1371    }
1372
1373    #[test]
1374    fn test_cli_parse_logs_with_lines() {
1375        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1376        assert!(matches!(
1377            &cli.command,
1378            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1379        ));
1380    }
1381
1382    #[test]
1383    fn test_cli_parse_logs_follow() {
1384        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1385        assert!(matches!(
1386            &cli.command,
1387            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1388        ));
1389    }
1390
1391    #[test]
1392    fn test_cli_parse_logs_follow_short() {
1393        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1394        assert!(matches!(
1395            &cli.command,
1396            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1397        ));
1398    }
1399
1400    #[test]
1401    fn test_cli_parse_kill() {
1402        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1403        assert!(matches!(
1404            &cli.command,
1405            Some(Commands::Kill { name }) if name == "my-session"
1406        ));
1407    }
1408
1409    #[test]
1410    fn test_cli_parse_delete() {
1411        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1412        assert!(matches!(
1413            &cli.command,
1414            Some(Commands::Delete { name }) if name == "my-session"
1415        ));
1416    }
1417
1418    #[test]
1419    fn test_cli_parse_resume() {
1420        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1421        assert!(matches!(
1422            &cli.command,
1423            Some(Commands::Resume { name }) if name == "my-session"
1424        ));
1425    }
1426
1427    #[test]
1428    fn test_cli_parse_input() {
1429        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1430        assert!(matches!(
1431            &cli.command,
1432            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1433        ));
1434    }
1435
1436    #[test]
1437    fn test_cli_parse_input_no_text() {
1438        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1439        assert!(matches!(
1440            &cli.command,
1441            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1442        ));
1443    }
1444
1445    #[test]
1446    fn test_cli_parse_input_alias() {
1447        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1448        assert!(matches!(
1449            &cli.command,
1450            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1451        ));
1452    }
1453
1454    #[test]
1455    fn test_cli_parse_custom_node() {
1456        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1457        assert_eq!(cli.node, "win-pc:8080");
1458        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
1459        assert_eq!(cli.path.as_deref(), Some("list"));
1460    }
1461
1462    #[test]
1463    fn test_cli_version() {
1464        let result = Cli::try_parse_from(["pulpo", "--version"]);
1465        // clap exits with an error (kind DisplayVersion) when --version is used
1466        let err = result.unwrap_err();
1467        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1468    }
1469
1470    #[test]
1471    fn test_cli_parse_no_subcommand_succeeds() {
1472        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1473        assert!(cli.command.is_none());
1474        assert!(cli.path.is_none());
1475    }
1476
1477    #[test]
1478    fn test_cli_debug() {
1479        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1480        let debug = format!("{cli:?}");
1481        assert!(debug.contains("List"));
1482    }
1483
1484    #[test]
1485    fn test_commands_debug() {
1486        let cmd = Commands::List;
1487        assert_eq!(format!("{cmd:?}"), "List");
1488    }
1489
1490    /// A valid Session JSON for test responses.
1491    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1492
1493    /// A valid `CreateSessionResponse` JSON wrapping the session.
1494    fn test_create_response_json() -> String {
1495        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1496    }
1497
1498    /// Start a lightweight test HTTP server and return its address.
1499    async fn start_test_server() -> String {
1500        use axum::http::StatusCode;
1501        use axum::{
1502            Json, Router,
1503            routing::{get, post},
1504        };
1505
1506        let create_json = test_create_response_json();
1507
1508        let app = Router::new()
1509            .route(
1510                "/api/v1/sessions",
1511                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1512                    (StatusCode::CREATED, create_json.clone())
1513                }),
1514            )
1515            .route(
1516                "/api/v1/sessions/{id}",
1517                get(|| async { TEST_SESSION_JSON.to_owned() })
1518                    .delete(|| async { StatusCode::NO_CONTENT }),
1519            )
1520            .route(
1521                "/api/v1/sessions/{id}/kill",
1522                post(|| async { StatusCode::NO_CONTENT }),
1523            )
1524            .route(
1525                "/api/v1/sessions/{id}/output",
1526                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1527            )
1528            .route(
1529                "/api/v1/peers",
1530                get(|| async {
1531                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1532                }),
1533            )
1534            .route(
1535                "/api/v1/sessions/{id}/resume",
1536                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1537            )
1538            .route(
1539                "/api/v1/sessions/{id}/interventions",
1540                get(|| async { "[]".to_owned() }),
1541            )
1542            .route(
1543                "/api/v1/sessions/{id}/input",
1544                post(|| async { StatusCode::NO_CONTENT }),
1545            );
1546
1547        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1548        let addr = listener.local_addr().unwrap();
1549        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1550        format!("127.0.0.1:{}", addr.port())
1551    }
1552
1553    #[tokio::test]
1554    async fn test_execute_list_success() {
1555        let node = start_test_server().await;
1556        let cli = Cli {
1557            node,
1558            token: None,
1559            command: Some(Commands::List),
1560            path: None,
1561        };
1562        let result = execute(&cli).await.unwrap();
1563        assert_eq!(result, "No sessions.");
1564    }
1565
1566    #[tokio::test]
1567    async fn test_execute_nodes_success() {
1568        let node = start_test_server().await;
1569        let cli = Cli {
1570            node,
1571            token: None,
1572            command: Some(Commands::Nodes),
1573            path: None,
1574        };
1575        let result = execute(&cli).await.unwrap();
1576        assert!(result.contains("test"));
1577        assert!(result.contains("(local)"));
1578        assert!(result.contains("NAME"));
1579    }
1580
1581    #[tokio::test]
1582    async fn test_execute_spawn_success() {
1583        let node = start_test_server().await;
1584        let cli = Cli {
1585            node,
1586            token: None,
1587            command: Some(Commands::Spawn {
1588                name: Some("test".into()),
1589                workdir: Some("/tmp/repo".into()),
1590                ink: None,
1591                description: None,
1592                detach: true,
1593                idle_threshold: None,
1594                auto: false,
1595                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1596            }),
1597            path: None,
1598        };
1599        let result = execute(&cli).await.unwrap();
1600        assert!(result.contains("Created session"));
1601        assert!(result.contains("repo"));
1602    }
1603
1604    #[tokio::test]
1605    async fn test_execute_spawn_with_all_flags() {
1606        let node = start_test_server().await;
1607        let cli = Cli {
1608            node,
1609            token: None,
1610            command: Some(Commands::Spawn {
1611                name: Some("test".into()),
1612                workdir: Some("/tmp/repo".into()),
1613                ink: Some("coder".into()),
1614                description: Some("Fix the bug".into()),
1615                detach: true,
1616                idle_threshold: None,
1617                auto: false,
1618                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1619            }),
1620            path: None,
1621        };
1622        let result = execute(&cli).await.unwrap();
1623        assert!(result.contains("Created session"));
1624    }
1625
1626    #[tokio::test]
1627    async fn test_execute_spawn_no_command() {
1628        let node = start_test_server().await;
1629        let cli = Cli {
1630            node,
1631            token: None,
1632            command: Some(Commands::Spawn {
1633                name: Some("test".into()),
1634                workdir: Some("/tmp/repo".into()),
1635                ink: None,
1636                description: None,
1637                detach: true,
1638                idle_threshold: None,
1639                auto: false,
1640                command: vec![],
1641            }),
1642            path: None,
1643        };
1644        let result = execute(&cli).await.unwrap();
1645        assert!(result.contains("Created session"));
1646    }
1647
1648    #[tokio::test]
1649    async fn test_execute_spawn_with_name() {
1650        let node = start_test_server().await;
1651        let cli = Cli {
1652            node,
1653            token: None,
1654            command: Some(Commands::Spawn {
1655                name: Some("my-task".into()),
1656                workdir: Some("/tmp/repo".into()),
1657                ink: None,
1658                description: None,
1659                detach: true,
1660                idle_threshold: None,
1661                auto: false,
1662                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1663            }),
1664            path: None,
1665        };
1666        let result = execute(&cli).await.unwrap();
1667        assert!(result.contains("Created session"));
1668    }
1669
1670    #[tokio::test]
1671    async fn test_execute_spawn_auto_attach() {
1672        let node = start_test_server().await;
1673        let cli = Cli {
1674            node,
1675            token: None,
1676            command: Some(Commands::Spawn {
1677                name: Some("test".into()),
1678                workdir: Some("/tmp/repo".into()),
1679                ink: None,
1680                description: None,
1681                detach: false,
1682                idle_threshold: None,
1683                auto: false,
1684                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1685            }),
1686            path: None,
1687        };
1688        let result = execute(&cli).await.unwrap();
1689        // When not detached, spawn prints creation to stderr and returns detach message
1690        assert!(result.contains("Detached from session"));
1691    }
1692
1693    #[tokio::test]
1694    async fn test_execute_kill_success() {
1695        let node = start_test_server().await;
1696        let cli = Cli {
1697            node,
1698            token: None,
1699            command: Some(Commands::Kill {
1700                name: "test-session".into(),
1701            }),
1702            path: None,
1703        };
1704        let result = execute(&cli).await.unwrap();
1705        assert!(result.contains("killed"));
1706    }
1707
1708    #[tokio::test]
1709    async fn test_execute_delete_success() {
1710        let node = start_test_server().await;
1711        let cli = Cli {
1712            node,
1713            token: None,
1714            command: Some(Commands::Delete {
1715                name: "test-session".into(),
1716            }),
1717            path: None,
1718        };
1719        let result = execute(&cli).await.unwrap();
1720        assert!(result.contains("deleted"));
1721    }
1722
1723    #[tokio::test]
1724    async fn test_execute_logs_success() {
1725        let node = start_test_server().await;
1726        let cli = Cli {
1727            node,
1728            token: None,
1729            command: Some(Commands::Logs {
1730                name: "test-session".into(),
1731                lines: 50,
1732                follow: false,
1733            }),
1734            path: None,
1735        };
1736        let result = execute(&cli).await.unwrap();
1737        assert!(result.contains("test output"));
1738    }
1739
1740    #[tokio::test]
1741    async fn test_execute_list_connection_refused() {
1742        let cli = Cli {
1743            node: "localhost:1".into(),
1744            token: None,
1745            command: Some(Commands::List),
1746            path: None,
1747        };
1748        let result = execute(&cli).await;
1749        let err = result.unwrap_err().to_string();
1750        assert!(
1751            err.contains("Could not connect to pulpod"),
1752            "Expected friendly error, got: {err}"
1753        );
1754        assert!(err.contains("localhost:1"));
1755    }
1756
1757    #[tokio::test]
1758    async fn test_execute_nodes_connection_refused() {
1759        let cli = Cli {
1760            node: "localhost:1".into(),
1761            token: None,
1762            command: Some(Commands::Nodes),
1763            path: None,
1764        };
1765        let result = execute(&cli).await;
1766        let err = result.unwrap_err().to_string();
1767        assert!(err.contains("Could not connect to pulpod"));
1768    }
1769
1770    #[tokio::test]
1771    async fn test_execute_kill_error_response() {
1772        use axum::{Router, http::StatusCode, routing::post};
1773
1774        let app = Router::new().route(
1775            "/api/v1/sessions/{id}/kill",
1776            post(|| async {
1777                (
1778                    StatusCode::NOT_FOUND,
1779                    "{\"error\":\"session not found: test-session\"}",
1780                )
1781            }),
1782        );
1783        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1784        let addr = listener.local_addr().unwrap();
1785        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1786        let node = format!("127.0.0.1:{}", addr.port());
1787
1788        let cli = Cli {
1789            node,
1790            token: None,
1791            command: Some(Commands::Kill {
1792                name: "test-session".into(),
1793            }),
1794            path: None,
1795        };
1796        let err = execute(&cli).await.unwrap_err();
1797        assert_eq!(err.to_string(), "session not found: test-session");
1798    }
1799
1800    #[tokio::test]
1801    async fn test_execute_delete_error_response() {
1802        use axum::{Router, http::StatusCode, routing::delete};
1803
1804        let app = Router::new().route(
1805            "/api/v1/sessions/{id}",
1806            delete(|| async {
1807                (
1808                    StatusCode::CONFLICT,
1809                    "{\"error\":\"cannot delete session in 'running' state\"}",
1810                )
1811            }),
1812        );
1813        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1814        let addr = listener.local_addr().unwrap();
1815        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1816        let node = format!("127.0.0.1:{}", addr.port());
1817
1818        let cli = Cli {
1819            node,
1820            token: None,
1821            command: Some(Commands::Delete {
1822                name: "test-session".into(),
1823            }),
1824            path: None,
1825        };
1826        let err = execute(&cli).await.unwrap_err();
1827        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1828    }
1829
1830    #[tokio::test]
1831    async fn test_execute_logs_error_response() {
1832        use axum::{Router, http::StatusCode, routing::get};
1833
1834        let app = Router::new().route(
1835            "/api/v1/sessions/{id}/output",
1836            get(|| async {
1837                (
1838                    StatusCode::NOT_FOUND,
1839                    "{\"error\":\"session not found: ghost\"}",
1840                )
1841            }),
1842        );
1843        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1844        let addr = listener.local_addr().unwrap();
1845        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1846        let node = format!("127.0.0.1:{}", addr.port());
1847
1848        let cli = Cli {
1849            node,
1850            token: None,
1851            command: Some(Commands::Logs {
1852                name: "ghost".into(),
1853                lines: 50,
1854                follow: false,
1855            }),
1856            path: None,
1857        };
1858        let err = execute(&cli).await.unwrap_err();
1859        assert_eq!(err.to_string(), "session not found: ghost");
1860    }
1861
1862    #[tokio::test]
1863    async fn test_execute_resume_error_response() {
1864        use axum::{Router, http::StatusCode, routing::post};
1865
1866        let app = Router::new().route(
1867            "/api/v1/sessions/{id}/resume",
1868            post(|| async {
1869                (
1870                    StatusCode::BAD_REQUEST,
1871                    "{\"error\":\"session is not lost (status: active)\"}",
1872                )
1873            }),
1874        );
1875        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1876        let addr = listener.local_addr().unwrap();
1877        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1878        let node = format!("127.0.0.1:{}", addr.port());
1879
1880        let cli = Cli {
1881            node,
1882            token: None,
1883            command: Some(Commands::Resume {
1884                name: "test-session".into(),
1885            }),
1886            path: None,
1887        };
1888        let err = execute(&cli).await.unwrap_err();
1889        assert_eq!(err.to_string(), "session is not lost (status: active)");
1890    }
1891
1892    #[tokio::test]
1893    async fn test_execute_spawn_error_response() {
1894        use axum::{Router, http::StatusCode, routing::post};
1895
1896        let app = Router::new().route(
1897            "/api/v1/sessions",
1898            post(|| async {
1899                (
1900                    StatusCode::INTERNAL_SERVER_ERROR,
1901                    "{\"error\":\"failed to spawn session\"}",
1902                )
1903            }),
1904        );
1905        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1906        let addr = listener.local_addr().unwrap();
1907        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1908        let node = format!("127.0.0.1:{}", addr.port());
1909
1910        let cli = Cli {
1911            node,
1912            token: None,
1913            command: Some(Commands::Spawn {
1914                name: Some("test".into()),
1915                workdir: Some("/tmp/repo".into()),
1916                ink: None,
1917                description: None,
1918                detach: true,
1919                idle_threshold: None,
1920                auto: false,
1921                command: vec!["test".into()],
1922            }),
1923            path: None,
1924        };
1925        let err = execute(&cli).await.unwrap_err();
1926        assert_eq!(err.to_string(), "failed to spawn session");
1927    }
1928
1929    #[tokio::test]
1930    async fn test_execute_interventions_error_response() {
1931        use axum::{Router, http::StatusCode, routing::get};
1932
1933        let app = Router::new().route(
1934            "/api/v1/sessions/{id}/interventions",
1935            get(|| async {
1936                (
1937                    StatusCode::NOT_FOUND,
1938                    "{\"error\":\"session not found: ghost\"}",
1939                )
1940            }),
1941        );
1942        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1943        let addr = listener.local_addr().unwrap();
1944        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1945        let node = format!("127.0.0.1:{}", addr.port());
1946
1947        let cli = Cli {
1948            node,
1949            token: None,
1950            command: Some(Commands::Interventions {
1951                name: "ghost".into(),
1952            }),
1953            path: None,
1954        };
1955        let err = execute(&cli).await.unwrap_err();
1956        assert_eq!(err.to_string(), "session not found: ghost");
1957    }
1958
1959    #[tokio::test]
1960    async fn test_execute_resume_success() {
1961        let node = start_test_server().await;
1962        let cli = Cli {
1963            node,
1964            token: None,
1965            command: Some(Commands::Resume {
1966                name: "test-session".into(),
1967            }),
1968            path: None,
1969        };
1970        let result = execute(&cli).await.unwrap();
1971        assert!(result.contains("Detached from session"));
1972    }
1973
1974    #[tokio::test]
1975    async fn test_execute_input_success() {
1976        let node = start_test_server().await;
1977        let cli = Cli {
1978            node,
1979            token: None,
1980            command: Some(Commands::Input {
1981                name: "test-session".into(),
1982                text: Some("yes".into()),
1983            }),
1984            path: None,
1985        };
1986        let result = execute(&cli).await.unwrap();
1987        assert!(result.contains("Sent input to session test-session"));
1988    }
1989
1990    #[tokio::test]
1991    async fn test_execute_input_no_text() {
1992        let node = start_test_server().await;
1993        let cli = Cli {
1994            node,
1995            token: None,
1996            command: Some(Commands::Input {
1997                name: "test-session".into(),
1998                text: None,
1999            }),
2000            path: None,
2001        };
2002        let result = execute(&cli).await.unwrap();
2003        assert!(result.contains("Sent input to session test-session"));
2004    }
2005
2006    #[tokio::test]
2007    async fn test_execute_input_connection_refused() {
2008        let cli = Cli {
2009            node: "localhost:1".into(),
2010            token: None,
2011            command: Some(Commands::Input {
2012                name: "test".into(),
2013                text: Some("y".into()),
2014            }),
2015            path: None,
2016        };
2017        let result = execute(&cli).await;
2018        let err = result.unwrap_err().to_string();
2019        assert!(err.contains("Could not connect to pulpod"));
2020    }
2021
2022    #[tokio::test]
2023    async fn test_execute_input_error_response() {
2024        use axum::{Router, http::StatusCode, routing::post};
2025
2026        let app = Router::new().route(
2027            "/api/v1/sessions/{id}/input",
2028            post(|| async {
2029                (
2030                    StatusCode::NOT_FOUND,
2031                    "{\"error\":\"session not found: ghost\"}",
2032                )
2033            }),
2034        );
2035        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2036        let addr = listener.local_addr().unwrap();
2037        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2038        let node = format!("127.0.0.1:{}", addr.port());
2039
2040        let cli = Cli {
2041            node,
2042            token: None,
2043            command: Some(Commands::Input {
2044                name: "ghost".into(),
2045                text: Some("y".into()),
2046            }),
2047            path: None,
2048        };
2049        let err = execute(&cli).await.unwrap_err();
2050        assert_eq!(err.to_string(), "session not found: ghost");
2051    }
2052
2053    #[tokio::test]
2054    async fn test_execute_ui() {
2055        let cli = Cli {
2056            node: "localhost:7433".into(),
2057            token: None,
2058            command: Some(Commands::Ui),
2059            path: None,
2060        };
2061        let result = execute(&cli).await.unwrap();
2062        assert!(result.contains("Opening"));
2063        assert!(result.contains("http://localhost:7433"));
2064    }
2065
2066    #[tokio::test]
2067    async fn test_execute_ui_custom_node() {
2068        let cli = Cli {
2069            node: "mac-mini:7433".into(),
2070            token: None,
2071            command: Some(Commands::Ui),
2072            path: None,
2073        };
2074        let result = execute(&cli).await.unwrap();
2075        assert!(result.contains("http://mac-mini:7433"));
2076    }
2077
2078    #[test]
2079    fn test_format_sessions_empty() {
2080        assert_eq!(format_sessions(&[]), "No sessions.");
2081    }
2082
2083    #[test]
2084    fn test_format_sessions_with_data() {
2085        use chrono::Utc;
2086        use pulpo_common::session::SessionStatus;
2087        use uuid::Uuid;
2088
2089        let sessions = vec![Session {
2090            id: Uuid::nil(),
2091            name: "my-api".into(),
2092            workdir: "/tmp/repo".into(),
2093            command: "claude -p 'Fix the bug'".into(),
2094            description: Some("Fix the bug".into()),
2095            status: SessionStatus::Active,
2096            exit_code: None,
2097            backend_session_id: None,
2098            output_snapshot: None,
2099            metadata: None,
2100            ink: None,
2101            intervention_code: None,
2102            intervention_reason: None,
2103            intervention_at: None,
2104            last_output_at: None,
2105            idle_since: None,
2106            idle_threshold_secs: None,
2107            created_at: Utc::now(),
2108            updated_at: Utc::now(),
2109        }];
2110        let output = format_sessions(&sessions);
2111        assert!(output.contains("NAME"));
2112        assert!(output.contains("COMMAND"));
2113        assert!(output.contains("my-api"));
2114        assert!(output.contains("active"));
2115        assert!(output.contains("claude -p 'Fix the bug'"));
2116    }
2117
2118    #[test]
2119    fn test_format_sessions_long_command_truncated() {
2120        use chrono::Utc;
2121        use pulpo_common::session::SessionStatus;
2122        use uuid::Uuid;
2123
2124        let sessions = vec![Session {
2125            id: Uuid::nil(),
2126            name: "test".into(),
2127            workdir: "/tmp".into(),
2128            command:
2129                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2130                    .into(),
2131            description: None,
2132            status: SessionStatus::Ready,
2133            exit_code: None,
2134            backend_session_id: None,
2135            output_snapshot: None,
2136            metadata: None,
2137            ink: None,
2138            intervention_code: None,
2139            intervention_reason: None,
2140            intervention_at: None,
2141            last_output_at: None,
2142            idle_since: None,
2143            idle_threshold_secs: None,
2144            created_at: Utc::now(),
2145            updated_at: Utc::now(),
2146        }];
2147        let output = format_sessions(&sessions);
2148        assert!(output.contains("..."));
2149    }
2150
2151    #[test]
2152    fn test_format_nodes() {
2153        use pulpo_common::node::NodeInfo;
2154        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2155
2156        let resp = PeersResponse {
2157            local: NodeInfo {
2158                name: "mac-mini".into(),
2159                hostname: "h".into(),
2160                os: "macos".into(),
2161                arch: "arm64".into(),
2162                cpus: 8,
2163                memory_mb: 16384,
2164                gpu: None,
2165            },
2166            peers: vec![PeerInfo {
2167                name: "win-pc".into(),
2168                address: "win-pc:7433".into(),
2169                status: PeerStatus::Online,
2170                node_info: None,
2171                session_count: Some(3),
2172                source: PeerSource::Configured,
2173            }],
2174        };
2175        let output = format_nodes(&resp);
2176        assert!(output.contains("mac-mini"));
2177        assert!(output.contains("(local)"));
2178        assert!(output.contains("win-pc"));
2179        assert!(output.contains('3'));
2180    }
2181
2182    #[test]
2183    fn test_format_nodes_no_session_count() {
2184        use pulpo_common::node::NodeInfo;
2185        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2186
2187        let resp = PeersResponse {
2188            local: NodeInfo {
2189                name: "local".into(),
2190                hostname: "h".into(),
2191                os: "linux".into(),
2192                arch: "x86_64".into(),
2193                cpus: 4,
2194                memory_mb: 8192,
2195                gpu: None,
2196            },
2197            peers: vec![PeerInfo {
2198                name: "peer".into(),
2199                address: "peer:7433".into(),
2200                status: PeerStatus::Offline,
2201                node_info: None,
2202                session_count: None,
2203                source: PeerSource::Configured,
2204            }],
2205        };
2206        let output = format_nodes(&resp);
2207        assert!(output.contains("offline"));
2208        // No session count → shows "-"
2209        let lines: Vec<&str> = output.lines().collect();
2210        assert!(lines[2].contains('-'));
2211    }
2212
2213    #[tokio::test]
2214    async fn test_execute_resume_connection_refused() {
2215        let cli = Cli {
2216            node: "localhost:1".into(),
2217            token: None,
2218            command: Some(Commands::Resume {
2219                name: "test".into(),
2220            }),
2221            path: None,
2222        };
2223        let result = execute(&cli).await;
2224        let err = result.unwrap_err().to_string();
2225        assert!(err.contains("Could not connect to pulpod"));
2226    }
2227
2228    #[tokio::test]
2229    async fn test_execute_spawn_connection_refused() {
2230        let cli = Cli {
2231            node: "localhost:1".into(),
2232            token: None,
2233            command: Some(Commands::Spawn {
2234                name: Some("test".into()),
2235                workdir: Some("/tmp".into()),
2236                ink: None,
2237                description: None,
2238                detach: true,
2239                idle_threshold: None,
2240                auto: false,
2241                command: vec!["test".into()],
2242            }),
2243            path: None,
2244        };
2245        let result = execute(&cli).await;
2246        let err = result.unwrap_err().to_string();
2247        assert!(err.contains("Could not connect to pulpod"));
2248    }
2249
2250    #[tokio::test]
2251    async fn test_execute_kill_connection_refused() {
2252        let cli = Cli {
2253            node: "localhost:1".into(),
2254            token: None,
2255            command: Some(Commands::Kill {
2256                name: "test".into(),
2257            }),
2258            path: None,
2259        };
2260        let result = execute(&cli).await;
2261        let err = result.unwrap_err().to_string();
2262        assert!(err.contains("Could not connect to pulpod"));
2263    }
2264
2265    #[tokio::test]
2266    async fn test_execute_delete_connection_refused() {
2267        let cli = Cli {
2268            node: "localhost:1".into(),
2269            token: None,
2270            command: Some(Commands::Delete {
2271                name: "test".into(),
2272            }),
2273            path: None,
2274        };
2275        let result = execute(&cli).await;
2276        let err = result.unwrap_err().to_string();
2277        assert!(err.contains("Could not connect to pulpod"));
2278    }
2279
2280    #[tokio::test]
2281    async fn test_execute_logs_connection_refused() {
2282        let cli = Cli {
2283            node: "localhost:1".into(),
2284            token: None,
2285            command: Some(Commands::Logs {
2286                name: "test".into(),
2287                lines: 50,
2288                follow: false,
2289            }),
2290            path: None,
2291        };
2292        let result = execute(&cli).await;
2293        let err = result.unwrap_err().to_string();
2294        assert!(err.contains("Could not connect to pulpod"));
2295    }
2296
2297    #[tokio::test]
2298    async fn test_friendly_error_connect() {
2299        // Make a request to a closed port to get a connect error
2300        let err = reqwest::Client::new()
2301            .get("http://127.0.0.1:1")
2302            .send()
2303            .await
2304            .unwrap_err();
2305        let friendly = friendly_error(&err, "test-node:1");
2306        let msg = friendly.to_string();
2307        assert!(
2308            msg.contains("Could not connect"),
2309            "Expected connect message, got: {msg}"
2310        );
2311    }
2312
2313    #[tokio::test]
2314    async fn test_friendly_error_other() {
2315        // A request to an invalid URL creates a builder error, not a connect error
2316        let err = reqwest::Client::new()
2317            .get("http://[::invalid::url")
2318            .send()
2319            .await
2320            .unwrap_err();
2321        let friendly = friendly_error(&err, "bad-host");
2322        let msg = friendly.to_string();
2323        assert!(
2324            msg.contains("Network error"),
2325            "Expected network error message, got: {msg}"
2326        );
2327        assert!(msg.contains("bad-host"));
2328    }
2329
2330    // -- Auth helper tests --
2331
2332    #[test]
2333    fn test_is_localhost_variants() {
2334        assert!(is_localhost("localhost:7433"));
2335        assert!(is_localhost("127.0.0.1:7433"));
2336        assert!(is_localhost("[::1]:7433"));
2337        assert!(is_localhost("::1"));
2338        assert!(is_localhost("localhost"));
2339        assert!(!is_localhost("mac-mini:7433"));
2340        assert!(!is_localhost("192.168.1.100:7433"));
2341    }
2342
2343    #[test]
2344    fn test_authed_get_with_token() {
2345        let client = reqwest::Client::new();
2346        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2347            .build()
2348            .unwrap();
2349        let auth = req
2350            .headers()
2351            .get("authorization")
2352            .unwrap()
2353            .to_str()
2354            .unwrap();
2355        assert_eq!(auth, "Bearer tok");
2356    }
2357
2358    #[test]
2359    fn test_authed_get_without_token() {
2360        let client = reqwest::Client::new();
2361        let req = authed_get(&client, "http://h:1/api".into(), None)
2362            .build()
2363            .unwrap();
2364        assert!(req.headers().get("authorization").is_none());
2365    }
2366
2367    #[test]
2368    fn test_authed_post_with_token() {
2369        let client = reqwest::Client::new();
2370        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2371            .build()
2372            .unwrap();
2373        let auth = req
2374            .headers()
2375            .get("authorization")
2376            .unwrap()
2377            .to_str()
2378            .unwrap();
2379        assert_eq!(auth, "Bearer secret");
2380    }
2381
2382    #[test]
2383    fn test_authed_post_without_token() {
2384        let client = reqwest::Client::new();
2385        let req = authed_post(&client, "http://h:1/api".into(), None)
2386            .build()
2387            .unwrap();
2388        assert!(req.headers().get("authorization").is_none());
2389    }
2390
2391    #[test]
2392    fn test_authed_delete_with_token() {
2393        let client = reqwest::Client::new();
2394        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2395            .build()
2396            .unwrap();
2397        let auth = req
2398            .headers()
2399            .get("authorization")
2400            .unwrap()
2401            .to_str()
2402            .unwrap();
2403        assert_eq!(auth, "Bearer del-tok");
2404    }
2405
2406    #[test]
2407    fn test_authed_delete_without_token() {
2408        let client = reqwest::Client::new();
2409        let req = authed_delete(&client, "http://h:1/api".into(), None)
2410            .build()
2411            .unwrap();
2412        assert!(req.headers().get("authorization").is_none());
2413    }
2414
2415    #[tokio::test]
2416    async fn test_resolve_token_explicit() {
2417        let client = reqwest::Client::new();
2418        let token =
2419            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2420        assert_eq!(token, Some("my-tok".into()));
2421    }
2422
2423    #[tokio::test]
2424    async fn test_resolve_token_remote_no_explicit() {
2425        let client = reqwest::Client::new();
2426        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2427        assert_eq!(token, None);
2428    }
2429
2430    #[tokio::test]
2431    async fn test_resolve_token_localhost_auto_discover() {
2432        use axum::{Json, Router, routing::get};
2433
2434        let app = Router::new().route(
2435            "/api/v1/auth/token",
2436            get(|| async {
2437                Json(AuthTokenResponse {
2438                    token: "discovered".into(),
2439                })
2440            }),
2441        );
2442        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2443        let addr = listener.local_addr().unwrap();
2444        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2445
2446        let node = format!("localhost:{}", addr.port());
2447        let base = base_url(&node);
2448        let client = reqwest::Client::new();
2449        let token = resolve_token(&client, &base, &node, None).await;
2450        assert_eq!(token, Some("discovered".into()));
2451    }
2452
2453    #[tokio::test]
2454    async fn test_discover_token_empty_returns_none() {
2455        use axum::{Json, Router, routing::get};
2456
2457        let app = Router::new().route(
2458            "/api/v1/auth/token",
2459            get(|| async {
2460                Json(AuthTokenResponse {
2461                    token: String::new(),
2462                })
2463            }),
2464        );
2465        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2466        let addr = listener.local_addr().unwrap();
2467        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2468
2469        let base = format!("http://127.0.0.1:{}", addr.port());
2470        let client = reqwest::Client::new();
2471        assert_eq!(discover_token(&client, &base).await, None);
2472    }
2473
2474    #[tokio::test]
2475    async fn test_discover_token_unreachable_returns_none() {
2476        let client = reqwest::Client::new();
2477        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2478    }
2479
2480    #[test]
2481    fn test_cli_parse_with_token() {
2482        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2483        assert_eq!(cli.token, Some("my-secret".into()));
2484    }
2485
2486    #[test]
2487    fn test_cli_parse_without_token() {
2488        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2489        assert_eq!(cli.token, None);
2490    }
2491
2492    #[tokio::test]
2493    async fn test_execute_with_explicit_token_sends_header() {
2494        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2495
2496        let app = Router::new().route(
2497            "/api/v1/sessions",
2498            get(|req: Request| async move {
2499                let auth = req
2500                    .headers()
2501                    .get("authorization")
2502                    .and_then(|v| v.to_str().ok())
2503                    .unwrap_or("");
2504                assert_eq!(auth, "Bearer test-token");
2505                (StatusCode::OK, "[]".to_owned())
2506            }),
2507        );
2508        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2509        let addr = listener.local_addr().unwrap();
2510        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2511        let node = format!("127.0.0.1:{}", addr.port());
2512
2513        let cli = Cli {
2514            node,
2515            token: Some("test-token".into()),
2516            command: Some(Commands::List),
2517            path: None,
2518        };
2519        let result = execute(&cli).await.unwrap();
2520        assert_eq!(result, "No sessions.");
2521    }
2522
2523    // -- Interventions tests --
2524
2525    #[test]
2526    fn test_cli_parse_interventions() {
2527        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2528        assert!(matches!(
2529            &cli.command,
2530            Some(Commands::Interventions { name }) if name == "my-session"
2531        ));
2532    }
2533
2534    #[test]
2535    fn test_format_interventions_empty() {
2536        assert_eq!(format_interventions(&[]), "No intervention events.");
2537    }
2538
2539    #[test]
2540    fn test_format_interventions_with_data() {
2541        let events = vec![
2542            InterventionEventResponse {
2543                id: 1,
2544                session_id: "sess-1".into(),
2545                code: None,
2546                reason: "Memory exceeded threshold".into(),
2547                created_at: "2026-01-01T00:00:00Z".into(),
2548            },
2549            InterventionEventResponse {
2550                id: 2,
2551                session_id: "sess-1".into(),
2552                code: None,
2553                reason: "Idle for 10 minutes".into(),
2554                created_at: "2026-01-02T00:00:00Z".into(),
2555            },
2556        ];
2557        let output = format_interventions(&events);
2558        assert!(output.contains("ID"));
2559        assert!(output.contains("TIMESTAMP"));
2560        assert!(output.contains("REASON"));
2561        assert!(output.contains("Memory exceeded threshold"));
2562        assert!(output.contains("Idle for 10 minutes"));
2563        assert!(output.contains("2026-01-01T00:00:00Z"));
2564    }
2565
2566    #[tokio::test]
2567    async fn test_execute_interventions_empty() {
2568        let node = start_test_server().await;
2569        let cli = Cli {
2570            node,
2571            token: None,
2572            command: Some(Commands::Interventions {
2573                name: "my-session".into(),
2574            }),
2575            path: None,
2576        };
2577        let result = execute(&cli).await.unwrap();
2578        assert_eq!(result, "No intervention events.");
2579    }
2580
2581    #[tokio::test]
2582    async fn test_execute_interventions_with_data() {
2583        use axum::{Router, routing::get};
2584
2585        let app = Router::new().route(
2586            "/api/v1/sessions/{id}/interventions",
2587            get(|| async {
2588                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2589                    .to_owned()
2590            }),
2591        );
2592        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2593        let addr = listener.local_addr().unwrap();
2594        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2595        let node = format!("127.0.0.1:{}", addr.port());
2596
2597        let cli = Cli {
2598            node,
2599            token: None,
2600            command: Some(Commands::Interventions {
2601                name: "test".into(),
2602            }),
2603            path: None,
2604        };
2605        let result = execute(&cli).await.unwrap();
2606        assert!(result.contains("OOM"));
2607        assert!(result.contains("2026-01-01T00:00:00Z"));
2608    }
2609
2610    #[tokio::test]
2611    async fn test_execute_interventions_connection_refused() {
2612        let cli = Cli {
2613            node: "localhost:1".into(),
2614            token: None,
2615            command: Some(Commands::Interventions {
2616                name: "test".into(),
2617            }),
2618            path: None,
2619        };
2620        let result = execute(&cli).await;
2621        let err = result.unwrap_err().to_string();
2622        assert!(err.contains("Could not connect to pulpod"));
2623    }
2624
2625    // -- Attach command tests --
2626
2627    #[test]
2628    fn test_build_attach_command() {
2629        let cmd = build_attach_command("my-session");
2630        assert_eq!(cmd.get_program(), "tmux");
2631        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2632        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2633    }
2634
2635    #[test]
2636    fn test_cli_parse_attach() {
2637        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2638        assert!(matches!(
2639            &cli.command,
2640            Some(Commands::Attach { name }) if name == "my-session"
2641        ));
2642    }
2643
2644    #[test]
2645    fn test_cli_parse_attach_alias() {
2646        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2647        assert!(matches!(
2648            &cli.command,
2649            Some(Commands::Attach { name }) if name == "my-session"
2650        ));
2651    }
2652
2653    #[tokio::test]
2654    async fn test_execute_attach_success() {
2655        let node = start_test_server().await;
2656        let cli = Cli {
2657            node,
2658            token: None,
2659            command: Some(Commands::Attach {
2660                name: "test-session".into(),
2661            }),
2662            path: None,
2663        };
2664        let result = execute(&cli).await.unwrap();
2665        assert!(result.contains("Detached from session test-session"));
2666    }
2667
2668    #[tokio::test]
2669    async fn test_execute_attach_with_backend_session_id() {
2670        use axum::{Router, routing::get};
2671        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2672        let app = Router::new().route(
2673            "/api/v1/sessions/{id}",
2674            get(move || async move { session_json.to_owned() }),
2675        );
2676        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2677        let addr = listener.local_addr().unwrap();
2678        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2679
2680        let cli = Cli {
2681            node: format!("127.0.0.1:{}", addr.port()),
2682            token: None,
2683            command: Some(Commands::Attach {
2684                name: "my-session".into(),
2685            }),
2686            path: None,
2687        };
2688        let result = execute(&cli).await.unwrap();
2689        assert!(result.contains("Detached from session my-session"));
2690    }
2691
2692    #[tokio::test]
2693    async fn test_execute_attach_connection_refused() {
2694        let cli = Cli {
2695            node: "localhost:1".into(),
2696            token: None,
2697            command: Some(Commands::Attach {
2698                name: "test-session".into(),
2699            }),
2700            path: None,
2701        };
2702        let result = execute(&cli).await;
2703        let err = result.unwrap_err().to_string();
2704        assert!(err.contains("Could not connect to pulpod"));
2705    }
2706
2707    #[tokio::test]
2708    async fn test_execute_attach_error_response() {
2709        use axum::{Router, http::StatusCode, routing::get};
2710        let app = Router::new().route(
2711            "/api/v1/sessions/{id}",
2712            get(|| async {
2713                (
2714                    StatusCode::NOT_FOUND,
2715                    r#"{"error":"session not found"}"#.to_owned(),
2716                )
2717            }),
2718        );
2719        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2720        let addr = listener.local_addr().unwrap();
2721        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2722
2723        let cli = Cli {
2724            node: format!("127.0.0.1:{}", addr.port()),
2725            token: None,
2726            command: Some(Commands::Attach {
2727                name: "nonexistent".into(),
2728            }),
2729            path: None,
2730        };
2731        let result = execute(&cli).await;
2732        let err = result.unwrap_err().to_string();
2733        assert!(err.contains("session not found"));
2734    }
2735
2736    #[tokio::test]
2737    async fn test_execute_attach_stale_session() {
2738        use axum::{Router, routing::get};
2739        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2740        let app = Router::new().route(
2741            "/api/v1/sessions/{id}",
2742            get(move || async move { session_json.to_owned() }),
2743        );
2744        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2745        let addr = listener.local_addr().unwrap();
2746        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2747
2748        let cli = Cli {
2749            node: format!("127.0.0.1:{}", addr.port()),
2750            token: None,
2751            command: Some(Commands::Attach {
2752                name: "stale-sess".into(),
2753            }),
2754            path: None,
2755        };
2756        let result = execute(&cli).await;
2757        let err = result.unwrap_err().to_string();
2758        assert!(err.contains("lost"));
2759        assert!(err.contains("pulpo resume"));
2760    }
2761
2762    #[tokio::test]
2763    async fn test_execute_attach_dead_session() {
2764        use axum::{Router, routing::get};
2765        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2766        let app = Router::new().route(
2767            "/api/v1/sessions/{id}",
2768            get(move || async move { session_json.to_owned() }),
2769        );
2770        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2771        let addr = listener.local_addr().unwrap();
2772        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2773
2774        let cli = Cli {
2775            node: format!("127.0.0.1:{}", addr.port()),
2776            token: None,
2777            command: Some(Commands::Attach {
2778                name: "dead-sess".into(),
2779            }),
2780            path: None,
2781        };
2782        let result = execute(&cli).await;
2783        let err = result.unwrap_err().to_string();
2784        assert!(err.contains("killed"));
2785        assert!(err.contains("cannot attach"));
2786    }
2787
2788    // -- Alias parse tests --
2789
2790    #[test]
2791    fn test_cli_parse_alias_spawn() {
2792        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
2793        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
2794    }
2795
2796    #[test]
2797    fn test_cli_parse_alias_list() {
2798        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2799        assert!(matches!(&cli.command, Some(Commands::List)));
2800    }
2801
2802    #[test]
2803    fn test_cli_parse_alias_logs() {
2804        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2805        assert!(matches!(
2806            &cli.command,
2807            Some(Commands::Logs { name, .. }) if name == "my-session"
2808        ));
2809    }
2810
2811    #[test]
2812    fn test_cli_parse_alias_kill() {
2813        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2814        assert!(matches!(
2815            &cli.command,
2816            Some(Commands::Kill { name }) if name == "my-session"
2817        ));
2818    }
2819
2820    #[test]
2821    fn test_cli_parse_alias_delete() {
2822        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2823        assert!(matches!(
2824            &cli.command,
2825            Some(Commands::Delete { name }) if name == "my-session"
2826        ));
2827    }
2828
2829    #[test]
2830    fn test_cli_parse_alias_resume() {
2831        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2832        assert!(matches!(
2833            &cli.command,
2834            Some(Commands::Resume { name }) if name == "my-session"
2835        ));
2836    }
2837
2838    #[test]
2839    fn test_cli_parse_alias_nodes() {
2840        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2841        assert!(matches!(&cli.command, Some(Commands::Nodes)));
2842    }
2843
2844    #[test]
2845    fn test_cli_parse_alias_interventions() {
2846        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2847        assert!(matches!(
2848            &cli.command,
2849            Some(Commands::Interventions { name }) if name == "my-session"
2850        ));
2851    }
2852
2853    #[test]
2854    fn test_api_error_json() {
2855        let err = api_error("{\"error\":\"session not found: foo\"}");
2856        assert_eq!(err.to_string(), "session not found: foo");
2857    }
2858
2859    #[test]
2860    fn test_api_error_plain_text() {
2861        let err = api_error("plain text error");
2862        assert_eq!(err.to_string(), "plain text error");
2863    }
2864
2865    // -- diff_output tests --
2866
2867    #[test]
2868    fn test_diff_output_empty_prev() {
2869        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2870    }
2871
2872    #[test]
2873    fn test_diff_output_identical() {
2874        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2875    }
2876
2877    #[test]
2878    fn test_diff_output_new_lines_appended() {
2879        let prev = "line1\nline2";
2880        let new = "line1\nline2\nline3\nline4";
2881        assert_eq!(diff_output(prev, new), "line3\nline4");
2882    }
2883
2884    #[test]
2885    fn test_diff_output_scrolled_window() {
2886        // Window of 3 lines: old lines scroll off top, new appear at bottom
2887        let prev = "line1\nline2\nline3";
2888        let new = "line2\nline3\nline4";
2889        assert_eq!(diff_output(prev, new), "line4");
2890    }
2891
2892    #[test]
2893    fn test_diff_output_completely_different() {
2894        let prev = "aaa\nbbb";
2895        let new = "xxx\nyyy";
2896        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2897    }
2898
2899    #[test]
2900    fn test_diff_output_last_line_matches_but_overlap_fails() {
2901        // Last line of prev appears in new but preceding lines don't match
2902        let prev = "aaa\ncommon";
2903        let new = "zzz\ncommon\nnew_line";
2904        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2905        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2906        // Falls through, no verified overlap, so returns everything
2907        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2908    }
2909
2910    #[test]
2911    fn test_diff_output_new_empty() {
2912        assert_eq!(diff_output("line1", ""), "");
2913    }
2914
2915    // -- follow_logs tests --
2916
2917    /// Start a test server that simulates evolving output and session status transitions.
2918    /// Start a test server that simulates evolving output with agent exit marker.
2919    async fn start_follow_test_server() -> String {
2920        use axum::{Router, extract::Path, extract::Query, routing::get};
2921        use std::sync::Arc;
2922        use std::sync::atomic::{AtomicUsize, Ordering};
2923
2924        let call_count = Arc::new(AtomicUsize::new(0));
2925        let output_count = call_count.clone();
2926
2927        let app = Router::new()
2928            .route(
2929                "/api/v1/sessions/{id}/output",
2930                get(
2931                    move |_path: Path<String>,
2932                          _query: Query<std::collections::HashMap<String, String>>| {
2933                        let count = output_count.clone();
2934                        async move {
2935                            let n = count.fetch_add(1, Ordering::SeqCst);
2936                            let output = match n {
2937                                0 => "line1\nline2".to_owned(),
2938                                1 => "line1\nline2\nline3".to_owned(),
2939                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
2940                            };
2941                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2942                        }
2943                    },
2944                ),
2945            )
2946            .route(
2947                "/api/v1/sessions/{id}",
2948                get(|_path: Path<String>| async {
2949                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2950                }),
2951            );
2952
2953        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2954        let addr = listener.local_addr().unwrap();
2955        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2956        format!("http://127.0.0.1:{}", addr.port())
2957    }
2958
2959    #[tokio::test]
2960    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
2961        let base = start_follow_test_server().await;
2962        let client = reqwest::Client::new();
2963        let mut buf = Vec::new();
2964
2965        follow_logs(&client, &base, "test", 100, None, &mut buf)
2966            .await
2967            .unwrap();
2968
2969        let output = String::from_utf8(buf).unwrap();
2970        // Should contain initial output + new lines + agent exit marker
2971        assert!(output.contains("line1"));
2972        assert!(output.contains("line2"));
2973        assert!(output.contains("line3"));
2974        assert!(output.contains("line4"));
2975        assert!(output.contains("[pulpo] Agent exited"));
2976    }
2977
2978    #[tokio::test]
2979    async fn test_execute_logs_follow_success() {
2980        let base = start_follow_test_server().await;
2981        // Extract host:port from http://127.0.0.1:PORT
2982        let node = base.strip_prefix("http://").unwrap().to_owned();
2983
2984        let cli = Cli {
2985            node,
2986            token: None,
2987            command: Some(Commands::Logs {
2988                name: "test".into(),
2989                lines: 100,
2990                follow: true,
2991            }),
2992            path: None,
2993        };
2994        // execute() with follow writes to stdout and returns empty string
2995        let result = execute(&cli).await.unwrap();
2996        assert_eq!(result, "");
2997    }
2998
2999    #[tokio::test]
3000    async fn test_execute_logs_follow_connection_refused() {
3001        let cli = Cli {
3002            node: "localhost:1".into(),
3003            token: None,
3004            command: Some(Commands::Logs {
3005                name: "test".into(),
3006                lines: 50,
3007                follow: true,
3008            }),
3009            path: None,
3010        };
3011        let result = execute(&cli).await;
3012        let err = result.unwrap_err().to_string();
3013        assert!(
3014            err.contains("Could not connect to pulpod"),
3015            "Expected friendly error, got: {err}"
3016        );
3017    }
3018
3019    #[tokio::test]
3020    async fn test_follow_logs_exits_on_dead() {
3021        use axum::{Router, extract::Path, extract::Query, routing::get};
3022
3023        let app = Router::new()
3024            .route(
3025                "/api/v1/sessions/{id}/output",
3026                get(
3027                    |_path: Path<String>,
3028                     _query: Query<std::collections::HashMap<String, String>>| async {
3029                        r#"{"output":"some output"}"#.to_owned()
3030                    },
3031                ),
3032            )
3033            .route(
3034                "/api/v1/sessions/{id}",
3035                get(|_path: Path<String>| async {
3036                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3037                }),
3038            );
3039
3040        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3041        let addr = listener.local_addr().unwrap();
3042        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3043        let base = format!("http://127.0.0.1:{}", addr.port());
3044
3045        let client = reqwest::Client::new();
3046        let mut buf = Vec::new();
3047        follow_logs(&client, &base, "test", 100, None, &mut buf)
3048            .await
3049            .unwrap();
3050
3051        let output = String::from_utf8(buf).unwrap();
3052        assert!(output.contains("some output"));
3053    }
3054
3055    #[tokio::test]
3056    async fn test_follow_logs_exits_on_stale() {
3057        use axum::{Router, extract::Path, extract::Query, routing::get};
3058
3059        let app = Router::new()
3060            .route(
3061                "/api/v1/sessions/{id}/output",
3062                get(
3063                    |_path: Path<String>,
3064                     _query: Query<std::collections::HashMap<String, String>>| async {
3065                        r#"{"output":"stale output"}"#.to_owned()
3066                    },
3067                ),
3068            )
3069            .route(
3070                "/api/v1/sessions/{id}",
3071                get(|_path: Path<String>| async {
3072                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3073                }),
3074            );
3075
3076        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3077        let addr = listener.local_addr().unwrap();
3078        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3079        let base = format!("http://127.0.0.1:{}", addr.port());
3080
3081        let client = reqwest::Client::new();
3082        let mut buf = Vec::new();
3083        follow_logs(&client, &base, "test", 100, None, &mut buf)
3084            .await
3085            .unwrap();
3086
3087        let output = String::from_utf8(buf).unwrap();
3088        assert!(output.contains("stale output"));
3089    }
3090
3091    #[tokio::test]
3092    async fn test_execute_logs_follow_non_reqwest_error() {
3093        use axum::{Router, extract::Path, extract::Query, routing::get};
3094
3095        // Session status endpoint returns invalid JSON to trigger a serde error
3096        let app = Router::new()
3097            .route(
3098                "/api/v1/sessions/{id}/output",
3099                get(
3100                    |_path: Path<String>,
3101                     _query: Query<std::collections::HashMap<String, String>>| async {
3102                        r#"{"output":"initial"}"#.to_owned()
3103                    },
3104                ),
3105            )
3106            .route(
3107                "/api/v1/sessions/{id}",
3108                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3109            );
3110
3111        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3112        let addr = listener.local_addr().unwrap();
3113        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3114        let node = format!("127.0.0.1:{}", addr.port());
3115
3116        let cli = Cli {
3117            node,
3118            token: None,
3119            command: Some(Commands::Logs {
3120                name: "test".into(),
3121                lines: 100,
3122                follow: true,
3123            }),
3124            path: None,
3125        };
3126        let err = execute(&cli).await.unwrap_err();
3127        // serde_json error, not a reqwest error — hits the Err(other) branch
3128        let msg = err.to_string();
3129        assert!(
3130            msg.contains("expected ident"),
3131            "Expected serde parse error, got: {msg}"
3132        );
3133    }
3134
3135    #[tokio::test]
3136    async fn test_fetch_session_status_connection_error() {
3137        let client = reqwest::Client::new();
3138        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3139        assert!(result.is_err());
3140    }
3141
3142    // -- Crontab wrapper tests --
3143
3144    #[test]
3145    fn test_build_crontab_line() {
3146        let line = build_crontab_line(
3147            "nightly-review",
3148            "0 3 * * *",
3149            "/home/me/repo",
3150            "claude -p 'Review PRs'",
3151            "localhost:7433",
3152        );
3153        assert_eq!(
3154            line,
3155            "0 3 * * * pulpo --node localhost:7433 spawn nightly-review --workdir /home/me/repo -- claude -p 'Review PRs' #pulpo:nightly-review\n"
3156        );
3157    }
3158
3159    #[test]
3160    fn test_crontab_install_success() {
3161        let crontab = "# existing cron\n0 * * * * echo hi\n";
3162        let line = "0 3 * * * pulpo --node n spawn my-job --workdir /tmp -- claude -p task #pulpo:my-job\n";
3163        let result = crontab_install(crontab, "my-job", line).unwrap();
3164        assert!(result.starts_with("# existing cron\n"));
3165        assert!(result.ends_with("#pulpo:my-job\n"));
3166        assert!(result.contains("echo hi"));
3167    }
3168
3169    #[test]
3170    fn test_crontab_install_duplicate_error() {
3171        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3172        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3173        let err = crontab_install(crontab, "my-job", line).unwrap_err();
3174        assert!(err.to_string().contains("already exists"));
3175    }
3176
3177    #[test]
3178    fn test_crontab_list_empty() {
3179        assert_eq!(crontab_list(""), "No pulpo schedules.");
3180    }
3181
3182    #[test]
3183    fn test_crontab_list_no_pulpo_entries() {
3184        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3185    }
3186
3187    #[test]
3188    fn test_crontab_list_with_entries() {
3189        let crontab = "0 3 * * * pulpo --node n spawn nightly --workdir /tmp -- claude -p task #pulpo:nightly\n";
3190        let output = crontab_list(crontab);
3191        assert!(output.contains("NAME"));
3192        assert!(output.contains("CRON"));
3193        assert!(output.contains("PAUSED"));
3194        assert!(output.contains("nightly"));
3195        assert!(output.contains("0 3 * * *"));
3196        assert!(output.contains("no"));
3197    }
3198
3199    #[test]
3200    fn test_crontab_list_paused_entry() {
3201        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3202        let output = crontab_list(crontab);
3203        assert!(output.contains("paused-job"));
3204        assert!(output.contains("yes"));
3205    }
3206
3207    #[test]
3208    fn test_crontab_list_short_line() {
3209        // A line with fewer than 5 space-separated parts but still tagged
3210        let crontab = "badcron #pulpo:broken\n";
3211        let output = crontab_list(crontab);
3212        assert!(output.contains("broken"));
3213        assert!(output.contains('?'));
3214    }
3215
3216    #[test]
3217    fn test_crontab_remove_success() {
3218        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3219        let result = crontab_remove(crontab, "my-job").unwrap();
3220        assert!(result.contains("echo hi"));
3221        assert!(!result.contains("my-job"));
3222    }
3223
3224    #[test]
3225    fn test_crontab_remove_not_found() {
3226        let crontab = "0 * * * * echo hi\n";
3227        let err = crontab_remove(crontab, "ghost").unwrap_err();
3228        assert!(err.to_string().contains("not found"));
3229    }
3230
3231    #[test]
3232    fn test_crontab_pause_success() {
3233        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3234        let result = crontab_pause(crontab, "my-job").unwrap();
3235        assert!(result.starts_with('#'));
3236        assert!(result.contains("#pulpo:my-job"));
3237    }
3238
3239    #[test]
3240    fn test_crontab_pause_not_found() {
3241        let crontab = "0 * * * * echo hi\n";
3242        let err = crontab_pause(crontab, "ghost").unwrap_err();
3243        assert!(err.to_string().contains("not found or already paused"));
3244    }
3245
3246    #[test]
3247    fn test_crontab_pause_already_paused() {
3248        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3249        let err = crontab_pause(crontab, "my-job").unwrap_err();
3250        assert!(err.to_string().contains("already paused"));
3251    }
3252
3253    #[test]
3254    fn test_crontab_resume_success() {
3255        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3256        let result = crontab_resume(crontab, "my-job").unwrap();
3257        assert!(!result.starts_with('#'));
3258        assert!(result.contains("#pulpo:my-job"));
3259    }
3260
3261    #[test]
3262    fn test_crontab_resume_not_found() {
3263        let crontab = "0 * * * * echo hi\n";
3264        let err = crontab_resume(crontab, "ghost").unwrap_err();
3265        assert!(err.to_string().contains("not found or not paused"));
3266    }
3267
3268    #[test]
3269    fn test_crontab_resume_not_paused() {
3270        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3271        let err = crontab_resume(crontab, "my-job").unwrap_err();
3272        assert!(err.to_string().contains("not paused"));
3273    }
3274
3275    // -- Schedule CLI parse tests --
3276
3277    #[test]
3278    fn test_cli_parse_schedule_install() {
3279        let cli = Cli::try_parse_from([
3280            "pulpo",
3281            "schedule",
3282            "install",
3283            "nightly",
3284            "0 3 * * *",
3285            "--workdir",
3286            "/tmp/repo",
3287            "--",
3288            "claude",
3289            "-p",
3290            "Review PRs",
3291        ])
3292        .unwrap();
3293        assert!(matches!(
3294            &cli.command,
3295            Some(Commands::Schedule {
3296                action: ScheduleAction::Install { name, cron, workdir, command }
3297            }) if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3298              && command == &["claude", "-p", "Review PRs"]
3299        ));
3300    }
3301
3302    #[test]
3303    fn test_cli_parse_schedule_list() {
3304        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3305        assert!(matches!(
3306            &cli.command,
3307            Some(Commands::Schedule {
3308                action: ScheduleAction::List
3309            })
3310        ));
3311    }
3312
3313    #[test]
3314    fn test_cli_parse_schedule_remove() {
3315        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3316        assert!(matches!(
3317            &cli.command,
3318            Some(Commands::Schedule {
3319                action: ScheduleAction::Remove { name }
3320            }) if name == "nightly"
3321        ));
3322    }
3323
3324    #[test]
3325    fn test_cli_parse_schedule_pause() {
3326        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3327        assert!(matches!(
3328            &cli.command,
3329            Some(Commands::Schedule {
3330                action: ScheduleAction::Pause { name }
3331            }) if name == "nightly"
3332        ));
3333    }
3334
3335    #[test]
3336    fn test_cli_parse_schedule_resume() {
3337        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3338        assert!(matches!(
3339            &cli.command,
3340            Some(Commands::Schedule {
3341                action: ScheduleAction::Resume { name }
3342            }) if name == "nightly"
3343        ));
3344    }
3345
3346    #[test]
3347    fn test_cli_parse_schedule_alias() {
3348        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3349        assert!(matches!(
3350            &cli.command,
3351            Some(Commands::Schedule {
3352                action: ScheduleAction::List
3353            })
3354        ));
3355    }
3356
3357    #[test]
3358    fn test_cli_parse_schedule_list_alias() {
3359        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3360        assert!(matches!(
3361            &cli.command,
3362            Some(Commands::Schedule {
3363                action: ScheduleAction::List
3364            })
3365        ));
3366    }
3367
3368    #[test]
3369    fn test_cli_parse_schedule_remove_alias() {
3370        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3371        assert!(matches!(
3372            &cli.command,
3373            Some(Commands::Schedule {
3374                action: ScheduleAction::Remove { name }
3375            }) if name == "nightly"
3376        ));
3377    }
3378
3379    #[tokio::test]
3380    async fn test_execute_schedule_via_execute() {
3381        // Under coverage builds, execute_schedule is a stub returning Ok("")
3382        let node = start_test_server().await;
3383        let cli = Cli {
3384            node,
3385            token: None,
3386            command: Some(Commands::Schedule {
3387                action: ScheduleAction::List,
3388            }),
3389            path: None,
3390        };
3391        let result = execute(&cli).await;
3392        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3393        assert!(result.is_ok() || result.is_err());
3394    }
3395
3396    #[test]
3397    fn test_schedule_action_debug() {
3398        let action = ScheduleAction::List;
3399        assert_eq!(format!("{action:?}"), "List");
3400    }
3401
3402    #[test]
3403    fn test_cli_parse_send_alias() {
3404        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3405        assert!(matches!(
3406            &cli.command,
3407            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3408        ));
3409    }
3410
3411    #[test]
3412    fn test_cli_parse_spawn_no_name() {
3413        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3414        assert!(matches!(
3415            &cli.command,
3416            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3417        ));
3418    }
3419
3420    #[test]
3421    fn test_cli_parse_spawn_optional_name_with_command() {
3422        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3423        assert!(matches!(
3424            &cli.command,
3425            Some(Commands::Spawn { name, command, .. })
3426                if name.is_none() && command == &["echo", "hello"]
3427        ));
3428    }
3429
3430    #[test]
3431    fn test_cli_parse_path_shortcut() {
3432        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3433        assert!(cli.command.is_none());
3434        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3435    }
3436
3437    #[test]
3438    fn test_cli_parse_no_args() {
3439        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3440        assert!(cli.command.is_none());
3441        assert!(cli.path.is_none());
3442    }
3443
3444    #[test]
3445    fn test_derive_session_name_simple() {
3446        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3447    }
3448
3449    #[test]
3450    fn test_derive_session_name_with_special_chars() {
3451        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
3452    }
3453
3454    #[test]
3455    fn test_derive_session_name_root() {
3456        assert_eq!(derive_session_name("/"), "session");
3457    }
3458
3459    #[test]
3460    fn test_derive_session_name_dots() {
3461        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
3462    }
3463
3464    #[test]
3465    fn test_resolve_path_absolute() {
3466        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
3467    }
3468
3469    #[test]
3470    fn test_resolve_path_relative() {
3471        let resolved = resolve_path("my-repo");
3472        assert!(resolved.ends_with("my-repo"));
3473        assert!(resolved.starts_with('/'));
3474    }
3475
3476    #[tokio::test]
3477    async fn test_execute_path_shortcut() {
3478        let node = start_test_server().await;
3479        let cli = Cli {
3480            node,
3481            token: None,
3482            path: Some("/tmp".into()),
3483            command: None,
3484        };
3485        let result = execute(&cli).await.unwrap();
3486        assert!(result.contains("Detached from session"));
3487    }
3488
3489    // -- Node resolution tests --
3490
3491    #[test]
3492    fn test_node_needs_resolution() {
3493        assert!(!node_needs_resolution("localhost:7433"));
3494        assert!(!node_needs_resolution("mac-mini:7433"));
3495        assert!(!node_needs_resolution("10.0.0.1:7433"));
3496        assert!(!node_needs_resolution("[::1]:7433"));
3497        assert!(node_needs_resolution("mac-mini"));
3498        assert!(node_needs_resolution("linux-server"));
3499        assert!(node_needs_resolution("localhost"));
3500    }
3501
3502    #[tokio::test]
3503    async fn test_resolve_node_with_port() {
3504        let client = reqwest::Client::new();
3505        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
3506        assert_eq!(addr, "mac-mini:7433");
3507        assert!(token.is_none());
3508    }
3509
3510    #[tokio::test]
3511    async fn test_resolve_node_fallback_appends_port() {
3512        // No local daemon running on localhost:7433, so peer lookup fails
3513        // and it falls back to appending :7433
3514        let client = reqwest::Client::new();
3515        let (addr, token) = resolve_node(&client, "unknown-host").await;
3516        assert_eq!(addr, "unknown-host:7433");
3517        assert!(token.is_none());
3518    }
3519
3520    #[cfg(not(coverage))]
3521    #[tokio::test]
3522    async fn test_resolve_node_finds_peer() {
3523        use axum::{Router, routing::get};
3524
3525        let app = Router::new()
3526            .route(
3527                "/api/v1/peers",
3528                get(|| async {
3529                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
3530                }),
3531            )
3532            .route(
3533                "/api/v1/config",
3534                get(|| async {
3535                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3536                }),
3537            );
3538
3539        // Port 7433 may be in use; skip test if so
3540        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3541            return;
3542        };
3543        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3544
3545        let client = reqwest::Client::new();
3546        let (addr, token) = resolve_node(&client, "mac-mini").await;
3547        assert_eq!(addr, "10.0.0.5:7433");
3548        assert_eq!(token, Some("peer-secret".into()));
3549    }
3550
3551    #[cfg(not(coverage))]
3552    #[tokio::test]
3553    async fn test_resolve_node_peer_no_token() {
3554        use axum::{Router, routing::get};
3555
3556        let app = Router::new()
3557            .route(
3558                "/api/v1/peers",
3559                get(|| async {
3560                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3561                }),
3562            )
3563            .route(
3564                "/api/v1/config",
3565                get(|| async {
3566                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3567                }),
3568            );
3569
3570        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3571            return; // Port in use, skip
3572        };
3573        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3574
3575        let client = reqwest::Client::new();
3576        let (addr, token) = resolve_node(&client, "test-peer").await;
3577        assert_eq!(addr, "10.0.0.9:7433");
3578        assert!(token.is_none()); // Simple peer entry has no token
3579    }
3580
3581    #[tokio::test]
3582    async fn test_execute_with_peer_name_resolution() {
3583        // When node doesn't contain ':', resolve_node is called.
3584        // Since there's no local daemon on port 7433, it falls back to appending :7433.
3585        // The connection to the fallback address will fail, giving us a connection error.
3586        let cli = Cli {
3587            node: "nonexistent-peer".into(),
3588            token: None,
3589            command: Some(Commands::List),
3590            path: None,
3591        };
3592        let result = execute(&cli).await;
3593        // Should try to connect to nonexistent-peer:7433 and fail
3594        assert!(result.is_err());
3595    }
3596
3597    // -- Auto node selection tests --
3598
3599    #[test]
3600    fn test_cli_parse_spawn_auto() {
3601        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
3602        assert!(matches!(
3603            &cli.command,
3604            Some(Commands::Spawn { auto, .. }) if *auto
3605        ));
3606    }
3607
3608    #[test]
3609    fn test_cli_parse_spawn_auto_default() {
3610        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
3611        assert!(matches!(
3612            &cli.command,
3613            Some(Commands::Spawn { auto, .. }) if !auto
3614        ));
3615    }
3616
3617    #[tokio::test]
3618    async fn test_select_best_node_coverage_stub() {
3619        // Exercise the coverage stub (or real function in non-coverage builds)
3620        let client = reqwest::Client::new();
3621        // In coverage builds, the stub returns ("localhost:7433", "local")
3622        // In non-coverage builds, this fails because no server is running — that's OK
3623        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
3624    }
3625
3626    #[cfg(not(coverage))]
3627    #[tokio::test]
3628    async fn test_select_best_node_picks_least_loaded() {
3629        use axum::{Router, routing::get};
3630
3631        let app = Router::new().route(
3632            "/api/v1/peers",
3633            get(|| async {
3634                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
3635            }),
3636        );
3637        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3638        let addr = listener.local_addr().unwrap();
3639        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3640        let base = format!("http://127.0.0.1:{}", addr.port());
3641
3642        let client = reqwest::Client::new();
3643        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3644        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
3645        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
3646        // idle wins (lower score)
3647        assert_eq!(name, "idle");
3648        assert_eq!(addr, "idle:7433");
3649    }
3650
3651    #[cfg(not(coverage))]
3652    #[tokio::test]
3653    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
3654        use axum::{Router, routing::get};
3655
3656        let app = Router::new().route(
3657            "/api/v1/peers",
3658            get(|| async {
3659                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3660            }),
3661        );
3662        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3663        let addr = listener.local_addr().unwrap();
3664        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3665        let base = format!("http://127.0.0.1:{}", addr.port());
3666
3667        let client = reqwest::Client::new();
3668        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3669        assert_eq!(name, "my-mac");
3670        assert_eq!(addr, "localhost:7433");
3671    }
3672
3673    #[cfg(not(coverage))]
3674    #[tokio::test]
3675    async fn test_select_best_node_empty_peers_falls_back_to_local() {
3676        use axum::{Router, routing::get};
3677
3678        let app = Router::new().route(
3679            "/api/v1/peers",
3680            get(|| async {
3681                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
3682            }),
3683        );
3684        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3685        let addr = listener.local_addr().unwrap();
3686        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3687        let base = format!("http://127.0.0.1:{}", addr.port());
3688
3689        let client = reqwest::Client::new();
3690        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3691        assert_eq!(name, "solo");
3692        assert_eq!(addr, "localhost:7433");
3693    }
3694
3695    #[cfg(not(coverage))]
3696    #[tokio::test]
3697    async fn test_execute_spawn_auto_selects_node() {
3698        use axum::{
3699            Router,
3700            http::StatusCode,
3701            routing::{get, post},
3702        };
3703
3704        let create_json = test_create_response_json();
3705
3706        // Bind early so we know the address to embed in the peers response
3707        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3708        let addr = listener.local_addr().unwrap();
3709        let node = format!("127.0.0.1:{}", addr.port());
3710        let peer_addr = node.clone();
3711
3712        let app = Router::new()
3713            .route(
3714                "/api/v1/peers",
3715                get(move || {
3716                    let peer_addr = peer_addr.clone();
3717                    async move {
3718                        format!(
3719                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
3720                        )
3721                    }
3722                }),
3723            )
3724            .route(
3725                "/api/v1/sessions",
3726                post(move || async move {
3727                    (StatusCode::CREATED, create_json.clone())
3728                }),
3729            );
3730
3731        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3732
3733        let cli = Cli {
3734            node,
3735            token: None,
3736            command: Some(Commands::Spawn {
3737                name: Some("test".into()),
3738                workdir: Some("/tmp/repo".into()),
3739                ink: None,
3740                description: None,
3741                detach: true,
3742                idle_threshold: None,
3743                auto: true,
3744                command: vec!["echo".into(), "hello".into()],
3745            }),
3746            path: None,
3747        };
3748        let result = execute(&cli).await.unwrap();
3749        assert!(result.contains("Created session"));
3750    }
3751
3752    #[cfg(not(coverage))]
3753    #[tokio::test]
3754    async fn test_select_best_node_peer_no_session_count() {
3755        use axum::{Router, routing::get};
3756
3757        let app = Router::new().route(
3758            "/api/v1/peers",
3759            get(|| async {
3760                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3761            }),
3762        );
3763        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3764        let addr = listener.local_addr().unwrap();
3765        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3766        let base = format!("http://127.0.0.1:{}", addr.port());
3767
3768        let client = reqwest::Client::new();
3769        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3770        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
3771        assert_eq!(name, "fresh");
3772        assert_eq!(addr, "fresh:7433");
3773    }
3774}