Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
5    PeersResponse,
6};
7use pulpo_common::session::Session;
8
// Top-level command-line interface for the `pulpo` binary, parsed with clap's derive API.
//
// Reviewer notes in this block use `//` rather than `///` because clap turns `///`
// doc comments on derive items into user-visible help text.
//
// The version string comes from the `PULPO_VERSION` environment variable at compile time.
// NOTE(review): `args_conflicts_with_subcommands = true` makes top-level arguments and
// subcommands mutually exclusive — confirm how this interacts with `--node`/`--token`.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    /// Target node (default: localhost)
    // Either `host:port` or a bare name; bare names are resolved by `resolve_node`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token (auto-discovered from local daemon if omitted)
    #[arg(long)]
    pub token: Option<String>,

    // `None` means no subcommand was given; `execute` then treats the invocation as the
    // quick-spawn shortcut and uses `path`.
    #[command(subcommand)]
    pub command: Option<Commands>,

    /// Quick spawn: `pulpo <path>` spawns a session in that directory
    // `execute` falls back to "." when this is absent.
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
32
// Subcommands of `pulpo`.
//
// Each variant's `///` comment doubles as clap help text, so reviewer notes here use `//`.
// `visible_alias` values are short forms that clap also lists in the help output.
#[derive(Subcommand, Debug)]
// NOTE(review): the size lint is silenced rather than boxing the large variant
// (presumably `Spawn`) — confirm this is intentional.
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name or ID
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        /// Session name or ID
        name: String,
        /// Text to send (sends Enter if omitted)
        // `execute` substitutes "\n" when this is `None`.
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Session name (auto-generated from workdir if omitted)
        name: Option<String>,

        /// Working directory (defaults to current directory)
        #[arg(long)]
        workdir: Option<String>,

        /// Ink name (from config)
        #[arg(long)]
        ink: Option<String>,

        /// Human-readable description of the task
        #[arg(long)]
        description: Option<String>,

        /// Don't attach to the session after spawning
        #[arg(short, long)]
        detach: bool,

        /// Idle threshold in seconds (0 = never idle)
        #[arg(long)]
        idle_threshold: Option<u32>,

        /// Auto-select the least loaded node
        #[arg(long)]
        auto: bool,

        /// Create an isolated git worktree for the session
        #[arg(long)]
        worktree: bool,

        /// Run in a Docker sandbox container
        #[arg(long)]
        sandbox: bool,

        /// Command to run (everything after --)
        // `last = true` restricts this positional to the arguments following `--`.
        #[arg(last = true)]
        command: Vec<String>,
    },

    /// List all sessions
    #[command(visible_alias = "ls")]
    List,

    /// Show session logs/output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name or ID
        name: String,

        /// Number of lines to fetch
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Follow output (like `tail -f`)
        #[arg(short, long)]
        follow: bool,
    },

    /// Kill a session
    #[command(visible_alias = "k")]
    Kill {
        /// Session name or ID
        name: String,
    },

    /// Permanently remove a session from history
    #[command(visible_alias = "rm")]
    Delete {
        /// Session name or ID
        name: String,
    },

    /// Resume a lost session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name or ID
        name: String,
    },

    /// List all known nodes
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention history for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name or ID
        name: String,
    },

    /// Open the web dashboard in your browser
    Ui,

    /// Manage scheduled agent runs
    #[command(visible_alias = "sched")]
    Schedule {
        // Nested subcommand; see `ScheduleAction` for the available actions.
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
156
// Actions available under `pulpo schedule`.
//
// `///` comments are clap help text, so reviewer notes use `//`. Unlike `Commands`,
// the aliases here use `alias` rather than `visible_alias`.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Add a new schedule
    #[command(alias = "install")]
    Add {
        /// Schedule name
        name: String,
        /// Cron expression (e.g. "0 3 * * *")
        cron: String,
        /// Working directory
        // `execute_schedule` falls back to the current directory when omitted.
        #[arg(long)]
        workdir: Option<String>,
        /// Target node (omit = local, "auto" = least-loaded)
        // Sent to the API as `target_node`.
        #[arg(long)]
        node: Option<String>,
        /// Ink preset
        #[arg(long)]
        ink: Option<String>,
        /// Description
        #[arg(long)]
        description: Option<String>,
        /// Command to run (everything after --)
        // `execute_schedule` joins these with single spaces before sending.
        #[arg(last = true)]
        command: Vec<String>,
    },
    /// List all schedules
    #[command(alias = "ls")]
    List,
    /// Remove a schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name or ID
        name: String,
    },
    /// Pause a schedule
    // Implemented as a PUT with `"enabled": false`.
    Pause {
        /// Schedule name or ID
        name: String,
    },
    /// Resume a paused schedule
    // Implemented as a PUT with `"enabled": true`.
    Resume {
        /// Schedule name or ID
        name: String,
    },
}
202
/// The marker emitted by the agent wrapper when the agent process exits.
///
/// `follow_logs` stops polling as soon as the fetched output contains this text.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
205
/// Turn `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the
/// current working directory; if the working directory cannot be determined,
/// the input is returned as-is.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        Err(_) => path.to_owned(),
    }
}
218
/// Build a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are lowercased and kept; every other character
/// becomes a hyphen. Runs of hyphens collapse to one and hyphens at either end
/// are removed. Falls back to `"session"` when the path has no usable final
/// component or nothing survives the cleanup.
fn derive_session_name(path: &str) -> String {
    let basename = std::path::Path::new(path)
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("session");

    // Single pass: emit a hyphen only when the previous emitted char is not one,
    // which collapses consecutive separators as we go.
    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.ends_with('-') {
            slug.push('-');
        }
    }

    match slug.trim_matches('-') {
        "" => String::from("session"),
        trimmed => trimmed.to_owned(),
    }
}
251
252/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
253async fn deduplicate_session_name(
254    client: &reqwest::Client,
255    base: &str,
256    name: &str,
257    token: Option<&str>,
258) -> String {
259    // Check if the name is already taken by fetching the session
260    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
261        .send()
262        .await;
263    match resp {
264        Ok(r) if r.status().is_success() => {
265            // Session exists — try suffixed names
266            for i in 2..=99 {
267                let candidate = format!("{name}-{i}");
268                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
269                    .send()
270                    .await;
271                match resp {
272                    Ok(r) if r.status().is_success() => {}
273                    _ => return candidate,
274                }
275            }
276            format!("{name}-100")
277        }
278        _ => name.to_owned(),
279    }
280}
281
/// Build the HTTP base URL for a node address by prefixing it with `http://`.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
286
/// Response shape for the output endpoint.
///
/// Deserialized by `fetch_output` from the body of
/// `GET /api/v1/sessions/{name}/output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// The captured session output text returned by the API.
    output: String,
}
292
293/// Format a list of sessions as a table.
294fn format_sessions(sessions: &[Session]) -> String {
295    if sessions.is_empty() {
296        return "No sessions.".into();
297    }
298    let mut lines = vec![format!("{:<20} {:<12} {}", "NAME", "STATUS", "COMMAND")];
299    for s in sessions {
300        let cmd_display = if s.command.len() > 50 {
301            format!("{}...", &s.command[..47])
302        } else {
303            s.command.clone()
304        };
305        lines.push(format!("{:<20} {:<12} {}", s.name, s.status, cmd_display));
306    }
307    lines.join("\n")
308}
309
310/// Format the peers response as a table.
311fn format_nodes(resp: &PeersResponse) -> String {
312    let mut lines = vec![format!(
313        "{:<20} {:<25} {:<10} {}",
314        "NAME", "ADDRESS", "STATUS", "SESSIONS"
315    )];
316    lines.push(format!(
317        "{:<20} {:<25} {:<10} {}",
318        resp.local.name, "(local)", "online", "-"
319    ));
320    for p in &resp.peers {
321        let sessions = p
322            .session_count
323            .map_or_else(|| "-".into(), |c| c.to_string());
324        lines.push(format!(
325            "{:<20} {:<25} {:<10} {}",
326            p.name, p.address, p.status, sessions
327        ));
328    }
329    lines.join("\n")
330}
331
332/// Format intervention events as a table.
333fn format_interventions(events: &[InterventionEventResponse]) -> String {
334    if events.is_empty() {
335        return "No intervention events.".into();
336    }
337    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
338    for e in events {
339        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
340    }
341    lines.join("\n")
342}
343
/// Build the command that opens `url` in the default browser.
///
/// Uses `open` on macOS and `xdg-open` on Linux and on every other platform
/// (as a best-effort fallback). The command is only constructed, not run.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        "xdg-open"
    };
    let mut cmd = std::process::Command::new(opener);
    cmd.arg(url);
    cmd
}
367
368/// Open a URL in the default browser.
369#[cfg(not(coverage))]
370fn open_browser(url: &str) -> Result<()> {
371    build_open_command(url).status()?;
372    Ok(())
373}
374
/// Stub for coverage builds — avoids opening a browser during tests.
///
/// Ignores the URL and always returns `Ok(())`.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
380
/// Build the `tmux attach-session -t <id>` command for a session.
///
/// `backend_session_id` is the backend session ID (e.g. `my-session`), i.e. the
/// tmux session name. The command is only constructed, not run.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session").arg("-t").arg(backend_session_id);
    tmux
}
389
390/// Attach to a session's terminal.
391#[cfg(not(any(test, coverage)))]
392fn attach_session(backend_session_id: &str) -> Result<()> {
393    let status = build_attach_command(backend_session_id).status()?;
394    if !status.success() {
395        anyhow::bail!("attach failed with {status}");
396    }
397    Ok(())
398}
399
/// Stub for test and coverage builds — avoids spawning real terminals during tests.
///
/// Ignores the session ID and always returns `Ok(())`. The clippy allowances exist
/// because the stub keeps the fallible signature of the real implementation.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
406
407/// Extract a clean error message from an API JSON response (or fall back to raw text).
408fn api_error(text: &str) -> anyhow::Error {
409    serde_json::from_str::<serde_json::Value>(text)
410        .ok()
411        .and_then(|v| v["error"].as_str().map(String::from))
412        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
413}
414
415/// Return the response body text, or a clean error if the response was non-success.
416async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
417    if resp.status().is_success() {
418        Ok(resp.text().await?)
419    } else {
420        let text = resp.text().await?;
421        Err(api_error(&text))
422    }
423}
424
425/// Map a reqwest error to a user-friendly message.
426fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
427    if err.is_connect() {
428        anyhow::anyhow!(
429            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
430        )
431    } else {
432        anyhow::anyhow!("Network error connecting to {node}: {err}")
433    }
434}
435
/// Report whether a node address refers to the local machine.
///
/// Recognises `localhost` and `127.0.0.1` (with or without a `:port` suffix),
/// the bare IPv6 loopback `::1`, and any address starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' — or the whole string when there is none.
    let host = node.split_once(':').map_or(node, |(before, _)| before);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
