Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
10#[derive(Parser, Debug)]
11#[command(
12    name = "pulpo",
13    about = "Manage agent sessions across your machines",
14    version = env!("PULPO_VERSION"),
15    args_conflicts_with_subcommands = true
16)]
17pub struct Cli {
18    /// Target node (default: localhost)
19    #[arg(long, default_value = "localhost:7433")]
20    pub node: String,
21
22    /// Auth token (auto-discovered from local daemon if omitted)
23    #[arg(long)]
24    pub token: Option<String>,
25
26    #[command(subcommand)]
27    pub command: Option<Commands>,
28
29    /// Quick spawn: `pulpo <path>` spawns a session in that directory
30    #[arg(value_name = "PATH")]
31    pub path: Option<String>,
32}
33
34#[derive(Subcommand, Debug)]
35#[allow(clippy::large_enum_variant)]
36pub enum Commands {
37    /// Attach to a session's terminal
38    #[command(visible_alias = "a")]
39    Attach {
40        /// Session name or ID
41        name: String,
42    },
43
44    /// Send input to a session
45    #[command(visible_alias = "i", visible_alias = "send")]
46    Input {
47        /// Session name or ID
48        name: String,
49        /// Text to send (sends Enter if omitted)
50        text: Option<String>,
51    },
52
53    /// Spawn a new agent session
54    #[command(visible_alias = "s")]
55    Spawn {
56        /// Session name (auto-generated from workdir if omitted)
57        name: Option<String>,
58
59        /// Working directory (defaults to current directory)
60        #[arg(long)]
61        workdir: Option<String>,
62
63        /// Ink name (from config)
64        #[arg(long)]
65        ink: Option<String>,
66
67        /// Human-readable description of the task
68        #[arg(long)]
69        description: Option<String>,
70
71        /// Don't attach to the session after spawning
72        #[arg(short, long)]
73        detach: bool,
74
75        /// Idle threshold in seconds (0 = never idle)
76        #[arg(long)]
77        idle_threshold: Option<u32>,
78
79        /// Auto-select the least loaded node
80        #[arg(long)]
81        auto: bool,
82
83        /// Create an isolated git worktree for the session
84        #[arg(long)]
85        worktree: bool,
86
87        /// Runtime environment: tmux (default) or docker
88        #[arg(long)]
89        runtime: Option<String>,
90
91        /// Secrets to inject as environment variables (by name)
92        #[arg(long)]
93        secret: Vec<String>,
94
95        /// Command to run (everything after --)
96        #[arg(last = true)]
97        command: Vec<String>,
98    },
99
100    /// List sessions (live only by default)
101    #[command(visible_alias = "ls")]
102    List {
103        /// Show all sessions including killed and lost
104        #[arg(short, long)]
105        all: bool,
106    },
107
108    /// Show session logs/output
109    #[command(visible_alias = "l")]
110    Logs {
111        /// Session name or ID
112        name: String,
113
114        /// Number of lines to fetch
115        #[arg(long, default_value = "100")]
116        lines: usize,
117
118        /// Follow output (like `tail -f`)
119        #[arg(short, long)]
120        follow: bool,
121    },
122
123    /// Kill a session
124    #[command(visible_alias = "k")]
125    Kill {
126        /// Session name or ID
127        name: String,
128    },
129
130    /// Permanently remove a session from history
131    #[command(visible_alias = "rm")]
132    Delete {
133        /// Session name or ID
134        name: String,
135    },
136
137    /// Resume a lost session
138    #[command(visible_alias = "r")]
139    Resume {
140        /// Session name or ID
141        name: String,
142    },
143
144    /// List all known nodes
145    #[command(visible_alias = "n")]
146    Nodes,
147
148    /// Show intervention history for a session
149    #[command(visible_alias = "iv")]
150    Interventions {
151        /// Session name or ID
152        name: String,
153    },
154
155    /// Open the web dashboard in your browser
156    Ui,
157
158    /// Manage scheduled agent runs
159    #[command(visible_alias = "sched")]
160    Schedule {
161        #[command(subcommand)]
162        action: ScheduleAction,
163    },
164
165    /// Manage secrets (environment variables injected into sessions)
166    #[command(visible_alias = "sec")]
167    Secret {
168        #[command(subcommand)]
169        action: SecretAction,
170    },
171}
172
173#[derive(Subcommand, Debug)]
174pub enum SecretAction {
175    /// Set a secret
176    Set {
177        /// Secret name (will be the env var name, uppercase + underscores)
178        name: String,
179        /// Secret value
180        value: String,
181        /// Environment variable name (defaults to secret name)
182        #[arg(long)]
183        env: Option<String>,
184    },
185    /// List secret names
186    #[command(visible_alias = "ls")]
187    List,
188    /// Delete a secret
189    #[command(visible_alias = "rm")]
190    Delete {
191        /// Secret name
192        name: String,
193    },
194}
195
196#[derive(Subcommand, Debug)]
197pub enum ScheduleAction {
198    /// Add a new schedule
199    #[command(alias = "install")]
200    Add {
201        /// Schedule name
202        name: String,
203        /// Cron expression (e.g. "0 3 * * *")
204        cron: String,
205        /// Working directory
206        #[arg(long)]
207        workdir: Option<String>,
208        /// Target node (omit = local, "auto" = least-loaded)
209        #[arg(long)]
210        node: Option<String>,
211        /// Ink preset
212        #[arg(long)]
213        ink: Option<String>,
214        /// Description
215        #[arg(long)]
216        description: Option<String>,
217        /// Command to run (everything after --)
218        #[arg(last = true)]
219        command: Vec<String>,
220    },
221    /// List all schedules
222    #[command(alias = "ls")]
223    List,
224    /// Remove a schedule
225    #[command(alias = "rm")]
226    Remove {
227        /// Schedule name or ID
228        name: String,
229    },
230    /// Pause a schedule
231    Pause {
232        /// Schedule name or ID
233        name: String,
234    },
235    /// Resume a paused schedule
236    Resume {
237        /// Schedule name or ID
238        name: String,
239    },
240}
241
242/// The marker emitted by the agent wrapper when the agent process exits.
243const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
244
245/// Resolve a path to an absolute path string.
246fn resolve_path(path: &str) -> String {
247    let p = std::path::Path::new(path);
248    if p.is_absolute() {
249        path.to_owned()
250    } else {
251        std::env::current_dir().map_or_else(
252            |_| path.to_owned(),
253            |cwd| cwd.join(p).to_string_lossy().into_owned(),
254        )
255    }
256}
257
258/// Derive a session name from a directory path (basename, kebab-cased).
259fn derive_session_name(path: &str) -> String {
260    let basename = std::path::Path::new(path)
261        .file_name()
262        .and_then(|n| n.to_str())
263        .unwrap_or("session");
264    // Convert to kebab-case: lowercase, replace non-alphanumeric with hyphens, collapse
265    let kebab: String = basename
266        .chars()
267        .map(|c| {
268            if c.is_ascii_alphanumeric() {
269                c.to_ascii_lowercase()
270            } else {
271                '-'
272            }
273        })
274        .collect();
275    // Collapse consecutive hyphens and trim leading/trailing hyphens
276    let mut result = String::new();
277    for c in kebab.chars() {
278        if c == '-' && result.ends_with('-') {
279            continue;
280        }
281        result.push(c);
282    }
283    let result = result.trim_matches('-').to_owned();
284    if result.is_empty() {
285        "session".to_owned()
286    } else {
287        result
288    }
289}
290
291/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
292async fn deduplicate_session_name(
293    client: &reqwest::Client,
294    base: &str,
295    name: &str,
296    token: Option<&str>,
297) -> String {
298    // Check if the name is already taken by fetching the session
299    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
300        .send()
301        .await;
302    match resp {
303        Ok(r) if r.status().is_success() => {
304            // Session exists — try suffixed names
305            for i in 2..=99 {
306                let candidate = format!("{name}-{i}");
307                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
308                    .send()
309                    .await;
310                match resp {
311                    Ok(r) if r.status().is_success() => {}
312                    _ => return candidate,
313                }
314            }
315            format!("{name}-100")
316        }
317        _ => name.to_owned(),
318    }
319}
320
321/// Format the base URL from the node address.
322pub fn base_url(node: &str) -> String {
323    format!("http://{node}")
324}
325
326/// Response shape for the output endpoint.
327#[derive(serde::Deserialize)]
328struct OutputResponse {
329    output: String,
330}
331
332/// Format a list of sessions as a table.
333const fn session_runtime(session: &Session) -> &'static str {
334    match session.runtime {
335        Runtime::Tmux => "tmux",
336        Runtime::Docker => "docker",
337    }
338}
339
340fn format_sessions(sessions: &[Session]) -> String {
341    if sessions.is_empty() {
342        return "No sessions.".into();
343    }
344    let mut lines = vec![format!(
345        "{:<10} {:<24} {:<12} {:<8} {}",
346        "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
347    )];
348    for s in sessions {
349        let cmd_display = if s.command.len() > 50 {
350            let truncated: String = s.command.chars().take(47).collect();
351            format!("{truncated}...")
352        } else {
353            s.command.clone()
354        };
355        let short_id = &s.id.to_string()[..8];
356        let has_pr = s
357            .metadata
358            .as_ref()
359            .is_some_and(|m| m.contains_key("pr_url"));
360        let name_display = match (s.worktree_path.is_some(), has_pr) {
361            (true, true) => format!("{} [wt] [PR]", s.name),
362            (true, false) => format!("{} [wt]", s.name),
363            (false, true) => format!("{} [PR]", s.name),
364            (false, false) => s.name.clone(),
365        };
366        lines.push(format!(
367            "{:<10} {:<24} {:<12} {:<8} {}",
368            short_id,
369            name_display,
370            s.status,
371            session_runtime(s),
372            cmd_display
373        ));
374    }
375    lines.join("\n")
376}
377
378/// Format the peers response as a table.
379fn format_nodes(resp: &PeersResponse) -> String {
380    let mut lines = vec![format!(
381        "{:<20} {:<25} {:<10} {}",
382        "NAME", "ADDRESS", "STATUS", "SESSIONS"
383    )];
384    lines.push(format!(
385        "{:<20} {:<25} {:<10} {}",
386        resp.local.name, "(local)", "online", "-"
387    ));
388    for p in &resp.peers {
389        let sessions = p
390            .session_count
391            .map_or_else(|| "-".into(), |c| c.to_string());
392        lines.push(format!(
393            "{:<20} {:<25} {:<10} {}",
394            p.name, p.address, p.status, sessions
395        ));
396    }
397    lines.join("\n")
398}
399
400/// Format intervention events as a table.
401fn format_interventions(events: &[InterventionEventResponse]) -> String {
402    if events.is_empty() {
403        return "No intervention events.".into();
404    }
405    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
406    for e in events {
407        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
408    }
409    lines.join("\n")
410}
411
412/// Build the command to open a URL in the default browser.
413#[cfg_attr(coverage, allow(dead_code))]
414fn build_open_command(url: &str) -> std::process::Command {
415    #[cfg(target_os = "macos")]
416    {
417        let mut cmd = std::process::Command::new("open");
418        cmd.arg(url);
419        cmd
420    }
421    #[cfg(target_os = "linux")]
422    {
423        let mut cmd = std::process::Command::new("xdg-open");
424        cmd.arg(url);
425        cmd
426    }
427    #[cfg(target_os = "windows")]
428    {
429        let mut cmd = std::process::Command::new("cmd");
430        cmd.args(["/C", "start", url]);
431        cmd
432    }
433    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
434    {
435        // Fallback: try xdg-open
436        let mut cmd = std::process::Command::new("xdg-open");
437        cmd.arg(url);
438        cmd
439    }
440}
441
442/// Open a URL in the default browser.
443#[cfg(not(coverage))]
444fn open_browser(url: &str) -> Result<()> {
445    build_open_command(url).status()?;
446    Ok(())
447}
448
449/// Stub for coverage builds — avoids opening a browser during tests.
450#[cfg(coverage)]
451fn open_browser(_url: &str) -> Result<()> {
452    Ok(())
453}
454
455/// Build the command to attach to a session's terminal.
456/// Detects Docker sessions by the `docker:` prefix in the backend session ID.
457#[cfg_attr(coverage, allow(dead_code))]
458fn build_attach_command(backend_session_id: &str) -> std::process::Command {
459    // Docker sessions: exec into the container
460    if let Some(container) = backend_session_id.strip_prefix("docker:") {
461        let mut cmd = std::process::Command::new("docker");
462        cmd.args(["exec", "-it", container, "/bin/sh"]);
463        return cmd;
464    }
465    // tmux sessions
466    #[cfg(not(target_os = "windows"))]
467    {
468        let mut cmd = std::process::Command::new("tmux");
469        cmd.args(["attach-session", "-t", backend_session_id]);
470        cmd
471    }
472    #[cfg(target_os = "windows")]
473    {
474        // tmux attach not available on Windows — inform the user
475        let mut cmd = std::process::Command::new("cmd");
476        cmd.args([
477            "/C",
478            "echo",
479            "Attach not available on Windows. Use the web UI or --runtime docker.",
480        ]);
481        cmd
482    }
483}
484
485/// Attach to a session's terminal.
486#[cfg(not(any(test, coverage, target_os = "windows")))]
487fn attach_session(backend_session_id: &str) -> Result<()> {
488    let status = build_attach_command(backend_session_id).status()?;
489    if !status.success() {
490        anyhow::bail!("attach failed with {status}");
491    }
492    Ok(())
493}
494
495/// Stub for Windows — tmux attach is not available.
496#[cfg(all(target_os = "windows", not(test), not(coverage)))]
497fn attach_session(_backend_session_id: &str) -> Result<()> {
498    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
499    Ok(())
500}
501
502/// Stub for test and coverage builds — avoids spawning real terminals during tests.
503#[cfg(any(test, coverage))]
504#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
505fn attach_session(_backend_session_id: &str) -> Result<()> {
506    Ok(())
507}
508
509/// Extract a clean error message from an API JSON response (or fall back to raw text).
510fn api_error(text: &str) -> anyhow::Error {
511    serde_json::from_str::<serde_json::Value>(text)
512        .ok()
513        .and_then(|v| v["error"].as_str().map(String::from))
514        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
515}
516
517/// Return the response body text, or a clean error if the response was non-success.
518async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
519    if resp.status().is_success() {
520        Ok(resp.text().await?)
521    } else {
522        let text = resp.text().await?;
523        Err(api_error(&text))
524    }
525}
526
527/// Map a reqwest error to a user-friendly message.
528fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
529    if err.is_connect() {
530        anyhow::anyhow!(
531            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
532        )
533    } else {
534        anyhow::anyhow!("Network error connecting to {node}: {err}")
535    }
536}
537
538/// Check if the node address points to localhost.
539fn is_localhost(node: &str) -> bool {
540    let host = node.split(':').next().unwrap_or(node);
541    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
542}
543
544/// Try to auto-discover the auth token from a local daemon.
545async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
546    let resp = client
547        .get(format!("{base}/api/v1/auth/token"))
548        .send()
549        .await
550        .ok()?;
551    let body: AuthTokenResponse = resp.json().await.ok()?;
552    if body.token.is_empty() {
553        None
554    } else {
555        Some(body.token)
556    }
557}
558
559/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
560async fn resolve_token(
561    client: &reqwest::Client,
562    base: &str,
563    node: &str,
564    explicit: Option<&str>,
565) -> Option<String> {
566    if let Some(t) = explicit {
567        return Some(t.to_owned());
568    }
569    if is_localhost(node) {
570        return discover_token(client, base).await;
571    }
572    None
573}
574
575/// Check if a node string needs resolution (no port specified).
576fn node_needs_resolution(node: &str) -> bool {
577    !node.contains(':')
578}
579
580/// Resolve a node reference to a `host:port` address.
581///
582/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
583/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
584/// online peer is found, return its address and optionally its configured auth token
585/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
586#[cfg(not(coverage))]
587async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
588    // Already has port — use as-is
589    if !node_needs_resolution(node) {
590        return (node.to_owned(), None);
591    }
592
593    // Try to resolve via local daemon's peer registry
594    let local_base = "http://localhost:7433";
595    let mut resolved_address: Option<String> = None;
596
597    if let Ok(resp) = client
598        .get(format!("{local_base}/api/v1/peers"))
599        .send()
600        .await
601        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
602    {
603        for peer in &peers_resp.peers {
604            if peer.name == node {
605                resolved_address = Some(peer.address.clone());
606                break;
607            }
608        }
609    }
610
611    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
612
613    // Try to get the peer's auth token from the config endpoint
614    let peer_token = if let Ok(resp) = client
615        .get(format!("{local_base}/api/v1/config"))
616        .send()
617        .await
618        && let Ok(config) = resp.json::<ConfigResponse>().await
619        && let Some(entry) = config.peers.get(node)
620    {
621        entry.token().map(String::from)
622    } else {
623        None
624    };
625
626    (address, peer_token)
627}
628
629/// Coverage stub — no real HTTP resolution during coverage builds.
630#[cfg(coverage)]
631async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
632    if node_needs_resolution(node) {
633        (format!("{node}:7433"), None)
634    } else {
635        (node.to_owned(), None)
636    }
637}
638
639/// Build an authenticated GET request.
640fn authed_get(
641    client: &reqwest::Client,
642    url: String,
643    token: Option<&str>,
644) -> reqwest::RequestBuilder {
645    let req = client.get(url);
646    if let Some(t) = token {
647        req.bearer_auth(t)
648    } else {
649        req
650    }
651}
652
653/// Build an authenticated POST request.
654fn authed_post(
655    client: &reqwest::Client,
656    url: String,
657    token: Option<&str>,
658) -> reqwest::RequestBuilder {
659    let req = client.post(url);
660    if let Some(t) = token {
661        req.bearer_auth(t)
662    } else {
663        req
664    }
665}
666
667/// Build an authenticated DELETE request.
668fn authed_delete(
669    client: &reqwest::Client,
670    url: String,
671    token: Option<&str>,
672) -> reqwest::RequestBuilder {
673    let req = client.delete(url);
674    if let Some(t) = token {
675        req.bearer_auth(t)
676    } else {
677        req
678    }
679}
680
681/// Build an authenticated PUT request.
682#[cfg(not(coverage))]
683fn authed_put(
684    client: &reqwest::Client,
685    url: String,
686    token: Option<&str>,
687) -> reqwest::RequestBuilder {
688    let req = client.put(url);
689    if let Some(t) = token {
690        req.bearer_auth(t)
691    } else {
692        req
693    }
694}
695
696/// Fetch session output from the API.
697async fn fetch_output(
698    client: &reqwest::Client,
699    base: &str,
700    name: &str,
701    lines: usize,
702    token: Option<&str>,
703) -> Result<String> {
704    let resp = authed_get(
705        client,
706        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
707        token,
708    )
709    .send()
710    .await?;
711    let text = ok_or_api_error(resp).await?;
712    let output: OutputResponse = serde_json::from_str(&text)?;
713    Ok(output.output)
714}
715
716/// Fetch session status from the API.
717async fn fetch_session_status(
718    client: &reqwest::Client,
719    base: &str,
720    name: &str,
721    token: Option<&str>,
722) -> Result<String> {
723    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
724        .send()
725        .await?;
726    let text = ok_or_api_error(resp).await?;
727    let session: Session = serde_json::from_str(&text)?;
728    Ok(session.status.to_string())
729}
730
731/// Wait for the session to leave "creating" state, then check if it died instantly.
732/// Returns an error with a helpful message if the session is lost/killed.
733async fn check_session_alive(
734    client: &reqwest::Client,
735    base: &str,
736    name: &str,
737    token: Option<&str>,
738) -> Result<()> {
739    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
740    for _ in 0..3 {
741        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
742        if let Ok(status) = fetch_session_status(client, base, name, token).await {
743            match status.as_str() {
744                "creating" => continue, // Still starting — wait more
745                "lost" | "killed" => {
746                    anyhow::bail!(
747                        "Session \"{name}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {name}"
748                    );
749                }
750                _ => return Ok(()), // Active, idle, ready — all good
751            }
752        }
753        // fetch failed (network issue) — don't block, proceed to attach
754        break;
755    }
756    Ok(())
757}
758
759/// Compute the new trailing lines that differ from the previous output.
760///
761/// The output endpoint returns the last N lines from the terminal pane. As new lines
762/// appear, old lines at the top scroll off. We find the overlap between the end
763/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
764/// trailing lines.
765fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
766    if prev.is_empty() {
767        return new;
768    }
769
770    let prev_lines: Vec<&str> = prev.lines().collect();
771    let new_lines: Vec<&str> = new.lines().collect();
772
773    if new_lines.is_empty() {
774        return "";
775    }
776
777    // prev is non-empty (early return above), so last() always succeeds
778    let last_prev = prev_lines[prev_lines.len() - 1];
779
780    // Find the last line of prev in new to determine the overlap boundary
781    for i in (0..new_lines.len()).rev() {
782        if new_lines[i] == last_prev {
783            // Verify contiguous overlap: check that lines before this match too
784            let overlap_len = prev_lines.len().min(i + 1);
785            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
786            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
787            if prev_tail == new_overlap {
788                if i + 1 < new_lines.len() {
789                    // Return the slice of `new` after the overlap
790                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
791                    return new.get(consumed.min(new.len())..).unwrap_or("");
792                }
793                return "";
794            }
795        }
796    }
797
798    // No overlap found — output changed completely, print it all
799    new
800}
801
802/// Follow logs by polling, printing only new output. Returns when the session ends.
803async fn follow_logs(
804    client: &reqwest::Client,
805    base: &str,
806    name: &str,
807    lines: usize,
808    token: Option<&str>,
809    writer: &mut (dyn std::io::Write + Send),
810) -> Result<()> {
811    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
812    write!(writer, "{prev_output}")?;
813
814    let mut unchanged_ticks: u32 = 0;
815
816    loop {
817        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
818
819        // Fetch latest output
820        let new_output = fetch_output(client, base, name, lines, token).await?;
821
822        let diff = diff_output(&prev_output, &new_output);
823        if diff.is_empty() {
824            unchanged_ticks += 1;
825        } else {
826            write!(writer, "{diff}")?;
827            unchanged_ticks = 0;
828        }
829
830        // Check for agent exit marker in output
831        if new_output.contains(AGENT_EXIT_MARKER) {
832            break;
833        }
834
835        prev_output = new_output;
836
837        // Only check session status when output has been unchanged for 3+ ticks
838        if unchanged_ticks >= 3 {
839            let status = fetch_session_status(client, base, name, token).await?;
840            let is_terminal = status == "ready" || status == "killed" || status == "lost";
841            if is_terminal {
842                break;
843            }
844        }
845    }
846    Ok(())
847}
848
849// --- Schedule API ---
850
851/// Execute a schedule subcommand via the scheduler API.
852#[cfg(not(coverage))]
853async fn execute_schedule(
854    client: &reqwest::Client,
855    action: &ScheduleAction,
856    base: &str,
857    token: Option<&str>,
858) -> Result<String> {
859    match action {
860        ScheduleAction::Add {
861            name,
862            cron,
863            workdir,
864            node,
865            ink,
866            description,
867            command,
868        } => {
869            let cmd = if command.is_empty() {
870                None
871            } else {
872                Some(command.join(" "))
873            };
874            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
875                std::env::current_dir()
876                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
877            });
878            let mut body = serde_json::json!({
879                "name": name,
880                "cron": cron,
881                "workdir": resolved_workdir,
882            });
883            if let Some(c) = &cmd {
884                body["command"] = serde_json::json!(c);
885            }
886            if let Some(n) = node {
887                body["target_node"] = serde_json::json!(n);
888            }
889            if let Some(i) = ink {
890                body["ink"] = serde_json::json!(i);
891            }
892            if let Some(d) = description {
893                body["description"] = serde_json::json!(d);
894            }
895            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
896                .json(&body)
897                .send()
898                .await?;
899            ok_or_api_error(resp).await?;
900            Ok(format!("Created schedule \"{name}\""))
901        }
902        ScheduleAction::List => {
903            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
904                .send()
905                .await?;
906            let text = ok_or_api_error(resp).await?;
907            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
908            Ok(format_schedules(&schedules))
909        }
910        ScheduleAction::Remove { name } => {
911            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
912                .send()
913                .await?;
914            ok_or_api_error(resp).await?;
915            Ok(format!("Removed schedule \"{name}\""))
916        }
917        ScheduleAction::Pause { name } => {
918            let body = serde_json::json!({ "enabled": false });
919            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
920                .json(&body)
921                .send()
922                .await?;
923            ok_or_api_error(resp).await?;
924            Ok(format!("Paused schedule \"{name}\""))
925        }
926        ScheduleAction::Resume { name } => {
927            let body = serde_json::json!({ "enabled": true });
928            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
929                .json(&body)
930                .send()
931                .await?;
932            ok_or_api_error(resp).await?;
933            Ok(format!("Resumed schedule \"{name}\""))
934        }
935    }
936}
937
938/// Coverage stub for schedule execution.
939#[cfg(coverage)]
940#[allow(clippy::unnecessary_wraps)]
941async fn execute_schedule(
942    _client: &reqwest::Client,
943    _action: &ScheduleAction,
944    _base: &str,
945    _token: Option<&str>,
946) -> Result<String> {
947    Ok(String::new())
948}
949
950// --- Secret API ---
951
952/// Format secret entries as a table.
953#[cfg_attr(coverage, allow(dead_code))]
954fn format_secrets(secrets: &[serde_json::Value]) -> String {
955    if secrets.is_empty() {
956        return "No secrets configured.".into();
957    }
958    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
959    for s in secrets {
960        let name = s["name"].as_str().unwrap_or("?");
961        let env_display = s["env"]
962            .as_str()
963            .map_or_else(|| name.to_owned(), String::from);
964        let created = s["created_at"]
965            .as_str()
966            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
967        lines.push(format!("{name:<24} {env_display:<24} {created}"));
968    }
969    lines.join("\n")
970}
971
972/// Execute a secret subcommand via the secrets API.
973#[cfg(not(coverage))]
974async fn execute_secret(
975    client: &reqwest::Client,
976    action: &SecretAction,
977    base: &str,
978    token: Option<&str>,
979) -> Result<String> {
980    match action {
981        SecretAction::Set { name, value, env } => {
982            let mut body = serde_json::json!({ "value": value });
983            if let Some(e) = env {
984                body["env"] = serde_json::json!(e);
985            }
986            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
987                .json(&body)
988                .send()
989                .await?;
990            ok_or_api_error(resp).await?;
991            Ok(format!("Secret \"{name}\" set."))
992        }
993        SecretAction::List => {
994            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
995                .send()
996                .await?;
997            let text = ok_or_api_error(resp).await?;
998            let parsed: serde_json::Value = serde_json::from_str(&text)?;
999            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1000            Ok(format_secrets(secrets))
1001        }
1002        SecretAction::Delete { name } => {
1003            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1004                .send()
1005                .await?;
1006            ok_or_api_error(resp).await?;
1007            Ok(format!("Secret \"{name}\" deleted."))
1008        }
1009    }
1010}
1011
1012/// Coverage stub for secret execution.
1013#[cfg(coverage)]
1014#[allow(clippy::unnecessary_wraps)]
1015async fn execute_secret(
1016    _client: &reqwest::Client,
1017    _action: &SecretAction,
1018    _base: &str,
1019    _token: Option<&str>,
1020) -> Result<String> {
1021    Ok(String::new())
1022}
1023
1024/// Format a list of schedules as a table.
1025#[cfg_attr(coverage, allow(dead_code))]
1026fn format_schedules(schedules: &[serde_json::Value]) -> String {
1027    if schedules.is_empty() {
1028        return "No schedules.".into();
1029    }
1030    let mut lines = vec![format!(
1031        "{:<20} {:<18} {:<8} {:<12} {}",
1032        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1033    )];
1034    for s in schedules {
1035        let name = s["name"].as_str().unwrap_or("?");
1036        let cron = s["cron"].as_str().unwrap_or("?");
1037        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1038            "yes"
1039        } else {
1040            "no"
1041        };
1042        let last_run = s["last_run_at"]
1043            .as_str()
1044            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1045        let node = s["target_node"].as_str().unwrap_or("local");
1046        lines.push(format!(
1047            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1048        ));
1049    }
1050    lines.join("\n")
1051}
1052
1053/// Select the best node from the peer registry based on load.
1054/// Returns the node address and name of the least loaded online peer.
1055/// Scoring: lower memory usage + fewer active sessions = better.
1056#[cfg(not(coverage))]
1057async fn select_best_node(
1058    client: &reqwest::Client,
1059    base: &str,
1060    token: Option<&str>,
1061) -> Result<(String, String)> {
1062    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1063        .send()
1064        .await?;
1065    let text = ok_or_api_error(resp).await?;
1066    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1067
1068    // Score: fewer active sessions is better, more memory is better
1069    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1070
1071    for peer in &peers_resp.peers {
1072        if peer.status != pulpo_common::peer::PeerStatus::Online {
1073            continue;
1074        }
1075        let sessions = peer.session_count.unwrap_or(0);
1076        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1077        // Lower score = better (fewer sessions, more memory)
1078        #[allow(clippy::cast_precision_loss)]
1079        let score = sessions as f64 - (mem as f64 / 1024.0);
1080        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1081            best = Some((peer.address.clone(), peer.name.clone(), score));
1082        }
1083    }
1084
1085    // Fall back to local if no online peers
1086    match best {
1087        Some((addr, name, _)) => Ok((addr, name)),
1088        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1089    }
1090}
1091
1092/// Coverage stub — auto-select always falls back to local.
1093#[cfg(coverage)]
1094#[allow(clippy::unnecessary_wraps)]
1095async fn select_best_node(
1096    _client: &reqwest::Client,
1097    _base: &str,
1098    _token: Option<&str>,
1099) -> Result<(String, String)> {
1100    Ok(("localhost:7433".into(), "local".into()))
1101}
1102
1103/// Execute the given CLI command against the specified node.
1104#[allow(clippy::too_many_lines)]
1105pub async fn execute(cli: &Cli) -> Result<String> {
1106    let client = reqwest::Client::new();
1107    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1108    let url = base_url(&resolved_node);
1109    let node = &resolved_node;
1110    let token = resolve_token(&client, &url, node, cli.token.as_deref())
1111        .await
1112        .or(peer_token);
1113
1114    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
1115    if cli.command.is_none() && cli.path.is_none() {
1116        // No subcommand and no path: print help
1117        use clap::CommandFactory;
1118        let mut cmd = Cli::command();
1119        cmd.print_help()?;
1120        println!();
1121        return Ok(String::new());
1122    }
1123    if cli.command.is_none() {
1124        let path = cli.path.as_deref().unwrap_or(".");
1125        let resolved_workdir = resolve_path(path);
1126        let base_name = derive_session_name(&resolved_workdir);
1127        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1128        let body = serde_json::json!({
1129            "name": name,
1130            "workdir": resolved_workdir,
1131        });
1132        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1133            .json(&body)
1134            .send()
1135            .await
1136            .map_err(|e| friendly_error(&e, node))?;
1137        let text = ok_or_api_error(resp).await?;
1138        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1139        let msg = format!(
1140            "Created session \"{}\" ({})",
1141            resp.session.name, resp.session.id
1142        );
1143        let backend_id = resp
1144            .session
1145            .backend_session_id
1146            .as_deref()
1147            .unwrap_or(&resp.session.name);
1148        eprintln!("{msg}");
1149        check_session_alive(&client, &url, &resp.session.name, token.as_deref()).await?;
1150        attach_session(backend_id)?;
1151        return Ok(format!("Detached from session \"{}\".", resp.session.name));
1152    }
1153
1154    match cli.command.as_ref().unwrap() {
1155        Commands::Attach { name } => {
1156            // Fetch session to get status and backend_session_id
1157            let resp = authed_get(
1158                &client,
1159                format!("{url}/api/v1/sessions/{name}"),
1160                token.as_deref(),
1161            )
1162            .send()
1163            .await
1164            .map_err(|e| friendly_error(&e, node))?;
1165            let text = ok_or_api_error(resp).await?;
1166            let session: Session = serde_json::from_str(&text)?;
1167            match session.status.to_string().as_str() {
1168                "lost" => {
1169                    anyhow::bail!(
1170                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
1171                    );
1172                }
1173                "killed" => {
1174                    anyhow::bail!(
1175                        "Session \"{name}\" is {} — cannot attach to a killed session.",
1176                        session.status
1177                    );
1178                }
1179                _ => {}
1180            }
1181            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1182            attach_session(&backend_id)?;
1183            Ok(format!("Detached from session {name}."))
1184        }
1185        Commands::Input { name, text } => {
1186            let input_text = text.as_deref().unwrap_or("\n");
1187            let body = serde_json::json!({ "text": input_text });
1188            let resp = authed_post(
1189                &client,
1190                format!("{url}/api/v1/sessions/{name}/input"),
1191                token.as_deref(),
1192            )
1193            .json(&body)
1194            .send()
1195            .await
1196            .map_err(|e| friendly_error(&e, node))?;
1197            ok_or_api_error(resp).await?;
1198            Ok(format!("Sent input to session {name}."))
1199        }
1200        Commands::List { all } => {
1201            let list_url = if *all {
1202                format!("{url}/api/v1/sessions")
1203            } else {
1204                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1205            };
1206            let resp = authed_get(&client, list_url, token.as_deref())
1207                .send()
1208                .await
1209                .map_err(|e| friendly_error(&e, node))?;
1210            let text = ok_or_api_error(resp).await?;
1211            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1212            Ok(format_sessions(&sessions))
1213        }
1214        Commands::Nodes => {
1215            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1216                .send()
1217                .await
1218                .map_err(|e| friendly_error(&e, node))?;
1219            let text = ok_or_api_error(resp).await?;
1220            let resp: PeersResponse = serde_json::from_str(&text)?;
1221            Ok(format_nodes(&resp))
1222        }
1223        Commands::Spawn {
1224            workdir,
1225            name,
1226            ink,
1227            description,
1228            detach,
1229            idle_threshold,
1230            auto,
1231            worktree,
1232            runtime,
1233            secret,
1234            command,
1235        } => {
1236            let cmd = if command.is_empty() {
1237                None
1238            } else {
1239                Some(command.join(" "))
1240            };
1241            // Resolve workdir: --workdir flag > current directory
1242            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1243                std::env::current_dir()
1244                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1245            });
1246            // Resolve name: explicit > derived from workdir (with dedup)
1247            let resolved_name = if let Some(n) = name {
1248                n.clone()
1249            } else {
1250                let base_name = derive_session_name(&resolved_workdir);
1251                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1252            };
1253            let mut body = serde_json::json!({
1254                "name": resolved_name,
1255                "workdir": resolved_workdir,
1256            });
1257            if let Some(c) = &cmd {
1258                body["command"] = serde_json::json!(c);
1259            }
1260            if let Some(i) = ink {
1261                body["ink"] = serde_json::json!(i);
1262            }
1263            if let Some(d) = description {
1264                body["description"] = serde_json::json!(d);
1265            }
1266            if let Some(t) = idle_threshold {
1267                body["idle_threshold_secs"] = serde_json::json!(t);
1268            }
1269            if *worktree {
1270                body["worktree"] = serde_json::json!(true);
1271                eprintln!(
1272                    "Worktree: branch pulpo/{resolved_name} in {resolved_workdir}/.pulpo/worktrees/{resolved_name}/"
1273                );
1274            }
1275            if let Some(rt) = runtime {
1276                body["runtime"] = serde_json::json!(rt);
1277            }
1278            if !secret.is_empty() {
1279                body["secrets"] = serde_json::json!(secret);
1280            }
1281            let spawn_url = if *auto {
1282                let (auto_addr, auto_name) =
1283                    select_best_node(&client, &url, token.as_deref()).await?;
1284                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1285                base_url(&auto_addr)
1286            } else {
1287                url.clone()
1288            };
1289            let resp = authed_post(
1290                &client,
1291                format!("{spawn_url}/api/v1/sessions"),
1292                token.as_deref(),
1293            )
1294            .json(&body)
1295            .send()
1296            .await
1297            .map_err(|e| friendly_error(&e, node))?;
1298            let text = ok_or_api_error(resp).await?;
1299            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1300            let msg = format!(
1301                "Created session \"{}\" ({})",
1302                resp.session.name, resp.session.id
1303            );
1304            if !detach {
1305                let backend_id = resp
1306                    .session
1307                    .backend_session_id
1308                    .as_deref()
1309                    .unwrap_or(&resp.session.name);
1310                eprintln!("{msg}");
1311                check_session_alive(&client, &url, &resp.session.name, token.as_deref()).await?;
1312                attach_session(backend_id)?;
1313                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1314            }
1315            Ok(msg)
1316        }
1317        Commands::Kill { name } => {
1318            let resp = authed_post(
1319                &client,
1320                format!("{url}/api/v1/sessions/{name}/kill"),
1321                token.as_deref(),
1322            )
1323            .send()
1324            .await
1325            .map_err(|e| friendly_error(&e, node))?;
1326            ok_or_api_error(resp).await?;
1327            Ok(format!("Session {name} killed."))
1328        }
1329        Commands::Delete { name } => {
1330            let resp = authed_delete(
1331                &client,
1332                format!("{url}/api/v1/sessions/{name}"),
1333                token.as_deref(),
1334            )
1335            .send()
1336            .await
1337            .map_err(|e| friendly_error(&e, node))?;
1338            ok_or_api_error(resp).await?;
1339            Ok(format!("Session {name} deleted."))
1340        }
1341        Commands::Logs {
1342            name,
1343            lines,
1344            follow,
1345        } => {
1346            if *follow {
1347                let mut stdout = std::io::stdout();
1348                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1349                    .await
1350                    .map_err(|e| {
1351                        // Unwrap reqwest errors to friendly messages
1352                        match e.downcast::<reqwest::Error>() {
1353                            Ok(re) => friendly_error(&re, node),
1354                            Err(other) => other,
1355                        }
1356                    })?;
1357                Ok(String::new())
1358            } else {
1359                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1360                    .await
1361                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1362                        Ok(re) => friendly_error(&re, node),
1363                        Err(other) => other,
1364                    })?;
1365                Ok(output)
1366            }
1367        }
1368        Commands::Interventions { name } => {
1369            let resp = authed_get(
1370                &client,
1371                format!("{url}/api/v1/sessions/{name}/interventions"),
1372                token.as_deref(),
1373            )
1374            .send()
1375            .await
1376            .map_err(|e| friendly_error(&e, node))?;
1377            let text = ok_or_api_error(resp).await?;
1378            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1379            Ok(format_interventions(&events))
1380        }
1381        Commands::Ui => {
1382            let dashboard = base_url(node);
1383            open_browser(&dashboard)?;
1384            Ok(format!("Opening {dashboard}"))
1385        }
1386        Commands::Resume { name } => {
1387            let resp = authed_post(
1388                &client,
1389                format!("{url}/api/v1/sessions/{name}/resume"),
1390                token.as_deref(),
1391            )
1392            .send()
1393            .await
1394            .map_err(|e| friendly_error(&e, node))?;
1395            let text = ok_or_api_error(resp).await?;
1396            let session: Session = serde_json::from_str(&text)?;
1397            let backend_id = session
1398                .backend_session_id
1399                .as_deref()
1400                .unwrap_or(&session.name);
1401            eprintln!("Resumed session \"{}\"", session.name);
1402            check_session_alive(&client, &url, name, token.as_deref()).await?;
1403            attach_session(backend_id)?;
1404            Ok(format!("Detached from session \"{}\".", session.name))
1405        }
1406        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1407            .await
1408            .map_err(|e| match e.downcast::<reqwest::Error>() {
1409                Ok(re) => friendly_error(&re, node),
1410                Err(other) => other,
1411            }),
1412        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1413            .await
1414            .map_err(|e| match e.downcast::<reqwest::Error>() {
1415                Ok(re) => friendly_error(&re, node),
1416                Err(other) => other,
1417            }),
1418    }
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423    use super::*;
1424
1425    #[test]
1426    fn test_base_url() {
1427        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1428        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1429    }
1430
1431    #[test]
1432    fn test_cli_parse_list() {
1433        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1434        assert_eq!(cli.node, "localhost:7433");
1435        assert!(matches!(cli.command, Some(Commands::List { .. })));
1436    }
1437
1438    #[test]
1439    fn test_cli_parse_nodes() {
1440        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1441        assert!(matches!(cli.command, Some(Commands::Nodes)));
1442    }
1443
1444    #[test]
1445    fn test_cli_parse_ui() {
1446        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1447        assert!(matches!(cli.command, Some(Commands::Ui)));
1448    }
1449
1450    #[test]
1451    fn test_cli_parse_ui_custom_node() {
1452        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1453        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1454        assert_eq!(cli.node, "mac-mini:7433");
1455        assert_eq!(cli.path.as_deref(), Some("ui"));
1456    }
1457
1458    #[test]
1459    fn test_build_open_command() {
1460        let cmd = build_open_command("http://localhost:7433");
1461        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1462        assert_eq!(args, vec!["http://localhost:7433"]);
1463        #[cfg(target_os = "macos")]
1464        assert_eq!(cmd.get_program(), "open");
1465        #[cfg(target_os = "linux")]
1466        assert_eq!(cmd.get_program(), "xdg-open");
1467    }
1468
1469    #[test]
1470    fn test_cli_parse_spawn() {
1471        let cli = Cli::try_parse_from([
1472            "pulpo",
1473            "spawn",
1474            "my-task",
1475            "--workdir",
1476            "/tmp/repo",
1477            "--",
1478            "claude",
1479            "-p",
1480            "Fix the bug",
1481        ])
1482        .unwrap();
1483        assert!(matches!(
1484            &cli.command,
1485            Some(Commands::Spawn { name, workdir, command, .. })
1486                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1487                && command == &["claude", "-p", "Fix the bug"]
1488        ));
1489    }
1490
1491    #[test]
1492    fn test_cli_parse_spawn_with_ink() {
1493        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1494        assert!(matches!(
1495            &cli.command,
1496            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1497        ));
1498    }
1499
1500    #[test]
1501    fn test_cli_parse_spawn_with_description() {
1502        let cli =
1503            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1504                .unwrap();
1505        assert!(matches!(
1506            &cli.command,
1507            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1508        ));
1509    }
1510
1511    #[test]
1512    fn test_cli_parse_spawn_name_positional() {
1513        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1514        assert!(matches!(
1515            &cli.command,
1516            Some(Commands::Spawn { name, command, .. })
1517                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1518        ));
1519    }
1520
1521    #[test]
1522    fn test_cli_parse_spawn_no_command() {
1523        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1524        assert!(matches!(
1525            &cli.command,
1526            Some(Commands::Spawn { command, .. }) if command.is_empty()
1527        ));
1528    }
1529
1530    #[test]
1531    fn test_cli_parse_spawn_idle_threshold() {
1532        let cli =
1533            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1534        assert!(matches!(
1535            &cli.command,
1536            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1537        ));
1538    }
1539
1540    #[test]
1541    fn test_cli_parse_spawn_idle_threshold_60() {
1542        let cli =
1543            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1544        assert!(matches!(
1545            &cli.command,
1546            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1547        ));
1548    }
1549
1550    #[test]
1551    fn test_cli_parse_spawn_secrets() {
1552        let cli = Cli::try_parse_from([
1553            "pulpo",
1554            "spawn",
1555            "my-task",
1556            "--secret",
1557            "GITHUB_TOKEN",
1558            "--secret",
1559            "NPM_TOKEN",
1560        ])
1561        .unwrap();
1562        assert!(matches!(
1563            &cli.command,
1564            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1565        ));
1566    }
1567
1568    #[test]
1569    fn test_cli_parse_spawn_no_secrets() {
1570        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1571        assert!(matches!(
1572            &cli.command,
1573            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1574        ));
1575    }
1576
1577    #[test]
1578    fn test_cli_parse_secret_set_with_env() {
1579        let cli = Cli::try_parse_from([
1580            "pulpo",
1581            "secret",
1582            "set",
1583            "GH_WORK",
1584            "token123",
1585            "--env",
1586            "GITHUB_TOKEN",
1587        ])
1588        .unwrap();
1589        assert!(matches!(
1590            &cli.command,
1591            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1592                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1593        ));
1594    }
1595
1596    #[test]
1597    fn test_cli_parse_secret_set_without_env() {
1598        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1599        assert!(matches!(
1600            &cli.command,
1601            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1602                if name == "MY_KEY" && value == "val" && env.is_none()
1603        ));
1604    }
1605
1606    #[test]
1607    fn test_cli_parse_spawn_worktree() {
1608        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1609        assert!(matches!(
1610            &cli.command,
1611            Some(Commands::Spawn { worktree, .. }) if *worktree
1612        ));
1613    }
1614
1615    #[test]
1616    fn test_cli_parse_spawn_detach() {
1617        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1618        assert!(matches!(
1619            &cli.command,
1620            Some(Commands::Spawn { detach, .. }) if *detach
1621        ));
1622    }
1623
1624    #[test]
1625    fn test_cli_parse_spawn_detach_short() {
1626        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1627        assert!(matches!(
1628            &cli.command,
1629            Some(Commands::Spawn { detach, .. }) if *detach
1630        ));
1631    }
1632
1633    #[test]
1634    fn test_cli_parse_spawn_detach_default() {
1635        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1636        assert!(matches!(
1637            &cli.command,
1638            Some(Commands::Spawn { detach, .. }) if !detach
1639        ));
1640    }
1641
1642    #[test]
1643    fn test_cli_parse_logs() {
1644        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1645        assert!(matches!(
1646            &cli.command,
1647            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1648        ));
1649    }
1650
1651    #[test]
1652    fn test_cli_parse_logs_with_lines() {
1653        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1654        assert!(matches!(
1655            &cli.command,
1656            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1657        ));
1658    }
1659
1660    #[test]
1661    fn test_cli_parse_logs_follow() {
1662        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1663        assert!(matches!(
1664            &cli.command,
1665            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1666        ));
1667    }
1668
1669    #[test]
1670    fn test_cli_parse_logs_follow_short() {
1671        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1672        assert!(matches!(
1673            &cli.command,
1674            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1675        ));
1676    }
1677
1678    #[test]
1679    fn test_cli_parse_kill() {
1680        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1681        assert!(matches!(
1682            &cli.command,
1683            Some(Commands::Kill { name }) if name == "my-session"
1684        ));
1685    }
1686
1687    #[test]
1688    fn test_cli_parse_delete() {
1689        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1690        assert!(matches!(
1691            &cli.command,
1692            Some(Commands::Delete { name }) if name == "my-session"
1693        ));
1694    }
1695
1696    #[test]
1697    fn test_cli_parse_resume() {
1698        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1699        assert!(matches!(
1700            &cli.command,
1701            Some(Commands::Resume { name }) if name == "my-session"
1702        ));
1703    }
1704
1705    #[test]
1706    fn test_cli_parse_input() {
1707        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1708        assert!(matches!(
1709            &cli.command,
1710            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1711        ));
1712    }
1713
1714    #[test]
1715    fn test_cli_parse_input_no_text() {
1716        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1717        assert!(matches!(
1718            &cli.command,
1719            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1720        ));
1721    }
1722
1723    #[test]
1724    fn test_cli_parse_input_alias() {
1725        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1726        assert!(matches!(
1727            &cli.command,
1728            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1729        ));
1730    }
1731
1732    #[test]
1733    fn test_cli_parse_custom_node() {
1734        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1735        assert_eq!(cli.node, "win-pc:8080");
1736        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
1737        assert_eq!(cli.path.as_deref(), Some("list"));
1738    }
1739
1740    #[test]
1741    fn test_cli_version() {
1742        let result = Cli::try_parse_from(["pulpo", "--version"]);
1743        // clap exits with an error (kind DisplayVersion) when --version is used
1744        let err = result.unwrap_err();
1745        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1746    }
1747
1748    #[test]
1749    fn test_cli_parse_no_subcommand_succeeds() {
1750        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1751        assert!(cli.command.is_none());
1752        assert!(cli.path.is_none());
1753    }
1754
1755    #[test]
1756    fn test_cli_debug() {
1757        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1758        let debug = format!("{cli:?}");
1759        assert!(debug.contains("List"));
1760    }
1761
1762    #[test]
1763    fn test_commands_debug() {
1764        let cmd = Commands::List { all: false };
1765        assert_eq!(format!("{cmd:?}"), "List { all: false }");
1766    }
1767
1768    /// A valid Session JSON for test responses.
1769    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1770
1771    /// A valid `CreateSessionResponse` JSON wrapping the session.
1772    fn test_create_response_json() -> String {
1773        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1774    }
1775
1776    /// Start a lightweight test HTTP server and return its address.
1777    async fn start_test_server() -> String {
1778        use axum::http::StatusCode;
1779        use axum::{
1780            Json, Router,
1781            routing::{get, post},
1782        };
1783
1784        let create_json = test_create_response_json();
1785
1786        let app = Router::new()
1787            .route(
1788                "/api/v1/sessions",
1789                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1790                    (StatusCode::CREATED, create_json.clone())
1791                }),
1792            )
1793            .route(
1794                "/api/v1/sessions/{id}",
1795                get(|| async { TEST_SESSION_JSON.to_owned() })
1796                    .delete(|| async { StatusCode::NO_CONTENT }),
1797            )
1798            .route(
1799                "/api/v1/sessions/{id}/kill",
1800                post(|| async { StatusCode::NO_CONTENT }),
1801            )
1802            .route(
1803                "/api/v1/sessions/{id}/output",
1804                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1805            )
1806            .route(
1807                "/api/v1/peers",
1808                get(|| async {
1809                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1810                }),
1811            )
1812            .route(
1813                "/api/v1/sessions/{id}/resume",
1814                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1815            )
1816            .route(
1817                "/api/v1/sessions/{id}/interventions",
1818                get(|| async { "[]".to_owned() }),
1819            )
1820            .route(
1821                "/api/v1/sessions/{id}/input",
1822                post(|| async { StatusCode::NO_CONTENT }),
1823            )
1824            .route(
1825                "/api/v1/schedules",
1826                get(|| async { Json::<Vec<()>>(vec![]) })
1827                    .post(|| async { StatusCode::CREATED }),
1828            )
1829            .route(
1830                "/api/v1/schedules/{id}",
1831                axum::routing::put(|| async { StatusCode::OK })
1832                    .delete(|| async { StatusCode::NO_CONTENT }),
1833            );
1834
1835        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1836        let addr = listener.local_addr().unwrap();
1837        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1838        format!("127.0.0.1:{}", addr.port())
1839    }
1840
1841    #[tokio::test]
1842    async fn test_execute_list_success() {
1843        let node = start_test_server().await;
1844        let cli = Cli {
1845            node,
1846            token: None,
1847            command: Some(Commands::List { all: false }),
1848            path: None,
1849        };
1850        let result = execute(&cli).await.unwrap();
1851        assert_eq!(result, "No sessions.");
1852    }
1853
1854    #[tokio::test]
1855    async fn test_execute_nodes_success() {
1856        let node = start_test_server().await;
1857        let cli = Cli {
1858            node,
1859            token: None,
1860            command: Some(Commands::Nodes),
1861            path: None,
1862        };
1863        let result = execute(&cli).await.unwrap();
1864        assert!(result.contains("test"));
1865        assert!(result.contains("(local)"));
1866        assert!(result.contains("NAME"));
1867    }
1868
1869    #[tokio::test]
1870    async fn test_execute_spawn_success() {
1871        let node = start_test_server().await;
1872        let cli = Cli {
1873            node,
1874            token: None,
1875            command: Some(Commands::Spawn {
1876                name: Some("test".into()),
1877                workdir: Some("/tmp/repo".into()),
1878                ink: None,
1879                description: None,
1880                detach: true,
1881                idle_threshold: None,
1882                auto: false,
1883                worktree: false,
1884                runtime: None,
1885                secret: vec![],
1886                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1887            }),
1888            path: None,
1889        };
1890        let result = execute(&cli).await.unwrap();
1891        assert!(result.contains("Created session"));
1892        assert!(result.contains("repo"));
1893    }
1894
1895    #[tokio::test]
1896    async fn test_execute_spawn_with_all_flags() {
1897        let node = start_test_server().await;
1898        let cli = Cli {
1899            node,
1900            token: None,
1901            command: Some(Commands::Spawn {
1902                name: Some("test".into()),
1903                workdir: Some("/tmp/repo".into()),
1904                ink: Some("coder".into()),
1905                description: Some("Fix the bug".into()),
1906                detach: true,
1907                idle_threshold: None,
1908                auto: false,
1909                worktree: false,
1910                runtime: None,
1911                secret: vec![],
1912                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1913            }),
1914            path: None,
1915        };
1916        let result = execute(&cli).await.unwrap();
1917        assert!(result.contains("Created session"));
1918    }
1919
1920    #[tokio::test]
1921    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1922        let node = start_test_server().await;
1923        let cli = Cli {
1924            node,
1925            token: None,
1926            command: Some(Commands::Spawn {
1927                name: Some("full-opts".into()),
1928                workdir: Some("/tmp/repo".into()),
1929                ink: Some("coder".into()),
1930                description: Some("Full options".into()),
1931                detach: true,
1932                idle_threshold: Some(120),
1933                auto: false,
1934                worktree: true,
1935                runtime: Some("docker".into()),
1936                secret: vec![],
1937                command: vec!["claude".into()],
1938            }),
1939            path: None,
1940        };
1941        let result = execute(&cli).await.unwrap();
1942        assert!(result.contains("Created session"));
1943    }
1944
1945    #[tokio::test]
1946    async fn test_execute_spawn_no_name_derives_from_workdir() {
1947        let node = start_test_server().await;
1948        let cli = Cli {
1949            node,
1950            token: None,
1951            command: Some(Commands::Spawn {
1952                name: None,
1953                workdir: Some("/tmp/my-project".into()),
1954                ink: None,
1955                description: None,
1956                detach: true,
1957                idle_threshold: None,
1958                auto: false,
1959                worktree: false,
1960                runtime: None,
1961                secret: vec![],
1962                command: vec!["echo".into(), "hello".into()],
1963            }),
1964            path: None,
1965        };
1966        let result = execute(&cli).await.unwrap();
1967        assert!(result.contains("Created session"));
1968    }
1969
1970    #[tokio::test]
1971    async fn test_execute_spawn_no_command() {
1972        let node = start_test_server().await;
1973        let cli = Cli {
1974            node,
1975            token: None,
1976            command: Some(Commands::Spawn {
1977                name: Some("test".into()),
1978                workdir: Some("/tmp/repo".into()),
1979                ink: None,
1980                description: None,
1981                detach: true,
1982                idle_threshold: None,
1983                auto: false,
1984                worktree: false,
1985                runtime: None,
1986                secret: vec![],
1987                command: vec![],
1988            }),
1989            path: None,
1990        };
1991        let result = execute(&cli).await.unwrap();
1992        assert!(result.contains("Created session"));
1993    }
1994
1995    #[tokio::test]
1996    async fn test_execute_spawn_with_name() {
1997        let node = start_test_server().await;
1998        let cli = Cli {
1999            node,
2000            token: None,
2001            command: Some(Commands::Spawn {
2002                name: Some("my-task".into()),
2003                workdir: Some("/tmp/repo".into()),
2004                ink: None,
2005                description: None,
2006                detach: true,
2007                idle_threshold: None,
2008                auto: false,
2009                worktree: false,
2010                runtime: None,
2011                secret: vec![],
2012                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2013            }),
2014            path: None,
2015        };
2016        let result = execute(&cli).await.unwrap();
2017        assert!(result.contains("Created session"));
2018    }
2019
2020    #[tokio::test]
2021    async fn test_execute_spawn_auto_attach() {
2022        let node = start_test_server().await;
2023        let cli = Cli {
2024            node,
2025            token: None,
2026            command: Some(Commands::Spawn {
2027                name: Some("test".into()),
2028                workdir: Some("/tmp/repo".into()),
2029                ink: None,
2030                description: None,
2031                detach: false,
2032                idle_threshold: None,
2033                auto: false,
2034                worktree: false,
2035                runtime: None,
2036                secret: vec![],
2037                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2038            }),
2039            path: None,
2040        };
2041        let result = execute(&cli).await.unwrap();
2042        // When not detached, spawn prints creation to stderr and returns detach message
2043        assert!(result.contains("Detached from session"));
2044    }
2045
2046    #[tokio::test]
2047    async fn test_execute_kill_success() {
2048        let node = start_test_server().await;
2049        let cli = Cli {
2050            node,
2051            token: None,
2052            command: Some(Commands::Kill {
2053                name: "test-session".into(),
2054            }),
2055            path: None,
2056        };
2057        let result = execute(&cli).await.unwrap();
2058        assert!(result.contains("killed"));
2059    }
2060
2061    #[tokio::test]
2062    async fn test_execute_delete_success() {
2063        let node = start_test_server().await;
2064        let cli = Cli {
2065            node,
2066            token: None,
2067            command: Some(Commands::Delete {
2068                name: "test-session".into(),
2069            }),
2070            path: None,
2071        };
2072        let result = execute(&cli).await.unwrap();
2073        assert!(result.contains("deleted"));
2074    }
2075
2076    #[tokio::test]
2077    async fn test_execute_logs_success() {
2078        let node = start_test_server().await;
2079        let cli = Cli {
2080            node,
2081            token: None,
2082            command: Some(Commands::Logs {
2083                name: "test-session".into(),
2084                lines: 50,
2085                follow: false,
2086            }),
2087            path: None,
2088        };
2089        let result = execute(&cli).await.unwrap();
2090        assert!(result.contains("test output"));
2091    }
2092
2093    #[tokio::test]
2094    async fn test_execute_list_connection_refused() {
2095        let cli = Cli {
2096            node: "localhost:1".into(),
2097            token: None,
2098            command: Some(Commands::List { all: false }),
2099            path: None,
2100        };
2101        let result = execute(&cli).await;
2102        let err = result.unwrap_err().to_string();
2103        assert!(
2104            err.contains("Could not connect to pulpod"),
2105            "Expected friendly error, got: {err}"
2106        );
2107        assert!(err.contains("localhost:1"));
2108    }
2109
2110    #[tokio::test]
2111    async fn test_execute_nodes_connection_refused() {
2112        let cli = Cli {
2113            node: "localhost:1".into(),
2114            token: None,
2115            command: Some(Commands::Nodes),
2116            path: None,
2117        };
2118        let result = execute(&cli).await;
2119        let err = result.unwrap_err().to_string();
2120        assert!(err.contains("Could not connect to pulpod"));
2121    }
2122
2123    #[tokio::test]
2124    async fn test_execute_kill_error_response() {
2125        use axum::{Router, http::StatusCode, routing::post};
2126
2127        let app = Router::new().route(
2128            "/api/v1/sessions/{id}/kill",
2129            post(|| async {
2130                (
2131                    StatusCode::NOT_FOUND,
2132                    "{\"error\":\"session not found: test-session\"}",
2133                )
2134            }),
2135        );
2136        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2137        let addr = listener.local_addr().unwrap();
2138        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2139        let node = format!("127.0.0.1:{}", addr.port());
2140
2141        let cli = Cli {
2142            node,
2143            token: None,
2144            command: Some(Commands::Kill {
2145                name: "test-session".into(),
2146            }),
2147            path: None,
2148        };
2149        let err = execute(&cli).await.unwrap_err();
2150        assert_eq!(err.to_string(), "session not found: test-session");
2151    }
2152
2153    #[tokio::test]
2154    async fn test_execute_delete_error_response() {
2155        use axum::{Router, http::StatusCode, routing::delete};
2156
2157        let app = Router::new().route(
2158            "/api/v1/sessions/{id}",
2159            delete(|| async {
2160                (
2161                    StatusCode::CONFLICT,
2162                    "{\"error\":\"cannot delete session in 'running' state\"}",
2163                )
2164            }),
2165        );
2166        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2167        let addr = listener.local_addr().unwrap();
2168        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2169        let node = format!("127.0.0.1:{}", addr.port());
2170
2171        let cli = Cli {
2172            node,
2173            token: None,
2174            command: Some(Commands::Delete {
2175                name: "test-session".into(),
2176            }),
2177            path: None,
2178        };
2179        let err = execute(&cli).await.unwrap_err();
2180        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
2181    }
2182
2183    #[tokio::test]
2184    async fn test_execute_logs_error_response() {
2185        use axum::{Router, http::StatusCode, routing::get};
2186
2187        let app = Router::new().route(
2188            "/api/v1/sessions/{id}/output",
2189            get(|| async {
2190                (
2191                    StatusCode::NOT_FOUND,
2192                    "{\"error\":\"session not found: ghost\"}",
2193                )
2194            }),
2195        );
2196        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2197        let addr = listener.local_addr().unwrap();
2198        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2199        let node = format!("127.0.0.1:{}", addr.port());
2200
2201        let cli = Cli {
2202            node,
2203            token: None,
2204            command: Some(Commands::Logs {
2205                name: "ghost".into(),
2206                lines: 50,
2207                follow: false,
2208            }),
2209            path: None,
2210        };
2211        let err = execute(&cli).await.unwrap_err();
2212        assert_eq!(err.to_string(), "session not found: ghost");
2213    }
2214
2215    #[tokio::test]
2216    async fn test_execute_resume_error_response() {
2217        use axum::{Router, http::StatusCode, routing::post};
2218
2219        let app = Router::new().route(
2220            "/api/v1/sessions/{id}/resume",
2221            post(|| async {
2222                (
2223                    StatusCode::BAD_REQUEST,
2224                    "{\"error\":\"session is not lost (status: active)\"}",
2225                )
2226            }),
2227        );
2228        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2229        let addr = listener.local_addr().unwrap();
2230        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2231        let node = format!("127.0.0.1:{}", addr.port());
2232
2233        let cli = Cli {
2234            node,
2235            token: None,
2236            command: Some(Commands::Resume {
2237                name: "test-session".into(),
2238            }),
2239            path: None,
2240        };
2241        let err = execute(&cli).await.unwrap_err();
2242        assert_eq!(err.to_string(), "session is not lost (status: active)");
2243    }
2244
2245    #[tokio::test]
2246    async fn test_execute_spawn_error_response() {
2247        use axum::{Router, http::StatusCode, routing::post};
2248
2249        let app = Router::new().route(
2250            "/api/v1/sessions",
2251            post(|| async {
2252                (
2253                    StatusCode::INTERNAL_SERVER_ERROR,
2254                    "{\"error\":\"failed to spawn session\"}",
2255                )
2256            }),
2257        );
2258        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2259        let addr = listener.local_addr().unwrap();
2260        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2261        let node = format!("127.0.0.1:{}", addr.port());
2262
2263        let cli = Cli {
2264            node,
2265            token: None,
2266            command: Some(Commands::Spawn {
2267                name: Some("test".into()),
2268                workdir: Some("/tmp/repo".into()),
2269                ink: None,
2270                description: None,
2271                detach: true,
2272                idle_threshold: None,
2273                auto: false,
2274                worktree: false,
2275                runtime: None,
2276                secret: vec![],
2277                command: vec!["test".into()],
2278            }),
2279            path: None,
2280        };
2281        let err = execute(&cli).await.unwrap_err();
2282        assert_eq!(err.to_string(), "failed to spawn session");
2283    }
2284
2285    #[tokio::test]
2286    async fn test_execute_interventions_error_response() {
2287        use axum::{Router, http::StatusCode, routing::get};
2288
2289        let app = Router::new().route(
2290            "/api/v1/sessions/{id}/interventions",
2291            get(|| async {
2292                (
2293                    StatusCode::NOT_FOUND,
2294                    "{\"error\":\"session not found: ghost\"}",
2295                )
2296            }),
2297        );
2298        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2299        let addr = listener.local_addr().unwrap();
2300        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2301        let node = format!("127.0.0.1:{}", addr.port());
2302
2303        let cli = Cli {
2304            node,
2305            token: None,
2306            command: Some(Commands::Interventions {
2307                name: "ghost".into(),
2308            }),
2309            path: None,
2310        };
2311        let err = execute(&cli).await.unwrap_err();
2312        assert_eq!(err.to_string(), "session not found: ghost");
2313    }
2314
2315    #[tokio::test]
2316    async fn test_execute_resume_success() {
2317        let node = start_test_server().await;
2318        let cli = Cli {
2319            node,
2320            token: None,
2321            command: Some(Commands::Resume {
2322                name: "test-session".into(),
2323            }),
2324            path: None,
2325        };
2326        let result = execute(&cli).await.unwrap();
2327        assert!(result.contains("Detached from session"));
2328    }
2329
2330    #[tokio::test]
2331    async fn test_execute_input_success() {
2332        let node = start_test_server().await;
2333        let cli = Cli {
2334            node,
2335            token: None,
2336            command: Some(Commands::Input {
2337                name: "test-session".into(),
2338                text: Some("yes".into()),
2339            }),
2340            path: None,
2341        };
2342        let result = execute(&cli).await.unwrap();
2343        assert!(result.contains("Sent input to session test-session"));
2344    }
2345
2346    #[tokio::test]
2347    async fn test_execute_input_no_text() {
2348        let node = start_test_server().await;
2349        let cli = Cli {
2350            node,
2351            token: None,
2352            command: Some(Commands::Input {
2353                name: "test-session".into(),
2354                text: None,
2355            }),
2356            path: None,
2357        };
2358        let result = execute(&cli).await.unwrap();
2359        assert!(result.contains("Sent input to session test-session"));
2360    }
2361
2362    #[tokio::test]
2363    async fn test_execute_input_connection_refused() {
2364        let cli = Cli {
2365            node: "localhost:1".into(),
2366            token: None,
2367            command: Some(Commands::Input {
2368                name: "test".into(),
2369                text: Some("y".into()),
2370            }),
2371            path: None,
2372        };
2373        let result = execute(&cli).await;
2374        let err = result.unwrap_err().to_string();
2375        assert!(err.contains("Could not connect to pulpod"));
2376    }
2377
2378    #[tokio::test]
2379    async fn test_execute_input_error_response() {
2380        use axum::{Router, http::StatusCode, routing::post};
2381
2382        let app = Router::new().route(
2383            "/api/v1/sessions/{id}/input",
2384            post(|| async {
2385                (
2386                    StatusCode::NOT_FOUND,
2387                    "{\"error\":\"session not found: ghost\"}",
2388                )
2389            }),
2390        );
2391        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2392        let addr = listener.local_addr().unwrap();
2393        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2394        let node = format!("127.0.0.1:{}", addr.port());
2395
2396        let cli = Cli {
2397            node,
2398            token: None,
2399            command: Some(Commands::Input {
2400                name: "ghost".into(),
2401                text: Some("y".into()),
2402            }),
2403            path: None,
2404        };
2405        let err = execute(&cli).await.unwrap_err();
2406        assert_eq!(err.to_string(), "session not found: ghost");
2407    }
2408
2409    #[tokio::test]
2410    async fn test_execute_ui() {
2411        let cli = Cli {
2412            node: "localhost:7433".into(),
2413            token: None,
2414            command: Some(Commands::Ui),
2415            path: None,
2416        };
2417        let result = execute(&cli).await.unwrap();
2418        assert!(result.contains("Opening"));
2419        assert!(result.contains("http://localhost:7433"));
2420    }
2421
2422    #[tokio::test]
2423    async fn test_execute_ui_custom_node() {
2424        let cli = Cli {
2425            node: "mac-mini:7433".into(),
2426            token: None,
2427            command: Some(Commands::Ui),
2428            path: None,
2429        };
2430        let result = execute(&cli).await.unwrap();
2431        assert!(result.contains("http://mac-mini:7433"));
2432    }
2433
2434    #[test]
2435    fn test_format_sessions_empty() {
2436        assert_eq!(format_sessions(&[]), "No sessions.");
2437    }
2438
2439    #[test]
2440    fn test_format_sessions_with_data() {
2441        use chrono::Utc;
2442        use pulpo_common::session::SessionStatus;
2443        use uuid::Uuid;
2444
2445        let sessions = vec![Session {
2446            id: Uuid::nil(),
2447            name: "my-api".into(),
2448            workdir: "/tmp/repo".into(),
2449            command: "claude -p 'Fix the bug'".into(),
2450            description: Some("Fix the bug".into()),
2451            status: SessionStatus::Active,
2452            exit_code: None,
2453            backend_session_id: None,
2454            output_snapshot: None,
2455            metadata: None,
2456            ink: None,
2457            intervention_code: None,
2458            intervention_reason: None,
2459            intervention_at: None,
2460            last_output_at: None,
2461            idle_since: None,
2462            idle_threshold_secs: None,
2463            worktree_path: None,
2464            runtime: Runtime::Tmux,
2465            created_at: Utc::now(),
2466            updated_at: Utc::now(),
2467        }];
2468        let output = format_sessions(&sessions);
2469        assert!(output.contains("ID"));
2470        assert!(output.contains("NAME"));
2471        assert!(output.contains("RUNTIME"));
2472        assert!(output.contains("COMMAND"));
2473        assert!(output.contains("00000000"));
2474        assert!(output.contains("my-api"));
2475        assert!(output.contains("active"));
2476        assert!(output.contains("tmux"));
2477        assert!(output.contains("claude -p 'Fix the bug'"));
2478    }
2479
2480    #[test]
2481    fn test_format_sessions_docker_runtime() {
2482        use chrono::Utc;
2483        use pulpo_common::session::SessionStatus;
2484        use uuid::Uuid;
2485
2486        let sessions = vec![Session {
2487            id: Uuid::nil(),
2488            name: "sandbox-test".into(),
2489            workdir: "/tmp".into(),
2490            command: "claude".into(),
2491            description: None,
2492            status: SessionStatus::Active,
2493            exit_code: None,
2494            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2495            output_snapshot: None,
2496            metadata: None,
2497            ink: None,
2498            intervention_code: None,
2499            intervention_reason: None,
2500            intervention_at: None,
2501            last_output_at: None,
2502            idle_since: None,
2503            idle_threshold_secs: None,
2504            worktree_path: None,
2505            runtime: Runtime::Docker,
2506            created_at: Utc::now(),
2507            updated_at: Utc::now(),
2508        }];
2509        let output = format_sessions(&sessions);
2510        assert!(output.contains("docker"));
2511    }
2512
2513    #[test]
2514    fn test_format_sessions_long_command_truncated() {
2515        use chrono::Utc;
2516        use pulpo_common::session::SessionStatus;
2517        use uuid::Uuid;
2518
2519        let sessions = vec![Session {
2520            id: Uuid::nil(),
2521            name: "test".into(),
2522            workdir: "/tmp".into(),
2523            command:
2524                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2525                    .into(),
2526            description: None,
2527            status: SessionStatus::Ready,
2528            exit_code: None,
2529            backend_session_id: None,
2530            output_snapshot: None,
2531            metadata: None,
2532            ink: None,
2533            intervention_code: None,
2534            intervention_reason: None,
2535            intervention_at: None,
2536            last_output_at: None,
2537            idle_since: None,
2538            idle_threshold_secs: None,
2539            worktree_path: None,
2540            runtime: Runtime::Tmux,
2541            created_at: Utc::now(),
2542            updated_at: Utc::now(),
2543        }];
2544        let output = format_sessions(&sessions);
2545        assert!(output.contains("..."));
2546    }
2547
2548    #[test]
2549    fn test_format_sessions_worktree_indicator() {
2550        use chrono::Utc;
2551        use pulpo_common::session::SessionStatus;
2552        use uuid::Uuid;
2553
2554        let sessions = vec![Session {
2555            id: Uuid::nil(),
2556            name: "wt-task".into(),
2557            workdir: "/repo/.pulpo/worktrees/wt-task".into(),
2558            command: "claude".into(),
2559            description: None,
2560            status: SessionStatus::Active,
2561            exit_code: None,
2562            backend_session_id: None,
2563            output_snapshot: None,
2564            metadata: None,
2565            ink: None,
2566            intervention_code: None,
2567            intervention_reason: None,
2568            intervention_at: None,
2569            last_output_at: None,
2570            idle_since: None,
2571            idle_threshold_secs: None,
2572            worktree_path: Some("/repo/.pulpo/worktrees/wt-task".into()),
2573            runtime: Runtime::Tmux,
2574            created_at: Utc::now(),
2575            updated_at: Utc::now(),
2576        }];
2577        let output = format_sessions(&sessions);
2578        assert!(
2579            output.contains("[wt]"),
2580            "should show worktree indicator: {output}"
2581        );
2582        assert!(output.contains("wt-task [wt]"));
2583    }
2584
2585    #[test]
2586    fn test_format_sessions_pr_indicator() {
2587        use chrono::Utc;
2588        use pulpo_common::session::SessionStatus;
2589        use std::collections::HashMap;
2590        use uuid::Uuid;
2591
2592        let mut meta = HashMap::new();
2593        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2594        let sessions = vec![Session {
2595            id: Uuid::nil(),
2596            name: "pr-task".into(),
2597            workdir: "/tmp".into(),
2598            command: "claude".into(),
2599            description: None,
2600            status: SessionStatus::Active,
2601            exit_code: None,
2602            backend_session_id: None,
2603            output_snapshot: None,
2604            metadata: Some(meta),
2605            ink: None,
2606            intervention_code: None,
2607            intervention_reason: None,
2608            intervention_at: None,
2609            last_output_at: None,
2610            idle_since: None,
2611            idle_threshold_secs: None,
2612            worktree_path: None,
2613            runtime: Runtime::Tmux,
2614            created_at: Utc::now(),
2615            updated_at: Utc::now(),
2616        }];
2617        let output = format_sessions(&sessions);
2618        assert!(
2619            output.contains("[PR]"),
2620            "should show PR indicator: {output}"
2621        );
2622        assert!(output.contains("pr-task [PR]"));
2623    }
2624
2625    #[test]
2626    fn test_format_sessions_worktree_and_pr_indicator() {
2627        use chrono::Utc;
2628        use pulpo_common::session::SessionStatus;
2629        use std::collections::HashMap;
2630        use uuid::Uuid;
2631
2632        let mut meta = HashMap::new();
2633        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2634        let sessions = vec![Session {
2635            id: Uuid::nil(),
2636            name: "both-task".into(),
2637            workdir: "/tmp".into(),
2638            command: "claude".into(),
2639            description: None,
2640            status: SessionStatus::Active,
2641            exit_code: None,
2642            backend_session_id: None,
2643            output_snapshot: None,
2644            metadata: Some(meta),
2645            ink: None,
2646            intervention_code: None,
2647            intervention_reason: None,
2648            intervention_at: None,
2649            last_output_at: None,
2650            idle_since: None,
2651            idle_threshold_secs: None,
2652            worktree_path: Some("/repo/.pulpo/worktrees/both-task".into()),
2653            runtime: Runtime::Tmux,
2654            created_at: Utc::now(),
2655            updated_at: Utc::now(),
2656        }];
2657        let output = format_sessions(&sessions);
2658        assert!(
2659            output.contains("[wt] [PR]"),
2660            "should show both indicators: {output}"
2661        );
2662    }
2663
2664    #[test]
2665    fn test_format_sessions_no_pr_without_metadata() {
2666        use chrono::Utc;
2667        use pulpo_common::session::SessionStatus;
2668        use uuid::Uuid;
2669
2670        let sessions = vec![Session {
2671            id: Uuid::nil(),
2672            name: "no-pr".into(),
2673            workdir: "/tmp".into(),
2674            command: "claude".into(),
2675            description: None,
2676            status: SessionStatus::Active,
2677            exit_code: None,
2678            backend_session_id: None,
2679            output_snapshot: None,
2680            metadata: None,
2681            ink: None,
2682            intervention_code: None,
2683            intervention_reason: None,
2684            intervention_at: None,
2685            last_output_at: None,
2686            idle_since: None,
2687            idle_threshold_secs: None,
2688            worktree_path: None,
2689            runtime: Runtime::Tmux,
2690            created_at: Utc::now(),
2691            updated_at: Utc::now(),
2692        }];
2693        let output = format_sessions(&sessions);
2694        assert!(
2695            !output.contains("[PR]"),
2696            "should not show PR indicator: {output}"
2697        );
2698    }
2699
2700    #[test]
2701    fn test_format_nodes() {
2702        use pulpo_common::node::NodeInfo;
2703        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2704
2705        let resp = PeersResponse {
2706            local: NodeInfo {
2707                name: "mac-mini".into(),
2708                hostname: "h".into(),
2709                os: "macos".into(),
2710                arch: "arm64".into(),
2711                cpus: 8,
2712                memory_mb: 16384,
2713                gpu: None,
2714            },
2715            peers: vec![PeerInfo {
2716                name: "win-pc".into(),
2717                address: "win-pc:7433".into(),
2718                status: PeerStatus::Online,
2719                node_info: None,
2720                session_count: Some(3),
2721                source: PeerSource::Configured,
2722            }],
2723        };
2724        let output = format_nodes(&resp);
2725        assert!(output.contains("mac-mini"));
2726        assert!(output.contains("(local)"));
2727        assert!(output.contains("win-pc"));
2728        assert!(output.contains('3'));
2729    }
2730
2731    #[test]
2732    fn test_format_nodes_no_session_count() {
2733        use pulpo_common::node::NodeInfo;
2734        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2735
2736        let resp = PeersResponse {
2737            local: NodeInfo {
2738                name: "local".into(),
2739                hostname: "h".into(),
2740                os: "linux".into(),
2741                arch: "x86_64".into(),
2742                cpus: 4,
2743                memory_mb: 8192,
2744                gpu: None,
2745            },
2746            peers: vec![PeerInfo {
2747                name: "peer".into(),
2748                address: "peer:7433".into(),
2749                status: PeerStatus::Offline,
2750                node_info: None,
2751                session_count: None,
2752                source: PeerSource::Configured,
2753            }],
2754        };
2755        let output = format_nodes(&resp);
2756        assert!(output.contains("offline"));
2757        // No session count → shows "-"
2758        let lines: Vec<&str> = output.lines().collect();
2759        assert!(lines[2].contains('-'));
2760    }
2761
2762    #[tokio::test]
2763    async fn test_execute_resume_connection_refused() {
2764        let cli = Cli {
2765            node: "localhost:1".into(),
2766            token: None,
2767            command: Some(Commands::Resume {
2768                name: "test".into(),
2769            }),
2770            path: None,
2771        };
2772        let result = execute(&cli).await;
2773        let err = result.unwrap_err().to_string();
2774        assert!(err.contains("Could not connect to pulpod"));
2775    }
2776
2777    #[tokio::test]
2778    async fn test_execute_spawn_connection_refused() {
2779        let cli = Cli {
2780            node: "localhost:1".into(),
2781            token: None,
2782            command: Some(Commands::Spawn {
2783                name: Some("test".into()),
2784                workdir: Some("/tmp".into()),
2785                ink: None,
2786                description: None,
2787                detach: true,
2788                idle_threshold: None,
2789                auto: false,
2790                worktree: false,
2791                runtime: None,
2792                secret: vec![],
2793                command: vec!["test".into()],
2794            }),
2795            path: None,
2796        };
2797        let result = execute(&cli).await;
2798        let err = result.unwrap_err().to_string();
2799        assert!(err.contains("Could not connect to pulpod"));
2800    }
2801
2802    #[tokio::test]
2803    async fn test_execute_kill_connection_refused() {
2804        let cli = Cli {
2805            node: "localhost:1".into(),
2806            token: None,
2807            command: Some(Commands::Kill {
2808                name: "test".into(),
2809            }),
2810            path: None,
2811        };
2812        let result = execute(&cli).await;
2813        let err = result.unwrap_err().to_string();
2814        assert!(err.contains("Could not connect to pulpod"));
2815    }
2816
2817    #[tokio::test]
2818    async fn test_execute_delete_connection_refused() {
2819        let cli = Cli {
2820            node: "localhost:1".into(),
2821            token: None,
2822            command: Some(Commands::Delete {
2823                name: "test".into(),
2824            }),
2825            path: None,
2826        };
2827        let result = execute(&cli).await;
2828        let err = result.unwrap_err().to_string();
2829        assert!(err.contains("Could not connect to pulpod"));
2830    }
2831
2832    #[tokio::test]
2833    async fn test_execute_logs_connection_refused() {
2834        let cli = Cli {
2835            node: "localhost:1".into(),
2836            token: None,
2837            command: Some(Commands::Logs {
2838                name: "test".into(),
2839                lines: 50,
2840                follow: false,
2841            }),
2842            path: None,
2843        };
2844        let result = execute(&cli).await;
2845        let err = result.unwrap_err().to_string();
2846        assert!(err.contains("Could not connect to pulpod"));
2847    }
2848
2849    #[tokio::test]
2850    async fn test_friendly_error_connect() {
2851        // Make a request to a closed port to get a connect error
2852        let err = reqwest::Client::new()
2853            .get("http://127.0.0.1:1")
2854            .send()
2855            .await
2856            .unwrap_err();
2857        let friendly = friendly_error(&err, "test-node:1");
2858        let msg = friendly.to_string();
2859        assert!(
2860            msg.contains("Could not connect"),
2861            "Expected connect message, got: {msg}"
2862        );
2863    }
2864
2865    #[tokio::test]
2866    async fn test_friendly_error_other() {
2867        // A request to an invalid URL creates a builder error, not a connect error
2868        let err = reqwest::Client::new()
2869            .get("http://[::invalid::url")
2870            .send()
2871            .await
2872            .unwrap_err();
2873        let friendly = friendly_error(&err, "bad-host");
2874        let msg = friendly.to_string();
2875        assert!(
2876            msg.contains("Network error"),
2877            "Expected network error message, got: {msg}"
2878        );
2879        assert!(msg.contains("bad-host"));
2880    }
2881
2882    // -- Auth helper tests --
2883
2884    #[test]
2885    fn test_is_localhost_variants() {
2886        assert!(is_localhost("localhost:7433"));
2887        assert!(is_localhost("127.0.0.1:7433"));
2888        assert!(is_localhost("[::1]:7433"));
2889        assert!(is_localhost("::1"));
2890        assert!(is_localhost("localhost"));
2891        assert!(!is_localhost("mac-mini:7433"));
2892        assert!(!is_localhost("192.168.1.100:7433"));
2893    }
2894
2895    #[test]
2896    fn test_authed_get_with_token() {
2897        let client = reqwest::Client::new();
2898        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2899            .build()
2900            .unwrap();
2901        let auth = req
2902            .headers()
2903            .get("authorization")
2904            .unwrap()
2905            .to_str()
2906            .unwrap();
2907        assert_eq!(auth, "Bearer tok");
2908    }
2909
2910    #[test]
2911    fn test_authed_get_without_token() {
2912        let client = reqwest::Client::new();
2913        let req = authed_get(&client, "http://h:1/api".into(), None)
2914            .build()
2915            .unwrap();
2916        assert!(req.headers().get("authorization").is_none());
2917    }
2918
2919    #[test]
2920    fn test_authed_post_with_token() {
2921        let client = reqwest::Client::new();
2922        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2923            .build()
2924            .unwrap();
2925        let auth = req
2926            .headers()
2927            .get("authorization")
2928            .unwrap()
2929            .to_str()
2930            .unwrap();
2931        assert_eq!(auth, "Bearer secret");
2932    }
2933
2934    #[test]
2935    fn test_authed_post_without_token() {
2936        let client = reqwest::Client::new();
2937        let req = authed_post(&client, "http://h:1/api".into(), None)
2938            .build()
2939            .unwrap();
2940        assert!(req.headers().get("authorization").is_none());
2941    }
2942
2943    #[test]
2944    fn test_authed_delete_with_token() {
2945        let client = reqwest::Client::new();
2946        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2947            .build()
2948            .unwrap();
2949        let auth = req
2950            .headers()
2951            .get("authorization")
2952            .unwrap()
2953            .to_str()
2954            .unwrap();
2955        assert_eq!(auth, "Bearer del-tok");
2956    }
2957
2958    #[test]
2959    fn test_authed_delete_without_token() {
2960        let client = reqwest::Client::new();
2961        let req = authed_delete(&client, "http://h:1/api".into(), None)
2962            .build()
2963            .unwrap();
2964        assert!(req.headers().get("authorization").is_none());
2965    }
2966
2967    #[tokio::test]
2968    async fn test_resolve_token_explicit() {
2969        let client = reqwest::Client::new();
2970        let token =
2971            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2972        assert_eq!(token, Some("my-tok".into()));
2973    }
2974
2975    #[tokio::test]
2976    async fn test_resolve_token_remote_no_explicit() {
2977        let client = reqwest::Client::new();
2978        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2979        assert_eq!(token, None);
2980    }
2981
2982    #[tokio::test]
2983    async fn test_resolve_token_localhost_auto_discover() {
2984        use axum::{Json, Router, routing::get};
2985
2986        let app = Router::new().route(
2987            "/api/v1/auth/token",
2988            get(|| async {
2989                Json(AuthTokenResponse {
2990                    token: "discovered".into(),
2991                })
2992            }),
2993        );
2994        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2995        let addr = listener.local_addr().unwrap();
2996        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2997
2998        let node = format!("localhost:{}", addr.port());
2999        let base = base_url(&node);
3000        let client = reqwest::Client::new();
3001        let token = resolve_token(&client, &base, &node, None).await;
3002        assert_eq!(token, Some("discovered".into()));
3003    }
3004
3005    #[tokio::test]
3006    async fn test_discover_token_empty_returns_none() {
3007        use axum::{Json, Router, routing::get};
3008
3009        let app = Router::new().route(
3010            "/api/v1/auth/token",
3011            get(|| async {
3012                Json(AuthTokenResponse {
3013                    token: String::new(),
3014                })
3015            }),
3016        );
3017        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3018        let addr = listener.local_addr().unwrap();
3019        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3020
3021        let base = format!("http://127.0.0.1:{}", addr.port());
3022        let client = reqwest::Client::new();
3023        assert_eq!(discover_token(&client, &base).await, None);
3024    }
3025
3026    #[tokio::test]
3027    async fn test_discover_token_unreachable_returns_none() {
3028        let client = reqwest::Client::new();
3029        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3030    }
3031
3032    #[test]
3033    fn test_cli_parse_with_token() {
3034        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3035        assert_eq!(cli.token, Some("my-secret".into()));
3036    }
3037
3038    #[test]
3039    fn test_cli_parse_without_token() {
3040        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3041        assert_eq!(cli.token, None);
3042    }
3043
3044    #[tokio::test]
3045    async fn test_execute_with_explicit_token_sends_header() {
3046        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3047
3048        let app = Router::new().route(
3049            "/api/v1/sessions",
3050            get(|req: Request| async move {
3051                let auth = req
3052                    .headers()
3053                    .get("authorization")
3054                    .and_then(|v| v.to_str().ok())
3055                    .unwrap_or("");
3056                assert_eq!(auth, "Bearer test-token");
3057                (StatusCode::OK, "[]".to_owned())
3058            }),
3059        );
3060        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3061        let addr = listener.local_addr().unwrap();
3062        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3063        let node = format!("127.0.0.1:{}", addr.port());
3064
3065        let cli = Cli {
3066            node,
3067            token: Some("test-token".into()),
3068            command: Some(Commands::List { all: false }),
3069            path: None,
3070        };
3071        let result = execute(&cli).await.unwrap();
3072        assert_eq!(result, "No sessions.");
3073    }
3074
3075    // -- Interventions tests --
3076
3077    #[test]
3078    fn test_cli_parse_interventions() {
3079        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3080        assert!(matches!(
3081            &cli.command,
3082            Some(Commands::Interventions { name }) if name == "my-session"
3083        ));
3084    }
3085
3086    #[test]
3087    fn test_format_interventions_empty() {
3088        assert_eq!(format_interventions(&[]), "No intervention events.");
3089    }
3090
3091    #[test]
3092    fn test_format_interventions_with_data() {
3093        let events = vec![
3094            InterventionEventResponse {
3095                id: 1,
3096                session_id: "sess-1".into(),
3097                code: None,
3098                reason: "Memory exceeded threshold".into(),
3099                created_at: "2026-01-01T00:00:00Z".into(),
3100            },
3101            InterventionEventResponse {
3102                id: 2,
3103                session_id: "sess-1".into(),
3104                code: None,
3105                reason: "Idle for 10 minutes".into(),
3106                created_at: "2026-01-02T00:00:00Z".into(),
3107            },
3108        ];
3109        let output = format_interventions(&events);
3110        assert!(output.contains("ID"));
3111        assert!(output.contains("TIMESTAMP"));
3112        assert!(output.contains("REASON"));
3113        assert!(output.contains("Memory exceeded threshold"));
3114        assert!(output.contains("Idle for 10 minutes"));
3115        assert!(output.contains("2026-01-01T00:00:00Z"));
3116    }
3117
3118    #[tokio::test]
3119    async fn test_execute_interventions_empty() {
3120        let node = start_test_server().await;
3121        let cli = Cli {
3122            node,
3123            token: None,
3124            command: Some(Commands::Interventions {
3125                name: "my-session".into(),
3126            }),
3127            path: None,
3128        };
3129        let result = execute(&cli).await.unwrap();
3130        assert_eq!(result, "No intervention events.");
3131    }
3132
3133    #[tokio::test]
3134    async fn test_execute_interventions_with_data() {
3135        use axum::{Router, routing::get};
3136
3137        let app = Router::new().route(
3138            "/api/v1/sessions/{id}/interventions",
3139            get(|| async {
3140                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3141                    .to_owned()
3142            }),
3143        );
3144        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3145        let addr = listener.local_addr().unwrap();
3146        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3147        let node = format!("127.0.0.1:{}", addr.port());
3148
3149        let cli = Cli {
3150            node,
3151            token: None,
3152            command: Some(Commands::Interventions {
3153                name: "test".into(),
3154            }),
3155            path: None,
3156        };
3157        let result = execute(&cli).await.unwrap();
3158        assert!(result.contains("OOM"));
3159        assert!(result.contains("2026-01-01T00:00:00Z"));
3160    }
3161
3162    #[tokio::test]
3163    async fn test_execute_interventions_connection_refused() {
3164        let cli = Cli {
3165            node: "localhost:1".into(),
3166            token: None,
3167            command: Some(Commands::Interventions {
3168                name: "test".into(),
3169            }),
3170            path: None,
3171        };
3172        let result = execute(&cli).await;
3173        let err = result.unwrap_err().to_string();
3174        assert!(err.contains("Could not connect to pulpod"));
3175    }
3176
3177    // -- Attach command tests --
3178
3179    #[test]
3180    fn test_build_attach_command_tmux() {
3181        let cmd = build_attach_command("my-session");
3182        assert_eq!(cmd.get_program(), "tmux");
3183        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3184        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3185    }
3186
3187    #[test]
3188    fn test_build_attach_command_docker() {
3189        let cmd = build_attach_command("docker:pulpo-my-task");
3190        assert_eq!(cmd.get_program(), "docker");
3191        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3192        assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3193    }
3194
3195    #[test]
3196    fn test_cli_parse_attach() {
3197        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3198        assert!(matches!(
3199            &cli.command,
3200            Some(Commands::Attach { name }) if name == "my-session"
3201        ));
3202    }
3203
3204    #[test]
3205    fn test_cli_parse_attach_alias() {
3206        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3207        assert!(matches!(
3208            &cli.command,
3209            Some(Commands::Attach { name }) if name == "my-session"
3210        ));
3211    }
3212
3213    #[tokio::test]
3214    async fn test_execute_attach_success() {
3215        let node = start_test_server().await;
3216        let cli = Cli {
3217            node,
3218            token: None,
3219            command: Some(Commands::Attach {
3220                name: "test-session".into(),
3221            }),
3222            path: None,
3223        };
3224        let result = execute(&cli).await.unwrap();
3225        assert!(result.contains("Detached from session test-session"));
3226    }
3227
3228    #[tokio::test]
3229    async fn test_execute_attach_with_backend_session_id() {
3230        use axum::{Router, routing::get};
3231        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3232        let app = Router::new().route(
3233            "/api/v1/sessions/{id}",
3234            get(move || async move { session_json.to_owned() }),
3235        );
3236        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3237        let addr = listener.local_addr().unwrap();
3238        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3239
3240        let cli = Cli {
3241            node: format!("127.0.0.1:{}", addr.port()),
3242            token: None,
3243            command: Some(Commands::Attach {
3244                name: "my-session".into(),
3245            }),
3246            path: None,
3247        };
3248        let result = execute(&cli).await.unwrap();
3249        assert!(result.contains("Detached from session my-session"));
3250    }
3251
3252    #[tokio::test]
3253    async fn test_execute_attach_connection_refused() {
3254        let cli = Cli {
3255            node: "localhost:1".into(),
3256            token: None,
3257            command: Some(Commands::Attach {
3258                name: "test-session".into(),
3259            }),
3260            path: None,
3261        };
3262        let result = execute(&cli).await;
3263        let err = result.unwrap_err().to_string();
3264        assert!(err.contains("Could not connect to pulpod"));
3265    }
3266
3267    #[tokio::test]
3268    async fn test_execute_attach_error_response() {
3269        use axum::{Router, http::StatusCode, routing::get};
3270        let app = Router::new().route(
3271            "/api/v1/sessions/{id}",
3272            get(|| async {
3273                (
3274                    StatusCode::NOT_FOUND,
3275                    r#"{"error":"session not found"}"#.to_owned(),
3276                )
3277            }),
3278        );
3279        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3280        let addr = listener.local_addr().unwrap();
3281        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3282
3283        let cli = Cli {
3284            node: format!("127.0.0.1:{}", addr.port()),
3285            token: None,
3286            command: Some(Commands::Attach {
3287                name: "nonexistent".into(),
3288            }),
3289            path: None,
3290        };
3291        let result = execute(&cli).await;
3292        let err = result.unwrap_err().to_string();
3293        assert!(err.contains("session not found"));
3294    }
3295
3296    #[tokio::test]
3297    async fn test_execute_attach_stale_session() {
3298        use axum::{Router, routing::get};
3299        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3300        let app = Router::new().route(
3301            "/api/v1/sessions/{id}",
3302            get(move || async move { session_json.to_owned() }),
3303        );
3304        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3305        let addr = listener.local_addr().unwrap();
3306        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3307
3308        let cli = Cli {
3309            node: format!("127.0.0.1:{}", addr.port()),
3310            token: None,
3311            command: Some(Commands::Attach {
3312                name: "stale-sess".into(),
3313            }),
3314            path: None,
3315        };
3316        let result = execute(&cli).await;
3317        let err = result.unwrap_err().to_string();
3318        assert!(err.contains("lost"));
3319        assert!(err.contains("pulpo resume"));
3320    }
3321
3322    #[tokio::test]
3323    async fn test_execute_attach_dead_session() {
3324        use axum::{Router, routing::get};
3325        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3326        let app = Router::new().route(
3327            "/api/v1/sessions/{id}",
3328            get(move || async move { session_json.to_owned() }),
3329        );
3330        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3331        let addr = listener.local_addr().unwrap();
3332        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3333
3334        let cli = Cli {
3335            node: format!("127.0.0.1:{}", addr.port()),
3336            token: None,
3337            command: Some(Commands::Attach {
3338                name: "dead-sess".into(),
3339            }),
3340            path: None,
3341        };
3342        let result = execute(&cli).await;
3343        let err = result.unwrap_err().to_string();
3344        assert!(err.contains("killed"));
3345        assert!(err.contains("cannot attach"));
3346    }
3347
3348    // -- Alias parse tests --
3349
3350    #[test]
3351    fn test_cli_parse_alias_spawn() {
3352        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3353        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3354    }
3355
3356    #[test]
3357    fn test_cli_parse_alias_list() {
3358        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3359        assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3360    }
3361
3362    #[test]
3363    fn test_cli_parse_list_all() {
3364        let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3365        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3366
3367        let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3368        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3369    }
3370
3371    #[test]
3372    fn test_cli_parse_alias_logs() {
3373        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3374        assert!(matches!(
3375            &cli.command,
3376            Some(Commands::Logs { name, .. }) if name == "my-session"
3377        ));
3378    }
3379
3380    #[test]
3381    fn test_cli_parse_alias_kill() {
3382        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3383        assert!(matches!(
3384            &cli.command,
3385            Some(Commands::Kill { name }) if name == "my-session"
3386        ));
3387    }
3388
3389    #[test]
3390    fn test_cli_parse_alias_delete() {
3391        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
3392        assert!(matches!(
3393            &cli.command,
3394            Some(Commands::Delete { name }) if name == "my-session"
3395        ));
3396    }
3397
3398    #[test]
3399    fn test_cli_parse_alias_resume() {
3400        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3401        assert!(matches!(
3402            &cli.command,
3403            Some(Commands::Resume { name }) if name == "my-session"
3404        ));
3405    }
3406
3407    #[test]
3408    fn test_cli_parse_alias_nodes() {
3409        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3410        assert!(matches!(&cli.command, Some(Commands::Nodes)));
3411    }
3412
3413    #[test]
3414    fn test_cli_parse_alias_interventions() {
3415        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3416        assert!(matches!(
3417            &cli.command,
3418            Some(Commands::Interventions { name }) if name == "my-session"
3419        ));
3420    }
3421
3422    #[test]
3423    fn test_api_error_json() {
3424        let err = api_error("{\"error\":\"session not found: foo\"}");
3425        assert_eq!(err.to_string(), "session not found: foo");
3426    }
3427
3428    #[test]
3429    fn test_api_error_plain_text() {
3430        let err = api_error("plain text error");
3431        assert_eq!(err.to_string(), "plain text error");
3432    }
3433
3434    // -- diff_output tests --
3435
3436    #[test]
3437    fn test_diff_output_empty_prev() {
3438        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3439    }
3440
3441    #[test]
3442    fn test_diff_output_identical() {
3443        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3444    }
3445
3446    #[test]
3447    fn test_diff_output_new_lines_appended() {
3448        let prev = "line1\nline2";
3449        let new = "line1\nline2\nline3\nline4";
3450        assert_eq!(diff_output(prev, new), "line3\nline4");
3451    }
3452
3453    #[test]
3454    fn test_diff_output_scrolled_window() {
3455        // Window of 3 lines: old lines scroll off top, new appear at bottom
3456        let prev = "line1\nline2\nline3";
3457        let new = "line2\nline3\nline4";
3458        assert_eq!(diff_output(prev, new), "line4");
3459    }
3460
3461    #[test]
3462    fn test_diff_output_completely_different() {
3463        let prev = "aaa\nbbb";
3464        let new = "xxx\nyyy";
3465        assert_eq!(diff_output(prev, new), "xxx\nyyy");
3466    }
3467
3468    #[test]
3469    fn test_diff_output_last_line_matches_but_overlap_fails() {
3470        // Last line of prev appears in new but preceding lines don't match
3471        let prev = "aaa\ncommon";
3472        let new = "zzz\ncommon\nnew_line";
3473        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
3474        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
3475        // Falls through, no verified overlap, so returns everything
3476        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3477    }
3478
3479    #[test]
3480    fn test_diff_output_new_empty() {
3481        assert_eq!(diff_output("line1", ""), "");
3482    }
3483
3484    // -- follow_logs tests --
3485
3486    /// Start a test server that simulates evolving output and session status transitions.
3487    /// Start a test server that simulates evolving output with agent exit marker.
3488    async fn start_follow_test_server() -> String {
3489        use axum::{Router, extract::Path, extract::Query, routing::get};
3490        use std::sync::Arc;
3491        use std::sync::atomic::{AtomicUsize, Ordering};
3492
3493        let call_count = Arc::new(AtomicUsize::new(0));
3494        let output_count = call_count.clone();
3495
3496        let app = Router::new()
3497            .route(
3498                "/api/v1/sessions/{id}/output",
3499                get(
3500                    move |_path: Path<String>,
3501                          _query: Query<std::collections::HashMap<String, String>>| {
3502                        let count = output_count.clone();
3503                        async move {
3504                            let n = count.fetch_add(1, Ordering::SeqCst);
3505                            let output = match n {
3506                                0 => "line1\nline2".to_owned(),
3507                                1 => "line1\nline2\nline3".to_owned(),
3508                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3509                            };
3510                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3511                        }
3512                    },
3513                ),
3514            )
3515            .route(
3516                "/api/v1/sessions/{id}",
3517                get(|_path: Path<String>| async {
3518                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3519                }),
3520            );
3521
3522        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3523        let addr = listener.local_addr().unwrap();
3524        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3525        format!("http://127.0.0.1:{}", addr.port())
3526    }
3527
3528    #[tokio::test]
3529    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3530        let base = start_follow_test_server().await;
3531        let client = reqwest::Client::new();
3532        let mut buf = Vec::new();
3533
3534        follow_logs(&client, &base, "test", 100, None, &mut buf)
3535            .await
3536            .unwrap();
3537
3538        let output = String::from_utf8(buf).unwrap();
3539        // Should contain initial output + new lines + agent exit marker
3540        assert!(output.contains("line1"));
3541        assert!(output.contains("line2"));
3542        assert!(output.contains("line3"));
3543        assert!(output.contains("line4"));
3544        assert!(output.contains("[pulpo] Agent exited"));
3545    }
3546
3547    #[tokio::test]
3548    async fn test_execute_logs_follow_success() {
3549        let base = start_follow_test_server().await;
3550        // Extract host:port from http://127.0.0.1:PORT
3551        let node = base.strip_prefix("http://").unwrap().to_owned();
3552
3553        let cli = Cli {
3554            node,
3555            token: None,
3556            command: Some(Commands::Logs {
3557                name: "test".into(),
3558                lines: 100,
3559                follow: true,
3560            }),
3561            path: None,
3562        };
3563        // execute() with follow writes to stdout and returns empty string
3564        let result = execute(&cli).await.unwrap();
3565        assert_eq!(result, "");
3566    }
3567
3568    #[tokio::test]
3569    async fn test_execute_logs_follow_connection_refused() {
3570        let cli = Cli {
3571            node: "localhost:1".into(),
3572            token: None,
3573            command: Some(Commands::Logs {
3574                name: "test".into(),
3575                lines: 50,
3576                follow: true,
3577            }),
3578            path: None,
3579        };
3580        let result = execute(&cli).await;
3581        let err = result.unwrap_err().to_string();
3582        assert!(
3583            err.contains("Could not connect to pulpod"),
3584            "Expected friendly error, got: {err}"
3585        );
3586    }
3587
3588    #[tokio::test]
3589    async fn test_follow_logs_exits_on_dead() {
3590        use axum::{Router, extract::Path, extract::Query, routing::get};
3591
3592        let app = Router::new()
3593            .route(
3594                "/api/v1/sessions/{id}/output",
3595                get(
3596                    |_path: Path<String>,
3597                     _query: Query<std::collections::HashMap<String, String>>| async {
3598                        r#"{"output":"some output"}"#.to_owned()
3599                    },
3600                ),
3601            )
3602            .route(
3603                "/api/v1/sessions/{id}",
3604                get(|_path: Path<String>| async {
3605                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3606                }),
3607            );
3608
3609        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3610        let addr = listener.local_addr().unwrap();
3611        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3612        let base = format!("http://127.0.0.1:{}", addr.port());
3613
3614        let client = reqwest::Client::new();
3615        let mut buf = Vec::new();
3616        follow_logs(&client, &base, "test", 100, None, &mut buf)
3617            .await
3618            .unwrap();
3619
3620        let output = String::from_utf8(buf).unwrap();
3621        assert!(output.contains("some output"));
3622    }
3623
3624    #[tokio::test]
3625    async fn test_follow_logs_exits_on_stale() {
3626        use axum::{Router, extract::Path, extract::Query, routing::get};
3627
3628        let app = Router::new()
3629            .route(
3630                "/api/v1/sessions/{id}/output",
3631                get(
3632                    |_path: Path<String>,
3633                     _query: Query<std::collections::HashMap<String, String>>| async {
3634                        r#"{"output":"stale output"}"#.to_owned()
3635                    },
3636                ),
3637            )
3638            .route(
3639                "/api/v1/sessions/{id}",
3640                get(|_path: Path<String>| async {
3641                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3642                }),
3643            );
3644
3645        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3646        let addr = listener.local_addr().unwrap();
3647        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3648        let base = format!("http://127.0.0.1:{}", addr.port());
3649
3650        let client = reqwest::Client::new();
3651        let mut buf = Vec::new();
3652        follow_logs(&client, &base, "test", 100, None, &mut buf)
3653            .await
3654            .unwrap();
3655
3656        let output = String::from_utf8(buf).unwrap();
3657        assert!(output.contains("stale output"));
3658    }
3659
3660    #[tokio::test]
3661    async fn test_execute_logs_follow_non_reqwest_error() {
3662        use axum::{Router, extract::Path, extract::Query, routing::get};
3663
3664        // Session status endpoint returns invalid JSON to trigger a serde error
3665        let app = Router::new()
3666            .route(
3667                "/api/v1/sessions/{id}/output",
3668                get(
3669                    |_path: Path<String>,
3670                     _query: Query<std::collections::HashMap<String, String>>| async {
3671                        r#"{"output":"initial"}"#.to_owned()
3672                    },
3673                ),
3674            )
3675            .route(
3676                "/api/v1/sessions/{id}",
3677                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3678            );
3679
3680        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3681        let addr = listener.local_addr().unwrap();
3682        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3683        let node = format!("127.0.0.1:{}", addr.port());
3684
3685        let cli = Cli {
3686            node,
3687            token: None,
3688            command: Some(Commands::Logs {
3689                name: "test".into(),
3690                lines: 100,
3691                follow: true,
3692            }),
3693            path: None,
3694        };
3695        let err = execute(&cli).await.unwrap_err();
3696        // serde_json error, not a reqwest error — hits the Err(other) branch
3697        let msg = err.to_string();
3698        assert!(
3699            msg.contains("expected ident"),
3700            "Expected serde parse error, got: {msg}"
3701        );
3702    }
3703
3704    #[tokio::test]
3705    async fn test_fetch_session_status_connection_error() {
3706        let client = reqwest::Client::new();
3707        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3708        assert!(result.is_err());
3709    }
3710
3711    // -- Schedule tests --
3712
3713    #[test]
3714    fn test_format_schedules_empty() {
3715        assert_eq!(format_schedules(&[]), "No schedules.");
3716    }
3717
3718    #[test]
3719    fn test_format_schedules_with_entries() {
3720        let schedules = vec![serde_json::json!({
3721            "name": "nightly",
3722            "cron": "0 3 * * *",
3723            "enabled": true,
3724            "last_run_at": null,
3725            "target_node": null
3726        })];
3727        let output = format_schedules(&schedules);
3728        assert!(output.contains("nightly"));
3729        assert!(output.contains("0 3 * * *"));
3730        assert!(output.contains("local"));
3731        assert!(output.contains("yes"));
3732        assert!(output.contains('-'));
3733    }
3734
3735    #[test]
3736    fn test_format_schedules_disabled_entry() {
3737        let schedules = vec![serde_json::json!({
3738            "name": "weekly",
3739            "cron": "0 0 * * 0",
3740            "enabled": false,
3741            "last_run_at": "2026-03-18T03:00:00Z",
3742            "target_node": "gpu-box"
3743        })];
3744        let output = format_schedules(&schedules);
3745        assert!(output.contains("weekly"));
3746        assert!(output.contains("no"));
3747        assert!(output.contains("gpu-box"));
3748        assert!(output.contains("2026-03-18T03:00"));
3749    }
3750
3751    #[test]
3752    fn test_format_schedules_header() {
3753        let schedules = vec![serde_json::json!({
3754            "name": "test",
3755            "cron": "* * * * *",
3756            "enabled": true,
3757            "last_run_at": null,
3758            "target_node": null
3759        })];
3760        let output = format_schedules(&schedules);
3761        assert!(output.contains("NAME"));
3762        assert!(output.contains("CRON"));
3763        assert!(output.contains("ENABLED"));
3764        assert!(output.contains("LAST RUN"));
3765        assert!(output.contains("NODE"));
3766    }
3767
3768    // -- Schedule CLI parse tests --
3769
3770    #[test]
3771    fn test_cli_parse_schedule_add() {
3772        let cli = Cli::try_parse_from([
3773            "pulpo",
3774            "schedule",
3775            "add",
3776            "nightly",
3777            "0 3 * * *",
3778            "--workdir",
3779            "/repo",
3780            "--",
3781            "claude",
3782            "-p",
3783            "review",
3784        ])
3785        .unwrap();
3786        assert!(matches!(
3787            &cli.command,
3788            Some(Commands::Schedule {
3789                action: ScheduleAction::Add { name, cron, .. }
3790            }) if name == "nightly" && cron == "0 3 * * *"
3791        ));
3792    }
3793
3794    #[test]
3795    fn test_cli_parse_schedule_add_with_node() {
3796        let cli = Cli::try_parse_from([
3797            "pulpo",
3798            "schedule",
3799            "add",
3800            "nightly",
3801            "0 3 * * *",
3802            "--workdir",
3803            "/repo",
3804            "--node",
3805            "gpu-box",
3806            "--",
3807            "claude",
3808        ])
3809        .unwrap();
3810        assert!(matches!(
3811            &cli.command,
3812            Some(Commands::Schedule {
3813                action: ScheduleAction::Add { node, .. }
3814            }) if node.as_deref() == Some("gpu-box")
3815        ));
3816    }
3817
3818    #[test]
3819    fn test_cli_parse_schedule_add_install_alias() {
3820        let cli =
3821            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3822        assert!(matches!(
3823            &cli.command,
3824            Some(Commands::Schedule {
3825                action: ScheduleAction::Add { name, .. }
3826            }) if name == "nightly"
3827        ));
3828    }
3829
3830    #[test]
3831    fn test_cli_parse_schedule_list() {
3832        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3833        assert!(matches!(
3834            &cli.command,
3835            Some(Commands::Schedule {
3836                action: ScheduleAction::List
3837            })
3838        ));
3839    }
3840
3841    #[test]
3842    fn test_cli_parse_schedule_remove() {
3843        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3844        assert!(matches!(
3845            &cli.command,
3846            Some(Commands::Schedule {
3847                action: ScheduleAction::Remove { name }
3848            }) if name == "nightly"
3849        ));
3850    }
3851
3852    #[test]
3853    fn test_cli_parse_schedule_pause() {
3854        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3855        assert!(matches!(
3856            &cli.command,
3857            Some(Commands::Schedule {
3858                action: ScheduleAction::Pause { name }
3859            }) if name == "nightly"
3860        ));
3861    }
3862
3863    #[test]
3864    fn test_cli_parse_schedule_resume() {
3865        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3866        assert!(matches!(
3867            &cli.command,
3868            Some(Commands::Schedule {
3869                action: ScheduleAction::Resume { name }
3870            }) if name == "nightly"
3871        ));
3872    }
3873
3874    #[test]
3875    fn test_cli_parse_schedule_alias() {
3876        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3877        assert!(matches!(
3878            &cli.command,
3879            Some(Commands::Schedule {
3880                action: ScheduleAction::List
3881            })
3882        ));
3883    }
3884
3885    #[test]
3886    fn test_cli_parse_schedule_list_alias() {
3887        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3888        assert!(matches!(
3889            &cli.command,
3890            Some(Commands::Schedule {
3891                action: ScheduleAction::List
3892            })
3893        ));
3894    }
3895
3896    #[test]
3897    fn test_cli_parse_schedule_remove_alias() {
3898        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3899        assert!(matches!(
3900            &cli.command,
3901            Some(Commands::Schedule {
3902                action: ScheduleAction::Remove { name }
3903            }) if name == "nightly"
3904        ));
3905    }
3906
3907    #[tokio::test]
3908    async fn test_execute_schedule_list_via_execute() {
3909        let node = start_test_server().await;
3910        let cli = Cli {
3911            node,
3912            token: None,
3913            command: Some(Commands::Schedule {
3914                action: ScheduleAction::List,
3915            }),
3916            path: None,
3917        };
3918        let result = execute(&cli).await.unwrap();
3919        // Under coverage, execute_schedule is a stub that returns empty string
3920        #[cfg(coverage)]
3921        assert!(result.is_empty());
3922        #[cfg(not(coverage))]
3923        assert_eq!(result, "No schedules.");
3924    }
3925
3926    #[test]
3927    fn test_schedule_action_debug() {
3928        let action = ScheduleAction::List;
3929        assert_eq!(format!("{action:?}"), "List");
3930    }
3931
3932    #[test]
3933    fn test_cli_parse_send_alias() {
3934        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3935        assert!(matches!(
3936            &cli.command,
3937            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3938        ));
3939    }
3940
3941    #[test]
3942    fn test_cli_parse_spawn_no_name() {
3943        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3944        assert!(matches!(
3945            &cli.command,
3946            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3947        ));
3948    }
3949
3950    #[test]
3951    fn test_cli_parse_spawn_optional_name_with_command() {
3952        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3953        assert!(matches!(
3954            &cli.command,
3955            Some(Commands::Spawn { name, command, .. })
3956                if name.is_none() && command == &["echo", "hello"]
3957        ));
3958    }
3959
3960    #[test]
3961    fn test_cli_parse_path_shortcut() {
3962        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3963        assert!(cli.command.is_none());
3964        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3965    }
3966
3967    #[test]
3968    fn test_cli_parse_no_args() {
3969        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3970        assert!(cli.command.is_none());
3971        assert!(cli.path.is_none());
3972    }
3973
3974    #[test]
3975    fn test_derive_session_name_simple() {
3976        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3977    }
3978
3979    #[test]
3980    fn test_derive_session_name_with_special_chars() {
3981        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
3982    }
3983
3984    #[test]
3985    fn test_derive_session_name_root() {
3986        assert_eq!(derive_session_name("/"), "session");
3987    }
3988
3989    #[test]
3990    fn test_derive_session_name_dots() {
3991        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
3992    }
3993
3994    #[test]
3995    fn test_resolve_path_absolute() {
3996        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
3997    }
3998
3999    #[test]
4000    fn test_resolve_path_relative() {
4001        let resolved = resolve_path("my-repo");
4002        assert!(resolved.ends_with("my-repo"));
4003        assert!(resolved.starts_with('/'));
4004    }
4005
4006    #[tokio::test]
4007    async fn test_execute_no_args_shows_help() {
4008        let node = start_test_server().await;
4009        let cli = Cli {
4010            node,
4011            token: None,
4012            path: None,
4013            command: None,
4014        };
4015        let result = execute(&cli).await.unwrap();
4016        assert!(
4017            result.is_empty(),
4018            "no-args should return empty string after printing help"
4019        );
4020    }
4021
4022    #[tokio::test]
4023    async fn test_execute_path_shortcut() {
4024        let node = start_test_server().await;
4025        let cli = Cli {
4026            node,
4027            token: None,
4028            path: Some("/tmp".into()),
4029            command: None,
4030        };
4031        let result = execute(&cli).await.unwrap();
4032        assert!(result.contains("Detached from session"));
4033    }
4034
4035    #[tokio::test]
4036    async fn test_deduplicate_session_name_no_conflict() {
4037        // Connection refused → falls through to "name not taken" path
4038        let base = "http://127.0.0.1:1";
4039        let client = reqwest::Client::new();
4040        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4041        assert_eq!(name, "fresh");
4042    }
4043
4044    #[tokio::test]
4045    async fn test_deduplicate_session_name_with_conflict() {
4046        use axum::{Router, routing::get};
4047        use std::sync::atomic::{AtomicU32, Ordering};
4048
4049        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4050        let counter = call_count.clone();
4051        let app = Router::new()
4052            .route(
4053                "/api/v1/sessions/{id}",
4054                get(move || {
4055                    let c = counter.clone();
4056                    async move {
4057                        let n = c.fetch_add(1, Ordering::SeqCst);
4058                        if n == 0 {
4059                            // First call (base name) → exists
4060                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4061                        } else {
4062                            // Suffixed name → not found
4063                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4064                        }
4065                    }
4066                }),
4067            )
4068            .route(
4069                "/api/v1/peers",
4070                get(|| async {
4071                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4072                }),
4073            );
4074        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4075        let addr = listener.local_addr().unwrap();
4076        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4077        let base = format!("http://127.0.0.1:{}", addr.port());
4078        let client = reqwest::Client::new();
4079        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4080        assert_eq!(name, "repo-2");
4081    }
4082
4083    // -- Node resolution tests --
4084
4085    #[test]
4086    fn test_node_needs_resolution() {
4087        assert!(!node_needs_resolution("localhost:7433"));
4088        assert!(!node_needs_resolution("mac-mini:7433"));
4089        assert!(!node_needs_resolution("10.0.0.1:7433"));
4090        assert!(!node_needs_resolution("[::1]:7433"));
4091        assert!(node_needs_resolution("mac-mini"));
4092        assert!(node_needs_resolution("linux-server"));
4093        assert!(node_needs_resolution("localhost"));
4094    }
4095
4096    #[tokio::test]
4097    async fn test_resolve_node_with_port() {
4098        let client = reqwest::Client::new();
4099        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4100        assert_eq!(addr, "mac-mini:7433");
4101        assert!(token.is_none());
4102    }
4103
4104    #[tokio::test]
4105    async fn test_resolve_node_fallback_appends_port() {
4106        // No local daemon running on localhost:7433, so peer lookup fails
4107        // and it falls back to appending :7433
4108        let client = reqwest::Client::new();
4109        let (addr, token) = resolve_node(&client, "unknown-host").await;
4110        assert_eq!(addr, "unknown-host:7433");
4111        assert!(token.is_none());
4112    }
4113
4114    #[cfg(not(coverage))]
4115    #[tokio::test]
4116    async fn test_resolve_node_finds_peer() {
4117        use axum::{Router, routing::get};
4118
4119        let app = Router::new()
4120            .route(
4121                "/api/v1/peers",
4122                get(|| async {
4123                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4124                }),
4125            )
4126            .route(
4127                "/api/v1/config",
4128                get(|| async {
4129                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4130                }),
4131            );
4132
4133        // Port 7433 may be in use; skip test if so
4134        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4135            return;
4136        };
4137        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4138
4139        let client = reqwest::Client::new();
4140        let (addr, token) = resolve_node(&client, "mac-mini").await;
4141        assert_eq!(addr, "10.0.0.5:7433");
4142        assert_eq!(token, Some("peer-secret".into()));
4143    }
4144
4145    #[cfg(not(coverage))]
4146    #[tokio::test]
4147    async fn test_resolve_node_peer_no_token() {
4148        use axum::{Router, routing::get};
4149
4150        let app = Router::new()
4151            .route(
4152                "/api/v1/peers",
4153                get(|| async {
4154                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4155                }),
4156            )
4157            .route(
4158                "/api/v1/config",
4159                get(|| async {
4160                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4161                }),
4162            );
4163
4164        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4165            return; // Port in use, skip
4166        };
4167        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4168
4169        let client = reqwest::Client::new();
4170        let (addr, token) = resolve_node(&client, "test-peer").await;
4171        assert_eq!(addr, "10.0.0.9:7433");
4172        assert!(token.is_none()); // Simple peer entry has no token
4173    }
4174
4175    #[tokio::test]
4176    async fn test_execute_with_peer_name_resolution() {
4177        // When node doesn't contain ':', resolve_node is called.
4178        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4179        // The connection to the fallback address will fail, giving us a connection error.
4180        let cli = Cli {
4181            node: "nonexistent-peer".into(),
4182            token: None,
4183            command: Some(Commands::List { all: false }),
4184            path: None,
4185        };
4186        let result = execute(&cli).await;
4187        // Should try to connect to nonexistent-peer:7433 and fail
4188        assert!(result.is_err());
4189    }
4190
4191    // -- Auto node selection tests --
4192
4193    #[test]
4194    fn test_cli_parse_spawn_auto() {
4195        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4196        assert!(matches!(
4197            &cli.command,
4198            Some(Commands::Spawn { auto, .. }) if *auto
4199        ));
4200    }
4201
4202    #[test]
4203    fn test_cli_parse_spawn_auto_default() {
4204        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4205        assert!(matches!(
4206            &cli.command,
4207            Some(Commands::Spawn { auto, .. }) if !auto
4208        ));
4209    }
4210
4211    #[tokio::test]
4212    async fn test_select_best_node_coverage_stub() {
4213        // Exercise the coverage stub (or real function in non-coverage builds)
4214        let client = reqwest::Client::new();
4215        // In coverage builds, the stub returns ("localhost:7433", "local")
4216        // In non-coverage builds, this fails because no server is running — that's OK
4217        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4218    }
4219
4220    #[cfg(not(coverage))]
4221    #[tokio::test]
4222    async fn test_select_best_node_picks_least_loaded() {
4223        use axum::{Router, routing::get};
4224
4225        let app = Router::new().route(
4226            "/api/v1/peers",
4227            get(|| async {
4228                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4229            }),
4230        );
4231        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4232        let addr = listener.local_addr().unwrap();
4233        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4234        let base = format!("http://127.0.0.1:{}", addr.port());
4235
4236        let client = reqwest::Client::new();
4237        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4238        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4239        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4240        // idle wins (lower score)
4241        assert_eq!(name, "idle");
4242        assert_eq!(addr, "idle:7433");
4243    }
4244
4245    #[cfg(not(coverage))]
4246    #[tokio::test]
4247    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4248        use axum::{Router, routing::get};
4249
4250        let app = Router::new().route(
4251            "/api/v1/peers",
4252            get(|| async {
4253                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4254            }),
4255        );
4256        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4257        let addr = listener.local_addr().unwrap();
4258        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4259        let base = format!("http://127.0.0.1:{}", addr.port());
4260
4261        let client = reqwest::Client::new();
4262        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4263        assert_eq!(name, "my-mac");
4264        assert_eq!(addr, "localhost:7433");
4265    }
4266
4267    #[cfg(not(coverage))]
4268    #[tokio::test]
4269    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4270        use axum::{Router, routing::get};
4271
4272        let app = Router::new().route(
4273            "/api/v1/peers",
4274            get(|| async {
4275                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4276            }),
4277        );
4278        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4279        let addr = listener.local_addr().unwrap();
4280        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4281        let base = format!("http://127.0.0.1:{}", addr.port());
4282
4283        let client = reqwest::Client::new();
4284        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4285        assert_eq!(name, "solo");
4286        assert_eq!(addr, "localhost:7433");
4287    }
4288
4289    #[cfg(not(coverage))]
4290    #[tokio::test]
4291    async fn test_execute_spawn_auto_selects_node() {
4292        use axum::{
4293            Router,
4294            http::StatusCode,
4295            routing::{get, post},
4296        };
4297
4298        let create_json = test_create_response_json();
4299
4300        // Bind early so we know the address to embed in the peers response
4301        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4302        let addr = listener.local_addr().unwrap();
4303        let node = format!("127.0.0.1:{}", addr.port());
4304        let peer_addr = node.clone();
4305
4306        let app = Router::new()
4307            .route(
4308                "/api/v1/peers",
4309                get(move || {
4310                    let peer_addr = peer_addr.clone();
4311                    async move {
4312                        format!(
4313                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4314                        )
4315                    }
4316                }),
4317            )
4318            .route(
4319                "/api/v1/sessions",
4320                post(move || async move {
4321                    (StatusCode::CREATED, create_json.clone())
4322                }),
4323            );
4324
4325        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4326
4327        let cli = Cli {
4328            node,
4329            token: None,
4330            command: Some(Commands::Spawn {
4331                name: Some("test".into()),
4332                workdir: Some("/tmp/repo".into()),
4333                ink: None,
4334                description: None,
4335                detach: true,
4336                idle_threshold: None,
4337                auto: true,
4338                worktree: false,
4339                runtime: None,
4340                secret: vec![],
4341                command: vec!["echo".into(), "hello".into()],
4342            }),
4343            path: None,
4344        };
4345        let result = execute(&cli).await.unwrap();
4346        assert!(result.contains("Created session"));
4347    }
4348
4349    #[cfg(not(coverage))]
4350    #[tokio::test]
4351    async fn test_select_best_node_peer_no_session_count() {
4352        use axum::{Router, routing::get};
4353
4354        let app = Router::new().route(
4355            "/api/v1/peers",
4356            get(|| async {
4357                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4358            }),
4359        );
4360        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4361        let addr = listener.local_addr().unwrap();
4362        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4363        let base = format!("http://127.0.0.1:{}", addr.port());
4364
4365        let client = reqwest::Client::new();
4366        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4367        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4368        assert_eq!(name, "fresh");
4369        assert_eq!(addr, "fresh:7433");
4370    }
4371
4372    // -- Secret CLI parse tests --
4373
4374    #[test]
4375    fn test_cli_parse_secret_set() {
4376        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4377        assert!(matches!(
4378            &cli.command,
4379            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4380                if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4381        ));
4382    }
4383
4384    #[test]
4385    fn test_cli_parse_secret_list() {
4386        let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4387        assert!(matches!(
4388            &cli.command,
4389            Some(Commands::Secret {
4390                action: SecretAction::List
4391            })
4392        ));
4393    }
4394
4395    #[test]
4396    fn test_cli_parse_secret_list_alias() {
4397        let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4398        assert!(matches!(
4399            &cli.command,
4400            Some(Commands::Secret {
4401                action: SecretAction::List
4402            })
4403        ));
4404    }
4405
4406    #[test]
4407    fn test_cli_parse_secret_delete() {
4408        let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4409        assert!(matches!(
4410            &cli.command,
4411            Some(Commands::Secret { action: SecretAction::Delete { name } })
4412                if name == "MY_TOKEN"
4413        ));
4414    }
4415
4416    #[test]
4417    fn test_cli_parse_secret_delete_alias() {
4418        let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4419        assert!(matches!(
4420            &cli.command,
4421            Some(Commands::Secret { action: SecretAction::Delete { name } })
4422                if name == "MY_TOKEN"
4423        ));
4424    }
4425
4426    #[test]
4427    fn test_cli_parse_secret_alias() {
4428        let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4429        assert!(matches!(
4430            &cli.command,
4431            Some(Commands::Secret {
4432                action: SecretAction::List
4433            })
4434        ));
4435    }
4436
4437    #[test]
4438    fn test_format_secrets_empty() {
4439        let secrets: Vec<serde_json::Value> = vec![];
4440        assert_eq!(format_secrets(&secrets), "No secrets configured.");
4441    }
4442
4443    #[test]
4444    fn test_format_secrets_with_entries() {
4445        let secrets = vec![
4446            serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4447            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4448        ];
4449        let output = format_secrets(&secrets);
4450        assert!(output.contains("GITHUB_TOKEN"));
4451        assert!(output.contains("NPM_TOKEN"));
4452        assert!(output.contains("NAME"));
4453        assert!(output.contains("ENV"));
4454        assert!(output.contains("CREATED"));
4455    }
4456
4457    #[test]
4458    fn test_format_secrets_with_env() {
4459        let secrets = vec![
4460            serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4461            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4462        ];
4463        let output = format_secrets(&secrets);
4464        assert!(output.contains("GH_WORK"));
4465        assert!(output.contains("GITHUB_TOKEN"));
4466        assert!(output.contains("NPM_TOKEN"));
4467    }
4468
4469    #[test]
4470    fn test_format_secrets_short_timestamp() {
4471        let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4472        let output = format_secrets(&secrets);
4473        assert!(output.contains("now"));
4474    }
4475
4476    #[test]
4477    fn test_format_schedules_short_last_run_at() {
4478        // Regression: last_run_at shorter than 16 chars must not panic
4479        let schedules = vec![serde_json::json!({
4480            "name": "test",
4481            "cron": "* * * * *",
4482            "enabled": true,
4483            "last_run_at": "short",
4484            "target_node": null
4485        })];
4486        let output = format_schedules(&schedules);
4487        assert!(output.contains("short"));
4488    }
4489
4490    #[test]
4491    fn test_format_sessions_multibyte_command_truncation() {
4492        use chrono::Utc;
4493        use pulpo_common::session::SessionStatus;
4494        use uuid::Uuid;
4495
4496        // Command with multi-byte chars exceeding 50 bytes; must not panic
4497        let sessions = vec![Session {
4498            id: Uuid::nil(),
4499            name: "test".into(),
4500            workdir: "/tmp".into(),
4501            command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4502            description: None,
4503            status: SessionStatus::Active,
4504            exit_code: None,
4505            backend_session_id: None,
4506            output_snapshot: None,
4507            metadata: None,
4508            ink: None,
4509            intervention_code: None,
4510            intervention_reason: None,
4511            intervention_at: None,
4512            last_output_at: None,
4513            idle_since: None,
4514            idle_threshold_secs: None,
4515            worktree_path: None,
4516            runtime: Runtime::Tmux,
4517            created_at: Utc::now(),
4518            updated_at: Utc::now(),
4519        }];
4520        let output = format_sessions(&sessions);
4521        assert!(output.contains("..."));
4522    }
4523}