Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Command-line entry point for the `pulpo` binary, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are picked up by clap as `--help` text,
// so they are user-facing strings rather than plain documentation. Maintainer notes are
// therefore written as `//` comments.
//
// The version string comes from the `PULPO_VERSION` environment variable, read at
// compile time by `env!`.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    /// Target node (default: localhost)
    // Either `host:port` or a bare name; bare names go through `resolve_node`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token (auto-discovered from local daemon if omitted)
    #[arg(long)]
    pub token: Option<String>,

    // `None` when the invocation carries no subcommand (e.g. the quick-spawn form below).
    #[command(subcommand)]
    pub command: Option<Commands>,

    /// Quick spawn: `pulpo <path>` spawns a session in that directory
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Top-level subcommands of the `pulpo` CLI.
//
// NOTE: every `///` comment in this enum is rendered by clap as help text, and the
// variant order is the order of the declaration. Each `visible_alias` is an alternative
// spelling of the subcommand that is also listed in `--help`.
//
// NOTE(review): `large_enum_variant` is silenced presumably because `Spawn` carries far
// more fields than the other variants — confirm before boxing anything.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name or ID
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        /// Session name or ID
        name: String,
        /// Text to send (sends Enter if omitted)
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Session name (auto-generated from workdir if omitted)
        name: Option<String>,

        /// Working directory (defaults to current directory)
        #[arg(long)]
        workdir: Option<String>,

        /// Ink name (from config)
        #[arg(long)]
        ink: Option<String>,

        /// Human-readable description of the task
        #[arg(long)]
        description: Option<String>,

        /// Don't attach to the session after spawning
        #[arg(short, long)]
        detach: bool,

        /// Idle threshold in seconds (0 = never idle)
        #[arg(long)]
        idle_threshold: Option<u32>,

        /// Auto-select the least loaded node
        #[arg(long)]
        auto: bool,

        /// Create an isolated git worktree for the session
        #[arg(long)]
        worktree: bool,

        /// Runtime environment: tmux (default) or docker
        // Kept as a free-form string here; validation is not visible in this declaration.
        #[arg(long)]
        runtime: Option<String>,

        /// Secrets to inject as environment variables (by name)
        // A `Vec` field, so the flag can be repeated to collect several names.
        #[arg(long)]
        secret: Vec<String>,

        /// Command to run (everything after --)
        // `last = true` restricts this positional to the arguments following `--`.
        #[arg(last = true)]
        command: Vec<String>,
    },

    /// List sessions (live only by default)
    #[command(visible_alias = "ls")]
    List {
        /// Show all sessions including killed and lost
        #[arg(short, long)]
        all: bool,
    },

    /// Show session logs/output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name or ID
        name: String,

        /// Number of lines to fetch
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Follow output (like `tail -f`)
        #[arg(short, long)]
        follow: bool,
    },

    /// Kill a session
    #[command(visible_alias = "k")]
    Kill {
        /// Session name or ID
        name: String,
    },

    /// Permanently remove a session from history
    #[command(visible_alias = "rm")]
    Delete {
        /// Session name or ID
        name: String,
    },

    /// Resume a lost session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name or ID
        name: String,
    },

    /// List all known nodes
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention history for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name or ID
        name: String,
    },

    /// Open the web dashboard in your browser
    Ui,

    /// Manage scheduled agent runs
    #[command(visible_alias = "sched")]
    Schedule {
        // Nested subcommand: see `ScheduleAction`.
        #[command(subcommand)]
        action: ScheduleAction,
    },

    /// Manage secrets (environment variables injected into sessions)
    #[command(visible_alias = "sec")]
    Secret {
        // Nested subcommand: see `SecretAction`.
        #[command(subcommand)]
        action: SecretAction,
    },
}
172
// Actions available under `pulpo secret`.
//
// NOTE: the `///` comments below are rendered by clap as help text, so they are
// user-facing strings.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    /// Set a secret
    Set {
        /// Secret name (will be the env var name, uppercase + underscores)
        name: String,
        /// Secret value
        value: String,
        /// Environment variable name (defaults to secret name)
        // Sent to the API as the `env` field only when provided (see `execute_secret`).
        #[arg(long)]
        env: Option<String>,
    },
    /// List secret names
    #[command(visible_alias = "ls")]
    List,
    /// Delete a secret
    #[command(visible_alias = "rm")]
    Delete {
        /// Secret name
        name: String,
    },
}
195
// Actions available under `pulpo schedule`.
//
// NOTE: the `///` comments below are rendered by clap as help text, so they are
// user-facing strings.
//
// NOTE(review): this enum uses `alias` (hidden from `--help`) where the other command
// enums in this file use `visible_alias` — confirm whether hiding them is intentional.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Add a new schedule
    #[command(alias = "install")]
    Add {
        /// Schedule name
        name: String,
        /// Cron expression (e.g. "0 3 * * *")
        cron: String,
        /// Working directory
        // When omitted, `execute_schedule` substitutes the current directory.
        #[arg(long)]
        workdir: Option<String>,
        /// Target node (omit = local, "auto" = least-loaded)
        #[arg(long)]
        node: Option<String>,
        /// Ink preset
        #[arg(long)]
        ink: Option<String>,
        /// Description
        #[arg(long)]
        description: Option<String>,
        /// Command to run (everything after --)
        // `last = true` restricts this positional to the arguments following `--`.
        #[arg(last = true)]
        command: Vec<String>,
    },
    /// List all schedules
    #[command(alias = "ls")]
    List,
    /// Remove a schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name or ID
        name: String,
    },
    /// Pause a schedule
    Pause {
        /// Schedule name or ID
        name: String,
    },
    /// Resume a paused schedule
    Resume {
        /// Schedule name or ID
        name: String,
    },
}
241
/// The marker emitted by the agent wrapper when the agent process exits.
///
/// `follow_logs` stops following as soon as the fetched output contains this text.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
244
/// Turn `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the current
/// working directory (lossily converted to UTF-8); if the working directory cannot be
/// determined, the input is returned as-is.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        // No usable working directory: hand the relative path back untouched.
        Err(_) => path.to_owned(),
    }
}
257
/// Derive a session name from a directory path (basename, kebab-cased).
///
/// ASCII letters and digits of the basename are kept (lowercased); every other
/// character acts as a separator. Runs of separators become a single `-`, and leading
/// or trailing separators are dropped. Falls back to `"session"` when the path has no
/// UTF-8 basename or nothing usable is left.
fn derive_session_name(path: &str) -> String {
    let basename = match std::path::Path::new(path).file_name() {
        Some(os_name) => os_name.to_str().unwrap_or("session"),
        None => "session",
    };

    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.is_empty() && !slug.ends_with('-') {
            // Emit at most one hyphen per separator run, and never a leading one.
            slug.push('-');
        }
    }
    // At most one trailing hyphen can remain after the loop above.
    if slug.ends_with('-') {
        slug.pop();
    }

    if slug.is_empty() {
        "session".to_owned()
    } else {
        slug
    }
}
290
291/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
292async fn deduplicate_session_name(
293    client: &reqwest::Client,
294    base: &str,
295    name: &str,
296    token: Option<&str>,
297) -> String {
298    // Check if the name is already taken by fetching the session
299    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
300        .send()
301        .await;
302    match resp {
303        Ok(r) if r.status().is_success() => {
304            // Session exists — try suffixed names
305            for i in 2..=99 {
306                let candidate = format!("{name}-{i}");
307                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
308                    .send()
309                    .await;
310                match resp {
311                    Ok(r) if r.status().is_success() => {}
312                    _ => return candidate,
313                }
314            }
315            format!("{name}-100")
316        }
317        _ => name.to_owned(),
318    }
319}
320
/// Build the HTTP base URL for a node address by prefixing it with `http://`.
pub fn base_url(node: &str) -> String {
    const SCHEME: &str = "http://";
    let mut url = String::with_capacity(SCHEME.len() + node.len());
    url.push_str(SCHEME);
    url.push_str(node);
    url
}
325
/// Response shape for the output endpoint.
///
/// `fetch_output` deserializes the JSON body of
/// `/api/v1/sessions/{name}/output` into this type and returns the `output` field.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text.
    output: String,
}
331
/// Return the display label for a session's runtime: `"tmux"` or `"docker"`.
///
/// `format_sessions` uses this for the RUNTIME column of the session table.
const fn session_runtime(session: &Session) -> &'static str {
    match session.runtime {
        Runtime::Tmux => "tmux",
        Runtime::Docker => "docker",
    }
}
339
340fn format_sessions(sessions: &[Session]) -> String {
341    if sessions.is_empty() {
342        return "No sessions.".into();
343    }
344    let mut lines = vec![format!(
345        "{:<10} {:<24} {:<12} {:<8} {}",
346        "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
347    )];
348    for s in sessions {
349        let cmd_display = if s.command.len() > 50 {
350            let truncated: String = s.command.chars().take(47).collect();
351            format!("{truncated}...")
352        } else {
353            s.command.clone()
354        };
355        let short_id = &s.id.to_string()[..8];
356        let has_pr = s
357            .metadata
358            .as_ref()
359            .is_some_and(|m| m.contains_key("pr_url"));
360        let name_display = match (s.worktree_path.is_some(), has_pr) {
361            (true, true) => format!("{} [wt] [PR]", s.name),
362            (true, false) => format!("{} [wt]", s.name),
363            (false, true) => format!("{} [PR]", s.name),
364            (false, false) => s.name.clone(),
365        };
366        lines.push(format!(
367            "{:<10} {:<24} {:<12} {:<8} {}",
368            short_id,
369            name_display,
370            s.status,
371            session_runtime(s),
372            cmd_display
373        ));
374    }
375    lines.join("\n")
376}
377
378/// Format the peers response as a table.
379fn format_nodes(resp: &PeersResponse) -> String {
380    let mut lines = vec![format!(
381        "{:<20} {:<25} {:<10} {}",
382        "NAME", "ADDRESS", "STATUS", "SESSIONS"
383    )];
384    lines.push(format!(
385        "{:<20} {:<25} {:<10} {}",
386        resp.local.name, "(local)", "online", "-"
387    ));
388    for p in &resp.peers {
389        let sessions = p
390            .session_count
391            .map_or_else(|| "-".into(), |c| c.to_string());
392        lines.push(format!(
393            "{:<20} {:<25} {:<10} {}",
394            p.name, p.address, p.status, sessions
395        ));
396    }
397    lines.join("\n")
398}
399
400/// Format intervention events as a table.
401fn format_interventions(events: &[InterventionEventResponse]) -> String {
402    if events.is_empty() {
403        return "No intervention events.".into();
404    }
405    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
406    for e in events {
407        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
408    }
409    lines.join("\n")
410}
411
/// Build (but do not run) the command that opens `url` in the default browser.
///
/// The launcher is chosen at compile time: `open` on macOS, `cmd /C start` on Windows,
/// and `xdg-open` on Linux and every other target. The URL is always the final
/// argument.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    #[cfg(target_os = "macos")]
    let (program, leading_args): (&str, &[&str]) = ("open", &[]);
    #[cfg(target_os = "windows")]
    let (program, leading_args): (&str, &[&str]) = ("cmd", &["/C", "start"]);
    // Linux and the catch-all fallback both rely on xdg-open.
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let (program, leading_args): (&str, &[&str]) = ("xdg-open", &[]);

    let mut launcher = std::process::Command::new(program);
    launcher.args(leading_args).arg(url);
    launcher
}
441
442/// Open a URL in the default browser.
443#[cfg(not(coverage))]
444fn open_browser(url: &str) -> Result<()> {
445    build_open_command(url).status()?;
446    Ok(())
447}
448
/// Stub for coverage builds — avoids opening a browser during tests.
///
/// Ignores `_url` and always returns `Ok(())`; it keeps the same signature as the
/// real implementation so callers compile unchanged.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
454
/// Build (but do not run) the command that attaches to a session's terminal.
///
/// A backend session id of the form `docker:<container>` yields
/// `docker exec -it <container> /bin/sh`. Any other id is treated as a tmux session:
/// `tmux attach-session -t <id>` on non-Windows targets, and on Windows a `cmd /C echo`
/// that prints a notice because tmux attach is unavailable there.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    match backend_session_id.strip_prefix("docker:") {
        // Docker sessions: open a shell inside the container.
        Some(container) => {
            let mut docker = std::process::Command::new("docker");
            docker
                .arg("exec")
                .arg("-it")
                .arg(container)
                .arg("/bin/sh");
            docker
        }
        // Everything else is a tmux session id.
        None => {
            #[cfg(not(target_os = "windows"))]
            {
                let mut tmux = std::process::Command::new("tmux");
                tmux.arg("attach-session").arg("-t").arg(backend_session_id);
                tmux
            }
            #[cfg(target_os = "windows")]
            {
                // tmux attach is not available on Windows — tell the user instead.
                let mut notice = std::process::Command::new("cmd");
                notice
                    .arg("/C")
                    .arg("echo")
                    .arg("Attach not available on Windows. Use the web UI or --runtime docker.");
                notice
            }
        }
    }
}
484
485/// Attach to a session's terminal.
486#[cfg(not(any(test, coverage, target_os = "windows")))]
487fn attach_session(backend_session_id: &str) -> Result<()> {
488    let status = build_attach_command(backend_session_id).status()?;
489    if !status.success() {
490        anyhow::bail!("attach failed with {status}");
491    }
492    Ok(())
493}
494
/// Stub for Windows — tmux attach is not available.
///
/// Prints a notice to stderr and returns `Ok(())` without spawning any process.
/// Compiled only for Windows builds that are neither test nor coverage builds.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
501
/// Stub for test and coverage builds — avoids spawning real terminals during tests.
///
/// Ignores the session id and always returns `Ok(())`.
// The clippy allows are needed because this stub keeps the real function's signature
// even though it never fails and could otherwise be `const`.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
508
509/// Extract a clean error message from an API JSON response (or fall back to raw text).
510fn api_error(text: &str) -> anyhow::Error {
511    serde_json::from_str::<serde_json::Value>(text)
512        .ok()
513        .and_then(|v| v["error"].as_str().map(String::from))
514        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
515}
516
517/// Return the response body text, or a clean error if the response was non-success.
518async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
519    if resp.status().is_success() {
520        Ok(resp.text().await?)
521    } else {
522        let text = resp.text().await?;
523        Err(api_error(&text))
524    }
525}
526
527/// Map a reqwest error to a user-friendly message.
528fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
529    if err.is_connect() {
530        anyhow::anyhow!(
531            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
532        )
533    } else {
534        anyhow::anyhow!("Network error connecting to {node}: {err}")
535    }
536}
537
/// Check if the node address points to localhost.
///
/// Recognized forms are `localhost` (any ASCII case, since host names are
/// case-insensitive) and `127.0.0.1`, each with or without a `:port` suffix, plus the
/// IPv6 loopback written as `::1` or `[::1]` with an optional port.
fn is_localhost(node: &str) -> bool {
    // IPv6 loopback contains colons itself, so handle it before splitting off a port.
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    let host = node.split(':').next().unwrap_or(node);
    host.eq_ignore_ascii_case("localhost") || host == "127.0.0.1"
}
543
544/// Try to auto-discover the auth token from a local daemon.
545async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
546    let resp = client
547        .get(format!("{base}/api/v1/auth/token"))
548        .send()
549        .await
550        .ok()?;
551    let body: AuthTokenResponse = resp.json().await.ok()?;
552    if body.token.is_empty() {
553        None
554    } else {
555        Some(body.token)
556    }
557}
558
559/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
560async fn resolve_token(
561    client: &reqwest::Client,
562    base: &str,
563    node: &str,
564    explicit: Option<&str>,
565) -> Option<String> {
566    if let Some(t) = explicit {
567        return Some(t.to_owned());
568    }
569    if is_localhost(node) {
570        return discover_token(client, base).await;
571    }
572    None
573}
574
/// Report whether `node` is a bare name that still needs resolving, i.e. it contains
/// no `:` and therefore no explicit port.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
579
580/// Resolve a node reference to a `host:port` address.
581///
582/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
583/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
584/// online peer is found, return its address and optionally its configured auth token
585/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
586#[cfg(not(coverage))]
587async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
588    // Already has port — use as-is
589    if !node_needs_resolution(node) {
590        return (node.to_owned(), None);
591    }
592
593    // Try to resolve via local daemon's peer registry
594    let local_base = "http://localhost:7433";
595    let mut resolved_address: Option<String> = None;
596
597    if let Ok(resp) = client
598        .get(format!("{local_base}/api/v1/peers"))
599        .send()
600        .await
601        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
602    {
603        for peer in &peers_resp.peers {
604            if peer.name == node {
605                resolved_address = Some(peer.address.clone());
606                break;
607            }
608        }
609    }
610
611    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
612
613    // Try to get the peer's auth token from the config endpoint
614    let peer_token = if let Ok(resp) = client
615        .get(format!("{local_base}/api/v1/config"))
616        .send()
617        .await
618        && let Ok(config) = resp.json::<ConfigResponse>().await
619        && let Some(entry) = config.peers.get(node)
620    {
621        entry.token().map(String::from)
622    } else {
623        None
624    };
625
626    (address, peer_token)
627}
628
/// Coverage-build stand-in for `resolve_node` that performs no HTTP requests.
///
/// Bare names get the default port `:7433` appended; addresses that already contain
/// `:` are returned unchanged. The peer token is always `None`.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
638
639/// Build an authenticated GET request.
640fn authed_get(
641    client: &reqwest::Client,
642    url: String,
643    token: Option<&str>,
644) -> reqwest::RequestBuilder {
645    let req = client.get(url);
646    if let Some(t) = token {
647        req.bearer_auth(t)
648    } else {
649        req
650    }
651}
652
653/// Build an authenticated POST request.
654fn authed_post(
655    client: &reqwest::Client,
656    url: String,
657    token: Option<&str>,
658) -> reqwest::RequestBuilder {
659    let req = client.post(url);
660    if let Some(t) = token {
661        req.bearer_auth(t)
662    } else {
663        req
664    }
665}
666
667/// Build an authenticated DELETE request.
668fn authed_delete(
669    client: &reqwest::Client,
670    url: String,
671    token: Option<&str>,
672) -> reqwest::RequestBuilder {
673    let req = client.delete(url);
674    if let Some(t) = token {
675        req.bearer_auth(t)
676    } else {
677        req
678    }
679}
680
681/// Build an authenticated PUT request.
682#[cfg(not(coverage))]
683fn authed_put(
684    client: &reqwest::Client,
685    url: String,
686    token: Option<&str>,
687) -> reqwest::RequestBuilder {
688    let req = client.put(url);
689    if let Some(t) = token {
690        req.bearer_auth(t)
691    } else {
692        req
693    }
694}
695
696/// Fetch session output from the API.
697async fn fetch_output(
698    client: &reqwest::Client,
699    base: &str,
700    name: &str,
701    lines: usize,
702    token: Option<&str>,
703) -> Result<String> {
704    let resp = authed_get(
705        client,
706        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
707        token,
708    )
709    .send()
710    .await?;
711    let text = ok_or_api_error(resp).await?;
712    let output: OutputResponse = serde_json::from_str(&text)?;
713    Ok(output.output)
714}
715
716/// Fetch session status from the API.
717async fn fetch_session_status(
718    client: &reqwest::Client,
719    base: &str,
720    name: &str,
721    token: Option<&str>,
722) -> Result<String> {
723    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
724        .send()
725        .await?;
726    let text = ok_or_api_error(resp).await?;
727    let session: Session = serde_json::from_str(&text)?;
728    Ok(session.status.to_string())
729}
730
731/// Wait for the session to leave "creating" state, then check if it died instantly.
732/// Uses the session ID (not name) to avoid matching old killed sessions with the same name.
733/// Returns an error with a helpful message if the session is lost/killed.
734async fn check_session_alive(
735    client: &reqwest::Client,
736    base: &str,
737    session_id: &str,
738    token: Option<&str>,
739) -> Result<()> {
740    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
741    for _ in 0..3 {
742        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
743        // Fetch by ID to avoid name collisions with old sessions
744        let resp = authed_get(
745            client,
746            format!("{base}/api/v1/sessions/{session_id}"),
747            token,
748        )
749        .send()
750        .await;
751        if let Ok(resp) = resp
752            && let Ok(text) = ok_or_api_error(resp).await
753            && let Ok(session) = serde_json::from_str::<Session>(&text)
754        {
755            match session.status.to_string().as_str() {
756                "creating" => continue,
757                "lost" | "killed" => {
758                    anyhow::bail!(
759                        "Session \"{}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {}",
760                        session.name,
761                        session.name
762                    );
763                }
764                _ => return Ok(()),
765            }
766        }
767        // fetch failed — don't block, proceed to attach
768        break;
769    }
770    Ok(())
771}
772
/// Compute the new trailing lines that differ from the previous output.
///
/// The output endpoint returns the last N lines from the terminal pane. As new lines
/// appear, old lines at the top scroll off. We look for the last line of `prev` inside
/// `new` (searching from the end), verify that the lines before it match the tail of
/// `prev`, and return only the part of `new` after that overlap.
///
/// Returns:
/// - `new` unchanged when `prev` is empty or no overlap is found;
/// - `""` when `new` has no lines or ends exactly where `prev` ended;
/// - otherwise the slice of `new` that follows the overlap, with its original line
///   endings (`\n` or `\r\n`) intact.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line; the fallback is defensive.
    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        None => return new,
    };

    // Scan from the end so the most recent occurrence of `last_prev` wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Verify contiguous overlap: the lines leading up to `i` must equal the tail
        // of `prev` (as many lines as both sides can provide).
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            // The overlap reaches the end of `new`: nothing new to print.
            return "";
        }
        // Byte offset of the first new line. `split_inclusive` keeps each terminator,
        // so the sum is exact for both `\n` and `\r\n` line endings (a fixed
        // `len + 1` per line would be one byte short for every `\r\n`).
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    // No overlap found — output changed completely, print it all
    new
}
815
816/// Follow logs by polling, printing only new output. Returns when the session ends.
817async fn follow_logs(
818    client: &reqwest::Client,
819    base: &str,
820    name: &str,
821    lines: usize,
822    token: Option<&str>,
823    writer: &mut (dyn std::io::Write + Send),
824) -> Result<()> {
825    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
826    write!(writer, "{prev_output}")?;
827
828    let mut unchanged_ticks: u32 = 0;
829
830    loop {
831        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
832
833        // Fetch latest output
834        let new_output = fetch_output(client, base, name, lines, token).await?;
835
836        let diff = diff_output(&prev_output, &new_output);
837        if diff.is_empty() {
838            unchanged_ticks += 1;
839        } else {
840            write!(writer, "{diff}")?;
841            unchanged_ticks = 0;
842        }
843
844        // Check for agent exit marker in output
845        if new_output.contains(AGENT_EXIT_MARKER) {
846            break;
847        }
848
849        prev_output = new_output;
850
851        // Only check session status when output has been unchanged for 3+ ticks
852        if unchanged_ticks >= 3 {
853            let status = fetch_session_status(client, base, name, token).await?;
854            let is_terminal = status == "ready" || status == "killed" || status == "lost";
855            if is_terminal {
856                break;
857            }
858        }
859    }
860    Ok(())
861}
862
863// --- Schedule API ---
864
865/// Execute a schedule subcommand via the scheduler API.
866#[cfg(not(coverage))]
867async fn execute_schedule(
868    client: &reqwest::Client,
869    action: &ScheduleAction,
870    base: &str,
871    token: Option<&str>,
872) -> Result<String> {
873    match action {
874        ScheduleAction::Add {
875            name,
876            cron,
877            workdir,
878            node,
879            ink,
880            description,
881            command,
882        } => {
883            let cmd = if command.is_empty() {
884                None
885            } else {
886                Some(command.join(" "))
887            };
888            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
889                std::env::current_dir()
890                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
891            });
892            let mut body = serde_json::json!({
893                "name": name,
894                "cron": cron,
895                "workdir": resolved_workdir,
896            });
897            if let Some(c) = &cmd {
898                body["command"] = serde_json::json!(c);
899            }
900            if let Some(n) = node {
901                body["target_node"] = serde_json::json!(n);
902            }
903            if let Some(i) = ink {
904                body["ink"] = serde_json::json!(i);
905            }
906            if let Some(d) = description {
907                body["description"] = serde_json::json!(d);
908            }
909            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
910                .json(&body)
911                .send()
912                .await?;
913            ok_or_api_error(resp).await?;
914            Ok(format!("Created schedule \"{name}\""))
915        }
916        ScheduleAction::List => {
917            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
918                .send()
919                .await?;
920            let text = ok_or_api_error(resp).await?;
921            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
922            Ok(format_schedules(&schedules))
923        }
924        ScheduleAction::Remove { name } => {
925            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
926                .send()
927                .await?;
928            ok_or_api_error(resp).await?;
929            Ok(format!("Removed schedule \"{name}\""))
930        }
931        ScheduleAction::Pause { name } => {
932            let body = serde_json::json!({ "enabled": false });
933            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
934                .json(&body)
935                .send()
936                .await?;
937            ok_or_api_error(resp).await?;
938            Ok(format!("Paused schedule \"{name}\""))
939        }
940        ScheduleAction::Resume { name } => {
941            let body = serde_json::json!({ "enabled": true });
942            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
943                .json(&body)
944                .send()
945                .await?;
946            ok_or_api_error(resp).await?;
947            Ok(format!("Resumed schedule \"{name}\""))
948        }
949    }
950}
951
/// Coverage stub for schedule execution.
///
/// Performs no HTTP requests: every argument is ignored and an empty string is
/// returned. The signature mirrors the real implementation so callers compile unchanged.
// `unnecessary_wraps` is allowed because the stub never fails but must keep `Result`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
963
964// --- Secret API ---
965
966/// Format secret entries as a table.
967#[cfg_attr(coverage, allow(dead_code))]
968fn format_secrets(secrets: &[serde_json::Value]) -> String {
969    if secrets.is_empty() {
970        return "No secrets configured.".into();
971    }
972    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
973    for s in secrets {
974        let name = s["name"].as_str().unwrap_or("?");
975        let env_display = s["env"]
976            .as_str()
977            .map_or_else(|| name.to_owned(), String::from);
978        let created = s["created_at"]
979            .as_str()
980            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
981        lines.push(format!("{name:<24} {env_display:<24} {created}"));
982    }
983    lines.join("\n")
984}
985
986/// Execute a secret subcommand via the secrets API.
987#[cfg(not(coverage))]
988async fn execute_secret(
989    client: &reqwest::Client,
990    action: &SecretAction,
991    base: &str,
992    token: Option<&str>,
993) -> Result<String> {
994    match action {
995        SecretAction::Set { name, value, env } => {
996            let mut body = serde_json::json!({ "value": value });
997            if let Some(e) = env {
998                body["env"] = serde_json::json!(e);
999            }
1000            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1001                .json(&body)
1002                .send()
1003                .await?;
1004            ok_or_api_error(resp).await?;
1005            Ok(format!("Secret \"{name}\" set."))
1006        }
1007        SecretAction::List => {
1008            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1009                .send()
1010                .await?;
1011            let text = ok_or_api_error(resp).await?;
1012            let parsed: serde_json::Value = serde_json::from_str(&text)?;
1013            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1014            Ok(format_secrets(secrets))
1015        }
1016        SecretAction::Delete { name } => {
1017            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1018                .send()
1019                .await?;
1020            ok_or_api_error(resp).await?;
1021            Ok(format!("Secret \"{name}\" deleted."))
1022        }
1023    }
1024}
1025
/// Stand-in for secret execution in coverage builds.
///
/// Every argument is ignored and an empty string is returned.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::new();
    Ok(nothing)
}
1037
1038/// Format a list of schedules as a table.
1039#[cfg_attr(coverage, allow(dead_code))]
1040fn format_schedules(schedules: &[serde_json::Value]) -> String {
1041    if schedules.is_empty() {
1042        return "No schedules.".into();
1043    }
1044    let mut lines = vec![format!(
1045        "{:<20} {:<18} {:<8} {:<12} {}",
1046        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1047    )];
1048    for s in schedules {
1049        let name = s["name"].as_str().unwrap_or("?");
1050        let cron = s["cron"].as_str().unwrap_or("?");
1051        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1052            "yes"
1053        } else {
1054            "no"
1055        };
1056        let last_run = s["last_run_at"]
1057            .as_str()
1058            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1059        let node = s["target_node"].as_str().unwrap_or("local");
1060        lines.push(format!(
1061            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1062        ));
1063    }
1064    lines.join("\n")
1065}
1066
1067/// Select the best node from the peer registry based on load.
1068/// Returns the node address and name of the least loaded online peer.
1069/// Scoring: lower memory usage + fewer active sessions = better.
1070#[cfg(not(coverage))]
1071async fn select_best_node(
1072    client: &reqwest::Client,
1073    base: &str,
1074    token: Option<&str>,
1075) -> Result<(String, String)> {
1076    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1077        .send()
1078        .await?;
1079    let text = ok_or_api_error(resp).await?;
1080    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1081
1082    // Score: fewer active sessions is better, more memory is better
1083    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1084
1085    for peer in &peers_resp.peers {
1086        if peer.status != pulpo_common::peer::PeerStatus::Online {
1087            continue;
1088        }
1089        let sessions = peer.session_count.unwrap_or(0);
1090        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1091        // Lower score = better (fewer sessions, more memory)
1092        #[allow(clippy::cast_precision_loss)]
1093        let score = sessions as f64 - (mem as f64 / 1024.0);
1094        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1095            best = Some((peer.address.clone(), peer.name.clone(), score));
1096        }
1097    }
1098
1099    // Fall back to local if no online peers
1100    match best {
1101        Some((addr, name, _)) => Ok((addr, name)),
1102        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1103    }
1104}
1105
/// Stand-in for node auto-selection in coverage builds.
///
/// Ignores its arguments and always reports the local fallback.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    let address = String::from("localhost:7433");
    let name = String::from("local");
    Ok((address, name))
}
1116
1117/// Execute the given CLI command against the specified node.
1118#[allow(clippy::too_many_lines)]
1119pub async fn execute(cli: &Cli) -> Result<String> {
1120    let client = reqwest::Client::new();
1121    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1122    let url = base_url(&resolved_node);
1123    let node = &resolved_node;
1124    let token = resolve_token(&client, &url, node, cli.token.as_deref())
1125        .await
1126        .or(peer_token);
1127
1128    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
1129    if cli.command.is_none() && cli.path.is_none() {
1130        // No subcommand and no path: print help
1131        use clap::CommandFactory;
1132        let mut cmd = Cli::command();
1133        cmd.print_help()?;
1134        println!();
1135        return Ok(String::new());
1136    }
1137    if cli.command.is_none() {
1138        let path = cli.path.as_deref().unwrap_or(".");
1139        let resolved_workdir = resolve_path(path);
1140        let base_name = derive_session_name(&resolved_workdir);
1141        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1142        let body = serde_json::json!({
1143            "name": name,
1144            "workdir": resolved_workdir,
1145        });
1146        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1147            .json(&body)
1148            .send()
1149            .await
1150            .map_err(|e| friendly_error(&e, node))?;
1151        let text = ok_or_api_error(resp).await?;
1152        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1153        let msg = format!(
1154            "Created session \"{}\" ({})",
1155            resp.session.name, resp.session.id
1156        );
1157        let backend_id = resp
1158            .session
1159            .backend_session_id
1160            .as_deref()
1161            .unwrap_or(&resp.session.name);
1162        eprintln!("{msg}");
1163        // Path shortcut spawns a shell (no command) — skip liveness check
1164        // since shell sessions are immediately detected as idle by the watchdog
1165        attach_session(backend_id)?;
1166        return Ok(format!("Detached from session \"{}\".", resp.session.name));
1167    }
1168
1169    match cli.command.as_ref().unwrap() {
1170        Commands::Attach { name } => {
1171            // Fetch session to get status and backend_session_id
1172            let resp = authed_get(
1173                &client,
1174                format!("{url}/api/v1/sessions/{name}"),
1175                token.as_deref(),
1176            )
1177            .send()
1178            .await
1179            .map_err(|e| friendly_error(&e, node))?;
1180            let text = ok_or_api_error(resp).await?;
1181            let session: Session = serde_json::from_str(&text)?;
1182            match session.status.to_string().as_str() {
1183                "lost" => {
1184                    anyhow::bail!(
1185                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
1186                    );
1187                }
1188                "killed" => {
1189                    anyhow::bail!(
1190                        "Session \"{name}\" is {} — cannot attach to a killed session.",
1191                        session.status
1192                    );
1193                }
1194                _ => {}
1195            }
1196            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1197            attach_session(&backend_id)?;
1198            Ok(format!("Detached from session {name}."))
1199        }
1200        Commands::Input { name, text } => {
1201            let input_text = text.as_deref().unwrap_or("\n");
1202            let body = serde_json::json!({ "text": input_text });
1203            let resp = authed_post(
1204                &client,
1205                format!("{url}/api/v1/sessions/{name}/input"),
1206                token.as_deref(),
1207            )
1208            .json(&body)
1209            .send()
1210            .await
1211            .map_err(|e| friendly_error(&e, node))?;
1212            ok_or_api_error(resp).await?;
1213            Ok(format!("Sent input to session {name}."))
1214        }
1215        Commands::List { all } => {
1216            let list_url = if *all {
1217                format!("{url}/api/v1/sessions")
1218            } else {
1219                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1220            };
1221            let resp = authed_get(&client, list_url, token.as_deref())
1222                .send()
1223                .await
1224                .map_err(|e| friendly_error(&e, node))?;
1225            let text = ok_or_api_error(resp).await?;
1226            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1227            Ok(format_sessions(&sessions))
1228        }
1229        Commands::Nodes => {
1230            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1231                .send()
1232                .await
1233                .map_err(|e| friendly_error(&e, node))?;
1234            let text = ok_or_api_error(resp).await?;
1235            let resp: PeersResponse = serde_json::from_str(&text)?;
1236            Ok(format_nodes(&resp))
1237        }
1238        Commands::Spawn {
1239            workdir,
1240            name,
1241            ink,
1242            description,
1243            detach,
1244            idle_threshold,
1245            auto,
1246            worktree,
1247            runtime,
1248            secret,
1249            command,
1250        } => {
1251            let cmd = if command.is_empty() {
1252                None
1253            } else {
1254                Some(command.join(" "))
1255            };
1256            // Resolve workdir: --workdir flag > current directory
1257            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1258                std::env::current_dir()
1259                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1260            });
1261            // Resolve name: explicit > derived from workdir (with dedup)
1262            let resolved_name = if let Some(n) = name {
1263                n.clone()
1264            } else {
1265                let base_name = derive_session_name(&resolved_workdir);
1266                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1267            };
1268            let mut body = serde_json::json!({
1269                "name": resolved_name,
1270                "workdir": resolved_workdir,
1271            });
1272            if let Some(c) = &cmd {
1273                body["command"] = serde_json::json!(c);
1274            }
1275            if let Some(i) = ink {
1276                body["ink"] = serde_json::json!(i);
1277            }
1278            if let Some(d) = description {
1279                body["description"] = serde_json::json!(d);
1280            }
1281            if let Some(t) = idle_threshold {
1282                body["idle_threshold_secs"] = serde_json::json!(t);
1283            }
1284            if *worktree {
1285                body["worktree"] = serde_json::json!(true);
1286                eprintln!(
1287                    "Worktree: branch pulpo/{resolved_name} in {resolved_workdir}/.pulpo/worktrees/{resolved_name}/"
1288                );
1289            }
1290            if let Some(rt) = runtime {
1291                body["runtime"] = serde_json::json!(rt);
1292            }
1293            if !secret.is_empty() {
1294                body["secrets"] = serde_json::json!(secret);
1295            }
1296            let spawn_url = if *auto {
1297                let (auto_addr, auto_name) =
1298                    select_best_node(&client, &url, token.as_deref()).await?;
1299                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1300                base_url(&auto_addr)
1301            } else {
1302                url.clone()
1303            };
1304            let resp = authed_post(
1305                &client,
1306                format!("{spawn_url}/api/v1/sessions"),
1307                token.as_deref(),
1308            )
1309            .json(&body)
1310            .send()
1311            .await
1312            .map_err(|e| friendly_error(&e, node))?;
1313            let text = ok_or_api_error(resp).await?;
1314            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1315            let msg = format!(
1316                "Created session \"{}\" ({})",
1317                resp.session.name, resp.session.id
1318            );
1319            if !detach {
1320                let backend_id = resp
1321                    .session
1322                    .backend_session_id
1323                    .as_deref()
1324                    .unwrap_or(&resp.session.name);
1325                eprintln!("{msg}");
1326                // Only check liveness for explicit commands — shell sessions (no command)
1327                // may be immediately marked idle/killed by the watchdog, which is expected
1328                if cmd.is_some() {
1329                    let sid = resp.session.id.to_string();
1330                    check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1331                }
1332                attach_session(backend_id)?;
1333                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1334            }
1335            Ok(msg)
1336        }
1337        Commands::Kill { name } => {
1338            let resp = authed_post(
1339                &client,
1340                format!("{url}/api/v1/sessions/{name}/kill"),
1341                token.as_deref(),
1342            )
1343            .send()
1344            .await
1345            .map_err(|e| friendly_error(&e, node))?;
1346            ok_or_api_error(resp).await?;
1347            Ok(format!("Session {name} killed."))
1348        }
1349        Commands::Delete { name } => {
1350            let resp = authed_delete(
1351                &client,
1352                format!("{url}/api/v1/sessions/{name}"),
1353                token.as_deref(),
1354            )
1355            .send()
1356            .await
1357            .map_err(|e| friendly_error(&e, node))?;
1358            ok_or_api_error(resp).await?;
1359            Ok(format!("Session {name} deleted."))
1360        }
1361        Commands::Logs {
1362            name,
1363            lines,
1364            follow,
1365        } => {
1366            if *follow {
1367                let mut stdout = std::io::stdout();
1368                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1369                    .await
1370                    .map_err(|e| {
1371                        // Unwrap reqwest errors to friendly messages
1372                        match e.downcast::<reqwest::Error>() {
1373                            Ok(re) => friendly_error(&re, node),
1374                            Err(other) => other,
1375                        }
1376                    })?;
1377                Ok(String::new())
1378            } else {
1379                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1380                    .await
1381                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1382                        Ok(re) => friendly_error(&re, node),
1383                        Err(other) => other,
1384                    })?;
1385                Ok(output)
1386            }
1387        }
1388        Commands::Interventions { name } => {
1389            let resp = authed_get(
1390                &client,
1391                format!("{url}/api/v1/sessions/{name}/interventions"),
1392                token.as_deref(),
1393            )
1394            .send()
1395            .await
1396            .map_err(|e| friendly_error(&e, node))?;
1397            let text = ok_or_api_error(resp).await?;
1398            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1399            Ok(format_interventions(&events))
1400        }
1401        Commands::Ui => {
1402            let dashboard = base_url(node);
1403            open_browser(&dashboard)?;
1404            Ok(format!("Opening {dashboard}"))
1405        }
1406        Commands::Resume { name } => {
1407            let resp = authed_post(
1408                &client,
1409                format!("{url}/api/v1/sessions/{name}/resume"),
1410                token.as_deref(),
1411            )
1412            .send()
1413            .await
1414            .map_err(|e| friendly_error(&e, node))?;
1415            let text = ok_or_api_error(resp).await?;
1416            let session: Session = serde_json::from_str(&text)?;
1417            let backend_id = session
1418                .backend_session_id
1419                .as_deref()
1420                .unwrap_or(&session.name);
1421            eprintln!("Resumed session \"{}\"", session.name);
1422            let sid = session.id.to_string();
1423            check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1424            attach_session(backend_id)?;
1425            Ok(format!("Detached from session \"{}\".", session.name))
1426        }
1427        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1428            .await
1429            .map_err(|e| match e.downcast::<reqwest::Error>() {
1430                Ok(re) => friendly_error(&re, node),
1431                Err(other) => other,
1432            }),
1433        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1434            .await
1435            .map_err(|e| match e.downcast::<reqwest::Error>() {
1436                Ok(re) => friendly_error(&re, node),
1437                Err(other) => other,
1438            }),
1439    }
1440}
1441
1442#[cfg(test)]
1443mod tests {
1444    use super::*;
1445
1446    #[test]
1447    fn test_base_url() {
1448        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1449        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1450    }
1451
1452    #[test]
1453    fn test_cli_parse_list() {
1454        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1455        assert_eq!(cli.node, "localhost:7433");
1456        assert!(matches!(cli.command, Some(Commands::List { .. })));
1457    }
1458
1459    #[test]
1460    fn test_cli_parse_nodes() {
1461        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1462        assert!(matches!(cli.command, Some(Commands::Nodes)));
1463    }
1464
1465    #[test]
1466    fn test_cli_parse_ui() {
1467        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1468        assert!(matches!(cli.command, Some(Commands::Ui)));
1469    }
1470
1471    #[test]
1472    fn test_cli_parse_ui_custom_node() {
1473        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1474        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1475        assert_eq!(cli.node, "mac-mini:7433");
1476        assert_eq!(cli.path.as_deref(), Some("ui"));
1477    }
1478
1479    #[test]
1480    fn test_build_open_command() {
1481        let cmd = build_open_command("http://localhost:7433");
1482        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1483        assert_eq!(args, vec!["http://localhost:7433"]);
1484        #[cfg(target_os = "macos")]
1485        assert_eq!(cmd.get_program(), "open");
1486        #[cfg(target_os = "linux")]
1487        assert_eq!(cmd.get_program(), "xdg-open");
1488    }
1489
1490    #[test]
1491    fn test_cli_parse_spawn() {
1492        let cli = Cli::try_parse_from([
1493            "pulpo",
1494            "spawn",
1495            "my-task",
1496            "--workdir",
1497            "/tmp/repo",
1498            "--",
1499            "claude",
1500            "-p",
1501            "Fix the bug",
1502        ])
1503        .unwrap();
1504        assert!(matches!(
1505            &cli.command,
1506            Some(Commands::Spawn { name, workdir, command, .. })
1507                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1508                && command == &["claude", "-p", "Fix the bug"]
1509        ));
1510    }
1511
1512    #[test]
1513    fn test_cli_parse_spawn_with_ink() {
1514        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1515        assert!(matches!(
1516            &cli.command,
1517            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1518        ));
1519    }
1520
1521    #[test]
1522    fn test_cli_parse_spawn_with_description() {
1523        let cli =
1524            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1525                .unwrap();
1526        assert!(matches!(
1527            &cli.command,
1528            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1529        ));
1530    }
1531
1532    #[test]
1533    fn test_cli_parse_spawn_name_positional() {
1534        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1535        assert!(matches!(
1536            &cli.command,
1537            Some(Commands::Spawn { name, command, .. })
1538                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1539        ));
1540    }
1541
1542    #[test]
1543    fn test_cli_parse_spawn_no_command() {
1544        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1545        assert!(matches!(
1546            &cli.command,
1547            Some(Commands::Spawn { command, .. }) if command.is_empty()
1548        ));
1549    }
1550
1551    #[test]
1552    fn test_cli_parse_spawn_idle_threshold() {
1553        let cli =
1554            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1555        assert!(matches!(
1556            &cli.command,
1557            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1558        ));
1559    }
1560
1561    #[test]
1562    fn test_cli_parse_spawn_idle_threshold_60() {
1563        let cli =
1564            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1565        assert!(matches!(
1566            &cli.command,
1567            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1568        ));
1569    }
1570
1571    #[test]
1572    fn test_cli_parse_spawn_secrets() {
1573        let cli = Cli::try_parse_from([
1574            "pulpo",
1575            "spawn",
1576            "my-task",
1577            "--secret",
1578            "GITHUB_TOKEN",
1579            "--secret",
1580            "NPM_TOKEN",
1581        ])
1582        .unwrap();
1583        assert!(matches!(
1584            &cli.command,
1585            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1586        ));
1587    }
1588
1589    #[test]
1590    fn test_cli_parse_spawn_no_secrets() {
1591        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1592        assert!(matches!(
1593            &cli.command,
1594            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1595        ));
1596    }
1597
1598    #[test]
1599    fn test_cli_parse_secret_set_with_env() {
1600        let cli = Cli::try_parse_from([
1601            "pulpo",
1602            "secret",
1603            "set",
1604            "GH_WORK",
1605            "token123",
1606            "--env",
1607            "GITHUB_TOKEN",
1608        ])
1609        .unwrap();
1610        assert!(matches!(
1611            &cli.command,
1612            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1613                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1614        ));
1615    }
1616
1617    #[test]
1618    fn test_cli_parse_secret_set_without_env() {
1619        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1620        assert!(matches!(
1621            &cli.command,
1622            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1623                if name == "MY_KEY" && value == "val" && env.is_none()
1624        ));
1625    }
1626
1627    #[test]
1628    fn test_cli_parse_spawn_worktree() {
1629        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1630        assert!(matches!(
1631            &cli.command,
1632            Some(Commands::Spawn { worktree, .. }) if *worktree
1633        ));
1634    }
1635
1636    #[test]
1637    fn test_cli_parse_spawn_detach() {
1638        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1639        assert!(matches!(
1640            &cli.command,
1641            Some(Commands::Spawn { detach, .. }) if *detach
1642        ));
1643    }
1644
1645    #[test]
1646    fn test_cli_parse_spawn_detach_short() {
1647        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1648        assert!(matches!(
1649            &cli.command,
1650            Some(Commands::Spawn { detach, .. }) if *detach
1651        ));
1652    }
1653
1654    #[test]
1655    fn test_cli_parse_spawn_detach_default() {
1656        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1657        assert!(matches!(
1658            &cli.command,
1659            Some(Commands::Spawn { detach, .. }) if !detach
1660        ));
1661    }
1662
1663    #[test]
1664    fn test_cli_parse_logs() {
1665        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1666        assert!(matches!(
1667            &cli.command,
1668            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1669        ));
1670    }
1671
1672    #[test]
1673    fn test_cli_parse_logs_with_lines() {
1674        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1675        assert!(matches!(
1676            &cli.command,
1677            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1678        ));
1679    }
1680
1681    #[test]
1682    fn test_cli_parse_logs_follow() {
1683        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1684        assert!(matches!(
1685            &cli.command,
1686            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1687        ));
1688    }
1689
1690    #[test]
1691    fn test_cli_parse_logs_follow_short() {
1692        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1693        assert!(matches!(
1694            &cli.command,
1695            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1696        ));
1697    }
1698
1699    #[test]
1700    fn test_cli_parse_kill() {
1701        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1702        assert!(matches!(
1703            &cli.command,
1704            Some(Commands::Kill { name }) if name == "my-session"
1705        ));
1706    }
1707
1708    #[test]
1709    fn test_cli_parse_delete() {
1710        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1711        assert!(matches!(
1712            &cli.command,
1713            Some(Commands::Delete { name }) if name == "my-session"
1714        ));
1715    }
1716
1717    #[test]
1718    fn test_cli_parse_resume() {
1719        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1720        assert!(matches!(
1721            &cli.command,
1722            Some(Commands::Resume { name }) if name == "my-session"
1723        ));
1724    }
1725
1726    #[test]
1727    fn test_cli_parse_input() {
1728        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1729        assert!(matches!(
1730            &cli.command,
1731            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1732        ));
1733    }
1734
1735    #[test]
1736    fn test_cli_parse_input_no_text() {
1737        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1738        assert!(matches!(
1739            &cli.command,
1740            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1741        ));
1742    }
1743
1744    #[test]
1745    fn test_cli_parse_input_alias() {
1746        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1747        assert!(matches!(
1748            &cli.command,
1749            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1750        ));
1751    }
1752
1753    #[test]
1754    fn test_cli_parse_custom_node() {
1755        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1756        assert_eq!(cli.node, "win-pc:8080");
1757        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
1758        assert_eq!(cli.path.as_deref(), Some("list"));
1759    }
1760
1761    #[test]
1762    fn test_cli_version() {
1763        let result = Cli::try_parse_from(["pulpo", "--version"]);
1764        // clap exits with an error (kind DisplayVersion) when --version is used
1765        let err = result.unwrap_err();
1766        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1767    }
1768
1769    #[test]
1770    fn test_cli_parse_no_subcommand_succeeds() {
1771        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1772        assert!(cli.command.is_none());
1773        assert!(cli.path.is_none());
1774    }
1775
1776    #[test]
1777    fn test_cli_debug() {
1778        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1779        let debug = format!("{cli:?}");
1780        assert!(debug.contains("List"));
1781    }
1782
1783    #[test]
1784    fn test_commands_debug() {
1785        let cmd = Commands::List { all: false };
1786        assert_eq!(format!("{cmd:?}"), "List { all: false }");
1787    }
1788
1789    /// A valid Session JSON for test responses.
1790    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1791
1792    /// A valid `CreateSessionResponse` JSON wrapping the session.
1793    fn test_create_response_json() -> String {
1794        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1795    }
1796
1797    /// Start a lightweight test HTTP server and return its address.
1798    async fn start_test_server() -> String {
1799        use axum::http::StatusCode;
1800        use axum::{
1801            Json, Router,
1802            routing::{get, post},
1803        };
1804
1805        let create_json = test_create_response_json();
1806
1807        let app = Router::new()
1808            .route(
1809                "/api/v1/sessions",
1810                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1811                    (StatusCode::CREATED, create_json.clone())
1812                }),
1813            )
1814            .route(
1815                "/api/v1/sessions/{id}",
1816                get(|| async { TEST_SESSION_JSON.to_owned() })
1817                    .delete(|| async { StatusCode::NO_CONTENT }),
1818            )
1819            .route(
1820                "/api/v1/sessions/{id}/kill",
1821                post(|| async { StatusCode::NO_CONTENT }),
1822            )
1823            .route(
1824                "/api/v1/sessions/{id}/output",
1825                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1826            )
1827            .route(
1828                "/api/v1/peers",
1829                get(|| async {
1830                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1831                }),
1832            )
1833            .route(
1834                "/api/v1/sessions/{id}/resume",
1835                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1836            )
1837            .route(
1838                "/api/v1/sessions/{id}/interventions",
1839                get(|| async { "[]".to_owned() }),
1840            )
1841            .route(
1842                "/api/v1/sessions/{id}/input",
1843                post(|| async { StatusCode::NO_CONTENT }),
1844            )
1845            .route(
1846                "/api/v1/schedules",
1847                get(|| async { Json::<Vec<()>>(vec![]) })
1848                    .post(|| async { StatusCode::CREATED }),
1849            )
1850            .route(
1851                "/api/v1/schedules/{id}",
1852                axum::routing::put(|| async { StatusCode::OK })
1853                    .delete(|| async { StatusCode::NO_CONTENT }),
1854            );
1855
1856        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1857        let addr = listener.local_addr().unwrap();
1858        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1859        format!("127.0.0.1:{}", addr.port())
1860    }
1861
1862    #[tokio::test]
1863    async fn test_execute_list_success() {
1864        let node = start_test_server().await;
1865        let cli = Cli {
1866            node,
1867            token: None,
1868            command: Some(Commands::List { all: false }),
1869            path: None,
1870        };
1871        let result = execute(&cli).await.unwrap();
1872        assert_eq!(result, "No sessions.");
1873    }
1874
1875    #[tokio::test]
1876    async fn test_execute_nodes_success() {
1877        let node = start_test_server().await;
1878        let cli = Cli {
1879            node,
1880            token: None,
1881            command: Some(Commands::Nodes),
1882            path: None,
1883        };
1884        let result = execute(&cli).await.unwrap();
1885        assert!(result.contains("test"));
1886        assert!(result.contains("(local)"));
1887        assert!(result.contains("NAME"));
1888    }
1889
1890    #[tokio::test]
1891    async fn test_execute_spawn_success() {
1892        let node = start_test_server().await;
1893        let cli = Cli {
1894            node,
1895            token: None,
1896            command: Some(Commands::Spawn {
1897                name: Some("test".into()),
1898                workdir: Some("/tmp/repo".into()),
1899                ink: None,
1900                description: None,
1901                detach: true,
1902                idle_threshold: None,
1903                auto: false,
1904                worktree: false,
1905                runtime: None,
1906                secret: vec![],
1907                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1908            }),
1909            path: None,
1910        };
1911        let result = execute(&cli).await.unwrap();
1912        assert!(result.contains("Created session"));
1913        assert!(result.contains("repo"));
1914    }
1915
1916    #[tokio::test]
1917    async fn test_execute_spawn_with_all_flags() {
1918        let node = start_test_server().await;
1919        let cli = Cli {
1920            node,
1921            token: None,
1922            command: Some(Commands::Spawn {
1923                name: Some("test".into()),
1924                workdir: Some("/tmp/repo".into()),
1925                ink: Some("coder".into()),
1926                description: Some("Fix the bug".into()),
1927                detach: true,
1928                idle_threshold: None,
1929                auto: false,
1930                worktree: false,
1931                runtime: None,
1932                secret: vec![],
1933                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1934            }),
1935            path: None,
1936        };
1937        let result = execute(&cli).await.unwrap();
1938        assert!(result.contains("Created session"));
1939    }
1940
1941    #[tokio::test]
1942    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1943        let node = start_test_server().await;
1944        let cli = Cli {
1945            node,
1946            token: None,
1947            command: Some(Commands::Spawn {
1948                name: Some("full-opts".into()),
1949                workdir: Some("/tmp/repo".into()),
1950                ink: Some("coder".into()),
1951                description: Some("Full options".into()),
1952                detach: true,
1953                idle_threshold: Some(120),
1954                auto: false,
1955                worktree: true,
1956                runtime: Some("docker".into()),
1957                secret: vec![],
1958                command: vec!["claude".into()],
1959            }),
1960            path: None,
1961        };
1962        let result = execute(&cli).await.unwrap();
1963        assert!(result.contains("Created session"));
1964    }
1965
1966    #[tokio::test]
1967    async fn test_execute_spawn_no_name_derives_from_workdir() {
1968        let node = start_test_server().await;
1969        let cli = Cli {
1970            node,
1971            token: None,
1972            command: Some(Commands::Spawn {
1973                name: None,
1974                workdir: Some("/tmp/my-project".into()),
1975                ink: None,
1976                description: None,
1977                detach: true,
1978                idle_threshold: None,
1979                auto: false,
1980                worktree: false,
1981                runtime: None,
1982                secret: vec![],
1983                command: vec!["echo".into(), "hello".into()],
1984            }),
1985            path: None,
1986        };
1987        let result = execute(&cli).await.unwrap();
1988        assert!(result.contains("Created session"));
1989    }
1990
1991    #[tokio::test]
1992    async fn test_execute_spawn_no_command() {
1993        let node = start_test_server().await;
1994        let cli = Cli {
1995            node,
1996            token: None,
1997            command: Some(Commands::Spawn {
1998                name: Some("test".into()),
1999                workdir: Some("/tmp/repo".into()),
2000                ink: None,
2001                description: None,
2002                detach: true,
2003                idle_threshold: None,
2004                auto: false,
2005                worktree: false,
2006                runtime: None,
2007                secret: vec![],
2008                command: vec![],
2009            }),
2010            path: None,
2011        };
2012        let result = execute(&cli).await.unwrap();
2013        assert!(result.contains("Created session"));
2014    }
2015
2016    #[tokio::test]
2017    async fn test_execute_spawn_with_name() {
2018        let node = start_test_server().await;
2019        let cli = Cli {
2020            node,
2021            token: None,
2022            command: Some(Commands::Spawn {
2023                name: Some("my-task".into()),
2024                workdir: Some("/tmp/repo".into()),
2025                ink: None,
2026                description: None,
2027                detach: true,
2028                idle_threshold: None,
2029                auto: false,
2030                worktree: false,
2031                runtime: None,
2032                secret: vec![],
2033                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2034            }),
2035            path: None,
2036        };
2037        let result = execute(&cli).await.unwrap();
2038        assert!(result.contains("Created session"));
2039    }
2040
2041    #[tokio::test]
2042    async fn test_execute_spawn_auto_attach() {
2043        let node = start_test_server().await;
2044        let cli = Cli {
2045            node,
2046            token: None,
2047            command: Some(Commands::Spawn {
2048                name: Some("test".into()),
2049                workdir: Some("/tmp/repo".into()),
2050                ink: None,
2051                description: None,
2052                detach: false,
2053                idle_threshold: None,
2054                auto: false,
2055                worktree: false,
2056                runtime: None,
2057                secret: vec![],
2058                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2059            }),
2060            path: None,
2061        };
2062        let result = execute(&cli).await.unwrap();
2063        // When not detached, spawn prints creation to stderr and returns detach message
2064        assert!(result.contains("Detached from session"));
2065    }
2066
2067    #[tokio::test]
2068    async fn test_execute_kill_success() {
2069        let node = start_test_server().await;
2070        let cli = Cli {
2071            node,
2072            token: None,
2073            command: Some(Commands::Kill {
2074                name: "test-session".into(),
2075            }),
2076            path: None,
2077        };
2078        let result = execute(&cli).await.unwrap();
2079        assert!(result.contains("killed"));
2080    }
2081
2082    #[tokio::test]
2083    async fn test_execute_delete_success() {
2084        let node = start_test_server().await;
2085        let cli = Cli {
2086            node,
2087            token: None,
2088            command: Some(Commands::Delete {
2089                name: "test-session".into(),
2090            }),
2091            path: None,
2092        };
2093        let result = execute(&cli).await.unwrap();
2094        assert!(result.contains("deleted"));
2095    }
2096
2097    #[tokio::test]
2098    async fn test_execute_logs_success() {
2099        let node = start_test_server().await;
2100        let cli = Cli {
2101            node,
2102            token: None,
2103            command: Some(Commands::Logs {
2104                name: "test-session".into(),
2105                lines: 50,
2106                follow: false,
2107            }),
2108            path: None,
2109        };
2110        let result = execute(&cli).await.unwrap();
2111        assert!(result.contains("test output"));
2112    }
2113
2114    #[tokio::test]
2115    async fn test_execute_list_connection_refused() {
2116        let cli = Cli {
2117            node: "localhost:1".into(),
2118            token: None,
2119            command: Some(Commands::List { all: false }),
2120            path: None,
2121        };
2122        let result = execute(&cli).await;
2123        let err = result.unwrap_err().to_string();
2124        assert!(
2125            err.contains("Could not connect to pulpod"),
2126            "Expected friendly error, got: {err}"
2127        );
2128        assert!(err.contains("localhost:1"));
2129    }
2130
2131    #[tokio::test]
2132    async fn test_execute_nodes_connection_refused() {
2133        let cli = Cli {
2134            node: "localhost:1".into(),
2135            token: None,
2136            command: Some(Commands::Nodes),
2137            path: None,
2138        };
2139        let result = execute(&cli).await;
2140        let err = result.unwrap_err().to_string();
2141        assert!(err.contains("Could not connect to pulpod"));
2142    }
2143
2144    #[tokio::test]
2145    async fn test_execute_kill_error_response() {
2146        use axum::{Router, http::StatusCode, routing::post};
2147
2148        let app = Router::new().route(
2149            "/api/v1/sessions/{id}/kill",
2150            post(|| async {
2151                (
2152                    StatusCode::NOT_FOUND,
2153                    "{\"error\":\"session not found: test-session\"}",
2154                )
2155            }),
2156        );
2157        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2158        let addr = listener.local_addr().unwrap();
2159        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2160        let node = format!("127.0.0.1:{}", addr.port());
2161
2162        let cli = Cli {
2163            node,
2164            token: None,
2165            command: Some(Commands::Kill {
2166                name: "test-session".into(),
2167            }),
2168            path: None,
2169        };
2170        let err = execute(&cli).await.unwrap_err();
2171        assert_eq!(err.to_string(), "session not found: test-session");
2172    }
2173
2174    #[tokio::test]
2175    async fn test_execute_delete_error_response() {
2176        use axum::{Router, http::StatusCode, routing::delete};
2177
2178        let app = Router::new().route(
2179            "/api/v1/sessions/{id}",
2180            delete(|| async {
2181                (
2182                    StatusCode::CONFLICT,
2183                    "{\"error\":\"cannot delete session in 'running' state\"}",
2184                )
2185            }),
2186        );
2187        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2188        let addr = listener.local_addr().unwrap();
2189        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2190        let node = format!("127.0.0.1:{}", addr.port());
2191
2192        let cli = Cli {
2193            node,
2194            token: None,
2195            command: Some(Commands::Delete {
2196                name: "test-session".into(),
2197            }),
2198            path: None,
2199        };
2200        let err = execute(&cli).await.unwrap_err();
2201        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
2202    }
2203
2204    #[tokio::test]
2205    async fn test_execute_logs_error_response() {
2206        use axum::{Router, http::StatusCode, routing::get};
2207
2208        let app = Router::new().route(
2209            "/api/v1/sessions/{id}/output",
2210            get(|| async {
2211                (
2212                    StatusCode::NOT_FOUND,
2213                    "{\"error\":\"session not found: ghost\"}",
2214                )
2215            }),
2216        );
2217        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2218        let addr = listener.local_addr().unwrap();
2219        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2220        let node = format!("127.0.0.1:{}", addr.port());
2221
2222        let cli = Cli {
2223            node,
2224            token: None,
2225            command: Some(Commands::Logs {
2226                name: "ghost".into(),
2227                lines: 50,
2228                follow: false,
2229            }),
2230            path: None,
2231        };
2232        let err = execute(&cli).await.unwrap_err();
2233        assert_eq!(err.to_string(), "session not found: ghost");
2234    }
2235
2236    #[tokio::test]
2237    async fn test_execute_resume_error_response() {
2238        use axum::{Router, http::StatusCode, routing::post};
2239
2240        let app = Router::new().route(
2241            "/api/v1/sessions/{id}/resume",
2242            post(|| async {
2243                (
2244                    StatusCode::BAD_REQUEST,
2245                    "{\"error\":\"session is not lost (status: active)\"}",
2246                )
2247            }),
2248        );
2249        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2250        let addr = listener.local_addr().unwrap();
2251        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2252        let node = format!("127.0.0.1:{}", addr.port());
2253
2254        let cli = Cli {
2255            node,
2256            token: None,
2257            command: Some(Commands::Resume {
2258                name: "test-session".into(),
2259            }),
2260            path: None,
2261        };
2262        let err = execute(&cli).await.unwrap_err();
2263        assert_eq!(err.to_string(), "session is not lost (status: active)");
2264    }
2265
2266    #[tokio::test]
2267    async fn test_execute_spawn_error_response() {
2268        use axum::{Router, http::StatusCode, routing::post};
2269
2270        let app = Router::new().route(
2271            "/api/v1/sessions",
2272            post(|| async {
2273                (
2274                    StatusCode::INTERNAL_SERVER_ERROR,
2275                    "{\"error\":\"failed to spawn session\"}",
2276                )
2277            }),
2278        );
2279        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2280        let addr = listener.local_addr().unwrap();
2281        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2282        let node = format!("127.0.0.1:{}", addr.port());
2283
2284        let cli = Cli {
2285            node,
2286            token: None,
2287            command: Some(Commands::Spawn {
2288                name: Some("test".into()),
2289                workdir: Some("/tmp/repo".into()),
2290                ink: None,
2291                description: None,
2292                detach: true,
2293                idle_threshold: None,
2294                auto: false,
2295                worktree: false,
2296                runtime: None,
2297                secret: vec![],
2298                command: vec!["test".into()],
2299            }),
2300            path: None,
2301        };
2302        let err = execute(&cli).await.unwrap_err();
2303        assert_eq!(err.to_string(), "failed to spawn session");
2304    }
2305
2306    #[tokio::test]
2307    async fn test_execute_interventions_error_response() {
2308        use axum::{Router, http::StatusCode, routing::get};
2309
2310        let app = Router::new().route(
2311            "/api/v1/sessions/{id}/interventions",
2312            get(|| async {
2313                (
2314                    StatusCode::NOT_FOUND,
2315                    "{\"error\":\"session not found: ghost\"}",
2316                )
2317            }),
2318        );
2319        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2320        let addr = listener.local_addr().unwrap();
2321        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2322        let node = format!("127.0.0.1:{}", addr.port());
2323
2324        let cli = Cli {
2325            node,
2326            token: None,
2327            command: Some(Commands::Interventions {
2328                name: "ghost".into(),
2329            }),
2330            path: None,
2331        };
2332        let err = execute(&cli).await.unwrap_err();
2333        assert_eq!(err.to_string(), "session not found: ghost");
2334    }
2335
2336    #[tokio::test]
2337    async fn test_execute_resume_success() {
2338        let node = start_test_server().await;
2339        let cli = Cli {
2340            node,
2341            token: None,
2342            command: Some(Commands::Resume {
2343                name: "test-session".into(),
2344            }),
2345            path: None,
2346        };
2347        let result = execute(&cli).await.unwrap();
2348        assert!(result.contains("Detached from session"));
2349    }
2350
2351    #[tokio::test]
2352    async fn test_execute_input_success() {
2353        let node = start_test_server().await;
2354        let cli = Cli {
2355            node,
2356            token: None,
2357            command: Some(Commands::Input {
2358                name: "test-session".into(),
2359                text: Some("yes".into()),
2360            }),
2361            path: None,
2362        };
2363        let result = execute(&cli).await.unwrap();
2364        assert!(result.contains("Sent input to session test-session"));
2365    }
2366
2367    #[tokio::test]
2368    async fn test_execute_input_no_text() {
2369        let node = start_test_server().await;
2370        let cli = Cli {
2371            node,
2372            token: None,
2373            command: Some(Commands::Input {
2374                name: "test-session".into(),
2375                text: None,
2376            }),
2377            path: None,
2378        };
2379        let result = execute(&cli).await.unwrap();
2380        assert!(result.contains("Sent input to session test-session"));
2381    }
2382
2383    #[tokio::test]
2384    async fn test_execute_input_connection_refused() {
2385        let cli = Cli {
2386            node: "localhost:1".into(),
2387            token: None,
2388            command: Some(Commands::Input {
2389                name: "test".into(),
2390                text: Some("y".into()),
2391            }),
2392            path: None,
2393        };
2394        let result = execute(&cli).await;
2395        let err = result.unwrap_err().to_string();
2396        assert!(err.contains("Could not connect to pulpod"));
2397    }
2398
2399    #[tokio::test]
2400    async fn test_execute_input_error_response() {
2401        use axum::{Router, http::StatusCode, routing::post};
2402
2403        let app = Router::new().route(
2404            "/api/v1/sessions/{id}/input",
2405            post(|| async {
2406                (
2407                    StatusCode::NOT_FOUND,
2408                    "{\"error\":\"session not found: ghost\"}",
2409                )
2410            }),
2411        );
2412        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2413        let addr = listener.local_addr().unwrap();
2414        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2415        let node = format!("127.0.0.1:{}", addr.port());
2416
2417        let cli = Cli {
2418            node,
2419            token: None,
2420            command: Some(Commands::Input {
2421                name: "ghost".into(),
2422                text: Some("y".into()),
2423            }),
2424            path: None,
2425        };
2426        let err = execute(&cli).await.unwrap_err();
2427        assert_eq!(err.to_string(), "session not found: ghost");
2428    }
2429
2430    #[tokio::test]
2431    async fn test_execute_ui() {
2432        let cli = Cli {
2433            node: "localhost:7433".into(),
2434            token: None,
2435            command: Some(Commands::Ui),
2436            path: None,
2437        };
2438        let result = execute(&cli).await.unwrap();
2439        assert!(result.contains("Opening"));
2440        assert!(result.contains("http://localhost:7433"));
2441    }
2442
2443    #[tokio::test]
2444    async fn test_execute_ui_custom_node() {
2445        let cli = Cli {
2446            node: "mac-mini:7433".into(),
2447            token: None,
2448            command: Some(Commands::Ui),
2449            path: None,
2450        };
2451        let result = execute(&cli).await.unwrap();
2452        assert!(result.contains("http://mac-mini:7433"));
2453    }
2454
2455    #[test]
2456    fn test_format_sessions_empty() {
2457        assert_eq!(format_sessions(&[]), "No sessions.");
2458    }
2459
2460    #[test]
2461    fn test_format_sessions_with_data() {
2462        use chrono::Utc;
2463        use pulpo_common::session::SessionStatus;
2464        use uuid::Uuid;
2465
2466        let sessions = vec![Session {
2467            id: Uuid::nil(),
2468            name: "my-api".into(),
2469            workdir: "/tmp/repo".into(),
2470            command: "claude -p 'Fix the bug'".into(),
2471            description: Some("Fix the bug".into()),
2472            status: SessionStatus::Active,
2473            exit_code: None,
2474            backend_session_id: None,
2475            output_snapshot: None,
2476            metadata: None,
2477            ink: None,
2478            intervention_code: None,
2479            intervention_reason: None,
2480            intervention_at: None,
2481            last_output_at: None,
2482            idle_since: None,
2483            idle_threshold_secs: None,
2484            worktree_path: None,
2485            runtime: Runtime::Tmux,
2486            created_at: Utc::now(),
2487            updated_at: Utc::now(),
2488        }];
2489        let output = format_sessions(&sessions);
2490        assert!(output.contains("ID"));
2491        assert!(output.contains("NAME"));
2492        assert!(output.contains("RUNTIME"));
2493        assert!(output.contains("COMMAND"));
2494        assert!(output.contains("00000000"));
2495        assert!(output.contains("my-api"));
2496        assert!(output.contains("active"));
2497        assert!(output.contains("tmux"));
2498        assert!(output.contains("claude -p 'Fix the bug'"));
2499    }
2500
2501    #[test]
2502    fn test_format_sessions_docker_runtime() {
2503        use chrono::Utc;
2504        use pulpo_common::session::SessionStatus;
2505        use uuid::Uuid;
2506
2507        let sessions = vec![Session {
2508            id: Uuid::nil(),
2509            name: "sandbox-test".into(),
2510            workdir: "/tmp".into(),
2511            command: "claude".into(),
2512            description: None,
2513            status: SessionStatus::Active,
2514            exit_code: None,
2515            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2516            output_snapshot: None,
2517            metadata: None,
2518            ink: None,
2519            intervention_code: None,
2520            intervention_reason: None,
2521            intervention_at: None,
2522            last_output_at: None,
2523            idle_since: None,
2524            idle_threshold_secs: None,
2525            worktree_path: None,
2526            runtime: Runtime::Docker,
2527            created_at: Utc::now(),
2528            updated_at: Utc::now(),
2529        }];
2530        let output = format_sessions(&sessions);
2531        assert!(output.contains("docker"));
2532    }
2533
2534    #[test]
2535    fn test_format_sessions_long_command_truncated() {
2536        use chrono::Utc;
2537        use pulpo_common::session::SessionStatus;
2538        use uuid::Uuid;
2539
2540        let sessions = vec![Session {
2541            id: Uuid::nil(),
2542            name: "test".into(),
2543            workdir: "/tmp".into(),
2544            command:
2545                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2546                    .into(),
2547            description: None,
2548            status: SessionStatus::Ready,
2549            exit_code: None,
2550            backend_session_id: None,
2551            output_snapshot: None,
2552            metadata: None,
2553            ink: None,
2554            intervention_code: None,
2555            intervention_reason: None,
2556            intervention_at: None,
2557            last_output_at: None,
2558            idle_since: None,
2559            idle_threshold_secs: None,
2560            worktree_path: None,
2561            runtime: Runtime::Tmux,
2562            created_at: Utc::now(),
2563            updated_at: Utc::now(),
2564        }];
2565        let output = format_sessions(&sessions);
2566        assert!(output.contains("..."));
2567    }
2568
2569    #[test]
2570    fn test_format_sessions_worktree_indicator() {
2571        use chrono::Utc;
2572        use pulpo_common::session::SessionStatus;
2573        use uuid::Uuid;
2574
2575        let sessions = vec![Session {
2576            id: Uuid::nil(),
2577            name: "wt-task".into(),
2578            workdir: "/repo/.pulpo/worktrees/wt-task".into(),
2579            command: "claude".into(),
2580            description: None,
2581            status: SessionStatus::Active,
2582            exit_code: None,
2583            backend_session_id: None,
2584            output_snapshot: None,
2585            metadata: None,
2586            ink: None,
2587            intervention_code: None,
2588            intervention_reason: None,
2589            intervention_at: None,
2590            last_output_at: None,
2591            idle_since: None,
2592            idle_threshold_secs: None,
2593            worktree_path: Some("/repo/.pulpo/worktrees/wt-task".into()),
2594            runtime: Runtime::Tmux,
2595            created_at: Utc::now(),
2596            updated_at: Utc::now(),
2597        }];
2598        let output = format_sessions(&sessions);
2599        assert!(
2600            output.contains("[wt]"),
2601            "should show worktree indicator: {output}"
2602        );
2603        assert!(output.contains("wt-task [wt]"));
2604    }
2605
2606    #[test]
2607    fn test_format_sessions_pr_indicator() {
2608        use chrono::Utc;
2609        use pulpo_common::session::SessionStatus;
2610        use std::collections::HashMap;
2611        use uuid::Uuid;
2612
2613        let mut meta = HashMap::new();
2614        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2615        let sessions = vec![Session {
2616            id: Uuid::nil(),
2617            name: "pr-task".into(),
2618            workdir: "/tmp".into(),
2619            command: "claude".into(),
2620            description: None,
2621            status: SessionStatus::Active,
2622            exit_code: None,
2623            backend_session_id: None,
2624            output_snapshot: None,
2625            metadata: Some(meta),
2626            ink: None,
2627            intervention_code: None,
2628            intervention_reason: None,
2629            intervention_at: None,
2630            last_output_at: None,
2631            idle_since: None,
2632            idle_threshold_secs: None,
2633            worktree_path: None,
2634            runtime: Runtime::Tmux,
2635            created_at: Utc::now(),
2636            updated_at: Utc::now(),
2637        }];
2638        let output = format_sessions(&sessions);
2639        assert!(
2640            output.contains("[PR]"),
2641            "should show PR indicator: {output}"
2642        );
2643        assert!(output.contains("pr-task [PR]"));
2644    }
2645
2646    #[test]
2647    fn test_format_sessions_worktree_and_pr_indicator() {
2648        use chrono::Utc;
2649        use pulpo_common::session::SessionStatus;
2650        use std::collections::HashMap;
2651        use uuid::Uuid;
2652
2653        let mut meta = HashMap::new();
2654        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2655        let sessions = vec![Session {
2656            id: Uuid::nil(),
2657            name: "both-task".into(),
2658            workdir: "/tmp".into(),
2659            command: "claude".into(),
2660            description: None,
2661            status: SessionStatus::Active,
2662            exit_code: None,
2663            backend_session_id: None,
2664            output_snapshot: None,
2665            metadata: Some(meta),
2666            ink: None,
2667            intervention_code: None,
2668            intervention_reason: None,
2669            intervention_at: None,
2670            last_output_at: None,
2671            idle_since: None,
2672            idle_threshold_secs: None,
2673            worktree_path: Some("/repo/.pulpo/worktrees/both-task".into()),
2674            runtime: Runtime::Tmux,
2675            created_at: Utc::now(),
2676            updated_at: Utc::now(),
2677        }];
2678        let output = format_sessions(&sessions);
2679        assert!(
2680            output.contains("[wt] [PR]"),
2681            "should show both indicators: {output}"
2682        );
2683    }
2684
2685    #[test]
2686    fn test_format_sessions_no_pr_without_metadata() {
2687        use chrono::Utc;
2688        use pulpo_common::session::SessionStatus;
2689        use uuid::Uuid;
2690
2691        let sessions = vec![Session {
2692            id: Uuid::nil(),
2693            name: "no-pr".into(),
2694            workdir: "/tmp".into(),
2695            command: "claude".into(),
2696            description: None,
2697            status: SessionStatus::Active,
2698            exit_code: None,
2699            backend_session_id: None,
2700            output_snapshot: None,
2701            metadata: None,
2702            ink: None,
2703            intervention_code: None,
2704            intervention_reason: None,
2705            intervention_at: None,
2706            last_output_at: None,
2707            idle_since: None,
2708            idle_threshold_secs: None,
2709            worktree_path: None,
2710            runtime: Runtime::Tmux,
2711            created_at: Utc::now(),
2712            updated_at: Utc::now(),
2713        }];
2714        let output = format_sessions(&sessions);
2715        assert!(
2716            !output.contains("[PR]"),
2717            "should not show PR indicator: {output}"
2718        );
2719    }
2720
2721    #[test]
2722    fn test_format_nodes() {
2723        use pulpo_common::node::NodeInfo;
2724        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2725
2726        let resp = PeersResponse {
2727            local: NodeInfo {
2728                name: "mac-mini".into(),
2729                hostname: "h".into(),
2730                os: "macos".into(),
2731                arch: "arm64".into(),
2732                cpus: 8,
2733                memory_mb: 16384,
2734                gpu: None,
2735            },
2736            peers: vec![PeerInfo {
2737                name: "win-pc".into(),
2738                address: "win-pc:7433".into(),
2739                status: PeerStatus::Online,
2740                node_info: None,
2741                session_count: Some(3),
2742                source: PeerSource::Configured,
2743            }],
2744        };
2745        let output = format_nodes(&resp);
2746        assert!(output.contains("mac-mini"));
2747        assert!(output.contains("(local)"));
2748        assert!(output.contains("win-pc"));
2749        assert!(output.contains('3'));
2750    }
2751
2752    #[test]
2753    fn test_format_nodes_no_session_count() {
2754        use pulpo_common::node::NodeInfo;
2755        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2756
2757        let resp = PeersResponse {
2758            local: NodeInfo {
2759                name: "local".into(),
2760                hostname: "h".into(),
2761                os: "linux".into(),
2762                arch: "x86_64".into(),
2763                cpus: 4,
2764                memory_mb: 8192,
2765                gpu: None,
2766            },
2767            peers: vec![PeerInfo {
2768                name: "peer".into(),
2769                address: "peer:7433".into(),
2770                status: PeerStatus::Offline,
2771                node_info: None,
2772                session_count: None,
2773                source: PeerSource::Configured,
2774            }],
2775        };
2776        let output = format_nodes(&resp);
2777        assert!(output.contains("offline"));
2778        // No session count → shows "-"
2779        let lines: Vec<&str> = output.lines().collect();
2780        assert!(lines[2].contains('-'));
2781    }
2782
2783    #[tokio::test]
2784    async fn test_execute_resume_connection_refused() {
2785        let cli = Cli {
2786            node: "localhost:1".into(),
2787            token: None,
2788            command: Some(Commands::Resume {
2789                name: "test".into(),
2790            }),
2791            path: None,
2792        };
2793        let result = execute(&cli).await;
2794        let err = result.unwrap_err().to_string();
2795        assert!(err.contains("Could not connect to pulpod"));
2796    }
2797
2798    #[tokio::test]
2799    async fn test_execute_spawn_connection_refused() {
2800        let cli = Cli {
2801            node: "localhost:1".into(),
2802            token: None,
2803            command: Some(Commands::Spawn {
2804                name: Some("test".into()),
2805                workdir: Some("/tmp".into()),
2806                ink: None,
2807                description: None,
2808                detach: true,
2809                idle_threshold: None,
2810                auto: false,
2811                worktree: false,
2812                runtime: None,
2813                secret: vec![],
2814                command: vec!["test".into()],
2815            }),
2816            path: None,
2817        };
2818        let result = execute(&cli).await;
2819        let err = result.unwrap_err().to_string();
2820        assert!(err.contains("Could not connect to pulpod"));
2821    }
2822
2823    #[tokio::test]
2824    async fn test_execute_kill_connection_refused() {
2825        let cli = Cli {
2826            node: "localhost:1".into(),
2827            token: None,
2828            command: Some(Commands::Kill {
2829                name: "test".into(),
2830            }),
2831            path: None,
2832        };
2833        let result = execute(&cli).await;
2834        let err = result.unwrap_err().to_string();
2835        assert!(err.contains("Could not connect to pulpod"));
2836    }
2837
2838    #[tokio::test]
2839    async fn test_execute_delete_connection_refused() {
2840        let cli = Cli {
2841            node: "localhost:1".into(),
2842            token: None,
2843            command: Some(Commands::Delete {
2844                name: "test".into(),
2845            }),
2846            path: None,
2847        };
2848        let result = execute(&cli).await;
2849        let err = result.unwrap_err().to_string();
2850        assert!(err.contains("Could not connect to pulpod"));
2851    }
2852
2853    #[tokio::test]
2854    async fn test_execute_logs_connection_refused() {
2855        let cli = Cli {
2856            node: "localhost:1".into(),
2857            token: None,
2858            command: Some(Commands::Logs {
2859                name: "test".into(),
2860                lines: 50,
2861                follow: false,
2862            }),
2863            path: None,
2864        };
2865        let result = execute(&cli).await;
2866        let err = result.unwrap_err().to_string();
2867        assert!(err.contains("Could not connect to pulpod"));
2868    }
2869
2870    #[tokio::test]
2871    async fn test_friendly_error_connect() {
2872        // Make a request to a closed port to get a connect error
2873        let err = reqwest::Client::new()
2874            .get("http://127.0.0.1:1")
2875            .send()
2876            .await
2877            .unwrap_err();
2878        let friendly = friendly_error(&err, "test-node:1");
2879        let msg = friendly.to_string();
2880        assert!(
2881            msg.contains("Could not connect"),
2882            "Expected connect message, got: {msg}"
2883        );
2884    }
2885
2886    #[tokio::test]
2887    async fn test_friendly_error_other() {
2888        // A request to an invalid URL creates a builder error, not a connect error
2889        let err = reqwest::Client::new()
2890            .get("http://[::invalid::url")
2891            .send()
2892            .await
2893            .unwrap_err();
2894        let friendly = friendly_error(&err, "bad-host");
2895        let msg = friendly.to_string();
2896        assert!(
2897            msg.contains("Network error"),
2898            "Expected network error message, got: {msg}"
2899        );
2900        assert!(msg.contains("bad-host"));
2901    }
2902
2903    // -- Auth helper tests --
2904
2905    #[test]
2906    fn test_is_localhost_variants() {
2907        assert!(is_localhost("localhost:7433"));
2908        assert!(is_localhost("127.0.0.1:7433"));
2909        assert!(is_localhost("[::1]:7433"));
2910        assert!(is_localhost("::1"));
2911        assert!(is_localhost("localhost"));
2912        assert!(!is_localhost("mac-mini:7433"));
2913        assert!(!is_localhost("192.168.1.100:7433"));
2914    }
2915
2916    #[test]
2917    fn test_authed_get_with_token() {
2918        let client = reqwest::Client::new();
2919        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2920            .build()
2921            .unwrap();
2922        let auth = req
2923            .headers()
2924            .get("authorization")
2925            .unwrap()
2926            .to_str()
2927            .unwrap();
2928        assert_eq!(auth, "Bearer tok");
2929    }
2930
2931    #[test]
2932    fn test_authed_get_without_token() {
2933        let client = reqwest::Client::new();
2934        let req = authed_get(&client, "http://h:1/api".into(), None)
2935            .build()
2936            .unwrap();
2937        assert!(req.headers().get("authorization").is_none());
2938    }
2939
2940    #[test]
2941    fn test_authed_post_with_token() {
2942        let client = reqwest::Client::new();
2943        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2944            .build()
2945            .unwrap();
2946        let auth = req
2947            .headers()
2948            .get("authorization")
2949            .unwrap()
2950            .to_str()
2951            .unwrap();
2952        assert_eq!(auth, "Bearer secret");
2953    }
2954
2955    #[test]
2956    fn test_authed_post_without_token() {
2957        let client = reqwest::Client::new();
2958        let req = authed_post(&client, "http://h:1/api".into(), None)
2959            .build()
2960            .unwrap();
2961        assert!(req.headers().get("authorization").is_none());
2962    }
2963
2964    #[test]
2965    fn test_authed_delete_with_token() {
2966        let client = reqwest::Client::new();
2967        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2968            .build()
2969            .unwrap();
2970        let auth = req
2971            .headers()
2972            .get("authorization")
2973            .unwrap()
2974            .to_str()
2975            .unwrap();
2976        assert_eq!(auth, "Bearer del-tok");
2977    }
2978
2979    #[test]
2980    fn test_authed_delete_without_token() {
2981        let client = reqwest::Client::new();
2982        let req = authed_delete(&client, "http://h:1/api".into(), None)
2983            .build()
2984            .unwrap();
2985        assert!(req.headers().get("authorization").is_none());
2986    }
2987
2988    #[tokio::test]
2989    async fn test_resolve_token_explicit() {
2990        let client = reqwest::Client::new();
2991        let token =
2992            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2993        assert_eq!(token, Some("my-tok".into()));
2994    }
2995
2996    #[tokio::test]
2997    async fn test_resolve_token_remote_no_explicit() {
2998        let client = reqwest::Client::new();
2999        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3000        assert_eq!(token, None);
3001    }
3002
3003    #[tokio::test]
3004    async fn test_resolve_token_localhost_auto_discover() {
3005        use axum::{Json, Router, routing::get};
3006
3007        let app = Router::new().route(
3008            "/api/v1/auth/token",
3009            get(|| async {
3010                Json(AuthTokenResponse {
3011                    token: "discovered".into(),
3012                })
3013            }),
3014        );
3015        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3016        let addr = listener.local_addr().unwrap();
3017        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3018
3019        let node = format!("localhost:{}", addr.port());
3020        let base = base_url(&node);
3021        let client = reqwest::Client::new();
3022        let token = resolve_token(&client, &base, &node, None).await;
3023        assert_eq!(token, Some("discovered".into()));
3024    }
3025
3026    #[tokio::test]
3027    async fn test_discover_token_empty_returns_none() {
3028        use axum::{Json, Router, routing::get};
3029
3030        let app = Router::new().route(
3031            "/api/v1/auth/token",
3032            get(|| async {
3033                Json(AuthTokenResponse {
3034                    token: String::new(),
3035                })
3036            }),
3037        );
3038        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3039        let addr = listener.local_addr().unwrap();
3040        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3041
3042        let base = format!("http://127.0.0.1:{}", addr.port());
3043        let client = reqwest::Client::new();
3044        assert_eq!(discover_token(&client, &base).await, None);
3045    }
3046
3047    #[tokio::test]
3048    async fn test_discover_token_unreachable_returns_none() {
3049        let client = reqwest::Client::new();
3050        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3051    }
3052
3053    #[test]
3054    fn test_cli_parse_with_token() {
3055        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3056        assert_eq!(cli.token, Some("my-secret".into()));
3057    }
3058
3059    #[test]
3060    fn test_cli_parse_without_token() {
3061        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3062        assert_eq!(cli.token, None);
3063    }
3064
3065    #[tokio::test]
3066    async fn test_execute_with_explicit_token_sends_header() {
3067        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3068
3069        let app = Router::new().route(
3070            "/api/v1/sessions",
3071            get(|req: Request| async move {
3072                let auth = req
3073                    .headers()
3074                    .get("authorization")
3075                    .and_then(|v| v.to_str().ok())
3076                    .unwrap_or("");
3077                assert_eq!(auth, "Bearer test-token");
3078                (StatusCode::OK, "[]".to_owned())
3079            }),
3080        );
3081        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3082        let addr = listener.local_addr().unwrap();
3083        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3084        let node = format!("127.0.0.1:{}", addr.port());
3085
3086        let cli = Cli {
3087            node,
3088            token: Some("test-token".into()),
3089            command: Some(Commands::List { all: false }),
3090            path: None,
3091        };
3092        let result = execute(&cli).await.unwrap();
3093        assert_eq!(result, "No sessions.");
3094    }
3095
3096    // -- Interventions tests --
3097
3098    #[test]
3099    fn test_cli_parse_interventions() {
3100        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3101        assert!(matches!(
3102            &cli.command,
3103            Some(Commands::Interventions { name }) if name == "my-session"
3104        ));
3105    }
3106
3107    #[test]
3108    fn test_format_interventions_empty() {
3109        assert_eq!(format_interventions(&[]), "No intervention events.");
3110    }
3111
3112    #[test]
3113    fn test_format_interventions_with_data() {
3114        let events = vec![
3115            InterventionEventResponse {
3116                id: 1,
3117                session_id: "sess-1".into(),
3118                code: None,
3119                reason: "Memory exceeded threshold".into(),
3120                created_at: "2026-01-01T00:00:00Z".into(),
3121            },
3122            InterventionEventResponse {
3123                id: 2,
3124                session_id: "sess-1".into(),
3125                code: None,
3126                reason: "Idle for 10 minutes".into(),
3127                created_at: "2026-01-02T00:00:00Z".into(),
3128            },
3129        ];
3130        let output = format_interventions(&events);
3131        assert!(output.contains("ID"));
3132        assert!(output.contains("TIMESTAMP"));
3133        assert!(output.contains("REASON"));
3134        assert!(output.contains("Memory exceeded threshold"));
3135        assert!(output.contains("Idle for 10 minutes"));
3136        assert!(output.contains("2026-01-01T00:00:00Z"));
3137    }
3138
3139    #[tokio::test]
3140    async fn test_execute_interventions_empty() {
3141        let node = start_test_server().await;
3142        let cli = Cli {
3143            node,
3144            token: None,
3145            command: Some(Commands::Interventions {
3146                name: "my-session".into(),
3147            }),
3148            path: None,
3149        };
3150        let result = execute(&cli).await.unwrap();
3151        assert_eq!(result, "No intervention events.");
3152    }
3153
3154    #[tokio::test]
3155    async fn test_execute_interventions_with_data() {
3156        use axum::{Router, routing::get};
3157
3158        let app = Router::new().route(
3159            "/api/v1/sessions/{id}/interventions",
3160            get(|| async {
3161                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3162                    .to_owned()
3163            }),
3164        );
3165        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3166        let addr = listener.local_addr().unwrap();
3167        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3168        let node = format!("127.0.0.1:{}", addr.port());
3169
3170        let cli = Cli {
3171            node,
3172            token: None,
3173            command: Some(Commands::Interventions {
3174                name: "test".into(),
3175            }),
3176            path: None,
3177        };
3178        let result = execute(&cli).await.unwrap();
3179        assert!(result.contains("OOM"));
3180        assert!(result.contains("2026-01-01T00:00:00Z"));
3181    }
3182
3183    #[tokio::test]
3184    async fn test_execute_interventions_connection_refused() {
3185        let cli = Cli {
3186            node: "localhost:1".into(),
3187            token: None,
3188            command: Some(Commands::Interventions {
3189                name: "test".into(),
3190            }),
3191            path: None,
3192        };
3193        let result = execute(&cli).await;
3194        let err = result.unwrap_err().to_string();
3195        assert!(err.contains("Could not connect to pulpod"));
3196    }
3197
3198    // -- Attach command tests --
3199
3200    #[test]
3201    fn test_build_attach_command_tmux() {
3202        let cmd = build_attach_command("my-session");
3203        assert_eq!(cmd.get_program(), "tmux");
3204        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3205        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3206    }
3207
3208    #[test]
3209    fn test_build_attach_command_docker() {
3210        let cmd = build_attach_command("docker:pulpo-my-task");
3211        assert_eq!(cmd.get_program(), "docker");
3212        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3213        assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3214    }
3215
3216    #[test]
3217    fn test_cli_parse_attach() {
3218        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3219        assert!(matches!(
3220            &cli.command,
3221            Some(Commands::Attach { name }) if name == "my-session"
3222        ));
3223    }
3224
3225    #[test]
3226    fn test_cli_parse_attach_alias() {
3227        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3228        assert!(matches!(
3229            &cli.command,
3230            Some(Commands::Attach { name }) if name == "my-session"
3231        ));
3232    }
3233
3234    #[tokio::test]
3235    async fn test_execute_attach_success() {
3236        let node = start_test_server().await;
3237        let cli = Cli {
3238            node,
3239            token: None,
3240            command: Some(Commands::Attach {
3241                name: "test-session".into(),
3242            }),
3243            path: None,
3244        };
3245        let result = execute(&cli).await.unwrap();
3246        assert!(result.contains("Detached from session test-session"));
3247    }
3248
3249    #[tokio::test]
3250    async fn test_execute_attach_with_backend_session_id() {
3251        use axum::{Router, routing::get};
3252        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3253        let app = Router::new().route(
3254            "/api/v1/sessions/{id}",
3255            get(move || async move { session_json.to_owned() }),
3256        );
3257        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3258        let addr = listener.local_addr().unwrap();
3259        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3260
3261        let cli = Cli {
3262            node: format!("127.0.0.1:{}", addr.port()),
3263            token: None,
3264            command: Some(Commands::Attach {
3265                name: "my-session".into(),
3266            }),
3267            path: None,
3268        };
3269        let result = execute(&cli).await.unwrap();
3270        assert!(result.contains("Detached from session my-session"));
3271    }
3272
3273    #[tokio::test]
3274    async fn test_execute_attach_connection_refused() {
3275        let cli = Cli {
3276            node: "localhost:1".into(),
3277            token: None,
3278            command: Some(Commands::Attach {
3279                name: "test-session".into(),
3280            }),
3281            path: None,
3282        };
3283        let result = execute(&cli).await;
3284        let err = result.unwrap_err().to_string();
3285        assert!(err.contains("Could not connect to pulpod"));
3286    }
3287
3288    #[tokio::test]
3289    async fn test_execute_attach_error_response() {
3290        use axum::{Router, http::StatusCode, routing::get};
3291        let app = Router::new().route(
3292            "/api/v1/sessions/{id}",
3293            get(|| async {
3294                (
3295                    StatusCode::NOT_FOUND,
3296                    r#"{"error":"session not found"}"#.to_owned(),
3297                )
3298            }),
3299        );
3300        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3301        let addr = listener.local_addr().unwrap();
3302        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3303
3304        let cli = Cli {
3305            node: format!("127.0.0.1:{}", addr.port()),
3306            token: None,
3307            command: Some(Commands::Attach {
3308                name: "nonexistent".into(),
3309            }),
3310            path: None,
3311        };
3312        let result = execute(&cli).await;
3313        let err = result.unwrap_err().to_string();
3314        assert!(err.contains("session not found"));
3315    }
3316
3317    #[tokio::test]
3318    async fn test_execute_attach_stale_session() {
3319        use axum::{Router, routing::get};
3320        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3321        let app = Router::new().route(
3322            "/api/v1/sessions/{id}",
3323            get(move || async move { session_json.to_owned() }),
3324        );
3325        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3326        let addr = listener.local_addr().unwrap();
3327        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3328
3329        let cli = Cli {
3330            node: format!("127.0.0.1:{}", addr.port()),
3331            token: None,
3332            command: Some(Commands::Attach {
3333                name: "stale-sess".into(),
3334            }),
3335            path: None,
3336        };
3337        let result = execute(&cli).await;
3338        let err = result.unwrap_err().to_string();
3339        assert!(err.contains("lost"));
3340        assert!(err.contains("pulpo resume"));
3341    }
3342
3343    #[tokio::test]
3344    async fn test_execute_attach_dead_session() {
3345        use axum::{Router, routing::get};
3346        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3347        let app = Router::new().route(
3348            "/api/v1/sessions/{id}",
3349            get(move || async move { session_json.to_owned() }),
3350        );
3351        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3352        let addr = listener.local_addr().unwrap();
3353        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3354
3355        let cli = Cli {
3356            node: format!("127.0.0.1:{}", addr.port()),
3357            token: None,
3358            command: Some(Commands::Attach {
3359                name: "dead-sess".into(),
3360            }),
3361            path: None,
3362        };
3363        let result = execute(&cli).await;
3364        let err = result.unwrap_err().to_string();
3365        assert!(err.contains("killed"));
3366        assert!(err.contains("cannot attach"));
3367    }
3368
3369    // -- Alias parse tests --
3370
3371    #[test]
3372    fn test_cli_parse_alias_spawn() {
3373        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3374        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3375    }
3376
3377    #[test]
3378    fn test_cli_parse_alias_list() {
3379        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3380        assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3381    }
3382
3383    #[test]
3384    fn test_cli_parse_list_all() {
3385        let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3386        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3387
3388        let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3389        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3390    }
3391
3392    #[test]
3393    fn test_cli_parse_alias_logs() {
3394        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3395        assert!(matches!(
3396            &cli.command,
3397            Some(Commands::Logs { name, .. }) if name == "my-session"
3398        ));
3399    }
3400
3401    #[test]
3402    fn test_cli_parse_alias_kill() {
3403        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3404        assert!(matches!(
3405            &cli.command,
3406            Some(Commands::Kill { name }) if name == "my-session"
3407        ));
3408    }
3409
3410    #[test]
3411    fn test_cli_parse_alias_delete() {
3412        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
3413        assert!(matches!(
3414            &cli.command,
3415            Some(Commands::Delete { name }) if name == "my-session"
3416        ));
3417    }
3418
3419    #[test]
3420    fn test_cli_parse_alias_resume() {
3421        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3422        assert!(matches!(
3423            &cli.command,
3424            Some(Commands::Resume { name }) if name == "my-session"
3425        ));
3426    }
3427
3428    #[test]
3429    fn test_cli_parse_alias_nodes() {
3430        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3431        assert!(matches!(&cli.command, Some(Commands::Nodes)));
3432    }
3433
3434    #[test]
3435    fn test_cli_parse_alias_interventions() {
3436        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3437        assert!(matches!(
3438            &cli.command,
3439            Some(Commands::Interventions { name }) if name == "my-session"
3440        ));
3441    }
3442
3443    #[test]
3444    fn test_api_error_json() {
3445        let err = api_error("{\"error\":\"session not found: foo\"}");
3446        assert_eq!(err.to_string(), "session not found: foo");
3447    }
3448
3449    #[test]
3450    fn test_api_error_plain_text() {
3451        let err = api_error("plain text error");
3452        assert_eq!(err.to_string(), "plain text error");
3453    }
3454
3455    // -- diff_output tests --
3456
3457    #[test]
3458    fn test_diff_output_empty_prev() {
3459        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3460    }
3461
3462    #[test]
3463    fn test_diff_output_identical() {
3464        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3465    }
3466
3467    #[test]
3468    fn test_diff_output_new_lines_appended() {
3469        let prev = "line1\nline2";
3470        let new = "line1\nline2\nline3\nline4";
3471        assert_eq!(diff_output(prev, new), "line3\nline4");
3472    }
3473
3474    #[test]
3475    fn test_diff_output_scrolled_window() {
3476        // Window of 3 lines: old lines scroll off top, new appear at bottom
3477        let prev = "line1\nline2\nline3";
3478        let new = "line2\nline3\nline4";
3479        assert_eq!(diff_output(prev, new), "line4");
3480    }
3481
3482    #[test]
3483    fn test_diff_output_completely_different() {
3484        let prev = "aaa\nbbb";
3485        let new = "xxx\nyyy";
3486        assert_eq!(diff_output(prev, new), "xxx\nyyy");
3487    }
3488
3489    #[test]
3490    fn test_diff_output_last_line_matches_but_overlap_fails() {
3491        // Last line of prev appears in new but preceding lines don't match
3492        let prev = "aaa\ncommon";
3493        let new = "zzz\ncommon\nnew_line";
3494        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
3495        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
3496        // Falls through, no verified overlap, so returns everything
3497        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3498    }
3499
3500    #[test]
3501    fn test_diff_output_new_empty() {
3502        assert_eq!(diff_output("line1", ""), "");
3503    }
3504
3505    // -- follow_logs tests --
3506
3507    /// Start a test server that simulates evolving output and session status transitions.
3508    /// Start a test server that simulates evolving output with agent exit marker.
3509    async fn start_follow_test_server() -> String {
3510        use axum::{Router, extract::Path, extract::Query, routing::get};
3511        use std::sync::Arc;
3512        use std::sync::atomic::{AtomicUsize, Ordering};
3513
3514        let call_count = Arc::new(AtomicUsize::new(0));
3515        let output_count = call_count.clone();
3516
3517        let app = Router::new()
3518            .route(
3519                "/api/v1/sessions/{id}/output",
3520                get(
3521                    move |_path: Path<String>,
3522                          _query: Query<std::collections::HashMap<String, String>>| {
3523                        let count = output_count.clone();
3524                        async move {
3525                            let n = count.fetch_add(1, Ordering::SeqCst);
3526                            let output = match n {
3527                                0 => "line1\nline2".to_owned(),
3528                                1 => "line1\nline2\nline3".to_owned(),
3529                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3530                            };
3531                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3532                        }
3533                    },
3534                ),
3535            )
3536            .route(
3537                "/api/v1/sessions/{id}",
3538                get(|_path: Path<String>| async {
3539                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3540                }),
3541            );
3542
3543        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3544        let addr = listener.local_addr().unwrap();
3545        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3546        format!("http://127.0.0.1:{}", addr.port())
3547    }
3548
3549    #[tokio::test]
3550    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3551        let base = start_follow_test_server().await;
3552        let client = reqwest::Client::new();
3553        let mut buf = Vec::new();
3554
3555        follow_logs(&client, &base, "test", 100, None, &mut buf)
3556            .await
3557            .unwrap();
3558
3559        let output = String::from_utf8(buf).unwrap();
3560        // Should contain initial output + new lines + agent exit marker
3561        assert!(output.contains("line1"));
3562        assert!(output.contains("line2"));
3563        assert!(output.contains("line3"));
3564        assert!(output.contains("line4"));
3565        assert!(output.contains("[pulpo] Agent exited"));
3566    }
3567
3568    #[tokio::test]
3569    async fn test_execute_logs_follow_success() {
3570        let base = start_follow_test_server().await;
3571        // Extract host:port from http://127.0.0.1:PORT
3572        let node = base.strip_prefix("http://").unwrap().to_owned();
3573
3574        let cli = Cli {
3575            node,
3576            token: None,
3577            command: Some(Commands::Logs {
3578                name: "test".into(),
3579                lines: 100,
3580                follow: true,
3581            }),
3582            path: None,
3583        };
3584        // execute() with follow writes to stdout and returns empty string
3585        let result = execute(&cli).await.unwrap();
3586        assert_eq!(result, "");
3587    }
3588
3589    #[tokio::test]
3590    async fn test_execute_logs_follow_connection_refused() {
3591        let cli = Cli {
3592            node: "localhost:1".into(),
3593            token: None,
3594            command: Some(Commands::Logs {
3595                name: "test".into(),
3596                lines: 50,
3597                follow: true,
3598            }),
3599            path: None,
3600        };
3601        let result = execute(&cli).await;
3602        let err = result.unwrap_err().to_string();
3603        assert!(
3604            err.contains("Could not connect to pulpod"),
3605            "Expected friendly error, got: {err}"
3606        );
3607    }
3608
3609    #[tokio::test]
3610    async fn test_follow_logs_exits_on_dead() {
3611        use axum::{Router, extract::Path, extract::Query, routing::get};
3612
3613        let app = Router::new()
3614            .route(
3615                "/api/v1/sessions/{id}/output",
3616                get(
3617                    |_path: Path<String>,
3618                     _query: Query<std::collections::HashMap<String, String>>| async {
3619                        r#"{"output":"some output"}"#.to_owned()
3620                    },
3621                ),
3622            )
3623            .route(
3624                "/api/v1/sessions/{id}",
3625                get(|_path: Path<String>| async {
3626                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3627                }),
3628            );
3629
3630        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3631        let addr = listener.local_addr().unwrap();
3632        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3633        let base = format!("http://127.0.0.1:{}", addr.port());
3634
3635        let client = reqwest::Client::new();
3636        let mut buf = Vec::new();
3637        follow_logs(&client, &base, "test", 100, None, &mut buf)
3638            .await
3639            .unwrap();
3640
3641        let output = String::from_utf8(buf).unwrap();
3642        assert!(output.contains("some output"));
3643    }
3644
3645    #[tokio::test]
3646    async fn test_follow_logs_exits_on_stale() {
3647        use axum::{Router, extract::Path, extract::Query, routing::get};
3648
3649        let app = Router::new()
3650            .route(
3651                "/api/v1/sessions/{id}/output",
3652                get(
3653                    |_path: Path<String>,
3654                     _query: Query<std::collections::HashMap<String, String>>| async {
3655                        r#"{"output":"stale output"}"#.to_owned()
3656                    },
3657                ),
3658            )
3659            .route(
3660                "/api/v1/sessions/{id}",
3661                get(|_path: Path<String>| async {
3662                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3663                }),
3664            );
3665
3666        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3667        let addr = listener.local_addr().unwrap();
3668        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3669        let base = format!("http://127.0.0.1:{}", addr.port());
3670
3671        let client = reqwest::Client::new();
3672        let mut buf = Vec::new();
3673        follow_logs(&client, &base, "test", 100, None, &mut buf)
3674            .await
3675            .unwrap();
3676
3677        let output = String::from_utf8(buf).unwrap();
3678        assert!(output.contains("stale output"));
3679    }
3680
3681    #[tokio::test]
3682    async fn test_execute_logs_follow_non_reqwest_error() {
3683        use axum::{Router, extract::Path, extract::Query, routing::get};
3684
3685        // Session status endpoint returns invalid JSON to trigger a serde error
3686        let app = Router::new()
3687            .route(
3688                "/api/v1/sessions/{id}/output",
3689                get(
3690                    |_path: Path<String>,
3691                     _query: Query<std::collections::HashMap<String, String>>| async {
3692                        r#"{"output":"initial"}"#.to_owned()
3693                    },
3694                ),
3695            )
3696            .route(
3697                "/api/v1/sessions/{id}",
3698                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3699            );
3700
3701        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3702        let addr = listener.local_addr().unwrap();
3703        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3704        let node = format!("127.0.0.1:{}", addr.port());
3705
3706        let cli = Cli {
3707            node,
3708            token: None,
3709            command: Some(Commands::Logs {
3710                name: "test".into(),
3711                lines: 100,
3712                follow: true,
3713            }),
3714            path: None,
3715        };
3716        let err = execute(&cli).await.unwrap_err();
3717        // serde_json error, not a reqwest error — hits the Err(other) branch
3718        let msg = err.to_string();
3719        assert!(
3720            msg.contains("expected ident"),
3721            "Expected serde parse error, got: {msg}"
3722        );
3723    }
3724
3725    #[tokio::test]
3726    async fn test_fetch_session_status_connection_error() {
3727        let client = reqwest::Client::new();
3728        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3729        assert!(result.is_err());
3730    }
3731
3732    // -- Schedule tests --
3733
3734    #[test]
3735    fn test_format_schedules_empty() {
3736        assert_eq!(format_schedules(&[]), "No schedules.");
3737    }
3738
3739    #[test]
3740    fn test_format_schedules_with_entries() {
3741        let schedules = vec![serde_json::json!({
3742            "name": "nightly",
3743            "cron": "0 3 * * *",
3744            "enabled": true,
3745            "last_run_at": null,
3746            "target_node": null
3747        })];
3748        let output = format_schedules(&schedules);
3749        assert!(output.contains("nightly"));
3750        assert!(output.contains("0 3 * * *"));
3751        assert!(output.contains("local"));
3752        assert!(output.contains("yes"));
3753        assert!(output.contains('-'));
3754    }
3755
3756    #[test]
3757    fn test_format_schedules_disabled_entry() {
3758        let schedules = vec![serde_json::json!({
3759            "name": "weekly",
3760            "cron": "0 0 * * 0",
3761            "enabled": false,
3762            "last_run_at": "2026-03-18T03:00:00Z",
3763            "target_node": "gpu-box"
3764        })];
3765        let output = format_schedules(&schedules);
3766        assert!(output.contains("weekly"));
3767        assert!(output.contains("no"));
3768        assert!(output.contains("gpu-box"));
3769        assert!(output.contains("2026-03-18T03:00"));
3770    }
3771
3772    #[test]
3773    fn test_format_schedules_header() {
3774        let schedules = vec![serde_json::json!({
3775            "name": "test",
3776            "cron": "* * * * *",
3777            "enabled": true,
3778            "last_run_at": null,
3779            "target_node": null
3780        })];
3781        let output = format_schedules(&schedules);
3782        assert!(output.contains("NAME"));
3783        assert!(output.contains("CRON"));
3784        assert!(output.contains("ENABLED"));
3785        assert!(output.contains("LAST RUN"));
3786        assert!(output.contains("NODE"));
3787    }
3788
3789    // -- Schedule CLI parse tests --
3790
3791    #[test]
3792    fn test_cli_parse_schedule_add() {
3793        let cli = Cli::try_parse_from([
3794            "pulpo",
3795            "schedule",
3796            "add",
3797            "nightly",
3798            "0 3 * * *",
3799            "--workdir",
3800            "/repo",
3801            "--",
3802            "claude",
3803            "-p",
3804            "review",
3805        ])
3806        .unwrap();
3807        assert!(matches!(
3808            &cli.command,
3809            Some(Commands::Schedule {
3810                action: ScheduleAction::Add { name, cron, .. }
3811            }) if name == "nightly" && cron == "0 3 * * *"
3812        ));
3813    }
3814
3815    #[test]
3816    fn test_cli_parse_schedule_add_with_node() {
3817        let cli = Cli::try_parse_from([
3818            "pulpo",
3819            "schedule",
3820            "add",
3821            "nightly",
3822            "0 3 * * *",
3823            "--workdir",
3824            "/repo",
3825            "--node",
3826            "gpu-box",
3827            "--",
3828            "claude",
3829        ])
3830        .unwrap();
3831        assert!(matches!(
3832            &cli.command,
3833            Some(Commands::Schedule {
3834                action: ScheduleAction::Add { node, .. }
3835            }) if node.as_deref() == Some("gpu-box")
3836        ));
3837    }
3838
3839    #[test]
3840    fn test_cli_parse_schedule_add_install_alias() {
3841        let cli =
3842            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3843        assert!(matches!(
3844            &cli.command,
3845            Some(Commands::Schedule {
3846                action: ScheduleAction::Add { name, .. }
3847            }) if name == "nightly"
3848        ));
3849    }
3850
3851    #[test]
3852    fn test_cli_parse_schedule_list() {
3853        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3854        assert!(matches!(
3855            &cli.command,
3856            Some(Commands::Schedule {
3857                action: ScheduleAction::List
3858            })
3859        ));
3860    }
3861
3862    #[test]
3863    fn test_cli_parse_schedule_remove() {
3864        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3865        assert!(matches!(
3866            &cli.command,
3867            Some(Commands::Schedule {
3868                action: ScheduleAction::Remove { name }
3869            }) if name == "nightly"
3870        ));
3871    }
3872
3873    #[test]
3874    fn test_cli_parse_schedule_pause() {
3875        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3876        assert!(matches!(
3877            &cli.command,
3878            Some(Commands::Schedule {
3879                action: ScheduleAction::Pause { name }
3880            }) if name == "nightly"
3881        ));
3882    }
3883
3884    #[test]
3885    fn test_cli_parse_schedule_resume() {
3886        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3887        assert!(matches!(
3888            &cli.command,
3889            Some(Commands::Schedule {
3890                action: ScheduleAction::Resume { name }
3891            }) if name == "nightly"
3892        ));
3893    }
3894
3895    #[test]
3896    fn test_cli_parse_schedule_alias() {
3897        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3898        assert!(matches!(
3899            &cli.command,
3900            Some(Commands::Schedule {
3901                action: ScheduleAction::List
3902            })
3903        ));
3904    }
3905
3906    #[test]
3907    fn test_cli_parse_schedule_list_alias() {
3908        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3909        assert!(matches!(
3910            &cli.command,
3911            Some(Commands::Schedule {
3912                action: ScheduleAction::List
3913            })
3914        ));
3915    }
3916
3917    #[test]
3918    fn test_cli_parse_schedule_remove_alias() {
3919        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3920        assert!(matches!(
3921            &cli.command,
3922            Some(Commands::Schedule {
3923                action: ScheduleAction::Remove { name }
3924            }) if name == "nightly"
3925        ));
3926    }
3927
3928    #[tokio::test]
3929    async fn test_execute_schedule_list_via_execute() {
3930        let node = start_test_server().await;
3931        let cli = Cli {
3932            node,
3933            token: None,
3934            command: Some(Commands::Schedule {
3935                action: ScheduleAction::List,
3936            }),
3937            path: None,
3938        };
3939        let result = execute(&cli).await.unwrap();
3940        // Under coverage, execute_schedule is a stub that returns empty string
3941        #[cfg(coverage)]
3942        assert!(result.is_empty());
3943        #[cfg(not(coverage))]
3944        assert_eq!(result, "No schedules.");
3945    }
3946
3947    #[test]
3948    fn test_schedule_action_debug() {
3949        let action = ScheduleAction::List;
3950        assert_eq!(format!("{action:?}"), "List");
3951    }
3952
3953    #[test]
3954    fn test_cli_parse_send_alias() {
3955        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3956        assert!(matches!(
3957            &cli.command,
3958            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3959        ));
3960    }
3961
3962    #[test]
3963    fn test_cli_parse_spawn_no_name() {
3964        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3965        assert!(matches!(
3966            &cli.command,
3967            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3968        ));
3969    }
3970
3971    #[test]
3972    fn test_cli_parse_spawn_optional_name_with_command() {
3973        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3974        assert!(matches!(
3975            &cli.command,
3976            Some(Commands::Spawn { name, command, .. })
3977                if name.is_none() && command == &["echo", "hello"]
3978        ));
3979    }
3980
3981    #[test]
3982    fn test_cli_parse_path_shortcut() {
3983        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3984        assert!(cli.command.is_none());
3985        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3986    }
3987
3988    #[test]
3989    fn test_cli_parse_no_args() {
3990        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3991        assert!(cli.command.is_none());
3992        assert!(cli.path.is_none());
3993    }
3994
3995    #[test]
3996    fn test_derive_session_name_simple() {
3997        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3998    }
3999
4000    #[test]
4001    fn test_derive_session_name_with_special_chars() {
4002        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4003    }
4004
4005    #[test]
4006    fn test_derive_session_name_root() {
4007        assert_eq!(derive_session_name("/"), "session");
4008    }
4009
4010    #[test]
4011    fn test_derive_session_name_dots() {
4012        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4013    }
4014
4015    #[test]
4016    fn test_resolve_path_absolute() {
4017        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4018    }
4019
4020    #[test]
4021    fn test_resolve_path_relative() {
4022        let resolved = resolve_path("my-repo");
4023        assert!(resolved.ends_with("my-repo"));
4024        assert!(resolved.starts_with('/'));
4025    }
4026
4027    #[tokio::test]
4028    async fn test_execute_no_args_shows_help() {
4029        let node = start_test_server().await;
4030        let cli = Cli {
4031            node,
4032            token: None,
4033            path: None,
4034            command: None,
4035        };
4036        let result = execute(&cli).await.unwrap();
4037        assert!(
4038            result.is_empty(),
4039            "no-args should return empty string after printing help"
4040        );
4041    }
4042
4043    #[tokio::test]
4044    async fn test_execute_path_shortcut() {
4045        let node = start_test_server().await;
4046        let cli = Cli {
4047            node,
4048            token: None,
4049            path: Some("/tmp".into()),
4050            command: None,
4051        };
4052        let result = execute(&cli).await.unwrap();
4053        assert!(result.contains("Detached from session"));
4054    }
4055
4056    #[tokio::test]
4057    async fn test_deduplicate_session_name_no_conflict() {
4058        // Connection refused → falls through to "name not taken" path
4059        let base = "http://127.0.0.1:1";
4060        let client = reqwest::Client::new();
4061        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4062        assert_eq!(name, "fresh");
4063    }
4064
4065    #[tokio::test]
4066    async fn test_deduplicate_session_name_with_conflict() {
4067        use axum::{Router, routing::get};
4068        use std::sync::atomic::{AtomicU32, Ordering};
4069
4070        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4071        let counter = call_count.clone();
4072        let app = Router::new()
4073            .route(
4074                "/api/v1/sessions/{id}",
4075                get(move || {
4076                    let c = counter.clone();
4077                    async move {
4078                        let n = c.fetch_add(1, Ordering::SeqCst);
4079                        if n == 0 {
4080                            // First call (base name) → exists
4081                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4082                        } else {
4083                            // Suffixed name → not found
4084                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4085                        }
4086                    }
4087                }),
4088            )
4089            .route(
4090                "/api/v1/peers",
4091                get(|| async {
4092                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4093                }),
4094            );
4095        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4096        let addr = listener.local_addr().unwrap();
4097        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4098        let base = format!("http://127.0.0.1:{}", addr.port());
4099        let client = reqwest::Client::new();
4100        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4101        assert_eq!(name, "repo-2");
4102    }
4103
4104    // -- Node resolution tests --
4105
4106    #[test]
4107    fn test_node_needs_resolution() {
4108        assert!(!node_needs_resolution("localhost:7433"));
4109        assert!(!node_needs_resolution("mac-mini:7433"));
4110        assert!(!node_needs_resolution("10.0.0.1:7433"));
4111        assert!(!node_needs_resolution("[::1]:7433"));
4112        assert!(node_needs_resolution("mac-mini"));
4113        assert!(node_needs_resolution("linux-server"));
4114        assert!(node_needs_resolution("localhost"));
4115    }
4116
4117    #[tokio::test]
4118    async fn test_resolve_node_with_port() {
4119        let client = reqwest::Client::new();
4120        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4121        assert_eq!(addr, "mac-mini:7433");
4122        assert!(token.is_none());
4123    }
4124
4125    #[tokio::test]
4126    async fn test_resolve_node_fallback_appends_port() {
4127        // No local daemon running on localhost:7433, so peer lookup fails
4128        // and it falls back to appending :7433
4129        let client = reqwest::Client::new();
4130        let (addr, token) = resolve_node(&client, "unknown-host").await;
4131        assert_eq!(addr, "unknown-host:7433");
4132        assert!(token.is_none());
4133    }
4134
4135    #[cfg(not(coverage))]
4136    #[tokio::test]
4137    async fn test_resolve_node_finds_peer() {
4138        use axum::{Router, routing::get};
4139
4140        let app = Router::new()
4141            .route(
4142                "/api/v1/peers",
4143                get(|| async {
4144                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4145                }),
4146            )
4147            .route(
4148                "/api/v1/config",
4149                get(|| async {
4150                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4151                }),
4152            );
4153
4154        // Port 7433 may be in use; skip test if so
4155        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4156            return;
4157        };
4158        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4159
4160        let client = reqwest::Client::new();
4161        let (addr, token) = resolve_node(&client, "mac-mini").await;
4162        assert_eq!(addr, "10.0.0.5:7433");
4163        assert_eq!(token, Some("peer-secret".into()));
4164    }
4165
4166    #[cfg(not(coverage))]
4167    #[tokio::test]
4168    async fn test_resolve_node_peer_no_token() {
4169        use axum::{Router, routing::get};
4170
4171        let app = Router::new()
4172            .route(
4173                "/api/v1/peers",
4174                get(|| async {
4175                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4176                }),
4177            )
4178            .route(
4179                "/api/v1/config",
4180                get(|| async {
4181                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4182                }),
4183            );
4184
4185        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4186            return; // Port in use, skip
4187        };
4188        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4189
4190        let client = reqwest::Client::new();
4191        let (addr, token) = resolve_node(&client, "test-peer").await;
4192        assert_eq!(addr, "10.0.0.9:7433");
4193        assert!(token.is_none()); // Simple peer entry has no token
4194    }
4195
4196    #[tokio::test]
4197    async fn test_execute_with_peer_name_resolution() {
4198        // When node doesn't contain ':', resolve_node is called.
4199        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4200        // The connection to the fallback address will fail, giving us a connection error.
4201        let cli = Cli {
4202            node: "nonexistent-peer".into(),
4203            token: None,
4204            command: Some(Commands::List { all: false }),
4205            path: None,
4206        };
4207        let result = execute(&cli).await;
4208        // Should try to connect to nonexistent-peer:7433 and fail
4209        assert!(result.is_err());
4210    }
4211
4212    // -- Auto node selection tests --
4213
4214    #[test]
4215    fn test_cli_parse_spawn_auto() {
4216        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4217        assert!(matches!(
4218            &cli.command,
4219            Some(Commands::Spawn { auto, .. }) if *auto
4220        ));
4221    }
4222
4223    #[test]
4224    fn test_cli_parse_spawn_auto_default() {
4225        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4226        assert!(matches!(
4227            &cli.command,
4228            Some(Commands::Spawn { auto, .. }) if !auto
4229        ));
4230    }
4231
4232    #[tokio::test]
4233    async fn test_select_best_node_coverage_stub() {
4234        // Exercise the coverage stub (or real function in non-coverage builds)
4235        let client = reqwest::Client::new();
4236        // In coverage builds, the stub returns ("localhost:7433", "local")
4237        // In non-coverage builds, this fails because no server is running — that's OK
4238        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4239    }
4240
4241    #[cfg(not(coverage))]
4242    #[tokio::test]
4243    async fn test_select_best_node_picks_least_loaded() {
4244        use axum::{Router, routing::get};
4245
4246        let app = Router::new().route(
4247            "/api/v1/peers",
4248            get(|| async {
4249                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4250            }),
4251        );
4252        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4253        let addr = listener.local_addr().unwrap();
4254        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4255        let base = format!("http://127.0.0.1:{}", addr.port());
4256
4257        let client = reqwest::Client::new();
4258        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4259        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4260        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4261        // idle wins (lower score)
4262        assert_eq!(name, "idle");
4263        assert_eq!(addr, "idle:7433");
4264    }
4265
4266    #[cfg(not(coverage))]
4267    #[tokio::test]
4268    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4269        use axum::{Router, routing::get};
4270
4271        let app = Router::new().route(
4272            "/api/v1/peers",
4273            get(|| async {
4274                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4275            }),
4276        );
4277        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4278        let addr = listener.local_addr().unwrap();
4279        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4280        let base = format!("http://127.0.0.1:{}", addr.port());
4281
4282        let client = reqwest::Client::new();
4283        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4284        assert_eq!(name, "my-mac");
4285        assert_eq!(addr, "localhost:7433");
4286    }
4287
4288    #[cfg(not(coverage))]
4289    #[tokio::test]
4290    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4291        use axum::{Router, routing::get};
4292
4293        let app = Router::new().route(
4294            "/api/v1/peers",
4295            get(|| async {
4296                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4297            }),
4298        );
4299        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4300        let addr = listener.local_addr().unwrap();
4301        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4302        let base = format!("http://127.0.0.1:{}", addr.port());
4303
4304        let client = reqwest::Client::new();
4305        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4306        assert_eq!(name, "solo");
4307        assert_eq!(addr, "localhost:7433");
4308    }
4309
4310    #[cfg(not(coverage))]
4311    #[tokio::test]
4312    async fn test_execute_spawn_auto_selects_node() {
4313        use axum::{
4314            Router,
4315            http::StatusCode,
4316            routing::{get, post},
4317        };
4318
4319        let create_json = test_create_response_json();
4320
4321        // Bind early so we know the address to embed in the peers response
4322        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4323        let addr = listener.local_addr().unwrap();
4324        let node = format!("127.0.0.1:{}", addr.port());
4325        let peer_addr = node.clone();
4326
4327        let app = Router::new()
4328            .route(
4329                "/api/v1/peers",
4330                get(move || {
4331                    let peer_addr = peer_addr.clone();
4332                    async move {
4333                        format!(
4334                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4335                        )
4336                    }
4337                }),
4338            )
4339            .route(
4340                "/api/v1/sessions",
4341                post(move || async move {
4342                    (StatusCode::CREATED, create_json.clone())
4343                }),
4344            );
4345
4346        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4347
4348        let cli = Cli {
4349            node,
4350            token: None,
4351            command: Some(Commands::Spawn {
4352                name: Some("test".into()),
4353                workdir: Some("/tmp/repo".into()),
4354                ink: None,
4355                description: None,
4356                detach: true,
4357                idle_threshold: None,
4358                auto: true,
4359                worktree: false,
4360                runtime: None,
4361                secret: vec![],
4362                command: vec!["echo".into(), "hello".into()],
4363            }),
4364            path: None,
4365        };
4366        let result = execute(&cli).await.unwrap();
4367        assert!(result.contains("Created session"));
4368    }
4369
4370    #[cfg(not(coverage))]
4371    #[tokio::test]
4372    async fn test_select_best_node_peer_no_session_count() {
4373        use axum::{Router, routing::get};
4374
4375        let app = Router::new().route(
4376            "/api/v1/peers",
4377            get(|| async {
4378                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4379            }),
4380        );
4381        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4382        let addr = listener.local_addr().unwrap();
4383        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4384        let base = format!("http://127.0.0.1:{}", addr.port());
4385
4386        let client = reqwest::Client::new();
4387        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4388        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4389        assert_eq!(name, "fresh");
4390        assert_eq!(addr, "fresh:7433");
4391    }
4392
4393    // -- Secret CLI parse tests --
4394
4395    #[test]
4396    fn test_cli_parse_secret_set() {
4397        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4398        assert!(matches!(
4399            &cli.command,
4400            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4401                if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4402        ));
4403    }
4404
4405    #[test]
4406    fn test_cli_parse_secret_list() {
4407        let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4408        assert!(matches!(
4409            &cli.command,
4410            Some(Commands::Secret {
4411                action: SecretAction::List
4412            })
4413        ));
4414    }
4415
4416    #[test]
4417    fn test_cli_parse_secret_list_alias() {
4418        let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4419        assert!(matches!(
4420            &cli.command,
4421            Some(Commands::Secret {
4422                action: SecretAction::List
4423            })
4424        ));
4425    }
4426
4427    #[test]
4428    fn test_cli_parse_secret_delete() {
4429        let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4430        assert!(matches!(
4431            &cli.command,
4432            Some(Commands::Secret { action: SecretAction::Delete { name } })
4433                if name == "MY_TOKEN"
4434        ));
4435    }
4436
4437    #[test]
4438    fn test_cli_parse_secret_delete_alias() {
4439        let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4440        assert!(matches!(
4441            &cli.command,
4442            Some(Commands::Secret { action: SecretAction::Delete { name } })
4443                if name == "MY_TOKEN"
4444        ));
4445    }
4446
4447    #[test]
4448    fn test_cli_parse_secret_alias() {
4449        let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4450        assert!(matches!(
4451            &cli.command,
4452            Some(Commands::Secret {
4453                action: SecretAction::List
4454            })
4455        ));
4456    }
4457
4458    #[test]
4459    fn test_format_secrets_empty() {
4460        let secrets: Vec<serde_json::Value> = vec![];
4461        assert_eq!(format_secrets(&secrets), "No secrets configured.");
4462    }
4463
4464    #[test]
4465    fn test_format_secrets_with_entries() {
4466        let secrets = vec![
4467            serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4468            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4469        ];
4470        let output = format_secrets(&secrets);
4471        assert!(output.contains("GITHUB_TOKEN"));
4472        assert!(output.contains("NPM_TOKEN"));
4473        assert!(output.contains("NAME"));
4474        assert!(output.contains("ENV"));
4475        assert!(output.contains("CREATED"));
4476    }
4477
4478    #[test]
4479    fn test_format_secrets_with_env() {
4480        let secrets = vec![
4481            serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4482            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4483        ];
4484        let output = format_secrets(&secrets);
4485        assert!(output.contains("GH_WORK"));
4486        assert!(output.contains("GITHUB_TOKEN"));
4487        assert!(output.contains("NPM_TOKEN"));
4488    }
4489
4490    #[test]
4491    fn test_format_secrets_short_timestamp() {
4492        let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4493        let output = format_secrets(&secrets);
4494        assert!(output.contains("now"));
4495    }
4496
4497    #[test]
4498    fn test_format_schedules_short_last_run_at() {
4499        // Regression: last_run_at shorter than 16 chars must not panic
4500        let schedules = vec![serde_json::json!({
4501            "name": "test",
4502            "cron": "* * * * *",
4503            "enabled": true,
4504            "last_run_at": "short",
4505            "target_node": null
4506        })];
4507        let output = format_schedules(&schedules);
4508        assert!(output.contains("short"));
4509    }
4510
4511    #[test]
4512    fn test_format_sessions_multibyte_command_truncation() {
4513        use chrono::Utc;
4514        use pulpo_common::session::SessionStatus;
4515        use uuid::Uuid;
4516
4517        // Command with multi-byte chars exceeding 50 bytes; must not panic
4518        let sessions = vec![Session {
4519            id: Uuid::nil(),
4520            name: "test".into(),
4521            workdir: "/tmp".into(),
4522            command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4523            description: None,
4524            status: SessionStatus::Active,
4525            exit_code: None,
4526            backend_session_id: None,
4527            output_snapshot: None,
4528            metadata: None,
4529            ink: None,
4530            intervention_code: None,
4531            intervention_reason: None,
4532            intervention_at: None,
4533            last_output_at: None,
4534            idle_since: None,
4535            idle_threshold_secs: None,
4536            worktree_path: None,
4537            runtime: Runtime::Tmux,
4538            created_at: Utc::now(),
4539            updated_at: Utc::now(),
4540        }];
4541        let output = format_sessions(&sessions);
4542        assert!(output.contains("..."));
4543    }
4544}