Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
10#[derive(Parser, Debug)]
11#[command(
12    name = "pulpo",
13    about = "Manage agent sessions across your machines",
14    version = env!("PULPO_VERSION"),
15    args_conflicts_with_subcommands = true
16)]
17pub struct Cli {
18    /// Target node (default: localhost)
19    #[arg(long, default_value = "localhost:7433")]
20    pub node: String,
21
22    /// Auth token (auto-discovered from local daemon if omitted)
23    #[arg(long)]
24    pub token: Option<String>,
25
26    #[command(subcommand)]
27    pub command: Option<Commands>,
28
29    /// Quick spawn: `pulpo <path>` spawns a session in that directory
30    #[arg(value_name = "PATH")]
31    pub path: Option<String>,
32}
33
34#[derive(Subcommand, Debug)]
35#[allow(clippy::large_enum_variant)]
36pub enum Commands {
37    /// Attach to a session's terminal
38    #[command(visible_alias = "a")]
39    Attach {
40        /// Session name or ID
41        name: String,
42    },
43
44    /// Send input to a session
45    #[command(visible_alias = "i", visible_alias = "send")]
46    Input {
47        /// Session name or ID
48        name: String,
49        /// Text to send (sends Enter if omitted)
50        text: Option<String>,
51    },
52
53    /// Spawn a new agent session
54    #[command(visible_alias = "s")]
55    Spawn {
56        /// Session name (auto-generated from workdir if omitted)
57        name: Option<String>,
58
59        /// Working directory (defaults to current directory)
60        #[arg(long)]
61        workdir: Option<String>,
62
63        /// Ink name (from config)
64        #[arg(long)]
65        ink: Option<String>,
66
67        /// Human-readable description of the task
68        #[arg(long)]
69        description: Option<String>,
70
71        /// Don't attach to the session after spawning
72        #[arg(short, long)]
73        detach: bool,
74
75        /// Idle threshold in seconds (0 = never idle)
76        #[arg(long)]
77        idle_threshold: Option<u32>,
78
79        /// Auto-select the least loaded node
80        #[arg(long)]
81        auto: bool,
82
83        /// Create an isolated git worktree for the session
84        #[arg(long)]
85        worktree: bool,
86
87        /// Runtime environment: tmux (default) or docker
88        #[arg(long)]
89        runtime: Option<String>,
90
91        /// Secrets to inject as environment variables (by name)
92        #[arg(long)]
93        secret: Vec<String>,
94
95        /// Command to run (everything after --)
96        #[arg(last = true)]
97        command: Vec<String>,
98    },
99
100    /// List sessions (live only by default)
101    #[command(visible_alias = "ls")]
102    List {
103        /// Show all sessions including killed and lost
104        #[arg(short, long)]
105        all: bool,
106    },
107
108    /// Show session logs/output
109    #[command(visible_alias = "l")]
110    Logs {
111        /// Session name or ID
112        name: String,
113
114        /// Number of lines to fetch
115        #[arg(long, default_value = "100")]
116        lines: usize,
117
118        /// Follow output (like `tail -f`)
119        #[arg(short, long)]
120        follow: bool,
121    },
122
123    /// Kill one or more sessions
124    #[command(visible_alias = "k")]
125    Kill {
126        /// Session names or IDs
127        #[arg(required = true)]
128        names: Vec<String>,
129    },
130
131    /// Permanently remove one or more sessions from history
132    #[command(visible_alias = "rm")]
133    Delete {
134        /// Session names or IDs
135        #[arg(required = true)]
136        names: Vec<String>,
137    },
138
139    /// Remove all killed and lost sessions
140    Cleanup,
141
142    /// Resume a lost session
143    #[command(visible_alias = "r")]
144    Resume {
145        /// Session name or ID
146        name: String,
147    },
148
149    /// List all known nodes
150    #[command(visible_alias = "n")]
151    Nodes,
152
153    /// Show intervention history for a session
154    #[command(visible_alias = "iv")]
155    Interventions {
156        /// Session name or ID
157        name: String,
158    },
159
160    /// Open the web dashboard in your browser
161    Ui,
162
163    /// Manage scheduled agent runs
164    #[command(visible_alias = "sched")]
165    Schedule {
166        #[command(subcommand)]
167        action: ScheduleAction,
168    },
169
170    /// Manage secrets (environment variables injected into sessions)
171    #[command(visible_alias = "sec")]
172    Secret {
173        #[command(subcommand)]
174        action: SecretAction,
175    },
176}
177
178#[derive(Subcommand, Debug)]
179pub enum SecretAction {
180    /// Set a secret
181    Set {
182        /// Secret name (will be the env var name, uppercase + underscores)
183        name: String,
184        /// Secret value
185        value: String,
186        /// Environment variable name (defaults to secret name)
187        #[arg(long)]
188        env: Option<String>,
189    },
190    /// List secret names
191    #[command(visible_alias = "ls")]
192    List,
193    /// Delete a secret
194    #[command(visible_alias = "rm")]
195    Delete {
196        /// Secret name
197        name: String,
198    },
199}
200
201#[derive(Subcommand, Debug)]
202pub enum ScheduleAction {
203    /// Add a new schedule
204    #[command(alias = "install")]
205    Add {
206        /// Schedule name
207        name: String,
208        /// Cron expression (e.g. "0 3 * * *")
209        cron: String,
210        /// Working directory
211        #[arg(long)]
212        workdir: Option<String>,
213        /// Target node (omit = local, "auto" = least-loaded)
214        #[arg(long)]
215        node: Option<String>,
216        /// Ink preset
217        #[arg(long)]
218        ink: Option<String>,
219        /// Description
220        #[arg(long)]
221        description: Option<String>,
222        /// Command to run (everything after --)
223        #[arg(last = true)]
224        command: Vec<String>,
225    },
226    /// List all schedules
227    #[command(alias = "ls")]
228    List,
229    /// Remove a schedule
230    #[command(alias = "rm")]
231    Remove {
232        /// Schedule name or ID
233        name: String,
234    },
235    /// Pause a schedule
236    Pause {
237        /// Schedule name or ID
238        name: String,
239    },
240    /// Resume a paused schedule
241    Resume {
242        /// Schedule name or ID
243        name: String,
244    },
245}
246
247/// The marker emitted by the agent wrapper when the agent process exits.
248const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
249
250/// Resolve a path to an absolute path string.
251fn resolve_path(path: &str) -> String {
252    let p = std::path::Path::new(path);
253    if p.is_absolute() {
254        path.to_owned()
255    } else {
256        std::env::current_dir().map_or_else(
257            |_| path.to_owned(),
258            |cwd| cwd.join(p).to_string_lossy().into_owned(),
259        )
260    }
261}
262
263/// Derive a session name from a directory path (basename, kebab-cased).
264fn derive_session_name(path: &str) -> String {
265    let basename = std::path::Path::new(path)
266        .file_name()
267        .and_then(|n| n.to_str())
268        .unwrap_or("session");
269    // Convert to kebab-case: lowercase, replace non-alphanumeric with hyphens, collapse
270    let kebab: String = basename
271        .chars()
272        .map(|c| {
273            if c.is_ascii_alphanumeric() {
274                c.to_ascii_lowercase()
275            } else {
276                '-'
277            }
278        })
279        .collect();
280    // Collapse consecutive hyphens and trim leading/trailing hyphens
281    let mut result = String::new();
282    for c in kebab.chars() {
283        if c == '-' && result.ends_with('-') {
284            continue;
285        }
286        result.push(c);
287    }
288    let result = result.trim_matches('-').to_owned();
289    if result.is_empty() {
290        "session".to_owned()
291    } else {
292        result
293    }
294}
295
296/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
297async fn deduplicate_session_name(
298    client: &reqwest::Client,
299    base: &str,
300    name: &str,
301    token: Option<&str>,
302) -> String {
303    // Check if the name is already taken by fetching the session
304    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
305        .send()
306        .await;
307    match resp {
308        Ok(r) if r.status().is_success() => {
309            // Session exists — try suffixed names
310            for i in 2..=99 {
311                let candidate = format!("{name}-{i}");
312                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
313                    .send()
314                    .await;
315                match resp {
316                    Ok(r) if r.status().is_success() => {}
317                    _ => return candidate,
318                }
319            }
320            format!("{name}-100")
321        }
322        _ => name.to_owned(),
323    }
324}
325
326/// Format the base URL from the node address.
327pub fn base_url(node: &str) -> String {
328    format!("http://{node}")
329}
330
331/// Response shape for the output endpoint.
332#[derive(serde::Deserialize)]
333struct OutputResponse {
334    output: String,
335}
336
337/// Format a list of sessions as a table.
338const fn session_runtime(session: &Session) -> &'static str {
339    match session.runtime {
340        Runtime::Tmux => "tmux",
341        Runtime::Docker => "docker",
342    }
343}
344
345fn format_sessions(sessions: &[Session]) -> String {
346    if sessions.is_empty() {
347        return "No sessions.".into();
348    }
349    let mut lines = vec![format!(
350        "{:<10} {:<24} {:<12} {:<8} {}",
351        "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
352    )];
353    for s in sessions {
354        let cmd_display = if s.command.len() > 50 {
355            let truncated: String = s.command.chars().take(47).collect();
356            format!("{truncated}...")
357        } else {
358            s.command.clone()
359        };
360        let short_id = &s.id.to_string()[..8];
361        let has_pr = s
362            .metadata
363            .as_ref()
364            .is_some_and(|m| m.contains_key("pr_url"));
365        let name_display = match (s.worktree_path.is_some(), has_pr) {
366            (true, true) => format!("{} [wt] [PR]", s.name),
367            (true, false) => format!("{} [wt]", s.name),
368            (false, true) => format!("{} [PR]", s.name),
369            (false, false) => s.name.clone(),
370        };
371        lines.push(format!(
372            "{:<10} {:<24} {:<12} {:<8} {}",
373            short_id,
374            name_display,
375            s.status,
376            session_runtime(s),
377            cmd_display
378        ));
379    }
380    lines.join("\n")
381}
382
383/// Format the peers response as a table.
384fn format_nodes(resp: &PeersResponse) -> String {
385    let mut lines = vec![format!(
386        "{:<20} {:<25} {:<10} {}",
387        "NAME", "ADDRESS", "STATUS", "SESSIONS"
388    )];
389    lines.push(format!(
390        "{:<20} {:<25} {:<10} {}",
391        resp.local.name, "(local)", "online", "-"
392    ));
393    for p in &resp.peers {
394        let sessions = p
395            .session_count
396            .map_or_else(|| "-".into(), |c| c.to_string());
397        lines.push(format!(
398            "{:<20} {:<25} {:<10} {}",
399            p.name, p.address, p.status, sessions
400        ));
401    }
402    lines.join("\n")
403}
404
405/// Format intervention events as a table.
406fn format_interventions(events: &[InterventionEventResponse]) -> String {
407    if events.is_empty() {
408        return "No intervention events.".into();
409    }
410    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
411    for e in events {
412        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
413    }
414    lines.join("\n")
415}
416
417/// Build the command to open a URL in the default browser.
418#[cfg_attr(coverage, allow(dead_code))]
419fn build_open_command(url: &str) -> std::process::Command {
420    #[cfg(target_os = "macos")]
421    {
422        let mut cmd = std::process::Command::new("open");
423        cmd.arg(url);
424        cmd
425    }
426    #[cfg(target_os = "linux")]
427    {
428        let mut cmd = std::process::Command::new("xdg-open");
429        cmd.arg(url);
430        cmd
431    }
432    #[cfg(target_os = "windows")]
433    {
434        let mut cmd = std::process::Command::new("cmd");
435        cmd.args(["/C", "start", url]);
436        cmd
437    }
438    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
439    {
440        // Fallback: try xdg-open
441        let mut cmd = std::process::Command::new("xdg-open");
442        cmd.arg(url);
443        cmd
444    }
445}
446
447/// Open a URL in the default browser.
448#[cfg(not(coverage))]
449fn open_browser(url: &str) -> Result<()> {
450    build_open_command(url).status()?;
451    Ok(())
452}
453
454/// Stub for coverage builds — avoids opening a browser during tests.
455#[cfg(coverage)]
456fn open_browser(_url: &str) -> Result<()> {
457    Ok(())
458}
459
460/// Build the command to attach to a session's terminal.
461/// Detects Docker sessions by the `docker:` prefix in the backend session ID.
462#[cfg_attr(coverage, allow(dead_code))]
463fn build_attach_command(backend_session_id: &str) -> std::process::Command {
464    // Docker sessions: exec into the container
465    if let Some(container) = backend_session_id.strip_prefix("docker:") {
466        let mut cmd = std::process::Command::new("docker");
467        cmd.args(["exec", "-it", container, "/bin/sh"]);
468        return cmd;
469    }
470    // tmux sessions
471    #[cfg(not(target_os = "windows"))]
472    {
473        let mut cmd = std::process::Command::new("tmux");
474        cmd.args(["attach-session", "-t", backend_session_id]);
475        cmd
476    }
477    #[cfg(target_os = "windows")]
478    {
479        // tmux attach not available on Windows — inform the user
480        let mut cmd = std::process::Command::new("cmd");
481        cmd.args([
482            "/C",
483            "echo",
484            "Attach not available on Windows. Use the web UI or --runtime docker.",
485        ]);
486        cmd
487    }
488}
489
490/// Attach to a session's terminal.
491#[cfg(not(any(test, coverage, target_os = "windows")))]
492fn attach_session(backend_session_id: &str) -> Result<()> {
493    let status = build_attach_command(backend_session_id).status()?;
494    if !status.success() {
495        anyhow::bail!("attach failed with {status}");
496    }
497    Ok(())
498}
499
500/// Stub for Windows — tmux attach is not available.
501#[cfg(all(target_os = "windows", not(test), not(coverage)))]
502fn attach_session(_backend_session_id: &str) -> Result<()> {
503    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
504    Ok(())
505}
506
507/// Stub for test and coverage builds — avoids spawning real terminals during tests.
508#[cfg(any(test, coverage))]
509#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
510fn attach_session(_backend_session_id: &str) -> Result<()> {
511    Ok(())
512}
513
514/// Extract a clean error message from an API JSON response (or fall back to raw text).
515fn api_error(text: &str) -> anyhow::Error {
516    serde_json::from_str::<serde_json::Value>(text)
517        .ok()
518        .and_then(|v| v["error"].as_str().map(String::from))
519        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
520}
521
522/// Return the response body text, or a clean error if the response was non-success.
523async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
524    if resp.status().is_success() {
525        Ok(resp.text().await?)
526    } else {
527        let text = resp.text().await?;
528        Err(api_error(&text))
529    }
530}
531
532/// Map a reqwest error to a user-friendly message.
533fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
534    if err.is_connect() {
535        anyhow::anyhow!(
536            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
537        )
538    } else {
539        anyhow::anyhow!("Network error connecting to {node}: {err}")
540    }
541}
542
543/// Check if the node address points to localhost.
544fn is_localhost(node: &str) -> bool {
545    let host = node.split(':').next().unwrap_or(node);
546    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
547}
548
549/// Try to auto-discover the auth token from a local daemon.
550async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
551    let resp = client
552        .get(format!("{base}/api/v1/auth/token"))
553        .send()
554        .await
555        .ok()?;
556    let body: AuthTokenResponse = resp.json().await.ok()?;
557    if body.token.is_empty() {
558        None
559    } else {
560        Some(body.token)
561    }
562}
563
564/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
565async fn resolve_token(
566    client: &reqwest::Client,
567    base: &str,
568    node: &str,
569    explicit: Option<&str>,
570) -> Option<String> {
571    if let Some(t) = explicit {
572        return Some(t.to_owned());
573    }
574    if is_localhost(node) {
575        return discover_token(client, base).await;
576    }
577    None
578}
579
580/// Check if a node string needs resolution (no port specified).
581fn node_needs_resolution(node: &str) -> bool {
582    !node.contains(':')
583}
584
585/// Resolve a node reference to a `host:port` address.
586///
587/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
588/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
589/// online peer is found, return its address and optionally its configured auth token
590/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
591#[cfg(not(coverage))]
592async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
593    // Already has port — use as-is
594    if !node_needs_resolution(node) {
595        return (node.to_owned(), None);
596    }
597
598    // Try to resolve via local daemon's peer registry
599    let local_base = "http://localhost:7433";
600    let mut resolved_address: Option<String> = None;
601
602    if let Ok(resp) = client
603        .get(format!("{local_base}/api/v1/peers"))
604        .send()
605        .await
606        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
607    {
608        for peer in &peers_resp.peers {
609            if peer.name == node {
610                resolved_address = Some(peer.address.clone());
611                break;
612            }
613        }
614    }
615
616    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
617
618    // Try to get the peer's auth token from the config endpoint
619    let peer_token = if let Ok(resp) = client
620        .get(format!("{local_base}/api/v1/config"))
621        .send()
622        .await
623        && let Ok(config) = resp.json::<ConfigResponse>().await
624        && let Some(entry) = config.peers.get(node)
625    {
626        entry.token().map(String::from)
627    } else {
628        None
629    };
630
631    (address, peer_token)
632}
633
634/// Coverage stub — no real HTTP resolution during coverage builds.
635#[cfg(coverage)]
636async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
637    if node_needs_resolution(node) {
638        (format!("{node}:7433"), None)
639    } else {
640        (node.to_owned(), None)
641    }
642}
643
644/// Build an authenticated GET request.
645fn authed_get(
646    client: &reqwest::Client,
647    url: String,
648    token: Option<&str>,
649) -> reqwest::RequestBuilder {
650    let req = client.get(url);
651    if let Some(t) = token {
652        req.bearer_auth(t)
653    } else {
654        req
655    }
656}
657
658/// Build an authenticated POST request.
659fn authed_post(
660    client: &reqwest::Client,
661    url: String,
662    token: Option<&str>,
663) -> reqwest::RequestBuilder {
664    let req = client.post(url);
665    if let Some(t) = token {
666        req.bearer_auth(t)
667    } else {
668        req
669    }
670}
671
672/// Build an authenticated DELETE request.
673fn authed_delete(
674    client: &reqwest::Client,
675    url: String,
676    token: Option<&str>,
677) -> reqwest::RequestBuilder {
678    let req = client.delete(url);
679    if let Some(t) = token {
680        req.bearer_auth(t)
681    } else {
682        req
683    }
684}
685
686/// Build an authenticated PUT request.
687#[cfg(not(coverage))]
688fn authed_put(
689    client: &reqwest::Client,
690    url: String,
691    token: Option<&str>,
692) -> reqwest::RequestBuilder {
693    let req = client.put(url);
694    if let Some(t) = token {
695        req.bearer_auth(t)
696    } else {
697        req
698    }
699}
700
701/// Fetch session output from the API.
702async fn fetch_output(
703    client: &reqwest::Client,
704    base: &str,
705    name: &str,
706    lines: usize,
707    token: Option<&str>,
708) -> Result<String> {
709    let resp = authed_get(
710        client,
711        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
712        token,
713    )
714    .send()
715    .await?;
716    let text = ok_or_api_error(resp).await?;
717    let output: OutputResponse = serde_json::from_str(&text)?;
718    Ok(output.output)
719}
720
721/// Fetch session status from the API.
722async fn fetch_session_status(
723    client: &reqwest::Client,
724    base: &str,
725    name: &str,
726    token: Option<&str>,
727) -> Result<String> {
728    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
729        .send()
730        .await?;
731    let text = ok_or_api_error(resp).await?;
732    let session: Session = serde_json::from_str(&text)?;
733    Ok(session.status.to_string())
734}
735
736/// Wait for the session to leave "creating" state, then check if it died instantly.
737/// Uses the session ID (not name) to avoid matching old killed sessions with the same name.
738/// Returns an error with a helpful message if the session is lost/killed.
739async fn check_session_alive(
740    client: &reqwest::Client,
741    base: &str,
742    session_id: &str,
743    token: Option<&str>,
744) -> Result<()> {
745    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
746    for _ in 0..3 {
747        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
748        // Fetch by ID to avoid name collisions with old sessions
749        let resp = authed_get(
750            client,
751            format!("{base}/api/v1/sessions/{session_id}"),
752            token,
753        )
754        .send()
755        .await;
756        if let Ok(resp) = resp
757            && let Ok(text) = ok_or_api_error(resp).await
758            && let Ok(session) = serde_json::from_str::<Session>(&text)
759        {
760            match session.status.to_string().as_str() {
761                "creating" => continue,
762                "lost" | "killed" => {
763                    anyhow::bail!(
764                        "Session \"{}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {}",
765                        session.name,
766                        session.name
767                    );
768                }
769                _ => return Ok(()),
770            }
771        }
772        // fetch failed — don't block, proceed to attach
773        break;
774    }
775    Ok(())
776}
777
778/// Compute the new trailing lines that differ from the previous output.
779///
780/// The output endpoint returns the last N lines from the terminal pane. As new lines
781/// appear, old lines at the top scroll off. We find the overlap between the end
782/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
783/// trailing lines.
784fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
785    if prev.is_empty() {
786        return new;
787    }
788
789    let prev_lines: Vec<&str> = prev.lines().collect();
790    let new_lines: Vec<&str> = new.lines().collect();
791
792    if new_lines.is_empty() {
793        return "";
794    }
795
796    // prev is non-empty (early return above), so last() always succeeds
797    let last_prev = prev_lines[prev_lines.len() - 1];
798
799    // Find the last line of prev in new to determine the overlap boundary
800    for i in (0..new_lines.len()).rev() {
801        if new_lines[i] == last_prev {
802            // Verify contiguous overlap: check that lines before this match too
803            let overlap_len = prev_lines.len().min(i + 1);
804            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
805            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
806            if prev_tail == new_overlap {
807                if i + 1 < new_lines.len() {
808                    // Return the slice of `new` after the overlap
809                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
810                    return new.get(consumed.min(new.len())..).unwrap_or("");
811                }
812                return "";
813            }
814        }
815    }
816
817    // No overlap found — output changed completely, print it all
818    new
819}
820
821/// Follow logs by polling, printing only new output. Returns when the session ends.
822async fn follow_logs(
823    client: &reqwest::Client,
824    base: &str,
825    name: &str,
826    lines: usize,
827    token: Option<&str>,
828    writer: &mut (dyn std::io::Write + Send),
829) -> Result<()> {
830    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
831    write!(writer, "{prev_output}")?;
832
833    let mut unchanged_ticks: u32 = 0;
834
835    loop {
836        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
837
838        // Fetch latest output
839        let new_output = fetch_output(client, base, name, lines, token).await?;
840
841        let diff = diff_output(&prev_output, &new_output);
842        if diff.is_empty() {
843            unchanged_ticks += 1;
844        } else {
845            write!(writer, "{diff}")?;
846            unchanged_ticks = 0;
847        }
848
849        // Check for agent exit marker in output
850        if new_output.contains(AGENT_EXIT_MARKER) {
851            break;
852        }
853
854        prev_output = new_output;
855
856        // Only check session status when output has been unchanged for 3+ ticks
857        if unchanged_ticks >= 3 {
858            let status = fetch_session_status(client, base, name, token).await?;
859            let is_terminal = status == "ready" || status == "killed" || status == "lost";
860            if is_terminal {
861                break;
862            }
863        }
864    }
865    Ok(())
866}
867
868// --- Schedule API ---
869
870/// Execute a schedule subcommand via the scheduler API.
871#[cfg(not(coverage))]
872async fn execute_schedule(
873    client: &reqwest::Client,
874    action: &ScheduleAction,
875    base: &str,
876    token: Option<&str>,
877) -> Result<String> {
878    match action {
879        ScheduleAction::Add {
880            name,
881            cron,
882            workdir,
883            node,
884            ink,
885            description,
886            command,
887        } => {
888            let cmd = if command.is_empty() {
889                None
890            } else {
891                Some(command.join(" "))
892            };
893            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
894                std::env::current_dir()
895                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
896            });
897            let mut body = serde_json::json!({
898                "name": name,
899                "cron": cron,
900                "workdir": resolved_workdir,
901            });
902            if let Some(c) = &cmd {
903                body["command"] = serde_json::json!(c);
904            }
905            if let Some(n) = node {
906                body["target_node"] = serde_json::json!(n);
907            }
908            if let Some(i) = ink {
909                body["ink"] = serde_json::json!(i);
910            }
911            if let Some(d) = description {
912                body["description"] = serde_json::json!(d);
913            }
914            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
915                .json(&body)
916                .send()
917                .await?;
918            ok_or_api_error(resp).await?;
919            Ok(format!("Created schedule \"{name}\""))
920        }
921        ScheduleAction::List => {
922            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
923                .send()
924                .await?;
925            let text = ok_or_api_error(resp).await?;
926            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
927            Ok(format_schedules(&schedules))
928        }
929        ScheduleAction::Remove { name } => {
930            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
931                .send()
932                .await?;
933            ok_or_api_error(resp).await?;
934            Ok(format!("Removed schedule \"{name}\""))
935        }
936        ScheduleAction::Pause { name } => {
937            let body = serde_json::json!({ "enabled": false });
938            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
939                .json(&body)
940                .send()
941                .await?;
942            ok_or_api_error(resp).await?;
943            Ok(format!("Paused schedule \"{name}\""))
944        }
945        ScheduleAction::Resume { name } => {
946            let body = serde_json::json!({ "enabled": true });
947            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
948                .json(&body)
949                .send()
950                .await?;
951            ok_or_api_error(resp).await?;
952            Ok(format!("Resumed schedule \"{name}\""))
953        }
954    }
955}
956
957/// Coverage stub for schedule execution.
958#[cfg(coverage)]
959#[allow(clippy::unnecessary_wraps)]
960async fn execute_schedule(
961    _client: &reqwest::Client,
962    _action: &ScheduleAction,
963    _base: &str,
964    _token: Option<&str>,
965) -> Result<String> {
966    Ok(String::new())
967}
968
969// --- Secret API ---
970
971/// Format secret entries as a table.
972#[cfg_attr(coverage, allow(dead_code))]
973fn format_secrets(secrets: &[serde_json::Value]) -> String {
974    if secrets.is_empty() {
975        return "No secrets configured.".into();
976    }
977    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
978    for s in secrets {
979        let name = s["name"].as_str().unwrap_or("?");
980        let env_display = s["env"]
981            .as_str()
982            .map_or_else(|| name.to_owned(), String::from);
983        let created = s["created_at"]
984            .as_str()
985            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
986        lines.push(format!("{name:<24} {env_display:<24} {created}"));
987    }
988    lines.join("\n")
989}
990
991/// Execute a secret subcommand via the secrets API.
992#[cfg(not(coverage))]
993async fn execute_secret(
994    client: &reqwest::Client,
995    action: &SecretAction,
996    base: &str,
997    token: Option<&str>,
998) -> Result<String> {
999    match action {
1000        SecretAction::Set { name, value, env } => {
1001            let mut body = serde_json::json!({ "value": value });
1002            if let Some(e) = env {
1003                body["env"] = serde_json::json!(e);
1004            }
1005            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1006                .json(&body)
1007                .send()
1008                .await?;
1009            ok_or_api_error(resp).await?;
1010            Ok(format!("Secret \"{name}\" set."))
1011        }
1012        SecretAction::List => {
1013            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1014                .send()
1015                .await?;
1016            let text = ok_or_api_error(resp).await?;
1017            let parsed: serde_json::Value = serde_json::from_str(&text)?;
1018            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1019            Ok(format_secrets(secrets))
1020        }
1021        SecretAction::Delete { name } => {
1022            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1023                .send()
1024                .await?;
1025            ok_or_api_error(resp).await?;
1026            Ok(format!("Secret \"{name}\" deleted."))
1027        }
1028    }
1029}
1030
1031/// Coverage stub for secret execution.
1032#[cfg(coverage)]
1033#[allow(clippy::unnecessary_wraps)]
1034async fn execute_secret(
1035    _client: &reqwest::Client,
1036    _action: &SecretAction,
1037    _base: &str,
1038    _token: Option<&str>,
1039) -> Result<String> {
1040    Ok(String::new())
1041}
1042
1043/// Format a list of schedules as a table.
1044#[cfg_attr(coverage, allow(dead_code))]
1045fn format_schedules(schedules: &[serde_json::Value]) -> String {
1046    if schedules.is_empty() {
1047        return "No schedules.".into();
1048    }
1049    let mut lines = vec![format!(
1050        "{:<20} {:<18} {:<8} {:<12} {}",
1051        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1052    )];
1053    for s in schedules {
1054        let name = s["name"].as_str().unwrap_or("?");
1055        let cron = s["cron"].as_str().unwrap_or("?");
1056        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1057            "yes"
1058        } else {
1059            "no"
1060        };
1061        let last_run = s["last_run_at"]
1062            .as_str()
1063            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1064        let node = s["target_node"].as_str().unwrap_or("local");
1065        lines.push(format!(
1066            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1067        ));
1068    }
1069    lines.join("\n")
1070}
1071
1072/// Select the best node from the peer registry based on load.
1073/// Returns the node address and name of the least loaded online peer.
1074/// Scoring: lower memory usage + fewer active sessions = better.
1075#[cfg(not(coverage))]
1076async fn select_best_node(
1077    client: &reqwest::Client,
1078    base: &str,
1079    token: Option<&str>,
1080) -> Result<(String, String)> {
1081    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1082        .send()
1083        .await?;
1084    let text = ok_or_api_error(resp).await?;
1085    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1086
1087    // Score: fewer active sessions is better, more memory is better
1088    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1089
1090    for peer in &peers_resp.peers {
1091        if peer.status != pulpo_common::peer::PeerStatus::Online {
1092            continue;
1093        }
1094        let sessions = peer.session_count.unwrap_or(0);
1095        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1096        // Lower score = better (fewer sessions, more memory)
1097        #[allow(clippy::cast_precision_loss)]
1098        let score = sessions as f64 - (mem as f64 / 1024.0);
1099        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1100            best = Some((peer.address.clone(), peer.name.clone(), score));
1101        }
1102    }
1103
1104    // Fall back to local if no online peers
1105    match best {
1106        Some((addr, name, _)) => Ok((addr, name)),
1107        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1108    }
1109}
1110
1111/// Coverage stub — auto-select always falls back to local.
1112#[cfg(coverage)]
1113#[allow(clippy::unnecessary_wraps)]
1114async fn select_best_node(
1115    _client: &reqwest::Client,
1116    _base: &str,
1117    _token: Option<&str>,
1118) -> Result<(String, String)> {
1119    Ok(("localhost:7433".into(), "local".into()))
1120}
1121
1122/// Execute the given CLI command against the specified node.
1123#[allow(clippy::too_many_lines)]
1124pub async fn execute(cli: &Cli) -> Result<String> {
1125    let client = reqwest::Client::new();
1126    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1127    let url = base_url(&resolved_node);
1128    let node = &resolved_node;
1129    let token = resolve_token(&client, &url, node, cli.token.as_deref())
1130        .await
1131        .or(peer_token);
1132
1133    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
1134    if cli.command.is_none() && cli.path.is_none() {
1135        // No subcommand and no path: print help
1136        use clap::CommandFactory;
1137        let mut cmd = Cli::command();
1138        cmd.print_help()?;
1139        println!();
1140        return Ok(String::new());
1141    }
1142    if cli.command.is_none() {
1143        let path = cli.path.as_deref().unwrap_or(".");
1144        let resolved_workdir = resolve_path(path);
1145        let base_name = derive_session_name(&resolved_workdir);
1146        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1147        let body = serde_json::json!({
1148            "name": name,
1149            "workdir": resolved_workdir,
1150        });
1151        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1152            .json(&body)
1153            .send()
1154            .await
1155            .map_err(|e| friendly_error(&e, node))?;
1156        let text = ok_or_api_error(resp).await?;
1157        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1158        let msg = format!(
1159            "Created session \"{}\" ({})",
1160            resp.session.name, resp.session.id
1161        );
1162        let backend_id = resp
1163            .session
1164            .backend_session_id
1165            .as_deref()
1166            .unwrap_or(&resp.session.name);
1167        eprintln!("{msg}");
1168        // Path shortcut spawns a shell (no command) — skip liveness check
1169        // since shell sessions are immediately detected as idle by the watchdog
1170        attach_session(backend_id)?;
1171        return Ok(format!("Detached from session \"{}\".", resp.session.name));
1172    }
1173
1174    match cli.command.as_ref().unwrap() {
1175        Commands::Attach { name } => {
1176            // Fetch session to get status and backend_session_id
1177            let resp = authed_get(
1178                &client,
1179                format!("{url}/api/v1/sessions/{name}"),
1180                token.as_deref(),
1181            )
1182            .send()
1183            .await
1184            .map_err(|e| friendly_error(&e, node))?;
1185            let text = ok_or_api_error(resp).await?;
1186            let session: Session = serde_json::from_str(&text)?;
1187            match session.status.to_string().as_str() {
1188                "lost" => {
1189                    anyhow::bail!(
1190                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
1191                    );
1192                }
1193                "killed" => {
1194                    anyhow::bail!(
1195                        "Session \"{name}\" is {} — cannot attach to a killed session.",
1196                        session.status
1197                    );
1198                }
1199                _ => {}
1200            }
1201            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1202            attach_session(&backend_id)?;
1203            Ok(format!("Detached from session {name}."))
1204        }
1205        Commands::Input { name, text } => {
1206            let input_text = text.as_deref().unwrap_or("\n");
1207            let body = serde_json::json!({ "text": input_text });
1208            let resp = authed_post(
1209                &client,
1210                format!("{url}/api/v1/sessions/{name}/input"),
1211                token.as_deref(),
1212            )
1213            .json(&body)
1214            .send()
1215            .await
1216            .map_err(|e| friendly_error(&e, node))?;
1217            ok_or_api_error(resp).await?;
1218            Ok(format!("Sent input to session {name}."))
1219        }
1220        Commands::List { all } => {
1221            let list_url = if *all {
1222                format!("{url}/api/v1/sessions")
1223            } else {
1224                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1225            };
1226            let resp = authed_get(&client, list_url, token.as_deref())
1227                .send()
1228                .await
1229                .map_err(|e| friendly_error(&e, node))?;
1230            let text = ok_or_api_error(resp).await?;
1231            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1232            Ok(format_sessions(&sessions))
1233        }
1234        Commands::Nodes => {
1235            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1236                .send()
1237                .await
1238                .map_err(|e| friendly_error(&e, node))?;
1239            let text = ok_or_api_error(resp).await?;
1240            let resp: PeersResponse = serde_json::from_str(&text)?;
1241            Ok(format_nodes(&resp))
1242        }
1243        Commands::Spawn {
1244            workdir,
1245            name,
1246            ink,
1247            description,
1248            detach,
1249            idle_threshold,
1250            auto,
1251            worktree,
1252            runtime,
1253            secret,
1254            command,
1255        } => {
1256            let cmd = if command.is_empty() {
1257                None
1258            } else {
1259                Some(command.join(" "))
1260            };
1261            // Resolve workdir: --workdir flag > current directory
1262            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1263                std::env::current_dir()
1264                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1265            });
1266            // Resolve name: explicit > derived from workdir (with dedup)
1267            let resolved_name = if let Some(n) = name {
1268                n.clone()
1269            } else {
1270                let base_name = derive_session_name(&resolved_workdir);
1271                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1272            };
1273            let mut body = serde_json::json!({
1274                "name": resolved_name,
1275                "workdir": resolved_workdir,
1276            });
1277            if let Some(c) = &cmd {
1278                body["command"] = serde_json::json!(c);
1279            }
1280            if let Some(i) = ink {
1281                body["ink"] = serde_json::json!(i);
1282            }
1283            if let Some(d) = description {
1284                body["description"] = serde_json::json!(d);
1285            }
1286            if let Some(t) = idle_threshold {
1287                body["idle_threshold_secs"] = serde_json::json!(t);
1288            }
1289            if *worktree {
1290                body["worktree"] = serde_json::json!(true);
1291                eprintln!(
1292                    "Worktree: branch pulpo/{resolved_name} in {resolved_workdir}/.pulpo/worktrees/{resolved_name}/"
1293                );
1294            }
1295            if let Some(rt) = runtime {
1296                body["runtime"] = serde_json::json!(rt);
1297            }
1298            if !secret.is_empty() {
1299                body["secrets"] = serde_json::json!(secret);
1300            }
1301            let spawn_url = if *auto {
1302                let (auto_addr, auto_name) =
1303                    select_best_node(&client, &url, token.as_deref()).await?;
1304                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1305                base_url(&auto_addr)
1306            } else {
1307                url.clone()
1308            };
1309            let resp = authed_post(
1310                &client,
1311                format!("{spawn_url}/api/v1/sessions"),
1312                token.as_deref(),
1313            )
1314            .json(&body)
1315            .send()
1316            .await
1317            .map_err(|e| friendly_error(&e, node))?;
1318            let text = ok_or_api_error(resp).await?;
1319            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1320            let msg = format!(
1321                "Created session \"{}\" ({})",
1322                resp.session.name, resp.session.id
1323            );
1324            if !detach {
1325                let backend_id = resp
1326                    .session
1327                    .backend_session_id
1328                    .as_deref()
1329                    .unwrap_or(&resp.session.name);
1330                eprintln!("{msg}");
1331                // Only check liveness for explicit commands — shell sessions (no command)
1332                // may be immediately marked idle/killed by the watchdog, which is expected
1333                if cmd.is_some() {
1334                    let sid = resp.session.id.to_string();
1335                    check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1336                }
1337                attach_session(backend_id)?;
1338                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1339            }
1340            Ok(msg)
1341        }
1342        Commands::Kill { names } => {
1343            let mut results = Vec::new();
1344            for name in names {
1345                let resp = authed_post(
1346                    &client,
1347                    format!("{url}/api/v1/sessions/{name}/kill"),
1348                    token.as_deref(),
1349                )
1350                .send()
1351                .await
1352                .map_err(|e| friendly_error(&e, node))?;
1353                match ok_or_api_error(resp).await {
1354                    Ok(_) => results.push(format!("Session {name} killed.")),
1355                    Err(e) => results.push(format!("Error killing {name}: {e}")),
1356                }
1357            }
1358            Ok(results.join("\n"))
1359        }
1360        Commands::Delete { names } => {
1361            let mut results = Vec::new();
1362            for name in names {
1363                let resp = authed_delete(
1364                    &client,
1365                    format!("{url}/api/v1/sessions/{name}"),
1366                    token.as_deref(),
1367                )
1368                .send()
1369                .await
1370                .map_err(|e| friendly_error(&e, node))?;
1371                match ok_or_api_error(resp).await {
1372                    Ok(_) => results.push(format!("Session {name} deleted.")),
1373                    Err(e) => results.push(format!("Error deleting {name}: {e}")),
1374                }
1375            }
1376            Ok(results.join("\n"))
1377        }
1378        Commands::Cleanup => {
1379            let resp = authed_post(
1380                &client,
1381                format!("{url}/api/v1/sessions/cleanup"),
1382                token.as_deref(),
1383            )
1384            .send()
1385            .await
1386            .map_err(|e| friendly_error(&e, node))?;
1387            let text = ok_or_api_error(resp).await?;
1388            let result: serde_json::Value = serde_json::from_str(&text)?;
1389            let count = result["deleted"].as_u64().unwrap_or(0);
1390            if count == 0 {
1391                Ok("No killed or lost sessions to clean up.".into())
1392            } else {
1393                Ok(format!("Cleaned up {count} session(s)."))
1394            }
1395        }
1396        Commands::Logs {
1397            name,
1398            lines,
1399            follow,
1400        } => {
1401            if *follow {
1402                let mut stdout = std::io::stdout();
1403                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1404                    .await
1405                    .map_err(|e| {
1406                        // Unwrap reqwest errors to friendly messages
1407                        match e.downcast::<reqwest::Error>() {
1408                            Ok(re) => friendly_error(&re, node),
1409                            Err(other) => other,
1410                        }
1411                    })?;
1412                Ok(String::new())
1413            } else {
1414                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1415                    .await
1416                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1417                        Ok(re) => friendly_error(&re, node),
1418                        Err(other) => other,
1419                    })?;
1420                Ok(output)
1421            }
1422        }
1423        Commands::Interventions { name } => {
1424            let resp = authed_get(
1425                &client,
1426                format!("{url}/api/v1/sessions/{name}/interventions"),
1427                token.as_deref(),
1428            )
1429            .send()
1430            .await
1431            .map_err(|e| friendly_error(&e, node))?;
1432            let text = ok_or_api_error(resp).await?;
1433            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1434            Ok(format_interventions(&events))
1435        }
1436        Commands::Ui => {
1437            let dashboard = base_url(node);
1438            open_browser(&dashboard)?;
1439            Ok(format!("Opening {dashboard}"))
1440        }
1441        Commands::Resume { name } => {
1442            let resp = authed_post(
1443                &client,
1444                format!("{url}/api/v1/sessions/{name}/resume"),
1445                token.as_deref(),
1446            )
1447            .send()
1448            .await
1449            .map_err(|e| friendly_error(&e, node))?;
1450            let text = ok_or_api_error(resp).await?;
1451            let session: Session = serde_json::from_str(&text)?;
1452            let backend_id = session
1453                .backend_session_id
1454                .as_deref()
1455                .unwrap_or(&session.name);
1456            eprintln!("Resumed session \"{}\"", session.name);
1457            let sid = session.id.to_string();
1458            check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1459            attach_session(backend_id)?;
1460            Ok(format!("Detached from session \"{}\".", session.name))
1461        }
1462        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1463            .await
1464            .map_err(|e| match e.downcast::<reqwest::Error>() {
1465                Ok(re) => friendly_error(&re, node),
1466                Err(other) => other,
1467            }),
1468        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1469            .await
1470            .map_err(|e| match e.downcast::<reqwest::Error>() {
1471                Ok(re) => friendly_error(&re, node),
1472                Err(other) => other,
1473            }),
1474    }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479    use super::*;
1480
1481    #[test]
1482    fn test_base_url() {
1483        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1484        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1485    }
1486
1487    #[test]
1488    fn test_cli_parse_list() {
1489        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1490        assert_eq!(cli.node, "localhost:7433");
1491        assert!(matches!(cli.command, Some(Commands::List { .. })));
1492    }
1493
1494    #[test]
1495    fn test_cli_parse_nodes() {
1496        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1497        assert!(matches!(cli.command, Some(Commands::Nodes)));
1498    }
1499
1500    #[test]
1501    fn test_cli_parse_ui() {
1502        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1503        assert!(matches!(cli.command, Some(Commands::Ui)));
1504    }
1505
1506    #[test]
1507    fn test_cli_parse_ui_custom_node() {
1508        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1509        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1510        assert_eq!(cli.node, "mac-mini:7433");
1511        assert_eq!(cli.path.as_deref(), Some("ui"));
1512    }
1513
1514    #[test]
1515    fn test_build_open_command() {
1516        let cmd = build_open_command("http://localhost:7433");
1517        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1518        assert_eq!(args, vec!["http://localhost:7433"]);
1519        #[cfg(target_os = "macos")]
1520        assert_eq!(cmd.get_program(), "open");
1521        #[cfg(target_os = "linux")]
1522        assert_eq!(cmd.get_program(), "xdg-open");
1523    }
1524
1525    #[test]
1526    fn test_cli_parse_spawn() {
1527        let cli = Cli::try_parse_from([
1528            "pulpo",
1529            "spawn",
1530            "my-task",
1531            "--workdir",
1532            "/tmp/repo",
1533            "--",
1534            "claude",
1535            "-p",
1536            "Fix the bug",
1537        ])
1538        .unwrap();
1539        assert!(matches!(
1540            &cli.command,
1541            Some(Commands::Spawn { name, workdir, command, .. })
1542                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1543                && command == &["claude", "-p", "Fix the bug"]
1544        ));
1545    }
1546
1547    #[test]
1548    fn test_cli_parse_spawn_with_ink() {
1549        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1550        assert!(matches!(
1551            &cli.command,
1552            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1553        ));
1554    }
1555
1556    #[test]
1557    fn test_cli_parse_spawn_with_description() {
1558        let cli =
1559            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1560                .unwrap();
1561        assert!(matches!(
1562            &cli.command,
1563            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1564        ));
1565    }
1566
1567    #[test]
1568    fn test_cli_parse_spawn_name_positional() {
1569        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1570        assert!(matches!(
1571            &cli.command,
1572            Some(Commands::Spawn { name, command, .. })
1573                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1574        ));
1575    }
1576
1577    #[test]
1578    fn test_cli_parse_spawn_no_command() {
1579        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1580        assert!(matches!(
1581            &cli.command,
1582            Some(Commands::Spawn { command, .. }) if command.is_empty()
1583        ));
1584    }
1585
1586    #[test]
1587    fn test_cli_parse_spawn_idle_threshold() {
1588        let cli =
1589            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1590        assert!(matches!(
1591            &cli.command,
1592            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1593        ));
1594    }
1595
1596    #[test]
1597    fn test_cli_parse_spawn_idle_threshold_60() {
1598        let cli =
1599            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1600        assert!(matches!(
1601            &cli.command,
1602            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1603        ));
1604    }
1605
1606    #[test]
1607    fn test_cli_parse_spawn_secrets() {
1608        let cli = Cli::try_parse_from([
1609            "pulpo",
1610            "spawn",
1611            "my-task",
1612            "--secret",
1613            "GITHUB_TOKEN",
1614            "--secret",
1615            "NPM_TOKEN",
1616        ])
1617        .unwrap();
1618        assert!(matches!(
1619            &cli.command,
1620            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1621        ));
1622    }
1623
1624    #[test]
1625    fn test_cli_parse_spawn_no_secrets() {
1626        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1627        assert!(matches!(
1628            &cli.command,
1629            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1630        ));
1631    }
1632
1633    #[test]
1634    fn test_cli_parse_secret_set_with_env() {
1635        let cli = Cli::try_parse_from([
1636            "pulpo",
1637            "secret",
1638            "set",
1639            "GH_WORK",
1640            "token123",
1641            "--env",
1642            "GITHUB_TOKEN",
1643        ])
1644        .unwrap();
1645        assert!(matches!(
1646            &cli.command,
1647            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1648                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1649        ));
1650    }
1651
1652    #[test]
1653    fn test_cli_parse_secret_set_without_env() {
1654        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1655        assert!(matches!(
1656            &cli.command,
1657            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1658                if name == "MY_KEY" && value == "val" && env.is_none()
1659        ));
1660    }
1661
1662    #[test]
1663    fn test_cli_parse_spawn_worktree() {
1664        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1665        assert!(matches!(
1666            &cli.command,
1667            Some(Commands::Spawn { worktree, .. }) if *worktree
1668        ));
1669    }
1670
1671    #[test]
1672    fn test_cli_parse_spawn_detach() {
1673        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1674        assert!(matches!(
1675            &cli.command,
1676            Some(Commands::Spawn { detach, .. }) if *detach
1677        ));
1678    }
1679
1680    #[test]
1681    fn test_cli_parse_spawn_detach_short() {
1682        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1683        assert!(matches!(
1684            &cli.command,
1685            Some(Commands::Spawn { detach, .. }) if *detach
1686        ));
1687    }
1688
1689    #[test]
1690    fn test_cli_parse_spawn_detach_default() {
1691        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1692        assert!(matches!(
1693            &cli.command,
1694            Some(Commands::Spawn { detach, .. }) if !detach
1695        ));
1696    }
1697
1698    #[test]
1699    fn test_cli_parse_logs() {
1700        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1701        assert!(matches!(
1702            &cli.command,
1703            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1704        ));
1705    }
1706
1707    #[test]
1708    fn test_cli_parse_logs_with_lines() {
1709        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1710        assert!(matches!(
1711            &cli.command,
1712            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1713        ));
1714    }
1715
1716    #[test]
1717    fn test_cli_parse_logs_follow() {
1718        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1719        assert!(matches!(
1720            &cli.command,
1721            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1722        ));
1723    }
1724
1725    #[test]
1726    fn test_cli_parse_logs_follow_short() {
1727        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1728        assert!(matches!(
1729            &cli.command,
1730            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1731        ));
1732    }
1733
1734    #[test]
1735    fn test_cli_parse_kill() {
1736        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1737        assert!(matches!(
1738            &cli.command,
1739            Some(Commands::Kill { names }) if names == &["my-session"]
1740        ));
1741    }
1742
1743    #[test]
1744    fn test_cli_parse_delete() {
1745        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1746        assert!(matches!(
1747            &cli.command,
1748            Some(Commands::Delete { names }) if names == &["my-session"]
1749        ));
1750    }
1751
1752    #[test]
1753    fn test_cli_parse_resume() {
1754        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1755        assert!(matches!(
1756            &cli.command,
1757            Some(Commands::Resume { name }) if name == "my-session"
1758        ));
1759    }
1760
1761    #[test]
1762    fn test_cli_parse_input() {
1763        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1764        assert!(matches!(
1765            &cli.command,
1766            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1767        ));
1768    }
1769
1770    #[test]
1771    fn test_cli_parse_input_no_text() {
1772        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1773        assert!(matches!(
1774            &cli.command,
1775            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1776        ));
1777    }
1778
1779    #[test]
1780    fn test_cli_parse_input_alias() {
1781        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1782        assert!(matches!(
1783            &cli.command,
1784            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1785        ));
1786    }
1787
1788    #[test]
1789    fn test_cli_parse_custom_node() {
1790        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1791        assert_eq!(cli.node, "win-pc:8080");
1792        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
1793        assert_eq!(cli.path.as_deref(), Some("list"));
1794    }
1795
1796    #[test]
1797    fn test_cli_version() {
1798        let result = Cli::try_parse_from(["pulpo", "--version"]);
1799        // clap exits with an error (kind DisplayVersion) when --version is used
1800        let err = result.unwrap_err();
1801        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1802    }
1803
1804    #[test]
1805    fn test_cli_parse_no_subcommand_succeeds() {
1806        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1807        assert!(cli.command.is_none());
1808        assert!(cli.path.is_none());
1809    }
1810
1811    #[test]
1812    fn test_cli_debug() {
1813        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1814        let debug = format!("{cli:?}");
1815        assert!(debug.contains("List"));
1816    }
1817
1818    #[test]
1819    fn test_commands_debug() {
1820        let cmd = Commands::List { all: false };
1821        assert_eq!(format!("{cmd:?}"), "List { all: false }");
1822    }
1823
1824    /// A valid Session JSON for test responses.
1825    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1826
1827    /// A valid `CreateSessionResponse` JSON wrapping the session.
1828    fn test_create_response_json() -> String {
1829        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1830    }
1831
1832    /// Start a lightweight test HTTP server and return its address.
1833    async fn start_test_server() -> String {
1834        use axum::http::StatusCode;
1835        use axum::{
1836            Json, Router,
1837            routing::{get, post},
1838        };
1839
1840        let create_json = test_create_response_json();
1841
1842        let app = Router::new()
1843            .route(
1844                "/api/v1/sessions",
1845                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1846                    (StatusCode::CREATED, create_json.clone())
1847                }),
1848            )
1849            .route(
1850                "/api/v1/sessions/{id}",
1851                get(|| async { TEST_SESSION_JSON.to_owned() })
1852                    .delete(|| async { StatusCode::NO_CONTENT }),
1853            )
1854            .route(
1855                "/api/v1/sessions/{id}/kill",
1856                post(|| async { StatusCode::NO_CONTENT }),
1857            )
1858            .route(
1859                "/api/v1/sessions/{id}/output",
1860                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1861            )
1862            .route(
1863                "/api/v1/peers",
1864                get(|| async {
1865                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1866                }),
1867            )
1868            .route(
1869                "/api/v1/sessions/{id}/resume",
1870                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1871            )
1872            .route(
1873                "/api/v1/sessions/{id}/interventions",
1874                get(|| async { "[]".to_owned() }),
1875            )
1876            .route(
1877                "/api/v1/sessions/{id}/input",
1878                post(|| async { StatusCode::NO_CONTENT }),
1879            )
1880            .route(
1881                "/api/v1/schedules",
1882                get(|| async { Json::<Vec<()>>(vec![]) })
1883                    .post(|| async { StatusCode::CREATED }),
1884            )
1885            .route(
1886                "/api/v1/schedules/{id}",
1887                axum::routing::put(|| async { StatusCode::OK })
1888                    .delete(|| async { StatusCode::NO_CONTENT }),
1889            );
1890
1891        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1892        let addr = listener.local_addr().unwrap();
1893        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1894        format!("127.0.0.1:{}", addr.port())
1895    }
1896
1897    #[tokio::test]
1898    async fn test_execute_list_success() {
1899        let node = start_test_server().await;
1900        let cli = Cli {
1901            node,
1902            token: None,
1903            command: Some(Commands::List { all: false }),
1904            path: None,
1905        };
1906        let result = execute(&cli).await.unwrap();
1907        assert_eq!(result, "No sessions.");
1908    }
1909
1910    #[tokio::test]
1911    async fn test_execute_nodes_success() {
1912        let node = start_test_server().await;
1913        let cli = Cli {
1914            node,
1915            token: None,
1916            command: Some(Commands::Nodes),
1917            path: None,
1918        };
1919        let result = execute(&cli).await.unwrap();
1920        assert!(result.contains("test"));
1921        assert!(result.contains("(local)"));
1922        assert!(result.contains("NAME"));
1923    }
1924
1925    #[tokio::test]
1926    async fn test_execute_spawn_success() {
1927        let node = start_test_server().await;
1928        let cli = Cli {
1929            node,
1930            token: None,
1931            command: Some(Commands::Spawn {
1932                name: Some("test".into()),
1933                workdir: Some("/tmp/repo".into()),
1934                ink: None,
1935                description: None,
1936                detach: true,
1937                idle_threshold: None,
1938                auto: false,
1939                worktree: false,
1940                runtime: None,
1941                secret: vec![],
1942                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1943            }),
1944            path: None,
1945        };
1946        let result = execute(&cli).await.unwrap();
1947        assert!(result.contains("Created session"));
1948        assert!(result.contains("repo"));
1949    }
1950
1951    #[tokio::test]
1952    async fn test_execute_spawn_with_all_flags() {
1953        let node = start_test_server().await;
1954        let cli = Cli {
1955            node,
1956            token: None,
1957            command: Some(Commands::Spawn {
1958                name: Some("test".into()),
1959                workdir: Some("/tmp/repo".into()),
1960                ink: Some("coder".into()),
1961                description: Some("Fix the bug".into()),
1962                detach: true,
1963                idle_threshold: None,
1964                auto: false,
1965                worktree: false,
1966                runtime: None,
1967                secret: vec![],
1968                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1969            }),
1970            path: None,
1971        };
1972        let result = execute(&cli).await.unwrap();
1973        assert!(result.contains("Created session"));
1974    }
1975
1976    #[tokio::test]
1977    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1978        let node = start_test_server().await;
1979        let cli = Cli {
1980            node,
1981            token: None,
1982            command: Some(Commands::Spawn {
1983                name: Some("full-opts".into()),
1984                workdir: Some("/tmp/repo".into()),
1985                ink: Some("coder".into()),
1986                description: Some("Full options".into()),
1987                detach: true,
1988                idle_threshold: Some(120),
1989                auto: false,
1990                worktree: true,
1991                runtime: Some("docker".into()),
1992                secret: vec![],
1993                command: vec!["claude".into()],
1994            }),
1995            path: None,
1996        };
1997        let result = execute(&cli).await.unwrap();
1998        assert!(result.contains("Created session"));
1999    }
2000
2001    #[tokio::test]
2002    async fn test_execute_spawn_no_name_derives_from_workdir() {
2003        let node = start_test_server().await;
2004        let cli = Cli {
2005            node,
2006            token: None,
2007            command: Some(Commands::Spawn {
2008                name: None,
2009                workdir: Some("/tmp/my-project".into()),
2010                ink: None,
2011                description: None,
2012                detach: true,
2013                idle_threshold: None,
2014                auto: false,
2015                worktree: false,
2016                runtime: None,
2017                secret: vec![],
2018                command: vec!["echo".into(), "hello".into()],
2019            }),
2020            path: None,
2021        };
2022        let result = execute(&cli).await.unwrap();
2023        assert!(result.contains("Created session"));
2024    }
2025
2026    #[tokio::test]
2027    async fn test_execute_spawn_no_command() {
2028        let node = start_test_server().await;
2029        let cli = Cli {
2030            node,
2031            token: None,
2032            command: Some(Commands::Spawn {
2033                name: Some("test".into()),
2034                workdir: Some("/tmp/repo".into()),
2035                ink: None,
2036                description: None,
2037                detach: true,
2038                idle_threshold: None,
2039                auto: false,
2040                worktree: false,
2041                runtime: None,
2042                secret: vec![],
2043                command: vec![],
2044            }),
2045            path: None,
2046        };
2047        let result = execute(&cli).await.unwrap();
2048        assert!(result.contains("Created session"));
2049    }
2050
2051    #[tokio::test]
2052    async fn test_execute_spawn_with_name() {
2053        let node = start_test_server().await;
2054        let cli = Cli {
2055            node,
2056            token: None,
2057            command: Some(Commands::Spawn {
2058                name: Some("my-task".into()),
2059                workdir: Some("/tmp/repo".into()),
2060                ink: None,
2061                description: None,
2062                detach: true,
2063                idle_threshold: None,
2064                auto: false,
2065                worktree: false,
2066                runtime: None,
2067                secret: vec![],
2068                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2069            }),
2070            path: None,
2071        };
2072        let result = execute(&cli).await.unwrap();
2073        assert!(result.contains("Created session"));
2074    }
2075
2076    #[tokio::test]
2077    async fn test_execute_spawn_auto_attach() {
2078        let node = start_test_server().await;
2079        let cli = Cli {
2080            node,
2081            token: None,
2082            command: Some(Commands::Spawn {
2083                name: Some("test".into()),
2084                workdir: Some("/tmp/repo".into()),
2085                ink: None,
2086                description: None,
2087                detach: false,
2088                idle_threshold: None,
2089                auto: false,
2090                worktree: false,
2091                runtime: None,
2092                secret: vec![],
2093                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2094            }),
2095            path: None,
2096        };
2097        let result = execute(&cli).await.unwrap();
2098        // When not detached, spawn prints creation to stderr and returns detach message
2099        assert!(result.contains("Detached from session"));
2100    }
2101
2102    #[tokio::test]
2103    async fn test_execute_kill_success() {
2104        let node = start_test_server().await;
2105        let cli = Cli {
2106            node,
2107            token: None,
2108            command: Some(Commands::Kill {
2109                names: vec!["test-session".into()],
2110            }),
2111            path: None,
2112        };
2113        let result = execute(&cli).await.unwrap();
2114        assert!(result.contains("killed"));
2115    }
2116
2117    #[tokio::test]
2118    async fn test_execute_delete_success() {
2119        let node = start_test_server().await;
2120        let cli = Cli {
2121            node,
2122            token: None,
2123            command: Some(Commands::Delete {
2124                names: vec!["test-session".into()],
2125            }),
2126            path: None,
2127        };
2128        let result = execute(&cli).await.unwrap();
2129        assert!(result.contains("deleted"));
2130    }
2131
2132    #[tokio::test]
2133    async fn test_execute_logs_success() {
2134        let node = start_test_server().await;
2135        let cli = Cli {
2136            node,
2137            token: None,
2138            command: Some(Commands::Logs {
2139                name: "test-session".into(),
2140                lines: 50,
2141                follow: false,
2142            }),
2143            path: None,
2144        };
2145        let result = execute(&cli).await.unwrap();
2146        assert!(result.contains("test output"));
2147    }
2148
2149    #[tokio::test]
2150    async fn test_execute_list_connection_refused() {
2151        let cli = Cli {
2152            node: "localhost:1".into(),
2153            token: None,
2154            command: Some(Commands::List { all: false }),
2155            path: None,
2156        };
2157        let result = execute(&cli).await;
2158        let err = result.unwrap_err().to_string();
2159        assert!(
2160            err.contains("Could not connect to pulpod"),
2161            "Expected friendly error, got: {err}"
2162        );
2163        assert!(err.contains("localhost:1"));
2164    }
2165
2166    #[tokio::test]
2167    async fn test_execute_nodes_connection_refused() {
2168        let cli = Cli {
2169            node: "localhost:1".into(),
2170            token: None,
2171            command: Some(Commands::Nodes),
2172            path: None,
2173        };
2174        let result = execute(&cli).await;
2175        let err = result.unwrap_err().to_string();
2176        assert!(err.contains("Could not connect to pulpod"));
2177    }
2178
2179    #[tokio::test]
2180    async fn test_execute_kill_error_response() {
2181        use axum::{Router, http::StatusCode, routing::post};
2182
2183        let app = Router::new().route(
2184            "/api/v1/sessions/{id}/kill",
2185            post(|| async {
2186                (
2187                    StatusCode::NOT_FOUND,
2188                    "{\"error\":\"session not found: test-session\"}",
2189                )
2190            }),
2191        );
2192        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2193        let addr = listener.local_addr().unwrap();
2194        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2195        let node = format!("127.0.0.1:{}", addr.port());
2196
2197        let cli = Cli {
2198            node,
2199            token: None,
2200            command: Some(Commands::Kill {
2201                names: vec!["test-session".into()],
2202            }),
2203            path: None,
2204        };
2205        let result = execute(&cli).await.unwrap();
2206        assert!(result.contains("Error killing test-session"), "{result}");
2207    }
2208
2209    #[tokio::test]
2210    async fn test_execute_delete_error_response() {
2211        use axum::{Router, http::StatusCode, routing::delete};
2212
2213        let app = Router::new().route(
2214            "/api/v1/sessions/{id}",
2215            delete(|| async {
2216                (
2217                    StatusCode::CONFLICT,
2218                    "{\"error\":\"cannot delete session in 'running' state\"}",
2219                )
2220            }),
2221        );
2222        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2223        let addr = listener.local_addr().unwrap();
2224        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2225        let node = format!("127.0.0.1:{}", addr.port());
2226
2227        let cli = Cli {
2228            node,
2229            token: None,
2230            command: Some(Commands::Delete {
2231                names: vec!["test-session".into()],
2232            }),
2233            path: None,
2234        };
2235        let result = execute(&cli).await.unwrap();
2236        assert!(result.contains("Error deleting test-session"), "{result}");
2237    }
2238
2239    #[tokio::test]
2240    async fn test_execute_logs_error_response() {
2241        use axum::{Router, http::StatusCode, routing::get};
2242
2243        let app = Router::new().route(
2244            "/api/v1/sessions/{id}/output",
2245            get(|| async {
2246                (
2247                    StatusCode::NOT_FOUND,
2248                    "{\"error\":\"session not found: ghost\"}",
2249                )
2250            }),
2251        );
2252        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2253        let addr = listener.local_addr().unwrap();
2254        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2255        let node = format!("127.0.0.1:{}", addr.port());
2256
2257        let cli = Cli {
2258            node,
2259            token: None,
2260            command: Some(Commands::Logs {
2261                name: "ghost".into(),
2262                lines: 50,
2263                follow: false,
2264            }),
2265            path: None,
2266        };
2267        let err = execute(&cli).await.unwrap_err();
2268        assert_eq!(err.to_string(), "session not found: ghost");
2269    }
2270
2271    #[tokio::test]
2272    async fn test_execute_resume_error_response() {
2273        use axum::{Router, http::StatusCode, routing::post};
2274
2275        let app = Router::new().route(
2276            "/api/v1/sessions/{id}/resume",
2277            post(|| async {
2278                (
2279                    StatusCode::BAD_REQUEST,
2280                    "{\"error\":\"session is not lost (status: active)\"}",
2281                )
2282            }),
2283        );
2284        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2285        let addr = listener.local_addr().unwrap();
2286        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2287        let node = format!("127.0.0.1:{}", addr.port());
2288
2289        let cli = Cli {
2290            node,
2291            token: None,
2292            command: Some(Commands::Resume {
2293                name: "test-session".into(),
2294            }),
2295            path: None,
2296        };
2297        let err = execute(&cli).await.unwrap_err();
2298        assert_eq!(err.to_string(), "session is not lost (status: active)");
2299    }
2300
2301    #[tokio::test]
2302    async fn test_execute_spawn_error_response() {
2303        use axum::{Router, http::StatusCode, routing::post};
2304
2305        let app = Router::new().route(
2306            "/api/v1/sessions",
2307            post(|| async {
2308                (
2309                    StatusCode::INTERNAL_SERVER_ERROR,
2310                    "{\"error\":\"failed to spawn session\"}",
2311                )
2312            }),
2313        );
2314        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2315        let addr = listener.local_addr().unwrap();
2316        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2317        let node = format!("127.0.0.1:{}", addr.port());
2318
2319        let cli = Cli {
2320            node,
2321            token: None,
2322            command: Some(Commands::Spawn {
2323                name: Some("test".into()),
2324                workdir: Some("/tmp/repo".into()),
2325                ink: None,
2326                description: None,
2327                detach: true,
2328                idle_threshold: None,
2329                auto: false,
2330                worktree: false,
2331                runtime: None,
2332                secret: vec![],
2333                command: vec!["test".into()],
2334            }),
2335            path: None,
2336        };
2337        let err = execute(&cli).await.unwrap_err();
2338        assert_eq!(err.to_string(), "failed to spawn session");
2339    }
2340
2341    #[tokio::test]
2342    async fn test_execute_interventions_error_response() {
2343        use axum::{Router, http::StatusCode, routing::get};
2344
2345        let app = Router::new().route(
2346            "/api/v1/sessions/{id}/interventions",
2347            get(|| async {
2348                (
2349                    StatusCode::NOT_FOUND,
2350                    "{\"error\":\"session not found: ghost\"}",
2351                )
2352            }),
2353        );
2354        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2355        let addr = listener.local_addr().unwrap();
2356        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2357        let node = format!("127.0.0.1:{}", addr.port());
2358
2359        let cli = Cli {
2360            node,
2361            token: None,
2362            command: Some(Commands::Interventions {
2363                name: "ghost".into(),
2364            }),
2365            path: None,
2366        };
2367        let err = execute(&cli).await.unwrap_err();
2368        assert_eq!(err.to_string(), "session not found: ghost");
2369    }
2370
2371    #[tokio::test]
2372    async fn test_execute_resume_success() {
2373        let node = start_test_server().await;
2374        let cli = Cli {
2375            node,
2376            token: None,
2377            command: Some(Commands::Resume {
2378                name: "test-session".into(),
2379            }),
2380            path: None,
2381        };
2382        let result = execute(&cli).await.unwrap();
2383        assert!(result.contains("Detached from session"));
2384    }
2385
2386    #[tokio::test]
2387    async fn test_execute_input_success() {
2388        let node = start_test_server().await;
2389        let cli = Cli {
2390            node,
2391            token: None,
2392            command: Some(Commands::Input {
2393                name: "test-session".into(),
2394                text: Some("yes".into()),
2395            }),
2396            path: None,
2397        };
2398        let result = execute(&cli).await.unwrap();
2399        assert!(result.contains("Sent input to session test-session"));
2400    }
2401
2402    #[tokio::test]
2403    async fn test_execute_input_no_text() {
2404        let node = start_test_server().await;
2405        let cli = Cli {
2406            node,
2407            token: None,
2408            command: Some(Commands::Input {
2409                name: "test-session".into(),
2410                text: None,
2411            }),
2412            path: None,
2413        };
2414        let result = execute(&cli).await.unwrap();
2415        assert!(result.contains("Sent input to session test-session"));
2416    }
2417
2418    #[tokio::test]
2419    async fn test_execute_input_connection_refused() {
2420        let cli = Cli {
2421            node: "localhost:1".into(),
2422            token: None,
2423            command: Some(Commands::Input {
2424                name: "test".into(),
2425                text: Some("y".into()),
2426            }),
2427            path: None,
2428        };
2429        let result = execute(&cli).await;
2430        let err = result.unwrap_err().to_string();
2431        assert!(err.contains("Could not connect to pulpod"));
2432    }
2433
2434    #[tokio::test]
2435    async fn test_execute_input_error_response() {
2436        use axum::{Router, http::StatusCode, routing::post};
2437
2438        let app = Router::new().route(
2439            "/api/v1/sessions/{id}/input",
2440            post(|| async {
2441                (
2442                    StatusCode::NOT_FOUND,
2443                    "{\"error\":\"session not found: ghost\"}",
2444                )
2445            }),
2446        );
2447        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2448        let addr = listener.local_addr().unwrap();
2449        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2450        let node = format!("127.0.0.1:{}", addr.port());
2451
2452        let cli = Cli {
2453            node,
2454            token: None,
2455            command: Some(Commands::Input {
2456                name: "ghost".into(),
2457                text: Some("y".into()),
2458            }),
2459            path: None,
2460        };
2461        let err = execute(&cli).await.unwrap_err();
2462        assert_eq!(err.to_string(), "session not found: ghost");
2463    }
2464
2465    #[tokio::test]
2466    async fn test_execute_ui() {
2467        let cli = Cli {
2468            node: "localhost:7433".into(),
2469            token: None,
2470            command: Some(Commands::Ui),
2471            path: None,
2472        };
2473        let result = execute(&cli).await.unwrap();
2474        assert!(result.contains("Opening"));
2475        assert!(result.contains("http://localhost:7433"));
2476    }
2477
2478    #[tokio::test]
2479    async fn test_execute_ui_custom_node() {
2480        let cli = Cli {
2481            node: "mac-mini:7433".into(),
2482            token: None,
2483            command: Some(Commands::Ui),
2484            path: None,
2485        };
2486        let result = execute(&cli).await.unwrap();
2487        assert!(result.contains("http://mac-mini:7433"));
2488    }
2489
2490    #[test]
2491    fn test_format_sessions_empty() {
2492        assert_eq!(format_sessions(&[]), "No sessions.");
2493    }
2494
2495    #[test]
2496    fn test_format_sessions_with_data() {
2497        use chrono::Utc;
2498        use pulpo_common::session::SessionStatus;
2499        use uuid::Uuid;
2500
2501        let sessions = vec![Session {
2502            id: Uuid::nil(),
2503            name: "my-api".into(),
2504            workdir: "/tmp/repo".into(),
2505            command: "claude -p 'Fix the bug'".into(),
2506            description: Some("Fix the bug".into()),
2507            status: SessionStatus::Active,
2508            exit_code: None,
2509            backend_session_id: None,
2510            output_snapshot: None,
2511            metadata: None,
2512            ink: None,
2513            intervention_code: None,
2514            intervention_reason: None,
2515            intervention_at: None,
2516            last_output_at: None,
2517            idle_since: None,
2518            idle_threshold_secs: None,
2519            worktree_path: None,
2520            runtime: Runtime::Tmux,
2521            created_at: Utc::now(),
2522            updated_at: Utc::now(),
2523        }];
2524        let output = format_sessions(&sessions);
2525        assert!(output.contains("ID"));
2526        assert!(output.contains("NAME"));
2527        assert!(output.contains("RUNTIME"));
2528        assert!(output.contains("COMMAND"));
2529        assert!(output.contains("00000000"));
2530        assert!(output.contains("my-api"));
2531        assert!(output.contains("active"));
2532        assert!(output.contains("tmux"));
2533        assert!(output.contains("claude -p 'Fix the bug'"));
2534    }
2535
2536    #[test]
2537    fn test_format_sessions_docker_runtime() {
2538        use chrono::Utc;
2539        use pulpo_common::session::SessionStatus;
2540        use uuid::Uuid;
2541
2542        let sessions = vec![Session {
2543            id: Uuid::nil(),
2544            name: "sandbox-test".into(),
2545            workdir: "/tmp".into(),
2546            command: "claude".into(),
2547            description: None,
2548            status: SessionStatus::Active,
2549            exit_code: None,
2550            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2551            output_snapshot: None,
2552            metadata: None,
2553            ink: None,
2554            intervention_code: None,
2555            intervention_reason: None,
2556            intervention_at: None,
2557            last_output_at: None,
2558            idle_since: None,
2559            idle_threshold_secs: None,
2560            worktree_path: None,
2561            runtime: Runtime::Docker,
2562            created_at: Utc::now(),
2563            updated_at: Utc::now(),
2564        }];
2565        let output = format_sessions(&sessions);
2566        assert!(output.contains("docker"));
2567    }
2568
2569    #[test]
2570    fn test_format_sessions_long_command_truncated() {
2571        use chrono::Utc;
2572        use pulpo_common::session::SessionStatus;
2573        use uuid::Uuid;
2574
2575        let sessions = vec![Session {
2576            id: Uuid::nil(),
2577            name: "test".into(),
2578            workdir: "/tmp".into(),
2579            command:
2580                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2581                    .into(),
2582            description: None,
2583            status: SessionStatus::Ready,
2584            exit_code: None,
2585            backend_session_id: None,
2586            output_snapshot: None,
2587            metadata: None,
2588            ink: None,
2589            intervention_code: None,
2590            intervention_reason: None,
2591            intervention_at: None,
2592            last_output_at: None,
2593            idle_since: None,
2594            idle_threshold_secs: None,
2595            worktree_path: None,
2596            runtime: Runtime::Tmux,
2597            created_at: Utc::now(),
2598            updated_at: Utc::now(),
2599        }];
2600        let output = format_sessions(&sessions);
2601        assert!(output.contains("..."));
2602    }
2603
2604    #[test]
2605    fn test_format_sessions_worktree_indicator() {
2606        use chrono::Utc;
2607        use pulpo_common::session::SessionStatus;
2608        use uuid::Uuid;
2609
2610        let sessions = vec![Session {
2611            id: Uuid::nil(),
2612            name: "wt-task".into(),
2613            workdir: "/repo/.pulpo/worktrees/wt-task".into(),
2614            command: "claude".into(),
2615            description: None,
2616            status: SessionStatus::Active,
2617            exit_code: None,
2618            backend_session_id: None,
2619            output_snapshot: None,
2620            metadata: None,
2621            ink: None,
2622            intervention_code: None,
2623            intervention_reason: None,
2624            intervention_at: None,
2625            last_output_at: None,
2626            idle_since: None,
2627            idle_threshold_secs: None,
2628            worktree_path: Some("/repo/.pulpo/worktrees/wt-task".into()),
2629            runtime: Runtime::Tmux,
2630            created_at: Utc::now(),
2631            updated_at: Utc::now(),
2632        }];
2633        let output = format_sessions(&sessions);
2634        assert!(
2635            output.contains("[wt]"),
2636            "should show worktree indicator: {output}"
2637        );
2638        assert!(output.contains("wt-task [wt]"));
2639    }
2640
2641    #[test]
2642    fn test_format_sessions_pr_indicator() {
2643        use chrono::Utc;
2644        use pulpo_common::session::SessionStatus;
2645        use std::collections::HashMap;
2646        use uuid::Uuid;
2647
2648        let mut meta = HashMap::new();
2649        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2650        let sessions = vec![Session {
2651            id: Uuid::nil(),
2652            name: "pr-task".into(),
2653            workdir: "/tmp".into(),
2654            command: "claude".into(),
2655            description: None,
2656            status: SessionStatus::Active,
2657            exit_code: None,
2658            backend_session_id: None,
2659            output_snapshot: None,
2660            metadata: Some(meta),
2661            ink: None,
2662            intervention_code: None,
2663            intervention_reason: None,
2664            intervention_at: None,
2665            last_output_at: None,
2666            idle_since: None,
2667            idle_threshold_secs: None,
2668            worktree_path: None,
2669            runtime: Runtime::Tmux,
2670            created_at: Utc::now(),
2671            updated_at: Utc::now(),
2672        }];
2673        let output = format_sessions(&sessions);
2674        assert!(
2675            output.contains("[PR]"),
2676            "should show PR indicator: {output}"
2677        );
2678        assert!(output.contains("pr-task [PR]"));
2679    }
2680
2681    #[test]
2682    fn test_format_sessions_worktree_and_pr_indicator() {
2683        use chrono::Utc;
2684        use pulpo_common::session::SessionStatus;
2685        use std::collections::HashMap;
2686        use uuid::Uuid;
2687
2688        let mut meta = HashMap::new();
2689        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2690        let sessions = vec![Session {
2691            id: Uuid::nil(),
2692            name: "both-task".into(),
2693            workdir: "/tmp".into(),
2694            command: "claude".into(),
2695            description: None,
2696            status: SessionStatus::Active,
2697            exit_code: None,
2698            backend_session_id: None,
2699            output_snapshot: None,
2700            metadata: Some(meta),
2701            ink: None,
2702            intervention_code: None,
2703            intervention_reason: None,
2704            intervention_at: None,
2705            last_output_at: None,
2706            idle_since: None,
2707            idle_threshold_secs: None,
2708            worktree_path: Some("/repo/.pulpo/worktrees/both-task".into()),
2709            runtime: Runtime::Tmux,
2710            created_at: Utc::now(),
2711            updated_at: Utc::now(),
2712        }];
2713        let output = format_sessions(&sessions);
2714        assert!(
2715            output.contains("[wt] [PR]"),
2716            "should show both indicators: {output}"
2717        );
2718    }
2719
2720    #[test]
2721    fn test_format_sessions_no_pr_without_metadata() {
2722        use chrono::Utc;
2723        use pulpo_common::session::SessionStatus;
2724        use uuid::Uuid;
2725
2726        let sessions = vec![Session {
2727            id: Uuid::nil(),
2728            name: "no-pr".into(),
2729            workdir: "/tmp".into(),
2730            command: "claude".into(),
2731            description: None,
2732            status: SessionStatus::Active,
2733            exit_code: None,
2734            backend_session_id: None,
2735            output_snapshot: None,
2736            metadata: None,
2737            ink: None,
2738            intervention_code: None,
2739            intervention_reason: None,
2740            intervention_at: None,
2741            last_output_at: None,
2742            idle_since: None,
2743            idle_threshold_secs: None,
2744            worktree_path: None,
2745            runtime: Runtime::Tmux,
2746            created_at: Utc::now(),
2747            updated_at: Utc::now(),
2748        }];
2749        let output = format_sessions(&sessions);
2750        assert!(
2751            !output.contains("[PR]"),
2752            "should not show PR indicator: {output}"
2753        );
2754    }
2755
2756    #[test]
2757    fn test_format_nodes() {
2758        use pulpo_common::node::NodeInfo;
2759        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2760
2761        let resp = PeersResponse {
2762            local: NodeInfo {
2763                name: "mac-mini".into(),
2764                hostname: "h".into(),
2765                os: "macos".into(),
2766                arch: "arm64".into(),
2767                cpus: 8,
2768                memory_mb: 16384,
2769                gpu: None,
2770            },
2771            peers: vec![PeerInfo {
2772                name: "win-pc".into(),
2773                address: "win-pc:7433".into(),
2774                status: PeerStatus::Online,
2775                node_info: None,
2776                session_count: Some(3),
2777                source: PeerSource::Configured,
2778            }],
2779        };
2780        let output = format_nodes(&resp);
2781        assert!(output.contains("mac-mini"));
2782        assert!(output.contains("(local)"));
2783        assert!(output.contains("win-pc"));
2784        assert!(output.contains('3'));
2785    }
2786
2787    #[test]
2788    fn test_format_nodes_no_session_count() {
2789        use pulpo_common::node::NodeInfo;
2790        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2791
2792        let resp = PeersResponse {
2793            local: NodeInfo {
2794                name: "local".into(),
2795                hostname: "h".into(),
2796                os: "linux".into(),
2797                arch: "x86_64".into(),
2798                cpus: 4,
2799                memory_mb: 8192,
2800                gpu: None,
2801            },
2802            peers: vec![PeerInfo {
2803                name: "peer".into(),
2804                address: "peer:7433".into(),
2805                status: PeerStatus::Offline,
2806                node_info: None,
2807                session_count: None,
2808                source: PeerSource::Configured,
2809            }],
2810        };
2811        let output = format_nodes(&resp);
2812        assert!(output.contains("offline"));
2813        // No session count → shows "-"
2814        let lines: Vec<&str> = output.lines().collect();
2815        assert!(lines[2].contains('-'));
2816    }
2817
2818    #[tokio::test]
2819    async fn test_execute_resume_connection_refused() {
2820        let cli = Cli {
2821            node: "localhost:1".into(),
2822            token: None,
2823            command: Some(Commands::Resume {
2824                name: "test".into(),
2825            }),
2826            path: None,
2827        };
2828        let result = execute(&cli).await;
2829        let err = result.unwrap_err().to_string();
2830        assert!(err.contains("Could not connect to pulpod"));
2831    }
2832
2833    #[tokio::test]
2834    async fn test_execute_spawn_connection_refused() {
2835        let cli = Cli {
2836            node: "localhost:1".into(),
2837            token: None,
2838            command: Some(Commands::Spawn {
2839                name: Some("test".into()),
2840                workdir: Some("/tmp".into()),
2841                ink: None,
2842                description: None,
2843                detach: true,
2844                idle_threshold: None,
2845                auto: false,
2846                worktree: false,
2847                runtime: None,
2848                secret: vec![],
2849                command: vec!["test".into()],
2850            }),
2851            path: None,
2852        };
2853        let result = execute(&cli).await;
2854        let err = result.unwrap_err().to_string();
2855        assert!(err.contains("Could not connect to pulpod"));
2856    }
2857
2858    #[tokio::test]
2859    async fn test_execute_kill_connection_refused() {
2860        let cli = Cli {
2861            node: "localhost:1".into(),
2862            token: None,
2863            command: Some(Commands::Kill {
2864                names: vec!["test".into()],
2865            }),
2866            path: None,
2867        };
2868        let result = execute(&cli).await;
2869        let err = result.unwrap_err().to_string();
2870        assert!(err.contains("Could not connect to pulpod"));
2871    }
2872
2873    #[tokio::test]
2874    async fn test_execute_delete_connection_refused() {
2875        let cli = Cli {
2876            node: "localhost:1".into(),
2877            token: None,
2878            command: Some(Commands::Delete {
2879                names: vec!["test".into()],
2880            }),
2881            path: None,
2882        };
2883        let result = execute(&cli).await;
2884        let err = result.unwrap_err().to_string();
2885        assert!(err.contains("Could not connect to pulpod"));
2886    }
2887
2888    #[tokio::test]
2889    async fn test_execute_logs_connection_refused() {
2890        let cli = Cli {
2891            node: "localhost:1".into(),
2892            token: None,
2893            command: Some(Commands::Logs {
2894                name: "test".into(),
2895                lines: 50,
2896                follow: false,
2897            }),
2898            path: None,
2899        };
2900        let result = execute(&cli).await;
2901        let err = result.unwrap_err().to_string();
2902        assert!(err.contains("Could not connect to pulpod"));
2903    }
2904
2905    #[tokio::test]
2906    async fn test_friendly_error_connect() {
2907        // Make a request to a closed port to get a connect error
2908        let err = reqwest::Client::new()
2909            .get("http://127.0.0.1:1")
2910            .send()
2911            .await
2912            .unwrap_err();
2913        let friendly = friendly_error(&err, "test-node:1");
2914        let msg = friendly.to_string();
2915        assert!(
2916            msg.contains("Could not connect"),
2917            "Expected connect message, got: {msg}"
2918        );
2919    }
2920
2921    #[tokio::test]
2922    async fn test_friendly_error_other() {
2923        // A request to an invalid URL creates a builder error, not a connect error
2924        let err = reqwest::Client::new()
2925            .get("http://[::invalid::url")
2926            .send()
2927            .await
2928            .unwrap_err();
2929        let friendly = friendly_error(&err, "bad-host");
2930        let msg = friendly.to_string();
2931        assert!(
2932            msg.contains("Network error"),
2933            "Expected network error message, got: {msg}"
2934        );
2935        assert!(msg.contains("bad-host"));
2936    }
2937
2938    // -- Auth helper tests --
2939
2940    #[test]
2941    fn test_is_localhost_variants() {
2942        assert!(is_localhost("localhost:7433"));
2943        assert!(is_localhost("127.0.0.1:7433"));
2944        assert!(is_localhost("[::1]:7433"));
2945        assert!(is_localhost("::1"));
2946        assert!(is_localhost("localhost"));
2947        assert!(!is_localhost("mac-mini:7433"));
2948        assert!(!is_localhost("192.168.1.100:7433"));
2949    }
2950
2951    #[test]
2952    fn test_authed_get_with_token() {
2953        let client = reqwest::Client::new();
2954        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2955            .build()
2956            .unwrap();
2957        let auth = req
2958            .headers()
2959            .get("authorization")
2960            .unwrap()
2961            .to_str()
2962            .unwrap();
2963        assert_eq!(auth, "Bearer tok");
2964    }
2965
2966    #[test]
2967    fn test_authed_get_without_token() {
2968        let client = reqwest::Client::new();
2969        let req = authed_get(&client, "http://h:1/api".into(), None)
2970            .build()
2971            .unwrap();
2972        assert!(req.headers().get("authorization").is_none());
2973    }
2974
2975    #[test]
2976    fn test_authed_post_with_token() {
2977        let client = reqwest::Client::new();
2978        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2979            .build()
2980            .unwrap();
2981        let auth = req
2982            .headers()
2983            .get("authorization")
2984            .unwrap()
2985            .to_str()
2986            .unwrap();
2987        assert_eq!(auth, "Bearer secret");
2988    }
2989
2990    #[test]
2991    fn test_authed_post_without_token() {
2992        let client = reqwest::Client::new();
2993        let req = authed_post(&client, "http://h:1/api".into(), None)
2994            .build()
2995            .unwrap();
2996        assert!(req.headers().get("authorization").is_none());
2997    }
2998
2999    #[test]
3000    fn test_authed_delete_with_token() {
3001        let client = reqwest::Client::new();
3002        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
3003            .build()
3004            .unwrap();
3005        let auth = req
3006            .headers()
3007            .get("authorization")
3008            .unwrap()
3009            .to_str()
3010            .unwrap();
3011        assert_eq!(auth, "Bearer del-tok");
3012    }
3013
3014    #[test]
3015    fn test_authed_delete_without_token() {
3016        let client = reqwest::Client::new();
3017        let req = authed_delete(&client, "http://h:1/api".into(), None)
3018            .build()
3019            .unwrap();
3020        assert!(req.headers().get("authorization").is_none());
3021    }
3022
3023    #[tokio::test]
3024    async fn test_resolve_token_explicit() {
3025        let client = reqwest::Client::new();
3026        let token =
3027            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3028        assert_eq!(token, Some("my-tok".into()));
3029    }
3030
3031    #[tokio::test]
3032    async fn test_resolve_token_remote_no_explicit() {
3033        let client = reqwest::Client::new();
3034        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3035        assert_eq!(token, None);
3036    }
3037
3038    #[tokio::test]
3039    async fn test_resolve_token_localhost_auto_discover() {
3040        use axum::{Json, Router, routing::get};
3041
3042        let app = Router::new().route(
3043            "/api/v1/auth/token",
3044            get(|| async {
3045                Json(AuthTokenResponse {
3046                    token: "discovered".into(),
3047                })
3048            }),
3049        );
3050        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3051        let addr = listener.local_addr().unwrap();
3052        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3053
3054        let node = format!("localhost:{}", addr.port());
3055        let base = base_url(&node);
3056        let client = reqwest::Client::new();
3057        let token = resolve_token(&client, &base, &node, None).await;
3058        assert_eq!(token, Some("discovered".into()));
3059    }
3060
3061    #[tokio::test]
3062    async fn test_discover_token_empty_returns_none() {
3063        use axum::{Json, Router, routing::get};
3064
3065        let app = Router::new().route(
3066            "/api/v1/auth/token",
3067            get(|| async {
3068                Json(AuthTokenResponse {
3069                    token: String::new(),
3070                })
3071            }),
3072        );
3073        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3074        let addr = listener.local_addr().unwrap();
3075        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3076
3077        let base = format!("http://127.0.0.1:{}", addr.port());
3078        let client = reqwest::Client::new();
3079        assert_eq!(discover_token(&client, &base).await, None);
3080    }
3081
3082    #[tokio::test]
3083    async fn test_discover_token_unreachable_returns_none() {
3084        let client = reqwest::Client::new();
3085        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3086    }
3087
3088    #[test]
3089    fn test_cli_parse_with_token() {
3090        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3091        assert_eq!(cli.token, Some("my-secret".into()));
3092    }
3093
3094    #[test]
3095    fn test_cli_parse_without_token() {
3096        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3097        assert_eq!(cli.token, None);
3098    }
3099
3100    #[tokio::test]
3101    async fn test_execute_with_explicit_token_sends_header() {
3102        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3103
3104        let app = Router::new().route(
3105            "/api/v1/sessions",
3106            get(|req: Request| async move {
3107                let auth = req
3108                    .headers()
3109                    .get("authorization")
3110                    .and_then(|v| v.to_str().ok())
3111                    .unwrap_or("");
3112                assert_eq!(auth, "Bearer test-token");
3113                (StatusCode::OK, "[]".to_owned())
3114            }),
3115        );
3116        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3117        let addr = listener.local_addr().unwrap();
3118        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3119        let node = format!("127.0.0.1:{}", addr.port());
3120
3121        let cli = Cli {
3122            node,
3123            token: Some("test-token".into()),
3124            command: Some(Commands::List { all: false }),
3125            path: None,
3126        };
3127        let result = execute(&cli).await.unwrap();
3128        assert_eq!(result, "No sessions.");
3129    }
3130
3131    // -- Interventions tests --
3132
3133    #[test]
3134    fn test_cli_parse_interventions() {
3135        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3136        assert!(matches!(
3137            &cli.command,
3138            Some(Commands::Interventions { name }) if name == "my-session"
3139        ));
3140    }
3141
3142    #[test]
3143    fn test_format_interventions_empty() {
3144        assert_eq!(format_interventions(&[]), "No intervention events.");
3145    }
3146
3147    #[test]
3148    fn test_format_interventions_with_data() {
3149        let events = vec![
3150            InterventionEventResponse {
3151                id: 1,
3152                session_id: "sess-1".into(),
3153                code: None,
3154                reason: "Memory exceeded threshold".into(),
3155                created_at: "2026-01-01T00:00:00Z".into(),
3156            },
3157            InterventionEventResponse {
3158                id: 2,
3159                session_id: "sess-1".into(),
3160                code: None,
3161                reason: "Idle for 10 minutes".into(),
3162                created_at: "2026-01-02T00:00:00Z".into(),
3163            },
3164        ];
3165        let output = format_interventions(&events);
3166        assert!(output.contains("ID"));
3167        assert!(output.contains("TIMESTAMP"));
3168        assert!(output.contains("REASON"));
3169        assert!(output.contains("Memory exceeded threshold"));
3170        assert!(output.contains("Idle for 10 minutes"));
3171        assert!(output.contains("2026-01-01T00:00:00Z"));
3172    }
3173
3174    #[tokio::test]
3175    async fn test_execute_interventions_empty() {
3176        let node = start_test_server().await;
3177        let cli = Cli {
3178            node,
3179            token: None,
3180            command: Some(Commands::Interventions {
3181                name: "my-session".into(),
3182            }),
3183            path: None,
3184        };
3185        let result = execute(&cli).await.unwrap();
3186        assert_eq!(result, "No intervention events.");
3187    }
3188
3189    #[tokio::test]
3190    async fn test_execute_interventions_with_data() {
3191        use axum::{Router, routing::get};
3192
3193        let app = Router::new().route(
3194            "/api/v1/sessions/{id}/interventions",
3195            get(|| async {
3196                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3197                    .to_owned()
3198            }),
3199        );
3200        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3201        let addr = listener.local_addr().unwrap();
3202        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3203        let node = format!("127.0.0.1:{}", addr.port());
3204
3205        let cli = Cli {
3206            node,
3207            token: None,
3208            command: Some(Commands::Interventions {
3209                name: "test".into(),
3210            }),
3211            path: None,
3212        };
3213        let result = execute(&cli).await.unwrap();
3214        assert!(result.contains("OOM"));
3215        assert!(result.contains("2026-01-01T00:00:00Z"));
3216    }
3217
3218    #[tokio::test]
3219    async fn test_execute_interventions_connection_refused() {
3220        let cli = Cli {
3221            node: "localhost:1".into(),
3222            token: None,
3223            command: Some(Commands::Interventions {
3224                name: "test".into(),
3225            }),
3226            path: None,
3227        };
3228        let result = execute(&cli).await;
3229        let err = result.unwrap_err().to_string();
3230        assert!(err.contains("Could not connect to pulpod"));
3231    }
3232
3233    // -- Attach command tests --
3234
3235    #[test]
3236    fn test_build_attach_command_tmux() {
3237        let cmd = build_attach_command("my-session");
3238        assert_eq!(cmd.get_program(), "tmux");
3239        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3240        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3241    }
3242
3243    #[test]
3244    fn test_build_attach_command_docker() {
3245        let cmd = build_attach_command("docker:pulpo-my-task");
3246        assert_eq!(cmd.get_program(), "docker");
3247        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3248        assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3249    }
3250
3251    #[test]
3252    fn test_cli_parse_attach() {
3253        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3254        assert!(matches!(
3255            &cli.command,
3256            Some(Commands::Attach { name }) if name == "my-session"
3257        ));
3258    }
3259
3260    #[test]
3261    fn test_cli_parse_attach_alias() {
3262        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3263        assert!(matches!(
3264            &cli.command,
3265            Some(Commands::Attach { name }) if name == "my-session"
3266        ));
3267    }
3268
3269    #[tokio::test]
3270    async fn test_execute_attach_success() {
3271        let node = start_test_server().await;
3272        let cli = Cli {
3273            node,
3274            token: None,
3275            command: Some(Commands::Attach {
3276                name: "test-session".into(),
3277            }),
3278            path: None,
3279        };
3280        let result = execute(&cli).await.unwrap();
3281        assert!(result.contains("Detached from session test-session"));
3282    }
3283
3284    #[tokio::test]
3285    async fn test_execute_attach_with_backend_session_id() {
3286        use axum::{Router, routing::get};
3287        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3288        let app = Router::new().route(
3289            "/api/v1/sessions/{id}",
3290            get(move || async move { session_json.to_owned() }),
3291        );
3292        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3293        let addr = listener.local_addr().unwrap();
3294        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3295
3296        let cli = Cli {
3297            node: format!("127.0.0.1:{}", addr.port()),
3298            token: None,
3299            command: Some(Commands::Attach {
3300                name: "my-session".into(),
3301            }),
3302            path: None,
3303        };
3304        let result = execute(&cli).await.unwrap();
3305        assert!(result.contains("Detached from session my-session"));
3306    }
3307
3308    #[tokio::test]
3309    async fn test_execute_attach_connection_refused() {
3310        let cli = Cli {
3311            node: "localhost:1".into(),
3312            token: None,
3313            command: Some(Commands::Attach {
3314                name: "test-session".into(),
3315            }),
3316            path: None,
3317        };
3318        let result = execute(&cli).await;
3319        let err = result.unwrap_err().to_string();
3320        assert!(err.contains("Could not connect to pulpod"));
3321    }
3322
3323    #[tokio::test]
3324    async fn test_execute_attach_error_response() {
3325        use axum::{Router, http::StatusCode, routing::get};
3326        let app = Router::new().route(
3327            "/api/v1/sessions/{id}",
3328            get(|| async {
3329                (
3330                    StatusCode::NOT_FOUND,
3331                    r#"{"error":"session not found"}"#.to_owned(),
3332                )
3333            }),
3334        );
3335        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3336        let addr = listener.local_addr().unwrap();
3337        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3338
3339        let cli = Cli {
3340            node: format!("127.0.0.1:{}", addr.port()),
3341            token: None,
3342            command: Some(Commands::Attach {
3343                name: "nonexistent".into(),
3344            }),
3345            path: None,
3346        };
3347        let result = execute(&cli).await;
3348        let err = result.unwrap_err().to_string();
3349        assert!(err.contains("session not found"));
3350    }
3351
3352    #[tokio::test]
3353    async fn test_execute_attach_stale_session() {
3354        use axum::{Router, routing::get};
3355        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3356        let app = Router::new().route(
3357            "/api/v1/sessions/{id}",
3358            get(move || async move { session_json.to_owned() }),
3359        );
3360        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3361        let addr = listener.local_addr().unwrap();
3362        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3363
3364        let cli = Cli {
3365            node: format!("127.0.0.1:{}", addr.port()),
3366            token: None,
3367            command: Some(Commands::Attach {
3368                name: "stale-sess".into(),
3369            }),
3370            path: None,
3371        };
3372        let result = execute(&cli).await;
3373        let err = result.unwrap_err().to_string();
3374        assert!(err.contains("lost"));
3375        assert!(err.contains("pulpo resume"));
3376    }
3377
3378    #[tokio::test]
3379    async fn test_execute_attach_dead_session() {
3380        use axum::{Router, routing::get};
3381        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3382        let app = Router::new().route(
3383            "/api/v1/sessions/{id}",
3384            get(move || async move { session_json.to_owned() }),
3385        );
3386        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3387        let addr = listener.local_addr().unwrap();
3388        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3389
3390        let cli = Cli {
3391            node: format!("127.0.0.1:{}", addr.port()),
3392            token: None,
3393            command: Some(Commands::Attach {
3394                name: "dead-sess".into(),
3395            }),
3396            path: None,
3397        };
3398        let result = execute(&cli).await;
3399        let err = result.unwrap_err().to_string();
3400        assert!(err.contains("killed"));
3401        assert!(err.contains("cannot attach"));
3402    }
3403
3404    // -- Alias parse tests --
3405
3406    #[test]
3407    fn test_cli_parse_alias_spawn() {
3408        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3409        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3410    }
3411
3412    #[test]
3413    fn test_cli_parse_alias_list() {
3414        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3415        assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3416    }
3417
3418    #[test]
3419    fn test_cli_parse_list_all() {
3420        let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3421        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3422
3423        let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3424        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3425    }
3426
3427    #[test]
3428    fn test_cli_parse_alias_logs() {
3429        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3430        assert!(matches!(
3431            &cli.command,
3432            Some(Commands::Logs { name, .. }) if name == "my-session"
3433        ));
3434    }
3435
3436    #[test]
3437    fn test_cli_parse_alias_kill() {
3438        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3439        assert!(matches!(
3440            &cli.command,
3441            Some(Commands::Kill { names }) if names == &["my-session"]
3442        ));
3443    }
3444
3445    #[test]
3446    fn test_cli_parse_alias_delete() {
3447        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
3448        assert!(matches!(
3449            &cli.command,
3450            Some(Commands::Delete { names }) if names == &["my-session"]
3451        ));
3452    }
3453
3454    #[test]
3455    fn test_cli_parse_alias_resume() {
3456        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3457        assert!(matches!(
3458            &cli.command,
3459            Some(Commands::Resume { name }) if name == "my-session"
3460        ));
3461    }
3462
3463    #[test]
3464    fn test_cli_parse_alias_nodes() {
3465        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3466        assert!(matches!(&cli.command, Some(Commands::Nodes)));
3467    }
3468
3469    #[test]
3470    fn test_cli_parse_alias_interventions() {
3471        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3472        assert!(matches!(
3473            &cli.command,
3474            Some(Commands::Interventions { name }) if name == "my-session"
3475        ));
3476    }
3477
3478    #[test]
3479    fn test_api_error_json() {
3480        let err = api_error("{\"error\":\"session not found: foo\"}");
3481        assert_eq!(err.to_string(), "session not found: foo");
3482    }
3483
3484    #[test]
3485    fn test_api_error_plain_text() {
3486        let err = api_error("plain text error");
3487        assert_eq!(err.to_string(), "plain text error");
3488    }
3489
3490    // -- diff_output tests --
3491
3492    #[test]
3493    fn test_diff_output_empty_prev() {
3494        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3495    }
3496
3497    #[test]
3498    fn test_diff_output_identical() {
3499        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3500    }
3501
3502    #[test]
3503    fn test_diff_output_new_lines_appended() {
3504        let prev = "line1\nline2";
3505        let new = "line1\nline2\nline3\nline4";
3506        assert_eq!(diff_output(prev, new), "line3\nline4");
3507    }
3508
3509    #[test]
3510    fn test_diff_output_scrolled_window() {
3511        // Window of 3 lines: old lines scroll off top, new appear at bottom
3512        let prev = "line1\nline2\nline3";
3513        let new = "line2\nline3\nline4";
3514        assert_eq!(diff_output(prev, new), "line4");
3515    }
3516
3517    #[test]
3518    fn test_diff_output_completely_different() {
3519        let prev = "aaa\nbbb";
3520        let new = "xxx\nyyy";
3521        assert_eq!(diff_output(prev, new), "xxx\nyyy");
3522    }
3523
3524    #[test]
3525    fn test_diff_output_last_line_matches_but_overlap_fails() {
3526        // Last line of prev appears in new but preceding lines don't match
3527        let prev = "aaa\ncommon";
3528        let new = "zzz\ncommon\nnew_line";
3529        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
3530        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
3531        // Falls through, no verified overlap, so returns everything
3532        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3533    }
3534
3535    #[test]
3536    fn test_diff_output_new_empty() {
3537        assert_eq!(diff_output("line1", ""), "");
3538    }
3539
3540    // -- follow_logs tests --
3541
3542    /// Start a test server that simulates evolving output and session status transitions.
3543    /// Start a test server that simulates evolving output with agent exit marker.
3544    async fn start_follow_test_server() -> String {
3545        use axum::{Router, extract::Path, extract::Query, routing::get};
3546        use std::sync::Arc;
3547        use std::sync::atomic::{AtomicUsize, Ordering};
3548
3549        let call_count = Arc::new(AtomicUsize::new(0));
3550        let output_count = call_count.clone();
3551
3552        let app = Router::new()
3553            .route(
3554                "/api/v1/sessions/{id}/output",
3555                get(
3556                    move |_path: Path<String>,
3557                          _query: Query<std::collections::HashMap<String, String>>| {
3558                        let count = output_count.clone();
3559                        async move {
3560                            let n = count.fetch_add(1, Ordering::SeqCst);
3561                            let output = match n {
3562                                0 => "line1\nline2".to_owned(),
3563                                1 => "line1\nline2\nline3".to_owned(),
3564                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3565                            };
3566                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3567                        }
3568                    },
3569                ),
3570            )
3571            .route(
3572                "/api/v1/sessions/{id}",
3573                get(|_path: Path<String>| async {
3574                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3575                }),
3576            );
3577
3578        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3579        let addr = listener.local_addr().unwrap();
3580        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3581        format!("http://127.0.0.1:{}", addr.port())
3582    }
3583
3584    #[tokio::test]
3585    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3586        let base = start_follow_test_server().await;
3587        let client = reqwest::Client::new();
3588        let mut buf = Vec::new();
3589
3590        follow_logs(&client, &base, "test", 100, None, &mut buf)
3591            .await
3592            .unwrap();
3593
3594        let output = String::from_utf8(buf).unwrap();
3595        // Should contain initial output + new lines + agent exit marker
3596        assert!(output.contains("line1"));
3597        assert!(output.contains("line2"));
3598        assert!(output.contains("line3"));
3599        assert!(output.contains("line4"));
3600        assert!(output.contains("[pulpo] Agent exited"));
3601    }
3602
3603    #[tokio::test]
3604    async fn test_execute_logs_follow_success() {
3605        let base = start_follow_test_server().await;
3606        // Extract host:port from http://127.0.0.1:PORT
3607        let node = base.strip_prefix("http://").unwrap().to_owned();
3608
3609        let cli = Cli {
3610            node,
3611            token: None,
3612            command: Some(Commands::Logs {
3613                name: "test".into(),
3614                lines: 100,
3615                follow: true,
3616            }),
3617            path: None,
3618        };
3619        // execute() with follow writes to stdout and returns empty string
3620        let result = execute(&cli).await.unwrap();
3621        assert_eq!(result, "");
3622    }
3623
3624    #[tokio::test]
3625    async fn test_execute_logs_follow_connection_refused() {
3626        let cli = Cli {
3627            node: "localhost:1".into(),
3628            token: None,
3629            command: Some(Commands::Logs {
3630                name: "test".into(),
3631                lines: 50,
3632                follow: true,
3633            }),
3634            path: None,
3635        };
3636        let result = execute(&cli).await;
3637        let err = result.unwrap_err().to_string();
3638        assert!(
3639            err.contains("Could not connect to pulpod"),
3640            "Expected friendly error, got: {err}"
3641        );
3642    }
3643
3644    #[tokio::test]
3645    async fn test_follow_logs_exits_on_dead() {
3646        use axum::{Router, extract::Path, extract::Query, routing::get};
3647
3648        let app = Router::new()
3649            .route(
3650                "/api/v1/sessions/{id}/output",
3651                get(
3652                    |_path: Path<String>,
3653                     _query: Query<std::collections::HashMap<String, String>>| async {
3654                        r#"{"output":"some output"}"#.to_owned()
3655                    },
3656                ),
3657            )
3658            .route(
3659                "/api/v1/sessions/{id}",
3660                get(|_path: Path<String>| async {
3661                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3662                }),
3663            );
3664
3665        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3666        let addr = listener.local_addr().unwrap();
3667        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3668        let base = format!("http://127.0.0.1:{}", addr.port());
3669
3670        let client = reqwest::Client::new();
3671        let mut buf = Vec::new();
3672        follow_logs(&client, &base, "test", 100, None, &mut buf)
3673            .await
3674            .unwrap();
3675
3676        let output = String::from_utf8(buf).unwrap();
3677        assert!(output.contains("some output"));
3678    }
3679
3680    #[tokio::test]
3681    async fn test_follow_logs_exits_on_stale() {
3682        use axum::{Router, extract::Path, extract::Query, routing::get};
3683
3684        let app = Router::new()
3685            .route(
3686                "/api/v1/sessions/{id}/output",
3687                get(
3688                    |_path: Path<String>,
3689                     _query: Query<std::collections::HashMap<String, String>>| async {
3690                        r#"{"output":"stale output"}"#.to_owned()
3691                    },
3692                ),
3693            )
3694            .route(
3695                "/api/v1/sessions/{id}",
3696                get(|_path: Path<String>| async {
3697                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3698                }),
3699            );
3700
3701        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3702        let addr = listener.local_addr().unwrap();
3703        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3704        let base = format!("http://127.0.0.1:{}", addr.port());
3705
3706        let client = reqwest::Client::new();
3707        let mut buf = Vec::new();
3708        follow_logs(&client, &base, "test", 100, None, &mut buf)
3709            .await
3710            .unwrap();
3711
3712        let output = String::from_utf8(buf).unwrap();
3713        assert!(output.contains("stale output"));
3714    }
3715
3716    #[tokio::test]
3717    async fn test_execute_logs_follow_non_reqwest_error() {
3718        use axum::{Router, extract::Path, extract::Query, routing::get};
3719
3720        // Session status endpoint returns invalid JSON to trigger a serde error
3721        let app = Router::new()
3722            .route(
3723                "/api/v1/sessions/{id}/output",
3724                get(
3725                    |_path: Path<String>,
3726                     _query: Query<std::collections::HashMap<String, String>>| async {
3727                        r#"{"output":"initial"}"#.to_owned()
3728                    },
3729                ),
3730            )
3731            .route(
3732                "/api/v1/sessions/{id}",
3733                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3734            );
3735
3736        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3737        let addr = listener.local_addr().unwrap();
3738        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3739        let node = format!("127.0.0.1:{}", addr.port());
3740
3741        let cli = Cli {
3742            node,
3743            token: None,
3744            command: Some(Commands::Logs {
3745                name: "test".into(),
3746                lines: 100,
3747                follow: true,
3748            }),
3749            path: None,
3750        };
3751        let err = execute(&cli).await.unwrap_err();
3752        // serde_json error, not a reqwest error — hits the Err(other) branch
3753        let msg = err.to_string();
3754        assert!(
3755            msg.contains("expected ident"),
3756            "Expected serde parse error, got: {msg}"
3757        );
3758    }
3759
3760    #[tokio::test]
3761    async fn test_fetch_session_status_connection_error() {
3762        let client = reqwest::Client::new();
3763        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3764        assert!(result.is_err());
3765    }
3766
3767    // -- Schedule tests --
3768
3769    #[test]
3770    fn test_format_schedules_empty() {
3771        assert_eq!(format_schedules(&[]), "No schedules.");
3772    }
3773
3774    #[test]
3775    fn test_format_schedules_with_entries() {
3776        let schedules = vec![serde_json::json!({
3777            "name": "nightly",
3778            "cron": "0 3 * * *",
3779            "enabled": true,
3780            "last_run_at": null,
3781            "target_node": null
3782        })];
3783        let output = format_schedules(&schedules);
3784        assert!(output.contains("nightly"));
3785        assert!(output.contains("0 3 * * *"));
3786        assert!(output.contains("local"));
3787        assert!(output.contains("yes"));
3788        assert!(output.contains('-'));
3789    }
3790
3791    #[test]
3792    fn test_format_schedules_disabled_entry() {
3793        let schedules = vec![serde_json::json!({
3794            "name": "weekly",
3795            "cron": "0 0 * * 0",
3796            "enabled": false,
3797            "last_run_at": "2026-03-18T03:00:00Z",
3798            "target_node": "gpu-box"
3799        })];
3800        let output = format_schedules(&schedules);
3801        assert!(output.contains("weekly"));
3802        assert!(output.contains("no"));
3803        assert!(output.contains("gpu-box"));
3804        assert!(output.contains("2026-03-18T03:00"));
3805    }
3806
3807    #[test]
3808    fn test_format_schedules_header() {
3809        let schedules = vec![serde_json::json!({
3810            "name": "test",
3811            "cron": "* * * * *",
3812            "enabled": true,
3813            "last_run_at": null,
3814            "target_node": null
3815        })];
3816        let output = format_schedules(&schedules);
3817        assert!(output.contains("NAME"));
3818        assert!(output.contains("CRON"));
3819        assert!(output.contains("ENABLED"));
3820        assert!(output.contains("LAST RUN"));
3821        assert!(output.contains("NODE"));
3822    }
3823
3824    // -- Schedule CLI parse tests --
3825
3826    #[test]
3827    fn test_cli_parse_schedule_add() {
3828        let cli = Cli::try_parse_from([
3829            "pulpo",
3830            "schedule",
3831            "add",
3832            "nightly",
3833            "0 3 * * *",
3834            "--workdir",
3835            "/repo",
3836            "--",
3837            "claude",
3838            "-p",
3839            "review",
3840        ])
3841        .unwrap();
3842        assert!(matches!(
3843            &cli.command,
3844            Some(Commands::Schedule {
3845                action: ScheduleAction::Add { name, cron, .. }
3846            }) if name == "nightly" && cron == "0 3 * * *"
3847        ));
3848    }
3849
3850    #[test]
3851    fn test_cli_parse_schedule_add_with_node() {
3852        let cli = Cli::try_parse_from([
3853            "pulpo",
3854            "schedule",
3855            "add",
3856            "nightly",
3857            "0 3 * * *",
3858            "--workdir",
3859            "/repo",
3860            "--node",
3861            "gpu-box",
3862            "--",
3863            "claude",
3864        ])
3865        .unwrap();
3866        assert!(matches!(
3867            &cli.command,
3868            Some(Commands::Schedule {
3869                action: ScheduleAction::Add { node, .. }
3870            }) if node.as_deref() == Some("gpu-box")
3871        ));
3872    }
3873
3874    #[test]
3875    fn test_cli_parse_schedule_add_install_alias() {
3876        let cli =
3877            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3878        assert!(matches!(
3879            &cli.command,
3880            Some(Commands::Schedule {
3881                action: ScheduleAction::Add { name, .. }
3882            }) if name == "nightly"
3883        ));
3884    }
3885
3886    #[test]
3887    fn test_cli_parse_schedule_list() {
3888        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3889        assert!(matches!(
3890            &cli.command,
3891            Some(Commands::Schedule {
3892                action: ScheduleAction::List
3893            })
3894        ));
3895    }
3896
3897    #[test]
3898    fn test_cli_parse_schedule_remove() {
3899        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3900        assert!(matches!(
3901            &cli.command,
3902            Some(Commands::Schedule {
3903                action: ScheduleAction::Remove { name }
3904            }) if name == "nightly"
3905        ));
3906    }
3907
3908    #[test]
3909    fn test_cli_parse_schedule_pause() {
3910        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3911        assert!(matches!(
3912            &cli.command,
3913            Some(Commands::Schedule {
3914                action: ScheduleAction::Pause { name }
3915            }) if name == "nightly"
3916        ));
3917    }
3918
3919    #[test]
3920    fn test_cli_parse_schedule_resume() {
3921        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3922        assert!(matches!(
3923            &cli.command,
3924            Some(Commands::Schedule {
3925                action: ScheduleAction::Resume { name }
3926            }) if name == "nightly"
3927        ));
3928    }
3929
3930    #[test]
3931    fn test_cli_parse_schedule_alias() {
3932        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3933        assert!(matches!(
3934            &cli.command,
3935            Some(Commands::Schedule {
3936                action: ScheduleAction::List
3937            })
3938        ));
3939    }
3940
3941    #[test]
3942    fn test_cli_parse_schedule_list_alias() {
3943        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3944        assert!(matches!(
3945            &cli.command,
3946            Some(Commands::Schedule {
3947                action: ScheduleAction::List
3948            })
3949        ));
3950    }
3951
3952    #[test]
3953    fn test_cli_parse_schedule_remove_alias() {
3954        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3955        assert!(matches!(
3956            &cli.command,
3957            Some(Commands::Schedule {
3958                action: ScheduleAction::Remove { name }
3959            }) if name == "nightly"
3960        ));
3961    }
3962
3963    #[tokio::test]
3964    async fn test_execute_schedule_list_via_execute() {
3965        let node = start_test_server().await;
3966        let cli = Cli {
3967            node,
3968            token: None,
3969            command: Some(Commands::Schedule {
3970                action: ScheduleAction::List,
3971            }),
3972            path: None,
3973        };
3974        let result = execute(&cli).await.unwrap();
3975        // Under coverage, execute_schedule is a stub that returns empty string
3976        #[cfg(coverage)]
3977        assert!(result.is_empty());
3978        #[cfg(not(coverage))]
3979        assert_eq!(result, "No schedules.");
3980    }
3981
3982    #[test]
3983    fn test_schedule_action_debug() {
3984        let action = ScheduleAction::List;
3985        assert_eq!(format!("{action:?}"), "List");
3986    }
3987
3988    #[test]
3989    fn test_cli_parse_send_alias() {
3990        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3991        assert!(matches!(
3992            &cli.command,
3993            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3994        ));
3995    }
3996
3997    #[test]
3998    fn test_cli_parse_spawn_no_name() {
3999        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4000        assert!(matches!(
4001            &cli.command,
4002            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4003        ));
4004    }
4005
4006    #[test]
4007    fn test_cli_parse_spawn_optional_name_with_command() {
4008        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4009        assert!(matches!(
4010            &cli.command,
4011            Some(Commands::Spawn { name, command, .. })
4012                if name.is_none() && command == &["echo", "hello"]
4013        ));
4014    }
4015
4016    #[test]
4017    fn test_cli_parse_path_shortcut() {
4018        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4019        assert!(cli.command.is_none());
4020        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4021    }
4022
4023    #[test]
4024    fn test_cli_parse_no_args() {
4025        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4026        assert!(cli.command.is_none());
4027        assert!(cli.path.is_none());
4028    }
4029
4030    #[test]
4031    fn test_derive_session_name_simple() {
4032        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4033    }
4034
4035    #[test]
4036    fn test_derive_session_name_with_special_chars() {
4037        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4038    }
4039
4040    #[test]
4041    fn test_derive_session_name_root() {
4042        assert_eq!(derive_session_name("/"), "session");
4043    }
4044
4045    #[test]
4046    fn test_derive_session_name_dots() {
4047        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4048    }
4049
4050    #[test]
4051    fn test_resolve_path_absolute() {
4052        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4053    }
4054
4055    #[test]
4056    fn test_resolve_path_relative() {
4057        let resolved = resolve_path("my-repo");
4058        assert!(resolved.ends_with("my-repo"));
4059        assert!(resolved.starts_with('/'));
4060    }
4061
4062    #[tokio::test]
4063    async fn test_execute_no_args_shows_help() {
4064        let node = start_test_server().await;
4065        let cli = Cli {
4066            node,
4067            token: None,
4068            path: None,
4069            command: None,
4070        };
4071        let result = execute(&cli).await.unwrap();
4072        assert!(
4073            result.is_empty(),
4074            "no-args should return empty string after printing help"
4075        );
4076    }
4077
4078    #[tokio::test]
4079    async fn test_execute_path_shortcut() {
4080        let node = start_test_server().await;
4081        let cli = Cli {
4082            node,
4083            token: None,
4084            path: Some("/tmp".into()),
4085            command: None,
4086        };
4087        let result = execute(&cli).await.unwrap();
4088        assert!(result.contains("Detached from session"));
4089    }
4090
4091    #[tokio::test]
4092    async fn test_deduplicate_session_name_no_conflict() {
4093        // Connection refused → falls through to "name not taken" path
4094        let base = "http://127.0.0.1:1";
4095        let client = reqwest::Client::new();
4096        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4097        assert_eq!(name, "fresh");
4098    }
4099
4100    #[tokio::test]
4101    async fn test_deduplicate_session_name_with_conflict() {
4102        use axum::{Router, routing::get};
4103        use std::sync::atomic::{AtomicU32, Ordering};
4104
4105        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4106        let counter = call_count.clone();
4107        let app = Router::new()
4108            .route(
4109                "/api/v1/sessions/{id}",
4110                get(move || {
4111                    let c = counter.clone();
4112                    async move {
4113                        let n = c.fetch_add(1, Ordering::SeqCst);
4114                        if n == 0 {
4115                            // First call (base name) → exists
4116                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4117                        } else {
4118                            // Suffixed name → not found
4119                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4120                        }
4121                    }
4122                }),
4123            )
4124            .route(
4125                "/api/v1/peers",
4126                get(|| async {
4127                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4128                }),
4129            );
4130        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4131        let addr = listener.local_addr().unwrap();
4132        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4133        let base = format!("http://127.0.0.1:{}", addr.port());
4134        let client = reqwest::Client::new();
4135        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4136        assert_eq!(name, "repo-2");
4137    }
4138
4139    // -- Node resolution tests --
4140
4141    #[test]
4142    fn test_node_needs_resolution() {
4143        assert!(!node_needs_resolution("localhost:7433"));
4144        assert!(!node_needs_resolution("mac-mini:7433"));
4145        assert!(!node_needs_resolution("10.0.0.1:7433"));
4146        assert!(!node_needs_resolution("[::1]:7433"));
4147        assert!(node_needs_resolution("mac-mini"));
4148        assert!(node_needs_resolution("linux-server"));
4149        assert!(node_needs_resolution("localhost"));
4150    }
4151
4152    #[tokio::test]
4153    async fn test_resolve_node_with_port() {
4154        let client = reqwest::Client::new();
4155        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4156        assert_eq!(addr, "mac-mini:7433");
4157        assert!(token.is_none());
4158    }
4159
4160    #[tokio::test]
4161    async fn test_resolve_node_fallback_appends_port() {
4162        // No local daemon running on localhost:7433, so peer lookup fails
4163        // and it falls back to appending :7433
4164        let client = reqwest::Client::new();
4165        let (addr, token) = resolve_node(&client, "unknown-host").await;
4166        assert_eq!(addr, "unknown-host:7433");
4167        assert!(token.is_none());
4168    }
4169
4170    #[cfg(not(coverage))]
4171    #[tokio::test]
4172    async fn test_resolve_node_finds_peer() {
4173        use axum::{Router, routing::get};
4174
4175        let app = Router::new()
4176            .route(
4177                "/api/v1/peers",
4178                get(|| async {
4179                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4180                }),
4181            )
4182            .route(
4183                "/api/v1/config",
4184                get(|| async {
4185                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4186                }),
4187            );
4188
4189        // Port 7433 may be in use; skip test if so
4190        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4191            return;
4192        };
4193        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4194
4195        let client = reqwest::Client::new();
4196        let (addr, token) = resolve_node(&client, "mac-mini").await;
4197        assert_eq!(addr, "10.0.0.5:7433");
4198        assert_eq!(token, Some("peer-secret".into()));
4199    }
4200
4201    #[cfg(not(coverage))]
4202    #[tokio::test]
4203    async fn test_resolve_node_peer_no_token() {
4204        use axum::{Router, routing::get};
4205
4206        let app = Router::new()
4207            .route(
4208                "/api/v1/peers",
4209                get(|| async {
4210                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4211                }),
4212            )
4213            .route(
4214                "/api/v1/config",
4215                get(|| async {
4216                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4217                }),
4218            );
4219
4220        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4221            return; // Port in use, skip
4222        };
4223        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4224
4225        let client = reqwest::Client::new();
4226        let (addr, token) = resolve_node(&client, "test-peer").await;
4227        assert_eq!(addr, "10.0.0.9:7433");
4228        assert!(token.is_none()); // Simple peer entry has no token
4229    }
4230
4231    #[tokio::test]
4232    async fn test_execute_with_peer_name_resolution() {
4233        // When node doesn't contain ':', resolve_node is called.
4234        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4235        // The connection to the fallback address will fail, giving us a connection error.
4236        let cli = Cli {
4237            node: "nonexistent-peer".into(),
4238            token: None,
4239            command: Some(Commands::List { all: false }),
4240            path: None,
4241        };
4242        let result = execute(&cli).await;
4243        // Should try to connect to nonexistent-peer:7433 and fail
4244        assert!(result.is_err());
4245    }
4246
4247    // -- Auto node selection tests --
4248
4249    #[test]
4250    fn test_cli_parse_spawn_auto() {
4251        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4252        assert!(matches!(
4253            &cli.command,
4254            Some(Commands::Spawn { auto, .. }) if *auto
4255        ));
4256    }
4257
4258    #[test]
4259    fn test_cli_parse_spawn_auto_default() {
4260        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4261        assert!(matches!(
4262            &cli.command,
4263            Some(Commands::Spawn { auto, .. }) if !auto
4264        ));
4265    }
4266
4267    #[tokio::test]
4268    async fn test_select_best_node_coverage_stub() {
4269        // Exercise the coverage stub (or real function in non-coverage builds)
4270        let client = reqwest::Client::new();
4271        // In coverage builds, the stub returns ("localhost:7433", "local")
4272        // In non-coverage builds, this fails because no server is running — that's OK
4273        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4274    }
4275
4276    #[cfg(not(coverage))]
4277    #[tokio::test]
4278    async fn test_select_best_node_picks_least_loaded() {
4279        use axum::{Router, routing::get};
4280
4281        let app = Router::new().route(
4282            "/api/v1/peers",
4283            get(|| async {
4284                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4285            }),
4286        );
4287        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4288        let addr = listener.local_addr().unwrap();
4289        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4290        let base = format!("http://127.0.0.1:{}", addr.port());
4291
4292        let client = reqwest::Client::new();
4293        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4294        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4295        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4296        // idle wins (lower score)
4297        assert_eq!(name, "idle");
4298        assert_eq!(addr, "idle:7433");
4299    }
4300
4301    #[cfg(not(coverage))]
4302    #[tokio::test]
4303    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4304        use axum::{Router, routing::get};
4305
4306        let app = Router::new().route(
4307            "/api/v1/peers",
4308            get(|| async {
4309                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4310            }),
4311        );
4312        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4313        let addr = listener.local_addr().unwrap();
4314        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4315        let base = format!("http://127.0.0.1:{}", addr.port());
4316
4317        let client = reqwest::Client::new();
4318        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4319        assert_eq!(name, "my-mac");
4320        assert_eq!(addr, "localhost:7433");
4321    }
4322
4323    #[cfg(not(coverage))]
4324    #[tokio::test]
4325    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4326        use axum::{Router, routing::get};
4327
4328        let app = Router::new().route(
4329            "/api/v1/peers",
4330            get(|| async {
4331                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4332            }),
4333        );
4334        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4335        let addr = listener.local_addr().unwrap();
4336        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4337        let base = format!("http://127.0.0.1:{}", addr.port());
4338
4339        let client = reqwest::Client::new();
4340        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4341        assert_eq!(name, "solo");
4342        assert_eq!(addr, "localhost:7433");
4343    }
4344
4345    #[cfg(not(coverage))]
4346    #[tokio::test]
4347    async fn test_execute_spawn_auto_selects_node() {
4348        use axum::{
4349            Router,
4350            http::StatusCode,
4351            routing::{get, post},
4352        };
4353
4354        let create_json = test_create_response_json();
4355
4356        // Bind early so we know the address to embed in the peers response
4357        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4358        let addr = listener.local_addr().unwrap();
4359        let node = format!("127.0.0.1:{}", addr.port());
4360        let peer_addr = node.clone();
4361
4362        let app = Router::new()
4363            .route(
4364                "/api/v1/peers",
4365                get(move || {
4366                    let peer_addr = peer_addr.clone();
4367                    async move {
4368                        format!(
4369                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4370                        )
4371                    }
4372                }),
4373            )
4374            .route(
4375                "/api/v1/sessions",
4376                post(move || async move {
4377                    (StatusCode::CREATED, create_json.clone())
4378                }),
4379            );
4380
4381        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4382
4383        let cli = Cli {
4384            node,
4385            token: None,
4386            command: Some(Commands::Spawn {
4387                name: Some("test".into()),
4388                workdir: Some("/tmp/repo".into()),
4389                ink: None,
4390                description: None,
4391                detach: true,
4392                idle_threshold: None,
4393                auto: true,
4394                worktree: false,
4395                runtime: None,
4396                secret: vec![],
4397                command: vec!["echo".into(), "hello".into()],
4398            }),
4399            path: None,
4400        };
4401        let result = execute(&cli).await.unwrap();
4402        assert!(result.contains("Created session"));
4403    }
4404
4405    #[cfg(not(coverage))]
4406    #[tokio::test]
4407    async fn test_select_best_node_peer_no_session_count() {
4408        use axum::{Router, routing::get};
4409
4410        let app = Router::new().route(
4411            "/api/v1/peers",
4412            get(|| async {
4413                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4414            }),
4415        );
4416        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4417        let addr = listener.local_addr().unwrap();
4418        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4419        let base = format!("http://127.0.0.1:{}", addr.port());
4420
4421        let client = reqwest::Client::new();
4422        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4423        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4424        assert_eq!(name, "fresh");
4425        assert_eq!(addr, "fresh:7433");
4426    }
4427
4428    // -- Secret CLI parse tests --
4429
4430    #[test]
4431    fn test_cli_parse_secret_set() {
4432        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4433        assert!(matches!(
4434            &cli.command,
4435            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4436                if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4437        ));
4438    }
4439
4440    #[test]
4441    fn test_cli_parse_secret_list() {
4442        let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4443        assert!(matches!(
4444            &cli.command,
4445            Some(Commands::Secret {
4446                action: SecretAction::List
4447            })
4448        ));
4449    }
4450
4451    #[test]
4452    fn test_cli_parse_secret_list_alias() {
4453        let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4454        assert!(matches!(
4455            &cli.command,
4456            Some(Commands::Secret {
4457                action: SecretAction::List
4458            })
4459        ));
4460    }
4461
4462    #[test]
4463    fn test_cli_parse_secret_delete() {
4464        let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4465        assert!(matches!(
4466            &cli.command,
4467            Some(Commands::Secret { action: SecretAction::Delete { name } })
4468                if name == "MY_TOKEN"
4469        ));
4470    }
4471
4472    #[test]
4473    fn test_cli_parse_secret_delete_alias() {
4474        let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4475        assert!(matches!(
4476            &cli.command,
4477            Some(Commands::Secret { action: SecretAction::Delete { name } })
4478                if name == "MY_TOKEN"
4479        ));
4480    }
4481
4482    #[test]
4483    fn test_cli_parse_secret_alias() {
4484        let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4485        assert!(matches!(
4486            &cli.command,
4487            Some(Commands::Secret {
4488                action: SecretAction::List
4489            })
4490        ));
4491    }
4492
4493    #[test]
4494    fn test_format_secrets_empty() {
4495        let secrets: Vec<serde_json::Value> = vec![];
4496        assert_eq!(format_secrets(&secrets), "No secrets configured.");
4497    }
4498
4499    #[test]
4500    fn test_format_secrets_with_entries() {
4501        let secrets = vec![
4502            serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4503            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4504        ];
4505        let output = format_secrets(&secrets);
4506        assert!(output.contains("GITHUB_TOKEN"));
4507        assert!(output.contains("NPM_TOKEN"));
4508        assert!(output.contains("NAME"));
4509        assert!(output.contains("ENV"));
4510        assert!(output.contains("CREATED"));
4511    }
4512
4513    #[test]
4514    fn test_format_secrets_with_env() {
4515        let secrets = vec![
4516            serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4517            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4518        ];
4519        let output = format_secrets(&secrets);
4520        assert!(output.contains("GH_WORK"));
4521        assert!(output.contains("GITHUB_TOKEN"));
4522        assert!(output.contains("NPM_TOKEN"));
4523    }
4524
4525    #[test]
4526    fn test_format_secrets_short_timestamp() {
4527        let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4528        let output = format_secrets(&secrets);
4529        assert!(output.contains("now"));
4530    }
4531
4532    #[test]
4533    fn test_format_schedules_short_last_run_at() {
4534        // Regression: last_run_at shorter than 16 chars must not panic
4535        let schedules = vec![serde_json::json!({
4536            "name": "test",
4537            "cron": "* * * * *",
4538            "enabled": true,
4539            "last_run_at": "short",
4540            "target_node": null
4541        })];
4542        let output = format_schedules(&schedules);
4543        assert!(output.contains("short"));
4544    }
4545
4546    #[test]
4547    fn test_format_sessions_multibyte_command_truncation() {
4548        use chrono::Utc;
4549        use pulpo_common::session::SessionStatus;
4550        use uuid::Uuid;
4551
4552        // Command with multi-byte chars exceeding 50 bytes; must not panic
4553        let sessions = vec![Session {
4554            id: Uuid::nil(),
4555            name: "test".into(),
4556            workdir: "/tmp".into(),
4557            command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4558            description: None,
4559            status: SessionStatus::Active,
4560            exit_code: None,
4561            backend_session_id: None,
4562            output_snapshot: None,
4563            metadata: None,
4564            ink: None,
4565            intervention_code: None,
4566            intervention_reason: None,
4567            intervention_at: None,
4568            last_output_at: None,
4569            idle_since: None,
4570            idle_threshold_secs: None,
4571            worktree_path: None,
4572            runtime: Runtime::Tmux,
4573            created_at: Utc::now(),
4574            updated_at: Utc::now(),
4575        }];
4576        let output = format_sessions(&sessions);
4577        assert!(output.contains("..."));
4578    }
4579}