441
442/// Try to auto-discover the auth token from a local daemon.
443async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
444    let resp = client
445        .get(format!("{base}/api/v1/auth/token"))
446        .send()
447        .await
448        .ok()?;
449    let body: AuthTokenResponse = resp.json().await.ok()?;
450    if body.token.is_empty() {
451        None
452    } else {
453        Some(body.token)
454    }
455}
456
457/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
458async fn resolve_token(
459    client: &reqwest::Client,
460    base: &str,
461    node: &str,
462    explicit: Option<&str>,
463) -> Option<String> {
464    if let Some(t) = explicit {
465        return Some(t.to_owned());
466    }
467    if is_localhost(node) {
468        return discover_token(client, base).await;
469    }
470    None
471}
472
/// Report whether a node string lacks a `:` and therefore has no port.
///
/// Such strings are treated as names that still have to be resolved to an address.
fn node_needs_resolution(node: &str) -> bool {
    node.split_once(':').is_none()
}
477
478/// Resolve a node reference to a `host:port` address.
479///
480/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
481/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
482/// online peer is found, return its address and optionally its configured auth token
483/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
484#[cfg(not(coverage))]
485async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
486    // Already has port — use as-is
487    if !node_needs_resolution(node) {
488        return (node.to_owned(), None);
489    }
490
491    // Try to resolve via local daemon's peer registry
492    let local_base = "http://localhost:7433";
493    let mut resolved_address: Option<String> = None;
494
495    if let Ok(resp) = client
496        .get(format!("{local_base}/api/v1/peers"))
497        .send()
498        .await
499        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
500    {
501        for peer in &peers_resp.peers {
502            if peer.name == node {
503                resolved_address = Some(peer.address.clone());
504                break;
505            }
506        }
507    }
508
509    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
510
511    // Try to get the peer's auth token from the config endpoint
512    let peer_token = if let Ok(resp) = client
513        .get(format!("{local_base}/api/v1/config"))
514        .send()
515        .await
516        && let Ok(config) = resp.json::<ConfigResponse>().await
517        && let Some(entry) = config.peers.get(node)
518    {
519        entry.token().map(String::from)
520    } else {
521        None
522    };
523
524    (address, peer_token)
525}
526
/// Coverage-build stand-in for node resolution that performs no HTTP requests.
///
/// Appends `:7433` to names without a port, passes other addresses through, and
/// never yields a peer token.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
536
537/// Build an authenticated GET request.
538fn authed_get(
539    client: &reqwest::Client,
540    url: String,
541    token: Option<&str>,
542) -> reqwest::RequestBuilder {
543    let req = client.get(url);
544    if let Some(t) = token {
545        req.bearer_auth(t)
546    } else {
547        req
548    }
549}
550
551/// Build an authenticated POST request.
552fn authed_post(
553    client: &reqwest::Client,
554    url: String,
555    token: Option<&str>,
556) -> reqwest::RequestBuilder {
557    let req = client.post(url);
558    if let Some(t) = token {
559        req.bearer_auth(t)
560    } else {
561        req
562    }
563}
564
565/// Build an authenticated DELETE request.
566fn authed_delete(
567    client: &reqwest::Client,
568    url: String,
569    token: Option<&str>,
570) -> reqwest::RequestBuilder {
571    let req = client.delete(url);
572    if let Some(t) = token {
573        req.bearer_auth(t)
574    } else {
575        req
576    }
577}
578
579/// Build an authenticated PUT request.
580#[cfg_attr(coverage, allow(dead_code))]
581fn authed_put(
582    client: &reqwest::Client,
583    url: String,
584    token: Option<&str>,
585) -> reqwest::RequestBuilder {
586    let req = client.put(url);
587    if let Some(t) = token {
588        req.bearer_auth(t)
589    } else {
590        req
591    }
592}
593
594/// Fetch session output from the API.
595async fn fetch_output(
596    client: &reqwest::Client,
597    base: &str,
598    name: &str,
599    lines: usize,
600    token: Option<&str>,
601) -> Result<String> {
602    let resp = authed_get(
603        client,
604        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
605        token,
606    )
607    .send()
608    .await?;
609    let text = ok_or_api_error(resp).await?;
610    let output: OutputResponse = serde_json::from_str(&text)?;
611    Ok(output.output)
612}
613
614/// Fetch session status from the API.
615async fn fetch_session_status(
616    client: &reqwest::Client,
617    base: &str,
618    name: &str,
619    token: Option<&str>,
620) -> Result<String> {
621    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
622        .send()
623        .await?;
624    let text = ok_or_api_error(resp).await?;
625    let session: Session = serde_json::from_str(&text)?;
626    Ok(session.status.to_string())
627}
628
/// Compute the new trailing lines that differ from the previous output.
///
/// The output endpoint returns the last N lines from the terminal pane. As new
/// lines appear, old lines at the top scroll off. This finds the overlap
/// between the end of `prev` and a run of lines in `new`, then returns only the
/// part of `new` that follows that overlap.
///
/// * Returns `new` unchanged when `prev` is empty or no overlap is found.
/// * Returns `""` when `new` has no lines or ends exactly at the overlap.
///
/// Lines are compared without their terminators, and the returned slice starts
/// at the real byte offset of the first new line, so both `\n` and `\r\n`
/// terminated output are handled correctly.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();

    // Split `new` the same way `str::lines` does, but also remember the byte
    // offset just past each line's terminator. Assuming one terminator byte per
    // line would be wrong for `\r\n` endings.
    let mut new_lines: Vec<&str> = Vec::new();
    let mut line_ends: Vec<usize> = Vec::new();
    let mut offset = 0;
    for segment in new.split_inclusive('\n') {
        offset += segment.len();
        let line = match segment.strip_suffix('\n') {
            Some(body) => body.strip_suffix('\r').unwrap_or(body),
            None => segment,
        };
        new_lines.push(line);
        line_ends.push(offset);
    }

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty `prev` always yields at least one line; the fallback only
    // exists to avoid a panic path.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Scan from the bottom for the last line of `prev`, then verify that the
    // lines leading up to it match the tail of `prev` contiguously.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail == new_overlap {
            // Everything after line `i` is new; this is "" when `i` is the last line.
            return &new[line_ends[i]..];
        }
    }

    // No overlap found — output changed completely, print it all
    new
}
671
672/// Follow logs by polling, printing only new output. Returns when the session ends.
673async fn follow_logs(
674    client: &reqwest::Client,
675    base: &str,
676    name: &str,
677    lines: usize,
678    token: Option<&str>,
679    writer: &mut (dyn std::io::Write + Send),
680) -> Result<()> {
681    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
682    write!(writer, "{prev_output}")?;
683
684    let mut unchanged_ticks: u32 = 0;
685
686    loop {
687        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
688
689        // Fetch latest output
690        let new_output = fetch_output(client, base, name, lines, token).await?;
691
692        let diff = diff_output(&prev_output, &new_output);
693        if diff.is_empty() {
694            unchanged_ticks += 1;
695        } else {
696            write!(writer, "{diff}")?;
697            unchanged_ticks = 0;
698        }
699
700        // Check for agent exit marker in output
701        if new_output.contains(AGENT_EXIT_MARKER) {
702            break;
703        }
704
705        prev_output = new_output;
706
707        // Only check session status when output has been unchanged for 3+ ticks
708        if unchanged_ticks >= 3 {
709            let status = fetch_session_status(client, base, name, token).await?;
710            let is_terminal = status == "ready" || status == "killed" || status == "lost";
711            if is_terminal {
712                break;
713            }
714        }
715    }
716    Ok(())
717}
718
719// --- Schedule API ---
720
721/// Execute a schedule subcommand via the scheduler API.
722#[cfg(not(coverage))]
723async fn execute_schedule(
724    client: &reqwest::Client,
725    action: &ScheduleAction,
726    base: &str,
727    token: Option<&str>,
728) -> Result<String> {
729    match action {
730        ScheduleAction::Add {
731            name,
732            cron,
733            workdir,
734            node,
735            ink,
736            description,
737            command,
738        } => {
739            let cmd = if command.is_empty() {
740                None
741            } else {
742                Some(command.join(" "))
743            };
744            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
745                std::env::current_dir()
746                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
747            });
748            let mut body = serde_json::json!({
749                "name": name,
750                "cron": cron,
751                "workdir": resolved_workdir,
752            });
753            if let Some(c) = &cmd {
754                body["command"] = serde_json::json!(c);
755            }
756            if let Some(n) = node {
757                body["target_node"] = serde_json::json!(n);
758            }
759            if let Some(i) = ink {
760                body["ink"] = serde_json::json!(i);
761            }
762            if let Some(d) = description {
763                body["description"] = serde_json::json!(d);
764            }
765            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
766                .json(&body)
767                .send()
768                .await?;
769            ok_or_api_error(resp).await?;
770            Ok(format!("Created schedule \"{name}\""))
771        }
772        ScheduleAction::List => {
773            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
774                .send()
775                .await?;
776            let text = ok_or_api_error(resp).await?;
777            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
778            Ok(format_schedules(&schedules))
779        }
780        ScheduleAction::Remove { name } => {
781            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
782                .send()
783                .await?;
784            ok_or_api_error(resp).await?;
785            Ok(format!("Removed schedule \"{name}\""))
786        }
787        ScheduleAction::Pause { name } => {
788            let body = serde_json::json!({ "enabled": false });
789            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
790                .json(&body)
791                .send()
792                .await?;
793            ok_or_api_error(resp).await?;
794            Ok(format!("Paused schedule \"{name}\""))
795        }
796        ScheduleAction::Resume { name } => {
797            let body = serde_json::json!({ "enabled": true });
798            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
799                .json(&body)
800                .send()
801                .await?;
802            ok_or_api_error(resp).await?;
803            Ok(format!("Resumed schedule \"{name}\""))
804        }
805    }
806}
807
/// Coverage stub for schedule execution.
///
/// Ignores every argument, performs no requests, and always returns an empty
/// string. The clippy allowance exists because the stub keeps the fallible
/// signature of the real implementation.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
819
820/// Format a list of schedules as a table.
821#[cfg_attr(coverage, allow(dead_code))]
822fn format_schedules(schedules: &[serde_json::Value]) -> String {
823    if schedules.is_empty() {
824        return "No schedules.".into();
825    }
826    let mut lines = vec![format!(
827        "{:<20} {:<18} {:<8} {:<12} {}",
828        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
829    )];
830    for s in schedules {
831        let name = s["name"].as_str().unwrap_or("?");
832        let cron = s["cron"].as_str().unwrap_or("?");
833        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
834            "yes"
835        } else {
836            "no"
837        };
838        let last_run = s["last_run_at"].as_str().map_or("-", |t| &t[..16]);
839        let node = s["target_node"].as_str().unwrap_or("local");
840        lines.push(format!(
841            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
842        ));
843    }
844    lines.join("\n")
845}
846
847/// Select the best node from the peer registry based on load.
848/// Returns the node address and name of the least loaded online peer.
849/// Scoring: lower memory usage + fewer active sessions = better.
850#[cfg(not(coverage))]
851async fn select_best_node(
852    client: &reqwest::Client,
853    base: &str,
854    token: Option<&str>,
855) -> Result<(String, String)> {
856    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
857        .send()
858        .await?;
859    let text = ok_or_api_error(resp).await?;
860    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
861
862    // Score: fewer active sessions is better, more memory is better
863    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
864
865    for peer in &peers_resp.peers {
866        if peer.status != pulpo_common::peer::PeerStatus::Online {
867            continue;
868        }
869        let sessions = peer.session_count.unwrap_or(0);
870        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
871        // Lower score = better (fewer sessions, more memory)
872        #[allow(clippy::cast_precision_loss)]
873        let score = sessions as f64 - (mem as f64 / 1024.0);
874        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
875            best = Some((peer.address.clone(), peer.name.clone(), score));
876        }
877    }
878
879    // Fall back to local if no online peers
880    match best {
881        Some((addr, name, _)) => Ok((addr, name)),
882        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
883    }
884}
885
/// Coverage stub — auto-select always falls back to local.
///
/// Ignores every argument, performs no requests, and always returns the address
/// `localhost:7433` with the name `local`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    Ok(("localhost:7433".into(), "local".into()))
}
896
897/// Execute the given CLI command against the specified node.
898#[allow(clippy::too_many_lines)]
899pub async fn execute(cli: &Cli) -> Result<String> {
900    let client = reqwest::Client::new();
901    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
902    let url = base_url(&resolved_node);
903    let node = &resolved_node;
904    let token = resolve_token(&client, &url, node, cli.token.as_deref())
905        .await
906        .or(peer_token);
907
908    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
909    if cli.command.is_none() {
910        let path = cli.path.as_deref().unwrap_or(".");
911        let resolved_workdir = resolve_path(path);
912        let base_name = derive_session_name(&resolved_workdir);
913        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
914        let body = serde_json::json!({
915            "name": name,
916            "workdir": resolved_workdir,
917        });
918        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
919            .json(&body)
920            .send()
921            .await
922            .map_err(|e| friendly_error(&e, node))?;
923        let text = ok_or_api_error(resp).await?;
924        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
925        let msg = format!(
926            "Created session \"{}\" ({})",
927            resp.session.name, resp.session.id
928        );
929        let backend_id = resp
930            .session
931            .backend_session_id
932            .as_deref()
933            .unwrap_or(&resp.session.name);
934        eprintln!("{msg}");
935        attach_session(backend_id)?;
936        return Ok(format!("Detached from session \"{}\".", resp.session.name));
937    }
938
939    match cli.command.as_ref().unwrap() {
940        Commands::Attach { name } => {
941            // Fetch session to get status and backend_session_id
942            let resp = authed_get(
943                &client,
944                format!("{url}/api/v1/sessions/{name}"),
945                token.as_deref(),
946            )
947            .send()
948            .await
949            .map_err(|e| friendly_error(&e, node))?;
950            let text = ok_or_api_error(resp).await?;
951            let session: Session = serde_json::from_str(&text)?;
952            match session.status.to_string().as_str() {
953                "lost" => {
954                    anyhow::bail!(
955                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
956                    );
957                }
958                "killed" => {
959                    anyhow::bail!(
960                        "Session \"{name}\" is {} — cannot attach to a killed session.",
961                        session.status
962                    );
963                }
964                _ => {}
965            }
966            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
967            attach_session(&backend_id)?;
968            Ok(format!("Detached from session {name}."))
969        }
970        Commands::Input { name, text } => {
971            let input_text = text.as_deref().unwrap_or("\n");
972            let body = serde_json::json!({ "text": input_text });
973            let resp = authed_post(
974                &client,
975                format!("{url}/api/v1/sessions/{name}/input"),
976                token.as_deref(),
977            )
978            .json(&body)
979            .send()
980            .await
981            .map_err(|e| friendly_error(&e, node))?;
982            ok_or_api_error(resp).await?;
983            Ok(format!("Sent input to session {name}."))
984        }
985        Commands::List => {
986            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
987                .send()
988                .await
989                .map_err(|e| friendly_error(&e, node))?;
990            let text = ok_or_api_error(resp).await?;
991            let sessions: Vec<Session> = serde_json::from_str(&text)?;
992            Ok(format_sessions(&sessions))
993        }
994        Commands::Nodes => {
995            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
996                .send()
997                .await
998                .map_err(|e| friendly_error(&e, node))?;
999            let text = ok_or_api_error(resp).await?;
1000            let resp: PeersResponse = serde_json::from_str(&text)?;
1001            Ok(format_nodes(&resp))
1002        }
1003        Commands::Spawn {
1004            workdir,
1005            name,
1006            ink,
1007            description,
1008            detach,
1009            idle_threshold,
1010            auto,
1011            worktree,
1012            sandbox,
1013            command,
1014        } => {
1015            let cmd = if command.is_empty() {
1016                None
1017            } else {
1018                Some(command.join(" "))
1019            };
1020            // Resolve workdir: --workdir flag > current directory
1021            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1022                std::env::current_dir()
1023                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1024            });
1025            // Resolve name: explicit > derived from workdir (with dedup)
1026            let resolved_name = if let Some(n) = name {
1027                n.clone()
1028            } else {
1029                let base_name = derive_session_name(&resolved_workdir);
1030                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1031            };
1032            let mut body = serde_json::json!({
1033                "name": resolved_name,
1034                "workdir": resolved_workdir,
1035            });
1036            if let Some(c) = &cmd {
1037                body["command"] = serde_json::json!(c);
1038            }
1039            if let Some(i) = ink {
1040                body["ink"] = serde_json::json!(i);
1041            }
1042            if let Some(d) = description {
1043                body["description"] = serde_json::json!(d);
1044            }
1045            if let Some(t) = idle_threshold {
1046                body["idle_threshold_secs"] = serde_json::json!(t);
1047            }
1048            if *worktree {
1049                body["worktree"] = serde_json::json!(true);
1050            }
1051            if *sandbox {
1052                body["sandbox"] = serde_json::json!(true);
1053            }
1054            let spawn_url = if *auto {
1055                let (auto_addr, auto_name) =
1056                    select_best_node(&client, &url, token.as_deref()).await?;
1057                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1058                base_url(&auto_addr)
1059            } else {
1060                url.clone()
1061            };
1062            let resp = authed_post(
1063                &client,
1064                format!("{spawn_url}/api/v1/sessions"),
1065                token.as_deref(),
1066            )
1067            .json(&body)
1068            .send()
1069            .await
1070            .map_err(|e| friendly_error(&e, node))?;
1071            let text = ok_or_api_error(resp).await?;
1072            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1073            let msg = format!(
1074                "Created session \"{}\" ({})",
1075                resp.session.name, resp.session.id
1076            );
1077            if !detach {
1078                let backend_id = resp
1079                    .session
1080                    .backend_session_id
1081                    .as_deref()
1082                    .unwrap_or(&resp.session.name);
1083                eprintln!("{msg}");
1084                attach_session(backend_id)?;
1085                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1086            }
1087            Ok(msg)
1088        }
1089        Commands::Kill { name } => {
1090            let resp = authed_post(
1091                &client,
1092                format!("{url}/api/v1/sessions/{name}/kill"),
1093                token.as_deref(),
1094            )
1095            .send()
1096            .await
1097            .map_err(|e| friendly_error(&e, node))?;
1098            ok_or_api_error(resp).await?;
1099            Ok(format!("Session {name} killed."))
1100        }
1101        Commands::Delete { name } => {
1102            let resp = authed_delete(
1103                &client,
1104                format!("{url}/api/v1/sessions/{name}"),
1105                token.as_deref(),
1106            )
1107            .send()
1108            .await
1109            .map_err(|e| friendly_error(&e, node))?;
1110            ok_or_api_error(resp).await?;
1111            Ok(format!("Session {name} deleted."))
1112        }
1113        Commands::Logs {
1114            name,
1115            lines,
1116            follow,
1117        } => {
1118            if *follow {
1119                let mut stdout = std::io::stdout();
1120                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1121                    .await
1122                    .map_err(|e| {
1123                        // Unwrap reqwest errors to friendly messages
1124                        match e.downcast::<reqwest::Error>() {
1125                            Ok(re) => friendly_error(&re, node),
1126                            Err(other) => other,
1127                        }
1128                    })?;
1129                Ok(String::new())
1130            } else {
1131                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1132                    .await
1133                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1134                        Ok(re) => friendly_error(&re, node),
1135                        Err(other) => other,
1136                    })?;
1137                Ok(output)
1138            }
1139        }
1140        Commands::Interventions { name } => {
1141            let resp = authed_get(
1142                &client,
1143                format!("{url}/api/v1/sessions/{name}/interventions"),
1144                token.as_deref(),
1145            )
1146            .send()
1147            .await
1148            .map_err(|e| friendly_error(&e, node))?;
1149            let text = ok_or_api_error(resp).await?;
1150            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1151            Ok(format_interventions(&events))
1152        }
1153        Commands::Ui => {
1154            let dashboard = base_url(node);
1155            open_browser(&dashboard)?;
1156            Ok(format!("Opening {dashboard}"))
1157        }
1158        Commands::Resume { name } => {
1159            let resp = authed_post(
1160                &client,
1161                format!("{url}/api/v1/sessions/{name}/resume"),
1162                token.as_deref(),
1163            )
1164            .send()
1165            .await
1166            .map_err(|e| friendly_error(&e, node))?;
1167            let text = ok_or_api_error(resp).await?;
1168            let session: Session = serde_json::from_str(&text)?;
1169            let backend_id = session
1170                .backend_session_id
1171                .as_deref()
1172                .unwrap_or(&session.name);
1173            eprintln!("Resumed session \"{}\"", session.name);
1174            attach_session(backend_id)?;
1175            Ok(format!("Detached from session \"{}\".", session.name))
1176        }
1177        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1178            .await
1179            .map_err(|e| match e.downcast::<reqwest::Error>() {
1180                Ok(re) => friendly_error(&re, node),
1181                Err(other) => other,
1182            }),
1183    }
1184}
1185
1186#[cfg(test)]
1187mod tests {
1188    use super::*;
1189
1190    #[test]
1191    fn test_base_url() {
1192        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1193        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1194    }
1195
1196    #[test]
1197    fn test_cli_parse_list() {
1198        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1199        assert_eq!(cli.node, "localhost:7433");
1200        assert!(matches!(cli.command, Some(Commands::List)));
1201    }
1202
1203    #[test]
1204    fn test_cli_parse_nodes() {
1205        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1206        assert!(matches!(cli.command, Some(Commands::Nodes)));
1207    }
1208
1209    #[test]
1210    fn test_cli_parse_ui() {
1211        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1212        assert!(matches!(cli.command, Some(Commands::Ui)));
1213    }
1214
1215    #[test]
1216    fn test_cli_parse_ui_custom_node() {
1217        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1218        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1219        assert_eq!(cli.node, "mac-mini:7433");
1220        assert_eq!(cli.path.as_deref(), Some("ui"));
1221    }
1222
1223    #[test]
1224    fn test_build_open_command() {
1225        let cmd = build_open_command("http://localhost:7433");
1226        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1227        assert_eq!(args, vec!["http://localhost:7433"]);
1228        #[cfg(target_os = "macos")]
1229        assert_eq!(cmd.get_program(), "open");
1230        #[cfg(target_os = "linux")]
1231        assert_eq!(cmd.get_program(), "xdg-open");
1232    }
1233
1234    #[test]
1235    fn test_cli_parse_spawn() {
1236        let cli = Cli::try_parse_from([
1237            "pulpo",
1238            "spawn",
1239            "my-task",
1240            "--workdir",
1241            "/tmp/repo",
1242            "--",
1243            "claude",
1244            "-p",
1245            "Fix the bug",
1246        ])
1247        .unwrap();
1248        assert!(matches!(
1249            &cli.command,
1250            Some(Commands::Spawn { name, workdir, command, .. })
1251                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1252                && command == &["claude", "-p", "Fix the bug"]
1253        ));
1254    }
1255
1256    #[test]
1257    fn test_cli_parse_spawn_with_ink() {
1258        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1259        assert!(matches!(
1260            &cli.command,
1261            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1262        ));
1263    }
1264
1265    #[test]
1266    fn test_cli_parse_spawn_with_description() {
1267        let cli =
1268            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1269                .unwrap();
1270        assert!(matches!(
1271            &cli.command,
1272            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1273        ));
1274    }
1275
1276    #[test]
1277    fn test_cli_parse_spawn_name_positional() {
1278        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1279        assert!(matches!(
1280            &cli.command,
1281            Some(Commands::Spawn { name, command, .. })
1282                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1283        ));
1284    }
1285
1286    #[test]
1287    fn test_cli_parse_spawn_no_command() {
1288        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1289        assert!(matches!(
1290            &cli.command,
1291            Some(Commands::Spawn { command, .. }) if command.is_empty()
1292        ));
1293    }
1294
1295    #[test]
1296    fn test_cli_parse_spawn_idle_threshold() {
1297        let cli =
1298            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1299        assert!(matches!(
1300            &cli.command,
1301            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1302        ));
1303    }
1304
1305    #[test]
1306    fn test_cli_parse_spawn_idle_threshold_60() {
1307        let cli =
1308            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1309        assert!(matches!(
1310            &cli.command,
1311            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1312        ));
1313    }
1314
1315    #[test]
1316    fn test_cli_parse_spawn_worktree() {
1317        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1318        assert!(matches!(
1319            &cli.command,
1320            Some(Commands::Spawn { worktree, .. }) if *worktree
1321        ));
1322    }
1323
1324    #[test]
1325    fn test_cli_parse_spawn_detach() {
1326        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1327        assert!(matches!(
1328            &cli.command,
1329            Some(Commands::Spawn { detach, .. }) if *detach
1330        ));
1331    }
1332
1333    #[test]
1334    fn test_cli_parse_spawn_detach_short() {
1335        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1336        assert!(matches!(
1337            &cli.command,
1338            Some(Commands::Spawn { detach, .. }) if *detach
1339        ));
1340    }
1341
1342    #[test]
1343    fn test_cli_parse_spawn_detach_default() {
1344        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1345        assert!(matches!(
1346            &cli.command,
1347            Some(Commands::Spawn { detach, .. }) if !detach
1348        ));
1349    }
1350
1351    #[test]
1352    fn test_cli_parse_logs() {
1353        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1354        assert!(matches!(
1355            &cli.command,
1356            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1357        ));
1358    }
1359
1360    #[test]
1361    fn test_cli_parse_logs_with_lines() {
1362        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1363        assert!(matches!(
1364            &cli.command,
1365            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1366        ));
1367    }
1368
1369    #[test]
1370    fn test_cli_parse_logs_follow() {
1371        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1372        assert!(matches!(
1373            &cli.command,
1374            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1375        ));
1376    }
1377
1378    #[test]
1379    fn test_cli_parse_logs_follow_short() {
1380        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1381        assert!(matches!(
1382            &cli.command,
1383            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1384        ));
1385    }
1386
1387    #[test]
1388    fn test_cli_parse_kill() {
1389        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1390        assert!(matches!(
1391            &cli.command,
1392            Some(Commands::Kill { name }) if name == "my-session"
1393        ));
1394    }
1395
1396    #[test]
1397    fn test_cli_parse_delete() {
1398        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1399        assert!(matches!(
1400            &cli.command,
1401            Some(Commands::Delete { name }) if name == "my-session"
1402        ));
1403    }
1404
1405    #[test]
1406    fn test_cli_parse_resume() {
1407        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1408        assert!(matches!(
1409            &cli.command,
1410            Some(Commands::Resume { name }) if name == "my-session"
1411        ));
1412    }
1413
1414    #[test]
1415    fn test_cli_parse_input() {
1416        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1417        assert!(matches!(
1418            &cli.command,
1419            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1420        ));
1421    }
1422
1423    #[test]
1424    fn test_cli_parse_input_no_text() {
1425        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1426        assert!(matches!(
1427            &cli.command,
1428            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1429        ));
1430    }
1431
1432    #[test]
1433    fn test_cli_parse_input_alias() {
1434        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1435        assert!(matches!(
1436            &cli.command,
1437            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1438        ));
1439    }
1440
1441    #[test]
1442    fn test_cli_parse_custom_node() {
1443        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1444        assert_eq!(cli.node, "win-pc:8080");
1445        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
1446        assert_eq!(cli.path.as_deref(), Some("list"));
1447    }
1448
1449    #[test]
1450    fn test_cli_version() {
1451        let result = Cli::try_parse_from(["pulpo", "--version"]);
1452        // clap exits with an error (kind DisplayVersion) when --version is used
1453        let err = result.unwrap_err();
1454        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1455    }
1456
1457    #[test]
1458    fn test_cli_parse_no_subcommand_succeeds() {
1459        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1460        assert!(cli.command.is_none());
1461        assert!(cli.path.is_none());
1462    }
1463
1464    #[test]
1465    fn test_cli_debug() {
1466        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1467        let debug = format!("{cli:?}");
1468        assert!(debug.contains("List"));
1469    }
1470
1471    #[test]
1472    fn test_commands_debug() {
1473        let cmd = Commands::List;
1474        assert_eq!(format!("{cmd:?}"), "List");
1475    }
1476
    /// A valid `Session` JSON document used as the canned body for test responses.
    ///
    /// The session is named `"repo"`, has status `"active"` and a null
    /// `backend_session_id`; every other optional field is null as well.
    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1479
1480    /// A valid `CreateSessionResponse` JSON wrapping the session.
1481    fn test_create_response_json() -> String {
1482        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1483    }
1484
1485    /// Start a lightweight test HTTP server and return its address.
1486    async fn start_test_server() -> String {
1487        use axum::http::StatusCode;
1488        use axum::{
1489            Json, Router,
1490            routing::{get, post},
1491        };
1492
1493        let create_json = test_create_response_json();
1494
1495        let app = Router::new()
1496            .route(
1497                "/api/v1/sessions",
1498                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1499                    (StatusCode::CREATED, create_json.clone())
1500                }),
1501            )
1502            .route(
1503                "/api/v1/sessions/{id}",
1504                get(|| async { TEST_SESSION_JSON.to_owned() })
1505                    .delete(|| async { StatusCode::NO_CONTENT }),
1506            )
1507            .route(
1508                "/api/v1/sessions/{id}/kill",
1509                post(|| async { StatusCode::NO_CONTENT }),
1510            )
1511            .route(
1512                "/api/v1/sessions/{id}/output",
1513                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1514            )
1515            .route(
1516                "/api/v1/peers",
1517                get(|| async {
1518                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1519                }),
1520            )
1521            .route(
1522                "/api/v1/sessions/{id}/resume",
1523                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1524            )
1525            .route(
1526                "/api/v1/sessions/{id}/interventions",
1527                get(|| async { "[]".to_owned() }),
1528            )
1529            .route(
1530                "/api/v1/sessions/{id}/input",
1531                post(|| async { StatusCode::NO_CONTENT }),
1532            )
1533            .route(
1534                "/api/v1/schedules",
1535                get(|| async { Json::<Vec<()>>(vec![]) })
1536                    .post(|| async { StatusCode::CREATED }),
1537            )
1538            .route(
1539                "/api/v1/schedules/{id}",
1540                axum::routing::put(|| async { StatusCode::OK })
1541                    .delete(|| async { StatusCode::NO_CONTENT }),
1542            );
1543
1544        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1545        let addr = listener.local_addr().unwrap();
1546        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1547        format!("127.0.0.1:{}", addr.port())
1548    }
1549
1550    #[tokio::test]
1551    async fn test_execute_list_success() {
1552        let node = start_test_server().await;
1553        let cli = Cli {
1554            node,
1555            token: None,
1556            command: Some(Commands::List),
1557            path: None,
1558        };
1559        let result = execute(&cli).await.unwrap();
1560        assert_eq!(result, "No sessions.");
1561    }
1562
1563    #[tokio::test]
1564    async fn test_execute_nodes_success() {
1565        let node = start_test_server().await;
1566        let cli = Cli {
1567            node,
1568            token: None,
1569            command: Some(Commands::Nodes),
1570            path: None,
1571        };
1572        let result = execute(&cli).await.unwrap();
1573        assert!(result.contains("test"));
1574        assert!(result.contains("(local)"));
1575        assert!(result.contains("NAME"));
1576    }
1577
1578    #[tokio::test]
1579    async fn test_execute_spawn_success() {
1580        let node = start_test_server().await;
1581        let cli = Cli {
1582            node,
1583            token: None,
1584            command: Some(Commands::Spawn {
1585                name: Some("test".into()),
1586                workdir: Some("/tmp/repo".into()),
1587                ink: None,
1588                description: None,
1589                detach: true,
1590                idle_threshold: None,
1591                auto: false,
1592                worktree: false,
1593                sandbox: false,
1594                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1595            }),
1596            path: None,
1597        };
1598        let result = execute(&cli).await.unwrap();
1599        assert!(result.contains("Created session"));
1600        assert!(result.contains("repo"));
1601    }
1602
1603    #[tokio::test]
1604    async fn test_execute_spawn_with_all_flags() {
1605        let node = start_test_server().await;
1606        let cli = Cli {
1607            node,
1608            token: None,
1609            command: Some(Commands::Spawn {
1610                name: Some("test".into()),
1611                workdir: Some("/tmp/repo".into()),
1612                ink: Some("coder".into()),
1613                description: Some("Fix the bug".into()),
1614                detach: true,
1615                idle_threshold: None,
1616                auto: false,
1617                worktree: false,
1618                sandbox: false,
1619                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1620            }),
1621            path: None,
1622        };
1623        let result = execute(&cli).await.unwrap();
1624        assert!(result.contains("Created session"));
1625    }
1626
1627    #[tokio::test]
1628    async fn test_execute_spawn_no_command() {
1629        let node = start_test_server().await;
1630        let cli = Cli {
1631            node,
1632            token: None,
1633            command: Some(Commands::Spawn {
1634                name: Some("test".into()),
1635                workdir: Some("/tmp/repo".into()),
1636                ink: None,
1637                description: None,
1638                detach: true,
1639                idle_threshold: None,
1640                auto: false,
1641                worktree: false,
1642                sandbox: false,
1643                command: vec![],
1644            }),
1645            path: None,
1646        };
1647        let result = execute(&cli).await.unwrap();
1648        assert!(result.contains("Created session"));
1649    }
1650
1651    #[tokio::test]
1652    async fn test_execute_spawn_with_name() {
1653        let node = start_test_server().await;
1654        let cli = Cli {
1655            node,
1656            token: None,
1657            command: Some(Commands::Spawn {
1658                name: Some("my-task".into()),
1659                workdir: Some("/tmp/repo".into()),
1660                ink: None,
1661                description: None,
1662                detach: true,
1663                idle_threshold: None,
1664                auto: false,
1665                worktree: false,
1666                sandbox: false,
1667                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1668            }),
1669            path: None,
1670        };
1671        let result = execute(&cli).await.unwrap();
1672        assert!(result.contains("Created session"));
1673    }
1674
1675    #[tokio::test]
1676    async fn test_execute_spawn_auto_attach() {
1677        let node = start_test_server().await;
1678        let cli = Cli {
1679            node,
1680            token: None,
1681            command: Some(Commands::Spawn {
1682                name: Some("test".into()),
1683                workdir: Some("/tmp/repo".into()),
1684                ink: None,
1685                description: None,
1686                detach: false,
1687                idle_threshold: None,
1688                auto: false,
1689                worktree: false,
1690                sandbox: false,
1691                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1692            }),
1693            path: None,
1694        };
1695        let result = execute(&cli).await.unwrap();
1696        // When not detached, spawn prints creation to stderr and returns detach message
1697        assert!(result.contains("Detached from session"));
1698    }
1699
1700    #[tokio::test]
1701    async fn test_execute_kill_success() {
1702        let node = start_test_server().await;
1703        let cli = Cli {
1704            node,
1705            token: None,
1706            command: Some(Commands::Kill {
1707                name: "test-session".into(),
1708            }),
1709            path: None,
1710        };
1711        let result = execute(&cli).await.unwrap();
1712        assert!(result.contains("killed"));
1713    }
1714
1715    #[tokio::test]
1716    async fn test_execute_delete_success() {
1717        let node = start_test_server().await;
1718        let cli = Cli {
1719            node,
1720            token: None,
1721            command: Some(Commands::Delete {
1722                name: "test-session".into(),
1723            }),
1724            path: None,
1725        };
1726        let result = execute(&cli).await.unwrap();
1727        assert!(result.contains("deleted"));
1728    }
1729
1730    #[tokio::test]
1731    async fn test_execute_logs_success() {
1732        let node = start_test_server().await;
1733        let cli = Cli {
1734            node,
1735            token: None,
1736            command: Some(Commands::Logs {
1737                name: "test-session".into(),
1738                lines: 50,
1739                follow: false,
1740            }),
1741            path: None,
1742        };
1743        let result = execute(&cli).await.unwrap();
1744        assert!(result.contains("test output"));
1745    }
1746
1747    #[tokio::test]
1748    async fn test_execute_list_connection_refused() {
1749        let cli = Cli {
1750            node: "localhost:1".into(),
1751            token: None,
1752            command: Some(Commands::List),
1753            path: None,
1754        };
1755        let result = execute(&cli).await;
1756        let err = result.unwrap_err().to_string();
1757        assert!(
1758            err.contains("Could not connect to pulpod"),
1759            "Expected friendly error, got: {err}"
1760        );
1761        assert!(err.contains("localhost:1"));
1762    }
1763
1764    #[tokio::test]
1765    async fn test_execute_nodes_connection_refused() {
1766        let cli = Cli {
1767            node: "localhost:1".into(),
1768            token: None,
1769            command: Some(Commands::Nodes),
1770            path: None,
1771        };
1772        let result = execute(&cli).await;
1773        let err = result.unwrap_err().to_string();
1774        assert!(err.contains("Could not connect to pulpod"));
1775    }
1776
1777    #[tokio::test]
1778    async fn test_execute_kill_error_response() {
1779        use axum::{Router, http::StatusCode, routing::post};
1780
1781        let app = Router::new().route(
1782            "/api/v1/sessions/{id}/kill",
1783            post(|| async {
1784                (
1785                    StatusCode::NOT_FOUND,
1786                    "{\"error\":\"session not found: test-session\"}",
1787                )
1788            }),
1789        );
1790        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1791        let addr = listener.local_addr().unwrap();
1792        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1793        let node = format!("127.0.0.1:{}", addr.port());
1794
1795        let cli = Cli {
1796            node,
1797            token: None,
1798            command: Some(Commands::Kill {
1799                name: "test-session".into(),
1800            }),
1801            path: None,
1802        };
1803        let err = execute(&cli).await.unwrap_err();
1804        assert_eq!(err.to_string(), "session not found: test-session");
1805    }
1806
1807    #[tokio::test]
1808    async fn test_execute_delete_error_response() {
1809        use axum::{Router, http::StatusCode, routing::delete};
1810
1811        let app = Router::new().route(
1812            "/api/v1/sessions/{id}",
1813            delete(|| async {
1814                (
1815                    StatusCode::CONFLICT,
1816                    "{\"error\":\"cannot delete session in 'running' state\"}",
1817                )
1818            }),
1819        );
1820        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1821        let addr = listener.local_addr().unwrap();
1822        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1823        let node = format!("127.0.0.1:{}", addr.port());
1824
1825        let cli = Cli {
1826            node,
1827            token: None,
1828            command: Some(Commands::Delete {
1829                name: "test-session".into(),
1830            }),
1831            path: None,
1832        };
1833        let err = execute(&cli).await.unwrap_err();
1834        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1835    }
1836
1837    #[tokio::test]
1838    async fn test_execute_logs_error_response() {
1839        use axum::{Router, http::StatusCode, routing::get};
1840
1841        let app = Router::new().route(
1842            "/api/v1/sessions/{id}/output",
1843            get(|| async {
1844                (
1845                    StatusCode::NOT_FOUND,
1846                    "{\"error\":\"session not found: ghost\"}",
1847                )
1848            }),
1849        );
1850        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1851        let addr = listener.local_addr().unwrap();
1852        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1853        let node = format!("127.0.0.1:{}", addr.port());
1854
1855        let cli = Cli {
1856            node,
1857            token: None,
1858            command: Some(Commands::Logs {
1859                name: "ghost".into(),
1860                lines: 50,
1861                follow: false,
1862            }),
1863            path: None,
1864        };
1865        let err = execute(&cli).await.unwrap_err();
1866        assert_eq!(err.to_string(), "session not found: ghost");
1867    }
1868
1869    #[tokio::test]
1870    async fn test_execute_resume_error_response() {
1871        use axum::{Router, http::StatusCode, routing::post};
1872
1873        let app = Router::new().route(
1874            "/api/v1/sessions/{id}/resume",
1875            post(|| async {
1876                (
1877                    StatusCode::BAD_REQUEST,
1878                    "{\"error\":\"session is not lost (status: active)\"}",
1879                )
1880            }),
1881        );
1882        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1883        let addr = listener.local_addr().unwrap();
1884        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1885        let node = format!("127.0.0.1:{}", addr.port());
1886
1887        let cli = Cli {
1888            node,
1889            token: None,
1890            command: Some(Commands::Resume {
1891                name: "test-session".into(),
1892            }),
1893            path: None,
1894        };
1895        let err = execute(&cli).await.unwrap_err();
1896        assert_eq!(err.to_string(), "session is not lost (status: active)");
1897    }
1898
1899    #[tokio::test]
1900    async fn test_execute_spawn_error_response() {
1901        use axum::{Router, http::StatusCode, routing::post};
1902
1903        let app = Router::new().route(
1904            "/api/v1/sessions",
1905            post(|| async {
1906                (
1907                    StatusCode::INTERNAL_SERVER_ERROR,
1908                    "{\"error\":\"failed to spawn session\"}",
1909                )
1910            }),
1911        );
1912        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1913        let addr = listener.local_addr().unwrap();
1914        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1915        let node = format!("127.0.0.1:{}", addr.port());
1916
1917        let cli = Cli {
1918            node,
1919            token: None,
1920            command: Some(Commands::Spawn {
1921                name: Some("test".into()),
1922                workdir: Some("/tmp/repo".into()),
1923                ink: None,
1924                description: None,
1925                detach: true,
1926                idle_threshold: None,
1927                auto: false,
1928                worktree: false,
1929                sandbox: false,
1930                command: vec!["test".into()],
1931            }),
1932            path: None,
1933        };
1934        let err = execute(&cli).await.unwrap_err();
1935        assert_eq!(err.to_string(), "failed to spawn session");
1936    }
1937
1938    #[tokio::test]
1939    async fn test_execute_interventions_error_response() {
1940        use axum::{Router, http::StatusCode, routing::get};
1941
1942        let app = Router::new().route(
1943            "/api/v1/sessions/{id}/interventions",
1944            get(|| async {
1945                (
1946                    StatusCode::NOT_FOUND,
1947                    "{\"error\":\"session not found: ghost\"}",
1948                )
1949            }),
1950        );
1951        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1952        let addr = listener.local_addr().unwrap();
1953        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1954        let node = format!("127.0.0.1:{}", addr.port());
1955
1956        let cli = Cli {
1957            node,
1958            token: None,
1959            command: Some(Commands::Interventions {
1960                name: "ghost".into(),
1961            }),
1962            path: None,
1963        };
1964        let err = execute(&cli).await.unwrap_err();
1965        assert_eq!(err.to_string(), "session not found: ghost");
1966    }
1967
1968    #[tokio::test]
1969    async fn test_execute_resume_success() {
1970        let node = start_test_server().await;
1971        let cli = Cli {
1972            node,
1973            token: None,
1974            command: Some(Commands::Resume {
1975                name: "test-session".into(),
1976            }),
1977            path: None,
1978        };
1979        let result = execute(&cli).await.unwrap();
1980        assert!(result.contains("Detached from session"));
1981    }
1982
1983    #[tokio::test]
1984    async fn test_execute_input_success() {
1985        let node = start_test_server().await;
1986        let cli = Cli {
1987            node,
1988            token: None,
1989            command: Some(Commands::Input {
1990                name: "test-session".into(),
1991                text: Some("yes".into()),
1992            }),
1993            path: None,
1994        };
1995        let result = execute(&cli).await.unwrap();
1996        assert!(result.contains("Sent input to session test-session"));
1997    }
1998
1999    #[tokio::test]
2000    async fn test_execute_input_no_text() {
2001        let node = start_test_server().await;
2002        let cli = Cli {
2003            node,
2004            token: None,
2005            command: Some(Commands::Input {
2006                name: "test-session".into(),
2007                text: None,
2008            }),
2009            path: None,
2010        };
2011        let result = execute(&cli).await.unwrap();
2012        assert!(result.contains("Sent input to session test-session"));
2013    }
2014
2015    #[tokio::test]
2016    async fn test_execute_input_connection_refused() {
2017        let cli = Cli {
2018            node: "localhost:1".into(),
2019            token: None,
2020            command: Some(Commands::Input {
2021                name: "test".into(),
2022                text: Some("y".into()),
2023            }),
2024            path: None,
2025        };
2026        let result = execute(&cli).await;
2027        let err = result.unwrap_err().to_string();
2028        assert!(err.contains("Could not connect to pulpod"));
2029    }
2030
2031    #[tokio::test]
2032    async fn test_execute_input_error_response() {
2033        use axum::{Router, http::StatusCode, routing::post};
2034
2035        let app = Router::new().route(
2036            "/api/v1/sessions/{id}/input",
2037            post(|| async {
2038                (
2039                    StatusCode::NOT_FOUND,
2040                    "{\"error\":\"session not found: ghost\"}",
2041                )
2042            }),
2043        );
2044        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2045        let addr = listener.local_addr().unwrap();
2046        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2047        let node = format!("127.0.0.1:{}", addr.port());
2048
2049        let cli = Cli {
2050            node,
2051            token: None,
2052            command: Some(Commands::Input {
2053                name: "ghost".into(),
2054                text: Some("y".into()),
2055            }),
2056            path: None,
2057        };
2058        let err = execute(&cli).await.unwrap_err();
2059        assert_eq!(err.to_string(), "session not found: ghost");
2060    }
2061
2062    #[tokio::test]
2063    async fn test_execute_ui() {
2064        let cli = Cli {
2065            node: "localhost:7433".into(),
2066            token: None,
2067            command: Some(Commands::Ui),
2068            path: None,
2069        };
2070        let result = execute(&cli).await.unwrap();
2071        assert!(result.contains("Opening"));
2072        assert!(result.contains("http://localhost:7433"));
2073    }
2074
2075    #[tokio::test]
2076    async fn test_execute_ui_custom_node() {
2077        let cli = Cli {
2078            node: "mac-mini:7433".into(),
2079            token: None,
2080            command: Some(Commands::Ui),
2081            path: None,
2082        };
2083        let result = execute(&cli).await.unwrap();
2084        assert!(result.contains("http://mac-mini:7433"));
2085    }
2086
2087    #[test]
2088    fn test_format_sessions_empty() {
2089        assert_eq!(format_sessions(&[]), "No sessions.");
2090    }
2091
2092    #[test]
2093    fn test_format_sessions_with_data() {
2094        use chrono::Utc;
2095        use pulpo_common::session::SessionStatus;
2096        use uuid::Uuid;
2097
2098        let sessions = vec![Session {
2099            id: Uuid::nil(),
2100            name: "my-api".into(),
2101            workdir: "/tmp/repo".into(),
2102            command: "claude -p 'Fix the bug'".into(),
2103            description: Some("Fix the bug".into()),
2104            status: SessionStatus::Active,
2105            exit_code: None,
2106            backend_session_id: None,
2107            output_snapshot: None,
2108            metadata: None,
2109            ink: None,
2110            intervention_code: None,
2111            intervention_reason: None,
2112            intervention_at: None,
2113            last_output_at: None,
2114            idle_since: None,
2115            idle_threshold_secs: None,
2116            worktree_path: None,
2117            sandbox: false,
2118            created_at: Utc::now(),
2119            updated_at: Utc::now(),
2120        }];
2121        let output = format_sessions(&sessions);
2122        assert!(output.contains("NAME"));
2123        assert!(output.contains("COMMAND"));
2124        assert!(output.contains("my-api"));
2125        assert!(output.contains("active"));
2126        assert!(output.contains("claude -p 'Fix the bug'"));
2127    }
2128
2129    #[test]
2130    fn test_format_sessions_long_command_truncated() {
2131        use chrono::Utc;
2132        use pulpo_common::session::SessionStatus;
2133        use uuid::Uuid;
2134
2135        let sessions = vec![Session {
2136            id: Uuid::nil(),
2137            name: "test".into(),
2138            workdir: "/tmp".into(),
2139            command:
2140                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2141                    .into(),
2142            description: None,
2143            status: SessionStatus::Ready,
2144            exit_code: None,
2145            backend_session_id: None,
2146            output_snapshot: None,
2147            metadata: None,
2148            ink: None,
2149            intervention_code: None,
2150            intervention_reason: None,
2151            intervention_at: None,
2152            last_output_at: None,
2153            idle_since: None,
2154            idle_threshold_secs: None,
2155            worktree_path: None,
2156            sandbox: false,
2157            created_at: Utc::now(),
2158            updated_at: Utc::now(),
2159        }];
2160        let output = format_sessions(&sessions);
2161        assert!(output.contains("..."));
2162    }
2163
2164    #[test]
2165    fn test_format_nodes() {
2166        use pulpo_common::node::NodeInfo;
2167        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2168
2169        let resp = PeersResponse {
2170            local: NodeInfo {
2171                name: "mac-mini".into(),
2172                hostname: "h".into(),
2173                os: "macos".into(),
2174                arch: "arm64".into(),
2175                cpus: 8,
2176                memory_mb: 16384,
2177                gpu: None,
2178            },
2179            peers: vec![PeerInfo {
2180                name: "win-pc".into(),
2181                address: "win-pc:7433".into(),
2182                status: PeerStatus::Online,
2183                node_info: None,
2184                session_count: Some(3),
2185                source: PeerSource::Configured,
2186            }],
2187        };
2188        let output = format_nodes(&resp);
2189        assert!(output.contains("mac-mini"));
2190        assert!(output.contains("(local)"));
2191        assert!(output.contains("win-pc"));
2192        assert!(output.contains('3'));
2193    }
2194
2195    #[test]
2196    fn test_format_nodes_no_session_count() {
2197        use pulpo_common::node::NodeInfo;
2198        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2199
2200        let resp = PeersResponse {
2201            local: NodeInfo {
2202                name: "local".into(),
2203                hostname: "h".into(),
2204                os: "linux".into(),
2205                arch: "x86_64".into(),
2206                cpus: 4,
2207                memory_mb: 8192,
2208                gpu: None,
2209            },
2210            peers: vec![PeerInfo {
2211                name: "peer".into(),
2212                address: "peer:7433".into(),
2213                status: PeerStatus::Offline,
2214                node_info: None,
2215                session_count: None,
2216                source: PeerSource::Configured,
2217            }],
2218        };
2219        let output = format_nodes(&resp);
2220        assert!(output.contains("offline"));
2221        // No session count → shows "-"
2222        let lines: Vec<&str> = output.lines().collect();
2223        assert!(lines[2].contains('-'));
2224    }
2225
2226    #[tokio::test]
2227    async fn test_execute_resume_connection_refused() {
2228        let cli = Cli {
2229            node: "localhost:1".into(),
2230            token: None,
2231            command: Some(Commands::Resume {
2232                name: "test".into(),
2233            }),
2234            path: None,
2235        };
2236        let result = execute(&cli).await;
2237        let err = result.unwrap_err().to_string();
2238        assert!(err.contains("Could not connect to pulpod"));
2239    }
2240
2241    #[tokio::test]
2242    async fn test_execute_spawn_connection_refused() {
2243        let cli = Cli {
2244            node: "localhost:1".into(),
2245            token: None,
2246            command: Some(Commands::Spawn {
2247                name: Some("test".into()),
2248                workdir: Some("/tmp".into()),
2249                ink: None,
2250                description: None,
2251                detach: true,
2252                idle_threshold: None,
2253                auto: false,
2254                worktree: false,
2255                sandbox: false,
2256                command: vec!["test".into()],
2257            }),
2258            path: None,
2259        };
2260        let result = execute(&cli).await;
2261        let err = result.unwrap_err().to_string();
2262        assert!(err.contains("Could not connect to pulpod"));
2263    }
2264
2265    #[tokio::test]
2266    async fn test_execute_kill_connection_refused() {
2267        let cli = Cli {
2268            node: "localhost:1".into(),
2269            token: None,
2270            command: Some(Commands::Kill {
2271                name: "test".into(),
2272            }),
2273            path: None,
2274        };
2275        let result = execute(&cli).await;
2276        let err = result.unwrap_err().to_string();
2277        assert!(err.contains("Could not connect to pulpod"));
2278    }
2279
2280    #[tokio::test]
2281    async fn test_execute_delete_connection_refused() {
2282        let cli = Cli {
2283            node: "localhost:1".into(),
2284            token: None,
2285            command: Some(Commands::Delete {
2286                name: "test".into(),
2287            }),
2288            path: None,
2289        };
2290        let result = execute(&cli).await;
2291        let err = result.unwrap_err().to_string();
2292        assert!(err.contains("Could not connect to pulpod"));
2293    }
2294
2295    #[tokio::test]
2296    async fn test_execute_logs_connection_refused() {
2297        let cli = Cli {
2298            node: "localhost:1".into(),
2299            token: None,
2300            command: Some(Commands::Logs {
2301                name: "test".into(),
2302                lines: 50,
2303                follow: false,
2304            }),
2305            path: None,
2306        };
2307        let result = execute(&cli).await;
2308        let err = result.unwrap_err().to_string();
2309        assert!(err.contains("Could not connect to pulpod"));
2310    }
2311
2312    #[tokio::test]
2313    async fn test_friendly_error_connect() {
2314        // Make a request to a closed port to get a connect error
2315        let err = reqwest::Client::new()
2316            .get("http://127.0.0.1:1")
2317            .send()
2318            .await
2319            .unwrap_err();
2320        let friendly = friendly_error(&err, "test-node:1");
2321        let msg = friendly.to_string();
2322        assert!(
2323            msg.contains("Could not connect"),
2324            "Expected connect message, got: {msg}"
2325        );
2326    }
2327
2328    #[tokio::test]
2329    async fn test_friendly_error_other() {
2330        // A request to an invalid URL creates a builder error, not a connect error
2331        let err = reqwest::Client::new()
2332            .get("http://[::invalid::url")
2333            .send()
2334            .await
2335            .unwrap_err();
2336        let friendly = friendly_error(&err, "bad-host");
2337        let msg = friendly.to_string();
2338        assert!(
2339            msg.contains("Network error"),
2340            "Expected network error message, got: {msg}"
2341        );
2342        assert!(msg.contains("bad-host"));
2343    }
2344
2345    // -- Auth helper tests --
2346
2347    #[test]
2348    fn test_is_localhost_variants() {
2349        assert!(is_localhost("localhost:7433"));
2350        assert!(is_localhost("127.0.0.1:7433"));
2351        assert!(is_localhost("[::1]:7433"));
2352        assert!(is_localhost("::1"));
2353        assert!(is_localhost("localhost"));
2354        assert!(!is_localhost("mac-mini:7433"));
2355        assert!(!is_localhost("192.168.1.100:7433"));
2356    }
2357
2358    #[test]
2359    fn test_authed_get_with_token() {
2360        let client = reqwest::Client::new();
2361        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2362            .build()
2363            .unwrap();
2364        let auth = req
2365            .headers()
2366            .get("authorization")
2367            .unwrap()
2368            .to_str()
2369            .unwrap();
2370        assert_eq!(auth, "Bearer tok");
2371    }
2372
2373    #[test]
2374    fn test_authed_get_without_token() {
2375        let client = reqwest::Client::new();
2376        let req = authed_get(&client, "http://h:1/api".into(), None)
2377            .build()
2378            .unwrap();
2379        assert!(req.headers().get("authorization").is_none());
2380    }
2381
2382    #[test]
2383    fn test_authed_post_with_token() {
2384        let client = reqwest::Client::new();
2385        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2386            .build()
2387            .unwrap();
2388        let auth = req
2389            .headers()
2390            .get("authorization")
2391            .unwrap()
2392            .to_str()
2393            .unwrap();
2394        assert_eq!(auth, "Bearer secret");
2395    }
2396
2397    #[test]
2398    fn test_authed_post_without_token() {
2399        let client = reqwest::Client::new();
2400        let req = authed_post(&client, "http://h:1/api".into(), None)
2401            .build()
2402            .unwrap();
2403        assert!(req.headers().get("authorization").is_none());
2404    }
2405
2406    #[test]
2407    fn test_authed_delete_with_token() {
2408        let client = reqwest::Client::new();
2409        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2410            .build()
2411            .unwrap();
2412        let auth = req
2413            .headers()
2414            .get("authorization")
2415            .unwrap()
2416            .to_str()
2417            .unwrap();
2418        assert_eq!(auth, "Bearer del-tok");
2419    }
2420
2421    #[test]
2422    fn test_authed_delete_without_token() {
2423        let client = reqwest::Client::new();
2424        let req = authed_delete(&client, "http://h:1/api".into(), None)
2425            .build()
2426            .unwrap();
2427        assert!(req.headers().get("authorization").is_none());
2428    }
2429
2430    #[tokio::test]
2431    async fn test_resolve_token_explicit() {
2432        let client = reqwest::Client::new();
2433        let token =
2434            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2435        assert_eq!(token, Some("my-tok".into()));
2436    }
2437
2438    #[tokio::test]
2439    async fn test_resolve_token_remote_no_explicit() {
2440        let client = reqwest::Client::new();
2441        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2442        assert_eq!(token, None);
2443    }
2444
2445    #[tokio::test]
2446    async fn test_resolve_token_localhost_auto_discover() {
2447        use axum::{Json, Router, routing::get};
2448
2449        let app = Router::new().route(
2450            "/api/v1/auth/token",
2451            get(|| async {
2452                Json(AuthTokenResponse {
2453                    token: "discovered".into(),
2454                })
2455            }),
2456        );
2457        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2458        let addr = listener.local_addr().unwrap();
2459        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2460
2461        let node = format!("localhost:{}", addr.port());
2462        let base = base_url(&node);
2463        let client = reqwest::Client::new();
2464        let token = resolve_token(&client, &base, &node, None).await;
2465        assert_eq!(token, Some("discovered".into()));
2466    }
2467
2468    #[tokio::test]
2469    async fn test_discover_token_empty_returns_none() {
2470        use axum::{Json, Router, routing::get};
2471
2472        let app = Router::new().route(
2473            "/api/v1/auth/token",
2474            get(|| async {
2475                Json(AuthTokenResponse {
2476                    token: String::new(),
2477                })
2478            }),
2479        );
2480        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2481        let addr = listener.local_addr().unwrap();
2482        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2483
2484        let base = format!("http://127.0.0.1:{}", addr.port());
2485        let client = reqwest::Client::new();
2486        assert_eq!(discover_token(&client, &base).await, None);
2487    }
2488
2489    #[tokio::test]
2490    async fn test_discover_token_unreachable_returns_none() {
2491        let client = reqwest::Client::new();
2492        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2493    }
2494
2495    #[test]
2496    fn test_cli_parse_with_token() {
2497        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2498        assert_eq!(cli.token, Some("my-secret".into()));
2499    }
2500
2501    #[test]
2502    fn test_cli_parse_without_token() {
2503        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2504        assert_eq!(cli.token, None);
2505    }
2506
2507    #[tokio::test]
2508    async fn test_execute_with_explicit_token_sends_header() {
2509        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2510
2511        let app = Router::new().route(
2512            "/api/v1/sessions",
2513            get(|req: Request| async move {
2514                let auth = req
2515                    .headers()
2516                    .get("authorization")
2517                    .and_then(|v| v.to_str().ok())
2518                    .unwrap_or("");
2519                assert_eq!(auth, "Bearer test-token");
2520                (StatusCode::OK, "[]".to_owned())
2521            }),
2522        );
2523        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2524        let addr = listener.local_addr().unwrap();
2525        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2526        let node = format!("127.0.0.1:{}", addr.port());
2527
2528        let cli = Cli {
2529            node,
2530            token: Some("test-token".into()),
2531            command: Some(Commands::List),
2532            path: None,
2533        };
2534        let result = execute(&cli).await.unwrap();
2535        assert_eq!(result, "No sessions.");
2536    }
2537
2538    // -- Interventions tests --
2539
2540    #[test]
2541    fn test_cli_parse_interventions() {
2542        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2543        assert!(matches!(
2544            &cli.command,
2545            Some(Commands::Interventions { name }) if name == "my-session"
2546        ));
2547    }
2548
2549    #[test]
2550    fn test_format_interventions_empty() {
2551        assert_eq!(format_interventions(&[]), "No intervention events.");
2552    }
2553
2554    #[test]
2555    fn test_format_interventions_with_data() {
2556        let events = vec![
2557            InterventionEventResponse {
2558                id: 1,
2559                session_id: "sess-1".into(),
2560                code: None,
2561                reason: "Memory exceeded threshold".into(),
2562                created_at: "2026-01-01T00:00:00Z".into(),
2563            },
2564            InterventionEventResponse {
2565                id: 2,
2566                session_id: "sess-1".into(),
2567                code: None,
2568                reason: "Idle for 10 minutes".into(),
2569                created_at: "2026-01-02T00:00:00Z".into(),
2570            },
2571        ];
2572        let output = format_interventions(&events);
2573        assert!(output.contains("ID"));
2574        assert!(output.contains("TIMESTAMP"));
2575        assert!(output.contains("REASON"));
2576        assert!(output.contains("Memory exceeded threshold"));
2577        assert!(output.contains("Idle for 10 minutes"));
2578        assert!(output.contains("2026-01-01T00:00:00Z"));
2579    }
2580
2581    #[tokio::test]
2582    async fn test_execute_interventions_empty() {
2583        let node = start_test_server().await;
2584        let cli = Cli {
2585            node,
2586            token: None,
2587            command: Some(Commands::Interventions {
2588                name: "my-session".into(),
2589            }),
2590            path: None,
2591        };
2592        let result = execute(&cli).await.unwrap();
2593        assert_eq!(result, "No intervention events.");
2594    }
2595
2596    #[tokio::test]
2597    async fn test_execute_interventions_with_data() {
2598        use axum::{Router, routing::get};
2599
2600        let app = Router::new().route(
2601            "/api/v1/sessions/{id}/interventions",
2602            get(|| async {
2603                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2604                    .to_owned()
2605            }),
2606        );
2607        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2608        let addr = listener.local_addr().unwrap();
2609        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2610        let node = format!("127.0.0.1:{}", addr.port());
2611
2612        let cli = Cli {
2613            node,
2614            token: None,
2615            command: Some(Commands::Interventions {
2616                name: "test".into(),
2617            }),
2618            path: None,
2619        };
2620        let result = execute(&cli).await.unwrap();
2621        assert!(result.contains("OOM"));
2622        assert!(result.contains("2026-01-01T00:00:00Z"));
2623    }
2624
2625    #[tokio::test]
2626    async fn test_execute_interventions_connection_refused() {
2627        let cli = Cli {
2628            node: "localhost:1".into(),
2629            token: None,
2630            command: Some(Commands::Interventions {
2631                name: "test".into(),
2632            }),
2633            path: None,
2634        };
2635        let result = execute(&cli).await;
2636        let err = result.unwrap_err().to_string();
2637        assert!(err.contains("Could not connect to pulpod"));
2638    }
2639
2640    // -- Attach command tests --
2641
2642    #[test]
2643    fn test_build_attach_command() {
2644        let cmd = build_attach_command("my-session");
2645        assert_eq!(cmd.get_program(), "tmux");
2646        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2647        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2648    }
2649
2650    #[test]
2651    fn test_cli_parse_attach() {
2652        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2653        assert!(matches!(
2654            &cli.command,
2655            Some(Commands::Attach { name }) if name == "my-session"
2656        ));
2657    }
2658
2659    #[test]
2660    fn test_cli_parse_attach_alias() {
2661        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2662        assert!(matches!(
2663            &cli.command,
2664            Some(Commands::Attach { name }) if name == "my-session"
2665        ));
2666    }
2667
2668    #[tokio::test]
2669    async fn test_execute_attach_success() {
2670        let node = start_test_server().await;
2671        let cli = Cli {
2672            node,
2673            token: None,
2674            command: Some(Commands::Attach {
2675                name: "test-session".into(),
2676            }),
2677            path: None,
2678        };
2679        let result = execute(&cli).await.unwrap();
2680        assert!(result.contains("Detached from session test-session"));
2681    }
2682
2683    #[tokio::test]
2684    async fn test_execute_attach_with_backend_session_id() {
2685        use axum::{Router, routing::get};
2686        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2687        let app = Router::new().route(
2688            "/api/v1/sessions/{id}",
2689            get(move || async move { session_json.to_owned() }),
2690        );
2691        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2692        let addr = listener.local_addr().unwrap();
2693        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2694
2695        let cli = Cli {
2696            node: format!("127.0.0.1:{}", addr.port()),
2697            token: None,
2698            command: Some(Commands::Attach {
2699                name: "my-session".into(),
2700            }),
2701            path: None,
2702        };
2703        let result = execute(&cli).await.unwrap();
2704        assert!(result.contains("Detached from session my-session"));
2705    }
2706
2707    #[tokio::test]
2708    async fn test_execute_attach_connection_refused() {
2709        let cli = Cli {
2710            node: "localhost:1".into(),
2711            token: None,
2712            command: Some(Commands::Attach {
2713                name: "test-session".into(),
2714            }),
2715            path: None,
2716        };
2717        let result = execute(&cli).await;
2718        let err = result.unwrap_err().to_string();
2719        assert!(err.contains("Could not connect to pulpod"));
2720    }
2721
2722    #[tokio::test]
2723    async fn test_execute_attach_error_response() {
2724        use axum::{Router, http::StatusCode, routing::get};
2725        let app = Router::new().route(
2726            "/api/v1/sessions/{id}",
2727            get(|| async {
2728                (
2729                    StatusCode::NOT_FOUND,
2730                    r#"{"error":"session not found"}"#.to_owned(),
2731                )
2732            }),
2733        );
2734        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2735        let addr = listener.local_addr().unwrap();
2736        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2737
2738        let cli = Cli {
2739            node: format!("127.0.0.1:{}", addr.port()),
2740            token: None,
2741            command: Some(Commands::Attach {
2742                name: "nonexistent".into(),
2743            }),
2744            path: None,
2745        };
2746        let result = execute(&cli).await;
2747        let err = result.unwrap_err().to_string();
2748        assert!(err.contains("session not found"));
2749    }
2750
2751    #[tokio::test]
2752    async fn test_execute_attach_stale_session() {
2753        use axum::{Router, routing::get};
2754        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2755        let app = Router::new().route(
2756            "/api/v1/sessions/{id}",
2757            get(move || async move { session_json.to_owned() }),
2758        );
2759        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2760        let addr = listener.local_addr().unwrap();
2761        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2762
2763        let cli = Cli {
2764            node: format!("127.0.0.1:{}", addr.port()),
2765            token: None,
2766            command: Some(Commands::Attach {
2767                name: "stale-sess".into(),
2768            }),
2769            path: None,
2770        };
2771        let result = execute(&cli).await;
2772        let err = result.unwrap_err().to_string();
2773        assert!(err.contains("lost"));
2774        assert!(err.contains("pulpo resume"));
2775    }
2776
2777    #[tokio::test]
2778    async fn test_execute_attach_dead_session() {
2779        use axum::{Router, routing::get};
2780        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2781        let app = Router::new().route(
2782            "/api/v1/sessions/{id}",
2783            get(move || async move { session_json.to_owned() }),
2784        );
2785        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2786        let addr = listener.local_addr().unwrap();
2787        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2788
2789        let cli = Cli {
2790            node: format!("127.0.0.1:{}", addr.port()),
2791            token: None,
2792            command: Some(Commands::Attach {
2793                name: "dead-sess".into(),
2794            }),
2795            path: None,
2796        };
2797        let result = execute(&cli).await;
2798        let err = result.unwrap_err().to_string();
2799        assert!(err.contains("killed"));
2800        assert!(err.contains("cannot attach"));
2801    }
2802
2803    // -- Alias parse tests --
2804
2805    #[test]
2806    fn test_cli_parse_alias_spawn() {
2807        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
2808        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
2809    }
2810
2811    #[test]
2812    fn test_cli_parse_alias_list() {
2813        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2814        assert!(matches!(&cli.command, Some(Commands::List)));
2815    }
2816
2817    #[test]
2818    fn test_cli_parse_alias_logs() {
2819        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2820        assert!(matches!(
2821            &cli.command,
2822            Some(Commands::Logs { name, .. }) if name == "my-session"
2823        ));
2824    }
2825
2826    #[test]
2827    fn test_cli_parse_alias_kill() {
2828        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2829        assert!(matches!(
2830            &cli.command,
2831            Some(Commands::Kill { name }) if name == "my-session"
2832        ));
2833    }
2834
2835    #[test]
2836    fn test_cli_parse_alias_delete() {
2837        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2838        assert!(matches!(
2839            &cli.command,
2840            Some(Commands::Delete { name }) if name == "my-session"
2841        ));
2842    }
2843
2844    #[test]
2845    fn test_cli_parse_alias_resume() {
2846        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2847        assert!(matches!(
2848            &cli.command,
2849            Some(Commands::Resume { name }) if name == "my-session"
2850        ));
2851    }
2852
2853    #[test]
2854    fn test_cli_parse_alias_nodes() {
2855        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2856        assert!(matches!(&cli.command, Some(Commands::Nodes)));
2857    }
2858
2859    #[test]
2860    fn test_cli_parse_alias_interventions() {
2861        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2862        assert!(matches!(
2863            &cli.command,
2864            Some(Commands::Interventions { name }) if name == "my-session"
2865        ));
2866    }
2867
2868    #[test]
2869    fn test_api_error_json() {
2870        let err = api_error("{\"error\":\"session not found: foo\"}");
2871        assert_eq!(err.to_string(), "session not found: foo");
2872    }
2873
2874    #[test]
2875    fn test_api_error_plain_text() {
2876        let err = api_error("plain text error");
2877        assert_eq!(err.to_string(), "plain text error");
2878    }
2879
2880    // -- diff_output tests --
2881
2882    #[test]
2883    fn test_diff_output_empty_prev() {
2884        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2885    }
2886
2887    #[test]
2888    fn test_diff_output_identical() {
2889        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2890    }
2891
2892    #[test]
2893    fn test_diff_output_new_lines_appended() {
2894        let prev = "line1\nline2";
2895        let new = "line1\nline2\nline3\nline4";
2896        assert_eq!(diff_output(prev, new), "line3\nline4");
2897    }
2898
2899    #[test]
2900    fn test_diff_output_scrolled_window() {
2901        // Window of 3 lines: old lines scroll off top, new appear at bottom
2902        let prev = "line1\nline2\nline3";
2903        let new = "line2\nline3\nline4";
2904        assert_eq!(diff_output(prev, new), "line4");
2905    }
2906
2907    #[test]
2908    fn test_diff_output_completely_different() {
2909        let prev = "aaa\nbbb";
2910        let new = "xxx\nyyy";
2911        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2912    }
2913
2914    #[test]
2915    fn test_diff_output_last_line_matches_but_overlap_fails() {
2916        // Last line of prev appears in new but preceding lines don't match
2917        let prev = "aaa\ncommon";
2918        let new = "zzz\ncommon\nnew_line";
2919        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2920        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2921        // Falls through, no verified overlap, so returns everything
2922        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2923    }
2924
2925    #[test]
2926    fn test_diff_output_new_empty() {
2927        assert_eq!(diff_output("line1", ""), "");
2928    }
2929
2930    // -- follow_logs tests --
2931
2932    /// Start a test server that simulates evolving output and session status transitions.
2933    /// Start a test server that simulates evolving output with agent exit marker.
2934    async fn start_follow_test_server() -> String {
2935        use axum::{Router, extract::Path, extract::Query, routing::get};
2936        use std::sync::Arc;
2937        use std::sync::atomic::{AtomicUsize, Ordering};
2938
2939        let call_count = Arc::new(AtomicUsize::new(0));
2940        let output_count = call_count.clone();
2941
2942        let app = Router::new()
2943            .route(
2944                "/api/v1/sessions/{id}/output",
2945                get(
2946                    move |_path: Path<String>,
2947                          _query: Query<std::collections::HashMap<String, String>>| {
2948                        let count = output_count.clone();
2949                        async move {
2950                            let n = count.fetch_add(1, Ordering::SeqCst);
2951                            let output = match n {
2952                                0 => "line1\nline2".to_owned(),
2953                                1 => "line1\nline2\nline3".to_owned(),
2954                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
2955                            };
2956                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2957                        }
2958                    },
2959                ),
2960            )
2961            .route(
2962                "/api/v1/sessions/{id}",
2963                get(|_path: Path<String>| async {
2964                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2965                }),
2966            );
2967
2968        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2969        let addr = listener.local_addr().unwrap();
2970        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2971        format!("http://127.0.0.1:{}", addr.port())
2972    }
2973
2974    #[tokio::test]
2975    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
2976        let base = start_follow_test_server().await;
2977        let client = reqwest::Client::new();
2978        let mut buf = Vec::new();
2979
2980        follow_logs(&client, &base, "test", 100, None, &mut buf)
2981            .await
2982            .unwrap();
2983
2984        let output = String::from_utf8(buf).unwrap();
2985        // Should contain initial output + new lines + agent exit marker
2986        assert!(output.contains("line1"));
2987        assert!(output.contains("line2"));
2988        assert!(output.contains("line3"));
2989        assert!(output.contains("line4"));
2990        assert!(output.contains("[pulpo] Agent exited"));
2991    }
2992
2993    #[tokio::test]
2994    async fn test_execute_logs_follow_success() {
2995        let base = start_follow_test_server().await;
2996        // Extract host:port from http://127.0.0.1:PORT
2997        let node = base.strip_prefix("http://").unwrap().to_owned();
2998
2999        let cli = Cli {
3000            node,
3001            token: None,
3002            command: Some(Commands::Logs {
3003                name: "test".into(),
3004                lines: 100,
3005                follow: true,
3006            }),
3007            path: None,
3008        };
3009        // execute() with follow writes to stdout and returns empty string
3010        let result = execute(&cli).await.unwrap();
3011        assert_eq!(result, "");
3012    }
3013
3014    #[tokio::test]
3015    async fn test_execute_logs_follow_connection_refused() {
3016        let cli = Cli {
3017            node: "localhost:1".into(),
3018            token: None,
3019            command: Some(Commands::Logs {
3020                name: "test".into(),
3021                lines: 50,
3022                follow: true,
3023            }),
3024            path: None,
3025        };
3026        let result = execute(&cli).await;
3027        let err = result.unwrap_err().to_string();
3028        assert!(
3029            err.contains("Could not connect to pulpod"),
3030            "Expected friendly error, got: {err}"
3031        );
3032    }
3033
3034    #[tokio::test]
3035    async fn test_follow_logs_exits_on_dead() {
3036        use axum::{Router, extract::Path, extract::Query, routing::get};
3037
3038        let app = Router::new()
3039            .route(
3040                "/api/v1/sessions/{id}/output",
3041                get(
3042                    |_path: Path<String>,
3043                     _query: Query<std::collections::HashMap<String, String>>| async {
3044                        r#"{"output":"some output"}"#.to_owned()
3045                    },
3046                ),
3047            )
3048            .route(
3049                "/api/v1/sessions/{id}",
3050                get(|_path: Path<String>| async {
3051                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3052                }),
3053            );
3054
3055        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3056        let addr = listener.local_addr().unwrap();
3057        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3058        let base = format!("http://127.0.0.1:{}", addr.port());
3059
3060        let client = reqwest::Client::new();
3061        let mut buf = Vec::new();
3062        follow_logs(&client, &base, "test", 100, None, &mut buf)
3063            .await
3064            .unwrap();
3065
3066        let output = String::from_utf8(buf).unwrap();
3067        assert!(output.contains("some output"));
3068    }
3069
3070    #[tokio::test]
3071    async fn test_follow_logs_exits_on_stale() {
3072        use axum::{Router, extract::Path, extract::Query, routing::get};
3073
3074        let app = Router::new()
3075            .route(
3076                "/api/v1/sessions/{id}/output",
3077                get(
3078                    |_path: Path<String>,
3079                     _query: Query<std::collections::HashMap<String, String>>| async {
3080                        r#"{"output":"stale output"}"#.to_owned()
3081                    },
3082                ),
3083            )
3084            .route(
3085                "/api/v1/sessions/{id}",
3086                get(|_path: Path<String>| async {
3087                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3088                }),
3089            );
3090
3091        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3092        let addr = listener.local_addr().unwrap();
3093        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3094        let base = format!("http://127.0.0.1:{}", addr.port());
3095
3096        let client = reqwest::Client::new();
3097        let mut buf = Vec::new();
3098        follow_logs(&client, &base, "test", 100, None, &mut buf)
3099            .await
3100            .unwrap();
3101
3102        let output = String::from_utf8(buf).unwrap();
3103        assert!(output.contains("stale output"));
3104    }
3105
3106    #[tokio::test]
3107    async fn test_execute_logs_follow_non_reqwest_error() {
3108        use axum::{Router, extract::Path, extract::Query, routing::get};
3109
3110        // Session status endpoint returns invalid JSON to trigger a serde error
3111        let app = Router::new()
3112            .route(
3113                "/api/v1/sessions/{id}/output",
3114                get(
3115                    |_path: Path<String>,
3116                     _query: Query<std::collections::HashMap<String, String>>| async {
3117                        r#"{"output":"initial"}"#.to_owned()
3118                    },
3119                ),
3120            )
3121            .route(
3122                "/api/v1/sessions/{id}",
3123                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3124            );
3125
3126        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3127        let addr = listener.local_addr().unwrap();
3128        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3129        let node = format!("127.0.0.1:{}", addr.port());
3130
3131        let cli = Cli {
3132            node,
3133            token: None,
3134            command: Some(Commands::Logs {
3135                name: "test".into(),
3136                lines: 100,
3137                follow: true,
3138            }),
3139            path: None,
3140        };
3141        let err = execute(&cli).await.unwrap_err();
3142        // serde_json error, not a reqwest error — hits the Err(other) branch
3143        let msg = err.to_string();
3144        assert!(
3145            msg.contains("expected ident"),
3146            "Expected serde parse error, got: {msg}"
3147        );
3148    }
3149
3150    #[tokio::test]
3151    async fn test_fetch_session_status_connection_error() {
3152        let client = reqwest::Client::new();
3153        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3154        assert!(result.is_err());
3155    }
3156
3157    // -- Schedule tests --
3158
3159    #[test]
3160    fn test_format_schedules_empty() {
3161        assert_eq!(format_schedules(&[]), "No schedules.");
3162    }
3163
3164    #[test]
3165    fn test_format_schedules_with_entries() {
3166        let schedules = vec![serde_json::json!({
3167            "name": "nightly",
3168            "cron": "0 3 * * *",
3169            "enabled": true,
3170            "last_run_at": null,
3171            "target_node": null
3172        })];
3173        let output = format_schedules(&schedules);
3174        assert!(output.contains("nightly"));
3175        assert!(output.contains("0 3 * * *"));
3176        assert!(output.contains("local"));
3177        assert!(output.contains("yes"));
3178        assert!(output.contains('-'));
3179    }
3180
3181    #[test]
3182    fn test_format_schedules_disabled_entry() {
3183        let schedules = vec![serde_json::json!({
3184            "name": "weekly",
3185            "cron": "0 0 * * 0",
3186            "enabled": false,
3187            "last_run_at": "2026-03-18T03:00:00Z",
3188            "target_node": "gpu-box"
3189        })];
3190        let output = format_schedules(&schedules);
3191        assert!(output.contains("weekly"));
3192        assert!(output.contains("no"));
3193        assert!(output.contains("gpu-box"));
3194        assert!(output.contains("2026-03-18T03:00"));
3195    }
3196
3197    #[test]
3198    fn test_format_schedules_header() {
3199        let schedules = vec![serde_json::json!({
3200            "name": "test",
3201            "cron": "* * * * *",
3202            "enabled": true,
3203            "last_run_at": null,
3204            "target_node": null
3205        })];
3206        let output = format_schedules(&schedules);
3207        assert!(output.contains("NAME"));
3208        assert!(output.contains("CRON"));
3209        assert!(output.contains("ENABLED"));
3210        assert!(output.contains("LAST RUN"));
3211        assert!(output.contains("NODE"));
3212    }
3213
3214    // -- Schedule CLI parse tests --
3215
3216    #[test]
3217    fn test_cli_parse_schedule_add() {
3218        let cli = Cli::try_parse_from([
3219            "pulpo",
3220            "schedule",
3221            "add",
3222            "nightly",
3223            "0 3 * * *",
3224            "--workdir",
3225            "/repo",
3226            "--",
3227            "claude",
3228            "-p",
3229            "review",
3230        ])
3231        .unwrap();
3232        assert!(matches!(
3233            &cli.command,
3234            Some(Commands::Schedule {
3235                action: ScheduleAction::Add { name, cron, .. }
3236            }) if name == "nightly" && cron == "0 3 * * *"
3237        ));
3238    }
3239
3240    #[test]
3241    fn test_cli_parse_schedule_add_with_node() {
3242        let cli = Cli::try_parse_from([
3243            "pulpo",
3244            "schedule",
3245            "add",
3246            "nightly",
3247            "0 3 * * *",
3248            "--workdir",
3249            "/repo",
3250            "--node",
3251            "gpu-box",
3252            "--",
3253            "claude",
3254        ])
3255        .unwrap();
3256        assert!(matches!(
3257            &cli.command,
3258            Some(Commands::Schedule {
3259                action: ScheduleAction::Add { node, .. }
3260            }) if node.as_deref() == Some("gpu-box")
3261        ));
3262    }
3263
3264    #[test]
3265    fn test_cli_parse_schedule_add_install_alias() {
3266        let cli =
3267            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3268        assert!(matches!(
3269            &cli.command,
3270            Some(Commands::Schedule {
3271                action: ScheduleAction::Add { name, .. }
3272            }) if name == "nightly"
3273        ));
3274    }
3275
3276    #[test]
3277    fn test_cli_parse_schedule_list() {
3278        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3279        assert!(matches!(
3280            &cli.command,
3281            Some(Commands::Schedule {
3282                action: ScheduleAction::List
3283            })
3284        ));
3285    }
3286
3287    #[test]
3288    fn test_cli_parse_schedule_remove() {
3289        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3290        assert!(matches!(
3291            &cli.command,
3292            Some(Commands::Schedule {
3293                action: ScheduleAction::Remove { name }
3294            }) if name == "nightly"
3295        ));
3296    }
3297
3298    #[test]
3299    fn test_cli_parse_schedule_pause() {
3300        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3301        assert!(matches!(
3302            &cli.command,
3303            Some(Commands::Schedule {
3304                action: ScheduleAction::Pause { name }
3305            }) if name == "nightly"
3306        ));
3307    }
3308
3309    #[test]
3310    fn test_cli_parse_schedule_resume() {
3311        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3312        assert!(matches!(
3313            &cli.command,
3314            Some(Commands::Schedule {
3315                action: ScheduleAction::Resume { name }
3316            }) if name == "nightly"
3317        ));
3318    }
3319
3320    #[test]
3321    fn test_cli_parse_schedule_alias() {
3322        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3323        assert!(matches!(
3324            &cli.command,
3325            Some(Commands::Schedule {
3326                action: ScheduleAction::List
3327            })
3328        ));
3329    }
3330
3331    #[test]
3332    fn test_cli_parse_schedule_list_alias() {
3333        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3334        assert!(matches!(
3335            &cli.command,
3336            Some(Commands::Schedule {
3337                action: ScheduleAction::List
3338            })
3339        ));
3340    }
3341
3342    #[test]
3343    fn test_cli_parse_schedule_remove_alias() {
3344        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3345        assert!(matches!(
3346            &cli.command,
3347            Some(Commands::Schedule {
3348                action: ScheduleAction::Remove { name }
3349            }) if name == "nightly"
3350        ));
3351    }
3352
3353    #[cfg(not(coverage))]
3354    #[tokio::test]
3355    async fn test_execute_schedule_list_via_execute() {
3356        let node = start_test_server().await;
3357        let cli = Cli {
3358            node,
3359            token: None,
3360            command: Some(Commands::Schedule {
3361                action: ScheduleAction::List,
3362            }),
3363            path: None,
3364        };
3365        let result = execute(&cli).await.unwrap();
3366        assert_eq!(result, "No schedules.");
3367    }
3368
3369    #[test]
3370    fn test_schedule_action_debug() {
3371        let action = ScheduleAction::List;
3372        assert_eq!(format!("{action:?}"), "List");
3373    }
3374
3375    #[test]
3376    fn test_cli_parse_send_alias() {
3377        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3378        assert!(matches!(
3379            &cli.command,
3380            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3381        ));
3382    }
3383
3384    #[test]
3385    fn test_cli_parse_spawn_no_name() {
3386        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3387        assert!(matches!(
3388            &cli.command,
3389            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3390        ));
3391    }
3392
3393    #[test]
3394    fn test_cli_parse_spawn_optional_name_with_command() {
3395        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3396        assert!(matches!(
3397            &cli.command,
3398            Some(Commands::Spawn { name, command, .. })
3399                if name.is_none() && command == &["echo", "hello"]
3400        ));
3401    }
3402
3403    #[test]
3404    fn test_cli_parse_path_shortcut() {
3405        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3406        assert!(cli.command.is_none());
3407        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3408    }
3409
3410    #[test]
3411    fn test_cli_parse_no_args() {
3412        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3413        assert!(cli.command.is_none());
3414        assert!(cli.path.is_none());
3415    }
3416
3417    #[test]
3418    fn test_derive_session_name_simple() {
3419        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3420    }
3421
3422    #[test]
3423    fn test_derive_session_name_with_special_chars() {
3424        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
3425    }
3426
3427    #[test]
3428    fn test_derive_session_name_root() {
3429        assert_eq!(derive_session_name("/"), "session");
3430    }
3431
3432    #[test]
3433    fn test_derive_session_name_dots() {
3434        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
3435    }
3436
3437    #[test]
3438    fn test_resolve_path_absolute() {
3439        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
3440    }
3441
3442    #[test]
3443    fn test_resolve_path_relative() {
3444        let resolved = resolve_path("my-repo");
3445        assert!(resolved.ends_with("my-repo"));
3446        assert!(resolved.starts_with('/'));
3447    }
3448
3449    #[tokio::test]
3450    async fn test_execute_path_shortcut() {
3451        let node = start_test_server().await;
3452        let cli = Cli {
3453            node,
3454            token: None,
3455            path: Some("/tmp".into()),
3456            command: None,
3457        };
3458        let result = execute(&cli).await.unwrap();
3459        assert!(result.contains("Detached from session"));
3460    }
3461
3462    // -- Node resolution tests --
3463
3464    #[test]
3465    fn test_node_needs_resolution() {
3466        assert!(!node_needs_resolution("localhost:7433"));
3467        assert!(!node_needs_resolution("mac-mini:7433"));
3468        assert!(!node_needs_resolution("10.0.0.1:7433"));
3469        assert!(!node_needs_resolution("[::1]:7433"));
3470        assert!(node_needs_resolution("mac-mini"));
3471        assert!(node_needs_resolution("linux-server"));
3472        assert!(node_needs_resolution("localhost"));
3473    }
3474
3475    #[tokio::test]
3476    async fn test_resolve_node_with_port() {
3477        let client = reqwest::Client::new();
3478        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
3479        assert_eq!(addr, "mac-mini:7433");
3480        assert!(token.is_none());
3481    }
3482
3483    #[tokio::test]
3484    async fn test_resolve_node_fallback_appends_port() {
3485        // No local daemon running on localhost:7433, so peer lookup fails
3486        // and it falls back to appending :7433
3487        let client = reqwest::Client::new();
3488        let (addr, token) = resolve_node(&client, "unknown-host").await;
3489        assert_eq!(addr, "unknown-host:7433");
3490        assert!(token.is_none());
3491    }
3492
3493    #[cfg(not(coverage))]
3494    #[tokio::test]
3495    async fn test_resolve_node_finds_peer() {
3496        use axum::{Router, routing::get};
3497
3498        let app = Router::new()
3499            .route(
3500                "/api/v1/peers",
3501                get(|| async {
3502                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
3503                }),
3504            )
3505            .route(
3506                "/api/v1/config",
3507                get(|| async {
3508                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3509                }),
3510            );
3511
3512        // Port 7433 may be in use; skip test if so
3513        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3514            return;
3515        };
3516        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3517
3518        let client = reqwest::Client::new();
3519        let (addr, token) = resolve_node(&client, "mac-mini").await;
3520        assert_eq!(addr, "10.0.0.5:7433");
3521        assert_eq!(token, Some("peer-secret".into()));
3522    }
3523
3524    #[cfg(not(coverage))]
3525    #[tokio::test]
3526    async fn test_resolve_node_peer_no_token() {
3527        use axum::{Router, routing::get};
3528
3529        let app = Router::new()
3530            .route(
3531                "/api/v1/peers",
3532                get(|| async {
3533                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3534                }),
3535            )
3536            .route(
3537                "/api/v1/config",
3538                get(|| async {
3539                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3540                }),
3541            );
3542
3543        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3544            return; // Port in use, skip
3545        };
3546        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3547
3548        let client = reqwest::Client::new();
3549        let (addr, token) = resolve_node(&client, "test-peer").await;
3550        assert_eq!(addr, "10.0.0.9:7433");
3551        assert!(token.is_none()); // Simple peer entry has no token
3552    }
3553
3554    #[tokio::test]
3555    async fn test_execute_with_peer_name_resolution() {
3556        // When node doesn't contain ':', resolve_node is called.
3557        // Since there's no local daemon on port 7433, it falls back to appending :7433.
3558        // The connection to the fallback address will fail, giving us a connection error.
3559        let cli = Cli {
3560            node: "nonexistent-peer".into(),
3561            token: None,
3562            command: Some(Commands::List),
3563            path: None,
3564        };
3565        let result = execute(&cli).await;
3566        // Should try to connect to nonexistent-peer:7433 and fail
3567        assert!(result.is_err());
3568    }
3569
3570    // -- Auto node selection tests --
3571
3572    #[test]
3573    fn test_cli_parse_spawn_auto() {
3574        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
3575        assert!(matches!(
3576            &cli.command,
3577            Some(Commands::Spawn { auto, .. }) if *auto
3578        ));
3579    }
3580
3581    #[test]
3582    fn test_cli_parse_spawn_auto_default() {
3583        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
3584        assert!(matches!(
3585            &cli.command,
3586            Some(Commands::Spawn { auto, .. }) if !auto
3587        ));
3588    }
3589
3590    #[tokio::test]
3591    async fn test_select_best_node_coverage_stub() {
3592        // Exercise the coverage stub (or real function in non-coverage builds)
3593        let client = reqwest::Client::new();
3594        // In coverage builds, the stub returns ("localhost:7433", "local")
3595        // In non-coverage builds, this fails because no server is running — that's OK
3596        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
3597    }
3598
3599    #[cfg(not(coverage))]
3600    #[tokio::test]
3601    async fn test_select_best_node_picks_least_loaded() {
3602        use axum::{Router, routing::get};
3603
3604        let app = Router::new().route(
3605            "/api/v1/peers",
3606            get(|| async {
3607                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
3608            }),
3609        );
3610        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3611        let addr = listener.local_addr().unwrap();
3612        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3613        let base = format!("http://127.0.0.1:{}", addr.port());
3614
3615        let client = reqwest::Client::new();
3616        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3617        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
3618        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
3619        // idle wins (lower score)
3620        assert_eq!(name, "idle");
3621        assert_eq!(addr, "idle:7433");
3622    }
3623
3624    #[cfg(not(coverage))]
3625    #[tokio::test]
3626    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
3627        use axum::{Router, routing::get};
3628
3629        let app = Router::new().route(
3630            "/api/v1/peers",
3631            get(|| async {
3632                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3633            }),
3634        );
3635        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3636        let addr = listener.local_addr().unwrap();
3637        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3638        let base = format!("http://127.0.0.1:{}", addr.port());
3639
3640        let client = reqwest::Client::new();
3641        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3642        assert_eq!(name, "my-mac");
3643        assert_eq!(addr, "localhost:7433");
3644    }
3645
3646    #[cfg(not(coverage))]
3647    #[tokio::test]
3648    async fn test_select_best_node_empty_peers_falls_back_to_local() {
3649        use axum::{Router, routing::get};
3650
3651        let app = Router::new().route(
3652            "/api/v1/peers",
3653            get(|| async {
3654                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
3655            }),
3656        );
3657        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3658        let addr = listener.local_addr().unwrap();
3659        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3660        let base = format!("http://127.0.0.1:{}", addr.port());
3661
3662        let client = reqwest::Client::new();
3663        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3664        assert_eq!(name, "solo");
3665        assert_eq!(addr, "localhost:7433");
3666    }
3667
3668    #[cfg(not(coverage))]
3669    #[tokio::test]
3670    async fn test_execute_spawn_auto_selects_node() {
3671        use axum::{
3672            Router,
3673            http::StatusCode,
3674            routing::{get, post},
3675        };
3676
3677        let create_json = test_create_response_json();
3678
3679        // Bind early so we know the address to embed in the peers response
3680        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3681        let addr = listener.local_addr().unwrap();
3682        let node = format!("127.0.0.1:{}", addr.port());
3683        let peer_addr = node.clone();
3684
3685        let app = Router::new()
3686            .route(
3687                "/api/v1/peers",
3688                get(move || {
3689                    let peer_addr = peer_addr.clone();
3690                    async move {
3691                        format!(
3692                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
3693                        )
3694                    }
3695                }),
3696            )
3697            .route(
3698                "/api/v1/sessions",
3699                post(move || async move {
3700                    (StatusCode::CREATED, create_json.clone())
3701                }),
3702            );
3703
3704        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3705
3706        let cli = Cli {
3707            node,
3708            token: None,
3709            command: Some(Commands::Spawn {
3710                name: Some("test".into()),
3711                workdir: Some("/tmp/repo".into()),
3712                ink: None,
3713                description: None,
3714                detach: true,
3715                idle_threshold: None,
3716                auto: true,
3717                worktree: false,
3718                sandbox: false,
3719                command: vec!["echo".into(), "hello".into()],
3720            }),
3721            path: None,
3722        };
3723        let result = execute(&cli).await.unwrap();
3724        assert!(result.contains("Created session"));
3725    }
3726
3727    #[cfg(not(coverage))]
3728    #[tokio::test]
3729    async fn test_select_best_node_peer_no_session_count() {
3730        use axum::{Router, routing::get};
3731
3732        let app = Router::new().route(
3733            "/api/v1/peers",
3734            get(|| async {
3735                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3736            }),
3737        );
3738        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3739        let addr = listener.local_addr().unwrap();
3740        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3741        let base = format!("http://127.0.0.1:{}", addr.port());
3742
3743        let client = reqwest::Client::new();
3744        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3745        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
3746        assert_eq!(name, "fresh");
3747        assert_eq!(addr, "fresh:7433");
3748    }
3749}